19923: keep-balance option to process a subset of blocks.
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepbalance
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "log"
16         "math"
17         "os"
18         "runtime"
19         "sort"
20         "strings"
21         "sync"
22         "sync/atomic"
23         "syscall"
24         "time"
25
26         "git.arvados.org/arvados.git/lib/controller/dblock"
27         "git.arvados.org/arvados.git/sdk/go/arvados"
28         "git.arvados.org/arvados.git/sdk/go/ctxlog"
29         "git.arvados.org/arvados.git/sdk/go/keepclient"
30         "github.com/jmoiron/sqlx"
31         "github.com/sirupsen/logrus"
32 )
33
34 // Balancer compares the contents of keepstore servers with the
35 // collections stored in Arvados, and issues pull/trash requests
36 // needed to get (closer to) the optimal data layout.
37 //
38 // In the optimal data layout: every data block referenced by a
39 // collection is replicated at least as many times as desired by the
40 // collection; there are no unreferenced data blocks older than
41 // BlobSignatureTTL; and all N existing replicas of a given data block
42 // are in the N best positions in rendezvous probe order.
43 type Balancer struct {
44         DB      *sqlx.DB
45         Logger  logrus.FieldLogger
46         Dumper  logrus.FieldLogger
47         Metrics *metrics
48
49         ChunkPrefix    string
50         LostBlocksFile string
51
52         *BlockStateMap
53         KeepServices       map[string]*KeepService
54         DefaultReplication int
55         MinMtime           int64
56
57         classes       []string
58         mounts        int
59         mountsByClass map[string]map[*KeepMount]bool
60         collScanned   int64
61         serviceRoots  map[string]string
62         errors        []error
63         stats         balancerStats
64         mutex         sync.Mutex
65         lostBlocks    io.Writer
66 }
67
68 // Run performs a balance operation using the given config and
69 // runOptions, and returns RunOptions suitable for passing to a
70 // subsequent balance operation.
71 //
72 // Run should only be called once on a given Balancer object.
73 func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
74         nextRunOptions = runOptions
75
76         ctxlog.FromContext(ctx).Info("acquiring active lock")
77         if !dblock.KeepBalanceActive.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return bal.DB, nil }) {
78                 // context canceled
79                 return
80         }
81         defer dblock.KeepBalanceActive.Unlock()
82
83         defer bal.time("sweep", "wall clock time to run one full sweep")()
84
85         ctx, cancel := context.WithDeadline(ctx, time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
86         defer cancel()
87
88         var lbFile *os.File
89         if bal.LostBlocksFile != "" {
90                 tmpfn := bal.LostBlocksFile + ".tmp"
91                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
92                 if err != nil {
93                         return
94                 }
95                 defer lbFile.Close()
96                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
97                 if err != nil {
98                         return
99                 }
100                 defer func() {
101                         // Remove the tempfile only if we didn't get
102                         // as far as successfully renaming it.
103                         if lbFile != nil {
104                                 os.Remove(tmpfn)
105                         }
106                 }()
107                 bal.lostBlocks = lbFile
108         } else {
109                 bal.lostBlocks = ioutil.Discard
110         }
111
112         err = bal.DiscoverKeepServices(client)
113         if err != nil {
114                 return
115         }
116
117         for _, srv := range bal.KeepServices {
118                 err = srv.discoverMounts(client)
119                 if err != nil {
120                         return
121                 }
122         }
123         bal.cleanupMounts()
124
125         if err = bal.CheckSanityEarly(client); err != nil {
126                 return
127         }
128
129         // On a big site, indexing and sending trash/pull lists can
130         // take much longer than the usual 5 minute client
131         // timeout. From here on, we rely on the context deadline
132         // instead, aborting the entire operation if any part takes
133         // too long.
134         client.Timeout = 0
135
136         rs := bal.rendezvousState()
137         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
138                 if runOptions.SafeRendezvousState != "" {
139                         bal.logf("notice: KeepServices list has changed since last run")
140                 }
141                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
142                 if err = bal.ClearTrashLists(ctx, client); err != nil {
143                         return
144                 }
145                 // The current rendezvous state becomes "safe" (i.e.,
146                 // OK to compute changes for that state without
147                 // clearing existing trash lists) only now, after we
148                 // succeed in clearing existing trash lists.
149                 nextRunOptions.SafeRendezvousState = rs
150         }
151
152         if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
153                 return
154         }
155         bal.ComputeChangeSets()
156         bal.PrintStatistics()
157         if err = bal.CheckSanityLate(); err != nil {
158                 return
159         }
160         if lbFile != nil {
161                 err = lbFile.Sync()
162                 if err != nil {
163                         return
164                 }
165                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
166                 if err != nil {
167                         return
168                 }
169                 lbFile = nil
170         }
171         if runOptions.CommitPulls {
172                 err = bal.CommitPulls(ctx, client)
173                 if err != nil {
174                         // Skip trash if we can't pull. (Too cautious?)
175                         return
176                 }
177         }
178         if runOptions.CommitTrash {
179                 err = bal.CommitTrash(ctx, client)
180                 if err != nil {
181                         return
182                 }
183         }
184         if runOptions.CommitConfirmedFields {
185                 err = bal.updateCollections(ctx, client, cluster)
186                 if err != nil {
187                         return
188                 }
189         }
190         return
191 }
192
193 // SetKeepServices sets the list of KeepServices to operate on.
194 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
195         bal.KeepServices = make(map[string]*KeepService)
196         for _, srv := range srvList.Items {
197                 bal.KeepServices[srv.UUID] = &KeepService{
198                         KeepService: srv,
199                         ChangeSet:   &ChangeSet{},
200                 }
201         }
202         return nil
203 }
204
205 // DiscoverKeepServices sets the list of KeepServices by calling the
206 // API to get a list of all services, and selecting the ones whose
207 // ServiceType is "disk"
208 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
209         bal.KeepServices = make(map[string]*KeepService)
210         return c.EachKeepService(func(srv arvados.KeepService) error {
211                 if srv.ServiceType == "disk" {
212                         bal.KeepServices[srv.UUID] = &KeepService{
213                                 KeepService: srv,
214                                 ChangeSet:   &ChangeSet{},
215                         }
216                 } else {
217                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
218                 }
219                 return nil
220         })
221 }
222
223 func (bal *Balancer) cleanupMounts() {
224         rwdev := map[string]*KeepService{}
225         for _, srv := range bal.KeepServices {
226                 for _, mnt := range srv.mounts {
227                         if !mnt.ReadOnly {
228                                 rwdev[mnt.UUID] = srv
229                         }
230                 }
231         }
232         // Drop the readonly mounts whose device is mounted RW
233         // elsewhere.
234         for _, srv := range bal.KeepServices {
235                 var dedup []*KeepMount
236                 for _, mnt := range srv.mounts {
237                         if mnt.ReadOnly && rwdev[mnt.UUID] != nil {
238                                 bal.logf("skipping srv %s readonly mount %q because same volume is mounted read-write on srv %s", srv, mnt.UUID, rwdev[mnt.UUID])
239                         } else {
240                                 dedup = append(dedup, mnt)
241                         }
242                 }
243                 srv.mounts = dedup
244         }
245         for _, srv := range bal.KeepServices {
246                 for _, mnt := range srv.mounts {
247                         if mnt.Replication <= 0 {
248                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
249                                 mnt.Replication = 1
250                         }
251                 }
252         }
253 }
254
255 // CheckSanityEarly checks for configuration and runtime errors that
256 // can be detected before GetCurrentState() and ComputeChangeSets()
257 // are called.
258 //
259 // If it returns an error, it is pointless to run GetCurrentState or
260 // ComputeChangeSets: after doing so, the statistics would be
261 // meaningless and it would be dangerous to run any Commit methods.
262 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
263         u, err := c.CurrentUser()
264         if err != nil {
265                 return fmt.Errorf("CurrentUser(): %v", err)
266         }
267         if !u.IsActive || !u.IsAdmin {
268                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
269         }
270         for _, srv := range bal.KeepServices {
271                 if srv.ServiceType == "proxy" {
272                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
273                 }
274         }
275
276         mountProblem := false
277         type deviceMount struct {
278                 srv *KeepService
279                 mnt *KeepMount
280         }
281         deviceMounted := map[string]deviceMount{} // DeviceID -> mount
282         for _, srv := range bal.KeepServices {
283                 for _, mnt := range srv.mounts {
284                         if first, dup := deviceMounted[mnt.DeviceID]; dup && first.mnt.UUID != mnt.UUID && mnt.DeviceID != "" {
285                                 bal.logf("config error: device %s is mounted with multiple volume UUIDs: %s on %s, and %s on %s",
286                                         mnt.DeviceID,
287                                         first.mnt.UUID, first.srv,
288                                         mnt.UUID, srv)
289                                 mountProblem = true
290                                 continue
291                         }
292                         deviceMounted[mnt.DeviceID] = deviceMount{srv, mnt}
293                 }
294         }
295         if mountProblem {
296                 return errors.New("cannot continue with config errors (see above)")
297         }
298
299         var checkPage arvados.CollectionList
300         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
301                 Limit:              new(int),
302                 Count:              "exact",
303                 IncludeTrash:       true,
304                 IncludeOldVersions: true,
305                 Filters: []arvados.Filter{{
306                         Attr:     "modified_at",
307                         Operator: "=",
308                         Operand:  nil,
309                 }},
310         }); err != nil {
311                 return err
312         } else if n := checkPage.ItemsAvailable; n > 0 {
313                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
314         }
315
316         return nil
317 }
318
319 // rendezvousState returns a fingerprint (e.g., a sorted list of
320 // UUID+host+port) of the current set of keep services.
321 func (bal *Balancer) rendezvousState() string {
322         srvs := make([]string, 0, len(bal.KeepServices))
323         for _, srv := range bal.KeepServices {
324                 srvs = append(srvs, srv.String())
325         }
326         sort.Strings(srvs)
327         return strings.Join(srvs, "; ")
328 }
329
330 // ClearTrashLists sends an empty trash list to each keep
331 // service. Calling this before GetCurrentState avoids races.
332 //
333 // When a block appears in an index, we assume that replica will still
334 // exist after we delete other replicas on other servers. However,
335 // it's possible that a previous rebalancing operation made different
336 // decisions (e.g., servers were added/removed, and rendezvous order
337 // changed). In this case, the replica might already be on that
338 // server's trash list, and it might be deleted before we send a
339 // replacement trash list.
340 //
341 // We avoid this problem if we clear all trash lists before getting
342 // indexes. (We also assume there is only one rebalancing process
343 // running at a time.)
344 func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
345         for _, srv := range bal.KeepServices {
346                 srv.ChangeSet = &ChangeSet{}
347         }
348         return bal.CommitTrash(ctx, c)
349 }
350
351 // GetCurrentState determines the current replication state, and the
352 // desired replication level, for every block that is either
353 // retrievable or referenced.
354 //
355 // It determines the current replication state by reading the block index
356 // from every known Keep service.
357 //
358 // It determines the desired replication level by retrieving all
359 // collection manifests in the database (API server).
360 //
361 // It encodes the resulting information in BlockStateMap.
362 func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
363         ctx, cancel := context.WithCancel(ctx)
364         defer cancel()
365
366         defer bal.time("get_state", "wall clock time to get current state")()
367         bal.BlockStateMap = NewBlockStateMap()
368
369         dd, err := c.DiscoveryDocument()
370         if err != nil {
371                 return err
372         }
373         bal.DefaultReplication = dd.DefaultCollectionReplication
374         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
375
376         errs := make(chan error, 1)
377         wg := sync.WaitGroup{}
378
379         // When a device is mounted more than once, we will get its
380         // index only once, and call AddReplicas on all of the mounts.
381         // equivMount keys are the mounts that will be indexed, and
382         // each value is a list of mounts to apply the received index
383         // to.
384         equivMount := map[*KeepMount][]*KeepMount{}
385         // deviceMount maps each device ID to the one mount that will
386         // be indexed for that device.
387         deviceMount := map[string]*KeepMount{}
388         for _, srv := range bal.KeepServices {
389                 for _, mnt := range srv.mounts {
390                         equiv := deviceMount[mnt.UUID]
391                         if equiv == nil {
392                                 equiv = mnt
393                                 deviceMount[mnt.UUID] = equiv
394                         }
395                         equivMount[equiv] = append(equivMount[equiv], mnt)
396                 }
397         }
398
399         // Start one goroutine for each (non-redundant) mount:
400         // retrieve the index, and add the returned blocks to
401         // BlockStateMap.
402         for _, mounts := range equivMount {
403                 wg.Add(1)
404                 go func(mounts []*KeepMount) {
405                         defer wg.Done()
406                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
407                         idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, bal.ChunkPrefix)
408                         if err != nil {
409                                 select {
410                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
411                                 default:
412                                 }
413                                 cancel()
414                                 return
415                         }
416                         if len(errs) > 0 {
417                                 // Some other goroutine encountered an
418                                 // error -- any further effort here
419                                 // will be wasted.
420                                 return
421                         }
422                         for _, mount := range mounts {
423                                 bal.logf("%s: add %d entries to map", mount, len(idx))
424                                 bal.BlockStateMap.AddReplicas(mount, idx)
425                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
426                         }
427                         bal.logf("mount %s: index done", mounts[0])
428                 }(mounts)
429         }
430
431         collQ := make(chan arvados.Collection, bufs)
432
433         // Retrieve all collections from the database and send them to
434         // collQ.
435         wg.Add(1)
436         go func() {
437                 defer wg.Done()
438                 err = EachCollection(ctx, bal.DB, c,
439                         func(coll arvados.Collection) error {
440                                 collQ <- coll
441                                 if len(errs) > 0 {
442                                         // some other GetCurrentState
443                                         // error happened: no point
444                                         // getting any more
445                                         // collections.
446                                         return fmt.Errorf("")
447                                 }
448                                 return nil
449                         }, func(done, total int) {
450                                 bal.logf("collections: %d/%d", done, total)
451                         })
452                 close(collQ)
453                 if err != nil {
454                         select {
455                         case errs <- err:
456                         default:
457                         }
458                         cancel()
459                 }
460         }()
461
462         // Parse manifests from collQ and pass the block hashes to
463         // BlockStateMap to track desired replication.
464         for i := 0; i < runtime.NumCPU(); i++ {
465                 wg.Add(1)
466                 go func() {
467                         defer wg.Done()
468                         for coll := range collQ {
469                                 err := bal.addCollection(coll)
470                                 if err != nil || len(errs) > 0 {
471                                         select {
472                                         case errs <- err:
473                                         default:
474                                         }
475                                         cancel()
476                                         continue
477                                 }
478                                 atomic.AddInt64(&bal.collScanned, 1)
479                         }
480                 }()
481         }
482
483         wg.Wait()
484         if len(errs) > 0 {
485                 return <-errs
486         }
487         return nil
488 }
489
490 func (bal *Balancer) addCollection(coll arvados.Collection) error {
491         blkids, err := coll.SizedDigests()
492         if err != nil {
493                 return fmt.Errorf("%v: %v", coll.UUID, err)
494         }
495         repl := bal.DefaultReplication
496         if coll.ReplicationDesired != nil {
497                 repl = *coll.ReplicationDesired
498         }
499         if bal.ChunkPrefix != "" {
500                 // Throw out blocks that don't match the requested
501                 // prefix.  (We save a bit of GC work here by
502                 // preallocating based on each hex digit in
503                 // ChunkPrefix reducing the expected size of the
504                 // filtered set by ~16x.)
505                 filtered := make([]arvados.SizedDigest, 0, len(blkids)>>(4*len(bal.ChunkPrefix)-1))
506                 for _, blkid := range blkids {
507                         if strings.HasPrefix(string(blkid), bal.ChunkPrefix) {
508                                 filtered = append(filtered, blkid)
509                         }
510                 }
511                 blkids = filtered
512         }
513         bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
514         // Pass pdh to IncreaseDesired only if LostBlocksFile is being
515         // written -- otherwise it's just a waste of memory.
516         pdh := ""
517         if bal.LostBlocksFile != "" {
518                 pdh = coll.PortableDataHash
519         }
520         bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
521         return nil
522 }
523
524 // ComputeChangeSets compares, for each known block, the current and
525 // desired replication states. If it is possible to get closer to the
526 // desired state by copying or deleting blocks, it adds those changes
527 // to the relevant KeepServices' ChangeSets.
528 //
529 // It does not actually apply any of the computed changes.
530 func (bal *Balancer) ComputeChangeSets() {
531         // This just calls balanceBlock() once for each block, using a
532         // pool of worker goroutines.
533         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
534         bal.setupLookupTables()
535
536         type balanceTask struct {
537                 blkid arvados.SizedDigest
538                 blk   *BlockState
539         }
540         workers := runtime.GOMAXPROCS(-1)
541         todo := make(chan balanceTask, workers)
542         go func() {
543                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
544                         todo <- balanceTask{
545                                 blkid: blkid,
546                                 blk:   blk,
547                         }
548                 })
549                 close(todo)
550         }()
551         results := make(chan balanceResult, workers)
552         go func() {
553                 var wg sync.WaitGroup
554                 for i := 0; i < workers; i++ {
555                         wg.Add(1)
556                         go func() {
557                                 for work := range todo {
558                                         results <- bal.balanceBlock(work.blkid, work.blk)
559                                 }
560                                 wg.Done()
561                         }()
562                 }
563                 wg.Wait()
564                 close(results)
565         }()
566         bal.collectStatistics(results)
567 }
568
569 func (bal *Balancer) setupLookupTables() {
570         bal.serviceRoots = make(map[string]string)
571         bal.classes = defaultClasses
572         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
573         bal.mounts = 0
574         for _, srv := range bal.KeepServices {
575                 bal.serviceRoots[srv.UUID] = srv.UUID
576                 for _, mnt := range srv.mounts {
577                         bal.mounts++
578
579                         // All mounts on a read-only service are
580                         // effectively read-only.
581                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
582
583                         for class := range mnt.StorageClasses {
584                                 if mbc := bal.mountsByClass[class]; mbc == nil {
585                                         bal.classes = append(bal.classes, class)
586                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
587                                 } else {
588                                         mbc[mnt] = true
589                                 }
590                         }
591                 }
592         }
593         // Consider classes in lexicographic order to avoid flapping
594         // between balancing runs.  The outcome of the "prefer a mount
595         // we're already planning to use for a different storage
596         // class" case in balanceBlock depends on the order classes
597         // are considered.
598         sort.Strings(bal.classes)
599 }
600
601 const (
602         changeStay = iota
603         changePull
604         changeTrash
605         changeNone
606 )
607
608 var changeName = map[int]string{
609         changeStay:  "stay",
610         changePull:  "pull",
611         changeTrash: "trash",
612         changeNone:  "none",
613 }
614
615 type balancedBlockState struct {
616         needed       int
617         unneeded     int
618         pulling      int
619         unachievable bool
620 }
621
622 type balanceResult struct {
623         blk        *BlockState
624         blkid      arvados.SizedDigest
625         lost       bool
626         blockState balancedBlockState
627         classState map[string]balancedBlockState
628 }
629
630 type slot struct {
631         mnt  *KeepMount // never nil
632         repl *Replica   // replica already stored here (or nil)
633         want bool       // we should pull/leave a replica here
634 }
635
636 // balanceBlock compares current state to desired state for a single
637 // block, and makes the appropriate ChangeSet calls.
638 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
639         bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
640
641         // Build a list of all slots (one per mounted volume).
642         slots := make([]slot, 0, bal.mounts)
643         for _, srv := range bal.KeepServices {
644                 for _, mnt := range srv.mounts {
645                         var repl *Replica
646                         for r := range blk.Replicas {
647                                 if blk.Replicas[r].KeepMount == mnt {
648                                         repl = &blk.Replicas[r]
649                                 }
650                         }
651                         // Initial value of "want" is "have, and can't
652                         // delete". These untrashable replicas get
653                         // prioritized when sorting slots: otherwise,
654                         // non-optimal readonly copies would cause us
655                         // to overreplicate.
656                         slots = append(slots, slot{
657                                 mnt:  mnt,
658                                 repl: repl,
659                                 want: repl != nil && mnt.ReadOnly,
660                         })
661                 }
662         }
663
664         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
665         srvRendezvous := make(map[*KeepService]int, len(uuids))
666         for i, uuid := range uuids {
667                 srv := bal.KeepServices[uuid]
668                 srvRendezvous[srv] = i
669         }
670
671         // Below we set underreplicated=true if we find any storage
672         // class that's currently underreplicated -- in that case we
673         // won't want to trash any replicas.
674         underreplicated := false
675
676         unsafeToDelete := make(map[int64]bool, len(slots))
677         for _, class := range bal.classes {
678                 desired := blk.Desired[class]
679                 if desired == 0 {
680                         continue
681                 }
682
683                 // Sort the slots by desirability.
684                 sort.Slice(slots, func(i, j int) bool {
685                         si, sj := slots[i], slots[j]
686                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
687                                 // Prefer a mount that satisfies the
688                                 // desired class.
689                                 return bal.mountsByClass[class][si.mnt]
690                         } else if si.want != sj.want {
691                                 // Prefer a mount that will have a
692                                 // replica no matter what we do here
693                                 // -- either because it already has an
694                                 // untrashable replica, or because we
695                                 // already need it to satisfy a
696                                 // different storage class.
697                                 return si.want
698                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
699                                 // Prefer a better rendezvous
700                                 // position.
701                                 return orderi < orderj
702                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
703                                 // Prefer a mount that already has a
704                                 // replica.
705                                 return repli
706                         } else {
707                                 // If pull/trash turns out to be
708                                 // needed, distribute the
709                                 // new/remaining replicas uniformly
710                                 // across qualifying mounts on a given
711                                 // server.
712                                 return rendezvousLess(si.mnt.UUID, sj.mnt.UUID, blkid)
713                         }
714                 })
715
716                 // Servers/mounts/devices (with or without existing
717                 // replicas) that are part of the best achievable
718                 // layout for this storage class.
719                 wantSrv := map[*KeepService]bool{}
720                 wantMnt := map[*KeepMount]bool{}
721                 wantDev := map[string]bool{}
722                 // Positions (with existing replicas) that have been
723                 // protected (via unsafeToDelete) to ensure we don't
724                 // reduce replication below desired level when
725                 // trashing replicas that aren't optimal positions for
726                 // any storage class.
727                 protMnt := map[*KeepMount]bool{}
728                 // Replication planned so far (corresponds to wantMnt).
729                 replWant := 0
730                 // Protected replication (corresponds to protMnt).
731                 replProt := 0
732
733                 // trySlot tries using a slot to meet requirements,
734                 // and returns true if all requirements are met.
735                 trySlot := func(i int) bool {
736                         slot := slots[i]
737                         if wantMnt[slot.mnt] || wantDev[slot.mnt.UUID] {
738                                 // Already allocated a replica to this
739                                 // backend device, possibly on a
740                                 // different server.
741                                 return false
742                         }
743                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
744                                 unsafeToDelete[slot.repl.Mtime] = true
745                                 protMnt[slot.mnt] = true
746                                 replProt += slot.mnt.Replication
747                         }
748                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
749                                 slots[i].want = true
750                                 wantSrv[slot.mnt.KeepService] = true
751                                 wantMnt[slot.mnt] = true
752                                 wantDev[slot.mnt.UUID] = true
753                                 replWant += slot.mnt.Replication
754                         }
755                         return replProt >= desired && replWant >= desired
756                 }
757
758                 // First try to achieve desired replication without
759                 // using the same server twice.
760                 done := false
761                 for i := 0; i < len(slots) && !done; i++ {
762                         if !wantSrv[slots[i].mnt.KeepService] {
763                                 done = trySlot(i)
764                         }
765                 }
766
767                 // If that didn't suffice, do another pass without the
768                 // "distinct services" restriction. (Achieving the
769                 // desired volume replication on fewer than the
770                 // desired number of services is better than
771                 // underreplicating.)
772                 for i := 0; i < len(slots) && !done; i++ {
773                         done = trySlot(i)
774                 }
775
776                 if !underreplicated {
777                         safe := 0
778                         for _, slot := range slots {
779                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
780                                         continue
781                                 }
782                                 if safe += slot.mnt.Replication; safe >= desired {
783                                         break
784                                 }
785                         }
786                         underreplicated = safe < desired
787                 }
788
789                 // Avoid deleting wanted replicas from devices that
790                 // are mounted on multiple servers -- even if they
791                 // haven't already been added to unsafeToDelete
792                 // because the servers report different Mtimes.
793                 for _, slot := range slots {
794                         if slot.repl != nil && wantDev[slot.mnt.UUID] {
795                                 unsafeToDelete[slot.repl.Mtime] = true
796                         }
797                 }
798         }
799
800         // TODO: If multiple replicas are trashable, prefer the oldest
801         // replica that doesn't have a timestamp collision with
802         // others.
803
804         for i, slot := range slots {
805                 // Don't trash (1) any replicas of an underreplicated
806                 // block, even if they're in the wrong positions, or
807                 // (2) any replicas whose Mtimes are identical to
808                 // needed replicas (in case we're really seeing the
809                 // same copy via different mounts).
810                 if slot.repl != nil && (underreplicated || unsafeToDelete[slot.repl.Mtime]) {
811                         slots[i].want = true
812                 }
813         }
814
815         classState := make(map[string]balancedBlockState, len(bal.classes))
816         for _, class := range bal.classes {
817                 classState[class] = computeBlockState(slots, bal.mountsByClass[class], len(blk.Replicas), blk.Desired[class])
818         }
819         blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
820
821         var lost bool
822         var changes []string
823         for _, slot := range slots {
824                 // TODO: request a Touch if Mtime is duplicated.
825                 var change int
826                 switch {
827                 case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
828                         slot.mnt.KeepService.AddTrash(Trash{
829                                 SizedDigest: blkid,
830                                 Mtime:       slot.repl.Mtime,
831                                 From:        slot.mnt,
832                         })
833                         change = changeTrash
834                 case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
835                         lost = true
836                         change = changeNone
837                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
838                         slot.mnt.KeepService.AddPull(Pull{
839                                 SizedDigest: blkid,
840                                 From:        blk.Replicas[0].KeepMount.KeepService,
841                                 To:          slot.mnt,
842                         })
843                         change = changePull
844                 case slot.repl != nil:
845                         change = changeStay
846                 default:
847                         change = changeNone
848                 }
849                 if bal.Dumper != nil {
850                         var mtime int64
851                         if slot.repl != nil {
852                                 mtime = slot.repl.Mtime
853                         }
854                         srv := slot.mnt.KeepService
855                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
856                 }
857         }
858         if bal.Dumper != nil {
859                 bal.Dumper.Printf("%s refs=%d needed=%d unneeded=%d pulling=%v %v %v", blkid, blk.RefCount, blockState.needed, blockState.unneeded, blockState.pulling, blk.Desired, changes)
860         }
861         return balanceResult{
862                 blk:        blk,
863                 blkid:      blkid,
864                 lost:       lost,
865                 blockState: blockState,
866                 classState: classState,
867         }
868 }
869
870 func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRepl int) (bbs balancedBlockState) {
871         repl := 0
872         countedDev := map[string]bool{}
873         for _, slot := range slots {
874                 if onlyCount != nil && !onlyCount[slot.mnt] {
875                         continue
876                 }
877                 if countedDev[slot.mnt.UUID] {
878                         continue
879                 }
880                 switch {
881                 case slot.repl != nil && slot.want:
882                         bbs.needed++
883                         repl += slot.mnt.Replication
884                 case slot.repl != nil && !slot.want:
885                         bbs.unneeded++
886                         repl += slot.mnt.Replication
887                 case slot.repl == nil && slot.want && have > 0:
888                         bbs.pulling++
889                         repl += slot.mnt.Replication
890                 }
891                 countedDev[slot.mnt.UUID] = true
892         }
893         if repl < needRepl {
894                 bbs.unachievable = true
895         }
896         return
897 }
898
899 type blocksNBytes struct {
900         replicas int
901         blocks   int
902         bytes    int64
903 }
904
905 func (bb blocksNBytes) String() string {
906         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
907 }
908
909 type replicationStats struct {
910         needed       blocksNBytes
911         unneeded     blocksNBytes
912         pulling      blocksNBytes
913         unachievable blocksNBytes
914 }
915
916 type balancerStats struct {
917         lost          blocksNBytes
918         overrep       blocksNBytes
919         unref         blocksNBytes
920         garbage       blocksNBytes
921         underrep      blocksNBytes
922         unachievable  blocksNBytes
923         justright     blocksNBytes
924         desired       blocksNBytes
925         current       blocksNBytes
926         pulls         int
927         trashes       int
928         replHistogram []int
929         classStats    map[string]replicationStats
930
931         // collectionBytes / collectionBlockBytes = deduplication ratio
932         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
933         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
934         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
935         collectionBlocks     int64 // number of blocks referenced by any collection
936 }
937
938 func (s *balancerStats) dedupByteRatio() float64 {
939         if s.collectionBlockBytes == 0 {
940                 return 0
941         }
942         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
943 }
944
945 func (s *balancerStats) dedupBlockRatio() float64 {
946         if s.collectionBlocks == 0 {
947                 return 0
948         }
949         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
950 }
951
952 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
953         var s balancerStats
954         s.replHistogram = make([]int, 2)
955         s.classStats = make(map[string]replicationStats, len(bal.classes))
956         for result := range results {
957                 bytes := result.blkid.Size()
958
959                 if rc := int64(result.blk.RefCount); rc > 0 {
960                         s.collectionBytes += rc * bytes
961                         s.collectionBlockBytes += bytes
962                         s.collectionBlockRefs += rc
963                         s.collectionBlocks++
964                 }
965
966                 for class, state := range result.classState {
967                         cs := s.classStats[class]
968                         if state.unachievable {
969                                 cs.unachievable.replicas++
970                                 cs.unachievable.blocks++
971                                 cs.unachievable.bytes += bytes
972                         }
973                         if state.needed > 0 {
974                                 cs.needed.replicas += state.needed
975                                 cs.needed.blocks++
976                                 cs.needed.bytes += bytes * int64(state.needed)
977                         }
978                         if state.unneeded > 0 {
979                                 cs.unneeded.replicas += state.unneeded
980                                 cs.unneeded.blocks++
981                                 cs.unneeded.bytes += bytes * int64(state.unneeded)
982                         }
983                         if state.pulling > 0 {
984                                 cs.pulling.replicas += state.pulling
985                                 cs.pulling.blocks++
986                                 cs.pulling.bytes += bytes * int64(state.pulling)
987                         }
988                         s.classStats[class] = cs
989                 }
990
991                 bs := result.blockState
992                 switch {
993                 case result.lost:
994                         s.lost.replicas++
995                         s.lost.blocks++
996                         s.lost.bytes += bytes
997                         fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
998                         for pdh := range result.blk.Refs {
999                                 fmt.Fprintf(bal.lostBlocks, " %s", pdh)
1000                         }
1001                         fmt.Fprint(bal.lostBlocks, "\n")
1002                 case bs.pulling > 0:
1003                         s.underrep.replicas += bs.pulling
1004                         s.underrep.blocks++
1005                         s.underrep.bytes += bytes * int64(bs.pulling)
1006                 case bs.unachievable:
1007                         s.underrep.replicas++
1008                         s.underrep.blocks++
1009                         s.underrep.bytes += bytes
1010                 case bs.unneeded > 0 && bs.needed == 0:
1011                         // Count as "garbage" if all replicas are old
1012                         // enough to trash, otherwise count as
1013                         // "unref".
1014                         counter := &s.garbage
1015                         for _, r := range result.blk.Replicas {
1016                                 if r.Mtime >= bal.MinMtime {
1017                                         counter = &s.unref
1018                                         break
1019                                 }
1020                         }
1021                         counter.replicas += bs.unneeded
1022                         counter.blocks++
1023                         counter.bytes += bytes * int64(bs.unneeded)
1024                 case bs.unneeded > 0:
1025                         s.overrep.replicas += bs.unneeded
1026                         s.overrep.blocks++
1027                         s.overrep.bytes += bytes * int64(bs.unneeded)
1028                 default:
1029                         s.justright.replicas += bs.needed
1030                         s.justright.blocks++
1031                         s.justright.bytes += bytes * int64(bs.needed)
1032                 }
1033
1034                 if bs.needed > 0 {
1035                         s.desired.replicas += bs.needed
1036                         s.desired.blocks++
1037                         s.desired.bytes += bytes * int64(bs.needed)
1038                 }
1039                 if bs.needed+bs.unneeded > 0 {
1040                         s.current.replicas += bs.needed + bs.unneeded
1041                         s.current.blocks++
1042                         s.current.bytes += bytes * int64(bs.needed+bs.unneeded)
1043                 }
1044
1045                 for len(s.replHistogram) <= bs.needed+bs.unneeded {
1046                         s.replHistogram = append(s.replHistogram, 0)
1047                 }
1048                 s.replHistogram[bs.needed+bs.unneeded]++
1049         }
1050         for _, srv := range bal.KeepServices {
1051                 s.pulls += len(srv.ChangeSet.Pulls)
1052                 s.trashes += len(srv.ChangeSet.Trashes)
1053         }
1054         bal.stats = s
1055         bal.Metrics.UpdateStats(s)
1056 }
1057
1058 // PrintStatistics writes statistics about the computed changes to
1059 // bal.Logger. It should not be called until ComputeChangeSets has
1060 // finished.
1061 func (bal *Balancer) PrintStatistics() {
1062         bal.logf("===")
1063         bal.logf("%s lost (0=have<want)", bal.stats.lost)
1064         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
1065         bal.logf("%s just right (have=want)", bal.stats.justright)
1066         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
1067         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
1068         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
1069         for _, class := range bal.classes {
1070                 cs := bal.stats.classStats[class]
1071                 bal.logf("===")
1072                 bal.logf("storage class %q: %s needed", class, cs.needed)
1073                 bal.logf("storage class %q: %s unneeded", class, cs.unneeded)
1074                 bal.logf("storage class %q: %s pulling", class, cs.pulling)
1075                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
1076         }
1077         bal.logf("===")
1078         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
1079         bal.logf("%s total usage", bal.stats.current)
1080         bal.logf("===")
1081         for _, srv := range bal.KeepServices {
1082                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1083         }
1084         bal.logf("===")
1085         bal.printHistogram(60)
1086         bal.logf("===")
1087 }
1088
1089 func (bal *Balancer) printHistogram(hashColumns int) {
1090         bal.logf("Replication level distribution:")
1091         maxCount := 0
1092         for _, count := range bal.stats.replHistogram {
1093                 if maxCount < count {
1094                         maxCount = count
1095                 }
1096         }
1097         hashes := strings.Repeat("#", hashColumns)
1098         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1099         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1100         for repl, count := range bal.stats.replHistogram {
1101                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1102                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1103         }
1104 }
1105
1106 // CheckSanityLate checks for configuration and runtime errors after
1107 // GetCurrentState() and ComputeChangeSets() have finished.
1108 //
1109 // If it returns an error, it is dangerous to run any Commit methods.
1110 func (bal *Balancer) CheckSanityLate() error {
1111         if bal.errors != nil {
1112                 for _, err := range bal.errors {
1113                         bal.logf("deferred error: %v", err)
1114                 }
1115                 return fmt.Errorf("cannot proceed safely after deferred errors")
1116         }
1117
1118         if bal.collScanned == 0 {
1119                 return fmt.Errorf("received zero collections")
1120         }
1121
1122         anyDesired := false
1123         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1124                 for _, desired := range blk.Desired {
1125                         if desired > 0 {
1126                                 anyDesired = true
1127                                 break
1128                         }
1129                 }
1130         })
1131         if !anyDesired {
1132                 return fmt.Errorf("zero blocks have desired replication>0")
1133         }
1134
1135         if dr := bal.DefaultReplication; dr < 1 {
1136                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1137         }
1138
1139         // TODO: no two services have identical indexes
1140         // TODO: no collisions (same md5, different size)
1141         return nil
1142 }
1143
1144 // CommitPulls sends the computed lists of pull requests to the
1145 // keepstore servers. This has the effect of increasing replication of
1146 // existing blocks that are either underreplicated or poorly
1147 // distributed according to rendezvous hashing.
1148 func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
1149         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1150         return bal.commitAsync(c, "send pull list",
1151                 func(srv *KeepService) error {
1152                         return srv.CommitPulls(ctx, c)
1153                 })
1154 }
1155
1156 // CommitTrash sends the computed lists of trash requests to the
1157 // keepstore servers. This has the effect of deleting blocks that are
1158 // overreplicated or unreferenced.
1159 func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
1160         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1161         return bal.commitAsync(c, "send trash list",
1162                 func(srv *KeepService) error {
1163                         return srv.CommitTrash(ctx, c)
1164                 })
1165 }
1166
1167 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1168         errs := make(chan error)
1169         for _, srv := range bal.KeepServices {
1170                 go func(srv *KeepService) {
1171                         var err error
1172                         defer func() { errs <- err }()
1173                         label := fmt.Sprintf("%s: %v", srv, label)
1174                         err = f(srv)
1175                         if err != nil {
1176                                 err = fmt.Errorf("%s: %v", label, err)
1177                         }
1178                 }(srv)
1179         }
1180         var lastErr error
1181         for range bal.KeepServices {
1182                 if err := <-errs; err != nil {
1183                         bal.logf("%v", err)
1184                         lastErr = err
1185                 }
1186         }
1187         close(errs)
1188         return lastErr
1189 }
1190
1191 func (bal *Balancer) logf(f string, args ...interface{}) {
1192         if bal.Logger != nil {
1193                 bal.Logger.Printf(f, args...)
1194         }
1195 }
1196
1197 func (bal *Balancer) time(name, help string) func() {
1198         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1199         t0 := time.Now()
1200         bal.Logger.Printf("%s: start", name)
1201         return func() {
1202                 dur := time.Since(t0)
1203                 observer.Observe(dur.Seconds())
1204                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1205         }
1206 }
1207
1208 // Rendezvous hash sort function. Less efficient than sorting on
1209 // precomputed rendezvous hashes, but also rarely used.
1210 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1211         a := md5.Sum([]byte(string(blkid[:32]) + i))
1212         b := md5.Sum([]byte(string(blkid[:32]) + j))
1213         return bytes.Compare(a[:], b[:]) < 0
1214 }