Merge branch 'main' into 19785-add-cwl
[lightning.git] / import.go
1 // Copyright (C) The Lightning Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lightning
6
7 import (
8         "bufio"
9         "compress/gzip"
10         "context"
11         "encoding/gob"
12         "encoding/json"
13         "errors"
14         "flag"
15         "fmt"
16         "io"
17         "io/ioutil"
18         "net/http"
19         _ "net/http/pprof"
20         "os"
21         "os/exec"
22         "path/filepath"
23         "regexp"
24         "runtime"
25         "sort"
26         "strings"
27         "sync"
28         "sync/atomic"
29         "time"
30
31         "github.com/klauspost/pgzip"
32         log "github.com/sirupsen/logrus"
33 )
34
35 type importer struct {
36         tagLibraryFile      string
37         refFile             string
38         outputFile          string
39         projectUUID         string
40         loglevel            string
41         priority            int
42         runLocal            bool
43         skipOOO             bool
44         outputTiles         bool
45         saveIncompleteTiles bool
46         outputStats         string
47         matchChromosome     *regexp.Regexp
48         encoder             *gob.Encoder
49         retainAfterEncoding bool // keep imported genomes/refseqs in memory after writing to disk
50         batchArgs
51 }
52
53 func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
54         var err error
55         defer func() {
56                 if err != nil {
57                         fmt.Fprintf(stderr, "%s\n", err)
58                 }
59         }()
60         flags := flag.NewFlagSet("", flag.ContinueOnError)
61         flags.SetOutput(stderr)
62         flags.StringVar(&cmd.tagLibraryFile, "tag-library", "", "tag library fasta `file`")
63         flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
64         flags.StringVar(&cmd.outputFile, "o", "-", "output `file`")
65         flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for output data")
66         flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
67         flags.BoolVar(&cmd.skipOOO, "skip-ooo", false, "skip out-of-order tags")
68         flags.BoolVar(&cmd.outputTiles, "output-tiles", false, "include tile variant sequences in output file")
69         flags.BoolVar(&cmd.saveIncompleteTiles, "save-incomplete-tiles", false, "treat tiles with no-calls as regular tiles")
70         flags.StringVar(&cmd.outputStats, "output-stats", "", "output stats to `file` (json)")
71         cmd.batchArgs.Flags(flags)
72         matchChromosome := flags.String("match-chromosome", "^(chr)?([0-9]+|X|Y|MT?)$", "import chromosomes that match the given `regexp`")
73         flags.IntVar(&cmd.priority, "priority", 500, "container request priority")
74         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
75         flags.StringVar(&cmd.loglevel, "loglevel", "info", "logging threshold (trace, debug, info, warn, error, fatal, or panic)")
76         err = flags.Parse(args)
77         if err == flag.ErrHelp {
78                 err = nil
79                 return 0
80         } else if err != nil {
81                 return 2
82         } else if cmd.tagLibraryFile == "" {
83                 fmt.Fprintln(os.Stderr, "cannot import without -tag-library argument")
84                 return 2
85         } else if flags.NArg() == 0 {
86                 flags.Usage()
87                 return 2
88         }
89
90         if *pprof != "" {
91                 go func() {
92                         log.Println(http.ListenAndServe(*pprof, nil))
93                 }()
94         }
95
96         lvl, err := log.ParseLevel(cmd.loglevel)
97         if err != nil {
98                 return 2
99         }
100         log.SetLevel(lvl)
101
102         cmd.matchChromosome, err = regexp.Compile(*matchChromosome)
103         if err != nil {
104                 return 1
105         }
106
107         if !cmd.runLocal {
108                 err = cmd.runBatches(stdout, flags.Args())
109                 if err != nil {
110                         return 1
111                 }
112                 return 0
113         }
114
115         infiles, err := listInputFiles(flags.Args())
116         if err != nil {
117                 return 1
118         }
119         infiles = cmd.batchArgs.Slice(infiles)
120
121         taglib, err := cmd.loadTagLibrary()
122         if err != nil {
123                 return 1
124         }
125
126         var outw, outf io.WriteCloser
127         if cmd.outputFile == "-" {
128                 outw = nopCloser{stdout}
129         } else {
130                 outf, err = os.OpenFile(cmd.outputFile, os.O_CREATE|os.O_WRONLY, 0777)
131                 if err != nil {
132                         return 1
133                 }
134                 defer outf.Close()
135                 if strings.HasSuffix(cmd.outputFile, ".gz") {
136                         outw = pgzip.NewWriter(outf)
137                 } else {
138                         outw = outf
139                 }
140         }
141         bufw := bufio.NewWriterSize(outw, 64*1024*1024)
142         cmd.encoder = gob.NewEncoder(bufw)
143
144         tilelib := &tileLibrary{taglib: taglib, retainNoCalls: cmd.saveIncompleteTiles, skipOOO: cmd.skipOOO}
145         if cmd.outputTiles {
146                 cmd.encoder.Encode(LibraryEntry{TagSet: taglib.Tags()})
147                 tilelib.encoder = cmd.encoder
148         }
149         go func() {
150                 for range time.Tick(10 * time.Minute) {
151                         log.Printf("tilelib.Len() == %d", tilelib.Len())
152                 }
153         }()
154
155         err = cmd.tileInputs(tilelib, infiles)
156         if err != nil {
157                 return 1
158         }
159         err = bufw.Flush()
160         if err != nil {
161                 return 1
162         }
163         err = outw.Close()
164         if err != nil {
165                 return 1
166         }
167         if outf != nil && outf != outw {
168                 err = outf.Close()
169                 if err != nil {
170                         return 1
171                 }
172         }
173         return 0
174 }
175
176 func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error {
177         if cmd.outputFile != "-" {
178                 // Not yet implemented, but this should write
179                 // the collection to an existing collection,
180                 // possibly even an in-place update.
181                 return errors.New("cannot specify output file in container mode: not implemented")
182         }
183         runner := arvadosContainerRunner{
184                 Name:        "lightning import",
185                 Client:      arvadosClientFromEnv,
186                 ProjectUUID: cmd.projectUUID,
187                 APIAccess:   true,
188                 RAM:         700000000000,
189                 VCPUs:       96,
190                 Priority:    cmd.priority,
191                 KeepCache:   1,
192         }
193         err := runner.TranslatePaths(&cmd.tagLibraryFile, &cmd.refFile, &cmd.outputFile)
194         if err != nil {
195                 return err
196         }
197         for i := range inputs {
198                 err = runner.TranslatePaths(&inputs[i])
199                 if err != nil {
200                         return err
201                 }
202         }
203
204         outputs, err := cmd.batchArgs.RunBatches(context.Background(), func(ctx context.Context, batch int) (string, error) {
205                 runner := runner
206                 if cmd.batches > 1 {
207                         runner.Name += fmt.Sprintf(" (batch %d of %d)", batch, cmd.batches)
208                 }
209                 runner.Args = []string{"import",
210                         "-local=true",
211                         "-loglevel=" + cmd.loglevel,
212                         "-pprof=:6061",
213                         fmt.Sprintf("-skip-ooo=%v", cmd.skipOOO),
214                         fmt.Sprintf("-output-tiles=%v", cmd.outputTiles),
215                         fmt.Sprintf("-save-incomplete-tiles=%v", cmd.saveIncompleteTiles),
216                         "-match-chromosome", cmd.matchChromosome.String(),
217                         "-output-stats", "/mnt/output/stats.json",
218                         "-tag-library", cmd.tagLibraryFile,
219                         "-ref", cmd.refFile,
220                         "-o", "/mnt/output/library.gob.gz",
221                 }
222                 runner.Args = append(runner.Args, cmd.batchArgs.Args(batch)...)
223                 runner.Args = append(runner.Args, inputs...)
224                 return runner.RunContext(ctx)
225         })
226         if err != nil {
227                 return err
228         }
229         var outfiles []string
230         for _, o := range outputs {
231                 outfiles = append(outfiles, o+"/library.gob.gz")
232         }
233         fmt.Fprintln(stdout, strings.Join(outfiles, " "))
234         return nil
235 }
236
237 func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string, isRef bool) (tileSeq, []importStats, error) {
238         var input io.ReadCloser
239         input, err := open(infile)
240         if err != nil {
241                 return nil, nil, err
242         }
243         defer input.Close()
244         input = ioutil.NopCloser(bufio.NewReaderSize(input, 8*1024*1024))
245         if strings.HasSuffix(infile, ".gz") {
246                 input, err = pgzip.NewReader(input)
247                 if err != nil {
248                         return nil, nil, err
249                 }
250                 defer input.Close()
251         }
252         return tilelib.TileFasta(infile, input, cmd.matchChromosome, isRef)
253 }
254
255 func (cmd *importer) loadTagLibrary() (*tagLibrary, error) {
256         log.Printf("tag library %s load starting", cmd.tagLibraryFile)
257         f, err := open(cmd.tagLibraryFile)
258         if err != nil {
259                 return nil, err
260         }
261         defer f.Close()
262         rdr := ioutil.NopCloser(bufio.NewReaderSize(f, 64*1024*1024))
263         if strings.HasSuffix(cmd.tagLibraryFile, ".gz") {
264                 rdr, err = gzip.NewReader(rdr)
265                 if err != nil {
266                         return nil, fmt.Errorf("%s: gzip: %s", cmd.tagLibraryFile, err)
267                 }
268                 defer rdr.Close()
269         }
270         var taglib tagLibrary
271         err = taglib.Load(rdr)
272         if err != nil {
273                 return nil, err
274         }
275         if taglib.Len() < 1 {
276                 return nil, fmt.Errorf("cannot tile: tag library is empty")
277         }
278         log.Printf("tag library %s load done", cmd.tagLibraryFile)
279         return &taglib, nil
280 }
281
282 var (
283         vcfFilenameRe    = regexp.MustCompile(`\.vcf(\.gz)?$`)
284         fasta1FilenameRe = regexp.MustCompile(`\.1\.fa(sta)?(\.gz)?$`)
285         fasta2FilenameRe = regexp.MustCompile(`\.2\.fa(sta)?(\.gz)?$`)
286         fastaFilenameRe  = regexp.MustCompile(`\.fa(sta)?(\.gz)?$`)
287 )
288
289 func listInputFiles(paths []string) (files []string, err error) {
290         for _, path := range paths {
291                 if fi, err := os.Stat(path); err != nil {
292                         return nil, fmt.Errorf("%s: stat failed: %s", path, err)
293                 } else if !fi.IsDir() {
294                         if !fasta2FilenameRe.MatchString(path) {
295                                 files = append(files, path)
296                         }
297                         continue
298                 }
299                 d, err := os.Open(path)
300                 if err != nil {
301                         return nil, fmt.Errorf("%s: open failed: %s", path, err)
302                 }
303                 defer d.Close()
304                 names, err := d.Readdirnames(0)
305                 if err != nil {
306                         return nil, fmt.Errorf("%s: readdir failed: %s", path, err)
307                 }
308                 sort.Strings(names)
309                 for _, name := range names {
310                         if vcfFilenameRe.MatchString(name) {
311                                 files = append(files, filepath.Join(path, name))
312                         } else if fastaFilenameRe.MatchString(name) && !fasta2FilenameRe.MatchString(name) {
313                                 files = append(files, filepath.Join(path, name))
314                         }
315                 }
316                 d.Close()
317         }
318         for _, file := range files {
319                 if fastaFilenameRe.MatchString(file) {
320                         continue
321                 } else if vcfFilenameRe.MatchString(file) {
322                         if _, err := os.Stat(file + ".csi"); err == nil {
323                                 continue
324                         } else if _, err = os.Stat(file + ".tbi"); err == nil {
325                                 continue
326                         } else {
327                                 return nil, fmt.Errorf("%s: cannot read without .tbi or .csi index file", file)
328                         }
329                 } else {
330                         return nil, fmt.Errorf("don't know how to handle filename %s", file)
331                 }
332         }
333         return
334 }
335
336 func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error {
337         starttime := time.Now()
338         errs := make(chan error, 1)
339         todo := make(chan func() error, len(infiles)*2)
340         allstats := make([][]importStats, len(infiles)*2)
341         var encodeJobs sync.WaitGroup
342         for idx, infile := range infiles {
343                 idx, infile := idx, infile
344                 var phases sync.WaitGroup
345                 phases.Add(2)
346                 variants := make([][]tileVariantID, 2)
347                 if fasta1FilenameRe.MatchString(infile) {
348                         todo <- func() error {
349                                 defer phases.Done()
350                                 log.Printf("%s starting", infile)
351                                 defer log.Printf("%s done", infile)
352                                 tseqs, stats, err := cmd.tileFasta(tilelib, infile, false)
353                                 allstats[idx*2] = stats
354                                 var kept, dropped int
355                                 variants[0], kept, dropped = tseqs.Variants()
356                                 log.Printf("%s found %d unique tags plus %d repeats", infile, kept, dropped)
357                                 return err
358                         }
359                         infile2 := fasta1FilenameRe.ReplaceAllString(infile, `.2.fa$1$2`)
360                         todo <- func() error {
361                                 defer phases.Done()
362                                 log.Printf("%s starting", infile2)
363                                 defer log.Printf("%s done", infile2)
364                                 tseqs, stats, err := cmd.tileFasta(tilelib, infile2, false)
365                                 allstats[idx*2+1] = stats
366                                 var kept, dropped int
367                                 variants[1], kept, dropped = tseqs.Variants()
368                                 log.Printf("%s found %d unique tags plus %d repeats", infile2, kept, dropped)
369                                 return err
370                         }
371                 } else if fastaFilenameRe.MatchString(infile) {
372                         todo <- func() error {
373                                 defer phases.Done()
374                                 defer phases.Done()
375                                 log.Printf("%s starting", infile)
376                                 defer log.Printf("%s done", infile)
377                                 tseqs, stats, err := cmd.tileFasta(tilelib, infile, true)
378                                 allstats[idx*2] = stats
379                                 if err != nil {
380                                         return err
381                                 }
382                                 totlen := 0
383                                 for _, tseq := range tseqs {
384                                         totlen += len(tseq)
385                                 }
386                                 log.Printf("%s tiled %d seqs, total len %d", infile, len(tseqs), totlen)
387
388                                 if cmd.retainAfterEncoding {
389                                         tilelib.mtx.Lock()
390                                         if tilelib.refseqs == nil {
391                                                 tilelib.refseqs = map[string]map[string][]tileLibRef{}
392                                         }
393                                         tilelib.refseqs[infile] = tseqs
394                                         tilelib.mtx.Unlock()
395                                 }
396
397                                 return cmd.encoder.Encode(LibraryEntry{
398                                         CompactSequences: []CompactSequence{{Name: infile, TileSequences: tseqs}},
399                                 })
400                         }
401                         // Don't write out a CompactGenomes entry
402                         continue
403                 } else if vcfFilenameRe.MatchString(infile) {
404                         for phase := 0; phase < 2; phase++ {
405                                 phase := phase
406                                 todo <- func() error {
407                                         defer phases.Done()
408                                         log.Printf("%s phase %d starting", infile, phase+1)
409                                         defer log.Printf("%s phase %d done", infile, phase+1)
410                                         tseqs, stats, err := cmd.tileGVCF(tilelib, infile, phase)
411                                         allstats[idx*2] = stats
412                                         var kept, dropped int
413                                         variants[phase], kept, dropped = tseqs.Variants()
414                                         log.Printf("%s phase %d found %d unique tags plus %d repeats", infile, phase+1, kept, dropped)
415                                         return err
416                                 }
417                         }
418                 } else {
419                         panic(fmt.Sprintf("bug: unhandled filename %q", infile))
420                 }
421                 encodeJobs.Add(1)
422                 go func() {
423                         defer encodeJobs.Done()
424                         phases.Wait()
425                         if len(errs) > 0 {
426                                 return
427                         }
428                         variants := flatten(variants)
429                         err := cmd.encoder.Encode(LibraryEntry{
430                                 CompactGenomes: []CompactGenome{{Name: infile, Variants: variants}},
431                         })
432                         if err != nil {
433                                 select {
434                                 case errs <- err:
435                                 default:
436                                 }
437                         }
438                         if cmd.retainAfterEncoding {
439                                 tilelib.mtx.Lock()
440                                 if tilelib.compactGenomes == nil {
441                                         tilelib.compactGenomes = make(map[string][]tileVariantID)
442                                 }
443                                 tilelib.compactGenomes[infile] = variants
444                                 tilelib.mtx.Unlock()
445                         }
446                 }()
447         }
448         go close(todo)
449         var tileJobs sync.WaitGroup
450         var running int64
451         for i := 0; i < runtime.GOMAXPROCS(-1)*2; i++ {
452                 tileJobs.Add(1)
453                 atomic.AddInt64(&running, 1)
454                 go func() {
455                         defer tileJobs.Done()
456                         defer atomic.AddInt64(&running, -1)
457                         for fn := range todo {
458                                 if len(errs) > 0 {
459                                         return
460                                 }
461                                 err := fn()
462                                 if err != nil {
463                                         select {
464                                         case errs <- err:
465                                         default:
466                                         }
467                                 }
468                                 remain := len(todo) + int(atomic.LoadInt64(&running)) - 1
469                                 if remain < cap(todo) {
470                                         ttl := time.Now().Sub(starttime) * time.Duration(remain) / time.Duration(cap(todo)-remain)
471                                         eta := time.Now().Add(ttl)
472                                         log.Printf("progress %d/%d, eta %v (%v)", cap(todo)-remain, cap(todo), eta, ttl)
473                                 }
474                         }
475                 }()
476         }
477         tileJobs.Wait()
478         if len(errs) > 0 {
479                 // Must not wait on encodeJobs in this case. If the
480                 // tileJobs goroutines exited early, some funcs in
481                 // todo haven't been called, so the corresponding
482                 // encodeJobs will wait forever.
483                 return <-errs
484         }
485         encodeJobs.Wait()
486
487         go close(errs)
488         err := <-errs
489         if err != nil {
490                 return err
491         }
492
493         if cmd.outputStats != "" {
494                 f, err := os.OpenFile(cmd.outputStats, os.O_CREATE|os.O_WRONLY, 0666)
495                 if err != nil {
496                         return err
497                 }
498                 var flatstats []importStats
499                 for _, stats := range allstats {
500                         flatstats = append(flatstats, stats...)
501                 }
502                 err = json.NewEncoder(f).Encode(flatstats)
503                 if err != nil {
504                         return err
505                 }
506         }
507
508         return nil
509 }
510
511 func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, stats []importStats, err error) {
512         if cmd.refFile == "" {
513                 err = errors.New("cannot import vcf: reference data (-ref) not specified")
514                 return
515         }
516         args := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase + 1), infile}
517         indexsuffix := ".tbi"
518         if _, err := os.Stat(infile + ".csi"); err == nil {
519                 indexsuffix = ".csi"
520         }
521         if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err == nil && len(out) > 0 {
522                 args = append([]string{
523                         "docker", "run", "--rm",
524                         "--log-driver=none",
525                         "--volume=" + infile + ":" + infile + ":ro",
526                         "--volume=" + infile + indexsuffix + ":" + infile + indexsuffix + ":ro",
527                         "--volume=" + cmd.refFile + ":" + cmd.refFile + ":ro",
528                         "lightning-runtime",
529                 }, args...)
530         }
531         consensus := exec.Command(args[0], args[1:]...)
532         consensus.Stderr = os.Stderr
533         stdout, err := consensus.StdoutPipe()
534         defer stdout.Close()
535         if err != nil {
536                 return
537         }
538         err = consensus.Start()
539         if err != nil {
540                 return
541         }
542         defer consensus.Wait()
543         tileseq, stats, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout, cmd.matchChromosome, false)
544         if err != nil {
545                 return
546         }
547         err = stdout.Close()
548         if err != nil {
549                 return
550         }
551         err = consensus.Wait()
552         if err != nil {
553                 err = fmt.Errorf("%s phase %d: bcftools: %s", infile, phase, err)
554                 return
555         }
556         return
557 }
558
559 func flatten(variants [][]tileVariantID) []tileVariantID {
560         ntags := 0
561         for _, v := range variants {
562                 if ntags < len(v) {
563                         ntags = len(v)
564                 }
565         }
566         flat := make([]tileVariantID, ntags*2)
567         for i := 0; i < ntags; i++ {
568                 for hap := 0; hap < 2; hap++ {
569                         if i < len(variants[hap]) {
570                                 flat[i*2+hap] = variants[hap][i]
571                         }
572                 }
573         }
574         return flat
575 }