Fix some tests.
[lightning.git] / vcf2fasta.go
index 98b285e0ed1dafb4dee7236d1af338a4f082937d..3ca18c410f078f1af1395a3b8034fc2db536cbfa 100644 (file)
@@ -1,4 +1,8 @@
-package main
+// Copyright (C) The Lightning Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lightning
 
 import (
        "bufio"
@@ -22,7 +26,7 @@ import (
        "sync"
        "syscall"
 
-       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/klauspost/pgzip"
        log "github.com/sirupsen/logrus"
 )
 
@@ -102,18 +106,20 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
                        if err != nil {
                                return 1
                        }
-                       if cmd.vcpus = len(cmd.batchArgs.Slice(infiles)) * 2; cmd.vcpus > 32 {
+                       batchsize := (len(infiles) + cmd.batchArgs.batches - 1) / cmd.batchArgs.batches
+                       if cmd.vcpus = batchsize * 2; cmd.vcpus > 32 {
                                cmd.vcpus = 32
                        }
                }
-               client := arvados.NewClientFromEnv()
                runner := arvadosContainerRunner{
                        Name:        "lightning vcf2fasta",
-                       Client:      client,
+                       Client:      arvadosClientFromEnv,
                        ProjectUUID: cmd.projectUUID,
                        RAM:         2<<30 + int64(cmd.vcpus)<<28,
                        VCPUs:       cmd.vcpus,
                        Priority:    *priority,
+                       KeepCache:   2,
+                       APIAccess:   true,
                        Mounts: map[string]map[string]interface{}{
                                "/gvcf_regions.py": map[string]interface{}{
                                        "kind":    "text",
@@ -236,7 +242,8 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
                return fmt.Errorf("error opening output file: %s", err)
        }
        defer outf.Close()
-       gzipw := gzip.NewWriter(outf)
+       bufw := bufio.NewWriterSize(outf, 8*1024*1024)
+       gzipw := pgzip.NewWriter(bufw)
        defer gzipw.Close()
 
        var maskfifo string // filename of mask fifo if we're running bedtools, otherwise ""
@@ -246,12 +253,13 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
        if cmd.mask {
                chrSize := map[string]int{}
 
-               vcffile, err := os.Open(infile)
+               vcffile, err := open(infile)
                if err != nil {
                        return err
                }
                defer vcffile.Close()
                var rdr io.Reader = vcffile
+               rdr = bufio.NewReaderSize(rdr, 8*1024*1024)
                if strings.HasSuffix(infile, ".gz") {
                        rdr, err = gzip.NewReader(vcffile)
                        if err != nil {
@@ -278,8 +286,18 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
                if err = scanner.Err(); err != nil {
                        return fmt.Errorf("error scanning input file %q: %s", infile, err)
                }
+
                var regions bytes.Buffer
-               bedargs := []string{"python2", "-", "--gvcf_type", cmd.gvcfType, infile}
+               bedargs := []string{"python2", "-"}
+               if cmd.gvcfType == "complete_genomics_pass_all" {
+                       bedargs = append(bedargs,
+                               "--ignore_phrases", "CNV", "INS:ME",
+                               "--unreported_is_called",
+                       )
+               } else if cmd.gvcfType != "" {
+                       bedargs = append(bedargs, "--gvcf_type", cmd.gvcfType)
+               }
+               bedargs = append(bedargs, infile)
                bed := exec.CommandContext(ctx, bedargs[0], bedargs[1:]...)
                bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
                bed.Stdout = &regions
@@ -295,7 +313,7 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
                        // Read chromosome sizes from genome file in
                        // case any weren't specified in the VCF
                        // header.
-                       genomeFile, err := os.Open(cmd.genomeFile)
+                       genomeFile, err := open(cmd.genomeFile)
                        if err != nil {
                                return fmt.Errorf("error opening genome file %q: %s", cmd.genomeFile, err)
                        }
@@ -419,11 +437,17 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
                        errs <- fmt.Errorf("bcftools consensus: %s", err)
                        return
                }
+               log.Printf("exited %v", consensus.Args)
                err = gzipw.Close()
                if err != nil {
                        errs <- err
                        return
                }
+               err = bufw.Flush()
+               if err != nil {
+                       errs <- err
+                       return
+               }
                errs <- outf.Close()
        }()
 
@@ -451,7 +475,7 @@ func (cmd *vcf2fasta) loadRegionsPy() error {
                if resp.StatusCode != http.StatusOK {
                        return fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode)
                }
-               buf, err := ioutil.ReadAll(resp.Body)
+               buf, err := io.ReadAll(resp.Body)
                if err != nil {
                        return fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err)
                }