Updated naming (again).

This commit is contained in:
2025-01-22 21:05:19 +00:00
parent 1372141b57
commit 1773bb9dcb
17 changed files with 20 additions and 20 deletions

View File

View File

View File

@@ -0,0 +1,41 @@
import csv
from os import PathLike
from typing import AsyncIterable, Mapping, Sequence, Union
from autobigs.engine.data.structures.mlst import Allele, MLSTProfile
def dict_loci_alleles_variants_from_loci(alleles_map: Mapping[str, Sequence[Allele]]):
    """Collapse a loci -> alleles mapping to a loci -> variant(s) mapping.

    A locus with exactly one allele maps to that allele's variant string;
    a locus with zero or several alleles maps to a list of variant strings.
    """
    collapsed: dict[str, Union[list[str], str]] = {}
    for locus, locus_alleles in alleles_map.items():
        if len(locus_alleles) == 1:
            collapsed[locus] = locus_alleles[0].allele_variant
        else:
            collapsed[locus] = [single.allele_variant for single in locus_alleles]
    return collapsed
async def write_mlst_profiles_as_csv(mlst_profiles_iterable: AsyncIterable[tuple[str, Union[MLSTProfile, None]]], handle: Union[str, bytes, PathLike[str], PathLike[bytes]]) -> Sequence[str]:
    """Stream MLST profiles to a CSV file.

    Writes one row per successfully typed input with columns ``id``, ``st``,
    ``clonal-complex`` followed by one column per locus; the loci columns are
    taken from the first successful profile seen. Inputs whose profile is
    ``None`` are skipped and their names collected.

    :param mlst_profiles_iterable: async stream of (name, profile-or-None) pairs.
    :param handle: path of the CSV file to create (overwritten if present).
    :return: the names whose profile was ``None`` (i.e. typing failed).
    """
    failed = list()
    with open(handle, "w", newline='') as filehandle:
        header = None
        writer: Union[csv.DictWriter, None] = None
        async for name, mlst_profile in mlst_profiles_iterable:
            if mlst_profile is None:
                failed.append(name)
                continue
            if writer is None:
                # Header is built lazily from the first successful profile so
                # the loci columns match that profile's scheme. If every input
                # fails, the file is left empty (no header row).
                header = ["id", "st", "clonal-complex", *mlst_profile.alleles.keys()]
                writer = csv.DictWriter(filehandle, fieldnames=header)
                writer.writeheader()
            row_dictionary = {
                "st": mlst_profile.sequence_type,
                "clonal-complex": mlst_profile.clonal_complex,
                "id": name,
                **dict_loci_alleles_variants_from_loci(mlst_profile.alleles)
            }
            # Pass the row positionally: "rowdict" is an undocumented internal
            # parameter name of csv.DictWriter.writerow, not part of its API.
            writer.writerow(row_dictionary)
    return failed

View File

@@ -0,0 +1,16 @@
import asyncio
from io import TextIOWrapper
from typing import Any, AsyncGenerator, Generator, Iterable, Sequence, Union
from Bio import SeqIO
from autobigs.engine.data.structures.genomics import NamedString
async def read_fasta(handle: Union[str, TextIOWrapper]) -> AsyncGenerator[NamedString, Any]:
    """Parse a FASTA source and yield each record as a NamedString.

    Parsing runs in a worker thread so the event loop is not blocked.
    """
    records = await asyncio.to_thread(SeqIO.parse, handle=handle, format="fasta")
    for record in records:
        yield NamedString(record.id, str(record.seq))
async def read_multiple_fastas(handles: Iterable[Union[str, TextIOWrapper]]) -> AsyncGenerator[NamedString, Any]:
    """Yield the records of several FASTA sources, one source after another."""
    for source in handles:
        async for record in read_fasta(source):
            yield record

View File

@@ -0,0 +1,166 @@
from collections import defaultdict
from contextlib import AbstractAsyncContextManager
from numbers import Number
from typing import Any, AsyncGenerator, AsyncIterable, Collection, Generator, Iterable, Mapping, Sequence, Union
from aiohttp import ClientSession, ClientTimeout
from autobigs.engine.data.structures.genomics import NamedString
from autobigs.engine.data.structures.mlst import Allele, PartialAllelicMatchProfile, MLSTProfile
from autobigs.engine.exceptions.database import NoBIGSdbExactMatchesException, NoBIGSdbMatchesException, NoSuchBIGSdbDatabaseException
class BIGSdbMLSTProfiler(AbstractAsyncContextManager):
    """Async client that performs MLST typing against one BIGSdb scheme.

    Each instance is bound to a single sequence-definition database and
    scheme on a BIGSdb REST API. Use it as an async context manager so the
    underlying HTTP session is closed when done.
    """

    def __init__(self, database_api: str, database_name: str, schema_id: int):
        # All requests are issued relative to this scheme-specific base URL:
        # <api>/db/<database>/schemes/<schema_id>/
        self._database_name = database_name
        self._schema_id = schema_id
        self._base_url = f"{database_api}/db/{self._database_name}/schemes/{self._schema_id}/"
        # NOTE(review): ClientTimeout's first argument is the *total* timeout
        # in seconds, so this allows up to 10000 s per request — confirm this
        # very large value is intentional.
        self._http_client = ClientSession(self._base_url, timeout=ClientTimeout(10000))

    async def __aenter__(self):
        return self

    async def fetch_mlst_allele_variants(self, sequence_string: str, exact: bool) -> AsyncGenerator[Allele, Any]:
        """POST a sequence to the scheme's ``sequence`` endpoint and yield matches.

        Exact matches are yielded with no partial-match profile. When ``exact``
        is False, partial matches are also requested and yielded with their
        identity/mismatch/bitscore/gap statistics.

        Raises NoBIGSdbExactMatchesException if ``exact`` is True but only
        partial matches exist, and NoBIGSdbMatchesException when the response
        contains neither exact nor partial matches.
        """
        # See https://bigsdb.pasteur.fr/api/db/pubmlst_bordetella_seqdef/schemes
        uri_path = "sequence"
        response = await self._http_client.post(uri_path, json={
            "sequence": sequence_string,
            "partial_matches": not exact
        })
        sequence_response: dict = await response.json()

        if "exact_matches" in sequence_response:
            # loci -> list of alleles with id and loci
            exact_matches: dict[str, Sequence[dict[str, str]]] = sequence_response["exact_matches"]
            for allele_loci, alleles in exact_matches.items():
                for allele in alleles:
                    alelle_id = allele["allele_id"]  # NOTE(review): local name is a typo for "allele_id"
                    yield Allele(allele_loci=allele_loci, allele_variant=alelle_id, partial_match_profile=None)
        elif "partial_matches" in sequence_response:
            if exact:
                # Caller demanded exact matches; partial-only is a failure.
                raise NoBIGSdbExactMatchesException(self._database_name, self._schema_id)
            partial_matches: dict[str, dict[str, Union[str, float, int]]] = sequence_response["partial_matches"]
            for allele_loci, partial_match in partial_matches.items():
                if len(partial_match) <= 0:
                    # Skip loci the API reported with an empty match object.
                    continue
                partial_match_profile = PartialAllelicMatchProfile(
                    percent_identity=float(partial_match["identity"]),
                    mismatches=int(partial_match["mismatches"]),
                    bitscore=float(partial_match["bitscore"]),
                    gaps=int(partial_match["gaps"])
                )
                yield Allele(
                    allele_loci=allele_loci,
                    allele_variant=str(partial_match["allele"]),
                    partial_match_profile=partial_match_profile
                )
        else:
            raise NoBIGSdbMatchesException(self._database_name, self._schema_id)

    async def fetch_mlst_st(self, alleles: Union[AsyncIterable[Allele], Iterable[Allele]]) -> MLSTProfile:
        """Resolve allele designations to a full MLST profile.

        Accepts a sync or async iterable of Allele, POSTs them to the scheme's
        ``designations`` endpoint and returns the resulting profile. Missing
        ST / clonal-complex fields in the response default to "unknown".

        Raises ValueError when the response yields no exact allele matches.
        """
        uri_path = "designations"
        # Group the requested allele variants by locus for the request payload.
        allele_request_dict: dict[str, list[dict[str, str]]] = defaultdict(list)
        if isinstance(alleles, AsyncIterable):
            async for allele in alleles:
                allele_request_dict[allele.allele_loci].append({"allele": str(allele.allele_variant)})
        else:
            for allele in alleles:
                allele_request_dict[allele.allele_loci].append({"allele": str(allele.allele_variant)})
        request_json = {
            "designations": allele_request_dict
        }
        async with self._http_client.post(uri_path, json=request_json) as response:
            response_json: dict = await response.json()
            allele_map: dict[str, list[Allele]] = defaultdict(list)
            response_json.setdefault("fields", dict())
            schema_fields_returned: dict[str, str] = response_json["fields"]
            schema_fields_returned.setdefault("ST", "unknown")
            schema_fields_returned.setdefault("clonal_complex", "unknown")
            # NOTE(review): unlike "fields" above, no default is provided for
            # "exact_matches", so a response lacking it raises KeyError rather
            # than ValueError — confirm the API always includes the key.
            schema_exact_matches: dict = response_json["exact_matches"]
            for exact_match_loci, exact_match_alleles in schema_exact_matches.items():
                for exact_match_allele in exact_match_alleles:
                    allele_map[exact_match_loci].append(Allele(exact_match_loci, exact_match_allele["allele_id"], None))
            if len(allele_map) == 0:
                raise ValueError("Passed in no alleles.")
            return MLSTProfile(dict(allele_map), schema_fields_returned["ST"], schema_fields_returned["clonal_complex"])

    async def profile_string(self, string: str, exact: bool = False) -> MLSTProfile:
        """Type a single sequence string: fetch its alleles, then resolve the ST."""
        alleles = self.fetch_mlst_allele_variants(string, exact)
        return await self.fetch_mlst_st(alleles)

    async def profile_multiple_strings(self, namedStrings: AsyncIterable[NamedString], exact: bool = False, stop_on_fail: bool = False) -> AsyncGenerator[tuple[str, Union[MLSTProfile, None]], Any]:
        """Profile a stream of named sequences, yielding (name, profile) pairs.

        Sequences with no database matches yield (name, None), unless
        ``stop_on_fail`` is True, in which case the exception propagates.
        """
        async for named_string in namedStrings:
            try:
                yield (named_string.name, await self.profile_string(named_string.sequence, exact))
            except NoBIGSdbMatchesException as e:
                if stop_on_fail:
                    raise e
                yield (named_string.name, None)

    async def close(self):
        """Close the underlying HTTP session."""
        await self._http_client.close()

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.close()
class BIGSdbIndex(AbstractAsyncContextManager):
    """Directory of known BIGSdb REST API installations.

    Lazily discovers which sequence-definition ("seqdef") databases each
    known API hosts and which MLST schemes each database defines, caching
    both lookups. Use as an async context manager so the HTTP session is
    closed when done.
    """

    # Root URLs of the BIGSdb REST APIs this index knows how to query.
    KNOWN_BIGSDB_APIS = {
        "https://bigsdb.pasteur.fr/api",
        "https://rest.pubmlst.org"
    }

    def __init__(self):
        self._http_client = ClientSession()
        # Cache: seqdef database name -> API origin URL (None until fetched).
        self._known_seqdef_dbs_origin: Union[Mapping[str, str], None] = None
        # Cache: seqdef database name -> {scheme description: scheme id}.
        self._seqdefdb_schemas: dict[str, Union[Mapping[str, int], None]] = dict()
        super().__init__()

    async def __aenter__(self):
        return self

    async def get_known_seqdef_dbs(self, force: bool = False) -> Mapping[str, str]:
        """Return a mapping of seqdef database name -> hosting API root URL.

        Results are cached after the first call; pass ``force=True`` to
        re-query every known API.
        """
        if self._known_seqdef_dbs_origin is not None and not force:
            return self._known_seqdef_dbs_origin
        known_seqdef_dbs = dict()
        for known_bigsdb in BIGSdbIndex.KNOWN_BIGSDB_APIS:
            async with self._http_client.get(f"{known_bigsdb}/db") as response:
                response_json_databases = await response.json()
                for database_group in response_json_databases:
                    for database_info in database_group["databases"]:
                        # Only sequence-definition databases (names ending in
                        # "seqdef") are indexed; isolate databases are skipped.
                        if str(database_info["name"]).endswith("seqdef"):
                            known_seqdef_dbs[database_info["name"]] = known_bigsdb
        self._known_seqdef_dbs_origin = dict(known_seqdef_dbs)
        return self._known_seqdef_dbs_origin

    async def get_bigsdb_api_from_seqdefdb(self, seqdef_db_name: str) -> str:
        """Return the API root URL hosting the named seqdef database.

        Raises NoSuchBIGSdbDatabaseException if no known API hosts it.
        """
        known_databases = await self.get_known_seqdef_dbs()
        if seqdef_db_name not in known_databases:
            raise NoSuchBIGSdbDatabaseException(seqdef_db_name)
        return known_databases[seqdef_db_name]

    async def get_schemas_for_seqdefdb(self, seqdef_db_name: str, force: bool = False) -> Mapping[str, int]:
        """Return {scheme description: scheme id} for the named seqdef database.

        Cached per database; pass ``force=True`` to re-fetch.
        """
        if seqdef_db_name in self._seqdefdb_schemas and not force:
            return self._seqdefdb_schemas[seqdef_db_name] # type: ignore since it's guaranteed to not be none by conditional
        uri_path = f"{await self.get_bigsdb_api_from_seqdefdb(seqdef_db_name)}/db/{seqdef_db_name}/schemes"
        async with self._http_client.get(uri_path) as response:
            response_json = await response.json()
            schema_descriptions: Mapping[str, int] = dict()
            for scheme_definition in response_json["schemes"]:
                # The scheme id is the trailing path segment of the scheme URL.
                scheme_id: int = int(str(scheme_definition["scheme"]).split("/")[-1])
                scheme_desc: str = scheme_definition["description"]
                schema_descriptions[scheme_desc] = scheme_id
            self._seqdefdb_schemas[seqdef_db_name] = schema_descriptions
            return self._seqdefdb_schemas[seqdef_db_name] # type: ignore

    async def build_profiler_from_seqdefdb(self, dbseqdef_name: str, schema_id: int) -> BIGSdbMLSTProfiler:
        """Construct a BIGSdbMLSTProfiler bound to the named database and scheme."""
        return BIGSdbMLSTProfiler(await self.get_bigsdb_api_from_seqdefdb(dbseqdef_name), dbseqdef_name, schema_id)

    async def close(self):
        """Close the underlying HTTP session."""
        await self._http_client.close()

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.close()

View File

@@ -0,0 +1,104 @@
from dataclasses import dataclass
from numbers import Number
from typing import Mapping, Sequence, Set, Union
@dataclass(frozen=True)
class StringAnnotation:
    """A typed feature annotated over a region of a sequence string."""
    type: str  # feature type label
    start: int  # start position of the annotated region
    end: int  # end position of the annotated region (inclusivity not established here — confirm against producers)
    feature_properties: Mapping[str, Set[str]]  # property name -> set of values
@dataclass(frozen=True)
class NamedString:
    """An immutable (name, sequence) pair, e.g. one FASTA record."""
    name: str  # identifier (a FASTA record id in read_fasta)
    sequence: str  # the sequence text itself
@dataclass(frozen=True)
class AnnotatedString(NamedString):
    """A NamedString carrying positional feature annotations."""
    annotations: Sequence[StringAnnotation]  # features annotated on this sequence
@dataclass(frozen=True)
class SangerTraceData(NamedString):
    """A Sanger sequencing trace: the called sequence plus instrument metadata.

    Fields mirror entries of a trace file (channels, dyes, run events,
    instrument settings). Several date/time fields are stored as strings;
    the inline comments below mark those that presumably should become
    proper date/time objects.
    """
    # --- analysis settings ---
    seq_param_file_name: str
    analysis_proto_settings_name: str
    # NOTE(review): "rpto" looks like a typo for "proto"; renaming would break
    # callers, so it is kept as-is.
    analysis_rpto_settings_ver: str
    analysis_proto_xml_data: str
    analysis_proto_xml_schema_ver: str
    sample_comment: Union[None, str]
    # --- container / machine identity ---
    capillary_machine: bool
    container_identifier: str
    container_name: str
    comment_title: str
    # --- raw channel data and run measurements ---
    channel_1: Sequence[Number]
    channel_2: Sequence[Number]
    channel_3: Sequence[Number]
    channel_4: Sequence[Number]
    measured_voltage_dv: Sequence[Number]
    measured_current_ma: Sequence[Number]
    measured_power_mw: Sequence[Number]
    measured_temperature_celsius: Sequence[Number]
    down_sample_factor: Number
    # --- dyes ---
    dye_1: str
    dye_2: str
    dye_3: str
    dye_4: str
    dye_wavelength_1: str
    dye_wavelength_2: str
    dye_wavelength_3: str
    dye_wavelength_4: str
    dye_set_name: str
    # --- electrophoresis / run events ---
    electrophoresis_voltage_setting_v: Number
    start_run_event: str
    stop_run_event: str
    start_collection_event: str
    stop_collection_event: str
    base_order: Sequence[str]
    gel_type_desc: str
    injection_time_sec: Number
    # NOTE(review): "inection" looks like a typo for "injection"; kept as-is
    # to avoid breaking callers.
    inection_voltage_v: Number
    lane_or_capillary: Number
    sample_tracking_id: str
    length_to_detector_cm: Number
    laser_power_mw: Number
    # --- instrument / software identity ---
    instrument_name_and_serial: str
    data_collection_module_file: str
    model_number: str
    pixels_avg_per_lane: Number
    number_of_capillaries: Number
    marked_off_scale_scans: Union[None, Sequence[Number]]
    # Skipped Ovrl, OvrV
    mobility_file: str
    # Skipped PRJT, PROJ
    pixel_bin_size: Number
    # Skipped scan rate
    results_group_comment: Union[None, str]
    results_group_name: str
    run_module_ver: str
    run_module_xml: str
    run_module_xml_ver: str
    run_proto_name: str
    run_proto_ver: str
    run_start_date: str # Date time object
    run_stop_date: str # Date time object
    data_collection_start_date: str
    data_collection_stop_date: str
    run_name: str
    run_start_time: str # time object
    run_stop_time: str # time object
    collection_start_time: str # time object
    collection_stop_time: str # time object
    saturated_data_points: Union[None, Sequence[Number]]
    color_rescaling_divisor: Number
    scan_count: Number
    polymer_lot_expiration: str # date time object
    polymer_lot_number: Number
    sample_name: str
    # Skipped genescan data
    # Skipped size standard file name
    data_collection_software_ver: str
    data_collection_firmware_ver: str
    # NOTE(review): "celcius" is a typo for "celsius"; kept as-is to avoid
    # breaking callers.
    run_temperature_setting_celcius: Number
    well_id: str
    plate_user_name: str

View File

@@ -0,0 +1,21 @@
from dataclasses import dataclass
from typing import Mapping, Sequence, Union
@dataclass(frozen=True)
class PartialAllelicMatchProfile:
    """Alignment statistics for a partial (non-exact) allele match."""
    percent_identity: float  # percent identity with the matched allele
    mismatches: int  # number of mismatched positions
    bitscore: float  # alignment bit score
    gaps: int  # number of gaps in the alignment
@dataclass(frozen=True)
class Allele:
    """A single allele call at one locus."""
    allele_loci: str  # name of the locus this allele belongs to
    allele_variant: str  # allele variant identifier at that locus
    partial_match_profile: Union[None, PartialAllelicMatchProfile]  # None for exact matches
@dataclass(frozen=True)
class MLSTProfile:
    """A resolved multi-locus sequence typing result."""
    alleles: Mapping[str, Sequence[Allele]]  # locus name -> allele calls at that locus
    sequence_type: str  # ST designation ("unknown" when the database reports none)
    clonal_complex: str  # clonal complex ("unknown" when the database reports none)

View File

@@ -0,0 +1,21 @@
from typing import Union
class BIGSDbDatabaseAPIException(Exception):
    """Base class for all errors raised while talking to a BIGSdb REST API."""
    pass
class NoBIGSdbMatchesException(BIGSDbDatabaseAPIException):
    """Raised when a sequence query against a BIGSdb scheme yields no matches."""

    def __init__(self, database_name: str, database_schema_id: int, *args):
        message = f"No matches found with schema with ID {database_schema_id} in the database \"{database_name}\"."
        super().__init__(message, *args)
class NoBIGSdbExactMatchesException(NoBIGSdbMatchesException):
    """Raised when exact matching was requested but only partial matches exist."""

    def __init__(self, database_name: str, database_schema_id: int, *args):
        message = f"No exact match found with schema with ID {database_schema_id} in the database \"{database_name}\"."
        # Initialize the exception base directly: delegating to
        # NoBIGSdbMatchesException.__init__ would bind this message to its
        # database_name parameter and raise TypeError for the missing
        # database_schema_id argument (and overwrite the message otherwise).
        Exception.__init__(self, message, *args)
class NoSuchBIGSdbDatabaseException(BIGSDbDatabaseAPIException):
    """Raised when a requested database is not hosted by any known BIGSdb API."""

    def __init__(self, database_name: str, *args):
        message = f"No database \"{database_name}\" found."
        super().__init__(message, *args)
class NoSuchBigSdbSchemaException(BIGSDbDatabaseAPIException):
    """Raised when a database does not define the requested scheme ID.

    NOTE(review): class name spells "BigSdb" unlike the "BIGSdb" of its
    siblings; renaming would break callers, so it is kept as-is.
    """

    def __init__(self, database_name: str, database_schema_id: int, *args):
        message = f"No schema with ID {database_schema_id} in \"{database_name}\" found."
        super().__init__(message, *args)