Merge branch '14407-selenium'
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "fmt"
11         "log"
12         "math"
13         "runtime"
14         "sort"
15         "strings"
16         "sync"
17         "time"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "git.curoverse.com/arvados.git/sdk/go/keepclient"
21         "github.com/Sirupsen/logrus"
22 )
23
24 // Balancer compares the contents of keepstore servers with the
25 // collections stored in Arvados, and issues pull/trash requests
26 // needed to get (closer to) the optimal data layout.
27 //
28 // In the optimal data layout: every data block referenced by a
29 // collection is replicated at least as many times as desired by the
30 // collection; there are no unreferenced data blocks older than
31 // BlobSignatureTTL; and all N existing replicas of a given data block
32 // are in the N best positions in rendezvous probe order.
33 type Balancer struct {
34         Logger  *logrus.Logger
35         Dumper  *logrus.Logger
36         Metrics *metrics
37
38         *BlockStateMap
39         KeepServices       map[string]*KeepService
40         DefaultReplication int
41         MinMtime           int64
42
43         classes       []string
44         mounts        int
45         mountsByClass map[string]map[*KeepMount]bool
46         collScanned   int
47         serviceRoots  map[string]string
48         errors        []error
49         stats         balancerStats
50         mutex         sync.Mutex
51 }
52
53 // Run performs a balance operation using the given config and
54 // runOptions, and returns RunOptions suitable for passing to a
55 // subsequent balance operation.
56 //
57 // Run should only be called once on a given Balancer object.
58 //
59 // Typical usage:
60 //
61 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
62 func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
63         nextRunOptions = runOptions
64
65         defer bal.time("sweep", "wall clock time to run one full sweep")()
66
67         if len(config.KeepServiceList.Items) > 0 {
68                 err = bal.SetKeepServices(config.KeepServiceList)
69         } else {
70                 err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
71         }
72         if err != nil {
73                 return
74         }
75
76         for _, srv := range bal.KeepServices {
77                 err = srv.discoverMounts(&config.Client)
78                 if err != nil {
79                         return
80                 }
81         }
82         bal.cleanupMounts()
83
84         if err = bal.CheckSanityEarly(&config.Client); err != nil {
85                 return
86         }
87         rs := bal.rendezvousState()
88         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
89                 if runOptions.SafeRendezvousState != "" {
90                         bal.logf("notice: KeepServices list has changed since last run")
91                 }
92                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
93                 if err = bal.ClearTrashLists(&config.Client); err != nil {
94                         return
95                 }
96                 // The current rendezvous state becomes "safe" (i.e.,
97                 // OK to compute changes for that state without
98                 // clearing existing trash lists) only now, after we
99                 // succeed in clearing existing trash lists.
100                 nextRunOptions.SafeRendezvousState = rs
101         }
102         if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
103                 return
104         }
105         bal.ComputeChangeSets()
106         bal.PrintStatistics()
107         if err = bal.CheckSanityLate(); err != nil {
108                 return
109         }
110         if runOptions.CommitPulls {
111                 err = bal.CommitPulls(&config.Client)
112                 if err != nil {
113                         // Skip trash if we can't pull. (Too cautious?)
114                         return
115                 }
116         }
117         if runOptions.CommitTrash {
118                 err = bal.CommitTrash(&config.Client)
119         }
120         return
121 }
122
123 // SetKeepServices sets the list of KeepServices to operate on.
124 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
125         bal.KeepServices = make(map[string]*KeepService)
126         for _, srv := range srvList.Items {
127                 bal.KeepServices[srv.UUID] = &KeepService{
128                         KeepService: srv,
129                         ChangeSet:   &ChangeSet{},
130                 }
131         }
132         return nil
133 }
134
135 // DiscoverKeepServices sets the list of KeepServices by calling the
136 // API to get a list of all services, and selecting the ones whose
137 // ServiceType is in okTypes.
138 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
139         bal.KeepServices = make(map[string]*KeepService)
140         ok := make(map[string]bool)
141         for _, t := range okTypes {
142                 ok[t] = true
143         }
144         return c.EachKeepService(func(srv arvados.KeepService) error {
145                 if ok[srv.ServiceType] {
146                         bal.KeepServices[srv.UUID] = &KeepService{
147                                 KeepService: srv,
148                                 ChangeSet:   &ChangeSet{},
149                         }
150                 } else {
151                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
152                 }
153                 return nil
154         })
155 }
156
157 func (bal *Balancer) cleanupMounts() {
158         rwdev := map[string]*KeepService{}
159         for _, srv := range bal.KeepServices {
160                 for _, mnt := range srv.mounts {
161                         if !mnt.ReadOnly && mnt.DeviceID != "" {
162                                 rwdev[mnt.DeviceID] = srv
163                         }
164                 }
165         }
166         // Drop the readonly mounts whose device is mounted RW
167         // elsewhere.
168         for _, srv := range bal.KeepServices {
169                 var dedup []*KeepMount
170                 for _, mnt := range srv.mounts {
171                         if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
172                                 bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
173                         } else {
174                                 dedup = append(dedup, mnt)
175                         }
176                 }
177                 srv.mounts = dedup
178         }
179         for _, srv := range bal.KeepServices {
180                 for _, mnt := range srv.mounts {
181                         if mnt.Replication <= 0 {
182                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
183                                 mnt.Replication = 1
184                         }
185                 }
186         }
187 }
188
189 // CheckSanityEarly checks for configuration and runtime errors that
190 // can be detected before GetCurrentState() and ComputeChangeSets()
191 // are called.
192 //
193 // If it returns an error, it is pointless to run GetCurrentState or
194 // ComputeChangeSets: after doing so, the statistics would be
195 // meaningless and it would be dangerous to run any Commit methods.
196 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
197         u, err := c.CurrentUser()
198         if err != nil {
199                 return fmt.Errorf("CurrentUser(): %v", err)
200         }
201         if !u.IsActive || !u.IsAdmin {
202                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
203         }
204         for _, srv := range bal.KeepServices {
205                 if srv.ServiceType == "proxy" {
206                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
207                 }
208         }
209         return nil
210 }
211
212 // rendezvousState returns a fingerprint (e.g., a sorted list of
213 // UUID+host+port) of the current set of keep services.
214 func (bal *Balancer) rendezvousState() string {
215         srvs := make([]string, 0, len(bal.KeepServices))
216         for _, srv := range bal.KeepServices {
217                 srvs = append(srvs, srv.String())
218         }
219         sort.Strings(srvs)
220         return strings.Join(srvs, "; ")
221 }
222
223 // ClearTrashLists sends an empty trash list to each keep
224 // service. Calling this before GetCurrentState avoids races.
225 //
226 // When a block appears in an index, we assume that replica will still
227 // exist after we delete other replicas on other servers. However,
228 // it's possible that a previous rebalancing operation made different
229 // decisions (e.g., servers were added/removed, and rendezvous order
230 // changed). In this case, the replica might already be on that
231 // server's trash list, and it might be deleted before we send a
232 // replacement trash list.
233 //
234 // We avoid this problem if we clear all trash lists before getting
235 // indexes. (We also assume there is only one rebalancing process
236 // running at a time.)
237 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
238         for _, srv := range bal.KeepServices {
239                 srv.ChangeSet = &ChangeSet{}
240         }
241         return bal.CommitTrash(c)
242 }
243
244 // GetCurrentState determines the current replication state, and the
245 // desired replication level, for every block that is either
246 // retrievable or referenced.
247 //
248 // It determines the current replication state by reading the block index
249 // from every known Keep service.
250 //
251 // It determines the desired replication level by retrieving all
252 // collection manifests in the database (API server).
253 //
254 // It encodes the resulting information in BlockStateMap.
255 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
256         defer bal.time("get_state", "wall clock time to get current state")()
257         bal.BlockStateMap = NewBlockStateMap()
258
259         dd, err := c.DiscoveryDocument()
260         if err != nil {
261                 return err
262         }
263         bal.DefaultReplication = dd.DefaultCollectionReplication
264         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
265
266         errs := make(chan error, 2+len(bal.KeepServices))
267         wg := sync.WaitGroup{}
268
269         // When a device is mounted more than once, we will get its
270         // index only once, and call AddReplicas on all of the mounts.
271         // equivMount keys are the mounts that will be indexed, and
272         // each value is a list of mounts to apply the received index
273         // to.
274         equivMount := map[*KeepMount][]*KeepMount{}
275         // deviceMount maps each device ID to the one mount that will
276         // be indexed for that device.
277         deviceMount := map[string]*KeepMount{}
278         for _, srv := range bal.KeepServices {
279                 for _, mnt := range srv.mounts {
280                         equiv := deviceMount[mnt.DeviceID]
281                         if equiv == nil {
282                                 equiv = mnt
283                                 if mnt.DeviceID != "" {
284                                         deviceMount[mnt.DeviceID] = equiv
285                                 }
286                         }
287                         equivMount[equiv] = append(equivMount[equiv], mnt)
288                 }
289         }
290
291         // Start one goroutine for each (non-redundant) mount:
292         // retrieve the index, and add the returned blocks to
293         // BlockStateMap.
294         for _, mounts := range equivMount {
295                 wg.Add(1)
296                 go func(mounts []*KeepMount) {
297                         defer wg.Done()
298                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
299                         idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
300                         if err != nil {
301                                 errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err)
302                                 return
303                         }
304                         if len(errs) > 0 {
305                                 // Some other goroutine encountered an
306                                 // error -- any further effort here
307                                 // will be wasted.
308                                 return
309                         }
310                         for _, mount := range mounts {
311                                 bal.logf("%s: add %d entries to map", mount, len(idx))
312                                 bal.BlockStateMap.AddReplicas(mount, idx)
313                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
314                         }
315                         bal.logf("mount %s: index done", mounts[0])
316                 }(mounts)
317         }
318
319         // collQ buffers incoming collections so we can start fetching
320         // the next page without waiting for the current page to
321         // finish processing.
322         collQ := make(chan arvados.Collection, bufs)
323
324         // Start a goroutine to process collections. (We could use a
325         // worker pool here, but even with a single worker we already
326         // process collections much faster than we can retrieve them.)
327         wg.Add(1)
328         go func() {
329                 defer wg.Done()
330                 for coll := range collQ {
331                         err := bal.addCollection(coll)
332                         if err != nil {
333                                 errs <- err
334                                 for range collQ {
335                                 }
336                                 return
337                         }
338                         bal.collScanned++
339                 }
340         }()
341
342         // Start a goroutine to retrieve all collections from the
343         // Arvados database and send them to collQ for processing.
344         wg.Add(1)
345         go func() {
346                 defer wg.Done()
347                 err = EachCollection(c, pageSize,
348                         func(coll arvados.Collection) error {
349                                 collQ <- coll
350                                 if len(errs) > 0 {
351                                         // some other GetCurrentState
352                                         // error happened: no point
353                                         // getting any more
354                                         // collections.
355                                         return fmt.Errorf("")
356                                 }
357                                 return nil
358                         }, func(done, total int) {
359                                 bal.logf("collections: %d/%d", done, total)
360                         })
361                 close(collQ)
362                 if err != nil {
363                         errs <- err
364                 }
365         }()
366
367         wg.Wait()
368         if len(errs) > 0 {
369                 return <-errs
370         }
371         return nil
372 }
373
374 func (bal *Balancer) addCollection(coll arvados.Collection) error {
375         blkids, err := coll.SizedDigests()
376         if err != nil {
377                 bal.mutex.Lock()
378                 bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
379                 bal.mutex.Unlock()
380                 return nil
381         }
382         repl := bal.DefaultReplication
383         if coll.ReplicationDesired != nil {
384                 repl = *coll.ReplicationDesired
385         }
386         debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
387         bal.BlockStateMap.IncreaseDesired(coll.StorageClassesDesired, repl, blkids)
388         return nil
389 }
390
391 // ComputeChangeSets compares, for each known block, the current and
392 // desired replication states. If it is possible to get closer to the
393 // desired state by copying or deleting blocks, it adds those changes
394 // to the relevant KeepServices' ChangeSets.
395 //
396 // It does not actually apply any of the computed changes.
397 func (bal *Balancer) ComputeChangeSets() {
398         // This just calls balanceBlock() once for each block, using a
399         // pool of worker goroutines.
400         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
401         bal.setupLookupTables()
402
403         type balanceTask struct {
404                 blkid arvados.SizedDigest
405                 blk   *BlockState
406         }
407         workers := runtime.GOMAXPROCS(-1)
408         todo := make(chan balanceTask, workers)
409         go func() {
410                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
411                         todo <- balanceTask{
412                                 blkid: blkid,
413                                 blk:   blk,
414                         }
415                 })
416                 close(todo)
417         }()
418         results := make(chan balanceResult, workers)
419         go func() {
420                 var wg sync.WaitGroup
421                 for i := 0; i < workers; i++ {
422                         wg.Add(1)
423                         go func() {
424                                 for work := range todo {
425                                         results <- bal.balanceBlock(work.blkid, work.blk)
426                                 }
427                                 wg.Done()
428                         }()
429                 }
430                 wg.Wait()
431                 close(results)
432         }()
433         bal.collectStatistics(results)
434 }
435
436 func (bal *Balancer) setupLookupTables() {
437         bal.serviceRoots = make(map[string]string)
438         bal.classes = []string{"default"}
439         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
440         bal.mounts = 0
441         for _, srv := range bal.KeepServices {
442                 bal.serviceRoots[srv.UUID] = srv.UUID
443                 for _, mnt := range srv.mounts {
444                         bal.mounts++
445
446                         // All mounts on a read-only service are
447                         // effectively read-only.
448                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
449
450                         if len(mnt.StorageClasses) == 0 {
451                                 bal.mountsByClass["default"][mnt] = true
452                                 continue
453                         }
454                         for _, class := range mnt.StorageClasses {
455                                 if mbc := bal.mountsByClass[class]; mbc == nil {
456                                         bal.classes = append(bal.classes, class)
457                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
458                                 } else {
459                                         mbc[mnt] = true
460                                 }
461                         }
462                 }
463         }
464         // Consider classes in lexicographic order to avoid flapping
465         // between balancing runs.  The outcome of the "prefer a mount
466         // we're already planning to use for a different storage
467         // class" case in balanceBlock depends on the order classes
468         // are considered.
469         sort.Strings(bal.classes)
470 }
471
472 const (
473         changeStay = iota
474         changePull
475         changeTrash
476         changeNone
477 )
478
479 var changeName = map[int]string{
480         changeStay:  "stay",
481         changePull:  "pull",
482         changeTrash: "trash",
483         changeNone:  "none",
484 }
485
486 type balanceResult struct {
487         blk        *BlockState
488         blkid      arvados.SizedDigest
489         have       int
490         want       int
491         classState map[string]balancedBlockState
492 }
493
494 // balanceBlock compares current state to desired state for a single
495 // block, and makes the appropriate ChangeSet calls.
496 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
497         debugf("balanceBlock: %v %+v", blkid, blk)
498
499         type slot struct {
500                 mnt  *KeepMount // never nil
501                 repl *Replica   // replica already stored here (or nil)
502                 want bool       // we should pull/leave a replica here
503         }
504
505         // Build a list of all slots (one per mounted volume).
506         slots := make([]slot, 0, bal.mounts)
507         for _, srv := range bal.KeepServices {
508                 for _, mnt := range srv.mounts {
509                         var repl *Replica
510                         for r := range blk.Replicas {
511                                 if blk.Replicas[r].KeepMount == mnt {
512                                         repl = &blk.Replicas[r]
513                                 }
514                         }
515                         // Initial value of "want" is "have, and can't
516                         // delete". These untrashable replicas get
517                         // prioritized when sorting slots: otherwise,
518                         // non-optimal readonly copies would cause us
519                         // to overreplicate.
520                         slots = append(slots, slot{
521                                 mnt:  mnt,
522                                 repl: repl,
523                                 want: repl != nil && mnt.ReadOnly,
524                         })
525                 }
526         }
527
528         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
529         srvRendezvous := make(map[*KeepService]int, len(uuids))
530         for i, uuid := range uuids {
531                 srv := bal.KeepServices[uuid]
532                 srvRendezvous[srv] = i
533         }
534
535         // Below we set underreplicated=true if we find any storage
536         // class that's currently underreplicated -- in that case we
537         // won't want to trash any replicas.
538         underreplicated := false
539
540         classState := make(map[string]balancedBlockState, len(bal.classes))
541         unsafeToDelete := make(map[int64]bool, len(slots))
542         for _, class := range bal.classes {
543                 desired := blk.Desired[class]
544
545                 countedDev := map[string]bool{}
546                 have := 0
547                 for _, slot := range slots {
548                         if slot.repl != nil && bal.mountsByClass[class][slot.mnt] && !countedDev[slot.mnt.DeviceID] {
549                                 have += slot.mnt.Replication
550                                 if slot.mnt.DeviceID != "" {
551                                         countedDev[slot.mnt.DeviceID] = true
552                                 }
553                         }
554                 }
555                 classState[class] = balancedBlockState{
556                         desired: desired,
557                         surplus: have - desired,
558                 }
559
560                 if desired == 0 {
561                         continue
562                 }
563
564                 // Sort the slots by desirability.
565                 sort.Slice(slots, func(i, j int) bool {
566                         si, sj := slots[i], slots[j]
567                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
568                                 // Prefer a mount that satisfies the
569                                 // desired class.
570                                 return bal.mountsByClass[class][si.mnt]
571                         } else if si.want != sj.want {
572                                 // Prefer a mount that will have a
573                                 // replica no matter what we do here
574                                 // -- either because it already has an
575                                 // untrashable replica, or because we
576                                 // already need it to satisfy a
577                                 // different storage class.
578                                 return si.want
579                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
580                                 // Prefer a better rendezvous
581                                 // position.
582                                 return orderi < orderj
583                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
584                                 // Prefer a mount that already has a
585                                 // replica.
586                                 return repli
587                         } else {
588                                 // If pull/trash turns out to be
589                                 // needed, distribute the
590                                 // new/remaining replicas uniformly
591                                 // across qualifying mounts on a given
592                                 // server.
593                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
594                         }
595                 })
596
597                 // Servers/mounts/devices (with or without existing
598                 // replicas) that are part of the best achievable
599                 // layout for this storage class.
600                 wantSrv := map[*KeepService]bool{}
601                 wantMnt := map[*KeepMount]bool{}
602                 wantDev := map[string]bool{}
603                 // Positions (with existing replicas) that have been
604                 // protected (via unsafeToDelete) to ensure we don't
605                 // reduce replication below desired level when
606                 // trashing replicas that aren't optimal positions for
607                 // any storage class.
608                 protMnt := map[*KeepMount]bool{}
609                 // Replication planned so far (corresponds to wantMnt).
610                 replWant := 0
611                 // Protected replication (corresponds to protMnt).
612                 replProt := 0
613
614                 // trySlot tries using a slot to meet requirements,
615                 // and returns true if all requirements are met.
616                 trySlot := func(i int) bool {
617                         slot := slots[i]
618                         if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
619                                 // Already allocated a replica to this
620                                 // backend device, possibly on a
621                                 // different server.
622                                 return false
623                         }
624                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
625                                 unsafeToDelete[slot.repl.Mtime] = true
626                                 protMnt[slot.mnt] = true
627                                 replProt += slot.mnt.Replication
628                         }
629                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
630                                 slots[i].want = true
631                                 wantSrv[slot.mnt.KeepService] = true
632                                 wantMnt[slot.mnt] = true
633                                 if slot.mnt.DeviceID != "" {
634                                         wantDev[slot.mnt.DeviceID] = true
635                                 }
636                                 replWant += slot.mnt.Replication
637                         }
638                         return replProt >= desired && replWant >= desired
639                 }
640
641                 // First try to achieve desired replication without
642                 // using the same server twice.
643                 done := false
644                 for i := 0; i < len(slots) && !done; i++ {
645                         if !wantSrv[slots[i].mnt.KeepService] {
646                                 done = trySlot(i)
647                         }
648                 }
649
650                 // If that didn't suffice, do another pass without the
651                 // "distinct services" restriction. (Achieving the
652                 // desired volume replication on fewer than the
653                 // desired number of services is better than
654                 // underreplicating.)
655                 for i := 0; i < len(slots) && !done; i++ {
656                         done = trySlot(i)
657                 }
658
659                 if !underreplicated {
660                         safe := 0
661                         for _, slot := range slots {
662                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
663                                         continue
664                                 }
665                                 if safe += slot.mnt.Replication; safe >= desired {
666                                         break
667                                 }
668                         }
669                         underreplicated = safe < desired
670                 }
671
672                 // set the unachievable flag if there aren't enough
673                 // slots offering the relevant storage class. (This is
674                 // as easy as checking slots[desired] because we
675                 // already sorted the qualifying slots to the front.)
676                 if desired >= len(slots) || !bal.mountsByClass[class][slots[desired].mnt] {
677                         cs := classState[class]
678                         cs.unachievable = true
679                         classState[class] = cs
680                 }
681
682                 // Avoid deleting wanted replicas from devices that
683                 // are mounted on multiple servers -- even if they
684                 // haven't already been added to unsafeToDelete
685                 // because the servers report different Mtimes.
686                 for _, slot := range slots {
687                         if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
688                                 unsafeToDelete[slot.repl.Mtime] = true
689                         }
690                 }
691         }
692
693         // TODO: If multiple replicas are trashable, prefer the oldest
694         // replica that doesn't have a timestamp collision with
695         // others.
696
697         countedDev := map[string]bool{}
698         var have, want int
699         for _, slot := range slots {
700                 if countedDev[slot.mnt.DeviceID] {
701                         continue
702                 }
703                 if slot.want {
704                         want += slot.mnt.Replication
705                 }
706                 if slot.repl != nil {
707                         have += slot.mnt.Replication
708                 }
709                 if slot.mnt.DeviceID != "" {
710                         countedDev[slot.mnt.DeviceID] = true
711                 }
712         }
713
714         var changes []string
715         for _, slot := range slots {
716                 // TODO: request a Touch if Mtime is duplicated.
717                 var change int
718                 switch {
719                 case !underreplicated && !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime && !unsafeToDelete[slot.repl.Mtime]:
720                         slot.mnt.KeepService.AddTrash(Trash{
721                                 SizedDigest: blkid,
722                                 Mtime:       slot.repl.Mtime,
723                                 From:        slot.mnt,
724                         })
725                         change = changeTrash
726                 case len(blk.Replicas) == 0:
727                         change = changeNone
728                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
729                         slot.mnt.KeepService.AddPull(Pull{
730                                 SizedDigest: blkid,
731                                 From:        blk.Replicas[0].KeepMount.KeepService,
732                                 To:          slot.mnt,
733                         })
734                         change = changePull
735                 default:
736                         change = changeStay
737                 }
738                 if bal.Dumper != nil {
739                         var mtime int64
740                         if slot.repl != nil {
741                                 mtime = slot.repl.Mtime
742                         }
743                         srv := slot.mnt.KeepService
744                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
745                 }
746         }
747         if bal.Dumper != nil {
748                 bal.Dumper.Printf("%s have=%d want=%v %s", blkid, have, want, strings.Join(changes, " "))
749         }
750         return balanceResult{
751                 blk:        blk,
752                 blkid:      blkid,
753                 have:       have,
754                 want:       want,
755                 classState: classState,
756         }
757 }
758
759 type blocksNBytes struct {
760         replicas int
761         blocks   int
762         bytes    int64
763 }
764
765 func (bb blocksNBytes) String() string {
766         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
767 }
768
769 type balancerStats struct {
770         lost          blocksNBytes
771         overrep       blocksNBytes
772         unref         blocksNBytes
773         garbage       blocksNBytes
774         underrep      blocksNBytes
775         unachievable  blocksNBytes
776         justright     blocksNBytes
777         desired       blocksNBytes
778         current       blocksNBytes
779         pulls         int
780         trashes       int
781         replHistogram []int
782         classStats    map[string]replicationStats
783
784         // collectionBytes / collectionBlockBytes = deduplication ratio
785         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
786         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
787         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
788         collectionBlocks     int64 // number of blocks referenced by any collection
789 }
790
791 func (s *balancerStats) dedupByteRatio() float64 {
792         if s.collectionBlockBytes == 0 {
793                 return 0
794         }
795         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
796 }
797
798 func (s *balancerStats) dedupBlockRatio() float64 {
799         if s.collectionBlocks == 0 {
800                 return 0
801         }
802         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
803 }
804
805 type replicationStats struct {
806         desired      blocksNBytes
807         surplus      blocksNBytes
808         short        blocksNBytes
809         unachievable blocksNBytes
810 }
811
812 type balancedBlockState struct {
813         desired      int
814         surplus      int
815         unachievable bool
816 }
817
818 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
819         var s balancerStats
820         s.replHistogram = make([]int, 2)
821         s.classStats = make(map[string]replicationStats, len(bal.classes))
822         for result := range results {
823                 surplus := result.have - result.want
824                 bytes := result.blkid.Size()
825
826                 if rc := int64(result.blk.RefCount); rc > 0 {
827                         s.collectionBytes += rc * bytes
828                         s.collectionBlockBytes += bytes
829                         s.collectionBlockRefs += rc
830                         s.collectionBlocks++
831                 }
832
833                 for class, state := range result.classState {
834                         cs := s.classStats[class]
835                         if state.unachievable {
836                                 cs.unachievable.blocks++
837                                 cs.unachievable.bytes += bytes
838                         }
839                         if state.desired > 0 {
840                                 cs.desired.replicas += state.desired
841                                 cs.desired.blocks++
842                                 cs.desired.bytes += bytes * int64(state.desired)
843                         }
844                         if state.surplus > 0 {
845                                 cs.surplus.replicas += state.surplus
846                                 cs.surplus.blocks++
847                                 cs.surplus.bytes += bytes * int64(state.surplus)
848                         } else if state.surplus < 0 {
849                                 cs.short.replicas += -state.surplus
850                                 cs.short.blocks++
851                                 cs.short.bytes += bytes * int64(-state.surplus)
852                         }
853                         s.classStats[class] = cs
854                 }
855
856                 switch {
857                 case result.have == 0 && result.want > 0:
858                         s.lost.replicas -= surplus
859                         s.lost.blocks++
860                         s.lost.bytes += bytes * int64(-surplus)
861                 case surplus < 0:
862                         s.underrep.replicas -= surplus
863                         s.underrep.blocks++
864                         s.underrep.bytes += bytes * int64(-surplus)
865                 case surplus > 0 && result.want == 0:
866                         counter := &s.garbage
867                         for _, r := range result.blk.Replicas {
868                                 if r.Mtime >= bal.MinMtime {
869                                         counter = &s.unref
870                                         break
871                                 }
872                         }
873                         counter.replicas += surplus
874                         counter.blocks++
875                         counter.bytes += bytes * int64(surplus)
876                 case surplus > 0:
877                         s.overrep.replicas += surplus
878                         s.overrep.blocks++
879                         s.overrep.bytes += bytes * int64(result.have-result.want)
880                 default:
881                         s.justright.replicas += result.want
882                         s.justright.blocks++
883                         s.justright.bytes += bytes * int64(result.want)
884                 }
885
886                 if result.want > 0 {
887                         s.desired.replicas += result.want
888                         s.desired.blocks++
889                         s.desired.bytes += bytes * int64(result.want)
890                 }
891                 if result.have > 0 {
892                         s.current.replicas += result.have
893                         s.current.blocks++
894                         s.current.bytes += bytes * int64(result.have)
895                 }
896
897                 for len(s.replHistogram) <= result.have {
898                         s.replHistogram = append(s.replHistogram, 0)
899                 }
900                 s.replHistogram[result.have]++
901         }
902         for _, srv := range bal.KeepServices {
903                 s.pulls += len(srv.ChangeSet.Pulls)
904                 s.trashes += len(srv.ChangeSet.Trashes)
905         }
906         bal.stats = s
907         bal.Metrics.UpdateStats(s)
908 }
909
910 // PrintStatistics writes statistics about the computed changes to
911 // bal.Logger. It should not be called until ComputeChangeSets has
912 // finished.
913 func (bal *Balancer) PrintStatistics() {
914         bal.logf("===")
915         bal.logf("%s lost (0=have<want)", bal.stats.lost)
916         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
917         bal.logf("%s just right (have=want)", bal.stats.justright)
918         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
919         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
920         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
921         for _, class := range bal.classes {
922                 cs := bal.stats.classStats[class]
923                 bal.logf("===")
924                 bal.logf("storage class %q: %s desired", class, cs.desired)
925                 bal.logf("storage class %q: %s short", class, cs.short)
926                 bal.logf("storage class %q: %s surplus", class, cs.surplus)
927                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
928         }
929         bal.logf("===")
930         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
931         bal.logf("%s total usage", bal.stats.current)
932         bal.logf("===")
933         for _, srv := range bal.KeepServices {
934                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
935         }
936         bal.logf("===")
937         bal.printHistogram(60)
938         bal.logf("===")
939 }
940
941 func (bal *Balancer) printHistogram(hashColumns int) {
942         bal.logf("Replication level distribution (counting N replicas on a single server as N):")
943         maxCount := 0
944         for _, count := range bal.stats.replHistogram {
945                 if maxCount < count {
946                         maxCount = count
947                 }
948         }
949         hashes := strings.Repeat("#", hashColumns)
950         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
951         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
952         for repl, count := range bal.stats.replHistogram {
953                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
954                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
955         }
956 }
957
958 // CheckSanityLate checks for configuration and runtime errors after
959 // GetCurrentState() and ComputeChangeSets() have finished.
960 //
961 // If it returns an error, it is dangerous to run any Commit methods.
962 func (bal *Balancer) CheckSanityLate() error {
963         if bal.errors != nil {
964                 for _, err := range bal.errors {
965                         bal.logf("deferred error: %v", err)
966                 }
967                 return fmt.Errorf("cannot proceed safely after deferred errors")
968         }
969
970         if bal.collScanned == 0 {
971                 return fmt.Errorf("received zero collections")
972         }
973
974         anyDesired := false
975         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
976                 for _, desired := range blk.Desired {
977                         if desired > 0 {
978                                 anyDesired = true
979                                 break
980                         }
981                 }
982         })
983         if !anyDesired {
984                 return fmt.Errorf("zero blocks have desired replication>0")
985         }
986
987         if dr := bal.DefaultReplication; dr < 1 {
988                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
989         }
990
991         // TODO: no two services have identical indexes
992         // TODO: no collisions (same md5, different size)
993         return nil
994 }
995
996 // CommitPulls sends the computed lists of pull requests to the
997 // keepstore servers. This has the effect of increasing replication of
998 // existing blocks that are either underreplicated or poorly
999 // distributed according to rendezvous hashing.
1000 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
1001         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1002         return bal.commitAsync(c, "send pull list",
1003                 func(srv *KeepService) error {
1004                         return srv.CommitPulls(c)
1005                 })
1006 }
1007
1008 // CommitTrash sends the computed lists of trash requests to the
1009 // keepstore servers. This has the effect of deleting blocks that are
1010 // overreplicated or unreferenced.
1011 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
1012         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1013         return bal.commitAsync(c, "send trash list",
1014                 func(srv *KeepService) error {
1015                         return srv.CommitTrash(c)
1016                 })
1017 }
1018
1019 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1020         errs := make(chan error)
1021         for _, srv := range bal.KeepServices {
1022                 go func(srv *KeepService) {
1023                         var err error
1024                         defer func() { errs <- err }()
1025                         label := fmt.Sprintf("%s: %v", srv, label)
1026                         err = f(srv)
1027                         if err != nil {
1028                                 err = fmt.Errorf("%s: %v", label, err)
1029                         }
1030                 }(srv)
1031         }
1032         var lastErr error
1033         for range bal.KeepServices {
1034                 if err := <-errs; err != nil {
1035                         bal.logf("%v", err)
1036                         lastErr = err
1037                 }
1038         }
1039         close(errs)
1040         return lastErr
1041 }
1042
1043 func (bal *Balancer) logf(f string, args ...interface{}) {
1044         if bal.Logger != nil {
1045                 bal.Logger.Printf(f, args...)
1046         }
1047 }
1048
1049 func (bal *Balancer) time(name, help string) func() {
1050         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1051         t0 := time.Now()
1052         bal.Logger.Printf("%s: start", name)
1053         return func() {
1054                 dur := time.Since(t0)
1055                 observer.Observe(dur.Seconds())
1056                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1057         }
1058 }
1059
1060 // Rendezvous hash sort function. Less efficient than sorting on
1061 // precomputed rendezvous hashes, but also rarely used.
1062 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1063         a := md5.Sum([]byte(string(blkid[:32]) + i))
1064         b := md5.Sum([]byte(string(blkid[:32]) + j))
1065         return bytes.Compare(a[:], b[:]) < 0
1066 }