20602: Use stdlib heap implementation.
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepbalance
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "log"
16         "math"
17         "os"
18         "regexp"
19         "runtime"
20         "sort"
21         "strconv"
22         "strings"
23         "sync"
24         "sync/atomic"
25         "syscall"
26         "time"
27
28         "git.arvados.org/arvados.git/lib/controller/dblock"
29         "git.arvados.org/arvados.git/sdk/go/arvados"
30         "git.arvados.org/arvados.git/sdk/go/keepclient"
31         "github.com/jmoiron/sqlx"
32         "github.com/sirupsen/logrus"
33 )
34
35 // Balancer compares the contents of keepstore servers with the
36 // collections stored in Arvados, and issues pull/trash requests
37 // needed to get (closer to) the optimal data layout.
38 //
39 // In the optimal data layout: every data block referenced by a
40 // collection is replicated at least as many times as desired by the
41 // collection; there are no unreferenced data blocks older than
42 // BlobSignatureTTL; and all N existing replicas of a given data block
43 // are in the N best positions in rendezvous probe order.
44 type Balancer struct {
45         DB      *sqlx.DB
46         Logger  logrus.FieldLogger
47         Dumper  logrus.FieldLogger
48         Metrics *metrics
49
50         ChunkPrefix    string
51         LostBlocksFile string
52
53         *BlockStateMap
54         KeepServices       map[string]*KeepService
55         DefaultReplication int
56         MinMtime           int64
57
58         classes       []string
59         mounts        int
60         mountsByClass map[string]map[*KeepMount]bool
61         collScanned   int64
62         serviceRoots  map[string]string
63         errors        []error
64         stats         balancerStats
65         mutex         sync.Mutex
66         lostBlocks    io.Writer
67 }
68
69 // Run performs a balance operation using the given config and
70 // runOptions, and returns RunOptions suitable for passing to a
71 // subsequent balance operation.
72 //
73 // Run should only be called once on a given Balancer object.
74 func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
75         nextRunOptions = runOptions
76
77         bal.logf("acquiring active lock")
78         if !dblock.KeepBalanceActive.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return bal.DB, nil }) {
79                 // context canceled
80                 return
81         }
82         defer dblock.KeepBalanceActive.Unlock()
83
84         defer bal.time("sweep", "wall clock time to run one full sweep")()
85
86         ctx, cancel := context.WithDeadline(ctx, time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
87         defer cancel()
88
89         go bal.reportMemorySize(ctx)
90
91         var lbFile *os.File
92         if bal.LostBlocksFile != "" {
93                 tmpfn := bal.LostBlocksFile + ".tmp"
94                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
95                 if err != nil {
96                         return
97                 }
98                 defer lbFile.Close()
99                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
100                 if err != nil {
101                         return
102                 }
103                 defer func() {
104                         // Remove the tempfile only if we didn't get
105                         // as far as successfully renaming it.
106                         if lbFile != nil {
107                                 os.Remove(tmpfn)
108                         }
109                 }()
110                 bal.lostBlocks = lbFile
111         } else {
112                 bal.lostBlocks = ioutil.Discard
113         }
114
115         err = bal.DiscoverKeepServices(client)
116         if err != nil {
117                 return
118         }
119
120         for _, srv := range bal.KeepServices {
121                 err = srv.discoverMounts(client)
122                 if err != nil {
123                         return
124                 }
125         }
126         bal.cleanupMounts()
127
128         if err = bal.CheckSanityEarly(client); err != nil {
129                 return
130         }
131
132         // On a big site, indexing and sending trash/pull lists can
133         // take much longer than the usual 5 minute client
134         // timeout. From here on, we rely on the context deadline
135         // instead, aborting the entire operation if any part takes
136         // too long.
137         client.Timeout = 0
138
139         rs := bal.rendezvousState()
140         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
141                 if runOptions.SafeRendezvousState != "" {
142                         bal.logf("notice: KeepServices list has changed since last run")
143                 }
144                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
145                 if err = bal.ClearTrashLists(ctx, client); err != nil {
146                         return
147                 }
148                 // The current rendezvous state becomes "safe" (i.e.,
149                 // OK to compute changes for that state without
150                 // clearing existing trash lists) only now, after we
151                 // succeed in clearing existing trash lists.
152                 nextRunOptions.SafeRendezvousState = rs
153         }
154
155         if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
156                 return
157         }
158         bal.ComputeChangeSets()
159         bal.PrintStatistics()
160         if err = bal.CheckSanityLate(); err != nil {
161                 return
162         }
163         if lbFile != nil {
164                 err = lbFile.Sync()
165                 if err != nil {
166                         return
167                 }
168                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
169                 if err != nil {
170                         return
171                 }
172                 lbFile = nil
173         }
174         if runOptions.CommitPulls {
175                 err = bal.CommitPulls(ctx, client)
176                 if err != nil {
177                         // Skip trash if we can't pull. (Too cautious?)
178                         return
179                 }
180         }
181         if runOptions.CommitTrash {
182                 err = bal.CommitTrash(ctx, client)
183                 if err != nil {
184                         return
185                 }
186         }
187         if runOptions.CommitConfirmedFields {
188                 err = bal.updateCollections(ctx, client, cluster)
189                 if err != nil {
190                         return
191                 }
192         }
193         return
194 }
195
196 // SetKeepServices sets the list of KeepServices to operate on.
197 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
198         bal.KeepServices = make(map[string]*KeepService)
199         for _, srv := range srvList.Items {
200                 bal.KeepServices[srv.UUID] = &KeepService{
201                         KeepService: srv,
202                         ChangeSet:   &ChangeSet{},
203                 }
204         }
205         return nil
206 }
207
208 // DiscoverKeepServices sets the list of KeepServices by calling the
209 // API to get a list of all services, and selecting the ones whose
210 // ServiceType is "disk"
211 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
212         bal.KeepServices = make(map[string]*KeepService)
213         return c.EachKeepService(func(srv arvados.KeepService) error {
214                 if srv.ServiceType == "disk" {
215                         bal.KeepServices[srv.UUID] = &KeepService{
216                                 KeepService: srv,
217                                 ChangeSet:   &ChangeSet{},
218                         }
219                 } else {
220                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
221                 }
222                 return nil
223         })
224 }
225
226 func (bal *Balancer) cleanupMounts() {
227         rwdev := map[string]*KeepService{}
228         for _, srv := range bal.KeepServices {
229                 for _, mnt := range srv.mounts {
230                         if !mnt.ReadOnly {
231                                 rwdev[mnt.UUID] = srv
232                         }
233                 }
234         }
235         // Drop the readonly mounts whose device is mounted RW
236         // elsewhere.
237         for _, srv := range bal.KeepServices {
238                 var dedup []*KeepMount
239                 for _, mnt := range srv.mounts {
240                         if mnt.ReadOnly && rwdev[mnt.UUID] != nil {
241                                 bal.logf("skipping srv %s readonly mount %q because same volume is mounted read-write on srv %s", srv, mnt.UUID, rwdev[mnt.UUID])
242                         } else {
243                                 dedup = append(dedup, mnt)
244                         }
245                 }
246                 srv.mounts = dedup
247         }
248         for _, srv := range bal.KeepServices {
249                 for _, mnt := range srv.mounts {
250                         if mnt.Replication <= 0 {
251                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
252                                 mnt.Replication = 1
253                         }
254                 }
255         }
256 }
257
258 // CheckSanityEarly checks for configuration and runtime errors that
259 // can be detected before GetCurrentState() and ComputeChangeSets()
260 // are called.
261 //
262 // If it returns an error, it is pointless to run GetCurrentState or
263 // ComputeChangeSets: after doing so, the statistics would be
264 // meaningless and it would be dangerous to run any Commit methods.
265 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
266         u, err := c.CurrentUser()
267         if err != nil {
268                 return fmt.Errorf("CurrentUser(): %v", err)
269         }
270         if !u.IsActive || !u.IsAdmin {
271                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
272         }
273         for _, srv := range bal.KeepServices {
274                 if srv.ServiceType == "proxy" {
275                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
276                 }
277         }
278         for _, c := range bal.ChunkPrefix {
279                 if !strings.ContainsRune("0123456789abcdef", c) {
280                         return fmt.Errorf("invalid char %q in chunk prefix %q: only lowercase hex digits make sense", string(c), bal.ChunkPrefix)
281                 }
282         }
283         if len(bal.ChunkPrefix) > 32 {
284                 return fmt.Errorf("invalid chunk prefix %q: longer than a block hash", bal.ChunkPrefix)
285         }
286
287         mountProblem := false
288         type deviceMount struct {
289                 srv *KeepService
290                 mnt *KeepMount
291         }
292         deviceMounted := map[string]deviceMount{} // DeviceID -> mount
293         for _, srv := range bal.KeepServices {
294                 for _, mnt := range srv.mounts {
295                         if first, dup := deviceMounted[mnt.DeviceID]; dup && first.mnt.UUID != mnt.UUID && mnt.DeviceID != "" {
296                                 bal.logf("config error: device %s is mounted with multiple volume UUIDs: %s on %s, and %s on %s",
297                                         mnt.DeviceID,
298                                         first.mnt.UUID, first.srv,
299                                         mnt.UUID, srv)
300                                 mountProblem = true
301                                 continue
302                         }
303                         deviceMounted[mnt.DeviceID] = deviceMount{srv, mnt}
304                 }
305         }
306         if mountProblem {
307                 return errors.New("cannot continue with config errors (see above)")
308         }
309
310         var checkPage arvados.CollectionList
311         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
312                 Limit:              new(int),
313                 Count:              "exact",
314                 IncludeTrash:       true,
315                 IncludeOldVersions: true,
316                 Filters: []arvados.Filter{{
317                         Attr:     "modified_at",
318                         Operator: "=",
319                         Operand:  nil,
320                 }},
321         }); err != nil {
322                 return err
323         } else if n := checkPage.ItemsAvailable; n > 0 {
324                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
325         }
326
327         return nil
328 }
329
330 // rendezvousState returns a fingerprint (e.g., a sorted list of
331 // UUID+host+port) of the current set of keep services.
332 func (bal *Balancer) rendezvousState() string {
333         srvs := make([]string, 0, len(bal.KeepServices))
334         for _, srv := range bal.KeepServices {
335                 srvs = append(srvs, srv.String())
336         }
337         sort.Strings(srvs)
338         return strings.Join(srvs, "; ")
339 }
340
341 // ClearTrashLists sends an empty trash list to each keep
342 // service. Calling this before GetCurrentState avoids races.
343 //
344 // When a block appears in an index, we assume that replica will still
345 // exist after we delete other replicas on other servers. However,
346 // it's possible that a previous rebalancing operation made different
347 // decisions (e.g., servers were added/removed, and rendezvous order
348 // changed). In this case, the replica might already be on that
349 // server's trash list, and it might be deleted before we send a
350 // replacement trash list.
351 //
352 // We avoid this problem if we clear all trash lists before getting
353 // indexes. (We also assume there is only one rebalancing process
354 // running at a time.)
355 func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
356         for _, srv := range bal.KeepServices {
357                 srv.ChangeSet = &ChangeSet{}
358         }
359         return bal.CommitTrash(ctx, c)
360 }
361
362 // GetCurrentState determines the current replication state, and the
363 // desired replication level, for every block that is either
364 // retrievable or referenced.
365 //
366 // It determines the current replication state by reading the block index
367 // from every known Keep service.
368 //
369 // It determines the desired replication level by retrieving all
370 // collection manifests in the database (API server).
371 //
372 // It encodes the resulting information in BlockStateMap.
373 func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
374         ctx, cancel := context.WithCancel(ctx)
375         defer cancel()
376
377         defer bal.time("get_state", "wall clock time to get current state")()
378         bal.BlockStateMap = NewBlockStateMap()
379
380         dd, err := c.DiscoveryDocument()
381         if err != nil {
382                 return err
383         }
384         bal.DefaultReplication = dd.DefaultCollectionReplication
385         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
386
387         errs := make(chan error, 1)
388         wg := sync.WaitGroup{}
389
390         // When a device is mounted more than once, we will get its
391         // index only once, and call AddReplicas on all of the mounts.
392         // equivMount keys are the mounts that will be indexed, and
393         // each value is a list of mounts to apply the received index
394         // to.
395         equivMount := map[*KeepMount][]*KeepMount{}
396         // deviceMount maps each device ID to the one mount that will
397         // be indexed for that device.
398         deviceMount := map[string]*KeepMount{}
399         for _, srv := range bal.KeepServices {
400                 for _, mnt := range srv.mounts {
401                         equiv := deviceMount[mnt.UUID]
402                         if equiv == nil {
403                                 equiv = mnt
404                                 deviceMount[mnt.UUID] = equiv
405                         }
406                         equivMount[equiv] = append(equivMount[equiv], mnt)
407                 }
408         }
409
410         // Start one goroutine for each (non-redundant) mount:
411         // retrieve the index, and add the returned blocks to
412         // BlockStateMap.
413         for _, mounts := range equivMount {
414                 wg.Add(1)
415                 go func(mounts []*KeepMount) {
416                         defer wg.Done()
417                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
418                         idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, bal.ChunkPrefix)
419                         if err != nil {
420                                 select {
421                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
422                                 default:
423                                 }
424                                 cancel()
425                                 return
426                         }
427                         if len(errs) > 0 {
428                                 // Some other goroutine encountered an
429                                 // error -- any further effort here
430                                 // will be wasted.
431                                 return
432                         }
433                         for _, mount := range mounts {
434                                 bal.logf("%s: add %d entries to map", mount, len(idx))
435                                 bal.BlockStateMap.AddReplicas(mount, idx)
436                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
437                         }
438                         bal.logf("mount %s: index done", mounts[0])
439                 }(mounts)
440         }
441
442         collQ := make(chan arvados.Collection, bufs)
443
444         // Retrieve all collections from the database and send them to
445         // collQ.
446         wg.Add(1)
447         go func() {
448                 defer wg.Done()
449                 err = EachCollection(ctx, bal.DB, c,
450                         func(coll arvados.Collection) error {
451                                 collQ <- coll
452                                 if len(errs) > 0 {
453                                         // some other GetCurrentState
454                                         // error happened: no point
455                                         // getting any more
456                                         // collections.
457                                         return fmt.Errorf("")
458                                 }
459                                 return nil
460                         }, func(done, total int) {
461                                 bal.logf("collections: %d/%d", done, total)
462                         })
463                 close(collQ)
464                 if err != nil {
465                         select {
466                         case errs <- err:
467                         default:
468                         }
469                         cancel()
470                 }
471         }()
472
473         // Parse manifests from collQ and pass the block hashes to
474         // BlockStateMap to track desired replication.
475         for i := 0; i < runtime.NumCPU(); i++ {
476                 wg.Add(1)
477                 go func() {
478                         defer wg.Done()
479                         for coll := range collQ {
480                                 err := bal.addCollection(coll)
481                                 if err != nil || len(errs) > 0 {
482                                         select {
483                                         case errs <- err:
484                                         default:
485                                         }
486                                         cancel()
487                                         continue
488                                 }
489                                 atomic.AddInt64(&bal.collScanned, 1)
490                         }
491                 }()
492         }
493
494         wg.Wait()
495         if len(errs) > 0 {
496                 return <-errs
497         }
498         return nil
499 }
500
501 func (bal *Balancer) addCollection(coll arvados.Collection) error {
502         blkids, err := coll.SizedDigests()
503         if err != nil {
504                 return fmt.Errorf("%v: %v", coll.UUID, err)
505         }
506         repl := bal.DefaultReplication
507         if coll.ReplicationDesired != nil {
508                 repl = *coll.ReplicationDesired
509         }
510         if bal.ChunkPrefix != "" {
511                 // Throw out blocks that don't match the requested
512                 // prefix.  (We save a bit of GC work here by
513                 // preallocating based on each hex digit in
514                 // ChunkPrefix reducing the expected size of the
515                 // filtered set by ~16x.)
516                 filtered := make([]arvados.SizedDigest, 0, len(blkids)>>(4*len(bal.ChunkPrefix)-1))
517                 for _, blkid := range blkids {
518                         if strings.HasPrefix(string(blkid), bal.ChunkPrefix) {
519                                 filtered = append(filtered, blkid)
520                         }
521                 }
522                 blkids = filtered
523         }
524         bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
525         // Pass pdh to IncreaseDesired only if LostBlocksFile is being
526         // written -- otherwise it's just a waste of memory.
527         pdh := ""
528         if bal.LostBlocksFile != "" {
529                 pdh = coll.PortableDataHash
530         }
531         bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
532         return nil
533 }
534
535 // ComputeChangeSets compares, for each known block, the current and
536 // desired replication states. If it is possible to get closer to the
537 // desired state by copying or deleting blocks, it adds those changes
538 // to the relevant KeepServices' ChangeSets.
539 //
540 // It does not actually apply any of the computed changes.
541 func (bal *Balancer) ComputeChangeSets() {
542         // This just calls balanceBlock() once for each block, using a
543         // pool of worker goroutines.
544         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
545         bal.setupLookupTables()
546
547         type balanceTask struct {
548                 blkid arvados.SizedDigest
549                 blk   *BlockState
550         }
551         workers := runtime.GOMAXPROCS(-1)
552         todo := make(chan balanceTask, workers)
553         go func() {
554                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
555                         todo <- balanceTask{
556                                 blkid: blkid,
557                                 blk:   blk,
558                         }
559                 })
560                 close(todo)
561         }()
562         results := make(chan balanceResult, workers)
563         go func() {
564                 var wg sync.WaitGroup
565                 for i := 0; i < workers; i++ {
566                         wg.Add(1)
567                         go func() {
568                                 for work := range todo {
569                                         results <- bal.balanceBlock(work.blkid, work.blk)
570                                 }
571                                 wg.Done()
572                         }()
573                 }
574                 wg.Wait()
575                 close(results)
576         }()
577         bal.collectStatistics(results)
578 }
579
580 func (bal *Balancer) setupLookupTables() {
581         bal.serviceRoots = make(map[string]string)
582         bal.classes = defaultClasses
583         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
584         bal.mounts = 0
585         for _, srv := range bal.KeepServices {
586                 bal.serviceRoots[srv.UUID] = srv.UUID
587                 for _, mnt := range srv.mounts {
588                         bal.mounts++
589
590                         // All mounts on a read-only service are
591                         // effectively read-only.
592                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
593
594                         for class := range mnt.StorageClasses {
595                                 if mbc := bal.mountsByClass[class]; mbc == nil {
596                                         bal.classes = append(bal.classes, class)
597                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
598                                 } else {
599                                         mbc[mnt] = true
600                                 }
601                         }
602                 }
603         }
604         // Consider classes in lexicographic order to avoid flapping
605         // between balancing runs.  The outcome of the "prefer a mount
606         // we're already planning to use for a different storage
607         // class" case in balanceBlock depends on the order classes
608         // are considered.
609         sort.Strings(bal.classes)
610 }
611
612 const (
613         changeStay = iota
614         changePull
615         changeTrash
616         changeNone
617 )
618
619 var changeName = map[int]string{
620         changeStay:  "stay",
621         changePull:  "pull",
622         changeTrash: "trash",
623         changeNone:  "none",
624 }
625
626 type balancedBlockState struct {
627         needed       int
628         unneeded     int
629         pulling      int
630         unachievable bool
631 }
632
633 type balanceResult struct {
634         blk        *BlockState
635         blkid      arvados.SizedDigest
636         lost       bool
637         blockState balancedBlockState
638         classState map[string]balancedBlockState
639 }
640
641 type slot struct {
642         mnt  *KeepMount // never nil
643         repl *Replica   // replica already stored here (or nil)
644         want bool       // we should pull/leave a replica here
645 }
646
647 // balanceBlock compares current state to desired state for a single
648 // block, and makes the appropriate ChangeSet calls.
649 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
650         bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
651
652         // Build a list of all slots (one per mounted volume).
653         slots := make([]slot, 0, bal.mounts)
654         for _, srv := range bal.KeepServices {
655                 for _, mnt := range srv.mounts {
656                         var repl *Replica
657                         for r := range blk.Replicas {
658                                 if blk.Replicas[r].KeepMount == mnt {
659                                         repl = &blk.Replicas[r]
660                                 }
661                         }
662                         // Initial value of "want" is "have, and can't
663                         // delete". These untrashable replicas get
664                         // prioritized when sorting slots: otherwise,
665                         // non-optimal readonly copies would cause us
666                         // to overreplicate.
667                         slots = append(slots, slot{
668                                 mnt:  mnt,
669                                 repl: repl,
670                                 want: repl != nil && mnt.ReadOnly,
671                         })
672                 }
673         }
674
675         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
676         srvRendezvous := make(map[*KeepService]int, len(uuids))
677         for i, uuid := range uuids {
678                 srv := bal.KeepServices[uuid]
679                 srvRendezvous[srv] = i
680         }
681
682         // Below we set underreplicated=true if we find any storage
683         // class that's currently underreplicated -- in that case we
684         // won't want to trash any replicas.
685         underreplicated := false
686
687         unsafeToDelete := make(map[int64]bool, len(slots))
688         for _, class := range bal.classes {
689                 desired := blk.Desired[class]
690                 if desired == 0 {
691                         continue
692                 }
693
694                 // Sort the slots by desirability.
695                 sort.Slice(slots, func(i, j int) bool {
696                         si, sj := slots[i], slots[j]
697                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
698                                 // Prefer a mount that satisfies the
699                                 // desired class.
700                                 return bal.mountsByClass[class][si.mnt]
701                         } else if si.want != sj.want {
702                                 // Prefer a mount that will have a
703                                 // replica no matter what we do here
704                                 // -- either because it already has an
705                                 // untrashable replica, or because we
706                                 // already need it to satisfy a
707                                 // different storage class.
708                                 return si.want
709                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
710                                 // Prefer a better rendezvous
711                                 // position.
712                                 return orderi < orderj
713                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
714                                 // Prefer a mount that already has a
715                                 // replica.
716                                 return repli
717                         } else {
718                                 // If pull/trash turns out to be
719                                 // needed, distribute the
720                                 // new/remaining replicas uniformly
721                                 // across qualifying mounts on a given
722                                 // server.
723                                 return rendezvousLess(si.mnt.UUID, sj.mnt.UUID, blkid)
724                         }
725                 })
726
727                 // Servers/mounts/devices (with or without existing
728                 // replicas) that are part of the best achievable
729                 // layout for this storage class.
730                 wantSrv := map[*KeepService]bool{}
731                 wantMnt := map[*KeepMount]bool{}
732                 wantDev := map[string]bool{}
733                 // Positions (with existing replicas) that have been
734                 // protected (via unsafeToDelete) to ensure we don't
735                 // reduce replication below desired level when
736                 // trashing replicas that aren't optimal positions for
737                 // any storage class.
738                 protMnt := map[*KeepMount]bool{}
739                 // Replication planned so far (corresponds to wantMnt).
740                 replWant := 0
741                 // Protected replication (corresponds to protMnt).
742                 replProt := 0
743
744                 // trySlot tries using a slot to meet requirements,
745                 // and returns true if all requirements are met.
746                 trySlot := func(i int) bool {
747                         slot := slots[i]
748                         if wantMnt[slot.mnt] || wantDev[slot.mnt.UUID] {
749                                 // Already allocated a replica to this
750                                 // backend device, possibly on a
751                                 // different server.
752                                 return false
753                         }
754                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
755                                 unsafeToDelete[slot.repl.Mtime] = true
756                                 protMnt[slot.mnt] = true
757                                 replProt += slot.mnt.Replication
758                         }
759                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
760                                 slots[i].want = true
761                                 wantSrv[slot.mnt.KeepService] = true
762                                 wantMnt[slot.mnt] = true
763                                 wantDev[slot.mnt.UUID] = true
764                                 replWant += slot.mnt.Replication
765                         }
766                         return replProt >= desired && replWant >= desired
767                 }
768
769                 // First try to achieve desired replication without
770                 // using the same server twice.
771                 done := false
772                 for i := 0; i < len(slots) && !done; i++ {
773                         if !wantSrv[slots[i].mnt.KeepService] {
774                                 done = trySlot(i)
775                         }
776                 }
777
778                 // If that didn't suffice, do another pass without the
779                 // "distinct services" restriction. (Achieving the
780                 // desired volume replication on fewer than the
781                 // desired number of services is better than
782                 // underreplicating.)
783                 for i := 0; i < len(slots) && !done; i++ {
784                         done = trySlot(i)
785                 }
786
787                 if !underreplicated {
788                         safe := 0
789                         for _, slot := range slots {
790                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
791                                         continue
792                                 }
793                                 if safe += slot.mnt.Replication; safe >= desired {
794                                         break
795                                 }
796                         }
797                         underreplicated = safe < desired
798                 }
799
800                 // Avoid deleting wanted replicas from devices that
801                 // are mounted on multiple servers -- even if they
802                 // haven't already been added to unsafeToDelete
803                 // because the servers report different Mtimes.
804                 for _, slot := range slots {
805                         if slot.repl != nil && wantDev[slot.mnt.UUID] {
806                                 unsafeToDelete[slot.repl.Mtime] = true
807                         }
808                 }
809         }
810
811         // TODO: If multiple replicas are trashable, prefer the oldest
812         // replica that doesn't have a timestamp collision with
813         // others.
814
815         for i, slot := range slots {
816                 // Don't trash (1) any replicas of an underreplicated
817                 // block, even if they're in the wrong positions, or
818                 // (2) any replicas whose Mtimes are identical to
819                 // needed replicas (in case we're really seeing the
820                 // same copy via different mounts).
821                 if slot.repl != nil && (underreplicated || unsafeToDelete[slot.repl.Mtime]) {
822                         slots[i].want = true
823                 }
824         }
825
826         classState := make(map[string]balancedBlockState, len(bal.classes))
827         for _, class := range bal.classes {
828                 classState[class] = computeBlockState(slots, bal.mountsByClass[class], len(blk.Replicas), blk.Desired[class])
829         }
830         blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
831
832         // Sort the slots by rendezvous order. This ensures "trash the
833         // first of N replicas with identical timestamps" is
834         // predictable (helpful for testing) and well distributed
835         // across servers.
836         sort.Slice(slots, func(i, j int) bool {
837                 si, sj := slots[i], slots[j]
838                 if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
839                         return orderi < orderj
840                 } else {
841                         return rendezvousLess(si.mnt.UUID, sj.mnt.UUID, blkid)
842                 }
843         })
844
845         var (
846                 lost         bool
847                 changes      []string
848                 trashedMtime = make(map[int64]bool, len(slots))
849         )
850         for _, slot := range slots {
851                 // TODO: request a Touch if Mtime is duplicated.
852                 var change int
853                 switch {
854                 case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
855                         if trashedMtime[slot.repl.Mtime] {
856                                 // Don't trash multiple replicas with
857                                 // identical timestamps. If they are
858                                 // multiple views of the same backing
859                                 // storage, asking both servers to
860                                 // trash is redundant and can cause
861                                 // races (see #20242). If they are
862                                 // distinct replicas that happen to
863                                 // have identical timestamps, we'll
864                                 // get this one on the next sweep.
865                                 change = changeNone
866                         } else {
867                                 slot.mnt.KeepService.AddTrash(Trash{
868                                         SizedDigest: blkid,
869                                         Mtime:       slot.repl.Mtime,
870                                         From:        slot.mnt,
871                                 })
872                                 change = changeTrash
873                                 trashedMtime[slot.repl.Mtime] = true
874                         }
875                 case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
876                         lost = true
877                         change = changeNone
878                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
879                         slot.mnt.KeepService.AddPull(Pull{
880                                 SizedDigest: blkid,
881                                 From:        blk.Replicas[0].KeepMount.KeepService,
882                                 To:          slot.mnt,
883                         })
884                         change = changePull
885                 case slot.repl != nil:
886                         change = changeStay
887                 default:
888                         change = changeNone
889                 }
890                 if bal.Dumper != nil {
891                         var mtime int64
892                         if slot.repl != nil {
893                                 mtime = slot.repl.Mtime
894                         }
895                         srv := slot.mnt.KeepService
896                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
897                 }
898         }
899         if bal.Dumper != nil {
900                 bal.Dumper.Printf("%s refs=%d needed=%d unneeded=%d pulling=%v %v %v", blkid, blk.RefCount, blockState.needed, blockState.unneeded, blockState.pulling, blk.Desired, changes)
901         }
902         return balanceResult{
903                 blk:        blk,
904                 blkid:      blkid,
905                 lost:       lost,
906                 blockState: blockState,
907                 classState: classState,
908         }
909 }
910
911 func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRepl int) (bbs balancedBlockState) {
912         repl := 0
913         countedDev := map[string]bool{}
914         for _, slot := range slots {
915                 if onlyCount != nil && !onlyCount[slot.mnt] {
916                         continue
917                 }
918                 if countedDev[slot.mnt.UUID] {
919                         continue
920                 }
921                 switch {
922                 case slot.repl != nil && slot.want:
923                         bbs.needed++
924                         repl += slot.mnt.Replication
925                 case slot.repl != nil && !slot.want:
926                         bbs.unneeded++
927                         repl += slot.mnt.Replication
928                 case slot.repl == nil && slot.want && have > 0:
929                         bbs.pulling++
930                         repl += slot.mnt.Replication
931                 }
932                 countedDev[slot.mnt.UUID] = true
933         }
934         if repl < needRepl {
935                 bbs.unachievable = true
936         }
937         return
938 }
939
940 type blocksNBytes struct {
941         replicas int
942         blocks   int
943         bytes    int64
944 }
945
946 func (bb blocksNBytes) String() string {
947         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
948 }
949
950 type replicationStats struct {
951         needed       blocksNBytes
952         unneeded     blocksNBytes
953         pulling      blocksNBytes
954         unachievable blocksNBytes
955 }
956
957 type balancerStats struct {
958         lost          blocksNBytes
959         overrep       blocksNBytes
960         unref         blocksNBytes
961         garbage       blocksNBytes
962         underrep      blocksNBytes
963         unachievable  blocksNBytes
964         justright     blocksNBytes
965         desired       blocksNBytes
966         current       blocksNBytes
967         pulls         int
968         trashes       int
969         replHistogram []int
970         classStats    map[string]replicationStats
971
972         // collectionBytes / collectionBlockBytes = deduplication ratio
973         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
974         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
975         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
976         collectionBlocks     int64 // number of blocks referenced by any collection
977 }
978
979 func (s *balancerStats) dedupByteRatio() float64 {
980         if s.collectionBlockBytes == 0 {
981                 return 0
982         }
983         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
984 }
985
986 func (s *balancerStats) dedupBlockRatio() float64 {
987         if s.collectionBlocks == 0 {
988                 return 0
989         }
990         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
991 }
992
993 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
994         var s balancerStats
995         s.replHistogram = make([]int, 2)
996         s.classStats = make(map[string]replicationStats, len(bal.classes))
997         for result := range results {
998                 bytes := result.blkid.Size()
999
1000                 if rc := int64(result.blk.RefCount); rc > 0 {
1001                         s.collectionBytes += rc * bytes
1002                         s.collectionBlockBytes += bytes
1003                         s.collectionBlockRefs += rc
1004                         s.collectionBlocks++
1005                 }
1006
1007                 for class, state := range result.classState {
1008                         cs := s.classStats[class]
1009                         if state.unachievable {
1010                                 cs.unachievable.replicas++
1011                                 cs.unachievable.blocks++
1012                                 cs.unachievable.bytes += bytes
1013                         }
1014                         if state.needed > 0 {
1015                                 cs.needed.replicas += state.needed
1016                                 cs.needed.blocks++
1017                                 cs.needed.bytes += bytes * int64(state.needed)
1018                         }
1019                         if state.unneeded > 0 {
1020                                 cs.unneeded.replicas += state.unneeded
1021                                 cs.unneeded.blocks++
1022                                 cs.unneeded.bytes += bytes * int64(state.unneeded)
1023                         }
1024                         if state.pulling > 0 {
1025                                 cs.pulling.replicas += state.pulling
1026                                 cs.pulling.blocks++
1027                                 cs.pulling.bytes += bytes * int64(state.pulling)
1028                         }
1029                         s.classStats[class] = cs
1030                 }
1031
1032                 bs := result.blockState
1033                 switch {
1034                 case result.lost:
1035                         s.lost.replicas++
1036                         s.lost.blocks++
1037                         s.lost.bytes += bytes
1038                         fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
1039                         for pdh := range result.blk.Refs {
1040                                 fmt.Fprintf(bal.lostBlocks, " %s", pdh)
1041                         }
1042                         fmt.Fprint(bal.lostBlocks, "\n")
1043                 case bs.pulling > 0:
1044                         s.underrep.replicas += bs.pulling
1045                         s.underrep.blocks++
1046                         s.underrep.bytes += bytes * int64(bs.pulling)
1047                 case bs.unachievable:
1048                         s.underrep.replicas++
1049                         s.underrep.blocks++
1050                         s.underrep.bytes += bytes
1051                 case bs.unneeded > 0 && bs.needed == 0:
1052                         // Count as "garbage" if all replicas are old
1053                         // enough to trash, otherwise count as
1054                         // "unref".
1055                         counter := &s.garbage
1056                         for _, r := range result.blk.Replicas {
1057                                 if r.Mtime >= bal.MinMtime {
1058                                         counter = &s.unref
1059                                         break
1060                                 }
1061                         }
1062                         counter.replicas += bs.unneeded
1063                         counter.blocks++
1064                         counter.bytes += bytes * int64(bs.unneeded)
1065                 case bs.unneeded > 0:
1066                         s.overrep.replicas += bs.unneeded
1067                         s.overrep.blocks++
1068                         s.overrep.bytes += bytes * int64(bs.unneeded)
1069                 default:
1070                         s.justright.replicas += bs.needed
1071                         s.justright.blocks++
1072                         s.justright.bytes += bytes * int64(bs.needed)
1073                 }
1074
1075                 if bs.needed > 0 {
1076                         s.desired.replicas += bs.needed
1077                         s.desired.blocks++
1078                         s.desired.bytes += bytes * int64(bs.needed)
1079                 }
1080                 if bs.needed+bs.unneeded > 0 {
1081                         s.current.replicas += bs.needed + bs.unneeded
1082                         s.current.blocks++
1083                         s.current.bytes += bytes * int64(bs.needed+bs.unneeded)
1084                 }
1085
1086                 for len(s.replHistogram) <= bs.needed+bs.unneeded {
1087                         s.replHistogram = append(s.replHistogram, 0)
1088                 }
1089                 s.replHistogram[bs.needed+bs.unneeded]++
1090         }
1091         for _, srv := range bal.KeepServices {
1092                 s.pulls += len(srv.ChangeSet.Pulls)
1093                 s.trashes += len(srv.ChangeSet.Trashes)
1094         }
1095         bal.stats = s
1096         bal.Metrics.UpdateStats(s)
1097 }
1098
1099 // PrintStatistics writes statistics about the computed changes to
1100 // bal.Logger. It should not be called until ComputeChangeSets has
1101 // finished.
1102 func (bal *Balancer) PrintStatistics() {
1103         bal.logf("===")
1104         bal.logf("%s lost (0=have<want)", bal.stats.lost)
1105         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
1106         bal.logf("%s just right (have=want)", bal.stats.justright)
1107         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
1108         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
1109         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
1110         for _, class := range bal.classes {
1111                 cs := bal.stats.classStats[class]
1112                 bal.logf("===")
1113                 bal.logf("storage class %q: %s needed", class, cs.needed)
1114                 bal.logf("storage class %q: %s unneeded", class, cs.unneeded)
1115                 bal.logf("storage class %q: %s pulling", class, cs.pulling)
1116                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
1117         }
1118         bal.logf("===")
1119         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
1120         bal.logf("%s total usage", bal.stats.current)
1121         bal.logf("===")
1122         for _, srv := range bal.KeepServices {
1123                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1124         }
1125         bal.logf("===")
1126         bal.printHistogram(60)
1127         bal.logf("===")
1128 }
1129
1130 func (bal *Balancer) printHistogram(hashColumns int) {
1131         bal.logf("Replication level distribution:")
1132         maxCount := 0
1133         for _, count := range bal.stats.replHistogram {
1134                 if maxCount < count {
1135                         maxCount = count
1136                 }
1137         }
1138         hashes := strings.Repeat("#", hashColumns)
1139         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1140         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1141         for repl, count := range bal.stats.replHistogram {
1142                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1143                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1144         }
1145 }
1146
1147 // CheckSanityLate checks for configuration and runtime errors after
1148 // GetCurrentState() and ComputeChangeSets() have finished.
1149 //
1150 // If it returns an error, it is dangerous to run any Commit methods.
1151 func (bal *Balancer) CheckSanityLate() error {
1152         if bal.errors != nil {
1153                 for _, err := range bal.errors {
1154                         bal.logf("deferred error: %v", err)
1155                 }
1156                 return fmt.Errorf("cannot proceed safely after deferred errors")
1157         }
1158
1159         if bal.collScanned == 0 {
1160                 return fmt.Errorf("received zero collections")
1161         }
1162
1163         anyDesired := false
1164         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1165                 for _, desired := range blk.Desired {
1166                         if desired > 0 {
1167                                 anyDesired = true
1168                                 break
1169                         }
1170                 }
1171         })
1172         if !anyDesired {
1173                 return fmt.Errorf("zero blocks have desired replication>0")
1174         }
1175
1176         if dr := bal.DefaultReplication; dr < 1 {
1177                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1178         }
1179
1180         // TODO: no two services have identical indexes
1181         // TODO: no collisions (same md5, different size)
1182         return nil
1183 }
1184
1185 // CommitPulls sends the computed lists of pull requests to the
1186 // keepstore servers. This has the effect of increasing replication of
1187 // existing blocks that are either underreplicated or poorly
1188 // distributed according to rendezvous hashing.
1189 func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
1190         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1191         return bal.commitAsync(c, "send pull list",
1192                 func(srv *KeepService) error {
1193                         return srv.CommitPulls(ctx, c)
1194                 })
1195 }
1196
1197 // CommitTrash sends the computed lists of trash requests to the
1198 // keepstore servers. This has the effect of deleting blocks that are
1199 // overreplicated or unreferenced.
1200 func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
1201         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1202         return bal.commitAsync(c, "send trash list",
1203                 func(srv *KeepService) error {
1204                         return srv.CommitTrash(ctx, c)
1205                 })
1206 }
1207
1208 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1209         errs := make(chan error)
1210         for _, srv := range bal.KeepServices {
1211                 go func(srv *KeepService) {
1212                         var err error
1213                         defer func() { errs <- err }()
1214                         label := fmt.Sprintf("%s: %v", srv, label)
1215                         err = f(srv)
1216                         if err != nil {
1217                                 err = fmt.Errorf("%s: %v", label, err)
1218                         }
1219                 }(srv)
1220         }
1221         var lastErr error
1222         for range bal.KeepServices {
1223                 if err := <-errs; err != nil {
1224                         bal.logf("%v", err)
1225                         lastErr = err
1226                 }
1227         }
1228         close(errs)
1229         return lastErr
1230 }
1231
1232 func (bal *Balancer) logf(f string, args ...interface{}) {
1233         if bal.Logger != nil {
1234                 bal.Logger.Printf(f, args...)
1235         }
1236 }
1237
1238 func (bal *Balancer) time(name, help string) func() {
1239         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1240         t0 := time.Now()
1241         bal.Logger.Printf("%s: start", name)
1242         return func() {
1243                 dur := time.Since(t0)
1244                 observer.Observe(dur.Seconds())
1245                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1246         }
1247 }
1248
1249 // Log current memory usage: once now, at least once every 10 minutes,
1250 // and when memory grows by 40% since the last log. Stop when ctx is
1251 // canceled.
1252 func (bal *Balancer) reportMemorySize(ctx context.Context) {
1253         buf, _ := os.ReadFile("/proc/self/smaps")
1254         m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
1255         var pagesize int64
1256         if len(m) == 2 {
1257                 pagesize, _ = strconv.ParseInt(string(m[1]), 10, 64)
1258                 pagesize <<= 10
1259         }
1260         if pagesize == 0 {
1261                 bal.logf("cannot log OS-reported memory size: failed to parse KernelPageSize from /proc/self/smaps")
1262         }
1263         osstats := func() string {
1264                 if pagesize == 0 {
1265                         return ""
1266                 }
1267                 buf, _ := os.ReadFile("/proc/self/statm")
1268                 fields := strings.Split(string(buf), " ")
1269                 if len(fields) < 2 {
1270                         return ""
1271                 }
1272                 virt, _ := strconv.ParseInt(fields[0], 10, 64)
1273                 virt *= pagesize
1274                 res, _ := strconv.ParseInt(fields[1], 10, 64)
1275                 res *= pagesize
1276                 if virt == 0 || res == 0 {
1277                         return ""
1278                 }
1279                 return fmt.Sprintf(" virt %d res %d", virt, res)
1280         }
1281
1282         var nextTime time.Time
1283         var nextMem uint64
1284         const maxInterval = time.Minute * 10
1285         const maxIncrease = 1.4
1286
1287         ticker := time.NewTicker(time.Second)
1288         defer ticker.Stop()
1289         var memstats runtime.MemStats
1290         for ctx.Err() == nil {
1291                 now := time.Now()
1292                 runtime.ReadMemStats(&memstats)
1293                 mem := memstats.StackInuse + memstats.HeapInuse
1294                 if now.After(nextTime) || mem >= nextMem {
1295                         bal.logf("heap %d stack %d heapalloc %d%s", memstats.HeapInuse, memstats.StackInuse, memstats.HeapAlloc, osstats())
1296                         nextMem = uint64(float64(mem) * maxIncrease)
1297                         nextTime = now.Add(maxInterval)
1298                 }
1299                 <-ticker.C
1300         }
1301 }
1302
1303 // Rendezvous hash sort function. Less efficient than sorting on
1304 // precomputed rendezvous hashes, but also rarely used.
1305 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1306         a := md5.Sum([]byte(string(blkid[:32]) + i))
1307         b := md5.Sum([]byte(string(blkid[:32]) + j))
1308         return bytes.Compare(a[:], b[:]) < 0
1309 }