Use named pipe for bcftools --mask data.
author Tom Clegg <tom@tomclegg.ca>
Wed, 11 Mar 2020 18:53:49 +0000 (14:53 -0400)
committer Tom Clegg <tom@tomclegg.ca>
Wed, 11 Mar 2020 18:53:49 +0000 (14:53 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

vcf2fasta.go

index e02f77556c66f39033074ff5c5bbdb6ab0ecc7c7..04e0b7422f3970eaf1271311226a8a3c6a4a6ae4 100644 (file)
@@ -16,6 +16,7 @@ import (
        "runtime"
        "strings"
        "sync"
+       "syscall"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        log "github.com/sirupsen/logrus"
@@ -209,7 +210,7 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
        gzipw := gzip.NewWriter(outf)
        defer gzipw.Close()
 
-       var maskfile *os.File // reading side of a pipe if we're running bedtools, otherwise nil
+       var maskfifo string // filename of mask fifo if we're running bedtools, otherwise ""
 
        var wg sync.WaitGroup
        errs := make(chan error, 3)
@@ -232,55 +233,79 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
                        defer bedw.Close()
                        log.Printf("running %v", bed.Args)
                        err := bed.Run()
+                       log.Printf("exited %v", bed.Args)
                        if err != nil {
                                errs <- fmt.Errorf("gvcf_regions: %s", err)
                        }
                }()
 
-               bedcompr, bedcompw, err := os.Pipe()
+               // The bcftools --mask argument needs to end in ".bed"
+               // in order to be parsed as a BED file, so we need to
+               // use a named pipe instead of stdin.
+               tempdir, err := ioutil.TempDir("", "")
                if err != nil {
-                       return err
+                       return fmt.Errorf("TempDir: %s", err)
+               }
+               defer os.RemoveAll(tempdir)
+               maskfifo = filepath.Join(tempdir, "fifo.bed")
+               err = syscall.Mkfifo(maskfifo, 0600)
+               if err != nil {
+                       return fmt.Errorf("mkfifo: %s", err)
                }
-               bedcompargs := []string{"bedtools", "complement", "-i", "/dev/stdin", "-g", cmd.genomeFile}
-               bedcompargs = maybeInDocker(bedcompargs, []string{cmd.genomeFile})
-               bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...)
-               bedcomp.Stdin = bedr
-               bedcomp.Stdout = bedcompw
-               bedcomp.Stderr = cmd.stderr
                wg.Add(1)
                go func() {
                        defer wg.Done()
-                       defer bedcompw.Close()
+
+                       maskfifow, err := os.OpenFile(maskfifo, os.O_WRONLY, 0)
+                       if err != nil {
+                               errs <- err
+                               return
+                       }
+                       defer maskfifow.Close()
+
+                       bedcompargs := []string{"bedtools", "complement", "-i", "/dev/stdin", "-g", cmd.genomeFile}
+                       bedcompargs = maybeInDocker(bedcompargs, []string{cmd.genomeFile})
+                       bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...)
+                       bedcomp.Stdin = bedr
+                       bedcomp.Stdout = maskfifow
+                       bedcomp.Stderr = cmd.stderr
                        log.Printf("running %v", bedcomp.Args)
-                       err := bedcomp.Run()
+                       err = bedcomp.Run()
+                       log.Printf("exited %v", bedcomp.Args)
                        if err != nil {
                                errs <- fmt.Errorf("bedtools complement: %s", err)
+                               return
+                       }
+                       err = maskfifow.Close()
+                       if err != nil {
+                               errs <- err
+                               return
                        }
                }()
-               maskfile = bedcompr
        }
 
        wg.Add(1)
        go func() {
                defer wg.Done()
                consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)}
-               if maskfile != nil {
-                       consargs = append(consargs, "--mask", "/dev/stdin")
+               if maskfifo != "" {
+                       consargs = append(consargs, "--mask", maskfifo)
                }
                consargs = append(consargs, infile)
                indexsuffix := ".tbi"
                if _, err := os.Stat(infile + ".csi"); err == nil {
                        indexsuffix = ".csi"
                }
-               consargs = maybeInDocker(consargs, []string{infile, infile + indexsuffix, cmd.refFile})
+               mounts := []string{infile, infile + indexsuffix, cmd.refFile}
+               if maskfifo != "" {
+                       mounts = append(mounts, maskfifo)
+               }
+               consargs = maybeInDocker(consargs, mounts)
 
                consensus := exec.Command(consargs[0], consargs[1:]...)
                consensus.Stderr = os.Stderr
                consensus.Stdout = gzipw
                consensus.Stderr = cmd.stderr
-               if maskfile != nil {
-                       consensus.Stdin = maskfile
-               }
                log.Printf("running %v", consensus.Args)
                err = consensus.Run()
                if err != nil {