Implemented normalizing genotype
Some checks failed
ydeng/modvcfsamples/pipeline/head There was a failure building this commit

This commit is contained in:
Harrison Deng 2023-06-27 06:15:32 +00:00
parent 08ba073ef9
commit 7bae01b1af
5 changed files with 58 additions and 8 deletions

2
Jenkinsfile vendored
View File

@ -9,7 +9,7 @@ pipeline {
}
stage("unit tests") {
steps {
sh returnStatus: true, script: "python -m pytest --junitxml=unit_tests.xml --cov-report xml:test_coverage.xml --cov=program"
sh returnStatus: true, script: "python -m pytest --junitxml=unit_tests.xml --cov-report xml:test_coverage.xml --cov=modvcfsamples"
xunit checksName: '', tools: [JUnit(excludesPattern: '', pattern: 'test_results.xml', stopProcessingIfError: true)]
cobertura autoUpdateHealth: false, autoUpdateStability: false, coberturaReportFile: 'test_coverage.xml', failUnhealthy: false, failUnstable: false, maxNumberOfBuilds: 64, lineCoverageTargets: '50, 0, 0', methodCoverageTargets: '50, 0, 0', onlyStable: false, sourceEncoding: 'ASCII', zoomCoverageChart: false
}

View File

@ -6,6 +6,7 @@ dependencies:
- pip
- python-build
- pytest
- pytest-cov
- twine
- sphinx
- pyvcf

View File

@ -1,12 +1,18 @@
import argparse
import os
from typing import Union
from modvcfsamples import sample
def run(vcfs: list[str], only: list[str], output_dir: str):
def run(vcfs: list[str], only: list[str], gt: Union[int, None], output_dir: str):
for vcf in vcfs:
vcf_records, header = sample.get_records_from_vcf(vcf)
processed_vcfs, modified_header = sample.keep_specific_call_data(vcf_records, header, *only)
sample.write_records_to_vcf(processed_vcfs, modified_header, os.path.join(output_dir, os.path.basename(vcf)))
modified_vcfs = vcf_records
modified_header = header
if len(only) > 1:
modified_vcfs, modified_header = sample.keep_specific_call_data(modified_vcfs, modified_header, *only)
if gt is not None:
modified_vcfs, modified_header = sample.normalize_gt_to_length(modified_vcfs, modified_header, gt)
sample.write_records_to_vcf(modified_vcfs, modified_header, os.path.join(output_dir, os.path.basename(vcf)))
def main():
parser = argparse.ArgumentParser()
@ -23,16 +29,24 @@ def main():
metavar="O",
type=str
)
parser.add_argument(
"--gt-norm",
"-g",
help="Resizes haploid genotypes to n-ploid by repeating it.",
type=int,
required=False
)
parser.add_argument(
"--only",
"-n",
help="Remove everything but the sample datatype",
action="append",
type=str
type=str,
required=False
)
args = parser.parse_args()
run(args.vcfs, args.only, args.output_dir)
run(args.vcfs, args.only, args.gt_norm, args.output_dir)
if __name__ == "__main__":

View File

@ -46,9 +46,32 @@ def keep_specific_call_data(records: list[vcfpy.Record], header: vcfpy.Header, *
return modified_records, modified_header
def normalize_gt_to_length(records: list[vcfpy.Record], header: vcfpy.Header, num: int):
modified_records = []
for record in records:
modified_calls = []
for call in record.calls:
pass
gt_parts = call.data['GT'].replace("/", "|").split("|")
modified_call = deepcopy(call)
if len(gt_parts) > 1:
# TODO Add logging and output if gt_parts is longer.
pass
else:
modified_call.data['GT'] = "|".join([gt_parts[0]] * num)
modified_calls.append(modified_call)
modified_record = vcfpy.Record(
record.CHROM,
record.POS,
record.ID,
record.REF,
record.ALT,
record.QUAL,
record.FILTER,
record.INFO,
record.FORMAT,
modified_calls,
)
modified_records.append(modified_record)
return modified_records, header
def write_records_to_vcf(records: Iterable[vcfpy.Record], header: vcfpy.Header, path: str):
os.makedirs(os.path.dirname(path), exist_ok=True)

View File

@ -1,4 +1,4 @@
from modvcfsamples.sample import keep_specific_call_data, get_records_from_vcf
from modvcfsamples.sample import keep_specific_call_data, get_records_from_vcf, normalize_gt_to_length
import os
def test_filter_all_sample_datatypes_not_empty():
@ -16,3 +16,15 @@ def test_filter_all_sample_datatypes_filtered():
assert len(call.data.keys()) <= len(filter_for)
for key, _ in call.data.items():
assert key in filter_for
def test_normalize_gt_to_length_not_empty():
records, header = get_records_from_vcf(os.path.abspath("tests/resources/test_files_shortened_haploid.vcf"))
modified_records, _ = normalize_gt_to_length(records, header, 4)
assert len(modified_records) > 0
def test_normalize_gt_to_length_gt_normalized():
records, header = get_records_from_vcf(os.path.abspath("tests/resources/test_files_shortened_haploid.vcf"))
modified_records, _ = normalize_gt_to_length(records, header, 4)
for modified_record in modified_records:
for call in modified_record.calls:
assert len(call.data["GT"].split("|")) == 4 or "/" in call.data["GT"]