17394: Use BlockWrite interface in crunch-run.
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "log"
15         "math"
16         "os"
17         "runtime"
18         "sort"
19         "strings"
20         "sync"
21         "syscall"
22         "time"
23
24         "git.arvados.org/arvados.git/sdk/go/arvados"
25         "git.arvados.org/arvados.git/sdk/go/keepclient"
26         "github.com/sirupsen/logrus"
27 )
28
29 // Balancer compares the contents of keepstore servers with the
30 // collections stored in Arvados, and issues pull/trash requests
31 // needed to get (closer to) the optimal data layout.
32 //
33 // In the optimal data layout: every data block referenced by a
34 // collection is replicated at least as many times as desired by the
35 // collection; there are no unreferenced data blocks older than
36 // BlobSignatureTTL; and all N existing replicas of a given data block
37 // are in the N best positions in rendezvous probe order.
38 type Balancer struct {
39         Logger  logrus.FieldLogger
40         Dumper  logrus.FieldLogger
41         Metrics *metrics
42
43         LostBlocksFile string
44
45         *BlockStateMap
46         KeepServices       map[string]*KeepService
47         DefaultReplication int
48         MinMtime           int64
49
50         classes       []string
51         mounts        int
52         mountsByClass map[string]map[*KeepMount]bool
53         collScanned   int
54         serviceRoots  map[string]string
55         errors        []error
56         stats         balancerStats
57         mutex         sync.Mutex
58         lostBlocks    io.Writer
59 }
60
61 // Run performs a balance operation using the given config and
62 // runOptions, and returns RunOptions suitable for passing to a
63 // subsequent balance operation.
64 //
65 // Run should only be called once on a given Balancer object.
66 //
67 // Typical usage:
68 //
69 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
70 func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
71         nextRunOptions = runOptions
72
73         defer bal.time("sweep", "wall clock time to run one full sweep")()
74
75         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
76         defer cancel()
77
78         var lbFile *os.File
79         if bal.LostBlocksFile != "" {
80                 tmpfn := bal.LostBlocksFile + ".tmp"
81                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
82                 if err != nil {
83                         return
84                 }
85                 defer lbFile.Close()
86                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
87                 if err != nil {
88                         return
89                 }
90                 defer func() {
91                         // Remove the tempfile only if we didn't get
92                         // as far as successfully renaming it.
93                         if lbFile != nil {
94                                 os.Remove(tmpfn)
95                         }
96                 }()
97                 bal.lostBlocks = lbFile
98         } else {
99                 bal.lostBlocks = ioutil.Discard
100         }
101
102         err = bal.DiscoverKeepServices(client)
103         if err != nil {
104                 return
105         }
106
107         for _, srv := range bal.KeepServices {
108                 err = srv.discoverMounts(client)
109                 if err != nil {
110                         return
111                 }
112         }
113         bal.cleanupMounts()
114
115         if err = bal.CheckSanityEarly(client); err != nil {
116                 return
117         }
118
119         // On a big site, indexing and sending trash/pull lists can
120         // take much longer than the usual 5 minute client
121         // timeout. From here on, we rely on the context deadline
122         // instead, aborting the entire operation if any part takes
123         // too long.
124         client.Timeout = 0
125
126         rs := bal.rendezvousState()
127         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
128                 if runOptions.SafeRendezvousState != "" {
129                         bal.logf("notice: KeepServices list has changed since last run")
130                 }
131                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
132                 if err = bal.ClearTrashLists(ctx, client); err != nil {
133                         return
134                 }
135                 // The current rendezvous state becomes "safe" (i.e.,
136                 // OK to compute changes for that state without
137                 // clearing existing trash lists) only now, after we
138                 // succeed in clearing existing trash lists.
139                 nextRunOptions.SafeRendezvousState = rs
140         }
141
142         if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
143                 return
144         }
145         bal.ComputeChangeSets()
146         bal.PrintStatistics()
147         if err = bal.CheckSanityLate(); err != nil {
148                 return
149         }
150         if lbFile != nil {
151                 err = lbFile.Sync()
152                 if err != nil {
153                         return
154                 }
155                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
156                 if err != nil {
157                         return
158                 }
159                 lbFile = nil
160         }
161         if runOptions.CommitPulls {
162                 err = bal.CommitPulls(ctx, client)
163                 if err != nil {
164                         // Skip trash if we can't pull. (Too cautious?)
165                         return
166                 }
167         }
168         if runOptions.CommitTrash {
169                 err = bal.CommitTrash(ctx, client)
170         }
171         return
172 }
173
174 // SetKeepServices sets the list of KeepServices to operate on.
175 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
176         bal.KeepServices = make(map[string]*KeepService)
177         for _, srv := range srvList.Items {
178                 bal.KeepServices[srv.UUID] = &KeepService{
179                         KeepService: srv,
180                         ChangeSet:   &ChangeSet{},
181                 }
182         }
183         return nil
184 }
185
186 // DiscoverKeepServices sets the list of KeepServices by calling the
187 // API to get a list of all services, and selecting the ones whose
188 // ServiceType is "disk"
189 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
190         bal.KeepServices = make(map[string]*KeepService)
191         return c.EachKeepService(func(srv arvados.KeepService) error {
192                 if srv.ServiceType == "disk" {
193                         bal.KeepServices[srv.UUID] = &KeepService{
194                                 KeepService: srv,
195                                 ChangeSet:   &ChangeSet{},
196                         }
197                 } else {
198                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
199                 }
200                 return nil
201         })
202 }
203
204 func (bal *Balancer) cleanupMounts() {
205         rwdev := map[string]*KeepService{}
206         for _, srv := range bal.KeepServices {
207                 for _, mnt := range srv.mounts {
208                         if !mnt.ReadOnly && mnt.DeviceID != "" {
209                                 rwdev[mnt.DeviceID] = srv
210                         }
211                 }
212         }
213         // Drop the readonly mounts whose device is mounted RW
214         // elsewhere.
215         for _, srv := range bal.KeepServices {
216                 var dedup []*KeepMount
217                 for _, mnt := range srv.mounts {
218                         if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
219                                 bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
220                         } else {
221                                 dedup = append(dedup, mnt)
222                         }
223                 }
224                 srv.mounts = dedup
225         }
226         for _, srv := range bal.KeepServices {
227                 for _, mnt := range srv.mounts {
228                         if mnt.Replication <= 0 {
229                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
230                                 mnt.Replication = 1
231                         }
232                 }
233         }
234 }
235
236 // CheckSanityEarly checks for configuration and runtime errors that
237 // can be detected before GetCurrentState() and ComputeChangeSets()
238 // are called.
239 //
240 // If it returns an error, it is pointless to run GetCurrentState or
241 // ComputeChangeSets: after doing so, the statistics would be
242 // meaningless and it would be dangerous to run any Commit methods.
243 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
244         u, err := c.CurrentUser()
245         if err != nil {
246                 return fmt.Errorf("CurrentUser(): %v", err)
247         }
248         if !u.IsActive || !u.IsAdmin {
249                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
250         }
251         for _, srv := range bal.KeepServices {
252                 if srv.ServiceType == "proxy" {
253                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
254                 }
255         }
256
257         var checkPage arvados.CollectionList
258         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
259                 Limit:              new(int),
260                 Count:              "exact",
261                 IncludeTrash:       true,
262                 IncludeOldVersions: true,
263                 Filters: []arvados.Filter{{
264                         Attr:     "modified_at",
265                         Operator: "=",
266                         Operand:  nil,
267                 }},
268         }); err != nil {
269                 return err
270         } else if n := checkPage.ItemsAvailable; n > 0 {
271                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
272         }
273
274         return nil
275 }
276
277 // rendezvousState returns a fingerprint (e.g., a sorted list of
278 // UUID+host+port) of the current set of keep services.
279 func (bal *Balancer) rendezvousState() string {
280         srvs := make([]string, 0, len(bal.KeepServices))
281         for _, srv := range bal.KeepServices {
282                 srvs = append(srvs, srv.String())
283         }
284         sort.Strings(srvs)
285         return strings.Join(srvs, "; ")
286 }
287
288 // ClearTrashLists sends an empty trash list to each keep
289 // service. Calling this before GetCurrentState avoids races.
290 //
291 // When a block appears in an index, we assume that replica will still
292 // exist after we delete other replicas on other servers. However,
293 // it's possible that a previous rebalancing operation made different
294 // decisions (e.g., servers were added/removed, and rendezvous order
295 // changed). In this case, the replica might already be on that
296 // server's trash list, and it might be deleted before we send a
297 // replacement trash list.
298 //
299 // We avoid this problem if we clear all trash lists before getting
300 // indexes. (We also assume there is only one rebalancing process
301 // running at a time.)
302 func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
303         for _, srv := range bal.KeepServices {
304                 srv.ChangeSet = &ChangeSet{}
305         }
306         return bal.CommitTrash(ctx, c)
307 }
308
309 // GetCurrentState determines the current replication state, and the
310 // desired replication level, for every block that is either
311 // retrievable or referenced.
312 //
313 // It determines the current replication state by reading the block index
314 // from every known Keep service.
315 //
316 // It determines the desired replication level by retrieving all
317 // collection manifests in the database (API server).
318 //
319 // It encodes the resulting information in BlockStateMap.
320 func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
321         ctx, cancel := context.WithCancel(ctx)
322         defer cancel()
323
324         defer bal.time("get_state", "wall clock time to get current state")()
325         bal.BlockStateMap = NewBlockStateMap()
326
327         dd, err := c.DiscoveryDocument()
328         if err != nil {
329                 return err
330         }
331         bal.DefaultReplication = dd.DefaultCollectionReplication
332         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
333
334         errs := make(chan error, 1)
335         wg := sync.WaitGroup{}
336
337         // When a device is mounted more than once, we will get its
338         // index only once, and call AddReplicas on all of the mounts.
339         // equivMount keys are the mounts that will be indexed, and
340         // each value is a list of mounts to apply the received index
341         // to.
342         equivMount := map[*KeepMount][]*KeepMount{}
343         // deviceMount maps each device ID to the one mount that will
344         // be indexed for that device.
345         deviceMount := map[string]*KeepMount{}
346         for _, srv := range bal.KeepServices {
347                 for _, mnt := range srv.mounts {
348                         equiv := deviceMount[mnt.DeviceID]
349                         if equiv == nil {
350                                 equiv = mnt
351                                 if mnt.DeviceID != "" {
352                                         deviceMount[mnt.DeviceID] = equiv
353                                 }
354                         }
355                         equivMount[equiv] = append(equivMount[equiv], mnt)
356                 }
357         }
358
359         // Start one goroutine for each (non-redundant) mount:
360         // retrieve the index, and add the returned blocks to
361         // BlockStateMap.
362         for _, mounts := range equivMount {
363                 wg.Add(1)
364                 go func(mounts []*KeepMount) {
365                         defer wg.Done()
366                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
367                         idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
368                         if err != nil {
369                                 select {
370                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
371                                 default:
372                                 }
373                                 cancel()
374                                 return
375                         }
376                         if len(errs) > 0 {
377                                 // Some other goroutine encountered an
378                                 // error -- any further effort here
379                                 // will be wasted.
380                                 return
381                         }
382                         for _, mount := range mounts {
383                                 bal.logf("%s: add %d entries to map", mount, len(idx))
384                                 bal.BlockStateMap.AddReplicas(mount, idx)
385                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
386                         }
387                         bal.logf("mount %s: index done", mounts[0])
388                 }(mounts)
389         }
390
391         // collQ buffers incoming collections so we can start fetching
392         // the next page without waiting for the current page to
393         // finish processing.
394         collQ := make(chan arvados.Collection, bufs)
395
396         // Start a goroutine to process collections. (We could use a
397         // worker pool here, but even with a single worker we already
398         // process collections much faster than we can retrieve them.)
399         wg.Add(1)
400         go func() {
401                 defer wg.Done()
402                 for coll := range collQ {
403                         err := bal.addCollection(coll)
404                         if err != nil || len(errs) > 0 {
405                                 select {
406                                 case errs <- err:
407                                 default:
408                                 }
409                                 for range collQ {
410                                 }
411                                 cancel()
412                                 return
413                         }
414                         bal.collScanned++
415                 }
416         }()
417
418         // Start a goroutine to retrieve all collections from the
419         // Arvados database and send them to collQ for processing.
420         wg.Add(1)
421         go func() {
422                 defer wg.Done()
423                 err = EachCollection(ctx, c, pageSize,
424                         func(coll arvados.Collection) error {
425                                 collQ <- coll
426                                 if len(errs) > 0 {
427                                         // some other GetCurrentState
428                                         // error happened: no point
429                                         // getting any more
430                                         // collections.
431                                         return fmt.Errorf("")
432                                 }
433                                 return nil
434                         }, func(done, total int) {
435                                 bal.logf("collections: %d/%d", done, total)
436                         })
437                 close(collQ)
438                 if err != nil {
439                         select {
440                         case errs <- err:
441                         default:
442                         }
443                         cancel()
444                 }
445         }()
446
447         wg.Wait()
448         if len(errs) > 0 {
449                 return <-errs
450         }
451         return nil
452 }
453
454 func (bal *Balancer) addCollection(coll arvados.Collection) error {
455         blkids, err := coll.SizedDigests()
456         if err != nil {
457                 return fmt.Errorf("%v: %v", coll.UUID, err)
458         }
459         repl := bal.DefaultReplication
460         if coll.ReplicationDesired != nil {
461                 repl = *coll.ReplicationDesired
462         }
463         bal.Logger.Debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
464         // Pass pdh to IncreaseDesired only if LostBlocksFile is being
465         // written -- otherwise it's just a waste of memory.
466         pdh := ""
467         if bal.LostBlocksFile != "" {
468                 pdh = coll.PortableDataHash
469         }
470         bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
471         return nil
472 }
473
474 // ComputeChangeSets compares, for each known block, the current and
475 // desired replication states. If it is possible to get closer to the
476 // desired state by copying or deleting blocks, it adds those changes
477 // to the relevant KeepServices' ChangeSets.
478 //
479 // It does not actually apply any of the computed changes.
480 func (bal *Balancer) ComputeChangeSets() {
481         // This just calls balanceBlock() once for each block, using a
482         // pool of worker goroutines.
483         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
484         bal.setupLookupTables()
485
486         type balanceTask struct {
487                 blkid arvados.SizedDigest
488                 blk   *BlockState
489         }
490         workers := runtime.GOMAXPROCS(-1)
491         todo := make(chan balanceTask, workers)
492         go func() {
493                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
494                         todo <- balanceTask{
495                                 blkid: blkid,
496                                 blk:   blk,
497                         }
498                 })
499                 close(todo)
500         }()
501         results := make(chan balanceResult, workers)
502         go func() {
503                 var wg sync.WaitGroup
504                 for i := 0; i < workers; i++ {
505                         wg.Add(1)
506                         go func() {
507                                 for work := range todo {
508                                         results <- bal.balanceBlock(work.blkid, work.blk)
509                                 }
510                                 wg.Done()
511                         }()
512                 }
513                 wg.Wait()
514                 close(results)
515         }()
516         bal.collectStatistics(results)
517 }
518
519 func (bal *Balancer) setupLookupTables() {
520         bal.serviceRoots = make(map[string]string)
521         bal.classes = defaultClasses
522         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
523         bal.mounts = 0
524         for _, srv := range bal.KeepServices {
525                 bal.serviceRoots[srv.UUID] = srv.UUID
526                 for _, mnt := range srv.mounts {
527                         bal.mounts++
528
529                         // All mounts on a read-only service are
530                         // effectively read-only.
531                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
532
533                         if len(mnt.StorageClasses) == 0 {
534                                 bal.mountsByClass["default"][mnt] = true
535                                 continue
536                         }
537                         for class := range mnt.StorageClasses {
538                                 if mbc := bal.mountsByClass[class]; mbc == nil {
539                                         bal.classes = append(bal.classes, class)
540                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
541                                 } else {
542                                         mbc[mnt] = true
543                                 }
544                         }
545                 }
546         }
547         // Consider classes in lexicographic order to avoid flapping
548         // between balancing runs.  The outcome of the "prefer a mount
549         // we're already planning to use for a different storage
550         // class" case in balanceBlock depends on the order classes
551         // are considered.
552         sort.Strings(bal.classes)
553 }
554
555 const (
556         changeStay = iota
557         changePull
558         changeTrash
559         changeNone
560 )
561
562 var changeName = map[int]string{
563         changeStay:  "stay",
564         changePull:  "pull",
565         changeTrash: "trash",
566         changeNone:  "none",
567 }
568
569 type balancedBlockState struct {
570         needed       int
571         unneeded     int
572         pulling      int
573         unachievable bool
574 }
575
576 type balanceResult struct {
577         blk        *BlockState
578         blkid      arvados.SizedDigest
579         lost       bool
580         blockState balancedBlockState
581         classState map[string]balancedBlockState
582 }
583
584 type slot struct {
585         mnt  *KeepMount // never nil
586         repl *Replica   // replica already stored here (or nil)
587         want bool       // we should pull/leave a replica here
588 }
589
590 // balanceBlock compares current state to desired state for a single
591 // block, and makes the appropriate ChangeSet calls.
592 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
593         bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
594
595         // Build a list of all slots (one per mounted volume).
596         slots := make([]slot, 0, bal.mounts)
597         for _, srv := range bal.KeepServices {
598                 for _, mnt := range srv.mounts {
599                         var repl *Replica
600                         for r := range blk.Replicas {
601                                 if blk.Replicas[r].KeepMount == mnt {
602                                         repl = &blk.Replicas[r]
603                                 }
604                         }
605                         // Initial value of "want" is "have, and can't
606                         // delete". These untrashable replicas get
607                         // prioritized when sorting slots: otherwise,
608                         // non-optimal readonly copies would cause us
609                         // to overreplicate.
610                         slots = append(slots, slot{
611                                 mnt:  mnt,
612                                 repl: repl,
613                                 want: repl != nil && mnt.ReadOnly,
614                         })
615                 }
616         }
617
618         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
619         srvRendezvous := make(map[*KeepService]int, len(uuids))
620         for i, uuid := range uuids {
621                 srv := bal.KeepServices[uuid]
622                 srvRendezvous[srv] = i
623         }
624
625         // Below we set underreplicated=true if we find any storage
626         // class that's currently underreplicated -- in that case we
627         // won't want to trash any replicas.
628         underreplicated := false
629
630         unsafeToDelete := make(map[int64]bool, len(slots))
631         for _, class := range bal.classes {
632                 desired := blk.Desired[class]
633                 if desired == 0 {
634                         continue
635                 }
636
637                 // Sort the slots by desirability.
638                 sort.Slice(slots, func(i, j int) bool {
639                         si, sj := slots[i], slots[j]
640                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
641                                 // Prefer a mount that satisfies the
642                                 // desired class.
643                                 return bal.mountsByClass[class][si.mnt]
644                         } else if si.want != sj.want {
645                                 // Prefer a mount that will have a
646                                 // replica no matter what we do here
647                                 // -- either because it already has an
648                                 // untrashable replica, or because we
649                                 // already need it to satisfy a
650                                 // different storage class.
651                                 return si.want
652                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
653                                 // Prefer a better rendezvous
654                                 // position.
655                                 return orderi < orderj
656                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
657                                 // Prefer a mount that already has a
658                                 // replica.
659                                 return repli
660                         } else {
661                                 // If pull/trash turns out to be
662                                 // needed, distribute the
663                                 // new/remaining replicas uniformly
664                                 // across qualifying mounts on a given
665                                 // server.
666                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
667                         }
668                 })
669
670                 // Servers/mounts/devices (with or without existing
671                 // replicas) that are part of the best achievable
672                 // layout for this storage class.
673                 wantSrv := map[*KeepService]bool{}
674                 wantMnt := map[*KeepMount]bool{}
675                 wantDev := map[string]bool{}
676                 // Positions (with existing replicas) that have been
677                 // protected (via unsafeToDelete) to ensure we don't
678                 // reduce replication below desired level when
679                 // trashing replicas that aren't optimal positions for
680                 // any storage class.
681                 protMnt := map[*KeepMount]bool{}
682                 // Replication planned so far (corresponds to wantMnt).
683                 replWant := 0
684                 // Protected replication (corresponds to protMnt).
685                 replProt := 0
686
687                 // trySlot tries using a slot to meet requirements,
688                 // and returns true if all requirements are met.
689                 trySlot := func(i int) bool {
690                         slot := slots[i]
691                         if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
692                                 // Already allocated a replica to this
693                                 // backend device, possibly on a
694                                 // different server.
695                                 return false
696                         }
697                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
698                                 unsafeToDelete[slot.repl.Mtime] = true
699                                 protMnt[slot.mnt] = true
700                                 replProt += slot.mnt.Replication
701                         }
702                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
703                                 slots[i].want = true
704                                 wantSrv[slot.mnt.KeepService] = true
705                                 wantMnt[slot.mnt] = true
706                                 if slot.mnt.DeviceID != "" {
707                                         wantDev[slot.mnt.DeviceID] = true
708                                 }
709                                 replWant += slot.mnt.Replication
710                         }
711                         return replProt >= desired && replWant >= desired
712                 }
713
714                 // First try to achieve desired replication without
715                 // using the same server twice.
716                 done := false
717                 for i := 0; i < len(slots) && !done; i++ {
718                         if !wantSrv[slots[i].mnt.KeepService] {
719                                 done = trySlot(i)
720                         }
721                 }
722
723                 // If that didn't suffice, do another pass without the
724                 // "distinct services" restriction. (Achieving the
725                 // desired volume replication on fewer than the
726                 // desired number of services is better than
727                 // underreplicating.)
728                 for i := 0; i < len(slots) && !done; i++ {
729                         done = trySlot(i)
730                 }
731
732                 if !underreplicated {
733                         safe := 0
734                         for _, slot := range slots {
735                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
736                                         continue
737                                 }
738                                 if safe += slot.mnt.Replication; safe >= desired {
739                                         break
740                                 }
741                         }
742                         underreplicated = safe < desired
743                 }
744
745                 // Avoid deleting wanted replicas from devices that
746                 // are mounted on multiple servers -- even if they
747                 // haven't already been added to unsafeToDelete
748                 // because the servers report different Mtimes.
749                 for _, slot := range slots {
750                         if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
751                                 unsafeToDelete[slot.repl.Mtime] = true
752                         }
753                 }
754         }
755
756         // TODO: If multiple replicas are trashable, prefer the oldest
757         // replica that doesn't have a timestamp collision with
758         // others.
759
760         for i, slot := range slots {
761                 // Don't trash (1) any replicas of an underreplicated
762                 // block, even if they're in the wrong positions, or
763                 // (2) any replicas whose Mtimes are identical to
764                 // needed replicas (in case we're really seeing the
765                 // same copy via different mounts).
766                 if slot.repl != nil && (underreplicated || unsafeToDelete[slot.repl.Mtime]) {
767                         slots[i].want = true
768                 }
769         }
770
771         classState := make(map[string]balancedBlockState, len(bal.classes))
772         for _, class := range bal.classes {
773                 classState[class] = computeBlockState(slots, bal.mountsByClass[class], len(blk.Replicas), blk.Desired[class])
774         }
775         blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
776
777         var lost bool
778         var changes []string
779         for _, slot := range slots {
780                 // TODO: request a Touch if Mtime is duplicated.
781                 var change int
782                 switch {
783                 case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
784                         slot.mnt.KeepService.AddTrash(Trash{
785                                 SizedDigest: blkid,
786                                 Mtime:       slot.repl.Mtime,
787                                 From:        slot.mnt,
788                         })
789                         change = changeTrash
790                 case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
791                         lost = true
792                         change = changeNone
793                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
794                         slot.mnt.KeepService.AddPull(Pull{
795                                 SizedDigest: blkid,
796                                 From:        blk.Replicas[0].KeepMount.KeepService,
797                                 To:          slot.mnt,
798                         })
799                         change = changePull
800                 case slot.repl != nil:
801                         change = changeStay
802                 default:
803                         change = changeNone
804                 }
805                 if bal.Dumper != nil {
806                         var mtime int64
807                         if slot.repl != nil {
808                                 mtime = slot.repl.Mtime
809                         }
810                         srv := slot.mnt.KeepService
811                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
812                 }
813         }
814         if bal.Dumper != nil {
815                 bal.Dumper.Printf("%s refs=%d needed=%d unneeded=%d pulling=%v %v %v", blkid, blk.RefCount, blockState.needed, blockState.unneeded, blockState.pulling, blk.Desired, changes)
816         }
817         return balanceResult{
818                 blk:        blk,
819                 blkid:      blkid,
820                 lost:       lost,
821                 blockState: blockState,
822                 classState: classState,
823         }
824 }
825
826 func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRepl int) (bbs balancedBlockState) {
827         repl := 0
828         countedDev := map[string]bool{}
829         for _, slot := range slots {
830                 if onlyCount != nil && !onlyCount[slot.mnt] {
831                         continue
832                 }
833                 if countedDev[slot.mnt.DeviceID] {
834                         continue
835                 }
836                 switch {
837                 case slot.repl != nil && slot.want:
838                         bbs.needed++
839                         repl += slot.mnt.Replication
840                 case slot.repl != nil && !slot.want:
841                         bbs.unneeded++
842                         repl += slot.mnt.Replication
843                 case slot.repl == nil && slot.want && have > 0:
844                         bbs.pulling++
845                         repl += slot.mnt.Replication
846                 }
847                 if slot.mnt.DeviceID != "" {
848                         countedDev[slot.mnt.DeviceID] = true
849                 }
850         }
851         if repl < needRepl {
852                 bbs.unachievable = true
853         }
854         return
855 }
856
857 type blocksNBytes struct {
858         replicas int
859         blocks   int
860         bytes    int64
861 }
862
863 func (bb blocksNBytes) String() string {
864         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
865 }
866
867 type replicationStats struct {
868         needed       blocksNBytes
869         unneeded     blocksNBytes
870         pulling      blocksNBytes
871         unachievable blocksNBytes
872 }
873
874 type balancerStats struct {
875         lost          blocksNBytes
876         overrep       blocksNBytes
877         unref         blocksNBytes
878         garbage       blocksNBytes
879         underrep      blocksNBytes
880         unachievable  blocksNBytes
881         justright     blocksNBytes
882         desired       blocksNBytes
883         current       blocksNBytes
884         pulls         int
885         trashes       int
886         replHistogram []int
887         classStats    map[string]replicationStats
888
889         // collectionBytes / collectionBlockBytes = deduplication ratio
890         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
891         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
892         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
893         collectionBlocks     int64 // number of blocks referenced by any collection
894 }
895
896 func (s *balancerStats) dedupByteRatio() float64 {
897         if s.collectionBlockBytes == 0 {
898                 return 0
899         }
900         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
901 }
902
903 func (s *balancerStats) dedupBlockRatio() float64 {
904         if s.collectionBlocks == 0 {
905                 return 0
906         }
907         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
908 }
909
910 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
911         var s balancerStats
912         s.replHistogram = make([]int, 2)
913         s.classStats = make(map[string]replicationStats, len(bal.classes))
914         for result := range results {
915                 bytes := result.blkid.Size()
916
917                 if rc := int64(result.blk.RefCount); rc > 0 {
918                         s.collectionBytes += rc * bytes
919                         s.collectionBlockBytes += bytes
920                         s.collectionBlockRefs += rc
921                         s.collectionBlocks++
922                 }
923
924                 for class, state := range result.classState {
925                         cs := s.classStats[class]
926                         if state.unachievable {
927                                 cs.unachievable.replicas++
928                                 cs.unachievable.blocks++
929                                 cs.unachievable.bytes += bytes
930                         }
931                         if state.needed > 0 {
932                                 cs.needed.replicas += state.needed
933                                 cs.needed.blocks++
934                                 cs.needed.bytes += bytes * int64(state.needed)
935                         }
936                         if state.unneeded > 0 {
937                                 cs.unneeded.replicas += state.unneeded
938                                 cs.unneeded.blocks++
939                                 cs.unneeded.bytes += bytes * int64(state.unneeded)
940                         }
941                         if state.pulling > 0 {
942                                 cs.pulling.replicas += state.pulling
943                                 cs.pulling.blocks++
944                                 cs.pulling.bytes += bytes * int64(state.pulling)
945                         }
946                         s.classStats[class] = cs
947                 }
948
949                 bs := result.blockState
950                 switch {
951                 case result.lost:
952                         s.lost.replicas++
953                         s.lost.blocks++
954                         s.lost.bytes += bytes
955                         fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
956                         for pdh := range result.blk.Refs {
957                                 fmt.Fprintf(bal.lostBlocks, " %s", pdh)
958                         }
959                         fmt.Fprint(bal.lostBlocks, "\n")
960                 case bs.pulling > 0:
961                         s.underrep.replicas += bs.pulling
962                         s.underrep.blocks++
963                         s.underrep.bytes += bytes * int64(bs.pulling)
964                 case bs.unachievable:
965                         s.underrep.replicas++
966                         s.underrep.blocks++
967                         s.underrep.bytes += bytes
968                 case bs.unneeded > 0 && bs.needed == 0:
969                         // Count as "garbage" if all replicas are old
970                         // enough to trash, otherwise count as
971                         // "unref".
972                         counter := &s.garbage
973                         for _, r := range result.blk.Replicas {
974                                 if r.Mtime >= bal.MinMtime {
975                                         counter = &s.unref
976                                         break
977                                 }
978                         }
979                         counter.replicas += bs.unneeded
980                         counter.blocks++
981                         counter.bytes += bytes * int64(bs.unneeded)
982                 case bs.unneeded > 0:
983                         s.overrep.replicas += bs.unneeded
984                         s.overrep.blocks++
985                         s.overrep.bytes += bytes * int64(bs.unneeded)
986                 default:
987                         s.justright.replicas += bs.needed
988                         s.justright.blocks++
989                         s.justright.bytes += bytes * int64(bs.needed)
990                 }
991
992                 if bs.needed > 0 {
993                         s.desired.replicas += bs.needed
994                         s.desired.blocks++
995                         s.desired.bytes += bytes * int64(bs.needed)
996                 }
997                 if bs.needed+bs.unneeded > 0 {
998                         s.current.replicas += bs.needed + bs.unneeded
999                         s.current.blocks++
1000                         s.current.bytes += bytes * int64(bs.needed+bs.unneeded)
1001                 }
1002
1003                 for len(s.replHistogram) <= bs.needed+bs.unneeded {
1004                         s.replHistogram = append(s.replHistogram, 0)
1005                 }
1006                 s.replHistogram[bs.needed+bs.unneeded]++
1007         }
1008         for _, srv := range bal.KeepServices {
1009                 s.pulls += len(srv.ChangeSet.Pulls)
1010                 s.trashes += len(srv.ChangeSet.Trashes)
1011         }
1012         bal.stats = s
1013         bal.Metrics.UpdateStats(s)
1014 }
1015
1016 // PrintStatistics writes statistics about the computed changes to
1017 // bal.Logger. It should not be called until ComputeChangeSets has
1018 // finished.
1019 func (bal *Balancer) PrintStatistics() {
1020         bal.logf("===")
1021         bal.logf("%s lost (0=have<want)", bal.stats.lost)
1022         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
1023         bal.logf("%s just right (have=want)", bal.stats.justright)
1024         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
1025         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
1026         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
1027         for _, class := range bal.classes {
1028                 cs := bal.stats.classStats[class]
1029                 bal.logf("===")
1030                 bal.logf("storage class %q: %s needed", class, cs.needed)
1031                 bal.logf("storage class %q: %s unneeded", class, cs.unneeded)
1032                 bal.logf("storage class %q: %s pulling", class, cs.pulling)
1033                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
1034         }
1035         bal.logf("===")
1036         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
1037         bal.logf("%s total usage", bal.stats.current)
1038         bal.logf("===")
1039         for _, srv := range bal.KeepServices {
1040                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1041         }
1042         bal.logf("===")
1043         bal.printHistogram(60)
1044         bal.logf("===")
1045 }
1046
1047 func (bal *Balancer) printHistogram(hashColumns int) {
1048         bal.logf("Replication level distribution:")
1049         maxCount := 0
1050         for _, count := range bal.stats.replHistogram {
1051                 if maxCount < count {
1052                         maxCount = count
1053                 }
1054         }
1055         hashes := strings.Repeat("#", hashColumns)
1056         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1057         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1058         for repl, count := range bal.stats.replHistogram {
1059                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1060                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1061         }
1062 }
1063
1064 // CheckSanityLate checks for configuration and runtime errors after
1065 // GetCurrentState() and ComputeChangeSets() have finished.
1066 //
1067 // If it returns an error, it is dangerous to run any Commit methods.
1068 func (bal *Balancer) CheckSanityLate() error {
1069         if bal.errors != nil {
1070                 for _, err := range bal.errors {
1071                         bal.logf("deferred error: %v", err)
1072                 }
1073                 return fmt.Errorf("cannot proceed safely after deferred errors")
1074         }
1075
1076         if bal.collScanned == 0 {
1077                 return fmt.Errorf("received zero collections")
1078         }
1079
1080         anyDesired := false
1081         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1082                 for _, desired := range blk.Desired {
1083                         if desired > 0 {
1084                                 anyDesired = true
1085                                 break
1086                         }
1087                 }
1088         })
1089         if !anyDesired {
1090                 return fmt.Errorf("zero blocks have desired replication>0")
1091         }
1092
1093         if dr := bal.DefaultReplication; dr < 1 {
1094                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1095         }
1096
1097         // TODO: no two services have identical indexes
1098         // TODO: no collisions (same md5, different size)
1099         return nil
1100 }
1101
1102 // CommitPulls sends the computed lists of pull requests to the
1103 // keepstore servers. This has the effect of increasing replication of
1104 // existing blocks that are either underreplicated or poorly
1105 // distributed according to rendezvous hashing.
1106 func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
1107         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1108         return bal.commitAsync(c, "send pull list",
1109                 func(srv *KeepService) error {
1110                         return srv.CommitPulls(ctx, c)
1111                 })
1112 }
1113
1114 // CommitTrash sends the computed lists of trash requests to the
1115 // keepstore servers. This has the effect of deleting blocks that are
1116 // overreplicated or unreferenced.
1117 func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
1118         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1119         return bal.commitAsync(c, "send trash list",
1120                 func(srv *KeepService) error {
1121                         return srv.CommitTrash(ctx, c)
1122                 })
1123 }
1124
1125 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1126         errs := make(chan error)
1127         for _, srv := range bal.KeepServices {
1128                 go func(srv *KeepService) {
1129                         var err error
1130                         defer func() { errs <- err }()
1131                         label := fmt.Sprintf("%s: %v", srv, label)
1132                         err = f(srv)
1133                         if err != nil {
1134                                 err = fmt.Errorf("%s: %v", label, err)
1135                         }
1136                 }(srv)
1137         }
1138         var lastErr error
1139         for range bal.KeepServices {
1140                 if err := <-errs; err != nil {
1141                         bal.logf("%v", err)
1142                         lastErr = err
1143                 }
1144         }
1145         close(errs)
1146         return lastErr
1147 }
1148
1149 func (bal *Balancer) logf(f string, args ...interface{}) {
1150         if bal.Logger != nil {
1151                 bal.Logger.Printf(f, args...)
1152         }
1153 }
1154
1155 func (bal *Balancer) time(name, help string) func() {
1156         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1157         t0 := time.Now()
1158         bal.Logger.Printf("%s: start", name)
1159         return func() {
1160                 dur := time.Since(t0)
1161                 observer.Observe(dur.Seconds())
1162                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1163         }
1164 }
1165
1166 // Rendezvous hash sort function. Less efficient than sorting on
1167 // precomputed rendezvous hashes, but also rarely used.
1168 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1169         a := md5.Sum([]byte(string(blkid[:32]) + i))
1170         b := md5.Sum([]byte(string(blkid[:32]) + j))
1171         return bytes.Compare(a[:], b[:]) < 0
1172 }