X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6d95130da47af9fd0290d3c8f80a0364faf74957..dd3fd89528fa5c21001bbec9048f1d2710d80689:/services/keep-balance/balance.go diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go index 3c35d304cb..e71eb07efa 100644 --- a/services/keep-balance/balance.go +++ b/services/keep-balance/balance.go @@ -2,26 +2,33 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepbalance import ( "bytes" + "context" "crypto/md5" + "errors" "fmt" "io" "io/ioutil" "log" "math" "os" + "regexp" "runtime" "sort" + "strconv" "strings" "sync" + "sync/atomic" "syscall" "time" + "git.arvados.org/arvados.git/lib/controller/dblock" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/keepclient" + "github.com/jmoiron/sqlx" "github.com/sirupsen/logrus" ) @@ -35,10 +42,12 @@ import ( // BlobSignatureTTL; and all N existing replicas of a given data block // are in the N best positions in rendezvous probe order. type Balancer struct { + DB *sqlx.DB Logger logrus.FieldLogger Dumper logrus.FieldLogger Metrics *metrics + ChunkPrefix string LostBlocksFile string *BlockStateMap @@ -49,7 +58,7 @@ type Balancer struct { classes []string mounts int mountsByClass map[string]map[*KeepMount]bool - collScanned int + collScanned int64 serviceRoots map[string]string errors []error stats balancerStats @@ -62,15 +71,23 @@ type Balancer struct { // subsequent balance operation. // // Run should only be called once on a given Balancer object. -// -// Typical usage: -// -// runOptions, err = (&Balancer{}).Run(config, runOptions) -func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) { +func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) { nextRunOptions = runOptions + bal.logf("acquiring active lock") + if !dblock.KeepBalanceActive.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return bal.DB, nil }) { + // context canceled + return + } + defer dblock.KeepBalanceActive.Unlock() + defer bal.time("sweep", "wall clock time to run one full sweep")() + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(cluster.Collections.BalanceTimeout.Duration())) + defer cancel() + + go bal.reportMemorySize(ctx) + var lbFile *os.File if bal.LostBlocksFile != "" { tmpfn := bal.LostBlocksFile + ".tmp" @@ -111,13 +128,21 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp if err = bal.CheckSanityEarly(client); err != nil { return } + + // On a big site, indexing and sending trash/pull lists can + // take much longer than the usual 5 minute client + // timeout. From here on, we rely on the context deadline + // instead, aborting the entire operation if any part takes + // too long. 
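[Editor's note] The comment above, together with the `client.Timeout = 0` line that follows, is the key operational change in this hunk: the per-request HTTP timeout is disabled and the whole sweep is bounded by a single context deadline instead. A minimal, self-contained sketch of that pattern follows; the server URL and the 6-hour deadline are made-up placeholders, not values taken from the diff.

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

func main() {
	client := &http.Client{}
	// Disable the per-request timeout: a single index or trash/pull
	// request on a big site can legitimately exceed any fixed limit.
	client.Timeout = 0

	// Bound the whole operation with one deadline instead.
	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(6*time.Hour))
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, "GET", "http://keep0.example/index", nil)
	if err != nil {
		fmt.Println(err)
		return
	}
	resp, err := client.Do(req) // aborted once the overall deadline passes
	if err != nil {
		fmt.Println(err)
		return
	}
	resp.Body.Close()
	fmt.Println(resp.Status)
}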
+ client.Timeout = 0 + rs := bal.rendezvousState() - if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState { + if cluster.Collections.BalanceTrashLimit > 0 && rs != runOptions.SafeRendezvousState { if runOptions.SafeRendezvousState != "" { bal.logf("notice: KeepServices list has changed since last run") } bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run") - if err = bal.ClearTrashLists(client); err != nil { + if err = bal.ClearTrashLists(ctx, client); err != nil { return } // The current rendezvous state becomes "safe" (i.e., @@ -126,9 +151,11 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp // succeed in clearing existing trash lists. nextRunOptions.SafeRendezvousState = rs } - if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil { + + if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil { return } + bal.setupLookupTables(cluster) bal.ComputeChangeSets() bal.PrintStatistics() if err = bal.CheckSanityLate(); err != nil { @@ -145,15 +172,24 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp } lbFile = nil } - if runOptions.CommitPulls { - err = bal.CommitPulls(client) + if cluster.Collections.BalancePullLimit > 0 { + err = bal.CommitPulls(ctx, client) if err != nil { // Skip trash if we can't pull. (Too cautious?) return } } - if runOptions.CommitTrash { - err = bal.CommitTrash(client) + if cluster.Collections.BalanceTrashLimit > 0 { + err = bal.CommitTrash(ctx, client) + if err != nil { + return + } + } + if runOptions.CommitConfirmedFields { + err = bal.updateCollections(ctx, client, cluster) + if err != nil { + return + } } return } @@ -192,8 +228,8 @@ func (bal *Balancer) cleanupMounts() { rwdev := map[string]*KeepService{} for _, srv := range bal.KeepServices { for _, mnt := range srv.mounts { - if !mnt.ReadOnly && mnt.DeviceID != "" { - rwdev[mnt.DeviceID] = srv + if mnt.AllowWrite { + rwdev[mnt.UUID] = srv } } } @@ -202,8 +238,8 @@ func (bal *Balancer) cleanupMounts() { for _, srv := range bal.KeepServices { var dedup []*KeepMount for _, mnt := range srv.mounts { - if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil { - bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID]) + if !mnt.AllowWrite && rwdev[mnt.UUID] != nil { + bal.logf("skipping srv %s readonly mount %q because same volume is mounted read-write on srv %s", srv, mnt.UUID, rwdev[mnt.UUID]) } else { dedup = append(dedup, mnt) } @@ -240,6 +276,37 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error { return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv) } } + for _, c := range bal.ChunkPrefix { + if !strings.ContainsRune("0123456789abcdef", c) { + return fmt.Errorf("invalid char %q in chunk prefix %q: only lowercase hex digits make sense", string(c), bal.ChunkPrefix) + } + } + if len(bal.ChunkPrefix) > 32 { + return fmt.Errorf("invalid chunk prefix %q: longer than a block hash", bal.ChunkPrefix) + } + + mountProblem := false + type deviceMount struct { + srv *KeepService + mnt *KeepMount + } + deviceMounted := map[string]deviceMount{} // DeviceID -> mount + for _, srv := range bal.KeepServices { + for _, mnt := range srv.mounts { + if first, dup := deviceMounted[mnt.DeviceID]; dup && first.mnt.UUID 
!= mnt.UUID && mnt.DeviceID != "" { + bal.logf("config error: device %s is mounted with multiple volume UUIDs: %s on %s, and %s on %s", + mnt.DeviceID, + first.mnt.UUID, first.srv, + mnt.UUID, srv) + mountProblem = true + continue + } + deviceMounted[mnt.DeviceID] = deviceMount{srv, mnt} + } + } + if mountProblem { + return errors.New("cannot continue with config errors (see above)") + } var checkPage arvados.CollectionList if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{ @@ -286,11 +353,11 @@ func (bal *Balancer) rendezvousState() string { // We avoid this problem if we clear all trash lists before getting // indexes. (We also assume there is only one rebalancing process // running at a time.) -func (bal *Balancer) ClearTrashLists(c *arvados.Client) error { +func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error { for _, srv := range bal.KeepServices { srv.ChangeSet = &ChangeSet{} } - return bal.CommitTrash(c) + return bal.CommitTrash(ctx, c) } // GetCurrentState determines the current replication state, and the @@ -304,7 +371,10 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error { // collection manifests in the database (API server). // // It encodes the resulting information in BlockStateMap. -func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error { +func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + defer bal.time("get_state", "wall clock time to get current state")() bal.BlockStateMap = NewBlockStateMap() @@ -329,12 +399,10 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro deviceMount := map[string]*KeepMount{} for _, srv := range bal.KeepServices { for _, mnt := range srv.mounts { - equiv := deviceMount[mnt.DeviceID] + equiv := deviceMount[mnt.UUID] if equiv == nil { equiv = mnt - if mnt.DeviceID != "" { - deviceMount[mnt.DeviceID] = equiv - } + deviceMount[mnt.UUID] = equiv } equivMount[equiv] = append(equivMount[equiv], mnt) } @@ -348,12 +416,13 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro go func(mounts []*KeepMount) { defer wg.Done() bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService) - idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "") + idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, bal.ChunkPrefix) if err != nil { select { case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err): default: } + cancel() return } if len(errs) > 0 { @@ -371,38 +440,14 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro }(mounts) } - // collQ buffers incoming collections so we can start fetching - // the next page without waiting for the current page to - // finish processing. collQ := make(chan arvados.Collection, bufs) - // Start a goroutine to process collections. (We could use a - // worker pool here, but even with a single worker we already - // process collections much faster than we can retrieve them.) - wg.Add(1) - go func() { - defer wg.Done() - for coll := range collQ { - err := bal.addCollection(coll) - if err != nil || len(errs) > 0 { - select { - case errs <- err: - default: - } - for range collQ { - } - return - } - bal.collScanned++ - } - }() - - // Start a goroutine to retrieve all collections from the - // Arvados database and send them to collQ for processing. 
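[Editor's note] Throughout the rewritten GetCurrentState, a failing goroutine records its error with a non-blocking send on a small buffered channel and then cancels the shared context so the remaining index/collection workers stop early. A standalone sketch of that "first error wins" pattern, with invented job numbers rather than Balancer code:

package main

import (
	"context"
	"fmt"
	"sync"
)

func work(ctx context.Context, n int) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // stop early if another worker already failed
	default:
	}
	if n == 3 {
		return fmt.Errorf("job %d failed", n)
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	errs := make(chan error, 1) // room for one error: the first failure wins
	var wg sync.WaitGroup
	for n := 0; n < 8; n++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			if err := work(ctx, n); err != nil {
				select {
				case errs <- err: // record the first error
				default: // an error is already recorded; drop this one
				}
				cancel() // ask the remaining workers to stop
			}
		}(n)
	}
	wg.Wait()
	if len(errs) > 0 {
		fmt.Println("aborted:", <-errs)
	}
}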
+ // Retrieve all collections from the database and send them to + // collQ. wg.Add(1) go func() { defer wg.Done() - err = EachCollection(c, pageSize, + err = EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error { collQ <- coll if len(errs) > 0 { @@ -422,9 +467,31 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro case errs <- err: default: } + cancel() } }() + // Parse manifests from collQ and pass the block hashes to + // BlockStateMap to track desired replication. + for i := 0; i < runtime.NumCPU(); i++ { + wg.Add(1) + go func() { + defer wg.Done() + for coll := range collQ { + err := bal.addCollection(coll) + if err != nil || len(errs) > 0 { + select { + case errs <- err: + default: + } + cancel() + continue + } + atomic.AddInt64(&bal.collScanned, 1) + } + }() + } + wg.Wait() if len(errs) > 0 { return <-errs @@ -441,7 +508,21 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error { if coll.ReplicationDesired != nil { repl = *coll.ReplicationDesired } - bal.Logger.Debugf("%v: %d block x%d", coll.UUID, len(blkids), repl) + if bal.ChunkPrefix != "" { + // Throw out blocks that don't match the requested + // prefix. (We save a bit of GC work here by + // preallocating based on each hex digit in + // ChunkPrefix reducing the expected size of the + // filtered set by ~16x.) + filtered := make([]arvados.SizedDigest, 0, len(blkids)>>(4*len(bal.ChunkPrefix)-1)) + for _, blkid := range blkids { + if strings.HasPrefix(string(blkid), bal.ChunkPrefix) { + filtered = append(filtered, blkid) + } + } + blkids = filtered + } + bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl) // Pass pdh to IncreaseDesired only if LostBlocksFile is being // written -- otherwise it's just a waste of memory. pdh := "" @@ -462,7 +543,6 @@ func (bal *Balancer) ComputeChangeSets() { // This just calls balanceBlock() once for each block, using a // pool of worker goroutines. defer bal.time("changeset_compute", "wall clock time to compute changesets")() - bal.setupLookupTables() type balanceTask struct { blkid arvados.SizedDigest @@ -497,7 +577,7 @@ func (bal *Balancer) ComputeChangeSets() { bal.collectStatistics(results) } -func (bal *Balancer) setupLookupTables() { +func (bal *Balancer) setupLookupTables(cluster *arvados.Cluster) { bal.serviceRoots = make(map[string]string) bal.classes = defaultClasses bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}} @@ -507,14 +587,12 @@ func (bal *Balancer) setupLookupTables() { for _, mnt := range srv.mounts { bal.mounts++ - // All mounts on a read-only service are - // effectively read-only. - mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly - - if len(mnt.StorageClasses) == 0 { - bal.mountsByClass["default"][mnt] = true - continue + if srv.ReadOnly { + // All mounts on a read-only service + // are effectively read-only. + mnt.AllowWrite = false } + for class := range mnt.StorageClasses { if mbc := bal.mountsByClass[class]; mbc == nil { bal.classes = append(bal.classes, class) @@ -531,6 +609,13 @@ func (bal *Balancer) setupLookupTables() { // class" case in balanceBlock depends on the order classes // are considered. 
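[Editor's note] For reference, a rough standalone sketch of the lookup structure setupLookupTables builds: mounts grouped per storage class in a nested map, with the class list sorted so the order classes are considered is reproducible across sweeps. The mount UUIDs and class names below are invented for illustration and are not taken from the diff.

package main

import (
	"fmt"
	"sort"
)

type mount struct {
	UUID           string
	StorageClasses map[string]bool
}

func main() {
	mounts := []*mount{
		{UUID: "mount-1", StorageClasses: map[string]bool{"archive": true}},
		{UUID: "mount-2", StorageClasses: map[string]bool{"default": true}},
	}
	classes := []string{"default"}
	mountsByClass := map[string]map[*mount]bool{"default": {}}
	for _, mnt := range mounts {
		for class := range mnt.StorageClasses {
			if mountsByClass[class] == nil {
				classes = append(classes, class)
				mountsByClass[class] = map[*mount]bool{}
			}
			mountsByClass[class][mnt] = true
		}
	}
	// Go randomizes map iteration order, so sort the class names to keep
	// per-class decisions deterministic from one sweep to the next.
	sort.Strings(classes)
	for _, class := range classes {
		fmt.Println(class, len(mountsByClass[class]))
	}
}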
sort.Strings(bal.classes) + + for _, srv := range bal.KeepServices { + srv.ChangeSet = &ChangeSet{ + PullLimit: cluster.Collections.BalancePullLimit, + TrashLimit: cluster.Collections.BalanceTrashLimit, + } + } } const ( @@ -591,7 +676,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba slots = append(slots, slot{ mnt: mnt, repl: repl, - want: repl != nil && mnt.ReadOnly, + want: repl != nil && !mnt.AllowTrash, }) } } @@ -644,7 +729,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba // new/remaining replicas uniformly // across qualifying mounts on a given // server. - return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid) + return rendezvousLess(si.mnt.UUID, sj.mnt.UUID, blkid) } }) @@ -669,7 +754,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba // and returns true if all requirements are met. trySlot := func(i int) bool { slot := slots[i] - if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] { + if wantMnt[slot.mnt] || wantDev[slot.mnt.UUID] { // Already allocated a replica to this // backend device, possibly on a // different server. @@ -680,13 +765,11 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba protMnt[slot.mnt] = true replProt += slot.mnt.Replication } - if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) { + if replWant < desired && (slot.repl != nil || slot.mnt.AllowWrite) { slots[i].want = true wantSrv[slot.mnt.KeepService] = true wantMnt[slot.mnt] = true - if slot.mnt.DeviceID != "" { - wantDev[slot.mnt.DeviceID] = true - } + wantDev[slot.mnt.UUID] = true replWant += slot.mnt.Replication } return replProt >= desired && replWant >= desired @@ -728,7 +811,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba // haven't already been added to unsafeToDelete // because the servers report different Mtimes. for _, slot := range slots { - if slot.repl != nil && wantDev[slot.mnt.DeviceID] { + if slot.repl != nil && wantDev[slot.mnt.UUID] { unsafeToDelete[slot.repl.Mtime] = true } } @@ -755,23 +838,53 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba } blockState := computeBlockState(slots, nil, len(blk.Replicas), 0) - var lost bool - var changes []string + // Sort the slots by rendezvous order. This ensures "trash the + // first of N replicas with identical timestamps" is + // predictable (helpful for testing) and well distributed + // across servers. + sort.Slice(slots, func(i, j int) bool { + si, sj := slots[i], slots[j] + if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj { + return orderi < orderj + } else { + return rendezvousLess(si.mnt.UUID, sj.mnt.UUID, blkid) + } + }) + + var ( + lost bool + changes []string + trashedMtime = make(map[int64]bool, len(slots)) + ) for _, slot := range slots { // TODO: request a Touch if Mtime is duplicated. var change int switch { case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime: - slot.mnt.KeepService.AddTrash(Trash{ - SizedDigest: blkid, - Mtime: slot.repl.Mtime, - From: slot.mnt, - }) - change = changeTrash + if trashedMtime[slot.repl.Mtime] { + // Don't trash multiple replicas with + // identical timestamps. If they are + // multiple views of the same backing + // storage, asking both servers to + // trash is redundant and can cause + // races (see #20242). 
If they are + // distinct replicas that happen to + // have identical timestamps, we'll + // get this one on the next sweep. + change = changeNone + } else { + slot.mnt.KeepService.AddTrash(Trash{ + SizedDigest: blkid, + Mtime: slot.repl.Mtime, + From: slot.mnt, + }) + change = changeTrash + trashedMtime[slot.repl.Mtime] = true + } case slot.repl == nil && slot.want && len(blk.Replicas) == 0: lost = true change = changeNone - case slot.repl == nil && slot.want && !slot.mnt.ReadOnly: + case slot.repl == nil && slot.want && slot.mnt.AllowWrite: slot.mnt.KeepService.AddPull(Pull{ SizedDigest: blkid, From: blk.Replicas[0].KeepMount.KeepService, @@ -811,7 +924,7 @@ func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRe if onlyCount != nil && !onlyCount[slot.mnt] { continue } - if countedDev[slot.mnt.DeviceID] { + if countedDev[slot.mnt.UUID] { continue } switch { @@ -825,9 +938,7 @@ func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRe bbs.pulling++ repl += slot.mnt.Replication } - if slot.mnt.DeviceID != "" { - countedDev[slot.mnt.DeviceID] = true - } + countedDev[slot.mnt.UUID] = true } if repl < needRepl { bbs.unachievable = true @@ -853,19 +964,21 @@ type replicationStats struct { } type balancerStats struct { - lost blocksNBytes - overrep blocksNBytes - unref blocksNBytes - garbage blocksNBytes - underrep blocksNBytes - unachievable blocksNBytes - justright blocksNBytes - desired blocksNBytes - current blocksNBytes - pulls int - trashes int - replHistogram []int - classStats map[string]replicationStats + lost blocksNBytes + overrep blocksNBytes + unref blocksNBytes + garbage blocksNBytes + underrep blocksNBytes + unachievable blocksNBytes + justright blocksNBytes + desired blocksNBytes + current blocksNBytes + pulls int + pullsDeferred int + trashes int + trashesDeferred int + replHistogram []int + classStats map[string]replicationStats // collectionBytes / collectionBlockBytes = deduplication ratio collectionBytes int64 // sum(bytes in referenced blocks) across all collections @@ -988,7 +1101,9 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) { } for _, srv := range bal.KeepServices { s.pulls += len(srv.ChangeSet.Pulls) + s.pullsDeferred += srv.ChangeSet.PullsDeferred s.trashes += len(srv.ChangeSet.Trashes) + s.trashesDeferred += srv.ChangeSet.TrashesDeferred } bal.stats = s bal.Metrics.UpdateStats(s) @@ -1084,22 +1199,22 @@ func (bal *Balancer) CheckSanityLate() error { // keepstore servers. This has the effect of increasing replication of // existing blocks that are either underreplicated or poorly // distributed according to rendezvous hashing. -func (bal *Balancer) CommitPulls(c *arvados.Client) error { +func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error { defer bal.time("send_pull_lists", "wall clock time to send pull lists")() return bal.commitAsync(c, "send pull list", func(srv *KeepService) error { - return srv.CommitPulls(c) + return srv.CommitPulls(ctx, c) }) } // CommitTrash sends the computed lists of trash requests to the // keepstore servers. This has the effect of deleting blocks that are // overreplicated or unreferenced. 
-func (bal *Balancer) CommitTrash(c *arvados.Client) error { +func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error { defer bal.time("send_trash_lists", "wall clock time to send trash lists")() return bal.commitAsync(c, "send trash list", func(srv *KeepService) error { - return srv.CommitTrash(c) + return srv.CommitTrash(ctx, c) }) } @@ -1144,6 +1259,60 @@ func (bal *Balancer) time(name, help string) func() { } } +// Log current memory usage: once now, at least once every 10 minutes, +// and when memory grows by 40% since the last log. Stop when ctx is +// canceled. +func (bal *Balancer) reportMemorySize(ctx context.Context) { + buf, _ := os.ReadFile("/proc/self/smaps") + m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf) + var pagesize int64 + if len(m) == 2 { + pagesize, _ = strconv.ParseInt(string(m[1]), 10, 64) + pagesize <<= 10 + } + if pagesize == 0 { + bal.logf("cannot log OS-reported memory size: failed to parse KernelPageSize from /proc/self/smaps") + } + osstats := func() string { + if pagesize == 0 { + return "" + } + buf, _ := os.ReadFile("/proc/self/statm") + fields := strings.Split(string(buf), " ") + if len(fields) < 2 { + return "" + } + virt, _ := strconv.ParseInt(fields[0], 10, 64) + virt *= pagesize + res, _ := strconv.ParseInt(fields[1], 10, 64) + res *= pagesize + if virt == 0 || res == 0 { + return "" + } + return fmt.Sprintf(" virt %d res %d", virt, res) + } + + var nextTime time.Time + var nextMem uint64 + const maxInterval = time.Minute * 10 + const maxIncrease = 1.4 + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + var memstats runtime.MemStats + for ctx.Err() == nil { + now := time.Now() + runtime.ReadMemStats(&memstats) + mem := memstats.StackInuse + memstats.HeapInuse + if now.After(nextTime) || mem >= nextMem { + bal.logf("heap %d stack %d heapalloc %d%s", memstats.HeapInuse, memstats.StackInuse, memstats.HeapAlloc, osstats()) + nextMem = uint64(float64(mem) * maxIncrease) + nextTime = now.Add(maxInterval) + } + <-ticker.C + } +} + // Rendezvous hash sort function. Less efficient than sorting on // precomputed rendezvous hashes, but also rarely used. func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
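[Editor's note] The diff is truncated at the rendezvousLess signature. For context, rendezvous ("highest-random-weight") ordering ranks candidates for a given block by hashing the block ID together with each candidate's identifier and comparing the digests. The sketch below illustrates that technique with throwaway mount names; it is not asserted to match the elided function body.

package main

import (
	"bytes"
	"crypto/md5"
	"fmt"
	"sort"
)

// less reports whether candidate i sorts before candidate j for this block,
// by comparing md5(blockHash + candidateID) digests.
func less(i, j, blk string) bool {
	hi := md5.Sum([]byte(blk + i))
	hj := md5.Sum([]byte(blk + j))
	return bytes.Compare(hi[:], hj[:]) < 0
}

func main() {
	mounts := []string{"mount-a", "mount-b", "mount-c"}
	blk := "d41d8cd98f00b204e9800998ecf8427e"
	sort.Slice(mounts, func(a, b int) bool { return less(mounts[a], mounts[b], blk) })
	fmt.Println(mounts) // probe order for this block
}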