5 Commits

5 changed files with 9 additions and 282 deletions

View File

@@ -1,70 +0,0 @@
import asyncio
from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import AbstractContextManager
from typing import Any, Set, Union
from Bio.Align import PairwiseAligner
from queue import Queue
from autobigs.engine.structures.alignment import AlignmentStats, PairwiseAlignment
class AsyncBiopythonPairwiseAlignmentEngine(AbstractContextManager):
def __enter__(self):
self._thread_pool = ThreadPoolExecutor(self._max_threads, thread_name_prefix="async-pairwise-alignment")
return self
def __init__(self, aligner: PairwiseAligner, max_threads: int = 4):
self._max_threads = max_threads
self._aligner = aligner
self._work_left: Set[Future] = set()
self._work_complete: Queue[Future] = Queue()
def align(self, reference: str, query: str, **associated_data):
work = self._thread_pool.submit(
self.work, reference, query, **associated_data)
work.add_done_callback(self._on_complete)
self._work_left.add(work)
def _on_complete(self, future: Future):
self._work_left.remove(future)
self._work_complete.put(future)
def work(self, reference, query, **associated_data):
alignments = self._aligner.align(reference, query)
top_alignment = alignments[0]
top_alignment_stats = top_alignment.counts()
top_alignment_gaps = top_alignment_stats.gaps
top_alignment_identities = top_alignment_stats.identities
top_alignment_mismatches = top_alignment_stats.mismatches
top_alignment_score = top_alignment.score # type: ignore
return PairwiseAlignment(
top_alignment.sequences[0],
top_alignment.sequences[1],
tuple(top_alignment.indices[0]),
tuple(top_alignment.indices[1]),
AlignmentStats(
percent_identity=top_alignment_identities/top_alignment.length,
mismatches=top_alignment_mismatches,
gaps=top_alignment_gaps,
match_metric=top_alignment_score
)), associated_data
async def next_completed(self) -> Union[tuple[PairwiseAlignment, dict[str, Any]], None]:
if self._work_complete.empty() and len(self._work_left):
return None
completed_alignment = await asyncio.wrap_future(self._work_complete.get())
return completed_alignment
def __exit__(self, exc_type, exc_value, traceback):
self.shutdown()
def __aiter__(self):
return self
async def __anext__(self):
result = await self.next_completed()
if result is None:
raise StopAsyncIteration
return result
def shutdown(self):
self._thread_pool.shutdown(wait=True, cancel_futures=True)

View File

@@ -139,141 +139,6 @@ class RemoteBIGSdbMLSTProfiler(BIGSdbMLSTProfiler):
async def __aexit__(self, exc_type, exc_value, traceback): async def __aexit__(self, exc_type, exc_value, traceback):
await self.close() await self.close()
class LocalBIGSdbMLSTProfiler(BIGSdbMLSTProfiler):
async def __aenter__(self):
if self._prepare:
await self.update_scheme_locis()
await asyncio.gather(
self.download_alleles_cache_data(),
self.download_scheme_profiles()
)
await self.load_scheme_profiles()
return self
def __init__(self, database_api: str, database_name: str, schema_id: int, cache_path: Union[str, None] = None, prepare: bool =True):
self._database_api = database_api
self._database_name = database_name
self._schema_id = schema_id
self._base_url = f"{self._database_api}/db/{self._database_name}/schemes/{self._schema_id}/"
self._http_client = ClientSession(self._base_url, timeout=ClientTimeout(60))
if cache_path is None:
self._cache_path = tempfile.mkdtemp("BIGSdb")
self._cleanup_required = True
else:
self._cache_path = cache_path
self._cleanup_required = False
self._loci: list[str] = []
self._profiles_st_map = {}
self._prepare = prepare
async def update_scheme_locis(self):
self._loci.clear()
async with self._http_client.get(f"/api/db/{self._database_name}/schemes/{self._schema_id}") as schema_response:
schema_json = await schema_response.json()
for locus in schema_json["loci"]:
locus_name = path.basename(locus)
self._loci.append(locus_name)
self._loci.sort()
async def load_scheme_profiles(self):
self._profiles_st_map.clear()
with open(self.get_scheme_profile_path()) as profile_cache_handle:
reader = csv.DictReader(profile_cache_handle, delimiter="\t")
for line in reader:
alleles = []
for locus in self._loci:
alleles.append(line[locus])
self._profiles_st_map[tuple(alleles)] = (line["ST"], line["clonal_complex"])
def get_locus_cache_path(self, locus) -> str:
return path.join(self._cache_path, locus + "." + "fasta")
def get_scheme_profile_path(self):
return path.join(self._cache_path, "profiles.csv")
async def download_alleles_cache_data(self):
for locus in self._loci:
with open(self.get_locus_cache_path(locus), "wb") as fasta_handle:
async with self._http_client.get(f"/api/db/{self._database_name}/loci/{locus}/alleles_fasta") as fasta_response:
async for chunk, eof in fasta_response.content.iter_chunks():
fasta_handle.write(chunk)
async def download_scheme_profiles(self):
with open(self.get_scheme_profile_path(), "wb") as profile_cache_handle:
async with self._http_client.get("profiles_csv") as profiles_response:
async for chunk, eof in profiles_response.content.iter_chunks():
profile_cache_handle.write(chunk)
await self.load_scheme_profiles()
async def determine_mlst_allele_variants(self, query_sequence_strings: Iterable[str]) -> AsyncGenerator[Allele, Any]:
aligner = PairwiseAligner("blastn")
aligner.mode = "local"
with AsyncBiopythonPairwiseAlignmentEngine(aligner, max_threads=4) as aligner_engine:
for query_sequence_string in query_sequence_strings:
for locus in self._loci:
async for allele_variant in read_fasta(self.get_locus_cache_path(locus)):
aligner_engine.align(allele_variant.sequence, query_sequence_string, variant_name=allele_variant.name, full=True)
break # start a bunch of full alignments for each variant to select segments
alignment_rankings: dict[str, set[tuple[PairwiseAlignment, str]]] = defaultdict(set)
async for alignment_result, additional_information in aligner_engine:
result_variant_name = additional_information["variant_name"]
result_locus, variant_id = result_variant_name.split("_")
full_alignment = additional_information["full"]
if full_alignment:
if alignment_result.alignment_stats.gaps == 0 and alignment_result.alignment_stats.mismatches == 0:
# I.e., 100% exactly the same
yield Allele(result_locus, variant_id, None)
continue
else:
alignment_rankings[result_locus].add((alignment_result, variant_id))
interest_sequence = full_alignment[alignment_result.query_indices[0]:alignment_result.query_indices[-1]]
async for allele_variant in read_fasta(self.get_locus_cache_path(result_locus)):
if result_variant_name == allele_variant.name:
continue # Skip if we just finished aligning this
aligner_engine.align(allele_variant.sequence, interest_sequence, variant_name=result_variant_name.name, full=False)
else:
alignment_rankings[result_locus].add((alignment_result, variant_id))
for final_locus, alignments in alignment_rankings.items():
closest_alignment, closest_variant_id = sorted(alignments, key=lambda index: index[0].alignment_stats.match_metric)[0]
yield Allele(final_locus, closest_variant_id, closest_alignment.alignment_stats)
async def determine_mlst_st(self, alleles):
allele_variants: dict[str, Allele] = {}
if isinstance(alleles, AsyncIterable):
async for allele in alleles:
allele_variants[allele.allele_locus] = allele
else:
for allele in alleles:
allele_variants[allele.allele_locus] = allele
ordered_profile = []
for locus in self._loci:
ordered_profile.append(allele_variants[locus].allele_variant)
st, clonal_complex = self._profiles_st_map[tuple(ordered_profile)]
return MLSTProfile(set(allele_variants.values()), st, clonal_complex)
async def profile_string(self, query_sequence_strings: Iterable[str]) -> MLSTProfile:
alleles = self.determine_mlst_allele_variants(query_sequence_strings)
return await self.determine_mlst_st(alleles)
async def profile_multiple_strings(self, query_named_string_groups: AsyncIterable[Iterable[NamedString]], stop_on_fail: bool = False) -> AsyncGenerator[NamedMLSTProfile, Any]:
async for named_strings in query_named_string_groups:
for named_string in named_strings:
try:
yield NamedMLSTProfile(named_string.name, await self.profile_string([named_string.sequence]))
except NoBIGSdbMatchesException as e:
if stop_on_fail:
raise e
yield NamedMLSTProfile(named_string.name, None)
async def close(self):
await self._http_client.close()
if self._cleanup_required:
shutil.rmtree(self._cache_path)
async def __aexit__(self, exc_type, exc_value, traceback):
await self.close()
class BIGSdbIndex(AbstractAsyncContextManager): class BIGSdbIndex(AbstractAsyncContextManager):
KNOWN_BIGSDB_APIS = { KNOWN_BIGSDB_APIS = {
"https://bigsdb.pasteur.fr/api", "https://bigsdb.pasteur.fr/api",
@@ -334,5 +199,5 @@ class BIGSdbIndex(AbstractAsyncContextManager):
def get_BIGSdb_MLST_profiler(local: bool, database_api: str, database_name: str, schema_id: int): def get_BIGSdb_MLST_profiler(local: bool, database_api: str, database_name: str, schema_id: int):
if local: if local:
return LocalBIGSdbMLSTProfiler(database_api=database_api, database_name=database_name, schema_id=schema_id) raise NotImplementedError()
return RemoteBIGSdbMLSTProfiler(database_api=database_api, database_name=database_name, schema_id=schema_id) return RemoteBIGSdbMLSTProfiler(database_api=database_api, database_name=database_name, schema_id=schema_id)

View File

@@ -1,26 +0,0 @@
import asyncio
from contextlib import AbstractAsyncContextManager
import tempfile
from typing import Iterable, Union
from Bio import Entrez
from Bio import SeqIO
from autobigs.engine.structures.genomics import AnnotatedString, StringAnnotation
async def fetch_ncbi_genbank(genbank_id: str) -> AnnotatedString:
with (await asyncio.to_thread(Entrez.efetch, db="nucleotide", id=genbank_id, rettype="gb", retmode="text")) as fetch_stream:
record = SeqIO.read(fetch_stream, "genbank")
sequence_features = list()
for feature in record.features:
start = int(feature.location.start)
end = int(feature.location.end)
qualifiers = feature.qualifiers
for qualifier_key in qualifiers:
qualifiers[qualifier_key] = set(qualifiers[qualifier_key])
sequence_features.append(StringAnnotation(
type=feature.type,
start=start,
end=end+1, # Position is exclusive
feature_properties=qualifiers
))
return AnnotatedString(name=genbank_id, sequence=str(record.seq), annotations=sequence_features)

View File

@@ -1,42 +0,0 @@
from Bio import SeqIO
from Bio.Align import PairwiseAligner
from pytest import mark
from pytest import fixture
from autobigs.engine.analysis.aligners import AsyncBiopythonPairwiseAlignmentEngine
from autobigs.engine.structures.alignment import PairwiseAlignment
@fixture
def tohamaI_bpertussis_adk():
return str(SeqIO.read("tests/resources/tohama_I_bpertussis_adk.fasta", format="fasta").seq)
@fixture
def tohamaI_bpertussis_genome():
return str(SeqIO.read("tests/resources/tohama_I_bpertussis.fasta", format="fasta").seq)
@fixture
def fdaargos_1560_hinfluenza_adk():
return str(SeqIO.read("tests/resources/fdaargos_1560_hinfluenza_adk.fasta", format="fasta").seq)
@fixture
def fdaargos_1560_hinfluenza_genome():
return str(SeqIO.read("tests/resources/fdaargos_1560_hinfluenza.fasta", format="fasta").seq)
@fixture(params=[1, 2])
def dummy_engine(request):
aligner = PairwiseAligner("blastn")
aligner.mode = "local"
with AsyncBiopythonPairwiseAlignmentEngine(aligner, request.param) as engine:
yield engine
class TestAsyncPairwiseAlignmentEngine:
async def test_single_alignment_no_errors_single_alignment(self, tohamaI_bpertussis_genome, tohamaI_bpertussis_adk: str, dummy_engine: AsyncBiopythonPairwiseAlignmentEngine):
dummy_engine.align(tohamaI_bpertussis_genome, tohamaI_bpertussis_adk)
async for alignment, additional_information in dummy_engine:
assert isinstance(alignment, PairwiseAlignment)
async def test_single_alignment_no_errors_multiple(self, tohamaI_bpertussis_genome, tohamaI_bpertussis_adk, fdaargos_1560_hinfluenza_genome, fdaargos_1560_hinfluenza_adk, dummy_engine: AsyncBiopythonPairwiseAlignmentEngine):
dummy_engine.align(tohamaI_bpertussis_genome, tohamaI_bpertussis_adk)
dummy_engine.align(fdaargos_1560_hinfluenza_genome, fdaargos_1560_hinfluenza_adk)
async for alignment, additional_information in dummy_engine:
assert isinstance(alignment, PairwiseAlignment)

View File

@@ -9,7 +9,7 @@ from autobigs.engine.structures import mlst
from autobigs.engine.structures.genomics import NamedString from autobigs.engine.structures.genomics import NamedString
from autobigs.engine.structures.mlst import Allele, MLSTProfile from autobigs.engine.structures.mlst import Allele, MLSTProfile
from autobigs.engine.exceptions.database import NoBIGSdbExactMatchesException, NoBIGSdbMatchesException from autobigs.engine.exceptions.database import NoBIGSdbExactMatchesException, NoBIGSdbMatchesException
from autobigs.engine.analysis.bigsdb import BIGSdbIndex, BIGSdbMLSTProfiler, LocalBIGSdbMLSTProfiler, RemoteBIGSdbMLSTProfiler from autobigs.engine.analysis.bigsdb import BIGSdbIndex, BIGSdbMLSTProfiler, RemoteBIGSdbMLSTProfiler
async def generate_async_iterable(normal_iterable): async def generate_async_iterable(normal_iterable):
for dummy_sequence in normal_iterable: for dummy_sequence in normal_iterable:
@@ -61,12 +61,12 @@ hinfluenzae_fdaargos_profile = MLSTProfile((
), "3", "ST-3 complex") ), "3", "ST-3 complex")
hinfluenzae_fdaargos_bad_profile = MLSTProfile(( hinfluenzae_fdaargos_bad_profile = MLSTProfile((
Allele("adk", "1", None), Allele("adk", "3", None),
Allele("atpG", "1", None), Allele("atpG", "121", None),
Allele("frdB", "1", None), Allele("frdB", "6", None),
Allele("fucK", "1", None), Allele("fucK", "5", None),
Allele("mdh", "1", None), Allele("mdh", "12", None),
Allele("pgi", "1", None), Allele("pgi", "4", None),
Allele("recA", "5", None) Allele("recA", "5", None)
), "3", "ST-3 complex") ), "3", "ST-3 complex")
@@ -76,7 +76,7 @@ hinfluenzae_fdaargos_fragmented_sequence = tuple(SeqIO.parse("tests/resources/to
@pytest.mark.parametrize("local_db,database_api,database_name,schema_id,seq_path,feature_seqs_path,expected_profile,bad_profile", [ @pytest.mark.parametrize("local_db,database_api,database_name,schema_id,seq_path,feature_seqs_path,expected_profile,bad_profile", [
(False, "https://bigsdb.pasteur.fr/api", "pubmlst_bordetella_seqdef", 3, "tohama_I_bpertussis.fasta", "tohama_I_bpertussis_features.fasta", bpertussis_tohamaI_profile, bpertussis_tohamaI_bad_profile), (False, "https://bigsdb.pasteur.fr/api", "pubmlst_bordetella_seqdef", 3, "tohama_I_bpertussis.fasta", "tohama_I_bpertussis_features.fasta", bpertussis_tohamaI_profile, bpertussis_tohamaI_bad_profile),
(True, "https://bigsdb.pasteur.fr/api", "pubmlst_bordetella_seqdef", 3, "tohama_I_bpertussis.fasta", "tohama_I_bpertussis_features.fasta", bpertussis_tohamaI_profile, bpertussis_tohamaI_bad_profile), (False, "https://rest.pubmlst.org", "pubmlst_hinfluenzae_seqdef", 1, "fdaargos_1560_hinfluenza.fasta", "fdaargos_1560_hinfluenza_features.fasta", hinfluenzae_fdaargos_profile, hinfluenzae_fdaargos_bad_profile),
]) ])
class TestBIGSdbMLSTProfiler: class TestBIGSdbMLSTProfiler:
async def test_profiling_results_in_exact_matches_when_exact(self, local_db, database_api, database_name, schema_id, seq_path: str, feature_seqs_path: str, expected_profile: MLSTProfile, bad_profile: MLSTProfile): async def test_profiling_results_in_exact_matches_when_exact(self, local_db, database_api, database_name, schema_id, seq_path: str, feature_seqs_path: str, expected_profile: MLSTProfile, bad_profile: MLSTProfile):