timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
}
wp.registerMetrics(reg)
- go wp.run()
+ go func() {
+ wp.setupOnce.Do(wp.setup)
+ go wp.runMetrics()
+ go wp.runProbes()
+ go wp.runSync()
+ }()
return wp
}
reg.MustRegister(wp.mMemoryInuse)
}
+func (wp *Pool) runMetrics() { // runMetrics calls updateMetrics once for every value received on a Subscribe channel.
+ ch := wp.Subscribe()
+ defer wp.Unsubscribe(ch) // drop the subscription when this function returns
+ for range ch { // the loop ends only if ch is closed
+ wp.updateMetrics()
+ }
+}
+
func (wp *Pool) updateMetrics() {
wp.mtx.RLock()
defer wp.mtx.RUnlock()
wp.mMemoryInuse.Set(float64(memInuse))
}
-func (wp *Pool) run() {
- wp.setupOnce.Do(wp.setup)
+func (wp *Pool) runProbes() { // runProbes walks the worker set on every probeInterval tick and starts probes, paced by a second ticker; it returns when wp.stop is readable.
+ maxPPS := wp.maxProbesPerSecond
+ if maxPPS < 1 { // fall back to the default so the division below never sees a zero or negative rate
+ maxPPS = defaultMaxProbesPerSecond
+ }
+ limitticker := time.NewTicker(time.Second / time.Duration(maxPPS)) // fires maxPPS times per second; one tick is awaited after each worker
+ defer limitticker.Stop()
- go func() {
- ch := wp.Subscribe()
- defer wp.Unsubscribe(ch)
- for range ch {
- wp.updateMetrics()
- }
- }()
+ probeticker := time.NewTicker(wp.probeInterval) // each tick starts one pass over the workers
+ defer probeticker.Stop()
- go func() {
- maxPPS := wp.maxProbesPerSecond
- if maxPPS < 1 {
- maxPPS = defaultMaxProbesPerSecond
+ workers := []cloud.InstanceID{} // scratch slice, truncated and refilled on every pass
+ for range probeticker.C { // NOTE(review): wp.stop is only checked in the per-worker loop below, so with no eligible workers this loop appears to keep running after stop — confirm
+ workers = workers[:0]
+ wp.mtx.Lock()
+ for id, wkr := range wp.workers {
+ if wkr.state == StateShutdown || wp.autoShutdown(wkr) { // skip workers already shut down, or for which autoShutdown returns true
+ continue
+ }
+ workers = append(workers, id)
}
- limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
- defer limitticker.Stop()
-
- probeticker := time.NewTicker(wp.probeInterval)
- defer probeticker.Stop()
+ wp.mtx.Unlock()
- workers := []cloud.InstanceID{}
- for range probeticker.C {
- workers = workers[:0]
+ for _, id := range workers { // look each ID up again under the lock; the map may have changed since the snapshot was taken
wp.mtx.Lock()
- for id, wkr := range wp.workers {
- if wkr.state == StateShutdown || wp.autoShutdown(wkr) {
- continue
- }
- workers = append(workers, id)
- }
+ wkr, ok := wp.workers[id]
wp.mtx.Unlock()
-
- for _, id := range workers {
- wp.mtx.Lock()
- wkr, ok := wp.workers[id]
- wp.mtx.Unlock()
- if !ok || wkr.state == StateShutdown {
- // Deleted/shutdown while we
- // were probing others
- continue
- }
- select {
- case wkr.probing <- struct{}{}:
- go func() {
- wp.probeAndUpdate(wkr)
- <-wkr.probing
- }()
- default:
- wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
- }
- select {
- case <-wp.stop:
- return
- case <-limitticker.C:
- }
+ if !ok || wkr.state == StateShutdown { // NOTE(review): wkr.state is read here after wp.mtx was released — confirm this is safe
+ // Deleted/shutdown while we
+ // were probing others
+ continue
+ }
+ select {
+ case wkr.probing <- struct{}{}: // send succeeded: run the probe in its own goroutine
+ go func() {
+ wp.probeAndUpdate(wkr)
+ <-wkr.probing // take the token back out once the probe has returned
+ }()
+ default: // send would block: do not start another probe for this worker on this pass
+ wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
+ }
+ select { // wait for the next rate-limit tick, or return if wp.stop becomes readable first
+ case <-wp.stop:
+ return
+ case <-limitticker.C:
}
}
- }()
+ }
+}
+func (wp *Pool) runSync() {
// sync once immediately, then wait syncInterval, sync again,
// etc.
timer := time.NewTimer(1)