20242: Trash only one when identical replicas are eligible to trash.
[arvados.git] / services / keep-balance / balance.go
index ef15ea0704996927b5a6bbaa2b01ee8ec0520f6f..215c5e1b5be1355e9f6793da54ab0c3148eff242 100644 (file)
@@ -829,19 +829,49 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
        }
        blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
 
-       var lost bool
-       var changes []string
+       // Sort the slots by rendezvous order. This ensures "trash the
+       // first of N replicas with identical timestamps" is
+       // predictable (helpful for testing) and well distributed
+       // across servers.
+       sort.Slice(slots, func(i, j int) bool {
+               si, sj := slots[i], slots[j]
+               if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
+                       return orderi < orderj
+               } else {
+                       return rendezvousLess(si.mnt.UUID, sj.mnt.UUID, blkid)
+               }
+       })
+
+       var (
+               lost         bool
+               changes      []string
+               trashedMtime = make(map[int64]bool, len(slots))
+       )
        for _, slot := range slots {
                // TODO: request a Touch if Mtime is duplicated.
                var change int
                switch {
                case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
-                       slot.mnt.KeepService.AddTrash(Trash{
-                               SizedDigest: blkid,
-                               Mtime:       slot.repl.Mtime,
-                               From:        slot.mnt,
-                       })
-                       change = changeTrash
+                       if trashedMtime[slot.repl.Mtime] {
+                               // Don't trash multiple replicas with
+                               // identical timestamps. If they are
+                               // multiple views of the same backing
+                               // storage, asking both servers to
+                               // trash is redundant and can cause
+                               // races (see #20242). If they are
+                               // distinct replicas that happen to
+                               // have identical timestamps, we'll
+                               // get this one on the next sweep.
+                               change = changeNone
+                       } else {
+                               slot.mnt.KeepService.AddTrash(Trash{
+                                       SizedDigest: blkid,
+                                       Mtime:       slot.repl.Mtime,
+                                       From:        slot.mnt,
+                               })
+                               change = changeTrash
+                               trashedMtime[slot.repl.Mtime] = true
+                       }
                case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
                        lost = true
                        change = changeNone
@@ -1228,26 +1258,42 @@ func (bal *Balancer) reportMemorySize(ctx context.Context) {
                pagesize <<= 10
        }
        if pagesize == 0 {
-               bal.logf("cannot report memory size: failed to parse KernelPageSize from /proc/self/smaps")
-               return
+               bal.logf("cannot log OS-reported memory size: failed to parse KernelPageSize from /proc/self/smaps")
+       }
+       osstats := func() string {
+               if pagesize == 0 {
+                       return ""
+               }
+               buf, _ := os.ReadFile("/proc/self/statm")
+               fields := strings.Split(string(buf), " ")
+               if len(fields) < 2 {
+                       return ""
+               }
+               virt, _ := strconv.ParseInt(fields[0], 10, 64)
+               virt *= pagesize
+               res, _ := strconv.ParseInt(fields[1], 10, 64)
+               res *= pagesize
+               if virt == 0 || res == 0 {
+                       return ""
+               }
+               return fmt.Sprintf(" virt %d res %d", virt, res)
        }
 
        var nextTime time.Time
-       var nextMem int64
+       var nextMem uint64
        const maxInterval = time.Minute * 10
        const maxIncrease = 1.4
 
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()
+       var memstats runtime.MemStats
        for ctx.Err() == nil {
                now := time.Now()
-               buf, _ := os.ReadFile("/proc/self/statm")
-               fields := strings.Split(string(buf), " ")
-               mem, _ := strconv.ParseInt(fields[0], 10, 64)
-               mem *= pagesize
+               runtime.ReadMemStats(&memstats)
+               mem := memstats.StackInuse + memstats.HeapInuse
                if now.After(nextTime) || mem >= nextMem {
-                       bal.logf("process virtual memory size %d", mem)
-                       nextMem = int64(float64(mem) * maxIncrease)
+                       bal.logf("heap %d stack %d heapalloc %d%s", memstats.HeapInuse, memstats.StackInuse, memstats.HeapAlloc, osstats())
+                       nextMem = uint64(float64(mem) * maxIncrease)
                        nextTime = now.Add(maxInterval)
                }
                <-ticker.C