X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/f50e2e9f8c572357ecab7117991efb7c9bea13e7..HEAD:/import.go diff --git a/import.go b/import.go index 71ab688e98..20058dc8a9 100644 --- a/import.go +++ b/import.go @@ -1,13 +1,20 @@ -package main +// Copyright (C) The Lightning Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package lightning import ( "bufio" "compress/gzip" + "context" "encoding/gob" + "encoding/json" "errors" "flag" "fmt" "io" + "io/ioutil" "net/http" _ "net/http/pprof" "os" @@ -21,20 +28,26 @@ import ( "sync/atomic" "time" - "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/klauspost/pgzip" log "github.com/sirupsen/logrus" ) type importer struct { - tagLibraryFile string - refFile string - outputFile string - projectUUID string - runLocal bool - skipOOO bool - outputTiles bool - includeNoCalls bool - encoder *gob.Encoder + tagLibraryFile string + refFile string + outputFile string + projectUUID string + loglevel string + priority int + runLocal bool + skipOOO bool + outputTiles bool + saveIncompleteTiles bool + outputStats string + matchChromosome *regexp.Regexp + encoder *gob.Encoder + retainAfterEncoding bool // keep imported genomes/refseqs in memory after writing to disk + batchArgs } func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { @@ -53,10 +66,13 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)") flags.BoolVar(&cmd.skipOOO, "skip-ooo", false, "skip out-of-order tags") flags.BoolVar(&cmd.outputTiles, "output-tiles", false, "include tile variant sequences in output file") - flags.BoolVar(&cmd.includeNoCalls, "include-no-calls", false, "treat tiles with no-calls as regular tiles") - priority := flags.Int("priority", 500, "container request priority") + flags.BoolVar(&cmd.saveIncompleteTiles, "save-incomplete-tiles", false, "treat tiles with no-calls as regular tiles") + flags.StringVar(&cmd.outputStats, "output-stats", "", "output stats to `file` (json)") + cmd.batchArgs.Flags(flags) + matchChromosome := flags.String("match-chromosome", "^(chr)?([0-9]+|X|Y|MT?)$", "import chromosomes that match the given `regexp`") + flags.IntVar(&cmd.priority, "priority", 500, "container request priority") pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`") - loglevel := flags.String("loglevel", "info", "logging threshold (trace, debug, info, warn, error, fatal, or panic)") + flags.StringVar(&cmd.loglevel, "loglevel", "info", "logging threshold (trace, debug, info, warn, error, fatal, or panic)") err = flags.Parse(args) if err == flag.ErrHelp { err = nil @@ -77,57 +93,22 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std }() } - lvl, err := log.ParseLevel(*loglevel) + lvl, err := log.ParseLevel(cmd.loglevel) if err != nil { return 2 } log.SetLevel(lvl) + cmd.matchChromosome, err = regexp.Compile(*matchChromosome) + if err != nil { + return 1 + } + if !cmd.runLocal { - runner := arvadosContainerRunner{ - Name: "lightning import", - Client: arvados.NewClientFromEnv(), - ProjectUUID: cmd.projectUUID, - RAM: 60000000000, - VCPUs: 16, - Priority: *priority, - } - err = runner.TranslatePaths(&cmd.tagLibraryFile, &cmd.refFile, &cmd.outputFile) + err = cmd.runBatches(stdout, flags.Args()) if err != nil { return 1 } - inputs := flags.Args() - for i := range inputs { - err = runner.TranslatePaths(&inputs[i]) - if err != nil { - return 1 - } - } - if cmd.outputFile == "-" { - cmd.outputFile = "/mnt/output/library.gob" - } else { - // Not yet implemented, but this should write - // the collection to an existing collection, - // possibly even an in-place update. - err = errors.New("cannot specify output file in container mode: not implemented") - return 1 - } - runner.Args = append([]string{"import", - "-local=true", - "-loglevel=" + *loglevel, - fmt.Sprintf("-skip-ooo=%v", cmd.skipOOO), - fmt.Sprintf("-output-tiles=%v", cmd.outputTiles), - fmt.Sprintf("-include-no-calls=%v", cmd.includeNoCalls), - "-tag-library", cmd.tagLibraryFile, - "-ref", cmd.refFile, - "-o", cmd.outputFile, - }, inputs...) - var output string - output, err = runner.Run() - if err != nil { - return 1 - } - fmt.Fprintln(stdout, output+"/library.gob") return 0 } @@ -135,26 +116,32 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std if err != nil { return 1 } + infiles = cmd.batchArgs.Slice(infiles) taglib, err := cmd.loadTagLibrary() if err != nil { return 1 } - var output io.WriteCloser + var outw, outf io.WriteCloser if cmd.outputFile == "-" { - output = nopCloser{stdout} + outw = nopCloser{stdout} } else { - output, err = os.OpenFile(cmd.outputFile, os.O_CREATE|os.O_WRONLY, 0777) + outf, err = os.OpenFile(cmd.outputFile, os.O_CREATE|os.O_WRONLY, 0777) if err != nil { return 1 } - defer output.Close() + defer outf.Close() + if strings.HasSuffix(cmd.outputFile, ".gz") { + outw = pgzip.NewWriter(outf) + } else { + outw = outf + } } - bufw := bufio.NewWriter(output) + bufw := bufio.NewWriterSize(outw, 64*1024*1024) cmd.encoder = gob.NewEncoder(bufw) - tilelib := &tileLibrary{taglib: taglib, includeNoCalls: cmd.includeNoCalls, skipOOO: cmd.skipOOO} + tilelib := &tileLibrary{taglib: taglib, retainNoCalls: cmd.saveIncompleteTiles, skipOOO: cmd.skipOOO} if cmd.outputTiles { cmd.encoder.Encode(LibraryEntry{TagSet: taglib.Tags()}) tilelib.encoder = cmd.encoder @@ -173,40 +160,108 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std if err != nil { return 1 } - err = output.Close() + err = outw.Close() if err != nil { return 1 } + if outf != nil && outf != outw { + err = outf.Close() + if err != nil { + return 1 + } + } return 0 } -func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, error) { +func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error { + if cmd.outputFile != "-" { + // Not yet implemented, but this should write + // the collection to an existing collection, + // possibly even an in-place update. + return errors.New("cannot specify output file in container mode: not implemented") + } + runner := arvadosContainerRunner{ + Name: "lightning import", + Client: arvadosClientFromEnv, + ProjectUUID: cmd.projectUUID, + APIAccess: true, + RAM: 350000000000, + VCPUs: 96, + Priority: cmd.priority, + KeepCache: 1, + } + err := runner.TranslatePaths(&cmd.tagLibraryFile, &cmd.refFile, &cmd.outputFile) + if err != nil { + return err + } + for i := range inputs { + err = runner.TranslatePaths(&inputs[i]) + if err != nil { + return err + } + } + + outputs, err := cmd.batchArgs.RunBatches(context.Background(), func(ctx context.Context, batch int) (string, error) { + runner := runner + if cmd.batches > 1 { + runner.Name += fmt.Sprintf(" (batch %d of %d)", batch, cmd.batches) + } + runner.Args = []string{"import", + "-local=true", + "-loglevel=" + cmd.loglevel, + "-pprof=:6061", + fmt.Sprintf("-skip-ooo=%v", cmd.skipOOO), + fmt.Sprintf("-output-tiles=%v", cmd.outputTiles), + fmt.Sprintf("-save-incomplete-tiles=%v", cmd.saveIncompleteTiles), + "-match-chromosome", cmd.matchChromosome.String(), + "-output-stats", "/mnt/output/stats.json", + "-tag-library", cmd.tagLibraryFile, + "-ref", cmd.refFile, + "-o", "/mnt/output/library.gob.gz", + } + runner.Args = append(runner.Args, cmd.batchArgs.Args(batch)...) + runner.Args = append(runner.Args, inputs...) + return runner.RunContext(ctx) + }) + if err != nil { + return err + } + var outfiles []string + for _, o := range outputs { + outfiles = append(outfiles, o+"/library.gob.gz") + } + fmt.Fprintln(stdout, strings.Join(outfiles, " ")) + return nil +} + +func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string, isRef bool) (tileSeq, []importStats, error) { var input io.ReadCloser - input, err := os.Open(infile) + input, err := open(infile) if err != nil { - return nil, err + return nil, nil, err } defer input.Close() + input = ioutil.NopCloser(bufio.NewReaderSize(input, 8*1024*1024)) if strings.HasSuffix(infile, ".gz") { - input, err = gzip.NewReader(input) + input, err = pgzip.NewReader(input) if err != nil { - return nil, err + return nil, nil, err } defer input.Close() } - return tilelib.TileFasta(infile, input) + return tilelib.TileFasta(infile, input, cmd.matchChromosome, isRef) } func (cmd *importer) loadTagLibrary() (*tagLibrary, error) { log.Printf("tag library %s load starting", cmd.tagLibraryFile) - f, err := os.Open(cmd.tagLibraryFile) + f, err := open(cmd.tagLibraryFile) if err != nil { return nil, err } defer f.Close() - var rdr io.ReadCloser = f + rdr := ioutil.NopCloser(bufio.NewReaderSize(f, 64*1024*1024)) if strings.HasSuffix(cmd.tagLibraryFile, ".gz") { - rdr, err = gzip.NewReader(f) + rdr, err = gzip.NewReader(rdr) if err != nil { return nil, fmt.Errorf("%s: gzip: %s", cmd.tagLibraryFile, err) } @@ -226,8 +281,8 @@ func (cmd *importer) loadTagLibrary() (*tagLibrary, error) { var ( vcfFilenameRe = regexp.MustCompile(`\.vcf(\.gz)?$`) - fasta1FilenameRe = regexp.MustCompile(`\.1\.fa(sta)?(\.gz)?$`) - fasta2FilenameRe = regexp.MustCompile(`\.2\.fa(sta)?(\.gz)?$`) + fasta1FilenameRe = regexp.MustCompile(`\.1\.fa(sta)?(\.fa(sta)?)?(\.gz)?$`) + fasta2FilenameRe = regexp.MustCompile(`\.2\.fa(sta)?(\.fa(sta)?)?(\.gz)?$`) fastaFilenameRe = regexp.MustCompile(`\.fa(sta)?(\.gz)?$`) ) @@ -282,41 +337,45 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { starttime := time.Now() errs := make(chan error, 1) todo := make(chan func() error, len(infiles)*2) + allstats := make([][]importStats, len(infiles)*2) var encodeJobs sync.WaitGroup - for _, infile := range infiles { - infile := infile + for idx, infile := range infiles { + idx, infile := idx, infile var phases sync.WaitGroup phases.Add(2) variants := make([][]tileVariantID, 2) if fasta1FilenameRe.MatchString(infile) { todo <- func() error { defer phases.Done() - log.Printf("%s starting", infile) + log.Printf("%s (sample.1) starting tiling", infile) defer log.Printf("%s done", infile) - tseqs, err := cmd.tileFasta(tilelib, infile) + tseqs, stats, err := cmd.tileFasta(tilelib, infile, false) + allstats[idx*2] = stats var kept, dropped int variants[0], kept, dropped = tseqs.Variants() - log.Printf("%s found %d unique tags plus %d repeats", infile, kept, dropped) + log.Printf("%s (sample.1) found %d unique tags plus %d repeats", infile, kept, dropped) return err } - infile2 := fasta1FilenameRe.ReplaceAllString(infile, `.2.fa$1$2`) + infile2 := fasta1FilenameRe.ReplaceAllString(infile, `.2.fa$1$2$4`) todo <- func() error { defer phases.Done() - log.Printf("%s starting", infile2) + log.Printf("%s (sample.2) starting tiling", infile2) defer log.Printf("%s done", infile2) - tseqs, err := cmd.tileFasta(tilelib, infile2) + tseqs, stats, err := cmd.tileFasta(tilelib, infile2, false) + allstats[idx*2+1] = stats var kept, dropped int variants[1], kept, dropped = tseqs.Variants() - log.Printf("%s found %d unique tags plus %d repeats", infile2, kept, dropped) + log.Printf("%s (sample.2) found %d unique tags plus %d repeats", infile2, kept, dropped) return err } } else if fastaFilenameRe.MatchString(infile) { todo <- func() error { defer phases.Done() defer phases.Done() - log.Printf("%s starting", infile) + log.Printf("%s (reference) starting tiling", infile) defer log.Printf("%s done", infile) - tseqs, err := cmd.tileFasta(tilelib, infile) + tseqs, stats, err := cmd.tileFasta(tilelib, infile, true) + allstats[idx*2] = stats if err != nil { return err } @@ -324,7 +383,17 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { for _, tseq := range tseqs { totlen += len(tseq) } - log.Printf("%s tiled %d seqs, total len %d", infile, len(tseqs), totlen) + log.Printf("%s (reference) tiled %d seqs, total len %d", infile, len(tseqs), totlen) + + if cmd.retainAfterEncoding { + tilelib.mtx.Lock() + if tilelib.refseqs == nil { + tilelib.refseqs = map[string]map[string][]tileLibRef{} + } + tilelib.refseqs[infile] = tseqs + tilelib.mtx.Unlock() + } + return cmd.encoder.Encode(LibraryEntry{ CompactSequences: []CompactSequence{{Name: infile, TileSequences: tseqs}}, }) @@ -338,7 +407,8 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { defer phases.Done() log.Printf("%s phase %d starting", infile, phase+1) defer log.Printf("%s phase %d done", infile, phase+1) - tseqs, err := cmd.tileGVCF(tilelib, infile, phase) + tseqs, stats, err := cmd.tileGVCF(tilelib, infile, phase) + allstats[idx*2] = stats var kept, dropped int variants[phase], kept, dropped = tseqs.Variants() log.Printf("%s phase %d found %d unique tags plus %d repeats", infile, phase+1, kept, dropped) @@ -355,8 +425,9 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { if len(errs) > 0 { return } + variants := flatten(variants) err := cmd.encoder.Encode(LibraryEntry{ - CompactGenomes: []CompactGenome{{Name: infile, Variants: flatten(variants)}}, + CompactGenomes: []CompactGenome{{Name: infile, Variants: variants}}, }) if err != nil { select { @@ -364,12 +435,20 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { default: } } + if cmd.retainAfterEncoding { + tilelib.mtx.Lock() + if tilelib.compactGenomes == nil { + tilelib.compactGenomes = make(map[string][]tileVariantID) + } + tilelib.compactGenomes[infile] = variants + tilelib.mtx.Unlock() + } }() } go close(todo) var tileJobs sync.WaitGroup var running int64 - for i := 0; i < runtime.NumCPU()*9/8+1; i++ { + for i := 0; i < runtime.GOMAXPROCS(-1); i++ { tileJobs.Add(1) atomic.AddInt64(&running, 1) go func() { @@ -387,19 +466,49 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { } } remain := len(todo) + int(atomic.LoadInt64(&running)) - 1 - ttl := time.Now().Sub(starttime) * time.Duration(remain) / time.Duration(cap(todo)-remain) - eta := time.Now().Add(ttl) - log.Printf("progress %d/%d, eta %v (%v)", cap(todo)-remain, cap(todo), eta, ttl) + if remain < cap(todo) { + ttl := time.Now().Sub(starttime) * time.Duration(remain) / time.Duration(cap(todo)-remain) + eta := time.Now().Add(ttl) + log.Printf("progress %d/%d, eta %v (%v)", cap(todo)-remain, cap(todo), eta, ttl) + } } }() } tileJobs.Wait() + if len(errs) > 0 { + // Must not wait on encodeJobs in this case. If the + // tileJobs goroutines exited early, some funcs in + // todo haven't been called, so the corresponding + // encodeJobs will wait forever. + return <-errs + } encodeJobs.Wait() + go close(errs) - return <-errs + err := <-errs + if err != nil { + return err + } + + if cmd.outputStats != "" { + f, err := os.OpenFile(cmd.outputStats, os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + return err + } + var flatstats []importStats + for _, stats := range allstats { + flatstats = append(flatstats, stats...) + } + err = json.NewEncoder(f).Encode(flatstats) + if err != nil { + return err + } + } + + return nil } -func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, err error) { +func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, stats []importStats, err error) { if cmd.refFile == "" { err = errors.New("cannot import vcf: reference data (-ref) not specified") return @@ -431,7 +540,7 @@ func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (t return } defer consensus.Wait() - tileseq, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout) + tileseq, stats, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout, cmd.matchChromosome, false) if err != nil { return }