Module phc.easy.query.api_paging

Expand source code
import json
from typing import Any, Callable, List, Optional, Union
from urllib.parse import parse_qs, quote, urlparse

from funcy import nth
from phc.base_client import BaseClient
from phc.easy.auth import Auth
from phc.easy.util import tqdm

MAX_RESULT_SIZE = 999


def clean_params(params: dict):
    return {
        k: v
        for k, v in params.items()
        if ((v is not None) and (not isinstance(v, str) or len(v) > 0))
    }


def get_next_page_token(data: dict):
    if "links" in data:
        # If links is present, the only 100% accurate way to get the next token is by
        # parsing the next URL
        return parse_next_page_token_from_url(
            data.get("links", {}).get("next", "")
        )

    next_page_token = data.get("nextPageToken")
    if next_page_token and isinstance(next_page_token, dict):
        # URL encode next page token since it's sometimes a dictionary
        return json.dumps(next_page_token)

    if next_page_token and isinstance(next_page_token, str):
        return next_page_token

    return None


def parse_next_page_token_from_url(next_url: str):
    "Parse next url and retrieve nextPageToken (or None)"
    return nth(0, parse_qs(urlparse(next_url).query).get("nextPageToken", []))


def recursive_paging_api_call(
    path: str,
    params: dict = {},
    http_verb: str = "GET",
    scroll: bool = False,
    progress: Optional[tqdm] = None,
    auth_args: Optional[Auth] = Auth.shared(),
    callback: Union[Callable[[Any, bool], None], None] = None,
    max_pages: Optional[int] = None,
    page_size: Optional[int] = None,
    item_key: str = "items",
    response_to_items: Optional[Callable[[Union[list, dict]], list]] = None,
    log: bool = False,
    try_count: bool = True,
    _current_page: int = 1,
    _prev_results: List[dict] = [],
    _next_page_token: Optional[str] = None,
    _count: Optional[Union[float, int]] = None,
):
    auth = Auth(auth_args)
    client = BaseClient(auth.session())

    if _next_page_token:
        params = {**params, "nextPageToken": _next_page_token}

    if page_size:
        params = {**params, "pageSize": page_size}

    # NOTE: Parallelism is kept with execute_fhir_dsl to unify the API calls
    if scroll is False:
        max_pages = 1

    # Compute count and add to progress
    if try_count and _count is None and len(_prev_results) == 0:
        count_response = client._api_call(
            path,
            http_verb=http_verb,
            # Use minimum pageSize in case this endpoint doesn't support count
            params={**params, "include": "count", "pageSize": 1},
        )

        _count = count_response.get("count")
        # Count appears to only go up to 999
        if _count == 999:
            print(f"Results are {_count}+.")
            _count = None

        if _count and (progress is not None):
            progress.reset(_count)

    response = client._api_call(path, http_verb=http_verb, params=params)

    if response_to_items is None:

        def response_to_items(data):
            return data.get(item_key, [])

    current_results = response_to_items(response.data)

    if progress is not None:
        progress.update(len(current_results))

    next_page_token = get_next_page_token(response.data)

    is_last_batch = (
        (scroll is False)
        or ((max_pages is not None) and (_current_page >= max_pages))
        # Using the next link is the only completely reliable way to tell if a
        # next page exists
        or (next_page_token is None)
    )
    results = [] if callback else [*_prev_results, *current_results]

    # Sometimes the count doesn't match the results. We make it sync up if the
    # count doesn't match but we got all results.
    # TODO: Remove this when API fixed
    if (
        (progress is not None)
        and scroll
        and is_last_batch
        and (progress.total != progress.n)
    ):
        count = progress.n
        progress.reset(count)
        progress.update(count)

    if callback and not is_last_batch:
        callback(current_results, False)
    elif callback and is_last_batch:
        return callback(current_results, True)
    elif is_last_batch:
        if progress is not None:
            progress.close()

        # Because count is often wrong, we'll skip the logging here
        # TODO: Uncomment this when API fixed
        # print(
        #     f"Retrieved {len(results)}{f'/{_count}' if _count else ''} results"
        # )
        return results

    return recursive_paging_api_call(
        path,
        params=params,
        http_verb=http_verb,
        progress=progress,
        auth_args=auth_args,
        callback=callback,
        max_pages=max_pages,
        page_size=page_size,
        log=log,
        scroll=scroll,
        try_count=try_count,
        item_key=item_key,
        response_to_items=response_to_items,
        _current_page=_current_page + 1,
        _prev_results=results,
        _next_page_token=next_page_token,
        _count=_count,
    )

Functions

def clean_params(params: dict)
Expand source code
def clean_params(params: dict):
    return {
        k: v
        for k, v in params.items()
        if ((v is not None) and (not isinstance(v, str) or len(v) > 0))
    }
def get_next_page_token(data: dict)
Expand source code
def get_next_page_token(data: dict):
    if "links" in data:
        # If links is present, the only 100% accurate way to get the next token is by
        # parsing the next URL
        return parse_next_page_token_from_url(
            data.get("links", {}).get("next", "")
        )

    next_page_token = data.get("nextPageToken")
    if next_page_token and isinstance(next_page_token, dict):
        # URL encode next page token since it's sometimes a dictionary
        return json.dumps(next_page_token)

    if next_page_token and isinstance(next_page_token, str):
        return next_page_token

    return None
def parse_next_page_token_from_url(next_url: str)

Parse next url and retrieve nextPageToken (or None)

Expand source code
def parse_next_page_token_from_url(next_url: str):
    "Parse next url and retrieve nextPageToken (or None)"
    return nth(0, parse_qs(urlparse(next_url).query).get("nextPageToken", []))
def recursive_paging_api_call(path: str, params: dict = {}, http_verb: str = 'GET', scroll: bool = False, progress: None = None, auth_args: Optional[Auth] = <phc.easy.auth.Auth object>, callback: Optional[Callable[[Any, bool], None]] = None, max_pages: Optional[int] = None, page_size: Optional[int] = None, item_key: str = 'items', response_to_items: Optional[Callable[[Union[list, dict]], list]] = None, log: bool = False, try_count: bool = True)
Expand source code
def recursive_paging_api_call(
    path: str,
    params: dict = {},
    http_verb: str = "GET",
    scroll: bool = False,
    progress: Optional[tqdm] = None,
    auth_args: Optional[Auth] = Auth.shared(),
    callback: Union[Callable[[Any, bool], None], None] = None,
    max_pages: Optional[int] = None,
    page_size: Optional[int] = None,
    item_key: str = "items",
    response_to_items: Optional[Callable[[Union[list, dict]], list]] = None,
    log: bool = False,
    try_count: bool = True,
    _current_page: int = 1,
    _prev_results: List[dict] = [],
    _next_page_token: Optional[str] = None,
    _count: Optional[Union[float, int]] = None,
):
    auth = Auth(auth_args)
    client = BaseClient(auth.session())

    if _next_page_token:
        params = {**params, "nextPageToken": _next_page_token}

    if page_size:
        params = {**params, "pageSize": page_size}

    # NOTE: Parallelism is kept with execute_fhir_dsl to unify the API calls
    if scroll is False:
        max_pages = 1

    # Compute count and add to progress
    if try_count and _count is None and len(_prev_results) == 0:
        count_response = client._api_call(
            path,
            http_verb=http_verb,
            # Use minimum pageSize in case this endpoint doesn't support count
            params={**params, "include": "count", "pageSize": 1},
        )

        _count = count_response.get("count")
        # Count appears to only go up to 999
        if _count == 999:
            print(f"Results are {_count}+.")
            _count = None

        if _count and (progress is not None):
            progress.reset(_count)

    response = client._api_call(path, http_verb=http_verb, params=params)

    if response_to_items is None:

        def response_to_items(data):
            return data.get(item_key, [])

    current_results = response_to_items(response.data)

    if progress is not None:
        progress.update(len(current_results))

    next_page_token = get_next_page_token(response.data)

    is_last_batch = (
        (scroll is False)
        or ((max_pages is not None) and (_current_page >= max_pages))
        # Using the next link is the only completely reliable way to tell if a
        # next page exists
        or (next_page_token is None)
    )
    results = [] if callback else [*_prev_results, *current_results]

    # Sometimes the count doesn't match the results. We make it sync up if the
    # count doesn't match but we got all results.
    # TODO: Remove this when API fixed
    if (
        (progress is not None)
        and scroll
        and is_last_batch
        and (progress.total != progress.n)
    ):
        count = progress.n
        progress.reset(count)
        progress.update(count)

    if callback and not is_last_batch:
        callback(current_results, False)
    elif callback and is_last_batch:
        return callback(current_results, True)
    elif is_last_batch:
        if progress is not None:
            progress.close()

        # Because count is often wrong, we'll skip the logging here
        # TODO: Uncomment this when API fixed
        # print(
        #     f"Retrieved {len(results)}{f'/{_count}' if _count else ''} results"
        # )
        return results

    return recursive_paging_api_call(
        path,
        params=params,
        http_verb=http_verb,
        progress=progress,
        auth_args=auth_args,
        callback=callback,
        max_pages=max_pages,
        page_size=page_size,
        log=log,
        scroll=scroll,
        try_count=try_count,
        item_key=item_key,
        response_to_items=response_to_items,
        _current_page=_current_page + 1,
        _prev_results=results,
        _next_page_token=next_page_token,
        _count=_count,
    )