15112: Fix fd leak on error.
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "log"
14         "math"
15         "os"
16         "runtime"
17         "sort"
18         "strings"
19         "sync"
20         "syscall"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "git.curoverse.com/arvados.git/sdk/go/keepclient"
25         "github.com/sirupsen/logrus"
26 )
27
28 // Balancer compares the contents of keepstore servers with the
29 // collections stored in Arvados, and issues pull/trash requests
30 // needed to get (closer to) the optimal data layout.
31 //
32 // In the optimal data layout: every data block referenced by a
33 // collection is replicated at least as many times as desired by the
34 // collection; there are no unreferenced data blocks older than
35 // BlobSignatureTTL; and all N existing replicas of a given data block
36 // are in the N best positions in rendezvous probe order.
37 type Balancer struct {
38         Logger  logrus.FieldLogger
39         Dumper  logrus.FieldLogger
40         Metrics *metrics
41
42         LostBlocksFile string
43
44         *BlockStateMap
45         KeepServices       map[string]*KeepService
46         DefaultReplication int
47         MinMtime           int64
48
49         classes       []string
50         mounts        int
51         mountsByClass map[string]map[*KeepMount]bool
52         collScanned   int
53         serviceRoots  map[string]string
54         errors        []error
55         stats         balancerStats
56         mutex         sync.Mutex
57         lostBlocks    io.Writer
58 }
59
60 // Run performs a balance operation using the given config and
61 // runOptions, and returns RunOptions suitable for passing to a
62 // subsequent balance operation.
63 //
64 // Run should only be called once on a given Balancer object.
65 //
66 // Typical usage:
67 //
68 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
69 func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
70         nextRunOptions = runOptions
71
72         defer bal.time("sweep", "wall clock time to run one full sweep")()
73
74         var lbFile *os.File
75         if bal.LostBlocksFile != "" {
76                 tmpfn := bal.LostBlocksFile + ".tmp"
77                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
78                 if err != nil {
79                         return
80                 }
81                 defer lbFile.Close()
82                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
83                 if err != nil {
84                         return
85                 }
86                 defer os.Remove(tmpfn)
87                 bal.lostBlocks = lbFile
88         } else {
89                 bal.lostBlocks = ioutil.Discard
90         }
91
92         if len(config.KeepServiceList.Items) > 0 {
93                 err = bal.SetKeepServices(config.KeepServiceList)
94         } else {
95                 err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
96         }
97         if err != nil {
98                 return
99         }
100
101         for _, srv := range bal.KeepServices {
102                 err = srv.discoverMounts(&config.Client)
103                 if err != nil {
104                         return
105                 }
106         }
107         bal.cleanupMounts()
108
109         if err = bal.CheckSanityEarly(&config.Client); err != nil {
110                 return
111         }
112         rs := bal.rendezvousState()
113         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
114                 if runOptions.SafeRendezvousState != "" {
115                         bal.logf("notice: KeepServices list has changed since last run")
116                 }
117                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
118                 if err = bal.ClearTrashLists(&config.Client); err != nil {
119                         return
120                 }
121                 // The current rendezvous state becomes "safe" (i.e.,
122                 // OK to compute changes for that state without
123                 // clearing existing trash lists) only now, after we
124                 // succeed in clearing existing trash lists.
125                 nextRunOptions.SafeRendezvousState = rs
126         }
127         if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
128                 return
129         }
130         bal.ComputeChangeSets()
131         bal.PrintStatistics()
132         if err = bal.CheckSanityLate(); err != nil {
133                 return
134         }
135         if lbFile != nil {
136                 err = lbFile.Sync()
137                 if err != nil {
138                         return
139                 }
140                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
141                 if err != nil {
142                         return
143                 }
144                 err = lbFile.Close()
145                 if err != nil {
146                         return
147                 }
148         }
149         if runOptions.CommitPulls {
150                 err = bal.CommitPulls(&config.Client)
151                 if err != nil {
152                         // Skip trash if we can't pull. (Too cautious?)
153                         return
154                 }
155         }
156         if runOptions.CommitTrash {
157                 err = bal.CommitTrash(&config.Client)
158         }
159         return
160 }
161
162 // SetKeepServices sets the list of KeepServices to operate on.
163 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
164         bal.KeepServices = make(map[string]*KeepService)
165         for _, srv := range srvList.Items {
166                 bal.KeepServices[srv.UUID] = &KeepService{
167                         KeepService: srv,
168                         ChangeSet:   &ChangeSet{},
169                 }
170         }
171         return nil
172 }
173
174 // DiscoverKeepServices sets the list of KeepServices by calling the
175 // API to get a list of all services, and selecting the ones whose
176 // ServiceType is in okTypes.
177 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
178         bal.KeepServices = make(map[string]*KeepService)
179         ok := make(map[string]bool)
180         for _, t := range okTypes {
181                 ok[t] = true
182         }
183         return c.EachKeepService(func(srv arvados.KeepService) error {
184                 if ok[srv.ServiceType] {
185                         bal.KeepServices[srv.UUID] = &KeepService{
186                                 KeepService: srv,
187                                 ChangeSet:   &ChangeSet{},
188                         }
189                 } else {
190                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
191                 }
192                 return nil
193         })
194 }
195
196 func (bal *Balancer) cleanupMounts() {
197         rwdev := map[string]*KeepService{}
198         for _, srv := range bal.KeepServices {
199                 for _, mnt := range srv.mounts {
200                         if !mnt.ReadOnly && mnt.DeviceID != "" {
201                                 rwdev[mnt.DeviceID] = srv
202                         }
203                 }
204         }
205         // Drop the readonly mounts whose device is mounted RW
206         // elsewhere.
207         for _, srv := range bal.KeepServices {
208                 var dedup []*KeepMount
209                 for _, mnt := range srv.mounts {
210                         if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
211                                 bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
212                         } else {
213                                 dedup = append(dedup, mnt)
214                         }
215                 }
216                 srv.mounts = dedup
217         }
218         for _, srv := range bal.KeepServices {
219                 for _, mnt := range srv.mounts {
220                         if mnt.Replication <= 0 {
221                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
222                                 mnt.Replication = 1
223                         }
224                 }
225         }
226 }
227
228 // CheckSanityEarly checks for configuration and runtime errors that
229 // can be detected before GetCurrentState() and ComputeChangeSets()
230 // are called.
231 //
232 // If it returns an error, it is pointless to run GetCurrentState or
233 // ComputeChangeSets: after doing so, the statistics would be
234 // meaningless and it would be dangerous to run any Commit methods.
235 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
236         u, err := c.CurrentUser()
237         if err != nil {
238                 return fmt.Errorf("CurrentUser(): %v", err)
239         }
240         if !u.IsActive || !u.IsAdmin {
241                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
242         }
243         for _, srv := range bal.KeepServices {
244                 if srv.ServiceType == "proxy" {
245                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
246                 }
247         }
248
249         var checkPage arvados.CollectionList
250         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
251                 Limit:              new(int),
252                 Count:              "exact",
253                 IncludeTrash:       true,
254                 IncludeOldVersions: true,
255                 Filters: []arvados.Filter{{
256                         Attr:     "modified_at",
257                         Operator: "=",
258                         Operand:  nil,
259                 }},
260         }); err != nil {
261                 return err
262         } else if n := checkPage.ItemsAvailable; n > 0 {
263                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
264         }
265
266         return nil
267 }
268
269 // rendezvousState returns a fingerprint (e.g., a sorted list of
270 // UUID+host+port) of the current set of keep services.
271 func (bal *Balancer) rendezvousState() string {
272         srvs := make([]string, 0, len(bal.KeepServices))
273         for _, srv := range bal.KeepServices {
274                 srvs = append(srvs, srv.String())
275         }
276         sort.Strings(srvs)
277         return strings.Join(srvs, "; ")
278 }
279
280 // ClearTrashLists sends an empty trash list to each keep
281 // service. Calling this before GetCurrentState avoids races.
282 //
283 // When a block appears in an index, we assume that replica will still
284 // exist after we delete other replicas on other servers. However,
285 // it's possible that a previous rebalancing operation made different
286 // decisions (e.g., servers were added/removed, and rendezvous order
287 // changed). In this case, the replica might already be on that
288 // server's trash list, and it might be deleted before we send a
289 // replacement trash list.
290 //
291 // We avoid this problem if we clear all trash lists before getting
292 // indexes. (We also assume there is only one rebalancing process
293 // running at a time.)
294 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
295         for _, srv := range bal.KeepServices {
296                 srv.ChangeSet = &ChangeSet{}
297         }
298         return bal.CommitTrash(c)
299 }
300
301 // GetCurrentState determines the current replication state, and the
302 // desired replication level, for every block that is either
303 // retrievable or referenced.
304 //
305 // It determines the current replication state by reading the block index
306 // from every known Keep service.
307 //
308 // It determines the desired replication level by retrieving all
309 // collection manifests in the database (API server).
310 //
311 // It encodes the resulting information in BlockStateMap.
312 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
313         defer bal.time("get_state", "wall clock time to get current state")()
314         bal.BlockStateMap = NewBlockStateMap()
315
316         dd, err := c.DiscoveryDocument()
317         if err != nil {
318                 return err
319         }
320         bal.DefaultReplication = dd.DefaultCollectionReplication
321         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
322
323         errs := make(chan error, 1)
324         wg := sync.WaitGroup{}
325
326         // When a device is mounted more than once, we will get its
327         // index only once, and call AddReplicas on all of the mounts.
328         // equivMount keys are the mounts that will be indexed, and
329         // each value is a list of mounts to apply the received index
330         // to.
331         equivMount := map[*KeepMount][]*KeepMount{}
332         // deviceMount maps each device ID to the one mount that will
333         // be indexed for that device.
334         deviceMount := map[string]*KeepMount{}
335         for _, srv := range bal.KeepServices {
336                 for _, mnt := range srv.mounts {
337                         equiv := deviceMount[mnt.DeviceID]
338                         if equiv == nil {
339                                 equiv = mnt
340                                 if mnt.DeviceID != "" {
341                                         deviceMount[mnt.DeviceID] = equiv
342                                 }
343                         }
344                         equivMount[equiv] = append(equivMount[equiv], mnt)
345                 }
346         }
347
348         // Start one goroutine for each (non-redundant) mount:
349         // retrieve the index, and add the returned blocks to
350         // BlockStateMap.
351         for _, mounts := range equivMount {
352                 wg.Add(1)
353                 go func(mounts []*KeepMount) {
354                         defer wg.Done()
355                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
356                         idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
357                         if err != nil {
358                                 select {
359                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
360                                 default:
361                                 }
362                                 return
363                         }
364                         if len(errs) > 0 {
365                                 // Some other goroutine encountered an
366                                 // error -- any further effort here
367                                 // will be wasted.
368                                 return
369                         }
370                         for _, mount := range mounts {
371                                 bal.logf("%s: add %d entries to map", mount, len(idx))
372                                 bal.BlockStateMap.AddReplicas(mount, idx)
373                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
374                         }
375                         bal.logf("mount %s: index done", mounts[0])
376                 }(mounts)
377         }
378
379         // collQ buffers incoming collections so we can start fetching
380         // the next page without waiting for the current page to
381         // finish processing.
382         collQ := make(chan arvados.Collection, bufs)
383
384         // Start a goroutine to process collections. (We could use a
385         // worker pool here, but even with a single worker we already
386         // process collections much faster than we can retrieve them.)
387         wg.Add(1)
388         go func() {
389                 defer wg.Done()
390                 for coll := range collQ {
391                         err := bal.addCollection(coll)
392                         if err != nil || len(errs) > 0 {
393                                 select {
394                                 case errs <- err:
395                                 default:
396                                 }
397                                 for range collQ {
398                                 }
399                                 return
400                         }
401                         bal.collScanned++
402                 }
403         }()
404
405         // Start a goroutine to retrieve all collections from the
406         // Arvados database and send them to collQ for processing.
407         wg.Add(1)
408         go func() {
409                 defer wg.Done()
410                 err = EachCollection(c, pageSize,
411                         func(coll arvados.Collection) error {
412                                 collQ <- coll
413                                 if len(errs) > 0 {
414                                         // some other GetCurrentState
415                                         // error happened: no point
416                                         // getting any more
417                                         // collections.
418                                         return fmt.Errorf("")
419                                 }
420                                 return nil
421                         }, func(done, total int) {
422                                 bal.logf("collections: %d/%d", done, total)
423                         })
424                 close(collQ)
425                 if err != nil {
426                         select {
427                         case errs <- err:
428                         default:
429                         }
430                 }
431         }()
432
433         wg.Wait()
434         if len(errs) > 0 {
435                 return <-errs
436         }
437         return nil
438 }
439
440 func (bal *Balancer) addCollection(coll arvados.Collection) error {
441         blkids, err := coll.SizedDigests()
442         if err != nil {
443                 return fmt.Errorf("%v: %v", coll.UUID, err)
444         }
445         repl := bal.DefaultReplication
446         if coll.ReplicationDesired != nil {
447                 repl = *coll.ReplicationDesired
448         }
449         debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
450         bal.BlockStateMap.IncreaseDesired(coll.StorageClassesDesired, repl, blkids)
451         return nil
452 }
453
454 // ComputeChangeSets compares, for each known block, the current and
455 // desired replication states. If it is possible to get closer to the
456 // desired state by copying or deleting blocks, it adds those changes
457 // to the relevant KeepServices' ChangeSets.
458 //
459 // It does not actually apply any of the computed changes.
460 func (bal *Balancer) ComputeChangeSets() {
461         // This just calls balanceBlock() once for each block, using a
462         // pool of worker goroutines.
463         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
464         bal.setupLookupTables()
465
466         type balanceTask struct {
467                 blkid arvados.SizedDigest
468                 blk   *BlockState
469         }
470         workers := runtime.GOMAXPROCS(-1)
471         todo := make(chan balanceTask, workers)
472         go func() {
473                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
474                         todo <- balanceTask{
475                                 blkid: blkid,
476                                 blk:   blk,
477                         }
478                 })
479                 close(todo)
480         }()
481         results := make(chan balanceResult, workers)
482         go func() {
483                 var wg sync.WaitGroup
484                 for i := 0; i < workers; i++ {
485                         wg.Add(1)
486                         go func() {
487                                 for work := range todo {
488                                         results <- bal.balanceBlock(work.blkid, work.blk)
489                                 }
490                                 wg.Done()
491                         }()
492                 }
493                 wg.Wait()
494                 close(results)
495         }()
496         bal.collectStatistics(results)
497 }
498
499 func (bal *Balancer) setupLookupTables() {
500         bal.serviceRoots = make(map[string]string)
501         bal.classes = defaultClasses
502         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
503         bal.mounts = 0
504         for _, srv := range bal.KeepServices {
505                 bal.serviceRoots[srv.UUID] = srv.UUID
506                 for _, mnt := range srv.mounts {
507                         bal.mounts++
508
509                         // All mounts on a read-only service are
510                         // effectively read-only.
511                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
512
513                         if len(mnt.StorageClasses) == 0 {
514                                 bal.mountsByClass["default"][mnt] = true
515                                 continue
516                         }
517                         for _, class := range mnt.StorageClasses {
518                                 if mbc := bal.mountsByClass[class]; mbc == nil {
519                                         bal.classes = append(bal.classes, class)
520                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
521                                 } else {
522                                         mbc[mnt] = true
523                                 }
524                         }
525                 }
526         }
527         // Consider classes in lexicographic order to avoid flapping
528         // between balancing runs.  The outcome of the "prefer a mount
529         // we're already planning to use for a different storage
530         // class" case in balanceBlock depends on the order classes
531         // are considered.
532         sort.Strings(bal.classes)
533 }
534
535 const (
536         changeStay = iota
537         changePull
538         changeTrash
539         changeNone
540 )
541
542 var changeName = map[int]string{
543         changeStay:  "stay",
544         changePull:  "pull",
545         changeTrash: "trash",
546         changeNone:  "none",
547 }
548
549 type balanceResult struct {
550         blk        *BlockState
551         blkid      arvados.SizedDigest
552         have       int
553         want       int
554         classState map[string]balancedBlockState
555 }
556
557 // balanceBlock compares current state to desired state for a single
558 // block, and makes the appropriate ChangeSet calls.
559 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
560         debugf("balanceBlock: %v %+v", blkid, blk)
561
562         type slot struct {
563                 mnt  *KeepMount // never nil
564                 repl *Replica   // replica already stored here (or nil)
565                 want bool       // we should pull/leave a replica here
566         }
567
568         // Build a list of all slots (one per mounted volume).
569         slots := make([]slot, 0, bal.mounts)
570         for _, srv := range bal.KeepServices {
571                 for _, mnt := range srv.mounts {
572                         var repl *Replica
573                         for r := range blk.Replicas {
574                                 if blk.Replicas[r].KeepMount == mnt {
575                                         repl = &blk.Replicas[r]
576                                 }
577                         }
578                         // Initial value of "want" is "have, and can't
579                         // delete". These untrashable replicas get
580                         // prioritized when sorting slots: otherwise,
581                         // non-optimal readonly copies would cause us
582                         // to overreplicate.
583                         slots = append(slots, slot{
584                                 mnt:  mnt,
585                                 repl: repl,
586                                 want: repl != nil && mnt.ReadOnly,
587                         })
588                 }
589         }
590
591         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
592         srvRendezvous := make(map[*KeepService]int, len(uuids))
593         for i, uuid := range uuids {
594                 srv := bal.KeepServices[uuid]
595                 srvRendezvous[srv] = i
596         }
597
598         // Below we set underreplicated=true if we find any storage
599         // class that's currently underreplicated -- in that case we
600         // won't want to trash any replicas.
601         underreplicated := false
602
603         classState := make(map[string]balancedBlockState, len(bal.classes))
604         unsafeToDelete := make(map[int64]bool, len(slots))
605         for _, class := range bal.classes {
606                 desired := blk.Desired[class]
607
608                 countedDev := map[string]bool{}
609                 have := 0
610                 for _, slot := range slots {
611                         if slot.repl != nil && bal.mountsByClass[class][slot.mnt] && !countedDev[slot.mnt.DeviceID] {
612                                 have += slot.mnt.Replication
613                                 if slot.mnt.DeviceID != "" {
614                                         countedDev[slot.mnt.DeviceID] = true
615                                 }
616                         }
617                 }
618                 classState[class] = balancedBlockState{
619                         desired: desired,
620                         surplus: have - desired,
621                 }
622
623                 if desired == 0 {
624                         continue
625                 }
626
627                 // Sort the slots by desirability.
628                 sort.Slice(slots, func(i, j int) bool {
629                         si, sj := slots[i], slots[j]
630                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
631                                 // Prefer a mount that satisfies the
632                                 // desired class.
633                                 return bal.mountsByClass[class][si.mnt]
634                         } else if si.want != sj.want {
635                                 // Prefer a mount that will have a
636                                 // replica no matter what we do here
637                                 // -- either because it already has an
638                                 // untrashable replica, or because we
639                                 // already need it to satisfy a
640                                 // different storage class.
641                                 return si.want
642                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
643                                 // Prefer a better rendezvous
644                                 // position.
645                                 return orderi < orderj
646                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
647                                 // Prefer a mount that already has a
648                                 // replica.
649                                 return repli
650                         } else {
651                                 // If pull/trash turns out to be
652                                 // needed, distribute the
653                                 // new/remaining replicas uniformly
654                                 // across qualifying mounts on a given
655                                 // server.
656                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
657                         }
658                 })
659
660                 // Servers/mounts/devices (with or without existing
661                 // replicas) that are part of the best achievable
662                 // layout for this storage class.
663                 wantSrv := map[*KeepService]bool{}
664                 wantMnt := map[*KeepMount]bool{}
665                 wantDev := map[string]bool{}
666                 // Positions (with existing replicas) that have been
667                 // protected (via unsafeToDelete) to ensure we don't
668                 // reduce replication below desired level when
669                 // trashing replicas that aren't optimal positions for
670                 // any storage class.
671                 protMnt := map[*KeepMount]bool{}
672                 // Replication planned so far (corresponds to wantMnt).
673                 replWant := 0
674                 // Protected replication (corresponds to protMnt).
675                 replProt := 0
676
677                 // trySlot tries using a slot to meet requirements,
678                 // and returns true if all requirements are met.
679                 trySlot := func(i int) bool {
680                         slot := slots[i]
681                         if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
682                                 // Already allocated a replica to this
683                                 // backend device, possibly on a
684                                 // different server.
685                                 return false
686                         }
687                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
688                                 unsafeToDelete[slot.repl.Mtime] = true
689                                 protMnt[slot.mnt] = true
690                                 replProt += slot.mnt.Replication
691                         }
692                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
693                                 slots[i].want = true
694                                 wantSrv[slot.mnt.KeepService] = true
695                                 wantMnt[slot.mnt] = true
696                                 if slot.mnt.DeviceID != "" {
697                                         wantDev[slot.mnt.DeviceID] = true
698                                 }
699                                 replWant += slot.mnt.Replication
700                         }
701                         return replProt >= desired && replWant >= desired
702                 }
703
704                 // First try to achieve desired replication without
705                 // using the same server twice.
706                 done := false
707                 for i := 0; i < len(slots) && !done; i++ {
708                         if !wantSrv[slots[i].mnt.KeepService] {
709                                 done = trySlot(i)
710                         }
711                 }
712
713                 // If that didn't suffice, do another pass without the
714                 // "distinct services" restriction. (Achieving the
715                 // desired volume replication on fewer than the
716                 // desired number of services is better than
717                 // underreplicating.)
718                 for i := 0; i < len(slots) && !done; i++ {
719                         done = trySlot(i)
720                 }
721
722                 if !underreplicated {
723                         safe := 0
724                         for _, slot := range slots {
725                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
726                                         continue
727                                 }
728                                 if safe += slot.mnt.Replication; safe >= desired {
729                                         break
730                                 }
731                         }
732                         underreplicated = safe < desired
733                 }
734
735                 // set the unachievable flag if there aren't enough
736                 // slots offering the relevant storage class. (This is
737                 // as easy as checking slots[desired] because we
738                 // already sorted the qualifying slots to the front.)
739                 if desired >= len(slots) || !bal.mountsByClass[class][slots[desired].mnt] {
740                         cs := classState[class]
741                         cs.unachievable = true
742                         classState[class] = cs
743                 }
744
745                 // Avoid deleting wanted replicas from devices that
746                 // are mounted on multiple servers -- even if they
747                 // haven't already been added to unsafeToDelete
748                 // because the servers report different Mtimes.
749                 for _, slot := range slots {
750                         if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
751                                 unsafeToDelete[slot.repl.Mtime] = true
752                         }
753                 }
754         }
755
756         // TODO: If multiple replicas are trashable, prefer the oldest
757         // replica that doesn't have a timestamp collision with
758         // others.
759
760         countedDev := map[string]bool{}
761         var have, want int
762         for _, slot := range slots {
763                 if countedDev[slot.mnt.DeviceID] {
764                         continue
765                 }
766                 if slot.want {
767                         want += slot.mnt.Replication
768                 }
769                 if slot.repl != nil {
770                         have += slot.mnt.Replication
771                 }
772                 if slot.mnt.DeviceID != "" {
773                         countedDev[slot.mnt.DeviceID] = true
774                 }
775         }
776
777         var changes []string
778         for _, slot := range slots {
779                 // TODO: request a Touch if Mtime is duplicated.
780                 var change int
781                 switch {
782                 case !underreplicated && !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime && !unsafeToDelete[slot.repl.Mtime]:
783                         slot.mnt.KeepService.AddTrash(Trash{
784                                 SizedDigest: blkid,
785                                 Mtime:       slot.repl.Mtime,
786                                 From:        slot.mnt,
787                         })
788                         change = changeTrash
789                 case len(blk.Replicas) > 0 && slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
790                         slot.mnt.KeepService.AddPull(Pull{
791                                 SizedDigest: blkid,
792                                 From:        blk.Replicas[0].KeepMount.KeepService,
793                                 To:          slot.mnt,
794                         })
795                         change = changePull
796                 case slot.repl != nil:
797                         change = changeStay
798                 default:
799                         change = changeNone
800                 }
801                 if bal.Dumper != nil {
802                         var mtime int64
803                         if slot.repl != nil {
804                                 mtime = slot.repl.Mtime
805                         }
806                         srv := slot.mnt.KeepService
807                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
808                 }
809         }
810         if bal.Dumper != nil {
811                 bal.Dumper.Printf("%s refs=%d have=%d want=%v %v %v", blkid, blk.RefCount, have, want, blk.Desired, changes)
812         }
813         return balanceResult{
814                 blk:        blk,
815                 blkid:      blkid,
816                 have:       have,
817                 want:       want,
818                 classState: classState,
819         }
820 }
821
822 type blocksNBytes struct {
823         replicas int
824         blocks   int
825         bytes    int64
826 }
827
828 func (bb blocksNBytes) String() string {
829         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
830 }
831
832 type balancerStats struct {
833         lost          blocksNBytes
834         overrep       blocksNBytes
835         unref         blocksNBytes
836         garbage       blocksNBytes
837         underrep      blocksNBytes
838         unachievable  blocksNBytes
839         justright     blocksNBytes
840         desired       blocksNBytes
841         current       blocksNBytes
842         pulls         int
843         trashes       int
844         replHistogram []int
845         classStats    map[string]replicationStats
846
847         // collectionBytes / collectionBlockBytes = deduplication ratio
848         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
849         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
850         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
851         collectionBlocks     int64 // number of blocks referenced by any collection
852 }
853
854 func (s *balancerStats) dedupByteRatio() float64 {
855         if s.collectionBlockBytes == 0 {
856                 return 0
857         }
858         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
859 }
860
861 func (s *balancerStats) dedupBlockRatio() float64 {
862         if s.collectionBlocks == 0 {
863                 return 0
864         }
865         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
866 }
867
868 type replicationStats struct {
869         desired      blocksNBytes
870         surplus      blocksNBytes
871         short        blocksNBytes
872         unachievable blocksNBytes
873 }
874
875 type balancedBlockState struct {
876         desired      int
877         surplus      int
878         unachievable bool
879 }
880
881 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
882         var s balancerStats
883         s.replHistogram = make([]int, 2)
884         s.classStats = make(map[string]replicationStats, len(bal.classes))
885         for result := range results {
886                 surplus := result.have - result.want
887                 bytes := result.blkid.Size()
888
889                 if rc := int64(result.blk.RefCount); rc > 0 {
890                         s.collectionBytes += rc * bytes
891                         s.collectionBlockBytes += bytes
892                         s.collectionBlockRefs += rc
893                         s.collectionBlocks++
894                 }
895
896                 for class, state := range result.classState {
897                         cs := s.classStats[class]
898                         if state.unachievable {
899                                 cs.unachievable.blocks++
900                                 cs.unachievable.bytes += bytes
901                         }
902                         if state.desired > 0 {
903                                 cs.desired.replicas += state.desired
904                                 cs.desired.blocks++
905                                 cs.desired.bytes += bytes * int64(state.desired)
906                         }
907                         if state.surplus > 0 {
908                                 cs.surplus.replicas += state.surplus
909                                 cs.surplus.blocks++
910                                 cs.surplus.bytes += bytes * int64(state.surplus)
911                         } else if state.surplus < 0 {
912                                 cs.short.replicas += -state.surplus
913                                 cs.short.blocks++
914                                 cs.short.bytes += bytes * int64(-state.surplus)
915                         }
916                         s.classStats[class] = cs
917                 }
918
919                 switch {
920                 case result.have == 0 && result.want > 0:
921                         s.lost.replicas -= surplus
922                         s.lost.blocks++
923                         s.lost.bytes += bytes * int64(-surplus)
924                         fmt.Fprintf(bal.lostBlocks, "%s\n", strings.SplitN(string(result.blkid), "+", 2)[0])
925                 case surplus < 0:
926                         s.underrep.replicas -= surplus
927                         s.underrep.blocks++
928                         s.underrep.bytes += bytes * int64(-surplus)
929                 case surplus > 0 && result.want == 0:
930                         counter := &s.garbage
931                         for _, r := range result.blk.Replicas {
932                                 if r.Mtime >= bal.MinMtime {
933                                         counter = &s.unref
934                                         break
935                                 }
936                         }
937                         counter.replicas += surplus
938                         counter.blocks++
939                         counter.bytes += bytes * int64(surplus)
940                 case surplus > 0:
941                         s.overrep.replicas += surplus
942                         s.overrep.blocks++
943                         s.overrep.bytes += bytes * int64(result.have-result.want)
944                 default:
945                         s.justright.replicas += result.want
946                         s.justright.blocks++
947                         s.justright.bytes += bytes * int64(result.want)
948                 }
949
950                 if result.want > 0 {
951                         s.desired.replicas += result.want
952                         s.desired.blocks++
953                         s.desired.bytes += bytes * int64(result.want)
954                 }
955                 if result.have > 0 {
956                         s.current.replicas += result.have
957                         s.current.blocks++
958                         s.current.bytes += bytes * int64(result.have)
959                 }
960
961                 for len(s.replHistogram) <= result.have {
962                         s.replHistogram = append(s.replHistogram, 0)
963                 }
964                 s.replHistogram[result.have]++
965         }
966         for _, srv := range bal.KeepServices {
967                 s.pulls += len(srv.ChangeSet.Pulls)
968                 s.trashes += len(srv.ChangeSet.Trashes)
969         }
970         bal.stats = s
971         bal.Metrics.UpdateStats(s)
972 }
973
974 // PrintStatistics writes statistics about the computed changes to
975 // bal.Logger. It should not be called until ComputeChangeSets has
976 // finished.
977 func (bal *Balancer) PrintStatistics() {
978         bal.logf("===")
979         bal.logf("%s lost (0=have<want)", bal.stats.lost)
980         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
981         bal.logf("%s just right (have=want)", bal.stats.justright)
982         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
983         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
984         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
985         for _, class := range bal.classes {
986                 cs := bal.stats.classStats[class]
987                 bal.logf("===")
988                 bal.logf("storage class %q: %s desired", class, cs.desired)
989                 bal.logf("storage class %q: %s short", class, cs.short)
990                 bal.logf("storage class %q: %s surplus", class, cs.surplus)
991                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
992         }
993         bal.logf("===")
994         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
995         bal.logf("%s total usage", bal.stats.current)
996         bal.logf("===")
997         for _, srv := range bal.KeepServices {
998                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
999         }
1000         bal.logf("===")
1001         bal.printHistogram(60)
1002         bal.logf("===")
1003 }
1004
1005 func (bal *Balancer) printHistogram(hashColumns int) {
1006         bal.logf("Replication level distribution (counting N replicas on a single server as N):")
1007         maxCount := 0
1008         for _, count := range bal.stats.replHistogram {
1009                 if maxCount < count {
1010                         maxCount = count
1011                 }
1012         }
1013         hashes := strings.Repeat("#", hashColumns)
1014         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1015         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1016         for repl, count := range bal.stats.replHistogram {
1017                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1018                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1019         }
1020 }
1021
1022 // CheckSanityLate checks for configuration and runtime errors after
1023 // GetCurrentState() and ComputeChangeSets() have finished.
1024 //
1025 // If it returns an error, it is dangerous to run any Commit methods.
1026 func (bal *Balancer) CheckSanityLate() error {
1027         if bal.errors != nil {
1028                 for _, err := range bal.errors {
1029                         bal.logf("deferred error: %v", err)
1030                 }
1031                 return fmt.Errorf("cannot proceed safely after deferred errors")
1032         }
1033
1034         if bal.collScanned == 0 {
1035                 return fmt.Errorf("received zero collections")
1036         }
1037
1038         anyDesired := false
1039         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1040                 for _, desired := range blk.Desired {
1041                         if desired > 0 {
1042                                 anyDesired = true
1043                                 break
1044                         }
1045                 }
1046         })
1047         if !anyDesired {
1048                 return fmt.Errorf("zero blocks have desired replication>0")
1049         }
1050
1051         if dr := bal.DefaultReplication; dr < 1 {
1052                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1053         }
1054
1055         // TODO: no two services have identical indexes
1056         // TODO: no collisions (same md5, different size)
1057         return nil
1058 }
1059
1060 // CommitPulls sends the computed lists of pull requests to the
1061 // keepstore servers. This has the effect of increasing replication of
1062 // existing blocks that are either underreplicated or poorly
1063 // distributed according to rendezvous hashing.
1064 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
1065         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1066         return bal.commitAsync(c, "send pull list",
1067                 func(srv *KeepService) error {
1068                         return srv.CommitPulls(c)
1069                 })
1070 }
1071
1072 // CommitTrash sends the computed lists of trash requests to the
1073 // keepstore servers. This has the effect of deleting blocks that are
1074 // overreplicated or unreferenced.
1075 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
1076         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1077         return bal.commitAsync(c, "send trash list",
1078                 func(srv *KeepService) error {
1079                         return srv.CommitTrash(c)
1080                 })
1081 }
1082
1083 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1084         errs := make(chan error)
1085         for _, srv := range bal.KeepServices {
1086                 go func(srv *KeepService) {
1087                         var err error
1088                         defer func() { errs <- err }()
1089                         label := fmt.Sprintf("%s: %v", srv, label)
1090                         err = f(srv)
1091                         if err != nil {
1092                                 err = fmt.Errorf("%s: %v", label, err)
1093                         }
1094                 }(srv)
1095         }
1096         var lastErr error
1097         for range bal.KeepServices {
1098                 if err := <-errs; err != nil {
1099                         bal.logf("%v", err)
1100                         lastErr = err
1101                 }
1102         }
1103         close(errs)
1104         return lastErr
1105 }
1106
1107 func (bal *Balancer) logf(f string, args ...interface{}) {
1108         if bal.Logger != nil {
1109                 bal.Logger.Printf(f, args...)
1110         }
1111 }
1112
1113 func (bal *Balancer) time(name, help string) func() {
1114         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1115         t0 := time.Now()
1116         bal.Logger.Printf("%s: start", name)
1117         return func() {
1118                 dur := time.Since(t0)
1119                 observer.Observe(dur.Seconds())
1120                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1121         }
1122 }
1123
1124 // Rendezvous hash sort function. Less efficient than sorting on
1125 // precomputed rendezvous hashes, but also rarely used.
1126 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1127         a := md5.Sum([]byte(string(blkid[:32]) + i))
1128         b := md5.Sum([]byte(string(blkid[:32]) + j))
1129         return bytes.Compare(a[:], b[:]) < 0
1130 }