20831: Make the IsAdmin and IsInvited pointers so they are nullable
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepbalance
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "log"
16         "math"
17         "os"
18         "regexp"
19         "runtime"
20         "sort"
21         "strconv"
22         "strings"
23         "sync"
24         "sync/atomic"
25         "syscall"
26         "time"
27
28         "git.arvados.org/arvados.git/lib/controller/dblock"
29         "git.arvados.org/arvados.git/sdk/go/arvados"
30         "git.arvados.org/arvados.git/sdk/go/keepclient"
31         "github.com/jmoiron/sqlx"
32         "github.com/sirupsen/logrus"
33 )
34
35 // Balancer compares the contents of keepstore servers with the
36 // collections stored in Arvados, and issues pull/trash requests
37 // needed to get (closer to) the optimal data layout.
38 //
39 // In the optimal data layout: every data block referenced by a
40 // collection is replicated at least as many times as desired by the
41 // collection; there are no unreferenced data blocks older than
42 // BlobSignatureTTL; and all N existing replicas of a given data block
43 // are in the N best positions in rendezvous probe order.
44 type Balancer struct {
45         DB      *sqlx.DB
46         Logger  logrus.FieldLogger
47         Dumper  logrus.FieldLogger
48         Metrics *metrics
49
50         ChunkPrefix    string
51         LostBlocksFile string
52
53         *BlockStateMap
54         KeepServices       map[string]*KeepService
55         DefaultReplication int
56         MinMtime           int64
57
58         classes       []string
59         mounts        int
60         mountsByClass map[string]map[*KeepMount]bool
61         collScanned   int64
62         serviceRoots  map[string]string
63         errors        []error
64         stats         balancerStats
65         mutex         sync.Mutex
66         lostBlocks    io.Writer
67 }
68
69 // Run performs a balance operation using the given config and
70 // runOptions, and returns RunOptions suitable for passing to a
71 // subsequent balance operation.
72 //
73 // Run should only be called once on a given Balancer object.
74 func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
75         nextRunOptions = runOptions
76
77         bal.logf("acquiring active lock")
78         if !dblock.KeepBalanceActive.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return bal.DB, nil }) {
79                 // context canceled
80                 return
81         }
82         defer dblock.KeepBalanceActive.Unlock()
83
84         defer bal.time("sweep", "wall clock time to run one full sweep")()
85
86         ctx, cancel := context.WithDeadline(ctx, time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
87         defer cancel()
88
89         go bal.reportMemorySize(ctx)
90
91         var lbFile *os.File
92         if bal.LostBlocksFile != "" {
93                 tmpfn := bal.LostBlocksFile + ".tmp"
94                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
95                 if err != nil {
96                         return
97                 }
98                 defer lbFile.Close()
99                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
100                 if err != nil {
101                         return
102                 }
103                 defer func() {
104                         // Remove the tempfile only if we didn't get
105                         // as far as successfully renaming it.
106                         if lbFile != nil {
107                                 os.Remove(tmpfn)
108                         }
109                 }()
110                 bal.lostBlocks = lbFile
111         } else {
112                 bal.lostBlocks = ioutil.Discard
113         }
114
115         err = bal.DiscoverKeepServices(client)
116         if err != nil {
117                 return
118         }
119
120         for _, srv := range bal.KeepServices {
121                 err = srv.discoverMounts(client)
122                 if err != nil {
123                         return
124                 }
125         }
126         bal.cleanupMounts()
127
128         if err = bal.CheckSanityEarly(client); err != nil {
129                 return
130         }
131
132         // On a big site, indexing and sending trash/pull lists can
133         // take much longer than the usual 5 minute client
134         // timeout. From here on, we rely on the context deadline
135         // instead, aborting the entire operation if any part takes
136         // too long.
137         client.Timeout = 0
138
139         rs := bal.rendezvousState()
140         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
141                 if runOptions.SafeRendezvousState != "" {
142                         bal.logf("notice: KeepServices list has changed since last run")
143                 }
144                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
145                 if err = bal.ClearTrashLists(ctx, client); err != nil {
146                         return
147                 }
148                 // The current rendezvous state becomes "safe" (i.e.,
149                 // OK to compute changes for that state without
150                 // clearing existing trash lists) only now, after we
151                 // succeed in clearing existing trash lists.
152                 nextRunOptions.SafeRendezvousState = rs
153         }
154
155         if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
156                 return
157         }
158         bal.ComputeChangeSets()
159         bal.PrintStatistics()
160         if err = bal.CheckSanityLate(); err != nil {
161                 return
162         }
163         if lbFile != nil {
164                 err = lbFile.Sync()
165                 if err != nil {
166                         return
167                 }
168                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
169                 if err != nil {
170                         return
171                 }
172                 lbFile = nil
173         }
174         if runOptions.CommitPulls {
175                 err = bal.CommitPulls(ctx, client)
176                 if err != nil {
177                         // Skip trash if we can't pull. (Too cautious?)
178                         return
179                 }
180         }
181         if runOptions.CommitTrash {
182                 err = bal.CommitTrash(ctx, client)
183                 if err != nil {
184                         return
185                 }
186         }
187         if runOptions.CommitConfirmedFields {
188                 err = bal.updateCollections(ctx, client, cluster)
189                 if err != nil {
190                         return
191                 }
192         }
193         return
194 }
195
196 // SetKeepServices sets the list of KeepServices to operate on.
197 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
198         bal.KeepServices = make(map[string]*KeepService)
199         for _, srv := range srvList.Items {
200                 bal.KeepServices[srv.UUID] = &KeepService{
201                         KeepService: srv,
202                         ChangeSet:   &ChangeSet{},
203                 }
204         }
205         return nil
206 }
207
208 // DiscoverKeepServices sets the list of KeepServices by calling the
209 // API to get a list of all services, and selecting the ones whose
210 // ServiceType is "disk"
211 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
212         bal.KeepServices = make(map[string]*KeepService)
213         return c.EachKeepService(func(srv arvados.KeepService) error {
214                 if srv.ServiceType == "disk" {
215                         bal.KeepServices[srv.UUID] = &KeepService{
216                                 KeepService: srv,
217                                 ChangeSet:   &ChangeSet{},
218                         }
219                 } else {
220                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
221                 }
222                 return nil
223         })
224 }
225
226 func (bal *Balancer) cleanupMounts() {
227         rwdev := map[string]*KeepService{}
228         for _, srv := range bal.KeepServices {
229                 for _, mnt := range srv.mounts {
230                         if mnt.AllowWrite {
231                                 rwdev[mnt.UUID] = srv
232                         }
233                 }
234         }
235         // Drop the readonly mounts whose device is mounted RW
236         // elsewhere.
237         for _, srv := range bal.KeepServices {
238                 var dedup []*KeepMount
239                 for _, mnt := range srv.mounts {
240                         if !mnt.AllowWrite && rwdev[mnt.UUID] != nil {
241                                 bal.logf("skipping srv %s readonly mount %q because same volume is mounted read-write on srv %s", srv, mnt.UUID, rwdev[mnt.UUID])
242                         } else {
243                                 dedup = append(dedup, mnt)
244                         }
245                 }
246                 srv.mounts = dedup
247         }
248         for _, srv := range bal.KeepServices {
249                 for _, mnt := range srv.mounts {
250                         if mnt.Replication <= 0 {
251                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
252                                 mnt.Replication = 1
253                         }
254                 }
255         }
256 }
257
258 // CheckSanityEarly checks for configuration and runtime errors that
259 // can be detected before GetCurrentState() and ComputeChangeSets()
260 // are called.
261 //
262 // If it returns an error, it is pointless to run GetCurrentState or
263 // ComputeChangeSets: after doing so, the statistics would be
264 // meaningless and it would be dangerous to run any Commit methods.
265 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
266         u, err := c.CurrentUser()
267         if err != nil {
268                 return fmt.Errorf("CurrentUser(): %v", err)
269         }
270         if !u.IsActive || !*u.IsAdmin {
271                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
272         }
273         for _, srv := range bal.KeepServices {
274                 if srv.ServiceType == "proxy" {
275                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
276                 }
277         }
278         for _, c := range bal.ChunkPrefix {
279                 if !strings.ContainsRune("0123456789abcdef", c) {
280                         return fmt.Errorf("invalid char %q in chunk prefix %q: only lowercase hex digits make sense", string(c), bal.ChunkPrefix)
281                 }
282         }
283         if len(bal.ChunkPrefix) > 32 {
284                 return fmt.Errorf("invalid chunk prefix %q: longer than a block hash", bal.ChunkPrefix)
285         }
286
287         mountProblem := false
288         type deviceMount struct {
289                 srv *KeepService
290                 mnt *KeepMount
291         }
292         deviceMounted := map[string]deviceMount{} // DeviceID -> mount
293         for _, srv := range bal.KeepServices {
294                 for _, mnt := range srv.mounts {
295                         if first, dup := deviceMounted[mnt.DeviceID]; dup && first.mnt.UUID != mnt.UUID && mnt.DeviceID != "" {
296                                 bal.logf("config error: device %s is mounted with multiple volume UUIDs: %s on %s, and %s on %s",
297                                         mnt.DeviceID,
298                                         first.mnt.UUID, first.srv,
299                                         mnt.UUID, srv)
300                                 mountProblem = true
301                                 continue
302                         }
303                         deviceMounted[mnt.DeviceID] = deviceMount{srv, mnt}
304                 }
305         }
306         if mountProblem {
307                 return errors.New("cannot continue with config errors (see above)")
308         }
309
310         var checkPage arvados.CollectionList
311         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
312                 Limit:              new(int),
313                 Count:              "exact",
314                 IncludeTrash:       true,
315                 IncludeOldVersions: true,
316                 Filters: []arvados.Filter{{
317                         Attr:     "modified_at",
318                         Operator: "=",
319                         Operand:  nil,
320                 }},
321         }); err != nil {
322                 return err
323         } else if n := checkPage.ItemsAvailable; n > 0 {
324                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
325         }
326
327         return nil
328 }
329
330 // rendezvousState returns a fingerprint (e.g., a sorted list of
331 // UUID+host+port) of the current set of keep services.
332 func (bal *Balancer) rendezvousState() string {
333         srvs := make([]string, 0, len(bal.KeepServices))
334         for _, srv := range bal.KeepServices {
335                 srvs = append(srvs, srv.String())
336         }
337         sort.Strings(srvs)
338         return strings.Join(srvs, "; ")
339 }
340
341 // ClearTrashLists sends an empty trash list to each keep
342 // service. Calling this before GetCurrentState avoids races.
343 //
344 // When a block appears in an index, we assume that replica will still
345 // exist after we delete other replicas on other servers. However,
346 // it's possible that a previous rebalancing operation made different
347 // decisions (e.g., servers were added/removed, and rendezvous order
348 // changed). In this case, the replica might already be on that
349 // server's trash list, and it might be deleted before we send a
350 // replacement trash list.
351 //
352 // We avoid this problem if we clear all trash lists before getting
353 // indexes. (We also assume there is only one rebalancing process
354 // running at a time.)
355 func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
356         for _, srv := range bal.KeepServices {
357                 srv.ChangeSet = &ChangeSet{}
358         }
359         return bal.CommitTrash(ctx, c)
360 }
361
362 // GetCurrentState determines the current replication state, and the
363 // desired replication level, for every block that is either
364 // retrievable or referenced.
365 //
366 // It determines the current replication state by reading the block index
367 // from every known Keep service.
368 //
369 // It determines the desired replication level by retrieving all
370 // collection manifests in the database (API server).
371 //
372 // It encodes the resulting information in BlockStateMap.
373 func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
374         ctx, cancel := context.WithCancel(ctx)
375         defer cancel()
376
377         defer bal.time("get_state", "wall clock time to get current state")()
378         bal.BlockStateMap = NewBlockStateMap()
379
380         dd, err := c.DiscoveryDocument()
381         if err != nil {
382                 return err
383         }
384         bal.DefaultReplication = dd.DefaultCollectionReplication
385         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
386
387         errs := make(chan error, 1)
388         wg := sync.WaitGroup{}
389
390         // When a device is mounted more than once, we will get its
391         // index only once, and call AddReplicas on all of the mounts.
392         // equivMount keys are the mounts that will be indexed, and
393         // each value is a list of mounts to apply the received index
394         // to.
395         equivMount := map[*KeepMount][]*KeepMount{}
396         // deviceMount maps each device ID to the one mount that will
397         // be indexed for that device.
398         deviceMount := map[string]*KeepMount{}
399         for _, srv := range bal.KeepServices {
400                 for _, mnt := range srv.mounts {
401                         equiv := deviceMount[mnt.UUID]
402                         if equiv == nil {
403                                 equiv = mnt
404                                 deviceMount[mnt.UUID] = equiv
405                         }
406                         equivMount[equiv] = append(equivMount[equiv], mnt)
407                 }
408         }
409
410         // Start one goroutine for each (non-redundant) mount:
411         // retrieve the index, and add the returned blocks to
412         // BlockStateMap.
413         for _, mounts := range equivMount {
414                 wg.Add(1)
415                 go func(mounts []*KeepMount) {
416                         defer wg.Done()
417                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
418                         idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, bal.ChunkPrefix)
419                         if err != nil {
420                                 select {
421                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
422                                 default:
423                                 }
424                                 cancel()
425                                 return
426                         }
427                         if len(errs) > 0 {
428                                 // Some other goroutine encountered an
429                                 // error -- any further effort here
430                                 // will be wasted.
431                                 return
432                         }
433                         for _, mount := range mounts {
434                                 bal.logf("%s: add %d entries to map", mount, len(idx))
435                                 bal.BlockStateMap.AddReplicas(mount, idx)
436                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
437                         }
438                         bal.logf("mount %s: index done", mounts[0])
439                 }(mounts)
440         }
441
442         collQ := make(chan arvados.Collection, bufs)
443
444         // Retrieve all collections from the database and send them to
445         // collQ.
446         wg.Add(1)
447         go func() {
448                 defer wg.Done()
449                 err = EachCollection(ctx, bal.DB, c,
450                         func(coll arvados.Collection) error {
451                                 collQ <- coll
452                                 if len(errs) > 0 {
453                                         // some other GetCurrentState
454                                         // error happened: no point
455                                         // getting any more
456                                         // collections.
457                                         return fmt.Errorf("")
458                                 }
459                                 return nil
460                         }, func(done, total int) {
461                                 bal.logf("collections: %d/%d", done, total)
462                         })
463                 close(collQ)
464                 if err != nil {
465                         select {
466                         case errs <- err:
467                         default:
468                         }
469                         cancel()
470                 }
471         }()
472
473         // Parse manifests from collQ and pass the block hashes to
474         // BlockStateMap to track desired replication.
475         for i := 0; i < runtime.NumCPU(); i++ {
476                 wg.Add(1)
477                 go func() {
478                         defer wg.Done()
479                         for coll := range collQ {
480                                 err := bal.addCollection(coll)
481                                 if err != nil || len(errs) > 0 {
482                                         select {
483                                         case errs <- err:
484                                         default:
485                                         }
486                                         cancel()
487                                         continue
488                                 }
489                                 atomic.AddInt64(&bal.collScanned, 1)
490                         }
491                 }()
492         }
493
494         wg.Wait()
495         if len(errs) > 0 {
496                 return <-errs
497         }
498         return nil
499 }
500
501 func (bal *Balancer) addCollection(coll arvados.Collection) error {
502         blkids, err := coll.SizedDigests()
503         if err != nil {
504                 return fmt.Errorf("%v: %v", coll.UUID, err)
505         }
506         repl := bal.DefaultReplication
507         if coll.ReplicationDesired != nil {
508                 repl = *coll.ReplicationDesired
509         }
510         if bal.ChunkPrefix != "" {
511                 // Throw out blocks that don't match the requested
512                 // prefix.  (We save a bit of GC work here by
513                 // preallocating based on each hex digit in
514                 // ChunkPrefix reducing the expected size of the
515                 // filtered set by ~16x.)
516                 filtered := make([]arvados.SizedDigest, 0, len(blkids)>>(4*len(bal.ChunkPrefix)-1))
517                 for _, blkid := range blkids {
518                         if strings.HasPrefix(string(blkid), bal.ChunkPrefix) {
519                                 filtered = append(filtered, blkid)
520                         }
521                 }
522                 blkids = filtered
523         }
524         bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
525         // Pass pdh to IncreaseDesired only if LostBlocksFile is being
526         // written -- otherwise it's just a waste of memory.
527         pdh := ""
528         if bal.LostBlocksFile != "" {
529                 pdh = coll.PortableDataHash
530         }
531         bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
532         return nil
533 }
534
535 // ComputeChangeSets compares, for each known block, the current and
536 // desired replication states. If it is possible to get closer to the
537 // desired state by copying or deleting blocks, it adds those changes
538 // to the relevant KeepServices' ChangeSets.
539 //
540 // It does not actually apply any of the computed changes.
541 func (bal *Balancer) ComputeChangeSets() {
542         // This just calls balanceBlock() once for each block, using a
543         // pool of worker goroutines.
544         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
545         bal.setupLookupTables()
546
547         type balanceTask struct {
548                 blkid arvados.SizedDigest
549                 blk   *BlockState
550         }
551         workers := runtime.GOMAXPROCS(-1)
552         todo := make(chan balanceTask, workers)
553         go func() {
554                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
555                         todo <- balanceTask{
556                                 blkid: blkid,
557                                 blk:   blk,
558                         }
559                 })
560                 close(todo)
561         }()
562         results := make(chan balanceResult, workers)
563         go func() {
564                 var wg sync.WaitGroup
565                 for i := 0; i < workers; i++ {
566                         wg.Add(1)
567                         go func() {
568                                 for work := range todo {
569                                         results <- bal.balanceBlock(work.blkid, work.blk)
570                                 }
571                                 wg.Done()
572                         }()
573                 }
574                 wg.Wait()
575                 close(results)
576         }()
577         bal.collectStatistics(results)
578 }
579
580 func (bal *Balancer) setupLookupTables() {
581         bal.serviceRoots = make(map[string]string)
582         bal.classes = defaultClasses
583         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
584         bal.mounts = 0
585         for _, srv := range bal.KeepServices {
586                 bal.serviceRoots[srv.UUID] = srv.UUID
587                 for _, mnt := range srv.mounts {
588                         bal.mounts++
589
590                         if srv.ReadOnly {
591                                 // All mounts on a read-only service
592                                 // are effectively read-only.
593                                 mnt.AllowWrite = false
594                         }
595
596                         for class := range mnt.StorageClasses {
597                                 if mbc := bal.mountsByClass[class]; mbc == nil {
598                                         bal.classes = append(bal.classes, class)
599                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
600                                 } else {
601                                         mbc[mnt] = true
602                                 }
603                         }
604                 }
605         }
606         // Consider classes in lexicographic order to avoid flapping
607         // between balancing runs.  The outcome of the "prefer a mount
608         // we're already planning to use for a different storage
609         // class" case in balanceBlock depends on the order classes
610         // are considered.
611         sort.Strings(bal.classes)
612 }
613
614 const (
615         changeStay = iota
616         changePull
617         changeTrash
618         changeNone
619 )
620
621 var changeName = map[int]string{
622         changeStay:  "stay",
623         changePull:  "pull",
624         changeTrash: "trash",
625         changeNone:  "none",
626 }
627
628 type balancedBlockState struct {
629         needed       int
630         unneeded     int
631         pulling      int
632         unachievable bool
633 }
634
635 type balanceResult struct {
636         blk        *BlockState
637         blkid      arvados.SizedDigest
638         lost       bool
639         blockState balancedBlockState
640         classState map[string]balancedBlockState
641 }
642
643 type slot struct {
644         mnt  *KeepMount // never nil
645         repl *Replica   // replica already stored here (or nil)
646         want bool       // we should pull/leave a replica here
647 }
648
649 // balanceBlock compares current state to desired state for a single
650 // block, and makes the appropriate ChangeSet calls.
651 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
652         bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
653
654         // Build a list of all slots (one per mounted volume).
655         slots := make([]slot, 0, bal.mounts)
656         for _, srv := range bal.KeepServices {
657                 for _, mnt := range srv.mounts {
658                         var repl *Replica
659                         for r := range blk.Replicas {
660                                 if blk.Replicas[r].KeepMount == mnt {
661                                         repl = &blk.Replicas[r]
662                                 }
663                         }
664                         // Initial value of "want" is "have, and can't
665                         // delete". These untrashable replicas get
666                         // prioritized when sorting slots: otherwise,
667                         // non-optimal readonly copies would cause us
668                         // to overreplicate.
669                         slots = append(slots, slot{
670                                 mnt:  mnt,
671                                 repl: repl,
672                                 want: repl != nil && !mnt.AllowTrash,
673                         })
674                 }
675         }
676
677         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
678         srvRendezvous := make(map[*KeepService]int, len(uuids))
679         for i, uuid := range uuids {
680                 srv := bal.KeepServices[uuid]
681                 srvRendezvous[srv] = i
682         }
683
684         // Below we set underreplicated=true if we find any storage
685         // class that's currently underreplicated -- in that case we
686         // won't want to trash any replicas.
687         underreplicated := false
688
689         unsafeToDelete := make(map[int64]bool, len(slots))
690         for _, class := range bal.classes {
691                 desired := blk.Desired[class]
692                 if desired == 0 {
693                         continue
694                 }
695
696                 // Sort the slots by desirability.
697                 sort.Slice(slots, func(i, j int) bool {
698                         si, sj := slots[i], slots[j]
699                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
700                                 // Prefer a mount that satisfies the
701                                 // desired class.
702                                 return bal.mountsByClass[class][si.mnt]
703                         } else if si.want != sj.want {
704                                 // Prefer a mount that will have a
705                                 // replica no matter what we do here
706                                 // -- either because it already has an
707                                 // untrashable replica, or because we
708                                 // already need it to satisfy a
709                                 // different storage class.
710                                 return si.want
711                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
712                                 // Prefer a better rendezvous
713                                 // position.
714                                 return orderi < orderj
715                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
716                                 // Prefer a mount that already has a
717                                 // replica.
718                                 return repli
719                         } else {
720                                 // If pull/trash turns out to be
721                                 // needed, distribute the
722                                 // new/remaining replicas uniformly
723                                 // across qualifying mounts on a given
724                                 // server.
725                                 return rendezvousLess(si.mnt.UUID, sj.mnt.UUID, blkid)
726                         }
727                 })
728
729                 // Servers/mounts/devices (with or without existing
730                 // replicas) that are part of the best achievable
731                 // layout for this storage class.
732                 wantSrv := map[*KeepService]bool{}
733                 wantMnt := map[*KeepMount]bool{}
734                 wantDev := map[string]bool{}
735                 // Positions (with existing replicas) that have been
736                 // protected (via unsafeToDelete) to ensure we don't
737                 // reduce replication below desired level when
738                 // trashing replicas that aren't optimal positions for
739                 // any storage class.
740                 protMnt := map[*KeepMount]bool{}
741                 // Replication planned so far (corresponds to wantMnt).
742                 replWant := 0
743                 // Protected replication (corresponds to protMnt).
744                 replProt := 0
745
746                 // trySlot tries using a slot to meet requirements,
747                 // and returns true if all requirements are met.
748                 trySlot := func(i int) bool {
749                         slot := slots[i]
750                         if wantMnt[slot.mnt] || wantDev[slot.mnt.UUID] {
751                                 // Already allocated a replica to this
752                                 // backend device, possibly on a
753                                 // different server.
754                                 return false
755                         }
756                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
757                                 unsafeToDelete[slot.repl.Mtime] = true
758                                 protMnt[slot.mnt] = true
759                                 replProt += slot.mnt.Replication
760                         }
761                         if replWant < desired && (slot.repl != nil || slot.mnt.AllowWrite) {
762                                 slots[i].want = true
763                                 wantSrv[slot.mnt.KeepService] = true
764                                 wantMnt[slot.mnt] = true
765                                 wantDev[slot.mnt.UUID] = true
766                                 replWant += slot.mnt.Replication
767                         }
768                         return replProt >= desired && replWant >= desired
769                 }
770
771                 // First try to achieve desired replication without
772                 // using the same server twice.
773                 done := false
774                 for i := 0; i < len(slots) && !done; i++ {
775                         if !wantSrv[slots[i].mnt.KeepService] {
776                                 done = trySlot(i)
777                         }
778                 }
779
780                 // If that didn't suffice, do another pass without the
781                 // "distinct services" restriction. (Achieving the
782                 // desired volume replication on fewer than the
783                 // desired number of services is better than
784                 // underreplicating.)
785                 for i := 0; i < len(slots) && !done; i++ {
786                         done = trySlot(i)
787                 }
788
789                 if !underreplicated {
790                         safe := 0
791                         for _, slot := range slots {
792                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
793                                         continue
794                                 }
795                                 if safe += slot.mnt.Replication; safe >= desired {
796                                         break
797                                 }
798                         }
799                         underreplicated = safe < desired
800                 }
801
802                 // Avoid deleting wanted replicas from devices that
803                 // are mounted on multiple servers -- even if they
804                 // haven't already been added to unsafeToDelete
805                 // because the servers report different Mtimes.
806                 for _, slot := range slots {
807                         if slot.repl != nil && wantDev[slot.mnt.UUID] {
808                                 unsafeToDelete[slot.repl.Mtime] = true
809                         }
810                 }
811         }
812
813         // TODO: If multiple replicas are trashable, prefer the oldest
814         // replica that doesn't have a timestamp collision with
815         // others.
816
817         for i, slot := range slots {
818                 // Don't trash (1) any replicas of an underreplicated
819                 // block, even if they're in the wrong positions, or
820                 // (2) any replicas whose Mtimes are identical to
821                 // needed replicas (in case we're really seeing the
822                 // same copy via different mounts).
823                 if slot.repl != nil && (underreplicated || unsafeToDelete[slot.repl.Mtime]) {
824                         slots[i].want = true
825                 }
826         }
827
828         classState := make(map[string]balancedBlockState, len(bal.classes))
829         for _, class := range bal.classes {
830                 classState[class] = computeBlockState(slots, bal.mountsByClass[class], len(blk.Replicas), blk.Desired[class])
831         }
832         blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
833
834         // Sort the slots by rendezvous order. This ensures "trash the
835         // first of N replicas with identical timestamps" is
836         // predictable (helpful for testing) and well distributed
837         // across servers.
838         sort.Slice(slots, func(i, j int) bool {
839                 si, sj := slots[i], slots[j]
840                 if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
841                         return orderi < orderj
842                 } else {
843                         return rendezvousLess(si.mnt.UUID, sj.mnt.UUID, blkid)
844                 }
845         })
846
847         var (
848                 lost         bool
849                 changes      []string
850                 trashedMtime = make(map[int64]bool, len(slots))
851         )
852         for _, slot := range slots {
853                 // TODO: request a Touch if Mtime is duplicated.
854                 var change int
855                 switch {
856                 case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
857                         if trashedMtime[slot.repl.Mtime] {
858                                 // Don't trash multiple replicas with
859                                 // identical timestamps. If they are
860                                 // multiple views of the same backing
861                                 // storage, asking both servers to
862                                 // trash is redundant and can cause
863                                 // races (see #20242). If they are
864                                 // distinct replicas that happen to
865                                 // have identical timestamps, we'll
866                                 // get this one on the next sweep.
867                                 change = changeNone
868                         } else {
869                                 slot.mnt.KeepService.AddTrash(Trash{
870                                         SizedDigest: blkid,
871                                         Mtime:       slot.repl.Mtime,
872                                         From:        slot.mnt,
873                                 })
874                                 change = changeTrash
875                                 trashedMtime[slot.repl.Mtime] = true
876                         }
877                 case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
878                         lost = true
879                         change = changeNone
880                 case slot.repl == nil && slot.want && slot.mnt.AllowWrite:
881                         slot.mnt.KeepService.AddPull(Pull{
882                                 SizedDigest: blkid,
883                                 From:        blk.Replicas[0].KeepMount.KeepService,
884                                 To:          slot.mnt,
885                         })
886                         change = changePull
887                 case slot.repl != nil:
888                         change = changeStay
889                 default:
890                         change = changeNone
891                 }
892                 if bal.Dumper != nil {
893                         var mtime int64
894                         if slot.repl != nil {
895                                 mtime = slot.repl.Mtime
896                         }
897                         srv := slot.mnt.KeepService
898                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
899                 }
900         }
901         if bal.Dumper != nil {
902                 bal.Dumper.Printf("%s refs=%d needed=%d unneeded=%d pulling=%v %v %v", blkid, blk.RefCount, blockState.needed, blockState.unneeded, blockState.pulling, blk.Desired, changes)
903         }
904         return balanceResult{
905                 blk:        blk,
906                 blkid:      blkid,
907                 lost:       lost,
908                 blockState: blockState,
909                 classState: classState,
910         }
911 }
912
913 func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRepl int) (bbs balancedBlockState) {
914         repl := 0
915         countedDev := map[string]bool{}
916         for _, slot := range slots {
917                 if onlyCount != nil && !onlyCount[slot.mnt] {
918                         continue
919                 }
920                 if countedDev[slot.mnt.UUID] {
921                         continue
922                 }
923                 switch {
924                 case slot.repl != nil && slot.want:
925                         bbs.needed++
926                         repl += slot.mnt.Replication
927                 case slot.repl != nil && !slot.want:
928                         bbs.unneeded++
929                         repl += slot.mnt.Replication
930                 case slot.repl == nil && slot.want && have > 0:
931                         bbs.pulling++
932                         repl += slot.mnt.Replication
933                 }
934                 countedDev[slot.mnt.UUID] = true
935         }
936         if repl < needRepl {
937                 bbs.unachievable = true
938         }
939         return
940 }
941
942 type blocksNBytes struct {
943         replicas int
944         blocks   int
945         bytes    int64
946 }
947
948 func (bb blocksNBytes) String() string {
949         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
950 }
951
952 type replicationStats struct {
953         needed       blocksNBytes
954         unneeded     blocksNBytes
955         pulling      blocksNBytes
956         unachievable blocksNBytes
957 }
958
959 type balancerStats struct {
960         lost          blocksNBytes
961         overrep       blocksNBytes
962         unref         blocksNBytes
963         garbage       blocksNBytes
964         underrep      blocksNBytes
965         unachievable  blocksNBytes
966         justright     blocksNBytes
967         desired       blocksNBytes
968         current       blocksNBytes
969         pulls         int
970         trashes       int
971         replHistogram []int
972         classStats    map[string]replicationStats
973
974         // collectionBytes / collectionBlockBytes = deduplication ratio
975         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
976         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
977         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
978         collectionBlocks     int64 // number of blocks referenced by any collection
979 }
980
981 func (s *balancerStats) dedupByteRatio() float64 {
982         if s.collectionBlockBytes == 0 {
983                 return 0
984         }
985         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
986 }
987
988 func (s *balancerStats) dedupBlockRatio() float64 {
989         if s.collectionBlocks == 0 {
990                 return 0
991         }
992         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
993 }
994
995 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
996         var s balancerStats
997         s.replHistogram = make([]int, 2)
998         s.classStats = make(map[string]replicationStats, len(bal.classes))
999         for result := range results {
1000                 bytes := result.blkid.Size()
1001
1002                 if rc := int64(result.blk.RefCount); rc > 0 {
1003                         s.collectionBytes += rc * bytes
1004                         s.collectionBlockBytes += bytes
1005                         s.collectionBlockRefs += rc
1006                         s.collectionBlocks++
1007                 }
1008
1009                 for class, state := range result.classState {
1010                         cs := s.classStats[class]
1011                         if state.unachievable {
1012                                 cs.unachievable.replicas++
1013                                 cs.unachievable.blocks++
1014                                 cs.unachievable.bytes += bytes
1015                         }
1016                         if state.needed > 0 {
1017                                 cs.needed.replicas += state.needed
1018                                 cs.needed.blocks++
1019                                 cs.needed.bytes += bytes * int64(state.needed)
1020                         }
1021                         if state.unneeded > 0 {
1022                                 cs.unneeded.replicas += state.unneeded
1023                                 cs.unneeded.blocks++
1024                                 cs.unneeded.bytes += bytes * int64(state.unneeded)
1025                         }
1026                         if state.pulling > 0 {
1027                                 cs.pulling.replicas += state.pulling
1028                                 cs.pulling.blocks++
1029                                 cs.pulling.bytes += bytes * int64(state.pulling)
1030                         }
1031                         s.classStats[class] = cs
1032                 }
1033
1034                 bs := result.blockState
1035                 switch {
1036                 case result.lost:
1037                         s.lost.replicas++
1038                         s.lost.blocks++
1039                         s.lost.bytes += bytes
1040                         fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
1041                         for pdh := range result.blk.Refs {
1042                                 fmt.Fprintf(bal.lostBlocks, " %s", pdh)
1043                         }
1044                         fmt.Fprint(bal.lostBlocks, "\n")
1045                 case bs.pulling > 0:
1046                         s.underrep.replicas += bs.pulling
1047                         s.underrep.blocks++
1048                         s.underrep.bytes += bytes * int64(bs.pulling)
1049                 case bs.unachievable:
1050                         s.underrep.replicas++
1051                         s.underrep.blocks++
1052                         s.underrep.bytes += bytes
1053                 case bs.unneeded > 0 && bs.needed == 0:
1054                         // Count as "garbage" if all replicas are old
1055                         // enough to trash, otherwise count as
1056                         // "unref".
1057                         counter := &s.garbage
1058                         for _, r := range result.blk.Replicas {
1059                                 if r.Mtime >= bal.MinMtime {
1060                                         counter = &s.unref
1061                                         break
1062                                 }
1063                         }
1064                         counter.replicas += bs.unneeded
1065                         counter.blocks++
1066                         counter.bytes += bytes * int64(bs.unneeded)
1067                 case bs.unneeded > 0:
1068                         s.overrep.replicas += bs.unneeded
1069                         s.overrep.blocks++
1070                         s.overrep.bytes += bytes * int64(bs.unneeded)
1071                 default:
1072                         s.justright.replicas += bs.needed
1073                         s.justright.blocks++
1074                         s.justright.bytes += bytes * int64(bs.needed)
1075                 }
1076
1077                 if bs.needed > 0 {
1078                         s.desired.replicas += bs.needed
1079                         s.desired.blocks++
1080                         s.desired.bytes += bytes * int64(bs.needed)
1081                 }
1082                 if bs.needed+bs.unneeded > 0 {
1083                         s.current.replicas += bs.needed + bs.unneeded
1084                         s.current.blocks++
1085                         s.current.bytes += bytes * int64(bs.needed+bs.unneeded)
1086                 }
1087
1088                 for len(s.replHistogram) <= bs.needed+bs.unneeded {
1089                         s.replHistogram = append(s.replHistogram, 0)
1090                 }
1091                 s.replHistogram[bs.needed+bs.unneeded]++
1092         }
1093         for _, srv := range bal.KeepServices {
1094                 s.pulls += len(srv.ChangeSet.Pulls)
1095                 s.trashes += len(srv.ChangeSet.Trashes)
1096         }
1097         bal.stats = s
1098         bal.Metrics.UpdateStats(s)
1099 }
1100
1101 // PrintStatistics writes statistics about the computed changes to
1102 // bal.Logger. It should not be called until ComputeChangeSets has
1103 // finished.
1104 func (bal *Balancer) PrintStatistics() {
1105         bal.logf("===")
1106         bal.logf("%s lost (0=have<want)", bal.stats.lost)
1107         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
1108         bal.logf("%s just right (have=want)", bal.stats.justright)
1109         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
1110         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
1111         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
1112         for _, class := range bal.classes {
1113                 cs := bal.stats.classStats[class]
1114                 bal.logf("===")
1115                 bal.logf("storage class %q: %s needed", class, cs.needed)
1116                 bal.logf("storage class %q: %s unneeded", class, cs.unneeded)
1117                 bal.logf("storage class %q: %s pulling", class, cs.pulling)
1118                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
1119         }
1120         bal.logf("===")
1121         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
1122         bal.logf("%s total usage", bal.stats.current)
1123         bal.logf("===")
1124         for _, srv := range bal.KeepServices {
1125                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1126         }
1127         bal.logf("===")
1128         bal.printHistogram(60)
1129         bal.logf("===")
1130 }
1131
1132 func (bal *Balancer) printHistogram(hashColumns int) {
1133         bal.logf("Replication level distribution:")
1134         maxCount := 0
1135         for _, count := range bal.stats.replHistogram {
1136                 if maxCount < count {
1137                         maxCount = count
1138                 }
1139         }
1140         hashes := strings.Repeat("#", hashColumns)
1141         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1142         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1143         for repl, count := range bal.stats.replHistogram {
1144                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1145                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1146         }
1147 }
1148
1149 // CheckSanityLate checks for configuration and runtime errors after
1150 // GetCurrentState() and ComputeChangeSets() have finished.
1151 //
1152 // If it returns an error, it is dangerous to run any Commit methods.
1153 func (bal *Balancer) CheckSanityLate() error {
1154         if bal.errors != nil {
1155                 for _, err := range bal.errors {
1156                         bal.logf("deferred error: %v", err)
1157                 }
1158                 return fmt.Errorf("cannot proceed safely after deferred errors")
1159         }
1160
1161         if bal.collScanned == 0 {
1162                 return fmt.Errorf("received zero collections")
1163         }
1164
1165         anyDesired := false
1166         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1167                 for _, desired := range blk.Desired {
1168                         if desired > 0 {
1169                                 anyDesired = true
1170                                 break
1171                         }
1172                 }
1173         })
1174         if !anyDesired {
1175                 return fmt.Errorf("zero blocks have desired replication>0")
1176         }
1177
1178         if dr := bal.DefaultReplication; dr < 1 {
1179                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1180         }
1181
1182         // TODO: no two services have identical indexes
1183         // TODO: no collisions (same md5, different size)
1184         return nil
1185 }
1186
1187 // CommitPulls sends the computed lists of pull requests to the
1188 // keepstore servers. This has the effect of increasing replication of
1189 // existing blocks that are either underreplicated or poorly
1190 // distributed according to rendezvous hashing.
1191 func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
1192         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1193         return bal.commitAsync(c, "send pull list",
1194                 func(srv *KeepService) error {
1195                         return srv.CommitPulls(ctx, c)
1196                 })
1197 }
1198
1199 // CommitTrash sends the computed lists of trash requests to the
1200 // keepstore servers. This has the effect of deleting blocks that are
1201 // overreplicated or unreferenced.
1202 func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
1203         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1204         return bal.commitAsync(c, "send trash list",
1205                 func(srv *KeepService) error {
1206                         return srv.CommitTrash(ctx, c)
1207                 })
1208 }
1209
1210 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1211         errs := make(chan error)
1212         for _, srv := range bal.KeepServices {
1213                 go func(srv *KeepService) {
1214                         var err error
1215                         defer func() { errs <- err }()
1216                         label := fmt.Sprintf("%s: %v", srv, label)
1217                         err = f(srv)
1218                         if err != nil {
1219                                 err = fmt.Errorf("%s: %v", label, err)
1220                         }
1221                 }(srv)
1222         }
1223         var lastErr error
1224         for range bal.KeepServices {
1225                 if err := <-errs; err != nil {
1226                         bal.logf("%v", err)
1227                         lastErr = err
1228                 }
1229         }
1230         close(errs)
1231         return lastErr
1232 }
1233
1234 func (bal *Balancer) logf(f string, args ...interface{}) {
1235         if bal.Logger != nil {
1236                 bal.Logger.Printf(f, args...)
1237         }
1238 }
1239
1240 func (bal *Balancer) time(name, help string) func() {
1241         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1242         t0 := time.Now()
1243         bal.Logger.Printf("%s: start", name)
1244         return func() {
1245                 dur := time.Since(t0)
1246                 observer.Observe(dur.Seconds())
1247                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1248         }
1249 }
1250
1251 // Log current memory usage: once now, at least once every 10 minutes,
1252 // and when memory grows by 40% since the last log. Stop when ctx is
1253 // canceled.
1254 func (bal *Balancer) reportMemorySize(ctx context.Context) {
1255         buf, _ := os.ReadFile("/proc/self/smaps")
1256         m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
1257         var pagesize int64
1258         if len(m) == 2 {
1259                 pagesize, _ = strconv.ParseInt(string(m[1]), 10, 64)
1260                 pagesize <<= 10
1261         }
1262         if pagesize == 0 {
1263                 bal.logf("cannot log OS-reported memory size: failed to parse KernelPageSize from /proc/self/smaps")
1264         }
1265         osstats := func() string {
1266                 if pagesize == 0 {
1267                         return ""
1268                 }
1269                 buf, _ := os.ReadFile("/proc/self/statm")
1270                 fields := strings.Split(string(buf), " ")
1271                 if len(fields) < 2 {
1272                         return ""
1273                 }
1274                 virt, _ := strconv.ParseInt(fields[0], 10, 64)
1275                 virt *= pagesize
1276                 res, _ := strconv.ParseInt(fields[1], 10, 64)
1277                 res *= pagesize
1278                 if virt == 0 || res == 0 {
1279                         return ""
1280                 }
1281                 return fmt.Sprintf(" virt %d res %d", virt, res)
1282         }
1283
1284         var nextTime time.Time
1285         var nextMem uint64
1286         const maxInterval = time.Minute * 10
1287         const maxIncrease = 1.4
1288
1289         ticker := time.NewTicker(time.Second)
1290         defer ticker.Stop()
1291         var memstats runtime.MemStats
1292         for ctx.Err() == nil {
1293                 now := time.Now()
1294                 runtime.ReadMemStats(&memstats)
1295                 mem := memstats.StackInuse + memstats.HeapInuse
1296                 if now.After(nextTime) || mem >= nextMem {
1297                         bal.logf("heap %d stack %d heapalloc %d%s", memstats.HeapInuse, memstats.StackInuse, memstats.HeapAlloc, osstats())
1298                         nextMem = uint64(float64(mem) * maxIncrease)
1299                         nextTime = now.Add(maxInterval)
1300                 }
1301                 <-ticker.C
1302         }
1303 }
1304
1305 // Rendezvous hash sort function. Less efficient than sorting on
1306 // precomputed rendezvous hashes, but also rarely used.
1307 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1308         a := md5.Sum([]byte(string(blkid[:32]) + i))
1309         b := md5.Sum([]byte(string(blkid[:32]) + j))
1310         return bytes.Compare(a[:], b[:]) < 0
1311 }