X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/395c842444856a96a4216353a201ff5f42e3ef64..76fbc75a359348e2a91546a70b8d2c738865cce2:/tilelib.go?ds=sidebyside diff --git a/tilelib.go b/tilelib.go index 75e952696a..e9251f038d 100644 --- a/tilelib.go +++ b/tilelib.go @@ -7,6 +7,9 @@ import ( "encoding/gob" "fmt" "io" + "regexp" + "runtime" + "sort" "strings" "sync" @@ -48,13 +51,16 @@ func (tseq tileSeq) Variants() ([]tileVariantID, int, int) { } type tileLibrary struct { - includeNoCalls bool - skipOOO bool + retainNoCalls bool + skipOOO bool + retainTileSequences bool + taglib *tagLibrary variant [][][blake2b.Size256]byte refseqs map[string]map[string][]tileLibRef + compactGenomes map[string][]tileVariantID // count [][]int - // seq map[[blake2b.Size]byte][]byte + seq map[[blake2b.Size256]byte][]byte variants int // if non-nil, write out any tile variants added while tiling encoder *gob.Encoder @@ -151,6 +157,11 @@ func (tilelib *tileLibrary) loadCompactGenomes(cgs []CompactGenome, variantmap m return } } + if tilelib.compactGenomes != nil { + tilelib.mtx.Lock() + defer tilelib.mtx.Unlock() + tilelib.compactGenomes[cg.Name] = cg.Variants + } }() } wg.Wait() @@ -200,11 +211,11 @@ func (tilelib *tileLibrary) loadCompactSequences(cseqs []CompactSequence, varian // match. // // If onLoadGenome is non-nil, call it on each CompactGenome entry. -func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, onLoadGenome func(CompactGenome)) error { +func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, gz bool, onLoadGenome func(CompactGenome)) error { cgs := []CompactGenome{} cseqs := []CompactSequence{} variantmap := map[tileLibRef]tileVariantID{} - err := DecodeLibrary(rdr, func(ent *LibraryEntry) error { + err := DecodeLibrary(rdr, gz, func(ent *LibraryEntry) error { if ctx.Err() != nil { return ctx.Err() } @@ -245,7 +256,7 @@ type importStats struct { DroppedOutOfOrderTiles int } -func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, []importStats, error) { +func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChromosome *regexp.Regexp) (tileSeq, []importStats, error) { ret := tileSeq{} type jobT struct { label string @@ -283,7 +294,7 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, for job := range todo { if len(job.fasta) == 0 { continue - } else if strings.Contains(job.label, "_") { + } else if !matchChromosome.MatchString(job.label) { skippedSequences++ continue } @@ -326,7 +337,7 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, basesOut += countBases(job.fasta[last.pos+last.taglen : f.pos+f.taglen]) } else { // If we dropped this tile - // (because !includeNoCalls), + // (because !retainNoCalls), // set taglen=0 so the // overlapping tag is counted // toward coverage on the @@ -348,10 +359,9 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, pathcopy := make([]tileLibRef, len(path)) copy(pathcopy, path) ret[job.label] = pathcopy - log.Debugf("%s %s tiled with path len %d, skipped %d", filelabel, job.label, len(path), skipped) basesIn := countBases(job.fasta) - log.Infof("%s %s fasta in %d coverage in %d coverage out %d", filelabel, job.label, len(job.fasta), basesIn, basesOut) + log.Infof("%s %s fasta in %d coverage in %d coverage out %d path len %d skipped %d", filelabel, job.label, len(job.fasta), basesIn, basesOut, len(path), skipped) stats = append(stats, importStats{ InputFile: filelabel, InputLabel: job.label, @@ -364,7 +374,7 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, totalPathLen += len(path) } - log.Printf("%s tiled with total path len %d in %d sequences (skipped %d sequences with '_' in name, skipped %d out-of-order tags)", filelabel, totalPathLen, len(ret), skippedSequences, totalFoundTags-totalPathLen) + log.Printf("%s tiled with total path len %d in %d sequences (skipped %d sequences that did not match chromosome regexp, skipped %d out-of-order tags)", filelabel, totalPathLen, len(ret), skippedSequences, totalFoundTags-totalPathLen) return ret, stats, scanner.Err() } @@ -377,19 +387,16 @@ func (tilelib *tileLibrary) Len() int { // Return a tileLibRef for a tile with the given tag and sequence, // adding the sequence to the library if needed. func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef { - if !tilelib.includeNoCalls { + dropSeq := false + if !tilelib.retainNoCalls { for _, b := range seq { if b != 'a' && b != 'c' && b != 'g' && b != 't' { - // return "tile not found" if seq has any - // no-calls - return tileLibRef{Tag: tag} + dropSeq = true + break } } } tilelib.mtx.Lock() - // if tilelib.seq == nil { - // tilelib.seq = map[[blake2b.Size]byte][]byte{} - // } if tilelib.variant == nil && tilelib.taglib != nil { tilelib.variant = make([][][blake2b.Size256]byte, tilelib.taglib.Len()) } @@ -418,23 +425,123 @@ func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef { } tilelib.variants++ tilelib.variant[tag] = append(tilelib.variant[tag], seqhash) - // tilelib.seq[seqhash] = append([]byte(nil), seq...) + if tilelib.retainTileSequences && !dropSeq { + if tilelib.seq == nil { + tilelib.seq = map[[blake2b.Size256]byte][]byte{} + } + tilelib.seq[seqhash] = append([]byte(nil), seq...) + } variant := tileVariantID(len(tilelib.variant[tag])) tilelib.mtx.Unlock() if tilelib.encoder != nil { + saveSeq := seq + if dropSeq { + // Save the hash, but not the sequence + saveSeq = nil + } tilelib.encoder.Encode(LibraryEntry{ TileVariants: []TileVariant{{ Tag: tag, Variant: variant, Blake2b: seqhash, - Sequence: seq, + Sequence: saveSeq, }}, }) } return tileLibRef{Tag: tag, Variant: variant} } +func (tilelib *tileLibrary) TileVariantSequence(libref tileLibRef) []byte { + if libref.Variant == 0 || len(tilelib.variant) <= int(libref.Tag) || len(tilelib.variant[libref.Tag]) < int(libref.Variant) { + return nil + } + return tilelib.seq[tilelib.variant[libref.Tag][libref.Variant-1]] +} + +// Tidy deletes unreferenced tile variants and renumbers variants so +// more common variants have smaller IDs. +func (tilelib *tileLibrary) Tidy() { + log.Print("Tidy: compute inref") + inref := map[tileLibRef]bool{} + for _, refseq := range tilelib.refseqs { + for _, librefs := range refseq { + for _, libref := range librefs { + inref[libref] = true + } + } + } + log.Print("Tidy: compute remap") + remap := make([][]tileVariantID, len(tilelib.variant)) + throttle := throttle{Max: runtime.NumCPU() + 1} + for tag, oldvariants := range tilelib.variant { + tag, oldvariants := tagID(tag), oldvariants + if tag%1000000 == 0 { + log.Printf("Tidy: tag %d", tag) + } + throttle.Acquire() + go func() { + defer throttle.Release() + uses := make([]int, len(oldvariants)) + for _, cg := range tilelib.compactGenomes { + for phase := 0; phase < 2; phase++ { + cgi := int(tag)*2 + phase + if cgi < len(cg) && cg[cgi] > 0 { + uses[cg[cgi]-1]++ + } + } + } + + // Compute desired order of variants: + // neworder[x] == index in oldvariants that + // should move to position x. + neworder := make([]int, len(oldvariants)) + for i := range neworder { + neworder[i] = i + } + sort.Slice(neworder, func(i, j int) bool { + if cmp := uses[neworder[i]] - uses[neworder[j]]; cmp != 0 { + return cmp > 0 + } else { + return bytes.Compare(oldvariants[neworder[i]][:], oldvariants[neworder[j]][:]) < 0 + } + }) + + // Replace tilelib.variants[tag] with a new + // re-ordered slice of hashes, and make a + // mapping from old to new variant IDs. + remaptag := make([]tileVariantID, len(oldvariants)+1) + newvariants := make([][blake2b.Size256]byte, 0, len(neworder)) + for _, oldi := range neworder { + if uses[oldi] > 0 || inref[tileLibRef{Tag: tag, Variant: tileVariantID(oldi + 1)}] { + newvariants = append(newvariants, oldvariants[oldi]) + remaptag[oldi+1] = tileVariantID(len(newvariants)) + } + } + tilelib.variant[tag] = newvariants + remap[tag] = remaptag + }() + } + throttle.Wait() + + // Apply remap to genomes and reference sequences, so they + // refer to the same tile variants using the changed IDs. + log.Print("Tidy: apply remap") + for _, cg := range tilelib.compactGenomes { + for idx, variant := range cg { + cg[idx] = remap[tagID(idx/2)][variant] + } + } + for _, refcs := range tilelib.refseqs { + for _, refseq := range refcs { + for i, tv := range refseq { + refseq[i].Variant = remap[tv.Tag][tv.Variant] + } + } + } + log.Print("Tidy: done") +} + func countBases(seq []byte) int { n := 0 for _, c := range seq {