Merge branch '15014-wrong-node-count'
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "log"
14         "math"
15         "os"
16         "runtime"
17         "sort"
18         "strings"
19         "sync"
20         "syscall"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "git.curoverse.com/arvados.git/sdk/go/keepclient"
25         "github.com/sirupsen/logrus"
26 )
27
28 // Balancer compares the contents of keepstore servers with the
29 // collections stored in Arvados, and issues pull/trash requests
30 // needed to get (closer to) the optimal data layout.
31 //
32 // In the optimal data layout: every data block referenced by a
33 // collection is replicated at least as many times as desired by the
34 // collection; there are no unreferenced data blocks older than
35 // BlobSignatureTTL; and all N existing replicas of a given data block
36 // are in the N best positions in rendezvous probe order.
37 type Balancer struct {
38         Logger  logrus.FieldLogger
39         Dumper  logrus.FieldLogger
40         Metrics *metrics
41
42         LostBlocksFile string
43
44         *BlockStateMap
45         KeepServices       map[string]*KeepService
46         DefaultReplication int
47         MinMtime           int64
48
49         classes       []string
50         mounts        int
51         mountsByClass map[string]map[*KeepMount]bool
52         collScanned   int
53         serviceRoots  map[string]string
54         errors        []error
55         stats         balancerStats
56         mutex         sync.Mutex
57         lostBlocks    io.Writer
58 }
59
60 // Run performs a balance operation using the given config and
61 // runOptions, and returns RunOptions suitable for passing to a
62 // subsequent balance operation.
63 //
64 // Run should only be called once on a given Balancer object.
65 //
66 // Typical usage:
67 //
68 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
69 func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
70         nextRunOptions = runOptions
71
72         defer bal.time("sweep", "wall clock time to run one full sweep")()
73
74         var lbFile *os.File
75         if bal.LostBlocksFile != "" {
76                 tmpfn := bal.LostBlocksFile + ".tmp"
77                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
78                 if err != nil {
79                         return
80                 }
81                 defer lbFile.Close()
82                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
83                 if err != nil {
84                         return
85                 }
86                 defer func() {
87                         // Remove the tempfile only if we didn't get
88                         // as far as successfully renaming it.
89                         if lbFile != nil {
90                                 os.Remove(tmpfn)
91                         }
92                 }()
93                 bal.lostBlocks = lbFile
94         } else {
95                 bal.lostBlocks = ioutil.Discard
96         }
97
98         err = bal.DiscoverKeepServices(client)
99         if err != nil {
100                 return
101         }
102
103         for _, srv := range bal.KeepServices {
104                 err = srv.discoverMounts(client)
105                 if err != nil {
106                         return
107                 }
108         }
109         bal.cleanupMounts()
110
111         if err = bal.CheckSanityEarly(client); err != nil {
112                 return
113         }
114         rs := bal.rendezvousState()
115         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
116                 if runOptions.SafeRendezvousState != "" {
117                         bal.logf("notice: KeepServices list has changed since last run")
118                 }
119                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
120                 if err = bal.ClearTrashLists(client); err != nil {
121                         return
122                 }
123                 // The current rendezvous state becomes "safe" (i.e.,
124                 // OK to compute changes for that state without
125                 // clearing existing trash lists) only now, after we
126                 // succeed in clearing existing trash lists.
127                 nextRunOptions.SafeRendezvousState = rs
128         }
129         if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
130                 return
131         }
132         bal.ComputeChangeSets()
133         bal.PrintStatistics()
134         if err = bal.CheckSanityLate(); err != nil {
135                 return
136         }
137         if lbFile != nil {
138                 err = lbFile.Sync()
139                 if err != nil {
140                         return
141                 }
142                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
143                 if err != nil {
144                         return
145                 }
146                 lbFile = nil
147         }
148         if runOptions.CommitPulls {
149                 err = bal.CommitPulls(client)
150                 if err != nil {
151                         // Skip trash if we can't pull. (Too cautious?)
152                         return
153                 }
154         }
155         if runOptions.CommitTrash {
156                 err = bal.CommitTrash(client)
157         }
158         return
159 }
160
161 // SetKeepServices sets the list of KeepServices to operate on.
162 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
163         bal.KeepServices = make(map[string]*KeepService)
164         for _, srv := range srvList.Items {
165                 bal.KeepServices[srv.UUID] = &KeepService{
166                         KeepService: srv,
167                         ChangeSet:   &ChangeSet{},
168                 }
169         }
170         return nil
171 }
172
173 // DiscoverKeepServices sets the list of KeepServices by calling the
174 // API to get a list of all services, and selecting the ones whose
175 // ServiceType is "disk"
176 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
177         bal.KeepServices = make(map[string]*KeepService)
178         return c.EachKeepService(func(srv arvados.KeepService) error {
179                 if srv.ServiceType == "disk" {
180                         bal.KeepServices[srv.UUID] = &KeepService{
181                                 KeepService: srv,
182                                 ChangeSet:   &ChangeSet{},
183                         }
184                 } else {
185                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
186                 }
187                 return nil
188         })
189 }
190
191 func (bal *Balancer) cleanupMounts() {
192         rwdev := map[string]*KeepService{}
193         for _, srv := range bal.KeepServices {
194                 for _, mnt := range srv.mounts {
195                         if !mnt.ReadOnly && mnt.DeviceID != "" {
196                                 rwdev[mnt.DeviceID] = srv
197                         }
198                 }
199         }
200         // Drop the readonly mounts whose device is mounted RW
201         // elsewhere.
202         for _, srv := range bal.KeepServices {
203                 var dedup []*KeepMount
204                 for _, mnt := range srv.mounts {
205                         if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
206                                 bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
207                         } else {
208                                 dedup = append(dedup, mnt)
209                         }
210                 }
211                 srv.mounts = dedup
212         }
213         for _, srv := range bal.KeepServices {
214                 for _, mnt := range srv.mounts {
215                         if mnt.Replication <= 0 {
216                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
217                                 mnt.Replication = 1
218                         }
219                 }
220         }
221 }
222
223 // CheckSanityEarly checks for configuration and runtime errors that
224 // can be detected before GetCurrentState() and ComputeChangeSets()
225 // are called.
226 //
227 // If it returns an error, it is pointless to run GetCurrentState or
228 // ComputeChangeSets: after doing so, the statistics would be
229 // meaningless and it would be dangerous to run any Commit methods.
230 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
231         u, err := c.CurrentUser()
232         if err != nil {
233                 return fmt.Errorf("CurrentUser(): %v", err)
234         }
235         if !u.IsActive || !u.IsAdmin {
236                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
237         }
238         for _, srv := range bal.KeepServices {
239                 if srv.ServiceType == "proxy" {
240                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
241                 }
242         }
243
244         var checkPage arvados.CollectionList
245         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
246                 Limit:              new(int),
247                 Count:              "exact",
248                 IncludeTrash:       true,
249                 IncludeOldVersions: true,
250                 Filters: []arvados.Filter{{
251                         Attr:     "modified_at",
252                         Operator: "=",
253                         Operand:  nil,
254                 }},
255         }); err != nil {
256                 return err
257         } else if n := checkPage.ItemsAvailable; n > 0 {
258                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
259         }
260
261         return nil
262 }
263
264 // rendezvousState returns a fingerprint (e.g., a sorted list of
265 // UUID+host+port) of the current set of keep services.
266 func (bal *Balancer) rendezvousState() string {
267         srvs := make([]string, 0, len(bal.KeepServices))
268         for _, srv := range bal.KeepServices {
269                 srvs = append(srvs, srv.String())
270         }
271         sort.Strings(srvs)
272         return strings.Join(srvs, "; ")
273 }
274
275 // ClearTrashLists sends an empty trash list to each keep
276 // service. Calling this before GetCurrentState avoids races.
277 //
278 // When a block appears in an index, we assume that replica will still
279 // exist after we delete other replicas on other servers. However,
280 // it's possible that a previous rebalancing operation made different
281 // decisions (e.g., servers were added/removed, and rendezvous order
282 // changed). In this case, the replica might already be on that
283 // server's trash list, and it might be deleted before we send a
284 // replacement trash list.
285 //
286 // We avoid this problem if we clear all trash lists before getting
287 // indexes. (We also assume there is only one rebalancing process
288 // running at a time.)
289 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
290         for _, srv := range bal.KeepServices {
291                 srv.ChangeSet = &ChangeSet{}
292         }
293         return bal.CommitTrash(c)
294 }
295
296 // GetCurrentState determines the current replication state, and the
297 // desired replication level, for every block that is either
298 // retrievable or referenced.
299 //
300 // It determines the current replication state by reading the block index
301 // from every known Keep service.
302 //
303 // It determines the desired replication level by retrieving all
304 // collection manifests in the database (API server).
305 //
306 // It encodes the resulting information in BlockStateMap.
307 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
308         defer bal.time("get_state", "wall clock time to get current state")()
309         bal.BlockStateMap = NewBlockStateMap()
310
311         dd, err := c.DiscoveryDocument()
312         if err != nil {
313                 return err
314         }
315         bal.DefaultReplication = dd.DefaultCollectionReplication
316         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
317
318         errs := make(chan error, 1)
319         wg := sync.WaitGroup{}
320
321         // When a device is mounted more than once, we will get its
322         // index only once, and call AddReplicas on all of the mounts.
323         // equivMount keys are the mounts that will be indexed, and
324         // each value is a list of mounts to apply the received index
325         // to.
326         equivMount := map[*KeepMount][]*KeepMount{}
327         // deviceMount maps each device ID to the one mount that will
328         // be indexed for that device.
329         deviceMount := map[string]*KeepMount{}
330         for _, srv := range bal.KeepServices {
331                 for _, mnt := range srv.mounts {
332                         equiv := deviceMount[mnt.DeviceID]
333                         if equiv == nil {
334                                 equiv = mnt
335                                 if mnt.DeviceID != "" {
336                                         deviceMount[mnt.DeviceID] = equiv
337                                 }
338                         }
339                         equivMount[equiv] = append(equivMount[equiv], mnt)
340                 }
341         }
342
343         // Start one goroutine for each (non-redundant) mount:
344         // retrieve the index, and add the returned blocks to
345         // BlockStateMap.
346         for _, mounts := range equivMount {
347                 wg.Add(1)
348                 go func(mounts []*KeepMount) {
349                         defer wg.Done()
350                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
351                         idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
352                         if err != nil {
353                                 select {
354                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
355                                 default:
356                                 }
357                                 return
358                         }
359                         if len(errs) > 0 {
360                                 // Some other goroutine encountered an
361                                 // error -- any further effort here
362                                 // will be wasted.
363                                 return
364                         }
365                         for _, mount := range mounts {
366                                 bal.logf("%s: add %d entries to map", mount, len(idx))
367                                 bal.BlockStateMap.AddReplicas(mount, idx)
368                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
369                         }
370                         bal.logf("mount %s: index done", mounts[0])
371                 }(mounts)
372         }
373
374         // collQ buffers incoming collections so we can start fetching
375         // the next page without waiting for the current page to
376         // finish processing.
377         collQ := make(chan arvados.Collection, bufs)
378
379         // Start a goroutine to process collections. (We could use a
380         // worker pool here, but even with a single worker we already
381         // process collections much faster than we can retrieve them.)
382         wg.Add(1)
383         go func() {
384                 defer wg.Done()
385                 for coll := range collQ {
386                         err := bal.addCollection(coll)
387                         if err != nil || len(errs) > 0 {
388                                 select {
389                                 case errs <- err:
390                                 default:
391                                 }
392                                 for range collQ {
393                                 }
394                                 return
395                         }
396                         bal.collScanned++
397                 }
398         }()
399
400         // Start a goroutine to retrieve all collections from the
401         // Arvados database and send them to collQ for processing.
402         wg.Add(1)
403         go func() {
404                 defer wg.Done()
405                 err = EachCollection(c, pageSize,
406                         func(coll arvados.Collection) error {
407                                 collQ <- coll
408                                 if len(errs) > 0 {
409                                         // some other GetCurrentState
410                                         // error happened: no point
411                                         // getting any more
412                                         // collections.
413                                         return fmt.Errorf("")
414                                 }
415                                 return nil
416                         }, func(done, total int) {
417                                 bal.logf("collections: %d/%d", done, total)
418                         })
419                 close(collQ)
420                 if err != nil {
421                         select {
422                         case errs <- err:
423                         default:
424                         }
425                 }
426         }()
427
428         wg.Wait()
429         if len(errs) > 0 {
430                 return <-errs
431         }
432         return nil
433 }
434
435 func (bal *Balancer) addCollection(coll arvados.Collection) error {
436         blkids, err := coll.SizedDigests()
437         if err != nil {
438                 return fmt.Errorf("%v: %v", coll.UUID, err)
439         }
440         repl := bal.DefaultReplication
441         if coll.ReplicationDesired != nil {
442                 repl = *coll.ReplicationDesired
443         }
444         bal.Logger.Debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
445         // Pass pdh to IncreaseDesired only if LostBlocksFile is being
446         // written -- otherwise it's just a waste of memory.
447         pdh := ""
448         if bal.LostBlocksFile != "" {
449                 pdh = coll.PortableDataHash
450         }
451         bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
452         return nil
453 }
454
455 // ComputeChangeSets compares, for each known block, the current and
456 // desired replication states. If it is possible to get closer to the
457 // desired state by copying or deleting blocks, it adds those changes
458 // to the relevant KeepServices' ChangeSets.
459 //
460 // It does not actually apply any of the computed changes.
461 func (bal *Balancer) ComputeChangeSets() {
462         // This just calls balanceBlock() once for each block, using a
463         // pool of worker goroutines.
464         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
465         bal.setupLookupTables()
466
467         type balanceTask struct {
468                 blkid arvados.SizedDigest
469                 blk   *BlockState
470         }
471         workers := runtime.GOMAXPROCS(-1)
472         todo := make(chan balanceTask, workers)
473         go func() {
474                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
475                         todo <- balanceTask{
476                                 blkid: blkid,
477                                 blk:   blk,
478                         }
479                 })
480                 close(todo)
481         }()
482         results := make(chan balanceResult, workers)
483         go func() {
484                 var wg sync.WaitGroup
485                 for i := 0; i < workers; i++ {
486                         wg.Add(1)
487                         go func() {
488                                 for work := range todo {
489                                         results <- bal.balanceBlock(work.blkid, work.blk)
490                                 }
491                                 wg.Done()
492                         }()
493                 }
494                 wg.Wait()
495                 close(results)
496         }()
497         bal.collectStatistics(results)
498 }
499
500 func (bal *Balancer) setupLookupTables() {
501         bal.serviceRoots = make(map[string]string)
502         bal.classes = defaultClasses
503         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
504         bal.mounts = 0
505         for _, srv := range bal.KeepServices {
506                 bal.serviceRoots[srv.UUID] = srv.UUID
507                 for _, mnt := range srv.mounts {
508                         bal.mounts++
509
510                         // All mounts on a read-only service are
511                         // effectively read-only.
512                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
513
514                         if len(mnt.StorageClasses) == 0 {
515                                 bal.mountsByClass["default"][mnt] = true
516                                 continue
517                         }
518                         for class := range mnt.StorageClasses {
519                                 if mbc := bal.mountsByClass[class]; mbc == nil {
520                                         bal.classes = append(bal.classes, class)
521                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
522                                 } else {
523                                         mbc[mnt] = true
524                                 }
525                         }
526                 }
527         }
528         // Consider classes in lexicographic order to avoid flapping
529         // between balancing runs.  The outcome of the "prefer a mount
530         // we're already planning to use for a different storage
531         // class" case in balanceBlock depends on the order classes
532         // are considered.
533         sort.Strings(bal.classes)
534 }
535
536 const (
537         changeStay = iota
538         changePull
539         changeTrash
540         changeNone
541 )
542
543 var changeName = map[int]string{
544         changeStay:  "stay",
545         changePull:  "pull",
546         changeTrash: "trash",
547         changeNone:  "none",
548 }
549
550 type balanceResult struct {
551         blk        *BlockState
552         blkid      arvados.SizedDigest
553         have       int
554         want       int
555         classState map[string]balancedBlockState
556 }
557
558 // balanceBlock compares current state to desired state for a single
559 // block, and makes the appropriate ChangeSet calls.
560 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
561         bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
562
563         type slot struct {
564                 mnt  *KeepMount // never nil
565                 repl *Replica   // replica already stored here (or nil)
566                 want bool       // we should pull/leave a replica here
567         }
568
569         // Build a list of all slots (one per mounted volume).
570         slots := make([]slot, 0, bal.mounts)
571         for _, srv := range bal.KeepServices {
572                 for _, mnt := range srv.mounts {
573                         var repl *Replica
574                         for r := range blk.Replicas {
575                                 if blk.Replicas[r].KeepMount == mnt {
576                                         repl = &blk.Replicas[r]
577                                 }
578                         }
579                         // Initial value of "want" is "have, and can't
580                         // delete". These untrashable replicas get
581                         // prioritized when sorting slots: otherwise,
582                         // non-optimal readonly copies would cause us
583                         // to overreplicate.
584                         slots = append(slots, slot{
585                                 mnt:  mnt,
586                                 repl: repl,
587                                 want: repl != nil && mnt.ReadOnly,
588                         })
589                 }
590         }
591
592         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
593         srvRendezvous := make(map[*KeepService]int, len(uuids))
594         for i, uuid := range uuids {
595                 srv := bal.KeepServices[uuid]
596                 srvRendezvous[srv] = i
597         }
598
599         // Below we set underreplicated=true if we find any storage
600         // class that's currently underreplicated -- in that case we
601         // won't want to trash any replicas.
602         underreplicated := false
603
604         classState := make(map[string]balancedBlockState, len(bal.classes))
605         unsafeToDelete := make(map[int64]bool, len(slots))
606         for _, class := range bal.classes {
607                 desired := blk.Desired[class]
608
609                 countedDev := map[string]bool{}
610                 have := 0
611                 for _, slot := range slots {
612                         if slot.repl != nil && bal.mountsByClass[class][slot.mnt] && !countedDev[slot.mnt.DeviceID] {
613                                 have += slot.mnt.Replication
614                                 if slot.mnt.DeviceID != "" {
615                                         countedDev[slot.mnt.DeviceID] = true
616                                 }
617                         }
618                 }
619                 classState[class] = balancedBlockState{
620                         desired: desired,
621                         surplus: have - desired,
622                 }
623
624                 if desired == 0 {
625                         continue
626                 }
627
628                 // Sort the slots by desirability.
629                 sort.Slice(slots, func(i, j int) bool {
630                         si, sj := slots[i], slots[j]
631                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
632                                 // Prefer a mount that satisfies the
633                                 // desired class.
634                                 return bal.mountsByClass[class][si.mnt]
635                         } else if si.want != sj.want {
636                                 // Prefer a mount that will have a
637                                 // replica no matter what we do here
638                                 // -- either because it already has an
639                                 // untrashable replica, or because we
640                                 // already need it to satisfy a
641                                 // different storage class.
642                                 return si.want
643                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
644                                 // Prefer a better rendezvous
645                                 // position.
646                                 return orderi < orderj
647                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
648                                 // Prefer a mount that already has a
649                                 // replica.
650                                 return repli
651                         } else {
652                                 // If pull/trash turns out to be
653                                 // needed, distribute the
654                                 // new/remaining replicas uniformly
655                                 // across qualifying mounts on a given
656                                 // server.
657                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
658                         }
659                 })
660
661                 // Servers/mounts/devices (with or without existing
662                 // replicas) that are part of the best achievable
663                 // layout for this storage class.
664                 wantSrv := map[*KeepService]bool{}
665                 wantMnt := map[*KeepMount]bool{}
666                 wantDev := map[string]bool{}
667                 // Positions (with existing replicas) that have been
668                 // protected (via unsafeToDelete) to ensure we don't
669                 // reduce replication below desired level when
670                 // trashing replicas that aren't optimal positions for
671                 // any storage class.
672                 protMnt := map[*KeepMount]bool{}
673                 // Replication planned so far (corresponds to wantMnt).
674                 replWant := 0
675                 // Protected replication (corresponds to protMnt).
676                 replProt := 0
677
678                 // trySlot tries using a slot to meet requirements,
679                 // and returns true if all requirements are met.
680                 trySlot := func(i int) bool {
681                         slot := slots[i]
682                         if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
683                                 // Already allocated a replica to this
684                                 // backend device, possibly on a
685                                 // different server.
686                                 return false
687                         }
688                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
689                                 unsafeToDelete[slot.repl.Mtime] = true
690                                 protMnt[slot.mnt] = true
691                                 replProt += slot.mnt.Replication
692                         }
693                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
694                                 slots[i].want = true
695                                 wantSrv[slot.mnt.KeepService] = true
696                                 wantMnt[slot.mnt] = true
697                                 if slot.mnt.DeviceID != "" {
698                                         wantDev[slot.mnt.DeviceID] = true
699                                 }
700                                 replWant += slot.mnt.Replication
701                         }
702                         return replProt >= desired && replWant >= desired
703                 }
704
705                 // First try to achieve desired replication without
706                 // using the same server twice.
707                 done := false
708                 for i := 0; i < len(slots) && !done; i++ {
709                         if !wantSrv[slots[i].mnt.KeepService] {
710                                 done = trySlot(i)
711                         }
712                 }
713
714                 // If that didn't suffice, do another pass without the
715                 // "distinct services" restriction. (Achieving the
716                 // desired volume replication on fewer than the
717                 // desired number of services is better than
718                 // underreplicating.)
719                 for i := 0; i < len(slots) && !done; i++ {
720                         done = trySlot(i)
721                 }
722
723                 if !underreplicated {
724                         safe := 0
725                         for _, slot := range slots {
726                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
727                                         continue
728                                 }
729                                 if safe += slot.mnt.Replication; safe >= desired {
730                                         break
731                                 }
732                         }
733                         underreplicated = safe < desired
734                 }
735
736                 // set the unachievable flag if there aren't enough
737                 // slots offering the relevant storage class. (This is
738                 // as easy as checking slots[desired] because we
739                 // already sorted the qualifying slots to the front.)
740                 if desired >= len(slots) || !bal.mountsByClass[class][slots[desired].mnt] {
741                         cs := classState[class]
742                         cs.unachievable = true
743                         classState[class] = cs
744                 }
745
746                 // Avoid deleting wanted replicas from devices that
747                 // are mounted on multiple servers -- even if they
748                 // haven't already been added to unsafeToDelete
749                 // because the servers report different Mtimes.
750                 for _, slot := range slots {
751                         if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
752                                 unsafeToDelete[slot.repl.Mtime] = true
753                         }
754                 }
755         }
756
757         // TODO: If multiple replicas are trashable, prefer the oldest
758         // replica that doesn't have a timestamp collision with
759         // others.
760
761         countedDev := map[string]bool{}
762         var have, want int
763         for _, slot := range slots {
764                 if countedDev[slot.mnt.DeviceID] {
765                         continue
766                 }
767                 if slot.want {
768                         want += slot.mnt.Replication
769                 }
770                 if slot.repl != nil {
771                         have += slot.mnt.Replication
772                 }
773                 if slot.mnt.DeviceID != "" {
774                         countedDev[slot.mnt.DeviceID] = true
775                 }
776         }
777
778         var changes []string
779         for _, slot := range slots {
780                 // TODO: request a Touch if Mtime is duplicated.
781                 var change int
782                 switch {
783                 case !underreplicated && !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime && !unsafeToDelete[slot.repl.Mtime]:
784                         slot.mnt.KeepService.AddTrash(Trash{
785                                 SizedDigest: blkid,
786                                 Mtime:       slot.repl.Mtime,
787                                 From:        slot.mnt,
788                         })
789                         change = changeTrash
790                 case len(blk.Replicas) > 0 && slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
791                         slot.mnt.KeepService.AddPull(Pull{
792                                 SizedDigest: blkid,
793                                 From:        blk.Replicas[0].KeepMount.KeepService,
794                                 To:          slot.mnt,
795                         })
796                         change = changePull
797                 case slot.repl != nil:
798                         change = changeStay
799                 default:
800                         change = changeNone
801                 }
802                 if bal.Dumper != nil {
803                         var mtime int64
804                         if slot.repl != nil {
805                                 mtime = slot.repl.Mtime
806                         }
807                         srv := slot.mnt.KeepService
808                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
809                 }
810         }
811         if bal.Dumper != nil {
812                 bal.Dumper.Printf("%s refs=%d have=%d want=%v %v %v", blkid, blk.RefCount, have, want, blk.Desired, changes)
813         }
814         return balanceResult{
815                 blk:        blk,
816                 blkid:      blkid,
817                 have:       have,
818                 want:       want,
819                 classState: classState,
820         }
821 }
822
823 type blocksNBytes struct {
824         replicas int
825         blocks   int
826         bytes    int64
827 }
828
829 func (bb blocksNBytes) String() string {
830         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
831 }
832
833 type balancerStats struct {
834         lost          blocksNBytes
835         overrep       blocksNBytes
836         unref         blocksNBytes
837         garbage       blocksNBytes
838         underrep      blocksNBytes
839         unachievable  blocksNBytes
840         justright     blocksNBytes
841         desired       blocksNBytes
842         current       blocksNBytes
843         pulls         int
844         trashes       int
845         replHistogram []int
846         classStats    map[string]replicationStats
847
848         // collectionBytes / collectionBlockBytes = deduplication ratio
849         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
850         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
851         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
852         collectionBlocks     int64 // number of blocks referenced by any collection
853 }
854
855 func (s *balancerStats) dedupByteRatio() float64 {
856         if s.collectionBlockBytes == 0 {
857                 return 0
858         }
859         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
860 }
861
862 func (s *balancerStats) dedupBlockRatio() float64 {
863         if s.collectionBlocks == 0 {
864                 return 0
865         }
866         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
867 }
868
869 type replicationStats struct {
870         desired      blocksNBytes
871         surplus      blocksNBytes
872         short        blocksNBytes
873         unachievable blocksNBytes
874 }
875
876 type balancedBlockState struct {
877         desired      int
878         surplus      int
879         unachievable bool
880 }
881
882 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
883         var s balancerStats
884         s.replHistogram = make([]int, 2)
885         s.classStats = make(map[string]replicationStats, len(bal.classes))
886         for result := range results {
887                 surplus := result.have - result.want
888                 bytes := result.blkid.Size()
889
890                 if rc := int64(result.blk.RefCount); rc > 0 {
891                         s.collectionBytes += rc * bytes
892                         s.collectionBlockBytes += bytes
893                         s.collectionBlockRefs += rc
894                         s.collectionBlocks++
895                 }
896
897                 for class, state := range result.classState {
898                         cs := s.classStats[class]
899                         if state.unachievable {
900                                 cs.unachievable.blocks++
901                                 cs.unachievable.bytes += bytes
902                         }
903                         if state.desired > 0 {
904                                 cs.desired.replicas += state.desired
905                                 cs.desired.blocks++
906                                 cs.desired.bytes += bytes * int64(state.desired)
907                         }
908                         if state.surplus > 0 {
909                                 cs.surplus.replicas += state.surplus
910                                 cs.surplus.blocks++
911                                 cs.surplus.bytes += bytes * int64(state.surplus)
912                         } else if state.surplus < 0 {
913                                 cs.short.replicas += -state.surplus
914                                 cs.short.blocks++
915                                 cs.short.bytes += bytes * int64(-state.surplus)
916                         }
917                         s.classStats[class] = cs
918                 }
919
920                 switch {
921                 case result.have == 0 && result.want > 0:
922                         s.lost.replicas -= surplus
923                         s.lost.blocks++
924                         s.lost.bytes += bytes * int64(-surplus)
925                         fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
926                         for pdh := range result.blk.Refs {
927                                 fmt.Fprintf(bal.lostBlocks, " %s", pdh)
928                         }
929                         fmt.Fprint(bal.lostBlocks, "\n")
930                 case surplus < 0:
931                         s.underrep.replicas -= surplus
932                         s.underrep.blocks++
933                         s.underrep.bytes += bytes * int64(-surplus)
934                 case surplus > 0 && result.want == 0:
935                         counter := &s.garbage
936                         for _, r := range result.blk.Replicas {
937                                 if r.Mtime >= bal.MinMtime {
938                                         counter = &s.unref
939                                         break
940                                 }
941                         }
942                         counter.replicas += surplus
943                         counter.blocks++
944                         counter.bytes += bytes * int64(surplus)
945                 case surplus > 0:
946                         s.overrep.replicas += surplus
947                         s.overrep.blocks++
948                         s.overrep.bytes += bytes * int64(result.have-result.want)
949                 default:
950                         s.justright.replicas += result.want
951                         s.justright.blocks++
952                         s.justright.bytes += bytes * int64(result.want)
953                 }
954
955                 if result.want > 0 {
956                         s.desired.replicas += result.want
957                         s.desired.blocks++
958                         s.desired.bytes += bytes * int64(result.want)
959                 }
960                 if result.have > 0 {
961                         s.current.replicas += result.have
962                         s.current.blocks++
963                         s.current.bytes += bytes * int64(result.have)
964                 }
965
966                 for len(s.replHistogram) <= result.have {
967                         s.replHistogram = append(s.replHistogram, 0)
968                 }
969                 s.replHistogram[result.have]++
970         }
971         for _, srv := range bal.KeepServices {
972                 s.pulls += len(srv.ChangeSet.Pulls)
973                 s.trashes += len(srv.ChangeSet.Trashes)
974         }
975         bal.stats = s
976         bal.Metrics.UpdateStats(s)
977 }
978
979 // PrintStatistics writes statistics about the computed changes to
980 // bal.Logger. It should not be called until ComputeChangeSets has
981 // finished.
982 func (bal *Balancer) PrintStatistics() {
983         bal.logf("===")
984         bal.logf("%s lost (0=have<want)", bal.stats.lost)
985         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
986         bal.logf("%s just right (have=want)", bal.stats.justright)
987         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
988         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
989         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
990         for _, class := range bal.classes {
991                 cs := bal.stats.classStats[class]
992                 bal.logf("===")
993                 bal.logf("storage class %q: %s desired", class, cs.desired)
994                 bal.logf("storage class %q: %s short", class, cs.short)
995                 bal.logf("storage class %q: %s surplus", class, cs.surplus)
996                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
997         }
998         bal.logf("===")
999         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
1000         bal.logf("%s total usage", bal.stats.current)
1001         bal.logf("===")
1002         for _, srv := range bal.KeepServices {
1003                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1004         }
1005         bal.logf("===")
1006         bal.printHistogram(60)
1007         bal.logf("===")
1008 }
1009
1010 func (bal *Balancer) printHistogram(hashColumns int) {
1011         bal.logf("Replication level distribution (counting N replicas on a single server as N):")
1012         maxCount := 0
1013         for _, count := range bal.stats.replHistogram {
1014                 if maxCount < count {
1015                         maxCount = count
1016                 }
1017         }
1018         hashes := strings.Repeat("#", hashColumns)
1019         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1020         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1021         for repl, count := range bal.stats.replHistogram {
1022                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1023                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1024         }
1025 }
1026
1027 // CheckSanityLate checks for configuration and runtime errors after
1028 // GetCurrentState() and ComputeChangeSets() have finished.
1029 //
1030 // If it returns an error, it is dangerous to run any Commit methods.
1031 func (bal *Balancer) CheckSanityLate() error {
1032         if bal.errors != nil {
1033                 for _, err := range bal.errors {
1034                         bal.logf("deferred error: %v", err)
1035                 }
1036                 return fmt.Errorf("cannot proceed safely after deferred errors")
1037         }
1038
1039         if bal.collScanned == 0 {
1040                 return fmt.Errorf("received zero collections")
1041         }
1042
1043         anyDesired := false
1044         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1045                 for _, desired := range blk.Desired {
1046                         if desired > 0 {
1047                                 anyDesired = true
1048                                 break
1049                         }
1050                 }
1051         })
1052         if !anyDesired {
1053                 return fmt.Errorf("zero blocks have desired replication>0")
1054         }
1055
1056         if dr := bal.DefaultReplication; dr < 1 {
1057                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1058         }
1059
1060         // TODO: no two services have identical indexes
1061         // TODO: no collisions (same md5, different size)
1062         return nil
1063 }
1064
1065 // CommitPulls sends the computed lists of pull requests to the
1066 // keepstore servers. This has the effect of increasing replication of
1067 // existing blocks that are either underreplicated or poorly
1068 // distributed according to rendezvous hashing.
1069 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
1070         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1071         return bal.commitAsync(c, "send pull list",
1072                 func(srv *KeepService) error {
1073                         return srv.CommitPulls(c)
1074                 })
1075 }
1076
1077 // CommitTrash sends the computed lists of trash requests to the
1078 // keepstore servers. This has the effect of deleting blocks that are
1079 // overreplicated or unreferenced.
1080 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
1081         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1082         return bal.commitAsync(c, "send trash list",
1083                 func(srv *KeepService) error {
1084                         return srv.CommitTrash(c)
1085                 })
1086 }
1087
1088 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1089         errs := make(chan error)
1090         for _, srv := range bal.KeepServices {
1091                 go func(srv *KeepService) {
1092                         var err error
1093                         defer func() { errs <- err }()
1094                         label := fmt.Sprintf("%s: %v", srv, label)
1095                         err = f(srv)
1096                         if err != nil {
1097                                 err = fmt.Errorf("%s: %v", label, err)
1098                         }
1099                 }(srv)
1100         }
1101         var lastErr error
1102         for range bal.KeepServices {
1103                 if err := <-errs; err != nil {
1104                         bal.logf("%v", err)
1105                         lastErr = err
1106                 }
1107         }
1108         close(errs)
1109         return lastErr
1110 }
1111
1112 func (bal *Balancer) logf(f string, args ...interface{}) {
1113         if bal.Logger != nil {
1114                 bal.Logger.Printf(f, args...)
1115         }
1116 }
1117
1118 func (bal *Balancer) time(name, help string) func() {
1119         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1120         t0 := time.Now()
1121         bal.Logger.Printf("%s: start", name)
1122         return func() {
1123                 dur := time.Since(t0)
1124                 observer.Observe(dur.Seconds())
1125                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1126         }
1127 }
1128
1129 // Rendezvous hash sort function. Less efficient than sorting on
1130 // precomputed rendezvous hashes, but also rarely used.
1131 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1132         a := md5.Sum([]byte(string(blkid[:32]) + i))
1133         b := md5.Sum([]byte(string(blkid[:32]) + j))
1134         return bytes.Compare(a[:], b[:]) < 0
1135 }