projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
10181: Merge branch 'master' into 10181-incremental-log
[arvados.git]
/
services
/
keep-balance
/
balance.go
diff --git
a/services/keep-balance/balance.go
b/services/keep-balance/balance.go
index c7fa88230307e3eb47d06fac094b4768f3c7e2c8..d86234a936cc96702f3a79d12c10d04548c0faa2 100644
(file)
--- a/
services/keep-balance/balance.go
+++ b/
services/keep-balance/balance.go
@@
-95,7
+95,7
@@
func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
return
}
}
return
}
}
- bal.
dedupDevice
s()
+ bal.
cleanupMount
s()
if err = bal.CheckSanityEarly(&config.Client); err != nil {
return
if err = bal.CheckSanityEarly(&config.Client); err != nil {
return
@@
-170,7
+170,7
@@
func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) e
})
}
})
}
-func (bal *Balancer)
dedupDevice
s() {
+func (bal *Balancer)
cleanupMount
s() {
rwdev := map[string]*KeepService{}
for _, srv := range bal.KeepServices {
for _, mnt := range srv.mounts {
rwdev := map[string]*KeepService{}
for _, srv := range bal.KeepServices {
for _, mnt := range srv.mounts {
@@
-192,6
+192,14
@@
func (bal *Balancer) dedupDevices() {
}
srv.mounts = dedup
}
}
srv.mounts = dedup
}
+ for _, srv := range bal.KeepServices {
+ for _, mnt := range srv.mounts {
+ if mnt.Replication <= 0 {
+ log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
+ mnt.Replication = 1
+ }
+ }
+ }
}
// CheckSanityEarly checks for configuration and runtime errors that
}
// CheckSanityEarly checks for configuration and runtime errors that
@@
-554,7
+562,7
@@
func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
have := 0
for _, slot := range slots {
if slot.repl != nil && bal.mountsByClass[class][slot.mnt] && !countedDev[slot.mnt.DeviceID] {
have := 0
for _, slot := range slots {
if slot.repl != nil && bal.mountsByClass[class][slot.mnt] && !countedDev[slot.mnt.DeviceID] {
- have
++
+ have
+= slot.mnt.Replication
if slot.mnt.DeviceID != "" {
countedDev[slot.mnt.DeviceID] = true
}
if slot.mnt.DeviceID != "" {
countedDev[slot.mnt.DeviceID] = true
}
@@
-614,30
+622,36
@@
func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
// trashing replicas that aren't optimal positions for
// any storage class.
protMnt := map[*KeepMount]bool{}
// trashing replicas that aren't optimal positions for
// any storage class.
protMnt := map[*KeepMount]bool{}
+ // Replication planned so far (corresponds to wantMnt).
+ replWant := 0
+ // Protected replication (corresponds to protMnt).
+ replProt := 0
// trySlot tries using a slot to meet requirements,
// and returns true if all requirements are met.
trySlot := func(i int) bool {
slot := slots[i]
// trySlot tries using a slot to meet requirements,
// and returns true if all requirements are met.
trySlot := func(i int) bool {
slot := slots[i]
- if wantDev[slot.mnt.DeviceID] {
+ if want
Mnt[slot.mnt] || want
Dev[slot.mnt.DeviceID] {
// Already allocated a replica to this
// backend device, possibly on a
// different server.
return false
}
// Already allocated a replica to this
// backend device, possibly on a
// different server.
return false
}
- if
len(protMnt) < desired && slot.repl != nil
{
+ if
replProt < desired && slot.repl != nil && !protMnt[slot.mnt]
{
unsafeToDelete[slot.repl.Mtime] = true
protMnt[slot.mnt] = true
unsafeToDelete[slot.repl.Mtime] = true
protMnt[slot.mnt] = true
+ replProt += slot.mnt.Replication
}
}
- if
len(wantMnt)
< desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
+ if
replWant
< desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
slots[i].want = true
wantSrv[slot.mnt.KeepService] = true
wantMnt[slot.mnt] = true
if slot.mnt.DeviceID != "" {
wantDev[slot.mnt.DeviceID] = true
}
slots[i].want = true
wantSrv[slot.mnt.KeepService] = true
wantMnt[slot.mnt] = true
if slot.mnt.DeviceID != "" {
wantDev[slot.mnt.DeviceID] = true
}
+ replWant += slot.mnt.Replication
}
}
- return
len(protMnt) >= desired && len(wantMnt)
>= desired
+ return
replProt >= desired && replWant
>= desired
}
// First try to achieve desired replication without
}
// First try to achieve desired replication without
@@
-664,7
+678,7
@@
func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
continue
}
if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
continue
}
- if safe
++
; safe >= desired {
+ if safe
+= slot.mnt.Replication
; safe >= desired {
break
}
}
break
}
}
@@
-699,14
+713,17
@@
func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
countedDev := map[string]bool{}
var have, want int
for _, slot := range slots {
countedDev := map[string]bool{}
var have, want int
for _, slot := range slots {
+ if countedDev[slot.mnt.DeviceID] {
+ continue
+ }
if slot.want {
if slot.want {
- want
++
+ want
+= slot.mnt.Replication
}
}
- if slot.repl != nil
&& !countedDev[slot.mnt.DeviceID]
{
- have
++
- if slot.mnt.DeviceID != "" {
- countedDev[slot.mnt.DeviceID] = true
- }
+ if slot.repl != nil {
+ have
+= slot.mnt.Replication
+ }
+ if slot.mnt.DeviceID != "" {
+ countedDev[slot.mnt.DeviceID] = true
}
}
}
}
@@
-848,7
+865,7
@@
func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
case surplus > 0:
s.overrep.replicas += surplus
s.overrep.blocks++
case surplus > 0:
s.overrep.replicas += surplus
s.overrep.blocks++
- s.overrep.bytes += bytes * int64(
len(result.blk.Replicas)
-result.want)
+ s.overrep.bytes += bytes * int64(
result.have
-result.want)
default:
s.justright.replicas += result.want
s.justright.blocks++
default:
s.justright.replicas += result.want
s.justright.blocks++
@@
-860,16
+877,16
@@
func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
s.desired.blocks++
s.desired.bytes += bytes * int64(result.want)
}
s.desired.blocks++
s.desired.bytes += bytes * int64(result.want)
}
- if
len(result.blk.Replicas)
> 0 {
- s.current.replicas +=
len(result.blk.Replicas)
+ if
result.have
> 0 {
+ s.current.replicas +=
result.have
s.current.blocks++
s.current.blocks++
- s.current.bytes += bytes * int64(
len(result.blk.Replicas)
)
+ s.current.bytes += bytes * int64(
result.have
)
}
}
- for len(s.replHistogram) <=
len(result.blk.Replicas)
{
+ for len(s.replHistogram) <=
result.have
{
s.replHistogram = append(s.replHistogram, 0)
}
s.replHistogram = append(s.replHistogram, 0)
}
- s.replHistogram[
len(result.blk.Replicas)
]++
+ s.replHistogram[
result.have
]++
}
for _, srv := range bal.KeepServices {
s.pulls += len(srv.ChangeSet.Pulls)
}
for _, srv := range bal.KeepServices {
s.pulls += len(srv.ChangeSet.Pulls)