Connect stdin/stdout in docker containers.
[lightning.git] / vcf2fasta.go
1 package main
2
3 import (
4         "bytes"
5         "compress/gzip"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "os/exec"
15         "path/filepath"
16         "runtime"
17         "strings"
18         "sync"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         log "github.com/sirupsen/logrus"
22 )
23
24 type vcf2fasta struct {
25         refFile           string
26         genomeFile        string
27         mask              bool
28         gvcfRegionsPy     string
29         gvcfRegionsPyData []byte
30         projectUUID       string
31         outputDir         string
32         runLocal          bool
33         vcpus             int
34
35         stderr io.Writer
36 }
37
38 func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
39         var err error
40         defer func() {
41                 if err != nil {
42                         fmt.Fprintf(stderr, "%s\n", err)
43                 }
44         }()
45         flags := flag.NewFlagSet("", flag.ContinueOnError)
46         flags.SetOutput(stderr)
47         flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
48         flags.StringVar(&cmd.genomeFile, "genome", "", "reference genome `file`")
49         flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)")
50         flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py")
51         flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
52         flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
53         flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
54         flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
55         priority := flags.Int("priority", 500, "container request priority")
56         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
57         err = flags.Parse(args)
58         if err == flag.ErrHelp {
59                 err = nil
60                 return 0
61         } else if err != nil {
62                 return 2
63         } else if cmd.refFile == "" {
64                 err = errors.New("reference data (-ref) not specified")
65                 return 2
66         } else if flags.NArg() == 0 {
67                 flags.Usage()
68                 return 2
69         }
70         cmd.stderr = stderr
71
72         if *pprof != "" {
73                 go func() {
74                         log.Println(http.ListenAndServe(*pprof, nil))
75                 }()
76         }
77
78         if cmd.mask {
79                 err = cmd.loadRegionsPy()
80                 if err != nil {
81                         return 1
82                 }
83         }
84
85         if !cmd.runLocal {
86                 if cmd.outputDir != "" {
87                         err = errors.New("cannot specify output dir in non-local mode")
88                         return 2
89                 }
90                 if cmd.vcpus < 1 {
91                         var infiles []string
92                         infiles, err = listInputFiles(flags.Args())
93                         if err != nil {
94                                 return 1
95                         }
96                         if cmd.vcpus = len(infiles) * 2; cmd.vcpus > 32 {
97                                 cmd.vcpus = 32
98                         }
99                 }
100                 runner := arvadosContainerRunner{
101                         Name:        "lightning vcf2fasta",
102                         Client:      arvados.NewClientFromEnv(),
103                         ProjectUUID: cmd.projectUUID,
104                         RAM:         2<<30 + int64(cmd.vcpus)<<28,
105                         VCPUs:       cmd.vcpus,
106                         Priority:    *priority,
107                         Mounts: map[string]map[string]interface{}{
108                                 "/gvcf_regions.py": map[string]interface{}{
109                                         "kind":    "text",
110                                         "content": string(cmd.gvcfRegionsPyData),
111                                 },
112                         },
113                 }
114                 err = runner.TranslatePaths(&cmd.refFile)
115                 if err != nil {
116                         return 1
117                 }
118                 inputs := flags.Args()
119                 for i := range inputs {
120                         err = runner.TranslatePaths(&inputs[i])
121                         if err != nil {
122                                 return 1
123                         }
124                 }
125                 runner.Args = append([]string{"vcf2fasta", "-local=true", "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask), "-gvcf-regions.py", "/gvcf_regions.py", "-output-dir", "/mnt/output"}, inputs...)
126                 var output string
127                 output, err = runner.Run()
128                 if err != nil {
129                         return 1
130                 }
131                 fmt.Fprintln(stdout, output)
132                 return 0
133         }
134
135         infiles, err := listInputFiles(flags.Args())
136         if err != nil {
137                 return 1
138         }
139
140         type job struct {
141                 vcffile string
142                 phase   int
143         }
144         todo := make(chan job)
145         go func() {
146                 for _, infile := range infiles {
147                         for phase := 1; phase <= 2; phase++ {
148                                 todo <- job{vcffile: infile, phase: phase}
149                         }
150                 }
151                 close(todo)
152         }()
153
154         done := make(chan error, runtime.NumCPU()*2)
155         var wg sync.WaitGroup
156         for i := 0; i < runtime.NumCPU(); i++ {
157                 wg.Add(1)
158                 go func() {
159                         defer wg.Done()
160                         for job := range todo {
161                                 if len(done) > 0 {
162                                         // a different worker encountered an error
163                                         return
164                                 }
165                                 err := cmd.vcf2fasta(job.vcffile, job.phase)
166                                 if err != nil {
167                                         done <- fmt.Errorf("%s phase %d: %s", job.vcffile, job.phase, err)
168                                         return
169                                 }
170                         }
171                 }()
172         }
173         go func() {
174                 wg.Wait()
175                 close(done)
176         }()
177
178         err = <-done
179         if err != nil {
180                 return 1
181         }
182         return 0
183 }
184
185 func maybeInDocker(args, mountfiles []string) []string {
186         if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err != nil || len(out) == 0 {
187                 return args
188         }
189         dockerrun := []string{
190                 "docker", "run", "--rm", "-i",
191                 "--log-driver=none",
192         }
193         for _, f := range mountfiles {
194                 dockerrun = append(dockerrun, "--volume="+f+":"+f+":ro")
195         }
196         dockerrun = append(dockerrun, "lightning-runtime")
197         dockerrun = append(dockerrun, args...)
198         return dockerrun
199 }
200
201 func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
202         _, basename := filepath.Split(infile)
203         outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
204         outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY, 0777)
205         if err != nil {
206                 return fmt.Errorf("error opening output file: %s", err)
207         }
208         defer outf.Close()
209         gzipw := gzip.NewWriter(outf)
210         defer gzipw.Close()
211
212         var maskfile *os.File // reading side of a pipe if we're running bedtools, otherwise nil
213
214         var wg sync.WaitGroup
215         errs := make(chan error, 3)
216         if cmd.mask {
217                 if cmd.genomeFile == "" {
218                         return errors.New("cannot apply mask without -genome argument")
219                 }
220                 bedr, bedw, err := os.Pipe()
221                 if err != nil {
222                         return err
223                 }
224                 bedargs := []string{"python", "-", "--gvcf_type", "gatk", infile}
225                 bed := exec.Command(bedargs[0], bedargs[1:]...)
226                 bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
227                 bed.Stdout = bedw
228                 bed.Stderr = cmd.stderr
229                 wg.Add(1)
230                 go func() {
231                         defer wg.Done()
232                         defer bedw.Close()
233                         log.Printf("running %v", bed.Args)
234                         err := bed.Run()
235                         if err != nil {
236                                 errs <- fmt.Errorf("gvcf_regions: %s", err)
237                         }
238                 }()
239
240                 bedcompr, bedcompw, err := os.Pipe()
241                 if err != nil {
242                         return err
243                 }
244                 bedcompargs := []string{"bedtools", "complement", "-i", "/dev/stdin", "-g", cmd.genomeFile}
245                 bedcompargs = maybeInDocker(bedcompargs, []string{cmd.genomeFile})
246                 bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...)
247                 bedcomp.Stdin = bedr
248                 bedcomp.Stdout = bedcompw
249                 bedcomp.Stderr = cmd.stderr
250                 wg.Add(1)
251                 go func() {
252                         defer wg.Done()
253                         defer bedcompw.Close()
254                         log.Printf("running %v", bedcomp.Args)
255                         err := bedcomp.Run()
256                         if err != nil {
257                                 errs <- fmt.Errorf("bedtools complement: %s", err)
258                         }
259                 }()
260                 maskfile = bedcompr
261         }
262
263         wg.Add(1)
264         go func() {
265                 defer wg.Done()
266                 consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)}
267                 if maskfile != nil {
268                         consargs = append(consargs, "--mask", "/dev/stdin")
269                 }
270                 consargs = append(consargs, infile)
271                 indexsuffix := ".tbi"
272                 if _, err := os.Stat(infile + ".csi"); err == nil {
273                         indexsuffix = ".csi"
274                 }
275                 consargs = maybeInDocker(consargs, []string{infile, infile + indexsuffix, cmd.refFile})
276
277                 consensus := exec.Command(consargs[0], consargs[1:]...)
278                 consensus.Stderr = os.Stderr
279                 consensus.Stdout = gzipw
280                 consensus.Stderr = cmd.stderr
281                 if maskfile != nil {
282                         consensus.Stdin = maskfile
283                 }
284                 log.Printf("running %v", consensus.Args)
285                 err = consensus.Run()
286                 if err != nil {
287                         errs <- fmt.Errorf("bcftools consensus: %s", err)
288                         return
289                 }
290                 err = gzipw.Close()
291                 if err != nil {
292                         errs <- err
293                         return
294                 }
295                 errs <- outf.Close()
296         }()
297
298         go func() {
299                 wg.Wait()
300                 close(errs)
301         }()
302
303         for err := range errs {
304                 if err != nil {
305                         wg.Wait()
306                         return err
307                 }
308         }
309         return nil
310 }
311
312 func (cmd *vcf2fasta) loadRegionsPy() error {
313         if strings.HasPrefix(cmd.gvcfRegionsPy, "http") {
314                 resp, err := http.Get(cmd.gvcfRegionsPy)
315                 if err != nil {
316                         return err
317                 }
318                 if resp.StatusCode != http.StatusOK {
319                         return fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode)
320                 }
321                 buf, err := ioutil.ReadAll(resp.Body)
322                 if err != nil {
323                         return fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err)
324                 }
325                 cmd.gvcfRegionsPyData = buf
326                 return nil
327         } else {
328                 buf, err := ioutil.ReadFile(cmd.gvcfRegionsPy)
329                 if err != nil {
330                         return err
331                 }
332                 cmd.gvcfRegionsPyData = buf
333                 return nil
334         }
335 }