Cumulus API class providing functionality to make requests.
Implemented with asyncio and httpx, with optional HTTP/2 support.
Source code in cumulus_geoproc/utils/capi.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class CumulusAPI:
    """Cumulus API helper for building URLs and making async HTTP requests.

    Requests are issued with ``httpx.AsyncClient``; HTTP/2 is used only when
    ``http2`` is true.

    The URL is stored both as a string (``self.url``) and as a mutable dict
    of its ``urlsplit`` components (``self.url_split``).  The ``parameters``,
    ``endpoint`` and ``query`` setters update the dict and then rebuild
    ``self.url`` from it.
    """

    def __init__(self, url: str, http2: bool = False):
        """Store the URL and split it into its components.

        Parameters
        ----------
        url : str
            URL to split and use as the starting point
        http2 : bool, optional
            passed to ``httpx.AsyncClient(http2=...)``, by default False
        """
        self.url = url
        self.http2 = http2
        # Keep a mutable mapping of the URL components so that single parts
        # can be replaced and the URL reassembled.
        self.url_split = urlsplit(self.url)._asdict()

    def __repr__(self) -> str:
        return f"{__class__.__name__}({self.url}, {self.http2}, {self.url_split})"

    @property
    def parameters(self):
        """Dict of URL components as produced by ``urlsplit(...)._asdict()``."""
        return self.url_split

    @parameters.setter
    def parameters(self, key_val):
        # key_val is a (key, value) pair, e.g. ("netloc", "localhost:8080").
        self.url_split[key_val[0]] = key_val[1]
        self.build_url(self.url_split)

    def build_url(self, url_params):
        """Rebuild ``self.url`` from a mapping of URL components.

        The mapping's values are passed to ``urlunsplit`` in key order, so it
        is expected to hold exactly the components ``urlsplit`` produced.
        """
        self.url = urlunsplit(namedtuple("UrlUnsplit", url_params.keys())(**url_params))

    @property
    def endpoint(self):
        """Path component of the URL."""
        return self.url_split["path"]

    @endpoint.setter
    def endpoint(self, endpoint):
        self.url_split["path"] = endpoint
        self.build_url(self.url_split)

    @property
    def query(self):
        """Query string component of the URL (already URL-encoded)."""
        return self.url_split["query"]

    @query.setter
    def query(self, query: dict):
        # The setter accepts a mapping and stores it URL-encoded.
        self.url_split["query"] = urlencode(query)
        self.build_url(self.url_split)

    # NOTE(review): the request helpers below only catch the builtin
    # ConnectionError.  httpx appears to report transport failures through
    # its own exception hierarchy (httpx.TransportError and subclasses),
    # which would propagate to the caller rather than be logged here --
    # confirm whether that is intended before widening the handlers.

    async def post_(self, url, payload):
        """POST ``payload`` as JSON to ``url``.

        Returns the decoded JSON body when the status code is 200 or 201,
        otherwise ``None``.  A ``ConnectionError`` is logged as a warning and
        also results in ``None``.
        """
        try:
            # Context manager closes the client (and its connections) on
            # every path, including when the request raises.
            async with httpx.AsyncClient(http2=self.http2) as client:
                headers = [(b"content-type", b"application/json")]
                resp = await client.post(url, headers=headers, json=payload)
                if resp.status_code in (200, 201):
                    return resp.json()
        except ConnectionError as ex:
            logger.warning(ex)

    async def put_(self, url, payload):
        """PUT ``payload`` as JSON to ``url``.

        Returns the decoded JSON body when the status code is 200 or 201,
        otherwise ``None``.  A ``ConnectionError`` is logged as a warning and
        also results in ``None``.
        """
        try:
            # Context manager closes the client (and its connections) on
            # every path, including when the request raises.
            async with httpx.AsyncClient(http2=self.http2) as client:
                headers = [(b"content-type", b"application/json")]
                resp = await client.put(url, headers=headers, json=payload)
                if resp.status_code in (200, 201):
                    return resp.json()
        except ConnectionError as ex:
            logger.warning(ex)

    async def get_(self, url):
        """GET ``url``.

        Returns the response object when the status code is 200, otherwise
        ``None``.  A ``ConnectionError`` is logged as a warning and also
        results in ``None``.
        """
        try:
            async with httpx.AsyncClient(http2=self.http2) as client:
                resp = await client.get(url)
                if resp.status_code == 200:
                    return resp
        except ConnectionError as ex:
            logger.warning(ex)
|