13427: Use same index result N times if a device is mounted N times.
author Tom Clegg <tclegg@veritasgenetics.com>
Wed, 6 Jun 2018 21:01:53 +0000 (17:01 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Wed, 6 Jun 2018 21:01:53 +0000 (17:01 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

services/keep-balance/balance.go

index 328e6234221f157f8ac18be525b5460d3a0ada93..c7fa88230307e3eb47d06fac094b4768f3c7e2c8 100644 (file)
@@ -274,32 +274,54 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
        errs := make(chan error, 2+len(bal.KeepServices))
        wg := sync.WaitGroup{}
 
-       // Start one goroutine for each KeepService: retrieve the
-       // index, and add the returned blocks to BlockStateMap.
+       // When a device is mounted more than once, we will get its
+       // index only once, and call AddReplicas on all of the mounts.
+       // equivMount keys are the mounts that will be indexed, and
+       // each value is a list of mounts to apply the received index
+       // to.
+       equivMount := map[*KeepMount][]*KeepMount{}
+       // deviceMount maps each device ID to the one mount that will
+       // be indexed for that device.
+       deviceMount := map[string]*KeepMount{}
        for _, srv := range bal.KeepServices {
+               for _, mnt := range srv.mounts {
+                       equiv := deviceMount[mnt.DeviceID]
+                       if equiv == nil {
+                               equiv = mnt
+                               if mnt.DeviceID != "" {
+                                       deviceMount[mnt.DeviceID] = equiv
+                               }
+                       }
+                       equivMount[equiv] = append(equivMount[equiv], mnt)
+               }
+       }
+
+       // Start one goroutine for each (non-redundant) mount:
+       // retrieve the index, and add the returned blocks to
+       // BlockStateMap.
+       for _, mounts := range equivMount {
                wg.Add(1)
-               go func(srv *KeepService) {
+               go func(mounts []*KeepMount) {
                        defer wg.Done()
-                       bal.logf("%s: retrieve indexes", srv)
-                       for _, mount := range srv.mounts {
-                               bal.logf("%s: retrieve index", mount)
-                               idx, err := srv.IndexMount(c, mount.UUID, "")
-                               if err != nil {
-                                       errs <- fmt.Errorf("%s: retrieve index: %v", mount, err)
-                                       return
-                               }
-                               if len(errs) > 0 {
-                                       // Some other goroutine encountered an
-                                       // error -- any further effort here
-                                       // will be wasted.
-                                       return
-                               }
+                       bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
+                       idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
+                       if err != nil {
+                               errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err)
+                               return
+                       }
+                       if len(errs) > 0 {
+                               // Some other goroutine encountered an
+                               // error -- any further effort here
+                               // will be wasted.
+                               return
+                       }
+                       for _, mount := range mounts {
                                bal.logf("%s: add %d replicas to map", mount, len(idx))
                                bal.BlockStateMap.AddReplicas(mount, idx)
-                               bal.logf("%s: done", mount)
+                               bal.logf("%s: added %d replicas", mount, len(idx))
                        }
-                       bal.logf("%s: done", srv)
-               }(srv)
+                       bal.logf("mount %s: index done", mounts[0])
+               }(mounts)
        }
 
        // collQ buffers incoming collections so we can start fetching