15112: Error out right away on unusable manifest.
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "fmt"
11         "log"
12         "math"
13         "runtime"
14         "sort"
15         "strings"
16         "sync"
17         "time"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "git.curoverse.com/arvados.git/sdk/go/keepclient"
21         "github.com/sirupsen/logrus"
22 )
23
24 // Balancer compares the contents of keepstore servers with the
25 // collections stored in Arvados, and issues pull/trash requests
26 // needed to get (closer to) the optimal data layout.
27 //
28 // In the optimal data layout: every data block referenced by a
29 // collection is replicated at least as many times as desired by the
30 // collection; there are no unreferenced data blocks older than
31 // BlobSignatureTTL; and all N existing replicas of a given data block
32 // are in the N best positions in rendezvous probe order.
33 type Balancer struct {
34         Logger  logrus.FieldLogger
35         Dumper  logrus.FieldLogger
36         Metrics *metrics
37
38         *BlockStateMap
39         KeepServices       map[string]*KeepService
40         DefaultReplication int
41         MinMtime           int64
42
43         classes       []string
44         mounts        int
45         mountsByClass map[string]map[*KeepMount]bool
46         collScanned   int
47         serviceRoots  map[string]string
48         errors        []error
49         stats         balancerStats
50         mutex         sync.Mutex
51 }
52
53 // Run performs a balance operation using the given config and
54 // runOptions, and returns RunOptions suitable for passing to a
55 // subsequent balance operation.
56 //
57 // Run should only be called once on a given Balancer object.
58 //
59 // Typical usage:
60 //
61 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
62 func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
63         nextRunOptions = runOptions
64
65         defer bal.time("sweep", "wall clock time to run one full sweep")()
66
67         if len(config.KeepServiceList.Items) > 0 {
68                 err = bal.SetKeepServices(config.KeepServiceList)
69         } else {
70                 err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
71         }
72         if err != nil {
73                 return
74         }
75
76         for _, srv := range bal.KeepServices {
77                 err = srv.discoverMounts(&config.Client)
78                 if err != nil {
79                         return
80                 }
81         }
82         bal.cleanupMounts()
83
84         if err = bal.CheckSanityEarly(&config.Client); err != nil {
85                 return
86         }
87         rs := bal.rendezvousState()
88         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
89                 if runOptions.SafeRendezvousState != "" {
90                         bal.logf("notice: KeepServices list has changed since last run")
91                 }
92                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
93                 if err = bal.ClearTrashLists(&config.Client); err != nil {
94                         return
95                 }
96                 // The current rendezvous state becomes "safe" (i.e.,
97                 // OK to compute changes for that state without
98                 // clearing existing trash lists) only now, after we
99                 // succeed in clearing existing trash lists.
100                 nextRunOptions.SafeRendezvousState = rs
101         }
102         if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
103                 return
104         }
105         bal.ComputeChangeSets()
106         bal.PrintStatistics()
107         if err = bal.CheckSanityLate(); err != nil {
108                 return
109         }
110         if runOptions.CommitPulls {
111                 err = bal.CommitPulls(&config.Client)
112                 if err != nil {
113                         // Skip trash if we can't pull. (Too cautious?)
114                         return
115                 }
116         }
117         if runOptions.CommitTrash {
118                 err = bal.CommitTrash(&config.Client)
119         }
120         return
121 }
122
123 // SetKeepServices sets the list of KeepServices to operate on.
124 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
125         bal.KeepServices = make(map[string]*KeepService)
126         for _, srv := range srvList.Items {
127                 bal.KeepServices[srv.UUID] = &KeepService{
128                         KeepService: srv,
129                         ChangeSet:   &ChangeSet{},
130                 }
131         }
132         return nil
133 }
134
135 // DiscoverKeepServices sets the list of KeepServices by calling the
136 // API to get a list of all services, and selecting the ones whose
137 // ServiceType is in okTypes.
138 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
139         bal.KeepServices = make(map[string]*KeepService)
140         ok := make(map[string]bool)
141         for _, t := range okTypes {
142                 ok[t] = true
143         }
144         return c.EachKeepService(func(srv arvados.KeepService) error {
145                 if ok[srv.ServiceType] {
146                         bal.KeepServices[srv.UUID] = &KeepService{
147                                 KeepService: srv,
148                                 ChangeSet:   &ChangeSet{},
149                         }
150                 } else {
151                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
152                 }
153                 return nil
154         })
155 }
156
157 func (bal *Balancer) cleanupMounts() {
158         rwdev := map[string]*KeepService{}
159         for _, srv := range bal.KeepServices {
160                 for _, mnt := range srv.mounts {
161                         if !mnt.ReadOnly && mnt.DeviceID != "" {
162                                 rwdev[mnt.DeviceID] = srv
163                         }
164                 }
165         }
166         // Drop the readonly mounts whose device is mounted RW
167         // elsewhere.
168         for _, srv := range bal.KeepServices {
169                 var dedup []*KeepMount
170                 for _, mnt := range srv.mounts {
171                         if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
172                                 bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
173                         } else {
174                                 dedup = append(dedup, mnt)
175                         }
176                 }
177                 srv.mounts = dedup
178         }
179         for _, srv := range bal.KeepServices {
180                 for _, mnt := range srv.mounts {
181                         if mnt.Replication <= 0 {
182                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
183                                 mnt.Replication = 1
184                         }
185                 }
186         }
187 }
188
189 // CheckSanityEarly checks for configuration and runtime errors that
190 // can be detected before GetCurrentState() and ComputeChangeSets()
191 // are called.
192 //
193 // If it returns an error, it is pointless to run GetCurrentState or
194 // ComputeChangeSets: after doing so, the statistics would be
195 // meaningless and it would be dangerous to run any Commit methods.
196 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
197         u, err := c.CurrentUser()
198         if err != nil {
199                 return fmt.Errorf("CurrentUser(): %v", err)
200         }
201         if !u.IsActive || !u.IsAdmin {
202                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
203         }
204         for _, srv := range bal.KeepServices {
205                 if srv.ServiceType == "proxy" {
206                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
207                 }
208         }
209         return nil
210 }
211
212 // rendezvousState returns a fingerprint (e.g., a sorted list of
213 // UUID+host+port) of the current set of keep services.
214 func (bal *Balancer) rendezvousState() string {
215         srvs := make([]string, 0, len(bal.KeepServices))
216         for _, srv := range bal.KeepServices {
217                 srvs = append(srvs, srv.String())
218         }
219         sort.Strings(srvs)
220         return strings.Join(srvs, "; ")
221 }
222
223 // ClearTrashLists sends an empty trash list to each keep
224 // service. Calling this before GetCurrentState avoids races.
225 //
226 // When a block appears in an index, we assume that replica will still
227 // exist after we delete other replicas on other servers. However,
228 // it's possible that a previous rebalancing operation made different
229 // decisions (e.g., servers were added/removed, and rendezvous order
230 // changed). In this case, the replica might already be on that
231 // server's trash list, and it might be deleted before we send a
232 // replacement trash list.
233 //
234 // We avoid this problem if we clear all trash lists before getting
235 // indexes. (We also assume there is only one rebalancing process
236 // running at a time.)
237 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
238         for _, srv := range bal.KeepServices {
239                 srv.ChangeSet = &ChangeSet{}
240         }
241         return bal.CommitTrash(c)
242 }
243
244 // GetCurrentState determines the current replication state, and the
245 // desired replication level, for every block that is either
246 // retrievable or referenced.
247 //
248 // It determines the current replication state by reading the block index
249 // from every known Keep service.
250 //
251 // It determines the desired replication level by retrieving all
252 // collection manifests in the database (API server).
253 //
254 // It encodes the resulting information in BlockStateMap.
255 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
256         defer bal.time("get_state", "wall clock time to get current state")()
257         bal.BlockStateMap = NewBlockStateMap()
258
259         dd, err := c.DiscoveryDocument()
260         if err != nil {
261                 return err
262         }
263         bal.DefaultReplication = dd.DefaultCollectionReplication
264         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
265
266         errs := make(chan error, 1)
267         wg := sync.WaitGroup{}
268
269         // When a device is mounted more than once, we will get its
270         // index only once, and call AddReplicas on all of the mounts.
271         // equivMount keys are the mounts that will be indexed, and
272         // each value is a list of mounts to apply the received index
273         // to.
274         equivMount := map[*KeepMount][]*KeepMount{}
275         // deviceMount maps each device ID to the one mount that will
276         // be indexed for that device.
277         deviceMount := map[string]*KeepMount{}
278         for _, srv := range bal.KeepServices {
279                 for _, mnt := range srv.mounts {
280                         equiv := deviceMount[mnt.DeviceID]
281                         if equiv == nil {
282                                 equiv = mnt
283                                 if mnt.DeviceID != "" {
284                                         deviceMount[mnt.DeviceID] = equiv
285                                 }
286                         }
287                         equivMount[equiv] = append(equivMount[equiv], mnt)
288                 }
289         }
290
291         // Start one goroutine for each (non-redundant) mount:
292         // retrieve the index, and add the returned blocks to
293         // BlockStateMap.
294         for _, mounts := range equivMount {
295                 wg.Add(1)
296                 go func(mounts []*KeepMount) {
297                         defer wg.Done()
298                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
299                         idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
300                         if err != nil {
301                                 select {
302                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
303                                 default:
304                                 }
305                                 return
306                         }
307                         if len(errs) > 0 {
308                                 // Some other goroutine encountered an
309                                 // error -- any further effort here
310                                 // will be wasted.
311                                 return
312                         }
313                         for _, mount := range mounts {
314                                 bal.logf("%s: add %d entries to map", mount, len(idx))
315                                 bal.BlockStateMap.AddReplicas(mount, idx)
316                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
317                         }
318                         bal.logf("mount %s: index done", mounts[0])
319                 }(mounts)
320         }
321
322         // collQ buffers incoming collections so we can start fetching
323         // the next page without waiting for the current page to
324         // finish processing.
325         collQ := make(chan arvados.Collection, bufs)
326
327         // Start a goroutine to process collections. (We could use a
328         // worker pool here, but even with a single worker we already
329         // process collections much faster than we can retrieve them.)
330         wg.Add(1)
331         go func() {
332                 defer wg.Done()
333                 for coll := range collQ {
334                         err := bal.addCollection(coll)
335                         if err != nil {
336                                 select {
337                                 case errs <- err:
338                                 default:
339                                 }
340                                 for range collQ {
341                                 }
342                                 return
343                         }
344                         bal.collScanned++
345                 }
346         }()
347
348         // Start a goroutine to retrieve all collections from the
349         // Arvados database and send them to collQ for processing.
350         wg.Add(1)
351         go func() {
352                 defer wg.Done()
353                 err = EachCollection(c, pageSize,
354                         func(coll arvados.Collection) error {
355                                 collQ <- coll
356                                 if len(errs) > 0 {
357                                         // some other GetCurrentState
358                                         // error happened: no point
359                                         // getting any more
360                                         // collections.
361                                         return fmt.Errorf("")
362                                 }
363                                 return nil
364                         }, func(done, total int) {
365                                 bal.logf("collections: %d/%d", done, total)
366                         })
367                 close(collQ)
368                 if err != nil {
369                         select {
370                         case errs <- err:
371                         default:
372                         }
373                 }
374         }()
375
376         wg.Wait()
377         if len(errs) > 0 {
378                 return <-errs
379         }
380         return nil
381 }
382
383 func (bal *Balancer) addCollection(coll arvados.Collection) error {
384         blkids, err := coll.SizedDigests()
385         if err != nil {
386                 return fmt.Errorf("%v: %v", coll.UUID, err)
387         }
388         repl := bal.DefaultReplication
389         if coll.ReplicationDesired != nil {
390                 repl = *coll.ReplicationDesired
391         }
392         debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
393         bal.BlockStateMap.IncreaseDesired(coll.StorageClassesDesired, repl, blkids)
394         return nil
395 }
396
397 // ComputeChangeSets compares, for each known block, the current and
398 // desired replication states. If it is possible to get closer to the
399 // desired state by copying or deleting blocks, it adds those changes
400 // to the relevant KeepServices' ChangeSets.
401 //
402 // It does not actually apply any of the computed changes.
403 func (bal *Balancer) ComputeChangeSets() {
404         // This just calls balanceBlock() once for each block, using a
405         // pool of worker goroutines.
406         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
407         bal.setupLookupTables()
408
409         type balanceTask struct {
410                 blkid arvados.SizedDigest
411                 blk   *BlockState
412         }
413         workers := runtime.GOMAXPROCS(-1)
414         todo := make(chan balanceTask, workers)
415         go func() {
416                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
417                         todo <- balanceTask{
418                                 blkid: blkid,
419                                 blk:   blk,
420                         }
421                 })
422                 close(todo)
423         }()
424         results := make(chan balanceResult, workers)
425         go func() {
426                 var wg sync.WaitGroup
427                 for i := 0; i < workers; i++ {
428                         wg.Add(1)
429                         go func() {
430                                 for work := range todo {
431                                         results <- bal.balanceBlock(work.blkid, work.blk)
432                                 }
433                                 wg.Done()
434                         }()
435                 }
436                 wg.Wait()
437                 close(results)
438         }()
439         bal.collectStatistics(results)
440 }
441
442 func (bal *Balancer) setupLookupTables() {
443         bal.serviceRoots = make(map[string]string)
444         bal.classes = []string{"default"}
445         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
446         bal.mounts = 0
447         for _, srv := range bal.KeepServices {
448                 bal.serviceRoots[srv.UUID] = srv.UUID
449                 for _, mnt := range srv.mounts {
450                         bal.mounts++
451
452                         // All mounts on a read-only service are
453                         // effectively read-only.
454                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
455
456                         if len(mnt.StorageClasses) == 0 {
457                                 bal.mountsByClass["default"][mnt] = true
458                                 continue
459                         }
460                         for _, class := range mnt.StorageClasses {
461                                 if mbc := bal.mountsByClass[class]; mbc == nil {
462                                         bal.classes = append(bal.classes, class)
463                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
464                                 } else {
465                                         mbc[mnt] = true
466                                 }
467                         }
468                 }
469         }
470         // Consider classes in lexicographic order to avoid flapping
471         // between balancing runs.  The outcome of the "prefer a mount
472         // we're already planning to use for a different storage
473         // class" case in balanceBlock depends on the order classes
474         // are considered.
475         sort.Strings(bal.classes)
476 }
477
478 const (
479         changeStay = iota
480         changePull
481         changeTrash
482         changeNone
483 )
484
485 var changeName = map[int]string{
486         changeStay:  "stay",
487         changePull:  "pull",
488         changeTrash: "trash",
489         changeNone:  "none",
490 }
491
492 type balanceResult struct {
493         blk        *BlockState
494         blkid      arvados.SizedDigest
495         have       int
496         want       int
497         classState map[string]balancedBlockState
498 }
499
500 // balanceBlock compares current state to desired state for a single
501 // block, and makes the appropriate ChangeSet calls.
502 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
503         debugf("balanceBlock: %v %+v", blkid, blk)
504
505         type slot struct {
506                 mnt  *KeepMount // never nil
507                 repl *Replica   // replica already stored here (or nil)
508                 want bool       // we should pull/leave a replica here
509         }
510
511         // Build a list of all slots (one per mounted volume).
512         slots := make([]slot, 0, bal.mounts)
513         for _, srv := range bal.KeepServices {
514                 for _, mnt := range srv.mounts {
515                         var repl *Replica
516                         for r := range blk.Replicas {
517                                 if blk.Replicas[r].KeepMount == mnt {
518                                         repl = &blk.Replicas[r]
519                                 }
520                         }
521                         // Initial value of "want" is "have, and can't
522                         // delete". These untrashable replicas get
523                         // prioritized when sorting slots: otherwise,
524                         // non-optimal readonly copies would cause us
525                         // to overreplicate.
526                         slots = append(slots, slot{
527                                 mnt:  mnt,
528                                 repl: repl,
529                                 want: repl != nil && mnt.ReadOnly,
530                         })
531                 }
532         }
533
534         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
535         srvRendezvous := make(map[*KeepService]int, len(uuids))
536         for i, uuid := range uuids {
537                 srv := bal.KeepServices[uuid]
538                 srvRendezvous[srv] = i
539         }
540
541         // Below we set underreplicated=true if we find any storage
542         // class that's currently underreplicated -- in that case we
543         // won't want to trash any replicas.
544         underreplicated := false
545
546         classState := make(map[string]balancedBlockState, len(bal.classes))
547         unsafeToDelete := make(map[int64]bool, len(slots))
548         for _, class := range bal.classes {
549                 desired := blk.Desired[class]
550
551                 countedDev := map[string]bool{}
552                 have := 0
553                 for _, slot := range slots {
554                         if slot.repl != nil && bal.mountsByClass[class][slot.mnt] && !countedDev[slot.mnt.DeviceID] {
555                                 have += slot.mnt.Replication
556                                 if slot.mnt.DeviceID != "" {
557                                         countedDev[slot.mnt.DeviceID] = true
558                                 }
559                         }
560                 }
561                 classState[class] = balancedBlockState{
562                         desired: desired,
563                         surplus: have - desired,
564                 }
565
566                 if desired == 0 {
567                         continue
568                 }
569
570                 // Sort the slots by desirability.
571                 sort.Slice(slots, func(i, j int) bool {
572                         si, sj := slots[i], slots[j]
573                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
574                                 // Prefer a mount that satisfies the
575                                 // desired class.
576                                 return bal.mountsByClass[class][si.mnt]
577                         } else if si.want != sj.want {
578                                 // Prefer a mount that will have a
579                                 // replica no matter what we do here
580                                 // -- either because it already has an
581                                 // untrashable replica, or because we
582                                 // already need it to satisfy a
583                                 // different storage class.
584                                 return si.want
585                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
586                                 // Prefer a better rendezvous
587                                 // position.
588                                 return orderi < orderj
589                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
590                                 // Prefer a mount that already has a
591                                 // replica.
592                                 return repli
593                         } else {
594                                 // If pull/trash turns out to be
595                                 // needed, distribute the
596                                 // new/remaining replicas uniformly
597                                 // across qualifying mounts on a given
598                                 // server.
599                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
600                         }
601                 })
602
603                 // Servers/mounts/devices (with or without existing
604                 // replicas) that are part of the best achievable
605                 // layout for this storage class.
606                 wantSrv := map[*KeepService]bool{}
607                 wantMnt := map[*KeepMount]bool{}
608                 wantDev := map[string]bool{}
609                 // Positions (with existing replicas) that have been
610                 // protected (via unsafeToDelete) to ensure we don't
611                 // reduce replication below desired level when
612                 // trashing replicas that aren't optimal positions for
613                 // any storage class.
614                 protMnt := map[*KeepMount]bool{}
615                 // Replication planned so far (corresponds to wantMnt).
616                 replWant := 0
617                 // Protected replication (corresponds to protMnt).
618                 replProt := 0
619
620                 // trySlot tries using a slot to meet requirements,
621                 // and returns true if all requirements are met.
622                 trySlot := func(i int) bool {
623                         slot := slots[i]
624                         if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
625                                 // Already allocated a replica to this
626                                 // backend device, possibly on a
627                                 // different server.
628                                 return false
629                         }
630                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
631                                 unsafeToDelete[slot.repl.Mtime] = true
632                                 protMnt[slot.mnt] = true
633                                 replProt += slot.mnt.Replication
634                         }
635                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
636                                 slots[i].want = true
637                                 wantSrv[slot.mnt.KeepService] = true
638                                 wantMnt[slot.mnt] = true
639                                 if slot.mnt.DeviceID != "" {
640                                         wantDev[slot.mnt.DeviceID] = true
641                                 }
642                                 replWant += slot.mnt.Replication
643                         }
644                         return replProt >= desired && replWant >= desired
645                 }
646
647                 // First try to achieve desired replication without
648                 // using the same server twice.
649                 done := false
650                 for i := 0; i < len(slots) && !done; i++ {
651                         if !wantSrv[slots[i].mnt.KeepService] {
652                                 done = trySlot(i)
653                         }
654                 }
655
656                 // If that didn't suffice, do another pass without the
657                 // "distinct services" restriction. (Achieving the
658                 // desired volume replication on fewer than the
659                 // desired number of services is better than
660                 // underreplicating.)
661                 for i := 0; i < len(slots) && !done; i++ {
662                         done = trySlot(i)
663                 }
664
665                 if !underreplicated {
666                         safe := 0
667                         for _, slot := range slots {
668                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
669                                         continue
670                                 }
671                                 if safe += slot.mnt.Replication; safe >= desired {
672                                         break
673                                 }
674                         }
675                         underreplicated = safe < desired
676                 }
677
678                 // set the unachievable flag if there aren't enough
679                 // slots offering the relevant storage class. (This is
680                 // as easy as checking slots[desired] because we
681                 // already sorted the qualifying slots to the front.)
682                 if desired >= len(slots) || !bal.mountsByClass[class][slots[desired].mnt] {
683                         cs := classState[class]
684                         cs.unachievable = true
685                         classState[class] = cs
686                 }
687
688                 // Avoid deleting wanted replicas from devices that
689                 // are mounted on multiple servers -- even if they
690                 // haven't already been added to unsafeToDelete
691                 // because the servers report different Mtimes.
692                 for _, slot := range slots {
693                         if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
694                                 unsafeToDelete[slot.repl.Mtime] = true
695                         }
696                 }
697         }
698
699         // TODO: If multiple replicas are trashable, prefer the oldest
700         // replica that doesn't have a timestamp collision with
701         // others.
702
703         countedDev := map[string]bool{}
704         var have, want int
705         for _, slot := range slots {
706                 if countedDev[slot.mnt.DeviceID] {
707                         continue
708                 }
709                 if slot.want {
710                         want += slot.mnt.Replication
711                 }
712                 if slot.repl != nil {
713                         have += slot.mnt.Replication
714                 }
715                 if slot.mnt.DeviceID != "" {
716                         countedDev[slot.mnt.DeviceID] = true
717                 }
718         }
719
720         var changes []string
721         for _, slot := range slots {
722                 // TODO: request a Touch if Mtime is duplicated.
723                 var change int
724                 switch {
725                 case !underreplicated && !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime && !unsafeToDelete[slot.repl.Mtime]:
726                         slot.mnt.KeepService.AddTrash(Trash{
727                                 SizedDigest: blkid,
728                                 Mtime:       slot.repl.Mtime,
729                                 From:        slot.mnt,
730                         })
731                         change = changeTrash
732                 case len(blk.Replicas) > 0 && slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
733                         slot.mnt.KeepService.AddPull(Pull{
734                                 SizedDigest: blkid,
735                                 From:        blk.Replicas[0].KeepMount.KeepService,
736                                 To:          slot.mnt,
737                         })
738                         change = changePull
739                 case slot.repl != nil:
740                         change = changeStay
741                 default:
742                         change = changeNone
743                 }
744                 if bal.Dumper != nil {
745                         var mtime int64
746                         if slot.repl != nil {
747                                 mtime = slot.repl.Mtime
748                         }
749                         srv := slot.mnt.KeepService
750                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
751                 }
752         }
753         if bal.Dumper != nil {
754                 bal.Dumper.Printf("%s refs=%d have=%d want=%v %v %v", blkid, blk.RefCount, have, want, blk.Desired, changes)
755         }
756         return balanceResult{
757                 blk:        blk,
758                 blkid:      blkid,
759                 have:       have,
760                 want:       want,
761                 classState: classState,
762         }
763 }
764
765 type blocksNBytes struct {
766         replicas int
767         blocks   int
768         bytes    int64
769 }
770
771 func (bb blocksNBytes) String() string {
772         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
773 }
774
775 type balancerStats struct {
776         lost          blocksNBytes
777         overrep       blocksNBytes
778         unref         blocksNBytes
779         garbage       blocksNBytes
780         underrep      blocksNBytes
781         unachievable  blocksNBytes
782         justright     blocksNBytes
783         desired       blocksNBytes
784         current       blocksNBytes
785         pulls         int
786         trashes       int
787         replHistogram []int
788         classStats    map[string]replicationStats
789
790         // collectionBytes / collectionBlockBytes = deduplication ratio
791         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
792         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
793         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
794         collectionBlocks     int64 // number of blocks referenced by any collection
795 }
796
797 func (s *balancerStats) dedupByteRatio() float64 {
798         if s.collectionBlockBytes == 0 {
799                 return 0
800         }
801         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
802 }
803
804 func (s *balancerStats) dedupBlockRatio() float64 {
805         if s.collectionBlocks == 0 {
806                 return 0
807         }
808         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
809 }
810
811 type replicationStats struct {
812         desired      blocksNBytes
813         surplus      blocksNBytes
814         short        blocksNBytes
815         unachievable blocksNBytes
816 }
817
818 type balancedBlockState struct {
819         desired      int
820         surplus      int
821         unachievable bool
822 }
823
824 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
825         var s balancerStats
826         s.replHistogram = make([]int, 2)
827         s.classStats = make(map[string]replicationStats, len(bal.classes))
828         for result := range results {
829                 surplus := result.have - result.want
830                 bytes := result.blkid.Size()
831
832                 if rc := int64(result.blk.RefCount); rc > 0 {
833                         s.collectionBytes += rc * bytes
834                         s.collectionBlockBytes += bytes
835                         s.collectionBlockRefs += rc
836                         s.collectionBlocks++
837                 }
838
839                 for class, state := range result.classState {
840                         cs := s.classStats[class]
841                         if state.unachievable {
842                                 cs.unachievable.blocks++
843                                 cs.unachievable.bytes += bytes
844                         }
845                         if state.desired > 0 {
846                                 cs.desired.replicas += state.desired
847                                 cs.desired.blocks++
848                                 cs.desired.bytes += bytes * int64(state.desired)
849                         }
850                         if state.surplus > 0 {
851                                 cs.surplus.replicas += state.surplus
852                                 cs.surplus.blocks++
853                                 cs.surplus.bytes += bytes * int64(state.surplus)
854                         } else if state.surplus < 0 {
855                                 cs.short.replicas += -state.surplus
856                                 cs.short.blocks++
857                                 cs.short.bytes += bytes * int64(-state.surplus)
858                         }
859                         s.classStats[class] = cs
860                 }
861
862                 switch {
863                 case result.have == 0 && result.want > 0:
864                         s.lost.replicas -= surplus
865                         s.lost.blocks++
866                         s.lost.bytes += bytes * int64(-surplus)
867                 case surplus < 0:
868                         s.underrep.replicas -= surplus
869                         s.underrep.blocks++
870                         s.underrep.bytes += bytes * int64(-surplus)
871                 case surplus > 0 && result.want == 0:
872                         counter := &s.garbage
873                         for _, r := range result.blk.Replicas {
874                                 if r.Mtime >= bal.MinMtime {
875                                         counter = &s.unref
876                                         break
877                                 }
878                         }
879                         counter.replicas += surplus
880                         counter.blocks++
881                         counter.bytes += bytes * int64(surplus)
882                 case surplus > 0:
883                         s.overrep.replicas += surplus
884                         s.overrep.blocks++
885                         s.overrep.bytes += bytes * int64(result.have-result.want)
886                 default:
887                         s.justright.replicas += result.want
888                         s.justright.blocks++
889                         s.justright.bytes += bytes * int64(result.want)
890                 }
891
892                 if result.want > 0 {
893                         s.desired.replicas += result.want
894                         s.desired.blocks++
895                         s.desired.bytes += bytes * int64(result.want)
896                 }
897                 if result.have > 0 {
898                         s.current.replicas += result.have
899                         s.current.blocks++
900                         s.current.bytes += bytes * int64(result.have)
901                 }
902
903                 for len(s.replHistogram) <= result.have {
904                         s.replHistogram = append(s.replHistogram, 0)
905                 }
906                 s.replHistogram[result.have]++
907         }
908         for _, srv := range bal.KeepServices {
909                 s.pulls += len(srv.ChangeSet.Pulls)
910                 s.trashes += len(srv.ChangeSet.Trashes)
911         }
912         bal.stats = s
913         bal.Metrics.UpdateStats(s)
914 }
915
916 // PrintStatistics writes statistics about the computed changes to
917 // bal.Logger. It should not be called until ComputeChangeSets has
918 // finished.
919 func (bal *Balancer) PrintStatistics() {
920         bal.logf("===")
921         bal.logf("%s lost (0=have<want)", bal.stats.lost)
922         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
923         bal.logf("%s just right (have=want)", bal.stats.justright)
924         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
925         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
926         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
927         for _, class := range bal.classes {
928                 cs := bal.stats.classStats[class]
929                 bal.logf("===")
930                 bal.logf("storage class %q: %s desired", class, cs.desired)
931                 bal.logf("storage class %q: %s short", class, cs.short)
932                 bal.logf("storage class %q: %s surplus", class, cs.surplus)
933                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
934         }
935         bal.logf("===")
936         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
937         bal.logf("%s total usage", bal.stats.current)
938         bal.logf("===")
939         for _, srv := range bal.KeepServices {
940                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
941         }
942         bal.logf("===")
943         bal.printHistogram(60)
944         bal.logf("===")
945 }
946
947 func (bal *Balancer) printHistogram(hashColumns int) {
948         bal.logf("Replication level distribution (counting N replicas on a single server as N):")
949         maxCount := 0
950         for _, count := range bal.stats.replHistogram {
951                 if maxCount < count {
952                         maxCount = count
953                 }
954         }
955         hashes := strings.Repeat("#", hashColumns)
956         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
957         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
958         for repl, count := range bal.stats.replHistogram {
959                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
960                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
961         }
962 }
963
964 // CheckSanityLate checks for configuration and runtime errors after
965 // GetCurrentState() and ComputeChangeSets() have finished.
966 //
967 // If it returns an error, it is dangerous to run any Commit methods.
968 func (bal *Balancer) CheckSanityLate() error {
969         if bal.errors != nil {
970                 for _, err := range bal.errors {
971                         bal.logf("deferred error: %v", err)
972                 }
973                 return fmt.Errorf("cannot proceed safely after deferred errors")
974         }
975
976         if bal.collScanned == 0 {
977                 return fmt.Errorf("received zero collections")
978         }
979
980         anyDesired := false
981         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
982                 for _, desired := range blk.Desired {
983                         if desired > 0 {
984                                 anyDesired = true
985                                 break
986                         }
987                 }
988         })
989         if !anyDesired {
990                 return fmt.Errorf("zero blocks have desired replication>0")
991         }
992
993         if dr := bal.DefaultReplication; dr < 1 {
994                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
995         }
996
997         // TODO: no two services have identical indexes
998         // TODO: no collisions (same md5, different size)
999         return nil
1000 }
1001
1002 // CommitPulls sends the computed lists of pull requests to the
1003 // keepstore servers. This has the effect of increasing replication of
1004 // existing blocks that are either underreplicated or poorly
1005 // distributed according to rendezvous hashing.
1006 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
1007         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1008         return bal.commitAsync(c, "send pull list",
1009                 func(srv *KeepService) error {
1010                         return srv.CommitPulls(c)
1011                 })
1012 }
1013
1014 // CommitTrash sends the computed lists of trash requests to the
1015 // keepstore servers. This has the effect of deleting blocks that are
1016 // overreplicated or unreferenced.
1017 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
1018         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1019         return bal.commitAsync(c, "send trash list",
1020                 func(srv *KeepService) error {
1021                         return srv.CommitTrash(c)
1022                 })
1023 }
1024
1025 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1026         errs := make(chan error)
1027         for _, srv := range bal.KeepServices {
1028                 go func(srv *KeepService) {
1029                         var err error
1030                         defer func() { errs <- err }()
1031                         label := fmt.Sprintf("%s: %v", srv, label)
1032                         err = f(srv)
1033                         if err != nil {
1034                                 err = fmt.Errorf("%s: %v", label, err)
1035                         }
1036                 }(srv)
1037         }
1038         var lastErr error
1039         for range bal.KeepServices {
1040                 if err := <-errs; err != nil {
1041                         bal.logf("%v", err)
1042                         lastErr = err
1043                 }
1044         }
1045         close(errs)
1046         return lastErr
1047 }
1048
1049 func (bal *Balancer) logf(f string, args ...interface{}) {
1050         if bal.Logger != nil {
1051                 bal.Logger.Printf(f, args...)
1052         }
1053 }
1054
1055 func (bal *Balancer) time(name, help string) func() {
1056         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1057         t0 := time.Now()
1058         bal.Logger.Printf("%s: start", name)
1059         return func() {
1060                 dur := time.Since(t0)
1061                 observer.Observe(dur.Seconds())
1062                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1063         }
1064 }
1065
1066 // Rendezvous hash sort function. Less efficient than sorting on
1067 // precomputed rendezvous hashes, but also rarely used.
1068 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1069         a := md5.Sum([]byte(string(blkid[:32]) + i))
1070         b := md5.Sum([]byte(string(blkid[:32]) + j))
1071         return bytes.Compare(a[:], b[:]) < 0
1072 }