vcf2fasta -mask cont'd.
[lightning.git] / vcf2fasta.go
index 77ae6abb68db548b75700faee14c4668073702f3..f7515c40f7d4b43052549533232e3749ecf423e1 100644 (file)
@@ -1,17 +1,20 @@
 package main
 
 import (
+       "bytes"
        "compress/gzip"
        "errors"
        "flag"
        "fmt"
        "io"
+       "io/ioutil"
        "net/http"
        _ "net/http/pprof"
        "os"
        "os/exec"
        "path/filepath"
        "runtime"
+       "strings"
        "sync"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -19,11 +22,14 @@ import (
 )
 
 type vcf2fasta struct {
-       refFile     string
-       projectUUID string
-       outputDir   string
-       runLocal    bool
-       vcpus       int
+       refFile           string
+       mask              bool
+       gvcfRegionsPy     string
+       gvcfRegionsPyData []byte
+       projectUUID       string
+       outputDir         string
+       runLocal          bool
+       vcpus             int
 }
 
 func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
@@ -36,10 +42,13 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
        flags := flag.NewFlagSet("", flag.ContinueOnError)
        flags.SetOutput(stderr)
        flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
+       flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)")
+       flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py")
        flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
        flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
        flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
        flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
+       priority := flags.Int("priority", 500, "container request priority")
        pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
        err = flags.Parse(args)
        if err == flag.ErrHelp {
@@ -61,6 +70,13 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
                }()
        }
 
+       if cmd.mask {
+               err = cmd.loadRegionsPy()
+               if err != nil {
+                       return 1
+               }
+       }
+
        if !cmd.runLocal {
                if cmd.outputDir != "" {
                        err = errors.New("cannot specify output dir in non-local mode")
@@ -82,6 +98,13 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
                        ProjectUUID: cmd.projectUUID,
                        RAM:         2<<30 + int64(cmd.vcpus)<<28,
                        VCPUs:       cmd.vcpus,
+                       Priority:    *priority,
+                       Mounts: map[string]map[string]interface{}{
+                               "/gvcf_regions.py": map[string]interface{}{
+                                       "kind":    "text",
+                                       "content": string(cmd.gvcfRegionsPyData),
+                               },
+                       },
                }
                err = runner.TranslatePaths(&cmd.refFile)
                if err != nil {
@@ -94,7 +117,7 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
                                return 1
                        }
                }
-               runner.Args = append([]string{"vcf2fasta", "-local=true", "-ref", cmd.refFile, "-output-dir", "/mnt/output"}, inputs...)
+               runner.Args = append([]string{"vcf2fasta", "-local=true", "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask), "-gvcf-regions.py", "/gvcf_regions.py", "-output-dir", "/mnt/output"}, inputs...)
                var output string
                output, err = runner.Run()
                if err != nil {
@@ -154,23 +177,23 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
        return 0
 }
 
-func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
-       args := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase), infile}
-       indexsuffix := ".tbi"
-       if _, err := os.Stat(infile + ".csi"); err == nil {
-               indexsuffix = ".csi"
+func maybeInDocker(args, mountfiles []string) []string {
+       if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err != nil || len(out) == 0 {
+               return args
        }
-       if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err == nil && len(out) > 0 {
-               args = append([]string{
-                       "docker", "run", "--rm",
-                       "--log-driver=none",
-                       "--volume=" + infile + ":" + infile + ":ro",
-                       "--volume=" + infile + indexsuffix + ":" + infile + indexsuffix + ":ro",
-                       "--volume=" + cmd.refFile + ":" + cmd.refFile + ":ro",
-                       "lightning-runtime",
-               }, args...)
+       dockerrun := []string{
+               "docker", "run", "--rm",
+               "--log-driver=none",
        }
+       for _, f := range mountfiles {
+               dockerrun = append(dockerrun, "--volume="+f+":"+f+":ro")
+       }
+       dockerrun = append(dockerrun, "lightning-runtime")
+       dockerrun = append(dockerrun, args...)
+       return dockerrun
+}
 
+func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
        _, basename := filepath.Split(infile)
        outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
        outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0777)
@@ -181,24 +204,112 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
        gzipw := gzip.NewWriter(outf)
        defer gzipw.Close()
 
-       consensus := exec.Command(args[0], args[1:]...)
-       consensus.Stderr = os.Stderr
-       consensus.Stdout = gzipw
-       err = consensus.Start()
-       if err != nil {
-               return err
-       }
-       err = consensus.Wait()
-       if err != nil {
-               return err
-       }
-       err = gzipw.Close()
-       if err != nil {
-               return err
+       var maskfile *os.File // reading side of a pipe if we're running bedtools, otherwise nil
+
+       var wg sync.WaitGroup
+       errs := make(chan error, 3)
+       if cmd.mask {
+               bedr, bedw, err := os.Pipe()
+               if err != nil {
+                       return err
+               }
+               bedargs := []string{"python", "-", "--gvcf_type", "gatk", infile}
+               bed := exec.Command(bedargs[0], bedargs[1:]...)
+               bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
+               bed.Stdout = bedw
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       log.Printf("running %v", bed.Args)
+                       errs <- bed.Run()
+               }()
+
+               bedcompr, bedcompw, err := os.Pipe()
+               if err != nil {
+                       return err
+               }
+               bedcompargs := []string{"bedtools", "complement", "-i", "/dev/stdin", "-g", cmd.refFile}
+               bedcompargs = maybeInDocker(bedcompargs, []string{cmd.refFile, infile})
+               bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...)
+               bedcomp.Stdin = bedr
+               bedcomp.Stdout = bedcompw
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       log.Printf("running %v", bedcomp.Args)
+                       errs <- bedcomp.Run()
+               }()
+               maskfile = bedcompr
        }
-       err = outf.Close()
-       if err != nil {
-               return err
+
+       wg.Add(1)
+       go func() {
+               defer wg.Done()
+               consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)}
+               if maskfile != nil {
+                       consargs = append(consargs, "--mask", "/dev/fd/3")
+               }
+               consargs = append(consargs, infile)
+               indexsuffix := ".tbi"
+               if _, err := os.Stat(infile + ".csi"); err == nil {
+                       indexsuffix = ".csi"
+               }
+               consargs = maybeInDocker(consargs, []string{infile, infile + indexsuffix, cmd.refFile})
+
+               consensus := exec.Command(consargs[0], consargs[1:]...)
+               consensus.Stderr = os.Stderr
+               consensus.Stdout = gzipw
+               if maskfile != nil {
+                       consensus.ExtraFiles = []*os.File{maskfile}
+               }
+               log.Printf("running %v", consensus.Args)
+               err = consensus.Run()
+               if err != nil {
+                       errs <- err
+                       return
+               }
+               err = gzipw.Close()
+               if err != nil {
+                       errs <- err
+                       return
+               }
+               errs <- outf.Close()
+       }()
+
+       go func() {
+               wg.Wait()
+               close(errs)
+       }()
+
+       for err := range errs {
+               if err != nil {
+                       return err
+               }
        }
        return nil
 }
+
+func (cmd *vcf2fasta) loadRegionsPy() error {
+       if strings.HasPrefix(cmd.gvcfRegionsPy, "http") {
+               resp, err := http.Get(cmd.gvcfRegionsPy)
+               if err != nil {
+                       return err
+               }
+               if resp.StatusCode != http.StatusOK {
+                       return fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode)
+               }
+               buf, err := ioutil.ReadAll(resp.Body)
+               if err != nil {
+                       return fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err)
+               }
+               cmd.gvcfRegionsPyData = buf
+               return nil
+       } else {
+               buf, err := ioutil.ReadFile(cmd.gvcfRegionsPy)
+               if err != nil {
+                       return err
+               }
+               cmd.gvcfRegionsPyData = buf
+               return nil
+       }
+}