14595: Fix comment.
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "fmt"
11         "log"
12         "math"
13         "runtime"
14         "sort"
15         "strings"
16         "sync"
17         "time"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "git.curoverse.com/arvados.git/sdk/go/keepclient"
21         "github.com/Sirupsen/logrus"
22 )
23
24 // Balancer compares the contents of keepstore servers with the
25 // collections stored in Arvados, and issues pull/trash requests
26 // needed to get (closer to) the optimal data layout.
27 //
28 // In the optimal data layout: every data block referenced by a
29 // collection is replicated at least as many times as desired by the
30 // collection; there are no unreferenced data blocks older than
31 // BlobSignatureTTL; and all N existing replicas of a given data block
32 // are in the N best positions in rendezvous probe order.
33 type Balancer struct {
34         Logger  *logrus.Logger
35         Dumper  *logrus.Logger
36         Metrics *metrics
37
38         *BlockStateMap
39         KeepServices       map[string]*KeepService
40         DefaultReplication int
41         MinMtime           int64
42
43         classes       []string
44         mounts        int
45         mountsByClass map[string]map[*KeepMount]bool
46         collScanned   int
47         serviceRoots  map[string]string
48         errors        []error
49         stats         balancerStats
50         mutex         sync.Mutex
51 }
52
53 // Run performs a balance operation using the given config and
54 // runOptions, and returns RunOptions suitable for passing to a
55 // subsequent balance operation.
56 //
57 // Run should only be called once on a given Balancer object.
58 //
59 // Typical usage:
60 //
61 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
62 func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
63         nextRunOptions = runOptions
64
65         defer bal.time("sweep", "wall clock time to run one full sweep")()
66
67         if len(config.KeepServiceList.Items) > 0 {
68                 err = bal.SetKeepServices(config.KeepServiceList)
69         } else {
70                 err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
71         }
72         if err != nil {
73                 return
74         }
75
76         for _, srv := range bal.KeepServices {
77                 err = srv.discoverMounts(&config.Client)
78                 if err != nil {
79                         return
80                 }
81         }
82         bal.cleanupMounts()
83
84         if err = bal.CheckSanityEarly(&config.Client); err != nil {
85                 return
86         }
87         rs := bal.rendezvousState()
88         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
89                 if runOptions.SafeRendezvousState != "" {
90                         bal.logf("notice: KeepServices list has changed since last run")
91                 }
92                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
93                 if err = bal.ClearTrashLists(&config.Client); err != nil {
94                         return
95                 }
96                 // The current rendezvous state becomes "safe" (i.e.,
97                 // OK to compute changes for that state without
98                 // clearing existing trash lists) only now, after we
99                 // succeed in clearing existing trash lists.
100                 nextRunOptions.SafeRendezvousState = rs
101         }
102         if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
103                 return
104         }
105         bal.ComputeChangeSets()
106         bal.PrintStatistics()
107         if err = bal.CheckSanityLate(); err != nil {
108                 return
109         }
110         if runOptions.CommitPulls {
111                 err = bal.CommitPulls(&config.Client)
112                 if err != nil {
113                         // Skip trash if we can't pull. (Too cautious?)
114                         return
115                 }
116         }
117         if runOptions.CommitTrash {
118                 err = bal.CommitTrash(&config.Client)
119         }
120         return
121 }
122
123 // SetKeepServices sets the list of KeepServices to operate on.
124 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
125         bal.KeepServices = make(map[string]*KeepService)
126         for _, srv := range srvList.Items {
127                 bal.KeepServices[srv.UUID] = &KeepService{
128                         KeepService: srv,
129                         ChangeSet:   &ChangeSet{},
130                 }
131         }
132         return nil
133 }
134
135 // DiscoverKeepServices sets the list of KeepServices by calling the
136 // API to get a list of all services, and selecting the ones whose
137 // ServiceType is in okTypes.
138 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
139         bal.KeepServices = make(map[string]*KeepService)
140         ok := make(map[string]bool)
141         for _, t := range okTypes {
142                 ok[t] = true
143         }
144         return c.EachKeepService(func(srv arvados.KeepService) error {
145                 if ok[srv.ServiceType] {
146                         bal.KeepServices[srv.UUID] = &KeepService{
147                                 KeepService: srv,
148                                 ChangeSet:   &ChangeSet{},
149                         }
150                 } else {
151                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
152                 }
153                 return nil
154         })
155 }
156
157 func (bal *Balancer) cleanupMounts() {
158         rwdev := map[string]*KeepService{}
159         for _, srv := range bal.KeepServices {
160                 for _, mnt := range srv.mounts {
161                         if !mnt.ReadOnly && mnt.DeviceID != "" {
162                                 rwdev[mnt.DeviceID] = srv
163                         }
164                 }
165         }
166         // Drop the readonly mounts whose device is mounted RW
167         // elsewhere.
168         for _, srv := range bal.KeepServices {
169                 var dedup []*KeepMount
170                 for _, mnt := range srv.mounts {
171                         if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
172                                 bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
173                         } else {
174                                 dedup = append(dedup, mnt)
175                         }
176                 }
177                 srv.mounts = dedup
178         }
179         for _, srv := range bal.KeepServices {
180                 for _, mnt := range srv.mounts {
181                         if mnt.Replication <= 0 {
182                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
183                                 mnt.Replication = 1
184                         }
185                 }
186         }
187 }
188
189 // CheckSanityEarly checks for configuration and runtime errors that
190 // can be detected before GetCurrentState() and ComputeChangeSets()
191 // are called.
192 //
193 // If it returns an error, it is pointless to run GetCurrentState or
194 // ComputeChangeSets: after doing so, the statistics would be
195 // meaningless and it would be dangerous to run any Commit methods.
196 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
197         u, err := c.CurrentUser()
198         if err != nil {
199                 return fmt.Errorf("CurrentUser(): %v", err)
200         }
201         if !u.IsActive || !u.IsAdmin {
202                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
203         }
204         for _, srv := range bal.KeepServices {
205                 if srv.ServiceType == "proxy" {
206                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
207                 }
208         }
209         return nil
210 }
211
212 // rendezvousState returns a fingerprint (e.g., a sorted list of
213 // UUID+host+port) of the current set of keep services.
214 func (bal *Balancer) rendezvousState() string {
215         srvs := make([]string, 0, len(bal.KeepServices))
216         for _, srv := range bal.KeepServices {
217                 srvs = append(srvs, srv.String())
218         }
219         sort.Strings(srvs)
220         return strings.Join(srvs, "; ")
221 }
222
223 // ClearTrashLists sends an empty trash list to each keep
224 // service. Calling this before GetCurrentState avoids races.
225 //
226 // When a block appears in an index, we assume that replica will still
227 // exist after we delete other replicas on other servers. However,
228 // it's possible that a previous rebalancing operation made different
229 // decisions (e.g., servers were added/removed, and rendezvous order
230 // changed). In this case, the replica might already be on that
231 // server's trash list, and it might be deleted before we send a
232 // replacement trash list.
233 //
234 // We avoid this problem if we clear all trash lists before getting
235 // indexes. (We also assume there is only one rebalancing process
236 // running at a time.)
237 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
238         for _, srv := range bal.KeepServices {
239                 srv.ChangeSet = &ChangeSet{}
240         }
241         return bal.CommitTrash(c)
242 }
243
244 // GetCurrentState determines the current replication state, and the
245 // desired replication level, for every block that is either
246 // retrievable or referenced.
247 //
248 // It determines the current replication state by reading the block index
249 // from every known Keep service.
250 //
251 // It determines the desired replication level by retrieving all
252 // collection manifests in the database (API server).
253 //
254 // It encodes the resulting information in BlockStateMap.
255 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
256         defer bal.time("get_state", "wall clock time to get current state")()
257         bal.BlockStateMap = NewBlockStateMap()
258
259         dd, err := c.DiscoveryDocument()
260         if err != nil {
261                 return err
262         }
263         bal.DefaultReplication = dd.DefaultCollectionReplication
264         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
265
266         errs := make(chan error, 1)
267         wg := sync.WaitGroup{}
268
269         // When a device is mounted more than once, we will get its
270         // index only once, and call AddReplicas on all of the mounts.
271         // equivMount keys are the mounts that will be indexed, and
272         // each value is a list of mounts to apply the received index
273         // to.
274         equivMount := map[*KeepMount][]*KeepMount{}
275         // deviceMount maps each device ID to the one mount that will
276         // be indexed for that device.
277         deviceMount := map[string]*KeepMount{}
278         for _, srv := range bal.KeepServices {
279                 for _, mnt := range srv.mounts {
280                         equiv := deviceMount[mnt.DeviceID]
281                         if equiv == nil {
282                                 equiv = mnt
283                                 if mnt.DeviceID != "" {
284                                         deviceMount[mnt.DeviceID] = equiv
285                                 }
286                         }
287                         equivMount[equiv] = append(equivMount[equiv], mnt)
288                 }
289         }
290
291         // Start one goroutine for each (non-redundant) mount:
292         // retrieve the index, and add the returned blocks to
293         // BlockStateMap.
294         for _, mounts := range equivMount {
295                 wg.Add(1)
296                 go func(mounts []*KeepMount) {
297                         defer wg.Done()
298                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
299                         idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
300                         if err != nil {
301                                 select {
302                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
303                                 default:
304                                 }
305                                 return
306                         }
307                         if len(errs) > 0 {
308                                 // Some other goroutine encountered an
309                                 // error -- any further effort here
310                                 // will be wasted.
311                                 return
312                         }
313                         for _, mount := range mounts {
314                                 bal.logf("%s: add %d entries to map", mount, len(idx))
315                                 bal.BlockStateMap.AddReplicas(mount, idx)
316                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
317                         }
318                         bal.logf("mount %s: index done", mounts[0])
319                 }(mounts)
320         }
321
322         // collQ buffers incoming collections so we can start fetching
323         // the next page without waiting for the current page to
324         // finish processing.
325         collQ := make(chan arvados.Collection, bufs)
326
327         // Start a goroutine to process collections. (We could use a
328         // worker pool here, but even with a single worker we already
329         // process collections much faster than we can retrieve them.)
330         wg.Add(1)
331         go func() {
332                 defer wg.Done()
333                 for coll := range collQ {
334                         err := bal.addCollection(coll)
335                         if err != nil {
336                                 select {
337                                 case errs <- err:
338                                 default:
339                                 }
340                                 for range collQ {
341                                 }
342                                 return
343                         }
344                         bal.collScanned++
345                 }
346         }()
347
348         // Start a goroutine to retrieve all collections from the
349         // Arvados database and send them to collQ for processing.
350         wg.Add(1)
351         go func() {
352                 defer wg.Done()
353                 err = EachCollection(c, pageSize,
354                         func(coll arvados.Collection) error {
355                                 collQ <- coll
356                                 if len(errs) > 0 {
357                                         // some other GetCurrentState
358                                         // error happened: no point
359                                         // getting any more
360                                         // collections.
361                                         return fmt.Errorf("")
362                                 }
363                                 return nil
364                         }, func(done, total int) {
365                                 bal.logf("collections: %d/%d", done, total)
366                         })
367                 close(collQ)
368                 if err != nil {
369                         select {
370                         case errs <- err:
371                         default:
372                         }
373                 }
374         }()
375
376         wg.Wait()
377         if len(errs) > 0 {
378                 return <-errs
379         }
380         return nil
381 }
382
383 func (bal *Balancer) addCollection(coll arvados.Collection) error {
384         blkids, err := coll.SizedDigests()
385         if err != nil {
386                 bal.mutex.Lock()
387                 bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
388                 bal.mutex.Unlock()
389                 return nil
390         }
391         repl := bal.DefaultReplication
392         if coll.ReplicationDesired != nil {
393                 repl = *coll.ReplicationDesired
394         }
395         debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
396         bal.BlockStateMap.IncreaseDesired(coll.StorageClassesDesired, repl, blkids)
397         return nil
398 }
399
400 // ComputeChangeSets compares, for each known block, the current and
401 // desired replication states. If it is possible to get closer to the
402 // desired state by copying or deleting blocks, it adds those changes
403 // to the relevant KeepServices' ChangeSets.
404 //
405 // It does not actually apply any of the computed changes.
406 func (bal *Balancer) ComputeChangeSets() {
407         // This just calls balanceBlock() once for each block, using a
408         // pool of worker goroutines.
409         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
410         bal.setupLookupTables()
411
412         type balanceTask struct {
413                 blkid arvados.SizedDigest
414                 blk   *BlockState
415         }
416         workers := runtime.GOMAXPROCS(-1)
417         todo := make(chan balanceTask, workers)
418         go func() {
419                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
420                         todo <- balanceTask{
421                                 blkid: blkid,
422                                 blk:   blk,
423                         }
424                 })
425                 close(todo)
426         }()
427         results := make(chan balanceResult, workers)
428         go func() {
429                 var wg sync.WaitGroup
430                 for i := 0; i < workers; i++ {
431                         wg.Add(1)
432                         go func() {
433                                 for work := range todo {
434                                         results <- bal.balanceBlock(work.blkid, work.blk)
435                                 }
436                                 wg.Done()
437                         }()
438                 }
439                 wg.Wait()
440                 close(results)
441         }()
442         bal.collectStatistics(results)
443 }
444
445 func (bal *Balancer) setupLookupTables() {
446         bal.serviceRoots = make(map[string]string)
447         bal.classes = []string{"default"}
448         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
449         bal.mounts = 0
450         for _, srv := range bal.KeepServices {
451                 bal.serviceRoots[srv.UUID] = srv.UUID
452                 for _, mnt := range srv.mounts {
453                         bal.mounts++
454
455                         // All mounts on a read-only service are
456                         // effectively read-only.
457                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
458
459                         if len(mnt.StorageClasses) == 0 {
460                                 bal.mountsByClass["default"][mnt] = true
461                                 continue
462                         }
463                         for _, class := range mnt.StorageClasses {
464                                 if mbc := bal.mountsByClass[class]; mbc == nil {
465                                         bal.classes = append(bal.classes, class)
466                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
467                                 } else {
468                                         mbc[mnt] = true
469                                 }
470                         }
471                 }
472         }
473         // Consider classes in lexicographic order to avoid flapping
474         // between balancing runs.  The outcome of the "prefer a mount
475         // we're already planning to use for a different storage
476         // class" case in balanceBlock depends on the order classes
477         // are considered.
478         sort.Strings(bal.classes)
479 }
480
481 const (
482         changeStay = iota
483         changePull
484         changeTrash
485         changeNone
486 )
487
488 var changeName = map[int]string{
489         changeStay:  "stay",
490         changePull:  "pull",
491         changeTrash: "trash",
492         changeNone:  "none",
493 }
494
495 type balanceResult struct {
496         blk        *BlockState
497         blkid      arvados.SizedDigest
498         have       int
499         want       int
500         classState map[string]balancedBlockState
501 }
502
503 // balanceBlock compares current state to desired state for a single
504 // block, and makes the appropriate ChangeSet calls.
505 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
506         debugf("balanceBlock: %v %+v", blkid, blk)
507
508         type slot struct {
509                 mnt  *KeepMount // never nil
510                 repl *Replica   // replica already stored here (or nil)
511                 want bool       // we should pull/leave a replica here
512         }
513
514         // Build a list of all slots (one per mounted volume).
515         slots := make([]slot, 0, bal.mounts)
516         for _, srv := range bal.KeepServices {
517                 for _, mnt := range srv.mounts {
518                         var repl *Replica
519                         for r := range blk.Replicas {
520                                 if blk.Replicas[r].KeepMount == mnt {
521                                         repl = &blk.Replicas[r]
522                                 }
523                         }
524                         // Initial value of "want" is "have, and can't
525                         // delete". These untrashable replicas get
526                         // prioritized when sorting slots: otherwise,
527                         // non-optimal readonly copies would cause us
528                         // to overreplicate.
529                         slots = append(slots, slot{
530                                 mnt:  mnt,
531                                 repl: repl,
532                                 want: repl != nil && mnt.ReadOnly,
533                         })
534                 }
535         }
536
537         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
538         srvRendezvous := make(map[*KeepService]int, len(uuids))
539         for i, uuid := range uuids {
540                 srv := bal.KeepServices[uuid]
541                 srvRendezvous[srv] = i
542         }
543
544         // Below we set underreplicated=true if we find any storage
545         // class that's currently underreplicated -- in that case we
546         // won't want to trash any replicas.
547         underreplicated := false
548
549         classState := make(map[string]balancedBlockState, len(bal.classes))
550         unsafeToDelete := make(map[int64]bool, len(slots))
551         for _, class := range bal.classes {
552                 desired := blk.Desired[class]
553
554                 countedDev := map[string]bool{}
555                 have := 0
556                 for _, slot := range slots {
557                         if slot.repl != nil && bal.mountsByClass[class][slot.mnt] && !countedDev[slot.mnt.DeviceID] {
558                                 have += slot.mnt.Replication
559                                 if slot.mnt.DeviceID != "" {
560                                         countedDev[slot.mnt.DeviceID] = true
561                                 }
562                         }
563                 }
564                 classState[class] = balancedBlockState{
565                         desired: desired,
566                         surplus: have - desired,
567                 }
568
569                 if desired == 0 {
570                         continue
571                 }
572
573                 // Sort the slots by desirability.
574                 sort.Slice(slots, func(i, j int) bool {
575                         si, sj := slots[i], slots[j]
576                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
577                                 // Prefer a mount that satisfies the
578                                 // desired class.
579                                 return bal.mountsByClass[class][si.mnt]
580                         } else if si.want != sj.want {
581                                 // Prefer a mount that will have a
582                                 // replica no matter what we do here
583                                 // -- either because it already has an
584                                 // untrashable replica, or because we
585                                 // already need it to satisfy a
586                                 // different storage class.
587                                 return si.want
588                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
589                                 // Prefer a better rendezvous
590                                 // position.
591                                 return orderi < orderj
592                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
593                                 // Prefer a mount that already has a
594                                 // replica.
595                                 return repli
596                         } else {
597                                 // If pull/trash turns out to be
598                                 // needed, distribute the
599                                 // new/remaining replicas uniformly
600                                 // across qualifying mounts on a given
601                                 // server.
602                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
603                         }
604                 })
605
606                 // Servers/mounts/devices (with or without existing
607                 // replicas) that are part of the best achievable
608                 // layout for this storage class.
609                 wantSrv := map[*KeepService]bool{}
610                 wantMnt := map[*KeepMount]bool{}
611                 wantDev := map[string]bool{}
612                 // Positions (with existing replicas) that have been
613                 // protected (via unsafeToDelete) to ensure we don't
614                 // reduce replication below desired level when
615                 // trashing replicas that aren't optimal positions for
616                 // any storage class.
617                 protMnt := map[*KeepMount]bool{}
618                 // Replication planned so far (corresponds to wantMnt).
619                 replWant := 0
620                 // Protected replication (corresponds to protMnt).
621                 replProt := 0
622
623                 // trySlot tries using a slot to meet requirements,
624                 // and returns true if all requirements are met.
625                 trySlot := func(i int) bool {
626                         slot := slots[i]
627                         if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
628                                 // Already allocated a replica to this
629                                 // backend device, possibly on a
630                                 // different server.
631                                 return false
632                         }
633                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
634                                 unsafeToDelete[slot.repl.Mtime] = true
635                                 protMnt[slot.mnt] = true
636                                 replProt += slot.mnt.Replication
637                         }
638                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
639                                 slots[i].want = true
640                                 wantSrv[slot.mnt.KeepService] = true
641                                 wantMnt[slot.mnt] = true
642                                 if slot.mnt.DeviceID != "" {
643                                         wantDev[slot.mnt.DeviceID] = true
644                                 }
645                                 replWant += slot.mnt.Replication
646                         }
647                         return replProt >= desired && replWant >= desired
648                 }
649
650                 // First try to achieve desired replication without
651                 // using the same server twice.
652                 done := false
653                 for i := 0; i < len(slots) && !done; i++ {
654                         if !wantSrv[slots[i].mnt.KeepService] {
655                                 done = trySlot(i)
656                         }
657                 }
658
659                 // If that didn't suffice, do another pass without the
660                 // "distinct services" restriction. (Achieving the
661                 // desired volume replication on fewer than the
662                 // desired number of services is better than
663                 // underreplicating.)
664                 for i := 0; i < len(slots) && !done; i++ {
665                         done = trySlot(i)
666                 }
667
668                 if !underreplicated {
669                         safe := 0
670                         for _, slot := range slots {
671                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
672                                         continue
673                                 }
674                                 if safe += slot.mnt.Replication; safe >= desired {
675                                         break
676                                 }
677                         }
678                         underreplicated = safe < desired
679                 }
680
681                 // set the unachievable flag if there aren't enough
682                 // slots offering the relevant storage class. (This is
683                 // as easy as checking slots[desired] because we
684                 // already sorted the qualifying slots to the front.)
685                 if desired >= len(slots) || !bal.mountsByClass[class][slots[desired].mnt] {
686                         cs := classState[class]
687                         cs.unachievable = true
688                         classState[class] = cs
689                 }
690
691                 // Avoid deleting wanted replicas from devices that
692                 // are mounted on multiple servers -- even if they
693                 // haven't already been added to unsafeToDelete
694                 // because the servers report different Mtimes.
695                 for _, slot := range slots {
696                         if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
697                                 unsafeToDelete[slot.repl.Mtime] = true
698                         }
699                 }
700         }
701
702         // TODO: If multiple replicas are trashable, prefer the oldest
703         // replica that doesn't have a timestamp collision with
704         // others.
705
706         countedDev := map[string]bool{}
707         var have, want int
708         for _, slot := range slots {
709                 if countedDev[slot.mnt.DeviceID] {
710                         continue
711                 }
712                 if slot.want {
713                         want += slot.mnt.Replication
714                 }
715                 if slot.repl != nil {
716                         have += slot.mnt.Replication
717                 }
718                 if slot.mnt.DeviceID != "" {
719                         countedDev[slot.mnt.DeviceID] = true
720                 }
721         }
722
723         var changes []string
724         for _, slot := range slots {
725                 // TODO: request a Touch if Mtime is duplicated.
726                 var change int
727                 switch {
728                 case !underreplicated && !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime && !unsafeToDelete[slot.repl.Mtime]:
729                         slot.mnt.KeepService.AddTrash(Trash{
730                                 SizedDigest: blkid,
731                                 Mtime:       slot.repl.Mtime,
732                                 From:        slot.mnt,
733                         })
734                         change = changeTrash
735                 case len(blk.Replicas) == 0:
736                         change = changeNone
737                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
738                         slot.mnt.KeepService.AddPull(Pull{
739                                 SizedDigest: blkid,
740                                 From:        blk.Replicas[0].KeepMount.KeepService,
741                                 To:          slot.mnt,
742                         })
743                         change = changePull
744                 default:
745                         change = changeStay
746                 }
747                 if bal.Dumper != nil {
748                         var mtime int64
749                         if slot.repl != nil {
750                                 mtime = slot.repl.Mtime
751                         }
752                         srv := slot.mnt.KeepService
753                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
754                 }
755         }
756         if bal.Dumper != nil {
757                 bal.Dumper.Printf("%s have=%d want=%v %s", blkid, have, want, strings.Join(changes, " "))
758         }
759         return balanceResult{
760                 blk:        blk,
761                 blkid:      blkid,
762                 have:       have,
763                 want:       want,
764                 classState: classState,
765         }
766 }
767
768 type blocksNBytes struct {
769         replicas int
770         blocks   int
771         bytes    int64
772 }
773
774 func (bb blocksNBytes) String() string {
775         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
776 }
777
778 type balancerStats struct {
779         lost          blocksNBytes
780         overrep       blocksNBytes
781         unref         blocksNBytes
782         garbage       blocksNBytes
783         underrep      blocksNBytes
784         unachievable  blocksNBytes
785         justright     blocksNBytes
786         desired       blocksNBytes
787         current       blocksNBytes
788         pulls         int
789         trashes       int
790         replHistogram []int
791         classStats    map[string]replicationStats
792
793         // collectionBytes / collectionBlockBytes = deduplication ratio
794         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
795         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
796         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
797         collectionBlocks     int64 // number of blocks referenced by any collection
798 }
799
800 func (s *balancerStats) dedupByteRatio() float64 {
801         if s.collectionBlockBytes == 0 {
802                 return 0
803         }
804         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
805 }
806
807 func (s *balancerStats) dedupBlockRatio() float64 {
808         if s.collectionBlocks == 0 {
809                 return 0
810         }
811         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
812 }
813
814 type replicationStats struct {
815         desired      blocksNBytes
816         surplus      blocksNBytes
817         short        blocksNBytes
818         unachievable blocksNBytes
819 }
820
821 type balancedBlockState struct {
822         desired      int
823         surplus      int
824         unachievable bool
825 }
826
827 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
828         var s balancerStats
829         s.replHistogram = make([]int, 2)
830         s.classStats = make(map[string]replicationStats, len(bal.classes))
831         for result := range results {
832                 surplus := result.have - result.want
833                 bytes := result.blkid.Size()
834
835                 if rc := int64(result.blk.RefCount); rc > 0 {
836                         s.collectionBytes += rc * bytes
837                         s.collectionBlockBytes += bytes
838                         s.collectionBlockRefs += rc
839                         s.collectionBlocks++
840                 }
841
842                 for class, state := range result.classState {
843                         cs := s.classStats[class]
844                         if state.unachievable {
845                                 cs.unachievable.blocks++
846                                 cs.unachievable.bytes += bytes
847                         }
848                         if state.desired > 0 {
849                                 cs.desired.replicas += state.desired
850                                 cs.desired.blocks++
851                                 cs.desired.bytes += bytes * int64(state.desired)
852                         }
853                         if state.surplus > 0 {
854                                 cs.surplus.replicas += state.surplus
855                                 cs.surplus.blocks++
856                                 cs.surplus.bytes += bytes * int64(state.surplus)
857                         } else if state.surplus < 0 {
858                                 cs.short.replicas += -state.surplus
859                                 cs.short.blocks++
860                                 cs.short.bytes += bytes * int64(-state.surplus)
861                         }
862                         s.classStats[class] = cs
863                 }
864
865                 switch {
866                 case result.have == 0 && result.want > 0:
867                         s.lost.replicas -= surplus
868                         s.lost.blocks++
869                         s.lost.bytes += bytes * int64(-surplus)
870                 case surplus < 0:
871                         s.underrep.replicas -= surplus
872                         s.underrep.blocks++
873                         s.underrep.bytes += bytes * int64(-surplus)
874                 case surplus > 0 && result.want == 0:
875                         counter := &s.garbage
876                         for _, r := range result.blk.Replicas {
877                                 if r.Mtime >= bal.MinMtime {
878                                         counter = &s.unref
879                                         break
880                                 }
881                         }
882                         counter.replicas += surplus
883                         counter.blocks++
884                         counter.bytes += bytes * int64(surplus)
885                 case surplus > 0:
886                         s.overrep.replicas += surplus
887                         s.overrep.blocks++
888                         s.overrep.bytes += bytes * int64(result.have-result.want)
889                 default:
890                         s.justright.replicas += result.want
891                         s.justright.blocks++
892                         s.justright.bytes += bytes * int64(result.want)
893                 }
894
895                 if result.want > 0 {
896                         s.desired.replicas += result.want
897                         s.desired.blocks++
898                         s.desired.bytes += bytes * int64(result.want)
899                 }
900                 if result.have > 0 {
901                         s.current.replicas += result.have
902                         s.current.blocks++
903                         s.current.bytes += bytes * int64(result.have)
904                 }
905
906                 for len(s.replHistogram) <= result.have {
907                         s.replHistogram = append(s.replHistogram, 0)
908                 }
909                 s.replHistogram[result.have]++
910         }
911         for _, srv := range bal.KeepServices {
912                 s.pulls += len(srv.ChangeSet.Pulls)
913                 s.trashes += len(srv.ChangeSet.Trashes)
914         }
915         bal.stats = s
916         bal.Metrics.UpdateStats(s)
917 }
918
919 // PrintStatistics writes statistics about the computed changes to
920 // bal.Logger. It should not be called until ComputeChangeSets has
921 // finished.
922 func (bal *Balancer) PrintStatistics() {
923         bal.logf("===")
924         bal.logf("%s lost (0=have<want)", bal.stats.lost)
925         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
926         bal.logf("%s just right (have=want)", bal.stats.justright)
927         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
928         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
929         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
930         for _, class := range bal.classes {
931                 cs := bal.stats.classStats[class]
932                 bal.logf("===")
933                 bal.logf("storage class %q: %s desired", class, cs.desired)
934                 bal.logf("storage class %q: %s short", class, cs.short)
935                 bal.logf("storage class %q: %s surplus", class, cs.surplus)
936                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
937         }
938         bal.logf("===")
939         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
940         bal.logf("%s total usage", bal.stats.current)
941         bal.logf("===")
942         for _, srv := range bal.KeepServices {
943                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
944         }
945         bal.logf("===")
946         bal.printHistogram(60)
947         bal.logf("===")
948 }
949
950 func (bal *Balancer) printHistogram(hashColumns int) {
951         bal.logf("Replication level distribution (counting N replicas on a single server as N):")
952         maxCount := 0
953         for _, count := range bal.stats.replHistogram {
954                 if maxCount < count {
955                         maxCount = count
956                 }
957         }
958         hashes := strings.Repeat("#", hashColumns)
959         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
960         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
961         for repl, count := range bal.stats.replHistogram {
962                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
963                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
964         }
965 }
966
967 // CheckSanityLate checks for configuration and runtime errors after
968 // GetCurrentState() and ComputeChangeSets() have finished.
969 //
970 // If it returns an error, it is dangerous to run any Commit methods.
971 func (bal *Balancer) CheckSanityLate() error {
972         if bal.errors != nil {
973                 for _, err := range bal.errors {
974                         bal.logf("deferred error: %v", err)
975                 }
976                 return fmt.Errorf("cannot proceed safely after deferred errors")
977         }
978
979         if bal.collScanned == 0 {
980                 return fmt.Errorf("received zero collections")
981         }
982
983         anyDesired := false
984         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
985                 for _, desired := range blk.Desired {
986                         if desired > 0 {
987                                 anyDesired = true
988                                 break
989                         }
990                 }
991         })
992         if !anyDesired {
993                 return fmt.Errorf("zero blocks have desired replication>0")
994         }
995
996         if dr := bal.DefaultReplication; dr < 1 {
997                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
998         }
999
1000         // TODO: no two services have identical indexes
1001         // TODO: no collisions (same md5, different size)
1002         return nil
1003 }
1004
1005 // CommitPulls sends the computed lists of pull requests to the
1006 // keepstore servers. This has the effect of increasing replication of
1007 // existing blocks that are either underreplicated or poorly
1008 // distributed according to rendezvous hashing.
1009 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
1010         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1011         return bal.commitAsync(c, "send pull list",
1012                 func(srv *KeepService) error {
1013                         return srv.CommitPulls(c)
1014                 })
1015 }
1016
1017 // CommitTrash sends the computed lists of trash requests to the
1018 // keepstore servers. This has the effect of deleting blocks that are
1019 // overreplicated or unreferenced.
1020 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
1021         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1022         return bal.commitAsync(c, "send trash list",
1023                 func(srv *KeepService) error {
1024                         return srv.CommitTrash(c)
1025                 })
1026 }
1027
1028 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1029         errs := make(chan error)
1030         for _, srv := range bal.KeepServices {
1031                 go func(srv *KeepService) {
1032                         var err error
1033                         defer func() { errs <- err }()
1034                         label := fmt.Sprintf("%s: %v", srv, label)
1035                         err = f(srv)
1036                         if err != nil {
1037                                 err = fmt.Errorf("%s: %v", label, err)
1038                         }
1039                 }(srv)
1040         }
1041         var lastErr error
1042         for range bal.KeepServices {
1043                 if err := <-errs; err != nil {
1044                         bal.logf("%v", err)
1045                         lastErr = err
1046                 }
1047         }
1048         close(errs)
1049         return lastErr
1050 }
1051
1052 func (bal *Balancer) logf(f string, args ...interface{}) {
1053         if bal.Logger != nil {
1054                 bal.Logger.Printf(f, args...)
1055         }
1056 }
1057
1058 func (bal *Balancer) time(name, help string) func() {
1059         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1060         t0 := time.Now()
1061         bal.Logger.Printf("%s: start", name)
1062         return func() {
1063                 dur := time.Since(t0)
1064                 observer.Observe(dur.Seconds())
1065                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1066         }
1067 }
1068
1069 // Rendezvous hash sort function. Less efficient than sorting on
1070 // precomputed rendezvous hashes, but also rarely used.
1071 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1072         a := md5.Sum([]byte(string(blkid[:32]) + i))
1073         b := md5.Sum([]byte(string(blkid[:32]) + j))
1074         return bytes.Compare(a[:], b[:]) < 0
1075 }