Completed updating CLI to be more organized with bettert error messages
This commit is contained in:
@@ -21,7 +21,7 @@ async def write_mlst_profiles_as_csv(mlst_profiles_iterable: AsyncIterable[tuple
|
||||
writer: Union[csv.DictWriter, None] = None
|
||||
async for name, mlst_profile in mlst_profiles_iterable:
|
||||
if writer is None:
|
||||
header = ["st", "clonal-complex", "id", *mlst_profile.alleles.keys()]
|
||||
header = ["id", "st", "clonal-complex", *mlst_profile.alleles.keys()]
|
||||
writer = csv.DictWriter(filehandle, fieldnames=header)
|
||||
writer.writeheader()
|
||||
row_dictionary = {
|
||||
|
@@ -10,7 +10,9 @@ from automlst.engine.data.mlst import Allele, MLSTProfile
|
||||
class BigSDBMLSTProfiler(AbstractAsyncContextManager):
|
||||
|
||||
def __init__(self, database_api: str, database_name: str, schema_id: int):
|
||||
self._base_url = f"{database_api}/db/{database_name}/schemes/{schema_id}/"
|
||||
self._database_name = database_name
|
||||
self._schema_id = schema_id
|
||||
self._base_url = f"{database_api}/db/{self._database_name}/schemes/{self._schema_id}/"
|
||||
self._http_client = ClientSession(self._base_url, timeout=ClientTimeout(10000))
|
||||
|
||||
async def __aenter__(self):
|
||||
@@ -26,6 +28,9 @@ class BigSDBMLSTProfiler(AbstractAsyncContextManager):
|
||||
if "exact_matches" not in sequence_response:
|
||||
# TODO throw exception for not finding matches.
|
||||
pass
|
||||
|
||||
if "exact_matches" not in sequence_response:
|
||||
raise ValueError(f"Unable to find exact matches in \"{self._database_name}\" under schema ID \"{self._schema_id}\".")
|
||||
exact_matches: dict[str, Sequence[dict[str, str]]] = sequence_response["exact_matches"]
|
||||
for allele_loci, alleles in exact_matches.items():
|
||||
for allele in alleles:
|
||||
@@ -100,12 +105,15 @@ class BIGSdbIndex(AbstractAsyncContextManager):
|
||||
return self._known_seqdef_dbs_origin
|
||||
|
||||
async def get_bigsdb_api_from_seqdefdb(self, seqdef_db_name: str) -> str:
|
||||
return (await self.get_known_seqdef_dbs())[seqdef_db_name]
|
||||
known_databases = await self.get_known_seqdef_dbs()
|
||||
if seqdef_db_name not in known_databases:
|
||||
raise ValueError(f"The database \"{seqdef_db_name}\" could not be found.")
|
||||
return known_databases[seqdef_db_name]
|
||||
|
||||
async def get_schemas_for_seqdefdb(self, seqdef_db_name: str, force: bool = False) -> Mapping[str, int]:
|
||||
if self._seqdefdb_schemas[seqdef_db_name] is not None and not force:
|
||||
if seqdef_db_name in self._seqdefdb_schemas and not force:
|
||||
return self._seqdefdb_schemas[seqdef_db_name] # type: ignore since it's guaranteed to not be none by conditional
|
||||
uri_path = f"{await self.get_bigsdb_api_from_seqdefdb(seqdef_db_name)}/{seqdef_db_name}/schemes"
|
||||
uri_path = f"{await self.get_bigsdb_api_from_seqdefdb(seqdef_db_name)}/db/{seqdef_db_name}/schemes"
|
||||
async with self._http_client.get(uri_path) as response:
|
||||
response_json = await response.json()
|
||||
schema_descriptions: Mapping[str, int] = dict()
|
||||
|
Reference in New Issue
Block a user