Module phc.easy.util

Expand source code
import math
from functools import reduce, wraps
from typing import Callable, List, Optional, Union

import pandas as pd
from funcy import lmapcat
from toolz import groupby

try:
    from tqdm.auto import tqdm
except ImportError:
    _has_tqdm = False
    tqdm = None
else:
    _has_tqdm = True


def rename_keys(dictionary: dict, mapping: dict):
    """Return a copy of ``dictionary`` with its keys renamed via ``mapping``.

    Keys that do not appear in ``mapping`` are kept unchanged. If two keys end
    up with the same name, the one iterated last wins.
    """
    renamed = {}
    for old_key, value in dictionary.items():
        renamed[mapping.get(old_key, old_key)] = value
    return renamed


def join_underscore(values):
    """Join the string form of ``values`` with underscores, skipping empties.

    Integers are always kept; any other value is kept only when its length is
    greater than zero.
    """

    def _is_kept(item):
        return isinstance(item, int) or len(item) > 0

    return "_".join(str(item) for item in values if _is_kept(item))


def without_keys(dictionary, keys):
    """Return a copy of ``dictionary`` minus the entries whose key is in ``keys``."""
    kept = {}
    for key, value in dictionary.items():
        if key in keys:
            continue
        kept[key] = value
    return kept


def prefix_dict_keys(dictionary, prefix: Union[str, int]):
    """Return ``dictionary`` with every key rewritten as ``"<prefix>_<key>"``.

    An empty string prefix is a no-op: the input dictionary itself is returned
    (not a copy). Integer prefixes, including 0, are always applied.
    """
    nothing_to_add = isinstance(prefix, str) and len(prefix) == 0
    if not nothing_to_add:
        return dict(
            (f"{prefix}_{key}", value) for key, value in dictionary.items()
        )

    return dictionary


def concat_dicts(dicts, prefix: Union[str, int] = ""):
    """Merge a list of dictionaries into one without dropping any value.

    When an incoming key is already taken, it is stored under the first free
    numeric suffix (``key_1``, ``key_2``, ...) instead of overwriting the
    existing entry. The merged dictionary is then passed through
    ``prefix_dict_keys`` with ``prefix``.
    """

    def bump_key_index(key, existing_dict, start=1):
        "Suffix with _1, _2, ... until the key is not in existing dictionary"
        if key not in existing_dict:
            return key

        # Iterative on purpose: a recursive search exceeds Python's default
        # recursion limit once the same key has been seen about 1000 times.
        index = start
        while f"{key}_{index}" in existing_dict:
            index += 1

        return f"{key}_{index}"

    # One mutable accumulator avoids copying the merged result for every
    # input dictionary.
    merged = {}
    for dictionary in dicts:
        for key, value in dictionary.items():
            # Check against everything merged so far, including keys already
            # taken from the current dictionary, so that a bumped key can
            # never overwrite a sibling key such as a literal "a_1".
            merged[bump_key_index(key, merged)] = value

    return prefix_dict_keys(merged, prefix)


def defaultprop(fn):
    """Decorator that exposes ``fn`` as a property with an overridable default.

    The property returns the instance attribute named ``_<fn name>`` when that
    attribute exists and is not ``None``; otherwise it returns ``fn(self)``.
    The computed default is never stored on the instance.
    """
    backing_attr = f"_{fn.__name__}"

    @wraps(fn)
    def _getter(self):
        # A missing attribute and an explicit None are treated the same way.
        stored = getattr(self, backing_attr, None)
        return fn(self) if stored is None else stored

    return property(_getter)


def with_progress(
    init_progress: Callable[[], Optional[tqdm]],
    func: Callable[[Union[None, tqdm]], None],
):
    """Run ``func`` with an optional progress bar and return its result.

    When the module-level ``_has_tqdm`` flag is false, ``init_progress`` is not
    called and ``func`` receives ``None``. Otherwise ``func`` receives whatever
    ``init_progress()`` returns (which may itself be ``None``), and a non-None
    progress object is closed afterwards.
    """
    if not _has_tqdm:
        return func(None)

    progress = init_progress()
    try:
        return func(progress)
    finally:
        # Close the bar even when func raises so it is not left open.
        if progress is not None:
            progress.close()


def add_prefixes(values: List[str], prefixes: List[str]):
    """Return every value once per prefix, prepending the prefix where missing.

    The result is grouped by prefix, in the order the prefixes are given. A
    value that already starts with the current prefix is kept as is. With no
    prefixes the input ``values`` object is returned unchanged.
    """
    if len(prefixes) == 0:
        return values

    prefixed = []
    for prefix in prefixes:
        for value in values:
            if value.startswith(prefix):
                prefixed.append(value)
            else:
                prefixed.append(f"{prefix}{value}")

    return prefixed


class Hashabledict(dict):
    """Dictionary that is hashable (useful for creating set of unique dictionaries)

    The hash covers keys and values, so dictionaries that share the same keys
    but hold different values do not all collide. When a value is unhashable
    (e.g. a nested list or dict) the hash falls back to the keys only.
    Equality is the regular ``dict`` equality. Do not mutate an instance while
    it is stored in a set or used as a dictionary key.
    """

    def __hash__(self):
        try:
            return hash(frozenset(self.items()))
        except TypeError:
            # Unhashable value somewhere: hash the keys only, which is still
            # consistent with dict equality.
            return hash(frozenset(self))


def get_values_at_codeable_paths(value: dict, keys: List[str]):
    """Extract values from FHIR records based on keys (useful for extracting codes)

    Each key is a dot-separated path that is followed through nested
    dictionaries (or pandas Series). Lists met along the way are traversed
    element by element and their results are flattened. Every mapping found at
    the end of a path is returned as a ``Hashabledict`` with an extra
    ``"field"`` entry holding the key it was found under. Missing, ``None`` and
    non-mapping values produce no result.
    """

    def _get_value_at_codeable_path(
        value: Union[list, dict], components: List[str], key: str
    ):
        if value is None:
            return []

        if isinstance(value, list):
            return lmapcat(
                lambda v: _get_value_at_codeable_path(v, components, key), value
            )

        # This guard must run before the leaf case below: unpacking a scalar
        # (e.g. a path that ends at a string) with ** raises TypeError.
        if not isinstance(value, (dict, pd.Series)):
            return []

        if len(components) == 0:
            return [Hashabledict({"field": key, **value})]

        return _get_value_at_codeable_path(
            value.get(components[0], None), components[1:], key
        )

    def get_value_at_codeable_path(value: dict, key: str):
        return _get_value_at_codeable_path(value, key.split("."), key)

    return lmapcat(lambda key: get_value_at_codeable_path(value, key), keys)


def extract_codes(results: list, display: str, code_fields: List[str]):
    """Extract code values from a list of dictionaries based on the code keys.
    Requires a display value to filter results preemptively (instead of
    filtering afterwards)

    Codes are collected through ``get_values_at_codeable_paths`` and
    de-duplicated with a set. A code is kept when ``display`` is a
    case-insensitive substring of its ``"display"`` entry; a missing or null
    ``"display"`` is treated as an empty string. Returns a pandas DataFrame
    with one row per unique code.
    """
    needle = display.lower()
    codes = set()

    for row in results:
        for code in get_values_at_codeable_paths(row, code_fields):
            if not isinstance(code, dict):
                continue

            # .get("display", "") still returns None when the key is present
            # but null, so normalise explicitly before lower-casing.
            code_display = code.get("display") or ""

            # Poor man's way to filter only matching codes (since Elasticsearch
            # returns records which will include other codes)
            if needle in code_display.lower():
                codes.add(code)

    return pd.DataFrame(list(codes))


def split_by(args: dict, left_keys: List[str]):
    """Partition ``args`` into ``(left, right)`` dictionaries.

    ``left`` holds the entries whose key is in ``left_keys``; ``right`` holds
    all remaining entries. Either side is an empty dictionary when nothing
    falls into it.
    """
    selected, remaining = {}, {}

    for key, value in args.items():
        target = selected if key in left_keys else remaining
        target[key] = value

    return (selected, remaining)

Sub-modules

phc.easy.util.api_cache
phc.easy.util.batch
phc.easy.util.frame

Functions

def add_prefixes(values: List[str], prefixes: List[str])

Add prefix to each value if not already present

Expand source code
def add_prefixes(values: List[str], prefixes: List[str]):
    """Return every value once per prefix, prepending the prefix where missing.

    The result is grouped by prefix, in the order the prefixes are given. A
    value that already starts with the current prefix is kept as is. With no
    prefixes the input ``values`` object is returned unchanged.
    """
    if len(prefixes) == 0:
        return values

    prefixed = []
    for prefix in prefixes:
        for value in values:
            if value.startswith(prefix):
                prefixed.append(value)
            else:
                prefixed.append(f"{prefix}{value}")

    return prefixed
def concat_dicts(dicts, prefix: Union[str, int] = '')

Concatenate list of dictionaries

Expand source code
def concat_dicts(dicts, prefix: Union[str, int] = ""):
    """Merge a list of dictionaries into one without dropping any value.

    When an incoming key is already taken, it is stored under the first free
    numeric suffix (``key_1``, ``key_2``, ...) instead of overwriting the
    existing entry. The merged dictionary is then passed through
    ``prefix_dict_keys`` with ``prefix``.
    """

    def bump_key_index(key, existing_dict, start=1):
        "Suffix with _1, _2, ... until the key is not in existing dictionary"
        if key not in existing_dict:
            return key

        # Iterative on purpose: a recursive search exceeds Python's default
        # recursion limit once the same key has been seen about 1000 times.
        index = start
        while f"{key}_{index}" in existing_dict:
            index += 1

        return f"{key}_{index}"

    # One mutable accumulator avoids copying the merged result for every
    # input dictionary.
    merged = {}
    for dictionary in dicts:
        for key, value in dictionary.items():
            # Check against everything merged so far, including keys already
            # taken from the current dictionary, so that a bumped key can
            # never overwrite a sibling key such as a literal "a_1".
            merged[bump_key_index(key, merged)] = value

    return prefix_dict_keys(merged, prefix)
def defaultprop(fn)

Function decorator to have a default property (but not automatically set it)

Expand source code
def defaultprop(fn):
    """Decorator that exposes ``fn`` as a property with an overridable default.

    The property returns the instance attribute named ``_<fn name>`` when that
    attribute exists and is not ``None``; otherwise it returns ``fn(self)``.
    The computed default is never stored on the instance.
    """
    backing_attr = f"_{fn.__name__}"

    @wraps(fn)
    def _getter(self):
        # A missing attribute and an explicit None are treated the same way.
        stored = getattr(self, backing_attr, None)
        return fn(self) if stored is None else stored

    return property(_getter)
def extract_codes(results: list, display: str, code_fields: List[str])

Extract code values from a list of dictionaries based on the code keys. Requires a display value to filter results preemptively (instead of filtering afterwards)

Expand source code
def extract_codes(results: list, display: str, code_fields: List[str]):
    """Extract code values from a list of dictionaries based on the code keys.
    Requires a display value to filter results preemptively (instead of
    filtering afterwards)

    Codes are collected through ``get_values_at_codeable_paths`` and
    de-duplicated with a set. A code is kept when ``display`` is a
    case-insensitive substring of its ``"display"`` entry; a missing or null
    ``"display"`` is treated as an empty string. Returns a pandas DataFrame
    with one row per unique code.
    """
    needle = display.lower()
    codes = set()

    for row in results:
        for code in get_values_at_codeable_paths(row, code_fields):
            if not isinstance(code, dict):
                continue

            # .get("display", "") still returns None when the key is present
            # but null, so normalise explicitly before lower-casing.
            code_display = code.get("display") or ""

            # Poor man's way to filter only matching codes (since Elasticsearch
            # returns records which will include other codes)
            if needle in code_display.lower():
                codes.add(code)

    return pd.DataFrame(list(codes))
def get_values_at_codeable_paths(value: dict, keys: List[str])

Extract values from FHIR records based on keys (useful for extracting codes)

Expand source code
def get_values_at_codeable_paths(value: dict, keys: List[str]):
    """Extract values from FHIR records based on keys (useful for extracting codes)

    Each key is a dot-separated path that is followed through nested
    dictionaries (or pandas Series). Lists met along the way are traversed
    element by element and their results are flattened. Every mapping found at
    the end of a path is returned as a ``Hashabledict`` with an extra
    ``"field"`` entry holding the key it was found under. Missing, ``None`` and
    non-mapping values produce no result.
    """

    def _get_value_at_codeable_path(
        value: Union[list, dict], components: List[str], key: str
    ):
        if value is None:
            return []

        if isinstance(value, list):
            return lmapcat(
                lambda v: _get_value_at_codeable_path(v, components, key), value
            )

        # This guard must run before the leaf case below: unpacking a scalar
        # (e.g. a path that ends at a string) with ** raises TypeError.
        if not isinstance(value, (dict, pd.Series)):
            return []

        if len(components) == 0:
            return [Hashabledict({"field": key, **value})]

        return _get_value_at_codeable_path(
            value.get(components[0], None), components[1:], key
        )

    def get_value_at_codeable_path(value: dict, key: str):
        return _get_value_at_codeable_path(value, key.split("."), key)

    return lmapcat(lambda key: get_value_at_codeable_path(value, key), keys)
def join_underscore(values)
Expand source code
def join_underscore(values):
    """Join the string form of ``values`` with underscores, skipping empties.

    Integers are always kept; any other value is kept only when its length is
    greater than zero.
    """

    def _is_kept(item):
        return isinstance(item, int) or len(item) > 0

    return "_".join(str(item) for item in values if _is_kept(item))
def prefix_dict_keys(dictionary, prefix: Union[str, int])
Expand source code
def prefix_dict_keys(dictionary, prefix: Union[str, int]):
    """Return ``dictionary`` with every key rewritten as ``"<prefix>_<key>"``.

    An empty string prefix is a no-op: the input dictionary itself is returned
    (not a copy). Integer prefixes, including 0, are always applied.
    """
    nothing_to_add = isinstance(prefix, str) and len(prefix) == 0
    if not nothing_to_add:
        return dict(
            (f"{prefix}_{key}", value) for key, value in dictionary.items()
        )

    return dictionary
def rename_keys(dictionary: dict, mapping: dict)

Rename keys in a dictionary

Expand source code
def rename_keys(dictionary: dict, mapping: dict):
    """Return a copy of ``dictionary`` with its keys renamed via ``mapping``.

    Keys that do not appear in ``mapping`` are kept unchanged. If two keys end
    up with the same name, the one iterated last wins.
    """
    renamed = {}
    for old_key, value in dictionary.items():
        renamed[mapping.get(old_key, old_key)] = value
    return renamed
def split_by(args: dict, left_keys: List[str])

Split into two dictionaries (left is whitelist and right is remaining)

Expand source code
def split_by(args: dict, left_keys: List[str]):
    """Partition ``args`` into ``(left, right)`` dictionaries.

    ``left`` holds the entries whose key is in ``left_keys``; ``right`` holds
    all remaining entries. Either side is an empty dictionary when nothing
    falls into it.
    """
    selected, remaining = {}, {}

    for key, value in args.items():
        target = selected if key in left_keys else remaining
        target[key] = value

    return (selected, remaining)
def with_progress(init_progress: Callable[[], None], func: Callable[[None], None])
Expand source code
def with_progress(
    init_progress: Callable[[], Optional[tqdm]],
    func: Callable[[Union[None, tqdm]], None],
):
    """Run ``func`` with an optional progress bar and return its result.

    When the module-level ``_has_tqdm`` flag is false, ``init_progress`` is not
    called and ``func`` receives ``None``. Otherwise ``func`` receives whatever
    ``init_progress()`` returns (which may itself be ``None``), and a non-None
    progress object is closed afterwards.
    """
    if not _has_tqdm:
        return func(None)

    progress = init_progress()
    try:
        return func(progress)
    finally:
        # Close the bar even when func raises so it is not left open.
        if progress is not None:
            progress.close()
def without_keys(dictionary, keys)
Expand source code
def without_keys(dictionary, keys):
    """Return a copy of ``dictionary`` minus the entries whose key is in ``keys``."""
    kept = {}
    for key, value in dictionary.items():
        if key in keys:
            continue
        kept[key] = value
    return kept

Classes

class Hashabledict (*args, **kwargs)

Dictionary that is hashable (useful for creating set of unique dictionaries)

Expand source code
class Hashabledict(dict):
    """Dictionary that is hashable (useful for creating set of unique dictionaries)

    The hash covers keys and values, so dictionaries that share the same keys
    but hold different values do not all collide. When a value is unhashable
    (e.g. a nested list or dict) the hash falls back to the keys only.
    Equality is the regular ``dict`` equality. Do not mutate an instance while
    it is stored in a set or used as a dictionary key.
    """

    def __hash__(self):
        try:
            return hash(frozenset(self.items()))
        except TypeError:
            # Unhashable value somewhere: hash the keys only, which is still
            # consistent with dict equality.
            return hash(frozenset(self))

Ancestors

  • builtins.dict