17574: Update replication_confirmed fields after keep-balance run.
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "log"
15         "math"
16         "os"
17         "runtime"
18         "sort"
19         "strings"
20         "sync"
21         "syscall"
22         "time"
23
24         "git.arvados.org/arvados.git/sdk/go/arvados"
25         "git.arvados.org/arvados.git/sdk/go/keepclient"
26         "github.com/sirupsen/logrus"
27 )
28
29 // Balancer compares the contents of keepstore servers with the
30 // collections stored in Arvados, and issues pull/trash requests
31 // needed to get (closer to) the optimal data layout.
32 //
33 // In the optimal data layout: every data block referenced by a
34 // collection is replicated at least as many times as desired by the
35 // collection; there are no unreferenced data blocks older than
36 // BlobSignatureTTL; and all N existing replicas of a given data block
37 // are in the N best positions in rendezvous probe order.
38 type Balancer struct {
39         Logger  logrus.FieldLogger
40         Dumper  logrus.FieldLogger
41         Metrics *metrics
42
43         LostBlocksFile string
44
45         *BlockStateMap
46         KeepServices       map[string]*KeepService
47         DefaultReplication int
48         MinMtime           int64
49
50         classes       []string
51         mounts        int
52         mountsByClass map[string]map[*KeepMount]bool
53         collScanned   int
54         serviceRoots  map[string]string
55         errors        []error
56         stats         balancerStats
57         mutex         sync.Mutex
58         lostBlocks    io.Writer
59 }
60
61 // Run performs a balance operation using the given config and
62 // runOptions, and returns RunOptions suitable for passing to a
63 // subsequent balance operation.
64 //
65 // Run should only be called once on a given Balancer object.
66 //
67 // Typical usage:
68 //
69 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
70 func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
71         nextRunOptions = runOptions
72
73         defer bal.time("sweep", "wall clock time to run one full sweep")()
74
75         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
76         defer cancel()
77
78         var lbFile *os.File
79         if bal.LostBlocksFile != "" {
80                 tmpfn := bal.LostBlocksFile + ".tmp"
81                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
82                 if err != nil {
83                         return
84                 }
85                 defer lbFile.Close()
86                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
87                 if err != nil {
88                         return
89                 }
90                 defer func() {
91                         // Remove the tempfile only if we didn't get
92                         // as far as successfully renaming it.
93                         if lbFile != nil {
94                                 os.Remove(tmpfn)
95                         }
96                 }()
97                 bal.lostBlocks = lbFile
98         } else {
99                 bal.lostBlocks = ioutil.Discard
100         }
101
102         err = bal.DiscoverKeepServices(client)
103         if err != nil {
104                 return
105         }
106
107         for _, srv := range bal.KeepServices {
108                 err = srv.discoverMounts(client)
109                 if err != nil {
110                         return
111                 }
112         }
113         bal.cleanupMounts()
114
115         if err = bal.CheckSanityEarly(client); err != nil {
116                 return
117         }
118
119         // On a big site, indexing and sending trash/pull lists can
120         // take much longer than the usual 5 minute client
121         // timeout. From here on, we rely on the context deadline
122         // instead, aborting the entire operation if any part takes
123         // too long.
124         client.Timeout = 0
125
126         rs := bal.rendezvousState()
127         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
128                 if runOptions.SafeRendezvousState != "" {
129                         bal.logf("notice: KeepServices list has changed since last run")
130                 }
131                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
132                 if err = bal.ClearTrashLists(ctx, client); err != nil {
133                         return
134                 }
135                 // The current rendezvous state becomes "safe" (i.e.,
136                 // OK to compute changes for that state without
137                 // clearing existing trash lists) only now, after we
138                 // succeed in clearing existing trash lists.
139                 nextRunOptions.SafeRendezvousState = rs
140         }
141
142         if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
143                 return
144         }
145         bal.ComputeChangeSets()
146         bal.PrintStatistics()
147         if err = bal.CheckSanityLate(); err != nil {
148                 return
149         }
150         if lbFile != nil {
151                 err = lbFile.Sync()
152                 if err != nil {
153                         return
154                 }
155                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
156                 if err != nil {
157                         return
158                 }
159                 lbFile = nil
160         }
161         if runOptions.CommitPulls {
162                 err = bal.CommitPulls(ctx, client)
163                 if err != nil {
164                         // Skip trash if we can't pull. (Too cautious?)
165                         return
166                 }
167         }
168         if runOptions.CommitTrash {
169                 err = bal.CommitTrash(ctx, client)
170                 if err != nil {
171                         return
172                 }
173         }
174         err = bal.updateCollections(ctx, client, cluster)
175         return
176 }
177
178 // SetKeepServices sets the list of KeepServices to operate on.
179 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
180         bal.KeepServices = make(map[string]*KeepService)
181         for _, srv := range srvList.Items {
182                 bal.KeepServices[srv.UUID] = &KeepService{
183                         KeepService: srv,
184                         ChangeSet:   &ChangeSet{},
185                 }
186         }
187         return nil
188 }
189
190 // DiscoverKeepServices sets the list of KeepServices by calling the
191 // API to get a list of all services, and selecting the ones whose
192 // ServiceType is "disk"
193 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
194         bal.KeepServices = make(map[string]*KeepService)
195         return c.EachKeepService(func(srv arvados.KeepService) error {
196                 if srv.ServiceType == "disk" {
197                         bal.KeepServices[srv.UUID] = &KeepService{
198                                 KeepService: srv,
199                                 ChangeSet:   &ChangeSet{},
200                         }
201                 } else {
202                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
203                 }
204                 return nil
205         })
206 }
207
208 func (bal *Balancer) cleanupMounts() {
209         rwdev := map[string]*KeepService{}
210         for _, srv := range bal.KeepServices {
211                 for _, mnt := range srv.mounts {
212                         if !mnt.ReadOnly && mnt.DeviceID != "" {
213                                 rwdev[mnt.DeviceID] = srv
214                         }
215                 }
216         }
217         // Drop the readonly mounts whose device is mounted RW
218         // elsewhere.
219         for _, srv := range bal.KeepServices {
220                 var dedup []*KeepMount
221                 for _, mnt := range srv.mounts {
222                         if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
223                                 bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
224                         } else {
225                                 dedup = append(dedup, mnt)
226                         }
227                 }
228                 srv.mounts = dedup
229         }
230         for _, srv := range bal.KeepServices {
231                 for _, mnt := range srv.mounts {
232                         if mnt.Replication <= 0 {
233                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
234                                 mnt.Replication = 1
235                         }
236                 }
237         }
238 }
239
240 // CheckSanityEarly checks for configuration and runtime errors that
241 // can be detected before GetCurrentState() and ComputeChangeSets()
242 // are called.
243 //
244 // If it returns an error, it is pointless to run GetCurrentState or
245 // ComputeChangeSets: after doing so, the statistics would be
246 // meaningless and it would be dangerous to run any Commit methods.
247 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
248         u, err := c.CurrentUser()
249         if err != nil {
250                 return fmt.Errorf("CurrentUser(): %v", err)
251         }
252         if !u.IsActive || !u.IsAdmin {
253                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
254         }
255         for _, srv := range bal.KeepServices {
256                 if srv.ServiceType == "proxy" {
257                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
258                 }
259         }
260
261         var checkPage arvados.CollectionList
262         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
263                 Limit:              new(int),
264                 Count:              "exact",
265                 IncludeTrash:       true,
266                 IncludeOldVersions: true,
267                 Filters: []arvados.Filter{{
268                         Attr:     "modified_at",
269                         Operator: "=",
270                         Operand:  nil,
271                 }},
272         }); err != nil {
273                 return err
274         } else if n := checkPage.ItemsAvailable; n > 0 {
275                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
276         }
277
278         return nil
279 }
280
281 // rendezvousState returns a fingerprint (e.g., a sorted list of
282 // UUID+host+port) of the current set of keep services.
283 func (bal *Balancer) rendezvousState() string {
284         srvs := make([]string, 0, len(bal.KeepServices))
285         for _, srv := range bal.KeepServices {
286                 srvs = append(srvs, srv.String())
287         }
288         sort.Strings(srvs)
289         return strings.Join(srvs, "; ")
290 }
291
292 // ClearTrashLists sends an empty trash list to each keep
293 // service. Calling this before GetCurrentState avoids races.
294 //
295 // When a block appears in an index, we assume that replica will still
296 // exist after we delete other replicas on other servers. However,
297 // it's possible that a previous rebalancing operation made different
298 // decisions (e.g., servers were added/removed, and rendezvous order
299 // changed). In this case, the replica might already be on that
300 // server's trash list, and it might be deleted before we send a
301 // replacement trash list.
302 //
303 // We avoid this problem if we clear all trash lists before getting
304 // indexes. (We also assume there is only one rebalancing process
305 // running at a time.)
306 func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
307         for _, srv := range bal.KeepServices {
308                 srv.ChangeSet = &ChangeSet{}
309         }
310         return bal.CommitTrash(ctx, c)
311 }
312
313 // GetCurrentState determines the current replication state, and the
314 // desired replication level, for every block that is either
315 // retrievable or referenced.
316 //
317 // It determines the current replication state by reading the block index
318 // from every known Keep service.
319 //
320 // It determines the desired replication level by retrieving all
321 // collection manifests in the database (API server).
322 //
323 // It encodes the resulting information in BlockStateMap.
324 func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
325         ctx, cancel := context.WithCancel(ctx)
326         defer cancel()
327
328         defer bal.time("get_state", "wall clock time to get current state")()
329         bal.BlockStateMap = NewBlockStateMap()
330
331         dd, err := c.DiscoveryDocument()
332         if err != nil {
333                 return err
334         }
335         bal.DefaultReplication = dd.DefaultCollectionReplication
336         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
337
338         errs := make(chan error, 1)
339         wg := sync.WaitGroup{}
340
341         // When a device is mounted more than once, we will get its
342         // index only once, and call AddReplicas on all of the mounts.
343         // equivMount keys are the mounts that will be indexed, and
344         // each value is a list of mounts to apply the received index
345         // to.
346         equivMount := map[*KeepMount][]*KeepMount{}
347         // deviceMount maps each device ID to the one mount that will
348         // be indexed for that device.
349         deviceMount := map[string]*KeepMount{}
350         for _, srv := range bal.KeepServices {
351                 for _, mnt := range srv.mounts {
352                         equiv := deviceMount[mnt.DeviceID]
353                         if equiv == nil {
354                                 equiv = mnt
355                                 if mnt.DeviceID != "" {
356                                         deviceMount[mnt.DeviceID] = equiv
357                                 }
358                         }
359                         equivMount[equiv] = append(equivMount[equiv], mnt)
360                 }
361         }
362
363         // Start one goroutine for each (non-redundant) mount:
364         // retrieve the index, and add the returned blocks to
365         // BlockStateMap.
366         for _, mounts := range equivMount {
367                 wg.Add(1)
368                 go func(mounts []*KeepMount) {
369                         defer wg.Done()
370                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
371                         idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
372                         if err != nil {
373                                 select {
374                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
375                                 default:
376                                 }
377                                 cancel()
378                                 return
379                         }
380                         if len(errs) > 0 {
381                                 // Some other goroutine encountered an
382                                 // error -- any further effort here
383                                 // will be wasted.
384                                 return
385                         }
386                         for _, mount := range mounts {
387                                 bal.logf("%s: add %d entries to map", mount, len(idx))
388                                 bal.BlockStateMap.AddReplicas(mount, idx)
389                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
390                         }
391                         bal.logf("mount %s: index done", mounts[0])
392                 }(mounts)
393         }
394
395         // collQ buffers incoming collections so we can start fetching
396         // the next page without waiting for the current page to
397         // finish processing.
398         collQ := make(chan arvados.Collection, bufs)
399
400         // Start a goroutine to process collections. (We could use a
401         // worker pool here, but even with a single worker we already
402         // process collections much faster than we can retrieve them.)
403         wg.Add(1)
404         go func() {
405                 defer wg.Done()
406                 for coll := range collQ {
407                         err := bal.addCollection(coll)
408                         if err != nil || len(errs) > 0 {
409                                 select {
410                                 case errs <- err:
411                                 default:
412                                 }
413                                 for range collQ {
414                                 }
415                                 cancel()
416                                 return
417                         }
418                         bal.collScanned++
419                 }
420         }()
421
422         // Start a goroutine to retrieve all collections from the
423         // Arvados database and send them to collQ for processing.
424         wg.Add(1)
425         go func() {
426                 defer wg.Done()
427                 err = EachCollection(ctx, c, pageSize,
428                         func(coll arvados.Collection) error {
429                                 collQ <- coll
430                                 if len(errs) > 0 {
431                                         // some other GetCurrentState
432                                         // error happened: no point
433                                         // getting any more
434                                         // collections.
435                                         return fmt.Errorf("")
436                                 }
437                                 return nil
438                         }, func(done, total int) {
439                                 bal.logf("collections: %d/%d", done, total)
440                         })
441                 close(collQ)
442                 if err != nil {
443                         select {
444                         case errs <- err:
445                         default:
446                         }
447                         cancel()
448                 }
449         }()
450
451         wg.Wait()
452         if len(errs) > 0 {
453                 return <-errs
454         }
455         return nil
456 }
457
458 func (bal *Balancer) addCollection(coll arvados.Collection) error {
459         blkids, err := coll.SizedDigests()
460         if err != nil {
461                 return fmt.Errorf("%v: %v", coll.UUID, err)
462         }
463         repl := bal.DefaultReplication
464         if coll.ReplicationDesired != nil {
465                 repl = *coll.ReplicationDesired
466         }
467         bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
468         // Pass pdh to IncreaseDesired only if LostBlocksFile is being
469         // written -- otherwise it's just a waste of memory.
470         pdh := ""
471         if bal.LostBlocksFile != "" {
472                 pdh = coll.PortableDataHash
473         }
474         bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
475         return nil
476 }
477
478 // ComputeChangeSets compares, for each known block, the current and
479 // desired replication states. If it is possible to get closer to the
480 // desired state by copying or deleting blocks, it adds those changes
481 // to the relevant KeepServices' ChangeSets.
482 //
483 // It does not actually apply any of the computed changes.
484 func (bal *Balancer) ComputeChangeSets() {
485         // This just calls balanceBlock() once for each block, using a
486         // pool of worker goroutines.
487         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
488         bal.setupLookupTables()
489
490         type balanceTask struct {
491                 blkid arvados.SizedDigest
492                 blk   *BlockState
493         }
494         workers := runtime.GOMAXPROCS(-1)
495         todo := make(chan balanceTask, workers)
496         go func() {
497                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
498                         todo <- balanceTask{
499                                 blkid: blkid,
500                                 blk:   blk,
501                         }
502                 })
503                 close(todo)
504         }()
505         results := make(chan balanceResult, workers)
506         go func() {
507                 var wg sync.WaitGroup
508                 for i := 0; i < workers; i++ {
509                         wg.Add(1)
510                         go func() {
511                                 for work := range todo {
512                                         results <- bal.balanceBlock(work.blkid, work.blk)
513                                 }
514                                 wg.Done()
515                         }()
516                 }
517                 wg.Wait()
518                 close(results)
519         }()
520         bal.collectStatistics(results)
521 }
522
523 func (bal *Balancer) setupLookupTables() {
524         bal.serviceRoots = make(map[string]string)
525         bal.classes = defaultClasses
526         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
527         bal.mounts = 0
528         for _, srv := range bal.KeepServices {
529                 bal.serviceRoots[srv.UUID] = srv.UUID
530                 for _, mnt := range srv.mounts {
531                         bal.mounts++
532
533                         // All mounts on a read-only service are
534                         // effectively read-only.
535                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
536
537                         if len(mnt.StorageClasses) == 0 {
538                                 bal.mountsByClass["default"][mnt] = true
539                                 continue
540                         }
541                         for class := range mnt.StorageClasses {
542                                 if mbc := bal.mountsByClass[class]; mbc == nil {
543                                         bal.classes = append(bal.classes, class)
544                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
545                                 } else {
546                                         mbc[mnt] = true
547                                 }
548                         }
549                 }
550         }
551         // Consider classes in lexicographic order to avoid flapping
552         // between balancing runs.  The outcome of the "prefer a mount
553         // we're already planning to use for a different storage
554         // class" case in balanceBlock depends on the order classes
555         // are considered.
556         sort.Strings(bal.classes)
557 }
558
559 const (
560         changeStay = iota
561         changePull
562         changeTrash
563         changeNone
564 )
565
566 var changeName = map[int]string{
567         changeStay:  "stay",
568         changePull:  "pull",
569         changeTrash: "trash",
570         changeNone:  "none",
571 }
572
573 type balancedBlockState struct {
574         needed       int
575         unneeded     int
576         pulling      int
577         unachievable bool
578 }
579
580 type balanceResult struct {
581         blk        *BlockState
582         blkid      arvados.SizedDigest
583         lost       bool
584         blockState balancedBlockState
585         classState map[string]balancedBlockState
586 }
587
588 type slot struct {
589         mnt  *KeepMount // never nil
590         repl *Replica   // replica already stored here (or nil)
591         want bool       // we should pull/leave a replica here
592 }
593
594 // balanceBlock compares current state to desired state for a single
595 // block, and makes the appropriate ChangeSet calls.
596 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
597         bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
598
599         // Build a list of all slots (one per mounted volume).
600         slots := make([]slot, 0, bal.mounts)
601         for _, srv := range bal.KeepServices {
602                 for _, mnt := range srv.mounts {
603                         var repl *Replica
604                         for r := range blk.Replicas {
605                                 if blk.Replicas[r].KeepMount == mnt {
606                                         repl = &blk.Replicas[r]
607                                 }
608                         }
609                         // Initial value of "want" is "have, and can't
610                         // delete". These untrashable replicas get
611                         // prioritized when sorting slots: otherwise,
612                         // non-optimal readonly copies would cause us
613                         // to overreplicate.
614                         slots = append(slots, slot{
615                                 mnt:  mnt,
616                                 repl: repl,
617                                 want: repl != nil && mnt.ReadOnly,
618                         })
619                 }
620         }
621
622         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
623         srvRendezvous := make(map[*KeepService]int, len(uuids))
624         for i, uuid := range uuids {
625                 srv := bal.KeepServices[uuid]
626                 srvRendezvous[srv] = i
627         }
628
629         // Below we set underreplicated=true if we find any storage
630         // class that's currently underreplicated -- in that case we
631         // won't want to trash any replicas.
632         underreplicated := false
633
634         unsafeToDelete := make(map[int64]bool, len(slots))
635         for _, class := range bal.classes {
636                 desired := blk.Desired[class]
637                 if desired == 0 {
638                         continue
639                 }
640
641                 // Sort the slots by desirability.
642                 sort.Slice(slots, func(i, j int) bool {
643                         si, sj := slots[i], slots[j]
644                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
645                                 // Prefer a mount that satisfies the
646                                 // desired class.
647                                 return bal.mountsByClass[class][si.mnt]
648                         } else if si.want != sj.want {
649                                 // Prefer a mount that will have a
650                                 // replica no matter what we do here
651                                 // -- either because it already has an
652                                 // untrashable replica, or because we
653                                 // already need it to satisfy a
654                                 // different storage class.
655                                 return si.want
656                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
657                                 // Prefer a better rendezvous
658                                 // position.
659                                 return orderi < orderj
660                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
661                                 // Prefer a mount that already has a
662                                 // replica.
663                                 return repli
664                         } else {
665                                 // If pull/trash turns out to be
666                                 // needed, distribute the
667                                 // new/remaining replicas uniformly
668                                 // across qualifying mounts on a given
669                                 // server.
670                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
671                         }
672                 })
673
674                 // Servers/mounts/devices (with or without existing
675                 // replicas) that are part of the best achievable
676                 // layout for this storage class.
677                 wantSrv := map[*KeepService]bool{}
678                 wantMnt := map[*KeepMount]bool{}
679                 wantDev := map[string]bool{}
680                 // Positions (with existing replicas) that have been
681                 // protected (via unsafeToDelete) to ensure we don't
682                 // reduce replication below desired level when
683                 // trashing replicas that aren't optimal positions for
684                 // any storage class.
685                 protMnt := map[*KeepMount]bool{}
686                 // Replication planned so far (corresponds to wantMnt).
687                 replWant := 0
688                 // Protected replication (corresponds to protMnt).
689                 replProt := 0
690
691                 // trySlot tries using a slot to meet requirements,
692                 // and returns true if all requirements are met.
693                 trySlot := func(i int) bool {
694                         slot := slots[i]
695                         if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
696                                 // Already allocated a replica to this
697                                 // backend device, possibly on a
698                                 // different server.
699                                 return false
700                         }
701                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
702                                 unsafeToDelete[slot.repl.Mtime] = true
703                                 protMnt[slot.mnt] = true
704                                 replProt += slot.mnt.Replication
705                         }
706                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
707                                 slots[i].want = true
708                                 wantSrv[slot.mnt.KeepService] = true
709                                 wantMnt[slot.mnt] = true
710                                 if slot.mnt.DeviceID != "" {
711                                         wantDev[slot.mnt.DeviceID] = true
712                                 }
713                                 replWant += slot.mnt.Replication
714                         }
715                         return replProt >= desired && replWant >= desired
716                 }
717
718                 // First try to achieve desired replication without
719                 // using the same server twice.
720                 done := false
721                 for i := 0; i < len(slots) && !done; i++ {
722                         if !wantSrv[slots[i].mnt.KeepService] {
723                                 done = trySlot(i)
724                         }
725                 }
726
727                 // If that didn't suffice, do another pass without the
728                 // "distinct services" restriction. (Achieving the
729                 // desired volume replication on fewer than the
730                 // desired number of services is better than
731                 // underreplicating.)
732                 for i := 0; i < len(slots) && !done; i++ {
733                         done = trySlot(i)
734                 }
735
736                 if !underreplicated {
737                         safe := 0
738                         for _, slot := range slots {
739                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
740                                         continue
741                                 }
742                                 if safe += slot.mnt.Replication; safe >= desired {
743                                         break
744                                 }
745                         }
746                         underreplicated = safe < desired
747                 }
748
749                 // Avoid deleting wanted replicas from devices that
750                 // are mounted on multiple servers -- even if they
751                 // haven't already been added to unsafeToDelete
752                 // because the servers report different Mtimes.
753                 for _, slot := range slots {
754                         if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
755                                 unsafeToDelete[slot.repl.Mtime] = true
756                         }
757                 }
758         }
759
760         // TODO: If multiple replicas are trashable, prefer the oldest
761         // replica that doesn't have a timestamp collision with
762         // others.
763
764         for i, slot := range slots {
765                 // Don't trash (1) any replicas of an underreplicated
766                 // block, even if they're in the wrong positions, or
767                 // (2) any replicas whose Mtimes are identical to
768                 // needed replicas (in case we're really seeing the
769                 // same copy via different mounts).
770                 if slot.repl != nil && (underreplicated || unsafeToDelete[slot.repl.Mtime]) {
771                         slots[i].want = true
772                 }
773         }
774
775         classState := make(map[string]balancedBlockState, len(bal.classes))
776         for _, class := range bal.classes {
777                 classState[class] = computeBlockState(slots, bal.mountsByClass[class], len(blk.Replicas), blk.Desired[class])
778         }
779         blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
780
781         var lost bool
782         var changes []string
783         for _, slot := range slots {
784                 // TODO: request a Touch if Mtime is duplicated.
785                 var change int
786                 switch {
787                 case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
788                         slot.mnt.KeepService.AddTrash(Trash{
789                                 SizedDigest: blkid,
790                                 Mtime:       slot.repl.Mtime,
791                                 From:        slot.mnt,
792                         })
793                         change = changeTrash
794                 case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
795                         lost = true
796                         change = changeNone
797                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
798                         slot.mnt.KeepService.AddPull(Pull{
799                                 SizedDigest: blkid,
800                                 From:        blk.Replicas[0].KeepMount.KeepService,
801                                 To:          slot.mnt,
802                         })
803                         change = changePull
804                 case slot.repl != nil:
805                         change = changeStay
806                 default:
807                         change = changeNone
808                 }
809                 if bal.Dumper != nil {
810                         var mtime int64
811                         if slot.repl != nil {
812                                 mtime = slot.repl.Mtime
813                         }
814                         srv := slot.mnt.KeepService
815                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
816                 }
817         }
818         if bal.Dumper != nil {
819                 bal.Dumper.Printf("%s refs=%d needed=%d unneeded=%d pulling=%v %v %v", blkid, blk.RefCount, blockState.needed, blockState.unneeded, blockState.pulling, blk.Desired, changes)
820         }
821         return balanceResult{
822                 blk:        blk,
823                 blkid:      blkid,
824                 lost:       lost,
825                 blockState: blockState,
826                 classState: classState,
827         }
828 }
829
830 func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRepl int) (bbs balancedBlockState) {
831         repl := 0
832         countedDev := map[string]bool{}
833         for _, slot := range slots {
834                 if onlyCount != nil && !onlyCount[slot.mnt] {
835                         continue
836                 }
837                 if countedDev[slot.mnt.DeviceID] {
838                         continue
839                 }
840                 switch {
841                 case slot.repl != nil && slot.want:
842                         bbs.needed++
843                         repl += slot.mnt.Replication
844                 case slot.repl != nil && !slot.want:
845                         bbs.unneeded++
846                         repl += slot.mnt.Replication
847                 case slot.repl == nil && slot.want && have > 0:
848                         bbs.pulling++
849                         repl += slot.mnt.Replication
850                 }
851                 if slot.mnt.DeviceID != "" {
852                         countedDev[slot.mnt.DeviceID] = true
853                 }
854         }
855         if repl < needRepl {
856                 bbs.unachievable = true
857         }
858         return
859 }
860
861 type blocksNBytes struct {
862         replicas int
863         blocks   int
864         bytes    int64
865 }
866
867 func (bb blocksNBytes) String() string {
868         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
869 }
870
871 type replicationStats struct {
872         needed       blocksNBytes
873         unneeded     blocksNBytes
874         pulling      blocksNBytes
875         unachievable blocksNBytes
876 }
877
878 type balancerStats struct {
879         lost          blocksNBytes
880         overrep       blocksNBytes
881         unref         blocksNBytes
882         garbage       blocksNBytes
883         underrep      blocksNBytes
884         unachievable  blocksNBytes
885         justright     blocksNBytes
886         desired       blocksNBytes
887         current       blocksNBytes
888         pulls         int
889         trashes       int
890         replHistogram []int
891         classStats    map[string]replicationStats
892
893         // collectionBytes / collectionBlockBytes = deduplication ratio
894         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
895         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
896         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
897         collectionBlocks     int64 // number of blocks referenced by any collection
898 }
899
900 func (s *balancerStats) dedupByteRatio() float64 {
901         if s.collectionBlockBytes == 0 {
902                 return 0
903         }
904         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
905 }
906
907 func (s *balancerStats) dedupBlockRatio() float64 {
908         if s.collectionBlocks == 0 {
909                 return 0
910         }
911         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
912 }
913
914 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
915         var s balancerStats
916         s.replHistogram = make([]int, 2)
917         s.classStats = make(map[string]replicationStats, len(bal.classes))
918         for result := range results {
919                 bytes := result.blkid.Size()
920
921                 if rc := int64(result.blk.RefCount); rc > 0 {
922                         s.collectionBytes += rc * bytes
923                         s.collectionBlockBytes += bytes
924                         s.collectionBlockRefs += rc
925                         s.collectionBlocks++
926                 }
927
928                 for class, state := range result.classState {
929                         cs := s.classStats[class]
930                         if state.unachievable {
931                                 cs.unachievable.replicas++
932                                 cs.unachievable.blocks++
933                                 cs.unachievable.bytes += bytes
934                         }
935                         if state.needed > 0 {
936                                 cs.needed.replicas += state.needed
937                                 cs.needed.blocks++
938                                 cs.needed.bytes += bytes * int64(state.needed)
939                         }
940                         if state.unneeded > 0 {
941                                 cs.unneeded.replicas += state.unneeded
942                                 cs.unneeded.blocks++
943                                 cs.unneeded.bytes += bytes * int64(state.unneeded)
944                         }
945                         if state.pulling > 0 {
946                                 cs.pulling.replicas += state.pulling
947                                 cs.pulling.blocks++
948                                 cs.pulling.bytes += bytes * int64(state.pulling)
949                         }
950                         s.classStats[class] = cs
951                 }
952
953                 bs := result.blockState
954                 switch {
955                 case result.lost:
956                         s.lost.replicas++
957                         s.lost.blocks++
958                         s.lost.bytes += bytes
959                         fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
960                         for pdh := range result.blk.Refs {
961                                 fmt.Fprintf(bal.lostBlocks, " %s", pdh)
962                         }
963                         fmt.Fprint(bal.lostBlocks, "\n")
964                 case bs.pulling > 0:
965                         s.underrep.replicas += bs.pulling
966                         s.underrep.blocks++
967                         s.underrep.bytes += bytes * int64(bs.pulling)
968                 case bs.unachievable:
969                         s.underrep.replicas++
970                         s.underrep.blocks++
971                         s.underrep.bytes += bytes
972                 case bs.unneeded > 0 && bs.needed == 0:
973                         // Count as "garbage" if all replicas are old
974                         // enough to trash, otherwise count as
975                         // "unref".
976                         counter := &s.garbage
977                         for _, r := range result.blk.Replicas {
978                                 if r.Mtime >= bal.MinMtime {
979                                         counter = &s.unref
980                                         break
981                                 }
982                         }
983                         counter.replicas += bs.unneeded
984                         counter.blocks++
985                         counter.bytes += bytes * int64(bs.unneeded)
986                 case bs.unneeded > 0:
987                         s.overrep.replicas += bs.unneeded
988                         s.overrep.blocks++
989                         s.overrep.bytes += bytes * int64(bs.unneeded)
990                 default:
991                         s.justright.replicas += bs.needed
992                         s.justright.blocks++
993                         s.justright.bytes += bytes * int64(bs.needed)
994                 }
995
996                 if bs.needed > 0 {
997                         s.desired.replicas += bs.needed
998                         s.desired.blocks++
999                         s.desired.bytes += bytes * int64(bs.needed)
1000                 }
1001                 if bs.needed+bs.unneeded > 0 {
1002                         s.current.replicas += bs.needed + bs.unneeded
1003                         s.current.blocks++
1004                         s.current.bytes += bytes * int64(bs.needed+bs.unneeded)
1005                 }
1006
1007                 for len(s.replHistogram) <= bs.needed+bs.unneeded {
1008                         s.replHistogram = append(s.replHistogram, 0)
1009                 }
1010                 s.replHistogram[bs.needed+bs.unneeded]++
1011         }
1012         for _, srv := range bal.KeepServices {
1013                 s.pulls += len(srv.ChangeSet.Pulls)
1014                 s.trashes += len(srv.ChangeSet.Trashes)
1015         }
1016         bal.stats = s
1017         bal.Metrics.UpdateStats(s)
1018 }
1019
1020 // PrintStatistics writes statistics about the computed changes to
1021 // bal.Logger. It should not be called until ComputeChangeSets has
1022 // finished.
1023 func (bal *Balancer) PrintStatistics() {
1024         bal.logf("===")
1025         bal.logf("%s lost (0=have<want)", bal.stats.lost)
1026         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
1027         bal.logf("%s just right (have=want)", bal.stats.justright)
1028         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
1029         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
1030         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
1031         for _, class := range bal.classes {
1032                 cs := bal.stats.classStats[class]
1033                 bal.logf("===")
1034                 bal.logf("storage class %q: %s needed", class, cs.needed)
1035                 bal.logf("storage class %q: %s unneeded", class, cs.unneeded)
1036                 bal.logf("storage class %q: %s pulling", class, cs.pulling)
1037                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
1038         }
1039         bal.logf("===")
1040         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
1041         bal.logf("%s total usage", bal.stats.current)
1042         bal.logf("===")
1043         for _, srv := range bal.KeepServices {
1044                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1045         }
1046         bal.logf("===")
1047         bal.printHistogram(60)
1048         bal.logf("===")
1049 }
1050
1051 func (bal *Balancer) printHistogram(hashColumns int) {
1052         bal.logf("Replication level distribution:")
1053         maxCount := 0
1054         for _, count := range bal.stats.replHistogram {
1055                 if maxCount < count {
1056                         maxCount = count
1057                 }
1058         }
1059         hashes := strings.Repeat("#", hashColumns)
1060         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1061         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1062         for repl, count := range bal.stats.replHistogram {
1063                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1064                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1065         }
1066 }
1067
1068 // CheckSanityLate checks for configuration and runtime errors after
1069 // GetCurrentState() and ComputeChangeSets() have finished.
1070 //
1071 // If it returns an error, it is dangerous to run any Commit methods.
1072 func (bal *Balancer) CheckSanityLate() error {
1073         if bal.errors != nil {
1074                 for _, err := range bal.errors {
1075                         bal.logf("deferred error: %v", err)
1076                 }
1077                 return fmt.Errorf("cannot proceed safely after deferred errors")
1078         }
1079
1080         if bal.collScanned == 0 {
1081                 return fmt.Errorf("received zero collections")
1082         }
1083
1084         anyDesired := false
1085         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1086                 for _, desired := range blk.Desired {
1087                         if desired > 0 {
1088                                 anyDesired = true
1089                                 break
1090                         }
1091                 }
1092         })
1093         if !anyDesired {
1094                 return fmt.Errorf("zero blocks have desired replication>0")
1095         }
1096
1097         if dr := bal.DefaultReplication; dr < 1 {
1098                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1099         }
1100
1101         // TODO: no two services have identical indexes
1102         // TODO: no collisions (same md5, different size)
1103         return nil
1104 }
1105
1106 // CommitPulls sends the computed lists of pull requests to the
1107 // keepstore servers. This has the effect of increasing replication of
1108 // existing blocks that are either underreplicated or poorly
1109 // distributed according to rendezvous hashing.
1110 func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
1111         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1112         return bal.commitAsync(c, "send pull list",
1113                 func(srv *KeepService) error {
1114                         return srv.CommitPulls(ctx, c)
1115                 })
1116 }
1117
1118 // CommitTrash sends the computed lists of trash requests to the
1119 // keepstore servers. This has the effect of deleting blocks that are
1120 // overreplicated or unreferenced.
1121 func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
1122         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1123         return bal.commitAsync(c, "send trash list",
1124                 func(srv *KeepService) error {
1125                         return srv.CommitTrash(ctx, c)
1126                 })
1127 }
1128
1129 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1130         errs := make(chan error)
1131         for _, srv := range bal.KeepServices {
1132                 go func(srv *KeepService) {
1133                         var err error
1134                         defer func() { errs <- err }()
1135                         label := fmt.Sprintf("%s: %v", srv, label)
1136                         err = f(srv)
1137                         if err != nil {
1138                                 err = fmt.Errorf("%s: %v", label, err)
1139                         }
1140                 }(srv)
1141         }
1142         var lastErr error
1143         for range bal.KeepServices {
1144                 if err := <-errs; err != nil {
1145                         bal.logf("%v", err)
1146                         lastErr = err
1147                 }
1148         }
1149         close(errs)
1150         return lastErr
1151 }
1152
1153 func (bal *Balancer) logf(f string, args ...interface{}) {
1154         if bal.Logger != nil {
1155                 bal.Logger.Printf(f, args...)
1156         }
1157 }
1158
1159 func (bal *Balancer) time(name, help string) func() {
1160         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1161         t0 := time.Now()
1162         bal.Logger.Printf("%s: start", name)
1163         return func() {
1164                 dur := time.Since(t0)
1165                 observer.Observe(dur.Seconds())
1166                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1167         }
1168 }
1169
1170 // Rendezvous hash sort function. Less efficient than sorting on
1171 // precomputed rendezvous hashes, but also rarely used.
1172 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1173         a := md5.Sum([]byte(string(blkid[:32]) + i))
1174         b := md5.Sum([]byte(string(blkid[:32]) + j))
1175         return bytes.Compare(a[:], b[:]) < 0
1176 }