From da40bd0960806df8e2799e4fb716d41ad08b169f Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Wed, 6 Jun 2018 17:01:53 -0400 Subject: [PATCH] 13427: Use same index result N times if a device is mounted N times. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- services/keep-balance/balance.go | 62 +++++++++++++++++++++----------- 1 file changed, 42 insertions(+), 20 deletions(-) diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go index 328e623422..c7fa882303 100644 --- a/services/keep-balance/balance.go +++ b/services/keep-balance/balance.go @@ -274,32 +274,54 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro errs := make(chan error, 2+len(bal.KeepServices)) wg := sync.WaitGroup{} - // Start one goroutine for each KeepService: retrieve the - // index, and add the returned blocks to BlockStateMap. + // When a device is mounted more than once, we will get its + // index only once, and call AddReplicas on all of the mounts. + // equivMount keys are the mounts that will be indexed, and + // each value is a list of mounts to apply the received index + // to. + equivMount := map[*KeepMount][]*KeepMount{} + // deviceMount maps each device ID to the one mount that will + // be indexed for that device. + deviceMount := map[string]*KeepMount{} for _, srv := range bal.KeepServices { + for _, mnt := range srv.mounts { + equiv := deviceMount[mnt.DeviceID] + if equiv == nil { + equiv = mnt + if mnt.DeviceID != "" { + deviceMount[mnt.DeviceID] = equiv + } + } + equivMount[equiv] = append(equivMount[equiv], mnt) + } + } + + // Start one goroutine for each (non-redundant) mount: + // retrieve the index, and add the returned blocks to + // BlockStateMap. + for _, mounts := range equivMount { wg.Add(1) - go func(srv *KeepService) { + go func(mounts []*KeepMount) { defer wg.Done() - bal.logf("%s: retrieve indexes", srv) - for _, mount := range srv.mounts { - bal.logf("%s: retrieve index", mount) - idx, err := srv.IndexMount(c, mount.UUID, "") - if err != nil { - errs <- fmt.Errorf("%s: retrieve index: %v", mount, err) - return - } - if len(errs) > 0 { - // Some other goroutine encountered an - // error -- any further effort here - // will be wasted. - return - } + bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService) + idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "") + if err != nil { + errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err) + return + } + if len(errs) > 0 { + // Some other goroutine encountered an + // error -- any further effort here + // will be wasted. + return + } + for _, mount := range mounts { bal.logf("%s: add %d replicas to map", mount, len(idx)) bal.BlockStateMap.AddReplicas(mount, idx) - bal.logf("%s: done", mount) + bal.logf("%s: added %d replicas", mount, len(idx)) } - bal.logf("%s: done", srv) - }(srv) + bal.logf("mount %s: index done", mounts[0]) + }(mounts) } // collQ buffers incoming collections so we can start fetching -- 2.30.2