Module phc.easy.query.ga4gh
Expand source code
from typing import List, Union
from phc.easy.auth import Auth
from phc.base_client import BaseClient
# Page size assumed when the request params carry no explicit "pageSize".
PAGE_SIZE = 50


def recursive_execute_ga4gh(
    auth: Auth,
    client: BaseClient,
    path: str,
    http_verb: str,
    results_key: str,
    params: dict,
    scroll: bool = False,
    next_page_token: Union[str, None] = None,
    _prev_results: Union[List[dict], None] = None,
):
    """Fetch one page, or every page, of results from a GA4GH endpoint.

    Each request sends ``params`` plus a ``"pageToken"`` entry and reads the
    returned records from ``response.data[results_key]``.

    Despite the name (kept for backward compatibility) paging is done with a
    loop rather than recursion, so a long scroll cannot exhaust the
    interpreter's recursion limit.

    Args:
        auth: Accepted for interface compatibility; not used by this function.
        client: Object exposing ``_ga4gh_call(path, http_verb=..., json=...)``.
        path: Endpoint path handed to ``client._ga4gh_call``.
        http_verb: HTTP verb handed to ``client._ga4gh_call``.
        results_key: Key in ``response.data`` that holds the list of records.
        params: Request body; ``params["pageSize"]`` (default ``PAGE_SIZE``)
            is used to recognise a short, i.e. final, page.
        scroll: When ``False`` only a single page is requested.
        next_page_token: Token sent as ``"pageToken"`` on the first request.
        _prev_results: Records to place ahead of the fetched ones. The list
            passed in is never mutated.

    Returns:
        A new list containing ``_prev_results`` followed by every record
        fetched.
    """
    page_size = params.get("pageSize", PAGE_SIZE)
    # Copy so neither the caller's list nor a shared default is ever mutated.
    results = list(_prev_results) if _prev_results else []

    while True:
        response = client._ga4gh_call(
            path,
            http_verb=http_verb,
            json={**params, "pageToken": next_page_token},
        )
        current_results = response.data[results_key]
        results.extend(current_results)

        # A short page means the server has nothing more to return.
        if not scroll or len(current_results) < page_size:
            break

        # A full page without a continuation token is also the end. Sending
        # an empty token again would restart from the first page and never
        # terminate.
        # NOTE(review): assumes response.data is a dict (supports .get).
        next_page_token = response.data.get("nextPageToken")
        if not next_page_token:
            break

    print(f"Retrieved {len(results)} results")
    return results
Functions
def recursive_execute_ga4gh(auth: Auth, client: phc.base_client.BaseClient, path: str, http_verb: str, results_key: str, params: dict, scroll: bool = False, next_page_token: Optional[str] = None)
-
Expand source code
def recursive_execute_ga4gh(
    auth: Auth,
    client: BaseClient,
    path: str,
    http_verb: str,
    results_key: str,
    params: dict,
    scroll: bool = False,
    next_page_token: Union[str, None] = None,
    _prev_results: Union[List[dict], None] = None,
):
    """Fetch one page, or every page, of results from a GA4GH endpoint.

    Each request sends ``params`` plus a ``"pageToken"`` entry and reads the
    returned records from ``response.data[results_key]``.

    Despite the name (kept for backward compatibility) paging is done with a
    loop rather than recursion, so a long scroll cannot exhaust the
    interpreter's recursion limit.

    Args:
        auth: Accepted for interface compatibility; not used by this function.
        client: Object exposing ``_ga4gh_call(path, http_verb=..., json=...)``.
        path: Endpoint path handed to ``client._ga4gh_call``.
        http_verb: HTTP verb handed to ``client._ga4gh_call``.
        results_key: Key in ``response.data`` that holds the list of records.
        params: Request body; ``params["pageSize"]`` (default ``PAGE_SIZE``)
            is used to recognise a short, i.e. final, page.
        scroll: When ``False`` only a single page is requested.
        next_page_token: Token sent as ``"pageToken"`` on the first request.
        _prev_results: Records to place ahead of the fetched ones. The list
            passed in is never mutated.

    Returns:
        A new list containing ``_prev_results`` followed by every record
        fetched.
    """
    page_size = params.get("pageSize", PAGE_SIZE)
    # Copy so neither the caller's list nor a shared default is ever mutated.
    results = list(_prev_results) if _prev_results else []

    while True:
        response = client._ga4gh_call(
            path,
            http_verb=http_verb,
            json={**params, "pageToken": next_page_token},
        )
        current_results = response.data[results_key]
        results.extend(current_results)

        # A short page means the server has nothing more to return.
        if not scroll or len(current_results) < page_size:
            break

        # A full page without a continuation token is also the end. Sending
        # an empty token again would restart from the first page and never
        # terminate.
        # NOTE(review): assumes response.data is a dict (supports .get).
        next_page_token = response.data.get("nextPageToken")
        if not next_page_token:
            break

    print(f"Retrieved {len(results)} results")
    return results