9f6f085cd854bed1ddd4e6db14a03918b31eff4f
[lightning.git] / vcf2fasta.go
1 package main
2
3 import (
4         "bytes"
5         "compress/gzip"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "os/exec"
15         "path/filepath"
16         "runtime"
17         "sync"
18
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         log "github.com/sirupsen/logrus"
21 )
22
23 type vcf2fasta struct {
24         refFile           string
25         mask              bool
26         gvcfRegionsPy     string
27         gvcfRegionsPyData []byte
28         projectUUID       string
29         outputDir         string
30         runLocal          bool
31         vcpus             int
32 }
33
34 func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
35         var err error
36         defer func() {
37                 if err != nil {
38                         fmt.Fprintf(stderr, "%s\n", err)
39                 }
40         }()
41         flags := flag.NewFlagSet("", flag.ContinueOnError)
42         flags.SetOutput(stderr)
43         flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
44         flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)")
45         flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py")
46         flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
47         flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
48         flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
49         flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
50         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
51         err = flags.Parse(args)
52         if err == flag.ErrHelp {
53                 err = nil
54                 return 0
55         } else if err != nil {
56                 return 2
57         } else if cmd.refFile == "" {
58                 err = errors.New("reference data (-ref) not specified")
59                 return 2
60         } else if flags.NArg() == 0 {
61                 flags.Usage()
62                 return 2
63         }
64
65         if *pprof != "" {
66                 go func() {
67                         log.Println(http.ListenAndServe(*pprof, nil))
68                 }()
69         }
70
71         if !cmd.runLocal {
72                 if cmd.outputDir != "" {
73                         err = errors.New("cannot specify output dir in non-local mode")
74                         return 2
75                 }
76                 if cmd.vcpus < 1 {
77                         var infiles []string
78                         infiles, err = listInputFiles(flags.Args())
79                         if err != nil {
80                                 return 1
81                         }
82                         if cmd.vcpus = len(infiles) * 2; cmd.vcpus > 32 {
83                                 cmd.vcpus = 32
84                         }
85                 }
86                 runner := arvadosContainerRunner{
87                         Name:        "lightning vcf2fasta",
88                         Client:      arvados.NewClientFromEnv(),
89                         ProjectUUID: cmd.projectUUID,
90                         RAM:         2<<30 + int64(cmd.vcpus)<<28,
91                         VCPUs:       cmd.vcpus,
92                 }
93                 err = runner.TranslatePaths(&cmd.refFile)
94                 if err != nil {
95                         return 1
96                 }
97                 inputs := flags.Args()
98                 for i := range inputs {
99                         err = runner.TranslatePaths(&inputs[i])
100                         if err != nil {
101                                 return 1
102                         }
103                 }
104                 runner.Args = append([]string{"vcf2fasta", "-local=true", "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask), "-gvcf-regions.py", cmd.gvcfRegionsPy, "-output-dir", "/mnt/output"}, inputs...)
105                 var output string
106                 output, err = runner.Run()
107                 if err != nil {
108                         return 1
109                 }
110                 fmt.Fprintln(stdout, output)
111                 return 0
112         }
113
114         if cmd.mask {
115                 var resp *http.Response
116                 resp, err = http.Get(cmd.gvcfRegionsPy)
117                 if err != nil {
118                         return 1
119                 }
120                 if resp.StatusCode != http.StatusOK {
121                         err = fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode)
122                         return 1
123                 }
124                 cmd.gvcfRegionsPyData, err = ioutil.ReadAll(resp.Body)
125                 if err != nil {
126                         err = fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err)
127                         return 1
128                 }
129         }
130
131         infiles, err := listInputFiles(flags.Args())
132         if err != nil {
133                 return 1
134         }
135
136         type job struct {
137                 vcffile string
138                 phase   int
139         }
140         todo := make(chan job)
141         go func() {
142                 for _, infile := range infiles {
143                         for phase := 1; phase <= 2; phase++ {
144                                 todo <- job{vcffile: infile, phase: phase}
145                         }
146                 }
147                 close(todo)
148         }()
149
150         done := make(chan error, runtime.NumCPU()*2)
151         var wg sync.WaitGroup
152         for i := 0; i < runtime.NumCPU(); i++ {
153                 wg.Add(1)
154                 go func() {
155                         defer wg.Done()
156                         for job := range todo {
157                                 if len(done) > 0 {
158                                         // a different worker encountered an error
159                                         return
160                                 }
161                                 err := cmd.vcf2fasta(job.vcffile, job.phase)
162                                 if err != nil {
163                                         done <- fmt.Errorf("%s phase %d: %s", job.vcffile, job.phase, err)
164                                         return
165                                 }
166                         }
167                 }()
168         }
169         go func() {
170                 wg.Wait()
171                 close(done)
172         }()
173
174         err = <-done
175         if err != nil {
176                 return 1
177         }
178         return 0
179 }
180
181 func maybeInDocker(args, mountfiles []string) []string {
182         if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err != nil || len(out) == 0 {
183                 return args
184         }
185         dockerrun := []string{
186                 "docker", "run", "--rm",
187                 "--log-driver=none",
188         }
189         for _, f := range mountfiles {
190                 dockerrun = append(dockerrun, "--volume="+f+":"+f+":ro")
191         }
192         dockerrun = append(dockerrun, "lightning-runtime")
193         dockerrun = append(dockerrun, args...)
194         return dockerrun
195 }
196
197 func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
198         _, basename := filepath.Split(infile)
199         outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
200         outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0777)
201         if err != nil {
202                 return fmt.Errorf("error opening output file: %s", err)
203         }
204         defer outf.Close()
205         gzipw := gzip.NewWriter(outf)
206         defer gzipw.Close()
207
208         var maskfile *os.File // reading side of a pipe if we're running bedtools, otherwise nil
209
210         var wg sync.WaitGroup
211         errs := make(chan error, 3)
212         if cmd.mask {
213                 bedr, bedw, err := os.Pipe()
214                 if err != nil {
215                         return err
216                 }
217                 bedargs := []string{"python", "-", "--gvcf_type", "gatk", infile}
218                 bed := exec.Command(bedargs[0], bedargs[1:]...)
219                 bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
220                 bed.Stdout = bedw
221                 wg.Add(1)
222                 go func() {
223                         defer wg.Done()
224                         errs <- bed.Run()
225                 }()
226
227                 bedcompr, bedcompw, err := os.Pipe()
228                 if err != nil {
229                         return err
230                 }
231                 bedcompargs := []string{"bedtools", "complement", "-i", "/dev/stdin", "-g", cmd.refFile}
232                 bedcompargs = maybeInDocker(bedcompargs, []string{cmd.refFile, infile})
233                 bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...)
234                 bedcomp.Stdin = bedr
235                 bedcomp.Stdout = bedcompw
236                 wg.Add(1)
237                 go func() {
238                         defer wg.Done()
239                         errs <- bedcomp.Run()
240                 }()
241                 maskfile = bedcompr
242         }
243
244         consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)}
245         if maskfile != nil {
246                 consargs = append(consargs, "--mask", "/dev/fd/3")
247         }
248         consargs = append(consargs, infile)
249         indexsuffix := ".tbi"
250         if _, err := os.Stat(infile + ".csi"); err == nil {
251                 indexsuffix = ".csi"
252         }
253         consargs = maybeInDocker(consargs, []string{infile, infile + indexsuffix, cmd.refFile})
254
255         consensus := exec.Command(consargs[0], consargs[1:]...)
256         consensus.Stderr = os.Stderr
257         consensus.Stdout = gzipw
258         if maskfile != nil {
259                 consensus.ExtraFiles = []*os.File{maskfile}
260         }
261         err = consensus.Start()
262         if err != nil {
263                 return err
264         }
265         err = consensus.Wait()
266         if err != nil {
267                 return err
268         }
269         err = gzipw.Close()
270         if err != nil {
271                 return err
272         }
273         err = outf.Close()
274         if err != nil {
275                 return err
276         }
277         return nil
278 }