Merge branch '17967-storage-classes-config' into main
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "log"
15         "math"
16         "os"
17         "runtime"
18         "sort"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "syscall"
23         "time"
24
25         "git.arvados.org/arvados.git/sdk/go/arvados"
26         "git.arvados.org/arvados.git/sdk/go/keepclient"
27         "github.com/jmoiron/sqlx"
28         "github.com/sirupsen/logrus"
29 )
30
31 // Balancer compares the contents of keepstore servers with the
32 // collections stored in Arvados, and issues pull/trash requests
33 // needed to get (closer to) the optimal data layout.
34 //
35 // In the optimal data layout: every data block referenced by a
36 // collection is replicated at least as many times as desired by the
37 // collection; there are no unreferenced data blocks older than
38 // BlobSignatureTTL; and all N existing replicas of a given data block
39 // are in the N best positions in rendezvous probe order.
40 type Balancer struct {
41         DB      *sqlx.DB
42         Logger  logrus.FieldLogger
43         Dumper  logrus.FieldLogger
44         Metrics *metrics
45
46         LostBlocksFile string
47
48         *BlockStateMap
49         KeepServices       map[string]*KeepService
50         DefaultReplication int
51         MinMtime           int64
52
53         classes       []string
54         mounts        int
55         mountsByClass map[string]map[*KeepMount]bool
56         collScanned   int64
57         serviceRoots  map[string]string
58         errors        []error
59         stats         balancerStats
60         mutex         sync.Mutex
61         lostBlocks    io.Writer
62 }
63
64 // Run performs a balance operation using the given config and
65 // runOptions, and returns RunOptions suitable for passing to a
66 // subsequent balance operation.
67 //
68 // Run should only be called once on a given Balancer object.
69 //
70 // Typical usage:
71 //
72 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
73 func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
74         nextRunOptions = runOptions
75
76         defer bal.time("sweep", "wall clock time to run one full sweep")()
77
78         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
79         defer cancel()
80
81         var lbFile *os.File
82         if bal.LostBlocksFile != "" {
83                 tmpfn := bal.LostBlocksFile + ".tmp"
84                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
85                 if err != nil {
86                         return
87                 }
88                 defer lbFile.Close()
89                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
90                 if err != nil {
91                         return
92                 }
93                 defer func() {
94                         // Remove the tempfile only if we didn't get
95                         // as far as successfully renaming it.
96                         if lbFile != nil {
97                                 os.Remove(tmpfn)
98                         }
99                 }()
100                 bal.lostBlocks = lbFile
101         } else {
102                 bal.lostBlocks = ioutil.Discard
103         }
104
105         err = bal.DiscoverKeepServices(client)
106         if err != nil {
107                 return
108         }
109
110         for _, srv := range bal.KeepServices {
111                 err = srv.discoverMounts(client)
112                 if err != nil {
113                         return
114                 }
115         }
116         bal.cleanupMounts()
117
118         if err = bal.CheckSanityEarly(client); err != nil {
119                 return
120         }
121
122         // On a big site, indexing and sending trash/pull lists can
123         // take much longer than the usual 5 minute client
124         // timeout. From here on, we rely on the context deadline
125         // instead, aborting the entire operation if any part takes
126         // too long.
127         client.Timeout = 0
128
129         rs := bal.rendezvousState()
130         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
131                 if runOptions.SafeRendezvousState != "" {
132                         bal.logf("notice: KeepServices list has changed since last run")
133                 }
134                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
135                 if err = bal.ClearTrashLists(ctx, client); err != nil {
136                         return
137                 }
138                 // The current rendezvous state becomes "safe" (i.e.,
139                 // OK to compute changes for that state without
140                 // clearing existing trash lists) only now, after we
141                 // succeed in clearing existing trash lists.
142                 nextRunOptions.SafeRendezvousState = rs
143         }
144
145         if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
146                 return
147         }
148         bal.ComputeChangeSets()
149         bal.PrintStatistics()
150         if err = bal.CheckSanityLate(); err != nil {
151                 return
152         }
153         if lbFile != nil {
154                 err = lbFile.Sync()
155                 if err != nil {
156                         return
157                 }
158                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
159                 if err != nil {
160                         return
161                 }
162                 lbFile = nil
163         }
164         if runOptions.CommitPulls {
165                 err = bal.CommitPulls(ctx, client)
166                 if err != nil {
167                         // Skip trash if we can't pull. (Too cautious?)
168                         return
169                 }
170         }
171         if runOptions.CommitTrash {
172                 err = bal.CommitTrash(ctx, client)
173                 if err != nil {
174                         return
175                 }
176         }
177         if runOptions.CommitConfirmedFields {
178                 err = bal.updateCollections(ctx, client, cluster)
179                 if err != nil {
180                         return
181                 }
182         }
183         return
184 }
185
186 // SetKeepServices sets the list of KeepServices to operate on.
187 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
188         bal.KeepServices = make(map[string]*KeepService)
189         for _, srv := range srvList.Items {
190                 bal.KeepServices[srv.UUID] = &KeepService{
191                         KeepService: srv,
192                         ChangeSet:   &ChangeSet{},
193                 }
194         }
195         return nil
196 }
197
198 // DiscoverKeepServices sets the list of KeepServices by calling the
199 // API to get a list of all services, and selecting the ones whose
200 // ServiceType is "disk"
201 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
202         bal.KeepServices = make(map[string]*KeepService)
203         return c.EachKeepService(func(srv arvados.KeepService) error {
204                 if srv.ServiceType == "disk" {
205                         bal.KeepServices[srv.UUID] = &KeepService{
206                                 KeepService: srv,
207                                 ChangeSet:   &ChangeSet{},
208                         }
209                 } else {
210                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
211                 }
212                 return nil
213         })
214 }
215
216 func (bal *Balancer) cleanupMounts() {
217         rwdev := map[string]*KeepService{}
218         for _, srv := range bal.KeepServices {
219                 for _, mnt := range srv.mounts {
220                         if !mnt.ReadOnly && mnt.DeviceID != "" {
221                                 rwdev[mnt.DeviceID] = srv
222                         }
223                 }
224         }
225         // Drop the readonly mounts whose device is mounted RW
226         // elsewhere.
227         for _, srv := range bal.KeepServices {
228                 var dedup []*KeepMount
229                 for _, mnt := range srv.mounts {
230                         if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
231                                 bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
232                         } else {
233                                 dedup = append(dedup, mnt)
234                         }
235                 }
236                 srv.mounts = dedup
237         }
238         for _, srv := range bal.KeepServices {
239                 for _, mnt := range srv.mounts {
240                         if mnt.Replication <= 0 {
241                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
242                                 mnt.Replication = 1
243                         }
244                 }
245         }
246 }
247
248 // CheckSanityEarly checks for configuration and runtime errors that
249 // can be detected before GetCurrentState() and ComputeChangeSets()
250 // are called.
251 //
252 // If it returns an error, it is pointless to run GetCurrentState or
253 // ComputeChangeSets: after doing so, the statistics would be
254 // meaningless and it would be dangerous to run any Commit methods.
255 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
256         u, err := c.CurrentUser()
257         if err != nil {
258                 return fmt.Errorf("CurrentUser(): %v", err)
259         }
260         if !u.IsActive || !u.IsAdmin {
261                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
262         }
263         for _, srv := range bal.KeepServices {
264                 if srv.ServiceType == "proxy" {
265                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
266                 }
267         }
268
269         var checkPage arvados.CollectionList
270         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
271                 Limit:              new(int),
272                 Count:              "exact",
273                 IncludeTrash:       true,
274                 IncludeOldVersions: true,
275                 Filters: []arvados.Filter{{
276                         Attr:     "modified_at",
277                         Operator: "=",
278                         Operand:  nil,
279                 }},
280         }); err != nil {
281                 return err
282         } else if n := checkPage.ItemsAvailable; n > 0 {
283                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
284         }
285
286         return nil
287 }
288
289 // rendezvousState returns a fingerprint (e.g., a sorted list of
290 // UUID+host+port) of the current set of keep services.
291 func (bal *Balancer) rendezvousState() string {
292         srvs := make([]string, 0, len(bal.KeepServices))
293         for _, srv := range bal.KeepServices {
294                 srvs = append(srvs, srv.String())
295         }
296         sort.Strings(srvs)
297         return strings.Join(srvs, "; ")
298 }
299
300 // ClearTrashLists sends an empty trash list to each keep
301 // service. Calling this before GetCurrentState avoids races.
302 //
303 // When a block appears in an index, we assume that replica will still
304 // exist after we delete other replicas on other servers. However,
305 // it's possible that a previous rebalancing operation made different
306 // decisions (e.g., servers were added/removed, and rendezvous order
307 // changed). In this case, the replica might already be on that
308 // server's trash list, and it might be deleted before we send a
309 // replacement trash list.
310 //
311 // We avoid this problem if we clear all trash lists before getting
312 // indexes. (We also assume there is only one rebalancing process
313 // running at a time.)
314 func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
315         for _, srv := range bal.KeepServices {
316                 srv.ChangeSet = &ChangeSet{}
317         }
318         return bal.CommitTrash(ctx, c)
319 }
320
321 // GetCurrentState determines the current replication state, and the
322 // desired replication level, for every block that is either
323 // retrievable or referenced.
324 //
325 // It determines the current replication state by reading the block index
326 // from every known Keep service.
327 //
328 // It determines the desired replication level by retrieving all
329 // collection manifests in the database (API server).
330 //
331 // It encodes the resulting information in BlockStateMap.
332 func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
333         ctx, cancel := context.WithCancel(ctx)
334         defer cancel()
335
336         defer bal.time("get_state", "wall clock time to get current state")()
337         bal.BlockStateMap = NewBlockStateMap()
338
339         dd, err := c.DiscoveryDocument()
340         if err != nil {
341                 return err
342         }
343         bal.DefaultReplication = dd.DefaultCollectionReplication
344         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
345
346         errs := make(chan error, 1)
347         wg := sync.WaitGroup{}
348
349         // When a device is mounted more than once, we will get its
350         // index only once, and call AddReplicas on all of the mounts.
351         // equivMount keys are the mounts that will be indexed, and
352         // each value is a list of mounts to apply the received index
353         // to.
354         equivMount := map[*KeepMount][]*KeepMount{}
355         // deviceMount maps each device ID to the one mount that will
356         // be indexed for that device.
357         deviceMount := map[string]*KeepMount{}
358         for _, srv := range bal.KeepServices {
359                 for _, mnt := range srv.mounts {
360                         equiv := deviceMount[mnt.DeviceID]
361                         if equiv == nil {
362                                 equiv = mnt
363                                 if mnt.DeviceID != "" {
364                                         deviceMount[mnt.DeviceID] = equiv
365                                 }
366                         }
367                         equivMount[equiv] = append(equivMount[equiv], mnt)
368                 }
369         }
370
371         // Start one goroutine for each (non-redundant) mount:
372         // retrieve the index, and add the returned blocks to
373         // BlockStateMap.
374         for _, mounts := range equivMount {
375                 wg.Add(1)
376                 go func(mounts []*KeepMount) {
377                         defer wg.Done()
378                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
379                         idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
380                         if err != nil {
381                                 select {
382                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
383                                 default:
384                                 }
385                                 cancel()
386                                 return
387                         }
388                         if len(errs) > 0 {
389                                 // Some other goroutine encountered an
390                                 // error -- any further effort here
391                                 // will be wasted.
392                                 return
393                         }
394                         for _, mount := range mounts {
395                                 bal.logf("%s: add %d entries to map", mount, len(idx))
396                                 bal.BlockStateMap.AddReplicas(mount, idx)
397                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
398                         }
399                         bal.logf("mount %s: index done", mounts[0])
400                 }(mounts)
401         }
402
403         collQ := make(chan arvados.Collection, bufs)
404
405         // Retrieve all collections from the database and send them to
406         // collQ.
407         wg.Add(1)
408         go func() {
409                 defer wg.Done()
410                 err = EachCollection(ctx, bal.DB, c,
411                         func(coll arvados.Collection) error {
412                                 collQ <- coll
413                                 if len(errs) > 0 {
414                                         // some other GetCurrentState
415                                         // error happened: no point
416                                         // getting any more
417                                         // collections.
418                                         return fmt.Errorf("")
419                                 }
420                                 return nil
421                         }, func(done, total int) {
422                                 bal.logf("collections: %d/%d", done, total)
423                         })
424                 close(collQ)
425                 if err != nil {
426                         select {
427                         case errs <- err:
428                         default:
429                         }
430                         cancel()
431                 }
432         }()
433
434         // Parse manifests from collQ and pass the block hashes to
435         // BlockStateMap to track desired replication.
436         for i := 0; i < runtime.NumCPU(); i++ {
437                 wg.Add(1)
438                 go func() {
439                         defer wg.Done()
440                         for coll := range collQ {
441                                 err := bal.addCollection(coll)
442                                 if err != nil || len(errs) > 0 {
443                                         select {
444                                         case errs <- err:
445                                         default:
446                                         }
447                                         cancel()
448                                         continue
449                                 }
450                                 atomic.AddInt64(&bal.collScanned, 1)
451                         }
452                 }()
453         }
454
455         wg.Wait()
456         if len(errs) > 0 {
457                 return <-errs
458         }
459         return nil
460 }
461
462 func (bal *Balancer) addCollection(coll arvados.Collection) error {
463         blkids, err := coll.SizedDigests()
464         if err != nil {
465                 return fmt.Errorf("%v: %v", coll.UUID, err)
466         }
467         repl := bal.DefaultReplication
468         if coll.ReplicationDesired != nil {
469                 repl = *coll.ReplicationDesired
470         }
471         bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
472         // Pass pdh to IncreaseDesired only if LostBlocksFile is being
473         // written -- otherwise it's just a waste of memory.
474         pdh := ""
475         if bal.LostBlocksFile != "" {
476                 pdh = coll.PortableDataHash
477         }
478         bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
479         return nil
480 }
481
482 // ComputeChangeSets compares, for each known block, the current and
483 // desired replication states. If it is possible to get closer to the
484 // desired state by copying or deleting blocks, it adds those changes
485 // to the relevant KeepServices' ChangeSets.
486 //
487 // It does not actually apply any of the computed changes.
488 func (bal *Balancer) ComputeChangeSets() {
489         // This just calls balanceBlock() once for each block, using a
490         // pool of worker goroutines.
491         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
492         bal.setupLookupTables()
493
494         type balanceTask struct {
495                 blkid arvados.SizedDigest
496                 blk   *BlockState
497         }
498         workers := runtime.GOMAXPROCS(-1)
499         todo := make(chan balanceTask, workers)
500         go func() {
501                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
502                         todo <- balanceTask{
503                                 blkid: blkid,
504                                 blk:   blk,
505                         }
506                 })
507                 close(todo)
508         }()
509         results := make(chan balanceResult, workers)
510         go func() {
511                 var wg sync.WaitGroup
512                 for i := 0; i < workers; i++ {
513                         wg.Add(1)
514                         go func() {
515                                 for work := range todo {
516                                         results <- bal.balanceBlock(work.blkid, work.blk)
517                                 }
518                                 wg.Done()
519                         }()
520                 }
521                 wg.Wait()
522                 close(results)
523         }()
524         bal.collectStatistics(results)
525 }
526
527 func (bal *Balancer) setupLookupTables() {
528         bal.serviceRoots = make(map[string]string)
529         bal.classes = defaultClasses
530         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
531         bal.mounts = 0
532         for _, srv := range bal.KeepServices {
533                 bal.serviceRoots[srv.UUID] = srv.UUID
534                 for _, mnt := range srv.mounts {
535                         bal.mounts++
536
537                         // All mounts on a read-only service are
538                         // effectively read-only.
539                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
540
541                         for class := range mnt.StorageClasses {
542                                 if mbc := bal.mountsByClass[class]; mbc == nil {
543                                         bal.classes = append(bal.classes, class)
544                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
545                                 } else {
546                                         mbc[mnt] = true
547                                 }
548                         }
549                 }
550         }
551         // Consider classes in lexicographic order to avoid flapping
552         // between balancing runs.  The outcome of the "prefer a mount
553         // we're already planning to use for a different storage
554         // class" case in balanceBlock depends on the order classes
555         // are considered.
556         sort.Strings(bal.classes)
557 }
558
559 const (
560         changeStay = iota
561         changePull
562         changeTrash
563         changeNone
564 )
565
566 var changeName = map[int]string{
567         changeStay:  "stay",
568         changePull:  "pull",
569         changeTrash: "trash",
570         changeNone:  "none",
571 }
572
573 type balancedBlockState struct {
574         needed       int
575         unneeded     int
576         pulling      int
577         unachievable bool
578 }
579
580 type balanceResult struct {
581         blk        *BlockState
582         blkid      arvados.SizedDigest
583         lost       bool
584         blockState balancedBlockState
585         classState map[string]balancedBlockState
586 }
587
588 type slot struct {
589         mnt  *KeepMount // never nil
590         repl *Replica   // replica already stored here (or nil)
591         want bool       // we should pull/leave a replica here
592 }
593
594 // balanceBlock compares current state to desired state for a single
595 // block, and makes the appropriate ChangeSet calls.
596 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
597         bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
598
599         // Build a list of all slots (one per mounted volume).
600         slots := make([]slot, 0, bal.mounts)
601         for _, srv := range bal.KeepServices {
602                 for _, mnt := range srv.mounts {
603                         var repl *Replica
604                         for r := range blk.Replicas {
605                                 if blk.Replicas[r].KeepMount == mnt {
606                                         repl = &blk.Replicas[r]
607                                 }
608                         }
609                         // Initial value of "want" is "have, and can't
610                         // delete". These untrashable replicas get
611                         // prioritized when sorting slots: otherwise,
612                         // non-optimal readonly copies would cause us
613                         // to overreplicate.
614                         slots = append(slots, slot{
615                                 mnt:  mnt,
616                                 repl: repl,
617                                 want: repl != nil && mnt.ReadOnly,
618                         })
619                 }
620         }
621
622         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
623         srvRendezvous := make(map[*KeepService]int, len(uuids))
624         for i, uuid := range uuids {
625                 srv := bal.KeepServices[uuid]
626                 srvRendezvous[srv] = i
627         }
628
629         // Below we set underreplicated=true if we find any storage
630         // class that's currently underreplicated -- in that case we
631         // won't want to trash any replicas.
632         underreplicated := false
633
634         unsafeToDelete := make(map[int64]bool, len(slots))
635         for _, class := range bal.classes {
636                 desired := blk.Desired[class]
637                 if desired == 0 {
638                         continue
639                 }
640
641                 // Sort the slots by desirability.
642                 sort.Slice(slots, func(i, j int) bool {
643                         si, sj := slots[i], slots[j]
644                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
645                                 // Prefer a mount that satisfies the
646                                 // desired class.
647                                 return bal.mountsByClass[class][si.mnt]
648                         } else if si.want != sj.want {
649                                 // Prefer a mount that will have a
650                                 // replica no matter what we do here
651                                 // -- either because it already has an
652                                 // untrashable replica, or because we
653                                 // already need it to satisfy a
654                                 // different storage class.
655                                 return si.want
656                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
657                                 // Prefer a better rendezvous
658                                 // position.
659                                 return orderi < orderj
660                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
661                                 // Prefer a mount that already has a
662                                 // replica.
663                                 return repli
664                         } else {
665                                 // If pull/trash turns out to be
666                                 // needed, distribute the
667                                 // new/remaining replicas uniformly
668                                 // across qualifying mounts on a given
669                                 // server.
670                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
671                         }
672                 })
673
674                 // Servers/mounts/devices (with or without existing
675                 // replicas) that are part of the best achievable
676                 // layout for this storage class.
677                 wantSrv := map[*KeepService]bool{}
678                 wantMnt := map[*KeepMount]bool{}
679                 wantDev := map[string]bool{}
680                 // Positions (with existing replicas) that have been
681                 // protected (via unsafeToDelete) to ensure we don't
682                 // reduce replication below desired level when
683                 // trashing replicas that aren't optimal positions for
684                 // any storage class.
685                 protMnt := map[*KeepMount]bool{}
686                 // Replication planned so far (corresponds to wantMnt).
687                 replWant := 0
688                 // Protected replication (corresponds to protMnt).
689                 replProt := 0
690
691                 // trySlot tries using a slot to meet requirements,
692                 // and returns true if all requirements are met.
693                 trySlot := func(i int) bool {
694                         slot := slots[i]
695                         if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
696                                 // Already allocated a replica to this
697                                 // backend device, possibly on a
698                                 // different server.
699                                 return false
700                         }
701                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
702                                 unsafeToDelete[slot.repl.Mtime] = true
703                                 protMnt[slot.mnt] = true
704                                 replProt += slot.mnt.Replication
705                         }
706                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
707                                 slots[i].want = true
708                                 wantSrv[slot.mnt.KeepService] = true
709                                 wantMnt[slot.mnt] = true
710                                 if slot.mnt.DeviceID != "" {
711                                         wantDev[slot.mnt.DeviceID] = true
712                                 }
713                                 replWant += slot.mnt.Replication
714                         }
715                         return replProt >= desired && replWant >= desired
716                 }
717
718                 // First try to achieve desired replication without
719                 // using the same server twice.
720                 done := false
721                 for i := 0; i < len(slots) && !done; i++ {
722                         if !wantSrv[slots[i].mnt.KeepService] {
723                                 done = trySlot(i)
724                         }
725                 }
726
727                 // If that didn't suffice, do another pass without the
728                 // "distinct services" restriction. (Achieving the
729                 // desired volume replication on fewer than the
730                 // desired number of services is better than
731                 // underreplicating.)
732                 for i := 0; i < len(slots) && !done; i++ {
733                         done = trySlot(i)
734                 }
735
736                 if !underreplicated {
737                         safe := 0
738                         for _, slot := range slots {
739                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
740                                         continue
741                                 }
742                                 if safe += slot.mnt.Replication; safe >= desired {
743                                         break
744                                 }
745                         }
746                         underreplicated = safe < desired
747                 }
748
749                 // Avoid deleting wanted replicas from devices that
750                 // are mounted on multiple servers -- even if they
751                 // haven't already been added to unsafeToDelete
752                 // because the servers report different Mtimes.
753                 for _, slot := range slots {
754                         if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
755                                 unsafeToDelete[slot.repl.Mtime] = true
756                         }
757                 }
758         }
759
760         // TODO: If multiple replicas are trashable, prefer the oldest
761         // replica that doesn't have a timestamp collision with
762         // others.
763
764         for i, slot := range slots {
765                 // Don't trash (1) any replicas of an underreplicated
766                 // block, even if they're in the wrong positions, or
767                 // (2) any replicas whose Mtimes are identical to
768                 // needed replicas (in case we're really seeing the
769                 // same copy via different mounts).
770                 if slot.repl != nil && (underreplicated || unsafeToDelete[slot.repl.Mtime]) {
771                         slots[i].want = true
772                 }
773         }
774
775         classState := make(map[string]balancedBlockState, len(bal.classes))
776         for _, class := range bal.classes {
777                 classState[class] = computeBlockState(slots, bal.mountsByClass[class], len(blk.Replicas), blk.Desired[class])
778         }
779         blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
780
781         var lost bool
782         var changes []string
783         for _, slot := range slots {
784                 // TODO: request a Touch if Mtime is duplicated.
785                 var change int
786                 switch {
787                 case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
788                         slot.mnt.KeepService.AddTrash(Trash{
789                                 SizedDigest: blkid,
790                                 Mtime:       slot.repl.Mtime,
791                                 From:        slot.mnt,
792                         })
793                         change = changeTrash
794                 case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
795                         lost = true
796                         change = changeNone
797                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
798                         slot.mnt.KeepService.AddPull(Pull{
799                                 SizedDigest: blkid,
800                                 From:        blk.Replicas[0].KeepMount.KeepService,
801                                 To:          slot.mnt,
802                         })
803                         change = changePull
804                 case slot.repl != nil:
805                         change = changeStay
806                 default:
807                         change = changeNone
808                 }
809                 if bal.Dumper != nil {
810                         var mtime int64
811                         if slot.repl != nil {
812                                 mtime = slot.repl.Mtime
813                         }
814                         srv := slot.mnt.KeepService
815                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
816                 }
817         }
818         if bal.Dumper != nil {
819                 bal.Dumper.Printf("%s refs=%d needed=%d unneeded=%d pulling=%v %v %v", blkid, blk.RefCount, blockState.needed, blockState.unneeded, blockState.pulling, blk.Desired, changes)
820         }
821         return balanceResult{
822                 blk:        blk,
823                 blkid:      blkid,
824                 lost:       lost,
825                 blockState: blockState,
826                 classState: classState,
827         }
828 }
829
830 func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRepl int) (bbs balancedBlockState) {
831         repl := 0
832         countedDev := map[string]bool{}
833         for _, slot := range slots {
834                 if onlyCount != nil && !onlyCount[slot.mnt] {
835                         continue
836                 }
837                 if countedDev[slot.mnt.DeviceID] {
838                         continue
839                 }
840                 switch {
841                 case slot.repl != nil && slot.want:
842                         bbs.needed++
843                         repl += slot.mnt.Replication
844                 case slot.repl != nil && !slot.want:
845                         bbs.unneeded++
846                         repl += slot.mnt.Replication
847                 case slot.repl == nil && slot.want && have > 0:
848                         bbs.pulling++
849                         repl += slot.mnt.Replication
850                 }
851                 if slot.mnt.DeviceID != "" {
852                         countedDev[slot.mnt.DeviceID] = true
853                 }
854         }
855         if repl < needRepl {
856                 bbs.unachievable = true
857         }
858         return
859 }
860
861 type blocksNBytes struct {
862         replicas int
863         blocks   int
864         bytes    int64
865 }
866
867 func (bb blocksNBytes) String() string {
868         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
869 }
870
871 type replicationStats struct {
872         needed       blocksNBytes
873         unneeded     blocksNBytes
874         pulling      blocksNBytes
875         unachievable blocksNBytes
876 }
877
878 type balancerStats struct {
879         lost          blocksNBytes
880         overrep       blocksNBytes
881         unref         blocksNBytes
882         garbage       blocksNBytes
883         underrep      blocksNBytes
884         unachievable  blocksNBytes
885         justright     blocksNBytes
886         desired       blocksNBytes
887         current       blocksNBytes
888         pulls         int
889         trashes       int
890         replHistogram []int
891         classStats    map[string]replicationStats
892
893         // collectionBytes / collectionBlockBytes = deduplication ratio
894         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
895         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
896         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
897         collectionBlocks     int64 // number of blocks referenced by any collection
898 }
899
900 func (s *balancerStats) dedupByteRatio() float64 {
901         if s.collectionBlockBytes == 0 {
902                 return 0
903         }
904         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
905 }
906
907 func (s *balancerStats) dedupBlockRatio() float64 {
908         if s.collectionBlocks == 0 {
909                 return 0
910         }
911         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
912 }
913
914 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
915         var s balancerStats
916         s.replHistogram = make([]int, 2)
917         s.classStats = make(map[string]replicationStats, len(bal.classes))
918         for result := range results {
919                 bytes := result.blkid.Size()
920
921                 if rc := int64(result.blk.RefCount); rc > 0 {
922                         s.collectionBytes += rc * bytes
923                         s.collectionBlockBytes += bytes
924                         s.collectionBlockRefs += rc
925                         s.collectionBlocks++
926                 }
927
928                 for class, state := range result.classState {
929                         cs := s.classStats[class]
930                         if state.unachievable {
931                                 cs.unachievable.replicas++
932                                 cs.unachievable.blocks++
933                                 cs.unachievable.bytes += bytes
934                         }
935                         if state.needed > 0 {
936                                 cs.needed.replicas += state.needed
937                                 cs.needed.blocks++
938                                 cs.needed.bytes += bytes * int64(state.needed)
939                         }
940                         if state.unneeded > 0 {
941                                 cs.unneeded.replicas += state.unneeded
942                                 cs.unneeded.blocks++
943                                 cs.unneeded.bytes += bytes * int64(state.unneeded)
944                         }
945                         if state.pulling > 0 {
946                                 cs.pulling.replicas += state.pulling
947                                 cs.pulling.blocks++
948                                 cs.pulling.bytes += bytes * int64(state.pulling)
949                         }
950                         s.classStats[class] = cs
951                 }
952
953                 bs := result.blockState
954                 switch {
955                 case result.lost:
956                         s.lost.replicas++
957                         s.lost.blocks++
958                         s.lost.bytes += bytes
959                         fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
960                         for pdh := range result.blk.Refs {
961                                 fmt.Fprintf(bal.lostBlocks, " %s", pdh)
962                         }
963                         fmt.Fprint(bal.lostBlocks, "\n")
964                 case bs.pulling > 0:
965                         s.underrep.replicas += bs.pulling
966                         s.underrep.blocks++
967                         s.underrep.bytes += bytes * int64(bs.pulling)
968                 case bs.unachievable:
969                         s.underrep.replicas++
970                         s.underrep.blocks++
971                         s.underrep.bytes += bytes
972                 case bs.unneeded > 0 && bs.needed == 0:
973                         // Count as "garbage" if all replicas are old
974                         // enough to trash, otherwise count as
975                         // "unref".
976                         counter := &s.garbage
977                         for _, r := range result.blk.Replicas {
978                                 if r.Mtime >= bal.MinMtime {
979                                         counter = &s.unref
980                                         break
981                                 }
982                         }
983                         counter.replicas += bs.unneeded
984                         counter.blocks++
985                         counter.bytes += bytes * int64(bs.unneeded)
986                 case bs.unneeded > 0:
987                         s.overrep.replicas += bs.unneeded
988                         s.overrep.blocks++
989                         s.overrep.bytes += bytes * int64(bs.unneeded)
990                 default:
991                         s.justright.replicas += bs.needed
992                         s.justright.blocks++
993                         s.justright.bytes += bytes * int64(bs.needed)
994                 }
995
996                 if bs.needed > 0 {
997                         s.desired.replicas += bs.needed
998                         s.desired.blocks++
999                         s.desired.bytes += bytes * int64(bs.needed)
1000                 }
1001                 if bs.needed+bs.unneeded > 0 {
1002                         s.current.replicas += bs.needed + bs.unneeded
1003                         s.current.blocks++
1004                         s.current.bytes += bytes * int64(bs.needed+bs.unneeded)
1005                 }
1006
1007                 for len(s.replHistogram) <= bs.needed+bs.unneeded {
1008                         s.replHistogram = append(s.replHistogram, 0)
1009                 }
1010                 s.replHistogram[bs.needed+bs.unneeded]++
1011         }
1012         for _, srv := range bal.KeepServices {
1013                 s.pulls += len(srv.ChangeSet.Pulls)
1014                 s.trashes += len(srv.ChangeSet.Trashes)
1015         }
1016         bal.stats = s
1017         bal.Metrics.UpdateStats(s)
1018 }
1019
1020 // PrintStatistics writes statistics about the computed changes to
1021 // bal.Logger. It should not be called until ComputeChangeSets has
1022 // finished.
1023 func (bal *Balancer) PrintStatistics() {
1024         bal.logf("===")
1025         bal.logf("%s lost (0=have<want)", bal.stats.lost)
1026         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
1027         bal.logf("%s just right (have=want)", bal.stats.justright)
1028         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
1029         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
1030         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
1031         for _, class := range bal.classes {
1032                 cs := bal.stats.classStats[class]
1033                 bal.logf("===")
1034                 bal.logf("storage class %q: %s needed", class, cs.needed)
1035                 bal.logf("storage class %q: %s unneeded", class, cs.unneeded)
1036                 bal.logf("storage class %q: %s pulling", class, cs.pulling)
1037                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
1038         }
1039         bal.logf("===")
1040         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
1041         bal.logf("%s total usage", bal.stats.current)
1042         bal.logf("===")
1043         for _, srv := range bal.KeepServices {
1044                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1045         }
1046         bal.logf("===")
1047         bal.printHistogram(60)
1048         bal.logf("===")
1049 }
1050
1051 func (bal *Balancer) printHistogram(hashColumns int) {
1052         bal.logf("Replication level distribution:")
1053         maxCount := 0
1054         for _, count := range bal.stats.replHistogram {
1055                 if maxCount < count {
1056                         maxCount = count
1057                 }
1058         }
1059         hashes := strings.Repeat("#", hashColumns)
1060         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1061         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1062         for repl, count := range bal.stats.replHistogram {
1063                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1064                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1065         }
1066 }
1067
1068 // CheckSanityLate checks for configuration and runtime errors after
1069 // GetCurrentState() and ComputeChangeSets() have finished.
1070 //
1071 // If it returns an error, it is dangerous to run any Commit methods.
1072 func (bal *Balancer) CheckSanityLate() error {
1073         if bal.errors != nil {
1074                 for _, err := range bal.errors {
1075                         bal.logf("deferred error: %v", err)
1076                 }
1077                 return fmt.Errorf("cannot proceed safely after deferred errors")
1078         }
1079
1080         if bal.collScanned == 0 {
1081                 return fmt.Errorf("received zero collections")
1082         }
1083
1084         anyDesired := false
1085         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1086                 for _, desired := range blk.Desired {
1087                         if desired > 0 {
1088                                 anyDesired = true
1089                                 break
1090                         }
1091                 }
1092         })
1093         if !anyDesired {
1094                 return fmt.Errorf("zero blocks have desired replication>0")
1095         }
1096
1097         if dr := bal.DefaultReplication; dr < 1 {
1098                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1099         }
1100
1101         // TODO: no two services have identical indexes
1102         // TODO: no collisions (same md5, different size)
1103         return nil
1104 }
1105
1106 // CommitPulls sends the computed lists of pull requests to the
1107 // keepstore servers. This has the effect of increasing replication of
1108 // existing blocks that are either underreplicated or poorly
1109 // distributed according to rendezvous hashing.
1110 func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
1111         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1112         return bal.commitAsync(c, "send pull list",
1113                 func(srv *KeepService) error {
1114                         return srv.CommitPulls(ctx, c)
1115                 })
1116 }
1117
1118 // CommitTrash sends the computed lists of trash requests to the
1119 // keepstore servers. This has the effect of deleting blocks that are
1120 // overreplicated or unreferenced.
1121 func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
1122         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1123         return bal.commitAsync(c, "send trash list",
1124                 func(srv *KeepService) error {
1125                         return srv.CommitTrash(ctx, c)
1126                 })
1127 }
1128
1129 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1130         errs := make(chan error)
1131         for _, srv := range bal.KeepServices {
1132                 go func(srv *KeepService) {
1133                         var err error
1134                         defer func() { errs <- err }()
1135                         label := fmt.Sprintf("%s: %v", srv, label)
1136                         err = f(srv)
1137                         if err != nil {
1138                                 err = fmt.Errorf("%s: %v", label, err)
1139                         }
1140                 }(srv)
1141         }
1142         var lastErr error
1143         for range bal.KeepServices {
1144                 if err := <-errs; err != nil {
1145                         bal.logf("%v", err)
1146                         lastErr = err
1147                 }
1148         }
1149         close(errs)
1150         return lastErr
1151 }
1152
1153 func (bal *Balancer) logf(f string, args ...interface{}) {
1154         if bal.Logger != nil {
1155                 bal.Logger.Printf(f, args...)
1156         }
1157 }
1158
1159 func (bal *Balancer) time(name, help string) func() {
1160         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1161         t0 := time.Now()
1162         bal.Logger.Printf("%s: start", name)
1163         return func() {
1164                 dur := time.Since(t0)
1165                 observer.Observe(dur.Seconds())
1166                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1167         }
1168 }
1169
1170 // Rendezvous hash sort function. Less efficient than sorting on
1171 // precomputed rendezvous hashes, but also rarely used.
1172 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1173         a := md5.Sum([]byte(string(blkid[:32]) + i))
1174         b := md5.Sum([]byte(string(blkid[:32]) + j))
1175         return bytes.Compare(a[:], b[:]) < 0
1176 }