X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/0291ed7441cf389cd360c4f0001f90221f60f07e..e4839e500e2da73b476f095425c0347ae7c5de97:/vcf2fasta.go diff --git a/vcf2fasta.go b/vcf2fasta.go index afafdd70a4..3ca18c410f 100644 --- a/vcf2fasta.go +++ b/vcf2fasta.go @@ -1,4 +1,8 @@ -package main +// Copyright (C) The Lightning Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package lightning import ( "bufio" @@ -15,12 +19,14 @@ import ( "os" "os/exec" "path/filepath" + "regexp" "runtime" + "strconv" "strings" "sync" "syscall" - "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/klauspost/pgzip" log "github.com/sirupsen/logrus" ) @@ -30,10 +36,12 @@ type vcf2fasta struct { mask bool gvcfRegionsPy string gvcfRegionsPyData []byte + gvcfType string projectUUID string outputDir string runLocal bool vcpus int + batchArgs stderr io.Writer } @@ -51,10 +59,12 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st flags.StringVar(&cmd.genomeFile, "genome", "", "reference genome `file`") flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)") flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py") + flags.StringVar(&cmd.gvcfType, "gvcf-type", "gatk", "gvcf_type argument to gvcf_regions.py: gatk, complete_genomics, freebayes") flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data") flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory") flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)") flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)") + cmd.batchArgs.Flags(flags) priority := flags.Int("priority", 500, "container request priority") pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`") err = flags.Parse(args) @@ -96,17 +106,20 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st if err != nil { return 1 } - if cmd.vcpus = len(infiles) * 2; cmd.vcpus > 32 { + batchsize := (len(infiles) + cmd.batchArgs.batches - 1) / cmd.batchArgs.batches + if cmd.vcpus = batchsize * 2; cmd.vcpus > 32 { cmd.vcpus = 32 } } runner := arvadosContainerRunner{ Name: "lightning vcf2fasta", - Client: arvados.NewClientFromEnv(), + Client: arvadosClientFromEnv, ProjectUUID: cmd.projectUUID, RAM: 2<<30 + int64(cmd.vcpus)<<28, VCPUs: cmd.vcpus, Priority: *priority, + KeepCache: 2, + APIAccess: true, Mounts: map[string]map[string]interface{}{ "/gvcf_regions.py": map[string]interface{}{ "kind": "text", @@ -125,18 +138,29 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st return 1 } } - runner.Args = append([]string{"vcf2fasta", - "-local=true", - "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask), - "-genome", cmd.genomeFile, - "-gvcf-regions.py", "/gvcf_regions.py", - "-output-dir", "/mnt/output"}, inputs...) - var output string - output, err = runner.Run() + var outputs []string + outputs, err = cmd.batchArgs.RunBatches(context.Background(), func(ctx context.Context, batch int) (string, error) { + runner := runner + if cmd.mask { + runner.RAM += int64(cmd.vcpus) << 31 + } + runner.Args = []string{"vcf2fasta", + "-local=true", + "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask), + "-genome", cmd.genomeFile, + "-gvcf-regions.py", "/gvcf_regions.py", + "-gvcf-type", cmd.gvcfType, + "-output-dir", "/mnt/output", + } + runner.Args = append(runner.Args, cmd.batchArgs.Args(batch)...) + runner.Args = append(runner.Args, inputs...) + log.Printf("batch %d: %v", batch, runner.Args) + return runner.RunContext(ctx) + }) if err != nil { return 1 } - fmt.Fprintln(stdout, output) + fmt.Fprintln(stdout, strings.Join(outputs, " ")) return 0 } @@ -144,6 +168,7 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st if err != nil { return 1 } + infiles = cmd.batchArgs.Slice(infiles) type job struct { vcffile string @@ -217,7 +242,8 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { return fmt.Errorf("error opening output file: %s", err) } defer outf.Close() - gzipw := gzip.NewWriter(outf) + bufw := bufio.NewWriterSize(outf, 8*1024*1024) + gzipw := pgzip.NewWriter(bufw) defer gzipw.Close() var maskfifo string // filename of mask fifo if we're running bedtools, otherwise "" @@ -225,46 +251,93 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { var wg sync.WaitGroup errs := make(chan error, 2) if cmd.mask { - if cmd.genomeFile == "" { - // TODO: get chromosome sizes from VCF header, ##contig= - return errors.New("cannot apply mask without -genome argument") + chrSize := map[string]int{} + + vcffile, err := open(infile) + if err != nil { + return err + } + defer vcffile.Close() + var rdr io.Reader = vcffile + rdr = bufio.NewReaderSize(rdr, 8*1024*1024) + if strings.HasSuffix(infile, ".gz") { + rdr, err = gzip.NewReader(vcffile) + if err != nil { + return err + } + } + contigre := regexp.MustCompile(`([^=,]*)=([^>,]*)`) + scanner := bufio.NewScanner(rdr) + for scanner.Scan() { + if s := scanner.Text(); !strings.HasPrefix(s, "##") { + break + } else if !strings.HasPrefix(s, "##contig=<") { + continue + } else { + kv := map[string]string{} + for _, m := range contigre.FindAllStringSubmatch(s[10:], -1) { + kv[m[1]] = m[2] + } + if kv["ID"] != "" && kv["length"] != "" { + chrSize[kv["ID"]], _ = strconv.Atoi(kv["length"]) + } + } + } + if err = scanner.Err(); err != nil { + return fmt.Errorf("error scanning input file %q: %s", infile, err) } var regions bytes.Buffer - bedargs := []string{"python2", "-", "--gvcf_type", "gatk", infile} + bedargs := []string{"python2", "-"} + if cmd.gvcfType == "complete_genomics_pass_all" { + bedargs = append(bedargs, + "--ignore_phrases", "CNV", "INS:ME", + "--unreported_is_called", + ) + } else if cmd.gvcfType != "" { + bedargs = append(bedargs, "--gvcf_type", cmd.gvcfType) + } + bedargs = append(bedargs, infile) bed := exec.CommandContext(ctx, bedargs[0], bedargs[1:]...) bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData) bed.Stdout = ®ions bed.Stderr = cmd.stderr log.Printf("running %v", bed.Args) - err := bed.Run() + err = bed.Run() log.Printf("exited %v", bed.Args) if err != nil { return fmt.Errorf("gvcf_regions: %s", err) } + if cmd.genomeFile != "" { + // Read chromosome sizes from genome file in + // case any weren't specified in the VCF + // header. + genomeFile, err := open(cmd.genomeFile) + if err != nil { + return fmt.Errorf("error opening genome file %q: %s", cmd.genomeFile, err) + } + scanner := bufio.NewScanner(genomeFile) + for scanner.Scan() { + var chr string + var size int + _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size) + if err != nil { + return fmt.Errorf("error parsing genome file %q: %s", cmd.genomeFile, err) + } + if chrSize[chr] == 0 { + chrSize[chr] = size + } + } + if err = scanner.Err(); err != nil { + return fmt.Errorf("error scanning genome file %q: %s", cmd.genomeFile, err) + } + } + // "bedtools complement" expects the chromosome sizes // ("genome file") to appear in the same order as the // chromosomes in the input vcf, so we need to sort // them. - genomeFile, err := os.Open(cmd.genomeFile) - if err != nil { - return fmt.Errorf("error opening genome file: %s", err) - } - chrSize := map[string]int{} - scanner := bufio.NewScanner(genomeFile) - for scanner.Scan() { - var chr string - var size int - _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size) - if err != nil { - return fmt.Errorf("error parsing genome file: %s", err) - } - chrSize[chr] = size - } - if err = scanner.Err(); err != nil { - return fmt.Errorf("error scanning genome file: %s", err) - } scanner = bufio.NewScanner(bytes.NewBuffer(append([]byte(nil), regions.Bytes()...))) var sortedGenomeFile bytes.Buffer for scanner.Scan() { @@ -364,11 +437,17 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { errs <- fmt.Errorf("bcftools consensus: %s", err) return } + log.Printf("exited %v", consensus.Args) err = gzipw.Close() if err != nil { errs <- err return } + err = bufw.Flush() + if err != nil { + errs <- err + return + } errs <- outf.Close() }() @@ -396,7 +475,7 @@ func (cmd *vcf2fasta) loadRegionsPy() error { if resp.StatusCode != http.StatusOK { return fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode) } - buf, err := ioutil.ReadAll(resp.Body) + buf, err := io.ReadAll(resp.Body) if err != nil { return fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err) }