12 "git.curoverse.com/arvados.git/sdk/go/arvados"
13 "git.curoverse.com/arvados.git/sdk/go/keepclient"
// CheckConfig returns an error if anything is wrong with the given
// config and runOptions.
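//
// For example (a sketch; the option values are illustrative):
//
//	var config Config
//	runOptions := RunOptions{Once: true}
//	if err := CheckConfig(config, runOptions); err != nil {
//		log.Fatal(err)
//	}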
func CheckConfig(config Config, runOptions RunOptions) error {
	if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
		return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
	}
	if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
		return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
	}
	return nil
}

// Balancer compares the contents of keepstore servers with the
// collections stored in Arvados, and issues pull/trash requests
// needed to get (closer to) the optimal data layout.
//
// In the optimal data layout: every data block referenced by a
// collection is replicated at least as many times as desired by the
// collection; there are no unreferenced data blocks older than
// BlobSignatureTTL; and all N existing replicas of a given data block
// are in the N best positions in rendezvous probe order.
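//
// For example, a block with desired replication 2 should have its
// replicas on the first two servers in rendezvous probe order for
// that block; a replica on any later server becomes a trash
// candidate once enough distinct replicas exist in better positions.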
type Balancer struct {
	*BlockStateMap
	KeepServices       map[string]*KeepService
	DefaultReplication int
	Logger             *log.Logger
	Dumper             *log.Logger
	MinMtime           int64

	collScanned  int
	serviceRoots map[string]string
	errors       []error
}

// Run performs a balance operation using the given config and
// runOptions. It should only be called once on a given Balancer
// object. Typical usage:
//
//	err = (&Balancer{}).Run(config, runOptions)
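//
// A fuller sketch (the field values are illustrative, not
// recommended defaults):
//
//	runOptions := RunOptions{
//		Once:        true,
//		CommitPulls: true,
//		CommitTrash: true,
//	}
//	err := (&Balancer{}).Run(config, runOptions)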
func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
	bal.Dumper = runOptions.Dumper
	bal.Logger = runOptions.Logger
	if bal.Logger == nil {
		bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
	}

	defer timeMe(bal.Logger, "Run")()

	if len(config.KeepServiceList.Items) > 0 {
		err = bal.SetKeepServices(config.KeepServiceList)
	} else {
		err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
	}
	if err != nil {
		return
	}

	if err = bal.CheckSanityEarly(&config.Client); err != nil {
		return
	}
	if runOptions.CommitTrash {
		if err = bal.ClearTrashLists(&config.Client); err != nil {
			return
		}
	}
	if err = bal.GetCurrentState(&config.Client); err != nil {
		return
	}
	bal.ComputeChangeSets()
	bal.PrintStatistics()
	if err = bal.CheckSanityLate(); err != nil {
		return
	}
	if runOptions.CommitPulls {
		err = bal.CommitPulls(&config.Client)
		if err != nil {
			// Skip trash if we can't pull. (Too cautious?)
			return
		}
	}
	if runOptions.CommitTrash {
		err = bal.CommitTrash(&config.Client)
	}
	return
}

// SetKeepServices sets the list of KeepServices to operate on.
func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
	bal.KeepServices = make(map[string]*KeepService)
	for _, srv := range srvList.Items {
		bal.KeepServices[srv.UUID] = &KeepService{
			KeepService: srv,
			ChangeSet:   &ChangeSet{},
		}
	}
	return nil
}

// DiscoverKeepServices sets the list of KeepServices by calling the
// API to get a list of all services, and selecting the ones whose
// ServiceType is in okTypes.
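//
// For example (the "disk" service type here is illustrative):
//
//	err := bal.DiscoverKeepServices(&config.Client, []string{"disk"})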
func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
	bal.KeepServices = make(map[string]*KeepService)
	ok := make(map[string]bool)
	for _, t := range okTypes {
		ok[t] = true
	}
	return c.EachKeepService(func(srv arvados.KeepService) error {
		if ok[srv.ServiceType] {
			bal.KeepServices[srv.UUID] = &KeepService{
				KeepService: srv,
				ChangeSet:   &ChangeSet{},
			}
		} else {
			bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
		}
		return nil
	})
}

// CheckSanityEarly checks for configuration and runtime errors that
// can be detected before GetCurrentState() and ComputeChangeSets()
// are called.
//
// If it returns an error, it is pointless to run GetCurrentState or
// ComputeChangeSets: after doing so, the statistics would be
// meaningless and it would be dangerous to run any Commit methods.
func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
	u, err := c.CurrentUser()
	if err != nil {
		return fmt.Errorf("CurrentUser(): %v", err)
	}
	if !u.IsActive || !u.IsAdmin {
		return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
	}
	for _, srv := range bal.KeepServices {
		if srv.ServiceType == "proxy" {
			return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
		}
	}
	return nil
}

// ClearTrashLists sends an empty trash list to each keep
// service. Calling this before GetCurrentState avoids races.
//
// When a block appears in an index, we assume that replica will still
// exist after we delete other replicas on other servers. However,
// it's possible that a previous rebalancing operation made different
// decisions (e.g., servers were added/removed, and rendezvous order
// changed). In this case, the replica might already be on that
// server's trash list, and it might be deleted before we send a
// replacement trash list.
//
// We avoid this problem if we clear all trash lists before getting
// indexes. (We also assume there is only one rebalancing process
// running at a time.)
func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
	for _, srv := range bal.KeepServices {
		srv.ChangeSet = &ChangeSet{}
	}
	return bal.CommitTrash(c)
}

// GetCurrentState determines the current replication state, and the
// desired replication level, for every block that is either
// retrievable or referenced.
//
// It determines the current replication state by reading the block index
// from every known Keep service.
//
// It determines the desired replication level by retrieving all
// collection manifests in the database (API server).
//
// It encodes the resulting information in BlockStateMap.
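//
// (After a successful call, bal.BlockStateMap maps each block to a
// BlockState holding its known Replicas and its Desired replication
// count; ComputeChangeSets consumes this map.)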
func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
	defer timeMe(bal.Logger, "GetCurrentState")()
	bal.BlockStateMap = NewBlockStateMap()

	dd, err := c.DiscoveryDocument()
	if err != nil {
		return err
	}
	bal.DefaultReplication = dd.DefaultCollectionReplication
	bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL

	errs := make(chan error, 2+len(bal.KeepServices))
	wg := sync.WaitGroup{}

	// Start one goroutine for each KeepService: retrieve the
	// index, and add the returned blocks to BlockStateMap.
	for _, srv := range bal.KeepServices {
		wg.Add(1)
		go func(srv *KeepService) {
			defer wg.Done()
			bal.logf("%s: retrieve index", srv)
			idx, err := srv.Index(c, "")
			if err != nil {
				errs <- fmt.Errorf("%s: %v", srv, err)
				return
			}
			bal.logf("%s: add %d replicas to map", srv, len(idx))
			bal.BlockStateMap.AddReplicas(srv, idx)
			bal.logf("%s: done", srv)
		}(srv)
	}

	// collQ buffers incoming collections so we can start fetching
	// the next page without waiting for the current page to
	// finish processing. (1000 happens to match the page size
	// used by (*arvados.Client).EachCollection(), but it's OK if
	// they don't match.)
	collQ := make(chan arvados.Collection, 1000)

	// Start a goroutine to process collections. (We could use a
	// worker pool here, but even with a single worker we already
	// process collections much faster than we can retrieve them.)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for coll := range collQ {
			err := bal.addCollection(coll)
			if err != nil {
				errs <- err
				for range collQ {
					// Drain collQ so the fetching
					// goroutine can finish.
				}
				return
			}
			bal.collScanned++
		}
	}()

	// Start a goroutine to retrieve all collections from the
	// Arvados database and send them to collQ for processing.
	wg.Add(1)
	go func() {
		defer wg.Done()
		err = EachCollection(c,
			func(coll arvados.Collection) error {
				collQ <- coll
				if len(errs) > 0 {
					// Some other GetCurrentState
					// error happened: no point
					// getting any more
					// collections.
					return fmt.Errorf("stopping collection scan early")
				}
				return nil
			}, func(done, total int) {
				bal.logf("collections: %d/%d", done, total)
			})
		close(collQ)
		if err != nil {
			errs <- err
		}
	}()

	// Send a nil error when all goroutines finish. If
	// this is the first error sent to errs, then
	// everything worked.
	go func() {
		wg.Wait()
		errs <- nil
	}()
	return <-errs
}

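// addCollection updates bal.BlockStateMap for one collection: every
// block the collection references gets its desired replication
// raised to at least the collection's replication level.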
func (bal *Balancer) addCollection(coll arvados.Collection) error {
	blkids, err := coll.SizedDigests()
	if err != nil {
		bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
		return nil
	}
	repl := bal.DefaultReplication
	if coll.ReplicationDesired != nil {
		repl = *coll.ReplicationDesired
	}
	debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
	bal.BlockStateMap.IncreaseDesired(repl, blkids)
	return nil
}

// ComputeChangeSets compares, for each known block, the current and
// desired replication states. If it is possible to get closer to the
// desired state by copying or deleting blocks, it adds those changes
// to the relevant KeepServices' ChangeSets.
//
// It does not actually apply any of the computed changes.
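//
// (The changes are applied later, by CommitPulls and CommitTrash.)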
func (bal *Balancer) ComputeChangeSets() {
	// This just calls balanceBlock() once for each block, using a
	// pool of worker goroutines.
	defer timeMe(bal.Logger, "ComputeChangeSets")()
	bal.setupServiceRoots()

	type balanceTask struct {
		blkid arvados.SizedDigest
		blk   *BlockState
	}
	nWorkers := 1 + runtime.NumCPU()
	todo := make(chan balanceTask, nWorkers)
	var wg sync.WaitGroup
	for i := 0; i < nWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for work := range todo {
				bal.balanceBlock(work.blkid, work.blk)
			}
		}()
	}
	bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
		todo <- balanceTask{blkid: blkid, blk: blk}
	})
	close(todo)
	wg.Wait()
}

func (bal *Balancer) setupServiceRoots() {
	bal.serviceRoots = make(map[string]string)
	for _, srv := range bal.KeepServices {
		bal.serviceRoots[srv.UUID] = srv.UUID
	}
}

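// Change codes describing, in the Dumper output, what balanceBlock
// decided to do with a replica. (This const block is reconstructed:
// the names are inferred from their use in balanceBlock and in
// changeName below.)
const (
	changeStay = iota
	changePull
	changeTrash
	changeNone
)
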
var changeName = map[int]string{
	changeStay:  "stay",
	changePull:  "pull",
	changeTrash: "trash",
	changeNone:  "none",
}

// balanceBlock compares current state to desired state for a single
// block, and makes the appropriate ChangeSet calls.
func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
	debugf("balanceBlock: %v %+v", blkid, blk)
	uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
	hasRepl := make(map[string]Replica, len(bal.serviceRoots))
	for _, repl := range blk.Replicas {
		hasRepl[repl.UUID] = repl
		// TODO: when multiple copies are on one server, use
		// the oldest one that doesn't have a timestamp
		// collision with other replicas.
	}
	// reportedBestRepl is the number of replicas already found in
	// positions better than the position we're contemplating now.
	reportedBestRepl := 0
	// To be safe we assume two replicas with the same Mtime are
	// in fact the same replica being reported more than
	// once. len(uniqueBestRepl) is the number of distinct
	// replicas in the best rendezvous positions we've considered
	// so far.
	uniqueBestRepl := make(map[int64]bool, len(bal.serviceRoots))
	// pulls is the number of Pull changes we have already
	// requested. (For purposes of deciding whether to Pull to
	// rendezvous position N, we should assume all pulls we have
	// requested on rendezvous positions M<N will be successful.)
	pulls := 0
	var changes []string
	for _, uuid := range uuids {
		change := changeNone
		srv := bal.KeepServices[uuid]
		// TODO: request a Touch if Mtime is duplicated.
		repl, ok := hasRepl[srv.UUID]
		if ok {
			// This service has a replica. We should
			// delete it if [1] we already have enough
			// distinct replicas in better rendezvous
			// positions and [2] this replica's Mtime is
			// distinct from all of the better replicas'
			// Mtimes.
			if !srv.ReadOnly &&
				repl.Mtime < bal.MinMtime &&
				len(uniqueBestRepl) >= blk.Desired &&
				!uniqueBestRepl[repl.Mtime] {
				srv.AddTrash(Trash{
					SizedDigest: blkid,
					Mtime:       repl.Mtime,
				})
				change = changeTrash
			} else {
				change = changeStay
			}
			uniqueBestRepl[repl.Mtime] = true
			reportedBestRepl++
		} else if pulls+reportedBestRepl < blk.Desired &&
			len(blk.Replicas) > 0 &&
			!srv.ReadOnly {
			// This service doesn't have a replica. We
			// should pull one to this server if we don't
			// already have enough (existing+requested)
			// replicas in better rendezvous positions.
			srv.AddPull(Pull{
				SizedDigest: blkid,
				Source:      blk.Replicas[0].KeepService,
			})
			pulls++
			change = changePull
		}
		if bal.Dumper != nil {
			changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], repl.Mtime))
		}
	}
	if bal.Dumper != nil {
		bal.Dumper.Printf("%s have=%d want=%d %s", blkid, len(blk.Replicas), blk.Desired, strings.Join(changes, " "))
	}
}

type blocksNBytes struct {
	replicas int
	blocks   int
	bytes    int64
}

func (bb blocksNBytes) String() string {
	return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
}

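// For example, the String method above renders
// blocksNBytes{replicas: 4, blocks: 2, bytes: 1048576} as
// "4 replicas (2 blocks, 1048576 bytes)".
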
type balancerStats struct {
	lost, overrep, unref, garbage, underrep, justright blocksNBytes
	desired, current                                   blocksNBytes
	pulls, trashes                                     int
}

func (bal *Balancer) getStatistics() (s balancerStats) {
	bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
		surplus := len(blk.Replicas) - blk.Desired
		bytes := blkid.Size()
		switch {
		case len(blk.Replicas) == 0 && blk.Desired > 0:
			s.lost.replicas -= surplus
			s.lost.blocks++
			s.lost.bytes += bytes * int64(-surplus)
		case len(blk.Replicas) < blk.Desired:
			s.underrep.replicas -= surplus
			s.underrep.blocks++
			s.underrep.bytes += bytes * int64(-surplus)
		case len(blk.Replicas) > 0 && blk.Desired == 0:
			counter := &s.garbage
			for _, r := range blk.Replicas {
				if r.Mtime >= bal.MinMtime {
					counter = &s.unref
					break
				}
			}
			counter.replicas += surplus
			counter.blocks++
			counter.bytes += bytes * int64(surplus)
		case len(blk.Replicas) > blk.Desired:
			s.overrep.replicas += surplus
			s.overrep.blocks++
			s.overrep.bytes += bytes * int64(len(blk.Replicas)-blk.Desired)
		default:
			s.justright.replicas += blk.Desired
			s.justright.blocks++
			s.justright.bytes += bytes * int64(blk.Desired)
		}

		if blk.Desired > 0 {
			s.desired.replicas += blk.Desired
			s.desired.blocks++
			s.desired.bytes += bytes * int64(blk.Desired)
		}
		if len(blk.Replicas) > 0 {
			s.current.replicas += len(blk.Replicas)
			s.current.blocks++
			s.current.bytes += bytes * int64(len(blk.Replicas))
		}
	})
	for _, srv := range bal.KeepServices {
		s.pulls += len(srv.ChangeSet.Pulls)
		s.trashes += len(srv.ChangeSet.Trashes)
	}
	return
}

// PrintStatistics writes statistics about the computed changes to
// bal.Logger. It should not be called until ComputeChangeSets has
// finished.
func (bal *Balancer) PrintStatistics() {
	s := bal.getStatistics()
	bal.logf("%s lost (0=have<want)", s.lost)
	bal.logf("%s underreplicated (0<have<want)", s.underrep)
	bal.logf("%s just right (have=want)", s.justright)
	bal.logf("%s overreplicated (have>want>0)", s.overrep)
	bal.logf("%s unreferenced (have>want=0, new)", s.unref)
	bal.logf("%s garbage (have>want=0, old)", s.garbage)
	bal.logf("%s total commitment (excluding unreferenced)", s.desired)
	bal.logf("%s total usage", s.current)
	for _, srv := range bal.KeepServices {
		bal.logf("%s: %v\n", srv, srv.ChangeSet)
	}
}

// CheckSanityLate checks for configuration and runtime errors after
// GetCurrentState() and ComputeChangeSets() have finished.
//
// If it returns an error, it is dangerous to run any Commit methods.
func (bal *Balancer) CheckSanityLate() error {
	if bal.errors != nil {
		for _, err := range bal.errors {
			bal.logf("deferred error: %v", err)
		}
		return fmt.Errorf("cannot proceed safely after deferred errors")
	}

	if bal.collScanned == 0 {
		return fmt.Errorf("received zero collections")
	}

	anyDesired := false
	bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
		if blk.Desired > 0 {
			anyDesired = true
		}
	})
	if !anyDesired {
		return fmt.Errorf("zero blocks have desired replication>0")
	}

	if dr := bal.DefaultReplication; dr < 1 {
		return fmt.Errorf("default replication (%d) is less than 1", dr)
	}

	// TODO: no two services have identical indexes
	// TODO: no collisions (same md5, different size)
	return nil
}

// CommitPulls sends the computed lists of pull requests to the
// keepstore servers. This has the effect of increasing replication of
// existing blocks that are either underreplicated or poorly
// distributed according to rendezvous hashing.
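//
// (Run commits pulls before trash; if CommitPulls fails, the trash
// phase is skipped as a precaution.)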
func (bal *Balancer) CommitPulls(c *arvados.Client) error {
	return bal.commitAsync(c, "send pull list",
		func(srv *KeepService) error {
			return srv.CommitPulls(c)
		})
}

// CommitTrash sends the computed lists of trash requests to the
// keepstore servers. This has the effect of deleting blocks that are
// overreplicated or unreferenced.
func (bal *Balancer) CommitTrash(c *arvados.Client) error {
	return bal.commitAsync(c, "send trash list",
		func(srv *KeepService) error {
			return srv.CommitTrash(c)
		})
}

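// commitAsync calls f(srv) for each KeepService concurrently, waits
// for all calls to finish, and returns an error if any call failed.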
func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
	errs := make(chan error)
	for _, srv := range bal.KeepServices {
		go func(srv *KeepService) {
			var err error
			defer func() { errs <- err }()
			label := fmt.Sprintf("%s: %v", srv, label)
			defer timeMe(bal.Logger, label)()
			err = f(srv)
			if err != nil {
				err = fmt.Errorf("%s: %v", label, err)
			}
		}(srv)
	}
	var lastErr error
	for range bal.KeepServices {
		if err := <-errs; err != nil {
			bal.logf("%v", err)
			lastErr = err
		}
	}
	return lastErr
}

func (bal *Balancer) logf(f string, args ...interface{}) {
	if bal.Logger != nil {
		bal.Logger.Printf(f, args...)
	}
}