Merge branch '12737-wb-rails42-upgrade'
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "fmt"
11         "log"
12         "math"
13         "os"
14         "runtime"
15         "sort"
16         "strings"
17         "sync"
18         "time"
19
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "git.curoverse.com/arvados.git/sdk/go/keepclient"
22 )
23
24 // CheckConfig returns an error if anything is wrong with the given
25 // config and runOptions.
26 func CheckConfig(config Config, runOptions RunOptions) error {
27         if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
28                 return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
29         }
30         if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
31                 return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
32         }
33         return nil
34 }
35
36 // Balancer compares the contents of keepstore servers with the
37 // collections stored in Arvados, and issues pull/trash requests
38 // needed to get (closer to) the optimal data layout.
39 //
40 // In the optimal data layout: every data block referenced by a
41 // collection is replicated at least as many times as desired by the
42 // collection; there are no unreferenced data blocks older than
43 // BlobSignatureTTL; and all N existing replicas of a given data block
44 // are in the N best positions in rendezvous probe order.
45 type Balancer struct {
46         *BlockStateMap
47         KeepServices       map[string]*KeepService
48         DefaultReplication int
49         Logger             *log.Logger
50         Dumper             *log.Logger
51         MinMtime           int64
52
53         classes       []string
54         mounts        int
55         mountsByClass map[string]map[*KeepMount]bool
56         collScanned   int
57         serviceRoots  map[string]string
58         errors        []error
59         stats         balancerStats
60         mutex         sync.Mutex
61 }
62
63 // Run performs a balance operation using the given config and
64 // runOptions, and returns RunOptions suitable for passing to a
65 // subsequent balance operation.
66 //
67 // Run should only be called once on a given Balancer object.
68 //
69 // Typical usage:
70 //
71 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
72 func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
73         nextRunOptions = runOptions
74
75         bal.Dumper = runOptions.Dumper
76         bal.Logger = runOptions.Logger
77         if bal.Logger == nil {
78                 bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
79         }
80
81         defer timeMe(bal.Logger, "Run")()
82
83         if len(config.KeepServiceList.Items) > 0 {
84                 err = bal.SetKeepServices(config.KeepServiceList)
85         } else {
86                 err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
87         }
88         if err != nil {
89                 return
90         }
91
92         for _, srv := range bal.KeepServices {
93                 err = srv.discoverMounts(&config.Client)
94                 if err != nil {
95                         return
96                 }
97         }
98
99         if err = bal.CheckSanityEarly(&config.Client); err != nil {
100                 return
101         }
102         rs := bal.rendezvousState()
103         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
104                 if runOptions.SafeRendezvousState != "" {
105                         bal.logf("notice: KeepServices list has changed since last run")
106                 }
107                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
108                 if err = bal.ClearTrashLists(&config.Client); err != nil {
109                         return
110                 }
111                 // The current rendezvous state becomes "safe" (i.e.,
112                 // OK to compute changes for that state without
113                 // clearing existing trash lists) only now, after we
114                 // succeed in clearing existing trash lists.
115                 nextRunOptions.SafeRendezvousState = rs
116         }
117         if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
118                 return
119         }
120         bal.ComputeChangeSets()
121         bal.PrintStatistics()
122         if err = bal.CheckSanityLate(); err != nil {
123                 return
124         }
125         if runOptions.CommitPulls {
126                 err = bal.CommitPulls(&config.Client)
127                 if err != nil {
128                         // Skip trash if we can't pull. (Too cautious?)
129                         return
130                 }
131         }
132         if runOptions.CommitTrash {
133                 err = bal.CommitTrash(&config.Client)
134         }
135         return
136 }
137
138 // SetKeepServices sets the list of KeepServices to operate on.
139 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
140         bal.KeepServices = make(map[string]*KeepService)
141         for _, srv := range srvList.Items {
142                 bal.KeepServices[srv.UUID] = &KeepService{
143                         KeepService: srv,
144                         ChangeSet:   &ChangeSet{},
145                 }
146         }
147         return nil
148 }
149
150 // DiscoverKeepServices sets the list of KeepServices by calling the
151 // API to get a list of all services, and selecting the ones whose
152 // ServiceType is in okTypes.
153 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
154         bal.KeepServices = make(map[string]*KeepService)
155         ok := make(map[string]bool)
156         for _, t := range okTypes {
157                 ok[t] = true
158         }
159         return c.EachKeepService(func(srv arvados.KeepService) error {
160                 if ok[srv.ServiceType] {
161                         bal.KeepServices[srv.UUID] = &KeepService{
162                                 KeepService: srv,
163                                 ChangeSet:   &ChangeSet{},
164                         }
165                 } else {
166                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
167                 }
168                 return nil
169         })
170 }
171
172 // CheckSanityEarly checks for configuration and runtime errors that
173 // can be detected before GetCurrentState() and ComputeChangeSets()
174 // are called.
175 //
176 // If it returns an error, it is pointless to run GetCurrentState or
177 // ComputeChangeSets: after doing so, the statistics would be
178 // meaningless and it would be dangerous to run any Commit methods.
179 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
180         u, err := c.CurrentUser()
181         if err != nil {
182                 return fmt.Errorf("CurrentUser(): %v", err)
183         }
184         if !u.IsActive || !u.IsAdmin {
185                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
186         }
187         for _, srv := range bal.KeepServices {
188                 if srv.ServiceType == "proxy" {
189                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
190                 }
191         }
192         return nil
193 }
194
195 // rendezvousState returns a fingerprint (e.g., a sorted list of
196 // UUID+host+port) of the current set of keep services.
197 func (bal *Balancer) rendezvousState() string {
198         srvs := make([]string, 0, len(bal.KeepServices))
199         for _, srv := range bal.KeepServices {
200                 srvs = append(srvs, srv.String())
201         }
202         sort.Strings(srvs)
203         return strings.Join(srvs, "; ")
204 }
205
206 // ClearTrashLists sends an empty trash list to each keep
207 // service. Calling this before GetCurrentState avoids races.
208 //
209 // When a block appears in an index, we assume that replica will still
210 // exist after we delete other replicas on other servers. However,
211 // it's possible that a previous rebalancing operation made different
212 // decisions (e.g., servers were added/removed, and rendezvous order
213 // changed). In this case, the replica might already be on that
214 // server's trash list, and it might be deleted before we send a
215 // replacement trash list.
216 //
217 // We avoid this problem if we clear all trash lists before getting
218 // indexes. (We also assume there is only one rebalancing process
219 // running at a time.)
220 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
221         for _, srv := range bal.KeepServices {
222                 srv.ChangeSet = &ChangeSet{}
223         }
224         return bal.CommitTrash(c)
225 }
226
227 // GetCurrentState determines the current replication state, and the
228 // desired replication level, for every block that is either
229 // retrievable or referenced.
230 //
231 // It determines the current replication state by reading the block index
232 // from every known Keep service.
233 //
234 // It determines the desired replication level by retrieving all
235 // collection manifests in the database (API server).
236 //
237 // It encodes the resulting information in BlockStateMap.
238 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
239         defer timeMe(bal.Logger, "GetCurrentState")()
240         bal.BlockStateMap = NewBlockStateMap()
241
242         dd, err := c.DiscoveryDocument()
243         if err != nil {
244                 return err
245         }
246         bal.DefaultReplication = dd.DefaultCollectionReplication
247         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
248
249         errs := make(chan error, 2+len(bal.KeepServices))
250         wg := sync.WaitGroup{}
251
252         // Start one goroutine for each KeepService: retrieve the
253         // index, and add the returned blocks to BlockStateMap.
254         for _, srv := range bal.KeepServices {
255                 wg.Add(1)
256                 go func(srv *KeepService) {
257                         defer wg.Done()
258                         bal.logf("%s: retrieve indexes", srv)
259                         for _, mount := range srv.mounts {
260                                 bal.logf("%s: retrieve index", mount)
261                                 idx, err := srv.IndexMount(c, mount.UUID, "")
262                                 if err != nil {
263                                         errs <- fmt.Errorf("%s: retrieve index: %v", mount, err)
264                                         return
265                                 }
266                                 if len(errs) > 0 {
267                                         // Some other goroutine encountered an
268                                         // error -- any further effort here
269                                         // will be wasted.
270                                         return
271                                 }
272                                 bal.logf("%s: add %d replicas to map", mount, len(idx))
273                                 bal.BlockStateMap.AddReplicas(mount, idx)
274                                 bal.logf("%s: done", mount)
275                         }
276                         bal.logf("%s: done", srv)
277                 }(srv)
278         }
279
280         // collQ buffers incoming collections so we can start fetching
281         // the next page without waiting for the current page to
282         // finish processing.
283         collQ := make(chan arvados.Collection, bufs)
284
285         // Start a goroutine to process collections. (We could use a
286         // worker pool here, but even with a single worker we already
287         // process collections much faster than we can retrieve them.)
288         wg.Add(1)
289         go func() {
290                 defer wg.Done()
291                 for coll := range collQ {
292                         err := bal.addCollection(coll)
293                         if err != nil {
294                                 errs <- err
295                                 for range collQ {
296                                 }
297                                 return
298                         }
299                         bal.collScanned++
300                 }
301         }()
302
303         // Start a goroutine to retrieve all collections from the
304         // Arvados database and send them to collQ for processing.
305         wg.Add(1)
306         go func() {
307                 defer wg.Done()
308                 err = EachCollection(c, pageSize,
309                         func(coll arvados.Collection) error {
310                                 collQ <- coll
311                                 if len(errs) > 0 {
312                                         // some other GetCurrentState
313                                         // error happened: no point
314                                         // getting any more
315                                         // collections.
316                                         return fmt.Errorf("")
317                                 }
318                                 return nil
319                         }, func(done, total int) {
320                                 bal.logf("collections: %d/%d", done, total)
321                         })
322                 close(collQ)
323                 if err != nil {
324                         errs <- err
325                 }
326         }()
327
328         wg.Wait()
329         if len(errs) > 0 {
330                 return <-errs
331         }
332         return nil
333 }
334
335 func (bal *Balancer) addCollection(coll arvados.Collection) error {
336         blkids, err := coll.SizedDigests()
337         if err != nil {
338                 bal.mutex.Lock()
339                 bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
340                 bal.mutex.Unlock()
341                 return nil
342         }
343         repl := bal.DefaultReplication
344         if coll.ReplicationDesired != nil {
345                 repl = *coll.ReplicationDesired
346         }
347         debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
348         bal.BlockStateMap.IncreaseDesired(coll.StorageClassesDesired, repl, blkids)
349         return nil
350 }
351
352 // ComputeChangeSets compares, for each known block, the current and
353 // desired replication states. If it is possible to get closer to the
354 // desired state by copying or deleting blocks, it adds those changes
355 // to the relevant KeepServices' ChangeSets.
356 //
357 // It does not actually apply any of the computed changes.
358 func (bal *Balancer) ComputeChangeSets() {
359         // This just calls balanceBlock() once for each block, using a
360         // pool of worker goroutines.
361         defer timeMe(bal.Logger, "ComputeChangeSets")()
362         bal.setupLookupTables()
363
364         type balanceTask struct {
365                 blkid arvados.SizedDigest
366                 blk   *BlockState
367         }
368         nWorkers := 1 + runtime.NumCPU()
369         todo := make(chan balanceTask, nWorkers)
370         results := make(chan balanceResult, 16)
371         var wg sync.WaitGroup
372         for i := 0; i < nWorkers; i++ {
373                 wg.Add(1)
374                 go func() {
375                         for work := range todo {
376                                 results <- bal.balanceBlock(work.blkid, work.blk)
377                         }
378                         wg.Done()
379                 }()
380         }
381         bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
382                 todo <- balanceTask{
383                         blkid: blkid,
384                         blk:   blk,
385                 }
386         })
387         close(todo)
388         go func() {
389                 wg.Wait()
390                 close(results)
391         }()
392         bal.collectStatistics(results)
393 }
394
395 func (bal *Balancer) setupLookupTables() {
396         bal.serviceRoots = make(map[string]string)
397         bal.classes = []string{"default"}
398         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
399         bal.mounts = 0
400         for _, srv := range bal.KeepServices {
401                 bal.serviceRoots[srv.UUID] = srv.UUID
402                 for _, mnt := range srv.mounts {
403                         bal.mounts++
404
405                         // All mounts on a read-only service are
406                         // effectively read-only.
407                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
408
409                         if len(mnt.StorageClasses) == 0 {
410                                 bal.mountsByClass["default"][mnt] = true
411                                 continue
412                         }
413                         for _, class := range mnt.StorageClasses {
414                                 if mbc := bal.mountsByClass[class]; mbc == nil {
415                                         bal.classes = append(bal.classes, class)
416                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
417                                 } else {
418                                         mbc[mnt] = true
419                                 }
420                         }
421                 }
422         }
423         // Consider classes in lexicographic order to avoid flapping
424         // between balancing runs.  The outcome of the "prefer a mount
425         // we're already planning to use for a different storage
426         // class" case in balanceBlock depends on the order classes
427         // are considered.
428         sort.Strings(bal.classes)
429 }
430
431 const (
432         changeStay = iota
433         changePull
434         changeTrash
435         changeNone
436 )
437
438 var changeName = map[int]string{
439         changeStay:  "stay",
440         changePull:  "pull",
441         changeTrash: "trash",
442         changeNone:  "none",
443 }
444
445 type balanceResult struct {
446         blk        *BlockState
447         blkid      arvados.SizedDigest
448         have       int
449         want       int
450         classState map[string]balancedBlockState
451 }
452
453 // balanceBlock compares current state to desired state for a single
454 // block, and makes the appropriate ChangeSet calls.
455 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
456         debugf("balanceBlock: %v %+v", blkid, blk)
457
458         type slot struct {
459                 mnt  *KeepMount // never nil
460                 repl *Replica   // replica already stored here (or nil)
461                 want bool       // we should pull/leave a replica here
462         }
463
464         // Build a list of all slots (one per mounted volume).
465         slots := make([]slot, 0, bal.mounts)
466         for _, srv := range bal.KeepServices {
467                 for _, mnt := range srv.mounts {
468                         var repl *Replica
469                         for r := range blk.Replicas {
470                                 if blk.Replicas[r].KeepMount == mnt {
471                                         repl = &blk.Replicas[r]
472                                 }
473                         }
474                         // Initial value of "want" is "have, and can't
475                         // delete". These untrashable replicas get
476                         // prioritized when sorting slots: otherwise,
477                         // non-optimal readonly copies would cause us
478                         // to overreplicate.
479                         slots = append(slots, slot{
480                                 mnt:  mnt,
481                                 repl: repl,
482                                 want: repl != nil && (mnt.ReadOnly || repl.Mtime >= bal.MinMtime),
483                         })
484                 }
485         }
486
487         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
488         srvRendezvous := make(map[*KeepService]int, len(uuids))
489         for i, uuid := range uuids {
490                 srv := bal.KeepServices[uuid]
491                 srvRendezvous[srv] = i
492         }
493
494         // Below we set underreplicated=true if we find any storage
495         // class that's currently underreplicated -- in that case we
496         // won't want to trash any replicas.
497         underreplicated := false
498
499         classState := make(map[string]balancedBlockState, len(bal.classes))
500         unsafeToDelete := make(map[int64]bool, len(slots))
501         for _, class := range bal.classes {
502                 desired := blk.Desired[class]
503
504                 have := 0
505                 for _, slot := range slots {
506                         if slot.repl != nil && bal.mountsByClass[class][slot.mnt] {
507                                 have++
508                         }
509                 }
510                 classState[class] = balancedBlockState{
511                         desired: desired,
512                         surplus: have - desired,
513                 }
514
515                 if desired == 0 {
516                         continue
517                 }
518
519                 // Sort the slots by desirability.
520                 sort.Slice(slots, func(i, j int) bool {
521                         si, sj := slots[i], slots[j]
522                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
523                                 // Prefer a mount that satisfies the
524                                 // desired class.
525                                 return bal.mountsByClass[class][si.mnt]
526                         } else if wanti, wantj := si.want, si.want; wanti != wantj {
527                                 // Prefer a mount that will have a
528                                 // replica no matter what we do here
529                                 // -- either because it already has an
530                                 // untrashable replica, or because we
531                                 // already need it to satisfy a
532                                 // different storage class.
533                                 return slots[i].want
534                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
535                                 // Prefer a better rendezvous
536                                 // position.
537                                 return orderi < orderj
538                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
539                                 // Prefer a mount that already has a
540                                 // replica.
541                                 return repli
542                         } else {
543                                 // If pull/trash turns out to be
544                                 // needed, distribute the
545                                 // new/remaining replicas uniformly
546                                 // across qualifying mounts on a given
547                                 // server.
548                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
549                         }
550                 })
551
552                 // Servers and mounts (with or without existing
553                 // replicas) that are part of the best achievable
554                 // layout for this storage class.
555                 wantSrv := map[*KeepService]bool{}
556                 wantMnt := map[*KeepMount]bool{}
557                 // Positions (with existing replicas) that have been
558                 // protected (via unsafeToDelete) to ensure we don't
559                 // reduce replication below desired level when
560                 // trashing replicas that aren't optimal positions for
561                 // any storage class.
562                 protMnt := map[*KeepMount]bool{}
563
564                 // trySlot tries using a slot to meet requirements,
565                 // and returns true if all requirements are met.
566                 trySlot := func(i int) bool {
567                         slot := slots[i]
568                         if len(protMnt) < desired && slot.repl != nil {
569                                 unsafeToDelete[slot.repl.Mtime] = true
570                                 protMnt[slot.mnt] = true
571                         }
572                         if len(wantMnt) < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
573                                 slots[i].want = true
574                                 wantSrv[slot.mnt.KeepService] = true
575                                 wantMnt[slot.mnt] = true
576                         }
577                         return len(protMnt) >= desired && len(wantMnt) >= desired
578                 }
579
580                 // First try to achieve desired replication without
581                 // using the same server twice.
582                 done := false
583                 for i := 0; i < len(slots) && !done; i++ {
584                         if !wantSrv[slots[i].mnt.KeepService] {
585                                 done = trySlot(i)
586                         }
587                 }
588
589                 // If that didn't suffice, do another pass without the
590                 // "distinct services" restriction. (Achieving the
591                 // desired volume replication on fewer than the
592                 // desired number of services is better than
593                 // underreplicating.)
594                 for i := 0; i < len(slots) && !done; i++ {
595                         done = trySlot(i)
596                 }
597
598                 if !underreplicated {
599                         safe := 0
600                         for _, slot := range slots {
601                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
602                                         continue
603                                 }
604                                 if safe++; safe >= desired {
605                                         break
606                                 }
607                         }
608                         underreplicated = safe < desired
609                 }
610
611                 // set the unachievable flag if there aren't enough
612                 // slots offering the relevant storage class. (This is
613                 // as easy as checking slots[desired] because we
614                 // already sorted the qualifying slots to the front.)
615                 if desired >= len(slots) || !bal.mountsByClass[class][slots[desired].mnt] {
616                         cs := classState[class]
617                         cs.unachievable = true
618                         classState[class] = cs
619                 }
620         }
621
622         // TODO: If multiple replicas are trashable, prefer the oldest
623         // replica that doesn't have a timestamp collision with
624         // others.
625
626         var have, want int
627         for _, slot := range slots {
628                 if slot.want {
629                         want++
630                 }
631                 if slot.repl != nil {
632                         have++
633                 }
634         }
635
636         var changes []string
637         for _, slot := range slots {
638                 // TODO: request a Touch if Mtime is duplicated.
639                 var change int
640                 switch {
641                 case !underreplicated && slot.repl != nil && !slot.want && !unsafeToDelete[slot.repl.Mtime]:
642                         slot.mnt.KeepService.AddTrash(Trash{
643                                 SizedDigest: blkid,
644                                 Mtime:       slot.repl.Mtime,
645                                 From:        slot.mnt,
646                         })
647                         change = changeTrash
648                 case len(blk.Replicas) == 0:
649                         change = changeNone
650                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
651                         slot.mnt.KeepService.AddPull(Pull{
652                                 SizedDigest: blkid,
653                                 From:        blk.Replicas[0].KeepMount.KeepService,
654                                 To:          slot.mnt,
655                         })
656                         change = changePull
657                 default:
658                         change = changeStay
659                 }
660                 if bal.Dumper != nil {
661                         var mtime int64
662                         if slot.repl != nil {
663                                 mtime = slot.repl.Mtime
664                         }
665                         srv := slot.mnt.KeepService
666                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
667                 }
668         }
669         if bal.Dumper != nil {
670                 bal.Dumper.Printf("%s have=%d want=%v %s", blkid, have, want, strings.Join(changes, " "))
671         }
672         return balanceResult{
673                 blk:        blk,
674                 blkid:      blkid,
675                 have:       have,
676                 want:       want,
677                 classState: classState,
678         }
679 }
680
681 type blocksNBytes struct {
682         replicas int
683         blocks   int
684         bytes    int64
685 }
686
687 func (bb blocksNBytes) String() string {
688         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
689 }
690
691 type balancerStats struct {
692         lost          blocksNBytes
693         overrep       blocksNBytes
694         unref         blocksNBytes
695         garbage       blocksNBytes
696         underrep      blocksNBytes
697         unachievable  blocksNBytes
698         justright     blocksNBytes
699         desired       blocksNBytes
700         current       blocksNBytes
701         pulls         int
702         trashes       int
703         replHistogram []int
704         classStats    map[string]replicationStats
705 }
706
707 type replicationStats struct {
708         desired      blocksNBytes
709         surplus      blocksNBytes
710         short        blocksNBytes
711         unachievable blocksNBytes
712 }
713
714 type balancedBlockState struct {
715         desired      int
716         surplus      int
717         unachievable bool
718 }
719
720 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
721         var s balancerStats
722         s.replHistogram = make([]int, 2)
723         s.classStats = make(map[string]replicationStats, len(bal.classes))
724         for result := range results {
725                 surplus := result.have - result.want
726                 bytes := result.blkid.Size()
727
728                 for class, state := range result.classState {
729                         cs := s.classStats[class]
730                         if state.unachievable {
731                                 cs.unachievable.blocks++
732                                 cs.unachievable.bytes += bytes
733                         }
734                         if state.desired > 0 {
735                                 cs.desired.replicas += state.desired
736                                 cs.desired.blocks++
737                                 cs.desired.bytes += bytes * int64(state.desired)
738                         }
739                         if state.surplus > 0 {
740                                 cs.surplus.replicas += state.surplus
741                                 cs.surplus.blocks++
742                                 cs.surplus.bytes += bytes * int64(state.surplus)
743                         } else if state.surplus < 0 {
744                                 cs.short.replicas += -state.surplus
745                                 cs.short.blocks++
746                                 cs.short.bytes += bytes * int64(-state.surplus)
747                         }
748                         s.classStats[class] = cs
749                 }
750
751                 switch {
752                 case result.have == 0 && result.want > 0:
753                         s.lost.replicas -= surplus
754                         s.lost.blocks++
755                         s.lost.bytes += bytes * int64(-surplus)
756                 case surplus < 0:
757                         s.underrep.replicas -= surplus
758                         s.underrep.blocks++
759                         s.underrep.bytes += bytes * int64(-surplus)
760                 case surplus > 0 && result.want == 0:
761                         counter := &s.garbage
762                         for _, r := range result.blk.Replicas {
763                                 if r.Mtime >= bal.MinMtime {
764                                         counter = &s.unref
765                                         break
766                                 }
767                         }
768                         counter.replicas += surplus
769                         counter.blocks++
770                         counter.bytes += bytes * int64(surplus)
771                 case surplus > 0:
772                         s.overrep.replicas += surplus
773                         s.overrep.blocks++
774                         s.overrep.bytes += bytes * int64(len(result.blk.Replicas)-result.want)
775                 default:
776                         s.justright.replicas += result.want
777                         s.justright.blocks++
778                         s.justright.bytes += bytes * int64(result.want)
779                 }
780
781                 if result.want > 0 {
782                         s.desired.replicas += result.want
783                         s.desired.blocks++
784                         s.desired.bytes += bytes * int64(result.want)
785                 }
786                 if len(result.blk.Replicas) > 0 {
787                         s.current.replicas += len(result.blk.Replicas)
788                         s.current.blocks++
789                         s.current.bytes += bytes * int64(len(result.blk.Replicas))
790                 }
791
792                 for len(s.replHistogram) <= len(result.blk.Replicas) {
793                         s.replHistogram = append(s.replHistogram, 0)
794                 }
795                 s.replHistogram[len(result.blk.Replicas)]++
796         }
797         for _, srv := range bal.KeepServices {
798                 s.pulls += len(srv.ChangeSet.Pulls)
799                 s.trashes += len(srv.ChangeSet.Trashes)
800         }
801         bal.stats = s
802 }
803
804 // PrintStatistics writes statistics about the computed changes to
805 // bal.Logger. It should not be called until ComputeChangeSets has
806 // finished.
807 func (bal *Balancer) PrintStatistics() {
808         bal.logf("===")
809         bal.logf("%s lost (0=have<want)", bal.stats.lost)
810         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
811         bal.logf("%s just right (have=want)", bal.stats.justright)
812         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
813         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
814         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
815         for _, class := range bal.classes {
816                 cs := bal.stats.classStats[class]
817                 bal.logf("===")
818                 bal.logf("storage class %q: %s desired", class, cs.desired)
819                 bal.logf("storage class %q: %s short", class, cs.short)
820                 bal.logf("storage class %q: %s surplus", class, cs.surplus)
821                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
822         }
823         bal.logf("===")
824         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
825         bal.logf("%s total usage", bal.stats.current)
826         bal.logf("===")
827         for _, srv := range bal.KeepServices {
828                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
829         }
830         bal.logf("===")
831         bal.printHistogram(60)
832         bal.logf("===")
833 }
834
835 func (bal *Balancer) printHistogram(hashColumns int) {
836         bal.logf("Replication level distribution (counting N replicas on a single server as N):")
837         maxCount := 0
838         for _, count := range bal.stats.replHistogram {
839                 if maxCount < count {
840                         maxCount = count
841                 }
842         }
843         hashes := strings.Repeat("#", hashColumns)
844         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
845         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
846         for repl, count := range bal.stats.replHistogram {
847                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
848                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
849         }
850 }
851
852 // CheckSanityLate checks for configuration and runtime errors after
853 // GetCurrentState() and ComputeChangeSets() have finished.
854 //
855 // If it returns an error, it is dangerous to run any Commit methods.
856 func (bal *Balancer) CheckSanityLate() error {
857         if bal.errors != nil {
858                 for _, err := range bal.errors {
859                         bal.logf("deferred error: %v", err)
860                 }
861                 return fmt.Errorf("cannot proceed safely after deferred errors")
862         }
863
864         if bal.collScanned == 0 {
865                 return fmt.Errorf("received zero collections")
866         }
867
868         anyDesired := false
869         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
870                 for _, desired := range blk.Desired {
871                         if desired > 0 {
872                                 anyDesired = true
873                                 break
874                         }
875                 }
876         })
877         if !anyDesired {
878                 return fmt.Errorf("zero blocks have desired replication>0")
879         }
880
881         if dr := bal.DefaultReplication; dr < 1 {
882                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
883         }
884
885         // TODO: no two services have identical indexes
886         // TODO: no collisions (same md5, different size)
887         return nil
888 }
889
890 // CommitPulls sends the computed lists of pull requests to the
891 // keepstore servers. This has the effect of increasing replication of
892 // existing blocks that are either underreplicated or poorly
893 // distributed according to rendezvous hashing.
894 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
895         return bal.commitAsync(c, "send pull list",
896                 func(srv *KeepService) error {
897                         return srv.CommitPulls(c)
898                 })
899 }
900
901 // CommitTrash sends the computed lists of trash requests to the
902 // keepstore servers. This has the effect of deleting blocks that are
903 // overreplicated or unreferenced.
904 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
905         return bal.commitAsync(c, "send trash list",
906                 func(srv *KeepService) error {
907                         return srv.CommitTrash(c)
908                 })
909 }
910
911 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
912         errs := make(chan error)
913         for _, srv := range bal.KeepServices {
914                 go func(srv *KeepService) {
915                         var err error
916                         defer func() { errs <- err }()
917                         label := fmt.Sprintf("%s: %v", srv, label)
918                         defer timeMe(bal.Logger, label)()
919                         err = f(srv)
920                         if err != nil {
921                                 err = fmt.Errorf("%s: %v", label, err)
922                         }
923                 }(srv)
924         }
925         var lastErr error
926         for range bal.KeepServices {
927                 if err := <-errs; err != nil {
928                         bal.logf("%v", err)
929                         lastErr = err
930                 }
931         }
932         close(errs)
933         return lastErr
934 }
935
936 func (bal *Balancer) logf(f string, args ...interface{}) {
937         if bal.Logger != nil {
938                 bal.Logger.Printf(f, args...)
939         }
940 }
941
942 // Rendezvous hash sort function. Less efficient than sorting on
943 // precomputed rendezvous hashes, but also rarely used.
944 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
945         a := md5.Sum([]byte(string(blkid[:32]) + i))
946         b := md5.Sum([]byte(string(blkid[:32]) + j))
947         return bytes.Compare(a[:], b[:]) < 0
948 }