Module phc.easy.util.api_cache
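Utilities for caching API query results on local disk: tabular results are written to CSV and FHIR aggregation results to JSON under ~/Downloads/phc/api-cache, keyed by a descriptive filename that embeds a short hash of the query.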
Expand source code
import hashlib
import json
import re
import os
from pathlib import Path
from typing import Callable, Optional
import numpy as np
import pandas as pd
from phc.easy.query.fhir_aggregation import FhirAggregation
from phc.util.csv_writer import CSVWriter
TABLE_REGEX = r"^[^F]+FROM (\w+)"
DIR = "~/Downloads/phc/api-cache"
DATE_FORMAT_REGEX = (
    r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d{3})?([-+]\d{4}|Z)"
)
FHIR_DSL = "fhir_dsl"
class APICache:
    @staticmethod
    def filename_for_query(query: dict, namespace: Optional[str] = None):
        "Descriptive filename with hash of query for easy retrieval"
        is_aggregation = FhirAggregation.is_aggregation_query(query)
        agg_description = "agg" if is_aggregation else ""
        column_description = (
            f"{len(query.get('columns', []))}col"
            if not is_aggregation and isinstance(query.get("columns"), list)
            else ""
        )
        where_description = "where" if query.get("where") else ""
        unique_hash = hashlib.sha256(
            json.dumps(query).encode("utf-8")
        ).hexdigest()[0:8]
        path_name = [
            # Exclude UUIDs but not paths with dashes
            c.replace("-", "_")
            for c in query.get("path", "").split("/")
            if "-" not in c or len(c) != 36
        ]
        components = [
            namespace or "",
            *path_name,
            *[d.get("table", "") for d in query.get("from", [])],
            agg_description,
            column_description,
            where_description,
            unique_hash,
        ]
        extension = "json" if is_aggregation else "csv"
        return "_".join([c for c in components if len(c) > 0]) + "." + extension

    @staticmethod
    def does_cache_for_query_exist(
        query: dict, namespace: Optional[str] = None
    ) -> bool:
        return (
            Path(DIR)
            .expanduser()
            .joinpath(APICache.filename_for_query(query, namespace))
            .exists()
        )

    @staticmethod
    def load_cache_for_query(
        query: dict, namespace: Optional[str] = None
    ) -> pd.DataFrame:
        filename = str(
            Path(DIR)
            .expanduser()
            .joinpath(APICache.filename_for_query(query, namespace))
        )
        print(f'[CACHE] Loading from "{filename}"')
        if FhirAggregation.is_aggregation_query(query):
            with open(filename, "r") as f:
                return FhirAggregation(json.load(f))
        return APICache.read_csv(filename)

    @staticmethod
    def build_cache_callback(
        query: dict,
        transform: Callable[[pd.DataFrame], pd.DataFrame],
        nested_key: Optional[str] = "_source",
        namespace: Optional[str] = None,
    ):
        "Build a CSV callback (not used for aggregations)"
        folder = Path(DIR).expanduser()
        folder.mkdir(parents=True, exist_ok=True)
        filename = str(
            folder.joinpath(APICache.filename_for_query(query, namespace))
        )
        writer = CSVWriter(filename)

        def handle_batch(batch, is_finished):
            batch = (
                batch
                if nested_key is None
                else map(lambda r: r[nested_key], batch)
            )
            df = pd.DataFrame(batch)
            if len(df) != 0:
                writer.write(transform(df))
            if is_finished and not os.path.exists(filename):
                return pd.DataFrame()
            if is_finished:
                print(f'Loading data frame from "{filename}"')
                return APICache.read_csv(filename)

        return handle_batch

    @staticmethod
    def write_agg(
        query: dict, agg: FhirAggregation, namespace: Optional[str] = None
    ):
        folder = Path(DIR).expanduser()
        folder.mkdir(parents=True, exist_ok=True)
        filename = str(
            folder.joinpath(APICache.filename_for_query(query, namespace))
        )
        print(f'Writing aggregation to "{filename}"')
        with open(filename, "w") as file:
            json.dump(agg.data, file, indent=2)

    @staticmethod
    def read_csv(filename: str) -> pd.DataFrame:
        df = pd.read_csv(filename)
        min_count = max(min(int(len(df) / 3), 5), 1)
        # Columns are considered dates if enough examples of that format are found
        mask = df.astype(str).apply(
            lambda c: np.count_nonzero(c.str.match(DATE_FORMAT_REGEX))
            > min_count
        )
        try:
            df.loc[:, mask] = df.loc[:, mask].apply(pd.to_datetime)
        except pd.errors.OutOfBoundsDatetime as ex:
            print(
                "[WARNING]: OutOfBoundsDatetime encountered. Casting to NaT.",
                ex,
            )
            df.loc[:, mask] = df.loc[:, mask].apply(
                lambda c: pd.to_datetime(c, errors="coerce")
            )
        return df
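Taken together, the class supports a check-then-load-or-rebuild pattern around an expensive query. The sketch below is illustrative only: the query dict is made up, the paging loop that would feed handle_batch is omitted, and using the module's FHIR_DSL constant as the namespace is an assumption; the APICache calls themselves come from the source above.

from phc.easy.util.api_cache import APICache, FHIR_DSL

# Hypothetical FHIR DSL query; only the keys read by filename_for_query matter here
query = {
    "path": "fhir/dsl",
    "from": [{"table": "patient"}],
    "columns": ["id", "birthDate"],
    "where": {"type": "elasticsearch", "query": {"match_all": {}}},
}

if APICache.does_cache_for_query_exist(query, namespace=FHIR_DSL):
    frame = APICache.load_cache_for_query(query, namespace=FHIR_DSL)
else:
    # Stream API batches into the cache file as they arrive
    handle_batch = APICache.build_cache_callback(
        query, transform=lambda df: df, namespace=FHIR_DSL
    )
    # ...pass handle_batch to whatever pages through the API results...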
Classes
class APICache
Static methods
def build_cache_callback(query: dict, transform: Callable[[pandas.core.frame.DataFrame], pandas.core.frame.DataFrame], nested_key: Optional[str] = '_source', namespace: Optional[str] = None)
Build a CSV callback (not used for aggregations)
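The returned handle_batch callable expects each batch as a list of records wrapped under nested_key ("_source" by default, matching Elasticsearch-style hits) plus an is_finished flag. Intermediate calls transform and append rows to the cache CSV and return nothing; the final call re-reads the accumulated file. A minimal sketch with made-up records and a hypothetical query dict:

from phc.easy.util.api_cache import APICache

query = {"from": [{"table": "patient"}]}  # hypothetical query
handle_batch = APICache.build_cache_callback(
    query, transform=lambda df: df, namespace="example"
)

# Intermediate batches are transformed and appended to the cache CSV
handle_batch([{"_source": {"id": "a", "value": 1}}], False)
handle_batch([{"_source": {"id": "b", "value": 2}}], False)

# The final (possibly empty) call loads the whole frame back from disk
df = handle_batch([], True)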
def does_cache_for_query_exist(query: dict, namespace: Optional[str] = None) ‑> bool
def filename_for_query(query: dict, namespace: Optional[str] = None)
Descriptive filename with hash of query for easy retrieval
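The name concatenates the namespace, any non-UUID path segments (dashes become underscores), the tables listed in "from", short flags for aggregation, column count, and "where", and the first eight hex characters of a SHA-256 of the query; aggregations get a .json extension, everything else .csv. For a made-up query like the one below (assuming it does not count as an aggregation), the result looks roughly like this:

from phc.easy.util.api_cache import APICache

query = {
    "path": "projects/19e34782-91c4-4143-aaee-2ba81ed0b206/fhir-search",
    "from": [{"table": "observation"}],
    "where": {"type": "elasticsearch", "query": {"match_all": {}}},
}

APICache.filename_for_query(query, namespace="fhir_dsl")
# e.g. "fhir_dsl_projects_fhir_search_observation_where_1a2b3c4d.csv"
# (the UUID path segment is dropped; the trailing hash here is a placeholder)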
def load_cache_for_query(query: dict, namespace: Optional[str] = None) ‑> pandas.core.frame.DataFrame
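Note that, despite the pd.DataFrame annotation, aggregation queries come back as a FhirAggregation wrapping the cached JSON rather than a frame. A small sketch of handling both cases, where query stands for any dict previously cached with this module, and storing the payload on .data is assumed from write_agg's use of agg.data:

from phc.easy.query.fhir_aggregation import FhirAggregation
from phc.easy.util.api_cache import APICache

result = APICache.load_cache_for_query(query, namespace="fhir_dsl")

if isinstance(result, FhirAggregation):
    payload = result.data   # parsed JSON, mirroring what write_agg persisted
else:
    print(result.head())    # tabular results parsed by APICache.read_csv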
def read_csv(filename: str) ‑> pandas.core.frame.DataFrame
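A column is parsed as datetimes when the number of values matching the ISO-8601-with-offset pattern in DATE_FORMAT_REGEX exceeds min(len(df) // 3, 5), floored at 1; dates outside pandas' representable range fall back to NaT. A self-contained sketch with a made-up file path and data:

import os
import tempfile

import pandas as pd

from phc.easy.util.api_cache import APICache

path = os.path.join(tempfile.gettempdir(), "api_cache_example.csv")
pd.DataFrame(
    {
        "id": ["a", "b", "c", "d"],
        "observed": [
            "2021-01-01T08:30:00.000Z",
            "2021-01-02T08:30:00.000Z",
            "2021-01-03T08:30:00.000Z",
            "2021-01-04T08:30:00.000Z",
        ],
    }
).to_csv(path, index=False)

df = APICache.read_csv(path)
print(df.dtypes)  # "observed" is parsed as datetimes (exact dtype depends on the pandas version)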
def write_agg(query: dict, agg: FhirAggregation, namespace: Optional[str] = None)
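write_agg pretty-prints the aggregation's .data payload to JSON under the same deterministic filename, so a later session can rebuild the wrapper via load_cache_for_query. A rough round-trip sketch; the aggregation body is made up, and it assumes FhirAggregation.is_aggregation_query recognizes the "aggregations" key:

from phc.easy.query.fhir_aggregation import FhirAggregation
from phc.easy.util.api_cache import APICache

agg_query = {
    "from": [{"table": "observation"}],
    "aggregations": {"codes": {"terms": {"field": "code.coding.code.keyword"}}},
}
agg = FhirAggregation({"codes": {"buckets": []}})

APICache.write_agg(agg_query, agg)

# A later session re-derives the same filename and reloads the JSON payload
restored = APICache.load_cache_for_query(agg_query)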