MinMtime int64
classes []string
+ mounts int
mountsByClass map[string]map[*KeepMount]bool
collScanned int
serviceRoots map[string]string
// This just calls balanceBlock() once for each block, using a
// pool of worker goroutines.
defer timeMe(bal.Logger, "ComputeChangeSets")()
- bal.setupCaches()
+ bal.setupLookupTables()
type balanceTask struct {
blkid arvados.SizedDigest
blk *BlockState
}
- nWorkers := 1 + runtime.NumCPU()
- todo := make(chan balanceTask, nWorkers)
- results := make(chan balanceResult, 16)
- var wg sync.WaitGroup
- for i := 0; i < nWorkers; i++ {
- wg.Add(1)
- go func() {
- for work := range todo {
- results <- bal.balanceBlock(work.blkid, work.blk)
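+ // GOMAXPROCS(-1) reports the current setting without
+ // changing it, giving one worker per schedulable CPU thread.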
+ workers := runtime.GOMAXPROCS(-1)
+ todo := make(chan balanceTask, workers)
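+ // Producer: send one balanceTask per block in the
+ // BlockStateMap, then close todo so the workers exit.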
+ go func() {
+ bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
+ todo <- balanceTask{
+ blkid: blkid,
+ blk: blk,
}
- wg.Done()
- }()
- }
- bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
- todo <- balanceTask{
- blkid: blkid,
- blk: blk,
- }
- })
- close(todo)
+ })
+ close(todo)
+ }()
+ results := make(chan balanceResult, workers)
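+ // Consumers: a pool of workers drains todo, writing each
+ // block's balanceResult to results. results is closed once
+ // every worker is done, so collectStatistics below can
+ // range over it to completion.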
go func() {
+ var wg sync.WaitGroup
+ for i := 0; i < workers; i++ {
+ wg.Add(1)
+ go func() {
+ for work := range todo {
+ results <- bal.balanceBlock(work.blkid, work.blk)
+ }
+ wg.Done()
+ }()
+ }
wg.Wait()
close(results)
}()
bal.collectStatistics(results)
}
-func (bal *Balancer) setupCaches() {
+func (bal *Balancer) setupLookupTables() {
bal.serviceRoots = make(map[string]string)
bal.classes = []string{"default"}
bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
+ bal.mounts = 0
for _, srv := range bal.KeepServices {
bal.serviceRoots[srv.UUID] = srv.UUID
for _, mnt := range srv.mounts {
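+ // Keep a running tally of mounts so balanceBlock can
+ // preallocate its slots slice.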
+ bal.mounts++
+
// All mounts on a read-only service are
// effectively read-only.
mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
}
type balanceResult struct {
- blk *BlockState
- blkid arvados.SizedDigest
- have int
- want int
+ blk *BlockState
+ blkid arvados.SizedDigest
+ have int
+ want int
+ classState map[string]balancedBlockState
}
// balanceBlock compares current state to desired state for a single
}
// Build a list of all slots (one per mounted volume).
- var slots []slot
+ slots := make([]slot, 0, bal.mounts)
for _, srv := range bal.KeepServices {
for _, mnt := range srv.mounts {
var repl *Replica
// won't want to trash any replicas.
underreplicated := false
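+ // Track per-class desired/surplus counts for this block;
+ // collectStatistics aggregates them across all blocks.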
+ classState := make(map[string]balancedBlockState, len(bal.classes))
unsafeToDelete := make(map[int64]bool, len(slots))
for _, class := range bal.classes {
desired := blk.Desired[class]
+
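+ // Count the existing replicas that live on mounts
+ // offering this storage class.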
+ have := 0
+ for _, slot := range slots {
+ if slot.repl != nil && bal.mountsByClass[class][slot.mnt] {
+ have++
+ }
+ }
+ classState[class] = balancedBlockState{
+ desired: desired,
+ surplus: have - desired,
+ }
+
if desired == 0 {
continue
}
+
// Sort the slots by desirability.
sort.Slice(slots, func(i, j int) bool {
si, sj := slots[i], slots[j]
}
underreplicated = safe < desired
}
+
+ // Set the unachievable flag if there aren't enough
+ // slots offering the relevant storage class. (This is
+ // as easy as checking slots[desired-1] because we
+ // already sorted the qualifying slots to the front.)
+ if desired > len(slots) || !bal.mountsByClass[class][slots[desired-1].mnt] {
+ cs := classState[class]
+ cs.unachievable = true
+ classState[class] = cs
+ }
}
// TODO: If multiple replicas are trashable, prefer the oldest
bal.Dumper.Printf("%s have=%d want=%v %s", blkid, have, want, strings.Join(changes, " "))
}
return balanceResult{
- blk: blk,
- blkid: blkid,
- have: have,
- want: want,
+ blk: blk,
+ blkid: blkid,
+ have: have,
+ want: want,
+ classState: classState,
}
}
}
type balancerStats struct {
- lost, overrep, unref, garbage, underrep, justright blocksNBytes
- desired, current blocksNBytes
- pulls, trashes int
- replHistogram []int
+ lost blocksNBytes
+ overrep blocksNBytes
+ unref blocksNBytes
+ garbage blocksNBytes
+ underrep blocksNBytes
+ unachievable blocksNBytes
+ justright blocksNBytes
+ desired blocksNBytes
+ current blocksNBytes
+ pulls int
+ trashes int
+ replHistogram []int
+ classStats map[string]replicationStats
+}
+
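+// replicationStats counts desired, surplus, short, and unachievable
+// replication for a single storage class, summed over all blocks.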
+type replicationStats struct {
+ desired blocksNBytes
+ surplus blocksNBytes
+ short blocksNBytes
+ unachievable blocksNBytes
+}
+
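+// balancedBlockState records a single block's replication status
+// with respect to one storage class.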
+type balancedBlockState struct {
+ desired int
+ surplus int
+ unachievable bool
}
func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
var s balancerStats
s.replHistogram = make([]int, 2)
+ s.classStats = make(map[string]replicationStats, len(bal.classes))
for result := range results {
surplus := result.have - result.want
bytes := result.blkid.Size()
+
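+ // Roll this block's per-class state into the running
+ // totals for each storage class.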
+ for class, state := range result.classState {
+ cs := s.classStats[class]
+ if state.unachievable {
+ cs.unachievable.blocks++
+ cs.unachievable.bytes += bytes
+ }
+ if state.desired > 0 {
+ cs.desired.replicas += state.desired
+ cs.desired.blocks++
+ cs.desired.bytes += bytes * int64(state.desired)
+ }
+ if state.surplus > 0 {
+ cs.surplus.replicas += state.surplus
+ cs.surplus.blocks++
+ cs.surplus.bytes += bytes * int64(state.surplus)
+ } else if state.surplus < 0 {
+ cs.short.replicas += -state.surplus
+ cs.short.blocks++
+ cs.short.bytes += bytes * int64(-state.surplus)
+ }
+ s.classStats[class] = cs
+ }
+
switch {
case result.have == 0 && result.want > 0:
s.lost.replicas -= surplus
bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
+ for _, class := range bal.classes {
+ cs := bal.stats.classStats[class]
+ bal.logf("===")
+ bal.logf("storage class %q: %s desired", class, cs.desired)
+ bal.logf("storage class %q: %s short", class, cs.short)
+ bal.logf("storage class %q: %s surplus", class, cs.surplus)
+ bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
+ }
bal.logf("===")
bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
bal.logf("%s total usage", bal.stats.current)