splitmsa/msa_splitter.py

444 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
# General Outline of What Needs to be Done
# Multiple sequence alignments in FASTA format
# -> Split into individual gene files ->
# 2 Files for each gene, one with full nucleotide sequence, one without.
# Looks like we also want the amino acid sequence, we'll do that too.
import argparse
from logging import debug, error, info, warning
import logging
import os
from Bio import SeqIO, SeqRecord
from Bio.Seq import Seq
import csv
def read_genes_from_csv(batch_genes_csv_path: str):
csv_header = None
genes = []
with open(batch_genes_csv_path) as batch_csv_fd:
reader = csv.reader(batch_csv_fd)
for row in reader:
if not csv_header:
csv_header = list(row)
else:
gene_name = row[csv_header.index("Gene")]
gene_start = int(row[csv_header.index("Start")])
gene_end = int(row[csv_header.index("End")])
genes.append((gene_name, gene_start, gene_end))
return genes
def read_msa_file(input: str):
if not os.path.isfile(input):
error(f'"{input}" could not be found.')
exit(3)
for s_record in SeqIO.parse(input, "fasta"):
yield s_record
def write_to_file(
output_dir: str,
gene_name: str,
start: int,
end: int,
full_suffix: str,
ns_suffix: str,
aa_suffix: str,
nt_sequence_records: list,
nt_no_stop_sequence_records: list,
aa_sequence_records: list = [],
aa_no_stop_sequence_records: list = [],
):
output_path = os.path.abspath(output_dir)
if not os.path.isdir(output_dir):
os.makedirs(output_path)
SeqIO.write(
nt_sequence_records,
os.path.join(
output_path,
f'{gene_name or f"{start} - {end - 3}"}' f" - {full_suffix}.fasta",
),
"fasta",
)
if len(nt_no_stop_sequence_records):
SeqIO.write(
nt_no_stop_sequence_records,
os.path.join(
output_path,
f'{gene_name or f"{start} - {end - 3}"}' f" - {ns_suffix}.fasta",
),
"fasta",
)
if len(aa_sequence_records):
SeqIO.write(
aa_sequence_records,
os.path.join(
output_path,
f'{gene_name or f"{start} - {end - 3}"}'
f" - {aa_suffix}"
f" - {full_suffix}.fasta",
),
"fasta",
)
if len(aa_no_stop_sequence_records):
SeqIO.write(
aa_no_stop_sequence_records,
os.path.join(
output_path,
f'{gene_name or f"{start} - {end - 3}"}'
f" - {aa_suffix}"
f" - {ns_suffix}.fasta",
),
"fasta",
)
STOP_CODONS = {"TAG", "TAA", "TGA"}
START_CODON = "ATG"
def trim(
start: int,
end: int,
gen_cut_stop_codon: bool,
perform_translation: bool,
msa_records,
correction_range=16,
):
tru_start = start - 1
nt_sequence_records = []
nt_no_stop_sequence_records = []
aa_sequence_records = []
aa_no_stop_sequence_records = []
problems = []
debug(f"Beginning sequence trimming for {tru_start} - {end}.")
for s_record in msa_records:
record_metadata = (
s_record.id,
s_record.name,
s_record.description,
s_record.dbxrefs,
s_record.features,
s_record.annotations,
s_record.letter_annotations,
)
found_start_codon = False
start_shift = 0
for i in range(correction_range):
if s_record.seq[tru_start + i : tru_start + i + 3] == START_CODON:
found_start_codon = True
start_shift = i
break
if (
not found_start_codon
and s_record.seq[tru_start - i : tru_start - i + 3] == START_CODON
):
found_start_codon = True
start_shift = -i
break
if not found_start_codon:
problems.append([start, end, s_record.id, "Could not find start codon"])
if found_start_codon and start_shift != 0:
problems.append(
[
start,
end,
s_record.id,
"Corrected start codon to " f"{tru_start + start_shift}",
]
)
end_shift = 0
found_stop_codon = False
for i in range(correction_range):
if s_record.seq[end + i - 3 : end + i] in STOP_CODONS:
found_stop_codon = True
end_shift = i
break
if (
not found_stop_codon
and s_record.seq[end - i - 3 : end - i] in STOP_CODONS
):
found_stop_codon = True
end_shift = -i
break
if not found_stop_codon:
problems.append([start, end, s_record.id, "Could not find stop codon"])
if found_stop_codon and end_shift != 0:
problems.append(
[
start,
end,
s_record.id,
"Corrected stop codon to " f"{end + end_shift}",
]
)
nt_sequence = s_record.seq[
tru_start + start_shift : end + end_shift
] # Cropping
nt_sequence_records.append(SeqRecord.SeqRecord(nt_sequence, *record_metadata))
if gen_cut_stop_codon:
nt_no_stop_sequence = s_record.seq[
tru_start + start_shift : end - 3 + end_shift
]
nt_no_stop_sequence_records.append(
SeqRecord.SeqRecord(nt_no_stop_sequence, *record_metadata)
)
if perform_translation:
aa_sequence = (
s_record.seq[tru_start + start_shift - 1 : end + end_shift]
.replace("-", "n")
.translate()
)
aa_sequence_records.append(
SeqRecord.SeqRecord(aa_sequence, *record_metadata)
)
if gen_cut_stop_codon:
aa_no_stop_sequence = aa_sequence[:-1]
aa_no_stop_sequence_records.append(
SeqRecord.SeqRecord(aa_no_stop_sequence, *record_metadata)
)
debug(f"Trimming for {s_record.id} complete.")
debug(f"Sequence trimming for {tru_start} - {end} complete.")
return (
nt_sequence_records,
nt_no_stop_sequence_records,
aa_sequence_records,
aa_no_stop_sequence_records,
problems,
)
def log_problems(gene: str, problems: list[list[str]]):
for start, end, id, desc in problems:
warning(f"{gene} ({start} - {end}) of {id}: {desc}")
def output_as_csv(gene: str, problems: list[list[str]], output_path: str):
header = ["Start", "End", "Sequence ID", "Problem"]
with open(output_path, "w") as problem_csv_fd:
writer = csv.writer(problem_csv_fd)
writer.writerow(header)
writer.writerows(problems)
def main(args):
logging.basicConfig(level=args.log_level.upper())
msa_records = list(read_msa_file(args.input))
info(f"MSA records read complete. Found {len(msa_records)} records.")
genes = []
if args.gene_list:
genes = read_genes_from_csv(args.gene_list)
info(f"Gene list read from {args.gene_list} resulted in {len(genes)} " "genes.")
else:
if args.gene_name and args.start and args.end:
genes.append([args.gene_name, args.start, args.end])
info(
f"Extracting {args.gene_name} starting at {args.start} to "
f"{args.end}."
)
else:
raise Exception(
"Need either a gene list by --gene-list or a start and end "
"via --start, and --end respectively."
)
for gene_name, start, end in genes:
info(f"Started on gene {gene_name} ({start} - {end})")
(
nt_sequence_records,
nt_no_stop_sequence_records,
aa_sequence_records,
aa_no_stop_sequence_records,
problems,
) = trim(
start,
end,
args.gen_cut_stop_codon,
args.do_translate,
msa_records,
correction_range=args.correction_range,
)
if len(problems) > 0:
warning(
f"There were {len(problems)} problems " f"during trimming {gene_name}!"
)
if args.catalogue_problems:
output_as_csv(
gene_name,
problems,
os.path.join(args.output_dir, f"{gene_name} - problems.csv"),
)
write_to_file(
args.output_dir,
gene_name,
start,
end,
args.full_suffix,
args.ns_suffix,
args.aa_suffix,
nt_sequence_records,
nt_no_stop_sequence_records,
aa_sequence_records,
aa_no_stop_sequence_records,
)
info(f"Completed gene {gene_name} ({start} - {end})")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
prog="msa_splitter",
description="""
The MSA splitter is a simple program that takes in two positions
and a MSA file and produces two separate FASTA files
""",
)
parser.add_argument(
"input",
help="The MSA file in FASTA format.",
type=str,
metavar="i",
)
parser.add_argument(
"-L",
"--gene-list",
help="A list of genes in CSV file.",
type=str,
required=False,
default=None,
)
parser.add_argument(
"-s",
"--start",
help="The start of the desired sequence.",
type=int,
required=False,
default=None,
)
parser.add_argument(
"-e",
"--end",
help="The end of the desired sequence.",
type=int,
required=False,
default=None,
)
parser.add_argument(
"-n",
"--gene-name",
help="The name of the gene with the given start and end locations.",
type=str,
required=False,
default=None,
)
parser.add_argument(
"-o",
"--output-dir",
help="""The directory (folder) of which the
generated files should reside.
""",
type=str,
required=False,
default="output",
)
parser.add_argument(
"-G",
"--gen-cut-stop-codon",
help="""If specified, for each FASTA file generated,
another FASTA will be generated without the stop codon.
""",
required=False,
default=False,
action="store_true",
)
parser.add_argument(
"-nt",
"--full-suffix",
help="""The suffix to append to the full nucleotide sequence
FASTA output files.
""",
type=str,
required=False,
default="full",
)
parser.add_argument(
"-ns",
"--ns-suffix",
help="""The suffix to append to the full nucleotide sequence
without the stop FASTA output files.
""",
type=str,
required=False,
default="no stop",
)
parser.add_argument(
"-aa",
"--aa-suffix",
help="""The suffix to append to the amino acid sequence FASTA
output files.
""",
type=str,
required=False,
default="aa",
)
parser.add_argument(
"-A",
"--do-translate",
help="Attempts to translate all nucleotide sequences to amino acid sequences.",
required=False,
action="store_true",
)
parser.add_argument(
"--correction-range",
"-R",
help="The number of offsets in terms of nucleotides to try in both "
"directions to correct start and stop codons before giving up.",
type=int,
required=False,
default=9,
)
parser.add_argument(
"-E",
"--log-level",
help="The verbosity of the program.",
type=str,
required=False,
default="INFO",
)
parser.add_argument(
"-C",
"--catalogue-problems",
help="Generates a CSV for each gene listing the problems that "
"occurred during trimming.",
required=False,
action="store_true",
)
main(parser.parse_args())