Use new Arvados logs API.
[lightning.git] / vcf2fasta.go
1 // Copyright (C) The Lightning Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lightning
6
7 import (
8         "bufio"
9         "bytes"
10         "compress/gzip"
11         "context"
12         "errors"
13         "flag"
14         "fmt"
15         "io"
16         "io/ioutil"
17         "net/http"
18         _ "net/http/pprof"
19         "os"
20         "os/exec"
21         "path/filepath"
22         "regexp"
23         "runtime"
24         "strconv"
25         "strings"
26         "sync"
27         "syscall"
28
29         "github.com/klauspost/pgzip"
30         log "github.com/sirupsen/logrus"
31 )
32
33 type vcf2fasta struct {
34         refFile           string
35         genomeFile        string
36         mask              bool
37         gvcfRegionsPy     string
38         gvcfRegionsPyData []byte
39         gvcfType          string
40         projectUUID       string
41         outputDir         string
42         runLocal          bool
43         vcpus             int
44         batchArgs
45
46         stderr io.Writer
47 }
48
49 func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
50         var err error
51         defer func() {
52                 if err != nil {
53                         fmt.Fprintf(stderr, "%s\n", err)
54                 }
55         }()
56         flags := flag.NewFlagSet("", flag.ContinueOnError)
57         flags.SetOutput(stderr)
58         flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
59         flags.StringVar(&cmd.genomeFile, "genome", "", "reference genome `file`")
60         flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)")
61         flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py")
62         flags.StringVar(&cmd.gvcfType, "gvcf-type", "gatk", "gvcf_type argument to gvcf_regions.py: gatk, complete_genomics, freebayes")
63         flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
64         flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
65         flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
66         flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
67         cmd.batchArgs.Flags(flags)
68         priority := flags.Int("priority", 500, "container request priority")
69         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
70         err = flags.Parse(args)
71         if err == flag.ErrHelp {
72                 err = nil
73                 return 0
74         } else if err != nil {
75                 return 2
76         } else if cmd.refFile == "" {
77                 err = errors.New("reference data (-ref) not specified")
78                 return 2
79         } else if flags.NArg() == 0 {
80                 flags.Usage()
81                 return 2
82         }
83         cmd.stderr = stderr
84
85         if *pprof != "" {
86                 go func() {
87                         log.Println(http.ListenAndServe(*pprof, nil))
88                 }()
89         }
90
91         if cmd.mask {
92                 err = cmd.loadRegionsPy()
93                 if err != nil {
94                         return 1
95                 }
96         }
97
98         if !cmd.runLocal {
99                 if cmd.outputDir != "" {
100                         err = errors.New("cannot specify output dir in non-local mode")
101                         return 2
102                 }
103                 if cmd.vcpus < 1 {
104                         var infiles []string
105                         infiles, err = listInputFiles(flags.Args())
106                         if err != nil {
107                                 return 1
108                         }
109                         batchsize := (len(infiles) + cmd.batchArgs.batches - 1) / cmd.batchArgs.batches
110                         if cmd.vcpus = batchsize * 2; cmd.vcpus > 32 {
111                                 cmd.vcpus = 32
112                         }
113                 }
114                 runner := arvadosContainerRunner{
115                         Name:        "lightning vcf2fasta",
116                         Client:      arvadosClientFromEnv,
117                         ProjectUUID: cmd.projectUUID,
118                         RAM:         2<<30 + int64(cmd.vcpus)<<28,
119                         VCPUs:       cmd.vcpus,
120                         Priority:    *priority,
121                         KeepCache:   2,
122                         APIAccess:   true,
123                         Mounts: map[string]map[string]interface{}{
124                                 "/gvcf_regions.py": map[string]interface{}{
125                                         "kind":    "text",
126                                         "content": string(cmd.gvcfRegionsPyData),
127                                 },
128                         },
129                 }
130                 err = runner.TranslatePaths(&cmd.refFile, &cmd.genomeFile)
131                 if err != nil {
132                         return 1
133                 }
134                 inputs := flags.Args()
135                 for i := range inputs {
136                         err = runner.TranslatePaths(&inputs[i])
137                         if err != nil {
138                                 return 1
139                         }
140                 }
141                 var outputs []string
142                 outputs, err = cmd.batchArgs.RunBatches(context.Background(), func(ctx context.Context, batch int) (string, error) {
143                         runner := runner
144                         if cmd.mask {
145                                 runner.RAM += int64(cmd.vcpus) << 31
146                         }
147                         runner.Args = []string{"vcf2fasta",
148                                 "-local=true",
149                                 "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask),
150                                 "-genome", cmd.genomeFile,
151                                 "-gvcf-regions.py", "/gvcf_regions.py",
152                                 "-gvcf-type", cmd.gvcfType,
153                                 "-output-dir", "/mnt/output",
154                         }
155                         runner.Args = append(runner.Args, cmd.batchArgs.Args(batch)...)
156                         runner.Args = append(runner.Args, inputs...)
157                         log.Printf("batch %d: %v", batch, runner.Args)
158                         return runner.RunContext(ctx)
159                 })
160                 if err != nil {
161                         return 1
162                 }
163                 fmt.Fprintln(stdout, strings.Join(outputs, " "))
164                 return 0
165         }
166
167         infiles, err := listInputFiles(flags.Args())
168         if err != nil {
169                 return 1
170         }
171         infiles = cmd.batchArgs.Slice(infiles)
172
173         type job struct {
174                 vcffile string
175                 phase   int
176         }
177         todo := make(chan job)
178         go func() {
179                 for _, infile := range infiles {
180                         for phase := 1; phase <= 2; phase++ {
181                                 todo <- job{vcffile: infile, phase: phase}
182                         }
183                 }
184                 close(todo)
185         }()
186
187         done := make(chan error, runtime.NumCPU()*2)
188         var wg sync.WaitGroup
189         for i := 0; i < runtime.NumCPU(); i++ {
190                 wg.Add(1)
191                 go func() {
192                         defer wg.Done()
193                         for job := range todo {
194                                 if len(done) > 0 {
195                                         // a different worker encountered an error
196                                         return
197                                 }
198                                 err := cmd.vcf2fasta(job.vcffile, job.phase)
199                                 if err != nil {
200                                         done <- fmt.Errorf("%s phase %d: %s", job.vcffile, job.phase, err)
201                                         return
202                                 }
203                         }
204                 }()
205         }
206         go func() {
207                 wg.Wait()
208                 close(done)
209         }()
210
211         err = <-done
212         if err != nil {
213                 return 1
214         }
215         return 0
216 }
217
218 func maybeInDocker(args, mountfiles []string) []string {
219         if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err != nil || len(out) == 0 {
220                 return args
221         }
222         dockerrun := []string{
223                 "docker", "run", "--rm", "-i",
224                 "--log-driver=none",
225         }
226         for _, f := range mountfiles {
227                 dockerrun = append(dockerrun, "--volume="+f+":"+f+":ro")
228         }
229         dockerrun = append(dockerrun, "lightning-runtime")
230         dockerrun = append(dockerrun, args...)
231         return dockerrun
232 }
233
234 func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
235         ctx, cancel := context.WithCancel(context.Background())
236         defer cancel()
237
238         _, basename := filepath.Split(infile)
239         outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
240         outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY, 0777)
241         if err != nil {
242                 return fmt.Errorf("error opening output file: %s", err)
243         }
244         defer outf.Close()
245         bufw := bufio.NewWriterSize(outf, 8*1024*1024)
246         gzipw := pgzip.NewWriter(bufw)
247         defer gzipw.Close()
248
249         var maskfifo string // filename of mask fifo if we're running bedtools, otherwise ""
250
251         var wg sync.WaitGroup
252         errs := make(chan error, 2)
253         if cmd.mask {
254                 chrSize := map[string]int{}
255
256                 vcffile, err := open(infile)
257                 if err != nil {
258                         return err
259                 }
260                 defer vcffile.Close()
261                 var rdr io.Reader = vcffile
262                 rdr = bufio.NewReaderSize(rdr, 8*1024*1024)
263                 if strings.HasSuffix(infile, ".gz") {
264                         rdr, err = gzip.NewReader(vcffile)
265                         if err != nil {
266                                 return err
267                         }
268                 }
269                 contigre := regexp.MustCompile(`([^=,]*)=([^>,]*)`)
270                 scanner := bufio.NewScanner(rdr)
271                 for scanner.Scan() {
272                         if s := scanner.Text(); !strings.HasPrefix(s, "##") {
273                                 break
274                         } else if !strings.HasPrefix(s, "##contig=<") {
275                                 continue
276                         } else {
277                                 kv := map[string]string{}
278                                 for _, m := range contigre.FindAllStringSubmatch(s[10:], -1) {
279                                         kv[m[1]] = m[2]
280                                 }
281                                 if kv["ID"] != "" && kv["length"] != "" {
282                                         chrSize[kv["ID"]], _ = strconv.Atoi(kv["length"])
283                                 }
284                         }
285                 }
286                 if err = scanner.Err(); err != nil {
287                         return fmt.Errorf("error scanning input file %q: %s", infile, err)
288                 }
289
290                 var regions bytes.Buffer
291                 bedargs := []string{"python2", "-"}
292                 if cmd.gvcfType == "complete_genomics_pass_all" {
293                         bedargs = append(bedargs,
294                                 "--ignore_phrases", "CNV", "INS:ME",
295                                 "--unreported_is_called",
296                         )
297                 } else if cmd.gvcfType != "" {
298                         bedargs = append(bedargs, "--gvcf_type", cmd.gvcfType)
299                 }
300                 bedargs = append(bedargs, infile)
301                 bed := exec.CommandContext(ctx, bedargs[0], bedargs[1:]...)
302                 bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
303                 bed.Stdout = &regions
304                 bed.Stderr = cmd.stderr
305                 log.Printf("running %v", bed.Args)
306                 err = bed.Run()
307                 log.Printf("exited %v", bed.Args)
308                 if err != nil {
309                         return fmt.Errorf("gvcf_regions: %s", err)
310                 }
311
312                 if cmd.genomeFile != "" {
313                         // Read chromosome sizes from genome file in
314                         // case any weren't specified in the VCF
315                         // header.
316                         genomeFile, err := open(cmd.genomeFile)
317                         if err != nil {
318                                 return fmt.Errorf("error opening genome file %q: %s", cmd.genomeFile, err)
319                         }
320                         scanner := bufio.NewScanner(genomeFile)
321                         for scanner.Scan() {
322                                 var chr string
323                                 var size int
324                                 _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size)
325                                 if err != nil {
326                                         return fmt.Errorf("error parsing genome file %q: %s", cmd.genomeFile, err)
327                                 }
328                                 if chrSize[chr] == 0 {
329                                         chrSize[chr] = size
330                                 }
331                         }
332                         if err = scanner.Err(); err != nil {
333                                 return fmt.Errorf("error scanning genome file %q: %s", cmd.genomeFile, err)
334                         }
335                 }
336
337                 // "bedtools complement" expects the chromosome sizes
338                 // ("genome file") to appear in the same order as the
339                 // chromosomes in the input vcf, so we need to sort
340                 // them.
341                 scanner = bufio.NewScanner(bytes.NewBuffer(append([]byte(nil), regions.Bytes()...)))
342                 var sortedGenomeFile bytes.Buffer
343                 for scanner.Scan() {
344                         var chr string
345                         var size int
346                         _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size)
347                         if err != nil {
348                                 return fmt.Errorf("error parsing gvcf_regions output: %s", err)
349                         }
350                         if size, ok := chrSize[chr]; ok {
351                                 fmt.Fprintf(&sortedGenomeFile, "%s\t%d\n", chr, size)
352                                 delete(chrSize, chr)
353                         }
354                 }
355
356                 // The bcftools --mask argument needs to end in ".bed"
357                 // in order to be parsed as a BED file, so we need to
358                 // use a named pipe instead of stdin.
359                 tempdir, err := ioutil.TempDir("", "")
360                 if err != nil {
361                         return fmt.Errorf("TempDir: %s", err)
362                 }
363                 defer os.RemoveAll(tempdir)
364                 maskfifo = filepath.Join(tempdir, "fifo.bed")
365                 err = syscall.Mkfifo(maskfifo, 0600)
366                 if err != nil {
367                         return fmt.Errorf("mkfifo: %s", err)
368                 }
369
370                 // bedtools complement can't seem to read from a pipe
371                 // reliably -- "Error: line number 1 of file
372                 // /dev/stdin has 1 fields, but 3 were expected." --
373                 // so we stage to a temp file.
374                 regionsFile := filepath.Join(tempdir, "gvcf_regions.bed")
375                 err = ioutil.WriteFile(regionsFile, regions.Bytes(), 0644)
376                 if err != nil {
377                         return err
378                 }
379
380                 wg.Add(1)
381                 go func() {
382                         defer wg.Done()
383
384                         maskfifow, err := os.OpenFile(maskfifo, os.O_WRONLY, 0)
385                         if err != nil {
386                                 errs <- err
387                                 return
388                         }
389                         defer maskfifow.Close()
390
391                         bedcompargs := []string{"bedtools", "complement", "-i", regionsFile, "-g", "/dev/stdin"}
392                         bedcompargs = maybeInDocker(bedcompargs, []string{cmd.genomeFile})
393                         bedcomp := exec.CommandContext(ctx, bedcompargs[0], bedcompargs[1:]...)
394                         bedcomp.Stdin = &sortedGenomeFile
395                         bedcomp.Stdout = maskfifow
396                         bedcomp.Stderr = cmd.stderr
397                         log.Printf("running %v", bedcomp.Args)
398                         err = bedcomp.Run()
399                         log.Printf("exited %v", bedcomp.Args)
400                         if err != nil {
401                                 errs <- fmt.Errorf("bedtools complement: %s", err)
402                                 return
403                         }
404                         err = maskfifow.Close()
405                         if err != nil {
406                                 errs <- err
407                                 return
408                         }
409                 }()
410         }
411
412         wg.Add(1)
413         go func() {
414                 defer wg.Done()
415                 consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)}
416                 if maskfifo != "" {
417                         consargs = append(consargs, "--mask", maskfifo)
418                 }
419                 consargs = append(consargs, infile)
420                 indexsuffix := ".tbi"
421                 if _, err := os.Stat(infile + ".csi"); err == nil {
422                         indexsuffix = ".csi"
423                 }
424                 mounts := []string{infile, infile + indexsuffix, cmd.refFile}
425                 if maskfifo != "" {
426                         mounts = append(mounts, maskfifo)
427                 }
428                 consargs = maybeInDocker(consargs, mounts)
429
430                 consensus := exec.CommandContext(ctx, consargs[0], consargs[1:]...)
431                 consensus.Stderr = os.Stderr
432                 consensus.Stdout = gzipw
433                 consensus.Stderr = cmd.stderr
434                 log.Printf("running %v", consensus.Args)
435                 err = consensus.Run()
436                 if err != nil {
437                         errs <- fmt.Errorf("bcftools consensus: %s", err)
438                         return
439                 }
440                 log.Printf("exited %v", consensus.Args)
441                 err = gzipw.Close()
442                 if err != nil {
443                         errs <- err
444                         return
445                 }
446                 err = bufw.Flush()
447                 if err != nil {
448                         errs <- err
449                         return
450                 }
451                 errs <- outf.Close()
452         }()
453
454         go func() {
455                 wg.Wait()
456                 close(errs)
457         }()
458
459         for err := range errs {
460                 if err != nil {
461                         cancel()
462                         wg.Wait()
463                         return err
464                 }
465         }
466         return nil
467 }
468
469 func (cmd *vcf2fasta) loadRegionsPy() error {
470         if strings.HasPrefix(cmd.gvcfRegionsPy, "http") {
471                 resp, err := http.Get(cmd.gvcfRegionsPy)
472                 if err != nil {
473                         return err
474                 }
475                 if resp.StatusCode != http.StatusOK {
476                         return fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode)
477                 }
478                 buf, err := io.ReadAll(resp.Body)
479                 if err != nil {
480                         return fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err)
481                 }
482                 cmd.gvcfRegionsPyData = buf
483                 return nil
484         } else {
485                 buf, err := ioutil.ReadFile(cmd.gvcfRegionsPy)
486                 if err != nil {
487                         return err
488                 }
489                 cmd.gvcfRegionsPyData = buf
490                 return nil
491         }
492 }