X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/5e71949ad96735d7dcdd2d12486d42c43677d0ac..ed9183d5e6ce23f1cf6f9a7d65d3e68214393e8a:/vcf2fasta.go diff --git a/vcf2fasta.go b/vcf2fasta.go index 1e7e276926..e1a3a3944e 100644 --- a/vcf2fasta.go +++ b/vcf2fasta.go @@ -1,9 +1,14 @@ -package main +// Copyright (C) The Lightning Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package lightning import ( "bufio" "bytes" "compress/gzip" + "context" "errors" "flag" "fmt" @@ -14,12 +19,14 @@ import ( "os" "os/exec" "path/filepath" + "regexp" "runtime" + "strconv" "strings" "sync" "syscall" - "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/klauspost/pgzip" log "github.com/sirupsen/logrus" ) @@ -29,10 +36,12 @@ type vcf2fasta struct { mask bool gvcfRegionsPy string gvcfRegionsPyData []byte + gvcfType string projectUUID string outputDir string runLocal bool vcpus int + batchArgs stderr io.Writer } @@ -50,10 +59,12 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st flags.StringVar(&cmd.genomeFile, "genome", "", "reference genome `file`") flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)") flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py") + flags.StringVar(&cmd.gvcfType, "gvcf-type", "gatk", "gvcf_type argument to gvcf_regions.py: gatk, complete_genomics, freebayes") flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data") flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory") flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)") flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)") + cmd.batchArgs.Flags(flags) priority := flags.Int("priority", 500, "container request priority") pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`") err = flags.Parse(args) @@ -95,17 +106,20 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st if err != nil { return 1 } - if cmd.vcpus = len(infiles) * 2; cmd.vcpus > 32 { + batchsize := (len(infiles) + cmd.batchArgs.batches - 1) / cmd.batchArgs.batches + if cmd.vcpus = batchsize * 2; cmd.vcpus > 32 { cmd.vcpus = 32 } } runner := arvadosContainerRunner{ Name: "lightning vcf2fasta", - Client: arvados.NewClientFromEnv(), + Client: arvadosClientFromEnv, ProjectUUID: cmd.projectUUID, RAM: 2<<30 + int64(cmd.vcpus)<<28, VCPUs: cmd.vcpus, Priority: *priority, + KeepCache: 2, + APIAccess: true, Mounts: map[string]map[string]interface{}{ "/gvcf_regions.py": map[string]interface{}{ "kind": "text", @@ -124,18 +138,29 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st return 1 } } - runner.Args = append([]string{"vcf2fasta", - "-local=true", - "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask), - "-genome", cmd.genomeFile, - "-gvcf-regions.py", "/gvcf_regions.py", - "-output-dir", "/mnt/output"}, inputs...) - var output string - output, err = runner.Run() + var outputs []string + outputs, err = cmd.batchArgs.RunBatches(context.Background(), func(ctx context.Context, batch int) (string, error) { + runner := runner + if cmd.mask { + runner.RAM += int64(cmd.vcpus) << 31 + } + runner.Args = []string{"vcf2fasta", + "-local=true", + "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask), + "-genome", cmd.genomeFile, + "-gvcf-regions.py", "/gvcf_regions.py", + "-gvcf-type", cmd.gvcfType, + "-output-dir", "/mnt/output", + } + runner.Args = append(runner.Args, cmd.batchArgs.Args(batch)...) + runner.Args = append(runner.Args, inputs...) + log.Printf("batch %d: %v", batch, runner.Args) + return runner.RunContext(ctx) + }) if err != nil { return 1 } - fmt.Fprintln(stdout, output) + fmt.Fprintln(stdout, strings.Join(outputs, " ")) return 0 } @@ -143,6 +168,7 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st if err != nil { return 1 } + infiles = cmd.batchArgs.Slice(infiles) type job struct { vcffile string @@ -206,6 +232,9 @@ func maybeInDocker(args, mountfiles []string) []string { } func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, basename := filepath.Split(infile) outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase)) outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY, 0777) @@ -213,7 +242,8 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { return fmt.Errorf("error opening output file: %s", err) } defer outf.Close() - gzipw := gzip.NewWriter(outf) + bufw := bufio.NewWriterSize(outf, 8*1024*1024) + gzipw := pgzip.NewWriter(bufw) defer gzipw.Close() var maskfifo string // filename of mask fifo if we're running bedtools, otherwise "" @@ -221,46 +251,93 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { var wg sync.WaitGroup errs := make(chan error, 2) if cmd.mask { - if cmd.genomeFile == "" { - // TODO: get chromosome sizes from VCF header, ##contig= - return errors.New("cannot apply mask without -genome argument") + chrSize := map[string]int{} + + vcffile, err := open(infile) + if err != nil { + return err + } + defer vcffile.Close() + var rdr io.Reader = vcffile + rdr = bufio.NewReaderSize(rdr, 8*1024*1024) + if strings.HasSuffix(infile, ".gz") { + rdr, err = gzip.NewReader(vcffile) + if err != nil { + return err + } + } + contigre := regexp.MustCompile(`([^=,]*)=([^>,]*)`) + scanner := bufio.NewScanner(rdr) + for scanner.Scan() { + if s := scanner.Text(); !strings.HasPrefix(s, "##") { + break + } else if !strings.HasPrefix(s, "##contig=<") { + continue + } else { + kv := map[string]string{} + for _, m := range contigre.FindAllStringSubmatch(s[10:], -1) { + kv[m[1]] = m[2] + } + if kv["ID"] != "" && kv["length"] != "" { + chrSize[kv["ID"]], _ = strconv.Atoi(kv["length"]) + } + } + } + if err = scanner.Err(); err != nil { + return fmt.Errorf("error scanning input file %q: %s", infile, err) } var regions bytes.Buffer - bedargs := []string{"python2", "-", "--gvcf_type", "gatk", infile} - bed := exec.Command(bedargs[0], bedargs[1:]...) + bedargs := []string{"python2", "-"} + if cmd.gvcfType == "complete_genomics_pass_all" { + bedargs = append(bedargs, + "--ignore_phrases", "CNV", "INS:ME", + "--unreported_is_called", + ) + } else if cmd.gvcfType != "" { + bedargs = append(bedargs, "--gvcf_type", cmd.gvcfType) + } + bedargs = append(bedargs, infile) + bed := exec.CommandContext(ctx, bedargs[0], bedargs[1:]...) bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData) bed.Stdout = ®ions bed.Stderr = cmd.stderr log.Printf("running %v", bed.Args) - err := bed.Run() + err = bed.Run() log.Printf("exited %v", bed.Args) if err != nil { return fmt.Errorf("gvcf_regions: %s", err) } + if cmd.genomeFile != "" { + // Read chromosome sizes from genome file in + // case any weren't specified in the VCF + // header. + genomeFile, err := open(cmd.genomeFile) + if err != nil { + return fmt.Errorf("error opening genome file %q: %s", cmd.genomeFile, err) + } + scanner := bufio.NewScanner(genomeFile) + for scanner.Scan() { + var chr string + var size int + _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size) + if err != nil { + return fmt.Errorf("error parsing genome file %q: %s", cmd.genomeFile, err) + } + if chrSize[chr] == 0 { + chrSize[chr] = size + } + } + if err = scanner.Err(); err != nil { + return fmt.Errorf("error scanning genome file %q: %s", cmd.genomeFile, err) + } + } + // "bedtools complement" expects the chromosome sizes // ("genome file") to appear in the same order as the // chromosomes in the input vcf, so we need to sort // them. - genomeFile, err := os.Open(cmd.genomeFile) - if err != nil { - return fmt.Errorf("error opening genome file: %s", err) - } - chrSize := map[string]int{} - scanner := bufio.NewScanner(genomeFile) - for scanner.Scan() { - var chr string - var size int - _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size) - if err != nil { - return fmt.Errorf("error parsing genome file: %s", err) - } - chrSize[chr] = size - } - if err = scanner.Err(); err != nil { - return fmt.Errorf("error scanning genome file: %s", err) - } scanner = bufio.NewScanner(bytes.NewBuffer(append([]byte(nil), regions.Bytes()...))) var sortedGenomeFile bytes.Buffer for scanner.Scan() { @@ -313,7 +390,7 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { bedcompargs := []string{"bedtools", "complement", "-i", regionsFile, "-g", "/dev/stdin"} bedcompargs = maybeInDocker(bedcompargs, []string{cmd.genomeFile}) - bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...) + bedcomp := exec.CommandContext(ctx, bedcompargs[0], bedcompargs[1:]...) bedcomp.Stdin = &sortedGenomeFile bedcomp.Stdout = maskfifow bedcomp.Stderr = cmd.stderr @@ -350,7 +427,7 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { } consargs = maybeInDocker(consargs, mounts) - consensus := exec.Command(consargs[0], consargs[1:]...) + consensus := exec.CommandContext(ctx, consargs[0], consargs[1:]...) consensus.Stderr = os.Stderr consensus.Stdout = gzipw consensus.Stderr = cmd.stderr @@ -360,11 +437,17 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { errs <- fmt.Errorf("bcftools consensus: %s", err) return } + log.Printf("exited %v", consensus.Args) err = gzipw.Close() if err != nil { errs <- err return } + err = bufw.Flush() + if err != nil { + errs <- err + return + } errs <- outf.Close() }() @@ -375,6 +458,7 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { for err := range errs { if err != nil { + cancel() wg.Wait() return err }