16480: Use longer timeout for keepstore index requests.
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "log"
15         "math"
16         "os"
17         "runtime"
18         "sort"
19         "strings"
20         "sync"
21         "syscall"
22         "time"
23
24         "git.arvados.org/arvados.git/sdk/go/arvados"
25         "git.arvados.org/arvados.git/sdk/go/keepclient"
26         "github.com/sirupsen/logrus"
27 )
28
29 // Balancer compares the contents of keepstore servers with the
30 // collections stored in Arvados, and issues pull/trash requests
31 // needed to get (closer to) the optimal data layout.
32 //
33 // In the optimal data layout: every data block referenced by a
34 // collection is replicated at least as many times as desired by the
35 // collection; there are no unreferenced data blocks older than
36 // BlobSignatureTTL; and all N existing replicas of a given data block
37 // are in the N best positions in rendezvous probe order.
38 type Balancer struct {
39         Logger  logrus.FieldLogger
40         Dumper  logrus.FieldLogger
41         Metrics *metrics
42
43         LostBlocksFile string
44
45         *BlockStateMap
46         KeepServices       map[string]*KeepService
47         DefaultReplication int
48         MinMtime           int64
49
50         classes       []string
51         mounts        int
52         mountsByClass map[string]map[*KeepMount]bool
53         collScanned   int
54         serviceRoots  map[string]string
55         errors        []error
56         stats         balancerStats
57         mutex         sync.Mutex
58         lostBlocks    io.Writer
59 }
60
61 // Run performs a balance operation using the given config and
62 // runOptions, and returns RunOptions suitable for passing to a
63 // subsequent balance operation.
64 //
65 // Run should only be called once on a given Balancer object.
66 //
67 // Typical usage:
68 //
69 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
70 func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
71         nextRunOptions = runOptions
72
73         defer bal.time("sweep", "wall clock time to run one full sweep")()
74
75         var lbFile *os.File
76         if bal.LostBlocksFile != "" {
77                 tmpfn := bal.LostBlocksFile + ".tmp"
78                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
79                 if err != nil {
80                         return
81                 }
82                 defer lbFile.Close()
83                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
84                 if err != nil {
85                         return
86                 }
87                 defer func() {
88                         // Remove the tempfile only if we didn't get
89                         // as far as successfully renaming it.
90                         if lbFile != nil {
91                                 os.Remove(tmpfn)
92                         }
93                 }()
94                 bal.lostBlocks = lbFile
95         } else {
96                 bal.lostBlocks = ioutil.Discard
97         }
98
99         err = bal.DiscoverKeepServices(client)
100         if err != nil {
101                 return
102         }
103
104         for _, srv := range bal.KeepServices {
105                 err = srv.discoverMounts(client)
106                 if err != nil {
107                         return
108                 }
109         }
110         bal.cleanupMounts()
111
112         if err = bal.CheckSanityEarly(client); err != nil {
113                 return
114         }
115         rs := bal.rendezvousState()
116         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
117                 if runOptions.SafeRendezvousState != "" {
118                         bal.logf("notice: KeepServices list has changed since last run")
119                 }
120                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
121                 if err = bal.ClearTrashLists(client); err != nil {
122                         return
123                 }
124                 // The current rendezvous state becomes "safe" (i.e.,
125                 // OK to compute changes for that state without
126                 // clearing existing trash lists) only now, after we
127                 // succeed in clearing existing trash lists.
128                 nextRunOptions.SafeRendezvousState = rs
129         }
130
131         // Indexing and sending trash/pull lists can take a long time
132         // on a big site. Prefer a long timeout (causing slow recovery
133         // from undetected network problems) to a short timeout
134         // (causing starvation via perpetual timeout/restart cycle).
135         client.Timeout = 24 * time.Hour
136
137         if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
138                 return
139         }
140         bal.ComputeChangeSets()
141         bal.PrintStatistics()
142         if err = bal.CheckSanityLate(); err != nil {
143                 return
144         }
145         if lbFile != nil {
146                 err = lbFile.Sync()
147                 if err != nil {
148                         return
149                 }
150                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
151                 if err != nil {
152                         return
153                 }
154                 lbFile = nil
155         }
156         if runOptions.CommitPulls {
157                 err = bal.CommitPulls(client)
158                 if err != nil {
159                         // Skip trash if we can't pull. (Too cautious?)
160                         return
161                 }
162         }
163         if runOptions.CommitTrash {
164                 err = bal.CommitTrash(client)
165         }
166         return
167 }
168
169 // SetKeepServices sets the list of KeepServices to operate on.
170 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
171         bal.KeepServices = make(map[string]*KeepService)
172         for _, srv := range srvList.Items {
173                 bal.KeepServices[srv.UUID] = &KeepService{
174                         KeepService: srv,
175                         ChangeSet:   &ChangeSet{},
176                 }
177         }
178         return nil
179 }
180
181 // DiscoverKeepServices sets the list of KeepServices by calling the
182 // API to get a list of all services, and selecting the ones whose
183 // ServiceType is "disk"
184 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
185         bal.KeepServices = make(map[string]*KeepService)
186         return c.EachKeepService(func(srv arvados.KeepService) error {
187                 if srv.ServiceType == "disk" {
188                         bal.KeepServices[srv.UUID] = &KeepService{
189                                 KeepService: srv,
190                                 ChangeSet:   &ChangeSet{},
191                         }
192                 } else {
193                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
194                 }
195                 return nil
196         })
197 }
198
199 func (bal *Balancer) cleanupMounts() {
200         rwdev := map[string]*KeepService{}
201         for _, srv := range bal.KeepServices {
202                 for _, mnt := range srv.mounts {
203                         if !mnt.ReadOnly && mnt.DeviceID != "" {
204                                 rwdev[mnt.DeviceID] = srv
205                         }
206                 }
207         }
208         // Drop the readonly mounts whose device is mounted RW
209         // elsewhere.
210         for _, srv := range bal.KeepServices {
211                 var dedup []*KeepMount
212                 for _, mnt := range srv.mounts {
213                         if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
214                                 bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
215                         } else {
216                                 dedup = append(dedup, mnt)
217                         }
218                 }
219                 srv.mounts = dedup
220         }
221         for _, srv := range bal.KeepServices {
222                 for _, mnt := range srv.mounts {
223                         if mnt.Replication <= 0 {
224                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
225                                 mnt.Replication = 1
226                         }
227                 }
228         }
229 }
230
231 // CheckSanityEarly checks for configuration and runtime errors that
232 // can be detected before GetCurrentState() and ComputeChangeSets()
233 // are called.
234 //
235 // If it returns an error, it is pointless to run GetCurrentState or
236 // ComputeChangeSets: after doing so, the statistics would be
237 // meaningless and it would be dangerous to run any Commit methods.
238 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
239         u, err := c.CurrentUser()
240         if err != nil {
241                 return fmt.Errorf("CurrentUser(): %v", err)
242         }
243         if !u.IsActive || !u.IsAdmin {
244                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
245         }
246         for _, srv := range bal.KeepServices {
247                 if srv.ServiceType == "proxy" {
248                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
249                 }
250         }
251
252         var checkPage arvados.CollectionList
253         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
254                 Limit:              new(int),
255                 Count:              "exact",
256                 IncludeTrash:       true,
257                 IncludeOldVersions: true,
258                 Filters: []arvados.Filter{{
259                         Attr:     "modified_at",
260                         Operator: "=",
261                         Operand:  nil,
262                 }},
263         }); err != nil {
264                 return err
265         } else if n := checkPage.ItemsAvailable; n > 0 {
266                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
267         }
268
269         return nil
270 }
271
272 // rendezvousState returns a fingerprint (e.g., a sorted list of
273 // UUID+host+port) of the current set of keep services.
274 func (bal *Balancer) rendezvousState() string {
275         srvs := make([]string, 0, len(bal.KeepServices))
276         for _, srv := range bal.KeepServices {
277                 srvs = append(srvs, srv.String())
278         }
279         sort.Strings(srvs)
280         return strings.Join(srvs, "; ")
281 }
282
283 // ClearTrashLists sends an empty trash list to each keep
284 // service. Calling this before GetCurrentState avoids races.
285 //
286 // When a block appears in an index, we assume that replica will still
287 // exist after we delete other replicas on other servers. However,
288 // it's possible that a previous rebalancing operation made different
289 // decisions (e.g., servers were added/removed, and rendezvous order
290 // changed). In this case, the replica might already be on that
291 // server's trash list, and it might be deleted before we send a
292 // replacement trash list.
293 //
294 // We avoid this problem if we clear all trash lists before getting
295 // indexes. (We also assume there is only one rebalancing process
296 // running at a time.)
297 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
298         for _, srv := range bal.KeepServices {
299                 srv.ChangeSet = &ChangeSet{}
300         }
301         return bal.CommitTrash(c)
302 }
303
304 // GetCurrentState determines the current replication state, and the
305 // desired replication level, for every block that is either
306 // retrievable or referenced.
307 //
308 // It determines the current replication state by reading the block index
309 // from every known Keep service.
310 //
311 // It determines the desired replication level by retrieving all
312 // collection manifests in the database (API server).
313 //
314 // It encodes the resulting information in BlockStateMap.
315 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
316         ctx, cancel := context.WithCancel(context.Background())
317         defer cancel()
318
319         defer bal.time("get_state", "wall clock time to get current state")()
320         bal.BlockStateMap = NewBlockStateMap()
321
322         dd, err := c.DiscoveryDocument()
323         if err != nil {
324                 return err
325         }
326         bal.DefaultReplication = dd.DefaultCollectionReplication
327         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
328
329         errs := make(chan error, 1)
330         wg := sync.WaitGroup{}
331
332         // When a device is mounted more than once, we will get its
333         // index only once, and call AddReplicas on all of the mounts.
334         // equivMount keys are the mounts that will be indexed, and
335         // each value is a list of mounts to apply the received index
336         // to.
337         equivMount := map[*KeepMount][]*KeepMount{}
338         // deviceMount maps each device ID to the one mount that will
339         // be indexed for that device.
340         deviceMount := map[string]*KeepMount{}
341         for _, srv := range bal.KeepServices {
342                 for _, mnt := range srv.mounts {
343                         equiv := deviceMount[mnt.DeviceID]
344                         if equiv == nil {
345                                 equiv = mnt
346                                 if mnt.DeviceID != "" {
347                                         deviceMount[mnt.DeviceID] = equiv
348                                 }
349                         }
350                         equivMount[equiv] = append(equivMount[equiv], mnt)
351                 }
352         }
353
354         // Start one goroutine for each (non-redundant) mount:
355         // retrieve the index, and add the returned blocks to
356         // BlockStateMap.
357         for _, mounts := range equivMount {
358                 wg.Add(1)
359                 go func(mounts []*KeepMount) {
360                         defer wg.Done()
361                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
362                         idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
363                         if err != nil {
364                                 select {
365                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
366                                 default:
367                                 }
368                                 cancel()
369                                 return
370                         }
371                         if len(errs) > 0 {
372                                 // Some other goroutine encountered an
373                                 // error -- any further effort here
374                                 // will be wasted.
375                                 return
376                         }
377                         for _, mount := range mounts {
378                                 bal.logf("%s: add %d entries to map", mount, len(idx))
379                                 bal.BlockStateMap.AddReplicas(mount, idx)
380                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
381                         }
382                         bal.logf("mount %s: index done", mounts[0])
383                 }(mounts)
384         }
385
386         // collQ buffers incoming collections so we can start fetching
387         // the next page without waiting for the current page to
388         // finish processing.
389         collQ := make(chan arvados.Collection, bufs)
390
391         // Start a goroutine to process collections. (We could use a
392         // worker pool here, but even with a single worker we already
393         // process collections much faster than we can retrieve them.)
394         wg.Add(1)
395         go func() {
396                 defer wg.Done()
397                 for coll := range collQ {
398                         err := bal.addCollection(coll)
399                         if err != nil || len(errs) > 0 {
400                                 select {
401                                 case errs <- err:
402                                 default:
403                                 }
404                                 for range collQ {
405                                 }
406                                 cancel()
407                                 return
408                         }
409                         bal.collScanned++
410                 }
411         }()
412
413         // Start a goroutine to retrieve all collections from the
414         // Arvados database and send them to collQ for processing.
415         wg.Add(1)
416         go func() {
417                 defer wg.Done()
418                 err = EachCollection(c, pageSize,
419                         func(coll arvados.Collection) error {
420                                 collQ <- coll
421                                 if len(errs) > 0 {
422                                         // some other GetCurrentState
423                                         // error happened: no point
424                                         // getting any more
425                                         // collections.
426                                         return fmt.Errorf("")
427                                 }
428                                 return nil
429                         }, func(done, total int) {
430                                 bal.logf("collections: %d/%d", done, total)
431                         })
432                 close(collQ)
433                 if err != nil {
434                         select {
435                         case errs <- err:
436                         default:
437                         }
438                         cancel()
439                 }
440         }()
441
442         wg.Wait()
443         if len(errs) > 0 {
444                 return <-errs
445         }
446         return nil
447 }
448
449 func (bal *Balancer) addCollection(coll arvados.Collection) error {
450         blkids, err := coll.SizedDigests()
451         if err != nil {
452                 return fmt.Errorf("%v: %v", coll.UUID, err)
453         }
454         repl := bal.DefaultReplication
455         if coll.ReplicationDesired != nil {
456                 repl = *coll.ReplicationDesired
457         }
458         bal.Logger.Debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
459         // Pass pdh to IncreaseDesired only if LostBlocksFile is being
460         // written -- otherwise it's just a waste of memory.
461         pdh := ""
462         if bal.LostBlocksFile != "" {
463                 pdh = coll.PortableDataHash
464         }
465         bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
466         return nil
467 }
468
469 // ComputeChangeSets compares, for each known block, the current and
470 // desired replication states. If it is possible to get closer to the
471 // desired state by copying or deleting blocks, it adds those changes
472 // to the relevant KeepServices' ChangeSets.
473 //
474 // It does not actually apply any of the computed changes.
475 func (bal *Balancer) ComputeChangeSets() {
476         // This just calls balanceBlock() once for each block, using a
477         // pool of worker goroutines.
478         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
479         bal.setupLookupTables()
480
481         type balanceTask struct {
482                 blkid arvados.SizedDigest
483                 blk   *BlockState
484         }
485         workers := runtime.GOMAXPROCS(-1)
486         todo := make(chan balanceTask, workers)
487         go func() {
488                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
489                         todo <- balanceTask{
490                                 blkid: blkid,
491                                 blk:   blk,
492                         }
493                 })
494                 close(todo)
495         }()
496         results := make(chan balanceResult, workers)
497         go func() {
498                 var wg sync.WaitGroup
499                 for i := 0; i < workers; i++ {
500                         wg.Add(1)
501                         go func() {
502                                 for work := range todo {
503                                         results <- bal.balanceBlock(work.blkid, work.blk)
504                                 }
505                                 wg.Done()
506                         }()
507                 }
508                 wg.Wait()
509                 close(results)
510         }()
511         bal.collectStatistics(results)
512 }
513
514 func (bal *Balancer) setupLookupTables() {
515         bal.serviceRoots = make(map[string]string)
516         bal.classes = defaultClasses
517         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
518         bal.mounts = 0
519         for _, srv := range bal.KeepServices {
520                 bal.serviceRoots[srv.UUID] = srv.UUID
521                 for _, mnt := range srv.mounts {
522                         bal.mounts++
523
524                         // All mounts on a read-only service are
525                         // effectively read-only.
526                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
527
528                         if len(mnt.StorageClasses) == 0 {
529                                 bal.mountsByClass["default"][mnt] = true
530                                 continue
531                         }
532                         for class := range mnt.StorageClasses {
533                                 if mbc := bal.mountsByClass[class]; mbc == nil {
534                                         bal.classes = append(bal.classes, class)
535                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
536                                 } else {
537                                         mbc[mnt] = true
538                                 }
539                         }
540                 }
541         }
542         // Consider classes in lexicographic order to avoid flapping
543         // between balancing runs.  The outcome of the "prefer a mount
544         // we're already planning to use for a different storage
545         // class" case in balanceBlock depends on the order classes
546         // are considered.
547         sort.Strings(bal.classes)
548 }
549
550 const (
551         changeStay = iota
552         changePull
553         changeTrash
554         changeNone
555 )
556
557 var changeName = map[int]string{
558         changeStay:  "stay",
559         changePull:  "pull",
560         changeTrash: "trash",
561         changeNone:  "none",
562 }
563
564 type balancedBlockState struct {
565         needed       int
566         unneeded     int
567         pulling      int
568         unachievable bool
569 }
570
571 type balanceResult struct {
572         blk        *BlockState
573         blkid      arvados.SizedDigest
574         lost       bool
575         blockState balancedBlockState
576         classState map[string]balancedBlockState
577 }
578
579 type slot struct {
580         mnt  *KeepMount // never nil
581         repl *Replica   // replica already stored here (or nil)
582         want bool       // we should pull/leave a replica here
583 }
584
585 // balanceBlock compares current state to desired state for a single
586 // block, and makes the appropriate ChangeSet calls.
587 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
588         bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
589
590         // Build a list of all slots (one per mounted volume).
591         slots := make([]slot, 0, bal.mounts)
592         for _, srv := range bal.KeepServices {
593                 for _, mnt := range srv.mounts {
594                         var repl *Replica
595                         for r := range blk.Replicas {
596                                 if blk.Replicas[r].KeepMount == mnt {
597                                         repl = &blk.Replicas[r]
598                                 }
599                         }
600                         // Initial value of "want" is "have, and can't
601                         // delete". These untrashable replicas get
602                         // prioritized when sorting slots: otherwise,
603                         // non-optimal readonly copies would cause us
604                         // to overreplicate.
605                         slots = append(slots, slot{
606                                 mnt:  mnt,
607                                 repl: repl,
608                                 want: repl != nil && mnt.ReadOnly,
609                         })
610                 }
611         }
612
613         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
614         srvRendezvous := make(map[*KeepService]int, len(uuids))
615         for i, uuid := range uuids {
616                 srv := bal.KeepServices[uuid]
617                 srvRendezvous[srv] = i
618         }
619
620         // Below we set underreplicated=true if we find any storage
621         // class that's currently underreplicated -- in that case we
622         // won't want to trash any replicas.
623         underreplicated := false
624
625         unsafeToDelete := make(map[int64]bool, len(slots))
626         for _, class := range bal.classes {
627                 desired := blk.Desired[class]
628                 if desired == 0 {
629                         continue
630                 }
631
632                 // Sort the slots by desirability.
633                 sort.Slice(slots, func(i, j int) bool {
634                         si, sj := slots[i], slots[j]
635                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
636                                 // Prefer a mount that satisfies the
637                                 // desired class.
638                                 return bal.mountsByClass[class][si.mnt]
639                         } else if si.want != sj.want {
640                                 // Prefer a mount that will have a
641                                 // replica no matter what we do here
642                                 // -- either because it already has an
643                                 // untrashable replica, or because we
644                                 // already need it to satisfy a
645                                 // different storage class.
646                                 return si.want
647                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
648                                 // Prefer a better rendezvous
649                                 // position.
650                                 return orderi < orderj
651                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
652                                 // Prefer a mount that already has a
653                                 // replica.
654                                 return repli
655                         } else {
656                                 // If pull/trash turns out to be
657                                 // needed, distribute the
658                                 // new/remaining replicas uniformly
659                                 // across qualifying mounts on a given
660                                 // server.
661                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
662                         }
663                 })
664
665                 // Servers/mounts/devices (with or without existing
666                 // replicas) that are part of the best achievable
667                 // layout for this storage class.
668                 wantSrv := map[*KeepService]bool{}
669                 wantMnt := map[*KeepMount]bool{}
670                 wantDev := map[string]bool{}
671                 // Positions (with existing replicas) that have been
672                 // protected (via unsafeToDelete) to ensure we don't
673                 // reduce replication below desired level when
674                 // trashing replicas that aren't optimal positions for
675                 // any storage class.
676                 protMnt := map[*KeepMount]bool{}
677                 // Replication planned so far (corresponds to wantMnt).
678                 replWant := 0
679                 // Protected replication (corresponds to protMnt).
680                 replProt := 0
681
682                 // trySlot tries using a slot to meet requirements,
683                 // and returns true if all requirements are met.
684                 trySlot := func(i int) bool {
685                         slot := slots[i]
686                         if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
687                                 // Already allocated a replica to this
688                                 // backend device, possibly on a
689                                 // different server.
690                                 return false
691                         }
692                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
693                                 unsafeToDelete[slot.repl.Mtime] = true
694                                 protMnt[slot.mnt] = true
695                                 replProt += slot.mnt.Replication
696                         }
697                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
698                                 slots[i].want = true
699                                 wantSrv[slot.mnt.KeepService] = true
700                                 wantMnt[slot.mnt] = true
701                                 if slot.mnt.DeviceID != "" {
702                                         wantDev[slot.mnt.DeviceID] = true
703                                 }
704                                 replWant += slot.mnt.Replication
705                         }
706                         return replProt >= desired && replWant >= desired
707                 }
708
709                 // First try to achieve desired replication without
710                 // using the same server twice.
711                 done := false
712                 for i := 0; i < len(slots) && !done; i++ {
713                         if !wantSrv[slots[i].mnt.KeepService] {
714                                 done = trySlot(i)
715                         }
716                 }
717
718                 // If that didn't suffice, do another pass without the
719                 // "distinct services" restriction. (Achieving the
720                 // desired volume replication on fewer than the
721                 // desired number of services is better than
722                 // underreplicating.)
723                 for i := 0; i < len(slots) && !done; i++ {
724                         done = trySlot(i)
725                 }
726
727                 if !underreplicated {
728                         safe := 0
729                         for _, slot := range slots {
730                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
731                                         continue
732                                 }
733                                 if safe += slot.mnt.Replication; safe >= desired {
734                                         break
735                                 }
736                         }
737                         underreplicated = safe < desired
738                 }
739
740                 // Avoid deleting wanted replicas from devices that
741                 // are mounted on multiple servers -- even if they
742                 // haven't already been added to unsafeToDelete
743                 // because the servers report different Mtimes.
744                 for _, slot := range slots {
745                         if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
746                                 unsafeToDelete[slot.repl.Mtime] = true
747                         }
748                 }
749         }
750
751         // TODO: If multiple replicas are trashable, prefer the oldest
752         // replica that doesn't have a timestamp collision with
753         // others.
754
755         for i, slot := range slots {
756                 // Don't trash (1) any replicas of an underreplicated
757                 // block, even if they're in the wrong positions, or
758                 // (2) any replicas whose Mtimes are identical to
759                 // needed replicas (in case we're really seeing the
760                 // same copy via different mounts).
761                 if slot.repl != nil && (underreplicated || unsafeToDelete[slot.repl.Mtime]) {
762                         slots[i].want = true
763                 }
764         }
765
766         classState := make(map[string]balancedBlockState, len(bal.classes))
767         for _, class := range bal.classes {
768                 classState[class] = computeBlockState(slots, bal.mountsByClass[class], len(blk.Replicas), blk.Desired[class])
769         }
770         blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
771
772         var lost bool
773         var changes []string
774         for _, slot := range slots {
775                 // TODO: request a Touch if Mtime is duplicated.
776                 var change int
777                 switch {
778                 case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
779                         slot.mnt.KeepService.AddTrash(Trash{
780                                 SizedDigest: blkid,
781                                 Mtime:       slot.repl.Mtime,
782                                 From:        slot.mnt,
783                         })
784                         change = changeTrash
785                 case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
786                         lost = true
787                         change = changeNone
788                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
789                         slot.mnt.KeepService.AddPull(Pull{
790                                 SizedDigest: blkid,
791                                 From:        blk.Replicas[0].KeepMount.KeepService,
792                                 To:          slot.mnt,
793                         })
794                         change = changePull
795                 case slot.repl != nil:
796                         change = changeStay
797                 default:
798                         change = changeNone
799                 }
800                 if bal.Dumper != nil {
801                         var mtime int64
802                         if slot.repl != nil {
803                                 mtime = slot.repl.Mtime
804                         }
805                         srv := slot.mnt.KeepService
806                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
807                 }
808         }
809         if bal.Dumper != nil {
810                 bal.Dumper.Printf("%s refs=%d needed=%d unneeded=%d pulling=%v %v %v", blkid, blk.RefCount, blockState.needed, blockState.unneeded, blockState.pulling, blk.Desired, changes)
811         }
812         return balanceResult{
813                 blk:        blk,
814                 blkid:      blkid,
815                 lost:       lost,
816                 blockState: blockState,
817                 classState: classState,
818         }
819 }
820
821 func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRepl int) (bbs balancedBlockState) {
822         repl := 0
823         countedDev := map[string]bool{}
824         for _, slot := range slots {
825                 if onlyCount != nil && !onlyCount[slot.mnt] {
826                         continue
827                 }
828                 if countedDev[slot.mnt.DeviceID] {
829                         continue
830                 }
831                 switch {
832                 case slot.repl != nil && slot.want:
833                         bbs.needed++
834                         repl += slot.mnt.Replication
835                 case slot.repl != nil && !slot.want:
836                         bbs.unneeded++
837                         repl += slot.mnt.Replication
838                 case slot.repl == nil && slot.want && have > 0:
839                         bbs.pulling++
840                         repl += slot.mnt.Replication
841                 }
842                 if slot.mnt.DeviceID != "" {
843                         countedDev[slot.mnt.DeviceID] = true
844                 }
845         }
846         if repl < needRepl {
847                 bbs.unachievable = true
848         }
849         return
850 }
851
852 type blocksNBytes struct {
853         replicas int
854         blocks   int
855         bytes    int64
856 }
857
858 func (bb blocksNBytes) String() string {
859         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
860 }
861
862 type replicationStats struct {
863         needed       blocksNBytes
864         unneeded     blocksNBytes
865         pulling      blocksNBytes
866         unachievable blocksNBytes
867 }
868
869 type balancerStats struct {
870         lost          blocksNBytes
871         overrep       blocksNBytes
872         unref         blocksNBytes
873         garbage       blocksNBytes
874         underrep      blocksNBytes
875         unachievable  blocksNBytes
876         justright     blocksNBytes
877         desired       blocksNBytes
878         current       blocksNBytes
879         pulls         int
880         trashes       int
881         replHistogram []int
882         classStats    map[string]replicationStats
883
884         // collectionBytes / collectionBlockBytes = deduplication ratio
885         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
886         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
887         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
888         collectionBlocks     int64 // number of blocks referenced by any collection
889 }
890
891 func (s *balancerStats) dedupByteRatio() float64 {
892         if s.collectionBlockBytes == 0 {
893                 return 0
894         }
895         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
896 }
897
898 func (s *balancerStats) dedupBlockRatio() float64 {
899         if s.collectionBlocks == 0 {
900                 return 0
901         }
902         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
903 }
904
905 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
906         var s balancerStats
907         s.replHistogram = make([]int, 2)
908         s.classStats = make(map[string]replicationStats, len(bal.classes))
909         for result := range results {
910                 bytes := result.blkid.Size()
911
912                 if rc := int64(result.blk.RefCount); rc > 0 {
913                         s.collectionBytes += rc * bytes
914                         s.collectionBlockBytes += bytes
915                         s.collectionBlockRefs += rc
916                         s.collectionBlocks++
917                 }
918
919                 for class, state := range result.classState {
920                         cs := s.classStats[class]
921                         if state.unachievable {
922                                 cs.unachievable.replicas++
923                                 cs.unachievable.blocks++
924                                 cs.unachievable.bytes += bytes
925                         }
926                         if state.needed > 0 {
927                                 cs.needed.replicas += state.needed
928                                 cs.needed.blocks++
929                                 cs.needed.bytes += bytes * int64(state.needed)
930                         }
931                         if state.unneeded > 0 {
932                                 cs.unneeded.replicas += state.unneeded
933                                 cs.unneeded.blocks++
934                                 cs.unneeded.bytes += bytes * int64(state.unneeded)
935                         }
936                         if state.pulling > 0 {
937                                 cs.pulling.replicas += state.pulling
938                                 cs.pulling.blocks++
939                                 cs.pulling.bytes += bytes * int64(state.pulling)
940                         }
941                         s.classStats[class] = cs
942                 }
943
944                 bs := result.blockState
945                 switch {
946                 case result.lost:
947                         s.lost.replicas++
948                         s.lost.blocks++
949                         s.lost.bytes += bytes
950                         fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
951                         for pdh := range result.blk.Refs {
952                                 fmt.Fprintf(bal.lostBlocks, " %s", pdh)
953                         }
954                         fmt.Fprint(bal.lostBlocks, "\n")
955                 case bs.pulling > 0:
956                         s.underrep.replicas += bs.pulling
957                         s.underrep.blocks++
958                         s.underrep.bytes += bytes * int64(bs.pulling)
959                 case bs.unachievable:
960                         s.underrep.replicas++
961                         s.underrep.blocks++
962                         s.underrep.bytes += bytes
963                 case bs.unneeded > 0 && bs.needed == 0:
964                         // Count as "garbage" if all replicas are old
965                         // enough to trash, otherwise count as
966                         // "unref".
967                         counter := &s.garbage
968                         for _, r := range result.blk.Replicas {
969                                 if r.Mtime >= bal.MinMtime {
970                                         counter = &s.unref
971                                         break
972                                 }
973                         }
974                         counter.replicas += bs.unneeded
975                         counter.blocks++
976                         counter.bytes += bytes * int64(bs.unneeded)
977                 case bs.unneeded > 0:
978                         s.overrep.replicas += bs.unneeded
979                         s.overrep.blocks++
980                         s.overrep.bytes += bytes * int64(bs.unneeded)
981                 default:
982                         s.justright.replicas += bs.needed
983                         s.justright.blocks++
984                         s.justright.bytes += bytes * int64(bs.needed)
985                 }
986
987                 if bs.needed > 0 {
988                         s.desired.replicas += bs.needed
989                         s.desired.blocks++
990                         s.desired.bytes += bytes * int64(bs.needed)
991                 }
992                 if bs.needed+bs.unneeded > 0 {
993                         s.current.replicas += bs.needed + bs.unneeded
994                         s.current.blocks++
995                         s.current.bytes += bytes * int64(bs.needed+bs.unneeded)
996                 }
997
998                 for len(s.replHistogram) <= bs.needed+bs.unneeded {
999                         s.replHistogram = append(s.replHistogram, 0)
1000                 }
1001                 s.replHistogram[bs.needed+bs.unneeded]++
1002         }
1003         for _, srv := range bal.KeepServices {
1004                 s.pulls += len(srv.ChangeSet.Pulls)
1005                 s.trashes += len(srv.ChangeSet.Trashes)
1006         }
1007         bal.stats = s
1008         bal.Metrics.UpdateStats(s)
1009 }
1010
1011 // PrintStatistics writes statistics about the computed changes to
1012 // bal.Logger. It should not be called until ComputeChangeSets has
1013 // finished.
1014 func (bal *Balancer) PrintStatistics() {
1015         bal.logf("===")
1016         bal.logf("%s lost (0=have<want)", bal.stats.lost)
1017         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
1018         bal.logf("%s just right (have=want)", bal.stats.justright)
1019         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
1020         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
1021         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
1022         for _, class := range bal.classes {
1023                 cs := bal.stats.classStats[class]
1024                 bal.logf("===")
1025                 bal.logf("storage class %q: %s needed", class, cs.needed)
1026                 bal.logf("storage class %q: %s unneeded", class, cs.unneeded)
1027                 bal.logf("storage class %q: %s pulling", class, cs.pulling)
1028                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
1029         }
1030         bal.logf("===")
1031         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
1032         bal.logf("%s total usage", bal.stats.current)
1033         bal.logf("===")
1034         for _, srv := range bal.KeepServices {
1035                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1036         }
1037         bal.logf("===")
1038         bal.printHistogram(60)
1039         bal.logf("===")
1040 }
1041
1042 func (bal *Balancer) printHistogram(hashColumns int) {
1043         bal.logf("Replication level distribution:")
1044         maxCount := 0
1045         for _, count := range bal.stats.replHistogram {
1046                 if maxCount < count {
1047                         maxCount = count
1048                 }
1049         }
1050         hashes := strings.Repeat("#", hashColumns)
1051         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1052         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1053         for repl, count := range bal.stats.replHistogram {
1054                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1055                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1056         }
1057 }
1058
1059 // CheckSanityLate checks for configuration and runtime errors after
1060 // GetCurrentState() and ComputeChangeSets() have finished.
1061 //
1062 // If it returns an error, it is dangerous to run any Commit methods.
1063 func (bal *Balancer) CheckSanityLate() error {
1064         if bal.errors != nil {
1065                 for _, err := range bal.errors {
1066                         bal.logf("deferred error: %v", err)
1067                 }
1068                 return fmt.Errorf("cannot proceed safely after deferred errors")
1069         }
1070
1071         if bal.collScanned == 0 {
1072                 return fmt.Errorf("received zero collections")
1073         }
1074
1075         anyDesired := false
1076         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1077                 for _, desired := range blk.Desired {
1078                         if desired > 0 {
1079                                 anyDesired = true
1080                                 break
1081                         }
1082                 }
1083         })
1084         if !anyDesired {
1085                 return fmt.Errorf("zero blocks have desired replication>0")
1086         }
1087
1088         if dr := bal.DefaultReplication; dr < 1 {
1089                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1090         }
1091
1092         // TODO: no two services have identical indexes
1093         // TODO: no collisions (same md5, different size)
1094         return nil
1095 }
1096
1097 // CommitPulls sends the computed lists of pull requests to the
1098 // keepstore servers. This has the effect of increasing replication of
1099 // existing blocks that are either underreplicated or poorly
1100 // distributed according to rendezvous hashing.
1101 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
1102         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1103         return bal.commitAsync(c, "send pull list",
1104                 func(srv *KeepService) error {
1105                         return srv.CommitPulls(c)
1106                 })
1107 }
1108
1109 // CommitTrash sends the computed lists of trash requests to the
1110 // keepstore servers. This has the effect of deleting blocks that are
1111 // overreplicated or unreferenced.
1112 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
1113         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1114         return bal.commitAsync(c, "send trash list",
1115                 func(srv *KeepService) error {
1116                         return srv.CommitTrash(c)
1117                 })
1118 }
1119
1120 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1121         errs := make(chan error)
1122         for _, srv := range bal.KeepServices {
1123                 go func(srv *KeepService) {
1124                         var err error
1125                         defer func() { errs <- err }()
1126                         label := fmt.Sprintf("%s: %v", srv, label)
1127                         err = f(srv)
1128                         if err != nil {
1129                                 err = fmt.Errorf("%s: %v", label, err)
1130                         }
1131                 }(srv)
1132         }
1133         var lastErr error
1134         for range bal.KeepServices {
1135                 if err := <-errs; err != nil {
1136                         bal.logf("%v", err)
1137                         lastErr = err
1138                 }
1139         }
1140         close(errs)
1141         return lastErr
1142 }
1143
1144 func (bal *Balancer) logf(f string, args ...interface{}) {
1145         if bal.Logger != nil {
1146                 bal.Logger.Printf(f, args...)
1147         }
1148 }
1149
1150 func (bal *Balancer) time(name, help string) func() {
1151         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1152         t0 := time.Now()
1153         bal.Logger.Printf("%s: start", name)
1154         return func() {
1155                 dur := time.Since(t0)
1156                 observer.Observe(dur.Seconds())
1157                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1158         }
1159 }
1160
1161 // Rendezvous hash sort function. Less efficient than sorting on
1162 // precomputed rendezvous hashes, but also rarely used.
1163 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1164         a := md5.Sum([]byte(string(blkid[:32]) + i))
1165         b := md5.Sum([]byte(string(blkid[:32]) + j))
1166         return bytes.Compare(a[:], b[:]) < 0
1167 }