1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
15 "git.curoverse.com/arvados.git/lib/cloud"
16 "git.curoverse.com/arvados.git/sdk/go/arvados"
17 "github.com/sirupsen/logrus"
// maxPingFailTime bounds how long a worker may go without a successful
// probe/ping; presumably consulted when deciding whether an instance is
// broken — TODO confirm against the elided surrounding code.
22 maxPingFailTime = 10 * time.Minute
25 // State indicates whether a worker is available to do work, and (if
26 // not) whether/when it is expected to become ready.
// The states are declared with iota, so StateUnknown is the zero value:
// a freshly tracked worker starts out "unknown" until probed.
30 StateUnknown State = iota // might be running a container already
31 StateBooting // instance is booting
32 StateIdle // instance booted, no containers are running
33 StateRunning // instance is running one or more containers
34 StateShutdown // worker has stopped monitoring the instance
// stateString maps each State to its human-readable name; used by
// String() and MarshalText() below.
// NOTE(review): no StateIdle entry is visible here — confirm the "idle"
// mapping exists, otherwise idle workers would render as "".
37 var stateString = map[State]string{
38 StateUnknown: "unknown",
39 StateBooting: "booting",
41 StateRunning: "running",
42 StateShutdown: "shutdown",
45 // String implements fmt.Stringer.
46 func (s State) String() string {
50 // MarshalText implements encoding.TextMarshaler so a JSON encoding of
51 // map[State]anything uses the state's string representation.
52 func (s State) MarshalText() ([]byte, error) {
// Never returns an error; an unrecognized State yields the map's zero
// value, i.e. an empty string.
53 return []byte(stateString[s]), nil
56 // IdleBehavior indicates the behavior desired when a node becomes idle.
57 type IdleBehavior string
60 IdleBehaviorRun IdleBehavior = "run" // run containers, or shutdown on idle timeout
61 IdleBehaviorHold IdleBehavior = "hold" // don't shutdown or run more containers
62 IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
// validIdleBehavior is the set of accepted IdleBehavior values;
// presumably used to validate externally supplied values (instance tags
// or API requests) — confirm at callers.
65 var validIdleBehavior = map[IdleBehavior]bool{
66 IdleBehaviorRun: true,
67 IdleBehaviorHold: true,
68 IdleBehaviorDrain: true,
// Fields below belong to the worker struct (declaration elided from this
// view). Access is presumably guarded by mtx — confirm.
72 logger logrus.FieldLogger
76 mtx sync.Locker // must be wp's Locker.
// idleBehavior is the current idle policy (run/hold/drain) for this worker.
78 idleBehavior IdleBehavior
// instance is the cloud VM this worker monitors; instType is the Arvados
// instance type it was provisioned as.
79 instance cloud.Instance
80 instType arvados.InstanceType
// running holds UUIDs of containers believed to be running on the
// instance; starting holds UUIDs whose crunch-run process is being
// launched by startContainer but not yet confirmed by a probe.
89 running map[string]struct{} // remember to update state idle<->running when this changes
90 starting map[string]struct{} // remember to update state idle<->running when this changes
94 // caller must have lock.
// startContainer records ctr as "starting", marks the worker Running,
// and launches a detached crunch-run process for the container on the
// remote instance via the worker's executor.
95 func (wkr *worker) startContainer(ctr arvados.Container) {
96 logger := wkr.logger.WithFields(logrus.Fields{
97 "ContainerUUID": ctr.UUID,
98 "Priority": ctr.Priority,
100 logger = logger.WithField("Instance", wkr.instance.ID())
101 logger.Debug("starting container")
// Record the container as starting and flip state before the launch so
// a concurrent probe does not see this worker as idle.
102 wkr.starting[ctr.UUID] = struct{}{}
103 wkr.state = StateRunning
// API credentials are passed to crunch-run via stdin (--stdin-env)
// rather than the command line, keeping the token out of process lists.
105 env := map[string]string{
106 "ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
107 "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
109 if wkr.wp.arvClient.Insecure {
110 env["ARVADOS_API_HOST_INSECURE"] = "1"
112 envJSON, err := json.Marshal(env)
116 stdin := bytes.NewBuffer(envJSON)
117 cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
// Non-root remote users presumably get an elevated (sudo-prefixed)
// command in the elided branch body — confirm.
118 if u := wkr.instance.RemoteUser(); u != "root" {
121 stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
// Lock is re-acquired in an elided line above and released when this
// (presumably goroutine-scoped) function returns.
123 defer wkr.mtx.Unlock()
// The UUID moves from starting to running even if Execute reported an
// error — see the in-code comment below for the rationale.
127 delete(wkr.starting, ctr.UUID)
128 wkr.running[ctr.UUID] = struct{}{}
129 wkr.lastUUID = ctr.UUID
131 logger.WithField("stdout", string(stdout)).
132 WithField("stderr", string(stderr)).
134 Error("error starting crunch-run process")
135 // Leave uuid in wkr.running, though: it's
136 // possible the error was just a communication
137 // failure and the process was in fact
138 // started. Wait for next probe to find out.
141 logger.Info("crunch-run process started")
142 wkr.lastUUID = ctr.UUID
146 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
147 // for the worker's current state. If a previous probe is still
148 // running, it does nothing.
150 // It should be called in a new goroutine.
151 func (wkr *worker) ProbeAndUpdate() {
// Non-blocking send on wkr.probing serializes probes: if the channel
// already holds a token, a probe is in flight and this call just logs
// and returns. The elided success branch presumably runs probeAndUpdate
// and drains the channel afterwards — confirm.
153 case wkr.probing <- struct{}{}:
157 wkr.logger.Debug("still waiting for last probe to finish")
161 // probeAndUpdate calls probeBooted and/or probeRunning if needed, and
162 // updates state accordingly.
164 // In StateUnknown: Call both probeBooted and probeRunning.
165 // In StateBooting: Call probeBooted; if successful, call probeRunning.
166 // In StateRunning: Call probeRunning.
167 // In StateIdle: Call probeRunning.
168 // In StateShutdown: Do nothing.
170 // If both probes succeed, wkr.state changes to
171 // StateIdle/StateRunning.
173 // If probeRunning succeeds, wkr.running is updated. (This means
174 // wkr.running might be non-empty even in StateUnknown, if the boot
177 // probeAndUpdate should be called in a new goroutine.
178 func (wkr *worker) probeAndUpdate() {
// Snapshot updated/state (under a lock acquired in an elided line) so
// concurrent changes made while the probes run can be detected below.
180 updated := wkr.updated
181 initialState := wkr.state
188 stderr   []byte // from probeBooted
// Choose which probes to run based on the state at probe start.
191 switch initialState {
194 case StateIdle, StateRunning:
196 case StateUnknown, StateBooting:
198 panic(fmt.Sprintf("unknown state %s", initialState))
201 probeStart := time.Now()
202 logger := wkr.logger.WithField("ProbeStart", probeStart)
205 booted, stderr = wkr.probeBooted()
207 // Pretend this probe succeeded if another
208 // concurrent attempt succeeded.
210 booted = wkr.state == StateRunning || wkr.state == StateIdle
214 logger.Info("instance booted; will try probeRunning")
// Even when boot status is unknown, probe for running containers: a
// previous dispatcher may have left crunch-run processes behind.
217 if booted || wkr.state == StateUnknown {
218 ctrUUIDs, ok = wkr.probeRunning()
221 defer wkr.mtx.Unlock()
// Probe failed (or instance not booted and nothing running): decide
// whether the worker is broken enough to shut down.
222 if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
223 if wkr.state == StateShutdown && wkr.updated.After(updated) {
224 // Skip the logging noise if shutdown was
225 // initiated during probe.
228 // Using the start time of the probe as the timeout
229 // threshold ensures we always initiate at least one
230 // probe attempt after the boot/probe timeout expires
231 // (otherwise, a slow probe failure could cause us to
232 // shutdown an instance even though it did in fact
233 // boot/recover before the timeout expired).
234 dur := probeStart.Sub(wkr.probed)
235 if wkr.shutdownIfBroken(dur) {
236 // stderr from failed run-probes will have
237 // been logged already, but boot-probe
238 // failures are normal so they are logged only
239 // at Debug level. This is our chance to log
240 // some evidence about why the node never
241 // booted, even in non-debug mode.
243 logger.WithFields(logrus.Fields{
245 "stderr": string(stderr),
246 }).Info("boot failed")
252 updateTime := time.Now()
253 wkr.probed = updateTime
255 if updated != wkr.updated {
256 // Worker was updated after the probe began, so
257 // wkr.running might have a container UUID that was
258 // not yet running when ctrUUIDs was generated. Leave
259 // wkr.running alone and wait for the next probe to
260 // catch up on any changes.
// Track last-busy time: refreshed while containers run, and refreshed
// once more when the container set transitions to empty.
264 if len(ctrUUIDs) > 0 {
265 wkr.busy = updateTime
266 wkr.lastUUID = ctrUUIDs[0]
267 } else if len(wkr.running) > 0 {
268 // Actual last-busy time was sometime between wkr.busy
269 // and now. Now is the earliest opportunity to take
270 // advantage of the non-busy state, though.
271 wkr.busy = updateTime
275 // Build a new "running" map. Set changed=true if it differs
276 // from the existing map (wkr.running) to ensure the scheduler
277 // gets notified below.
278 running := map[string]struct{}{}
279 for _, uuid := range ctrUUIDs {
280 running[uuid] = struct{}{}
281 if _, ok := wkr.running[uuid]; !ok {
282 if _, ok := wkr.starting[uuid]; !ok {
283 // We didn't start it -- it must have
284 // been started by a previous
285 // dispatcher process.
286 logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
// Containers present in wkr.running but absent from the fresh probe
// have exited; notify the pool so the scheduler can react.
291 for uuid := range wkr.running {
292 if _, ok := running[uuid]; !ok {
293 logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
294 wkr.wp.notifyExited(uuid, updateTime)
299 // Update state if this was the first successful boot-probe.
300 if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
301 // Note: this will change again below if
302 // len(wkr.starting)+len(wkr.running) > 0.
303 wkr.state = StateIdle
307 // If wkr.state and wkr.running aren't changing then there's
308 // no need to log anything, notify the scheduler, move state
309 // back and forth between idle/running, etc.
314 // Log whenever a run-probe reveals crunch-run processes
315 // appearing/disappearing before boot-probe succeeds.
316 if wkr.state == StateUnknown && len(running) != len(wkr.running) {
317 logger.WithFields(logrus.Fields{
318 "RunningContainers": len(running),
320 }).Info("crunch-run probe succeeded, but boot probe is still failing")
// Commit the fresh container set and reconcile idle/running state with
// the combined starting+running counts.
323 wkr.running = running
324 if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
325 wkr.state = StateRunning
326 } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
327 wkr.state = StateIdle
329 wkr.updated = updateTime
330 if booted && (initialState == StateUnknown || initialState == StateBooting) {
331 logger.WithFields(logrus.Fields{
332 "RunningContainers": len(running),
334 }).Info("probes succeeded, instance is in service")
// probeRunning lists crunch-run processes on the instance via
// "crunch-run --list". It returns the reported container UUIDs and
// ok=true, or ok=false if the remote command failed.
339 func (wkr *worker) probeRunning() (running []string, ok bool) {
340 cmd := "crunch-run --list"
// Non-root remote users presumably get an elevated (sudo-prefixed)
// command in the elided branch body — confirm.
341 if u := wkr.instance.RemoteUser(); u != "root" {
344 stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
346 wkr.logger.WithFields(logrus.Fields{
348 "stdout": string(stdout),
349 "stderr": string(stderr),
350 }).WithError(err).Warn("probe failed")
// Trim the trailing newline before splitting so an empty listing does
// not yield a single empty-string "UUID".
353 stdout = bytes.TrimRight(stdout, "\n")
354 if len(stdout) == 0 {
357 return strings.Split(string(stdout), "\n"), true
// probeBooted runs the pool's configured boot-probe command on the
// instance. It reports ok on success and returns the probe's stderr so
// the caller can log evidence if the instance later times out.
360 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
361 cmd := wkr.wp.bootProbeCommand
365 stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
366 logger := wkr.logger.WithFields(logrus.Fields{
368 "stdout": string(stdout),
369 "stderr": string(stderr),
// Boot-probe failures are routine while an instance is still coming up,
// so they are logged at Debug level only.
372 logger.WithError(err).Debug("boot probe failed")
375 logger.Info("boot probe succeeded")
379 // caller must have lock.
// shutdownIfBroken shuts the worker down if it has been unresponsive
// for dur, compared against the applicable timeout (the boot timeout
// for new instances, the probe timeout otherwise). Presumably returns
// true when a shutdown was initiated — confirm against the elided tail.
380 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
// Held workers are never shut down automatically.
381 if wkr.idleBehavior == IdleBehaviorHold {
// Instances that have not finished booting get the more generous
// boot timeout and a "new " label in the warning below.
385 label, threshold := "", wkr.wp.timeoutProbe
386 if wkr.state == StateUnknown || wkr.state == StateBooting {
387 label, threshold = "new ", wkr.wp.timeoutBooting
392 wkr.logger.WithFields(logrus.Fields{
396 }).Warnf("%sinstance unresponsive, shutting down", label)
401 // caller must have lock.
// shutdownIfIdle shuts the worker down if it has been idle past the
// idle timeout, or if it is draining. Presumably returns true when a
// shutdown was initiated — confirm against the elided tail.
402 func (wkr *worker) shutdownIfIdle() bool {
// Held workers are never shut down automatically.
403 if wkr.idleBehavior == IdleBehaviorHold {
407 age := time.Since(wkr.busy)
409 old := age >= wkr.wp.timeoutIdle
410 draining := wkr.idleBehavior == IdleBehaviorDrain
// Idle workers shut down when too old or draining; booting workers shut
// down only when draining.
411 shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
412 (draining && wkr.state == StateBooting)
417 wkr.logger.WithFields(logrus.Fields{
420 "IdleBehavior": wkr.idleBehavior,
421 }).Info("shutdown idle worker")
426 // caller must have lock.
// shutdown marks the worker as StateShutdown and destroys the cloud
// instance. A Destroy error is only logged here — presumably retried
// or reconciled elsewhere; confirm.
427 func (wkr *worker) shutdown() {
431 wkr.state = StateShutdown
434 err := wkr.instance.Destroy()
436 wkr.logger.WithError(err).Warn("shutdown failed")
442 // Save worker tags to cloud provider metadata, if they don't already
443 // match. Caller must have lock.
444 func (wkr *worker) saveTags() {
445 instance := wkr.instance
446 have := instance.Tags()
// want is the tag set this dispatcher requires on the instance: its
// instance type name and current idle behavior (so both survive a
// dispatcher restart).
447 want := cloud.InstanceTags{
448 tagKeyInstanceType: wkr.instType.Name,
449 tagKeyIdleBehavior: string(wkr.idleBehavior),
// Compare want against have; SetTags is presumably called only when a
// tag differs (the comparison body is elided from this view — confirm).
452 for k, v := range want {
456 err := instance.SetTags(want)
458 wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")