From b2b4bab3f8532b626884652c37616c33b15ca788 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Thu, 17 Jun 2021 16:14:48 -0400 Subject: [PATCH] Improve loading concurrency. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- tilelib.go | 51 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/tilelib.go b/tilelib.go index f0507f36f8..3b5b4cfac0 100644 --- a/tilelib.go +++ b/tilelib.go @@ -246,17 +246,14 @@ func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string, onLoadGeno ctx, cancel := context.WithCancel(ctx) defer cancel() var mtx sync.Mutex - cgs := []CompactGenome{} - cseqs := []CompactSequence{} - variantmap := map[tileLibRef]tileVariantID{} + allcgs := make([][]CompactGenome, len(files)) + allcseqs := make([][]CompactSequence, len(files)) + allvariantmap := make([]map[tileLibRef]tileVariantID, len(files)) errs := make(chan error, len(files)) log.Infof("LoadDir: read %d files", len(files)) - throttle := throttle{Max: runtime.GOMAXPROCS(0)} - for _, path := range files { - path := path + for fileno, path := range files { + fileno, path := fileno, path go func() { - throttle.Acquire() - defer throttle.Release() f, err := open(path) if err != nil { errs <- err @@ -264,6 +261,10 @@ func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string, onLoadGeno } defer f.Close() defer log.Infof("LoadDir: finished reading %s", path) + + var variantmap = map[tileLibRef]tileVariantID{} + var cgs []CompactGenome + var cseqs []CompactSequence errs <- DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error { if ctx.Err() != nil { return ctx.Err() @@ -281,19 +282,16 @@ func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string, onLoadGeno } mtx.Unlock() } - variantmapadd := map[tileLibRef]tileVariantID{} for _, tv := range ent.TileVariants { - variantmapadd[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).Variant + variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).Variant } - mtx.Lock() cgs = append(cgs, ent.CompactGenomes...) cseqs = append(cseqs, ent.CompactSequences...) - for k, v := range variantmapadd { - variantmap[k] = v - } - mtx.Unlock() return nil }) + allcgs[fileno] = cgs + allcseqs[fileno] = cseqs + allvariantmap[fileno] = variantmap }() } for range files { @@ -302,15 +300,26 @@ func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string, onLoadGeno return err } } + log.Info("LoadDir: merge variantmap") + variantmap := map[tileLibRef]tileVariantID{} + for _, m := range allvariantmap { + for k, v := range m { + variantmap[k] = v + } + } log.Info("LoadDir: loadCompactGenomes") - err = tilelib.loadCompactGenomes(cgs, variantmap, onLoadGenome) - if err != nil { - return err + for _, cgs := range allcgs { + err = tilelib.loadCompactGenomes(cgs, variantmap, onLoadGenome) + if err != nil { + return err + } } log.Info("LoadDir: loadCompactSequences") - err = tilelib.loadCompactSequences(cseqs, variantmap) - if err != nil { - return err + for _, cseqs := range allcseqs { + err = tilelib.loadCompactSequences(cseqs, variantmap) + if err != nil { + return err + } } log.Info("LoadDir done") return nil -- 2.30.2