18547: Use volume UUID instead of DeviceID to deduplicate mounts.
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "log"
15         "math"
16         "os"
17         "runtime"
18         "sort"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "syscall"
23         "time"
24
25         "git.arvados.org/arvados.git/sdk/go/arvados"
26         "git.arvados.org/arvados.git/sdk/go/keepclient"
27         "github.com/jmoiron/sqlx"
28         "github.com/sirupsen/logrus"
29 )
30
31 // Balancer compares the contents of keepstore servers with the
32 // collections stored in Arvados, and issues pull/trash requests
33 // needed to get (closer to) the optimal data layout.
34 //
35 // In the optimal data layout: every data block referenced by a
36 // collection is replicated at least as many times as desired by the
37 // collection; there are no unreferenced data blocks older than
38 // BlobSignatureTTL; and all N existing replicas of a given data block
39 // are in the N best positions in rendezvous probe order.
40 type Balancer struct {
41         DB      *sqlx.DB
42         Logger  logrus.FieldLogger
43         Dumper  logrus.FieldLogger
44         Metrics *metrics
45
46         LostBlocksFile string
47
48         *BlockStateMap
49         KeepServices       map[string]*KeepService
50         DefaultReplication int
51         MinMtime           int64
52
53         classes       []string
54         mounts        int
55         mountsByClass map[string]map[*KeepMount]bool
56         collScanned   int64
57         serviceRoots  map[string]string
58         errors        []error
59         stats         balancerStats
60         mutex         sync.Mutex
61         lostBlocks    io.Writer
62 }
63
64 // Run performs a balance operation using the given config and
65 // runOptions, and returns RunOptions suitable for passing to a
66 // subsequent balance operation.
67 //
68 // Run should only be called once on a given Balancer object.
69 //
70 // Typical usage:
71 //
72 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
73 func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
74         nextRunOptions = runOptions
75
76         defer bal.time("sweep", "wall clock time to run one full sweep")()
77
78         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
79         defer cancel()
80
81         var lbFile *os.File
82         if bal.LostBlocksFile != "" {
83                 tmpfn := bal.LostBlocksFile + ".tmp"
84                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
85                 if err != nil {
86                         return
87                 }
88                 defer lbFile.Close()
89                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
90                 if err != nil {
91                         return
92                 }
93                 defer func() {
94                         // Remove the tempfile only if we didn't get
95                         // as far as successfully renaming it.
96                         if lbFile != nil {
97                                 os.Remove(tmpfn)
98                         }
99                 }()
100                 bal.lostBlocks = lbFile
101         } else {
102                 bal.lostBlocks = ioutil.Discard
103         }
104
105         err = bal.DiscoverKeepServices(client)
106         if err != nil {
107                 return
108         }
109
110         for _, srv := range bal.KeepServices {
111                 err = srv.discoverMounts(client)
112                 if err != nil {
113                         return
114                 }
115         }
116         bal.cleanupMounts()
117
118         if err = bal.CheckSanityEarly(client); err != nil {
119                 return
120         }
121
122         // On a big site, indexing and sending trash/pull lists can
123         // take much longer than the usual 5 minute client
124         // timeout. From here on, we rely on the context deadline
125         // instead, aborting the entire operation if any part takes
126         // too long.
127         client.Timeout = 0
128
129         rs := bal.rendezvousState()
130         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
131                 if runOptions.SafeRendezvousState != "" {
132                         bal.logf("notice: KeepServices list has changed since last run")
133                 }
134                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
135                 if err = bal.ClearTrashLists(ctx, client); err != nil {
136                         return
137                 }
138                 // The current rendezvous state becomes "safe" (i.e.,
139                 // OK to compute changes for that state without
140                 // clearing existing trash lists) only now, after we
141                 // succeed in clearing existing trash lists.
142                 nextRunOptions.SafeRendezvousState = rs
143         }
144
145         if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
146                 return
147         }
148         bal.ComputeChangeSets()
149         bal.PrintStatistics()
150         if err = bal.CheckSanityLate(); err != nil {
151                 return
152         }
153         if lbFile != nil {
154                 err = lbFile.Sync()
155                 if err != nil {
156                         return
157                 }
158                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
159                 if err != nil {
160                         return
161                 }
162                 lbFile = nil
163         }
164         if runOptions.CommitPulls {
165                 err = bal.CommitPulls(ctx, client)
166                 if err != nil {
167                         // Skip trash if we can't pull. (Too cautious?)
168                         return
169                 }
170         }
171         if runOptions.CommitTrash {
172                 err = bal.CommitTrash(ctx, client)
173                 if err != nil {
174                         return
175                 }
176         }
177         if runOptions.CommitConfirmedFields {
178                 err = bal.updateCollections(ctx, client, cluster)
179                 if err != nil {
180                         return
181                 }
182         }
183         return
184 }
185
186 // SetKeepServices sets the list of KeepServices to operate on.
187 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
188         bal.KeepServices = make(map[string]*KeepService)
189         for _, srv := range srvList.Items {
190                 bal.KeepServices[srv.UUID] = &KeepService{
191                         KeepService: srv,
192                         ChangeSet:   &ChangeSet{},
193                 }
194         }
195         return nil
196 }
197
198 // DiscoverKeepServices sets the list of KeepServices by calling the
199 // API to get a list of all services, and selecting the ones whose
200 // ServiceType is "disk"
201 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
202         bal.KeepServices = make(map[string]*KeepService)
203         return c.EachKeepService(func(srv arvados.KeepService) error {
204                 if srv.ServiceType == "disk" {
205                         bal.KeepServices[srv.UUID] = &KeepService{
206                                 KeepService: srv,
207                                 ChangeSet:   &ChangeSet{},
208                         }
209                 } else {
210                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
211                 }
212                 return nil
213         })
214 }
215
216 func (bal *Balancer) cleanupMounts() {
217         rwdev := map[string]*KeepService{}
218         for _, srv := range bal.KeepServices {
219                 for _, mnt := range srv.mounts {
220                         if !mnt.ReadOnly {
221                                 rwdev[mnt.UUID] = srv
222                         }
223                 }
224         }
225         // Drop the readonly mounts whose device is mounted RW
226         // elsewhere.
227         for _, srv := range bal.KeepServices {
228                 var dedup []*KeepMount
229                 for _, mnt := range srv.mounts {
230                         if mnt.ReadOnly && rwdev[mnt.UUID] != nil {
231                                 bal.logf("skipping srv %s readonly mount %q because same volume is mounted read-write on srv %s", srv, mnt.UUID, rwdev[mnt.UUID])
232                         } else {
233                                 dedup = append(dedup, mnt)
234                         }
235                 }
236                 srv.mounts = dedup
237         }
238         for _, srv := range bal.KeepServices {
239                 for _, mnt := range srv.mounts {
240                         if mnt.Replication <= 0 {
241                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
242                                 mnt.Replication = 1
243                         }
244                 }
245         }
246 }
247
248 // CheckSanityEarly checks for configuration and runtime errors that
249 // can be detected before GetCurrentState() and ComputeChangeSets()
250 // are called.
251 //
252 // If it returns an error, it is pointless to run GetCurrentState or
253 // ComputeChangeSets: after doing so, the statistics would be
254 // meaningless and it would be dangerous to run any Commit methods.
255 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
256         u, err := c.CurrentUser()
257         if err != nil {
258                 return fmt.Errorf("CurrentUser(): %v", err)
259         }
260         if !u.IsActive || !u.IsAdmin {
261                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
262         }
263         for _, srv := range bal.KeepServices {
264                 if srv.ServiceType == "proxy" {
265                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
266                 }
267         }
268
269         var checkPage arvados.CollectionList
270         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
271                 Limit:              new(int),
272                 Count:              "exact",
273                 IncludeTrash:       true,
274                 IncludeOldVersions: true,
275                 Filters: []arvados.Filter{{
276                         Attr:     "modified_at",
277                         Operator: "=",
278                         Operand:  nil,
279                 }},
280         }); err != nil {
281                 return err
282         } else if n := checkPage.ItemsAvailable; n > 0 {
283                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
284         }
285
286         return nil
287 }
288
289 // rendezvousState returns a fingerprint (e.g., a sorted list of
290 // UUID+host+port) of the current set of keep services.
291 func (bal *Balancer) rendezvousState() string {
292         srvs := make([]string, 0, len(bal.KeepServices))
293         for _, srv := range bal.KeepServices {
294                 srvs = append(srvs, srv.String())
295         }
296         sort.Strings(srvs)
297         return strings.Join(srvs, "; ")
298 }
299
300 // ClearTrashLists sends an empty trash list to each keep
301 // service. Calling this before GetCurrentState avoids races.
302 //
303 // When a block appears in an index, we assume that replica will still
304 // exist after we delete other replicas on other servers. However,
305 // it's possible that a previous rebalancing operation made different
306 // decisions (e.g., servers were added/removed, and rendezvous order
307 // changed). In this case, the replica might already be on that
308 // server's trash list, and it might be deleted before we send a
309 // replacement trash list.
310 //
311 // We avoid this problem if we clear all trash lists before getting
312 // indexes. (We also assume there is only one rebalancing process
313 // running at a time.)
314 func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
315         for _, srv := range bal.KeepServices {
316                 srv.ChangeSet = &ChangeSet{}
317         }
318         return bal.CommitTrash(ctx, c)
319 }
320
321 // GetCurrentState determines the current replication state, and the
322 // desired replication level, for every block that is either
323 // retrievable or referenced.
324 //
325 // It determines the current replication state by reading the block index
326 // from every known Keep service.
327 //
328 // It determines the desired replication level by retrieving all
329 // collection manifests in the database (API server).
330 //
331 // It encodes the resulting information in BlockStateMap.
332 func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
333         ctx, cancel := context.WithCancel(ctx)
334         defer cancel()
335
336         defer bal.time("get_state", "wall clock time to get current state")()
337         bal.BlockStateMap = NewBlockStateMap()
338
339         dd, err := c.DiscoveryDocument()
340         if err != nil {
341                 return err
342         }
343         bal.DefaultReplication = dd.DefaultCollectionReplication
344         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
345
346         errs := make(chan error, 1)
347         wg := sync.WaitGroup{}
348
349         // When a device is mounted more than once, we will get its
350         // index only once, and call AddReplicas on all of the mounts.
351         // equivMount keys are the mounts that will be indexed, and
352         // each value is a list of mounts to apply the received index
353         // to.
354         equivMount := map[*KeepMount][]*KeepMount{}
355         // deviceMount maps each device ID to the one mount that will
356         // be indexed for that device.
357         deviceMount := map[string]*KeepMount{}
358         for _, srv := range bal.KeepServices {
359                 for _, mnt := range srv.mounts {
360                         equiv := deviceMount[mnt.UUID]
361                         if equiv == nil {
362                                 equiv = mnt
363                                 deviceMount[mnt.UUID] = equiv
364                         }
365                         equivMount[equiv] = append(equivMount[equiv], mnt)
366                 }
367         }
368
369         // Start one goroutine for each (non-redundant) mount:
370         // retrieve the index, and add the returned blocks to
371         // BlockStateMap.
372         for _, mounts := range equivMount {
373                 wg.Add(1)
374                 go func(mounts []*KeepMount) {
375                         defer wg.Done()
376                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
377                         idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
378                         if err != nil {
379                                 select {
380                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
381                                 default:
382                                 }
383                                 cancel()
384                                 return
385                         }
386                         if len(errs) > 0 {
387                                 // Some other goroutine encountered an
388                                 // error -- any further effort here
389                                 // will be wasted.
390                                 return
391                         }
392                         for _, mount := range mounts {
393                                 bal.logf("%s: add %d entries to map", mount, len(idx))
394                                 bal.BlockStateMap.AddReplicas(mount, idx)
395                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
396                         }
397                         bal.logf("mount %s: index done", mounts[0])
398                 }(mounts)
399         }
400
401         collQ := make(chan arvados.Collection, bufs)
402
403         // Retrieve all collections from the database and send them to
404         // collQ.
405         wg.Add(1)
406         go func() {
407                 defer wg.Done()
408                 err = EachCollection(ctx, bal.DB, c,
409                         func(coll arvados.Collection) error {
410                                 collQ <- coll
411                                 if len(errs) > 0 {
412                                         // some other GetCurrentState
413                                         // error happened: no point
414                                         // getting any more
415                                         // collections.
416                                         return fmt.Errorf("")
417                                 }
418                                 return nil
419                         }, func(done, total int) {
420                                 bal.logf("collections: %d/%d", done, total)
421                         })
422                 close(collQ)
423                 if err != nil {
424                         select {
425                         case errs <- err:
426                         default:
427                         }
428                         cancel()
429                 }
430         }()
431
432         // Parse manifests from collQ and pass the block hashes to
433         // BlockStateMap to track desired replication.
434         for i := 0; i < runtime.NumCPU(); i++ {
435                 wg.Add(1)
436                 go func() {
437                         defer wg.Done()
438                         for coll := range collQ {
439                                 err := bal.addCollection(coll)
440                                 if err != nil || len(errs) > 0 {
441                                         select {
442                                         case errs <- err:
443                                         default:
444                                         }
445                                         cancel()
446                                         continue
447                                 }
448                                 atomic.AddInt64(&bal.collScanned, 1)
449                         }
450                 }()
451         }
452
453         wg.Wait()
454         if len(errs) > 0 {
455                 return <-errs
456         }
457         return nil
458 }
459
460 func (bal *Balancer) addCollection(coll arvados.Collection) error {
461         blkids, err := coll.SizedDigests()
462         if err != nil {
463                 return fmt.Errorf("%v: %v", coll.UUID, err)
464         }
465         repl := bal.DefaultReplication
466         if coll.ReplicationDesired != nil {
467                 repl = *coll.ReplicationDesired
468         }
469         bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
470         // Pass pdh to IncreaseDesired only if LostBlocksFile is being
471         // written -- otherwise it's just a waste of memory.
472         pdh := ""
473         if bal.LostBlocksFile != "" {
474                 pdh = coll.PortableDataHash
475         }
476         bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
477         return nil
478 }
479
480 // ComputeChangeSets compares, for each known block, the current and
481 // desired replication states. If it is possible to get closer to the
482 // desired state by copying or deleting blocks, it adds those changes
483 // to the relevant KeepServices' ChangeSets.
484 //
485 // It does not actually apply any of the computed changes.
486 func (bal *Balancer) ComputeChangeSets() {
487         // This just calls balanceBlock() once for each block, using a
488         // pool of worker goroutines.
489         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
490         bal.setupLookupTables()
491
492         type balanceTask struct {
493                 blkid arvados.SizedDigest
494                 blk   *BlockState
495         }
496         workers := runtime.GOMAXPROCS(-1)
497         todo := make(chan balanceTask, workers)
498         go func() {
499                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
500                         todo <- balanceTask{
501                                 blkid: blkid,
502                                 blk:   blk,
503                         }
504                 })
505                 close(todo)
506         }()
507         results := make(chan balanceResult, workers)
508         go func() {
509                 var wg sync.WaitGroup
510                 for i := 0; i < workers; i++ {
511                         wg.Add(1)
512                         go func() {
513                                 for work := range todo {
514                                         results <- bal.balanceBlock(work.blkid, work.blk)
515                                 }
516                                 wg.Done()
517                         }()
518                 }
519                 wg.Wait()
520                 close(results)
521         }()
522         bal.collectStatistics(results)
523 }
524
525 func (bal *Balancer) setupLookupTables() {
526         bal.serviceRoots = make(map[string]string)
527         bal.classes = defaultClasses
528         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
529         bal.mounts = 0
530         for _, srv := range bal.KeepServices {
531                 bal.serviceRoots[srv.UUID] = srv.UUID
532                 for _, mnt := range srv.mounts {
533                         bal.mounts++
534
535                         // All mounts on a read-only service are
536                         // effectively read-only.
537                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
538
539                         for class := range mnt.StorageClasses {
540                                 if mbc := bal.mountsByClass[class]; mbc == nil {
541                                         bal.classes = append(bal.classes, class)
542                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
543                                 } else {
544                                         mbc[mnt] = true
545                                 }
546                         }
547                 }
548         }
549         // Consider classes in lexicographic order to avoid flapping
550         // between balancing runs.  The outcome of the "prefer a mount
551         // we're already planning to use for a different storage
552         // class" case in balanceBlock depends on the order classes
553         // are considered.
554         sort.Strings(bal.classes)
555 }
556
557 const (
558         changeStay = iota
559         changePull
560         changeTrash
561         changeNone
562 )
563
564 var changeName = map[int]string{
565         changeStay:  "stay",
566         changePull:  "pull",
567         changeTrash: "trash",
568         changeNone:  "none",
569 }
570
571 type balancedBlockState struct {
572         needed       int
573         unneeded     int
574         pulling      int
575         unachievable bool
576 }
577
578 type balanceResult struct {
579         blk        *BlockState
580         blkid      arvados.SizedDigest
581         lost       bool
582         blockState balancedBlockState
583         classState map[string]balancedBlockState
584 }
585
586 type slot struct {
587         mnt  *KeepMount // never nil
588         repl *Replica   // replica already stored here (or nil)
589         want bool       // we should pull/leave a replica here
590 }
591
592 // balanceBlock compares current state to desired state for a single
593 // block, and makes the appropriate ChangeSet calls.
594 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
595         bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
596
597         // Build a list of all slots (one per mounted volume).
598         slots := make([]slot, 0, bal.mounts)
599         for _, srv := range bal.KeepServices {
600                 for _, mnt := range srv.mounts {
601                         var repl *Replica
602                         for r := range blk.Replicas {
603                                 if blk.Replicas[r].KeepMount == mnt {
604                                         repl = &blk.Replicas[r]
605                                 }
606                         }
607                         // Initial value of "want" is "have, and can't
608                         // delete". These untrashable replicas get
609                         // prioritized when sorting slots: otherwise,
610                         // non-optimal readonly copies would cause us
611                         // to overreplicate.
612                         slots = append(slots, slot{
613                                 mnt:  mnt,
614                                 repl: repl,
615                                 want: repl != nil && mnt.ReadOnly,
616                         })
617                 }
618         }
619
620         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
621         srvRendezvous := make(map[*KeepService]int, len(uuids))
622         for i, uuid := range uuids {
623                 srv := bal.KeepServices[uuid]
624                 srvRendezvous[srv] = i
625         }
626
627         // Below we set underreplicated=true if we find any storage
628         // class that's currently underreplicated -- in that case we
629         // won't want to trash any replicas.
630         underreplicated := false
631
632         unsafeToDelete := make(map[int64]bool, len(slots))
633         for _, class := range bal.classes {
634                 desired := blk.Desired[class]
635                 if desired == 0 {
636                         continue
637                 }
638
639                 // Sort the slots by desirability.
640                 sort.Slice(slots, func(i, j int) bool {
641                         si, sj := slots[i], slots[j]
642                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
643                                 // Prefer a mount that satisfies the
644                                 // desired class.
645                                 return bal.mountsByClass[class][si.mnt]
646                         } else if si.want != sj.want {
647                                 // Prefer a mount that will have a
648                                 // replica no matter what we do here
649                                 // -- either because it already has an
650                                 // untrashable replica, or because we
651                                 // already need it to satisfy a
652                                 // different storage class.
653                                 return si.want
654                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
655                                 // Prefer a better rendezvous
656                                 // position.
657                                 return orderi < orderj
658                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
659                                 // Prefer a mount that already has a
660                                 // replica.
661                                 return repli
662                         } else {
663                                 // If pull/trash turns out to be
664                                 // needed, distribute the
665                                 // new/remaining replicas uniformly
666                                 // across qualifying mounts on a given
667                                 // server.
668                                 return rendezvousLess(si.mnt.UUID, sj.mnt.UUID, blkid)
669                         }
670                 })
671
672                 // Servers/mounts/devices (with or without existing
673                 // replicas) that are part of the best achievable
674                 // layout for this storage class.
675                 wantSrv := map[*KeepService]bool{}
676                 wantMnt := map[*KeepMount]bool{}
677                 wantDev := map[string]bool{}
678                 // Positions (with existing replicas) that have been
679                 // protected (via unsafeToDelete) to ensure we don't
680                 // reduce replication below desired level when
681                 // trashing replicas that aren't optimal positions for
682                 // any storage class.
683                 protMnt := map[*KeepMount]bool{}
684                 // Replication planned so far (corresponds to wantMnt).
685                 replWant := 0
686                 // Protected replication (corresponds to protMnt).
687                 replProt := 0
688
689                 // trySlot tries using a slot to meet requirements,
690                 // and returns true if all requirements are met.
691                 trySlot := func(i int) bool {
692                         slot := slots[i]
693                         if wantMnt[slot.mnt] || wantDev[slot.mnt.UUID] {
694                                 // Already allocated a replica to this
695                                 // backend device, possibly on a
696                                 // different server.
697                                 return false
698                         }
699                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
700                                 unsafeToDelete[slot.repl.Mtime] = true
701                                 protMnt[slot.mnt] = true
702                                 replProt += slot.mnt.Replication
703                         }
704                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
705                                 slots[i].want = true
706                                 wantSrv[slot.mnt.KeepService] = true
707                                 wantMnt[slot.mnt] = true
708                                 wantDev[slot.mnt.UUID] = true
709                                 replWant += slot.mnt.Replication
710                         }
711                         return replProt >= desired && replWant >= desired
712                 }
713
714                 // First try to achieve desired replication without
715                 // using the same server twice.
716                 done := false
717                 for i := 0; i < len(slots) && !done; i++ {
718                         if !wantSrv[slots[i].mnt.KeepService] {
719                                 done = trySlot(i)
720                         }
721                 }
722
723                 // If that didn't suffice, do another pass without the
724                 // "distinct services" restriction. (Achieving the
725                 // desired volume replication on fewer than the
726                 // desired number of services is better than
727                 // underreplicating.)
728                 for i := 0; i < len(slots) && !done; i++ {
729                         done = trySlot(i)
730                 }
731
732                 if !underreplicated {
733                         safe := 0
734                         for _, slot := range slots {
735                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
736                                         continue
737                                 }
738                                 if safe += slot.mnt.Replication; safe >= desired {
739                                         break
740                                 }
741                         }
742                         underreplicated = safe < desired
743                 }
744
745                 // Avoid deleting wanted replicas from devices that
746                 // are mounted on multiple servers -- even if they
747                 // haven't already been added to unsafeToDelete
748                 // because the servers report different Mtimes.
749                 for _, slot := range slots {
750                         if slot.repl != nil && wantDev[slot.mnt.UUID] {
751                                 unsafeToDelete[slot.repl.Mtime] = true
752                         }
753                 }
754         }
755
756         // TODO: If multiple replicas are trashable, prefer the oldest
757         // replica that doesn't have a timestamp collision with
758         // others.
759
760         for i, slot := range slots {
761                 // Don't trash (1) any replicas of an underreplicated
762                 // block, even if they're in the wrong positions, or
763                 // (2) any replicas whose Mtimes are identical to
764                 // needed replicas (in case we're really seeing the
765                 // same copy via different mounts).
766                 if slot.repl != nil && (underreplicated || unsafeToDelete[slot.repl.Mtime]) {
767                         slots[i].want = true
768                 }
769         }
770
771         classState := make(map[string]balancedBlockState, len(bal.classes))
772         for _, class := range bal.classes {
773                 classState[class] = computeBlockState(slots, bal.mountsByClass[class], len(blk.Replicas), blk.Desired[class])
774         }
775         blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
776
777         var lost bool
778         var changes []string
779         for _, slot := range slots {
780                 // TODO: request a Touch if Mtime is duplicated.
781                 var change int
782                 switch {
783                 case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
784                         slot.mnt.KeepService.AddTrash(Trash{
785                                 SizedDigest: blkid,
786                                 Mtime:       slot.repl.Mtime,
787                                 From:        slot.mnt,
788                         })
789                         change = changeTrash
790                 case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
791                         lost = true
792                         change = changeNone
793                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
794                         slot.mnt.KeepService.AddPull(Pull{
795                                 SizedDigest: blkid,
796                                 From:        blk.Replicas[0].KeepMount.KeepService,
797                                 To:          slot.mnt,
798                         })
799                         change = changePull
800                 case slot.repl != nil:
801                         change = changeStay
802                 default:
803                         change = changeNone
804                 }
805                 if bal.Dumper != nil {
806                         var mtime int64
807                         if slot.repl != nil {
808                                 mtime = slot.repl.Mtime
809                         }
810                         srv := slot.mnt.KeepService
811                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
812                 }
813         }
814         if bal.Dumper != nil {
815                 bal.Dumper.Printf("%s refs=%d needed=%d unneeded=%d pulling=%v %v %v", blkid, blk.RefCount, blockState.needed, blockState.unneeded, blockState.pulling, blk.Desired, changes)
816         }
817         return balanceResult{
818                 blk:        blk,
819                 blkid:      blkid,
820                 lost:       lost,
821                 blockState: blockState,
822                 classState: classState,
823         }
824 }
825
826 func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRepl int) (bbs balancedBlockState) {
827         repl := 0
828         countedDev := map[string]bool{}
829         for _, slot := range slots {
830                 if onlyCount != nil && !onlyCount[slot.mnt] {
831                         continue
832                 }
833                 if countedDev[slot.mnt.UUID] {
834                         continue
835                 }
836                 switch {
837                 case slot.repl != nil && slot.want:
838                         bbs.needed++
839                         repl += slot.mnt.Replication
840                 case slot.repl != nil && !slot.want:
841                         bbs.unneeded++
842                         repl += slot.mnt.Replication
843                 case slot.repl == nil && slot.want && have > 0:
844                         bbs.pulling++
845                         repl += slot.mnt.Replication
846                 }
847                 countedDev[slot.mnt.UUID] = true
848         }
849         if repl < needRepl {
850                 bbs.unachievable = true
851         }
852         return
853 }
854
855 type blocksNBytes struct {
856         replicas int
857         blocks   int
858         bytes    int64
859 }
860
861 func (bb blocksNBytes) String() string {
862         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
863 }
864
865 type replicationStats struct {
866         needed       blocksNBytes
867         unneeded     blocksNBytes
868         pulling      blocksNBytes
869         unachievable blocksNBytes
870 }
871
872 type balancerStats struct {
873         lost          blocksNBytes
874         overrep       blocksNBytes
875         unref         blocksNBytes
876         garbage       blocksNBytes
877         underrep      blocksNBytes
878         unachievable  blocksNBytes
879         justright     blocksNBytes
880         desired       blocksNBytes
881         current       blocksNBytes
882         pulls         int
883         trashes       int
884         replHistogram []int
885         classStats    map[string]replicationStats
886
887         // collectionBytes / collectionBlockBytes = deduplication ratio
888         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
889         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
890         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
891         collectionBlocks     int64 // number of blocks referenced by any collection
892 }
893
894 func (s *balancerStats) dedupByteRatio() float64 {
895         if s.collectionBlockBytes == 0 {
896                 return 0
897         }
898         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
899 }
900
901 func (s *balancerStats) dedupBlockRatio() float64 {
902         if s.collectionBlocks == 0 {
903                 return 0
904         }
905         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
906 }
907
908 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
909         var s balancerStats
910         s.replHistogram = make([]int, 2)
911         s.classStats = make(map[string]replicationStats, len(bal.classes))
912         for result := range results {
913                 bytes := result.blkid.Size()
914
915                 if rc := int64(result.blk.RefCount); rc > 0 {
916                         s.collectionBytes += rc * bytes
917                         s.collectionBlockBytes += bytes
918                         s.collectionBlockRefs += rc
919                         s.collectionBlocks++
920                 }
921
922                 for class, state := range result.classState {
923                         cs := s.classStats[class]
924                         if state.unachievable {
925                                 cs.unachievable.replicas++
926                                 cs.unachievable.blocks++
927                                 cs.unachievable.bytes += bytes
928                         }
929                         if state.needed > 0 {
930                                 cs.needed.replicas += state.needed
931                                 cs.needed.blocks++
932                                 cs.needed.bytes += bytes * int64(state.needed)
933                         }
934                         if state.unneeded > 0 {
935                                 cs.unneeded.replicas += state.unneeded
936                                 cs.unneeded.blocks++
937                                 cs.unneeded.bytes += bytes * int64(state.unneeded)
938                         }
939                         if state.pulling > 0 {
940                                 cs.pulling.replicas += state.pulling
941                                 cs.pulling.blocks++
942                                 cs.pulling.bytes += bytes * int64(state.pulling)
943                         }
944                         s.classStats[class] = cs
945                 }
946
947                 bs := result.blockState
948                 switch {
949                 case result.lost:
950                         s.lost.replicas++
951                         s.lost.blocks++
952                         s.lost.bytes += bytes
953                         fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
954                         for pdh := range result.blk.Refs {
955                                 fmt.Fprintf(bal.lostBlocks, " %s", pdh)
956                         }
957                         fmt.Fprint(bal.lostBlocks, "\n")
958                 case bs.pulling > 0:
959                         s.underrep.replicas += bs.pulling
960                         s.underrep.blocks++
961                         s.underrep.bytes += bytes * int64(bs.pulling)
962                 case bs.unachievable:
963                         s.underrep.replicas++
964                         s.underrep.blocks++
965                         s.underrep.bytes += bytes
966                 case bs.unneeded > 0 && bs.needed == 0:
967                         // Count as "garbage" if all replicas are old
968                         // enough to trash, otherwise count as
969                         // "unref".
970                         counter := &s.garbage
971                         for _, r := range result.blk.Replicas {
972                                 if r.Mtime >= bal.MinMtime {
973                                         counter = &s.unref
974                                         break
975                                 }
976                         }
977                         counter.replicas += bs.unneeded
978                         counter.blocks++
979                         counter.bytes += bytes * int64(bs.unneeded)
980                 case bs.unneeded > 0:
981                         s.overrep.replicas += bs.unneeded
982                         s.overrep.blocks++
983                         s.overrep.bytes += bytes * int64(bs.unneeded)
984                 default:
985                         s.justright.replicas += bs.needed
986                         s.justright.blocks++
987                         s.justright.bytes += bytes * int64(bs.needed)
988                 }
989
990                 if bs.needed > 0 {
991                         s.desired.replicas += bs.needed
992                         s.desired.blocks++
993                         s.desired.bytes += bytes * int64(bs.needed)
994                 }
995                 if bs.needed+bs.unneeded > 0 {
996                         s.current.replicas += bs.needed + bs.unneeded
997                         s.current.blocks++
998                         s.current.bytes += bytes * int64(bs.needed+bs.unneeded)
999                 }
1000
1001                 for len(s.replHistogram) <= bs.needed+bs.unneeded {
1002                         s.replHistogram = append(s.replHistogram, 0)
1003                 }
1004                 s.replHistogram[bs.needed+bs.unneeded]++
1005         }
1006         for _, srv := range bal.KeepServices {
1007                 s.pulls += len(srv.ChangeSet.Pulls)
1008                 s.trashes += len(srv.ChangeSet.Trashes)
1009         }
1010         bal.stats = s
1011         bal.Metrics.UpdateStats(s)
1012 }
1013
1014 // PrintStatistics writes statistics about the computed changes to
1015 // bal.Logger. It should not be called until ComputeChangeSets has
1016 // finished.
1017 func (bal *Balancer) PrintStatistics() {
1018         bal.logf("===")
1019         bal.logf("%s lost (0=have<want)", bal.stats.lost)
1020         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
1021         bal.logf("%s just right (have=want)", bal.stats.justright)
1022         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
1023         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
1024         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
1025         for _, class := range bal.classes {
1026                 cs := bal.stats.classStats[class]
1027                 bal.logf("===")
1028                 bal.logf("storage class %q: %s needed", class, cs.needed)
1029                 bal.logf("storage class %q: %s unneeded", class, cs.unneeded)
1030                 bal.logf("storage class %q: %s pulling", class, cs.pulling)
1031                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
1032         }
1033         bal.logf("===")
1034         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
1035         bal.logf("%s total usage", bal.stats.current)
1036         bal.logf("===")
1037         for _, srv := range bal.KeepServices {
1038                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1039         }
1040         bal.logf("===")
1041         bal.printHistogram(60)
1042         bal.logf("===")
1043 }
1044
1045 func (bal *Balancer) printHistogram(hashColumns int) {
1046         bal.logf("Replication level distribution:")
1047         maxCount := 0
1048         for _, count := range bal.stats.replHistogram {
1049                 if maxCount < count {
1050                         maxCount = count
1051                 }
1052         }
1053         hashes := strings.Repeat("#", hashColumns)
1054         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1055         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1056         for repl, count := range bal.stats.replHistogram {
1057                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1058                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1059         }
1060 }
1061
1062 // CheckSanityLate checks for configuration and runtime errors after
1063 // GetCurrentState() and ComputeChangeSets() have finished.
1064 //
1065 // If it returns an error, it is dangerous to run any Commit methods.
1066 func (bal *Balancer) CheckSanityLate() error {
1067         if bal.errors != nil {
1068                 for _, err := range bal.errors {
1069                         bal.logf("deferred error: %v", err)
1070                 }
1071                 return fmt.Errorf("cannot proceed safely after deferred errors")
1072         }
1073
1074         if bal.collScanned == 0 {
1075                 return fmt.Errorf("received zero collections")
1076         }
1077
1078         anyDesired := false
1079         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1080                 for _, desired := range blk.Desired {
1081                         if desired > 0 {
1082                                 anyDesired = true
1083                                 break
1084                         }
1085                 }
1086         })
1087         if !anyDesired {
1088                 return fmt.Errorf("zero blocks have desired replication>0")
1089         }
1090
1091         if dr := bal.DefaultReplication; dr < 1 {
1092                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1093         }
1094
1095         // TODO: no two services have identical indexes
1096         // TODO: no collisions (same md5, different size)
1097         return nil
1098 }
1099
1100 // CommitPulls sends the computed lists of pull requests to the
1101 // keepstore servers. This has the effect of increasing replication of
1102 // existing blocks that are either underreplicated or poorly
1103 // distributed according to rendezvous hashing.
1104 func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
1105         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1106         return bal.commitAsync(c, "send pull list",
1107                 func(srv *KeepService) error {
1108                         return srv.CommitPulls(ctx, c)
1109                 })
1110 }
1111
1112 // CommitTrash sends the computed lists of trash requests to the
1113 // keepstore servers. This has the effect of deleting blocks that are
1114 // overreplicated or unreferenced.
1115 func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
1116         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1117         return bal.commitAsync(c, "send trash list",
1118                 func(srv *KeepService) error {
1119                         return srv.CommitTrash(ctx, c)
1120                 })
1121 }
1122
1123 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1124         errs := make(chan error)
1125         for _, srv := range bal.KeepServices {
1126                 go func(srv *KeepService) {
1127                         var err error
1128                         defer func() { errs <- err }()
1129                         label := fmt.Sprintf("%s: %v", srv, label)
1130                         err = f(srv)
1131                         if err != nil {
1132                                 err = fmt.Errorf("%s: %v", label, err)
1133                         }
1134                 }(srv)
1135         }
1136         var lastErr error
1137         for range bal.KeepServices {
1138                 if err := <-errs; err != nil {
1139                         bal.logf("%v", err)
1140                         lastErr = err
1141                 }
1142         }
1143         close(errs)
1144         return lastErr
1145 }
1146
1147 func (bal *Balancer) logf(f string, args ...interface{}) {
1148         if bal.Logger != nil {
1149                 bal.Logger.Printf(f, args...)
1150         }
1151 }
1152
1153 func (bal *Balancer) time(name, help string) func() {
1154         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1155         t0 := time.Now()
1156         bal.Logger.Printf("%s: start", name)
1157         return func() {
1158                 dur := time.Since(t0)
1159                 observer.Observe(dur.Seconds())
1160                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1161         }
1162 }
1163
1164 // Rendezvous hash sort function. Less efficient than sorting on
1165 // precomputed rendezvous hashes, but also rarely used.
1166 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1167         a := md5.Sum([]byte(string(blkid[:32]) + i))
1168         b := md5.Sum([]byte(string(blkid[:32]) + j))
1169         return bytes.Compare(a[:], b[:]) < 0
1170 }