X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/69b71af4136fdaeeb5c2afbc559208dc5f428c48..00b2acd54dd1aa412f6f2bddc24b1bbb31c7ae3f:/tilelib.go diff --git a/tilelib.go b/tilelib.go index 5ad12e7228..455a526215 100644 --- a/tilelib.go +++ b/tilelib.go @@ -1,4 +1,4 @@ -package main +package lightning import ( "bufio" @@ -7,10 +7,12 @@ import ( "encoding/gob" "fmt" "io" + "regexp" "runtime" "sort" "strings" "sync" + "sync/atomic" log "github.com/sirupsen/logrus" "golang.org/x/crypto/blake2b" @@ -60,11 +62,12 @@ type tileLibrary struct { compactGenomes map[string][]tileVariantID // count [][]int seq map[[blake2b.Size256]byte][]byte - variants int + variants int64 // if non-nil, write out any tile variants added while tiling encoder *gob.Encoder - mtx sync.Mutex + mtx sync.RWMutex + vlock []sync.Locker } func (tilelib *tileLibrary) loadTagSet(newtagset [][]byte) error { @@ -250,18 +253,17 @@ type importStats struct { InputLabel string InputLength int InputCoverage int - TileCoverage int PathLength int DroppedOutOfOrderTiles int } -func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, []importStats, error) { +func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChromosome *regexp.Regexp) (tileSeq, []importStats, error) { ret := tileSeq{} type jobT struct { label string fasta []byte } - todo := make(chan jobT) + todo := make(chan jobT, 1) scanner := bufio.NewScanner(rdr) go func() { defer close(todo) @@ -270,8 +272,8 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, for scanner.Scan() { buf := scanner.Bytes() if len(buf) > 0 && buf[0] == '>' { - todo <- jobT{seqlabel, fasta} - seqlabel, fasta = strings.SplitN(string(buf[1:]), " ", 2)[0], nil + todo <- jobT{seqlabel, append([]byte(nil), fasta...)} + seqlabel, fasta = strings.SplitN(string(buf[1:]), " ", 2)[0], fasta[:0] log.Debugf("%s %s reading fasta", filelabel, seqlabel) } else { fasta = append(fasta, bytes.ToLower(buf)...) @@ -280,20 +282,20 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, todo <- jobT{seqlabel, fasta} }() type foundtag struct { - pos int - tagid tagID - taglen int + pos int + tagid tagID } found := make([]foundtag, 2000000) path := make([]tileLibRef, 2000000) totalFoundTags := 0 totalPathLen := 0 skippedSequences := 0 - stats := make([]importStats, 0, len(todo)) + taglen := tilelib.taglib.TagLen() + var stats []importStats for job := range todo { if len(job.fasta) == 0 { continue - } else if strings.Contains(job.label, "_") { + } else if !matchChromosome.MatchString(job.label) { skippedSequences++ continue } @@ -301,15 +303,16 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, found = found[:0] tilelib.taglib.FindAll(job.fasta, func(tagid tagID, pos, taglen int) { - found = append(found, foundtag{pos: pos, tagid: tagid, taglen: taglen}) + found = append(found, foundtag{pos: pos, tagid: tagid}) }) totalFoundTags += len(found) + if len(found) == 0 { + log.Warnf("%s %s no tags found", filelabel, job.label) + } - basesOut := 0 skipped := 0 - path = path[:0] - last := foundtag{tagid: -1} if tilelib.skipOOO { + log.Infof("%s %s keeping longest increasing subsequence", filelabel, job.label) keep := longestIncreasingSubsequence(len(found), func(i int) int { return int(found[i].tagid) }) for i, x := range keep { found[i] = found[x] @@ -317,70 +320,60 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, skipped = len(found) - len(keep) found = found[:len(keep)] } + + log.Infof("%s %s getting %d librefs", filelabel, job.label, len(found)) + throttle := &throttle{Max: runtime.NumCPU()} + path = path[:len(found)] + var lowquality int64 for i, f := range found { - log.Tracef("%s %s found[%d] == %#v", filelabel, job.label, i, f) - if last.tagid < 0 { - // first tag in sequence - last = foundtag{tagid: f.tagid} - continue - } - libref := tilelib.getRef(last.tagid, job.fasta[last.pos:f.pos+f.taglen]) - path = append(path, libref) - if libref.Variant > 0 { - // Count output coverage from - // the end of the previous tag - // (if any) to the end of the - // current tag, IOW don't - // double-count coverage for - // the previous tag. - basesOut += countBases(job.fasta[last.pos+last.taglen : f.pos+f.taglen]) - } else { - // If we dropped this tile - // (because !retainNoCalls), - // set taglen=0 so the - // overlapping tag is counted - // toward coverage on the - // following tile. - f.taglen = 0 - } - last = f - } - if last.tagid < 0 { - log.Warnf("%s %s no tags found", filelabel, job.label) - } else { - libref := tilelib.getRef(last.tagid, job.fasta[last.pos:]) - path = append(path, libref) - if libref.Variant > 0 { - basesOut += countBases(job.fasta[last.pos+last.taglen:]) - } + i, f := i, f + throttle.Acquire() + go func() { + defer throttle.Release() + var startpos, endpos int + if i == 0 { + startpos = 0 + } else { + startpos = f.pos + } + if i == len(found)-1 { + endpos = len(job.fasta) + } else { + endpos = found[i+1].pos + taglen + } + path[i] = tilelib.getRef(f.tagid, job.fasta[startpos:endpos]) + if countBases(job.fasta[startpos:endpos]) != endpos-startpos { + atomic.AddInt64(&lowquality, 1) + } + }() } + throttle.Wait() + + log.Infof("%s %s copying path", filelabel, job.label) pathcopy := make([]tileLibRef, len(path)) copy(pathcopy, path) ret[job.label] = pathcopy basesIn := countBases(job.fasta) - log.Infof("%s %s fasta in %d coverage in %d coverage out %d path len %d skipped %d", filelabel, job.label, len(job.fasta), basesIn, basesOut, len(path), skipped) + log.Infof("%s %s fasta in %d coverage in %d path len %d low-quality %d skipped-out-of-order %d", filelabel, job.label, len(job.fasta), basesIn, len(path), lowquality, skipped) stats = append(stats, importStats{ InputFile: filelabel, InputLabel: job.label, InputLength: len(job.fasta), InputCoverage: basesIn, - TileCoverage: basesOut, PathLength: len(path), DroppedOutOfOrderTiles: skipped, }) totalPathLen += len(path) } - log.Printf("%s tiled with total path len %d in %d sequences (skipped %d sequences with '_' in name, skipped %d out-of-order tags)", filelabel, totalPathLen, len(ret), skippedSequences, totalFoundTags-totalPathLen) + log.Printf("%s tiled with total path len %d in %d sequences (skipped %d sequences that did not match chromosome regexp, skipped %d out-of-order tags)", filelabel, totalPathLen, len(ret), skippedSequences, totalFoundTags-totalPathLen) return ret, stats, scanner.Err() } -func (tilelib *tileLibrary) Len() int { - tilelib.mtx.Lock() - defer tilelib.mtx.Unlock() - return tilelib.variants +func (tilelib *tileLibrary) Len() int64 { + return atomic.LoadInt64(&tilelib.variants) } // Return a tileLibRef for a tile with the given tag and sequence, @@ -395,43 +388,88 @@ func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef { } } } - tilelib.mtx.Lock() - if tilelib.variant == nil && tilelib.taglib != nil { - tilelib.variant = make([][][blake2b.Size256]byte, tilelib.taglib.Len()) + seqhash := blake2b.Sum256(seq) + var vlock sync.Locker + + tilelib.mtx.RLock() + if len(tilelib.vlock) > int(tag) { + vlock = tilelib.vlock[tag] } - if int(tag) >= len(tilelib.variant) { - // If we haven't seen the tag library yet (as in a - // merge), tilelib.taglib.Len() is zero. We can still - // behave correctly, we just need to expand the - // tilelib.variant slice as needed. - if int(tag) >= cap(tilelib.variant) { - // Allocate 2x capacity. - newslice := make([][][blake2b.Size256]byte, int(tag)+1, (int(tag)+1)*2) - copy(newslice, tilelib.variant) - tilelib.variant = newslice[:int(tag)+1] - } else { - // Use previously allocated capacity, avoiding - // copy. - tilelib.variant = tilelib.variant[:int(tag)+1] + tilelib.mtx.RUnlock() + + if vlock != nil { + vlock.Lock() + for i, varhash := range tilelib.variant[tag] { + if varhash == seqhash { + vlock.Unlock() + return tileLibRef{Tag: tag, Variant: tileVariantID(i + 1)} + } + } + vlock.Unlock() + } else { + tilelib.mtx.Lock() + if tilelib.variant == nil && tilelib.taglib != nil { + tilelib.variant = make([][][blake2b.Size256]byte, tilelib.taglib.Len()) + tilelib.vlock = make([]sync.Locker, tilelib.taglib.Len()) + for i := range tilelib.vlock { + tilelib.vlock[i] = new(sync.Mutex) + } + } + if int(tag) >= len(tilelib.variant) { + oldlen := len(tilelib.vlock) + for i := 0; i < oldlen; i++ { + tilelib.vlock[i].Lock() + } + // If we haven't seen the tag library yet (as + // in a merge), tilelib.taglib.Len() is + // zero. We can still behave correctly, we + // just need to expand the tilelib.variant and + // tilelib.vlock slices as needed. + if int(tag) >= cap(tilelib.variant) { + // Allocate 2x capacity. + newslice := make([][][blake2b.Size256]byte, int(tag)+1, (int(tag)+1)*2) + copy(newslice, tilelib.variant) + tilelib.variant = newslice[:int(tag)+1] + newvlock := make([]sync.Locker, int(tag)+1, (int(tag)+1)*2) + copy(newvlock, tilelib.vlock) + tilelib.vlock = newvlock[:int(tag)+1] + } else { + // Use previously allocated capacity, + // avoiding copy. + tilelib.variant = tilelib.variant[:int(tag)+1] + tilelib.vlock = tilelib.vlock[:int(tag)+1] + } + for i := oldlen; i < len(tilelib.vlock); i++ { + tilelib.vlock[i] = new(sync.Mutex) + } + for i := 0; i < oldlen; i++ { + tilelib.vlock[i].Unlock() + } } + vlock = tilelib.vlock[tag] + tilelib.mtx.Unlock() } - seqhash := blake2b.Sum256(seq) + + vlock.Lock() for i, varhash := range tilelib.variant[tag] { if varhash == seqhash { - tilelib.mtx.Unlock() + vlock.Unlock() return tileLibRef{Tag: tag, Variant: tileVariantID(i + 1)} } } - tilelib.variants++ + atomic.AddInt64(&tilelib.variants, 1) tilelib.variant[tag] = append(tilelib.variant[tag], seqhash) + variant := tileVariantID(len(tilelib.variant[tag])) + vlock.Unlock() + if tilelib.retainTileSequences && !dropSeq { + tilelib.mtx.Lock() if tilelib.seq == nil { tilelib.seq = map[[blake2b.Size256]byte][]byte{} } tilelib.seq[seqhash] = append([]byte(nil), seq...) + tilelib.mtx.Unlock() } - variant := tileVariantID(len(tilelib.variant[tag])) - tilelib.mtx.Unlock() if tilelib.encoder != nil { saveSeq := seq @@ -475,7 +513,7 @@ func (tilelib *tileLibrary) Tidy() { throttle := throttle{Max: runtime.NumCPU() + 1} for tag, oldvariants := range tilelib.variant { tag, oldvariants := tagID(tag), oldvariants - if tag%10000 == 0 { + if tag%1000000 == 0 { log.Printf("Tidy: tag %d", tag) } throttle.Acquire() @@ -506,7 +544,7 @@ func (tilelib *tileLibrary) Tidy() { } }) - // Replace tilelib.variants[tag] with a new + // Replace tilelib.variant[tag] with a new // re-ordered slice of hashes, and make a // mapping from old to new variant IDs. remaptag := make([]tileVariantID, len(oldvariants)+1)