Merge branch '16325-test-container-collection-properties'
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepbalance
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "log"
16         "math"
17         "os"
18         "regexp"
19         "runtime"
20         "sort"
21         "strconv"
22         "strings"
23         "sync"
24         "sync/atomic"
25         "syscall"
26         "time"
27
28         "git.arvados.org/arvados.git/lib/controller/dblock"
29         "git.arvados.org/arvados.git/sdk/go/arvados"
30         "git.arvados.org/arvados.git/sdk/go/keepclient"
31         "github.com/jmoiron/sqlx"
32         "github.com/sirupsen/logrus"
33 )
34
35 // Balancer compares the contents of keepstore servers with the
36 // collections stored in Arvados, and issues pull/trash requests
37 // needed to get (closer to) the optimal data layout.
38 //
39 // In the optimal data layout: every data block referenced by a
40 // collection is replicated at least as many times as desired by the
41 // collection; there are no unreferenced data blocks older than
42 // BlobSignatureTTL; and all N existing replicas of a given data block
43 // are in the N best positions in rendezvous probe order.
44 type Balancer struct {
45         DB      *sqlx.DB
46         Logger  logrus.FieldLogger
47         Dumper  logrus.FieldLogger
48         Metrics *metrics
49
50         ChunkPrefix    string
51         LostBlocksFile string
52
53         *BlockStateMap
54         KeepServices       map[string]*KeepService
55         DefaultReplication int
56         MinMtime           int64
57
58         classes       []string
59         mounts        int
60         mountsByClass map[string]map[*KeepMount]bool
61         collScanned   int64
62         serviceRoots  map[string]string
63         errors        []error
64         stats         balancerStats
65         mutex         sync.Mutex
66         lostBlocks    io.Writer
67 }
68
69 // Run performs a balance operation using the given config and
70 // runOptions, and returns RunOptions suitable for passing to a
71 // subsequent balance operation.
72 //
73 // Run should only be called once on a given Balancer object.
74 func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
75         nextRunOptions = runOptions
76
77         bal.logf("acquiring active lock")
78         if !dblock.KeepBalanceActive.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return bal.DB, nil }) {
79                 // context canceled
80                 return
81         }
82         defer dblock.KeepBalanceActive.Unlock()
83
84         defer bal.time("sweep", "wall clock time to run one full sweep")()
85
86         ctx, cancel := context.WithDeadline(ctx, time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
87         defer cancel()
88
89         go bal.reportMemorySize(ctx)
90
91         var lbFile *os.File
92         if bal.LostBlocksFile != "" {
93                 tmpfn := bal.LostBlocksFile + ".tmp"
94                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
95                 if err != nil {
96                         return
97                 }
98                 defer lbFile.Close()
99                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
100                 if err != nil {
101                         return
102                 }
103                 defer func() {
104                         // Remove the tempfile only if we didn't get
105                         // as far as successfully renaming it.
106                         if lbFile != nil {
107                                 os.Remove(tmpfn)
108                         }
109                 }()
110                 bal.lostBlocks = lbFile
111         } else {
112                 bal.lostBlocks = ioutil.Discard
113         }
114
115         err = bal.DiscoverKeepServices(client)
116         if err != nil {
117                 return
118         }
119
120         for _, srv := range bal.KeepServices {
121                 err = srv.discoverMounts(client)
122                 if err != nil {
123                         return
124                 }
125         }
126         bal.cleanupMounts()
127
128         if err = bal.CheckSanityEarly(client); err != nil {
129                 return
130         }
131
132         // On a big site, indexing and sending trash/pull lists can
133         // take much longer than the usual 5 minute client
134         // timeout. From here on, we rely on the context deadline
135         // instead, aborting the entire operation if any part takes
136         // too long.
137         client.Timeout = 0
138
139         rs := bal.rendezvousState()
140         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
141                 if runOptions.SafeRendezvousState != "" {
142                         bal.logf("notice: KeepServices list has changed since last run")
143                 }
144                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
145                 if err = bal.ClearTrashLists(ctx, client); err != nil {
146                         return
147                 }
148                 // The current rendezvous state becomes "safe" (i.e.,
149                 // OK to compute changes for that state without
150                 // clearing existing trash lists) only now, after we
151                 // succeed in clearing existing trash lists.
152                 nextRunOptions.SafeRendezvousState = rs
153         }
154
155         if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
156                 return
157         }
158         bal.ComputeChangeSets()
159         bal.PrintStatistics()
160         if err = bal.CheckSanityLate(); err != nil {
161                 return
162         }
163         if lbFile != nil {
164                 err = lbFile.Sync()
165                 if err != nil {
166                         return
167                 }
168                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
169                 if err != nil {
170                         return
171                 }
172                 lbFile = nil
173         }
174         if runOptions.CommitPulls {
175                 err = bal.CommitPulls(ctx, client)
176                 if err != nil {
177                         // Skip trash if we can't pull. (Too cautious?)
178                         return
179                 }
180         }
181         if runOptions.CommitTrash {
182                 err = bal.CommitTrash(ctx, client)
183                 if err != nil {
184                         return
185                 }
186         }
187         if runOptions.CommitConfirmedFields {
188                 err = bal.updateCollections(ctx, client, cluster)
189                 if err != nil {
190                         return
191                 }
192         }
193         return
194 }
195
196 // SetKeepServices sets the list of KeepServices to operate on.
197 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
198         bal.KeepServices = make(map[string]*KeepService)
199         for _, srv := range srvList.Items {
200                 bal.KeepServices[srv.UUID] = &KeepService{
201                         KeepService: srv,
202                         ChangeSet:   &ChangeSet{},
203                 }
204         }
205         return nil
206 }
207
208 // DiscoverKeepServices sets the list of KeepServices by calling the
209 // API to get a list of all services, and selecting the ones whose
210 // ServiceType is "disk"
211 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
212         bal.KeepServices = make(map[string]*KeepService)
213         return c.EachKeepService(func(srv arvados.KeepService) error {
214                 if srv.ServiceType == "disk" {
215                         bal.KeepServices[srv.UUID] = &KeepService{
216                                 KeepService: srv,
217                                 ChangeSet:   &ChangeSet{},
218                         }
219                 } else {
220                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
221                 }
222                 return nil
223         })
224 }
225
226 func (bal *Balancer) cleanupMounts() {
227         rwdev := map[string]*KeepService{}
228         for _, srv := range bal.KeepServices {
229                 for _, mnt := range srv.mounts {
230                         if !mnt.ReadOnly {
231                                 rwdev[mnt.UUID] = srv
232                         }
233                 }
234         }
235         // Drop the readonly mounts whose device is mounted RW
236         // elsewhere.
237         for _, srv := range bal.KeepServices {
238                 var dedup []*KeepMount
239                 for _, mnt := range srv.mounts {
240                         if mnt.ReadOnly && rwdev[mnt.UUID] != nil {
241                                 bal.logf("skipping srv %s readonly mount %q because same volume is mounted read-write on srv %s", srv, mnt.UUID, rwdev[mnt.UUID])
242                         } else {
243                                 dedup = append(dedup, mnt)
244                         }
245                 }
246                 srv.mounts = dedup
247         }
248         for _, srv := range bal.KeepServices {
249                 for _, mnt := range srv.mounts {
250                         if mnt.Replication <= 0 {
251                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
252                                 mnt.Replication = 1
253                         }
254                 }
255         }
256 }
257
258 // CheckSanityEarly checks for configuration and runtime errors that
259 // can be detected before GetCurrentState() and ComputeChangeSets()
260 // are called.
261 //
262 // If it returns an error, it is pointless to run GetCurrentState or
263 // ComputeChangeSets: after doing so, the statistics would be
264 // meaningless and it would be dangerous to run any Commit methods.
265 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
266         u, err := c.CurrentUser()
267         if err != nil {
268                 return fmt.Errorf("CurrentUser(): %v", err)
269         }
270         if !u.IsActive || !u.IsAdmin {
271                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
272         }
273         for _, srv := range bal.KeepServices {
274                 if srv.ServiceType == "proxy" {
275                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
276                 }
277         }
278         for _, c := range bal.ChunkPrefix {
279                 if !strings.ContainsRune("0123456789abcdef", c) {
280                         return fmt.Errorf("invalid char %q in chunk prefix %q: only lowercase hex digits make sense", string(c), bal.ChunkPrefix)
281                 }
282         }
283         if len(bal.ChunkPrefix) > 32 {
284                 return fmt.Errorf("invalid chunk prefix %q: longer than a block hash", bal.ChunkPrefix)
285         }
286
287         mountProblem := false
288         type deviceMount struct {
289                 srv *KeepService
290                 mnt *KeepMount
291         }
292         deviceMounted := map[string]deviceMount{} // DeviceID -> mount
293         for _, srv := range bal.KeepServices {
294                 for _, mnt := range srv.mounts {
295                         if first, dup := deviceMounted[mnt.DeviceID]; dup && first.mnt.UUID != mnt.UUID && mnt.DeviceID != "" {
296                                 bal.logf("config error: device %s is mounted with multiple volume UUIDs: %s on %s, and %s on %s",
297                                         mnt.DeviceID,
298                                         first.mnt.UUID, first.srv,
299                                         mnt.UUID, srv)
300                                 mountProblem = true
301                                 continue
302                         }
303                         deviceMounted[mnt.DeviceID] = deviceMount{srv, mnt}
304                 }
305         }
306         if mountProblem {
307                 return errors.New("cannot continue with config errors (see above)")
308         }
309
310         var checkPage arvados.CollectionList
311         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
312                 Limit:              new(int),
313                 Count:              "exact",
314                 IncludeTrash:       true,
315                 IncludeOldVersions: true,
316                 Filters: []arvados.Filter{{
317                         Attr:     "modified_at",
318                         Operator: "=",
319                         Operand:  nil,
320                 }},
321         }); err != nil {
322                 return err
323         } else if n := checkPage.ItemsAvailable; n > 0 {
324                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
325         }
326
327         return nil
328 }
329
330 // rendezvousState returns a fingerprint (e.g., a sorted list of
331 // UUID+host+port) of the current set of keep services.
332 func (bal *Balancer) rendezvousState() string {
333         srvs := make([]string, 0, len(bal.KeepServices))
334         for _, srv := range bal.KeepServices {
335                 srvs = append(srvs, srv.String())
336         }
337         sort.Strings(srvs)
338         return strings.Join(srvs, "; ")
339 }
340
341 // ClearTrashLists sends an empty trash list to each keep
342 // service. Calling this before GetCurrentState avoids races.
343 //
344 // When a block appears in an index, we assume that replica will still
345 // exist after we delete other replicas on other servers. However,
346 // it's possible that a previous rebalancing operation made different
347 // decisions (e.g., servers were added/removed, and rendezvous order
348 // changed). In this case, the replica might already be on that
349 // server's trash list, and it might be deleted before we send a
350 // replacement trash list.
351 //
352 // We avoid this problem if we clear all trash lists before getting
353 // indexes. (We also assume there is only one rebalancing process
354 // running at a time.)
355 func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
356         for _, srv := range bal.KeepServices {
357                 srv.ChangeSet = &ChangeSet{}
358         }
359         return bal.CommitTrash(ctx, c)
360 }
361
362 // GetCurrentState determines the current replication state, and the
363 // desired replication level, for every block that is either
364 // retrievable or referenced.
365 //
366 // It determines the current replication state by reading the block index
367 // from every known Keep service.
368 //
369 // It determines the desired replication level by retrieving all
370 // collection manifests in the database (API server).
371 //
372 // It encodes the resulting information in BlockStateMap.
373 func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
374         ctx, cancel := context.WithCancel(ctx)
375         defer cancel()
376
377         defer bal.time("get_state", "wall clock time to get current state")()
378         bal.BlockStateMap = NewBlockStateMap()
379
380         dd, err := c.DiscoveryDocument()
381         if err != nil {
382                 return err
383         }
384         bal.DefaultReplication = dd.DefaultCollectionReplication
385         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
386
387         errs := make(chan error, 1)
388         wg := sync.WaitGroup{}
389
390         // When a device is mounted more than once, we will get its
391         // index only once, and call AddReplicas on all of the mounts.
392         // equivMount keys are the mounts that will be indexed, and
393         // each value is a list of mounts to apply the received index
394         // to.
395         equivMount := map[*KeepMount][]*KeepMount{}
396         // deviceMount maps each device ID to the one mount that will
397         // be indexed for that device.
398         deviceMount := map[string]*KeepMount{}
399         for _, srv := range bal.KeepServices {
400                 for _, mnt := range srv.mounts {
401                         equiv := deviceMount[mnt.UUID]
402                         if equiv == nil {
403                                 equiv = mnt
404                                 deviceMount[mnt.UUID] = equiv
405                         }
406                         equivMount[equiv] = append(equivMount[equiv], mnt)
407                 }
408         }
409
410         // Start one goroutine for each (non-redundant) mount:
411         // retrieve the index, and add the returned blocks to
412         // BlockStateMap.
413         for _, mounts := range equivMount {
414                 wg.Add(1)
415                 go func(mounts []*KeepMount) {
416                         defer wg.Done()
417                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
418                         idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, bal.ChunkPrefix)
419                         if err != nil {
420                                 select {
421                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
422                                 default:
423                                 }
424                                 cancel()
425                                 return
426                         }
427                         if len(errs) > 0 {
428                                 // Some other goroutine encountered an
429                                 // error -- any further effort here
430                                 // will be wasted.
431                                 return
432                         }
433                         for _, mount := range mounts {
434                                 bal.logf("%s: add %d entries to map", mount, len(idx))
435                                 bal.BlockStateMap.AddReplicas(mount, idx)
436                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
437                         }
438                         bal.logf("mount %s: index done", mounts[0])
439                 }(mounts)
440         }
441
442         collQ := make(chan arvados.Collection, bufs)
443
444         // Retrieve all collections from the database and send them to
445         // collQ.
446         wg.Add(1)
447         go func() {
448                 defer wg.Done()
449                 err = EachCollection(ctx, bal.DB, c,
450                         func(coll arvados.Collection) error {
451                                 collQ <- coll
452                                 if len(errs) > 0 {
453                                         // some other GetCurrentState
454                                         // error happened: no point
455                                         // getting any more
456                                         // collections.
457                                         return fmt.Errorf("")
458                                 }
459                                 return nil
460                         }, func(done, total int) {
461                                 bal.logf("collections: %d/%d", done, total)
462                         })
463                 close(collQ)
464                 if err != nil {
465                         select {
466                         case errs <- err:
467                         default:
468                         }
469                         cancel()
470                 }
471         }()
472
473         // Parse manifests from collQ and pass the block hashes to
474         // BlockStateMap to track desired replication.
475         for i := 0; i < runtime.NumCPU(); i++ {
476                 wg.Add(1)
477                 go func() {
478                         defer wg.Done()
479                         for coll := range collQ {
480                                 err := bal.addCollection(coll)
481                                 if err != nil || len(errs) > 0 {
482                                         select {
483                                         case errs <- err:
484                                         default:
485                                         }
486                                         cancel()
487                                         continue
488                                 }
489                                 atomic.AddInt64(&bal.collScanned, 1)
490                         }
491                 }()
492         }
493
494         wg.Wait()
495         if len(errs) > 0 {
496                 return <-errs
497         }
498         return nil
499 }
500
501 func (bal *Balancer) addCollection(coll arvados.Collection) error {
502         blkids, err := coll.SizedDigests()
503         if err != nil {
504                 return fmt.Errorf("%v: %v", coll.UUID, err)
505         }
506         repl := bal.DefaultReplication
507         if coll.ReplicationDesired != nil {
508                 repl = *coll.ReplicationDesired
509         }
510         if bal.ChunkPrefix != "" {
511                 // Throw out blocks that don't match the requested
512                 // prefix.  (We save a bit of GC work here by
513                 // preallocating based on each hex digit in
514                 // ChunkPrefix reducing the expected size of the
515                 // filtered set by ~16x.)
516                 filtered := make([]arvados.SizedDigest, 0, len(blkids)>>(4*len(bal.ChunkPrefix)-1))
517                 for _, blkid := range blkids {
518                         if strings.HasPrefix(string(blkid), bal.ChunkPrefix) {
519                                 filtered = append(filtered, blkid)
520                         }
521                 }
522                 blkids = filtered
523         }
524         bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
525         // Pass pdh to IncreaseDesired only if LostBlocksFile is being
526         // written -- otherwise it's just a waste of memory.
527         pdh := ""
528         if bal.LostBlocksFile != "" {
529                 pdh = coll.PortableDataHash
530         }
531         bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
532         return nil
533 }
534
535 // ComputeChangeSets compares, for each known block, the current and
536 // desired replication states. If it is possible to get closer to the
537 // desired state by copying or deleting blocks, it adds those changes
538 // to the relevant KeepServices' ChangeSets.
539 //
540 // It does not actually apply any of the computed changes.
541 func (bal *Balancer) ComputeChangeSets() {
542         // This just calls balanceBlock() once for each block, using a
543         // pool of worker goroutines.
544         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
545         bal.setupLookupTables()
546
547         type balanceTask struct {
548                 blkid arvados.SizedDigest
549                 blk   *BlockState
550         }
551         workers := runtime.GOMAXPROCS(-1)
552         todo := make(chan balanceTask, workers)
553         go func() {
554                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
555                         todo <- balanceTask{
556                                 blkid: blkid,
557                                 blk:   blk,
558                         }
559                 })
560                 close(todo)
561         }()
562         results := make(chan balanceResult, workers)
563         go func() {
564                 var wg sync.WaitGroup
565                 for i := 0; i < workers; i++ {
566                         wg.Add(1)
567                         go func() {
568                                 for work := range todo {
569                                         results <- bal.balanceBlock(work.blkid, work.blk)
570                                 }
571                                 wg.Done()
572                         }()
573                 }
574                 wg.Wait()
575                 close(results)
576         }()
577         bal.collectStatistics(results)
578 }
579
580 func (bal *Balancer) setupLookupTables() {
581         bal.serviceRoots = make(map[string]string)
582         bal.classes = defaultClasses
583         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
584         bal.mounts = 0
585         for _, srv := range bal.KeepServices {
586                 bal.serviceRoots[srv.UUID] = srv.UUID
587                 for _, mnt := range srv.mounts {
588                         bal.mounts++
589
590                         // All mounts on a read-only service are
591                         // effectively read-only.
592                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
593
594                         for class := range mnt.StorageClasses {
595                                 if mbc := bal.mountsByClass[class]; mbc == nil {
596                                         bal.classes = append(bal.classes, class)
597                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
598                                 } else {
599                                         mbc[mnt] = true
600                                 }
601                         }
602                 }
603         }
604         // Consider classes in lexicographic order to avoid flapping
605         // between balancing runs.  The outcome of the "prefer a mount
606         // we're already planning to use for a different storage
607         // class" case in balanceBlock depends on the order classes
608         // are considered.
609         sort.Strings(bal.classes)
610 }
611
612 const (
613         changeStay = iota
614         changePull
615         changeTrash
616         changeNone
617 )
618
619 var changeName = map[int]string{
620         changeStay:  "stay",
621         changePull:  "pull",
622         changeTrash: "trash",
623         changeNone:  "none",
624 }
625
626 type balancedBlockState struct {
627         needed       int
628         unneeded     int
629         pulling      int
630         unachievable bool
631 }
632
633 type balanceResult struct {
634         blk        *BlockState
635         blkid      arvados.SizedDigest
636         lost       bool
637         blockState balancedBlockState
638         classState map[string]balancedBlockState
639 }
640
641 type slot struct {
642         mnt  *KeepMount // never nil
643         repl *Replica   // replica already stored here (or nil)
644         want bool       // we should pull/leave a replica here
645 }
646
647 // balanceBlock compares current state to desired state for a single
648 // block, and makes the appropriate ChangeSet calls.
649 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
650         bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
651
652         // Build a list of all slots (one per mounted volume).
653         slots := make([]slot, 0, bal.mounts)
654         for _, srv := range bal.KeepServices {
655                 for _, mnt := range srv.mounts {
656                         var repl *Replica
657                         for r := range blk.Replicas {
658                                 if blk.Replicas[r].KeepMount == mnt {
659                                         repl = &blk.Replicas[r]
660                                 }
661                         }
662                         // Initial value of "want" is "have, and can't
663                         // delete". These untrashable replicas get
664                         // prioritized when sorting slots: otherwise,
665                         // non-optimal readonly copies would cause us
666                         // to overreplicate.
667                         slots = append(slots, slot{
668                                 mnt:  mnt,
669                                 repl: repl,
670                                 want: repl != nil && mnt.ReadOnly,
671                         })
672                 }
673         }
674
675         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
676         srvRendezvous := make(map[*KeepService]int, len(uuids))
677         for i, uuid := range uuids {
678                 srv := bal.KeepServices[uuid]
679                 srvRendezvous[srv] = i
680         }
681
682         // Below we set underreplicated=true if we find any storage
683         // class that's currently underreplicated -- in that case we
684         // won't want to trash any replicas.
685         underreplicated := false
686
687         unsafeToDelete := make(map[int64]bool, len(slots))
688         for _, class := range bal.classes {
689                 desired := blk.Desired[class]
690                 if desired == 0 {
691                         continue
692                 }
693
694                 // Sort the slots by desirability.
695                 sort.Slice(slots, func(i, j int) bool {
696                         si, sj := slots[i], slots[j]
697                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
698                                 // Prefer a mount that satisfies the
699                                 // desired class.
700                                 return bal.mountsByClass[class][si.mnt]
701                         } else if si.want != sj.want {
702                                 // Prefer a mount that will have a
703                                 // replica no matter what we do here
704                                 // -- either because it already has an
705                                 // untrashable replica, or because we
706                                 // already need it to satisfy a
707                                 // different storage class.
708                                 return si.want
709                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
710                                 // Prefer a better rendezvous
711                                 // position.
712                                 return orderi < orderj
713                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
714                                 // Prefer a mount that already has a
715                                 // replica.
716                                 return repli
717                         } else {
718                                 // If pull/trash turns out to be
719                                 // needed, distribute the
720                                 // new/remaining replicas uniformly
721                                 // across qualifying mounts on a given
722                                 // server.
723                                 return rendezvousLess(si.mnt.UUID, sj.mnt.UUID, blkid)
724                         }
725                 })
726
727                 // Servers/mounts/devices (with or without existing
728                 // replicas) that are part of the best achievable
729                 // layout for this storage class.
730                 wantSrv := map[*KeepService]bool{}
731                 wantMnt := map[*KeepMount]bool{}
732                 wantDev := map[string]bool{}
733                 // Positions (with existing replicas) that have been
734                 // protected (via unsafeToDelete) to ensure we don't
735                 // reduce replication below desired level when
736                 // trashing replicas that aren't optimal positions for
737                 // any storage class.
738                 protMnt := map[*KeepMount]bool{}
739                 // Replication planned so far (corresponds to wantMnt).
740                 replWant := 0
741                 // Protected replication (corresponds to protMnt).
742                 replProt := 0
743
744                 // trySlot tries using a slot to meet requirements,
745                 // and returns true if all requirements are met.
746                 trySlot := func(i int) bool {
747                         slot := slots[i]
748                         if wantMnt[slot.mnt] || wantDev[slot.mnt.UUID] {
749                                 // Already allocated a replica to this
750                                 // backend device, possibly on a
751                                 // different server.
752                                 return false
753                         }
754                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
755                                 unsafeToDelete[slot.repl.Mtime] = true
756                                 protMnt[slot.mnt] = true
757                                 replProt += slot.mnt.Replication
758                         }
759                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
760                                 slots[i].want = true
761                                 wantSrv[slot.mnt.KeepService] = true
762                                 wantMnt[slot.mnt] = true
763                                 wantDev[slot.mnt.UUID] = true
764                                 replWant += slot.mnt.Replication
765                         }
766                         return replProt >= desired && replWant >= desired
767                 }
768
769                 // First try to achieve desired replication without
770                 // using the same server twice.
771                 done := false
772                 for i := 0; i < len(slots) && !done; i++ {
773                         if !wantSrv[slots[i].mnt.KeepService] {
774                                 done = trySlot(i)
775                         }
776                 }
777
778                 // If that didn't suffice, do another pass without the
779                 // "distinct services" restriction. (Achieving the
780                 // desired volume replication on fewer than the
781                 // desired number of services is better than
782                 // underreplicating.)
783                 for i := 0; i < len(slots) && !done; i++ {
784                         done = trySlot(i)
785                 }
786
787                 if !underreplicated {
788                         safe := 0
789                         for _, slot := range slots {
790                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
791                                         continue
792                                 }
793                                 if safe += slot.mnt.Replication; safe >= desired {
794                                         break
795                                 }
796                         }
797                         underreplicated = safe < desired
798                 }
799
800                 // Avoid deleting wanted replicas from devices that
801                 // are mounted on multiple servers -- even if they
802                 // haven't already been added to unsafeToDelete
803                 // because the servers report different Mtimes.
804                 for _, slot := range slots {
805                         if slot.repl != nil && wantDev[slot.mnt.UUID] {
806                                 unsafeToDelete[slot.repl.Mtime] = true
807                         }
808                 }
809         }
810
811         // TODO: If multiple replicas are trashable, prefer the oldest
812         // replica that doesn't have a timestamp collision with
813         // others.
814
815         for i, slot := range slots {
816                 // Don't trash (1) any replicas of an underreplicated
817                 // block, even if they're in the wrong positions, or
818                 // (2) any replicas whose Mtimes are identical to
819                 // needed replicas (in case we're really seeing the
820                 // same copy via different mounts).
821                 if slot.repl != nil && (underreplicated || unsafeToDelete[slot.repl.Mtime]) {
822                         slots[i].want = true
823                 }
824         }
825
826         classState := make(map[string]balancedBlockState, len(bal.classes))
827         for _, class := range bal.classes {
828                 classState[class] = computeBlockState(slots, bal.mountsByClass[class], len(blk.Replicas), blk.Desired[class])
829         }
830         blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
831
832         var lost bool
833         var changes []string
834         for _, slot := range slots {
835                 // TODO: request a Touch if Mtime is duplicated.
836                 var change int
837                 switch {
838                 case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
839                         slot.mnt.KeepService.AddTrash(Trash{
840                                 SizedDigest: blkid,
841                                 Mtime:       slot.repl.Mtime,
842                                 From:        slot.mnt,
843                         })
844                         change = changeTrash
845                 case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
846                         lost = true
847                         change = changeNone
848                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
849                         slot.mnt.KeepService.AddPull(Pull{
850                                 SizedDigest: blkid,
851                                 From:        blk.Replicas[0].KeepMount.KeepService,
852                                 To:          slot.mnt,
853                         })
854                         change = changePull
855                 case slot.repl != nil:
856                         change = changeStay
857                 default:
858                         change = changeNone
859                 }
860                 if bal.Dumper != nil {
861                         var mtime int64
862                         if slot.repl != nil {
863                                 mtime = slot.repl.Mtime
864                         }
865                         srv := slot.mnt.KeepService
866                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
867                 }
868         }
869         if bal.Dumper != nil {
870                 bal.Dumper.Printf("%s refs=%d needed=%d unneeded=%d pulling=%v %v %v", blkid, blk.RefCount, blockState.needed, blockState.unneeded, blockState.pulling, blk.Desired, changes)
871         }
872         return balanceResult{
873                 blk:        blk,
874                 blkid:      blkid,
875                 lost:       lost,
876                 blockState: blockState,
877                 classState: classState,
878         }
879 }
880
881 func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRepl int) (bbs balancedBlockState) {
882         repl := 0
883         countedDev := map[string]bool{}
884         for _, slot := range slots {
885                 if onlyCount != nil && !onlyCount[slot.mnt] {
886                         continue
887                 }
888                 if countedDev[slot.mnt.UUID] {
889                         continue
890                 }
891                 switch {
892                 case slot.repl != nil && slot.want:
893                         bbs.needed++
894                         repl += slot.mnt.Replication
895                 case slot.repl != nil && !slot.want:
896                         bbs.unneeded++
897                         repl += slot.mnt.Replication
898                 case slot.repl == nil && slot.want && have > 0:
899                         bbs.pulling++
900                         repl += slot.mnt.Replication
901                 }
902                 countedDev[slot.mnt.UUID] = true
903         }
904         if repl < needRepl {
905                 bbs.unachievable = true
906         }
907         return
908 }
909
910 type blocksNBytes struct {
911         replicas int
912         blocks   int
913         bytes    int64
914 }
915
916 func (bb blocksNBytes) String() string {
917         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
918 }
919
920 type replicationStats struct {
921         needed       blocksNBytes
922         unneeded     blocksNBytes
923         pulling      blocksNBytes
924         unachievable blocksNBytes
925 }
926
927 type balancerStats struct {
928         lost          blocksNBytes
929         overrep       blocksNBytes
930         unref         blocksNBytes
931         garbage       blocksNBytes
932         underrep      blocksNBytes
933         unachievable  blocksNBytes
934         justright     blocksNBytes
935         desired       blocksNBytes
936         current       blocksNBytes
937         pulls         int
938         trashes       int
939         replHistogram []int
940         classStats    map[string]replicationStats
941
942         // collectionBytes / collectionBlockBytes = deduplication ratio
943         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
944         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
945         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
946         collectionBlocks     int64 // number of blocks referenced by any collection
947 }
948
949 func (s *balancerStats) dedupByteRatio() float64 {
950         if s.collectionBlockBytes == 0 {
951                 return 0
952         }
953         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
954 }
955
956 func (s *balancerStats) dedupBlockRatio() float64 {
957         if s.collectionBlocks == 0 {
958                 return 0
959         }
960         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
961 }
962
963 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
964         var s balancerStats
965         s.replHistogram = make([]int, 2)
966         s.classStats = make(map[string]replicationStats, len(bal.classes))
967         for result := range results {
968                 bytes := result.blkid.Size()
969
970                 if rc := int64(result.blk.RefCount); rc > 0 {
971                         s.collectionBytes += rc * bytes
972                         s.collectionBlockBytes += bytes
973                         s.collectionBlockRefs += rc
974                         s.collectionBlocks++
975                 }
976
977                 for class, state := range result.classState {
978                         cs := s.classStats[class]
979                         if state.unachievable {
980                                 cs.unachievable.replicas++
981                                 cs.unachievable.blocks++
982                                 cs.unachievable.bytes += bytes
983                         }
984                         if state.needed > 0 {
985                                 cs.needed.replicas += state.needed
986                                 cs.needed.blocks++
987                                 cs.needed.bytes += bytes * int64(state.needed)
988                         }
989                         if state.unneeded > 0 {
990                                 cs.unneeded.replicas += state.unneeded
991                                 cs.unneeded.blocks++
992                                 cs.unneeded.bytes += bytes * int64(state.unneeded)
993                         }
994                         if state.pulling > 0 {
995                                 cs.pulling.replicas += state.pulling
996                                 cs.pulling.blocks++
997                                 cs.pulling.bytes += bytes * int64(state.pulling)
998                         }
999                         s.classStats[class] = cs
1000                 }
1001
1002                 bs := result.blockState
1003                 switch {
1004                 case result.lost:
1005                         s.lost.replicas++
1006                         s.lost.blocks++
1007                         s.lost.bytes += bytes
1008                         fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
1009                         for pdh := range result.blk.Refs {
1010                                 fmt.Fprintf(bal.lostBlocks, " %s", pdh)
1011                         }
1012                         fmt.Fprint(bal.lostBlocks, "\n")
1013                 case bs.pulling > 0:
1014                         s.underrep.replicas += bs.pulling
1015                         s.underrep.blocks++
1016                         s.underrep.bytes += bytes * int64(bs.pulling)
1017                 case bs.unachievable:
1018                         s.underrep.replicas++
1019                         s.underrep.blocks++
1020                         s.underrep.bytes += bytes
1021                 case bs.unneeded > 0 && bs.needed == 0:
1022                         // Count as "garbage" if all replicas are old
1023                         // enough to trash, otherwise count as
1024                         // "unref".
1025                         counter := &s.garbage
1026                         for _, r := range result.blk.Replicas {
1027                                 if r.Mtime >= bal.MinMtime {
1028                                         counter = &s.unref
1029                                         break
1030                                 }
1031                         }
1032                         counter.replicas += bs.unneeded
1033                         counter.blocks++
1034                         counter.bytes += bytes * int64(bs.unneeded)
1035                 case bs.unneeded > 0:
1036                         s.overrep.replicas += bs.unneeded
1037                         s.overrep.blocks++
1038                         s.overrep.bytes += bytes * int64(bs.unneeded)
1039                 default:
1040                         s.justright.replicas += bs.needed
1041                         s.justright.blocks++
1042                         s.justright.bytes += bytes * int64(bs.needed)
1043                 }
1044
1045                 if bs.needed > 0 {
1046                         s.desired.replicas += bs.needed
1047                         s.desired.blocks++
1048                         s.desired.bytes += bytes * int64(bs.needed)
1049                 }
1050                 if bs.needed+bs.unneeded > 0 {
1051                         s.current.replicas += bs.needed + bs.unneeded
1052                         s.current.blocks++
1053                         s.current.bytes += bytes * int64(bs.needed+bs.unneeded)
1054                 }
1055
1056                 for len(s.replHistogram) <= bs.needed+bs.unneeded {
1057                         s.replHistogram = append(s.replHistogram, 0)
1058                 }
1059                 s.replHistogram[bs.needed+bs.unneeded]++
1060         }
1061         for _, srv := range bal.KeepServices {
1062                 s.pulls += len(srv.ChangeSet.Pulls)
1063                 s.trashes += len(srv.ChangeSet.Trashes)
1064         }
1065         bal.stats = s
1066         bal.Metrics.UpdateStats(s)
1067 }
1068
1069 // PrintStatistics writes statistics about the computed changes to
1070 // bal.Logger. It should not be called until ComputeChangeSets has
1071 // finished.
1072 func (bal *Balancer) PrintStatistics() {
1073         bal.logf("===")
1074         bal.logf("%s lost (0=have<want)", bal.stats.lost)
1075         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
1076         bal.logf("%s just right (have=want)", bal.stats.justright)
1077         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
1078         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
1079         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
1080         for _, class := range bal.classes {
1081                 cs := bal.stats.classStats[class]
1082                 bal.logf("===")
1083                 bal.logf("storage class %q: %s needed", class, cs.needed)
1084                 bal.logf("storage class %q: %s unneeded", class, cs.unneeded)
1085                 bal.logf("storage class %q: %s pulling", class, cs.pulling)
1086                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
1087         }
1088         bal.logf("===")
1089         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
1090         bal.logf("%s total usage", bal.stats.current)
1091         bal.logf("===")
1092         for _, srv := range bal.KeepServices {
1093                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1094         }
1095         bal.logf("===")
1096         bal.printHistogram(60)
1097         bal.logf("===")
1098 }
1099
1100 func (bal *Balancer) printHistogram(hashColumns int) {
1101         bal.logf("Replication level distribution:")
1102         maxCount := 0
1103         for _, count := range bal.stats.replHistogram {
1104                 if maxCount < count {
1105                         maxCount = count
1106                 }
1107         }
1108         hashes := strings.Repeat("#", hashColumns)
1109         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1110         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1111         for repl, count := range bal.stats.replHistogram {
1112                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1113                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1114         }
1115 }
1116
1117 // CheckSanityLate checks for configuration and runtime errors after
1118 // GetCurrentState() and ComputeChangeSets() have finished.
1119 //
1120 // If it returns an error, it is dangerous to run any Commit methods.
1121 func (bal *Balancer) CheckSanityLate() error {
1122         if bal.errors != nil {
1123                 for _, err := range bal.errors {
1124                         bal.logf("deferred error: %v", err)
1125                 }
1126                 return fmt.Errorf("cannot proceed safely after deferred errors")
1127         }
1128
1129         if bal.collScanned == 0 {
1130                 return fmt.Errorf("received zero collections")
1131         }
1132
1133         anyDesired := false
1134         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1135                 for _, desired := range blk.Desired {
1136                         if desired > 0 {
1137                                 anyDesired = true
1138                                 break
1139                         }
1140                 }
1141         })
1142         if !anyDesired {
1143                 return fmt.Errorf("zero blocks have desired replication>0")
1144         }
1145
1146         if dr := bal.DefaultReplication; dr < 1 {
1147                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1148         }
1149
1150         // TODO: no two services have identical indexes
1151         // TODO: no collisions (same md5, different size)
1152         return nil
1153 }
1154
1155 // CommitPulls sends the computed lists of pull requests to the
1156 // keepstore servers. This has the effect of increasing replication of
1157 // existing blocks that are either underreplicated or poorly
1158 // distributed according to rendezvous hashing.
1159 func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
1160         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1161         return bal.commitAsync(c, "send pull list",
1162                 func(srv *KeepService) error {
1163                         return srv.CommitPulls(ctx, c)
1164                 })
1165 }
1166
1167 // CommitTrash sends the computed lists of trash requests to the
1168 // keepstore servers. This has the effect of deleting blocks that are
1169 // overreplicated or unreferenced.
1170 func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
1171         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1172         return bal.commitAsync(c, "send trash list",
1173                 func(srv *KeepService) error {
1174                         return srv.CommitTrash(ctx, c)
1175                 })
1176 }
1177
1178 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1179         errs := make(chan error)
1180         for _, srv := range bal.KeepServices {
1181                 go func(srv *KeepService) {
1182                         var err error
1183                         defer func() { errs <- err }()
1184                         label := fmt.Sprintf("%s: %v", srv, label)
1185                         err = f(srv)
1186                         if err != nil {
1187                                 err = fmt.Errorf("%s: %v", label, err)
1188                         }
1189                 }(srv)
1190         }
1191         var lastErr error
1192         for range bal.KeepServices {
1193                 if err := <-errs; err != nil {
1194                         bal.logf("%v", err)
1195                         lastErr = err
1196                 }
1197         }
1198         close(errs)
1199         return lastErr
1200 }
1201
1202 func (bal *Balancer) logf(f string, args ...interface{}) {
1203         if bal.Logger != nil {
1204                 bal.Logger.Printf(f, args...)
1205         }
1206 }
1207
1208 func (bal *Balancer) time(name, help string) func() {
1209         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1210         t0 := time.Now()
1211         bal.Logger.Printf("%s: start", name)
1212         return func() {
1213                 dur := time.Since(t0)
1214                 observer.Observe(dur.Seconds())
1215                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1216         }
1217 }
1218
1219 // Log current memory usage: once now, at least once every 10 minutes,
1220 // and when memory grows by 40% since the last log. Stop when ctx is
1221 // canceled.
1222 func (bal *Balancer) reportMemorySize(ctx context.Context) {
1223         buf, _ := os.ReadFile("/proc/self/smaps")
1224         m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
1225         var pagesize int64
1226         if len(m) == 2 {
1227                 pagesize, _ = strconv.ParseInt(string(m[1]), 10, 64)
1228                 pagesize <<= 10
1229         }
1230         if pagesize == 0 {
1231                 bal.logf("cannot log OS-reported memory size: failed to parse KernelPageSize from /proc/self/smaps")
1232         }
1233         osstats := func() string {
1234                 if pagesize == 0 {
1235                         return ""
1236                 }
1237                 buf, _ := os.ReadFile("/proc/self/statm")
1238                 fields := strings.Split(string(buf), " ")
1239                 if len(fields) < 2 {
1240                         return ""
1241                 }
1242                 virt, _ := strconv.ParseInt(fields[0], 10, 64)
1243                 virt *= pagesize
1244                 res, _ := strconv.ParseInt(fields[1], 10, 64)
1245                 res *= pagesize
1246                 if virt == 0 || res == 0 {
1247                         return ""
1248                 }
1249                 return fmt.Sprintf(" virt %d res %d", virt, res)
1250         }
1251
1252         var nextTime time.Time
1253         var nextMem uint64
1254         const maxInterval = time.Minute * 10
1255         const maxIncrease = 1.4
1256
1257         ticker := time.NewTicker(time.Second)
1258         defer ticker.Stop()
1259         var memstats runtime.MemStats
1260         for ctx.Err() == nil {
1261                 now := time.Now()
1262                 runtime.ReadMemStats(&memstats)
1263                 mem := memstats.StackInuse + memstats.HeapInuse
1264                 if now.After(nextTime) || mem >= nextMem {
1265                         bal.logf("heap %d stack %d heapalloc %d%s", memstats.HeapInuse, memstats.StackInuse, memstats.HeapAlloc, osstats())
1266                         nextMem = uint64(float64(mem) * maxIncrease)
1267                         nextTime = now.Add(maxInterval)
1268                 }
1269                 <-ticker.C
1270         }
1271 }
1272
1273 // Rendezvous hash sort function. Less efficient than sorting on
1274 // precomputed rendezvous hashes, but also rarely used.
1275 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1276         a := md5.Sum([]byte(string(blkid[:32]) + i))
1277         b := md5.Sum([]byte(string(blkid[:32]) + j))
1278         return bytes.Compare(a[:], b[:]) < 0
1279 }