splitmsa/msa_splitter.py

381 lines
11 KiB
Python
Raw Normal View History

2023-03-22 19:30:53 +00:00
#!/usr/bin/env python3
# General Outline of What Needs to be Done
# Multiple sequence alignments in FASTA format
# -> Split into individual gene files ->
# 2 Files for each gene, one with full nucleotide sequence, one without.
# Looks like we also want the amino acid sequence, we'll do that too.
import argparse
from logging import debug, error, info, warning
import logging
import os
from Bio import SeqIO, SeqRecord
import csv
def read_genes_from_csv(batch_genes_csv_path: str):
csv_header = None
genes = []
with open(batch_genes_csv_path) as batch_csv_fd:
reader = csv.reader(batch_csv_fd)
for row in reader:
if not csv_header:
csv_header = list(row)
else:
gene_name = row[csv_header.index("Gene")]
gene_start = int(row[csv_header.index("Start")])
2023-03-22 19:30:53 +00:00
gene_end = int(row[csv_header.index("End")])
genes.append((gene_name, gene_start, gene_end))
return genes
def read_msa_file(input: str):
if not os.path.isfile(input):
error(f'"{input}" could not be found.')
exit(3)
for s_record in SeqIO.parse(input, "fasta"):
yield s_record
def write_to_file(
output_dir: str,
gene_name: str,
start: int,
end: int,
full_suffix: str,
ns_suffix: str,
aa_suffix: str,
nt_sequence_records: list,
nt_no_stop_sequence_records: list,
aa_sequence_records: list = None,
aa_no_stop_sequence_records: list = None,
):
output_path = os.path.abspath(output_dir)
if not os.path.isdir(output_dir):
os.makedirs(output_path)
SeqIO.write(
nt_sequence_records,
os.path.join(
output_path,
f'{gene_name or f"{start} - {end - 3}"}' f" - {full_suffix}.fasta",
),
"fasta",
)
SeqIO.write(
nt_no_stop_sequence_records,
os.path.join(
output_path,
f'{gene_name or f"{start} - {end - 3}"}' f" - {ns_suffix}.fasta",
),
"fasta",
)
if aa_sequence_records:
SeqIO.write(
aa_sequence_records,
os.path.join(
output_path,
f'{gene_name or f"{start} - {end - 3}"}'
f" - {aa_suffix}"
f" - {full_suffix}.fasta",
),
"fasta",
)
if aa_no_stop_sequence_records:
SeqIO.write(
aa_no_stop_sequence_records,
os.path.join(
output_path,
f'{gene_name or f"{start} - {end - 3}"}'
f" - {aa_suffix}"
f" - {ns_suffix}.fasta",
),
"fasta",
)
STOP_CODONS = {"TAG", "TAA", "TGA"}
START_CODON = "ATG"
def trim(
start: int,
end: int,
gen_cut_stop_codon: bool,
msa_records,
correction_range=16
):
tru_start = start - 1
2023-03-22 19:30:53 +00:00
nt_sequence_records = []
nt_no_stop_sequence_records = []
aa_sequence_records = []
aa_no_stop_sequence_records = []
debug(f"Beginning sequence trimming for {tru_start} - {end}.")
2023-03-22 19:30:53 +00:00
for s_record in msa_records:
record_metadata = (
s_record.id,
s_record.name,
s_record.description,
s_record.dbxrefs,
s_record.features,
s_record.annotations,
s_record.letter_annotations,
)
found_start_codon = False
start_shift = 0
for i in range(correction_range):
if s_record.seq[tru_start + i: tru_start + i + 3] == START_CODON:
2023-03-22 19:30:53 +00:00
found_start_codon = True
start_shift = i
break
if not found_start_codon and s_record.seq[
tru_start - i: tru_start - i + 3] == START_CODON:
2023-03-22 19:30:53 +00:00
found_start_codon = True
start_shift = -i
break
if not found_start_codon:
warning(
f"Could not find start codon for region {tru_start} - {end} "
f"with sequence ID {s_record.id}. Continuing without "
f"correction...")
if found_start_codon and start_shift != 0:
2023-03-22 19:30:53 +00:00
warning(
"Start codon was not found at expected location for region "
f"{tru_start} - {end} with sequence ID {s_record.id}. "
f"Correcting start location to {tru_start + start_shift}")
2023-03-22 19:30:53 +00:00
end_shift = 0
found_stop_codon = False
for i in range(correction_range):
if s_record.seq[end + i - 3: end + i] in STOP_CODONS:
found_stop_codon = True
end_shift = i
break
if not found_stop_codon and s_record.seq[
end - i - 3: end - i] in STOP_CODONS:
2023-03-22 19:30:53 +00:00
found_stop_codon = True
end_shift = -i
break
if not found_stop_codon:
warning(
f"Could not find stop codon for region {tru_start} - {end} "
f"with sequence ID {s_record.id}. Continuing without "
f"correction...")
if found_stop_codon and end_shift != 0:
2023-03-22 19:30:53 +00:00
warning(
f"Stop codon was not found at expected location for region "
f"{tru_start} - {end} with sequence ID {s_record.id}. "
f"Correcting end location to: {end + end_shift}.")
nt_sequence = s_record.seq[tru_start +
start_shift: end + end_shift] # Cropping
nt_no_stop_sequence = s_record.seq[tru_start +
2023-03-22 19:30:53 +00:00
start_shift: end - 3 + end_shift]
nt_sequence_records.append(
SeqRecord.SeqRecord(nt_sequence, *record_metadata))
nt_no_stop_sequence_records.append(
SeqRecord.SeqRecord(nt_no_stop_sequence, *record_metadata)
)
if gen_cut_stop_codon:
aa_sequence = nt_sequence.translate()
aa_no_stop_sequence = nt_no_stop_sequence.translate()
aa_sequence_records.append(
SeqRecord.SeqRecord(aa_sequence, *record_metadata)
)
aa_no_stop_sequence_records.append(
SeqRecord.SeqRecord(aa_no_stop_sequence, *record_metadata)
)
debug(f"Trimming for {s_record.id} complete.")
debug(f"Sequence trimming for {tru_start} - {end} complete.")
2023-03-22 19:30:53 +00:00
return (
nt_sequence_records,
nt_no_stop_sequence_records,
aa_sequence_records,
aa_no_stop_sequence_records,
)
def main(args):
logging.basicConfig(level=args.log_level.upper())
msa_records = list(read_msa_file(args.input))
info(f"MSA records read complete. Found {len(msa_records)} records.")
genes = []
if args.gene_list:
genes = read_genes_from_csv(args.gene_list)
info(
f"Gene list read from {args.gene_list} resulted in {len(genes)} "
"genes.")
2023-03-22 19:30:53 +00:00
else:
if args.gene_name and args.start and args.end:
genes.append([args.gene_name, args.start, args.end])
info(
f"Extracting {args.gene_name} starting at {args.start} to "
f"{args.end}.")
2023-03-22 19:30:53 +00:00
else:
raise Exception(
"Need either a gene list by --gene-list or a start and end "
"via --start, and --end respectively."
2023-03-22 19:30:53 +00:00
)
for gene_name, start, end in genes:
info(f"Started on gene {gene_name} ({start} - {end})")
2023-03-22 19:30:53 +00:00
(
nt_sequence_records,
nt_no_stop_sequence_records,
aa_sequence_records,
aa_no_stop_sequence_records,
) = trim(start, end, args.gen_cut_stop_codon,
msa_records, correction_range=args.correction_range)
2023-03-22 19:30:53 +00:00
write_to_file(
args.output_dir,
gene_name,
start,
end,
args.full_suffix,
args.ns_suffix,
args.aa_suffix,
nt_sequence_records,
nt_no_stop_sequence_records,
aa_sequence_records,
aa_no_stop_sequence_records,
)
info(f"Completed gene {gene_name} ({start} - {end})")
2023-03-22 19:30:53 +00:00
if __name__ == "__main__":
parser = argparse.ArgumentParser(
prog="msa_splitter",
description="""
The MSA splitter is a simple program that takes in two positions
and a MSA file and produces two separate FASTA files
""",
)
parser.add_argument(
"input",
help="The MSA file in FASTA format.",
type=str,
metavar="i",
)
parser.add_argument(
"-L",
"--gene-list",
help="A list of genes in CSV file.",
type=str,
required=False,
default=None,
)
parser.add_argument(
"-s",
"--start",
help="The start of the desired sequence.",
type=int,
required=False,
default=None,
)
parser.add_argument(
"-e",
"--end",
help="The end of the desired sequence.",
type=int,
required=False,
default=None,
)
parser.add_argument(
"-n",
"--gene-name",
help="The name of the gene with the given start and end locations.",
type=str,
required=False,
default=None,
)
parser.add_argument(
"-o",
"--output-dir",
help="""The directory (folder) of which the
generated files should reside.
""",
type=str,
required=False,
default="output",
)
parser.add_argument(
"-G",
"--gen-cut-stop-codon",
help="""If specified, for each FASTA file generated,
another FASTA will be generated without the stop codon.
""",
required=False,
default=False,
action="store_true"
)
parser.add_argument(
"-nt",
"--full-suffix",
help="""The suffix to append to the full nucleotide sequence
FASTA output files.
""",
type=str,
required=False,
default="full",
)
parser.add_argument(
"-ns",
"--ns-suffix",
help="""The suffix to append to the full nucleotide sequence
without the stop FASTA output files.
""",
type=str,
required=False,
default="no stop",
)
parser.add_argument(
"-aa",
"--aa-suffix",
help="""The suffix to append to the amino acid sequence FASTA
output files.
""",
type=str,
required=False,
default="aa",
)
parser.add_argument(
"--correction-range",
"-R",
help="The number of offsets in terms of nucleotides to try in both "
"directions to correct start and stop codons before giving up.",
2023-03-22 19:30:53 +00:00
type=int,
required=False,
default=9
)
parser.add_argument(
"-E",
"--log-level",
help="The verbosity of the program.",
type=str,
required=False,
default="INFO"
)
main(parser.parse_args())