Improve import and vcf2fasta performance.
[lightning.git] / import.go
1 package main
2
3 import (
4         "bufio"
5         "compress/gzip"
6         "context"
7         "encoding/gob"
8         "encoding/json"
9         "errors"
10         "flag"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "net/http"
15         _ "net/http/pprof"
16         "os"
17         "os/exec"
18         "path/filepath"
19         "regexp"
20         "runtime"
21         "sort"
22         "strings"
23         "sync"
24         "sync/atomic"
25         "time"
26
27         "github.com/klauspost/pgzip"
28         log "github.com/sirupsen/logrus"
29 )
30
31 type importer struct {
32         tagLibraryFile      string
33         refFile             string
34         outputFile          string
35         projectUUID         string
36         loglevel            string
37         priority            int
38         runLocal            bool
39         skipOOO             bool
40         outputTiles         bool
41         saveIncompleteTiles bool
42         outputStats         string
43         matchChromosome     *regexp.Regexp
44         encoder             *gob.Encoder
45         batchArgs
46 }
47
48 func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
49         var err error
50         defer func() {
51                 if err != nil {
52                         fmt.Fprintf(stderr, "%s\n", err)
53                 }
54         }()
55         flags := flag.NewFlagSet("", flag.ContinueOnError)
56         flags.SetOutput(stderr)
57         flags.StringVar(&cmd.tagLibraryFile, "tag-library", "", "tag library fasta `file`")
58         flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
59         flags.StringVar(&cmd.outputFile, "o", "-", "output `file`")
60         flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for output data")
61         flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
62         flags.BoolVar(&cmd.skipOOO, "skip-ooo", false, "skip out-of-order tags")
63         flags.BoolVar(&cmd.outputTiles, "output-tiles", false, "include tile variant sequences in output file")
64         flags.BoolVar(&cmd.saveIncompleteTiles, "save-incomplete-tiles", false, "treat tiles with no-calls as regular tiles")
65         flags.StringVar(&cmd.outputStats, "output-stats", "", "output stats to `file` (json)")
66         cmd.batchArgs.Flags(flags)
67         matchChromosome := flags.String("match-chromosome", "^(chr)?([0-9]+|X|Y|MT?)$", "import chromosomes that match the given `regexp`")
68         flags.IntVar(&cmd.priority, "priority", 500, "container request priority")
69         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
70         flags.StringVar(&cmd.loglevel, "loglevel", "info", "logging threshold (trace, debug, info, warn, error, fatal, or panic)")
71         err = flags.Parse(args)
72         if err == flag.ErrHelp {
73                 err = nil
74                 return 0
75         } else if err != nil {
76                 return 2
77         } else if cmd.tagLibraryFile == "" {
78                 fmt.Fprintln(os.Stderr, "cannot import without -tag-library argument")
79                 return 2
80         } else if flags.NArg() == 0 {
81                 flags.Usage()
82                 return 2
83         }
84
85         if *pprof != "" {
86                 go func() {
87                         log.Println(http.ListenAndServe(*pprof, nil))
88                 }()
89         }
90
91         lvl, err := log.ParseLevel(cmd.loglevel)
92         if err != nil {
93                 return 2
94         }
95         log.SetLevel(lvl)
96
97         cmd.matchChromosome, err = regexp.Compile(*matchChromosome)
98         if err != nil {
99                 return 1
100         }
101
102         if !cmd.runLocal {
103                 err = cmd.runBatches(stdout, flags.Args())
104                 if err != nil {
105                         return 1
106                 }
107                 return 0
108         }
109
110         infiles, err := listInputFiles(flags.Args())
111         if err != nil {
112                 return 1
113         }
114         infiles = cmd.batchArgs.Slice(infiles)
115
116         taglib, err := cmd.loadTagLibrary()
117         if err != nil {
118                 return 1
119         }
120
121         var outw, outf io.WriteCloser
122         if cmd.outputFile == "-" {
123                 outw = nopCloser{stdout}
124         } else {
125                 outf, err = os.OpenFile(cmd.outputFile, os.O_CREATE|os.O_WRONLY, 0777)
126                 if err != nil {
127                         return 1
128                 }
129                 defer outf.Close()
130                 if strings.HasSuffix(cmd.outputFile, ".gz") {
131                         outw = pgzip.NewWriter(outf)
132                 } else {
133                         outw = outf
134                 }
135         }
136         bufw := bufio.NewWriterSize(outw, 64*1024*1024)
137         cmd.encoder = gob.NewEncoder(bufw)
138
139         tilelib := &tileLibrary{taglib: taglib, retainNoCalls: cmd.saveIncompleteTiles, skipOOO: cmd.skipOOO}
140         if cmd.outputTiles {
141                 cmd.encoder.Encode(LibraryEntry{TagSet: taglib.Tags()})
142                 tilelib.encoder = cmd.encoder
143         }
144         go func() {
145                 for range time.Tick(10 * time.Minute) {
146                         log.Printf("tilelib.Len() == %d", tilelib.Len())
147                 }
148         }()
149
150         err = cmd.tileInputs(tilelib, infiles)
151         if err != nil {
152                 return 1
153         }
154         err = bufw.Flush()
155         if err != nil {
156                 return 1
157         }
158         err = outw.Close()
159         if err != nil {
160                 return 1
161         }
162         if outf != nil && outf != outw {
163                 err = outf.Close()
164                 if err != nil {
165                         return 1
166                 }
167         }
168         return 0
169 }
170
171 func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error {
172         if cmd.outputFile != "-" {
173                 // Not yet implemented, but this should write
174                 // the collection to an existing collection,
175                 // possibly even an in-place update.
176                 return errors.New("cannot specify output file in container mode: not implemented")
177         }
178         runner := arvadosContainerRunner{
179                 Name:        "lightning import",
180                 Client:      arvadosClientFromEnv,
181                 ProjectUUID: cmd.projectUUID,
182                 APIAccess:   true,
183                 RAM:         300000000000,
184                 VCPUs:       64,
185                 Priority:    cmd.priority,
186                 KeepCache:   1,
187         }
188         err := runner.TranslatePaths(&cmd.tagLibraryFile, &cmd.refFile, &cmd.outputFile)
189         if err != nil {
190                 return err
191         }
192         for i := range inputs {
193                 err = runner.TranslatePaths(&inputs[i])
194                 if err != nil {
195                         return err
196                 }
197         }
198
199         outputs, err := cmd.batchArgs.RunBatches(context.Background(), func(ctx context.Context, batch int) (string, error) {
200                 runner := runner
201                 if cmd.batches > 1 {
202                         runner.Name += fmt.Sprintf(" (batch %d of %d)", batch, cmd.batches)
203                 }
204                 runner.Args = []string{"import",
205                         "-local=true",
206                         "-loglevel=" + cmd.loglevel,
207                         "-pprof=:6061",
208                         fmt.Sprintf("-skip-ooo=%v", cmd.skipOOO),
209                         fmt.Sprintf("-output-tiles=%v", cmd.outputTiles),
210                         fmt.Sprintf("-save-incomplete-tiles=%v", cmd.saveIncompleteTiles),
211                         "-match-chromosome", cmd.matchChromosome.String(),
212                         "-output-stats", "/mnt/output/stats.json",
213                         "-tag-library", cmd.tagLibraryFile,
214                         "-ref", cmd.refFile,
215                         "-o", "/mnt/output/library.gob.gz",
216                 }
217                 runner.Args = append(runner.Args, cmd.batchArgs.Args(batch)...)
218                 runner.Args = append(runner.Args, inputs...)
219                 return runner.RunContext(ctx)
220         })
221         if err != nil {
222                 return err
223         }
224         var outfiles []string
225         for _, o := range outputs {
226                 outfiles = append(outfiles, o+"/library.gob.gz")
227         }
228         fmt.Fprintln(stdout, strings.Join(outfiles, " "))
229         return nil
230 }
231
232 func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, []importStats, error) {
233         var input io.ReadCloser
234         input, err := open(infile)
235         if err != nil {
236                 return nil, nil, err
237         }
238         defer input.Close()
239         input = ioutil.NopCloser(bufio.NewReaderSize(input, 8*1024*1024))
240         if strings.HasSuffix(infile, ".gz") {
241                 input, err = pgzip.NewReader(input)
242                 if err != nil {
243                         return nil, nil, err
244                 }
245                 defer input.Close()
246         }
247         return tilelib.TileFasta(infile, input, cmd.matchChromosome)
248 }
249
250 func (cmd *importer) loadTagLibrary() (*tagLibrary, error) {
251         log.Printf("tag library %s load starting", cmd.tagLibraryFile)
252         f, err := open(cmd.tagLibraryFile)
253         if err != nil {
254                 return nil, err
255         }
256         defer f.Close()
257         rdr := ioutil.NopCloser(bufio.NewReaderSize(f, 64*1024*1024))
258         if strings.HasSuffix(cmd.tagLibraryFile, ".gz") {
259                 rdr, err = gzip.NewReader(rdr)
260                 if err != nil {
261                         return nil, fmt.Errorf("%s: gzip: %s", cmd.tagLibraryFile, err)
262                 }
263                 defer rdr.Close()
264         }
265         var taglib tagLibrary
266         err = taglib.Load(rdr)
267         if err != nil {
268                 return nil, err
269         }
270         if taglib.Len() < 1 {
271                 return nil, fmt.Errorf("cannot tile: tag library is empty")
272         }
273         log.Printf("tag library %s load done", cmd.tagLibraryFile)
274         return &taglib, nil
275 }
276
277 var (
278         vcfFilenameRe    = regexp.MustCompile(`\.vcf(\.gz)?$`)
279         fasta1FilenameRe = regexp.MustCompile(`\.1\.fa(sta)?(\.gz)?$`)
280         fasta2FilenameRe = regexp.MustCompile(`\.2\.fa(sta)?(\.gz)?$`)
281         fastaFilenameRe  = regexp.MustCompile(`\.fa(sta)?(\.gz)?$`)
282 )
283
284 func listInputFiles(paths []string) (files []string, err error) {
285         for _, path := range paths {
286                 if fi, err := os.Stat(path); err != nil {
287                         return nil, fmt.Errorf("%s: stat failed: %s", path, err)
288                 } else if !fi.IsDir() {
289                         if !fasta2FilenameRe.MatchString(path) {
290                                 files = append(files, path)
291                         }
292                         continue
293                 }
294                 d, err := os.Open(path)
295                 if err != nil {
296                         return nil, fmt.Errorf("%s: open failed: %s", path, err)
297                 }
298                 defer d.Close()
299                 names, err := d.Readdirnames(0)
300                 if err != nil {
301                         return nil, fmt.Errorf("%s: readdir failed: %s", path, err)
302                 }
303                 sort.Strings(names)
304                 for _, name := range names {
305                         if vcfFilenameRe.MatchString(name) {
306                                 files = append(files, filepath.Join(path, name))
307                         } else if fastaFilenameRe.MatchString(name) && !fasta2FilenameRe.MatchString(name) {
308                                 files = append(files, filepath.Join(path, name))
309                         }
310                 }
311                 d.Close()
312         }
313         for _, file := range files {
314                 if fastaFilenameRe.MatchString(file) {
315                         continue
316                 } else if vcfFilenameRe.MatchString(file) {
317                         if _, err := os.Stat(file + ".csi"); err == nil {
318                                 continue
319                         } else if _, err = os.Stat(file + ".tbi"); err == nil {
320                                 continue
321                         } else {
322                                 return nil, fmt.Errorf("%s: cannot read without .tbi or .csi index file", file)
323                         }
324                 } else {
325                         return nil, fmt.Errorf("don't know how to handle filename %s", file)
326                 }
327         }
328         return
329 }
330
331 func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error {
332         starttime := time.Now()
333         errs := make(chan error, 1)
334         todo := make(chan func() error, len(infiles)*2)
335         allstats := make([][]importStats, len(infiles)*2)
336         var encodeJobs sync.WaitGroup
337         for idx, infile := range infiles {
338                 idx, infile := idx, infile
339                 var phases sync.WaitGroup
340                 phases.Add(2)
341                 variants := make([][]tileVariantID, 2)
342                 if fasta1FilenameRe.MatchString(infile) {
343                         todo <- func() error {
344                                 defer phases.Done()
345                                 log.Printf("%s starting", infile)
346                                 defer log.Printf("%s done", infile)
347                                 tseqs, stats, err := cmd.tileFasta(tilelib, infile)
348                                 allstats[idx*2] = stats
349                                 var kept, dropped int
350                                 variants[0], kept, dropped = tseqs.Variants()
351                                 log.Printf("%s found %d unique tags plus %d repeats", infile, kept, dropped)
352                                 return err
353                         }
354                         infile2 := fasta1FilenameRe.ReplaceAllString(infile, `.2.fa$1$2`)
355                         todo <- func() error {
356                                 defer phases.Done()
357                                 log.Printf("%s starting", infile2)
358                                 defer log.Printf("%s done", infile2)
359                                 tseqs, stats, err := cmd.tileFasta(tilelib, infile2)
360                                 allstats[idx*2+1] = stats
361                                 var kept, dropped int
362                                 variants[1], kept, dropped = tseqs.Variants()
363                                 log.Printf("%s found %d unique tags plus %d repeats", infile2, kept, dropped)
364
365                                 return err
366                         }
367                 } else if fastaFilenameRe.MatchString(infile) {
368                         todo <- func() error {
369                                 defer phases.Done()
370                                 defer phases.Done()
371                                 log.Printf("%s starting", infile)
372                                 defer log.Printf("%s done", infile)
373                                 tseqs, stats, err := cmd.tileFasta(tilelib, infile)
374                                 allstats[idx*2] = stats
375                                 if err != nil {
376                                         return err
377                                 }
378                                 totlen := 0
379                                 for _, tseq := range tseqs {
380                                         totlen += len(tseq)
381                                 }
382                                 log.Printf("%s tiled %d seqs, total len %d", infile, len(tseqs), totlen)
383                                 return cmd.encoder.Encode(LibraryEntry{
384                                         CompactSequences: []CompactSequence{{Name: infile, TileSequences: tseqs}},
385                                 })
386                         }
387                         // Don't write out a CompactGenomes entry
388                         continue
389                 } else if vcfFilenameRe.MatchString(infile) {
390                         for phase := 0; phase < 2; phase++ {
391                                 phase := phase
392                                 todo <- func() error {
393                                         defer phases.Done()
394                                         log.Printf("%s phase %d starting", infile, phase+1)
395                                         defer log.Printf("%s phase %d done", infile, phase+1)
396                                         tseqs, stats, err := cmd.tileGVCF(tilelib, infile, phase)
397                                         allstats[idx*2] = stats
398                                         var kept, dropped int
399                                         variants[phase], kept, dropped = tseqs.Variants()
400                                         log.Printf("%s phase %d found %d unique tags plus %d repeats", infile, phase+1, kept, dropped)
401                                         return err
402                                 }
403                         }
404                 } else {
405                         panic(fmt.Sprintf("bug: unhandled filename %q", infile))
406                 }
407                 encodeJobs.Add(1)
408                 go func() {
409                         defer encodeJobs.Done()
410                         phases.Wait()
411                         if len(errs) > 0 {
412                                 return
413                         }
414                         err := cmd.encoder.Encode(LibraryEntry{
415                                 CompactGenomes: []CompactGenome{{Name: infile, Variants: flatten(variants)}},
416                         })
417                         if err != nil {
418                                 select {
419                                 case errs <- err:
420                                 default:
421                                 }
422                         }
423                 }()
424         }
425         go close(todo)
426         var tileJobs sync.WaitGroup
427         var running int64
428         for i := 0; i < runtime.GOMAXPROCS(-1)*2; i++ {
429                 tileJobs.Add(1)
430                 atomic.AddInt64(&running, 1)
431                 go func() {
432                         defer tileJobs.Done()
433                         defer atomic.AddInt64(&running, -1)
434                         for fn := range todo {
435                                 if len(errs) > 0 {
436                                         return
437                                 }
438                                 err := fn()
439                                 if err != nil {
440                                         select {
441                                         case errs <- err:
442                                         default:
443                                         }
444                                 }
445                                 remain := len(todo) + int(atomic.LoadInt64(&running)) - 1
446                                 if remain < cap(todo) {
447                                         ttl := time.Now().Sub(starttime) * time.Duration(remain) / time.Duration(cap(todo)-remain)
448                                         eta := time.Now().Add(ttl)
449                                         log.Printf("progress %d/%d, eta %v (%v)", cap(todo)-remain, cap(todo), eta, ttl)
450                                 }
451                         }
452                 }()
453         }
454         tileJobs.Wait()
455         encodeJobs.Wait()
456
457         go close(errs)
458         err := <-errs
459         if err != nil {
460                 return err
461         }
462
463         if cmd.outputStats != "" {
464                 f, err := os.OpenFile(cmd.outputStats, os.O_CREATE|os.O_WRONLY, 0666)
465                 if err != nil {
466                         return err
467                 }
468                 var flatstats []importStats
469                 for _, stats := range allstats {
470                         flatstats = append(flatstats, stats...)
471                 }
472                 err = json.NewEncoder(f).Encode(flatstats)
473                 if err != nil {
474                         return err
475                 }
476         }
477
478         return nil
479 }
480
481 func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, stats []importStats, err error) {
482         if cmd.refFile == "" {
483                 err = errors.New("cannot import vcf: reference data (-ref) not specified")
484                 return
485         }
486         args := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase + 1), infile}
487         indexsuffix := ".tbi"
488         if _, err := os.Stat(infile + ".csi"); err == nil {
489                 indexsuffix = ".csi"
490         }
491         if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err == nil && len(out) > 0 {
492                 args = append([]string{
493                         "docker", "run", "--rm",
494                         "--log-driver=none",
495                         "--volume=" + infile + ":" + infile + ":ro",
496                         "--volume=" + infile + indexsuffix + ":" + infile + indexsuffix + ":ro",
497                         "--volume=" + cmd.refFile + ":" + cmd.refFile + ":ro",
498                         "lightning-runtime",
499                 }, args...)
500         }
501         consensus := exec.Command(args[0], args[1:]...)
502         consensus.Stderr = os.Stderr
503         stdout, err := consensus.StdoutPipe()
504         defer stdout.Close()
505         if err != nil {
506                 return
507         }
508         err = consensus.Start()
509         if err != nil {
510                 return
511         }
512         defer consensus.Wait()
513         tileseq, stats, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout, cmd.matchChromosome)
514         if err != nil {
515                 return
516         }
517         err = stdout.Close()
518         if err != nil {
519                 return
520         }
521         err = consensus.Wait()
522         if err != nil {
523                 err = fmt.Errorf("%s phase %d: bcftools: %s", infile, phase, err)
524                 return
525         }
526         return
527 }
528
529 func flatten(variants [][]tileVariantID) []tileVariantID {
530         ntags := 0
531         for _, v := range variants {
532                 if ntags < len(v) {
533                         ntags = len(v)
534                 }
535         }
536         flat := make([]tileVariantID, ntags*2)
537         for i := 0; i < ntags; i++ {
538                 for hap := 0; hap < 2; hap++ {
539                         if i < len(variants[hap]) {
540                                 flat[i*2+hap] = variants[hap][i]
541                         }
542                 }
543         }
544         return flat
545 }