20242: Trash only one replica when identical replicas are eligible to trash.
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index fa01d512bc77b20d3ad871f21b14c5c88611ab0f..215c5e1b5be1355e9f6793da54ab0c3148eff242 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -2,26 +2,30 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepbalance
 
 import (
        "bytes"
        "context"
        "crypto/md5"
+       "errors"
        "fmt"
        "io"
        "io/ioutil"
        "log"
        "math"
        "os"
+       "regexp"
        "runtime"
        "sort"
+       "strconv"
        "strings"
        "sync"
        "sync/atomic"
        "syscall"
        "time"
 
+       "git.arvados.org/arvados.git/lib/controller/dblock"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        "github.com/jmoiron/sqlx"
@@ -43,6 +47,7 @@ type Balancer struct {
        Dumper  logrus.FieldLogger
        Metrics *metrics
 
+       ChunkPrefix    string
        LostBlocksFile string
 
        *BlockStateMap
@@ -66,18 +71,23 @@ type Balancer struct {
 // subsequent balance operation.
 //
 // Run should only be called once on a given Balancer object.
-//
-// Typical usage:
-//
-//   runOptions, err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
        nextRunOptions = runOptions
 
+       bal.logf("acquiring active lock")
+       if !dblock.KeepBalanceActive.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return bal.DB, nil }) {
+               // context canceled
+               return
+       }
+       defer dblock.KeepBalanceActive.Unlock()
+
        defer bal.time("sweep", "wall clock time to run one full sweep")()
 
-       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
+       ctx, cancel := context.WithDeadline(ctx, time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
        defer cancel()
 
+       go bal.reportMemorySize(ctx)
+
        var lbFile *os.File
        if bal.LostBlocksFile != "" {
                tmpfn := bal.LostBlocksFile + ".tmp"
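
Run now takes a context and, before doing any work, acquires a cluster-wide "active" lock via lib/controller/dblock so only one keep-balance sweep runs at a time; it also starts the background memory-usage reporter (reportMemorySize, added further down). As a rough illustration only, not the actual dblock API, that kind of guard boils down to holding a PostgreSQL session-level advisory lock for the duration of the critical section. The package name, helper name, and lock key below are hypothetical:

package dblocksketch // illustration only, not part of arvados

import (
        "context"
        "fmt"

        "github.com/jmoiron/sqlx"
)

// withClusterLock pins one connection, takes a session-level advisory
// lock on it, runs fn, and releases the lock when fn returns. The real
// dblock package additionally refreshes the lock and survives reconnects.
func withClusterLock(ctx context.Context, db *sqlx.DB, key int64, fn func() error) error {
        conn, err := db.Conn(ctx) // sqlx.DB embeds *sql.DB
        if err != nil {
                return err
        }
        defer conn.Close()
        if _, err := conn.ExecContext(ctx, `SELECT pg_advisory_lock($1)`, key); err != nil {
                return fmt.Errorf("acquire advisory lock: %w", err)
        }
        defer conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, key)
        return fn()
}
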
@@ -265,6 +275,37 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
                        return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
                }
        }
+       for _, c := range bal.ChunkPrefix {
+               if !strings.ContainsRune("0123456789abcdef", c) {
+                       return fmt.Errorf("invalid char %q in chunk prefix %q: only lowercase hex digits make sense", string(c), bal.ChunkPrefix)
+               }
+       }
+       if len(bal.ChunkPrefix) > 32 {
+               return fmt.Errorf("invalid chunk prefix %q: longer than a block hash", bal.ChunkPrefix)
+       }
+
+       mountProblem := false
+       type deviceMount struct {
+               srv *KeepService
+               mnt *KeepMount
+       }
+       deviceMounted := map[string]deviceMount{} // DeviceID -> mount
+       for _, srv := range bal.KeepServices {
+               for _, mnt := range srv.mounts {
+                       if first, dup := deviceMounted[mnt.DeviceID]; dup && first.mnt.UUID != mnt.UUID && mnt.DeviceID != "" {
+                               bal.logf("config error: device %s is mounted with multiple volume UUIDs: %s on %s, and %s on %s",
+                                       mnt.DeviceID,
+                                       first.mnt.UUID, first.srv,
+                                       mnt.UUID, srv)
+                               mountProblem = true
+                               continue
+                       }
+                       deviceMounted[mnt.DeviceID] = deviceMount{srv, mnt}
+               }
+       }
+       if mountProblem {
+               return errors.New("cannot continue with config errors (see above)")
+       }
 
        var checkPage arvados.CollectionList
        if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
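
CheckSanityEarly now rejects a ChunkPrefix that is not at most 32 lowercase hex digits (an md5 block hash is 32 hex digits), and refuses to run when one backing device is visible under more than one volume UUID, since two views of the same volume would otherwise both be asked to trash the "same" replica. A standalone sketch of that duplicate-device check, with made-up device and volume IDs:

package main

import "fmt"

type mount struct{ DeviceID, UUID string }

// findSharedDevices flags any non-empty DeviceID that is reported
// under two different volume UUIDs; those are probably two views of
// one backing volume and are treated as a config error.
func findSharedDevices(mounts []mount) []string {
        firstUUID := map[string]string{} // DeviceID -> first volume UUID seen
        var problems []string
        for _, m := range mounts {
                if first, dup := firstUUID[m.DeviceID]; dup && first != m.UUID && m.DeviceID != "" {
                        problems = append(problems, fmt.Sprintf("device %s mounted as both %s and %s", m.DeviceID, first, m.UUID))
                        continue
                }
                firstUUID[m.DeviceID] = m.UUID
        }
        return problems
}

func main() {
        fmt.Println(findSharedDevices([]mount{
                {DeviceID: "dev-123", UUID: "vol-aaa"},
                {DeviceID: "dev-123", UUID: "vol-bbb"},
        }))
}
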
@@ -374,7 +415,7 @@ func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pag
                go func(mounts []*KeepMount) {
                        defer wg.Done()
                        bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
-                       idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
+                       idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, bal.ChunkPrefix)
                        if err != nil {
                                select {
                                case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
@@ -466,6 +507,20 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error {
        if coll.ReplicationDesired != nil {
                repl = *coll.ReplicationDesired
        }
+       if bal.ChunkPrefix != "" {
+               // Throw out blocks that don't match the requested
+               // prefix.  (We save a bit of GC work here by
+               // preallocating based on each hex digit in
+               // ChunkPrefix reducing the expected size of the
+               // filtered set by ~16x.)
+               filtered := make([]arvados.SizedDigest, 0, len(blkids)>>(4*len(bal.ChunkPrefix)-1))
+               for _, blkid := range blkids {
+                       if strings.HasPrefix(string(blkid), bal.ChunkPrefix) {
+                               filtered = append(filtered, blkid)
+                       }
+               }
+               blkids = filtered
+       }
        bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
        // Pass pdh to IncreaseDesired only if LostBlocksFile is being
        // written -- otherwise it's just a waste of memory.
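
When a prefix is set, both the server indexes (requested with the prefix above) and the collection block lists are restricted to matching blocks. A quick worked check of the preallocation arithmetic in the comment, with hypothetical numbers: each hex digit cuts the expected survivor count by 16x, and the right-shift by 4*len(prefix)-1 leaves roughly twice that expectation as slack:

package main

import "fmt"

func main() {
        nBlocks := 1_000_000 // hypothetical number of block IDs in one collection
        prefixLen := 2       // e.g. ChunkPrefix = "af"
        expected := nBlocks / (1 << (4 * prefixLen)) // 3906 blocks expected to match
        capacity := nBlocks >> (4*prefixLen - 1)     // 7812, about 2x the expectation
        fmt.Println(expected, capacity)              // prints: 3906 7812
}
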
@@ -774,19 +829,49 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
        }
        blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
 
-       var lost bool
-       var changes []string
+       // Sort the slots by rendezvous order. This ensures "trash the
+       // first of N replicas with identical timestamps" is
+       // predictable (helpful for testing) and well distributed
+       // across servers.
+       sort.Slice(slots, func(i, j int) bool {
+               si, sj := slots[i], slots[j]
+               if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
+                       return orderi < orderj
+               } else {
+                       return rendezvousLess(si.mnt.UUID, sj.mnt.UUID, blkid)
+               }
+       })
+
+       var (
+               lost         bool
+               changes      []string
+               trashedMtime = make(map[int64]bool, len(slots))
+       )
        for _, slot := range slots {
                // TODO: request a Touch if Mtime is duplicated.
                var change int
                switch {
                case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
-                       slot.mnt.KeepService.AddTrash(Trash{
-                               SizedDigest: blkid,
-                               Mtime:       slot.repl.Mtime,
-                               From:        slot.mnt,
-                       })
-                       change = changeTrash
+                       if trashedMtime[slot.repl.Mtime] {
+                               // Don't trash multiple replicas with
+                               // identical timestamps. If they are
+                               // multiple views of the same backing
+                               // storage, asking both servers to
+                               // trash is redundant and can cause
+                               // races (see #20242). If they are
+                               // distinct replicas that happen to
+                               // have identical timestamps, we'll
+                               // get this one on the next sweep.
+                               change = changeNone
+                       } else {
+                               slot.mnt.KeepService.AddTrash(Trash{
+                                       SizedDigest: blkid,
+                                       Mtime:       slot.repl.Mtime,
+                                       From:        slot.mnt,
+                               })
+                               change = changeTrash
+                               trashedMtime[slot.repl.Mtime] = true
+                       }
                case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
                        lost = true
                        change = changeNone
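
This is the core of the fix: eligible-to-trash replicas are visited in a deterministic rendezvous order, and at most one trash request is issued per distinct timestamp, so two servers exporting the same backing volume are never both told to trash the same copy in one sweep. A minimal standalone sketch of that rule, using toy types rather than the keep-balance API:

package main

import (
        "fmt"
        "sort"
)

type replica struct {
        Mount string
        Mtime int64
}

// trashOnePerMtime sorts candidates deterministically, then keeps only
// the first replica seen with each timestamp; later replicas with the
// same timestamp are skipped and reconsidered on the next sweep.
func trashOnePerMtime(candidates []replica) []replica {
        sort.Slice(candidates, func(i, j int) bool {
                return candidates[i].Mount < candidates[j].Mount // stand-in for rendezvous order
        })
        trashedMtime := map[int64]bool{}
        var trash []replica
        for _, r := range candidates {
                if trashedMtime[r.Mtime] {
                        continue // same timestamp already trashed elsewhere
                }
                trashedMtime[r.Mtime] = true
                trash = append(trash, r)
        }
        return trash
}

func main() {
        // Two mounts report the same replica (identical mtime):
        // only one trash request is issued.
        fmt.Println(trashOnePerMtime([]replica{
                {Mount: "keep-1", Mtime: 1700000000},
                {Mount: "keep-2", Mtime: 1700000000},
        }))
}
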
@@ -1161,6 +1246,60 @@ func (bal *Balancer) time(name, help string) func() {
        }
 }
 
+// Log current memory usage: once now, at least once every 10 minutes,
+// and when memory grows by 40% since the last log. Stop when ctx is
+// canceled.
+func (bal *Balancer) reportMemorySize(ctx context.Context) {
+       buf, _ := os.ReadFile("/proc/self/smaps")
+       m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
+       var pagesize int64
+       if len(m) == 2 {
+               pagesize, _ = strconv.ParseInt(string(m[1]), 10, 64)
+               pagesize <<= 10
+       }
+       if pagesize == 0 {
+               bal.logf("cannot log OS-reported memory size: failed to parse KernelPageSize from /proc/self/smaps")
+       }
+       osstats := func() string {
+               if pagesize == 0 {
+                       return ""
+               }
+               buf, _ := os.ReadFile("/proc/self/statm")
+               fields := strings.Split(string(buf), " ")
+               if len(fields) < 2 {
+                       return ""
+               }
+               virt, _ := strconv.ParseInt(fields[0], 10, 64)
+               virt *= pagesize
+               res, _ := strconv.ParseInt(fields[1], 10, 64)
+               res *= pagesize
+               if virt == 0 || res == 0 {
+                       return ""
+               }
+               return fmt.Sprintf(" virt %d res %d", virt, res)
+       }
+
+       var nextTime time.Time
+       var nextMem uint64
+       const maxInterval = time.Minute * 10
+       const maxIncrease = 1.4
+
+       ticker := time.NewTicker(time.Second)
+       defer ticker.Stop()
+       var memstats runtime.MemStats
+       for ctx.Err() == nil {
+               now := time.Now()
+               runtime.ReadMemStats(&memstats)
+               mem := memstats.StackInuse + memstats.HeapInuse
+               if now.After(nextTime) || mem >= nextMem {
+                       bal.logf("heap %d stack %d heapalloc %d%s", memstats.HeapInuse, memstats.StackInuse, memstats.HeapAlloc, osstats())
+                       nextMem = uint64(float64(mem) * maxIncrease)
+                       nextTime = now.Add(maxInterval)
+               }
+               <-ticker.C
+       }
+}
+
 // Rendezvous hash sort function. Less efficient than sorting on
 // precomputed rendezvous hashes, but also rarely used.
 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
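
Returning to reportMemorySize above: the cadence it documents (a line at startup, then at least every 10 minutes, or sooner if in-use memory grows by 40%) can be factored out as a small throttle. A hypothetical sketch of just that policy:

package main

import (
        "fmt"
        "time"
)

type memLogThrottle struct {
        nextTime time.Time
        nextMem  uint64
}

// shouldLog reports whether a memory line is due: the first call
// always logs, then logging repeats after 10 minutes or after a 40%
// growth in in-use memory, whichever comes first.
func (t *memLogThrottle) shouldLog(now time.Time, mem uint64) bool {
        if now.Before(t.nextTime) && mem < t.nextMem {
                return false
        }
        t.nextTime = now.Add(10 * time.Minute)
        t.nextMem = uint64(float64(mem) * 1.4)
        return true
}

func main() {
        var t memLogThrottle
        now := time.Now()
        fmt.Println(t.shouldLog(now, 100<<20))                  // true: first call always logs
        fmt.Println(t.shouldLog(now.Add(time.Second), 110<<20)) // false: <10min and <40% growth
        fmt.Println(t.shouldLog(now.Add(time.Second), 150<<20)) // true: >=40% growth
}
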