15148: Include list of affected PDHs in LostBlocksFile.
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "log"
14         "math"
15         "os"
16         "runtime"
17         "sort"
18         "strings"
19         "sync"
20         "syscall"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "git.curoverse.com/arvados.git/sdk/go/keepclient"
25         "github.com/sirupsen/logrus"
26 )
27
28 // Balancer compares the contents of keepstore servers with the
29 // collections stored in Arvados, and issues pull/trash requests
30 // needed to get (closer to) the optimal data layout.
31 //
32 // In the optimal data layout: every data block referenced by a
33 // collection is replicated at least as many times as desired by the
34 // collection; there are no unreferenced data blocks older than
35 // BlobSignatureTTL; and all N existing replicas of a given data block
36 // are in the N best positions in rendezvous probe order.
37 type Balancer struct {
38         Logger  logrus.FieldLogger
39         Dumper  logrus.FieldLogger
40         Metrics *metrics
41
42         LostBlocksFile string
43
44         *BlockStateMap
45         KeepServices       map[string]*KeepService
46         DefaultReplication int
47         MinMtime           int64
48
49         classes       []string
50         mounts        int
51         mountsByClass map[string]map[*KeepMount]bool
52         collScanned   int
53         serviceRoots  map[string]string
54         errors        []error
55         stats         balancerStats
56         mutex         sync.Mutex
57         lostBlocks    io.Writer
58 }
59
60 // Run performs a balance operation using the given config and
61 // runOptions, and returns RunOptions suitable for passing to a
62 // subsequent balance operation.
63 //
64 // Run should only be called once on a given Balancer object.
65 //
66 // Typical usage:
67 //
68 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
69 func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
70         nextRunOptions = runOptions
71
72         defer bal.time("sweep", "wall clock time to run one full sweep")()
73
74         var lbFile *os.File
75         if bal.LostBlocksFile != "" {
76                 tmpfn := bal.LostBlocksFile + ".tmp"
77                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
78                 if err != nil {
79                         return
80                 }
81                 defer lbFile.Close()
82                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
83                 if err != nil {
84                         return
85                 }
86                 defer func() {
87                         // Remove the tempfile only if we didn't get
88                         // as far as successfully renaming it.
89                         if lbFile != nil {
90                                 os.Remove(tmpfn)
91                         }
92                 }()
93                 bal.lostBlocks = lbFile
94         } else {
95                 bal.lostBlocks = ioutil.Discard
96         }
97
98         if len(config.KeepServiceList.Items) > 0 {
99                 err = bal.SetKeepServices(config.KeepServiceList)
100         } else {
101                 err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
102         }
103         if err != nil {
104                 return
105         }
106
107         for _, srv := range bal.KeepServices {
108                 err = srv.discoverMounts(&config.Client)
109                 if err != nil {
110                         return
111                 }
112         }
113         bal.cleanupMounts()
114
115         if err = bal.CheckSanityEarly(&config.Client); err != nil {
116                 return
117         }
118         rs := bal.rendezvousState()
119         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
120                 if runOptions.SafeRendezvousState != "" {
121                         bal.logf("notice: KeepServices list has changed since last run")
122                 }
123                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
124                 if err = bal.ClearTrashLists(&config.Client); err != nil {
125                         return
126                 }
127                 // The current rendezvous state becomes "safe" (i.e.,
128                 // OK to compute changes for that state without
129                 // clearing existing trash lists) only now, after we
130                 // succeed in clearing existing trash lists.
131                 nextRunOptions.SafeRendezvousState = rs
132         }
133         if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
134                 return
135         }
136         bal.ComputeChangeSets()
137         bal.PrintStatistics()
138         if err = bal.CheckSanityLate(); err != nil {
139                 return
140         }
141         if lbFile != nil {
142                 err = lbFile.Sync()
143                 if err != nil {
144                         return
145                 }
146                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
147                 if err != nil {
148                         return
149                 }
150                 lbFile = nil
151         }
152         if runOptions.CommitPulls {
153                 err = bal.CommitPulls(&config.Client)
154                 if err != nil {
155                         // Skip trash if we can't pull. (Too cautious?)
156                         return
157                 }
158         }
159         if runOptions.CommitTrash {
160                 err = bal.CommitTrash(&config.Client)
161         }
162         return
163 }
164
165 // SetKeepServices sets the list of KeepServices to operate on.
166 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
167         bal.KeepServices = make(map[string]*KeepService)
168         for _, srv := range srvList.Items {
169                 bal.KeepServices[srv.UUID] = &KeepService{
170                         KeepService: srv,
171                         ChangeSet:   &ChangeSet{},
172                 }
173         }
174         return nil
175 }
176
177 // DiscoverKeepServices sets the list of KeepServices by calling the
178 // API to get a list of all services, and selecting the ones whose
179 // ServiceType is in okTypes.
180 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
181         bal.KeepServices = make(map[string]*KeepService)
182         ok := make(map[string]bool)
183         for _, t := range okTypes {
184                 ok[t] = true
185         }
186         return c.EachKeepService(func(srv arvados.KeepService) error {
187                 if ok[srv.ServiceType] {
188                         bal.KeepServices[srv.UUID] = &KeepService{
189                                 KeepService: srv,
190                                 ChangeSet:   &ChangeSet{},
191                         }
192                 } else {
193                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
194                 }
195                 return nil
196         })
197 }
198
199 func (bal *Balancer) cleanupMounts() {
200         rwdev := map[string]*KeepService{}
201         for _, srv := range bal.KeepServices {
202                 for _, mnt := range srv.mounts {
203                         if !mnt.ReadOnly && mnt.DeviceID != "" {
204                                 rwdev[mnt.DeviceID] = srv
205                         }
206                 }
207         }
208         // Drop the readonly mounts whose device is mounted RW
209         // elsewhere.
210         for _, srv := range bal.KeepServices {
211                 var dedup []*KeepMount
212                 for _, mnt := range srv.mounts {
213                         if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
214                                 bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
215                         } else {
216                                 dedup = append(dedup, mnt)
217                         }
218                 }
219                 srv.mounts = dedup
220         }
221         for _, srv := range bal.KeepServices {
222                 for _, mnt := range srv.mounts {
223                         if mnt.Replication <= 0 {
224                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
225                                 mnt.Replication = 1
226                         }
227                 }
228         }
229 }
230
231 // CheckSanityEarly checks for configuration and runtime errors that
232 // can be detected before GetCurrentState() and ComputeChangeSets()
233 // are called.
234 //
235 // If it returns an error, it is pointless to run GetCurrentState or
236 // ComputeChangeSets: after doing so, the statistics would be
237 // meaningless and it would be dangerous to run any Commit methods.
238 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
239         u, err := c.CurrentUser()
240         if err != nil {
241                 return fmt.Errorf("CurrentUser(): %v", err)
242         }
243         if !u.IsActive || !u.IsAdmin {
244                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
245         }
246         for _, srv := range bal.KeepServices {
247                 if srv.ServiceType == "proxy" {
248                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
249                 }
250         }
251
252         var checkPage arvados.CollectionList
253         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
254                 Limit:              new(int),
255                 Count:              "exact",
256                 IncludeTrash:       true,
257                 IncludeOldVersions: true,
258                 Filters: []arvados.Filter{{
259                         Attr:     "modified_at",
260                         Operator: "=",
261                         Operand:  nil,
262                 }},
263         }); err != nil {
264                 return err
265         } else if n := checkPage.ItemsAvailable; n > 0 {
266                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
267         }
268
269         return nil
270 }
271
272 // rendezvousState returns a fingerprint (e.g., a sorted list of
273 // UUID+host+port) of the current set of keep services.
274 func (bal *Balancer) rendezvousState() string {
275         srvs := make([]string, 0, len(bal.KeepServices))
276         for _, srv := range bal.KeepServices {
277                 srvs = append(srvs, srv.String())
278         }
279         sort.Strings(srvs)
280         return strings.Join(srvs, "; ")
281 }
282
283 // ClearTrashLists sends an empty trash list to each keep
284 // service. Calling this before GetCurrentState avoids races.
285 //
286 // When a block appears in an index, we assume that replica will still
287 // exist after we delete other replicas on other servers. However,
288 // it's possible that a previous rebalancing operation made different
289 // decisions (e.g., servers were added/removed, and rendezvous order
290 // changed). In this case, the replica might already be on that
291 // server's trash list, and it might be deleted before we send a
292 // replacement trash list.
293 //
294 // We avoid this problem if we clear all trash lists before getting
295 // indexes. (We also assume there is only one rebalancing process
296 // running at a time.)
297 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
298         for _, srv := range bal.KeepServices {
299                 srv.ChangeSet = &ChangeSet{}
300         }
301         return bal.CommitTrash(c)
302 }
303
304 // GetCurrentState determines the current replication state, and the
305 // desired replication level, for every block that is either
306 // retrievable or referenced.
307 //
308 // It determines the current replication state by reading the block index
309 // from every known Keep service.
310 //
311 // It determines the desired replication level by retrieving all
312 // collection manifests in the database (API server).
313 //
314 // It encodes the resulting information in BlockStateMap.
315 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
316         defer bal.time("get_state", "wall clock time to get current state")()
317         bal.BlockStateMap = NewBlockStateMap()
318
319         dd, err := c.DiscoveryDocument()
320         if err != nil {
321                 return err
322         }
323         bal.DefaultReplication = dd.DefaultCollectionReplication
324         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
325
326         errs := make(chan error, 1)
327         wg := sync.WaitGroup{}
328
329         // When a device is mounted more than once, we will get its
330         // index only once, and call AddReplicas on all of the mounts.
331         // equivMount keys are the mounts that will be indexed, and
332         // each value is a list of mounts to apply the received index
333         // to.
334         equivMount := map[*KeepMount][]*KeepMount{}
335         // deviceMount maps each device ID to the one mount that will
336         // be indexed for that device.
337         deviceMount := map[string]*KeepMount{}
338         for _, srv := range bal.KeepServices {
339                 for _, mnt := range srv.mounts {
340                         equiv := deviceMount[mnt.DeviceID]
341                         if equiv == nil {
342                                 equiv = mnt
343                                 if mnt.DeviceID != "" {
344                                         deviceMount[mnt.DeviceID] = equiv
345                                 }
346                         }
347                         equivMount[equiv] = append(equivMount[equiv], mnt)
348                 }
349         }
350
351         // Start one goroutine for each (non-redundant) mount:
352         // retrieve the index, and add the returned blocks to
353         // BlockStateMap.
354         for _, mounts := range equivMount {
355                 wg.Add(1)
356                 go func(mounts []*KeepMount) {
357                         defer wg.Done()
358                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
359                         idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
360                         if err != nil {
361                                 select {
362                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
363                                 default:
364                                 }
365                                 return
366                         }
367                         if len(errs) > 0 {
368                                 // Some other goroutine encountered an
369                                 // error -- any further effort here
370                                 // will be wasted.
371                                 return
372                         }
373                         for _, mount := range mounts {
374                                 bal.logf("%s: add %d entries to map", mount, len(idx))
375                                 bal.BlockStateMap.AddReplicas(mount, idx)
376                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
377                         }
378                         bal.logf("mount %s: index done", mounts[0])
379                 }(mounts)
380         }
381
382         // collQ buffers incoming collections so we can start fetching
383         // the next page without waiting for the current page to
384         // finish processing.
385         collQ := make(chan arvados.Collection, bufs)
386
387         // Start a goroutine to process collections. (We could use a
388         // worker pool here, but even with a single worker we already
389         // process collections much faster than we can retrieve them.)
390         wg.Add(1)
391         go func() {
392                 defer wg.Done()
393                 for coll := range collQ {
394                         err := bal.addCollection(coll)
395                         if err != nil || len(errs) > 0 {
396                                 select {
397                                 case errs <- err:
398                                 default:
399                                 }
400                                 for range collQ {
401                                 }
402                                 return
403                         }
404                         bal.collScanned++
405                 }
406         }()
407
408         // Start a goroutine to retrieve all collections from the
409         // Arvados database and send them to collQ for processing.
410         wg.Add(1)
411         go func() {
412                 defer wg.Done()
413                 err = EachCollection(c, pageSize,
414                         func(coll arvados.Collection) error {
415                                 collQ <- coll
416                                 if len(errs) > 0 {
417                                         // some other GetCurrentState
418                                         // error happened: no point
419                                         // getting any more
420                                         // collections.
421                                         return fmt.Errorf("")
422                                 }
423                                 return nil
424                         }, func(done, total int) {
425                                 bal.logf("collections: %d/%d", done, total)
426                         })
427                 close(collQ)
428                 if err != nil {
429                         select {
430                         case errs <- err:
431                         default:
432                         }
433                 }
434         }()
435
436         wg.Wait()
437         if len(errs) > 0 {
438                 return <-errs
439         }
440         return nil
441 }
442
443 func (bal *Balancer) addCollection(coll arvados.Collection) error {
444         blkids, err := coll.SizedDigests()
445         if err != nil {
446                 return fmt.Errorf("%v: %v", coll.UUID, err)
447         }
448         repl := bal.DefaultReplication
449         if coll.ReplicationDesired != nil {
450                 repl = *coll.ReplicationDesired
451         }
452         debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
453         // Pass pdh to IncreaseDesired only if LostBlocksFile is being
454         // written -- otherwise it's just a waste of memory.
455         pdh := ""
456         if bal.LostBlocksFile != "" {
457                 pdh = coll.PortableDataHash
458         }
459         bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
460         return nil
461 }
462
463 // ComputeChangeSets compares, for each known block, the current and
464 // desired replication states. If it is possible to get closer to the
465 // desired state by copying or deleting blocks, it adds those changes
466 // to the relevant KeepServices' ChangeSets.
467 //
468 // It does not actually apply any of the computed changes.
469 func (bal *Balancer) ComputeChangeSets() {
470         // This just calls balanceBlock() once for each block, using a
471         // pool of worker goroutines.
472         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
473         bal.setupLookupTables()
474
475         type balanceTask struct {
476                 blkid arvados.SizedDigest
477                 blk   *BlockState
478         }
479         workers := runtime.GOMAXPROCS(-1)
480         todo := make(chan balanceTask, workers)
481         go func() {
482                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
483                         todo <- balanceTask{
484                                 blkid: blkid,
485                                 blk:   blk,
486                         }
487                 })
488                 close(todo)
489         }()
490         results := make(chan balanceResult, workers)
491         go func() {
492                 var wg sync.WaitGroup
493                 for i := 0; i < workers; i++ {
494                         wg.Add(1)
495                         go func() {
496                                 for work := range todo {
497                                         results <- bal.balanceBlock(work.blkid, work.blk)
498                                 }
499                                 wg.Done()
500                         }()
501                 }
502                 wg.Wait()
503                 close(results)
504         }()
505         bal.collectStatistics(results)
506 }
507
508 func (bal *Balancer) setupLookupTables() {
509         bal.serviceRoots = make(map[string]string)
510         bal.classes = defaultClasses
511         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
512         bal.mounts = 0
513         for _, srv := range bal.KeepServices {
514                 bal.serviceRoots[srv.UUID] = srv.UUID
515                 for _, mnt := range srv.mounts {
516                         bal.mounts++
517
518                         // All mounts on a read-only service are
519                         // effectively read-only.
520                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
521
522                         if len(mnt.StorageClasses) == 0 {
523                                 bal.mountsByClass["default"][mnt] = true
524                                 continue
525                         }
526                         for _, class := range mnt.StorageClasses {
527                                 if mbc := bal.mountsByClass[class]; mbc == nil {
528                                         bal.classes = append(bal.classes, class)
529                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
530                                 } else {
531                                         mbc[mnt] = true
532                                 }
533                         }
534                 }
535         }
536         // Consider classes in lexicographic order to avoid flapping
537         // between balancing runs.  The outcome of the "prefer a mount
538         // we're already planning to use for a different storage
539         // class" case in balanceBlock depends on the order classes
540         // are considered.
541         sort.Strings(bal.classes)
542 }
543
544 const (
545         changeStay = iota
546         changePull
547         changeTrash
548         changeNone
549 )
550
551 var changeName = map[int]string{
552         changeStay:  "stay",
553         changePull:  "pull",
554         changeTrash: "trash",
555         changeNone:  "none",
556 }
557
558 type balanceResult struct {
559         blk        *BlockState
560         blkid      arvados.SizedDigest
561         have       int
562         want       int
563         classState map[string]balancedBlockState
564 }
565
566 // balanceBlock compares current state to desired state for a single
567 // block, and makes the appropriate ChangeSet calls.
568 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
569         debugf("balanceBlock: %v %+v", blkid, blk)
570
571         type slot struct {
572                 mnt  *KeepMount // never nil
573                 repl *Replica   // replica already stored here (or nil)
574                 want bool       // we should pull/leave a replica here
575         }
576
577         // Build a list of all slots (one per mounted volume).
578         slots := make([]slot, 0, bal.mounts)
579         for _, srv := range bal.KeepServices {
580                 for _, mnt := range srv.mounts {
581                         var repl *Replica
582                         for r := range blk.Replicas {
583                                 if blk.Replicas[r].KeepMount == mnt {
584                                         repl = &blk.Replicas[r]
585                                 }
586                         }
587                         // Initial value of "want" is "have, and can't
588                         // delete". These untrashable replicas get
589                         // prioritized when sorting slots: otherwise,
590                         // non-optimal readonly copies would cause us
591                         // to overreplicate.
592                         slots = append(slots, slot{
593                                 mnt:  mnt,
594                                 repl: repl,
595                                 want: repl != nil && mnt.ReadOnly,
596                         })
597                 }
598         }
599
600         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
601         srvRendezvous := make(map[*KeepService]int, len(uuids))
602         for i, uuid := range uuids {
603                 srv := bal.KeepServices[uuid]
604                 srvRendezvous[srv] = i
605         }
606
607         // Below we set underreplicated=true if we find any storage
608         // class that's currently underreplicated -- in that case we
609         // won't want to trash any replicas.
610         underreplicated := false
611
612         classState := make(map[string]balancedBlockState, len(bal.classes))
613         unsafeToDelete := make(map[int64]bool, len(slots))
614         for _, class := range bal.classes {
615                 desired := blk.Desired[class]
616
617                 countedDev := map[string]bool{}
618                 have := 0
619                 for _, slot := range slots {
620                         if slot.repl != nil && bal.mountsByClass[class][slot.mnt] && !countedDev[slot.mnt.DeviceID] {
621                                 have += slot.mnt.Replication
622                                 if slot.mnt.DeviceID != "" {
623                                         countedDev[slot.mnt.DeviceID] = true
624                                 }
625                         }
626                 }
627                 classState[class] = balancedBlockState{
628                         desired: desired,
629                         surplus: have - desired,
630                 }
631
632                 if desired == 0 {
633                         continue
634                 }
635
636                 // Sort the slots by desirability.
637                 sort.Slice(slots, func(i, j int) bool {
638                         si, sj := slots[i], slots[j]
639                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
640                                 // Prefer a mount that satisfies the
641                                 // desired class.
642                                 return bal.mountsByClass[class][si.mnt]
643                         } else if si.want != sj.want {
644                                 // Prefer a mount that will have a
645                                 // replica no matter what we do here
646                                 // -- either because it already has an
647                                 // untrashable replica, or because we
648                                 // already need it to satisfy a
649                                 // different storage class.
650                                 return si.want
651                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
652                                 // Prefer a better rendezvous
653                                 // position.
654                                 return orderi < orderj
655                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
656                                 // Prefer a mount that already has a
657                                 // replica.
658                                 return repli
659                         } else {
660                                 // If pull/trash turns out to be
661                                 // needed, distribute the
662                                 // new/remaining replicas uniformly
663                                 // across qualifying mounts on a given
664                                 // server.
665                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
666                         }
667                 })
668
669                 // Servers/mounts/devices (with or without existing
670                 // replicas) that are part of the best achievable
671                 // layout for this storage class.
672                 wantSrv := map[*KeepService]bool{}
673                 wantMnt := map[*KeepMount]bool{}
674                 wantDev := map[string]bool{}
675                 // Positions (with existing replicas) that have been
676                 // protected (via unsafeToDelete) to ensure we don't
677                 // reduce replication below desired level when
678                 // trashing replicas that aren't optimal positions for
679                 // any storage class.
680                 protMnt := map[*KeepMount]bool{}
681                 // Replication planned so far (corresponds to wantMnt).
682                 replWant := 0
683                 // Protected replication (corresponds to protMnt).
684                 replProt := 0
685
686                 // trySlot tries using a slot to meet requirements,
687                 // and returns true if all requirements are met.
688                 trySlot := func(i int) bool {
689                         slot := slots[i]
690                         if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
691                                 // Already allocated a replica to this
692                                 // backend device, possibly on a
693                                 // different server.
694                                 return false
695                         }
696                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
697                                 unsafeToDelete[slot.repl.Mtime] = true
698                                 protMnt[slot.mnt] = true
699                                 replProt += slot.mnt.Replication
700                         }
701                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
702                                 slots[i].want = true
703                                 wantSrv[slot.mnt.KeepService] = true
704                                 wantMnt[slot.mnt] = true
705                                 if slot.mnt.DeviceID != "" {
706                                         wantDev[slot.mnt.DeviceID] = true
707                                 }
708                                 replWant += slot.mnt.Replication
709                         }
710                         return replProt >= desired && replWant >= desired
711                 }
712
713                 // First try to achieve desired replication without
714                 // using the same server twice.
715                 done := false
716                 for i := 0; i < len(slots) && !done; i++ {
717                         if !wantSrv[slots[i].mnt.KeepService] {
718                                 done = trySlot(i)
719                         }
720                 }
721
722                 // If that didn't suffice, do another pass without the
723                 // "distinct services" restriction. (Achieving the
724                 // desired volume replication on fewer than the
725                 // desired number of services is better than
726                 // underreplicating.)
727                 for i := 0; i < len(slots) && !done; i++ {
728                         done = trySlot(i)
729                 }
730
731                 if !underreplicated {
732                         safe := 0
733                         for _, slot := range slots {
734                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
735                                         continue
736                                 }
737                                 if safe += slot.mnt.Replication; safe >= desired {
738                                         break
739                                 }
740                         }
741                         underreplicated = safe < desired
742                 }
743
744                 // set the unachievable flag if there aren't enough
745                 // slots offering the relevant storage class. (This is
746                 // as easy as checking slots[desired] because we
747                 // already sorted the qualifying slots to the front.)
748                 if desired >= len(slots) || !bal.mountsByClass[class][slots[desired].mnt] {
749                         cs := classState[class]
750                         cs.unachievable = true
751                         classState[class] = cs
752                 }
753
754                 // Avoid deleting wanted replicas from devices that
755                 // are mounted on multiple servers -- even if they
756                 // haven't already been added to unsafeToDelete
757                 // because the servers report different Mtimes.
758                 for _, slot := range slots {
759                         if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
760                                 unsafeToDelete[slot.repl.Mtime] = true
761                         }
762                 }
763         }
764
765         // TODO: If multiple replicas are trashable, prefer the oldest
766         // replica that doesn't have a timestamp collision with
767         // others.
768
769         countedDev := map[string]bool{}
770         var have, want int
771         for _, slot := range slots {
772                 if countedDev[slot.mnt.DeviceID] {
773                         continue
774                 }
775                 if slot.want {
776                         want += slot.mnt.Replication
777                 }
778                 if slot.repl != nil {
779                         have += slot.mnt.Replication
780                 }
781                 if slot.mnt.DeviceID != "" {
782                         countedDev[slot.mnt.DeviceID] = true
783                 }
784         }
785
786         var changes []string
787         for _, slot := range slots {
788                 // TODO: request a Touch if Mtime is duplicated.
789                 var change int
790                 switch {
791                 case !underreplicated && !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime && !unsafeToDelete[slot.repl.Mtime]:
792                         slot.mnt.KeepService.AddTrash(Trash{
793                                 SizedDigest: blkid,
794                                 Mtime:       slot.repl.Mtime,
795                                 From:        slot.mnt,
796                         })
797                         change = changeTrash
798                 case len(blk.Replicas) > 0 && slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
799                         slot.mnt.KeepService.AddPull(Pull{
800                                 SizedDigest: blkid,
801                                 From:        blk.Replicas[0].KeepMount.KeepService,
802                                 To:          slot.mnt,
803                         })
804                         change = changePull
805                 case slot.repl != nil:
806                         change = changeStay
807                 default:
808                         change = changeNone
809                 }
810                 if bal.Dumper != nil {
811                         var mtime int64
812                         if slot.repl != nil {
813                                 mtime = slot.repl.Mtime
814                         }
815                         srv := slot.mnt.KeepService
816                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
817                 }
818         }
819         if bal.Dumper != nil {
820                 bal.Dumper.Printf("%s refs=%d have=%d want=%v %v %v", blkid, blk.RefCount, have, want, blk.Desired, changes)
821         }
822         return balanceResult{
823                 blk:        blk,
824                 blkid:      blkid,
825                 have:       have,
826                 want:       want,
827                 classState: classState,
828         }
829 }
830
831 type blocksNBytes struct {
832         replicas int
833         blocks   int
834         bytes    int64
835 }
836
837 func (bb blocksNBytes) String() string {
838         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
839 }
840
841 type balancerStats struct {
842         lost          blocksNBytes
843         overrep       blocksNBytes
844         unref         blocksNBytes
845         garbage       blocksNBytes
846         underrep      blocksNBytes
847         unachievable  blocksNBytes
848         justright     blocksNBytes
849         desired       blocksNBytes
850         current       blocksNBytes
851         pulls         int
852         trashes       int
853         replHistogram []int
854         classStats    map[string]replicationStats
855
856         // collectionBytes / collectionBlockBytes = deduplication ratio
857         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
858         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
859         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
860         collectionBlocks     int64 // number of blocks referenced by any collection
861 }
862
863 func (s *balancerStats) dedupByteRatio() float64 {
864         if s.collectionBlockBytes == 0 {
865                 return 0
866         }
867         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
868 }
869
870 func (s *balancerStats) dedupBlockRatio() float64 {
871         if s.collectionBlocks == 0 {
872                 return 0
873         }
874         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
875 }
876
877 type replicationStats struct {
878         desired      blocksNBytes
879         surplus      blocksNBytes
880         short        blocksNBytes
881         unachievable blocksNBytes
882 }
883
884 type balancedBlockState struct {
885         desired      int
886         surplus      int
887         unachievable bool
888 }
889
890 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
891         var s balancerStats
892         s.replHistogram = make([]int, 2)
893         s.classStats = make(map[string]replicationStats, len(bal.classes))
894         for result := range results {
895                 surplus := result.have - result.want
896                 bytes := result.blkid.Size()
897
898                 if rc := int64(result.blk.RefCount); rc > 0 {
899                         s.collectionBytes += rc * bytes
900                         s.collectionBlockBytes += bytes
901                         s.collectionBlockRefs += rc
902                         s.collectionBlocks++
903                 }
904
905                 for class, state := range result.classState {
906                         cs := s.classStats[class]
907                         if state.unachievable {
908                                 cs.unachievable.blocks++
909                                 cs.unachievable.bytes += bytes
910                         }
911                         if state.desired > 0 {
912                                 cs.desired.replicas += state.desired
913                                 cs.desired.blocks++
914                                 cs.desired.bytes += bytes * int64(state.desired)
915                         }
916                         if state.surplus > 0 {
917                                 cs.surplus.replicas += state.surplus
918                                 cs.surplus.blocks++
919                                 cs.surplus.bytes += bytes * int64(state.surplus)
920                         } else if state.surplus < 0 {
921                                 cs.short.replicas += -state.surplus
922                                 cs.short.blocks++
923                                 cs.short.bytes += bytes * int64(-state.surplus)
924                         }
925                         s.classStats[class] = cs
926                 }
927
928                 switch {
929                 case result.have == 0 && result.want > 0:
930                         s.lost.replicas -= surplus
931                         s.lost.blocks++
932                         s.lost.bytes += bytes * int64(-surplus)
933                         fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
934                         for pdh := range result.blk.Refs {
935                                 fmt.Fprintf(bal.lostBlocks, " %s", pdh)
936                         }
937                         fmt.Fprint(bal.lostBlocks, "\n")
938                 case surplus < 0:
939                         s.underrep.replicas -= surplus
940                         s.underrep.blocks++
941                         s.underrep.bytes += bytes * int64(-surplus)
942                 case surplus > 0 && result.want == 0:
943                         counter := &s.garbage
944                         for _, r := range result.blk.Replicas {
945                                 if r.Mtime >= bal.MinMtime {
946                                         counter = &s.unref
947                                         break
948                                 }
949                         }
950                         counter.replicas += surplus
951                         counter.blocks++
952                         counter.bytes += bytes * int64(surplus)
953                 case surplus > 0:
954                         s.overrep.replicas += surplus
955                         s.overrep.blocks++
956                         s.overrep.bytes += bytes * int64(result.have-result.want)
957                 default:
958                         s.justright.replicas += result.want
959                         s.justright.blocks++
960                         s.justright.bytes += bytes * int64(result.want)
961                 }
962
963                 if result.want > 0 {
964                         s.desired.replicas += result.want
965                         s.desired.blocks++
966                         s.desired.bytes += bytes * int64(result.want)
967                 }
968                 if result.have > 0 {
969                         s.current.replicas += result.have
970                         s.current.blocks++
971                         s.current.bytes += bytes * int64(result.have)
972                 }
973
974                 for len(s.replHistogram) <= result.have {
975                         s.replHistogram = append(s.replHistogram, 0)
976                 }
977                 s.replHistogram[result.have]++
978         }
979         for _, srv := range bal.KeepServices {
980                 s.pulls += len(srv.ChangeSet.Pulls)
981                 s.trashes += len(srv.ChangeSet.Trashes)
982         }
983         bal.stats = s
984         bal.Metrics.UpdateStats(s)
985 }
986
987 // PrintStatistics writes statistics about the computed changes to
988 // bal.Logger. It should not be called until ComputeChangeSets has
989 // finished.
990 func (bal *Balancer) PrintStatistics() {
991         bal.logf("===")
992         bal.logf("%s lost (0=have<want)", bal.stats.lost)
993         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
994         bal.logf("%s just right (have=want)", bal.stats.justright)
995         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
996         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
997         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
998         for _, class := range bal.classes {
999                 cs := bal.stats.classStats[class]
1000                 bal.logf("===")
1001                 bal.logf("storage class %q: %s desired", class, cs.desired)
1002                 bal.logf("storage class %q: %s short", class, cs.short)
1003                 bal.logf("storage class %q: %s surplus", class, cs.surplus)
1004                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
1005         }
1006         bal.logf("===")
1007         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
1008         bal.logf("%s total usage", bal.stats.current)
1009         bal.logf("===")
1010         for _, srv := range bal.KeepServices {
1011                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1012         }
1013         bal.logf("===")
1014         bal.printHistogram(60)
1015         bal.logf("===")
1016 }
1017
1018 func (bal *Balancer) printHistogram(hashColumns int) {
1019         bal.logf("Replication level distribution (counting N replicas on a single server as N):")
1020         maxCount := 0
1021         for _, count := range bal.stats.replHistogram {
1022                 if maxCount < count {
1023                         maxCount = count
1024                 }
1025         }
1026         hashes := strings.Repeat("#", hashColumns)
1027         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1028         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1029         for repl, count := range bal.stats.replHistogram {
1030                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1031                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1032         }
1033 }
1034
1035 // CheckSanityLate checks for configuration and runtime errors after
1036 // GetCurrentState() and ComputeChangeSets() have finished.
1037 //
1038 // If it returns an error, it is dangerous to run any Commit methods.
1039 func (bal *Balancer) CheckSanityLate() error {
1040         if bal.errors != nil {
1041                 for _, err := range bal.errors {
1042                         bal.logf("deferred error: %v", err)
1043                 }
1044                 return fmt.Errorf("cannot proceed safely after deferred errors")
1045         }
1046
1047         if bal.collScanned == 0 {
1048                 return fmt.Errorf("received zero collections")
1049         }
1050
1051         anyDesired := false
1052         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1053                 for _, desired := range blk.Desired {
1054                         if desired > 0 {
1055                                 anyDesired = true
1056                                 break
1057                         }
1058                 }
1059         })
1060         if !anyDesired {
1061                 return fmt.Errorf("zero blocks have desired replication>0")
1062         }
1063
1064         if dr := bal.DefaultReplication; dr < 1 {
1065                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1066         }
1067
1068         // TODO: no two services have identical indexes
1069         // TODO: no collisions (same md5, different size)
1070         return nil
1071 }
1072
1073 // CommitPulls sends the computed lists of pull requests to the
1074 // keepstore servers. This has the effect of increasing replication of
1075 // existing blocks that are either underreplicated or poorly
1076 // distributed according to rendezvous hashing.
1077 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
1078         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1079         return bal.commitAsync(c, "send pull list",
1080                 func(srv *KeepService) error {
1081                         return srv.CommitPulls(c)
1082                 })
1083 }
1084
1085 // CommitTrash sends the computed lists of trash requests to the
1086 // keepstore servers. This has the effect of deleting blocks that are
1087 // overreplicated or unreferenced.
1088 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
1089         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1090         return bal.commitAsync(c, "send trash list",
1091                 func(srv *KeepService) error {
1092                         return srv.CommitTrash(c)
1093                 })
1094 }
1095
1096 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1097         errs := make(chan error)
1098         for _, srv := range bal.KeepServices {
1099                 go func(srv *KeepService) {
1100                         var err error
1101                         defer func() { errs <- err }()
1102                         label := fmt.Sprintf("%s: %v", srv, label)
1103                         err = f(srv)
1104                         if err != nil {
1105                                 err = fmt.Errorf("%s: %v", label, err)
1106                         }
1107                 }(srv)
1108         }
1109         var lastErr error
1110         for range bal.KeepServices {
1111                 if err := <-errs; err != nil {
1112                         bal.logf("%v", err)
1113                         lastErr = err
1114                 }
1115         }
1116         close(errs)
1117         return lastErr
1118 }
1119
1120 func (bal *Balancer) logf(f string, args ...interface{}) {
1121         if bal.Logger != nil {
1122                 bal.Logger.Printf(f, args...)
1123         }
1124 }
1125
1126 func (bal *Balancer) time(name, help string) func() {
1127         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1128         t0 := time.Now()
1129         bal.Logger.Printf("%s: start", name)
1130         return func() {
1131                 dur := time.Since(t0)
1132                 observer.Observe(dur.Seconds())
1133                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1134         }
1135 }
1136
1137 // Rendezvous hash sort function. Less efficient than sorting on
1138 // precomputed rendezvous hashes, but also rarely used.
1139 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1140         a := md5.Sum([]byte(string(blkid[:32]) + i))
1141         b := md5.Sum([]byte(string(blkid[:32]) + j))
1142         return bytes.Compare(a[:], b[:]) < 0
1143 }