Don't pipe gvcf_regions to bedtools. Avoid short-read race bug.
[lightning.git] / vcf2fasta.go
1 package main
2
3 import (
4         "bytes"
5         "compress/gzip"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "os/exec"
15         "path/filepath"
16         "runtime"
17         "strings"
18         "sync"
19         "syscall"
20
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         log "github.com/sirupsen/logrus"
23 )
24
25 type vcf2fasta struct {
26         refFile           string
27         genomeFile        string
28         mask              bool
29         gvcfRegionsPy     string
30         gvcfRegionsPyData []byte
31         projectUUID       string
32         outputDir         string
33         runLocal          bool
34         vcpus             int
35
36         stderr io.Writer
37 }
38
39 func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
40         var err error
41         defer func() {
42                 if err != nil {
43                         fmt.Fprintf(stderr, "%s\n", err)
44                 }
45         }()
46         flags := flag.NewFlagSet("", flag.ContinueOnError)
47         flags.SetOutput(stderr)
48         flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
49         flags.StringVar(&cmd.genomeFile, "genome", "", "reference genome `file`")
50         flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)")
51         flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py")
52         flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
53         flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
54         flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
55         flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
56         priority := flags.Int("priority", 500, "container request priority")
57         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
58         err = flags.Parse(args)
59         if err == flag.ErrHelp {
60                 err = nil
61                 return 0
62         } else if err != nil {
63                 return 2
64         } else if cmd.refFile == "" {
65                 err = errors.New("reference data (-ref) not specified")
66                 return 2
67         } else if flags.NArg() == 0 {
68                 flags.Usage()
69                 return 2
70         }
71         cmd.stderr = stderr
72
73         if *pprof != "" {
74                 go func() {
75                         log.Println(http.ListenAndServe(*pprof, nil))
76                 }()
77         }
78
79         if cmd.mask {
80                 err = cmd.loadRegionsPy()
81                 if err != nil {
82                         return 1
83                 }
84         }
85
86         if !cmd.runLocal {
87                 if cmd.outputDir != "" {
88                         err = errors.New("cannot specify output dir in non-local mode")
89                         return 2
90                 }
91                 if cmd.vcpus < 1 {
92                         var infiles []string
93                         infiles, err = listInputFiles(flags.Args())
94                         if err != nil {
95                                 return 1
96                         }
97                         if cmd.vcpus = len(infiles) * 2; cmd.vcpus > 32 {
98                                 cmd.vcpus = 32
99                         }
100                 }
101                 runner := arvadosContainerRunner{
102                         Name:        "lightning vcf2fasta",
103                         Client:      arvados.NewClientFromEnv(),
104                         ProjectUUID: cmd.projectUUID,
105                         RAM:         2<<30 + int64(cmd.vcpus)<<28,
106                         VCPUs:       cmd.vcpus,
107                         Priority:    *priority,
108                         Mounts: map[string]map[string]interface{}{
109                                 "/gvcf_regions.py": map[string]interface{}{
110                                         "kind":    "text",
111                                         "content": string(cmd.gvcfRegionsPyData),
112                                 },
113                         },
114                 }
115                 err = runner.TranslatePaths(&cmd.refFile)
116                 if err != nil {
117                         return 1
118                 }
119                 inputs := flags.Args()
120                 for i := range inputs {
121                         err = runner.TranslatePaths(&inputs[i])
122                         if err != nil {
123                                 return 1
124                         }
125                 }
126                 runner.Args = append([]string{"vcf2fasta", "-local=true", "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask), "-gvcf-regions.py", "/gvcf_regions.py", "-output-dir", "/mnt/output"}, inputs...)
127                 var output string
128                 output, err = runner.Run()
129                 if err != nil {
130                         return 1
131                 }
132                 fmt.Fprintln(stdout, output)
133                 return 0
134         }
135
136         infiles, err := listInputFiles(flags.Args())
137         if err != nil {
138                 return 1
139         }
140
141         type job struct {
142                 vcffile string
143                 phase   int
144         }
145         todo := make(chan job)
146         go func() {
147                 for _, infile := range infiles {
148                         for phase := 1; phase <= 2; phase++ {
149                                 todo <- job{vcffile: infile, phase: phase}
150                         }
151                 }
152                 close(todo)
153         }()
154
155         done := make(chan error, runtime.NumCPU()*2)
156         var wg sync.WaitGroup
157         for i := 0; i < runtime.NumCPU(); i++ {
158                 wg.Add(1)
159                 go func() {
160                         defer wg.Done()
161                         for job := range todo {
162                                 if len(done) > 0 {
163                                         // a different worker encountered an error
164                                         return
165                                 }
166                                 err := cmd.vcf2fasta(job.vcffile, job.phase)
167                                 if err != nil {
168                                         done <- fmt.Errorf("%s phase %d: %s", job.vcffile, job.phase, err)
169                                         return
170                                 }
171                         }
172                 }()
173         }
174         go func() {
175                 wg.Wait()
176                 close(done)
177         }()
178
179         err = <-done
180         if err != nil {
181                 return 1
182         }
183         return 0
184 }
185
186 func maybeInDocker(args, mountfiles []string) []string {
187         if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err != nil || len(out) == 0 {
188                 return args
189         }
190         dockerrun := []string{
191                 "docker", "run", "--rm", "-i",
192                 "--log-driver=none",
193         }
194         for _, f := range mountfiles {
195                 dockerrun = append(dockerrun, "--volume="+f+":"+f+":ro")
196         }
197         dockerrun = append(dockerrun, "lightning-runtime")
198         dockerrun = append(dockerrun, args...)
199         return dockerrun
200 }
201
202 func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
203         _, basename := filepath.Split(infile)
204         outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
205         outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY, 0777)
206         if err != nil {
207                 return fmt.Errorf("error opening output file: %s", err)
208         }
209         defer outf.Close()
210         gzipw := gzip.NewWriter(outf)
211         defer gzipw.Close()
212
213         var maskfifo string // filename of mask fifo if we're running bedtools, otherwise ""
214
215         var wg sync.WaitGroup
216         errs := make(chan error, 2)
217         if cmd.mask {
218                 if cmd.genomeFile == "" {
219                         return errors.New("cannot apply mask without -genome argument")
220                 }
221                 var regions bytes.Buffer
222                 bedargs := []string{"python", "-", "--gvcf_type", "gatk", infile}
223                 bed := exec.Command(bedargs[0], bedargs[1:]...)
224                 bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
225                 bed.Stdout = &regions
226                 bed.Stderr = cmd.stderr
227                 log.Printf("running %v", bed.Args)
228                 err := bed.Run()
229                 log.Printf("exited %v", bed.Args)
230                 if err != nil {
231                         return fmt.Errorf("gvcf_regions: %s", err)
232                 }
233
234                 // The bcftools --mask argument needs to end in ".bed"
235                 // in order to be parsed as a BED file, so we need to
236                 // use a named pipe instead of stdin.
237                 tempdir, err := ioutil.TempDir("", "")
238                 if err != nil {
239                         return fmt.Errorf("TempDir: %s", err)
240                 }
241                 defer os.RemoveAll(tempdir)
242                 maskfifo = filepath.Join(tempdir, "fifo.bed")
243                 err = syscall.Mkfifo(maskfifo, 0600)
244                 if err != nil {
245                         return fmt.Errorf("mkfifo: %s", err)
246                 }
247                 wg.Add(1)
248                 go func() {
249                         defer wg.Done()
250
251                         maskfifow, err := os.OpenFile(maskfifo, os.O_WRONLY, 0)
252                         if err != nil {
253                                 errs <- err
254                                 return
255                         }
256                         defer maskfifow.Close()
257
258                         bedcompargs := []string{"bedtools", "complement", "-i", "/dev/stdin", "-g", cmd.genomeFile}
259                         bedcompargs = maybeInDocker(bedcompargs, []string{cmd.genomeFile})
260                         bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...)
261                         bedcomp.Stdin = &regions
262                         bedcomp.Stdout = maskfifow
263                         bedcomp.Stderr = cmd.stderr
264                         log.Printf("running %v", bedcomp.Args)
265                         err = bedcomp.Run()
266                         log.Printf("exited %v", bedcomp.Args)
267                         if err != nil {
268                                 errs <- fmt.Errorf("bedtools complement: %s", err)
269                                 return
270                         }
271                         err = maskfifow.Close()
272                         if err != nil {
273                                 errs <- err
274                                 return
275                         }
276                 }()
277         }
278
279         wg.Add(1)
280         go func() {
281                 defer wg.Done()
282                 consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)}
283                 if maskfifo != "" {
284                         consargs = append(consargs, "--mask", maskfifo)
285                 }
286                 consargs = append(consargs, infile)
287                 indexsuffix := ".tbi"
288                 if _, err := os.Stat(infile + ".csi"); err == nil {
289                         indexsuffix = ".csi"
290                 }
291                 mounts := []string{infile, infile + indexsuffix, cmd.refFile}
292                 if maskfifo != "" {
293                         mounts = append(mounts, maskfifo)
294                 }
295                 consargs = maybeInDocker(consargs, mounts)
296
297                 consensus := exec.Command(consargs[0], consargs[1:]...)
298                 consensus.Stderr = os.Stderr
299                 consensus.Stdout = gzipw
300                 consensus.Stderr = cmd.stderr
301                 log.Printf("running %v", consensus.Args)
302                 err = consensus.Run()
303                 if err != nil {
304                         errs <- fmt.Errorf("bcftools consensus: %s", err)
305                         return
306                 }
307                 err = gzipw.Close()
308                 if err != nil {
309                         errs <- err
310                         return
311                 }
312                 errs <- outf.Close()
313         }()
314
315         go func() {
316                 wg.Wait()
317                 close(errs)
318         }()
319
320         for err := range errs {
321                 if err != nil {
322                         wg.Wait()
323                         return err
324                 }
325         }
326         return nil
327 }
328
329 func (cmd *vcf2fasta) loadRegionsPy() error {
330         if strings.HasPrefix(cmd.gvcfRegionsPy, "http") {
331                 resp, err := http.Get(cmd.gvcfRegionsPy)
332                 if err != nil {
333                         return err
334                 }
335                 if resp.StatusCode != http.StatusOK {
336                         return fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode)
337                 }
338                 buf, err := ioutil.ReadAll(resp.Body)
339                 if err != nil {
340                         return fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err)
341                 }
342                 cmd.gvcfRegionsPyData = buf
343                 return nil
344         } else {
345                 buf, err := ioutil.ReadFile(cmd.gvcfRegionsPy)
346                 if err != nil {
347                         return err
348                 }
349                 cmd.gvcfRegionsPyData = buf
350                 return nil
351         }
352 }