X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/c237c16e8eac15e8c925851dc708d7cbe7b12d7a..8778237e68cdbe4fd1d4b43ed5140438fc92f3a6:/vcf2fasta.go diff --git a/vcf2fasta.go b/vcf2fasta.go index 4044cfdeee..e1a3a3944e 100644 --- a/vcf2fasta.go +++ b/vcf2fasta.go @@ -1,4 +1,8 @@ -package main +// Copyright (C) The Lightning Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package lightning import ( "bufio" @@ -22,7 +26,7 @@ import ( "sync" "syscall" - "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/klauspost/pgzip" log "github.com/sirupsen/logrus" ) @@ -37,6 +41,7 @@ type vcf2fasta struct { outputDir string runLocal bool vcpus int + batchArgs stderr io.Writer } @@ -59,6 +64,7 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory") flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)") flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)") + cmd.batchArgs.Flags(flags) priority := flags.Int("priority", 500, "container request priority") pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`") err = flags.Parse(args) @@ -100,17 +106,20 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st if err != nil { return 1 } - if cmd.vcpus = len(infiles) * 2; cmd.vcpus > 32 { + batchsize := (len(infiles) + cmd.batchArgs.batches - 1) / cmd.batchArgs.batches + if cmd.vcpus = batchsize * 2; cmd.vcpus > 32 { cmd.vcpus = 32 } } runner := arvadosContainerRunner{ Name: "lightning vcf2fasta", - Client: arvados.NewClientFromEnv(), + Client: arvadosClientFromEnv, ProjectUUID: cmd.projectUUID, RAM: 2<<30 + int64(cmd.vcpus)<<28, VCPUs: cmd.vcpus, Priority: *priority, + KeepCache: 2, + APIAccess: true, Mounts: map[string]map[string]interface{}{ "/gvcf_regions.py": map[string]interface{}{ "kind": "text", @@ -118,9 +127,6 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st }, }, } - if cmd.mask { - runner.RAM += int64(cmd.vcpus) << 31 - } err = runner.TranslatePaths(&cmd.refFile, &cmd.genomeFile) if err != nil { return 1 @@ -132,19 +138,29 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st return 1 } } - runner.Args = append([]string{"vcf2fasta", - "-local=true", - "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask), - "-genome", cmd.genomeFile, - "-gvcf-regions.py", "/gvcf_regions.py", - "-gvcf-type", cmd.gvcfType, - "-output-dir", "/mnt/output"}, inputs...) - var output string - output, err = runner.Run() + var outputs []string + outputs, err = cmd.batchArgs.RunBatches(context.Background(), func(ctx context.Context, batch int) (string, error) { + runner := runner + if cmd.mask { + runner.RAM += int64(cmd.vcpus) << 31 + } + runner.Args = []string{"vcf2fasta", + "-local=true", + "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask), + "-genome", cmd.genomeFile, + "-gvcf-regions.py", "/gvcf_regions.py", + "-gvcf-type", cmd.gvcfType, + "-output-dir", "/mnt/output", + } + runner.Args = append(runner.Args, cmd.batchArgs.Args(batch)...) + runner.Args = append(runner.Args, inputs...) + log.Printf("batch %d: %v", batch, runner.Args) + return runner.RunContext(ctx) + }) if err != nil { return 1 } - fmt.Fprintln(stdout, output) + fmt.Fprintln(stdout, strings.Join(outputs, " ")) return 0 } @@ -152,6 +168,7 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st if err != nil { return 1 } + infiles = cmd.batchArgs.Slice(infiles) type job struct { vcffile string @@ -225,7 +242,8 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { return fmt.Errorf("error opening output file: %s", err) } defer outf.Close() - gzipw := gzip.NewWriter(outf) + bufw := bufio.NewWriterSize(outf, 8*1024*1024) + gzipw := pgzip.NewWriter(bufw) defer gzipw.Close() var maskfifo string // filename of mask fifo if we're running bedtools, otherwise "" @@ -235,12 +253,13 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { if cmd.mask { chrSize := map[string]int{} - vcffile, err := os.Open(infile) + vcffile, err := open(infile) if err != nil { return err } defer vcffile.Close() var rdr io.Reader = vcffile + rdr = bufio.NewReaderSize(rdr, 8*1024*1024) if strings.HasSuffix(infile, ".gz") { rdr, err = gzip.NewReader(vcffile) if err != nil { @@ -267,8 +286,18 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { if err = scanner.Err(); err != nil { return fmt.Errorf("error scanning input file %q: %s", infile, err) } + var regions bytes.Buffer - bedargs := []string{"python2", "-", "--gvcf_type", cmd.gvcfType, infile} + bedargs := []string{"python2", "-"} + if cmd.gvcfType == "complete_genomics_pass_all" { + bedargs = append(bedargs, + "--ignore_phrases", "CNV", "INS:ME", + "--unreported_is_called", + ) + } else if cmd.gvcfType != "" { + bedargs = append(bedargs, "--gvcf_type", cmd.gvcfType) + } + bedargs = append(bedargs, infile) bed := exec.CommandContext(ctx, bedargs[0], bedargs[1:]...) bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData) bed.Stdout = ®ions @@ -284,7 +313,7 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { // Read chromosome sizes from genome file in // case any weren't specified in the VCF // header. - genomeFile, err := os.Open(cmd.genomeFile) + genomeFile, err := open(cmd.genomeFile) if err != nil { return fmt.Errorf("error opening genome file %q: %s", cmd.genomeFile, err) } @@ -408,11 +437,17 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { errs <- fmt.Errorf("bcftools consensus: %s", err) return } + log.Printf("exited %v", consensus.Args) err = gzipw.Close() if err != nil { errs <- err return } + err = bufw.Flush() + if err != nil { + errs <- err + return + } errs <- outf.Close() }()