"runtime"
"strings"
"sync"
+ "syscall"
"git.arvados.org/arvados.git/sdk/go/arvados"
log "github.com/sirupsen/logrus"
gzipw := gzip.NewWriter(outf)
defer gzipw.Close()
- var maskfile *os.File // reading side of a pipe if we're running bedtools, otherwise nil
+ var maskfifo string // filename of mask fifo if we're running bedtools, otherwise ""
var wg sync.WaitGroup
errs := make(chan error, 3)
defer bedw.Close()
log.Printf("running %v", bed.Args)
err := bed.Run()
+ log.Printf("exited %v", bed.Args)
if err != nil {
errs <- fmt.Errorf("gvcf_regions: %s", err)
}
}()
- bedcompr, bedcompw, err := os.Pipe()
+ // The bcftools --mask argument needs to end in ".bed"
+ // in order to be parsed as a BED file, so we need to
+ // use a named pipe instead of stdin.
+ tempdir, err := ioutil.TempDir("", "")
if err != nil {
- return err
+ return fmt.Errorf("TempDir: %s", err)
+ }
+ defer os.RemoveAll(tempdir)
+ maskfifo = filepath.Join(tempdir, "fifo.bed")
+ err = syscall.Mkfifo(maskfifo, 0600)
+ if err != nil {
+ return fmt.Errorf("mkfifo: %s", err)
}
- bedcompargs := []string{"bedtools", "complement", "-i", "/dev/stdin", "-g", cmd.genomeFile}
- bedcompargs = maybeInDocker(bedcompargs, []string{cmd.genomeFile})
- bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...)
- bedcomp.Stdin = bedr
- bedcomp.Stdout = bedcompw
- bedcomp.Stderr = cmd.stderr
wg.Add(1)
go func() {
defer wg.Done()
- defer bedcompw.Close()
+
+ maskfifow, err := os.OpenFile(maskfifo, os.O_WRONLY, 0)
+ if err != nil {
+ errs <- err
+ return
+ }
+ defer maskfifow.Close()
+
+ bedcompargs := []string{"bedtools", "complement", "-i", "/dev/stdin", "-g", cmd.genomeFile}
+ bedcompargs = maybeInDocker(bedcompargs, []string{cmd.genomeFile})
+ bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...)
+ bedcomp.Stdin = bedr
+ bedcomp.Stdout = maskfifow
+ bedcomp.Stderr = cmd.stderr
log.Printf("running %v", bedcomp.Args)
- err := bedcomp.Run()
+ err = bedcomp.Run()
+ log.Printf("exited %v", bedcomp.Args)
if err != nil {
errs <- fmt.Errorf("bedtools complement: %s", err)
+ return
+ }
+ err = maskfifow.Close()
+ if err != nil {
+ errs <- err
+ return
}
}()
- maskfile = bedcompr
}
wg.Add(1)
go func() {
defer wg.Done()
consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)}
- if maskfile != nil {
- consargs = append(consargs, "--mask", "/dev/stdin")
+ if maskfifo != "" {
+ consargs = append(consargs, "--mask", maskfifo)
}
consargs = append(consargs, infile)
indexsuffix := ".tbi"
if _, err := os.Stat(infile + ".csi"); err == nil {
indexsuffix = ".csi"
}
- consargs = maybeInDocker(consargs, []string{infile, infile + indexsuffix, cmd.refFile})
+ mounts := []string{infile, infile + indexsuffix, cmd.refFile}
+ if maskfifo != "" {
+ mounts = append(mounts, maskfifo)
+ }
+ consargs = maybeInDocker(consargs, mounts)
consensus := exec.Command(consargs[0], consargs[1:]...)
consensus.Stderr = os.Stderr
consensus.Stdout = gzipw
consensus.Stderr = cmd.stderr
- if maskfile != nil {
- consensus.Stdin = maskfile
- }
log.Printf("running %v", consensus.Args)
err = consensus.Run()
if err != nil {