21907: Rename s3SecretCacheTidyInterval
[arvados.git] / services / keep-balance / balance.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepbalance
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "log"
16         "math"
17         "os"
18         "regexp"
19         "runtime"
20         "sort"
21         "strconv"
22         "strings"
23         "sync"
24         "sync/atomic"
25         "syscall"
26         "time"
27
28         "git.arvados.org/arvados.git/lib/controller/dblock"
29         "git.arvados.org/arvados.git/sdk/go/arvados"
30         "git.arvados.org/arvados.git/sdk/go/keepclient"
31         "github.com/jmoiron/sqlx"
32         "github.com/sirupsen/logrus"
33 )
34
35 // Balancer compares the contents of keepstore servers with the
36 // collections stored in Arvados, and issues pull/trash requests
37 // needed to get (closer to) the optimal data layout.
38 //
39 // In the optimal data layout: every data block referenced by a
40 // collection is replicated at least as many times as desired by the
41 // collection; there are no unreferenced data blocks older than
42 // BlobSignatureTTL; and all N existing replicas of a given data block
43 // are in the N best positions in rendezvous probe order.
44 type Balancer struct {
45         DB      *sqlx.DB
46         Logger  logrus.FieldLogger
47         Dumper  logrus.FieldLogger
48         Metrics *metrics
49
50         ChunkPrefix    string
51         LostBlocksFile string
52
53         *BlockStateMap
54         KeepServices       map[string]*KeepService
55         DefaultReplication int
56         MinMtime           int64
57
58         classes       []string
59         mounts        int
60         mountsByClass map[string]map[*KeepMount]bool
61         collScanned   int64
62         serviceRoots  map[string]string
63         errors        []error
64         stats         balancerStats
65         mutex         sync.Mutex
66         lostBlocks    io.Writer
67 }
68
69 // Run performs a balance operation using the given config and
70 // runOptions, and returns RunOptions suitable for passing to a
71 // subsequent balance operation.
72 //
73 // Run should only be called once on a given Balancer object.
74 func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
75         nextRunOptions = runOptions
76
77         bal.logf("acquiring active lock")
78         if !dblock.KeepBalanceActive.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return bal.DB, nil }) {
79                 // context canceled
80                 return
81         }
82         defer dblock.KeepBalanceActive.Unlock()
83
84         defer bal.time("sweep", "wall clock time to run one full sweep")()
85
86         ctx, cancel := context.WithDeadline(ctx, time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
87         defer cancel()
88
89         go bal.reportMemorySize(ctx)
90
91         var lbFile *os.File
92         if bal.LostBlocksFile != "" {
93                 tmpfn := bal.LostBlocksFile + ".tmp"
94                 lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
95                 if err != nil {
96                         return
97                 }
98                 defer lbFile.Close()
99                 err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
100                 if err != nil {
101                         return
102                 }
103                 defer func() {
104                         // Remove the tempfile only if we didn't get
105                         // as far as successfully renaming it.
106                         if lbFile != nil {
107                                 os.Remove(tmpfn)
108                         }
109                 }()
110                 bal.lostBlocks = lbFile
111         } else {
112                 bal.lostBlocks = ioutil.Discard
113         }
114
115         err = bal.DiscoverKeepServices(client)
116         if err != nil {
117                 return
118         }
119
120         for _, srv := range bal.KeepServices {
121                 err = srv.discoverMounts(client)
122                 if err != nil {
123                         return
124                 }
125         }
126         bal.cleanupMounts()
127
128         if err = bal.CheckSanityEarly(client); err != nil {
129                 return
130         }
131
132         // On a big site, indexing and sending trash/pull lists can
133         // take much longer than the usual 5 minute client
134         // timeout. From here on, we rely on the context deadline
135         // instead, aborting the entire operation if any part takes
136         // too long.
137         client.Timeout = 0
138
139         rs := bal.rendezvousState()
140         if cluster.Collections.BalanceTrashLimit > 0 && rs != runOptions.SafeRendezvousState {
141                 if runOptions.SafeRendezvousState != "" {
142                         bal.logf("notice: KeepServices list has changed since last run")
143                 }
144                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
145                 if err = bal.ClearTrashLists(ctx, client); err != nil {
146                         return
147                 }
148                 // The current rendezvous state becomes "safe" (i.e.,
149                 // OK to compute changes for that state without
150                 // clearing existing trash lists) only now, after we
151                 // succeed in clearing existing trash lists.
152                 nextRunOptions.SafeRendezvousState = rs
153         }
154
155         if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
156                 return
157         }
158         bal.setupLookupTables(cluster)
159         bal.ComputeChangeSets()
160         bal.PrintStatistics()
161         if err = bal.CheckSanityLate(); err != nil {
162                 return
163         }
164         if lbFile != nil {
165                 err = lbFile.Sync()
166                 if err != nil {
167                         return
168                 }
169                 err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
170                 if err != nil {
171                         return
172                 }
173                 lbFile = nil
174         }
175         if cluster.Collections.BalancePullLimit > 0 {
176                 err = bal.CommitPulls(ctx, client)
177                 if err != nil {
178                         // Skip trash if we can't pull. (Too cautious?)
179                         return
180                 }
181         }
182         if cluster.Collections.BalanceTrashLimit > 0 {
183                 err = bal.CommitTrash(ctx, client)
184                 if err != nil {
185                         return
186                 }
187         }
188         if runOptions.CommitConfirmedFields {
189                 err = bal.updateCollections(ctx, client, cluster)
190                 if err != nil {
191                         return
192                 }
193         }
194         return
195 }
196
197 // SetKeepServices sets the list of KeepServices to operate on.
198 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
199         bal.KeepServices = make(map[string]*KeepService)
200         for _, srv := range srvList.Items {
201                 bal.KeepServices[srv.UUID] = &KeepService{
202                         KeepService: srv,
203                         ChangeSet:   &ChangeSet{},
204                 }
205         }
206         return nil
207 }
208
209 // DiscoverKeepServices sets the list of KeepServices by calling the
210 // API to get a list of all services, and selecting the ones whose
211 // ServiceType is "disk"
212 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
213         bal.KeepServices = make(map[string]*KeepService)
214         return c.EachKeepService(func(srv arvados.KeepService) error {
215                 if srv.ServiceType == "disk" {
216                         bal.KeepServices[srv.UUID] = &KeepService{
217                                 KeepService: srv,
218                                 ChangeSet:   &ChangeSet{},
219                         }
220                 } else {
221                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
222                 }
223                 return nil
224         })
225 }
226
227 func (bal *Balancer) cleanupMounts() {
228         rwdev := map[string]*KeepService{}
229         for _, srv := range bal.KeepServices {
230                 for _, mnt := range srv.mounts {
231                         if mnt.AllowWrite {
232                                 rwdev[mnt.UUID] = srv
233                         }
234                 }
235         }
236         // Drop the readonly mounts whose device is mounted RW
237         // elsewhere.
238         for _, srv := range bal.KeepServices {
239                 var dedup []*KeepMount
240                 for _, mnt := range srv.mounts {
241                         if !mnt.AllowWrite && rwdev[mnt.UUID] != nil {
242                                 bal.logf("skipping srv %s readonly mount %q because same volume is mounted read-write on srv %s", srv, mnt.UUID, rwdev[mnt.UUID])
243                         } else {
244                                 dedup = append(dedup, mnt)
245                         }
246                 }
247                 srv.mounts = dedup
248         }
249         for _, srv := range bal.KeepServices {
250                 for _, mnt := range srv.mounts {
251                         if mnt.Replication <= 0 {
252                                 log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
253                                 mnt.Replication = 1
254                         }
255                 }
256         }
257 }
258
259 // CheckSanityEarly checks for configuration and runtime errors that
260 // can be detected before GetCurrentState() and ComputeChangeSets()
261 // are called.
262 //
263 // If it returns an error, it is pointless to run GetCurrentState or
264 // ComputeChangeSets: after doing so, the statistics would be
265 // meaningless and it would be dangerous to run any Commit methods.
266 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
267         u, err := c.CurrentUser()
268         if err != nil {
269                 return fmt.Errorf("CurrentUser(): %v", err)
270         }
271         if !u.IsActive || !u.IsAdmin {
272                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
273         }
274         for _, srv := range bal.KeepServices {
275                 if srv.ServiceType == "proxy" {
276                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
277                 }
278         }
279         for _, c := range bal.ChunkPrefix {
280                 if !strings.ContainsRune("0123456789abcdef", c) {
281                         return fmt.Errorf("invalid char %q in chunk prefix %q: only lowercase hex digits make sense", string(c), bal.ChunkPrefix)
282                 }
283         }
284         if len(bal.ChunkPrefix) > 32 {
285                 return fmt.Errorf("invalid chunk prefix %q: longer than a block hash", bal.ChunkPrefix)
286         }
287
288         mountProblem := false
289         type deviceMount struct {
290                 srv *KeepService
291                 mnt *KeepMount
292         }
293         deviceMounted := map[string]deviceMount{} // DeviceID -> mount
294         for _, srv := range bal.KeepServices {
295                 for _, mnt := range srv.mounts {
296                         if first, dup := deviceMounted[mnt.DeviceID]; dup && first.mnt.UUID != mnt.UUID && mnt.DeviceID != "" {
297                                 bal.logf("config error: device %s is mounted with multiple volume UUIDs: %s on %s, and %s on %s",
298                                         mnt.DeviceID,
299                                         first.mnt.UUID, first.srv,
300                                         mnt.UUID, srv)
301                                 mountProblem = true
302                                 continue
303                         }
304                         deviceMounted[mnt.DeviceID] = deviceMount{srv, mnt}
305                 }
306         }
307         if mountProblem {
308                 return errors.New("cannot continue with config errors (see above)")
309         }
310
311         var checkPage arvados.CollectionList
312         if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
313                 Limit:              new(int),
314                 Count:              "exact",
315                 IncludeTrash:       true,
316                 IncludeOldVersions: true,
317                 Filters: []arvados.Filter{{
318                         Attr:     "modified_at",
319                         Operator: "=",
320                         Operand:  nil,
321                 }},
322         }); err != nil {
323                 return err
324         } else if n := checkPage.ItemsAvailable; n > 0 {
325                 return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
326         }
327
328         return nil
329 }
330
331 // rendezvousState returns a fingerprint (e.g., a sorted list of
332 // UUID+host+port) of the current set of keep services.
333 func (bal *Balancer) rendezvousState() string {
334         srvs := make([]string, 0, len(bal.KeepServices))
335         for _, srv := range bal.KeepServices {
336                 srvs = append(srvs, srv.String())
337         }
338         sort.Strings(srvs)
339         return strings.Join(srvs, "; ")
340 }
341
342 // ClearTrashLists sends an empty trash list to each keep
343 // service. Calling this before GetCurrentState avoids races.
344 //
345 // When a block appears in an index, we assume that replica will still
346 // exist after we delete other replicas on other servers. However,
347 // it's possible that a previous rebalancing operation made different
348 // decisions (e.g., servers were added/removed, and rendezvous order
349 // changed). In this case, the replica might already be on that
350 // server's trash list, and it might be deleted before we send a
351 // replacement trash list.
352 //
353 // We avoid this problem if we clear all trash lists before getting
354 // indexes. (We also assume there is only one rebalancing process
355 // running at a time.)
356 func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
357         for _, srv := range bal.KeepServices {
358                 srv.ChangeSet = &ChangeSet{}
359         }
360         return bal.CommitTrash(ctx, c)
361 }
362
363 // GetCurrentState determines the current replication state, and the
364 // desired replication level, for every block that is either
365 // retrievable or referenced.
366 //
367 // It determines the current replication state by reading the block index
368 // from every known Keep service.
369 //
370 // It determines the desired replication level by retrieving all
371 // collection manifests in the database (API server).
372 //
373 // It encodes the resulting information in BlockStateMap.
374 func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
375         ctx, cancel := context.WithCancel(ctx)
376         defer cancel()
377
378         defer bal.time("get_state", "wall clock time to get current state")()
379         bal.BlockStateMap = NewBlockStateMap()
380
381         dd, err := c.DiscoveryDocument()
382         if err != nil {
383                 return err
384         }
385         bal.DefaultReplication = dd.DefaultCollectionReplication
386         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
387
388         errs := make(chan error, 1)
389         wg := sync.WaitGroup{}
390
391         // When a device is mounted more than once, we will get its
392         // index only once, and call AddReplicas on all of the mounts.
393         // equivMount keys are the mounts that will be indexed, and
394         // each value is a list of mounts to apply the received index
395         // to.
396         equivMount := map[*KeepMount][]*KeepMount{}
397         // deviceMount maps each device ID to the one mount that will
398         // be indexed for that device.
399         deviceMount := map[string]*KeepMount{}
400         for _, srv := range bal.KeepServices {
401                 for _, mnt := range srv.mounts {
402                         equiv := deviceMount[mnt.UUID]
403                         if equiv == nil {
404                                 equiv = mnt
405                                 deviceMount[mnt.UUID] = equiv
406                         }
407                         equivMount[equiv] = append(equivMount[equiv], mnt)
408                 }
409         }
410
411         // Start one goroutine for each (non-redundant) mount:
412         // retrieve the index, and add the returned blocks to
413         // BlockStateMap.
414         for _, mounts := range equivMount {
415                 wg.Add(1)
416                 go func(mounts []*KeepMount) {
417                         defer wg.Done()
418                         bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
419                         idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, bal.ChunkPrefix)
420                         if err != nil {
421                                 select {
422                                 case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
423                                 default:
424                                 }
425                                 cancel()
426                                 return
427                         }
428                         if len(errs) > 0 {
429                                 // Some other goroutine encountered an
430                                 // error -- any further effort here
431                                 // will be wasted.
432                                 return
433                         }
434                         for _, mount := range mounts {
435                                 bal.logf("%s: add %d entries to map", mount, len(idx))
436                                 bal.BlockStateMap.AddReplicas(mount, idx)
437                                 bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
438                         }
439                         bal.logf("mount %s: index done", mounts[0])
440                 }(mounts)
441         }
442
443         collQ := make(chan arvados.Collection, bufs)
444
445         // Retrieve all collections from the database and send them to
446         // collQ.
447         wg.Add(1)
448         go func() {
449                 defer wg.Done()
450                 err = EachCollection(ctx, bal.DB, c,
451                         func(coll arvados.Collection) error {
452                                 collQ <- coll
453                                 if len(errs) > 0 {
454                                         // some other GetCurrentState
455                                         // error happened: no point
456                                         // getting any more
457                                         // collections.
458                                         return fmt.Errorf("")
459                                 }
460                                 return nil
461                         }, func(done, total int) {
462                                 bal.logf("collections: %d/%d", done, total)
463                         })
464                 close(collQ)
465                 if err != nil {
466                         select {
467                         case errs <- err:
468                         default:
469                         }
470                         cancel()
471                 }
472         }()
473
474         // Parse manifests from collQ and pass the block hashes to
475         // BlockStateMap to track desired replication.
476         for i := 0; i < runtime.NumCPU(); i++ {
477                 wg.Add(1)
478                 go func() {
479                         defer wg.Done()
480                         for coll := range collQ {
481                                 err := bal.addCollection(coll)
482                                 if err != nil || len(errs) > 0 {
483                                         select {
484                                         case errs <- err:
485                                         default:
486                                         }
487                                         cancel()
488                                         continue
489                                 }
490                                 atomic.AddInt64(&bal.collScanned, 1)
491                         }
492                 }()
493         }
494
495         wg.Wait()
496         if len(errs) > 0 {
497                 return <-errs
498         }
499         return nil
500 }
501
502 func (bal *Balancer) addCollection(coll arvados.Collection) error {
503         blkids, err := coll.SizedDigests()
504         if err != nil {
505                 return fmt.Errorf("%v: %v", coll.UUID, err)
506         }
507         repl := bal.DefaultReplication
508         if coll.ReplicationDesired != nil {
509                 repl = *coll.ReplicationDesired
510         }
511         if bal.ChunkPrefix != "" {
512                 // Throw out blocks that don't match the requested
513                 // prefix.  (We save a bit of GC work here by
514                 // preallocating based on each hex digit in
515                 // ChunkPrefix reducing the expected size of the
516                 // filtered set by ~16x.)
517                 filtered := make([]arvados.SizedDigest, 0, len(blkids)>>(4*len(bal.ChunkPrefix)-1))
518                 for _, blkid := range blkids {
519                         if strings.HasPrefix(string(blkid), bal.ChunkPrefix) {
520                                 filtered = append(filtered, blkid)
521                         }
522                 }
523                 blkids = filtered
524         }
525         bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
526         // Pass pdh to IncreaseDesired only if LostBlocksFile is being
527         // written -- otherwise it's just a waste of memory.
528         pdh := ""
529         if bal.LostBlocksFile != "" {
530                 pdh = coll.PortableDataHash
531         }
532         bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
533         return nil
534 }
535
536 // ComputeChangeSets compares, for each known block, the current and
537 // desired replication states. If it is possible to get closer to the
538 // desired state by copying or deleting blocks, it adds those changes
539 // to the relevant KeepServices' ChangeSets.
540 //
541 // It does not actually apply any of the computed changes.
542 func (bal *Balancer) ComputeChangeSets() {
543         // This just calls balanceBlock() once for each block, using a
544         // pool of worker goroutines.
545         defer bal.time("changeset_compute", "wall clock time to compute changesets")()
546
547         type balanceTask struct {
548                 blkid arvados.SizedDigest
549                 blk   *BlockState
550         }
551         workers := runtime.GOMAXPROCS(-1)
552         todo := make(chan balanceTask, workers)
553         go func() {
554                 bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
555                         todo <- balanceTask{
556                                 blkid: blkid,
557                                 blk:   blk,
558                         }
559                 })
560                 close(todo)
561         }()
562         results := make(chan balanceResult, workers)
563         go func() {
564                 var wg sync.WaitGroup
565                 for i := 0; i < workers; i++ {
566                         wg.Add(1)
567                         go func() {
568                                 for work := range todo {
569                                         results <- bal.balanceBlock(work.blkid, work.blk)
570                                 }
571                                 wg.Done()
572                         }()
573                 }
574                 wg.Wait()
575                 close(results)
576         }()
577         bal.collectStatistics(results)
578 }
579
580 func (bal *Balancer) setupLookupTables(cluster *arvados.Cluster) {
581         bal.serviceRoots = make(map[string]string)
582         bal.classes = defaultClasses
583         bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
584         bal.mounts = 0
585         for _, srv := range bal.KeepServices {
586                 bal.serviceRoots[srv.UUID] = srv.UUID
587                 for _, mnt := range srv.mounts {
588                         bal.mounts++
589
590                         if srv.ReadOnly {
591                                 // All mounts on a read-only service
592                                 // are effectively read-only.
593                                 mnt.AllowWrite = false
594                         }
595
596                         for class := range mnt.StorageClasses {
597                                 if mbc := bal.mountsByClass[class]; mbc == nil {
598                                         bal.classes = append(bal.classes, class)
599                                         bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
600                                 } else {
601                                         mbc[mnt] = true
602                                 }
603                         }
604                 }
605         }
606         // Consider classes in lexicographic order to avoid flapping
607         // between balancing runs.  The outcome of the "prefer a mount
608         // we're already planning to use for a different storage
609         // class" case in balanceBlock depends on the order classes
610         // are considered.
611         sort.Strings(bal.classes)
612
613         for _, srv := range bal.KeepServices {
614                 srv.ChangeSet = &ChangeSet{
615                         PullLimit:  cluster.Collections.BalancePullLimit,
616                         TrashLimit: cluster.Collections.BalanceTrashLimit,
617                 }
618         }
619 }
620
621 const (
622         changeStay = iota
623         changePull
624         changeTrash
625         changeNone
626 )
627
628 var changeName = map[int]string{
629         changeStay:  "stay",
630         changePull:  "pull",
631         changeTrash: "trash",
632         changeNone:  "none",
633 }
634
635 type balancedBlockState struct {
636         needed       int
637         unneeded     int
638         pulling      int
639         unachievable bool
640 }
641
642 type balanceResult struct {
643         blk        *BlockState
644         blkid      arvados.SizedDigest
645         lost       bool
646         blockState balancedBlockState
647         classState map[string]balancedBlockState
648 }
649
650 type slot struct {
651         mnt  *KeepMount // never nil
652         repl *Replica   // replica already stored here (or nil)
653         want bool       // we should pull/leave a replica here
654 }
655
656 // balanceBlock compares current state to desired state for a single
657 // block, and makes the appropriate ChangeSet calls.
658 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
659         bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
660
661         // Build a list of all slots (one per mounted volume).
662         slots := make([]slot, 0, bal.mounts)
663         for _, srv := range bal.KeepServices {
664                 for _, mnt := range srv.mounts {
665                         var repl *Replica
666                         for r := range blk.Replicas {
667                                 if blk.Replicas[r].KeepMount == mnt {
668                                         repl = &blk.Replicas[r]
669                                 }
670                         }
671                         // Initial value of "want" is "have, and can't
672                         // delete". These untrashable replicas get
673                         // prioritized when sorting slots: otherwise,
674                         // non-optimal readonly copies would cause us
675                         // to overreplicate.
676                         slots = append(slots, slot{
677                                 mnt:  mnt,
678                                 repl: repl,
679                                 want: repl != nil && !mnt.AllowTrash,
680                         })
681                 }
682         }
683
684         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
685         srvRendezvous := make(map[*KeepService]int, len(uuids))
686         for i, uuid := range uuids {
687                 srv := bal.KeepServices[uuid]
688                 srvRendezvous[srv] = i
689         }
690
691         // Below we set underreplicated=true if we find any storage
692         // class that's currently underreplicated -- in that case we
693         // won't want to trash any replicas.
694         underreplicated := false
695
696         unsafeToDelete := make(map[int64]bool, len(slots))
697         for _, class := range bal.classes {
698                 desired := blk.Desired[class]
699                 if desired == 0 {
700                         continue
701                 }
702
703                 // Sort the slots by desirability.
704                 sort.Slice(slots, func(i, j int) bool {
705                         si, sj := slots[i], slots[j]
706                         if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
707                                 // Prefer a mount that satisfies the
708                                 // desired class.
709                                 return bal.mountsByClass[class][si.mnt]
710                         } else if si.want != sj.want {
711                                 // Prefer a mount that will have a
712                                 // replica no matter what we do here
713                                 // -- either because it already has an
714                                 // untrashable replica, or because we
715                                 // already need it to satisfy a
716                                 // different storage class.
717                                 return si.want
718                         } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
719                                 // Prefer a better rendezvous
720                                 // position.
721                                 return orderi < orderj
722                         } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
723                                 // Prefer a mount that already has a
724                                 // replica.
725                                 return repli
726                         } else {
727                                 // If pull/trash turns out to be
728                                 // needed, distribute the
729                                 // new/remaining replicas uniformly
730                                 // across qualifying mounts on a given
731                                 // server.
732                                 return rendezvousLess(si.mnt.UUID, sj.mnt.UUID, blkid)
733                         }
734                 })
735
736                 // Servers/mounts/devices (with or without existing
737                 // replicas) that are part of the best achievable
738                 // layout for this storage class.
739                 wantSrv := map[*KeepService]bool{}
740                 wantMnt := map[*KeepMount]bool{}
741                 wantDev := map[string]bool{}
742                 // Positions (with existing replicas) that have been
743                 // protected (via unsafeToDelete) to ensure we don't
744                 // reduce replication below desired level when
745                 // trashing replicas that aren't optimal positions for
746                 // any storage class.
747                 protMnt := map[*KeepMount]bool{}
748                 // Replication planned so far (corresponds to wantMnt).
749                 replWant := 0
750                 // Protected replication (corresponds to protMnt).
751                 replProt := 0
752
753                 // trySlot tries using a slot to meet requirements,
754                 // and returns true if all requirements are met.
755                 trySlot := func(i int) bool {
756                         slot := slots[i]
757                         if wantMnt[slot.mnt] || wantDev[slot.mnt.UUID] {
758                                 // Already allocated a replica to this
759                                 // backend device, possibly on a
760                                 // different server.
761                                 return false
762                         }
763                         if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
764                                 unsafeToDelete[slot.repl.Mtime] = true
765                                 protMnt[slot.mnt] = true
766                                 replProt += slot.mnt.Replication
767                         }
768                         if replWant < desired && (slot.repl != nil || slot.mnt.AllowWrite) {
769                                 slots[i].want = true
770                                 wantSrv[slot.mnt.KeepService] = true
771                                 wantMnt[slot.mnt] = true
772                                 wantDev[slot.mnt.UUID] = true
773                                 replWant += slot.mnt.Replication
774                         }
775                         return replProt >= desired && replWant >= desired
776                 }
777
778                 // First try to achieve desired replication without
779                 // using the same server twice.
780                 done := false
781                 for i := 0; i < len(slots) && !done; i++ {
782                         if !wantSrv[slots[i].mnt.KeepService] {
783                                 done = trySlot(i)
784                         }
785                 }
786
787                 // If that didn't suffice, do another pass without the
788                 // "distinct services" restriction. (Achieving the
789                 // desired volume replication on fewer than the
790                 // desired number of services is better than
791                 // underreplicating.)
792                 for i := 0; i < len(slots) && !done; i++ {
793                         done = trySlot(i)
794                 }
795
796                 if !underreplicated {
797                         safe := 0
798                         for _, slot := range slots {
799                                 if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
800                                         continue
801                                 }
802                                 if safe += slot.mnt.Replication; safe >= desired {
803                                         break
804                                 }
805                         }
806                         underreplicated = safe < desired
807                 }
808
809                 // Avoid deleting wanted replicas from devices that
810                 // are mounted on multiple servers -- even if they
811                 // haven't already been added to unsafeToDelete
812                 // because the servers report different Mtimes.
813                 for _, slot := range slots {
814                         if slot.repl != nil && wantDev[slot.mnt.UUID] {
815                                 unsafeToDelete[slot.repl.Mtime] = true
816                         }
817                 }
818         }
819
820         // TODO: If multiple replicas are trashable, prefer the oldest
821         // replica that doesn't have a timestamp collision with
822         // others.
823
824         for i, slot := range slots {
825                 // Don't trash (1) any replicas of an underreplicated
826                 // block, even if they're in the wrong positions, or
827                 // (2) any replicas whose Mtimes are identical to
828                 // needed replicas (in case we're really seeing the
829                 // same copy via different mounts).
830                 if slot.repl != nil && (underreplicated || unsafeToDelete[slot.repl.Mtime]) {
831                         slots[i].want = true
832                 }
833         }
834
835         classState := make(map[string]balancedBlockState, len(bal.classes))
836         for _, class := range bal.classes {
837                 classState[class] = computeBlockState(slots, bal.mountsByClass[class], len(blk.Replicas), blk.Desired[class])
838         }
839         blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
840
841         // Sort the slots by rendezvous order. This ensures "trash the
842         // first of N replicas with identical timestamps" is
843         // predictable (helpful for testing) and well distributed
844         // across servers.
845         sort.Slice(slots, func(i, j int) bool {
846                 si, sj := slots[i], slots[j]
847                 if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
848                         return orderi < orderj
849                 } else {
850                         return rendezvousLess(si.mnt.UUID, sj.mnt.UUID, blkid)
851                 }
852         })
853
854         var (
855                 lost         bool
856                 changes      []string
857                 trashedMtime = make(map[int64]bool, len(slots))
858         )
859         for _, slot := range slots {
860                 // TODO: request a Touch if Mtime is duplicated.
861                 var change int
862                 switch {
863                 case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
864                         if trashedMtime[slot.repl.Mtime] {
865                                 // Don't trash multiple replicas with
866                                 // identical timestamps. If they are
867                                 // multiple views of the same backing
868                                 // storage, asking both servers to
869                                 // trash is redundant and can cause
870                                 // races (see #20242). If they are
871                                 // distinct replicas that happen to
872                                 // have identical timestamps, we'll
873                                 // get this one on the next sweep.
874                                 change = changeNone
875                         } else {
876                                 slot.mnt.KeepService.AddTrash(Trash{
877                                         SizedDigest: blkid,
878                                         Mtime:       slot.repl.Mtime,
879                                         From:        slot.mnt,
880                                 })
881                                 change = changeTrash
882                                 trashedMtime[slot.repl.Mtime] = true
883                         }
884                 case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
885                         lost = true
886                         change = changeNone
887                 case slot.repl == nil && slot.want && slot.mnt.AllowWrite:
888                         slot.mnt.KeepService.AddPull(Pull{
889                                 SizedDigest: blkid,
890                                 From:        blk.Replicas[0].KeepMount.KeepService,
891                                 To:          slot.mnt,
892                         })
893                         change = changePull
894                 case slot.repl != nil:
895                         change = changeStay
896                 default:
897                         change = changeNone
898                 }
899                 if bal.Dumper != nil {
900                         var mtime int64
901                         if slot.repl != nil {
902                                 mtime = slot.repl.Mtime
903                         }
904                         srv := slot.mnt.KeepService
905                         changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
906                 }
907         }
908         if bal.Dumper != nil {
909                 bal.Dumper.Printf("%s refs=%d needed=%d unneeded=%d pulling=%v %v %v", blkid, blk.RefCount, blockState.needed, blockState.unneeded, blockState.pulling, blk.Desired, changes)
910         }
911         return balanceResult{
912                 blk:        blk,
913                 blkid:      blkid,
914                 lost:       lost,
915                 blockState: blockState,
916                 classState: classState,
917         }
918 }
919
920 func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRepl int) (bbs balancedBlockState) {
921         repl := 0
922         countedDev := map[string]bool{}
923         for _, slot := range slots {
924                 if onlyCount != nil && !onlyCount[slot.mnt] {
925                         continue
926                 }
927                 if countedDev[slot.mnt.UUID] {
928                         continue
929                 }
930                 switch {
931                 case slot.repl != nil && slot.want:
932                         bbs.needed++
933                         repl += slot.mnt.Replication
934                 case slot.repl != nil && !slot.want:
935                         bbs.unneeded++
936                         repl += slot.mnt.Replication
937                 case slot.repl == nil && slot.want && have > 0:
938                         bbs.pulling++
939                         repl += slot.mnt.Replication
940                 }
941                 countedDev[slot.mnt.UUID] = true
942         }
943         if repl < needRepl {
944                 bbs.unachievable = true
945         }
946         return
947 }
948
949 type blocksNBytes struct {
950         replicas int
951         blocks   int
952         bytes    int64
953 }
954
955 func (bb blocksNBytes) String() string {
956         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
957 }
958
959 type replicationStats struct {
960         needed       blocksNBytes
961         unneeded     blocksNBytes
962         pulling      blocksNBytes
963         unachievable blocksNBytes
964 }
965
966 type balancerStats struct {
967         lost            blocksNBytes
968         overrep         blocksNBytes
969         unref           blocksNBytes
970         garbage         blocksNBytes
971         underrep        blocksNBytes
972         unachievable    blocksNBytes
973         justright       blocksNBytes
974         desired         blocksNBytes
975         current         blocksNBytes
976         pulls           int
977         pullsDeferred   int
978         trashes         int
979         trashesDeferred int
980         replHistogram   []int
981         classStats      map[string]replicationStats
982
983         // collectionBytes / collectionBlockBytes = deduplication ratio
984         collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
985         collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections
986         collectionBlockRefs  int64 // sum(number of blocks referenced) across all collections
987         collectionBlocks     int64 // number of blocks referenced by any collection
988 }
989
990 func (s *balancerStats) dedupByteRatio() float64 {
991         if s.collectionBlockBytes == 0 {
992                 return 0
993         }
994         return float64(s.collectionBytes) / float64(s.collectionBlockBytes)
995 }
996
997 func (s *balancerStats) dedupBlockRatio() float64 {
998         if s.collectionBlocks == 0 {
999                 return 0
1000         }
1001         return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
1002 }
1003
1004 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
1005         var s balancerStats
1006         s.replHistogram = make([]int, 2)
1007         s.classStats = make(map[string]replicationStats, len(bal.classes))
1008         for result := range results {
1009                 bytes := result.blkid.Size()
1010
1011                 if rc := int64(result.blk.RefCount); rc > 0 {
1012                         s.collectionBytes += rc * bytes
1013                         s.collectionBlockBytes += bytes
1014                         s.collectionBlockRefs += rc
1015                         s.collectionBlocks++
1016                 }
1017
1018                 for class, state := range result.classState {
1019                         cs := s.classStats[class]
1020                         if state.unachievable {
1021                                 cs.unachievable.replicas++
1022                                 cs.unachievable.blocks++
1023                                 cs.unachievable.bytes += bytes
1024                         }
1025                         if state.needed > 0 {
1026                                 cs.needed.replicas += state.needed
1027                                 cs.needed.blocks++
1028                                 cs.needed.bytes += bytes * int64(state.needed)
1029                         }
1030                         if state.unneeded > 0 {
1031                                 cs.unneeded.replicas += state.unneeded
1032                                 cs.unneeded.blocks++
1033                                 cs.unneeded.bytes += bytes * int64(state.unneeded)
1034                         }
1035                         if state.pulling > 0 {
1036                                 cs.pulling.replicas += state.pulling
1037                                 cs.pulling.blocks++
1038                                 cs.pulling.bytes += bytes * int64(state.pulling)
1039                         }
1040                         s.classStats[class] = cs
1041                 }
1042
1043                 bs := result.blockState
1044                 switch {
1045                 case result.lost:
1046                         s.lost.replicas++
1047                         s.lost.blocks++
1048                         s.lost.bytes += bytes
1049                         fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
1050                         for pdh := range result.blk.Refs {
1051                                 fmt.Fprintf(bal.lostBlocks, " %s", pdh)
1052                         }
1053                         fmt.Fprint(bal.lostBlocks, "\n")
1054                 case bs.pulling > 0:
1055                         s.underrep.replicas += bs.pulling
1056                         s.underrep.blocks++
1057                         s.underrep.bytes += bytes * int64(bs.pulling)
1058                 case bs.unachievable:
1059                         s.underrep.replicas++
1060                         s.underrep.blocks++
1061                         s.underrep.bytes += bytes
1062                 case bs.unneeded > 0 && bs.needed == 0:
1063                         // Count as "garbage" if all replicas are old
1064                         // enough to trash, otherwise count as
1065                         // "unref".
1066                         counter := &s.garbage
1067                         for _, r := range result.blk.Replicas {
1068                                 if r.Mtime >= bal.MinMtime {
1069                                         counter = &s.unref
1070                                         break
1071                                 }
1072                         }
1073                         counter.replicas += bs.unneeded
1074                         counter.blocks++
1075                         counter.bytes += bytes * int64(bs.unneeded)
1076                 case bs.unneeded > 0:
1077                         s.overrep.replicas += bs.unneeded
1078                         s.overrep.blocks++
1079                         s.overrep.bytes += bytes * int64(bs.unneeded)
1080                 default:
1081                         s.justright.replicas += bs.needed
1082                         s.justright.blocks++
1083                         s.justright.bytes += bytes * int64(bs.needed)
1084                 }
1085
1086                 if bs.needed > 0 {
1087                         s.desired.replicas += bs.needed
1088                         s.desired.blocks++
1089                         s.desired.bytes += bytes * int64(bs.needed)
1090                 }
1091                 if bs.needed+bs.unneeded > 0 {
1092                         s.current.replicas += bs.needed + bs.unneeded
1093                         s.current.blocks++
1094                         s.current.bytes += bytes * int64(bs.needed+bs.unneeded)
1095                 }
1096
1097                 for len(s.replHistogram) <= bs.needed+bs.unneeded {
1098                         s.replHistogram = append(s.replHistogram, 0)
1099                 }
1100                 s.replHistogram[bs.needed+bs.unneeded]++
1101         }
1102         for _, srv := range bal.KeepServices {
1103                 s.pulls += len(srv.ChangeSet.Pulls)
1104                 s.pullsDeferred += srv.ChangeSet.PullsDeferred
1105                 s.trashes += len(srv.ChangeSet.Trashes)
1106                 s.trashesDeferred += srv.ChangeSet.TrashesDeferred
1107         }
1108         bal.stats = s
1109         bal.Metrics.UpdateStats(s)
1110 }
1111
1112 // PrintStatistics writes statistics about the computed changes to
1113 // bal.Logger. It should not be called until ComputeChangeSets has
1114 // finished.
1115 func (bal *Balancer) PrintStatistics() {
1116         bal.logf("===")
1117         bal.logf("%s lost (0=have<want)", bal.stats.lost)
1118         bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
1119         bal.logf("%s just right (have=want)", bal.stats.justright)
1120         bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
1121         bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
1122         bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
1123         for _, class := range bal.classes {
1124                 cs := bal.stats.classStats[class]
1125                 bal.logf("===")
1126                 bal.logf("storage class %q: %s needed", class, cs.needed)
1127                 bal.logf("storage class %q: %s unneeded", class, cs.unneeded)
1128                 bal.logf("storage class %q: %s pulling", class, cs.pulling)
1129                 bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
1130         }
1131         bal.logf("===")
1132         bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
1133         bal.logf("%s total usage", bal.stats.current)
1134         bal.logf("===")
1135         for _, srv := range bal.KeepServices {
1136                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
1137         }
1138         bal.logf("===")
1139         bal.printHistogram(60)
1140         bal.logf("===")
1141 }
1142
1143 func (bal *Balancer) printHistogram(hashColumns int) {
1144         bal.logf("Replication level distribution:")
1145         maxCount := 0
1146         for _, count := range bal.stats.replHistogram {
1147                 if maxCount < count {
1148                         maxCount = count
1149                 }
1150         }
1151         hashes := strings.Repeat("#", hashColumns)
1152         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
1153         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
1154         for repl, count := range bal.stats.replHistogram {
1155                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
1156                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
1157         }
1158 }
1159
1160 // CheckSanityLate checks for configuration and runtime errors after
1161 // GetCurrentState() and ComputeChangeSets() have finished.
1162 //
1163 // If it returns an error, it is dangerous to run any Commit methods.
1164 func (bal *Balancer) CheckSanityLate() error {
1165         if bal.errors != nil {
1166                 for _, err := range bal.errors {
1167                         bal.logf("deferred error: %v", err)
1168                 }
1169                 return fmt.Errorf("cannot proceed safely after deferred errors")
1170         }
1171
1172         if bal.collScanned == 0 {
1173                 return fmt.Errorf("received zero collections")
1174         }
1175
1176         anyDesired := false
1177         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
1178                 for _, desired := range blk.Desired {
1179                         if desired > 0 {
1180                                 anyDesired = true
1181                                 break
1182                         }
1183                 }
1184         })
1185         if !anyDesired {
1186                 return fmt.Errorf("zero blocks have desired replication>0")
1187         }
1188
1189         if dr := bal.DefaultReplication; dr < 1 {
1190                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
1191         }
1192
1193         // TODO: no two services have identical indexes
1194         // TODO: no collisions (same md5, different size)
1195         return nil
1196 }
1197
1198 // CommitPulls sends the computed lists of pull requests to the
1199 // keepstore servers. This has the effect of increasing replication of
1200 // existing blocks that are either underreplicated or poorly
1201 // distributed according to rendezvous hashing.
1202 func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
1203         defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
1204         return bal.commitAsync(c, "send pull list",
1205                 func(srv *KeepService) error {
1206                         return srv.CommitPulls(ctx, c)
1207                 })
1208 }
1209
1210 // CommitTrash sends the computed lists of trash requests to the
1211 // keepstore servers. This has the effect of deleting blocks that are
1212 // overreplicated or unreferenced.
1213 func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
1214         defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
1215         return bal.commitAsync(c, "send trash list",
1216                 func(srv *KeepService) error {
1217                         return srv.CommitTrash(ctx, c)
1218                 })
1219 }
1220
1221 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
1222         errs := make(chan error)
1223         for _, srv := range bal.KeepServices {
1224                 go func(srv *KeepService) {
1225                         var err error
1226                         defer func() { errs <- err }()
1227                         label := fmt.Sprintf("%s: %v", srv, label)
1228                         err = f(srv)
1229                         if err != nil {
1230                                 err = fmt.Errorf("%s: %v", label, err)
1231                         }
1232                 }(srv)
1233         }
1234         var lastErr error
1235         for range bal.KeepServices {
1236                 if err := <-errs; err != nil {
1237                         bal.logf("%v", err)
1238                         lastErr = err
1239                 }
1240         }
1241         close(errs)
1242         return lastErr
1243 }
1244
1245 func (bal *Balancer) logf(f string, args ...interface{}) {
1246         if bal.Logger != nil {
1247                 bal.Logger.Printf(f, args...)
1248         }
1249 }
1250
1251 func (bal *Balancer) time(name, help string) func() {
1252         observer := bal.Metrics.DurationObserver(name+"_seconds", help)
1253         t0 := time.Now()
1254         bal.Logger.Printf("%s: start", name)
1255         return func() {
1256                 dur := time.Since(t0)
1257                 observer.Observe(dur.Seconds())
1258                 bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
1259         }
1260 }
1261
1262 // Log current memory usage: once now, at least once every 10 minutes,
1263 // and when memory grows by 40% since the last log. Stop when ctx is
1264 // canceled.
1265 func (bal *Balancer) reportMemorySize(ctx context.Context) {
1266         buf, _ := os.ReadFile("/proc/self/smaps")
1267         m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
1268         var pagesize int64
1269         if len(m) == 2 {
1270                 pagesize, _ = strconv.ParseInt(string(m[1]), 10, 64)
1271                 pagesize <<= 10
1272         }
1273         if pagesize == 0 {
1274                 bal.logf("cannot log OS-reported memory size: failed to parse KernelPageSize from /proc/self/smaps")
1275         }
1276         osstats := func() string {
1277                 if pagesize == 0 {
1278                         return ""
1279                 }
1280                 buf, _ := os.ReadFile("/proc/self/statm")
1281                 fields := strings.Split(string(buf), " ")
1282                 if len(fields) < 2 {
1283                         return ""
1284                 }
1285                 virt, _ := strconv.ParseInt(fields[0], 10, 64)
1286                 virt *= pagesize
1287                 res, _ := strconv.ParseInt(fields[1], 10, 64)
1288                 res *= pagesize
1289                 if virt == 0 || res == 0 {
1290                         return ""
1291                 }
1292                 return fmt.Sprintf(" virt %d res %d", virt, res)
1293         }
1294
1295         var nextTime time.Time
1296         var nextMem uint64
1297         const maxInterval = time.Minute * 10
1298         const maxIncrease = 1.4
1299
1300         ticker := time.NewTicker(time.Second)
1301         defer ticker.Stop()
1302         var memstats runtime.MemStats
1303         for ctx.Err() == nil {
1304                 now := time.Now()
1305                 runtime.ReadMemStats(&memstats)
1306                 mem := memstats.StackInuse + memstats.HeapInuse
1307                 if now.After(nextTime) || mem >= nextMem {
1308                         bal.logf("heap %d stack %d heapalloc %d%s", memstats.HeapInuse, memstats.StackInuse, memstats.HeapAlloc, osstats())
1309                         nextMem = uint64(float64(mem) * maxIncrease)
1310                         nextTime = now.Add(maxInterval)
1311                 }
1312                 <-ticker.C
1313         }
1314 }
1315
1316 // Rendezvous hash sort function. Less efficient than sorting on
1317 // precomputed rendezvous hashes, but also rarely used.
1318 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
1319         a := md5.Sum([]byte(string(blkid[:32]) + i))
1320         b := md5.Sum([]byte(string(blkid[:32]) + j))
1321         return bytes.Compare(a[:], b[:]) < 0
1322 }