9442: Fix runtime.outdir for containers.
[arvados.git] / services / keep-balance / balance.go
index 7471aa6cb06d084416615c8a9eb975d231098dc5..2d1a59e8909bc250b2ca995775496f1839adf9f9 100644 (file)
@@ -3,6 +3,7 @@ package main
 import (
        "fmt"
        "log"
+       "math"
        "os"
        "runtime"
        "strings"
@@ -79,7 +80,7 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
                        return
                }
        }
-       if err = bal.GetCurrentState(&config.Client); err != nil {
+       if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
                return
        }
        bal.ComputeChangeSets()
@@ -189,7 +190,7 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
 // collection manifests in the database (API server).
 //
 // It encodes the resulting information in BlockStateMap.
-func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
+func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
        defer timeMe(bal.Logger, "GetCurrentState")()
        bal.BlockStateMap = NewBlockStateMap()
 
@@ -223,10 +224,8 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
 
        // collQ buffers incoming collections so we can start fetching
        // the next page without waiting for the current page to
-       // finish processing. (1000 happens to match the page size
-       // used by (*arvados.Client)EachCollection(), but it's OK if
-       // they don't match.)
-       collQ := make(chan arvados.Collection, 1000)
+       // finish processing.
+       collQ := make(chan arvados.Collection, bufs)
 
        // Start a goroutine to process collections. (We could use a
        // worker pool here, but even with a single worker we already
@@ -251,7 +250,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
        wg.Add(1)
        go func() {
                defer wg.Done()
-               err = EachCollection(c,
+               err = EachCollection(c, pageSize,
                        func(coll arvados.Collection) error {
                                collQ <- coll
                                if len(errs) > 0 {
@@ -447,9 +446,11 @@ type balancerStats struct {
        lost, overrep, unref, garbage, underrep, justright blocksNBytes
        desired, current                                   blocksNBytes
        pulls, trashes                                     int
+       replHistogram                                      []int
 }
 
 func (bal *Balancer) getStatistics() (s balancerStats) {
+       s.replHistogram = make([]int, 2)
        bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
                surplus := len(blk.Replicas) - blk.Desired
                bytes := blkid.Size()
@@ -493,6 +494,11 @@ func (bal *Balancer) getStatistics() (s balancerStats) {
                        s.current.blocks++
                        s.current.bytes += bytes * int64(len(blk.Replicas))
                }
+
+               for len(s.replHistogram) <= len(blk.Replicas) {
+                       s.replHistogram = append(s.replHistogram, 0)
+               }
+               s.replHistogram[len(blk.Replicas)]++
        })
        for _, srv := range bal.KeepServices {
                s.pulls += len(srv.ChangeSet.Pulls)
@@ -521,6 +527,25 @@ func (bal *Balancer) PrintStatistics() {
                bal.logf("%s: %v\n", srv, srv.ChangeSet)
        }
        bal.logf("===")
+       bal.printHistogram(s, 60)
+       bal.logf("===")
+}
+
+func (bal *Balancer) printHistogram(s balancerStats, hashColumns int) {
+       bal.logf("Replication level distribution (counting N replicas on a single server as N):")
+       maxCount := 0
+       for _, count := range s.replHistogram {
+               if maxCount < count {
+                       maxCount = count
+               }
+       }
+       hashes := strings.Repeat("#", hashColumns)
+       countWidth := 1 + int(math.Log10(float64(maxCount+1)))
+       scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
+       for repl, count := range s.replHistogram {
+               nHashes := int(scaleCount * math.Log10(float64(count+1)))
+               bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
+       }
 }
 
 // CheckSanityLate checks for configuration and runtime errors after