Sort genome file same way as vcf regions.
[lightning.git] / vcf2fasta.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "compress/gzip"
7         "errors"
8         "flag"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "net/http"
13         _ "net/http/pprof"
14         "os"
15         "os/exec"
16         "path/filepath"
17         "runtime"
18         "strings"
19         "sync"
20         "syscall"
21
22         "git.arvados.org/arvados.git/sdk/go/arvados"
23         log "github.com/sirupsen/logrus"
24 )
25
26 type vcf2fasta struct {
27         refFile           string
28         genomeFile        string
29         mask              bool
30         gvcfRegionsPy     string
31         gvcfRegionsPyData []byte
32         projectUUID       string
33         outputDir         string
34         runLocal          bool
35         vcpus             int
36
37         stderr io.Writer
38 }
39
40 func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
41         var err error
42         defer func() {
43                 if err != nil {
44                         fmt.Fprintf(stderr, "%s\n", err)
45                 }
46         }()
47         flags := flag.NewFlagSet("", flag.ContinueOnError)
48         flags.SetOutput(stderr)
49         flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
50         flags.StringVar(&cmd.genomeFile, "genome", "", "reference genome `file`")
51         flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)")
52         flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py")
53         flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
54         flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
55         flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
56         flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
57         priority := flags.Int("priority", 500, "container request priority")
58         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
59         err = flags.Parse(args)
60         if err == flag.ErrHelp {
61                 err = nil
62                 return 0
63         } else if err != nil {
64                 return 2
65         } else if cmd.refFile == "" {
66                 err = errors.New("reference data (-ref) not specified")
67                 return 2
68         } else if flags.NArg() == 0 {
69                 flags.Usage()
70                 return 2
71         }
72         cmd.stderr = stderr
73
74         if *pprof != "" {
75                 go func() {
76                         log.Println(http.ListenAndServe(*pprof, nil))
77                 }()
78         }
79
80         if cmd.mask {
81                 err = cmd.loadRegionsPy()
82                 if err != nil {
83                         return 1
84                 }
85         }
86
87         if !cmd.runLocal {
88                 if cmd.outputDir != "" {
89                         err = errors.New("cannot specify output dir in non-local mode")
90                         return 2
91                 }
92                 if cmd.vcpus < 1 {
93                         var infiles []string
94                         infiles, err = listInputFiles(flags.Args())
95                         if err != nil {
96                                 return 1
97                         }
98                         if cmd.vcpus = len(infiles) * 2; cmd.vcpus > 32 {
99                                 cmd.vcpus = 32
100                         }
101                 }
102                 runner := arvadosContainerRunner{
103                         Name:        "lightning vcf2fasta",
104                         Client:      arvados.NewClientFromEnv(),
105                         ProjectUUID: cmd.projectUUID,
106                         RAM:         2<<30 + int64(cmd.vcpus)<<28,
107                         VCPUs:       cmd.vcpus,
108                         Priority:    *priority,
109                         Mounts: map[string]map[string]interface{}{
110                                 "/gvcf_regions.py": map[string]interface{}{
111                                         "kind":    "text",
112                                         "content": string(cmd.gvcfRegionsPyData),
113                                 },
114                         },
115                 }
116                 err = runner.TranslatePaths(&cmd.refFile, &cmd.genomeFile)
117                 if err != nil {
118                         return 1
119                 }
120                 inputs := flags.Args()
121                 for i := range inputs {
122                         err = runner.TranslatePaths(&inputs[i])
123                         if err != nil {
124                                 return 1
125                         }
126                 }
127                 runner.Args = append([]string{"vcf2fasta",
128                         "-local=true",
129                         "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask),
130                         "-genome", cmd.genomeFile,
131                         "-gvcf-regions.py", "/gvcf_regions.py",
132                         "-output-dir", "/mnt/output"}, inputs...)
133                 var output string
134                 output, err = runner.Run()
135                 if err != nil {
136                         return 1
137                 }
138                 fmt.Fprintln(stdout, output)
139                 return 0
140         }
141
142         infiles, err := listInputFiles(flags.Args())
143         if err != nil {
144                 return 1
145         }
146
147         type job struct {
148                 vcffile string
149                 phase   int
150         }
151         todo := make(chan job)
152         go func() {
153                 for _, infile := range infiles {
154                         for phase := 1; phase <= 2; phase++ {
155                                 todo <- job{vcffile: infile, phase: phase}
156                         }
157                 }
158                 close(todo)
159         }()
160
161         done := make(chan error, runtime.NumCPU()*2)
162         var wg sync.WaitGroup
163         for i := 0; i < runtime.NumCPU(); i++ {
164                 wg.Add(1)
165                 go func() {
166                         defer wg.Done()
167                         for job := range todo {
168                                 if len(done) > 0 {
169                                         // a different worker encountered an error
170                                         return
171                                 }
172                                 err := cmd.vcf2fasta(job.vcffile, job.phase)
173                                 if err != nil {
174                                         done <- fmt.Errorf("%s phase %d: %s", job.vcffile, job.phase, err)
175                                         return
176                                 }
177                         }
178                 }()
179         }
180         go func() {
181                 wg.Wait()
182                 close(done)
183         }()
184
185         err = <-done
186         if err != nil {
187                 return 1
188         }
189         return 0
190 }
191
192 func maybeInDocker(args, mountfiles []string) []string {
193         if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err != nil || len(out) == 0 {
194                 return args
195         }
196         dockerrun := []string{
197                 "docker", "run", "--rm", "-i",
198                 "--log-driver=none",
199         }
200         for _, f := range mountfiles {
201                 dockerrun = append(dockerrun, "--volume="+f+":"+f+":ro")
202         }
203         dockerrun = append(dockerrun, "lightning-runtime")
204         dockerrun = append(dockerrun, args...)
205         return dockerrun
206 }
207
208 func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
209         _, basename := filepath.Split(infile)
210         outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
211         outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY, 0777)
212         if err != nil {
213                 return fmt.Errorf("error opening output file: %s", err)
214         }
215         defer outf.Close()
216         gzipw := gzip.NewWriter(outf)
217         defer gzipw.Close()
218
219         var maskfifo string // filename of mask fifo if we're running bedtools, otherwise ""
220
221         var wg sync.WaitGroup
222         errs := make(chan error, 2)
223         if cmd.mask {
224                 if cmd.genomeFile == "" {
225                         // TODO: get chromosome sizes from VCF header, ##contig=<ID=chr10,length=133797422>
226                         return errors.New("cannot apply mask without -genome argument")
227                 }
228
229                 var regions bytes.Buffer
230                 bedargs := []string{"python2", "-", "--gvcf_type", "gatk", infile}
231                 bed := exec.Command(bedargs[0], bedargs[1:]...)
232                 bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
233                 bed.Stdout = &regions
234                 bed.Stderr = cmd.stderr
235                 log.Printf("running %v", bed.Args)
236                 err := bed.Run()
237                 log.Printf("exited %v", bed.Args)
238                 if err != nil {
239                         return fmt.Errorf("gvcf_regions: %s", err)
240                 }
241
242                 // "bedtools complement" expects the chromosome sizes
243                 // ("genome file") to appear in the same order as the
244                 // chromosomes in the input vcf, so we need to sort
245                 // them.
246                 genomeFile, err := os.Open(cmd.genomeFile)
247                 if err != nil {
248                         return fmt.Errorf("error opening genome file: %s", err)
249                 }
250                 chrSize := map[string]int{}
251                 scanner := bufio.NewScanner(genomeFile)
252                 for scanner.Scan() {
253                         var chr string
254                         var size int
255                         _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size)
256                         if err != nil {
257                                 return fmt.Errorf("error parsing genome file: %s", err)
258                         }
259                         chrSize[chr] = size
260                 }
261                 if err = scanner.Err(); err != nil {
262                         return fmt.Errorf("error scanning genome file: %s", err)
263                 }
264                 scanner = bufio.NewScanner(bytes.NewBuffer(append([]byte(nil), regions.Bytes()...)))
265                 var sortedGenomeFile bytes.Buffer
266                 for scanner.Scan() {
267                         var chr string
268                         var size int
269                         _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size)
270                         if err != nil {
271                                 return fmt.Errorf("error parsing gvcf_regions output: %s", err)
272                         }
273                         if size, ok := chrSize[chr]; ok {
274                                 fmt.Fprintf(&sortedGenomeFile, "%s\t%d\n", chr, size)
275                                 delete(chrSize, chr)
276                         }
277                 }
278
279                 // The bcftools --mask argument needs to end in ".bed"
280                 // in order to be parsed as a BED file, so we need to
281                 // use a named pipe instead of stdin.
282                 tempdir, err := ioutil.TempDir("", "")
283                 if err != nil {
284                         return fmt.Errorf("TempDir: %s", err)
285                 }
286                 defer os.RemoveAll(tempdir)
287                 maskfifo = filepath.Join(tempdir, "fifo.bed")
288                 err = syscall.Mkfifo(maskfifo, 0600)
289                 if err != nil {
290                         return fmt.Errorf("mkfifo: %s", err)
291                 }
292
293                 // bedtools complement can't seem to read from a pipe
294                 // reliably -- "Error: line number 1 of file
295                 // /dev/stdin has 1 fields, but 3 were expected." --
296                 // so we stage to a temp file.
297                 regionsFile := filepath.Join(tempdir, "gvcf_regions.bed")
298                 err = ioutil.WriteFile(regionsFile, regions.Bytes(), 0644)
299                 if err != nil {
300                         return err
301                 }
302
303                 wg.Add(1)
304                 go func() {
305                         defer wg.Done()
306
307                         maskfifow, err := os.OpenFile(maskfifo, os.O_WRONLY, 0)
308                         if err != nil {
309                                 errs <- err
310                                 return
311                         }
312                         defer maskfifow.Close()
313
314                         bedcompargs := []string{"bedtools", "complement", "-i", regionsFile, "-g", "/dev/stdin"}
315                         bedcompargs = maybeInDocker(bedcompargs, []string{cmd.genomeFile})
316                         bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...)
317                         bedcomp.Stdin = &sortedGenomeFile
318                         bedcomp.Stdout = maskfifow
319                         bedcomp.Stderr = cmd.stderr
320                         log.Printf("running %v", bedcomp.Args)
321                         err = bedcomp.Run()
322                         log.Printf("exited %v", bedcomp.Args)
323                         if err != nil {
324                                 errs <- fmt.Errorf("bedtools complement: %s", err)
325                                 return
326                         }
327                         err = maskfifow.Close()
328                         if err != nil {
329                                 errs <- err
330                                 return
331                         }
332                 }()
333         }
334
335         wg.Add(1)
336         go func() {
337                 defer wg.Done()
338                 consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)}
339                 if maskfifo != "" {
340                         consargs = append(consargs, "--mask", maskfifo)
341                 }
342                 consargs = append(consargs, infile)
343                 indexsuffix := ".tbi"
344                 if _, err := os.Stat(infile + ".csi"); err == nil {
345                         indexsuffix = ".csi"
346                 }
347                 mounts := []string{infile, infile + indexsuffix, cmd.refFile}
348                 if maskfifo != "" {
349                         mounts = append(mounts, maskfifo)
350                 }
351                 consargs = maybeInDocker(consargs, mounts)
352
353                 consensus := exec.Command(consargs[0], consargs[1:]...)
354                 consensus.Stderr = os.Stderr
355                 consensus.Stdout = gzipw
356                 consensus.Stderr = cmd.stderr
357                 log.Printf("running %v", consensus.Args)
358                 err = consensus.Run()
359                 if err != nil {
360                         errs <- fmt.Errorf("bcftools consensus: %s", err)
361                         return
362                 }
363                 err = gzipw.Close()
364                 if err != nil {
365                         errs <- err
366                         return
367                 }
368                 errs <- outf.Close()
369         }()
370
371         go func() {
372                 wg.Wait()
373                 close(errs)
374         }()
375
376         for err := range errs {
377                 if err != nil {
378                         wg.Wait()
379                         return err
380                 }
381         }
382         return nil
383 }
384
385 func (cmd *vcf2fasta) loadRegionsPy() error {
386         if strings.HasPrefix(cmd.gvcfRegionsPy, "http") {
387                 resp, err := http.Get(cmd.gvcfRegionsPy)
388                 if err != nil {
389                         return err
390                 }
391                 if resp.StatusCode != http.StatusOK {
392                         return fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode)
393                 }
394                 buf, err := ioutil.ReadAll(resp.Body)
395                 if err != nil {
396                         return fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err)
397                 }
398                 cmd.gvcfRegionsPyData = buf
399                 return nil
400         } else {
401                 buf, err := ioutil.ReadFile(cmd.gvcfRegionsPy)
402                 if err != nil {
403                         return err
404                 }
405                 cmd.gvcfRegionsPyData = buf
406                 return nil
407         }
408 }