return
}
}
+ bal.dedupDevices()
if err = bal.CheckSanityEarly(&config.Client); err != nil {
return
}
}
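+// dedupDevices drops, from each service's mount list, any read-only
+// mount whose backing device is also mounted read-write on some
+// service (possibly a different one), so the same device is not
+// balanced through both a read-only and a read-write mount.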
+func (bal *Balancer) dedupDevices() {
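+ // Find, for each device ID, a service that mounts it read-write.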
+ rwdev := map[string]*KeepService{}
+ for _, srv := range bal.KeepServices {
+ for _, mnt := range srv.mounts {
+ if !mnt.ReadOnly && mnt.DeviceID != "" {
+ rwdev[mnt.DeviceID] = srv
+ }
+ }
+ }
+ // Drop the readonly mounts whose device is mounted RW
+ // elsewhere.
+ for _, srv := range bal.KeepServices {
+ var dedup []*KeepMount
+ for _, mnt := range srv.mounts {
+ if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
+ bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
+ } else {
+ dedup = append(dedup, mnt)
+ }
+ }
+ srv.mounts = dedup
+ }
+}
+
// CheckSanityEarly checks for configuration and runtime errors that
// can be detected before GetCurrentState() and ComputeChangeSets()
// are called.
errs := make(chan error, 2+len(bal.KeepServices))
wg := sync.WaitGroup{}
- // Start one goroutine for each KeepService: retrieve the
- // index, and add the returned blocks to BlockStateMap.
+ // When a device is mounted more than once, we will get its
+ // index only once, and call AddReplicas on all of the mounts.
+ // equivMount keys are the mounts that will be indexed, and
+ // each value is a list of mounts to apply the received index
+ // to.
+ equivMount := map[*KeepMount][]*KeepMount{}
+ // deviceMount maps each device ID to the one mount that will
+ // be indexed for that device.
+ deviceMount := map[string]*KeepMount{}
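+ // For example, if mount m1 on one server and mount m2 on another
+ // server share a device ID, only m1 is indexed, and the returned
+ // index is applied to both m1 and m2.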
for _, srv := range bal.KeepServices {
+ for _, mnt := range srv.mounts {
+ equiv := deviceMount[mnt.DeviceID]
+ if equiv == nil {
+ equiv = mnt
+ if mnt.DeviceID != "" {
+ deviceMount[mnt.DeviceID] = equiv
+ }
+ }
+ equivMount[equiv] = append(equivMount[equiv], mnt)
+ }
+ }
+
+ // Start one goroutine for each (non-redundant) mount:
+ // retrieve the index, and add the returned blocks to
+ // BlockStateMap.
+ for _, mounts := range equivMount {
wg.Add(1)
- go func(srv *KeepService) {
+ go func(mounts []*KeepMount) {
defer wg.Done()
- bal.logf("%s: retrieve indexes", srv)
- for _, mount := range srv.mounts {
- bal.logf("%s: retrieve index", mount)
- idx, err := srv.IndexMount(c, mount.UUID, "")
- if err != nil {
- errs <- fmt.Errorf("%s: retrieve index: %v", mount, err)
- return
- }
- if len(errs) > 0 {
- // Some other goroutine encountered an
- // error -- any further effort here
- // will be wasted.
- return
- }
+ bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
+ idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
+ if err != nil {
+ errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err)
+ return
+ }
+ if len(errs) > 0 {
+ // Some other goroutine encountered an
+ // error -- any further effort here
+ // will be wasted.
+ return
+ }
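+ // Apply the index we just fetched to every mount that shares
+ // this backend device.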
+ for _, mount := range mounts {
bal.logf("%s: add %d replicas to map", mount, len(idx))
bal.BlockStateMap.AddReplicas(mount, idx)
- bal.logf("%s: done", mount)
+ bal.logf("%s: added %d replicas", mount, len(idx))
}
- bal.logf("%s: done", srv)
- }(srv)
+ bal.logf("mount %s: index done", mounts[0])
+ }(mounts)
}
// collQ buffers incoming collections so we can start fetching
blkid arvados.SizedDigest
blk *BlockState
}
- nWorkers := 1 + runtime.NumCPU()
- todo := make(chan balanceTask, nWorkers)
- results := make(chan balanceResult, 16)
- var wg sync.WaitGroup
- for i := 0; i < nWorkers; i++ {
- wg.Add(1)
- go func() {
- for work := range todo {
- results <- bal.balanceBlock(work.blkid, work.blk)
+ workers := runtime.GOMAXPROCS(-1)
+ todo := make(chan balanceTask, workers)
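+ // Feed blocks to the workers from a separate goroutine, so
+ // results can be collected concurrently below.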
+ go func() {
+ bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
+ todo <- balanceTask{
+ blkid: blkid,
+ blk: blk,
}
- wg.Done()
- }()
- }
- bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
- todo <- balanceTask{
- blkid: blkid,
- blk: blk,
- }
- })
- close(todo)
+ })
+ close(todo)
+ }()
+ results := make(chan balanceResult, workers)
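+ // Run balanceBlock in a pool of worker goroutines; close the
+ // results channel once every worker has drained todo.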
go func() {
+ var wg sync.WaitGroup
+ for i := 0; i < workers; i++ {
+ wg.Add(1)
+ go func() {
+ for work := range todo {
+ results <- bal.balanceBlock(work.blkid, work.blk)
+ }
+ wg.Done()
+ }()
+ }
wg.Wait()
close(results)
}()
}
type balanceResult struct {
- blk *BlockState
- blkid arvados.SizedDigest
- have int
- want int
+ blk *BlockState
+ blkid arvados.SizedDigest
+ have int
+ want int
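+ // classState records this block's per-storage-class outcome.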
+ classState map[string]balancedBlockState
}
// balanceBlock compares current state to desired state for a single
// won't want to trash any replicas.
underreplicated := false
+ classState := make(map[string]balancedBlockState, len(bal.classes))
unsafeToDelete := make(map[int64]bool, len(slots))
for _, class := range bal.classes {
desired := blk.Desired[class]
+
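+ // Count existing replicas in this class, counting a device
+ // mounted on multiple servers only once.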
+ countedDev := map[string]bool{}
+ have := 0
+ for _, slot := range slots {
+ if slot.repl != nil && bal.mountsByClass[class][slot.mnt] && !countedDev[slot.mnt.DeviceID] {
+ have++
+ if slot.mnt.DeviceID != "" {
+ countedDev[slot.mnt.DeviceID] = true
+ }
+ }
+ }
+ classState[class] = balancedBlockState{
+ desired: desired,
+ surplus: have - desired,
+ }
+
if desired == 0 {
continue
}
+
// Sort the slots by desirability.
sort.Slice(slots, func(i, j int) bool {
si, sj := slots[i], slots[j]
}
})
- // Servers and mounts (with or without existing
+ // Servers/mounts/devices (with or without existing
// replicas) that are part of the best achievable
// layout for this storage class.
wantSrv := map[*KeepService]bool{}
wantMnt := map[*KeepMount]bool{}
+ wantDev := map[string]bool{}
// Positions (with existing replicas) that have been
// protected (via unsafeToDelete) to ensure we don't
// reduce replication below desired level when
// and returns true if all requirements are met.
trySlot := func(i int) bool {
slot := slots[i]
+ if wantDev[slot.mnt.DeviceID] {
+ // Already allocated a replica to this
+ // backend device, possibly on a
+ // different server.
+ return false
+ }
if len(protMnt) < desired && slot.repl != nil {
unsafeToDelete[slot.repl.Mtime] = true
protMnt[slot.mnt] = true
slots[i].want = true
wantSrv[slot.mnt.KeepService] = true
wantMnt[slot.mnt] = true
+ if slot.mnt.DeviceID != "" {
+ wantDev[slot.mnt.DeviceID] = true
+ }
}
return len(protMnt) >= desired && len(wantMnt) >= desired
}
}
underreplicated = safe < desired
}
+
+ // Set the unachievable flag if there aren't enough
+ // slots offering the relevant storage class. (This is
+ // as easy as checking slots[desired] because we
+ // already sorted the qualifying slots to the front.)
+ if desired >= len(slots) || !bal.mountsByClass[class][slots[desired].mnt] {
+ cs := classState[class]
+ cs.unachievable = true
+ classState[class] = cs
+ }
+
+ // Avoid deleting wanted replicas from devices that
+ // are mounted on multiple servers -- even if they
+ // haven't already been added to unsafeToDelete
+ // because the servers report different Mtimes.
+ for _, slot := range slots {
+ if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
+ unsafeToDelete[slot.repl.Mtime] = true
+ }
+ }
}
// TODO: If multiple replicas are trashable, prefer the oldest
// replica that doesn't have a timestamp collision with
// others.
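+ // Tally overall have/want for this block, counting each
+ // backend device at most once even if it is mounted on
+ // several servers.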
+ countedDev := map[string]bool{}
var have, want int
for _, slot := range slots {
if slot.want {
want++
}
- if slot.repl != nil {
+ if slot.repl != nil && !countedDev[slot.mnt.DeviceID] {
have++
+ if slot.mnt.DeviceID != "" {
+ countedDev[slot.mnt.DeviceID] = true
+ }
}
}
bal.Dumper.Printf("%s have=%d want=%v %s", blkid, have, want, strings.Join(changes, " "))
}
return balanceResult{
- blk: blk,
- blkid: blkid,
- have: have,
- want: want,
+ blk: blk,
+ blkid: blkid,
+ have: have,
+ want: want,
+ classState: classState,
}
}
}
type balancerStats struct {
- lost, overrep, unref, garbage, underrep, justright blocksNBytes
- desired, current blocksNBytes
- pulls, trashes int
- replHistogram []int
+ lost blocksNBytes
+ overrep blocksNBytes
+ unref blocksNBytes
+ garbage blocksNBytes
+ underrep blocksNBytes
+ unachievable blocksNBytes
+ justright blocksNBytes
+ desired blocksNBytes
+ current blocksNBytes
+ pulls int
+ trashes int
+ replHistogram []int
+ classStats map[string]replicationStats
+}
+
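+// replicationStats tracks the blocks, bytes, and replicas that are
+// desired, surplus, short, or unachievable for one storage class.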
+type replicationStats struct {
+ desired blocksNBytes
+ surplus blocksNBytes
+ short blocksNBytes
+ unachievable blocksNBytes
+}
+
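+// balancedBlockState is the per-class result of balancing a single
+// block: the desired replica count, how far over (positive) or
+// under (negative) that target we are, and whether the target is
+// unachievable with the qualifying mounts available.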
+type balancedBlockState struct {
+ desired int
+ surplus int
+ unachievable bool
}
func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
var s balancerStats
s.replHistogram = make([]int, 2)
+ s.classStats = make(map[string]replicationStats, len(bal.classes))
for result := range results {
surplus := result.have - result.want
bytes := result.blkid.Size()
+
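+ // Fold this block's per-class state into the aggregate
+ // per-class statistics.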
+ for class, state := range result.classState {
+ cs := s.classStats[class]
+ if state.unachievable {
+ cs.unachievable.blocks++
+ cs.unachievable.bytes += bytes
+ }
+ if state.desired > 0 {
+ cs.desired.replicas += state.desired
+ cs.desired.blocks++
+ cs.desired.bytes += bytes * int64(state.desired)
+ }
+ if state.surplus > 0 {
+ cs.surplus.replicas += state.surplus
+ cs.surplus.blocks++
+ cs.surplus.bytes += bytes * int64(state.surplus)
+ } else if state.surplus < 0 {
+ cs.short.replicas += -state.surplus
+ cs.short.blocks++
+ cs.short.bytes += bytes * int64(-state.surplus)
+ }
+ s.classStats[class] = cs
+ }
+
switch {
case result.have == 0 && result.want > 0:
s.lost.replicas -= surplus
bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
+ for _, class := range bal.classes {
+ cs := bal.stats.classStats[class]
+ bal.logf("===")
+ bal.logf("storage class %q: %s desired", class, cs.desired)
+ bal.logf("storage class %q: %s short", class, cs.short)
+ bal.logf("storage class %q: %s surplus", class, cs.surplus)
+ bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
+ }
bal.logf("===")
bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
bal.logf("%s total usage", bal.stats.current)