X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/1a76177ea6911896e6e6196f5975aefc2988950a..3f9a08026e7756f2dd4d0950ebea27458f962177:/tilelib.go diff --git a/tilelib.go b/tilelib.go index b4ecd4a3af..b19a188e19 100644 --- a/tilelib.go +++ b/tilelib.go @@ -17,8 +17,8 @@ import ( type tileVariantID uint16 // 1-based type tileLibRef struct { - tag tagID - variant tileVariantID + Tag tagID + Variant tileVariantID } type tileSeq map[string][]tileLibRef @@ -27,8 +27,8 @@ func (tseq tileSeq) Variants() ([]tileVariantID, int, int) { maxtag := 0 for _, refs := range tseq { for _, ref := range refs { - if maxtag < int(ref.tag) { - maxtag = int(ref.tag) + if maxtag < int(ref.Tag) { + maxtag = int(ref.Tag) } } } @@ -36,29 +36,31 @@ func (tseq tileSeq) Variants() ([]tileVariantID, int, int) { var kept, dropped int for _, refs := range tseq { for _, ref := range refs { - if vars[int(ref.tag)] != 0 { + if vars[int(ref.Tag)] != 0 { dropped++ } else { kept++ } - vars[int(ref.tag)] = ref.variant + vars[int(ref.Tag)] = ref.Variant } } return vars, kept, dropped } type tileLibrary struct { - includeNoCalls bool - skipOOO bool + includeNoCalls bool + skipOOO bool + retainTileSequences bool + taglib *tagLibrary variant [][][blake2b.Size256]byte + refseqs map[string]map[string][]tileLibRef + compactGenomes map[string][]tileVariantID // count [][]int - // seq map[[blake2b.Size]byte][]byte + seq map[[blake2b.Size256]byte][]byte variants int // if non-nil, write out any tile variants added while tiling encoder *gob.Encoder - // if non-nil, call this func upon loading a genome - onLoadGenome func(CompactGenome) mtx sync.Mutex } @@ -103,20 +105,21 @@ func (tilelib *tileLibrary) loadTileVariants(tvs []TileVariant, variantmap map[t for _, tv := range tvs { // Assign a new variant ID (unique across all inputs) // for each input variant. - variantmap[tileLibRef{tag: tv.Tag, variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).variant + variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).Variant } return nil } -func (tilelib *tileLibrary) loadGenomes(genomes map[string][]tileVariantID, variantmap map[tileLibRef]tileVariantID, onLoadGenome func(CompactGenome)) error { +func (tilelib *tileLibrary) loadCompactGenomes(cgs []CompactGenome, variantmap map[tileLibRef]tileVariantID, onLoadGenome func(CompactGenome)) error { + log.Debugf("loadCompactGenomes: %d", len(cgs)) var wg sync.WaitGroup errs := make(chan error, 1) - for name, variants := range genomes { - name, variants := name, variants + for _, cg := range cgs { wg.Add(1) + cg := cg go func() { defer wg.Done() - for i, variant := range variants { + for i, variant := range cg.Variants { if len(errs) > 0 { return } @@ -124,38 +127,38 @@ func (tilelib *tileLibrary) loadGenomes(genomes map[string][]tileVariantID, vari continue } tag := tagID(i / 2) - newvariant, ok := variantmap[tileLibRef{tag: tag, variant: variant}] + newvariant, ok := variantmap[tileLibRef{Tag: tag, Variant: variant}] if !ok { - err := fmt.Errorf("oops: genome %q has variant %d for tag %d, but that variant was not in its library", name, variant, tag) + err := fmt.Errorf("oops: genome %q has variant %d for tag %d, but that variant was not in its library", cg.Name, variant, tag) select { case errs <- err: default: } return } - variants[i] = newvariant + log.Tracef("loadCompactGenomes: cg %s tag %d variant %d => %d", cg.Name, tag, variant, newvariant) + cg.Variants[i] = newvariant + } + if onLoadGenome != nil { + onLoadGenome(cg) } if tilelib.encoder != nil { - for name, variants := range genomes { - cg := CompactGenome{ - Name: name, - Variants: variants, - } - if onLoadGenome != nil { - onLoadGenome(cg) - } - err := tilelib.encoder.Encode(LibraryEntry{ - CompactGenomes: []CompactGenome{cg}, - }) - if err != nil { - select { - case errs <- err: - default: - } - return + err := tilelib.encoder.Encode(LibraryEntry{ + CompactGenomes: []CompactGenome{cg}, + }) + if err != nil { + select { + case errs <- err: + default: } + return } } + if tilelib.compactGenomes != nil { + tilelib.mtx.Lock() + defer tilelib.mtx.Unlock() + tilelib.compactGenomes[cg.Name] = cg.Variants + } }() } wg.Wait() @@ -163,8 +166,51 @@ func (tilelib *tileLibrary) loadGenomes(genomes map[string][]tileVariantID, vari return <-errs } +func (tilelib *tileLibrary) loadCompactSequences(cseqs []CompactSequence, variantmap map[tileLibRef]tileVariantID) error { + log.Debugf("loadCompactSequences: %d", len(cseqs)) + for _, cseq := range cseqs { + for _, tseq := range cseq.TileSequences { + for i, libref := range tseq { + if libref.Variant == 0 { + // No variant (e.g., import + // dropped tiles with + // no-calls) = no translation. + continue + } + v, ok := variantmap[libref] + if !ok { + return fmt.Errorf("oops: CompactSequence %q has variant %d for tag %d, but that variant was not in its library", cseq.Name, libref.Variant, libref.Tag) + } + tseq[i].Variant = v + } + } + if tilelib.encoder != nil { + if err := tilelib.encoder.Encode(LibraryEntry{ + CompactSequences: []CompactSequence{cseq}, + }); err != nil { + return err + } + } + } + tilelib.mtx.Lock() + defer tilelib.mtx.Unlock() + if tilelib.refseqs == nil { + tilelib.refseqs = map[string]map[string][]tileLibRef{} + } + for _, cseq := range cseqs { + tilelib.refseqs[cseq.Name] = cseq.TileSequences + } + return nil +} + +// Load library data from rdr. Tile variants might be renumbered in +// the process; in that case, genomes variants will be renumbered to +// match. +// +// If onLoadGenome is non-nil, call it on each CompactGenome entry. func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, onLoadGenome func(CompactGenome)) error { - genomes := map[string][]tileVariantID{} + cgs := []CompactGenome{} + cseqs := []CompactSequence{} variantmap := map[tileLibRef]tileVariantID{} err := DecodeLibrary(rdr, func(ent *LibraryEntry) error { if ctx.Err() != nil { @@ -172,14 +218,13 @@ func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, onLoadGe } if err := tilelib.loadTagSet(ent.TagSet); err != nil { return err - } else if err = tilelib.loadTileVariants(ent.TileVariants, variantmap); err != nil { + } + if err := tilelib.loadTileVariants(ent.TileVariants, variantmap); err != nil { return err - } else { - for _, cg := range ent.CompactGenomes { - genomes[cg.Name] = cg.Variants - } - return nil } + cgs = append(cgs, ent.CompactGenomes...) + cseqs = append(cseqs, ent.CompactSequences...) + return nil }) if err != nil { return err @@ -187,14 +232,28 @@ func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, onLoadGe if ctx.Err() != nil { return ctx.Err() } - err = tilelib.loadGenomes(genomes, variantmap, onLoadGenome) + err = tilelib.loadCompactGenomes(cgs, variantmap, onLoadGenome) + if err != nil { + return err + } + err = tilelib.loadCompactSequences(cseqs, variantmap) if err != nil { return err } return nil } -func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, error) { +type importStats struct { + InputFile string + InputLabel string + InputLength int + InputCoverage int + TileCoverage int + PathLength int + DroppedOutOfOrderTiles int +} + +func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, []importStats, error) { ret := tileSeq{} type jobT struct { label string @@ -210,7 +269,7 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, buf := scanner.Bytes() if len(buf) > 0 && buf[0] == '>' { todo <- jobT{seqlabel, fasta} - seqlabel, fasta = string(buf[1:]), nil + seqlabel, fasta = strings.SplitN(string(buf[1:]), " ", 2)[0], nil log.Debugf("%s %s reading fasta", filelabel, seqlabel) } else { fasta = append(fasta, bytes.ToLower(buf)...) @@ -228,6 +287,7 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, totalFoundTags := 0 totalPathLen := 0 skippedSequences := 0 + stats := make([]importStats, 0, len(todo)) for job := range todo { if len(job.fasta) == 0 { continue @@ -243,6 +303,7 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, }) totalFoundTags += len(found) + basesOut := 0 skipped := 0 path = path[:0] last := foundtag{tagid: -1} @@ -256,23 +317,62 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, } for i, f := range found { log.Tracef("%s %s found[%d] == %#v", filelabel, job.label, i, f) - if last.taglen > 0 { - path = append(path, tilelib.getRef(last.tagid, job.fasta[last.pos:f.pos+f.taglen])) + if last.tagid < 0 { + // first tag in sequence + last = foundtag{tagid: f.tagid} + continue + } + libref := tilelib.getRef(last.tagid, job.fasta[last.pos:f.pos+f.taglen]) + path = append(path, libref) + if libref.Variant > 0 { + // Count output coverage from + // the end of the previous tag + // (if any) to the end of the + // current tag, IOW don't + // double-count coverage for + // the previous tag. + basesOut += countBases(job.fasta[last.pos+last.taglen : f.pos+f.taglen]) + } else { + // If we dropped this tile + // (because !includeNoCalls), + // set taglen=0 so the + // overlapping tag is counted + // toward coverage on the + // following tile. + f.taglen = 0 } last = f } - if last.taglen > 0 { - path = append(path, tilelib.getRef(last.tagid, job.fasta[last.pos:])) + if last.tagid < 0 { + log.Warnf("%s %s no tags found", filelabel, job.label) + } else { + libref := tilelib.getRef(last.tagid, job.fasta[last.pos:]) + path = append(path, libref) + if libref.Variant > 0 { + basesOut += countBases(job.fasta[last.pos+last.taglen:]) + } } pathcopy := make([]tileLibRef, len(path)) copy(pathcopy, path) ret[job.label] = pathcopy - log.Debugf("%s %s tiled with path len %d, skipped %d", filelabel, job.label, len(path), skipped) + + basesIn := countBases(job.fasta) + log.Infof("%s %s fasta in %d coverage in %d coverage out %d path len %d skipped %d", filelabel, job.label, len(job.fasta), basesIn, basesOut, len(path), skipped) + stats = append(stats, importStats{ + InputFile: filelabel, + InputLabel: job.label, + InputLength: len(job.fasta), + InputCoverage: basesIn, + TileCoverage: basesOut, + PathLength: len(path), + DroppedOutOfOrderTiles: skipped, + }) + totalPathLen += len(path) } log.Printf("%s tiled with total path len %d in %d sequences (skipped %d sequences with '_' in name, skipped %d out-of-order tags)", filelabel, totalPathLen, len(ret), skippedSequences, totalFoundTags-totalPathLen) - return ret, scanner.Err() + return ret, stats, scanner.Err() } func (tilelib *tileLibrary) Len() int { @@ -289,14 +389,11 @@ func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef { if b != 'a' && b != 'c' && b != 'g' && b != 't' { // return "tile not found" if seq has any // no-calls - return tileLibRef{tag: tag} + return tileLibRef{Tag: tag} } } } tilelib.mtx.Lock() - // if tilelib.seq == nil { - // tilelib.seq = map[[blake2b.Size]byte][]byte{} - // } if tilelib.variant == nil && tilelib.taglib != nil { tilelib.variant = make([][][blake2b.Size256]byte, tilelib.taglib.Len()) } @@ -320,12 +417,17 @@ func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef { for i, varhash := range tilelib.variant[tag] { if varhash == seqhash { tilelib.mtx.Unlock() - return tileLibRef{tag: tag, variant: tileVariantID(i + 1)} + return tileLibRef{Tag: tag, Variant: tileVariantID(i + 1)} } } tilelib.variants++ tilelib.variant[tag] = append(tilelib.variant[tag], seqhash) - // tilelib.seq[seqhash] = append([]byte(nil), seq...) + if tilelib.retainTileSequences { + if tilelib.seq == nil { + tilelib.seq = map[[blake2b.Size256]byte][]byte{} + } + tilelib.seq[seqhash] = append([]byte(nil), seq...) + } variant := tileVariantID(len(tilelib.variant[tag])) tilelib.mtx.Unlock() @@ -339,5 +441,22 @@ func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef { }}, }) } - return tileLibRef{tag: tag, variant: variant} + return tileLibRef{Tag: tag, Variant: variant} +} + +func (tilelib *tileLibrary) TileVariantSequence(libref tileLibRef) []byte { + if libref.Variant == 0 || len(tilelib.variant) <= int(libref.Tag) || len(tilelib.variant[libref.Tag]) < int(libref.Variant) { + return nil + } + return tilelib.seq[tilelib.variant[libref.Tag][libref.Variant-1]] +} + +func countBases(seq []byte) int { + n := 0 + for _, c := range seq { + if isbase[c] { + n++ + } + } + return n }