14714: keep-balance uses cluster config
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "log"
14         "math"
15         "os"
16         "runtime"
17         "sort"
18         "strings"
19         "sync"
20         "syscall"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "git.curoverse.com/arvados.git/sdk/go/keepclient"
25         "github.com/sirupsen/logrus"
26 )
27
28 // Balancer compares the contents of keepstore servers with the
29 // collections stored in Arvados, and issues pull/trash requests
30 // needed to get (closer to) the optimal data layout.
31 //
32 // In the optimal data layout: every data block referenced by a
33 // collection is replicated at least as many times as desired by the
34 // collection; there are no unreferenced data blocks older than
35 // BlobSignatureTTL; and all N existing replicas of a given data block
36 // are in the N best positions in rendezvous probe order.
37 type Balancer struct {
38         Logger  logrus.FieldLogger
39         Dumper  logrus.FieldLogger
40         Metrics *metrics
41
42         LostBlocksFile string
43
44         *BlockStateMap
45         KeepServices       map[string]*KeepService
46         DefaultReplication int
47         MinMtime           int64
48
49         classes       []string
50         mounts        int
51         mountsByClass map[string]map[*KeepMount]bool
52         collScanned   int
53         serviceRoots  map[string]string
54         errors        []error
55         stats         balancerStats
56         mutex         sync.Mutex
57         lostBlocks    io.Writer
58 }
59
60 // Run performs a balance operation using the given config and
61 // runOptions, and returns RunOptions suitable for passing to a
62 // subsequent balance operation.
63 //
64 // Run should only be called once on a given Balancer object.
65 //
66 // Typical usage:
67 //
68 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
69 func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
70         nextRunOptions = runOptions
71
72         defer bal.time("sweep", "wall clock time to run one full sweep")()
73
74         var lbFile *os.File
75         if bal.LostBlocksFile != "" {
76                 tmpfn := bal.LostBlocksFile + ".tmp"
77                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
78                 if err != nil {
79                         return
80                 }
81                 defer lbFile.Close()
82                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
83                 if err != nil {
84                         return
85                 }
86                 defer func() {
87                         // Remove the tempfile only if we didn't get
88                         // as far as successfully renaming it.
89                         if lbFile != nil {
90                                 os.Remove(tmpfn)
91                         }
92                 }()
93                 bal.lostBlocks = lbFile
94         } else {
95                 bal.lostBlocks = ioutil.Discard
96         }
97
98         diskService := []string{"disk"}
99         err = bal.DiscoverKeepServices(client, diskService)
100         if err != nil {
101                 return
102         }
103
104         for _, srv := range bal.KeepServices {
105                 err = srv.discoverMounts(client)
106                 if err != nil {
107                         return
108                 }
109         }
110         bal.cleanupMounts()
111
112         if err = bal.CheckSanityEarly(client); err != nil {
113                 return
114         }
115         rs := bal.rendezvousState()
116         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
117                 if runOptions.SafeRendezvousState != "" {
118                         bal.logf("notice: KeepServices list has changed since last run")
119                 }
120                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
121                 if err = bal.ClearTrashLists(client); err != nil {
122                         return
123                 }
124                 // The current rendezvous state becomes "safe" (i.e.,
125                 // OK to compute changes for that state without
126                 // clearing existing trash lists) only now, after we
127                 // succeed in clearing existing trash lists.
128                 nextRunOptions.SafeRendezvousState = rs
129         }
130         if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
131                 return
132         }
133         bal.ComputeChangeSets()
134         bal.PrintStatistics()
135         if err = bal.CheckSanityLate(); err != nil {
136                 return
137         }
138         if lbFile != nil {
139                 err = lbFile.Sync()
140                 if err != nil {
141                         return
142                 }
143                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
144                 if err != nil {
145                         return
146                 }
147                 lbFile = nil
148         }
149         if runOptions.CommitPulls {
150                 err = bal.CommitPulls(client)
151                 if err != nil {
152                         // Skip trash if we can't pull. (Too cautious?)
153                         return
154                 }
155         }
156         if runOptions.CommitTrash {
157                 err = bal.CommitTrash(client)
158         }
159         return
160 }
161
162 // SetKeepServices sets the list of KeepServices to operate on.
163 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
164         bal.KeepServices = make(map[string]*KeepService)
165         for _, srv := range srvList.Items {
166                 bal.KeepServices[srv.UUID] = &KeepService{
167                         KeepService: srv,
168                         ChangeSet:   &ChangeSet{},
169                 }
170         }
171         return nil
172 }
173
174 // DiscoverKeepServices sets the list of KeepServices by calling the
175 // API to get a list of all services, and selecting the ones whose
176 // ServiceType is in okTypes.
177 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
178         bal.KeepServices = make(map[string]*KeepService)
179         ok := make(map[string]bool)
180         for _, t := range okTypes {
181                 ok[t] = true
182         }
183         return c.EachKeepService(func(srv arvados.KeepService) error {
184                 if ok[srv.ServiceType] {
185                         bal.KeepServices[srv.UUID] = &KeepService{
186                                 KeepService: srv,
187                                 ChangeSet:   &ChangeSet{},
188                         }
189                 } else {
190                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
191                 }
192                 return nil
193         })
194 }
195
196 func (bal *Balancer) cleanupMounts() {
197         rwdev := map[string]*KeepService{}
198         for _, srv := range bal.KeepServices {
199                 for _, mnt := range srv.mounts {
200                         if !mnt.ReadOnly && mnt.DeviceID != "" {
201                                 rwdev[mnt.DeviceID] = srv
202                         }
203                 }
204         }
205         // Drop the readonly mounts whose device is mounted RW
206         // elsewhere.
207         for _, srv := range bal.KeepServices {
208                 var dedup []*KeepMount
209                 for _, mnt := range srv.mounts {
210                         if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
211                                 bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
212                         } else {
213                                 dedup = append(dedup, mnt)
214                         }
215                 }
216                 srv.mounts = dedup
217         }
218         for _, srv := range bal.KeepServices {
219                 for _, mnt := range srv.mounts {
220                         if mnt.Replication <= 0 {
221                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
222                                 mnt.Replication = 1
223                         }
224                 }
225         }
226 }
227
228 // CheckSanityEarly checks for configuration and runtime errors that
229 // can be detected before GetCurrentState() and ComputeChangeSets()
230 // are called.
231 //
232 // If it returns an error, it is pointless to run GetCurrentState or
233 // ComputeChangeSets: after doing so, the statistics would be
234 // meaningless and it would be dangerous to run any Commit methods.
235 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
236         u, err := c.CurrentUser()
237         if err != nil {
238                 return fmt.Errorf("CurrentUser(): %v", err)
239         }
240         if !u.IsActive || !u.IsAdmin {
241                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
242         }
243         for _, srv := range bal.KeepServices {
244                 if srv.ServiceType == "proxy" {
245                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
246                 }
247         }
248
249         var checkPage arvados.CollectionList
250         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
251                 Limit:              new(int),
252                 Count:              "exact",
253                 IncludeTrash:       true,
254                 IncludeOldVersions: true,
255                 Filters: []arvados.Filter{{
256                         Attr:     "modified_at",
257                         Operator: "=",
258                         Operand:  nil,
259                 }},
260         }); err != nil {
261                 return err
262         } else if n := checkPage.ItemsAvailable; n > 0 {
263                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
264         }
265
266         return nil
267 }
268
269 // rendezvousState returns a fingerprint (e.g., a sorted list of
270 // UUID+host+port) of the current set of keep services.
271 func (bal *Balancer) rendezvousState() string {
272         srvs := make([]string, 0, len(bal.KeepServices))
273         for _, srv := range bal.KeepServices {
274                 srvs = append(srvs, srv.String())
275         }
276         sort.Strings(srvs)
277         return strings.Join(srvs, "; ")
278 }
279
280 // ClearTrashLists sends an empty trash list to each keep
281 // service. Calling this before GetCurrentState avoids races.
282 //
283 // When a block appears in an index, we assume that replica will still
284 // exist after we delete other replicas on other servers. However,
285 // it's possible that a previous rebalancing operation made different
286 // decisions (e.g., servers were added/removed, and rendezvous order
287 // changed). In this case, the replica might already be on that
288 // server's trash list, and it might be deleted before we send a
289 // replacement trash list.
290 //
291 // We avoid this problem if we clear all trash lists before getting
292 // indexes. (We also assume there is only one rebalancing process
293 // running at a time.)
294 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
295         for _, srv := range bal.KeepServices {
296                 srv.ChangeSet = &ChangeSet{}
297         }
298         return bal.CommitTrash(c)
299 }
300
301 // GetCurrentState determines the current replication state, and the
302 // desired replication level, for every block that is either
303 // retrievable or referenced.
304 //
305 // It determines the current replication state by reading the block index
306 // from every known Keep service.
307 //
308 // It determines the desired replication level by retrieving all
309 // collection manifests in the database (API server).
310 //
311 // It encodes the resulting information in BlockStateMap.
312 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
313         defer bal.time("get_state", "wall clock time to get current state")()
314         bal.BlockStateMap = NewBlockStateMap()
315
316         dd, err := c.DiscoveryDocument()
317         if err != nil {
318                 return err
319         }
320         bal.DefaultReplication = dd.DefaultCollectionReplication
321         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
322
323         errs := make(chan error, 1)
324         wg := sync.WaitGroup{}
325
326         // When a device is mounted more than once, we will get its
327         // index only once, and call AddReplicas on all of the mounts.
328         // equivMount keys are the mounts that will be indexed, and
329         // each value is a list of mounts to apply the received index
330         // to.
331         equivMount := map[*KeepMount][]*KeepMount{}
332         // deviceMount maps each device ID to the one mount that will
333         // be indexed for that device.
334         deviceMount := map[string]*KeepMount{}
335         for _, srv := range bal.KeepServices {
336                 for _, mnt := range srv.mounts {
337                         equiv := deviceMount[mnt.DeviceID]
338                         if equiv == nil {
339                                 equiv = mnt
340                                 if mnt.DeviceID != "" {
341                                         deviceMount[mnt.DeviceID] = equiv
342                                 }
343                         }
344                         equivMount[equiv] = append(equivMount[equiv], mnt)
345                 }
346         }
347
348         // Start one goroutine for each (non-redundant) mount:
349         // retrieve the index, and add the returned blocks to
350         // BlockStateMap.
351         for _, mounts := range equivMount {
352                 wg.Add(1)
353                 go func(mounts []*KeepMount) {
354                         defer wg.Done()
355                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
356                         idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
357                         if err != nil {
358                                 select {
359                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
360                                 default:
361                                 }
362                                 return
363                         }
364                         if len(errs) > 0 {
365                                 // Some other goroutine encountered an
366                                 // error -- any further effort here
367                                 // will be wasted.
368                                 return
369                         }
370                         for _, mount := range mounts {
371                                 bal.logf("%s: add %d entries to map", mount, len(idx))
372                                 bal.BlockStateMap.AddReplicas(mount, idx)
373                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
374                         }
375                         bal.logf("mount %s: index done", mounts[0])
376                 }(mounts)
377         }
378
379         // collQ buffers incoming collections so we can start fetching
380         // the next page without waiting for the current page to
381         // finish processing.
382         collQ := make(chan arvados.Collection, bufs)
383
384         // Start a goroutine to process collections. (We could use a
385         // worker pool here, but even with a single worker we already
386         // process collections much faster than we can retrieve them.)
387         wg.Add(1)
388         go func() {
389                 defer wg.Done()
390                 for coll := range collQ {
391                         err := bal.addCollection(coll)
392                         if err != nil || len(errs) > 0 {
393                                 select {
394                                 case errs <- err:
395                                 default:
396                                 }
397                                 for range collQ {
398                                 }
399                                 return
400                         }
401                         bal.collScanned++
402                 }
403         }()
404
405         // Start a goroutine to retrieve all collections from the
406         // Arvados database and send them to collQ for processing.
407         wg.Add(1)
408         go func() {
409                 defer wg.Done()
410                 err = EachCollection(c, pageSize,
411                         func(coll arvados.Collection) error {
412                                 collQ <- coll
413                                 if len(errs) > 0 {
414                                         // some other GetCurrentState
415                                         // error happened: no point
416                                         // getting any more
417                                         // collections.
418                                         return fmt.Errorf("")
419                                 }
420                                 return nil
421                         }, func(done, total int) {
422                                 bal.logf("collections: %d/%d", done, total)
423                         })
424                 close(collQ)
425                 if err != nil {
426                         select {
427                         case errs <- err:
428                         default:
429                         }
430                 }
431         }()
432
433         wg.Wait()
434         if len(errs) > 0 {
435                 return <-errs
436         }
437         return nil
438 }
439
440 func (bal *Balancer) addCollection(coll arvados.Collection) error {
441         blkids, err := coll.SizedDigests()
442         if err != nil {
443                 return fmt.Errorf("%v: %v", coll.UUID, err)
444         }
445         repl := bal.DefaultReplication
446         if coll.ReplicationDesired != nil {
447                 repl = *coll.ReplicationDesired
448         }
449         debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
450         // Pass pdh to IncreaseDesired only if LostBlocksFile is being
451         // written -- otherwise it's just a waste of memory.
452         pdh := ""
453         if bal.LostBlocksFile != "" {
454                 pdh = coll.PortableDataHash
455         }
456         bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
457         return nil
458 }
459
460 // ComputeChangeSets compares, for each known block, the current and
461 // desired replication states. If it is possible to get closer to the
462 // desired state by copying or deleting blocks, it adds those changes
463 // to the relevant KeepServices' ChangeSets.
464 //
465 // It does not actually apply any of the computed changes.
466 func (bal *Balancer) ComputeChangeSets() {
467         // This just calls balanceBlock() once for each block, using a
468         // pool of worker goroutines.
469         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
470         bal.setupLookupTables()
471
472         type balanceTask struct {
473                 blkid arvados.SizedDigest
474                 blk   *BlockState
475         }
476         workers := runtime.GOMAXPROCS(-1)
477         todo := make(chan balanceTask, workers)
478         go func() {
479                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
480                         todo <- balanceTask{
481                                 blkid: blkid,
482                                 blk:   blk,
483                         }
484                 })
485                 close(todo)
486         }()
487         results := make(chan balanceResult, workers)
488         go func() {
489                 var wg sync.WaitGroup
490                 for i := 0; i < workers; i++ {
491                         wg.Add(1)
492                         go func() {
493                                 for work := range todo {
494                                         results <- bal.balanceBlock(work.blkid, work.blk)
495                                 }
496                                 wg.Done()
497                         }()
498                 }
499                 wg.Wait()
500                 close(results)
501         }()
502         bal.collectStatistics(results)
503 }
504
505 func (bal *Balancer) setupLookupTables() {
506         bal.serviceRoots = make(map[string]string)
507         bal.classes = defaultClasses
508         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
509         bal.mounts = 0
510         for _, srv := range bal.KeepServices {
511                 bal.serviceRoots[srv.UUID] = srv.UUID
512                 for _, mnt := range srv.mounts {
513                         bal.mounts++
514
515                         // All mounts on a read-only service are
516                         // effectively read-only.
517                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
518
519                         if len(mnt.StorageClasses) == 0 {
520                                 bal.mountsByClass["default"][mnt] = true
521                                 continue
522                         }
523                         for class := range mnt.StorageClasses {
524                                 if mbc := bal.mountsByClass[class]; mbc == nil {
525                                         bal.classes = append(bal.classes, class)
526                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
527                                 } else {
528                                         mbc[mnt] = true
529                                 }
530                         }
531                 }
532         }
533         // Consider classes in lexicographic order to avoid flapping
534         // between balancing runs.  The outcome of the "prefer a mount
535         // we're already planning to use for a different storage
536         // class" case in balanceBlock depends on the order classes
537         // are considered.
538         sort.Strings(bal.classes)
539 }
540
541 const (
542         changeStay = iota
543         changePull
544         changeTrash
545         changeNone
546 )
547
548 var changeName = map[int]string{
549         changeStay:  "stay",
550         changePull:  "pull",
551         changeTrash: "trash",
552         changeNone:  "none",
553 }
554
555 type balanceResult struct {
556         blk        *BlockState
557         blkid      arvados.SizedDigest
558         have       int
559         want       int
560         classState map[string]balancedBlockState
561 }
562
563 // balanceBlock compares current state to desired state for a single
564 // block, and makes the appropriate ChangeSet calls.
565 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
566         debugf("balanceBlock: %v %+v", blkid, blk)
567
568         type slot struct {
569                 mnt  *KeepMount // never nil
570                 repl *Replica   // replica already stored here (or nil)
571                 want bool       // we should pull/leave a replica here
572         }
573
574         // Build a list of all slots (one per mounted volume).
575         slots := make([]slot, 0, bal.mounts)
576         for _, srv := range bal.KeepServices {
577                 for _, mnt := range srv.mounts {
578                         var repl *Replica
579                         for r := range blk.Replicas {
580                                 if blk.Replicas[r].KeepMount == mnt {
581                                         repl = &blk.Replicas[r]
582                                 }
583                         }
584                         // Initial value of "want" is "have, and can't
585                         // delete". These untrashable replicas get
586                         // prioritized when sorting slots: otherwise,
587                         // non-optimal readonly copies would cause us
588                         // to overreplicate.
589                         slots = append(slots, slot{
590                                 mnt:  mnt,
591                                 repl: repl,
592                                 want: repl != nil && mnt.ReadOnly,
593                         })
594                 }
595         }
596
597         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
598         srvRendezvous := make(map[*KeepService]int, len(uuids))
599         for i, uuid := range uuids {
600                 srv := bal.KeepServices[uuid]
601                 srvRendezvous[srv] = i
602         }
603
604         // Below we set underreplicated=true if we find any storage
605         // class that's currently underreplicated -- in that case we
606         // won't want to trash any replicas.
607         underreplicated := false
608
609         classState := make(map[string]balancedBlockState, len(bal.classes))
610         unsafeToDelete := make(map[int64]bool, len(slots))
611         for _, class := range bal.classes {
612                 desired := blk.Desired[class]
613
614                 countedDev := map[string]bool{}
615                 have := 0
616                 for _, slot := range slots {
617                         if slot.repl != nil && bal.mountsByClass[class][slot.mnt] && !countedDev[slot.mnt.DeviceID] {
618                                 have += slot.mnt.Replication
619                                 if slot.mnt.DeviceID != "" {
620                                         countedDev[slot.mnt.DeviceID] = true
621                                 }
622                         }
623                 }
624                 classState[class] = balancedBlockState{
625                         desired: desired,
626                         surplus: have - desired,
627                 }
628
629                 if desired == 0 {
630                         continue
631                 }
632
633                 // Sort the slots by desirability.
634                 sort.Slice(slots, func(i, j int) bool {
635                         si, sj := slots[i], slots[j]
636                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
637                                 // Prefer a mount that satisfies the
638                                 // desired class.
639                                 return bal.mountsByClass[class][si.mnt]
640                         } else if si.want != sj.want {
641                                 // Prefer a mount that will have a
642                                 // replica no matter what we do here
643                                 // -- either because it already has an
644                                 // untrashable replica, or because we
645                                 // already need it to satisfy a
646                                 // different storage class.
647                                 return si.want
648                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
649                                 // Prefer a better rendezvous
650                                 // position.
651                                 return orderi < orderj
652                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
653                                 // Prefer a mount that already has a
654                                 // replica.
655                                 return repli
656                         } else {
657                                 // If pull/trash turns out to be
658                                 // needed, distribute the
659                                 // new/remaining replicas uniformly
660                                 // across qualifying mounts on a given
661                                 // server.
662                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
663                         }
664                 })
665
666                 // Servers/mounts/devices (with or without existing
667                 // replicas) that are part of the best achievable
668                 // layout for this storage class.
669                 wantSrv := map[*KeepService]bool{}
670                 wantMnt := map[*KeepMount]bool{}
671                 wantDev := map[string]bool{}
672                 // Positions (with existing replicas) that have been
673                 // protected (via unsafeToDelete) to ensure we don't
674                 // reduce replication below desired level when
675                 // trashing replicas that aren't optimal positions for
676                 // any storage class.
677                 protMnt := map[*KeepMount]bool{}
678                 // Replication planned so far (corresponds to wantMnt).
679                 replWant := 0
680                 // Protected replication (corresponds to protMnt).
681                 replProt := 0
682
683                 // trySlot tries using a slot to meet requirements,
684                 // and returns true if all requirements are met.
685                 trySlot := func(i int) bool {
686                         slot := slots[i]
687                         if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
688                                 // Already allocated a replica to this
689                                 // backend device, possibly on a
690                                 // different server.
691                                 return false
692                         }
693                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
694                                 unsafeToDelete[slot.repl.Mtime] = true
695                                 protMnt[slot.mnt] = true
696                                 replProt += slot.mnt.Replication
697                         }
698                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
699                                 slots[i].want = true
700                                 wantSrv[slot.mnt.KeepService] = true
701                                 wantMnt[slot.mnt] = true
702                                 if slot.mnt.DeviceID != "" {
703                                         wantDev[slot.mnt.DeviceID] = true
704                                 }
705                                 replWant += slot.mnt.Replication
706                         }
707                         return replProt >= desired && replWant >= desired
708                 }
709
710                 // First try to achieve desired replication without
711                 // using the same server twice.
712                 done := false
713                 for i := 0; i < len(slots) && !done; i++ {
714                         if !wantSrv[slots[i].mnt.KeepService] {
715                                 done = trySlot(i)
716                         }
717                 }
718
719                 // If that didn't suffice, do another pass without the
720                 // "distinct services" restriction. (Achieving the
721                 // desired volume replication on fewer than the
722                 // desired number of services is better than
723                 // underreplicating.)
724                 for i := 0; i < len(slots) && !done; i++ {
725                         done = trySlot(i)
726                 }
727
728                 if !underreplicated {
729                         safe := 0
730                         for _, slot := range slots {
731                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
732                                         continue
733                                 }
734                                 if safe += slot.mnt.Replication; safe >= desired {
735                                         break
736                                 }
737                         }
738                         underreplicated = safe < desired
739                 }
740
741                 // set the unachievable flag if there aren't enough
742                 // slots offering the relevant storage class. (This is
743                 // as easy as checking slots[desired] because we
744                 // already sorted the qualifying slots to the front.)
745                 if desired >= len(slots) || !bal.mountsByClass[class][slots[desired].mnt] {
746                         cs := classState[class]
747                         cs.unachievable = true
748                         classState[class] = cs
749                 }
750
751                 // Avoid deleting wanted replicas from devices that
752                 // are mounted on multiple servers -- even if they
753                 // haven't already been added to unsafeToDelete
754                 // because the servers report different Mtimes.
755                 for _, slot := range slots {
756                         if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
757                                 unsafeToDelete[slot.repl.Mtime] = true
758                         }
759                 }
760         }
761
762         // TODO: If multiple replicas are trashable, prefer the oldest
763         // replica that doesn't have a timestamp collision with
764         // others.
765
766         countedDev := map[string]bool{}
767         var have, want int
768         for _, slot := range slots {
769                 if countedDev[slot.mnt.DeviceID] {
770                         continue
771                 }
772                 if slot.want {
773                         want += slot.mnt.Replication
774                 }
775                 if slot.repl != nil {
776                         have += slot.mnt.Replication
777                 }
778                 if slot.mnt.DeviceID != "" {
779                         countedDev[slot.mnt.DeviceID] = true
780                 }
781         }
782
783         var changes []string
784         for _, slot := range slots {
785                 // TODO: request a Touch if Mtime is duplicated.
786                 var change int
787                 switch {
788                 case !underreplicated && !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime && !unsafeToDelete[slot.repl.Mtime]:
789                         slot.mnt.KeepService.AddTrash(Trash{
790                                 SizedDigest: blkid,
791                                 Mtime:       slot.repl.Mtime,
792                                 From:        slot.mnt,
793                         })
794                         change = changeTrash
795                 case len(blk.Replicas) > 0 && slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
796                         slot.mnt.KeepService.AddPull(Pull{
797                                 SizedDigest: blkid,
798                                 From:        blk.Replicas[0].KeepMount.KeepService,
799                                 To:          slot.mnt,
800                         })
801                         change = changePull
802                 case slot.repl != nil:
803                         change = changeStay
804                 default:
805                         change = changeNone
806                 }
807                 if bal.Dumper != nil {
808                         var mtime int64
809                         if slot.repl != nil {
810                                 mtime = slot.repl.Mtime
811                         }
812                         srv := slot.mnt.KeepService
813                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
814                 }
815         }
816         if bal.Dumper != nil {
817                 bal.Dumper.Printf("%s refs=%d have=%d want=%v %v %v", blkid, blk.RefCount, have, want, blk.Desired, changes)
818         }
819         return balanceResult{
820                 blk:        blk,
821                 blkid:      blkid,
822                 have:       have,
823                 want:       want,
824                 classState: classState,
825         }
826 }
827
828 type blocksNBytes struct {
829         replicas int
830         blocks   int
831         bytes    int64
832 }
833
834 func (bb blocksNBytes) String() string {
835         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
836 }
837
838 type balancerStats struct {
839         lost          blocksNBytes
840         overrep       blocksNBytes
841         unref         blocksNBytes
842         garbage       blocksNBytes
843         underrep      blocksNBytes
844         unachievable  blocksNBytes
845         justright     blocksNBytes
846         desired       blocksNBytes
847         current       blocksNBytes
848         pulls         int
849         trashes       int
850         replHistogram []int
851         classStats    map[string]replicationStats
852
853         // collectionBytes / collectionBlockBytes = deduplication ratio
854         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
855         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
856         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
857         collectionBlocks     int64 // number of blocks referenced by any collection
858 }
859
860 func (s *balancerStats) dedupByteRatio() float64 {
861         if s.collectionBlockBytes == 0 {
862                 return 0
863         }
864         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
865 }
866
867 func (s *balancerStats) dedupBlockRatio() float64 {
868         if s.collectionBlocks == 0 {
869                 return 0
870         }
871         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
872 }
873
874 type replicationStats struct {
875         desired      blocksNBytes
876         surplus      blocksNBytes
877         short        blocksNBytes
878         unachievable blocksNBytes
879 }
880
881 type balancedBlockState struct {
882         desired      int
883         surplus      int
884         unachievable bool
885 }
886
887 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
888         var s balancerStats
889         s.replHistogram = make([]int, 2)
890         s.classStats = make(map[string]replicationStats, len(bal.classes))
891         for result := range results {
892                 surplus := result.have - result.want
893                 bytes := result.blkid.Size()
894
895                 if rc := int64(result.blk.RefCount); rc > 0 {
896                         s.collectionBytes += rc * bytes
897                         s.collectionBlockBytes += bytes
898                         s.collectionBlockRefs += rc
899                         s.collectionBlocks++
900                 }
901
902                 for class, state := range result.classState {
903                         cs := s.classStats[class]
904                         if state.unachievable {
905                                 cs.unachievable.blocks++
906                                 cs.unachievable.bytes += bytes
907                         }
908                         if state.desired > 0 {
909                                 cs.desired.replicas += state.desired
910                                 cs.desired.blocks++
911                                 cs.desired.bytes += bytes * int64(state.desired)
912                         }
913                         if state.surplus > 0 {
914                                 cs.surplus.replicas += state.surplus
915                                 cs.surplus.blocks++
916                                 cs.surplus.bytes += bytes * int64(state.surplus)
917                         } else if state.surplus < 0 {
918                                 cs.short.replicas += -state.surplus
919                                 cs.short.blocks++
920                                 cs.short.bytes += bytes * int64(-state.surplus)
921                         }
922                         s.classStats[class] = cs
923                 }
924
925                 switch {
926                 case result.have == 0 && result.want > 0:
927                         s.lost.replicas -= surplus
928                         s.lost.blocks++
929                         s.lost.bytes += bytes * int64(-surplus)
930                         fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
931                         for pdh := range result.blk.Refs {
932                                 fmt.Fprintf(bal.lostBlocks, " %s", pdh)
933                         }
934                         fmt.Fprint(bal.lostBlocks, "\n")
935                 case surplus < 0:
936                         s.underrep.replicas -= surplus
937                         s.underrep.blocks++
938                         s.underrep.bytes += bytes * int64(-surplus)
939                 case surplus > 0 && result.want == 0:
940                         counter := &s.garbage
941                         for _, r := range result.blk.Replicas {
942                                 if r.Mtime >= bal.MinMtime {
943                                         counter = &s.unref
944                                         break
945                                 }
946                         }
947                         counter.replicas += surplus
948                         counter.blocks++
949                         counter.bytes += bytes * int64(surplus)
950                 case surplus > 0:
951                         s.overrep.replicas += surplus
952                         s.overrep.blocks++
953                         s.overrep.bytes += bytes * int64(result.have-result.want)
954                 default:
955                         s.justright.replicas += result.want
956                         s.justright.blocks++
957                         s.justright.bytes += bytes * int64(result.want)
958                 }
959
960                 if result.want > 0 {
961                         s.desired.replicas += result.want
962                         s.desired.blocks++
963                         s.desired.bytes += bytes * int64(result.want)
964                 }
965                 if result.have > 0 {
966                         s.current.replicas += result.have
967                         s.current.blocks++
968                         s.current.bytes += bytes * int64(result.have)
969                 }
970
971                 for len(s.replHistogram) <= result.have {
972                         s.replHistogram = append(s.replHistogram, 0)
973                 }
974                 s.replHistogram[result.have]++
975         }
976         for _, srv := range bal.KeepServices {
977                 s.pulls += len(srv.ChangeSet.Pulls)
978                 s.trashes += len(srv.ChangeSet.Trashes)
979         }
980         bal.stats = s
981         bal.Metrics.UpdateStats(s)
982 }
983
984 // PrintStatistics writes statistics about the computed changes to
985 // bal.Logger. It should not be called until ComputeChangeSets has
986 // finished.
987 func (bal *Balancer) PrintStatistics() {
988         bal.logf("===")
989         bal.logf("%s lost (0=have<want)", bal.stats.lost)
990         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
991         bal.logf("%s just right (have=want)", bal.stats.justright)
992         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
993         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
994         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
995         for _, class := range bal.classes {
996                 cs := bal.stats.classStats[class]
997                 bal.logf("===")
998                 bal.logf("storage class %q: %s desired", class, cs.desired)
999                 bal.logf("storage class %q: %s short", class, cs.short)
1000                 bal.logf("storage class %q: %s surplus", class, cs.surplus)
1001                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
1002         }
1003         bal.logf("===")
1004         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
1005         bal.logf("%s total usage", bal.stats.current)
1006         bal.logf("===")
1007         for _, srv := range bal.KeepServices {
1008                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1009         }
1010         bal.logf("===")
1011         bal.printHistogram(60)
1012         bal.logf("===")
1013 }
1014
1015 func (bal *Balancer) printHistogram(hashColumns int) {
1016         bal.logf("Replication level distribution (counting N replicas on a single server as N):")
1017         maxCount := 0
1018         for _, count := range bal.stats.replHistogram {
1019                 if maxCount < count {
1020                         maxCount = count
1021                 }
1022         }
1023         hashes := strings.Repeat("#", hashColumns)
1024         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1025         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1026         for repl, count := range bal.stats.replHistogram {
1027                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1028                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1029         }
1030 }
1031
1032 // CheckSanityLate checks for configuration and runtime errors after
1033 // GetCurrentState() and ComputeChangeSets() have finished.
1034 //
1035 // If it returns an error, it is dangerous to run any Commit methods.
1036 func (bal *Balancer) CheckSanityLate() error {
1037         if bal.errors != nil {
1038                 for _, err := range bal.errors {
1039                         bal.logf("deferred error: %v", err)
1040                 }
1041                 return fmt.Errorf("cannot proceed safely after deferred errors")
1042         }
1043
1044         if bal.collScanned == 0 {
1045                 return fmt.Errorf("received zero collections")
1046         }
1047
1048         anyDesired := false
1049         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1050                 for _, desired := range blk.Desired {
1051                         if desired > 0 {
1052                                 anyDesired = true
1053                                 break
1054                         }
1055                 }
1056         })
1057         if !anyDesired {
1058                 return fmt.Errorf("zero blocks have desired replication>0")
1059         }
1060
1061         if dr := bal.DefaultReplication; dr < 1 {
1062                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1063         }
1064
1065         // TODO: no two services have identical indexes
1066         // TODO: no collisions (same md5, different size)
1067         return nil
1068 }
1069
1070 // CommitPulls sends the computed lists of pull requests to the
1071 // keepstore servers. This has the effect of increasing replication of
1072 // existing blocks that are either underreplicated or poorly
1073 // distributed according to rendezvous hashing.
1074 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
1075         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1076         return bal.commitAsync(c, "send pull list",
1077                 func(srv *KeepService) error {
1078                         return srv.CommitPulls(c)
1079                 })
1080 }
1081
1082 // CommitTrash sends the computed lists of trash requests to the
1083 // keepstore servers. This has the effect of deleting blocks that are
1084 // overreplicated or unreferenced.
1085 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
1086         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1087         return bal.commitAsync(c, "send trash list",
1088                 func(srv *KeepService) error {
1089                         return srv.CommitTrash(c)
1090                 })
1091 }
1092
1093 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1094         errs := make(chan error)
1095         for _, srv := range bal.KeepServices {
1096                 go func(srv *KeepService) {
1097                         var err error
1098                         defer func() { errs <- err }()
1099                         label := fmt.Sprintf("%s: %v", srv, label)
1100                         err = f(srv)
1101                         if err != nil {
1102                                 err = fmt.Errorf("%s: %v", label, err)
1103                         }
1104                 }(srv)
1105         }
1106         var lastErr error
1107         for range bal.KeepServices {
1108                 if err := <-errs; err != nil {
1109                         bal.logf("%v", err)
1110                         lastErr = err
1111                 }
1112         }
1113         close(errs)
1114         return lastErr
1115 }
1116
1117 func (bal *Balancer) logf(f string, args ...interface{}) {
1118         if bal.Logger != nil {
1119                 bal.Logger.Printf(f, args...)
1120         }
1121 }
1122
1123 func (bal *Balancer) time(name, help string) func() {
1124         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1125         t0 := time.Now()
1126         bal.Logger.Printf("%s: start", name)
1127         return func() {
1128                 dur := time.Since(t0)
1129                 observer.Observe(dur.Seconds())
1130                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1131         }
1132 }
1133
1134 // Rendezvous hash sort function. Less efficient than sorting on
1135 // precomputed rendezvous hashes, but also rarely used.
1136 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1137         a := md5.Sum([]byte(string(blkid[:32]) + i))
1138         b := md5.Sum([]byte(string(blkid[:32]) + j))
1139         return bytes.Compare(a[:], b[:]) < 0
1140 }