13547: Merge branch 'master' into 13547-respect-insecure-flag-when-talking-ssl-to...
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "fmt"
11         "log"
12         "math"
13         "os"
14         "runtime"
15         "sort"
16         "strings"
17         "sync"
18         "time"
19
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "git.curoverse.com/arvados.git/sdk/go/keepclient"
22 )
23
24 // CheckConfig returns an error if anything is wrong with the given
25 // config and runOptions.
26 func CheckConfig(config Config, runOptions RunOptions) error {
27         if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
28                 return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
29         }
30         if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
31                 return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
32         }
33         return nil
34 }
35
36 // Balancer compares the contents of keepstore servers with the
37 // collections stored in Arvados, and issues pull/trash requests
38 // needed to get (closer to) the optimal data layout.
39 //
40 // In the optimal data layout: every data block referenced by a
41 // collection is replicated at least as many times as desired by the
42 // collection; there are no unreferenced data blocks older than
43 // BlobSignatureTTL; and all N existing replicas of a given data block
44 // are in the N best positions in rendezvous probe order.
45 type Balancer struct {
46         *BlockStateMap
47         KeepServices       map[string]*KeepService
48         DefaultReplication int
49         Logger             *log.Logger
50         Dumper             *log.Logger
51         MinMtime           int64
52
53         classes       []string
54         mounts        int
55         mountsByClass map[string]map[*KeepMount]bool
56         collScanned   int
57         serviceRoots  map[string]string
58         errors        []error
59         stats         balancerStats
60         mutex         sync.Mutex
61 }
62
63 // Run performs a balance operation using the given config and
64 // runOptions, and returns RunOptions suitable for passing to a
65 // subsequent balance operation.
66 //
67 // Run should only be called once on a given Balancer object.
68 //
69 // Typical usage:
70 //
71 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
72 func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
73         nextRunOptions = runOptions
74
75         bal.Dumper = runOptions.Dumper
76         bal.Logger = runOptions.Logger
77         if bal.Logger == nil {
78                 bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
79         }
80
81         defer timeMe(bal.Logger, "Run")()
82
83         if len(config.KeepServiceList.Items) > 0 {
84                 err = bal.SetKeepServices(config.KeepServiceList)
85         } else {
86                 err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
87         }
88         if err != nil {
89                 return
90         }
91
92         for _, srv := range bal.KeepServices {
93                 err = srv.discoverMounts(&config.Client)
94                 if err != nil {
95                         return
96                 }
97         }
98         bal.cleanupMounts()
99
100         if err = bal.CheckSanityEarly(&config.Client); err != nil {
101                 return
102         }
103         rs := bal.rendezvousState()
104         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
105                 if runOptions.SafeRendezvousState != "" {
106                         bal.logf("notice: KeepServices list has changed since last run")
107                 }
108                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
109                 if err = bal.ClearTrashLists(&config.Client); err != nil {
110                         return
111                 }
112                 // The current rendezvous state becomes "safe" (i.e.,
113                 // OK to compute changes for that state without
114                 // clearing existing trash lists) only now, after we
115                 // succeed in clearing existing trash lists.
116                 nextRunOptions.SafeRendezvousState = rs
117         }
118         if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
119                 return
120         }
121         bal.ComputeChangeSets()
122         bal.PrintStatistics()
123         if err = bal.CheckSanityLate(); err != nil {
124                 return
125         }
126         if runOptions.CommitPulls {
127                 err = bal.CommitPulls(&config.Client)
128                 if err != nil {
129                         // Skip trash if we can't pull. (Too cautious?)
130                         return
131                 }
132         }
133         if runOptions.CommitTrash {
134                 err = bal.CommitTrash(&config.Client)
135         }
136         return
137 }
138
139 // SetKeepServices sets the list of KeepServices to operate on.
140 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
141         bal.KeepServices = make(map[string]*KeepService)
142         for _, srv := range srvList.Items {
143                 bal.KeepServices[srv.UUID] = &KeepService{
144                         KeepService: srv,
145                         ChangeSet:   &ChangeSet{},
146                 }
147         }
148         return nil
149 }
150
151 // DiscoverKeepServices sets the list of KeepServices by calling the
152 // API to get a list of all services, and selecting the ones whose
153 // ServiceType is in okTypes.
154 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
155         bal.KeepServices = make(map[string]*KeepService)
156         ok := make(map[string]bool)
157         for _, t := range okTypes {
158                 ok[t] = true
159         }
160         return c.EachKeepService(func(srv arvados.KeepService) error {
161                 if ok[srv.ServiceType] {
162                         bal.KeepServices[srv.UUID] = &KeepService{
163                                 KeepService: srv,
164                                 ChangeSet:   &ChangeSet{},
165                         }
166                 } else {
167                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
168                 }
169                 return nil
170         })
171 }
172
173 func (bal *Balancer) cleanupMounts() {
174         rwdev := map[string]*KeepService{}
175         for _, srv := range bal.KeepServices {
176                 for _, mnt := range srv.mounts {
177                         if !mnt.ReadOnly && mnt.DeviceID != "" {
178                                 rwdev[mnt.DeviceID] = srv
179                         }
180                 }
181         }
182         // Drop the readonly mounts whose device is mounted RW
183         // elsewhere.
184         for _, srv := range bal.KeepServices {
185                 var dedup []*KeepMount
186                 for _, mnt := range srv.mounts {
187                         if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
188                                 bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
189                         } else {
190                                 dedup = append(dedup, mnt)
191                         }
192                 }
193                 srv.mounts = dedup
194         }
195         for _, srv := range bal.KeepServices {
196                 for _, mnt := range srv.mounts {
197                         if mnt.Replication <= 0 {
198                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
199                                 mnt.Replication = 1
200                         }
201                 }
202         }
203 }
204
205 // CheckSanityEarly checks for configuration and runtime errors that
206 // can be detected before GetCurrentState() and ComputeChangeSets()
207 // are called.
208 //
209 // If it returns an error, it is pointless to run GetCurrentState or
210 // ComputeChangeSets: after doing so, the statistics would be
211 // meaningless and it would be dangerous to run any Commit methods.
212 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
213         u, err := c.CurrentUser()
214         if err != nil {
215                 return fmt.Errorf("CurrentUser(): %v", err)
216         }
217         if !u.IsActive || !u.IsAdmin {
218                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
219         }
220         for _, srv := range bal.KeepServices {
221                 if srv.ServiceType == "proxy" {
222                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
223                 }
224         }
225         return nil
226 }
227
228 // rendezvousState returns a fingerprint (e.g., a sorted list of
229 // UUID+host+port) of the current set of keep services.
230 func (bal *Balancer) rendezvousState() string {
231         srvs := make([]string, 0, len(bal.KeepServices))
232         for _, srv := range bal.KeepServices {
233                 srvs = append(srvs, srv.String())
234         }
235         sort.Strings(srvs)
236         return strings.Join(srvs, "; ")
237 }
238
239 // ClearTrashLists sends an empty trash list to each keep
240 // service. Calling this before GetCurrentState avoids races.
241 //
242 // When a block appears in an index, we assume that replica will still
243 // exist after we delete other replicas on other servers. However,
244 // it's possible that a previous rebalancing operation made different
245 // decisions (e.g., servers were added/removed, and rendezvous order
246 // changed). In this case, the replica might already be on that
247 // server's trash list, and it might be deleted before we send a
248 // replacement trash list.
249 //
250 // We avoid this problem if we clear all trash lists before getting
251 // indexes. (We also assume there is only one rebalancing process
252 // running at a time.)
253 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
254         for _, srv := range bal.KeepServices {
255                 srv.ChangeSet = &ChangeSet{}
256         }
257         return bal.CommitTrash(c)
258 }
259
260 // GetCurrentState determines the current replication state, and the
261 // desired replication level, for every block that is either
262 // retrievable or referenced.
263 //
264 // It determines the current replication state by reading the block index
265 // from every known Keep service.
266 //
267 // It determines the desired replication level by retrieving all
268 // collection manifests in the database (API server).
269 //
270 // It encodes the resulting information in BlockStateMap.
271 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
272         defer timeMe(bal.Logger, "GetCurrentState")()
273         bal.BlockStateMap = NewBlockStateMap()
274
275         dd, err := c.DiscoveryDocument()
276         if err != nil {
277                 return err
278         }
279         bal.DefaultReplication = dd.DefaultCollectionReplication
280         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
281
282         errs := make(chan error, 2+len(bal.KeepServices))
283         wg := sync.WaitGroup{}
284
285         // When a device is mounted more than once, we will get its
286         // index only once, and call AddReplicas on all of the mounts.
287         // equivMount keys are the mounts that will be indexed, and
288         // each value is a list of mounts to apply the received index
289         // to.
290         equivMount := map[*KeepMount][]*KeepMount{}
291         // deviceMount maps each device ID to the one mount that will
292         // be indexed for that device.
293         deviceMount := map[string]*KeepMount{}
294         for _, srv := range bal.KeepServices {
295                 for _, mnt := range srv.mounts {
296                         equiv := deviceMount[mnt.DeviceID]
297                         if equiv == nil {
298                                 equiv = mnt
299                                 if mnt.DeviceID != "" {
300                                         deviceMount[mnt.DeviceID] = equiv
301                                 }
302                         }
303                         equivMount[equiv] = append(equivMount[equiv], mnt)
304                 }
305         }
306
307         // Start one goroutine for each (non-redundant) mount:
308         // retrieve the index, and add the returned blocks to
309         // BlockStateMap.
310         for _, mounts := range equivMount {
311                 wg.Add(1)
312                 go func(mounts []*KeepMount) {
313                         defer wg.Done()
314                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
315                         idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
316                         if err != nil {
317                                 errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err)
318                                 return
319                         }
320                         if len(errs) > 0 {
321                                 // Some other goroutine encountered an
322                                 // error -- any further effort here
323                                 // will be wasted.
324                                 return
325                         }
326                         for _, mount := range mounts {
327                                 bal.logf("%s: add %d replicas to map", mount, len(idx))
328                                 bal.BlockStateMap.AddReplicas(mount, idx)
329                                 bal.logf("%s: added %d replicas", mount, len(idx))
330                         }
331                         bal.logf("mount %s: index done", mounts[0])
332                 }(mounts)
333         }
334
335         // collQ buffers incoming collections so we can start fetching
336         // the next page without waiting for the current page to
337         // finish processing.
338         collQ := make(chan arvados.Collection, bufs)
339
340         // Start a goroutine to process collections. (We could use a
341         // worker pool here, but even with a single worker we already
342         // process collections much faster than we can retrieve them.)
343         wg.Add(1)
344         go func() {
345                 defer wg.Done()
346                 for coll := range collQ {
347                         err := bal.addCollection(coll)
348                         if err != nil {
349                                 errs <- err
350                                 for range collQ {
351                                 }
352                                 return
353                         }
354                         bal.collScanned++
355                 }
356         }()
357
358         // Start a goroutine to retrieve all collections from the
359         // Arvados database and send them to collQ for processing.
360         wg.Add(1)
361         go func() {
362                 defer wg.Done()
363                 err = EachCollection(c, pageSize,
364                         func(coll arvados.Collection) error {
365                                 collQ <- coll
366                                 if len(errs) > 0 {
367                                         // some other GetCurrentState
368                                         // error happened: no point
369                                         // getting any more
370                                         // collections.
371                                         return fmt.Errorf("")
372                                 }
373                                 return nil
374                         }, func(done, total int) {
375                                 bal.logf("collections: %d/%d", done, total)
376                         })
377                 close(collQ)
378                 if err != nil {
379                         errs <- err
380                 }
381         }()
382
383         wg.Wait()
384         if len(errs) > 0 {
385                 return <-errs
386         }
387         return nil
388 }
389
390 func (bal *Balancer) addCollection(coll arvados.Collection) error {
391         blkids, err := coll.SizedDigests()
392         if err != nil {
393                 bal.mutex.Lock()
394                 bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
395                 bal.mutex.Unlock()
396                 return nil
397         }
398         repl := bal.DefaultReplication
399         if coll.ReplicationDesired != nil {
400                 repl = *coll.ReplicationDesired
401         }
402         debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
403         bal.BlockStateMap.IncreaseDesired(coll.StorageClassesDesired, repl, blkids)
404         return nil
405 }
406
407 // ComputeChangeSets compares, for each known block, the current and
408 // desired replication states. If it is possible to get closer to the
409 // desired state by copying or deleting blocks, it adds those changes
410 // to the relevant KeepServices' ChangeSets.
411 //
412 // It does not actually apply any of the computed changes.
413 func (bal *Balancer) ComputeChangeSets() {
414         // This just calls balanceBlock() once for each block, using a
415         // pool of worker goroutines.
416         defer timeMe(bal.Logger, "ComputeChangeSets")()
417         bal.setupLookupTables()
418
419         type balanceTask struct {
420                 blkid arvados.SizedDigest
421                 blk   *BlockState
422         }
423         workers := runtime.GOMAXPROCS(-1)
424         todo := make(chan balanceTask, workers)
425         go func() {
426                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
427                         todo <- balanceTask{
428                                 blkid: blkid,
429                                 blk:   blk,
430                         }
431                 })
432                 close(todo)
433         }()
434         results := make(chan balanceResult, workers)
435         go func() {
436                 var wg sync.WaitGroup
437                 for i := 0; i < workers; i++ {
438                         wg.Add(1)
439                         go func() {
440                                 for work := range todo {
441                                         results <- bal.balanceBlock(work.blkid, work.blk)
442                                 }
443                                 wg.Done()
444                         }()
445                 }
446                 wg.Wait()
447                 close(results)
448         }()
449         bal.collectStatistics(results)
450 }
451
452 func (bal *Balancer) setupLookupTables() {
453         bal.serviceRoots = make(map[string]string)
454         bal.classes = []string{"default"}
455         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
456         bal.mounts = 0
457         for _, srv := range bal.KeepServices {
458                 bal.serviceRoots[srv.UUID] = srv.UUID
459                 for _, mnt := range srv.mounts {
460                         bal.mounts++
461
462                         // All mounts on a read-only service are
463                         // effectively read-only.
464                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
465
466                         if len(mnt.StorageClasses) == 0 {
467                                 bal.mountsByClass["default"][mnt] = true
468                                 continue
469                         }
470                         for _, class := range mnt.StorageClasses {
471                                 if mbc := bal.mountsByClass[class]; mbc == nil {
472                                         bal.classes = append(bal.classes, class)
473                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
474                                 } else {
475                                         mbc[mnt] = true
476                                 }
477                         }
478                 }
479         }
480         // Consider classes in lexicographic order to avoid flapping
481         // between balancing runs.  The outcome of the "prefer a mount
482         // we're already planning to use for a different storage
483         // class" case in balanceBlock depends on the order classes
484         // are considered.
485         sort.Strings(bal.classes)
486 }
487
488 const (
489         changeStay = iota
490         changePull
491         changeTrash
492         changeNone
493 )
494
495 var changeName = map[int]string{
496         changeStay:  "stay",
497         changePull:  "pull",
498         changeTrash: "trash",
499         changeNone:  "none",
500 }
501
502 type balanceResult struct {
503         blk        *BlockState
504         blkid      arvados.SizedDigest
505         have       int
506         want       int
507         classState map[string]balancedBlockState
508 }
509
510 // balanceBlock compares current state to desired state for a single
511 // block, and makes the appropriate ChangeSet calls.
512 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
513         debugf("balanceBlock: %v %+v", blkid, blk)
514
515         type slot struct {
516                 mnt  *KeepMount // never nil
517                 repl *Replica   // replica already stored here (or nil)
518                 want bool       // we should pull/leave a replica here
519         }
520
521         // Build a list of all slots (one per mounted volume).
522         slots := make([]slot, 0, bal.mounts)
523         for _, srv := range bal.KeepServices {
524                 for _, mnt := range srv.mounts {
525                         var repl *Replica
526                         for r := range blk.Replicas {
527                                 if blk.Replicas[r].KeepMount == mnt {
528                                         repl = &blk.Replicas[r]
529                                 }
530                         }
531                         // Initial value of "want" is "have, and can't
532                         // delete". These untrashable replicas get
533                         // prioritized when sorting slots: otherwise,
534                         // non-optimal readonly copies would cause us
535                         // to overreplicate.
536                         slots = append(slots, slot{
537                                 mnt:  mnt,
538                                 repl: repl,
539                                 want: repl != nil && (mnt.ReadOnly || repl.Mtime >= bal.MinMtime),
540                         })
541                 }
542         }
543
544         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
545         srvRendezvous := make(map[*KeepService]int, len(uuids))
546         for i, uuid := range uuids {
547                 srv := bal.KeepServices[uuid]
548                 srvRendezvous[srv] = i
549         }
550
551         // Below we set underreplicated=true if we find any storage
552         // class that's currently underreplicated -- in that case we
553         // won't want to trash any replicas.
554         underreplicated := false
555
556         classState := make(map[string]balancedBlockState, len(bal.classes))
557         unsafeToDelete := make(map[int64]bool, len(slots))
558         for _, class := range bal.classes {
559                 desired := blk.Desired[class]
560
561                 countedDev := map[string]bool{}
562                 have := 0
563                 for _, slot := range slots {
564                         if slot.repl != nil && bal.mountsByClass[class][slot.mnt] && !countedDev[slot.mnt.DeviceID] {
565                                 have += slot.mnt.Replication
566                                 if slot.mnt.DeviceID != "" {
567                                         countedDev[slot.mnt.DeviceID] = true
568                                 }
569                         }
570                 }
571                 classState[class] = balancedBlockState{
572                         desired: desired,
573                         surplus: have - desired,
574                 }
575
576                 if desired == 0 {
577                         continue
578                 }
579
580                 // Sort the slots by desirability.
581                 sort.Slice(slots, func(i, j int) bool {
582                         si, sj := slots[i], slots[j]
583                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
584                                 // Prefer a mount that satisfies the
585                                 // desired class.
586                                 return bal.mountsByClass[class][si.mnt]
587                         } else if wanti, wantj := si.want, si.want; wanti != wantj {
588                                 // Prefer a mount that will have a
589                                 // replica no matter what we do here
590                                 // -- either because it already has an
591                                 // untrashable replica, or because we
592                                 // already need it to satisfy a
593                                 // different storage class.
594                                 return slots[i].want
595                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
596                                 // Prefer a better rendezvous
597                                 // position.
598                                 return orderi < orderj
599                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
600                                 // Prefer a mount that already has a
601                                 // replica.
602                                 return repli
603                         } else {
604                                 // If pull/trash turns out to be
605                                 // needed, distribute the
606                                 // new/remaining replicas uniformly
607                                 // across qualifying mounts on a given
608                                 // server.
609                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
610                         }
611                 })
612
613                 // Servers/mounts/devices (with or without existing
614                 // replicas) that are part of the best achievable
615                 // layout for this storage class.
616                 wantSrv := map[*KeepService]bool{}
617                 wantMnt := map[*KeepMount]bool{}
618                 wantDev := map[string]bool{}
619                 // Positions (with existing replicas) that have been
620                 // protected (via unsafeToDelete) to ensure we don't
621                 // reduce replication below desired level when
622                 // trashing replicas that aren't optimal positions for
623                 // any storage class.
624                 protMnt := map[*KeepMount]bool{}
625                 // Replication planned so far (corresponds to wantMnt).
626                 replWant := 0
627                 // Protected replication (corresponds to protMnt).
628                 replProt := 0
629
630                 // trySlot tries using a slot to meet requirements,
631                 // and returns true if all requirements are met.
632                 trySlot := func(i int) bool {
633                         slot := slots[i]
634                         if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
635                                 // Already allocated a replica to this
636                                 // backend device, possibly on a
637                                 // different server.
638                                 return false
639                         }
640                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
641                                 unsafeToDelete[slot.repl.Mtime] = true
642                                 protMnt[slot.mnt] = true
643                                 replProt += slot.mnt.Replication
644                         }
645                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
646                                 slots[i].want = true
647                                 wantSrv[slot.mnt.KeepService] = true
648                                 wantMnt[slot.mnt] = true
649                                 if slot.mnt.DeviceID != "" {
650                                         wantDev[slot.mnt.DeviceID] = true
651                                 }
652                                 replWant += slot.mnt.Replication
653                         }
654                         return replProt >= desired && replWant >= desired
655                 }
656
657                 // First try to achieve desired replication without
658                 // using the same server twice.
659                 done := false
660                 for i := 0; i < len(slots) && !done; i++ {
661                         if !wantSrv[slots[i].mnt.KeepService] {
662                                 done = trySlot(i)
663                         }
664                 }
665
666                 // If that didn't suffice, do another pass without the
667                 // "distinct services" restriction. (Achieving the
668                 // desired volume replication on fewer than the
669                 // desired number of services is better than
670                 // underreplicating.)
671                 for i := 0; i < len(slots) && !done; i++ {
672                         done = trySlot(i)
673                 }
674
675                 if !underreplicated {
676                         safe := 0
677                         for _, slot := range slots {
678                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
679                                         continue
680                                 }
681                                 if safe += slot.mnt.Replication; safe >= desired {
682                                         break
683                                 }
684                         }
685                         underreplicated = safe < desired
686                 }
687
688                 // set the unachievable flag if there aren't enough
689                 // slots offering the relevant storage class. (This is
690                 // as easy as checking slots[desired] because we
691                 // already sorted the qualifying slots to the front.)
692                 if desired >= len(slots) || !bal.mountsByClass[class][slots[desired].mnt] {
693                         cs := classState[class]
694                         cs.unachievable = true
695                         classState[class] = cs
696                 }
697
698                 // Avoid deleting wanted replicas from devices that
699                 // are mounted on multiple servers -- even if they
700                 // haven't already been added to unsafeToDelete
701                 // because the servers report different Mtimes.
702                 for _, slot := range slots {
703                         if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
704                                 unsafeToDelete[slot.repl.Mtime] = true
705                         }
706                 }
707         }
708
709         // TODO: If multiple replicas are trashable, prefer the oldest
710         // replica that doesn't have a timestamp collision with
711         // others.
712
713         countedDev := map[string]bool{}
714         var have, want int
715         for _, slot := range slots {
716                 if countedDev[slot.mnt.DeviceID] {
717                         continue
718                 }
719                 if slot.want {
720                         want += slot.mnt.Replication
721                 }
722                 if slot.repl != nil {
723                         have += slot.mnt.Replication
724                 }
725                 if slot.mnt.DeviceID != "" {
726                         countedDev[slot.mnt.DeviceID] = true
727                 }
728         }
729
730         var changes []string
731         for _, slot := range slots {
732                 // TODO: request a Touch if Mtime is duplicated.
733                 var change int
734                 switch {
735                 case !underreplicated && slot.repl != nil && !slot.want && !unsafeToDelete[slot.repl.Mtime]:
736                         slot.mnt.KeepService.AddTrash(Trash{
737                                 SizedDigest: blkid,
738                                 Mtime:       slot.repl.Mtime,
739                                 From:        slot.mnt,
740                         })
741                         change = changeTrash
742                 case len(blk.Replicas) == 0:
743                         change = changeNone
744                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
745                         slot.mnt.KeepService.AddPull(Pull{
746                                 SizedDigest: blkid,
747                                 From:        blk.Replicas[0].KeepMount.KeepService,
748                                 To:          slot.mnt,
749                         })
750                         change = changePull
751                 default:
752                         change = changeStay
753                 }
754                 if bal.Dumper != nil {
755                         var mtime int64
756                         if slot.repl != nil {
757                                 mtime = slot.repl.Mtime
758                         }
759                         srv := slot.mnt.KeepService
760                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
761                 }
762         }
763         if bal.Dumper != nil {
764                 bal.Dumper.Printf("%s have=%d want=%v %s", blkid, have, want, strings.Join(changes, " "))
765         }
766         return balanceResult{
767                 blk:        blk,
768                 blkid:      blkid,
769                 have:       have,
770                 want:       want,
771                 classState: classState,
772         }
773 }
774
775 type blocksNBytes struct {
776         replicas int
777         blocks   int
778         bytes    int64
779 }
780
781 func (bb blocksNBytes) String() string {
782         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
783 }
784
785 type balancerStats struct {
786         lost          blocksNBytes
787         overrep       blocksNBytes
788         unref         blocksNBytes
789         garbage       blocksNBytes
790         underrep      blocksNBytes
791         unachievable  blocksNBytes
792         justright     blocksNBytes
793         desired       blocksNBytes
794         current       blocksNBytes
795         pulls         int
796         trashes       int
797         replHistogram []int
798         classStats    map[string]replicationStats
799 }
800
801 type replicationStats struct {
802         desired      blocksNBytes
803         surplus      blocksNBytes
804         short        blocksNBytes
805         unachievable blocksNBytes
806 }
807
808 type balancedBlockState struct {
809         desired      int
810         surplus      int
811         unachievable bool
812 }
813
814 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
815         var s balancerStats
816         s.replHistogram = make([]int, 2)
817         s.classStats = make(map[string]replicationStats, len(bal.classes))
818         for result := range results {
819                 surplus := result.have - result.want
820                 bytes := result.blkid.Size()
821
822                 for class, state := range result.classState {
823                         cs := s.classStats[class]
824                         if state.unachievable {
825                                 cs.unachievable.blocks++
826                                 cs.unachievable.bytes += bytes
827                         }
828                         if state.desired > 0 {
829                                 cs.desired.replicas += state.desired
830                                 cs.desired.blocks++
831                                 cs.desired.bytes += bytes * int64(state.desired)
832                         }
833                         if state.surplus > 0 {
834                                 cs.surplus.replicas += state.surplus
835                                 cs.surplus.blocks++
836                                 cs.surplus.bytes += bytes * int64(state.surplus)
837                         } else if state.surplus < 0 {
838                                 cs.short.replicas += -state.surplus
839                                 cs.short.blocks++
840                                 cs.short.bytes += bytes * int64(-state.surplus)
841                         }
842                         s.classStats[class] = cs
843                 }
844
845                 switch {
846                 case result.have == 0 && result.want > 0:
847                         s.lost.replicas -= surplus
848                         s.lost.blocks++
849                         s.lost.bytes += bytes * int64(-surplus)
850                 case surplus < 0:
851                         s.underrep.replicas -= surplus
852                         s.underrep.blocks++
853                         s.underrep.bytes += bytes * int64(-surplus)
854                 case surplus > 0 && result.want == 0:
855                         counter := &s.garbage
856                         for _, r := range result.blk.Replicas {
857                                 if r.Mtime >= bal.MinMtime {
858                                         counter = &s.unref
859                                         break
860                                 }
861                         }
862                         counter.replicas += surplus
863                         counter.blocks++
864                         counter.bytes += bytes * int64(surplus)
865                 case surplus > 0:
866                         s.overrep.replicas += surplus
867                         s.overrep.blocks++
868                         s.overrep.bytes += bytes * int64(result.have-result.want)
869                 default:
870                         s.justright.replicas += result.want
871                         s.justright.blocks++
872                         s.justright.bytes += bytes * int64(result.want)
873                 }
874
875                 if result.want > 0 {
876                         s.desired.replicas += result.want
877                         s.desired.blocks++
878                         s.desired.bytes += bytes * int64(result.want)
879                 }
880                 if result.have > 0 {
881                         s.current.replicas += result.have
882                         s.current.blocks++
883                         s.current.bytes += bytes * int64(result.have)
884                 }
885
886                 for len(s.replHistogram) <= result.have {
887                         s.replHistogram = append(s.replHistogram, 0)
888                 }
889                 s.replHistogram[result.have]++
890         }
891         for _, srv := range bal.KeepServices {
892                 s.pulls += len(srv.ChangeSet.Pulls)
893                 s.trashes += len(srv.ChangeSet.Trashes)
894         }
895         bal.stats = s
896 }
897
898 // PrintStatistics writes statistics about the computed changes to
899 // bal.Logger. It should not be called until ComputeChangeSets has
900 // finished.
901 func (bal *Balancer) PrintStatistics() {
902         bal.logf("===")
903         bal.logf("%s lost (0=have<want)", bal.stats.lost)
904         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
905         bal.logf("%s just right (have=want)", bal.stats.justright)
906         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
907         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
908         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
909         for _, class := range bal.classes {
910                 cs := bal.stats.classStats[class]
911                 bal.logf("===")
912                 bal.logf("storage class %q: %s desired", class, cs.desired)
913                 bal.logf("storage class %q: %s short", class, cs.short)
914                 bal.logf("storage class %q: %s surplus", class, cs.surplus)
915                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
916         }
917         bal.logf("===")
918         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
919         bal.logf("%s total usage", bal.stats.current)
920         bal.logf("===")
921         for _, srv := range bal.KeepServices {
922                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
923         }
924         bal.logf("===")
925         bal.printHistogram(60)
926         bal.logf("===")
927 }
928
929 func (bal *Balancer) printHistogram(hashColumns int) {
930         bal.logf("Replication level distribution (counting N replicas on a single server as N):")
931         maxCount := 0
932         for _, count := range bal.stats.replHistogram {
933                 if maxCount < count {
934                         maxCount = count
935                 }
936         }
937         hashes := strings.Repeat("#", hashColumns)
938         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
939         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
940         for repl, count := range bal.stats.replHistogram {
941                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
942                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
943         }
944 }
945
946 // CheckSanityLate checks for configuration and runtime errors after
947 // GetCurrentState() and ComputeChangeSets() have finished.
948 //
949 // If it returns an error, it is dangerous to run any Commit methods.
950 func (bal *Balancer) CheckSanityLate() error {
951         if bal.errors != nil {
952                 for _, err := range bal.errors {
953                         bal.logf("deferred error: %v", err)
954                 }
955                 return fmt.Errorf("cannot proceed safely after deferred errors")
956         }
957
958         if bal.collScanned == 0 {
959                 return fmt.Errorf("received zero collections")
960         }
961
962         anyDesired := false
963         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
964                 for _, desired := range blk.Desired {
965                         if desired > 0 {
966                                 anyDesired = true
967                                 break
968                         }
969                 }
970         })
971         if !anyDesired {
972                 return fmt.Errorf("zero blocks have desired replication>0")
973         }
974
975         if dr := bal.DefaultReplication; dr < 1 {
976                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
977         }
978
979         // TODO: no two services have identical indexes
980         // TODO: no collisions (same md5, different size)
981         return nil
982 }
983
984 // CommitPulls sends the computed lists of pull requests to the
985 // keepstore servers. This has the effect of increasing replication of
986 // existing blocks that are either underreplicated or poorly
987 // distributed according to rendezvous hashing.
988 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
989         return bal.commitAsync(c, "send pull list",
990                 func(srv *KeepService) error {
991                         return srv.CommitPulls(c)
992                 })
993 }
994
995 // CommitTrash sends the computed lists of trash requests to the
996 // keepstore servers. This has the effect of deleting blocks that are
997 // overreplicated or unreferenced.
998 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
999         return bal.commitAsync(c, "send trash list",
1000                 func(srv *KeepService) error {
1001                         return srv.CommitTrash(c)
1002                 })
1003 }
1004
1005 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1006         errs := make(chan error)
1007         for _, srv := range bal.KeepServices {
1008                 go func(srv *KeepService) {
1009                         var err error
1010                         defer func() { errs <- err }()
1011                         label := fmt.Sprintf("%s: %v", srv, label)
1012                         defer timeMe(bal.Logger, label)()
1013                         err = f(srv)
1014                         if err != nil {
1015                                 err = fmt.Errorf("%s: %v", label, err)
1016                         }
1017                 }(srv)
1018         }
1019         var lastErr error
1020         for range bal.KeepServices {
1021                 if err := <-errs; err != nil {
1022                         bal.logf("%v", err)
1023                         lastErr = err
1024                 }
1025         }
1026         close(errs)
1027         return lastErr
1028 }
1029
1030 func (bal *Balancer) logf(f string, args ...interface{}) {
1031         if bal.Logger != nil {
1032                 bal.Logger.Printf(f, args...)
1033         }
1034 }
1035
1036 // Rendezvous hash sort function. Less efficient than sorting on
1037 // precomputed rendezvous hashes, but also rarely used.
1038 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1039         a := md5.Sum([]byte(string(blkid[:32]) + i))
1040         b := md5.Sum([]byte(string(blkid[:32]) + j))
1041         return bytes.Compare(a[:], b[:]) < 0
1042 }