Merge branch '13513-balance-deadlock'
[arvados.git] / services / keep-balance / balance.go
index 5359bc1eaf675498fee05fe409d2504284de4dea..d6a2dde9d74005c0e9fca4d87ccbc7dd1ece5243 100644 (file)
@@ -365,27 +365,29 @@ func (bal *Balancer) ComputeChangeSets() {
                blkid arvados.SizedDigest
                blk   *BlockState
        }
-       nWorkers := 1 + runtime.NumCPU()
-       todo := make(chan balanceTask, nWorkers)
-       results := make(chan balanceResult, 16)
-       var wg sync.WaitGroup
-       for i := 0; i < nWorkers; i++ {
-               wg.Add(1)
-               go func() {
-                       for work := range todo {
-                               results <- bal.balanceBlock(work.blkid, work.blk)
+       workers := runtime.GOMAXPROCS(-1)
+       todo := make(chan balanceTask, workers)
+       go func() {
+               bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
+                       todo <- balanceTask{
+                               blkid: blkid,
+                               blk:   blk,
                        }
-                       wg.Done()
-               }()
-       }
-       bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
-               todo <- balanceTask{
-                       blkid: blkid,
-                       blk:   blk,
-               }
-       })
-       close(todo)
+               })
+               close(todo)
+       }()
+       results := make(chan balanceResult, workers)
        go func() {
+               var wg sync.WaitGroup
+               for i := 0; i < workers; i++ {
+                       wg.Add(1)
+                       go func() {
+                               for work := range todo {
+                                       results <- bal.balanceBlock(work.blkid, work.blk)
+                               }
+                               wg.Done()
+                       }()
+               }
                wg.Wait()
                close(results)
        }()
@@ -443,10 +445,11 @@ var changeName = map[int]string{
 }
 
 type balanceResult struct {
-       blk   *BlockState
-       blkid arvados.SizedDigest
-       have  int
-       want  int
+       blk        *BlockState
+       blkid      arvados.SizedDigest
+       have       int
+       want       int
+       classState map[string]balancedBlockState
 }
 
 // balanceBlock compares current state to desired state for a single
@@ -495,12 +498,26 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
        // won't want to trash any replicas.
        underreplicated := false
 
+       classState := make(map[string]balancedBlockState, len(bal.classes))
        unsafeToDelete := make(map[int64]bool, len(slots))
        for _, class := range bal.classes {
                desired := blk.Desired[class]
+
+               have := 0
+               for _, slot := range slots {
+                       if slot.repl != nil && bal.mountsByClass[class][slot.mnt] {
+                               have++
+                       }
+               }
+               classState[class] = balancedBlockState{
+                       desired: desired,
+                       surplus: have - desired,
+               }
+
                if desired == 0 {
                        continue
                }
+
                // Sort the slots by desirability.
                sort.Slice(slots, func(i, j int) bool {
                        si, sj := slots[i], slots[j]
@@ -592,6 +609,16 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
                        }
                        underreplicated = safe < desired
                }
+
+               // set the unachievable flag if there aren't enough
+               // slots offering the relevant storage class. (This is
+               // as easy as checking slots[desired] because we
+               // already sorted the qualifying slots to the front.)
+               if desired >= len(slots) || !bal.mountsByClass[class][slots[desired].mnt] {
+                       cs := classState[class]
+                       cs.unachievable = true
+                       classState[class] = cs
+               }
        }
 
        // TODO: If multiple replicas are trashable, prefer the oldest
@@ -645,10 +672,11 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
                bal.Dumper.Printf("%s have=%d want=%v %s", blkid, have, want, strings.Join(changes, " "))
        }
        return balanceResult{
-               blk:   blk,
-               blkid: blkid,
-               have:  have,
-               want:  want,
+               blk:        blk,
+               blkid:      blkid,
+               have:       have,
+               want:       want,
+               classState: classState,
        }
 }
 
@@ -663,18 +691,65 @@ func (bb blocksNBytes) String() string {
 }
 
 type balancerStats struct {
-       lost, overrep, unref, garbage, underrep, justright blocksNBytes
-       desired, current                                   blocksNBytes
-       pulls, trashes                                     int
-       replHistogram                                      []int
+       lost          blocksNBytes
+       overrep       blocksNBytes
+       unref         blocksNBytes
+       garbage       blocksNBytes
+       underrep      blocksNBytes
+       unachievable  blocksNBytes
+       justright     blocksNBytes
+       desired       blocksNBytes
+       current       blocksNBytes
+       pulls         int
+       trashes       int
+       replHistogram []int
+       classStats    map[string]replicationStats
+}
+
+type replicationStats struct {
+       desired      blocksNBytes
+       surplus      blocksNBytes
+       short        blocksNBytes
+       unachievable blocksNBytes
+}
+
+type balancedBlockState struct {
+       desired      int
+       surplus      int
+       unachievable bool
 }
 
 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
        var s balancerStats
        s.replHistogram = make([]int, 2)
+       s.classStats = make(map[string]replicationStats, len(bal.classes))
        for result := range results {
                surplus := result.have - result.want
                bytes := result.blkid.Size()
+
+               for class, state := range result.classState {
+                       cs := s.classStats[class]
+                       if state.unachievable {
+                               cs.unachievable.blocks++
+                               cs.unachievable.bytes += bytes
+                       }
+                       if state.desired > 0 {
+                               cs.desired.replicas += state.desired
+                               cs.desired.blocks++
+                               cs.desired.bytes += bytes * int64(state.desired)
+                       }
+                       if state.surplus > 0 {
+                               cs.surplus.replicas += state.surplus
+                               cs.surplus.blocks++
+                               cs.surplus.bytes += bytes * int64(state.surplus)
+                       } else if state.surplus < 0 {
+                               cs.short.replicas += -state.surplus
+                               cs.short.blocks++
+                               cs.short.bytes += bytes * int64(-state.surplus)
+                       }
+                       s.classStats[class] = cs
+               }
+
                switch {
                case result.have == 0 && result.want > 0:
                        s.lost.replicas -= surplus
@@ -739,6 +814,14 @@ func (bal *Balancer) PrintStatistics() {
        bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
        bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
        bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
+       for _, class := range bal.classes {
+               cs := bal.stats.classStats[class]
+               bal.logf("===")
+               bal.logf("storage class %q: %s desired", class, cs.desired)
+               bal.logf("storage class %q: %s short", class, cs.short)
+               bal.logf("storage class %q: %s surplus", class, cs.surplus)
+               bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
+       }
        bal.logf("===")
        bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
        bal.logf("%s total usage", bal.stats.current)