Merge branch '19975-oom-resubmit' refs #19975
[arvados.git] / services / keep-balance / balance.go
index d8cd84b516924ee152084874c25133661cfd8669..33c907c2031ac97dbcabe8742ad2313ead157f5f 100644 (file)
@@ -15,8 +15,10 @@ import (
        "log"
        "math"
        "os"
+       "regexp"
        "runtime"
        "sort"
+       "strconv"
        "strings"
        "sync"
        "sync/atomic"
@@ -25,7 +27,6 @@ import (
 
        "git.arvados.org/arvados.git/lib/controller/dblock"
        "git.arvados.org/arvados.git/sdk/go/arvados"
-       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        "github.com/jmoiron/sqlx"
        "github.com/sirupsen/logrus"
@@ -73,7 +74,7 @@ type Balancer struct {
 func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
        nextRunOptions = runOptions
 
-       ctxlog.FromContext(ctx).Info("acquiring active lock")
+       bal.logf("acquiring active lock")
        if !dblock.KeepBalanceActive.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return bal.DB, nil }) {
                // context canceled
                return
@@ -85,6 +86,8 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
        ctx, cancel := context.WithDeadline(ctx, time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
        defer cancel()
 
+       go bal.reportMemorySize(ctx)
+
        var lbFile *os.File
        if bal.LostBlocksFile != "" {
                tmpfn := bal.LostBlocksFile + ".tmp"
@@ -272,6 +275,14 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
                        return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
                }
        }
+       for _, c := range bal.ChunkPrefix {
+               if !strings.ContainsRune("0123456789abcdef", c) {
+                       return fmt.Errorf("invalid char %q in chunk prefix %q: only lowercase hex digits make sense", string(c), bal.ChunkPrefix)
+               }
+       }
+       if len(bal.ChunkPrefix) > 32 {
+               return fmt.Errorf("invalid chunk prefix %q: longer than a block hash", bal.ChunkPrefix)
+       }
 
        mountProblem := false
        type deviceMount struct {
@@ -1205,6 +1216,60 @@ func (bal *Balancer) time(name, help string) func() {
        }
 }
 
+// Log current memory usage: once now, at least once every 10 minutes,
+// and when memory grows by 40% since the last log. Stop when ctx is
+// canceled.
+func (bal *Balancer) reportMemorySize(ctx context.Context) {
+       buf, _ := os.ReadFile("/proc/self/smaps")
+       m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
+       var pagesize int64
+       if len(m) == 2 {
+               pagesize, _ = strconv.ParseInt(string(m[1]), 10, 64)
+               pagesize <<= 10
+       }
+       if pagesize == 0 {
+               bal.logf("cannot log OS-reported memory size: failed to parse KernelPageSize from /proc/self/smaps")
+       }
+       osstats := func() string {
+               if pagesize == 0 {
+                       return ""
+               }
+               buf, _ := os.ReadFile("/proc/self/statm")
+               fields := strings.Split(string(buf), " ")
+               if len(fields) < 2 {
+                       return ""
+               }
+               virt, _ := strconv.ParseInt(fields[0], 10, 64)
+               virt *= pagesize
+               res, _ := strconv.ParseInt(fields[1], 10, 64)
+               res *= pagesize
+               if virt == 0 || res == 0 {
+                       return ""
+               }
+               return fmt.Sprintf(" virt %d res %d", virt, res)
+       }
+
+       var nextTime time.Time
+       var nextMem uint64
+       const maxInterval = time.Minute * 10
+       const maxIncrease = 1.4
+
+       ticker := time.NewTicker(time.Second)
+       defer ticker.Stop()
+       var memstats runtime.MemStats
+       for ctx.Err() == nil {
+               now := time.Now()
+               runtime.ReadMemStats(&memstats)
+               mem := memstats.StackInuse + memstats.HeapInuse
+               if now.After(nextTime) || mem >= nextMem {
+                       bal.logf("heap %d stack %d heapalloc %d%s", memstats.HeapInuse, memstats.StackInuse, memstats.HeapAlloc, osstats())
+                       nextMem = uint64(float64(mem) * maxIncrease)
+                       nextTime = now.Add(maxInterval)
+               }
+               <-ticker.C
+       }
+}
+
 // Rendezvous hash sort function. Less efficient than sorting on
 // precomputed rendezvous hashes, but also rarely used.
 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {