Module phc.services
Contains services for accessing different parts of the PHC platform.
Expand source code
"""
Contains services for accessing different parts of the PHC platform.
"""
from phc.services.accounts import Accounts
from phc.services.analytics import Analytics
from phc.services.fhir import Fhir
from phc.services.projects import Projects
from phc.services.files import Files
from phc.services.cohorts import Cohorts
from phc.services.genomics import Genomics
from phc.services.tools import Tools
from phc.services.workflows import Workflows
from phc.services.genomic_ingestions import GenomicIngestions, IngestionStep
from phc.services.tasks import Tasks
from phc.services.patient_ml import PatientML
__all__ = [
"Accounts",
"Analytics",
"Fhir",
"Projects",
"Files",
"Cohorts",
"Genomics",
"Tools",
"Workflows",
"GenomicIngestions",
"Tasks",
"PatientML",
]
__pdoc__ = {
"accounts": False,
"analytics": False,
"fhir": False,
"projects": False,
"files": False,
"cohorts": False,
"genomics": False,
"tools": False,
"workflows": False,
"genomic_ingestions": False,
"tasks": False,
"patient_ml": False,
}
Sub-modules
phc.services.agents
Classes
class Accounts (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)
-
Provides acccess to PHC accounts
Parameters
session
:Session
- The PHC session
run_async
:bool
- True to return promises, False to return results (default is False)
timeout
:int
- Operation timeout (default is 30)
trust_env
:bool
- Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
Expand source code
class Accounts(BaseClient): """Provides acccess to PHC accounts Parameters ---------- session : phc.Session The PHC session run_async: bool True to return promises, False to return results (default is False) timeout: int Operation timeout (default is 30) trust_env: bool Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default) """ def get_list(self) -> ApiResponse: """Fetches the list of accounts for the current session Returns ------- phc.ApiResponse The list accounts response """ return self._api_call("accounts", http_verb="GET")
Ancestors
- phc.base_client.BaseClient
Methods
def get_list(self) ‑> phc.api_response.ApiResponse
-
Expand source code
def get_list(self) -> ApiResponse: """Fetches the list of accounts for the current session Returns ------- phc.ApiResponse The list accounts response """ return self._api_call("accounts", http_verb="GET")
class Analytics (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)
-
Provides acccess to PHC Analytics
Parameters
session
:Session
- The PHC session
run_async
:bool
- True to return promises, False to return results (default is False)
timeout
:int
- Operation timeout (default is 30)
trust_env
:bool
- Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
Expand source code
class Analytics(BaseClient): """Provides acccess to PHC Analytics Parameters ---------- session : phc.Session The PHC session run_async: bool True to return promises, False to return results (default is False) timeout: int Operation timeout (default is 30) trust_env: bool Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default) """ def execute_sql( self, statement: str, project_id: Optional[str] = None, cohort_id: Optional[str] = None, ) -> ApiResponse: """Executes a SQL query against Analytics Parameters ---------- project_id : str The project ID cohort_id : str The cohort ID statement : str The SQL statement Returns ------- ApiResponse The API Response Raises ------ ValueError If no project or cohort ID is provided Examples -------- >>> from phc.services import Analytics >>> client = Analytics(session) >>> res = client.execute_sql(cohort_id='5a07dedb-fa2a-4cb0-b662-95b23a050221', statement='SELECT patients from patient') >>> print(f"Found {len(res.get('data').get('patients'))} patients") """ if not project_id and not cohort_id: raise ValueError( "Must provide a value for the project or cohort ID" ) payload = {"string_query": statement} if project_id: payload["dataset_id"] = project_id if cohort_id: payload["cohort_id"] = cohort_id return self._api_call("analytics/dsl", http_verb="POST", json=payload) def get_patients( self, project_id: str, query_builder: PatientFilterQueryBuilder ) -> ApiResponse: """Executes a query that returns patients Parameters ---------- project_id : str The project ID query_builder : util.PatientFilterQueryBuilder The query builder Returns ------- list The list of patients Examples -------- >>> from phc.services import Analytics >>> from phc.util import PatientFilterQueryBuilder >>> client = Analytics(session) >>> search = PatientFilterQueryBuilder() >>> search.patient() \ .observations() \ .code(eq='11142-7') \ .system(eq='http://loinc.org') \ .value_quantity(lt=40) >>> res = client.get_patients(project='5a07dedb-fa2a-4cb0-b662-95b23a050221', query_builder=search) >>> print(f"Found {len(res)} patients") """ payload = query_builder.to_dict() payload["dataset_id"] = project_id return ( self._api_call("analytics/dsl", http_verb="POST", json=payload) .get("data") .get("patients") )
Ancestors
- phc.base_client.BaseClient
Methods
def execute_sql(self, statement: str, project_id: Optional[str] = None, cohort_id: Optional[str] = None) ‑> phc.api_response.ApiResponse
-
Executes a SQL query against Analytics
Parameters
project_id
:str
- The project ID
cohort_id
:str
- The cohort ID
statement
:str
- The SQL statement
Returns
ApiResponse
- The API Response
Raises
ValueError
- If no project or cohort ID is provided
Examples
>>> from phc.services import Analytics >>> client = Analytics(session) >>> res = client.execute_sql(cohort_id='5a07dedb-fa2a-4cb0-b662-95b23a050221', statement='SELECT patients from patient') >>> print(f"Found {len(res.get('data').get('patients'))} patients")
Expand source code
def execute_sql( self, statement: str, project_id: Optional[str] = None, cohort_id: Optional[str] = None, ) -> ApiResponse: """Executes a SQL query against Analytics Parameters ---------- project_id : str The project ID cohort_id : str The cohort ID statement : str The SQL statement Returns ------- ApiResponse The API Response Raises ------ ValueError If no project or cohort ID is provided Examples -------- >>> from phc.services import Analytics >>> client = Analytics(session) >>> res = client.execute_sql(cohort_id='5a07dedb-fa2a-4cb0-b662-95b23a050221', statement='SELECT patients from patient') >>> print(f"Found {len(res.get('data').get('patients'))} patients") """ if not project_id and not cohort_id: raise ValueError( "Must provide a value for the project or cohort ID" ) payload = {"string_query": statement} if project_id: payload["dataset_id"] = project_id if cohort_id: payload["cohort_id"] = cohort_id return self._api_call("analytics/dsl", http_verb="POST", json=payload)
def get_patients(self, project_id: str, query_builder: phc.util.patient_filter_query_builder.PatientFilterQueryBuilder) ‑> phc.api_response.ApiResponse
-
Executes a query that returns patients
Parameters
project_id
:str
- The project ID
query_builder
:util.PatientFilterQueryBuilder
- The query builder
Returns
list
- The list of patients
Examples
>>> from phc.services import Analytics >>> from phc.util import PatientFilterQueryBuilder >>> client = Analytics(session) >>> search = PatientFilterQueryBuilder() >>> search.patient() .observations() .code(eq='11142-7') .system(eq='http://loinc.org') .value_quantity(lt=40) >>> res = client.get_patients(project='5a07dedb-fa2a-4cb0-b662-95b23a050221', query_builder=search) >>> print(f"Found {len(res)} patients")
Expand source code
def get_patients( self, project_id: str, query_builder: PatientFilterQueryBuilder ) -> ApiResponse: """Executes a query that returns patients Parameters ---------- project_id : str The project ID query_builder : util.PatientFilterQueryBuilder The query builder Returns ------- list The list of patients Examples -------- >>> from phc.services import Analytics >>> from phc.util import PatientFilterQueryBuilder >>> client = Analytics(session) >>> search = PatientFilterQueryBuilder() >>> search.patient() \ .observations() \ .code(eq='11142-7') \ .system(eq='http://loinc.org') \ .value_quantity(lt=40) >>> res = client.get_patients(project='5a07dedb-fa2a-4cb0-b662-95b23a050221', query_builder=search) >>> print(f"Found {len(res)} patients") """ payload = query_builder.to_dict() payload["dataset_id"] = project_id return ( self._api_call("analytics/dsl", http_verb="POST", json=payload) .get("data") .get("patients") )
class Cohorts (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)
-
Provides acccess to PHC cohorts
Parameters
session
:Session
- The PHC session
run_async
:bool
- True to return promises, False to return results (default is False)
timeout
:int
- Operation timeout (default is 30)
trust_env
:bool
- Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
Expand source code
class Cohorts(BaseClient): """Provides acccess to PHC cohorts Parameters ---------- session : phc.Session The PHC session run_async: bool True to return promises, False to return results (default is False) timeout: int Operation timeout (default is 30) trust_env: bool Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default) """ def create( self, project_id: str, name: str, queries: list, description: Optional[str] = None, ) -> ApiResponse: """Creates a cohort Parameters ---------- project_id: str The project that owns the cohort name : str The cohort name. queries: list The list of queries that define the cohort description : str, optional The cohort description, by default None Returns ------- phc.ApiResponse The create cohort response """ json_body = { "name": name, "ownerProject": project_id, "queries": queries, } if description: json_body["description"] = description return self._api_call("cohorts", json=json_body, http_verb="POST") def get(self, cohort_id: str) -> ApiResponse: """Fetch a cohort by id Parameters ---------- cohort_id : str The cohort ID. Returns ------- phc.ApiResponse The get cohort response """ return self._api_call(f"cohorts/{cohort_id}", http_verb="GET") def delete(self, cohort_id: str) -> bool: """Delete a cohort Parameters ---------- cohort_id : str The cohort ID. Returns ------- bool True if the delete succeeeds, otherwise False """ return ( self._api_call( f"cohorts/{cohort_id}", http_verb="DELETE" ).status_code == 204 ) def get_list( self, project_id: str, page_size: Optional[int] = None, next_page_token: Optional[str] = None, name: Optional[str] = None, ) -> ApiResponse: """Fetch a list of cohorts in a project Parameters ---------- project_id: str The project ID to search within page_size : int, optional The page size, by default None next_page_token : str, optional The next page token, by default None name : str, optional A cohort name filter, by default None Returns ------- phc.ApiResponse The list cohorts response """ query_dict = {"projectId": project_id} if page_size: query_dict["pageSize"] = page_size if next_page_token: query_dict["nextPageToken"] = next_page_token if name: query_dict["name"] = name return self._api_call( f"cohorts?{urlencode(query_dict)}", http_verb="GET" )
Ancestors
- phc.base_client.BaseClient
Methods
def create(self, project_id: str, name: str, queries: list, description: Optional[str] = None) ‑> phc.api_response.ApiResponse
-
Creates a cohort
Parameters
project_id
:str
- The project that owns the cohort
name
:str
- The cohort name.
queries
:list
- The list of queries that define the cohort
description
:str
, optional- The cohort description, by default None
Returns
ApiResponse
- The create cohort response
Expand source code
def create( self, project_id: str, name: str, queries: list, description: Optional[str] = None, ) -> ApiResponse: """Creates a cohort Parameters ---------- project_id: str The project that owns the cohort name : str The cohort name. queries: list The list of queries that define the cohort description : str, optional The cohort description, by default None Returns ------- phc.ApiResponse The create cohort response """ json_body = { "name": name, "ownerProject": project_id, "queries": queries, } if description: json_body["description"] = description return self._api_call("cohorts", json=json_body, http_verb="POST")
def delete(self, cohort_id: str) ‑> bool
-
Delete a cohort
Parameters
cohort_id
:str
- The cohort ID.
Returns
bool
- True if the delete succeeeds, otherwise False
Expand source code
def delete(self, cohort_id: str) -> bool: """Delete a cohort Parameters ---------- cohort_id : str The cohort ID. Returns ------- bool True if the delete succeeeds, otherwise False """ return ( self._api_call( f"cohorts/{cohort_id}", http_verb="DELETE" ).status_code == 204 )
def get(self, cohort_id: str) ‑> phc.api_response.ApiResponse
-
Fetch a cohort by id
Parameters
cohort_id
:str
- The cohort ID.
Returns
ApiResponse
- The get cohort response
Expand source code
def get(self, cohort_id: str) -> ApiResponse: """Fetch a cohort by id Parameters ---------- cohort_id : str The cohort ID. Returns ------- phc.ApiResponse The get cohort response """ return self._api_call(f"cohorts/{cohort_id}", http_verb="GET")
def get_list(self, project_id: str, page_size: Optional[int] = None, next_page_token: Optional[str] = None, name: Optional[str] = None) ‑> phc.api_response.ApiResponse
-
Fetch a list of cohorts in a project
Parameters
project_id
:str
- The project ID to search within
page_size
:int
, optional- The page size, by default None
next_page_token
:str
, optional- The next page token, by default None
name
:str
, optional- A cohort name filter, by default None
Returns
ApiResponse
- The list cohorts response
Expand source code
def get_list( self, project_id: str, page_size: Optional[int] = None, next_page_token: Optional[str] = None, name: Optional[str] = None, ) -> ApiResponse: """Fetch a list of cohorts in a project Parameters ---------- project_id: str The project ID to search within page_size : int, optional The page size, by default None next_page_token : str, optional The next page token, by default None name : str, optional A cohort name filter, by default None Returns ------- phc.ApiResponse The list cohorts response """ query_dict = {"projectId": project_id} if page_size: query_dict["pageSize"] = page_size if next_page_token: query_dict["nextPageToken"] = next_page_token if name: query_dict["name"] = name return self._api_call( f"cohorts?{urlencode(query_dict)}", http_verb="GET" )
class Fhir (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)
-
Provides bindings to the LifeOmic FHIR Service APIs
Expand source code
class Fhir(BaseClient): """Provides bindings to the LifeOmic FHIR Service APIs""" def dsl(self, project: str, data: dict, scroll=""): """Executes a LifeOmic FHIR Service DSL request Parameters ---------- project : str The target LifeOmic project identifier data : dict The DSL request object scroll The scroll request parameter Returns ------- phc.ApiResponse The API response """ path = f"fhir-search/projects/{project}" scroll = scroll if scroll is not True else "true" params = {"scroll": scroll if scroll is not True else "true"} return self._api_call( http_verb="POST", api_path=path, params=params, json=data ) def sql(self, project: str, statement: str, scroll="") -> ApiResponse: """Executes a LifeOmic FHIR Service SQL request Parameters ---------- project : str The target LifeOmic project identifier statement : str The SQL request statement scroll The scroll request parameter Returns ------- phc.ApiResponse The API response """ path = f"fhir-search/projects/{project}" headers = {"Content-Type": "text/plain"} params = {"scroll": scroll if scroll is not True else "true"} return self._api_call( http_verb="POST", api_path=path, headers=headers, params=params, data=statement, ) def execute_sql( self, project_id: str, statement: str, scroll="" ) -> ApiResponse: """Executes an SQL query against fhir-search-service Parameters ---------- project_id : str The project ID. statement : str The SQL statement. Returns ------- phc.ApiResponse The query response. Examples -------- >>> import pandas as pd >>> from phc.services import Fhir >>> fhir = Fhir(session) >>> res = fhir.execute_sql(project_id='19e34782-91c4-4143-aaee-2ba81ed0b206', statement='SELECT * from patient LIMIT 0,5000') >>> resources = list(map(lambda r: r.get("_source"), res.get("hits").get("hits"))) >>> df = pd.DataFrame(resources) """ """Executes an SQL query against fhir-search-service Returns: [List] -- Dictionary with query response """ warnings.warn("Use the sql method instead", DeprecationWarning) return self._api_call( api_path=f"fhir-search/projects/{project_id}", http_verb="POST", data=statement, headers={"Content-Type": "text/plain"}, params={"scroll": scroll}, ) def execute_es( self, project_id: str, query: dict, scroll="" ) -> ApiResponse: """Executes an elasticsearch query against fhir-search-service Parameters ---------- project_id : str The project ID query : dict The ES query dictionary Returns ------- phc.ApiResponse The query response """ warnings.warn("Use the dsl method instead", DeprecationWarning) return self._api_call( api_path=f"fhir-search/projects/{project_id}", http_verb="POST", json=query, params={"scroll": scroll}, ) def es_sql( self, project_id: str, statement: str, params: List[Dict] = [], subject_id="", ) -> ApiResponse: """Executes an OpenSearch SQL against fhir-search-service Parameters ---------- project_id : str The project ID statement : str The prepared OpenSearch SQL statement params: List[Dict] The parameters for the SQL statement subject_id : str, optional The subject ID Returns ------- phc.ApiResponse The query response """ api_path = f"fhir-search/sql/projects/{project_id}" if subject_id is not None and subject_id != "": api_path = f"{api_path}/patients/{subject_id}" return self._api_call( api_path=api_path, http_verb="POST", json={"query": statement, "parameters": params}, )
Ancestors
- phc.base_client.BaseClient
Methods
def dsl(self, project: str, data: dict, scroll='')
-
Executes a LifeOmic FHIR Service DSL request
Parameters
project
:str
- The target LifeOmic project identifier
data
:dict
- The DSL request object
scroll
- The scroll request parameter
Returns
ApiResponse
- The API response
Expand source code
def dsl(self, project: str, data: dict, scroll=""): """Executes a LifeOmic FHIR Service DSL request Parameters ---------- project : str The target LifeOmic project identifier data : dict The DSL request object scroll The scroll request parameter Returns ------- phc.ApiResponse The API response """ path = f"fhir-search/projects/{project}" scroll = scroll if scroll is not True else "true" params = {"scroll": scroll if scroll is not True else "true"} return self._api_call( http_verb="POST", api_path=path, params=params, json=data )
def es_sql(self, project_id: str, statement: str, params: List[Dict] = [], subject_id='') ‑> phc.api_response.ApiResponse
-
Executes an OpenSearch SQL against fhir-search-service
Parameters
project_id
:str
- The project ID
statement
:str
- The prepared OpenSearch SQL statement
params
:List[Dict]
- The parameters for the SQL statement
subject_id
:str
, optional- The subject ID
Returns
ApiResponse
- The query response
Expand source code
def es_sql( self, project_id: str, statement: str, params: List[Dict] = [], subject_id="", ) -> ApiResponse: """Executes an OpenSearch SQL against fhir-search-service Parameters ---------- project_id : str The project ID statement : str The prepared OpenSearch SQL statement params: List[Dict] The parameters for the SQL statement subject_id : str, optional The subject ID Returns ------- phc.ApiResponse The query response """ api_path = f"fhir-search/sql/projects/{project_id}" if subject_id is not None and subject_id != "": api_path = f"{api_path}/patients/{subject_id}" return self._api_call( api_path=api_path, http_verb="POST", json={"query": statement, "parameters": params}, )
def execute_es(self, project_id: str, query: dict, scroll='') ‑> phc.api_response.ApiResponse
-
Executes an elasticsearch query against fhir-search-service
Parameters
project_id
:str
- The project ID
query
:dict
- The ES query dictionary
Returns
ApiResponse
- The query response
Expand source code
def execute_es( self, project_id: str, query: dict, scroll="" ) -> ApiResponse: """Executes an elasticsearch query against fhir-search-service Parameters ---------- project_id : str The project ID query : dict The ES query dictionary Returns ------- phc.ApiResponse The query response """ warnings.warn("Use the dsl method instead", DeprecationWarning) return self._api_call( api_path=f"fhir-search/projects/{project_id}", http_verb="POST", json=query, params={"scroll": scroll}, )
def execute_sql(self, project_id: str, statement: str, scroll='') ‑> phc.api_response.ApiResponse
-
Executes an SQL query against fhir-search-service
Parameters
project_id
:str
- The project ID.
statement
:str
- The SQL statement.
Returns
ApiResponse
- The query response.
Examples
>>> import pandas as pd >>> from phc.services import Fhir >>> fhir = Fhir(session) >>> res = fhir.execute_sql(project_id='19e34782-91c4-4143-aaee-2ba81ed0b206', statement='SELECT * from patient LIMIT 0,5000')
>>> resources = list(map(lambda r: r.get("_source"), res.get("hits").get("hits"))) >>> df = pd.DataFrame(resources)
Expand source code
def execute_sql( self, project_id: str, statement: str, scroll="" ) -> ApiResponse: """Executes an SQL query against fhir-search-service Parameters ---------- project_id : str The project ID. statement : str The SQL statement. Returns ------- phc.ApiResponse The query response. Examples -------- >>> import pandas as pd >>> from phc.services import Fhir >>> fhir = Fhir(session) >>> res = fhir.execute_sql(project_id='19e34782-91c4-4143-aaee-2ba81ed0b206', statement='SELECT * from patient LIMIT 0,5000') >>> resources = list(map(lambda r: r.get("_source"), res.get("hits").get("hits"))) >>> df = pd.DataFrame(resources) """ """Executes an SQL query against fhir-search-service Returns: [List] -- Dictionary with query response """ warnings.warn("Use the sql method instead", DeprecationWarning) return self._api_call( api_path=f"fhir-search/projects/{project_id}", http_verb="POST", data=statement, headers={"Content-Type": "text/plain"}, params={"scroll": scroll}, )
def sql(self, project: str, statement: str, scroll='') ‑> phc.api_response.ApiResponse
-
Executes a LifeOmic FHIR Service SQL request
Parameters
project
:str
- The target LifeOmic project identifier
statement
:str
- The SQL request statement
scroll
- The scroll request parameter
Returns
ApiResponse
- The API response
Expand source code
def sql(self, project: str, statement: str, scroll="") -> ApiResponse: """Executes a LifeOmic FHIR Service SQL request Parameters ---------- project : str The target LifeOmic project identifier statement : str The SQL request statement scroll The scroll request parameter Returns ------- phc.ApiResponse The API response """ path = f"fhir-search/projects/{project}" headers = {"Content-Type": "text/plain"} params = {"scroll": scroll if scroll is not True else "true"} return self._api_call( http_verb="POST", api_path=path, headers=headers, params=params, data=statement, )
class Files (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)
-
Provides acccess to PHC files
Parameters
session
:Session
- The PHC session
run_async
:bool
- True to return promises, False to return results (default is False)
timeout
:int
- Operation timeout (default is 30)
trust_env
:bool
- Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
Expand source code
class Files(BaseClient): """Provides acccess to PHC files Parameters ---------- session : phc.Session The PHC session run_async: bool True to return promises, False to return results (default is False) timeout: int Operation timeout (default is 30) trust_env: bool Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default) """ _MULTIPART_MIN_SIZE = 5 * 1024 * 1024 _MAX_PARTS = 10000 def upload( self, project_id: str, source: str, file_name: Optional[str] = None, overwrite: Optional[bool] = False, ) -> ApiResponse: """Upload a file. Parameters ---------- project_id : str The project ID source : str The path of the file to upload file_name : str, optional The name of the file, If None will default to the actual base file name. overwrite : bool, optional True to overwrite an existing file of the same name, by default False Returns ------- ApiResponse The upload file response Examples -------- >>> from phc.services import Files >>> files = files(session) >>> files.upload(project_id="db3e09e9-1ecd-4976-aa5e-70ac7ada0cc3", source="./myfile.txt", overwrite=True) """ file_size = os.path.getsize(source) if file_size > self._MULTIPART_MIN_SIZE: res = self._api_call( "uploads", json={ "name": ( file_name if file_name is not None else os.path.basename(source) ), "datasetId": project_id, "overwrite": overwrite, }, ) upload_id = res.get("uploadId") part_size = max( math.ceil(file_size / self._MAX_PARTS), self._MULTIPART_MIN_SIZE ) total_parts = math.ceil(file_size / part_size) part = 1 while part <= total_parts: start = (part - 1) * part_size end = file_size if part == total_parts else start + part_size f = open(source, "rb") f.seek(start) data = f.read(end - start) f.close() part_res = self._api_call( f"uploads/{upload_id}/parts/{part}", http_verb="GET" ) self._api_call_impl( http_verb="PUT", url=part_res.get("uploadUrl"), api_path=None, upload_file=data, headers={ "Content-Length": str(end - start), "Authorization": None, "LifeOmic-Account": None, "Content-Type": None, }, ) print(f"Upload {part}") part += 1 self._api_call(f"uploads/{upload_id}", http_verb="DELETE") return res else: res = self._api_call( "files", json={ "name": ( file_name if file_name is not None else os.path.basename(source) ), "datasetId": project_id, "overwrite": overwrite, }, ) self._api_call_impl( http_verb="PUT", url=res.get("uploadUrl"), api_path=None, upload_file=source, headers={ "Content-Length": str(file_size), "Authorization": None, "LifeOmic-Account": None, "Content-Type": None, }, ) return res @backoff.on_exception( backoff.expo, OSError, max_tries=6, jitter=backoff.full_jitter ) def download(self, file_id: str, dest_dir: str = os.getcwd()) -> None: """Download a file Parameters ---------- file_id : str The file ID dest_dir : str, optional The local directory to save the file. Defaults to the current working directory Examples -------- >>> from phc.services import Files >>> files = files(session) >>> files.download(file_id="db3e09e9-1ecd-4976-aa5e-70ac7ada0cc3", dest_dir="./mydata") """ try: res = self._api_call( f"files/{file_id}?include=downloadUrl", http_verb="GET" ) except ApiError as e: if e.response.status_code == 422: raise FileArchiveError( "This file is currently archived and is not available for download. Contact LifeOmic support to learn more." ) from None file_path = os.path.join(dest_dir, res.get("name")) target_dir = os.path.dirname(file_path) if not os.path.exists(target_dir): os.makedirs(target_dir) urlretrieve(res.get("downloadUrl"), file_path) return file_path def get(self, file_id: str) -> ApiResponse: """Fetch a file by id Parameters ---------- file_id : str The file ID. Returns ------- phc.ApiResponse The get file response """ return self._api_call(f"files/{file_id}", http_verb="GET") def update( self, file_id: str, project_id: Optional[str] = None, name: Optional[str] = None, ) -> ApiResponse: """Update a files by moving it to a new project or by renaming it. Parameters ---------- file_id : str The file ID to update. project_id : str, optional The new project ID for the file. name : str, optional The new file name Returns ------- phc.ApiResponse The update file response """ if not project_id and not name: raise ValueError( "Must provide a value for either 'project_id' or 'name'" ) json_body = {} if name: json_body["name"] = name if project_id: json_body["datasetId"] = project_id try: return self._api_call( f"files/{file_id}", json=json_body, http_verb="PATCH" ) except ApiError as e: if e.response.status_code == 422: raise FileArchiveError( "This file is currently archived and cannot be moved. Contact LifeOmic support to learn more." ) from None def delete(self, file_id: str) -> bool: """Delete a file Parameters ---------- file_id : str The file ID. Returns ------- bool True if the delete succeeeds, otherwise False """ return ( self._api_call(f"files/{file_id}", http_verb="DELETE").status_code == 204 ) def get_list( self, project_id: str, folder: Optional[str] = None, page_size: Optional[int] = None, next_page_token: Optional[str] = None, ) -> ApiResponse: """Fetch a list of files in a project Parameters ---------- project_id: str The project ID folder: str, optional The folder prefix to look for files, by default None page_size : int, optional The page size, by default None next_page_token : str, optional The next page token, by default None Returns ------- phc.ApiResponse The list files response """ query_dict = {} if page_size: query_dict["pageSize"] = page_size if next_page_token: query_dict["nextPageToken"] = next_page_token if folder: query_dict["prefix"] = folder return self._api_call( f"projects/{project_id}/files?{urlencode(query_dict)}", http_verb="GET", ) def exists(self, file_id: str) -> ApiResponse: """Check if a file exists by id Parameters ---------- file_id : str The file ID. Returns ------- bool True if the file exists, false otherwise """ try: self._api_call(f"files/{file_id}", http_verb="GET") return True except ApiError as e: if e.response.status_code == 404: return False raise e
Ancestors
- phc.base_client.BaseClient
Methods
def delete(self, file_id: str) ‑> bool
-
Delete a file
Parameters
file_id
:str
- The file ID.
Returns
bool
- True if the delete succeeeds, otherwise False
Expand source code
def delete(self, file_id: str) -> bool: """Delete a file Parameters ---------- file_id : str The file ID. Returns ------- bool True if the delete succeeeds, otherwise False """ return ( self._api_call(f"files/{file_id}", http_verb="DELETE").status_code == 204 )
def download(self, file_id: str, dest_dir: str = '/home/runner/work/phc-sdk-py/phc-sdk-py') ‑> None
-
Download a file
Parameters
file_id
:str
- The file ID
dest_dir
:str
, optional- The local directory to save the file. Defaults to the current working directory
Examples
>>> from phc.services import Files >>> files = files(session) >>> files.download(file_id="db3e09e9-1ecd-4976-aa5e-70ac7ada0cc3", dest_dir="./mydata")
Expand source code
@backoff.on_exception( backoff.expo, OSError, max_tries=6, jitter=backoff.full_jitter ) def download(self, file_id: str, dest_dir: str = os.getcwd()) -> None: """Download a file Parameters ---------- file_id : str The file ID dest_dir : str, optional The local directory to save the file. Defaults to the current working directory Examples -------- >>> from phc.services import Files >>> files = files(session) >>> files.download(file_id="db3e09e9-1ecd-4976-aa5e-70ac7ada0cc3", dest_dir="./mydata") """ try: res = self._api_call( f"files/{file_id}?include=downloadUrl", http_verb="GET" ) except ApiError as e: if e.response.status_code == 422: raise FileArchiveError( "This file is currently archived and is not available for download. Contact LifeOmic support to learn more." ) from None file_path = os.path.join(dest_dir, res.get("name")) target_dir = os.path.dirname(file_path) if not os.path.exists(target_dir): os.makedirs(target_dir) urlretrieve(res.get("downloadUrl"), file_path) return file_path
def exists(self, file_id: str) ‑> phc.api_response.ApiResponse
-
Check if a file exists by id
Parameters
file_id
:str
- The file ID.
Returns
bool
- True if the file exists, false otherwise
Expand source code
def exists(self, file_id: str) -> ApiResponse: """Check if a file exists by id Parameters ---------- file_id : str The file ID. Returns ------- bool True if the file exists, false otherwise """ try: self._api_call(f"files/{file_id}", http_verb="GET") return True except ApiError as e: if e.response.status_code == 404: return False raise e
def get(self, file_id: str) ‑> phc.api_response.ApiResponse
-
Expand source code
def get(self, file_id: str) -> ApiResponse: """Fetch a file by id Parameters ---------- file_id : str The file ID. Returns ------- phc.ApiResponse The get file response """ return self._api_call(f"files/{file_id}", http_verb="GET")
def get_list(self, project_id: str, folder: Optional[str] = None, page_size: Optional[int] = None, next_page_token: Optional[str] = None) ‑> phc.api_response.ApiResponse
-
Fetch a list of files in a project
Parameters
project_id
:str
- The project ID
folder
:str
, optional- The folder prefix to look for files, by default None
page_size
:int
, optional- The page size, by default None
next_page_token
:str
, optional- The next page token, by default None
Returns
ApiResponse
- The list files response
Expand source code
def get_list( self, project_id: str, folder: Optional[str] = None, page_size: Optional[int] = None, next_page_token: Optional[str] = None, ) -> ApiResponse: """Fetch a list of files in a project Parameters ---------- project_id: str The project ID folder: str, optional The folder prefix to look for files, by default None page_size : int, optional The page size, by default None next_page_token : str, optional The next page token, by default None Returns ------- phc.ApiResponse The list files response """ query_dict = {} if page_size: query_dict["pageSize"] = page_size if next_page_token: query_dict["nextPageToken"] = next_page_token if folder: query_dict["prefix"] = folder return self._api_call( f"projects/{project_id}/files?{urlencode(query_dict)}", http_verb="GET", )
def update(self, file_id: str, project_id: Optional[str] = None, name: Optional[str] = None) ‑> phc.api_response.ApiResponse
-
Update a files by moving it to a new project or by renaming it.
Parameters
file_id
:str
- The file ID to update.
project_id
:str
, optional- The new project ID for the file.
name
:str
, optional- The new file name
Returns
ApiResponse
- The update file response
Expand source code
def update( self, file_id: str, project_id: Optional[str] = None, name: Optional[str] = None, ) -> ApiResponse: """Update a files by moving it to a new project or by renaming it. Parameters ---------- file_id : str The file ID to update. project_id : str, optional The new project ID for the file. name : str, optional The new file name Returns ------- phc.ApiResponse The update file response """ if not project_id and not name: raise ValueError( "Must provide a value for either 'project_id' or 'name'" ) json_body = {} if name: json_body["name"] = name if project_id: json_body["datasetId"] = project_id try: return self._api_call( f"files/{file_id}", json=json_body, http_verb="PATCH" ) except ApiError as e: if e.response.status_code == 422: raise FileArchiveError( "This file is currently archived and cannot be moved. Contact LifeOmic support to learn more." ) from None
def upload(self, project_id: str, source: str, file_name: Optional[str] = None, overwrite: Optional[bool] = False) ‑> phc.api_response.ApiResponse
-
Upload a file.
Parameters
project_id
:str
- The project ID
source
:str
- The path of the file to upload
file_name
:str
, optional- The name of the file, If None will default to the actual base file name.
overwrite
:bool
, optional- True to overwrite an existing file of the same name, by default False
Returns
ApiResponse
- The upload file response
Examples
>>> from phc.services import Files >>> files = files(session) >>> files.upload(project_id="db3e09e9-1ecd-4976-aa5e-70ac7ada0cc3", source="./myfile.txt", overwrite=True)
Expand source code
def upload( self, project_id: str, source: str, file_name: Optional[str] = None, overwrite: Optional[bool] = False, ) -> ApiResponse: """Upload a file. Parameters ---------- project_id : str The project ID source : str The path of the file to upload file_name : str, optional The name of the file, If None will default to the actual base file name. overwrite : bool, optional True to overwrite an existing file of the same name, by default False Returns ------- ApiResponse The upload file response Examples -------- >>> from phc.services import Files >>> files = files(session) >>> files.upload(project_id="db3e09e9-1ecd-4976-aa5e-70ac7ada0cc3", source="./myfile.txt", overwrite=True) """ file_size = os.path.getsize(source) if file_size > self._MULTIPART_MIN_SIZE: res = self._api_call( "uploads", json={ "name": ( file_name if file_name is not None else os.path.basename(source) ), "datasetId": project_id, "overwrite": overwrite, }, ) upload_id = res.get("uploadId") part_size = max( math.ceil(file_size / self._MAX_PARTS), self._MULTIPART_MIN_SIZE ) total_parts = math.ceil(file_size / part_size) part = 1 while part <= total_parts: start = (part - 1) * part_size end = file_size if part == total_parts else start + part_size f = open(source, "rb") f.seek(start) data = f.read(end - start) f.close() part_res = self._api_call( f"uploads/{upload_id}/parts/{part}", http_verb="GET" ) self._api_call_impl( http_verb="PUT", url=part_res.get("uploadUrl"), api_path=None, upload_file=data, headers={ "Content-Length": str(end - start), "Authorization": None, "LifeOmic-Account": None, "Content-Type": None, }, ) print(f"Upload {part}") part += 1 self._api_call(f"uploads/{upload_id}", http_verb="DELETE") return res else: res = self._api_call( "files", json={ "name": ( file_name if file_name is not None else os.path.basename(source) ), "datasetId": project_id, "overwrite": overwrite, }, ) self._api_call_impl( http_verb="PUT", url=res.get("uploadUrl"), api_path=None, upload_file=source, headers={ "Content-Length": str(file_size), "Authorization": None, "LifeOmic-Account": None, "Content-Type": None, }, ) return res
class GenomicIngestions (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)
-
Provides access to PHC Genomic Ingestions
Parameters
session
:Session
- The PHC session.
run_async
:bool
- True to return promises, False to return results (default is False).
timeout
:int
- Operation timeout (default is 30).
trust_env
:bool
- Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default).
Expand source code
class GenomicIngestions(BaseClient): """Provides access to PHC Genomic Ingestions Parameters ---------- session: phc.Session The PHC session. run_async: bool True to return promises, False to return results (default is False). timeout: int Operation timeout (default is 30). trust_env: bool Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default). """ def get(self, ingestion_id: str, project_id: str) -> ApiResponse: """Fetch an ingestion by id Parameters ---------- ingestion_id: str The ingestion ID. project_id: str The project ID for the ingestion. Returns ------- phc.ApiResponse The get ingestion response. """ return self._api_call( f"genomic-ingestion/projects/{project_id}/ingestions/{ingestion_id}", http_verb="GET", ) def list( self, project_id: str, name: Optional[str] = None, failed: Optional[bool] = None, step: Optional[IngestionStep] = None, page_size: Optional[int] = None, next_page_token: Optional[str] = None, ) -> ApiResponse: """Fetch a list of ingestions in a project Parameters ---------- project_id: str The project ID for the ingestions. name: str, optional The name to filter ingestions by, by default None. failed: bool, optional The status of the ingestions to filter by, by default None. step: IngestionStep, optional The ingestion steps to filter by, by default None. page_size: int, optional The page size, by default None. next_page_token: str, optional The next page token, by default None. Returns ------- phc.ApiResponse The list ingestions response. """ query_dict: Dict[str, Union[str, int, bool]] = {} if page_size: query_dict["pageSize"] = page_size if next_page_token: query_dict["nextPageToken"] = next_page_token if name: query_dict["name"] = name if failed is not None: query_dict["failed"] = "true" if failed else "false" if step: query_dict["steps"] = step.value return self._api_call( f"genomic-ingestion/projects/{project_id}/ingestions", http_verb="GET", params=query_dict, ) def create_foundation( self, project_id: str, xml_file_id: str, report_file_id: str, vcf_file_id: Optional[str] = None, succeeded_email: Optional[str] = None, failed_email: Optional[str] = None, ) -> ApiResponse: """Create a Foundation ingestion in a project Parameters ---------- project_id: str The project ID to create the ingestion in. xml_file_id: str The ID of the XML file to ingest. report_file_id: str The ID of the Report file to ingest. vcf_file_id: str, optional The ID of the VCF file to ingest, by default None. succeeded_email: str, optional The email address to notify if the ingestion succeeds, by default None. failed_email: str, optional The email address to notify if the ingestion fails, by default None. Returns phc.ApiResponse The ingestion that was created. """ return self._api_call( f"genomic-ingestion/projects/{project_id}/ingestions", json={ "ingestionType": "Foundation", "inputFiles": { "xml": xml_file_id, "vcf": vcf_file_id, "report": report_file_id, }, "notificationConfig": { "succeededEmail": succeeded_email, "failedEmail": failed_email, }, }, ) def create_caris( self, project_id: str, tar_file_id: str, succeeded_email: Optional[str] = None, failed_email: Optional[str] = None, ) -> ApiResponse: """Create a Caris ingestion in a project Parameters ---------- project_id: str The project ID to create the ingestion in. tar_file_id: str The ID of the TAR file to ingest. succeeded_email: str, optional The email address to notify if the ingestion succeeds, by default None. failed_email: str, optional The email address to notify if the ingestion fails, by default None. Returns phc.ApiResponse The ingestion that was created. """ return self._api_call( f"genomic-ingestion/projects/{project_id}/ingestions", json={ "ingestionType": "Caris", "inputFiles": {"tar": tar_file_id}, "notificationConfig": { "succeededEmail": succeeded_email, "failedEmail": failed_email, }, }, ) def create_foundation_bam( self, project_id: str, bam_file_id: str, succeeded_email: Optional[str] = None, failed_email: Optional[str] = None, ) -> ApiResponse: """Create a Foundation BAM ingestion in a project Parameters ---------- project_id: str The project ID to create the ingestion in. bam_file_id: str The ID of the BAM file to ingest. succeeded_email: str, optional The email address to notify if the ingestion succeeds, by default None. failed_email: str, optional The email address to notify if the ingestion fails, by default None. Returns phc.ApiResponse The ingestion that was created. """ return self._api_call( f"genomic-ingestion/projects/{project_id}/ingestions", json={ "ingestionType": "FoundationBam", "inputFiles": {"bam": bam_file_id}, "notificationConfig": { "succeededEmail": succeeded_email, "failedEmail": failed_email, }, }, ) def create_caris_bam( self, project_id: str, bam_file_id: str, succeeded_email: Optional[str] = None, failed_email: Optional[str] = None, ) -> ApiResponse: """Create a Caris BAM ingestion in a project Parameters ---------- project_id: str The project ID to create the ingestion in. bam_file_id: str The ID of the BAM file to ingest. succeeded_email: str, optional The email address to notify if the ingestion succeeds, by default None. failed_email: str, optional The email address to notify if the ingestion fails, by default None. Returns phc.ApiResponse The ingestion that was created. """ return self._api_call( f"genomic-ingestion/projects/{project_id}/ingestions", json={ "ingestionType": "CarisBam", "inputFiles": {"bam": bam_file_id}, "notificationConfig": { "succeededEmail": succeeded_email, "failedEmail": failed_email, }, }, ) def create_nextgen( self, project_id: str, tar_file_id: str, succeeded_email: Optional[str] = None, failed_email: Optional[str] = None, ) -> ApiResponse: """Create a NextGen ingestion in a project Parameters ---------- project_id: str The project ID to create the ingestion in. tar_file_id: str The ID of the TAR file to ingest. succeeded_email: str, optional The email address to notify if the ingestion succeeds, by default None. failed_email: str, optional The email address to notify if the ingestion fails, by default None. Returns phc.ApiResponse The ingestion that was created. """ return self._api_call( f"genomic-ingestion/projects/{project_id}/ingestions", json={ "ingestionType": "NextGen", "inputFiles": {"tar": tar_file_id}, "notificationConfig": { "succeededEmail": succeeded_email, "failedEmail": failed_email, }, }, ) def create_vcf( self, project_id: str, vcf_file_id: str, manifest_file_id: str, succeeded_email: Optional[str] = None, failed_email: Optional[str] = None, ) -> ApiResponse: """Creates a VCF ingestion in a project Parameters ---------- project_id: str The project ID to create the ingestion in. vcf_file_id: str The ID of the VCF file to ingest. manifest_file_id: str The ID of the manifest file to ingest. succeeded_email: str, optional The email address to notify if the ingestion succeeds, by default None. failed_email: str, optional The email address to notify if the ingestion fails, by default None. Returns ------- phc.ApiResponse The ingestion that was created. """ return self._api_call( f"genomic-ingestion/projects/{project_id}/ingestions", json={ "ingestionType": "Vcf", "inputFiles": { "vcf": vcf_file_id, "manifest": manifest_file_id, }, "notificationConfig": { "succeededEmail": succeeded_email, "failedEmail": failed_email, }, }, )
Ancestors
- phc.base_client.BaseClient
Methods
def create_caris(self, project_id: str, tar_file_id: str, succeeded_email: Optional[str] = None, failed_email: Optional[str] = None) ‑> phc.api_response.ApiResponse
-
Create a Caris ingestion in a project
Parameters
project_id
:str
- The project ID to create the ingestion in.
tar_file_id
:str
- The ID of the TAR file to ingest.
succeeded_email
:str
, optional- The email address to notify if the ingestion succeeds, by default None.
failed_email
:str
, optional- The email address to notify if the ingestion fails, by default None.
Returns
phc.ApiResponse The ingestion that was created.
Expand source code
def create_caris( self, project_id: str, tar_file_id: str, succeeded_email: Optional[str] = None, failed_email: Optional[str] = None, ) -> ApiResponse: """Create a Caris ingestion in a project Parameters ---------- project_id: str The project ID to create the ingestion in. tar_file_id: str The ID of the TAR file to ingest. succeeded_email: str, optional The email address to notify if the ingestion succeeds, by default None. failed_email: str, optional The email address to notify if the ingestion fails, by default None. Returns phc.ApiResponse The ingestion that was created. """ return self._api_call( f"genomic-ingestion/projects/{project_id}/ingestions", json={ "ingestionType": "Caris", "inputFiles": {"tar": tar_file_id}, "notificationConfig": { "succeededEmail": succeeded_email, "failedEmail": failed_email, }, }, )
def create_caris_bam(self, project_id: str, bam_file_id: str, succeeded_email: Optional[str] = None, failed_email: Optional[str] = None) ‑> phc.api_response.ApiResponse
-
Create a Caris BAM ingestion in a project
Parameters
project_id
:str
- The project ID to create the ingestion in.
bam_file_id
:str
- The ID of the BAM file to ingest.
succeeded_email
:str
, optional- The email address to notify if the ingestion succeeds, by default None.
failed_email
:str
, optional- The email address to notify if the ingestion fails, by default None.
Returns
phc.ApiResponse The ingestion that was created.
Expand source code
def create_caris_bam( self, project_id: str, bam_file_id: str, succeeded_email: Optional[str] = None, failed_email: Optional[str] = None, ) -> ApiResponse: """Create a Caris BAM ingestion in a project Parameters ---------- project_id: str The project ID to create the ingestion in. bam_file_id: str The ID of the BAM file to ingest. succeeded_email: str, optional The email address to notify if the ingestion succeeds, by default None. failed_email: str, optional The email address to notify if the ingestion fails, by default None. Returns phc.ApiResponse The ingestion that was created. """ return self._api_call( f"genomic-ingestion/projects/{project_id}/ingestions", json={ "ingestionType": "CarisBam", "inputFiles": {"bam": bam_file_id}, "notificationConfig": { "succeededEmail": succeeded_email, "failedEmail": failed_email, }, }, )
def create_foundation(self, project_id: str, xml_file_id: str, report_file_id: str, vcf_file_id: Optional[str] = None, succeeded_email: Optional[str] = None, failed_email: Optional[str] = None) ‑> phc.api_response.ApiResponse
-
Create a Foundation ingestion in a project
Parameters
project_id
:str
- The project ID to create the ingestion in.
xml_file_id
:str
- The ID of the XML file to ingest.
report_file_id
:str
- The ID of the Report file to ingest.
vcf_file_id
:str
, optional- The ID of the VCF file to ingest, by default None.
succeeded_email
:str
, optional- The email address to notify if the ingestion succeeds, by default None.
failed_email
:str
, optional- The email address to notify if the ingestion fails, by default None.
Returns
phc.ApiResponse The ingestion that was created.
Expand source code
def create_foundation( self, project_id: str, xml_file_id: str, report_file_id: str, vcf_file_id: Optional[str] = None, succeeded_email: Optional[str] = None, failed_email: Optional[str] = None, ) -> ApiResponse: """Create a Foundation ingestion in a project Parameters ---------- project_id: str The project ID to create the ingestion in. xml_file_id: str The ID of the XML file to ingest. report_file_id: str The ID of the Report file to ingest. vcf_file_id: str, optional The ID of the VCF file to ingest, by default None. succeeded_email: str, optional The email address to notify if the ingestion succeeds, by default None. failed_email: str, optional The email address to notify if the ingestion fails, by default None. Returns phc.ApiResponse The ingestion that was created. """ return self._api_call( f"genomic-ingestion/projects/{project_id}/ingestions", json={ "ingestionType": "Foundation", "inputFiles": { "xml": xml_file_id, "vcf": vcf_file_id, "report": report_file_id, }, "notificationConfig": { "succeededEmail": succeeded_email, "failedEmail": failed_email, }, }, )
def create_foundation_bam(self, project_id: str, bam_file_id: str, succeeded_email: Optional[str] = None, failed_email: Optional[str] = None) ‑> phc.api_response.ApiResponse
-
Create a Foundation BAM ingestion in a project
Parameters
project_id
:str
- The project ID to create the ingestion in.
bam_file_id
:str
- The ID of the BAM file to ingest.
succeeded_email
:str
, optional- The email address to notify if the ingestion succeeds, by default None.
failed_email
:str
, optional- The email address to notify if the ingestion fails, by default None.
Returns
phc.ApiResponse The ingestion that was created.
Expand source code
def create_foundation_bam( self, project_id: str, bam_file_id: str, succeeded_email: Optional[str] = None, failed_email: Optional[str] = None, ) -> ApiResponse: """Create a Foundation BAM ingestion in a project Parameters ---------- project_id: str The project ID to create the ingestion in. bam_file_id: str The ID of the BAM file to ingest. succeeded_email: str, optional The email address to notify if the ingestion succeeds, by default None. failed_email: str, optional The email address to notify if the ingestion fails, by default None. Returns phc.ApiResponse The ingestion that was created. """ return self._api_call( f"genomic-ingestion/projects/{project_id}/ingestions", json={ "ingestionType": "FoundationBam", "inputFiles": {"bam": bam_file_id}, "notificationConfig": { "succeededEmail": succeeded_email, "failedEmail": failed_email, }, }, )
def create_nextgen(self, project_id: str, tar_file_id: str, succeeded_email: Optional[str] = None, failed_email: Optional[str] = None) ‑> phc.api_response.ApiResponse
-
Create a NextGen ingestion in a project
Parameters
project_id
:str
- The project ID to create the ingestion in.
tar_file_id
:str
- The ID of the TAR file to ingest.
succeeded_email
:str
, optional- The email address to notify if the ingestion succeeds, by default None.
failed_email
:str
, optional- The email address to notify if the ingestion fails, by default None.
Returns
phc.ApiResponse The ingestion that was created.
Expand source code
def create_nextgen( self, project_id: str, tar_file_id: str, succeeded_email: Optional[str] = None, failed_email: Optional[str] = None, ) -> ApiResponse: """Create a NextGen ingestion in a project Parameters ---------- project_id: str The project ID to create the ingestion in. tar_file_id: str The ID of the TAR file to ingest. succeeded_email: str, optional The email address to notify if the ingestion succeeds, by default None. failed_email: str, optional The email address to notify if the ingestion fails, by default None. Returns phc.ApiResponse The ingestion that was created. """ return self._api_call( f"genomic-ingestion/projects/{project_id}/ingestions", json={ "ingestionType": "NextGen", "inputFiles": {"tar": tar_file_id}, "notificationConfig": { "succeededEmail": succeeded_email, "failedEmail": failed_email, }, }, )
def create_vcf(self, project_id: str, vcf_file_id: str, manifest_file_id: str, succeeded_email: Optional[str] = None, failed_email: Optional[str] = None) ‑> phc.api_response.ApiResponse
-
Creates a VCF ingestion in a project
Parameters
project_id
:str
- The project ID to create the ingestion in.
vcf_file_id
:str
- The ID of the VCF file to ingest.
manifest_file_id
:str
- The ID of the manifest file to ingest.
succeeded_email
:str
, optional- The email address to notify if the ingestion succeeds, by default None.
failed_email
:str
, optional- The email address to notify if the ingestion fails, by default None.
Returns
ApiResponse
- The ingestion that was created.
Expand source code
def create_vcf( self, project_id: str, vcf_file_id: str, manifest_file_id: str, succeeded_email: Optional[str] = None, failed_email: Optional[str] = None, ) -> ApiResponse: """Creates a VCF ingestion in a project Parameters ---------- project_id: str The project ID to create the ingestion in. vcf_file_id: str The ID of the VCF file to ingest. manifest_file_id: str The ID of the manifest file to ingest. succeeded_email: str, optional The email address to notify if the ingestion succeeds, by default None. failed_email: str, optional The email address to notify if the ingestion fails, by default None. Returns ------- phc.ApiResponse The ingestion that was created. """ return self._api_call( f"genomic-ingestion/projects/{project_id}/ingestions", json={ "ingestionType": "Vcf", "inputFiles": { "vcf": vcf_file_id, "manifest": manifest_file_id, }, "notificationConfig": { "succeededEmail": succeeded_email, "failedEmail": failed_email, }, }, )
def get(self, ingestion_id: str, project_id: str) ‑> phc.api_response.ApiResponse
-
Fetch an ingestion by id
Parameters
ingestion_id
:str
- The ingestion ID.
project_id
:str
- The project ID for the ingestion.
Returns
ApiResponse
- The get ingestion response.
Expand source code
def get(self, ingestion_id: str, project_id: str) -> ApiResponse: """Fetch an ingestion by id Parameters ---------- ingestion_id: str The ingestion ID. project_id: str The project ID for the ingestion. Returns ------- phc.ApiResponse The get ingestion response. """ return self._api_call( f"genomic-ingestion/projects/{project_id}/ingestions/{ingestion_id}", http_verb="GET", )
def list(self, project_id: str, name: Optional[str] = None, failed: Optional[bool] = None, step: Optional[phc.services.genomic_ingestions.IngestionStep] = None, page_size: Optional[int] = None, next_page_token: Optional[str] = None) ‑> phc.api_response.ApiResponse
-
Fetch a list of ingestions in a project
Parameters
project_id
:str
- The project ID for the ingestions.
name
:str
, optional- The name to filter ingestions by, by default None.
failed
:bool
, optional- The status of the ingestions to filter by, by default None.
step
:IngestionStep
, optional- The ingestion steps to filter by, by default None.
page_size
:int
, optional- The page size, by default None.
next_page_token
:str
, optional- The next page token, by default None.
Returns
ApiResponse
- The list ingestions response.
Expand source code
def list( self, project_id: str, name: Optional[str] = None, failed: Optional[bool] = None, step: Optional[IngestionStep] = None, page_size: Optional[int] = None, next_page_token: Optional[str] = None, ) -> ApiResponse: """Fetch a list of ingestions in a project Parameters ---------- project_id: str The project ID for the ingestions. name: str, optional The name to filter ingestions by, by default None. failed: bool, optional The status of the ingestions to filter by, by default None. step: IngestionStep, optional The ingestion steps to filter by, by default None. page_size: int, optional The page size, by default None. next_page_token: str, optional The next page token, by default None. Returns ------- phc.ApiResponse The list ingestions response. """ query_dict: Dict[str, Union[str, int, bool]] = {} if page_size: query_dict["pageSize"] = page_size if next_page_token: query_dict["nextPageToken"] = next_page_token if name: query_dict["name"] = name if failed is not None: query_dict["failed"] = "true" if failed else "false" if step: query_dict["steps"] = step.value return self._api_call( f"genomic-ingestion/projects/{project_id}/ingestions", http_verb="GET", params=query_dict, )
class Genomics (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)
-
Provides acccess to PHC genomic resources
Parameters
session
:Session
- The PHC session
run_async
:bool
- True to return promises, False to return results (default is False)
timeout
:int
- Operation timeout (default is 30)
trust_env
:bool
- Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
Expand source code
class Genomics(BaseClient): """Provides acccess to PHC genomic resources Parameters ---------- session : phc.Session The PHC session run_async: bool True to return promises, False to return results (default is False) timeout: int Operation timeout (default is 30) trust_env: bool Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default) """ class SetType(Enum): VARIANT = "variantsets" STRUCTURAL_VARIANT = "fusionsets" RNA = "rnaquantificationsets" READ = "readgroupsets" COPY_NUMBER = "copynumbersets" class Reference(Enum): GRCh37 = "GRCh37" GRCh38 = "GRCh38" class SequenceType(Enum): GERMLINE = "germline" SOMATIC = "somatic" METASTATIC = "metastatic" CTDNA = "ctDNA" RNA = "rna" class Status(Enum): ACTIVE = "ACTIVE" INDEXING = "INDEXING" FAILED = "FAILED" def create_set( self, set_type: SetType, project_id: str, name: str, file_id: str, patient_id: str, reference: Reference, sequence_type: SequenceType, test_type: str, sequence_id: Optional[str] = str(uuid.uuid4()), indexed_date: Optional[datetime] = None, performer_id: Optional[str] = None, test_id: Optional[str] = None, update_sample: Optional[bool] = False, pass_filter: Optional[bool] = False, output_vcf_name: Optional[str] = None, ) -> ApiResponse: """Creates a genomic set Parameters ---------- set_type : SetType The genomic set type project_id : str The project ID name : str The set name file_id : str The genomic file ID patient_id : str The patient ID reference : Reference The genomic reference sequence_type : SequenceType The sequence type test_type : str The test type sequence_id : str, optional The FHIR Sequence ID, by default str(uuid.uuid4()) indexed_date : datetime, optional The indexed date, by default None performer_id : str, optional The performer ID, by default None test_id : str, optional The test ID, by default None update_sample : bool, optional For variants only, True to update the sample ID, by default False pass_filter : bool, optional For variants only, True to update all filters to pass, by default False output_vcf_name : str, optional For variants only, the name of the output VCF, by default None Returns ------- ApiResponse The create set response """ json_body = { "datasetId": project_id, "name": name, "patientId": patient_id, "referenceSetId": reference.value, "sequenceType": sequence_type.value, "testType": test_type, "indexedDate": indexed_date.isoformat() if indexed_date else None, "performerId": performer_id, "testId": test_id, "sequenceId": sequence_id, } if set_type == Genomics.SetType.VARIANT: json_body["variantsFileIds"] = [file_id] json_body["updateSample"] = update_sample json_body["passFile"] = pass_filter json_body["outputVcfName"] = output_vcf_name return self._ga4gh_call( "genomicsets", json=json_body, http_verb="POST" ) else: json_body["fileId"] = file_id return self._ga4gh_call( set_type.value, json=json_body, http_verb="POST" ) def update_set( self, set_type: SetType, set_id: str, updates: dict ) -> ApiResponse: """Update a genomic set Parameters ---------- set_type : SetType The set type set_id : str The set ID updates : dict The updates to apply Returns ------- ApiResponse The fetch response """ return self._ga4gh_call( f"{set_type.value}/{set_id}", json=updates, http_verb="PATCH" ) def get_set(self, set_type: SetType, set_id: str) -> ApiResponse: """Fetch a genomic set Parameters ---------- set_type : SetType The set type set_id : str The set ID Returns ------- ApiResponse The fetch response """ return self._ga4gh_call(f"{set_type.value}/{set_id}", http_verb="GET") def delete_set(self, set_type: SetType, set_id: str) -> bool: """Delete a genomic set Parameters ---------- set_type : SetType The set type set_id : str The set ID Returns ------- bool True if the delete succeeeds, otherwise False """ return ( self._ga4gh_call( f"{set_type.value}/{set_id}", http_verb="DELETE" ).status_code == 204 ) def list_sets( self, set_type: SetType, project_id: str, sequence_id: Optional[str] = None, patient_id: Optional[str] = None, status: Optional[Status] = None, next_page_token: Optional[str] = None, page_size: Optional[int] = 50, ) -> ApiResponse: """List genomic sets Parameters ---------- set_type : SetType The set type project_id : str The project ID sequence_id : str, optional List sets by sequence ID, by default None patient_id : str, optional List sets by patient ID, by default None status : Status, optional Filter sets by status, by default None next_page_token : str, optional The next page token, by default None page_size : int, optional The page size, by default 50 Returns ------- ApiResponse The list sets response """ json_body = { "datasetIds": [project_id], "status": status, "patientId": patient_id, "sequenceId": sequence_id, "pageSize": page_size, "pageToken": next_page_token, } return self._ga4gh_call( f"{set_type.value}/search", json=json_body, http_verb="POST" ) def list_tests(self, project_id: str, patient_id: str) -> ApiResponse: """List tests for a patient Parameters ---------- project_id : str The project ID patient_id : str The patient ID Returns ------- ApiResponse The list tests response """ return self._api_call( f"genomics/projects/{project_id}/subjects/{patient_id}/tests", http_verb="GET", ) def get_test(self, project_id: str, test_id: str) -> ApiResponse: """Get test by project and test id Parameters ---------- project_id : str The project ID test_id : str The Test ID Returns ------- ApiResponse The get test response """ return self._api_call( f"genomics/projects/{project_id}/tests/{test_id}", http_verb="GET" ) def delete_test(self, project_id: str, test_id: str) -> bool: """Delete a genomic test Parameters ---------- project_id : SetType The project ID test_id : str The test ID Returns ------- bool True if the delete succeeeds, otherwise False """ return ( self._ga4gh_call( f"genomics/projects/{project_id}/tests/{test_id}", http_verb="DELETE", ).status_code == 204 )
Ancestors
- phc.base_client.BaseClient
Class variables
var Reference
-
An enumeration.
var SequenceType
-
An enumeration.
var SetType
-
An enumeration.
var Status
-
An enumeration.
Methods
def create_set(self, set_type: phc.services.genomics.Genomics.SetType, project_id: str, name: str, file_id: str, patient_id: str, reference: phc.services.genomics.Genomics.Reference, sequence_type: phc.services.genomics.Genomics.SequenceType, test_type: str, sequence_id: Optional[str] = '5a26d3af-e9da-487e-b2a9-125c85d9aeb5', indexed_date: Optional[datetime.datetime] = None, performer_id: Optional[str] = None, test_id: Optional[str] = None, update_sample: Optional[bool] = False, pass_filter: Optional[bool] = False, output_vcf_name: Optional[str] = None) ‑> phc.api_response.ApiResponse
-
Creates a genomic set
Parameters
set_type
:SetType
- The genomic set type
project_id
:str
- The project ID
name
:str
- The set name
file_id
:str
- The genomic file ID
patient_id
:str
- The patient ID
reference
:Reference
- The genomic reference
sequence_type
:SequenceType
- The sequence type
test_type
:str
- The test type
sequence_id
:str
, optional- The FHIR Sequence ID, by default str(uuid.uuid4())
indexed_date
:datetime
, optional- The indexed date, by default None
performer_id
:str
, optional- The performer ID, by default None
test_id
:str
, optional- The test ID, by default None
update_sample
:bool
, optional- For variants only, True to update the sample ID, by default False
pass_filter
:bool
, optional- For variants only, True to update all filters to pass, by default False
output_vcf_name
:str
, optional- For variants only, the name of the output VCF, by default None
Returns
ApiResponse
- The create set response
Expand source code
def create_set( self, set_type: SetType, project_id: str, name: str, file_id: str, patient_id: str, reference: Reference, sequence_type: SequenceType, test_type: str, sequence_id: Optional[str] = str(uuid.uuid4()), indexed_date: Optional[datetime] = None, performer_id: Optional[str] = None, test_id: Optional[str] = None, update_sample: Optional[bool] = False, pass_filter: Optional[bool] = False, output_vcf_name: Optional[str] = None, ) -> ApiResponse: """Creates a genomic set Parameters ---------- set_type : SetType The genomic set type project_id : str The project ID name : str The set name file_id : str The genomic file ID patient_id : str The patient ID reference : Reference The genomic reference sequence_type : SequenceType The sequence type test_type : str The test type sequence_id : str, optional The FHIR Sequence ID, by default str(uuid.uuid4()) indexed_date : datetime, optional The indexed date, by default None performer_id : str, optional The performer ID, by default None test_id : str, optional The test ID, by default None update_sample : bool, optional For variants only, True to update the sample ID, by default False pass_filter : bool, optional For variants only, True to update all filters to pass, by default False output_vcf_name : str, optional For variants only, the name of the output VCF, by default None Returns ------- ApiResponse The create set response """ json_body = { "datasetId": project_id, "name": name, "patientId": patient_id, "referenceSetId": reference.value, "sequenceType": sequence_type.value, "testType": test_type, "indexedDate": indexed_date.isoformat() if indexed_date else None, "performerId": performer_id, "testId": test_id, "sequenceId": sequence_id, } if set_type == Genomics.SetType.VARIANT: json_body["variantsFileIds"] = [file_id] json_body["updateSample"] = update_sample json_body["passFile"] = pass_filter json_body["outputVcfName"] = output_vcf_name return self._ga4gh_call( "genomicsets", json=json_body, http_verb="POST" ) else: json_body["fileId"] = file_id return self._ga4gh_call( set_type.value, json=json_body, http_verb="POST" )
def delete_set(self, set_type: phc.services.genomics.Genomics.SetType, set_id: str) ‑> bool
-
Delete a genomic set
Parameters
set_type
:SetType
- The set type
set_id
:str
- The set ID
Returns
bool
- True if the delete succeeeds, otherwise False
Expand source code
def delete_set(self, set_type: SetType, set_id: str) -> bool: """Delete a genomic set Parameters ---------- set_type : SetType The set type set_id : str The set ID Returns ------- bool True if the delete succeeeds, otherwise False """ return ( self._ga4gh_call( f"{set_type.value}/{set_id}", http_verb="DELETE" ).status_code == 204 )
def delete_test(self, project_id: str, test_id: str) ‑> bool
-
Delete a genomic test
Parameters
project_id
:SetType
- The project ID
test_id
:str
- The test ID
Returns
bool
- True if the delete succeeeds, otherwise False
Expand source code
def delete_test(self, project_id: str, test_id: str) -> bool: """Delete a genomic test Parameters ---------- project_id : SetType The project ID test_id : str The test ID Returns ------- bool True if the delete succeeeds, otherwise False """ return ( self._ga4gh_call( f"genomics/projects/{project_id}/tests/{test_id}", http_verb="DELETE", ).status_code == 204 )
def get_set(self, set_type: phc.services.genomics.Genomics.SetType, set_id: str) ‑> phc.api_response.ApiResponse
-
Fetch a genomic set
Parameters
set_type
:SetType
- The set type
set_id
:str
- The set ID
Returns
ApiResponse
- The fetch response
Expand source code
def get_set(self, set_type: SetType, set_id: str) -> ApiResponse: """Fetch a genomic set Parameters ---------- set_type : SetType The set type set_id : str The set ID Returns ------- ApiResponse The fetch response """ return self._ga4gh_call(f"{set_type.value}/{set_id}", http_verb="GET")
def get_test(self, project_id: str, test_id: str) ‑> phc.api_response.ApiResponse
-
Get test by project and test id
Parameters
project_id
:str
- The project ID
test_id
:str
- The Test ID
Returns
ApiResponse
- The get test response
Expand source code
def get_test(self, project_id: str, test_id: str) -> ApiResponse: """Get test by project and test id Parameters ---------- project_id : str The project ID test_id : str The Test ID Returns ------- ApiResponse The get test response """ return self._api_call( f"genomics/projects/{project_id}/tests/{test_id}", http_verb="GET" )
def list_sets(self, set_type: phc.services.genomics.Genomics.SetType, project_id: str, sequence_id: Optional[str] = None, patient_id: Optional[str] = None, status: Optional[phc.services.genomics.Genomics.Status] = None, next_page_token: Optional[str] = None, page_size: Optional[int] = 50) ‑> phc.api_response.ApiResponse
-
List genomic sets
Parameters
set_type
:SetType
- The set type
project_id
:str
- The project ID
sequence_id
:str
, optional- List sets by sequence ID, by default None
patient_id
:str
, optional- List sets by patient ID, by default None
status
:Status
, optional- Filter sets by status, by default None
next_page_token
:str
, optional- The next page token, by default None
page_size
:int
, optional- The page size, by default 50
Returns
ApiResponse
- The list sets response
Expand source code
def list_sets( self, set_type: SetType, project_id: str, sequence_id: Optional[str] = None, patient_id: Optional[str] = None, status: Optional[Status] = None, next_page_token: Optional[str] = None, page_size: Optional[int] = 50, ) -> ApiResponse: """List genomic sets Parameters ---------- set_type : SetType The set type project_id : str The project ID sequence_id : str, optional List sets by sequence ID, by default None patient_id : str, optional List sets by patient ID, by default None status : Status, optional Filter sets by status, by default None next_page_token : str, optional The next page token, by default None page_size : int, optional The page size, by default 50 Returns ------- ApiResponse The list sets response """ json_body = { "datasetIds": [project_id], "status": status, "patientId": patient_id, "sequenceId": sequence_id, "pageSize": page_size, "pageToken": next_page_token, } return self._ga4gh_call( f"{set_type.value}/search", json=json_body, http_verb="POST" )
def list_tests(self, project_id: str, patient_id: str) ‑> phc.api_response.ApiResponse
-
List tests for a patient
Parameters
project_id
:str
- The project ID
patient_id
:str
- The patient ID
Returns
ApiResponse
- The list tests response
Expand source code
def list_tests(self, project_id: str, patient_id: str) -> ApiResponse: """List tests for a patient Parameters ---------- project_id : str The project ID patient_id : str The patient ID Returns ------- ApiResponse The list tests response """ return self._api_call( f"genomics/projects/{project_id}/subjects/{patient_id}/tests", http_verb="GET", )
def update_set(self, set_type: phc.services.genomics.Genomics.SetType, set_id: str, updates: dict) ‑> phc.api_response.ApiResponse
-
Update a genomic set
Parameters
set_type
:SetType
- The set type
set_id
:str
- The set ID
updates
:dict
- The updates to apply
Returns
ApiResponse
- The fetch response
Expand source code
def update_set( self, set_type: SetType, set_id: str, updates: dict ) -> ApiResponse: """Update a genomic set Parameters ---------- set_type : SetType The set type set_id : str The set ID updates : dict The updates to apply Returns ------- ApiResponse The fetch response """ return self._ga4gh_call( f"{set_type.value}/{set_id}", json=updates, http_verb="PATCH" )
class PatientML (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)
-
Base client for making API requests.
Expand source code
class PatientML(BaseClient): def create_model(self, body: ModelConfigInput): """Creates a new model via a model config object.""" res = self._api_call( api_path="/v1/patient-ml/models", http_verb="POST", json=json.loads(body.json(exclude_none=True)), ) return CreateModelResponse.parse_obj(res.data) def get_models(self): """Gets all model configs for an account.""" res = self._api_call(api_path="/v1/patient-ml/models", http_verb="GET") return GetModelsResponse.parse_obj(res.data) def update_model(self, id: str, body: ModelConfigInput): """Updates a model config.""" res = self._api_call( api_path=f"/v1/patient-ml/models/{id}", http_verb="PUT", json=json.loads(body.json(exclude_none=True)), ) return UpdateModelResponse.parse_obj(res.data) def delete_model(self, id: str): """Deletes a model.""" res = self._api_call( api_path=f"/v1/patient-ml/models/{id}", http_verb="DELETE" ) return DeleteModelResponse.parse_obj(res.data) def get_model(self, id: str): """Gets a model config.""" res = self._api_call( api_path=f"/v1/patient-ml/models/{id}", http_verb="GET" ) return GetModelResponse.parse_obj(res.data) def create_run(self, model_id: str): """Begins a new ML run for a given model.""" res = self._api_call( api_path=f"/v1/patient-ml/models/{model_id}/runs", http_verb="POST" ) return CreateRunResponse.parse_obj(res.data) def get_runs(self, model_id: str): """Gets data for all ML runs for a model.""" res = self._api_call( api_path=f"/v1/patient-ml/models/{model_id}/runs", http_verb="GET" ) return GetRunsResponse.parse_obj(res.data) def get_run(self, model_id: str, run_id: str): """Gets data for a particular run.""" res = self._api_call( api_path=f"/v1/patient-ml/models/{model_id}/runs/{run_id}", http_verb="GET", ) return GetRunResponse.parse_obj(res.data) def get_model_artifact(self, model_id: str, run_id: str): """Gets a url that can be used to download the model artifact for a particular run.""" res = self._api_call( api_path=f"/v1/patient-ml/models/{model_id}/runs/{run_id}/model-artifact", http_verb="GET", ) return GetModelArtifactResponse.parse_obj(res.data) def get_model_logs( self, model_id: str, run_id: str, params: GetModelLogsParams ): """Gets the log events for a particular run.""" res = self._api_call( api_path=f"/v1/patient-ml/models/{model_id}/runs/{run_id}/logs", http_verb="GET", params=json.loads(params.json(exclude_none=True)), ) return GetModelLogsResponse.parse_obj(res.data) def create_approval_decision( self, model_id: str, run_id: str, body: ApprovalDecisionInput ): """Adds a new approval decision to a model run.""" res = self._api_call( api_path=f"/v1/patient-ml/models/{model_id}/runs/{run_id}/approvals", http_verb="POST", json=json.loads(body.json(exclude_none=True)), ) return CreateApprovalDecisionResponse.parse_obj(res.data) def predict(self, model_id: str): """Constructs an example and submits it to the referenced model for inference. The model's output predictions are then returned. The example is retrieved based on the model's problem type and the provided query parameters. Note that this route is only supported for models using the `cloud` deploy type, and which have a currently deployed model version (champion).""" res = self._api_call( api_path=f"/v1/patient-ml/models/{model_id}/predictions", http_verb="GET", ) return PredictionResponse.parse_obj(res.data) def create_dataset(self, body: DatasetConfigInput): """Creates a new dataset via a dataset config object.""" res = self._api_call( api_path="/v1/patient-ml/datasets", http_verb="POST", json=json.loads(body.json(exclude_none=True)), ) return CreateDatasetResponse.parse_obj(res.data) def get_datasets(self): """Gets all dataset configs in the account.""" res = self._api_call( api_path="/v1/patient-ml/datasets", http_verb="GET" ) return GetDatasetsResponse.parse_obj(res.data) def update_dataset(self, id: str, body: DatasetConfigInput): """Updates a dataset config.""" res = self._api_call( api_path=f"/v1/patient-ml/datasets/{id}", http_verb="PUT", json=json.loads(body.json(exclude_none=True)), ) return UpdateDatasetResponse.parse_obj(res.data) def delete_dataset(self, id: str): """Deletes a dataset.""" res = self._api_call( api_path=f"/v1/patient-ml/datasets/{id}", http_verb="DELETE" ) return DeleteDatasetResponse.parse_obj(res.data) def get_dataset(self, id: str): """Gets a dataset config.""" res = self._api_call( api_path=f"/v1/patient-ml/datasets/{id}", http_verb="GET" ) return GetDatasetResponse.parse_obj(res.data) def get_dataset_examples( self, dataset_id: str, params: GetDatasetExamplesParams ): """Fetches a page of training data examples for data labeling.""" res = self._api_call( api_path=f"/v1/patient-ml/datasets/{dataset_id}/examples", http_verb="GET", params=json.loads(params.json(exclude_none=True)), ) return GetDatasetExamplesResponse.parse_obj(res.data) def get_dataset_example( self, dataset_id: str, example_id: str, params: GetDatasetExampleParams ): """Fetches a single training data example for data labeling.""" res = self._api_call( api_path=f"/v1/patient-ml/datasets/{dataset_id}/examples/{example_id}", http_verb="GET", params=json.loads(params.json(exclude_none=True)), ) return GetDatasetExampleResponse.parse_obj(res.data) def put_dataset_label(self, dataset_id: str, example_id: str, body: Label): """Updates the label for a training data example.""" res = self._api_call( api_path=f"/v1/patient-ml/datasets/{dataset_id}/examples/{example_id}/label", http_verb="PUT", json=json.loads(body.json(exclude_none=True)), ) return Example.parse_obj(res.data) def get_dataset_label_file(self, dataset_id: str, example_id: str): """Retrieves the label file for the given example, if it exists, and converts it to the format LabelStudio expects.""" res = self._api_call( api_path=f"/v1/patient-ml/datasets/{dataset_id}/examples/{example_id}/label-file", http_verb="GET", ) return GetDatasetLabelFileResponse.parse_obj(res.data) def put_dataset_label_file( self, dataset_id: str, example_id: str, body: LabelFileData ): """Preprocesses the label data and updates the label file for a training data example. This is done for ML problem types that store their labels as independent files, such as image segmentation. For those problem types, The label data is not stored on a label FHIR record, but in a separate file-service file, and pointed to by a label FHIR record.""" res = self._api_call( api_path=f"/v1/patient-ml/datasets/{dataset_id}/examples/{example_id}/label-file", http_verb="PUT", json=json.loads(body.json(exclude_none=True)), ) return PutDatasetLabelFileResponse.parse_obj(res.data)
Ancestors
- phc.base_client.BaseClient
Methods
def create_approval_decision(self, model_id: str, run_id: str, body: phc.services.patient_ml.ApprovalDecisionInput)
-
Adds a new approval decision to a model run.
Expand source code
def create_approval_decision( self, model_id: str, run_id: str, body: ApprovalDecisionInput ): """Adds a new approval decision to a model run.""" res = self._api_call( api_path=f"/v1/patient-ml/models/{model_id}/runs/{run_id}/approvals", http_verb="POST", json=json.loads(body.json(exclude_none=True)), ) return CreateApprovalDecisionResponse.parse_obj(res.data)
def create_dataset(self, body: phc.services.patient_ml.DatasetConfigInput)
-
Creates a new dataset via a dataset config object.
Expand source code
def create_dataset(self, body: DatasetConfigInput): """Creates a new dataset via a dataset config object.""" res = self._api_call( api_path="/v1/patient-ml/datasets", http_verb="POST", json=json.loads(body.json(exclude_none=True)), ) return CreateDatasetResponse.parse_obj(res.data)
def create_model(self, body: phc.services.patient_ml.ModelConfigInput)
-
Creates a new model via a model config object.
Expand source code
def create_model(self, body: ModelConfigInput): """Creates a new model via a model config object.""" res = self._api_call( api_path="/v1/patient-ml/models", http_verb="POST", json=json.loads(body.json(exclude_none=True)), ) return CreateModelResponse.parse_obj(res.data)
def create_run(self, model_id: str)
-
Begins a new ML run for a given model.
Expand source code
def create_run(self, model_id: str): """Begins a new ML run for a given model.""" res = self._api_call( api_path=f"/v1/patient-ml/models/{model_id}/runs", http_verb="POST" ) return CreateRunResponse.parse_obj(res.data)
def delete_dataset(self, id: str)
-
Deletes a dataset.
Expand source code
def delete_dataset(self, id: str): """Deletes a dataset.""" res = self._api_call( api_path=f"/v1/patient-ml/datasets/{id}", http_verb="DELETE" ) return DeleteDatasetResponse.parse_obj(res.data)
def delete_model(self, id: str)
-
Deletes a model.
Expand source code
def delete_model(self, id: str): """Deletes a model.""" res = self._api_call( api_path=f"/v1/patient-ml/models/{id}", http_verb="DELETE" ) return DeleteModelResponse.parse_obj(res.data)
def get_dataset(self, id: str)
-
Gets a dataset config.
Expand source code
def get_dataset(self, id: str): """Gets a dataset config.""" res = self._api_call( api_path=f"/v1/patient-ml/datasets/{id}", http_verb="GET" ) return GetDatasetResponse.parse_obj(res.data)
def get_dataset_example(self, dataset_id: str, example_id: str, params: phc.services.patient_ml.GetDatasetExampleParams)
-
Fetches a single training data example for data labeling.
Expand source code
def get_dataset_example( self, dataset_id: str, example_id: str, params: GetDatasetExampleParams ): """Fetches a single training data example for data labeling.""" res = self._api_call( api_path=f"/v1/patient-ml/datasets/{dataset_id}/examples/{example_id}", http_verb="GET", params=json.loads(params.json(exclude_none=True)), ) return GetDatasetExampleResponse.parse_obj(res.data)
def get_dataset_examples(self, dataset_id: str, params: phc.services.patient_ml.GetDatasetExamplesParams)
-
Fetches a page of training data examples for data labeling.
Expand source code
def get_dataset_examples( self, dataset_id: str, params: GetDatasetExamplesParams ): """Fetches a page of training data examples for data labeling.""" res = self._api_call( api_path=f"/v1/patient-ml/datasets/{dataset_id}/examples", http_verb="GET", params=json.loads(params.json(exclude_none=True)), ) return GetDatasetExamplesResponse.parse_obj(res.data)
def get_dataset_label_file(self, dataset_id: str, example_id: str)
-
Retrieves the label file for the given example, if it exists, and converts it to the format LabelStudio expects.
Expand source code
def get_dataset_label_file(self, dataset_id: str, example_id: str): """Retrieves the label file for the given example, if it exists, and converts it to the format LabelStudio expects.""" res = self._api_call( api_path=f"/v1/patient-ml/datasets/{dataset_id}/examples/{example_id}/label-file", http_verb="GET", ) return GetDatasetLabelFileResponse.parse_obj(res.data)
def get_datasets(self)
-
Gets all dataset configs in the account.
Expand source code
def get_datasets(self): """Gets all dataset configs in the account.""" res = self._api_call( api_path="/v1/patient-ml/datasets", http_verb="GET" ) return GetDatasetsResponse.parse_obj(res.data)
def get_model(self, id: str)
-
Gets a model config.
Expand source code
def get_model(self, id: str): """Gets a model config.""" res = self._api_call( api_path=f"/v1/patient-ml/models/{id}", http_verb="GET" ) return GetModelResponse.parse_obj(res.data)
def get_model_artifact(self, model_id: str, run_id: str)
-
Gets a url that can be used to download the model artifact for a particular run.
Expand source code
def get_model_artifact(self, model_id: str, run_id: str): """Gets a url that can be used to download the model artifact for a particular run.""" res = self._api_call( api_path=f"/v1/patient-ml/models/{model_id}/runs/{run_id}/model-artifact", http_verb="GET", ) return GetModelArtifactResponse.parse_obj(res.data)
def get_model_logs(self, model_id: str, run_id: str, params: phc.services.patient_ml.GetModelLogsParams)
-
Gets the log events for a particular run.
Expand source code
def get_model_logs( self, model_id: str, run_id: str, params: GetModelLogsParams ): """Gets the log events for a particular run.""" res = self._api_call( api_path=f"/v1/patient-ml/models/{model_id}/runs/{run_id}/logs", http_verb="GET", params=json.loads(params.json(exclude_none=True)), ) return GetModelLogsResponse.parse_obj(res.data)
def get_models(self)
-
Gets all model configs for an account.
Expand source code
def get_models(self): """Gets all model configs for an account.""" res = self._api_call(api_path="/v1/patient-ml/models", http_verb="GET") return GetModelsResponse.parse_obj(res.data)
def get_run(self, model_id: str, run_id: str)
-
Gets data for a particular run.
Expand source code
def get_run(self, model_id: str, run_id: str): """Gets data for a particular run.""" res = self._api_call( api_path=f"/v1/patient-ml/models/{model_id}/runs/{run_id}", http_verb="GET", ) return GetRunResponse.parse_obj(res.data)
def get_runs(self, model_id: str)
-
Gets data for all ML runs for a model.
Expand source code
def get_runs(self, model_id: str): """Gets data for all ML runs for a model.""" res = self._api_call( api_path=f"/v1/patient-ml/models/{model_id}/runs", http_verb="GET" ) return GetRunsResponse.parse_obj(res.data)
def predict(self, model_id: str)
-
Constructs an example and submits it to the referenced model for inference. The model's output predictions are then returned. The example is retrieved based on the model's problem type and the provided query parameters. Note that this route is only supported for models using the
cloud
deploy type, and which have a currently deployed model version (champion).Expand source code
def predict(self, model_id: str): """Constructs an example and submits it to the referenced model for inference. The model's output predictions are then returned. The example is retrieved based on the model's problem type and the provided query parameters. Note that this route is only supported for models using the `cloud` deploy type, and which have a currently deployed model version (champion).""" res = self._api_call( api_path=f"/v1/patient-ml/models/{model_id}/predictions", http_verb="GET", ) return PredictionResponse.parse_obj(res.data)
def put_dataset_label(self, dataset_id: str, example_id: str, body: phc.services.patient_ml.Label)
-
Updates the label for a training data example.
Expand source code
def put_dataset_label(self, dataset_id: str, example_id: str, body: Label): """Updates the label for a training data example.""" res = self._api_call( api_path=f"/v1/patient-ml/datasets/{dataset_id}/examples/{example_id}/label", http_verb="PUT", json=json.loads(body.json(exclude_none=True)), ) return Example.parse_obj(res.data)
def put_dataset_label_file(self, dataset_id: str, example_id: str, body: phc.services.patient_ml.LabelFileData)
-
Preprocesses the label data and updates the label file for a training data example. This is done for ML problem types that store their labels as independent files, such as image segmentation. For those problem types, The label data is not stored on a label FHIR record, but in a separate file-service file, and pointed to by a label FHIR record.
Expand source code
def put_dataset_label_file( self, dataset_id: str, example_id: str, body: LabelFileData ): """Preprocesses the label data and updates the label file for a training data example. This is done for ML problem types that store their labels as independent files, such as image segmentation. For those problem types, The label data is not stored on a label FHIR record, but in a separate file-service file, and pointed to by a label FHIR record.""" res = self._api_call( api_path=f"/v1/patient-ml/datasets/{dataset_id}/examples/{example_id}/label-file", http_verb="PUT", json=json.loads(body.json(exclude_none=True)), ) return PutDatasetLabelFileResponse.parse_obj(res.data)
def update_dataset(self, id: str, body: phc.services.patient_ml.DatasetConfigInput)
-
Updates a dataset config.
Expand source code
def update_dataset(self, id: str, body: DatasetConfigInput): """Updates a dataset config.""" res = self._api_call( api_path=f"/v1/patient-ml/datasets/{id}", http_verb="PUT", json=json.loads(body.json(exclude_none=True)), ) return UpdateDatasetResponse.parse_obj(res.data)
def update_model(self, id: str, body: phc.services.patient_ml.ModelConfigInput)
-
Updates a model config.
Expand source code
def update_model(self, id: str, body: ModelConfigInput): """Updates a model config.""" res = self._api_call( api_path=f"/v1/patient-ml/models/{id}", http_verb="PUT", json=json.loads(body.json(exclude_none=True)), ) return UpdateModelResponse.parse_obj(res.data)
class Projects (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)
-
Provides acccess to PHC projects
Parameters
session
:Session
- The PHC session
run_async
:bool
- True to return promises, False to return results (default is False)
timeout
:int
- Operation timeout (default is 30)
trust_env
:bool
- Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
Expand source code
class Projects(BaseClient): """Provides acccess to PHC projects Parameters ---------- session : phc.Session The PHC session run_async: bool True to return promises, False to return results (default is False) timeout: int Operation timeout (default is 30) trust_env: bool Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default) """ def create(self, name: str, description: str = None) -> ApiResponse: """Creates a project Parameters ---------- name : str The project name. description : str, optional The project description, by default None Returns ------- phc.ApiResponse The create project response """ json_body = {"name": name} if description: json_body["description"] = description return self._api_call("projects", json=json_body, http_verb="POST") def get(self, project_id: str) -> ApiResponse: """Fetch a project by id Parameters ---------- project_id : str The project ID. Returns ------- phc.ApiResponse The get project response """ return self._api_call(f"projects/{project_id}", http_verb="GET") def update( self, project_id: str, name: str, description: Optional[str] = None ) -> ApiResponse: """Update a project Parameters ---------- project_id : str The project ID. name : str The project name. description : str, optional The project description, by default None Returns ------- phc.ApiResponse The update project response """ json_body = {"name": name} if description: json_body["description"] = description return self._api_call( f"projects/{project_id}", json=json_body, http_verb="PATCH" ).data def delete(self, project_id: str) -> bool: """Delete a project Parameters ---------- project_id : str The project ID. Returns ------- bool True if the delete succeeeds, otherwise False """ return ( self._api_call( f"projects/{project_id}", http_verb="DELETE" ).status_code == 204 ) def get_list( self, page_size: Optional[int] = None, next_page_token: Optional[str] = None, name: Optional[str] = None, ) -> ApiResponse: """Fetch a list of projects in an account Parameters ---------- page_size : int, optional The page size, by default None next_page_token : str, optional The next page token, by default None name : str, optional A project name filter, by default None Returns ------- phc.ApiResponse The list projects response """ query_dict = {} if page_size: query_dict["pageSize"] = page_size if next_page_token: query_dict["nextPageToken"] = next_page_token if name: query_dict["name"] = name return self._api_call( f"projects?{urlencode(query_dict)}", http_verb="GET" )
Ancestors
- phc.base_client.BaseClient
Methods
def create(self, name: str, description: str = None) ‑> phc.api_response.ApiResponse
-
Creates a project
Parameters
name
:str
- The project name.
description
:str
, optional- The project description, by default None
Returns
ApiResponse
- The create project response
Expand source code
def create(self, name: str, description: str = None) -> ApiResponse: """Creates a project Parameters ---------- name : str The project name. description : str, optional The project description, by default None Returns ------- phc.ApiResponse The create project response """ json_body = {"name": name} if description: json_body["description"] = description return self._api_call("projects", json=json_body, http_verb="POST")
def delete(self, project_id: str) ‑> bool
-
Delete a project
Parameters
project_id
:str
- The project ID.
Returns
bool
- True if the delete succeeeds, otherwise False
Expand source code
def delete(self, project_id: str) -> bool: """Delete a project Parameters ---------- project_id : str The project ID. Returns ------- bool True if the delete succeeeds, otherwise False """ return ( self._api_call( f"projects/{project_id}", http_verb="DELETE" ).status_code == 204 )
def get(self, project_id: str) ‑> phc.api_response.ApiResponse
-
Fetch a project by id
Parameters
project_id
:str
- The project ID.
Returns
ApiResponse
- The get project response
Expand source code
def get(self, project_id: str) -> ApiResponse: """Fetch a project by id Parameters ---------- project_id : str The project ID. Returns ------- phc.ApiResponse The get project response """ return self._api_call(f"projects/{project_id}", http_verb="GET")
def get_list(self, page_size: Optional[int] = None, next_page_token: Optional[str] = None, name: Optional[str] = None) ‑> phc.api_response.ApiResponse
-
Fetch a list of projects in an account
Parameters
page_size
:int
, optional- The page size, by default None
next_page_token
:str
, optional- The next page token, by default None
name
:str
, optional- A project name filter, by default None
Returns
ApiResponse
- The list projects response
Expand source code
def get_list( self, page_size: Optional[int] = None, next_page_token: Optional[str] = None, name: Optional[str] = None, ) -> ApiResponse: """Fetch a list of projects in an account Parameters ---------- page_size : int, optional The page size, by default None next_page_token : str, optional The next page token, by default None name : str, optional A project name filter, by default None Returns ------- phc.ApiResponse The list projects response """ query_dict = {} if page_size: query_dict["pageSize"] = page_size if next_page_token: query_dict["nextPageToken"] = next_page_token if name: query_dict["name"] = name return self._api_call( f"projects?{urlencode(query_dict)}", http_verb="GET" )
def update(self, project_id: str, name: str, description: Optional[str] = None) ‑> phc.api_response.ApiResponse
-
Update a project
Parameters
- project_id : str
- The project ID.
name
:str
- The project name.
description
:str
, optional- The project description, by default None
Returns
ApiResponse
- The update project response
Expand source code
def update( self, project_id: str, name: str, description: Optional[str] = None ) -> ApiResponse: """Update a project Parameters ---------- project_id : str The project ID. name : str The project name. description : str, optional The project description, by default None Returns ------- phc.ApiResponse The update project response """ json_body = {"name": name} if description: json_body["description"] = description return self._api_call( f"projects/{project_id}", json=json_body, http_verb="PATCH" ).data
class Tasks (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)
-
Provides access to PHC Tasks
Parameters
session
:Session
- The PHC session.
run_async
:bool
- True to return promises, False to return results (default is False).
timeout
:int
- Operation timeout (default is 30).
trust_env
:bool
- Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default).
Expand source code
class Tasks(BaseClient): """Provides access to PHC Tasks Parameters ---------- session: phc.Session The PHC session. run_async: bool True to return promises, False to return results (default is False). timeout: int Operation timeout (default is 30). trust_env: bool Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default). """ def get(self, task_id: str) -> ApiResponse: """Fetch a task by id Parameters ---------- task_id: str The task ID. Returns ------- phc.ApiResponse The get task response. """ return self._api_call(f"tasks/{task_id}", http_verb="GET") def retry(self, task_id: str) -> ApiResponse: """Retry a task by id Parameters ---------- task_id: str The task ID. Returns ------- phc.ApiResponse The retry task response. """ return self._api_call(f"tasks/{task_id}:clone") def cancel(self, task_id: str) -> ApiResponse: """Cancel a task by id Parameters ---------- task_id: str The task ID. Returns ------- phc.ApiResponse The cancel task response. """ return self._api_call(f"tasks/{task_id}:cancel") def create(self, task: dict) -> ApiResponse: """Create a task Parameters ---------- task: dict The task to create. Returns ------- phc.ApiResponse The create task response. """ return self._api_call("tasks", json=task) def list( self, project_id: str, prefix: Optional[str] = None, state: Optional[str] = None, minimal: Optional[bool] = None, page_size: Optional[int] = None, next_page_token: Optional[str] = None, ): """Fetch a list of tasks in a project Parameters ---------- project_id: str The project ID for the tasks. prefix: str, optional The prefix to filter tasks by, by default None. state: str, optional The state to filter tasks by, by default None. bool: bool, optional Set to True to just get task state, by default None. page_size: int, optional The page size, by default None. next_page_token: str, optional The next page token, by default None. Returns ------- phc.ApiResponse The list tasks response. """ query_dict: Dict[str, Union[str, int]] = {} query_dict["datasetId"] = project_id if page_size: query_dict["pageSize"] = page_size if next_page_token: query_dict["nextPageToken"] = next_page_token if prefix: query_dict["name"] = prefix if state: query_dict["state"] = state if minimal: query_dict["view"] = "MINIMAL" return self._api_call(f"tasks?{urlencode(query_dict)}", http_verb="GET")
Ancestors
- phc.base_client.BaseClient
Methods
def cancel(self, task_id: str) ‑> phc.api_response.ApiResponse
-
Cancel a task by id
Parameters
task_id
:str
- The task ID.
Returns
ApiResponse
- The cancel task response.
Expand source code
def cancel(self, task_id: str) -> ApiResponse: """Cancel a task by id Parameters ---------- task_id: str The task ID. Returns ------- phc.ApiResponse The cancel task response. """ return self._api_call(f"tasks/{task_id}:cancel")
def create(self, task: dict) ‑> phc.api_response.ApiResponse
-
Create a task
Parameters
task
:dict
- The task to create.
Returns
ApiResponse
- The create task response.
Expand source code
def create(self, task: dict) -> ApiResponse: """Create a task Parameters ---------- task: dict The task to create. Returns ------- phc.ApiResponse The create task response. """ return self._api_call("tasks", json=task)
def get(self, task_id: str) ‑> phc.api_response.ApiResponse
-
Expand source code
def get(self, task_id: str) -> ApiResponse: """Fetch a task by id Parameters ---------- task_id: str The task ID. Returns ------- phc.ApiResponse The get task response. """ return self._api_call(f"tasks/{task_id}", http_verb="GET")
def list(self, project_id: str, prefix: Optional[str] = None, state: Optional[str] = None, minimal: Optional[bool] = None, page_size: Optional[int] = None, next_page_token: Optional[str] = None)
-
Fetch a list of tasks in a project
Parameters
project_id
:str
- The project ID for the tasks.
prefix
:str
, optional- The prefix to filter tasks by, by default None.
state
:str
, optional- The state to filter tasks by, by default None.
bool
:bool
, optional- Set to True to just get task state, by default None.
page_size
:int
, optional- The page size, by default None.
next_page_token
:str
, optional- The next page token, by default None.
Returns
ApiResponse
- The list tasks response.
Expand source code
def list( self, project_id: str, prefix: Optional[str] = None, state: Optional[str] = None, minimal: Optional[bool] = None, page_size: Optional[int] = None, next_page_token: Optional[str] = None, ): """Fetch a list of tasks in a project Parameters ---------- project_id: str The project ID for the tasks. prefix: str, optional The prefix to filter tasks by, by default None. state: str, optional The state to filter tasks by, by default None. bool: bool, optional Set to True to just get task state, by default None. page_size: int, optional The page size, by default None. next_page_token: str, optional The next page token, by default None. Returns ------- phc.ApiResponse The list tasks response. """ query_dict: Dict[str, Union[str, int]] = {} query_dict["datasetId"] = project_id if page_size: query_dict["pageSize"] = page_size if next_page_token: query_dict["nextPageToken"] = next_page_token if prefix: query_dict["name"] = prefix if state: query_dict["state"] = state if minimal: query_dict["view"] = "MINIMAL" return self._api_call(f"tasks?{urlencode(query_dict)}", http_verb="GET")
def retry(self, task_id: str) ‑> phc.api_response.ApiResponse
-
Retry a task by id
Parameters
task_id
:str
- The task ID.
Returns
ApiResponse
- The retry task response.
Expand source code
def retry(self, task_id: str) -> ApiResponse: """Retry a task by id Parameters ---------- task_id: str The task ID. Returns ------- phc.ApiResponse The retry task response. """ return self._api_call(f"tasks/{task_id}:clone")
class Tools (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)
-
Provides acccess to PHC tools registry
Parameters
session
:Session
- The PHC session
run_async
:bool
- True to return promises, False to return results (default is False)
timeout
:int
- Operation timeout (default is 30)
trust_env
:bool
- Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
Expand source code
class Tools(BaseClient): """Provides acccess to PHC tools registry Parameters ---------- session: phc.Session The PHC session run_async: bool True to return promises, False to return results (default is False) timeout: int Operation timeout (default is 30) trust_env: bool Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default) """ def create( self, name: str, description: str, access: ToolAccess, version: str, tool_class: ToolClass, source: str, labels: Optional[List[str]] = None, ) -> ApiResponse: """Create a tool. Parameters ---------- name: str The name to give to the tool description: str A description of the tool access: ToolAccess The access level given to the tool [PRIVATE, ACCOUNT, PHC, PUBLIC] version: str The initial version of the tool tool_class: ToolClass The class of the tool [Workflow, Notebook] source: str The path of the tool to upload labels: List[str], optional A list of labels to apply to the tool, i.e. ["bam","samtools"] Returns ------- ApiResponse The create tool response Examples -------- >>> from phc.services import Tools >>> tools = Tools(session) >>> tools.create(name="Read Depth Notebook", description="Generates a chart of positional read depth from a bam file", access="PHC", version="1.0.0", tool_class="Notebook", source="./mynotebook.ipynb", labels=["bam","samtools]") """ if not hasattr(ToolClass, tool_class): raise ValueError( f"{tool_class} is not a valid Tool Class value {[e.value for e in ToolClass]}" ) if not hasattr(ToolAccess, access): raise ValueError( f"{access} is not a valid Tool Class value {[e.value for e in ToolAccess]}" ) create_request = { "version": version, "access": access, "name": name, "toolClassId": ToolClassIdMappings[tool_class], "descriptorType": DescriptorTypeMappings[tool_class], "description": description, } if labels: create_request["labels"] = labels res = self._api_call( "/v1/trs/v2/tools", json=create_request, http_verb="POST" ) upload_request = { "fileName": source.split("/").pop(), "toolId": res["id"], "version": res["meta_version"], } upload_response = self._api_call( "/v1/trs/files", json=upload_request, http_verb="POST" ) file_size = os.path.getsize(source) self._api_call_impl( http_verb="PUT", url=upload_response["uploadUrl"], api_path=None, upload_file=source, headers={ "Content-Length": str(file_size), "Authorization": None, "LifeOmic-Account": None, "Content-Type": None, }, ) return res @backoff.on_exception( backoff.expo, OSError, max_tries=6, jitter=backoff.full_jitter ) def download( self, tool_id: str, version: Optional[str] = None, dest_dir: Optional[str] = os.getcwd(), ) -> None: """Download a tool Parameters ---------- tool_id : str The tool ID version : str, optional The version. dest_dir : str, optional The local directory to save the tool. Defaults to the current working directory Examples -------- >>> from phc.services import Tools >>> tools = Tools(session) >>> tools.download(tool_id="db3e09e9-1ecd-4976-aa5e-70ac7ada0cc3", dest_dir="./mydata") """ id = f"{tool_id}:{version}" if version else tool_id res = self._api_call(f"/v1/trs/files/{id}/download", http_verb="GET") file_path = os.path.join(dest_dir, res.get("fileName")) target_dir = os.path.dirname(file_path) if not os.path.exists(target_dir): os.makedirs(target_dir) urlretrieve(res.get("downloadUrl"), file_path) return file_path def get(self, tool_id: str, version: Optional[str] = None) -> ApiResponse: """Fetch a tool by id Parameters ---------- tool_id : str The tool ID. version : str, optional The version. Returns ------- phc.ApiResponse The get tool response """ id = f"{tool_id}:{version}" if version else tool_id return self._api_call(f"/v1/trs/v2/tools/{id}", http_verb="GET") def add_version( self, tool_id: str, version: str, source: str, is_default: Optional[bool] = False, ) -> ApiResponse: """Adds a new version to a tool. Parameters ---------- tool_id : str The tool ID to add the version to. version : str The new version for the tool. source: str The path of the version to upload. is_default: bool = False Updates default setting for the tool. Returns ------- phc.ApiResponse The updated tool response """ version_request = {"version": version, "isDefault": is_default} res = self._api_call( f"/v1/trs/v2/tools/{tool_id}/versions", json=version_request, http_verb="POST", ) upload_request = { "fileName": source.split("/").pop(), "toolId": res["id"], "version": res["meta_version"], } upload_response = self._api_call( "/v1/trs/files", json=upload_request, http_verb="POST" ) file_size = os.path.getsize(source) self._api_call_impl( http_verb="PUT", url=upload_response["uploadUrl"], api_path=None, upload_file=source, headers={ "Content-Length": str(file_size), "Authorization": None, "LifeOmic-Account": None, "Content-Type": None, }, ) return res def delete(self, tool_id: str, version: Optional[str] = None) -> bool: """Deletes a tool Parameters ---------- tool_id : str The tool ID. version : str, optional The version. Returns ------- bool True if the delete succeeeds, otherwise False """ id = f"{tool_id}:{version}" if version else tool_id return ( self._api_call( f"/v1/trs/v2/tools/{id}", http_verb="DELETE" ).status_code == 200 ) def get_list( self, tool_class: Optional[ToolClass] = None, organization: Optional[str] = None, tool_name: Optional[str] = None, author: Optional[str] = None, labels: Optional[List[str]] = None, page_size: Optional[int] = 1000, page_count: Optional[int] = 0, ) -> ApiResponse: """Fetch a list of tools from the registry Parameters ---------- tool_class: str, optional The class of the tool, by default None organization: str, optional The organization that owns the tool, by default None tool_name: str, optional The name of the tool, by default None author: str, optional The creator of the tool, by default None labels: List[str], optional A list of labels describing the tool, by default None page_size: int, optional The count of tools to return in a single request, by default 1000 page_count: int, optional The page count to return, by default 0 Returns ------- phc.ApiResponse The list files response """ query_dict = {"limit": page_size, "offset": page_count} if tool_class: if not hasattr(ToolClass, tool_class): raise ValueError( f"{tool_class} is not a valid Tool Class value {[e.value for e in ToolClass]}" ) query_dict["toolClass"] = tool_class if organization: query_dict["organization"] = organization if tool_name: query_dict["toolname"] = tool_name if author: query_dict["author"] = author if labels: query_dict["label"] = ",".join(labels) return self._api_call( f"/v1/trs/v2/tools?{urlencode(query_dict)}", http_verb="GET" )
Ancestors
- phc.base_client.BaseClient
Methods
def add_version(self, tool_id: str, version: str, source: str, is_default: Optional[bool] = False) ‑> phc.api_response.ApiResponse
-
Adds a new version to a tool.
Parameters
tool_id
:str
- The tool ID to add the version to.
version
:str
- The new version for the tool.
source
:str
- The path of the version to upload.
is_default
:bool = False
- Updates default setting for the tool.
Returns
ApiResponse
- The updated tool response
Expand source code
def add_version( self, tool_id: str, version: str, source: str, is_default: Optional[bool] = False, ) -> ApiResponse: """Adds a new version to a tool. Parameters ---------- tool_id : str The tool ID to add the version to. version : str The new version for the tool. source: str The path of the version to upload. is_default: bool = False Updates default setting for the tool. Returns ------- phc.ApiResponse The updated tool response """ version_request = {"version": version, "isDefault": is_default} res = self._api_call( f"/v1/trs/v2/tools/{tool_id}/versions", json=version_request, http_verb="POST", ) upload_request = { "fileName": source.split("/").pop(), "toolId": res["id"], "version": res["meta_version"], } upload_response = self._api_call( "/v1/trs/files", json=upload_request, http_verb="POST" ) file_size = os.path.getsize(source) self._api_call_impl( http_verb="PUT", url=upload_response["uploadUrl"], api_path=None, upload_file=source, headers={ "Content-Length": str(file_size), "Authorization": None, "LifeOmic-Account": None, "Content-Type": None, }, ) return res
def create(self, name: str, description: str, access: phc.services.tools.ToolAccess, version: str, tool_class: phc.services.tools.ToolClass, source: str, labels: Optional[List[str]] = None) ‑> phc.api_response.ApiResponse
-
Create a tool.
Parameters
name
:str
- The name to give to the tool
description
:str
- A description of the tool
access
:ToolAccess
- The access level given to the tool [PRIVATE, ACCOUNT, PHC, PUBLIC]
version
:str
- The initial version of the tool
tool_class
:ToolClass
- The class of the tool [Workflow, Notebook]
source
:str
- The path of the tool to upload
labels
:List[str]
, optional- A list of labels to apply to the tool, i.e. ["bam","samtools"]
Returns
ApiResponse
- The create tool response
Examples
>>> from phc.services import Tools >>> tools = Tools(session) >>> tools.create(name="Read Depth Notebook", description="Generates a chart of positional read depth from a bam file", access="PHC", version="1.0.0", tool_class="Notebook", source="./mynotebook.ipynb", labels=["bam","samtools]")
Expand source code
def create( self, name: str, description: str, access: ToolAccess, version: str, tool_class: ToolClass, source: str, labels: Optional[List[str]] = None, ) -> ApiResponse: """Create a tool. Parameters ---------- name: str The name to give to the tool description: str A description of the tool access: ToolAccess The access level given to the tool [PRIVATE, ACCOUNT, PHC, PUBLIC] version: str The initial version of the tool tool_class: ToolClass The class of the tool [Workflow, Notebook] source: str The path of the tool to upload labels: List[str], optional A list of labels to apply to the tool, i.e. ["bam","samtools"] Returns ------- ApiResponse The create tool response Examples -------- >>> from phc.services import Tools >>> tools = Tools(session) >>> tools.create(name="Read Depth Notebook", description="Generates a chart of positional read depth from a bam file", access="PHC", version="1.0.0", tool_class="Notebook", source="./mynotebook.ipynb", labels=["bam","samtools]") """ if not hasattr(ToolClass, tool_class): raise ValueError( f"{tool_class} is not a valid Tool Class value {[e.value for e in ToolClass]}" ) if not hasattr(ToolAccess, access): raise ValueError( f"{access} is not a valid Tool Class value {[e.value for e in ToolAccess]}" ) create_request = { "version": version, "access": access, "name": name, "toolClassId": ToolClassIdMappings[tool_class], "descriptorType": DescriptorTypeMappings[tool_class], "description": description, } if labels: create_request["labels"] = labels res = self._api_call( "/v1/trs/v2/tools", json=create_request, http_verb="POST" ) upload_request = { "fileName": source.split("/").pop(), "toolId": res["id"], "version": res["meta_version"], } upload_response = self._api_call( "/v1/trs/files", json=upload_request, http_verb="POST" ) file_size = os.path.getsize(source) self._api_call_impl( http_verb="PUT", url=upload_response["uploadUrl"], api_path=None, upload_file=source, headers={ "Content-Length": str(file_size), "Authorization": None, "LifeOmic-Account": None, "Content-Type": None, }, ) return res
def delete(self, tool_id: str, version: Optional[str] = None) ‑> bool
-
Deletes a tool
Parameters
tool_id
:str
- The tool ID.
version
:str
, optional- The version.
Returns
bool
- True if the delete succeeeds, otherwise False
Expand source code
def delete(self, tool_id: str, version: Optional[str] = None) -> bool: """Deletes a tool Parameters ---------- tool_id : str The tool ID. version : str, optional The version. Returns ------- bool True if the delete succeeeds, otherwise False """ id = f"{tool_id}:{version}" if version else tool_id return ( self._api_call( f"/v1/trs/v2/tools/{id}", http_verb="DELETE" ).status_code == 200 )
def download(self, tool_id: str, version: Optional[str] = None, dest_dir: Optional[str] = '/home/runner/work/phc-sdk-py/phc-sdk-py') ‑> None
-
Download a tool
Parameters
tool_id
:str
- The tool ID
version
:str
, optional- The version.
dest_dir
:str
, optional- The local directory to save the tool. Defaults to the current working directory
Examples
>>> from phc.services import Tools >>> tools = Tools(session) >>> tools.download(tool_id="db3e09e9-1ecd-4976-aa5e-70ac7ada0cc3", dest_dir="./mydata")
Expand source code
@backoff.on_exception( backoff.expo, OSError, max_tries=6, jitter=backoff.full_jitter ) def download( self, tool_id: str, version: Optional[str] = None, dest_dir: Optional[str] = os.getcwd(), ) -> None: """Download a tool Parameters ---------- tool_id : str The tool ID version : str, optional The version. dest_dir : str, optional The local directory to save the tool. Defaults to the current working directory Examples -------- >>> from phc.services import Tools >>> tools = Tools(session) >>> tools.download(tool_id="db3e09e9-1ecd-4976-aa5e-70ac7ada0cc3", dest_dir="./mydata") """ id = f"{tool_id}:{version}" if version else tool_id res = self._api_call(f"/v1/trs/files/{id}/download", http_verb="GET") file_path = os.path.join(dest_dir, res.get("fileName")) target_dir = os.path.dirname(file_path) if not os.path.exists(target_dir): os.makedirs(target_dir) urlretrieve(res.get("downloadUrl"), file_path) return file_path
def get(self, tool_id: str, version: Optional[str] = None) ‑> phc.api_response.ApiResponse
-
Fetch a tool by id
Parameters
tool_id
:str
- The tool ID.
version
:str
, optional- The version.
Returns
ApiResponse
- The get tool response
Expand source code
def get(self, tool_id: str, version: Optional[str] = None) -> ApiResponse: """Fetch a tool by id Parameters ---------- tool_id : str The tool ID. version : str, optional The version. Returns ------- phc.ApiResponse The get tool response """ id = f"{tool_id}:{version}" if version else tool_id return self._api_call(f"/v1/trs/v2/tools/{id}", http_verb="GET")
def get_list(self, tool_class: Optional[phc.services.tools.ToolClass] = None, organization: Optional[str] = None, tool_name: Optional[str] = None, author: Optional[str] = None, labels: Optional[List[str]] = None, page_size: Optional[int] = 1000, page_count: Optional[int] = 0) ‑> phc.api_response.ApiResponse
-
Fetch a list of tools from the registry
Parameters
tool_class
:str
, optional- The class of the tool, by default None
organization
:str
, optional- The organization that owns the tool, by default None
tool_name
:str
, optional- The name of the tool, by default None
author
:str
, optional- The creator of the tool, by default None
labels
:List[str]
, optional- A list of labels describing the tool, by default None
page_size
:int
, optional- The count of tools to return in a single request, by default 1000
page_count
:int
, optional- The page count to return, by default 0
Returns
ApiResponse
- The list files response
Expand source code
def get_list( self, tool_class: Optional[ToolClass] = None, organization: Optional[str] = None, tool_name: Optional[str] = None, author: Optional[str] = None, labels: Optional[List[str]] = None, page_size: Optional[int] = 1000, page_count: Optional[int] = 0, ) -> ApiResponse: """Fetch a list of tools from the registry Parameters ---------- tool_class: str, optional The class of the tool, by default None organization: str, optional The organization that owns the tool, by default None tool_name: str, optional The name of the tool, by default None author: str, optional The creator of the tool, by default None labels: List[str], optional A list of labels describing the tool, by default None page_size: int, optional The count of tools to return in a single request, by default 1000 page_count: int, optional The page count to return, by default 0 Returns ------- phc.ApiResponse The list files response """ query_dict = {"limit": page_size, "offset": page_count} if tool_class: if not hasattr(ToolClass, tool_class): raise ValueError( f"{tool_class} is not a valid Tool Class value {[e.value for e in ToolClass]}" ) query_dict["toolClass"] = tool_class if organization: query_dict["organization"] = organization if tool_name: query_dict["toolname"] = tool_name if author: query_dict["author"] = author if labels: query_dict["label"] = ",".join(labels) return self._api_call( f"/v1/trs/v2/tools?{urlencode(query_dict)}", http_verb="GET" )
class Workflows (session: phc.session.Session, run_async: bool = False, timeout: int = 30, trust_env: bool = False)
-
Provides acccess to PHC Workflows
Parameters
session
:Session
- The PHC session
run_async
:bool
- True to return promises, False to return results (default is False)
timeout
:int
- Operation timeout (default is 30)
trust_env
:bool
- Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default)
Expand source code
class Workflows(BaseClient): """Provides acccess to PHC Workflows Parameters ---------- session: phc.Session The PHC session run_async: bool True to return promises, False to return results (default is False) timeout: int Operation timeout (default is 30) trust_env: bool Get proxies information from HTTP_PROXY / HTTPS_PROXY environment variables if the parameter is True (False by default) """ def run( self, project_id: str, name: str, tool: str, workflow_inputs: Optional[str] = None, workflow_inputs_file_id: Optional[str] = None, output_project_folder: Optional[str] = None, ) -> ApiResponse: """Create a tool. Parameters ---------- project_id: str The project ID name: str The name to give to this run of a tool tool: str The tool id or organization/name of the tool to run workflow_inputs: str, optional The inputs required by the workflow as a json string, either this or workflow_inputs_file_id are required workflow_inputs_file_id: str, optional The inputs required by the workflow as provided in a file in PHC, either this or workflow_inputs are required output_project_folder: str, optional The destination output folder in PHC for the workflow run outputs Returns ------- ApiResponse The workflow run response Examples -------- >>> from phc.services import Workflows >>> workflows = Workflows(session) >>> workflows.run(project_id="d2876f48-724f-4987-9cf0-92c7ef99a9fa", name="Ashion ingest subj: 2405", tool="lifeomic/ashion-ingest-workflow", workflow_inputs="{'reference': 'GRCh37','tarFile': {'class': 'File','fileId': '28235c74-9731-4496-bb3c-41c361f106f3'}, 'source': 'incoming/ashion_C043_9999_009990_T1_K1ID2_ps20190814000000.tar.gz'}") """ create_request = { "datasetId": project_id, "name": name, "workflowSourceFileId": tool, } if workflow_inputs: create_request["workflowInputs"] = workflow_inputs elif workflow_inputs_file_id: create_request["workflowInputsFileId"] = workflow_inputs_file_id else: raise ValueError( "Must provide a value for the workflow_inputs or workflow_inputs_file_id" ) if output_project_folder: create_request["outputProjectFolder"] = output_project_folder res = self._api_call( "/v1/workflows/ga4gh/wes/runs", json=create_request, http_verb="POST", ) return res def get(self, project_id: str, workflow_id: str) -> ApiResponse: """Get workflow metadata by id Parameters ---------- project_id: str The project ID workflow_id : str The workflow ID. Returns ------- phc.ApiResponse The get workflow response """ return self._api_call( f"/v1/workflows/ga4gh/wes/runs/{project_id}:{workflow_id}", http_verb="GET", ) def get_list( self, project_id: str, page_size: Optional[int] = 100, next_page_token: Optional[str] = None, ) -> ApiResponse: """Fetch a list of workflows run in the specified project Parameters ---------- project_id: str The project ID page_size : int, optional The page size, by default 100 next_page_token : str, optional The next page token, by default None Returns ------- phc.ApiResponse The list workflow run response """ query_dict = {"datasetId": project_id} if page_size: query_dict["pageSize"] = page_size if next_page_token: query_dict["nextPageToken"] = next_page_token return self._api_call( f"/v1/workflows/ga4gh/wes/runs?{urlencode(query_dict)}", http_verb="GET", ) def describe(self, project_id: str, tool: str) -> ApiResponse: """Returns a description of the inputs the workflow engine requires for the given tool Parameters ---------- project_id: str The project ID tool: str The tool id or organization/name of the tool to run Returns ------- phc.ApiResponse The description of the inputs for the given tool """ describe_request = { "datasetId": project_id, "workflowSourceFileId": tool, } return self._api_call( "/v1/workflows/ga4gh/wes/runs/parse", json=describe_request, http_verb="POST", )
Ancestors
- phc.base_client.BaseClient
Methods
def describe(self, project_id: str, tool: str) ‑> phc.api_response.ApiResponse
-
Returns a description of the inputs the workflow engine requires for the given tool
Parameters
project_id
:str
- The project ID
tool
:str
- The tool id or organization/name of the tool to run
Returns
ApiResponse
- The description of the inputs for the given tool
Expand source code
def describe(self, project_id: str, tool: str) -> ApiResponse: """Returns a description of the inputs the workflow engine requires for the given tool Parameters ---------- project_id: str The project ID tool: str The tool id or organization/name of the tool to run Returns ------- phc.ApiResponse The description of the inputs for the given tool """ describe_request = { "datasetId": project_id, "workflowSourceFileId": tool, } return self._api_call( "/v1/workflows/ga4gh/wes/runs/parse", json=describe_request, http_verb="POST", )
def get(self, project_id: str, workflow_id: str) ‑> phc.api_response.ApiResponse
-
Get workflow metadata by id
Parameters
project_id
:str
- The project ID
workflow_id
:str
- The workflow ID.
Returns
ApiResponse
- The get workflow response
Expand source code
def get(self, project_id: str, workflow_id: str) -> ApiResponse: """Get workflow metadata by id Parameters ---------- project_id: str The project ID workflow_id : str The workflow ID. Returns ------- phc.ApiResponse The get workflow response """ return self._api_call( f"/v1/workflows/ga4gh/wes/runs/{project_id}:{workflow_id}", http_verb="GET", )
def get_list(self, project_id: str, page_size: Optional[int] = 100, next_page_token: Optional[str] = None) ‑> phc.api_response.ApiResponse
-
Fetch a list of workflows run in the specified project
Parameters
project_id
:str
- The project ID
page_size
:int
, optional- The page size, by default 100
next_page_token
:str
, optional- The next page token, by default None
Returns
ApiResponse
- The list workflow run response
Expand source code
def get_list( self, project_id: str, page_size: Optional[int] = 100, next_page_token: Optional[str] = None, ) -> ApiResponse: """Fetch a list of workflows run in the specified project Parameters ---------- project_id: str The project ID page_size : int, optional The page size, by default 100 next_page_token : str, optional The next page token, by default None Returns ------- phc.ApiResponse The list workflow run response """ query_dict = {"datasetId": project_id} if page_size: query_dict["pageSize"] = page_size if next_page_token: query_dict["nextPageToken"] = next_page_token return self._api_call( f"/v1/workflows/ga4gh/wes/runs?{urlencode(query_dict)}", http_verb="GET", )
def run(self, project_id: str, name: str, tool: str, workflow_inputs: Optional[str] = None, workflow_inputs_file_id: Optional[str] = None, output_project_folder: Optional[str] = None) ‑> phc.api_response.ApiResponse
-
Create a tool.
Parameters
project_id
:str
- The project ID
name
:str
- The name to give to this run of a tool
tool
:str
- The tool id or organization/name of the tool to run
workflow_inputs
:str
, optional- The inputs required by the workflow as a json string, either this or workflow_inputs_file_id are required
workflow_inputs_file_id
:str
, optional- The inputs required by the workflow as provided in a file in PHC, either this or workflow_inputs are required
output_project_folder
:str
, optional- The destination output folder in PHC for the workflow run outputs
Returns
ApiResponse
- The workflow run response
Examples
>>> from phc.services import Workflows >>> workflows = Workflows(session) >>> workflows.run(project_id="d2876f48-724f-4987-9cf0-92c7ef99a9fa", name="Ashion ingest subj: 2405", tool="lifeomic/ashion-ingest-workflow", workflow_inputs="{'reference': 'GRCh37','tarFile': {'class': 'File','fileId': '28235c74-9731-4496-bb3c-41c361f106f3'}, 'source': 'incoming/ashion_C043_9999_009990_T1_K1ID2_ps20190814000000.tar.gz'}")
Expand source code
def run( self, project_id: str, name: str, tool: str, workflow_inputs: Optional[str] = None, workflow_inputs_file_id: Optional[str] = None, output_project_folder: Optional[str] = None, ) -> ApiResponse: """Create a tool. Parameters ---------- project_id: str The project ID name: str The name to give to this run of a tool tool: str The tool id or organization/name of the tool to run workflow_inputs: str, optional The inputs required by the workflow as a json string, either this or workflow_inputs_file_id are required workflow_inputs_file_id: str, optional The inputs required by the workflow as provided in a file in PHC, either this or workflow_inputs are required output_project_folder: str, optional The destination output folder in PHC for the workflow run outputs Returns ------- ApiResponse The workflow run response Examples -------- >>> from phc.services import Workflows >>> workflows = Workflows(session) >>> workflows.run(project_id="d2876f48-724f-4987-9cf0-92c7ef99a9fa", name="Ashion ingest subj: 2405", tool="lifeomic/ashion-ingest-workflow", workflow_inputs="{'reference': 'GRCh37','tarFile': {'class': 'File','fileId': '28235c74-9731-4496-bb3c-41c361f106f3'}, 'source': 'incoming/ashion_C043_9999_009990_T1_K1ID2_ps20190814000000.tar.gz'}") """ create_request = { "datasetId": project_id, "name": name, "workflowSourceFileId": tool, } if workflow_inputs: create_request["workflowInputs"] = workflow_inputs elif workflow_inputs_file_id: create_request["workflowInputsFileId"] = workflow_inputs_file_id else: raise ValueError( "Must provide a value for the workflow_inputs or workflow_inputs_file_id" ) if output_project_folder: create_request["outputProjectFolder"] = output_project_folder res = self._api_call( "/v1/workflows/ga4gh/wes/runs", json=create_request, http_verb="POST", ) return res