lib/dispatchcloud/worker/worker.go (arvados.git)
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/stats"
	"github.com/sirupsen/logrus"
)

const (
	// TODO: configurable
	maxPingFailTime = 10 * time.Minute
)

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
)

var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
	return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
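// For example, map[State]int{StateIdle: 3} encodes as {"idle":3}
// rather than {"2":3}.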
func (s State) MarshalText() ([]byte, error) {
	return []byte(stateString[s]), nil
}

// BootOutcome is the result of a worker boot. It is used as a label in a metric.
type BootOutcome string

const (
	BootOutcomeFailed      BootOutcome = "failure"
	BootOutcomeSucceeded   BootOutcome = "success"
	BootOutcomeAborted     BootOutcome = "aborted"
	BootOutcomeDisappeared BootOutcome = "disappeared"
)

var validBootOutcomes = map[BootOutcome]bool{
	BootOutcomeFailed:      true,
	BootOutcomeSucceeded:   true,
	BootOutcomeAborted:     true,
	BootOutcomeDisappeared: true,
}

// IdleBehavior indicates the behavior desired when a node becomes idle.
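// The chosen behavior is written to the instance's cloud tags (see
// saveTags) so it can be recovered later, e.g., after a dispatcher
// restart.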
type IdleBehavior string

const (
	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
)

var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,
}

type worker struct {
	logger   logrus.FieldLogger
	executor Executor
	wp       *Pool

	mtx                 sync.Locker // must be wp's Locker.
	state               State
	idleBehavior        IdleBehavior
	instance            cloud.Instance
	instType            arvados.InstanceType
	vcpus               int64
	memory              int64
	appeared            time.Time
	probed              time.Time
	updated             time.Time
	busy                time.Time
	destroyed           time.Time
	firstSSHConnection  time.Time
	lastUUID            string
	running             map[string]*remoteRunner // remember to update state idle<->running when this changes
	starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
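	// probing is used by ProbeAndUpdate as a single-slot gate so
	// at most one probe runs at a time.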
	probing             chan struct{}
	bootOutcomeReported bool
	timeToReadyReported bool
	staleRunLockSince   time.Time
}

func (wkr *worker) onUnkillable(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	logger := wkr.logger.WithField("ContainerUUID", uuid)
	if wkr.idleBehavior == IdleBehaviorHold {
		logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
		return
	}
	logger.Warn("unkillable container, draining worker")
	wkr.setIdleBehavior(IdleBehaviorDrain)
}

func (wkr *worker) onKilled(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	wkr.closeRunner(uuid)
	go wkr.wp.notify()
}

// caller must have lock.
func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
	if wkr.bootOutcomeReported {
		return
	}
	if wkr.wp.mBootOutcomes != nil {
		wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
	}
	wkr.bootOutcomeReported = true
}

// caller must have lock.
func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
	if wkr.timeToReadyReported {
		return
	}
	if wkr.wp.mTimeToReadyForContainer != nil {
		wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
	}
	wkr.timeToReadyReported = true
}

// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
	wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
	wkr.idleBehavior = idleBehavior
	wkr.saveTags()
	wkr.shutdownIfIdle()
}

// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
	logger := wkr.logger.WithFields(logrus.Fields{
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	})
	logger.Debug("starting container")
	rr := newRemoteRunner(ctr.UUID, wkr)
	wkr.starting[ctr.UUID] = rr
	if wkr.state != StateRunning {
		wkr.state = StateRunning
		go wkr.wp.notify()
	}
	go func() {
		rr.Start()
		if wkr.wp.mTimeFromQueueToCrunchRun != nil {
			wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds())
		}
		wkr.mtx.Lock()
		defer wkr.mtx.Unlock()
		now := time.Now()
		wkr.updated = now
		wkr.busy = now
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = rr
		wkr.lastUUID = ctr.UUID
	}()
}

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
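//
// (The non-blocking send on wkr.probing succeeds only while no other
// probe holds the slot; the slot is released when probeAndUpdate
// returns.)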
func (wkr *worker) ProbeAndUpdate() {
	select {
	case wkr.probing <- struct{}{}:
		wkr.probeAndUpdate()
		<-wkr.probing
	default:
		wkr.logger.Debug("still waiting for last probe to finish")
	}
}

// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
	wkr.mtx.Lock()
	updated := wkr.updated
	initialState := wkr.state
	wkr.mtx.Unlock()

	var (
		booted   bool
		ctrUUIDs []string
		ok       bool
		stderr   []byte // from probeBooted
	)

	switch initialState {
	case StateShutdown:
		return
	case StateIdle, StateRunning:
		booted = true
	case StateUnknown, StateBooting:
	default:
		panic(fmt.Sprintf("unknown state %s", initialState))
	}

	probeStart := time.Now()
	logger := wkr.logger.WithField("ProbeStart", probeStart)

	if !booted {
		booted, stderr = wkr.probeBooted()
		if !booted {
			// Pretend this probe succeeded if another
			// concurrent attempt succeeded.
			wkr.mtx.Lock()
			booted = wkr.state == StateRunning || wkr.state == StateIdle
			wkr.mtx.Unlock()
		}
		if booted {
			logger.Info("instance booted; will try probeRunning")
		}
	}
	reportedBroken := false
	if booted || wkr.state == StateUnknown {
		ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
	}
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
		logger.Info("probe reported broken instance")
		wkr.reportBootOutcome(BootOutcomeFailed)
		wkr.setIdleBehavior(IdleBehaviorDrain)
	}
	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
			return
		}
		// Using the start time of the probe as the timeout
		// threshold ensures we always initiate at least one
		// probe attempt after the boot/probe timeout expires
		// (otherwise, a slow probe failure could cause us to
		// shutdown an instance even though it did in fact
		// boot/recover before the timeout expired).
		dur := probeStart.Sub(wkr.probed)
		if wkr.shutdownIfBroken(dur) {
			// stderr from failed run-probes will have
			// been logged already, but boot-probe
			// failures are normal so they are logged only
			// at Debug level. This is our chance to log
			// some evidence about why the node never
			// booted, even in non-debug mode.
			if !booted {
				wkr.reportBootOutcome(BootOutcomeFailed)
				logger.WithFields(logrus.Fields{
					"Duration": dur,
					"stderr":   string(stderr),
				}).Info("boot failed")
			}
		}
		return
	}

	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		logger.WithFields(logrus.Fields{
			"updated":     updated,
			"wkr.updated": wkr.updated,
		}).Debug("skipping worker state update due to probe/sync race")
		return
	}

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime
	}

	changed := wkr.updateRunning(ctrUUIDs)

	// Update state if this was the first successful boot-probe.
	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
		if wkr.state == StateBooting {
			wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
		}
		// Note: this will change again below if
		// len(wkr.starting)+len(wkr.running) > 0.
		wkr.state = StateIdle
		changed = true
	}

	// If wkr.state and wkr.running aren't changing then there's
	// no need to log anything, notify the scheduler, move state
	// back and forth between idle/running, etc.
	if !changed {
		return
	}

	// Log whenever a run-probe reveals crunch-run processes
	// appearing/disappearing before boot-probe succeeds.
	if wkr.state == StateUnknown && changed {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("crunch-run probe succeeded, but boot probe is still failing")
	}

	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
		wkr.state = StateRunning
	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
		wkr.state = StateIdle
	}
	wkr.updated = updateTime
	if booted && (initialState == StateUnknown || initialState == StateBooting) {
		wkr.reportBootOutcome(BootOutcomeSucceeded)
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("probes succeeded, instance is in service")
	}
	go wkr.wp.notify()
}

func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
	cmd := wkr.wp.runnerCmd + " --list"
	if u := wkr.instance.RemoteUser(); u != "root" {
		cmd = "sudo " + cmd
	}
	before := time.Now()
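	// Pass this instance's price history (if any) to the probe
	// command on stdin.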
	var stdin io.Reader
	if prices := wkr.instance.PriceHistory(wkr.instType); len(prices) > 0 {
		j, _ := json.Marshal(prices)
		stdin = bytes.NewReader(j)
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
	if err != nil {
		wkr.logger.WithFields(logrus.Fields{
			"Command": cmd,
			"stdout":  string(stdout),
			"stderr":  string(stderr),
		}).WithError(err).Warn("probe failed")
		wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Since(before).Seconds())
		return
	}
	wkr.logger.WithFields(logrus.Fields{
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	}).Debug("probe succeeded")
	wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Since(before).Seconds())
	ok = true

	staleRunLock := false
	for _, s := range strings.Split(string(stdout), "\n") {
		// Each line of the "crunch-run --list" output is one
		// of the following:
		//
		// * a container UUID, indicating that processes
		//   related to that container are currently running.
		//   Optionally followed by " stale", indicating that
		//   the crunch-run process itself has exited (the
		//   remaining process is probably arv-mount).
		//
		// * the string "broken", indicating that the instance
		//   appears incapable of starting containers.
		//
		// See ListProcesses() in lib/crunchrun/background.go.
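		//
		// For example (illustrative UUIDs):
		//
		//   zzzzz-dz642-000000000000000
		//   zzzzz-dz642-000000000000001 stale
		//   broken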
		if s == "" {
			// empty string following final newline
		} else if s == "broken" {
			reportsBroken = true
		} else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
			// Ignore crunch-run processes that belong to
			// a different cluster (e.g., a single host
			// running multiple clusters with the loopback
			// driver)
			continue
		} else if toks := strings.Split(s, " "); len(toks) == 1 {
			running = append(running, s)
		} else if toks[1] == "stale" {
			wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
			staleRunLock = true
		}
	}
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if !staleRunLock {
		wkr.staleRunLockSince = time.Time{}
	} else if wkr.staleRunLockSince.IsZero() {
		wkr.staleRunLockSince = time.Now()
	} else if dur := time.Since(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
		wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
		reportsBroken = true
	}
	return
}

func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
	cmd := wkr.wp.bootProbeCommand
	if cmd == "" {
		cmd = "true"
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	logger := wkr.logger.WithFields(logrus.Fields{
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	})
	if err != nil {
		logger.WithError(err).Debug("boot probe failed")
		return false, stderr
	}
	logger.Info("boot probe succeeded")
	if err = wkr.wp.loadRunnerData(); err != nil {
		wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
		return false, stderr
	} else if len(wkr.wp.runnerData) == 0 {
		// Assume crunch-run is already installed
	} else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
		wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
		return false, stderr2
	} else {
		stderr = append(stderr, stderr2...)
	}
	return true, stderr
}

func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
	hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
	dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
	logger := wkr.logger.WithFields(logrus.Fields{
		"hash": hash,
		"path": wkr.wp.runnerCmd,
	})

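	// md5sum output is "<hash>  <path>\n" (two spaces); if it
	// matches exactly, the correct binary is already installed
	// and the copy below is skipped.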
	stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
	if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
		logger.Info("runner binary already exists on worker, with correct hash")
		return
	}

	// Note touch+chmod come before writing data, to avoid the
	// possibility of md5 being correct while file mode is
	// incorrect.
	cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
	if wkr.instance.RemoteUser() != "root" {
		cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
	}
	logger.WithField("cmd", cmd).Info("installing runner binary on worker")
	stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
	return
}

// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		// Never shut down.
		return false
	}
	label, threshold := "", wkr.wp.timeoutProbe
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		label, threshold = "new ", wkr.wp.timeoutBooting
	}
	if dur < threshold {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"Duration": dur,
		"Since":    wkr.probed,
		"State":    wkr.state,
	}).Warnf("%sinstance unresponsive, shutting down", label)
	wkr.shutdown()
	return true
}

// Returns true if the instance is eligible for shutdown: either it's
// been idle too long, or idleBehavior=Drain and nothing is running.
//
// caller must have lock.
func (wkr *worker) eligibleForShutdown() bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		return false
	}
	draining := wkr.idleBehavior == IdleBehaviorDrain
	switch wkr.state {
	case StateBooting:
		return draining
	case StateIdle:
		return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
	case StateRunning:
		if !draining {
			return false
		}
		for _, rr := range wkr.running {
			if !rr.givenup {
				return false
			}
		}
		for _, rr := range wkr.starting {
			if !rr.givenup {
				return false
			}
		}
		// draining, and all remaining runners are just trying
		// to force-kill their crunch-run procs
		return true
	default:
		return false
	}
}

// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
	if !wkr.eligibleForShutdown() {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"State":        wkr.state,
		"IdleDuration": stats.Duration(time.Since(wkr.busy)),
		"IdleBehavior": wkr.idleBehavior,
	}).Info("shutdown worker")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return true
}

// caller must have lock.
func (wkr *worker) shutdown() {
	now := time.Now()
	wkr.updated = now
	wkr.destroyed = now
	wkr.state = StateShutdown
	go wkr.wp.notify()
	go func() {
		err := wkr.instance.Destroy()
		if err != nil {
			wkr.logger.WithError(err).Warn("shutdown failed")
			return
		}
	}()
}

// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
	instance := wkr.instance
	tags := instance.Tags()
	update := cloud.InstanceTags{
		wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
		wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
	}
	save := false
	for k, v := range update {
		if tags[k] != v {
			tags[k] = v
			save = true
		}
	}
	if save {
		go func() {
			err := instance.SetTags(tags)
			if err != nil {
				wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warn("error updating tags")
			}
		}()
	}
}

func (wkr *worker) Close() {
	// This might take time, so do it after unlocking mtx.
	defer wkr.executor.Close()

	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	for uuid, rr := range wkr.running {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
		rr.Close()
	}
	for uuid, rr := range wkr.starting {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
		rr.Close()
	}
}

// Add/remove entries in wkr.running to match ctrUUIDs returned by a
// probe. Returns true if anything was added or removed.
//
// Caller must have lock.
func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
	alive := map[string]bool{}
	for _, uuid := range ctrUUIDs {
		alive[uuid] = true
		if _, ok := wkr.running[uuid]; ok {
			// unchanged
		} else if rr, ok := wkr.starting[uuid]; ok {
			wkr.running[uuid] = rr
			delete(wkr.starting, uuid)
			changed = true
		} else {
			// We didn't start it -- it must have been
			// started by a previous dispatcher process.
			wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
			wkr.running[uuid] = newRemoteRunner(uuid, wkr)
			changed = true
		}
	}
	for uuid := range wkr.running {
		if !alive[uuid] {
			wkr.closeRunner(uuid)
			changed = true
		}
	}
	return
}

// caller must have lock.
func (wkr *worker) closeRunner(uuid string) {
	rr := wkr.running[uuid]
	if rr == nil {
		return
	}
	wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
	delete(wkr.running, uuid)
	rr.Close()

	now := time.Now()
	wkr.updated = now
	wkr.wp.exited[uuid] = now
	if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
		wkr.state = StateIdle
	}
}