Don't try to pass fd 3 to docker.
[lightning.git] / vcf2fasta.go
1 package main
2
3 import (
4         "bytes"
5         "compress/gzip"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "os/exec"
15         "path/filepath"
16         "runtime"
17         "strings"
18         "sync"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         log "github.com/sirupsen/logrus"
22 )
23
24 type vcf2fasta struct {
25         refFile           string
26         mask              bool
27         gvcfRegionsPy     string
28         gvcfRegionsPyData []byte
29         projectUUID       string
30         outputDir         string
31         runLocal          bool
32         vcpus             int
33 }
34
35 func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
36         var err error
37         defer func() {
38                 if err != nil {
39                         fmt.Fprintf(stderr, "%s\n", err)
40                 }
41         }()
42         flags := flag.NewFlagSet("", flag.ContinueOnError)
43         flags.SetOutput(stderr)
44         flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
45         flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)")
46         flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py")
47         flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
48         flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
49         flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
50         flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
51         priority := flags.Int("priority", 500, "container request priority")
52         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
53         err = flags.Parse(args)
54         if err == flag.ErrHelp {
55                 err = nil
56                 return 0
57         } else if err != nil {
58                 return 2
59         } else if cmd.refFile == "" {
60                 err = errors.New("reference data (-ref) not specified")
61                 return 2
62         } else if flags.NArg() == 0 {
63                 flags.Usage()
64                 return 2
65         }
66
67         if *pprof != "" {
68                 go func() {
69                         log.Println(http.ListenAndServe(*pprof, nil))
70                 }()
71         }
72
73         if cmd.mask {
74                 err = cmd.loadRegionsPy()
75                 if err != nil {
76                         return 1
77                 }
78         }
79
80         if !cmd.runLocal {
81                 if cmd.outputDir != "" {
82                         err = errors.New("cannot specify output dir in non-local mode")
83                         return 2
84                 }
85                 if cmd.vcpus < 1 {
86                         var infiles []string
87                         infiles, err = listInputFiles(flags.Args())
88                         if err != nil {
89                                 return 1
90                         }
91                         if cmd.vcpus = len(infiles) * 2; cmd.vcpus > 32 {
92                                 cmd.vcpus = 32
93                         }
94                 }
95                 runner := arvadosContainerRunner{
96                         Name:        "lightning vcf2fasta",
97                         Client:      arvados.NewClientFromEnv(),
98                         ProjectUUID: cmd.projectUUID,
99                         RAM:         2<<30 + int64(cmd.vcpus)<<28,
100                         VCPUs:       cmd.vcpus,
101                         Priority:    *priority,
102                         Mounts: map[string]map[string]interface{}{
103                                 "/gvcf_regions.py": map[string]interface{}{
104                                         "kind":    "text",
105                                         "content": string(cmd.gvcfRegionsPyData),
106                                 },
107                         },
108                 }
109                 err = runner.TranslatePaths(&cmd.refFile)
110                 if err != nil {
111                         return 1
112                 }
113                 inputs := flags.Args()
114                 for i := range inputs {
115                         err = runner.TranslatePaths(&inputs[i])
116                         if err != nil {
117                                 return 1
118                         }
119                 }
120                 runner.Args = append([]string{"vcf2fasta", "-local=true", "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask), "-gvcf-regions.py", "/gvcf_regions.py", "-output-dir", "/mnt/output"}, inputs...)
121                 var output string
122                 output, err = runner.Run()
123                 if err != nil {
124                         return 1
125                 }
126                 fmt.Fprintln(stdout, output)
127                 return 0
128         }
129
130         infiles, err := listInputFiles(flags.Args())
131         if err != nil {
132                 return 1
133         }
134
135         type job struct {
136                 vcffile string
137                 phase   int
138         }
139         todo := make(chan job)
140         go func() {
141                 for _, infile := range infiles {
142                         for phase := 1; phase <= 2; phase++ {
143                                 todo <- job{vcffile: infile, phase: phase}
144                         }
145                 }
146                 close(todo)
147         }()
148
149         done := make(chan error, runtime.NumCPU()*2)
150         var wg sync.WaitGroup
151         for i := 0; i < runtime.NumCPU(); i++ {
152                 wg.Add(1)
153                 go func() {
154                         defer wg.Done()
155                         for job := range todo {
156                                 if len(done) > 0 {
157                                         // a different worker encountered an error
158                                         return
159                                 }
160                                 err := cmd.vcf2fasta(job.vcffile, job.phase)
161                                 if err != nil {
162                                         done <- fmt.Errorf("%s phase %d: %s", job.vcffile, job.phase, err)
163                                         return
164                                 }
165                         }
166                 }()
167         }
168         go func() {
169                 wg.Wait()
170                 close(done)
171         }()
172
173         err = <-done
174         if err != nil {
175                 return 1
176         }
177         return 0
178 }
179
180 func maybeInDocker(args, mountfiles []string) []string {
181         if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err != nil || len(out) == 0 {
182                 return args
183         }
184         dockerrun := []string{
185                 "docker", "run", "--rm",
186                 "--log-driver=none",
187         }
188         for _, f := range mountfiles {
189                 dockerrun = append(dockerrun, "--volume="+f+":"+f+":ro")
190         }
191         dockerrun = append(dockerrun, "lightning-runtime")
192         dockerrun = append(dockerrun, args...)
193         return dockerrun
194 }
195
196 func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
197         _, basename := filepath.Split(infile)
198         outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
199         outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0777)
200         if err != nil {
201                 return fmt.Errorf("error opening output file: %s", err)
202         }
203         defer outf.Close()
204         gzipw := gzip.NewWriter(outf)
205         defer gzipw.Close()
206
207         var maskfile *os.File // reading side of a pipe if we're running bedtools, otherwise nil
208
209         var wg sync.WaitGroup
210         errs := make(chan error, 3)
211         if cmd.mask {
212                 bedr, bedw, err := os.Pipe()
213                 if err != nil {
214                         return err
215                 }
216                 bedargs := []string{"python", "-", "--gvcf_type", "gatk", infile}
217                 bed := exec.Command(bedargs[0], bedargs[1:]...)
218                 bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
219                 bed.Stdout = bedw
220                 wg.Add(1)
221                 go func() {
222                         defer wg.Done()
223                         log.Printf("running %v", bed.Args)
224                         errs <- bed.Run()
225                 }()
226
227                 bedcompr, bedcompw, err := os.Pipe()
228                 if err != nil {
229                         return err
230                 }
231                 bedcompargs := []string{"bedtools", "complement", "-i", "/dev/stdin", "-g", cmd.refFile}
232                 bedcompargs = maybeInDocker(bedcompargs, []string{cmd.refFile, infile})
233                 bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...)
234                 bedcomp.Stdin = bedr
235                 bedcomp.Stdout = bedcompw
236                 wg.Add(1)
237                 go func() {
238                         defer wg.Done()
239                         log.Printf("running %v", bedcomp.Args)
240                         errs <- bedcomp.Run()
241                 }()
242                 maskfile = bedcompr
243         }
244
245         wg.Add(1)
246         go func() {
247                 defer wg.Done()
248                 consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)}
249                 if maskfile != nil {
250                         consargs = append(consargs, "--mask", "/dev/stdin")
251                 }
252                 consargs = append(consargs, infile)
253                 indexsuffix := ".tbi"
254                 if _, err := os.Stat(infile + ".csi"); err == nil {
255                         indexsuffix = ".csi"
256                 }
257                 consargs = maybeInDocker(consargs, []string{infile, infile + indexsuffix, cmd.refFile})
258
259                 consensus := exec.Command(consargs[0], consargs[1:]...)
260                 consensus.Stderr = os.Stderr
261                 consensus.Stdout = gzipw
262                 if maskfile != nil {
263                         consensus.Stdin = maskfile
264                 }
265                 log.Printf("running %v", consensus.Args)
266                 err = consensus.Run()
267                 if err != nil {
268                         errs <- err
269                         return
270                 }
271                 err = gzipw.Close()
272                 if err != nil {
273                         errs <- err
274                         return
275                 }
276                 errs <- outf.Close()
277         }()
278
279         go func() {
280                 wg.Wait()
281                 close(errs)
282         }()
283
284         for err := range errs {
285                 if err != nil {
286                         return err
287                 }
288         }
289         return nil
290 }
291
292 func (cmd *vcf2fasta) loadRegionsPy() error {
293         if strings.HasPrefix(cmd.gvcfRegionsPy, "http") {
294                 resp, err := http.Get(cmd.gvcfRegionsPy)
295                 if err != nil {
296                         return err
297                 }
298                 if resp.StatusCode != http.StatusOK {
299                         return fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode)
300                 }
301                 buf, err := ioutil.ReadAll(resp.Body)
302                 if err != nil {
303                         return fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err)
304                 }
305                 cmd.gvcfRegionsPyData = buf
306                 return nil
307         } else {
308                 buf, err := ioutil.ReadFile(cmd.gvcfRegionsPy)
309                 if err != nil {
310                         return err
311                 }
312                 cmd.gvcfRegionsPyData = buf
313                 return nil
314         }
315 }