Improve loading concurrency.
author Tom Clegg <tom@tomclegg.ca>
Thu, 17 Jun 2021 20:14:48 +0000 (16:14 -0400)
committer Tom Clegg <tom@tomclegg.ca>
Thu, 17 Jun 2021 20:14:48 +0000 (16:14 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

tilelib.go

index f0507f36f852efb3f6180114111d82af0b4b87ac..3b5b4cfac0a9ce87dde1d5ecddb1a13db4825320 100644 (file)
@@ -246,17 +246,14 @@ func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string, onLoadGeno
        ctx, cancel := context.WithCancel(ctx)
        defer cancel()
        var mtx sync.Mutex
-       cgs := []CompactGenome{}
-       cseqs := []CompactSequence{}
-       variantmap := map[tileLibRef]tileVariantID{}
+       allcgs := make([][]CompactGenome, len(files))
+       allcseqs := make([][]CompactSequence, len(files))
+       allvariantmap := make([]map[tileLibRef]tileVariantID, len(files))
        errs := make(chan error, len(files))
        log.Infof("LoadDir: read %d files", len(files))
-       throttle := throttle{Max: runtime.GOMAXPROCS(0)}
-       for _, path := range files {
-               path := path
+       for fileno, path := range files {
+               fileno, path := fileno, path
                go func() {
-                       throttle.Acquire()
-                       defer throttle.Release()
                        f, err := open(path)
                        if err != nil {
                                errs <- err
@@ -264,6 +261,10 @@ func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string, onLoadGeno
                        }
                        defer f.Close()
                        defer log.Infof("LoadDir: finished reading %s", path)
+
+                       var variantmap = map[tileLibRef]tileVariantID{}
+                       var cgs []CompactGenome
+                       var cseqs []CompactSequence
                        errs <- DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error {
                                if ctx.Err() != nil {
                                        return ctx.Err()
@@ -281,19 +282,16 @@ func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string, onLoadGeno
                                        }
                                        mtx.Unlock()
                                }
-                               variantmapadd := map[tileLibRef]tileVariantID{}
                                for _, tv := range ent.TileVariants {
-                                       variantmapadd[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).Variant
+                                       variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).Variant
                                }
-                               mtx.Lock()
                                cgs = append(cgs, ent.CompactGenomes...)
                                cseqs = append(cseqs, ent.CompactSequences...)
-                               for k, v := range variantmapadd {
-                                       variantmap[k] = v
-                               }
-                               mtx.Unlock()
                                return nil
                        })
+                       allcgs[fileno] = cgs
+                       allcseqs[fileno] = cseqs
+                       allvariantmap[fileno] = variantmap
                }()
        }
        for range files {
@@ -302,15 +300,26 @@ func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string, onLoadGeno
                        return err
                }
        }
+       log.Info("LoadDir: merge variantmap")
+       variantmap := map[tileLibRef]tileVariantID{}
+       for _, m := range allvariantmap {
+               for k, v := range m {
+                       variantmap[k] = v
+               }
+       }
        log.Info("LoadDir: loadCompactGenomes")
-       err = tilelib.loadCompactGenomes(cgs, variantmap, onLoadGenome)
-       if err != nil {
-               return err
+       for _, cgs := range allcgs {
+               err = tilelib.loadCompactGenomes(cgs, variantmap, onLoadGenome)
+               if err != nil {
+                       return err
+               }
        }
        log.Info("LoadDir: loadCompactSequences")
-       err = tilelib.loadCompactSequences(cseqs, variantmap)
-       if err != nil {
-               return err
+       for _, cseqs := range allcseqs {
+               err = tilelib.loadCompactSequences(cseqs, variantmap)
+               if err != nil {
+                       return err
+               }
        }
        log.Info("LoadDir done")
        return nil