restructured MLST profiling classes

This commit is contained in:
Harrison Deng 2025-01-08 16:24:42 +00:00
parent e93296705d
commit c3a492fa8f
9 changed files with 138 additions and 11 deletions

View File

@ -4,7 +4,7 @@ from automlst.engine.data.MLST import MLSTProfile
from automlst.engine.data.genomics import NamedString
from automlst.engine.local.abif import read_abif
from automlst.engine.local.fasta import read_fasta
from automlst.engine.remote.databases.institutpasteur.profiling import InstitutPasteurProfiler
from automlst.engine.remote.databases.institutpasteur.mlst import InstitutPasteurProfiler
async def aggregate_sequences(fastas: Iterable[str], abifs: Iterable[str]) -> AsyncGenerator[str, Any]:

View File

@ -4,7 +4,7 @@ import datetime
from os import path
import os
from automlst.cli import aggregator
from automlst.cli import aggregated
from automlst.engine.data.genomics import NamedString
from automlst.engine.local.abif import read_abif
from automlst.engine.local.csv import write_mlst_profiles_as_csv
@ -39,6 +39,22 @@ parser.add_argument(
type=str,
help="The ABIF files to process. Multiple can be listed."
)
# Optional reference given as an NCBI GenBank accession ID. Per the help text,
# either this or --assembly-reference must be supplied when ABIF files are used.
parser.add_argument(
"--ncbi-assembly-reference", "-refncbi",
dest="ncbi_assembly_reference",
required=False,
default=None,
type=str,
help="The NCBI GenBank accession ID for the consensus assembly. Either this argument, or the path equivalent must be given if ABIF files are used."
)
# Optional reference given as a path to a local FASTA file (the "path
# equivalent" of --ncbi-assembly-reference).
parser.add_argument(
"--assembly-reference", "-ref",
dest="assembly_reference",
required=False,
default=None,
type=str,
help="The path to the FASTA sequence to be used as a reference for consensus building."
)
parser.add_argument(
"--institut-pasteur-mlst",
"-ipdbmlst",
@ -51,16 +67,16 @@ parser.add_argument(
parser.add_argument(
"out",
default="./.",
help="The output folder. Files will be named by the provided (or default) run name."
help="The output folder. Files will be named by the provided (or default) run name. Outputs will be automatically generated depending on which arguments are used."
)
def cli():
args = parser.parse_args()
gen_strings = aggregator.aggregate_sequences(args.fastas, args.abifs)
gen_strings = aggregated.aggregate_sequences(args.fastas, args.abifs)
os.makedirs(args.out, exist_ok=True)
if args.institut_pasteur_db is not None:
mlst_profiles = aggregator.profile_all_genetic_strings(
mlst_profiles = aggregated.profile_all_genetic_strings(
gen_strings, args.institut_pasteur_db)
asyncio.run(write_mlst_profiles_as_csv(
asyncio.run(mlst_profiles), str(path.join(args.out, "MLST_" + args.run_name + ".csv"))))

View File

@ -20,8 +20,7 @@ class AnnotatedString(NamedString):
annotations: Sequence[StringAnnotation]
@dataclass
class SangerTraceData:
sequence: Sequence[str]
class SangerTraceData(NamedString):
seq_param_file_name: str
analysis_proto_settings_name: str
analysis_rpto_settings_ver: str

View File

@ -1,10 +1,10 @@
import asyncio
from numbers import Number
from os import path
from typing import Sequence, Union
from automlst.engine.data.genomics import SangerTraceData
from typing import AsyncGenerator, Collection, Sequence, Union
from automlst.engine.data.genomics import NamedString, SangerTraceData
from Bio.SeqRecord import SeqRecord
from Bio import SeqIO
from Bio import SeqIO, Align
def _biopython_read_abif_sequence(seq_path: str) -> SeqRecord:
@ -23,6 +23,7 @@ async def read_abif(seq_path: str) -> SangerTraceData:
# Lot of type ignoring since Biopython did not define their typing.
biopython_abif_raw = biopython_annotations["abif_raw"] # type: ignore
trace_data = SangerTraceData(
path.basename(seq_path),
biopython_seq.seq,
biopython_abif_raw.get("APFN2"), # type: ignore
biopython_abif_raw.get("APrN1"), # type: ignore
@ -102,3 +103,13 @@ async def read_abif(seq_path: str) -> SangerTraceData:
biopython_abif_raw.get("User") # type: ignore
)
return trace_data
def _biopython_local_pairwise_alignment(reference: NamedString, query: NamedString) -> tuple[NamedString, NamedString]:
    """Locally align ``query`` against ``reference`` with Biopython.

    A ``PairwiseAligner`` configured with the predefined ``"blastn"`` scoring
    scheme is run in ``"local"`` mode over the two ``.sequence`` values.

    Args:
        reference: The named sequence to align against.
        query: The named sequence being aligned.

    Returns:
        A ``(reference, query)`` pair of ``NamedString`` objects built from
        the two sequences of the selected alignment.

    Raises:
        IndexError: If the aligner produced no alignment.
    """
    aligner = Align.PairwiseAligner(scoring="blastn")
    aligner.mode = "local"
    alignments = aligner.align(reference.sequence, query.sequence)
    # Every alignment yielded by the aligner has the same (optimal) score, so
    # take the first one directly. Sorting the whole result set would force
    # enumeration of all co-optimal alignments, which can be enormous.
    best_alignment = alignments[0]
    aligned_reference = best_alignment.sequences[0]
    aligned_query = best_alignment.sequences[1]
    # NOTE(review): `.id` / `.seq` assume the aligned inputs are
    # SeqRecord-like objects; confirm the runtime type of NamedString.sequence.
    return (
        NamedString(aligned_reference.id, aligned_reference.seq),
        NamedString(aligned_query.id, aligned_query.seq),
    )
async def reference_consensus_assembly(reference: NamedString, sanger_traces: Collection[SangerTraceData]) -> AsyncGenerator[NamedString, None]:
    """Align each Sanger trace against ``reference`` and yield the query side.

    Each alignment runs in a worker thread via ``asyncio.to_thread`` so the
    event loop is not blocked. Traces are processed one after another, in the
    iteration order of ``sanger_traces``.

    Args:
        reference: The reference sequence every trace is aligned against.
        sanger_traces: The traces to align.

    Yields:
        For each trace, the second element (the query) of the pair returned by
        ``_biopython_local_pairwise_alignment``.
    """
    # The return annotation spells out both type arguments: the one-argument
    # form of typing.AsyncGenerator is rejected before Python 3.13.
    for sanger_trace in sanger_traces:
        _, aligned_query = await asyncio.to_thread(
            _biopython_local_pairwise_alignment, reference, sanger_trace
        )
        yield aligned_query

View File

@ -0,0 +1,33 @@
from abc import abstractmethod
from contextlib import AbstractAsyncContextManager
from typing import AsyncGenerator, AsyncIterable, Generator, Iterable, Mapping, Union
from aiohttp import ClientSession
from automlst.engine.data.MLST import Allele, MLSTProfile
# Base REST API URLs of the MLST database hosts listed by this module
# (Institut Pasteur BIGSdb and PubMLST, going by the hostnames).
MLST_DATABASES = [
"https://bigsdb.pasteur.fr/api/db",
"https://rest.pubmlst.org/db"
]
class MLSTProfiler(AbstractAsyncContextManager):
    """Abstract interface for an asynchronous MLST profiling client.

    Concrete profilers are async context managers that implement every method
    below.
    """

    # The return annotation spells out both type arguments: the one-argument
    # form of typing.AsyncGenerator is rejected before Python 3.13, which
    # would make this module fail at import time.
    @abstractmethod
    def fetch_mlst_allele_variants(self, schema_id: int, sequence_string: str) -> AsyncGenerator[Allele, None]:
        """Return an async generator of alleles for ``sequence_string``.

        Args:
            schema_id: Identifier of the scheme to query.
            sequence_string: The sequence to look up.
        """

    @abstractmethod
    async def fetch_mlst_st(self, schema_id: int, alleles: AsyncIterable[Allele]) -> MLSTProfile:
        """Build an ``MLSTProfile`` from ``alleles`` under scheme ``schema_id``.

        Args:
            schema_id: Identifier of the scheme to query.
            alleles: The alleles to resolve into a profile.
        """

    @abstractmethod
    async def profile_string(self, schema_id: int, string: str) -> MLSTProfile:
        """Produce an ``MLSTProfile`` for ``string`` under scheme ``schema_id``.

        Args:
            schema_id: Identifier of the scheme to query.
            string: The sequence to profile.
        """

    @abstractmethod
    async def close(self):
        """Release any resources held by the profiler."""

    @abstractmethod
    async def get_scheme_ids(self) -> Mapping[str, int]:
        """Return a mapping from ``str`` keys to integer scheme identifiers."""

View File

@ -0,0 +1,68 @@
from collections import defaultdict
from contextlib import AbstractAsyncContextManager
import re
from typing import Any, AsyncGenerator, AsyncIterable, Generator, Iterable, Mapping, Sequence, Union
from aiohttp import ClientSession, ClientTimeout
from automlst.engine.data.MLST import Allele, MLSTProfile
from automlst.engine.data.genomics import NamedString
from automlst.engine.remote.databases.mlst import MLSTProfiler
class PubMLSTProfiler(MLSTProfiler):
    """``MLSTProfiler`` implementation backed by the PubMLST REST API.

    All requests are made relative to
    ``https://rest.pubmlst.org/db/{database_name}/`` through a single
    ``aiohttp.ClientSession`` that is closed by ``close()`` or on leaving the
    ``async with`` block.
    """

    def __init__(self, database_name: str):
        """Create a profiler bound to the PubMLST database ``database_name``."""
        self._base_url = f"https://rest.pubmlst.org/db/{database_name}/"
        self._http_client = ClientSession(self._base_url, timeout=ClientTimeout(10000))

    async def __aenter__(self):
        """Enter the async context and return this profiler."""
        return self

    # The return annotation spells out both type arguments: the one-argument
    # form of typing.AsyncGenerator is rejected before Python 3.13.
    async def fetch_mlst_allele_variants(self, schema_id: int, sequence_string: str) -> AsyncGenerator[Allele, None]:
        """Yield an ``Allele`` for every exact match of ``sequence_string``.

        POSTs the sequence to ``schemes/{schema_id}/sequence`` and walks the
        ``exact_matches`` object of the JSON response, which maps each locus
        to a list of matches carrying an ``allele_id``.

        Raises:
            KeyError: If the response has no ``exact_matches`` entry.
        """
        uri_path = f"schemes/{schema_id}/sequence"
        # `async with` guarantees the response is released even if decoding
        # the body raises.
        async with self._http_client.post(uri_path, json={
            "sequence": sequence_string
        }) as response:
            sequence_response: dict = await response.json()
        exact_matches: dict[str, Sequence[dict[str, str]]] = sequence_response["exact_matches"]
        for allele_loci, alleles in exact_matches.items():
            for allele in alleles:
                yield Allele(allele_loci=allele_loci, allele_variant=allele["allele_id"])

    async def fetch_mlst_st(self, schema_id: int, alleles: AsyncIterable[Allele]) -> MLSTProfile:
        """Resolve ``alleles`` into an ``MLSTProfile`` for scheme ``schema_id``.

        The alleles are grouped by locus and POSTed as ``designations`` to
        ``schemes/{schema_id}/designations``. The profile is built from the
        response's ``exact_matches`` together with the ``ST`` and
        ``clonal_complex`` entries of its ``fields`` object.

        Raises:
            KeyError: If ``fields``, ``exact_matches``, ``ST`` or
                ``clonal_complex`` is missing from the response.
        """
        uri_path = f"schemes/{schema_id}/designations"
        allele_request_dict: dict[str, list[dict[str, str]]] = defaultdict(list)
        async for allele in alleles:
            allele_request_dict[allele.allele_loci].append({"allele": str(allele.allele_variant)})
        async with self._http_client.post(uri_path, json={
            "designations": allele_request_dict
        }) as response:
            response_json = await response.json()
        schema_fields_returned = response_json["fields"]
        schema_exact_matches = response_json["exact_matches"]
        allele_map: dict[str, list[Allele]] = defaultdict(list)
        for exact_match_loci, exact_match_alleles in schema_exact_matches.items():
            for exact_match_allele in exact_match_alleles:
                allele_map[exact_match_loci].append(Allele(exact_match_loci, exact_match_allele["allele_id"]))
        return MLSTProfile(allele_map, schema_fields_returned["ST"], schema_fields_returned["clonal_complex"])

    async def profile_string(self, schema_id: int, string: str) -> MLSTProfile:
        """Profile ``string`` by chaining the allele lookup into ``fetch_mlst_st``."""
        alleles = self.fetch_mlst_allele_variants(schema_id, string)
        return await self.fetch_mlst_st(schema_id, alleles)

    async def get_scheme_ids(self) -> Mapping[str, int]:
        """Return a mapping of scheme description to scheme id.

        GETs ``schemes`` and, for every listed entry, takes the integer after
        the last ``/`` of its ``scheme`` value as the id. When two entries
        share a description, the later one wins.
        """
        uri_path = "schemes"
        async with self._http_client.get(uri_path) as response:
            response_json = await response.json()
        # Built as a concrete dict: the result is populated here, and only
        # exposed to callers through the read-only Mapping return type.
        schema_descriptions: dict[str, int] = {
            scheme_definition["description"]: int(str(scheme_definition["scheme"]).split("/")[-1])
            for scheme_definition in response_json["schemes"]
        }
        return schema_descriptions

    async def close(self):
        """Close the underlying HTTP session."""
        await self._http_client.close()

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Close the profiler on context exit; exceptions are not suppressed."""
        await self.close()

View File

@ -1,6 +1,6 @@
from Bio import SeqIO
from automlst.engine.data.MLST import Allele, MLSTProfile
from automlst.engine.remote.databases.institutpasteur.profiling import InstitutPasteurProfiler
from automlst.engine.remote.databases.institutpasteur.mlst import InstitutPasteurProfiler
async def test_profiling_results_in_exact_matches_when_exact():