Module phc.easy.query.fhir_dsl_query
Expand source code
from functools import partial, reduce
from typing import Callable, List, Optional, Union
from funcy import chunks
from lenses import lens
from phc.easy.query.util import flat_map_pipe
from phc.easy.util import add_prefixes
from toolz import compose, curry, identity, pipe
DEFAULT_MAX_TERMS = 30_000
MAX_RESULT_SIZE = 10000
DEFAULT_SCROLL_SIZE = int(MAX_RESULT_SIZE * 0.9)
FHIR_WHERE = lens.Get("where", {})
FHIR_WHERE_TYPE = FHIR_WHERE.Get("type", "")
FHIR_SIMPLE_QUERY = FHIR_WHERE.Get("query", {})
FHIR_BOOL_QUERY = FHIR_SIMPLE_QUERY.Get("bool", {})
FHIR_BOOL_MUST_QUERY = FHIR_BOOL_QUERY.Get("must", [])
FHIR_LIMIT = lens.Get(
"limit",
[
{"type": "number", "value": 0},
{"type": "number", "value": DEFAULT_SCROLL_SIZE},
],
)[1]["value"]
def get_limit(query: dict):
return lens.Get("limit", [{}, {}])[1].Get("value", None).get()(query)
def update_limit(query: dict, update: Callable[[int], int]):
return FHIR_LIMIT.modify(compose(int, update))(query)
@curry
def and_query_clause_terms(second_query_clause, first_query_clause):
return {"bool": {"must": [first_query_clause, second_query_clause]}}
def and_query_clause(query: dict, query_clause: dict):
"Append a term/terms clause to an existing FSS query"
if FHIR_WHERE.get()(query) == {}:
return pipe(
query,
FHIR_SIMPLE_QUERY.set(query_clause),
FHIR_WHERE_TYPE.set("elasticsearch"),
)
if FHIR_WHERE_TYPE.get()(query) != "elasticsearch":
raise ValueError(
"Could not add clause to query that is not elasticsearch",
query_clause,
query,
)
query_keys = list(FHIR_SIMPLE_QUERY.get()(query).keys())
bool_keys = FHIR_BOOL_QUERY.get()(query).keys()
if (len(query_keys) == 1 and (query_keys[0] in ["term", "terms"])) or (
"should" in bool_keys
):
return FHIR_SIMPLE_QUERY.modify(and_query_clause_terms(query_clause))(
query
)
if len(bool_keys) == 1 and "must" in bool_keys:
return FHIR_BOOL_MUST_QUERY.modify(lambda must: [*must, query_clause])(
query
)
raise ValueError("Could not add clause to query", query_clause, query)
def _ids_adder(
id: Union[str, None] = None,
ids: List[str] = [],
max_terms: int = DEFAULT_MAX_TERMS,
):
ids = [*ids, *([id] if id else [])]
if len(ids) == 0:
return identity
return terms_adder({"id.keyword": ids}, max_terms=max_terms)
def foreign_ids_adder(
foreign_id: Optional[str],
foreign_ids: List[str],
foreign_key: str,
foreign_id_prefixes: List[str],
max_terms: int = DEFAULT_MAX_TERMS,
):
foreign_ids = [*foreign_ids, *([foreign_id] if foreign_id else [])]
if len(foreign_ids) == 0:
return identity
return terms_adder(
{
f"{foreign_key}.keyword": [
*add_prefixes(foreign_ids, foreign_id_prefixes),
*foreign_ids,
]
},
max_terms=max_terms,
)
def _term_or_terms_adder(
term: Optional[dict], terms: List[dict], max_terms: int = DEFAULT_MAX_TERMS
):
if term is None and len(terms) == 0:
return identity
terms = [term, *terms] if term is not None else terms
def _adder(query):
return flat_map_pipe(
query,
*[
(
terms_adder(t, max_terms=max_terms)
if isinstance(list(t.values())[0], list)
else term_adder(t)
)
for t in terms
],
)
return _adder
def term_adder(term: Optional[dict]):
if term is None:
return identity
if len(term.keys()) > 1:
raise ValueError(
f"Multiple keys unexpected for term dictionary for fhir-search-service. {term}"
)
return partial(and_query_clause, query_clause={"term": term})
def terms_adder(terms: Optional[dict], max_terms: int = DEFAULT_MAX_TERMS):
if terms is None:
return identity
if len(terms.keys()) > 1:
raise ValueError(
f"Multiple keys unexpected for terms dictionary for fhir-search-service. {terms}"
)
key = list(terms.keys())[0]
# NOTE: Must convert chunks from generator so that function can be run multiple times
value_batches = list(chunks(max_terms, list(terms.values())[0]))
def _adder(query):
return [
and_query_clause(query, {"terms": {key: value_batch}})
for value_batch in value_batches
]
return _adder
def _code_adder(
attribute: Union[str],
code_fields: List[str],
value: Optional[Union[str, List[str]]],
):
if len(code_fields) == 0 or value is None:
return identity
term_or_terms = "term" if isinstance(value, str) else "terms"
return partial(
and_query_clause,
query_clause={
"bool": {
"should": [
{term_or_terms: {f"{key}.{attribute}.keyword": value}}
for key in code_fields
]
}
},
)
def _limit_adder(page_size: Union[int, None]):
if page_size is None:
return identity
return FHIR_LIMIT.set(page_size)
def build_queries(
query: dict,
id: Optional[str] = None,
ids: List[str] = [],
patient_id: Optional[str] = None,
patient_ids: List[str] = [],
patient_key: str = "subject.reference",
patient_id_prefixes: List[str] = ["Patient/"],
page_size: Optional[int] = None,
term: Optional[dict] = None,
terms: List[dict] = [],
max_terms: int = DEFAULT_MAX_TERMS,
# Codes
code_fields: List[str] = [],
code: Optional[Union[str, List[str]]] = None,
display: Optional[Union[str, List[str]]] = None,
system: Optional[Union[str, List[str]]] = None,
):
"""Build query with various options
Attributes
----------
query : dict
The base FSS query
id : str
Adds where clause for a single id (will be merged with
ids if both supplied)
ids : List[str]
Adds where clause for multiple ids
patient_id : str
Adds where clause for a single patient (will be merged with
patient_ids if both supplied)
patient_ids : List[str]
Adds where clause for multiple patients
patient_key : str
The column that associates this table's records to a patient
patient_id_prefixes : str
Adds a prefix to patient_id values (e.g.
"Patient/0a20d90f-c73c-4149-953d-7614ce7867f" as well as
"0a20d90f-c73c-4149-953d-7614ce7867f")
term : dict
Add an arbitrary ES term/s to the query (includes chunking)
terms : dict
Add multiple arbitrary ES term/s to the query (includes chunking)
page_size: int
The number of records to fetch per page
code_fields : List[str]
A list of paths to find FHIR codes in
code : str | List[str]
Adds where clause for code value(s)
display : str | List[str]
Adds where clause for code display value(s)
system : str | List[str]
Adds where clause for code system value(s)
"""
return flat_map_pipe(
query,
_ids_adder(id=id, ids=ids, max_terms=max_terms),
foreign_ids_adder(
foreign_id=patient_id,
foreign_ids=patient_ids,
foreign_key=patient_key,
foreign_id_prefixes=patient_id_prefixes,
max_terms=max_terms,
),
_term_or_terms_adder(term=term, terms=terms, max_terms=max_terms),
_code_adder(attribute="code", code_fields=code_fields, value=code),
_code_adder(
attribute="display", code_fields=code_fields, value=display
),
_code_adder(attribute="system", code_fields=code_fields, value=system),
_limit_adder(page_size),
)
Functions
def and_query_clause(query: dict, query_clause: dict)
-
Append a term/terms clause to an existing FSS query
Expand source code
def and_query_clause(query: dict, query_clause: dict): "Append a term/terms clause to an existing FSS query" if FHIR_WHERE.get()(query) == {}: return pipe( query, FHIR_SIMPLE_QUERY.set(query_clause), FHIR_WHERE_TYPE.set("elasticsearch"), ) if FHIR_WHERE_TYPE.get()(query) != "elasticsearch": raise ValueError( "Could not add clause to query that is not elasticsearch", query_clause, query, ) query_keys = list(FHIR_SIMPLE_QUERY.get()(query).keys()) bool_keys = FHIR_BOOL_QUERY.get()(query).keys() if (len(query_keys) == 1 and (query_keys[0] in ["term", "terms"])) or ( "should" in bool_keys ): return FHIR_SIMPLE_QUERY.modify(and_query_clause_terms(query_clause))( query ) if len(bool_keys) == 1 and "must" in bool_keys: return FHIR_BOOL_MUST_QUERY.modify(lambda must: [*must, query_clause])( query ) raise ValueError("Could not add clause to query", query_clause, query)
def and_query_clause_terms(second_query_clause='__no__default__', first_query_clause='__no__default__')
def build_queries(query: dict, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], patient_key: str = 'subject.reference', patient_id_prefixes: List[str] = ['Patient/'], page_size: Optional[int] = None, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code_fields: List[str] = [], code: Union[str, List[str], ForwardRef(None)] = None, display: Union[str, List[str], ForwardRef(None)] = None, system: Union[str, List[str], ForwardRef(None)] = None)
-
Build query with various options
Attributes
query
:dict
- The base FSS query
id
:str
- Adds where clause for a single id (will be merged with ids if both supplied)
ids
:List[str]
- Adds where clause for multiple ids
patient_id
:str
- Adds where clause for a single patient (will be merged with patient_ids if both supplied)
patient_ids
:List[str]
- Adds where clause for multiple patients
patient_key
:str
- The column that associates this table's records to a patient
patient_id_prefixes
:str
- Adds a prefix to patient_id values (e.g. "Patient/0a20d90f-c73c-4149-953d-7614ce7867f" as well as "0a20d90f-c73c-4149-953d-7614ce7867f")
term
:dict
- Add an arbitrary ES term/s to the query (includes chunking)
terms
:dict
- Add multiple arbitrary ES term/s to the query (includes chunking)
page_size
:int
- The number of records to fetch per page
code_fields
:List[str]
- A list of paths to find FHIR codes in
code
:str | List[str]
- Adds where clause for code value(s)
display
:str | List[str]
- Adds where clause for code display value(s)
system
:str | List[str]
- Adds where clause for code system value(s)
Expand source code
def build_queries( query: dict, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], patient_key: str = "subject.reference", patient_id_prefixes: List[str] = ["Patient/"], page_size: Optional[int] = None, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = DEFAULT_MAX_TERMS, # Codes code_fields: List[str] = [], code: Optional[Union[str, List[str]]] = None, display: Optional[Union[str, List[str]]] = None, system: Optional[Union[str, List[str]]] = None, ): """Build query with various options Attributes ---------- query : dict The base FSS query id : str Adds where clause for a single id (will be merged with ids if both supplied) ids : List[str] Adds where clause for multiple ids patient_id : str Adds where clause for a single patient (will be merged with patient_ids if both supplied) patient_ids : List[str] Adds where clause for multiple patients patient_key : str The column that associates this table's records to a patient patient_id_prefixes : str Adds a prefix to patient_id values (e.g. "Patient/0a20d90f-c73c-4149-953d-7614ce7867f" as well as "0a20d90f-c73c-4149-953d-7614ce7867f") term : dict Add an arbitrary ES term/s to the query (includes chunking) terms : dict Add multiple arbitrary ES term/s to the query (includes chunking) page_size: int The number of records to fetch per page code_fields : List[str] A list of paths to find FHIR codes in code : str | List[str] Adds where clause for code value(s) display : str | List[str] Adds where clause for code display value(s) system : str | List[str] Adds where clause for code system value(s) """ return flat_map_pipe( query, _ids_adder(id=id, ids=ids, max_terms=max_terms), foreign_ids_adder( foreign_id=patient_id, foreign_ids=patient_ids, foreign_key=patient_key, foreign_id_prefixes=patient_id_prefixes, max_terms=max_terms, ), _term_or_terms_adder(term=term, terms=terms, max_terms=max_terms), _code_adder(attribute="code", code_fields=code_fields, value=code), _code_adder( attribute="display", code_fields=code_fields, value=display ), _code_adder(attribute="system", code_fields=code_fields, value=system), _limit_adder(page_size), )
def foreign_ids_adder(foreign_id: Optional[str], foreign_ids: List[str], foreign_key: str, foreign_id_prefixes: List[str], max_terms: int = 30000)
-
Expand source code
def foreign_ids_adder( foreign_id: Optional[str], foreign_ids: List[str], foreign_key: str, foreign_id_prefixes: List[str], max_terms: int = DEFAULT_MAX_TERMS, ): foreign_ids = [*foreign_ids, *([foreign_id] if foreign_id else [])] if len(foreign_ids) == 0: return identity return terms_adder( { f"{foreign_key}.keyword": [ *add_prefixes(foreign_ids, foreign_id_prefixes), *foreign_ids, ] }, max_terms=max_terms, )
def get_limit(query: dict)
-
Expand source code
def get_limit(query: dict): return lens.Get("limit", [{}, {}])[1].Get("value", None).get()(query)
def term_adder(term: Optional[dict])
-
Expand source code
def term_adder(term: Optional[dict]): if term is None: return identity if len(term.keys()) > 1: raise ValueError( f"Multiple keys unexpected for term dictionary for fhir-search-service. {term}" ) return partial(and_query_clause, query_clause={"term": term})
def terms_adder(terms: Optional[dict], max_terms: int = 30000)
-
Expand source code
def terms_adder(terms: Optional[dict], max_terms: int = DEFAULT_MAX_TERMS): if terms is None: return identity if len(terms.keys()) > 1: raise ValueError( f"Multiple keys unexpected for terms dictionary for fhir-search-service. {terms}" ) key = list(terms.keys())[0] # NOTE: Must convert chunks from generator so that function can be run multiple times value_batches = list(chunks(max_terms, list(terms.values())[0])) def _adder(query): return [ and_query_clause(query, {"terms": {key: value_batch}}) for value_batch in value_batches ] return _adder
def update_limit(query: dict, update: Callable[[int], int])
-
Expand source code
def update_limit(query: dict, update: Callable[[int], int]): return FHIR_LIMIT.modify(compose(int, update))(query)