Show stderr from children.
[lightning.git] / vcf2fasta.go
1 package main
2
3 import (
4         "bytes"
5         "compress/gzip"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "os/exec"
15         "path/filepath"
16         "runtime"
17         "strings"
18         "sync"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         log "github.com/sirupsen/logrus"
22 )
23
24 type vcf2fasta struct {
25         refFile           string
26         mask              bool
27         gvcfRegionsPy     string
28         gvcfRegionsPyData []byte
29         projectUUID       string
30         outputDir         string
31         runLocal          bool
32         vcpus             int
33
34         stderr io.Writer
35 }
36
37 func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
38         var err error
39         defer func() {
40                 if err != nil {
41                         fmt.Fprintf(stderr, "%s\n", err)
42                 }
43         }()
44         flags := flag.NewFlagSet("", flag.ContinueOnError)
45         flags.SetOutput(stderr)
46         flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
47         flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)")
48         flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py")
49         flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
50         flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
51         flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
52         flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
53         priority := flags.Int("priority", 500, "container request priority")
54         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
55         err = flags.Parse(args)
56         if err == flag.ErrHelp {
57                 err = nil
58                 return 0
59         } else if err != nil {
60                 return 2
61         } else if cmd.refFile == "" {
62                 err = errors.New("reference data (-ref) not specified")
63                 return 2
64         } else if flags.NArg() == 0 {
65                 flags.Usage()
66                 return 2
67         }
68         cmd.stderr = stderr
69
70         if *pprof != "" {
71                 go func() {
72                         log.Println(http.ListenAndServe(*pprof, nil))
73                 }()
74         }
75
76         if cmd.mask {
77                 err = cmd.loadRegionsPy()
78                 if err != nil {
79                         return 1
80                 }
81         }
82
83         if !cmd.runLocal {
84                 if cmd.outputDir != "" {
85                         err = errors.New("cannot specify output dir in non-local mode")
86                         return 2
87                 }
88                 if cmd.vcpus < 1 {
89                         var infiles []string
90                         infiles, err = listInputFiles(flags.Args())
91                         if err != nil {
92                                 return 1
93                         }
94                         if cmd.vcpus = len(infiles) * 2; cmd.vcpus > 32 {
95                                 cmd.vcpus = 32
96                         }
97                 }
98                 runner := arvadosContainerRunner{
99                         Name:        "lightning vcf2fasta",
100                         Client:      arvados.NewClientFromEnv(),
101                         ProjectUUID: cmd.projectUUID,
102                         RAM:         2<<30 + int64(cmd.vcpus)<<28,
103                         VCPUs:       cmd.vcpus,
104                         Priority:    *priority,
105                         Mounts: map[string]map[string]interface{}{
106                                 "/gvcf_regions.py": map[string]interface{}{
107                                         "kind":    "text",
108                                         "content": string(cmd.gvcfRegionsPyData),
109                                 },
110                         },
111                 }
112                 err = runner.TranslatePaths(&cmd.refFile)
113                 if err != nil {
114                         return 1
115                 }
116                 inputs := flags.Args()
117                 for i := range inputs {
118                         err = runner.TranslatePaths(&inputs[i])
119                         if err != nil {
120                                 return 1
121                         }
122                 }
123                 runner.Args = append([]string{"vcf2fasta", "-local=true", "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask), "-gvcf-regions.py", "/gvcf_regions.py", "-output-dir", "/mnt/output"}, inputs...)
124                 var output string
125                 output, err = runner.Run()
126                 if err != nil {
127                         return 1
128                 }
129                 fmt.Fprintln(stdout, output)
130                 return 0
131         }
132
133         infiles, err := listInputFiles(flags.Args())
134         if err != nil {
135                 return 1
136         }
137
138         type job struct {
139                 vcffile string
140                 phase   int
141         }
142         todo := make(chan job)
143         go func() {
144                 for _, infile := range infiles {
145                         for phase := 1; phase <= 2; phase++ {
146                                 todo <- job{vcffile: infile, phase: phase}
147                         }
148                 }
149                 close(todo)
150         }()
151
152         done := make(chan error, runtime.NumCPU()*2)
153         var wg sync.WaitGroup
154         for i := 0; i < runtime.NumCPU(); i++ {
155                 wg.Add(1)
156                 go func() {
157                         defer wg.Done()
158                         for job := range todo {
159                                 if len(done) > 0 {
160                                         // a different worker encountered an error
161                                         return
162                                 }
163                                 err := cmd.vcf2fasta(job.vcffile, job.phase)
164                                 if err != nil {
165                                         done <- fmt.Errorf("%s phase %d: %s", job.vcffile, job.phase, err)
166                                         return
167                                 }
168                         }
169                 }()
170         }
171         go func() {
172                 wg.Wait()
173                 close(done)
174         }()
175
176         err = <-done
177         if err != nil {
178                 return 1
179         }
180         return 0
181 }
182
183 func maybeInDocker(args, mountfiles []string) []string {
184         if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err != nil || len(out) == 0 {
185                 return args
186         }
187         dockerrun := []string{
188                 "docker", "run", "--rm",
189                 "--log-driver=none",
190         }
191         for _, f := range mountfiles {
192                 dockerrun = append(dockerrun, "--volume="+f+":"+f+":ro")
193         }
194         dockerrun = append(dockerrun, "lightning-runtime")
195         dockerrun = append(dockerrun, args...)
196         return dockerrun
197 }
198
199 func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
200         _, basename := filepath.Split(infile)
201         outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
202         outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0777)
203         if err != nil {
204                 return fmt.Errorf("error opening output file: %s", err)
205         }
206         defer outf.Close()
207         gzipw := gzip.NewWriter(outf)
208         defer gzipw.Close()
209
210         var maskfile *os.File // reading side of a pipe if we're running bedtools, otherwise nil
211
212         var wg sync.WaitGroup
213         errs := make(chan error, 3)
214         if cmd.mask {
215                 bedr, bedw, err := os.Pipe()
216                 if err != nil {
217                         return err
218                 }
219                 bedargs := []string{"python", "-", "--gvcf_type", "gatk", infile}
220                 bed := exec.Command(bedargs[0], bedargs[1:]...)
221                 bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
222                 bed.Stdout = bedw
223                 bed.Stderr = cmd.stderr
224                 wg.Add(1)
225                 go func() {
226                         defer wg.Done()
227                         log.Printf("running %v", bed.Args)
228                         errs <- bed.Run()
229                 }()
230
231                 bedcompr, bedcompw, err := os.Pipe()
232                 if err != nil {
233                         return err
234                 }
235                 bedcompargs := []string{"bedtools", "complement", "-i", "/dev/stdin", "-g", cmd.refFile}
236                 bedcompargs = maybeInDocker(bedcompargs, []string{cmd.refFile, infile})
237                 bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...)
238                 bedcomp.Stdin = bedr
239                 bedcomp.Stdout = bedcompw
240                 bedcomp.Stderr = cmd.stderr
241                 wg.Add(1)
242                 go func() {
243                         defer wg.Done()
244                         log.Printf("running %v", bedcomp.Args)
245                         errs <- bedcomp.Run()
246                 }()
247                 maskfile = bedcompr
248         }
249
250         wg.Add(1)
251         go func() {
252                 defer wg.Done()
253                 consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)}
254                 if maskfile != nil {
255                         consargs = append(consargs, "--mask", "/dev/stdin")
256                 }
257                 consargs = append(consargs, infile)
258                 indexsuffix := ".tbi"
259                 if _, err := os.Stat(infile + ".csi"); err == nil {
260                         indexsuffix = ".csi"
261                 }
262                 consargs = maybeInDocker(consargs, []string{infile, infile + indexsuffix, cmd.refFile})
263
264                 consensus := exec.Command(consargs[0], consargs[1:]...)
265                 consensus.Stderr = os.Stderr
266                 consensus.Stdout = gzipw
267                 consensus.Stderr = cmd.stderr
268                 if maskfile != nil {
269                         consensus.Stdin = maskfile
270                 }
271                 log.Printf("running %v", consensus.Args)
272                 err = consensus.Run()
273                 if err != nil {
274                         errs <- err
275                         return
276                 }
277                 err = gzipw.Close()
278                 if err != nil {
279                         errs <- err
280                         return
281                 }
282                 errs <- outf.Close()
283         }()
284
285         go func() {
286                 wg.Wait()
287                 close(errs)
288         }()
289
290         for err := range errs {
291                 if err != nil {
292                         return err
293                 }
294         }
295         return nil
296 }
297
298 func (cmd *vcf2fasta) loadRegionsPy() error {
299         if strings.HasPrefix(cmd.gvcfRegionsPy, "http") {
300                 resp, err := http.Get(cmd.gvcfRegionsPy)
301                 if err != nil {
302                         return err
303                 }
304                 if resp.StatusCode != http.StatusOK {
305                         return fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode)
306                 }
307                 buf, err := ioutil.ReadAll(resp.Body)
308                 if err != nil {
309                         return fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err)
310                 }
311                 cmd.gvcfRegionsPyData = buf
312                 return nil
313         } else {
314                 buf, err := ioutil.ReadFile(cmd.gvcfRegionsPy)
315                 if err != nil {
316                         return err
317                 }
318                 cmd.gvcfRegionsPyData = buf
319                 return nil
320         }
321 }