Merge branch '19414-keep-balance-panic'
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "log"
16         "math"
17         "os"
18         "runtime"
19         "sort"
20         "strings"
21         "sync"
22         "sync/atomic"
23         "syscall"
24         "time"
25
26         "git.arvados.org/arvados.git/sdk/go/arvados"
27         "git.arvados.org/arvados.git/sdk/go/keepclient"
28         "github.com/jmoiron/sqlx"
29         "github.com/sirupsen/logrus"
30 )
31
32 // Balancer compares the contents of keepstore servers with the
33 // collections stored in Arvados, and issues pull/trash requests
34 // needed to get (closer to) the optimal data layout.
35 //
36 // In the optimal data layout: every data block referenced by a
37 // collection is replicated at least as many times as desired by the
38 // collection; there are no unreferenced data blocks older than
39 // BlobSignatureTTL; and all N existing replicas of a given data block
40 // are in the N best positions in rendezvous probe order.
41 type Balancer struct {
42         DB      *sqlx.DB
43         Logger  logrus.FieldLogger
44         Dumper  logrus.FieldLogger
45         Metrics *metrics
46
47         LostBlocksFile string
48
49         *BlockStateMap
50         KeepServices       map[string]*KeepService
51         DefaultReplication int
52         MinMtime           int64
53
54         classes       []string
55         mounts        int
56         mountsByClass map[string]map[*KeepMount]bool
57         collScanned   int64
58         serviceRoots  map[string]string
59         errors        []error
60         stats         balancerStats
61         mutex         sync.Mutex
62         lostBlocks    io.Writer
63 }
64
65 // Run performs a balance operation using the given config and
66 // runOptions, and returns RunOptions suitable for passing to a
67 // subsequent balance operation.
68 //
69 // Run should only be called once on a given Balancer object.
70 //
71 // Typical usage:
72 //
73 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
74 func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
75         nextRunOptions = runOptions
76
77         defer bal.time("sweep", "wall clock time to run one full sweep")()
78
79         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
80         defer cancel()
81
82         var lbFile *os.File
83         if bal.LostBlocksFile != "" {
84                 tmpfn := bal.LostBlocksFile + ".tmp"
85                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
86                 if err != nil {
87                         return
88                 }
89                 defer lbFile.Close()
90                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
91                 if err != nil {
92                         return
93                 }
94                 defer func() {
95                         // Remove the tempfile only if we didn't get
96                         // as far as successfully renaming it.
97                         if lbFile != nil {
98                                 os.Remove(tmpfn)
99                         }
100                 }()
101                 bal.lostBlocks = lbFile
102         } else {
103                 bal.lostBlocks = ioutil.Discard
104         }
105
106         err = bal.DiscoverKeepServices(client)
107         if err != nil {
108                 return
109         }
110
111         for _, srv := range bal.KeepServices {
112                 err = srv.discoverMounts(client)
113                 if err != nil {
114                         return
115                 }
116         }
117         bal.cleanupMounts()
118
119         if err = bal.CheckSanityEarly(client); err != nil {
120                 return
121         }
122
123         // On a big site, indexing and sending trash/pull lists can
124         // take much longer than the usual 5 minute client
125         // timeout. From here on, we rely on the context deadline
126         // instead, aborting the entire operation if any part takes
127         // too long.
128         client.Timeout = 0
129
130         rs := bal.rendezvousState()
131         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
132                 if runOptions.SafeRendezvousState != "" {
133                         bal.logf("notice: KeepServices list has changed since last run")
134                 }
135                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
136                 if err = bal.ClearTrashLists(ctx, client); err != nil {
137                         return
138                 }
139                 // The current rendezvous state becomes "safe" (i.e.,
140                 // OK to compute changes for that state without
141                 // clearing existing trash lists) only now, after we
142                 // succeed in clearing existing trash lists.
143                 nextRunOptions.SafeRendezvousState = rs
144         }
145
146         if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
147                 return
148         }
149         bal.ComputeChangeSets()
150         bal.PrintStatistics()
151         if err = bal.CheckSanityLate(); err != nil {
152                 return
153         }
154         if lbFile != nil {
155                 err = lbFile.Sync()
156                 if err != nil {
157                         return
158                 }
159                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
160                 if err != nil {
161                         return
162                 }
163                 lbFile = nil
164         }
165         if runOptions.CommitPulls {
166                 err = bal.CommitPulls(ctx, client)
167                 if err != nil {
168                         // Skip trash if we can't pull. (Too cautious?)
169                         return
170                 }
171         }
172         if runOptions.CommitTrash {
173                 err = bal.CommitTrash(ctx, client)
174                 if err != nil {
175                         return
176                 }
177         }
178         if runOptions.CommitConfirmedFields {
179                 err = bal.updateCollections(ctx, client, cluster)
180                 if err != nil {
181                         return
182                 }
183         }
184         return
185 }
186
187 // SetKeepServices sets the list of KeepServices to operate on.
188 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
189         bal.KeepServices = make(map[string]*KeepService)
190         for _, srv := range srvList.Items {
191                 bal.KeepServices[srv.UUID] = &KeepService{
192                         KeepService: srv,
193                         ChangeSet:   &ChangeSet{},
194                 }
195         }
196         return nil
197 }
198
199 // DiscoverKeepServices sets the list of KeepServices by calling the
200 // API to get a list of all services, and selecting the ones whose
201 // ServiceType is "disk"
202 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
203         bal.KeepServices = make(map[string]*KeepService)
204         return c.EachKeepService(func(srv arvados.KeepService) error {
205                 if srv.ServiceType == "disk" {
206                         bal.KeepServices[srv.UUID] = &KeepService{
207                                 KeepService: srv,
208                                 ChangeSet:   &ChangeSet{},
209                         }
210                 } else {
211                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
212                 }
213                 return nil
214         })
215 }
216
217 func (bal *Balancer) cleanupMounts() {
218         rwdev := map[string]*KeepService{}
219         for _, srv := range bal.KeepServices {
220                 for _, mnt := range srv.mounts {
221                         if !mnt.ReadOnly {
222                                 rwdev[mnt.UUID] = srv
223                         }
224                 }
225         }
226         // Drop the readonly mounts whose device is mounted RW
227         // elsewhere.
228         for _, srv := range bal.KeepServices {
229                 var dedup []*KeepMount
230                 for _, mnt := range srv.mounts {
231                         if mnt.ReadOnly && rwdev[mnt.UUID] != nil {
232                                 bal.logf("skipping srv %s readonly mount %q because same volume is mounted read-write on srv %s", srv, mnt.UUID, rwdev[mnt.UUID])
233                         } else {
234                                 dedup = append(dedup, mnt)
235                         }
236                 }
237                 srv.mounts = dedup
238         }
239         for _, srv := range bal.KeepServices {
240                 for _, mnt := range srv.mounts {
241                         if mnt.Replication <= 0 {
242                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
243                                 mnt.Replication = 1
244                         }
245                 }
246         }
247 }
248
249 // CheckSanityEarly checks for configuration and runtime errors that
250 // can be detected before GetCurrentState() and ComputeChangeSets()
251 // are called.
252 //
253 // If it returns an error, it is pointless to run GetCurrentState or
254 // ComputeChangeSets: after doing so, the statistics would be
255 // meaningless and it would be dangerous to run any Commit methods.
256 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
257         u, err := c.CurrentUser()
258         if err != nil {
259                 return fmt.Errorf("CurrentUser(): %v", err)
260         }
261         if !u.IsActive || !u.IsAdmin {
262                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
263         }
264         for _, srv := range bal.KeepServices {
265                 if srv.ServiceType == "proxy" {
266                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
267                 }
268         }
269
270         mountProblem := false
271         type deviceMount struct {
272                 srv *KeepService
273                 mnt *KeepMount
274         }
275         deviceMounted := map[string]deviceMount{} // DeviceID -> mount
276         for _, srv := range bal.KeepServices {
277                 for _, mnt := range srv.mounts {
278                         if first, dup := deviceMounted[mnt.DeviceID]; dup && first.mnt.UUID != mnt.UUID && mnt.DeviceID != "" {
279                                 bal.logf("config error: device %s is mounted with multiple volume UUIDs: %s on %s, and %s on %s",
280                                         mnt.DeviceID,
281                                         first.mnt.UUID, first.srv,
282                                         mnt.UUID, srv)
283                                 mountProblem = true
284                                 continue
285                         }
286                         deviceMounted[mnt.DeviceID] = deviceMount{srv, mnt}
287                 }
288         }
289         if mountProblem {
290                 return errors.New("cannot continue with config errors (see above)")
291         }
292
293         var checkPage arvados.CollectionList
294         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
295                 Limit:              new(int),
296                 Count:              "exact",
297                 IncludeTrash:       true,
298                 IncludeOldVersions: true,
299                 Filters: []arvados.Filter{{
300                         Attr:     "modified_at",
301                         Operator: "=",
302                         Operand:  nil,
303                 }},
304         }); err != nil {
305                 return err
306         } else if n := checkPage.ItemsAvailable; n > 0 {
307                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
308         }
309
310         return nil
311 }
312
313 // rendezvousState returns a fingerprint (e.g., a sorted list of
314 // UUID+host+port) of the current set of keep services.
315 func (bal *Balancer) rendezvousState() string {
316         srvs := make([]string, 0, len(bal.KeepServices))
317         for _, srv := range bal.KeepServices {
318                 srvs = append(srvs, srv.String())
319         }
320         sort.Strings(srvs)
321         return strings.Join(srvs, "; ")
322 }
323
324 // ClearTrashLists sends an empty trash list to each keep
325 // service. Calling this before GetCurrentState avoids races.
326 //
327 // When a block appears in an index, we assume that replica will still
328 // exist after we delete other replicas on other servers. However,
329 // it's possible that a previous rebalancing operation made different
330 // decisions (e.g., servers were added/removed, and rendezvous order
331 // changed). In this case, the replica might already be on that
332 // server's trash list, and it might be deleted before we send a
333 // replacement trash list.
334 //
335 // We avoid this problem if we clear all trash lists before getting
336 // indexes. (We also assume there is only one rebalancing process
337 // running at a time.)
338 func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
339         for _, srv := range bal.KeepServices {
340                 srv.ChangeSet = &ChangeSet{}
341         }
342         return bal.CommitTrash(ctx, c)
343 }
344
345 // GetCurrentState determines the current replication state, and the
346 // desired replication level, for every block that is either
347 // retrievable or referenced.
348 //
349 // It determines the current replication state by reading the block index
350 // from every known Keep service.
351 //
352 // It determines the desired replication level by retrieving all
353 // collection manifests in the database (API server).
354 //
355 // It encodes the resulting information in BlockStateMap.
356 func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
357         ctx, cancel := context.WithCancel(ctx)
358         defer cancel()
359
360         defer bal.time("get_state", "wall clock time to get current state")()
361         bal.BlockStateMap = NewBlockStateMap()
362
363         dd, err := c.DiscoveryDocument()
364         if err != nil {
365                 return err
366         }
367         bal.DefaultReplication = dd.DefaultCollectionReplication
368         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
369
370         errs := make(chan error, 1)
371         wg := sync.WaitGroup{}
372
373         // When a device is mounted more than once, we will get its
374         // index only once, and call AddReplicas on all of the mounts.
375         // equivMount keys are the mounts that will be indexed, and
376         // each value is a list of mounts to apply the received index
377         // to.
378         equivMount := map[*KeepMount][]*KeepMount{}
379         // deviceMount maps each device ID to the one mount that will
380         // be indexed for that device.
381         deviceMount := map[string]*KeepMount{}
382         for _, srv := range bal.KeepServices {
383                 for _, mnt := range srv.mounts {
384                         equiv := deviceMount[mnt.UUID]
385                         if equiv == nil {
386                                 equiv = mnt
387                                 deviceMount[mnt.UUID] = equiv
388                         }
389                         equivMount[equiv] = append(equivMount[equiv], mnt)
390                 }
391         }
392
393         // Start one goroutine for each (non-redundant) mount:
394         // retrieve the index, and add the returned blocks to
395         // BlockStateMap.
396         for _, mounts := range equivMount {
397                 wg.Add(1)
398                 go func(mounts []*KeepMount) {
399                         defer wg.Done()
400                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
401                         idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
402                         if err != nil {
403                                 select {
404                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
405                                 default:
406                                 }
407                                 cancel()
408                                 return
409                         }
410                         if len(errs) > 0 {
411                                 // Some other goroutine encountered an
412                                 // error -- any further effort here
413                                 // will be wasted.
414                                 return
415                         }
416                         for _, mount := range mounts {
417                                 bal.logf("%s: add %d entries to map", mount, len(idx))
418                                 bal.BlockStateMap.AddReplicas(mount, idx)
419                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
420                         }
421                         bal.logf("mount %s: index done", mounts[0])
422                 }(mounts)
423         }
424
425         collQ := make(chan arvados.Collection, bufs)
426
427         // Retrieve all collections from the database and send them to
428         // collQ.
429         wg.Add(1)
430         go func() {
431                 defer wg.Done()
432                 err = EachCollection(ctx, bal.DB, c,
433                         func(coll arvados.Collection) error {
434                                 collQ <- coll
435                                 if len(errs) > 0 {
436                                         // some other GetCurrentState
437                                         // error happened: no point
438                                         // getting any more
439                                         // collections.
440                                         return fmt.Errorf("")
441                                 }
442                                 return nil
443                         }, func(done, total int) {
444                                 bal.logf("collections: %d/%d", done, total)
445                         })
446                 close(collQ)
447                 if err != nil {
448                         select {
449                         case errs <- err:
450                         default:
451                         }
452                         cancel()
453                 }
454         }()
455
456         // Parse manifests from collQ and pass the block hashes to
457         // BlockStateMap to track desired replication.
458         for i := 0; i < runtime.NumCPU(); i++ {
459                 wg.Add(1)
460                 go func() {
461                         defer wg.Done()
462                         for coll := range collQ {
463                                 err := bal.addCollection(coll)
464                                 if err != nil || len(errs) > 0 {
465                                         select {
466                                         case errs <- err:
467                                         default:
468                                         }
469                                         cancel()
470                                         continue
471                                 }
472                                 atomic.AddInt64(&bal.collScanned, 1)
473                         }
474                 }()
475         }
476
477         wg.Wait()
478         if len(errs) > 0 {
479                 return <-errs
480         }
481         return nil
482 }
483
484 func (bal *Balancer) addCollection(coll arvados.Collection) error {
485         blkids, err := coll.SizedDigests()
486         if err != nil {
487                 return fmt.Errorf("%v: %v", coll.UUID, err)
488         }
489         repl := bal.DefaultReplication
490         if coll.ReplicationDesired != nil {
491                 repl = *coll.ReplicationDesired
492         }
493         bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
494         // Pass pdh to IncreaseDesired only if LostBlocksFile is being
495         // written -- otherwise it's just a waste of memory.
496         pdh := ""
497         if bal.LostBlocksFile != "" {
498                 pdh = coll.PortableDataHash
499         }
500         bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
501         return nil
502 }
503
504 // ComputeChangeSets compares, for each known block, the current and
505 // desired replication states. If it is possible to get closer to the
506 // desired state by copying or deleting blocks, it adds those changes
507 // to the relevant KeepServices' ChangeSets.
508 //
509 // It does not actually apply any of the computed changes.
510 func (bal *Balancer) ComputeChangeSets() {
511         // This just calls balanceBlock() once for each block, using a
512         // pool of worker goroutines.
513         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
514         bal.setupLookupTables()
515
516         type balanceTask struct {
517                 blkid arvados.SizedDigest
518                 blk   *BlockState
519         }
520         workers := runtime.GOMAXPROCS(-1)
521         todo := make(chan balanceTask, workers)
522         go func() {
523                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
524                         todo <- balanceTask{
525                                 blkid: blkid,
526                                 blk:   blk,
527                         }
528                 })
529                 close(todo)
530         }()
531         results := make(chan balanceResult, workers)
532         go func() {
533                 var wg sync.WaitGroup
534                 for i := 0; i < workers; i++ {
535                         wg.Add(1)
536                         go func() {
537                                 for work := range todo {
538                                         results <- bal.balanceBlock(work.blkid, work.blk)
539                                 }
540                                 wg.Done()
541                         }()
542                 }
543                 wg.Wait()
544                 close(results)
545         }()
546         bal.collectStatistics(results)
547 }
548
549 func (bal *Balancer) setupLookupTables() {
550         bal.serviceRoots = make(map[string]string)
551         bal.classes = defaultClasses
552         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
553         bal.mounts = 0
554         for _, srv := range bal.KeepServices {
555                 bal.serviceRoots[srv.UUID] = srv.UUID
556                 for _, mnt := range srv.mounts {
557                         bal.mounts++
558
559                         // All mounts on a read-only service are
560                         // effectively read-only.
561                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
562
563                         for class := range mnt.StorageClasses {
564                                 if mbc := bal.mountsByClass[class]; mbc == nil {
565                                         bal.classes = append(bal.classes, class)
566                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
567                                 } else {
568                                         mbc[mnt] = true
569                                 }
570                         }
571                 }
572         }
573         // Consider classes in lexicographic order to avoid flapping
574         // between balancing runs.  The outcome of the "prefer a mount
575         // we're already planning to use for a different storage
576         // class" case in balanceBlock depends on the order classes
577         // are considered.
578         sort.Strings(bal.classes)
579 }
580
581 const (
582         changeStay = iota
583         changePull
584         changeTrash
585         changeNone
586 )
587
588 var changeName = map[int]string{
589         changeStay:  "stay",
590         changePull:  "pull",
591         changeTrash: "trash",
592         changeNone:  "none",
593 }
594
595 type balancedBlockState struct {
596         needed       int
597         unneeded     int
598         pulling      int
599         unachievable bool
600 }
601
602 type balanceResult struct {
603         blk        *BlockState
604         blkid      arvados.SizedDigest
605         lost       bool
606         blockState balancedBlockState
607         classState map[string]balancedBlockState
608 }
609
610 type slot struct {
611         mnt  *KeepMount // never nil
612         repl *Replica   // replica already stored here (or nil)
613         want bool       // we should pull/leave a replica here
614 }
615
616 // balanceBlock compares current state to desired state for a single
617 // block, and makes the appropriate ChangeSet calls.
618 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
619         bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
620
621         // Build a list of all slots (one per mounted volume).
622         slots := make([]slot, 0, bal.mounts)
623         for _, srv := range bal.KeepServices {
624                 for _, mnt := range srv.mounts {
625                         var repl *Replica
626                         for r := range blk.Replicas {
627                                 if blk.Replicas[r].KeepMount == mnt {
628                                         repl = &blk.Replicas[r]
629                                 }
630                         }
631                         // Initial value of "want" is "have, and can't
632                         // delete". These untrashable replicas get
633                         // prioritized when sorting slots: otherwise,
634                         // non-optimal readonly copies would cause us
635                         // to overreplicate.
636                         slots = append(slots, slot{
637                                 mnt:  mnt,
638                                 repl: repl,
639                                 want: repl != nil && mnt.ReadOnly,
640                         })
641                 }
642         }
643
644         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
645         srvRendezvous := make(map[*KeepService]int, len(uuids))
646         for i, uuid := range uuids {
647                 srv := bal.KeepServices[uuid]
648                 srvRendezvous[srv] = i
649         }
650
651         // Below we set underreplicated=true if we find any storage
652         // class that's currently underreplicated -- in that case we
653         // won't want to trash any replicas.
654         underreplicated := false
655
656         unsafeToDelete := make(map[int64]bool, len(slots))
657         for _, class := range bal.classes {
658                 desired := blk.Desired[class]
659                 if desired == 0 {
660                         continue
661                 }
662
663                 // Sort the slots by desirability.
664                 sort.Slice(slots, func(i, j int) bool {
665                         si, sj := slots[i], slots[j]
666                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
667                                 // Prefer a mount that satisfies the
668                                 // desired class.
669                                 return bal.mountsByClass[class][si.mnt]
670                         } else if si.want != sj.want {
671                                 // Prefer a mount that will have a
672                                 // replica no matter what we do here
673                                 // -- either because it already has an
674                                 // untrashable replica, or because we
675                                 // already need it to satisfy a
676                                 // different storage class.
677                                 return si.want
678                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
679                                 // Prefer a better rendezvous
680                                 // position.
681                                 return orderi < orderj
682                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
683                                 // Prefer a mount that already has a
684                                 // replica.
685                                 return repli
686                         } else {
687                                 // If pull/trash turns out to be
688                                 // needed, distribute the
689                                 // new/remaining replicas uniformly
690                                 // across qualifying mounts on a given
691                                 // server.
692                                 return rendezvousLess(si.mnt.UUID, sj.mnt.UUID, blkid)
693                         }
694                 })
695
696                 // Servers/mounts/devices (with or without existing
697                 // replicas) that are part of the best achievable
698                 // layout for this storage class.
699                 wantSrv := map[*KeepService]bool{}
700                 wantMnt := map[*KeepMount]bool{}
701                 wantDev := map[string]bool{}
702                 // Positions (with existing replicas) that have been
703                 // protected (via unsafeToDelete) to ensure we don't
704                 // reduce replication below desired level when
705                 // trashing replicas that aren't optimal positions for
706                 // any storage class.
707                 protMnt := map[*KeepMount]bool{}
708                 // Replication planned so far (corresponds to wantMnt).
709                 replWant := 0
710                 // Protected replication (corresponds to protMnt).
711                 replProt := 0
712
713                 // trySlot tries using a slot to meet requirements,
714                 // and returns true if all requirements are met.
715                 trySlot := func(i int) bool {
716                         slot := slots[i]
717                         if wantMnt[slot.mnt] || wantDev[slot.mnt.UUID] {
718                                 // Already allocated a replica to this
719                                 // backend device, possibly on a
720                                 // different server.
721                                 return false
722                         }
723                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
724                                 unsafeToDelete[slot.repl.Mtime] = true
725                                 protMnt[slot.mnt] = true
726                                 replProt += slot.mnt.Replication
727                         }
728                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
729                                 slots[i].want = true
730                                 wantSrv[slot.mnt.KeepService] = true
731                                 wantMnt[slot.mnt] = true
732                                 wantDev[slot.mnt.UUID] = true
733                                 replWant += slot.mnt.Replication
734                         }
735                         return replProt >= desired && replWant >= desired
736                 }
737
738                 // First try to achieve desired replication without
739                 // using the same server twice.
740                 done := false
741                 for i := 0; i < len(slots) && !done; i++ {
742                         if !wantSrv[slots[i].mnt.KeepService] {
743                                 done = trySlot(i)
744                         }
745                 }
746
747                 // If that didn't suffice, do another pass without the
748                 // "distinct services" restriction. (Achieving the
749                 // desired volume replication on fewer than the
750                 // desired number of services is better than
751                 // underreplicating.)
752                 for i := 0; i < len(slots) && !done; i++ {
753                         done = trySlot(i)
754                 }
755
756                 if !underreplicated {
757                         safe := 0
758                         for _, slot := range slots {
759                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
760                                         continue
761                                 }
762                                 if safe += slot.mnt.Replication; safe >= desired {
763                                         break
764                                 }
765                         }
766                         underreplicated = safe < desired
767                 }
768
769                 // Avoid deleting wanted replicas from devices that
770                 // are mounted on multiple servers -- even if they
771                 // haven't already been added to unsafeToDelete
772                 // because the servers report different Mtimes.
773                 for _, slot := range slots {
774                         if slot.repl != nil && wantDev[slot.mnt.UUID] {
775                                 unsafeToDelete[slot.repl.Mtime] = true
776                         }
777                 }
778         }
779
780         // TODO: If multiple replicas are trashable, prefer the oldest
781         // replica that doesn't have a timestamp collision with
782         // others.
783
784         for i, slot := range slots {
785                 // Don't trash (1) any replicas of an underreplicated
786                 // block, even if they're in the wrong positions, or
787                 // (2) any replicas whose Mtimes are identical to
788                 // needed replicas (in case we're really seeing the
789                 // same copy via different mounts).
790                 if slot.repl != nil && (underreplicated || unsafeToDelete[slot.repl.Mtime]) {
791                         slots[i].want = true
792                 }
793         }
794
795         classState := make(map[string]balancedBlockState, len(bal.classes))
796         for _, class := range bal.classes {
797                 classState[class] = computeBlockState(slots, bal.mountsByClass[class], len(blk.Replicas), blk.Desired[class])
798         }
799         blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
800
801         var lost bool
802         var changes []string
803         for _, slot := range slots {
804                 // TODO: request a Touch if Mtime is duplicated.
805                 var change int
806                 switch {
807                 case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
808                         slot.mnt.KeepService.AddTrash(Trash{
809                                 SizedDigest: blkid,
810                                 Mtime:       slot.repl.Mtime,
811                                 From:        slot.mnt,
812                         })
813                         change = changeTrash
814                 case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
815                         lost = true
816                         change = changeNone
817                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
818                         slot.mnt.KeepService.AddPull(Pull{
819                                 SizedDigest: blkid,
820                                 From:        blk.Replicas[0].KeepMount.KeepService,
821                                 To:          slot.mnt,
822                         })
823                         change = changePull
824                 case slot.repl != nil:
825                         change = changeStay
826                 default:
827                         change = changeNone
828                 }
829                 if bal.Dumper != nil {
830                         var mtime int64
831                         if slot.repl != nil {
832                                 mtime = slot.repl.Mtime
833                         }
834                         srv := slot.mnt.KeepService
835                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
836                 }
837         }
838         if bal.Dumper != nil {
839                 bal.Dumper.Printf("%s refs=%d needed=%d unneeded=%d pulling=%v %v %v", blkid, blk.RefCount, blockState.needed, blockState.unneeded, blockState.pulling, blk.Desired, changes)
840         }
841         return balanceResult{
842                 blk:        blk,
843                 blkid:      blkid,
844                 lost:       lost,
845                 blockState: blockState,
846                 classState: classState,
847         }
848 }
849
850 func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRepl int) (bbs balancedBlockState) {
851         repl := 0
852         countedDev := map[string]bool{}
853         for _, slot := range slots {
854                 if onlyCount != nil && !onlyCount[slot.mnt] {
855                         continue
856                 }
857                 if countedDev[slot.mnt.UUID] {
858                         continue
859                 }
860                 switch {
861                 case slot.repl != nil && slot.want:
862                         bbs.needed++
863                         repl += slot.mnt.Replication
864                 case slot.repl != nil && !slot.want:
865                         bbs.unneeded++
866                         repl += slot.mnt.Replication
867                 case slot.repl == nil && slot.want && have > 0:
868                         bbs.pulling++
869                         repl += slot.mnt.Replication
870                 }
871                 countedDev[slot.mnt.UUID] = true
872         }
873         if repl < needRepl {
874                 bbs.unachievable = true
875         }
876         return
877 }
878
879 type blocksNBytes struct {
880         replicas int
881         blocks   int
882         bytes    int64
883 }
884
885 func (bb blocksNBytes) String() string {
886         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
887 }
888
889 type replicationStats struct {
890         needed       blocksNBytes
891         unneeded     blocksNBytes
892         pulling      blocksNBytes
893         unachievable blocksNBytes
894 }
895
896 type balancerStats struct {
897         lost          blocksNBytes
898         overrep       blocksNBytes
899         unref         blocksNBytes
900         garbage       blocksNBytes
901         underrep      blocksNBytes
902         unachievable  blocksNBytes
903         justright     blocksNBytes
904         desired       blocksNBytes
905         current       blocksNBytes
906         pulls         int
907         trashes       int
908         replHistogram []int
909         classStats    map[string]replicationStats
910
911         // collectionBytes / collectionBlockBytes = deduplication ratio
912         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
913         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
914         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
915         collectionBlocks     int64 // number of blocks referenced by any collection
916 }
917
918 func (s *balancerStats) dedupByteRatio() float64 {
919         if s.collectionBlockBytes == 0 {
920                 return 0
921         }
922         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
923 }
924
925 func (s *balancerStats) dedupBlockRatio() float64 {
926         if s.collectionBlocks == 0 {
927                 return 0
928         }
929         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
930 }
931
932 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
933         var s balancerStats
934         s.replHistogram = make([]int, 2)
935         s.classStats = make(map[string]replicationStats, len(bal.classes))
936         for result := range results {
937                 bytes := result.blkid.Size()
938
939                 if rc := int64(result.blk.RefCount); rc > 0 {
940                         s.collectionBytes += rc * bytes
941                         s.collectionBlockBytes += bytes
942                         s.collectionBlockRefs += rc
943                         s.collectionBlocks++
944                 }
945
946                 for class, state := range result.classState {
947                         cs := s.classStats[class]
948                         if state.unachievable {
949                                 cs.unachievable.replicas++
950                                 cs.unachievable.blocks++
951                                 cs.unachievable.bytes += bytes
952                         }
953                         if state.needed > 0 {
954                                 cs.needed.replicas += state.needed
955                                 cs.needed.blocks++
956                                 cs.needed.bytes += bytes * int64(state.needed)
957                         }
958                         if state.unneeded > 0 {
959                                 cs.unneeded.replicas += state.unneeded
960                                 cs.unneeded.blocks++
961                                 cs.unneeded.bytes += bytes * int64(state.unneeded)
962                         }
963                         if state.pulling > 0 {
964                                 cs.pulling.replicas += state.pulling
965                                 cs.pulling.blocks++
966                                 cs.pulling.bytes += bytes * int64(state.pulling)
967                         }
968                         s.classStats[class] = cs
969                 }
970
971                 bs := result.blockState
972                 switch {
973                 case result.lost:
974                         s.lost.replicas++
975                         s.lost.blocks++
976                         s.lost.bytes += bytes
977                         fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
978                         for pdh := range result.blk.Refs {
979                                 fmt.Fprintf(bal.lostBlocks, " %s", pdh)
980                         }
981                         fmt.Fprint(bal.lostBlocks, "\n")
982                 case bs.pulling > 0:
983                         s.underrep.replicas += bs.pulling
984                         s.underrep.blocks++
985                         s.underrep.bytes += bytes * int64(bs.pulling)
986                 case bs.unachievable:
987                         s.underrep.replicas++
988                         s.underrep.blocks++
989                         s.underrep.bytes += bytes
990                 case bs.unneeded > 0 && bs.needed == 0:
991                         // Count as "garbage" if all replicas are old
992                         // enough to trash, otherwise count as
993                         // "unref".
994                         counter := &s.garbage
995                         for _, r := range result.blk.Replicas {
996                                 if r.Mtime >= bal.MinMtime {
997                                         counter = &s.unref
998                                         break
999                                 }
1000                         }
1001                         counter.replicas += bs.unneeded
1002                         counter.blocks++
1003                         counter.bytes += bytes * int64(bs.unneeded)
1004                 case bs.unneeded > 0:
1005                         s.overrep.replicas += bs.unneeded
1006                         s.overrep.blocks++
1007                         s.overrep.bytes += bytes * int64(bs.unneeded)
1008                 default:
1009                         s.justright.replicas += bs.needed
1010                         s.justright.blocks++
1011                         s.justright.bytes += bytes * int64(bs.needed)
1012                 }
1013
1014                 if bs.needed > 0 {
1015                         s.desired.replicas += bs.needed
1016                         s.desired.blocks++
1017                         s.desired.bytes += bytes * int64(bs.needed)
1018                 }
1019                 if bs.needed+bs.unneeded > 0 {
1020                         s.current.replicas += bs.needed + bs.unneeded
1021                         s.current.blocks++
1022                         s.current.bytes += bytes * int64(bs.needed+bs.unneeded)
1023                 }
1024
1025                 for len(s.replHistogram) <= bs.needed+bs.unneeded {
1026                         s.replHistogram = append(s.replHistogram, 0)
1027                 }
1028                 s.replHistogram[bs.needed+bs.unneeded]++
1029         }
1030         for _, srv := range bal.KeepServices {
1031                 s.pulls += len(srv.ChangeSet.Pulls)
1032                 s.trashes += len(srv.ChangeSet.Trashes)
1033         }
1034         bal.stats = s
1035         bal.Metrics.UpdateStats(s)
1036 }
1037
1038 // PrintStatistics writes statistics about the computed changes to
1039 // bal.Logger. It should not be called until ComputeChangeSets has
1040 // finished.
1041 func (bal *Balancer) PrintStatistics() {
1042         bal.logf("===")
1043         bal.logf("%s lost (0=have<want)", bal.stats.lost)
1044         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
1045         bal.logf("%s just right (have=want)", bal.stats.justright)
1046         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
1047         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
1048         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
1049         for _, class := range bal.classes {
1050                 cs := bal.stats.classStats[class]
1051                 bal.logf("===")
1052                 bal.logf("storage class %q: %s needed", class, cs.needed)
1053                 bal.logf("storage class %q: %s unneeded", class, cs.unneeded)
1054                 bal.logf("storage class %q: %s pulling", class, cs.pulling)
1055                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
1056         }
1057         bal.logf("===")
1058         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
1059         bal.logf("%s total usage", bal.stats.current)
1060         bal.logf("===")
1061         for _, srv := range bal.KeepServices {
1062                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1063         }
1064         bal.logf("===")
1065         bal.printHistogram(60)
1066         bal.logf("===")
1067 }
1068
1069 func (bal *Balancer) printHistogram(hashColumns int) {
1070         bal.logf("Replication level distribution:")
1071         maxCount := 0
1072         for _, count := range bal.stats.replHistogram {
1073                 if maxCount < count {
1074                         maxCount = count
1075                 }
1076         }
1077         hashes := strings.Repeat("#", hashColumns)
1078         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1079         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1080         for repl, count := range bal.stats.replHistogram {
1081                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1082                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1083         }
1084 }
1085
1086 // CheckSanityLate checks for configuration and runtime errors after
1087 // GetCurrentState() and ComputeChangeSets() have finished.
1088 //
1089 // If it returns an error, it is dangerous to run any Commit methods.
1090 func (bal *Balancer) CheckSanityLate() error {
1091         if bal.errors != nil {
1092                 for _, err := range bal.errors {
1093                         bal.logf("deferred error: %v", err)
1094                 }
1095                 return fmt.Errorf("cannot proceed safely after deferred errors")
1096         }
1097
1098         if bal.collScanned == 0 {
1099                 return fmt.Errorf("received zero collections")
1100         }
1101
1102         anyDesired := false
1103         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1104                 for _, desired := range blk.Desired {
1105                         if desired > 0 {
1106                                 anyDesired = true
1107                                 break
1108                         }
1109                 }
1110         })
1111         if !anyDesired {
1112                 return fmt.Errorf("zero blocks have desired replication>0")
1113         }
1114
1115         if dr := bal.DefaultReplication; dr < 1 {
1116                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1117         }
1118
1119         // TODO: no two services have identical indexes
1120         // TODO: no collisions (same md5, different size)
1121         return nil
1122 }
1123
1124 // CommitPulls sends the computed lists of pull requests to the
1125 // keepstore servers. This has the effect of increasing replication of
1126 // existing blocks that are either underreplicated or poorly
1127 // distributed according to rendezvous hashing.
1128 func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
1129         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1130         return bal.commitAsync(c, "send pull list",
1131                 func(srv *KeepService) error {
1132                         return srv.CommitPulls(ctx, c)
1133                 })
1134 }
1135
1136 // CommitTrash sends the computed lists of trash requests to the
1137 // keepstore servers. This has the effect of deleting blocks that are
1138 // overreplicated or unreferenced.
1139 func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
1140         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1141         return bal.commitAsync(c, "send trash list",
1142                 func(srv *KeepService) error {
1143                         return srv.CommitTrash(ctx, c)
1144                 })
1145 }
1146
1147 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1148         errs := make(chan error)
1149         for _, srv := range bal.KeepServices {
1150                 go func(srv *KeepService) {
1151                         var err error
1152                         defer func() { errs <- err }()
1153                         label := fmt.Sprintf("%s: %v", srv, label)
1154                         err = f(srv)
1155                         if err != nil {
1156                                 err = fmt.Errorf("%s: %v", label, err)
1157                         }
1158                 }(srv)
1159         }
1160         var lastErr error
1161         for range bal.KeepServices {
1162                 if err := <-errs; err != nil {
1163                         bal.logf("%v", err)
1164                         lastErr = err
1165                 }
1166         }
1167         close(errs)
1168         return lastErr
1169 }
1170
1171 func (bal *Balancer) logf(f string, args ...interface{}) {
1172         if bal.Logger != nil {
1173                 bal.Logger.Printf(f, args...)
1174         }
1175 }
1176
1177 func (bal *Balancer) time(name, help string) func() {
1178         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1179         t0 := time.Now()
1180         bal.Logger.Printf("%s: start", name)
1181         return func() {
1182                 dur := time.Since(t0)
1183                 observer.Observe(dur.Seconds())
1184                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1185         }
1186 }
1187
1188 // Rendezvous hash sort function. Less efficient than sorting on
1189 // precomputed rendezvous hashes, but also rarely used.
1190 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1191         a := md5.Sum([]byte(string(blkid[:32]) + i))
1192         b := md5.Sum([]byte(string(blkid[:32]) + j))
1193         return bytes.Compare(a[:], b[:]) < 0
1194 }