tagKeyHold = "Hold"
)
-// A View shows a worker's current state and recent activity.
-type View struct {
+// An InstanceView shows a worker's current state and recent activity.
+type InstanceView struct {
Instance string
Price float64
ArvadosInstanceType string
// AtQuota returns true if Create is not expected to work at the
// moment.
+//
+// AtQuota acquires wp.mtx itself, so it must not be called while the
+// caller already holds wp.mtx (assuming wp.mtx is a non-reentrant
+// sync.Mutex — confirm).
func (wp *Pool) AtQuota() bool {
+ // Read wp.atQuotaUntil under the lock; presumably it is updated
+ // elsewhere while holding wp.mtx — verify against writers.
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
return time.Now().Before(wp.atQuotaUntil)
}
go func() {
err := wkr.instance.Destroy()
if err != nil {
- logger.WithError(err).Warn("shutdown failed")
+ logger.WithError(err).WithField("Instance", wkr.instance).Warn("shutdown failed")
return
}
wp.mtx.Lock()
stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
wp.mtx.Lock()
defer wp.mtx.Unlock()
- wkr.updated = time.Now()
+ now := time.Now()
+ wkr.updated = now
+ wkr.busy = now
delete(wkr.starting, ctr.UUID)
wkr.running[ctr.UUID] = struct{}{}
+ wkr.lastUUID = ctr.UUID
if err != nil {
logger.WithField("stdout", string(stdout)).
WithField("stderr", string(stderr)).
// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
+//
+// KillContainer returns immediately; the act of killing the container
+// takes some time, and runs in the background.
func (wp *Pool) KillContainer(uuid string) {
wp.mtx.Lock()
defer wp.mtx.Unlock()
workers = workers[:0]
wp.mtx.Lock()
for id, wkr := range wp.workers {
- if wkr.state == StateShutdown || wp.autoShutdown(wkr) {
+ if wkr.state == StateShutdown || wp.shutdownIfIdle(wkr) {
continue
}
workers = append(workers, id)
}
// caller must have lock.
-func (wp *Pool) autoShutdown(wkr *worker) bool {
+//
+// shutdownIfBroken shuts down wkr when dur has reached the
+// unresponsiveness threshold for its current state: wp.timeoutBooting
+// while the worker is in StateBooting, wp.timeoutProbe otherwise.
+// Workers in StateHold are never shut down by this function. Before
+// shutting down it logs a warning carrying the instance, dur,
+// wkr.probed and wkr.state. Presumably dur is the time since wkr last
+// responded to a probe (it is logged next to wkr.probed as "Since") —
+// confirm at the call site.
+func (wp *Pool) shutdownIfBroken(wkr *worker, dur time.Duration) {
+ // Held workers are exempt from automatic shutdown.
+ if wkr.state == StateHold {
+ return
+ }
+ // Booting workers are judged against a separate timeout, and the
+ // warning message labels them as "new " instances.
+ label, threshold := "", wp.timeoutProbe
+ if wkr.state == StateBooting {
+ label, threshold = "new ", wp.timeoutBooting
+ }
+ // A dur exactly equal to threshold is treated as expired.
+ if dur < threshold {
+ return
+ }
+ wp.logger.WithFields(logrus.Fields{
+ "Instance": wkr.instance,
+ "Duration": dur,
+ "Since": wkr.probed,
+ "State": wkr.state,
+ }).Warnf("%sinstance unresponsive, shutting down", label)
+ wp.shutdown(wkr, wp.logger)
+}
+
+// caller must have lock.
+func (wp *Pool) shutdownIfIdle(wkr *worker) bool {
if len(wkr.running)+len(wkr.starting) > 0 || wkr.state != StateRunning {
return false
}
close(wp.stop)
}
-// View reports status information for every worker in the pool.
-func (wp *Pool) View() []View {
- var r []View
+// Instances returns an InstanceView for each worker in the pool,
+// summarizing its current state and recent activity.
+func (wp *Pool) Instances() []InstanceView {
+ var r []InstanceView
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
for _, w := range wp.workers {
- r = append(r, View{
+ r = append(r, InstanceView{
Instance: w.instance.String(),
Price: w.instType.Price,
ArvadosInstanceType: w.instType.Name,
} else {
logger.Info("instance not responding")
}
-
- if wkr.state == StateHold {
- return
- }
-
- label, threshold := "", wp.timeoutProbe
- if wkr.state == StateBooting {
- label, threshold = "new ", wp.timeoutBooting
- }
- if dur > threshold {
- logger.WithField("Since", wkr.probed).Warnf("%sinstance unresponsive, shutting down", label)
- wp.shutdown(wkr, logger)
- }
+ wp.shutdownIfBroken(wkr, dur)
return
}
updateTime := time.Now()
wkr.probed = updateTime
- if len(ctrUUIDs) > 0 {
- wkr.busy = updateTime
- wkr.lastUUID = ctrUUIDs[0]
- }
if wkr.state == StateShutdown || wkr.state == StateHold {
} else if booted {
if wkr.state != StateRunning {
}
if updated != wkr.updated {
- // Worker was updated (e.g., by starting a new
- // container) after the probe began. Avoid clobbering
- // those changes with the probe results.
+ // Worker was updated after the probe began, so
+ // wkr.running might have a container UUID that was
+ // not yet running when ctrUUIDs was generated. Leave
+ // wkr.running alone and wait for the next probe to
+ // catch up on any changes.
return
}
- if len(ctrUUIDs) == 0 && len(wkr.running) > 0 {
+ if len(ctrUUIDs) > 0 {
+ wkr.busy = updateTime
+ wkr.lastUUID = ctrUUIDs[0]
+ } else if len(wkr.running) > 0 {
wkr.unallocated = updateTime
}
running := map[string]struct{}{}