216 lines
12 KiB
Python

from os import path
import random
import re
from typing import Callable, Collection, Sequence, Union
from Bio import SeqIO
import pytest
from autobigs.engine.analysis import bigsdb
from autobigs.engine.structures import mlst
from autobigs.engine.structures.genomics import NamedString
from autobigs.engine.structures.mlst import Allele, MLSTProfile
from autobigs.engine.exceptions.database import NoBIGSdbExactMatchesException, NoBIGSdbMatchesException
from autobigs.engine.analysis.bigsdb import BIGSdbIndex, BIGSdbMLSTProfiler, RemoteBIGSdbMLSTProfiler
async def generate_async_iterable(normal_iterable):
for dummy_sequence in normal_iterable:
yield dummy_sequence
def gene_scrambler(gene: str, mutation_site_count: Union[int, float], alphabet: Sequence[str] = ["A", "T", "C", "G"]):
rand = random.Random(gene)
if isinstance(mutation_site_count, float):
mutation_site_count = int(mutation_site_count * len(gene))
random_locations = rand.choices(range(len(gene)), k=mutation_site_count)
scrambled = list(gene)
for random_location in random_locations:
scrambled[random_location] = rand.choice(alphabet)
return "".join(scrambled)
def get_first_sequence_from_fasta(resource: str):
return str(SeqIO.read(path.join("tests/resources/", resource), "fasta").seq)
def get_multiple_sequences_from_fasta(resource: str):
return tuple(SeqIO.parse(path.join("tests/resources/", resource), "fasta"))
bpertussis_tohamaI_profile = MLSTProfile((
Allele("adk", "1", None),
Allele("fumC", "1", None),
Allele("glyA", "1", None),
Allele("tyrB", "1", None),
Allele("icd", "1", None),
Allele("pepA", "1", None),
Allele("pgm", "1", None)), "1", "ST-2 complex")
bpertussis_tohamaI_bad_profile = MLSTProfile((
Allele("adk", "1", None),
Allele("fumC", "2", None),
Allele("glyA", "36", None),
Allele("tyrB", "4", None),
Allele("icd", "4", None),
Allele("pepA", "1", None),
Allele("pgm", "5", None),
), "unknown", "unknown")
hinfluenzae_fdaargos_profile = MLSTProfile((
Allele("adk", "1", None),
Allele("atpG", "1", None),
Allele("frdB", "1", None),
Allele("fucK", "1", None),
Allele("mdh", "1", None),
Allele("pgi", "1", None),
Allele("recA", "5", None)
), "3", "ST-3 complex")
hinfluenzae_fdaargos_bad_profile = MLSTProfile((
Allele("adk", "1", None),
Allele("atpG", "1", None),
Allele("frdB", "1", None),
Allele("fucK", "1", None),
Allele("mdh", "1", None),
Allele("pgi", "1", None),
Allele("recA", "5", None)
), "3", "ST-3 complex")
hinfluenzae_fdaargos_sequence = str(SeqIO.read("tests/resources/fdaargos_1560_hinfluenza.fasta", "fasta").seq)
hinfluenzae_fdaargos_fragmented_sequence = tuple(SeqIO.parse("tests/resources/tohama_I_bpertussis_features.fasta", "fasta"))
@pytest.mark.parametrize("local_db,database_api,database_name,schema_id,seq_path,feature_seqs_path,expected_profile,bad_profile", [
(False, "https://bigsdb.pasteur.fr/api", "pubmlst_bordetella_seqdef", 3, "tohama_I_bpertussis.fasta", "tohama_I_bpertussis_features.fasta", bpertussis_tohamaI_profile, bpertussis_tohamaI_bad_profile),
(False, "https://rest.pubmlst.org", "pubmlst_hinfluenzae_seqdef", 1, "fdaargos_1560_hinfluenza.fasta", "fdaargos_1560_hinfluenza_features.fasta", hinfluenzae_fdaargos_profile, hinfluenzae_fdaargos_bad_profile),
])
class TestBIGSdbMLSTProfiler:
async def test_profiling_results_in_exact_matches_when_exact(self, local_db, database_api, database_name, schema_id, seq_path: str, feature_seqs_path: str, expected_profile: MLSTProfile, bad_profile: MLSTProfile):
sequence = get_first_sequence_from_fasta(seq_path)
async with bigsdb.get_BIGSdb_MLST_profiler(local_db, database_api, database_name, schema_id) as dummy_profiler:
expected_alleles = mlst.alleles_to_mapping(expected_profile.alleles)
targets_left = set(mlst.alleles_to_mapping(expected_profile.alleles).keys())
async for exact_match in dummy_profiler.determine_mlst_allele_variants(query_sequence_strings=[sequence]):
assert isinstance(exact_match, Allele)
assert exact_match.allele_locus in expected_alleles
assert exact_match.allele_variant == expected_alleles[exact_match.allele_locus]
targets_left.remove(exact_match.allele_locus)
assert len(targets_left) == 0
async def test_sequence_profiling_non_exact_returns_non_exact(self, local_db, database_api, database_name, schema_id, seq_path: str, feature_seqs_path: str, expected_profile: MLSTProfile, bad_profile: MLSTProfile):
target_sequences = get_multiple_sequences_from_fasta(feature_seqs_path)
mlst_targets = {x.lower() for x in mlst.alleles_to_mapping(expected_profile.alleles).keys()}
async with bigsdb.get_BIGSdb_MLST_profiler(local_db, database_api, database_name, schema_id) as profiler:
for target_sequence in target_sequences:
match = re.fullmatch(r".*\[gene=([\w\d]+)\].*", target_sequence.description)
if match is None:
continue
gene = match.group(1).lower()
if gene not in mlst_targets:
continue
scrambled = gene_scrambler(str(target_sequence.seq), 0.125)
async for partial_match in profiler.determine_mlst_allele_variants([scrambled]):
assert partial_match.partial_match_profile is not None
mlst_targets.remove(gene)
assert len(mlst_targets) == 0
async def test_profiling_results_in_correct_mlst_st(self, local_db, database_api, database_name, schema_id, seq_path: str, feature_seqs_path: str, expected_profile: MLSTProfile, bad_profile: MLSTProfile):
async with bigsdb.get_BIGSdb_MLST_profiler(local_db, database_api, database_name, schema_id) as dummy_profiler:
mlst_st_data = await dummy_profiler.determine_mlst_st(expected_profile.alleles)
assert mlst_st_data is not None
assert isinstance(mlst_st_data, MLSTProfile)
assert mlst_st_data.clonal_complex == expected_profile.clonal_complex
assert mlst_st_data.sequence_type == expected_profile.sequence_type
async def test_profiling_non_exact_results_in_list_of_mlsts(self, local_db, database_api, database_name, schema_id, seq_path: str, feature_seqs_path: str, expected_profile: MLSTProfile, bad_profile: MLSTProfile):
dummy_alleles = bad_profile.alleles
async with bigsdb.get_BIGSdb_MLST_profiler(local_db, database_api, database_name, schema_id) as dummy_profiler:
mlst_profile = await dummy_profiler.determine_mlst_st(dummy_alleles)
assert mlst_profile.clonal_complex == "unknown"
assert mlst_profile.sequence_type == "unknown"
async def test_bigsdb_profile_multiple_strings_same_string_twice(self, local_db, database_api, database_name, schema_id, seq_path: str, feature_seqs_path: str, expected_profile: MLSTProfile, bad_profile: MLSTProfile):
sequence = get_first_sequence_from_fasta(seq_path)
dummy_sequences = [[NamedString("seq1", sequence)], [NamedString("seq2", sequence)]]
async with bigsdb.get_BIGSdb_MLST_profiler(local_db, database_api, database_name, schema_id) as dummy_profiler:
async for named_profile in dummy_profiler.profile_multiple_strings(generate_async_iterable(dummy_sequences)):
name, profile = named_profile.name, named_profile.mlst_profile
assert profile is not None
assert isinstance(profile, MLSTProfile)
assert profile.clonal_complex == expected_profile.clonal_complex
assert profile.sequence_type == expected_profile.sequence_type
async def test_bigsdb_profile_multiple_strings_exactmatch_fail_second_no_stop(self, local_db, database_api, database_name, schema_id, seq_path: str, feature_seqs_path: str, expected_profile: MLSTProfile, bad_profile: MLSTProfile):
valid_seq = get_first_sequence_from_fasta(seq_path)
dummy_sequences = [[NamedString("seq1", valid_seq)], [NamedString("should_fail", gene_scrambler(valid_seq, 0.3))], [NamedString("seq3", valid_seq)]]
async with bigsdb.get_BIGSdb_MLST_profiler(local_db, database_api, database_name, schema_id) as dummy_profiler:
async for name_profile in dummy_profiler.profile_multiple_strings(generate_async_iterable(dummy_sequences), True):
name, profile = name_profile.name, name_profile.mlst_profile
assert profile is not None
if name == "should_fail":
assert profile.clonal_complex == "unknown"
assert profile.sequence_type == "unknown"
assert len(profile.alleles) > 0
else:
assert isinstance(profile, MLSTProfile)
assert profile.clonal_complex == expected_profile.clonal_complex
assert profile.sequence_type == expected_profile.sequence_type
async def test_bigsdb_profile_multiple_strings_nonexact_second_no_stop(self, local_db, database_api, database_name, schema_id, seq_path: str, feature_seqs_path: str, expected_profile: MLSTProfile, bad_profile: MLSTProfile):
valid_seq = get_first_sequence_from_fasta(seq_path)
dummy_sequences = [[NamedString("seq1", valid_seq)], [NamedString("should_fail", gene_scrambler(valid_seq, 0.3))], [NamedString("seq3", valid_seq)]]
async with bigsdb.get_BIGSdb_MLST_profiler(local_db, database_api, database_name, schema_id) as dummy_profiler:
async for named_profile in dummy_profiler.profile_multiple_strings(generate_async_iterable(dummy_sequences), False):
name, profile = named_profile.name, named_profile.mlst_profile
assert profile is not None
if name == "should_fail":
assert profile.clonal_complex == "unknown"
assert profile.sequence_type == "unknown"
assert len(profile.alleles) > 0
else:
assert isinstance(profile, MLSTProfile)
assert profile.clonal_complex == expected_profile.clonal_complex
assert profile.sequence_type == expected_profile.sequence_type
class TestBIGSdbIndex:
async def test_bigsdb_index_all_databases_is_not_empty(self):
async with BIGSdbIndex() as bigsdb_index:
assert len(await bigsdb_index.get_known_seqdef_dbs()) > 0
async def test_bigsdb_index_references_pubmlst_correctly(self):
async with BIGSdbIndex() as bigsdb_index:
assert (await bigsdb_index.get_bigsdb_api_from_seqdefdb("pubmlst_hinfluenzae_seqdef")) == "https://rest.pubmlst.org"
async def test_bigsdb_index_references_institutpasteur_correctly(self):
async with BIGSdbIndex() as bigsdb_index:
assert (await bigsdb_index.get_bigsdb_api_from_seqdefdb("pubmlst_bordetella_seqdef")) == "https://bigsdb.pasteur.fr/api"
async def test_bigsdb_index_get_schemas_for_bordetella(self):
async with BIGSdbIndex() as index:
schemas = await index.get_schemas_for_seqdefdb(seqdef_db_name="pubmlst_bordetella_seqdef")
assert len(schemas.keys()) > 0
assert "MLST" in schemas
assert isinstance(schemas["MLST"], int)
async def test_bigsdb_index_get_databases_has_only_seqdef(self):
async with BIGSdbIndex() as index:
databases = await index.get_known_seqdef_dbs()
assert len(databases.keys()) > 0
for database_name in databases.keys():
assert database_name.endswith("seqdef")
assert databases["pubmlst_bordetella_seqdef"] == "https://bigsdb.pasteur.fr/api"
@pytest.mark.parametrize("local", [
(True),
(False)
])
async def test_bigsdb_index_instantiates_correct_profiler(self, local):
sequence = str(SeqIO.read("tests/resources/tohama_I_bpertussis.fasta", "fasta").seq)
async with BIGSdbIndex() as bigsdb_index:
async with await bigsdb_index.build_profiler_from_seqdefdb(local, "pubmlst_bordetella_seqdef", 3) as profiler:
assert isinstance(profiler, BIGSdbMLSTProfiler)
profile = await profiler.profile_string(sequence)
assert profile.clonal_complex == "ST-2 complex"
assert profile.sequence_type == "1"