X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/5571e392bb3ca21e31ddabe21958cb300ca8b731..93e063d8357b8682c0a730fc702f8b05ece6c46f:/tilelib.go diff --git a/tilelib.go b/tilelib.go index ace24cec0a..a3b63ec78f 100644 --- a/tilelib.go +++ b/tilelib.go @@ -1,3 +1,7 @@ +// Copyright (C) The Lightning Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package lightning import ( @@ -62,9 +66,9 @@ type tileLibrary struct { variant [][][blake2b.Size256]byte refseqs map[string]map[string][]tileLibRef compactGenomes map[string][]tileVariantID - // count [][]int - seq map[[blake2b.Size256]byte][]byte - variants int64 + seq2 map[[2]byte]map[[blake2b.Size256]byte][]byte + seq2lock map[[2]byte]sync.Locker + variants int64 // if non-nil, write out any tile variants added while tiling encoder *gob.Encoder @@ -143,7 +147,7 @@ func (tilelib *tileLibrary) loadCompactGenomes(cgs []CompactGenome, variantmap m } return } - log.Tracef("loadCompactGenomes: cg %s tag %d variant %d => %d", cg.Name, tag, variant, newvariant) + // log.Tracef("loadCompactGenomes: cg %s tag %d variant %d => %d", cg.Name, tag, variant, newvariant) cg.Variants[i] = newvariant } if onLoadGenome != nil { @@ -174,8 +178,9 @@ func (tilelib *tileLibrary) loadCompactGenomes(cgs []CompactGenome, variantmap m } func (tilelib *tileLibrary) loadCompactSequences(cseqs []CompactSequence, variantmap map[tileLibRef]tileVariantID) error { - log.Debugf("loadCompactSequences: %d", len(cseqs)) + log.Infof("loadCompactSequences: %d todo", len(cseqs)) for _, cseq := range cseqs { + log.Infof("loadCompactSequences: checking %s", cseq.Name) for _, tseq := range cseq.TileSequences { for i, libref := range tseq { if libref.Variant == 0 { @@ -198,6 +203,7 @@ func (tilelib *tileLibrary) loadCompactSequences(cseqs []CompactSequence, varian return err } } + log.Infof("loadCompactSequences: checking %s done", cseq.Name) } tilelib.mtx.Lock() defer tilelib.mtx.Unlock() @@ -207,6 +213,7 @@ func (tilelib *tileLibrary) loadCompactSequences(cseqs []CompactSequence, varian for _, cseq := range cseqs { tilelib.refseqs[cseq.Name] = cseq.TileSequences } + log.Info("loadCompactSequences: done") return nil } @@ -227,17 +234,18 @@ func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string, onLoadGeno for _, fi := range fis { if fi.Name() == "." || fi.Name() == ".." { continue - } else if fi.IsDir() { - err = walk(path + "/" + fi.Name()) + } else if child := path + "/" + fi.Name(); fi.IsDir() { + err = walk(child) if err != nil { return err } - } else if strings.HasSuffix(path, ".gob") || strings.HasSuffix(path, ".gob.gz") { - files = append(files, path) + } else if strings.HasSuffix(child, ".gob") || strings.HasSuffix(child, ".gob.gz") { + files = append(files, child) } } return nil } + log.Infof("LoadDir: walk dir %s", path) err := walk(path) if err != nil { return err @@ -245,12 +253,13 @@ func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string, onLoadGeno ctx, cancel := context.WithCancel(ctx) defer cancel() var mtx sync.Mutex - cgs := []CompactGenome{} - cseqs := []CompactSequence{} - variantmap := map[tileLibRef]tileVariantID{} + allcgs := make([][]CompactGenome, len(files)) + allcseqs := make([][]CompactSequence, len(files)) + allvariantmap := map[tileLibRef]tileVariantID{} errs := make(chan error, len(files)) - for _, path := range files { - path := path + log.Infof("LoadDir: read %d files", len(files)) + for fileno, path := range files { + fileno, path := fileno, path go func() { f, err := open(path) if err != nil { @@ -258,22 +267,43 @@ func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string, onLoadGeno return } defer f.Close() - errs <- DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error { + defer log.Infof("LoadDir: finished reading %s", path) + + var variantmap = map[tileLibRef]tileVariantID{} + var cgs []CompactGenome + var cseqs []CompactSequence + err = DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error { if ctx.Err() != nil { return ctx.Err() } - mtx.Lock() - defer mtx.Unlock() - if err := tilelib.loadTagSet(ent.TagSet); err != nil { - return err + if len(ent.TagSet) > 0 { + mtx.Lock() + if tilelib.taglib == nil || tilelib.taglib.Len() != len(ent.TagSet) { + // load first set of tags, or + // report mismatch if 2 sets + // have different #tags. + if err := tilelib.loadTagSet(ent.TagSet); err != nil { + mtx.Unlock() + return err + } + } + mtx.Unlock() } - if err := tilelib.loadTileVariants(ent.TileVariants, variantmap); err != nil { - return err + for _, tv := range ent.TileVariants { + variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).Variant } cgs = append(cgs, ent.CompactGenomes...) cseqs = append(cseqs, ent.CompactSequences...) return nil }) + allcgs[fileno] = cgs + allcseqs[fileno] = cseqs + mtx.Lock() + defer mtx.Unlock() + for k, v := range variantmap { + allvariantmap[k] = v + } + errs <- err }() } for range files { @@ -282,19 +312,34 @@ func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string, onLoadGeno return err } } - err = tilelib.loadCompactGenomes(cgs, variantmap, onLoadGenome) + + log.Info("LoadDir: loadCompactGenomes") + var flatcgs []CompactGenome + for _, cgs := range allcgs { + flatcgs = append(flatcgs, cgs...) + } + err = tilelib.loadCompactGenomes(flatcgs, allvariantmap, onLoadGenome) if err != nil { return err } - err = tilelib.loadCompactSequences(cseqs, variantmap) + + log.Info("LoadDir: loadCompactSequences") + var flatcseqs []CompactSequence + for _, cseqs := range allcseqs { + flatcseqs = append(flatcseqs, cseqs...) + } + err = tilelib.loadCompactSequences(flatcseqs, allvariantmap) if err != nil { return err } + + log.Info("LoadDir done") return nil } func (tilelib *tileLibrary) WriteDir(dir string) error { - nfiles := 128 + ntilefiles := 128 + nfiles := ntilefiles + len(tilelib.refseqs) files := make([]*os.File, nfiles) for i := range files { f, err := os.OpenFile(fmt.Sprintf("%s/library.%04d.gob.gz", dir, i), os.O_CREATE|os.O_WRONLY, 0666) @@ -317,45 +362,60 @@ func (tilelib *tileLibrary) WriteDir(dir string) error { for i := range encoders { encoders[i] = gob.NewEncoder(zws[i]) } + + cgnames := make([]string, 0, len(tilelib.compactGenomes)) + for name := range tilelib.compactGenomes { + cgnames = append(cgnames, name) + } + sort.Strings(cgnames) + + refnames := make([]string, 0, len(tilelib.refseqs)) + for name := range tilelib.refseqs { + refnames = append(refnames, name) + } + sort.Strings(refnames) + + log.Infof("WriteDir: writing %d files", nfiles) ctx, cancel := context.WithCancel(context.Background()) defer cancel() errs := make(chan error, nfiles) for start := range files { start := start go func() { - ent0 := LibraryEntry{ - TagSet: tilelib.taglib.Tags(), - } - if start == 0 { - // For now, just write all the genomes and refs - // to the first file - for name, cg := range tilelib.compactGenomes { - ent0.CompactGenomes = append(ent0.CompactGenomes, CompactGenome{ - Name: name, - Variants: cg, - }) - } - for name, tseqs := range tilelib.refseqs { - ent0.CompactSequences = append(ent0.CompactSequences, CompactSequence{ - Name: name, - TileSequences: tseqs, - }) - } - } - err := encoders[start].Encode(ent0) + err := encoders[start].Encode(LibraryEntry{TagSet: tilelib.taglib.Tags()}) if err != nil { errs <- err return } + if refidx := start - ntilefiles; refidx >= 0 { + // write each ref to its own file + // (they seem to load very slowly) + name := refnames[refidx] + errs <- encoders[start].Encode(LibraryEntry{CompactSequences: []CompactSequence{{ + Name: name, + TileSequences: tilelib.refseqs[name], + }}}) + return + } + for i := start; i < len(cgnames); i += ntilefiles { + err := encoders[start].Encode(LibraryEntry{CompactGenomes: []CompactGenome{{ + Name: cgnames[i], + Variants: tilelib.compactGenomes[cgnames[i]], + }}}) + if err != nil { + errs <- err + return + } + } tvs := []TileVariant{} - for tag := start; tag < len(tilelib.variant) && ctx.Err() == nil; tag += nfiles { + for tag := start; tag < len(tilelib.variant) && ctx.Err() == nil; tag += ntilefiles { tvs = tvs[:0] for idx, hash := range tilelib.variant[tag] { tvs = append(tvs, TileVariant{ Tag: tagID(tag), Variant: tileVariantID(idx + 1), Blake2b: hash, - Sequence: tilelib.seq[hash], + Sequence: tilelib.hashSequence(hash), }) } err := encoders[start].Encode(LibraryEntry{TileVariants: tvs}) @@ -373,6 +433,7 @@ func (tilelib *tileLibrary) WriteDir(dir string) error { return err } } + log.Info("WriteDir: flushing") for i := range zws { err := zws[i].Close() if err != nil { @@ -387,6 +448,7 @@ func (tilelib *tileLibrary) WriteDir(dir string) error { return err } } + log.Info("WriteDir: done") return nil } @@ -675,12 +737,31 @@ func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef { vlock.Unlock() if tilelib.retainTileSequences && !dropSeq { - tilelib.mtx.Lock() - if tilelib.seq == nil { - tilelib.seq = map[[blake2b.Size256]byte][]byte{} + seqCopy := append([]byte(nil), seq...) + if tilelib.seq2 == nil { + tilelib.mtx.Lock() + if tilelib.seq2 == nil { + tilelib.seq2lock = map[[2]byte]sync.Locker{} + m := map[[2]byte]map[[blake2b.Size256]byte][]byte{} + var k [2]byte + for i := 0; i < 256; i++ { + k[0] = byte(i) + for j := 0; j < 256; j++ { + k[1] = byte(j) + m[k] = map[[blake2b.Size256]byte][]byte{} + tilelib.seq2lock[k] = &sync.Mutex{} + } + } + tilelib.seq2 = m + } + tilelib.mtx.Unlock() } - tilelib.seq[seqhash] = append([]byte(nil), seq...) - tilelib.mtx.Unlock() + var k [2]byte + copy(k[:], seqhash[:]) + locker := tilelib.seq2lock[k] + locker.Lock() + tilelib.seq2[k][seqhash] = seqCopy + locker.Unlock() } if tilelib.encoder != nil { @@ -701,11 +782,17 @@ func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef { return tileLibRef{Tag: tag, Variant: variant} } +func (tilelib *tileLibrary) hashSequence(hash [blake2b.Size256]byte) []byte { + var partition [2]byte + copy(partition[:], hash[:]) + return tilelib.seq2[partition][hash] +} + func (tilelib *tileLibrary) TileVariantSequence(libref tileLibRef) []byte { if libref.Variant == 0 || len(tilelib.variant) <= int(libref.Tag) || len(tilelib.variant[libref.Tag]) < int(libref.Variant) { return nil } - return tilelib.seq[tilelib.variant[libref.Tag][libref.Variant-1]] + return tilelib.hashSequence(tilelib.variant[libref.Tag][libref.Variant-1]) } // Tidy deletes unreferenced tile variants and renumbers variants so