Request more memory when using mask.
[lightning.git] / vcf2fasta.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "compress/gzip"
7         "context"
8         "errors"
9         "flag"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "net/http"
14         _ "net/http/pprof"
15         "os"
16         "os/exec"
17         "path/filepath"
18         "regexp"
19         "runtime"
20         "strconv"
21         "strings"
22         "sync"
23         "syscall"
24
25         "git.arvados.org/arvados.git/sdk/go/arvados"
26         log "github.com/sirupsen/logrus"
27 )
28
29 type vcf2fasta struct {
30         refFile           string
31         genomeFile        string
32         mask              bool
33         gvcfRegionsPy     string
34         gvcfRegionsPyData []byte
35         projectUUID       string
36         outputDir         string
37         runLocal          bool
38         vcpus             int
39
40         stderr io.Writer
41 }
42
43 func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
44         var err error
45         defer func() {
46                 if err != nil {
47                         fmt.Fprintf(stderr, "%s\n", err)
48                 }
49         }()
50         flags := flag.NewFlagSet("", flag.ContinueOnError)
51         flags.SetOutput(stderr)
52         flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
53         flags.StringVar(&cmd.genomeFile, "genome", "", "reference genome `file`")
54         flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)")
55         flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py")
56         flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
57         flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
58         flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
59         flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
60         priority := flags.Int("priority", 500, "container request priority")
61         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
62         err = flags.Parse(args)
63         if err == flag.ErrHelp {
64                 err = nil
65                 return 0
66         } else if err != nil {
67                 return 2
68         } else if cmd.refFile == "" {
69                 err = errors.New("reference data (-ref) not specified")
70                 return 2
71         } else if flags.NArg() == 0 {
72                 flags.Usage()
73                 return 2
74         }
75         cmd.stderr = stderr
76
77         if *pprof != "" {
78                 go func() {
79                         log.Println(http.ListenAndServe(*pprof, nil))
80                 }()
81         }
82
83         if cmd.mask {
84                 err = cmd.loadRegionsPy()
85                 if err != nil {
86                         return 1
87                 }
88         }
89
90         if !cmd.runLocal {
91                 if cmd.outputDir != "" {
92                         err = errors.New("cannot specify output dir in non-local mode")
93                         return 2
94                 }
95                 if cmd.vcpus < 1 {
96                         var infiles []string
97                         infiles, err = listInputFiles(flags.Args())
98                         if err != nil {
99                                 return 1
100                         }
101                         if cmd.vcpus = len(infiles) * 2; cmd.vcpus > 32 {
102                                 cmd.vcpus = 32
103                         }
104                 }
105                 runner := arvadosContainerRunner{
106                         Name:        "lightning vcf2fasta",
107                         Client:      arvados.NewClientFromEnv(),
108                         ProjectUUID: cmd.projectUUID,
109                         RAM:         2<<30 + int64(cmd.vcpus)<<28,
110                         VCPUs:       cmd.vcpus,
111                         Priority:    *priority,
112                         Mounts: map[string]map[string]interface{}{
113                                 "/gvcf_regions.py": map[string]interface{}{
114                                         "kind":    "text",
115                                         "content": string(cmd.gvcfRegionsPyData),
116                                 },
117                         },
118                 }
119                 if cmd.mask {
120                         runner.RAM += int64(cmd.vcpus) << 31
121                 }
122                 err = runner.TranslatePaths(&cmd.refFile, &cmd.genomeFile)
123                 if err != nil {
124                         return 1
125                 }
126                 inputs := flags.Args()
127                 for i := range inputs {
128                         err = runner.TranslatePaths(&inputs[i])
129                         if err != nil {
130                                 return 1
131                         }
132                 }
133                 runner.Args = append([]string{"vcf2fasta",
134                         "-local=true",
135                         "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask),
136                         "-genome", cmd.genomeFile,
137                         "-gvcf-regions.py", "/gvcf_regions.py",
138                         "-output-dir", "/mnt/output"}, inputs...)
139                 var output string
140                 output, err = runner.Run()
141                 if err != nil {
142                         return 1
143                 }
144                 fmt.Fprintln(stdout, output)
145                 return 0
146         }
147
148         infiles, err := listInputFiles(flags.Args())
149         if err != nil {
150                 return 1
151         }
152
153         type job struct {
154                 vcffile string
155                 phase   int
156         }
157         todo := make(chan job)
158         go func() {
159                 for _, infile := range infiles {
160                         for phase := 1; phase <= 2; phase++ {
161                                 todo <- job{vcffile: infile, phase: phase}
162                         }
163                 }
164                 close(todo)
165         }()
166
167         done := make(chan error, runtime.NumCPU()*2)
168         var wg sync.WaitGroup
169         for i := 0; i < runtime.NumCPU(); i++ {
170                 wg.Add(1)
171                 go func() {
172                         defer wg.Done()
173                         for job := range todo {
174                                 if len(done) > 0 {
175                                         // a different worker encountered an error
176                                         return
177                                 }
178                                 err := cmd.vcf2fasta(job.vcffile, job.phase)
179                                 if err != nil {
180                                         done <- fmt.Errorf("%s phase %d: %s", job.vcffile, job.phase, err)
181                                         return
182                                 }
183                         }
184                 }()
185         }
186         go func() {
187                 wg.Wait()
188                 close(done)
189         }()
190
191         err = <-done
192         if err != nil {
193                 return 1
194         }
195         return 0
196 }
197
198 func maybeInDocker(args, mountfiles []string) []string {
199         if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err != nil || len(out) == 0 {
200                 return args
201         }
202         dockerrun := []string{
203                 "docker", "run", "--rm", "-i",
204                 "--log-driver=none",
205         }
206         for _, f := range mountfiles {
207                 dockerrun = append(dockerrun, "--volume="+f+":"+f+":ro")
208         }
209         dockerrun = append(dockerrun, "lightning-runtime")
210         dockerrun = append(dockerrun, args...)
211         return dockerrun
212 }
213
214 func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
215         ctx, cancel := context.WithCancel(context.Background())
216         defer cancel()
217
218         _, basename := filepath.Split(infile)
219         outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
220         outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY, 0777)
221         if err != nil {
222                 return fmt.Errorf("error opening output file: %s", err)
223         }
224         defer outf.Close()
225         gzipw := gzip.NewWriter(outf)
226         defer gzipw.Close()
227
228         var maskfifo string // filename of mask fifo if we're running bedtools, otherwise ""
229
230         var wg sync.WaitGroup
231         errs := make(chan error, 2)
232         if cmd.mask {
233                 chrSize := map[string]int{}
234
235                 vcffile, err := os.Open(infile)
236                 if err != nil {
237                         return err
238                 }
239                 defer vcffile.Close()
240                 var rdr io.Reader = vcffile
241                 if strings.HasSuffix(infile, ".gz") {
242                         rdr, err = gzip.NewReader(vcffile)
243                         if err != nil {
244                                 return err
245                         }
246                 }
247                 contigre := regexp.MustCompile(`([^=,]*)=([^>,]*)`)
248                 scanner := bufio.NewScanner(rdr)
249                 for scanner.Scan() {
250                         if s := scanner.Text(); !strings.HasPrefix(s, "##") {
251                                 break
252                         } else if !strings.HasPrefix(s, "##contig=<") {
253                                 continue
254                         } else {
255                                 kv := map[string]string{}
256                                 for _, m := range contigre.FindAllStringSubmatch(s[10:], -1) {
257                                         kv[m[1]] = m[2]
258                                 }
259                                 if kv["ID"] != "" && kv["length"] != "" {
260                                         chrSize[kv["ID"]], _ = strconv.Atoi(kv["length"])
261                                 }
262                         }
263                 }
264                 if err = scanner.Err(); err != nil {
265                         return fmt.Errorf("error scanning input file %q: %s", infile, err)
266                 }
267                 var regions bytes.Buffer
268                 bedargs := []string{"python2", "-", "--gvcf_type", "gatk", infile}
269                 bed := exec.CommandContext(ctx, bedargs[0], bedargs[1:]...)
270                 bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
271                 bed.Stdout = &regions
272                 bed.Stderr = cmd.stderr
273                 log.Printf("running %v", bed.Args)
274                 err = bed.Run()
275                 log.Printf("exited %v", bed.Args)
276                 if err != nil {
277                         return fmt.Errorf("gvcf_regions: %s", err)
278                 }
279
280                 if cmd.genomeFile != "" {
281                         // Read chromosome sizes from genome file in
282                         // case any weren't specified in the VCF
283                         // header.
284                         genomeFile, err := os.Open(cmd.genomeFile)
285                         if err != nil {
286                                 return fmt.Errorf("error opening genome file %q: %s", cmd.genomeFile, err)
287                         }
288                         scanner := bufio.NewScanner(genomeFile)
289                         for scanner.Scan() {
290                                 var chr string
291                                 var size int
292                                 _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size)
293                                 if err != nil {
294                                         return fmt.Errorf("error parsing genome file %q: %s", cmd.genomeFile, err)
295                                 }
296                                 if chrSize[chr] == 0 {
297                                         chrSize[chr] = size
298                                 }
299                         }
300                         if err = scanner.Err(); err != nil {
301                                 return fmt.Errorf("error scanning genome file %q: %s", cmd.genomeFile, err)
302                         }
303                 }
304
305                 // "bedtools complement" expects the chromosome sizes
306                 // ("genome file") to appear in the same order as the
307                 // chromosomes in the input vcf, so we need to sort
308                 // them.
309                 scanner = bufio.NewScanner(bytes.NewBuffer(append([]byte(nil), regions.Bytes()...)))
310                 var sortedGenomeFile bytes.Buffer
311                 for scanner.Scan() {
312                         var chr string
313                         var size int
314                         _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size)
315                         if err != nil {
316                                 return fmt.Errorf("error parsing gvcf_regions output: %s", err)
317                         }
318                         if size, ok := chrSize[chr]; ok {
319                                 fmt.Fprintf(&sortedGenomeFile, "%s\t%d\n", chr, size)
320                                 delete(chrSize, chr)
321                         }
322                 }
323
324                 // The bcftools --mask argument needs to end in ".bed"
325                 // in order to be parsed as a BED file, so we need to
326                 // use a named pipe instead of stdin.
327                 tempdir, err := ioutil.TempDir("", "")
328                 if err != nil {
329                         return fmt.Errorf("TempDir: %s", err)
330                 }
331                 defer os.RemoveAll(tempdir)
332                 maskfifo = filepath.Join(tempdir, "fifo.bed")
333                 err = syscall.Mkfifo(maskfifo, 0600)
334                 if err != nil {
335                         return fmt.Errorf("mkfifo: %s", err)
336                 }
337
338                 // bedtools complement can't seem to read from a pipe
339                 // reliably -- "Error: line number 1 of file
340                 // /dev/stdin has 1 fields, but 3 were expected." --
341                 // so we stage to a temp file.
342                 regionsFile := filepath.Join(tempdir, "gvcf_regions.bed")
343                 err = ioutil.WriteFile(regionsFile, regions.Bytes(), 0644)
344                 if err != nil {
345                         return err
346                 }
347
348                 wg.Add(1)
349                 go func() {
350                         defer wg.Done()
351
352                         maskfifow, err := os.OpenFile(maskfifo, os.O_WRONLY, 0)
353                         if err != nil {
354                                 errs <- err
355                                 return
356                         }
357                         defer maskfifow.Close()
358
359                         bedcompargs := []string{"bedtools", "complement", "-i", regionsFile, "-g", "/dev/stdin"}
360                         bedcompargs = maybeInDocker(bedcompargs, []string{cmd.genomeFile})
361                         bedcomp := exec.CommandContext(ctx, bedcompargs[0], bedcompargs[1:]...)
362                         bedcomp.Stdin = &sortedGenomeFile
363                         bedcomp.Stdout = maskfifow
364                         bedcomp.Stderr = cmd.stderr
365                         log.Printf("running %v", bedcomp.Args)
366                         err = bedcomp.Run()
367                         log.Printf("exited %v", bedcomp.Args)
368                         if err != nil {
369                                 errs <- fmt.Errorf("bedtools complement: %s", err)
370                                 return
371                         }
372                         err = maskfifow.Close()
373                         if err != nil {
374                                 errs <- err
375                                 return
376                         }
377                 }()
378         }
379
380         wg.Add(1)
381         go func() {
382                 defer wg.Done()
383                 consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)}
384                 if maskfifo != "" {
385                         consargs = append(consargs, "--mask", maskfifo)
386                 }
387                 consargs = append(consargs, infile)
388                 indexsuffix := ".tbi"
389                 if _, err := os.Stat(infile + ".csi"); err == nil {
390                         indexsuffix = ".csi"
391                 }
392                 mounts := []string{infile, infile + indexsuffix, cmd.refFile}
393                 if maskfifo != "" {
394                         mounts = append(mounts, maskfifo)
395                 }
396                 consargs = maybeInDocker(consargs, mounts)
397
398                 consensus := exec.CommandContext(ctx, consargs[0], consargs[1:]...)
399                 consensus.Stderr = os.Stderr
400                 consensus.Stdout = gzipw
401                 consensus.Stderr = cmd.stderr
402                 log.Printf("running %v", consensus.Args)
403                 err = consensus.Run()
404                 if err != nil {
405                         errs <- fmt.Errorf("bcftools consensus: %s", err)
406                         return
407                 }
408                 err = gzipw.Close()
409                 if err != nil {
410                         errs <- err
411                         return
412                 }
413                 errs <- outf.Close()
414         }()
415
416         go func() {
417                 wg.Wait()
418                 close(errs)
419         }()
420
421         for err := range errs {
422                 if err != nil {
423                         cancel()
424                         wg.Wait()
425                         return err
426                 }
427         }
428         return nil
429 }
430
431 func (cmd *vcf2fasta) loadRegionsPy() error {
432         if strings.HasPrefix(cmd.gvcfRegionsPy, "http") {
433                 resp, err := http.Get(cmd.gvcfRegionsPy)
434                 if err != nil {
435                         return err
436                 }
437                 if resp.StatusCode != http.StatusOK {
438                         return fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode)
439                 }
440                 buf, err := ioutil.ReadAll(resp.Body)
441                 if err != nil {
442                         return fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err)
443                 }
444                 cmd.gvcfRegionsPyData = buf
445                 return nil
446         } else {
447                 buf, err := ioutil.ReadFile(cmd.gvcfRegionsPy)
448                 if err != nil {
449                         return err
450                 }
451                 cmd.gvcfRegionsPyData = buf
452                 return nil
453         }
454 }