X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/5fe4a5feeff081241b3da124336ed9ec086ba902..66cf9d08ca8504cdb660ed44738622b90f7fd42f:/vcf2fasta.go diff --git a/vcf2fasta.go b/vcf2fasta.go index 6c8ec5fbcc..e1a3a3944e 100644 --- a/vcf2fasta.go +++ b/vcf2fasta.go @@ -1,8 +1,14 @@ -package main +// Copyright (C) The Lightning Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package lightning import ( + "bufio" "bytes" "compress/gzip" + "context" "errors" "flag" "fmt" @@ -13,11 +19,14 @@ import ( "os" "os/exec" "path/filepath" + "regexp" "runtime" + "strconv" "strings" "sync" + "syscall" - "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/klauspost/pgzip" log "github.com/sirupsen/logrus" ) @@ -27,10 +36,12 @@ type vcf2fasta struct { mask bool gvcfRegionsPy string gvcfRegionsPyData []byte + gvcfType string projectUUID string outputDir string runLocal bool vcpus int + batchArgs stderr io.Writer } @@ -48,10 +59,12 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st flags.StringVar(&cmd.genomeFile, "genome", "", "reference genome `file`") flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)") flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py") + flags.StringVar(&cmd.gvcfType, "gvcf-type", "gatk", "gvcf_type argument to gvcf_regions.py: gatk, complete_genomics, freebayes") flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data") flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory") flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)") flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)") + cmd.batchArgs.Flags(flags) priority := flags.Int("priority", 500, "container request priority") pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`") err = flags.Parse(args) @@ -93,17 +106,20 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st if err != nil { return 1 } - if cmd.vcpus = len(infiles) * 2; cmd.vcpus > 32 { + batchsize := (len(infiles) + cmd.batchArgs.batches - 1) / cmd.batchArgs.batches + if cmd.vcpus = batchsize * 2; cmd.vcpus > 32 { cmd.vcpus = 32 } } runner := arvadosContainerRunner{ Name: "lightning vcf2fasta", - Client: arvados.NewClientFromEnv(), + Client: arvadosClientFromEnv, ProjectUUID: cmd.projectUUID, RAM: 2<<30 + int64(cmd.vcpus)<<28, VCPUs: cmd.vcpus, Priority: *priority, + KeepCache: 2, + APIAccess: true, Mounts: map[string]map[string]interface{}{ "/gvcf_regions.py": map[string]interface{}{ "kind": "text", @@ -111,7 +127,7 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st }, }, } - err = runner.TranslatePaths(&cmd.refFile) + err = runner.TranslatePaths(&cmd.refFile, &cmd.genomeFile) if err != nil { return 1 } @@ -122,13 +138,29 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st return 1 } } - runner.Args = append([]string{"vcf2fasta", "-local=true", "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask), "-gvcf-regions.py", "/gvcf_regions.py", "-output-dir", "/mnt/output"}, inputs...) - var output string - output, err = runner.Run() + var outputs []string + outputs, err = cmd.batchArgs.RunBatches(context.Background(), func(ctx context.Context, batch int) (string, error) { + runner := runner + if cmd.mask { + runner.RAM += int64(cmd.vcpus) << 31 + } + runner.Args = []string{"vcf2fasta", + "-local=true", + "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask), + "-genome", cmd.genomeFile, + "-gvcf-regions.py", "/gvcf_regions.py", + "-gvcf-type", cmd.gvcfType, + "-output-dir", "/mnt/output", + } + runner.Args = append(runner.Args, cmd.batchArgs.Args(batch)...) + runner.Args = append(runner.Args, inputs...) + log.Printf("batch %d: %v", batch, runner.Args) + return runner.RunContext(ctx) + }) if err != nil { return 1 } - fmt.Fprintln(stdout, output) + fmt.Fprintln(stdout, strings.Join(outputs, " ")) return 0 } @@ -136,6 +168,7 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st if err != nil { return 1 } + infiles = cmd.batchArgs.Slice(infiles) type job struct { vcffile string @@ -187,7 +220,7 @@ func maybeInDocker(args, mountfiles []string) []string { return args } dockerrun := []string{ - "docker", "run", "--rm", + "docker", "run", "--rm", "-i", "--log-driver=none", } for _, f := range mountfiles { @@ -199,91 +232,222 @@ func maybeInDocker(args, mountfiles []string) []string { } func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, basename := filepath.Split(infile) outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase)) - outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0777) + outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY, 0777) if err != nil { return fmt.Errorf("error opening output file: %s", err) } defer outf.Close() - gzipw := gzip.NewWriter(outf) + bufw := bufio.NewWriterSize(outf, 8*1024*1024) + gzipw := pgzip.NewWriter(bufw) defer gzipw.Close() - var maskfile *os.File // reading side of a pipe if we're running bedtools, otherwise nil + var maskfifo string // filename of mask fifo if we're running bedtools, otherwise "" var wg sync.WaitGroup - errs := make(chan error, 3) + errs := make(chan error, 2) if cmd.mask { - if cmd.genomeFile == "" { - return errors.New("cannot apply mask without -genome argument") - } - bedr, bedw, err := os.Pipe() + chrSize := map[string]int{} + + vcffile, err := open(infile) if err != nil { return err } - bedargs := []string{"python", "-", "--gvcf_type", "gatk", infile} - bed := exec.Command(bedargs[0], bedargs[1:]...) + defer vcffile.Close() + var rdr io.Reader = vcffile + rdr = bufio.NewReaderSize(rdr, 8*1024*1024) + if strings.HasSuffix(infile, ".gz") { + rdr, err = gzip.NewReader(vcffile) + if err != nil { + return err + } + } + contigre := regexp.MustCompile(`([^=,]*)=([^>,]*)`) + scanner := bufio.NewScanner(rdr) + for scanner.Scan() { + if s := scanner.Text(); !strings.HasPrefix(s, "##") { + break + } else if !strings.HasPrefix(s, "##contig=<") { + continue + } else { + kv := map[string]string{} + for _, m := range contigre.FindAllStringSubmatch(s[10:], -1) { + kv[m[1]] = m[2] + } + if kv["ID"] != "" && kv["length"] != "" { + chrSize[kv["ID"]], _ = strconv.Atoi(kv["length"]) + } + } + } + if err = scanner.Err(); err != nil { + return fmt.Errorf("error scanning input file %q: %s", infile, err) + } + + var regions bytes.Buffer + bedargs := []string{"python2", "-"} + if cmd.gvcfType == "complete_genomics_pass_all" { + bedargs = append(bedargs, + "--ignore_phrases", "CNV", "INS:ME", + "--unreported_is_called", + ) + } else if cmd.gvcfType != "" { + bedargs = append(bedargs, "--gvcf_type", cmd.gvcfType) + } + bedargs = append(bedargs, infile) + bed := exec.CommandContext(ctx, bedargs[0], bedargs[1:]...) bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData) - bed.Stdout = bedw + bed.Stdout = ®ions bed.Stderr = cmd.stderr - wg.Add(1) - go func() { - defer wg.Done() - log.Printf("running %v", bed.Args) - errs <- bed.Run() - }() + log.Printf("running %v", bed.Args) + err = bed.Run() + log.Printf("exited %v", bed.Args) + if err != nil { + return fmt.Errorf("gvcf_regions: %s", err) + } - bedcompr, bedcompw, err := os.Pipe() + if cmd.genomeFile != "" { + // Read chromosome sizes from genome file in + // case any weren't specified in the VCF + // header. + genomeFile, err := open(cmd.genomeFile) + if err != nil { + return fmt.Errorf("error opening genome file %q: %s", cmd.genomeFile, err) + } + scanner := bufio.NewScanner(genomeFile) + for scanner.Scan() { + var chr string + var size int + _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size) + if err != nil { + return fmt.Errorf("error parsing genome file %q: %s", cmd.genomeFile, err) + } + if chrSize[chr] == 0 { + chrSize[chr] = size + } + } + if err = scanner.Err(); err != nil { + return fmt.Errorf("error scanning genome file %q: %s", cmd.genomeFile, err) + } + } + + // "bedtools complement" expects the chromosome sizes + // ("genome file") to appear in the same order as the + // chromosomes in the input vcf, so we need to sort + // them. + scanner = bufio.NewScanner(bytes.NewBuffer(append([]byte(nil), regions.Bytes()...))) + var sortedGenomeFile bytes.Buffer + for scanner.Scan() { + var chr string + var size int + _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size) + if err != nil { + return fmt.Errorf("error parsing gvcf_regions output: %s", err) + } + if size, ok := chrSize[chr]; ok { + fmt.Fprintf(&sortedGenomeFile, "%s\t%d\n", chr, size) + delete(chrSize, chr) + } + } + + // The bcftools --mask argument needs to end in ".bed" + // in order to be parsed as a BED file, so we need to + // use a named pipe instead of stdin. + tempdir, err := ioutil.TempDir("", "") + if err != nil { + return fmt.Errorf("TempDir: %s", err) + } + defer os.RemoveAll(tempdir) + maskfifo = filepath.Join(tempdir, "fifo.bed") + err = syscall.Mkfifo(maskfifo, 0600) + if err != nil { + return fmt.Errorf("mkfifo: %s", err) + } + + // bedtools complement can't seem to read from a pipe + // reliably -- "Error: line number 1 of file + // /dev/stdin has 1 fields, but 3 were expected." -- + // so we stage to a temp file. + regionsFile := filepath.Join(tempdir, "gvcf_regions.bed") + err = ioutil.WriteFile(regionsFile, regions.Bytes(), 0644) if err != nil { return err } - bedcompargs := []string{"bedtools", "complement", "-i", "/dev/stdin", "-g", cmd.genomeFile} - bedcompargs = maybeInDocker(bedcompargs, []string{cmd.refFile, infile}) - bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...) - bedcomp.Stdin = bedr - bedcomp.Stdout = bedcompw - bedcomp.Stderr = cmd.stderr + wg.Add(1) go func() { defer wg.Done() + + maskfifow, err := os.OpenFile(maskfifo, os.O_WRONLY, 0) + if err != nil { + errs <- err + return + } + defer maskfifow.Close() + + bedcompargs := []string{"bedtools", "complement", "-i", regionsFile, "-g", "/dev/stdin"} + bedcompargs = maybeInDocker(bedcompargs, []string{cmd.genomeFile}) + bedcomp := exec.CommandContext(ctx, bedcompargs[0], bedcompargs[1:]...) + bedcomp.Stdin = &sortedGenomeFile + bedcomp.Stdout = maskfifow + bedcomp.Stderr = cmd.stderr log.Printf("running %v", bedcomp.Args) - errs <- bedcomp.Run() + err = bedcomp.Run() + log.Printf("exited %v", bedcomp.Args) + if err != nil { + errs <- fmt.Errorf("bedtools complement: %s", err) + return + } + err = maskfifow.Close() + if err != nil { + errs <- err + return + } }() - maskfile = bedcompr } wg.Add(1) go func() { defer wg.Done() consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)} - if maskfile != nil { - consargs = append(consargs, "--mask", "/dev/stdin") + if maskfifo != "" { + consargs = append(consargs, "--mask", maskfifo) } consargs = append(consargs, infile) indexsuffix := ".tbi" if _, err := os.Stat(infile + ".csi"); err == nil { indexsuffix = ".csi" } - consargs = maybeInDocker(consargs, []string{infile, infile + indexsuffix, cmd.refFile}) + mounts := []string{infile, infile + indexsuffix, cmd.refFile} + if maskfifo != "" { + mounts = append(mounts, maskfifo) + } + consargs = maybeInDocker(consargs, mounts) - consensus := exec.Command(consargs[0], consargs[1:]...) + consensus := exec.CommandContext(ctx, consargs[0], consargs[1:]...) consensus.Stderr = os.Stderr consensus.Stdout = gzipw consensus.Stderr = cmd.stderr - if maskfile != nil { - consensus.Stdin = maskfile - } log.Printf("running %v", consensus.Args) err = consensus.Run() if err != nil { - errs <- err + errs <- fmt.Errorf("bcftools consensus: %s", err) return } + log.Printf("exited %v", consensus.Args) err = gzipw.Close() if err != nil { errs <- err return } + err = bufw.Flush() + if err != nil { + errs <- err + return + } errs <- outf.Close() }() @@ -294,6 +458,7 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { for err := range errs { if err != nil { + cancel() wg.Wait() return err }