X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/bcdc8e84b9b072d3ee0d509fc803eb0e9174bcfe..de10f554682eecf3926b409c70f9685901d11272:/tilelib.go diff --git a/tilelib.go b/tilelib.go index 522159dede..926c0324a2 100644 --- a/tilelib.go +++ b/tilelib.go @@ -253,7 +253,6 @@ type importStats struct { InputLabel string InputLength int InputCoverage int - TileCoverage int PathLength int DroppedOutOfOrderTiles int } @@ -283,15 +282,15 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChro todo <- jobT{seqlabel, fasta} }() type foundtag struct { - pos int - tagid tagID - taglen int + pos int + tagid tagID } found := make([]foundtag, 2000000) path := make([]tileLibRef, 2000000) totalFoundTags := 0 totalPathLen := 0 skippedSequences := 0 + taglen := tilelib.taglib.TagLen() var stats []importStats for job := range todo { if len(job.fasta) == 0 { @@ -304,15 +303,16 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChro found = found[:0] tilelib.taglib.FindAll(job.fasta, func(tagid tagID, pos, taglen int) { - found = append(found, foundtag{pos: pos, tagid: tagid, taglen: taglen}) + found = append(found, foundtag{pos: pos, tagid: tagid}) }) totalFoundTags += len(found) + if len(found) == 0 { + log.Warnf("%s %s no tags found", filelabel, job.label) + } - basesOut := 0 skipped := 0 - path = path[:0] - last := foundtag{tagid: -1} if tilelib.skipOOO { + log.Infof("%s %s keeping longest increasing subsequence", filelabel, job.label) keep := longestIncreasingSubsequence(len(found), func(i int) int { return int(found[i].tagid) }) for i, x := range keep { found[i] = found[x] @@ -320,56 +320,44 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChro skipped = len(found) - len(keep) found = found[:len(keep)] } + + log.Infof("%s %s getting %d librefs", filelabel, job.label, len(found)) + throttle := &throttle{Max: runtime.NumCPU()} + path = path[:len(found)] for i, f := range found { - log.Tracef("%s %s found[%d] == %#v", filelabel, job.label, i, f) - if last.tagid < 0 { - // first tag in sequence - last = foundtag{tagid: f.tagid} - continue - } - libref := tilelib.getRef(last.tagid, job.fasta[last.pos:f.pos+f.taglen]) - path = append(path, libref) - if libref.Variant > 0 { - // Count output coverage from - // the end of the previous tag - // (if any) to the end of the - // current tag, IOW don't - // double-count coverage for - // the previous tag. - basesOut += countBases(job.fasta[last.pos+last.taglen : f.pos+f.taglen]) - } else { - // If we dropped this tile - // (because !retainNoCalls), - // set taglen=0 so the - // overlapping tag is counted - // toward coverage on the - // following tile. - f.taglen = 0 - } - last = f - } - if last.tagid < 0 { - log.Warnf("%s %s no tags found", filelabel, job.label) - } else { - libref := tilelib.getRef(last.tagid, job.fasta[last.pos:]) - path = append(path, libref) - if libref.Variant > 0 { - basesOut += countBases(job.fasta[last.pos+last.taglen:]) - } + i, f := i, f + throttle.Acquire() + go func() { + defer throttle.Release() + var startpos, endpos int + if i == 0 { + startpos = 0 + } else { + startpos = f.pos + } + if i == len(found)-1 { + endpos = len(job.fasta) + } else { + endpos = found[i+1].pos + taglen + } + path[i] = tilelib.getRef(f.tagid, job.fasta[startpos:endpos]) + }() } + throttle.Wait() + + log.Infof("%s %s copying path", filelabel, job.label) pathcopy := make([]tileLibRef, len(path)) copy(pathcopy, path) ret[job.label] = pathcopy basesIn := countBases(job.fasta) - log.Infof("%s %s fasta in %d coverage in %d coverage out %d path len %d skipped %d", filelabel, job.label, len(job.fasta), basesIn, basesOut, len(path), skipped) + log.Infof("%s %s fasta in %d coverage in %d path len %d skipped-out-of-order %d", filelabel, job.label, len(job.fasta), basesIn, len(path), skipped) stats = append(stats, importStats{ InputFile: filelabel, InputLabel: job.label, InputLength: len(job.fasta), InputCoverage: basesIn, - TileCoverage: basesOut, PathLength: len(path), DroppedOutOfOrderTiles: skipped, }) @@ -397,36 +385,42 @@ func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef { } } seqhash := blake2b.Sum256(seq) + var vlock sync.Locker + tilelib.mtx.RLock() - if int(tag) < len(tilelib.variant) { + if len(tilelib.vlock) > int(tag) { + vlock = tilelib.vlock[tag] + } + tilelib.mtx.RUnlock() + + if vlock != nil { + vlock.Lock() for i, varhash := range tilelib.variant[tag] { if varhash == seqhash { - tilelib.mtx.RUnlock() + vlock.Unlock() return tileLibRef{Tag: tag, Variant: tileVariantID(i + 1)} } } - } - var vlock sync.Locker - if len(tilelib.vlock) > int(tag) { - vlock = tilelib.vlock[tag] - } - tilelib.mtx.RUnlock() - if vlock == nil { + vlock.Unlock() + } else { tilelib.mtx.Lock() if tilelib.variant == nil && tilelib.taglib != nil { tilelib.variant = make([][][blake2b.Size256]byte, tilelib.taglib.Len()) tilelib.vlock = make([]sync.Locker, tilelib.taglib.Len()) for i := range tilelib.vlock { - tilelib.vlock[i] = &sync.Mutex{} + tilelib.vlock[i] = new(sync.Mutex) } } if int(tag) >= len(tilelib.variant) { - oldlen := len(tilelib.variant) + oldlen := len(tilelib.vlock) + for i := 0; i < oldlen; i++ { + tilelib.vlock[i].Lock() + } // If we haven't seen the tag library yet (as // in a merge), tilelib.taglib.Len() is // zero. We can still behave correctly, we - // just need to expand the tilelib.variant - // slice as needed. + // just need to expand the tilelib.variant and + // tilelib.vlock slices as needed. if int(tag) >= cap(tilelib.variant) { // Allocate 2x capacity. newslice := make([][][blake2b.Size256]byte, int(tag)+1, (int(tag)+1)*2) @@ -441,34 +435,37 @@ func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef { tilelib.variant = tilelib.variant[:int(tag)+1] tilelib.vlock = tilelib.vlock[:int(tag)+1] } - for i := oldlen; i < len(tilelib.variant); i++ { - tilelib.vlock[i] = &sync.Mutex{} + for i := oldlen; i < len(tilelib.vlock); i++ { + tilelib.vlock[i] = new(sync.Mutex) + } + for i := 0; i < oldlen; i++ { + tilelib.vlock[i].Unlock() } } vlock = tilelib.vlock[tag] tilelib.mtx.Unlock() } - tilelib.mtx.RLock() vlock.Lock() for i, varhash := range tilelib.variant[tag] { if varhash == seqhash { vlock.Unlock() - tilelib.mtx.RUnlock() return tileLibRef{Tag: tag, Variant: tileVariantID(i + 1)} } } atomic.AddInt64(&tilelib.variants, 1) tilelib.variant[tag] = append(tilelib.variant[tag], seqhash) + variant := tileVariantID(len(tilelib.variant[tag])) + vlock.Unlock() + if tilelib.retainTileSequences && !dropSeq { + tilelib.mtx.Lock() if tilelib.seq == nil { tilelib.seq = map[[blake2b.Size256]byte][]byte{} } tilelib.seq[seqhash] = append([]byte(nil), seq...) + tilelib.mtx.Unlock() } - variant := tileVariantID(len(tilelib.variant[tag])) - vlock.Unlock() - tilelib.mtx.RUnlock() if tilelib.encoder != nil { saveSeq := seq