Merge branch 'master' of git.curoverse.com:arvados into 13429-cwl-runner-storage...
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "fmt"
11         "log"
12         "math"
13         "os"
14         "runtime"
15         "sort"
16         "strings"
17         "sync"
18         "time"
19
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "git.curoverse.com/arvados.git/sdk/go/keepclient"
22 )
23
24 // CheckConfig returns an error if anything is wrong with the given
25 // config and runOptions.
26 func CheckConfig(config Config, runOptions RunOptions) error {
27         if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
28                 return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
29         }
30         if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
31                 return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
32         }
33         return nil
34 }
35
36 // Balancer compares the contents of keepstore servers with the
37 // collections stored in Arvados, and issues pull/trash requests
38 // needed to get (closer to) the optimal data layout.
39 //
40 // In the optimal data layout: every data block referenced by a
41 // collection is replicated at least as many times as desired by the
42 // collection; there are no unreferenced data blocks older than
43 // BlobSignatureTTL; and all N existing replicas of a given data block
44 // are in the N best positions in rendezvous probe order.
45 type Balancer struct {
46         *BlockStateMap
47         KeepServices       map[string]*KeepService
48         DefaultReplication int
49         Logger             *log.Logger
50         Dumper             *log.Logger
51         MinMtime           int64
52
53         classes       []string
54         mounts        int
55         mountsByClass map[string]map[*KeepMount]bool
56         collScanned   int
57         serviceRoots  map[string]string
58         errors        []error
59         stats         balancerStats
60         mutex         sync.Mutex
61 }
62
63 // Run performs a balance operation using the given config and
64 // runOptions, and returns RunOptions suitable for passing to a
65 // subsequent balance operation.
66 //
67 // Run should only be called once on a given Balancer object.
68 //
69 // Typical usage:
70 //
71 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
72 func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
73         nextRunOptions = runOptions
74
75         bal.Dumper = runOptions.Dumper
76         bal.Logger = runOptions.Logger
77         if bal.Logger == nil {
78                 bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
79         }
80
81         defer timeMe(bal.Logger, "Run")()
82
83         if len(config.KeepServiceList.Items) > 0 {
84                 err = bal.SetKeepServices(config.KeepServiceList)
85         } else {
86                 err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
87         }
88         if err != nil {
89                 return
90         }
91
92         for _, srv := range bal.KeepServices {
93                 err = srv.discoverMounts(&config.Client)
94                 if err != nil {
95                         return
96                 }
97         }
98         bal.dedupDevices()
99
100         if err = bal.CheckSanityEarly(&config.Client); err != nil {
101                 return
102         }
103         rs := bal.rendezvousState()
104         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
105                 if runOptions.SafeRendezvousState != "" {
106                         bal.logf("notice: KeepServices list has changed since last run")
107                 }
108                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
109                 if err = bal.ClearTrashLists(&config.Client); err != nil {
110                         return
111                 }
112                 // The current rendezvous state becomes "safe" (i.e.,
113                 // OK to compute changes for that state without
114                 // clearing existing trash lists) only now, after we
115                 // succeed in clearing existing trash lists.
116                 nextRunOptions.SafeRendezvousState = rs
117         }
118         if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
119                 return
120         }
121         bal.ComputeChangeSets()
122         bal.PrintStatistics()
123         if err = bal.CheckSanityLate(); err != nil {
124                 return
125         }
126         if runOptions.CommitPulls {
127                 err = bal.CommitPulls(&config.Client)
128                 if err != nil {
129                         // Skip trash if we can't pull. (Too cautious?)
130                         return
131                 }
132         }
133         if runOptions.CommitTrash {
134                 err = bal.CommitTrash(&config.Client)
135         }
136         return
137 }
138
139 // SetKeepServices sets the list of KeepServices to operate on.
140 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
141         bal.KeepServices = make(map[string]*KeepService)
142         for _, srv := range srvList.Items {
143                 bal.KeepServices[srv.UUID] = &KeepService{
144                         KeepService: srv,
145                         ChangeSet:   &ChangeSet{},
146                 }
147         }
148         return nil
149 }
150
151 // DiscoverKeepServices sets the list of KeepServices by calling the
152 // API to get a list of all services, and selecting the ones whose
153 // ServiceType is in okTypes.
154 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
155         bal.KeepServices = make(map[string]*KeepService)
156         ok := make(map[string]bool)
157         for _, t := range okTypes {
158                 ok[t] = true
159         }
160         return c.EachKeepService(func(srv arvados.KeepService) error {
161                 if ok[srv.ServiceType] {
162                         bal.KeepServices[srv.UUID] = &KeepService{
163                                 KeepService: srv,
164                                 ChangeSet:   &ChangeSet{},
165                         }
166                 } else {
167                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
168                 }
169                 return nil
170         })
171 }
172
173 func (bal *Balancer) dedupDevices() {
174         rwdev := map[string]*KeepService{}
175         for _, srv := range bal.KeepServices {
176                 for _, mnt := range srv.mounts {
177                         if !mnt.ReadOnly && mnt.DeviceID != "" {
178                                 rwdev[mnt.DeviceID] = srv
179                         }
180                 }
181         }
182         // Drop the readonly mounts whose device is mounted RW
183         // elsewhere.
184         for _, srv := range bal.KeepServices {
185                 var dedup []*KeepMount
186                 for _, mnt := range srv.mounts {
187                         if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
188                                 bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
189                         } else {
190                                 dedup = append(dedup, mnt)
191                         }
192                 }
193                 srv.mounts = dedup
194         }
195 }
196
197 // CheckSanityEarly checks for configuration and runtime errors that
198 // can be detected before GetCurrentState() and ComputeChangeSets()
199 // are called.
200 //
201 // If it returns an error, it is pointless to run GetCurrentState or
202 // ComputeChangeSets: after doing so, the statistics would be
203 // meaningless and it would be dangerous to run any Commit methods.
204 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
205         u, err := c.CurrentUser()
206         if err != nil {
207                 return fmt.Errorf("CurrentUser(): %v", err)
208         }
209         if !u.IsActive || !u.IsAdmin {
210                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
211         }
212         for _, srv := range bal.KeepServices {
213                 if srv.ServiceType == "proxy" {
214                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
215                 }
216         }
217         return nil
218 }
219
220 // rendezvousState returns a fingerprint (e.g., a sorted list of
221 // UUID+host+port) of the current set of keep services.
222 func (bal *Balancer) rendezvousState() string {
223         srvs := make([]string, 0, len(bal.KeepServices))
224         for _, srv := range bal.KeepServices {
225                 srvs = append(srvs, srv.String())
226         }
227         sort.Strings(srvs)
228         return strings.Join(srvs, "; ")
229 }
230
231 // ClearTrashLists sends an empty trash list to each keep
232 // service. Calling this before GetCurrentState avoids races.
233 //
234 // When a block appears in an index, we assume that replica will still
235 // exist after we delete other replicas on other servers. However,
236 // it's possible that a previous rebalancing operation made different
237 // decisions (e.g., servers were added/removed, and rendezvous order
238 // changed). In this case, the replica might already be on that
239 // server's trash list, and it might be deleted before we send a
240 // replacement trash list.
241 //
242 // We avoid this problem if we clear all trash lists before getting
243 // indexes. (We also assume there is only one rebalancing process
244 // running at a time.)
245 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
246         for _, srv := range bal.KeepServices {
247                 srv.ChangeSet = &ChangeSet{}
248         }
249         return bal.CommitTrash(c)
250 }
251
252 // GetCurrentState determines the current replication state, and the
253 // desired replication level, for every block that is either
254 // retrievable or referenced.
255 //
256 // It determines the current replication state by reading the block index
257 // from every known Keep service.
258 //
259 // It determines the desired replication level by retrieving all
260 // collection manifests in the database (API server).
261 //
262 // It encodes the resulting information in BlockStateMap.
263 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
264         defer timeMe(bal.Logger, "GetCurrentState")()
265         bal.BlockStateMap = NewBlockStateMap()
266
267         dd, err := c.DiscoveryDocument()
268         if err != nil {
269                 return err
270         }
271         bal.DefaultReplication = dd.DefaultCollectionReplication
272         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
273
274         errs := make(chan error, 2+len(bal.KeepServices))
275         wg := sync.WaitGroup{}
276
277         // When a device is mounted more than once, we will get its
278         // index only once, and call AddReplicas on all of the mounts.
279         // equivMount keys are the mounts that will be indexed, and
280         // each value is a list of mounts to apply the received index
281         // to.
282         equivMount := map[*KeepMount][]*KeepMount{}
283         // deviceMount maps each device ID to the one mount that will
284         // be indexed for that device.
285         deviceMount := map[string]*KeepMount{}
286         for _, srv := range bal.KeepServices {
287                 for _, mnt := range srv.mounts {
288                         equiv := deviceMount[mnt.DeviceID]
289                         if equiv == nil {
290                                 equiv = mnt
291                                 if mnt.DeviceID != "" {
292                                         deviceMount[mnt.DeviceID] = equiv
293                                 }
294                         }
295                         equivMount[equiv] = append(equivMount[equiv], mnt)
296                 }
297         }
298
299         // Start one goroutine for each (non-redundant) mount:
300         // retrieve the index, and add the returned blocks to
301         // BlockStateMap.
302         for _, mounts := range equivMount {
303                 wg.Add(1)
304                 go func(mounts []*KeepMount) {
305                         defer wg.Done()
306                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
307                         idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
308                         if err != nil {
309                                 errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err)
310                                 return
311                         }
312                         if len(errs) > 0 {
313                                 // Some other goroutine encountered an
314                                 // error -- any further effort here
315                                 // will be wasted.
316                                 return
317                         }
318                         for _, mount := range mounts {
319                                 bal.logf("%s: add %d replicas to map", mount, len(idx))
320                                 bal.BlockStateMap.AddReplicas(mount, idx)
321                                 bal.logf("%s: added %d replicas", mount, len(idx))
322                         }
323                         bal.logf("mount %s: index done", mounts[0])
324                 }(mounts)
325         }
326
327         // collQ buffers incoming collections so we can start fetching
328         // the next page without waiting for the current page to
329         // finish processing.
330         collQ := make(chan arvados.Collection, bufs)
331
332         // Start a goroutine to process collections. (We could use a
333         // worker pool here, but even with a single worker we already
334         // process collections much faster than we can retrieve them.)
335         wg.Add(1)
336         go func() {
337                 defer wg.Done()
338                 for coll := range collQ {
339                         err := bal.addCollection(coll)
340                         if err != nil {
341                                 errs <- err
342                                 for range collQ {
343                                 }
344                                 return
345                         }
346                         bal.collScanned++
347                 }
348         }()
349
350         // Start a goroutine to retrieve all collections from the
351         // Arvados database and send them to collQ for processing.
352         wg.Add(1)
353         go func() {
354                 defer wg.Done()
355                 err = EachCollection(c, pageSize,
356                         func(coll arvados.Collection) error {
357                                 collQ <- coll
358                                 if len(errs) > 0 {
359                                         // some other GetCurrentState
360                                         // error happened: no point
361                                         // getting any more
362                                         // collections.
363                                         return fmt.Errorf("")
364                                 }
365                                 return nil
366                         }, func(done, total int) {
367                                 bal.logf("collections: %d/%d", done, total)
368                         })
369                 close(collQ)
370                 if err != nil {
371                         errs <- err
372                 }
373         }()
374
375         wg.Wait()
376         if len(errs) > 0 {
377                 return <-errs
378         }
379         return nil
380 }
381
382 func (bal *Balancer) addCollection(coll arvados.Collection) error {
383         blkids, err := coll.SizedDigests()
384         if err != nil {
385                 bal.mutex.Lock()
386                 bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
387                 bal.mutex.Unlock()
388                 return nil
389         }
390         repl := bal.DefaultReplication
391         if coll.ReplicationDesired != nil {
392                 repl = *coll.ReplicationDesired
393         }
394         debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
395         bal.BlockStateMap.IncreaseDesired(coll.StorageClassesDesired, repl, blkids)
396         return nil
397 }
398
399 // ComputeChangeSets compares, for each known block, the current and
400 // desired replication states. If it is possible to get closer to the
401 // desired state by copying or deleting blocks, it adds those changes
402 // to the relevant KeepServices' ChangeSets.
403 //
404 // It does not actually apply any of the computed changes.
405 func (bal *Balancer) ComputeChangeSets() {
406         // This just calls balanceBlock() once for each block, using a
407         // pool of worker goroutines.
408         defer timeMe(bal.Logger, "ComputeChangeSets")()
409         bal.setupLookupTables()
410
411         type balanceTask struct {
412                 blkid arvados.SizedDigest
413                 blk   *BlockState
414         }
415         workers := runtime.GOMAXPROCS(-1)
416         todo := make(chan balanceTask, workers)
417         go func() {
418                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
419                         todo <- balanceTask{
420                                 blkid: blkid,
421                                 blk:   blk,
422                         }
423                 })
424                 close(todo)
425         }()
426         results := make(chan balanceResult, workers)
427         go func() {
428                 var wg sync.WaitGroup
429                 for i := 0; i < workers; i++ {
430                         wg.Add(1)
431                         go func() {
432                                 for work := range todo {
433                                         results <- bal.balanceBlock(work.blkid, work.blk)
434                                 }
435                                 wg.Done()
436                         }()
437                 }
438                 wg.Wait()
439                 close(results)
440         }()
441         bal.collectStatistics(results)
442 }
443
444 func (bal *Balancer) setupLookupTables() {
445         bal.serviceRoots = make(map[string]string)
446         bal.classes = []string{"default"}
447         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
448         bal.mounts = 0
449         for _, srv := range bal.KeepServices {
450                 bal.serviceRoots[srv.UUID] = srv.UUID
451                 for _, mnt := range srv.mounts {
452                         bal.mounts++
453
454                         // All mounts on a read-only service are
455                         // effectively read-only.
456                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
457
458                         if len(mnt.StorageClasses) == 0 {
459                                 bal.mountsByClass["default"][mnt] = true
460                                 continue
461                         }
462                         for _, class := range mnt.StorageClasses {
463                                 if mbc := bal.mountsByClass[class]; mbc == nil {
464                                         bal.classes = append(bal.classes, class)
465                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
466                                 } else {
467                                         mbc[mnt] = true
468                                 }
469                         }
470                 }
471         }
472         // Consider classes in lexicographic order to avoid flapping
473         // between balancing runs.  The outcome of the "prefer a mount
474         // we're already planning to use for a different storage
475         // class" case in balanceBlock depends on the order classes
476         // are considered.
477         sort.Strings(bal.classes)
478 }
479
480 const (
481         changeStay = iota
482         changePull
483         changeTrash
484         changeNone
485 )
486
487 var changeName = map[int]string{
488         changeStay:  "stay",
489         changePull:  "pull",
490         changeTrash: "trash",
491         changeNone:  "none",
492 }
493
494 type balanceResult struct {
495         blk        *BlockState
496         blkid      arvados.SizedDigest
497         have       int
498         want       int
499         classState map[string]balancedBlockState
500 }
501
502 // balanceBlock compares current state to desired state for a single
503 // block, and makes the appropriate ChangeSet calls.
504 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
505         debugf("balanceBlock: %v %+v", blkid, blk)
506
507         type slot struct {
508                 mnt  *KeepMount // never nil
509                 repl *Replica   // replica already stored here (or nil)
510                 want bool       // we should pull/leave a replica here
511         }
512
513         // Build a list of all slots (one per mounted volume).
514         slots := make([]slot, 0, bal.mounts)
515         for _, srv := range bal.KeepServices {
516                 for _, mnt := range srv.mounts {
517                         var repl *Replica
518                         for r := range blk.Replicas {
519                                 if blk.Replicas[r].KeepMount == mnt {
520                                         repl = &blk.Replicas[r]
521                                 }
522                         }
523                         // Initial value of "want" is "have, and can't
524                         // delete". These untrashable replicas get
525                         // prioritized when sorting slots: otherwise,
526                         // non-optimal readonly copies would cause us
527                         // to overreplicate.
528                         slots = append(slots, slot{
529                                 mnt:  mnt,
530                                 repl: repl,
531                                 want: repl != nil && (mnt.ReadOnly || repl.Mtime >= bal.MinMtime),
532                         })
533                 }
534         }
535
536         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
537         srvRendezvous := make(map[*KeepService]int, len(uuids))
538         for i, uuid := range uuids {
539                 srv := bal.KeepServices[uuid]
540                 srvRendezvous[srv] = i
541         }
542
543         // Below we set underreplicated=true if we find any storage
544         // class that's currently underreplicated -- in that case we
545         // won't want to trash any replicas.
546         underreplicated := false
547
548         classState := make(map[string]balancedBlockState, len(bal.classes))
549         unsafeToDelete := make(map[int64]bool, len(slots))
550         for _, class := range bal.classes {
551                 desired := blk.Desired[class]
552
553                 countedDev := map[string]bool{}
554                 have := 0
555                 for _, slot := range slots {
556                         if slot.repl != nil && bal.mountsByClass[class][slot.mnt] && !countedDev[slot.mnt.DeviceID] {
557                                 have++
558                                 if slot.mnt.DeviceID != "" {
559                                         countedDev[slot.mnt.DeviceID] = true
560                                 }
561                         }
562                 }
563                 classState[class] = balancedBlockState{
564                         desired: desired,
565                         surplus: have - desired,
566                 }
567
568                 if desired == 0 {
569                         continue
570                 }
571
572                 // Sort the slots by desirability.
573                 sort.Slice(slots, func(i, j int) bool {
574                         si, sj := slots[i], slots[j]
575                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
576                                 // Prefer a mount that satisfies the
577                                 // desired class.
578                                 return bal.mountsByClass[class][si.mnt]
579                         } else if wanti, wantj := si.want, si.want; wanti != wantj {
580                                 // Prefer a mount that will have a
581                                 // replica no matter what we do here
582                                 // -- either because it already has an
583                                 // untrashable replica, or because we
584                                 // already need it to satisfy a
585                                 // different storage class.
586                                 return slots[i].want
587                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
588                                 // Prefer a better rendezvous
589                                 // position.
590                                 return orderi < orderj
591                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
592                                 // Prefer a mount that already has a
593                                 // replica.
594                                 return repli
595                         } else {
596                                 // If pull/trash turns out to be
597                                 // needed, distribute the
598                                 // new/remaining replicas uniformly
599                                 // across qualifying mounts on a given
600                                 // server.
601                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
602                         }
603                 })
604
605                 // Servers/mounts/devices (with or without existing
606                 // replicas) that are part of the best achievable
607                 // layout for this storage class.
608                 wantSrv := map[*KeepService]bool{}
609                 wantMnt := map[*KeepMount]bool{}
610                 wantDev := map[string]bool{}
611                 // Positions (with existing replicas) that have been
612                 // protected (via unsafeToDelete) to ensure we don't
613                 // reduce replication below desired level when
614                 // trashing replicas that aren't optimal positions for
615                 // any storage class.
616                 protMnt := map[*KeepMount]bool{}
617
618                 // trySlot tries using a slot to meet requirements,
619                 // and returns true if all requirements are met.
620                 trySlot := func(i int) bool {
621                         slot := slots[i]
622                         if wantDev[slot.mnt.DeviceID] {
623                                 // Already allocated a replica to this
624                                 // backend device, possibly on a
625                                 // different server.
626                                 return false
627                         }
628                         if len(protMnt) < desired && slot.repl != nil {
629                                 unsafeToDelete[slot.repl.Mtime] = true
630                                 protMnt[slot.mnt] = true
631                         }
632                         if len(wantMnt) < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
633                                 slots[i].want = true
634                                 wantSrv[slot.mnt.KeepService] = true
635                                 wantMnt[slot.mnt] = true
636                                 if slot.mnt.DeviceID != "" {
637                                         wantDev[slot.mnt.DeviceID] = true
638                                 }
639                         }
640                         return len(protMnt) >= desired && len(wantMnt) >= desired
641                 }
642
643                 // First try to achieve desired replication without
644                 // using the same server twice.
645                 done := false
646                 for i := 0; i < len(slots) && !done; i++ {
647                         if !wantSrv[slots[i].mnt.KeepService] {
648                                 done = trySlot(i)
649                         }
650                 }
651
652                 // If that didn't suffice, do another pass without the
653                 // "distinct services" restriction. (Achieving the
654                 // desired volume replication on fewer than the
655                 // desired number of services is better than
656                 // underreplicating.)
657                 for i := 0; i < len(slots) && !done; i++ {
658                         done = trySlot(i)
659                 }
660
661                 if !underreplicated {
662                         safe := 0
663                         for _, slot := range slots {
664                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
665                                         continue
666                                 }
667                                 if safe++; safe >= desired {
668                                         break
669                                 }
670                         }
671                         underreplicated = safe < desired
672                 }
673
674                 // set the unachievable flag if there aren't enough
675                 // slots offering the relevant storage class. (This is
676                 // as easy as checking slots[desired] because we
677                 // already sorted the qualifying slots to the front.)
678                 if desired >= len(slots) || !bal.mountsByClass[class][slots[desired].mnt] {
679                         cs := classState[class]
680                         cs.unachievable = true
681                         classState[class] = cs
682                 }
683
684                 // Avoid deleting wanted replicas from devices that
685                 // are mounted on multiple servers -- even if they
686                 // haven't already been added to unsafeToDelete
687                 // because the servers report different Mtimes.
688                 for _, slot := range slots {
689                         if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
690                                 unsafeToDelete[slot.repl.Mtime] = true
691                         }
692                 }
693         }
694
695         // TODO: If multiple replicas are trashable, prefer the oldest
696         // replica that doesn't have a timestamp collision with
697         // others.
698
699         countedDev := map[string]bool{}
700         var have, want int
701         for _, slot := range slots {
702                 if slot.want {
703                         want++
704                 }
705                 if slot.repl != nil && !countedDev[slot.mnt.DeviceID] {
706                         have++
707                         if slot.mnt.DeviceID != "" {
708                                 countedDev[slot.mnt.DeviceID] = true
709                         }
710                 }
711         }
712
713         var changes []string
714         for _, slot := range slots {
715                 // TODO: request a Touch if Mtime is duplicated.
716                 var change int
717                 switch {
718                 case !underreplicated && slot.repl != nil && !slot.want && !unsafeToDelete[slot.repl.Mtime]:
719                         slot.mnt.KeepService.AddTrash(Trash{
720                                 SizedDigest: blkid,
721                                 Mtime:       slot.repl.Mtime,
722                                 From:        slot.mnt,
723                         })
724                         change = changeTrash
725                 case len(blk.Replicas) == 0:
726                         change = changeNone
727                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
728                         slot.mnt.KeepService.AddPull(Pull{
729                                 SizedDigest: blkid,
730                                 From:        blk.Replicas[0].KeepMount.KeepService,
731                                 To:          slot.mnt,
732                         })
733                         change = changePull
734                 default:
735                         change = changeStay
736                 }
737                 if bal.Dumper != nil {
738                         var mtime int64
739                         if slot.repl != nil {
740                                 mtime = slot.repl.Mtime
741                         }
742                         srv := slot.mnt.KeepService
743                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
744                 }
745         }
746         if bal.Dumper != nil {
747                 bal.Dumper.Printf("%s have=%d want=%v %s", blkid, have, want, strings.Join(changes, " "))
748         }
749         return balanceResult{
750                 blk:        blk,
751                 blkid:      blkid,
752                 have:       have,
753                 want:       want,
754                 classState: classState,
755         }
756 }
757
758 type blocksNBytes struct {
759         replicas int
760         blocks   int
761         bytes    int64
762 }
763
764 func (bb blocksNBytes) String() string {
765         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
766 }
767
768 type balancerStats struct {
769         lost          blocksNBytes
770         overrep       blocksNBytes
771         unref         blocksNBytes
772         garbage       blocksNBytes
773         underrep      blocksNBytes
774         unachievable  blocksNBytes
775         justright     blocksNBytes
776         desired       blocksNBytes
777         current       blocksNBytes
778         pulls         int
779         trashes       int
780         replHistogram []int
781         classStats    map[string]replicationStats
782 }
783
784 type replicationStats struct {
785         desired      blocksNBytes
786         surplus      blocksNBytes
787         short        blocksNBytes
788         unachievable blocksNBytes
789 }
790
791 type balancedBlockState struct {
792         desired      int
793         surplus      int
794         unachievable bool
795 }
796
797 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
798         var s balancerStats
799         s.replHistogram = make([]int, 2)
800         s.classStats = make(map[string]replicationStats, len(bal.classes))
801         for result := range results {
802                 surplus := result.have - result.want
803                 bytes := result.blkid.Size()
804
805                 for class, state := range result.classState {
806                         cs := s.classStats[class]
807                         if state.unachievable {
808                                 cs.unachievable.blocks++
809                                 cs.unachievable.bytes += bytes
810                         }
811                         if state.desired > 0 {
812                                 cs.desired.replicas += state.desired
813                                 cs.desired.blocks++
814                                 cs.desired.bytes += bytes * int64(state.desired)
815                         }
816                         if state.surplus > 0 {
817                                 cs.surplus.replicas += state.surplus
818                                 cs.surplus.blocks++
819                                 cs.surplus.bytes += bytes * int64(state.surplus)
820                         } else if state.surplus < 0 {
821                                 cs.short.replicas += -state.surplus
822                                 cs.short.blocks++
823                                 cs.short.bytes += bytes * int64(-state.surplus)
824                         }
825                         s.classStats[class] = cs
826                 }
827
828                 switch {
829                 case result.have == 0 && result.want > 0:
830                         s.lost.replicas -= surplus
831                         s.lost.blocks++
832                         s.lost.bytes += bytes * int64(-surplus)
833                 case surplus < 0:
834                         s.underrep.replicas -= surplus
835                         s.underrep.blocks++
836                         s.underrep.bytes += bytes * int64(-surplus)
837                 case surplus > 0 && result.want == 0:
838                         counter := &s.garbage
839                         for _, r := range result.blk.Replicas {
840                                 if r.Mtime >= bal.MinMtime {
841                                         counter = &s.unref
842                                         break
843                                 }
844                         }
845                         counter.replicas += surplus
846                         counter.blocks++
847                         counter.bytes += bytes * int64(surplus)
848                 case surplus > 0:
849                         s.overrep.replicas += surplus
850                         s.overrep.blocks++
851                         s.overrep.bytes += bytes * int64(len(result.blk.Replicas)-result.want)
852                 default:
853                         s.justright.replicas += result.want
854                         s.justright.blocks++
855                         s.justright.bytes += bytes * int64(result.want)
856                 }
857
858                 if result.want > 0 {
859                         s.desired.replicas += result.want
860                         s.desired.blocks++
861                         s.desired.bytes += bytes * int64(result.want)
862                 }
863                 if len(result.blk.Replicas) > 0 {
864                         s.current.replicas += len(result.blk.Replicas)
865                         s.current.blocks++
866                         s.current.bytes += bytes * int64(len(result.blk.Replicas))
867                 }
868
869                 for len(s.replHistogram) <= len(result.blk.Replicas) {
870                         s.replHistogram = append(s.replHistogram, 0)
871                 }
872                 s.replHistogram[len(result.blk.Replicas)]++
873         }
874         for _, srv := range bal.KeepServices {
875                 s.pulls += len(srv.ChangeSet.Pulls)
876                 s.trashes += len(srv.ChangeSet.Trashes)
877         }
878         bal.stats = s
879 }
880
881 // PrintStatistics writes statistics about the computed changes to
882 // bal.Logger. It should not be called until ComputeChangeSets has
883 // finished.
884 func (bal *Balancer) PrintStatistics() {
885         bal.logf("===")
886         bal.logf("%s lost (0=have<want)", bal.stats.lost)
887         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
888         bal.logf("%s just right (have=want)", bal.stats.justright)
889         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
890         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
891         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
892         for _, class := range bal.classes {
893                 cs := bal.stats.classStats[class]
894                 bal.logf("===")
895                 bal.logf("storage class %q: %s desired", class, cs.desired)
896                 bal.logf("storage class %q: %s short", class, cs.short)
897                 bal.logf("storage class %q: %s surplus", class, cs.surplus)
898                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
899         }
900         bal.logf("===")
901         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
902         bal.logf("%s total usage", bal.stats.current)
903         bal.logf("===")
904         for _, srv := range bal.KeepServices {
905                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
906         }
907         bal.logf("===")
908         bal.printHistogram(60)
909         bal.logf("===")
910 }
911
912 func (bal *Balancer) printHistogram(hashColumns int) {
913         bal.logf("Replication level distribution (counting N replicas on a single server as N):")
914         maxCount := 0
915         for _, count := range bal.stats.replHistogram {
916                 if maxCount < count {
917                         maxCount = count
918                 }
919         }
920         hashes := strings.Repeat("#", hashColumns)
921         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
922         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
923         for repl, count := range bal.stats.replHistogram {
924                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
925                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
926         }
927 }
928
929 // CheckSanityLate checks for configuration and runtime errors after
930 // GetCurrentState() and ComputeChangeSets() have finished.
931 //
932 // If it returns an error, it is dangerous to run any Commit methods.
933 func (bal *Balancer) CheckSanityLate() error {
934         if bal.errors != nil {
935                 for _, err := range bal.errors {
936                         bal.logf("deferred error: %v", err)
937                 }
938                 return fmt.Errorf("cannot proceed safely after deferred errors")
939         }
940
941         if bal.collScanned == 0 {
942                 return fmt.Errorf("received zero collections")
943         }
944
945         anyDesired := false
946         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
947                 for _, desired := range blk.Desired {
948                         if desired > 0 {
949                                 anyDesired = true
950                                 break
951                         }
952                 }
953         })
954         if !anyDesired {
955                 return fmt.Errorf("zero blocks have desired replication>0")
956         }
957
958         if dr := bal.DefaultReplication; dr < 1 {
959                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
960         }
961
962         // TODO: no two services have identical indexes
963         // TODO: no collisions (same md5, different size)
964         return nil
965 }
966
967 // CommitPulls sends the computed lists of pull requests to the
968 // keepstore servers. This has the effect of increasing replication of
969 // existing blocks that are either underreplicated or poorly
970 // distributed according to rendezvous hashing.
971 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
972         return bal.commitAsync(c, "send pull list",
973                 func(srv *KeepService) error {
974                         return srv.CommitPulls(c)
975                 })
976 }
977
978 // CommitTrash sends the computed lists of trash requests to the
979 // keepstore servers. This has the effect of deleting blocks that are
980 // overreplicated or unreferenced.
981 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
982         return bal.commitAsync(c, "send trash list",
983                 func(srv *KeepService) error {
984                         return srv.CommitTrash(c)
985                 })
986 }
987
988 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
989         errs := make(chan error)
990         for _, srv := range bal.KeepServices {
991                 go func(srv *KeepService) {
992                         var err error
993                         defer func() { errs <- err }()
994                         label := fmt.Sprintf("%s: %v", srv, label)
995                         defer timeMe(bal.Logger, label)()
996                         err = f(srv)
997                         if err != nil {
998                                 err = fmt.Errorf("%s: %v", label, err)
999                         }
1000                 }(srv)
1001         }
1002         var lastErr error
1003         for range bal.KeepServices {
1004                 if err := <-errs; err != nil {
1005                         bal.logf("%v", err)
1006                         lastErr = err
1007                 }
1008         }
1009         close(errs)
1010         return lastErr
1011 }
1012
1013 func (bal *Balancer) logf(f string, args ...interface{}) {
1014         if bal.Logger != nil {
1015                 bal.Logger.Printf(f, args...)
1016         }
1017 }
1018
1019 // Rendezvous hash sort function. Less efficient than sorting on
1020 // precomputed rendezvous hashes, but also rarely used.
1021 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1022         a := md5.Sum([]byte(string(blkid[:32]) + i))
1023         b := md5.Sum([]byte(string(blkid[:32]) + j))
1024         return bytes.Compare(a[:], b[:]) < 0
1025 }