X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b002129afda08bbb4fdbed6e629858a5c298c068..a9f1adf0a3e2df296ce0a8c0a1d735b7e5044baa:/services/keep-balance/balance.go

diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 7471aa6cb0..2d1a59e890 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -3,6 +3,7 @@ package main
 import (
 	"fmt"
 	"log"
+	"math"
 	"os"
 	"runtime"
 	"strings"
@@ -79,7 +80,7 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
 			return
 		}
 	}
-	if err = bal.GetCurrentState(&config.Client); err != nil {
+	if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
 		return
 	}
 	bal.ComputeChangeSets()
@@ -189,7 +190,7 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
 // collection manifests in the database (API server).
 //
 // It encodes the resulting information in BlockStateMap.
-func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
+func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
 	defer timeMe(bal.Logger, "GetCurrentState")()
 	bal.BlockStateMap = NewBlockStateMap()
 
@@ -223,10 +224,8 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
 
 	// collQ buffers incoming collections so we can start fetching
 	// the next page without waiting for the current page to
-	// finish processing. (1000 happens to match the page size
-	// used by (*arvados.Client)EachCollection(), but it's OK if
-	// they don't match.)
-	collQ := make(chan arvados.Collection, 1000)
+	// finish processing.
+	collQ := make(chan arvados.Collection, bufs)
 
 	// Start a goroutine to process collections. (We could use a
 	// worker pool here, but even with a single worker we already
@@ -251,7 +250,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		err = EachCollection(c,
+		err = EachCollection(c, pageSize,
 			func(coll arvados.Collection) error {
 				collQ <- coll
 				if len(errs) > 0 {
@@ -447,9 +446,11 @@ type balancerStats struct {
 	lost, overrep, unref, garbage, underrep, justright blocksNBytes
 	desired, current                                   blocksNBytes
 	pulls, trashes                                     int
+	replHistogram                                      []int
 }
 
 func (bal *Balancer) getStatistics() (s balancerStats) {
+	s.replHistogram = make([]int, 2)
 	bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
 		surplus := len(blk.Replicas) - blk.Desired
 		bytes := blkid.Size()
@@ -493,6 +494,11 @@ func (bal *Balancer) getStatistics() (s balancerStats) {
 			s.current.blocks++
 			s.current.bytes += bytes * int64(len(blk.Replicas))
 		}
+
+		for len(s.replHistogram) <= len(blk.Replicas) {
+			s.replHistogram = append(s.replHistogram, 0)
+		}
+		s.replHistogram[len(blk.Replicas)]++
 	})
 	for _, srv := range bal.KeepServices {
 		s.pulls += len(srv.ChangeSet.Pulls)
@@ -521,6 +527,25 @@ func (bal *Balancer) PrintStatistics() {
 		bal.logf("%s: %v\n", srv, srv.ChangeSet)
 	}
 	bal.logf("===")
+	bal.printHistogram(s, 60)
+	bal.logf("===")
+}
+
+func (bal *Balancer) printHistogram(s balancerStats, hashColumns int) {
+	bal.logf("Replication level distribution (counting N replicas on a single server as N):")
+	maxCount := 0
+	for _, count := range s.replHistogram {
+		if maxCount < count {
+			maxCount = count
+		}
+	}
+	hashes := strings.Repeat("#", hashColumns)
+	countWidth := 1 + int(math.Log10(float64(maxCount+1)))
+	scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
+	for repl, count := range s.replHistogram {
+		nHashes := int(scaleCount * math.Log10(float64(count+1)))
+		bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
+	}
 }
 
 // CheckSanityLate checks for configuration and runtime errors after
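
Side note on the collQ change above: making the buffer size configurable (config.CollectionBuffers) controls how far the page fetcher can run ahead of manifest processing. Here is a minimal standalone Go sketch of that producer/consumer overlap; the names and sample data are invented for illustration and are not part of this diff.

package main

import "fmt"

func main() {
	bufs := 4 // stands in for config.CollectionBuffers
	collQ := make(chan string, bufs)

	// Producer: queues items from successive "pages" without
	// blocking until the buffer is full, so fetching the next
	// page can overlap with processing of the current one.
	go func() {
		for page := 0; page < 3; page++ {
			for i := 0; i < 2; i++ {
				collQ <- fmt.Sprintf("collection %d.%d", page, i)
			}
		}
		close(collQ)
	}()

	// Consumer: drains the queue, playing the role of the single
	// worker goroutine in GetCurrentState().
	for coll := range collQ {
		fmt.Println("processing", coll)
	}
}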
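
Side note on printHistogram() above: the bar lengths are log-scaled, using log10(count+1) so that zero counts produce zero-width bars, and scaleCount is chosen so the largest count never exceeds hashColumns columns. Below is a standalone sketch of the same arithmetic; the replHistogram sample values and the main() wrapper are made up for illustration.

package main

import (
	"fmt"
	"math"
	"strings"
)

func main() {
	// Hypothetical data: index = replication level, value = number
	// of blocks at that level (s.replHistogram in the diff).
	replHistogram := []int{0, 12, 83042, 4, 0, 1}
	hashColumns := 60

	maxCount := 0
	for _, count := range replHistogram {
		if maxCount < count {
			maxCount = count
		}
	}
	hashes := strings.Repeat("#", hashColumns)
	// Width of the widest count, so the printed columns line up.
	countWidth := 1 + int(math.Log10(float64(maxCount+1)))
	// Scale factor: the largest bar fits within hashColumns hashes.
	scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
	for repl, count := range replHistogram {
		nHashes := int(scaleCount * math.Log10(float64(count+1)))
		fmt.Printf("%2d: %*d %s\n", repl, countWidth, count, hashes[:nHashes])
	}
}

Log scaling keeps rare levels (a handful of lost or under-replicated blocks) visible alongside levels holding millions of blocks.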