Replaced schema with scheme
All checks were successful (autoBIGS.engine/pipeline/head: This commit looks good)

2025-02-26 04:50:54 +00:00
parent 06dbb56c28
commit 27ae89fde7
6 changed files with 51 additions and 76 deletions

View File

@@ -43,10 +43,10 @@ class BIGSdbMLSTProfiler(AbstractAsyncContextManager):
 class RemoteBIGSdbMLSTProfiler(BIGSdbMLSTProfiler):
-    def __init__(self, database_api: str, database_name: str, schema_id: int):
+    def __init__(self, database_api: str, database_name: str, scheme_id: int):
         self._database_name = database_name
-        self._schema_id = schema_id
-        self._base_url = f"{database_api}/db/{self._database_name}/schemes/{self._schema_id}/"
+        self._scheme_id = scheme_id
+        self._base_url = f"{database_api}/db/{self._database_name}/schemes/{self._scheme_id}/"
         self._http_client = ClientSession(self._base_url, timeout=ClientTimeout(60))

     async def __aenter__(self):
@@ -90,7 +90,7 @@ class RemoteBIGSdbMLSTProfiler(BIGSdbMLSTProfiler):
                     )
                     yield result_allele if isinstance(sequence_string, str) else (sequence_string.name, result_allele)
             else:
-                raise NoBIGSdbMatchesException(self._database_name, self._schema_id, sequence_string.name if isinstance(sequence_string, NamedString) else None)
+                raise NoBIGSdbMatchesException(self._database_name, self._scheme_id, sequence_string.name if isinstance(sequence_string, NamedString) else None)

     async def determine_mlst_st(self, alleles: Union[AsyncIterable[Union[Allele, tuple[str, Allele]]], Iterable[Union[Allele, tuple[str, Allele]]]]) -> Union[MLSTProfile, NamedMLSTProfile]:
         uri_path = "designations"
@@ -117,15 +117,15 @@ class RemoteBIGSdbMLSTProfiler(BIGSdbMLSTProfiler):
             response_json: dict = await response.json()
             allele_set: Set[Allele] = set()
             response_json.setdefault("fields", dict())
-            schema_fields_returned: dict[str, str] = response_json["fields"]
-            schema_fields_returned.setdefault("ST", "unknown")
-            schema_fields_returned.setdefault("clonal_complex", "unknown")
-            schema_exact_matches: dict = response_json["exact_matches"]
-            for exact_match_locus, exact_match_alleles in schema_exact_matches.items():
+            scheme_fields_returned: dict[str, str] = response_json["fields"]
+            scheme_fields_returned.setdefault("ST", "unknown")
+            scheme_fields_returned.setdefault("clonal_complex", "unknown")
+            scheme_exact_matches: dict = response_json["exact_matches"]
+            for exact_match_locus, exact_match_alleles in scheme_exact_matches.items():
                 allele_set.add(Allele(exact_match_locus, exact_match_alleles[0]["allele_id"], None))
             if len(allele_set) == 0:
                 raise ValueError("Passed in no alleles.")
-            result_mlst_profile = MLSTProfile(allele_set, schema_fields_returned["ST"], schema_fields_returned["clonal_complex"])
+            result_mlst_profile = MLSTProfile(allele_set, scheme_fields_returned["ST"], scheme_fields_returned["clonal_complex"])
             if len(names_list) > 0:
                 result_mlst_profile = NamedMLSTProfile(str(tuple(names_list)), result_mlst_profile)
             return result_mlst_profile
@@ -165,7 +165,7 @@ class BIGSdbIndex(AbstractAsyncContextManager):
     def __init__(self):
         self._http_client = ClientSession()
         self._known_seqdef_dbs_origin: Union[Mapping[str, str], None] = None
-        self._seqdefdb_schemas: dict[str, Union[Mapping[str, int], None]] = dict()
+        self._seqdefdb_schemes: dict[str, Union[Mapping[str, int], None]] = dict()
         super().__init__()

     async def __aenter__(self):
@@ -191,22 +191,22 @@ class BIGSdbIndex(AbstractAsyncContextManager):
             raise NoSuchBIGSdbDatabaseException(seqdef_db_name)
         return known_databases[seqdef_db_name]

-    async def get_schemas_for_seqdefdb(self, seqdef_db_name: str, force: bool = False) -> Mapping[str, int]:
-        if seqdef_db_name in self._seqdefdb_schemas and not force:
-            return self._seqdefdb_schemas[seqdef_db_name] # type: ignore since it's guaranteed to not be none by conditional
+    async def get_schemes_for_seqdefdb(self, seqdef_db_name: str, force: bool = False) -> Mapping[str, int]:
+        if seqdef_db_name in self._seqdefdb_schemes and not force:
+            return self._seqdefdb_schemes[seqdef_db_name] # type: ignore since it's guaranteed to not be none by conditional
         uri_path = f"{await self.get_bigsdb_api_from_seqdefdb(seqdef_db_name)}/db/{seqdef_db_name}/schemes"
         async with self._http_client.get(uri_path) as response:
             response_json = await response.json()
-            schema_descriptions: Mapping[str, int] = dict()
+            scheme_descriptions: Mapping[str, int] = dict()
             for scheme_definition in response_json["schemes"]:
                 scheme_id: int = int(str(scheme_definition["scheme"]).split("/")[-1])
                 scheme_desc: str = scheme_definition["description"]
-                schema_descriptions[scheme_desc] = scheme_id
-            self._seqdefdb_schemas[seqdef_db_name] = schema_descriptions
-            return self._seqdefdb_schemas[seqdef_db_name] # type: ignore
+                scheme_descriptions[scheme_desc] = scheme_id
+            self._seqdefdb_schemes[seqdef_db_name] = scheme_descriptions
+            return self._seqdefdb_schemes[seqdef_db_name] # type: ignore

-    async def build_profiler_from_seqdefdb(self, local: bool, dbseqdef_name: str, schema_id: int) -> BIGSdbMLSTProfiler:
-        return get_BIGSdb_MLST_profiler(local, await self.get_bigsdb_api_from_seqdefdb(dbseqdef_name), dbseqdef_name, schema_id)
+    async def build_profiler_from_seqdefdb(self, local: bool, dbseqdef_name: str, scheme_id: int) -> BIGSdbMLSTProfiler:
+        return get_BIGSdb_MLST_profiler(local, await self.get_bigsdb_api_from_seqdefdb(dbseqdef_name), dbseqdef_name, scheme_id)

     async def close(self):
         await self._http_client.close()
@@ -214,7 +214,7 @@ class BIGSdbIndex(AbstractAsyncContextManager):
     async def __aexit__(self, exc_type, exc_value, traceback):
         await self.close()

-def get_BIGSdb_MLST_profiler(local: bool, database_api: str, database_name: str, schema_id: int):
+def get_BIGSdb_MLST_profiler(local: bool, database_api: str, database_name: str, scheme_id: int):
     if local:
         raise NotImplementedError()
-    return RemoteBIGSdbMLSTProfiler(database_api=database_api, database_name=database_name, schema_id=schema_id)
+    return RemoteBIGSdbMLSTProfiler(database_api=database_api, database_name=database_name, scheme_id=scheme_id)
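
For callers of this module, the rename is a breaking change: the profiler constructor now takes scheme_id, and BIGSdbIndex exposes get_schemes_for_seqdefdb in place of get_schemas_for_seqdefdb. A minimal usage sketch follows; the import path, the pubmlst_bordetella_seqdef database name, and the "MLST" scheme description are assumptions, not values taken from this diff.

import asyncio

# Assumed import path; the diff does not show which module defines BIGSdbIndex.
from autobigs.engine.analysis.bigsdb import BIGSdbIndex

async def main():
    async with BIGSdbIndex() as index:
        # Renamed by this commit: get_schemas_for_seqdefdb -> get_schemes_for_seqdefdb.
        schemes = await index.get_schemes_for_seqdefdb("pubmlst_bordetella_seqdef")
        print(schemes)  # mapping of scheme descriptions to scheme IDs, e.g. {"MLST": 1, ...}
        # The third parameter is now scheme_id (previously schema_id).
        profiler = await index.build_profiler_from_seqdefdb(False, "pubmlst_bordetella_seqdef", schemes["MLST"])
        async with profiler:
            pass  # profiler.determine_mlst_st(...) would run here

asyncio.run(main())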

View File

@@ -5,21 +5,21 @@ class BIGSDbDatabaseAPIException(Exception):
 class NoBIGSdbMatchesException(BIGSDbDatabaseAPIException):
-    def __init__(self, database_name: str, database_schema_id: int, query_name: Union[None, str], *args):
+    def __init__(self, database_name: str, database_scheme_id: int, query_name: Union[None, str], *args):
         self._query_name = query_name
-        super().__init__(f"No matches found with schema with ID {database_schema_id} in the database \"{database_name}\".", *args)
+        super().__init__(f"No matches found with scheme with ID {database_scheme_id} in the database \"{database_name}\".", *args)

     def get_causal_query_name(self) -> Union[str, None]:
         return self._query_name

 class NoBIGSdbExactMatchesException(NoBIGSdbMatchesException):
-    def __init__(self, database_name: str, database_schema_id: int, *args):
-        super().__init__(f"No exact match found with schema with ID {database_schema_id} in the database \"{database_name}\".", *args)
+    def __init__(self, database_name: str, database_scheme_id: int, *args):
+        super().__init__(f"No exact match found with scheme with ID {database_scheme_id} in the database \"{database_name}\".", *args)

 class NoSuchBIGSdbDatabaseException(BIGSDbDatabaseAPIException):
     def __init__(self, database_name: str, *args):
         super().__init__(f"No database \"{database_name}\" found.", *args)

-class NoSuchBigSdbSchemaException(BIGSDbDatabaseAPIException):
-    def __init__(self, database_name: str, database_schema_id: int, *args):
-        super().__init__(f"No schema with ID {database_schema_id} in \"{database_name}\" found.", *args)
+class NoSuchBigSdbschemeException(BIGSDbDatabaseAPIException):
+    def __init__(self, database_name: str, database_scheme_id: int, *args):
+        super().__init__(f"No scheme with ID {database_scheme_id} in \"{database_name}\" found.", *args)

View File

@@ -25,7 +25,7 @@ class SangerTraceData(NamedString):
     analysis_proto_settings_name: str
     analysis_rpto_settings_ver: str
     analysis_proto_xml_data: str
-    analysis_proto_xml_schema_ver: str
+    analysis_proto_xml_scheme_ver: str
     sample_comment: Union[None, str]
     capillary_machine: bool
     container_identifier: str