19923: Log keep-balance memory usage.
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepbalance
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "log"
16         "math"
17         "os"
18         "regexp"
19         "runtime"
20         "sort"
21         "strconv"
22         "strings"
23         "sync"
24         "sync/atomic"
25         "syscall"
26         "time"
27
28         "git.arvados.org/arvados.git/lib/controller/dblock"
29         "git.arvados.org/arvados.git/sdk/go/arvados"
30         "git.arvados.org/arvados.git/sdk/go/keepclient"
31         "github.com/jmoiron/sqlx"
32         "github.com/sirupsen/logrus"
33 )
34
35 // Balancer compares the contents of keepstore servers with the
36 // collections stored in Arvados, and issues pull/trash requests
37 // needed to get (closer to) the optimal data layout.
38 //
39 // In the optimal data layout: every data block referenced by a
40 // collection is replicated at least as many times as desired by the
41 // collection; there are no unreferenced data blocks older than
42 // BlobSignatureTTL; and all N existing replicas of a given data block
43 // are in the N best positions in rendezvous probe order.
44 type Balancer struct {
45         DB      *sqlx.DB
46         Logger  logrus.FieldLogger
47         Dumper  logrus.FieldLogger
48         Metrics *metrics
49
50         ChunkPrefix    string
51         LostBlocksFile string
52
53         *BlockStateMap
54         KeepServices       map[string]*KeepService
55         DefaultReplication int
56         MinMtime           int64
57
58         classes       []string
59         mounts        int
60         mountsByClass map[string]map[*KeepMount]bool
61         collScanned   int64
62         serviceRoots  map[string]string
63         errors        []error
64         stats         balancerStats
65         mutex         sync.Mutex
66         lostBlocks    io.Writer
67 }
68
69 // Run performs a balance operation using the given config and
70 // runOptions, and returns RunOptions suitable for passing to a
71 // subsequent balance operation.
72 //
73 // Run should only be called once on a given Balancer object.
74 func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
75         nextRunOptions = runOptions
76
77         bal.logf("acquiring active lock")
78         if !dblock.KeepBalanceActive.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return bal.DB, nil }) {
79                 // context canceled
80                 return
81         }
82         defer dblock.KeepBalanceActive.Unlock()
83
84         defer bal.time("sweep", "wall clock time to run one full sweep")()
85
86         ctx, cancel := context.WithDeadline(ctx, time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
87         defer cancel()
88
89         go bal.reportMemorySize(ctx)
90
91         var lbFile *os.File
92         if bal.LostBlocksFile != "" {
93                 tmpfn := bal.LostBlocksFile + ".tmp"
94                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
95                 if err != nil {
96                         return
97                 }
98                 defer lbFile.Close()
99                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
100                 if err != nil {
101                         return
102                 }
103                 defer func() {
104                         // Remove the tempfile only if we didn't get
105                         // as far as successfully renaming it.
106                         if lbFile != nil {
107                                 os.Remove(tmpfn)
108                         }
109                 }()
110                 bal.lostBlocks = lbFile
111         } else {
112                 bal.lostBlocks = ioutil.Discard
113         }
114
115         err = bal.DiscoverKeepServices(client)
116         if err != nil {
117                 return
118         }
119
120         for _, srv := range bal.KeepServices {
121                 err = srv.discoverMounts(client)
122                 if err != nil {
123                         return
124                 }
125         }
126         bal.cleanupMounts()
127
128         if err = bal.CheckSanityEarly(client); err != nil {
129                 return
130         }
131
132         // On a big site, indexing and sending trash/pull lists can
133         // take much longer than the usual 5 minute client
134         // timeout. From here on, we rely on the context deadline
135         // instead, aborting the entire operation if any part takes
136         // too long.
137         client.Timeout = 0
138
139         rs := bal.rendezvousState()
140         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
141                 if runOptions.SafeRendezvousState != "" {
142                         bal.logf("notice: KeepServices list has changed since last run")
143                 }
144                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
145                 if err = bal.ClearTrashLists(ctx, client); err != nil {
146                         return
147                 }
148                 // The current rendezvous state becomes "safe" (i.e.,
149                 // OK to compute changes for that state without
150                 // clearing existing trash lists) only now, after we
151                 // succeed in clearing existing trash lists.
152                 nextRunOptions.SafeRendezvousState = rs
153         }
154
155         if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
156                 return
157         }
158         bal.ComputeChangeSets()
159         time.Sleep(time.Second)
160         bal.PrintStatistics()
161         if err = bal.CheckSanityLate(); err != nil {
162                 return
163         }
164         if lbFile != nil {
165                 err = lbFile.Sync()
166                 if err != nil {
167                         return
168                 }
169                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
170                 if err != nil {
171                         return
172                 }
173                 lbFile = nil
174         }
175         if runOptions.CommitPulls {
176                 err = bal.CommitPulls(ctx, client)
177                 if err != nil {
178                         // Skip trash if we can't pull. (Too cautious?)
179                         return
180                 }
181         }
182         if runOptions.CommitTrash {
183                 err = bal.CommitTrash(ctx, client)
184                 if err != nil {
185                         return
186                 }
187         }
188         if runOptions.CommitConfirmedFields {
189                 err = bal.updateCollections(ctx, client, cluster)
190                 if err != nil {
191                         return
192                 }
193         }
194         return
195 }
196
197 // SetKeepServices sets the list of KeepServices to operate on.
198 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
199         bal.KeepServices = make(map[string]*KeepService)
200         for _, srv := range srvList.Items {
201                 bal.KeepServices[srv.UUID] = &KeepService{
202                         KeepService: srv,
203                         ChangeSet:   &ChangeSet{},
204                 }
205         }
206         return nil
207 }
208
209 // DiscoverKeepServices sets the list of KeepServices by calling the
210 // API to get a list of all services, and selecting the ones whose
211 // ServiceType is "disk"
212 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
213         bal.KeepServices = make(map[string]*KeepService)
214         return c.EachKeepService(func(srv arvados.KeepService) error {
215                 if srv.ServiceType == "disk" {
216                         bal.KeepServices[srv.UUID] = &KeepService{
217                                 KeepService: srv,
218                                 ChangeSet:   &ChangeSet{},
219                         }
220                 } else {
221                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
222                 }
223                 return nil
224         })
225 }
226
227 func (bal *Balancer) cleanupMounts() {
228         rwdev := map[string]*KeepService{}
229         for _, srv := range bal.KeepServices {
230                 for _, mnt := range srv.mounts {
231                         if !mnt.ReadOnly {
232                                 rwdev[mnt.UUID] = srv
233                         }
234                 }
235         }
236         // Drop the readonly mounts whose device is mounted RW
237         // elsewhere.
238         for _, srv := range bal.KeepServices {
239                 var dedup []*KeepMount
240                 for _, mnt := range srv.mounts {
241                         if mnt.ReadOnly && rwdev[mnt.UUID] != nil {
242                                 bal.logf("skipping srv %s readonly mount %q because same volume is mounted read-write on srv %s", srv, mnt.UUID, rwdev[mnt.UUID])
243                         } else {
244                                 dedup = append(dedup, mnt)
245                         }
246                 }
247                 srv.mounts = dedup
248         }
249         for _, srv := range bal.KeepServices {
250                 for _, mnt := range srv.mounts {
251                         if mnt.Replication <= 0 {
252                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
253                                 mnt.Replication = 1
254                         }
255                 }
256         }
257 }
258
259 // CheckSanityEarly checks for configuration and runtime errors that
260 // can be detected before GetCurrentState() and ComputeChangeSets()
261 // are called.
262 //
263 // If it returns an error, it is pointless to run GetCurrentState or
264 // ComputeChangeSets: after doing so, the statistics would be
265 // meaningless and it would be dangerous to run any Commit methods.
266 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
267         u, err := c.CurrentUser()
268         if err != nil {
269                 return fmt.Errorf("CurrentUser(): %v", err)
270         }
271         if !u.IsActive || !u.IsAdmin {
272                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
273         }
274         for _, srv := range bal.KeepServices {
275                 if srv.ServiceType == "proxy" {
276                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
277                 }
278         }
279
280         mountProblem := false
281         type deviceMount struct {
282                 srv *KeepService
283                 mnt *KeepMount
284         }
285         deviceMounted := map[string]deviceMount{} // DeviceID -> mount
286         for _, srv := range bal.KeepServices {
287                 for _, mnt := range srv.mounts {
288                         if first, dup := deviceMounted[mnt.DeviceID]; dup && first.mnt.UUID != mnt.UUID && mnt.DeviceID != "" {
289                                 bal.logf("config error: device %s is mounted with multiple volume UUIDs: %s on %s, and %s on %s",
290                                         mnt.DeviceID,
291                                         first.mnt.UUID, first.srv,
292                                         mnt.UUID, srv)
293                                 mountProblem = true
294                                 continue
295                         }
296                         deviceMounted[mnt.DeviceID] = deviceMount{srv, mnt}
297                 }
298         }
299         if mountProblem {
300                 return errors.New("cannot continue with config errors (see above)")
301         }
302
303         var checkPage arvados.CollectionList
304         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
305                 Limit:              new(int),
306                 Count:              "exact",
307                 IncludeTrash:       true,
308                 IncludeOldVersions: true,
309                 Filters: []arvados.Filter{{
310                         Attr:     "modified_at",
311                         Operator: "=",
312                         Operand:  nil,
313                 }},
314         }); err != nil {
315                 return err
316         } else if n := checkPage.ItemsAvailable; n > 0 {
317                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
318         }
319
320         return nil
321 }
322
323 // rendezvousState returns a fingerprint (e.g., a sorted list of
324 // UUID+host+port) of the current set of keep services.
325 func (bal *Balancer) rendezvousState() string {
326         srvs := make([]string, 0, len(bal.KeepServices))
327         for _, srv := range bal.KeepServices {
328                 srvs = append(srvs, srv.String())
329         }
330         sort.Strings(srvs)
331         return strings.Join(srvs, "; ")
332 }
333
334 // ClearTrashLists sends an empty trash list to each keep
335 // service. Calling this before GetCurrentState avoids races.
336 //
337 // When a block appears in an index, we assume that replica will still
338 // exist after we delete other replicas on other servers. However,
339 // it's possible that a previous rebalancing operation made different
340 // decisions (e.g., servers were added/removed, and rendezvous order
341 // changed). In this case, the replica might already be on that
342 // server's trash list, and it might be deleted before we send a
343 // replacement trash list.
344 //
345 // We avoid this problem if we clear all trash lists before getting
346 // indexes. (We also assume there is only one rebalancing process
347 // running at a time.)
348 func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
349         for _, srv := range bal.KeepServices {
350                 srv.ChangeSet = &ChangeSet{}
351         }
352         return bal.CommitTrash(ctx, c)
353 }
354
355 // GetCurrentState determines the current replication state, and the
356 // desired replication level, for every block that is either
357 // retrievable or referenced.
358 //
359 // It determines the current replication state by reading the block index
360 // from every known Keep service.
361 //
362 // It determines the desired replication level by retrieving all
363 // collection manifests in the database (API server).
364 //
365 // It encodes the resulting information in BlockStateMap.
366 func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
367         ctx, cancel := context.WithCancel(ctx)
368         defer cancel()
369
370         defer bal.time("get_state", "wall clock time to get current state")()
371         bal.BlockStateMap = NewBlockStateMap()
372
373         dd, err := c.DiscoveryDocument()
374         if err != nil {
375                 return err
376         }
377         bal.DefaultReplication = dd.DefaultCollectionReplication
378         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
379
380         errs := make(chan error, 1)
381         wg := sync.WaitGroup{}
382
383         // When a device is mounted more than once, we will get its
384         // index only once, and call AddReplicas on all of the mounts.
385         // equivMount keys are the mounts that will be indexed, and
386         // each value is a list of mounts to apply the received index
387         // to.
388         equivMount := map[*KeepMount][]*KeepMount{}
389         // deviceMount maps each device ID to the one mount that will
390         // be indexed for that device.
391         deviceMount := map[string]*KeepMount{}
392         for _, srv := range bal.KeepServices {
393                 for _, mnt := range srv.mounts {
394                         equiv := deviceMount[mnt.UUID]
395                         if equiv == nil {
396                                 equiv = mnt
397                                 deviceMount[mnt.UUID] = equiv
398                         }
399                         equivMount[equiv] = append(equivMount[equiv], mnt)
400                 }
401         }
402
403         // Start one goroutine for each (non-redundant) mount:
404         // retrieve the index, and add the returned blocks to
405         // BlockStateMap.
406         for _, mounts := range equivMount {
407                 wg.Add(1)
408                 go func(mounts []*KeepMount) {
409                         defer wg.Done()
410                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
411                         idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, bal.ChunkPrefix)
412                         if err != nil {
413                                 select {
414                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
415                                 default:
416                                 }
417                                 cancel()
418                                 return
419                         }
420                         if len(errs) > 0 {
421                                 // Some other goroutine encountered an
422                                 // error -- any further effort here
423                                 // will be wasted.
424                                 return
425                         }
426                         for _, mount := range mounts {
427                                 bal.logf("%s: add %d entries to map", mount, len(idx))
428                                 bal.BlockStateMap.AddReplicas(mount, idx)
429                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
430                         }
431                         bal.logf("mount %s: index done", mounts[0])
432                 }(mounts)
433         }
434
435         collQ := make(chan arvados.Collection, bufs)
436
437         // Retrieve all collections from the database and send them to
438         // collQ.
439         wg.Add(1)
440         go func() {
441                 defer wg.Done()
442                 err = EachCollection(ctx, bal.DB, c,
443                         func(coll arvados.Collection) error {
444                                 collQ <- coll
445                                 if len(errs) > 0 {
446                                         // some other GetCurrentState
447                                         // error happened: no point
448                                         // getting any more
449                                         // collections.
450                                         return fmt.Errorf("")
451                                 }
452                                 return nil
453                         }, func(done, total int) {
454                                 bal.logf("collections: %d/%d", done, total)
455                         })
456                 close(collQ)
457                 if err != nil {
458                         select {
459                         case errs <- err:
460                         default:
461                         }
462                         cancel()
463                 }
464         }()
465
466         // Parse manifests from collQ and pass the block hashes to
467         // BlockStateMap to track desired replication.
468         for i := 0; i < runtime.NumCPU(); i++ {
469                 wg.Add(1)
470                 go func() {
471                         defer wg.Done()
472                         for coll := range collQ {
473                                 err := bal.addCollection(coll)
474                                 if err != nil || len(errs) > 0 {
475                                         select {
476                                         case errs <- err:
477                                         default:
478                                         }
479                                         cancel()
480                                         continue
481                                 }
482                                 atomic.AddInt64(&bal.collScanned, 1)
483                         }
484                 }()
485         }
486
487         wg.Wait()
488         if len(errs) > 0 {
489                 return <-errs
490         }
491         return nil
492 }
493
494 func (bal *Balancer) addCollection(coll arvados.Collection) error {
495         blkids, err := coll.SizedDigests()
496         if err != nil {
497                 return fmt.Errorf("%v: %v", coll.UUID, err)
498         }
499         repl := bal.DefaultReplication
500         if coll.ReplicationDesired != nil {
501                 repl = *coll.ReplicationDesired
502         }
503         if bal.ChunkPrefix != "" {
504                 // Throw out blocks that don't match the requested
505                 // prefix.  (We save a bit of GC work here by
506                 // preallocating based on each hex digit in
507                 // ChunkPrefix reducing the expected size of the
508                 // filtered set by ~16x.)
509                 filtered := make([]arvados.SizedDigest, 0, len(blkids)>>(4*len(bal.ChunkPrefix)-1))
510                 for _, blkid := range blkids {
511                         if strings.HasPrefix(string(blkid), bal.ChunkPrefix) {
512                                 filtered = append(filtered, blkid)
513                         }
514                 }
515                 blkids = filtered
516         }
517         bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
518         // Pass pdh to IncreaseDesired only if LostBlocksFile is being
519         // written -- otherwise it's just a waste of memory.
520         pdh := ""
521         if bal.LostBlocksFile != "" {
522                 pdh = coll.PortableDataHash
523         }
524         bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
525         return nil
526 }
527
528 // ComputeChangeSets compares, for each known block, the current and
529 // desired replication states. If it is possible to get closer to the
530 // desired state by copying or deleting blocks, it adds those changes
531 // to the relevant KeepServices' ChangeSets.
532 //
533 // It does not actually apply any of the computed changes.
534 func (bal *Balancer) ComputeChangeSets() {
535         // This just calls balanceBlock() once for each block, using a
536         // pool of worker goroutines.
537         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
538         bal.setupLookupTables()
539
540         type balanceTask struct {
541                 blkid arvados.SizedDigest
542                 blk   *BlockState
543         }
544         workers := runtime.GOMAXPROCS(-1)
545         todo := make(chan balanceTask, workers)
546         go func() {
547                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
548                         todo <- balanceTask{
549                                 blkid: blkid,
550                                 blk:   blk,
551                         }
552                 })
553                 close(todo)
554         }()
555         results := make(chan balanceResult, workers)
556         go func() {
557                 var wg sync.WaitGroup
558                 for i := 0; i < workers; i++ {
559                         wg.Add(1)
560                         go func() {
561                                 for work := range todo {
562                                         results <- bal.balanceBlock(work.blkid, work.blk)
563                                 }
564                                 wg.Done()
565                         }()
566                 }
567                 wg.Wait()
568                 close(results)
569         }()
570         bal.collectStatistics(results)
571 }
572
573 func (bal *Balancer) setupLookupTables() {
574         bal.serviceRoots = make(map[string]string)
575         bal.classes = defaultClasses
576         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
577         bal.mounts = 0
578         for _, srv := range bal.KeepServices {
579                 bal.serviceRoots[srv.UUID] = srv.UUID
580                 for _, mnt := range srv.mounts {
581                         bal.mounts++
582
583                         // All mounts on a read-only service are
584                         // effectively read-only.
585                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
586
587                         for class := range mnt.StorageClasses {
588                                 if mbc := bal.mountsByClass[class]; mbc == nil {
589                                         bal.classes = append(bal.classes, class)
590                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
591                                 } else {
592                                         mbc[mnt] = true
593                                 }
594                         }
595                 }
596         }
597         // Consider classes in lexicographic order to avoid flapping
598         // between balancing runs.  The outcome of the "prefer a mount
599         // we're already planning to use for a different storage
600         // class" case in balanceBlock depends on the order classes
601         // are considered.
602         sort.Strings(bal.classes)
603 }
604
605 const (
606         changeStay = iota
607         changePull
608         changeTrash
609         changeNone
610 )
611
612 var changeName = map[int]string{
613         changeStay:  "stay",
614         changePull:  "pull",
615         changeTrash: "trash",
616         changeNone:  "none",
617 }
618
619 type balancedBlockState struct {
620         needed       int
621         unneeded     int
622         pulling      int
623         unachievable bool
624 }
625
626 type balanceResult struct {
627         blk        *BlockState
628         blkid      arvados.SizedDigest
629         lost       bool
630         blockState balancedBlockState
631         classState map[string]balancedBlockState
632 }
633
634 type slot struct {
635         mnt  *KeepMount // never nil
636         repl *Replica   // replica already stored here (or nil)
637         want bool       // we should pull/leave a replica here
638 }
639
640 // balanceBlock compares current state to desired state for a single
641 // block, and makes the appropriate ChangeSet calls.
642 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
643         bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
644
645         // Build a list of all slots (one per mounted volume).
646         slots := make([]slot, 0, bal.mounts)
647         for _, srv := range bal.KeepServices {
648                 for _, mnt := range srv.mounts {
649                         var repl *Replica
650                         for r := range blk.Replicas {
651                                 if blk.Replicas[r].KeepMount == mnt {
652                                         repl = &blk.Replicas[r]
653                                 }
654                         }
655                         // Initial value of "want" is "have, and can't
656                         // delete". These untrashable replicas get
657                         // prioritized when sorting slots: otherwise,
658                         // non-optimal readonly copies would cause us
659                         // to overreplicate.
660                         slots = append(slots, slot{
661                                 mnt:  mnt,
662                                 repl: repl,
663                                 want: repl != nil && mnt.ReadOnly,
664                         })
665                 }
666         }
667
668         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
669         srvRendezvous := make(map[*KeepService]int, len(uuids))
670         for i, uuid := range uuids {
671                 srv := bal.KeepServices[uuid]
672                 srvRendezvous[srv] = i
673         }
674
675         // Below we set underreplicated=true if we find any storage
676         // class that's currently underreplicated -- in that case we
677         // won't want to trash any replicas.
678         underreplicated := false
679
680         unsafeToDelete := make(map[int64]bool, len(slots))
681         for _, class := range bal.classes {
682                 desired := blk.Desired[class]
683                 if desired == 0 {
684                         continue
685                 }
686
687                 // Sort the slots by desirability.
688                 sort.Slice(slots, func(i, j int) bool {
689                         si, sj := slots[i], slots[j]
690                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
691                                 // Prefer a mount that satisfies the
692                                 // desired class.
693                                 return bal.mountsByClass[class][si.mnt]
694                         } else if si.want != sj.want {
695                                 // Prefer a mount that will have a
696                                 // replica no matter what we do here
697                                 // -- either because it already has an
698                                 // untrashable replica, or because we
699                                 // already need it to satisfy a
700                                 // different storage class.
701                                 return si.want
702                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
703                                 // Prefer a better rendezvous
704                                 // position.
705                                 return orderi < orderj
706                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
707                                 // Prefer a mount that already has a
708                                 // replica.
709                                 return repli
710                         } else {
711                                 // If pull/trash turns out to be
712                                 // needed, distribute the
713                                 // new/remaining replicas uniformly
714                                 // across qualifying mounts on a given
715                                 // server.
716                                 return rendezvousLess(si.mnt.UUID, sj.mnt.UUID, blkid)
717                         }
718                 })
719
720                 // Servers/mounts/devices (with or without existing
721                 // replicas) that are part of the best achievable
722                 // layout for this storage class.
723                 wantSrv := map[*KeepService]bool{}
724                 wantMnt := map[*KeepMount]bool{}
725                 wantDev := map[string]bool{}
726                 // Positions (with existing replicas) that have been
727                 // protected (via unsafeToDelete) to ensure we don't
728                 // reduce replication below desired level when
729                 // trashing replicas that aren't optimal positions for
730                 // any storage class.
731                 protMnt := map[*KeepMount]bool{}
732                 // Replication planned so far (corresponds to wantMnt).
733                 replWant := 0
734                 // Protected replication (corresponds to protMnt).
735                 replProt := 0
736
737                 // trySlot tries using a slot to meet requirements,
738                 // and returns true if all requirements are met.
739                 trySlot := func(i int) bool {
740                         slot := slots[i]
741                         if wantMnt[slot.mnt] || wantDev[slot.mnt.UUID] {
742                                 // Already allocated a replica to this
743                                 // backend device, possibly on a
744                                 // different server.
745                                 return false
746                         }
747                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
748                                 unsafeToDelete[slot.repl.Mtime] = true
749                                 protMnt[slot.mnt] = true
750                                 replProt += slot.mnt.Replication
751                         }
752                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
753                                 slots[i].want = true
754                                 wantSrv[slot.mnt.KeepService] = true
755                                 wantMnt[slot.mnt] = true
756                                 wantDev[slot.mnt.UUID] = true
757                                 replWant += slot.mnt.Replication
758                         }
759                         return replProt >= desired && replWant >= desired
760                 }
761
762                 // First try to achieve desired replication without
763                 // using the same server twice.
764                 done := false
765                 for i := 0; i < len(slots) && !done; i++ {
766                         if !wantSrv[slots[i].mnt.KeepService] {
767                                 done = trySlot(i)
768                         }
769                 }
770
771                 // If that didn't suffice, do another pass without the
772                 // "distinct services" restriction. (Achieving the
773                 // desired volume replication on fewer than the
774                 // desired number of services is better than
775                 // underreplicating.)
776                 for i := 0; i < len(slots) && !done; i++ {
777                         done = trySlot(i)
778                 }
779
780                 if !underreplicated {
781                         safe := 0
782                         for _, slot := range slots {
783                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
784                                         continue
785                                 }
786                                 if safe += slot.mnt.Replication; safe >= desired {
787                                         break
788                                 }
789                         }
790                         underreplicated = safe < desired
791                 }
792
793                 // Avoid deleting wanted replicas from devices that
794                 // are mounted on multiple servers -- even if they
795                 // haven't already been added to unsafeToDelete
796                 // because the servers report different Mtimes.
797                 for _, slot := range slots {
798                         if slot.repl != nil && wantDev[slot.mnt.UUID] {
799                                 unsafeToDelete[slot.repl.Mtime] = true
800                         }
801                 }
802         }
803
804         // TODO: If multiple replicas are trashable, prefer the oldest
805         // replica that doesn't have a timestamp collision with
806         // others.
807
808         for i, slot := range slots {
809                 // Don't trash (1) any replicas of an underreplicated
810                 // block, even if they're in the wrong positions, or
811                 // (2) any replicas whose Mtimes are identical to
812                 // needed replicas (in case we're really seeing the
813                 // same copy via different mounts).
814                 if slot.repl != nil && (underreplicated || unsafeToDelete[slot.repl.Mtime]) {
815                         slots[i].want = true
816                 }
817         }
818
819         classState := make(map[string]balancedBlockState, len(bal.classes))
820         for _, class := range bal.classes {
821                 classState[class] = computeBlockState(slots, bal.mountsByClass[class], len(blk.Replicas), blk.Desired[class])
822         }
823         blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
824
825         var lost bool
826         var changes []string
827         for _, slot := range slots {
828                 // TODO: request a Touch if Mtime is duplicated.
829                 var change int
830                 switch {
831                 case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
832                         slot.mnt.KeepService.AddTrash(Trash{
833                                 SizedDigest: blkid,
834                                 Mtime:       slot.repl.Mtime,
835                                 From:        slot.mnt,
836                         })
837                         change = changeTrash
838                 case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
839                         lost = true
840                         change = changeNone
841                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
842                         slot.mnt.KeepService.AddPull(Pull{
843                                 SizedDigest: blkid,
844                                 From:        blk.Replicas[0].KeepMount.KeepService,
845                                 To:          slot.mnt,
846                         })
847                         change = changePull
848                 case slot.repl != nil:
849                         change = changeStay
850                 default:
851                         change = changeNone
852                 }
853                 if bal.Dumper != nil {
854                         var mtime int64
855                         if slot.repl != nil {
856                                 mtime = slot.repl.Mtime
857                         }
858                         srv := slot.mnt.KeepService
859                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
860                 }
861         }
862         if bal.Dumper != nil {
863                 bal.Dumper.Printf("%s refs=%d needed=%d unneeded=%d pulling=%v %v %v", blkid, blk.RefCount, blockState.needed, blockState.unneeded, blockState.pulling, blk.Desired, changes)
864         }
865         return balanceResult{
866                 blk:        blk,
867                 blkid:      blkid,
868                 lost:       lost,
869                 blockState: blockState,
870                 classState: classState,
871         }
872 }
873
874 func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRepl int) (bbs balancedBlockState) {
875         repl := 0
876         countedDev := map[string]bool{}
877         for _, slot := range slots {
878                 if onlyCount != nil && !onlyCount[slot.mnt] {
879                         continue
880                 }
881                 if countedDev[slot.mnt.UUID] {
882                         continue
883                 }
884                 switch {
885                 case slot.repl != nil && slot.want:
886                         bbs.needed++
887                         repl += slot.mnt.Replication
888                 case slot.repl != nil && !slot.want:
889                         bbs.unneeded++
890                         repl += slot.mnt.Replication
891                 case slot.repl == nil && slot.want && have > 0:
892                         bbs.pulling++
893                         repl += slot.mnt.Replication
894                 }
895                 countedDev[slot.mnt.UUID] = true
896         }
897         if repl < needRepl {
898                 bbs.unachievable = true
899         }
900         return
901 }
902
903 type blocksNBytes struct {
904         replicas int
905         blocks   int
906         bytes    int64
907 }
908
909 func (bb blocksNBytes) String() string {
910         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
911 }
912
913 type replicationStats struct {
914         needed       blocksNBytes
915         unneeded     blocksNBytes
916         pulling      blocksNBytes
917         unachievable blocksNBytes
918 }
919
920 type balancerStats struct {
921         lost          blocksNBytes
922         overrep       blocksNBytes
923         unref         blocksNBytes
924         garbage       blocksNBytes
925         underrep      blocksNBytes
926         unachievable  blocksNBytes
927         justright     blocksNBytes
928         desired       blocksNBytes
929         current       blocksNBytes
930         pulls         int
931         trashes       int
932         replHistogram []int
933         classStats    map[string]replicationStats
934
935         // collectionBytes / collectionBlockBytes = deduplication ratio
936         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
937         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
938         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
939         collectionBlocks     int64 // number of blocks referenced by any collection
940 }
941
942 func (s *balancerStats) dedupByteRatio() float64 {
943         if s.collectionBlockBytes == 0 {
944                 return 0
945         }
946         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
947 }
948
949 func (s *balancerStats) dedupBlockRatio() float64 {
950         if s.collectionBlocks == 0 {
951                 return 0
952         }
953         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
954 }
955
956 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
957         var s balancerStats
958         s.replHistogram = make([]int, 2)
959         s.classStats = make(map[string]replicationStats, len(bal.classes))
960         for result := range results {
961                 bytes := result.blkid.Size()
962
963                 if rc := int64(result.blk.RefCount); rc > 0 {
964                         s.collectionBytes += rc * bytes
965                         s.collectionBlockBytes += bytes
966                         s.collectionBlockRefs += rc
967                         s.collectionBlocks++
968                 }
969
970                 for class, state := range result.classState {
971                         cs := s.classStats[class]
972                         if state.unachievable {
973                                 cs.unachievable.replicas++
974                                 cs.unachievable.blocks++
975                                 cs.unachievable.bytes += bytes
976                         }
977                         if state.needed > 0 {
978                                 cs.needed.replicas += state.needed
979                                 cs.needed.blocks++
980                                 cs.needed.bytes += bytes * int64(state.needed)
981                         }
982                         if state.unneeded > 0 {
983                                 cs.unneeded.replicas += state.unneeded
984                                 cs.unneeded.blocks++
985                                 cs.unneeded.bytes += bytes * int64(state.unneeded)
986                         }
987                         if state.pulling > 0 {
988                                 cs.pulling.replicas += state.pulling
989                                 cs.pulling.blocks++
990                                 cs.pulling.bytes += bytes * int64(state.pulling)
991                         }
992                         s.classStats[class] = cs
993                 }
994
995                 bs := result.blockState
996                 switch {
997                 case result.lost:
998                         s.lost.replicas++
999                         s.lost.blocks++
1000                         s.lost.bytes += bytes
1001                         fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
1002                         for pdh := range result.blk.Refs {
1003                                 fmt.Fprintf(bal.lostBlocks, " %s", pdh)
1004                         }
1005                         fmt.Fprint(bal.lostBlocks, "\n")
1006                 case bs.pulling > 0:
1007                         s.underrep.replicas += bs.pulling
1008                         s.underrep.blocks++
1009                         s.underrep.bytes += bytes * int64(bs.pulling)
1010                 case bs.unachievable:
1011                         s.underrep.replicas++
1012                         s.underrep.blocks++
1013                         s.underrep.bytes += bytes
1014                 case bs.unneeded > 0 && bs.needed == 0:
1015                         // Count as "garbage" if all replicas are old
1016                         // enough to trash, otherwise count as
1017                         // "unref".
1018                         counter := &s.garbage
1019                         for _, r := range result.blk.Replicas {
1020                                 if r.Mtime >= bal.MinMtime {
1021                                         counter = &s.unref
1022                                         break
1023                                 }
1024                         }
1025                         counter.replicas += bs.unneeded
1026                         counter.blocks++
1027                         counter.bytes += bytes * int64(bs.unneeded)
1028                 case bs.unneeded > 0:
1029                         s.overrep.replicas += bs.unneeded
1030                         s.overrep.blocks++
1031                         s.overrep.bytes += bytes * int64(bs.unneeded)
1032                 default:
1033                         s.justright.replicas += bs.needed
1034                         s.justright.blocks++
1035                         s.justright.bytes += bytes * int64(bs.needed)
1036                 }
1037
1038                 if bs.needed > 0 {
1039                         s.desired.replicas += bs.needed
1040                         s.desired.blocks++
1041                         s.desired.bytes += bytes * int64(bs.needed)
1042                 }
1043                 if bs.needed+bs.unneeded > 0 {
1044                         s.current.replicas += bs.needed + bs.unneeded
1045                         s.current.blocks++
1046                         s.current.bytes += bytes * int64(bs.needed+bs.unneeded)
1047                 }
1048
1049                 for len(s.replHistogram) <= bs.needed+bs.unneeded {
1050                         s.replHistogram = append(s.replHistogram, 0)
1051                 }
1052                 s.replHistogram[bs.needed+bs.unneeded]++
1053         }
1054         for _, srv := range bal.KeepServices {
1055                 s.pulls += len(srv.ChangeSet.Pulls)
1056                 s.trashes += len(srv.ChangeSet.Trashes)
1057         }
1058         bal.stats = s
1059         bal.Metrics.UpdateStats(s)
1060 }
1061
1062 // PrintStatistics writes statistics about the computed changes to
1063 // bal.Logger. It should not be called until ComputeChangeSets has
1064 // finished.
1065 func (bal *Balancer) PrintStatistics() {
1066         bal.logf("===")
1067         bal.logf("%s lost (0=have<want)", bal.stats.lost)
1068         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
1069         bal.logf("%s just right (have=want)", bal.stats.justright)
1070         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
1071         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
1072         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
1073         for _, class := range bal.classes {
1074                 cs := bal.stats.classStats[class]
1075                 bal.logf("===")
1076                 bal.logf("storage class %q: %s needed", class, cs.needed)
1077                 bal.logf("storage class %q: %s unneeded", class, cs.unneeded)
1078                 bal.logf("storage class %q: %s pulling", class, cs.pulling)
1079                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
1080         }
1081         bal.logf("===")
1082         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
1083         bal.logf("%s total usage", bal.stats.current)
1084         bal.logf("===")
1085         for _, srv := range bal.KeepServices {
1086                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1087         }
1088         bal.logf("===")
1089         bal.printHistogram(60)
1090         bal.logf("===")
1091 }
1092
1093 func (bal *Balancer) printHistogram(hashColumns int) {
1094         bal.logf("Replication level distribution:")
1095         maxCount := 0
1096         for _, count := range bal.stats.replHistogram {
1097                 if maxCount < count {
1098                         maxCount = count
1099                 }
1100         }
1101         hashes := strings.Repeat("#", hashColumns)
1102         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1103         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1104         for repl, count := range bal.stats.replHistogram {
1105                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1106                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1107         }
1108 }
1109
1110 // CheckSanityLate checks for configuration and runtime errors after
1111 // GetCurrentState() and ComputeChangeSets() have finished.
1112 //
1113 // If it returns an error, it is dangerous to run any Commit methods.
1114 func (bal *Balancer) CheckSanityLate() error {
1115         if bal.errors != nil {
1116                 for _, err := range bal.errors {
1117                         bal.logf("deferred error: %v", err)
1118                 }
1119                 return fmt.Errorf("cannot proceed safely after deferred errors")
1120         }
1121
1122         if bal.collScanned == 0 {
1123                 return fmt.Errorf("received zero collections")
1124         }
1125
1126         anyDesired := false
1127         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1128                 for _, desired := range blk.Desired {
1129                         if desired > 0 {
1130                                 anyDesired = true
1131                                 break
1132                         }
1133                 }
1134         })
1135         if !anyDesired {
1136                 return fmt.Errorf("zero blocks have desired replication>0")
1137         }
1138
1139         if dr := bal.DefaultReplication; dr < 1 {
1140                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1141         }
1142
1143         // TODO: no two services have identical indexes
1144         // TODO: no collisions (same md5, different size)
1145         return nil
1146 }
1147
1148 // CommitPulls sends the computed lists of pull requests to the
1149 // keepstore servers. This has the effect of increasing replication of
1150 // existing blocks that are either underreplicated or poorly
1151 // distributed according to rendezvous hashing.
1152 func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
1153         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1154         return bal.commitAsync(c, "send pull list",
1155                 func(srv *KeepService) error {
1156                         return srv.CommitPulls(ctx, c)
1157                 })
1158 }
1159
1160 // CommitTrash sends the computed lists of trash requests to the
1161 // keepstore servers. This has the effect of deleting blocks that are
1162 // overreplicated or unreferenced.
1163 func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
1164         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1165         return bal.commitAsync(c, "send trash list",
1166                 func(srv *KeepService) error {
1167                         return srv.CommitTrash(ctx, c)
1168                 })
1169 }
1170
1171 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1172         errs := make(chan error)
1173         for _, srv := range bal.KeepServices {
1174                 go func(srv *KeepService) {
1175                         var err error
1176                         defer func() { errs <- err }()
1177                         label := fmt.Sprintf("%s: %v", srv, label)
1178                         err = f(srv)
1179                         if err != nil {
1180                                 err = fmt.Errorf("%s: %v", label, err)
1181                         }
1182                 }(srv)
1183         }
1184         var lastErr error
1185         for range bal.KeepServices {
1186                 if err := <-errs; err != nil {
1187                         bal.logf("%v", err)
1188                         lastErr = err
1189                 }
1190         }
1191         close(errs)
1192         return lastErr
1193 }
1194
1195 func (bal *Balancer) logf(f string, args ...interface{}) {
1196         if bal.Logger != nil {
1197                 bal.Logger.Printf(f, args...)
1198         }
1199 }
1200
1201 func (bal *Balancer) time(name, help string) func() {
1202         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1203         t0 := time.Now()
1204         bal.Logger.Printf("%s: start", name)
1205         return func() {
1206                 dur := time.Since(t0)
1207                 observer.Observe(dur.Seconds())
1208                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1209         }
1210 }
1211
1212 // Log current memory usage: once now, at least once every 10 minutes,
1213 // and when memory grows by 40% since the last log. Stop when ctx is
1214 // canceled.
1215 func (bal *Balancer) reportMemorySize(ctx context.Context) {
1216         buf, _ := os.ReadFile("/proc/self/smaps")
1217         m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
1218         var pagesize int64
1219         if len(m) == 2 {
1220                 pagesize, _ = strconv.ParseInt(string(m[1]), 10, 64)
1221                 pagesize <<= 10
1222         }
1223         if pagesize == 0 {
1224                 bal.logf("cannot report memory size: failed to parse KernelPageSize from /proc/self/smaps")
1225                 return
1226         }
1227
1228         var nextTime time.Time
1229         var nextMem int64
1230         const maxInterval = time.Minute * 10
1231         const maxIncrease = 1.4
1232
1233         ticker := time.NewTicker(time.Second)
1234         defer ticker.Stop()
1235         for ctx.Err() == nil {
1236                 now := time.Now()
1237                 buf, _ := os.ReadFile("/proc/self/statm")
1238                 fields := strings.Split(string(buf), " ")
1239                 mem, _ := strconv.ParseInt(fields[0], 10, 64)
1240                 mem *= pagesize
1241                 if now.After(nextTime) || mem >= nextMem {
1242                         bal.logf("process virtual memory size %d", mem)
1243                         nextMem = int64(float64(mem) * maxIncrease)
1244                         nextTime = now.Add(maxInterval)
1245                 }
1246                 <-ticker.C
1247         }
1248 }
1249
1250 // Rendezvous hash sort function. Less efficient than sorting on
1251 // precomputed rendezvous hashes, but also rarely used.
1252 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1253         a := md5.Sum([]byte(string(blkid[:32]) + i))
1254         b := md5.Sum([]byte(string(blkid[:32]) + j))
1255         return bytes.Compare(a[:], b[:]) < 0
1256 }