Merge branch '13513-balance-deadlock'
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "fmt"
11         "log"
12         "math"
13         "os"
14         "runtime"
15         "sort"
16         "strings"
17         "sync"
18         "time"
19
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "git.curoverse.com/arvados.git/sdk/go/keepclient"
22 )
23
24 // CheckConfig returns an error if anything is wrong with the given
25 // config and runOptions.
26 func CheckConfig(config Config, runOptions RunOptions) error {
27         if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
28                 return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
29         }
30         if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
31                 return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
32         }
33         return nil
34 }
35
36 // Balancer compares the contents of keepstore servers with the
37 // collections stored in Arvados, and issues pull/trash requests
38 // needed to get (closer to) the optimal data layout.
39 //
40 // In the optimal data layout: every data block referenced by a
41 // collection is replicated at least as many times as desired by the
42 // collection; there are no unreferenced data blocks older than
43 // BlobSignatureTTL; and all N existing replicas of a given data block
44 // are in the N best positions in rendezvous probe order.
45 type Balancer struct {
46         *BlockStateMap
47         KeepServices       map[string]*KeepService
48         DefaultReplication int
49         Logger             *log.Logger
50         Dumper             *log.Logger
51         MinMtime           int64
52
53         classes       []string
54         mounts        int
55         mountsByClass map[string]map[*KeepMount]bool
56         collScanned   int
57         serviceRoots  map[string]string
58         errors        []error
59         stats         balancerStats
60         mutex         sync.Mutex
61 }
62
63 // Run performs a balance operation using the given config and
64 // runOptions, and returns RunOptions suitable for passing to a
65 // subsequent balance operation.
66 //
67 // Run should only be called once on a given Balancer object.
68 //
69 // Typical usage:
70 //
71 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
72 func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
73         nextRunOptions = runOptions
74
75         bal.Dumper = runOptions.Dumper
76         bal.Logger = runOptions.Logger
77         if bal.Logger == nil {
78                 bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
79         }
80
81         defer timeMe(bal.Logger, "Run")()
82
83         if len(config.KeepServiceList.Items) > 0 {
84                 err = bal.SetKeepServices(config.KeepServiceList)
85         } else {
86                 err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
87         }
88         if err != nil {
89                 return
90         }
91
92         for _, srv := range bal.KeepServices {
93                 err = srv.discoverMounts(&config.Client)
94                 if err != nil {
95                         return
96                 }
97         }
98
99         if err = bal.CheckSanityEarly(&config.Client); err != nil {
100                 return
101         }
102         rs := bal.rendezvousState()
103         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
104                 if runOptions.SafeRendezvousState != "" {
105                         bal.logf("notice: KeepServices list has changed since last run")
106                 }
107                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
108                 if err = bal.ClearTrashLists(&config.Client); err != nil {
109                         return
110                 }
111                 // The current rendezvous state becomes "safe" (i.e.,
112                 // OK to compute changes for that state without
113                 // clearing existing trash lists) only now, after we
114                 // succeed in clearing existing trash lists.
115                 nextRunOptions.SafeRendezvousState = rs
116         }
117         if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
118                 return
119         }
120         bal.ComputeChangeSets()
121         bal.PrintStatistics()
122         if err = bal.CheckSanityLate(); err != nil {
123                 return
124         }
125         if runOptions.CommitPulls {
126                 err = bal.CommitPulls(&config.Client)
127                 if err != nil {
128                         // Skip trash if we can't pull. (Too cautious?)
129                         return
130                 }
131         }
132         if runOptions.CommitTrash {
133                 err = bal.CommitTrash(&config.Client)
134         }
135         return
136 }
137
138 // SetKeepServices sets the list of KeepServices to operate on.
139 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
140         bal.KeepServices = make(map[string]*KeepService)
141         for _, srv := range srvList.Items {
142                 bal.KeepServices[srv.UUID] = &KeepService{
143                         KeepService: srv,
144                         ChangeSet:   &ChangeSet{},
145                 }
146         }
147         return nil
148 }
149
150 // DiscoverKeepServices sets the list of KeepServices by calling the
151 // API to get a list of all services, and selecting the ones whose
152 // ServiceType is in okTypes.
153 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
154         bal.KeepServices = make(map[string]*KeepService)
155         ok := make(map[string]bool)
156         for _, t := range okTypes {
157                 ok[t] = true
158         }
159         return c.EachKeepService(func(srv arvados.KeepService) error {
160                 if ok[srv.ServiceType] {
161                         bal.KeepServices[srv.UUID] = &KeepService{
162                                 KeepService: srv,
163                                 ChangeSet:   &ChangeSet{},
164                         }
165                 } else {
166                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
167                 }
168                 return nil
169         })
170 }
171
172 // CheckSanityEarly checks for configuration and runtime errors that
173 // can be detected before GetCurrentState() and ComputeChangeSets()
174 // are called.
175 //
176 // If it returns an error, it is pointless to run GetCurrentState or
177 // ComputeChangeSets: after doing so, the statistics would be
178 // meaningless and it would be dangerous to run any Commit methods.
179 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
180         u, err := c.CurrentUser()
181         if err != nil {
182                 return fmt.Errorf("CurrentUser(): %v", err)
183         }
184         if !u.IsActive || !u.IsAdmin {
185                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
186         }
187         for _, srv := range bal.KeepServices {
188                 if srv.ServiceType == "proxy" {
189                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
190                 }
191         }
192         return nil
193 }
194
195 // rendezvousState returns a fingerprint (e.g., a sorted list of
196 // UUID+host+port) of the current set of keep services.
197 func (bal *Balancer) rendezvousState() string {
198         srvs := make([]string, 0, len(bal.KeepServices))
199         for _, srv := range bal.KeepServices {
200                 srvs = append(srvs, srv.String())
201         }
202         sort.Strings(srvs)
203         return strings.Join(srvs, "; ")
204 }
205
206 // ClearTrashLists sends an empty trash list to each keep
207 // service. Calling this before GetCurrentState avoids races.
208 //
209 // When a block appears in an index, we assume that replica will still
210 // exist after we delete other replicas on other servers. However,
211 // it's possible that a previous rebalancing operation made different
212 // decisions (e.g., servers were added/removed, and rendezvous order
213 // changed). In this case, the replica might already be on that
214 // server's trash list, and it might be deleted before we send a
215 // replacement trash list.
216 //
217 // We avoid this problem if we clear all trash lists before getting
218 // indexes. (We also assume there is only one rebalancing process
219 // running at a time.)
220 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
221         for _, srv := range bal.KeepServices {
222                 srv.ChangeSet = &ChangeSet{}
223         }
224         return bal.CommitTrash(c)
225 }
226
227 // GetCurrentState determines the current replication state, and the
228 // desired replication level, for every block that is either
229 // retrievable or referenced.
230 //
231 // It determines the current replication state by reading the block index
232 // from every known Keep service.
233 //
234 // It determines the desired replication level by retrieving all
235 // collection manifests in the database (API server).
236 //
237 // It encodes the resulting information in BlockStateMap.
238 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
239         defer timeMe(bal.Logger, "GetCurrentState")()
240         bal.BlockStateMap = NewBlockStateMap()
241
242         dd, err := c.DiscoveryDocument()
243         if err != nil {
244                 return err
245         }
246         bal.DefaultReplication = dd.DefaultCollectionReplication
247         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
248
249         errs := make(chan error, 2+len(bal.KeepServices))
250         wg := sync.WaitGroup{}
251
252         // Start one goroutine for each KeepService: retrieve the
253         // index, and add the returned blocks to BlockStateMap.
254         for _, srv := range bal.KeepServices {
255                 wg.Add(1)
256                 go func(srv *KeepService) {
257                         defer wg.Done()
258                         bal.logf("%s: retrieve indexes", srv)
259                         for _, mount := range srv.mounts {
260                                 bal.logf("%s: retrieve index", mount)
261                                 idx, err := srv.IndexMount(c, mount.UUID, "")
262                                 if err != nil {
263                                         errs <- fmt.Errorf("%s: retrieve index: %v", mount, err)
264                                         return
265                                 }
266                                 if len(errs) > 0 {
267                                         // Some other goroutine encountered an
268                                         // error -- any further effort here
269                                         // will be wasted.
270                                         return
271                                 }
272                                 bal.logf("%s: add %d replicas to map", mount, len(idx))
273                                 bal.BlockStateMap.AddReplicas(mount, idx)
274                                 bal.logf("%s: done", mount)
275                         }
276                         bal.logf("%s: done", srv)
277                 }(srv)
278         }
279
280         // collQ buffers incoming collections so we can start fetching
281         // the next page without waiting for the current page to
282         // finish processing.
283         collQ := make(chan arvados.Collection, bufs)
284
285         // Start a goroutine to process collections. (We could use a
286         // worker pool here, but even with a single worker we already
287         // process collections much faster than we can retrieve them.)
288         wg.Add(1)
289         go func() {
290                 defer wg.Done()
291                 for coll := range collQ {
292                         err := bal.addCollection(coll)
293                         if err != nil {
294                                 errs <- err
295                                 for range collQ {
296                                 }
297                                 return
298                         }
299                         bal.collScanned++
300                 }
301         }()
302
303         // Start a goroutine to retrieve all collections from the
304         // Arvados database and send them to collQ for processing.
305         wg.Add(1)
306         go func() {
307                 defer wg.Done()
308                 err = EachCollection(c, pageSize,
309                         func(coll arvados.Collection) error {
310                                 collQ <- coll
311                                 if len(errs) > 0 {
312                                         // some other GetCurrentState
313                                         // error happened: no point
314                                         // getting any more
315                                         // collections.
316                                         return fmt.Errorf("")
317                                 }
318                                 return nil
319                         }, func(done, total int) {
320                                 bal.logf("collections: %d/%d", done, total)
321                         })
322                 close(collQ)
323                 if err != nil {
324                         errs <- err
325                 }
326         }()
327
328         wg.Wait()
329         if len(errs) > 0 {
330                 return <-errs
331         }
332         return nil
333 }
334
335 func (bal *Balancer) addCollection(coll arvados.Collection) error {
336         blkids, err := coll.SizedDigests()
337         if err != nil {
338                 bal.mutex.Lock()
339                 bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
340                 bal.mutex.Unlock()
341                 return nil
342         }
343         repl := bal.DefaultReplication
344         if coll.ReplicationDesired != nil {
345                 repl = *coll.ReplicationDesired
346         }
347         debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
348         bal.BlockStateMap.IncreaseDesired(coll.StorageClassesDesired, repl, blkids)
349         return nil
350 }
351
352 // ComputeChangeSets compares, for each known block, the current and
353 // desired replication states. If it is possible to get closer to the
354 // desired state by copying or deleting blocks, it adds those changes
355 // to the relevant KeepServices' ChangeSets.
356 //
357 // It does not actually apply any of the computed changes.
358 func (bal *Balancer) ComputeChangeSets() {
359         // This just calls balanceBlock() once for each block, using a
360         // pool of worker goroutines.
361         defer timeMe(bal.Logger, "ComputeChangeSets")()
362         bal.setupLookupTables()
363
364         type balanceTask struct {
365                 blkid arvados.SizedDigest
366                 blk   *BlockState
367         }
368         workers := runtime.GOMAXPROCS(-1)
369         todo := make(chan balanceTask, workers)
370         go func() {
371                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
372                         todo <- balanceTask{
373                                 blkid: blkid,
374                                 blk:   blk,
375                         }
376                 })
377                 close(todo)
378         }()
379         results := make(chan balanceResult, workers)
380         go func() {
381                 var wg sync.WaitGroup
382                 for i := 0; i < workers; i++ {
383                         wg.Add(1)
384                         go func() {
385                                 for work := range todo {
386                                         results <- bal.balanceBlock(work.blkid, work.blk)
387                                 }
388                                 wg.Done()
389                         }()
390                 }
391                 wg.Wait()
392                 close(results)
393         }()
394         bal.collectStatistics(results)
395 }
396
397 func (bal *Balancer) setupLookupTables() {
398         bal.serviceRoots = make(map[string]string)
399         bal.classes = []string{"default"}
400         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
401         bal.mounts = 0
402         for _, srv := range bal.KeepServices {
403                 bal.serviceRoots[srv.UUID] = srv.UUID
404                 for _, mnt := range srv.mounts {
405                         bal.mounts++
406
407                         // All mounts on a read-only service are
408                         // effectively read-only.
409                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
410
411                         if len(mnt.StorageClasses) == 0 {
412                                 bal.mountsByClass["default"][mnt] = true
413                                 continue
414                         }
415                         for _, class := range mnt.StorageClasses {
416                                 if mbc := bal.mountsByClass[class]; mbc == nil {
417                                         bal.classes = append(bal.classes, class)
418                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
419                                 } else {
420                                         mbc[mnt] = true
421                                 }
422                         }
423                 }
424         }
425         // Consider classes in lexicographic order to avoid flapping
426         // between balancing runs.  The outcome of the "prefer a mount
427         // we're already planning to use for a different storage
428         // class" case in balanceBlock depends on the order classes
429         // are considered.
430         sort.Strings(bal.classes)
431 }
432
433 const (
434         changeStay = iota
435         changePull
436         changeTrash
437         changeNone
438 )
439
440 var changeName = map[int]string{
441         changeStay:  "stay",
442         changePull:  "pull",
443         changeTrash: "trash",
444         changeNone:  "none",
445 }
446
447 type balanceResult struct {
448         blk        *BlockState
449         blkid      arvados.SizedDigest
450         have       int
451         want       int
452         classState map[string]balancedBlockState
453 }
454
455 // balanceBlock compares current state to desired state for a single
456 // block, and makes the appropriate ChangeSet calls.
457 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
458         debugf("balanceBlock: %v %+v", blkid, blk)
459
460         type slot struct {
461                 mnt  *KeepMount // never nil
462                 repl *Replica   // replica already stored here (or nil)
463                 want bool       // we should pull/leave a replica here
464         }
465
466         // Build a list of all slots (one per mounted volume).
467         slots := make([]slot, 0, bal.mounts)
468         for _, srv := range bal.KeepServices {
469                 for _, mnt := range srv.mounts {
470                         var repl *Replica
471                         for r := range blk.Replicas {
472                                 if blk.Replicas[r].KeepMount == mnt {
473                                         repl = &blk.Replicas[r]
474                                 }
475                         }
476                         // Initial value of "want" is "have, and can't
477                         // delete". These untrashable replicas get
478                         // prioritized when sorting slots: otherwise,
479                         // non-optimal readonly copies would cause us
480                         // to overreplicate.
481                         slots = append(slots, slot{
482                                 mnt:  mnt,
483                                 repl: repl,
484                                 want: repl != nil && (mnt.ReadOnly || repl.Mtime >= bal.MinMtime),
485                         })
486                 }
487         }
488
489         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
490         srvRendezvous := make(map[*KeepService]int, len(uuids))
491         for i, uuid := range uuids {
492                 srv := bal.KeepServices[uuid]
493                 srvRendezvous[srv] = i
494         }
495
496         // Below we set underreplicated=true if we find any storage
497         // class that's currently underreplicated -- in that case we
498         // won't want to trash any replicas.
499         underreplicated := false
500
501         classState := make(map[string]balancedBlockState, len(bal.classes))
502         unsafeToDelete := make(map[int64]bool, len(slots))
503         for _, class := range bal.classes {
504                 desired := blk.Desired[class]
505
506                 have := 0
507                 for _, slot := range slots {
508                         if slot.repl != nil && bal.mountsByClass[class][slot.mnt] {
509                                 have++
510                         }
511                 }
512                 classState[class] = balancedBlockState{
513                         desired: desired,
514                         surplus: have - desired,
515                 }
516
517                 if desired == 0 {
518                         continue
519                 }
520
521                 // Sort the slots by desirability.
522                 sort.Slice(slots, func(i, j int) bool {
523                         si, sj := slots[i], slots[j]
524                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
525                                 // Prefer a mount that satisfies the
526                                 // desired class.
527                                 return bal.mountsByClass[class][si.mnt]
528                         } else if wanti, wantj := si.want, si.want; wanti != wantj {
529                                 // Prefer a mount that will have a
530                                 // replica no matter what we do here
531                                 // -- either because it already has an
532                                 // untrashable replica, or because we
533                                 // already need it to satisfy a
534                                 // different storage class.
535                                 return slots[i].want
536                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
537                                 // Prefer a better rendezvous
538                                 // position.
539                                 return orderi < orderj
540                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
541                                 // Prefer a mount that already has a
542                                 // replica.
543                                 return repli
544                         } else {
545                                 // If pull/trash turns out to be
546                                 // needed, distribute the
547                                 // new/remaining replicas uniformly
548                                 // across qualifying mounts on a given
549                                 // server.
550                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
551                         }
552                 })
553
554                 // Servers and mounts (with or without existing
555                 // replicas) that are part of the best achievable
556                 // layout for this storage class.
557                 wantSrv := map[*KeepService]bool{}
558                 wantMnt := map[*KeepMount]bool{}
559                 // Positions (with existing replicas) that have been
560                 // protected (via unsafeToDelete) to ensure we don't
561                 // reduce replication below desired level when
562                 // trashing replicas that aren't optimal positions for
563                 // any storage class.
564                 protMnt := map[*KeepMount]bool{}
565
566                 // trySlot tries using a slot to meet requirements,
567                 // and returns true if all requirements are met.
568                 trySlot := func(i int) bool {
569                         slot := slots[i]
570                         if len(protMnt) < desired && slot.repl != nil {
571                                 unsafeToDelete[slot.repl.Mtime] = true
572                                 protMnt[slot.mnt] = true
573                         }
574                         if len(wantMnt) < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
575                                 slots[i].want = true
576                                 wantSrv[slot.mnt.KeepService] = true
577                                 wantMnt[slot.mnt] = true
578                         }
579                         return len(protMnt) >= desired && len(wantMnt) >= desired
580                 }
581
582                 // First try to achieve desired replication without
583                 // using the same server twice.
584                 done := false
585                 for i := 0; i < len(slots) && !done; i++ {
586                         if !wantSrv[slots[i].mnt.KeepService] {
587                                 done = trySlot(i)
588                         }
589                 }
590
591                 // If that didn't suffice, do another pass without the
592                 // "distinct services" restriction. (Achieving the
593                 // desired volume replication on fewer than the
594                 // desired number of services is better than
595                 // underreplicating.)
596                 for i := 0; i < len(slots) && !done; i++ {
597                         done = trySlot(i)
598                 }
599
600                 if !underreplicated {
601                         safe := 0
602                         for _, slot := range slots {
603                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
604                                         continue
605                                 }
606                                 if safe++; safe >= desired {
607                                         break
608                                 }
609                         }
610                         underreplicated = safe < desired
611                 }
612
613                 // set the unachievable flag if there aren't enough
614                 // slots offering the relevant storage class. (This is
615                 // as easy as checking slots[desired] because we
616                 // already sorted the qualifying slots to the front.)
617                 if desired >= len(slots) || !bal.mountsByClass[class][slots[desired].mnt] {
618                         cs := classState[class]
619                         cs.unachievable = true
620                         classState[class] = cs
621                 }
622         }
623
624         // TODO: If multiple replicas are trashable, prefer the oldest
625         // replica that doesn't have a timestamp collision with
626         // others.
627
628         var have, want int
629         for _, slot := range slots {
630                 if slot.want {
631                         want++
632                 }
633                 if slot.repl != nil {
634                         have++
635                 }
636         }
637
638         var changes []string
639         for _, slot := range slots {
640                 // TODO: request a Touch if Mtime is duplicated.
641                 var change int
642                 switch {
643                 case !underreplicated && slot.repl != nil && !slot.want && !unsafeToDelete[slot.repl.Mtime]:
644                         slot.mnt.KeepService.AddTrash(Trash{
645                                 SizedDigest: blkid,
646                                 Mtime:       slot.repl.Mtime,
647                                 From:        slot.mnt,
648                         })
649                         change = changeTrash
650                 case len(blk.Replicas) == 0:
651                         change = changeNone
652                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
653                         slot.mnt.KeepService.AddPull(Pull{
654                                 SizedDigest: blkid,
655                                 From:        blk.Replicas[0].KeepMount.KeepService,
656                                 To:          slot.mnt,
657                         })
658                         change = changePull
659                 default:
660                         change = changeStay
661                 }
662                 if bal.Dumper != nil {
663                         var mtime int64
664                         if slot.repl != nil {
665                                 mtime = slot.repl.Mtime
666                         }
667                         srv := slot.mnt.KeepService
668                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
669                 }
670         }
671         if bal.Dumper != nil {
672                 bal.Dumper.Printf("%s have=%d want=%v %s", blkid, have, want, strings.Join(changes, " "))
673         }
674         return balanceResult{
675                 blk:        blk,
676                 blkid:      blkid,
677                 have:       have,
678                 want:       want,
679                 classState: classState,
680         }
681 }
682
683 type blocksNBytes struct {
684         replicas int
685         blocks   int
686         bytes    int64
687 }
688
689 func (bb blocksNBytes) String() string {
690         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
691 }
692
693 type balancerStats struct {
694         lost          blocksNBytes
695         overrep       blocksNBytes
696         unref         blocksNBytes
697         garbage       blocksNBytes
698         underrep      blocksNBytes
699         unachievable  blocksNBytes
700         justright     blocksNBytes
701         desired       blocksNBytes
702         current       blocksNBytes
703         pulls         int
704         trashes       int
705         replHistogram []int
706         classStats    map[string]replicationStats
707 }
708
709 type replicationStats struct {
710         desired      blocksNBytes
711         surplus      blocksNBytes
712         short        blocksNBytes
713         unachievable blocksNBytes
714 }
715
716 type balancedBlockState struct {
717         desired      int
718         surplus      int
719         unachievable bool
720 }
721
722 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
723         var s balancerStats
724         s.replHistogram = make([]int, 2)
725         s.classStats = make(map[string]replicationStats, len(bal.classes))
726         for result := range results {
727                 surplus := result.have - result.want
728                 bytes := result.blkid.Size()
729
730                 for class, state := range result.classState {
731                         cs := s.classStats[class]
732                         if state.unachievable {
733                                 cs.unachievable.blocks++
734                                 cs.unachievable.bytes += bytes
735                         }
736                         if state.desired > 0 {
737                                 cs.desired.replicas += state.desired
738                                 cs.desired.blocks++
739                                 cs.desired.bytes += bytes * int64(state.desired)
740                         }
741                         if state.surplus > 0 {
742                                 cs.surplus.replicas += state.surplus
743                                 cs.surplus.blocks++
744                                 cs.surplus.bytes += bytes * int64(state.surplus)
745                         } else if state.surplus < 0 {
746                                 cs.short.replicas += -state.surplus
747                                 cs.short.blocks++
748                                 cs.short.bytes += bytes * int64(-state.surplus)
749                         }
750                         s.classStats[class] = cs
751                 }
752
753                 switch {
754                 case result.have == 0 && result.want > 0:
755                         s.lost.replicas -= surplus
756                         s.lost.blocks++
757                         s.lost.bytes += bytes * int64(-surplus)
758                 case surplus < 0:
759                         s.underrep.replicas -= surplus
760                         s.underrep.blocks++
761                         s.underrep.bytes += bytes * int64(-surplus)
762                 case surplus > 0 && result.want == 0:
763                         counter := &s.garbage
764                         for _, r := range result.blk.Replicas {
765                                 if r.Mtime >= bal.MinMtime {
766                                         counter = &s.unref
767                                         break
768                                 }
769                         }
770                         counter.replicas += surplus
771                         counter.blocks++
772                         counter.bytes += bytes * int64(surplus)
773                 case surplus > 0:
774                         s.overrep.replicas += surplus
775                         s.overrep.blocks++
776                         s.overrep.bytes += bytes * int64(len(result.blk.Replicas)-result.want)
777                 default:
778                         s.justright.replicas += result.want
779                         s.justright.blocks++
780                         s.justright.bytes += bytes * int64(result.want)
781                 }
782
783                 if result.want > 0 {
784                         s.desired.replicas += result.want
785                         s.desired.blocks++
786                         s.desired.bytes += bytes * int64(result.want)
787                 }
788                 if len(result.blk.Replicas) > 0 {
789                         s.current.replicas += len(result.blk.Replicas)
790                         s.current.blocks++
791                         s.current.bytes += bytes * int64(len(result.blk.Replicas))
792                 }
793
794                 for len(s.replHistogram) <= len(result.blk.Replicas) {
795                         s.replHistogram = append(s.replHistogram, 0)
796                 }
797                 s.replHistogram[len(result.blk.Replicas)]++
798         }
799         for _, srv := range bal.KeepServices {
800                 s.pulls += len(srv.ChangeSet.Pulls)
801                 s.trashes += len(srv.ChangeSet.Trashes)
802         }
803         bal.stats = s
804 }
805
806 // PrintStatistics writes statistics about the computed changes to
807 // bal.Logger. It should not be called until ComputeChangeSets has
808 // finished.
809 func (bal *Balancer) PrintStatistics() {
810         bal.logf("===")
811         bal.logf("%s lost (0=have<want)", bal.stats.lost)
812         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
813         bal.logf("%s just right (have=want)", bal.stats.justright)
814         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
815         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
816         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
817         for _, class := range bal.classes {
818                 cs := bal.stats.classStats[class]
819                 bal.logf("===")
820                 bal.logf("storage class %q: %s desired", class, cs.desired)
821                 bal.logf("storage class %q: %s short", class, cs.short)
822                 bal.logf("storage class %q: %s surplus", class, cs.surplus)
823                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
824         }
825         bal.logf("===")
826         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
827         bal.logf("%s total usage", bal.stats.current)
828         bal.logf("===")
829         for _, srv := range bal.KeepServices {
830                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
831         }
832         bal.logf("===")
833         bal.printHistogram(60)
834         bal.logf("===")
835 }
836
837 func (bal *Balancer) printHistogram(hashColumns int) {
838         bal.logf("Replication level distribution (counting N replicas on a single server as N):")
839         maxCount := 0
840         for _, count := range bal.stats.replHistogram {
841                 if maxCount < count {
842                         maxCount = count
843                 }
844         }
845         hashes := strings.Repeat("#", hashColumns)
846         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
847         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
848         for repl, count := range bal.stats.replHistogram {
849                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
850                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
851         }
852 }
853
854 // CheckSanityLate checks for configuration and runtime errors after
855 // GetCurrentState() and ComputeChangeSets() have finished.
856 //
857 // If it returns an error, it is dangerous to run any Commit methods.
858 func (bal *Balancer) CheckSanityLate() error {
859         if bal.errors != nil {
860                 for _, err := range bal.errors {
861                         bal.logf("deferred error: %v", err)
862                 }
863                 return fmt.Errorf("cannot proceed safely after deferred errors")
864         }
865
866         if bal.collScanned == 0 {
867                 return fmt.Errorf("received zero collections")
868         }
869
870         anyDesired := false
871         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
872                 for _, desired := range blk.Desired {
873                         if desired > 0 {
874                                 anyDesired = true
875                                 break
876                         }
877                 }
878         })
879         if !anyDesired {
880                 return fmt.Errorf("zero blocks have desired replication>0")
881         }
882
883         if dr := bal.DefaultReplication; dr < 1 {
884                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
885         }
886
887         // TODO: no two services have identical indexes
888         // TODO: no collisions (same md5, different size)
889         return nil
890 }
891
892 // CommitPulls sends the computed lists of pull requests to the
893 // keepstore servers. This has the effect of increasing replication of
894 // existing blocks that are either underreplicated or poorly
895 // distributed according to rendezvous hashing.
896 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
897         return bal.commitAsync(c, "send pull list",
898                 func(srv *KeepService) error {
899                         return srv.CommitPulls(c)
900                 })
901 }
902
903 // CommitTrash sends the computed lists of trash requests to the
904 // keepstore servers. This has the effect of deleting blocks that are
905 // overreplicated or unreferenced.
906 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
907         return bal.commitAsync(c, "send trash list",
908                 func(srv *KeepService) error {
909                         return srv.CommitTrash(c)
910                 })
911 }
912
913 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
914         errs := make(chan error)
915         for _, srv := range bal.KeepServices {
916                 go func(srv *KeepService) {
917                         var err error
918                         defer func() { errs <- err }()
919                         label := fmt.Sprintf("%s: %v", srv, label)
920                         defer timeMe(bal.Logger, label)()
921                         err = f(srv)
922                         if err != nil {
923                                 err = fmt.Errorf("%s: %v", label, err)
924                         }
925                 }(srv)
926         }
927         var lastErr error
928         for range bal.KeepServices {
929                 if err := <-errs; err != nil {
930                         bal.logf("%v", err)
931                         lastErr = err
932                 }
933         }
934         close(errs)
935         return lastErr
936 }
937
938 func (bal *Balancer) logf(f string, args ...interface{}) {
939         if bal.Logger != nil {
940                 bal.Logger.Printf(f, args...)
941         }
942 }
943
944 // Rendezvous hash sort function. Less efficient than sorting on
945 // precomputed rendezvous hashes, but also rarely used.
946 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
947         a := md5.Sum([]byte(string(blkid[:32]) + i))
948         b := md5.Sum([]byte(string(blkid[:32]) + j))
949         return bytes.Compare(a[:], b[:]) < 0
950 }