X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/5b9beb5a3f445af98c7b2f6844eeb8b00cbce0ea..fa0e5d14b6ae74ea740d84672bc6c759b1bc6a58:/tilelib.go diff --git a/tilelib.go b/tilelib.go index 22e2d700e2..df6cb73b3b 100644 --- a/tilelib.go +++ b/tilelib.go @@ -1,3 +1,7 @@ +// Copyright (C) The Lightning Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package lightning import ( @@ -57,6 +61,7 @@ type tileLibrary struct { retainNoCalls bool skipOOO bool retainTileSequences bool + useDups bool taglib *tagLibrary variant [][][blake2b.Size256]byte @@ -67,6 +72,12 @@ type tileLibrary struct { variants int64 // if non-nil, write out any tile variants added while tiling encoder *gob.Encoder + // set Ref flag when writing new variants to encoder + encodeRef bool + + onAddTileVariant func(libref tileLibRef, hash [blake2b.Size256]byte, seq []byte) error + onAddGenome func(CompactGenome) error + onAddRefseq func(CompactSequence) error mtx sync.RWMutex vlock []sync.Locker @@ -112,12 +123,12 @@ func (tilelib *tileLibrary) loadTileVariants(tvs []TileVariant, variantmap map[t for _, tv := range tvs { // Assign a new variant ID (unique across all inputs) // for each input variant. - variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).Variant + variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence, tv.Ref).Variant } return nil } -func (tilelib *tileLibrary) loadCompactGenomes(cgs []CompactGenome, variantmap map[tileLibRef]tileVariantID, onLoadGenome func(CompactGenome)) error { +func (tilelib *tileLibrary) loadCompactGenomes(cgs []CompactGenome, variantmap map[tileLibRef]tileVariantID) error { log.Debugf("loadCompactGenomes: %d", len(cgs)) var wg sync.WaitGroup errs := make(chan error, 1) @@ -146,8 +157,15 @@ func (tilelib *tileLibrary) loadCompactGenomes(cgs []CompactGenome, variantmap m // log.Tracef("loadCompactGenomes: cg %s tag %d variant %d => %d", cg.Name, tag, variant, newvariant) cg.Variants[i] = newvariant } - if onLoadGenome != nil { - onLoadGenome(cg) + if tilelib.onAddGenome != nil { + err := tilelib.onAddGenome(cg) + if err != nil { + select { + case errs <- err: + default: + } + return + } } if tilelib.encoder != nil { err := tilelib.encoder.Encode(LibraryEntry{ @@ -199,6 +217,12 @@ func (tilelib *tileLibrary) loadCompactSequences(cseqs []CompactSequence, varian return err } } + if tilelib.onAddRefseq != nil { + err := tilelib.onAddRefseq(cseq) + if err != nil { + return err + } + } log.Infof("loadCompactSequences: checking %s done", cseq.Name) } tilelib.mtx.Lock() @@ -213,36 +237,39 @@ func (tilelib *tileLibrary) loadCompactSequences(cseqs []CompactSequence, varian return nil } -func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string, onLoadGenome func(CompactGenome)) error { +func allFiles(path string, re *regexp.Regexp) ([]string, error) { var files []string - var walk func(string) error - walk = func(path string) error { - f, err := open(path) - if err != nil { - return err - } - defer f.Close() - fis, err := f.Readdir(-1) - if err != nil { - files = append(files, path) - return nil - } - for _, fi := range fis { - if fi.Name() == "." || fi.Name() == ".." { - continue - } else if child := path + "/" + fi.Name(); fi.IsDir() { - err = walk(child) - if err != nil { - return err - } - } else if strings.HasSuffix(child, ".gob") || strings.HasSuffix(child, ".gob.gz") { - files = append(files, child) + f, err := open(path) + if err != nil { + return nil, err + } + defer f.Close() + fis, err := f.Readdir(-1) + if err != nil { + return []string{path}, nil + } + for _, fi := range fis { + if fi.Name() == "." || fi.Name() == ".." { + continue + } else if child := path + "/" + fi.Name(); fi.IsDir() { + add, err := allFiles(child, re) + if err != nil { + return nil, err } + files = append(files, add...) + } else if re == nil || re.MatchString(child) { + files = append(files, child) } - return nil } + sort.Strings(files) + return files, nil +} + +var matchGobFile = regexp.MustCompile(`\.gob(\.gz)?$`) + +func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string) error { log.Infof("LoadDir: walk dir %s", path) - err := walk(path) + files, err := allFiles(path, matchGobFile) if err != nil { return err } @@ -286,7 +313,7 @@ func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string, onLoadGeno mtx.Unlock() } for _, tv := range ent.TileVariants { - variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).Variant + variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence, tv.Ref).Variant } cgs = append(cgs, ent.CompactGenomes...) cseqs = append(cseqs, ent.CompactSequences...) @@ -314,7 +341,7 @@ func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string, onLoadGeno for _, cgs := range allcgs { flatcgs = append(flatcgs, cgs...) } - err = tilelib.loadCompactGenomes(flatcgs, allvariantmap, onLoadGenome) + err = tilelib.loadCompactGenomes(flatcgs, allvariantmap) if err != nil { return err } @@ -334,7 +361,8 @@ func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string, onLoadGeno } func (tilelib *tileLibrary) WriteDir(dir string) error { - nfiles := 128 + len(tilelib.refseqs) + ntilefiles := 128 + nfiles := ntilefiles + len(tilelib.refseqs) files := make([]*os.File, nfiles) for i := range files { f, err := os.OpenFile(fmt.Sprintf("%s/library.%04d.gob.gz", dir, i), os.O_CREATE|os.O_WRONLY, 0666) @@ -382,7 +410,7 @@ func (tilelib *tileLibrary) WriteDir(dir string) error { errs <- err return } - if refidx := start - (nfiles - len(tilelib.refseqs)); refidx >= 0 { + if refidx := start - ntilefiles; refidx >= 0 { // write each ref to its own file // (they seem to load very slowly) name := refnames[refidx] @@ -392,7 +420,7 @@ func (tilelib *tileLibrary) WriteDir(dir string) error { }}}) return } - for i := start; i < len(cgnames); i += nfiles { + for i := start; i < len(cgnames); i += ntilefiles { err := encoders[start].Encode(LibraryEntry{CompactGenomes: []CompactGenome{{ Name: cgnames[i], Variants: tilelib.compactGenomes[cgnames[i]], @@ -403,7 +431,7 @@ func (tilelib *tileLibrary) WriteDir(dir string) error { } } tvs := []TileVariant{} - for tag := start; tag < len(tilelib.variant) && ctx.Err() == nil; tag += nfiles { + for tag := start; tag < len(tilelib.variant) && ctx.Err() == nil; tag += ntilefiles { tvs = tvs[:0] for idx, hash := range tilelib.variant[tag] { tvs = append(tvs, TileVariant{ @@ -450,9 +478,7 @@ func (tilelib *tileLibrary) WriteDir(dir string) error { // Load library data from rdr. Tile variants might be renumbered in // the process; in that case, genomes variants will be renumbered to // match. -// -// If onLoadGenome is non-nil, call it on each CompactGenome entry. -func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, gz bool, onLoadGenome func(CompactGenome)) error { +func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, gz bool) error { cgs := []CompactGenome{} cseqs := []CompactSequence{} variantmap := map[tileLibRef]tileVariantID{} @@ -476,7 +502,7 @@ func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, gz bool, if ctx.Err() != nil { return ctx.Err() } - err = tilelib.loadCompactGenomes(cgs, variantmap, onLoadGenome) + err = tilelib.loadCompactGenomes(cgs, variantmap) if err != nil { return err } @@ -518,15 +544,16 @@ func (tilelib *tileLibrary) dump(out io.Writer) { } type importStats struct { - InputFile string - InputLabel string - InputLength int - InputCoverage int - PathLength int - DroppedOutOfOrderTiles int + InputFile string + InputLabel string + InputLength int + InputCoverage int + PathLength int + DroppedRepeatedTags int + DroppedOutOfOrderTags int } -func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChromosome *regexp.Regexp) (tileSeq, []importStats, error) { +func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChromosome *regexp.Regexp, isRef bool) (tileSeq, []importStats, error) { ret := tileSeq{} type jobT struct { label string @@ -534,6 +561,7 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChro } todo := make(chan jobT, 1) scanner := bufio.NewScanner(rdr) + scanner.Buffer(make([]byte, 256), 1<<29) // 512 MiB, in case fasta does not have line breaks go func() { defer close(todo) var fasta []byte @@ -579,14 +607,33 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChro log.Warnf("%s %s no tags found", filelabel, job.label) } - skipped := 0 + droppedDup := 0 + if !tilelib.useDups { + // Remove any tags that appeared more than once + dup := map[tagID]bool{} + for _, ft := range found { + _, dup[ft.tagid] = dup[ft.tagid] + } + dst := 0 + for _, ft := range found { + if !dup[ft.tagid] { + found[dst] = ft + dst++ + } + } + droppedDup = len(found) - dst + log.Infof("%s %s dropping %d non-unique tags", filelabel, job.label, droppedDup) + found = found[:dst] + } + + droppedOOO := 0 if tilelib.skipOOO { - log.Infof("%s %s keeping longest increasing subsequence", filelabel, job.label) keep := longestIncreasingSubsequence(len(found), func(i int) int { return int(found[i].tagid) }) for i, x := range keep { found[i] = found[x] } - skipped = len(found) - len(keep) + droppedOOO = len(found) - len(keep) + log.Infof("%s %s dropping %d out-of-order tags", filelabel, job.label, droppedOOO) found = found[:len(keep)] } @@ -610,7 +657,7 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChro } else { endpos = found[i+1].pos + taglen } - path[i] = tilelib.getRef(f.tagid, job.fasta[startpos:endpos]) + path[i] = tilelib.getRef(f.tagid, job.fasta[startpos:endpos], isRef) if countBases(job.fasta[startpos:endpos]) != endpos-startpos { atomic.AddInt64(&lowquality, 1) } @@ -625,14 +672,15 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChro ret[job.label] = pathcopy basesIn := countBases(job.fasta) - log.Infof("%s %s fasta in %d coverage in %d path len %d low-quality %d skipped-out-of-order %d", filelabel, job.label, len(job.fasta), basesIn, len(path), lowquality, skipped) + log.Infof("%s %s fasta in %d coverage in %d path len %d low-quality %d", filelabel, job.label, len(job.fasta), basesIn, len(path), lowquality) stats = append(stats, importStats{ - InputFile: filelabel, - InputLabel: job.label, - InputLength: len(job.fasta), - InputCoverage: basesIn, - PathLength: len(path), - DroppedOutOfOrderTiles: skipped, + InputFile: filelabel, + InputLabel: job.label, + InputLength: len(job.fasta), + InputCoverage: basesIn, + PathLength: len(path), + DroppedOutOfOrderTags: droppedOOO, + DroppedRepeatedTags: droppedDup, }) totalPathLen += len(path) @@ -647,7 +695,7 @@ func (tilelib *tileLibrary) Len() int64 { // Return a tileLibRef for a tile with the given tag and sequence, // adding the sequence to the library if needed. -func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef { +func (tilelib *tileLibrary) getRef(tag tagID, seq []byte, usedByRef bool) tileLibRef { dropSeq := false if !tilelib.retainNoCalls { for _, b := range seq { @@ -759,21 +807,25 @@ func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef { locker.Unlock() } + saveSeq := seq + if dropSeq { + // Save the hash, but not the sequence + saveSeq = nil + } if tilelib.encoder != nil { - saveSeq := seq - if dropSeq { - // Save the hash, but not the sequence - saveSeq = nil - } tilelib.encoder.Encode(LibraryEntry{ TileVariants: []TileVariant{{ Tag: tag, + Ref: usedByRef, Variant: variant, Blake2b: seqhash, Sequence: saveSeq, }}, }) } + if tilelib.onAddTileVariant != nil { + tilelib.onAddTileVariant(tileLibRef{tag, variant}, seqhash, saveSeq) + } return tileLibRef{Tag: tag, Variant: variant} }