67021bef36b0da7247a4ca32ea74e79234bc0311
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "log"
15         "math"
16         "os"
17         "runtime"
18         "sort"
19         "strings"
20         "sync"
21         "syscall"
22         "time"
23
24         "git.arvados.org/arvados.git/sdk/go/arvados"
25         "git.arvados.org/arvados.git/sdk/go/keepclient"
26         "github.com/jmoiron/sqlx"
27         "github.com/sirupsen/logrus"
28 )
29
30 // Balancer compares the contents of keepstore servers with the
31 // collections stored in Arvados, and issues pull/trash requests
32 // needed to get (closer to) the optimal data layout.
33 //
34 // In the optimal data layout: every data block referenced by a
35 // collection is replicated at least as many times as desired by the
36 // collection; there are no unreferenced data blocks older than
37 // BlobSignatureTTL; and all N existing replicas of a given data block
38 // are in the N best positions in rendezvous probe order.
39 type Balancer struct {
40         DB      *sqlx.DB
41         Logger  logrus.FieldLogger
42         Dumper  logrus.FieldLogger
43         Metrics *metrics
44
45         LostBlocksFile string
46
47         *BlockStateMap
48         KeepServices       map[string]*KeepService
49         DefaultReplication int
50         MinMtime           int64
51
52         classes       []string
53         mounts        int
54         mountsByClass map[string]map[*KeepMount]bool
55         collScanned   int
56         serviceRoots  map[string]string
57         errors        []error
58         stats         balancerStats
59         mutex         sync.Mutex
60         lostBlocks    io.Writer
61 }
62
63 // Run performs a balance operation using the given config and
64 // runOptions, and returns RunOptions suitable for passing to a
65 // subsequent balance operation.
66 //
67 // Run should only be called once on a given Balancer object.
68 //
69 // Typical usage:
70 //
71 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
72 func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
73         nextRunOptions = runOptions
74
75         defer bal.time("sweep", "wall clock time to run one full sweep")()
76
77         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
78         defer cancel()
79
80         var lbFile *os.File
81         if bal.LostBlocksFile != "" {
82                 tmpfn := bal.LostBlocksFile + ".tmp"
83                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
84                 if err != nil {
85                         return
86                 }
87                 defer lbFile.Close()
88                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
89                 if err != nil {
90                         return
91                 }
92                 defer func() {
93                         // Remove the tempfile only if we didn't get
94                         // as far as successfully renaming it.
95                         if lbFile != nil {
96                                 os.Remove(tmpfn)
97                         }
98                 }()
99                 bal.lostBlocks = lbFile
100         } else {
101                 bal.lostBlocks = ioutil.Discard
102         }
103
104         err = bal.DiscoverKeepServices(client)
105         if err != nil {
106                 return
107         }
108
109         for _, srv := range bal.KeepServices {
110                 err = srv.discoverMounts(client)
111                 if err != nil {
112                         return
113                 }
114         }
115         bal.cleanupMounts()
116
117         if err = bal.CheckSanityEarly(client); err != nil {
118                 return
119         }
120
121         // On a big site, indexing and sending trash/pull lists can
122         // take much longer than the usual 5 minute client
123         // timeout. From here on, we rely on the context deadline
124         // instead, aborting the entire operation if any part takes
125         // too long.
126         client.Timeout = 0
127
128         rs := bal.rendezvousState()
129         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
130                 if runOptions.SafeRendezvousState != "" {
131                         bal.logf("notice: KeepServices list has changed since last run")
132                 }
133                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
134                 if err = bal.ClearTrashLists(ctx, client); err != nil {
135                         return
136                 }
137                 // The current rendezvous state becomes "safe" (i.e.,
138                 // OK to compute changes for that state without
139                 // clearing existing trash lists) only now, after we
140                 // succeed in clearing existing trash lists.
141                 nextRunOptions.SafeRendezvousState = rs
142         }
143
144         if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
145                 return
146         }
147         bal.ComputeChangeSets()
148         bal.PrintStatistics()
149         if err = bal.CheckSanityLate(); err != nil {
150                 return
151         }
152         if lbFile != nil {
153                 err = lbFile.Sync()
154                 if err != nil {
155                         return
156                 }
157                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
158                 if err != nil {
159                         return
160                 }
161                 lbFile = nil
162         }
163         if runOptions.CommitPulls {
164                 err = bal.CommitPulls(ctx, client)
165                 if err != nil {
166                         // Skip trash if we can't pull. (Too cautious?)
167                         return
168                 }
169         }
170         if runOptions.CommitTrash {
171                 err = bal.CommitTrash(ctx, client)
172                 if err != nil {
173                         return
174                 }
175         }
176         if runOptions.CommitConfirmedFields {
177                 err = bal.updateCollections(ctx, client, cluster)
178                 if err != nil {
179                         return
180                 }
181         }
182         return
183 }
184
185 // SetKeepServices sets the list of KeepServices to operate on.
186 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
187         bal.KeepServices = make(map[string]*KeepService)
188         for _, srv := range srvList.Items {
189                 bal.KeepServices[srv.UUID] = &KeepService{
190                         KeepService: srv,
191                         ChangeSet:   &ChangeSet{},
192                 }
193         }
194         return nil
195 }
196
197 // DiscoverKeepServices sets the list of KeepServices by calling the
198 // API to get a list of all services, and selecting the ones whose
199 // ServiceType is "disk"
200 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
201         bal.KeepServices = make(map[string]*KeepService)
202         return c.EachKeepService(func(srv arvados.KeepService) error {
203                 if srv.ServiceType == "disk" {
204                         bal.KeepServices[srv.UUID] = &KeepService{
205                                 KeepService: srv,
206                                 ChangeSet:   &ChangeSet{},
207                         }
208                 } else {
209                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
210                 }
211                 return nil
212         })
213 }
214
215 func (bal *Balancer) cleanupMounts() {
216         rwdev := map[string]*KeepService{}
217         for _, srv := range bal.KeepServices {
218                 for _, mnt := range srv.mounts {
219                         if !mnt.ReadOnly && mnt.DeviceID != "" {
220                                 rwdev[mnt.DeviceID] = srv
221                         }
222                 }
223         }
224         // Drop the readonly mounts whose device is mounted RW
225         // elsewhere.
226         for _, srv := range bal.KeepServices {
227                 var dedup []*KeepMount
228                 for _, mnt := range srv.mounts {
229                         if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
230                                 bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
231                         } else {
232                                 dedup = append(dedup, mnt)
233                         }
234                 }
235                 srv.mounts = dedup
236         }
237         for _, srv := range bal.KeepServices {
238                 for _, mnt := range srv.mounts {
239                         if mnt.Replication <= 0 {
240                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
241                                 mnt.Replication = 1
242                         }
243                 }
244         }
245 }
246
247 // CheckSanityEarly checks for configuration and runtime errors that
248 // can be detected before GetCurrentState() and ComputeChangeSets()
249 // are called.
250 //
251 // If it returns an error, it is pointless to run GetCurrentState or
252 // ComputeChangeSets: after doing so, the statistics would be
253 // meaningless and it would be dangerous to run any Commit methods.
254 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
255         u, err := c.CurrentUser()
256         if err != nil {
257                 return fmt.Errorf("CurrentUser(): %v", err)
258         }
259         if !u.IsActive || !u.IsAdmin {
260                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
261         }
262         for _, srv := range bal.KeepServices {
263                 if srv.ServiceType == "proxy" {
264                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
265                 }
266         }
267
268         var checkPage arvados.CollectionList
269         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
270                 Limit:              new(int),
271                 Count:              "exact",
272                 IncludeTrash:       true,
273                 IncludeOldVersions: true,
274                 Filters: []arvados.Filter{{
275                         Attr:     "modified_at",
276                         Operator: "=",
277                         Operand:  nil,
278                 }},
279         }); err != nil {
280                 return err
281         } else if n := checkPage.ItemsAvailable; n > 0 {
282                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
283         }
284
285         return nil
286 }
287
288 // rendezvousState returns a fingerprint (e.g., a sorted list of
289 // UUID+host+port) of the current set of keep services.
290 func (bal *Balancer) rendezvousState() string {
291         srvs := make([]string, 0, len(bal.KeepServices))
292         for _, srv := range bal.KeepServices {
293                 srvs = append(srvs, srv.String())
294         }
295         sort.Strings(srvs)
296         return strings.Join(srvs, "; ")
297 }
298
299 // ClearTrashLists sends an empty trash list to each keep
300 // service. Calling this before GetCurrentState avoids races.
301 //
302 // When a block appears in an index, we assume that replica will still
303 // exist after we delete other replicas on other servers. However,
304 // it's possible that a previous rebalancing operation made different
305 // decisions (e.g., servers were added/removed, and rendezvous order
306 // changed). In this case, the replica might already be on that
307 // server's trash list, and it might be deleted before we send a
308 // replacement trash list.
309 //
310 // We avoid this problem if we clear all trash lists before getting
311 // indexes. (We also assume there is only one rebalancing process
312 // running at a time.)
313 func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
314         for _, srv := range bal.KeepServices {
315                 srv.ChangeSet = &ChangeSet{}
316         }
317         return bal.CommitTrash(ctx, c)
318 }
319
320 // GetCurrentState determines the current replication state, and the
321 // desired replication level, for every block that is either
322 // retrievable or referenced.
323 //
324 // It determines the current replication state by reading the block index
325 // from every known Keep service.
326 //
327 // It determines the desired replication level by retrieving all
328 // collection manifests in the database (API server).
329 //
330 // It encodes the resulting information in BlockStateMap.
331 func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
332         ctx, cancel := context.WithCancel(ctx)
333         defer cancel()
334
335         defer bal.time("get_state", "wall clock time to get current state")()
336         bal.BlockStateMap = NewBlockStateMap()
337
338         dd, err := c.DiscoveryDocument()
339         if err != nil {
340                 return err
341         }
342         bal.DefaultReplication = dd.DefaultCollectionReplication
343         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
344
345         errs := make(chan error, 1)
346         wg := sync.WaitGroup{}
347
348         // When a device is mounted more than once, we will get its
349         // index only once, and call AddReplicas on all of the mounts.
350         // equivMount keys are the mounts that will be indexed, and
351         // each value is a list of mounts to apply the received index
352         // to.
353         equivMount := map[*KeepMount][]*KeepMount{}
354         // deviceMount maps each device ID to the one mount that will
355         // be indexed for that device.
356         deviceMount := map[string]*KeepMount{}
357         for _, srv := range bal.KeepServices {
358                 for _, mnt := range srv.mounts {
359                         equiv := deviceMount[mnt.DeviceID]
360                         if equiv == nil {
361                                 equiv = mnt
362                                 if mnt.DeviceID != "" {
363                                         deviceMount[mnt.DeviceID] = equiv
364                                 }
365                         }
366                         equivMount[equiv] = append(equivMount[equiv], mnt)
367                 }
368         }
369
370         // Start one goroutine for each (non-redundant) mount:
371         // retrieve the index, and add the returned blocks to
372         // BlockStateMap.
373         for _, mounts := range equivMount {
374                 wg.Add(1)
375                 go func(mounts []*KeepMount) {
376                         defer wg.Done()
377                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
378                         idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
379                         if err != nil {
380                                 select {
381                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
382                                 default:
383                                 }
384                                 cancel()
385                                 return
386                         }
387                         if len(errs) > 0 {
388                                 // Some other goroutine encountered an
389                                 // error -- any further effort here
390                                 // will be wasted.
391                                 return
392                         }
393                         for _, mount := range mounts {
394                                 bal.logf("%s: add %d entries to map", mount, len(idx))
395                                 bal.BlockStateMap.AddReplicas(mount, idx)
396                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
397                         }
398                         bal.logf("mount %s: index done", mounts[0])
399                 }(mounts)
400         }
401
402         // collQ buffers incoming collections so we can start fetching
403         // the next page without waiting for the current page to
404         // finish processing.
405         collQ := make(chan arvados.Collection, bufs)
406
407         // Start a goroutine to process collections. (We could use a
408         // worker pool here, but even with a single worker we already
409         // process collections much faster than we can retrieve them.)
410         wg.Add(1)
411         go func() {
412                 defer wg.Done()
413                 for coll := range collQ {
414                         err := bal.addCollection(coll)
415                         if err != nil || len(errs) > 0 {
416                                 select {
417                                 case errs <- err:
418                                 default:
419                                 }
420                                 for range collQ {
421                                 }
422                                 cancel()
423                                 return
424                         }
425                         bal.collScanned++
426                 }
427         }()
428
429         // Start a goroutine to retrieve all collections from the
430         // Arvados database and send them to collQ for processing.
431         wg.Add(1)
432         go func() {
433                 defer wg.Done()
434                 err = EachCollection(ctx, bal.DB, c,
435                         func(coll arvados.Collection) error {
436                                 collQ <- coll
437                                 if len(errs) > 0 {
438                                         // some other GetCurrentState
439                                         // error happened: no point
440                                         // getting any more
441                                         // collections.
442                                         return fmt.Errorf("")
443                                 }
444                                 return nil
445                         }, func(done, total int) {
446                                 bal.logf("collections: %d/%d", done, total)
447                         })
448                 close(collQ)
449                 if err != nil {
450                         select {
451                         case errs <- err:
452                         default:
453                         }
454                         cancel()
455                 }
456         }()
457
458         wg.Wait()
459         if len(errs) > 0 {
460                 return <-errs
461         }
462         return nil
463 }
464
465 func (bal *Balancer) addCollection(coll arvados.Collection) error {
466         blkids, err := coll.SizedDigests()
467         if err != nil {
468                 return fmt.Errorf("%v: %v", coll.UUID, err)
469         }
470         repl := bal.DefaultReplication
471         if coll.ReplicationDesired != nil {
472                 repl = *coll.ReplicationDesired
473         }
474         bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
475         // Pass pdh to IncreaseDesired only if LostBlocksFile is being
476         // written -- otherwise it's just a waste of memory.
477         pdh := ""
478         if bal.LostBlocksFile != "" {
479                 pdh = coll.PortableDataHash
480         }
481         bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
482         return nil
483 }
484
485 // ComputeChangeSets compares, for each known block, the current and
486 // desired replication states. If it is possible to get closer to the
487 // desired state by copying or deleting blocks, it adds those changes
488 // to the relevant KeepServices' ChangeSets.
489 //
490 // It does not actually apply any of the computed changes.
491 func (bal *Balancer) ComputeChangeSets() {
492         // This just calls balanceBlock() once for each block, using a
493         // pool of worker goroutines.
494         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
495         bal.setupLookupTables()
496
497         type balanceTask struct {
498                 blkid arvados.SizedDigest
499                 blk   *BlockState
500         }
501         workers := runtime.GOMAXPROCS(-1)
502         todo := make(chan balanceTask, workers)
503         go func() {
504                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
505                         todo <- balanceTask{
506                                 blkid: blkid,
507                                 blk:   blk,
508                         }
509                 })
510                 close(todo)
511         }()
512         results := make(chan balanceResult, workers)
513         go func() {
514                 var wg sync.WaitGroup
515                 for i := 0; i < workers; i++ {
516                         wg.Add(1)
517                         go func() {
518                                 for work := range todo {
519                                         results <- bal.balanceBlock(work.blkid, work.blk)
520                                 }
521                                 wg.Done()
522                         }()
523                 }
524                 wg.Wait()
525                 close(results)
526         }()
527         bal.collectStatistics(results)
528 }
529
530 func (bal *Balancer) setupLookupTables() {
531         bal.serviceRoots = make(map[string]string)
532         bal.classes = defaultClasses
533         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
534         bal.mounts = 0
535         for _, srv := range bal.KeepServices {
536                 bal.serviceRoots[srv.UUID] = srv.UUID
537                 for _, mnt := range srv.mounts {
538                         bal.mounts++
539
540                         // All mounts on a read-only service are
541                         // effectively read-only.
542                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
543
544                         if len(mnt.StorageClasses) == 0 {
545                                 bal.mountsByClass["default"][mnt] = true
546                                 continue
547                         }
548                         for class := range mnt.StorageClasses {
549                                 if mbc := bal.mountsByClass[class]; mbc == nil {
550                                         bal.classes = append(bal.classes, class)
551                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
552                                 } else {
553                                         mbc[mnt] = true
554                                 }
555                         }
556                 }
557         }
558         // Consider classes in lexicographic order to avoid flapping
559         // between balancing runs.  The outcome of the "prefer a mount
560         // we're already planning to use for a different storage
561         // class" case in balanceBlock depends on the order classes
562         // are considered.
563         sort.Strings(bal.classes)
564 }
565
566 const (
567         changeStay = iota
568         changePull
569         changeTrash
570         changeNone
571 )
572
573 var changeName = map[int]string{
574         changeStay:  "stay",
575         changePull:  "pull",
576         changeTrash: "trash",
577         changeNone:  "none",
578 }
579
580 type balancedBlockState struct {
581         needed       int
582         unneeded     int
583         pulling      int
584         unachievable bool
585 }
586
587 type balanceResult struct {
588         blk        *BlockState
589         blkid      arvados.SizedDigest
590         lost       bool
591         blockState balancedBlockState
592         classState map[string]balancedBlockState
593 }
594
595 type slot struct {
596         mnt  *KeepMount // never nil
597         repl *Replica   // replica already stored here (or nil)
598         want bool       // we should pull/leave a replica here
599 }
600
601 // balanceBlock compares current state to desired state for a single
602 // block, and makes the appropriate ChangeSet calls.
603 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
604         bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
605
606         // Build a list of all slots (one per mounted volume).
607         slots := make([]slot, 0, bal.mounts)
608         for _, srv := range bal.KeepServices {
609                 for _, mnt := range srv.mounts {
610                         var repl *Replica
611                         for r := range blk.Replicas {
612                                 if blk.Replicas[r].KeepMount == mnt {
613                                         repl = &blk.Replicas[r]
614                                 }
615                         }
616                         // Initial value of "want" is "have, and can't
617                         // delete". These untrashable replicas get
618                         // prioritized when sorting slots: otherwise,
619                         // non-optimal readonly copies would cause us
620                         // to overreplicate.
621                         slots = append(slots, slot{
622                                 mnt:  mnt,
623                                 repl: repl,
624                                 want: repl != nil && mnt.ReadOnly,
625                         })
626                 }
627         }
628
629         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
630         srvRendezvous := make(map[*KeepService]int, len(uuids))
631         for i, uuid := range uuids {
632                 srv := bal.KeepServices[uuid]
633                 srvRendezvous[srv] = i
634         }
635
636         // Below we set underreplicated=true if we find any storage
637         // class that's currently underreplicated -- in that case we
638         // won't want to trash any replicas.
639         underreplicated := false
640
641         unsafeToDelete := make(map[int64]bool, len(slots))
642         for _, class := range bal.classes {
643                 desired := blk.Desired[class]
644                 if desired == 0 {
645                         continue
646                 }
647
648                 // Sort the slots by desirability.
649                 sort.Slice(slots, func(i, j int) bool {
650                         si, sj := slots[i], slots[j]
651                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
652                                 // Prefer a mount that satisfies the
653                                 // desired class.
654                                 return bal.mountsByClass[class][si.mnt]
655                         } else if si.want != sj.want {
656                                 // Prefer a mount that will have a
657                                 // replica no matter what we do here
658                                 // -- either because it already has an
659                                 // untrashable replica, or because we
660                                 // already need it to satisfy a
661                                 // different storage class.
662                                 return si.want
663                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
664                                 // Prefer a better rendezvous
665                                 // position.
666                                 return orderi < orderj
667                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
668                                 // Prefer a mount that already has a
669                                 // replica.
670                                 return repli
671                         } else {
672                                 // If pull/trash turns out to be
673                                 // needed, distribute the
674                                 // new/remaining replicas uniformly
675                                 // across qualifying mounts on a given
676                                 // server.
677                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
678                         }
679                 })
680
681                 // Servers/mounts/devices (with or without existing
682                 // replicas) that are part of the best achievable
683                 // layout for this storage class.
684                 wantSrv := map[*KeepService]bool{}
685                 wantMnt := map[*KeepMount]bool{}
686                 wantDev := map[string]bool{}
687                 // Positions (with existing replicas) that have been
688                 // protected (via unsafeToDelete) to ensure we don't
689                 // reduce replication below desired level when
690                 // trashing replicas that aren't optimal positions for
691                 // any storage class.
692                 protMnt := map[*KeepMount]bool{}
693                 // Replication planned so far (corresponds to wantMnt).
694                 replWant := 0
695                 // Protected replication (corresponds to protMnt).
696                 replProt := 0
697
698                 // trySlot tries using a slot to meet requirements,
699                 // and returns true if all requirements are met.
700                 trySlot := func(i int) bool {
701                         slot := slots[i]
702                         if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
703                                 // Already allocated a replica to this
704                                 // backend device, possibly on a
705                                 // different server.
706                                 return false
707                         }
708                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
709                                 unsafeToDelete[slot.repl.Mtime] = true
710                                 protMnt[slot.mnt] = true
711                                 replProt += slot.mnt.Replication
712                         }
713                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
714                                 slots[i].want = true
715                                 wantSrv[slot.mnt.KeepService] = true
716                                 wantMnt[slot.mnt] = true
717                                 if slot.mnt.DeviceID != "" {
718                                         wantDev[slot.mnt.DeviceID] = true
719                                 }
720                                 replWant += slot.mnt.Replication
721                         }
722                         return replProt >= desired && replWant >= desired
723                 }
724
725                 // First try to achieve desired replication without
726                 // using the same server twice.
727                 done := false
728                 for i := 0; i < len(slots) && !done; i++ {
729                         if !wantSrv[slots[i].mnt.KeepService] {
730                                 done = trySlot(i)
731                         }
732                 }
733
734                 // If that didn't suffice, do another pass without the
735                 // "distinct services" restriction. (Achieving the
736                 // desired volume replication on fewer than the
737                 // desired number of services is better than
738                 // underreplicating.)
739                 for i := 0; i < len(slots) && !done; i++ {
740                         done = trySlot(i)
741                 }
742
743                 if !underreplicated {
744                         safe := 0
745                         for _, slot := range slots {
746                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
747                                         continue
748                                 }
749                                 if safe += slot.mnt.Replication; safe >= desired {
750                                         break
751                                 }
752                         }
753                         underreplicated = safe < desired
754                 }
755
756                 // Avoid deleting wanted replicas from devices that
757                 // are mounted on multiple servers -- even if they
758                 // haven't already been added to unsafeToDelete
759                 // because the servers report different Mtimes.
760                 for _, slot := range slots {
761                         if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
762                                 unsafeToDelete[slot.repl.Mtime] = true
763                         }
764                 }
765         }
766
767         // TODO: If multiple replicas are trashable, prefer the oldest
768         // replica that doesn't have a timestamp collision with
769         // others.
770
771         for i, slot := range slots {
772                 // Don't trash (1) any replicas of an underreplicated
773                 // block, even if they're in the wrong positions, or
774                 // (2) any replicas whose Mtimes are identical to
775                 // needed replicas (in case we're really seeing the
776                 // same copy via different mounts).
777                 if slot.repl != nil && (underreplicated || unsafeToDelete[slot.repl.Mtime]) {
778                         slots[i].want = true
779                 }
780         }
781
782         classState := make(map[string]balancedBlockState, len(bal.classes))
783         for _, class := range bal.classes {
784                 classState[class] = computeBlockState(slots, bal.mountsByClass[class], len(blk.Replicas), blk.Desired[class])
785         }
786         blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
787
788         var lost bool
789         var changes []string
790         for _, slot := range slots {
791                 // TODO: request a Touch if Mtime is duplicated.
792                 var change int
793                 switch {
794                 case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
795                         slot.mnt.KeepService.AddTrash(Trash{
796                                 SizedDigest: blkid,
797                                 Mtime:       slot.repl.Mtime,
798                                 From:        slot.mnt,
799                         })
800                         change = changeTrash
801                 case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
802                         lost = true
803                         change = changeNone
804                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
805                         slot.mnt.KeepService.AddPull(Pull{
806                                 SizedDigest: blkid,
807                                 From:        blk.Replicas[0].KeepMount.KeepService,
808                                 To:          slot.mnt,
809                         })
810                         change = changePull
811                 case slot.repl != nil:
812                         change = changeStay
813                 default:
814                         change = changeNone
815                 }
816                 if bal.Dumper != nil {
817                         var mtime int64
818                         if slot.repl != nil {
819                                 mtime = slot.repl.Mtime
820                         }
821                         srv := slot.mnt.KeepService
822                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
823                 }
824         }
825         if bal.Dumper != nil {
826                 bal.Dumper.Printf("%s refs=%d needed=%d unneeded=%d pulling=%v %v %v", blkid, blk.RefCount, blockState.needed, blockState.unneeded, blockState.pulling, blk.Desired, changes)
827         }
828         return balanceResult{
829                 blk:        blk,
830                 blkid:      blkid,
831                 lost:       lost,
832                 blockState: blockState,
833                 classState: classState,
834         }
835 }
836
837 func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRepl int) (bbs balancedBlockState) {
838         repl := 0
839         countedDev := map[string]bool{}
840         for _, slot := range slots {
841                 if onlyCount != nil && !onlyCount[slot.mnt] {
842                         continue
843                 }
844                 if countedDev[slot.mnt.DeviceID] {
845                         continue
846                 }
847                 switch {
848                 case slot.repl != nil && slot.want:
849                         bbs.needed++
850                         repl += slot.mnt.Replication
851                 case slot.repl != nil && !slot.want:
852                         bbs.unneeded++
853                         repl += slot.mnt.Replication
854                 case slot.repl == nil && slot.want && have > 0:
855                         bbs.pulling++
856                         repl += slot.mnt.Replication
857                 }
858                 if slot.mnt.DeviceID != "" {
859                         countedDev[slot.mnt.DeviceID] = true
860                 }
861         }
862         if repl < needRepl {
863                 bbs.unachievable = true
864         }
865         return
866 }
867
868 type blocksNBytes struct {
869         replicas int
870         blocks   int
871         bytes    int64
872 }
873
874 func (bb blocksNBytes) String() string {
875         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
876 }
877
878 type replicationStats struct {
879         needed       blocksNBytes
880         unneeded     blocksNBytes
881         pulling      blocksNBytes
882         unachievable blocksNBytes
883 }
884
885 type balancerStats struct {
886         lost          blocksNBytes
887         overrep       blocksNBytes
888         unref         blocksNBytes
889         garbage       blocksNBytes
890         underrep      blocksNBytes
891         unachievable  blocksNBytes
892         justright     blocksNBytes
893         desired       blocksNBytes
894         current       blocksNBytes
895         pulls         int
896         trashes       int
897         replHistogram []int
898         classStats    map[string]replicationStats
899
900         // collectionBytes / collectionBlockBytes = deduplication ratio
901         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
902         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
903         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
904         collectionBlocks     int64 // number of blocks referenced by any collection
905 }
906
907 func (s *balancerStats) dedupByteRatio() float64 {
908         if s.collectionBlockBytes == 0 {
909                 return 0
910         }
911         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
912 }
913
914 func (s *balancerStats) dedupBlockRatio() float64 {
915         if s.collectionBlocks == 0 {
916                 return 0
917         }
918         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
919 }
920
921 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
922         var s balancerStats
923         s.replHistogram = make([]int, 2)
924         s.classStats = make(map[string]replicationStats, len(bal.classes))
925         for result := range results {
926                 bytes := result.blkid.Size()
927
928                 if rc := int64(result.blk.RefCount); rc > 0 {
929                         s.collectionBytes += rc * bytes
930                         s.collectionBlockBytes += bytes
931                         s.collectionBlockRefs += rc
932                         s.collectionBlocks++
933                 }
934
935                 for class, state := range result.classState {
936                         cs := s.classStats[class]
937                         if state.unachievable {
938                                 cs.unachievable.replicas++
939                                 cs.unachievable.blocks++
940                                 cs.unachievable.bytes += bytes
941                         }
942                         if state.needed > 0 {
943                                 cs.needed.replicas += state.needed
944                                 cs.needed.blocks++
945                                 cs.needed.bytes += bytes * int64(state.needed)
946                         }
947                         if state.unneeded > 0 {
948                                 cs.unneeded.replicas += state.unneeded
949                                 cs.unneeded.blocks++
950                                 cs.unneeded.bytes += bytes * int64(state.unneeded)
951                         }
952                         if state.pulling > 0 {
953                                 cs.pulling.replicas += state.pulling
954                                 cs.pulling.blocks++
955                                 cs.pulling.bytes += bytes * int64(state.pulling)
956                         }
957                         s.classStats[class] = cs
958                 }
959
960                 bs := result.blockState
961                 switch {
962                 case result.lost:
963                         s.lost.replicas++
964                         s.lost.blocks++
965                         s.lost.bytes += bytes
966                         fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
967                         for pdh := range result.blk.Refs {
968                                 fmt.Fprintf(bal.lostBlocks, " %s", pdh)
969                         }
970                         fmt.Fprint(bal.lostBlocks, "\n")
971                 case bs.pulling > 0:
972                         s.underrep.replicas += bs.pulling
973                         s.underrep.blocks++
974                         s.underrep.bytes += bytes * int64(bs.pulling)
975                 case bs.unachievable:
976                         s.underrep.replicas++
977                         s.underrep.blocks++
978                         s.underrep.bytes += bytes
979                 case bs.unneeded > 0 && bs.needed == 0:
980                         // Count as "garbage" if all replicas are old
981                         // enough to trash, otherwise count as
982                         // "unref".
983                         counter := &s.garbage
984                         for _, r := range result.blk.Replicas {
985                                 if r.Mtime >= bal.MinMtime {
986                                         counter = &s.unref
987                                         break
988                                 }
989                         }
990                         counter.replicas += bs.unneeded
991                         counter.blocks++
992                         counter.bytes += bytes * int64(bs.unneeded)
993                 case bs.unneeded > 0:
994                         s.overrep.replicas += bs.unneeded
995                         s.overrep.blocks++
996                         s.overrep.bytes += bytes * int64(bs.unneeded)
997                 default:
998                         s.justright.replicas += bs.needed
999                         s.justright.blocks++
1000                         s.justright.bytes += bytes * int64(bs.needed)
1001                 }
1002
1003                 if bs.needed > 0 {
1004                         s.desired.replicas += bs.needed
1005                         s.desired.blocks++
1006                         s.desired.bytes += bytes * int64(bs.needed)
1007                 }
1008                 if bs.needed+bs.unneeded > 0 {
1009                         s.current.replicas += bs.needed + bs.unneeded
1010                         s.current.blocks++
1011                         s.current.bytes += bytes * int64(bs.needed+bs.unneeded)
1012                 }
1013
1014                 for len(s.replHistogram) <= bs.needed+bs.unneeded {
1015                         s.replHistogram = append(s.replHistogram, 0)
1016                 }
1017                 s.replHistogram[bs.needed+bs.unneeded]++
1018         }
1019         for _, srv := range bal.KeepServices {
1020                 s.pulls += len(srv.ChangeSet.Pulls)
1021                 s.trashes += len(srv.ChangeSet.Trashes)
1022         }
1023         bal.stats = s
1024         bal.Metrics.UpdateStats(s)
1025 }
1026
1027 // PrintStatistics writes statistics about the computed changes to
1028 // bal.Logger. It should not be called until ComputeChangeSets has
1029 // finished.
1030 func (bal *Balancer) PrintStatistics() {
1031         bal.logf("===")
1032         bal.logf("%s lost (0=have<want)", bal.stats.lost)
1033         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
1034         bal.logf("%s just right (have=want)", bal.stats.justright)
1035         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
1036         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
1037         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
1038         for _, class := range bal.classes {
1039                 cs := bal.stats.classStats[class]
1040                 bal.logf("===")
1041                 bal.logf("storage class %q: %s needed", class, cs.needed)
1042                 bal.logf("storage class %q: %s unneeded", class, cs.unneeded)
1043                 bal.logf("storage class %q: %s pulling", class, cs.pulling)
1044                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
1045         }
1046         bal.logf("===")
1047         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
1048         bal.logf("%s total usage", bal.stats.current)
1049         bal.logf("===")
1050         for _, srv := range bal.KeepServices {
1051                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1052         }
1053         bal.logf("===")
1054         bal.printHistogram(60)
1055         bal.logf("===")
1056 }
1057
1058 func (bal *Balancer) printHistogram(hashColumns int) {
1059         bal.logf("Replication level distribution:")
1060         maxCount := 0
1061         for _, count := range bal.stats.replHistogram {
1062                 if maxCount < count {
1063                         maxCount = count
1064                 }
1065         }
1066         hashes := strings.Repeat("#", hashColumns)
1067         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1068         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1069         for repl, count := range bal.stats.replHistogram {
1070                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1071                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1072         }
1073 }
1074
1075 // CheckSanityLate checks for configuration and runtime errors after
1076 // GetCurrentState() and ComputeChangeSets() have finished.
1077 //
1078 // If it returns an error, it is dangerous to run any Commit methods.
1079 func (bal *Balancer) CheckSanityLate() error {
1080         if bal.errors != nil {
1081                 for _, err := range bal.errors {
1082                         bal.logf("deferred error: %v", err)
1083                 }
1084                 return fmt.Errorf("cannot proceed safely after deferred errors")
1085         }
1086
1087         if bal.collScanned == 0 {
1088                 return fmt.Errorf("received zero collections")
1089         }
1090
1091         anyDesired := false
1092         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1093                 for _, desired := range blk.Desired {
1094                         if desired > 0 {
1095                                 anyDesired = true
1096                                 break
1097                         }
1098                 }
1099         })
1100         if !anyDesired {
1101                 return fmt.Errorf("zero blocks have desired replication>0")
1102         }
1103
1104         if dr := bal.DefaultReplication; dr < 1 {
1105                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1106         }
1107
1108         // TODO: no two services have identical indexes
1109         // TODO: no collisions (same md5, different size)
1110         return nil
1111 }
1112
1113 // CommitPulls sends the computed lists of pull requests to the
1114 // keepstore servers. This has the effect of increasing replication of
1115 // existing blocks that are either underreplicated or poorly
1116 // distributed according to rendezvous hashing.
1117 func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
1118         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1119         return bal.commitAsync(c, "send pull list",
1120                 func(srv *KeepService) error {
1121                         return srv.CommitPulls(ctx, c)
1122                 })
1123 }
1124
1125 // CommitTrash sends the computed lists of trash requests to the
1126 // keepstore servers. This has the effect of deleting blocks that are
1127 // overreplicated or unreferenced.
1128 func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
1129         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1130         return bal.commitAsync(c, "send trash list",
1131                 func(srv *KeepService) error {
1132                         return srv.CommitTrash(ctx, c)
1133                 })
1134 }
1135
1136 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1137         errs := make(chan error)
1138         for _, srv := range bal.KeepServices {
1139                 go func(srv *KeepService) {
1140                         var err error
1141                         defer func() { errs <- err }()
1142                         label := fmt.Sprintf("%s: %v", srv, label)
1143                         err = f(srv)
1144                         if err != nil {
1145                                 err = fmt.Errorf("%s: %v", label, err)
1146                         }
1147                 }(srv)
1148         }
1149         var lastErr error
1150         for range bal.KeepServices {
1151                 if err := <-errs; err != nil {
1152                         bal.logf("%v", err)
1153                         lastErr = err
1154                 }
1155         }
1156         close(errs)
1157         return lastErr
1158 }
1159
1160 func (bal *Balancer) logf(f string, args ...interface{}) {
1161         if bal.Logger != nil {
1162                 bal.Logger.Printf(f, args...)
1163         }
1164 }
1165
1166 func (bal *Balancer) time(name, help string) func() {
1167         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1168         t0 := time.Now()
1169         bal.Logger.Printf("%s: start", name)
1170         return func() {
1171                 dur := time.Since(t0)
1172                 observer.Observe(dur.Seconds())
1173                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1174         }
1175 }
1176
1177 // Rendezvous hash sort function. Less efficient than sorting on
1178 // precomputed rendezvous hashes, but also rarely used.
1179 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1180         a := md5.Sum([]byte(string(blkid[:32]) + i))
1181         b := md5.Sum([]byte(string(blkid[:32]) + j))
1182         return bytes.Compare(a[:], b[:]) < 0
1183 }