Merge branch 'rails5-schema-env-data'
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "fmt"
11         "log"
12         "math"
13         "runtime"
14         "sort"
15         "strings"
16         "sync"
17         "time"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "git.curoverse.com/arvados.git/sdk/go/keepclient"
21         "github.com/sirupsen/logrus"
22 )
23
24 // Balancer compares the contents of keepstore servers with the
25 // collections stored in Arvados, and issues pull/trash requests
26 // needed to get (closer to) the optimal data layout.
27 //
28 // In the optimal data layout: every data block referenced by a
29 // collection is replicated at least as many times as desired by the
30 // collection; there are no unreferenced data blocks older than
31 // BlobSignatureTTL; and all N existing replicas of a given data block
32 // are in the N best positions in rendezvous probe order.
33 type Balancer struct {
34         Logger  logrus.FieldLogger
35         Dumper  logrus.FieldLogger
36         Metrics *metrics
37
38         *BlockStateMap
39         KeepServices       map[string]*KeepService
40         DefaultReplication int
41         MinMtime           int64
42
43         classes       []string
44         mounts        int
45         mountsByClass map[string]map[*KeepMount]bool
46         collScanned   int
47         serviceRoots  map[string]string
48         errors        []error
49         stats         balancerStats
50         mutex         sync.Mutex
51 }
52
53 // Run performs a balance operation using the given config and
54 // runOptions, and returns RunOptions suitable for passing to a
55 // subsequent balance operation.
56 //
57 // Run should only be called once on a given Balancer object.
58 //
59 // Typical usage:
60 //
61 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
62 func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
63         nextRunOptions = runOptions
64
65         defer bal.time("sweep", "wall clock time to run one full sweep")()
66
67         if len(config.KeepServiceList.Items) > 0 {
68                 err = bal.SetKeepServices(config.KeepServiceList)
69         } else {
70                 err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
71         }
72         if err != nil {
73                 return
74         }
75
76         for _, srv := range bal.KeepServices {
77                 err = srv.discoverMounts(&config.Client)
78                 if err != nil {
79                         return
80                 }
81         }
82         bal.cleanupMounts()
83
84         if err = bal.CheckSanityEarly(&config.Client); err != nil {
85                 return
86         }
87         rs := bal.rendezvousState()
88         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
89                 if runOptions.SafeRendezvousState != "" {
90                         bal.logf("notice: KeepServices list has changed since last run")
91                 }
92                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
93                 if err = bal.ClearTrashLists(&config.Client); err != nil {
94                         return
95                 }
96                 // The current rendezvous state becomes "safe" (i.e.,
97                 // OK to compute changes for that state without
98                 // clearing existing trash lists) only now, after we
99                 // succeed in clearing existing trash lists.
100                 nextRunOptions.SafeRendezvousState = rs
101         }
102         if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
103                 return
104         }
105         bal.ComputeChangeSets()
106         bal.PrintStatistics()
107         if err = bal.CheckSanityLate(); err != nil {
108                 return
109         }
110         if runOptions.CommitPulls {
111                 err = bal.CommitPulls(&config.Client)
112                 if err != nil {
113                         // Skip trash if we can't pull. (Too cautious?)
114                         return
115                 }
116         }
117         if runOptions.CommitTrash {
118                 err = bal.CommitTrash(&config.Client)
119         }
120         return
121 }
122
123 // SetKeepServices sets the list of KeepServices to operate on.
124 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
125         bal.KeepServices = make(map[string]*KeepService)
126         for _, srv := range srvList.Items {
127                 bal.KeepServices[srv.UUID] = &KeepService{
128                         KeepService: srv,
129                         ChangeSet:   &ChangeSet{},
130                 }
131         }
132         return nil
133 }
134
135 // DiscoverKeepServices sets the list of KeepServices by calling the
136 // API to get a list of all services, and selecting the ones whose
137 // ServiceType is in okTypes.
138 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
139         bal.KeepServices = make(map[string]*KeepService)
140         ok := make(map[string]bool)
141         for _, t := range okTypes {
142                 ok[t] = true
143         }
144         return c.EachKeepService(func(srv arvados.KeepService) error {
145                 if ok[srv.ServiceType] {
146                         bal.KeepServices[srv.UUID] = &KeepService{
147                                 KeepService: srv,
148                                 ChangeSet:   &ChangeSet{},
149                         }
150                 } else {
151                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
152                 }
153                 return nil
154         })
155 }
156
157 func (bal *Balancer) cleanupMounts() {
158         rwdev := map[string]*KeepService{}
159         for _, srv := range bal.KeepServices {
160                 for _, mnt := range srv.mounts {
161                         if !mnt.ReadOnly && mnt.DeviceID != "" {
162                                 rwdev[mnt.DeviceID] = srv
163                         }
164                 }
165         }
166         // Drop the readonly mounts whose device is mounted RW
167         // elsewhere.
168         for _, srv := range bal.KeepServices {
169                 var dedup []*KeepMount
170                 for _, mnt := range srv.mounts {
171                         if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
172                                 bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
173                         } else {
174                                 dedup = append(dedup, mnt)
175                         }
176                 }
177                 srv.mounts = dedup
178         }
179         for _, srv := range bal.KeepServices {
180                 for _, mnt := range srv.mounts {
181                         if mnt.Replication <= 0 {
182                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
183                                 mnt.Replication = 1
184                         }
185                 }
186         }
187 }
188
189 // CheckSanityEarly checks for configuration and runtime errors that
190 // can be detected before GetCurrentState() and ComputeChangeSets()
191 // are called.
192 //
193 // If it returns an error, it is pointless to run GetCurrentState or
194 // ComputeChangeSets: after doing so, the statistics would be
195 // meaningless and it would be dangerous to run any Commit methods.
196 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
197         u, err := c.CurrentUser()
198         if err != nil {
199                 return fmt.Errorf("CurrentUser(): %v", err)
200         }
201         if !u.IsActive || !u.IsAdmin {
202                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
203         }
204         for _, srv := range bal.KeepServices {
205                 if srv.ServiceType == "proxy" {
206                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
207                 }
208         }
209
210         var checkPage arvados.CollectionList
211         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
212                 Limit:              new(int),
213                 Count:              "exact",
214                 IncludeTrash:       true,
215                 IncludeOldVersions: true,
216                 Filters: []arvados.Filter{{
217                         Attr:     "modified_at",
218                         Operator: "=",
219                         Operand:  nil,
220                 }},
221         }); err != nil {
222                 return err
223         } else if n := checkPage.ItemsAvailable; n > 0 {
224                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
225         }
226
227         return nil
228 }
229
230 // rendezvousState returns a fingerprint (e.g., a sorted list of
231 // UUID+host+port) of the current set of keep services.
232 func (bal *Balancer) rendezvousState() string {
233         srvs := make([]string, 0, len(bal.KeepServices))
234         for _, srv := range bal.KeepServices {
235                 srvs = append(srvs, srv.String())
236         }
237         sort.Strings(srvs)
238         return strings.Join(srvs, "; ")
239 }
240
241 // ClearTrashLists sends an empty trash list to each keep
242 // service. Calling this before GetCurrentState avoids races.
243 //
244 // When a block appears in an index, we assume that replica will still
245 // exist after we delete other replicas on other servers. However,
246 // it's possible that a previous rebalancing operation made different
247 // decisions (e.g., servers were added/removed, and rendezvous order
248 // changed). In this case, the replica might already be on that
249 // server's trash list, and it might be deleted before we send a
250 // replacement trash list.
251 //
252 // We avoid this problem if we clear all trash lists before getting
253 // indexes. (We also assume there is only one rebalancing process
254 // running at a time.)
255 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
256         for _, srv := range bal.KeepServices {
257                 srv.ChangeSet = &ChangeSet{}
258         }
259         return bal.CommitTrash(c)
260 }
261
262 // GetCurrentState determines the current replication state, and the
263 // desired replication level, for every block that is either
264 // retrievable or referenced.
265 //
266 // It determines the current replication state by reading the block index
267 // from every known Keep service.
268 //
269 // It determines the desired replication level by retrieving all
270 // collection manifests in the database (API server).
271 //
272 // It encodes the resulting information in BlockStateMap.
273 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
274         defer bal.time("get_state", "wall clock time to get current state")()
275         bal.BlockStateMap = NewBlockStateMap()
276
277         dd, err := c.DiscoveryDocument()
278         if err != nil {
279                 return err
280         }
281         bal.DefaultReplication = dd.DefaultCollectionReplication
282         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
283
284         errs := make(chan error, 1)
285         wg := sync.WaitGroup{}
286
287         // When a device is mounted more than once, we will get its
288         // index only once, and call AddReplicas on all of the mounts.
289         // equivMount keys are the mounts that will be indexed, and
290         // each value is a list of mounts to apply the received index
291         // to.
292         equivMount := map[*KeepMount][]*KeepMount{}
293         // deviceMount maps each device ID to the one mount that will
294         // be indexed for that device.
295         deviceMount := map[string]*KeepMount{}
296         for _, srv := range bal.KeepServices {
297                 for _, mnt := range srv.mounts {
298                         equiv := deviceMount[mnt.DeviceID]
299                         if equiv == nil {
300                                 equiv = mnt
301                                 if mnt.DeviceID != "" {
302                                         deviceMount[mnt.DeviceID] = equiv
303                                 }
304                         }
305                         equivMount[equiv] = append(equivMount[equiv], mnt)
306                 }
307         }
308
309         // Start one goroutine for each (non-redundant) mount:
310         // retrieve the index, and add the returned blocks to
311         // BlockStateMap.
312         for _, mounts := range equivMount {
313                 wg.Add(1)
314                 go func(mounts []*KeepMount) {
315                         defer wg.Done()
316                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
317                         idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
318                         if err != nil {
319                                 select {
320                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
321                                 default:
322                                 }
323                                 return
324                         }
325                         if len(errs) > 0 {
326                                 // Some other goroutine encountered an
327                                 // error -- any further effort here
328                                 // will be wasted.
329                                 return
330                         }
331                         for _, mount := range mounts {
332                                 bal.logf("%s: add %d entries to map", mount, len(idx))
333                                 bal.BlockStateMap.AddReplicas(mount, idx)
334                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
335                         }
336                         bal.logf("mount %s: index done", mounts[0])
337                 }(mounts)
338         }
339
340         // collQ buffers incoming collections so we can start fetching
341         // the next page without waiting for the current page to
342         // finish processing.
343         collQ := make(chan arvados.Collection, bufs)
344
345         // Start a goroutine to process collections. (We could use a
346         // worker pool here, but even with a single worker we already
347         // process collections much faster than we can retrieve them.)
348         wg.Add(1)
349         go func() {
350                 defer wg.Done()
351                 for coll := range collQ {
352                         err := bal.addCollection(coll)
353                         if err != nil || len(errs) > 0 {
354                                 select {
355                                 case errs <- err:
356                                 default:
357                                 }
358                                 for range collQ {
359                                 }
360                                 return
361                         }
362                         bal.collScanned++
363                 }
364         }()
365
366         // Start a goroutine to retrieve all collections from the
367         // Arvados database and send them to collQ for processing.
368         wg.Add(1)
369         go func() {
370                 defer wg.Done()
371                 err = EachCollection(c, pageSize,
372                         func(coll arvados.Collection) error {
373                                 collQ <- coll
374                                 if len(errs) > 0 {
375                                         // some other GetCurrentState
376                                         // error happened: no point
377                                         // getting any more
378                                         // collections.
379                                         return fmt.Errorf("")
380                                 }
381                                 return nil
382                         }, func(done, total int) {
383                                 bal.logf("collections: %d/%d", done, total)
384                         })
385                 close(collQ)
386                 if err != nil {
387                         select {
388                         case errs <- err:
389                         default:
390                         }
391                 }
392         }()
393
394         wg.Wait()
395         if len(errs) > 0 {
396                 return <-errs
397         }
398         return nil
399 }
400
401 func (bal *Balancer) addCollection(coll arvados.Collection) error {
402         blkids, err := coll.SizedDigests()
403         if err != nil {
404                 return fmt.Errorf("%v: %v", coll.UUID, err)
405         }
406         repl := bal.DefaultReplication
407         if coll.ReplicationDesired != nil {
408                 repl = *coll.ReplicationDesired
409         }
410         debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
411         bal.BlockStateMap.IncreaseDesired(coll.StorageClassesDesired, repl, blkids)
412         return nil
413 }
414
415 // ComputeChangeSets compares, for each known block, the current and
416 // desired replication states. If it is possible to get closer to the
417 // desired state by copying or deleting blocks, it adds those changes
418 // to the relevant KeepServices' ChangeSets.
419 //
420 // It does not actually apply any of the computed changes.
421 func (bal *Balancer) ComputeChangeSets() {
422         // This just calls balanceBlock() once for each block, using a
423         // pool of worker goroutines.
424         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
425         bal.setupLookupTables()
426
427         type balanceTask struct {
428                 blkid arvados.SizedDigest
429                 blk   *BlockState
430         }
431         workers := runtime.GOMAXPROCS(-1)
432         todo := make(chan balanceTask, workers)
433         go func() {
434                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
435                         todo <- balanceTask{
436                                 blkid: blkid,
437                                 blk:   blk,
438                         }
439                 })
440                 close(todo)
441         }()
442         results := make(chan balanceResult, workers)
443         go func() {
444                 var wg sync.WaitGroup
445                 for i := 0; i < workers; i++ {
446                         wg.Add(1)
447                         go func() {
448                                 for work := range todo {
449                                         results <- bal.balanceBlock(work.blkid, work.blk)
450                                 }
451                                 wg.Done()
452                         }()
453                 }
454                 wg.Wait()
455                 close(results)
456         }()
457         bal.collectStatistics(results)
458 }
459
460 func (bal *Balancer) setupLookupTables() {
461         bal.serviceRoots = make(map[string]string)
462         bal.classes = defaultClasses
463         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
464         bal.mounts = 0
465         for _, srv := range bal.KeepServices {
466                 bal.serviceRoots[srv.UUID] = srv.UUID
467                 for _, mnt := range srv.mounts {
468                         bal.mounts++
469
470                         // All mounts on a read-only service are
471                         // effectively read-only.
472                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
473
474                         if len(mnt.StorageClasses) == 0 {
475                                 bal.mountsByClass["default"][mnt] = true
476                                 continue
477                         }
478                         for _, class := range mnt.StorageClasses {
479                                 if mbc := bal.mountsByClass[class]; mbc == nil {
480                                         bal.classes = append(bal.classes, class)
481                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
482                                 } else {
483                                         mbc[mnt] = true
484                                 }
485                         }
486                 }
487         }
488         // Consider classes in lexicographic order to avoid flapping
489         // between balancing runs.  The outcome of the "prefer a mount
490         // we're already planning to use for a different storage
491         // class" case in balanceBlock depends on the order classes
492         // are considered.
493         sort.Strings(bal.classes)
494 }
495
496 const (
497         changeStay = iota
498         changePull
499         changeTrash
500         changeNone
501 )
502
503 var changeName = map[int]string{
504         changeStay:  "stay",
505         changePull:  "pull",
506         changeTrash: "trash",
507         changeNone:  "none",
508 }
509
510 type balanceResult struct {
511         blk        *BlockState
512         blkid      arvados.SizedDigest
513         have       int
514         want       int
515         classState map[string]balancedBlockState
516 }
517
518 // balanceBlock compares current state to desired state for a single
519 // block, and makes the appropriate ChangeSet calls.
520 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
521         debugf("balanceBlock: %v %+v", blkid, blk)
522
523         type slot struct {
524                 mnt  *KeepMount // never nil
525                 repl *Replica   // replica already stored here (or nil)
526                 want bool       // we should pull/leave a replica here
527         }
528
529         // Build a list of all slots (one per mounted volume).
530         slots := make([]slot, 0, bal.mounts)
531         for _, srv := range bal.KeepServices {
532                 for _, mnt := range srv.mounts {
533                         var repl *Replica
534                         for r := range blk.Replicas {
535                                 if blk.Replicas[r].KeepMount == mnt {
536                                         repl = &blk.Replicas[r]
537                                 }
538                         }
539                         // Initial value of "want" is "have, and can't
540                         // delete". These untrashable replicas get
541                         // prioritized when sorting slots: otherwise,
542                         // non-optimal readonly copies would cause us
543                         // to overreplicate.
544                         slots = append(slots, slot{
545                                 mnt:  mnt,
546                                 repl: repl,
547                                 want: repl != nil && mnt.ReadOnly,
548                         })
549                 }
550         }
551
552         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
553         srvRendezvous := make(map[*KeepService]int, len(uuids))
554         for i, uuid := range uuids {
555                 srv := bal.KeepServices[uuid]
556                 srvRendezvous[srv] = i
557         }
558
559         // Below we set underreplicated=true if we find any storage
560         // class that's currently underreplicated -- in that case we
561         // won't want to trash any replicas.
562         underreplicated := false
563
564         classState := make(map[string]balancedBlockState, len(bal.classes))
565         unsafeToDelete := make(map[int64]bool, len(slots))
566         for _, class := range bal.classes {
567                 desired := blk.Desired[class]
568
569                 countedDev := map[string]bool{}
570                 have := 0
571                 for _, slot := range slots {
572                         if slot.repl != nil && bal.mountsByClass[class][slot.mnt] && !countedDev[slot.mnt.DeviceID] {
573                                 have += slot.mnt.Replication
574                                 if slot.mnt.DeviceID != "" {
575                                         countedDev[slot.mnt.DeviceID] = true
576                                 }
577                         }
578                 }
579                 classState[class] = balancedBlockState{
580                         desired: desired,
581                         surplus: have - desired,
582                 }
583
584                 if desired == 0 {
585                         continue
586                 }
587
588                 // Sort the slots by desirability.
589                 sort.Slice(slots, func(i, j int) bool {
590                         si, sj := slots[i], slots[j]
591                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
592                                 // Prefer a mount that satisfies the
593                                 // desired class.
594                                 return bal.mountsByClass[class][si.mnt]
595                         } else if si.want != sj.want {
596                                 // Prefer a mount that will have a
597                                 // replica no matter what we do here
598                                 // -- either because it already has an
599                                 // untrashable replica, or because we
600                                 // already need it to satisfy a
601                                 // different storage class.
602                                 return si.want
603                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
604                                 // Prefer a better rendezvous
605                                 // position.
606                                 return orderi < orderj
607                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
608                                 // Prefer a mount that already has a
609                                 // replica.
610                                 return repli
611                         } else {
612                                 // If pull/trash turns out to be
613                                 // needed, distribute the
614                                 // new/remaining replicas uniformly
615                                 // across qualifying mounts on a given
616                                 // server.
617                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
618                         }
619                 })
620
621                 // Servers/mounts/devices (with or without existing
622                 // replicas) that are part of the best achievable
623                 // layout for this storage class.
624                 wantSrv := map[*KeepService]bool{}
625                 wantMnt := map[*KeepMount]bool{}
626                 wantDev := map[string]bool{}
627                 // Positions (with existing replicas) that have been
628                 // protected (via unsafeToDelete) to ensure we don't
629                 // reduce replication below desired level when
630                 // trashing replicas that aren't optimal positions for
631                 // any storage class.
632                 protMnt := map[*KeepMount]bool{}
633                 // Replication planned so far (corresponds to wantMnt).
634                 replWant := 0
635                 // Protected replication (corresponds to protMnt).
636                 replProt := 0
637
638                 // trySlot tries using a slot to meet requirements,
639                 // and returns true if all requirements are met.
640                 trySlot := func(i int) bool {
641                         slot := slots[i]
642                         if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
643                                 // Already allocated a replica to this
644                                 // backend device, possibly on a
645                                 // different server.
646                                 return false
647                         }
648                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
649                                 unsafeToDelete[slot.repl.Mtime] = true
650                                 protMnt[slot.mnt] = true
651                                 replProt += slot.mnt.Replication
652                         }
653                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
654                                 slots[i].want = true
655                                 wantSrv[slot.mnt.KeepService] = true
656                                 wantMnt[slot.mnt] = true
657                                 if slot.mnt.DeviceID != "" {
658                                         wantDev[slot.mnt.DeviceID] = true
659                                 }
660                                 replWant += slot.mnt.Replication
661                         }
662                         return replProt >= desired && replWant >= desired
663                 }
664
665                 // First try to achieve desired replication without
666                 // using the same server twice.
667                 done := false
668                 for i := 0; i < len(slots) && !done; i++ {
669                         if !wantSrv[slots[i].mnt.KeepService] {
670                                 done = trySlot(i)
671                         }
672                 }
673
674                 // If that didn't suffice, do another pass without the
675                 // "distinct services" restriction. (Achieving the
676                 // desired volume replication on fewer than the
677                 // desired number of services is better than
678                 // underreplicating.)
679                 for i := 0; i < len(slots) && !done; i++ {
680                         done = trySlot(i)
681                 }
682
683                 if !underreplicated {
684                         safe := 0
685                         for _, slot := range slots {
686                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
687                                         continue
688                                 }
689                                 if safe += slot.mnt.Replication; safe >= desired {
690                                         break
691                                 }
692                         }
693                         underreplicated = safe < desired
694                 }
695
696                 // set the unachievable flag if there aren't enough
697                 // slots offering the relevant storage class. (This is
698                 // as easy as checking slots[desired] because we
699                 // already sorted the qualifying slots to the front.)
700                 if desired >= len(slots) || !bal.mountsByClass[class][slots[desired].mnt] {
701                         cs := classState[class]
702                         cs.unachievable = true
703                         classState[class] = cs
704                 }
705
706                 // Avoid deleting wanted replicas from devices that
707                 // are mounted on multiple servers -- even if they
708                 // haven't already been added to unsafeToDelete
709                 // because the servers report different Mtimes.
710                 for _, slot := range slots {
711                         if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
712                                 unsafeToDelete[slot.repl.Mtime] = true
713                         }
714                 }
715         }
716
717         // TODO: If multiple replicas are trashable, prefer the oldest
718         // replica that doesn't have a timestamp collision with
719         // others.
720
721         countedDev := map[string]bool{}
722         var have, want int
723         for _, slot := range slots {
724                 if countedDev[slot.mnt.DeviceID] {
725                         continue
726                 }
727                 if slot.want {
728                         want += slot.mnt.Replication
729                 }
730                 if slot.repl != nil {
731                         have += slot.mnt.Replication
732                 }
733                 if slot.mnt.DeviceID != "" {
734                         countedDev[slot.mnt.DeviceID] = true
735                 }
736         }
737
738         var changes []string
739         for _, slot := range slots {
740                 // TODO: request a Touch if Mtime is duplicated.
741                 var change int
742                 switch {
743                 case !underreplicated && !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime && !unsafeToDelete[slot.repl.Mtime]:
744                         slot.mnt.KeepService.AddTrash(Trash{
745                                 SizedDigest: blkid,
746                                 Mtime:       slot.repl.Mtime,
747                                 From:        slot.mnt,
748                         })
749                         change = changeTrash
750                 case len(blk.Replicas) > 0 && slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
751                         slot.mnt.KeepService.AddPull(Pull{
752                                 SizedDigest: blkid,
753                                 From:        blk.Replicas[0].KeepMount.KeepService,
754                                 To:          slot.mnt,
755                         })
756                         change = changePull
757                 case slot.repl != nil:
758                         change = changeStay
759                 default:
760                         change = changeNone
761                 }
762                 if bal.Dumper != nil {
763                         var mtime int64
764                         if slot.repl != nil {
765                                 mtime = slot.repl.Mtime
766                         }
767                         srv := slot.mnt.KeepService
768                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
769                 }
770         }
771         if bal.Dumper != nil {
772                 bal.Dumper.Printf("%s refs=%d have=%d want=%v %v %v", blkid, blk.RefCount, have, want, blk.Desired, changes)
773         }
774         return balanceResult{
775                 blk:        blk,
776                 blkid:      blkid,
777                 have:       have,
778                 want:       want,
779                 classState: classState,
780         }
781 }
782
783 type blocksNBytes struct {
784         replicas int
785         blocks   int
786         bytes    int64
787 }
788
789 func (bb blocksNBytes) String() string {
790         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
791 }
792
793 type balancerStats struct {
794         lost          blocksNBytes
795         overrep       blocksNBytes
796         unref         blocksNBytes
797         garbage       blocksNBytes
798         underrep      blocksNBytes
799         unachievable  blocksNBytes
800         justright     blocksNBytes
801         desired       blocksNBytes
802         current       blocksNBytes
803         pulls         int
804         trashes       int
805         replHistogram []int
806         classStats    map[string]replicationStats
807
808         // collectionBytes / collectionBlockBytes = deduplication ratio
809         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
810         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
811         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
812         collectionBlocks     int64 // number of blocks referenced by any collection
813 }
814
815 func (s *balancerStats) dedupByteRatio() float64 {
816         if s.collectionBlockBytes == 0 {
817                 return 0
818         }
819         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
820 }
821
822 func (s *balancerStats) dedupBlockRatio() float64 {
823         if s.collectionBlocks == 0 {
824                 return 0
825         }
826         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
827 }
828
829 type replicationStats struct {
830         desired      blocksNBytes
831         surplus      blocksNBytes
832         short        blocksNBytes
833         unachievable blocksNBytes
834 }
835
836 type balancedBlockState struct {
837         desired      int
838         surplus      int
839         unachievable bool
840 }
841
842 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
843         var s balancerStats
844         s.replHistogram = make([]int, 2)
845         s.classStats = make(map[string]replicationStats, len(bal.classes))
846         for result := range results {
847                 surplus := result.have - result.want
848                 bytes := result.blkid.Size()
849
850                 if rc := int64(result.blk.RefCount); rc > 0 {
851                         s.collectionBytes += rc * bytes
852                         s.collectionBlockBytes += bytes
853                         s.collectionBlockRefs += rc
854                         s.collectionBlocks++
855                 }
856
857                 for class, state := range result.classState {
858                         cs := s.classStats[class]
859                         if state.unachievable {
860                                 cs.unachievable.blocks++
861                                 cs.unachievable.bytes += bytes
862                         }
863                         if state.desired > 0 {
864                                 cs.desired.replicas += state.desired
865                                 cs.desired.blocks++
866                                 cs.desired.bytes += bytes * int64(state.desired)
867                         }
868                         if state.surplus > 0 {
869                                 cs.surplus.replicas += state.surplus
870                                 cs.surplus.blocks++
871                                 cs.surplus.bytes += bytes * int64(state.surplus)
872                         } else if state.surplus < 0 {
873                                 cs.short.replicas += -state.surplus
874                                 cs.short.blocks++
875                                 cs.short.bytes += bytes * int64(-state.surplus)
876                         }
877                         s.classStats[class] = cs
878                 }
879
880                 switch {
881                 case result.have == 0 && result.want > 0:
882                         s.lost.replicas -= surplus
883                         s.lost.blocks++
884                         s.lost.bytes += bytes * int64(-surplus)
885                 case surplus < 0:
886                         s.underrep.replicas -= surplus
887                         s.underrep.blocks++
888                         s.underrep.bytes += bytes * int64(-surplus)
889                 case surplus > 0 && result.want == 0:
890                         counter := &s.garbage
891                         for _, r := range result.blk.Replicas {
892                                 if r.Mtime >= bal.MinMtime {
893                                         counter = &s.unref
894                                         break
895                                 }
896                         }
897                         counter.replicas += surplus
898                         counter.blocks++
899                         counter.bytes += bytes * int64(surplus)
900                 case surplus > 0:
901                         s.overrep.replicas += surplus
902                         s.overrep.blocks++
903                         s.overrep.bytes += bytes * int64(result.have-result.want)
904                 default:
905                         s.justright.replicas += result.want
906                         s.justright.blocks++
907                         s.justright.bytes += bytes * int64(result.want)
908                 }
909
910                 if result.want > 0 {
911                         s.desired.replicas += result.want
912                         s.desired.blocks++
913                         s.desired.bytes += bytes * int64(result.want)
914                 }
915                 if result.have > 0 {
916                         s.current.replicas += result.have
917                         s.current.blocks++
918                         s.current.bytes += bytes * int64(result.have)
919                 }
920
921                 for len(s.replHistogram) <= result.have {
922                         s.replHistogram = append(s.replHistogram, 0)
923                 }
924                 s.replHistogram[result.have]++
925         }
926         for _, srv := range bal.KeepServices {
927                 s.pulls += len(srv.ChangeSet.Pulls)
928                 s.trashes += len(srv.ChangeSet.Trashes)
929         }
930         bal.stats = s
931         bal.Metrics.UpdateStats(s)
932 }
933
934 // PrintStatistics writes statistics about the computed changes to
935 // bal.Logger. It should not be called until ComputeChangeSets has
936 // finished.
937 func (bal *Balancer) PrintStatistics() {
938         bal.logf("===")
939         bal.logf("%s lost (0=have<want)", bal.stats.lost)
940         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
941         bal.logf("%s just right (have=want)", bal.stats.justright)
942         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
943         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
944         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
945         for _, class := range bal.classes {
946                 cs := bal.stats.classStats[class]
947                 bal.logf("===")
948                 bal.logf("storage class %q: %s desired", class, cs.desired)
949                 bal.logf("storage class %q: %s short", class, cs.short)
950                 bal.logf("storage class %q: %s surplus", class, cs.surplus)
951                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
952         }
953         bal.logf("===")
954         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
955         bal.logf("%s total usage", bal.stats.current)
956         bal.logf("===")
957         for _, srv := range bal.KeepServices {
958                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
959         }
960         bal.logf("===")
961         bal.printHistogram(60)
962         bal.logf("===")
963 }
964
965 func (bal *Balancer) printHistogram(hashColumns int) {
966         bal.logf("Replication level distribution (counting N replicas on a single server as N):")
967         maxCount := 0
968         for _, count := range bal.stats.replHistogram {
969                 if maxCount < count {
970                         maxCount = count
971                 }
972         }
973         hashes := strings.Repeat("#", hashColumns)
974         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
975         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
976         for repl, count := range bal.stats.replHistogram {
977                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
978                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
979         }
980 }
981
982 // CheckSanityLate checks for configuration and runtime errors after
983 // GetCurrentState() and ComputeChangeSets() have finished.
984 //
985 // If it returns an error, it is dangerous to run any Commit methods.
986 func (bal *Balancer) CheckSanityLate() error {
987         if bal.errors != nil {
988                 for _, err := range bal.errors {
989                         bal.logf("deferred error: %v", err)
990                 }
991                 return fmt.Errorf("cannot proceed safely after deferred errors")
992         }
993
994         if bal.collScanned == 0 {
995                 return fmt.Errorf("received zero collections")
996         }
997
998         anyDesired := false
999         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1000                 for _, desired := range blk.Desired {
1001                         if desired > 0 {
1002                                 anyDesired = true
1003                                 break
1004                         }
1005                 }
1006         })
1007         if !anyDesired {
1008                 return fmt.Errorf("zero blocks have desired replication>0")
1009         }
1010
1011         if dr := bal.DefaultReplication; dr < 1 {
1012                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1013         }
1014
1015         // TODO: no two services have identical indexes
1016         // TODO: no collisions (same md5, different size)
1017         return nil
1018 }
1019
1020 // CommitPulls sends the computed lists of pull requests to the
1021 // keepstore servers. This has the effect of increasing replication of
1022 // existing blocks that are either underreplicated or poorly
1023 // distributed according to rendezvous hashing.
1024 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
1025         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1026         return bal.commitAsync(c, "send pull list",
1027                 func(srv *KeepService) error {
1028                         return srv.CommitPulls(c)
1029                 })
1030 }
1031
1032 // CommitTrash sends the computed lists of trash requests to the
1033 // keepstore servers. This has the effect of deleting blocks that are
1034 // overreplicated or unreferenced.
1035 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
1036         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1037         return bal.commitAsync(c, "send trash list",
1038                 func(srv *KeepService) error {
1039                         return srv.CommitTrash(c)
1040                 })
1041 }
1042
1043 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1044         errs := make(chan error)
1045         for _, srv := range bal.KeepServices {
1046                 go func(srv *KeepService) {
1047                         var err error
1048                         defer func() { errs <- err }()
1049                         label := fmt.Sprintf("%s: %v", srv, label)
1050                         err = f(srv)
1051                         if err != nil {
1052                                 err = fmt.Errorf("%s: %v", label, err)
1053                         }
1054                 }(srv)
1055         }
1056         var lastErr error
1057         for range bal.KeepServices {
1058                 if err := <-errs; err != nil {
1059                         bal.logf("%v", err)
1060                         lastErr = err
1061                 }
1062         }
1063         close(errs)
1064         return lastErr
1065 }
1066
1067 func (bal *Balancer) logf(f string, args ...interface{}) {
1068         if bal.Logger != nil {
1069                 bal.Logger.Printf(f, args...)
1070         }
1071 }
1072
1073 func (bal *Balancer) time(name, help string) func() {
1074         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1075         t0 := time.Now()
1076         bal.Logger.Printf("%s: start", name)
1077         return func() {
1078                 dur := time.Since(t0)
1079                 observer.Observe(dur.Seconds())
1080                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1081         }
1082 }
1083
1084 // Rendezvous hash sort function. Less efficient than sorting on
1085 // precomputed rendezvous hashes, but also rarely used.
1086 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1087         a := md5.Sum([]byte(string(blkid[:32]) + i))
1088         b := md5.Sum([]byte(string(blkid[:32]) + j))
1089         return bytes.Compare(a[:], b[:]) < 0
1090 }