1 // Copyright (C) The Lightning Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
29 "github.com/klauspost/pgzip"
30 log "github.com/sirupsen/logrus"
33 type vcf2fasta struct {
38 gvcfRegionsPyData []byte
49 func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
53 fmt.Fprintf(stderr, "%s\n", err)
56 flags := flag.NewFlagSet("", flag.ContinueOnError)
57 flags.SetOutput(stderr)
58 flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
59 flags.StringVar(&cmd.genomeFile, "genome", "", "reference genome `file`")
60 flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)")
61 flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py")
62 flags.StringVar(&cmd.gvcfType, "gvcf-type", "gatk", "gvcf_type argument to gvcf_regions.py: gatk, complete_genomics, freebayes")
63 flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
64 flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
65 flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
66 flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
67 cmd.batchArgs.Flags(flags)
68 priority := flags.Int("priority", 500, "container request priority")
69 pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
70 err = flags.Parse(args)
71 if err == flag.ErrHelp {
74 } else if err != nil {
76 } else if cmd.refFile == "" {
77 err = errors.New("reference data (-ref) not specified")
79 } else if flags.NArg() == 0 {
87 log.Println(http.ListenAndServe(*pprof, nil))
92 err = cmd.loadRegionsPy()
99 if cmd.outputDir != "" {
100 err = errors.New("cannot specify output dir in non-local mode")
105 infiles, err = listInputFiles(flags.Args())
109 batchsize := (len(infiles) + cmd.batchArgs.batches - 1) / cmd.batchArgs.batches
110 if cmd.vcpus = batchsize * 2; cmd.vcpus > 32 {
114 runner := arvadosContainerRunner{
115 Name: "lightning vcf2fasta",
116 Client: arvadosClientFromEnv,
117 ProjectUUID: cmd.projectUUID,
118 RAM: 2<<30 + int64(cmd.vcpus)<<28,
123 Mounts: map[string]map[string]interface{}{
124 "/gvcf_regions.py": map[string]interface{}{
126 "content": string(cmd.gvcfRegionsPyData),
130 err = runner.TranslatePaths(&cmd.refFile, &cmd.genomeFile)
134 inputs := flags.Args()
135 for i := range inputs {
136 err = runner.TranslatePaths(&inputs[i])
142 outputs, err = cmd.batchArgs.RunBatches(context.Background(), func(ctx context.Context, batch int) (string, error) {
145 runner.RAM += int64(cmd.vcpus) << 31
147 runner.Args = []string{"vcf2fasta",
149 "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask),
150 "-genome", cmd.genomeFile,
151 "-gvcf-regions.py", "/gvcf_regions.py",
152 "-gvcf-type", cmd.gvcfType,
153 "-output-dir", "/mnt/output",
155 runner.Args = append(runner.Args, cmd.batchArgs.Args(batch)...)
156 runner.Args = append(runner.Args, inputs...)
157 log.Printf("batch %d: %v", batch, runner.Args)
158 return runner.RunContext(ctx)
163 fmt.Fprintln(stdout, strings.Join(outputs, " "))
167 infiles, err := listInputFiles(flags.Args())
171 infiles = cmd.batchArgs.Slice(infiles)
177 todo := make(chan job)
179 for _, infile := range infiles {
180 for phase := 1; phase <= 2; phase++ {
181 todo <- job{vcffile: infile, phase: phase}
187 done := make(chan error, runtime.NumCPU()*2)
188 var wg sync.WaitGroup
189 for i := 0; i < runtime.NumCPU(); i++ {
193 for job := range todo {
195 // a different worker encountered an error
198 err := cmd.vcf2fasta(job.vcffile, job.phase)
200 done <- fmt.Errorf("%s phase %d: %s", job.vcffile, job.phase, err)
218 func maybeInDocker(args, mountfiles []string) []string {
219 if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err != nil || len(out) == 0 {
222 dockerrun := []string{
223 "docker", "run", "--rm", "-i",
226 for _, f := range mountfiles {
227 dockerrun = append(dockerrun, "--volume="+f+":"+f+":ro")
229 dockerrun = append(dockerrun, "lightning-runtime")
230 dockerrun = append(dockerrun, args...)
234 func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
235 ctx, cancel := context.WithCancel(context.Background())
238 _, basename := filepath.Split(infile)
239 outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
240 outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY, 0777)
242 return fmt.Errorf("error opening output file: %s", err)
245 bufw := bufio.NewWriterSize(outf, 8*1024*1024)
246 gzipw := pgzip.NewWriter(bufw)
249 var maskfifo string // filename of mask fifo if we're running bedtools, otherwise ""
251 var wg sync.WaitGroup
252 errs := make(chan error, 2)
254 chrSize := map[string]int{}
256 vcffile, err := open(infile)
260 defer vcffile.Close()
261 var rdr io.Reader = vcffile
262 rdr = bufio.NewReaderSize(rdr, 8*1024*1024)
263 if strings.HasSuffix(infile, ".gz") {
264 rdr, err = gzip.NewReader(vcffile)
269 contigre := regexp.MustCompile(`([^=,]*)=([^>,]*)`)
270 scanner := bufio.NewScanner(rdr)
272 if s := scanner.Text(); !strings.HasPrefix(s, "##") {
274 } else if !strings.HasPrefix(s, "##contig=<") {
277 kv := map[string]string{}
278 for _, m := range contigre.FindAllStringSubmatch(s[10:], -1) {
281 if kv["ID"] != "" && kv["length"] != "" {
282 chrSize[kv["ID"]], _ = strconv.Atoi(kv["length"])
286 if err = scanner.Err(); err != nil {
287 return fmt.Errorf("error scanning input file %q: %s", infile, err)
290 var regions bytes.Buffer
291 bedargs := []string{"python2", "-"}
292 if cmd.gvcfType == "complete_genomics_pass_all" {
293 bedargs = append(bedargs,
294 "--ignore_phrases", "CNV", "INS:ME",
295 "--unreported_is_called",
297 } else if cmd.gvcfType != "" {
298 bedargs = append(bedargs, "--gvcf_type", cmd.gvcfType)
300 bedargs = append(bedargs, infile)
301 bed := exec.CommandContext(ctx, bedargs[0], bedargs[1:]...)
302 bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
303 bed.Stdout = ®ions
304 bed.Stderr = cmd.stderr
305 log.Printf("running %v", bed.Args)
307 log.Printf("exited %v", bed.Args)
309 return fmt.Errorf("gvcf_regions: %s", err)
312 if cmd.genomeFile != "" {
313 // Read chromosome sizes from genome file in
314 // case any weren't specified in the VCF
316 genomeFile, err := open(cmd.genomeFile)
318 return fmt.Errorf("error opening genome file %q: %s", cmd.genomeFile, err)
320 scanner := bufio.NewScanner(genomeFile)
324 _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size)
326 return fmt.Errorf("error parsing genome file %q: %s", cmd.genomeFile, err)
328 if chrSize[chr] == 0 {
332 if err = scanner.Err(); err != nil {
333 return fmt.Errorf("error scanning genome file %q: %s", cmd.genomeFile, err)
337 // "bedtools complement" expects the chromosome sizes
338 // ("genome file") to appear in the same order as the
339 // chromosomes in the input vcf, so we need to sort
341 scanner = bufio.NewScanner(bytes.NewBuffer(append([]byte(nil), regions.Bytes()...)))
342 var sortedGenomeFile bytes.Buffer
346 _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size)
348 return fmt.Errorf("error parsing gvcf_regions output: %s", err)
350 if size, ok := chrSize[chr]; ok {
351 fmt.Fprintf(&sortedGenomeFile, "%s\t%d\n", chr, size)
356 // The bcftools --mask argument needs to end in ".bed"
357 // in order to be parsed as a BED file, so we need to
358 // use a named pipe instead of stdin.
359 tempdir, err := ioutil.TempDir("", "")
361 return fmt.Errorf("TempDir: %s", err)
363 defer os.RemoveAll(tempdir)
364 maskfifo = filepath.Join(tempdir, "fifo.bed")
365 err = syscall.Mkfifo(maskfifo, 0600)
367 return fmt.Errorf("mkfifo: %s", err)
370 // bedtools complement can't seem to read from a pipe
371 // reliably -- "Error: line number 1 of file
372 // /dev/stdin has 1 fields, but 3 were expected." --
373 // so we stage to a temp file.
374 regionsFile := filepath.Join(tempdir, "gvcf_regions.bed")
375 err = ioutil.WriteFile(regionsFile, regions.Bytes(), 0644)
384 maskfifow, err := os.OpenFile(maskfifo, os.O_WRONLY, 0)
389 defer maskfifow.Close()
391 bedcompargs := []string{"bedtools", "complement", "-i", regionsFile, "-g", "/dev/stdin"}
392 bedcompargs = maybeInDocker(bedcompargs, []string{cmd.genomeFile})
393 bedcomp := exec.CommandContext(ctx, bedcompargs[0], bedcompargs[1:]...)
394 bedcomp.Stdin = &sortedGenomeFile
395 bedcomp.Stdout = maskfifow
396 bedcomp.Stderr = cmd.stderr
397 log.Printf("running %v", bedcomp.Args)
399 log.Printf("exited %v", bedcomp.Args)
401 errs <- fmt.Errorf("bedtools complement: %s", err)
404 err = maskfifow.Close()
415 consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)}
417 consargs = append(consargs, "--mask", maskfifo)
419 consargs = append(consargs, infile)
420 indexsuffix := ".tbi"
421 if _, err := os.Stat(infile + ".csi"); err == nil {
424 mounts := []string{infile, infile + indexsuffix, cmd.refFile}
426 mounts = append(mounts, maskfifo)
428 consargs = maybeInDocker(consargs, mounts)
430 consensus := exec.CommandContext(ctx, consargs[0], consargs[1:]...)
431 consensus.Stderr = os.Stderr
432 consensus.Stdout = gzipw
433 consensus.Stderr = cmd.stderr
434 log.Printf("running %v", consensus.Args)
435 err = consensus.Run()
437 errs <- fmt.Errorf("bcftools consensus: %s", err)
440 log.Printf("exited %v", consensus.Args)
459 for err := range errs {
469 func (cmd *vcf2fasta) loadRegionsPy() error {
470 if strings.HasPrefix(cmd.gvcfRegionsPy, "http") {
471 resp, err := http.Get(cmd.gvcfRegionsPy)
475 if resp.StatusCode != http.StatusOK {
476 return fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode)
478 buf, err := io.ReadAll(resp.Body)
480 return fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err)
482 cmd.gvcfRegionsPyData = buf
485 buf, err := ioutil.ReadFile(cmd.gvcfRegionsPy)
489 cmd.gvcfRegionsPyData = buf