Cumulus geoprocessor utilities

CumulusAPI

Cumulus API class providing functionality to make requests

Requests are made asynchronously (asyncio) using httpx, with optional HTTP/2 support.

Source code in cumulus_packager/utils/capi.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class CumulusAPI:
    """Cumulus API class providing functionality to make requests

    The URL is kept both as a string (``url``) and as a dict of its split
    components (``url_split``); changing a component through one of the
    property setters rebuilds ``url``.  Requests are issued asynchronously
    with ``httpx.AsyncClient``, using HTTP/2 only when ``http2`` is true.

    Parameters
    ----------
    url : str
        URL of the API; split into components with ``urlsplit``
    http2 : bool, optional
        passed to ``httpx.AsyncClient(http2=...)``, by default False
    """

    def __init__(self, url: str, http2: bool = False):
        self.url = url
        self.http2 = http2
        # dict of URL components (scheme, netloc, path, query, fragment)
        self.url_split = urlsplit(self.url)._asdict()

    def __repr__(self) -> str:
        return f"{__class__.__name__}({self.url}, {self.http2}, {self.url_split})"

    @property
    def parameters(self):
        """dict: the split URL components."""
        return self.url_split

    @parameters.setter
    def parameters(self, key_val):
        # key_val is a (component name, value) pair
        self.url_split[key_val[0]] = key_val[1]
        self.build_url(self.url_split)

    def build_url(self, url_params):
        """Rebuild ``self.url`` from a dict of URL components.

        Parameters
        ----------
        url_params : dict
            URL components keyed by name, in the order ``urlunsplit`` expects
        """
        self.url = urlunsplit(namedtuple("UrlUnsplit", url_params.keys())(**url_params))

    @property
    def endpoint(self):
        """str: the path component of the URL."""
        return self.url_split["path"]

    @endpoint.setter
    def endpoint(self, endpoint):
        self.url_split["path"] = endpoint
        self.build_url(self.url_split)

    @property
    def query(self):
        """str: the encoded query string of the URL."""
        return self.url_split["query"]

    @query.setter
    def query(self, query: dict):
        # the dict is URL-encoded and replaces any existing query string
        self.url_split["query"] = urlencode(query)
        self.build_url(self.url_split)

    # NOTE(review): the request methods below handle only the builtin
    # ConnectionError; httpx reports transport failures with its own exception
    # types, which appear to propagate to the caller -- confirm this is intended.

    async def post_(self, url, payload):
        """POST ``payload`` as JSON to ``url``.

        Parameters
        ----------
        url : str
            request URL
        payload : Any
            JSON-serializable request body

        Returns
        -------
        Any | None
            decoded JSON response on status 200 or 201, otherwise None
        """
        try:
            # context manager closes the client and its connections on exit
            async with httpx.AsyncClient(http2=self.http2) as client:
                headers = [(b"content-type", b"application/json")]
                resp = await client.post(url, headers=headers, json=payload)
                if resp.status_code in (200, 201):
                    return resp.json()
        except ConnectionError as ex:
            logger.warning(ex)

    async def put_(self, url, payload):
        """PUT ``payload`` as JSON to ``url``.

        Parameters
        ----------
        url : str
            request URL
        payload : Any
            JSON-serializable request body

        Returns
        -------
        Any | None
            decoded JSON response on status 200 or 201, otherwise None
        """
        try:
            # context manager closes the client and its connections on exit
            async with httpx.AsyncClient(http2=self.http2) as client:
                headers = [(b"content-type", b"application/json")]
                resp = await client.put(url, headers=headers, json=payload)
                if resp.status_code in (200, 201):
                    return resp.json()
        except ConnectionError as ex:
            logger.warning(ex)

    async def get_(self, url):
        """GET ``url``.

        Parameters
        ----------
        url : str
            request URL

        Returns
        -------
        httpx.Response | None
            the response object on status 200, otherwise None
        """
        try:
            async with httpx.AsyncClient(http2=self.http2) as client:
                resp = await client.get(url)
                if resp.status_code == 200:
                    return resp
        except ConnectionError as ex:
            logger.warning(ex)

NotifyCumulus

Bases: CumulusAPI

Cumulus notification class extending CumulusAPI

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| CumulusAPI | class | base class | required |
Source code in cumulus_packager/utils/capi.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class NotifyCumulus(CumulusAPI):
    """Cumulus notification class extending CumulusAPI

    On construction the URL path is set to ``productfiles`` and the query
    string is set to ``key=<APPLICATION_KEY>``.

    Parameters
    ----------
    url : str
        URL of the API, forwarded to ``CumulusAPI``
    http2 : bool, optional
        forwarded to ``CumulusAPI``, by default True
    """

    def __init__(self, url, http2=True):
        super().__init__(url, http2)
        self.endpoint = "productfiles"
        self.query = {"key": APPLICATION_KEY}

    def run(self, payload):
        """POST ``payload`` to the notification URL and wait for the result.

        Parameters
        ----------
        payload : Any
            JSON-serializable request body

        Returns
        -------
        Any | None
            whatever ``post_`` returns

        Notes
        -----
        ``asyncio.run`` starts its own event loop, so this method must be
        called from synchronous code, not from inside a running loop.
        """
        # local import keeps this block self-contained
        import asyncio

        # the base class exposes the coroutine ``post_`` (there is no ``post``)
        return asyncio.run(self.post_(self.url, payload=payload))