X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/37eb070f55b5ae0c622fb4bf0a946c9dd49b2752..dc16046dbfdca4a9c0d94971730d220b27e80620:/services/keep-balance/balance.go

diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 33c907c203..e71eb07efa 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -137,7 +137,7 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
 	client.Timeout = 0
 
 	rs := bal.rendezvousState()
-	if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
+	if cluster.Collections.BalanceTrashLimit > 0 && rs != runOptions.SafeRendezvousState {
 		if runOptions.SafeRendezvousState != "" {
 			bal.logf("notice: KeepServices list has changed since last run")
 		}
@@ -155,6 +155,7 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
 	if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
 		return
 	}
+	bal.setupLookupTables(cluster)
 	bal.ComputeChangeSets()
 	bal.PrintStatistics()
 	if err = bal.CheckSanityLate(); err != nil {
@@ -171,14 +172,14 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
 		}
 		lbFile = nil
 	}
-	if runOptions.CommitPulls {
+	if cluster.Collections.BalancePullLimit > 0 {
 		err = bal.CommitPulls(ctx, client)
 		if err != nil {
 			// Skip trash if we can't pull. (Too cautious?)
 			return
 		}
 	}
-	if runOptions.CommitTrash {
+	if cluster.Collections.BalanceTrashLimit > 0 {
 		err = bal.CommitTrash(ctx, client)
 		if err != nil {
 			return
@@ -227,7 +228,7 @@ func (bal *Balancer) cleanupMounts() {
 	rwdev := map[string]*KeepService{}
 	for _, srv := range bal.KeepServices {
 		for _, mnt := range srv.mounts {
-			if !mnt.ReadOnly {
+			if mnt.AllowWrite {
 				rwdev[mnt.UUID] = srv
 			}
 		}
@@ -237,7 +238,7 @@ func (bal *Balancer) cleanupMounts() {
 	for _, srv := range bal.KeepServices {
 		var dedup []*KeepMount
 		for _, mnt := range srv.mounts {
-			if mnt.ReadOnly && rwdev[mnt.UUID] != nil {
+			if !mnt.AllowWrite && rwdev[mnt.UUID] != nil {
 				bal.logf("skipping srv %s readonly mount %q because same volume is mounted read-write on srv %s", srv, mnt.UUID, rwdev[mnt.UUID])
 			} else {
 				dedup = append(dedup, mnt)
@@ -542,7 +543,6 @@ func (bal *Balancer) ComputeChangeSets() {
 	// This just calls balanceBlock() once for each block, using a
 	// pool of worker goroutines.
 	defer bal.time("changeset_compute", "wall clock time to compute changesets")()
-	bal.setupLookupTables()
 
 	type balanceTask struct {
 		blkid arvados.SizedDigest
@@ -577,7 +577,7 @@ func (bal *Balancer) ComputeChangeSets() {
 	bal.collectStatistics(results)
 }
 
-func (bal *Balancer) setupLookupTables() {
+func (bal *Balancer) setupLookupTables(cluster *arvados.Cluster) {
 	bal.serviceRoots = make(map[string]string)
 	bal.classes = defaultClasses
 	bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
@@ -587,9 +587,11 @@ func (bal *Balancer) setupLookupTables() {
 		for _, mnt := range srv.mounts {
 			bal.mounts++
 
-			// All mounts on a read-only service are
-			// effectively read-only.
-			mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
+			if srv.ReadOnly {
+				// All mounts on a read-only service
+				// are effectively read-only.
+				mnt.AllowWrite = false
+			}
 
 			for class := range mnt.StorageClasses {
 				if mbc := bal.mountsByClass[class]; mbc == nil {
@@ -607,6 +609,13 @@ func (bal *Balancer) setupLookupTables() {
 	// class" case in balanceBlock depends on the order classes
 	// are considered.
 	sort.Strings(bal.classes)
+
+	for _, srv := range bal.KeepServices {
+		srv.ChangeSet = &ChangeSet{
+			PullLimit:  cluster.Collections.BalancePullLimit,
+			TrashLimit: cluster.Collections.BalanceTrashLimit,
+		}
+	}
 }
 
 const (
@@ -667,7 +676,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
 			slots = append(slots, slot{
 				mnt:  mnt,
 				repl: repl,
-				want: repl != nil && mnt.ReadOnly,
+				want: repl != nil && !mnt.AllowTrash,
 			})
 		}
 	}
@@ -756,7 +765,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
 				protMnt[slot.mnt] = true
 				replProt += slot.mnt.Replication
 			}
-			if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
+			if replWant < desired && (slot.repl != nil || slot.mnt.AllowWrite) {
 				slots[i].want = true
 				wantSrv[slot.mnt.KeepService] = true
 				wantMnt[slot.mnt] = true
@@ -829,23 +838,53 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
 	}
 	blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
 
-	var lost bool
-	var changes []string
+	// Sort the slots by rendezvous order. This ensures "trash the
+	// first of N replicas with identical timestamps" is
+	// predictable (helpful for testing) and well distributed
+	// across servers.
+	sort.Slice(slots, func(i, j int) bool {
+		si, sj := slots[i], slots[j]
+		if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
+			return orderi < orderj
+		} else {
+			return rendezvousLess(si.mnt.UUID, sj.mnt.UUID, blkid)
+		}
+	})
+
+	var (
+		lost         bool
+		changes      []string
+		trashedMtime = make(map[int64]bool, len(slots))
+	)
 	for _, slot := range slots {
 		// TODO: request a Touch if Mtime is duplicated.
 		var change int
 		switch {
 		case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
-			slot.mnt.KeepService.AddTrash(Trash{
-				SizedDigest: blkid,
-				Mtime:       slot.repl.Mtime,
-				From:        slot.mnt,
-			})
-			change = changeTrash
+			if trashedMtime[slot.repl.Mtime] {
+				// Don't trash multiple replicas with
+				// identical timestamps. If they are
+				// multiple views of the same backing
+				// storage, asking both servers to
+				// trash is redundant and can cause
+				// races (see #20242). If they are
+				// distinct replicas that happen to
+				// have identical timestamps, we'll
+				// get this one on the next sweep.
+				change = changeNone
+			} else {
+				slot.mnt.KeepService.AddTrash(Trash{
+					SizedDigest: blkid,
+					Mtime:       slot.repl.Mtime,
+					From:        slot.mnt,
+				})
+				change = changeTrash
+				trashedMtime[slot.repl.Mtime] = true
+			}
 		case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
 			lost = true
 			change = changeNone
-		case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
+		case slot.repl == nil && slot.want && slot.mnt.AllowWrite:
 			slot.mnt.KeepService.AddPull(Pull{
 				SizedDigest: blkid,
 				From:        blk.Replicas[0].KeepMount.KeepService,
@@ -925,19 +964,21 @@ type replicationStats struct {
 }
 
 type balancerStats struct {
-	lost          blocksNBytes
-	overrep       blocksNBytes
-	unref         blocksNBytes
-	garbage       blocksNBytes
-	underrep      blocksNBytes
-	unachievable  blocksNBytes
-	justright     blocksNBytes
-	desired       blocksNBytes
-	current       blocksNBytes
-	pulls         int
-	trashes       int
-	replHistogram []int
-	classStats    map[string]replicationStats
+	lost            blocksNBytes
+	overrep         blocksNBytes
+	unref           blocksNBytes
+	garbage         blocksNBytes
+	underrep        blocksNBytes
+	unachievable    blocksNBytes
+	justright       blocksNBytes
+	desired         blocksNBytes
+	current         blocksNBytes
+	pulls           int
+	pullsDeferred   int
+	trashes         int
+	trashesDeferred int
+	replHistogram   []int
+	classStats      map[string]replicationStats
 
 	// collectionBytes / collectionBlockBytes = deduplication ratio
 	collectionBytes      int64 // sum(bytes in referenced blocks) across all collections
@@ -1060,7 +1101,9 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
 	}
 	for _, srv := range bal.KeepServices {
 		s.pulls += len(srv.ChangeSet.Pulls)
+		s.pullsDeferred += srv.ChangeSet.PullsDeferred
 		s.trashes += len(srv.ChangeSet.Trashes)
+		s.trashesDeferred += srv.ChangeSet.TrashesDeferred
 	}
 	bal.stats = s
 	bal.Metrics.UpdateStats(s)
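
Note: the diff above gives each server's ChangeSet a PullLimit and TrashLimit
(taken from Collections.BalancePullLimit / Collections.BalanceTrashLimit) and
reports pullsDeferred / trashesDeferred totals, but the ChangeSet bookkeeping
itself lives in changeset.go and is not part of this diff. The standalone Go
sketch below is only an illustration of how such limits could cap the queued
work and count the overflow as deferred; the types and method bodies are
simplified assumptions (no locking, single-field structs), not the actual
keep-balance implementation.

package main

import "fmt"

// Simplified stand-ins for the real keep-balance types.
type Pull struct{ SizedDigest string }
type Trash struct{ SizedDigest string }

type ChangeSet struct {
	PullLimit  int
	TrashLimit int

	Pulls         []Pull
	PullsDeferred int

	Trashes         []Trash
	TrashesDeferred int
}

// AddPull queues a pull request until PullLimit is reached; anything
// beyond the limit is only counted as deferred.
func (cs *ChangeSet) AddPull(p Pull) {
	if len(cs.Pulls) < cs.PullLimit {
		cs.Pulls = append(cs.Pulls, p)
	} else {
		cs.PullsDeferred++
	}
}

// AddTrash queues a trash request until TrashLimit is reached;
// anything beyond the limit is only counted as deferred.
func (cs *ChangeSet) AddTrash(t Trash) {
	if len(cs.Trashes) < cs.TrashLimit {
		cs.Trashes = append(cs.Trashes, t)
	} else {
		cs.TrashesDeferred++
	}
}

func main() {
	cs := &ChangeSet{PullLimit: 1, TrashLimit: 0}
	cs.AddPull(Pull{"aaa"})
	cs.AddPull(Pull{"bbb"})   // over PullLimit: counted as deferred
	cs.AddTrash(Trash{"ccc"}) // TrashLimit 0: nothing is queued
	fmt.Println(len(cs.Pulls), cs.PullsDeferred, len(cs.Trashes), cs.TrashesDeferred)
	// prints: 1 1 0 1
}

Under this reading, a limit of 0 queues no work at all, which is consistent
with the Run() changes above that skip CommitPulls/CommitTrash entirely unless
BalancePullLimit/BalanceTrashLimit is greater than zero.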