projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge branch '12430-output-glob'
[arvados.git]
/
services
/
keep-balance
/
balance.go
diff --git
a/services/keep-balance/balance.go
b/services/keep-balance/balance.go
index e44dfeda8748aec51e18cd55118c15d509d8fc12..e71eb07efa6979fa005c4a59faf70f7c3187519b 100644
(file)
--- a/
services/keep-balance/balance.go
+++ b/
services/keep-balance/balance.go
@@
-137,7
+137,7
@@
func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
client.Timeout = 0
rs := bal.rendezvousState()
client.Timeout = 0
rs := bal.rendezvousState()
- if
runOptions.CommitTrash
&& rs != runOptions.SafeRendezvousState {
+ if
cluster.Collections.BalanceTrashLimit > 0
&& rs != runOptions.SafeRendezvousState {
if runOptions.SafeRendezvousState != "" {
bal.logf("notice: KeepServices list has changed since last run")
}
if runOptions.SafeRendezvousState != "" {
bal.logf("notice: KeepServices list has changed since last run")
}
@@
-155,6
+155,7
@@
func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
return
}
if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
return
}
+ bal.setupLookupTables(cluster)
bal.ComputeChangeSets()
bal.PrintStatistics()
if err = bal.CheckSanityLate(); err != nil {
bal.ComputeChangeSets()
bal.PrintStatistics()
if err = bal.CheckSanityLate(); err != nil {
@@
-171,14
+172,14
@@
func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
}
lbFile = nil
}
}
lbFile = nil
}
- if
runOptions.CommitPulls
{
+ if
cluster.Collections.BalancePullLimit > 0
{
err = bal.CommitPulls(ctx, client)
if err != nil {
// Skip trash if we can't pull. (Too cautious?)
return
}
}
err = bal.CommitPulls(ctx, client)
if err != nil {
// Skip trash if we can't pull. (Too cautious?)
return
}
}
- if
runOptions.CommitTrash
{
+ if
cluster.Collections.BalanceTrashLimit > 0
{
err = bal.CommitTrash(ctx, client)
if err != nil {
return
err = bal.CommitTrash(ctx, client)
if err != nil {
return
@@
-542,7
+543,6
@@
func (bal *Balancer) ComputeChangeSets() {
// This just calls balanceBlock() once for each block, using a
// pool of worker goroutines.
defer bal.time("changeset_compute", "wall clock time to compute changesets")()
// This just calls balanceBlock() once for each block, using a
// pool of worker goroutines.
defer bal.time("changeset_compute", "wall clock time to compute changesets")()
- bal.setupLookupTables()
type balanceTask struct {
blkid arvados.SizedDigest
type balanceTask struct {
blkid arvados.SizedDigest
@@
-577,7
+577,7
@@
func (bal *Balancer) ComputeChangeSets() {
bal.collectStatistics(results)
}
bal.collectStatistics(results)
}
-func (bal *Balancer) setupLookupTables() {
+func (bal *Balancer) setupLookupTables(
cluster *arvados.Cluster
) {
bal.serviceRoots = make(map[string]string)
bal.classes = defaultClasses
bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
bal.serviceRoots = make(map[string]string)
bal.classes = defaultClasses
bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
@@
-609,6
+609,13
@@
func (bal *Balancer) setupLookupTables() {
// class" case in balanceBlock depends on the order classes
// are considered.
sort.Strings(bal.classes)
// class" case in balanceBlock depends on the order classes
// are considered.
sort.Strings(bal.classes)
+
+ for _, srv := range bal.KeepServices {
+ srv.ChangeSet = &ChangeSet{
+ PullLimit: cluster.Collections.BalancePullLimit,
+ TrashLimit: cluster.Collections.BalanceTrashLimit,
+ }
+ }
}
const (
}
const (
@@
-957,19
+964,21
@@
type replicationStats struct {
}
type balancerStats struct {
}
type balancerStats struct {
- lost blocksNBytes
- overrep blocksNBytes
- unref blocksNBytes
- garbage blocksNBytes
- underrep blocksNBytes
- unachievable blocksNBytes
- justright blocksNBytes
- desired blocksNBytes
- current blocksNBytes
- pulls int
- trashes int
- replHistogram []int
- classStats map[string]replicationStats
+ lost blocksNBytes
+ overrep blocksNBytes
+ unref blocksNBytes
+ garbage blocksNBytes
+ underrep blocksNBytes
+ unachievable blocksNBytes
+ justright blocksNBytes
+ desired blocksNBytes
+ current blocksNBytes
+ pulls int
+ pullsDeferred int
+ trashes int
+ trashesDeferred int
+ replHistogram []int
+ classStats map[string]replicationStats
// collectionBytes / collectionBlockBytes = deduplication ratio
collectionBytes int64 // sum(bytes in referenced blocks) across all collections
// collectionBytes / collectionBlockBytes = deduplication ratio
collectionBytes int64 // sum(bytes in referenced blocks) across all collections
@@
-1092,7
+1101,9
@@
func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
}
for _, srv := range bal.KeepServices {
s.pulls += len(srv.ChangeSet.Pulls)
}
for _, srv := range bal.KeepServices {
s.pulls += len(srv.ChangeSet.Pulls)
+ s.pullsDeferred += srv.ChangeSet.PullsDeferred
s.trashes += len(srv.ChangeSet.Trashes)
s.trashes += len(srv.ChangeSet.Trashes)
+ s.trashesDeferred += srv.ChangeSet.TrashesDeferred
}
bal.stats = s
bal.Metrics.UpdateStats(s)
}
bal.stats = s
bal.Metrics.UpdateStats(s)