13427: Fix replication stats reporting for multiple-mounted devices.
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "fmt"
11         "log"
12         "math"
13         "os"
14         "runtime"
15         "sort"
16         "strings"
17         "sync"
18         "time"
19
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "git.curoverse.com/arvados.git/sdk/go/keepclient"
22 )
23
24 // CheckConfig returns an error if anything is wrong with the given
25 // config and runOptions.
26 func CheckConfig(config Config, runOptions RunOptions) error {
27         if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
28                 return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
29         }
30         if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
31                 return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
32         }
33         return nil
34 }
35
36 // Balancer compares the contents of keepstore servers with the
37 // collections stored in Arvados, and issues pull/trash requests
38 // needed to get (closer to) the optimal data layout.
39 //
40 // In the optimal data layout: every data block referenced by a
41 // collection is replicated at least as many times as desired by the
42 // collection; there are no unreferenced data blocks older than
43 // BlobSignatureTTL; and all N existing replicas of a given data block
44 // are in the N best positions in rendezvous probe order.
45 type Balancer struct {
46         *BlockStateMap
47         KeepServices       map[string]*KeepService
48         DefaultReplication int
49         Logger             *log.Logger
50         Dumper             *log.Logger
51         MinMtime           int64
52
53         classes       []string
54         mounts        int
55         mountsByClass map[string]map[*KeepMount]bool
56         collScanned   int
57         serviceRoots  map[string]string
58         errors        []error
59         stats         balancerStats
60         mutex         sync.Mutex
61 }
62
63 // Run performs a balance operation using the given config and
64 // runOptions, and returns RunOptions suitable for passing to a
65 // subsequent balance operation.
66 //
67 // Run should only be called once on a given Balancer object.
68 //
69 // Typical usage:
70 //
71 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
72 func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
73         nextRunOptions = runOptions
74
75         bal.Dumper = runOptions.Dumper
76         bal.Logger = runOptions.Logger
77         if bal.Logger == nil {
78                 bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
79         }
80
81         defer timeMe(bal.Logger, "Run")()
82
83         if len(config.KeepServiceList.Items) > 0 {
84                 err = bal.SetKeepServices(config.KeepServiceList)
85         } else {
86                 err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
87         }
88         if err != nil {
89                 return
90         }
91
92         for _, srv := range bal.KeepServices {
93                 err = srv.discoverMounts(&config.Client)
94                 if err != nil {
95                         return
96                 }
97         }
98         bal.dedupDevices()
99
100         if err = bal.CheckSanityEarly(&config.Client); err != nil {
101                 return
102         }
103         rs := bal.rendezvousState()
104         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
105                 if runOptions.SafeRendezvousState != "" {
106                         bal.logf("notice: KeepServices list has changed since last run")
107                 }
108                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
109                 if err = bal.ClearTrashLists(&config.Client); err != nil {
110                         return
111                 }
112                 // The current rendezvous state becomes "safe" (i.e.,
113                 // OK to compute changes for that state without
114                 // clearing existing trash lists) only now, after we
115                 // succeed in clearing existing trash lists.
116                 nextRunOptions.SafeRendezvousState = rs
117         }
118         if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
119                 return
120         }
121         bal.ComputeChangeSets()
122         bal.PrintStatistics()
123         if err = bal.CheckSanityLate(); err != nil {
124                 return
125         }
126         if runOptions.CommitPulls {
127                 err = bal.CommitPulls(&config.Client)
128                 if err != nil {
129                         // Skip trash if we can't pull. (Too cautious?)
130                         return
131                 }
132         }
133         if runOptions.CommitTrash {
134                 err = bal.CommitTrash(&config.Client)
135         }
136         return
137 }
138
139 // SetKeepServices sets the list of KeepServices to operate on.
140 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
141         bal.KeepServices = make(map[string]*KeepService)
142         for _, srv := range srvList.Items {
143                 bal.KeepServices[srv.UUID] = &KeepService{
144                         KeepService: srv,
145                         ChangeSet:   &ChangeSet{},
146                 }
147         }
148         return nil
149 }
150
151 // DiscoverKeepServices sets the list of KeepServices by calling the
152 // API to get a list of all services, and selecting the ones whose
153 // ServiceType is in okTypes.
154 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
155         bal.KeepServices = make(map[string]*KeepService)
156         ok := make(map[string]bool)
157         for _, t := range okTypes {
158                 ok[t] = true
159         }
160         return c.EachKeepService(func(srv arvados.KeepService) error {
161                 if ok[srv.ServiceType] {
162                         bal.KeepServices[srv.UUID] = &KeepService{
163                                 KeepService: srv,
164                                 ChangeSet:   &ChangeSet{},
165                         }
166                 } else {
167                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
168                 }
169                 return nil
170         })
171 }
172
173 func (bal *Balancer) dedupDevices() {
174         rwdev := map[string]*KeepService{}
175         for _, srv := range bal.KeepServices {
176                 for _, mnt := range srv.mounts {
177                         if !mnt.ReadOnly && mnt.DeviceID != "" {
178                                 rwdev[mnt.DeviceID] = srv
179                         }
180                 }
181         }
182         // Drop the readonly mounts whose device is mounted RW
183         // elsewhere.
184         for _, srv := range bal.KeepServices {
185                 var dedup []*KeepMount
186                 for _, mnt := range srv.mounts {
187                         if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
188                                 bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
189                         } else {
190                                 dedup = append(dedup, mnt)
191                         }
192                 }
193                 srv.mounts = dedup
194         }
195 }
196
197 // CheckSanityEarly checks for configuration and runtime errors that
198 // can be detected before GetCurrentState() and ComputeChangeSets()
199 // are called.
200 //
201 // If it returns an error, it is pointless to run GetCurrentState or
202 // ComputeChangeSets: after doing so, the statistics would be
203 // meaningless and it would be dangerous to run any Commit methods.
204 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
205         u, err := c.CurrentUser()
206         if err != nil {
207                 return fmt.Errorf("CurrentUser(): %v", err)
208         }
209         if !u.IsActive || !u.IsAdmin {
210                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
211         }
212         for _, srv := range bal.KeepServices {
213                 if srv.ServiceType == "proxy" {
214                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
215                 }
216         }
217         return nil
218 }
219
220 // rendezvousState returns a fingerprint (e.g., a sorted list of
221 // UUID+host+port) of the current set of keep services.
222 func (bal *Balancer) rendezvousState() string {
223         srvs := make([]string, 0, len(bal.KeepServices))
224         for _, srv := range bal.KeepServices {
225                 srvs = append(srvs, srv.String())
226         }
227         sort.Strings(srvs)
228         return strings.Join(srvs, "; ")
229 }
230
231 // ClearTrashLists sends an empty trash list to each keep
232 // service. Calling this before GetCurrentState avoids races.
233 //
234 // When a block appears in an index, we assume that replica will still
235 // exist after we delete other replicas on other servers. However,
236 // it's possible that a previous rebalancing operation made different
237 // decisions (e.g., servers were added/removed, and rendezvous order
238 // changed). In this case, the replica might already be on that
239 // server's trash list, and it might be deleted before we send a
240 // replacement trash list.
241 //
242 // We avoid this problem if we clear all trash lists before getting
243 // indexes. (We also assume there is only one rebalancing process
244 // running at a time.)
245 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
246         for _, srv := range bal.KeepServices {
247                 srv.ChangeSet = &ChangeSet{}
248         }
249         return bal.CommitTrash(c)
250 }
251
252 // GetCurrentState determines the current replication state, and the
253 // desired replication level, for every block that is either
254 // retrievable or referenced.
255 //
256 // It determines the current replication state by reading the block index
257 // from every known Keep service.
258 //
259 // It determines the desired replication level by retrieving all
260 // collection manifests in the database (API server).
261 //
262 // It encodes the resulting information in BlockStateMap.
263 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
264         defer timeMe(bal.Logger, "GetCurrentState")()
265         bal.BlockStateMap = NewBlockStateMap()
266
267         dd, err := c.DiscoveryDocument()
268         if err != nil {
269                 return err
270         }
271         bal.DefaultReplication = dd.DefaultCollectionReplication
272         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
273
274         errs := make(chan error, 2+len(bal.KeepServices))
275         wg := sync.WaitGroup{}
276
277         // Start one goroutine for each KeepService: retrieve the
278         // index, and add the returned blocks to BlockStateMap.
279         for _, srv := range bal.KeepServices {
280                 wg.Add(1)
281                 go func(srv *KeepService) {
282                         defer wg.Done()
283                         bal.logf("%s: retrieve indexes", srv)
284                         for _, mount := range srv.mounts {
285                                 bal.logf("%s: retrieve index", mount)
286                                 idx, err := srv.IndexMount(c, mount.UUID, "")
287                                 if err != nil {
288                                         errs <- fmt.Errorf("%s: retrieve index: %v", mount, err)
289                                         return
290                                 }
291                                 if len(errs) > 0 {
292                                         // Some other goroutine encountered an
293                                         // error -- any further effort here
294                                         // will be wasted.
295                                         return
296                                 }
297                                 bal.logf("%s: add %d replicas to map", mount, len(idx))
298                                 bal.BlockStateMap.AddReplicas(mount, idx)
299                                 bal.logf("%s: done", mount)
300                         }
301                         bal.logf("%s: done", srv)
302                 }(srv)
303         }
304
305         // collQ buffers incoming collections so we can start fetching
306         // the next page without waiting for the current page to
307         // finish processing.
308         collQ := make(chan arvados.Collection, bufs)
309
310         // Start a goroutine to process collections. (We could use a
311         // worker pool here, but even with a single worker we already
312         // process collections much faster than we can retrieve them.)
313         wg.Add(1)
314         go func() {
315                 defer wg.Done()
316                 for coll := range collQ {
317                         err := bal.addCollection(coll)
318                         if err != nil {
319                                 errs <- err
320                                 for range collQ {
321                                 }
322                                 return
323                         }
324                         bal.collScanned++
325                 }
326         }()
327
328         // Start a goroutine to retrieve all collections from the
329         // Arvados database and send them to collQ for processing.
330         wg.Add(1)
331         go func() {
332                 defer wg.Done()
333                 err = EachCollection(c, pageSize,
334                         func(coll arvados.Collection) error {
335                                 collQ <- coll
336                                 if len(errs) > 0 {
337                                         // some other GetCurrentState
338                                         // error happened: no point
339                                         // getting any more
340                                         // collections.
341                                         return fmt.Errorf("")
342                                 }
343                                 return nil
344                         }, func(done, total int) {
345                                 bal.logf("collections: %d/%d", done, total)
346                         })
347                 close(collQ)
348                 if err != nil {
349                         errs <- err
350                 }
351         }()
352
353         wg.Wait()
354         if len(errs) > 0 {
355                 return <-errs
356         }
357         return nil
358 }
359
360 func (bal *Balancer) addCollection(coll arvados.Collection) error {
361         blkids, err := coll.SizedDigests()
362         if err != nil {
363                 bal.mutex.Lock()
364                 bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
365                 bal.mutex.Unlock()
366                 return nil
367         }
368         repl := bal.DefaultReplication
369         if coll.ReplicationDesired != nil {
370                 repl = *coll.ReplicationDesired
371         }
372         debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
373         bal.BlockStateMap.IncreaseDesired(coll.StorageClassesDesired, repl, blkids)
374         return nil
375 }
376
377 // ComputeChangeSets compares, for each known block, the current and
378 // desired replication states. If it is possible to get closer to the
379 // desired state by copying or deleting blocks, it adds those changes
380 // to the relevant KeepServices' ChangeSets.
381 //
382 // It does not actually apply any of the computed changes.
383 func (bal *Balancer) ComputeChangeSets() {
384         // This just calls balanceBlock() once for each block, using a
385         // pool of worker goroutines.
386         defer timeMe(bal.Logger, "ComputeChangeSets")()
387         bal.setupLookupTables()
388
389         type balanceTask struct {
390                 blkid arvados.SizedDigest
391                 blk   *BlockState
392         }
393         workers := runtime.GOMAXPROCS(-1)
394         todo := make(chan balanceTask, workers)
395         go func() {
396                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
397                         todo <- balanceTask{
398                                 blkid: blkid,
399                                 blk:   blk,
400                         }
401                 })
402                 close(todo)
403         }()
404         results := make(chan balanceResult, workers)
405         go func() {
406                 var wg sync.WaitGroup
407                 for i := 0; i < workers; i++ {
408                         wg.Add(1)
409                         go func() {
410                                 for work := range todo {
411                                         results <- bal.balanceBlock(work.blkid, work.blk)
412                                 }
413                                 wg.Done()
414                         }()
415                 }
416                 wg.Wait()
417                 close(results)
418         }()
419         bal.collectStatistics(results)
420 }
421
422 func (bal *Balancer) setupLookupTables() {
423         bal.serviceRoots = make(map[string]string)
424         bal.classes = []string{"default"}
425         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
426         bal.mounts = 0
427         for _, srv := range bal.KeepServices {
428                 bal.serviceRoots[srv.UUID] = srv.UUID
429                 for _, mnt := range srv.mounts {
430                         bal.mounts++
431
432                         // All mounts on a read-only service are
433                         // effectively read-only.
434                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
435
436                         if len(mnt.StorageClasses) == 0 {
437                                 bal.mountsByClass["default"][mnt] = true
438                                 continue
439                         }
440                         for _, class := range mnt.StorageClasses {
441                                 if mbc := bal.mountsByClass[class]; mbc == nil {
442                                         bal.classes = append(bal.classes, class)
443                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
444                                 } else {
445                                         mbc[mnt] = true
446                                 }
447                         }
448                 }
449         }
450         // Consider classes in lexicographic order to avoid flapping
451         // between balancing runs.  The outcome of the "prefer a mount
452         // we're already planning to use for a different storage
453         // class" case in balanceBlock depends on the order classes
454         // are considered.
455         sort.Strings(bal.classes)
456 }
457
458 const (
459         changeStay = iota
460         changePull
461         changeTrash
462         changeNone
463 )
464
465 var changeName = map[int]string{
466         changeStay:  "stay",
467         changePull:  "pull",
468         changeTrash: "trash",
469         changeNone:  "none",
470 }
471
472 type balanceResult struct {
473         blk        *BlockState
474         blkid      arvados.SizedDigest
475         have       int
476         want       int
477         classState map[string]balancedBlockState
478 }
479
480 // balanceBlock compares current state to desired state for a single
481 // block, and makes the appropriate ChangeSet calls.
482 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
483         debugf("balanceBlock: %v %+v", blkid, blk)
484
485         type slot struct {
486                 mnt  *KeepMount // never nil
487                 repl *Replica   // replica already stored here (or nil)
488                 want bool       // we should pull/leave a replica here
489         }
490
491         // Build a list of all slots (one per mounted volume).
492         slots := make([]slot, 0, bal.mounts)
493         for _, srv := range bal.KeepServices {
494                 for _, mnt := range srv.mounts {
495                         var repl *Replica
496                         for r := range blk.Replicas {
497                                 if blk.Replicas[r].KeepMount == mnt {
498                                         repl = &blk.Replicas[r]
499                                 }
500                         }
501                         // Initial value of "want" is "have, and can't
502                         // delete". These untrashable replicas get
503                         // prioritized when sorting slots: otherwise,
504                         // non-optimal readonly copies would cause us
505                         // to overreplicate.
506                         slots = append(slots, slot{
507                                 mnt:  mnt,
508                                 repl: repl,
509                                 want: repl != nil && (mnt.ReadOnly || repl.Mtime >= bal.MinMtime),
510                         })
511                 }
512         }
513
514         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
515         srvRendezvous := make(map[*KeepService]int, len(uuids))
516         for i, uuid := range uuids {
517                 srv := bal.KeepServices[uuid]
518                 srvRendezvous[srv] = i
519         }
520
521         // Below we set underreplicated=true if we find any storage
522         // class that's currently underreplicated -- in that case we
523         // won't want to trash any replicas.
524         underreplicated := false
525
526         classState := make(map[string]balancedBlockState, len(bal.classes))
527         unsafeToDelete := make(map[int64]bool, len(slots))
528         for _, class := range bal.classes {
529                 desired := blk.Desired[class]
530
531                 countedDev := map[string]bool{}
532                 have := 0
533                 for _, slot := range slots {
534                         if slot.repl != nil && bal.mountsByClass[class][slot.mnt] && !countedDev[slot.mnt.DeviceID] {
535                                 have++
536                                 if slot.mnt.DeviceID != "" {
537                                         countedDev[slot.mnt.DeviceID] = true
538                                 }
539                         }
540                 }
541                 classState[class] = balancedBlockState{
542                         desired: desired,
543                         surplus: have - desired,
544                 }
545
546                 if desired == 0 {
547                         continue
548                 }
549
550                 // Sort the slots by desirability.
551                 sort.Slice(slots, func(i, j int) bool {
552                         si, sj := slots[i], slots[j]
553                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
554                                 // Prefer a mount that satisfies the
555                                 // desired class.
556                                 return bal.mountsByClass[class][si.mnt]
557                         } else if wanti, wantj := si.want, si.want; wanti != wantj {
558                                 // Prefer a mount that will have a
559                                 // replica no matter what we do here
560                                 // -- either because it already has an
561                                 // untrashable replica, or because we
562                                 // already need it to satisfy a
563                                 // different storage class.
564                                 return slots[i].want
565                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
566                                 // Prefer a better rendezvous
567                                 // position.
568                                 return orderi < orderj
569                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
570                                 // Prefer a mount that already has a
571                                 // replica.
572                                 return repli
573                         } else {
574                                 // If pull/trash turns out to be
575                                 // needed, distribute the
576                                 // new/remaining replicas uniformly
577                                 // across qualifying mounts on a given
578                                 // server.
579                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
580                         }
581                 })
582
583                 // Servers/mounts/devices (with or without existing
584                 // replicas) that are part of the best achievable
585                 // layout for this storage class.
586                 wantSrv := map[*KeepService]bool{}
587                 wantMnt := map[*KeepMount]bool{}
588                 wantDev := map[string]bool{}
589                 // Positions (with existing replicas) that have been
590                 // protected (via unsafeToDelete) to ensure we don't
591                 // reduce replication below desired level when
592                 // trashing replicas that aren't optimal positions for
593                 // any storage class.
594                 protMnt := map[*KeepMount]bool{}
595
596                 // trySlot tries using a slot to meet requirements,
597                 // and returns true if all requirements are met.
598                 trySlot := func(i int) bool {
599                         slot := slots[i]
600                         if wantDev[slot.mnt.DeviceID] {
601                                 // Already allocated a replica to this
602                                 // backend device, possibly on a
603                                 // different server.
604                                 return false
605                         }
606                         if len(protMnt) < desired && slot.repl != nil {
607                                 unsafeToDelete[slot.repl.Mtime] = true
608                                 protMnt[slot.mnt] = true
609                         }
610                         if len(wantMnt) < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
611                                 slots[i].want = true
612                                 wantSrv[slot.mnt.KeepService] = true
613                                 wantMnt[slot.mnt] = true
614                                 if slot.mnt.DeviceID != "" {
615                                         wantDev[slot.mnt.DeviceID] = true
616                                 }
617                         }
618                         return len(protMnt) >= desired && len(wantMnt) >= desired
619                 }
620
621                 // First try to achieve desired replication without
622                 // using the same server twice.
623                 done := false
624                 for i := 0; i < len(slots) && !done; i++ {
625                         if !wantSrv[slots[i].mnt.KeepService] {
626                                 done = trySlot(i)
627                         }
628                 }
629
630                 // If that didn't suffice, do another pass without the
631                 // "distinct services" restriction. (Achieving the
632                 // desired volume replication on fewer than the
633                 // desired number of services is better than
634                 // underreplicating.)
635                 for i := 0; i < len(slots) && !done; i++ {
636                         done = trySlot(i)
637                 }
638
639                 if !underreplicated {
640                         safe := 0
641                         for _, slot := range slots {
642                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
643                                         continue
644                                 }
645                                 if safe++; safe >= desired {
646                                         break
647                                 }
648                         }
649                         underreplicated = safe < desired
650                 }
651
652                 // set the unachievable flag if there aren't enough
653                 // slots offering the relevant storage class. (This is
654                 // as easy as checking slots[desired] because we
655                 // already sorted the qualifying slots to the front.)
656                 if desired >= len(slots) || !bal.mountsByClass[class][slots[desired].mnt] {
657                         cs := classState[class]
658                         cs.unachievable = true
659                         classState[class] = cs
660                 }
661
662                 // Avoid deleting wanted replicas from devices that
663                 // are mounted on multiple servers -- even if they
664                 // haven't already been added to unsafeToDelete
665                 // because the servers report different Mtimes.
666                 for _, slot := range slots {
667                         if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
668                                 unsafeToDelete[slot.repl.Mtime] = true
669                         }
670                 }
671         }
672
673         // TODO: If multiple replicas are trashable, prefer the oldest
674         // replica that doesn't have a timestamp collision with
675         // others.
676
677         countedDev := map[string]bool{}
678         var have, want int
679         for _, slot := range slots {
680                 if slot.want {
681                         want++
682                 }
683                 if slot.repl != nil && !countedDev[slot.mnt.DeviceID] {
684                         have++
685                         if slot.mnt.DeviceID != "" {
686                                 countedDev[slot.mnt.DeviceID] = true
687                         }
688                 }
689         }
690
691         var changes []string
692         for _, slot := range slots {
693                 // TODO: request a Touch if Mtime is duplicated.
694                 var change int
695                 switch {
696                 case !underreplicated && slot.repl != nil && !slot.want && !unsafeToDelete[slot.repl.Mtime]:
697                         slot.mnt.KeepService.AddTrash(Trash{
698                                 SizedDigest: blkid,
699                                 Mtime:       slot.repl.Mtime,
700                                 From:        slot.mnt,
701                         })
702                         change = changeTrash
703                 case len(blk.Replicas) == 0:
704                         change = changeNone
705                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
706                         slot.mnt.KeepService.AddPull(Pull{
707                                 SizedDigest: blkid,
708                                 From:        blk.Replicas[0].KeepMount.KeepService,
709                                 To:          slot.mnt,
710                         })
711                         change = changePull
712                 default:
713                         change = changeStay
714                 }
715                 if bal.Dumper != nil {
716                         var mtime int64
717                         if slot.repl != nil {
718                                 mtime = slot.repl.Mtime
719                         }
720                         srv := slot.mnt.KeepService
721                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
722                 }
723         }
724         if bal.Dumper != nil {
725                 bal.Dumper.Printf("%s have=%d want=%v %s", blkid, have, want, strings.Join(changes, " "))
726         }
727         return balanceResult{
728                 blk:        blk,
729                 blkid:      blkid,
730                 have:       have,
731                 want:       want,
732                 classState: classState,
733         }
734 }
735
736 type blocksNBytes struct {
737         replicas int
738         blocks   int
739         bytes    int64
740 }
741
742 func (bb blocksNBytes) String() string {
743         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
744 }
745
746 type balancerStats struct {
747         lost          blocksNBytes
748         overrep       blocksNBytes
749         unref         blocksNBytes
750         garbage       blocksNBytes
751         underrep      blocksNBytes
752         unachievable  blocksNBytes
753         justright     blocksNBytes
754         desired       blocksNBytes
755         current       blocksNBytes
756         pulls         int
757         trashes       int
758         replHistogram []int
759         classStats    map[string]replicationStats
760 }
761
762 type replicationStats struct {
763         desired      blocksNBytes
764         surplus      blocksNBytes
765         short        blocksNBytes
766         unachievable blocksNBytes
767 }
768
769 type balancedBlockState struct {
770         desired      int
771         surplus      int
772         unachievable bool
773 }
774
775 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
776         var s balancerStats
777         s.replHistogram = make([]int, 2)
778         s.classStats = make(map[string]replicationStats, len(bal.classes))
779         for result := range results {
780                 surplus := result.have - result.want
781                 bytes := result.blkid.Size()
782
783                 for class, state := range result.classState {
784                         cs := s.classStats[class]
785                         if state.unachievable {
786                                 cs.unachievable.blocks++
787                                 cs.unachievable.bytes += bytes
788                         }
789                         if state.desired > 0 {
790                                 cs.desired.replicas += state.desired
791                                 cs.desired.blocks++
792                                 cs.desired.bytes += bytes * int64(state.desired)
793                         }
794                         if state.surplus > 0 {
795                                 cs.surplus.replicas += state.surplus
796                                 cs.surplus.blocks++
797                                 cs.surplus.bytes += bytes * int64(state.surplus)
798                         } else if state.surplus < 0 {
799                                 cs.short.replicas += -state.surplus
800                                 cs.short.blocks++
801                                 cs.short.bytes += bytes * int64(-state.surplus)
802                         }
803                         s.classStats[class] = cs
804                 }
805
806                 switch {
807                 case result.have == 0 && result.want > 0:
808                         s.lost.replicas -= surplus
809                         s.lost.blocks++
810                         s.lost.bytes += bytes * int64(-surplus)
811                 case surplus < 0:
812                         s.underrep.replicas -= surplus
813                         s.underrep.blocks++
814                         s.underrep.bytes += bytes * int64(-surplus)
815                 case surplus > 0 && result.want == 0:
816                         counter := &s.garbage
817                         for _, r := range result.blk.Replicas {
818                                 if r.Mtime >= bal.MinMtime {
819                                         counter = &s.unref
820                                         break
821                                 }
822                         }
823                         counter.replicas += surplus
824                         counter.blocks++
825                         counter.bytes += bytes * int64(surplus)
826                 case surplus > 0:
827                         s.overrep.replicas += surplus
828                         s.overrep.blocks++
829                         s.overrep.bytes += bytes * int64(len(result.blk.Replicas)-result.want)
830                 default:
831                         s.justright.replicas += result.want
832                         s.justright.blocks++
833                         s.justright.bytes += bytes * int64(result.want)
834                 }
835
836                 if result.want > 0 {
837                         s.desired.replicas += result.want
838                         s.desired.blocks++
839                         s.desired.bytes += bytes * int64(result.want)
840                 }
841                 if len(result.blk.Replicas) > 0 {
842                         s.current.replicas += len(result.blk.Replicas)
843                         s.current.blocks++
844                         s.current.bytes += bytes * int64(len(result.blk.Replicas))
845                 }
846
847                 for len(s.replHistogram) <= len(result.blk.Replicas) {
848                         s.replHistogram = append(s.replHistogram, 0)
849                 }
850                 s.replHistogram[len(result.blk.Replicas)]++
851         }
852         for _, srv := range bal.KeepServices {
853                 s.pulls += len(srv.ChangeSet.Pulls)
854                 s.trashes += len(srv.ChangeSet.Trashes)
855         }
856         bal.stats = s
857 }
858
859 // PrintStatistics writes statistics about the computed changes to
860 // bal.Logger. It should not be called until ComputeChangeSets has
861 // finished.
862 func (bal *Balancer) PrintStatistics() {
863         bal.logf("===")
864         bal.logf("%s lost (0=have<want)", bal.stats.lost)
865         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
866         bal.logf("%s just right (have=want)", bal.stats.justright)
867         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
868         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
869         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
870         for _, class := range bal.classes {
871                 cs := bal.stats.classStats[class]
872                 bal.logf("===")
873                 bal.logf("storage class %q: %s desired", class, cs.desired)
874                 bal.logf("storage class %q: %s short", class, cs.short)
875                 bal.logf("storage class %q: %s surplus", class, cs.surplus)
876                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
877         }
878         bal.logf("===")
879         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
880         bal.logf("%s total usage", bal.stats.current)
881         bal.logf("===")
882         for _, srv := range bal.KeepServices {
883                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
884         }
885         bal.logf("===")
886         bal.printHistogram(60)
887         bal.logf("===")
888 }
889
890 func (bal *Balancer) printHistogram(hashColumns int) {
891         bal.logf("Replication level distribution (counting N replicas on a single server as N):")
892         maxCount := 0
893         for _, count := range bal.stats.replHistogram {
894                 if maxCount < count {
895                         maxCount = count
896                 }
897         }
898         hashes := strings.Repeat("#", hashColumns)
899         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
900         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
901         for repl, count := range bal.stats.replHistogram {
902                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
903                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
904         }
905 }
906
907 // CheckSanityLate checks for configuration and runtime errors after
908 // GetCurrentState() and ComputeChangeSets() have finished.
909 //
910 // If it returns an error, it is dangerous to run any Commit methods.
911 func (bal *Balancer) CheckSanityLate() error {
912         if bal.errors != nil {
913                 for _, err := range bal.errors {
914                         bal.logf("deferred error: %v", err)
915                 }
916                 return fmt.Errorf("cannot proceed safely after deferred errors")
917         }
918
919         if bal.collScanned == 0 {
920                 return fmt.Errorf("received zero collections")
921         }
922
923         anyDesired := false
924         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
925                 for _, desired := range blk.Desired {
926                         if desired > 0 {
927                                 anyDesired = true
928                                 break
929                         }
930                 }
931         })
932         if !anyDesired {
933                 return fmt.Errorf("zero blocks have desired replication>0")
934         }
935
936         if dr := bal.DefaultReplication; dr < 1 {
937                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
938         }
939
940         // TODO: no two services have identical indexes
941         // TODO: no collisions (same md5, different size)
942         return nil
943 }
944
945 // CommitPulls sends the computed lists of pull requests to the
946 // keepstore servers. This has the effect of increasing replication of
947 // existing blocks that are either underreplicated or poorly
948 // distributed according to rendezvous hashing.
949 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
950         return bal.commitAsync(c, "send pull list",
951                 func(srv *KeepService) error {
952                         return srv.CommitPulls(c)
953                 })
954 }
955
956 // CommitTrash sends the computed lists of trash requests to the
957 // keepstore servers. This has the effect of deleting blocks that are
958 // overreplicated or unreferenced.
959 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
960         return bal.commitAsync(c, "send trash list",
961                 func(srv *KeepService) error {
962                         return srv.CommitTrash(c)
963                 })
964 }
965
966 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
967         errs := make(chan error)
968         for _, srv := range bal.KeepServices {
969                 go func(srv *KeepService) {
970                         var err error
971                         defer func() { errs <- err }()
972                         label := fmt.Sprintf("%s: %v", srv, label)
973                         defer timeMe(bal.Logger, label)()
974                         err = f(srv)
975                         if err != nil {
976                                 err = fmt.Errorf("%s: %v", label, err)
977                         }
978                 }(srv)
979         }
980         var lastErr error
981         for range bal.KeepServices {
982                 if err := <-errs; err != nil {
983                         bal.logf("%v", err)
984                         lastErr = err
985                 }
986         }
987         close(errs)
988         return lastErr
989 }
990
991 func (bal *Balancer) logf(f string, args ...interface{}) {
992         if bal.Logger != nil {
993                 bal.Logger.Printf(f, args...)
994         }
995 }
996
997 // Rendezvous hash sort function. Less efficient than sorting on
998 // precomputed rendezvous hashes, but also rarely used.
999 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1000         a := md5.Sum([]byte(string(blkid[:32]) + i))
1001         b := md5.Sum([]byte(string(blkid[:32]) + j))
1002         return bytes.Compare(a[:], b[:]) < 0
1003 }