Updated BIGSdb API to be more tolerant towards failures

This commit is contained in:
Harrison Deng 2025-01-09 16:49:12 +00:00
parent d4f890a150
commit 463e320386
3 changed files with 37 additions and 13 deletions

View File

@ -9,5 +9,5 @@ class Allele:
@dataclass(frozen=True)
class MLSTProfile:
    """Immutable record describing the MLST result obtained for one sequence."""

    # Matched alleles grouped under a string key (presumably the locus
    # name; confirm against the code that builds this mapping).
    alleles: Mapping[str, Sequence[Allele]]
    # Declared as a string rather than an integer so that non-numeric
    # placeholder values can be stored as well.
    sequence_type: str
    clonal_complex: str

View File

@ -0,0 +1,16 @@
from typing import Union
class BIGSDbDatabaseAPIException(Exception):
    """Common base class for the BIGSdb database errors defined in this module."""


class NoBIGSdbExactMatchesException(BIGSDbDatabaseAPIException):
    """Raised when a query against a schema yields no exact match.

    Attributes:
        database_name: Name of the database that was queried.
        database_schema_id: ID of the schema that was queried.
    """

    def __init__(self, database_name: str, database_schema_id: int, *args):
        # Keep the structured context so handlers do not have to parse the
        # human-readable message to find out what failed.
        self.database_name = database_name
        self.database_schema_id = database_schema_id
        super().__init__(
            f"No exact match found with schema with ID {database_schema_id} in the database \"{database_name}\".",
            *args,
        )


class NoSuchBIGSdbDatabaseException(BIGSDbDatabaseAPIException):
    """Raised when the requested database name is not known.

    Attributes:
        database_name: Name of the database that could not be found.
    """

    def __init__(self, database_name: str, *args):
        self.database_name = database_name
        super().__init__(f"No database \"{database_name}\" found.", *args)


class NoSuchBigSdbSchemaException(BIGSDbDatabaseAPIException):
    """Raised when a database has no schema with the requested ID.

    Attributes:
        database_name: Name of the database that was inspected.
        database_schema_id: ID of the schema that could not be found.
    """

    def __init__(self, database_name: str, database_schema_id: int, *args):
        self.database_name = database_name
        self.database_schema_id = database_schema_id
        super().__init__(
            f"No schema with ID {database_schema_id} in \"{database_name}\" found.",
            *args,
        )

View File

@ -6,6 +6,7 @@ from aiohttp import ClientSession, ClientTimeout
from automlst.engine.data.genomics import NamedString
from automlst.engine.data.mlst import Allele, MLSTProfile
from automlst.engine.exceptions.database import NoBIGSdbExactMatchesException, NoSuchBIGSdbDatabaseException
class BIGSdbMLSTProfiler(AbstractAsyncContextManager):
@ -25,12 +26,10 @@ class BIGSdbMLSTProfiler(AbstractAsyncContextManager):
"sequence": sequence_string
})
sequence_response: dict = await response.json()
if "exact_matches" not in sequence_response:
# TODO throw exception for not finding matches.
pass
if "exact_matches" not in sequence_response:
raise ValueError(f"Unable to find exact matches in \"{self._database_name}\" under schema ID \"{self._schema_id}\".")
raise NoBIGSdbExactMatchesException(self._database_name, self._schema_id)
exact_matches: dict[str, Sequence[dict[str, str]]] = sequence_response["exact_matches"]
for allele_loci, alleles in exact_matches.items():
for allele in alleles:
@ -48,11 +47,16 @@ class BIGSdbMLSTProfiler(AbstractAsyncContextManager):
}
async with self._http_client.post(uri_path, json=request_json) as response:
response_json = await response.json()
if "fields" not in response_json:
# TODO raise exception about invalid parameters or no exact parameterization found
pass
schema_fields_returned = response_json["fields"]
if "exact_matches" not in response_json:
raise NoBIGSdbExactMatchesException(self._database_name, self._schema_id)
schema_exact_matches: dict = response_json["exact_matches"]
if "fields" not in response_json:
schema_fields_returned = {
"ST": "Unknown",
"Clonal Complex": "Unknown"
}
else:
schema_fields_returned: Mapping[str, str] = response_json["fields"]
allele_map: dict[str, list[Allele]] = defaultdict(list)
for exact_match_loci, exact_match_alleles in schema_exact_matches.items():
for exact_match_allele in exact_match_alleles:
@ -64,10 +68,14 @@ class BIGSdbMLSTProfiler(AbstractAsyncContextManager):
return await self.fetch_mlst_st(alleles)
async def profile_multiple_strings(self, namedStrings: AsyncIterable[NamedString], stop_on_fail: bool = False) -> AsyncGenerator[Union[tuple[str, MLSTProfile], tuple[str, None]], Any]:
    """Profile every named sequence in turn and yield ``(name, profile)`` pairs.

    Args:
        namedStrings: Asynchronous stream of items exposing ``name`` and
            ``sequence`` attributes.
        stop_on_fail: When true, the first sequence without an exact match
            aborts the iteration by re-raising the error. When false (the
            default), such a sequence is reported as ``(name, None)`` and
            iteration continues.

    Yields:
        A ``(name, profile)`` tuple per input item; ``profile`` is ``None``
        when no exact match was found and ``stop_on_fail`` is false.

    Raises:
        NoBIGSdbExactMatchesException: Only when ``stop_on_fail`` is true.
    """
    async for named_string in namedStrings:
        # Only the profiling call is guarded: keeping the ``yield`` outside
        # the ``try`` means an exception thrown into the generator by its
        # consumer is never mistaken for a profiling failure.
        try:
            profile = await self.profile_string(named_string.sequence)
        except NoBIGSdbExactMatchesException:
            if stop_on_fail:
                raise
            profile = None
        yield (named_string.name, profile)
async def close(self):
await self._http_client.close()
@ -107,7 +115,7 @@ class BIGSdbIndex(AbstractAsyncContextManager):
async def get_bigsdb_api_from_seqdefdb(self, seqdef_db_name: str) -> str:
    """Look up the entry registered for a sequence-definition database.

    Args:
        seqdef_db_name: Name of the database to look up among the known
            sequence-definition databases.

    Returns:
        The value stored for ``seqdef_db_name`` (presumably the base URL of
        the API hosting that database; confirm against
        ``get_known_seqdef_dbs``).

    Raises:
        NoSuchBIGSdbDatabaseException: If ``seqdef_db_name`` is not among
            the known databases.
    """
    known_databases = await self.get_known_seqdef_dbs()
    if seqdef_db_name not in known_databases:
        raise NoSuchBIGSdbDatabaseException(seqdef_db_name)
    return known_databases[seqdef_db_name]
async def get_schemas_for_seqdefdb(self, seqdef_db_name: str, force: bool = False) -> Mapping[str, int]: