Merge branch '15781-multi-value-property-search'
[arvados.git] / services / keep-balance / balance.go
index e50b0b505aee471f30918772f66f18ec0838e31d..3c35d304cb395cf97485cd462f0f78ae31b523d6 100644 (file)
@@ -20,8 +20,8 @@ import (
        "syscall"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/keepclient"
        "github.com/sirupsen/logrus"
 )
 
@@ -547,25 +547,32 @@ var changeName = map[int]string{
        changeNone:  "none",
 }
 
+type balancedBlockState struct { // per-block tallies produced by computeBlockState; each distinct DeviceID is counted once
+       needed       int // slots that hold a replica and are wanted
+       unneeded     int // slots that hold a replica but are not wanted
+       pulling      int // wanted slots with no replica yet, counted only when the block has at least one existing replica
+       unachievable bool // true when the total mount Replication of all counted slots is below the requested replication
+}
+
 type balanceResult struct { // outcome of balanceBlock for a single block
        blk        *BlockState // the block state that was passed to balanceBlock
        blkid      arvados.SizedDigest // identifier of the block that was balanced
-       have       int
-       want       int
+       lost       bool // set when a slot wants a replica but the block has no replicas at all
+       blockState balancedBlockState // tallies over all slots, computed with needRepl=0
        classState map[string]balancedBlockState // tallies per storage class, keyed by class name
 }
 
+type slot struct { // one candidate location for a block; balanceBlock builds one slot per mounted volume
+       mnt  *KeepMount // never nil
+       repl *Replica   // replica already stored here (or nil)
+       want bool       // we should pull/leave a replica here
+}
+
 // balanceBlock compares current state to desired state for a single
 // block, and makes the appropriate ChangeSet calls.
 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
        bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
 
-       type slot struct {
-               mnt  *KeepMount // never nil
-               repl *Replica   // replica already stored here (or nil)
-               want bool       // we should pull/leave a replica here
-       }
-
        // Build a list of all slots (one per mounted volume).
        slots := make([]slot, 0, bal.mounts)
        for _, srv := range bal.KeepServices {
@@ -601,26 +608,9 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
        // won't want to trash any replicas.
        underreplicated := false
 
-       classState := make(map[string]balancedBlockState, len(bal.classes))
        unsafeToDelete := make(map[int64]bool, len(slots))
        for _, class := range bal.classes {
                desired := blk.Desired[class]
-
-               countedDev := map[string]bool{}
-               have := 0
-               for _, slot := range slots {
-                       if slot.repl != nil && bal.mountsByClass[class][slot.mnt] && !countedDev[slot.mnt.DeviceID] {
-                               have += slot.mnt.Replication
-                               if slot.mnt.DeviceID != "" {
-                                       countedDev[slot.mnt.DeviceID] = true
-                               }
-                       }
-               }
-               classState[class] = balancedBlockState{
-                       desired: desired,
-                       surplus: have - desired,
-               }
-
                if desired == 0 {
                        continue
                }
@@ -733,16 +723,6 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
                        underreplicated = safe < desired
                }
 
-               // set the unachievable flag if there aren't enough
-               // slots offering the relevant storage class. (This is
-               // as easy as checking slots[desired] because we
-               // already sorted the qualifying slots to the front.)
-               if desired >= len(slots) || !bal.mountsByClass[class][slots[desired].mnt] {
-                       cs := classState[class]
-                       cs.unachievable = true
-                       classState[class] = cs
-               }
-
                // Avoid deleting wanted replicas from devices that
                // are mounted on multiple servers -- even if they
                // haven't already been added to unsafeToDelete
@@ -758,36 +738,40 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
        // replica that doesn't have a timestamp collision with
        // others.
 
-       countedDev := map[string]bool{}
-       var have, want int
-       for _, slot := range slots {
-               if countedDev[slot.mnt.DeviceID] {
-                       continue
-               }
-               if slot.want {
-                       want += slot.mnt.Replication
-               }
-               if slot.repl != nil {
-                       have += slot.mnt.Replication
-               }
-               if slot.mnt.DeviceID != "" {
-                       countedDev[slot.mnt.DeviceID] = true
+       for i, slot := range slots {
+               // Don't trash (1) any replicas of an underreplicated
+               // block, even if they're in the wrong positions, or
+               // (2) any replicas whose Mtimes are identical to
+               // needed replicas (in case we're really seeing the
+               // same copy via different mounts).
+               if slot.repl != nil && (underreplicated || unsafeToDelete[slot.repl.Mtime]) {
+                       slots[i].want = true
                }
        }
 
+       classState := make(map[string]balancedBlockState, len(bal.classes))
+       for _, class := range bal.classes {
+               classState[class] = computeBlockState(slots, bal.mountsByClass[class], len(blk.Replicas), blk.Desired[class])
+       }
+       blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
+
+       var lost bool
        var changes []string
        for _, slot := range slots {
                // TODO: request a Touch if Mtime is duplicated.
                var change int
                switch {
-               case !underreplicated && !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime && !unsafeToDelete[slot.repl.Mtime]:
+               case !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime:
                        slot.mnt.KeepService.AddTrash(Trash{
                                SizedDigest: blkid,
                                Mtime:       slot.repl.Mtime,
                                From:        slot.mnt,
                        })
                        change = changeTrash
-               case len(blk.Replicas) > 0 && slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
+               case slot.repl == nil && slot.want && len(blk.Replicas) == 0:
+                       lost = true
+                       change = changeNone
+               case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
                        slot.mnt.KeepService.AddPull(Pull{
                                SizedDigest: blkid,
                                From:        blk.Replicas[0].KeepMount.KeepService,
@@ -809,17 +793,48 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
                }
        }
        if bal.Dumper != nil {
-               bal.Dumper.Printf("%s refs=%d have=%d want=%v %v %v", blkid, blk.RefCount, have, want, blk.Desired, changes)
+               bal.Dumper.Printf("%s refs=%d needed=%d unneeded=%d pulling=%v %v %v", blkid, blk.RefCount, blockState.needed, blockState.unneeded, blockState.pulling, blk.Desired, changes)
        }
        return balanceResult{
                blk:        blk,
                blkid:      blkid,
-               have:       have,
-               want:       want,
+               lost:       lost,
+               blockState: blockState,
                classState: classState,
        }
 }
 
+func computeBlockState(slots []slot, onlyCount map[*KeepMount]bool, have, needRepl int) (bbs balancedBlockState) { // summarizes slots as needed/unneeded/pulling counts plus an unachievable flag
+       repl := 0 // sum of mnt.Replication over every slot that lands in one of the three cases below
+       countedDev := map[string]bool{} // device IDs already visited, so a device reachable via several mounts is tallied once
+       for _, slot := range slots {
+               if onlyCount != nil && !onlyCount[slot.mnt] { // a nil onlyCount means "consider every mount"
+                       continue
+               }
+               if countedDev[slot.mnt.DeviceID] { // another mount of a device we already visited
+                       continue
+               }
+               switch {
+               case slot.repl != nil && slot.want: // replica present and wanted
+                       bbs.needed++
+                       repl += slot.mnt.Replication
+               case slot.repl != nil && !slot.want: // replica present but not wanted
+                       bbs.unneeded++
+                       repl += slot.mnt.Replication
+               case slot.repl == nil && slot.want && have > 0: // wanted but missing; counted only when have > 0 (balanceBlock passes len(blk.Replicas))
+                       bbs.pulling++
+                       repl += slot.mnt.Replication
+               }
+               if slot.mnt.DeviceID != "" { // mounts with an empty DeviceID are never deduplicated
+                       countedDev[slot.mnt.DeviceID] = true
+               }
+       }
+       if repl < needRepl { // even with existing and pulled replicas, replication falls short of needRepl
+               bbs.unachievable = true
+       }
+       return // naked return of the named result bbs
+}
+
 type blocksNBytes struct {
        replicas int
        blocks   int
@@ -830,6 +845,13 @@ func (bb blocksNBytes) String() string {
        return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
 }
 
+type replicationStats struct { // per-storage-class totals; collectStatistics keeps one per class in balancerStats.classStats
+       needed       blocksNBytes // accumulated from balancedBlockState.needed for blocks where it is > 0
+       unneeded     blocksNBytes // accumulated from balancedBlockState.unneeded for blocks where it is > 0
+       pulling      blocksNBytes // accumulated from balancedBlockState.pulling for blocks where it is > 0
+       unachievable blocksNBytes // one replica, one block and the block's size added per block flagged unachievable
+}
+
 type balancerStats struct {
        lost          blocksNBytes
        overrep       blocksNBytes
@@ -866,25 +888,11 @@ func (s *balancerStats) dedupBlockRatio() float64 {
        return float64(s.collectionBlockRefs) / float64(s.collectionBlocks)
 }
 
-type replicationStats struct {
-       desired      blocksNBytes
-       surplus      blocksNBytes
-       short        blocksNBytes
-       unachievable blocksNBytes
-}
-
-type balancedBlockState struct {
-       desired      int
-       surplus      int
-       unachievable bool
-}
-
 func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
        var s balancerStats
        s.replHistogram = make([]int, 2)
        s.classStats = make(map[string]replicationStats, len(bal.classes))
        for result := range results {
-               surplus := result.have - result.want
                bytes := result.blkid.Size()
 
                if rc := int64(result.blk.RefCount); rc > 0 {
@@ -897,41 +905,51 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
                for class, state := range result.classState {
                        cs := s.classStats[class]
                        if state.unachievable {
+                               cs.unachievable.replicas++
                                cs.unachievable.blocks++
                                cs.unachievable.bytes += bytes
                        }
-                       if state.desired > 0 {
-                               cs.desired.replicas += state.desired
-                               cs.desired.blocks++
-                               cs.desired.bytes += bytes * int64(state.desired)
+                       if state.needed > 0 {
+                               cs.needed.replicas += state.needed
+                               cs.needed.blocks++
+                               cs.needed.bytes += bytes * int64(state.needed)
                        }
-                       if state.surplus > 0 {
-                               cs.surplus.replicas += state.surplus
-                               cs.surplus.blocks++
-                               cs.surplus.bytes += bytes * int64(state.surplus)
-                       } else if state.surplus < 0 {
-                               cs.short.replicas += -state.surplus
-                               cs.short.blocks++
-                               cs.short.bytes += bytes * int64(-state.surplus)
+                       if state.unneeded > 0 {
+                               cs.unneeded.replicas += state.unneeded
+                               cs.unneeded.blocks++
+                               cs.unneeded.bytes += bytes * int64(state.unneeded)
+                       }
+                       if state.pulling > 0 {
+                               cs.pulling.replicas += state.pulling
+                               cs.pulling.blocks++
+                               cs.pulling.bytes += bytes * int64(state.pulling)
                        }
                        s.classStats[class] = cs
                }
 
+               bs := result.blockState
                switch {
-               case result.have == 0 && result.want > 0:
-                       s.lost.replicas -= surplus
+               case result.lost:
+                       s.lost.replicas++
                        s.lost.blocks++
-                       s.lost.bytes += bytes * int64(-surplus)
+                       s.lost.bytes += bytes
                        fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
                        for pdh := range result.blk.Refs {
                                fmt.Fprintf(bal.lostBlocks, " %s", pdh)
                        }
                        fmt.Fprint(bal.lostBlocks, "\n")
-               case surplus < 0:
-                       s.underrep.replicas -= surplus
+               case bs.pulling > 0:
+                       s.underrep.replicas += bs.pulling
+                       s.underrep.blocks++
+                       s.underrep.bytes += bytes * int64(bs.pulling)
+               case bs.unachievable:
+                       s.underrep.replicas++
                        s.underrep.blocks++
-                       s.underrep.bytes += bytes * int64(-surplus)
-               case surplus > 0 && result.want == 0:
+                       s.underrep.bytes += bytes
+               case bs.unneeded > 0 && bs.needed == 0:
+                       // Count as "garbage" if all replicas are old
+                       // enough to trash, otherwise count as
+                       // "unref".
                        counter := &s.garbage
                        for _, r := range result.blk.Replicas {
                                if r.Mtime >= bal.MinMtime {
@@ -939,34 +957,34 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
                                        break
                                }
                        }
-                       counter.replicas += surplus
+                       counter.replicas += bs.unneeded
                        counter.blocks++
-                       counter.bytes += bytes * int64(surplus)
-               case surplus > 0:
-                       s.overrep.replicas += surplus
+                       counter.bytes += bytes * int64(bs.unneeded)
+               case bs.unneeded > 0:
+                       s.overrep.replicas += bs.unneeded
                        s.overrep.blocks++
-                       s.overrep.bytes += bytes * int64(result.have-result.want)
+                       s.overrep.bytes += bytes * int64(bs.unneeded)
                default:
-                       s.justright.replicas += result.want
+                       s.justright.replicas += bs.needed
                        s.justright.blocks++
-                       s.justright.bytes += bytes * int64(result.want)
+                       s.justright.bytes += bytes * int64(bs.needed)
                }
 
-               if result.want > 0 {
-                       s.desired.replicas += result.want
+               if bs.needed > 0 {
+                       s.desired.replicas += bs.needed
                        s.desired.blocks++
-                       s.desired.bytes += bytes * int64(result.want)
+                       s.desired.bytes += bytes * int64(bs.needed)
                }
-               if result.have > 0 {
-                       s.current.replicas += result.have
+               if bs.needed+bs.unneeded > 0 {
+                       s.current.replicas += bs.needed + bs.unneeded
                        s.current.blocks++
-                       s.current.bytes += bytes * int64(result.have)
+                       s.current.bytes += bytes * int64(bs.needed+bs.unneeded)
                }
 
-               for len(s.replHistogram) <= result.have {
+               for len(s.replHistogram) <= bs.needed+bs.unneeded {
                        s.replHistogram = append(s.replHistogram, 0)
                }
-               s.replHistogram[result.have]++
+               s.replHistogram[bs.needed+bs.unneeded]++
        }
        for _, srv := range bal.KeepServices {
                s.pulls += len(srv.ChangeSet.Pulls)
@@ -990,9 +1008,9 @@ func (bal *Balancer) PrintStatistics() {
        for _, class := range bal.classes {
                cs := bal.stats.classStats[class]
                bal.logf("===")
-               bal.logf("storage class %q: %s desired", class, cs.desired)
-               bal.logf("storage class %q: %s short", class, cs.short)
-               bal.logf("storage class %q: %s surplus", class, cs.surplus)
+               bal.logf("storage class %q: %s needed", class, cs.needed)
+               bal.logf("storage class %q: %s unneeded", class, cs.unneeded)
+               bal.logf("storage class %q: %s pulling", class, cs.pulling)
                bal.logf("storage class %q: %s unachievable", class, cs.unachievable)
        }
        bal.logf("===")
@@ -1008,7 +1026,7 @@ func (bal *Balancer) PrintStatistics() {
 }
 
 func (bal *Balancer) printHistogram(hashColumns int) {
-       bal.logf("Replication level distribution (counting N replicas on a single server as N):")
+       bal.logf("Replication level distribution:")
        maxCount := 0
        for _, count := range bal.stats.replHistogram {
                if maxCount < count {