#!/usr/bin/env python3 # General Outline of What Needs to be Done # Multiple sequence alignments in FASTA format # -> Split into individual gene files -> # 2 Files for each gene, one with full nucleotide sequence, one without. # Looks like we also want the amino acid sequence, we'll do that too. import argparse from logging import debug, error, info, warning import logging import os from Bio import SeqIO, SeqRecord from Bio.Seq import Seq import csv def read_genes_from_csv(batch_genes_csv_path: str): csv_header = None genes = [] with open(batch_genes_csv_path) as batch_csv_fd: reader = csv.reader(batch_csv_fd) for row in reader: if not csv_header: csv_header = list(row) else: gene_name = row[csv_header.index("Gene")] gene_start = int(row[csv_header.index("Start")]) gene_end = int(row[csv_header.index("End")]) genes.append((gene_name, gene_start, gene_end)) return genes def read_msa_file(input: str): if not os.path.isfile(input): error(f'"{input}" could not be found.') exit(3) for s_record in SeqIO.parse(input, "fasta"): yield s_record def write_to_file( output_dir: str, gene_name: str, start: int, end: int, full_suffix: str, ns_suffix: str, aa_suffix: str, nt_sequence_records: list, nt_no_stop_sequence_records: list, aa_sequence_records: list = [], aa_no_stop_sequence_records: list = [], ): output_path = os.path.abspath(output_dir) if not os.path.isdir(output_dir): os.makedirs(output_path) SeqIO.write( nt_sequence_records, os.path.join( output_path, f'{gene_name or f"{start} - {end - 3}"}' f" - {full_suffix}.fasta", ), "fasta", ) if len(nt_no_stop_sequence_records): SeqIO.write( nt_no_stop_sequence_records, os.path.join( output_path, f'{gene_name or f"{start} - {end - 3}"}' f" - {ns_suffix}.fasta", ), "fasta", ) if len(aa_sequence_records): SeqIO.write( aa_sequence_records, os.path.join( output_path, f'{gene_name or f"{start} - {end - 3}"}' f" - {aa_suffix}" f" - {full_suffix}.fasta", ), "fasta", ) if len(aa_no_stop_sequence_records): SeqIO.write( aa_no_stop_sequence_records, os.path.join( output_path, f'{gene_name or f"{start} - {end - 3}"}' f" - {aa_suffix}" f" - {ns_suffix}.fasta", ), "fasta", ) STOP_CODONS = {"TAG", "TAA", "TGA"} START_CODON = "ATG" def trim( start: int, end: int, gen_cut_stop_codon: bool, perform_translation: bool, msa_records, correction_range=16, ): tru_start = start - 1 nt_sequence_records = [] nt_no_stop_sequence_records = [] aa_sequence_records = [] aa_no_stop_sequence_records = [] problems = [] skip_translation = False debug(f"Beginning sequence trimming for {tru_start} - {end}.") if (end - tru_start) % 3 != 0: skip_translation = True warning( f"Skipping translating for {tru_start} - {end} due to length not being a " "multiple of 3..." ) for s_record in msa_records: record_metadata = ( s_record.id, s_record.name, s_record.description, s_record.dbxrefs, s_record.features, s_record.annotations, s_record.letter_annotations, ) found_start_codon = False start_shift = 0 for i in range(correction_range): if s_record.seq[tru_start + i : tru_start + i + 3] == START_CODON: found_start_codon = True start_shift = i break if ( not found_start_codon and s_record.seq[tru_start - i : tru_start - i + 3] == START_CODON ): found_start_codon = True start_shift = -i break if not found_start_codon: problems.append([start, end, s_record.id, "Could not find start codon"]) if found_start_codon and start_shift != 0: problems.append( [ start, end, s_record.id, "Corrected start codon to " f"{tru_start + start_shift}", ] ) end_shift = 0 found_stop_codon = False for i in range(correction_range): if s_record.seq[end + i - 3 : end + i] in STOP_CODONS: found_stop_codon = True end_shift = i break if ( not found_stop_codon and s_record.seq[end - i - 3 : end - i] in STOP_CODONS ): found_stop_codon = True end_shift = -i break if not found_stop_codon: problems.append([start, end, s_record.id, "Could not find stop codon"]) if found_stop_codon and end_shift != 0: problems.append( [ start, end, s_record.id, "Corrected stop codon to " f"{end + end_shift}", ] ) nt_sequence = s_record.seq[ tru_start + start_shift : end + end_shift ] # Cropping nt_sequence_records.append(SeqRecord.SeqRecord(nt_sequence, *record_metadata)) if gen_cut_stop_codon: nt_no_stop_sequence = s_record.seq[ tru_start + start_shift : end - 3 + end_shift ] nt_no_stop_sequence_records.append( SeqRecord.SeqRecord(nt_no_stop_sequence, *record_metadata) ) if perform_translation and not skip_translation: if '-' in nt_sequence: sequence_with_ambiguity = [] for codon_in_sequence in range(0, len(nt_sequence), 3): codon = nt_sequence[codon_in_sequence : codon_in_sequence + 3] if "-" in codon and codon != "---": sequence_with_ambiguity.extend(codon.replace("-", "N")) else: sequence_with_ambiguity.extend(codon) sequence_with_ambiguity = Seq("".join(sequence_with_ambiguity)) aa_sequence = sequence_with_ambiguity.translate() else: aa_sequence = nt_sequence.translate() aa_sequence_records.append( SeqRecord.SeqRecord(aa_sequence, *record_metadata) ) if gen_cut_stop_codon: aa_no_stop_sequence = aa_sequence[:-1] aa_no_stop_sequence_records.append( SeqRecord.SeqRecord(aa_no_stop_sequence, *record_metadata) ) debug(f"Trimming for {s_record.id} complete.") debug(f"Sequence trimming for {tru_start} - {end} complete.") return ( nt_sequence_records, nt_no_stop_sequence_records, aa_sequence_records, aa_no_stop_sequence_records, problems, ) def log_problems(gene: str, problems: list[list[str]]): for start, end, id, desc in problems: warning(f"{gene} ({start} - {end}) of {id}: {desc}") def output_as_csv(gene: str, problems: list[list[str]], output_path: str): header = ["Start", "End", "Sequence ID", "Problem"] with open(output_path, "w") as problem_csv_fd: writer = csv.writer(problem_csv_fd) writer.writerow(header) writer.writerows(problems) def main(args): logging.basicConfig(level=args.log_level.upper()) msa_records = list(read_msa_file(args.input)) info(f"MSA records read complete. Found {len(msa_records)} records.") genes = [] if args.gene_list: genes = read_genes_from_csv(args.gene_list) info(f"Gene list read from {args.gene_list} resulted in {len(genes)} " "genes.") else: if args.gene_name and args.start and args.end: genes.append([args.gene_name, args.start, args.end]) info( f"Extracting {args.gene_name} starting at {args.start} to " f"{args.end}." ) else: raise Exception( "Need either a gene list by --gene-list or a start and end " "via --start, and --end respectively." ) for gene_name, start, end in genes: info(f"Started on gene {gene_name} ({start} - {end})") ( nt_sequence_records, nt_no_stop_sequence_records, aa_sequence_records, aa_no_stop_sequence_records, problems, ) = trim( start, end, args.gen_cut_stop_codon, args.do_translate, msa_records, correction_range=args.correction_range, ) if len(problems) > 0: warning( f"There were {len(problems)} problems " f"during trimming {gene_name}!" ) if args.catalogue_problems: output_as_csv( gene_name, problems, os.path.join(args.output_dir, f"{gene_name} - problems.csv"), ) write_to_file( args.output_dir, gene_name, start, end, args.full_suffix, args.ns_suffix, args.aa_suffix, nt_sequence_records, nt_no_stop_sequence_records, aa_sequence_records, aa_no_stop_sequence_records, ) info(f"Completed gene {gene_name} ({start} - {end})") if __name__ == "__main__": parser = argparse.ArgumentParser( prog="msa_splitter", description=""" The MSA splitter is a simple program that takes in two positions and a MSA file and produces two separate FASTA files """, ) parser.add_argument( "input", help="The MSA file in FASTA format.", type=str, metavar="i", ) parser.add_argument( "-L", "--gene-list", help="A list of genes in CSV file.", type=str, required=False, default=None, ) parser.add_argument( "-s", "--start", help="The start of the desired sequence.", type=int, required=False, default=None, ) parser.add_argument( "-e", "--end", help="The end of the desired sequence.", type=int, required=False, default=None, ) parser.add_argument( "-n", "--gene-name", help="The name of the gene with the given start and end locations.", type=str, required=False, default=None, ) parser.add_argument( "-o", "--output-dir", help="""The directory (folder) of which the generated files should reside. """, type=str, required=False, default="output", ) parser.add_argument( "-G", "--gen-cut-stop-codon", help="""If specified, for each FASTA file generated, another FASTA will be generated without the stop codon. """, required=False, default=False, action="store_true", ) parser.add_argument( "-nt", "--full-suffix", help="""The suffix to append to the full nucleotide sequence FASTA output files. """, type=str, required=False, default="full", ) parser.add_argument( "-ns", "--ns-suffix", help="""The suffix to append to the full nucleotide sequence without the stop FASTA output files. """, type=str, required=False, default="no stop", ) parser.add_argument( "-aa", "--aa-suffix", help="""The suffix to append to the amino acid sequence FASTA output files. """, type=str, required=False, default="aa", ) parser.add_argument( "-A", "--do-translate", help="Attempts to translate all nucleotide sequences to amino acid sequences.", required=False, action="store_true", ) parser.add_argument( "--correction-range", "-R", help="The number of offsets in terms of nucleotides to try in both " "directions to correct start and stop codons before giving up.", type=int, required=False, default=9, ) parser.add_argument( "-E", "--log-level", help="The verbosity of the program.", type=str, required=False, default="INFO", ) parser.add_argument( "-C", "--catalogue-problems", help="Generates a CSV for each gene listing the problems that " "occurred during trimming.", required=False, action="store_true", ) main(parser.parse_args())