Merge branch '17574-storage-classes-confirmed' into main
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "log"
15         "math"
16         "os"
17         "runtime"
18         "sort"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "syscall"
23         "time"
24
25         "git.arvados.org/arvados.git/sdk/go/arvados"
26         "git.arvados.org/arvados.git/sdk/go/keepclient"
27         "github.com/jmoiron/sqlx"
28         "github.com/sirupsen/logrus"
29 )
30
31 // Balancer compares the contents of keepstore servers with the
32 // collections stored in Arvados, and issues pull/trash requests
33 // needed to get (closer to) the optimal data layout.
34 //
35 // In the optimal data layout: every data block referenced by a
36 // collection is replicated at least as many times as desired by the
37 // collection; there are no unreferenced data blocks older than
38 // BlobSignatureTTL; and all N existing replicas of a given data block
39 // are in the N best positions in rendezvous probe order.
40 type Balancer struct {
41         DB      *sqlx.DB
42         Logger  logrus.FieldLogger
43         Dumper  logrus.FieldLogger
44         Metrics *metrics
45
46         LostBlocksFile string
47
48         *BlockStateMap
49         KeepServices       map[string]*KeepService
50         DefaultReplication int
51         MinMtime           int64
52
53         classes       []string
54         mounts        int
55         mountsByClass map[string]map[*KeepMount]bool
56         collScanned   int64
57         serviceRoots  map[string]string
58         errors        []error
59         stats         balancerStats
60         mutex         sync.Mutex
61         lostBlocks    io.Writer
62 }
63
64 // Run performs a balance operation using the given config and
65 // runOptions, and returns RunOptions suitable for passing to a
66 // subsequent balance operation.
67 //
68 // Run should only be called once on a given Balancer object.
69 //
70 // Typical usage:
71 //
72 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
73 func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
74         nextRunOptions = runOptions
75
76         defer bal.time("sweep", "wall clock time to run one full sweep")()
77
78         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
79         defer cancel()
80
81         var lbFile *os.File
82         if bal.LostBlocksFile != "" {
83                 tmpfn := bal.LostBlocksFile + ".tmp"
84                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
85                 if err != nil {
86                         return
87                 }
88                 defer lbFile.Close()
89                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
90                 if err != nil {
91                         return
92                 }
93                 defer func() {
94                         // Remove the tempfile only if we didn't get
95                         // as far as successfully renaming it.
96                         if lbFile != nil {
97                                 os.Remove(tmpfn)
98                         }
99                 }()
100                 bal.lostBlocks = lbFile
101         } else {
102                 bal.lostBlocks = ioutil.Discard
103         }
104
105         err = bal.DiscoverKeepServices(client)
106         if err != nil {
107                 return
108         }
109
110         for _, srv := range bal.KeepServices {
111                 err = srv.discoverMounts(client)
112                 if err != nil {
113                         return
114                 }
115         }
116         bal.cleanupMounts()
117
118         if err = bal.CheckSanityEarly(client); err != nil {
119                 return
120         }
121
122         // On a big site, indexing and sending trash/pull lists can
123         // take much longer than the usual 5 minute client
124         // timeout. From here on, we rely on the context deadline
125         // instead, aborting the entire operation if any part takes
126         // too long.
127         client.Timeout = 0
128
129         rs := bal.rendezvousState()
130         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
131                 if runOptions.SafeRendezvousState != "" {
132                         bal.logf("notice: KeepServices list has changed since last run")
133                 }
134                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
135                 if err = bal.ClearTrashLists(ctx, client); err != nil {
136                         return
137                 }
138                 // The current rendezvous state becomes "safe" (i.e.,
139                 // OK to compute changes for that state without
140                 // clearing existing trash lists) only now, after we
141                 // succeed in clearing existing trash lists.
142                 nextRunOptions.SafeRendezvousState = rs
143         }
144
145         if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
146                 return
147         }
148         bal.ComputeChangeSets()
149         bal.PrintStatistics()
150         if err = bal.CheckSanityLate(); err != nil {
151                 return
152         }
153         if lbFile != nil {
154                 err = lbFile.Sync()
155                 if err != nil {
156                         return
157                 }
158                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
159                 if err != nil {
160                         return
161                 }
162                 lbFile = nil
163         }
164         if runOptions.CommitPulls {
165                 err = bal.CommitPulls(ctx, client)
166                 if err != nil {
167                         // Skip trash if we can't pull. (Too cautious?)
168                         return
169                 }
170         }
171         if runOptions.CommitTrash {
172                 err = bal.CommitTrash(ctx, client)
173                 if err != nil {
174                         return
175                 }
176         }
177         if runOptions.CommitConfirmedFields {
178                 err = bal.updateCollections(ctx, client, cluster)
179                 if err != nil {
180                         return
181                 }
182         }
183         return
184 }
185
186 // SetKeepServices sets the list of KeepServices to operate on.
187 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
188         bal.KeepServices = make(map[string]*KeepService)
189         for _, srv := range srvList.Items {
190                 bal.KeepServices[srv.UUID] = &KeepService{
191                         KeepService: srv,
192                         ChangeSet:   &ChangeSet{},
193                 }
194         }
195         return nil
196 }
197
198 // DiscoverKeepServices sets the list of KeepServices by calling the
199 // API to get a list of all services, and selecting the ones whose
200 // ServiceType is "disk"
201 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
202         bal.KeepServices = make(map[string]*KeepService)
203         return c.EachKeepService(func(srv arvados.KeepService) error {
204                 if srv.ServiceType == "disk" {
205                         bal.KeepServices[srv.UUID] = &KeepService{
206                                 KeepService: srv,
207                                 ChangeSet:   &ChangeSet{},
208                         }
209                 } else {
210                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
211                 }
212                 return nil
213         })
214 }
215
216 func (bal *Balancer) cleanupMounts() {
217         rwdev := map[string]*KeepService{}
218         for _, srv := range bal.KeepServices {
219                 for _, mnt := range srv.mounts {
220                         if !mnt.ReadOnly && mnt.DeviceID != "" {
221                                 rwdev[mnt.DeviceID] = srv
222                         }
223                 }
224         }
225         // Drop the readonly mounts whose device is mounted RW
226         // elsewhere.
227         for _, srv := range bal.KeepServices {
228                 var dedup []*KeepMount
229                 for _, mnt := range srv.mounts {
230                         if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
231                                 bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
232                         } else {
233                                 dedup = append(dedup, mnt)
234                         }
235                 }
236                 srv.mounts = dedup
237         }
238         for _, srv := range bal.KeepServices {
239                 for _, mnt := range srv.mounts {
240                         if mnt.Replication <= 0 {
241                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
242                                 mnt.Replication = 1
243                         }
244                 }
245         }
246 }
247
248 // CheckSanityEarly checks for configuration and runtime errors that
249 // can be detected before GetCurrentState() and ComputeChangeSets()
250 // are called.
251 //
252 // If it returns an error, it is pointless to run GetCurrentState or
253 // ComputeChangeSets: after doing so, the statistics would be
254 // meaningless and it would be dangerous to run any Commit methods.
255 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
256         u, err := c.CurrentUser()
257         if err != nil {
258                 return fmt.Errorf("CurrentUser(): %v", err)
259         }
260         if !u.IsActive || !u.IsAdmin {
261                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
262         }
263         for _, srv := range bal.KeepServices {
264                 if srv.ServiceType == "proxy" {
265                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
266                 }
267         }
268
269         var checkPage arvados.CollectionList
270         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
271                 Limit:              new(int),
272                 Count:              "exact",
273                 IncludeTrash:       true,
274                 IncludeOldVersions: true,
275                 Filters: []arvados.Filter{{
276                         Attr:     "modified_at",
277                         Operator: "=",
278                         Operand:  nil,
279                 }},
280         }); err != nil {
281                 return err
282         } else if n := checkPage.ItemsAvailable; n > 0 {
283                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
284         }
285
286         return nil
287 }
288
289 // rendezvousState returns a fingerprint (e.g., a sorted list of
290 // UUID+host+port) of the current set of keep services.
291 func (bal *Balancer) rendezvousState() string {
292         srvs := make([]string, 0, len(bal.KeepServices))
293         for _, srv := range bal.KeepServices {
294                 srvs = append(srvs, srv.String())
295         }
296         sort.Strings(srvs)
297         return strings.Join(srvs, "; ")
298 }
299
300 // ClearTrashLists sends an empty trash list to each keep
301 // service. Calling this before GetCurrentState avoids races.
302 //
303 // When a block appears in an index, we assume that replica will still
304 // exist after we delete other replicas on other servers. However,
305 // it's possible that a previous rebalancing operation made different
306 // decisions (e.g., servers were added/removed, and rendezvous order
307 // changed). In this case, the replica might already be on that
308 // server's trash list, and it might be deleted before we send a
309 // replacement trash list.
310 //
311 // We avoid this problem if we clear all trash lists before getting
312 // indexes. (We also assume there is only one rebalancing process
313 // running at a time.)
314 func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
315         for _, srv := range bal.KeepServices {
316                 srv.ChangeSet = &ChangeSet{}
317         }
318         return bal.CommitTrash(ctx, c)
319 }
320
321 // GetCurrentState determines the current replication state, and the
322 // desired replication level, for every block that is either
323 // retrievable or referenced.
324 //
325 // It determines the current replication state by reading the block index
326 // from every known Keep service.
327 //
328 // It determines the desired replication level by retrieving all
329 // collection manifests in the database (API server).
330 //
331 // It encodes the resulting information in BlockStateMap.
332 func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
333         ctx, cancel := context.WithCancel(ctx)
334         defer cancel()
335
336         defer bal.time("get_state", "wall clock time to get current state")()
337         bal.BlockStateMap = NewBlockStateMap()
338
339         dd, err := c.DiscoveryDocument()
340         if err != nil {
341                 return err
342         }
343         bal.DefaultReplication = dd.DefaultCollectionReplication
344         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
345
346         errs := make(chan error, 1)
347         wg := sync.WaitGroup{}
348
349         // When a device is mounted more than once, we will get its
350         // index only once, and call AddReplicas on all of the mounts.
351         // equivMount keys are the mounts that will be indexed, and
352         // each value is a list of mounts to apply the received index
353         // to.
354         equivMount := map[*KeepMount][]*KeepMount{}
355         // deviceMount maps each device ID to the one mount that will
356         // be indexed for that device.
357         deviceMount := map[string]*KeepMount{}
358         for _, srv := range bal.KeepServices {
359                 for _, mnt := range srv.mounts {
360                         equiv := deviceMount[mnt.DeviceID]
361                         if equiv == nil {
362                                 equiv = mnt
363                                 if mnt.DeviceID != "" {
364                                         deviceMount[mnt.DeviceID] = equiv
365                                 }
366                         }
367                         equivMount[equiv] = append(equivMount[equiv], mnt)
368                 }
369         }
370
371         // Start one goroutine for each (non-redundant) mount:
372         // retrieve the index, and add the returned blocks to
373         // BlockStateMap.
374         for _, mounts := range equivMount {
375                 wg.Add(1)
376                 go func(mounts []*KeepMount) {
377                         defer wg.Done()
378                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
379                         idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
380                         if err != nil {
381                                 select {
382                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
383                                 default:
384                                 }
385                                 cancel()
386                                 return
387                         }
388                         if len(errs) > 0 {
389                                 // Some other goroutine encountered an
390                                 // error -- any further effort here
391                                 // will be wasted.
392                                 return
393                         }
394                         for _, mount := range mounts {
395                                 bal.logf("%s: add %d entries to map", mount, len(idx))
396                                 bal.BlockStateMap.AddReplicas(mount, idx)
397                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
398                         }
399                         bal.logf("mount %s: index done", mounts[0])
400                 }(mounts)
401         }
402
403         collQ := make(chan arvados.Collection, bufs)
404
405         // Retrieve all collections from the database and send them to
406         // collQ.
407         wg.Add(1)
408         go func() {
409                 defer wg.Done()
410                 err = EachCollection(ctx, bal.DB, c,
411                         func(coll arvados.Collection) error {
412                                 collQ <- coll
413                                 if len(errs) > 0 {
414                                         // some other GetCurrentState
415                                         // error happened: no point
416                                         // getting any more
417                                         // collections.
418                                         return fmt.Errorf("")
419                                 }
420                                 return nil
421                         }, func(done, total int) {
422                                 bal.logf("collections: %d/%d", done, total)
423                         })
424                 close(collQ)
425                 if err != nil {
426                         select {
427                         case errs <- err:
428                         default:
429                         }
430                         cancel()
431                 }
432         }()
433
434         // Parse manifests from collQ and pass the block hashes to
435         // BlockStateMap to track desired replication.
436         for i := 0; i < runtime.NumCPU(); i++ {
437                 wg.Add(1)
438                 go func() {
439                         defer wg.Done()
440                         for coll := range collQ {
441                                 err := bal.addCollection(coll)
442                                 if err != nil || len(errs) > 0 {
443                                         select {
444                                         case errs <- err:
445                                         default:
446                                         }
447                                         cancel()
448                                         continue
449                                 }
450                                 atomic.AddInt64(&bal.collScanned, 1)
451                         }
452                 }()
453         }
454
455         wg.Wait()
456         if len(errs) > 0 {
457                 return <-errs
458         }
459         return nil
460 }
461
462 func (bal *Balancer) addCollection(coll arvados.Collection) error {
463         blkids, err := coll.SizedDigests()
464         if err != nil {
465                 return fmt.Errorf("%v: %v", coll.UUID, err)
466         }
467         repl := bal.DefaultReplication
468         if coll.ReplicationDesired != nil {
469                 repl = *coll.ReplicationDesired
470         }
471         bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
472         // Pass pdh to IncreaseDesired only if LostBlocksFile is being
473         // written -- otherwise it's just a waste of memory.
474         pdh := ""
475         if bal.LostBlocksFile != "" {
476                 pdh = coll.PortableDataHash
477         }
478         bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
479         return nil
480 }
481
482 // ComputeChangeSets compares, for each known block, the current and
483 // desired replication states. If it is possible to get closer to the
484 // desired state by copying or deleting blocks, it adds those changes
485 // to the relevant KeepServices' ChangeSets.
486 //
487 // It does not actually apply any of the computed changes.
488 func (bal *Balancer) ComputeChangeSets() {
489         // This just calls balanceBlock() once for each block, using a
490         // pool of worker goroutines.
491         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
492         bal.setupLookupTables()
493
494         type balanceTask struct {
495                 blkid arvados.SizedDigest
496                 blk   *BlockState
497         }
498         workers := runtime.GOMAXPROCS(-1)
499         todo := make(chan balanceTask, workers)
500         go func() {
501                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
502                         todo <- balanceTask{
503                                 blkid: blkid,
504                                 blk:   blk,
505                         }
506                 })
507                 close(todo)
508         }()
509         results := make(chan balanceResult, workers)
510         go func() {
511                 var wg sync.WaitGroup
512                 for i := 0; i < workers; i++ {
513                         wg.Add(1)
514                         go func() {
515                                 for work := range todo {
516                                         results <- bal.balanceBlock(work.blkid, work.blk)
517                                 }
518                                 wg.Done()
519                         }()
520                 }
521                 wg.Wait()
522                 close(results)
523         }()
524         bal.collectStatistics(results)
525 }
526
527 func (bal *Balancer) setupLookupTables() {
528         bal.serviceRoots = make(map[string]string)
529         bal.classes = defaultClasses
530         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
531         bal.mounts = 0
532         for _, srv := range bal.KeepServices {
533                 bal.serviceRoots[srv.UUID] = srv.UUID
534                 for _, mnt := range srv.mounts {
535                         bal.mounts++
536
537                         // All mounts on a read-only service are
538                         // effectively read-only.
539                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
540
541                         if len(mnt.StorageClasses) == 0 {
542                                 bal.mountsByClass["default"][mnt] = true
543                                 continue
544                         }
545                         for class := range mnt.StorageClasses {
546                                 if mbc := bal.mountsByClass[class]; mbc == nil {
547                                         bal.classes = append(bal.classes, class)
548                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
549                                 } else {
550                                         mbc[mnt] = true
551                                 }
552                         }
553                 }
554         }
555         // Consider classes in lexicographic order to avoid flapping
556         // between balancing runs.  The outcome of the "prefer a mount
557         // we're already planning to use for a different storage
558         // class" case in balanceBlock depends on the order classes
559         // are considered.
560         sort.Strings(bal.classes)
561 }
562
563 const (
564         changeStay = iota
565         changePull
566         changeTrash
567         changeNone
568 )
569
570 var changeName = map[int]string{
571         changeStay:  "stay",
572         changePull:  "pull",
573         changeTrash: "trash",
574         changeNone:  "none",
575 }
576
577 type balancedBlockState struct {
578         needed       int
579         unneeded     int
580         pulling      int
581         unachievable bool
582 }
583
584 type balanceResult struct {
585         blk        *BlockState
586         blkid      arvados.SizedDigest
587         lost       bool
588         blockState balancedBlockState
589         classState map[string]balancedBlockState
590 }
591
592 type slot struct {
593         mnt  *KeepMount // never nil
594         repl *Replica   // replica already stored here (or nil)
595         want bool       // we should pull/leave a replica here
596 }
597
598 // balanceBlock compares current state to desired state for a single
599 // block, and makes the appropriate ChangeSet calls.
600 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
601         bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
602
603         // Build a list of all slots (one per mounted volume).
604         slots := make([]slot, 0, bal.mounts)
605         for _, srv := range bal.KeepServices {
606                 for _, mnt := range srv.mounts {
607                         var repl *Replica
608                         for r := range blk.Replicas {
609                                 if blk.Replicas[r].KeepMount == mnt {
610                                         repl = &blk.Replicas[r]
611                                 }
612                         }
613                         // Initial value of "want" is "have, and can't
614                         // delete". These untrashable replicas get
615                         // prioritized when sorting slots: otherwise,
616                         // non-optimal readonly copies would cause us
617                         // to overreplicate.
618                         slots = append(slots, slot{
619                                 mnt:  mnt,
620                                 repl: repl,
621                                 want: repl != nil && mnt.ReadOnly,
622                         })
623                 }
624         }
625
626         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
627         srvRendezvous := make(map[*KeepService]int, len(uuids))
628         for i, uuid := range uuids {
629                 srv := bal.KeepServices[uuid]
630                 srvRendezvous[srv] = i
631         }
632
633         // Below we set underreplicated=true if we find any storage
634         // class that's currently underreplicated -- in that case we
635         // won't want to trash any replicas.
636         underreplicated := false
637
638         unsafeToDelete := make(map[int64]bool, len(slots))
639         for _, class := range bal.classes {
640                 desired := blk.Desired[class]
641                 if desired == 0 {
642                         continue
643                 }
644
645                 // Sort the slots by desirability.
646                 sort.Slice(slots, func(i, j int) bool {
647                         si, sj := slots[i], slots[j]
648                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
649                                 // Prefer a mount that satisfies the
650                                 // desired class.
651                                 return bal.mountsByClass[class][si.mnt]
652                         } else if si.want != sj.want {
653                                 // Prefer a mount that will have a
654                                 // replica no matter what we do here
655                                 // -- either because it already has an
656                                 // untrashable replica, or because we
657                                 // already need it to satisfy a
658                                 // different storage class.
659                                 return si.want
660                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
661                                 // Prefer a better rendezvous
662                                 // position.
663                                 return orderi < orderj
664                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
665                                 // Prefer a mount that already has a
666                                 // replica.
667                                 return repli
668                         } else {
669                                 // If pull/trash turns out to be
670                                 // needed, distribute the
671                                 // new/remaining replicas uniformly
672                                 // across qualifying mounts on a given
673                                 // server.
674                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
675                         }
676                 })
677
678                 // Servers/mounts/devices (with or without existing
679                 // replicas) that are part of the best achievable
680                 // layout for this storage class.
681                 wantSrv := map[*KeepService]bool{}
682                 wantMnt := map[*KeepMount]bool{}
683                 wantDev := map[string]bool{}
684                 // Positions (with existing replicas) that have been
685                 // protected (via unsafeToDelete) to ensure we don't
686                 // reduce replication below desired level when
687                 // trashing replicas that aren't optimal positions for
688                 // any storage class.
689                 protMnt := map[*KeepMount]bool{}
690                 // Replication planned so far (corresponds to wantMnt).
691                 replWant := 0
692                 // Protected replication (corresponds to protMnt).
693                 replProt := 0
694
695                 // trySlot tries using a slot to meet requirements,
696                 // and returns true if all requirements are met.
697                 trySlot := func(i int) bool {
698                         slot := slots[i]
699                         if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
700                                 // Already allocated a replica to this
701                                 // backend device, possibly on a
702                                 // different server.
703                                 return false
704                         }
705                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
706                                 unsafeToDelete[slot.repl.Mtime] = true
707                                 protMnt[slot.mnt] = true
708                                 replProt += slot.mnt.Replication
709                         }
710                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
711                                 slots[i].want = true
712                                 wantSrv[slot.mnt.KeepService] = true
713                                 wantMnt[slot.mnt] = true
714                                 if slot.mnt.DeviceID != "" {
715                                         wantDev[slot.mnt.DeviceID] = true
716                                 }
717                                 replWant += slot.mnt.Replication
718                         }
719                         return replProt >= desired && replWant >= desired
720                 }
721
722                 // First try to achieve desired replication without
723                 // using the same server twice.
724                 done := false
725                 for i := 0; i < len(slots) && !done; i++ {
726                         if !wantSrv[slots[i].mnt.KeepService] {
727                                 done = trySlot(i)
728                         }
729                 }
730
731                 // If that didn't suffice, do another pass without the
732                 // "distinct services" restriction. (Achieving the
733                 // desired volume replication on fewer than the
734                 // desired number of services is better than
735                 // underreplicating.)
736                 for i := 0; i < len(slots) && !done; i++ {
737                         done = trySlot(i)
738                 }
739
740                 if !underreplicated {
741                         safe := 0
742                         for _, slot := range slots {
743                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
744                                         continue
745                                 }
746                                 if safe += slot.mnt.Replication; safe >= desired {
747                                         break
748                                 }
749                         }
750                         underreplicated = safe < desired
751                 }
752
753                 // Avoid deleting wanted replicas from devices that
754                 // are mounted on multiple servers -- even if they
755                 // haven't already been added to unsafeToDelete
756                 // because the servers report different Mtimes.
757                 for _, slot := range slots {
758                         if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
759                                 unsafeToDelete[slot.repl.Mtime] = true
760                         }
761                 }
762         }
763
764         // TODO: If multiple replicas are trashable, prefer the oldest
765         // replica that doesn't have a timestamp collision with
766         // others.
767
768         for i, slot := range slots {
769                 // Don't trash (1) any replicas of an underreplicated
770                 // block, even if they're in the wrong positions, or
771                 // (2) any replicas whose Mtimes are identical to
772                 // needed replicas (in case we're really seeing the
773                 // same copy via different mounts).
774                 if slot.repl != nil && (underreplicated || unsafeToDelete[slot.repl.Mtime]) {
775                         slots[i].want = true
776                 }
777         }
778
779         classState := make(map[string]balancedBlockState, len(bal.classes))
780         for _, class := range bal.classes {
781                 classState[class] = computeBlockState(slots, bal.mountsByClass[class], len(blk.Replicas), blk.Desired[class])
782         }
783         blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
784
785         var lost bool
786         var changes []string
787         for _, slot := range slots {
788                 // TODO: request a Touch if Mtime is duplicated.
789                 var change int
790                 switch {
791                 case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
792                         slot.mnt.KeepService.AddTrash(Trash{
793                                 SizedDigest: blkid,
794                                 Mtime:       slot.repl.Mtime,
795                                 From:        slot.mnt,
796                         })
797                         change = changeTrash
798                 case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
799                         lost = true
800                         change = changeNone
801                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
802                         slot.mnt.KeepService.AddPull(Pull{
803                                 SizedDigest: blkid,
804                                 From:        blk.Replicas[0].KeepMount.KeepService,
805                                 To:          slot.mnt,
806                         })
807                         change = changePull
808                 case slot.repl != nil:
809                         change = changeStay
810                 default:
811                         change = changeNone
812                 }
813                 if bal.Dumper != nil {
814                         var mtime int64
815                         if slot.repl != nil {
816                                 mtime = slot.repl.Mtime
817                         }
818                         srv := slot.mnt.KeepService
819                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
820                 }
821         }
822         if bal.Dumper != nil {
823                 bal.Dumper.Printf("%s refs=%d needed=%d unneeded=%d pulling=%v %v %v", blkid, blk.RefCount, blockState.needed, blockState.unneeded, blockState.pulling, blk.Desired, changes)
824         }
825         return balanceResult{
826                 blk:        blk,
827                 blkid:      blkid,
828                 lost:       lost,
829                 blockState: blockState,
830                 classState: classState,
831         }
832 }
833
834 func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRepl int) (bbs balancedBlockState) {
835         repl := 0
836         countedDev := map[string]bool{}
837         for _, slot := range slots {
838                 if onlyCount != nil && !onlyCount[slot.mnt] {
839                         continue
840                 }
841                 if countedDev[slot.mnt.DeviceID] {
842                         continue
843                 }
844                 switch {
845                 case slot.repl != nil && slot.want:
846                         bbs.needed++
847                         repl += slot.mnt.Replication
848                 case slot.repl != nil && !slot.want:
849                         bbs.unneeded++
850                         repl += slot.mnt.Replication
851                 case slot.repl == nil && slot.want && have > 0:
852                         bbs.pulling++
853                         repl += slot.mnt.Replication
854                 }
855                 if slot.mnt.DeviceID != "" {
856                         countedDev[slot.mnt.DeviceID] = true
857                 }
858         }
859         if repl < needRepl {
860                 bbs.unachievable = true
861         }
862         return
863 }
864
865 type blocksNBytes struct {
866         replicas int
867         blocks   int
868         bytes    int64
869 }
870
871 func (bb blocksNBytes) String() string {
872         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
873 }
874
875 type replicationStats struct {
876         needed       blocksNBytes
877         unneeded     blocksNBytes
878         pulling      blocksNBytes
879         unachievable blocksNBytes
880 }
881
882 type balancerStats struct {
883         lost          blocksNBytes
884         overrep       blocksNBytes
885         unref         blocksNBytes
886         garbage       blocksNBytes
887         underrep      blocksNBytes
888         unachievable  blocksNBytes
889         justright     blocksNBytes
890         desired       blocksNBytes
891         current       blocksNBytes
892         pulls         int
893         trashes       int
894         replHistogram []int
895         classStats    map[string]replicationStats
896
897         // collectionBytes / collectionBlockBytes = deduplication ratio
898         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
899         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
900         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
901         collectionBlocks     int64 // number of blocks referenced by any collection
902 }
903
904 func (s *balancerStats) dedupByteRatio() float64 {
905         if s.collectionBlockBytes == 0 {
906                 return 0
907         }
908         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
909 }
910
911 func (s *balancerStats) dedupBlockRatio() float64 {
912         if s.collectionBlocks == 0 {
913                 return 0
914         }
915         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
916 }
917
918 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
919         var s balancerStats
920         s.replHistogram = make([]int, 2)
921         s.classStats = make(map[string]replicationStats, len(bal.classes))
922         for result := range results {
923                 bytes := result.blkid.Size()
924
925                 if rc := int64(result.blk.RefCount); rc > 0 {
926                         s.collectionBytes += rc * bytes
927                         s.collectionBlockBytes += bytes
928                         s.collectionBlockRefs += rc
929                         s.collectionBlocks++
930                 }
931
932                 for class, state := range result.classState {
933                         cs := s.classStats[class]
934                         if state.unachievable {
935                                 cs.unachievable.replicas++
936                                 cs.unachievable.blocks++
937                                 cs.unachievable.bytes += bytes
938                         }
939                         if state.needed > 0 {
940                                 cs.needed.replicas += state.needed
941                                 cs.needed.blocks++
942                                 cs.needed.bytes += bytes * int64(state.needed)
943                         }
944                         if state.unneeded > 0 {
945                                 cs.unneeded.replicas += state.unneeded
946                                 cs.unneeded.blocks++
947                                 cs.unneeded.bytes += bytes * int64(state.unneeded)
948                         }
949                         if state.pulling > 0 {
950                                 cs.pulling.replicas += state.pulling
951                                 cs.pulling.blocks++
952                                 cs.pulling.bytes += bytes * int64(state.pulling)
953                         }
954                         s.classStats[class] = cs
955                 }
956
957                 bs := result.blockState
958                 switch {
959                 case result.lost:
960                         s.lost.replicas++
961                         s.lost.blocks++
962                         s.lost.bytes += bytes
963                         fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
964                         for pdh := range result.blk.Refs {
965                                 fmt.Fprintf(bal.lostBlocks, " %s", pdh)
966                         }
967                         fmt.Fprint(bal.lostBlocks, "\n")
968                 case bs.pulling > 0:
969                         s.underrep.replicas += bs.pulling
970                         s.underrep.blocks++
971                         s.underrep.bytes += bytes * int64(bs.pulling)
972                 case bs.unachievable:
973                         s.underrep.replicas++
974                         s.underrep.blocks++
975                         s.underrep.bytes += bytes
976                 case bs.unneeded > 0 && bs.needed == 0:
977                         // Count as "garbage" if all replicas are old
978                         // enough to trash, otherwise count as
979                         // "unref".
980                         counter := &s.garbage
981                         for _, r := range result.blk.Replicas {
982                                 if r.Mtime >= bal.MinMtime {
983                                         counter = &s.unref
984                                         break
985                                 }
986                         }
987                         counter.replicas += bs.unneeded
988                         counter.blocks++
989                         counter.bytes += bytes * int64(bs.unneeded)
990                 case bs.unneeded > 0:
991                         s.overrep.replicas += bs.unneeded
992                         s.overrep.blocks++
993                         s.overrep.bytes += bytes * int64(bs.unneeded)
994                 default:
995                         s.justright.replicas += bs.needed
996                         s.justright.blocks++
997                         s.justright.bytes += bytes * int64(bs.needed)
998                 }
999
1000                 if bs.needed > 0 {
1001                         s.desired.replicas += bs.needed
1002                         s.desired.blocks++
1003                         s.desired.bytes += bytes * int64(bs.needed)
1004                 }
1005                 if bs.needed+bs.unneeded > 0 {
1006                         s.current.replicas += bs.needed + bs.unneeded
1007                         s.current.blocks++
1008                         s.current.bytes += bytes * int64(bs.needed+bs.unneeded)
1009                 }
1010
1011                 for len(s.replHistogram) <= bs.needed+bs.unneeded {
1012                         s.replHistogram = append(s.replHistogram, 0)
1013                 }
1014                 s.replHistogram[bs.needed+bs.unneeded]++
1015         }
1016         for _, srv := range bal.KeepServices {
1017                 s.pulls += len(srv.ChangeSet.Pulls)
1018                 s.trashes += len(srv.ChangeSet.Trashes)
1019         }
1020         bal.stats = s
1021         bal.Metrics.UpdateStats(s)
1022 }
1023
1024 // PrintStatistics writes statistics about the computed changes to
1025 // bal.Logger. It should not be called until ComputeChangeSets has
1026 // finished.
1027 func (bal *Balancer) PrintStatistics() {
1028         bal.logf("===")
1029         bal.logf("%s lost (0=have<want)", bal.stats.lost)
1030         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
1031         bal.logf("%s just right (have=want)", bal.stats.justright)
1032         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
1033         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
1034         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
1035         for _, class := range bal.classes {
1036                 cs := bal.stats.classStats[class]
1037                 bal.logf("===")
1038                 bal.logf("storage class %q: %s needed", class, cs.needed)
1039                 bal.logf("storage class %q: %s unneeded", class, cs.unneeded)
1040                 bal.logf("storage class %q: %s pulling", class, cs.pulling)
1041                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
1042         }
1043         bal.logf("===")
1044         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
1045         bal.logf("%s total usage", bal.stats.current)
1046         bal.logf("===")
1047         for _, srv := range bal.KeepServices {
1048                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1049         }
1050         bal.logf("===")
1051         bal.printHistogram(60)
1052         bal.logf("===")
1053 }
1054
1055 func (bal *Balancer) printHistogram(hashColumns int) {
1056         bal.logf("Replication level distribution:")
1057         maxCount := 0
1058         for _, count := range bal.stats.replHistogram {
1059                 if maxCount < count {
1060                         maxCount = count
1061                 }
1062         }
1063         hashes := strings.Repeat("#", hashColumns)
1064         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1065         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1066         for repl, count := range bal.stats.replHistogram {
1067                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1068                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1069         }
1070 }
1071
1072 // CheckSanityLate checks for configuration and runtime errors after
1073 // GetCurrentState() and ComputeChangeSets() have finished.
1074 //
1075 // If it returns an error, it is dangerous to run any Commit methods.
1076 func (bal *Balancer) CheckSanityLate() error {
1077         if bal.errors != nil {
1078                 for _, err := range bal.errors {
1079                         bal.logf("deferred error: %v", err)
1080                 }
1081                 return fmt.Errorf("cannot proceed safely after deferred errors")
1082         }
1083
1084         if bal.collScanned == 0 {
1085                 return fmt.Errorf("received zero collections")
1086         }
1087
1088         anyDesired := false
1089         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1090                 for _, desired := range blk.Desired {
1091                         if desired > 0 {
1092                                 anyDesired = true
1093                                 break
1094                         }
1095                 }
1096         })
1097         if !anyDesired {
1098                 return fmt.Errorf("zero blocks have desired replication>0")
1099         }
1100
1101         if dr := bal.DefaultReplication; dr < 1 {
1102                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1103         }
1104
1105         // TODO: no two services have identical indexes
1106         // TODO: no collisions (same md5, different size)
1107         return nil
1108 }
1109
1110 // CommitPulls sends the computed lists of pull requests to the
1111 // keepstore servers. This has the effect of increasing replication of
1112 // existing blocks that are either underreplicated or poorly
1113 // distributed according to rendezvous hashing.
1114 func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
1115         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1116         return bal.commitAsync(c, "send pull list",
1117                 func(srv *KeepService) error {
1118                         return srv.CommitPulls(ctx, c)
1119                 })
1120 }
1121
1122 // CommitTrash sends the computed lists of trash requests to the
1123 // keepstore servers. This has the effect of deleting blocks that are
1124 // overreplicated or unreferenced.
1125 func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
1126         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1127         return bal.commitAsync(c, "send trash list",
1128                 func(srv *KeepService) error {
1129                         return srv.CommitTrash(ctx, c)
1130                 })
1131 }
1132
1133 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1134         errs := make(chan error)
1135         for _, srv := range bal.KeepServices {
1136                 go func(srv *KeepService) {
1137                         var err error
1138                         defer func() { errs <- err }()
1139                         label := fmt.Sprintf("%s: %v", srv, label)
1140                         err = f(srv)
1141                         if err != nil {
1142                                 err = fmt.Errorf("%s: %v", label, err)
1143                         }
1144                 }(srv)
1145         }
1146         var lastErr error
1147         for range bal.KeepServices {
1148                 if err := <-errs; err != nil {
1149                         bal.logf("%v", err)
1150                         lastErr = err
1151                 }
1152         }
1153         close(errs)
1154         return lastErr
1155 }
1156
1157 func (bal *Balancer) logf(f string, args ...interface{}) {
1158         if bal.Logger != nil {
1159                 bal.Logger.Printf(f, args...)
1160         }
1161 }
1162
1163 func (bal *Balancer) time(name, help string) func() {
1164         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1165         t0 := time.Now()
1166         bal.Logger.Printf("%s: start", name)
1167         return func() {
1168                 dur := time.Since(t0)
1169                 observer.Observe(dur.Seconds())
1170                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1171         }
1172 }
1173
1174 // Rendezvous hash sort function. Less efficient than sorting on
1175 // precomputed rendezvous hashes, but also rarely used.
1176 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1177         a := md5.Sum([]byte(string(blkid[:32]) + i))
1178         b := md5.Sum([]byte(string(blkid[:32]) + j))
1179         return bytes.Compare(a[:], b[:]) < 0
1180 }