return
}
}
+ bal.cleanupMounts()
if err = bal.CheckSanityEarly(&config.Client); err != nil {
return
})
}
+// cleanupMounts tidies the mount list of every service in
+// bal.KeepServices before balancing starts. It does two things, in
+// this order:
+//
+// 1. Drops each read-only mount whose DeviceID is also mounted
+// read-write by some service, logging every dropped mount via
+// bal.logf.
+//
+// 2. For every mount that remains, replaces a Replication value
+// that is zero or negative with 1, logging the adjustment.
+//
+// Mounts with an empty DeviceID are never recorded as read-write
+// devices, so a read-only mount with an empty DeviceID is always
+// kept.
+func (bal *Balancer) cleanupMounts() {
+	// Map each non-empty device ID to a service that mounts it
+	// read-write. If several services do, the last one visited
+	// wins; the value is only used for the nil check and the log
+	// message below.
+	writableOn := make(map[string]*KeepService)
+	for _, svc := range bal.KeepServices {
+		for _, m := range svc.mounts {
+			if m.ReadOnly || m.DeviceID == "" {
+				continue
+			}
+			writableOn[m.DeviceID] = svc
+		}
+	}
+	// Rebuild each service's mount list, leaving out the
+	// read-only mounts whose device is writable somewhere.
+	for _, svc := range bal.KeepServices {
+		var kept []*KeepMount
+		for _, m := range svc.mounts {
+			rwSvc := writableOn[m.DeviceID]
+			if !m.ReadOnly || rwSvc == nil {
+				kept = append(kept, m)
+				continue
+			}
+			bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", svc, m.UUID, m.DeviceID, rwSvc)
+		}
+		svc.mounts = kept
+	}
+	// Normalize replication on the surviving mounts only, so no
+	// message is emitted for a mount that was just dropped.
+	for _, svc := range bal.KeepServices {
+		for _, m := range svc.mounts {
+			if m.Replication > 0 {
+				continue
+			}
+			// NOTE(review): this message goes through the
+			// standard log package, not bal.logf like the
+			// one above -- confirm that is intentional.
+			log.Printf("%s: mount %s reports replication=%d, using replication=1", svc, m.UUID, m.Replication)
+			m.Replication = 1
+		}
+	}
+}
+
// CheckSanityEarly checks for configuration and runtime errors that
// can be detected before GetCurrentState() and ComputeChangeSets()
// are called.
errs := make(chan error, 2+len(bal.KeepServices))
wg := sync.WaitGroup{}
- // Start one goroutine for each KeepService: retrieve the
- // index, and add the returned blocks to BlockStateMap.
+ // When a device is mounted more than once, we will get its
+ // index only once, and call AddReplicas on all of the mounts.
+ // equivMount keys are the mounts that will be indexed, and
+ // each value is a list of mounts to apply the received index
+ // to.
+ equivMount := map[*KeepMount][]*KeepMount{}
+ // deviceMount maps each device ID to the one mount that will
+ // be indexed for that device.
+ deviceMount := map[string]*KeepMount{}
for _, srv := range bal.KeepServices {
+ for _, mnt := range srv.mounts {
+ equiv := deviceMount[mnt.DeviceID]
+ if equiv == nil {
+ equiv = mnt
+ if mnt.DeviceID != "" {
+ deviceMount[mnt.DeviceID] = equiv
+ }
+ }
+ equivMount[equiv] = append(equivMount[equiv], mnt)
+ }
+ }
+
+ // Start one goroutine for each (non-redundant) mount:
+ // retrieve the index, and add the returned blocks to
+ // BlockStateMap.
+ for _, mounts := range equivMount {
wg.Add(1)
- go func(srv *KeepService) {
+ go func(mounts []*KeepMount) {
defer wg.Done()
- bal.logf("%s: retrieve indexes", srv)
- for _, mount := range srv.mounts {
- bal.logf("%s: retrieve index", mount)
- idx, err := srv.IndexMount(c, mount.UUID, "")
- if err != nil {
- errs <- fmt.Errorf("%s: retrieve index: %v", mount, err)
- return
- }
- if len(errs) > 0 {
- // Some other goroutine encountered an
- // error -- any further effort here
- // will be wasted.
- return
- }
+ bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
+ idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
+ if err != nil {
+ errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err)
+ return
+ }
+ if len(errs) > 0 {
+ // Some other goroutine encountered an
+ // error -- any further effort here
+ // will be wasted.
+ return
+ }
+ for _, mount := range mounts {
bal.logf("%s: add %d replicas to map", mount, len(idx))
bal.BlockStateMap.AddReplicas(mount, idx)
- bal.logf("%s: done", mount)
+ bal.logf("%s: added %d replicas", mount, len(idx))
}
- bal.logf("%s: done", srv)
- }(srv)
+ bal.logf("mount %s: index done", mounts[0])
+ }(mounts)
}
// collQ buffers incoming collections so we can start fetching
blkid arvados.SizedDigest
blk *BlockState
}
- nWorkers := 1 + runtime.NumCPU()
- todo := make(chan balanceTask, nWorkers)
- results := make(chan balanceResult, 16)
- var wg sync.WaitGroup
- for i := 0; i < nWorkers; i++ {
- wg.Add(1)
- go func() {
- for work := range todo {
- results <- bal.balanceBlock(work.blkid, work.blk)
+ workers := runtime.GOMAXPROCS(-1)
+ todo := make(chan balanceTask, workers)
+ go func() {
+ bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
+ todo <- balanceTask{
+ blkid: blkid,
+ blk: blk,
}
- wg.Done()
- }()
- }
- bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
- todo <- balanceTask{
- blkid: blkid,
- blk: blk,
- }
- })
- close(todo)
+ })
+ close(todo)
+ }()
+ results := make(chan balanceResult, workers)
go func() {
+ var wg sync.WaitGroup
+ for i := 0; i < workers; i++ {
+ wg.Add(1)
+ go func() {
+ for work := range todo {
+ results <- bal.balanceBlock(work.blkid, work.blk)
+ }
+ wg.Done()
+ }()
+ }
wg.Wait()
close(results)
}()
for _, class := range bal.classes {
desired := blk.Desired[class]
+ countedDev := map[string]bool{}
have := 0
for _, slot := range slots {
- if slot.repl != nil && bal.mountsByClass[class][slot.mnt] {
- have++
+ if slot.repl != nil && bal.mountsByClass[class][slot.mnt] && !countedDev[slot.mnt.DeviceID] {
+ have += slot.mnt.Replication
+ if slot.mnt.DeviceID != "" {
+ countedDev[slot.mnt.DeviceID] = true
+ }
}
}
classState[class] = balancedBlockState{
}
})
- // Servers and mounts (with or without existing
+ // Servers/mounts/devices (with or without existing
// replicas) that are part of the best achievable
// layout for this storage class.
wantSrv := map[*KeepService]bool{}
wantMnt := map[*KeepMount]bool{}
+ wantDev := map[string]bool{}
// Positions (with existing replicas) that have been
// protected (via unsafeToDelete) to ensure we don't
// reduce replication below desired level when
// trashing replicas that aren't optimal positions for
// any storage class.
protMnt := map[*KeepMount]bool{}
+ // Replication planned so far (corresponds to wantMnt).
+ replWant := 0
+ // Protected replication (corresponds to protMnt).
+ replProt := 0
// trySlot tries using a slot to meet requirements,
// and returns true if all requirements are met.
trySlot := func(i int) bool {
slot := slots[i]
- if len(protMnt) < desired && slot.repl != nil {
+ if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
+ // Already allocated a replica to this
+ // backend device, possibly on a
+ // different server.
+ return false
+ }
+ if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
unsafeToDelete[slot.repl.Mtime] = true
protMnt[slot.mnt] = true
+ replProt += slot.mnt.Replication
}
- if len(wantMnt) < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
+ if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
slots[i].want = true
wantSrv[slot.mnt.KeepService] = true
wantMnt[slot.mnt] = true
+ if slot.mnt.DeviceID != "" {
+ wantDev[slot.mnt.DeviceID] = true
+ }
+ replWant += slot.mnt.Replication
}
- return len(protMnt) >= desired && len(wantMnt) >= desired
+ return replProt >= desired && replWant >= desired
}
// First try to achieve desired replication without
if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
continue
}
- if safe++; safe >= desired {
+ if safe += slot.mnt.Replication; safe >= desired {
break
}
}
cs.unachievable = true
classState[class] = cs
}
+
+ // Avoid deleting wanted replicas from devices that
+ // are mounted on multiple servers -- even if they
+ // haven't already been added to unsafeToDelete
+ // because the servers report different Mtimes.
+ for _, slot := range slots {
+ if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
+ unsafeToDelete[slot.repl.Mtime] = true
+ }
+ }
}
// TODO: If multiple replicas are trashable, prefer the oldest
// replica that doesn't have a timestamp collision with
// others.
+ countedDev := map[string]bool{}
var have, want int
for _, slot := range slots {
+ if countedDev[slot.mnt.DeviceID] {
+ continue
+ }
if slot.want {
- want++
+ want += slot.mnt.Replication
}
if slot.repl != nil {
- have++
+ have += slot.mnt.Replication
+ }
+ if slot.mnt.DeviceID != "" {
+ countedDev[slot.mnt.DeviceID] = true
}
}
case surplus > 0:
s.overrep.replicas += surplus
s.overrep.blocks++
- s.overrep.bytes += bytes * int64(len(result.blk.Replicas)-result.want)
+ s.overrep.bytes += bytes * int64(result.have-result.want)
default:
s.justright.replicas += result.want
s.justright.blocks++
s.desired.blocks++
s.desired.bytes += bytes * int64(result.want)
}
- if len(result.blk.Replicas) > 0 {
- s.current.replicas += len(result.blk.Replicas)
+ if result.have > 0 {
+ s.current.replicas += result.have
s.current.blocks++
- s.current.bytes += bytes * int64(len(result.blk.Replicas))
+ s.current.bytes += bytes * int64(result.have)
}
- for len(s.replHistogram) <= len(result.blk.Replicas) {
+ for len(s.replHistogram) <= result.have {
s.replHistogram = append(s.replHistogram, 0)
}
- s.replHistogram[len(result.blk.Replicas)]++
+ s.replHistogram[result.have]++
}
for _, srv := range bal.KeepServices {
s.pulls += len(srv.ChangeSet.Pulls)