import (
"fmt"
"log"
+ "math"
"os"
"runtime"
+ "sort"
"strings"
"sync"
"time"
}
// Run performs a balance operation using the given config and
-// runOptions. It should only be called once on a given Balancer
-// object. Typical usage:
+// runOptions, and returns RunOptions suitable for passing to a
+// subsequent balance operation.
//
-// err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
+// Run should only be called once on a given Balancer object.
+//
+// Typical usage:
+//
+// runOptions, err = (&Balancer{}).Run(config, runOptions)
+func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+ nextRunOptions = runOptions
+
bal.Dumper = runOptions.Dumper
bal.Logger = runOptions.Logger
if bal.Logger == nil {
if err = bal.CheckSanityEarly(&config.Client); err != nil {
return
}
- if runOptions.CommitTrash {
+ rs := bal.rendezvousState()
+ if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
+ if runOptions.SafeRendezvousState != "" {
+ bal.logf("notice: KeepServices list has changed since last run")
+ }
+ bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
if err = bal.ClearTrashLists(&config.Client); err != nil {
return
}
+ // The current rendezvous state becomes "safe" (i.e.,
+ // OK to compute changes for that state without
+ // clearing existing trash lists) only now, after we
+ // succeed in clearing existing trash lists.
+ nextRunOptions.SafeRendezvousState = rs
}
- if err = bal.GetCurrentState(&config.Client); err != nil {
+ if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
return
}
bal.ComputeChangeSets()
return nil
}
+// rendezvousState returns a fingerprint of the current set of keep services:
+// the String() form of every entry in bal.KeepServices (e.g., UUID+host+port), sorted and joined.
+func (bal *Balancer) rendezvousState() string {
+ srvs := make([]string, 0, len(bal.KeepServices)) // capacity for one entry per service
+ for _, srv := range bal.KeepServices {
+ srvs = append(srvs, srv.String())
+ }
+ sort.Strings(srvs) // fixed order, however bal.KeepServices was iterated
+ return strings.Join(srvs, "; ")
+}
+
// ClearTrashLists sends an empty trash list to each keep
// service. Calling this before GetCurrentState avoids races.
//
// collection manifests in the database (API server).
//
// It encodes the resulting information in BlockStateMap.
-func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
+func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
defer timeMe(bal.Logger, "GetCurrentState")()
bal.BlockStateMap = NewBlockStateMap()
return err
}
bal.DefaultReplication = dd.DefaultCollectionReplication
- bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL
+ bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
errs := make(chan error, 2+len(bal.KeepServices))
wg := sync.WaitGroup{}
errs <- fmt.Errorf("%s: %v", srv, err)
return
}
+ if len(errs) > 0 {
+ // Some other goroutine encountered an
+ // error -- any further effort here
+ // will be wasted.
+ return
+ }
bal.logf("%s: add %d replicas to map", srv, len(idx))
bal.BlockStateMap.AddReplicas(srv, idx)
bal.logf("%s: done", srv)
// collQ buffers incoming collections so we can start fetching
// the next page without waiting for the current page to
- // finish processing. (1000 happens to match the page size
- // used by (*arvados.Client)EachCollection(), but it's OK if
- // they don't match.)
- collQ := make(chan arvados.Collection, 1000)
+ // finish processing.
+ collQ := make(chan arvados.Collection, bufs)
// Start a goroutine to process collections. (We could use a
// worker pool here, but even with a single worker we already
wg.Add(1)
go func() {
defer wg.Done()
- err = EachCollection(c,
+ err = EachCollection(c, pageSize,
func(coll arvados.Collection) error {
collQ <- coll
if len(errs) > 0 {
}
}()
- go func() {
- // Send a nil error when all goroutines finish. If
- // this is the first error sent to errs, then
- // everything worked.
- wg.Wait()
- errs <- nil
- }()
- return <-errs
+ wg.Wait()
+ if len(errs) > 0 {
+ return <-errs
+ }
+ return nil
}
func (bal *Balancer) addCollection(coll arvados.Collection) error {
lost, overrep, unref, garbage, underrep, justright blocksNBytes
desired, current blocksNBytes
pulls, trashes int
+ replHistogram []int
}
func (bal *Balancer) getStatistics() (s balancerStats) {
+ s.replHistogram = make([]int, 2)
bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
surplus := len(blk.Replicas) - blk.Desired
bytes := blkid.Size()
s.current.blocks++
s.current.bytes += bytes * int64(len(blk.Replicas))
}
+
+ for len(s.replHistogram) <= len(blk.Replicas) {
+ s.replHistogram = append(s.replHistogram, 0)
+ }
+ s.replHistogram[len(blk.Replicas)]++
})
for _, srv := range bal.KeepServices {
s.pulls += len(srv.ChangeSet.Pulls)
bal.logf("%s: %v\n", srv, srv.ChangeSet)
}
bal.logf("===")
+ bal.printHistogram(s, 60)
+ bal.logf("===")
+}
+// printHistogram logs a header, then one line per index of s.replHistogram: the index, its count, and a log-scaled bar shorter than hashColumns hashes.
+func (bal *Balancer) printHistogram(s balancerStats, hashColumns int) {
+ bal.logf("Replication level distribution (counting N replicas on a single server as N):")
+ maxCount := 0 // largest bucket; drives both the count column width and the bar scale
+ for _, count := range s.replHistogram {
+ if maxCount < count {
+ maxCount = count
+ }
+ }
+ hashes := strings.Repeat("#", hashColumns) // full-width bar; each row logs a prefix of it
+ countWidth := 1 + int(math.Log10(float64(maxCount+1))) // field width passed to %*d below
+ scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1))) // hashes per unit of log10(count+1); the divisor exceeds 10*log10(maxCount+1), so nHashes stays below hashColumns
+ for repl, count := range s.replHistogram {
+ nHashes := int(scaleCount * math.Log10(float64(count+1))) // count+1 maps an empty bucket to log10(1) = 0 hashes
+ bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes]) // %*d consumes countWidth as the width, then count
+ }
}
// CheckSanityLate checks for configuration and runtime errors after
}(srv)
}
var lastErr error
- for _ = range bal.KeepServices {
+ for range bal.KeepServices {
if err := <-errs; err != nil {
bal.logf("%v", err)
lastErr = err