Merge branch '12708-balance-storage-classes'
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "fmt"
11         "log"
12         "math"
13         "os"
14         "runtime"
15         "sort"
16         "strings"
17         "sync"
18         "time"
19
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "git.curoverse.com/arvados.git/sdk/go/keepclient"
22 )
23
24 // CheckConfig returns an error if anything is wrong with the given
25 // config and runOptions.
26 func CheckConfig(config Config, runOptions RunOptions) error {
27         if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
28                 return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
29         }
30         if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
31                 return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
32         }
33         return nil
34 }
35
36 // Balancer compares the contents of keepstore servers with the
37 // collections stored in Arvados, and issues pull/trash requests
38 // needed to get (closer to) the optimal data layout.
39 //
40 // In the optimal data layout: every data block referenced by a
41 // collection is replicated at least as many times as desired by the
42 // collection; there are no unreferenced data blocks older than
43 // BlobSignatureTTL; and all N existing replicas of a given data block
44 // are in the N best positions in rendezvous probe order.
45 type Balancer struct {
46         *BlockStateMap
47         KeepServices       map[string]*KeepService
48         DefaultReplication int
49         Logger             *log.Logger
50         Dumper             *log.Logger
51         MinMtime           int64
52
53         classes       []string
54         mounts        int
55         mountsByClass map[string]map[*KeepMount]bool
56         collScanned   int
57         serviceRoots  map[string]string
58         errors        []error
59         stats         balancerStats
60         mutex         sync.Mutex
61 }
62
63 // Run performs a balance operation using the given config and
64 // runOptions, and returns RunOptions suitable for passing to a
65 // subsequent balance operation.
66 //
67 // Run should only be called once on a given Balancer object.
68 //
69 // Typical usage:
70 //
71 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
72 func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
73         nextRunOptions = runOptions
74
75         bal.Dumper = runOptions.Dumper
76         bal.Logger = runOptions.Logger
77         if bal.Logger == nil {
78                 bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
79         }
80
81         defer timeMe(bal.Logger, "Run")()
82
83         if len(config.KeepServiceList.Items) > 0 {
84                 err = bal.SetKeepServices(config.KeepServiceList)
85         } else {
86                 err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
87         }
88         if err != nil {
89                 return
90         }
91
92         for _, srv := range bal.KeepServices {
93                 err = srv.discoverMounts(&config.Client)
94                 if err != nil {
95                         return
96                 }
97         }
98
99         if err = bal.CheckSanityEarly(&config.Client); err != nil {
100                 return
101         }
102         rs := bal.rendezvousState()
103         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
104                 if runOptions.SafeRendezvousState != "" {
105                         bal.logf("notice: KeepServices list has changed since last run")
106                 }
107                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
108                 if err = bal.ClearTrashLists(&config.Client); err != nil {
109                         return
110                 }
111                 // The current rendezvous state becomes "safe" (i.e.,
112                 // OK to compute changes for that state without
113                 // clearing existing trash lists) only now, after we
114                 // succeed in clearing existing trash lists.
115                 nextRunOptions.SafeRendezvousState = rs
116         }
117         if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
118                 return
119         }
120         bal.ComputeChangeSets()
121         bal.PrintStatistics()
122         if err = bal.CheckSanityLate(); err != nil {
123                 return
124         }
125         if runOptions.CommitPulls {
126                 err = bal.CommitPulls(&config.Client)
127                 if err != nil {
128                         // Skip trash if we can't pull. (Too cautious?)
129                         return
130                 }
131         }
132         if runOptions.CommitTrash {
133                 err = bal.CommitTrash(&config.Client)
134         }
135         return
136 }
137
138 // SetKeepServices sets the list of KeepServices to operate on.
139 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
140         bal.KeepServices = make(map[string]*KeepService)
141         for _, srv := range srvList.Items {
142                 bal.KeepServices[srv.UUID] = &KeepService{
143                         KeepService: srv,
144                         ChangeSet:   &ChangeSet{},
145                 }
146         }
147         return nil
148 }
149
150 // DiscoverKeepServices sets the list of KeepServices by calling the
151 // API to get a list of all services, and selecting the ones whose
152 // ServiceType is in okTypes.
153 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
154         bal.KeepServices = make(map[string]*KeepService)
155         ok := make(map[string]bool)
156         for _, t := range okTypes {
157                 ok[t] = true
158         }
159         return c.EachKeepService(func(srv arvados.KeepService) error {
160                 if ok[srv.ServiceType] {
161                         bal.KeepServices[srv.UUID] = &KeepService{
162                                 KeepService: srv,
163                                 ChangeSet:   &ChangeSet{},
164                         }
165                 } else {
166                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
167                 }
168                 return nil
169         })
170 }
171
172 // CheckSanityEarly checks for configuration and runtime errors that
173 // can be detected before GetCurrentState() and ComputeChangeSets()
174 // are called.
175 //
176 // If it returns an error, it is pointless to run GetCurrentState or
177 // ComputeChangeSets: after doing so, the statistics would be
178 // meaningless and it would be dangerous to run any Commit methods.
179 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
180         u, err := c.CurrentUser()
181         if err != nil {
182                 return fmt.Errorf("CurrentUser(): %v", err)
183         }
184         if !u.IsActive || !u.IsAdmin {
185                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
186         }
187         for _, srv := range bal.KeepServices {
188                 if srv.ServiceType == "proxy" {
189                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
190                 }
191         }
192         return nil
193 }
194
195 // rendezvousState returns a fingerprint (e.g., a sorted list of
196 // UUID+host+port) of the current set of keep services.
197 func (bal *Balancer) rendezvousState() string {
198         srvs := make([]string, 0, len(bal.KeepServices))
199         for _, srv := range bal.KeepServices {
200                 srvs = append(srvs, srv.String())
201         }
202         sort.Strings(srvs)
203         return strings.Join(srvs, "; ")
204 }
205
206 // ClearTrashLists sends an empty trash list to each keep
207 // service. Calling this before GetCurrentState avoids races.
208 //
209 // When a block appears in an index, we assume that replica will still
210 // exist after we delete other replicas on other servers. However,
211 // it's possible that a previous rebalancing operation made different
212 // decisions (e.g., servers were added/removed, and rendezvous order
213 // changed). In this case, the replica might already be on that
214 // server's trash list, and it might be deleted before we send a
215 // replacement trash list.
216 //
217 // We avoid this problem if we clear all trash lists before getting
218 // indexes. (We also assume there is only one rebalancing process
219 // running at a time.)
220 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
221         for _, srv := range bal.KeepServices {
222                 srv.ChangeSet = &ChangeSet{}
223         }
224         return bal.CommitTrash(c)
225 }
226
227 // GetCurrentState determines the current replication state, and the
228 // desired replication level, for every block that is either
229 // retrievable or referenced.
230 //
231 // It determines the current replication state by reading the block index
232 // from every known Keep service.
233 //
234 // It determines the desired replication level by retrieving all
235 // collection manifests in the database (API server).
236 //
237 // It encodes the resulting information in BlockStateMap.
238 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
239         defer timeMe(bal.Logger, "GetCurrentState")()
240         bal.BlockStateMap = NewBlockStateMap()
241
242         dd, err := c.DiscoveryDocument()
243         if err != nil {
244                 return err
245         }
246         bal.DefaultReplication = dd.DefaultCollectionReplication
247         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
248
249         errs := make(chan error, 2+len(bal.KeepServices))
250         wg := sync.WaitGroup{}
251
252         // Start one goroutine for each KeepService: retrieve the
253         // index, and add the returned blocks to BlockStateMap.
254         for _, srv := range bal.KeepServices {
255                 wg.Add(1)
256                 go func(srv *KeepService) {
257                         defer wg.Done()
258                         bal.logf("%s: retrieve indexes", srv)
259                         for _, mount := range srv.mounts {
260                                 bal.logf("%s: retrieve index", mount)
261                                 idx, err := srv.IndexMount(c, mount.UUID, "")
262                                 if err != nil {
263                                         errs <- fmt.Errorf("%s: retrieve index: %v", mount, err)
264                                         return
265                                 }
266                                 if len(errs) > 0 {
267                                         // Some other goroutine encountered an
268                                         // error -- any further effort here
269                                         // will be wasted.
270                                         return
271                                 }
272                                 bal.logf("%s: add %d replicas to map", mount, len(idx))
273                                 bal.BlockStateMap.AddReplicas(mount, idx)
274                                 bal.logf("%s: done", mount)
275                         }
276                         bal.logf("%s: done", srv)
277                 }(srv)
278         }
279
280         // collQ buffers incoming collections so we can start fetching
281         // the next page without waiting for the current page to
282         // finish processing.
283         collQ := make(chan arvados.Collection, bufs)
284
285         // Start a goroutine to process collections. (We could use a
286         // worker pool here, but even with a single worker we already
287         // process collections much faster than we can retrieve them.)
288         wg.Add(1)
289         go func() {
290                 defer wg.Done()
291                 for coll := range collQ {
292                         err := bal.addCollection(coll)
293                         if err != nil {
294                                 errs <- err
295                                 for range collQ {
296                                 }
297                                 return
298                         }
299                         bal.collScanned++
300                 }
301         }()
302
303         // Start a goroutine to retrieve all collections from the
304         // Arvados database and send them to collQ for processing.
305         wg.Add(1)
306         go func() {
307                 defer wg.Done()
308                 err = EachCollection(c, pageSize,
309                         func(coll arvados.Collection) error {
310                                 collQ <- coll
311                                 if len(errs) > 0 {
312                                         // some other GetCurrentState
313                                         // error happened: no point
314                                         // getting any more
315                                         // collections.
316                                         return fmt.Errorf("")
317                                 }
318                                 return nil
319                         }, func(done, total int) {
320                                 bal.logf("collections: %d/%d", done, total)
321                         })
322                 close(collQ)
323                 if err != nil {
324                         errs <- err
325                 }
326         }()
327
328         wg.Wait()
329         if len(errs) > 0 {
330                 return <-errs
331         }
332         return nil
333 }
334
335 func (bal *Balancer) addCollection(coll arvados.Collection) error {
336         blkids, err := coll.SizedDigests()
337         if err != nil {
338                 bal.mutex.Lock()
339                 bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
340                 bal.mutex.Unlock()
341                 return nil
342         }
343         repl := bal.DefaultReplication
344         if coll.ReplicationDesired != nil {
345                 repl = *coll.ReplicationDesired
346         }
347         debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
348         bal.BlockStateMap.IncreaseDesired(coll.StorageClassesDesired, repl, blkids)
349         return nil
350 }
351
352 // ComputeChangeSets compares, for each known block, the current and
353 // desired replication states. If it is possible to get closer to the
354 // desired state by copying or deleting blocks, it adds those changes
355 // to the relevant KeepServices' ChangeSets.
356 //
357 // It does not actually apply any of the computed changes.
358 func (bal *Balancer) ComputeChangeSets() {
359         // This just calls balanceBlock() once for each block, using a
360         // pool of worker goroutines.
361         defer timeMe(bal.Logger, "ComputeChangeSets")()
362         bal.setupLookupTables()
363
364         type balanceTask struct {
365                 blkid arvados.SizedDigest
366                 blk   *BlockState
367         }
368         nWorkers := 1 + runtime.NumCPU()
369         todo := make(chan balanceTask, nWorkers)
370         results := make(chan balanceResult, 16)
371         var wg sync.WaitGroup
372         for i := 0; i < nWorkers; i++ {
373                 wg.Add(1)
374                 go func() {
375                         for work := range todo {
376                                 results <- bal.balanceBlock(work.blkid, work.blk)
377                         }
378                         wg.Done()
379                 }()
380         }
381         bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
382                 todo <- balanceTask{
383                         blkid: blkid,
384                         blk:   blk,
385                 }
386         })
387         close(todo)
388         go func() {
389                 wg.Wait()
390                 close(results)
391         }()
392         bal.collectStatistics(results)
393 }
394
395 func (bal *Balancer) setupLookupTables() {
396         bal.serviceRoots = make(map[string]string)
397         bal.classes = []string{"default"}
398         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
399         bal.mounts = 0
400         for _, srv := range bal.KeepServices {
401                 bal.serviceRoots[srv.UUID] = srv.UUID
402                 for _, mnt := range srv.mounts {
403                         bal.mounts++
404
405                         // All mounts on a read-only service are
406                         // effectively read-only.
407                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
408
409                         if len(mnt.StorageClasses) == 0 {
410                                 bal.mountsByClass["default"][mnt] = true
411                                 continue
412                         }
413                         for _, class := range mnt.StorageClasses {
414                                 if mbc := bal.mountsByClass[class]; mbc == nil {
415                                         bal.classes = append(bal.classes, class)
416                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
417                                 } else {
418                                         mbc[mnt] = true
419                                 }
420                         }
421                 }
422         }
423         // Consider classes in lexicographic order to avoid flapping
424         // between balancing runs.  The outcome of the "prefer a mount
425         // we're already planning to use for a different storage
426         // class" case in balanceBlock depends on the order classes
427         // are considered.
428         sort.Strings(bal.classes)
429 }
430
431 const (
432         changeStay = iota
433         changePull
434         changeTrash
435         changeNone
436 )
437
438 var changeName = map[int]string{
439         changeStay:  "stay",
440         changePull:  "pull",
441         changeTrash: "trash",
442         changeNone:  "none",
443 }
444
445 type balanceResult struct {
446         blk   *BlockState
447         blkid arvados.SizedDigest
448         have  int
449         want  int
450 }
451
452 // balanceBlock compares current state to desired state for a single
453 // block, and makes the appropriate ChangeSet calls.
454 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
455         debugf("balanceBlock: %v %+v", blkid, blk)
456
457         type slot struct {
458                 mnt  *KeepMount // never nil
459                 repl *Replica   // replica already stored here (or nil)
460                 want bool       // we should pull/leave a replica here
461         }
462
463         // Build a list of all slots (one per mounted volume).
464         slots := make([]slot, 0, bal.mounts)
465         for _, srv := range bal.KeepServices {
466                 for _, mnt := range srv.mounts {
467                         var repl *Replica
468                         for r := range blk.Replicas {
469                                 if blk.Replicas[r].KeepMount == mnt {
470                                         repl = &blk.Replicas[r]
471                                 }
472                         }
473                         // Initial value of "want" is "have, and can't
474                         // delete". These untrashable replicas get
475                         // prioritized when sorting slots: otherwise,
476                         // non-optimal readonly copies would cause us
477                         // to overreplicate.
478                         slots = append(slots, slot{
479                                 mnt:  mnt,
480                                 repl: repl,
481                                 want: repl != nil && (mnt.ReadOnly || repl.Mtime >= bal.MinMtime),
482                         })
483                 }
484         }
485
486         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
487         srvRendezvous := make(map[*KeepService]int, len(uuids))
488         for i, uuid := range uuids {
489                 srv := bal.KeepServices[uuid]
490                 srvRendezvous[srv] = i
491         }
492
493         // Below we set underreplicated=true if we find any storage
494         // class that's currently underreplicated -- in that case we
495         // won't want to trash any replicas.
496         underreplicated := false
497
498         unsafeToDelete := make(map[int64]bool, len(slots))
499         for _, class := range bal.classes {
500                 desired := blk.Desired[class]
501                 if desired == 0 {
502                         continue
503                 }
504                 // Sort the slots by desirability.
505                 sort.Slice(slots, func(i, j int) bool {
506                         si, sj := slots[i], slots[j]
507                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
508                                 // Prefer a mount that satisfies the
509                                 // desired class.
510                                 return bal.mountsByClass[class][si.mnt]
511                         } else if wanti, wantj := si.want, si.want; wanti != wantj {
512                                 // Prefer a mount that will have a
513                                 // replica no matter what we do here
514                                 // -- either because it already has an
515                                 // untrashable replica, or because we
516                                 // already need it to satisfy a
517                                 // different storage class.
518                                 return slots[i].want
519                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
520                                 // Prefer a better rendezvous
521                                 // position.
522                                 return orderi < orderj
523                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
524                                 // Prefer a mount that already has a
525                                 // replica.
526                                 return repli
527                         } else {
528                                 // If pull/trash turns out to be
529                                 // needed, distribute the
530                                 // new/remaining replicas uniformly
531                                 // across qualifying mounts on a given
532                                 // server.
533                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
534                         }
535                 })
536
537                 // Servers and mounts (with or without existing
538                 // replicas) that are part of the best achievable
539                 // layout for this storage class.
540                 wantSrv := map[*KeepService]bool{}
541                 wantMnt := map[*KeepMount]bool{}
542                 // Positions (with existing replicas) that have been
543                 // protected (via unsafeToDelete) to ensure we don't
544                 // reduce replication below desired level when
545                 // trashing replicas that aren't optimal positions for
546                 // any storage class.
547                 protMnt := map[*KeepMount]bool{}
548
549                 // trySlot tries using a slot to meet requirements,
550                 // and returns true if all requirements are met.
551                 trySlot := func(i int) bool {
552                         slot := slots[i]
553                         if len(protMnt) < desired && slot.repl != nil {
554                                 unsafeToDelete[slot.repl.Mtime] = true
555                                 protMnt[slot.mnt] = true
556                         }
557                         if len(wantMnt) < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
558                                 slots[i].want = true
559                                 wantSrv[slot.mnt.KeepService] = true
560                                 wantMnt[slot.mnt] = true
561                         }
562                         return len(protMnt) >= desired && len(wantMnt) >= desired
563                 }
564
565                 // First try to achieve desired replication without
566                 // using the same server twice.
567                 done := false
568                 for i := 0; i < len(slots) && !done; i++ {
569                         if !wantSrv[slots[i].mnt.KeepService] {
570                                 done = trySlot(i)
571                         }
572                 }
573
574                 // If that didn't suffice, do another pass without the
575                 // "distinct services" restriction. (Achieving the
576                 // desired volume replication on fewer than the
577                 // desired number of services is better than
578                 // underreplicating.)
579                 for i := 0; i < len(slots) && !done; i++ {
580                         done = trySlot(i)
581                 }
582
583                 if !underreplicated {
584                         safe := 0
585                         for _, slot := range slots {
586                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
587                                         continue
588                                 }
589                                 if safe++; safe >= desired {
590                                         break
591                                 }
592                         }
593                         underreplicated = safe < desired
594                 }
595         }
596
597         // TODO: If multiple replicas are trashable, prefer the oldest
598         // replica that doesn't have a timestamp collision with
599         // others.
600
601         var have, want int
602         for _, slot := range slots {
603                 if slot.want {
604                         want++
605                 }
606                 if slot.repl != nil {
607                         have++
608                 }
609         }
610
611         var changes []string
612         for _, slot := range slots {
613                 // TODO: request a Touch if Mtime is duplicated.
614                 var change int
615                 switch {
616                 case !underreplicated && slot.repl != nil && !slot.want && !unsafeToDelete[slot.repl.Mtime]:
617                         slot.mnt.KeepService.AddTrash(Trash{
618                                 SizedDigest: blkid,
619                                 Mtime:       slot.repl.Mtime,
620                                 From:        slot.mnt,
621                         })
622                         change = changeTrash
623                 case len(blk.Replicas) == 0:
624                         change = changeNone
625                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
626                         slot.mnt.KeepService.AddPull(Pull{
627                                 SizedDigest: blkid,
628                                 From:        blk.Replicas[0].KeepMount.KeepService,
629                                 To:          slot.mnt,
630                         })
631                         change = changePull
632                 default:
633                         change = changeStay
634                 }
635                 if bal.Dumper != nil {
636                         var mtime int64
637                         if slot.repl != nil {
638                                 mtime = slot.repl.Mtime
639                         }
640                         srv := slot.mnt.KeepService
641                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
642                 }
643         }
644         if bal.Dumper != nil {
645                 bal.Dumper.Printf("%s have=%d want=%v %s", blkid, have, want, strings.Join(changes, " "))
646         }
647         return balanceResult{
648                 blk:   blk,
649                 blkid: blkid,
650                 have:  have,
651                 want:  want,
652         }
653 }
654
655 type blocksNBytes struct {
656         replicas int
657         blocks   int
658         bytes    int64
659 }
660
661 func (bb blocksNBytes) String() string {
662         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
663 }
664
665 type balancerStats struct {
666         lost, overrep, unref, garbage, underrep, justright blocksNBytes
667         desired, current                                   blocksNBytes
668         pulls, trashes                                     int
669         replHistogram                                      []int
670 }
671
672 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
673         var s balancerStats
674         s.replHistogram = make([]int, 2)
675         for result := range results {
676                 surplus := result.have - result.want
677                 bytes := result.blkid.Size()
678                 switch {
679                 case result.have == 0 && result.want > 0:
680                         s.lost.replicas -= surplus
681                         s.lost.blocks++
682                         s.lost.bytes += bytes * int64(-surplus)
683                 case surplus < 0:
684                         s.underrep.replicas -= surplus
685                         s.underrep.blocks++
686                         s.underrep.bytes += bytes * int64(-surplus)
687                 case surplus > 0 && result.want == 0:
688                         counter := &s.garbage
689                         for _, r := range result.blk.Replicas {
690                                 if r.Mtime >= bal.MinMtime {
691                                         counter = &s.unref
692                                         break
693                                 }
694                         }
695                         counter.replicas += surplus
696                         counter.blocks++
697                         counter.bytes += bytes * int64(surplus)
698                 case surplus > 0:
699                         s.overrep.replicas += surplus
700                         s.overrep.blocks++
701                         s.overrep.bytes += bytes * int64(len(result.blk.Replicas)-result.want)
702                 default:
703                         s.justright.replicas += result.want
704                         s.justright.blocks++
705                         s.justright.bytes += bytes * int64(result.want)
706                 }
707
708                 if result.want > 0 {
709                         s.desired.replicas += result.want
710                         s.desired.blocks++
711                         s.desired.bytes += bytes * int64(result.want)
712                 }
713                 if len(result.blk.Replicas) > 0 {
714                         s.current.replicas += len(result.blk.Replicas)
715                         s.current.blocks++
716                         s.current.bytes += bytes * int64(len(result.blk.Replicas))
717                 }
718
719                 for len(s.replHistogram) <= len(result.blk.Replicas) {
720                         s.replHistogram = append(s.replHistogram, 0)
721                 }
722                 s.replHistogram[len(result.blk.Replicas)]++
723         }
724         for _, srv := range bal.KeepServices {
725                 s.pulls += len(srv.ChangeSet.Pulls)
726                 s.trashes += len(srv.ChangeSet.Trashes)
727         }
728         bal.stats = s
729 }
730
731 // PrintStatistics writes statistics about the computed changes to
732 // bal.Logger. It should not be called until ComputeChangeSets has
733 // finished.
734 func (bal *Balancer) PrintStatistics() {
735         bal.logf("===")
736         bal.logf("%s lost (0=have<want)", bal.stats.lost)
737         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
738         bal.logf("%s just right (have=want)", bal.stats.justright)
739         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
740         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
741         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
742         bal.logf("===")
743         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
744         bal.logf("%s total usage", bal.stats.current)
745         bal.logf("===")
746         for _, srv := range bal.KeepServices {
747                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
748         }
749         bal.logf("===")
750         bal.printHistogram(60)
751         bal.logf("===")
752 }
753
754 func (bal *Balancer) printHistogram(hashColumns int) {
755         bal.logf("Replication level distribution (counting N replicas on a single server as N):")
756         maxCount := 0
757         for _, count := range bal.stats.replHistogram {
758                 if maxCount < count {
759                         maxCount = count
760                 }
761         }
762         hashes := strings.Repeat("#", hashColumns)
763         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
764         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
765         for repl, count := range bal.stats.replHistogram {
766                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
767                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
768         }
769 }
770
771 // CheckSanityLate checks for configuration and runtime errors after
772 // GetCurrentState() and ComputeChangeSets() have finished.
773 //
774 // If it returns an error, it is dangerous to run any Commit methods.
775 func (bal *Balancer) CheckSanityLate() error {
776         if bal.errors != nil {
777                 for _, err := range bal.errors {
778                         bal.logf("deferred error: %v", err)
779                 }
780                 return fmt.Errorf("cannot proceed safely after deferred errors")
781         }
782
783         if bal.collScanned == 0 {
784                 return fmt.Errorf("received zero collections")
785         }
786
787         anyDesired := false
788         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
789                 for _, desired := range blk.Desired {
790                         if desired > 0 {
791                                 anyDesired = true
792                                 break
793                         }
794                 }
795         })
796         if !anyDesired {
797                 return fmt.Errorf("zero blocks have desired replication>0")
798         }
799
800         if dr := bal.DefaultReplication; dr < 1 {
801                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
802         }
803
804         // TODO: no two services have identical indexes
805         // TODO: no collisions (same md5, different size)
806         return nil
807 }
808
809 // CommitPulls sends the computed lists of pull requests to the
810 // keepstore servers. This has the effect of increasing replication of
811 // existing blocks that are either underreplicated or poorly
812 // distributed according to rendezvous hashing.
813 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
814         return bal.commitAsync(c, "send pull list",
815                 func(srv *KeepService) error {
816                         return srv.CommitPulls(c)
817                 })
818 }
819
820 // CommitTrash sends the computed lists of trash requests to the
821 // keepstore servers. This has the effect of deleting blocks that are
822 // overreplicated or unreferenced.
823 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
824         return bal.commitAsync(c, "send trash list",
825                 func(srv *KeepService) error {
826                         return srv.CommitTrash(c)
827                 })
828 }
829
830 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
831         errs := make(chan error)
832         for _, srv := range bal.KeepServices {
833                 go func(srv *KeepService) {
834                         var err error
835                         defer func() { errs <- err }()
836                         label := fmt.Sprintf("%s: %v", srv, label)
837                         defer timeMe(bal.Logger, label)()
838                         err = f(srv)
839                         if err != nil {
840                                 err = fmt.Errorf("%s: %v", label, err)
841                         }
842                 }(srv)
843         }
844         var lastErr error
845         for range bal.KeepServices {
846                 if err := <-errs; err != nil {
847                         bal.logf("%v", err)
848                         lastErr = err
849                 }
850         }
851         close(errs)
852         return lastErr
853 }
854
855 func (bal *Balancer) logf(f string, args ...interface{}) {
856         if bal.Logger != nil {
857                 bal.Logger.Printf(f, args...)
858         }
859 }
860
861 // Rendezvous hash sort function. Less efficient than sorting on
862 // precomputed rendezvous hashes, but also rarely used.
863 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
864         a := md5.Sum([]byte(string(blkid[:32]) + i))
865         b := md5.Sum([]byte(string(blkid[:32]) + j))
866         return bytes.Compare(a[:], b[:]) < 0
867 }