1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
13 "git.curoverse.com/arvados.git/lib/cloud"
14 "git.curoverse.com/arvados.git/sdk/go/arvados"
15 "github.com/sirupsen/logrus"
20 maxPingFailTime = 10 * time.Minute
23 // State indicates whether a worker is available to do work, and (if
24 // not) whether/when it is expected to become ready.
// States generally progress Unknown/Booting -> Idle <-> Running ->
// Shutdown; probeAndUpdate drives most of these transitions.
28 StateUnknown State = iota // might be running a container already
29 StateBooting // instance is booting
30 StateIdle // instance booted, no containers are running
31 StateRunning // instance is running one or more containers
32 StateShutdown // worker has stopped monitoring the instance
// stateString maps each State to the display name used by String()
// and MarshalText(). A state with no entry renders as "" (missing
// map entries yield the zero-value string).
35 var stateString = map[State]string{
36 StateUnknown: "unknown",
37 StateBooting: "booting",
39 StateRunning: "running",
40 StateShutdown: "shutdown",
43 // String implements fmt.Stringer.
44 func (s State) String() string {
48 // MarshalText implements encoding.TextMarshaler so a JSON encoding of
49 // map[State]anything uses the state's string representation.
// MarshalText never fails: an unrecognized state simply encodes as
// an empty string, because a missing stateString entry yields "".
50 func (s State) MarshalText() ([]byte, error) {
51 return []byte(stateString[s]), nil
54 // IdleBehavior indicates the behavior desired when a node becomes idle.
55 type IdleBehavior string
58 IdleBehaviorRun IdleBehavior = "run" // run containers, or shutdown on idle timeout
59 IdleBehaviorHold = "hold" // don't shutdown or run more containers
60 IdleBehaviorDrain = "drain" // shutdown immediately when idle
// NOTE(review): only IdleBehaviorRun carries an explicit IdleBehavior
// type; Hold and Drain are untyped string constants. Consider giving
// all three the explicit type for uniform type safety.
// validIdleBehavior is the set of accepted IdleBehavior values —
// presumably used to validate externally-supplied settings; confirm
// at call sites (e.g. values read back from instance tags).
63 var validIdleBehavior = map[IdleBehavior]bool{
64 IdleBehaviorRun: true,
65 IdleBehaviorHold: true,
66 IdleBehaviorDrain: true,
// worker struct fields (partial view). Methods documented as "caller
// must have lock" expect mtx — the worker pool's Locker — to be held.
70 logger logrus.FieldLogger
74 mtx sync.Locker // must be wp's Locker.
76 idleBehavior IdleBehavior
77 instance cloud.Instance // the cloud VM this worker manages
78 instType arvados.InstanceType
87 running map[string]struct{} // remember to update state idle<->running when this changes
88 starting map[string]struct{} // remember to update state idle<->running when this changes
92 // caller must have lock.
// startContainer marks ctr as starting on this worker and launches a
// detached crunch-run process for it via the worker's executor.
93 func (wkr *worker) startContainer(ctr arvados.Container) {
94 logger := wkr.logger.WithFields(logrus.Fields{
95 "ContainerUUID": ctr.UUID,
96 "Priority": ctr.Priority,
98 logger = logger.WithField("Instance", wkr.instance)
99 logger.Debug("starting container")
// Record intent before the (slow) remote start, so concurrent state
// checks already count this container.
100 wkr.starting[ctr.UUID] = struct{}{}
101 wkr.state = StateRunning
// Credentials crunch-run needs to reach the Arvados API.
103 env := map[string]string{
104 "ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
105 "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
// NOTE(review): ctr.UUID is interpolated into a shell command line;
// this assumes UUIDs never contain quotes or other shell
// metacharacters — confirm against the UUID format.
107 stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
// NOTE(review): the deferred unlock implies mtx is re-acquired on an
// elided line after Execute returns — confirm against the full source.
109 defer wkr.mtx.Unlock()
// Move the container from starting to running regardless of err (see
// the error-path comment below for why).
113 delete(wkr.starting, ctr.UUID)
114 wkr.running[ctr.UUID] = struct{}{}
115 wkr.lastUUID = ctr.UUID
117 logger.WithField("stdout", string(stdout)).
118 WithField("stderr", string(stderr)).
120 Error("error starting crunch-run process")
121 // Leave uuid in wkr.running, though: it's
122 // possible the error was just a communication
123 // failure and the process was in fact
124 // started. Wait for next probe to find out.
127 logger.Info("crunch-run process started")
128 wkr.lastUUID = ctr.UUID
132 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
133 // for the worker's current state. If a previous probe is still
134 // running, it does nothing.
136 // It should be called in a new goroutine.
137 func (wkr *worker) ProbeAndUpdate() {
// A non-blocking send on the probing channel acts as a try-lock:
// success means no probe is currently in flight, so run one now.
139 case wkr.probing <- struct{}{}:
143 wkr.logger.Debug("still waiting for last probe to finish")
147 // should be called in a new goroutine
// probeAndUpdate runs the boot and/or running-containers probes
// appropriate for the current state, then (with the lock re-held)
// reconciles wkr.state, wkr.running and the related timestamps with
// the probe results.
148 func (wkr *worker) probeAndUpdate() {
// Snapshot wkr.updated so we can tell later whether anything changed
// while the (slow, unlocked) probes were in flight.
150 updated := wkr.updated
151 needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
152 needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
154 if !needProbeBooted && !needProbeRunning {
164 ok, stderr = wkr.probeBooted()
// Even if the boot probe failed, a worker previously seen Idle or
// Running is known to have booted, so still probe for containers.
166 if ok || wkr.state == StateRunning || wkr.state == StateIdle {
167 wkr.logger.Info("instance booted; will try probeRunning")
168 needProbeRunning = true
172 if needProbeRunning {
173 ctrUUIDs, ok, stderr = wkr.probeRunning()
175 logger := wkr.logger.WithField("stderr", string(stderr))
177 defer wkr.mtx.Unlock()
// Probe-failure handling starts here.
179 if wkr.state == StateShutdown && wkr.updated.After(updated) {
180 // Skip the logging noise if shutdown was
181 // initiated during probe.
// dur = how long the instance has been failing probes.
184 dur := time.Since(wkr.probed)
185 logger := logger.WithFields(logrus.Fields{
189 if wkr.state == StateBooting && !needProbeRunning {
190 // If we know the instance has never passed a
191 // boot probe, it's not noteworthy that it
192 // hasn't passed this probe.
193 logger.Debug("new instance not responding")
195 logger.Info("instance not responding")
// Give up on the instance if it has been unresponsive too long.
197 wkr.shutdownIfBroken(dur)
// Success path: record the probe time and apply the results.
201 updateTime := time.Now()
202 wkr.probed = updateTime
204 if updated != wkr.updated {
205 // Worker was updated after the probe began, so
206 // wkr.running might have a container UUID that was
207 // not yet running when ctrUUIDs was generated. Leave
208 // wkr.running alone and wait for the next probe to
209 // catch up on any changes.
213 if len(ctrUUIDs) > 0 {
214 wkr.busy = updateTime
215 wkr.lastUUID = ctrUUIDs[0]
216 } else if len(wkr.running) > 0 {
217 // Actual last-busy time was sometime between wkr.busy
218 // and now. Now is the earliest opportunity to take
219 // advantage of the non-busy state, though.
220 wkr.busy = updateTime
// Rebuild the running set from the probe output, noting containers
// that appeared or disappeared since the last probe.
222 running := map[string]struct{}{}
224 for _, uuid := range ctrUUIDs {
225 running[uuid] = struct{}{}
226 if _, ok := wkr.running[uuid]; !ok {
230 for uuid := range wkr.running {
231 if _, ok := running[uuid]; !ok {
232 logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
// Notify the pool that this container's crunch-run has exited.
233 wkr.wp.notifyExited(uuid, updateTime)
// A successful probe on a new instance promotes it to Idle.
237 if wkr.state == StateUnknown || wkr.state == StateBooting {
238 wkr.state = StateIdle
// Keep state consistent with the starting+running container count.
242 wkr.running = running
243 if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
244 wkr.state = StateRunning
245 } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
246 wkr.state = StateIdle
248 wkr.updated = updateTime
// probeRunning invokes "crunch-run --list" on the instance and
// returns the container UUIDs it reports, one per stdout line. ok is
// false when the command could not be executed.
253 func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
254 cmd := "crunch-run --list"
255 stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
257 wkr.logger.WithFields(logrus.Fields{
259 "stdout": string(stdout),
260 "stderr": string(stderr),
261 }).WithError(err).Warn("probe failed")
262 return nil, false, stderr
// Trim trailing newlines so an empty listing yields no UUIDs rather
// than a slice containing one empty string.
264 stdout = bytes.TrimRight(stdout, "\n")
265 if len(stdout) == 0 {
266 return nil, true, stderr
268 return strings.Split(string(stdout), "\n"), true, stderr
// probeBooted runs the pool's configured boot probe command on the
// instance; ok reports whether the command succeeded.
271 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
272 cmd := wkr.wp.bootProbeCommand
276 stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
277 logger := wkr.logger.WithFields(logrus.Fields{
279 "stdout": string(stdout),
280 "stderr": string(stderr),
// Failure is only Debug-level: instances are expected to fail this
// probe until they finish booting.
283 logger.WithError(err).Debug("boot probe failed")
286 logger.Info("boot probe succeeded")
290 // caller must have lock.
// shutdownIfBroken shuts the instance down when it has been
// unresponsive for dur; the comparison against threshold happens on
// an elided line. Workers on IdleBehaviorHold are never shut down.
291 func (wkr *worker) shutdownIfBroken(dur time.Duration) {
292 if wkr.idleBehavior == IdleBehaviorHold {
// Instances that have not yet booted get the (presumably more
// generous) boot timeout instead of the probe timeout.
295 label, threshold := "", wkr.wp.timeoutProbe
296 if wkr.state == StateUnknown || wkr.state == StateBooting {
297 label, threshold = "new ", wkr.wp.timeoutBooting
302 wkr.logger.WithFields(logrus.Fields{
306 }).Warnf("%sinstance unresponsive, shutting down", label)
310 // caller must have lock.
// shutdownIfIdle shuts the worker down if it is eligible: Idle past
// the idle timeout, or idle/booting under IdleBehaviorDrain. Workers
// on IdleBehaviorHold are never shut down. Its bool result presumably
// reports whether shutdown was initiated — returns are elided here.
311 func (wkr *worker) shutdownIfIdle() bool {
312 if wkr.idleBehavior == IdleBehaviorHold {
315 if !(wkr.state == StateIdle || (wkr.state == StateBooting && wkr.idleBehavior == IdleBehaviorDrain)) {
318 age := time.Since(wkr.busy)
// Draining workers skip the idle-timeout grace period.
319 if wkr.idleBehavior != IdleBehaviorDrain && age < wkr.wp.timeoutIdle {
322 wkr.logger.WithFields(logrus.Fields{
324 "IdleBehavior": wkr.idleBehavior,
325 }).Info("shutdown idle worker")
330 // caller must have lock.
// shutdown marks the worker StateShutdown and destroys the cloud
// instance, logging a warning if Destroy fails.
331 func (wkr *worker) shutdown() {
335 wkr.state = StateShutdown
338 err := wkr.instance.Destroy()
340 wkr.logger.WithError(err).Warn("shutdown failed")
346 // Save worker tags to cloud provider metadata, if they don't already
347 // match. Caller must have lock.
348 func (wkr *worker) saveTags() {
349 instance := wkr.instance
350 have := instance.Tags()
351 want := cloud.InstanceTags{
352 tagKeyInstanceType: wkr.instType.Name,
353 tagKeyIdleBehavior: string(wkr.idleBehavior),
356 for k, v := range want {
360 err := instance.SetTags(want)
362 wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags")