package worker
import (
- "bytes"
"io"
"sort"
"strings"
ArvadosInstanceType string
ProviderInstanceType string
LastContainerUUID string
- Unallocated time.Time
+ LastBusy time.Time
WorkerState string
}
defaultTimeoutIdle = time.Minute
defaultTimeoutBooting = time.Minute * 10
defaultTimeoutProbe = time.Minute * 10
+ defaultTimeoutShutdown = time.Second * 10
)
func duration(conf arvados.Duration, def time.Duration) time.Duration {
timeoutIdle: duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+ timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+ stop: make(chan bool),
}
wp.registerMetrics(reg)
go func() {
timeoutIdle time.Duration
timeoutBooting time.Duration
timeoutProbe time.Duration
+ timeoutShutdown time.Duration
// private state
subscribers map[<-chan struct{}]chan<- struct{}
- creating map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return
+ creating map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
workers map[cloud.InstanceID]*worker
loaded bool // loaded list of instances from InstanceSet at least once
exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
atQuotaUntil time.Time
+ atQuotaErr cloud.QuotaError
stop chan bool
mtx sync.RWMutex
setupOnce sync.Once
mMemoryInuse prometheus.Gauge
}
// worker tracks one cloud VM instance and the containers running on it.
// All fields are protected by the pool's mutex.
type worker struct {
	state       State                // current lifecycle state (Unknown/Booting/Running/Hold/Shutdown)
	instance    cloud.Instance       // cloud provider's handle for this VM
	executor    Executor             // runs remote commands (probes, crunch-run) on the VM
	instType    arvados.InstanceType // Arvados instance type this VM was created as
	vcpus       int64
	memory      int64
	booted      bool      // boot probe has succeeded at least once
	probed      time.Time // time of last successful probe
	updated     time.Time // time of last state/bookkeeping change
	busy        time.Time // last time known to be running/starting a container
	updated     time.Time
	unallocated time.Time // last time the worker had no containers assigned
	lastUUID    string    // most recent container UUID seen on this worker
	running     map[string]struct{} // UUIDs of containers reported running by probe
	starting    map[string]struct{} // UUIDs whose start command is in flight, not yet confirmed
	probing     chan struct{}       // 1-buffered token: non-empty while a probe is in flight
}
// Subscribe returns a channel that becomes ready whenever a worker's
// state changes.
//
wp.setupOnce.Do(wp.setup)
wp.mtx.RLock()
defer wp.mtx.RUnlock()
- u := map[arvados.InstanceType]int{}
- for it, c := range wp.creating {
- u[it] = c
+ unalloc := map[arvados.InstanceType]int{}
+ creating := map[arvados.InstanceType]int{}
+ for it, times := range wp.creating {
+ creating[it] = len(times)
}
for _, wkr := range wp.workers {
- if len(wkr.running)+len(wkr.starting) == 0 && (wkr.state == StateRunning || wkr.state == StateBooting || wkr.state == StateUnknown) {
- u[wkr.instType]++
+ if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) {
+ continue
+ }
+ it := wkr.instType
+ unalloc[it]++
+ if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
+ // If up to N new workers appear in
+ // Instances() while we are waiting for N
+ // Create() calls to complete, we assume we're
+ // just seeing a race between Instances() and
+ // Create() responses.
+ //
+ // The other common reason why nodes have
+ // state==Unknown is that they appeared at
+ // startup, before any Create calls. They
+ // don't match the above timing condition, so
+ // we never mistakenly attribute them to
+ // pending Create calls.
+ creating[it]--
}
}
- return u
+ for it, c := range creating {
+ unalloc[it] += c
+ }
+ return unalloc
}
// Create a new instance with the given type, and add it to the worker
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
defer wp.mtx.Unlock()
+ if time.Now().Before(wp.atQuotaUntil) {
+ return wp.atQuotaErr
+ }
tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
- wp.creating[it]++
+ now := time.Now()
+ wp.creating[it] = append(wp.creating[it], now)
go func() {
defer wp.notify()
inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
wp.mtx.Lock()
defer wp.mtx.Unlock()
- wp.creating[it]--
+ // Remove our timestamp marker from wp.creating
+ for i, t := range wp.creating[it] {
+ if t == now {
+ copy(wp.creating[it][i:], wp.creating[it][i+1:])
+ wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
+ break
+ }
+ }
if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+ wp.atQuotaErr = err
wp.atQuotaUntil = time.Now().Add(time.Minute)
}
if err != nil {
}
// Add or update worker attached to the given instance. Use
// initialState if a new worker is created.
//
// The second return value is true if a new worker is created.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		// Existing worker: refresh the instance handle (it may
		// carry updated metadata) and bookkeeping timestamps.
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		if initialState == StateBooting && wkr.state == StateUnknown {
			// We created this instance ourselves, so we know
			// it is booting rather than in an unknown state.
			wkr.state = StateBooting
		}
		return wkr, false
	}
	// An operator-set "hold" tag overrides the caller's initial state.
	if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
		initialState = StateHold
	}
	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst,
	})
	logger.WithField("State", initialState).Infof("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:      &wp.mtx, // worker shares the pool's lock
		wp:       wp,
		logger:   logger,
		executor: wp.newExecutor(inst),
		state:    initialState,
		instance: inst,
		instType: it,
		appeared: now,
		probed:   now,
		busy:     now,
		updated:  now,
		running:  make(map[string]struct{}),
		starting: make(map[string]struct{}),
		probing:  make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}
+
// notifyExited records the time at which the crunch-run process for
// container uuid was observed to have exited. Caller must have lock.
func (wp *Pool) notifyExited(uuid string, t time.Time) {
	wp.exited[uuid] = t
}
// Shutdown shuts down a worker with the given type, or returns false
defer wp.mtx.Unlock()
logger := wp.logger.WithField("InstanceType", it.Name)
logger.Info("shutdown requested")
- for _, tryState := range []State{StateBooting, StateRunning} {
+ for _, tryState := range []State{StateBooting, StateIdle} {
// TODO: shutdown the worker with the longest idle
- // time (Running) or the earliest create time
- // (Booting)
+ // time (Idle) or the earliest create time (Booting)
for _, wkr := range wp.workers {
- if wkr.state != tryState || len(wkr.running)+len(wkr.starting) > 0 {
- continue
+ if wkr.state == tryState && wkr.instType == it {
+ logger.WithField("Instance", wkr.instance).Info("shutting down")
+ wkr.shutdown()
+ return true
}
- if wkr.instType != it {
- continue
- }
- logger = logger.WithField("Instance", wkr.instance)
- logger.Info("shutting down")
- wp.shutdown(wkr, logger)
- return true
}
}
return false
}
// shutdown marks wkr as shutting down and destroys its cloud instance
// in a background goroutine. Caller must have lock.
func (wp *Pool) shutdown(wkr *worker, logger logrus.FieldLogger) {
	wkr.updated = time.Now()
	wkr.state = StateShutdown
	go func() {
		err := wkr.instance.Destroy()
		if err != nil {
			// Destroy failed; leave state as Shutdown and let a
			// later sync/probe cycle deal with the instance.
			logger.WithError(err).WithField("Instance", wkr.instance).Warn("shutdown failed")
			return
		}
		wp.mtx.Lock()
		// Destroying an instance frees quota, so stop waiting
		// out any earlier at-quota condition.
		wp.atQuotaUntil = time.Now()
		wp.mtx.Unlock()
		wp.notify()
	}()
}
-
-// Workers returns the current number of workers in each state.
-func (wp *Pool) Workers() map[State]int {
+// CountWorkers returns the current number of workers in each state.
+func (wp *Pool) CountWorkers() map[State]int {
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
defer wp.mtx.Unlock()
// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
- logger := wp.logger.WithFields(logrus.Fields{
- "InstanceType": it.Name,
- "ContainerUUID": ctr.UUID,
- "Priority": ctr.Priority,
- })
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
defer wp.mtx.Unlock()
var wkr *worker
for _, w := range wp.workers {
- if w.instType == it && w.state == StateRunning && len(w.running)+len(w.starting) == 0 {
+ if w.instType == it && w.state == StateIdle {
if wkr == nil || w.busy.After(wkr.busy) {
wkr = w
}
if wkr == nil {
return false
}
- logger = logger.WithField("Instance", wkr.instance)
- logger.Debug("starting container")
- wkr.starting[ctr.UUID] = struct{}{}
- go func() {
- stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
- wp.mtx.Lock()
- defer wp.mtx.Unlock()
- now := time.Now()
- wkr.updated = now
- wkr.busy = now
- delete(wkr.starting, ctr.UUID)
- wkr.running[ctr.UUID] = struct{}{}
- wkr.lastUUID = ctr.UUID
- if err != nil {
- logger.WithField("stdout", string(stdout)).
- WithField("stderr", string(stderr)).
- WithError(err).
- Error("error starting crunch-run process")
- // Leave uuid in wkr.running, though: it's
- // possible the error was just a communication
- // failure and the process was in fact
- // started. Wait for next probe to find out.
- return
- }
- logger.Info("crunch-run process started")
- wkr.lastUUID = ctr.UUID
- }()
+ wkr.startContainer(ctr)
return true
}
"Instance": wkr.instance,
})
logger.Debug("killing process")
- stdout, stderr, err := wkr.executor.Execute("crunch-run --kill "+uuid, nil)
+ stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil)
if err != nil {
logger.WithFields(logrus.Fields{
"stderr": string(stderr),
defer wp.mtx.Unlock()
if _, ok := wkr.running[uuid]; ok {
delete(wkr.running, uuid)
+ if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+ wkr.state = StateIdle
+ }
wkr.updated = time.Now()
go wp.notify()
}
workers = workers[:0]
wp.mtx.Lock()
for id, wkr := range wp.workers {
- if wkr.state == StateShutdown || wp.shutdownIfIdle(wkr) {
+ if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
continue
}
workers = append(workers, id)
wp.mtx.Lock()
wkr, ok := wp.workers[id]
wp.mtx.Unlock()
- if !ok || wkr.state == StateShutdown {
- // Deleted/shutdown while we
- // were probing others
+ if !ok {
+ // Deleted while we were probing
+ // others
continue
}
- select {
- case wkr.probing <- struct{}{}:
- go func() {
- wp.probeAndUpdate(wkr)
- <-wkr.probing
- }()
- default:
- wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
- }
+ go wkr.ProbeAndUpdate()
select {
case <-wp.stop:
return
}
}
// shutdownIfBroken shuts down wkr if it has been unresponsive for
// longer than the applicable timeout: timeoutBooting for instances
// that are still booting, timeoutProbe otherwise. dur is the time
// since the last successful probe. Workers in StateHold are never
// shut down. Caller must have lock.
func (wp *Pool) shutdownIfBroken(wkr *worker, dur time.Duration) {
	if wkr.state == StateHold {
		return
	}
	label, threshold := "", wp.timeoutProbe
	if wkr.state == StateBooting {
		label, threshold = "new ", wp.timeoutBooting
	}
	if dur < threshold {
		return
	}
	wp.logger.WithFields(logrus.Fields{
		"Instance": wkr.instance,
		"Duration": dur,
		"Since":    wkr.probed,
		"State":    wkr.state,
	}).Warnf("%sinstance unresponsive, shutting down", label)
	wp.shutdown(wkr, wp.logger)
}
-
// shutdownIfIdle shuts down wkr if it is running, has no containers
// assigned, and has been unallocated for at least timeoutIdle.
// Returns true if the worker was shut down. Caller must have lock.
func (wp *Pool) shutdownIfIdle(wkr *worker) bool {
	if len(wkr.running)+len(wkr.starting) > 0 || wkr.state != StateRunning {
		return false
	}
	age := time.Since(wkr.unallocated)
	if age < wp.timeoutIdle {
		return false
	}
	logger := wp.logger.WithFields(logrus.Fields{
		"Age":      age,
		"Instance": wkr.instance,
	})
	logger.Info("shutdown idle worker")
	wp.shutdown(wkr, logger)
	return true
}
-
// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
wp.setupOnce.Do(wp.setup)
ArvadosInstanceType: w.instType.Name,
ProviderInstanceType: w.instType.ProviderType,
LastContainerUUID: w.lastUUID,
- Unallocated: w.unallocated,
+ LastBusy: w.busy,
WorkerState: w.state.String(),
})
}
}
func (wp *Pool) setup() {
- wp.creating = map[arvados.InstanceType]int{}
+ wp.creating = map[arvados.InstanceType][]time.Time{}
wp.exited = map[string]time.Time{}
wp.workers = map[cloud.InstanceID]*worker{}
wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
continue
}
- if wp.updateWorker(inst, it, StateUnknown) {
+ if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
notify = true
+ } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
+ wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
+ wkr.shutdown()
}
}
go wp.notify()
}
}
-
-// should be called in a new goroutine
-func (wp *Pool) probeAndUpdate(wkr *worker) {
- logger := wp.logger.WithField("Instance", wkr.instance)
- wp.mtx.Lock()
- updated := wkr.updated
- booted := wkr.booted
- wp.mtx.Unlock()
-
- var (
- ctrUUIDs []string
- ok bool
- stderr []byte
- )
- if !booted {
- booted, stderr = wp.probeBooted(wkr)
- wp.mtx.Lock()
- if booted && !wkr.booted {
- wkr.booted = booted
- logger.Info("instance booted")
- } else {
- booted = wkr.booted
- }
- wp.mtx.Unlock()
- }
- if booted {
- ctrUUIDs, ok, stderr = wp.probeRunning(wkr)
- }
- logger = logger.WithField("stderr", string(stderr))
- wp.mtx.Lock()
- defer wp.mtx.Unlock()
- if !ok {
- if wkr.state == StateShutdown {
- return
- }
- dur := time.Since(wkr.probed)
- logger := logger.WithFields(logrus.Fields{
- "Duration": dur,
- "State": wkr.state,
- })
- if wkr.state == StateBooting {
- logger.Debug("new instance not responding")
- } else {
- logger.Info("instance not responding")
- }
- wp.shutdownIfBroken(wkr, dur)
- return
- }
-
- updateTime := time.Now()
- wkr.probed = updateTime
- if wkr.state == StateShutdown || wkr.state == StateHold {
- } else if booted {
- if wkr.state != StateRunning {
- wkr.state = StateRunning
- go wp.notify()
- }
- } else {
- wkr.state = StateBooting
- }
-
- if updated != wkr.updated {
- // Worker was updated after the probe began, so
- // wkr.running might have a container UUID that was
- // not yet running when ctrUUIDs was generated. Leave
- // wkr.running alone and wait for the next probe to
- // catch up on any changes.
- return
- }
-
- if len(ctrUUIDs) > 0 {
- wkr.busy = updateTime
- wkr.lastUUID = ctrUUIDs[0]
- } else if len(wkr.running) > 0 {
- wkr.unallocated = updateTime
- }
- running := map[string]struct{}{}
- changed := false
- for _, uuid := range ctrUUIDs {
- running[uuid] = struct{}{}
- if _, ok := wkr.running[uuid]; !ok {
- changed = true
- }
- }
- for uuid := range wkr.running {
- if _, ok := running[uuid]; !ok {
- logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
- wp.exited[uuid] = updateTime
- changed = true
- }
- }
- if changed {
- wkr.running = running
- wkr.updated = updateTime
- go wp.notify()
- }
-}
-
-func (wp *Pool) probeRunning(wkr *worker) (running []string, ok bool, stderr []byte) {
- cmd := "crunch-run --list"
- stdout, stderr, err := wkr.executor.Execute(cmd, nil)
- if err != nil {
- wp.logger.WithFields(logrus.Fields{
- "Instance": wkr.instance,
- "Command": cmd,
- "stdout": string(stdout),
- "stderr": string(stderr),
- }).WithError(err).Warn("probe failed")
- return nil, false, stderr
- }
- stdout = bytes.TrimRight(stdout, "\n")
- if len(stdout) == 0 {
- return nil, true, stderr
- }
- return strings.Split(string(stdout), "\n"), true, stderr
-}
-
-func (wp *Pool) probeBooted(wkr *worker) (ok bool, stderr []byte) {
- cmd := wp.bootProbeCommand
- if cmd == "" {
- cmd = "true"
- }
- stdout, stderr, err := wkr.executor.Execute(cmd, nil)
- logger := wp.logger.WithFields(logrus.Fields{
- "Instance": wkr.instance,
- "Command": cmd,
- "stdout": string(stdout),
- "stderr": string(stderr),
- })
- if err != nil {
- logger.WithError(err).Debug("boot probe failed")
- return false, stderr
- }
- logger.Info("boot probe succeeded")
- return true, stderr
-}