Module phc.adapter
Expand source code
import aiohttp
class Adapter:
should_refresh: bool
def __init__(self):
self.should_refresh = True
async def send(
self,
*,
http_verb: str,
api_url: str,
req_args: dict,
trust_env: bool,
timeout: int,
):
"""Submit the HTTP request with the running session or a new session.
Returns:
A dictionary of the response data.
"""
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=timeout), trust_env=trust_env
) as session:
async with session.request(http_verb, api_url, **req_args) as res:
return {
"data": await (
res.json()
if res.content_type == "application/json"
else res.text()
),
"headers": res.headers,
"status_code": res.status,
}
Classes
class Adapter
-
Expand source code
class Adapter: should_refresh: bool def __init__(self): self.should_refresh = True async def send( self, *, http_verb: str, api_url: str, req_args: dict, trust_env: bool, timeout: int, ): """Submit the HTTP request with the running session or a new session. Returns: A dictionary of the response data. """ async with aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=timeout), trust_env=trust_env ) as session: async with session.request(http_verb, api_url, **req_args) as res: return { "data": await ( res.json() if res.content_type == "application/json" else res.text() ), "headers": res.headers, "status_code": res.status, }
Class variables
var should_refresh : bool
Methods
async def send(self, *, http_verb: str, api_url: str, req_args: dict, trust_env: bool, timeout: int)
-
Submit the HTTP request with the running session or a new session.
Returns
A dictionary of the response data.
Expand source code
async def send( self, *, http_verb: str, api_url: str, req_args: dict, trust_env: bool, timeout: int, ): """Submit the HTTP request with the running session or a new session. Returns: A dictionary of the response data. """ async with aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=timeout), trust_env=trust_env ) as session: async with session.request(http_verb, api_url, **req_args) as res: return { "data": await ( res.json() if res.content_type == "application/json" else res.text() ), "headers": res.headers, "status_code": res.status, }