// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package main

import (
	"fmt"
	"log"
	"math"
	"os"
	"runtime"
	"sort"
	"strings"
	"sync"
	"time"

	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"git.curoverse.com/arvados.git/sdk/go/keepclient"
)

// CheckConfig returns an error if anything is wrong with the given
// config and runOptions.
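//
// A minimal usage sketch (hypothetical; loadConfig stands in for
// whatever produces the Config and RunOptions in main):
//
//   config, runOptions := loadConfig()
//   if err := CheckConfig(config, runOptions); err != nil {
//       log.Fatal(err)
//   }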
func CheckConfig(config Config, runOptions RunOptions) error {
	if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
		return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
	}
	if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
		return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
	}
	return nil
}

// Balancer compares the contents of keepstore servers with the
// collections stored in Arvados, and issues pull/trash requests
// needed to get (closer to) the optimal data layout.
//
// In the optimal data layout: every data block referenced by a
// collection is replicated at least as many times as desired by the
// collection; there are no unreferenced data blocks older than
// BlobSignatureTTL; and all N existing replicas of a given data block
// are in the N best positions in rendezvous probe order.
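//
// Illustrative example: if a block with Desired=2 can be found only
// on the third server in its rendezvous probe order, the balancer
// pulls copies to the first two servers; on a later run, once those
// copies exist and the old replica is older than BlobSignatureTTL,
// the old replica becomes eligible for trash.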
type Balancer struct {
	*BlockStateMap
	KeepServices       map[string]*KeepService
	DefaultReplication int
	Logger             *log.Logger
	Dumper             *log.Logger
	MinMtime           int64

	collScanned  int
	serviceRoots map[string]string
	errors       []error
}

// Run performs a balance operation using the given config and
// runOptions, and returns RunOptions suitable for passing to a
// subsequent balance operation.
//
// Run should only be called once on a given Balancer object.
//
// Typical usage:
//
//   runOptions, err = (&Balancer{}).Run(config, runOptions)
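//
// A daemon might call it in a loop, carrying the returned RunOptions
// forward (a sketch, not from this file; the sleep policy is
// illustrative):
//
//   for {
//       runOptions, err = (&Balancer{}).Run(config, runOptions)
//       if err != nil {
//           log.Print(err)
//       }
//       time.Sleep(time.Duration(config.RunPeriod))
//   }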
func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
	nextRunOptions = runOptions

	bal.Dumper = runOptions.Dumper
	bal.Logger = runOptions.Logger
	if bal.Logger == nil {
		bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
	}

	defer timeMe(bal.Logger, "Run")()

	if len(config.KeepServiceList.Items) > 0 {
		err = bal.SetKeepServices(config.KeepServiceList)
	} else {
		err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
	}
	if err != nil {
		return
	}

	for _, srv := range bal.KeepServices {
		err = srv.discoverMounts(&config.Client)
		if err != nil {
			return
		}
	}

	if err = bal.CheckSanityEarly(&config.Client); err != nil {
		return
	}
	rs := bal.rendezvousState()
	if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
		if runOptions.SafeRendezvousState != "" {
			bal.logf("notice: KeepServices list has changed since last run")
		}
		bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
		if err = bal.ClearTrashLists(&config.Client); err != nil {
			return
		}
		// The current rendezvous state becomes "safe" (i.e.,
		// OK to compute changes for that state without
		// clearing existing trash lists) only now, after we
		// succeed in clearing existing trash lists.
		nextRunOptions.SafeRendezvousState = rs
	}
	if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
		return
	}
	bal.ComputeChangeSets()
	bal.PrintStatistics()
	if err = bal.CheckSanityLate(); err != nil {
		return
	}
	if runOptions.CommitPulls {
		err = bal.CommitPulls(&config.Client)
		if err != nil {
			// Skip trash if we can't pull. (Too cautious?)
			return
		}
	}
	if runOptions.CommitTrash {
		err = bal.CommitTrash(&config.Client)
	}
	return
}

// SetKeepServices sets the list of KeepServices to operate on.
func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
	bal.KeepServices = make(map[string]*KeepService)
	for _, srv := range srvList.Items {
		bal.KeepServices[srv.UUID] = &KeepService{
			KeepService: srv,
			ChangeSet:   &ChangeSet{},
		}
	}
	return nil
}

// DiscoverKeepServices sets the list of KeepServices by calling the
// API to get a list of all services, and selecting the ones whose
// ServiceType is in okTypes.
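//
// For example ("disk" is the service type keepstore servers normally
// register with; the call shape mirrors Run above):
//
//   err := bal.DiscoverKeepServices(&config.Client, []string{"disk"})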
func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
	bal.KeepServices = make(map[string]*KeepService)
	ok := make(map[string]bool)
	for _, t := range okTypes {
		ok[t] = true
	}
	return c.EachKeepService(func(srv arvados.KeepService) error {
		if ok[srv.ServiceType] {
			bal.KeepServices[srv.UUID] = &KeepService{
				KeepService: srv,
				ChangeSet:   &ChangeSet{},
			}
		} else {
			bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
		}
		return nil
	})
}

// CheckSanityEarly checks for configuration and runtime errors that
// can be detected before GetCurrentState() and ComputeChangeSets()
// are called.
//
// If it returns an error, it is pointless to run GetCurrentState or
// ComputeChangeSets: after doing so, the statistics would be
// meaningless and it would be dangerous to run any Commit methods.
func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
	u, err := c.CurrentUser()
	if err != nil {
		return fmt.Errorf("CurrentUser(): %v", err)
	}
	if !u.IsActive || !u.IsAdmin {
		return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
	}
	for _, srv := range bal.KeepServices {
		if srv.ServiceType == "proxy" {
			return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
		}
	}
	return nil
}

// rendezvousState returns a fingerprint (e.g., a sorted list of
// UUID+host+port) of the current set of keep services.
func (bal *Balancer) rendezvousState() string {
	srvs := make([]string, 0, len(bal.KeepServices))
	for _, srv := range bal.KeepServices {
		srvs = append(srvs, srv.String())
	}
	sort.Strings(srvs)
	return strings.Join(srvs, "; ")
}

// ClearTrashLists sends an empty trash list to each keep
// service. Calling this before GetCurrentState avoids races.
//
// When a block appears in an index, we assume that replica will still
// exist after we delete other replicas on other servers. However,
// it's possible that a previous rebalancing operation made different
// decisions (e.g., servers were added/removed, and rendezvous order
// changed). In this case, the replica might already be on that
// server's trash list, and it might be deleted before we send a
// replacement trash list.
//
// We avoid this problem if we clear all trash lists before getting
// indexes. (We also assume there is only one rebalancing process
// running at a time.)
func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
	for _, srv := range bal.KeepServices {
		srv.ChangeSet = &ChangeSet{}
	}
	return bal.CommitTrash(c)
}

// GetCurrentState determines the current replication state, and the
// desired replication level, for every block that is either
// retrievable or referenced.
//
// It determines the current replication state by reading the block index
// from every known Keep service.
//
// It determines the desired replication level by retrieving all
// collection manifests in the database (API server).
//
// It encodes the resulting information in BlockStateMap.
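//
// A usage sketch (the literals stand in for CollectionBatchSize and
// CollectionBuffers, as passed by Run):
//
//   if err := bal.GetCurrentState(&config.Client, 1000, 100); err != nil {
//       return err
//   }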
func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
	defer timeMe(bal.Logger, "GetCurrentState")()
	bal.BlockStateMap = NewBlockStateMap()

	dd, err := c.DiscoveryDocument()
	if err != nil {
		return err
	}
	bal.DefaultReplication = dd.DefaultCollectionReplication
	bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9

	errs := make(chan error, 2+len(bal.KeepServices))
	wg := sync.WaitGroup{}

	// Start one goroutine for each KeepService: retrieve the
	// index, and add the returned blocks to BlockStateMap.
	for _, srv := range bal.KeepServices {
		wg.Add(1)
		go func(srv *KeepService) {
			defer wg.Done()
			bal.logf("%s: retrieve indexes", srv)
			for _, mount := range srv.mounts {
				bal.logf("%s: retrieve index", mount)
				idx, err := srv.IndexMount(c, mount.UUID, "")
				if err != nil {
					errs <- fmt.Errorf("%s: retrieve index: %v", mount, err)
					return
				}
				if len(errs) > 0 {
					// Some other goroutine encountered an
					// error -- any further effort here
					// would be wasted.
					return
				}
				bal.logf("%s: add %d replicas to map", mount, len(idx))
				bal.BlockStateMap.AddReplicas(mount, idx)
				bal.logf("%s: done", mount)
			}
			bal.logf("%s: done", srv)
		}(srv)
	}

	// collQ buffers incoming collections so we can start fetching
	// the next page without waiting for the current page to
	// finish processing.
	collQ := make(chan arvados.Collection, bufs)

	// Start a goroutine to process collections. (We could use a
	// worker pool here, but even with a single worker we already
	// process collections much faster than we can retrieve them.)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for coll := range collQ {
			err := bal.addCollection(coll)
			if err != nil {
				errs <- err
				for range collQ {
					// Drain collQ so the producer
					// can finish.
				}
				return
			}
			bal.collScanned++
		}
	}()

	// Start a goroutine to retrieve all collections from the
	// Arvados database and send them to collQ for processing.
	wg.Add(1)
	go func() {
		defer wg.Done()
		err = EachCollection(c, pageSize,
			func(coll arvados.Collection) error {
				collQ <- coll
				if len(errs) > 0 {
					// some other GetCurrentState
					// error happened: no point
					// getting any more
					// collections.
					return fmt.Errorf("")
				}
				return nil
			}, func(done, total int) {
				bal.logf("collections: %d/%d", done, total)
			})
		close(collQ)
		if err != nil {
			errs <- err
		}
	}()

	wg.Wait()
	if len(errs) > 0 {
		return <-errs
	}
	return nil
}

func (bal *Balancer) addCollection(coll arvados.Collection) error {
	blkids, err := coll.SizedDigests()
	if err != nil {
		bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
		return nil
	}
	repl := bal.DefaultReplication
	if coll.ReplicationDesired != nil {
		repl = *coll.ReplicationDesired
	}
	debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
	bal.BlockStateMap.IncreaseDesired(repl, blkids)
	return nil
}

// ComputeChangeSets compares, for each known block, the current and
// desired replication states. If it is possible to get closer to the
// desired state by copying or deleting blocks, it adds those changes
// to the relevant KeepServices' ChangeSets.
//
// It does not actually apply any of the computed changes.
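//
// Nothing is sent to the keepstore servers until CommitPulls or
// CommitTrash is called, so a dry run can stop after this sequence
// (mirroring Run):
//
//   bal.ComputeChangeSets()
//   bal.PrintStatistics()
//   err := bal.CheckSanityLate()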
func (bal *Balancer) ComputeChangeSets() {
	// This just calls balanceBlock() once for each block, using a
	// pool of worker goroutines.
	defer timeMe(bal.Logger, "ComputeChangeSets")()
	bal.setupServiceRoots()

	type balanceTask struct {
		blkid arvados.SizedDigest
		blk   *BlockState
	}
	nWorkers := 1 + runtime.NumCPU()
	todo := make(chan balanceTask, nWorkers)
	var wg sync.WaitGroup
	for i := 0; i < nWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for work := range todo {
				bal.balanceBlock(work.blkid, work.blk)
			}
		}()
	}
	bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
		todo <- balanceTask{
			blkid: blkid,
			blk:   blk,
		}
	})
	close(todo)
	wg.Wait()
}

func (bal *Balancer) setupServiceRoots() {
	bal.serviceRoots = make(map[string]string)
	for _, srv := range bal.KeepServices {
		bal.serviceRoots[srv.UUID] = srv.UUID
	}
}

const (
	changeStay = iota
	changePull
	changeTrash
	changeNone
)

var changeName = map[int]string{
	changeStay:  "stay",
	changePull:  "pull",
	changeTrash: "trash",
	changeNone:  "none",
}

// balanceBlock compares current state to desired state for a single
// block, and makes the appropriate ChangeSet calls.
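//
// In outline (a summary of the code below, not a separate API): the
// block's servers are ranked with keepclient.NewRootSorter, existing
// replicas are matched to their rendezvous slots, and each slot then
// yields at most one change -- a Trash for a surplus replica that is
// safe to delete, or a Pull for an empty slot that is still needed to
// reach blk.Desired.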
func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
	debugf("balanceBlock: %v %+v", blkid, blk)

	// A slot is somewhere a replica could potentially be trashed
	// from, pulled from, or pulled to. Each KeepService gets
	// either one empty slot, or one or more non-empty slots.
	type slot struct {
		srv  *KeepService // never nil
		repl *Replica     // nil if none found
	}

	// First, we build an ordered list of all slots worth
	// considering (including all slots where replicas have been
	// found, as well as all of the optimal slots for this block).
	// Then, when we consider each slot in that order, we will
	// have all of the information we need to make a decision
	// about that slot.

	uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
	rendezvousOrder := make(map[*KeepService]int, len(uuids))
	slots := make([]slot, len(uuids))
	for i, uuid := range uuids {
		srv := bal.KeepServices[uuid]
		rendezvousOrder[srv] = i
		slots[i].srv = srv
	}

	// Sort readonly replicas ahead of trashable ones. This way,
	// if a single service has excessive replicas, the ones we
	// encounter last (and therefore choose to delete) will be on
	// the writable volumes, where possible.
	//
	// TODO: within the trashable set, prefer the oldest replica
	// that doesn't have a timestamp collision with others.
	sort.Slice(blk.Replicas, func(i, j int) bool {
		mnt := blk.Replicas[i].KeepMount
		return mnt.ReadOnly || mnt.KeepService.ReadOnly
	})

	// Assign existing replicas to slots.
	for ri := range blk.Replicas {
		repl := &blk.Replicas[ri]
		srv := repl.KeepService
		slotIdx := rendezvousOrder[srv]
		if slots[slotIdx].repl != nil {
			// Additional replicas on a single server are
			// considered non-optimal. Within this
			// category, we don't try to optimize layout:
			// we just say the optimal order is the order
			// we encounter them.
			slotIdx = len(slots)
			slots = append(slots, slot{srv: srv})
		}
		slots[slotIdx].repl = repl
	}

	// reportedBestRepl is the number of replicas already found
	// in positions better than the position we're contemplating
	// now.
	reportedBestRepl := 0
	// To be safe we assume two replicas with the same Mtime are
	// in fact the same replica being reported more than
	// once. len(uniqueBestRepl) is the number of distinct
	// replicas in the best rendezvous positions we've considered
	// so far.
	uniqueBestRepl := make(map[int64]bool, len(bal.serviceRoots))
	// pulls is the number of Pull changes we have already
	// requested. (For purposes of deciding whether to Pull to
	// rendezvous position N, we should assume all pulls we have
	// requested on rendezvous positions M<N will be successful.)
	pulls := 0
	var changes []string
	for _, slot := range slots {
		change := changeNone
		srv, repl := slot.srv, slot.repl
		// TODO: request a Touch if Mtime is duplicated.
		if repl != nil {
			// This service has a replica. We should
			// delete it if [1] we already have enough
			// distinct replicas in better rendezvous
			// positions and [2] this replica's Mtime is
			// distinct from all of the better replicas'
			// Mtimes.
			if !srv.ReadOnly &&
				!repl.KeepMount.ReadOnly &&
				repl.Mtime < bal.MinMtime &&
				len(uniqueBestRepl) >= blk.Desired &&
				!uniqueBestRepl[repl.Mtime] {
				srv.AddTrash(Trash{
					SizedDigest: blkid,
					Mtime:       repl.Mtime,
				})
				change = changeTrash
			} else {
				change = changeStay
			}
			uniqueBestRepl[repl.Mtime] = true
			reportedBestRepl++
		} else if pulls+reportedBestRepl < blk.Desired &&
			len(blk.Replicas) > 0 &&
			!srv.ReadOnly {
			// This service doesn't have a replica. We
			// should pull one to this server if we don't
			// already have enough (existing+requested)
			// replicas in better rendezvous positions.
			srv.AddPull(Pull{
				SizedDigest: blkid,
				Source:      blk.Replicas[0].KeepService,
			})
			pulls++
			change = changePull
		}
		if bal.Dumper != nil {
			var mtime int64
			if repl != nil {
				mtime = repl.Mtime
			}
			changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], mtime))
		}
	}
	if bal.Dumper != nil {
		bal.Dumper.Printf("%s have=%d want=%d %s", blkid, len(blk.Replicas), blk.Desired, strings.Join(changes, " "))
	}
}

type blocksNBytes struct {
	replicas int
	blocks   int
	bytes    int64
}

func (bb blocksNBytes) String() string {
	return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
}

type balancerStats struct {
	lost, overrep, unref, garbage, underrep, justright blocksNBytes
	desired, current                                   blocksNBytes
	pulls, trashes                                     int
	replHistogram                                      []int
}

func (bal *Balancer) getStatistics() (s balancerStats) {
	s.replHistogram = make([]int, 2)
	bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
		surplus := len(blk.Replicas) - blk.Desired
		bytes := blkid.Size()
		switch {
		case len(blk.Replicas) == 0 && blk.Desired > 0:
			s.lost.replicas -= surplus
			s.lost.blocks++
			s.lost.bytes += bytes * int64(-surplus)
		case len(blk.Replicas) < blk.Desired:
			s.underrep.replicas -= surplus
			s.underrep.blocks++
			s.underrep.bytes += bytes * int64(-surplus)
		case len(blk.Replicas) > 0 && blk.Desired == 0:
			counter := &s.garbage
			for _, r := range blk.Replicas {
				if r.Mtime >= bal.MinMtime {
					counter = &s.unref
					break
				}
			}
			counter.replicas += surplus
			counter.blocks++
			counter.bytes += bytes * int64(surplus)
		case len(blk.Replicas) > blk.Desired:
			s.overrep.replicas += surplus
			s.overrep.blocks++
			s.overrep.bytes += bytes * int64(len(blk.Replicas)-blk.Desired)
		default:
			s.justright.replicas += blk.Desired
			s.justright.blocks++
			s.justright.bytes += bytes * int64(blk.Desired)
		}

		if blk.Desired > 0 {
			s.desired.replicas += blk.Desired
			s.desired.blocks++
			s.desired.bytes += bytes * int64(blk.Desired)
		}
		if len(blk.Replicas) > 0 {
			s.current.replicas += len(blk.Replicas)
			s.current.blocks++
			s.current.bytes += bytes * int64(len(blk.Replicas))
		}

		for len(s.replHistogram) <= len(blk.Replicas) {
			s.replHistogram = append(s.replHistogram, 0)
		}
		s.replHistogram[len(blk.Replicas)]++
	})
	for _, srv := range bal.KeepServices {
		s.pulls += len(srv.ChangeSet.Pulls)
		s.trashes += len(srv.ChangeSet.Trashes)
	}
	return
}

// PrintStatistics writes statistics about the computed changes to
// bal.Logger. It should not be called until ComputeChangeSets has
// finished.
func (bal *Balancer) PrintStatistics() {
	s := bal.getStatistics()
	bal.logf("===")
	bal.logf("%s lost (0=have<want)", s.lost)
	bal.logf("%s underreplicated (0<have<want)", s.underrep)
	bal.logf("%s just right (have=want)", s.justright)
	bal.logf("%s overreplicated (have>want>0)", s.overrep)
	bal.logf("%s unreferenced (have>want=0, new)", s.unref)
	bal.logf("%s garbage (have>want=0, old)", s.garbage)
	bal.logf("===")
	bal.logf("%s total commitment (excluding unreferenced)", s.desired)
	bal.logf("%s total usage", s.current)
	bal.logf("===")
	for _, srv := range bal.KeepServices {
		bal.logf("%s: %v\n", srv, srv.ChangeSet)
	}
	bal.logf("===")
	bal.printHistogram(s, 60)
	bal.logf("===")
}

func (bal *Balancer) printHistogram(s balancerStats, hashColumns int) {
	bal.logf("Replication level distribution (counting N replicas on a single server as N):")
	maxCount := 0
	for _, count := range s.replHistogram {
		if maxCount < count {
			maxCount = count
		}
	}
	hashes := strings.Repeat("#", hashColumns)
	countWidth := 1 + int(math.Log10(float64(maxCount+1)))
	scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
	for repl, count := range s.replHistogram {
		nHashes := int(scaleCount * math.Log10(float64(count+1)))
		bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
	}
}
// CheckSanityLate checks for configuration and runtime errors after
// GetCurrentState() and ComputeChangeSets() have finished.
//
// If it returns an error, it is dangerous to run any Commit methods.
func (bal *Balancer) CheckSanityLate() error {
	if bal.errors != nil {
		for _, err := range bal.errors {
			bal.logf("deferred error: %v", err)
		}
		return fmt.Errorf("cannot proceed safely after deferred errors")
	}

	if bal.collScanned == 0 {
		return fmt.Errorf("received zero collections")
	}

	anyDesired := false
	bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
		if blk.Desired > 0 {
			anyDesired = true
		}
	})
	if !anyDesired {
		return fmt.Errorf("zero blocks have desired replication>0")
	}

	if dr := bal.DefaultReplication; dr < 1 {
		return fmt.Errorf("Default replication (%d) is less than 1", dr)
	}

	// TODO: no two services have identical indexes
	// TODO: no collisions (same md5, different size)
	return nil
}

// CommitPulls sends the computed lists of pull requests to the
// keepstore servers. This has the effect of increasing replication of
// existing blocks that are either underreplicated or poorly
// distributed according to rendezvous hashing.
func (bal *Balancer) CommitPulls(c *arvados.Client) error {
	return bal.commitAsync(c, "send pull list",
		func(srv *KeepService) error {
			return srv.CommitPulls(c)
		})
}

// CommitTrash sends the computed lists of trash requests to the
// keepstore servers. This has the effect of deleting blocks that are
// overreplicated or unreferenced.
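//
// Run commits pulls before trash: new replicas are created before the
// old ones become candidates for deletion (and trash is skipped
// entirely if CommitPulls fails; see Run above).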
func (bal *Balancer) CommitTrash(c *arvados.Client) error {
	return bal.commitAsync(c, "send trash list",
		func(srv *KeepService) error {
			return srv.CommitTrash(c)
		})
}

func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
	errs := make(chan error)
	for _, srv := range bal.KeepServices {
		go func(srv *KeepService) {
			var err error
			defer func() { errs <- err }()
			label := fmt.Sprintf("%s: %v", srv, label)
			defer timeMe(bal.Logger, label)()
			err = f(srv)
			if err != nil {
				err = fmt.Errorf("%s: %v", label, err)
			}
		}(srv)
	}
	var lastErr error
	for range bal.KeepServices {
		if err := <-errs; err != nil {
			bal.logf("%v", err)
			lastErr = err
		}
	}
	return lastErr
}

func (bal *Balancer) logf(f string, args ...interface{}) {
	if bal.Logger != nil {
		bal.Logger.Printf(f, args...)
	}
}