X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b002129afda08bbb4fdbed6e629858a5c298c068..cb9c8f800495b8a499348d931326a06f020b8ffa:/services/keep-balance/balance.go diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go index 7471aa6cb0..0c4dd5ba4c 100644 --- a/services/keep-balance/balance.go +++ b/services/keep-balance/balance.go @@ -1,10 +1,16 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( "fmt" "log" + "math" "os" "runtime" + "sort" "strings" "sync" "time" @@ -49,11 +55,17 @@ type Balancer struct { } // Run performs a balance operation using the given config and -// runOptions. It should only be called once on a given Balancer -// object. Typical usage: +// runOptions, and returns RunOptions suitable for passing to a +// subsequent balance operation. +// +// Run should only be called once on a given Balancer object. +// +// Typical usage: // -// err = (&Balancer{}).Run(config, runOptions) -func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) { +// runOptions, err = (&Balancer{}).Run(config, runOptions) +func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) { + nextRunOptions = runOptions + bal.Dumper = runOptions.Dumper bal.Logger = runOptions.Logger if bal.Logger == nil { @@ -74,12 +86,22 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) { if err = bal.CheckSanityEarly(&config.Client); err != nil { return } - if runOptions.CommitTrash { + rs := bal.rendezvousState() + if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState { + if runOptions.SafeRendezvousState != "" { + bal.logf("notice: KeepServices list has changed since last run") + } + bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run") if err = bal.ClearTrashLists(&config.Client); err != nil { return } + // The current rendezvous state becomes "safe" (i.e., + // OK to compute changes for that state without + // clearing existing trash lists) only now, after we + // succeed in clearing existing trash lists. + nextRunOptions.SafeRendezvousState = rs } - if err = bal.GetCurrentState(&config.Client); err != nil { + if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil { return } bal.ComputeChangeSets() @@ -157,6 +179,17 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error { return nil } +// rendezvousState returns a fingerprint (e.g., a sorted list of +// UUID+host+port) of the current set of keep services. +func (bal *Balancer) rendezvousState() string { + srvs := make([]string, 0, len(bal.KeepServices)) + for _, srv := range bal.KeepServices { + srvs = append(srvs, srv.String()) + } + sort.Strings(srvs) + return strings.Join(srvs, "; ") +} + // ClearTrashLists sends an empty trash list to each keep // service. Calling this before GetCurrentState avoids races. // @@ -189,7 +222,7 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error { // collection manifests in the database (API server). // // It encodes the resulting information in BlockStateMap. 
-func (bal *Balancer) GetCurrentState(c *arvados.Client) error { +func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error { defer timeMe(bal.Logger, "GetCurrentState")() bal.BlockStateMap = NewBlockStateMap() @@ -198,7 +231,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error { return err } bal.DefaultReplication = dd.DefaultCollectionReplication - bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL + bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9 errs := make(chan error, 2+len(bal.KeepServices)) wg := sync.WaitGroup{} @@ -215,6 +248,12 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error { errs <- fmt.Errorf("%s: %v", srv, err) return } + if len(errs) > 0 { + // Some other goroutine encountered an + // error -- any further effort here + // will be wasted. + return + } bal.logf("%s: add %d replicas to map", srv, len(idx)) bal.BlockStateMap.AddReplicas(srv, idx) bal.logf("%s: done", srv) @@ -223,10 +262,8 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error { // collQ buffers incoming collections so we can start fetching // the next page without waiting for the current page to - // finish processing. (1000 happens to match the page size - // used by (*arvados.Client)EachCollection(), but it's OK if - // they don't match.) - collQ := make(chan arvados.Collection, 1000) + // finish processing. + collQ := make(chan arvados.Collection, bufs) // Start a goroutine to process collections. (We could use a // worker pool here, but even with a single worker we already @@ -251,7 +288,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error { wg.Add(1) go func() { defer wg.Done() - err = EachCollection(c, + err = EachCollection(c, pageSize, func(coll arvados.Collection) error { collQ <- coll if len(errs) > 0 { @@ -271,14 +308,11 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error { } }() - go func() { - // Send a nil error when all goroutines finish. If - // this is the first error sent to errs, then - // everything worked. 
- wg.Wait() - errs <- nil - }() - return <-errs + wg.Wait() + if len(errs) > 0 { + return <-errs + } + return nil } func (bal *Balancer) addCollection(coll arvados.Collection) error { @@ -447,9 +481,11 @@ type balancerStats struct { lost, overrep, unref, garbage, underrep, justright blocksNBytes desired, current blocksNBytes pulls, trashes int + replHistogram []int } func (bal *Balancer) getStatistics() (s balancerStats) { + s.replHistogram = make([]int, 2) bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) { surplus := len(blk.Replicas) - blk.Desired bytes := blkid.Size() @@ -493,6 +529,11 @@ func (bal *Balancer) getStatistics() (s balancerStats) { s.current.blocks++ s.current.bytes += bytes * int64(len(blk.Replicas)) } + + for len(s.replHistogram) <= len(blk.Replicas) { + s.replHistogram = append(s.replHistogram, 0) + } + s.replHistogram[len(blk.Replicas)]++ }) for _, srv := range bal.KeepServices { s.pulls += len(srv.ChangeSet.Pulls) @@ -521,6 +562,25 @@ func (bal *Balancer) PrintStatistics() { bal.logf("%s: %v\n", srv, srv.ChangeSet) } bal.logf("===") + bal.printHistogram(s, 60) + bal.logf("===") +} + +func (bal *Balancer) printHistogram(s balancerStats, hashColumns int) { + bal.logf("Replication level distribution (counting N replicas on a single server as N):") + maxCount := 0 + for _, count := range s.replHistogram { + if maxCount < count { + maxCount = count + } + } + hashes := strings.Repeat("#", hashColumns) + countWidth := 1 + int(math.Log10(float64(maxCount+1))) + scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1))) + for repl, count := range s.replHistogram { + nHashes := int(scaleCount * math.Log10(float64(count+1))) + bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes]) + } } // CheckSanityLate checks for configuration and runtime errors after @@ -594,7 +654,7 @@ func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *Ke }(srv) } var lastErr error - for _ = range bal.KeepServices { + for range bal.KeepServices { if err := <-errs; err != nil { bal.logf("%v", err) lastErr = err
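
The SafeRendezvousState handling added to Run() is easier to follow outside the diff. The sketch below is a minimal, self-contained illustration of the idea (the service type, its fields, and the fingerprint format are hypothetical; the real rendezvousState() builds its string from srv.String() for each entry in bal.KeepServices): compute an order-independent fingerprint of the keep service set, and clear existing trash lists only when it differs from the fingerprint recorded after the previous successful run.

package main

import (
	"fmt"
	"sort"
	"strings"
)

// Hypothetical stand-in for a keep service entry; the real code uses
// the arvados SDK's KeepService type and its String() method.
type service struct {
	UUID string
	Host string
	Port int
}

// fingerprint mirrors the idea behind rendezvousState(): a stable,
// order-independent summary of the current keep service set.
func fingerprint(svcs []service) string {
	keys := make([]string, 0, len(svcs))
	for _, s := range svcs {
		keys = append(keys, fmt.Sprintf("%s %s:%d", s.UUID, s.Host, s.Port))
	}
	sort.Strings(keys)
	return strings.Join(keys, "; ")
}

func main() {
	// Fingerprint recorded by the previous successful run
	// (SafeRendezvousState in RunOptions).
	prev := fingerprint([]service{{"zzzzz-bi6l4-000000000000000", "keep0", 25107}})

	// Current service set: a server was added, so rendezvous order may
	// have changed and previously computed trash lists may be unsafe.
	curr := fingerprint([]service{
		{"zzzzz-bi6l4-000000000000000", "keep0", 25107},
		{"zzzzz-bi6l4-000000000000001", "keep1", 25107},
	})

	if curr != prev {
		fmt.Println("keep service set changed: clear existing trash lists first")
	}
}

Only after the trash lists are successfully cleared does the current fingerprint become the "safe" state handed back in nextRunOptions, which is why the assignment happens after ClearTrashLists() rather than before.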
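GetCurrentState() now takes its page size and buffer count from the configuration (config.CollectionBatchSize, config.CollectionBuffers) instead of a hard-coded 1000. Its producer/consumer shape — a bounded channel between the collection fetcher and the goroutine that feeds BlockStateMap — reduces to the following sketch (collection, bufs, and the processing step are placeholders, not arvados types):

package main

import (
	"fmt"
	"sync"
)

// Placeholder for arvados.Collection.
type collection struct{ UUID string }

func main() {
	// Stands in for config.CollectionBuffers: how many fetched-but-
	// unprocessed collections may be queued at once.
	const bufs = 4
	collQ := make(chan collection, bufs)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for coll := range collQ {
			// Stands in for bal.addCollection(coll).
			fmt.Println("processing", coll.UUID)
		}
	}()

	// Stands in for EachCollection(c, pageSize, ...): the producer keeps
	// fetching while earlier items are still being processed, but blocks
	// once bufs items are waiting, which bounds memory use.
	for i := 0; i < 10; i++ {
		collQ <- collection{UUID: fmt.Sprintf("zzzzz-4zz18-%015d", i)}
	}
	close(collQ)
	wg.Wait()
}

With a buffer in between, a slow addCollection() no longer stalls the fetch of the next page, and the buffer size caps how many collections sit in memory at once.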
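The new printHistogram() fits very different replication counts onto a fixed number of '#' columns by scaling bar length with log10. The program below reruns the same arithmetic against invented counts to show how countWidth, scaleCount, and nHashes interact (only replHistogram and hashColumns are made up; the formulas are the ones added in the diff):

package main

import (
	"fmt"
	"math"
	"strings"
)

func main() {
	// Invented example: number of blocks seen at each replication level.
	replHistogram := []int{0, 12, 83457, 902, 3}
	hashColumns := 60

	maxCount := 0
	for _, count := range replHistogram {
		if maxCount < count {
			maxCount = count
		}
	}
	hashes := strings.Repeat("#", hashColumns)
	// Width of the widest count, so the printed columns line up.
	countWidth := 1 + int(math.Log10(float64(maxCount+1)))
	// Chosen so the largest count maps to at most hashColumns hashes.
	scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
	for repl, count := range replHistogram {
		// Bar length grows with log10(count+1), so 12 and 83457 both
		// remain visible on the same 60-column scale.
		nHashes := int(scaleCount * math.Log10(float64(count+1)))
		fmt.Printf("%2d: %*d %s\n", repl, countWidth, count, hashes[:nHashes])
	}
}

The +1 inside each log10 keeps a zero count at zero hashes instead of producing -Inf.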