-package main
+// Copyright (C) The Lightning Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lightning
import (
"bufio"
"sync"
"syscall"
- "git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/klauspost/pgzip"
log "github.com/sirupsen/logrus"
)
if err != nil {
return 1
}
- if cmd.vcpus = len(cmd.batchArgs.Slice(infiles)) * 2; cmd.vcpus > 32 {
+ batchsize := (len(infiles) + cmd.batchArgs.batches - 1) / cmd.batchArgs.batches
+ if cmd.vcpus = batchsize * 2; cmd.vcpus > 32 {
cmd.vcpus = 32
}
}
- client := arvados.NewClientFromEnv()
runner := arvadosContainerRunner{
Name: "lightning vcf2fasta",
- Client: client,
+ Client: arvadosClientFromEnv,
ProjectUUID: cmd.projectUUID,
RAM: 2<<30 + int64(cmd.vcpus)<<28,
VCPUs: cmd.vcpus,
Priority: *priority,
+ KeepCache: 2,
+ APIAccess: true,
Mounts: map[string]map[string]interface{}{
"/gvcf_regions.py": map[string]interface{}{
"kind": "text",
return fmt.Errorf("error opening output file: %s", err)
}
defer outf.Close()
- gzipw := gzip.NewWriter(outf)
+ bufw := bufio.NewWriterSize(outf, 8*1024*1024)
+ gzipw := pgzip.NewWriter(bufw)
defer gzipw.Close()
var maskfifo string // filename of mask fifo if we're running bedtools, otherwise ""
if cmd.mask {
chrSize := map[string]int{}
- vcffile, err := os.Open(infile)
+ vcffile, err := open(infile)
if err != nil {
return err
}
defer vcffile.Close()
var rdr io.Reader = vcffile
+ rdr = bufio.NewReaderSize(rdr, 8*1024*1024)
if strings.HasSuffix(infile, ".gz") {
rdr, err = gzip.NewReader(vcffile)
if err != nil {
if err = scanner.Err(); err != nil {
return fmt.Errorf("error scanning input file %q: %s", infile, err)
}
+
var regions bytes.Buffer
- bedargs := []string{"python2", "-", "--gvcf_type", cmd.gvcfType, infile}
+ bedargs := []string{"python2", "-"}
+ if cmd.gvcfType == "complete_genomics_pass_all" {
+ bedargs = append(bedargs,
+ "--ignore_phrases", "CNV", "INS:ME",
+ "--unreported_is_called",
+ )
+ } else if cmd.gvcfType != "" {
+ bedargs = append(bedargs, "--gvcf_type", cmd.gvcfType)
+ }
+ bedargs = append(bedargs, infile)
bed := exec.CommandContext(ctx, bedargs[0], bedargs[1:]...)
bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
bed.Stdout = &regions
// Read chromosome sizes from genome file in
// case any weren't specified in the VCF
// header.
- genomeFile, err := os.Open(cmd.genomeFile)
+ genomeFile, err := open(cmd.genomeFile)
if err != nil {
return fmt.Errorf("error opening genome file %q: %s", cmd.genomeFile, err)
}
errs <- fmt.Errorf("bcftools consensus: %s", err)
return
}
+ log.Printf("exited %v", consensus.Args)
err = gzipw.Close()
if err != nil {
errs <- err
return
}
+ err = bufw.Flush()
+ if err != nil {
+ errs <- err
+ return
+ }
errs <- outf.Close()
}()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode)
}
- buf, err := ioutil.ReadAll(resp.Body)
+ buf, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err)
}