From 46a33e285d6179ebd5041733f98949a23147d55d Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Wed, 28 Jul 2021 00:19:03 -0400 Subject: [PATCH] 17574: Process collections in a worker pool. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- services/keep-balance/balance.go | 53 +++++++++++++++----------------- 1 file changed, 25 insertions(+), 28 deletions(-) diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go index 67021bef36..e69d941b1e 100644 --- a/services/keep-balance/balance.go +++ b/services/keep-balance/balance.go @@ -18,6 +18,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "syscall" "time" @@ -52,7 +53,7 @@ type Balancer struct { classes []string mounts int mountsByClass map[string]map[*KeepMount]bool - collScanned int + collScanned int64 serviceRoots map[string]string errors []error stats balancerStats @@ -399,35 +400,10 @@ func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pag }(mounts) } - // collQ buffers incoming collections so we can start fetching - // the next page without waiting for the current page to - // finish processing. collQ := make(chan arvados.Collection, bufs) - // Start a goroutine to process collections. (We could use a - // worker pool here, but even with a single worker we already - // process collections much faster than we can retrieve them.) - wg.Add(1) - go func() { - defer wg.Done() - for coll := range collQ { - err := bal.addCollection(coll) - if err != nil || len(errs) > 0 { - select { - case errs <- err: - default: - } - for range collQ { - } - cancel() - return - } - bal.collScanned++ - } - }() - - // Start a goroutine to retrieve all collections from the - // Arvados database and send them to collQ for processing. + // Retrieve all collections from the database and send them to + // collQ. wg.Add(1) go func() { defer wg.Done() @@ -455,6 +431,27 @@ func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pag } }() + // Parse manifests from collQ and pass the block hashes to + // BlockStateMap to track desired replication. + for i := 0; i < runtime.NumCPU(); i++ { + wg.Add(1) + go func() { + defer wg.Done() + for coll := range collQ { + err := bal.addCollection(coll) + if err != nil || len(errs) > 0 { + select { + case errs <- err: + default: + } + cancel() + continue + } + atomic.AddInt64(&bal.collScanned, 1) + } + }() + } + wg.Wait() if len(errs) > 0 { return <-errs -- 2.30.2