211 lines
12 KiB
Python
211 lines
12 KiB
Python
|
from os import path
|
||
|
import random
|
||
|
import re
|
||
|
from typing import Callable, Collection, Sequence, Union
|
||
|
from Bio import SeqIO
|
||
|
import pytest
|
||
|
from autobigs.engine.analysis import bigsdb
|
||
|
from autobigs.engine.structures import mlst
|
||
|
from autobigs.engine.structures.genomics import NamedString
|
||
|
from autobigs.engine.structures.mlst import Allele, MLSTProfile
|
||
|
from autobigs.engine.exceptions.database import NoBIGSdbExactMatchesException, NoBIGSdbMatchesException
|
||
|
from autobigs.engine.analysis.bigsdb import BIGSdbIndex, BIGSdbMLSTProfiler, LocalBIGSdbMLSTProfiler, RemoteBIGSdbMLSTProfiler
|
||
|
|
||
|
async def generate_async_iterable(normal_iterable):
|
||
|
for dummy_sequence in normal_iterable:
|
||
|
yield dummy_sequence
|
||
|
|
||
|
def gene_scrambler(gene: str, mutation_site_count: Union[int, float], alphabet: Sequence[str] = ["A", "T", "C", "G"]):
|
||
|
rand = random.Random(gene)
|
||
|
if isinstance(mutation_site_count, float):
|
||
|
mutation_site_count = int(mutation_site_count * len(gene))
|
||
|
random_locations = rand.choices(range(len(gene)), k=mutation_site_count)
|
||
|
scrambled = list(gene)
|
||
|
for random_location in random_locations:
|
||
|
scrambled[random_location] = rand.choice(alphabet)
|
||
|
return "".join(scrambled)
|
||
|
|
||
|
def get_first_sequence_from_fasta(resource: str):
|
||
|
return str(SeqIO.read(path.join("tests/resources/", resource), "fasta").seq)
|
||
|
|
||
|
def get_multiple_sequences_from_fasta(resource: str):
|
||
|
return tuple(SeqIO.parse(path.join("tests/resources/", resource), "fasta"))
|
||
|
|
||
|
bpertussis_tohamaI_profile = MLSTProfile((
|
||
|
Allele("adk", "1", None),
|
||
|
Allele("fumC", "1", None),
|
||
|
Allele("glyA", "1", None),
|
||
|
Allele("tyrB", "1", None),
|
||
|
Allele("icd", "1", None),
|
||
|
Allele("pepA", "1", None),
|
||
|
Allele("pgm", "1", None)), "1", "ST-2 complex")
|
||
|
|
||
|
bpertussis_tohamaI_bad_profile = MLSTProfile((
|
||
|
Allele("adk", "1", None),
|
||
|
Allele("fumC", "2", None),
|
||
|
Allele("glyA", "36", None),
|
||
|
Allele("tyrB", "4", None),
|
||
|
Allele("icd", "4", None),
|
||
|
Allele("pepA", "1", None),
|
||
|
Allele("pgm", "5", None),
|
||
|
), "unknown", "unknown")
|
||
|
|
||
|
hinfluenzae_fdaargos_profile = MLSTProfile((
|
||
|
Allele("adk", "1", None),
|
||
|
Allele("atpG", "1", None),
|
||
|
Allele("frdB", "1", None),
|
||
|
Allele("fucK", "1", None),
|
||
|
Allele("mdh", "1", None),
|
||
|
Allele("pgi", "1", None),
|
||
|
Allele("recA", "5", None)
|
||
|
), "3", "ST-3 complex")
|
||
|
|
||
|
hinfluenzae_fdaargos_bad_profile = MLSTProfile((
|
||
|
Allele("adk", "1", None),
|
||
|
Allele("atpG", "1", None),
|
||
|
Allele("frdB", "1", None),
|
||
|
Allele("fucK", "1", None),
|
||
|
Allele("mdh", "1", None),
|
||
|
Allele("pgi", "1", None),
|
||
|
Allele("recA", "5", None)
|
||
|
), "3", "ST-3 complex")
|
||
|
|
||
|
hinfluenzae_fdaargos_sequence = str(SeqIO.read("tests/resources/fdaargos_1560_hinfluenza.fasta", "fasta").seq)
|
||
|
|
||
|
hinfluenzae_fdaargos_fragmented_sequence = tuple(SeqIO.parse("tests/resources/tohama_I_bpertussis_features.fasta", "fasta"))
|
||
|
|
||
|
@pytest.mark.parametrize("local_db,database_api,database_name,schema_id,seq_path,feature_seqs_path,expected_profile,bad_profile", [
|
||
|
(False, "https://bigsdb.pasteur.fr/api", "pubmlst_bordetella_seqdef", 3, "tohama_I_bpertussis.fasta", "tohama_I_bpertussis_features.fasta", bpertussis_tohamaI_profile, bpertussis_tohamaI_bad_profile),
|
||
|
(True, "https://bigsdb.pasteur.fr/api", "pubmlst_bordetella_seqdef", 3, "tohama_I_bpertussis.fasta", "tohama_I_bpertussis_features.fasta", bpertussis_tohamaI_profile, bpertussis_tohamaI_bad_profile),
|
||
|
])
|
||
|
class TestBIGSdbMLSTProfiler:
|
||
|
async def test_profiling_results_in_exact_matches_when_exact(self, local_db, database_api, database_name, schema_id, seq_path: str, feature_seqs_path: str, expected_profile: MLSTProfile, bad_profile: MLSTProfile):
|
||
|
sequence = get_first_sequence_from_fasta(seq_path)
|
||
|
async with bigsdb.get_BIGSdb_MLST_profiler(local_db, database_api, database_name, schema_id) as dummy_profiler:
|
||
|
expected_alleles = mlst.alleles_to_mapping(expected_profile.alleles)
|
||
|
targets_left = set(mlst.alleles_to_mapping(expected_profile.alleles).keys())
|
||
|
async for exact_match in dummy_profiler.determine_mlst_allele_variants(query_sequence_strings=[sequence]):
|
||
|
assert isinstance(exact_match, Allele)
|
||
|
assert exact_match.allele_locus in expected_alleles
|
||
|
assert exact_match.allele_variant == expected_alleles[exact_match.allele_locus]
|
||
|
targets_left.remove(exact_match.allele_locus)
|
||
|
|
||
|
assert len(targets_left) == 0
|
||
|
|
||
|
async def test_sequence_profiling_non_exact_returns_non_exact(self, local_db, database_api, database_name, schema_id, seq_path: str, feature_seqs_path: str, expected_profile: MLSTProfile, bad_profile: MLSTProfile):
|
||
|
target_sequences = get_multiple_sequences_from_fasta(feature_seqs_path)
|
||
|
mlst_targets = {x.lower() for x in mlst.alleles_to_mapping(expected_profile.alleles).keys()}
|
||
|
async with bigsdb.get_BIGSdb_MLST_profiler(local_db, database_api, database_name, schema_id) as profiler:
|
||
|
for target_sequence in target_sequences:
|
||
|
match = re.fullmatch(r".*\[gene=([\w\d]+)\].*", target_sequence.description)
|
||
|
if match is None:
|
||
|
continue
|
||
|
gene = match.group(1).lower()
|
||
|
if gene not in mlst_targets:
|
||
|
continue
|
||
|
scrambled = gene_scrambler(str(target_sequence.seq), 0.125)
|
||
|
async for partial_match in profiler.determine_mlst_allele_variants([scrambled]):
|
||
|
assert partial_match.partial_match_profile is not None
|
||
|
mlst_targets.remove(gene)
|
||
|
|
||
|
assert len(mlst_targets) == 0
|
||
|
|
||
|
async def test_profiling_results_in_correct_mlst_st(self, local_db, database_api, database_name, schema_id, seq_path: str, feature_seqs_path: str, expected_profile: MLSTProfile, bad_profile: MLSTProfile):
|
||
|
async with bigsdb.get_BIGSdb_MLST_profiler(local_db, database_api, database_name, schema_id) as dummy_profiler:
|
||
|
mlst_st_data = await dummy_profiler.determine_mlst_st(expected_profile.alleles)
|
||
|
assert mlst_st_data is not None
|
||
|
assert isinstance(mlst_st_data, MLSTProfile)
|
||
|
assert mlst_st_data.clonal_complex == expected_profile.clonal_complex
|
||
|
assert mlst_st_data.sequence_type == expected_profile.sequence_type
|
||
|
|
||
|
async def test_profiling_non_exact_results_in_list_of_mlsts(self, local_db, database_api, database_name, schema_id, seq_path: str, feature_seqs_path: str, expected_profile: MLSTProfile, bad_profile: MLSTProfile):
|
||
|
dummy_alleles = bad_profile.alleles
|
||
|
async with bigsdb.get_BIGSdb_MLST_profiler(local_db, database_api, database_name, schema_id) as dummy_profiler:
|
||
|
mlst_profile = await dummy_profiler.determine_mlst_st(dummy_alleles)
|
||
|
assert mlst_profile.clonal_complex == "unknown"
|
||
|
assert mlst_profile.sequence_type == "unknown"
|
||
|
|
||
|
|
||
|
async def test_bigsdb_profile_multiple_strings_same_string_twice(self, local_db, database_api, database_name, schema_id, seq_path: str, feature_seqs_path: str, expected_profile: MLSTProfile, bad_profile: MLSTProfile):
|
||
|
sequence = get_first_sequence_from_fasta(seq_path)
|
||
|
dummy_sequences = [[NamedString("seq1", sequence)], [NamedString("seq2", sequence)]]
|
||
|
async with bigsdb.get_BIGSdb_MLST_profiler(local_db, database_api, database_name, schema_id) as dummy_profiler:
|
||
|
async for named_profile in dummy_profiler.profile_multiple_strings(generate_async_iterable(dummy_sequences)):
|
||
|
name, profile = named_profile.name, named_profile.mlst_profile
|
||
|
assert profile is not None
|
||
|
assert isinstance(profile, MLSTProfile)
|
||
|
assert profile.clonal_complex == expected_profile.clonal_complex
|
||
|
assert profile.sequence_type == expected_profile.sequence_type
|
||
|
|
||
|
async def test_bigsdb_profile_multiple_strings_exactmatch_fail_second_no_stop(self, local_db, database_api, database_name, schema_id, seq_path: str, feature_seqs_path: str, expected_profile: MLSTProfile, bad_profile: MLSTProfile):
|
||
|
valid_seq = get_first_sequence_from_fasta(seq_path)
|
||
|
dummy_sequences = [[NamedString("seq1", valid_seq)], [NamedString("should_fail", gene_scrambler(valid_seq, 0.3))], [NamedString("seq3", valid_seq)]]
|
||
|
async with bigsdb.get_BIGSdb_MLST_profiler(local_db, database_api, database_name, schema_id) as dummy_profiler:
|
||
|
async for name_profile in dummy_profiler.profile_multiple_strings(generate_async_iterable(dummy_sequences), True):
|
||
|
name, profile = name_profile.name, name_profile.mlst_profile
|
||
|
|
||
|
assert profile is not None
|
||
|
if name == "should_fail":
|
||
|
assert profile.clonal_complex == "unknown"
|
||
|
assert profile.sequence_type == "unknown"
|
||
|
assert len(profile.alleles) > 0
|
||
|
else:
|
||
|
assert isinstance(profile, MLSTProfile)
|
||
|
assert profile.clonal_complex == expected_profile.clonal_complex
|
||
|
assert profile.sequence_type == expected_profile.sequence_type
|
||
|
|
||
|
async def test_bigsdb_profile_multiple_strings_nonexact_second_no_stop(self, local_db, database_api, database_name, schema_id, seq_path: str, feature_seqs_path: str, expected_profile: MLSTProfile, bad_profile: MLSTProfile):
|
||
|
valid_seq = get_first_sequence_from_fasta(seq_path)
|
||
|
dummy_sequences = [[NamedString("seq1", valid_seq)], [NamedString("should_fail", gene_scrambler(valid_seq, 0.3))], [NamedString("seq3", valid_seq)]]
|
||
|
|
||
|
async with bigsdb.get_BIGSdb_MLST_profiler(local_db, database_api, database_name, schema_id) as dummy_profiler:
|
||
|
async for named_profile in dummy_profiler.profile_multiple_strings(generate_async_iterable(dummy_sequences), False):
|
||
|
name, profile = named_profile.name, named_profile.mlst_profile
|
||
|
|
||
|
assert profile is not None
|
||
|
if name == "should_fail":
|
||
|
assert profile.clonal_complex == "unknown"
|
||
|
assert profile.sequence_type == "unknown"
|
||
|
assert len(profile.alleles) > 0
|
||
|
else:
|
||
|
assert isinstance(profile, MLSTProfile)
|
||
|
assert profile.clonal_complex == expected_profile.clonal_complex
|
||
|
assert profile.sequence_type == expected_profile.sequence_type
|
||
|
|
||
|
class TestBIGSdbIndex:
|
||
|
|
||
|
async def test_bigsdb_index_all_databases_is_not_empty(self):
|
||
|
async with BIGSdbIndex() as bigsdb_index:
|
||
|
assert len(await bigsdb_index.get_known_seqdef_dbs()) > 0
|
||
|
|
||
|
async def test_bigsdb_index_references_pubmlst_correctly(self):
|
||
|
async with BIGSdbIndex() as bigsdb_index:
|
||
|
assert (await bigsdb_index.get_bigsdb_api_from_seqdefdb("pubmlst_hinfluenzae_seqdef")) == "https://rest.pubmlst.org"
|
||
|
|
||
|
async def test_bigsdb_index_references_institutpasteur_correctly(self):
|
||
|
async with BIGSdbIndex() as bigsdb_index:
|
||
|
assert (await bigsdb_index.get_bigsdb_api_from_seqdefdb("pubmlst_bordetella_seqdef")) == "https://bigsdb.pasteur.fr/api"
|
||
|
|
||
|
async def test_bigsdb_index_get_schemas_for_bordetella(self):
|
||
|
async with BIGSdbIndex() as index:
|
||
|
schemas = await index.get_schemas_for_seqdefdb(seqdef_db_name="pubmlst_bordetella_seqdef")
|
||
|
assert len(schemas.keys()) > 0
|
||
|
assert "MLST" in schemas
|
||
|
assert isinstance(schemas["MLST"], int)
|
||
|
|
||
|
async def test_bigsdb_index_get_databases_has_only_seqdef(self):
|
||
|
async with BIGSdbIndex() as index:
|
||
|
databases = await index.get_known_seqdef_dbs()
|
||
|
assert len(databases.keys()) > 0
|
||
|
for database_name in databases.keys():
|
||
|
assert database_name.endswith("seqdef")
|
||
|
assert databases["pubmlst_bordetella_seqdef"] == "https://bigsdb.pasteur.fr/api"
|
||
|
|
||
|
async def test_bigsdb_index_instantiates_correct_profiler(self):
|
||
|
sequence = str(SeqIO.read("tests/resources/tohama_I_bpertussis.fasta", "fasta").seq)
|
||
|
async with BIGSdbIndex() as bigsdb_index:
|
||
|
async with await bigsdb_index.build_profiler_from_seqdefdb("pubmlst_bordetella_seqdef", 3) as profiler:
|
||
|
profile = await profiler.profile_string(sequence)
|
||
|
assert profile.clonal_complex == "ST-2 complex"
|
||
|
assert profile.sequence_type == "1"
|