14 "git.curoverse.com/arvados.git/sdk/go/arvados"
15 "git.curoverse.com/arvados.git/sdk/go/keepclient"
18 // CheckConfig returns an error if anything is wrong with the given
19 // config and runOptions.
20 func CheckConfig(config Config, runOptions RunOptions) error {
21 if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
22 return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
24 if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
25 return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
30 // Balancer compares the contents of keepstore servers with the
31 // collections stored in Arvados, and issues pull/trash requests
32 // needed to get (closer to) the optimal data layout.
34 // In the optimal data layout: every data block referenced by a
35 // collection is replicated at least as many times as desired by the
36 // collection; there are no unreferenced data blocks older than
37 // BlobSignatureTTL; and all N existing replicas of a given data block
38 // are in the N best positions in rendezvous probe order.
39 type Balancer struct {
// NOTE(review): this listing omits part of the struct body. Other code
// in this file uses bal.BlockStateMap, bal.Logger, bal.Dumper,
// bal.MinMtime, bal.errors and bal.collScanned, so those fields are
// presumably declared in the omitted lines -- confirm against the
// complete source.
// KeepServices maps each keep service UUID to its state and pending
// ChangeSet (filled in by SetKeepServices or DiscoverKeepServices).
41 KeepServices map[string]*KeepService
// DefaultReplication is copied from the API server's discovery document
// and used for collections that do not set ReplicationDesired.
42 DefaultReplication int
// serviceRoots maps each service UUID to itself; it is the input to the
// rendezvous sorter in balanceBlock (built by setupServiceRoots).
48 serviceRoots map[string]string
53 // Run performs a balance operation using the given config and
54 // runOptions, and returns RunOptions suitable for passing to a
55 // subsequent balance operation.
57 // Run should only be called once on a given Balancer object.
61 // runOptions, err = (&Balancer{}).Run(config, runOptions)
62 func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
63 nextRunOptions = runOptions
65 bal.Dumper = runOptions.Dumper
66 bal.Logger = runOptions.Logger
67 if bal.Logger == nil {
68 bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
71 defer timeMe(bal.Logger, "Run")()
73 if len(config.KeepServiceList.Items) > 0 {
74 err = bal.SetKeepServices(config.KeepServiceList)
76 err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
82 if err = bal.CheckSanityEarly(&config.Client); err != nil {
85 rs := bal.rendezvousState()
86 if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
87 if runOptions.SafeRendezvousState != "" {
88 bal.logf("notice: KeepServices list has changed since last run")
90 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
91 if err = bal.ClearTrashLists(&config.Client); err != nil {
94 // The current rendezvous state becomes "safe" (i.e.,
95 // OK to compute changes for that state without
96 // clearing existing trash lists) only now, after we
97 // succeed in clearing existing trash lists.
98 nextRunOptions.SafeRendezvousState = rs
100 if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
103 bal.ComputeChangeSets()
104 bal.PrintStatistics()
105 if err = bal.CheckSanityLate(); err != nil {
108 if runOptions.CommitPulls {
109 err = bal.CommitPulls(&config.Client)
111 // Skip trash if we can't pull. (Too cautious?)
115 if runOptions.CommitTrash {
116 err = bal.CommitTrash(&config.Client)
121 // SetKeepServices sets the list of KeepServices to operate on.
// Any previously configured services are discarded. Each listed service
// is keyed by its UUID and starts with an empty ChangeSet.
//
// NOTE(review): this listing omits part of the composite literal below
// (presumably the line that stores srv itself) and the end of the
// function; confirm against the complete source.
122 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
123 bal.KeepServices = make(map[string]*KeepService)
124 for _, srv := range srvList.Items {
125 bal.KeepServices[srv.UUID] = &KeepService{
127 ChangeSet: &ChangeSet{},
133 // DiscoverKeepServices sets the list of KeepServices by calling the
134 // API to get a list of all services, and selecting the ones whose
135 // ServiceType is in okTypes.
// Any previously configured services are discarded. Services of other
// types are logged and skipped. The error returned is whatever
// c.EachKeepService returns.
//
// NOTE(review): this listing omits the body of the okTypes loop and part
// of the KeepService composite literal; confirm against the complete
// source.
136 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
137 bal.KeepServices = make(map[string]*KeepService)
// ok is used as a set of accepted service types, looked up by
// srv.ServiceType below.
138 ok := make(map[string]bool)
139 for _, t := range okTypes {
142 return c.EachKeepService(func(srv arvados.KeepService) error {
143 if ok[srv.ServiceType] {
144 bal.KeepServices[srv.UUID] = &KeepService{
146 ChangeSet: &ChangeSet{},
149 bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
155 // CheckSanityEarly checks for configuration and runtime errors that
156 // can be detected before GetCurrentState() and ComputeChangeSets()
159 // If it returns an error, it is pointless to run GetCurrentState or
160 // ComputeChangeSets: after doing so, the statistics would be
161 // meaningless and it would be dangerous to run any Commit methods.
162 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
163 u, err := c.CurrentUser()
165 return fmt.Errorf("CurrentUser(): %v", err)
167 if !u.IsActive || !u.IsAdmin {
168 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
170 for _, srv := range bal.KeepServices {
171 if srv.ServiceType == "proxy" {
172 return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
178 // rendezvousState returns a fingerprint (e.g., a sorted list of
179 // UUID+host+port) of the current set of keep services.
// Run compares this fingerprint with RunOptions.SafeRendezvousState to
// decide whether existing trash lists must be cleared before balancing.
180 func (bal *Balancer) rendezvousState() string {
181 srvs := make([]string, 0, len(bal.KeepServices))
182 for _, srv := range bal.KeepServices {
183 srvs = append(srvs, srv.String())
// NOTE(review): ranging over the bal.KeepServices map yields entries in
// unspecified order, so the fingerprint is only stable across runs if
// srvs is sorted before joining. This listing omits the line between the
// loop and the Join (presumably that sort) -- confirm it is present.
186 return strings.Join(srvs, "; ")
189 // ClearTrashLists sends an empty trash list to each keep
190 // service. Calling this before GetCurrentState avoids races.
192 // When a block appears in an index, we assume that replica will still
193 // exist after we delete other replicas on other servers. However,
194 // it's possible that a previous rebalancing operation made different
195 // decisions (e.g., servers were added/removed, and rendezvous order
196 // changed). In this case, the replica might already be on that
197 // server's trash list, and it might be deleted before we send a
198 // replacement trash list.
200 // We avoid this problem if we clear all trash lists before getting
201 // indexes. (We also assume there is only one rebalancing process
202 // running at a time.)
203 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
204 for _, srv := range bal.KeepServices {
205 srv.ChangeSet = &ChangeSet{}
207 return bal.CommitTrash(c)
210 // GetCurrentState determines the current replication state, and the
211 // desired replication level, for every block that is either
212 // retrievable or referenced.
214 // It determines the current replication state by reading the block index
215 // from every known Keep service.
217 // It determines the desired replication level by retrieving all
218 // collection manifests in the database (API server).
220 // It encodes the resulting information in BlockStateMap.
//
// pageSize is passed through to EachCollection, and bufs is the capacity
// of the collQ channel that lets collection fetching run ahead of
// collection processing.
//
// NOTE(review): this listing omits many lines of the body (error checks,
// WaitGroup bookkeeping, and the code that drains errs and returns), so
// the comments below describe only the visible statements.
221 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
222 defer timeMe(bal.Logger, "GetCurrentState")()
223 bal.BlockStateMap = NewBlockStateMap()
225 dd, err := c.DiscoveryDocument()
229 bal.DefaultReplication = dd.DefaultCollectionReplication
// Replicas whose Mtime is older than MinMtime are eligible for trash
// (see balanceBlock). Presumably BlobSignatureTTL is in seconds, hence
// the 1e9 factor to match UnixNano -- confirm.
230 bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
// errs has room for one error per index goroutine plus two for the
// collection-processing and collection-retrieval goroutines below,
// presumably so that no goroutine blocks while reporting an error.
232 errs := make(chan error, 2+len(bal.KeepServices))
233 wg := sync.WaitGroup{}
235 // Start one goroutine for each KeepService: retrieve the
236 // index, and add the returned blocks to BlockStateMap.
237 for _, srv := range bal.KeepServices {
239 go func(srv *KeepService) {
241 bal.logf("%s: retrieve index", srv)
// NOTE(review): presumably an empty prefix requests the complete index.
242 idx, err := srv.Index(c, "")
244 errs <- fmt.Errorf("%s: %v", srv, err)
248 // Some other goroutine encountered an
249 // error -- any further effort here
253 bal.logf("%s: add %d replicas to map", srv, len(idx))
254 bal.BlockStateMap.AddReplicas(srv, idx)
255 bal.logf("%s: done", srv)
259 // collQ buffers incoming collections so we can start fetching
260 // the next page without waiting for the current page to
261 // finish processing.
262 collQ := make(chan arvados.Collection, bufs)
264 // Start a goroutine to process collections. (We could use a
265 // worker pool here, but even with a single worker we already
266 // process collections much faster than we can retrieve them.)
270 for coll := range collQ {
271 err := bal.addCollection(coll)
282 // Start a goroutine to retrieve all collections from the
283 // Arvados database and send them to collQ for processing.
287 err = EachCollection(c, pageSize,
288 func(coll arvados.Collection) error {
291 // some other GetCurrentState
292 // error happened: no point
// Returning a non-nil error here presumably makes EachCollection stop
// paging; its text is irrelevant because the real error was already
// reported by another goroutine.
295 return fmt.Errorf("")
298 }, func(done, total int) {
299 bal.logf("collections: %d/%d", done, total)
// addCollection raises the desired replication of every block referenced
// by coll: coll.ReplicationDesired when set, otherwise
// bal.DefaultReplication. If the manifest cannot be parsed into block
// digests, the error is appended to bal.errors instead of aborting the
// scan; CheckSanityLate later refuses to proceed while bal.errors is
// non-nil.
//
// NOTE(review): the lines handling a parse error after it is recorded
// are omitted from this listing.
314 func (bal *Balancer) addCollection(coll arvados.Collection) error {
315 blkids, err := coll.SizedDigests()
318 bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
322 repl := bal.DefaultReplication
323 if coll.ReplicationDesired != nil {
324 repl = *coll.ReplicationDesired
326 debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
327 bal.BlockStateMap.IncreaseDesired(repl, blkids)
331 // ComputeChangeSets compares, for each known block, the current and
332 // desired replication states. If it is possible to get closer to the
333 // desired state by copying or deleting blocks, it adds those changes
334 // to the relevant KeepServices' ChangeSets.
336 // It does not actually apply any of the computed changes.
337 func (bal *Balancer) ComputeChangeSets() {
338 // This just calls balanceBlock() once for each block, using a
339 // pool of worker goroutines.
340 defer timeMe(bal.Logger, "ComputeChangeSets")()
341 bal.setupServiceRoots()
343 type balanceTask struct {
344 blkid arvados.SizedDigest
347 nWorkers := 1 + runtime.NumCPU()
348 todo := make(chan balanceTask, nWorkers)
349 var wg sync.WaitGroup
350 for i := 0; i < nWorkers; i++ {
353 for work := range todo {
354 bal.balanceBlock(work.blkid, work.blk)
359 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
369 func (bal *Balancer) setupServiceRoots() {
370 bal.serviceRoots = make(map[string]string)
371 for _, srv := range bal.KeepServices {
372 bal.serviceRoots[srv.UUID] = srv.UUID
// changeName maps each change kind recorded by balanceBlock to the label
// used in its Dumper output.
//
// NOTE(review): only the changeTrash entry is visible in this listing;
// the entries for the other change kinds are presumably in omitted lines.
383 var changeName = map[int]string{
386 changeTrash: "trash",
390 // balanceBlock compares current state to desired state for a single
391 // block, and makes the appropriate ChangeSet calls.
//
// Services are visited in rendezvous probe order for blkid. For a
// service that already holds a replica, the visible conditions for
// trashing it are: the replica is older than bal.MinMtime, enough
// distinct replicas (by Mtime) already sit in better positions, and its
// Mtime differs from all of theirs. A service without a replica is
// considered as a pull target while existing plus requested replicas in
// better positions fall short of blk.Desired and at least one replica
// exists to pull from. When bal.Dumper is set, one summary line per
// block is written to it.
//
// NOTE(review): this listing omits several lines of the body (including
// the declarations of pulls, changes and change, and the statements that
// record trash/pull requests); the comments describe only visible logic.
392 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
393 debugf("balanceBlock: %v %+v", blkid, blk)
// Presumably blkid[:32] is the 32-hex-digit hash portion of the digest
// (without the +size suffix), which seeds the rendezvous order.
394 uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
395 hasRepl := make(map[string]Replica, len(bal.serviceRoots))
396 for _, repl := range blk.Replicas {
397 hasRepl[repl.UUID] = repl
398 // TODO: when multiple copies are on one server, use
399 // the oldest one that doesn't have a timestamp
400 // collision with other replicas.
402 // number of replicas already found in positions better than
403 // the position we're contemplating now.
404 reportedBestRepl := 0
405 // To be safe we assume two replicas with the same Mtime are
406 // in fact the same replica being reported more than
407 // once. len(uniqueBestRepl) is the number of distinct
408 // replicas in the best rendezvous positions we've considered
// so far.
410 uniqueBestRepl := make(map[int64]bool, len(bal.serviceRoots))
411 // pulls is the number of Pull changes we have already
412 // requested. (For purposes of deciding whether to Pull to
413 // rendezvous position N, we should assume all pulls we have
414 // requested on rendezvous positions M<N will be successful.)
417 for _, uuid := range uuids {
419 srv := bal.KeepServices[uuid]
420 // TODO: request a Touch if Mtime is duplicated.
421 repl, ok := hasRepl[srv.UUID]
423 // This service has a replica. We should
424 // delete it if [1] we already have enough
425 // distinct replicas in better rendezvous
426 // positions and [2] this replica's Mtime is
427 // distinct from all of the better replicas'
// Mtimes.
430 repl.Mtime < bal.MinMtime &&
431 len(uniqueBestRepl) >= blk.Desired &&
432 !uniqueBestRepl[repl.Mtime] {
441 uniqueBestRepl[repl.Mtime] = true
443 } else if pulls+reportedBestRepl < blk.Desired &&
444 len(blk.Replicas) > 0 &&
446 // This service doesn't have a replica. We
447 // should pull one to this server if we don't
448 // already have enough (existing+requested)
449 // replicas in better rendezvous positions.
452 Source: blk.Replicas[0].KeepService,
// Each Dumper entry has the form host:port=change,mtime for one
// service in probe order.
457 if bal.Dumper != nil {
458 changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], repl.Mtime))
461 if bal.Dumper != nil {
462 bal.Dumper.Printf("%s have=%d want=%d %s", blkid, len(blk.Replicas), blk.Desired, strings.Join(changes, " "))
// blocksNBytes tallies replicas, blocks and bytes for one statistics
// category (see getStatistics and PrintStatistics).
//
// NOTE(review): the field declarations are omitted from this listing;
// String and getStatistics use fields named replicas, blocks and bytes.
466 type blocksNBytes struct {
472 func (bb blocksNBytes) String() string {
473 return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
// balancerStats summarizes one balancing pass; it is computed by
// getStatistics and logged by PrintStatistics. Each category field
// (lost, overrep, unref, garbage, underrep, justright) totals the blocks
// in that category; desired and current are logged as the total
// commitment and total usage respectively.
//
// NOTE(review): getStatistics also sets s.replHistogram, s.pulls and
// s.trashes; those field declarations are omitted from this listing.
476 type balancerStats struct {
477 lost, overrep, unref, garbage, underrep, justright blocksNBytes
478 desired, current blocksNBytes
// getStatistics walks BlockStateMap and classifies each block by
// comparing its replica count ("have") with its desired count ("want"):
// lost, underreplicated, unreferenced/garbage, overreplicated, or just
// right. It also totals desired and current replicas, builds a histogram
// of replica counts (s.replHistogram[n] counts blocks with n replicas),
// and counts the pull and trash requests queued in each service's
// ChangeSet.
//
// NOTE(review): this listing omits some lines of the body (including the
// switch header and the per-category block counters); the comments
// describe only the visible statements.
483 func (bal *Balancer) getStatistics() (s balancerStats) {
484 s.replHistogram = make([]int, 2)
485 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
// surplus is negative when the block has fewer replicas than desired.
486 surplus := len(blk.Replicas) - blk.Desired
487 bytes := blkid.Size()
489 case len(blk.Replicas) == 0 && blk.Desired > 0:
// Subtracting the negative surplus counts the missing replicas.
490 s.lost.replicas -= surplus
492 s.lost.bytes += bytes * int64(-surplus)
493 case len(blk.Replicas) < blk.Desired:
494 s.underrep.replicas -= surplus
496 s.underrep.bytes += bytes * int64(-surplus)
497 case len(blk.Replicas) > 0 && blk.Desired == 0:
// Unreferenced blocks default to the garbage bucket; presumably a
// replica at least as new as bal.MinMtime switches counter to s.unref
// (that branch body is omitted here; PrintStatistics labels unref
// "new" and garbage "old").
498 counter := &s.garbage
499 for _, r := range blk.Replicas {
500 if r.Mtime >= bal.MinMtime {
505 counter.replicas += surplus
507 counter.bytes += bytes * int64(surplus)
508 case len(blk.Replicas) > blk.Desired:
509 s.overrep.replicas += surplus
511 s.overrep.bytes += bytes * int64(len(blk.Replicas)-blk.Desired)
513 s.justright.replicas += blk.Desired
515 s.justright.bytes += bytes * int64(blk.Desired)
519 s.desired.replicas += blk.Desired
521 s.desired.bytes += bytes * int64(blk.Desired)
523 if len(blk.Replicas) > 0 {
524 s.current.replicas += len(blk.Replicas)
526 s.current.bytes += bytes * int64(len(blk.Replicas))
// Grow the histogram so that index len(blk.Replicas) exists.
529 for len(s.replHistogram) <= len(blk.Replicas) {
530 s.replHistogram = append(s.replHistogram, 0)
532 s.replHistogram[len(blk.Replicas)]++
534 for _, srv := range bal.KeepServices {
535 s.pulls += len(srv.ChangeSet.Pulls)
536 s.trashes += len(srv.ChangeSet.Trashes)
541 // PrintStatistics writes statistics about the computed changes to
542 // bal.Logger. It should not be called until ComputeChangeSets has
// finished.
//
// It logs one line per block category, the total commitment and usage,
// each service's ChangeSet, and a replication-level histogram whose bars
// are at most 60 characters wide.
//
// NOTE(review): a few lines of the body are omitted from this listing.
544 func (bal *Balancer) PrintStatistics() {
545 s := bal.getStatistics()
547 bal.logf("%s lost (0=have<want)", s.lost)
548 bal.logf("%s underreplicated (0<have<want)", s.underrep)
549 bal.logf("%s just right (have=want)", s.justright)
550 bal.logf("%s overreplicated (have>want>0)", s.overrep)
551 bal.logf("%s unreferenced (have>want=0, new)", s.unref)
552 bal.logf("%s garbage (have>want=0, old)", s.garbage)
554 bal.logf("%s total commitment (excluding unreferenced)", s.desired)
555 bal.logf("%s total usage", s.current)
557 for _, srv := range bal.KeepServices {
558 bal.logf("%s: %v\n", srv, srv.ChangeSet)
// 60 is the maximum histogram bar width passed as hashColumns.
561 bal.printHistogram(s, 60)
565 func (bal *Balancer) printHistogram(s balancerStats, hashColumns int) {
566 bal.logf("Replication level distribution (counting N replicas on a single server as N):")
568 for _, count := range s.replHistogram {
569 if maxCount < count {
573 hashes := strings.Repeat("#", hashColumns)
574 countWidth := 1 + int(math.Log10(float64(maxCount+1)))
575 scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
576 for repl, count := range s.replHistogram {
577 nHashes := int(scaleCount * math.Log10(float64(count+1)))
578 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
582 // CheckSanityLate checks for configuration and runtime errors after
583 // GetCurrentState() and ComputeChangeSets() have finished.
585 // If it returns an error, it is dangerous to run any Commit methods.
586 func (bal *Balancer) CheckSanityLate() error {
587 if bal.errors != nil {
588 for _, err := range bal.errors {
589 bal.logf("deferred error: %v", err)
591 return fmt.Errorf("cannot proceed safely after deferred errors")
594 if bal.collScanned == 0 {
595 return fmt.Errorf("received zero collections")
599 bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
605 return fmt.Errorf("zero blocks have desired replication>0")
608 if dr := bal.DefaultReplication; dr < 1 {
609 return fmt.Errorf("Default replication (%d) is less than 1", dr)
612 // TODO: no two services have identical indexes
613 // TODO: no collisions (same md5, different size)
617 // CommitPulls sends the computed lists of pull requests to the
618 // keepstore servers. This has the effect of increasing replication of
619 // existing blocks that are either underreplicated or poorly
620 // distributed according to rendezvous hashing.
621 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
622 return bal.commitAsync(c, "send pull list",
623 func(srv *KeepService) error {
624 return srv.CommitPulls(c)
628 // CommitTrash sends the computed lists of trash requests to the
629 // keepstore servers. This has the effect of deleting blocks that are
630 // overreplicated or unreferenced.
631 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
632 return bal.commitAsync(c, "send trash list",
633 func(srv *KeepService) error {
634 return srv.CommitTrash(c)
// commitAsync runs f once for every keep service, each in its own
// goroutine, and then receives one result per service from errs. Each
// goroutine sends its (possibly nil) result through a deferred send, so
// every goroutine sends exactly once. Non-nil errors are prefixed with
// the service and the operation label.
//
// NOTE(review): errs is unbuffered, so every goroutine blocks until the
// loop at the end receives its result. That loop must therefore drain
// len(bal.KeepServices) results even after seeing an error, or the
// remaining goroutines leak. The lines that record errors and return are
// omitted from this listing -- confirm that behavior.
638 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
639 errs := make(chan error)
640 for _, srv := range bal.KeepServices {
641 go func(srv *KeepService) {
// Deferred functions run last-in-first-out, so the timing below is
// logged before the result is sent on errs.
643 defer func() { errs <- err }()
644 label := fmt.Sprintf("%s: %v", srv, label)
645 defer timeMe(bal.Logger, label)()
648 err = fmt.Errorf("%s: %v", label, err)
653 for range bal.KeepServices {
654 if err := <-errs; err != nil {
663 func (bal *Balancer) logf(f string, args ...interface{}) {
664 if bal.Logger != nil {
665 bal.Logger.Printf(f, args...)