-package main
+package lightning
import (
+ "bufio"
"bytes"
"compress/gzip"
+ "context"
"errors"
"flag"
"fmt"
"os"
"os/exec"
"path/filepath"
+ "regexp"
"runtime"
+ "strconv"
"strings"
"sync"
+ "syscall"
- "git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/klauspost/pgzip"
log "github.com/sirupsen/logrus"
)
mask bool
gvcfRegionsPy string
gvcfRegionsPyData []byte
+ gvcfType string
projectUUID string
outputDir string
runLocal bool
vcpus int
+ batchArgs
stderr io.Writer
}
flags.StringVar(&cmd.genomeFile, "genome", "", "reference genome `file`")
flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)")
flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py")
+ flags.StringVar(&cmd.gvcfType, "gvcf-type", "gatk", "gvcf_type argument to gvcf_regions.py: gatk, complete_genomics, freebayes")
flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
+ cmd.batchArgs.Flags(flags)
priority := flags.Int("priority", 500, "container request priority")
pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
err = flags.Parse(args)
if err != nil {
return 1
}
- if cmd.vcpus = len(infiles) * 2; cmd.vcpus > 32 {
+ batchsize := (len(infiles) + cmd.batchArgs.batches - 1) / cmd.batchArgs.batches
+ if cmd.vcpus = batchsize * 2; cmd.vcpus > 32 {
cmd.vcpus = 32
}
}
runner := arvadosContainerRunner{
Name: "lightning vcf2fasta",
- Client: arvados.NewClientFromEnv(),
+ Client: arvadosClientFromEnv,
ProjectUUID: cmd.projectUUID,
RAM: 2<<30 + int64(cmd.vcpus)<<28,
VCPUs: cmd.vcpus,
Priority: *priority,
+ KeepCache: 2,
+ APIAccess: true,
Mounts: map[string]map[string]interface{}{
"/gvcf_regions.py": map[string]interface{}{
"kind": "text",
},
},
}
- err = runner.TranslatePaths(&cmd.refFile)
+ err = runner.TranslatePaths(&cmd.refFile, &cmd.genomeFile)
if err != nil {
return 1
}
return 1
}
}
- runner.Args = append([]string{"vcf2fasta", "-local=true", "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask), "-gvcf-regions.py", "/gvcf_regions.py", "-output-dir", "/mnt/output"}, inputs...)
- var output string
- output, err = runner.Run()
+ var outputs []string
+ outputs, err = cmd.batchArgs.RunBatches(context.Background(), func(ctx context.Context, batch int) (string, error) {
+ runner := runner
+ if cmd.mask {
+ runner.RAM += int64(cmd.vcpus) << 31
+ }
+ runner.Args = []string{"vcf2fasta",
+ "-local=true",
+ "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask),
+ "-genome", cmd.genomeFile,
+ "-gvcf-regions.py", "/gvcf_regions.py",
+ "-gvcf-type", cmd.gvcfType,
+ "-output-dir", "/mnt/output",
+ }
+ runner.Args = append(runner.Args, cmd.batchArgs.Args(batch)...)
+ runner.Args = append(runner.Args, inputs...)
+ log.Printf("batch %d: %v", batch, runner.Args)
+ return runner.RunContext(ctx)
+ })
if err != nil {
return 1
}
- fmt.Fprintln(stdout, output)
+ fmt.Fprintln(stdout, strings.Join(outputs, " "))
return 0
}
if err != nil {
return 1
}
+ infiles = cmd.batchArgs.Slice(infiles)
type job struct {
vcffile string
return args
}
dockerrun := []string{
- "docker", "run", "--rm",
+ "docker", "run", "--rm", "-i",
"--log-driver=none",
}
for _, f := range mountfiles {
}
func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
_, basename := filepath.Split(infile)
outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
- outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0777)
+ outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY, 0777)
if err != nil {
return fmt.Errorf("error opening output file: %s", err)
}
defer outf.Close()
- gzipw := gzip.NewWriter(outf)
+ bufw := bufio.NewWriterSize(outf, 8*1024*1024)
+ gzipw := pgzip.NewWriter(bufw)
defer gzipw.Close()
- var maskfile *os.File // reading side of a pipe if we're running bedtools, otherwise nil
+ var maskfifo string // filename of mask fifo if we're running bedtools, otherwise ""
var wg sync.WaitGroup
- errs := make(chan error, 3)
+ errs := make(chan error, 2)
if cmd.mask {
- if cmd.genomeFile == "" {
- return errors.New("cannot apply mask without -genome argument")
- }
- bedr, bedw, err := os.Pipe()
+ chrSize := map[string]int{}
+
+ vcffile, err := open(infile)
if err != nil {
return err
}
- bedargs := []string{"python", "-", "--gvcf_type", "gatk", infile}
- bed := exec.Command(bedargs[0], bedargs[1:]...)
+ defer vcffile.Close()
+ var rdr io.Reader = vcffile
+ rdr = bufio.NewReaderSize(rdr, 8*1024*1024)
+ if strings.HasSuffix(infile, ".gz") {
+ rdr, err = gzip.NewReader(vcffile)
+ if err != nil {
+ return err
+ }
+ }
+ contigre := regexp.MustCompile(`([^=,]*)=([^>,]*)`)
+ scanner := bufio.NewScanner(rdr)
+ for scanner.Scan() {
+ if s := scanner.Text(); !strings.HasPrefix(s, "##") {
+ break
+ } else if !strings.HasPrefix(s, "##contig=<") {
+ continue
+ } else {
+ kv := map[string]string{}
+ for _, m := range contigre.FindAllStringSubmatch(s[10:], -1) {
+ kv[m[1]] = m[2]
+ }
+ if kv["ID"] != "" && kv["length"] != "" {
+ chrSize[kv["ID"]], _ = strconv.Atoi(kv["length"])
+ }
+ }
+ }
+ if err = scanner.Err(); err != nil {
+ return fmt.Errorf("error scanning input file %q: %s", infile, err)
+ }
+
+ var regions bytes.Buffer
+ bedargs := []string{"python2", "-"}
+ if cmd.gvcfType == "complete_genomics_pass_all" {
+ bedargs = append(bedargs,
+ "--ignore_phrases", "CNV", "INS:ME",
+ "--unreported_is_called",
+ )
+ } else if cmd.gvcfType != "" {
+ bedargs = append(bedargs, "--gvcf_type", cmd.gvcfType)
+ }
+ bedargs = append(bedargs, infile)
+ bed := exec.CommandContext(ctx, bedargs[0], bedargs[1:]...)
bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
- bed.Stdout = bedw
+ bed.Stdout = ®ions
bed.Stderr = cmd.stderr
- wg.Add(1)
- go func() {
- defer wg.Done()
- log.Printf("running %v", bed.Args)
- errs <- bed.Run()
- }()
+ log.Printf("running %v", bed.Args)
+ err = bed.Run()
+ log.Printf("exited %v", bed.Args)
+ if err != nil {
+ return fmt.Errorf("gvcf_regions: %s", err)
+ }
+
+ if cmd.genomeFile != "" {
+ // Read chromosome sizes from genome file in
+ // case any weren't specified in the VCF
+ // header.
+ genomeFile, err := open(cmd.genomeFile)
+ if err != nil {
+ return fmt.Errorf("error opening genome file %q: %s", cmd.genomeFile, err)
+ }
+ scanner := bufio.NewScanner(genomeFile)
+ for scanner.Scan() {
+ var chr string
+ var size int
+ _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size)
+ if err != nil {
+ return fmt.Errorf("error parsing genome file %q: %s", cmd.genomeFile, err)
+ }
+ if chrSize[chr] == 0 {
+ chrSize[chr] = size
+ }
+ }
+ if err = scanner.Err(); err != nil {
+ return fmt.Errorf("error scanning genome file %q: %s", cmd.genomeFile, err)
+ }
+ }
+
+ // "bedtools complement" expects the chromosome sizes
+ // ("genome file") to appear in the same order as the
+ // chromosomes in the input vcf, so we need to sort
+ // them.
+ scanner = bufio.NewScanner(bytes.NewBuffer(append([]byte(nil), regions.Bytes()...)))
+ var sortedGenomeFile bytes.Buffer
+ for scanner.Scan() {
+ var chr string
+ var size int
+ _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size)
+ if err != nil {
+ return fmt.Errorf("error parsing gvcf_regions output: %s", err)
+ }
+ if size, ok := chrSize[chr]; ok {
+ fmt.Fprintf(&sortedGenomeFile, "%s\t%d\n", chr, size)
+ delete(chrSize, chr)
+ }
+ }
- bedcompr, bedcompw, err := os.Pipe()
+ // The bcftools --mask argument needs to end in ".bed"
+ // in order to be parsed as a BED file, so we need to
+ // use a named pipe instead of stdin.
+ tempdir, err := ioutil.TempDir("", "")
+ if err != nil {
+ return fmt.Errorf("TempDir: %s", err)
+ }
+ defer os.RemoveAll(tempdir)
+ maskfifo = filepath.Join(tempdir, "fifo.bed")
+ err = syscall.Mkfifo(maskfifo, 0600)
+ if err != nil {
+ return fmt.Errorf("mkfifo: %s", err)
+ }
+
+ // bedtools complement can't seem to read from a pipe
+ // reliably -- "Error: line number 1 of file
+ // /dev/stdin has 1 fields, but 3 were expected." --
+ // so we stage to a temp file.
+ regionsFile := filepath.Join(tempdir, "gvcf_regions.bed")
+ err = ioutil.WriteFile(regionsFile, regions.Bytes(), 0644)
if err != nil {
return err
}
- bedcompargs := []string{"bedtools", "complement", "-i", "/dev/stdin", "-g", cmd.genomeFile}
- bedcompargs = maybeInDocker(bedcompargs, []string{cmd.refFile, infile})
- bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...)
- bedcomp.Stdin = bedr
- bedcomp.Stdout = bedcompw
- bedcomp.Stderr = cmd.stderr
+
wg.Add(1)
go func() {
defer wg.Done()
+
+ maskfifow, err := os.OpenFile(maskfifo, os.O_WRONLY, 0)
+ if err != nil {
+ errs <- err
+ return
+ }
+ defer maskfifow.Close()
+
+ bedcompargs := []string{"bedtools", "complement", "-i", regionsFile, "-g", "/dev/stdin"}
+ bedcompargs = maybeInDocker(bedcompargs, []string{cmd.genomeFile})
+ bedcomp := exec.CommandContext(ctx, bedcompargs[0], bedcompargs[1:]...)
+ bedcomp.Stdin = &sortedGenomeFile
+ bedcomp.Stdout = maskfifow
+ bedcomp.Stderr = cmd.stderr
log.Printf("running %v", bedcomp.Args)
- errs <- bedcomp.Run()
+ err = bedcomp.Run()
+ log.Printf("exited %v", bedcomp.Args)
+ if err != nil {
+ errs <- fmt.Errorf("bedtools complement: %s", err)
+ return
+ }
+ err = maskfifow.Close()
+ if err != nil {
+ errs <- err
+ return
+ }
}()
- maskfile = bedcompr
}
wg.Add(1)
go func() {
defer wg.Done()
consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)}
- if maskfile != nil {
- consargs = append(consargs, "--mask", "/dev/stdin")
+ if maskfifo != "" {
+ consargs = append(consargs, "--mask", maskfifo)
}
consargs = append(consargs, infile)
indexsuffix := ".tbi"
if _, err := os.Stat(infile + ".csi"); err == nil {
indexsuffix = ".csi"
}
- consargs = maybeInDocker(consargs, []string{infile, infile + indexsuffix, cmd.refFile})
+ mounts := []string{infile, infile + indexsuffix, cmd.refFile}
+ if maskfifo != "" {
+ mounts = append(mounts, maskfifo)
+ }
+ consargs = maybeInDocker(consargs, mounts)
- consensus := exec.Command(consargs[0], consargs[1:]...)
+ consensus := exec.CommandContext(ctx, consargs[0], consargs[1:]...)
consensus.Stderr = os.Stderr
consensus.Stdout = gzipw
consensus.Stderr = cmd.stderr
- if maskfile != nil {
- consensus.Stdin = maskfile
- }
log.Printf("running %v", consensus.Args)
err = consensus.Run()
if err != nil {
- errs <- err
+ errs <- fmt.Errorf("bcftools consensus: %s", err)
return
}
+ log.Printf("exited %v", consensus.Args)
err = gzipw.Close()
if err != nil {
errs <- err
return
}
+ err = bufw.Flush()
+ if err != nil {
+ errs <- err
+ return
+ }
errs <- outf.Close()
}()
for err := range errs {
if err != nil {
+ cancel()
wg.Wait()
return err
}