Bump merge memory.
[lightning.git] / vcf2fasta.go
1 package lightning
2
3 import (
4         "bufio"
5         "bytes"
6         "compress/gzip"
7         "context"
8         "errors"
9         "flag"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "net/http"
14         _ "net/http/pprof"
15         "os"
16         "os/exec"
17         "path/filepath"
18         "regexp"
19         "runtime"
20         "strconv"
21         "strings"
22         "sync"
23         "syscall"
24
25         "github.com/klauspost/pgzip"
26         log "github.com/sirupsen/logrus"
27 )
28
29 type vcf2fasta struct {
30         refFile           string
31         genomeFile        string
32         mask              bool
33         gvcfRegionsPy     string
34         gvcfRegionsPyData []byte
35         gvcfType          string
36         projectUUID       string
37         outputDir         string
38         runLocal          bool
39         vcpus             int
40         batchArgs
41
42         stderr io.Writer
43 }
44
45 func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
46         var err error
47         defer func() {
48                 if err != nil {
49                         fmt.Fprintf(stderr, "%s\n", err)
50                 }
51         }()
52         flags := flag.NewFlagSet("", flag.ContinueOnError)
53         flags.SetOutput(stderr)
54         flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
55         flags.StringVar(&cmd.genomeFile, "genome", "", "reference genome `file`")
56         flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)")
57         flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py")
58         flags.StringVar(&cmd.gvcfType, "gvcf-type", "gatk", "gvcf_type argument to gvcf_regions.py: gatk, complete_genomics, freebayes")
59         flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
60         flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
61         flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
62         flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
63         cmd.batchArgs.Flags(flags)
64         priority := flags.Int("priority", 500, "container request priority")
65         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
66         err = flags.Parse(args)
67         if err == flag.ErrHelp {
68                 err = nil
69                 return 0
70         } else if err != nil {
71                 return 2
72         } else if cmd.refFile == "" {
73                 err = errors.New("reference data (-ref) not specified")
74                 return 2
75         } else if flags.NArg() == 0 {
76                 flags.Usage()
77                 return 2
78         }
79         cmd.stderr = stderr
80
81         if *pprof != "" {
82                 go func() {
83                         log.Println(http.ListenAndServe(*pprof, nil))
84                 }()
85         }
86
87         if cmd.mask {
88                 err = cmd.loadRegionsPy()
89                 if err != nil {
90                         return 1
91                 }
92         }
93
94         if !cmd.runLocal {
95                 if cmd.outputDir != "" {
96                         err = errors.New("cannot specify output dir in non-local mode")
97                         return 2
98                 }
99                 if cmd.vcpus < 1 {
100                         var infiles []string
101                         infiles, err = listInputFiles(flags.Args())
102                         if err != nil {
103                                 return 1
104                         }
105                         batchsize := (len(infiles) + cmd.batchArgs.batches - 1) / cmd.batchArgs.batches
106                         if cmd.vcpus = batchsize * 2; cmd.vcpus > 32 {
107                                 cmd.vcpus = 32
108                         }
109                 }
110                 runner := arvadosContainerRunner{
111                         Name:        "lightning vcf2fasta",
112                         Client:      arvadosClientFromEnv,
113                         ProjectUUID: cmd.projectUUID,
114                         RAM:         2<<30 + int64(cmd.vcpus)<<28,
115                         VCPUs:       cmd.vcpus,
116                         Priority:    *priority,
117                         KeepCache:   2,
118                         APIAccess:   true,
119                         Mounts: map[string]map[string]interface{}{
120                                 "/gvcf_regions.py": map[string]interface{}{
121                                         "kind":    "text",
122                                         "content": string(cmd.gvcfRegionsPyData),
123                                 },
124                         },
125                 }
126                 err = runner.TranslatePaths(&cmd.refFile, &cmd.genomeFile)
127                 if err != nil {
128                         return 1
129                 }
130                 inputs := flags.Args()
131                 for i := range inputs {
132                         err = runner.TranslatePaths(&inputs[i])
133                         if err != nil {
134                                 return 1
135                         }
136                 }
137                 var outputs []string
138                 outputs, err = cmd.batchArgs.RunBatches(context.Background(), func(ctx context.Context, batch int) (string, error) {
139                         runner := runner
140                         if cmd.mask {
141                                 runner.RAM += int64(cmd.vcpus) << 31
142                         }
143                         runner.Args = []string{"vcf2fasta",
144                                 "-local=true",
145                                 "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask),
146                                 "-genome", cmd.genomeFile,
147                                 "-gvcf-regions.py", "/gvcf_regions.py",
148                                 "-gvcf-type", cmd.gvcfType,
149                                 "-output-dir", "/mnt/output",
150                         }
151                         runner.Args = append(runner.Args, cmd.batchArgs.Args(batch)...)
152                         runner.Args = append(runner.Args, inputs...)
153                         log.Printf("batch %d: %v", batch, runner.Args)
154                         return runner.RunContext(ctx)
155                 })
156                 if err != nil {
157                         return 1
158                 }
159                 fmt.Fprintln(stdout, strings.Join(outputs, " "))
160                 return 0
161         }
162
163         infiles, err := listInputFiles(flags.Args())
164         if err != nil {
165                 return 1
166         }
167         infiles = cmd.batchArgs.Slice(infiles)
168
169         type job struct {
170                 vcffile string
171                 phase   int
172         }
173         todo := make(chan job)
174         go func() {
175                 for _, infile := range infiles {
176                         for phase := 1; phase <= 2; phase++ {
177                                 todo <- job{vcffile: infile, phase: phase}
178                         }
179                 }
180                 close(todo)
181         }()
182
183         done := make(chan error, runtime.NumCPU()*2)
184         var wg sync.WaitGroup
185         for i := 0; i < runtime.NumCPU(); i++ {
186                 wg.Add(1)
187                 go func() {
188                         defer wg.Done()
189                         for job := range todo {
190                                 if len(done) > 0 {
191                                         // a different worker encountered an error
192                                         return
193                                 }
194                                 err := cmd.vcf2fasta(job.vcffile, job.phase)
195                                 if err != nil {
196                                         done <- fmt.Errorf("%s phase %d: %s", job.vcffile, job.phase, err)
197                                         return
198                                 }
199                         }
200                 }()
201         }
202         go func() {
203                 wg.Wait()
204                 close(done)
205         }()
206
207         err = <-done
208         if err != nil {
209                 return 1
210         }
211         return 0
212 }
213
214 func maybeInDocker(args, mountfiles []string) []string {
215         if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err != nil || len(out) == 0 {
216                 return args
217         }
218         dockerrun := []string{
219                 "docker", "run", "--rm", "-i",
220                 "--log-driver=none",
221         }
222         for _, f := range mountfiles {
223                 dockerrun = append(dockerrun, "--volume="+f+":"+f+":ro")
224         }
225         dockerrun = append(dockerrun, "lightning-runtime")
226         dockerrun = append(dockerrun, args...)
227         return dockerrun
228 }
229
230 func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
231         ctx, cancel := context.WithCancel(context.Background())
232         defer cancel()
233
234         _, basename := filepath.Split(infile)
235         outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
236         outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY, 0777)
237         if err != nil {
238                 return fmt.Errorf("error opening output file: %s", err)
239         }
240         defer outf.Close()
241         bufw := bufio.NewWriterSize(outf, 8*1024*1024)
242         gzipw := pgzip.NewWriter(bufw)
243         defer gzipw.Close()
244
245         var maskfifo string // filename of mask fifo if we're running bedtools, otherwise ""
246
247         var wg sync.WaitGroup
248         errs := make(chan error, 2)
249         if cmd.mask {
250                 chrSize := map[string]int{}
251
252                 vcffile, err := open(infile)
253                 if err != nil {
254                         return err
255                 }
256                 defer vcffile.Close()
257                 var rdr io.Reader = vcffile
258                 rdr = bufio.NewReaderSize(rdr, 8*1024*1024)
259                 if strings.HasSuffix(infile, ".gz") {
260                         rdr, err = gzip.NewReader(vcffile)
261                         if err != nil {
262                                 return err
263                         }
264                 }
265                 contigre := regexp.MustCompile(`([^=,]*)=([^>,]*)`)
266                 scanner := bufio.NewScanner(rdr)
267                 for scanner.Scan() {
268                         if s := scanner.Text(); !strings.HasPrefix(s, "##") {
269                                 break
270                         } else if !strings.HasPrefix(s, "##contig=<") {
271                                 continue
272                         } else {
273                                 kv := map[string]string{}
274                                 for _, m := range contigre.FindAllStringSubmatch(s[10:], -1) {
275                                         kv[m[1]] = m[2]
276                                 }
277                                 if kv["ID"] != "" && kv["length"] != "" {
278                                         chrSize[kv["ID"]], _ = strconv.Atoi(kv["length"])
279                                 }
280                         }
281                 }
282                 if err = scanner.Err(); err != nil {
283                         return fmt.Errorf("error scanning input file %q: %s", infile, err)
284                 }
285
286                 var regions bytes.Buffer
287                 bedargs := []string{"python2", "-"}
288                 if cmd.gvcfType == "complete_genomics_pass_all" {
289                         bedargs = append(bedargs,
290                                 "--ignore_phrases", "CNV", "INS:ME",
291                                 "--unreported_is_called",
292                         )
293                 } else if cmd.gvcfType != "" {
294                         bedargs = append(bedargs, "--gvcf_type", cmd.gvcfType)
295                 }
296                 bedargs = append(bedargs, infile)
297                 bed := exec.CommandContext(ctx, bedargs[0], bedargs[1:]...)
298                 bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
299                 bed.Stdout = &regions
300                 bed.Stderr = cmd.stderr
301                 log.Printf("running %v", bed.Args)
302                 err = bed.Run()
303                 log.Printf("exited %v", bed.Args)
304                 if err != nil {
305                         return fmt.Errorf("gvcf_regions: %s", err)
306                 }
307
308                 if cmd.genomeFile != "" {
309                         // Read chromosome sizes from genome file in
310                         // case any weren't specified in the VCF
311                         // header.
312                         genomeFile, err := open(cmd.genomeFile)
313                         if err != nil {
314                                 return fmt.Errorf("error opening genome file %q: %s", cmd.genomeFile, err)
315                         }
316                         scanner := bufio.NewScanner(genomeFile)
317                         for scanner.Scan() {
318                                 var chr string
319                                 var size int
320                                 _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size)
321                                 if err != nil {
322                                         return fmt.Errorf("error parsing genome file %q: %s", cmd.genomeFile, err)
323                                 }
324                                 if chrSize[chr] == 0 {
325                                         chrSize[chr] = size
326                                 }
327                         }
328                         if err = scanner.Err(); err != nil {
329                                 return fmt.Errorf("error scanning genome file %q: %s", cmd.genomeFile, err)
330                         }
331                 }
332
333                 // "bedtools complement" expects the chromosome sizes
334                 // ("genome file") to appear in the same order as the
335                 // chromosomes in the input vcf, so we need to sort
336                 // them.
337                 scanner = bufio.NewScanner(bytes.NewBuffer(append([]byte(nil), regions.Bytes()...)))
338                 var sortedGenomeFile bytes.Buffer
339                 for scanner.Scan() {
340                         var chr string
341                         var size int
342                         _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size)
343                         if err != nil {
344                                 return fmt.Errorf("error parsing gvcf_regions output: %s", err)
345                         }
346                         if size, ok := chrSize[chr]; ok {
347                                 fmt.Fprintf(&sortedGenomeFile, "%s\t%d\n", chr, size)
348                                 delete(chrSize, chr)
349                         }
350                 }
351
352                 // The bcftools --mask argument needs to end in ".bed"
353                 // in order to be parsed as a BED file, so we need to
354                 // use a named pipe instead of stdin.
355                 tempdir, err := ioutil.TempDir("", "")
356                 if err != nil {
357                         return fmt.Errorf("TempDir: %s", err)
358                 }
359                 defer os.RemoveAll(tempdir)
360                 maskfifo = filepath.Join(tempdir, "fifo.bed")
361                 err = syscall.Mkfifo(maskfifo, 0600)
362                 if err != nil {
363                         return fmt.Errorf("mkfifo: %s", err)
364                 }
365
366                 // bedtools complement can't seem to read from a pipe
367                 // reliably -- "Error: line number 1 of file
368                 // /dev/stdin has 1 fields, but 3 were expected." --
369                 // so we stage to a temp file.
370                 regionsFile := filepath.Join(tempdir, "gvcf_regions.bed")
371                 err = ioutil.WriteFile(regionsFile, regions.Bytes(), 0644)
372                 if err != nil {
373                         return err
374                 }
375
376                 wg.Add(1)
377                 go func() {
378                         defer wg.Done()
379
380                         maskfifow, err := os.OpenFile(maskfifo, os.O_WRONLY, 0)
381                         if err != nil {
382                                 errs <- err
383                                 return
384                         }
385                         defer maskfifow.Close()
386
387                         bedcompargs := []string{"bedtools", "complement", "-i", regionsFile, "-g", "/dev/stdin"}
388                         bedcompargs = maybeInDocker(bedcompargs, []string{cmd.genomeFile})
389                         bedcomp := exec.CommandContext(ctx, bedcompargs[0], bedcompargs[1:]...)
390                         bedcomp.Stdin = &sortedGenomeFile
391                         bedcomp.Stdout = maskfifow
392                         bedcomp.Stderr = cmd.stderr
393                         log.Printf("running %v", bedcomp.Args)
394                         err = bedcomp.Run()
395                         log.Printf("exited %v", bedcomp.Args)
396                         if err != nil {
397                                 errs <- fmt.Errorf("bedtools complement: %s", err)
398                                 return
399                         }
400                         err = maskfifow.Close()
401                         if err != nil {
402                                 errs <- err
403                                 return
404                         }
405                 }()
406         }
407
408         wg.Add(1)
409         go func() {
410                 defer wg.Done()
411                 consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)}
412                 if maskfifo != "" {
413                         consargs = append(consargs, "--mask", maskfifo)
414                 }
415                 consargs = append(consargs, infile)
416                 indexsuffix := ".tbi"
417                 if _, err := os.Stat(infile + ".csi"); err == nil {
418                         indexsuffix = ".csi"
419                 }
420                 mounts := []string{infile, infile + indexsuffix, cmd.refFile}
421                 if maskfifo != "" {
422                         mounts = append(mounts, maskfifo)
423                 }
424                 consargs = maybeInDocker(consargs, mounts)
425
426                 consensus := exec.CommandContext(ctx, consargs[0], consargs[1:]...)
427                 consensus.Stderr = os.Stderr
428                 consensus.Stdout = gzipw
429                 consensus.Stderr = cmd.stderr
430                 log.Printf("running %v", consensus.Args)
431                 err = consensus.Run()
432                 if err != nil {
433                         errs <- fmt.Errorf("bcftools consensus: %s", err)
434                         return
435                 }
436                 log.Printf("exited %v", consensus.Args)
437                 err = gzipw.Close()
438                 if err != nil {
439                         errs <- err
440                         return
441                 }
442                 err = bufw.Flush()
443                 if err != nil {
444                         errs <- err
445                         return
446                 }
447                 errs <- outf.Close()
448         }()
449
450         go func() {
451                 wg.Wait()
452                 close(errs)
453         }()
454
455         for err := range errs {
456                 if err != nil {
457                         cancel()
458                         wg.Wait()
459                         return err
460                 }
461         }
462         return nil
463 }
464
465 func (cmd *vcf2fasta) loadRegionsPy() error {
466         if strings.HasPrefix(cmd.gvcfRegionsPy, "http") {
467                 resp, err := http.Get(cmd.gvcfRegionsPy)
468                 if err != nil {
469                         return err
470                 }
471                 if resp.StatusCode != http.StatusOK {
472                         return fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode)
473                 }
474                 buf, err := ioutil.ReadAll(resp.Body)
475                 if err != nil {
476                         return fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err)
477                 }
478                 cmd.gvcfRegionsPyData = buf
479                 return nil
480         } else {
481                 buf, err := ioutil.ReadFile(cmd.gvcfRegionsPy)
482                 if err != nil {
483                         return err
484                 }
485                 cmd.gvcfRegionsPyData = buf
486                 return nil
487         }
488 }