ArvadosInstanceType string
ProviderInstanceType string
LastContainerUUID string
- Unallocated time.Time
+ LastBusy time.Time
WorkerState string
}
loaded bool // loaded list of instances from InstanceSet at least once
exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
atQuotaUntil time.Time
+ atQuotaErr cloud.QuotaError
stop chan bool
mtx sync.RWMutex
setupOnce sync.Once
}
+// worker is the pool's bookkeeping record for one cloud instance.
type worker struct {
- state State
- instance cloud.Instance
- executor Executor
- instType arvados.InstanceType
- vcpus int64
- memory int64
- booted bool
- probed time.Time
- updated time.Time
- busy time.Time
- unallocated time.Time
- lastUUID string
- running map[string]struct{}
- starting map[string]struct{}
- probing chan struct{}
+ // Lifecycle state, e.g. StateUnknown, StateBooting, StateIdle,
+ // StateRunning, StateShutdown.
+ state State
+ instance cloud.Instance
+ // Used to run commands on the instance, e.g. "crunch-run --detach".
+ executor Executor
+ // Arvados instance type this worker was created for; matched
+ // against the requested type when starting or shutting down.
+ instType arvados.InstanceType
+ vcpus int64
+ memory int64
+ // Set by the probe logic when a probe result is recorded.
+ probed time.Time
+ // Last time this record changed. A probe snapshots it before
+ // starting and compares afterwards to detect concurrent updates.
+ updated time.Time
+ // Last time the worker was known to be busy (initialized at
+ // creation). Idle-shutdown age is measured from this.
+ busy time.Time
+ // First container UUID reported by the most recent probe that
+ // found any running containers.
+ lastUUID string
+ // Containers this pool believes are running here; replaced by
+ // probe results and pruned when a container is killed.
+ running map[string]struct{} // remember to update state idle<->running when this changes
+ // Containers being launched; an entry is added before
+ // "crunch-run --detach" is executed.
+ starting map[string]struct{} // remember to update state idle<->running when this changes
+ // Capacity-1 channel; presumably guards against overlapping probes
+ // (TODO confirm against the probe code).
+ probing chan struct{}
}
// Subscribe returns a channel that becomes ready whenever a worker's
u[it] = c
}
for _, wkr := range wp.workers {
- if len(wkr.running)+len(wkr.starting) == 0 && (wkr.state == StateRunning || wkr.state == StateBooting || wkr.state == StateUnknown) {
+ if wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown {
u[wkr.instType]++
}
}
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
defer wp.mtx.Unlock()
+ if time.Now().Before(wp.atQuotaUntil) {
+ return wp.atQuotaErr
+ }
tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
wp.creating[it]++
go func() {
defer wp.mtx.Unlock()
wp.creating[it]--
if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+ wp.atQuotaErr = err
wp.atQuotaUntil = time.Now().Add(time.Minute)
}
if err != nil {
}).Infof("instance appeared in cloud")
now := time.Now()
wp.workers[id] = &worker{
- executor: wp.newExecutor(inst),
- state: initialState,
- instance: inst,
- instType: it,
- probed: now,
- busy: now,
- updated: now,
- unallocated: now,
- running: make(map[string]struct{}),
- starting: make(map[string]struct{}),
- probing: make(chan struct{}, 1),
+ executor: wp.newExecutor(inst),
+ state: initialState,
+ instance: inst,
+ instType: it,
+ probed: now,
+ busy: now,
+ updated: now,
+ running: make(map[string]struct{}),
+ starting: make(map[string]struct{}),
+ probing: make(chan struct{}, 1),
}
return true
}
defer wp.mtx.Unlock()
logger := wp.logger.WithField("InstanceType", it.Name)
logger.Info("shutdown requested")
- for _, tryState := range []State{StateBooting, StateRunning} {
+ for _, tryState := range []State{StateBooting, StateIdle} {
// TODO: shutdown the worker with the longest idle
- // time (Running) or the earliest create time
- // (Booting)
+ // time (Idle) or the earliest create time (Booting)
for _, wkr := range wp.workers {
- if wkr.state != tryState || len(wkr.running)+len(wkr.starting) > 0 {
- continue
+ if wkr.state == tryState && wkr.instType == it {
+ logger = logger.WithField("Instance", wkr.instance)
+ logger.Info("shutting down")
+ wp.shutdown(wkr, logger)
+ return true
}
- if wkr.instType != it {
- continue
- }
- logger = logger.WithField("Instance", wkr.instance)
- logger.Info("shutting down")
- wp.shutdown(wkr, logger)
- return true
}
}
return false
defer wp.mtx.Unlock()
var wkr *worker
for _, w := range wp.workers {
- if w.instType == it && w.state == StateRunning && len(w.running)+len(w.starting) == 0 {
+ if w.instType == it && w.state == StateIdle {
if wkr == nil || w.busy.After(wkr.busy) {
wkr = w
}
logger = logger.WithField("Instance", wkr.instance)
logger.Debug("starting container")
wkr.starting[ctr.UUID] = struct{}{}
+ wkr.state = StateRunning
go func() {
stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
wp.mtx.Lock()
// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
+//
+// KillContainer returns immediately; the act of killing the container
+// takes some time, and runs in the background.
func (wp *Pool) KillContainer(uuid string) {
wp.mtx.Lock()
defer wp.mtx.Unlock()
defer wp.mtx.Unlock()
if _, ok := wkr.running[uuid]; ok {
delete(wkr.running, uuid)
+ if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+ wkr.state = StateIdle
+ }
wkr.updated = time.Now()
go wp.notify()
}
// caller must have lock.
func (wp *Pool) shutdownIfIdle(wkr *worker) bool {
- if len(wkr.running)+len(wkr.starting) > 0 || wkr.state != StateRunning {
+ if wkr.state != StateIdle {
return false
}
- age := time.Since(wkr.unallocated)
+ age := time.Since(wkr.busy)
if age < wp.timeoutIdle {
return false
}
ArvadosInstanceType: w.instType.Name,
ProviderInstanceType: w.instType.ProviderType,
LastContainerUUID: w.lastUUID,
- Unallocated: w.unallocated,
+ LastBusy: w.busy,
WorkerState: w.state.String(),
})
}
logger := wp.logger.WithField("Instance", wkr.instance)
wp.mtx.Lock()
updated := wkr.updated
- booted := wkr.booted
+ needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
+ needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
wp.mtx.Unlock()
+ if !needProbeBooted && !needProbeRunning {
+ return
+ }
var (
ctrUUIDs []string
ok bool
stderr []byte
)
- if !booted {
- booted, stderr = wp.probeBooted(wkr)
+ if needProbeBooted {
+ ok, stderr = wp.probeBooted(wkr)
wp.mtx.Lock()
- if booted && !wkr.booted {
- wkr.booted = booted
- logger.Info("instance booted")
- } else {
- booted = wkr.booted
+ if ok || wkr.state == StateRunning || wkr.state == StateIdle {
+ logger.Info("instance booted; will try probeRunning")
+ needProbeRunning = true
}
wp.mtx.Unlock()
}
- if booted {
+ if needProbeRunning {
ctrUUIDs, ok, stderr = wp.probeRunning(wkr)
}
logger = logger.WithField("stderr", string(stderr))
wp.mtx.Lock()
defer wp.mtx.Unlock()
if !ok {
- if wkr.state == StateShutdown {
+ if wkr.state == StateShutdown && wkr.updated.After(updated) {
+ // Skip the logging noise if shutdown was
+ // initiated during probe.
return
}
dur := time.Since(wkr.probed)
updateTime := time.Now()
wkr.probed = updateTime
- if wkr.state == StateShutdown || wkr.state == StateHold {
- } else if booted {
- if wkr.state != StateRunning {
- wkr.state = StateRunning
- go wp.notify()
- }
- } else {
- wkr.state = StateBooting
- }
if updated != wkr.updated {
// Worker was updated after the probe began, so
wkr.busy = updateTime
wkr.lastUUID = ctrUUIDs[0]
} else if len(wkr.running) > 0 {
- wkr.unallocated = updateTime
+ // Actual last-busy time was sometime between wkr.busy
+ // and now. Now is the earliest opportunity to take
+ // advantage of the non-busy state, though.
+ wkr.busy = updateTime
}
running := map[string]struct{}{}
changed := false
changed = true
}
}
+ if wkr.state == StateUnknown || wkr.state == StateBooting {
+ wkr.state = StateIdle
+ changed = true
+ }
if changed {
wkr.running = running
+ if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
+ wkr.state = StateRunning
+ } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
+ wkr.state = StateIdle
+ }
wkr.updated = updateTime
go wp.notify()
}