// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0
14 "git.curoverse.com/arvados.git/lib/cloud"
15 "git.curoverse.com/arvados.git/sdk/go/arvados"
16 "git.curoverse.com/arvados.git/sdk/go/stats"
17 "github.com/sirupsen/logrus"
const maxPingFailTime = 10 * time.Minute
// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
)
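// Typical lifecycle: a new instance starts in StateUnknown or
// StateBooting, becomes StateIdle after a successful boot probe, moves
// between StateIdle and StateRunning as containers start and finish,
// and ends in StateShutdown. A worker can report running containers
// even before leaving StateUnknown, if a run probe finds crunch-run
// processes left behind by a previous dispatcher (see probeAndUpdate).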
var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}
// String implements fmt.Stringer.
func (s State) String() string {
	return stateString[s]
}
// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
	return []byte(stateString[s]), nil
}
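// For example (a minimal sketch; encoding/json uses TextMarshaler for
// map keys and sorts keys in its output):
//
//	json.Marshal(map[State]int{StateIdle: 3, StateRunning: 2})
//	// => {"idle":3,"running":2}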
// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shut down on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shut down or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shut down immediately when idle
)
var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,
}
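// Callers that accept an IdleBehavior from outside (e.g. a management
// API handler) would typically validate it against this map before
// applying it. A minimal sketch:
//
//	if !validIdleBehavior[behavior] {
//		return fmt.Errorf("invalid idle behavior %q", behavior)
//	}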
type worker struct {
	logger   logrus.FieldLogger
	executor Executor
	wp       *Pool

	mtx          sync.Locker // must be wp's Locker.
	state        State
	idleBehavior IdleBehavior
	instance     cloud.Instance
	instType     arvados.InstanceType
	probed       time.Time
	updated      time.Time
	busy         time.Time
	destroyed    time.Time
	lastUUID     string
	running      map[string]*remoteRunner // remember to update state idle<->running when this changes
	starting     map[string]*remoteRunner // remember to update state idle<->running when this changes
	probing      chan struct{}
}
func (wkr *worker) onUnkillable(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	logger := wkr.logger.WithField("ContainerUUID", uuid)
	if wkr.idleBehavior == IdleBehaviorHold {
		logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
		return
	}
	logger.Warn("unkillable container, draining worker")
	wkr.setIdleBehavior(IdleBehaviorDrain)
}
func (wkr *worker) onKilled(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	wkr.closeRunner(uuid)
	go wkr.wp.notify()
}
// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
	wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
	wkr.idleBehavior = idleBehavior
	wkr.saveTags()
}
// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
	logger := wkr.logger.WithFields(logrus.Fields{
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	})
	logger.Debug("starting container")
	rr := newRemoteRunner(ctr.UUID, wkr)
	wkr.starting[ctr.UUID] = rr
	if wkr.state != StateRunning {
		wkr.state = StateRunning
		go wkr.wp.notify()
	}
	go func() {
		rr.Start()
		wkr.mtx.Lock()
		defer wkr.mtx.Unlock()
		now := time.Now()
		wkr.updated = now
		wkr.busy = now
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = rr
		wkr.lastUUID = ctr.UUID
	}()
}
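// Note the handoff above: while rr.Start() runs (without the lock) the
// container stays in wkr.starting; if a probe sees it on the instance
// first, updateRunning moves the same runner into wkr.running.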
// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
	select {
	case wkr.probing <- struct{}{}:
		wkr.probeAndUpdate()
		<-wkr.probing
	default:
		wkr.logger.Debug("still waiting for last probe to finish")
	}
}
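// The select above is a non-blocking try-lock: wkr.probing is assumed
// to be a buffered channel of capacity 1 (created by the pool), so the
// first caller occupies the slot, probes, and releases it by receiving;
// callers arriving mid-probe hit the default case and return at once.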
// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
	wkr.mtx.Lock()
	updated := wkr.updated
	initialState := wkr.state
	wkr.mtx.Unlock()

	var (
		booted   bool
		ctrUUIDs []string
		ok       bool
		stderr   []byte // from probeBooted
	)

	switch initialState {
	case StateShutdown:
		return
	case StateIdle, StateRunning:
		booted = true
	case StateUnknown, StateBooting:
	default:
		panic(fmt.Sprintf("unknown state %s", initialState))
	}
	probeStart := time.Now()
	logger := wkr.logger.WithField("ProbeStart", probeStart)

	if !booted {
		booted, stderr = wkr.probeBooted()
		if !booted {
			// Pretend this probe succeeded if another
			// concurrent attempt succeeded.
			wkr.mtx.Lock()
			booted = wkr.state == StateRunning || wkr.state == StateIdle
			wkr.mtx.Unlock()
		}
		if booted {
			logger.Info("instance booted; will try probeRunning")
		}
	}
	if booted || wkr.state == StateUnknown {
		ctrUUIDs, ok = wkr.probeRunning()
	}
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
			return
		}
		// Using the start time of the probe as the timeout
		// threshold ensures we always initiate at least one
		// probe attempt after the boot/probe timeout expires
		// (otherwise, a slow probe failure could cause us to
		// shut down an instance even though it did in fact
		// boot/recover before the timeout expired).
		dur := probeStart.Sub(wkr.probed)
		if wkr.shutdownIfBroken(dur) {
			// stderr from failed run-probes will have
			// been logged already, but boot-probe
			// failures are normal so they are logged only
			// at Debug level. This is our chance to log
			// some evidence about why the node never
			// booted, even in non-debug mode.
			if !booted {
				logger.WithFields(logrus.Fields{
					"Duration": dur,
					"stderr":   string(stderr),
				}).Info("boot failed")
			}
		}
		return
	}
	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		return
	}

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime
	}

	changed := wkr.updateRunning(ctrUUIDs)
	// Update state if this was the first successful boot-probe.
	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
		// Note: this will change again below if
		// len(wkr.starting)+len(wkr.running) > 0.
		wkr.state = StateIdle
		changed = true
	}

	// If wkr.state and wkr.running aren't changing then there's
	// no need to log anything, notify the scheduler, move state
	// back and forth between idle/running, etc.
	if !changed {
		return
	}

	// Log whenever a run-probe reveals crunch-run processes
	// appearing/disappearing before boot-probe succeeds.
	if wkr.state == StateUnknown && changed {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("crunch-run probe succeeded, but boot probe is still failing")
	}

	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
		wkr.state = StateRunning
	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
		wkr.state = StateIdle
	}
	wkr.updated = updateTime
	if booted && (initialState == StateUnknown || initialState == StateBooting) {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("probes succeeded, instance is in service")
	}
	go wkr.wp.notify()
}
func (wkr *worker) probeRunning() (running []string, ok bool) {
	cmd := "crunch-run --list"
	if u := wkr.instance.RemoteUser(); u != "root" {
		cmd = "sudo " + cmd
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	if err != nil {
		wkr.logger.WithFields(logrus.Fields{
			"Command": cmd,
			"stdout":  string(stdout),
			"stderr":  string(stderr),
		}).WithError(err).Warn("probe failed")
		return nil, false
	}
	stdout = bytes.TrimRight(stdout, "\n")
	if len(stdout) == 0 {
		return nil, true
	}
	return strings.Split(string(stdout), "\n"), true
}
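// For illustration: if the probe prints one container UUID per line,
// e.g. "zzzzz-dz642-000000000000000\n" (hypothetical UUID), the result
// is that one-element slice and ok=true; empty output yields (nil, true),
// meaning the probe worked and nothing is running; a command error
// yields (nil, false).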
func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
	cmd := wkr.wp.bootProbeCommand
	if cmd == "" {
		cmd = "true"
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	logger := wkr.logger.WithFields(logrus.Fields{
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	})
	if err != nil {
		logger.WithError(err).Debug("boot probe failed")
		return false, stderr
	}
	logger.Info("boot probe succeeded")
	return true, stderr
}
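// The boot probe command comes from the pool's cluster configuration;
// typically it is a command that succeeds only once the node is ready
// to run containers, e.g. "docker ps -q" (an illustrative example). An
// empty setting falls back to "true", so any reachable instance passes.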
// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		// Never shut down.
		return false
	}
	label, threshold := "", wkr.wp.timeoutProbe
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		label, threshold = "new ", wkr.wp.timeoutBooting
	}
	if dur < threshold {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"Duration": dur,
		"Since":    wkr.probed,
		"State":    wkr.state,
	}).Warnf("%sinstance unresponsive, shutting down", label)
	wkr.shutdown()
	return true
}
// Returns true if the instance is eligible for shutdown: either it's
// been idle too long, or idleBehavior=Drain and nothing is running.
//
// caller must have lock.
func (wkr *worker) eligibleForShutdown() bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		return false
	}
	draining := wkr.idleBehavior == IdleBehaviorDrain
	switch wkr.state {
	case StateBooting:
		return draining
	case StateIdle:
		return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
	case StateRunning:
		if !draining {
			return false
		}
		for _, rr := range wkr.running {
			if !rr.givenup {
				return false
			}
		}
		for _, rr := range wkr.starting {
			if !rr.givenup {
				return false
			}
		}
		// draining, and all remaining runners are just trying
		// to force-kill their crunch-run procs
		return true
	default:
		return false
	}
}
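// For example: with IdleBehavior=Drain, an idle or booting worker is
// eligible immediately; with IdleBehavior=Run, an idle worker becomes
// eligible only after timeoutIdle has elapsed since wkr.busy.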
// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
	if !wkr.eligibleForShutdown() {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"State":        wkr.state,
		"IdleDuration": stats.Duration(time.Since(wkr.busy)),
		"IdleBehavior": wkr.idleBehavior,
	}).Info("shutdown worker")
	wkr.shutdown()
	return true
}
// caller must have lock.
func (wkr *worker) shutdown() {
	now := time.Now()
	wkr.updated = now
	wkr.destroyed = now
	wkr.state = StateShutdown
	go wkr.wp.notify()
	go func() {
		// Destroy can take a while, so don't hold the lock
		// while calling the cloud provider.
		err := wkr.instance.Destroy()
		if err != nil {
			wkr.logger.WithError(err).Warn("shutdown failed")
		}
	}()
}
// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
	instance := wkr.instance
	tags := instance.Tags()
	update := cloud.InstanceTags{
		tagKeyInstanceType: wkr.instType.Name,
		tagKeyIdleBehavior: string(wkr.idleBehavior),
	}
	save := false
	for k, v := range update {
		if tags[k] != v {
			tags[k] = v
			save = true
		}
	}
	if save {
		go func() {
			err := instance.SetTags(tags)
			if err != nil {
				wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
			}
		}()
	}
}
func (wkr *worker) Close() {
	// This might take time, so do it after unlocking mtx.
	defer wkr.executor.Close()

	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	for uuid, rr := range wkr.running {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
		rr.Close()
	}
	for uuid, rr := range wkr.starting {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
		rr.Close()
	}
}
// Add/remove entries in wkr.running to match ctrUUIDs returned by a
// probe. Returns true if anything was added or removed.
//
// Caller must have lock.
func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
	alive := map[string]bool{}
	for _, uuid := range ctrUUIDs {
		alive[uuid] = true
		if _, ok := wkr.running[uuid]; ok {
			// unchanged
		} else if rr, ok := wkr.starting[uuid]; ok {
			wkr.running[uuid] = rr
			delete(wkr.starting, uuid)
			changed = true
		} else {
			// We didn't start it -- it must have been
			// started by a previous dispatcher process.
			wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
			wkr.running[uuid] = newRemoteRunner(uuid, wkr)
			changed = true
		}
	}
	for uuid := range wkr.running {
		if !alive[uuid] {
			wkr.closeRunner(uuid)
			changed = true
		}
	}
	return
}
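// For example (hypothetical UUIDs): if a probe reports {A, B} while
// wkr.running = {B, C} and wkr.starting = {A}, then A's runner moves
// from starting to running, C's runner is closed, B is unchanged, and
// updateRunning returns true.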
// caller must have lock.
func (wkr *worker) closeRunner(uuid string) {
	rr := wkr.running[uuid]
	if rr == nil {
		return
	}
	wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
	delete(wkr.running, uuid)
	rr.Close()

	now := time.Now()
	wkr.updated = now
	wkr.wp.exited[uuid] = now
	if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
		wkr.state = StateIdle
	}
}