Module phc.services

Contains services for accessing different parts of the PHC platform.

Expand source code
"""
Contains services for accessing different parts of the PHC platform.
"""

from phc.services.accounts import Accounts
from phc.services.analytics import Analytics
from phc.services.fhir import Fhir
from phc.services.projects import Projects
from phc.services.files import Files
from phc.services.cohorts import Cohorts
from phc.services.genomics import Genomics
from phc.services.tools import Tools
from phc.services.workflows import Workflows


# Public API of this package: the service client classes imported above.
# This list also controls what ``from phc.services import *`` exports.
__all__ = [
    "Accounts",
    "Analytics",
    "Fhir",
    "Projects",
    "Files",
    "Cohorts",
    "Genomics",
    "Tools",
    "Workflows",
]

# pdoc configuration: every submodule name is mapped to False.
# NOTE(review): presumably this hides the per-submodule pages so the classes
# are documented only on this package page -- confirm against the pdoc
# version used to build the docs.
__pdoc__ = {
    "accounts": False,
    "analytics": False,
    "fhir": False,
    "projects": False,
    "files": False,
    "cohorts": False,
    "genomics": False,
    "tools": False,
    "workflows": False,
}

Classes

class Accounts (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)

Provides access to PHC accounts

Parameters

session : Session
The PHC session
run_async : bool
True to return promises, False to return results (default is False)
timeout : int
Operation timeout (default is 30)
trust_env : bool
Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
Expand source code
class Accounts(BaseClient):
    """Client for the PHC accounts API

    Parameters
    ----------
    session : phc.Session
        The PHC session
    run_async: bool
        True to return promises, False to return results (default is False)
    timeout: int
        Operation timeout (default is 30)
    trust_env: bool
        Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
    """

    def get_list(self) -> ApiResponse:
        """Lists the accounts available to the current session

        Issues a GET request against the ``accounts`` endpoint.

        Returns
        -------
        phc.ApiResponse
            The list accounts response
        """
        endpoint = "accounts"
        return self._api_call(endpoint, http_verb="GET")

Ancestors

  • phc.base_client.BaseClient

Methods

def get_list(self) ‑> phc.api_response.ApiResponse

Fetches the list of accounts for the current session

Returns

ApiResponse
The list accounts response
Expand source code
def get_list(self) -> ApiResponse:
    """Lists the accounts available to the current session

    Issues a GET request against the ``accounts`` endpoint.

    Returns
    -------
    phc.ApiResponse
        The list accounts response
    """
    endpoint = "accounts"
    return self._api_call(endpoint, http_verb="GET")
class Analytics (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)

Provides access to PHC Analytics and Data Lake

Parameters

session : Session
The PHC session
run_async : bool
True to return promises, False to return results (default is False)
timeout : int
Operation timeout (default is 30)
trust_env : bool
Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
Expand source code
class Analytics(BaseClient):
    """Provides acccess to PHC Analytics and Data Lake

    Parameters
    ----------
    session : phc.Session
        The PHC session
    run_async: bool
        True to return promises, False to return results (default is False)
    timeout: int
        Operation timeout (default is 30)
    trust_env: bool
        Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
    """

    def execute_sql(
        self, statement: str, project_id: str = None, cohort_id: str = None
    ) -> ApiResponse:
        """Runs a SQL statement through the ``analytics/dsl`` endpoint

        At least one of ``project_id`` or ``cohort_id`` must be given; when
        both are given, both are sent with the request.

        Parameters
        ----------
        statement : str
            The SQL statement
        project_id : str
            The project ID
        cohort_id : str
            The cohort ID

        Returns
        -------
        ApiResponse
            The API Response

        Raises
        ------
        ValueError
            If no project or cohort ID is provided

        Examples
        --------
        >>> from phc.services import Analytics
        >>> client = Analytics(session)
        >>> res = client.execute_sql(cohort_id='5a07dedb-fa2a-4cb0-b662-95b23a050221', statement='SELECT patients from patient')
        >>> print(f"Found {len(res.get('data').get('patients'))} patients")
        """
        if not (project_id or cohort_id):
            raise ValueError(
                "Must provide a value for the project or cohort ID"
            )

        # Only scopes with a truthy value are added to the request body.
        scope = {"dataset_id": project_id, "cohort_id": cohort_id}
        payload = {"string_query": statement}
        payload.update({key: value for key, value in scope.items() if value})

        return self._api_call("analytics/dsl", http_verb="POST", json=payload)

    def get_patients(
        self, project_id: str, query_builder: PatientFilterQueryBuilder
    ) -> ApiResponse:
        """Runs a patient filter query and returns the matching patients

        The query builder is converted to a request body, scoped to the
        project, and posted to the ``analytics/dsl`` endpoint.

        Parameters
        ----------
        project_id : str
            The project ID
        query_builder : util.PatientFilterQueryBuilder
            The query builder

        Returns
        -------
        list
            The list of patients

        Examples
        --------
        >>> from phc.services import Analytics
        >>> from phc.util import PatientFilterQueryBuilder
        >>> client = Analytics(session)
        >>> search = PatientFilterQueryBuilder()
        >>> (
        ...     search.patient()
        ...     .observations()
        ...     .code(eq='11142-7')
        ...     .system(eq='http://loinc.org')
        ...     .value_quantity(lt=40)
        ... )
        >>> res = client.get_patients(project_id='5a07dedb-fa2a-4cb0-b662-95b23a050221', query_builder=search)
        >>> print(f"Found {len(res)} patients")
        """
        request_body = query_builder.to_dict()
        request_body["dataset_id"] = project_id
        response = self._api_call(
            "analytics/dsl", http_verb="POST", json=request_body
        )
        # The patients are nested under data -> patients in the response.
        return response.get("data").get("patients")

    def execute_data_lake_query(self, query: DataLakeQuery) -> ApiResponse:
        """Submits a data lake query

        Parameters
        ----------
        query : util.DataLakeQuery
            The query builder

        Returns
        -------
        str
            The ``queryId`` value taken from the submit response

        Examples
        --------
        >>> from phc import Session
        >>> from phc.services import Analytics
        >>> from phc.util import DataLakeQuery

        >>> session = Session()
        >>> client = Analytics(session)

        >>> project_id = '19e34782-91c4-4143-aaee-2ba81ed0b206'
        >>> query_string = "SELECT sample_id, gene, impact, amino_acid_change, histology FROM variant WHERE tumor_site='breast'"
        >>> output_file_name = 'query-test-notebook'
        >>> query = DataLakeQuery(project_id=project_id, query=query_string, output_file_name=output_file_name)

        >>> query_id = client.execute_data_lake_query(query)
        >>> specific_query = client.get_data_lake_query(query_id)
        >>> paginated_dataset_queries = client.list_data_lake_queries(project_id)
        >>> print(query_id)
        """
        request_body = query.to_request_dict()
        response = self._api_call(
            "analytics/data-lake/query", http_verb="POST", json=request_body
        )
        return response.get("queryId")

    def list_data_lake_queries(
        self, project_id: str, page_size: int = 25, next_page_token: str = None
    ) -> ApiResponse:
        """Fetches a list of data lake queries

        The query-string values are URL-encoded, so a page token containing
        reserved characters (``+``, ``/``, ``=``, ``&``) is sent intact.

        Parameters
        ----------
        project_id : str
            The project ID
        page_size : int, optional
            The page size, by default 25
        next_page_token : str, optional
            The next page token, by default None

        Returns
        -------
        phc.ApiResponse
            The data lake list query response
        """
        # Local import keeps this method independent of the module's imports.
        from urllib.parse import urlencode

        # "%d" keeps the original integer formatting of the page size.
        params = {"datasetId": project_id, "pageSize": "%d" % page_size}
        if next_page_token:
            params["nextPageToken"] = next_page_token

        path = "analytics/data-lake/query?" + urlencode(params)
        return self._api_call(path, http_verb="GET")

    def get_data_lake_query(self, query_id: str) -> ApiResponse:
        """Retrieves a single data lake query by its ID

        Parameters
        ----------
        query_id : string
            The query ID

        Returns
        -------
        phc.ApiResponse
            The data lake query get response
        """
        path = f"analytics/data-lake/query/{query_id}"
        return self._api_call(path, http_verb="GET")

    def list_data_lake_schemas(self, project_id: str) -> ApiResponse:
        """Retrieves the schemas of the data lake tables

        Parameters
        ----------
        project_id : string
            The dataset to fetch the table schemas of

        Returns
        -------
        phc.ApiResponse
            The schema for each data lake table
        """
        return self._api_call(
            f"analytics/data-lake/schema?datasetId={project_id}",
            http_verb="GET",
        )

    def get_data_lake_schema(self, project_id: str, table: str) -> ApiResponse:
        """Retrieves the schema of one data lake table

        Parameters
        ----------
        project_id : string
            The dataset to fetch the table schema of

        table : string
            Name of the table

        Returns
        -------
        phc.ApiResponse
            Schema of the specified table
        """
        # The table name is a path segment; the dataset is a query parameter.
        path = f"analytics/data-lake/schema/{table}?datasetId={project_id}"
        return self._api_call(path, http_verb="GET")

    def execute_data_lake_query_to_dataframe(
        self, query: DataLakeQuery, dest_dir: str = None
    ):
        """Executes a data lake query, downloads the result file and converts to a Pandas dataframe.

        To use this method, the 'pandas' module is required.
        Otherwise, an exception will be thrown.

        Parameters
        ----------
        query : util.DataLakeQuery
            The query builder

        dest_dir : string, optional
            Directory the result file will be downloaded to.
            Defaults to the current working directory at the time of the call.

        Returns
        -------
        asyncio.Future || pandas.DataFrame
            A Future if run_async is True, the data lake query result contained in a Pandas dataframe otherwise.

        Raises
        ------
        ImportError
            If pandas is not available

        Examples
        --------
        >>> from phc import Session
        >>> from phc.services import Analytics
        >>> from phc.util import DataLakeQuery

        >>> session = Session()
        >>> client = Analytics(session)

        >>> project_id = '19e34782-91c4-4143-aaee-2ba81ed0b206'
        >>> query_string = "SELECT sample_id, gene, impact, amino_acid_change, histology FROM variant WHERE tumor_site='breast'"
        >>> output_file_name = 'query-dataframe-test'
        >>> query = DataLakeQuery(project_id=project_id, query=query_string, output_file_name=output_file_name)

        >>> dataframe = client.execute_data_lake_query_to_dataframe(query)
        >>> dataframe.head()
        """
        if not _has_pandas:
            raise ImportError("pandas is required")

        # Resolve the default here: a default of os.getcwd() in the signature
        # is evaluated once at import time and would pin the directory the
        # process happened to start in.
        if dest_dir is None:
            dest_dir = os.getcwd()

        future = asyncio.ensure_future(
            self.__execute_data_lake_query_to_dataframe_impl(query, dest_dir),
            loop=self._event_loop,
        )
        # Async callers get the future; sync callers block until it resolves.
        return (
            future
            if self.run_async
            else self._event_loop.run_until_complete(future)
        )

    def load_data_lake_result_to_dataframe(
        self, query_id: str, dest_dir: str = None
    ):
        """Downloads the result file of a query and converts to a Pandas dataframe.

        To use this method, the 'pandas' module is required.
        Otherwise, an exception will be thrown.

        Parameters
        ----------
        query_id : string
            Id of the query to load results from

        dest_dir : string, optional
            Directory the result file will be downloaded to.
            Defaults to the current working directory at the time of the call.

        Returns
        -------
        asyncio.Future || pandas.DataFrame
            A Future if run_async is True, the data lake query result contained in a Pandas dataframe otherwise.

        Raises
        ------
        ImportError
            If pandas is not available
        """
        if not _has_pandas:
            raise ImportError("pandas is required")

        # Resolve the default here: a default of os.getcwd() in the signature
        # is evaluated once at import time and would pin the directory the
        # process happened to start in.
        if dest_dir is None:
            dest_dir = os.getcwd()

        future = asyncio.ensure_future(
            self.__load_data_lake_result_to_dataframe_impl(query_id, dest_dir),
            loop=self._event_loop,
        )
        # Async callers get the future; sync callers block until it resolves.
        return (
            future
            if self.run_async
            else self._event_loop.run_until_complete(future)
        )

    async def __execute_data_lake_query_to_dataframe_impl(
        self, query: DataLakeQuery, dest_dir: str
    ):
        """Internal coroutine that executes a data lake query, waits for it to finish and loads the result file into a Pandas dataframe.

        This method exists to support either async or synchronous execution.

        Parameters
        ----------
        query : util.DataLakeQuery
            The query builder

        dest_dir : string
            Directory the result file will be downloaded to

        Returns
        -------
        pandas.DataFrame
            The data lake query result contained in a Pandas dataframe.

        Raises
        ------
        RuntimeError
            If the query has not completed after polling for 3600 seconds
        """
        # The steps below use call results directly, so an async client
        # delegates them to a separate client created with run_async=False.
        if self.run_async:
            sync_client = Analytics(
                self.session,
                run_async=False,
                timeout=self.timeout,
                trust_env=self.trust_env,
            )
        else:
            sync_client = self

        query_id = sync_client.execute_data_lake_query(query)

        completed = self.__poll_predicate(
            self.__data_lake_query_predicate, 3600, sync_client, query_id
        )
        if not completed:
            raise RuntimeError(
                f"Timed out waiting for query {query_id} to complete"
            )

        return await self.__load_data_lake_result_to_dataframe_impl(
            query_id, dest_dir
        )

    async def __load_data_lake_result_to_dataframe_impl(
        self, query_id: str, dest_dir: str
    ):
        """Internal coroutine that downloads an existing data lake query result and loads it into a Pandas dataframe.

        This method exists to support either async or synchronous execution.

        Parameters
        ----------
        query_id : string
            Id of the query to load results from

        dest_dir : string
            Directory the result file will be downloaded to

        Returns
        -------
        pandas.DataFrame
            The data lake query result contained in a Pandas dataframe.

        Raises
        ------
        RuntimeError
            If the result file is not available after polling for 30 seconds
        """
        if self.run_async:
            sync_client = Analytics(
                self.session,
                run_async=False,
                timeout=self.timeout,
                trust_env=self.trust_env,
            )
        else:
            sync_client = self

        # Existence check only: the response is discarded, and an exception
        # will be thrown if the query does not exist.
        sync_client.get_data_lake_query(query_id)

        result_files = files.Files(
            self.session,
            run_async=False,
            timeout=self.timeout,
            trust_env=self.trust_env,
        )
        available = self.__poll_predicate(result_files.exists, 30, query_id)
        if not available:
            raise RuntimeError(
                f"Timed out waiting for result file {query_id} to become available"
            )

        # The query id is also used as the id of the result file.
        csv_path = result_files.download(query_id, dest_dir=dest_dir)
        return _pd.read_csv(csv_path)

    def __data_lake_query_predicate(self, analytics_client, query_id):
        """Reports whether a query has reached the 'succeeded' state.

        If the query was cancelled or failed an exception will be thrown.

        Parameters
        ----------
        analytics_client : phc.services.Analytics
            Instance of the Analytics client

        query_id : string
            Id of the query to check for completion

        Returns
        -------
        bool
            True if the query is in the 'succeeded' state, False for any
            other non-terminal state (e.g. 'running').

        Raises
        ------
        RuntimeError
            If the query state is 'failed' or 'cancelled'
        """
        state = analytics_client.get_data_lake_query(query_id).get("state")

        if state in ("failed", "cancelled"):
            raise RuntimeError(f"Query {query_id} is {state}")
        return state == "succeeded"

    def __poll_predicate(self, predicate, timeout_sec, *args, **kwargs):
        """Executes a function until it returns a truthy value or the timeout is reached.

        This method will wait 2 seconds between predicate function executions.
        The timeout is measured with a monotonic clock, so adjustments to the
        system clock do not shorten or extend the wait. If ``timeout_sec`` is
        zero or negative the predicate is never invoked.

        Parameters
        ----------
        predicate : function
            Function to invoke until it returns a truthy value

        timeout_sec : int
            The number of seconds to wait until timing out

        args : list
            The positional args to invoke the predicate function with

        kwargs : dict
            The keyword args to invoke the predicate function with

        Returns
        -------
        bool
            True if the function evaluated to True, False otherwise.
        """
        # time.monotonic() cannot go backwards, unlike wall-clock time.time().
        deadline = time.monotonic() + timeout_sec
        while time.monotonic() < deadline:
            if predicate(*args, **kwargs):
                return True
            time.sleep(2)
        return False

Ancestors

  • phc.base_client.BaseClient

Methods

def execute_data_lake_query(self, query: phc.util.data_lake_query.DataLakeQuery) ‑> phc.api_response.ApiResponse

Executes a data lake query

Parameters

query : util.DataLakeQuery
The query builder

Returns

ApiResponse
The data lake query

Examples

>>> from phc import Session
>>> from phc.services import Analytics
>>> from phc.util import DataLakeQuery
>>> session = Session()
>>> client = Analytics(session)
>>> project_id = '19e34782-91c4-4143-aaee-2ba81ed0b206'
>>> query_string = "SELECT sample_id, gene, impact, amino_acid_change, histology FROM variant WHERE tumor_site='breast'"
>>> output_file_name = 'query-test-notebook'
>>> query = DataLakeQuery(project_id=project_id, query=query_string, output_file_name=output_file_name)
>>> query_id = client.execute_data_lake_query(query)
>>> specific_query = client.get_data_lake_query(query_id)
>>> paginated_dataset_queries = client.list_data_lake_queries(project_id)
>>> print(query_id)
Expand source code
def execute_data_lake_query(self, query: DataLakeQuery) -> ApiResponse:
    """Submits a data lake query

    Parameters
    ----------
    query : util.DataLakeQuery
        The query builder

    Returns
    -------
    str
        The ``queryId`` value taken from the submit response

    Examples
    --------
    >>> from phc import Session
    >>> from phc.services import Analytics
    >>> from phc.util import DataLakeQuery

    >>> session = Session()
    >>> client = Analytics(session)

    >>> project_id = '19e34782-91c4-4143-aaee-2ba81ed0b206'
    >>> query_string = "SELECT sample_id, gene, impact, amino_acid_change, histology FROM variant WHERE tumor_site='breast'"
    >>> output_file_name = 'query-test-notebook'
    >>> query = DataLakeQuery(project_id=project_id, query=query_string, output_file_name=output_file_name)

    >>> query_id = client.execute_data_lake_query(query)
    >>> specific_query = client.get_data_lake_query(query_id)
    >>> paginated_dataset_queries = client.list_data_lake_queries(project_id)
    >>> print(query_id)
    """
    request_body = query.to_request_dict()
    response = self._api_call(
        "analytics/data-lake/query", http_verb="POST", json=request_body
    )
    return response.get("queryId")
def execute_data_lake_query_to_dataframe(self, query: phc.util.data_lake_query.DataLakeQuery, dest_dir: str = '/home/runner/work/phc-sdk-py/phc-sdk-py')

Executes a data lake query, downloads the result file and converts to a Pandas dataframe.

To use this method, the 'pandas' module is required. Otherwise, an exception will be thrown.

Parameters

query : util.DataLakeQuery
The query builder
dest_dir : string
Directory the result file will be downloaded to. Defaults to the current working directory.

Returns

asyncio.Future || pandas.DataFrame
A Future if run_async is True, the data lake query result contained in a Pandas dataframe otherwise.

Examples

>>> from phc import Session
>>> from phc.services import Analytics
>>> from phc.util import DataLakeQuery
>>> session = Session()
>>> client = Analytics(session)
>>> project_id = '19e34782-91c4-4143-aaee-2ba81ed0b206'
>>> query_string = "SELECT sample_id, gene, impact, amino_acid_change, histology FROM variant WHERE tumor_site='breast'"
>>> output_file_name = 'query-dataframe-test'
>>> query = DataLakeQuery(project_id=project_id, query=query_string, output_file_name=output_file_name)
>>> dataframe = client.execute_data_lake_query_to_dataframe(query)
>>> dataframe.head()
Expand source code
def execute_data_lake_query_to_dataframe(
    self, query: DataLakeQuery, dest_dir: str = None
):
    """Executes a data lake query, downloads the result file and converts to a Pandas dataframe.

    To use this method, the 'pandas' module is required.
    Otherwise, an exception will be thrown.

    Parameters
    ----------
    query : util.DataLakeQuery
        The query builder

    dest_dir : string, optional
        Directory the result file will be downloaded to.
        Defaults to the current working directory at the time of the call.

    Returns
    -------
    asyncio.Future || pandas.DataFrame
        A Future if run_async is True, the data lake query result contained in a Pandas dataframe otherwise.

    Raises
    ------
    ImportError
        If pandas is not available

    Examples
    --------
    >>> from phc import Session
    >>> from phc.services import Analytics
    >>> from phc.util import DataLakeQuery

    >>> session = Session()
    >>> client = Analytics(session)

    >>> project_id = '19e34782-91c4-4143-aaee-2ba81ed0b206'
    >>> query_string = "SELECT sample_id, gene, impact, amino_acid_change, histology FROM variant WHERE tumor_site='breast'"
    >>> output_file_name = 'query-dataframe-test'
    >>> query = DataLakeQuery(project_id=project_id, query=query_string, output_file_name=output_file_name)

    >>> dataframe = client.execute_data_lake_query_to_dataframe(query)
    >>> dataframe.head()
    """
    if not _has_pandas:
        raise ImportError("pandas is required")

    # Resolve the default here: a default of os.getcwd() in the signature
    # is evaluated once at import time and would pin the directory the
    # process happened to start in.
    if dest_dir is None:
        dest_dir = os.getcwd()

    future = asyncio.ensure_future(
        self.__execute_data_lake_query_to_dataframe_impl(query, dest_dir),
        loop=self._event_loop,
    )
    # Async callers get the future; sync callers block until it resolves.
    return (
        future
        if self.run_async
        else self._event_loop.run_until_complete(future)
    )
def execute_sql(self, statement: str, project_id: str = None, cohort_id: str = None) ‑> phc.api_response.ApiResponse

Executes a SQL query against Analytics

Parameters

project_id : str
The project ID
cohort_id : str
The cohort ID
statement : str
The SQL statement

Returns

ApiResponse
The API Response

Raises

ValueError
If no project or cohort ID is provided

Examples

>>> from phc.services import Analytics
>>> client = Analytics(session)
>>> res = client.execute_sql(cohort_id='5a07dedb-fa2a-4cb0-b662-95b23a050221', statement='SELECT patients from patient')
>>> print(f"Found {len(res.get('data').get('patients'))} patients")
Expand source code
def execute_sql(
    self, statement: str, project_id: str = None, cohort_id: str = None
) -> ApiResponse:
    """Runs a SQL statement through the ``analytics/dsl`` endpoint

    At least one of ``project_id`` or ``cohort_id`` must be given; when
    both are given, both are sent with the request.

    Parameters
    ----------
    statement : str
        The SQL statement
    project_id : str
        The project ID
    cohort_id : str
        The cohort ID

    Returns
    -------
    ApiResponse
        The API Response

    Raises
    ------
    ValueError
        If no project or cohort ID is provided

    Examples
    --------
    >>> from phc.services import Analytics
    >>> client = Analytics(session)
    >>> res = client.execute_sql(cohort_id='5a07dedb-fa2a-4cb0-b662-95b23a050221', statement='SELECT patients from patient')
    >>> print(f"Found {len(res.get('data').get('patients'))} patients")
    """
    if not (project_id or cohort_id):
        raise ValueError(
            "Must provide a value for the project or cohort ID"
        )

    # Only scopes with a truthy value are added to the request body.
    scope = {"dataset_id": project_id, "cohort_id": cohort_id}
    payload = {"string_query": statement}
    payload.update({key: value for key, value in scope.items() if value})

    return self._api_call("analytics/dsl", http_verb="POST", json=payload)
def get_data_lake_query(self, query_id: str) ‑> phc.api_response.ApiResponse

Fetches a data lake query

Parameters

query_id : string
The query ID

Returns

ApiResponse
The data lake query get response
Expand source code
def get_data_lake_query(self, query_id: str) -> ApiResponse:
    """Retrieves a single data lake query by its ID

    Parameters
    ----------
    query_id : string
        The query ID

    Returns
    -------
    phc.ApiResponse
        The data lake query get response
    """
    path = f"analytics/data-lake/query/{query_id}"
    return self._api_call(path, http_verb="GET")
def get_data_lake_schema(self, project_id: str, table: str) ‑> phc.api_response.ApiResponse

Fetches the schema for a specific data lake table

Parameters

project_id : string
The dataset to fetch the table schema of
table : string
Name of the table

Returns

ApiResponse
Schema of the specified table
Expand source code
def get_data_lake_schema(self, project_id: str, table: str) -> ApiResponse:
    """Retrieves the schema of one data lake table

    Parameters
    ----------
    project_id : string
        The dataset to fetch the table schema of

    table : string
        Name of the table

    Returns
    -------
    phc.ApiResponse
        Schema of the specified table
    """
    # The table name is a path segment; the dataset is a query parameter.
    path = f"analytics/data-lake/schema/{table}?datasetId={project_id}"
    return self._api_call(path, http_verb="GET")
def get_patients(self, project_id: str, query_builder: phc.util.patient_filter_query_builder.PatientFilterQueryBuilder) ‑> phc.api_response.ApiResponse

Executes a query that returns patients

Parameters

project_id : str
The project ID
query_builder : util.PatientFilterQueryBuilder
The query builder

Returns

list
The list of patients

Examples

>>> from phc.services import Analytics
>>> from phc.util import PatientFilterQueryBuilder
>>> client = Analytics(session)
>>> search = PatientFilterQueryBuilder()
>>> search.patient().observations().code(eq='11142-7').system(eq='http://loinc.org').value_quantity(lt=40)
>>> res = client.get_patients(project_id='5a07dedb-fa2a-4cb0-b662-95b23a050221', query_builder=search)
>>> print(f"Found {len(res)} patients")
Expand source code
def get_patients(
    self, project_id: str, query_builder: PatientFilterQueryBuilder
) -> ApiResponse:
    """Runs a patient filter query and returns the matching patients

    The query builder is converted to a request body, scoped to the
    project, and posted to the ``analytics/dsl`` endpoint.

    Parameters
    ----------
    project_id : str
        The project ID
    query_builder : util.PatientFilterQueryBuilder
        The query builder

    Returns
    -------
    list
        The list of patients

    Examples
    --------
    >>> from phc.services import Analytics
    >>> from phc.util import PatientFilterQueryBuilder
    >>> client = Analytics(session)
    >>> search = PatientFilterQueryBuilder()
    >>> (
    ...     search.patient()
    ...     .observations()
    ...     .code(eq='11142-7')
    ...     .system(eq='http://loinc.org')
    ...     .value_quantity(lt=40)
    ... )
    >>> res = client.get_patients(project_id='5a07dedb-fa2a-4cb0-b662-95b23a050221', query_builder=search)
    >>> print(f"Found {len(res)} patients")
    """
    request_body = query_builder.to_dict()
    request_body["dataset_id"] = project_id
    response = self._api_call(
        "analytics/dsl", http_verb="POST", json=request_body
    )
    # The patients are nested under data -> patients in the response.
    return response.get("data").get("patients")
def list_data_lake_queries(self, project_id: str, page_size: int = 25, next_page_token: str = None) ‑> phc.api_response.ApiResponse

Fetches a list of data lake queries

Parameters

project_id : str
The project ID
page_size : int, optional
The page size, by default 25
next_page_token : str, optional
The next page token, by default None

Returns

ApiResponse
The data lake list query response
Expand source code
def list_data_lake_queries(
    self, project_id: str, page_size: int = 25, next_page_token: str = None
) -> ApiResponse:
    """Fetches a list of data lake queries

    The query-string values are URL-encoded, so a page token containing
    reserved characters (``+``, ``/``, ``=``, ``&``) is sent intact.

    Parameters
    ----------
    project_id : str
        The project ID
    page_size : int, optional
        The page size, by default 25
    next_page_token : str, optional
        The next page token, by default None

    Returns
    -------
    phc.ApiResponse
        The data lake list query response
    """
    # Local import keeps this function independent of the module's imports.
    from urllib.parse import urlencode

    # "%d" keeps the original integer formatting of the page size.
    params = {"datasetId": project_id, "pageSize": "%d" % page_size}
    if next_page_token:
        params["nextPageToken"] = next_page_token

    path = "analytics/data-lake/query?" + urlencode(params)
    return self._api_call(path, http_verb="GET")
def list_data_lake_schemas(self, project_id: str) ‑> phc.api_response.ApiResponse

Fetches the data lake table schemas

Parameters

project_id : string
The dataset to fetch the table schemas of

Returns

ApiResponse
The schema for each data lake table
Expand source code
def list_data_lake_schemas(self, project_id: str) -> ApiResponse:
    """Retrieves the schemas of the data lake tables

    Parameters
    ----------
    project_id : string
        The dataset to fetch the table schemas of

    Returns
    -------
    phc.ApiResponse
        The schema for each data lake table
    """
    return self._api_call(
        f"analytics/data-lake/schema?datasetId={project_id}",
        http_verb="GET",
    )
def load_data_lake_result_to_dataframe(self, query_id: str, dest_dir: str = '/home/runner/work/phc-sdk-py/phc-sdk-py')

Downloads the result file of a query and converts to a Pandas dataframe.

To use this method, the 'pandas' module is required. Otherwise, an exception will be thrown.

Parameters

query_id : string
Id of the query to load results from
dest_dir : string
Directory the result file will be downloaded to. Defaults to the current working directory.

Returns

asyncio.Future || pandas.DataFrame
A Future if run_async is True, the data lake query result contained in a Pandas dataframe otherwise.
Expand source code
def load_data_lake_result_to_dataframe(
    self, query_id: str, dest_dir: str = None
):
    """Downloads the result file of a query and converts to a Pandas dataframe.

    To use this method, the 'pandas' module is required.
    Otherwise, an exception will be thrown.

    Parameters
    ----------
    query_id : string
        Id of the query to load results from

    dest_dir : string, optional
        Directory the result file will be downloaded to.
        Defaults to the current working directory at the time of the call.

    Returns
    -------
    asyncio.Future || pandas.DataFrame
        A Future if run_async is True, the data lake query result contained in a Pandas dataframe otherwise.

    Raises
    ------
    ImportError
        If pandas is not available
    """
    if not _has_pandas:
        raise ImportError("pandas is required")

    # Resolve the default here: a default of os.getcwd() in the signature
    # is evaluated once at import time and would pin the directory the
    # process happened to start in.
    if dest_dir is None:
        dest_dir = os.getcwd()

    future = asyncio.ensure_future(
        self.__load_data_lake_result_to_dataframe_impl(query_id, dest_dir),
        loop=self._event_loop,
    )
    # Async callers get the future; sync callers block until it resolves.
    return (
        future
        if self.run_async
        else self._event_loop.run_until_complete(future)
    )
class Cohorts (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)

Provides access to PHC cohorts

Parameters

session : Session
The PHC session
run_async : bool
True to return promises, False to return results (default is False)
timeout : int
Operation timeout (default is 30)
trust_env : bool
Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
Expand source code
class Cohorts(BaseClient):
    """Client for working with PHC cohorts

    Parameters
    ----------
    session : phc.Session
        The PHC session
    run_async: bool
        True to return promises, False to return results (default is False)
    timeout: int
        Operation timeout (default is 30)
    trust_env: bool
        Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
    """

    def create(
        self, project_id: str, name: str, queries: list, description: str = None
    ) -> ApiResponse:
        """Create a new cohort owned by a project

        Parameters
        ----------
        project_id: str
            The project that owns the cohort
        name : str
            The cohort name.
        queries: list
            The list of queries that define the cohort
        description : str, optional
            The cohort description, by default None. It is only sent when
            it is truthy.

        Returns
        -------
        phc.ApiResponse
            The create cohort response
        """
        payload = dict(name=name, ownerProject=project_id, queries=queries)
        if description:
            payload.update(description=description)
        return self._api_call("cohorts", json=payload, http_verb="POST")

    def get(self, cohort_id) -> ApiResponse:
        """Look up a single cohort by its id

        Parameters
        ----------
        cohort_id : str
            The cohort ID.

        Returns
        -------
        phc.ApiResponse
            The get cohort response
        """
        cohort_path = f"cohorts/{cohort_id}"
        return self._api_call(cohort_path, http_verb="GET")

    def delete(self, cohort_id: str) -> bool:
        """Remove a cohort

        Parameters
        ----------
        cohort_id : str
            The cohort ID.

        Returns
        -------
        bool
            True if the response status code is 204, otherwise False
        """
        response = self._api_call(f"cohorts/{cohort_id}", http_verb="DELETE")
        return response.status_code == 204

    def get_list(
        self,
        project_id: str,
        page_size: int = None,
        next_page_token: str = None,
        name: str = None,
    ) -> ApiResponse:
        """List the cohorts that belong to a project

        Parameters
        ----------
        project_id: str
            The project ID to search within
        page_size : int, optional
            The page size, by default None
        next_page_token : str, optional
            The next page token, by default None
        name : str, optional
            A cohort name filter, by default None

        Returns
        -------
        phc.ApiResponse
            The list cohorts response
        """
        # Optional filters are only added to the query string when truthy.
        optional_filters = (
            ("pageSize", page_size),
            ("nextPageToken", next_page_token),
            ("name", name),
        )
        query = {"projectId": project_id}
        query.update((key, value) for key, value in optional_filters if value)
        return self._api_call(f"cohorts?{urlencode(query)}", http_verb="GET")

Ancestors

  • phc.base_client.BaseClient

Methods

def create(self, project_id: str, name: str, queries: list, description: str = None) ‑> phc.api_response.ApiResponse

Creates a cohort

Parameters

project_id : str
The project that owns the cohort
name : str
The cohort name.
queries : list
The list of queries that define the cohort
description : str, optional
The cohort description, by default None

Returns

ApiResponse
The create cohort response
Expand source code
def create(
    self, project_id: str, name: str, queries: list, description: str = None
) -> ApiResponse:
    """Create a new cohort owned by a project

    Parameters
    ----------
    project_id: str
        The project that owns the cohort
    name : str
        The cohort name.
    queries: list
        The list of queries that define the cohort
    description : str, optional
        The cohort description, by default None. It is only sent when
        it is truthy.

    Returns
    -------
    phc.ApiResponse
        The create cohort response
    """
    payload = dict(name=name, ownerProject=project_id, queries=queries)
    if description:
        payload.update(description=description)
    return self._api_call("cohorts", json=payload, http_verb="POST")
def delete(self, cohort_id: str) ‑> bool

Delete a cohort

Parameters

cohort_id : str
The cohort ID.

Returns

bool
True if the delete succeeds, otherwise False
Expand source code
def delete(self, cohort_id: str) -> bool:
    """Remove a cohort

    Parameters
    ----------
    cohort_id : str
        The cohort ID.

    Returns
    -------
    bool
        True if the response status code is 204, otherwise False
    """
    response = self._api_call(f"cohorts/{cohort_id}", http_verb="DELETE")
    return response.status_code == 204
def get(self, cohort_id) ‑> phc.api_response.ApiResponse

Fetch a cohort by id

Parameters

cohort_id : str
The cohort ID.

Returns

ApiResponse
The get cohort response
Expand source code
def get(self, cohort_id) -> ApiResponse:
    """Look up a single cohort by its id

    Parameters
    ----------
    cohort_id : str
        The cohort ID.

    Returns
    -------
    phc.ApiResponse
        The get cohort response
    """
    cohort_path = f"cohorts/{cohort_id}"
    return self._api_call(cohort_path, http_verb="GET")
def get_list(self, project_id: str, page_size: int = None, next_page_token: str = None, name: str = None) ‑> phc.api_response.ApiResponse

Fetch a list of cohorts in a project

Parameters

project_id : str
The project ID to search within
page_size : int, optional
The page size, by default None
next_page_token : str, optional
The next page token, by default None
name : str, optional
A cohort name filter, by default None

Returns

ApiResponse
The list cohorts response
Expand source code
def get_list(
    self,
    project_id: str,
    page_size: int = None,
    next_page_token: str = None,
    name: str = None,
) -> ApiResponse:
    """List the cohorts that belong to a project

    Parameters
    ----------
    project_id: str
        The project ID to search within
    page_size : int, optional
        The page size, by default None
    next_page_token : str, optional
        The next page token, by default None
    name : str, optional
        A cohort name filter, by default None

    Returns
    -------
    phc.ApiResponse
        The list cohorts response
    """
    # Optional filters are only added to the query string when truthy.
    optional_filters = (
        ("pageSize", page_size),
        ("nextPageToken", next_page_token),
        ("name", name),
    )
    query = {"projectId": project_id}
    query.update((key, value) for key, value in optional_filters if value)
    return self._api_call(f"cohorts?{urlencode(query)}", http_verb="GET")
class Fhir (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)

Provides bindings to the LifeOmic FHIR Service APIs

Expand source code
class Fhir(BaseClient):
    """Provides bindings to the LifeOmic FHIR Service APIs"""

    def dsl(self, project: str, data: dict, scroll=""):
        """Executes a LifeOmic FHIR Service DSL request

        Parameters
        ----------
        project : str
            The target LifeOmic project identifier
        data : dict
            The DSL request object
        scroll
            The scroll request parameter. A boolean True is sent as the
            string "true"; any other value is sent unchanged.

        Returns
        -------
        phc.ApiResponse
            The API response
        """
        path = f"fhir-search/projects/{project}"
        params = {"scroll": "true" if scroll is True else scroll}
        return self._api_call(
            http_verb="POST", api_path=path, params=params, json=data
        )

    def sql(self, project: str, statement: str, scroll="") -> ApiResponse:
        """Executes a LifeOmic FHIR Service SQL request

        Parameters
        ----------
        project : str
            The target LifeOmic project identifier
        statement : str
            The SQL request statement
        scroll
            The scroll request parameter. A boolean True is sent as the
            string "true"; any other value is sent unchanged.

        Returns
        -------
        phc.ApiResponse
            The API response
        """
        path = f"fhir-search/projects/{project}"
        # The statement is sent as the raw request body, hence text/plain.
        headers = {"Content-Type": "text/plain"}
        params = {"scroll": "true" if scroll is True else scroll}
        return self._api_call(
            http_verb="POST",
            api_path=path,
            headers=headers,
            params=params,
            data=statement,
        )

    def execute_sql(
        self, project_id: str, statement: str, scroll=""
    ) -> ApiResponse:
        """Executes an SQL query against fhir-search-service

        Deprecated: emits a DeprecationWarning; use the sql method instead.

        Parameters
        ----------
        project_id : str
            The project ID.
        statement : str
            The SQL statement.
        scroll
            The scroll request parameter. A boolean True is sent as the
            string "true"; any other value is sent unchanged.

        Returns
        -------
        phc.ApiResponse
            The query response.

        Examples
        --------
        >>> import pandas as pd
        >>> from phc.services import Fhir
        >>> fhir = Fhir(session)
        >>> res = fhir.execute_sql(project_id='19e34782-91c4-4143-aaee-2ba81ed0b206',
        ...                statement='SELECT * from patient LIMIT 0,5000')

        >>> resources = list(map(lambda r: r.get("_source"), res.get("hits").get("hits")))
        >>> df = pd.DataFrame(resources)
        """
        # stacklevel=2 attributes the warning to the code calling this
        # method rather than to this line inside the library.
        warnings.warn(
            "Use the sql method instead", DeprecationWarning, stacklevel=2
        )
        return self._api_call(
            api_path=f"fhir-search/projects/{project_id}",
            http_verb="POST",
            data=statement,
            headers={"Content-Type": "text/plain"},
            # Same normalization the sql method applies.
            params={"scroll": "true" if scroll is True else scroll},
        )

    def execute_es(
        self, project_id: str, query: dict, scroll=""
    ) -> ApiResponse:
        """Executes an elasticsearch query against fhir-search-service

        Deprecated: emits a DeprecationWarning; use the dsl method instead.

        Parameters
        ----------
        project_id : str
            The project ID
        query : dict
            The ES query dictionary
        scroll
            The scroll request parameter. A boolean True is sent as the
            string "true"; any other value is sent unchanged.

        Returns
        -------
        phc.ApiResponse
            The query response
        """
        # stacklevel=2 attributes the warning to the code calling this
        # method rather than to this line inside the library.
        warnings.warn(
            "Use the dsl method instead", DeprecationWarning, stacklevel=2
        )
        return self._api_call(
            api_path=f"fhir-search/projects/{project_id}",
            http_verb="POST",
            json=query,
            # Same normalization the dsl method applies.
            params={"scroll": "true" if scroll is True else scroll},
        )

Ancestors

  • phc.base_client.BaseClient

Methods

def dsl(self, project: str, data: dict, scroll='')

Executes a LifeOmic FHIR Service DSL request

Parameters

project : str
The target LifeOmic project identifier
data : dict
The DSL request object
scroll
The scroll request parameter

Returns

ApiResponse
The API response
Expand source code
def dsl(self, project: str, data: dict, scroll=""):
    """Executes a LifeOmic FHIR Service DSL request

    Parameters
    ----------
    project : str
        The target LifeOmic project identifier
    data : dict
        The DSL request object
    scroll
        The scroll request parameter. A boolean True is sent as the
        string "true"; any other value is sent unchanged.

    Returns
    -------
    phc.ApiResponse
        The API response
    """
    path = f"fhir-search/projects/{project}"
    # Normalize once; the previous code applied the same conversion twice.
    params = {"scroll": "true" if scroll is True else scroll}
    return self._api_call(
        http_verb="POST", api_path=path, params=params, json=data
    )
def execute_es(self, project_id: str, query: dict, scroll='') ‑> phc.api_response.ApiResponse

Executes an elasticsearch query against fhir-search-service

Parameters

project_id : str
The project ID
query : dict
The ES query dictionary

Returns

ApiResponse
The query response
Expand source code
def execute_es(
    self, project_id: str, query: dict, scroll=""
) -> ApiResponse:
    """Executes an elasticsearch query against fhir-search-service

    Deprecated: emits a DeprecationWarning; use the dsl method instead.

    Parameters
    ----------
    project_id : str
        The project ID
    query : dict
        The ES query dictionary
    scroll
        The scroll request parameter. A boolean True is sent as the
        string "true"; any other value is sent unchanged.

    Returns
    -------
    phc.ApiResponse
        The query response
    """
    # stacklevel=2 attributes the warning to the code calling this
    # method rather than to this line inside the library.
    warnings.warn(
        "Use the dsl method instead", DeprecationWarning, stacklevel=2
    )
    return self._api_call(
        api_path=f"fhir-search/projects/{project_id}",
        http_verb="POST",
        json=query,
        # Same normalization the dsl method applies.
        params={"scroll": "true" if scroll is True else scroll},
    )
def execute_sql(self, project_id: str, statement: str, scroll='') ‑> phc.api_response.ApiResponse

Executes an SQL query against fhir-search-service

Parameters

project_id : str
The project ID.
statement : str
The SQL statement.

Returns

ApiResponse
The query response.

Examples

>>> import pandas as pd
>>> from phc.services import Fhir
>>> fhir = Fhir(session)
>>> res = fhir.execute_sql(project_id='19e34782-91c4-4143-aaee-2ba81ed0b206',
               statement='SELECT * from patient LIMIT 0,5000')
>>> resources = list(map(lambda r: r.get("_source"), res.get("hits").get("hits")))
>>> df = pd.DataFrame(resources)
Expand source code
def execute_sql(
    self, project_id: str, statement: str, scroll=""
) -> ApiResponse:
    """Executes an SQL query against fhir-search-service

    Deprecated: emits a DeprecationWarning; use the sql method instead.

    Parameters
    ----------
    project_id : str
        The project ID.
    statement : str
        The SQL statement.
    scroll
        The scroll request parameter. A boolean True is sent as the
        string "true"; any other value is sent unchanged.

    Returns
    -------
    phc.ApiResponse
        The query response.

    Examples
    --------
    >>> import pandas as pd
    >>> from phc.services import Fhir
    >>> fhir = Fhir(session)
    >>> res = fhir.execute_sql(project_id='19e34782-91c4-4143-aaee-2ba81ed0b206',
    ...                statement='SELECT * from patient LIMIT 0,5000')

    >>> resources = list(map(lambda r: r.get("_source"), res.get("hits").get("hits")))
    >>> df = pd.DataFrame(resources)
    """
    # stacklevel=2 attributes the warning to the code calling this
    # method rather than to this line inside the library.
    warnings.warn(
        "Use the sql method instead", DeprecationWarning, stacklevel=2
    )
    return self._api_call(
        api_path=f"fhir-search/projects/{project_id}",
        http_verb="POST",
        data=statement,
        headers={"Content-Type": "text/plain"},
        # Same normalization the sql method applies.
        params={"scroll": "true" if scroll is True else scroll},
    )
def sql(self, project: str, statement: str, scroll='') ‑> phc.api_response.ApiResponse

Executes a LifeOmic FHIR Service SQL request

Parameters

project : str
The target LifeOmic project identifier
statement : str
The SQL request statement
scroll
The scroll request parameter

Returns

ApiResponse
The API response
Expand source code
def sql(self, project: str, statement: str, scroll="") -> ApiResponse:
    """Run a SQL statement through the LifeOmic FHIR Service

    Parameters
    ----------
    project : str
        The target LifeOmic project identifier
    statement : str
        The SQL request statement
    scroll
        The scroll request parameter. A boolean True is sent as the
        string "true"; any other value is sent unchanged.

    Returns
    -------
    phc.ApiResponse
        The API response
    """
    scroll_value = "true" if scroll is True else scroll
    # The statement is sent as the raw request body, hence text/plain.
    return self._api_call(
        http_verb="POST",
        api_path=f"fhir-search/projects/{project}",
        headers={"Content-Type": "text/plain"},
        params={"scroll": scroll_value},
        data=statement,
    )
class Files (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)

Provides access to PHC files

Parameters

session : Session
The PHC session
run_async : bool
True to return promises, False to return results (default is False)
timeout : int
Operation timeout (default is 30)
trust_env : bool
Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
Expand source code
class Files(BaseClient):
    """Provides access to PHC files

    Parameters
    ----------
    session : phc.Session
        The PHC session
    run_async: bool
        True to return promises, False to return results (default is False)
    timeout: int
        Operation timeout (default is 30)
    trust_env: bool
        Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
    """

    # Files strictly larger than this are uploaded in multiple parts; it is
    # also the lower bound for the size of a single part.
    _MULTIPART_MIN_SIZE = 5 * 1024 * 1024
    # Upper bound on the number of parts used to derive the part size.
    _MAX_PARTS = 10000

    def upload(
        self,
        project_id: str,
        source: str,
        file_name: str = None,
        overwrite: bool = False,
    ) -> ApiResponse:
        """Upload a file.

        Files larger than ``_MULTIPART_MIN_SIZE`` are uploaded part by part;
        smaller files are sent with a single PUT.

        Parameters
        ----------
        project_id : str
            The project ID
        source : str
            The path of the file to upload
        file_name : str, optional
            The name of the file, If None will default to the actual base file name.
        overwrite : bool, optional
            True to overwrite an existing file of the same name, by default False

        Returns
        -------
        ApiResponse
            The response of the initial request that registered the upload

        Examples
        --------
        >>> from phc.services import Files
        >>> files = Files(session)
        >>> files.upload(project_id="db3e09e9-1ecd-4976-aa5e-70ac7ada0cc3", source="./myfile.txt", overwrite=True)
        """
        file_size = os.path.getsize(source)
        request_body = {
            "name": file_name
            if file_name is not None
            else os.path.basename(source),
            "datasetId": project_id,
            "overwrite": overwrite,
        }

        if file_size <= self._MULTIPART_MIN_SIZE:
            res = self._api_call("files", json=request_body)
            self._api_call_impl(
                http_verb="PUT",
                url=res.get("uploadUrl"),
                api_path=None,
                upload_file=source,
                # NOTE(review): the None values presumably suppress the
                # client's default headers for the upload URL - confirm
                # against _api_call_impl.
                headers={
                    "Content-Length": str(file_size),
                    "Authorization": None,
                    "LifeOmic-Account": None,
                    "Content-Type": None,
                },
            )
            return res

        res = self._api_call("uploads", json=request_body)
        upload_id = res.get("uploadId")
        # Grow the part size beyond the minimum only when the file would
        # otherwise need more than _MAX_PARTS parts.
        part_size = max(
            math.ceil(file_size / self._MAX_PARTS), self._MULTIPART_MIN_SIZE
        )
        total_parts = math.ceil(file_size / part_size)
        # Open the source once; the context manager closes it even when a
        # read or a request raises.
        with open(source, "rb") as f:
            for part in range(1, total_parts + 1):
                start = (part - 1) * part_size
                end = file_size if part == total_parts else start + part_size
                f.seek(start)
                data = f.read(end - start)
                part_res = self._api_call(
                    f"uploads/{upload_id}/parts/{part}", http_verb="GET"
                )
                self._api_call_impl(
                    http_verb="PUT",
                    url=part_res.get("uploadUrl"),
                    api_path=None,
                    upload_file=data,
                    headers={
                        "Content-Length": str(end - start),
                        "Authorization": None,
                        "LifeOmic-Account": None,
                        "Content-Type": None,
                    },
                )
                print(f"Upload {part}")
        # NOTE(review): this DELETE presumably finalizes the multipart
        # upload on the service side - confirm against the API docs.
        self._api_call(f"uploads/{upload_id}", http_verb="DELETE")
        return res

    @backoff.on_exception(
        backoff.expo, OSError, max_tries=6, jitter=backoff.full_jitter
    )
    def download(self, file_id: str, dest_dir: str = None) -> str:
        """Download a file

        The call is retried (up to 6 tries) when an OSError is raised.

        Parameters
        ----------
        file_id : str
            The file ID
        dest_dir : str, optional
            The local directory to save the file.  Defaults to the current
            working directory at the time of the call

        Returns
        -------
        str
            The local path the file was saved to

        Examples
        --------
        >>> from phc.services import Files
        >>> files = Files(session)
        >>> files.download(file_id="db3e09e9-1ecd-4976-aa5e-70ac7ada0cc3", dest_dir="./mydata")
        """
        # Resolve the default here instead of in the signature: a default
        # of os.getcwd() is evaluated only once, at import time.
        if dest_dir is None:
            dest_dir = os.getcwd()

        res = self._api_call(
            f"files/{file_id}?include=downloadUrl", http_verb="GET"
        )

        file_path = os.path.join(dest_dir, res.get("name"))
        target_dir = os.path.dirname(file_path)
        # exist_ok avoids a check-then-create race; an empty dirname means
        # a bare relative file name, so there is nothing to create.
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)

        urlretrieve(res.get("downloadUrl"), file_path)
        return file_path

    def get(self, file_id: str) -> ApiResponse:
        """Fetch a file by id

        Parameters
        ----------
        file_id : str
            The file ID.

        Returns
        -------
        phc.ApiResponse
            The get file response
        """
        return self._api_call(f"files/{file_id}", http_verb="GET")

    def update(
        self, file_id: str, project_id: str = None, name: str = None
    ) -> ApiResponse:
        """Update a file by moving it to a new project or by renaming it.

        Parameters
        ----------
        file_id : str
            The file ID to update.
        project_id : str
            The new project ID for the file.
        name : str
            The new file name

        Returns
        -------
        phc.ApiResponse
            The update file response

        Raises
        ------
        ValueError
            If neither 'project_id' nor 'name' is provided
        """
        if not project_id and not name:
            raise ValueError(
                "Must provide a value for either 'project_id' or 'name'"
            )

        json_body = {}
        if name:
            json_body["name"] = name
        if project_id:
            json_body["datasetId"] = project_id

        return self._api_call(
            f"files/{file_id}", json=json_body, http_verb="PATCH"
        )

    def delete(self, file_id: str) -> bool:
        """Delete a file

        Parameters
        ----------
        file_id : str
            The file ID.

        Returns
        -------
        bool
            True if the response status code is 204, otherwise False
        """
        return (
            self._api_call(f"files/{file_id}", http_verb="DELETE").status_code
            == 204
        )

    def get_list(
        self,
        project_id: str,
        folder: str = None,
        page_size: int = None,
        next_page_token: str = None,
    ) -> ApiResponse:
        """Fetch a list of files in a project

        Parameters
        ----------
        project_id: str
            The project ID
        folder: str, optional
            The folder prefix to look for files, by default None
        page_size : int, optional
            The page size, by default None
        next_page_token : str, optional
            The next page token, by default None

        Returns
        -------
        phc.ApiResponse
            The list files response
        """
        # Only truthy options are added to the query string.
        query_dict = {}
        if page_size:
            query_dict["pageSize"] = page_size
        if next_page_token:
            query_dict["nextPageToken"] = next_page_token
        if folder:
            query_dict["prefix"] = folder

        return self._api_call(
            f"projects/{project_id}/files?{urlencode(query_dict)}",
            http_verb="GET",
        )

    def exists(self, file_id: str) -> bool:
        """Check if a file exists by id

        Parameters
        ----------
        file_id : str
            The file ID.

        Returns
        -------
        bool
            True if the file exists, false otherwise

        Raises
        ------
        ApiError
            If the lookup fails with any status code other than 404
        """
        try:
            self._api_call(f"files/{file_id}", http_verb="GET")
        except ApiError as e:
            if e.response.status_code == 404:
                return False
            # Bare raise keeps the original traceback intact.
            raise
        return True

Ancestors

  • phc.base_client.BaseClient

Methods

def delete(self, file_id: str) ‑> bool

Delete a file

Parameters

file_id : str
The file ID.

Returns

bool
True if the delete succeeds, otherwise False
Expand source code
def delete(self, file_id: str) -> bool:
    """Remove a file

    Parameters
    ----------
    file_id : str
        The file ID.

    Returns
    -------
    bool
        True if the response status code is 204, otherwise False
    """
    response = self._api_call(f"files/{file_id}", http_verb="DELETE")
    return response.status_code == 204
def download(self, file_id: str, dest_dir: str = '/home/runner/work/phc-sdk-py/phc-sdk-py') ‑> NoneType

Download a file

Parameters

file_id : str
The file ID
dest_dir : str, optional
The local directory to save the file. Defaults to the current working directory

Examples

>>> from phc.services import Files
>>> files = Files(session)
>>> files.download(file_id="db3e09e9-1ecd-4976-aa5e-70ac7ada0cc3", dest_dir="./mydata")
Expand source code
@backoff.on_exception(
    backoff.expo, OSError, max_tries=6, jitter=backoff.full_jitter
)
def download(self, file_id: str, dest_dir: str = None) -> str:
    """Download a file

    The call is retried (up to 6 tries) when an OSError is raised.

    Parameters
    ----------
    file_id : str
        The file ID
    dest_dir : str, optional
        The local directory to save the file.  Defaults to the current
        working directory at the time of the call

    Returns
    -------
    str
        The local path the file was saved to

    Examples
    --------
    >>> from phc.services import Files
    >>> files = Files(session)
    >>> files.download(file_id="db3e09e9-1ecd-4976-aa5e-70ac7ada0cc3", dest_dir="./mydata")
    """
    # Resolve the default here instead of in the signature: a default of
    # os.getcwd() is evaluated only once, at import time.
    if dest_dir is None:
        dest_dir = os.getcwd()

    res = self._api_call(
        f"files/{file_id}?include=downloadUrl", http_verb="GET"
    )

    file_path = os.path.join(dest_dir, res.get("name"))
    target_dir = os.path.dirname(file_path)
    # exist_ok avoids a check-then-create race; an empty dirname means a
    # bare relative file name, so there is nothing to create.
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)

    urlretrieve(res.get("downloadUrl"), file_path)
    return file_path
def exists(self, file_id: str) ‑> phc.api_response.ApiResponse

Check if a file exists by id

Parameters

file_id : str
The file ID.

Returns

bool
True if the file exists, false otherwise
Expand source code
def exists(self, file_id: str) -> bool:
    """Check if a file exists by id

    Parameters
    ----------
    file_id : str
        The file ID.

    Returns
    -------
    bool
        True if the file exists, false otherwise

    Raises
    ------
    ApiError
        If the lookup fails with any status code other than 404
    """
    try:
        self._api_call(f"files/{file_id}", http_verb="GET")
    except ApiError as e:
        if e.response.status_code == 404:
            return False
        # Bare raise keeps the original traceback intact.
        raise
    return True
def get(self, file_id: str) ‑> phc.api_response.ApiResponse

Fetch a file by id

Parameters

file_id : str
The file ID.

Returns

ApiResponse
The get file response
Expand source code
def get(self, file_id: str) -> ApiResponse:
    """Look up a single file by its id

    Parameters
    ----------
    file_id : str
        The file ID.

    Returns
    -------
    phc.ApiResponse
        The get file response
    """
    file_path = f"files/{file_id}"
    return self._api_call(file_path, http_verb="GET")
def get_list(self, project_id: str, folder: str = None, page_size: int = None, next_page_token: str = None) ‑> phc.api_response.ApiResponse

Fetch a list of files in a project

Parameters

project_id : str
The project ID
folder : str, optional
The folder prefix to look for files, by default None
page_size : int, optional
The page size, by default None
next_page_token : str, optional
The next page token, by default None

Returns

ApiResponse
The list files response
Expand source code
def get_list(
    self,
    project_id: str,
    folder: str = None,
    page_size: int = None,
    next_page_token: str = None,
) -> ApiResponse:
    """List the files stored in a project

    Parameters
    ----------
    project_id: str
        The project ID
    folder: str, optional
        The folder prefix to look for files, by default None
    page_size : int, optional
        The page size, by default None
    next_page_token : str, optional
        The next page token, by default None

    Returns
    -------
    phc.ApiResponse
        The list files response
    """
    # Only truthy options are added to the query string.
    candidates = (
        ("pageSize", page_size),
        ("nextPageToken", next_page_token),
        ("prefix", folder),
    )
    query = {key: value for key, value in candidates if value}
    listing_path = f"projects/{project_id}/files?{urlencode(query)}"
    return self._api_call(listing_path, http_verb="GET")
def update(self, file_id: str, project_id: str = None, name: str = None) ‑> phc.api_response.ApiResponse

Update a file by moving it to a new project or by renaming it.

Parameters

file_id : str
The file ID to update.
project_id : str
The new project ID for the file.
name : str
The new file name

Returns

ApiResponse
The update file response
Expand source code
def update(
    self, file_id: str, project_id: str = None, name: str = None
) -> ApiResponse:
    """Move a file to another project and/or rename it.

    Parameters
    ----------
    file_id : str
        The file ID to update.
    project_id : str
        The new project ID for the file.
    name : str
        The new file name

    Returns
    -------
    phc.ApiResponse
        The update file response

    Raises
    ------
    ValueError
        If neither 'project_id' nor 'name' is provided
    """
    if not (project_id or name):
        raise ValueError(
            "Must provide a value for either 'project_id' or 'name'"
        )

    # Only the fields that were actually supplied are sent in the patch.
    changes = (("name", name), ("datasetId", project_id))
    patch = {field: value for field, value in changes if value}
    return self._api_call(f"files/{file_id}", json=patch, http_verb="PATCH")
def upload(self, project_id: str, source: str, file_name: str = None, overwrite: bool = False) ‑> phc.api_response.ApiResponse

Upload a file.

Parameters

project_id : str
The project ID
source : str
The path of the file to upload
file_name : str, optional
The name of the file, If None will default to the actual base file name.
overwrite : bool, optional
True to overwrite an existing file of the same name, by default False

Returns

ApiResponse
The upload file response

Examples

>>> from phc.services import Files
>>> files = Files(session)
>>> files.upload(project_id="db3e09e9-1ecd-4976-aa5e-70ac7ada0cc3", source="./myfile.txt", overwrite=True)
Expand source code
def upload(
    self,
    project_id: str,
    source: str,
    file_name: str = None,
    overwrite: bool = False,
) -> ApiResponse:
    """Upload a file.

    Files larger than ``self._MULTIPART_MIN_SIZE`` are uploaded part by
    part; smaller files are sent with a single PUT.

    Parameters
    ----------
    project_id : str
        The project ID
    source : str
        The path of the file to upload
    file_name : str, optional
        The name of the file, If None will default to the actual base file name.
    overwrite : bool, optional
        True to overwrite an existing file of the same name, by default False

    Returns
    -------
    ApiResponse
        The response of the initial request that registered the upload

    Examples
    --------
    >>> from phc.services import Files
    >>> files = Files(session)
    >>> files.upload(project_id="db3e09e9-1ecd-4976-aa5e-70ac7ada0cc3", source="./myfile.txt", overwrite=True)
    """
    file_size = os.path.getsize(source)
    request_body = {
        "name": file_name
        if file_name is not None
        else os.path.basename(source),
        "datasetId": project_id,
        "overwrite": overwrite,
    }

    if file_size <= self._MULTIPART_MIN_SIZE:
        res = self._api_call("files", json=request_body)
        self._api_call_impl(
            http_verb="PUT",
            url=res.get("uploadUrl"),
            api_path=None,
            upload_file=source,
            # NOTE(review): the None values presumably suppress the
            # client's default headers for the upload URL - confirm
            # against _api_call_impl.
            headers={
                "Content-Length": str(file_size),
                "Authorization": None,
                "LifeOmic-Account": None,
                "Content-Type": None,
            },
        )
        return res

    res = self._api_call("uploads", json=request_body)
    upload_id = res.get("uploadId")
    # Grow the part size beyond the minimum only when the file would
    # otherwise need more than _MAX_PARTS parts.
    part_size = max(
        math.ceil(file_size / self._MAX_PARTS), self._MULTIPART_MIN_SIZE
    )
    total_parts = math.ceil(file_size / part_size)
    # Open the source once; the context manager closes it even when a read
    # or a request raises.
    with open(source, "rb") as f:
        for part in range(1, total_parts + 1):
            start = (part - 1) * part_size
            end = file_size if part == total_parts else start + part_size
            f.seek(start)
            data = f.read(end - start)
            part_res = self._api_call(
                f"uploads/{upload_id}/parts/{part}", http_verb="GET"
            )
            self._api_call_impl(
                http_verb="PUT",
                url=part_res.get("uploadUrl"),
                api_path=None,
                upload_file=data,
                headers={
                    "Content-Length": str(end - start),
                    "Authorization": None,
                    "LifeOmic-Account": None,
                    "Content-Type": None,
                },
            )
            print(f"Upload {part}")
    # NOTE(review): this DELETE presumably finalizes the multipart upload
    # on the service side - confirm against the API docs.
    self._api_call(f"uploads/{upload_id}", http_verb="DELETE")
    return res
class Genomics (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)

Provides access to PHC genomic resources

Parameters

session : Session
The PHC session
run_async : bool
True to return promises, False to return results (default is False)
timeout : int
Operation timeout (default is 30)
trust_env : bool
Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
Expand source code
class Genomics(BaseClient):
    """Provides access to PHC genomic resources

    Parameters
    ----------
    session : phc.Session
        The PHC session
    run_async: bool
        True to return promises, False to return results (default is False)
    timeout: int
        Operation timeout (default is 30)
    trust_env: bool
        Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
    """

    class SetType(Enum):
        # Each value is used as the leading path segment of the set endpoints.
        VARIANT = "variantsets"
        STRUCTURAL_VARIANT = "fusionsets"
        RNA = "rnaquantificationsets"
        READ = "readgroupsets"
        COPY_NUMBER = "copynumbersets"

    class Reference(Enum):
        # Sent as "referenceSetId" when creating a set.
        GRCh37 = "GRCh37"
        GRCh38 = "GRCh38"

    class SequenceType(Enum):
        # Sent as "sequenceType" when creating a set.
        GERMLINE = "germline"
        SOMATIC = "somatic"
        METASTATIC = "metastatic"
        CTDNA = "ctDNA"
        RNA = "rna"

    class Status(Enum):
        # Sent as the "status" filter when listing sets.
        ACTIVE = "ACTIVE"
        INDEXING = "INDEXING"
        FAILED = "FAILED"

    def create_set(
        self,
        set_type: SetType,
        project_id: str,
        name: str,
        file_id: str,
        patient_id: str,
        reference: Reference,
        sequence_type: SequenceType,
        test_type: str,
        sequence_id: str = None,
        indexed_date: datetime = None,
        performer_id: str = None,
        test_id: str = None,
        update_sample: bool = False,
        pass_filter: bool = False,
        output_vcf_name: str = None,
    ) -> ApiResponse:
        """Creates a genomic set

        Parameters
        ----------
        set_type : SetType
            The genomic set type
        project_id : str
            The project ID
        name : str
            The set name
        file_id : str
            The genomic file ID
        patient_id : str
            The patient ID
        reference : Reference
            The genomic reference
        sequence_type : SequenceType
            The sequence type
        test_type : str
            The test type
        sequence_id : str, optional
            The FHIR Sequence ID, by default a fresh str(uuid.uuid4())
            generated on every call
        indexed_date : datetime, optional
            The indexed date, by default None
        performer_id : str, optional
            The performer ID, by default None
        test_id : str, optional
            The test ID, by default None
        update_sample : bool, optional
            For variants only, True to update the sample ID, by default False
        pass_filter : bool, optional
            For variants only, True to update all filters to pass, by default False
        output_vcf_name : str, optional
            For variants only, the name of the output VCF, by default None

        Returns
        -------
        ApiResponse
            The create set response
        """
        # A default of str(uuid.uuid4()) in the signature is evaluated only
        # once, at definition time, so every call would share the same ID.
        if sequence_id is None:
            sequence_id = str(uuid.uuid4())

        json_body = {
            "datasetId": project_id,
            "name": name,
            "patientId": patient_id,
            "referenceSetId": reference.value,
            "sequenceType": sequence_type.value,
            "testType": test_type,
            "indexedDate": indexed_date.isoformat() if indexed_date else None,
            "performerId": performer_id,
            "testId": test_id,
            "sequenceId": sequence_id,
        }

        if set_type == Genomics.SetType.VARIANT:
            # Variant sets are posted to "genomicsets" with variant-only options.
            json_body["variantsFileIds"] = [file_id]
            # Plain bool; a trailing comma here would send a one-element array.
            json_body["updateSample"] = update_sample
            json_body["passFile"] = pass_filter
            json_body["outputVcfName"] = output_vcf_name

            return self._ga4gh_call(
                "genomicsets", json=json_body, http_verb="POST"
            )

        json_body["fileId"] = file_id

        return self._ga4gh_call(
            set_type.value, json=json_body, http_verb="POST"
        )

    def update_set(self, set_type: SetType, set_id: str, updates: dict) -> ApiResponse:
        """Update a genomic set

        Parameters
        ----------
        set_type : SetType
            The set type
        set_id : str
            The set ID
        updates : dict
            The updates to apply

        Returns
        -------
        ApiResponse
            The update response
        """
        return self._ga4gh_call(f"{set_type.value}/{set_id}", json=updates, http_verb="PATCH")

    def get_set(self, set_type: SetType, set_id: str) -> ApiResponse:
        """Fetch a genomic set

        Parameters
        ----------
        set_type : SetType
            The set type
        set_id : str
            The set ID

        Returns
        -------
        ApiResponse
            The fetch response
        """
        return self._ga4gh_call(f"{set_type.value}/{set_id}", http_verb="GET")

    def delete_set(self, set_type: SetType, set_id: str) -> bool:
        """Delete a genomic set

        Parameters
        ----------
        set_type : SetType
            The set type
        set_id : str
            The set ID

        Returns
        -------
        bool
            True if the delete succeeds (status code 204), otherwise False
        """
        return (
            self._ga4gh_call(
                f"{set_type.value}/{set_id}", http_verb="DELETE"
            ).status_code
            == 204
        )

    def list_sets(
        self,
        set_type: SetType,
        project_id: str,
        sequence_id: str = None,
        patient_id: str = None,
        status: Status = None,
        next_page_token: str = None,
        page_size: int = 50,
    ) -> ApiResponse:
        """List genomic sets

        Parameters
        ----------
        set_type : SetType
            The set type
        project_id : str
            The project ID
        sequence_id : str, optional
            List sets by sequence ID, by default None
        patient_id : str, optional
            List sets by patient ID, by default None
        status : Status, optional
            Filter sets by status, by default None
        next_page_token : str, optional
            The next page token, by default None
        page_size : int, optional
            The page size, by default 50

        Returns
        -------
        ApiResponse
            The list sets response
        """

        json_body = {
            "datasetIds": [project_id],
            # Enum members are not JSON serializable, so send the raw value;
            # None and plain strings pass through unchanged.
            "status": getattr(status, "value", status),
            "patientId": patient_id,
            "sequenceId": sequence_id,
            "pageSize": page_size,
            "pageToken": next_page_token,
        }

        return self._ga4gh_call(
            f"{set_type.value}/search", json=json_body, http_verb="POST"
        )

    def list_tests(self, project_id: str, patient_id: str) -> ApiResponse:
        """List tests for a patient

        Parameters
        ----------
        project_id : str
            The project ID
        patient_id : str
            The patient ID

        Returns
        -------
        ApiResponse
            The list tests response
        """
        return self._api_call(
            f"genomics/projects/{project_id}/subjects/{patient_id}/tests",
            http_verb="GET",
        )

    def get_test(self, project_id: str, test_id: str) -> ApiResponse:
        """Get test by project and test id

        Parameters
        ----------
        project_id : str
            The project ID
        test_id : str
            The Test ID

        Returns
        -------
        ApiResponse
            The get test response
        """
        return self._api_call(
            f"genomics/projects/{project_id}/tests/{test_id}",
            http_verb="GET",
        )

    def delete_test(self, project_id: str, test_id: str) -> bool:
        """Delete a genomic test

        Parameters
        ----------
        project_id : str
            The project ID
        test_id : str
            The test ID

        Returns
        -------
        bool
            True if the delete succeeds (status code 204), otherwise False
        """
        # NOTE(review): get_test reaches this same path through _api_call,
        # while this goes through _ga4gh_call - confirm that is intended.
        return (
            self._ga4gh_call(
                f"genomics/projects/{project_id}/tests/{test_id}",
                http_verb="DELETE",
            ).status_code
            == 204
        )

Ancestors

  • phc.base_client.BaseClient

Class variables

var Reference

An enumeration.

var SequenceType

An enumeration.

var SetType

An enumeration.

var Status

An enumeration.

Methods

def create_set(self, set_type: phc.services.genomics.Genomics.SetType, project_id: str, name: str, file_id: str, patient_id: str, reference: phc.services.genomics.Genomics.Reference, sequence_type: phc.services.genomics.Genomics.SequenceType, test_type: str, sequence_id: str = '4f7eb96c-96a9-4b63-8401-68e1f859a9ac', indexed_date: datetime.datetime = None, performer_id: str = None, test_id: str = None, update_sample: bool = False, pass_filter: bool = False, output_vcf_name: str = None) ‑> phc.api_response.ApiResponse

Creates a genomic set

Parameters

set_type : SetType
The genomic set type
project_id : str
The project ID
name : str
The set name
file_id : str
The genomic file ID
patient_id : str
The patient ID
reference : Reference
The genomic reference
sequence_type : SequenceType
The sequence type
test_type : str
The test type
sequence_id : str, optional
The FHIR Sequence ID, by default str(uuid.uuid4())
indexed_date : datetime, optional
The indexed date, by default None
performer_id : str, optional
The performer ID, by default None
test_id : str, optional
The test ID, by default None
update_sample : bool, optional
For variants only, True to update the sample ID, by default False
pass_filter : bool, optional
For variants only, True to update all filters to pass, by default False
output_vcf_name : str, optional
For variants only, the name of the output VCF, by default None

Returns

ApiResponse
The create set response
Expand source code
def create_set(
    self,
    set_type: SetType,
    project_id: str,
    name: str,
    file_id: str,
    patient_id: str,
    reference: Reference,
    sequence_type: SequenceType,
    test_type: str,
    sequence_id: str = None,
    indexed_date: datetime = None,
    performer_id: str = None,
    test_id: str = None,
    update_sample: bool = False,
    pass_filter: bool = False,
    output_vcf_name: str = None,
) -> ApiResponse:
    """Creates a genomic set

    Parameters
    ----------
    set_type : SetType
        The genomic set type
    project_id : str
        The project ID
    name : str
        The set name
    file_id : str
        The genomic file ID
    patient_id : str
        The patient ID
    reference : Reference
        The genomic reference
    sequence_type : SequenceType
        The sequence type
    test_type : str
        The test type
    sequence_id : str, optional
        The FHIR Sequence ID, by default a fresh str(uuid.uuid4())
        generated on every call
    indexed_date : datetime, optional
        The indexed date, by default None
    performer_id : str, optional
        The performer ID, by default None
    test_id : str, optional
        The test ID, by default None
    update_sample : bool, optional
        For variants only, True to update the sample ID, by default False
    pass_filter : bool, optional
        For variants only, True to update all filters to pass, by default False
    output_vcf_name : str, optional
        For variants only, the name of the output VCF, by default None

    Returns
    -------
    ApiResponse
        The create set response
    """
    # A default of str(uuid.uuid4()) in the signature is evaluated only once,
    # at definition time, so every call would share the same ID.
    if sequence_id is None:
        sequence_id = str(uuid.uuid4())

    json_body = {
        "datasetId": project_id,
        "name": name,
        "patientId": patient_id,
        "referenceSetId": reference.value,
        "sequenceType": sequence_type.value,
        "testType": test_type,
        "indexedDate": indexed_date.isoformat() if indexed_date else None,
        "performerId": performer_id,
        "testId": test_id,
        "sequenceId": sequence_id,
    }

    if set_type == Genomics.SetType.VARIANT:
        # Variant sets are posted to "genomicsets" with variant-only options.
        json_body["variantsFileIds"] = [file_id]
        # Plain bool; a trailing comma here would send a one-element array.
        json_body["updateSample"] = update_sample
        json_body["passFile"] = pass_filter
        json_body["outputVcfName"] = output_vcf_name

        return self._ga4gh_call(
            "genomicsets", json=json_body, http_verb="POST"
        )

    json_body["fileId"] = file_id

    return self._ga4gh_call(
        set_type.value, json=json_body, http_verb="POST"
    )
def delete_set(self, set_type: phc.services.genomics.Genomics.SetType, set_id: str) ‑> bool

Delete a genomic set

Parameters

set_type : SetType
The set type
set_id : str
The set ID

Returns

bool
True if the delete succeeds, otherwise False
Expand source code
def delete_set(self, set_type: SetType, set_id: str) -> bool:
    """Remove a genomic set

    Parameters
    ----------
    set_type : SetType
        The set type
    set_id : str
        The set ID

    Returns
    -------
    bool
        True when the DELETE call reports status code 204, otherwise False
    """
    response = self._ga4gh_call(
        f"{set_type.value}/{set_id}", http_verb="DELETE"
    )
    return response.status_code == 204
def delete_test(self, project_id: str, test_id: str) ‑> bool

Delete a genomic test

Parameters

project_id : str
The project ID
test_id : str
The test ID

Returns

bool
True if the delete succeeds, otherwise False
Expand source code
def delete_test(self, project_id: str, test_id: str) -> bool:
    """Remove a genomic test

    Parameters
    ----------
    project_id : str
        The project ID
    test_id : str
        The test ID

    Returns
    -------
    bool
        True when the DELETE call reports status code 204, otherwise False
    """
    # NOTE(review): get_test reaches this same path through _api_call, while
    # this goes through _ga4gh_call - confirm that is intended.
    response = self._ga4gh_call(
        f"genomics/projects/{project_id}/tests/{test_id}",
        http_verb="DELETE",
    )
    return response.status_code == 204
def get_set(self, set_type: phc.services.genomics.Genomics.SetType, set_id: str) ‑> phc.api_response.ApiResponse

Fetch a genomic set

Parameters

set_type : SetType
The set type
set_id : str
The set ID

Returns

ApiResponse
The fetch response
Expand source code
def get_set(self, set_type: SetType, set_id: str) -> ApiResponse:
    """Retrieve a single genomic set

    Parameters
    ----------
    set_type : SetType
        The set type
    set_id : str
        The set ID

    Returns
    -------
    ApiResponse
        The fetch response
    """
    resource_path = f"{set_type.value}/{set_id}"
    return self._ga4gh_call(resource_path, http_verb="GET")
def get_test(self, project_id: str, test_id: str) ‑> phc.api_response.ApiResponse

Get test by project and test id

Parameters

project_id : str
The project ID
test_id : str
The Test ID

Returns

ApiResponse
The get test response
Expand source code
def get_test(self, project_id: str, test_id: str) -> ApiResponse:
    """Retrieve one test, addressed by project ID and test ID

    Parameters
    ----------
    project_id : str
        The project ID
    test_id : str
        The Test ID

    Returns
    -------
    ApiResponse
        The get test response
    """
    test_path = f"genomics/projects/{project_id}/tests/{test_id}"
    return self._api_call(test_path, http_verb="GET")
def list_sets(self, set_type: phc.services.genomics.Genomics.SetType, project_id: str, sequence_id: str = None, patient_id: str = None, status: phc.services.genomics.Genomics.Status = None, next_page_token: str = None, page_size: int = 50) ‑> phc.api_response.ApiResponse

List genomic sets

Parameters

set_type : SetType
The set type
project_id : str
The project ID
sequence_id : str, optional
List sets by sequence ID, by default None
patient_id : str, optional
List sets by patient ID, by default None
status : Status, optional
Filter sets by status, by default None
next_page_token : str, optional
The next page token, by default None
page_size : int, optional
The page size, by default 50

Returns

ApiResponse
The list sets response
Expand source code
def list_sets(
    self,
    set_type: SetType,
    project_id: str,
    sequence_id: str = None,
    patient_id: str = None,
    status: Status = None,
    next_page_token: str = None,
    page_size: int = 50,
) -> ApiResponse:
    """List genomic sets

    Parameters
    ----------
    set_type : SetType
        The set type
    project_id : str
        The project ID
    sequence_id : str, optional
        List sets by sequence ID, by default None
    patient_id : str, optional
        List sets by patient ID, by default None
    status : Status, optional
        Filter sets by status, by default None
    next_page_token : str, optional
        The next page token, by default None
    page_size : int, optional
        The page size, by default 50

    Returns
    -------
    ApiResponse
        The list sets response
    """

    json_body = {
        "datasetIds": [project_id],
        # Enum members are not JSON serializable, so send the raw value;
        # None and plain strings pass through unchanged.
        "status": getattr(status, "value", status),
        "patientId": patient_id,
        "sequenceId": sequence_id,
        "pageSize": page_size,
        "pageToken": next_page_token,
    }

    return self._ga4gh_call(
        f"{set_type.value}/search", json=json_body, http_verb="POST"
    )
def list_tests(self, project_id: str, patient_id: str) ‑> phc.api_response.ApiResponse

List tests for a patient

Parameters

project_id : str
The project ID
patient_id : str
The patient ID

Returns

ApiResponse
The list tests response
Expand source code
def list_tests(self, project_id: str, patient_id: str) -> ApiResponse:
    """Retrieve the tests recorded for one patient in a project

    Parameters
    ----------
    project_id : str
        The project ID
    patient_id : str
        The patient ID

    Returns
    -------
    ApiResponse
        The list tests response
    """
    tests_path = f"genomics/projects/{project_id}/subjects/{patient_id}/tests"
    return self._api_call(tests_path, http_verb="GET")
def update_set(self, set_type: phc.services.genomics.Genomics.SetType, set_id: str, updates: dict) ‑> phc.api_response.ApiResponse

Update a genomic set

Parameters

set_type : SetType
The set type
set_id : str
The set ID
updates : dict
The updates to apply

Returns

ApiResponse
The update response
Expand source code
def update_set(self, set_type: SetType, set_id: str, updates: dict) -> ApiResponse:
    """Apply a set of changes to a genomic set

    Parameters
    ----------
    set_type : SetType
        The set type
    set_id : str
        The set ID
    updates : dict
        The updates to apply; sent as the JSON body of a PATCH request

    Returns
    -------
    ApiResponse
        The update response
    """
    resource_path = f"{set_type.value}/{set_id}"
    return self._ga4gh_call(resource_path, json=updates, http_verb="PATCH")
class Projects (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)

Provides access to PHC projects

Parameters

session : Session
The PHC session
run_async : bool
True to return promises, False to return results (default is False)
timeout : int
Operation timeout (default is 30)
trust_env : bool
Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
Expand source code
class Projects(BaseClient):
    """Provides access to PHC projects

    Parameters
    ----------
    session : phc.Session
        The PHC session
    run_async: bool
        True to return promises, False to return results (default is False)
    timeout: int
        Operation timeout (default is 30)
    trust_env: bool
        Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
    """

    def create(self, name: str, description: str = None) -> ApiResponse:
        """Create a new project

        Parameters
        ----------
        name : str
            The project name.
        description : str, optional
            The project description, by default None. Only sent when truthy.

        Returns
        -------
        phc.ApiResponse
            The create project response
        """
        payload = {"name": name}
        if description:
            payload.update(description=description)
        return self._api_call("projects", json=payload, http_verb="POST")

    def get(self, project_id) -> ApiResponse:
        """Look up a project by its ID

        Parameters
        ----------
        project_id : str
            The project ID.

        Returns
        -------
        phc.ApiResponse
            The get project response
        """
        project_path = f"projects/{project_id}"
        return self._api_call(project_path, http_verb="GET")

    def update(
        self, project_id: str, name: str, description: str = None
    ) -> ApiResponse:
        """Modify an existing project

        Parameters
        ----------
        project_id : str
            The project ID.
        name : str
            The project name.
        description : str, optional
            The project description, by default None. Only sent when truthy.

        Returns
        -------
        phc.ApiResponse
            The update project response
        """
        payload = {"name": name}
        if description:
            payload.update(description=description)
        response = self._api_call(
            f"projects/{project_id}", json=payload, http_verb="PATCH"
        )
        # NOTE(review): this returns the response's `.data`, not the response
        # itself, unlike create/get and the annotated return type - confirm.
        return response.data

    def delete(self, project_id: str) -> bool:
        """Remove a project

        Parameters
        ----------
        project_id : str
            The project ID.

        Returns
        -------
        bool
            True when the DELETE call reports status code 204, otherwise False
        """
        response = self._api_call(
            f"projects/{project_id}", http_verb="DELETE"
        )
        return response.status_code == 204

    def get_list(
        self,
        page_size: int = None,
        next_page_token: str = None,
        name: str = None,
    ) -> ApiResponse:
        """Retrieve the projects in an account

        Parameters
        ----------
        page_size : int, optional
            The page size, by default None
        next_page_token : str, optional
            The next page token, by default None
        name : str, optional
            A project name filter, by default None

        Returns
        -------
        phc.ApiResponse
            The list projects response
        """
        candidates = (
            ("pageSize", page_size),
            ("nextPageToken", next_page_token),
            ("name", name),
        )
        # Falsy values are left out of the query string entirely.
        query = {key: value for key, value in candidates if value}
        return self._api_call(
            f"projects?{urlencode(query)}", http_verb="GET"
        )

Ancestors

  • phc.base_client.BaseClient

Methods

def create(self, name: str, description: str = None) ‑> phc.api_response.ApiResponse

Creates a project

Parameters

name : str
The project name.
description : str, optional
The project description, by default None

Returns

ApiResponse
The create project response
Expand source code
def create(self, name: str, description: str = None) -> ApiResponse:
    """Create a new project

    Parameters
    ----------
    name : str
        The project name.
    description : str, optional
        The project description, by default None. Only sent when truthy.

    Returns
    -------
    phc.ApiResponse
        The create project response
    """
    payload = {"name": name}
    if description:
        payload.update(description=description)
    return self._api_call("projects", json=payload, http_verb="POST")
def delete(self, project_id: str) ‑> bool

Delete a project

Parameters

project_id : str
The project ID.

Returns

bool
True if the delete succeeds, otherwise False
Expand source code
def delete(self, project_id: str) -> bool:
    """Remove a project

    Parameters
    ----------
    project_id : str
        The project ID.

    Returns
    -------
    bool
        True when the DELETE call reports status code 204, otherwise False
    """
    response = self._api_call(
        f"projects/{project_id}", http_verb="DELETE"
    )
    return response.status_code == 204
def get(self, project_id) ‑> phc.api_response.ApiResponse

Fetch a project by id

Parameters

project_id : str
The project ID.

Returns

ApiResponse
The get project response
Expand source code
def get(self, project_id) -> ApiResponse:
    """Look up a project by its ID

    Parameters
    ----------
    project_id : str
        The project ID.

    Returns
    -------
    phc.ApiResponse
        The get project response
    """
    project_path = f"projects/{project_id}"
    return self._api_call(project_path, http_verb="GET")
def get_list(self, page_size: int = None, next_page_token: str = None, name: str = None) ‑> phc.api_response.ApiResponse

Fetch a list of projects in an account

Parameters

page_size : int, optional
The page size, by default None
next_page_token : str, optional
The next page token, by default None
name : str, optional
A project name filter, by default None

Returns

ApiResponse
The list projects response
Expand source code
def get_list(
    self,
    page_size: int = None,
    next_page_token: str = None,
    name: str = None,
) -> ApiResponse:
    """Retrieve the projects in an account

    Parameters
    ----------
    page_size : int, optional
        The page size, by default None
    next_page_token : str, optional
        The next page token, by default None
    name : str, optional
        A project name filter, by default None

    Returns
    -------
    phc.ApiResponse
        The list projects response
    """
    candidates = (
        ("pageSize", page_size),
        ("nextPageToken", next_page_token),
        ("name", name),
    )
    # Falsy values are left out of the query string entirely.
    query = {key: value for key, value in candidates if value}
    return self._api_call(
        f"projects?{urlencode(query)}", http_verb="GET"
    )
def update(self, project_id: str, name: str, description: str = None) ‑> phc.api_response.ApiResponse

Update a project

Parameters

project_id : str
The project ID.
name : str
The project name.
description : str, optional
The project description, by default None

Returns

ApiResponse
The update project response
Expand source code
def update(
    self, project_id: str, name: str, description: str = None
) -> ApiResponse:
    """Modify an existing project

    Parameters
    ----------
    project_id : str
        The project ID.
    name : str
        The project name.
    description : str, optional
        The project description, by default None. Only sent when truthy.

    Returns
    -------
    phc.ApiResponse
        The update project response
    """
    payload = {"name": name}
    if description:
        payload.update(description=description)
    response = self._api_call(
        f"projects/{project_id}", json=payload, http_verb="PATCH"
    )
    # NOTE(review): this returns the response's `.data`, not the response
    # itself, despite the annotated return type - confirm callers expect it.
    return response.data
class Tools (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)

Provides access to PHC tools registry

Parameters

session : Session
The PHC session
run_async : bool
True to return promises, False to return results (default is False)
timeout : int
Operation timeout (default is 30)
trust_env : bool
Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
Expand source code
class Tools(BaseClient):
    """Provides access to PHC tools registry

    Parameters
    ----------
    session: phc.Session
        The PHC session
    run_async: bool
        True to return promises, False to return results (default is False)
    timeout: int
        Operation timeout (default is 30)
    trust_env: bool
        Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
    """

    def create(
        self,
        name: str,
        description: str,
        access: ToolAccess,
        version: str,
        tool_class: ToolClass,
        source: str,
        labels: Optional[List[str]] = None,
    ) -> ApiResponse:
        """Create a tool and upload its source file.

        Parameters
        ----------
        name: str
            The name to give to the tool
        description: str
            A description of the tool
        access: ToolAccess
            The access level given to the tool [PRIVATE, ACCOUNT, PHC, PUBLIC]
        version: str
            The initial version of the tool
        tool_class: ToolClass
            The class of the tool [Workflow, Notebook]
        source: str
            The path of the tool to upload
        labels: List[str], optional
            A list of labels to apply to the tool, e.g. ["bam","samtools"]

        Returns
        -------
        ApiResponse
            The create tool response

        Raises
        ------
        ValueError
            If tool_class is not an attribute of ToolClass or access is not
            an attribute of ToolAccess

        Examples
        --------
        >>> from phc.services import Tools
        >>> tools = Tools(session)
        >>> tools.create(name="Read Depth Notebook", description="Generates a chart of positional read depth from a bam file",
              access="PHC", version="1.0.0", tool_class="Notebook", source="./mynotebook.ipynb", labels=["bam","samtools"])
        """
        if not hasattr(ToolClass, tool_class):
            raise ValueError(
                f"{tool_class} is not a valid Tool Class value {[e.value for e in ToolClass]}"
            )

        if not hasattr(ToolAccess, access):
            raise ValueError(
                f"{access} is not a valid Tool Access value {[e.value for e in ToolAccess]}"
            )

        create_request = {
            "version": version,
            "access": access,
            "name": name,
            "toolClassId": ToolClassIdMappings[tool_class],
            "descriptorType": DescriptorTypeMappings[tool_class],
            "description": description,
        }
        if labels:
            create_request["labels"] = labels

        res = self._api_call(
            "/v1/trs/v2/tools", json=create_request, http_verb="POST"
        )
        self._upload_tool_file(res, source)
        return res

    def _upload_tool_file(self, tool_response, source: str) -> None:
        """Upload the local file `source` for the tool version in `tool_response`.

        Requests an upload URL from ``/v1/trs/files`` for the tool ID and
        version found in `tool_response`, then PUTs the file to that URL.
        """
        upload_request = {
            "fileName": source.split("/").pop(),
            "toolId": tool_response["id"],
            "version": tool_response["meta_version"],
        }

        upload_response = self._api_call(
            "/v1/trs/files", json=upload_request, http_verb="POST"
        )
        file_size = os.path.getsize(source)
        # The PUT targets the returned upload URL; these three headers are
        # explicitly set to None for that request.
        self._api_call_impl(
            http_verb="PUT",
            url=upload_response["uploadUrl"],
            api_path=None,
            upload_file=source,
            headers={
                "Content-Length": str(file_size),
                "Authorization": None,
                "LifeOmic-Account": None,
                "Content-Type": None,
            },
        )

    @backoff.on_exception(
        backoff.expo, OSError, max_tries=6, jitter=backoff.full_jitter
    )
    def download(
        self,
        tool_id: str,
        version: Optional[str] = None,
        dest_dir: Optional[str] = None,
    ) -> str:
        """Download a tool

        The call is retried with exponential backoff (up to 6 tries) when an
        OSError is raised.

        Parameters
        ----------
        tool_id : str
            The tool ID
        version : str, optional
            The version.
        dest_dir : str, optional
            The local directory to save the tool.  Defaults to the current
            working directory at the time of the call

        Returns
        -------
        str
            The local path the tool was saved to

        Examples
        --------
        >>> from phc.services import Tools
        >>> tools = Tools(session)
        >>> tools.download(tool_id="db3e09e9-1ecd-4976-aa5e-70ac7ada0cc3", dest_dir="./mydata")
        """
        # Resolve the default per call; a default of os.getcwd() in the
        # signature would be frozen at import time.
        if dest_dir is None:
            dest_dir = os.getcwd()

        tool_ref = f"{tool_id}:{version}" if version else tool_id
        res = self._api_call(
            f"/v1/trs/files/{tool_ref}/download", http_verb="GET"
        )

        file_path = os.path.join(dest_dir, res.get("fileName"))
        target_dir = os.path.dirname(file_path)
        # An empty dirname means "relative to the current directory";
        # os.makedirs("") would raise, so only create a named directory.
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)

        urlretrieve(res.get("downloadUrl"), file_path)
        return file_path

    def get(self, tool_id: str, version: Optional[str] = None) -> ApiResponse:
        """Fetch a tool by id

        Parameters
        ----------
        tool_id : str
            The tool ID.
        version : str, optional
            The version.

        Returns
        -------
        phc.ApiResponse
            The get tool response
        """
        tool_ref = f"{tool_id}:{version}" if version else tool_id
        return self._api_call(f"/v1/trs/v2/tools/{tool_ref}", http_verb="GET")

    def add_version(
        self,
        tool_id: str,
        version: str,
        source: str,
        is_default: Optional[bool] = False,
    ) -> ApiResponse:
        """Adds a new version to a tool and uploads its source file.

        Parameters
        ----------
        tool_id : str
            The tool ID to add the version to.
        version : str
            The new version for the tool.
        source: str
            The path of the version to upload.
        is_default: bool = False
            Updates default setting for the tool.

        Returns
        -------
        phc.ApiResponse
            The updated tool response
        """
        version_request = {
            "version": version,
            "isDefault": is_default,
        }

        res = self._api_call(
            f"/v1/trs/v2/tools/{tool_id}/versions",
            json=version_request,
            http_verb="POST",
        )
        self._upload_tool_file(res, source)
        return res

    def delete(self, tool_id: str, version: Optional[str] = None) -> bool:
        """Deletes a tool

        Parameters
        ----------
        tool_id : str
            The tool ID.
        version : str, optional
            The version.

        Returns
        -------
        bool
            True if the delete response status code is 200, otherwise False
        """
        tool_ref = f"{tool_id}:{version}" if version else tool_id
        return (
            self._api_call(
                f"/v1/trs/v2/tools/{tool_ref}", http_verb="DELETE"
            ).status_code
            == 200
        )

    def get_list(
        self,
        tool_class: Optional[ToolClass] = None,
        organization: Optional[str] = None,
        tool_name: Optional[str] = None,
        author: Optional[str] = None,
        labels: Optional[List[str]] = None,
        page_size: Optional[int] = 1000,
        page_count: Optional[int] = 0,
    ) -> ApiResponse:
        """Fetch a list of tools from the registry

        Parameters
        ----------
        tool_class: str, optional
            The class of the tool, by default None
        organization: str, optional
            The organization that owns the tool, by default None
        tool_name: str, optional
            The name of the tool, by default None
        author: str, optional
            The creator of the tool, by default None
        labels: List[str], optional
            A list of labels describing the tool, by default None
        page_size: int, optional
            The count of tools to return in a single request, by default 1000
            (sent as the ``limit`` query parameter)
        page_count: int, optional
            The page count to return, by default 0 (sent as the ``offset``
            query parameter)

        Returns
        -------
        phc.ApiResponse
            The list tools response

        Raises
        ------
        ValueError
            If tool_class is given and is not an attribute of ToolClass
        """
        query_dict = {
            "limit": page_size,
            "offset": page_count,
        }
        if tool_class:
            if not hasattr(ToolClass, tool_class):
                raise ValueError(
                    f"{tool_class} is not a valid Tool Class value {[e.value for e in ToolClass]}"
                )
            query_dict["toolClass"] = tool_class
        if organization:
            query_dict["organization"] = organization
        if tool_name:
            query_dict["toolname"] = tool_name
        if author:
            query_dict["author"] = author
        if labels:
            # Multiple labels go out as one comma-separated value.
            query_dict["label"] = ",".join(labels)

        return self._api_call(
            f"/v1/trs/v2/tools?{urlencode(query_dict)}",
            http_verb="GET",
        )

Ancestors

  • phc.base_client.BaseClient

Methods

def add_version(self, tool_id: str, version: str, source: str, is_default: Union[bool, NoneType] = False) ‑> phc.api_response.ApiResponse

Adds a new version to a tool.

Parameters

tool_id : str
The tool ID to add the version to.
version : str
The new version for the tool.
source : str
The path of the version to upload.
is_default : bool = False
Updates default setting for the tool.

Returns

ApiResponse
The updated tool response
Expand source code
def add_version(
    self,
    tool_id: str,
    version: str,
    source: str,
    is_default: Optional[bool] = False,
) -> ApiResponse:
    """Register a new version of a tool and upload its source file.

    Parameters
    ----------
    tool_id : str
        The tool ID to add the version to.
    version : str
        The new version for the tool.
    source: str
        The path of the version to upload.
    is_default: bool = False
        Updates default setting for the tool.

    Returns
    -------
    phc.ApiResponse
        The updated tool response
    """
    # Step 1: register the version on the tool.
    created = self._api_call(
        f"/v1/trs/v2/tools/{tool_id}/versions",
        json={"version": version, "isDefault": is_default},
        http_verb="POST",
    )

    # Step 2: request an upload URL for the file, named after the last
    # "/"-separated component of the source path.
    file_name = source.rsplit("/", 1)[-1]
    upload_target = self._api_call(
        "/v1/trs/files",
        json={
            "fileName": file_name,
            "toolId": created["id"],
            "version": created["meta_version"],
        },
        http_verb="POST",
    )

    # Step 3: PUT the file to the upload URL; these three headers are
    # explicitly set to None for that request.
    upload_headers = {
        "Content-Length": str(os.path.getsize(source)),
        "Authorization": None,
        "LifeOmic-Account": None,
        "Content-Type": None,
    }
    self._api_call_impl(
        http_verb="PUT",
        url=upload_target["uploadUrl"],
        api_path=None,
        upload_file=source,
        headers=upload_headers,
    )
    return created
def create(self, name: str, description: str, access: phc.services.tools.ToolAccess, version: str, tool_class: phc.services.tools.ToolClass, source: str, labels: Union[List[str], NoneType] = None) ‑> phc.api_response.ApiResponse

Create a tool.

Parameters

name : str
The name to give to the tool
description : str
A description of the tool
access : ToolAccess
The access level given to the tool [PRIVATE, ACCOUNT, PHC, PUBLIC]
version : str
The initial version of the tool
tool_class : ToolClass
The class of the tool [Workflow, Notebook]
source : str
The path of the tool to upload
labels : List[str], optional
A list of labels to apply to the tool, i.e. ["bam","samtools"]

Returns

ApiResponse
The create tool response

Examples

>>> from phc.services import Tools
>>> tools = Tools(session)
>>> tools.create(name="Read Depth Notebook", description="Generates a chart of positional read depth from a bam file",
      access="PHC", version="1.0.0", tool_class="Notebook", source="./mynotebook.ipynb", labels=["bam","samtools"])
Expand source code
def create(
    self,
    name: str,
    description: str,
    access: ToolAccess,
    version: str,
    tool_class: ToolClass,
    source: str,
    labels: Optional[List[str]] = None,
) -> ApiResponse:
    """Create a tool and upload its source file.

    Parameters
    ----------
    name: str
        The name to give to the tool
    description: str
        A description of the tool
    access: ToolAccess
        The access level given to the tool [PRIVATE, ACCOUNT, PHC, PUBLIC]
    version: str
        The initial version of the tool
    tool_class: ToolClass
        The class of the tool [Workflow, Notebook]
    source: str
        The path of the tool to upload
    labels: List[str], optional
        A list of labels to apply to the tool, e.g. ["bam","samtools"]

    Returns
    -------
    ApiResponse
        The create tool response

    Raises
    ------
    ValueError
        If tool_class is not an attribute of ToolClass or access is not
        an attribute of ToolAccess

    Examples
    --------
    >>> from phc.services import Tools
    >>> tools = Tools(session)
    >>> tools.create(name="Read Depth Notebook", description="Generates a chart of positional read depth from a bam file",
          access="PHC", version="1.0.0", tool_class="Notebook", source="./mynotebook.ipynb", labels=["bam","samtools"])
    """
    if not hasattr(ToolClass, tool_class):
        raise ValueError(
            f"{tool_class} is not a valid Tool Class value {[e.value for e in ToolClass]}"
        )

    # This message previously said "Tool Class" although it reports an
    # invalid access level.
    if not hasattr(ToolAccess, access):
        raise ValueError(
            f"{access} is not a valid Tool Access value {[e.value for e in ToolAccess]}"
        )

    create_request = {
        "version": version,
        "access": access,
        "name": name,
        "toolClassId": ToolClassIdMappings[tool_class],
        "descriptorType": DescriptorTypeMappings[tool_class],
        "description": description,
    }
    if labels:
        create_request["labels"] = labels

    res = self._api_call(
        "/v1/trs/v2/tools", json=create_request, http_verb="POST"
    )

    # Request an upload URL for the tool's file, named after the last
    # "/"-separated component of the source path.
    upload_request = {
        "fileName": source.split("/").pop(),
        "toolId": res["id"],
        "version": res["meta_version"],
    }

    upload_response = self._api_call(
        "/v1/trs/files", json=upload_request, http_verb="POST"
    )
    file_size = os.path.getsize(source)
    # PUT the file to the returned upload URL; these three headers are
    # explicitly set to None for that request.
    self._api_call_impl(
        http_verb="PUT",
        url=upload_response["uploadUrl"],
        api_path=None,
        upload_file=source,
        headers={
            "Content-Length": str(file_size),
            "Authorization": None,
            "LifeOmic-Account": None,
            "Content-Type": None,
        },
    )
    return res
def delete(self, tool_id: str, version: Union[str, NoneType] = None) ‑> bool

Deletes a tool

Parameters

tool_id : str
The tool ID.
version : str, optional
The version.

Returns

bool
True if the delete succeeds, otherwise False
Expand source code
def delete(self, tool_id: str, version: Optional[str] = None) -> bool:
    """Delete a tool, or a single version of it

    Parameters
    ----------
    tool_id : str
        The tool ID.
    version : str, optional
        The version. When given, the request targets ``tool_id:version``.

    Returns
    -------
    bool
        True if the delete response status code is 200, otherwise False
    """
    target = tool_id if not version else f"{tool_id}:{version}"
    response = self._api_call(
        f"/v1/trs/v2/tools/{target}", http_verb="DELETE"
    )
    return response.status_code == 200
def download(self, tool_id: str, version: Union[str, NoneType] = None, dest_dir: Union[str, NoneType] = '/home/runner/work/phc-sdk-py/phc-sdk-py') ‑> NoneType

Download a tool

Parameters

tool_id : str
The tool ID
version : str, optional
The version.
dest_dir : str, optional
The local directory to save the tool. Defaults to the current working directory

Examples

>>> from phc.services import Tools
>>> tools = Tools(session)
>>> tools.download(tool_id="db3e09e9-1ecd-4976-aa5e-70ac7ada0cc3", dest_dir="./mydata")
Expand source code
@backoff.on_exception(
    backoff.expo, OSError, max_tries=6, jitter=backoff.full_jitter
)
def download(
    self,
    tool_id: str,
    version: Optional[str] = None,
    dest_dir: Optional[str] = None,
) -> str:
    """Download a tool

    The call is retried with exponential backoff (up to 6 tries) when an
    OSError is raised.

    Parameters
    ----------
    tool_id : str
        The tool ID
    version : str, optional
        The version.
    dest_dir : str, optional
        The local directory to save the tool.  Defaults to the current
        working directory at the time of the call

    Returns
    -------
    str
        The local path the tool was saved to

    Examples
    --------
    >>> from phc.services import Tools
    >>> tools = Tools(session)
    >>> tools.download(tool_id="db3e09e9-1ecd-4976-aa5e-70ac7ada0cc3", dest_dir="./mydata")
    """
    # Resolve the default per call; a default of os.getcwd() in the
    # signature would be frozen at import time.
    if dest_dir is None:
        dest_dir = os.getcwd()

    tool_ref = f"{tool_id}:{version}" if version else tool_id
    res = self._api_call(
        f"/v1/trs/files/{tool_ref}/download", http_verb="GET"
    )

    file_path = os.path.join(dest_dir, res.get("fileName"))
    target_dir = os.path.dirname(file_path)
    # An empty dirname means "relative to the current directory";
    # os.makedirs("") would raise, so only create a named directory.
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)

    urlretrieve(res.get("downloadUrl"), file_path)
    return file_path
def get(self, tool_id: str, version: Union[str, NoneType] = None) ‑> phc.api_response.ApiResponse

Fetch a tool by id

Parameters

tool_id : str
The tool ID.
version : str, optional
The version.

Returns

ApiResponse
The get tool response
Expand source code
def get(self, tool_id: str, version: Optional[str] = None) -> ApiResponse:
    """Fetch a tool, or a single version of it, by id

    Parameters
    ----------
    tool_id : str
        The tool ID.
    version : str, optional
        The version. When given, the request targets ``tool_id:version``.

    Returns
    -------
    phc.ApiResponse
        The get tool response
    """
    target = tool_id if not version else f"{tool_id}:{version}"
    path = f"/v1/trs/v2/tools/{target}"
    return self._api_call(path, http_verb="GET")
def get_list(self, tool_class: Union[phc.services.tools.ToolClass, NoneType] = None, organization: Union[str, NoneType] = None, tool_name: Union[str, NoneType] = None, author: Union[str, NoneType] = None, labels: Union[List[str], NoneType] = None, page_size: Union[int, NoneType] = 1000, page_count: Union[int, NoneType] = 0) ‑> phc.api_response.ApiResponse

Fetch a list of tools from the registry

Parameters

tool_class : str, optional
The class of the tool, by default None
organization : str, optional
The organization that owns the tool, by default None
tool_name : str, optional
The name of the tool, by default None
author : str, optional
The creator of the tool, by default None
labels : List[str], optional
A list of labels describing the tool, by default None
page_size : int, optional
The count of tools to return in a single request, by default 1000
page_count : int, optional
The page count to return, by default 0

Returns

ApiResponse
The list tools response
Expand source code
def get_list(
    self,
    tool_class: Optional[ToolClass] = None,
    organization: Optional[str] = None,
    tool_name: Optional[str] = None,
    author: Optional[str] = None,
    labels: Optional[List[str]] = None,
    page_size: Optional[int] = 1000,
    page_count: Optional[int] = 0,
) -> ApiResponse:
    """List tools in the registry, optionally filtered

    Parameters
    ----------
    tool_class: str, optional
        The class of the tool, by default None
    organization: str, optional
        The organization that owns the tool, by default None
    tool_name: str, optional
        The name of the tool, by default None
    author: str, optional
        The creator of the tool, by default None
    labels: List[str], optional
        A list of labels describing the tool, by default None
    page_size: int, optional
        The count of tools to return in a single request, by default 1000
        (sent as the ``limit`` query parameter)
    page_count: int, optional
        The page count to return, by default 0 (sent as the ``offset``
        query parameter)

    Returns
    -------
    phc.ApiResponse
        The list tools response

    Raises
    ------
    ValueError
        If tool_class is given and is not an attribute of ToolClass
    """
    params = {"limit": page_size, "offset": page_count}

    if tool_class:
        if not hasattr(ToolClass, tool_class):
            raise ValueError(
                f"{tool_class} is not a valid Tool Class value {[e.value for e in ToolClass]}"
            )
        params["toolClass"] = tool_class

    # Plain string filters are only sent when they carry a value.
    string_filters = (
        ("organization", organization),
        ("toolname", tool_name),
        ("author", author),
    )
    for key, value in string_filters:
        if value:
            params[key] = value

    # Multiple labels go out as one comma-separated value.
    if labels:
        params["label"] = ",".join(labels)

    query = urlencode(params)
    return self._api_call(f"/v1/trs/v2/tools?{query}", http_verb="GET")
class Workflows (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)

Provides access to PHC Workflows

Parameters

session : Session
The PHC session
run_async : bool
True to return promises, False to return results (default is False)
timeout : int
Operation timeout (default is 30)
trust_env : bool
Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
Expand source code
class Workflows(BaseClient):
    """Provides access to PHC Workflows

    Parameters
    ----------
    session: phc.Session
        The PHC session
    run_async: bool
        True to return promises, False to return results (default is False)
    timeout: int
        Operation timeout (default is 30)
    trust_env: bool
        Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
    """

    def run(
        self,
        project_id: str,
        name: str,
        tool: str,
        workflow_inputs: Optional[str] = None,
        workflow_inputs_file_id: Optional[str] = None,
        output_project_folder: Optional[str] = None,
    ) -> ApiResponse:
        """Start a workflow run of a tool in a project.

        Parameters
        ----------
        project_id: str
            The project ID
        name: str
            The name to give to this run of a tool
        tool: str
            The tool id or organization/name of the tool to run
        workflow_inputs: str, optional
            The inputs required by the workflow as a json string, either this or workflow_inputs_file_id are required.
            Takes precedence when both are given.
        workflow_inputs_file_id: str, optional
            The inputs required by the workflow as provided in a file in PHC, either this or workflow_inputs are required
        output_project_folder: str, optional
            The destination output folder in PHC for the workflow run outputs

        Returns
        -------
        ApiResponse
            The workflow run response

        Raises
        ------
        ValueError
            If neither workflow_inputs nor workflow_inputs_file_id is given

        Examples
        --------
        >>> from phc.services import Workflows
        >>> workflows = Workflows(session)
        >>> workflows.run(project_id="d2876f48-724f-4987-9cf0-92c7ef99a9fa",
              name="Ashion ingest subj: 2405",
              tool="lifeomic/ashion-ingest-workflow",
              workflow_inputs="{'reference': 'GRCh37','tarFile': {'class': 'File','fileId': '28235c74-9731-4496-bb3c-41c361f106f3'}, 'source': 'incoming/ashion_C043_9999_009990_T1_K1ID2_ps20190814000000.tar.gz'}")
        """
        # Exactly one inputs field is sent; inline inputs win over a file id.
        if workflow_inputs:
            inputs_field = {"workflowInputs": workflow_inputs}
        elif workflow_inputs_file_id:
            inputs_field = {"workflowInputsFileId": workflow_inputs_file_id}
        else:
            raise ValueError(
                "Must provide a value for the workflow_inputs or workflow_inputs_file_id"
            )

        run_request = {
            "datasetId": project_id,
            "name": name,
            "workflowSourceFileId": tool,
            **inputs_field,
        }
        if output_project_folder:
            run_request["outputProjectFolder"] = output_project_folder

        return self._api_call(
            "/v1/workflows/ga4gh/wes/runs",
            json=run_request,
            http_verb="POST",
        )

    def get(self, project_id: str, workflow_id: str) -> ApiResponse:
        """Get the metadata of one workflow run

        Parameters
        ----------
        project_id: str
            The project ID
        workflow_id : str
            The workflow ID.

        Returns
        -------
        phc.ApiResponse
            The get workflow response
        """
        run_ref = f"{project_id}:{workflow_id}"
        return self._api_call(
            f"/v1/workflows/ga4gh/wes/runs/{run_ref}", http_verb="GET"
        )

    def get_list(
        self,
        project_id: str,
        page_size: Optional[int] = 100,
        next_page_token: Optional[str] = None,
    ) -> ApiResponse:
        """Fetch a list of workflows run in the specified project

        Parameters
        ----------
        project_id: str
            The project ID
        page_size : int, optional
            The page size, by default 100
        next_page_token : str, optional
            The next page token, by default None

        Returns
        -------
        phc.ApiResponse
            The list workflow run response
        """
        params = {"datasetId": project_id}
        # Paging parameters are only sent when they carry a value.
        paging = (("pageSize", page_size), ("nextPageToken", next_page_token))
        for key, value in paging:
            if value:
                params[key] = value

        query = urlencode(params)
        return self._api_call(
            f"/v1/workflows/ga4gh/wes/runs?{query}", http_verb="GET"
        )

    def describe(self, project_id: str, tool: str) -> ApiResponse:
        """Returns a description of the inputs the workflow engine requires for the given tool

        Parameters
        ----------
        project_id: str
            The project ID
        tool: str
            The tool id or organization/name of the tool to run

        Returns
        -------
        phc.ApiResponse
            The description of the inputs for the given tool
        """
        return self._api_call(
            "/v1/workflows/ga4gh/wes/runs/parse",
            json={"datasetId": project_id, "workflowSourceFileId": tool},
            http_verb="POST",
        )

Ancestors

  • phc.base_client.BaseClient

Methods

def describe(self, project_id: str, tool: str) ‑> phc.api_response.ApiResponse

Returns a description of the inputs the workflow engine requires for the given tool

Parameters

project_id : str
The project ID
tool : str
The tool id or organization/name of the tool to run

Returns

ApiResponse
The description of the inputs for the given tool
Expand source code
def describe(self, project_id: str, tool: str) -> ApiResponse:
    """Ask the workflow engine which inputs the given tool requires

    Parameters
    ----------
    project_id: str
        The project ID
    tool: str
        The tool id or organization/name of the tool to run

    Returns
    -------
    phc.ApiResponse
        The description of the inputs for the given tool
    """
    return self._api_call(
        "/v1/workflows/ga4gh/wes/runs/parse",
        json={"datasetId": project_id, "workflowSourceFileId": tool},
        http_verb="POST",
    )
def get(self, project_id: str, workflow_id: str) ‑> phc.api_response.ApiResponse

Get workflow metadata by id

Parameters

project_id : str
The project ID
workflow_id : str
The workflow ID.

Returns

ApiResponse
The get workflow response
Expand source code
def get(self, project_id: str, workflow_id: str) -> ApiResponse:
    """Get the metadata of one workflow run

    Parameters
    ----------
    project_id: str
        The project ID
    workflow_id : str
        The workflow ID.

    Returns
    -------
    phc.ApiResponse
        The get workflow response
    """
    # Runs are addressed as "<project_id>:<workflow_id>".
    run_ref = f"{project_id}:{workflow_id}"
    path = f"/v1/workflows/ga4gh/wes/runs/{run_ref}"
    return self._api_call(path, http_verb="GET")
def get_list(self, project_id: str, page_size: Union[int, NoneType] = 100, next_page_token: Union[str, NoneType] = None) ‑> phc.api_response.ApiResponse

Fetch a list of workflows run in the specified project

Parameters

project_id : str
The project ID
page_size : int, optional
The page size, by default 100
next_page_token : str, optional
The next page token, by default None

Returns

ApiResponse
The list workflow run response
Expand source code
def get_list(
    self,
    project_id: str,
    page_size: Optional[int] = 100,
    next_page_token: Optional[str] = None,
) -> ApiResponse:
    """List the workflow runs of a project

    Parameters
    ----------
    project_id: str
        The project ID
    page_size : int, optional
        The page size, by default 100
    next_page_token : str, optional
        The next page token, by default None

    Returns
    -------
    phc.ApiResponse
        The list workflow run response
    """
    params = {"datasetId": project_id}
    # Paging parameters are only sent when they carry a value.
    paging = (("pageSize", page_size), ("nextPageToken", next_page_token))
    for key, value in paging:
        if value:
            params[key] = value

    query = urlencode(params)
    return self._api_call(
        f"/v1/workflows/ga4gh/wes/runs?{query}", http_verb="GET"
    )
def run(self, project_id: str, name: str, tool: str, workflow_inputs: Union[str, NoneType] = None, workflow_inputs_file_id: Union[str, NoneType] = None, output_project_folder: Union[str, NoneType] = None) ‑> phc.api_response.ApiResponse

Run a tool as a workflow.

Parameters

project_id : str
The project ID
name : str
The name to give to this run of a tool
tool : str
The tool id or organization/name of the tool to run
workflow_inputs : str, optional
The inputs required by the workflow as a json string, either this or workflow_inputs_file_id are required
workflow_inputs_file_id : str, optional
The inputs required by the workflow as provided in a file in PHC, either this or workflow_inputs are required
output_project_folder : str, optional
The destination output folder in PHC for the workflow run outputs

Returns

ApiResponse
The workflow run response

Examples

>>> from phc.services import Workflows
>>> workflows = Workflows(session)
>>> workflows.run(project_id="d2876f48-724f-4987-9cf0-92c7ef99a9fa",
      name="Ashion ingest subj: 2405",
      tool="lifeomic/ashion-ingest-workflow",
      workflow_inputs="{'reference': 'GRCh37','tarFile': {'class': 'File','fileId': '28235c74-9731-4496-bb3c-41c361f106f3'}, 'source': 'incoming/ashion_C043_9999_009990_T1_K1ID2_ps20190814000000.tar.gz'}")
Expand source code
def run(
    self,
    project_id: str,
    name: str,
    tool: str,
    workflow_inputs: Optional[str] = None,
    workflow_inputs_file_id: Optional[str] = None,
    output_project_folder: Optional[str] = None,
) -> ApiResponse:
    """Run a tool as a workflow in the specified project.

    Builds a run request and POSTs it to ``/v1/workflows/ga4gh/wes/runs``.

    Parameters
    ----------
    project_id: str
        The project ID
    name: str
        The name to give to this run of a tool
    tool: str
        The tool id or organization/name of the tool to run
    workflow_inputs: str, optional
        The inputs required by the workflow as a JSON string; either this
        or workflow_inputs_file_id is required. When both are supplied,
        this value is sent and workflow_inputs_file_id is ignored.
    workflow_inputs_file_id: str, optional
        The inputs required by the workflow, provided in a file stored in
        PHC; either this or workflow_inputs is required
    output_project_folder: str, optional
        The destination output folder in PHC for the workflow run outputs

    Returns
    -------
    ApiResponse
        The workflow run response

    Raises
    ------
    ValueError
        If neither workflow_inputs nor workflow_inputs_file_id is given
        (empty strings count as not given)

    Examples
    --------
    >>> from phc.services import Workflows
    >>> workflows = Workflows(session)
    >>> workflows.run(project_id="d2876f48-724f-4987-9cf0-92c7ef99a9fa",
          name="Ashion ingest subj: 2405",
          tool="lifeomic/ashion-ingest-workflow",
          workflow_inputs="{'reference': 'GRCh37','tarFile': {'class': 'File','fileId': '28235c74-9731-4496-bb3c-41c361f106f3'}, 'source': 'incoming/ashion_C043_9999_009990_T1_K1ID2_ps20190814000000.tar.gz'}")
    """
    # Fail fast: a run needs one of the two input sources.
    if not (workflow_inputs or workflow_inputs_file_id):
        raise ValueError(
            "Must provide a value for the workflow_inputs or workflow_inputs_file_id"
        )

    payload = {
        "datasetId": project_id,
        "name": name,
        "workflowSourceFileId": tool,
    }

    # Inline inputs take precedence over a file reference when both are set.
    if workflow_inputs:
        payload["workflowInputs"] = workflow_inputs
    else:
        payload["workflowInputsFileId"] = workflow_inputs_file_id

    # The output folder is only sent when the caller asked for one.
    if output_project_folder:
        payload["outputProjectFolder"] = output_project_folder

    return self._api_call(
        "/v1/workflows/ga4gh/wes/runs",
        http_verb="POST",
        json=payload,
    )