From 9f57f9b6662e7950f909e0734ba839b3a13a0658 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Mon, 9 Mar 2020 15:28:31 -0400 Subject: [PATCH] Add vcf2fasta -mask option. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- vcf2fasta.go | 116 +++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 95 insertions(+), 21 deletions(-) diff --git a/vcf2fasta.go b/vcf2fasta.go index 77ae6abb68..9f6f085cd8 100644 --- a/vcf2fasta.go +++ b/vcf2fasta.go @@ -1,11 +1,13 @@ package main import ( + "bytes" "compress/gzip" "errors" "flag" "fmt" "io" + "io/ioutil" "net/http" _ "net/http/pprof" "os" @@ -19,11 +21,14 @@ import ( ) type vcf2fasta struct { - refFile string - projectUUID string - outputDir string - runLocal bool - vcpus int + refFile string + mask bool + gvcfRegionsPy string + gvcfRegionsPyData []byte + projectUUID string + outputDir string + runLocal bool + vcpus int } func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { @@ -36,6 +41,8 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st flags := flag.NewFlagSet("", flag.ContinueOnError) flags.SetOutput(stderr) flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`") + flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)") + flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py") flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data") flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory") flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)") @@ -94,7 +101,7 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st return 1 } } - runner.Args = append([]string{"vcf2fasta", "-local=true", "-ref", cmd.refFile, "-output-dir", "/mnt/output"}, inputs...) + runner.Args = append([]string{"vcf2fasta", "-local=true", "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask), "-gvcf-regions.py", cmd.gvcfRegionsPy, "-output-dir", "/mnt/output"}, inputs...) var output string output, err = runner.Run() if err != nil { @@ -104,6 +111,23 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st return 0 } + if cmd.mask { + var resp *http.Response + resp, err = http.Get(cmd.gvcfRegionsPy) + if err != nil { + return 1 + } + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode) + return 1 + } + cmd.gvcfRegionsPyData, err = ioutil.ReadAll(resp.Body) + if err != nil { + err = fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err) + return 1 + } + } + infiles, err := listInputFiles(flags.Args()) if err != nil { return 1 @@ -154,23 +178,23 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st return 0 } -func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { - args := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase), infile} - indexsuffix := ".tbi" - if _, err := os.Stat(infile + ".csi"); err == nil { - indexsuffix = ".csi" +func maybeInDocker(args, mountfiles []string) []string { + if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err != nil || len(out) == 0 { + return args } - if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err == nil && len(out) > 0 { - args = append([]string{ - "docker", "run", "--rm", - "--log-driver=none", - "--volume=" + infile + ":" + infile + ":ro", - "--volume=" + infile + indexsuffix + ":" + infile + indexsuffix + ":ro", - "--volume=" + cmd.refFile + ":" + cmd.refFile + ":ro", - "lightning-runtime", - }, args...) + dockerrun := []string{ + "docker", "run", "--rm", + "--log-driver=none", } + for _, f := range mountfiles { + dockerrun = append(dockerrun, "--volume="+f+":"+f+":ro") + } + dockerrun = append(dockerrun, "lightning-runtime") + dockerrun = append(dockerrun, args...) + return dockerrun +} +func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { _, basename := filepath.Split(infile) outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase)) outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0777) @@ -181,9 +205,59 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { gzipw := gzip.NewWriter(outf) defer gzipw.Close() - consensus := exec.Command(args[0], args[1:]...) + var maskfile *os.File // reading side of a pipe if we're running bedtools, otherwise nil + + var wg sync.WaitGroup + errs := make(chan error, 3) + if cmd.mask { + bedr, bedw, err := os.Pipe() + if err != nil { + return err + } + bedargs := []string{"python", "-", "--gvcf_type", "gatk", infile} + bed := exec.Command(bedargs[0], bedargs[1:]...) + bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData) + bed.Stdout = bedw + wg.Add(1) + go func() { + defer wg.Done() + errs <- bed.Run() + }() + + bedcompr, bedcompw, err := os.Pipe() + if err != nil { + return err + } + bedcompargs := []string{"bedtools", "complement", "-i", "/dev/stdin", "-g", cmd.refFile} + bedcompargs = maybeInDocker(bedcompargs, []string{cmd.refFile, infile}) + bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...) + bedcomp.Stdin = bedr + bedcomp.Stdout = bedcompw + wg.Add(1) + go func() { + defer wg.Done() + errs <- bedcomp.Run() + }() + maskfile = bedcompr + } + + consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)} + if maskfile != nil { + consargs = append(consargs, "--mask", "/dev/fd/3") + } + consargs = append(consargs, infile) + indexsuffix := ".tbi" + if _, err := os.Stat(infile + ".csi"); err == nil { + indexsuffix = ".csi" + } + consargs = maybeInDocker(consargs, []string{infile, infile + indexsuffix, cmd.refFile}) + + consensus := exec.Command(consargs[0], consargs[1:]...) consensus.Stderr = os.Stderr consensus.Stdout = gzipw + if maskfile != nil { + consensus.ExtraFiles = []*os.File{maskfile} + } err = consensus.Start() if err != nil { return err -- 2.30.2