17574: Get collections directly from DB instead of controller.
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "log"
15         "math"
16         "os"
17         "runtime"
18         "sort"
19         "strings"
20         "sync"
21         "syscall"
22         "time"
23
24         "git.arvados.org/arvados.git/sdk/go/arvados"
25         "git.arvados.org/arvados.git/sdk/go/keepclient"
26         "github.com/jmoiron/sqlx"
27         "github.com/sirupsen/logrus"
28 )
29
30 // Balancer compares the contents of keepstore servers with the
31 // collections stored in Arvados, and issues pull/trash requests
32 // needed to get (closer to) the optimal data layout.
33 //
34 // In the optimal data layout: every data block referenced by a
35 // collection is replicated at least as many times as desired by the
36 // collection; there are no unreferenced data blocks older than
37 // BlobSignatureTTL; and all N existing replicas of a given data block
38 // are in the N best positions in rendezvous probe order.
39 type Balancer struct {
40         DB      *sqlx.DB
41         Logger  logrus.FieldLogger
42         Dumper  logrus.FieldLogger
43         Metrics *metrics
44
45         LostBlocksFile string
46
47         *BlockStateMap
48         KeepServices       map[string]*KeepService
49         DefaultReplication int
50         MinMtime           int64
51
52         classes       []string
53         mounts        int
54         mountsByClass map[string]map[*KeepMount]bool
55         collScanned   int
56         serviceRoots  map[string]string
57         errors        []error
58         stats         balancerStats
59         mutex         sync.Mutex
60         lostBlocks    io.Writer
61 }
62
63 // Run performs a balance operation using the given config and
64 // runOptions, and returns RunOptions suitable for passing to a
65 // subsequent balance operation.
66 //
67 // Run should only be called once on a given Balancer object.
68 //
69 // Typical usage:
70 //
71 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
72 func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
73         nextRunOptions = runOptions
74
75         defer bal.time("sweep", "wall clock time to run one full sweep")()
76
77         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
78         defer cancel()
79
80         var lbFile *os.File
81         if bal.LostBlocksFile != "" {
82                 tmpfn := bal.LostBlocksFile + ".tmp"
83                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
84                 if err != nil {
85                         return
86                 }
87                 defer lbFile.Close()
88                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
89                 if err != nil {
90                         return
91                 }
92                 defer func() {
93                         // Remove the tempfile only if we didn't get
94                         // as far as successfully renaming it.
95                         if lbFile != nil {
96                                 os.Remove(tmpfn)
97                         }
98                 }()
99                 bal.lostBlocks = lbFile
100         } else {
101                 bal.lostBlocks = ioutil.Discard
102         }
103
104         err = bal.DiscoverKeepServices(client)
105         if err != nil {
106                 return
107         }
108
109         for _, srv := range bal.KeepServices {
110                 err = srv.discoverMounts(client)
111                 if err != nil {
112                         return
113                 }
114         }
115         bal.cleanupMounts()
116
117         if err = bal.CheckSanityEarly(client); err != nil {
118                 return
119         }
120
121         // On a big site, indexing and sending trash/pull lists can
122         // take much longer than the usual 5 minute client
123         // timeout. From here on, we rely on the context deadline
124         // instead, aborting the entire operation if any part takes
125         // too long.
126         client.Timeout = 0
127
128         rs := bal.rendezvousState()
129         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
130                 if runOptions.SafeRendezvousState != "" {
131                         bal.logf("notice: KeepServices list has changed since last run")
132                 }
133                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
134                 if err = bal.ClearTrashLists(ctx, client); err != nil {
135                         return
136                 }
137                 // The current rendezvous state becomes "safe" (i.e.,
138                 // OK to compute changes for that state without
139                 // clearing existing trash lists) only now, after we
140                 // succeed in clearing existing trash lists.
141                 nextRunOptions.SafeRendezvousState = rs
142         }
143
144         if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
145                 return
146         }
147         bal.ComputeChangeSets()
148         bal.PrintStatistics()
149         if err = bal.CheckSanityLate(); err != nil {
150                 return
151         }
152         if lbFile != nil {
153                 err = lbFile.Sync()
154                 if err != nil {
155                         return
156                 }
157                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
158                 if err != nil {
159                         return
160                 }
161                 lbFile = nil
162         }
163         if runOptions.CommitPulls {
164                 err = bal.CommitPulls(ctx, client)
165                 if err != nil {
166                         // Skip trash if we can't pull. (Too cautious?)
167                         return
168                 }
169         }
170         if runOptions.CommitTrash {
171                 err = bal.CommitTrash(ctx, client)
172                 if err != nil {
173                         return
174                 }
175         }
176         err = bal.updateCollections(ctx, client, cluster)
177         return
178 }
179
180 // SetKeepServices sets the list of KeepServices to operate on.
181 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
182         bal.KeepServices = make(map[string]*KeepService)
183         for _, srv := range srvList.Items {
184                 bal.KeepServices[srv.UUID] = &KeepService{
185                         KeepService: srv,
186                         ChangeSet:   &ChangeSet{},
187                 }
188         }
189         return nil
190 }
191
192 // DiscoverKeepServices sets the list of KeepServices by calling the
193 // API to get a list of all services, and selecting the ones whose
194 // ServiceType is "disk"
195 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
196         bal.KeepServices = make(map[string]*KeepService)
197         return c.EachKeepService(func(srv arvados.KeepService) error {
198                 if srv.ServiceType == "disk" {
199                         bal.KeepServices[srv.UUID] = &KeepService{
200                                 KeepService: srv,
201                                 ChangeSet:   &ChangeSet{},
202                         }
203                 } else {
204                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
205                 }
206                 return nil
207         })
208 }
209
210 func (bal *Balancer) cleanupMounts() {
211         rwdev := map[string]*KeepService{}
212         for _, srv := range bal.KeepServices {
213                 for _, mnt := range srv.mounts {
214                         if !mnt.ReadOnly && mnt.DeviceID != "" {
215                                 rwdev[mnt.DeviceID] = srv
216                         }
217                 }
218         }
219         // Drop the readonly mounts whose device is mounted RW
220         // elsewhere.
221         for _, srv := range bal.KeepServices {
222                 var dedup []*KeepMount
223                 for _, mnt := range srv.mounts {
224                         if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
225                                 bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
226                         } else {
227                                 dedup = append(dedup, mnt)
228                         }
229                 }
230                 srv.mounts = dedup
231         }
232         for _, srv := range bal.KeepServices {
233                 for _, mnt := range srv.mounts {
234                         if mnt.Replication <= 0 {
235                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
236                                 mnt.Replication = 1
237                         }
238                 }
239         }
240 }
241
242 // CheckSanityEarly checks for configuration and runtime errors that
243 // can be detected before GetCurrentState() and ComputeChangeSets()
244 // are called.
245 //
246 // If it returns an error, it is pointless to run GetCurrentState or
247 // ComputeChangeSets: after doing so, the statistics would be
248 // meaningless and it would be dangerous to run any Commit methods.
249 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
250         u, err := c.CurrentUser()
251         if err != nil {
252                 return fmt.Errorf("CurrentUser(): %v", err)
253         }
254         if !u.IsActive || !u.IsAdmin {
255                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
256         }
257         for _, srv := range bal.KeepServices {
258                 if srv.ServiceType == "proxy" {
259                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
260                 }
261         }
262
263         var checkPage arvados.CollectionList
264         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
265                 Limit:              new(int),
266                 Count:              "exact",
267                 IncludeTrash:       true,
268                 IncludeOldVersions: true,
269                 Filters: []arvados.Filter{{
270                         Attr:     "modified_at",
271                         Operator: "=",
272                         Operand:  nil,
273                 }},
274         }); err != nil {
275                 return err
276         } else if n := checkPage.ItemsAvailable; n > 0 {
277                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
278         }
279
280         return nil
281 }
282
283 // rendezvousState returns a fingerprint (e.g., a sorted list of
284 // UUID+host+port) of the current set of keep services.
285 func (bal *Balancer) rendezvousState() string {
286         srvs := make([]string, 0, len(bal.KeepServices))
287         for _, srv := range bal.KeepServices {
288                 srvs = append(srvs, srv.String())
289         }
290         sort.Strings(srvs)
291         return strings.Join(srvs, "; ")
292 }
293
294 // ClearTrashLists sends an empty trash list to each keep
295 // service. Calling this before GetCurrentState avoids races.
296 //
297 // When a block appears in an index, we assume that replica will still
298 // exist after we delete other replicas on other servers. However,
299 // it's possible that a previous rebalancing operation made different
300 // decisions (e.g., servers were added/removed, and rendezvous order
301 // changed). In this case, the replica might already be on that
302 // server's trash list, and it might be deleted before we send a
303 // replacement trash list.
304 //
305 // We avoid this problem if we clear all trash lists before getting
306 // indexes. (We also assume there is only one rebalancing process
307 // running at a time.)
308 func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
309         for _, srv := range bal.KeepServices {
310                 srv.ChangeSet = &ChangeSet{}
311         }
312         return bal.CommitTrash(ctx, c)
313 }
314
315 // GetCurrentState determines the current replication state, and the
316 // desired replication level, for every block that is either
317 // retrievable or referenced.
318 //
319 // It determines the current replication state by reading the block index
320 // from every known Keep service.
321 //
322 // It determines the desired replication level by retrieving all
323 // collection manifests in the database (API server).
324 //
325 // It encodes the resulting information in BlockStateMap.
326 func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
327         ctx, cancel := context.WithCancel(ctx)
328         defer cancel()
329
330         defer bal.time("get_state", "wall clock time to get current state")()
331         bal.BlockStateMap = NewBlockStateMap()
332
333         dd, err := c.DiscoveryDocument()
334         if err != nil {
335                 return err
336         }
337         bal.DefaultReplication = dd.DefaultCollectionReplication
338         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
339
340         errs := make(chan error, 1)
341         wg := sync.WaitGroup{}
342
343         // When a device is mounted more than once, we will get its
344         // index only once, and call AddReplicas on all of the mounts.
345         // equivMount keys are the mounts that will be indexed, and
346         // each value is a list of mounts to apply the received index
347         // to.
348         equivMount := map[*KeepMount][]*KeepMount{}
349         // deviceMount maps each device ID to the one mount that will
350         // be indexed for that device.
351         deviceMount := map[string]*KeepMount{}
352         for _, srv := range bal.KeepServices {
353                 for _, mnt := range srv.mounts {
354                         equiv := deviceMount[mnt.DeviceID]
355                         if equiv == nil {
356                                 equiv = mnt
357                                 if mnt.DeviceID != "" {
358                                         deviceMount[mnt.DeviceID] = equiv
359                                 }
360                         }
361                         equivMount[equiv] = append(equivMount[equiv], mnt)
362                 }
363         }
364
365         // Start one goroutine for each (non-redundant) mount:
366         // retrieve the index, and add the returned blocks to
367         // BlockStateMap.
368         for _, mounts := range equivMount {
369                 wg.Add(1)
370                 go func(mounts []*KeepMount) {
371                         defer wg.Done()
372                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
373                         idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
374                         if err != nil {
375                                 select {
376                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
377                                 default:
378                                 }
379                                 cancel()
380                                 return
381                         }
382                         if len(errs) > 0 {
383                                 // Some other goroutine encountered an
384                                 // error -- any further effort here
385                                 // will be wasted.
386                                 return
387                         }
388                         for _, mount := range mounts {
389                                 bal.logf("%s: add %d entries to map", mount, len(idx))
390                                 bal.BlockStateMap.AddReplicas(mount, idx)
391                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
392                         }
393                         bal.logf("mount %s: index done", mounts[0])
394                 }(mounts)
395         }
396
397         // collQ buffers incoming collections so we can start fetching
398         // the next page without waiting for the current page to
399         // finish processing.
400         collQ := make(chan arvados.Collection, bufs)
401
402         // Start a goroutine to process collections. (We could use a
403         // worker pool here, but even with a single worker we already
404         // process collections much faster than we can retrieve them.)
405         wg.Add(1)
406         go func() {
407                 defer wg.Done()
408                 for coll := range collQ {
409                         err := bal.addCollection(coll)
410                         if err != nil || len(errs) > 0 {
411                                 select {
412                                 case errs <- err:
413                                 default:
414                                 }
415                                 for range collQ {
416                                 }
417                                 cancel()
418                                 return
419                         }
420                         bal.collScanned++
421                 }
422         }()
423
424         // Start a goroutine to retrieve all collections from the
425         // Arvados database and send them to collQ for processing.
426         wg.Add(1)
427         go func() {
428                 defer wg.Done()
429                 err = EachCollection(ctx, bal.DB, c,
430                         func(coll arvados.Collection) error {
431                                 collQ <- coll
432                                 if len(errs) > 0 {
433                                         // some other GetCurrentState
434                                         // error happened: no point
435                                         // getting any more
436                                         // collections.
437                                         return fmt.Errorf("")
438                                 }
439                                 return nil
440                         }, func(done, total int) {
441                                 bal.logf("collections: %d/%d", done, total)
442                         })
443                 close(collQ)
444                 if err != nil {
445                         select {
446                         case errs <- err:
447                         default:
448                         }
449                         cancel()
450                 }
451         }()
452
453         wg.Wait()
454         if len(errs) > 0 {
455                 return <-errs
456         }
457         return nil
458 }
459
460 func (bal *Balancer) addCollection(coll arvados.Collection) error {
461         blkids, err := coll.SizedDigests()
462         if err != nil {
463                 return fmt.Errorf("%v: %v", coll.UUID, err)
464         }
465         repl := bal.DefaultReplication
466         if coll.ReplicationDesired != nil {
467                 repl = *coll.ReplicationDesired
468         }
469         bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
470         // Pass pdh to IncreaseDesired only if LostBlocksFile is being
471         // written -- otherwise it's just a waste of memory.
472         pdh := ""
473         if bal.LostBlocksFile != "" {
474                 pdh = coll.PortableDataHash
475         }
476         bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
477         return nil
478 }
479
480 // ComputeChangeSets compares, for each known block, the current and
481 // desired replication states. If it is possible to get closer to the
482 // desired state by copying or deleting blocks, it adds those changes
483 // to the relevant KeepServices' ChangeSets.
484 //
485 // It does not actually apply any of the computed changes.
486 func (bal *Balancer) ComputeChangeSets() {
487         // This just calls balanceBlock() once for each block, using a
488         // pool of worker goroutines.
489         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
490         bal.setupLookupTables()
491
492         type balanceTask struct {
493                 blkid arvados.SizedDigest
494                 blk   *BlockState
495         }
496         workers := runtime.GOMAXPROCS(-1)
497         todo := make(chan balanceTask, workers)
498         go func() {
499                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
500                         todo <- balanceTask{
501                                 blkid: blkid,
502                                 blk:   blk,
503                         }
504                 })
505                 close(todo)
506         }()
507         results := make(chan balanceResult, workers)
508         go func() {
509                 var wg sync.WaitGroup
510                 for i := 0; i < workers; i++ {
511                         wg.Add(1)
512                         go func() {
513                                 for work := range todo {
514                                         results <- bal.balanceBlock(work.blkid, work.blk)
515                                 }
516                                 wg.Done()
517                         }()
518                 }
519                 wg.Wait()
520                 close(results)
521         }()
522         bal.collectStatistics(results)
523 }
524
525 func (bal *Balancer) setupLookupTables() {
526         bal.serviceRoots = make(map[string]string)
527         bal.classes = defaultClasses
528         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
529         bal.mounts = 0
530         for _, srv := range bal.KeepServices {
531                 bal.serviceRoots[srv.UUID] = srv.UUID
532                 for _, mnt := range srv.mounts {
533                         bal.mounts++
534
535                         // All mounts on a read-only service are
536                         // effectively read-only.
537                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
538
539                         if len(mnt.StorageClasses) == 0 {
540                                 bal.mountsByClass["default"][mnt] = true
541                                 continue
542                         }
543                         for class := range mnt.StorageClasses {
544                                 if mbc := bal.mountsByClass[class]; mbc == nil {
545                                         bal.classes = append(bal.classes, class)
546                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
547                                 } else {
548                                         mbc[mnt] = true
549                                 }
550                         }
551                 }
552         }
553         // Consider classes in lexicographic order to avoid flapping
554         // between balancing runs.  The outcome of the "prefer a mount
555         // we're already planning to use for a different storage
556         // class" case in balanceBlock depends on the order classes
557         // are considered.
558         sort.Strings(bal.classes)
559 }
560
561 const (
562         changeStay = iota
563         changePull
564         changeTrash
565         changeNone
566 )
567
568 var changeName = map[int]string{
569         changeStay:  "stay",
570         changePull:  "pull",
571         changeTrash: "trash",
572         changeNone:  "none",
573 }
574
575 type balancedBlockState struct {
576         needed       int
577         unneeded     int
578         pulling      int
579         unachievable bool
580 }
581
582 type balanceResult struct {
583         blk        *BlockState
584         blkid      arvados.SizedDigest
585         lost       bool
586         blockState balancedBlockState
587         classState map[string]balancedBlockState
588 }
589
590 type slot struct {
591         mnt  *KeepMount // never nil
592         repl *Replica   // replica already stored here (or nil)
593         want bool       // we should pull/leave a replica here
594 }
595
596 // balanceBlock compares current state to desired state for a single
597 // block, and makes the appropriate ChangeSet calls.
598 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
599         bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
600
601         // Build a list of all slots (one per mounted volume).
602         slots := make([]slot, 0, bal.mounts)
603         for _, srv := range bal.KeepServices {
604                 for _, mnt := range srv.mounts {
605                         var repl *Replica
606                         for r := range blk.Replicas {
607                                 if blk.Replicas[r].KeepMount == mnt {
608                                         repl = &blk.Replicas[r]
609                                 }
610                         }
611                         // Initial value of "want" is "have, and can't
612                         // delete". These untrashable replicas get
613                         // prioritized when sorting slots: otherwise,
614                         // non-optimal readonly copies would cause us
615                         // to overreplicate.
616                         slots = append(slots, slot{
617                                 mnt:  mnt,
618                                 repl: repl,
619                                 want: repl != nil && mnt.ReadOnly,
620                         })
621                 }
622         }
623
624         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
625         srvRendezvous := make(map[*KeepService]int, len(uuids))
626         for i, uuid := range uuids {
627                 srv := bal.KeepServices[uuid]
628                 srvRendezvous[srv] = i
629         }
630
631         // Below we set underreplicated=true if we find any storage
632         // class that's currently underreplicated -- in that case we
633         // won't want to trash any replicas.
634         underreplicated := false
635
636         unsafeToDelete := make(map[int64]bool, len(slots))
637         for _, class := range bal.classes {
638                 desired := blk.Desired[class]
639                 if desired == 0 {
640                         continue
641                 }
642
643                 // Sort the slots by desirability.
644                 sort.Slice(slots, func(i, j int) bool {
645                         si, sj := slots[i], slots[j]
646                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
647                                 // Prefer a mount that satisfies the
648                                 // desired class.
649                                 return bal.mountsByClass[class][si.mnt]
650                         } else if si.want != sj.want {
651                                 // Prefer a mount that will have a
652                                 // replica no matter what we do here
653                                 // -- either because it already has an
654                                 // untrashable replica, or because we
655                                 // already need it to satisfy a
656                                 // different storage class.
657                                 return si.want
658                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
659                                 // Prefer a better rendezvous
660                                 // position.
661                                 return orderi < orderj
662                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
663                                 // Prefer a mount that already has a
664                                 // replica.
665                                 return repli
666                         } else {
667                                 // If pull/trash turns out to be
668                                 // needed, distribute the
669                                 // new/remaining replicas uniformly
670                                 // across qualifying mounts on a given
671                                 // server.
672                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
673                         }
674                 })
675
676                 // Servers/mounts/devices (with or without existing
677                 // replicas) that are part of the best achievable
678                 // layout for this storage class.
679                 wantSrv := map[*KeepService]bool{}
680                 wantMnt := map[*KeepMount]bool{}
681                 wantDev := map[string]bool{}
682                 // Positions (with existing replicas) that have been
683                 // protected (via unsafeToDelete) to ensure we don't
684                 // reduce replication below desired level when
685                 // trashing replicas that aren't optimal positions for
686                 // any storage class.
687                 protMnt := map[*KeepMount]bool{}
688                 // Replication planned so far (corresponds to wantMnt).
689                 replWant := 0
690                 // Protected replication (corresponds to protMnt).
691                 replProt := 0
692
693                 // trySlot tries using a slot to meet requirements,
694                 // and returns true if all requirements are met.
695                 trySlot := func(i int) bool {
696                         slot := slots[i]
697                         if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
698                                 // Already allocated a replica to this
699                                 // backend device, possibly on a
700                                 // different server.
701                                 return false
702                         }
703                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
704                                 unsafeToDelete[slot.repl.Mtime] = true
705                                 protMnt[slot.mnt] = true
706                                 replProt += slot.mnt.Replication
707                         }
708                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
709                                 slots[i].want = true
710                                 wantSrv[slot.mnt.KeepService] = true
711                                 wantMnt[slot.mnt] = true
712                                 if slot.mnt.DeviceID != "" {
713                                         wantDev[slot.mnt.DeviceID] = true
714                                 }
715                                 replWant += slot.mnt.Replication
716                         }
717                         return replProt >= desired && replWant >= desired
718                 }
719
720                 // First try to achieve desired replication without
721                 // using the same server twice.
722                 done := false
723                 for i := 0; i < len(slots) && !done; i++ {
724                         if !wantSrv[slots[i].mnt.KeepService] {
725                                 done = trySlot(i)
726                         }
727                 }
728
729                 // If that didn't suffice, do another pass without the
730                 // "distinct services" restriction. (Achieving the
731                 // desired volume replication on fewer than the
732                 // desired number of services is better than
733                 // underreplicating.)
734                 for i := 0; i < len(slots) && !done; i++ {
735                         done = trySlot(i)
736                 }
737
738                 if !underreplicated {
739                         safe := 0
740                         for _, slot := range slots {
741                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
742                                         continue
743                                 }
744                                 if safe += slot.mnt.Replication; safe >= desired {
745                                         break
746                                 }
747                         }
748                         underreplicated = safe < desired
749                 }
750
751                 // Avoid deleting wanted replicas from devices that
752                 // are mounted on multiple servers -- even if they
753                 // haven't already been added to unsafeToDelete
754                 // because the servers report different Mtimes.
755                 for _, slot := range slots {
756                         if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
757                                 unsafeToDelete[slot.repl.Mtime] = true
758                         }
759                 }
760         }
761
762         // TODO: If multiple replicas are trashable, prefer the oldest
763         // replica that doesn't have a timestamp collision with
764         // others.
765
766         for i, slot := range slots {
767                 // Don't trash (1) any replicas of an underreplicated
768                 // block, even if they're in the wrong positions, or
769                 // (2) any replicas whose Mtimes are identical to
770                 // needed replicas (in case we're really seeing the
771                 // same copy via different mounts).
772                 if slot.repl != nil && (underreplicated || unsafeToDelete[slot.repl.Mtime]) {
773                         slots[i].want = true
774                 }
775         }
776
777         classState := make(map[string]balancedBlockState, len(bal.classes))
778         for _, class := range bal.classes {
779                 classState[class] = computeBlockState(slots, bal.mountsByClass[class], len(blk.Replicas), blk.Desired[class])
780         }
781         blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
782
783         var lost bool
784         var changes []string
785         for _, slot := range slots {
786                 // TODO: request a Touch if Mtime is duplicated.
787                 var change int
788                 switch {
789                 case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
790                         slot.mnt.KeepService.AddTrash(Trash{
791                                 SizedDigest: blkid,
792                                 Mtime:       slot.repl.Mtime,
793                                 From:        slot.mnt,
794                         })
795                         change = changeTrash
796                 case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
797                         lost = true
798                         change = changeNone
799                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
800                         slot.mnt.KeepService.AddPull(Pull{
801                                 SizedDigest: blkid,
802                                 From:        blk.Replicas[0].KeepMount.KeepService,
803                                 To:          slot.mnt,
804                         })
805                         change = changePull
806                 case slot.repl != nil:
807                         change = changeStay
808                 default:
809                         change = changeNone
810                 }
811                 if bal.Dumper != nil {
812                         var mtime int64
813                         if slot.repl != nil {
814                                 mtime = slot.repl.Mtime
815                         }
816                         srv := slot.mnt.KeepService
817                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
818                 }
819         }
820         if bal.Dumper != nil {
821                 bal.Dumper.Printf("%s refs=%d needed=%d unneeded=%d pulling=%v %v %v", blkid, blk.RefCount, blockState.needed, blockState.unneeded, blockState.pulling, blk.Desired, changes)
822         }
823         return balanceResult{
824                 blk:        blk,
825                 blkid:      blkid,
826                 lost:       lost,
827                 blockState: blockState,
828                 classState: classState,
829         }
830 }
831
832 func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRepl int) (bbs balancedBlockState) {
833         repl := 0
834         countedDev := map[string]bool{}
835         for _, slot := range slots {
836                 if onlyCount != nil && !onlyCount[slot.mnt] {
837                         continue
838                 }
839                 if countedDev[slot.mnt.DeviceID] {
840                         continue
841                 }
842                 switch {
843                 case slot.repl != nil && slot.want:
844                         bbs.needed++
845                         repl += slot.mnt.Replication
846                 case slot.repl != nil && !slot.want:
847                         bbs.unneeded++
848                         repl += slot.mnt.Replication
849                 case slot.repl == nil && slot.want && have > 0:
850                         bbs.pulling++
851                         repl += slot.mnt.Replication
852                 }
853                 if slot.mnt.DeviceID != "" {
854                         countedDev[slot.mnt.DeviceID] = true
855                 }
856         }
857         if repl < needRepl {
858                 bbs.unachievable = true
859         }
860         return
861 }
862
863 type blocksNBytes struct {
864         replicas int
865         blocks   int
866         bytes    int64
867 }
868
869 func (bb blocksNBytes) String() string {
870         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
871 }
872
873 type replicationStats struct {
874         needed       blocksNBytes
875         unneeded     blocksNBytes
876         pulling      blocksNBytes
877         unachievable blocksNBytes
878 }
879
880 type balancerStats struct {
881         lost          blocksNBytes
882         overrep       blocksNBytes
883         unref         blocksNBytes
884         garbage       blocksNBytes
885         underrep      blocksNBytes
886         unachievable  blocksNBytes
887         justright     blocksNBytes
888         desired       blocksNBytes
889         current       blocksNBytes
890         pulls         int
891         trashes       int
892         replHistogram []int
893         classStats    map[string]replicationStats
894
895         // collectionBytes / collectionBlockBytes = deduplication ratio
896         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
897         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
898         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
899         collectionBlocks     int64 // number of blocks referenced by any collection
900 }
901
902 func (s *balancerStats) dedupByteRatio() float64 {
903         if s.collectionBlockBytes == 0 {
904                 return 0
905         }
906         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
907 }
908
909 func (s *balancerStats) dedupBlockRatio() float64 {
910         if s.collectionBlocks == 0 {
911                 return 0
912         }
913         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
914 }
915
916 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
917         var s balancerStats
918         s.replHistogram = make([]int, 2)
919         s.classStats = make(map[string]replicationStats, len(bal.classes))
920         for result := range results {
921                 bytes := result.blkid.Size()
922
923                 if rc := int64(result.blk.RefCount); rc > 0 {
924                         s.collectionBytes += rc * bytes
925                         s.collectionBlockBytes += bytes
926                         s.collectionBlockRefs += rc
927                         s.collectionBlocks++
928                 }
929
930                 for class, state := range result.classState {
931                         cs := s.classStats[class]
932                         if state.unachievable {
933                                 cs.unachievable.replicas++
934                                 cs.unachievable.blocks++
935                                 cs.unachievable.bytes += bytes
936                         }
937                         if state.needed > 0 {
938                                 cs.needed.replicas += state.needed
939                                 cs.needed.blocks++
940                                 cs.needed.bytes += bytes * int64(state.needed)
941                         }
942                         if state.unneeded > 0 {
943                                 cs.unneeded.replicas += state.unneeded
944                                 cs.unneeded.blocks++
945                                 cs.unneeded.bytes += bytes * int64(state.unneeded)
946                         }
947                         if state.pulling > 0 {
948                                 cs.pulling.replicas += state.pulling
949                                 cs.pulling.blocks++
950                                 cs.pulling.bytes += bytes * int64(state.pulling)
951                         }
952                         s.classStats[class] = cs
953                 }
954
955                 bs := result.blockState
956                 switch {
957                 case result.lost:
958                         s.lost.replicas++
959                         s.lost.blocks++
960                         s.lost.bytes += bytes
961                         fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
962                         for pdh := range result.blk.Refs {
963                                 fmt.Fprintf(bal.lostBlocks, " %s", pdh)
964                         }
965                         fmt.Fprint(bal.lostBlocks, "\n")
966                 case bs.pulling > 0:
967                         s.underrep.replicas += bs.pulling
968                         s.underrep.blocks++
969                         s.underrep.bytes += bytes * int64(bs.pulling)
970                 case bs.unachievable:
971                         s.underrep.replicas++
972                         s.underrep.blocks++
973                         s.underrep.bytes += bytes
974                 case bs.unneeded > 0 && bs.needed == 0:
975                         // Count as "garbage" if all replicas are old
976                         // enough to trash, otherwise count as
977                         // "unref".
978                         counter := &s.garbage
979                         for _, r := range result.blk.Replicas {
980                                 if r.Mtime >= bal.MinMtime {
981                                         counter = &s.unref
982                                         break
983                                 }
984                         }
985                         counter.replicas += bs.unneeded
986                         counter.blocks++
987                         counter.bytes += bytes * int64(bs.unneeded)
988                 case bs.unneeded > 0:
989                         s.overrep.replicas += bs.unneeded
990                         s.overrep.blocks++
991                         s.overrep.bytes += bytes * int64(bs.unneeded)
992                 default:
993                         s.justright.replicas += bs.needed
994                         s.justright.blocks++
995                         s.justright.bytes += bytes * int64(bs.needed)
996                 }
997
998                 if bs.needed > 0 {
999                         s.desired.replicas += bs.needed
1000                         s.desired.blocks++
1001                         s.desired.bytes += bytes * int64(bs.needed)
1002                 }
1003                 if bs.needed+bs.unneeded > 0 {
1004                         s.current.replicas += bs.needed + bs.unneeded
1005                         s.current.blocks++
1006                         s.current.bytes += bytes * int64(bs.needed+bs.unneeded)
1007                 }
1008
1009                 for len(s.replHistogram) <= bs.needed+bs.unneeded {
1010                         s.replHistogram = append(s.replHistogram, 0)
1011                 }
1012                 s.replHistogram[bs.needed+bs.unneeded]++
1013         }
1014         for _, srv := range bal.KeepServices {
1015                 s.pulls += len(srv.ChangeSet.Pulls)
1016                 s.trashes += len(srv.ChangeSet.Trashes)
1017         }
1018         bal.stats = s
1019         bal.Metrics.UpdateStats(s)
1020 }
1021
1022 // PrintStatistics writes statistics about the computed changes to
1023 // bal.Logger. It should not be called until ComputeChangeSets has
1024 // finished.
1025 func (bal *Balancer) PrintStatistics() {
1026         bal.logf("===")
1027         bal.logf("%s lost (0=have<want)", bal.stats.lost)
1028         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
1029         bal.logf("%s just right (have=want)", bal.stats.justright)
1030         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
1031         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
1032         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
1033         for _, class := range bal.classes {
1034                 cs := bal.stats.classStats[class]
1035                 bal.logf("===")
1036                 bal.logf("storage class %q: %s needed", class, cs.needed)
1037                 bal.logf("storage class %q: %s unneeded", class, cs.unneeded)
1038                 bal.logf("storage class %q: %s pulling", class, cs.pulling)
1039                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
1040         }
1041         bal.logf("===")
1042         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
1043         bal.logf("%s total usage", bal.stats.current)
1044         bal.logf("===")
1045         for _, srv := range bal.KeepServices {
1046                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1047         }
1048         bal.logf("===")
1049         bal.printHistogram(60)
1050         bal.logf("===")
1051 }
1052
1053 func (bal *Balancer) printHistogram(hashColumns int) {
1054         bal.logf("Replication level distribution:")
1055         maxCount := 0
1056         for _, count := range bal.stats.replHistogram {
1057                 if maxCount < count {
1058                         maxCount = count
1059                 }
1060         }
1061         hashes := strings.Repeat("#", hashColumns)
1062         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1063         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1064         for repl, count := range bal.stats.replHistogram {
1065                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1066                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1067         }
1068 }
1069
1070 // CheckSanityLate checks for configuration and runtime errors after
1071 // GetCurrentState() and ComputeChangeSets() have finished.
1072 //
1073 // If it returns an error, it is dangerous to run any Commit methods.
1074 func (bal *Balancer) CheckSanityLate() error {
1075         if bal.errors != nil {
1076                 for _, err := range bal.errors {
1077                         bal.logf("deferred error: %v", err)
1078                 }
1079                 return fmt.Errorf("cannot proceed safely after deferred errors")
1080         }
1081
1082         if bal.collScanned == 0 {
1083                 return fmt.Errorf("received zero collections")
1084         }
1085
1086         anyDesired := false
1087         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1088                 for _, desired := range blk.Desired {
1089                         if desired > 0 {
1090                                 anyDesired = true
1091                                 break
1092                         }
1093                 }
1094         })
1095         if !anyDesired {
1096                 return fmt.Errorf("zero blocks have desired replication>0")
1097         }
1098
1099         if dr := bal.DefaultReplication; dr < 1 {
1100                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1101         }
1102
1103         // TODO: no two services have identical indexes
1104         // TODO: no collisions (same md5, different size)
1105         return nil
1106 }
1107
1108 // CommitPulls sends the computed lists of pull requests to the
1109 // keepstore servers. This has the effect of increasing replication of
1110 // existing blocks that are either underreplicated or poorly
1111 // distributed according to rendezvous hashing.
1112 func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
1113         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1114         return bal.commitAsync(c, "send pull list",
1115                 func(srv *KeepService) error {
1116                         return srv.CommitPulls(ctx, c)
1117                 })
1118 }
1119
1120 // CommitTrash sends the computed lists of trash requests to the
1121 // keepstore servers. This has the effect of deleting blocks that are
1122 // overreplicated or unreferenced.
1123 func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
1124         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1125         return bal.commitAsync(c, "send trash list",
1126                 func(srv *KeepService) error {
1127                         return srv.CommitTrash(ctx, c)
1128                 })
1129 }
1130
1131 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1132         errs := make(chan error)
1133         for _, srv := range bal.KeepServices {
1134                 go func(srv *KeepService) {
1135                         var err error
1136                         defer func() { errs <- err }()
1137                         label := fmt.Sprintf("%s: %v", srv, label)
1138                         err = f(srv)
1139                         if err != nil {
1140                                 err = fmt.Errorf("%s: %v", label, err)
1141                         }
1142                 }(srv)
1143         }
1144         var lastErr error
1145         for range bal.KeepServices {
1146                 if err := <-errs; err != nil {
1147                         bal.logf("%v", err)
1148                         lastErr = err
1149                 }
1150         }
1151         close(errs)
1152         return lastErr
1153 }
1154
1155 func (bal *Balancer) logf(f string, args ...interface{}) {
1156         if bal.Logger != nil {
1157                 bal.Logger.Printf(f, args...)
1158         }
1159 }
1160
1161 func (bal *Balancer) time(name, help string) func() {
1162         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1163         t0 := time.Now()
1164         bal.Logger.Printf("%s: start", name)
1165         return func() {
1166                 dur := time.Since(t0)
1167                 observer.Observe(dur.Seconds())
1168                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1169         }
1170 }
1171
1172 // Rendezvous hash sort function. Less efficient than sorting on
1173 // precomputed rendezvous hashes, but also rarely used.
1174 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1175         a := md5.Sum([]byte(string(blkid[:32]) + i))
1176         b := md5.Sum([]byte(string(blkid[:32]) + j))
1177         return bytes.Compare(a[:], b[:]) < 0
1178 }