Module phc.easy.ocr.suggestion
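Helpers for fetching OCR suggestion records for a document reference and flattening them into a pandas data frame.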
Source code
from typing import Any
import pandas as pd
from toolz import partial, pipe
from phc.base_client import BaseClient
from phc.easy.auth import Auth
from phc.easy.document_reference import DocumentReference
from phc.easy.query import Query
SUGGESTION_TYPES = [
"observation",
"condition",
"procedure",
"medicationAdministration",
]
COMPLEX_COLUMNS = [
"suggestions_comprehendResults",
"comprehendResults",
"wordIds",
]
class Suggestion(DocumentReference):
@classmethod
def get_data_frame(
cls,
document_id: str,
all_results=False,
raw: bool = False,
drop_complex_columns: bool = True,
auth_args: Auth = Auth.shared(),
**kw_args,
):
auth = Auth(auth_args)
        # Page through the suggestion records from the OCR API
        results = Query.execute_paging_api(
f"ocr/fhir/projects/{auth.project_id}/documentReferences/{document_id}/suggestions",
{},
auth_args=auth_args,
item_key="records",
all_results=all_results,
try_count=False,
**{"ignore_cache": True, **kw_args},
)
if raw:
return results
results = expand_suggestion_df(results)
complex_columns = [c for c in COMPLEX_COLUMNS if c in results.columns]
if drop_complex_columns and len(complex_columns) > 0:
return results.drop(complex_columns, axis=1)
return results
def frame_for_type(df: pd.DataFrame, type: str):
    # Keep the shared columns plus this type's column, then flatten that
    # column's JSON into "<type>_*" columns
    return expand_json_and_merge(
        df[[c for c in df.columns if c == type or c not in SUGGESTION_TYPES]],
type,
)
def expand_suggestion_df(frame: pd.DataFrame):
    # Explode the "suggestions" array, then flatten each suggestion type
    # and stack the per-type frames back together
    return (
        expand_array_column(frame, key="suggestions")
.pipe(
lambda df: pd.concat(
[
expand_observations(frame_for_type(df, "observation")),
expand_conditions(frame_for_type(df, "condition")),
expand_procedures(frame_for_type(df, "procedure")),
expand_medication_administrations(
frame_for_type(df, "medicationAdministration")
),
]
)
)
.reset_index(drop=True)
)
def expand_medication_administrations(frame: pd.DataFrame):
return pipe(
frame.rename(
columns={
"medicationAdministration_medicationCode": "medication_code"
}
),
partial(
expand_generic, two_level_column="medicationAdministration_date"
),
partial(
expand_generic, two_level_column="medicationAdministration_endDate"
),
partial(
expand_generic, two_level_column="medicationAdministration_status"
),
partial(
expand_generic, two_level_column="medicationAdministration_dosage"
),
partial(expand_json_and_merge, key="dosage_value"),
partial(
expand_generic,
two_level_column="medication_code",
nested_json_columns=["dataSource", "value"],
),
preview_source_text_columns,
).pipe(lambda df: df.assign(type="medicationAdministration"))
def expand_procedures(frame: pd.DataFrame):
return pipe(
frame.rename(columns={"procedure_procedureCode": "procedure_code"}),
partial(expand_generic, two_level_column="procedure_date"),
partial(expand_generic, two_level_column="procedure_endDate"),
partial(expand_generic, two_level_column="procedure_value"),
partial(
expand_generic,
two_level_column="procedure_code",
nested_json_columns=["dataSource", "value"],
),
partial(
expand_generic,
two_level_column="procedure_bodySite",
expand_array=expand_nested_array_column,
),
partial(expand_json_and_merge, key="bodySite_value"),
preview_source_text_columns,
).pipe(lambda df: df.assign(type="procedure"))
def expand_observations(frame: pd.DataFrame):
return pipe(
frame.rename(
columns={"observation_observationCode": "observation_code"}
),
partial(expand_generic, two_level_column="observation_date"),
partial(expand_generic, two_level_column="observation_value"),
partial(
expand_generic,
two_level_column="observation_code",
nested_json_columns=["dataSource", "value"],
),
preview_source_text_columns,
).pipe(lambda df: df.assign(type="observation"))
def expand_conditions(frame: pd.DataFrame):
return pipe(
frame.rename(columns={"condition_conditionCode": "condition_code"}),
partial(
expand_generic,
two_level_column="condition_code",
nested_json_columns=["dataSource", "value"],
),
partial(expand_generic, two_level_column="condition_onsetDate"),
partial(expand_generic, two_level_column="condition_abatementDate"),
partial(
expand_generic,
two_level_column="condition_bodySite",
expand_array=expand_nested_array_column,
),
partial(expand_json_and_merge, key="bodySite_value"),
preview_source_text_columns,
).pipe(lambda df: df.assign(type="condition"))
def preview_source_text_columns(frame: pd.DataFrame):
columns = [c for c in frame.columns if c.endswith("sourceText")]
if len(columns) == 0:
return frame
    def get_text(value: Any):
if isinstance(value, dict):
return value["text"]
elif isinstance(value, list):
return " ".join([w["word"] for w in value])
else:
return None
return pd.concat(
[
frame.drop(columns, axis=1),
*[frame[c].apply(get_text) for c in columns],
],
axis=1,
)
def expand_nested_array_column(df: pd.DataFrame, key: str, lprefix=""):
    # Like expand_array_column, but each cell holds an array of arrays;
    # "_item" records which inner array a row came from
    if key not in df.columns:
return df
main = df.drop([key], axis=1)
expanded = pd.concat(
df.apply(
lambda x: (
pd.concat(
[
pd.DataFrame(
[{"index": x.name, "_item": i, **v} for v in array]
)
for i, array in enumerate(x[key])
]
)
# pd.concat does not like an empty array so we avoid that situation
if x[key] != []
else pd.DataFrame()
),
axis=1,
).values
).add_prefix(lprefix)
if len(expanded) == 0:
return main
return main.join(expanded.set_index(lprefix + "index")).reset_index(
drop=True
)
def expand_array_column(df: pd.DataFrame, key: str, lprefix=""):
    # Produce one output row per array element, joined back to the rest
    # of the frame's columns
    if key not in df.columns:
return df
main = df.drop([key], axis=1)
expanded = pd.concat(
df.apply(
lambda x: pd.DataFrame([{"index": x.name, **s} for s in x[key]]),
axis=1,
).values
).add_prefix(lprefix)
if len(expanded) == 0:
return main
return main.join(
expanded.rename(
columns={"comprehendResults": f"{key}_comprehendResults"}
).set_index(lprefix + "index")
).reset_index(drop=True)
def expand_generic(
frame: pd.DataFrame,
two_level_column: str,
use_prefix=False,
nested_json_columns=["dataSource"],
expand_array=expand_array_column,
):
    # e.g. "observation_date" -> prefix "observation", column "date"
    prefix, column = two_level_column.split("_")
    prefix = prefix + "_" if use_prefix else ""
if two_level_column not in frame.columns:
return frame
return pipe(
frame,
partial(
expand_array, key=two_level_column, lprefix=prefix + column + "_"
),
*[
partial(expand_json_and_merge, key=prefix + column + "_" + c)
for c in nested_json_columns
],
)
def expand_json_and_merge(df: pd.DataFrame, key: str):
if key not in df.columns:
return df
    # Treat missing values as empty dicts so json_normalize succeeds
    series = df[key].fillna(df[key].apply(lambda _: dict()))
main = df.drop([key], axis=1)
expanded = pd.json_normalize(series)
if list(expanded.columns) == [key]:
return main
return pd.concat([main, expanded.add_prefix(f"{key}_")], axis=1)
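A minimal usage sketch (assumes Auth.shared() is already configured with an account and project; "doc-123" is a hypothetical document reference ID):

from phc.easy.ocr.suggestion import Suggestion

# One flat row per suggestion; pass raw=True for the unexpanded API records
frame = Suggestion.get_data_frame(document_id="doc-123")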
Functions
def expand_array_column(df: pandas.core.frame.DataFrame, key: str, lprefix='')
-
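Expands a column of record arrays into one row per record: each record's keys become columns prefixed with lprefix and joined back to the remaining columns. An expanded comprehendResults column is renamed to "<key>_comprehendResults" to avoid a column clash. Returns df unchanged if key is absent.

A minimal sketch with hypothetical data:

import pandas as pd
from phc.easy.ocr.suggestion import expand_array_column

df = pd.DataFrame({
    "id": ["a", "b"],
    "values": [[{"x": 1}, {"x": 2}], [{"x": 3}]],
})
expand_array_column(df, key="values")
#   id  x
# 0  a  1
# 1  a  2
# 2  b  3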
def expand_conditions(frame: pandas.core.frame.DataFrame)
-
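Flattens condition suggestions: renames condition_conditionCode to condition_code, expands the code, onset and abatement dates, and body sites, previews any sourceText columns, and tags each row with type="condition".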
def expand_generic(frame: pandas.core.frame.DataFrame, two_level_column: str, use_prefix=False, nested_json_columns=['dataSource'], expand_array=<function expand_array_column>)
-
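Expands a "<prefix>_<column>" array column by exploding its records (with expand_array, expand_array_column by default) and then normalizing the listed nested JSON columns (dataSource by default). No-op if the column is absent.

A minimal sketch with hypothetical data:

import pandas as pd
from phc.easy.ocr.suggestion import expand_generic

df = pd.DataFrame({
    "observation_date": [
        [{"value": "2021-01-01", "dataSource": {"type": "ocr"}}]
    ],
})
# Resulting columns: date_value, date_dataSource_type
expand_generic(df, two_level_column="observation_date")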
def expand_json_and_merge(df: pandas.core.frame.DataFrame, key: str)
-
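Normalizes a column of dicts into flat columns prefixed with "<key>_" and concatenates them back onto the frame, treating missing values as empty dicts. No-op if the column is absent.

A minimal sketch with hypothetical data:

import pandas as pd
from phc.easy.ocr.suggestion import expand_json_and_merge

df = pd.DataFrame({
    "id": [1, 2],
    "code": [{"system": "http://loinc.org", "code": "1234-5"}, None],
})
# Resulting columns: id, code_system, code_code (NaN where "code" was None)
expand_json_and_merge(df, key="code")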
def expand_medication_administrations(frame: pandas.core.frame.DataFrame)
-
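Flattens medicationAdministration suggestions: renames medicationAdministration_medicationCode to medication_code, expands the dates, status, dosage, and medication code, previews any sourceText columns, and tags each row with type="medicationAdministration".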
def expand_nested_array_column(df: pandas.core.frame.DataFrame, key: str, lprefix='')
-
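Like expand_array_column, but for a column whose cells hold arrays of arrays: every inner record becomes a row, with an extra "_item" column recording which inner array it came from.

A minimal sketch with hypothetical data:

import pandas as pd
from phc.easy.ocr.suggestion import expand_nested_array_column

df = pd.DataFrame({
    "id": ["a"],
    "sites": [[[{"site": "arm"}], [{"site": "leg"}]]],
})
# Two rows; columns: id, bodySite__item, bodySite_site
expand_nested_array_column(df, key="sites", lprefix="bodySite_")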
def expand_observations(frame: pandas.core.frame.DataFrame)
-
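Flattens observation suggestions: renames observation_observationCode to observation_code, expands the date, value, and code, previews any sourceText columns, and tags each row with type="observation".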
def expand_procedures(frame: pandas.core.frame.DataFrame)
-
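Flattens procedure suggestions: renames procedure_procedureCode to procedure_code, expands the dates, value, code, and body sites, previews any sourceText columns, and tags each row with type="procedure".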
def expand_suggestion_df(frame: pandas.core.frame.DataFrame)
-
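Explodes the raw "suggestions" column and flattens each suggestion type (observation, condition, procedure, medicationAdministration) into one combined frame, with a type column identifying the source of each row.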
def frame_for_type(df: pandas.core.frame.DataFrame, type: str)
-
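Selects the shared columns plus the column for one suggestion type and expands that type's JSON into "<type>_*" columns.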
def preview_source_text_columns(frame: pandas.core.frame.DataFrame)
-
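Replaces every column ending in "sourceText" with a plain-text preview: a dict becomes its "text" value, a list of word records becomes the joined "word" values, and anything else becomes None.

A minimal sketch with hypothetical data:

import pandas as pd
from phc.easy.ocr.suggestion import preview_source_text_columns

df = pd.DataFrame({
    "value_sourceText": [
        {"text": "120/80"},
        [{"word": "blood"}, {"word": "pressure"}],
    ],
})
# value_sourceText becomes: "120/80", "blood pressure"
preview_source_text_columns(df)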
Classes
class Suggestion
-
Provides an abstract class and/or static methods for retrieving items from an FSS table that relates to a patient
Ancestors
- DocumentReference
Static methods
def code_fields()
-
Inherited from:
DocumentReference
.code_fields
Returns the code keys (e.g. when searching for codes)
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)
-
Inherited from:
DocumentReference
.get_codes
Find all codes …
def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)
-
Inherited from:
DocumentReference
.get_count
Get the count for a given FSS query
def get_count_by_field(field: str, **kwargs)
-
Inherited from:
DocumentReference
.get_count_by_field
Count records by a given field …
def get_count_by_patient(**kwargs)
-
Inherited from:
DocumentReference
.get_count_by_patient
Count records by a given field …
def get_data_frame(document_id: str, all_results=False, raw: bool = False, drop_complex_columns: bool = True, auth_args: Auth = <phc.easy.auth.Auth object>, **kw_args)
-
Inherited from:
DocumentReference
.get_data_frame
Retrieve records …
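Queries the OCR suggestions endpoint for a single document reference. With raw=True the records are returned exactly as paged back from the API; otherwise they are expanded via expand_suggestion_df, and drop_complex_columns=True (the default) drops the nested columns listed in COMPLEX_COLUMNS.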
def table_name()
-
Inherited from:
DocumentReference
.table_name
Returns the FSS table name for retrieval
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)
-
Inherited from:
DocumentReference
.transform_results
Transform data frame batch