Source code for mpesa.portalsdk.api

"""Mpesa PortalSDK API Helper Module."""
import json
from enum import Enum

import requests
from base64 import b64decode, b64encode
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5 as Cipher_PKCS1_v1_5

[docs]class APIRequest: """API Request Class. :param context: context under which to create the API Request. :type context: :class:`mpesa.portalsdk.APIContext`. """ def __init__(self, context=None): """Construct.""" self.context = context
[docs] def execute(self): """Execute API Request using ``self.context``. :raises: ``requests.exceptions.ConnectionError`` :raises: ``TypeError`` :return: response object of ``mpesa.portalsdk.APIResponse``. :rtype: :class:`mpesa.portalsdk.APIResponse`. """ if self.context is not None: self.create_default_headers() try: return { APIMethodType.GET: self.__get, APIMethodType.POST: self.__post, APIMethodType.PUT: self.__put, }.get(self.context.method_type, self.__unknown)() except requests.exceptions.ConnectionError as ce: raise ce else: raise TypeError("Context cannot be None.")
[docs] def create_bearer_token(self): """Return encrypted context api_key using the context public_key.""" key_der = b64decode(self.context.public_key) key_pub = RSA.importKey(key_der) cipher = cipher_text = cipher.encrypt(self.context.api_key.encode("ascii")) encrypted_msg = b64encode(cipher_text) return encrypted_msg
[docs] def create_default_headers(self): """Add some default headers to ``self.context``.""" self.context.add_header( "Authorization", "Bearer {}".format(self.create_bearer_token().decode("utf-8")), ) self.context.add_header("Content-Type", "application/json") self.context.add_header("Host", self.context.address)
def __get(self): """Return ``mpesa.portalsdk.APIResponse`` after GET Request.""" r = requests.get( self.context.get_url(), params=self.context.get_parameters(), headers=self.context.get_headers(), ) print(r) return APIResponse( r.status_code, json.loads(r.headers.__str__().replace("'", '"')), json.loads(r.text), ) def __post(self): """Return ``mpesa.portalsdk.APIResponse`` after POST Request.""" r = self.context.get_url(), headers=self.context.get_headers(), json=self.context.get_parameters(), ) print(r) return APIResponse( r.status_code, json.loads(r.headers.__str__().replace("'", '"')), json.loads(r.text), ) def __put(self): """Return ``mpesa.portalsdk.APIResponse`` after PUT Request.""" print("PUT") r = requests.put( self.context.get_url(), headers=self.context.get_headers(), json=self.context.get_parameters(), ) print("PUT", r) return APIResponse( r.status_code, json.loads(r.headers.__str__().replace("'", '"')), json.loads(r.text), ) def __unknown(self): """Raise Unknown Method Exception. :raises: Exception("Unknown Method"). """ raise Exception("Unknown Method")
[docs]class APIResponse(dict): """API Response Class. :param status_code: String representing HTTP Status Code. :param headers: Dict object of key,value headers. :param body: Dict object of key,value pairs. :type status_code: str :type headers: dict :type body: dict """ def __init__(self, status_code, headers, body): """Construct.""" super(APIResponse, self).__init__() self["status_code"]: str = status_code self["headers"]: dict = headers self["body"]: dict = body @property def status_code(self) -> int: """Return HTTP Status Code.""" return self["status_code"] @status_code.setter def status_code(self, status_code: int): if type(status_code) is not int: raise TypeError("status_code must be a int") else: self["status_code"] = status_code @property def headers(self) -> dict: """Return HTTP Headers.""" return self["headers"] @headers.setter def headers(self, headers: dict): if type(headers) is not dict: raise TypeError("headers must be a dict") else: self["headers"] = headers @property def body(self) -> dict: """Return HTTP Body.""" return self["body"] @body.setter def body(self, body: dict): if type(body) is not dict: raise TypeError("body must be a dict") else: self["body"] = body
[docs]class APIMethodType(Enum): """API Method Type Class.""" GET: int = 0 POST: int = 1 PUT: int = 3 DELETE: int = 4
[docs]class APIContext(dict): """API Context Class. :param api_key: API Key. :param public_key: Public Key. :param ssl: Whether to use ssl, defaults ``False``. :param method_type: HTTP Method Type, defaults ``APIMethodType.GET``. :param address: Address, defaults ``""``. :param port: Port, defaults ``80``. :param path: URL path, defaults ``""``. :param headers: Headers, defaults ``{}``. :param parameters: Parameters, defaults ``{}``. :type api_key: str. :type public_key: str. :type ssl: bool. :type method_type: :class: `APIMethodType`, optional. :type address: str. :type port: int. :type path: str. :type headers: dict. :type parameters: dict. """ def __init__( self, api_key="", public_key="", ssl=False, method_type=APIMethodType.GET, address="", port=80, path="", headers={}, parameters={}, ): super(APIContext, self).__init__() self["api_key"]: str = api_key self["public_key"]: str = public_key self["ssl"]: bool = ssl self["method_type"]: Enum = method_type self["address"]: str = address self["port"]: int = port self["path"]: str = path self["headers"]: dict = headers self["parameters"]: dict = parameters
[docs] def get_url(self): """Return formed url from context data.""" if self.ssl is True: return "https://{}:{}{}".format(self.address, self.port, self.path) else: return "http://{}:{}{}".format(self.address, self.port, self.path)
[docs] def add_header(self, header, value): """Update Headers dict with header,value.""" self["headers"].update({header: value})
[docs] def get_headers(self): """Return self headers.""" return self["headers"]
[docs] def add_parameter(self, key, value): """Update Parameters dict with key,value.""" self["parameters"].update({key: value})
[docs] def get_parameters(self): """Return self parameters.""" return self["parameters"]
@property def api_key(self) -> str: """Return self api_key.""" return self["api_key"] @api_key.setter def api_key(self, api_key: str): if type(api_key) is not str: raise TypeError("api_key must be a str") else: self["api_key"] = api_key @property def public_key(self) -> str: """Return self public_key.""" return self["public_key"] @public_key.setter def public_key(self, public_key: str): if type(public_key) is not str: raise TypeError("public_key must be a str") else: self["public_key"] = public_key @property def ssl(self) -> bool: """Return self ssl.""" return self["ssl"] @ssl.setter def ssl(self, ssl: bool): if type(ssl) is not bool: raise TypeError("ssl must be a bool") else: self["ssl"] = ssl @property def method_type(self) -> APIMethodType: """Return self method_type.""" return self["method_type"] @method_type.setter def method_type(self, method_type: APIMethodType): if type(method_type) is not APIMethodType: raise TypeError("method_type must be a APIMethodType") else: self["method_type"] = method_type @property def address(self) -> str: """Return self address.""" return self["address"] @address.setter def address(self, address: str): if type(address) is not str: raise TypeError("address must be a str") else: self["address"] = address @property def port(self) -> int: """Return self port.""" return self["port"] @port.setter def port(self, port: int): if type(port) is not int: raise TypeError("port must be a int") else: self["port"] = port @property def path(self) -> str: """Return self path.""" return self["path"] @path.setter def path(self, path: str): if type(path) is not str: raise TypeError("path must be a str") else: self["path"] = path