Module phc.easy.query
Source code
import json
import math
from typing import Any, Callable, List, Optional, Tuple, Union
import pandas as pd
from phc.base_client import BaseClient
from phc.easy.auth import Auth
from phc.easy.query.api_paging import clean_params, recursive_paging_api_call
from phc.easy.query.fhir_aggregation import FhirAggregation
from phc.easy.query.fhir_dsl import (
DEFAULT_SCROLL_SIZE,
MAX_RESULT_SIZE,
execute_single_fhir_dsl,
recursive_execute_fhir_dsl,
tqdm,
with_progress,
)
from phc.easy.query.fhir_dsl_query import build_queries
from phc.easy.query.ga4gh import recursive_execute_ga4gh
from phc.easy.query.url import merge_pattern
from phc.easy.util import _has_tqdm, extract_codes
from phc.easy.util.api_cache import FHIR_DSL, APICache
from phc.services import Fhir
from toolz import identity
class Query:
@staticmethod
def find_count_of_dsl_query(query: dict, auth_args: Auth = Auth.shared()):
"""Find count of a given dsl query
See https://devcenter.docs.lifeomic.com/development/fhir-service/dsl
Attributes
----------
query : dict
The FHIR query to run a count against
auth_args : Auth, dict
Additional arguments for authentication
Examples
--------
>>> import phc.easy as phc
>>> phc.Auth.set({ 'account': '<your-account-name>' })
>>> phc.Project.set_current('My Project Name')
>>> phc.Query.find_count_of_dsl_query({
"type": "select",
"columns": "*",
"from": [{"table": "patient"}],
})
"""
if FhirAggregation.is_aggregation_query(query):
raise ValueError("Count is not support for aggregation queries.")
auth = Auth(auth_args)
fhir = Fhir(auth.session())
response = fhir.execute_es(
auth.project_id, build_queries(query, page_size=1)[0], scroll="true"
)
return response.data["hits"]["total"]["value"]
@staticmethod
def execute_fhir_dsl(
query: dict,
all_results: bool = False,
auth_args: Auth = Auth.shared(),
callback: Union[Callable[[Any, bool], None], None] = None,
max_pages: Union[int, None] = None,
log: bool = False,
**query_kwargs,
):
"""Execute a FHIR query with the DSL
See https://devcenter.docs.lifeomic.com/development/fhir-service/dsl
Attributes
----------
query : dict
The FHIR query to run (is a superset of elasticsearch)
all_results : bool
            Return all results by scrolling through multiple pages of data
(Limit is ignored if provided)
auth_args : Auth, dict
Additional arguments for authentication
callback : Callable[[Any, bool], None] (optional)
A progress function that is invoked for each batch. When the second
argument passed is true, then the result of the callback function is
used as the return value. This is useful if writing results out to a
file and then returning the completed result from that file.
Example:
def handle_batch(batch, is_finished):
print(len(batch))
if is_finished:
return "batch finished
max_pages : int
The number of pages to retrieve (useful if working with tons of records)
log : bool = False
Whether to log the elasticsearch query sent to the server
query_kwargs : dict
Arguments to pass to build_queries such as patient_id, patient_ids,
and patient_key. (See phc.easy.query.fhir_dsl_query.build_queries)
Examples
--------
>>> import phc.easy as phc
>>> phc.Auth.set({ 'account': '<your-account-name>' })
>>> phc.Project.set_current('My Project Name')
>>> phc.Query.execute_fhir_dsl({
"type": "select",
"columns": "*",
"from": [
{"table": "patient"}
],
}, all_results=True)
"""
queries = build_queries(query, **query_kwargs)
if log:
print(json.dumps(queries, indent=4))
if len(queries) > 1 and FhirAggregation.is_aggregation_query(
queries[0]
):
raise ValueError(
"Cannot combine multiple aggregation query results"
)
if FhirAggregation.is_aggregation_query(queries[0]):
response = execute_single_fhir_dsl(queries[0], auth_args=auth_args)
return FhirAggregation.from_response(response)
if len(queries) > 1 and _has_tqdm:
queries = tqdm(queries)
result_set = []
for query in queries:
if all_results:
results = with_progress(
lambda: tqdm(total=MAX_RESULT_SIZE),
lambda progress: recursive_execute_fhir_dsl(
{
"limit": [
{"type": "number", "value": 0},
# Make window size smaller than maximum to reduce
# pressure on API
{
"type": "number",
"value": DEFAULT_SCROLL_SIZE,
},
],
**query,
},
scroll=all_results,
progress=progress,
callback=callback,
auth_args=auth_args,
max_pages=max_pages,
),
)
else:
results = recursive_execute_fhir_dsl(
query,
scroll=all_results,
callback=callback,
auth_args=auth_args,
max_pages=max_pages,
)
if len(result_set) == 0:
result_set = results
else:
                result_set.extend(results)
return result_set
@staticmethod
def execute_paging_api(
path: str,
params: dict = {},
http_verb: str = "GET",
transform: Callable[[pd.DataFrame], pd.DataFrame] = identity,
all_results: bool = False,
auth_args: Auth = Auth.shared(),
max_pages: Optional[int] = None,
page_size: Optional[int] = None,
log: bool = False,
raw: bool = False,
ignore_cache: bool = False,
show_progress: bool = True,
progress: Optional[tqdm] = None,
item_key: str = "items",
try_count: bool = True,
response_to_items: Optional[Callable[[Union[list, dict]], list]] = None,
):
"""Execute a API query that pages through results
Attributes
----------
path : str
The API path to hit
(Special tokens: `{project_id}`)
params : dict
The parameters to include with request
http_verb : str
The HTTP method to use
        all_results : bool = False
            Whether to retrieve the entire set of records or just a sample (25)
auth_args : Auth, dict
Additional arguments for authentication
max_pages : int
The number of pages to retrieve (useful if working with tons of records)
page_size : int
The number of records to fetch per page
log : bool = False
Whether to log some diagnostic statements for debugging
progress : Optional[tqdm] = None
Override the given progress indicator
item_key : str
The key to find the results underneath (usually "items" but not always)
try_count : bool
Whether to try and send a "count" param to update the progress bar
response_to_items : Callable
Custom function to transform response data to list of items
(Overrides item_key when present)
Examples
--------
>>> import phc.easy as phc
>>> phc.Auth.set({ 'account': '<your-account-name>' })
>>> phc.Project.set_current('My Project Name')
>>> phc.Query.execute_paging_api(
"genomics/projects/{project_id}/tests",
params={
"patientId": "<patient-uuid>"
}
)
"""
auth = Auth(auth_args)
params = clean_params(params)
# Do not pull project_id if not in URL (which throws error if project not selected)
if "project_id" in path:
path = path.replace("{project_id}", auth.project_id)
path, params = merge_pattern(path, params)
query = {"path": path, "method": http_verb, "params": params}
if all_results and page_size is None:
# Default to 100 if not provided but getting all results
page_size = 100
if log:
print(json.dumps(query, indent=4))
use_cache = (
(not ignore_cache)
and (not raw)
and all_results
and (max_pages is None)
)
if use_cache and APICache.does_cache_for_query_exist(query):
return APICache.load_cache_for_query(query)
callback = (
APICache.build_cache_callback(query, transform, nested_key=None)
if use_cache
else None
)
results = with_progress(
lambda: (
(progress if progress is not None else tqdm())
if show_progress
else None
),
lambda progress: recursive_paging_api_call(
path,
params=params,
http_verb=http_verb,
callback=callback,
scroll=all_results or (max_pages is not None),
max_pages=max_pages,
page_size=page_size,
log=log,
auth_args=auth_args,
progress=progress,
item_key=item_key,
response_to_items=response_to_items,
try_count=try_count,
),
)
df = pd.DataFrame(results)
if raw:
return df
return transform(df)
@staticmethod
def execute_fhir_dsl_with_options(
query: dict,
transform: Callable[[pd.DataFrame], pd.DataFrame],
all_results: bool,
raw: bool,
query_overrides: dict,
auth_args: Auth,
ignore_cache: bool,
max_pages: Union[int, None],
log: bool = False,
**query_kwargs,
):
queries = build_queries({**query, **query_overrides}, **query_kwargs)
if log:
print(json.dumps(queries, indent=4))
is_first_agg_query = FhirAggregation.is_aggregation_query(queries[0])
if len(queries) > 1 and is_first_agg_query:
raise ValueError("Cannot combine multiple aggregate results")
use_cache = (
(not ignore_cache)
and (not raw)
and (all_results or is_first_agg_query)
and (max_pages is None)
)
if len(queries) > 1 and _has_tqdm:
queries = tqdm(queries)
frame = pd.DataFrame()
for one_query in queries:
if use_cache and APICache.does_cache_for_query_exist(
one_query, namespace=FHIR_DSL
):
results = APICache.load_cache_for_query(
one_query, namespace=FHIR_DSL
)
else:
results = Query.execute_fhir_dsl(
one_query,
all_results,
auth_args,
callback=(
APICache.build_cache_callback(
one_query, transform, namespace=FHIR_DSL
)
if use_cache
else None
),
max_pages=max_pages,
)
if isinstance(results, FhirAggregation):
# Cache isn't written in batches so we need to explicitly do it here
if use_cache:
APICache.write_agg(one_query, results)
# We don't support multiple agg queries so fine to return first one
return results
batch_frame = (
pd.DataFrame(map(lambda r: r["_source"], results))
if not isinstance(results, pd.DataFrame)
else results
)
frame = (
batch_frame
if len(frame) == 0
else pd.concat([frame, batch_frame]).reset_index(drop=True)
)
if raw:
return frame
return transform(frame)
@staticmethod
def get_codes(
table_name: str,
code_fields: List[str],
display_query: Optional[str] = None,
sample_size: Optional[int] = None,
**kwargs,
):
"""Find FHIR codes with a display for a given table
Attributes
----------
table_name : str
The FHIR Search Service table to retrieve from
code_fields : List[str]
The fields of this table that contain a system, code, and display
display_query : Optional[str]
Part of the code's display to match (will try to extract full code
if passed)
sample_size : Optional[int]
Override the search size for finding codes (may miss codes on later
records)
kwargs : dict
Arguments to pass to `phc.easy.query.Query.execute_composite_aggregations`
Examples
--------
>>> import phc.easy as phc
>>> phc.Auth.set({ 'account': '<your-account-name>' })
>>> phc.Project.set_current('My Project Name')
>>> phc.Query.get_codes(
table_name="observation",
code_fields=["meta.tag", "code.coding"],
patient_id="<my-patient-id>"
)
"""
if len(code_fields) == 0:
raise ValueError("No code columns specified.")
def agg_composite_to_frame(prefix: str, data: dict):
frame = pd.json_normalize(data["buckets"])
            # Strip the leading "key." prefix from bucket columns
            frame.columns = frame.columns.str.replace(r"^key\.", "", regex=True)
frame["field"] = prefix
return frame
if display_query is not None:
kwargs = {
**kwargs,
"query_overrides": {
"where": {
"type": "elasticsearch",
"query": {
"multi_match": {
"query": display_query,
"fields": [
f"{key}.display" for key in code_fields
],
}
},
}
},
}
results = Query.execute_composite_aggregations(
table_name=table_name,
key_sources_pairs=[
(
field,
[
{
"display": {
"terms": {"field": f"{field}.display.keyword"}
}
}
],
)
for field in code_fields
],
**kwargs,
)
agg_result = (
pd.concat(
[
agg_composite_to_frame(key, value)
for key, value in results.items()
]
)
.pipe(
lambda df: (
df
if len(df) == 0 or display_query is None
# Poor man's way to filter only matching codes (since Elasticsearch
# returns records which will include other codes)
else df[
df["display"]
.str.lower()
.str.contains(display_query.lower())
]
)
)
.pipe(
lambda df: (
pd.DataFrame()
if len(df) == 0
else df.sort_values(
"doc_count", ascending=False
).reset_index(drop=True)
)
)
)
if display_query is None or len(agg_result) == 0:
return agg_result
min_count = sample_size or agg_result.doc_count.sum()
filtered_code_fields = agg_result.field.unique()
# Shortcut: If one result, we just need to get the other associated
# attributes of the code
if len(agg_result) == 1:
min_count = 1
code_results = Query.execute_fhir_dsl(
{
"type": "select",
"from": [{"table": table_name}],
"columns": [
{
"expr": {
"type": "column_ref",
"column": key.split(".")[0],
}
}
for key in filtered_code_fields
],
"where": {
"type": "elasticsearch",
"query": {
"multi_match": {
"query": display_query,
"fields": [
f"{key}.display" for key in filtered_code_fields
],
}
},
},
},
page_size=int(min_count % 9000),
max_pages=int(math.ceil(min_count / 9000)),
log=kwargs.get("log", False),
)
codes = extract_codes(
map(lambda d: d["_source"], code_results),
display_query,
code_fields,
)
if len(codes) == 0:
return codes
if len(codes) == codes.display.nunique():
# If display values are unique, then the counts from Elasticsearch
# are correct. We can therefore join them.
codes = (
codes.join(
agg_result[["display", "doc_count"]].set_index("display"),
on="display",
how="outer",
)
.sort_values("doc_count", ascending=False)
.reset_index(drop=True)
)
if len(codes[codes.field.isnull()]) > 0:
print(
"Records with missing system/code values were not retrieved."
)
return codes
return codes
@staticmethod
def execute_composite_aggregations(
table_name: str,
key_sources_pairs: List[Tuple[str, List[dict]]],
batch_size: int = 100,
query_overrides: dict = {},
log: bool = False,
auth_args: Auth = Auth.shared(),
max_pages: Union[int, None] = None,
**query_kwargs,
):
"""Count records by multiple fields
Attributes
----------
table_name : str
The FHIR Search Service table to retrieve from
        key_sources_pairs : List[Tuple[str, List[dict]]]
            Pairs of keys and sources to pull composite results from
Example Input:
[
("meta.tag", [{"terms": {"field": "meta.tag.system.keyword"}}])
]
batch_size : int
The size of each page from elasticsearch to use
query_overrides : dict
Parts of the FSS query to override
(Note that passing certain values can cause the method to error out)
Example aggregation query executed (can use log=True to inspect):
{
"type": "select",
"columns": [{
"type": "elasticsearch",
"aggregations": {
"results": {
"composite": {
"sources": [{
"meta.tag": {
"terms": {
"field": "meta.tag.system.keyword"
}
}
}],
"size": 100,
}
}
},
}],
"from": [{"table": "observation"}],
}
auth_args : Auth, dict
Additional arguments for authentication
log : bool = False
Whether to log the elasticsearch query sent to the server
max_pages : int
The number of pages to retrieve (useful if working with tons of records)
query_kwargs : dict
Arguments to pass to build_queries such as patient_id, patient_ids,
and patient_key. See :func:`~phc.easy.query.fhir_dsl_query.build_queries`.
Examples
--------
>>> import phc.easy as phc
>>> phc.Auth.set({ 'account': '<your-account-name>' })
>>> phc.Project.set_current('My Project Name')
>>> phc.Query.execute_composite_aggregations(
table_name="observation",
key_sources_pairs=[
("meta.tag", [
{"code": {"terms": {"field": "meta.tag.code.keyword"}}},
]),
("code.coding", [
{"display": {"terms": {"field": "code.coding.display.keyword"}}}
]),
]
)
"""
if len(key_sources_pairs) == 0:
raise ValueError("No aggregate composite terms specified.")
return with_progress(
tqdm,
lambda progress: Query._recursive_execute_composite_aggregations(
table_name=table_name,
key_sources_pairs=key_sources_pairs,
batch_size=batch_size,
progress=progress,
log=log,
auth_args=auth_args,
query_overrides=query_overrides,
max_pages=max_pages,
**query_kwargs,
),
)
@staticmethod
def get_count_by_field(
table_name: str,
field: str,
batch_size: int = 1000,
query_overrides: dict = {},
log: bool = False,
auth_args: Auth = Auth.shared(),
**query_kwargs,
):
"""Count records by a given field
Attributes
----------
table_name : str
The FHIR Search Service table to retrieve from
field : str
The field name to count the values of (e.g. "subject.reference")
batch_size : int
The size of each page from elasticsearch to use
query_overrides : dict
Parts of the FSS query to override
(Note that passing certain values can cause the method to error out)
The aggregation query is similar to this:
{
"type": "select",
"columns": [{
"type": "elasticsearch",
"aggregations": {
"results": {
"composite": {
"sources": [{
"value": {
"terms": {
"field": "gender.keyword"
}
}
}],
"size": 100,
}
}
},
}],
"from": [{"table": "patient"}],
}
auth_args : Auth, dict
Additional arguments for authentication
log : bool = False
Whether to log the elasticsearch query sent to the server
query_kwargs : dict
Arguments to pass to build_queries such as patient_id, patient_ids,
and patient_key. (See phc.easy.query.fhir_dsl_query.build_queries)
Examples
--------
>>> import phc.easy as phc
>>> phc.Auth.set({ 'account': '<your-account-name>' })
>>> phc.Project.set_current('My Project Name')
>>> phc.Query.get_count_by_field(
table_name="patient",
field="gender"
)
"""
data = Query.execute_composite_aggregations(
table_name=table_name,
key_sources_pairs=[
(
"results",
[{"value": {"terms": {"field": f"{field}.keyword"}}}],
)
],
batch_size=batch_size,
log=log,
auth_args=auth_args,
query_overrides=query_overrides,
**query_kwargs,
)
return pd.DataFrame(
[
{field: r["key"]["value"], "doc_count": r["doc_count"]}
for r in data["results"]["buckets"]
]
)
@staticmethod
def execute_ga4gh(
query: dict, all_results: bool = False, auth_args: dict = Auth.shared()
) -> pd.DataFrame:
auth = Auth(auth_args)
client = BaseClient(auth.session())
path = query["path"]
http_verb = query.get("http_verb", "POST")
results_key = query["results_key"]
params = {
**{"datasetIds": [auth.project_id]},
**{
k: v for k, v in query.items() if k not in ["path", "http_verb"]
},
}
return recursive_execute_ga4gh(
auth=auth,
client=client,
path=path,
http_verb=http_verb,
results_key=results_key,
params=params,
scroll=all_results,
)
@staticmethod
def _recursive_execute_composite_aggregations(
table_name: str,
key_sources_pairs: List[Tuple[str, List[dict]]],
batch_size: int = 100,
progress: Union[tqdm, None] = None,
query_overrides: dict = {},
log: bool = False,
auth_args: Auth = Auth.shared(),
max_pages: Union[int, None] = None,
_current_page: int = 1,
_prev_results: dict = {},
_after_keys: dict = {},
**query_kwargs,
):
aggregation = Query.execute_fhir_dsl(
{
"type": "select",
"columns": [
{
"type": "elasticsearch",
"aggregations": {
key: {
"composite": {
"sources": sources,
"size": batch_size,
**(
{"after": _after_keys[key]}
if key in _after_keys
else {}
),
}
}
for key, sources in key_sources_pairs
if (len(_after_keys) == 0) or (key in _after_keys)
},
}
],
"from": [{"table": table_name}],
**query_overrides,
},
auth_args=auth_args,
log=log,
**query_kwargs,
)
current_results = aggregation.data
results = FhirAggregation.reduce_composite_results(
_prev_results, current_results
)
if (progress is not None) and (_current_page == 1) and max_pages:
progress.reset(max_pages)
if progress is not None:
# Update by count or pages (if max_pages specified)
progress.update(
1
if max_pages
else FhirAggregation.count_composite_results(current_results)
)
after_keys = FhirAggregation.find_composite_after_keys(
current_results, batch_size
)
if len(after_keys) == 0 or (
(max_pages is not None) and (_current_page >= max_pages)
):
print(
f"Retrieved {FhirAggregation.count_composite_results(results)} results"
)
return results
return Query._recursive_execute_composite_aggregations(
table_name=table_name,
key_sources_pairs=key_sources_pairs,
batch_size=batch_size,
progress=progress,
query_overrides=query_overrides,
log=log,
auth_args=auth_args,
max_pages=max_pages,
_current_page=_current_page + 1,
_prev_results=results,
_after_keys=after_keys,
**query_kwargs,
)
Sub-modules
phc.easy.query.api_paging
phc.easy.query.fhir_aggregation
phc.easy.query.fhir_dsl
phc.easy.query.fhir_dsl_query
phc.easy.query.ga4gh
phc.easy.query.url
phc.easy.query.util
Classes
class Query
-
Static methods
def execute_composite_aggregations(table_name: str, key_sources_pairs: List[Tuple[str, List[dict]]], batch_size: int = 100, query_overrides: dict = {}, log: bool = False, auth_args: Auth = <phc.easy.auth.Auth object>, max_pages: Optional[int] = None, **query_kwargs)
-
Count records by multiple fields
Attributes
table_name : str
    The FHIR Search Service table to retrieve from
key_sources_pairs : List[Tuple[str, List[dict]]]
    Pairs of keys and sources to pull composite results from
    Example input:
        [("meta.tag", [{"terms": {"field": "meta.tag.system.keyword"}}])]
batch_size : int
    The size of each page from elasticsearch to use
query_overrides : dict
    Parts of the FSS query to override
    (Note that passing certain values can cause the method to error out)
    Example aggregation query executed (can use log=True to inspect):
        {
            "type": "select",
            "columns": [{
                "type": "elasticsearch",
                "aggregations": {
                    "results": {
                        "composite": {
                            "sources": [{
                                "meta.tag": {
                                    "terms": {
                                        "field": "meta.tag.system.keyword"
                                    }
                                }
                            }],
                            "size": 100,
                        }
                    }
                },
            }],
            "from": [{"table": "observation"}],
        }
auth_args : Auth, dict
    Additional arguments for authentication
log : bool = False
    Whether to log the elasticsearch query sent to the server
max_pages : int
    The number of pages to retrieve (useful if working with tons of records)
query_kwargs : dict
    Arguments to pass to build_queries such as patient_id, patient_ids,
    and patient_key (see phc.easy.query.fhir_dsl_query.build_queries)
Examples
>>> import phc.easy as phc
>>> phc.Auth.set({ 'account': '<your-account-name>' })
>>> phc.Project.set_current('My Project Name')
>>> phc.Query.execute_composite_aggregations(
        table_name="observation",
        key_sources_pairs=[
            ("meta.tag", [
                {"code": {"terms": {"field": "meta.tag.code.keyword"}}},
            ]),
            ("code.coding", [
                {"display": {"terms": {"field": "code.coding.display.keyword"}}}
            ]),
        ]
    )
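The return value is a dict keyed by each pair's key, with each entry holding the composite aggregation buckets (this is how get_codes and get_count_by_field consume it). A minimal post-processing sketch, assuming the bucket shape used elsewhere in this module ({"key": {...}, "doc_count": N}):

    import pandas as pd

    results = phc.Query.execute_composite_aggregations(
        table_name="observation",
        key_sources_pairs=[
            ("meta.tag", [{"code": {"terms": {"field": "meta.tag.code.keyword"}}}]),
        ],
    )

    # Flatten each bucket into a row with "key.code" and "doc_count" columns
    frame = pd.json_normalize(results["meta.tag"]["buckets"])
    print(frame.sort_values("doc_count", ascending=False).head())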
def execute_fhir_dsl(query: dict, all_results: bool = False, auth_args: Auth = <phc.easy.auth.Auth object>, callback: Optional[Callable[[Any, bool], None]] = None, max_pages: Optional[int] = None, log: bool = False, **query_kwargs)
-
Execute a FHIR query with the DSL
See https://devcenter.docs.lifeomic.com/development/fhir-service/dsl
Attributes
query : dict
    The FHIR query to run (is a superset of elasticsearch)
all_results : bool
    Return all results by scrolling through multiple pages of data
    (Limit is ignored if provided)
auth_args : Auth, dict
    Additional arguments for authentication
callback : Callable[[Any, bool], None] (optional)
    A progress function that is invoked for each batch. When the second
    argument passed is true, then the result of the callback function is
    used as the return value. This is useful if writing results out to a
    file and then returning the completed result from that file. (See the
    sketch after the examples below.)
    Example:
        def handle_batch(batch, is_finished):
            print(len(batch))
            if is_finished:
                return "batch finished"
max_pages : int
    The number of pages to retrieve (useful if working with tons of records)
log : bool = False
    Whether to log the elasticsearch query sent to the server
query_kwargs : dict
    Arguments to pass to build_queries such as patient_id, patient_ids,
    and patient_key (see phc.easy.query.fhir_dsl_query.build_queries)
Examples
>>> import phc.easy as phc
>>> phc.Auth.set({ 'account': '<your-account-name>' })
>>> phc.Project.set_current('My Project Name')
>>> phc.Query.execute_fhir_dsl({
        "type": "select",
        "columns": "*",
        "from": [{"table": "patient"}],
    }, all_results=True)
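As noted for callback above, a batch handler can stream results to disk and return a final value once scrolling finishes. A minimal sketch, assuming each batch item is an Elasticsearch hit with a "_source" key (as the rest of this module assumes); the file name is illustrative:

    import json

    def write_batch(batch, is_finished):
        # Append each record to an NDJSON file (illustrative path)
        with open("patients.ndjson", "a") as f:
            for hit in batch:
                f.write(json.dumps(hit["_source"]) + "\n")

        if is_finished:
            # This value becomes the return value of execute_fhir_dsl
            return "patients.ndjson"

    phc.Query.execute_fhir_dsl(
        {"type": "select", "columns": "*", "from": [{"table": "patient"}]},
        all_results=True,
        callback=write_batch,
    )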
def execute_fhir_dsl_with_options(query: dict, transform: Callable[[pandas.core.frame.DataFrame], pandas.core.frame.DataFrame], all_results: bool, raw: bool, query_overrides: dict, auth_args: Auth, ignore_cache: bool, max_pages: Optional[int], log: bool = False, **query_kwargs)
-
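No docstring is provided for this method. From the source above: it merges query with query_overrides, builds the resulting queries, runs each through Query.execute_fhir_dsl (consulting the APICache when caching applies), concatenates the batches into a single DataFrame, and returns transform(frame), or the raw frame when raw=True. A minimal call sketch; all arguments shown are required, and the identity transform is purely illustrative:

    frame = phc.Query.execute_fhir_dsl_with_options(
        query={"type": "select", "columns": "*", "from": [{"table": "patient"}]},
        transform=lambda df: df,  # identity transform, for illustration only
        all_results=False,
        raw=False,
        query_overrides={},
        auth_args=phc.Auth.shared(),
        ignore_cache=True,
        max_pages=1,
    )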
def execute_ga4gh(query: dict, all_results: bool = False, auth_args: dict = <phc.easy.auth.Auth object>) ‑> pandas.core.frame.DataFrame
-
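No docstring here either. From the source above: the query dict must include "path" and "results_key", may include "http_verb" (defaulting to "POST"), and its remaining keys are sent as request params, with datasetIds defaulting to the current project id. A hedged sketch; the path and results_key below are illustrative placeholders, not a documented endpoint:

    frame = phc.Query.execute_ga4gh(
        {
            "path": "genomics/variants/search",  # illustrative path
            "results_key": "variants",           # illustrative results key
        },
        all_results=False,
    )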
def execute_paging_api(path: str, params: dict = {}, http_verb: str = 'GET', transform: Callable[[pandas.core.frame.DataFrame], pandas.core.frame.DataFrame] = <function identity>, all_results: bool = False, auth_args: Auth = <phc.easy.auth.Auth object>, max_pages: Optional[int] = None, page_size: Optional[int] = None, log: bool = False, raw: bool = False, ignore_cache: bool = False, show_progress: bool = True, progress: Optional[tqdm] = None, item_key: str = 'items', try_count: bool = True, response_to_items: Optional[Callable[[Union[list, dict]], list]] = None)
-
Execute an API query that pages through results
Attributes
path : str
    The API path to hit
    (Special tokens: {project_id})
params : dict
    The parameters to include with the request
http_verb : str
    The HTTP method to use
all_results : bool = False
    Whether to retrieve the entire set of records or just a sample (25)
auth_args : Auth, dict
    Additional arguments for authentication
max_pages : int
    The number of pages to retrieve (useful if working with tons of records)
page_size : int
    The number of records to fetch per page
log : bool = False
    Whether to log some diagnostic statements for debugging
progress : Optional[tqdm] = None
    Override the given progress indicator
item_key : str
    The key to find the results underneath (usually "items" but not always)
try_count : bool
    Whether to try and send a "count" param to update the progress bar
response_to_items : Callable
    Custom function to transform response data to a list of items
    (Overrides item_key when present)
Examples
>>> import phc.easy as phc
>>> phc.Auth.set({ 'account': '<your-account-name>' })
>>> phc.Project.set_current('My Project Name')
>>> phc.Query.execute_paging_api(
        "genomics/projects/{project_id}/tests",
        params={"patientId": "<patient-uuid>"}
    )
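Because the paged results are returned as a DataFrame, a transform can reshape them in a single pass. A sketch against the same endpoint as above; the column selection is illustrative:

    frame = phc.Query.execute_paging_api(
        "genomics/projects/{project_id}/tests",
        all_results=True,
        # Illustrative transform: keep an assumed "id" column when present
        transform=lambda df: df[["id"]] if "id" in df.columns else df,
    )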
def find_count_of_dsl_query(query: dict, auth_args: Auth = <phc.easy.auth.Auth object>)
-
Find the count of a given DSL query
See https://devcenter.docs.lifeomic.com/development/fhir-service/dsl
Attributes
query : dict
    The FHIR query to run a count against
auth_args : Auth, dict
    Additional arguments for authentication
Examples
>>> import phc.easy as phc
>>> phc.Auth.set({ 'account': '<your-account-name>' })
>>> phc.Project.set_current('My Project Name')
>>> phc.Query.find_count_of_dsl_query({
        "type": "select",
        "columns": "*",
        "from": [{"table": "patient"}],
    })
def get_codes(table_name: str, code_fields: List[str], display_query: Optional[str] = None, sample_size: Optional[int] = None, **kwargs)
-
Find FHIR codes with a display for a given table
Attributes
table_name : str
    The FHIR Search Service table to retrieve from
code_fields : List[str]
    The fields of this table that contain a system, code, and display
display_query : Optional[str]
    Part of the code's display to match (will try to extract full code
    if passed)
sample_size : Optional[int]
    Override the search size for finding codes (may miss codes on later
    records)
kwargs : dict
    Arguments to pass to Query.execute_composite_aggregations()
Examples
>>> import phc.easy as phc
>>> phc.Auth.set({ 'account': '<your-account-name>' })
>>> phc.Project.set_current('My Project Name')
>>> phc.Query.get_codes(
        table_name="observation",
        code_fields=["meta.tag", "code.coding"],
        patient_id="<my-patient-id>"
    )
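When display_query is passed, the aggregation is restricted to codes whose display text matches, and the full system/code/display attributes are then pulled for those matches. A sketch; the search term is illustrative:

    codes = phc.Query.get_codes(
        table_name="observation",
        code_fields=["code.coding"],
        display_query="hemoglobin",  # matched against each field's ".display"
    )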
def get_count_by_field(table_name: str, field: str, batch_size: int = 1000, query_overrides: dict = {}, log: bool = False, auth_args: Auth = <phc.easy.auth.Auth object>, **query_kwargs)
-
Count records by a given field
Attributes
table_name : str
    The FHIR Search Service table to retrieve from
field : str
    The field name to count the values of (e.g. "subject.reference")
batch_size : int
    The size of each page from elasticsearch to use
query_overrides : dict
    Parts of the FSS query to override
    (Note that passing certain values can cause the method to error out)
    The aggregation query is similar to this:
        {
            "type": "select",
            "columns": [{
                "type": "elasticsearch",
                "aggregations": {
                    "results": {
                        "composite": {
                            "sources": [{
                                "value": {
                                    "terms": {
                                        "field": "gender.keyword"
                                    }
                                }
                            }],
                            "size": 100,
                        }
                    }
                },
            }],
            "from": [{"table": "patient"}],
        }
auth_args : Auth, dict
    Additional arguments for authentication
log : bool = False
    Whether to log the elasticsearch query sent to the server
query_kwargs : dict
    Arguments to pass to build_queries such as patient_id, patient_ids,
    and patient_key (see phc.easy.query.fhir_dsl_query.build_queries)
Examples
>>> import phc.easy as phc
>>> phc.Auth.set({ 'account': '<your-account-name>' })
>>> phc.Project.set_current('My Project Name')
>>> phc.Query.get_count_by_field(
        table_name="patient",
        field="gender"
    )
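The returned DataFrame has one row per distinct value: a column named after field plus a doc_count column (see the list comprehension at the end of the source above). For example, with illustrative values:

    counts = phc.Query.get_count_by_field(table_name="patient", field="gender")
    #    gender  doc_count
    # 0  female       1234
    # 1    male       1189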