16451: do not leave testargs as a space when there are no opts provided.
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "log"
14         "math"
15         "os"
16         "runtime"
17         "sort"
18         "strings"
19         "sync"
20         "syscall"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/keepclient"
25         "github.com/sirupsen/logrus"
26 )
27
28 // Balancer compares the contents of keepstore servers with the
29 // collections stored in Arvados, and issues pull/trash requests
30 // needed to get (closer to) the optimal data layout.
31 //
32 // In the optimal data layout: every data block referenced by a
33 // collection is replicated at least as many times as desired by the
34 // collection; there are no unreferenced data blocks older than
35 // BlobSignatureTTL; and all N existing replicas of a given data block
36 // are in the N best positions in rendezvous probe order.
37 type Balancer struct {
38         Logger  logrus.FieldLogger
39         Dumper  logrus.FieldLogger
40         Metrics *metrics
41
42         LostBlocksFile string
43
44         *BlockStateMap
45         KeepServices       map[string]*KeepService
46         DefaultReplication int
47         MinMtime           int64
48
49         classes       []string
50         mounts        int
51         mountsByClass map[string]map[*KeepMount]bool
52         collScanned   int
53         serviceRoots  map[string]string
54         errors        []error
55         stats         balancerStats
56         mutex         sync.Mutex
57         lostBlocks    io.Writer
58 }
59
60 // Run performs a balance operation using the given config and
61 // runOptions, and returns RunOptions suitable for passing to a
62 // subsequent balance operation.
63 //
64 // Run should only be called once on a given Balancer object.
65 //
66 // Typical usage:
67 //
68 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
69 func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
70         nextRunOptions = runOptions
71
72         defer bal.time("sweep", "wall clock time to run one full sweep")()
73
74         var lbFile *os.File
75         if bal.LostBlocksFile != "" {
76                 tmpfn := bal.LostBlocksFile + ".tmp"
77                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
78                 if err != nil {
79                         return
80                 }
81                 defer lbFile.Close()
82                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
83                 if err != nil {
84                         return
85                 }
86                 defer func() {
87                         // Remove the tempfile only if we didn't get
88                         // as far as successfully renaming it.
89                         if lbFile != nil {
90                                 os.Remove(tmpfn)
91                         }
92                 }()
93                 bal.lostBlocks = lbFile
94         } else {
95                 bal.lostBlocks = ioutil.Discard
96         }
97
98         err = bal.DiscoverKeepServices(client)
99         if err != nil {
100                 return
101         }
102
103         for _, srv := range bal.KeepServices {
104                 err = srv.discoverMounts(client)
105                 if err != nil {
106                         return
107                 }
108         }
109         bal.cleanupMounts()
110
111         if err = bal.CheckSanityEarly(client); err != nil {
112                 return
113         }
114         rs := bal.rendezvousState()
115         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
116                 if runOptions.SafeRendezvousState != "" {
117                         bal.logf("notice: KeepServices list has changed since last run")
118                 }
119                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
120                 if err = bal.ClearTrashLists(client); err != nil {
121                         return
122                 }
123                 // The current rendezvous state becomes "safe" (i.e.,
124                 // OK to compute changes for that state without
125                 // clearing existing trash lists) only now, after we
126                 // succeed in clearing existing trash lists.
127                 nextRunOptions.SafeRendezvousState = rs
128         }
129         if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
130                 return
131         }
132         bal.ComputeChangeSets()
133         bal.PrintStatistics()
134         if err = bal.CheckSanityLate(); err != nil {
135                 return
136         }
137         if lbFile != nil {
138                 err = lbFile.Sync()
139                 if err != nil {
140                         return
141                 }
142                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
143                 if err != nil {
144                         return
145                 }
146                 lbFile = nil
147         }
148         if runOptions.CommitPulls {
149                 err = bal.CommitPulls(client)
150                 if err != nil {
151                         // Skip trash if we can't pull. (Too cautious?)
152                         return
153                 }
154         }
155         if runOptions.CommitTrash {
156                 err = bal.CommitTrash(client)
157         }
158         return
159 }
160
161 // SetKeepServices sets the list of KeepServices to operate on.
162 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
163         bal.KeepServices = make(map[string]*KeepService)
164         for _, srv := range srvList.Items {
165                 bal.KeepServices[srv.UUID] = &KeepService{
166                         KeepService: srv,
167                         ChangeSet:   &ChangeSet{},
168                 }
169         }
170         return nil
171 }
172
173 // DiscoverKeepServices sets the list of KeepServices by calling the
174 // API to get a list of all services, and selecting the ones whose
175 // ServiceType is "disk"
176 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
177         bal.KeepServices = make(map[string]*KeepService)
178         return c.EachKeepService(func(srv arvados.KeepService) error {
179                 if srv.ServiceType == "disk" {
180                         bal.KeepServices[srv.UUID] = &KeepService{
181                                 KeepService: srv,
182                                 ChangeSet:   &ChangeSet{},
183                         }
184                 } else {
185                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
186                 }
187                 return nil
188         })
189 }
190
191 func (bal *Balancer) cleanupMounts() {
192         rwdev := map[string]*KeepService{}
193         for _, srv := range bal.KeepServices {
194                 for _, mnt := range srv.mounts {
195                         if !mnt.ReadOnly && mnt.DeviceID != "" {
196                                 rwdev[mnt.DeviceID] = srv
197                         }
198                 }
199         }
200         // Drop the readonly mounts whose device is mounted RW
201         // elsewhere.
202         for _, srv := range bal.KeepServices {
203                 var dedup []*KeepMount
204                 for _, mnt := range srv.mounts {
205                         if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
206                                 bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
207                         } else {
208                                 dedup = append(dedup, mnt)
209                         }
210                 }
211                 srv.mounts = dedup
212         }
213         for _, srv := range bal.KeepServices {
214                 for _, mnt := range srv.mounts {
215                         if mnt.Replication <= 0 {
216                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
217                                 mnt.Replication = 1
218                         }
219                 }
220         }
221 }
222
223 // CheckSanityEarly checks for configuration and runtime errors that
224 // can be detected before GetCurrentState() and ComputeChangeSets()
225 // are called.
226 //
227 // If it returns an error, it is pointless to run GetCurrentState or
228 // ComputeChangeSets: after doing so, the statistics would be
229 // meaningless and it would be dangerous to run any Commit methods.
230 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
231         u, err := c.CurrentUser()
232         if err != nil {
233                 return fmt.Errorf("CurrentUser(): %v", err)
234         }
235         if !u.IsActive || !u.IsAdmin {
236                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
237         }
238         for _, srv := range bal.KeepServices {
239                 if srv.ServiceType == "proxy" {
240                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
241                 }
242         }
243
244         var checkPage arvados.CollectionList
245         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
246                 Limit:              new(int),
247                 Count:              "exact",
248                 IncludeTrash:       true,
249                 IncludeOldVersions: true,
250                 Filters: []arvados.Filter{{
251                         Attr:     "modified_at",
252                         Operator: "=",
253                         Operand:  nil,
254                 }},
255         }); err != nil {
256                 return err
257         } else if n := checkPage.ItemsAvailable; n > 0 {
258                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
259         }
260
261         return nil
262 }
263
264 // rendezvousState returns a fingerprint (e.g., a sorted list of
265 // UUID+host+port) of the current set of keep services.
266 func (bal *Balancer) rendezvousState() string {
267         srvs := make([]string, 0, len(bal.KeepServices))
268         for _, srv := range bal.KeepServices {
269                 srvs = append(srvs, srv.String())
270         }
271         sort.Strings(srvs)
272         return strings.Join(srvs, "; ")
273 }
274
275 // ClearTrashLists sends an empty trash list to each keep
276 // service. Calling this before GetCurrentState avoids races.
277 //
278 // When a block appears in an index, we assume that replica will still
279 // exist after we delete other replicas on other servers. However,
280 // it's possible that a previous rebalancing operation made different
281 // decisions (e.g., servers were added/removed, and rendezvous order
282 // changed). In this case, the replica might already be on that
283 // server's trash list, and it might be deleted before we send a
284 // replacement trash list.
285 //
286 // We avoid this problem if we clear all trash lists before getting
287 // indexes. (We also assume there is only one rebalancing process
288 // running at a time.)
289 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
290         for _, srv := range bal.KeepServices {
291                 srv.ChangeSet = &ChangeSet{}
292         }
293         return bal.CommitTrash(c)
294 }
295
296 // GetCurrentState determines the current replication state, and the
297 // desired replication level, for every block that is either
298 // retrievable or referenced.
299 //
300 // It determines the current replication state by reading the block index
301 // from every known Keep service.
302 //
303 // It determines the desired replication level by retrieving all
304 // collection manifests in the database (API server).
305 //
306 // It encodes the resulting information in BlockStateMap.
307 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
308         defer bal.time("get_state", "wall clock time to get current state")()
309         bal.BlockStateMap = NewBlockStateMap()
310
311         dd, err := c.DiscoveryDocument()
312         if err != nil {
313                 return err
314         }
315         bal.DefaultReplication = dd.DefaultCollectionReplication
316         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
317
318         errs := make(chan error, 1)
319         wg := sync.WaitGroup{}
320
321         // When a device is mounted more than once, we will get its
322         // index only once, and call AddReplicas on all of the mounts.
323         // equivMount keys are the mounts that will be indexed, and
324         // each value is a list of mounts to apply the received index
325         // to.
326         equivMount := map[*KeepMount][]*KeepMount{}
327         // deviceMount maps each device ID to the one mount that will
328         // be indexed for that device.
329         deviceMount := map[string]*KeepMount{}
330         for _, srv := range bal.KeepServices {
331                 for _, mnt := range srv.mounts {
332                         equiv := deviceMount[mnt.DeviceID]
333                         if equiv == nil {
334                                 equiv = mnt
335                                 if mnt.DeviceID != "" {
336                                         deviceMount[mnt.DeviceID] = equiv
337                                 }
338                         }
339                         equivMount[equiv] = append(equivMount[equiv], mnt)
340                 }
341         }
342
343         // Start one goroutine for each (non-redundant) mount:
344         // retrieve the index, and add the returned blocks to
345         // BlockStateMap.
346         for _, mounts := range equivMount {
347                 wg.Add(1)
348                 go func(mounts []*KeepMount) {
349                         defer wg.Done()
350                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
351                         idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
352                         if err != nil {
353                                 select {
354                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
355                                 default:
356                                 }
357                                 return
358                         }
359                         if len(errs) > 0 {
360                                 // Some other goroutine encountered an
361                                 // error -- any further effort here
362                                 // will be wasted.
363                                 return
364                         }
365                         for _, mount := range mounts {
366                                 bal.logf("%s: add %d entries to map", mount, len(idx))
367                                 bal.BlockStateMap.AddReplicas(mount, idx)
368                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
369                         }
370                         bal.logf("mount %s: index done", mounts[0])
371                 }(mounts)
372         }
373
374         // collQ buffers incoming collections so we can start fetching
375         // the next page without waiting for the current page to
376         // finish processing.
377         collQ := make(chan arvados.Collection, bufs)
378
379         // Start a goroutine to process collections. (We could use a
380         // worker pool here, but even with a single worker we already
381         // process collections much faster than we can retrieve them.)
382         wg.Add(1)
383         go func() {
384                 defer wg.Done()
385                 for coll := range collQ {
386                         err := bal.addCollection(coll)
387                         if err != nil || len(errs) > 0 {
388                                 select {
389                                 case errs <- err:
390                                 default:
391                                 }
392                                 for range collQ {
393                                 }
394                                 return
395                         }
396                         bal.collScanned++
397                 }
398         }()
399
400         // Start a goroutine to retrieve all collections from the
401         // Arvados database and send them to collQ for processing.
402         wg.Add(1)
403         go func() {
404                 defer wg.Done()
405                 err = EachCollection(c, pageSize,
406                         func(coll arvados.Collection) error {
407                                 collQ <- coll
408                                 if len(errs) > 0 {
409                                         // some other GetCurrentState
410                                         // error happened: no point
411                                         // getting any more
412                                         // collections.
413                                         return fmt.Errorf("")
414                                 }
415                                 return nil
416                         }, func(done, total int) {
417                                 bal.logf("collections: %d/%d", done, total)
418                         })
419                 close(collQ)
420                 if err != nil {
421                         select {
422                         case errs <- err:
423                         default:
424                         }
425                 }
426         }()
427
428         wg.Wait()
429         if len(errs) > 0 {
430                 return <-errs
431         }
432         return nil
433 }
434
435 func (bal *Balancer) addCollection(coll arvados.Collection) error {
436         blkids, err := coll.SizedDigests()
437         if err != nil {
438                 return fmt.Errorf("%v: %v", coll.UUID, err)
439         }
440         repl := bal.DefaultReplication
441         if coll.ReplicationDesired != nil {
442                 repl = *coll.ReplicationDesired
443         }
444         bal.Logger.Debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
445         // Pass pdh to IncreaseDesired only if LostBlocksFile is being
446         // written -- otherwise it's just a waste of memory.
447         pdh := ""
448         if bal.LostBlocksFile != "" {
449                 pdh = coll.PortableDataHash
450         }
451         bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
452         return nil
453 }
454
455 // ComputeChangeSets compares, for each known block, the current and
456 // desired replication states. If it is possible to get closer to the
457 // desired state by copying or deleting blocks, it adds those changes
458 // to the relevant KeepServices' ChangeSets.
459 //
460 // It does not actually apply any of the computed changes.
461 func (bal *Balancer) ComputeChangeSets() {
462         // This just calls balanceBlock() once for each block, using a
463         // pool of worker goroutines.
464         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
465         bal.setupLookupTables()
466
467         type balanceTask struct {
468                 blkid arvados.SizedDigest
469                 blk   *BlockState
470         }
471         workers := runtime.GOMAXPROCS(-1)
472         todo := make(chan balanceTask, workers)
473         go func() {
474                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
475                         todo <- balanceTask{
476                                 blkid: blkid,
477                                 blk:   blk,
478                         }
479                 })
480                 close(todo)
481         }()
482         results := make(chan balanceResult, workers)
483         go func() {
484                 var wg sync.WaitGroup
485                 for i := 0; i < workers; i++ {
486                         wg.Add(1)
487                         go func() {
488                                 for work := range todo {
489                                         results <- bal.balanceBlock(work.blkid, work.blk)
490                                 }
491                                 wg.Done()
492                         }()
493                 }
494                 wg.Wait()
495                 close(results)
496         }()
497         bal.collectStatistics(results)
498 }
499
500 func (bal *Balancer) setupLookupTables() {
501         bal.serviceRoots = make(map[string]string)
502         bal.classes = defaultClasses
503         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
504         bal.mounts = 0
505         for _, srv := range bal.KeepServices {
506                 bal.serviceRoots[srv.UUID] = srv.UUID
507                 for _, mnt := range srv.mounts {
508                         bal.mounts++
509
510                         // All mounts on a read-only service are
511                         // effectively read-only.
512                         mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
513
514                         if len(mnt.StorageClasses) == 0 {
515                                 bal.mountsByClass["default"][mnt] = true
516                                 continue
517                         }
518                         for class := range mnt.StorageClasses {
519                                 if mbc := bal.mountsByClass[class]; mbc == nil {
520                                         bal.classes = append(bal.classes, class)
521                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
522                                 } else {
523                                         mbc[mnt] = true
524                                 }
525                         }
526                 }
527         }
528         // Consider classes in lexicographic order to avoid flapping
529         // between balancing runs.  The outcome of the "prefer a mount
530         // we're already planning to use for a different storage
531         // class" case in balanceBlock depends on the order classes
532         // are considered.
533         sort.Strings(bal.classes)
534 }
535
536 const (
537         changeStay = iota
538         changePull
539         changeTrash
540         changeNone
541 )
542
543 var changeName = map[int]string{
544         changeStay:  "stay",
545         changePull:  "pull",
546         changeTrash: "trash",
547         changeNone:  "none",
548 }
549
550 type balancedBlockState struct {
551         needed       int
552         unneeded     int
553         pulling      int
554         unachievable bool
555 }
556
557 type balanceResult struct {
558         blk        *BlockState
559         blkid      arvados.SizedDigest
560         lost       bool
561         blockState balancedBlockState
562         classState map[string]balancedBlockState
563 }
564
565 type slot struct {
566         mnt  *KeepMount // never nil
567         repl *Replica   // replica already stored here (or nil)
568         want bool       // we should pull/leave a replica here
569 }
570
571 // balanceBlock compares current state to desired state for a single
572 // block, and makes the appropriate ChangeSet calls.
573 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
574         bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
575
576         // Build a list of all slots (one per mounted volume).
577         slots := make([]slot, 0, bal.mounts)
578         for _, srv := range bal.KeepServices {
579                 for _, mnt := range srv.mounts {
580                         var repl *Replica
581                         for r := range blk.Replicas {
582                                 if blk.Replicas[r].KeepMount == mnt {
583                                         repl = &blk.Replicas[r]
584                                 }
585                         }
586                         // Initial value of "want" is "have, and can't
587                         // delete". These untrashable replicas get
588                         // prioritized when sorting slots: otherwise,
589                         // non-optimal readonly copies would cause us
590                         // to overreplicate.
591                         slots = append(slots, slot{
592                                 mnt:  mnt,
593                                 repl: repl,
594                                 want: repl != nil && mnt.ReadOnly,
595                         })
596                 }
597         }
598
599         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
600         srvRendezvous := make(map[*KeepService]int, len(uuids))
601         for i, uuid := range uuids {
602                 srv := bal.KeepServices[uuid]
603                 srvRendezvous[srv] = i
604         }
605
606         // Below we set underreplicated=true if we find any storage
607         // class that's currently underreplicated -- in that case we
608         // won't want to trash any replicas.
609         underreplicated := false
610
611         unsafeToDelete := make(map[int64]bool, len(slots))
612         for _, class := range bal.classes {
613                 desired := blk.Desired[class]
614                 if desired == 0 {
615                         continue
616                 }
617
618                 // Sort the slots by desirability.
619                 sort.Slice(slots, func(i, j int) bool {
620                         si, sj := slots[i], slots[j]
621                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
622                                 // Prefer a mount that satisfies the
623                                 // desired class.
624                                 return bal.mountsByClass[class][si.mnt]
625                         } else if si.want != sj.want {
626                                 // Prefer a mount that will have a
627                                 // replica no matter what we do here
628                                 // -- either because it already has an
629                                 // untrashable replica, or because we
630                                 // already need it to satisfy a
631                                 // different storage class.
632                                 return si.want
633                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
634                                 // Prefer a better rendezvous
635                                 // position.
636                                 return orderi < orderj
637                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
638                                 // Prefer a mount that already has a
639                                 // replica.
640                                 return repli
641                         } else {
642                                 // If pull/trash turns out to be
643                                 // needed, distribute the
644                                 // new/remaining replicas uniformly
645                                 // across qualifying mounts on a given
646                                 // server.
647                                 return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
648                         }
649                 })
650
651                 // Servers/mounts/devices (with or without existing
652                 // replicas) that are part of the best achievable
653                 // layout for this storage class.
654                 wantSrv := map[*KeepService]bool{}
655                 wantMnt := map[*KeepMount]bool{}
656                 wantDev := map[string]bool{}
657                 // Positions (with existing replicas) that have been
658                 // protected (via unsafeToDelete) to ensure we don't
659                 // reduce replication below desired level when
660                 // trashing replicas that aren't optimal positions for
661                 // any storage class.
662                 protMnt := map[*KeepMount]bool{}
663                 // Replication planned so far (corresponds to wantMnt).
664                 replWant := 0
665                 // Protected replication (corresponds to protMnt).
666                 replProt := 0
667
668                 // trySlot tries using a slot to meet requirements,
669                 // and returns true if all requirements are met.
670                 trySlot := func(i int) bool {
671                         slot := slots[i]
672                         if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
673                                 // Already allocated a replica to this
674                                 // backend device, possibly on a
675                                 // different server.
676                                 return false
677                         }
678                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
679                                 unsafeToDelete[slot.repl.Mtime] = true
680                                 protMnt[slot.mnt] = true
681                                 replProt += slot.mnt.Replication
682                         }
683                         if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
684                                 slots[i].want = true
685                                 wantSrv[slot.mnt.KeepService] = true
686                                 wantMnt[slot.mnt] = true
687                                 if slot.mnt.DeviceID != "" {
688                                         wantDev[slot.mnt.DeviceID] = true
689                                 }
690                                 replWant += slot.mnt.Replication
691                         }
692                         return replProt >= desired && replWant >= desired
693                 }
694
695                 // First try to achieve desired replication without
696                 // using the same server twice.
697                 done := false
698                 for i := 0; i < len(slots) && !done; i++ {
699                         if !wantSrv[slots[i].mnt.KeepService] {
700                                 done = trySlot(i)
701                         }
702                 }
703
704                 // If that didn't suffice, do another pass without the
705                 // "distinct services" restriction. (Achieving the
706                 // desired volume replication on fewer than the
707                 // desired number of services is better than
708                 // underreplicating.)
709                 for i := 0; i < len(slots) && !done; i++ {
710                         done = trySlot(i)
711                 }
712
713                 if !underreplicated {
714                         safe := 0
715                         for _, slot := range slots {
716                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
717                                         continue
718                                 }
719                                 if safe += slot.mnt.Replication; safe >= desired {
720                                         break
721                                 }
722                         }
723                         underreplicated = safe < desired
724                 }
725
726                 // Avoid deleting wanted replicas from devices that
727                 // are mounted on multiple servers -- even if they
728                 // haven't already been added to unsafeToDelete
729                 // because the servers report different Mtimes.
730                 for _, slot := range slots {
731                         if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
732                                 unsafeToDelete[slot.repl.Mtime] = true
733                         }
734                 }
735         }
736
737         // TODO: If multiple replicas are trashable, prefer the oldest
738         // replica that doesn't have a timestamp collision with
739         // others.
740
741         for i, slot := range slots {
742                 // Don't trash (1) any replicas of an underreplicated
743                 // block, even if they're in the wrong positions, or
744                 // (2) any replicas whose Mtimes are identical to
745                 // needed replicas (in case we're really seeing the
746                 // same copy via different mounts).
747                 if slot.repl != nil && (underreplicated || unsafeToDelete[slot.repl.Mtime]) {
748                         slots[i].want = true
749                 }
750         }
751
752         classState := make(map[string]balancedBlockState, len(bal.classes))
753         for _, class := range bal.classes {
754                 classState[class] = computeBlockState(slots, bal.mountsByClass[class], len(blk.Replicas), blk.Desired[class])
755         }
756         blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
757
758         var lost bool
759         var changes []string
760         for _, slot := range slots {
761                 // TODO: request a Touch if Mtime is duplicated.
762                 var change int
763                 switch {
764                 case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
765                         slot.mnt.KeepService.AddTrash(Trash{
766                                 SizedDigest: blkid,
767                                 Mtime:       slot.repl.Mtime,
768                                 From:        slot.mnt,
769                         })
770                         change = changeTrash
771                 case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
772                         lost = true
773                         change = changeNone
774                 case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
775                         slot.mnt.KeepService.AddPull(Pull{
776                                 SizedDigest: blkid,
777                                 From:        blk.Replicas[0].KeepMount.KeepService,
778                                 To:          slot.mnt,
779                         })
780                         change = changePull
781                 case slot.repl != nil:
782                         change = changeStay
783                 default:
784                         change = changeNone
785                 }
786                 if bal.Dumper != nil {
787                         var mtime int64
788                         if slot.repl != nil {
789                                 mtime = slot.repl.Mtime
790                         }
791                         srv := slot.mnt.KeepService
792                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
793                 }
794         }
795         if bal.Dumper != nil {
796                 bal.Dumper.Printf("%s refs=%d needed=%d unneeded=%d pulling=%v %v %v", blkid, blk.RefCount, blockState.needed, blockState.unneeded, blockState.pulling, blk.Desired, changes)
797         }
798         return balanceResult{
799                 blk:        blk,
800                 blkid:      blkid,
801                 lost:       lost,
802                 blockState: blockState,
803                 classState: classState,
804         }
805 }
806
807 func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRepl int) (bbs balancedBlockState) {
808         repl := 0
809         countedDev := map[string]bool{}
810         for _, slot := range slots {
811                 if onlyCount != nil && !onlyCount[slot.mnt] {
812                         continue
813                 }
814                 if countedDev[slot.mnt.DeviceID] {
815                         continue
816                 }
817                 switch {
818                 case slot.repl != nil && slot.want:
819                         bbs.needed++
820                         repl += slot.mnt.Replication
821                 case slot.repl != nil && !slot.want:
822                         bbs.unneeded++
823                         repl += slot.mnt.Replication
824                 case slot.repl == nil && slot.want && have > 0:
825                         bbs.pulling++
826                         repl += slot.mnt.Replication
827                 }
828                 if slot.mnt.DeviceID != "" {
829                         countedDev[slot.mnt.DeviceID] = true
830                 }
831         }
832         if repl < needRepl {
833                 bbs.unachievable = true
834         }
835         return
836 }
837
838 type blocksNBytes struct {
839         replicas int
840         blocks   int
841         bytes    int64
842 }
843
844 func (bb blocksNBytes) String() string {
845         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
846 }
847
848 type replicationStats struct {
849         needed       blocksNBytes
850         unneeded     blocksNBytes
851         pulling      blocksNBytes
852         unachievable blocksNBytes
853 }
854
855 type balancerStats struct {
856         lost          blocksNBytes
857         overrep       blocksNBytes
858         unref         blocksNBytes
859         garbage       blocksNBytes
860         underrep      blocksNBytes
861         unachievable  blocksNBytes
862         justright     blocksNBytes
863         desired       blocksNBytes
864         current       blocksNBytes
865         pulls         int
866         trashes       int
867         replHistogram []int
868         classStats    map[string]replicationStats
869
870         // collectionBytes / collectionBlockBytes = deduplication ratio
871         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
872         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
873         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
874         collectionBlocks     int64 // number of blocks referenced by any collection
875 }
876
877 func (s *balancerStats) dedupByteRatio() float64 {
878         if s.collectionBlockBytes == 0 {
879                 return 0
880         }
881         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
882 }
883
884 func (s *balancerStats) dedupBlockRatio() float64 {
885         if s.collectionBlocks == 0 {
886                 return 0
887         }
888         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
889 }
890
891 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
892         var s balancerStats
893         s.replHistogram = make([]int, 2)
894         s.classStats = make(map[string]replicationStats, len(bal.classes))
895         for result := range results {
896                 bytes := result.blkid.Size()
897
898                 if rc := int64(result.blk.RefCount); rc > 0 {
899                         s.collectionBytes += rc * bytes
900                         s.collectionBlockBytes += bytes
901                         s.collectionBlockRefs += rc
902                         s.collectionBlocks++
903                 }
904
905                 for class, state := range result.classState {
906                         cs := s.classStats[class]
907                         if state.unachievable {
908                                 cs.unachievable.replicas++
909                                 cs.unachievable.blocks++
910                                 cs.unachievable.bytes += bytes
911                         }
912                         if state.needed > 0 {
913                                 cs.needed.replicas += state.needed
914                                 cs.needed.blocks++
915                                 cs.needed.bytes += bytes * int64(state.needed)
916                         }
917                         if state.unneeded > 0 {
918                                 cs.unneeded.replicas += state.unneeded
919                                 cs.unneeded.blocks++
920                                 cs.unneeded.bytes += bytes * int64(state.unneeded)
921                         }
922                         if state.pulling > 0 {
923                                 cs.pulling.replicas += state.pulling
924                                 cs.pulling.blocks++
925                                 cs.pulling.bytes += bytes * int64(state.pulling)
926                         }
927                         s.classStats[class] = cs
928                 }
929
930                 bs := result.blockState
931                 switch {
932                 case result.lost:
933                         s.lost.replicas++
934                         s.lost.blocks++
935                         s.lost.bytes += bytes
936                         fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
937                         for pdh := range result.blk.Refs {
938                                 fmt.Fprintf(bal.lostBlocks, " %s", pdh)
939                         }
940                         fmt.Fprint(bal.lostBlocks, "\n")
941                 case bs.pulling > 0:
942                         s.underrep.replicas += bs.pulling
943                         s.underrep.blocks++
944                         s.underrep.bytes += bytes * int64(bs.pulling)
945                 case bs.unachievable:
946                         s.underrep.replicas++
947                         s.underrep.blocks++
948                         s.underrep.bytes += bytes
949                 case bs.unneeded > 0 && bs.needed == 0:
950                         // Count as "garbage" if all replicas are old
951                         // enough to trash, otherwise count as
952                         // "unref".
953                         counter := &s.garbage
954                         for _, r := range result.blk.Replicas {
955                                 if r.Mtime >= bal.MinMtime {
956                                         counter = &s.unref
957                                         break
958                                 }
959                         }
960                         counter.replicas += bs.unneeded
961                         counter.blocks++
962                         counter.bytes += bytes * int64(bs.unneeded)
963                 case bs.unneeded > 0:
964                         s.overrep.replicas += bs.unneeded
965                         s.overrep.blocks++
966                         s.overrep.bytes += bytes * int64(bs.unneeded)
967                 default:
968                         s.justright.replicas += bs.needed
969                         s.justright.blocks++
970                         s.justright.bytes += bytes * int64(bs.needed)
971                 }
972
973                 if bs.needed > 0 {
974                         s.desired.replicas += bs.needed
975                         s.desired.blocks++
976                         s.desired.bytes += bytes * int64(bs.needed)
977                 }
978                 if bs.needed+bs.unneeded > 0 {
979                         s.current.replicas += bs.needed + bs.unneeded
980                         s.current.blocks++
981                         s.current.bytes += bytes * int64(bs.needed+bs.unneeded)
982                 }
983
984                 for len(s.replHistogram) <= bs.needed+bs.unneeded {
985                         s.replHistogram = append(s.replHistogram, 0)
986                 }
987                 s.replHistogram[bs.needed+bs.unneeded]++
988         }
989         for _, srv := range bal.KeepServices {
990                 s.pulls += len(srv.ChangeSet.Pulls)
991                 s.trashes += len(srv.ChangeSet.Trashes)
992         }
993         bal.stats = s
994         bal.Metrics.UpdateStats(s)
995 }
996
997 // PrintStatistics writes statistics about the computed changes to
998 // bal.Logger. It should not be called until ComputeChangeSets has
999 // finished.
1000 func (bal *Balancer) PrintStatistics() {
1001         bal.logf("===")
1002         bal.logf("%s lost (0=have<want)", bal.stats.lost)
1003         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
1004         bal.logf("%s just right (have=want)", bal.stats.justright)
1005         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
1006         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
1007         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
1008         for _, class := range bal.classes {
1009                 cs := bal.stats.classStats[class]
1010                 bal.logf("===")
1011                 bal.logf("storage class %q: %s needed", class, cs.needed)
1012                 bal.logf("storage class %q: %s unneeded", class, cs.unneeded)
1013                 bal.logf("storage class %q: %s pulling", class, cs.pulling)
1014                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
1015         }
1016         bal.logf("===")
1017         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
1018         bal.logf("%s total usage", bal.stats.current)
1019         bal.logf("===")
1020         for _, srv := range bal.KeepServices {
1021                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1022         }
1023         bal.logf("===")
1024         bal.printHistogram(60)
1025         bal.logf("===")
1026 }
1027
1028 func (bal *Balancer) printHistogram(hashColumns int) {
1029         bal.logf("Replication level distribution:")
1030         maxCount := 0
1031         for _, count := range bal.stats.replHistogram {
1032                 if maxCount < count {
1033                         maxCount = count
1034                 }
1035         }
1036         hashes := strings.Repeat("#", hashColumns)
1037         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1038         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1039         for repl, count := range bal.stats.replHistogram {
1040                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1041                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1042         }
1043 }
1044
1045 // CheckSanityLate checks for configuration and runtime errors after
1046 // GetCurrentState() and ComputeChangeSets() have finished.
1047 //
1048 // If it returns an error, it is dangerous to run any Commit methods.
1049 func (bal *Balancer) CheckSanityLate() error {
1050         if bal.errors != nil {
1051                 for _, err := range bal.errors {
1052                         bal.logf("deferred error: %v", err)
1053                 }
1054                 return fmt.Errorf("cannot proceed safely after deferred errors")
1055         }
1056
1057         if bal.collScanned == 0 {
1058                 return fmt.Errorf("received zero collections")
1059         }
1060
1061         anyDesired := false
1062         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1063                 for _, desired := range blk.Desired {
1064                         if desired > 0 {
1065                                 anyDesired = true
1066                                 break
1067                         }
1068                 }
1069         })
1070         if !anyDesired {
1071                 return fmt.Errorf("zero blocks have desired replication>0")
1072         }
1073
1074         if dr := bal.DefaultReplication; dr < 1 {
1075                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1076         }
1077
1078         // TODO: no two services have identical indexes
1079         // TODO: no collisions (same md5, different size)
1080         return nil
1081 }
1082
1083 // CommitPulls sends the computed lists of pull requests to the
1084 // keepstore servers. This has the effect of increasing replication of
1085 // existing blocks that are either underreplicated or poorly
1086 // distributed according to rendezvous hashing.
1087 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
1088         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1089         return bal.commitAsync(c, "send pull list",
1090                 func(srv *KeepService) error {
1091                         return srv.CommitPulls(c)
1092                 })
1093 }
1094
1095 // CommitTrash sends the computed lists of trash requests to the
1096 // keepstore servers. This has the effect of deleting blocks that are
1097 // overreplicated or unreferenced.
1098 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
1099         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1100         return bal.commitAsync(c, "send trash list",
1101                 func(srv *KeepService) error {
1102                         return srv.CommitTrash(c)
1103                 })
1104 }
1105
1106 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1107         errs := make(chan error)
1108         for _, srv := range bal.KeepServices {
1109                 go func(srv *KeepService) {
1110                         var err error
1111                         defer func() { errs <- err }()
1112                         label := fmt.Sprintf("%s: %v", srv, label)
1113                         err = f(srv)
1114                         if err != nil {
1115                                 err = fmt.Errorf("%s: %v", label, err)
1116                         }
1117                 }(srv)
1118         }
1119         var lastErr error
1120         for range bal.KeepServices {
1121                 if err := <-errs; err != nil {
1122                         bal.logf("%v", err)
1123                         lastErr = err
1124                 }
1125         }
1126         close(errs)
1127         return lastErr
1128 }
1129
1130 func (bal *Balancer) logf(f string, args ...interface{}) {
1131         if bal.Logger != nil {
1132                 bal.Logger.Printf(f, args...)
1133         }
1134 }
1135
1136 func (bal *Balancer) time(name, help string) func() {
1137         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1138         t0 := time.Now()
1139         bal.Logger.Printf("%s: start", name)
1140         return func() {
1141                 dur := time.Since(t0)
1142                 observer.Observe(dur.Seconds())
1143                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1144         }
1145 }
1146
1147 // Rendezvous hash sort function. Less efficient than sorting on
1148 // precomputed rendezvous hashes, but also rarely used.
1149 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1150         a := md5.Sum([]byte(string(blkid[:32]) + i))
1151         b := md5.Sum([]byte(string(blkid[:32]) + j))
1152         return bytes.Compare(a[:], b[:]) < 0
1153 }