Pass gvcf-type to gvcf_regions.
[lightning.git] / vcf2fasta.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "compress/gzip"
7         "context"
8         "errors"
9         "flag"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "net/http"
14         _ "net/http/pprof"
15         "os"
16         "os/exec"
17         "path/filepath"
18         "regexp"
19         "runtime"
20         "strconv"
21         "strings"
22         "sync"
23         "syscall"
24
25         "git.arvados.org/arvados.git/sdk/go/arvados"
26         log "github.com/sirupsen/logrus"
27 )
28
29 type vcf2fasta struct {
30         refFile           string
31         genomeFile        string
32         mask              bool
33         gvcfRegionsPy     string
34         gvcfRegionsPyData []byte
35         gvcfType          string
36         projectUUID       string
37         outputDir         string
38         runLocal          bool
39         vcpus             int
40
41         stderr io.Writer
42 }
43
44 func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
45         var err error
46         defer func() {
47                 if err != nil {
48                         fmt.Fprintf(stderr, "%s\n", err)
49                 }
50         }()
51         flags := flag.NewFlagSet("", flag.ContinueOnError)
52         flags.SetOutput(stderr)
53         flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
54         flags.StringVar(&cmd.genomeFile, "genome", "", "reference genome `file`")
55         flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)")
56         flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py")
57         flags.StringVar(&cmd.gvcfType, "gvcf-type", "gatk", "gvcf_type argument to gvcf_regions.py: gatk, complete_genomics, freebayes")
58         flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
59         flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
60         flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
61         flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
62         priority := flags.Int("priority", 500, "container request priority")
63         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
64         err = flags.Parse(args)
65         if err == flag.ErrHelp {
66                 err = nil
67                 return 0
68         } else if err != nil {
69                 return 2
70         } else if cmd.refFile == "" {
71                 err = errors.New("reference data (-ref) not specified")
72                 return 2
73         } else if flags.NArg() == 0 {
74                 flags.Usage()
75                 return 2
76         }
77         cmd.stderr = stderr
78
79         if *pprof != "" {
80                 go func() {
81                         log.Println(http.ListenAndServe(*pprof, nil))
82                 }()
83         }
84
85         if cmd.mask {
86                 err = cmd.loadRegionsPy()
87                 if err != nil {
88                         return 1
89                 }
90         }
91
92         if !cmd.runLocal {
93                 if cmd.outputDir != "" {
94                         err = errors.New("cannot specify output dir in non-local mode")
95                         return 2
96                 }
97                 if cmd.vcpus < 1 {
98                         var infiles []string
99                         infiles, err = listInputFiles(flags.Args())
100                         if err != nil {
101                                 return 1
102                         }
103                         if cmd.vcpus = len(infiles) * 2; cmd.vcpus > 32 {
104                                 cmd.vcpus = 32
105                         }
106                 }
107                 runner := arvadosContainerRunner{
108                         Name:        "lightning vcf2fasta",
109                         Client:      arvados.NewClientFromEnv(),
110                         ProjectUUID: cmd.projectUUID,
111                         RAM:         2<<30 + int64(cmd.vcpus)<<28,
112                         VCPUs:       cmd.vcpus,
113                         Priority:    *priority,
114                         Mounts: map[string]map[string]interface{}{
115                                 "/gvcf_regions.py": map[string]interface{}{
116                                         "kind":    "text",
117                                         "content": string(cmd.gvcfRegionsPyData),
118                                 },
119                         },
120                 }
121                 if cmd.mask {
122                         runner.RAM += int64(cmd.vcpus) << 31
123                 }
124                 err = runner.TranslatePaths(&cmd.refFile, &cmd.genomeFile)
125                 if err != nil {
126                         return 1
127                 }
128                 inputs := flags.Args()
129                 for i := range inputs {
130                         err = runner.TranslatePaths(&inputs[i])
131                         if err != nil {
132                                 return 1
133                         }
134                 }
135                 runner.Args = append([]string{"vcf2fasta",
136                         "-local=true",
137                         "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask),
138                         "-genome", cmd.genomeFile,
139                         "-gvcf-regions.py", "/gvcf_regions.py",
140                         "-gvcf-type", cmd.gvcfType,
141                         "-output-dir", "/mnt/output"}, inputs...)
142                 var output string
143                 output, err = runner.Run()
144                 if err != nil {
145                         return 1
146                 }
147                 fmt.Fprintln(stdout, output)
148                 return 0
149         }
150
151         infiles, err := listInputFiles(flags.Args())
152         if err != nil {
153                 return 1
154         }
155
156         type job struct {
157                 vcffile string
158                 phase   int
159         }
160         todo := make(chan job)
161         go func() {
162                 for _, infile := range infiles {
163                         for phase := 1; phase <= 2; phase++ {
164                                 todo <- job{vcffile: infile, phase: phase}
165                         }
166                 }
167                 close(todo)
168         }()
169
170         done := make(chan error, runtime.NumCPU()*2)
171         var wg sync.WaitGroup
172         for i := 0; i < runtime.NumCPU(); i++ {
173                 wg.Add(1)
174                 go func() {
175                         defer wg.Done()
176                         for job := range todo {
177                                 if len(done) > 0 {
178                                         // a different worker encountered an error
179                                         return
180                                 }
181                                 err := cmd.vcf2fasta(job.vcffile, job.phase)
182                                 if err != nil {
183                                         done <- fmt.Errorf("%s phase %d: %s", job.vcffile, job.phase, err)
184                                         return
185                                 }
186                         }
187                 }()
188         }
189         go func() {
190                 wg.Wait()
191                 close(done)
192         }()
193
194         err = <-done
195         if err != nil {
196                 return 1
197         }
198         return 0
199 }
200
201 func maybeInDocker(args, mountfiles []string) []string {
202         if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err != nil || len(out) == 0 {
203                 return args
204         }
205         dockerrun := []string{
206                 "docker", "run", "--rm", "-i",
207                 "--log-driver=none",
208         }
209         for _, f := range mountfiles {
210                 dockerrun = append(dockerrun, "--volume="+f+":"+f+":ro")
211         }
212         dockerrun = append(dockerrun, "lightning-runtime")
213         dockerrun = append(dockerrun, args...)
214         return dockerrun
215 }
216
217 func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
218         ctx, cancel := context.WithCancel(context.Background())
219         defer cancel()
220
221         _, basename := filepath.Split(infile)
222         outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
223         outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY, 0777)
224         if err != nil {
225                 return fmt.Errorf("error opening output file: %s", err)
226         }
227         defer outf.Close()
228         gzipw := gzip.NewWriter(outf)
229         defer gzipw.Close()
230
231         var maskfifo string // filename of mask fifo if we're running bedtools, otherwise ""
232
233         var wg sync.WaitGroup
234         errs := make(chan error, 2)
235         if cmd.mask {
236                 chrSize := map[string]int{}
237
238                 vcffile, err := os.Open(infile)
239                 if err != nil {
240                         return err
241                 }
242                 defer vcffile.Close()
243                 var rdr io.Reader = vcffile
244                 if strings.HasSuffix(infile, ".gz") {
245                         rdr, err = gzip.NewReader(vcffile)
246                         if err != nil {
247                                 return err
248                         }
249                 }
250                 contigre := regexp.MustCompile(`([^=,]*)=([^>,]*)`)
251                 scanner := bufio.NewScanner(rdr)
252                 for scanner.Scan() {
253                         if s := scanner.Text(); !strings.HasPrefix(s, "##") {
254                                 break
255                         } else if !strings.HasPrefix(s, "##contig=<") {
256                                 continue
257                         } else {
258                                 kv := map[string]string{}
259                                 for _, m := range contigre.FindAllStringSubmatch(s[10:], -1) {
260                                         kv[m[1]] = m[2]
261                                 }
262                                 if kv["ID"] != "" && kv["length"] != "" {
263                                         chrSize[kv["ID"]], _ = strconv.Atoi(kv["length"])
264                                 }
265                         }
266                 }
267                 if err = scanner.Err(); err != nil {
268                         return fmt.Errorf("error scanning input file %q: %s", infile, err)
269                 }
270                 var regions bytes.Buffer
271                 bedargs := []string{"python2", "-", "--gvcf_type", cmd.gvcfType, infile}
272                 bed := exec.CommandContext(ctx, bedargs[0], bedargs[1:]...)
273                 bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
274                 bed.Stdout = &regions
275                 bed.Stderr = cmd.stderr
276                 log.Printf("running %v", bed.Args)
277                 err = bed.Run()
278                 log.Printf("exited %v", bed.Args)
279                 if err != nil {
280                         return fmt.Errorf("gvcf_regions: %s", err)
281                 }
282
283                 if cmd.genomeFile != "" {
284                         // Read chromosome sizes from genome file in
285                         // case any weren't specified in the VCF
286                         // header.
287                         genomeFile, err := os.Open(cmd.genomeFile)
288                         if err != nil {
289                                 return fmt.Errorf("error opening genome file %q: %s", cmd.genomeFile, err)
290                         }
291                         scanner := bufio.NewScanner(genomeFile)
292                         for scanner.Scan() {
293                                 var chr string
294                                 var size int
295                                 _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size)
296                                 if err != nil {
297                                         return fmt.Errorf("error parsing genome file %q: %s", cmd.genomeFile, err)
298                                 }
299                                 if chrSize[chr] == 0 {
300                                         chrSize[chr] = size
301                                 }
302                         }
303                         if err = scanner.Err(); err != nil {
304                                 return fmt.Errorf("error scanning genome file %q: %s", cmd.genomeFile, err)
305                         }
306                 }
307
308                 // "bedtools complement" expects the chromosome sizes
309                 // ("genome file") to appear in the same order as the
310                 // chromosomes in the input vcf, so we need to sort
311                 // them.
312                 scanner = bufio.NewScanner(bytes.NewBuffer(append([]byte(nil), regions.Bytes()...)))
313                 var sortedGenomeFile bytes.Buffer
314                 for scanner.Scan() {
315                         var chr string
316                         var size int
317                         _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size)
318                         if err != nil {
319                                 return fmt.Errorf("error parsing gvcf_regions output: %s", err)
320                         }
321                         if size, ok := chrSize[chr]; ok {
322                                 fmt.Fprintf(&sortedGenomeFile, "%s\t%d\n", chr, size)
323                                 delete(chrSize, chr)
324                         }
325                 }
326
327                 // The bcftools --mask argument needs to end in ".bed"
328                 // in order to be parsed as a BED file, so we need to
329                 // use a named pipe instead of stdin.
330                 tempdir, err := ioutil.TempDir("", "")
331                 if err != nil {
332                         return fmt.Errorf("TempDir: %s", err)
333                 }
334                 defer os.RemoveAll(tempdir)
335                 maskfifo = filepath.Join(tempdir, "fifo.bed")
336                 err = syscall.Mkfifo(maskfifo, 0600)
337                 if err != nil {
338                         return fmt.Errorf("mkfifo: %s", err)
339                 }
340
341                 // bedtools complement can't seem to read from a pipe
342                 // reliably -- "Error: line number 1 of file
343                 // /dev/stdin has 1 fields, but 3 were expected." --
344                 // so we stage to a temp file.
345                 regionsFile := filepath.Join(tempdir, "gvcf_regions.bed")
346                 err = ioutil.WriteFile(regionsFile, regions.Bytes(), 0644)
347                 if err != nil {
348                         return err
349                 }
350
351                 wg.Add(1)
352                 go func() {
353                         defer wg.Done()
354
355                         maskfifow, err := os.OpenFile(maskfifo, os.O_WRONLY, 0)
356                         if err != nil {
357                                 errs <- err
358                                 return
359                         }
360                         defer maskfifow.Close()
361
362                         bedcompargs := []string{"bedtools", "complement", "-i", regionsFile, "-g", "/dev/stdin"}
363                         bedcompargs = maybeInDocker(bedcompargs, []string{cmd.genomeFile})
364                         bedcomp := exec.CommandContext(ctx, bedcompargs[0], bedcompargs[1:]...)
365                         bedcomp.Stdin = &sortedGenomeFile
366                         bedcomp.Stdout = maskfifow
367                         bedcomp.Stderr = cmd.stderr
368                         log.Printf("running %v", bedcomp.Args)
369                         err = bedcomp.Run()
370                         log.Printf("exited %v", bedcomp.Args)
371                         if err != nil {
372                                 errs <- fmt.Errorf("bedtools complement: %s", err)
373                                 return
374                         }
375                         err = maskfifow.Close()
376                         if err != nil {
377                                 errs <- err
378                                 return
379                         }
380                 }()
381         }
382
383         wg.Add(1)
384         go func() {
385                 defer wg.Done()
386                 consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)}
387                 if maskfifo != "" {
388                         consargs = append(consargs, "--mask", maskfifo)
389                 }
390                 consargs = append(consargs, infile)
391                 indexsuffix := ".tbi"
392                 if _, err := os.Stat(infile + ".csi"); err == nil {
393                         indexsuffix = ".csi"
394                 }
395                 mounts := []string{infile, infile + indexsuffix, cmd.refFile}
396                 if maskfifo != "" {
397                         mounts = append(mounts, maskfifo)
398                 }
399                 consargs = maybeInDocker(consargs, mounts)
400
401                 consensus := exec.CommandContext(ctx, consargs[0], consargs[1:]...)
402                 consensus.Stderr = os.Stderr
403                 consensus.Stdout = gzipw
404                 consensus.Stderr = cmd.stderr
405                 log.Printf("running %v", consensus.Args)
406                 err = consensus.Run()
407                 if err != nil {
408                         errs <- fmt.Errorf("bcftools consensus: %s", err)
409                         return
410                 }
411                 err = gzipw.Close()
412                 if err != nil {
413                         errs <- err
414                         return
415                 }
416                 errs <- outf.Close()
417         }()
418
419         go func() {
420                 wg.Wait()
421                 close(errs)
422         }()
423
424         for err := range errs {
425                 if err != nil {
426                         cancel()
427                         wg.Wait()
428                         return err
429                 }
430         }
431         return nil
432 }
433
434 func (cmd *vcf2fasta) loadRegionsPy() error {
435         if strings.HasPrefix(cmd.gvcfRegionsPy, "http") {
436                 resp, err := http.Get(cmd.gvcfRegionsPy)
437                 if err != nil {
438                         return err
439                 }
440                 if resp.StatusCode != http.StatusOK {
441                         return fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode)
442                 }
443                 buf, err := ioutil.ReadAll(resp.Body)
444                 if err != nil {
445                         return fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err)
446                 }
447                 cmd.gvcfRegionsPyData = buf
448                 return nil
449         } else {
450                 buf, err := ioutil.ReadFile(cmd.gvcfRegionsPy)
451                 if err != nil {
452                         return err
453                 }
454                 cmd.gvcfRegionsPyData = buf
455                 return nil
456         }
457 }