Merge branch '21207-pytest'
[arvados.git] / services / keep-balance / balance.go
index a0b888a51ff054d659d9a6d88dabecb224a8e538..e71eb07efa6979fa005c4a59faf70f7c3187519b 100644 (file)
@@ -137,7 +137,7 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
        client.Timeout = 0
 
        rs := bal.rendezvousState()
-       if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
+       if cluster.Collections.BalanceTrashLimit > 0 && rs != runOptions.SafeRendezvousState {
                if runOptions.SafeRendezvousState != "" {
                        bal.logf("notice: KeepServices list has changed since last run")
                }
@@ -155,6 +155,7 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
        if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
                return
        }
+       bal.setupLookupTables(cluster)
        bal.ComputeChangeSets()
        bal.PrintStatistics()
        if err = bal.CheckSanityLate(); err != nil {
@@ -171,14 +172,14 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
                }
                lbFile = nil
        }
-       if runOptions.CommitPulls {
+       if cluster.Collections.BalancePullLimit > 0 {
                err = bal.CommitPulls(ctx, client)
                if err != nil {
                        // Skip trash if we can't pull. (Too cautious?)
                        return
                }
        }
-       if runOptions.CommitTrash {
+       if cluster.Collections.BalanceTrashLimit > 0 {
                err = bal.CommitTrash(ctx, client)
                if err != nil {
                        return
@@ -227,7 +228,7 @@ func (bal *Balancer) cleanupMounts() {
        rwdev := map[string]*KeepService{}
        for _, srv := range bal.KeepServices {
                for _, mnt := range srv.mounts {
-                       if !mnt.ReadOnly {
+                       if mnt.AllowWrite {
                                rwdev[mnt.UUID] = srv
                        }
                }
@@ -237,7 +238,7 @@ func (bal *Balancer) cleanupMounts() {
        for _, srv := range bal.KeepServices {
                var dedup []*KeepMount
                for _, mnt := range srv.mounts {
-                       if mnt.ReadOnly && rwdev[mnt.UUID] != nil {
+                       if !mnt.AllowWrite && rwdev[mnt.UUID] != nil {
                                bal.logf("skipping srv %s readonly mount %q because same volume is mounted read-write on srv %s", srv, mnt.UUID, rwdev[mnt.UUID])
                        } else {
                                dedup = append(dedup, mnt)
@@ -275,6 +276,14 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
                        return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
                }
        }
+       for _, c := range bal.ChunkPrefix {
+               if !strings.ContainsRune("0123456789abcdef", c) {
+                       return fmt.Errorf("invalid char %q in chunk prefix %q: only lowercase hex digits make sense", string(c), bal.ChunkPrefix)
+               }
+       }
+       if len(bal.ChunkPrefix) > 32 {
+               return fmt.Errorf("invalid chunk prefix %q: longer than a block hash", bal.ChunkPrefix)
+       }
 
        mountProblem := false
        type deviceMount struct {
@@ -534,7 +543,6 @@ func (bal *Balancer) ComputeChangeSets() {
        // This just calls balanceBlock() once for each block, using a
        // pool of worker goroutines.
        defer bal.time("changeset_compute", "wall clock time to compute changesets")()
-       bal.setupLookupTables()
 
        type balanceTask struct {
                blkid arvados.SizedDigest
@@ -569,7 +577,7 @@ func (bal *Balancer) ComputeChangeSets() {
        bal.collectStatistics(results)
 }
 
-func (bal *Balancer) setupLookupTables() {
+func (bal *Balancer) setupLookupTables(cluster *arvados.Cluster) {
        bal.serviceRoots = make(map[string]string)
        bal.classes = defaultClasses
        bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
@@ -579,9 +587,11 @@ func (bal *Balancer) setupLookupTables() {
                for _, mnt := range srv.mounts {
                        bal.mounts++
 
-                       // All mounts on a read-only service are
-                       // effectively read-only.
-                       mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
+                       if srv.ReadOnly {
+                               // All mounts on a read-only service
+                               // are effectively read-only.
+                               mnt.AllowWrite = false
+                       }
 
                        for class := range mnt.StorageClasses {
                                if mbc := bal.mountsByClass[class]; mbc == nil {
@@ -599,6 +609,13 @@ func (bal *Balancer) setupLookupTables() {
        // class" case in balanceBlock depends on the order classes
        // are considered.
        sort.Strings(bal.classes)
+
+       for _, srv := range bal.KeepServices {
+               srv.ChangeSet = &ChangeSet{
+                       PullLimit:  cluster.Collections.BalancePullLimit,
+                       TrashLimit: cluster.Collections.BalanceTrashLimit,
+               }
+       }
 }
 
 const (
@@ -659,7 +676,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
                        slots = append(slots, slot{
                                mnt:  mnt,
                                repl: repl,
-                               want: repl != nil && mnt.ReadOnly,
+                               want: repl != nil && !mnt.AllowTrash,
                        })
                }
        }
@@ -748,7 +765,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
                                protMnt[slot.mnt] = true
                                replProt += slot.mnt.Replication
                        }
-                       if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
+                       if replWant < desired && (slot.repl != nil || slot.mnt.AllowWrite) {
                                slots[i].want = true
                                wantSrv[slot.mnt.KeepService] = true
                                wantMnt[slot.mnt] = true
@@ -821,23 +838,53 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
        }
        blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
 
-       var lost bool
-       var changes []string
+       // Sort the slots by rendezvous order. This ensures "trash the
+       // first of N replicas with identical timestamps" is
+       // predictable (helpful for testing) and well distributed
+       // across servers.
+       sort.Slice(slots, func(i, j int) bool {
+               si, sj := slots[i], slots[j]
+               if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
+                       return orderi < orderj
+               } else {
+                       return rendezvousLess(si.mnt.UUID, sj.mnt.UUID, blkid)
+               }
+       })
+
+       var (
+               lost         bool
+               changes      []string
+               trashedMtime = make(map[int64]bool, len(slots))
+       )
        for _, slot := range slots {
                // TODO: request a Touch if Mtime is duplicated.
                var change int
                switch {
                case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
-                       slot.mnt.KeepService.AddTrash(Trash{
-                               SizedDigest: blkid,
-                               Mtime:       slot.repl.Mtime,
-                               From:        slot.mnt,
-                       })
-                       change = changeTrash
+                       if trashedMtime[slot.repl.Mtime] {
+                               // Don't trash multiple replicas with
+                               // identical timestamps. If they are
+                               // multiple views of the same backing
+                               // storage, asking both servers to
+                               // trash is redundant and can cause
+                               // races (see #20242). If they are
+                               // distinct replicas that happen to
+                               // have identical timestamps, we'll
+                               // get this one on the next sweep.
+                               change = changeNone
+                       } else {
+                               slot.mnt.KeepService.AddTrash(Trash{
+                                       SizedDigest: blkid,
+                                       Mtime:       slot.repl.Mtime,
+                                       From:        slot.mnt,
+                               })
+                               change = changeTrash
+                               trashedMtime[slot.repl.Mtime] = true
+                       }
                case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
                        lost = true
                        change = changeNone
-               case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
+               case slot.repl == nil && slot.want && slot.mnt.AllowWrite:
                        slot.mnt.KeepService.AddPull(Pull{
                                SizedDigest: blkid,
                                From:        blk.Replicas[0].KeepMount.KeepService,
@@ -917,19 +964,21 @@ type replicationStats struct {
 }
 
 type balancerStats struct {
-       lost          blocksNBytes
-       overrep       blocksNBytes
-       unref         blocksNBytes
-       garbage       blocksNBytes
-       underrep      blocksNBytes
-       unachievable  blocksNBytes
-       justright     blocksNBytes
-       desired       blocksNBytes
-       current       blocksNBytes
-       pulls         int
-       trashes       int
-       replHistogram []int
-       classStats    map[string]replicationStats
+       lost            blocksNBytes
+       overrep         blocksNBytes
+       unref           blocksNBytes
+       garbage         blocksNBytes
+       underrep        blocksNBytes
+       unachievable    blocksNBytes
+       justright       blocksNBytes
+       desired         blocksNBytes
+       current         blocksNBytes
+       pulls           int
+       pullsDeferred   int
+       trashes         int
+       trashesDeferred int
+       replHistogram   []int
+       classStats      map[string]replicationStats
 
        // collectionBytes / collectionBlockBytes = deduplication ratio
        collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
@@ -1052,7 +1101,9 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
        }
        for _, srv := range bal.KeepServices {
                s.pulls += len(srv.ChangeSet.Pulls)
+               s.pullsDeferred += srv.ChangeSet.PullsDeferred
                s.trashes += len(srv.ChangeSet.Trashes)
+               s.trashesDeferred += srv.ChangeSet.TrashesDeferred
        }
        bal.stats = s
        bal.Metrics.UpdateStats(s)
@@ -1220,26 +1271,42 @@ func (bal *Balancer) reportMemorySize(ctx context.Context) {
                pagesize <<= 10
        }
        if pagesize == 0 {
-               bal.logf("cannot report memory size: failed to parse KernelPageSize from /proc/self/smaps")
-               return
+               bal.logf("cannot log OS-reported memory size: failed to parse KernelPageSize from /proc/self/smaps")
+       }
+       osstats := func() string {
+               if pagesize == 0 {
+                       return ""
+               }
+               buf, _ := os.ReadFile("/proc/self/statm")
+               fields := strings.Split(string(buf), " ")
+               if len(fields) < 2 {
+                       return ""
+               }
+               virt, _ := strconv.ParseInt(fields[0], 10, 64)
+               virt *= pagesize
+               res, _ := strconv.ParseInt(fields[1], 10, 64)
+               res *= pagesize
+               if virt == 0 || res == 0 {
+                       return ""
+               }
+               return fmt.Sprintf(" virt %d res %d", virt, res)
        }
 
        var nextTime time.Time
-       var nextMem int64
+       var nextMem uint64
        const maxInterval = time.Minute * 10
        const maxIncrease = 1.4
 
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()
+       var memstats runtime.MemStats
        for ctx.Err() == nil {
                now := time.Now()
-               buf, _ := os.ReadFile("/proc/self/statm")
-               fields := strings.Split(string(buf), " ")
-               mem, _ := strconv.ParseInt(fields[0], 10, 64)
-               mem *= pagesize
+               runtime.ReadMemStats(&memstats)
+               mem := memstats.StackInuse + memstats.HeapInuse
                if now.After(nextTime) || mem >= nextMem {
-                       bal.logf("process virtual memory size %d", mem)
-                       nextMem = int64(float64(mem) * maxIncrease)
+                       bal.logf("heap %d stack %d heapalloc %d%s", memstats.HeapInuse, memstats.StackInuse, memstats.HeapAlloc, osstats())
+                       nextMem = uint64(float64(mem) * maxIncrease)
                        nextTime = now.Add(maxInterval)
                }
                <-ticker.C