X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/de10f554682eecf3926b409c70f9685901d11272..879ae19de38d5b718fc49a366c09678d82c3950e:/tilelib.go diff --git a/tilelib.go b/tilelib.go index 926c0324a2..3852b4d093 100644 --- a/tilelib.go +++ b/tilelib.go @@ -1,4 +1,4 @@ -package main +package lightning import ( "bufio" @@ -7,6 +7,7 @@ import ( "encoding/gob" "fmt" "io" + "os" "regexp" "runtime" "sort" @@ -14,6 +15,7 @@ import ( "sync" "sync/atomic" + "github.com/klauspost/pgzip" log "github.com/sirupsen/logrus" "golang.org/x/crypto/blake2b" ) @@ -208,6 +210,221 @@ func (tilelib *tileLibrary) loadCompactSequences(cseqs []CompactSequence, varian return nil } +func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string, onLoadGenome func(CompactGenome)) error { + var files []string + var walk func(string) error + walk = func(path string) error { + f, err := open(path) + if err != nil { + return err + } + defer f.Close() + fis, err := f.Readdir(-1) + if err != nil { + files = append(files, path) + return nil + } + for _, fi := range fis { + if fi.Name() == "." || fi.Name() == ".." { + continue + } else if child := path + "/" + fi.Name(); fi.IsDir() { + err = walk(child) + if err != nil { + return err + } + } else if strings.HasSuffix(child, ".gob") || strings.HasSuffix(child, ".gob.gz") { + files = append(files, child) + } + } + return nil + } + log.Infof("LoadDir: walk dir %s", path) + err := walk(path) + if err != nil { + return err + } + ctx, cancel := context.WithCancel(ctx) + defer cancel() + var mtx sync.Mutex + cgs := []CompactGenome{} + cseqs := []CompactSequence{} + variantmap := map[tileLibRef]tileVariantID{} + errs := make(chan error, len(files)) + log.Infof("LoadDir: read %d files", len(files)) + for _, path := range files { + path := path + go func() { + f, err := open(path) + if err != nil { + errs <- err + return + } + defer f.Close() + defer log.Infof("LoadDir: finished reading %s", path) + errs <- DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error { + if ctx.Err() != nil { + return ctx.Err() + } + if len(ent.TagSet) > 0 { + mtx.Lock() + if tilelib.taglib == nil || tilelib.taglib.Len() != len(ent.TagSet) { + // load first set of tags, or + // report mismatch if 2 sets + // have different #tags. + if err := tilelib.loadTagSet(ent.TagSet); err != nil { + mtx.Unlock() + return err + } + } + mtx.Unlock() + } + variantmapadd := map[tileLibRef]tileVariantID{} + for _, tv := range ent.TileVariants { + variantmapadd[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).Variant + } + mtx.Lock() + cgs = append(cgs, ent.CompactGenomes...) + cseqs = append(cseqs, ent.CompactSequences...) + for k, v := range variantmapadd { + variantmap[k] = v + } + mtx.Unlock() + return nil + }) + }() + } + for range files { + err := <-errs + if err != nil { + return err + } + } + log.Info("LoadDir: loadCompactGenomes") + err = tilelib.loadCompactGenomes(cgs, variantmap, onLoadGenome) + if err != nil { + return err + } + log.Info("LoadDir: loadCompactSequences") + err = tilelib.loadCompactSequences(cseqs, variantmap) + if err != nil { + return err + } + log.Info("LoadDir done") + return nil +} + +func (tilelib *tileLibrary) WriteDir(dir string) error { + nfiles := 128 + files := make([]*os.File, nfiles) + for i := range files { + f, err := os.OpenFile(fmt.Sprintf("%s/library.%04d.gob.gz", dir, i), os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + return err + } + defer f.Close() + files[i] = f + } + bufws := make([]*bufio.Writer, nfiles) + for i := range bufws { + bufws[i] = bufio.NewWriterSize(files[i], 1<<26) + } + zws := make([]*pgzip.Writer, nfiles) + for i := range zws { + zws[i] = pgzip.NewWriter(bufws[i]) + defer zws[i].Close() + } + encoders := make([]*gob.Encoder, nfiles) + for i := range encoders { + encoders[i] = gob.NewEncoder(zws[i]) + } + + cgnames := make([]string, 0, len(tilelib.compactGenomes)) + for name := range tilelib.compactGenomes { + cgnames = append(cgnames, name) + } + sort.Strings(cgnames) + + log.Infof("WriteDir: writing %d files", nfiles) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + errs := make(chan error, nfiles) + for start := range files { + start := start + go func() { + err := encoders[start].Encode(LibraryEntry{TagSet: tilelib.taglib.Tags()}) + if err != nil { + errs <- err + return + } + if start == 0 { + // For now, just write all the refs to + // the first file + for name, tseqs := range tilelib.refseqs { + err := encoders[start].Encode(LibraryEntry{CompactSequences: []CompactSequence{{ + Name: name, + TileSequences: tseqs, + }}}) + if err != nil { + errs <- err + return + } + } + } + for i := start; i < len(cgnames); i += nfiles { + err := encoders[start].Encode(LibraryEntry{CompactGenomes: []CompactGenome{{ + Name: cgnames[i], + Variants: tilelib.compactGenomes[cgnames[i]], + }}}) + if err != nil { + errs <- err + return + } + } + tvs := []TileVariant{} + for tag := start; tag < len(tilelib.variant) && ctx.Err() == nil; tag += nfiles { + tvs = tvs[:0] + for idx, hash := range tilelib.variant[tag] { + tvs = append(tvs, TileVariant{ + Tag: tagID(tag), + Variant: tileVariantID(idx + 1), + Blake2b: hash, + Sequence: tilelib.seq[hash], + }) + } + err := encoders[start].Encode(LibraryEntry{TileVariants: tvs}) + if err != nil { + errs <- err + return + } + } + errs <- nil + }() + } + for range files { + err := <-errs + if err != nil { + return err + } + } + log.Info("WriteDir: flushing") + for i := range zws { + err := zws[i].Close() + if err != nil { + return err + } + err = bufws[i].Flush() + if err != nil { + return err + } + err = files[i].Close() + if err != nil { + return err + } + } + log.Info("WriteDir: done") + return nil +} + // Load library data from rdr. Tile variants might be renumbered in // the process; in that case, genomes variants will be renumbered to // match. @@ -248,6 +465,36 @@ func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, gz bool, return nil } +func (tilelib *tileLibrary) dump(out io.Writer) { + printTV := func(tag int, variant tileVariantID) { + if variant < 1 { + fmt.Fprintf(out, " -") + } else if tag >= len(tilelib.variant) { + fmt.Fprintf(out, " (!tag=%d)", tag) + } else if int(variant) > len(tilelib.variant[tag]) { + fmt.Fprintf(out, " (tag=%d,!variant=%d)", tag, variant) + } else { + fmt.Fprintf(out, " %x", tilelib.variant[tag][variant-1][:8]) + } + } + for refname, refseqs := range tilelib.refseqs { + for seqname, seq := range refseqs { + fmt.Fprintf(out, "ref %s %s", refname, seqname) + for _, libref := range seq { + printTV(int(libref.Tag), libref.Variant) + } + fmt.Fprintf(out, "\n") + } + } + for name, cg := range tilelib.compactGenomes { + fmt.Fprintf(out, "cg %s", name) + for tag, variant := range cg { + printTV(tag/2, variant) + } + fmt.Fprintf(out, "\n") + } +} + type importStats struct { InputFile string InputLabel string @@ -324,6 +571,7 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChro log.Infof("%s %s getting %d librefs", filelabel, job.label, len(found)) throttle := &throttle{Max: runtime.NumCPU()} path = path[:len(found)] + var lowquality int64 for i, f := range found { i, f := i, f throttle.Acquire() @@ -341,6 +589,9 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChro endpos = found[i+1].pos + taglen } path[i] = tilelib.getRef(f.tagid, job.fasta[startpos:endpos]) + if countBases(job.fasta[startpos:endpos]) != endpos-startpos { + atomic.AddInt64(&lowquality, 1) + } }() } throttle.Wait() @@ -352,7 +603,7 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChro ret[job.label] = pathcopy basesIn := countBases(job.fasta) - log.Infof("%s %s fasta in %d coverage in %d path len %d skipped-out-of-order %d", filelabel, job.label, len(job.fasta), basesIn, len(path), skipped) + log.Infof("%s %s fasta in %d coverage in %d path len %d low-quality %d skipped-out-of-order %d", filelabel, job.label, len(job.fasta), basesIn, len(path), lowquality, skipped) stats = append(stats, importStats{ InputFile: filelabel, InputLabel: job.label, @@ -560,18 +811,30 @@ func (tilelib *tileLibrary) Tidy() { // Apply remap to genomes and reference sequences, so they // refer to the same tile variants using the changed IDs. log.Print("Tidy: apply remap") + var wg sync.WaitGroup for _, cg := range tilelib.compactGenomes { - for idx, variant := range cg { - cg[idx] = remap[tagID(idx/2)][variant] - } + cg := cg + wg.Add(1) + go func() { + defer wg.Done() + for idx, variant := range cg { + cg[idx] = remap[tagID(idx/2)][variant] + } + }() } for _, refcs := range tilelib.refseqs { for _, refseq := range refcs { - for i, tv := range refseq { - refseq[i].Variant = remap[tv.Tag][tv.Variant] - } + refseq := refseq + wg.Add(1) + go func() { + defer wg.Done() + for i, tv := range refseq { + refseq[i].Variant = remap[tv.Tag][tv.Variant] + } + }() } } + wg.Wait() log.Print("Tidy: done") }