// Copyright (C) The Arvados Authors. All rights reserved.
// SPDX-License-Identifier: AGPL-3.0

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/stats"
	"github.com/sirupsen/logrus"

	maxPingFailTime = 10 * time.Minute

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.

	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
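
// A typical lifecycle, as an illustrative summary of the transitions
// implemented below: a new instance starts in StateUnknown or
// StateBooting, moves to StateIdle once its probes succeed, flips
// between StateIdle and StateRunning as containers start and finish,
// and ends in StateShutdown.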

var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateRunning:  "running",
	StateShutdown: "shutdown",

// String implements fmt.Stringer.
func (s State) String() string {

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
	return []byte(stateString[s]), nil
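
// Illustrative example (not part of the original source): because of
// MarshalText above, encoding/json renders a map keyed by State using
// the string names, e.g. map[State]int{StateIdle: 3, StateRunning: 2}
// marshals to {"idle":3,"running":2} rather than using numeric keys.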

// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shut down on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shut down or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shut down immediately when idle

var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,
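
// Illustrative sketch (an assumption, not from the original source): a
// caller accepting an idle behavior string from user input could guard
// with this map, e.g.
//
//	if !validIdleBehavior[IdleBehavior(s)] {
//		return fmt.Errorf("invalid idle behavior %q", s)
//	}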

	logger       logrus.FieldLogger
	mtx          sync.Locker // must be wp's Locker.
	idleBehavior IdleBehavior
	instance     cloud.Instance
	instType     arvados.InstanceType
	running      map[string]*remoteRunner // remember to update state idle<->running when this changes
	starting     map[string]*remoteRunner // remember to update state idle<->running when this changes

func (wkr *worker) onUnkillable(uuid string) {
	defer wkr.mtx.Unlock()
	logger := wkr.logger.WithField("ContainerUUID", uuid)
	if wkr.idleBehavior == IdleBehaviorHold {
		logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
	logger.Warn("unkillable container, draining worker")
	wkr.setIdleBehavior(IdleBehaviorDrain)

func (wkr *worker) onKilled(uuid string) {
	defer wkr.mtx.Unlock()
	wkr.closeRunner(uuid)

// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
	wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
	wkr.idleBehavior = idleBehavior

// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
	logger := wkr.logger.WithFields(logrus.Fields{
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	logger.Debug("starting container")
	rr := newRemoteRunner(ctr.UUID, wkr)
	wkr.starting[ctr.UUID] = rr
	if wkr.state != StateRunning {
		wkr.state = StateRunning
		defer wkr.mtx.Unlock()
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = rr
		wkr.lastUUID = ctr.UUID

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
	case wkr.probing <- struct{}{}:
		wkr.logger.Debug("still waiting for last probe to finish")
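	// The send on wkr.probing above is a non-blocking guard that
	// allows at most one probe to be in flight at a time. A minimal
	// standalone sketch of the same idiom (illustrative only; assumes
	// probing is a chan struct{} with capacity 1 created elsewhere):
	//
	//	select {
	//	case probing <- struct{}{}:
	//		doProbe()
	//		<-probing
	//	default:
	//		// a probe is already running; skip this round
	//	}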

// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe is still failing.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
	updated := wkr.updated
	initialState := wkr.state

		stderr []byte // from probeBooted

	switch initialState {
	case StateIdle, StateRunning:
	case StateUnknown, StateBooting:
		panic(fmt.Sprintf("unknown state %s", initialState))

	probeStart := time.Now()
	logger := wkr.logger.WithField("ProbeStart", probeStart)

		booted, stderr = wkr.probeBooted()
			// Pretend this probe succeeded if another
			// concurrent attempt succeeded.
			booted = wkr.state == StateRunning || wkr.state == StateIdle
			logger.Info("instance booted; will try probeRunning")
	reportedBroken := false
	if booted || wkr.state == StateUnknown {
		ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
	defer wkr.mtx.Unlock()
	if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
		logger.Info("probe reported broken instance")
		wkr.setIdleBehavior(IdleBehaviorDrain)
	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
		// Using the start time of the probe as the timeout
		// threshold ensures we always initiate at least one
		// probe attempt after the boot/probe timeout expires
		// (otherwise, a slow probe failure could cause us to
		// shut down an instance even though it did in fact
		// boot/recover before the timeout expired).
		dur := probeStart.Sub(wkr.probed)
		if wkr.shutdownIfBroken(dur) {
			// stderr from failed run-probes will have
			// been logged already, but boot-probe
			// failures are normal so they are logged only
			// at Debug level. This is our chance to log
			// some evidence about why the node never
			// booted, even in non-debug mode.
			logger.WithFields(logrus.Fields{
				"stderr": string(stderr),
			}).Info("boot failed")

	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime

	changed := wkr.updateRunning(ctrUUIDs)

	// Update state if this was the first successful boot-probe.
	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
		// Note: this will change again below if
		// len(wkr.starting)+len(wkr.running) > 0.
		wkr.state = StateIdle

	// If wkr.state and wkr.running aren't changing then there's
	// no need to log anything, notify the scheduler, move state
	// back and forth between idle/running, etc.

	// Log whenever a run-probe reveals crunch-run processes
	// appearing/disappearing before boot-probe succeeds.
	if wkr.state == StateUnknown && changed {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
		}).Info("crunch-run probe succeeded, but boot probe is still failing")

	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
		wkr.state = StateRunning
	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
		wkr.state = StateIdle
	wkr.updated = updateTime
	if booted && (initialState == StateUnknown || initialState == StateBooting) {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
		}).Info("probes succeeded, instance is in service")

func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
	cmd := wkr.wp.runnerCmd + " --list"
	if u := wkr.instance.RemoteUser(); u != "root" {
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
		wkr.logger.WithFields(logrus.Fields{
			"stdout": string(stdout),
			"stderr": string(stderr),
		}).WithError(err).Warn("probe failed")
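
	// Each non-empty line of the probe's stdout is assumed to name one
	// running container (one UUID per line); the loop below collects
	// those names. (This is an editorial assumption based on the
	// surrounding code, not a statement of the probe command's
	// documented output format.)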
	for _, s := range strings.Split(string(stdout), "\n") {
			running = append(running, s)

func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
	cmd := wkr.wp.bootProbeCommand
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	logger := wkr.logger.WithFields(logrus.Fields{
		"stdout": string(stdout),
		"stderr": string(stderr),
		logger.WithError(err).Debug("boot probe failed")
	logger.Info("boot probe succeeded")
	if err = wkr.wp.loadRunnerData(); err != nil {
		wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
	} else if len(wkr.wp.runnerData) == 0 {
		// Assume crunch-run is already installed
	} else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
		wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
		return false, stderr2
		wkr.logger.Info("runner binary OK")
		stderr = append(stderr, stderr2...)

func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
	hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
	dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
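
	// Check whether a binary with the expected MD5 is already in place
	// before copying anything. Note that GNU coreutils md5sum prints
	// "<hash>  <path>\n" (hash, two spaces, then the file name), which
	// is the exact string compared against below.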
	stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
	if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {

	// Note touch+chmod come before writing data, to avoid the
	// possibility of md5 being correct while file mode is
	// incorrect.
	cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
	if wkr.instance.RemoteUser() != "root" {
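		// Wrap the command for non-root remote users. The
		// strings.Replace below rewrites each ' as '\'' so the script
		// survives being embedded in the single-quoted argument of
		// sudo sh -c '...'.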
		cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
	stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))

// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
	if wkr.idleBehavior == IdleBehaviorHold {
	label, threshold := "", wkr.wp.timeoutProbe
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		label, threshold = "new ", wkr.wp.timeoutBooting
	wkr.logger.WithFields(logrus.Fields{
	}).Warnf("%sinstance unresponsive, shutting down", label)

// Returns true if the instance is eligible for shutdown: either it's
// been idle too long, or idleBehavior=Drain and nothing is running.
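// (For example, with IdleBehavior=Drain an idle worker is eligible
// immediately, whereas with IdleBehavior=Run it becomes eligible only
// after the configured idle timeout has elapsed since it was last busy.)
//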
// caller must have lock.
func (wkr *worker) eligibleForShutdown() bool {
	if wkr.idleBehavior == IdleBehaviorHold {
	draining := wkr.idleBehavior == IdleBehaviorDrain
		return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
		for _, rr := range wkr.running {
		for _, rr := range wkr.starting {
		// draining, and all remaining runners are just trying
		// to force-kill their crunch-run procs

// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
	if !wkr.eligibleForShutdown() {
	wkr.logger.WithFields(logrus.Fields{
		"IdleDuration": stats.Duration(time.Since(wkr.busy)),
		"IdleBehavior": wkr.idleBehavior,
	}).Info("shutdown worker")

// caller must have lock.
func (wkr *worker) shutdown() {
	wkr.state = StateShutdown
		err := wkr.instance.Destroy()
			wkr.logger.WithError(err).Warn("shutdown failed")

// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
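//
// For example (illustrative; the concrete tag key names and the
// configured prefix are assumptions): with a tag key prefix of
// "Arvados", the instance would carry tags such as
// "ArvadosInstanceType" (the instance type name) and
// "ArvadosIdleBehavior" (the current idle behavior).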
func (wkr *worker) saveTags() {
	instance := wkr.instance
	tags := instance.Tags()
	update := cloud.InstanceTags{
		wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
		wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
	for k, v := range update {
			err := instance.SetTags(tags)
				wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")

func (wkr *worker) Close() {
	// This might take time, so do it after unlocking mtx.
	defer wkr.executor.Close()
	defer wkr.mtx.Unlock()
	for uuid, rr := range wkr.running {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
	for uuid, rr := range wkr.starting {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")

// Add/remove entries in wkr.running to match ctrUUIDs returned by a
// probe. Returns true if anything was added or removed.
//
// Caller must have lock.
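//
// For example (illustrative): if a probe reports containers {A, B}
// while wkr.running currently holds {B, C}, then A is added (reusing
// the entry from wkr.starting if it was started here, otherwise
// creating a new remoteRunner for a container started by a previous
// dispatcher), C is closed and removed, and updateRunning returns true.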
func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
	alive := map[string]bool{}
	for _, uuid := range ctrUUIDs {
		if _, ok := wkr.running[uuid]; ok {
		} else if rr, ok := wkr.starting[uuid]; ok {
			wkr.running[uuid] = rr
			delete(wkr.starting, uuid)
			// We didn't start it -- it must have been
			// started by a previous dispatcher process.
			wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
			wkr.running[uuid] = newRemoteRunner(uuid, wkr)
	for uuid := range wkr.running {
			wkr.closeRunner(uuid)

// caller must have lock.
func (wkr *worker) closeRunner(uuid string) {
	rr := wkr.running[uuid]
	wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
	delete(wkr.running, uuid)
	wkr.wp.exited[uuid] = now
	if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
		wkr.state = StateIdle