Fix some tests.
[lightning.git] / vcf2fasta.go
index 2f02e44a28b8b9188ba8700c4ce15c88a08647c5..3ca18c410f078f1af1395a3b8034fc2db536cbfa 100644 (file)
@@ -1,4 +1,8 @@
-package main
+// Copyright (C) The Lightning Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lightning
 
 import (
        "bufio"
@@ -22,7 +26,7 @@ import (
        "sync"
        "syscall"
 
-       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/klauspost/pgzip"
        log "github.com/sirupsen/logrus"
 )
 
@@ -32,10 +36,12 @@ type vcf2fasta struct {
        mask              bool
        gvcfRegionsPy     string
        gvcfRegionsPyData []byte
+       gvcfType          string
        projectUUID       string
        outputDir         string
        runLocal          bool
        vcpus             int
+       batchArgs
 
        stderr io.Writer
 }
@@ -53,10 +59,12 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
        flags.StringVar(&cmd.genomeFile, "genome", "", "reference genome `file`")
        flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)")
        flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py")
+       flags.StringVar(&cmd.gvcfType, "gvcf-type", "gatk", "gvcf_type argument to gvcf_regions.py: gatk, complete_genomics, freebayes")
        flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
        flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
        flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
        flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
+       cmd.batchArgs.Flags(flags)
        priority := flags.Int("priority", 500, "container request priority")
        pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
        err = flags.Parse(args)
@@ -98,17 +106,20 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
                        if err != nil {
                                return 1
                        }
-                       if cmd.vcpus = len(infiles) * 2; cmd.vcpus > 32 {
+                       batchsize := (len(infiles) + cmd.batchArgs.batches - 1) / cmd.batchArgs.batches
+                       if cmd.vcpus = batchsize * 2; cmd.vcpus > 32 {
                                cmd.vcpus = 32
                        }
                }
                runner := arvadosContainerRunner{
                        Name:        "lightning vcf2fasta",
-                       Client:      arvados.NewClientFromEnv(),
+                       Client:      arvadosClientFromEnv,
                        ProjectUUID: cmd.projectUUID,
                        RAM:         2<<30 + int64(cmd.vcpus)<<28,
                        VCPUs:       cmd.vcpus,
                        Priority:    *priority,
+                       KeepCache:   2,
+                       APIAccess:   true,
                        Mounts: map[string]map[string]interface{}{
                                "/gvcf_regions.py": map[string]interface{}{
                                        "kind":    "text",
@@ -116,9 +127,6 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
                                },
                        },
                }
-               if cmd.mask {
-                       runner.RAM += int64(cmd.vcpus) << 31
-               }
                err = runner.TranslatePaths(&cmd.refFile, &cmd.genomeFile)
                if err != nil {
                        return 1
@@ -130,18 +138,29 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
                                return 1
                        }
                }
-               runner.Args = append([]string{"vcf2fasta",
-                       "-local=true",
-                       "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask),
-                       "-genome", cmd.genomeFile,
-                       "-gvcf-regions.py", "/gvcf_regions.py",
-                       "-output-dir", "/mnt/output"}, inputs...)
-               var output string
-               output, err = runner.Run()
+               var outputs []string
+               outputs, err = cmd.batchArgs.RunBatches(context.Background(), func(ctx context.Context, batch int) (string, error) {
+                       runner := runner
+                       if cmd.mask {
+                               runner.RAM += int64(cmd.vcpus) << 31
+                       }
+                       runner.Args = []string{"vcf2fasta",
+                               "-local=true",
+                               "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask),
+                               "-genome", cmd.genomeFile,
+                               "-gvcf-regions.py", "/gvcf_regions.py",
+                               "-gvcf-type", cmd.gvcfType,
+                               "-output-dir", "/mnt/output",
+                       }
+                       runner.Args = append(runner.Args, cmd.batchArgs.Args(batch)...)
+                       runner.Args = append(runner.Args, inputs...)
+                       log.Printf("batch %d: %v", batch, runner.Args)
+                       return runner.RunContext(ctx)
+               })
                if err != nil {
                        return 1
                }
-               fmt.Fprintln(stdout, output)
+               fmt.Fprintln(stdout, strings.Join(outputs, " "))
                return 0
        }
 
@@ -149,6 +168,7 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
        if err != nil {
                return 1
        }
+       infiles = cmd.batchArgs.Slice(infiles)
 
        type job struct {
                vcffile string
@@ -222,7 +242,8 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
                return fmt.Errorf("error opening output file: %s", err)
        }
        defer outf.Close()
-       gzipw := gzip.NewWriter(outf)
+       bufw := bufio.NewWriterSize(outf, 8*1024*1024)
+       gzipw := pgzip.NewWriter(bufw)
        defer gzipw.Close()
 
        var maskfifo string // filename of mask fifo if we're running bedtools, otherwise ""
@@ -232,12 +253,13 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
        if cmd.mask {
                chrSize := map[string]int{}
 
-               vcffile, err := os.Open(infile)
+               vcffile, err := open(infile)
                if err != nil {
                        return err
                }
                defer vcffile.Close()
                var rdr io.Reader = vcffile
+               rdr = bufio.NewReaderSize(rdr, 8*1024*1024)
                if strings.HasSuffix(infile, ".gz") {
                        rdr, err = gzip.NewReader(vcffile)
                        if err != nil {
@@ -264,8 +286,18 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
                if err = scanner.Err(); err != nil {
                        return fmt.Errorf("error scanning input file %q: %s", infile, err)
                }
+
                var regions bytes.Buffer
-               bedargs := []string{"python2", "-", "--gvcf_type", "gatk", infile}
+               bedargs := []string{"python2", "-"}
+               if cmd.gvcfType == "complete_genomics_pass_all" {
+                       bedargs = append(bedargs,
+                               "--ignore_phrases", "CNV", "INS:ME",
+                               "--unreported_is_called",
+                       )
+               } else if cmd.gvcfType != "" {
+                       bedargs = append(bedargs, "--gvcf_type", cmd.gvcfType)
+               }
+               bedargs = append(bedargs, infile)
                bed := exec.CommandContext(ctx, bedargs[0], bedargs[1:]...)
                bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
                bed.Stdout = &regions
@@ -281,7 +313,7 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
                        // Read chromosome sizes from genome file in
                        // case any weren't specified in the VCF
                        // header.
-                       genomeFile, err := os.Open(cmd.genomeFile)
+                       genomeFile, err := open(cmd.genomeFile)
                        if err != nil {
                                return fmt.Errorf("error opening genome file %q: %s", cmd.genomeFile, err)
                        }
@@ -405,11 +437,17 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
                        errs <- fmt.Errorf("bcftools consensus: %s", err)
                        return
                }
+               log.Printf("exited %v", consensus.Args)
                err = gzipw.Close()
                if err != nil {
                        errs <- err
                        return
                }
+               err = bufw.Flush()
+               if err != nil {
+                       errs <- err
+                       return
+               }
                errs <- outf.Close()
        }()
 
@@ -437,7 +475,7 @@ func (cmd *vcf2fasta) loadRegionsPy() error {
                if resp.StatusCode != http.StatusOK {
                        return fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode)
                }
-               buf, err := ioutil.ReadAll(resp.Body)
+               buf, err := io.ReadAll(resp.Body)
                if err != nil {
                        return fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err)
                }