Renaming project to NSBDiagnosisToolkit

This commit is contained in:
2025-01-03 19:56:55 +00:00
parent 6ff0dca3ae
commit 2cd56ca1ec
23 changed files with 29 additions and 29 deletions

View File

@@ -0,0 +1,23 @@
from os import path
from typing import Any, AsyncGenerator, AsyncIterable, Iterable, Sequence
from nsbdiagnosistoolkit.engine.data.MLST import MLSTProfile
from nsbdiagnosistoolkit.engine.data.genomics import NamedString
from nsbdiagnosistoolkit.engine.local.abif import read_abif
from nsbdiagnosistoolkit.engine.local.fasta import read_fasta
from nsbdiagnosistoolkit.engine.remote.databases.institutpasteur.profiling import InstitutPasteurProfiler
async def aggregate_sequences(fastas: Iterable[str], abifs: Iterable[str]) -> AsyncGenerator[str, Any]:
    """Yield every sequence string found in the given FASTA and ABIF files.

    All FASTA paths are processed first, in the order given, yielding the
    ``sequence`` of each record produced by ``read_fasta``. Afterwards each
    ABIF path is read with ``read_abif`` and contributes exactly one string,
    built by joining the elements of the trace's ``sequence``.

    Args:
        fastas: Paths of FASTA files to read.
        abifs: Paths of ABIF files to read.

    Yields:
        One string per FASTA record, then one string per ABIF file.
    """
    for fasta_file in fastas:
        async for record in read_fasta(fasta_file):
            yield record.sequence

    for abif_file in abifs:
        trace = await read_abif(abif_file)
        yield "".join(trace.sequence)
async def profile_all_genetic_strings(strings: AsyncIterable[str], database_name: str) -> Sequence[MLSTProfile]:
    """Profile every string in ``strings`` against one Institut Pasteur database.

    A single ``InstitutPasteurProfiler`` is opened for ``database_name`` and
    reused for all strings; the strings are profiled one after another, in the
    order the async iterable produces them.

    Args:
        strings: Async iterable of sequence strings to profile.
        database_name: Name handed to ``InstitutPasteurProfiler``.

    Returns:
        A list with one profile per input string, in input order.
    """
    async with InstitutPasteurProfiler(database_name=database_name) as profiler:
        return [
            await profiler.profile_string(genetic_string)
            async for genetic_string in strings
        ]

View File

@@ -0,0 +1,58 @@
import argparse
import asyncio
from os import path
from nsbdiagnosistoolkit.cli import aggregator
from nsbdiagnosistoolkit.engine.data.genomics import NamedString
from nsbdiagnosistoolkit.engine.local.abif import read_abif
from nsbdiagnosistoolkit.engine.local.csv import write_mlst_profiles_as_csv
from nsbdiagnosistoolkit.engine.local.fasta import read_fasta
# Command-line definition for the toolkit. Both file options accumulate into a
# list (action='extend'), so each flag may be repeated and may take several
# paths at once.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--fasta", "-fa", "-fst",
    nargs="+",
    action='extend',
    dest="fastas",
    required=False,
    default=[],
    type=str,
    help="The FASTA files to process. Multiple can be listed."
)
parser.add_argument(
    "--abif", "-abi", "-ab1",
    # nargs is required with action='extend': without it argparse hands the
    # action a single string and list.extend() splits the path into
    # individual characters.
    nargs="+",
    action='extend',
    dest="abifs",
    required=False,
    default=[],
    type=str,
    help="The ABIF files to process. Multiple can be listed."
)
parser.add_argument(
    "--institut-pasteur-mlst",
    "-ipdbmlst",
    dest="institut_pasteur_db",
    type=str,
    help="The Institut Pasteur MLST database to use."
)
parser.add_argument(
    "-csv",
    dest="csv_path",
    required=False,
    default=None,
    help="The destination to place the CSV output."
)
def cli():
    """Entry point: profile the requested files and write the result as CSV.

    Parses the command line, gathers all sequences from the FASTA/ABIF files,
    profiles them against the selected Institut Pasteur database and writes
    the profiles to the CSV destination.

    Exits through ``parser.error`` (usage message, exit status 2) when the
    database name or the CSV destination is missing. Previously a missing
    destination silently produced a file literally named "None" and a missing
    database name was interpolated into the request URL as "None".
    """
    args = parser.parse_args()
    if args.institut_pasteur_db is None:
        parser.error("an Institut Pasteur MLST database is required (--institut-pasteur-mlst)")
    if args.csv_path is None:
        parser.error("a CSV output destination is required (-csv)")

    async def _profile_and_write():
        # Everything runs inside one event loop so the HTTP session used for
        # profiling is created and closed on the same loop.
        gen_strings = aggregator.aggregate_sequences(args.fastas, args.abifs)
        mlst_profiles = await aggregator.profile_all_genetic_strings(
            gen_strings, args.institut_pasteur_db)
        await write_mlst_profiles_as_csv(mlst_profiles, str(args.csv_path))

    asyncio.run(_profile_and_write())


if __name__ == "__main__":
    cli()

View File

@@ -0,0 +1,44 @@
import asyncio
from collections.abc import Set
from typing import Any, Generator, List, Sequence
from Bio.Align import PairwiseAligner
from Bio import Entrez
from Bio import SeqIO
import numpy as np
from nsbdiagnosistoolkit.engine.data.genomics import AnnotatedString, StringAnnotation
from nsbdiagnosistoolkit.engine.remote.databases.ncbi.genbank import fetch_ncbi_genbank
async def annotate_from_genbank(genbank_id: str, query_name: str, query_string: str, max_annotation_length:int = 512, gene_targets:Set = frozenset()):
    """Transfer gene annotations from a GenBank reference onto a query string.

    The reference is fetched with ``fetch_ncbi_genbank``. Every reference
    annotation of type "gene" that carries a "gene" property is sliced out of
    the reference sequence and locally aligned against ``query_string``; the
    extent of the chosen alignment on the query becomes the new annotation.

    Args:
        genbank_id: Identifier passed to ``fetch_ncbi_genbank``.
        query_name: Name given to the returned annotated string.
        query_string: Sequence to annotate.
        max_annotation_length: Reference features longer than this are
            skipped; a value of 0 or less disables the limit.
        gene_targets: When non-empty, only features whose "gene" property
            shares at least one value with this set are considered.

    Returns:
        An ``AnnotatedString`` holding ``query_string`` and one annotation per
        successfully aligned reference gene.
    """
    # TODO implement asynchronous alignment algorithm
    reference_annotations = await fetch_ncbi_genbank(genbank_id=genbank_id)
    aligner = PairwiseAligner("blastn")
    aligner.mode = "local"
    query_annotations = []
    for annotation in reference_annotations.annotations:
        if annotation.type != "gene" or "gene" not in annotation.feature_properties:
            continue
        # The "gene" key is guaranteed to exist past the guard above.
        if gene_targets and not annotation.feature_properties["gene"].intersection(gene_targets):
            continue
        if max_annotation_length > 0 and annotation.end - annotation.start > max_annotation_length:
            # TODO implement a failsafe
            continue
        feature_string_sequence = get_feature_coding(annotated_string=reference_annotations, string_annotation=annotation)
        alignments = aligner.align(query_string, feature_string_sequence)
        if len(alignments) < 1:
            # TODO implement a failsafe
            continue
        # min() selects the same element as sorted(...)[0] without first
        # building a list of every alignment.
        top_alignment = min(alignments)
        # TODO Check if alternatives are better
        query_blocks = top_alignment.aligned[0]
        query_annotations.append(StringAnnotation(
            type=annotation.type,  # same as original
            # int() converts the numpy scalar to the plain int the field declares.
            start=int(np.min(query_blocks)),  # We only care about the start of first chunk
            end=int(np.max(query_blocks)),  # and the end of the last chunk
            feature_properties=dict(annotation.feature_properties)  # same as original
        ))
    return AnnotatedString(name=query_name, sequence=query_string, annotations=query_annotations)
def get_feature_coding(annotated_string: AnnotatedString, string_annotation: StringAnnotation) -> str:
    """Return the part of the sequence covered by an annotation.

    The annotation's ``start`` is inclusive and its ``end`` exclusive, exactly
    as in a Python slice.
    """
    covered = slice(string_annotation.start, string_annotation.end)
    return annotated_string.sequence[covered]

View File

@@ -0,0 +1,13 @@
from dataclasses import dataclass
from typing import Mapping, Sequence
@dataclass
class Allele:
    """A single allele: the locus it belongs to and its variant identifier."""

    # Name of the locus this allele was found at.
    allele_loci: str
    # Identifier of the variant at that locus.
    allele_variant: str
@dataclass
class MLSTProfile:
    """An MLST result: the alleles per locus plus sequence type and clonal complex."""

    # Alleles grouped by locus name.
    alleles: Mapping[str, Sequence[Allele]]
    # Sequence type ("ST") assigned to this combination of alleles.
    sequence_type: int
    # Clonal complex reported together with the sequence type.
    clonal_complex: str

View File

@@ -0,0 +1,105 @@
from dataclasses import dataclass
from numbers import Number
from typing import Mapping, Sequence, Set, Union
@dataclass
class StringAnnotation:
    """A typed, half-open span on a string together with its properties."""

    # Kind of feature, e.g. "gene".
    type: str
    # Index of the first covered character.
    start: int
    # Index one past the last covered character.
    end: int
    # Property name -> set of values attached to the feature.
    feature_properties: Mapping[str, Set[str]]
@dataclass
class NamedString:
    """A sequence string paired with a name."""

    # Label identifying the sequence.
    name: str
    # The sequence itself.
    sequence: str
@dataclass
class AnnotatedString(NamedString):
    """A named sequence string that additionally carries annotations."""

    # Annotations located on ``sequence``.
    annotations: Sequence[StringAnnotation]
@dataclass
class SangerTraceData:
    """Values read from one Sanger sequencing trace (ABIF) file.

    Field order is significant: ``read_abif`` builds instances positionally,
    so inserting, removing or reordering fields silently shifts every value
    that follows.
    """

    sequence: Sequence[str]
    seq_param_file_name: str
    analysis_proto_settings_name: str
    # NOTE(review): "rpto" looks like a typo of "proto"; kept because renaming
    # would change the public field name.
    analysis_rpto_settings_ver: str
    analysis_proto_xml_data: str
    analysis_proto_xml_schema_ver: str
    sample_comment: Union[None, str]
    capillary_machine: bool
    container_identifier: str
    container_name: str
    comment_title: str
    channel_1: Sequence[Number]
    channel_2: Sequence[Number]
    channel_3: Sequence[Number]
    channel_4: Sequence[Number]
    measured_voltage_dv: Sequence[Number]
    measured_current_ma: Sequence[Number]
    measured_power_mw: Sequence[Number]
    measured_temperature_celsius: Sequence[Number]
    down_sample_factor: Number
    dye_1: str
    dye_2: str
    dye_3: str
    dye_4: str
    dye_wavelength_1: str
    dye_wavelength_2: str
    dye_wavelength_3: str
    dye_wavelength_4: str
    dye_set_name: str
    electrophoresis_voltage_setting_v: Number
    start_run_event: str
    stop_run_event: str
    start_collection_event: str
    stop_collection_event: str
    base_order: Sequence[str]
    gel_type_desc: str
    injection_time_sec: Number
    # NOTE(review): "inection" looks like a typo of "injection"; kept because
    # renaming would change the public field name.
    inection_voltage_v: Number
    lane_or_capillary: Number
    sample_tracking_id: str
    length_to_detector_cm: Number
    laser_power_mw: Number
    instrument_name_and_serial: str
    data_collection_module_file: str
    model_number: str
    pixels_avg_per_lane: Number
    number_of_capillaries: Number
    marked_off_scale_scans: Union[None, Sequence[Number]]
    # Skipped Ovrl, OvrV
    mobility_file: str
    # Skipped PRJT, PROJ
    pixel_bin_size: Number
    # Skipped scan rate
    results_group_comment: Union[None, str]
    results_group_name: str
    run_module_ver: str
    run_module_xml: str
    run_module_xml_ver: str
    run_proto_name: str
    run_proto_ver: str
    run_start_date: str # Date time object
    run_stop_date: str # Date time object
    data_collection_start_date: str
    data_collection_stop_date: str
    run_name: str
    run_start_time: str # time object
    run_stop_time: str # time object
    collection_start_time: str # time object
    collection_stop_time: str # time object
    saturated_data_points: Union[None, Sequence[Number]]
    color_rescaling_divisor: Number
    scan_count: Number
    polymer_lot_expiration: str # date time object
    polymer_lot_number: Number
    sample_name: str
    # Skipped genescan data
    # Skipped size standard file name
    data_collection_software_ver: str
    data_collection_firmware_ver: str
    # NOTE(review): "celcius" is spelled "celsius" in
    # measured_temperature_celsius above; kept because renaming would change
    # the public field name.
    run_temperature_setting_celcius: Number
    well_id: str
    plate_user_name: str

View File

@@ -0,0 +1,104 @@
import asyncio
from numbers import Number
from os import path
from typing import Sequence, Union
from nsbdiagnosistoolkit.engine.data.genomics import SangerTraceData
from Bio.SeqRecord import SeqRecord
from Bio import SeqIO
def _biopython_read_abif_sequence(seq_path: str) -> SeqRecord:
    """Open ``seq_path`` in binary mode and parse it as a single "abi" record."""
    with open(seq_path, "rb") as abif_stream:
        record = SeqIO.read(abif_stream, "abi")
    return record
# Keys looked up in Biopython's "abif_raw" mapping, in the exact order of the
# SangerTraceData fields that follow ``sequence`` (the dataclass is filled
# positionally, so this order must not change independently of it).
# NOTE(review): the last-in-group keys "Satd1", "SMLt1", "TUBE1" and "User1"
# previously lacked the trailing tag number, unlike every other key here;
# Biopython keys abif_raw by tag name plus tag number — confirm against a
# real trace file.
_ABIF_TAG_ORDER = (
    "APFN2", "APrN1", "APrV1", "APrX1", "APXV1",
    "CMNT1", "CpEP1", "CTID1", "CTNM1", "CTTL1",
    "DATA1", "DATA2", "DATA3", "DATA4",
    "DATA5", "DATA6", "DATA7", "DATA8",
    "DSam1",
    "DyeN1", "DyeN2", "DyeN3", "DyeN4",
    "DyeW1", "DyeW2", "DyeW3", "DyeW4",
    "DySN1", "EPVt1",
    "EVNT1", "EVNT2", "EVNT3", "EVNT4",
    "FWO_1", "GTyp1", "InSc1", "InVt1", "LANE1", "LIMS1", "LNTD1", "LsrP1",
    "MCHN1", "MODF1", "MODL1", "NAVG1", "NLNE1", "OfSc1", "PDMF1", "PXLB1",
    "RGCm1", "RGNm1", "RMdV1", "RMdX1", "RMXV1", "RPrN1", "RPrV1",
    "RUND1", "RUND2", "RUND3", "RUND4",
    "RunN1",
    "RUNT1", "RUNT2", "RUNT3", "RUNT4",
    "Satd1", "Scal1", "SCAN1", "SMED1", "SMLt1", "SMPL1",
    "SVER1", "SVER3", "Tmpr1", "TUBE1", "User1",
)


async def read_abif(seq_path: str) -> SangerTraceData:
    """Read an ABIF trace file into a ``SangerTraceData``.

    The file is parsed in a worker thread so the event loop is not blocked.
    Tags absent from the file yield ``None`` for the corresponding field.

    Args:
        seq_path: Path of the trace file; must end in ".ab1" or ".abi"
            (case-insensitive).

    Returns:
        The trace's sequence followed by the values of the known ABIF tags.

    Raises:
        ValueError: If ``seq_path`` has any other file extension.
    """
    # splitext keeps the leading dot, so both accepted suffixes must carry it
    # (comparing against "abi" without the dot rejected every .abi file).
    ext = path.splitext(seq_path)[1].lower()
    if ext not in (".ab1", ".abi"):
        raise ValueError(
            'seq_path must have file extension of "ab1", or "abi".')
    biopython_seq = await asyncio.to_thread(_biopython_read_abif_sequence, seq_path)
    # Type checking is ignored below since Biopython did not define its typing.
    biopython_abif_raw = biopython_seq.annotations["abif_raw"]  # type: ignore
    return SangerTraceData(
        biopython_seq.seq,
        *(biopython_abif_raw.get(tag) for tag in _ABIF_TAG_ORDER),  # type: ignore
    )

View File

@@ -0,0 +1,31 @@
import csv
from io import TextIOWrapper
from os import PathLike
from typing import AsyncIterable, Iterable, Mapping, Sequence, Union
from nsbdiagnosistoolkit.engine.data.MLST import Allele, MLSTProfile
def loci_alleles_variants_from_loci(alleles_map: Mapping[str, Sequence[Allele]]):
    """Map each locus to the list of ``allele_variant`` values of its alleles.

    Key order and the order of alleles within each locus are preserved; a
    locus with no alleles maps to an empty list.
    """
    return {
        locus: [allele.allele_variant for allele in locus_alleles]
        for locus, locus_alleles in alleles_map.items()
    }
async def write_mlst_profiles_as_csv(mlst_profiles_iterable: Iterable[MLSTProfile], handle: Union[str, bytes, PathLike[str], PathLike[bytes]]):
    """Write MLST profiles to a CSV file, one row per profile.

    The columns are "st", "clonal-complex" and then one column per locus,
    taken from the first profile. Each locus cell holds the list of allele
    variants returned by ``loci_alleles_variants_from_loci``.

    Args:
        mlst_profiles_iterable: Profiles to write; consumed once.
        handle: Path of the file to create or overwrite.

    An empty iterable produces a file containing only the two fixed header
    columns instead of raising ``IndexError``.
    """
    mlst_profiles = list(mlst_profiles_iterable)
    header = ["st", "clonal-complex"]
    if mlst_profiles:
        # The loci of the first profile define the remaining columns.
        header.extend(mlst_profiles[0].alleles.keys())
    # newline='' lets the csv module control line endings itself.
    with open(handle, "w", newline='') as filehandle:
        writer = csv.DictWriter(filehandle, fieldnames=header)
        writer.writeheader()
        for mlst_profile in mlst_profiles:
            row_dictionary = {
                "st": mlst_profile.sequence_type,
                "clonal-complex": mlst_profile.clonal_complex,
                **loci_alleles_variants_from_loci(mlst_profile.alleles)
            }
            writer.writerow(rowdict=row_dictionary)

View File

@@ -0,0 +1,11 @@
import asyncio
from io import TextIOWrapper
from typing import Any, AsyncGenerator, Generator, Sequence, Union
from Bio import SeqIO
from nsbdiagnosistoolkit.engine.data.genomics import NamedString
async def read_fasta(handle: Union[str, TextIOWrapper]) -> AsyncGenerator[NamedString, Any]:
    """Asynchronously yield every record of a FASTA file as a ``NamedString``.

    Args:
        handle: Path or open text stream handed to ``SeqIO.parse``.

    Yields:
        One ``NamedString`` per record, made of the record's ``id`` and its
        sequence converted to ``str``.
    """
    # SeqIO.parse only returns an iterator; the parsing happens while that
    # iterator is advanced. Each step is therefore run in a worker thread as
    # well, otherwise the file would still be read on the event loop.
    end_of_records = object()
    fasta_sequences = await asyncio.to_thread(SeqIO.parse, handle=handle, format="fasta")
    while True:
        # The sentinel default keeps StopIteration from crossing the thread
        # boundary.
        fasta_sequence = await asyncio.to_thread(next, fasta_sequences, end_of_records)
        if fasta_sequence is end_of_records:
            break
        yield NamedString(fasta_sequence.id, str(fasta_sequence.seq))

View File

@@ -0,0 +1,62 @@
from collections import defaultdict
from contextlib import AbstractAsyncContextManager
import re
from typing import Any, AsyncGenerator, AsyncIterable, Generator, Iterable, Sequence, Union
from aiohttp import ClientSession, ClientTimeout
from nsbdiagnosistoolkit.engine.data.MLST import Allele, MLSTProfile
from nsbdiagnosistoolkit.engine.data.genomics import NamedString
class InstitutPasteurProfiler(AbstractAsyncContextManager):
    """MLST profiler backed by the Institut Pasteur BIGSdb REST API.

    Intended for use as an async context manager; leaving the context closes
    the underlying HTTP session. All requests target scheme 3 of the selected
    database.
    """

    def __init__(self, database_name: str):
        """Create the HTTP session for the database called ``database_name``."""
        self._base_url = f"https://bigsdb.pasteur.fr/api/db/{database_name}/"
        # NOTE(review): 10000 is passed as ClientTimeout's first positional
        # field (the total timeout in seconds per aiohttp's docs) — confirm
        # such a long limit is intended.
        self._http_client = ClientSession(self._base_url, timeout=ClientTimeout(10000))

    async def __aenter__(self):
        return self

    async def fetch_mlst_allele_variants(self, sequence_string: str) -> AsyncGenerator[Allele, Any]:
        """Yield an ``Allele`` for every exact match the API reports.

        Args:
            sequence_string: Sequence posted to the "sequence" endpoint.

        Raises:
            KeyError: If the response carries no "exact_matches" entry.
        """
        # See https://bigsdb.pasteur.fr/api/db/pubmlst_bordetella_seqdef/schemes
        uri_path = "schemes/3/sequence"
        # async with hands the connection back even if decoding fails.
        async with self._http_client.post(uri_path, json={
            "sequence": sequence_string
        }) as response:
            sequence_response: dict = await response.json()
        exact_matches: dict[str, Sequence[dict[str, str]]] = sequence_response["exact_matches"]
        for allele_loci, alleles in exact_matches.items():
            for allele in alleles:
                yield Allele(allele_loci=allele_loci, allele_variant=allele["allele_id"])

    async def fetch_mlst_st(self, alleles: Union[AsyncIterable[Allele], Iterable[Allele]]) -> MLSTProfile:
        """Resolve a set of alleles to an ``MLSTProfile``.

        Args:
            alleles: Alleles to submit, as a plain or an async iterable.

        Returns:
            A profile built from the "exact_matches" and the "ST" and
            "clonal_complex" fields of the response.

        Raises:
            KeyError: If the response lacks "fields", "exact_matches", "ST"
                or "clonal_complex".
        """
        uri_path = "schemes/3/designations"
        # Drain either kind of iterable first so the request is built by a
        # single loop.
        if isinstance(alleles, AsyncIterable):
            submitted_alleles = [allele async for allele in alleles]
        else:
            submitted_alleles = list(alleles)
        allele_request_dict: dict[str, list[dict[str, str]]] = defaultdict(list)
        for allele in submitted_alleles:
            allele_request_dict[allele.allele_loci].append({"allele": str(allele.allele_variant)})
        async with self._http_client.post(uri_path, json={
            "designations": allele_request_dict
        }) as response:
            response_json = await response.json()
        schema_fields_returned = response_json["fields"]
        schema_exact_matches = response_json["exact_matches"]
        allele_map: dict[str, list[Allele]] = defaultdict(list)
        for exact_match_loci, exact_match_alleles in schema_exact_matches.items():
            for exact_match_allele in exact_match_alleles:
                allele_map[exact_match_loci].append(Allele(exact_match_loci, exact_match_allele["allele_id"]))
        return MLSTProfile(allele_map, schema_fields_returned["ST"], schema_fields_returned["clonal_complex"])

    async def profile_string(self, string: str) -> MLSTProfile:
        """Look up the alleles of ``string`` and resolve them to a profile."""
        alleles = self.fetch_mlst_allele_variants(string)
        return await self.fetch_mlst_st(alleles)

    async def close(self):
        """Close the underlying HTTP session."""
        await self._http_client.close()

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.close()

View File

@@ -0,0 +1,27 @@
import asyncio
from Bio import Entrez
from Bio import SeqIO
# TODO Change this out for a more professional approach
# NOTE(review): this sets a hard-coded address on the Entrez module as a side
# effect of importing this file; presumably it should come from configuration
# — confirm.
Entrez.email = "yunyangdeng@outlook.com"
from nsbdiagnosistoolkit.engine.data.genomics import AnnotatedString, StringAnnotation
def _download_genbank_record(genbank_id: str):
    """Blocking helper: fetch one GenBank record from NCBI and parse it."""
    with Entrez.efetch(db="nucleotide", id=genbank_id, rettype="gb", retmode="text") as fetch_stream:
        return SeqIO.read(fetch_stream, "genbank")


async def fetch_ncbi_genbank(genbank_id: str) -> AnnotatedString:
    """Fetch a GenBank record and convert it to an ``AnnotatedString``.

    The download and the parsing both run in a worker thread, so neither the
    request nor reading the response stream blocks the event loop.

    Args:
        genbank_id: Identifier of the nucleotide record to fetch.

    Returns:
        An ``AnnotatedString`` named after ``genbank_id`` holding the record's
        sequence and one ``StringAnnotation`` per feature, with every
        qualifier value list converted to a set.
    """
    record = await asyncio.to_thread(_download_genbank_record, genbank_id)
    sequence_features = []
    for feature in record.features:
        sequence_features.append(StringAnnotation(
            type=feature.type,
            start=int(feature.location.start),
            # Biopython locations follow Python slice conventions, so ``end``
            # is already exclusive; adding 1 made every feature one base too
            # long.
            end=int(feature.location.end),
            # Build a fresh mapping rather than rewriting the record's own
            # qualifiers in place.
            feature_properties={
                qualifier_key: set(qualifier_values)
                for qualifier_key, qualifier_values in feature.qualifiers.items()
            }
        ))
    return AnnotatedString(name=genbank_id, sequence=str(record.seq), annotations=sequence_features)

View File

@@ -0,0 +1,5 @@
import logging
from aiohttp import web
# Web application object, logging through this module's logger.
webapp = web.Application(logger=logging.getLogger(__name__))
# RouteTableDef must be instantiated: its get/post/... decorators are
# instance methods, so the bare class cannot be used to register handlers.
routes = web.RouteTableDef()