X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/1da1956511f6716c72631ec413a4c2cd0f561dd6..HEAD:/import.go diff --git a/import.go b/import.go index cea24f2785..20058dc8a9 100644 --- a/import.go +++ b/import.go @@ -1,4 +1,8 @@ -package main +// Copyright (C) The Lightning Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package lightning import ( "bufio" @@ -10,6 +14,7 @@ import ( "flag" "fmt" "io" + "io/ioutil" "net/http" _ "net/http/pprof" "os" @@ -23,13 +28,8 @@ import ( "sync/atomic" "time" - "git.arvados.org/arvados.git/sdk/go/arvados" - "github.com/lucasb-eyer/go-colorful" + "github.com/klauspost/pgzip" log "github.com/sirupsen/logrus" - "gonum.org/v1/plot" - "gonum.org/v1/plot/plotter" - "gonum.org/v1/plot/vg" - "gonum.org/v1/plot/vg/draw" ) type importer struct { @@ -46,6 +46,7 @@ type importer struct { outputStats string matchChromosome *regexp.Regexp encoder *gob.Encoder + retainAfterEncoding bool // keep imported genomes/refseqs in memory after writing to disk batchArgs } @@ -132,12 +133,12 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std } defer outf.Close() if strings.HasSuffix(cmd.outputFile, ".gz") { - outw = gzip.NewWriter(outf) + outw = pgzip.NewWriter(outf) } else { outw = outf } } - bufw := bufio.NewWriter(outw) + bufw := bufio.NewWriterSize(outw, 64*1024*1024) cmd.encoder = gob.NewEncoder(bufw) tilelib := &tileLibrary{taglib: taglib, retainNoCalls: cmd.saveIncompleteTiles, skipOOO: cmd.skipOOO} @@ -179,14 +180,15 @@ func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error { // possibly even an in-place update. return errors.New("cannot specify output file in container mode: not implemented") } - client := arvados.NewClientFromEnv() runner := arvadosContainerRunner{ Name: "lightning import", - Client: client, + Client: arvadosClientFromEnv, ProjectUUID: cmd.projectUUID, - RAM: 80000000000, - VCPUs: 32, + APIAccess: true, + RAM: 350000000000, + VCPUs: 96, Priority: cmd.priority, + KeepCache: 1, } err := runner.TranslatePaths(&cmd.tagLibraryFile, &cmd.refFile, &cmd.outputFile) if err != nil { @@ -207,6 +209,7 @@ func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error { runner.Args = []string{"import", "-local=true", "-loglevel=" + cmd.loglevel, + "-pprof=:6061", fmt.Sprintf("-skip-ooo=%v", cmd.skipOOO), fmt.Sprintf("-output-tiles=%v", cmd.outputTiles), fmt.Sprintf("-save-incomplete-tiles=%v", cmd.saveIncompleteTiles), @@ -231,33 +234,34 @@ func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error { return nil } -func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, []importStats, error) { +func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string, isRef bool) (tileSeq, []importStats, error) { var input io.ReadCloser - input, err := os.Open(infile) + input, err := open(infile) if err != nil { return nil, nil, err } defer input.Close() + input = ioutil.NopCloser(bufio.NewReaderSize(input, 8*1024*1024)) if strings.HasSuffix(infile, ".gz") { - input, err = gzip.NewReader(input) + input, err = pgzip.NewReader(input) if err != nil { return nil, nil, err } defer input.Close() } - return tilelib.TileFasta(infile, input, cmd.matchChromosome) + return tilelib.TileFasta(infile, input, cmd.matchChromosome, isRef) } func (cmd *importer) loadTagLibrary() (*tagLibrary, error) { log.Printf("tag library %s load starting", cmd.tagLibraryFile) - f, err := os.Open(cmd.tagLibraryFile) + f, err := open(cmd.tagLibraryFile) if err != nil { return nil, err } defer f.Close() - var rdr io.ReadCloser = f + rdr := ioutil.NopCloser(bufio.NewReaderSize(f, 64*1024*1024)) if strings.HasSuffix(cmd.tagLibraryFile, ".gz") { - rdr, err = gzip.NewReader(f) + rdr, err = gzip.NewReader(rdr) if err != nil { return nil, fmt.Errorf("%s: gzip: %s", cmd.tagLibraryFile, err) } @@ -277,8 +281,8 @@ func (cmd *importer) loadTagLibrary() (*tagLibrary, error) { var ( vcfFilenameRe = regexp.MustCompile(`\.vcf(\.gz)?$`) - fasta1FilenameRe = regexp.MustCompile(`\.1\.fa(sta)?(\.gz)?$`) - fasta2FilenameRe = regexp.MustCompile(`\.2\.fa(sta)?(\.gz)?$`) + fasta1FilenameRe = regexp.MustCompile(`\.1\.fa(sta)?(\.fa(sta)?)?(\.gz)?$`) + fasta2FilenameRe = regexp.MustCompile(`\.2\.fa(sta)?(\.fa(sta)?)?(\.gz)?$`) fastaFilenameRe = regexp.MustCompile(`\.fa(sta)?(\.gz)?$`) ) @@ -343,35 +347,34 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { if fasta1FilenameRe.MatchString(infile) { todo <- func() error { defer phases.Done() - log.Printf("%s starting", infile) + log.Printf("%s (sample.1) starting tiling", infile) defer log.Printf("%s done", infile) - tseqs, stats, err := cmd.tileFasta(tilelib, infile) + tseqs, stats, err := cmd.tileFasta(tilelib, infile, false) allstats[idx*2] = stats var kept, dropped int variants[0], kept, dropped = tseqs.Variants() - log.Printf("%s found %d unique tags plus %d repeats", infile, kept, dropped) + log.Printf("%s (sample.1) found %d unique tags plus %d repeats", infile, kept, dropped) return err } - infile2 := fasta1FilenameRe.ReplaceAllString(infile, `.2.fa$1$2`) + infile2 := fasta1FilenameRe.ReplaceAllString(infile, `.2.fa$1$2$4`) todo <- func() error { defer phases.Done() - log.Printf("%s starting", infile2) + log.Printf("%s (sample.2) starting tiling", infile2) defer log.Printf("%s done", infile2) - tseqs, stats, err := cmd.tileFasta(tilelib, infile2) + tseqs, stats, err := cmd.tileFasta(tilelib, infile2, false) allstats[idx*2+1] = stats var kept, dropped int variants[1], kept, dropped = tseqs.Variants() - log.Printf("%s found %d unique tags plus %d repeats", infile2, kept, dropped) - + log.Printf("%s (sample.2) found %d unique tags plus %d repeats", infile2, kept, dropped) return err } } else if fastaFilenameRe.MatchString(infile) { todo <- func() error { defer phases.Done() defer phases.Done() - log.Printf("%s starting", infile) + log.Printf("%s (reference) starting tiling", infile) defer log.Printf("%s done", infile) - tseqs, stats, err := cmd.tileFasta(tilelib, infile) + tseqs, stats, err := cmd.tileFasta(tilelib, infile, true) allstats[idx*2] = stats if err != nil { return err @@ -380,7 +383,17 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { for _, tseq := range tseqs { totlen += len(tseq) } - log.Printf("%s tiled %d seqs, total len %d", infile, len(tseqs), totlen) + log.Printf("%s (reference) tiled %d seqs, total len %d", infile, len(tseqs), totlen) + + if cmd.retainAfterEncoding { + tilelib.mtx.Lock() + if tilelib.refseqs == nil { + tilelib.refseqs = map[string]map[string][]tileLibRef{} + } + tilelib.refseqs[infile] = tseqs + tilelib.mtx.Unlock() + } + return cmd.encoder.Encode(LibraryEntry{ CompactSequences: []CompactSequence{{Name: infile, TileSequences: tseqs}}, }) @@ -412,8 +425,9 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { if len(errs) > 0 { return } + variants := flatten(variants) err := cmd.encoder.Encode(LibraryEntry{ - CompactGenomes: []CompactGenome{{Name: infile, Variants: flatten(variants)}}, + CompactGenomes: []CompactGenome{{Name: infile, Variants: variants}}, }) if err != nil { select { @@ -421,12 +435,20 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { default: } } + if cmd.retainAfterEncoding { + tilelib.mtx.Lock() + if tilelib.compactGenomes == nil { + tilelib.compactGenomes = make(map[string][]tileVariantID) + } + tilelib.compactGenomes[infile] = variants + tilelib.mtx.Unlock() + } }() } go close(todo) var tileJobs sync.WaitGroup var running int64 - for i := 0; i < runtime.NumCPU()*9/8+1; i++ { + for i := 0; i < runtime.GOMAXPROCS(-1); i++ { tileJobs.Add(1) atomic.AddInt64(&running, 1) go func() { @@ -453,6 +475,13 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { }() } tileJobs.Wait() + if len(errs) > 0 { + // Must not wait on encodeJobs in this case. If the + // tileJobs goroutines exited early, some funcs in + // todo haven't been called, so the corresponding + // encodeJobs will wait forever. + return <-errs + } encodeJobs.Wait() go close(errs) @@ -511,7 +540,7 @@ func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (t return } defer consensus.Wait() - tileseq, stats, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout, cmd.matchChromosome) + tileseq, stats, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout, cmd.matchChromosome, false) if err != nil { return } @@ -544,73 +573,3 @@ func flatten(variants [][]tileVariantID) []tileVariantID { } return flat } - -type importstatsplot struct{} - -func (cmd *importstatsplot) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { - err := cmd.Plot(stdin, stdout) - if err != nil { - log.Errorf("%s", err) - return 1 - } - return 0 -} - -func (cmd *importstatsplot) Plot(stdin io.Reader, stdout io.Writer) error { - var stats []importStats - err := json.NewDecoder(stdin).Decode(&stats) - if err != nil { - return err - } - - p, err := plot.New() - if err != nil { - return err - } - p.Title.Text = "coverage preserved by import (excl X<0.65)" - p.X.Label.Text = "input base calls ÷ sequence length" - p.Y.Label.Text = "output base calls ÷ input base calls" - p.Add(plotter.NewGrid()) - - data := map[string]plotter.XYs{} - for _, stat := range stats { - data[stat.InputLabel] = append(data[stat.InputLabel], plotter.XY{ - X: float64(stat.InputCoverage) / float64(stat.InputLength), - Y: float64(stat.TileCoverage) / float64(stat.InputCoverage), - }) - } - - labels := []string{} - for label := range data { - labels = append(labels, label) - } - sort.Strings(labels) - palette, err := colorful.SoftPalette(len(labels)) - if err != nil { - return err - } - nextInPalette := 0 - for idx, label := range labels { - s, err := plotter.NewScatter(data[label]) - if err != nil { - return err - } - s.GlyphStyle.Color = palette[idx] - s.GlyphStyle.Radius = vg.Millimeter / 2 - s.GlyphStyle.Shape = draw.CrossGlyph{} - nextInPalette += 7 - p.Add(s) - if false { - p.Legend.Add(label, s) - } - } - p.X.Min = 0.65 - p.X.Max = 1 - - w, err := p.WriterTo(8*vg.Inch, 6*vg.Inch, "svg") - if err != nil { - return err - } - _, err = w.WriteTo(stdout) - return err -}