Allow multiple clusters to use loopback driver on same host.
[arvados.git] / lib/dispatchcloud/worker/worker.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "bytes"
        "fmt"
        "path/filepath"
        "strings"
        "sync"
        "time"

        "git.arvados.org/arvados.git/lib/cloud"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/stats"
        "github.com/sirupsen/logrus"
)

const (
        // TODO: configurable
        maxPingFailTime = 10 * time.Minute
)

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
        StateUnknown  State = iota // might be running a container already
        StateBooting               // instance is booting
        StateIdle                  // instance booted, no containers are running
        StateRunning               // instance is running one or more containers
        StateShutdown              // worker has stopped monitoring the instance
)

var stateString = map[State]string{
        StateUnknown:  "unknown",
        StateBooting:  "booting",
        StateIdle:     "idle",
        StateRunning:  "running",
        StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
        return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
        return []byte(stateString[s]), nil
}

// BootOutcome is the result of a worker boot. It is used as a label in a metric.
type BootOutcome string

const (
        BootOutcomeFailed      BootOutcome = "failure"
        BootOutcomeSucceeded   BootOutcome = "success"
        BootOutcomeAborted     BootOutcome = "aborted"
        BootOutcomeDisappeared BootOutcome = "disappeared"
)

var validBootOutcomes = map[BootOutcome]bool{
        BootOutcomeFailed:      true,
        BootOutcomeSucceeded:   true,
        BootOutcomeAborted:     true,
        BootOutcomeDisappeared: true,
}

// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
        IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shut down on idle timeout
        IdleBehaviorHold  IdleBehavior = "hold"  // don't shut down or run more containers
        IdleBehaviorDrain IdleBehavior = "drain" // shut down immediately when idle
)

var validIdleBehavior = map[IdleBehavior]bool{
        IdleBehaviorRun:   true,
        IdleBehaviorHold:  true,
        IdleBehaviorDrain: true,
}

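// worker tracks the dispatcher's view of a single cloud instance: its
// lifecycle state, idle behavior, and the crunch-run processes that
// are starting or running on it. Its mutable fields are protected by
// mtx, which is shared with the owning Pool.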
type worker struct {
        logger   logrus.FieldLogger
        executor Executor
        wp       *Pool

        mtx                 sync.Locker // must be wp's Locker.
        state               State
        idleBehavior        IdleBehavior
        instance            cloud.Instance
        instType            arvados.InstanceType
        vcpus               int64
        memory              int64
        appeared            time.Time
        probed              time.Time
        updated             time.Time
        busy                time.Time
        destroyed           time.Time
        firstSSHConnection  time.Time
        lastUUID            string
        running             map[string]*remoteRunner // remember to update state idle<->running when this changes
        starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
        probing             chan struct{}
        bootOutcomeReported bool
        timeToReadyReported bool
        staleRunLockSince   time.Time
}

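// onUnkillable is called by a remoteRunner when it has given up
// trying to kill its crunch-run process. Unless the worker is held,
// it is drained so no new containers get scheduled onto it.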
func (wkr *worker) onUnkillable(uuid string) {
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        logger := wkr.logger.WithField("ContainerUUID", uuid)
        if wkr.idleBehavior == IdleBehaviorHold {
                logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
                return
        }
        logger.Warn("unkillable container, draining worker")
        wkr.setIdleBehavior(IdleBehaviorDrain)
}

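// onKilled is called by a remoteRunner when its crunch-run process
// has ended and been cleaned up: the container is removed from
// wkr.running and the scheduler is notified.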
func (wkr *worker) onKilled(uuid string) {
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        wkr.closeRunner(uuid)
        go wkr.wp.notify()
}

// caller must have lock.
func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
        if wkr.bootOutcomeReported {
                return
        }
        if wkr.wp.mBootOutcomes != nil {
                wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
        }
        wkr.bootOutcomeReported = true
}

// caller must have lock.
func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
        if wkr.timeToReadyReported {
                return
        }
        if wkr.wp.mTimeToReadyForContainer != nil {
                wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
        }
        wkr.timeToReadyReported = true
}

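// setIdleBehavior records the new idle behavior, saves it to the
// instance's cloud tags, and shuts the worker down immediately if the
// new behavior makes it eligible for shutdown.
//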
// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
        wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
        wkr.idleBehavior = idleBehavior
        wkr.saveTags()
        wkr.shutdownIfIdle()
}

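// startContainer launches a crunch-run process for the given
// container on this worker (in a new goroutine) and moves the worker
// to StateRunning.
//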
// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
        logger := wkr.logger.WithFields(logrus.Fields{
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
        })
        logger.Debug("starting container")
        rr := newRemoteRunner(ctr.UUID, wkr)
        wkr.starting[ctr.UUID] = rr
        if wkr.state != StateRunning {
                wkr.state = StateRunning
                go wkr.wp.notify()
        }
        go func() {
                rr.Start()
                if wkr.wp.mTimeFromQueueToCrunchRun != nil {
                        wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds())
                }
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
                wkr.updated = now
                wkr.busy = now
                delete(wkr.starting, ctr.UUID)
                wkr.running[ctr.UUID] = rr
                wkr.lastUUID = ctr.UUID
        }()
}

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
        select {
        case wkr.probing <- struct{}{}:
                wkr.probeAndUpdate()
                <-wkr.probing
        default:
                wkr.logger.Debug("still waiting for last probe to finish")
        }
}

// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
        wkr.mtx.Lock()
        updated := wkr.updated
        initialState := wkr.state
        wkr.mtx.Unlock()

        var (
                booted   bool
                ctrUUIDs []string
                ok       bool
                stderr   []byte // from probeBooted
        )

        switch initialState {
        case StateShutdown:
                return
        case StateIdle, StateRunning:
                booted = true
        case StateUnknown, StateBooting:
        default:
                panic(fmt.Sprintf("unknown state %s", initialState))
        }

        probeStart := time.Now()
        logger := wkr.logger.WithField("ProbeStart", probeStart)

        if !booted {
                booted, stderr = wkr.probeBooted()
                if !booted {
                        // Pretend this probe succeeded if another
                        // concurrent attempt succeeded.
                        wkr.mtx.Lock()
                        booted = wkr.state == StateRunning || wkr.state == StateIdle
                        wkr.mtx.Unlock()
                }
                if booted {
                        logger.Info("instance booted; will try probeRunning")
                }
        }
        reportedBroken := false
        if booted || wkr.state == StateUnknown {
                ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
        }
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
                logger.Info("probe reported broken instance")
                wkr.reportBootOutcome(BootOutcomeFailed)
                wkr.setIdleBehavior(IdleBehaviorDrain)
        }
        if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
                if wkr.state == StateShutdown && wkr.updated.After(updated) {
                        // Skip the logging noise if shutdown was
                        // initiated during probe.
                        return
                }
                // Using the start time of the probe as the timeout
                // threshold ensures we always initiate at least one
                // probe attempt after the boot/probe timeout expires
                // (otherwise, a slow probe failure could cause us to
                // shut down an instance even though it did in fact
                // boot/recover before the timeout expired).
                dur := probeStart.Sub(wkr.probed)
                if wkr.shutdownIfBroken(dur) {
                        // stderr from failed run-probes will have
                        // been logged already, but boot-probe
                        // failures are normal so they are logged only
                        // at Debug level. This is our chance to log
                        // some evidence about why the node never
                        // booted, even in non-debug mode.
                        if !booted {
                                wkr.reportBootOutcome(BootOutcomeFailed)
                                logger.WithFields(logrus.Fields{
                                        "Duration": dur,
                                        "stderr":   string(stderr),
                                }).Info("boot failed")
                        }
                }
                return
        }

        updateTime := time.Now()
        wkr.probed = updateTime

        if updated != wkr.updated {
                // Worker was updated after the probe began, so
                // wkr.running might have a container UUID that was
                // not yet running when ctrUUIDs was generated. Leave
                // wkr.running alone and wait for the next probe to
                // catch up on any changes.
                logger.WithFields(logrus.Fields{
                        "updated":     updated,
                        "wkr.updated": wkr.updated,
                }).Debug("skipping worker state update due to probe/sync race")
                return
        }

        if len(ctrUUIDs) > 0 {
                wkr.busy = updateTime
                wkr.lastUUID = ctrUUIDs[0]
        } else if len(wkr.running) > 0 {
                // Actual last-busy time was sometime between wkr.busy
                // and now. Now is the earliest opportunity to take
                // advantage of the non-busy state, though.
                wkr.busy = updateTime
        }

        changed := wkr.updateRunning(ctrUUIDs)

        // Update state if this was the first successful boot-probe.
        if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
                if wkr.state == StateBooting {
                        wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
                }
                // Note: this will change again below if
                // len(wkr.starting)+len(wkr.running) > 0.
                wkr.state = StateIdle
                changed = true
        }

        // If wkr.state and wkr.running aren't changing then there's
        // no need to log anything, notify the scheduler, move state
        // back and forth between idle/running, etc.
        if !changed {
                return
        }

        // Log whenever a run-probe reveals crunch-run processes
        // appearing/disappearing before boot-probe succeeds.
        if wkr.state == StateUnknown {
                logger.WithFields(logrus.Fields{
                        "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("crunch-run probe succeeded, but boot probe is still failing")
        }

        if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
                wkr.state = StateRunning
        } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
                wkr.state = StateIdle
        }
        wkr.updated = updateTime
        if booted && (initialState == StateUnknown || initialState == StateBooting) {
                wkr.reportBootOutcome(BootOutcomeSucceeded)
                logger.WithFields(logrus.Fields{
                        "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("probes succeeded, instance is in service")
        }
        go wkr.wp.notify()
}

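// probeRunning runs "crunch-run --list" on the instance (via the
// worker's executor) and reports which containers are running there,
// whether the instance reports itself as broken, and whether the
// probe itself succeeded.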
func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
        cmd := wkr.wp.runnerCmd + " --list"
        if u := wkr.instance.RemoteUser(); u != "root" {
                cmd = "sudo " + cmd
        }
        before := time.Now()
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
                        "Command": cmd,
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
                wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Since(before).Seconds())
                return
        }
        wkr.logger.WithFields(logrus.Fields{
                "Command": cmd,
                "stdout":  string(stdout),
                "stderr":  string(stderr),
        }).Debug("probe succeeded")
        wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Since(before).Seconds())
        ok = true

        staleRunLock := false
        for _, s := range strings.Split(string(stdout), "\n") {
                // Each line of the "crunch-run --list" output is one
                // of the following:
                //
                // * a container UUID, indicating that processes
                //   related to that container are currently running.
                //   Optionally followed by " stale", indicating that
                //   the crunch-run process itself has exited (the
                //   remaining process is probably arv-mount).
                //
                // * the string "broken", indicating that the instance
                //   appears incapable of starting containers.
                //
                // See ListProcesses() in lib/crunchrun/background.go.
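                //
                // For example (hypothetical UUIDs, assuming cluster
                // ID "zzzzz"):
                //
                //   zzzzz-dz642-0123456789abcde
                //   zzzzz-dz642-fedcba987654321 stale
                //   broken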
                if s == "" {
                        // empty string following final newline
                } else if s == "broken" {
                        reportsBroken = true
                } else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
                        // Ignore crunch-run processes that belong to
                        // a different cluster (e.g., a single host
                        // running multiple clusters with the loopback
                        // driver)
                        continue
                } else if toks := strings.Split(s, " "); len(toks) == 1 {
                        running = append(running, s)
                } else if toks[1] == "stale" {
                        wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
                        staleRunLock = true
                }
        }
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        if !staleRunLock {
                wkr.staleRunLockSince = time.Time{}
        } else if wkr.staleRunLockSince.IsZero() {
                wkr.staleRunLockSince = time.Now()
        } else if dur := time.Since(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
                wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
                reportsBroken = true
        }
        return
}

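// probeBooted runs the configured boot probe command on the instance
// and, if it succeeds, ensures the crunch-run binary is installed
// there. It returns false (along with the probe's stderr) until the
// instance is ready to run containers.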
func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
        cmd := wkr.wp.bootProbeCommand
        if cmd == "" {
                cmd = "true"
        }
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        logger := wkr.logger.WithFields(logrus.Fields{
                "Command": cmd,
                "stdout":  string(stdout),
                "stderr":  string(stderr),
        })
        if err != nil {
                logger.WithError(err).Debug("boot probe failed")
                return false, stderr
        }
        logger.Info("boot probe succeeded")
        if err = wkr.wp.loadRunnerData(); err != nil {
                wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
                return false, stderr
        } else if len(wkr.wp.runnerData) == 0 {
                // Assume crunch-run is already installed
        } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
                wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
                return false, stderr2
        } else {
                stderr = append(stderr, stderr2...)
        }
        return true, stderr
}

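// copyRunnerData installs the crunch-run binary on the instance at
// wkr.wp.runnerCmd, unless a file with the expected MD5 hash is
// already present there.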
func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
        hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
        dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
        logger := wkr.logger.WithFields(logrus.Fields{
                "hash": hash,
                "path": wkr.wp.runnerCmd,
        })

        stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
        if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
                logger.Info("runner binary already exists on worker, with correct hash")
                return
        }

        // Note touch+chmod come before writing data, to avoid the
        // possibility of md5 being correct while file mode is
        // incorrect.
        cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
        if wkr.instance.RemoteUser() != "root" {
                cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
        }
        logger.WithField("cmd", cmd).Info("installing runner binary on worker")
        stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
        return
}

// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
        if wkr.idleBehavior == IdleBehaviorHold {
                // Never shut down.
                return false
        }
        label, threshold := "", wkr.wp.timeoutProbe
        if wkr.state == StateUnknown || wkr.state == StateBooting {
                label, threshold = "new ", wkr.wp.timeoutBooting
        }
        if dur < threshold {
                return false
        }
        wkr.logger.WithFields(logrus.Fields{
                "Duration": dur,
                "Since":    wkr.probed,
                "State":    wkr.state,
        }).Warnf("%sinstance unresponsive, shutting down", label)
        wkr.shutdown()
        return true
}

// Returns true if the instance is eligible for shutdown: either it's
// been idle too long, or idleBehavior=Drain and nothing is running.
//
// caller must have lock.
func (wkr *worker) eligibleForShutdown() bool {
        if wkr.idleBehavior == IdleBehaviorHold {
                return false
        }
        draining := wkr.idleBehavior == IdleBehaviorDrain
        switch wkr.state {
        case StateBooting:
                return draining
        case StateIdle:
                return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
        case StateRunning:
                if !draining {
                        return false
                }
                for _, rr := range wkr.running {
                        if !rr.givenup {
                                return false
                        }
                }
                for _, rr := range wkr.starting {
                        if !rr.givenup {
                                return false
                        }
                }
                // draining, and all remaining runners are just trying
                // to force-kill their crunch-run procs
                return true
        default:
                return false
        }
}

// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
        if !wkr.eligibleForShutdown() {
                return false
        }
        wkr.logger.WithFields(logrus.Fields{
                "State":        wkr.state,
                "IdleDuration": stats.Duration(time.Since(wkr.busy)),
                "IdleBehavior": wkr.idleBehavior,
        }).Info("shutdown worker")
        wkr.reportBootOutcome(BootOutcomeAborted)
        wkr.shutdown()
        return true
}

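// shutdown marks the worker as destroyed, notifies the scheduler, and
// destroys the cloud instance in a separate goroutine.
//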
// caller must have lock.
func (wkr *worker) shutdown() {
        now := time.Now()
        wkr.updated = now
        wkr.destroyed = now
        wkr.state = StateShutdown
        go wkr.wp.notify()
        go func() {
                if err := wkr.instance.Destroy(); err != nil {
                        wkr.logger.WithError(err).Warn("shutdown failed")
                }
        }()
}

// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
        instance := wkr.instance
        tags := instance.Tags()
        update := cloud.InstanceTags{
                wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
                wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
        }
        save := false
        for k, v := range update {
                if tags[k] != v {
                        tags[k] = v
                        save = true
                }
        }
        if save {
                go func() {
                        err := instance.SetTags(tags)
                        if err != nil {
                                wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warn("error updating tags")
                        }
                }()
        }
}

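// Close abandons any containers still running or starting on the
// worker and closes the executor's connection to the instance. It
// does not shut the instance down.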
func (wkr *worker) Close() {
        // This might take time, so do it after unlocking mtx.
        defer wkr.executor.Close()

        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        for uuid, rr := range wkr.running {
                wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
                rr.Close()
        }
        for uuid, rr := range wkr.starting {
                wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
                rr.Close()
        }
}

// Add/remove entries in wkr.running to match ctrUUIDs returned by a
// probe. Returns true if anything was added or removed.
//
// Caller must have lock.
func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
        alive := map[string]bool{}
        for _, uuid := range ctrUUIDs {
                alive[uuid] = true
                if _, ok := wkr.running[uuid]; ok {
                        // unchanged
                } else if rr, ok := wkr.starting[uuid]; ok {
                        wkr.running[uuid] = rr
                        delete(wkr.starting, uuid)
                        changed = true
                } else {
                        // We didn't start it -- it must have been
                        // started by a previous dispatcher process.
                        wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
                        wkr.running[uuid] = newRemoteRunner(uuid, wkr)
                        changed = true
                }
        }
        for uuid := range wkr.running {
                if !alive[uuid] {
                        wkr.closeRunner(uuid)
                        changed = true
                }
        }
        return
}

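// closeRunner removes a container from wkr.running after its
// crunch-run process has ended, records the exit in wkr.wp.exited,
// and moves the worker back to StateIdle if nothing else is running
// or starting.
//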
// caller must have lock.
func (wkr *worker) closeRunner(uuid string) {
        rr := wkr.running[uuid]
        if rr == nil {
                return
        }
        wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
        delete(wkr.running, uuid)
        rr.Close()

        now := time.Now()
        wkr.updated = now
        wkr.wp.exited[uuid] = now
        if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
                wkr.state = StateIdle
        }
}