"github.com/prometheus/client_golang/prometheus"
)
+const (
+ tagKeyInstanceType = "InstanceType"
+ tagKeyHold = "Hold"
+)
+
// A View shows a worker's current state and recent activity.
type View struct {
Instance string
timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
}
wp.registerMetrics(reg)
- go wp.run()
+ go func() {
+ wp.setupOnce.Do(wp.setup)
+ go wp.runMetrics()
+ go wp.runProbes()
+ go wp.runSync()
+ }()
return wp
}
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
defer wp.mtx.Unlock()
- tags := cloud.InstanceTags{"InstanceType": it.Name}
+ tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
wp.creating[it]++
go func() {
inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
}
return
}
- if initialState == StateUnknown && inst.Tags()["hold"] != "" {
+ if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
initialState = StateHold
}
wp.logger.WithFields(logrus.Fields{
"Instance": inst,
"State": initialState,
}).Infof("instance appeared in cloud")
+ now := time.Now()
wp.workers[id] = &worker{
executor: wp.newExecutor(inst),
state: initialState,
instance: inst,
instType: it,
- probed: time.Now(),
- busy: time.Now(),
- updated: time.Now(),
- unallocated: time.Now(),
+ probed: now,
+ busy: now,
+ updated: now,
+ unallocated: now,
running: make(map[string]struct{}),
starting: make(map[string]struct{}),
probing: make(chan struct{}, 1),
reg.MustRegister(wp.mMemoryInuse)
}
+func (wp *Pool) runMetrics() {
+ ch := wp.Subscribe()
+ defer wp.Unsubscribe(ch)
+ for range ch {
+ wp.updateMetrics()
+ }
+}
+
func (wp *Pool) updateMetrics() {
wp.mtx.RLock()
defer wp.mtx.RUnlock()
wp.mMemoryInuse.Set(float64(memInuse))
}
-func (wp *Pool) run() {
- wp.setupOnce.Do(wp.setup)
+func (wp *Pool) runProbes() {
+ maxPPS := wp.maxProbesPerSecond
+ if maxPPS < 1 {
+ maxPPS = defaultMaxProbesPerSecond
+ }
+ limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
+ defer limitticker.Stop()
- go func() {
- ch := wp.Subscribe()
- defer wp.Unsubscribe(ch)
- for range ch {
- wp.updateMetrics()
- }
- }()
+ probeticker := time.NewTicker(wp.probeInterval)
+ defer probeticker.Stop()
- go func() {
- maxPPS := wp.maxProbesPerSecond
- if maxPPS < 1 {
- maxPPS = defaultMaxProbesPerSecond
+ workers := []cloud.InstanceID{}
+ for range probeticker.C {
+ workers = workers[:0]
+ wp.mtx.Lock()
+ for id, wkr := range wp.workers {
+ if wkr.state == StateShutdown || wp.autoShutdown(wkr) {
+ continue
+ }
+ workers = append(workers, id)
}
- limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
- defer limitticker.Stop()
-
- probeticker := time.NewTicker(wp.probeInterval)
- defer probeticker.Stop()
+ wp.mtx.Unlock()
- workers := []cloud.InstanceID{}
- for range probeticker.C {
- workers = workers[:0]
+ for _, id := range workers {
wp.mtx.Lock()
- for id, wkr := range wp.workers {
- if wkr.state == StateShutdown || wp.autoShutdown(wkr) {
- continue
- }
- workers = append(workers, id)
- }
+ wkr, ok := wp.workers[id]
wp.mtx.Unlock()
-
- for _, id := range workers {
- wp.mtx.Lock()
- wkr, ok := wp.workers[id]
- wp.mtx.Unlock()
- if !ok || wkr.state == StateShutdown {
- // Deleted/shutdown while we
- // were probing others
- continue
- }
- select {
- case wkr.probing <- struct{}{}:
- go func() {
- wp.probeAndUpdate(wkr)
- <-wkr.probing
- }()
- default:
- wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
- }
- select {
- case <-wp.stop:
- return
- case <-limitticker.C:
- }
+ if !ok || wkr.state == StateShutdown {
+ // Deleted/shutdown while we
+ // were probing others
+ continue
+ }
+ select {
+ case wkr.probing <- struct{}{}:
+ go func() {
+ wp.probeAndUpdate(wkr)
+ <-wkr.probing
+ }()
+ default:
+ wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
+ }
+ select {
+ case <-wp.stop:
+ return
+ case <-limitticker.C:
}
}
- }()
+ }
+}
+func (wp *Pool) runSync() {
// sync once immediately, then wait syncInterval, sync again,
// etc.
timer := time.NewTimer(1)
wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
for _, inst := range instances {
- itTag := inst.Tags()["InstanceType"]
+ itTag := inst.Tags()[tagKeyInstanceType]
it, ok := wp.instanceTypes[itTag]
if !ok {
wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)