package main
import (
+ "bytes"
"compress/gzip"
"errors"
"flag"
"fmt"
"io"
+ "io/ioutil"
"net/http"
_ "net/http/pprof"
"os"
)
type vcf2fasta struct {
- refFile string
- projectUUID string
- outputDir string
- runLocal bool
- vcpus int
+ refFile string
+ mask bool
+ gvcfRegionsPy string
+ gvcfRegionsPyData []byte
+ projectUUID string
+ outputDir string
+ runLocal bool
+ vcpus int
}
func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
flags := flag.NewFlagSet("", flag.ContinueOnError)
flags.SetOutput(stderr)
flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
+ flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)")
+ flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py")
flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
return 1
}
}
- runner.Args = append([]string{"vcf2fasta", "-local=true", "-ref", cmd.refFile, "-output-dir", "/mnt/output"}, inputs...)
+ runner.Args = append([]string{"vcf2fasta", "-local=true", "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask), "-gvcf-regions.py", cmd.gvcfRegionsPy, "-output-dir", "/mnt/output"}, inputs...)
var output string
output, err = runner.Run()
if err != nil {
return 0
}
+ if cmd.mask {
+ var resp *http.Response
+ resp, err = http.Get(cmd.gvcfRegionsPy)
+ if err != nil {
+ return 1
+ }
+ if resp.StatusCode != http.StatusOK {
+ err = fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode)
+ return 1
+ }
+ cmd.gvcfRegionsPyData, err = ioutil.ReadAll(resp.Body)
+ if err != nil {
+ err = fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err)
+ return 1
+ }
+ }
+
infiles, err := listInputFiles(flags.Args())
if err != nil {
return 1
return 0
}
-func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
- args := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase), infile}
- indexsuffix := ".tbi"
- if _, err := os.Stat(infile + ".csi"); err == nil {
- indexsuffix = ".csi"
+func maybeInDocker(args, mountfiles []string) []string {
+ if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err != nil || len(out) == 0 {
+ return args
}
- if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err == nil && len(out) > 0 {
- args = append([]string{
- "docker", "run", "--rm",
- "--log-driver=none",
- "--volume=" + infile + ":" + infile + ":ro",
- "--volume=" + infile + indexsuffix + ":" + infile + indexsuffix + ":ro",
- "--volume=" + cmd.refFile + ":" + cmd.refFile + ":ro",
- "lightning-runtime",
- }, args...)
+ dockerrun := []string{
+ "docker", "run", "--rm",
+ "--log-driver=none",
}
+ for _, f := range mountfiles {
+ dockerrun = append(dockerrun, "--volume="+f+":"+f+":ro")
+ }
+ dockerrun = append(dockerrun, "lightning-runtime")
+ dockerrun = append(dockerrun, args...)
+ return dockerrun
+}
+func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
_, basename := filepath.Split(infile)
outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0777)
gzipw := gzip.NewWriter(outf)
defer gzipw.Close()
- consensus := exec.Command(args[0], args[1:]...)
+ var maskfile *os.File // reading side of a pipe if we're running bedtools, otherwise nil
+
+ var wg sync.WaitGroup
+ errs := make(chan error, 3)
+ if cmd.mask {
+ bedr, bedw, err := os.Pipe()
+ if err != nil {
+ return err
+ }
+ bedargs := []string{"python", "-", "--gvcf_type", "gatk", infile}
+ bed := exec.Command(bedargs[0], bedargs[1:]...)
+ bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
+ bed.Stdout = bedw
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ errs <- bed.Run()
+ }()
+
+ bedcompr, bedcompw, err := os.Pipe()
+ if err != nil {
+ return err
+ }
+ bedcompargs := []string{"bedtools", "complement", "-i", "/dev/stdin", "-g", cmd.refFile}
+ bedcompargs = maybeInDocker(bedcompargs, []string{cmd.refFile, infile})
+ bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...)
+ bedcomp.Stdin = bedr
+ bedcomp.Stdout = bedcompw
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ errs <- bedcomp.Run()
+ }()
+ maskfile = bedcompr
+ }
+
+ consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)}
+ if maskfile != nil {
+ consargs = append(consargs, "--mask", "/dev/fd/3")
+ }
+ consargs = append(consargs, infile)
+ indexsuffix := ".tbi"
+ if _, err := os.Stat(infile + ".csi"); err == nil {
+ indexsuffix = ".csi"
+ }
+ consargs = maybeInDocker(consargs, []string{infile, infile + indexsuffix, cmd.refFile})
+
+ consensus := exec.Command(consargs[0], consargs[1:]...)
consensus.Stderr = os.Stderr
consensus.Stdout = gzipw
+ if maskfile != nil {
+ consensus.ExtraFiles = []*os.File{maskfile}
+ }
err = consensus.Start()
if err != nil {
return err