Propagate genome arg.
[lightning.git] / vcf2fasta.go
1 package main
2
3 import (
4         "bytes"
5         "compress/gzip"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "os/exec"
15         "path/filepath"
16         "runtime"
17         "strings"
18         "sync"
19         "syscall"
20
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         log "github.com/sirupsen/logrus"
23 )
24
25 type vcf2fasta struct {
26         refFile           string
27         genomeFile        string
28         mask              bool
29         gvcfRegionsPy     string
30         gvcfRegionsPyData []byte
31         projectUUID       string
32         outputDir         string
33         runLocal          bool
34         vcpus             int
35
36         stderr io.Writer
37 }
38
39 func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
40         var err error
41         defer func() {
42                 if err != nil {
43                         fmt.Fprintf(stderr, "%s\n", err)
44                 }
45         }()
46         flags := flag.NewFlagSet("", flag.ContinueOnError)
47         flags.SetOutput(stderr)
48         flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
49         flags.StringVar(&cmd.genomeFile, "genome", "", "reference genome `file`")
50         flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)")
51         flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py")
52         flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
53         flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
54         flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
55         flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
56         priority := flags.Int("priority", 500, "container request priority")
57         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
58         err = flags.Parse(args)
59         if err == flag.ErrHelp {
60                 err = nil
61                 return 0
62         } else if err != nil {
63                 return 2
64         } else if cmd.refFile == "" {
65                 err = errors.New("reference data (-ref) not specified")
66                 return 2
67         } else if flags.NArg() == 0 {
68                 flags.Usage()
69                 return 2
70         }
71         cmd.stderr = stderr
72
73         if *pprof != "" {
74                 go func() {
75                         log.Println(http.ListenAndServe(*pprof, nil))
76                 }()
77         }
78
79         if cmd.mask {
80                 err = cmd.loadRegionsPy()
81                 if err != nil {
82                         return 1
83                 }
84         }
85
86         if !cmd.runLocal {
87                 if cmd.outputDir != "" {
88                         err = errors.New("cannot specify output dir in non-local mode")
89                         return 2
90                 }
91                 if cmd.vcpus < 1 {
92                         var infiles []string
93                         infiles, err = listInputFiles(flags.Args())
94                         if err != nil {
95                                 return 1
96                         }
97                         if cmd.vcpus = len(infiles) * 2; cmd.vcpus > 32 {
98                                 cmd.vcpus = 32
99                         }
100                 }
101                 runner := arvadosContainerRunner{
102                         Name:        "lightning vcf2fasta",
103                         Client:      arvados.NewClientFromEnv(),
104                         ProjectUUID: cmd.projectUUID,
105                         RAM:         2<<30 + int64(cmd.vcpus)<<28,
106                         VCPUs:       cmd.vcpus,
107                         Priority:    *priority,
108                         Mounts: map[string]map[string]interface{}{
109                                 "/gvcf_regions.py": map[string]interface{}{
110                                         "kind":    "text",
111                                         "content": string(cmd.gvcfRegionsPyData),
112                                 },
113                         },
114                 }
115                 err = runner.TranslatePaths(&cmd.refFile, &cmd.genomeFile)
116                 if err != nil {
117                         return 1
118                 }
119                 inputs := flags.Args()
120                 for i := range inputs {
121                         err = runner.TranslatePaths(&inputs[i])
122                         if err != nil {
123                                 return 1
124                         }
125                 }
126                 runner.Args = append([]string{"vcf2fasta",
127                         "-local=true",
128                         "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask),
129                         "-genome", cmd.genomeFile,
130                         "-gvcf-regions.py", "/gvcf_regions.py",
131                         "-output-dir", "/mnt/output"}, inputs...)
132                 var output string
133                 output, err = runner.Run()
134                 if err != nil {
135                         return 1
136                 }
137                 fmt.Fprintln(stdout, output)
138                 return 0
139         }
140
141         infiles, err := listInputFiles(flags.Args())
142         if err != nil {
143                 return 1
144         }
145
146         type job struct {
147                 vcffile string
148                 phase   int
149         }
150         todo := make(chan job)
151         go func() {
152                 for _, infile := range infiles {
153                         for phase := 1; phase <= 2; phase++ {
154                                 todo <- job{vcffile: infile, phase: phase}
155                         }
156                 }
157                 close(todo)
158         }()
159
160         done := make(chan error, runtime.NumCPU()*2)
161         var wg sync.WaitGroup
162         for i := 0; i < runtime.NumCPU(); i++ {
163                 wg.Add(1)
164                 go func() {
165                         defer wg.Done()
166                         for job := range todo {
167                                 if len(done) > 0 {
168                                         // a different worker encountered an error
169                                         return
170                                 }
171                                 err := cmd.vcf2fasta(job.vcffile, job.phase)
172                                 if err != nil {
173                                         done <- fmt.Errorf("%s phase %d: %s", job.vcffile, job.phase, err)
174                                         return
175                                 }
176                         }
177                 }()
178         }
179         go func() {
180                 wg.Wait()
181                 close(done)
182         }()
183
184         err = <-done
185         if err != nil {
186                 return 1
187         }
188         return 0
189 }
190
191 func maybeInDocker(args, mountfiles []string) []string {
192         if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err != nil || len(out) == 0 {
193                 return args
194         }
195         dockerrun := []string{
196                 "docker", "run", "--rm", "-i",
197                 "--log-driver=none",
198         }
199         for _, f := range mountfiles {
200                 dockerrun = append(dockerrun, "--volume="+f+":"+f+":ro")
201         }
202         dockerrun = append(dockerrun, "lightning-runtime")
203         dockerrun = append(dockerrun, args...)
204         return dockerrun
205 }
206
207 func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
208         _, basename := filepath.Split(infile)
209         outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
210         outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY, 0777)
211         if err != nil {
212                 return fmt.Errorf("error opening output file: %s", err)
213         }
214         defer outf.Close()
215         gzipw := gzip.NewWriter(outf)
216         defer gzipw.Close()
217
218         var maskfifo string // filename of mask fifo if we're running bedtools, otherwise ""
219
220         var wg sync.WaitGroup
221         errs := make(chan error, 2)
222         if cmd.mask {
223                 if cmd.genomeFile == "" {
224                         return errors.New("cannot apply mask without -genome argument")
225                 }
226                 var regions bytes.Buffer
227                 bedargs := []string{"python", "-", "--gvcf_type", "gatk", infile}
228                 bed := exec.Command(bedargs[0], bedargs[1:]...)
229                 bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
230                 bed.Stdout = &regions
231                 bed.Stderr = cmd.stderr
232                 log.Printf("running %v", bed.Args)
233                 err := bed.Run()
234                 log.Printf("exited %v", bed.Args)
235                 if err != nil {
236                         return fmt.Errorf("gvcf_regions: %s", err)
237                 }
238
239                 // The bcftools --mask argument needs to end in ".bed"
240                 // in order to be parsed as a BED file, so we need to
241                 // use a named pipe instead of stdin.
242                 tempdir, err := ioutil.TempDir("", "")
243                 if err != nil {
244                         return fmt.Errorf("TempDir: %s", err)
245                 }
246                 defer os.RemoveAll(tempdir)
247                 maskfifo = filepath.Join(tempdir, "fifo.bed")
248                 err = syscall.Mkfifo(maskfifo, 0600)
249                 if err != nil {
250                         return fmt.Errorf("mkfifo: %s", err)
251                 }
252                 wg.Add(1)
253                 go func() {
254                         defer wg.Done()
255
256                         maskfifow, err := os.OpenFile(maskfifo, os.O_WRONLY, 0)
257                         if err != nil {
258                                 errs <- err
259                                 return
260                         }
261                         defer maskfifow.Close()
262
263                         bedcompargs := []string{"bedtools", "complement", "-i", "/dev/stdin", "-g", cmd.genomeFile}
264                         bedcompargs = maybeInDocker(bedcompargs, []string{cmd.genomeFile})
265                         bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...)
266                         bedcomp.Stdin = &regions
267                         bedcomp.Stdout = maskfifow
268                         bedcomp.Stderr = cmd.stderr
269                         log.Printf("running %v", bedcomp.Args)
270                         err = bedcomp.Run()
271                         log.Printf("exited %v", bedcomp.Args)
272                         if err != nil {
273                                 errs <- fmt.Errorf("bedtools complement: %s", err)
274                                 return
275                         }
276                         err = maskfifow.Close()
277                         if err != nil {
278                                 errs <- err
279                                 return
280                         }
281                 }()
282         }
283
284         wg.Add(1)
285         go func() {
286                 defer wg.Done()
287                 consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)}
288                 if maskfifo != "" {
289                         consargs = append(consargs, "--mask", maskfifo)
290                 }
291                 consargs = append(consargs, infile)
292                 indexsuffix := ".tbi"
293                 if _, err := os.Stat(infile + ".csi"); err == nil {
294                         indexsuffix = ".csi"
295                 }
296                 mounts := []string{infile, infile + indexsuffix, cmd.refFile}
297                 if maskfifo != "" {
298                         mounts = append(mounts, maskfifo)
299                 }
300                 consargs = maybeInDocker(consargs, mounts)
301
302                 consensus := exec.Command(consargs[0], consargs[1:]...)
303                 consensus.Stderr = os.Stderr
304                 consensus.Stdout = gzipw
305                 consensus.Stderr = cmd.stderr
306                 log.Printf("running %v", consensus.Args)
307                 err = consensus.Run()
308                 if err != nil {
309                         errs <- fmt.Errorf("bcftools consensus: %s", err)
310                         return
311                 }
312                 err = gzipw.Close()
313                 if err != nil {
314                         errs <- err
315                         return
316                 }
317                 errs <- outf.Close()
318         }()
319
320         go func() {
321                 wg.Wait()
322                 close(errs)
323         }()
324
325         for err := range errs {
326                 if err != nil {
327                         wg.Wait()
328                         return err
329                 }
330         }
331         return nil
332 }
333
334 func (cmd *vcf2fasta) loadRegionsPy() error {
335         if strings.HasPrefix(cmd.gvcfRegionsPy, "http") {
336                 resp, err := http.Get(cmd.gvcfRegionsPy)
337                 if err != nil {
338                         return err
339                 }
340                 if resp.StatusCode != http.StatusOK {
341                         return fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode)
342                 }
343                 buf, err := ioutil.ReadAll(resp.Body)
344                 if err != nil {
345                         return fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err)
346                 }
347                 cmd.gvcfRegionsPyData = buf
348                 return nil
349         } else {
350                 buf, err := ioutil.ReadFile(cmd.gvcfRegionsPy)
351                 if err != nil {
352                         return err
353                 }
354                 cmd.gvcfRegionsPyData = buf
355                 return nil
356         }
357 }