Add vcf2fasta command.
[lightning.git] / vcf2fasta.go
1 package main
2
3 import (
4         "compress/gzip"
5         "errors"
6         "flag"
7         "fmt"
8         "io"
9         "net/http"
10         _ "net/http/pprof"
11         "os"
12         "os/exec"
13         "path/filepath"
14         "runtime"
15         "sync"
16
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         log "github.com/sirupsen/logrus"
19 )
20
21 type vcf2fasta struct {
22         refFile     string
23         projectUUID string
24         outputDir   string
25         runLocal    bool
26         vcpus       int
27 }
28
29 func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
30         var err error
31         defer func() {
32                 if err != nil {
33                         fmt.Fprintf(stderr, "%s\n", err)
34                 }
35         }()
36         flags := flag.NewFlagSet("", flag.ContinueOnError)
37         flags.SetOutput(stderr)
38         flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
39         flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
40         flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
41         flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
42         flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
43         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
44         err = flags.Parse(args)
45         if err == flag.ErrHelp {
46                 err = nil
47                 return 0
48         } else if err != nil {
49                 return 2
50         } else if cmd.refFile == "" {
51                 err = errors.New("reference data (-ref) not specified")
52                 return 2
53         } else if flags.NArg() == 0 {
54                 flags.Usage()
55                 return 2
56         }
57
58         if *pprof != "" {
59                 go func() {
60                         log.Println(http.ListenAndServe(*pprof, nil))
61                 }()
62         }
63
64         if !cmd.runLocal {
65                 if cmd.outputDir != "" {
66                         err = errors.New("cannot specify output dir in non-local mode")
67                         return 2
68                 }
69                 if cmd.vcpus < 1 {
70                         var infiles []string
71                         infiles, err = listInputFiles(flags.Args())
72                         if err != nil {
73                                 return 1
74                         }
75                         if cmd.vcpus = len(infiles) * 2; cmd.vcpus > 32 {
76                                 cmd.vcpus = 32
77                         }
78                 }
79                 runner := arvadosContainerRunner{
80                         Name:        "lightning vcf2fasta",
81                         Client:      arvados.NewClientFromEnv(),
82                         ProjectUUID: cmd.projectUUID,
83                         RAM:         2<<30 + int64(cmd.vcpus)<<28,
84                         VCPUs:       cmd.vcpus,
85                 }
86                 err = runner.TranslatePaths(&cmd.refFile)
87                 if err != nil {
88                         return 1
89                 }
90                 inputs := flags.Args()
91                 for i := range inputs {
92                         err = runner.TranslatePaths(&inputs[i])
93                         if err != nil {
94                                 return 1
95                         }
96                 }
97                 runner.Args = append([]string{"vcf2fasta", "-local=true", "-ref", cmd.refFile, "-output-dir", "/mnt/output"}, inputs...)
98                 var output string
99                 output, err = runner.Run()
100                 if err != nil {
101                         return 1
102                 }
103                 fmt.Fprintln(stdout, output)
104                 return 0
105         }
106
107         infiles, err := listInputFiles(flags.Args())
108         if err != nil {
109                 return 1
110         }
111
112         type job struct {
113                 vcffile string
114                 phase   int
115         }
116         todo := make(chan job)
117         go func() {
118                 for _, infile := range infiles {
119                         for phase := 1; phase <= 2; phase++ {
120                                 todo <- job{vcffile: infile, phase: phase}
121                         }
122                 }
123                 close(todo)
124         }()
125
126         done := make(chan error, runtime.NumCPU()*2)
127         var wg sync.WaitGroup
128         for i := 0; i < runtime.NumCPU(); i++ {
129                 wg.Add(1)
130                 go func() {
131                         defer wg.Done()
132                         for job := range todo {
133                                 if len(done) > 0 {
134                                         // a different worker encountered an error
135                                         return
136                                 }
137                                 err := cmd.vcf2fasta(job.vcffile, job.phase)
138                                 if err != nil {
139                                         done <- fmt.Errorf("%s phase %d: %s", job.vcffile, job.phase, err)
140                                         return
141                                 }
142                         }
143                 }()
144         }
145         go func() {
146                 wg.Wait()
147                 close(done)
148         }()
149
150         err = <-done
151         if err != nil {
152                 return 1
153         }
154         return 0
155 }
156
157 func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
158         args := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase), infile}
159         indexsuffix := ".tbi"
160         if _, err := os.Stat(infile + ".csi"); err == nil {
161                 indexsuffix = ".csi"
162         }
163         if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err == nil && len(out) > 0 {
164                 args = append([]string{
165                         "docker", "run", "--rm",
166                         "--log-driver=none",
167                         "--volume=" + infile + ":" + infile + ":ro",
168                         "--volume=" + infile + indexsuffix + ":" + infile + indexsuffix + ":ro",
169                         "--volume=" + cmd.refFile + ":" + cmd.refFile + ":ro",
170                         "lightning-runtime",
171                 }, args...)
172         }
173
174         _, basename := filepath.Split(infile)
175         outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
176         outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0777)
177         if err != nil {
178                 return fmt.Errorf("error opening output file: %s", err)
179         }
180         defer outf.Close()
181         gzipw := gzip.NewWriter(outf)
182         defer gzipw.Close()
183
184         consensus := exec.Command(args[0], args[1:]...)
185         consensus.Stderr = os.Stderr
186         consensus.Stdout = gzipw
187         err = consensus.Start()
188         if err != nil {
189                 return err
190         }
191         err = consensus.Wait()
192         if err != nil {
193                 return err
194         }
195         err = gzipw.Close()
196         if err != nil {
197                 return err
198         }
199         err = outf.Close()
200         if err != nil {
201                 return err
202         }
203         return nil
204 }