X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/9171d310942ea3c8e3bed3e21a1c2d0604e93ad6..1a169a434494175b208d0d5055bb42333d9b64b9:/services/keep-balance/balance.go

diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 1dedb409a4..215c5e1b5b 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -15,14 +15,17 @@ import (
 	"log"
 	"math"
 	"os"
+	"regexp"
 	"runtime"
 	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
 	"syscall"
 	"time"
 
+	"git.arvados.org/arvados.git/lib/controller/dblock"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/keepclient"
 	"github.com/jmoiron/sqlx"
@@ -44,6 +47,7 @@ type Balancer struct {
 	Dumper  logrus.FieldLogger
 	Metrics *metrics
 
+	ChunkPrefix    string
 	LostBlocksFile string
 
 	*BlockStateMap
@@ -67,18 +71,23 @@ type Balancer struct {
 // subsequent balance operation.
 //
 // Run should only be called once on a given Balancer object.
-//
-// Typical usage:
-//
-//   runOptions, err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
 	nextRunOptions = runOptions
 
+	bal.logf("acquiring active lock")
+	if !dblock.KeepBalanceActive.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return bal.DB, nil }) {
+		// context canceled
+		return
+	}
+	defer dblock.KeepBalanceActive.Unlock()
+
 	defer bal.time("sweep", "wall clock time to run one full sweep")()
 
-	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
+	ctx, cancel := context.WithDeadline(ctx, time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
 	defer cancel()
 
+	go bal.reportMemorySize(ctx)
+
 	var lbFile *os.File
 	if bal.LostBlocksFile != "" {
 		tmpfn := bal.LostBlocksFile + ".tmp"
@@ -266,6 +275,14 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
 			return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
 		}
 	}
+	for _, c := range bal.ChunkPrefix {
+		if !strings.ContainsRune("0123456789abcdef", c) {
+			return fmt.Errorf("invalid char %q in chunk prefix %q: only lowercase hex digits make sense", string(c), bal.ChunkPrefix)
+		}
+	}
+	if len(bal.ChunkPrefix) > 32 {
+		return fmt.Errorf("invalid chunk prefix %q: longer than a block hash", bal.ChunkPrefix)
+	}
 
 	mountProblem := false
 	type deviceMount struct {
@@ -398,7 +415,7 @@ func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pag
 		go func(mounts []*KeepMount) {
 			defer wg.Done()
 			bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
-			idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
+			idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, bal.ChunkPrefix)
 			if err != nil {
 				select {
 				case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
@@ -490,6 +507,20 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error {
 	if coll.ReplicationDesired != nil {
 		repl = *coll.ReplicationDesired
 	}
+	if bal.ChunkPrefix != "" {
+		// Throw out blocks that don't match the requested
+		// prefix.  (We save a bit of GC work here by
+		// preallocating based on each hex digit in
+		// ChunkPrefix reducing the expected size of the
+		// filtered set by ~16x.)
+		filtered := make([]arvados.SizedDigest, 0, len(blkids)>>(4*len(bal.ChunkPrefix)-1))
+		for _, blkid := range blkids {
+			if strings.HasPrefix(string(blkid), bal.ChunkPrefix) {
+				filtered = append(filtered, blkid)
+			}
+		}
+		blkids = filtered
+	}
 	bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
 	// Pass pdh to IncreaseDesired only if LostBlocksFile is being
 	// written -- otherwise it's just a waste of memory.
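For readers skimming the diff, here is a minimal, self-contained sketch of the ChunkPrefix behavior introduced above (validation as in CheckSanityEarly, filtering as in addCollection). The helper names validatePrefix and filterByPrefix and the toy block IDs are illustrative only, not part of this change.

package main

import (
	"fmt"
	"strings"
)

// validatePrefix mirrors the CheckSanityEarly checks: lowercase hex
// digits only, and no longer than a 32-digit block hash.
func validatePrefix(prefix string) error {
	for _, c := range prefix {
		if !strings.ContainsRune("0123456789abcdef", c) {
			return fmt.Errorf("invalid char %q in chunk prefix %q", string(c), prefix)
		}
	}
	if len(prefix) > 32 {
		return fmt.Errorf("invalid chunk prefix %q: longer than a block hash", prefix)
	}
	return nil
}

// filterByPrefix mirrors the addCollection filter: keep only block
// IDs that start with prefix, preallocating a smaller slice because
// each hex digit of prefix keeps roughly 1/16 of the blocks.
func filterByPrefix(blkids []string, prefix string) []string {
	if prefix == "" {
		return blkids
	}
	filtered := make([]string, 0, len(blkids)>>(4*len(prefix))+1)
	for _, blkid := range blkids {
		if strings.HasPrefix(blkid, prefix) {
			filtered = append(filtered, blkid)
		}
	}
	return filtered
}

func main() {
	if err := validatePrefix("a0"); err != nil {
		panic(err)
	}
	// Toy block IDs; real ones are 32 hex digits plus a size hint.
	blkids := []string{"a0ffcc+1234", "17e4b9+5678", "a01234+42"}
	fmt.Println(filterByPrefix(blkids, "a0")) // [a0ffcc+1234 a01234+42]
}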
@@ -798,19 +829,49 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
 	}
 	blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
 
-	var lost bool
-	var changes []string
+	// Sort the slots by rendezvous order. This ensures "trash the
+	// first of N replicas with identical timestamps" is
+	// predictable (helpful for testing) and well distributed
+	// across servers.
+	sort.Slice(slots, func(i, j int) bool {
+		si, sj := slots[i], slots[j]
+		if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
+			return orderi < orderj
+		} else {
+			return rendezvousLess(si.mnt.UUID, sj.mnt.UUID, blkid)
+		}
+	})
+
+	var (
+		lost         bool
+		changes      []string
+		trashedMtime = make(map[int64]bool, len(slots))
+	)
 	for _, slot := range slots {
 		// TODO: request a Touch if Mtime is duplicated.
 		var change int
 		switch {
 		case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
-			slot.mnt.KeepService.AddTrash(Trash{
-				SizedDigest: blkid,
-				Mtime:       slot.repl.Mtime,
-				From:        slot.mnt,
-			})
-			change = changeTrash
+			if trashedMtime[slot.repl.Mtime] {
+				// Don't trash multiple replicas with
+				// identical timestamps. If they are
+				// multiple views of the same backing
+				// storage, asking both servers to
+				// trash is redundant and can cause
+				// races (see #20242). If they are
+				// distinct replicas that happen to
+				// have identical timestamps, we'll
+				// get this one on the next sweep.
+				change = changeNone
+			} else {
+				slot.mnt.KeepService.AddTrash(Trash{
+					SizedDigest: blkid,
+					Mtime:       slot.repl.Mtime,
+					From:        slot.mnt,
+				})
+				change = changeTrash
+				trashedMtime[slot.repl.Mtime] = true
+			}
 		case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
 			lost = true
 			change = changeNone
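A minimal sketch of the trash-deduplication rule added to balanceBlock above, reduced to a toy replica list: assuming the slots are already in rendezvous order and every replica is unwanted, only the first replica with each distinct Mtime is trashed in a given sweep. The replica type and chooseTrash helper are hypothetical, for illustration only.

package main

import "fmt"

type replica struct {
	server string
	mtime  int64
}

// chooseTrash returns the replicas that would be trashed this sweep,
// assuming the input is sorted in rendezvous order and all replicas
// are unwanted.
func chooseTrash(replicas []replica) []replica {
	trashedMtime := make(map[int64]bool, len(replicas))
	var trash []replica
	for _, r := range replicas {
		if trashedMtime[r.mtime] {
			// Possibly the same backing storage seen via two
			// servers: skip it; a later sweep can catch it if
			// it really is a distinct replica.
			continue
		}
		trashedMtime[r.mtime] = true
		trash = append(trash, r)
	}
	return trash
}

func main() {
	replicas := []replica{
		{"keep0", 1700000000},
		{"keep1", 1700000000}, // same mtime as keep0: skipped
		{"keep2", 1700000123},
	}
	fmt.Println(chooseTrash(replicas))
}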
@@ -1185,6 +1246,60 @@ func (bal *Balancer) time(name, help string) func() {
 	}
 }
 
+// Log current memory usage: once now, at least once every 10 minutes,
+// and when memory grows by 40% since the last log. Stop when ctx is
+// canceled.
+func (bal *Balancer) reportMemorySize(ctx context.Context) {
+	buf, _ := os.ReadFile("/proc/self/smaps")
+	m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
+	var pagesize int64
+	if len(m) == 2 {
+		pagesize, _ = strconv.ParseInt(string(m[1]), 10, 64)
+		pagesize <<= 10
+	}
+	if pagesize == 0 {
+		bal.logf("cannot log OS-reported memory size: failed to parse KernelPageSize from /proc/self/smaps")
+	}
+	osstats := func() string {
+		if pagesize == 0 {
+			return ""
+		}
+		buf, _ := os.ReadFile("/proc/self/statm")
+		fields := strings.Split(string(buf), " ")
+		if len(fields) < 2 {
+			return ""
+		}
+		virt, _ := strconv.ParseInt(fields[0], 10, 64)
+		virt *= pagesize
+		res, _ := strconv.ParseInt(fields[1], 10, 64)
+		res *= pagesize
+		if virt == 0 || res == 0 {
+			return ""
+		}
+		return fmt.Sprintf(" virt %d res %d", virt, res)
+	}
+
+	var nextTime time.Time
+	var nextMem uint64
+	const maxInterval = time.Minute * 10
+	const maxIncrease = 1.4
+
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+	var memstats runtime.MemStats
+	for ctx.Err() == nil {
+		now := time.Now()
+		runtime.ReadMemStats(&memstats)
+		mem := memstats.StackInuse + memstats.HeapInuse
+		if now.After(nextTime) || mem >= nextMem {
+			bal.logf("heap %d stack %d heapalloc %d%s", memstats.HeapInuse, memstats.StackInuse, memstats.HeapAlloc, osstats())
+			nextMem = uint64(float64(mem) * maxIncrease)
+			nextTime = now.Add(maxInterval)
+		}
+		<-ticker.C
+	}
+}
+
 // Rendezvous hash sort function. Less efficient than sorting on
 // precomputed rendezvous hashes, but also rarely used.
 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
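A minimal sketch of the logging cadence used by reportMemorySize above: log once immediately, then again whenever in-use memory grows by 40% or 10 minutes elapse, whichever comes first. This version uses only runtime.ReadMemStats and omits the /proc/self/smaps and /proc/self/statm parsing; reportMemory and its constants are illustrative, not the function from this change.

package main

import (
	"context"
	"log"
	"runtime"
	"time"
)

func reportMemory(ctx context.Context) {
	const (
		maxInterval = 10 * time.Minute
		maxIncrease = 1.4
	)
	var (
		nextTime time.Time
		nextMem  uint64
		memstats runtime.MemStats
	)
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for ctx.Err() == nil {
		runtime.ReadMemStats(&memstats)
		mem := memstats.StackInuse + memstats.HeapInuse
		if now := time.Now(); now.After(nextTime) || mem >= nextMem {
			log.Printf("heap %d stack %d", memstats.HeapInuse, memstats.StackInuse)
			nextMem = uint64(float64(mem) * maxIncrease)
			nextTime = now.Add(maxInterval)
		}
		// Wake up once per second, or immediately on cancellation.
		select {
		case <-ticker.C:
		case <-ctx.Done():
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	reportMemory(ctx)
}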