Add vcf2fasta -mask option.
authorTom Clegg <tom@tomclegg.ca>
Mon, 9 Mar 2020 19:28:31 +0000 (15:28 -0400)
committerTom Clegg <tom@tomclegg.ca>
Mon, 9 Mar 2020 19:28:31 +0000 (15:28 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

vcf2fasta.go

index 77ae6abb68db548b75700faee14c4668073702f3..9f6f085cd854bed1ddd4e6db14a03918b31eff4f 100644 (file)
@@ -1,11 +1,13 @@
 package main
 
 import (
+       "bytes"
        "compress/gzip"
        "errors"
        "flag"
        "fmt"
        "io"
+       "io/ioutil"
        "net/http"
        _ "net/http/pprof"
        "os"
@@ -19,11 +21,14 @@ import (
 )
 
 type vcf2fasta struct {
-       refFile     string
-       projectUUID string
-       outputDir   string
-       runLocal    bool
-       vcpus       int
+       refFile           string
+       mask              bool
+       gvcfRegionsPy     string
+       gvcfRegionsPyData []byte
+       projectUUID       string
+       outputDir         string
+       runLocal          bool
+       vcpus             int
 }
 
 func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
@@ -36,6 +41,8 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
        flags := flag.NewFlagSet("", flag.ContinueOnError)
        flags.SetOutput(stderr)
        flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
+       flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)")
+       flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py")
        flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
        flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
        flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
@@ -94,7 +101,7 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
                                return 1
                        }
                }
-               runner.Args = append([]string{"vcf2fasta", "-local=true", "-ref", cmd.refFile, "-output-dir", "/mnt/output"}, inputs...)
+               runner.Args = append([]string{"vcf2fasta", "-local=true", "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask), "-gvcf-regions.py", cmd.gvcfRegionsPy, "-output-dir", "/mnt/output"}, inputs...)
                var output string
                output, err = runner.Run()
                if err != nil {
@@ -104,6 +111,23 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
                return 0
        }
 
+       if cmd.mask {
+               var resp *http.Response
+               resp, err = http.Get(cmd.gvcfRegionsPy)
+               if err != nil {
+                       return 1
+               }
+               if resp.StatusCode != http.StatusOK {
+                       err = fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode)
+                       return 1
+               }
+               cmd.gvcfRegionsPyData, err = ioutil.ReadAll(resp.Body)
+               if err != nil {
+                       err = fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err)
+                       return 1
+               }
+       }
+
        infiles, err := listInputFiles(flags.Args())
        if err != nil {
                return 1
@@ -154,23 +178,23 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
        return 0
 }
 
-func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
-       args := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase), infile}
-       indexsuffix := ".tbi"
-       if _, err := os.Stat(infile + ".csi"); err == nil {
-               indexsuffix = ".csi"
+func maybeInDocker(args, mountfiles []string) []string {
+       if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err != nil || len(out) == 0 {
+               return args
        }
-       if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err == nil && len(out) > 0 {
-               args = append([]string{
-                       "docker", "run", "--rm",
-                       "--log-driver=none",
-                       "--volume=" + infile + ":" + infile + ":ro",
-                       "--volume=" + infile + indexsuffix + ":" + infile + indexsuffix + ":ro",
-                       "--volume=" + cmd.refFile + ":" + cmd.refFile + ":ro",
-                       "lightning-runtime",
-               }, args...)
+       dockerrun := []string{
+               "docker", "run", "--rm",
+               "--log-driver=none",
        }
+       for _, f := range mountfiles {
+               dockerrun = append(dockerrun, "--volume="+f+":"+f+":ro")
+       }
+       dockerrun = append(dockerrun, "lightning-runtime")
+       dockerrun = append(dockerrun, args...)
+       return dockerrun
+}
 
+func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
        _, basename := filepath.Split(infile)
        outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
        outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0777)
@@ -181,9 +205,59 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
        gzipw := gzip.NewWriter(outf)
        defer gzipw.Close()
 
-       consensus := exec.Command(args[0], args[1:]...)
+       var maskfile *os.File // reading side of a pipe if we're running bedtools, otherwise nil
+
+       var wg sync.WaitGroup
+       errs := make(chan error, 3)
+       if cmd.mask {
+               bedr, bedw, err := os.Pipe()
+               if err != nil {
+                       return err
+               }
+               bedargs := []string{"python", "-", "--gvcf_type", "gatk", infile}
+               bed := exec.Command(bedargs[0], bedargs[1:]...)
+               bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
+               bed.Stdout = bedw
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       errs <- bed.Run()
+               }()
+
+               bedcompr, bedcompw, err := os.Pipe()
+               if err != nil {
+                       return err
+               }
+               bedcompargs := []string{"bedtools", "complement", "-i", "/dev/stdin", "-g", cmd.refFile}
+               bedcompargs = maybeInDocker(bedcompargs, []string{cmd.refFile, infile})
+               bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...)
+               bedcomp.Stdin = bedr
+               bedcomp.Stdout = bedcompw
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       errs <- bedcomp.Run()
+               }()
+               maskfile = bedcompr
+       }
+
+       consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)}
+       if maskfile != nil {
+               consargs = append(consargs, "--mask", "/dev/fd/3")
+       }
+       consargs = append(consargs, infile)
+       indexsuffix := ".tbi"
+       if _, err := os.Stat(infile + ".csi"); err == nil {
+               indexsuffix = ".csi"
+       }
+       consargs = maybeInDocker(consargs, []string{infile, infile + indexsuffix, cmd.refFile})
+
+       consensus := exec.Command(consargs[0], consargs[1:]...)
        consensus.Stderr = os.Stderr
        consensus.Stdout = gzipw
+       if maskfile != nil {
+               consensus.ExtraFiles = []*os.File{maskfile}
+       }
        err = consensus.Start()
        if err != nil {
                return err