Module phc.easy

Expand source code
from phc.easy.abstract.fhir_service_item import FhirServiceItem
from phc.easy.abstract.fhir_service_patient_item import FhirServicePatientItem
from phc.easy.audit_event import AuditEvent
from phc.easy.auth import Auth
from phc.easy.care_plan import CarePlan
from phc.easy.codeable import Codeable
from phc.easy.composition import Composition
from phc.easy.condition import Condition
from phc.easy.consent import Consent
from phc.easy.diagnostic_report import DiagnosticReport
from phc.easy.document_reference import DocumentReference
from phc.easy.encounter import Encounter
from phc.easy.frame import Frame
from phc.easy.goal import Goal
from phc.easy.imaging_study import ImagingStudy
from phc.easy.immunization import Immunization
from phc.easy.media import Media
from phc.easy.medication_administration import MedicationAdministration
from phc.easy.medication_dispense import MedicationDispense
from phc.easy.medication_request import MedicationRequest
from phc.easy.medication_statement import MedicationStatement
from phc.easy.observation import Observation
from phc.easy.ocr import Ocr
from phc.easy.omics.gene import Gene
from phc.easy.omics.gene_set import GeneSet
from phc.easy.omics.genomic_copy_number_variant import GenomicCopyNumberVariant
from phc.easy.omics.genomic_expression import GenomicExpression
from phc.easy.omics.genomic_short_variant import GenomicShortVariant
from phc.easy.omics.genomic_structural_variant import GenomicStructuralVariant
from phc.easy.omics.genomic_test import GenomicTest
from phc.easy.option import Option
from phc.easy.organization import Organization
from phc.easy.patients import Patient
from phc.easy.person import Person
from phc.easy.practitioner import Practitioner
from phc.easy.procedure import Procedure
from phc.easy.procedure_request import ProcedureRequest
from phc.easy.projects import Project
from phc.easy.provenance import Provenance
from phc.easy.query import Query
from phc.easy.referral_request import ReferralRequest
from phc.easy.sequence import Sequence
from phc.easy.specimen import Specimen

from phc.easy.summary.counts import SummaryCounts
from phc.easy.summary.item_counts import SummaryItemCounts
from phc.easy.summary.clinical_counts import SummaryClinicalCounts
from phc.easy.summary.omics_counts import SummaryOmicsCounts

__all__ = [
    "AuditEvent",
    "Auth",
    "CarePlan",
    "Codeable",
    "Composition",
    "Condition",
    "Consent",
    "DiagnosticReport",
    "DocumentReference",
    "Encounter",
    "Frame",
    "Gene",
    "GeneSet",
    "GenomicShortVariant",
    "GenomicStructuralVariant",
    "GenomicCopyNumberVariant",
    "GenomicExpression",
    "GenomicTest",
    "Goal",
    "ImagingStudy",
    "Immunization",
    "FhirServiceItem",
    "Media",
    "MedicationAdministration",
    "MedicationDispense",
    "MedicationRequest",
    "MedicationStatement",
    "Ocr",
    "Observation",
    "Option",
    "Organization",
    "FhirServicePatientItem",
    "Patient",
    "Person",
    "Practitioner",
    "Procedure",
    "ProcedureRequest",
    "Project",
    "Provenance",
    "Query",
    "ReferralRequest",
    "Sequence",
    "Specimen",
    "SummaryCounts",
    "SummaryItemCounts",
    "SummaryOmicsCounts",
    "SummaryClinicalCounts",
]
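
A typical session configures the shared Auth singleton once, selects a project, and then pulls records as data frames. This sketch mirrors the per-class examples further down this page (account name, project name, and patient id are placeholders):

>>> import phc.easy as phc
>>> phc.Auth.set({'account': '<your-account-name>'})
>>> phc.Project.set_current('My Project Name')
>>>
>>> # Sample data frame of observations for one patient
>>> phc.Observation.get_data_frame(patient_id='<patient-id>')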

Sub-modules

phc.easy.abstract
phc.easy.audit_event
phc.easy.auth
phc.easy.care_plan
phc.easy.codeable
phc.easy.composition
phc.easy.condition
phc.easy.consent
phc.easy.diagnostic_report
phc.easy.document_reference
phc.easy.dstu3
phc.easy.encounter
phc.easy.frame
phc.easy.goal
phc.easy.imaging_study
phc.easy.immunization
phc.easy.media
phc.easy.medication_administration
phc.easy.medication_dispense
phc.easy.medication_request
phc.easy.medication_statement
phc.easy.observation
phc.easy.ocr
phc.easy.omics
phc.easy.option
phc.easy.organization
phc.easy.patients
phc.easy.person
phc.easy.practitioner
phc.easy.procedure
phc.easy.procedure_request
phc.easy.projects
phc.easy.provenance
phc.easy.query
phc.easy.referral_request
phc.easy.sequence
phc.easy.specimen
phc.easy.summary
phc.easy.util

Classes

class AuditEvent

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class AuditEvent(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "audit_event"

    @staticmethod
    def patient_key():
        return "entity.reference.reference"

    @staticmethod
    def code_fields():
        return ["source.type", "entity.lifecycle"]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            date_columns=[
                *expand_args.get("date_columns", []),
                "effectiveDateTime",
            ],
            code_columns=[
                *expand_args.get("code_columns", []),
                "type",
                "subtype",
            ],
            custom_columns=[
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("agent"),
                Frame.codeable_like_column_expander("source"),
                Frame.codeable_like_column_expander("entity"),
            ],
        )

    @staticmethod
    def get_count_by_patient():
        raise ValueError("AuditEvent records are not exclusive to a patient.")

Ancestors

FhirServicePatientItem

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["source.type", "entity.lifecycle"]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient()

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

Expand source code
@staticmethod
def get_count_by_patient():
    raise ValueError("AuditEvent records are not exclusive to a patient.")
def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def patient_key()
Expand source code
@staticmethod
def patient_key():
    return "entity.reference.reference"
def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "audit_event"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        date_columns=[
            *expand_args.get("date_columns", []),
            "effectiveDateTime",
        ],
        code_columns=[
            *expand_args.get("code_columns", []),
            "type",
            "subtype",
        ],
        custom_columns=[
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("agent"),
            Frame.codeable_like_column_expander("source"),
            Frame.codeable_like_column_expander("entity"),
        ],
    )
class Auth (details: Union[Any, None, Dict[str, str]] = None)

Create an authentication object that can be shared as a single argument to the 'easy' SDK calls

Attributes

details : Auth | dict | None
    A dictionary representation of the token, account, and/or project id. Can also be another authentication object. Will use environment variables as the default.

    account : str
        The PHC account to authenticate against. Defaults to $PHC_ACCOUNT.

    project_id : str
        (Optional) The ID of the project to pull resources from. Defaults to $PHC_PROJECT_ID.

    token : str
        (Optional) The API key to use. Defaults to $PHC_ACCESS_TOKEN.

    adapter : Adapter
        (Optional) A custom adapter to execute requests. Defaults to the normal API adapter.
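
A minimal construction sketch (all values are placeholders; any key left out falls back to its environment variable):

>>> from phc.easy import Auth
>>> auth = Auth({'account': '<your-account-name>', 'project_id': '<project-id>', 'token': '<api-key>'})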

Expand source code
class Auth:
    token: str
    project_id: str
    account: str
    adapter: Adapter

    def __init__(self, details: Union[Any, None, Dict[str, str]] = None):
        """Create an authentication object that can be shared as a single argument to
        the 'easy' SDK calls

        Attributes
        ----------
        details : Auth | dict | None

          A dictionary representation of the token, account, and/or project id.
          Can also be another authentication object. Will use environment
          variables as the default.

          account : str
              The PHC account to authenticate against
              Defaults to $PHC_ACCOUNT

          project_id : str
              (Optional) The ID of the project to pull resources from
              Defaults to $PHC_PROJECT_ID

          token : str
              (Optional) The API key to use
              Defaults to $PHC_ACCESS_TOKEN

          adapter: Adapter
              (Optional) A custom adapter to execute requests
              Defaults to normal API adapter
        """
        if _shared_auth:
            # Start with shared credentials
            self.update(_shared_auth.details())

        self.update(details)

    @staticmethod
    def custom(details: Union[None, Dict[str, str]]):
        "Returns customized auth object from the shared one"
        return Auth.shared().customized(details)

    @staticmethod
    def set(details: Union[None, Dict[str, str]]):
        "Updates and returns the shared authentication singleton"
        shared = Auth.shared()
        shared.update(details)
        return shared

    @staticmethod
    def shared():
        global _shared_auth

        if not _shared_auth:
            _shared_auth = Auth()

        return _shared_auth

    def customized(self, details: Union[None, Dict[str, str]]):
        "Returns copied, customized auth object from this object"
        custom = self.__copy()
        custom.update(details)
        return custom

    @defaultprop
    def token(self):
        return os.environ.get("PHC_ACCESS_TOKEN")

    @defaultprop
    def account(self):
        return os.environ.get("PHC_ACCOUNT")

    @defaultprop
    def project_id(self):
        env_project_id = os.environ.get("PHC_PROJECT_ID")

        if env_project_id is None:
            raise ValueError("No project_id has been selected.")

        return env_project_id

    @defaultprop
    def adapter(self):
        return Adapter()

    def session(self):
        "Create an API session for use with modules not in the 'easy' namespace"
        return Session(
            token=self.token, account=self.account, adapter=self.adapter
        )

    def accounts(self):
        "List available accounts for the authenticated user"
        return Accounts(self.session()).get_list().data.get("accounts")

    def details(self):
        return {
            "account": self.account,
            "project_id": getattr(self, "_project_id", None),
            "token": self.token,
            "adapter": self.adapter,
        }

    def __copy(self):
        return Auth(self)

    def update(self, details: Union[Any, None, Dict[str, str]] = None):
        """Set details of authentication for API calls
        (Prefer auth.customized unless mutation is required.)

        Attributes
        ----------
        details : Auth | dict | None

          A dictionary representation of the token, account, and/or project id.
          Can also be another authentication object. Will use environment
          variables as the default.

          account : str
              The PHC account to authenticate against
              Defaults to $PHC_ACCOUNT

          project_id : str
              (Optional) The ID of the project to pull resources from
              Defaults to $PHC_PROJECT_ID

          token : str
              (Optional) The API key to use
              Defaults to $PHC_ACCESS_TOKEN

          adapter: Adapter
              (Optional) A custom adapter to execute requests
              Defaults to normal API adapter
        """
        if details is None:
            return

        if type(details) == Auth:
            auth = details
            details = auth.details()

        if details.get("account"):
            self._account = details.get("account")

        if details.get("project_id"):
            self._project_id = details.get("project_id")

        if details.get("token"):
            self._token = details.get("token")

        if details.get("adapter"):
            self._adapter = details.get("adapter")

Static methods

def custom(details: Optional[Dict[str, str]])

Returns customized auth object from the shared one

Expand source code
@staticmethod
def custom(details: Union[None, Dict[str, str]]):
    "Returns customized auth object from the shared one"
    return Auth.shared().customized(details)
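
A hedged sketch of deriving a one-off auth object from the shared credentials (without mutating them) and passing it explicitly to a query; the project id is a placeholder:

>>> from phc.easy import Auth, Observation
>>> other_auth = Auth.custom({'project_id': '<other-project-id>'})
>>> Observation.get_data_frame(auth_args=other_auth)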
def set(details: Optional[Dict[str, str]])

Updates and returns the shared authentication singleton

Expand source code
@staticmethod
def set(details: Union[None, Dict[str, str]]):
    "Updates and returns the shared authentication singleton"
    shared = Auth.shared()
    shared.update(details)
    return shared
def shared()
Expand source code
@staticmethod
def shared():
    global _shared_auth

    if not _shared_auth:
        _shared_auth = Auth()

    return _shared_auth

Instance variables

var account : str
Expand source code
@defaultprop
def account(self):
    return os.environ.get("PHC_ACCOUNT")
var adapter : Adapter
Expand source code
@defaultprop
def adapter(self):
    return Adapter()
var project_id : str
Expand source code
@defaultprop
def project_id(self):
    env_project_id = os.environ.get("PHC_PROJECT_ID")

    if env_project_id is None:
        raise ValueError("No project_id has been selected.")

    return env_project_id
var token : str
Expand source code
@defaultprop
def token(self):
    return os.environ.get("PHC_ACCESS_TOKEN")

Methods

def accounts(self)

List available accounts for the authenticated user

Expand source code
def accounts(self):
    "List available accounts for the authenticated user"
    return Accounts(self.session()).get_list().data.get("accounts")
def customized(self, details: Optional[Dict[str, str]])

Returns copied, customized auth object from this object

Expand source code
def customized(self, details: Union[None, Dict[str, str]]):
    "Returns copied, customized auth object from this object"
    custom = self.__copy()
    custom.update(details)
    return custom
def details(self)
Expand source code
def details(self):
    return {
        "account": self.account,
        "project_id": getattr(self, "_project_id", None),
        "token": self.token,
        "adapter": self.adapter,
    }
def session(self)

Create an API session for use with modules not in the 'easy' namespace

Expand source code
def session(self):
    "Create an API session for use with modules not in the 'easy' namespace"
    return Session(
        token=self.token, account=self.account, adapter=self.adapter
    )
def update(self, details: Union[Any, None, Dict[str, str]] = None)

Set details of authentication for API calls (Prefer auth.customized unless mutation is required.)

Attributes

details : Auth | dict | None
    A dictionary representation of the token, account, and/or project id. Can also be another authentication object. Will use environment variables as the default.

    account : str
        The PHC account to authenticate against. Defaults to $PHC_ACCOUNT.

    project_id : str
        (Optional) The ID of the project to pull resources from. Defaults to $PHC_PROJECT_ID.

    token : str
        (Optional) The API key to use. Defaults to $PHC_ACCESS_TOKEN.

    adapter : Adapter
        (Optional) A custom adapter to execute requests. Defaults to the normal API adapter.
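
As noted above, prefer auth.customized unless in-place mutation is required; a minimal mutation sketch (placeholder values):

>>> from phc.easy import Auth
>>> auth = Auth({'account': '<your-account-name>'})
>>> auth.update({'project_id': '<project-id>'})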

Expand source code
def update(self, details: Union[Any, None, Dict[str, str]] = None):
    """Set details of authentication for API calls
    (Prefer auth.customized unless mutation is required.)

    Attributes
    ----------
    details : Auth | dict | None

      A dictionary representation of the token, account, and/or project id.
      Can also be another authentication object. Will use environment
      variables as the default.

      account : str
          The PHC account to authenticate against
          Defaults to $PHC_ACCOUNT

      project_id : str
          (Optional) The ID of the project to pull resources from
          Defaults to $PHC_PROJECT_ID

      token : str
          (Optional) The API key to use
          Defaults to $PHC_ACCESS_TOKEN

      adapter: Adapter
          (Optional) A custom adapter to execute requests
          Defaults to normal API adapter
    """
    if details is None:
        return

    if type(details) == Auth:
        auth = details
        details = auth.details()

    if details.get("account"):
        self._account = details.get("account")

    if details.get("project_id"):
        self._project_id = details.get("project_id")

    if details.get("token"):
        self._token = details.get("token")

    if details.get("adapter"):
        self._adapter = details.get("adapter")
class CarePlan

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class CarePlan(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "care_plan"

    @staticmethod
    def code_fields():
        return ["meta.tag", "category.coding"]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            custom_columns=[
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("subject"),
                Frame.codeable_like_column_expander("context"),
                Frame.codeable_like_column_expander("activity"),
            ],
        )

Ancestors

FhirServicePatientItem

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["meta.tag", "category.coding"]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "care_plan"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        custom_columns=[
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("subject"),
            Frame.codeable_like_column_expander("context"),
            Frame.codeable_like_column_expander("activity"),
        ],
    )
class Codeable
Expand source code
class Codeable:
    @staticmethod
    def expand_column(codeable_col: pd.Series):
        """Convert a pandas dictionary column with codeable data into a data frame

        Attributes
        ----------
        codeable_col : pd.Series
            A pandas column that contains codeable data (FHIR resources)
        """
        return pd.DataFrame(map(generic_codeable_to_dict, codeable_col.values))

Static methods

def expand_column(codeable_col: pandas.core.series.Series)

Convert a pandas dictionary column with codeable data into a data frame

Attributes

codeable_col : pd.Series
A pandas column that contains codeable data (FHIR resources)
Expand source code
@staticmethod
def expand_column(codeable_col: pd.Series):
    """Convert a pandas dictionary column with codeable data into a data frame

    Attributes
    ----------
    codeable_col : pd.Series
        A pandas column that contains codeable data (FHIR resources)
    """
    return pd.DataFrame(map(generic_codeable_to_dict, codeable_col.values))
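
A sketch of expanding a codeable column; the data frame and coding values here are illustrative:

>>> import pandas as pd
>>> from phc.easy import Codeable
>>> df = pd.DataFrame({'code': [{'coding': [{'system': 'http://loinc.org', 'code': '4548-4', 'display': 'Hemoglobin A1c'}]}]})
>>> Codeable.expand_column(df['code'])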
class Composition

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class Composition(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "composition"

    @staticmethod
    def code_fields():
        return ["meta.tag"]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            date_columns=[*expand_args.get("date_columns", []), "date"],
            code_columns=[
                *expand_args.get("code_columns", []),
                "type",
                "author",
            ],
            custom_columns=[
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("subject"),
                Frame.codeable_like_column_expander("text"),
                Frame.codeable_like_column_expander("relatesTo"),
            ],
        )

Ancestors

FhirServicePatientItem

Subclasses

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["meta.tag"]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "composition"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        date_columns=[*expand_args.get("date_columns", []), "date"],
        code_columns=[
            *expand_args.get("code_columns", []),
            "type",
            "author",
        ],
        custom_columns=[
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("subject"),
            Frame.codeable_like_column_expander("text"),
            Frame.codeable_like_column_expander("relatesTo"),
        ],
    )
class Condition

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class Condition(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "condition"

    @staticmethod
    def code_fields():
        return [
            "meta.tag",
            "code.coding",
            "bodySite.coding",
            "stage.summary.coding",
        ]

    @classmethod
    def get_codes(cls, query: Optional[str] = None):
        """Find codes based on case-insensitive matching of code/display/system

        Example
        --------
        >>> import phc.easy as phc
        >>> phc.Auth.set({'account': '<your-account-name>'})
        >>> phc.Project.set_current('My Project Name')
        >>>
        >>> phc.Observation.get_codes("loinc")
        """
        return search(
            SummaryItemCounts.get_data_frame(cls.table_name()), query=query
        )

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            date_columns=[
                *expand_args.get("date_columns", []),
                "onsetDateTime",
                "assertedDate",
                "onsetPeriod.start",
                "onsetPeriod.end",
            ],
            code_columns=[
                *expand_args.get("code_columns", []),
                "bodySite",
                "stage",
            ],
            custom_columns=[
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("subject"),
                Frame.codeable_like_column_expander("onsetPeriod"),
                Frame.codeable_like_column_expander("context"),
            ],
        )

Ancestors

FhirServicePatientItem

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return [
        "meta.tag",
        "code.coding",
        "bodySite.coding",
        "stage.summary.coding",
    ]
def get_codes(query: Optional[str] = None)

Find codes based on case-insensitive matching of code/display/system

Example

>>> import phc.easy as phc
>>> phc.Auth.set({'account': '<your-account-name>'})
>>> phc.Project.set_current('My Project Name')
>>>
>>> phc.Observation.get_codes("loinc")
Expand source code
@classmethod
def get_codes(cls, query: Optional[str] = None):
    """Find codes based on case-insensitive matching of code/display/system

    Example
    --------
    >>> import phc.easy as phc
    >>> phc.Auth.set({'account': '<your-account-name>'})
    >>> phc.Project.set_current('My Project Name')
    >>>
    >>> phc.Observation.get_codes("loinc")
    """
    return search(
        SummaryItemCounts.get_data_frame(cls.table_name()), query=query
    )
def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "condition"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        date_columns=[
            *expand_args.get("date_columns", []),
            "onsetDateTime",
            "assertedDate",
            "onsetPeriod.start",
            "onsetPeriod.end",
        ],
        code_columns=[
            *expand_args.get("code_columns", []),
            "bodySite",
            "stage",
        ],
        custom_columns=[
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("subject"),
            Frame.codeable_like_column_expander("onsetPeriod"),
            Frame.codeable_like_column_expander("context"),
        ],
    )
class Consent

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class Consent(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "consent"

    @staticmethod
    def patient_key():
        return "patient.reference"

    @staticmethod
    def code_fields():
        return ["meta.tag"]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            date_columns=[*expand_args.get("date_columns", []), "dateTime"],
            custom_columns=[
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("sourceReference"),
                Frame.codeable_like_column_expander("actor"),
                Frame.codeable_like_column_expander("patient"),
            ],
        )

Ancestors

FhirServicePatientItem

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["meta.tag"]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def patient_key()
Expand source code
@staticmethod
def patient_key():
    return "patient.reference"
def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "consent"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        date_columns=[*expand_args.get("date_columns", []), "dateTime"],
        custom_columns=[
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("sourceReference"),
            Frame.codeable_like_column_expander("actor"),
            Frame.codeable_like_column_expander("patient"),
        ],
    )
class DiagnosticReport

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class DiagnosticReport(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "diagnostic_report"

    @staticmethod
    def patient_id_prefixes():
        return ["Patient/", "urn:uuid:"]

    @staticmethod
    def patient_key():
        return "subject.reference"

    @staticmethod
    def code_fields():
        return ["meta.tag"]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            custom_columns=[
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("subject"),
                Frame.codeable_like_column_expander("presentedForm"),
                Frame.codeable_like_column_expander("result"),
            ],
        )

Ancestors

FhirServicePatientItem

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["meta.tag"]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def patient_id_prefixes()
Expand source code
@staticmethod
def patient_id_prefixes():
    return ["Patient/", "urn:uuid:"]
def patient_key()
Expand source code
@staticmethod
def patient_key():
    return "subject.reference"
def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "diagnostic_report"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        custom_columns=[
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("subject"),
            Frame.codeable_like_column_expander("presentedForm"),
            Frame.codeable_like_column_expander("result"),
        ],
    )
class DocumentReference

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class DocumentReference(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "document_reference"

    @staticmethod
    def code_fields():
        return ["type.coding", "content.attachment", "meta.tag"]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            code_columns=[*expand_args.get("code_columns", []), "type"],
            custom_columns=[
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("subject"),
                # TODO: Properly parse content column
                #
                # Example:
                # [{'attachment': {'contentType': 'application/gzip',
                #    'url': 'https://api.us.lifeomic.com/v1/files/<uuid>',
                #    'size': 182539,
                #    'title': 'helix-source-files/normalized/<filename>.vcf.gz'}}]
            ],
        )

Ancestors

FhirServicePatientItem

Subclasses

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["type.coding", "content.attachment", "meta.tag"]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "document_reference"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        code_columns=[*expand_args.get("code_columns", []), "type"],
        custom_columns=[
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("subject"),
            # TODO: Properly parse content column
            #
            # Example:
            # [{'attachment': {'contentType': 'application/gzip',
            #    'url': 'https://api.us.lifeomic.com/v1/files/<uuid>',
            #    'size': 182539,
            #    'title': 'helix-source-files/normalized/<filename>.vcf.gz'}}]
        ],
    )
class Encounter

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class Encounter(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "encounter"

    @staticmethod
    def patient_key():
        return "subject.reference"

    @staticmethod
    def code_fields():
        return [
            "class",
            "priority.coding",
            "participant.type.coding",
            "length",
            "hospitalization.admitSource.coding",
            "hospitalization.dischargeDisposition.coding",
            "meta.tag",
        ]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            date_columns=[
                *expand_args.get("date_columns", []),
                "period.start",
                "period.end",
            ],
            code_columns=[
                *expand_args.get("code_columns", []),
                "class",
                "priority",
                "participant",
                "length",
                "hospitalization",
            ],
            custom_columns=[
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("subject"),
                Frame.codeable_like_column_expander("period"),
                Frame.codeable_like_column_expander("reason"),
                Frame.codeable_like_column_expander("location"),
                Frame.codeable_like_column_expander("serviceProvider"),
            ],
        )

Ancestors

FhirServicePatientItem

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return [
        "class",
        "priority.coding",
        "participant.type.coding",
        "length",
        "hospitalization.admitSource.coding",
        "hospitalization.dischargeDisposition.coding",
        "meta.tag",
    ]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def patient_key()
Expand source code
@staticmethod
def patient_key():
    return "subject.reference"
def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "encounter"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        date_columns=[
            *expand_args.get("date_columns", []),
            "period.start",
            "period.end",
        ],
        code_columns=[
            *expand_args.get("code_columns", []),
            "class",
            "priority",
            "participant",
            "length",
            "hospitalization",
        ],
        custom_columns=[
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("subject"),
            Frame.codeable_like_column_expander("period"),
            Frame.codeable_like_column_expander("reason"),
            Frame.codeable_like_column_expander("location"),
            Frame.codeable_like_column_expander("serviceProvider"),
        ],
    )
class FhirServiceItem

Provides an abstract class and/or static methods for retrieving items from a FSS table
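
The concrete entities below follow one pattern: subclass this class (or FhirServicePatientItem for patient-scoped tables), implement table_name(), and optionally override code_fields() and transform_results(). A hypothetical sketch for an FSS table named "device" (not an entity shipped in this module):

from phc.easy.abstract.fhir_service_item import FhirServiceItem

class Device(FhirServiceItem):
    @staticmethod
    def table_name():
        # Hypothetical FSS table name
        return "device"

    @staticmethod
    def code_fields():
        # Paths searched when looking up codes
        return ["type.coding", "meta.tag"]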

Expand source code
class FhirServiceItem:
    """Provides an abstract class and/or static methods for retrieving items
    from a FSS table
    """

    @ClassProperty
    @classmethod
    def DSTU3(cls) -> DSTU3:
        """Return a DSTU3 instance with the entity name configured

        Usage:
            phc.Patient.DSTU3.get(...)
        """
        try:
            # Must wrap in try/except since docs try to access this on abstract classes
            # where table_name() throws a ValueError
            table_name = cls.table_name()
        except Exception:
            table_name = ""

        return DSTU3(snake_to_title_case(table_name))

    @staticmethod
    def table_name() -> str:
        "Returns the FSS table name for retrieval"
        raise ValueError("Table name should be implemented by subclass")

    @staticmethod
    def code_fields() -> List[str]:
        "Returns the code keys (e.g. when searching for codes)"
        return []

    @classmethod
    def get_count(cls, query_overrides: dict = {}, auth_args=Auth.shared()):
        "Get the count for a given FSS query"
        return Query.find_count_of_dsl_query(
            {
                "type": "select",
                "columns": "*",
                "from": [{"table": cls.table_name()}],
                **query_overrides,
            },
            auth_args=auth_args,
        )

    @staticmethod
    def transform_results(data_frame: pd.DataFrame, **_expand_args):
        "Transform data frame batch"
        return data_frame

    @classmethod
    def get_data_frame(
        cls,
        all_results: bool = False,
        raw: bool = False,
        page_size: Union[int, None] = None,
        max_pages: Union[int, None] = None,
        query_overrides: dict = {},
        auth_args=Auth.shared(),
        ignore_cache: bool = False,
        expand_args: dict = {},
        log: bool = False,
        id: Optional[str] = None,
        ids: List[str] = [],
        # Terms
        term: Optional[dict] = None,
        terms: List[dict] = [],
        max_terms: int = DEFAULT_MAX_TERMS,
        # Codes
        code: Optional[Union[str, List[str]]] = None,
        display: Optional[Union[str, List[str]]] = None,
        system: Optional[Union[str, List[str]]] = None,
        code_fields: List[str] = [],
    ):
        """Retrieve records

        Attributes
        ----------
        all_results : bool = False
            Retrieve sample of results (10) or entire set of records

        raw : bool = False
            If raw, then values will not be expanded (useful for manual
            inspection if something goes wrong)

        page_size : int
            The number of records to fetch per page

        max_pages : int
            The number of pages to retrieve (useful if working with tons of records)

        query_overrides : dict = {}
            Override any part of the elasticsearch FHIR query

        auth_args : Any
            The authentication to use for the account and project (defaults to shared)

        ignore_cache : bool = False
            Bypass the caching system that auto-saves results to a CSV file.
            Caching only occurs when all results are being retrieved.

        expand_args : Any
            Additional arguments passed to phc.Frame.expand

        log : bool = False
            Whether to log some diagnostic statements for debugging

        id : None or str = None
            Find records for a given id

        ids : List[str]
            Find records for given ids

        max_terms : int
            Maximum terms per query clause before chunking into multiple requests

        term : dict
            Add an arbitrary ES term/s to the query (includes chunking)

        terms : dict
            Add multiple arbitrary ES term/s to the query (includes chunking)

        code : str | List[str]
            Adds where clause for code value(s)

        display : str | List[str]
            Adds where clause for code display value(s)

        system : str | List[str]
            Adds where clause for code system value(s)

        code_fields : List[str]
            A list of paths to find FHIR codes in (default: codes for the given entity)

        Examples
        --------
        >>> import phc.easy as phc
        >>> phc.Auth.set({'account': '<your-account-name>'})
        >>> phc.Project.set_current('My Project Name')
        >>>
        >>> phc.Observation.get_data_frame(patient_id='<patient-id>')
        >>>
        >>> phc.Goal.get_data_frame(patient_id='<patient-id>')
        """
        query = {
            "type": "select",
            "columns": "*",
            "from": [{"table": cls.table_name()}],
        }

        code_fields = [*cls.code_fields(), *code_fields]

        def transform(df: pd.DataFrame):
            return cls.transform_results(df, **expand_args)

        return Query.execute_fhir_dsl_with_options(
            query,
            transform,
            all_results,
            raw,
            query_overrides,
            auth_args,
            ignore_cache,
            page_size=page_size,
            max_pages=max_pages,
            log=log,
            # Terms
            term=term,
            terms=terms,
            max_terms=max_terms,
            # Codes
            code_fields=code_fields,
            code=code,
            display=display,
            system=system,
            id=id,
            ids=ids,
        )

    @classmethod
    def get_codes(
        cls,
        display_query: Optional[str] = None,
        sample_size: Optional[int] = None,
        exclude_meta_tag=True,
        **kwargs,
    ):
        """Find all codes

        See possible arguments for `phc.easy.query.Query.get_codes`

        Examples
        --------
        >>> import phc.easy as phc
        >>> phc.Auth.set({'account': '<your-account-name>'})
        >>> phc.Project.set_current('My Project Name')
        >>>
        >>> phc.Observation.get_codes(patient_id="<id>", max_pages=3)
        """
        code_fields = [*cls.code_fields(), *kwargs.get("code_fields", [])]

        # Meta tag can significantly clutter things up since it's often a date
        # value instead of a real code
        if exclude_meta_tag:
            code_fields = [
                field for field in code_fields if field != "meta.tag"
            ]

        return Query.get_codes(
            display_query=display_query,
            sample_size=sample_size,
            table_name=cls.table_name(),
            code_fields=code_fields,
            **without_keys(kwargs, ["code_fields"]),
        )

    @classmethod
    def get_count_by_field(cls, field: str, **kwargs):
        """Count records by a given field

        See arguments for :func:`~phc.easy.query.Query.get_count_by_field`

        Attributes
        ----------
        field : str
            The field name to count the values of (e.g. "gender")

        Examples
        --------
        >>> import phc.easy as phc
        >>> phc.Auth.set({'account': '<your-account-name>'})
        >>> phc.Project.set_current('My Project Name')
        >>>
        >>> phc.Observation.get_count_by_field('category.coding.code')
        """
        return Query.get_count_by_field(
            table_name=cls.table_name(), field=field, **kwargs
        )

Subclasses

Class variables

var DSTU3

Static methods

def code_fields() ‑> List[str]

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields() -> List[str]:
    "Returns the code keys (e.g. when searching for codes)"
    return []
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Find all codes

See possible arguments for Query.get_codes()

Examples

>>> import phc.easy as phc
>>> phc.Auth.set({'account': '<your-account-name>'})
>>> phc.Project.set_current('My Project Name')
>>>
>>> phc.Observation.get_codes(patient_id="<id>", max_pages=3)
Expand source code
@classmethod
def get_codes(
    cls,
    display_query: Optional[str] = None,
    sample_size: Optional[int] = None,
    exclude_meta_tag=True,
    **kwargs,
):
    """Find all codes

    See possible arguments for `phc.easy.query.Query.get_codes`

    Examples
    --------
    >>> import phc.easy as phc
    >>> phc.Auth.set({'account': '<your-account-name>'})
    >>> phc.Project.set_current('My Project Name')
    >>>
    >>> phc.Observation.get_codes(patient_id="<id>", max_pages=3)
    """
    code_fields = [*cls.code_fields(), *kwargs.get("code_fields", [])]

    # Meta tag can significantly clutter things up since it's often a date
    # value instead of a real code
    if exclude_meta_tag:
        code_fields = [
            field for field in code_fields if field != "meta.tag"
        ]

    return Query.get_codes(
        display_query=display_query,
        sample_size=sample_size,
        table_name=cls.table_name(),
        code_fields=code_fields,
        **without_keys(kwargs, ["code_fields"]),
    )
def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Get the count for a given FSS query

Expand source code
@classmethod
def get_count(cls, query_overrides: dict = {}, auth_args=Auth.shared()):
    "Get the count for a given FSS query"
    return Query.find_count_of_dsl_query(
        {
            "type": "select",
            "columns": "*",
            "from": [{"table": cls.table_name()}],
            **query_overrides,
        },
        auth_args=auth_args,
    )
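
A minimal usage sketch (assumes the shared auth and current project are already configured); query_overrides can replace any part of the generated FSS query:

>>> import phc.easy as phc
>>> phc.Observation.get_count()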
def get_count_by_field(field: str, **kwargs)

Count records by a given field

See arguments for Query.get_count_by_field()

Attributes

field : str
The field name to count the values of (e.g. "gender")

Examples

>>> import phc.easy as phc
>>> phc.Auth.set({'account': '<your-account-name>'})
>>> phc.Project.set_current('My Project Name')
>>>
>>> phc.Observation.get_count_by_field('category.coding.code')
Expand source code
@classmethod
def get_count_by_field(cls, field: str, **kwargs):
    """Count records by a given field

    See arguments for :func:`~phc.easy.query.Query.get_count_by_field`

    Attributes
    ----------
    field : str
        The field name to count the values of (e.g. "gender")

    Examples
    --------
    >>> import phc.easy as phc
    >>> phc.Auth.set({'account': '<your-account-name>'})
    >>> phc.Project.set_current('My Project Name')
    >>>
    >>> phc.Observation.get_count_by_field('category.coding.code')
    """
    return Query.get_count_by_field(
        table_name=cls.table_name(), field=field, **kwargs
    )
def get_data_frame(all_results: bool = False, raw: bool = False, page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, id: Optional[str] = None, ids: List[str] = [], term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Retrieve records

Attributes

all_results : bool = False
Retrieve a sample of results (10) or the entire set of records
raw : bool = False
If raw, then values will not be expanded (useful for manual inspection if something goes wrong)
page_size : int
The number of records to fetch per page
max_pages : int
The number of pages to retrieve (useful if working with tons of records)
query_overrides : dict = {}
Override any part of the elasticsearch FHIR query
auth_args : Any
The authentication to use for the account and project (defaults to shared)
ignore_cache : bool = False
Bypass the caching system that auto-saves results to a CSV file. Caching only occurs when all results are being retrieved.
expand_args : Any
Additional arguments passed to phc.Frame.expand
log : bool = False
Whether to log some diagnostic statements for debugging
id : None or str = None
Find records for a given id
ids : List[str]
Find records for given ids
max_terms : int
Maximum terms per query clause before chunking into multiple requests
term : dict
Add an arbitrary ES term to the query (includes chunking)
terms : List[dict]
Add multiple arbitrary ES terms to the query (includes chunking)
code : str | List[str]
Adds where clause for code value(s)
display : str | List[str]
Adds where clause for code display value(s)
system : str | List[str]
Adds where clause for code system value(s)
code_fields : List[str]
A list of paths to find FHIR codes in (default: codes for the given entity)

Examples

>>> import phc.easy as phc
>>> phc.Auth.set({'account': '<your-account-name>'})
>>> phc.Project.set_current('My Project Name')
>>>
>>> phc.Observation.get_data_frame(patient_id='<patient-id>')
>>>
>>> phc.Goal.get_data_frame(patient_id='<patient-id>')
Expand source code
@classmethod
def get_data_frame(
    cls,
    all_results: bool = False,
    raw: bool = False,
    page_size: Union[int, None] = None,
    max_pages: Union[int, None] = None,
    query_overrides: dict = {},
    auth_args=Auth.shared(),
    ignore_cache: bool = False,
    expand_args: dict = {},
    log: bool = False,
    id: Optional[str] = None,
    ids: List[str] = [],
    # Terms
    term: Optional[dict] = None,
    terms: List[dict] = [],
    max_terms: int = DEFAULT_MAX_TERMS,
    # Codes
    code: Optional[Union[str, List[str]]] = None,
    display: Optional[Union[str, List[str]]] = None,
    system: Optional[Union[str, List[str]]] = None,
    code_fields: List[str] = [],
):
    """Retrieve records

    Attributes
    ----------
    all_results : bool = False
        Retrieve a sample of results (10) or the entire set of records

    raw : bool = False
        If raw, then values will not be expanded (useful for manual
        inspection if something goes wrong)

    page_size : int
        The number of records to fetch per page

    max_pages : int
        The number of pages to retrieve (useful if working with tons of records)

    query_overrides : dict = {}
        Override any part of the elasticsearch FHIR query

    auth_args : Any
        The authentication to use for the account and project (defaults to shared)

    ignore_cache : bool = False
        Bypass the caching system that auto-saves results to a CSV file.
        Caching only occurs when all results are being retrieved.

    expand_args : Any
        Additional arguments passed to phc.Frame.expand

    log : bool = False
        Whether to log some diagnostic statements for debugging

    id : None or str = None
        Find records for a given id

    ids : List[str]
        Find records for given ids

    max_terms : int
        Maximum terms per query clause before chunking into multiple requests

    term : dict
        Add an arbitrary ES term to the query (includes chunking)

    terms : List[dict]
        Add multiple arbitrary ES terms to the query (includes chunking)

    code : str | List[str]
        Adds where clause for code value(s)

    display : str | List[str]
        Adds where clause for code display value(s)

    system : str | List[str]
        Adds where clause for code system value(s)

    code_fields : List[str]
        A list of paths to find FHIR codes in (default: codes for the given entity)

    Examples
    --------
    >>> import phc.easy as phc
    >>> phc.Auth.set({'account': '<your-account-name>'})
    >>> phc.Project.set_current('My Project Name')
    >>>
    >>> phc.Observation.get_data_frame(patient_id='<patient-id>')
    >>>
    >>> phc.Goal.get_data_frame(patient_id='<patient-id>')
    """
    query = {
        "type": "select",
        "columns": "*",
        "from": [{"table": cls.table_name()}],
    }

    code_fields = [*cls.code_fields(), *code_fields]

    def transform(df: pd.DataFrame):
        return cls.transform_results(df, **expand_args)

    return Query.execute_fhir_dsl_with_options(
        query,
        transform,
        all_results,
        raw,
        query_overrides,
        auth_args,
        ignore_cache,
        page_size=page_size,
        max_pages=max_pages,
        log=log,
        # Terms
        term=term,
        terms=terms,
        max_terms=max_terms,
        # Codes
        code_fields=code_fields,
        code=code,
        display=display,
        system=system,
        id=id,
        ids=ids,
    )
def table_name() ‑> str

Returns the FSS table name for retrieval
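
Subclasses must implement this method; a hypothetical sketch (the class name and the "observation" table name are illustrative only, not taken from the library):

from phc.easy.abstract.fhir_service_item import FhirServiceItem

class ObservationLike(FhirServiceItem):
    # Hypothetical subclass pointing retrieval at an illustrative FSS table
    @staticmethod
    def table_name() -> str:
        return "observation"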

Expand source code
@staticmethod
def table_name() -> str:
    "Returns the FSS table name for retrieval"
    raise ValueError("Table name should be implemented by subclass")
def transform_results(data_frame: pandas.core.frame.DataFrame, **_expand_args)

Transform data frame batch

Expand source code
@staticmethod
def transform_results(data_frame: pd.DataFrame, **_expand_args):
    "Transform data frame batch"
    return data_frame
class FhirServicePatientItem

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class FhirServicePatientItem(FhirServiceItem):
    """Provides an abstract class and/or static methods for retrieving items
    from a FSS table that relates to a patient
    """

    @staticmethod
    def patient_key() -> str:
        return "subject.reference"

    @staticmethod
    def patient_id_prefixes() -> List[str]:
        return ["Patient/"]

    @classmethod
    def get_data_frame(
        cls,
        all_results: bool = False,
        raw: bool = False,
        id: Optional[str] = None,
        ids: List[str] = [],
        patient_id: Union[None, str] = None,
        patient_ids: List[str] = [],
        device_id: Union[None, str] = None,
        device_ids: List[str] = [],
        page_size: Union[int, None] = None,
        max_pages: Union[int, None] = None,
        query_overrides: dict = {},
        auth_args=Auth.shared(),
        ignore_cache: bool = False,
        expand_args: dict = {},
        log: bool = False,
        # Terms
        term: Optional[dict] = None,
        terms: List[dict] = [],
        max_terms: int = DEFAULT_MAX_TERMS,
        # Codes
        code: Optional[Union[str, List[str]]] = None,
        display: Optional[Union[str, List[str]]] = None,
        system: Optional[Union[str, List[str]]] = None,
        code_fields: List[str] = [],
    ):
        """Retrieve records

        Attributes
        ----------
        all_results : bool = False
            Retrieve a sample of results (10) or the entire set of records

        raw : bool = False
            If raw, then values will not be expanded (useful for manual
            inspection if something goes wrong)

        id : None or str = None
            Find records for a given id

        ids : List[str]
            Find records for given ids

        patient_id : None or str = None
            Find records for a given patient_id

        patient_ids : List[str]
            Find records for given patient_ids

        device_id: None or str = None
            Find records for a given device_id

        device_ids: List[str]
            Find records for given device_ids

        page_size : int
            The number of records to fetch per page

        max_pages : int
            The number of pages to retrieve (useful if working with tons of records)

        query_overrides : dict = {}
            Override any part of the elasticsearch FHIR query

        auth_args : Any
            The authentication to use for the account and project (defaults to shared)

        ignore_cache : bool = False
            Bypass the caching system that auto-saves results to a CSV file.
            Caching only occurs when all results are being retrieved.

        expand_args : Any
            Additional arguments passed to phc.Frame.expand

        log : bool = False
            Whether to log some diagnostic statements for debugging

        max_terms : int
            Maximum terms per query clause before chunking into multiple requests

        term : dict
            Add an arbitrary ES term to the query (includes chunking)

        terms : List[dict]
            Add multiple arbitrary ES terms to the query (includes chunking)

        code : str | List[str]
            Adds where clause for code value(s)

        display : str | List[str]
            Adds where clause for code display value(s)

        system : str | List[str]
            Adds where clause for code system value(s)

        code_fields : List[str]
            A list of paths to find FHIR codes in (default: codes for the given entity)

        Examples
        --------
        >>> import phc.easy as phc
        >>> phc.Auth.set({'account': '<your-account-name>'})
        >>> phc.Project.set_current('My Project Name')
        >>>
        >>> phc.Observation.get_data_frame(patient_id='<patient-id>')
        >>>
        >>> phc.Goal.get_data_frame(patient_id='<patient-id>')
        """
        query = {
            "type": "select",
            "columns": "*",
            "from": [{"table": cls.table_name()}],
        }

        code_fields = [*cls.code_fields(), *code_fields]

        def transform(df: pd.DataFrame):
            return cls.transform_results(df, **expand_args)

        # TODO: As of Feb 2023, only observations with patient references are indexed.
        # So in order to query for observations associated with a device we have to set the
        # device_id to the patient_id.
        # This is a workaround and should be fixed once The Platform properly indexes devices.
        if device_id:
            patient_id = device_id

        if device_ids:
            patient_ids = device_ids

        return Query.execute_fhir_dsl_with_options(
            query,
            transform,
            all_results,
            raw,
            query_overrides,
            auth_args,
            ignore_cache,
            id=id,
            ids=ids,
            patient_id=patient_id,
            patient_ids=patient_ids,
            page_size=page_size,
            max_pages=max_pages,
            patient_key=cls.patient_key(),
            log=log,
            patient_id_prefixes=cls.patient_id_prefixes(),
            # Terms
            term=term,
            terms=terms,
            max_terms=max_terms,
            # Codes
            code_fields=code_fields,
            code=code,
            display=display,
            system=system,
        )

    @classmethod
    def get_count_by_patient(cls, **kwargs):
        """Count records by patient

        See arguments for :func:`~phc.easy.query.Query.get_count_by_field`

        Examples
        --------
        >>> import phc.easy as phc
        >>> phc.Auth.set({'account': '<your-account-name>'})
        >>> phc.Project.set_current('My Project Name')
        >>>
        >>> phc.Observation.get_count_by_patient()
        """
        patient_key = cls.patient_key()

        df = Query.get_count_by_field(
            table_name=cls.table_name(), field=cls.patient_key(), **kwargs
        )

        # Make keys consistent (some are prefixed while others are not)
        df[patient_key] = df[patient_key].str.replace("Patient/", "")

        return df.groupby(patient_key).sum()

Ancestors

Subclasses

Static methods

def code_fields() ‑> List[str]

Inherited from: FhirServiceItem.code_fields

Returns the code keys (e.g. when searching for codes)

def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServiceItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServiceItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServiceItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Count records by patient

See arguments for Query.get_count_by_field()

Examples

>>> import phc.easy as phc
>>> phc.Auth.set({'account': '<your-account-name>'})
>>> phc.Project.set_current('My Project Name')
>>>
>>> phc.Observation.get_count_by_patient()
Expand source code
@classmethod
def get_count_by_patient(cls, **kwargs):
    """Count records by patient

    See arguments for :func:`~phc.easy.query.Query.get_count_by_field`

    Examples
    --------
    >>> import phc.easy as phc
    >>> phc.Auth.set({'account': '<your-account-name>'})
    >>> phc.Project.set_current('My Project Name')
    >>>
    >>> phc.Observation.get_count_by_patient()
    """
    patient_key = cls.patient_key()

    df = Query.get_count_by_field(
        table_name=cls.table_name(), field=cls.patient_key(), **kwargs
    )

    # Make keys consistent (some are prefixed while others are not)
    df[patient_key] = df[patient_key].str.replace("Patient/", "")

    return df.groupby(patient_key).sum()
def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Retrieve records

Attributes

all_results : bool = False
Retrieve a sample of results (10) or the entire set of records
raw : bool = False
If raw, then values will not be expanded (useful for manual inspection if something goes wrong)
id : None or str = None
Find records for a given id
ids : List[str]
Find records for given ids
patient_id : None or str = None
Find records for a given patient_id
patient_ids : List[str]
Find records for given patient_ids
device_id : None or str = None
Find records for a given device_id
device_ids : List[str]
Find records for given device_ids
page_size : int
The number of records to fetch per page
max_pages : int
The number of pages to retrieve (useful if working with tons of records)
query_overrides : dict = {}
Override any part of the elasticsearch FHIR query
auth_args : Any
The authentication to use for the account and project (defaults to shared)
ignore_cache : bool = False
Bypass the caching system that auto-saves results to a CSV file. Caching only occurs when all results are being retrieved.
expand_args : Any
Additional arguments passed to phc.Frame.expand
log : bool = False
Whether to log some diagnostic statements for debugging
max_terms : int
Maximum terms per query clause before chunking into multiple requests
term : dict
Add an arbitrary ES term to the query (includes chunking)
terms : List[dict]
Add multiple arbitrary ES terms to the query (includes chunking)
code : str | List[str]
Adds where clause for code value(s)
display : str | List[str]
Adds where clause for code display value(s)
system : str | List[str]
Adds where clause for code system value(s)
code_fields : List[str]
A list of paths to find FHIR codes in (default: codes for the given entity)

Examples

>>> import phc.easy as phc
>>> phc.Auth.set({'account': '<your-account-name>'})
>>> phc.Project.set_current('My Project Name')
>>>
>>> phc.Observation.get_data_frame(patient_id='<patient-id>')
>>>
>>> phc.Goal.get_data_frame(patient_id='<patient-id>')
Expand source code
@classmethod
def get_data_frame(
    cls,
    all_results: bool = False,
    raw: bool = False,
    id: Optional[str] = None,
    ids: List[str] = [],
    patient_id: Union[None, str] = None,
    patient_ids: List[str] = [],
    device_id: Union[None, str] = None,
    device_ids: List[str] = [],
    page_size: Union[int, None] = None,
    max_pages: Union[int, None] = None,
    query_overrides: dict = {},
    auth_args=Auth.shared(),
    ignore_cache: bool = False,
    expand_args: dict = {},
    log: bool = False,
    # Terms
    term: Optional[dict] = None,
    terms: List[dict] = [],
    max_terms: int = DEFAULT_MAX_TERMS,
    # Codes
    code: Optional[Union[str, List[str]]] = None,
    display: Optional[Union[str, List[str]]] = None,
    system: Optional[Union[str, List[str]]] = None,
    code_fields: List[str] = [],
):
    """Retrieve records

    Attributes
    ----------
    all_results : bool = False
        Retrieve a sample of results (10) or the entire set of records

    raw : bool = False
        If raw, then values will not be expanded (useful for manual
        inspection if something goes wrong)

    id : None or str = None
        Find records for a given id

    ids : List[str]
        Find records for given ids

    patient_id : None or str = None
        Find records for a given patient_id

    patient_ids : List[str]
        Find records for given patient_ids

    device_id: None or str = None
        Find records for a given device_id

    device_ids: List[str]
        Find records for given device_ids

    page_size : int
        The number of records to fetch per page

    max_pages : int
        The number of pages to retrieve (useful if working with tons of records)

    query_overrides : dict = {}
        Override any part of the elasticsearch FHIR query

    auth_args : Any
        The authentication to use for the account and project (defaults to shared)

    ignore_cache : bool = False
        Bypass the caching system that auto-saves results to a CSV file.
        Caching only occurs when all results are being retrieved.

    expand_args : Any
        Additional arguments passed to phc.Frame.expand

    log : bool = False
        Whether to log some diagnostic statements for debugging

    max_terms : int
        Maximum terms per query clause before chunking into multiple requests

    term : dict
        Add an arbitrary ES term to the query (includes chunking)

    terms : List[dict]
        Add multiple arbitrary ES terms to the query (includes chunking)

    code : str | List[str]
        Adds where clause for code value(s)

    display : str | List[str]
        Adds where clause for code display value(s)

    system : str | List[str]
        Adds where clause for code system value(s)

    code_fields : List[str]
        A list of paths to find FHIR codes in (default: codes for the given entity)

    Examples
    --------
    >>> import phc.easy as phc
    >>> phc.Auth.set({'account': '<your-account-name>'})
    >>> phc.Project.set_current('My Project Name')
    >>>
    >>> phc.Observation.get_data_frame(patient_id='<patient-id>')
    >>>
    >>> phc.Goal.get_data_frame(patient_id='<patient-id>')
    """
    query = {
        "type": "select",
        "columns": "*",
        "from": [{"table": cls.table_name()}],
    }

    code_fields = [*cls.code_fields(), *code_fields]

    def transform(df: pd.DataFrame):
        return cls.transform_results(df, **expand_args)

    # TODO: As of Feb 2023, only observations with patient references are indexed.
    # So in order to query for observations associated with a device we have to set the
    # device_id to the patient_id.
    # This is a workaround and should be fixed once The Platform properly indexes devices.
    if device_id:
        patient_id = device_id

    if device_ids:
        patient_ids = device_ids

    return Query.execute_fhir_dsl_with_options(
        query,
        transform,
        all_results,
        raw,
        query_overrides,
        auth_args,
        ignore_cache,
        id=id,
        ids=ids,
        patient_id=patient_id,
        patient_ids=patient_ids,
        page_size=page_size,
        max_pages=max_pages,
        patient_key=cls.patient_key(),
        log=log,
        patient_id_prefixes=cls.patient_id_prefixes(),
        # Terms
        term=term,
        terms=terms,
        max_terms=max_terms,
        # Codes
        code_fields=code_fields,
        code=code,
        display=display,
        system=system,
    )
def patient_id_prefixes() ‑> List[str]
Expand source code
@staticmethod
def patient_id_prefixes() -> List[str]:
    return ["Patient/"]
def patient_key() ‑> str
Expand source code
@staticmethod
def patient_key() -> str:
    return "subject.reference"
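
A subclass whose FHIR resource references the patient through a different element could override these methods; a hypothetical sketch (the table name, reference path, and prefix list are illustrative only):

from typing import List

from phc.easy.abstract.fhir_service_patient_item import FhirServicePatientItem

class PatientLinkedItem(FhirServicePatientItem):
    @staticmethod
    def table_name() -> str:
        # Illustrative FSS table name
        return "document_reference"

    @staticmethod
    def patient_key() -> str:
        # Illustrative path used to join records to patients
        return "patient.reference"

    @staticmethod
    def patient_id_prefixes() -> List[str]:
        # Illustrative prefixes matched when filtering by patient id
        return ["Patient/"]
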
def table_name() ‑> str

Inherited from: FhirServiceItem.table_name

Returns the FSS table name for retrieval

def transform_results(data_frame: pandas.core.frame.DataFrame, **_expand_args)

Inherited from: FhirServiceItem.transform_results

Transform data frame batch

class Frame
Expand source code
class Frame:
    @staticmethod
    @curry
    def _find_index_of_similar(columns: List[str], column_name: str):
        "Find sort order by original frame column names"
        MAX_INDEX = len(columns)

        return next(
            filter(
                lambda pair: pair[1] in column_name,
                # Start from reverse end since later columns might be longer
                reversed(list(enumerate(columns))),
            ),
            (MAX_INDEX, None),
        )[0]

    @staticmethod
    def codeable_like_column_expander(column_name: str):
        """Codeable expansion with prefix for passing to Frame.expand#custom_columns"""

        def _expander(column):
            return Codeable.expand_column(column).add_prefix(f"{column_name}.")

        return (column_name, _expander)

    @staticmethod
    def expand(
        frame: pd.DataFrame,
        code_columns: List[str] = [],
        date_columns: List[str] = [],
        custom_columns: List[
            Tuple[str, Callable[[pd.Series], pd.DataFrame]]
        ] = [],
    ):
        """Expand a data frame with FHIR codes, nested JSON structures, etc into a full,
        tabular data frame that can much more easily be wrangled

        Attributes
        ----------
        frame : pd.DataFrame
            The data frame to expand

        code_columns : List[str]
            The list of column names that contain code-like data (e.g. FHIR dictionaries)

        date_columns : List[str]
            The list of column names that contain dates (values may or may not be parseable)

        custom_columns : List[Tuple[str, Callable[[pd.Series], pd.DataFrame]]]
            A list of tuples with the column name and a function that expands a
            column to a data frame. This will get merged index-wise into the
            combined frame

        """
        all_code_columns = [*CODE_COLUMNS, *code_columns]
        all_date_columns = [*DATE_COLUMNS, *date_columns]

        codeable_column_names = [
            key for key in all_code_columns if key in frame.columns
        ]

        custom_names = [
            key for key, _func in custom_columns if key in frame.columns
        ]

        code_frames = [
            (Codeable.expand_column(frame[col_name]).add_prefix(f"{col_name}."))
            for col_name in codeable_column_names
        ]

        columns = [
            frame.drop([*codeable_column_names, *custom_names], axis=1),
            *[
                (column_to_frame(frame, key, func))
                for key, func in custom_columns
            ],
            *code_frames,
        ]

        combined = pd.concat(columns, axis=1)

        date_column_names = list(
            filter(lambda k: k in combined.columns, all_date_columns)
        )

        # Mutate data frame to parse date columns
        for column_key in date_column_names:
            local_key = f"{column_key}.local"
            tz_key = f"{column_key}.tz"

            try:
                utc = pd.to_datetime(combined[column_key], utc=True)

                # Cleverness: Use regex to remove TZ and parse as utc=True to
                # produce local datetime. The column name will have ".local" as
                # suffix so it'll be clear what's happening.
                localized = pd.to_datetime(
                    combined[column_key].str.replace(TZ_REGEX, ""), utc=True
                )
            except pd.errors.OutOfBoundsDatetime as ex:
                print(
                    "[WARNING]: OutOfBoundsDatetime encountered. Casting to NaT.",
                    ex,
                )
                utc = pd.to_datetime(
                    combined[column_key], utc=True, errors="coerce"
                )
                localized = pd.to_datetime(
                    combined[column_key].str.replace(TZ_REGEX, ""),
                    utc=True,
                    errors="coerce",
                )

            combined[tz_key] = (localized - utc).dt.total_seconds() / 3600
            combined[local_key] = localized

        # Drop duplicate columns (nicety for same transform applied to cache)
        # Sort columns by original order (where possible)
        return combined.loc[:, ~combined.columns.duplicated()].reindex(
            sorted(
                [
                    c
                    for c in combined.columns.unique()
                    if c not in date_column_names
                ],
                key=Frame._find_index_of_similar(frame.columns),
            ),
            axis="columns",
        )

Static methods

def codeable_like_column_expander(column_name: str)

Codeable expansion with prefix for passing to Frame.expand#custom_columns
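
A hypothetical usage sketch (the data frame `frame` and the "valueCodeableConcept" column name are placeholders):

>>> import phc.easy as phc
>>> phc.Frame.expand(
...     frame,
...     custom_columns=[phc.Frame.codeable_like_column_expander("valueCodeableConcept")],
... )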

Expand source code
@staticmethod
def codeable_like_column_expander(column_name: str):
    """Codeable expansion with prefix for passing to Frame.expand#custom_columns"""

    def _expander(column):
        return Codeable.expand_column(column).add_prefix(f"{column_name}.")

    return (column_name, _expander)
def expand(frame: pandas.core.frame.DataFrame, code_columns: List[str] = [], date_columns: List[str] = [], custom_columns: List[Tuple[str, Callable[[pandas.core.series.Series], pandas.core.frame.DataFrame]]] = [])

Expand a data frame with FHIR codes, nested JSON structures, etc into a full, tabular data frame that can much more easily be wrangled

Attributes

frame : pd.DataFrame
The data frame to expand
code_columns : List[str]
The list of column names that contain code-like data (e.g. FHIR dictionaries)
date_columns : List[str]
The list of column names that contain dates (values may or may not be parseable)
custom_columns : List[Tuple[str, Callable[[pd.Series], pd.DataFrame]]]
A list of tuples with the column name and a function that expands a column to a data frame. This will get merged index-wise into the combined frame
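
A minimal usage sketch (the sample frame and its column names are hypothetical; "myCode" and "myDate" stand in for codeable and date columns, and the exact expanded output depends on Codeable):

>>> import pandas as pd
>>> import phc.easy as phc
>>>
>>> sample = pd.DataFrame([{
...     "id": "example-1",
...     "myCode": {"coding": [{"code": "1234-5", "display": "Example"}]},
...     "myDate": "2021-01-01T12:00:00-05:00",
... }])
>>> phc.Frame.expand(sample, code_columns=["myCode"], date_columns=["myDate"])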
Expand source code
@staticmethod
def expand(
    frame: pd.DataFrame,
    code_columns: List[str] = [],
    date_columns: List[str] = [],
    custom_columns: List[
        Tuple[str, Callable[[pd.Series], pd.DataFrame]]
    ] = [],
):
    """Expand a data frame with FHIR codes, nested JSON structures, etc into a full,
    tabular data frame that can much more easily be wrangled

    Attributes
    ----------
    frame : pd.DataFrame
        The data frame to expand

    code_columns : List[str]
        The list of column names that contain code-like data (e.g. FHIR dictionaries)

    date_columns : List[str]
        The list of column names that contain dates (values may or may not be parseable)

    custom_columns : List[Tuple[str, Callable[[pd.Series], pd.DataFrame]]]
        A list of tuples with the column name and a function that expands a
        column to a data frame. This will get merged index-wise into the
        combined frame

    """
    all_code_columns = [*CODE_COLUMNS, *code_columns]
    all_date_columns = [*DATE_COLUMNS, *date_columns]

    codeable_column_names = [
        key for key in all_code_columns if key in frame.columns
    ]

    custom_names = [
        key for key, _func in custom_columns if key in frame.columns
    ]

    code_frames = [
        (Codeable.expand_column(frame[col_name]).add_prefix(f"{col_name}."))
        for col_name in codeable_column_names
    ]

    columns = [
        frame.drop([*codeable_column_names, *custom_names], axis=1),
        *[
            (column_to_frame(frame, key, func))
            for key, func in custom_columns
        ],
        *code_frames,
    ]

    combined = pd.concat(columns, axis=1)

    date_column_names = list(
        filter(lambda k: k in combined.columns, all_date_columns)
    )

    # Mutate data frame to parse date columns
    for column_key in date_column_names:
        local_key = f"{column_key}.local"
        tz_key = f"{column_key}.tz"

        try:
            utc = pd.to_datetime(combined[column_key], utc=True)

            # Cleverness: Use regex to remove TZ and parse as utc=True to
            # produce local datetime. The column name will have ".local" as
            # suffix so it'll be clear what's happening.
            localized = pd.to_datetime(
                combined[column_key].str.replace(TZ_REGEX, ""), utc=True
            )
        except pd.errors.OutOfBoundsDatetime as ex:
            print(
                "[WARNING]: OutOfBoundsDatetime encountered. Casting to NaT.",
                ex,
            )
            utc = pd.to_datetime(
                combined[column_key], utc=True, errors="coerce"
            )
            localized = pd.to_datetime(
                combined[column_key].str.replace(TZ_REGEX, ""),
                utc=True,
                errors="coerce",
            )

        combined[tz_key] = (localized - utc).dt.total_seconds() / 3600
        combined[local_key] = localized

    # Drop duplicate columns (nicety for same transform applied to cache)
    # Sort columns by original order (where possible)
    return combined.loc[:, ~combined.columns.duplicated()].reindex(
        sorted(
            [
                c
                for c in combined.columns.unique()
                if c not in date_column_names
            ],
            key=Frame._find_index_of_similar(frame.columns),
        ),
        axis="columns",
    )
class Gene
Expand source code
class Gene:
    def get_data_frame(search: str = "", auth_args: Auth = Auth.shared()):
        auth = Auth(auth_args)
        client = BaseClient(auth.session())

        response = client._api_call(
            "knowledge/genes",
            http_verb="GET",
            params={"datasetId": auth.project_id, "gene": search},
        )

        frame = pd.DataFrame(response.data["items"])

        if "alias" in frame.columns:
            frame["alias"] = frame.alias.apply(
                lambda aliases: ",".join(aliases)
                if isinstance(aliases, list)
                else None
            )

        # We choose to not expand topCancerDrivers and cancerDrivers since it
        # can easily have 50 values in each. If we really need those, the user
        # will have to extract those.
        return frame

Methods

def get_data_frame(search: str = '', auth_args: Auth = <phc.easy.auth.Auth object>)
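
Returns a data frame of gene records from the knowledge API, optionally filtered by a search string; a hypothetical sketch (the search term is a placeholder):

>>> import phc.easy as phc
>>> phc.Auth.set({'account': '<your-account-name>'})
>>> phc.Project.set_current('My Project Name')
>>>
>>> phc.Gene.get_data_frame(search='BRCA')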
Expand source code
def get_data_frame(search: str = "", auth_args: Auth = Auth.shared()):
    auth = Auth(auth_args)
    client = BaseClient(auth.session())

    response = client._api_call(
        "knowledge/genes",
        http_verb="GET",
        params={"datasetId": auth.project_id, "gene": search},
    )

    frame = pd.DataFrame(response.data["items"])

    if "alias" in frame.columns:
        frame["alias"] = frame.alias.apply(
            lambda aliases: ",".join(aliases)
            if isinstance(aliases, list)
            else None
        )

    # We choose to not expand topCancerDrivers and cancerDrivers since it
    # can easily have 50 values in each. If we really need those, the user
    # will have to extract those.
    return frame
class GeneSet
Expand source code
class GeneSet:
    def get_data_frame(auth_args: Auth = Auth.shared()):
        auth = Auth(auth_args)
        client = BaseClient(auth.session())

        response = client._api_call(
            "knowledge/gene-sets",
            http_verb="GET",
            params={"datasetId": auth.project_id},
        )

        frame = pd.DataFrame(response.data["items"])

        if "genes" in frame.columns:
            frame["genes"] = frame.genes.apply(
                lambda genes: ",".join([d["gene"] for d in genes])
            )

        frame = frame.drop(columns=["datasetId"], errors="ignore")

        return frame

Methods

def get_data_frame(auth_args: Auth = <phc.easy.auth.Auth object>)
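
Returns a data frame of gene sets for the current project; a minimal sketch (assumes an account and project are already selected):

>>> import phc.easy as phc
>>> phc.Auth.set({'account': '<your-account-name>'})
>>> phc.Project.set_current('My Project Name')
>>>
>>> phc.GeneSet.get_data_frame()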
Expand source code
def get_data_frame(auth_args: Auth = Auth.shared()):
    auth = Auth(auth_args)
    client = BaseClient(auth.session())

    response = client._api_call(
        "knowledge/gene-sets",
        http_verb="GET",
        params={"datasetId": auth.project_id},
    )

    frame = pd.DataFrame(response.data["items"])

    if "genes" in frame.columns:
        frame["genes"] = frame.genes.apply(
            lambda genes: ",".join([d["gene"] for d in genes])
        )

    frame = frame.drop(columns=["datasetId"], errors="ignore")

    return frame
class GenomicCopyNumberVariant
Expand source code
class GenomicCopyNumberVariant(GenomicVariant):
    @staticmethod
    def resource_path():
        return "genomics/copy-numbers"

    @staticmethod
    def params_class():
        return GenomicCopyNumberVariantOptions

    @staticmethod
    def transform_results(data_frame: pd.DataFrame, params={}, **expand_args):
        def expand_id(id_column: pd.Series):
            return pd.concat(
                [
                    id_column,
                    id_column.str.split(":", expand=True).rename(
                        columns={0: "variant_set_id"}
                    )["variant_set_id"],
                ],
                axis=1,
            )

        args = {
            **expand_args,
            "custom_columns": [
                *expand_args.get("custom_columns", []),
                ("id", expand_id),
            ],
        }

        return Frame.expand(data_frame, **args)

    @classmethod
    def get_data_frame(
        cls,
        # Query parameters
        variant_set_ids: List[str] = [],
        include: List[GenomicVariantInclude] = [],
        gene: List[str] = [],
        interpretation: List[str] = [],
        effect: List[CopyNumberStatus] = [],
        in_ckb: Optional[bool] = None,
        # Test parameters
        patient_id: Optional[str] = None,
        test_status: Optional[GenomicTestStatus] = GenomicTestStatus.ACTIVE,
        # Execution parameters,
        all_results: bool = False,
        auth_args: Auth = Auth.shared(),
        max_pages: Optional[int] = None,
        page_size: Optional[int] = None,
        log: bool = False,
        **kw_args,
    ):
        """Execute a request for genomic copy number variants

        ## Parameters

        Query: `phc.easy.omics.options.genomic_copy_number_variant.GenomicCopyNumberVariantOptions`

        Execution: `phc.easy.query.Query.execute_paging_api`

        Expansion: `phc.easy.frame.Frame.expand`
        """

        args = cls._get_current_args(inspect.currentframe(), locals())

        return super().get_data_frame(
            test_type=GenomicTestType.COPY_NUMBER_VARIANT, **{**kw_args, **args}
        )

Ancestors

Static methods

def get_data_frame(variant_set_ids: List[str] = [], include: List[GenomicVariantInclude] = [], gene: List[str] = [], interpretation: List[str] = [], effect: List[CopyNumberStatus] = [], in_ckb: Optional[bool] = None, patient_id: Optional[str] = None, test_status: Optional[GenomicTestStatus] = GenomicTestStatus.ACTIVE, all_results: bool = False, auth_args: Auth = <phc.easy.auth.Auth object>, max_pages: Optional[int] = None, page_size: Optional[int] = None, log: bool = False, **kw_args)

Execute a request for genomic copy number variants

Parameters

Query: GenomicCopyNumberVariantOptions

Execution: Query.execute_paging_api()

Expansion: Frame.expand()
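
A hypothetical usage sketch (the patient id and gene list are placeholders):

>>> import phc.easy as phc
>>> phc.Auth.set({'account': '<your-account-name>'})
>>> phc.Project.set_current('My Project Name')
>>>
>>> phc.GenomicCopyNumberVariant.get_data_frame(patient_id='<patient-id>', gene=['TP53'], max_pages=1)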

Expand source code
@classmethod
def get_data_frame(
    cls,
    # Query parameters
    variant_set_ids: List[str] = [],
    include: List[GenomicVariantInclude] = [],
    gene: List[str] = [],
    interpretation: List[str] = [],
    effect: List[CopyNumberStatus] = [],
    in_ckb: Optional[bool] = None,
    # Test parameters
    patient_id: Optional[str] = None,
    test_status: Optional[GenomicTestStatus] = GenomicTestStatus.ACTIVE,
    # Execution parameters,
    all_results: bool = False,
    auth_args: Auth = Auth.shared(),
    max_pages: Optional[int] = None,
    page_size: Optional[int] = None,
    log: bool = False,
    **kw_args,
):
    """Execute a request for genomic copy number variants

    ## Parameters

    Query: `phc.easy.omics.options.genomic_copy_number_variant.GenomicCopyNumberVariantOptions`

    Execution: `phc.easy.query.Query.execute_paging_api`

    Expansion: `phc.easy.frame.Frame.expand`
    """

    args = cls._get_current_args(inspect.currentframe(), locals())

    return super().get_data_frame(
        test_type=GenomicTestType.COPY_NUMBER_VARIANT, **{**kw_args, **args}
    )
def params_class()

Inherited from: GenomicVariant.params_class

Returns a pydantic type that validates and transforms the params with dict()

Expand source code
@staticmethod
def params_class():
    return GenomicCopyNumberVariantOptions
def process_params(params: dict) ‑> dict

Inherited from: GenomicVariant.process_params

Validates and transforms the API query parameters

def resource_path()

Inherited from: GenomicVariant.resource_path

Returns the API url name for retrieval

Expand source code
@staticmethod
def resource_path():
    return "genomics/copy-numbers"
def transform_results(data_frame: pandas.core.frame.DataFrame, params={}, **expand_args)

Inherited from: GenomicVariant.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(data_frame: pd.DataFrame, params={}, **expand_args):
    def expand_id(id_column: pd.Series):
        return pd.concat(
            [
                id_column,
                id_column.str.split(":", expand=True).rename(
                    columns={0: "variant_set_id"}
                )["variant_set_id"],
            ],
            axis=1,
        )

    args = {
        **expand_args,
        "custom_columns": [
            *expand_args.get("custom_columns", []),
            ("id", expand_id),
        ],
    }

    return Frame.expand(data_frame, **args)
class GenomicExpression
Expand source code
class GenomicExpression(GenomicVariant):
    @staticmethod
    def resource_path():
        return "genomics/expressions"

    @staticmethod
    def params_class():
        return GenomicExpressionOptions

    @staticmethod
    def transform_results(data_frame: pd.DataFrame, params={}, **expand_args):
        def expand_id(id_column: pd.Series):
            return pd.concat(
                [
                    id_column,
                    id_column.str.split(":", expand=True).rename(
                        columns={0: "variant_set_id"}
                    )["variant_set_id"],
                ],
                axis=1,
            )

        args = {
            **expand_args,
            "custom_columns": [
                *expand_args.get("custom_columns", []),
                ("id", expand_id),
            ],
        }

        return Frame.expand(data_frame, **args)

    @classmethod
    def get_data_frame(
        cls,
        # Query parameters
        variant_set_ids: List[str] = [],
        include: List[GenomicVariantInclude] = [],
        gene: List[str] = [],
        expression: Optional[str] = None,
        outlier_std_dev: str = None,
        in_ckb: Optional[bool] = None,
        order_by: Optional[str] = None,
        # Execution parameters,
        all_results: bool = False,
        auth_args: Auth = Auth.shared(),
        max_pages: Optional[int] = None,
        page_size: Optional[int] = None,
        log: bool = False,
        **kw_args,
    ):
        """Execute a request for genomic expression

        ## Parameters

        Query: `phc.easy.omics.options.genomic_expression.GenomicExpressionOptions`

        Execution: `phc.easy.query.Query.execute_paging_api`

        Expansion: `phc.easy.frame.Frame.expand`

        """

        args = cls._get_current_args(inspect.currentframe(), locals())

        return super().get_data_frame(
            test_type=GenomicTestType.EXPRESSION, **{**kw_args, **args}
        )

Ancestors

Static methods

def get_data_frame(variant_set_ids: List[str] = [], include: List[GenomicVariantInclude] = [], gene: List[str] = [], expression: Optional[str] = None, outlier_std_dev: str = None, in_ckb: Optional[bool] = None, order_by: Optional[str] = None, all_results: bool = False, auth_args: Auth = <phc.easy.auth.Auth object>, max_pages: Optional[int] = None, page_size: Optional[int] = None, log: bool = False, **kw_args)

Execute a request for genomic expression

Parameters

Query: GenomicExpressionOptions

Execution: Query.execute_paging_api()

Expansion: Frame.expand()
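
A hypothetical usage sketch (the gene symbol is a placeholder):

>>> import phc.easy as phc
>>> phc.Auth.set({'account': '<your-account-name>'})
>>> phc.Project.set_current('My Project Name')
>>>
>>> phc.GenomicExpression.get_data_frame(gene=['EGFR'], max_pages=1)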

Expand source code
@classmethod
def get_data_frame(
    cls,
    # Query parameters
    variant_set_ids: List[str] = [],
    include: List[GenomicVariantInclude] = [],
    gene: List[str] = [],
    expression: Optional[str] = None,
    outlier_std_dev: str = None,
    in_ckb: Optional[bool] = None,
    order_by: Optional[str] = None,
    # Execution parameters,
    all_results: bool = False,
    auth_args: Auth = Auth.shared(),
    max_pages: Optional[int] = None,
    page_size: Optional[int] = None,
    log: bool = False,
    **kw_args,
):
    """Execute a request for genomic expression

    ## Parameters

    Query: `phc.easy.omics.options.genomic_expression.GenomicExpressionOptions`

    Execution: `phc.easy.query.Query.execute_paging_api`

    Expansion: `phc.easy.frame.Frame.expand`

    """

    args = cls._get_current_args(inspect.currentframe(), locals())

    return super().get_data_frame(
        test_type=GenomicTestType.EXPRESSION, **{**kw_args, **args}
    )
def params_class()

Inherited from: GenomicVariant.params_class

Returns a pydantic type that validates and transforms the params with dict()

Expand source code
@staticmethod
def params_class():
    return GenomicExpressionOptions
def process_params(params: dict) ‑> dict

Inherited from: GenomicVariant.process_params

Validates and transforms the API query parameters

def resource_path()

Inherited from: GenomicVariant.resource_path

Returns the API url name for retrieval

Expand source code
@staticmethod
def resource_path():
    return "genomics/expressions"
def transform_results(data_frame: pandas.core.frame.DataFrame, params={}, **expand_args)

Inherited from: GenomicVariant.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(data_frame: pd.DataFrame, params={}, **expand_args):
    def expand_id(id_column: pd.Series):
        return pd.concat(
            [
                id_column,
                id_column.str.split(":", expand=True).rename(
                    columns={0: "variant_set_id"}
                )["variant_set_id"],
            ],
            axis=1,
        )

    args = {
        **expand_args,
        "custom_columns": [
            *expand_args.get("custom_columns", []),
            ("id", expand_id),
        ],
    }

    return Frame.expand(data_frame, **args)
class GenomicShortVariant
Expand source code
class GenomicShortVariant(GenomicVariant):
    @staticmethod
    def resource_path():
        return "genomics/variants"

    @staticmethod
    def params_class():
        return GenomicShortVariantOptions

    @staticmethod
    def transform_results(data_frame: pd.DataFrame, params={}, **expand_args):
        def expand_id(id_column: pd.Series):
            return pd.concat(
                [
                    id_column,
                    id_column.str.split(":", expand=True).rename(
                        columns={0: "variant_set_id", 2: "gene"}
                    )[["variant_set_id", "gene"]],
                ],
                axis=1,
            )

        args = {
            **expand_args,
            "custom_columns": [
                *expand_args.get("custom_columns", []),
                *[
                    Frame.codeable_like_column_expander(k)
                    for k in [
                        "clinvar",
                        "cosmic",
                        "vcf",
                        "ensemblCanon",
                        "dbnsfp",
                    ]
                ],
                ("id", expand_id),
            ],
        }

        return Frame.expand(data_frame, **args)

    @classmethod
    def get_data_frame(
        cls,
        # Query parameters
        variant_set_ids: List[str] = [],
        include: List[GenomicVariantInclude] = ["vcf"],
        gene: List[str] = [],
        rs_id: List[str] = [],
        chromosome: List[Chromosome] = [],
        clinvar_allele_id: List[str] = [],
        clinvar_disease: List[str] = [],
        clinvar_review: List[ClinVarReview] = [],
        clinvar_significance: List[ClinVarSignificance] = [],
        cosmic_id: List[str] = [],
        cosmic_status: List[str] = [],
        cosmic_histology: List[str] = [],
        cosmic_tumor_site: List[str] = [],
        variant_class: List[str] = [],
        coding_effect: List[CodingEffect] = [],
        impact: List[str] = [],
        transcript_id: List[str] = [],
        gene_class: List[GeneClass] = [],
        protein_changes: List[str] = [],
        sequence_type: List[str] = [],
        position: List[Union[str, int]] = [],
        cosmic_min_count: Optional[int] = None,
        min_allele_frequency: Optional[str] = None,
        max_allele_frequency: Optional[str] = None,
        pop_allele_frequency: Optional[str] = None,
        exac_allele_frequency: Optional[str] = None,
        exac_homozygous: List[str] = [],
        dbnsfp_damaging_count: List[str] = [],
        dbnsfp_damaging_predictor: List[str] = [],
        dbnsfp_damaging_vote: List[str] = [],
        dbnsfp_fathmm_rankscore: List[str] = [],
        dbnsfp_fathmm_pred: List[str] = [],
        dbnsfp_mean_rankscore: List[str] = [],
        dbnsfp_mean_rankscore_predictor: List[str] = [],
        dbnsfp_mutationtaster_rankscore: List[str] = [],
        dbnsfp_mutationtaster_pred: List[str] = [],
        dbnsfp_sift_rankscore: List[str] = [],
        dbnsfp_sift_pred: List[str] = [],
        zygosity: List[Zygosity] = [],
        genotype: List[str] = [],
        variant_allele_frequency: List[str] = [],
        quality: List[str] = [],
        read_depth: List[str] = [],
        alt_read_depth: List[str] = [],
        ref_read_depth: List[str] = [],
        variant_filter: List[str] = [],
        in_ckb: Optional[bool] = None,
        # Test parameters
        patient_id: Optional[str] = None,
        test_status: Optional[GenomicTestStatus] = GenomicTestStatus.ACTIVE,
        # Execution parameters,
        all_results: bool = False,
        auth_args: Auth = Auth.shared(),
        max_pages: Optional[int] = None,
        page_size: Optional[int] = None,
        log: bool = False,
        **kw_args,
    ):
        """Execute a request for genomic short variants

        ## Parameters

        Query: `phc.easy.omics.options.genomic_short_variant.GenomicShortVariantOptions`

        Execution: `phc.easy.query.Query.execute_paging_api`

        Expansion: `phc.easy.frame.Frame.expand`

        NOTE:
         - `variant_class` is translated to `class` as a parameter
         - `variant_filter` is translated to `filter` as a parameter
        """

        args = cls._get_current_args(inspect.currentframe(), locals())

        return super().get_data_frame(
            test_type=GenomicTestType.SHORT_VARIANT, **{**kw_args, **args}
        )

Ancestors

Static methods

def get_data_frame(variant_set_ids: List[str] = [], include: List[GenomicVariantInclude] = ['vcf'], gene: List[str] = [], rs_id: List[str] = [], chromosome: List[Chromosome] = [], clinvar_allele_id: List[str] = [], clinvar_disease: List[str] = [], clinvar_review: List[ClinVarReview] = [], clinvar_significance: List[ClinVarSignificance] = [], cosmic_id: List[str] = [], cosmic_status: List[str] = [], cosmic_histology: List[str] = [], cosmic_tumor_site: List[str] = [], variant_class: List[str] = [], coding_effect: List[CodingEffect] = [], impact: List[str] = [], transcript_id: List[str] = [], gene_class: List[GeneClass] = [], protein_changes: List[str] = [], sequence_type: List[str] = [], position: List[Union[str, int]] = [], cosmic_min_count: Optional[int] = None, min_allele_frequency: Optional[str] = None, max_allele_frequency: Optional[str] = None, pop_allele_frequency: Optional[str] = None, exac_allele_frequency: Optional[str] = None, exac_homozygous: List[str] = [], dbnsfp_damaging_count: List[str] = [], dbnsfp_damaging_predictor: List[str] = [], dbnsfp_damaging_vote: List[str] = [], dbnsfp_fathmm_rankscore: List[str] = [], dbnsfp_fathmm_pred: List[str] = [], dbnsfp_mean_rankscore: List[str] = [], dbnsfp_mean_rankscore_predictor: List[str] = [], dbnsfp_mutationtaster_rankscore: List[str] = [], dbnsfp_mutationtaster_pred: List[str] = [], dbnsfp_sift_rankscore: List[str] = [], dbnsfp_sift_pred: List[str] = [], zygosity: List[Zygosity] = [], genotype: List[str] = [], variant_allele_frequency: List[str] = [], quality: List[str] = [], read_depth: List[str] = [], alt_read_depth: List[str] = [], ref_read_depth: List[str] = [], variant_filter: List[str] = [], in_ckb: Optional[bool] = None, patient_id: Optional[str] = None, test_status: Optional[GenomicTestStatus] = GenomicTestStatus.ACTIVE, all_results: bool = False, auth_args: Auth = <phc.easy.auth.Auth object>, max_pages: Optional[int] = None, page_size: Optional[int] = None, log: bool = False, **kw_args)

Execute a request for genomic short variants

Parameters

Query: GenomicShortVariantOptions

Execution: Query.execute_paging_api()

Expansion: Frame.expand()

NOTE:
- variant_class is translated to class as a parameter
- variant_filter is translated to filter as a parameter
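
A hypothetical usage sketch (the patient id and gene list are placeholders):

>>> import phc.easy as phc
>>> phc.Auth.set({'account': '<your-account-name>'})
>>> phc.Project.set_current('My Project Name')
>>>
>>> phc.GenomicShortVariant.get_data_frame(patient_id='<patient-id>', gene=['BRAF'], max_pages=1)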

Expand source code
@classmethod
def get_data_frame(
    cls,
    # Query parameters
    variant_set_ids: List[str] = [],
    include: List[GenomicVariantInclude] = ["vcf"],
    gene: List[str] = [],
    rs_id: List[str] = [],
    chromosome: List[Chromosome] = [],
    clinvar_allele_id: List[str] = [],
    clinvar_disease: List[str] = [],
    clinvar_review: List[ClinVarReview] = [],
    clinvar_significance: List[ClinVarSignificance] = [],
    cosmic_id: List[str] = [],
    cosmic_status: List[str] = [],
    cosmic_histology: List[str] = [],
    cosmic_tumor_site: List[str] = [],
    variant_class: List[str] = [],
    coding_effect: List[CodingEffect] = [],
    impact: List[str] = [],
    transcript_id: List[str] = [],
    gene_class: List[GeneClass] = [],
    protein_changes: List[str] = [],
    sequence_type: List[str] = [],
    position: List[Union[str, int]] = [],
    cosmic_min_count: Optional[int] = None,
    min_allele_frequency: Optional[str] = None,
    max_allele_frequency: Optional[str] = None,
    pop_allele_frequency: Optional[str] = None,
    exac_allele_frequency: Optional[str] = None,
    exac_homozygous: List[str] = [],
    dbnsfp_damaging_count: List[str] = [],
    dbnsfp_damaging_predictor: List[str] = [],
    dbnsfp_damaging_vote: List[str] = [],
    dbnsfp_fathmm_rankscore: List[str] = [],
    dbnsfp_fathmm_pred: List[str] = [],
    dbnsfp_mean_rankscore: List[str] = [],
    dbnsfp_mean_rankscore_predictor: List[str] = [],
    dbnsfp_mutationtaster_rankscore: List[str] = [],
    dbnsfp_mutationtaster_pred: List[str] = [],
    dbnsfp_sift_rankscore: List[str] = [],
    dbnsfp_sift_pred: List[str] = [],
    zygosity: List[Zygosity] = [],
    genotype: List[str] = [],
    variant_allele_frequency: List[str] = [],
    quality: List[str] = [],
    read_depth: List[str] = [],
    alt_read_depth: List[str] = [],
    ref_read_depth: List[str] = [],
    variant_filter: List[str] = [],
    in_ckb: Optional[bool] = None,
    # Test parameters
    patient_id: Optional[str] = None,
    test_status: Optional[GenomicTestStatus] = GenomicTestStatus.ACTIVE,
    # Execution parameters,
    all_results: bool = False,
    auth_args: Auth = Auth.shared(),
    max_pages: Optional[int] = None,
    page_size: Optional[int] = None,
    log: bool = False,
    **kw_args,
):
    """Execute a request for genomic short variants

    ## Parameters

    Query: `phc.easy.omics.options.genomic_short_variant.GenomicShortVariantOptions`

    Execution: `phc.easy.query.Query.execute_paging_api`

    Expansion: `phc.easy.frame.Frame.expand`

    NOTE:
     - `variant_class` is translated to `class` as a parameter
     - `variant_filter` is translated to `filter` as a parameter
    """

    args = cls._get_current_args(inspect.currentframe(), locals())

    return super().get_data_frame(
        test_type=GenomicTestType.SHORT_VARIANT, **{**kw_args, **args}
    )
def params_class()

Inherited from: GenomicVariant.params_class

Returns a pydantic type that validates and transforms the params with dict()

Expand source code
@staticmethod
def params_class():
    return GenomicShortVariantOptions
def process_params(params: dict) ‑> dict

Inherited from: GenomicVariant.process_params

Validates and transforms the API query parameters

def resource_path()

Inherited from: GenomicVariant.resource_path

Returns the API url name for retrieval

Expand source code
@staticmethod
def resource_path():
    return "genomics/variants"
def transform_results(data_frame: pandas.core.frame.DataFrame, params={}, **expand_args)

Inherited from: GenomicVariant.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(data_frame: pd.DataFrame, params={}, **expand_args):
    def expand_id(id_column: pd.Series):
        return pd.concat(
            [
                id_column,
                id_column.str.split(":", expand=True).rename(
                    columns={0: "variant_set_id", 2: "gene"}
                )[["variant_set_id", "gene"]],
            ],
            axis=1,
        )

    args = {
        **expand_args,
        "custom_columns": [
            *expand_args.get("custom_columns", []),
            *[
                Frame.codeable_like_column_expander(k)
                for k in [
                    "clinvar",
                    "cosmic",
                    "vcf",
                    "ensemblCanon",
                    "dbnsfp",
                ]
            ],
            ("id", expand_id),
        ],
    }

    return Frame.expand(data_frame, **args)
class GenomicStructuralVariant
Expand source code
class GenomicStructuralVariant(GenomicVariant):
    @staticmethod
    def resource_path():
        return "genomics/structural-variants"

    @staticmethod
    def params_class():
        return GenomicStructuralVariantOptions

    @staticmethod
    def transform_results(data_frame: pd.DataFrame, params={}, **expand_args):
        def expand_id(id_column: pd.Series):
            return pd.concat(
                [
                    id_column,
                    id_column.str.split(":", expand=True).rename(
                        columns={0: "variant_set_id"}
                    )["variant_set_id"],
                ],
                axis=1,
            )

        args = {
            **expand_args,
            "custom_columns": [
                *expand_args.get("custom_columns", []),
                ("id", expand_id),
            ],
        }

        return Frame.expand(data_frame, **args)

    @classmethod
    def get_data_frame(
        cls,
        # Query parameters
        variant_set_ids: List[str] = [],
        gene: List[str] = [],
        effect: List[StructuralType] = [],
        interpretation: List[str] = [],
        in_frame: List[InFrame] = [],
        in_ckb: Optional[bool] = None,
        include: List[GenomicVariantInclude] = [],
        # Execution parameters,
        all_results: bool = False,
        auth_args: Auth = Auth.shared(),
        max_pages: Optional[int] = None,
        page_size: Optional[int] = None,
        log: bool = False,
        **kw_args,
    ):
        """Execute a request for genomic structural variants

        ## Parameters

        Query: `phc.easy.omics.options.genomic_structural_variant.GenomicStructuralVariantOptions`

        Execution: `phc.easy.query.Query.execute_paging_api`

        Expansion: `phc.easy.frame.Frame.expand`

        """

        args = cls._get_current_args(inspect.currentframe(), locals())

        return super().get_data_frame(
            test_type=GenomicTestType.STRUCTURAL_VARIANT, **{**kw_args, **args}
        )

Ancestors

Static methods

def get_data_frame(variant_set_ids: List[str] = [], gene: List[str] = [], effect: List[StructuralType] = [], interpretation: List[str] = [], in_frame: List[InFrame] = [], in_ckb: Optional[bool] = None, include: List[GenomicVariantInclude] = [], all_results: bool = False, auth_args: Auth = <phc.easy.auth.Auth object>, max_pages: Optional[int] = None, page_size: Optional[int] = None, log: bool = False, **kw_args)

Execute a request for genomic structural variants

Parameters

Query: GenomicStructuralVariantOptions

Execution: Query.execute_paging_api()

Expansion: Frame.expand()

Expand source code
@classmethod
def get_data_frame(
    cls,
    # Query parameters
    variant_set_ids: List[str] = [],
    gene: List[str] = [],
    effect: List[StructuralType] = [],
    interpretation: List[str] = [],
    in_frame: List[InFrame] = [],
    in_ckb: Optional[bool] = None,
    include: List[GenomicVariantInclude] = [],
    # Execution parameters,
    all_results: bool = False,
    auth_args: Auth = Auth.shared(),
    max_pages: Optional[int] = None,
    page_size: Optional[int] = None,
    log: bool = False,
    **kw_args,
):
    """Execute a request for genomic structural variants

    ## Parameters

    Query: `phc.easy.omics.options.genomic_structural_variant.GenomicStructuralVariantOptions`

    Execution: `phc.easy.query.Query.execute_paging_api`

    Expansion: `phc.easy.frame.Frame.expand`

    """

    args = cls._get_current_args(inspect.currentframe(), locals())

    return super().get_data_frame(
        test_type=GenomicTestType.STRUCTURAL_VARIANT, **{**kw_args, **args}
    )
def params_class()

Inherited from: GenomicVariant.params_class

Returns a pydantic type that validates and transforms the params with dict()

Expand source code
@staticmethod
def params_class():
    return GenomicStructuralVariantOptions
def process_params(params: dict) ‑> dict

Inherited from: GenomicVariant.process_params

Validates and transforms the API query parameters

def resource_path()

Inherited from: GenomicVariant.resource_path

Returns the API url name for retrieval

Expand source code
@staticmethod
def resource_path():
    return "genomics/structural-variants"
def transform_results(data_frame: pandas.core.frame.DataFrame, params={}, **expand_args)

Inherited from: GenomicVariant.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(data_frame: pd.DataFrame, params={}, **expand_args):
    def expand_id(id_column: pd.Series):
        return pd.concat(
            [
                id_column,
                id_column.str.split(":", expand=True).rename(
                    columns={0: "variant_set_id"}
                )["variant_set_id"],
            ],
            axis=1,
        )

    args = {
        **expand_args,
        "custom_columns": [
            *expand_args.get("custom_columns", []),
            ("id", expand_id),
        ],
    }

    return Frame.expand(data_frame, **args)
class GenomicTest
Expand source code
class GenomicTest(PagingApiItem):
    @staticmethod
    def resource_path():
        return "genomics/projects/{project_id}/tests"

    @staticmethod
    def params_class():
        return GenomicTestOptions

    @staticmethod
    def transform_results(
        data_frame: pd.DataFrame, params: dict, **expand_args
    ):
        args = {
            **expand_args,
            "code_columns": [
                *expand_args.get("code_columns", []),
                "bodySite",
                "patient",
            ],
            "custom_columns": [
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("sourceFile"),
            ],
        }
        df = Frame.expand(data_frame, **args)

        if "sets" in df.columns:
            df = (
                pd.concat(
                    data_frame.apply(
                        lambda x: pd.DataFrame(
                            [{"index": x.name, **s} for s in x.sets]
                        ),
                        axis=1,
                    ).values
                )
                .join(df.drop(["sets"], axis=1), on="index", rsuffix=".test")
                .drop(["index"], axis=1)
                .reset_index(drop=True)
            )

        test_type = params.get("type", None)

        if test_type and len(df) > 0:
            # TODO: Remove when API fixed

            # NOTE: The API does not filter the returned sets because it is a
            # nested structure. Since it's not a boatload of information, we opt
            # to filter client-side for now.
            return df[df.setType == test_type].reset_index(drop=True)

        return df

    @classmethod
    def get_data_frame(
        cls,
        patient_id: Optional[str] = None,
        status: Optional[GenomicTestStatus] = GenomicTestStatus.ACTIVE,
        test_type: Optional[GenomicTestType] = None,
        all_results: bool = False,
        auth_args: Auth = Auth.shared(),
        max_pages: Optional[int] = None,
        page_size: Optional[int] = None,
        log: bool = False,
        ignore_cache: bool = False,
        **kw_args,
    ):
        """Execute a request for genomic tests

        ## Parameters

        Query: `phc.easy.omics.options.genomic_test.GenomicTestOptions`

        Execution: `phc.easy.query.Query.execute_paging_api`

        Expansion: `phc.easy.frame.Frame.expand`

        NOTE: `test_type` is translated to `type` as a parameter
        """
        df = super().get_data_frame(
            **kw_args, **cls._get_current_args(inspect.currentframe(), locals())
        )

        return df

Ancestors

Static methods

def get_data_frame(patient_id: Optional[str] = None, status: Optional[GenomicTestStatus] = GenomicTestStatus.ACTIVE, test_type: Optional[GenomicTestType] = None, all_results: bool = False, auth_args: Auth = <phc.easy.auth.Auth object>, max_pages: Optional[int] = None, page_size: Optional[int] = None, log: bool = False, ignore_cache: bool = False, **kw_args)

Execute a request for genomic tests

Parameters

Query: phc.easy.omics.options.genomic_test.GenomicTestOptions

Execution: Query.execute_paging_api()

Expansion: Frame.expand()

NOTE: test_type is translated to type as a parameter

Expand source code
@classmethod
def get_data_frame(
    cls,
    patient_id: Optional[str] = None,
    status: Optional[GenomicTestStatus] = GenomicTestStatus.ACTIVE,
    test_type: Optional[GenomicTestType] = None,
    all_results: bool = False,
    auth_args: Auth = Auth.shared(),
    max_pages: Optional[int] = None,
    page_size: Optional[int] = None,
    log: bool = False,
    ignore_cache: bool = False,
    **kw_args,
):
    """Execute a request for genomic tests

    ## Parameters

    Query: `phc.easy.omics.options.genomic_test.GenomicTestOptions`

    Execution: `phc.easy.query.Query.execute_paging_api`

    Expansion: `phc.easy.frame.Frame.expand`

    NOTE: `test_type` is translated to `type` as a parameter
    """
    df = super().get_data_frame(
        **kw_args, **cls._get_current_args(inspect.currentframe(), locals())
    )

    return df
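A minimal sketch of fetching the active tests for a single patient; the patient id is a placeholder, and test_type is forwarded to the API as type (see the NOTE above):

>>> import phc.easy as phc
>>> tests = phc.GenomicTest.get_data_frame(
...     patient_id='<patient-uuid>',  # placeholder
...     test_type=phc.Option.GenomicTestType.SHORT_VARIANT,
...     all_results=True,
... )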
def params_class()

Inherited from: PagingApiItem.params_class

Returns a pydantic type that validates and transforms the params with dict()

Expand source code
@staticmethod
def params_class():
    return GenomicTestOptions
def process_params(params: dict) ‑> dict

Inherited from: PagingApiItem.process_params

Validates and transforms the API query parameters

def resource_path()

Inherited from: PagingApiItem.resource_path

Returns the API url name for retrieval

Expand source code
@staticmethod
def resource_path():
    return "genomics/projects/{project_id}/tests"
def transform_results(data_frame: pandas.core.frame.DataFrame, params: dict, **expand_args)

Inherited from: PagingApiItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(
    data_frame: pd.DataFrame, params: dict, **expand_args
):
    args = {
        **expand_args,
        "code_columns": [
            *expand_args.get("code_columns", []),
            "bodySite",
            "patient",
        ],
        "custom_columns": [
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("sourceFile"),
        ],
    }
    df = Frame.expand(data_frame, **args)

    if "sets" in df.columns:
        df = (
            pd.concat(
                data_frame.apply(
                    lambda x: pd.DataFrame(
                        [{"index": x.name, **s} for s in x.sets]
                    ),
                    axis=1,
                ).values
            )
            .join(df.drop(["sets"], axis=1), on="index", rsuffix=".test")
            .drop(["index"], axis=1)
            .reset_index(drop=True)
        )

    test_type = params.get("type", None)

    if test_type and len(df) > 0:
        # TODO: Remove when API fixed

        # NOTE: The API does not filter the returned sets because it is a
        # nested structure. Since it's not a boatload of information, we opt
        # to filter client-side for now.
        return df[df.setType == test_type].reset_index(drop=True)

    return df
class Goal

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class Goal(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "goal"

    @staticmethod
    def code_fields():
        return ["meta.tag", "target.detailQuantity", "target.measure.coding"]

    @staticmethod
    def transform_results(data_frame: pd.DataFrame, **expand_args):
        args = {
            **expand_args,
            "date_columns": [*expand_args.get("date_columns", []), "startDate"],
            "custom_columns": [
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("subject"),
            ],
        }

        return Frame.expand(data_frame, **args)

Ancestors

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["meta.tag", "target.detailQuantity", "target.measure.coding"]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "goal"
def transform_results(data_frame: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(data_frame: pd.DataFrame, **expand_args):
    args = {
        **expand_args,
        "date_columns": [*expand_args.get("date_columns", []), "startDate"],
        "custom_columns": [
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("subject"),
        ],
    }

    return Frame.expand(data_frame, **args)
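The transform_results hook above runs on each batch automatically; additional expansion arguments can be threaded through get_data_frame via expand_args. A small sketch, assuming the extra date column name is valid for your data:

>>> import phc.easy as phc
>>> goals = phc.Goal.get_data_frame(
...     patient_id='<patient-uuid>',                   # placeholder
...     expand_args={'date_columns': ['statusDate']},  # merged with 'startDate' above
...     all_results=True,
... )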
class ImagingStudy

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class ImagingStudy(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "imaging_study"

    @staticmethod
    def patient_key():
        return "patient.reference"

    @staticmethod
    def code_fields():
        return ["procedureCode.coding" "meta.tag"]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            code_columns=[
                *expand_args.get("code_columns", []),
                "procedureCode",
            ],
            custom_columns=[
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("patient"),
                Frame.codeable_like_column_expander("context"),
            ],
        )

Ancestors

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["procedureCode.coding" "meta.tag"]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def patient_key()
Expand source code
@staticmethod
def patient_key():
    return "patient.reference"
def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "imaging_study"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        code_columns=[
            *expand_args.get("code_columns", []),
            "procedureCode",
        ],
        custom_columns=[
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("patient"),
            Frame.codeable_like_column_expander("context"),
        ],
    )
class Immunization

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class Immunization(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "immunization"

    @staticmethod
    def patient_id_prefixes():
        return ["Patient/", "urn:uuid:"]

    @staticmethod
    def patient_key():
        return "patient.reference"

    @staticmethod
    def code_fields():
        return ["vaccineCode.coding", "meta.tag"]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            date_columns=[*expand_args.get("date_columns", []), "date"],
            code_columns=[*expand_args.get("code_columns", []), "vaccineCode"],
            custom_columns=[
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("patient"),
                Frame.codeable_like_column_expander("encounter"),
            ],
        )

Ancestors

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["vaccineCode.coding", "meta.tag"]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def patient_id_prefixes()
Expand source code
@staticmethod
def patient_id_prefixes():
    return ["Patient/", "urn:uuid:"]
def patient_key()
Expand source code
@staticmethod
def patient_key():
    return "patient.reference"
def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "immunization"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        date_columns=[*expand_args.get("date_columns", []), "date"],
        code_columns=[*expand_args.get("code_columns", []), "vaccineCode"],
        custom_columns=[
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("patient"),
            Frame.codeable_like_column_expander("encounter"),
        ],
    )
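Like the other FSS patient items, immunization records can be narrowed by code and system. A sketch using placeholder CVX values:

>>> import phc.easy as phc
>>> flu_shots = phc.Immunization.get_data_frame(
...     code='140',                            # placeholder CVX code
...     system='http://hl7.org/fhir/sid/cvx',
...     all_results=True,
... )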
class Media

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class Media(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "media"

    @staticmethod
    def code_fields():
        return ["bodySite.coding", "meta.tag"]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            date_columns=[
                *expand_args.get("date_columns", []),
                "occurrenceDateTime",
            ],
            code_columns=[*expand_args.get("code_columns", []), "bodySite"],
            custom_columns=[
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("subject"),
                (
                    "content",
                    lambda r: pd.json_normalize(r).add_prefix("content."),
                ),
            ],
        )

Ancestors

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["bodySite.coding", "meta.tag"]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "media"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        date_columns=[
            *expand_args.get("date_columns", []),
            "occurrenceDateTime",
        ],
        code_columns=[*expand_args.get("code_columns", []), "bodySite"],
        custom_columns=[
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("subject"),
            (
                "content",
                lambda r: pd.json_normalize(r).add_prefix("content."),
            ),
        ],
    )
class MedicationAdministration

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class MedicationAdministration(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "medication_administration"

    @staticmethod
    def code_fields():
        return ["medicationCodeableConcept.coding", "dosage.dose", "meta.tag"]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            date_columns=[
                *expand_args.get("date_columns", []),
                "effectivePeriod.start",
                "effectivePeriod.end",
            ],
            code_columns=[
                *expand_args.get("code_columns", []),
                "medicationCodeableConcept",
            ],
            custom_columns=[
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("subject"),
                Frame.codeable_like_column_expander("context"),
                Frame.codeable_like_column_expander("prescription"),
                Frame.codeable_like_column_expander("dosage"),
                Frame.codeable_like_column_expander("effectivePeriod"),
            ],
        )

Ancestors

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["medicationCodeableConcept.coding", "dosage.dose", "meta.tag"]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "medication_administration"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        date_columns=[
            *expand_args.get("date_columns", []),
            "effectivePeriod.start",
            "effectivePeriod.end",
        ],
        code_columns=[
            *expand_args.get("code_columns", []),
            "medicationCodeableConcept",
        ],
        custom_columns=[
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("subject"),
            Frame.codeable_like_column_expander("context"),
            Frame.codeable_like_column_expander("prescription"),
            Frame.codeable_like_column_expander("dosage"),
            Frame.codeable_like_column_expander("effectivePeriod"),
        ],
    )
class MedicationDispense

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class MedicationDispense(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "medication_dispense"

    @staticmethod
    def code_fields():
        return [
            "quantity",
            "medicationCodeableConcept.coding",
            "dosageInstruction.route.coding",
            "daysSupply",
            "meta.tag",
        ]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            code_columns=[
                *expand_args.get("code_columns", []),
                "medicationCodeableConcept",
                "quantity",
                "dosageInstruction",
                "daysSupply",
            ],
            custom_columns=[
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("subject"),
            ],
        )

Ancestors

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return [
        "quantity",
        "medicationCodeableConcept.coding",
        "dosageInstruction.route.coding",
        "daysSupply",
        "meta.tag",
    ]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "medication_dispense"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        code_columns=[
            *expand_args.get("code_columns", []),
            "medicationCodeableConcept",
            "quantity",
            "dosageInstruction",
            "daysSupply",
        ],
        custom_columns=[
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("subject"),
        ],
    )
class MedicationRequest

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class MedicationRequest(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "medication_request"

    @staticmethod
    def code_fields():
        return ["medicationCodeableConcept.coding", "meta.tag"]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            date_columns=[
                *expand_args.get("date_columns", []),
                "authoredOn",
                "dispenseRequest.validityPeriod.start",
                "dispenseRequest.validityPeriod.end",
            ],
            code_columns=[
                *expand_args.get("code_columns", []),
                "medicationCodeableConcept",
            ],
            custom_columns=[
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("subject"),
                Frame.codeable_like_column_expander("context"),
                Frame.codeable_like_column_expander("note"),
                (
                    "dispenseRequest",
                    lambda r: pd.json_normalize(r).add_prefix(
                        "dispenseRequest."
                    ),
                ),
            ],
        )

Ancestors

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["medicationCodeableConcept.coding", "meta.tag"]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "medication_request"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        date_columns=[
            *expand_args.get("date_columns", []),
            "authoredOn",
            "dispenseRequest.validityPeriod.start",
            "dispenseRequest.validityPeriod.end",
        ],
        code_columns=[
            *expand_args.get("code_columns", []),
            "medicationCodeableConcept",
        ],
        custom_columns=[
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("subject"),
            Frame.codeable_like_column_expander("context"),
            Frame.codeable_like_column_expander("note"),
            (
                "dispenseRequest",
                lambda r: pd.json_normalize(r).add_prefix(
                    "dispenseRequest."
                ),
            ),
        ],
    )
class MedicationStatement

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class MedicationStatement(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "medication_statement"

    @staticmethod
    def code_fields():
        return ["medicationCodeableConcept.coding", "meta.tag"]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            date_columns=[
                *expand_args.get("date_columns", []),
                "effectiveDateTime",
            ],
            code_columns=[
                *expand_args.get("code_columns", []),
                "medicationCodeableConcept",
            ],
            custom_columns=[
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("subject"),
            ],
        )

Ancestors

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["medicationCodeableConcept.coding", "meta.tag"]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "medication_statement"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        date_columns=[
            *expand_args.get("date_columns", []),
            "effectiveDateTime",
        ],
        code_columns=[
            *expand_args.get("code_columns", []),
            "medicationCodeableConcept",
        ],
        custom_columns=[
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("subject"),
        ],
    )
class Observation

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class Observation(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "observation"

    @staticmethod
    def code_fields():
        return [
            "meta.tag",
            "code.coding",
            "component.code.coding",
            "valueCodeableConcept.coding",
            "category.coding",
            "referenceRange.type.coding",
        ]

    @classmethod
    def get_codes(cls, query: Optional[str] = None):
        """Find codes based on case-insensitive matching of code/display/system

        Example
        --------
        >>> import phc.easy as phc
        >>> phc.Auth.set({'account': '<your-account-name>'})
        >>> phc.Project.set_current('My Project Name')
        >>>
        >>> phc.Observation.get_codes("loinc")
        """
        return search(
            SummaryItemCounts.get_data_frame(cls.table_name()), query=query
        )

    @staticmethod
    def transform_results(data_frame: pd.DataFrame, **expand_args):
        args = {
            **expand_args,
            "code_columns": [
                *expand_args.get("code_columns", []),
                "component",
                "interpretation",
            ],
            "custom_columns": [
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("subject"),
                Frame.codeable_like_column_expander("related"),
                Frame.codeable_like_column_expander("performer"),
                Frame.codeable_like_column_expander("context"),
            ],
        }

        return Frame.expand(data_frame, **args)

Ancestors

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return [
        "meta.tag",
        "code.coding",
        "component.code.coding",
        "valueCodeableConcept.coding",
        "category.coding",
        "referenceRange.type.coding",
    ]
def get_codes(query: Optional[str] = None)

Find codes based on case-insensitive matching of code/display/system

Example

>>> import phc.easy as phc
>>> phc.Auth.set({'account': '<your-account-name>'})
>>> phc.Project.set_current('My Project Name')
>>>
>>> phc.Observation.get_codes("loinc")
Expand source code
@classmethod
def get_codes(cls, query: Optional[str] = None):
    """Find codes based on case-insensitive matching of code/display/system

    Example
    --------
    >>> import phc.easy as phc
    >>> phc.Auth.set({'account': '<your-account-name>'})
    >>> phc.Project.set_current('My Project Name')
    >>>
    >>> phc.Observation.get_codes("loinc")
    """
    return search(
        SummaryItemCounts.get_data_frame(cls.table_name()), query=query
    )
def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "observation"
def transform_results(data_frame: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(data_frame: pd.DataFrame, **expand_args):
    args = {
        **expand_args,
        "code_columns": [
            *expand_args.get("code_columns", []),
            "component",
            "interpretation",
        ],
        "custom_columns": [
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("subject"),
            Frame.codeable_like_column_expander("related"),
            Frame.codeable_like_column_expander("performer"),
            Frame.codeable_like_column_expander("context"),
        ],
    }

    return Frame.expand(data_frame, **args)
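Following the example above, a common flow is to search the available codes first and then pull matching records; the LOINC code below is a placeholder, so substitute one returned by get_codes:

>>> import phc.easy as phc
>>> phc.Observation.get_codes('hemoglobin')   # inspect matching codes
>>> hgb = phc.Observation.get_data_frame(
...     code='718-7',                # placeholder LOINC code
...     system='http://loinc.org',
...     all_results=True,
... )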
class Ocr
Expand source code
class Ocr:
    Config = Config
    Suggestion = Suggestion
    Document = Document
    DocumentComposition = DocumentComposition
    Block = Block

    @staticmethod
    def upload(
        source: str, folder="ocr-uploads", auth_args: Auth = Auth.shared()
    ):
        """Upload a file from a path to the ocr directory (defaults to 'ocr-uploads')"""
        auth = Auth(auth_args)
        files = Files(auth.session())
        filename = source.split("/")[-1]

        return files.upload(
            auth.project_id, source, file_name=f"/{folder}/{filename}"
        ).data

    @staticmethod
    def upload_and_run(
        source: str,
        folder="ocr-uploads",
        auth_args: Auth = Auth.shared(),
        **document_kw_args,
    ):
        """Upload a document and run PrecisionOCR

        Returns the DocumentReference
        """
        auth = Auth(auth_args)

        file_id = Ocr.upload(source, folder=folder, auth_args=auth)["id"]
        return Ocr.run(file_id, auth_args=auth, **document_kw_args)

    @staticmethod
    def run(
        file_id: str,
        auth_args: Auth = Auth.shared(),
        pause_time=1,
        **document_kw_args,
    ):
        """Run PrecisionOCR on a specific file id

        Returns the DocumentReference
        """
        auth = Auth(auth_args)
        client = BaseClient(auth.session())

        response = client._api_call(
            "ocr/documents",
            json={"project": auth.project_id, "fileId": file_id},
        )

        document_reference_id = response.data["documentReferenceId"]

        # Unfortunately, we just have to wait for it to be in FSS
        sleep(pause_time)

        return Document.get(
            id=document_reference_id, auth_args=auth_args, **document_kw_args
        )

Class variables

var Block
var Config
var Document

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

var DocumentComposition

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

var Suggestion

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Static methods

def run(file_id: str, auth_args: Auth = <phc.easy.auth.Auth object>, pause_time=1, **document_kw_args)

Run PrecisionOCR on a specific file id

Returns the DocumentReference

Expand source code
@staticmethod
def run(
    file_id: str,
    auth_args: Auth = Auth.shared(),
    pause_time=1,
    **document_kw_args,
):
    """Run PrecisionOCR on a specific file id

    Returns the DocumentReference
    """
    auth = Auth(auth_args)
    client = BaseClient(auth.session())

    response = client._api_call(
        "ocr/documents",
        json={"project": auth.project_id, "fileId": file_id},
    )

    document_reference_id = response.data["documentReferenceId"]

    # Unfortunately, we just have to wait for it to be in FSS
    sleep(pause_time)

    return Document.get(
        id=document_reference_id, auth_args=auth_args, **document_kw_args
    )
def upload(source: str, folder='ocr-uploads', auth_args: Auth = <phc.easy.auth.Auth object>)

Upload a file from a path to the ocr directory (defaults to 'ocr-uploads')

Expand source code
@staticmethod
def upload(
    source: str, folder="ocr-uploads", auth_args: Auth = Auth.shared()
):
    """Upload a file from a path to the ocr directory (defaults to 'ocr-uploads')"""
    auth = Auth(auth_args)
    files = Files(auth.session())
    filename = source.split("/")[-1]

    return files.upload(
        auth.project_id, source, file_name=f"/{folder}/{filename}"
    ).data
def upload_and_run(source: str, folder='ocr-uploads', auth_args: Auth = <phc.easy.auth.Auth object>, **document_kw_args)

Upload a document and run PrecisionOCR

Returns the DocumentReference

Expand source code
@staticmethod
def upload_and_run(
    source: str,
    folder="ocr-uploads",
    auth_args: Auth = Auth.shared(),
    **document_kw_args,
):
    """Upload a document and run PrecisionOCR

    Returns the DocumentReference
    """
    auth = Auth(auth_args)

    file_id = Ocr.upload(source, folder=folder, auth_args=auth)["id"]
    return Ocr.run(file_id, auth_args=auth, **document_kw_args)
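A minimal end-to-end sketch, assuming credentials and a project are already configured; the file path is a placeholder:

>>> import phc.easy as phc
>>> doc_ref = phc.Ocr.upload_and_run('/path/to/report.pdf')  # placeholder path
>>> doc_ref  # DocumentReference record for the processed document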
class Option

Class that references all available API options

Expand source code
class Option:
    """Class that references all available API options"""

    # Omics
    GenomicVariantInclude = GenomicVariantInclude
    GenomicCopyNumberVariantOptions = GenomicCopyNumberVariantOptions
    GenomicShortVariantOptions = GenomicShortVariantOptions
    GenomicTestType = GenomicTestType
    GenomicTestStatus = GenomicTestStatus
    CodingEffect = CodingEffect
    Chromosome = Chromosome
    ClinVarSignificance = ClinVarSignificance
    ClinVarReview = ClinVarReview
    GeneClass = GeneClass
    Zygosity = Zygosity
    CopyNumberStatus = CopyNumberStatus
    InFrame = InFrame
    StructuralType = StructuralType

    # OCR
    OcrConfig = OcrConfig

    # Summary APIs
    SummaryItemCountsOptions = SummaryItemCountsOptions
    SummaryOmicsType = SummaryOmicsType
    SummaryClinicalType = SummaryClinicalType
    SummaryClinicalCountsOptions = SummaryClinicalCountsOptions

Class variables

var Chromosome

An enumeration.

var ClinVarReview

An enumeration.

var ClinVarSignificance

An enumeration.

var CodingEffect

An enumeration.

var CopyNumberStatus

An enumeration.

var GeneClass

An enumeration.

var GenomicCopyNumberVariantOptions

Options to pass to /v1/genomics/copy-numbers

var GenomicShortVariantOptions

Options to pass to /v1/genomics/variants

var GenomicTestStatus

An enumeration.

var GenomicTestType

An enumeration.

var GenomicVariantInclude

An enumeration.

var InFrame

An enumeration.

var OcrConfig
var StructuralType

An enumeration.

var SummaryClinicalCountsOptions
var SummaryClinicalType

An enumeration.

var SummaryItemCountsOptions
var SummaryOmicsType

An enumeration.

var Zygosity

An enumeration.
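Because these options are standard Python enumerations, their members can be listed directly, which is handy when building genomic or summary queries (available members vary by library version):

>>> import phc.easy as phc
>>> list(phc.Option.GenomicTestType)
>>> list(phc.Option.Chromosome)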

class Organization

Provides an abstract class and/or static methods for retrieving items from a FSS table

Expand source code
class Organization(FhirServiceItem):
    @staticmethod
    def table_name():
        return "organization"

    @staticmethod
    def code_fields():
        return ["type.coding", "meta.tag"]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            code_columns=[*expand_args.get("code_columns", []), "type"],
            custom_columns=[
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("subject"),
            ],
        )

Ancestors

Static methods

def code_fields()

Inherited from: FhirServiceItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["type.coding", "meta.tag"]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServiceItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServiceItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServiceItem.get_count_by_field

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, id: Optional[str] = None, ids: List[str] = [], term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServiceItem.get_data_frame

Retrieve records …

def table_name()

Inherited from: FhirServiceItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "organization"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServiceItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        code_columns=[*expand_args.get("code_columns", []), "type"],
        custom_columns=[
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("subject"),
        ],
    )
class Patient

Provides an abstract class and/or static methods for retrieving items from a FSS table

Expand source code
class Patient(FhirServiceItem):
    @staticmethod
    def table_name():
        return "patient"

    @staticmethod
    def code_fields():
        return [
            "extension.valueCodeableConcept.coding",
            "identifier.type.coding",
            "maritalStatus.coding",
            "meta.tag",
        ]

    @staticmethod
    def transform_results(data_frame: pd.DataFrame, **expand_args):
        args = {
            **expand_args,
            "code_columns": [
                *expand_args.get("code_columns", []),
                "contained",
                "maritalStatus",
            ],
            "custom_columns": [
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("managingOrganization"),
                ("address", expand_address_column),
                ("name", expand_name_column),
            ],
        }

        return Frame.expand(data_frame, **args)

Ancestors

Static methods

def code_fields()

Inherited from: FhirServiceItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return [
        "extension.valueCodeableConcept.coding",
        "identifier.type.coding",
        "maritalStatus.coding",
        "meta.tag",
    ]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServiceItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServiceItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServiceItem.get_count_by_field

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, id: Optional[str] = None, ids: List[str] = [], term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServiceItem.get_data_frame

Retrieve records …

def table_name()

Inherited from: FhirServiceItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "patient"
def transform_results(data_frame: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServiceItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(data_frame: pd.DataFrame, **expand_args):
    args = {
        **expand_args,
        "code_columns": [
            *expand_args.get("code_columns", []),
            "contained",
            "maritalStatus",
        ],
        "custom_columns": [
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("managingOrganization"),
            ("address", expand_address_column),
            ("name", expand_name_column),
        ],
    }

    return Frame.expand(data_frame, **args)
class Person

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class Person(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "person"

    @staticmethod
    def patient_key():
        return "link.target.reference"

    @staticmethod
    def code_fields():
        return ["meta.tag"]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            code_columns=[*expand_args.get("code_columns", []), "link"],
            custom_columns=[
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("subject"),
            ],
        )
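
Since Person.patient_key() is "link.target.reference", records linked to a single patient can be retrieved with the patient_id argument of get_data_frame (a sketch; the id below is a placeholder):

>>> import phc.easy as phc
>>> phc.Person.get_data_frame(patient_id='<patient-uuid>')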

Ancestors

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["meta.tag"]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def patient_key()
Expand source code
@staticmethod
def patient_key():
    return "link.target.reference"
def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "person"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        code_columns=[*expand_args.get("code_columns", []), "link"],
        custom_columns=[
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("subject"),
        ],
    )
class Practitioner

Provides an abstract class and/or static methods for retrieving items from a FSS table

Expand source code
class Practitioner(FhirServiceItem):
    @staticmethod
    def table_name():
        return "practitioner"

    @staticmethod
    def code_fields():
        return ["meta.tag"]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            custom_columns=[
                *expand_args.get("custom_columns", []),
                ("name", expand_name_column),
            ],
        )

Ancestors

Static methods

def code_fields()

Inherited from: FhirServiceItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["meta.tag"]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServiceItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServiceItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServiceItem.get_count_by_field

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, id: Optional[str] = None, ids: List[str] = [], term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServiceItem.get_data_frame

Retrieve records …

def table_name()

Inherited from: FhirServiceItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "practitioner"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServiceItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        custom_columns=[
            *expand_args.get("custom_columns", []),
            ("name", expand_name_column),
        ],
    )
class Procedure

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class Procedure(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "procedure"

    @staticmethod
    def code_fields():
        return ["meta.tag", "code.coding", "category.coding"]

    @classmethod
    def get_codes(cls, query: Optional[str] = None):
        """Find codes based on case-insensitive matching of code/display/system

        Example
        --------
        >>> import phc.easy as phc
        >>> phc.Auth.set({'account': '<your-account-name>'})
        >>> phc.Project.set_current('My Project Name')
        >>>
        >>> phc.Observation.get_codes("loinc")
        """
        return search(
            SummaryItemCounts.get_data_frame(cls.table_name()), query=query
        )

    @staticmethod
    def transform_results(data_frame: pd.DataFrame, **expand_args):
        args = {
            **expand_args,
            "date_columns": [
                *expand_args.get("date_columns", []),
                "performedPeriod.start",
                "performedPeriod.end",
            ],
            "custom_columns": [
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("subject"),
                Frame.codeable_like_column_expander("performedPeriod"),
                Frame.codeable_like_column_expander("context"),
                Frame.codeable_like_column_expander("managingOrganization"),
            ],
        }

        return Frame.expand(data_frame, **args)

Ancestors

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["meta.tag", "code.coding", "category.coding"]
def get_codes(query: Optional[str] = None)

Find codes based on case-insensitive matching of code/display/system

Example

>>> import phc.easy as phc
>>> phc.Auth.set({'account': '<your-account-name>'})
>>> phc.Project.set_current('My Project Name')
>>>
>>> phc.Observation.get_codes("loinc")
Expand source code
@classmethod
def get_codes(cls, query: Optional[str] = None):
    """Find codes based on case-insensitive matching of code/display/system

    Example
    --------
    >>> import phc.easy as phc
    >>> phc.Auth.set({'account': '<your-account-name>'})
    >>> phc.Project.set_current('My Project Name')
    >>>
    >>> phc.Observation.get_codes("loinc")
    """
    return search(
        SummaryItemCounts.get_data_frame(cls.table_name()), query=query
    )
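
The docstring example above illustrates the pattern with Observation; for this class the analogous call would be (a sketch, with a hypothetical search string):

>>> phc.Procedure.get_codes("cpt")  # "cpt" is a hypothetical query string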
def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "procedure"
def transform_results(data_frame: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(data_frame: pd.DataFrame, **expand_args):
    args = {
        **expand_args,
        "date_columns": [
            *expand_args.get("date_columns", []),
            "performedPeriod.start",
            "performedPeriod.end",
        ],
        "custom_columns": [
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("subject"),
            Frame.codeable_like_column_expander("performedPeriod"),
            Frame.codeable_like_column_expander("context"),
            Frame.codeable_like_column_expander("managingOrganization"),
        ],
    }

    return Frame.expand(data_frame, **args)
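
A sketch of retrieving Procedure records with get_data_frame (ids and codes below are placeholders):

>>> import phc.easy as phc
>>> phc.Procedure.get_data_frame(patient_id='<patient-uuid>')
>>> phc.Procedure.get_data_frame(code='<procedure-code>', all_results=True)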
class ProcedureRequest

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class ProcedureRequest(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "procedure_request"

    @staticmethod
    def code_fields():
        return ["code.coding", "meta.tag"]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            date_columns=[
                *expand_args.get("date_columns", []),
                "occurrencePeriod.start",
                "occurrencePeriod.end",
                "occurrenceDateTime",
                "authoredOn",
            ],
            custom_columns=[
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("subject"),
                Frame.codeable_like_column_expander("context"),
                Frame.codeable_like_column_expander("occurrencePeriod"),
                Frame.codeable_like_column_expander("note"),
            ],
        )

Ancestors

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["code.coding", "meta.tag"]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "procedure_request"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        date_columns=[
            *expand_args.get("date_columns", []),
            "occurrencePeriod.start",
            "occurrencePeriod.end",
            "occurrenceDateTime",
            "authoredOn",
        ],
        custom_columns=[
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("subject"),
            Frame.codeable_like_column_expander("context"),
            Frame.codeable_like_column_expander("occurrencePeriod"),
            Frame.codeable_like_column_expander("note"),
        ],
    )
class Project
Expand source code
class Project(PagingApiItem):
    @staticmethod
    def resource_path():
        return "projects"

    @staticmethod
    def params_class():
        return ProjectListOptions

    @classmethod
    @memoize
    def get_data_frame(
        cls,
        name: Optional[str] = None,
        auth_args: Auth = Auth.shared(),
        max_pages: Optional[int] = None,
        page_size: Optional[int] = None,
        log: bool = False,
        show_progress: bool = False,
        account: Optional[str] = None,
    ):
        """Execute a request for projects

        ## Parameters

        Query: `phc.easy.projects.ProjectListOptions`

        Execution: `phc.easy.query.Query.execute_paging_api`
        """

        if page_size is None:
            # Projects do not have much data so use a higher page size
            page_size = 100

        get_data_frame = super().get_data_frame

        auth = Auth(auth_args)

        get_data_frame_args = without_keys(
            cls._get_current_args(inspect.currentframe(), locals()),
            ["auth_args", "account", "show_progress"],
        )

        def get_projects_for_account(account: dict):
            try:
                df = get_data_frame(
                    ignore_cache=True,
                    all_results=max_pages is None,
                    auth_args=auth.customized({"account": account["id"]}),
                    show_progress=show_progress,
                    **get_data_frame_args,
                )
                df["account"] = account["id"]

                return df

            except ApiError as e:
                message = e.response.get("error", "Unknown API error")
                print(f"Skipping \"{account['id']}\" due to \"{message}\"")

                return pd.DataFrame()

        if account:
            return get_projects_for_account({"id": account})

        return pd.concat(
            list(pmap(get_projects_for_account, auth.accounts()))
        ).reset_index(drop=True)

    @staticmethod
    def find(
        search: str,
        account: Optional[str] = None,
        auth_args: Auth = Auth.shared(),
    ):
        """Search for a project using given criteria and return results as a data frame

        Attributes
        ----------
        search : str
            Part of a project's id, name, or description to search for

        auth_args : Any
            The authentication to use for the account and project (defaults to shared)
        """
        projects = Project.get_data_frame(auth_args=auth_args, account=account)
        text = projects[SEARCH_COLUMNS].agg(join_strings, axis=1)
        return projects[text.str.contains(search.lower())]

    @staticmethod
    def set_current(
        search: str, account: Optional[str] = None, auth: Auth = Auth.shared()
    ):
        """Search for a project using given criteria, set it to the authentication
        object, and return the matching projects as a data frame

        Attributes
        ----------
        search : str
            Part of a project's id, name, or description to search for

        auth : Auth
            The authentication to update for the account and project (defaults to shared)
        """
        matches = Project.find(search, account=account, auth_args=auth)

        if len(matches) > 1:
            print("Multiple projects found. Try a more specific search")
        elif len(matches) == 0:
            print(f'No matches found for search "{search}"')
        else:
            project = matches.iloc[0]
            # Uses private method since this is a special case
            auth.update({"account": project.account, "project_id": project.id})

        return matches
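
A usage sketch for locating and selecting a project (the search strings below are placeholders):

>>> import phc.easy as phc
>>> phc.Auth.set({'account': '<your-account-name>'})
>>> phc.Project.find('<part-of-project-name>')
>>> phc.Project.set_current('My Project Name')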

Ancestors

Static methods

def find(search: str, account: Optional[str] = None, auth_args: Auth = <phc.easy.auth.Auth object>)

Search for a project using given criteria and return results as a data frame

Attributes

search : str
Part of a project's id, name, or description to search for
auth_args : Any
The authentication to use for the account and project (defaults to shared)
Expand source code
@staticmethod
def find(
    search: str,
    account: Optional[str] = None,
    auth_args: Auth = Auth.shared(),
):
    """Search for a project using given criteria and return results as a data frame

    Attributes
    ----------
    search : str
        Part of a project's id, name, or description to search for

    auth_args : Any
        The authentication to use for the account and project (defaults to shared)
    """
    projects = Project.get_data_frame(auth_args=auth_args, account=account)
    text = projects[SEARCH_COLUMNS].agg(join_strings, axis=1)
    return projects[text.str.contains(search.lower())]
def get_data_frame(cls, name: Optional[str] = None, auth_args: Auth = <phc.easy.auth.Auth object>, max_pages: Optional[int] = None, page_size: Optional[int] = None, log: bool = False, show_progress: bool = False, account: Optional[str] = None)

Execute a request for projects

Parameters

Query: ProjectListOptions

Execution: Query.execute_paging_api()

Expand source code
@classmethod
@memoize
def get_data_frame(
    cls,
    name: Optional[str] = None,
    auth_args: Auth = Auth.shared(),
    max_pages: Optional[int] = None,
    page_size: Optional[int] = None,
    log: bool = False,
    show_progress: bool = False,
    account: Optional[str] = None,
):
    """Execute a request for projects

    ## Parameters

    Query: `phc.easy.projects.ProjectListOptions`

    Execution: `phc.easy.query.Query.execute_paging_api`
    """

    if page_size is None:
        # Projects do not have much data so use a higher page size
        page_size = 100

    get_data_frame = super().get_data_frame

    auth = Auth(auth_args)

    get_data_frame_args = without_keys(
        cls._get_current_args(inspect.currentframe(), locals()),
        ["auth_args", "account", "show_progress"],
    )

    def get_projects_for_account(account: dict):
        try:
            df = get_data_frame(
                ignore_cache=True,
                all_results=max_pages is None,
                auth_args=auth.customized({"account": account["id"]}),
                show_progress=show_progress,
                **get_data_frame_args,
            )
            df["account"] = account["id"]

            return df

        except ApiError as e:
            message = e.response.get("error", "Unknown API error")
            print(f"Skipping \"{account['id']}\" due to \"{message}\"")

            return pd.DataFrame()

    if account:
        return get_projects_for_account({"id": account})

    return pd.concat(
        list(pmap(get_projects_for_account, auth.accounts()))
    ).reset_index(drop=True)
def params_class()

Inherited from: PagingApiItem.params_class

Returns a pydantic type that validates and transforms the params with dict()

Expand source code
@staticmethod
def params_class():
    return ProjectListOptions
def process_params(params: dict) ‑> dict

Inherited from: PagingApiItem.process_params

Validates and transforms the API query parameters

def resource_path()

Inherited from: PagingApiItem.resource_path

Returns the API url name for retrieval

Expand source code
@staticmethod
def resource_path():
    return "projects"
def set_current(search: str, account: Optional[str] = None, auth: Auth = <phc.easy.auth.Auth object>)

Search for a project using given criteria, set it to the authentication object, and return the matching projects as a data frame

Attributes

search : str
Part of a project's id, name, or description to search for
auth : Auth
The authentication to update for the account and project (defaults to shared)
Expand source code
@staticmethod
def set_current(
    search: str, account: Optional[str] = None, auth: Auth = Auth.shared()
):
    """Search for a project using given criteria, set it to the authentication
    object, and return the matching projects as a data frame

    Attributes
    ----------
    search : str
        Part of a project's id, name, or description to search for

    auth : Auth
        The authentication to update for the account and project (defaults to shared)
    """
    matches = Project.find(search, account=account, auth_args=auth)

    if len(matches) > 1:
        print("Multiple projects found. Try a more specific search")
    elif len(matches) == 0:
        print(f'No matches found for search "{search}"')
    else:
        project = matches.iloc[0]
        # Uses private method since this is a special case
        auth.update({"account": project.account, "project_id": project.id})

    return matches
def transform_results(data_frame: pandas.core.frame.DataFrame, **expand_args)

Inherited from: PagingApiItem.transform_results

Transform data frame batch

class Provenance

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class Provenance(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "provenance"

    @staticmethod
    def patient_key():
        """Patient relationship is based on who signed this provenance"""
        return "signature.whoReference.reference"

    @staticmethod
    def code_fields():
        return ["signature.type", "agent.role.coding", "meta.tag"]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            date_columns=[
                *expand_args.get("date_columns", []),
                "recorded",
                "signature.when",
            ],
            code_columns=[
                *expand_args.get("code_columns", []),
                "agent",
                "signature",
            ],
        )
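
Because patient_key() is "signature.whoReference.reference", provenance records relate to a patient through the signer; a sketch (the id below is a placeholder):

>>> import phc.easy as phc
>>> phc.Provenance.get_data_frame(patient_id='<patient-uuid>')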

Ancestors

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["signature.type", "agent.role.coding", "meta.tag"]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def patient_key()

Patient relationship is based on who signed this provenance

Expand source code
@staticmethod
def patient_key():
    """Patient relationship is based on who signed this provenance"""
    return "signature.whoReference.reference"
def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "provenance"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        date_columns=[
            *expand_args.get("date_columns", []),
            "recorded",
            "signature.when",
        ],
        code_columns=[
            *expand_args.get("code_columns", []),
            "agent",
            "signature",
        ],
    )
class Query
Expand source code
class Query:
    @staticmethod
    def find_count_of_dsl_query(query: dict, auth_args: Auth = Auth.shared()):
        """Find count of a given dsl query

        See https://devcenter.docs.lifeomic.com/development/fhir-service/dsl

        Attributes
        ----------
        query : dict
            The FHIR query to run a count against

        auth_args : Auth, dict
            Additional arguments for authentication

        Examples
        --------
        >>> import phc.easy as phc
        >>> phc.Auth.set({ 'account': '<your-account-name>' })
        >>> phc.Project.set_current('My Project Name')
        >>> phc.Query.find_count_of_dsl_query({
          "type": "select",
          "columns": "*",
          "from": [{"table": "patient"}],
        })
        """
        if FhirAggregation.is_aggregation_query(query):
            raise ValueError("Count is not support for aggregation queries.")

        auth = Auth(auth_args)
        fhir = Fhir(auth.session())

        response = fhir.execute_es(
            auth.project_id, build_queries(query, page_size=1)[0], scroll="true"
        )

        return response.data["hits"]["total"]["value"]

    @staticmethod
    def execute_fhir_dsl(
        query: dict,
        all_results: bool = False,
        auth_args: Auth = Auth.shared(),
        callback: Union[Callable[[Any, bool], None], None] = None,
        max_pages: Union[int, None] = None,
        log: bool = False,
        **query_kwargs,
    ):
        """Execute a FHIR query with the DSL

        See https://devcenter.docs.lifeomic.com/development/fhir-service/dsl

        Attributes
        ----------
        query : dict
            The FHIR query to run (is a superset of elasticsearch)

        all_results : bool
            Return all results by scrolling through multiple pages of data
            (Limit is ignored if provided)

        auth_args : Auth, dict
            Additional arguments for authentication

        callback : Callable[[Any, bool], None] (optional)
            A progress function that is invoked for each batch. When the second
            argument passed is true, then the result of the callback function is
            used as the return value. This is useful if writing results out to a
            file and then returning the completed result from that file.

            Example:

                def handle_batch(batch, is_finished):
                    print(len(batch))
                    if is_finished:
                        return "batch finished

        max_pages : int
            The number of pages to retrieve (useful if working with tons of records)

        log : bool = False
            Whether to log the elasticsearch query sent to the server

        query_kwargs : dict
            Arguments to pass to build_queries such as patient_id, patient_ids,
            and patient_key. (See phc.easy.query.fhir_dsl_query.build_queries)

        Examples
        --------
        >>> import phc.easy as phc
        >>> phc.Auth.set({ 'account': '<your-account-name>' })
        >>> phc.Project.set_current('My Project Name')
        >>> phc.Query.execute_fhir_dsl({
          "type": "select",
          "columns": "*",
          "from": [
              {"table": "patient"}
          ],
        }, all_results=True)

        """
        queries = build_queries(query, **query_kwargs)

        if log:
            print(json.dumps(queries, indent=4))

        if len(queries) > 1 and FhirAggregation.is_aggregation_query(
            queries[0]
        ):
            raise ValueError(
                "Cannot combine multiple aggregation query results"
            )

        if FhirAggregation.is_aggregation_query(queries[0]):
            response = execute_single_fhir_dsl(queries[0], auth_args=auth_args)
            return FhirAggregation.from_response(response)

        if len(queries) > 1 and _has_tqdm:
            queries = tqdm(queries)

        result_set = []

        for query in queries:
            if all_results:
                results = with_progress(
                    lambda: tqdm(total=MAX_RESULT_SIZE),
                    lambda progress: recursive_execute_fhir_dsl(
                        {
                            "limit": [
                                {"type": "number", "value": 0},
                                # Make window size smaller than maximum to reduce
                                # pressure on API
                                {
                                    "type": "number",
                                    "value": DEFAULT_SCROLL_SIZE,
                                },
                            ],
                            **query,
                        },
                        scroll=all_results,
                        progress=progress,
                        callback=callback,
                        auth_args=auth_args,
                        max_pages=max_pages,
                    ),
                )
            else:
                results = recursive_execute_fhir_dsl(
                    query,
                    scroll=all_results,
                    callback=callback,
                    auth_args=auth_args,
                    max_pages=max_pages,
                )

            if len(result_set) == 0:
                result_set = results
            else:
                result_set.append(*results)

        return result_set

    @staticmethod
    def execute_paging_api(
        path: str,
        params: dict = {},
        http_verb: str = "GET",
        transform: Callable[[pd.DataFrame], pd.DataFrame] = identity,
        all_results: bool = False,
        auth_args: Auth = Auth.shared(),
        max_pages: Optional[int] = None,
        page_size: Optional[int] = None,
        log: bool = False,
        raw: bool = False,
        ignore_cache: bool = False,
        show_progress: bool = True,
        progress: Optional[tqdm] = None,
        item_key: str = "items",
        try_count: bool = True,
        response_to_items: Optional[Callable[[Union[list, dict]], list]] = None,
    ):
        """Execute a API query that pages through results

        Attributes
        ----------
        path : str
            The API path to hit
            (Special tokens: `{project_id}`)

        params : dict
            The parameters to include with request

        http_verb : str
            The HTTP method to use

        all_results : bool = False
            Retrieve sample of results (25) or entire set of records

        auth_args : Auth, dict
            Additional arguments for authentication

        max_pages : int
            The number of pages to retrieve (useful if working with tons of records)

        page_size : int
            The number of records to fetch per page

        log : bool = False
            Whether to log some diagnostic statements for debugging

        progress : Optional[tqdm] = None
            Override the given progress indicator

        item_key : str
            The key to find the results underneath (usually "items" but not always)

        try_count : bool
            Whether to try and send a "count" param to update the progress bar

        response_to_items : Callable
            Custom function to transform response data to list of items
            (Overrides item_key when present)

        Examples
        --------
        >>> import phc.easy as phc
        >>> phc.Auth.set({ 'account': '<your-account-name>' })
        >>> phc.Project.set_current('My Project Name')
        >>> phc.Query.execute_paging_api(
                "genomics/projects/{project_id}/tests",
                params={
                    "patientId": "<patient-uuid>"
                }
            )

        """

        auth = Auth(auth_args)

        params = clean_params(params)

        # Do not pull project_id if not in URL (which throws error if project not selected)
        if "project_id" in path:
            path = path.replace("{project_id}", auth.project_id)

        path, params = merge_pattern(path, params)

        query = {"path": path, "method": http_verb, "params": params}

        if all_results and page_size is None:
            # Default to 100 if not provided but getting all results
            page_size = 100

        if log:
            print(json.dumps(query, indent=4))

        use_cache = (
            (not ignore_cache)
            and (not raw)
            and all_results
            and (max_pages is None)
        )

        if use_cache and APICache.does_cache_for_query_exist(query):
            return APICache.load_cache_for_query(query)

        callback = (
            APICache.build_cache_callback(query, transform, nested_key=None)
            if use_cache
            else None
        )

        results = with_progress(
            lambda: (progress if progress is not None else tqdm())
            if show_progress
            else None,
            lambda progress: recursive_paging_api_call(
                path,
                params=params,
                http_verb=http_verb,
                callback=callback,
                scroll=all_results or (max_pages is not None),
                max_pages=max_pages,
                page_size=page_size,
                log=log,
                auth_args=auth_args,
                progress=progress,
                item_key=item_key,
                response_to_items=response_to_items,
                try_count=try_count,
            ),
        )

        df = pd.DataFrame(results)

        if raw:
            return df

        return transform(df)

    @staticmethod
    def execute_fhir_dsl_with_options(
        query: dict,
        transform: Callable[[pd.DataFrame], pd.DataFrame],
        all_results: bool,
        raw: bool,
        query_overrides: dict,
        auth_args: Auth,
        ignore_cache: bool,
        max_pages: Union[int, None],
        log: bool = False,
        **query_kwargs,
    ):
        queries = build_queries({**query, **query_overrides}, **query_kwargs)

        if log:
            print(json.dumps(queries, indent=4))

        is_first_agg_query = FhirAggregation.is_aggregation_query(queries[0])

        if len(queries) > 1 and is_first_agg_query:
            raise ValueError("Cannot combine multiple aggregate results")

        use_cache = (
            (not ignore_cache)
            and (not raw)
            and (all_results or is_first_agg_query)
            and (max_pages is None)
        )

        if len(queries) > 1 and _has_tqdm:
            queries = tqdm(queries)

        frame = pd.DataFrame()

        for one_query in queries:
            if use_cache and APICache.does_cache_for_query_exist(
                one_query, namespace=FHIR_DSL
            ):
                results = APICache.load_cache_for_query(
                    one_query, namespace=FHIR_DSL
                )
            else:
                results = Query.execute_fhir_dsl(
                    one_query,
                    all_results,
                    auth_args,
                    callback=(
                        APICache.build_cache_callback(
                            one_query, transform, namespace=FHIR_DSL
                        )
                        if use_cache
                        else None
                    ),
                    max_pages=max_pages,
                )

            if isinstance(results, FhirAggregation):
                # Cache isn't written in batches so we need to explicitly do it here
                if use_cache:
                    APICache.write_agg(one_query, results)

                # We don't support multiple agg queries so fine to return first one
                return results

            batch_frame = (
                pd.DataFrame(map(lambda r: r["_source"], results))
                if not isinstance(results, pd.DataFrame)
                else results
            )

            frame = (
                batch_frame
                if len(frame) == 0
                else pd.concat([frame, batch_frame]).reset_index(drop=True)
            )

        if raw:
            return frame

        return transform(frame)

    @staticmethod
    def get_codes(
        table_name: str,
        code_fields: List[str],
        display_query: Optional[str] = None,
        sample_size: Optional[int] = None,
        **kwargs,
    ):
        """Find FHIR codes with a display for a given table

        Attributes
        ----------
        table_name : str
            The FHIR Search Service table to retrieve from

        code_fields : List[str]
            The fields of this table that contain a system, code, and display

        display_query : Optional[str]
            Part of the code's display to match (will try to extract full code
            if passed)

        sample_size : Optional[int]
            Override the search size for finding codes (may miss codes on later
            records)

        kwargs : dict
            Arguments to pass to `phc.easy.query.Query.execute_composite_aggregations`

        Examples
        --------
        >>> import phc.easy as phc
        >>> phc.Auth.set({ 'account': '<your-account-name>' })
        >>> phc.Project.set_current('My Project Name')
        >>> phc.Query.get_codes(
            table_name="observation",
            code_fields=["meta.tag", "code.coding"],
            patient_id="<my-patient-id>"
        )
        """
        if len(code_fields) == 0:
            raise ValueError("No code columns specified.")

        def agg_composite_to_frame(prefix: str, data: dict):
            frame = pd.json_normalize(data["buckets"])
            frame.columns = frame.columns.str.lstrip("key.")
            frame["field"] = prefix
            return frame

        if display_query is not None:
            kwargs = {
                **kwargs,
                "query_overrides": {
                    "where": {
                        "type": "elasticsearch",
                        "query": {
                            "multi_match": {
                                "query": display_query,
                                "fields": [
                                    f"{key}.display" for key in code_fields
                                ],
                            }
                        },
                    }
                },
            }

        results = Query.execute_composite_aggregations(
            table_name=table_name,
            key_sources_pairs=[
                (
                    field,
                    [
                        {
                            "display": {
                                "terms": {"field": f"{field}.display.keyword"}
                            }
                        }
                    ],
                )
                for field in code_fields
            ],
            **kwargs,
        )

        agg_result = (
            pd.concat(
                [
                    agg_composite_to_frame(key, value)
                    for key, value in results.items()
                ]
            )
            .pipe(
                lambda df: df
                if len(df) == 0 or display_query is None
                # Poor man's way to filter only matching codes (since Elasticsearch
                # returns records which will include other codes)
                else df[
                    df["display"]
                    .str.lower()
                    .str.contains(display_query.lower())
                ]
            )
            .pipe(
                lambda df: pd.DataFrame()
                if len(df) == 0
                else df.sort_values("doc_count", ascending=False).reset_index(
                    drop=True
                )
            )
        )

        if display_query is None or len(agg_result) == 0:
            return agg_result

        min_count = sample_size or agg_result.doc_count.sum()
        filtered_code_fields = agg_result.field.unique()

        # Shortcut: If one result, we just need to get the other associated
        # attributes of the code
        if len(agg_result) == 1:
            min_count = 1

        code_results = Query.execute_fhir_dsl(
            {
                "type": "select",
                "from": [{"table": table_name}],
                "columns": [
                    {
                        "expr": {
                            "type": "column_ref",
                            "column": key.split(".")[0],
                        }
                    }
                    for key in filtered_code_fields
                ],
                "where": {
                    "type": "elasticsearch",
                    "query": {
                        "multi_match": {
                            "query": display_query,
                            "fields": [
                                f"{key}.display" for key in filtered_code_fields
                            ],
                        }
                    },
                },
            },
            page_size=int(min_count % 9000),
            max_pages=int(math.ceil(min_count / 9000)),
            log=kwargs.get("log", False),
        )

        codes = extract_codes(
            map(lambda d: d["_source"], code_results),
            display_query,
            code_fields,
        )

        if len(codes) == 0:
            return codes

        if len(codes) == codes.display.nunique():
            # If display values are unique, then the counts from Elasticsearch
            # are correct. We can therefore join them.
            codes = (
                codes.join(
                    agg_result[["display", "doc_count"]].set_index("display"),
                    on="display",
                    how="outer",
                )
                .sort_values("doc_count", ascending=False)
                .reset_index(drop=True)
            )

            if len(codes[codes.field.isnull()]) > 0:
                print(
                    "Records with missing system/code values were not retrieved."
                )

            return codes

        return codes

    @staticmethod
    def execute_composite_aggregations(
        table_name: str,
        key_sources_pairs: List[Tuple[str, List[dict]]],
        batch_size: int = 100,
        query_overrides: dict = {},
        log: bool = False,
        auth_args: Auth = Auth.shared(),
        max_pages: Union[int, None] = None,
        **query_kwargs,
    ):
        """Count records by multiple fields

        Attributes
        ----------
        table_name : str
            The FHIR Search Service table to retrieve from

        key_sources_pairs : List[Tuple[str, List[dict]]]
            Pairs of keys and sources to pull composite results from

            Example Input:
                [
                    ("meta.tag", [{"terms": {"field": "meta.tag.system.keyword"}}])
                ]

        batch_size : int
            The size of each page from elasticsearch to use

        query_overrides : dict
            Parts of the FSS query to override
            (Note that passing certain values can cause the method to error out)

            Example aggregation query executed (can use log=True to inspect):
                {
                    "type": "select",
                    "columns": [{
                        "type": "elasticsearch",
                        "aggregations": {
                            "results": {
                                "composite": {
                                    "sources": [{
                                        "meta.tag": {
                                            "terms": {
                                                "field": "meta.tag.system.keyword"
                                            }
                                        }
                                    }],
                                    "size": 100,
                                }
                            }
                        },
                    }],
                    "from": [{"table": "observation"}],
                }


        auth_args : Auth, dict
            Additional arguments for authentication

        log : bool = False
            Whether to log the elasticsearch query sent to the server

        max_pages : int
            The number of pages to retrieve (useful if working with tons of records)

        query_kwargs : dict
            Arguments to pass to build_queries such as patient_id, patient_ids,
            and patient_key. See :func:`~phc.easy.query.fhir_dsl_query.build_queries`.

        Examples
        --------
        >>> import phc.easy as phc
        >>> phc.Auth.set({ 'account': '<your-account-name>' })
        >>> phc.Project.set_current('My Project Name')
        >>> phc.Query.execute_composite_aggregations(
            table_name="observation",
            key_sources_pairs=[
                ("meta.tag", [
                    {"code": {"terms": {"field": "meta.tag.code.keyword"}}},
                ]),
                ("code.coding", [
                    {"display": {"terms": {"field": "code.coding.display.keyword"}}}
                ]),
            ]
        )
        """
        if len(key_sources_pairs) == 0:
            raise ValueError("No aggregate composite terms specified.")

        return with_progress(
            tqdm,
            lambda progress: Query._recursive_execute_composite_aggregations(
                table_name=table_name,
                key_sources_pairs=key_sources_pairs,
                batch_size=batch_size,
                progress=progress,
                log=log,
                auth_args=auth_args,
                query_overrides=query_overrides,
                max_pages=max_pages,
                **query_kwargs,
            ),
        )

    @staticmethod
    def get_count_by_field(
        table_name: str,
        field: str,
        batch_size: int = 1000,
        query_overrides: dict = {},
        log: bool = False,
        auth_args: Auth = Auth.shared(),
        **query_kwargs,
    ):
        """Count records by a given field

        Attributes
        ----------
        table_name : str
            The FHIR Search Service table to retrieve from

        field : str
            The field name to count the values of (e.g. "subject.reference")

        batch_size : int
            The size of each page from elasticsearch to use

        query_overrides : dict
            Parts of the FSS query to override
            (Note that passing certain values can cause the method to error out)

            The aggregation query is similar to this:
                {
                    "type": "select",
                    "columns": [{
                        "type": "elasticsearch",
                        "aggregations": {
                            "results": {
                                "composite": {
                                    "sources": [{
                                        "value": {
                                            "terms": {
                                                "field": "gender.keyword"
                                            }
                                        }
                                    }],
                                    "size": 100,
                                }
                            }
                        },
                    }],
                    "from": [{"table": "patient"}],
                }


        auth_args : Auth, dict
            Additional arguments for authentication

        log : bool = False
            Whether to log the elasticsearch query sent to the server

        query_kwargs : dict
            Arguments to pass to build_queries such as patient_id, patient_ids,
            and patient_key. (See phc.easy.query.fhir_dsl_query.build_queries)

        Examples
        --------
        >>> import phc.easy as phc
        >>> phc.Auth.set({ 'account': '<your-account-name>' })
        >>> phc.Project.set_current('My Project Name')
        >>> phc.Query.get_count_by_field(
            table_name="patient",
            field="gender"
        )
        """
        data = Query.execute_composite_aggregations(
            table_name=table_name,
            key_sources_pairs=[
                (
                    "results",
                    [{"value": {"terms": {"field": f"{field}.keyword"}}}],
                )
            ],
            batch_size=batch_size,
            log=log,
            auth_args=auth_args,
            query_overrides=query_overrides,
            **query_kwargs,
        )

        return pd.DataFrame(
            [
                {field: r["key"]["value"], "doc_count": r["doc_count"]}
                for r in data["results"]["buckets"]
            ]
        )

    @staticmethod
    def execute_ga4gh(
        query: dict, all_results: bool = False, auth_args: dict = Auth.shared()
    ) -> pd.DataFrame:
        auth = Auth(auth_args)
        client = BaseClient(auth.session())
        path = query["path"]
        http_verb = query.get("http_verb", "POST")
        results_key = query["results_key"]
        params = {
            **{"datasetIds": [auth.project_id]},
            **{
                k: v for k, v in query.items() if k not in ["path", "http_verb"]
            },
        }

        return recursive_execute_ga4gh(
            auth=auth,
            client=client,
            path=path,
            http_verb=http_verb,
            results_key=results_key,
            params=params,
            scroll=all_results,
        )

    @staticmethod
    def _recursive_execute_composite_aggregations(
        table_name: str,
        key_sources_pairs: List[Tuple[str, List[dict]]],
        batch_size: int = 100,
        progress: Union[tqdm, None] = None,
        query_overrides: dict = {},
        log: bool = False,
        auth_args: Auth = Auth.shared(),
        max_pages: Union[int, None] = None,
        _current_page: int = 1,
        _prev_results: dict = {},
        _after_keys: dict = {},
        **query_kwargs,
    ):
        aggregation = Query.execute_fhir_dsl(
            {
                "type": "select",
                "columns": [
                    {
                        "type": "elasticsearch",
                        "aggregations": {
                            key: {
                                "composite": {
                                    "sources": sources,
                                    "size": batch_size,
                                    **(
                                        {"after": _after_keys[key]}
                                        if key in _after_keys
                                        else {}
                                    ),
                                }
                            }
                            for key, sources in key_sources_pairs
                            if (len(_after_keys) == 0) or (key in _after_keys)
                        },
                    }
                ],
                "from": [{"table": table_name}],
                **query_overrides,
            },
            auth_args=auth_args,
            log=log,
            **query_kwargs,
        )

        current_results = aggregation.data
        results = FhirAggregation.reduce_composite_results(
            _prev_results, current_results
        )

        if (progress is not None) and (_current_page == 1) and max_pages:
            progress.reset(max_pages)

        if progress is not None:
            # Update by count or pages (if max_pages specified)
            progress.update(
                1
                if max_pages
                else FhirAggregation.count_composite_results(current_results)
            )

        after_keys = FhirAggregation.find_composite_after_keys(
            current_results, batch_size
        )

        if len(after_keys) == 0 or (
            (max_pages is not None) and (_current_page >= max_pages)
        ):
            print(
                f"Retrieved {FhirAggregation.count_composite_results(results)} results"
            )
            return results

        return Query._recursive_execute_composite_aggregations(
            table_name=table_name,
            key_sources_pairs=key_sources_pairs,
            batch_size=batch_size,
            progress=progress,
            query_overrides=query_overrides,
            log=log,
            auth_args=auth_args,
            max_pages=max_pages,
            _current_page=_current_page + 1,
            _prev_results=results,
            _after_keys=after_keys,
            **query_kwargs,
        )
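
Query.execute_ga4gh has no docstring example; the following is a hedged sketch of a call. The path and results_key values are hypothetical placeholders, and the actual values depend on the GA4GH endpoint being queried.

>>> import phc.easy as phc
>>> phc.Query.execute_ga4gh({
        "path": "<ga4gh-endpoint-path>",          # hypothetical placeholder
        "results_key": "<key-containing-results>",  # key under which the endpoint returns items
    }, all_results=True)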

Static methods

def execute_composite_aggregations(table_name: str, key_sources_pairs: List[Tuple[str, List[dict]]], batch_size: int = 100, query_overrides: dict = {}, log: bool = False, auth_args: Auth = <phc.easy.auth.Auth object>, max_pages: Optional[int] = None, **query_kwargs)

Count records by multiple fields

Attributes

table_name : str
The FHIR Search Service table to retrieve from
key_sources_pairs : List[Tuple[str, List[dict]]]

Pairs of keys and sources to pull composite results from

Example Input:

    [
        ("meta.tag", [{"terms": {"field": "meta.tag.system.keyword"}}])
    ]

batch_size : int
The size of each page from elasticsearch to use
query_overrides : dict

Parts of the FSS query to override (Note that passing certain values can cause the method to error out)

Example aggregation query executed (can use log=True to inspect):

    {
        "type": "select",
        "columns": [{
            "type": "elasticsearch",
            "aggregations": {
                "results": {
                    "composite": {
                        "sources": [{
                            "meta.tag": {
                                "terms": {
                                    "field": "meta.tag.system.keyword"
                                }
                            }
                        }],
                        "size": 100,
                    }
                }
            },
        }],
        "from": [{"table": "observation"}],
    }

auth_args : Auth, dict
Additional arguments for authentication
log : bool = False
Whether to log the elasticsearch query sent to the server
max_pages : int
The number of pages to retrieve (useful if working with tons of records)
query_kwargs : dict
Arguments to pass to build_queries such as patient_id, patient_ids, and patient_key. (See phc.easy.query.fhir_dsl_query.build_queries)

Examples

>>> import phc.easy as phc
>>> phc.Auth.set({ 'account': '<your-account-name>' })
>>> phc.Project.set_current('My Project Name')
>>> phc.Query.execute_composite_aggregations(
    table_name="observation",
    key_sources_pairs=[
        ("meta.tag", [
            {"code": {"terms": {"field": "meta.tag.code.keyword"}}},
        ]),
        ("code.coding", [
            {"display": {"terms": {"field": "code.coding.display.keyword"}}}
        ]),
    ]
)
Expand source code
@staticmethod
def execute_composite_aggregations(
    table_name: str,
    key_sources_pairs: List[Tuple[str, List[dict]]],
    batch_size: int = 100,
    query_overrides: dict = {},
    log: bool = False,
    auth_args: Auth = Auth.shared(),
    max_pages: Union[int, None] = None,
    **query_kwargs,
):
    """Count records by multiple fields

    Attributes
    ----------
    table_name : str
        The FHIR Search Service table to retrieve from

    key_sources_pairs : List[Tuple[str, List[dict]]]
        Pairs of keys and sources to pull composite results from

        Example Input:
            [
                ("meta.tag", [{"terms": {"field": "meta.tag.system.keyword"}}])
            ]

    batch_size : int
        The size of each page from elasticsearch to use

    query_overrides : dict
        Parts of the FSS query to override
        (Note that passing certain values can cause the method to error out)

        Example aggregation query executed (can use log=True to inspect):
            {
                "type": "select",
                "columns": [{
                    "type": "elasticsearch",
                    "aggregations": {
                        "results": {
                            "composite": {
                                "sources": [{
                                    "meta.tag": {
                                        "terms": {
                                            "field": "meta.tag.system.keyword"
                                        }
                                    }
                                }],
                                "size": 100,
                            }
                        }
                    },
                }],
                "from": [{"table": "observation"}],
            }


    auth_args : Auth, dict
        Additional arguments for authentication

    log : bool = False
        Whether to log the elasticsearch query sent to the server

    max_pages : int
        The number of pages to retrieve (useful if working with tons of records)

    query_kwargs : dict
        Arguments to pass to build_queries such as patient_id, patient_ids,
        and patient_key. See :func:`~phc.easy.query.fhir_dsl_query.build_queries`.

    Examples
    --------
    >>> import phc.easy as phc
    >>> phc.Auth.set({ 'account': '<your-account-name>' })
    >>> phc.Project.set_current('My Project Name')
    >>> phc.Query.execute_composite_aggregations(
        table_name="observation",
        key_sources_pairs=[
            ("meta.tag", [
                {"code": {"terms": {"field": "meta.tag.code.keyword"}}},
            ]),
            ("code.coding", [
                {"display": {"terms": {"field": "code.coding.display.keyword"}}}
            ]),
        ]
    )
    """
    if len(key_sources_pairs) == 0:
        raise ValueError("No aggregate composite terms specified.")

    return with_progress(
        tqdm,
        lambda progress: Query._recursive_execute_composite_aggregations(
            table_name=table_name,
            key_sources_pairs=key_sources_pairs,
            batch_size=batch_size,
            progress=progress,
            log=log,
            auth_args=auth_args,
            query_overrides=query_overrides,
            max_pages=max_pages,
            **query_kwargs,
        ),
    )
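The return value is a dict keyed by each entry in key_sources_pairs, with Elasticsearch-style composite buckets underneath (each bucket carries a "key" dict and a "doc_count"). A minimal sketch, assuming the example call above, that flattens one of those bucket lists into a data frame:

>>> import pandas as pd
>>> results = phc.Query.execute_composite_aggregations(
        table_name="observation",
        key_sources_pairs=[
            ("meta.tag", [
                {"code": {"terms": {"field": "meta.tag.code.keyword"}}},
            ]),
        ]
    )
>>> pd.DataFrame([
        {**bucket["key"], "doc_count": bucket["doc_count"]}
        for bucket in results["meta.tag"]["buckets"]
    ])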
def execute_fhir_dsl(query: dict, all_results: bool = False, auth_args: Auth = <phc.easy.auth.Auth object>, callback: Optional[Callable[[Any, bool], None]] = None, max_pages: Optional[int] = None, log: bool = False, **query_kwargs)

Execute a FHIR query with the DSL

See https://devcenter.docs.lifeomic.com/development/fhir-service/dsl

Attributes

query : dict
The FHIR query to run (is a superset of elasticsearch)
all_results : bool
Return all results by scrolling through multiple pages of data (Limit is ignored if provided)
auth_args : Auth, dict
Additional arguments for authentication
callback : Callable[[Any, bool], None] (optional)

A progress function that is invoked for each batch. When the second argument passed is true, then the result of the callback function is used as the return value. This is useful if writing results out to a file and then returning the completed result from that file.

Example:

def handle_batch(batch, is_finished):
    print(len(batch))
    if is_finished:
        return "batch finished
max_pages : int
The number of pages to retrieve (useful if working with tons of records)
log : bool = False
Whether to log the elasticsearch query sent to the server
query_kwargs : dict
Arguments to pass to build_queries such as patient_id, patient_ids, and patient_key. (See phc.easy.query.fhir_dsl_query.build_queries)

Examples

>>> import phc.easy as phc
>>> phc.Auth.set({ 'account': '<your-account-name>' })
>>> phc.Project.set_current('My Project Name')
>>> phc.Query.execute_fhir_dsl({
  "type": "select",
  "columns": "*",
  "from": [
      {"table": "patient"}
  ],
}, all_results=True)
Expand source code
@staticmethod
def execute_fhir_dsl(
    query: dict,
    all_results: bool = False,
    auth_args: Auth = Auth.shared(),
    callback: Union[Callable[[Any, bool], None], None] = None,
    max_pages: Union[int, None] = None,
    log: bool = False,
    **query_kwargs,
):
    """Execute a FHIR query with the DSL

    See https://devcenter.docs.lifeomic.com/development/fhir-service/dsl

    Attributes
    ----------
    query : dict
        The FHIR query to run (is a superset of elasticsearch)

    all_results : bool
        Return all results by scrolling through multiple pages of data
        (Limit is ignored if provided)

    auth_args : Auth, dict
        Additional arguments for authentication

    callback : Callable[[Any, bool], None] (optional)
        A progress function that is invoked for each batch. When the second
        argument passed is true, then the result of the callback function is
        used as the return value. This is useful if writing results out to a
        file and then returning the completed result from that file.

        Example:

            def handle_batch(batch, is_finished):
                print(len(batch))
                if is_finished:
                    return "batch finished

    max_pages : int
        The number of pages to retrieve (useful if working with tons of records)

    log : bool = False
        Whether to log the elasticsearch query sent to the server

    query_kwargs : dict
        Arguments to pass to build_queries such as patient_id, patient_ids,
        and patient_key. (See phc.easy.query.fhir_dsl_query.build_queries)

    Examples
    --------
    >>> import phc.easy as phc
    >>> phc.Auth.set({ 'account': '<your-account-name>' })
    >>> phc.Project.set_current('My Project Name')
    >>> phc.Query.execute_fhir_dsl({
      "type": "select",
      "columns": "*",
      "from": [
          {"table": "patient"}
      ],
    }, all_results=True)

    """
    queries = build_queries(query, **query_kwargs)

    if log:
        print(json.dumps(queries, indent=4))

    if len(queries) > 1 and FhirAggregation.is_aggregation_query(
        queries[0]
    ):
        raise ValueError(
            "Cannot combine multiple aggregation query results"
        )

    if FhirAggregation.is_aggregation_query(queries[0]):
        response = execute_single_fhir_dsl(queries[0], auth_args=auth_args)
        return FhirAggregation.from_response(response)

    if len(queries) > 1 and _has_tqdm:
        queries = tqdm(queries)

    result_set = []

    for query in queries:
        if all_results:
            results = with_progress(
                lambda: tqdm(total=MAX_RESULT_SIZE),
                lambda progress: recursive_execute_fhir_dsl(
                    {
                        "limit": [
                            {"type": "number", "value": 0},
                            # Make window size smaller than maximum to reduce
                            # pressure on API
                            {
                                "type": "number",
                                "value": DEFAULT_SCROLL_SIZE,
                            },
                        ],
                        **query,
                    },
                    scroll=all_results,
                    progress=progress,
                    callback=callback,
                    auth_args=auth_args,
                    max_pages=max_pages,
                ),
            )
        else:
            results = recursive_execute_fhir_dsl(
                query,
                scroll=all_results,
                callback=callback,
                auth_args=auth_args,
                max_pages=max_pages,
            )

        if len(result_set) == 0:
            result_set = results
        else:
            result_set.append(*results)

    return result_set
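Because callback receives each batch of raw hits (and, on the final call, determines the return value), results can be streamed to disk instead of accumulated in memory. A minimal sketch; the file name and helper are illustrative, not part of the library:

    import json

    def write_batch(batch, is_finished):
        # Append each batch of hits to a newline-delimited JSON file
        with open("observations.ndjson", "a") as f:
            for hit in batch:
                f.write(json.dumps(hit["_source"]) + "\n")

        if is_finished:
            # The value returned here becomes the return value of execute_fhir_dsl
            return "observations.ndjson"

    phc.Query.execute_fhir_dsl(
        {"type": "select", "columns": "*", "from": [{"table": "observation"}]},
        all_results=True,
        callback=write_batch,
    )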
def execute_fhir_dsl_with_options(query: dict, transform: Callable[[pandas.core.frame.DataFrame], pandas.core.frame.DataFrame], all_results: bool, raw: bool, query_overrides: dict, auth_args: Auth, ignore_cache: bool, max_pages: Optional[int], log: bool = False, **query_kwargs)
Expand source code
@staticmethod
def execute_fhir_dsl_with_options(
    query: dict,
    transform: Callable[[pd.DataFrame], pd.DataFrame],
    all_results: bool,
    raw: bool,
    query_overrides: dict,
    auth_args: Auth,
    ignore_cache: bool,
    max_pages: Union[int, None],
    log: bool = False,
    **query_kwargs,
):
    queries = build_queries({**query, **query_overrides}, **query_kwargs)

    if log:
        print(json.dumps(queries, indent=4))

    is_first_agg_query = FhirAggregation.is_aggregation_query(queries[0])

    if len(queries) > 1 and is_first_agg_query:
        raise ValueError("Cannot combine multiple aggregate results")

    use_cache = (
        (not ignore_cache)
        and (not raw)
        and (all_results or is_first_agg_query)
        and (max_pages is None)
    )

    if len(queries) > 1 and _has_tqdm:
        queries = tqdm(queries)

    frame = pd.DataFrame()

    for one_query in queries:
        if use_cache and APICache.does_cache_for_query_exist(
            one_query, namespace=FHIR_DSL
        ):
            results = APICache.load_cache_for_query(
                one_query, namespace=FHIR_DSL
            )
        else:
            results = Query.execute_fhir_dsl(
                one_query,
                all_results,
                auth_args,
                callback=(
                    APICache.build_cache_callback(
                        one_query, transform, namespace=FHIR_DSL
                    )
                    if use_cache
                    else None
                ),
                max_pages=max_pages,
            )

        if isinstance(results, FhirAggregation):
            # Cache isn't written in batches so we need to explicitly do it here
            if use_cache:
                APICache.write_agg(one_query, results)

            # We don't support multiple agg queries so fine to return first one
            return results

        batch_frame = (
            pd.DataFrame(map(lambda r: r["_source"], results))
            if not isinstance(results, pd.DataFrame)
            else results
        )

        frame = (
            batch_frame
            if len(frame) == 0
            else pd.concat([frame, batch_frame]).reset_index(drop=True)
        )

    if raw:
        return frame

    return transform(frame)
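This method is primarily used internally by the resource classes, but it can be called directly when a custom transform is needed. A hedged sketch of a direct call (the pass-through transform and raw=True are just illustrative choices):

>>> import phc.easy as phc
>>> phc.Query.execute_fhir_dsl_with_options(
        query={"type": "select", "columns": "*", "from": [{"table": "patient"}]},
        transform=lambda df: df,
        all_results=False,
        raw=True,
        query_overrides={},
        auth_args=phc.Auth.shared(),
        ignore_cache=True,
        max_pages=None,
    )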
def execute_ga4gh(query: dict, all_results: bool = False, auth_args: dict = <phc.easy.auth.Auth object>) ‑> pandas.core.frame.DataFrame
Expand source code
@staticmethod
def execute_ga4gh(
    query: dict, all_results: bool = False, auth_args: dict = Auth.shared()
) -> pd.DataFrame:
    auth = Auth(auth_args)
    client = BaseClient(auth.session())
    path = query["path"]
    http_verb = query.get("http_verb", "POST")
    results_key = query["results_key"]
    params = {
        **{"datasetIds": [auth.project_id]},
        **{
            k: v for k, v in query.items() if k not in ["path", "http_verb"]
        },
    }

    return recursive_execute_ga4gh(
        auth=auth,
        client=client,
        path=path,
        http_verb=http_verb,
        results_key=results_key,
        params=params,
        scroll=all_results,
    )
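The query dict describes the request itself: "path" and "results_key" are required, "http_verb" defaults to "POST", and the remaining keys are sent as parameters (datasetIds is filled in from the current project). A hedged sketch; the path and results_key values below are illustrative, not a documented endpoint:

>>> phc.Query.execute_ga4gh({
        "path": "genomics/variants/search",
        "results_key": "variants",
    }, all_results=False)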
def execute_paging_api(path: str, params: dict = {}, http_verb: str = 'GET', transform: Callable[[pandas.core.frame.DataFrame], pandas.core.frame.DataFrame] = <function identity>, all_results: bool = False, auth_args: Auth = <phc.easy.auth.Auth object>, max_pages: Optional[int] = None, page_size: Optional[int] = None, log: bool = False, raw: bool = False, ignore_cache: bool = False, show_progress: bool = True, progress: Optional[tqdm] = None, item_key: str = 'items', try_count: bool = True, response_to_items: Optional[Callable[[Union[list, dict]], list]] = None)

Execute an API query that pages through results

Attributes

path : str
The API path to hit (Special tokens: {project_id})
params : dict
The parameters to include with request
http_verb : str
The HTTP method to use
all_results : bool = False
Retrieve sample of results (25) or entire set of records
auth_args : Auth, dict
Additional arguments for authentication
max_pages : int
The number of pages to retrieve (useful if working with tons of records)
page_size : int
The number of records to fetch per page
log : bool = False
Whether to log some diagnostic statements for debugging
progress : Optional[tqdm] = None
Override the given progress indicator
item_key : str
The key to find the results underneath (usually "items" but not always)
try_count : bool
Whether to try and send a "count" param to update the progress bar
response_to_items : Callable
Custom function to transform response data to list of items (Overrides item_key when present)

Examples

>>> import phc.easy as phc
>>> phc.Auth.set({ 'account': '<your-account-name>' })
>>> phc.Project.set_current('My Project Name')
>>> phc.Query.execute_paging_api(
        "genomics/projects/{project_id}/tests",
        params={
            "patientId": "<patient-uuid>"
        }
    )
Expand source code
@staticmethod
def execute_paging_api(
    path: str,
    params: dict = {},
    http_verb: str = "GET",
    transform: Callable[[pd.DataFrame], pd.DataFrame] = identity,
    all_results: bool = False,
    auth_args: Auth = Auth.shared(),
    max_pages: Optional[int] = None,
    page_size: Optional[int] = None,
    log: bool = False,
    raw: bool = False,
    ignore_cache: bool = False,
    show_progress: bool = True,
    progress: Optional[tqdm] = None,
    item_key: str = "items",
    try_count: bool = True,
    response_to_items: Optional[Callable[[Union[list, dict]], list]] = None,
):
    """Execute a API query that pages through results

    Attributes
    ----------
    path : str
        The API path to hit
        (Special tokens: `{project_id}`)

    params : dict
        The parameters to include with request

    http_verb : str
        The HTTP method to use

    all_results : bool = False
        Retrieve sample of results (25) or entire set of records

    auth_args : Auth, dict
        Additional arguments for authentication

    max_pages : int
        The number of pages to retrieve (useful if working with tons of records)

    page_size : int
        The number of records to fetch per page

    log : bool = False
        Whether to log some diagnostic statements for debugging

    progress : Optional[tqdm] = None
        Override the given progress indicator

    item_key : str
        The key to find the results underneath (usually "items" but not always)

    try_count : bool
        Whether to try and send a "count" param to update the progress bar

    response_to_items : Callable
        Custom function to transform response data to list of items
        (Overrides item_key when present)

    Examples
    --------
    >>> import phc.easy as phc
    >>> phc.Auth.set({ 'account': '<your-account-name>' })
    >>> phc.Project.set_current('My Project Name')
    >>> phc.Query.execute_paging_api(
            "genomics/projects/{project_id}/tests",
            params={
                "patientId": "<patient-uuid>"
            }
        )

    """

    auth = Auth(auth_args)

    params = clean_params(params)

    # Do not pull project_id if not in URL (which throws error if project not selected)
    if "project_id" in path:
        path = path.replace("{project_id}", auth.project_id)

    path, params = merge_pattern(path, params)

    query = {"path": path, "method": http_verb, "params": params}

    if all_results and page_size is None:
        # Default to 100 if not provided but getting all results
        page_size = 100

    if log:
        print(json.dumps(query, indent=4))

    use_cache = (
        (not ignore_cache)
        and (not raw)
        and all_results
        and (max_pages is None)
    )

    if use_cache and APICache.does_cache_for_query_exist(query):
        return APICache.load_cache_for_query(query)

    callback = (
        APICache.build_cache_callback(query, transform, nested_key=None)
        if use_cache
        else None
    )

    results = with_progress(
        lambda: (progress if progress is not None else tqdm())
        if show_progress
        else None,
        lambda progress: recursive_paging_api_call(
            path,
            params=params,
            http_verb=http_verb,
            callback=callback,
            scroll=all_results or (max_pages is not None),
            max_pages=max_pages,
            page_size=page_size,
            log=log,
            auth_args=auth_args,
            progress=progress,
            item_key=item_key,
            response_to_items=response_to_items,
            try_count=try_count,
        ),
    )

    df = pd.DataFrame(results)

    if raw:
        return df

    return transform(df)
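For endpoints that do not nest their records under "items", either item_key or response_to_items can be used to pick the records out of the payload. A brief sketch with a hypothetical response shape (the "tests" key is an assumption for illustration):

>>> phc.Query.execute_paging_api(
        "genomics/projects/{project_id}/tests",
        params={"patientId": "<patient-uuid>"},
        response_to_items=lambda body: body.get("tests", []),
    )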
def find_count_of_dsl_query(query: dict, auth_args: Auth = <phc.easy.auth.Auth object>)

Find count of a given dsl query

See https://devcenter.docs.lifeomic.com/development/fhir-service/dsl

Attributes

query : dict
The FHIR query to run a count against
auth_args : Auth, dict
Additional arguments for authentication

Examples

>>> import phc.easy as phc
>>> phc.Auth.set({ 'account': '<your-account-name>' })
>>> phc.Project.set_current('My Project Name')
>>> phc.Query.find_count_of_dsl_query({
  "type": "select",
  "columns": "*",
  "from": [{"table": "patient"}],
})
Expand source code
@staticmethod
def find_count_of_dsl_query(query: dict, auth_args: Auth = Auth.shared()):
    """Find count of a given dsl query

    See https://devcenter.docs.lifeomic.com/development/fhir-service/dsl

    Attributes
    ----------
    query : dict
        The FHIR query to run a count against

    auth_args : Auth, dict
        Additional arguments for authentication

    Examples
    --------
    >>> import phc.easy as phc
    >>> phc.Auth.set({ 'account': '<your-account-name>' })
    >>> phc.Project.set_current('My Project Name')
    >>> phc.Query.find_count_of_dsl_query({
      "type": "select",
      "columns": "*",
      "from": [{"table": "patient"}],
    })
    """
    if FhirAggregation.is_aggregation_query(query):
        raise ValueError("Count is not support for aggregation queries.")

    auth = Auth(auth_args)
    fhir = Fhir(auth.session())

    response = fhir.execute_es(
        auth.project_id, build_queries(query, page_size=1)[0], scroll="true"
    )

    return response.data["hits"]["total"]["value"]
def get_codes(table_name: str, code_fields: List[str], display_query: Optional[str] = None, sample_size: Optional[int] = None, **kwargs)

Find FHIR codes with a display for a given table

Attributes

table_name : str
The FHIR Search Service table to retrieve from
code_fields : List[str]
The fields of this table that contain a system, code, and display
display_query : Optional[str]
Part of the code's display to match (will try to extract full code if passed)
sample_size : Optional[int]
Override the search size for finding codes (may miss codes on later records)
kwargs : dict
Arguments to pass to Query.execute_composite_aggregations()

Examples

>>> import phc.easy as phc
>>> phc.Auth.set({ 'account': '<your-account-name>' })
>>> phc.Project.set_current('My Project Name')
>>> phc.Query.get_codes(
    table_name="observation",
    code_fields=["meta.tag", "code.coding"],
    patient_id="<my-patient-id>"
)
Expand source code
@staticmethod
def get_codes(
    table_name: str,
    code_fields: List[str],
    display_query: Optional[str] = None,
    sample_size: Optional[int] = None,
    **kwargs,
):
    """Find FHIR codes with a display for a given table

    Attributes
    ----------
    table_name : str
        The FHIR Search Service table to retrieve from

    code_fields : List[str]
        The fields of this table that contain a system, code, and display

    display_query : Optional[str]
        Part of the code's display to match (will try to extract full code
        if passed)

    sample_size : Optional[int]
        Override the search size for finding codes (may miss codes on later
        records)

    kwargs : dict
        Arguments to pass to `phc.easy.query.Query.execute_composite_aggregations`

    Examples
    --------
    >>> import phc.easy as phc
    >>> phc.Auth.set({ 'account': '<your-account-name>' })
    >>> phc.Project.set_current('My Project Name')
    >>> phc.Query.get_codes(
        table_name="observation",
        code_fields=["meta.tag", "code.coding"],
        patient_id="<my-patient-id>"
    )
    """
    if len(code_fields) == 0:
        raise ValueError("No code columns specified.")

    def agg_composite_to_frame(prefix: str, data: dict):
        frame = pd.json_normalize(data["buckets"])
        frame.columns = frame.columns.str.lstrip("key.")
        frame["field"] = prefix
        return frame

    if display_query is not None:
        kwargs = {
            **kwargs,
            "query_overrides": {
                "where": {
                    "type": "elasticsearch",
                    "query": {
                        "multi_match": {
                            "query": display_query,
                            "fields": [
                                f"{key}.display" for key in code_fields
                            ],
                        }
                    },
                }
            },
        }

    results = Query.execute_composite_aggregations(
        table_name=table_name,
        key_sources_pairs=[
            (
                field,
                [
                    {
                        "display": {
                            "terms": {"field": f"{field}.display.keyword"}
                        }
                    }
                ],
            )
            for field in code_fields
        ],
        **kwargs,
    )

    agg_result = (
        pd.concat(
            [
                agg_composite_to_frame(key, value)
                for key, value in results.items()
            ]
        )
        .pipe(
            lambda df: df
            if len(df) == 0 or display_query is None
            # Poor man's way to filter only matching codes (since Elasticsearch
            # returns records which will include other codes)
            else df[
                df["display"]
                .str.lower()
                .str.contains(display_query.lower())
            ]
        )
        .pipe(
            lambda df: pd.DataFrame()
            if len(df) == 0
            else df.sort_values("doc_count", ascending=False).reset_index(
                drop=True
            )
        )
    )

    if display_query is None or len(agg_result) == 0:
        return agg_result

    min_count = sample_size or agg_result.doc_count.sum()
    filtered_code_fields = agg_result.field.unique()

    # Shortcut: If one result, we just need to get the other associated
    # attributes of the code
    if len(agg_result) == 1:
        min_count = 1

    code_results = Query.execute_fhir_dsl(
        {
            "type": "select",
            "from": [{"table": table_name}],
            "columns": [
                {
                    "expr": {
                        "type": "column_ref",
                        "column": key.split(".")[0],
                    }
                }
                for key in filtered_code_fields
            ],
            "where": {
                "type": "elasticsearch",
                "query": {
                    "multi_match": {
                        "query": display_query,
                        "fields": [
                            f"{key}.display" for key in filtered_code_fields
                        ],
                    }
                },
            },
        },
        page_size=int(min_count % 9000),
        max_pages=int(math.ceil(min_count / 9000)),
        log=kwargs.get("log", False),
    )

    codes = extract_codes(
        map(lambda d: d["_source"], code_results),
        display_query,
        code_fields,
    )

    if len(codes) == 0:
        return codes

    if len(codes) == codes.display.nunique():
        # If display values are unique, then the counts from Elasticsearch
        # are correct. We can therefore join them.
        codes = (
            codes.join(
                agg_result[["display", "doc_count"]].set_index("display"),
                on="display",
                how="outer",
            )
            .sort_values("doc_count", ascending=False)
            .reset_index(drop=True)
        )

        if len(codes[codes.field.isnull()]) > 0:
            print(
                "Records with missing system/code values were not retrieved."
            )

        return codes

    return codes
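When display_query is passed, the aggregated displays are filtered to the matching text and a sample of records is pulled to recover the full system/code/display values. A brief sketch; the search term is illustrative:

>>> phc.Query.get_codes(
        table_name="observation",
        code_fields=["code.coding"],
        display_query="blood pressure",
        sample_size=1000
    )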
def get_count_by_field(table_name: str, field: str, batch_size: int = 1000, query_overrides: dict = {}, log: bool = False, auth_args: Auth = <phc.easy.auth.Auth object>, **query_kwargs)

Count records by a given field

Attributes

table_name : str
The FHIR Search Service table to retrieve from
field : str
The field name to count the values of (e.g. "subject.reference")
batch_size : int
The size of each page from elasticsearch to use
query_overrides : dict

Parts of the FSS query to override (Note that passing certain values can cause the method to error out)

The aggregation query is similar to this:

    {
        "type": "select",
        "columns": [{
            "type": "elasticsearch",
            "aggregations": {
                "results": {
                    "composite": {
                        "sources": [{
                            "value": {
                                "terms": {
                                    "field": "gender.keyword"
                                }
                            }
                        }],
                        "size": 100,
                    }
                }
            },
        }],
        "from": [{"table": "patient"}],
    }

auth_args : Auth, dict
Additional arguments for authentication
log : bool = False
Whether to log the elasticsearch query sent to the server
query_kwargs : dict
Arguments to pass to build_queries such as patient_id, patient_ids, and patient_key. (See phc.easy.query.fhir_dsl_query.build_queries)

Examples

>>> import phc.easy as phc
>>> phc.Auth.set({ 'account': '<your-account-name>' })
>>> phc.Project.set_current('My Project Name')
>>> phc.Query.get_count_by_field(
    table_name="patient",
    field="gender"
)
Expand source code
@staticmethod
def get_count_by_field(
    table_name: str,
    field: str,
    batch_size: int = 1000,
    query_overrides: dict = {},
    log: bool = False,
    auth_args: Auth = Auth.shared(),
    **query_kwargs,
):
    """Count records by a given field

    Attributes
    ----------
    table_name : str
        The FHIR Search Service table to retrieve from

    field : str
        The field name to count the values of (e.g. "subject.reference")

    batch_size : int
        The size of each page from elasticsearch to use

    query_overrides : dict
        Parts of the FSS query to override
        (Note that passing certain values can cause the method to error out)

        The aggregation query is similar to this:
            {
                "type": "select",
                "columns": [{
                    "type": "elasticsearch",
                    "aggregations": {
                        "results": {
                            "composite": {
                                "sources": [{
                                    "value": {
                                        "terms": {
                                            "field": "gender.keyword"
                                        }
                                    }
                                }],
                                "size": 100,
                            }
                        }
                    },
                }],
                "from": [{"table": "patient"}],
            }


    auth_args : Auth, dict
        Additional arguments for authentication

    log : bool = False
        Whether to log the elasticsearch query sent to the server

    query_kwargs : dict
        Arguments to pass to build_queries such as patient_id, patient_ids,
        and patient_key. (See phc.easy.query.fhir_dsl_query.build_queries)

    Examples
    --------
    >>> import phc.easy as phc
    >>> phc.Auth.set({ 'account': '<your-account-name>' })
    >>> phc.Project.set_current('My Project Name')
    >>> phc.Query.get_count_by_field(
        table_name="patient",
        field="gender"
    )
    """
    data = Query.execute_composite_aggregations(
        table_name=table_name,
        key_sources_pairs=[
            (
                "results",
                [{"value": {"terms": {"field": f"{field}.keyword"}}}],
            )
        ],
        batch_size=batch_size,
        log=log,
        auth_args=auth_args,
        query_overrides=query_overrides,
        **query_kwargs,
    )

    return pd.DataFrame(
        [
            {field: r["key"]["value"], "doc_count": r["doc_count"]}
            for r in data["results"]["buckets"]
        ]
    )
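Since query_kwargs is forwarded to build_queries, the count can be scoped like any other query, for example to a single patient. A brief sketch (the field is illustrative):

>>> phc.Query.get_count_by_field(
        table_name="observation",
        field="code.coding.display",
        patient_id="<patient-uuid>"
    )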
class ReferralRequest

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class ReferralRequest(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "referral_request"

    @staticmethod
    def code_fields():
        return ["type.coding", "meta.tag"]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            code_columns=[*expand_args.get("code_columns", []), "type"],
            custom_columns=[
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("subject"),
                Frame.codeable_like_column_expander("context"),
                (
                    "requester",
                    lambda r: pd.json_normalize(r).add_prefix("requester."),
                ),
            ],
        )

Ancestors

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["type.coding", "meta.tag"]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "referral_request"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        code_columns=[*expand_args.get("code_columns", []), "type"],
        custom_columns=[
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("subject"),
            Frame.codeable_like_column_expander("context"),
            (
                "requester",
                lambda r: pd.json_normalize(r).add_prefix("requester."),
            ),
        ],
    )
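Typical retrieval goes through the inherited get_data_frame, with the subject, context, and requester fields expanded as above. A short illustrative call:

>>> import phc.easy as phc
>>> phc.ReferralRequest.get_data_frame(patient_id="<patient-uuid>")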
class Sequence

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class Sequence(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "sequence"

    @staticmethod
    def patient_key() -> str:
        return "patient.reference"

    @staticmethod
    def transform_results(data_frame: pd.DataFrame, **expand_args):
        args = {
            **expand_args,
            "code_columns": [
                *expand_args.get("code_columns", []),
                "specimen",
                "repository",
            ],
            "custom_columns": [
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("patient"),
                Frame.codeable_like_column_expander("referenceSeq"),
            ],
        }

        return Frame.expand(data_frame, **args)

Ancestors

Static methods

def code_fields() ‑> List[str]

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def patient_key() ‑> str
Expand source code
@staticmethod
def patient_key() -> str:
    return "patient.reference"
def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "sequence"
def transform_results(data_frame: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(data_frame: pd.DataFrame, **expand_args):
    args = {
        **expand_args,
        "code_columns": [
            *expand_args.get("code_columns", []),
            "specimen",
            "repository",
        ],
        "custom_columns": [
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("patient"),
            Frame.codeable_like_column_expander("referenceSeq"),
        ],
    }

    return Frame.expand(data_frame, **args)
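Because patient_key is overridden to "patient.reference", patient filters on this table are applied against that field. A short illustrative call:

>>> phc.Sequence.get_data_frame(patient_id="<patient-uuid>", all_results=True)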
class Specimen

Provides an abstract class and/or static methods for retrieving items from a FSS table that relates to a patient

Expand source code
class Specimen(FhirServicePatientItem):
    @staticmethod
    def table_name():
        return "specimen"

    @staticmethod
    def code_fields():
        return ["type.coding", "meta.tag", "collection.bodySite.coding"]

    @staticmethod
    def transform_results(df: pd.DataFrame, **expand_args):
        return Frame.expand(
            df,
            code_columns=[
                *expand_args.get("code_columns", []),
                "collection",
                "type",
            ],
            custom_columns=[
                *expand_args.get("custom_columns", []),
                Frame.codeable_like_column_expander("subject"),
            ],
        )

Ancestors

Static methods

def code_fields()

Inherited from: FhirServicePatientItem.code_fields

Returns the code keys (e.g. when searching for codes)

Expand source code
@staticmethod
def code_fields():
    return ["type.coding", "meta.tag", "collection.bodySite.coding"]
def get_codes(display_query: Optional[str] = None, sample_size: Optional[int] = None, exclude_meta_tag=True, **kwargs)

Inherited from: FhirServicePatientItem.get_codes

Find all codes …

def get_count(query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>)

Inherited from: FhirServicePatientItem.get_count

Get the count for a given FSS query

def get_count_by_field(field: str, **kwargs)

Inherited from: FhirServicePatientItem.get_count_by_field

Count records by a given field …

def get_count_by_patient(**kwargs)

Inherited from: FhirServicePatientItem.get_count_by_patient

Count records by a given field …

def get_data_frame(all_results: bool = False, raw: bool = False, id: Optional[str] = None, ids: List[str] = [], patient_id: Optional[str] = None, patient_ids: List[str] = [], device_id: Optional[str] = None, device_ids: List[str] = [], page_size: Optional[int] = None, max_pages: Optional[int] = None, query_overrides: dict = {}, auth_args=<phc.easy.auth.Auth object>, ignore_cache: bool = False, expand_args: dict = {}, log: bool = False, term: Optional[dict] = None, terms: List[dict] = [], max_terms: int = 30000, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, code_fields: List[str] = [])

Inherited from: FhirServicePatientItem.get_data_frame

Retrieve records …

def table_name()

Inherited from: FhirServicePatientItem.table_name

Returns the FSS table name for retrieval

Expand source code
@staticmethod
def table_name():
    return "specimen"
def transform_results(df: pandas.core.frame.DataFrame, **expand_args)

Inherited from: FhirServicePatientItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(df: pd.DataFrame, **expand_args):
    return Frame.expand(
        df,
        code_columns=[
            *expand_args.get("code_columns", []),
            "collection",
            "type",
        ],
        custom_columns=[
            *expand_args.get("custom_columns", []),
            Frame.codeable_like_column_expander("subject"),
        ],
    )
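Since code_fields includes collection.bodySite.coding, the inherited get_codes can be used to explore the coded values on this table. A short sketch; the search term is illustrative:

>>> phc.Specimen.get_codes(display_query="blood")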
class SummaryClinicalCounts
Expand source code
class SummaryClinicalCounts(PagingApiItem):
    @staticmethod
    def resource_path():
        return "analytics/summary/{project_id}/clinical/codified/counts"

    @staticmethod
    def execute_args() -> dict:
        return SummaryCounts.execute_args()

    @staticmethod
    def params_class():
        return SummaryClinicalCountsOptions

    @classmethod
    def get_data_frame(
        cls,
        all_results: bool = True,
        auth_args: Auth = Auth.shared(),
        max_pages: Optional[int] = None,
        page_size: Optional[int] = None,
        log: bool = False,
        match: Optional[SummarySearchMatchOption] = None,
        code: Optional[Union[str, List[str]]] = None,
        display: Optional[Union[str, List[str]]] = None,
        system: Optional[Union[str, List[str]]] = None,
        **kw_args,
    ):
        """Execute a request for summary counts across clinical data

        >>> from phc.easy.summary.clinical_counts import SummaryClinicalCounts
        >>> SummaryClinicalCounts.get_data_frame(match="fuzzy", display="sleep")

        ## Parameters

        Execution: `phc.easy.query.Query.execute_paging_api`
        """

        df = super().get_data_frame(
            **kw_args, **cls._get_current_args(inspect.currentframe(), locals())
        )

        return df

Ancestors

Static methods

def execute_args() ‑> dict
Expand source code
@staticmethod
def execute_args() -> dict:
    return SummaryCounts.execute_args()
def get_data_frame(all_results: bool = True, auth_args: Auth = <phc.easy.auth.Auth object>, max_pages: Optional[int] = None, page_size: Optional[int] = None, log: bool = False, match: Optional[SummarySearchMatchOption] = None, code: Union[str, List[str], None] = None, display: Union[str, List[str], None] = None, system: Union[str, List[str], None] = None, **kw_args)

Execute a request for summary counts across clinical data

>>> from phc.easy.summary.clinical_counts import SummaryClinicalCounts
>>> SummaryClinicalCounts.get_data_frame(match="fuzzy", display="sleep")

Parameters

Execution: Query.execute_paging_api()

Expand source code
@classmethod
def get_data_frame(
    cls,
    all_results: bool = True,
    auth_args: Auth = Auth.shared(),
    max_pages: Optional[int] = None,
    page_size: Optional[int] = None,
    log: bool = False,
    match: Optional[SummarySearchMatchOption] = None,
    code: Optional[Union[str, List[str]]] = None,
    display: Optional[Union[str, List[str]]] = None,
    system: Optional[Union[str, List[str]]] = None,
    **kw_args,
):
    """Execute a request for summary counts across clinical data

    >>> from phc.easy.summary.clinical_counts import SummaryClinicalCounts
    >>> SummaryClinicalCounts.get_data_frame(match="fuzzy", display="sleep")

    ## Parameters

    Execution: `phc.easy.query.Query.execute_paging_api`
    """

    df = super().get_data_frame(
        **kw_args, **cls._get_current_args(inspect.currentframe(), locals())
    )

    return df
def params_class()

Inherited from: PagingApiItem.params_class

Returns a pydantic type that validates and transforms the params with dict()

Expand source code
@staticmethod
def params_class():
    return SummaryClinicalCountsOptions
def process_params(params: dict) ‑> dict

Inherited from: PagingApiItem.process_params

Validates and transforms the API query parameters

def resource_path()

Inherited from: PagingApiItem.resource_path

Returns the API url name for retrieval

Expand source code
@staticmethod
def resource_path():
    return "analytics/summary/{project_id}/clinical/codified/counts"
def transform_results(data_frame: pandas.core.frame.DataFrame, **expand_args)

Inherited from: PagingApiItem.transform_results

Transform data frame batch

class SummaryCounts
Expand source code
class SummaryCounts(PagingApiItem):
    @staticmethod
    def resource_path():
        return "analytics/summary/{project_id}"

    @staticmethod
    def response_to_items(data):
        squashed = first(pd.json_normalize(data).to_dict("records")) or {}
        return lmapcat(
            lambda k: [{"summary": k, **v} for v in squashed[k]]
            if isinstance(squashed[k], list)
            else [],
            squashed.keys(),
        )

    @staticmethod
    def execute_args() -> dict:
        return dict(ignore_cache=True)

    @staticmethod
    def params_class():
        return NoOptions

    @staticmethod
    def transform_results(
        data_frame: pd.DataFrame, include_demographics: bool, **expand_args
    ):
        return pipe(
            data_frame,
            rpartial(
                combine_first, ["code", "index", "demographic_value"], "code"
            ),
            rpartial(
                combine_first,
                [
                    "code_count",
                    "count",
                    "sequence_type_count",
                    "test_type_count",
                    "variant_count",
                ],
                "count",
            ),
            rpartial(combine_first, ["display", "sequence_type"], "display"),
            iffy(
                lambda df: "summary" in df.columns,
                lambda df: df.assign(
                    summary=df.summary.str.replace(".counts", "", regex=False)
                ),
            ),
            rpartial(
                drop,
                [
                    "index",
                    "sequence_type_count",
                    "sequence_type",
                    "code_count",
                    "demographic_value",
                    "test_type_count",
                    "variant_count",
                ],
            ),
            iffy(
                lambda df: "summary" in df.columns and "count" in df.columns,
                lambda df: df.sort_values(
                    ["summary", "count"], ascending=False
                ),
            ),
            iffy(
                lambda df: include_demographics is False
                and "summary" in df.columns,
                lambda df: df[~df.summary.str.contains("demographic")],
            ),
        ).reset_index(drop=True)

    @classmethod
    def get_data_frame(
        cls,
        include_demographics: bool = False,
        all_results: bool = True,
        auth_args: Auth = Auth.shared(),
        max_pages: Optional[int] = None,
        page_size: Optional[int] = None,
        log: bool = False,
        **kw_args,
    ):
        """Execute a request for summary counts across clinical and omics data

        NOTE: By default, demographic data is excluded since it is not
        technically counts of entities. If demographics-only data is desired,
        use this:

        >>> from phc.easy.summary.item_counts import SummaryItemCounts
        >>> SummaryItemCounts.get_data_frame(summary="demographics")

        ## Parameters

        Execution: `phc.easy.query.Query.execute_paging_api`
        """

        # NOTE: include_demographics gets passed through to transform_results
        # since explicitly declared there.

        df = super().get_data_frame(
            **kw_args, **cls._get_current_args(inspect.currentframe(), locals())
        )

        return df

Ancestors

Static methods

def execute_args() ‑> dict
Expand source code
@staticmethod
def execute_args() -> dict:
    return dict(ignore_cache=True)
def get_data_frame(include_demographics: bool = False, all_results: bool = True, auth_args: Auth = <phc.easy.auth.Auth object>, max_pages: Optional[int] = None, page_size: Optional[int] = None, log: bool = False, **kw_args)

Execute a request for summary counts across clinical and omics data

NOTE: By default, demographic data is excluded since it is not technically counts of entities. If demographics-only data is desired, use this:

>>> from phc.easy.summary.item_counts import SummaryItemCounts
>>> SummaryItemCounts.get_data_frame(summary="demographics")

Parameters

Execution: Query.execute_paging_api()

Expand source code
@classmethod
def get_data_frame(
    cls,
    include_demographics: bool = False,
    all_results: bool = True,
    auth_args: Auth = Auth.shared(),
    max_pages: Optional[int] = None,
    page_size: Optional[int] = None,
    log: bool = False,
    **kw_args,
):
    """Execute a request for summary counts across clinical and omics data

    NOTE: By default, demographic data is excluded since it is not
    technically counts of entities. If demographics-only data is desired,
    use this:

    >>> from phc.easy.summary.item_counts import SummaryItemCounts
    >>> SummaryItemCounts.get_data_frame(summary="demographics")

    ## Parameters

    Execution: `phc.easy.query.Query.execute_paging_api`
    """

    # NOTE: include_demographics gets passed through to transform_results
    # since explicitly declared there.

    df = super().get_data_frame(
        **kw_args, **cls._get_current_args(inspect.currentframe(), locals())
    )

    return df
def params_class()

Inherited from: PagingApiItem.params_class

Returns a pydantic type that validates and transforms the params with dict()

Expand source code
@staticmethod
def params_class():
    return NoOptions
def process_params(params: dict) ‑> dict

Inherited from: PagingApiItem.process_params

Validates and transforms the API query parameters

def resource_path()

Inherited from: PagingApiItem.resource_path

Returns the API url name for retrieval

Expand source code
@staticmethod
def resource_path():
    return "analytics/summary/{project_id}"
def response_to_items(data)
Expand source code
@staticmethod
def response_to_items(data):
    squashed = first(pd.json_normalize(data).to_dict("records")) or {}
    return lmapcat(
        lambda k: [{"summary": k, **v} for v in squashed[k]]
        if isinstance(squashed[k], list)
        else [],
        squashed.keys(),
    )
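In effect, each top-level key whose value is a list becomes a group of rows tagged with a "summary" column, while scalar keys are dropped. A small worked sketch with made-up input:

>>> SummaryCounts.response_to_items({
        "observation.counts": [{"code": "1234", "count": 10}],
        "patientCount": 500,
    })
[{'summary': 'observation.counts', 'code': '1234', 'count': 10}]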
def transform_results(data_frame: pandas.core.frame.DataFrame, include_demographics: bool, **expand_args)

Inherited from: PagingApiItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(
    data_frame: pd.DataFrame, include_demographics: bool, **expand_args
):
    return pipe(
        data_frame,
        rpartial(
            combine_first, ["code", "index", "demographic_value"], "code"
        ),
        rpartial(
            combine_first,
            [
                "code_count",
                "count",
                "sequence_type_count",
                "test_type_count",
                "variant_count",
            ],
            "count",
        ),
        rpartial(combine_first, ["display", "sequence_type"], "display"),
        iffy(
            lambda df: "summary" in df.columns,
            lambda df: df.assign(
                summary=df.summary.str.replace(".counts", "", regex=False)
            ),
        ),
        rpartial(
            drop,
            [
                "index",
                "sequence_type_count",
                "sequence_type",
                "code_count",
                "demographic_value",
                "test_type_count",
                "variant_count",
            ],
        ),
        iffy(
            lambda df: "summary" in df.columns and "count" in df.columns,
            lambda df: df.sort_values(
                ["summary", "count"], ascending=False
            ),
        ),
        iffy(
            lambda df: include_demographics is False
            and "summary" in df.columns,
            lambda df: df[~df.summary.str.contains("demographic")],
        ),
    ).reset_index(drop=True)
class SummaryItemCounts
Expand source code
class SummaryItemCounts(PagingApiItem):
    @staticmethod
    def resource_path():
        return "analytics/summary/{project_id}/{summary_type}/{summary}/counts"

    @classmethod
    def process_params(cls, params: dict) -> dict:
        new_params = cls.params_class()(**params).dict()

        if SummaryClinicalType.has_value(new_params["summary"]):
            return {**new_params, "summary_type": "clinical"}
        elif SummaryOmicsType.has_value(new_params["summary"]):
            return {**new_params, "summary_type": "omics"}

        # Unknown summary_type
        return None

    @staticmethod
    def transform_results(data_frame: pd.DataFrame, **expand_args):
        if len(data_frame) == 0:
            return data_frame

        if expand_args.get("params", {}).get("summary") == "demographic":
            # Sort demographics results in a nice way
            return data_frame.sort_values(
                ["demographic_name", "count"], ascending=False
            ).reset_index(drop=True)

        if (
            "code_count" in data_frame.columns
            and "patient_count" in data_frame.columns
        ):
            return data_frame.sort_values(
                ["code_count", "patient_count"], ascending=False
            )

        return data_frame

    @staticmethod
    def execute_args() -> dict:
        return dict(ignore_cache=True)

    @staticmethod
    def params_class():
        return SummaryItemCountsOptions

    @classmethod
    def get_data_frame(
        cls,
        summary: Union[SummaryOmicsType, SummaryClinicalType],
        all_results: bool = True,
        auth_args: Auth = Auth.shared(),
        max_pages: Optional[int] = None,
        page_size: Optional[int] = None,
        log: bool = False,
        **kw_args,
    ):
        """Execute a request for summary counts across clinical and omics data

        NOTE: By default, demographic data is excluded since it is not
        technically counts of entities. If demographics-only data is desired,
        use this:

        >>> from phc.easy.summary.item_counts import SummaryItemCounts
        >>> SummaryItemCounts.get_data_frame(summary="demographics")

        ## Parameters

        Execution: `phc.easy.query.Query.execute_paging_api`
        """

        df = super().get_data_frame(
            **kw_args, **cls._get_current_args(inspect.currentframe(), locals())
        )

        return df

Ancestors

Static methods

def execute_args() ‑> dict
Expand source code
@staticmethod
def execute_args() -> dict:
    return dict(ignore_cache=True)
def get_data_frame(summary: Union[SummaryOmicsType, SummaryClinicalType], all_results: bool = True, auth_args: Auth = <phc.easy.auth.Auth object>, max_pages: Optional[int] = None, page_size: Optional[int] = None, log: bool = False, **kw_args)

Execute a request for summary counts across clinical and omics data

NOTE: By default, demographic data is excluded since it is not technically counts of entities. If demographics-only data is desired, use this:

>>> from phc.easy.summary.item_counts import SummaryItemCounts
>>> SummaryItemCounts.get_data_frame(summary="demographics")

Parameters

Execution: Query.execute_paging_api()

Expand source code
@classmethod
def get_data_frame(
    cls,
    summary: Union[SummaryOmicsType, SummaryClinicalType],
    all_results: bool = True,
    auth_args: Auth = Auth.shared(),
    max_pages: Optional[int] = None,
    page_size: Optional[int] = None,
    log: bool = False,
    **kw_args,
):
    """Execute a request for summary counts across clinical and omics data

    NOTE: By default, demographic data is excluded since it is not
    technically counts of entities. If demographics-only data is desired,
    use this:

    >>> from phc.easy.summary.item_counts import SummaryItemCounts
    >>> SummaryItemCounts.get_data_frame(summary="demographics")

    ## Parameters

    Execution: `phc.easy.query.Query.execute_paging_api`
    """

    df = super().get_data_frame(
        **kw_args, **cls._get_current_args(inspect.currentframe(), locals())
    )

    return df
def params_class()

Inherited from: PagingApiItem.params_class

Returns a pydantic type that validates and transforms the params with dict()

Expand source code
@staticmethod
def params_class():
    return SummaryItemCountsOptions
def process_params(params: dict) ‑> dict

Inherited from: PagingApiItem.process_params

Validates and transforms the API query parameters

Expand source code
@classmethod
def process_params(cls, params: dict) -> dict:
    new_params = cls.params_class()(**params).dict()

    if SummaryClinicalType.has_value(new_params["summary"]):
        return {**new_params, "summary_type": "clinical"}
    elif SummaryOmicsType.has_value(new_params["summary"]):
        return {**new_params, "summary_type": "omics"}

    # Unknown summary_type
    return None
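
A hedged sketch of the routing this performs (the dict below is illustrative; any additional
required fields come from SummaryItemCountsOptions):

>>> # "demographics" is the summary value used in the docstring example above; when it is
>>> # recognized as a SummaryClinicalType, the validated params gain "summary_type": "clinical",
>>> # an omics value gains "summary_type": "omics", and an unknown value returns None
>>> SummaryItemCounts.process_params({"summary": "demographics"})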
def resource_path()

Inherited from: PagingApiItem.resource_path

Returns the API url name for retrieval

Expand source code
@staticmethod
def resource_path():
    return "analytics/summary/{project_id}/{summary_type}/{summary}/counts"
def transform_results(data_frame: pandas.core.frame.DataFrame, **expand_args)

Inherited from: PagingApiItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(data_frame: pd.DataFrame, **expand_args):
    if len(data_frame) == 0:
        return data_frame

    if expand_args.get("params", {}).get("summary") == "demographic":
        # Sort demographics results in a nice way
        return data_frame.sort_values(
            ["demographic_name", "count"], ascending=False
        ).reset_index(drop=True)

    if (
        "code_count" in data_frame.columns
        and "patient_count" in data_frame.columns
    ):
        return data_frame.sort_values(
            ["code_count", "patient_count"], ascending=False
        )

    return data_frame
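
A toy illustration of the non-demographic sort (made-up counts, not an API response):

>>> import pandas as pd
>>> batch = pd.DataFrame({"code_count": [2, 10], "patient_count": [1, 4]})
>>> SummaryItemCounts.transform_results(batch)  # rows sorted by code_count, then patient_count, descending
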
class SummaryOmicsCounts
Expand source code
class SummaryOmicsCounts(PagingApiItem):
    @staticmethod
    def resource_path():
        return "analytics/summary/{project_id}/omics"

    @staticmethod
    def response_to_items(data):
        # Common functionality with SummaryCounts since pd.json_normalize squashes nested keys until nested arrays encountered
        return SummaryCounts.response_to_items(data)

    @staticmethod
    def transform_results(data_frame: pd.DataFrame, **expand_args):
        return SummaryCounts.transform_results(
            data_frame,
            # Omics doesn't include demographics
            include_demographics=False,
            **expand_args,
        )

    @staticmethod
    def execute_args() -> dict:
        return SummaryCounts.execute_args()

    @staticmethod
    def params_class():
        return NoOptions

    @classmethod
    def get_data_frame(
        cls,
        all_results: bool = True,
        auth_args: Auth = Auth.shared(),
        max_pages: Optional[int] = None,
        page_size: Optional[int] = None,
        log: bool = False,
        **kw_args,
    ):
        """Execute a request for summary counts across omics data

        ## Parameters

        Execution: `phc.easy.query.Query.execute_paging_api`
        """

        df = super().get_data_frame(
            **kw_args, **cls._get_current_args(inspect.currentframe(), locals())
        )

        return df
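
A minimal usage sketch (requires an authenticated phc session with an active project):

>>> from phc.easy.summary.omics_counts import SummaryOmicsCounts
>>> frame = SummaryOmicsCounts.get_data_frame(max_pages=1)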

Ancestors

PagingApiItem

Static methods

def execute_args() ‑> dict
Expand source code
@staticmethod
def execute_args() -> dict:
    return SummaryCounts.execute_args()
def get_data_frame(all_results: bool = True, auth_args: Auth = Auth.shared(), max_pages: Optional[int] = None, page_size: Optional[int] = None, log: bool = False, **kw_args)

Execute a request for summary counts across omics data

Parameters

Execution: Query.execute_paging_api()

Expand source code
@classmethod
def get_data_frame(
    cls,
    all_results: bool = True,
    auth_args: Auth = Auth.shared(),
    max_pages: Optional[int] = None,
    page_size: Optional[int] = None,
    log: bool = False,
    **kw_args,
):
    """Execute a request for summary counts across omics data

    ## Parameters

    Execution: `phc.easy.query.Query.execute_paging_api`
    """

    df = super().get_data_frame(
        **kw_args, **cls._get_current_args(inspect.currentframe(), locals())
    )

    return df
def params_class()

Inherited from: PagingApiItem.params_class

Returns a pydantic type that validates and transforms the params with dict()

Expand source code
@staticmethod
def params_class():
    return NoOptions
def process_params(params: dict) ‑> dict

Inherited from: PagingApiItem.process_params

Validates and transforms the API query parameters

def resource_path()

Inherited from: PagingApiItem.resource_path

Returns the API url name for retrieval

Expand source code
@staticmethod
def resource_path():
    return "analytics/summary/{project_id}/omics"
def response_to_items(data)
Expand source code
@staticmethod
def response_to_items(data):
    # Common functionality with SummaryCounts since pd.json_normalize squashes nested keys until nested arrays encountered
    return SummaryCounts.response_to_items(data)
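
The json_normalize behavior the comment refers to, shown on toy data (not an actual API payload):

>>> import pandas as pd
>>> pd.json_normalize({"sequence": {"count": 3}, "variants": [{"type": "short"}]})
>>> # nested dicts flatten into dotted columns ("sequence.count") while the nested
>>> # "variants" array is kept as a single column of lists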
def transform_results(data_frame: pandas.core.frame.DataFrame, **expand_args)

Inherited from: PagingApiItem.transform_results

Transform data frame batch

Expand source code
@staticmethod
def transform_results(data_frame: pd.DataFrame, **expand_args):
    return SummaryCounts.transform_results(
        data_frame,
        # Omics doesn't include demographics
        include_demographics=False,
        **expand_args,
    )