15003: Merge branch 'master' into 15003-preprocess-config
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "log"
14         "math"
15         "os"
16         "runtime"
17         "sort"
18         "strings"
19         "sync"
20         "syscall"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "git.curoverse.com/arvados.git/sdk/go/keepclient"
25         "github.com/sirupsen/logrus"
26 )
27
28 // Balancer compares the contents of keepstore servers with the
29 // collections stored in Arvados, and issues pull/trash requests
30 // needed to get (closer to) the optimal data layout.
31 //
32 // In the optimal data layout: every data block referenced by a
33 // collection is replicated at least as many times as desired by the
34 // collection; there are no unreferenced data blocks older than
35 // BlobSignatureTTL; and all N existing replicas of a given data block
36 // are in the N best positions in rendezvous probe order.
37 type Balancer struct {
38         Logger  logrus.FieldLogger
39         Dumper  logrus.FieldLogger
40         Metrics *metrics
41
42         LostBlocksFile string
43
44         *BlockStateMap
45         KeepServices       map[string]*KeepService
46         DefaultReplication int
47         MinMtime           int64
48
49         classes       []string
50         mounts        int
51         mountsByClass map[string]map[*KeepMount]bool
52         collScanned   int
53         serviceRoots  map[string]string
54         errors        []error
55         stats         balancerStats
56         mutex         sync.Mutex
57         lostBlocks    io.Writer
58 }
59
60 // Run performs a balance operation using the given config and
61 // runOptions, and returns RunOptions suitable for passing to a
62 // subsequent balance operation.
63 //
64 // Run should only be called once on a given Balancer object.
65 //
66 // Typical usage:
67 //
68 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
69 func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
70         nextRunOptions = runOptions
71
72         defer bal.time("sweep", "wall clock time to run one full sweep")()
73
74         var lbFile *os.File
75         if bal.LostBlocksFile != "" {
76                 tmpfn := bal.LostBlocksFile + ".tmp"
77                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
78                 if err != nil {
79                         return
80                 }
81                 defer lbFile.Close()
82                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
83                 if err != nil {
84                         return
85                 }
86                 defer func() {
87                         // Remove the tempfile only if we didn't get
88                         // as far as successfully renaming it.
89                         if lbFile != nil {
90                                 os.Remove(tmpfn)
91                         }
92                 }()
93                 bal.lostBlocks = lbFile
94         } else {
95                 bal.lostBlocks = ioutil.Discard
96         }
97
98         if len(config.KeepServiceList.Items) > 0 {
99                 err = bal.SetKeepServices(config.KeepServiceList)
100         } else {
101                 err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
102         }
103         if err != nil {
104                 return
105         }
106
107         for _, srv := range bal.KeepServices {
108                 err = srv.discoverMounts(&config.Client)
109                 if err != nil {
110                         return
111                 }
112         }
113         bal.cleanupMounts()
114
115         if err = bal.CheckSanityEarly(&config.Client); err != nil {
116                 return
117         }
118         rs := bal.rendezvousState()
119         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
120                 if runOptions.SafeRendezvousState != "" {
121                         bal.logf("notice: KeepServices list has changed since last run")
122                 }
123                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
124                 if err = bal.ClearTrashLists(&config.Client); err != nil {
125                         return
126                 }
127                 // The current rendezvous state becomes "safe" (i.e.,
128                 // OK to compute changes for that state without
129                 // clearing existing trash lists) only now, after we
130                 // succeed in clearing existing trash lists.
131                 nextRunOptions.SafeRendezvousState = rs
132         }
133         if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
134                 return
135         }
136         bal.ComputeChangeSets()
137         bal.PrintStatistics()
138         if err = bal.CheckSanityLate(); err != nil {
139                 return
140         }
141         if lbFile != nil {
142                 err = lbFile.Sync()
143                 if err != nil {
144                         return
145                 }
146                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
147                 if err != nil {
148                         return
149                 }
150                 lbFile = nil
151         }
152         if runOptions.CommitPulls {
153                 err = bal.CommitPulls(&config.Client)
154                 if err != nil {
155                         // Skip trash if we can't pull. (Too cautious?)
156                         return
157                 }
158         }
159         if runOptions.CommitTrash {
160                 err = bal.CommitTrash(&config.Client)
161         }
162         return
163 }
164
165 // SetKeepServices sets the list of KeepServices to operate on.
166 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
167         bal.KeepServices = make(map[string]*KeepService)
168         for _, srv := range srvList.Items {
169                 bal.KeepServices[srv.UUID] = &KeepService{
170                         KeepService: srv,
171                         ChangeSet:   &ChangeSet{},
172                 }
173         }
174         return nil
175 }
176
177 // DiscoverKeepServices sets the list of KeepServices by calling the
178 // API to get a list of all services, and selecting the ones whose
179 // ServiceType is in okTypes.
180 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
181         bal.KeepServices = make(map[string]*KeepService)
182         ok := make(map[string]bool)
183         for _, t := range okTypes {
184                 ok[t] = true
185         }
186         return c.EachKeepService(func(srv arvados.KeepService) error {
187                 if ok[srv.ServiceType] {
188                         bal.KeepServices[srv.UUID] = &KeepService{
189                                 KeepService: srv,
190                                 ChangeSet:   &ChangeSet{},
191                         }
192                 } else {
193                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
194                 }
195                 return nil
196         })
197 }
198
199 func (bal *Balancer) cleanupMounts() {
200         rwdev := map[string]*KeepService{}
201         for _, srv := range bal.KeepServices {
202                 for _, mnt := range srv.mounts {
203                         if !mnt.ReadOnly && mnt.DeviceID != "" {
204                                 rwdev[mnt.DeviceID] = srv
205                         }
206                 }
207         }
208         // Drop the readonly mounts whose device is mounted RW
209         // elsewhere.
210         for _, srv := range bal.KeepServices {
211                 var dedup []*KeepMount
212                 for _, mnt := range srv.mounts {
213                         if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
214                                 bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
215                         } else {
216                                 dedup = append(dedup, mnt)
217                         }
218                 }
219                 srv.mounts = dedup
220         }
221         for _, srv := range bal.KeepServices {
222                 for _, mnt := range srv.mounts {
223                         if mnt.Replication <= 0 {
224                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
225                                 mnt.Replication = 1
226                         }
227                 }
228         }
229 }
230
231 // CheckSanityEarly checks for configuration and runtime errors that
232 // can be detected before GetCurrentState() and ComputeChangeSets()
233 // are called.
234 //
235 // If it returns an error, it is pointless to run GetCurrentState or
236 // ComputeChangeSets: after doing so, the statistics would be
237 // meaningless and it would be dangerous to run any Commit methods.
238 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
239         u, err := c.CurrentUser()
240         if err != nil {
241                 return fmt.Errorf("CurrentUser(): %v", err)
242         }
243         if !u.IsActive || !u.IsAdmin {
244                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
245         }
246         for _, srv := range bal.KeepServices {
247                 if srv.ServiceType == "proxy" {
248                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
249                 }
250         }
251
252         var checkPage arvados.CollectionList
253         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
254                 Limit:              new(int),
255                 Count:              "exact",
256                 IncludeTrash:       true,
257                 IncludeOldVersions: true,
258                 Filters: []arvados.Filter{{
259                         Attr:     "modified_at",
260                         Operator: "=",
261                         Operand:  nil,
262                 }},
263         }); err != nil {
264                 return err
265         } else if n := checkPage.ItemsAvailable; n > 0 {
266                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
267         }
268
269         return nil
270 }
271
272 // rendezvousState returns a fingerprint (e.g., a sorted list of
273 // UUID+host+port) of the current set of keep services.
274 func (bal *Balancer) rendezvousState() string {
275         srvs := make([]string, 0, len(bal.KeepServices))
276         for _, srv := range bal.KeepServices {
277                 srvs = append(srvs, srv.String())
278         }
279         sort.Strings(srvs)
280         return strings.Join(srvs, "; ")
281 }
282
283 // ClearTrashLists sends an empty trash list to each keep
284 // service. Calling this before GetCurrentState avoids races.
285 //
286 // When a block appears in an index, we assume that replica will still
287 // exist after we delete other replicas on other servers. However,
288 // it's possible that a previous rebalancing operation made different
289 // decisions (e.g., servers were added/removed, and rendezvous order
290 // changed). In this case, the replica might already be on that
291 // server's trash list, and it might be deleted before we send a
292 // replacement trash list.
293 //
294 // We avoid this problem if we clear all trash lists before getting
295 // indexes. (We also assume there is only one rebalancing process
296 // running at a time.)
297 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
298         for _, srv := range bal.KeepServices {
299                 srv.ChangeSet = &ChangeSet{}
300         }
301         return bal.CommitTrash(c)
302 }
303
304 // GetCurrentState determines the current replication state, and the
305 // desired replication level, for every block that is either
306 // retrievable or referenced.
307 //
308 // It determines the current replication state by reading the block index
309 // from every known Keep service.
310 //
311 // It determines the desired replication level by retrieving all
312 // collection manifests in the database (API server).
313 //
314 // It encodes the resulting information in BlockStateMap.
315 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
316         defer bal.time("get_state", "wall clock time to get current state")()
317         bal.BlockStateMap = NewBlockStateMap()
318
319         dd, err := c.DiscoveryDocument()
320         if err != nil {
321                 return err
322         }
323         bal.DefaultReplication = dd.DefaultCollectionReplication
324         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
325
326         errs := make(chan error, 1)
327         wg := sync.WaitGroup{}
328
329         // When a device is mounted more than once, we will get its
330         // index only once, and call AddReplicas on all of the mounts.
331         // equivMount keys are the mounts that will be indexed, and
332         // each value is a list of mounts to apply the received index
333         // to.
334         equivMount := map[*KeepMount][]*KeepMount{}
335         // deviceMount maps each device ID to the one mount that will
336         // be indexed for that device.
337         deviceMount := map[string]*KeepMount{}
338         for _, srv := range bal.KeepServices {
339                 for _, mnt := range srv.mounts {
340                         equiv := deviceMount[mnt.DeviceID]
341                         if equiv == nil {
342                                 equiv = mnt
343                                 if mnt.DeviceID != "" {
344                                         deviceMount[mnt.DeviceID] = equiv
345                                 }
346                         }
347                         equivMount[equiv] = append(equivMount[equiv], mnt)
348                 }
349         }
350
351         // Start one goroutine for each (non-redundant) mount:
352         // retrieve the index, and add the returned blocks to
353         // BlockStateMap.
354         for _, mounts := range equivMount {
355                 wg.Add(1)
356                 go func(mounts []*KeepMount) {
357                         defer wg.Done()
358                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
359                         idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
360                         if err != nil {
361                                 select {
362                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
363                                 default:
364                                 }
365                                 return
366                         }
367                         if len(errs) > 0 {
368                                 // Some other goroutine encountered an
369                                 // error -- any further effort here
370                                 // will be wasted.
371                                 return
372                         }
373                         for _, mount := range mounts {
374                                 bal.logf("%s: add %d entries to map", mount, len(idx))
375                                 bal.BlockStateMap.AddReplicas(mount, idx)
376                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
377                         }
378                         bal.logf("mount %s: index done", mounts[0])
379                 }(mounts)
380         }
381
382         // collQ buffers incoming collections so we can start fetching
383         // the next page without waiting for the current page to
384         // finish processing.
385         collQ := make(chan arvados.Collection, bufs)
386
387         // Start a goroutine to process collections. (We could use a
388         // worker pool here, but even with a single worker we already
389         // process collections much faster than we can retrieve them.)
390         wg.Add(1)
391         go func() {
392                 defer wg.Done()
393                 for coll := range collQ {
394                         err := bal.addCollection(coll)
395                         if err != nil || len(errs) > 0 {
396                                 select {
397                                 case errs <- err:
398                                 default:
399                                 }
400                                 for range collQ {
401                                 }
402                                 return
403                         }
404                         bal.collScanned++
405                 }
406         }()
407
408         // Start a goroutine to retrieve all collections from the
409         // Arvados database and send them to collQ for processing.
410         wg.Add(1)
411         go func() {
412                 defer wg.Done()
413                 err = EachCollection(c, pageSize,
414                         func(coll arvados.Collection) error {
415                                 collQ <- coll
416                                 if len(errs) > 0 {
417                                         // some other GetCurrentState
418                                         // error happened: no point
419                                         // getting any more
420                                         // collections.
421                                         return fmt.Errorf("")
422                                 }
423                                 return nil
424                         }, func(done, total int) {
425                                 bal.logf("collections: %d/%d", done, total)
426                         })
427                 close(collQ)
428                 if err != nil {
429                         select {
430                         case errs <- err:
431                         default:
432                         }
433                 }
434         }()
435
436         wg.Wait()
437         if len(errs) > 0 {
438                 return <-errs
439         }
440         return nil
441 }
442
443 func (bal *Balancer) addCollection(coll arvados.Collection) error {
444         blkids, err := coll.SizedDigests()
445         if err != nil {
446                 return fmt.Errorf("%v: %v", coll.UUID, err)
447         }
448         repl := bal.DefaultReplication
449         if coll.ReplicationDesired != nil {
450                 repl = *coll.ReplicationDesired
451         }
452         debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
453         bal.BlockStateMap.IncreaseDesired(coll.StorageClassesDesired, repl, blkids)
454         return nil
455 }
456
457 // ComputeChangeSets compares, for each known block, the current and
458 // desired replication states. If it is possible to get closer to the
459 // desired state by copying or deleting blocks, it adds those changes
460 // to the relevant KeepServices' ChangeSets.
461 //
462 // It does not actually apply any of the computed changes.
463 func (bal *Balancer) ComputeChangeSets() {
464         // This just calls balanceBlock() once for each block, using a
465         // pool of worker goroutines.
466         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
467         bal.setupLookupTables()
468
469         type balanceTask struct {
470                 blkid arvados.SizedDigest
471                 blk   *BlockState
472         }
473         workers := runtime.GOMAXPROCS(-1)
474         todo := make(chan balanceTask, workers)
475         go func() {
476                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
477                         todo <- balanceTask{
478                                 blkid: blkid,
479                                 blk:   blk,
480                         }
481                 })
482                 close(todo)
483         }()
484         results := make(chan balanceResult, workers)
485         go func() {
486                 var wg sync.WaitGroup
487                 for i := 0; i < workers; i++ {
488                         wg.Add(1)
489                         go func() {
490                                 for work := range todo {
491                                         results <- bal.balanceBlock(work.blkid, work.blk)
492                                 }
493                                 wg.Done()
494                         }()
495                 }
496                 wg.Wait()
497                 close(results)
498         }()
499         bal.collectStatistics(results)
500 }
501
502 func (bal *Balancer) setupLookupTables() {
503         bal.serviceRoots = make(map[string]string)
504         bal.classes = defaultClasses
505         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
506         bal.mounts = 0
507         for _, srv := range bal.KeepServices {
508                 bal.serviceRoots[srv.UUID] = srv.UUID
509                 for _, mnt := range srv.mounts {
510                         bal.mounts++
511
512                         // All mounts on a read-only service are
513                         // effectively read-only.
514                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
515
516                         if len(mnt.StorageClasses) == 0 {
517                                 bal.mountsByClass["default"][mnt] = true
518                                 continue
519                         }
520                         for _, class := range mnt.StorageClasses {
521                                 if mbc := bal.mountsByClass[class]; mbc == nil {
522                                         bal.classes = append(bal.classes, class)
523                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
524                                 } else {
525                                         mbc[mnt] = true
526                                 }
527                         }
528                 }
529         }
530         // Consider classes in lexicographic order to avoid flapping
531         // between balancing runs.  The outcome of the "prefer a mount
532         // we're already planning to use for a different storage
533         // class" case in balanceBlock depends on the order classes
534         // are considered.
535         sort.Strings(bal.classes)
536 }
537
538 const (
539         changeStay = iota
540         changePull
541         changeTrash
542         changeNone
543 )
544
545 var changeName = map[int]string{
546         changeStay:  "stay",
547         changePull:  "pull",
548         changeTrash: "trash",
549         changeNone:  "none",
550 }
551
552 type balanceResult struct {
553         blk        *BlockState
554         blkid      arvados.SizedDigest
555         have       int
556         want       int
557         classState map[string]balancedBlockState
558 }
559
560 // balanceBlock compares current state to desired state for a single
561 // block, and makes the appropriate ChangeSet calls.
562 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
563         debugf("balanceBlock: %v %+v", blkid, blk)
564
565         type slot struct {
566                 mnt  *KeepMount // never nil
567                 repl *Replica   // replica already stored here (or nil)
568                 want bool       // we should pull/leave a replica here
569         }
570
571         // Build a list of all slots (one per mounted volume).
572         slots := make([]slot, 0, bal.mounts)
573         for _, srv := range bal.KeepServices {
574                 for _, mnt := range srv.mounts {
575                         var repl *Replica
576                         for r := range blk.Replicas {
577                                 if blk.Replicas[r].KeepMount == mnt {
578                                         repl = &blk.Replicas[r]
579                                 }
580                         }
581                         // Initial value of "want" is "have, and can't
582                         // delete". These untrashable replicas get
583                         // prioritized when sorting slots: otherwise,
584                         // non-optimal readonly copies would cause us
585                         // to overreplicate.
586                         slots = append(slots, slot{
587                                 mnt:  mnt,
588                                 repl: repl,
589                                 want: repl != nil && mnt.ReadOnly,
590                         })
591                 }
592         }
593
594         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
595         srvRendezvous := make(map[*KeepService]int, len(uuids))
596         for i, uuid := range uuids {
597                 srv := bal.KeepServices[uuid]
598                 srvRendezvous[srv] = i
599         }
600
601         // Below we set underreplicated=true if we find any storage
602         // class that's currently underreplicated -- in that case we
603         // won't want to trash any replicas.
604         underreplicated := false
605
606         classState := make(map[string]balancedBlockState, len(bal.classes))
607         unsafeToDelete := make(map[int64]bool, len(slots))
608         for _, class := range bal.classes {
609                 desired := blk.Desired[class]
610
611                 countedDev := map[string]bool{}
612                 have := 0
613                 for _, slot := range slots {
614                         if slot.repl != nil && bal.mountsByClass[class][slot.mnt] && !countedDev[slot.mnt.DeviceID] {
615                                 have += slot.mnt.Replication
616                                 if slot.mnt.DeviceID != "" {
617                                         countedDev[slot.mnt.DeviceID] = true
618                                 }
619                         }
620                 }
621                 classState[class] = balancedBlockState{
622                         desired: desired,
623                         surplus: have - desired,
624                 }
625
626                 if desired == 0 {
627                         continue
628                 }
629
630                 // Sort the slots by desirability.
631                 sort.Slice(slots, func(i, j int) bool {
632                         si, sj := slots[i], slots[j]
633                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
634                                 // Prefer a mount that satisfies the
635                                 // desired class.
636                                 return bal.mountsByClass[class][si.mnt]
637                         } else if si.want != sj.want {
638                                 // Prefer a mount that will have a
639                                 // replica no matter what we do here
640                                 // -- either because it already has an
641                                 // untrashable replica, or because we
642                                 // already need it to satisfy a
643                                 // different storage class.
644                                 return si.want
645                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
646                                 // Prefer a better rendezvous
647                                 // position.
648                                 return orderi < orderj
649                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
650                                 // Prefer a mount that already has a
651                                 // replica.
652                                 return repli
653                         } else {
654                                 // If pull/trash turns out to be
655                                 // needed, distribute the
656                                 // new/remaining replicas uniformly
657                                 // across qualifying mounts on a given
658                                 // server.
659                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
660                         }
661                 })
662
663                 // Servers/mounts/devices (with or without existing
664                 // replicas) that are part of the best achievable
665                 // layout for this storage class.
666                 wantSrv := map[*KeepService]bool{}
667                 wantMnt := map[*KeepMount]bool{}
668                 wantDev := map[string]bool{}
669                 // Positions (with existing replicas) that have been
670                 // protected (via unsafeToDelete) to ensure we don't
671                 // reduce replication below desired level when
672                 // trashing replicas that aren't optimal positions for
673                 // any storage class.
674                 protMnt := map[*KeepMount]bool{}
675                 // Replication planned so far (corresponds to wantMnt).
676                 replWant := 0
677                 // Protected replication (corresponds to protMnt).
678                 replProt := 0
679
680                 // trySlot tries using a slot to meet requirements,
681                 // and returns true if all requirements are met.
682                 trySlot := func(i int) bool {
683                         slot := slots[i]
684                         if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
685                                 // Already allocated a replica to this
686                                 // backend device, possibly on a
687                                 // different server.
688                                 return false
689                         }
690                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
691                                 unsafeToDelete[slot.repl.Mtime] = true
692                                 protMnt[slot.mnt] = true
693                                 replProt += slot.mnt.Replication
694                         }
695                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
696                                 slots[i].want = true
697                                 wantSrv[slot.mnt.KeepService] = true
698                                 wantMnt[slot.mnt] = true
699                                 if slot.mnt.DeviceID != "" {
700                                         wantDev[slot.mnt.DeviceID] = true
701                                 }
702                                 replWant += slot.mnt.Replication
703                         }
704                         return replProt >= desired && replWant >= desired
705                 }
706
707                 // First try to achieve desired replication without
708                 // using the same server twice.
709                 done := false
710                 for i := 0; i < len(slots) && !done; i++ {
711                         if !wantSrv[slots[i].mnt.KeepService] {
712                                 done = trySlot(i)
713                         }
714                 }
715
716                 // If that didn't suffice, do another pass without the
717                 // "distinct services" restriction. (Achieving the
718                 // desired volume replication on fewer than the
719                 // desired number of services is better than
720                 // underreplicating.)
721                 for i := 0; i < len(slots) && !done; i++ {
722                         done = trySlot(i)
723                 }
724
725                 if !underreplicated {
726                         safe := 0
727                         for _, slot := range slots {
728                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
729                                         continue
730                                 }
731                                 if safe += slot.mnt.Replication; safe >= desired {
732                                         break
733                                 }
734                         }
735                         underreplicated = safe < desired
736                 }
737
738                 // set the unachievable flag if there aren't enough
739                 // slots offering the relevant storage class. (This is
740                 // as easy as checking slots[desired] because we
741                 // already sorted the qualifying slots to the front.)
742                 if desired >= len(slots) || !bal.mountsByClass[class][slots[desired].mnt] {
743                         cs := classState[class]
744                         cs.unachievable = true
745                         classState[class] = cs
746                 }
747
748                 // Avoid deleting wanted replicas from devices that
749                 // are mounted on multiple servers -- even if they
750                 // haven't already been added to unsafeToDelete
751                 // because the servers report different Mtimes.
752                 for _, slot := range slots {
753                         if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
754                                 unsafeToDelete[slot.repl.Mtime] = true
755                         }
756                 }
757         }
758
759         // TODO: If multiple replicas are trashable, prefer the oldest
760         // replica that doesn't have a timestamp collision with
761         // others.
762
763         countedDev := map[string]bool{}
764         var have, want int
765         for _, slot := range slots {
766                 if countedDev[slot.mnt.DeviceID] {
767                         continue
768                 }
769                 if slot.want {
770                         want += slot.mnt.Replication
771                 }
772                 if slot.repl != nil {
773                         have += slot.mnt.Replication
774                 }
775                 if slot.mnt.DeviceID != "" {
776                         countedDev[slot.mnt.DeviceID] = true
777                 }
778         }
779
780         var changes []string
781         for _, slot := range slots {
782                 // TODO: request a Touch if Mtime is duplicated.
783                 var change int
784                 switch {
785                 case !underreplicated && !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime && !unsafeToDelete[slot.repl.Mtime]:
786                         slot.mnt.KeepService.AddTrash(Trash{
787                                 SizedDigest: blkid,
788                                 Mtime:       slot.repl.Mtime,
789                                 From:        slot.mnt,
790                         })
791                         change = changeTrash
792                 case len(blk.Replicas) > 0 && slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
793                         slot.mnt.KeepService.AddPull(Pull{
794                                 SizedDigest: blkid,
795                                 From:        blk.Replicas[0].KeepMount.KeepService,
796                                 To:          slot.mnt,
797                         })
798                         change = changePull
799                 case slot.repl != nil:
800                         change = changeStay
801                 default:
802                         change = changeNone
803                 }
804                 if bal.Dumper != nil {
805                         var mtime int64
806                         if slot.repl != nil {
807                                 mtime = slot.repl.Mtime
808                         }
809                         srv := slot.mnt.KeepService
810                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
811                 }
812         }
813         if bal.Dumper != nil {
814                 bal.Dumper.Printf("%s refs=%d have=%d want=%v %v %v", blkid, blk.RefCount, have, want, blk.Desired, changes)
815         }
816         return balanceResult{
817                 blk:        blk,
818                 blkid:      blkid,
819                 have:       have,
820                 want:       want,
821                 classState: classState,
822         }
823 }
824
825 type blocksNBytes struct {
826         replicas int
827         blocks   int
828         bytes    int64
829 }
830
831 func (bb blocksNBytes) String() string {
832         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
833 }
834
835 type balancerStats struct {
836         lost          blocksNBytes
837         overrep       blocksNBytes
838         unref         blocksNBytes
839         garbage       blocksNBytes
840         underrep      blocksNBytes
841         unachievable  blocksNBytes
842         justright     blocksNBytes
843         desired       blocksNBytes
844         current       blocksNBytes
845         pulls         int
846         trashes       int
847         replHistogram []int
848         classStats    map[string]replicationStats
849
850         // collectionBytes / collectionBlockBytes = deduplication ratio
851         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
852         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
853         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
854         collectionBlocks     int64 // number of blocks referenced by any collection
855 }
856
857 func (s *balancerStats) dedupByteRatio() float64 {
858         if s.collectionBlockBytes == 0 {
859                 return 0
860         }
861         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
862 }
863
864 func (s *balancerStats) dedupBlockRatio() float64 {
865         if s.collectionBlocks == 0 {
866                 return 0
867         }
868         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
869 }
870
871 type replicationStats struct {
872         desired      blocksNBytes
873         surplus      blocksNBytes
874         short        blocksNBytes
875         unachievable blocksNBytes
876 }
877
878 type balancedBlockState struct {
879         desired      int
880         surplus      int
881         unachievable bool
882 }
883
884 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
885         var s balancerStats
886         s.replHistogram = make([]int, 2)
887         s.classStats = make(map[string]replicationStats, len(bal.classes))
888         for result := range results {
889                 surplus := result.have - result.want
890                 bytes := result.blkid.Size()
891
892                 if rc := int64(result.blk.RefCount); rc > 0 {
893                         s.collectionBytes += rc * bytes
894                         s.collectionBlockBytes += bytes
895                         s.collectionBlockRefs += rc
896                         s.collectionBlocks++
897                 }
898
899                 for class, state := range result.classState {
900                         cs := s.classStats[class]
901                         if state.unachievable {
902                                 cs.unachievable.blocks++
903                                 cs.unachievable.bytes += bytes
904                         }
905                         if state.desired > 0 {
906                                 cs.desired.replicas += state.desired
907                                 cs.desired.blocks++
908                                 cs.desired.bytes += bytes * int64(state.desired)
909                         }
910                         if state.surplus > 0 {
911                                 cs.surplus.replicas += state.surplus
912                                 cs.surplus.blocks++
913                                 cs.surplus.bytes += bytes * int64(state.surplus)
914                         } else if state.surplus < 0 {
915                                 cs.short.replicas += -state.surplus
916                                 cs.short.blocks++
917                                 cs.short.bytes += bytes * int64(-state.surplus)
918                         }
919                         s.classStats[class] = cs
920                 }
921
922                 switch {
923                 case result.have == 0 && result.want > 0:
924                         s.lost.replicas -= surplus
925                         s.lost.blocks++
926                         s.lost.bytes += bytes * int64(-surplus)
927                         fmt.Fprintf(bal.lostBlocks, "%s\n", strings.SplitN(string(result.blkid), "+", 2)[0])
928                 case surplus < 0:
929                         s.underrep.replicas -= surplus
930                         s.underrep.blocks++
931                         s.underrep.bytes += bytes * int64(-surplus)
932                 case surplus > 0 && result.want == 0:
933                         counter := &s.garbage
934                         for _, r := range result.blk.Replicas {
935                                 if r.Mtime >= bal.MinMtime {
936                                         counter = &s.unref
937                                         break
938                                 }
939                         }
940                         counter.replicas += surplus
941                         counter.blocks++
942                         counter.bytes += bytes * int64(surplus)
943                 case surplus > 0:
944                         s.overrep.replicas += surplus
945                         s.overrep.blocks++
946                         s.overrep.bytes += bytes * int64(result.have-result.want)
947                 default:
948                         s.justright.replicas += result.want
949                         s.justright.blocks++
950                         s.justright.bytes += bytes * int64(result.want)
951                 }
952
953                 if result.want > 0 {
954                         s.desired.replicas += result.want
955                         s.desired.blocks++
956                         s.desired.bytes += bytes * int64(result.want)
957                 }
958                 if result.have > 0 {
959                         s.current.replicas += result.have
960                         s.current.blocks++
961                         s.current.bytes += bytes * int64(result.have)
962                 }
963
964                 for len(s.replHistogram) <= result.have {
965                         s.replHistogram = append(s.replHistogram, 0)
966                 }
967                 s.replHistogram[result.have]++
968         }
969         for _, srv := range bal.KeepServices {
970                 s.pulls += len(srv.ChangeSet.Pulls)
971                 s.trashes += len(srv.ChangeSet.Trashes)
972         }
973         bal.stats = s
974         bal.Metrics.UpdateStats(s)
975 }
976
977 // PrintStatistics writes statistics about the computed changes to
978 // bal.Logger. It should not be called until ComputeChangeSets has
979 // finished.
980 func (bal *Balancer) PrintStatistics() {
981         bal.logf("===")
982         bal.logf("%s lost (0=have<want)", bal.stats.lost)
983         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
984         bal.logf("%s just right (have=want)", bal.stats.justright)
985         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
986         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
987         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
988         for _, class := range bal.classes {
989                 cs := bal.stats.classStats[class]
990                 bal.logf("===")
991                 bal.logf("storage class %q: %s desired", class, cs.desired)
992                 bal.logf("storage class %q: %s short", class, cs.short)
993                 bal.logf("storage class %q: %s surplus", class, cs.surplus)
994                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
995         }
996         bal.logf("===")
997         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
998         bal.logf("%s total usage", bal.stats.current)
999         bal.logf("===")
1000         for _, srv := range bal.KeepServices {
1001                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1002         }
1003         bal.logf("===")
1004         bal.printHistogram(60)
1005         bal.logf("===")
1006 }
1007
1008 func (bal *Balancer) printHistogram(hashColumns int) {
1009         bal.logf("Replication level distribution (counting N replicas on a single server as N):")
1010         maxCount := 0
1011         for _, count := range bal.stats.replHistogram {
1012                 if maxCount < count {
1013                         maxCount = count
1014                 }
1015         }
1016         hashes := strings.Repeat("#", hashColumns)
1017         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1018         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1019         for repl, count := range bal.stats.replHistogram {
1020                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1021                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1022         }
1023 }
1024
1025 // CheckSanityLate checks for configuration and runtime errors after
1026 // GetCurrentState() and ComputeChangeSets() have finished.
1027 //
1028 // If it returns an error, it is dangerous to run any Commit methods.
1029 func (bal *Balancer) CheckSanityLate() error {
1030         if bal.errors != nil {
1031                 for _, err := range bal.errors {
1032                         bal.logf("deferred error: %v", err)
1033                 }
1034                 return fmt.Errorf("cannot proceed safely after deferred errors")
1035         }
1036
1037         if bal.collScanned == 0 {
1038                 return fmt.Errorf("received zero collections")
1039         }
1040
1041         anyDesired := false
1042         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1043                 for _, desired := range blk.Desired {
1044                         if desired > 0 {
1045                                 anyDesired = true
1046                                 break
1047                         }
1048                 }
1049         })
1050         if !anyDesired {
1051                 return fmt.Errorf("zero blocks have desired replication>0")
1052         }
1053
1054         if dr := bal.DefaultReplication; dr < 1 {
1055                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1056         }
1057
1058         // TODO: no two services have identical indexes
1059         // TODO: no collisions (same md5, different size)
1060         return nil
1061 }
1062
1063 // CommitPulls sends the computed lists of pull requests to the
1064 // keepstore servers. This has the effect of increasing replication of
1065 // existing blocks that are either underreplicated or poorly
1066 // distributed according to rendezvous hashing.
1067 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
1068         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1069         return bal.commitAsync(c, "send pull list",
1070                 func(srv *KeepService) error {
1071                         return srv.CommitPulls(c)
1072                 })
1073 }
1074
1075 // CommitTrash sends the computed lists of trash requests to the
1076 // keepstore servers. This has the effect of deleting blocks that are
1077 // overreplicated or unreferenced.
1078 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
1079         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1080         return bal.commitAsync(c, "send trash list",
1081                 func(srv *KeepService) error {
1082                         return srv.CommitTrash(c)
1083                 })
1084 }
1085
1086 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1087         errs := make(chan error)
1088         for _, srv := range bal.KeepServices {
1089                 go func(srv *KeepService) {
1090                         var err error
1091                         defer func() { errs <- err }()
1092                         label := fmt.Sprintf("%s: %v", srv, label)
1093                         err = f(srv)
1094                         if err != nil {
1095                                 err = fmt.Errorf("%s: %v", label, err)
1096                         }
1097                 }(srv)
1098         }
1099         var lastErr error
1100         for range bal.KeepServices {
1101                 if err := <-errs; err != nil {
1102                         bal.logf("%v", err)
1103                         lastErr = err
1104                 }
1105         }
1106         close(errs)
1107         return lastErr
1108 }
1109
1110 func (bal *Balancer) logf(f string, args ...interface{}) {
1111         if bal.Logger != nil {
1112                 bal.Logger.Printf(f, args...)
1113         }
1114 }
1115
1116 func (bal *Balancer) time(name, help string) func() {
1117         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1118         t0 := time.Now()
1119         bal.Logger.Printf("%s: start", name)
1120         return func() {
1121                 dur := time.Since(t0)
1122                 observer.Observe(dur.Seconds())
1123                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1124         }
1125 }
1126
1127 // Rendezvous hash sort function. Less efficient than sorting on
1128 // precomputed rendezvous hashes, but also rarely used.
1129 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1130         a := md5.Sum([]byte(string(blkid[:32]) + i))
1131         b := md5.Sum([]byte(string(blkid[:32]) + j))
1132         return bytes.Compare(a[:], b[:]) < 0
1133 }