"github.com/prometheus/client_golang/prometheus"
)
-// A View shows a worker's current state and recent activity.
-type View struct {
const (
	// tagKeyInstanceType is the cloud-instance tag used to record
	// which arvados.InstanceType an instance was created as; it is
	// written at Create time and read back when syncing the
	// instance list.
	tagKeyInstanceType = "InstanceType"
	// tagKeyHold marks an instance that should enter StateHold when
	// it appears with unknown state.
	// NOTE(review): the previous lookup used the lowercase key
	// "hold" — instances tagged with the old key will no longer be
	// held after this change; confirm the rename is intended.
	tagKeyHold = "Hold"
)
+
+// An InstanceView shows a worker's current state and recent activity.
+type InstanceView struct {
Instance string
Price float64
ArvadosInstanceType string
timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
}
wp.registerMetrics(reg)
- go wp.run()
+ go func() {
+ wp.setupOnce.Do(wp.setup)
+ go wp.runMetrics()
+ go wp.runProbes()
+ go wp.runSync()
+ }()
return wp
}
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
defer wp.mtx.Unlock()
- tags := cloud.InstanceTags{"InstanceType": it.Name}
+ tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
wp.creating[it]++
go func() {
+ defer wp.notify()
inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
wp.mtx.Lock()
defer wp.mtx.Unlock()
}
if err != nil {
logger.WithError(err).Error("create failed")
- go wp.notify()
return
}
wp.updateWorker(inst, it, StateBooting)
// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
return time.Now().Before(wp.atQuotaUntil)
}
// Add or update worker attached to the given instance. Use
// initialState if a new worker is created. Caller must have lock.
//
// Returns true when a new worker is created.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) bool {
	id := inst.ID()
	if wp.workers[id] != nil {
		// Already tracked: re-point the executor at the latest
		// instance handle rather than creating a duplicate entry.
		wp.workers[id].executor.SetTarget(inst)
		// Only promote Unknown -> Booting; never demote a worker
		// that has already progressed past Booting.
		if initialState == StateBooting && wp.workers[id].state == StateUnknown {
			wp.workers[id].state = StateBooting
		}
		return false
	}
	// An instance of unknown state carrying a non-empty hold tag
	// starts out held instead of entering the normal lifecycle.
	if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
		initialState = StateHold
	}
	wp.logger.WithFields(logrus.Fields{
		"Instance": inst,
		"State":    initialState,
	}).Infof("instance appeared in cloud")
	// Capture one timestamp so the new worker's bookkeeping fields
	// are mutually consistent.
	now := time.Now()
	wp.workers[id] = &worker{
		executor:    wp.newExecutor(inst),
		state:       initialState,
		instance:    inst,
		instType:    it,
		probed:      now,
		busy:        now,
		updated:     now,
		unallocated: now,
		running:     make(map[string]struct{}),
		starting:    make(map[string]struct{}),
		probing:     make(chan struct{}, 1), // capacity 1: at most one probe in flight per worker
	}
	return true
}
// Shutdown shuts down a worker with the given type, or returns false
go func() {
err := wkr.instance.Destroy()
if err != nil {
- logger.WithError(err).Warn("shutdown failed")
+ logger.WithError(err).WithField("Instance", wkr.instance).Warn("shutdown failed")
return
}
wp.mtx.Lock()
stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
wp.mtx.Lock()
defer wp.mtx.Unlock()
- wkr.updated = time.Now()
+ now := time.Now()
+ wkr.updated = now
+ wkr.busy = now
delete(wkr.starting, ctr.UUID)
wkr.running[ctr.UUID] = struct{}{}
+ wkr.lastUUID = ctr.UUID
if err != nil {
logger.WithField("stdout", string(stdout)).
WithField("stderr", string(stderr)).
// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
+//
+// KillContainer returns immediately; the act of killing the container
+// takes some time, and runs in the background.
func (wp *Pool) KillContainer(uuid string) {
wp.mtx.Lock()
defer wp.mtx.Unlock()
reg.MustRegister(wp.mMemoryInuse)
}
+func (wp *Pool) runMetrics() {
+ ch := wp.Subscribe()
+ defer wp.Unsubscribe(ch)
+ for range ch {
+ wp.updateMetrics()
+ }
+}
+
func (wp *Pool) updateMetrics() {
wp.mtx.RLock()
defer wp.mtx.RUnlock()
wp.mMemoryInuse.Set(float64(memInuse))
}
-func (wp *Pool) run() {
- wp.setupOnce.Do(wp.setup)
// runProbes periodically probes every live worker, throttled to at
// most maxProbesPerSecond probe launches per second, until wp.stop is
// closed.
func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	// limitticker meters how fast probes may be *started*;
	// probeticker decides how often a full probe sweep begins.
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	// Reused across sweeps to avoid reallocating every interval.
	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		// Snapshot the IDs to probe under the lock; workers that
		// are shutting down (or become idle-shutdown candidates
		// right now) are skipped.
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wp.shutdownIfIdle(wkr) {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			// Re-check under the lock: the worker may have been
			// deleted or shut down since the snapshot was taken.
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok || wkr.state == StateShutdown {
				// Deleted/shutdown while we
				// were probing others
				continue
			}
			// Non-blocking send on the 1-slot probing channel
			// ensures at most one in-flight probe per worker; if
			// the previous probe hasn't finished, skip this one.
			select {
			case wkr.probing <- struct{}{}:
				go func() {
					wp.probeAndUpdate(wkr)
					<-wkr.probing
				}()
			default:
				wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
			}
			// Rate-limit probe launches, bailing out promptly if
			// the pool is stopping.
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}
+func (wp *Pool) runSync() {
// sync once immediately, then wait syncInterval, sync again,
// etc.
timer := time.NewTimer(1)
}
// caller must have lock.
-func (wp *Pool) autoShutdown(wkr *worker) bool {
+func (wp *Pool) shutdownIfBroken(wkr *worker, dur time.Duration) {
+ if wkr.state == StateHold {
+ return
+ }
+ label, threshold := "", wp.timeoutProbe
+ if wkr.state == StateBooting {
+ label, threshold = "new ", wp.timeoutBooting
+ }
+ if dur < threshold {
+ return
+ }
+ wp.logger.WithFields(logrus.Fields{
+ "Instance": wkr.instance,
+ "Duration": dur,
+ "Since": wkr.probed,
+ "State": wkr.state,
+ }).Warnf("%sinstance unresponsive, shutting down", label)
+ wp.shutdown(wkr, wp.logger)
+}
+
+// caller must have lock.
+func (wp *Pool) shutdownIfIdle(wkr *worker) bool {
if len(wkr.running)+len(wkr.starting) > 0 || wkr.state != StateRunning {
return false
}
close(wp.stop)
}
-// View reports status information for every worker in the pool.
-func (wp *Pool) View() []View {
- var r []View
+// Instances returns an InstanceView for each worker in the pool,
+// summarizing its current state and recent activity.
+func (wp *Pool) Instances() []InstanceView {
+ var r []InstanceView
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
for _, w := range wp.workers {
- r = append(r, View{
+ r = append(r, InstanceView{
Instance: w.instance.String(),
Price: w.instType.Price,
ArvadosInstanceType: w.instType.Name,
wp.mtx.Lock()
defer wp.mtx.Unlock()
wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
+ notify := false
for _, inst := range instances {
- itTag := inst.Tags()["InstanceType"]
+ itTag := inst.Tags()[tagKeyInstanceType]
it, ok := wp.instanceTypes[itTag]
if !ok {
wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
continue
}
- wp.updateWorker(inst, it, StateUnknown)
+ if wp.updateWorker(inst, it, StateUnknown) {
+ notify = true
+ }
}
for id, wkr := range wp.workers {
logger.Info("instance disappeared in cloud")
delete(wp.workers, id)
go wkr.executor.Close()
- go wp.notify()
+ notify = true
}
if !wp.loaded {
wp.loaded = true
wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
}
+
+ if notify {
+ go wp.notify()
+ }
}
// should be called in a new goroutine
} else {
logger.Info("instance not responding")
}
-
- if wkr.state == StateHold {
- return
- }
-
- label, threshold := "", wp.timeoutProbe
- if wkr.state == StateBooting {
- label, threshold = "new ", wp.timeoutBooting
- }
- if dur > threshold {
- logger.WithField("Since", wkr.probed).Warnf("%sinstance unresponsive, shutting down", label)
- wp.shutdown(wkr, logger)
- }
+ wp.shutdownIfBroken(wkr, dur)
return
}
updateTime := time.Now()
wkr.probed = updateTime
- if len(ctrUUIDs) > 0 {
- wkr.busy = updateTime
- wkr.lastUUID = ctrUUIDs[0]
- }
if wkr.state == StateShutdown || wkr.state == StateHold {
} else if booted {
if wkr.state != StateRunning {
}
if updated != wkr.updated {
- // Worker was updated (e.g., by starting a new
- // container) after the probe began. Avoid clobbering
- // those changes with the probe results.
+ // Worker was updated after the probe began, so
+ // wkr.running might have a container UUID that was
+ // not yet running when ctrUUIDs was generated. Leave
+ // wkr.running alone and wait for the next probe to
+ // catch up on any changes.
return
}
- if len(ctrUUIDs) == 0 && len(wkr.running) > 0 {
+ if len(ctrUUIDs) > 0 {
+ wkr.busy = updateTime
+ wkr.lastUUID = ctrUUIDs[0]
+ } else if len(wkr.running) > 0 {
wkr.unallocated = updateTime
}
running := map[string]struct{}{}