projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
21200: copied branch from arvados-workbench2 Arvados-DCO-1.1-Signed-off-by: Lisa...
[arvados.git]
/
services
/
keep-balance
/
balance.go
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 215c5e1b5be1355e9f6793da54ab0c3148eff242..e71eb07efa6979fa005c4a59faf70f7c3187519b 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -137,7 +137,7 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
 	client.Timeout = 0
 	rs := bal.rendezvousState()
-	if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
+	if cluster.Collections.BalanceTrashLimit > 0 && rs != runOptions.SafeRendezvousState {
 		if runOptions.SafeRendezvousState != "" {
 			bal.logf("notice: KeepServices list has changed since last run")
 		}
@@ -155,6 +155,7 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
 	if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
 		return
 	}
+	bal.setupLookupTables(cluster)
 	bal.ComputeChangeSets()
 	bal.PrintStatistics()
 	if err = bal.CheckSanityLate(); err != nil {
@@ -171,14 +172,14 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
 		}
 		lbFile = nil
 	}
-	if runOptions.CommitPulls {
+	if cluster.Collections.BalancePullLimit > 0 {
 		err = bal.CommitPulls(ctx, client)
 		if err != nil {
 			// Skip trash if we can't pull. (Too cautious?)
 			return
 		}
 	}
-	if runOptions.CommitTrash {
+	if cluster.Collections.BalanceTrashLimit > 0 {
 		err = bal.CommitTrash(ctx, client)
 		if err != nil {
 			return
@@ -227,7 +228,7 @@ func (bal *Balancer) cleanupMounts() {
 	rwdev := map[string]*KeepService{}
 	for _, srv := range bal.KeepServices {
 		for _, mnt := range srv.mounts {
-			if !mnt.ReadOnly {
+			if mnt.AllowWrite {
 				rwdev[mnt.UUID] = srv
 			}
 		}
@@ -237,7 +238,7 @@ func (bal *Balancer) cleanupMounts() {
 	for _, srv := range bal.KeepServices {
 		var dedup []*KeepMount
 		for _, mnt := range srv.mounts {
-			if mnt.ReadOnly && rwdev[mnt.UUID] != nil {
+			if !mnt.AllowWrite && rwdev[mnt.UUID] != nil {
 				bal.logf("skipping srv %s readonly mount %q because same volume is mounted read-write on srv %s", srv, mnt.UUID, rwdev[mnt.UUID])
 			} else {
 				dedup = append(dedup, mnt)
@@ -542,7 +543,6 @@ func (bal *Balancer) ComputeChangeSets() {
 	// This just calls balanceBlock() once for each block, using a
 	// pool of worker goroutines.
 	defer bal.time("changeset_compute", "wall clock time to compute changesets")()
-	bal.setupLookupTables()

 	type balanceTask struct {
 		blkid arvados.SizedDigest
@@ -577,7 +577,7 @@ func (bal *Balancer) ComputeChangeSets() {
 	bal.collectStatistics(results)
 }

-func (bal *Balancer) setupLookupTables() {
+func (bal *Balancer) setupLookupTables(cluster *arvados.Cluster) {
 	bal.serviceRoots = make(map[string]string)
 	bal.classes = defaultClasses
 	bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
@@ -587,9 +587,11 @@ func (bal *Balancer) setupLookupTables() {
 		for _, mnt := range srv.mounts {
 			bal.mounts++
-			// All mounts on a read-only service are
-			// effectively read-only.
-			mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
+			if srv.ReadOnly {
+				// All mounts on a read-only service
+				// are effectively read-only.
+				mnt.AllowWrite = false
+			}
 			for class := range mnt.StorageClasses {
 				if mbc := bal.mountsByClass[class]; mbc == nil {
@@ -607,6 +609,13 @@ func (bal *Balancer) setupLookupTables() {
 	// class" case in balanceBlock depends on the order classes
 	// are considered.
 	sort.Strings(bal.classes)
+
+	for _, srv := range bal.KeepServices {
+		srv.ChangeSet = &ChangeSet{
+			PullLimit:  cluster.Collections.BalancePullLimit,
+			TrashLimit: cluster.Collections.BalanceTrashLimit,
+		}
+	}
 }

 const (
@@ -667,7 +676,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
 		slots = append(slots, slot{
 			mnt:  mnt,
 			repl: repl,
-			want: repl != nil && mnt.ReadOnly,
+			want: repl != nil && !mnt.AllowTrash,
 		})
 	}
 }
@@ -756,7 +765,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
 			protMnt[slot.mnt] = true
 			replProt += slot.mnt.Replication
 		}
-		if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
+		if replWant < desired && (slot.repl != nil || slot.mnt.AllowWrite) {
 			slots[i].want = true
 			wantSrv[slot.mnt.KeepService] = true
 			wantMnt[slot.mnt] = true
@@ -875,7 +884,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
 		case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
 			lost = true
 			change = changeNone
-		case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
+		case slot.repl == nil && slot.want && slot.mnt.AllowWrite:
 			slot.mnt.KeepService.AddPull(Pull{
 				SizedDigest: blkid,
 				From:        blk.Replicas[0].KeepMount.KeepService,
@@ -955,19 +964,21 @@ type replicationStats struct {
 }

 type balancerStats struct {
-	lost          blocksNBytes
-	overrep       blocksNBytes
-	unref         blocksNBytes
-	garbage       blocksNBytes
-	underrep      blocksNBytes
-	unachievable  blocksNBytes
-	justright     blocksNBytes
-	desired       blocksNBytes
-	current       blocksNBytes
-	pulls         int
-	trashes       int
-	replHistogram []int
-	classStats    map[string]replicationStats
+	lost            blocksNBytes
+	overrep         blocksNBytes
+	unref           blocksNBytes
+	garbage         blocksNBytes
+	underrep        blocksNBytes
+	unachievable    blocksNBytes
+	justright       blocksNBytes
+	desired         blocksNBytes
+	current         blocksNBytes
+	pulls           int
+	pullsDeferred   int
+	trashes         int
+	trashesDeferred int
+	replHistogram   []int
+	classStats      map[string]replicationStats

 	// collectionBytes / collectionBlockBytes = deduplication ratio
 	collectionBytes int64 // sum(bytes in referenced blocks) across all collections
@@ -1090,7 +1101,9 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
 	}
 	for _, srv := range bal.KeepServices {
 		s.pulls += len(srv.ChangeSet.Pulls)
+		s.pullsDeferred += srv.ChangeSet.PullsDeferred
 		s.trashes += len(srv.ChangeSet.Trashes)
+		s.trashesDeferred += srv.ChangeSet.TrashesDeferred
 	}
 	bal.stats = s
 	bal.Metrics.UpdateStats(s)