Merge branch 'main' into 19582-aws-s3v2-driver
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepbalance
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "log"
16         "math"
17         "os"
18         "runtime"
19         "sort"
20         "strings"
21         "sync"
22         "sync/atomic"
23         "syscall"
24         "time"
25
26         "git.arvados.org/arvados.git/lib/controller/dblock"
27         "git.arvados.org/arvados.git/sdk/go/arvados"
28         "git.arvados.org/arvados.git/sdk/go/ctxlog"
29         "git.arvados.org/arvados.git/sdk/go/keepclient"
30         "github.com/jmoiron/sqlx"
31         "github.com/sirupsen/logrus"
32 )
33
34 // Balancer compares the contents of keepstore servers with the
35 // collections stored in Arvados, and issues pull/trash requests
36 // needed to get (closer to) the optimal data layout.
37 //
38 // In the optimal data layout: every data block referenced by a
39 // collection is replicated at least as many times as desired by the
40 // collection; there are no unreferenced data blocks older than
41 // BlobSignatureTTL; and all N existing replicas of a given data block
42 // are in the N best positions in rendezvous probe order.
43 type Balancer struct {
44         DB      *sqlx.DB
45         Logger  logrus.FieldLogger
46         Dumper  logrus.FieldLogger
47         Metrics *metrics
48
49         LostBlocksFile string
50
51         *BlockStateMap
52         KeepServices       map[string]*KeepService
53         DefaultReplication int
54         MinMtime           int64
55
56         classes       []string
57         mounts        int
58         mountsByClass map[string]map[*KeepMount]bool
59         collScanned   int64
60         serviceRoots  map[string]string
61         errors        []error
62         stats         balancerStats
63         mutex         sync.Mutex
64         lostBlocks    io.Writer
65 }
66
67 // Run performs a balance operation using the given config and
68 // runOptions, and returns RunOptions suitable for passing to a
69 // subsequent balance operation.
70 //
71 // Run should only be called once on a given Balancer object.
72 func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
73         nextRunOptions = runOptions
74
75         ctxlog.FromContext(ctx).Info("acquiring active lock")
76         if !dblock.KeepBalanceActive.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return bal.DB, nil }) {
77                 // context canceled
78                 return
79         }
80         defer dblock.KeepBalanceActive.Unlock()
81
82         defer bal.time("sweep", "wall clock time to run one full sweep")()
83
84         ctx, cancel := context.WithDeadline(ctx, time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
85         defer cancel()
86
87         var lbFile *os.File
88         if bal.LostBlocksFile != "" {
89                 tmpfn := bal.LostBlocksFile + ".tmp"
90                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
91                 if err != nil {
92                         return
93                 }
94                 defer lbFile.Close()
95                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
96                 if err != nil {
97                         return
98                 }
99                 defer func() {
100                         // Remove the tempfile only if we didn't get
101                         // as far as successfully renaming it.
102                         if lbFile != nil {
103                                 os.Remove(tmpfn)
104                         }
105                 }()
106                 bal.lostBlocks = lbFile
107         } else {
108                 bal.lostBlocks = ioutil.Discard
109         }
110
111         err = bal.DiscoverKeepServices(client)
112         if err != nil {
113                 return
114         }
115
116         for _, srv := range bal.KeepServices {
117                 err = srv.discoverMounts(client)
118                 if err != nil {
119                         return
120                 }
121         }
122         bal.cleanupMounts()
123
124         if err = bal.CheckSanityEarly(client); err != nil {
125                 return
126         }
127
128         // On a big site, indexing and sending trash/pull lists can
129         // take much longer than the usual 5 minute client
130         // timeout. From here on, we rely on the context deadline
131         // instead, aborting the entire operation if any part takes
132         // too long.
133         client.Timeout = 0
134
135         rs := bal.rendezvousState()
136         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
137                 if runOptions.SafeRendezvousState != "" {
138                         bal.logf("notice: KeepServices list has changed since last run")
139                 }
140                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
141                 if err = bal.ClearTrashLists(ctx, client); err != nil {
142                         return
143                 }
144                 // The current rendezvous state becomes "safe" (i.e.,
145                 // OK to compute changes for that state without
146                 // clearing existing trash lists) only now, after we
147                 // succeed in clearing existing trash lists.
148                 nextRunOptions.SafeRendezvousState = rs
149         }
150
151         if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
152                 return
153         }
154         bal.ComputeChangeSets()
155         bal.PrintStatistics()
156         if err = bal.CheckSanityLate(); err != nil {
157                 return
158         }
159         if lbFile != nil {
160                 err = lbFile.Sync()
161                 if err != nil {
162                         return
163                 }
164                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
165                 if err != nil {
166                         return
167                 }
168                 lbFile = nil
169         }
170         if runOptions.CommitPulls {
171                 err = bal.CommitPulls(ctx, client)
172                 if err != nil {
173                         // Skip trash if we can't pull. (Too cautious?)
174                         return
175                 }
176         }
177         if runOptions.CommitTrash {
178                 err = bal.CommitTrash(ctx, client)
179                 if err != nil {
180                         return
181                 }
182         }
183         if runOptions.CommitConfirmedFields {
184                 err = bal.updateCollections(ctx, client, cluster)
185                 if err != nil {
186                         return
187                 }
188         }
189         return
190 }
191
192 // SetKeepServices sets the list of KeepServices to operate on.
193 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
194         bal.KeepServices = make(map[string]*KeepService)
195         for _, srv := range srvList.Items {
196                 bal.KeepServices[srv.UUID] = &KeepService{
197                         KeepService: srv,
198                         ChangeSet:   &ChangeSet{},
199                 }
200         }
201         return nil
202 }
203
204 // DiscoverKeepServices sets the list of KeepServices by calling the
205 // API to get a list of all services, and selecting the ones whose
206 // ServiceType is "disk"
207 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
208         bal.KeepServices = make(map[string]*KeepService)
209         return c.EachKeepService(func(srv arvados.KeepService) error {
210                 if srv.ServiceType == "disk" {
211                         bal.KeepServices[srv.UUID] = &KeepService{
212                                 KeepService: srv,
213                                 ChangeSet:   &ChangeSet{},
214                         }
215                 } else {
216                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
217                 }
218                 return nil
219         })
220 }
221
222 func (bal *Balancer) cleanupMounts() {
223         rwdev := map[string]*KeepService{}
224         for _, srv := range bal.KeepServices {
225                 for _, mnt := range srv.mounts {
226                         if !mnt.ReadOnly {
227                                 rwdev[mnt.UUID] = srv
228                         }
229                 }
230         }
231         // Drop the readonly mounts whose device is mounted RW
232         // elsewhere.
233         for _, srv := range bal.KeepServices {
234                 var dedup []*KeepMount
235                 for _, mnt := range srv.mounts {
236                         if mnt.ReadOnly && rwdev[mnt.UUID] != nil {
237                                 bal.logf("skipping srv %s readonly mount %q because same volume is mounted read-write on srv %s", srv, mnt.UUID, rwdev[mnt.UUID])
238                         } else {
239                                 dedup = append(dedup, mnt)
240                         }
241                 }
242                 srv.mounts = dedup
243         }
244         for _, srv := range bal.KeepServices {
245                 for _, mnt := range srv.mounts {
246                         if mnt.Replication <= 0 {
247                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
248                                 mnt.Replication = 1
249                         }
250                 }
251         }
252 }
253
254 // CheckSanityEarly checks for configuration and runtime errors that
255 // can be detected before GetCurrentState() and ComputeChangeSets()
256 // are called.
257 //
258 // If it returns an error, it is pointless to run GetCurrentState or
259 // ComputeChangeSets: after doing so, the statistics would be
260 // meaningless and it would be dangerous to run any Commit methods.
261 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
262         u, err := c.CurrentUser()
263         if err != nil {
264                 return fmt.Errorf("CurrentUser(): %v", err)
265         }
266         if !u.IsActive || !u.IsAdmin {
267                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
268         }
269         for _, srv := range bal.KeepServices {
270                 if srv.ServiceType == "proxy" {
271                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
272                 }
273         }
274
275         mountProblem := false
276         type deviceMount struct {
277                 srv *KeepService
278                 mnt *KeepMount
279         }
280         deviceMounted := map[string]deviceMount{} // DeviceID -> mount
281         for _, srv := range bal.KeepServices {
282                 for _, mnt := range srv.mounts {
283                         if first, dup := deviceMounted[mnt.DeviceID]; dup && first.mnt.UUID != mnt.UUID && mnt.DeviceID != "" {
284                                 bal.logf("config error: device %s is mounted with multiple volume UUIDs: %s on %s, and %s on %s",
285                                         mnt.DeviceID,
286                                         first.mnt.UUID, first.srv,
287                                         mnt.UUID, srv)
288                                 mountProblem = true
289                                 continue
290                         }
291                         deviceMounted[mnt.DeviceID] = deviceMount{srv, mnt}
292                 }
293         }
294         if mountProblem {
295                 return errors.New("cannot continue with config errors (see above)")
296         }
297
298         var checkPage arvados.CollectionList
299         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
300                 Limit:              new(int),
301                 Count:              "exact",
302                 IncludeTrash:       true,
303                 IncludeOldVersions: true,
304                 Filters: []arvados.Filter{{
305                         Attr:     "modified_at",
306                         Operator: "=",
307                         Operand:  nil,
308                 }},
309         }); err != nil {
310                 return err
311         } else if n := checkPage.ItemsAvailable; n > 0 {
312                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
313         }
314
315         return nil
316 }
317
318 // rendezvousState returns a fingerprint (e.g., a sorted list of
319 // UUID+host+port) of the current set of keep services.
320 func (bal *Balancer) rendezvousState() string {
321         srvs := make([]string, 0, len(bal.KeepServices))
322         for _, srv := range bal.KeepServices {
323                 srvs = append(srvs, srv.String())
324         }
325         sort.Strings(srvs)
326         return strings.Join(srvs, "; ")
327 }
328
329 // ClearTrashLists sends an empty trash list to each keep
330 // service. Calling this before GetCurrentState avoids races.
331 //
332 // When a block appears in an index, we assume that replica will still
333 // exist after we delete other replicas on other servers. However,
334 // it's possible that a previous rebalancing operation made different
335 // decisions (e.g., servers were added/removed, and rendezvous order
336 // changed). In this case, the replica might already be on that
337 // server's trash list, and it might be deleted before we send a
338 // replacement trash list.
339 //
340 // We avoid this problem if we clear all trash lists before getting
341 // indexes. (We also assume there is only one rebalancing process
342 // running at a time.)
343 func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
344         for _, srv := range bal.KeepServices {
345                 srv.ChangeSet = &ChangeSet{}
346         }
347         return bal.CommitTrash(ctx, c)
348 }
349
350 // GetCurrentState determines the current replication state, and the
351 // desired replication level, for every block that is either
352 // retrievable or referenced.
353 //
354 // It determines the current replication state by reading the block index
355 // from every known Keep service.
356 //
357 // It determines the desired replication level by retrieving all
358 // collection manifests in the database (API server).
359 //
360 // It encodes the resulting information in BlockStateMap.
361 func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
362         ctx, cancel := context.WithCancel(ctx)
363         defer cancel()
364
365         defer bal.time("get_state", "wall clock time to get current state")()
366         bal.BlockStateMap = NewBlockStateMap()
367
368         dd, err := c.DiscoveryDocument()
369         if err != nil {
370                 return err
371         }
372         bal.DefaultReplication = dd.DefaultCollectionReplication
373         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
374
375         errs := make(chan error, 1)
376         wg := sync.WaitGroup{}
377
378         // When a device is mounted more than once, we will get its
379         // index only once, and call AddReplicas on all of the mounts.
380         // equivMount keys are the mounts that will be indexed, and
381         // each value is a list of mounts to apply the received index
382         // to.
383         equivMount := map[*KeepMount][]*KeepMount{}
384         // deviceMount maps each device ID to the one mount that will
385         // be indexed for that device.
386         deviceMount := map[string]*KeepMount{}
387         for _, srv := range bal.KeepServices {
388                 for _, mnt := range srv.mounts {
389                         equiv := deviceMount[mnt.UUID]
390                         if equiv == nil {
391                                 equiv = mnt
392                                 deviceMount[mnt.UUID] = equiv
393                         }
394                         equivMount[equiv] = append(equivMount[equiv], mnt)
395                 }
396         }
397
398         // Start one goroutine for each (non-redundant) mount:
399         // retrieve the index, and add the returned blocks to
400         // BlockStateMap.
401         for _, mounts := range equivMount {
402                 wg.Add(1)
403                 go func(mounts []*KeepMount) {
404                         defer wg.Done()
405                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
406                         idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
407                         if err != nil {
408                                 select {
409                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
410                                 default:
411                                 }
412                                 cancel()
413                                 return
414                         }
415                         if len(errs) > 0 {
416                                 // Some other goroutine encountered an
417                                 // error -- any further effort here
418                                 // will be wasted.
419                                 return
420                         }
421                         for _, mount := range mounts {
422                                 bal.logf("%s: add %d entries to map", mount, len(idx))
423                                 bal.BlockStateMap.AddReplicas(mount, idx)
424                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
425                         }
426                         bal.logf("mount %s: index done", mounts[0])
427                 }(mounts)
428         }
429
430         collQ := make(chan arvados.Collection, bufs)
431
432         // Retrieve all collections from the database and send them to
433         // collQ.
434         wg.Add(1)
435         go func() {
436                 defer wg.Done()
437                 err = EachCollection(ctx, bal.DB, c,
438                         func(coll arvados.Collection) error {
439                                 collQ <- coll
440                                 if len(errs) > 0 {
441                                         // some other GetCurrentState
442                                         // error happened: no point
443                                         // getting any more
444                                         // collections.
445                                         return fmt.Errorf("")
446                                 }
447                                 return nil
448                         }, func(done, total int) {
449                                 bal.logf("collections: %d/%d", done, total)
450                         })
451                 close(collQ)
452                 if err != nil {
453                         select {
454                         case errs <- err:
455                         default:
456                         }
457                         cancel()
458                 }
459         }()
460
461         // Parse manifests from collQ and pass the block hashes to
462         // BlockStateMap to track desired replication.
463         for i := 0; i < runtime.NumCPU(); i++ {
464                 wg.Add(1)
465                 go func() {
466                         defer wg.Done()
467                         for coll := range collQ {
468                                 err := bal.addCollection(coll)
469                                 if err != nil || len(errs) > 0 {
470                                         select {
471                                         case errs <- err:
472                                         default:
473                                         }
474                                         cancel()
475                                         continue
476                                 }
477                                 atomic.AddInt64(&bal.collScanned, 1)
478                         }
479                 }()
480         }
481
482         wg.Wait()
483         if len(errs) > 0 {
484                 return <-errs
485         }
486         return nil
487 }
488
489 func (bal *Balancer) addCollection(coll arvados.Collection) error {
490         blkids, err := coll.SizedDigests()
491         if err != nil {
492                 return fmt.Errorf("%v: %v", coll.UUID, err)
493         }
494         repl := bal.DefaultReplication
495         if coll.ReplicationDesired != nil {
496                 repl = *coll.ReplicationDesired
497         }
498         bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
499         // Pass pdh to IncreaseDesired only if LostBlocksFile is being
500         // written -- otherwise it's just a waste of memory.
501         pdh := ""
502         if bal.LostBlocksFile != "" {
503                 pdh = coll.PortableDataHash
504         }
505         bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
506         return nil
507 }
508
509 // ComputeChangeSets compares, for each known block, the current and
510 // desired replication states. If it is possible to get closer to the
511 // desired state by copying or deleting blocks, it adds those changes
512 // to the relevant KeepServices' ChangeSets.
513 //
514 // It does not actually apply any of the computed changes.
515 func (bal *Balancer) ComputeChangeSets() {
516         // This just calls balanceBlock() once for each block, using a
517         // pool of worker goroutines.
518         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
519         bal.setupLookupTables()
520
521         type balanceTask struct {
522                 blkid arvados.SizedDigest
523                 blk   *BlockState
524         }
525         workers := runtime.GOMAXPROCS(-1)
526         todo := make(chan balanceTask, workers)
527         go func() {
528                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
529                         todo <- balanceTask{
530                                 blkid: blkid,
531                                 blk:   blk,
532                         }
533                 })
534                 close(todo)
535         }()
536         results := make(chan balanceResult, workers)
537         go func() {
538                 var wg sync.WaitGroup
539                 for i := 0; i < workers; i++ {
540                         wg.Add(1)
541                         go func() {
542                                 for work := range todo {
543                                         results <- bal.balanceBlock(work.blkid, work.blk)
544                                 }
545                                 wg.Done()
546                         }()
547                 }
548                 wg.Wait()
549                 close(results)
550         }()
551         bal.collectStatistics(results)
552 }
553
554 func (bal *Balancer) setupLookupTables() {
555         bal.serviceRoots = make(map[string]string)
556         bal.classes = defaultClasses
557         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
558         bal.mounts = 0
559         for _, srv := range bal.KeepServices {
560                 bal.serviceRoots[srv.UUID] = srv.UUID
561                 for _, mnt := range srv.mounts {
562                         bal.mounts++
563
564                         // All mounts on a read-only service are
565                         // effectively read-only.
566                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
567
568                         for class := range mnt.StorageClasses {
569                                 if mbc := bal.mountsByClass[class]; mbc == nil {
570                                         bal.classes = append(bal.classes, class)
571                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
572                                 } else {
573                                         mbc[mnt] = true
574                                 }
575                         }
576                 }
577         }
578         // Consider classes in lexicographic order to avoid flapping
579         // between balancing runs.  The outcome of the "prefer a mount
580         // we're already planning to use for a different storage
581         // class" case in balanceBlock depends on the order classes
582         // are considered.
583         sort.Strings(bal.classes)
584 }
585
586 const (
587         changeStay = iota
588         changePull
589         changeTrash
590         changeNone
591 )
592
593 var changeName = map[int]string{
594         changeStay:  "stay",
595         changePull:  "pull",
596         changeTrash: "trash",
597         changeNone:  "none",
598 }
599
600 type balancedBlockState struct {
601         needed       int
602         unneeded     int
603         pulling      int
604         unachievable bool
605 }
606
607 type balanceResult struct {
608         blk        *BlockState
609         blkid      arvados.SizedDigest
610         lost       bool
611         blockState balancedBlockState
612         classState map[string]balancedBlockState
613 }
614
615 type slot struct {
616         mnt  *KeepMount // never nil
617         repl *Replica   // replica already stored here (or nil)
618         want bool       // we should pull/leave a replica here
619 }
620
621 // balanceBlock compares current state to desired state for a single
622 // block, and makes the appropriate ChangeSet calls.
623 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
624         bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
625
626         // Build a list of all slots (one per mounted volume).
627         slots := make([]slot, 0, bal.mounts)
628         for _, srv := range bal.KeepServices {
629                 for _, mnt := range srv.mounts {
630                         var repl *Replica
631                         for r := range blk.Replicas {
632                                 if blk.Replicas[r].KeepMount == mnt {
633                                         repl = &blk.Replicas[r]
634                                 }
635                         }
636                         // Initial value of "want" is "have, and can't
637                         // delete". These untrashable replicas get
638                         // prioritized when sorting slots: otherwise,
639                         // non-optimal readonly copies would cause us
640                         // to overreplicate.
641                         slots = append(slots, slot{
642                                 mnt:  mnt,
643                                 repl: repl,
644                                 want: repl != nil && mnt.ReadOnly,
645                         })
646                 }
647         }
648
649         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
650         srvRendezvous := make(map[*KeepService]int, len(uuids))
651         for i, uuid := range uuids {
652                 srv := bal.KeepServices[uuid]
653                 srvRendezvous[srv] = i
654         }
655
656         // Below we set underreplicated=true if we find any storage
657         // class that's currently underreplicated -- in that case we
658         // won't want to trash any replicas.
659         underreplicated := false
660
661         unsafeToDelete := make(map[int64]bool, len(slots))
662         for _, class := range bal.classes {
663                 desired := blk.Desired[class]
664                 if desired == 0 {
665                         continue
666                 }
667
668                 // Sort the slots by desirability.
669                 sort.Slice(slots, func(i, j int) bool {
670                         si, sj := slots[i], slots[j]
671                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
672                                 // Prefer a mount that satisfies the
673                                 // desired class.
674                                 return bal.mountsByClass[class][si.mnt]
675                         } else if si.want != sj.want {
676                                 // Prefer a mount that will have a
677                                 // replica no matter what we do here
678                                 // -- either because it already has an
679                                 // untrashable replica, or because we
680                                 // already need it to satisfy a
681                                 // different storage class.
682                                 return si.want
683                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
684                                 // Prefer a better rendezvous
685                                 // position.
686                                 return orderi < orderj
687                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
688                                 // Prefer a mount that already has a
689                                 // replica.
690                                 return repli
691                         } else {
692                                 // If pull/trash turns out to be
693                                 // needed, distribute the
694                                 // new/remaining replicas uniformly
695                                 // across qualifying mounts on a given
696                                 // server.
697                                 return rendezvousLess(si.mnt.UUID, sj.mnt.UUID, blkid)
698                         }
699                 })
700
701                 // Servers/mounts/devices (with or without existing
702                 // replicas) that are part of the best achievable
703                 // layout for this storage class.
704                 wantSrv := map[*KeepService]bool{}
705                 wantMnt := map[*KeepMount]bool{}
706                 wantDev := map[string]bool{}
707                 // Positions (with existing replicas) that have been
708                 // protected (via unsafeToDelete) to ensure we don't
709                 // reduce replication below desired level when
710                 // trashing replicas that aren't optimal positions for
711                 // any storage class.
712                 protMnt := map[*KeepMount]bool{}
713                 // Replication planned so far (corresponds to wantMnt).
714                 replWant := 0
715                 // Protected replication (corresponds to protMnt).
716                 replProt := 0
717
718                 // trySlot tries using a slot to meet requirements,
719                 // and returns true if all requirements are met.
720                 trySlot := func(i int) bool {
721                         slot := slots[i]
722                         if wantMnt[slot.mnt] || wantDev[slot.mnt.UUID] {
723                                 // Already allocated a replica to this
724                                 // backend device, possibly on a
725                                 // different server.
726                                 return false
727                         }
728                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
729                                 unsafeToDelete[slot.repl.Mtime] = true
730                                 protMnt[slot.mnt] = true
731                                 replProt += slot.mnt.Replication
732                         }
733                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
734                                 slots[i].want = true
735                                 wantSrv[slot.mnt.KeepService] = true
736                                 wantMnt[slot.mnt] = true
737                                 wantDev[slot.mnt.UUID] = true
738                                 replWant += slot.mnt.Replication
739                         }
740                         return replProt >= desired && replWant >= desired
741                 }
742
743                 // First try to achieve desired replication without
744                 // using the same server twice.
745                 done := false
746                 for i := 0; i < len(slots) && !done; i++ {
747                         if !wantSrv[slots[i].mnt.KeepService] {
748                                 done = trySlot(i)
749                         }
750                 }
751
752                 // If that didn't suffice, do another pass without the
753                 // "distinct services" restriction. (Achieving the
754                 // desired volume replication on fewer than the
755                 // desired number of services is better than
756                 // underreplicating.)
757                 for i := 0; i < len(slots) && !done; i++ {
758                         done = trySlot(i)
759                 }
760
761                 if !underreplicated {
762                         safe := 0
763                         for _, slot := range slots {
764                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
765                                         continue
766                                 }
767                                 if safe += slot.mnt.Replication; safe >= desired {
768                                         break
769                                 }
770                         }
771                         underreplicated = safe < desired
772                 }
773
774                 // Avoid deleting wanted replicas from devices that
775                 // are mounted on multiple servers -- even if they
776                 // haven't already been added to unsafeToDelete
777                 // because the servers report different Mtimes.
778                 for _, slot := range slots {
779                         if slot.repl != nil && wantDev[slot.mnt.UUID] {
780                                 unsafeToDelete[slot.repl.Mtime] = true
781                         }
782                 }
783         }
784
785         // TODO: If multiple replicas are trashable, prefer the oldest
786         // replica that doesn't have a timestamp collision with
787         // others.
788
789         for i, slot := range slots {
790                 // Don't trash (1) any replicas of an underreplicated
791                 // block, even if they're in the wrong positions, or
792                 // (2) any replicas whose Mtimes are identical to
793                 // needed replicas (in case we're really seeing the
794                 // same copy via different mounts).
795                 if slot.repl != nil && (underreplicated || unsafeToDelete[slot.repl.Mtime]) {
796                         slots[i].want = true
797                 }
798         }
799
800         classState := make(map[string]balancedBlockState, len(bal.classes))
801         for _, class := range bal.classes {
802                 classState[class] = computeBlockState(slots, bal.mountsByClass[class], len(blk.Replicas), blk.Desired[class])
803         }
804         blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
805
806         var lost bool
807         var changes []string
808         for _, slot := range slots {
809                 // TODO: request a Touch if Mtime is duplicated.
810                 var change int
811                 switch {
812                 case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
813                         slot.mnt.KeepService.AddTrash(Trash{
814                                 SizedDigest: blkid,
815                                 Mtime:       slot.repl.Mtime,
816                                 From:        slot.mnt,
817                         })
818                         change = changeTrash
819                 case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
820                         lost = true
821                         change = changeNone
822                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
823                         slot.mnt.KeepService.AddPull(Pull{
824                                 SizedDigest: blkid,
825                                 From:        blk.Replicas[0].KeepMount.KeepService,
826                                 To:          slot.mnt,
827                         })
828                         change = changePull
829                 case slot.repl != nil:
830                         change = changeStay
831                 default:
832                         change = changeNone
833                 }
834                 if bal.Dumper != nil {
835                         var mtime int64
836                         if slot.repl != nil {
837                                 mtime = slot.repl.Mtime
838                         }
839                         srv := slot.mnt.KeepService
840                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
841                 }
842         }
843         if bal.Dumper != nil {
844                 bal.Dumper.Printf("%s refs=%d needed=%d unneeded=%d pulling=%v %v %v", blkid, blk.RefCount, blockState.needed, blockState.unneeded, blockState.pulling, blk.Desired, changes)
845         }
846         return balanceResult{
847                 blk:        blk,
848                 blkid:      blkid,
849                 lost:       lost,
850                 blockState: blockState,
851                 classState: classState,
852         }
853 }
854
855 func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRepl int) (bbs balancedBlockState) {
856         repl := 0
857         countedDev := map[string]bool{}
858         for _, slot := range slots {
859                 if onlyCount != nil && !onlyCount[slot.mnt] {
860                         continue
861                 }
862                 if countedDev[slot.mnt.UUID] {
863                         continue
864                 }
865                 switch {
866                 case slot.repl != nil && slot.want:
867                         bbs.needed++
868                         repl += slot.mnt.Replication
869                 case slot.repl != nil && !slot.want:
870                         bbs.unneeded++
871                         repl += slot.mnt.Replication
872                 case slot.repl == nil && slot.want && have > 0:
873                         bbs.pulling++
874                         repl += slot.mnt.Replication
875                 }
876                 countedDev[slot.mnt.UUID] = true
877         }
878         if repl < needRepl {
879                 bbs.unachievable = true
880         }
881         return
882 }
883
884 type blocksNBytes struct {
885         replicas int
886         blocks   int
887         bytes    int64
888 }
889
890 func (bb blocksNBytes) String() string {
891         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
892 }
893
894 type replicationStats struct {
895         needed       blocksNBytes
896         unneeded     blocksNBytes
897         pulling      blocksNBytes
898         unachievable blocksNBytes
899 }
900
901 type balancerStats struct {
902         lost          blocksNBytes
903         overrep       blocksNBytes
904         unref         blocksNBytes
905         garbage       blocksNBytes
906         underrep      blocksNBytes
907         unachievable  blocksNBytes
908         justright     blocksNBytes
909         desired       blocksNBytes
910         current       blocksNBytes
911         pulls         int
912         trashes       int
913         replHistogram []int
914         classStats    map[string]replicationStats
915
916         // collectionBytes / collectionBlockBytes = deduplication ratio
917         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
918         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
919         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
920         collectionBlocks     int64 // number of blocks referenced by any collection
921 }
922
923 func (s *balancerStats) dedupByteRatio() float64 {
924         if s.collectionBlockBytes == 0 {
925                 return 0
926         }
927         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
928 }
929
930 func (s *balancerStats) dedupBlockRatio() float64 {
931         if s.collectionBlocks == 0 {
932                 return 0
933         }
934         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
935 }
936
937 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
938         var s balancerStats
939         s.replHistogram = make([]int, 2)
940         s.classStats = make(map[string]replicationStats, len(bal.classes))
941         for result := range results {
942                 bytes := result.blkid.Size()
943
944                 if rc := int64(result.blk.RefCount); rc > 0 {
945                         s.collectionBytes += rc * bytes
946                         s.collectionBlockBytes += bytes
947                         s.collectionBlockRefs += rc
948                         s.collectionBlocks++
949                 }
950
951                 for class, state := range result.classState {
952                         cs := s.classStats[class]
953                         if state.unachievable {
954                                 cs.unachievable.replicas++
955                                 cs.unachievable.blocks++
956                                 cs.unachievable.bytes += bytes
957                         }
958                         if state.needed > 0 {
959                                 cs.needed.replicas += state.needed
960                                 cs.needed.blocks++
961                                 cs.needed.bytes += bytes * int64(state.needed)
962                         }
963                         if state.unneeded > 0 {
964                                 cs.unneeded.replicas += state.unneeded
965                                 cs.unneeded.blocks++
966                                 cs.unneeded.bytes += bytes * int64(state.unneeded)
967                         }
968                         if state.pulling > 0 {
969                                 cs.pulling.replicas += state.pulling
970                                 cs.pulling.blocks++
971                                 cs.pulling.bytes += bytes * int64(state.pulling)
972                         }
973                         s.classStats[class] = cs
974                 }
975
976                 bs := result.blockState
977                 switch {
978                 case result.lost:
979                         s.lost.replicas++
980                         s.lost.blocks++
981                         s.lost.bytes += bytes
982                         fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
983                         for pdh := range result.blk.Refs {
984                                 fmt.Fprintf(bal.lostBlocks, " %s", pdh)
985                         }
986                         fmt.Fprint(bal.lostBlocks, "\n")
987                 case bs.pulling > 0:
988                         s.underrep.replicas += bs.pulling
989                         s.underrep.blocks++
990                         s.underrep.bytes += bytes * int64(bs.pulling)
991                 case bs.unachievable:
992                         s.underrep.replicas++
993                         s.underrep.blocks++
994                         s.underrep.bytes += bytes
995                 case bs.unneeded > 0 && bs.needed == 0:
996                         // Count as "garbage" if all replicas are old
997                         // enough to trash, otherwise count as
998                         // "unref".
999                         counter := &s.garbage
1000                         for _, r := range result.blk.Replicas {
1001                                 if r.Mtime >= bal.MinMtime {
1002                                         counter = &s.unref
1003                                         break
1004                                 }
1005                         }
1006                         counter.replicas += bs.unneeded
1007                         counter.blocks++
1008                         counter.bytes += bytes * int64(bs.unneeded)
1009                 case bs.unneeded > 0:
1010                         s.overrep.replicas += bs.unneeded
1011                         s.overrep.blocks++
1012                         s.overrep.bytes += bytes * int64(bs.unneeded)
1013                 default:
1014                         s.justright.replicas += bs.needed
1015                         s.justright.blocks++
1016                         s.justright.bytes += bytes * int64(bs.needed)
1017                 }
1018
1019                 if bs.needed > 0 {
1020                         s.desired.replicas += bs.needed
1021                         s.desired.blocks++
1022                         s.desired.bytes += bytes * int64(bs.needed)
1023                 }
1024                 if bs.needed+bs.unneeded > 0 {
1025                         s.current.replicas += bs.needed + bs.unneeded
1026                         s.current.blocks++
1027                         s.current.bytes += bytes * int64(bs.needed+bs.unneeded)
1028                 }
1029
1030                 for len(s.replHistogram) <= bs.needed+bs.unneeded {
1031                         s.replHistogram = append(s.replHistogram, 0)
1032                 }
1033                 s.replHistogram[bs.needed+bs.unneeded]++
1034         }
1035         for _, srv := range bal.KeepServices {
1036                 s.pulls += len(srv.ChangeSet.Pulls)
1037                 s.trashes += len(srv.ChangeSet.Trashes)
1038         }
1039         bal.stats = s
1040         bal.Metrics.UpdateStats(s)
1041 }
1042
1043 // PrintStatistics writes statistics about the computed changes to
1044 // bal.Logger. It should not be called until ComputeChangeSets has
1045 // finished.
1046 func (bal *Balancer) PrintStatistics() {
1047         bal.logf("===")
1048         bal.logf("%s lost (0=have<want)", bal.stats.lost)
1049         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
1050         bal.logf("%s just right (have=want)", bal.stats.justright)
1051         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
1052         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
1053         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
1054         for _, class := range bal.classes {
1055                 cs := bal.stats.classStats[class]
1056                 bal.logf("===")
1057                 bal.logf("storage class %q: %s needed", class, cs.needed)
1058                 bal.logf("storage class %q: %s unneeded", class, cs.unneeded)
1059                 bal.logf("storage class %q: %s pulling", class, cs.pulling)
1060                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
1061         }
1062         bal.logf("===")
1063         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
1064         bal.logf("%s total usage", bal.stats.current)
1065         bal.logf("===")
1066         for _, srv := range bal.KeepServices {
1067                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1068         }
1069         bal.logf("===")
1070         bal.printHistogram(60)
1071         bal.logf("===")
1072 }
1073
1074 func (bal *Balancer) printHistogram(hashColumns int) {
1075         bal.logf("Replication level distribution:")
1076         maxCount := 0
1077         for _, count := range bal.stats.replHistogram {
1078                 if maxCount < count {
1079                         maxCount = count
1080                 }
1081         }
1082         hashes := strings.Repeat("#", hashColumns)
1083         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1084         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1085         for repl, count := range bal.stats.replHistogram {
1086                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1087                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1088         }
1089 }
1090
1091 // CheckSanityLate checks for configuration and runtime errors after
1092 // GetCurrentState() and ComputeChangeSets() have finished.
1093 //
1094 // If it returns an error, it is dangerous to run any Commit methods.
1095 func (bal *Balancer) CheckSanityLate() error {
1096         if bal.errors != nil {
1097                 for _, err := range bal.errors {
1098                         bal.logf("deferred error: %v", err)
1099                 }
1100                 return fmt.Errorf("cannot proceed safely after deferred errors")
1101         }
1102
1103         if bal.collScanned == 0 {
1104                 return fmt.Errorf("received zero collections")
1105         }
1106
1107         anyDesired := false
1108         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1109                 for _, desired := range blk.Desired {
1110                         if desired > 0 {
1111                                 anyDesired = true
1112                                 break
1113                         }
1114                 }
1115         })
1116         if !anyDesired {
1117                 return fmt.Errorf("zero blocks have desired replication>0")
1118         }
1119
1120         if dr := bal.DefaultReplication; dr < 1 {
1121                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1122         }
1123
1124         // TODO: no two services have identical indexes
1125         // TODO: no collisions (same md5, different size)
1126         return nil
1127 }
1128
1129 // CommitPulls sends the computed lists of pull requests to the
1130 // keepstore servers. This has the effect of increasing replication of
1131 // existing blocks that are either underreplicated or poorly
1132 // distributed according to rendezvous hashing.
1133 func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
1134         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1135         return bal.commitAsync(c, "send pull list",
1136                 func(srv *KeepService) error {
1137                         return srv.CommitPulls(ctx, c)
1138                 })
1139 }
1140
1141 // CommitTrash sends the computed lists of trash requests to the
1142 // keepstore servers. This has the effect of deleting blocks that are
1143 // overreplicated or unreferenced.
1144 func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
1145         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1146         return bal.commitAsync(c, "send trash list",
1147                 func(srv *KeepService) error {
1148                         return srv.CommitTrash(ctx, c)
1149                 })
1150 }
1151
1152 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1153         errs := make(chan error)
1154         for _, srv := range bal.KeepServices {
1155                 go func(srv *KeepService) {
1156                         var err error
1157                         defer func() { errs <- err }()
1158                         label := fmt.Sprintf("%s: %v", srv, label)
1159                         err = f(srv)
1160                         if err != nil {
1161                                 err = fmt.Errorf("%s: %v", label, err)
1162                         }
1163                 }(srv)
1164         }
1165         var lastErr error
1166         for range bal.KeepServices {
1167                 if err := <-errs; err != nil {
1168                         bal.logf("%v", err)
1169                         lastErr = err
1170                 }
1171         }
1172         close(errs)
1173         return lastErr
1174 }
1175
1176 func (bal *Balancer) logf(f string, args ...interface{}) {
1177         if bal.Logger != nil {
1178                 bal.Logger.Printf(f, args...)
1179         }
1180 }
1181
1182 func (bal *Balancer) time(name, help string) func() {
1183         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1184         t0 := time.Now()
1185         bal.Logger.Printf("%s: start", name)
1186         return func() {
1187                 dur := time.Since(t0)
1188                 observer.Observe(dur.Seconds())
1189                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1190         }
1191 }
1192
1193 // Rendezvous hash sort function. Less efficient than sorting on
1194 // precomputed rendezvous hashes, but also rarely used.
1195 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1196         a := md5.Sum([]byte(string(blkid[:32]) + i))
1197         b := md5.Sum([]byte(string(blkid[:32]) + j))
1198         return bytes.Compare(a[:], b[:]) < 0
1199 }