// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "bytes"
        "encoding/json"
        "fmt"
        "io"
        "path/filepath"
        "strings"
        "sync"
        "time"

        "git.arvados.org/arvados.git/lib/cloud"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/stats"
        "github.com/sirupsen/logrus"
)

const (
        // TODO: configurable
        maxPingFailTime = 10 * time.Minute
)

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
        StateUnknown  State = iota // might be running a container already
        StateBooting               // instance is booting
        StateIdle                  // instance booted, no containers are running
        StateRunning               // instance is running one or more containers
        StateShutdown              // worker has stopped monitoring the instance
)

var stateString = map[State]string{
        StateUnknown:  "unknown",
        StateBooting:  "booting",
        StateIdle:     "idle",
        StateRunning:  "running",
        StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
        return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
        return []byte(stateString[s]), nil
}

// BootOutcome is the result of a worker boot. It is used as a label in a metric.
type BootOutcome string

const (
        BootOutcomeFailed      BootOutcome = "failure"
        BootOutcomeSucceeded   BootOutcome = "success"
        BootOutcomeAborted     BootOutcome = "aborted"
        BootOutcomeDisappeared BootOutcome = "disappeared"
)

var validBootOutcomes = map[BootOutcome]bool{
        BootOutcomeFailed:      true,
        BootOutcomeSucceeded:   true,
        BootOutcomeAborted:     true,
        BootOutcomeDisappeared: true,
}

// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
        IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shut down on idle timeout
        IdleBehaviorHold  IdleBehavior = "hold"  // don't shut down or run more containers
        IdleBehaviorDrain IdleBehavior = "drain" // shut down immediately when idle
)

var validIdleBehavior = map[IdleBehavior]bool{
        IdleBehaviorRun:   true,
        IdleBehaviorHold:  true,
        IdleBehaviorDrain: true,
}

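// worker tracks a single cloud VM instance: its lifecycle state, the
// containers it is starting or running, and the timestamps used to
// decide when to probe it or shut it down. Fields are protected by
// mtx, which is shared with the worker pool.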
type worker struct {
        logger   logrus.FieldLogger
        executor Executor
        wp       *Pool

        mtx                 sync.Locker // must be wp's Locker.
        state               State
        idleBehavior        IdleBehavior
        instance            cloud.Instance
        instType            arvados.InstanceType
        vcpus               int64
        memory              int64
        appeared            time.Time
        probed              time.Time
        updated             time.Time
        busy                time.Time
        destroyed           time.Time
        firstSSHConnection  time.Time
        lastUUID            string
        running             map[string]*remoteRunner // remember to update state idle<->running when this changes
        starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
        probing             chan struct{}
        bootOutcomeReported bool
        timeToReadyReported bool
        staleRunLockSince   time.Time
}

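// onUnkillable takes the worker out of service (by switching to
// IdleBehaviorDrain) when a container on it cannot be killed, unless
// the worker is explicitly held.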
func (wkr *worker) onUnkillable(uuid string) {
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        logger := wkr.logger.WithField("ContainerUUID", uuid)
        if wkr.idleBehavior == IdleBehaviorHold {
                logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
                return
        }
        logger.Warn("unkillable container, draining worker")
        wkr.setIdleBehavior(IdleBehaviorDrain)
}

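// onKilled cleans up after a container's crunch-run process has been
// killed, and wakes up the scheduler.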
func (wkr *worker) onKilled(uuid string) {
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        wkr.closeRunner(uuid)
        go wkr.wp.notify()
}

// caller must have lock.
func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
        if wkr.bootOutcomeReported {
                return
        }
        if wkr.wp.mBootOutcomes != nil {
                wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
        }
        wkr.bootOutcomeReported = true
}

// caller must have lock.
func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
        if wkr.timeToReadyReported {
                return
        }
        if wkr.wp.mTimeToReadyForContainer != nil {
                wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
        }
        wkr.timeToReadyReported = true
}

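// setIdleBehavior updates the worker's idle behavior, persists it to
// the instance tags, and initiates a shutdown right away if the new
// behavior makes the worker eligible for one.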
// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
        wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
        wkr.idleBehavior = idleBehavior
        wkr.saveTags()
        wkr.shutdownIfIdle()
}

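// startContainer launches a crunch-run process for ctr on this worker
// (in a new goroutine) and marks the worker as running.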
// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
        logger := wkr.logger.WithFields(logrus.Fields{
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
        })
        logger.Debug("starting container")
        rr := newRemoteRunner(ctr.UUID, wkr)
        wkr.starting[ctr.UUID] = rr
        if wkr.state != StateRunning {
                wkr.state = StateRunning
                go wkr.wp.notify()
        }
        go func() {
                rr.Start()
                if wkr.wp.mTimeFromQueueToCrunchRun != nil {
                        wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds())
                }
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
                wkr.updated = now
                wkr.busy = now
                delete(wkr.starting, ctr.UUID)
                wkr.running[ctr.UUID] = rr
                wkr.lastUUID = ctr.UUID
        }()
}

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
        select {
        case wkr.probing <- struct{}{}:
                wkr.probeAndUpdate()
                <-wkr.probing
        default:
                wkr.logger.Debug("still waiting for last probe to finish")
        }
}

// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
        wkr.mtx.Lock()
        updated := wkr.updated
        initialState := wkr.state
        wkr.mtx.Unlock()

        var (
                booted   bool
                ctrUUIDs []string
                ok       bool
                stderr   []byte // from probeBooted
        )

        switch initialState {
        case StateShutdown:
                return
        case StateIdle, StateRunning:
                booted = true
        case StateUnknown, StateBooting:
        default:
                panic(fmt.Sprintf("unknown state %s", initialState))
        }

        probeStart := time.Now()
        logger := wkr.logger.WithField("ProbeStart", probeStart)

        if !booted {
                booted, stderr = wkr.probeBooted()
                shouldCopy := booted || initialState == StateUnknown
                if !booted {
                        // Pretend this probe succeeded if another
                        // concurrent attempt succeeded.
                        wkr.mtx.Lock()
                        if wkr.state == StateRunning || wkr.state == StateIdle {
                                booted = true
                                shouldCopy = false
                        }
                        wkr.mtx.Unlock()
                }
                if shouldCopy {
                        _, stderrCopy, err := wkr.copyRunnerData()
                        if err != nil {
                                booted = false
                                wkr.logger.WithError(err).WithField("stderr", string(stderrCopy)).Warn("error copying runner binary")
                        }
                }
                if booted {
                        logger.Info("instance booted; will try probeRunning")
                }
        }
        reportedBroken := false
        if booted || initialState == StateUnknown {
                ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
        }
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
                logger.Info("probe reported broken instance")
                wkr.reportBootOutcome(BootOutcomeFailed)
                wkr.setIdleBehavior(IdleBehaviorDrain)
        }
        if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
                if wkr.state == StateShutdown && wkr.updated.After(updated) {
                        // Skip the logging noise if shutdown was
                        // initiated during probe.
                        return
                }
                // Using the start time of the probe as the timeout
                // threshold ensures we always initiate at least one
                // probe attempt after the boot/probe timeout expires
                // (otherwise, a slow probe failure could cause us to
                // shut down an instance even though it did in fact
                // boot/recover before the timeout expired).
                dur := probeStart.Sub(wkr.probed)
                if wkr.shutdownIfBroken(dur) {
                        // stderr from failed run-probes will have
                        // been logged already, but boot-probe
                        // failures are normal so they are logged only
                        // at Debug level. This is our chance to log
                        // some evidence about why the node never
                        // booted, even in non-debug mode.
                        if !booted {
                                wkr.reportBootOutcome(BootOutcomeFailed)
                                logger.WithFields(logrus.Fields{
                                        "Duration": dur,
                                        "stderr":   string(stderr),
                                }).Info("boot failed")
                        }
                }
                return
        }

        updateTime := time.Now()
        wkr.probed = updateTime

        if updated != wkr.updated {
                // Worker was updated after the probe began, so
                // wkr.running might have a container UUID that was
                // not yet running when ctrUUIDs was generated. Leave
                // wkr.running alone and wait for the next probe to
                // catch up on any changes.
                logger.WithFields(logrus.Fields{
                        "updated":     updated,
                        "wkr.updated": wkr.updated,
                }).Debug("skipping worker state update due to probe/sync race")
                return
        }

        if len(ctrUUIDs) > 0 {
                wkr.busy = updateTime
                wkr.lastUUID = ctrUUIDs[0]
        } else if len(wkr.running) > 0 {
                // Actual last-busy time was sometime between wkr.busy
                // and now. Now is the earliest opportunity to take
                // advantage of the non-busy state, though.
                wkr.busy = updateTime
        }

        changed := wkr.updateRunning(ctrUUIDs)

        // Update state if this was the first successful boot-probe.
        if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
                if wkr.state == StateBooting {
                        wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
                }
                // Note: this will change again below if
                // len(wkr.starting)+len(wkr.running) > 0.
                wkr.state = StateIdle
                changed = true
        }

        // If wkr.state and wkr.running aren't changing then there's
        // no need to log anything, notify the scheduler, move state
        // back and forth between idle/running, etc.
        if !changed {
                return
        }

        // Log whenever a run-probe reveals crunch-run processes
        // appearing/disappearing before boot-probe succeeds.
        if wkr.state == StateUnknown && changed {
                logger.WithFields(logrus.Fields{
                        "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("crunch-run probe succeeded, but boot probe is still failing")
        }

        if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
                wkr.state = StateRunning
        } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
                wkr.state = StateIdle
        }
        wkr.updated = updateTime
        if booted && (initialState == StateUnknown || initialState == StateBooting) {
                wkr.reportBootOutcome(BootOutcomeSucceeded)
                logger.WithFields(logrus.Fields{
                        "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("probes succeeded, instance is in service")
        }
        go wkr.wp.notify()
}

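// probeRunning runs "crunch-run --list" on the instance. It returns
// the container UUIDs reported as running there, whether the instance
// reports itself as broken (or has had a stale run lock for too
// long), and whether the probe command itself succeeded.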
func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
        cmd := wkr.wp.runnerCmd + " --list"
        if u := wkr.instance.RemoteUser(); u != "root" {
                cmd = "sudo " + cmd
        }
        before := time.Now()
        var stdin io.Reader
        if prices := wkr.instance.PriceHistory(wkr.instType); len(prices) > 0 {
                j, _ := json.Marshal(prices)
                stdin = bytes.NewReader(j)
        }
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
                        "Command": cmd,
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
                wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Since(before).Seconds())
                return
        }
        wkr.logger.WithFields(logrus.Fields{
                "Command": cmd,
                "stdout":  string(stdout),
                "stderr":  string(stderr),
        }).Debug("probe succeeded")
        wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Since(before).Seconds())
        ok = true

        staleRunLock := false
        for _, s := range strings.Split(string(stdout), "\n") {
                // Each line of the "crunch-run --list" output is one
                // of the following:
                //
                // * a container UUID, indicating that processes
                //   related to that container are currently running.
                //   Optionally followed by " stale", indicating that
                //   the crunch-run process itself has exited (the
                //   remaining process is probably arv-mount).
                //
                // * the string "broken", indicating that the instance
                //   appears incapable of starting containers.
                //
                // See ListProcesses() in lib/crunchrun/background.go.
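                //
                // Example output (with hypothetical container UUIDs):
                //
                //   zzzzz-dz642-xxxxxxxxxxxxxxx
                //   zzzzz-dz642-yyyyyyyyyyyyyyy stale
                //   broken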
                if s == "" {
                        // empty string following final newline
                } else if s == "broken" {
                        reportsBroken = true
                } else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
                        // Ignore crunch-run processes that belong to
                        // a different cluster (e.g., a single host
                        // running multiple clusters with the loopback
                        // driver)
                        continue
                } else if toks := strings.Split(s, " "); len(toks) == 1 {
                        running = append(running, s)
                } else if toks[1] == "stale" {
                        wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
                        staleRunLock = true
                }
        }
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        if !staleRunLock {
                wkr.staleRunLockSince = time.Time{}
        } else if wkr.staleRunLockSince.IsZero() {
                wkr.staleRunLockSince = time.Now()
        } else if dur := time.Since(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
                wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
                reportsBroken = true
        }
        return
}

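// probeBooted runs the configured boot probe command on the instance
// (or "true" if none is configured) and reports whether it succeeded,
// along with the command's stderr.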
func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
        cmd := wkr.wp.bootProbeCommand
        if cmd == "" {
                cmd = "true"
        }
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        logger := wkr.logger.WithFields(logrus.Fields{
                "Command": cmd,
                "stdout":  string(stdout),
                "stderr":  string(stderr),
        })
        if err != nil {
                logger.WithError(err).Debug("boot probe failed")
                return false, stderr
        }
        logger.Info("boot probe succeeded")
        return true, stderr
}

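// copyRunnerData installs the crunch-run binary on the instance at
// the configured path, unless a binary with the expected md5 hash is
// already present there.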
func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
        if err = wkr.wp.loadRunnerData(); err != nil {
                wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
                return
        } else if len(wkr.wp.runnerData) == 0 {
                // Assume crunch-run is already installed
                return
        }

        hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
        dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
        logger := wkr.logger.WithFields(logrus.Fields{
                "hash": hash,
                "path": wkr.wp.runnerCmd,
        })

        stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
        if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
                logger.Info("runner binary already exists on worker, with correct hash")
                return
        }

        // Note touch+chmod come before writing data, to avoid the
        // possibility of md5 being correct while file mode is
        // incorrect.
        cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
        if wkr.instance.RemoteUser() != "root" {
                cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
        }
        logger.WithField("cmd", cmd).Info("installing runner binary on worker")
        stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
        return
}

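// shutdownIfBroken shuts down the worker if it has been unresponsive
// for longer than the applicable boot/probe timeout and is not on
// hold. It returns true if a shutdown was initiated.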
// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
        if wkr.idleBehavior == IdleBehaviorHold {
                // Never shut down.
                return false
        }
        label, threshold := "", wkr.wp.timeoutProbe
        if wkr.state == StateUnknown || wkr.state == StateBooting {
                label, threshold = "new ", wkr.wp.timeoutBooting
        }
        if dur < threshold {
                return false
        }
        wkr.logger.WithFields(logrus.Fields{
                "Duration": dur,
                "Since":    wkr.probed,
                "State":    wkr.state,
        }).Warnf("%sinstance unresponsive, shutting down", label)
        wkr.shutdown()
        return true
}

// Returns true if the instance is eligible for shutdown: either it
// has been idle too long, or idleBehavior=Drain and no containers are
// still running (any remaining runners have already given up).
//
// caller must have lock.
func (wkr *worker) eligibleForShutdown() bool {
        if wkr.idleBehavior == IdleBehaviorHold {
                return false
        }
        draining := wkr.idleBehavior == IdleBehaviorDrain
        switch wkr.state {
        case StateBooting:
                return draining
        case StateIdle:
                return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
        case StateRunning:
                if !draining {
                        return false
                }
                for _, rr := range wkr.running {
                        if !rr.givenup {
                                return false
                        }
                }
                for _, rr := range wkr.starting {
                        if !rr.givenup {
                                return false
                        }
                }
                // draining, and all remaining runners are just trying
                // to force-kill their crunch-run procs
                return true
        default:
                return false
        }
}

// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
        if !wkr.eligibleForShutdown() {
                return false
        }
        wkr.logger.WithFields(logrus.Fields{
                "State":        wkr.state,
                "IdleDuration": stats.Duration(time.Since(wkr.busy)),
                "IdleBehavior": wkr.idleBehavior,
        }).Info("shutdown worker")
        wkr.reportBootOutcome(BootOutcomeAborted)
        wkr.shutdown()
        return true
}

// caller must have lock.
func (wkr *worker) shutdown() {
        now := time.Now()
        wkr.updated = now
        wkr.destroyed = now
        wkr.state = StateShutdown
        go wkr.wp.notify()
        go func() {
                err := wkr.instance.Destroy()
                if err != nil {
                        wkr.logger.WithError(err).Warn("shutdown failed")
                        return
                }
        }()
}

// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
        instance := wkr.instance
        tags := instance.Tags()
        update := cloud.InstanceTags{
                wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
                wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
        }
        save := false
        for k, v := range update {
                if tags[k] != v {
                        tags[k] = v
                        save = true
                }
        }
        if save {
                go func() {
                        err := instance.SetTags(tags)
                        if err != nil {
                                wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
                        }
                }()
        }
}

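// Close abandons the worker's remaining crunch-run processes and
// releases the worker's executor. It does not shut down the instance.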
func (wkr *worker) Close() {
        // This might take time, so do it after unlocking mtx.
        defer wkr.executor.Close()

        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        for uuid, rr := range wkr.running {
                wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
                rr.Close()
                delete(wkr.running, uuid)
        }
        for uuid, rr := range wkr.starting {
                wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
                rr.Close()
                delete(wkr.starting, uuid)
        }
}

// Add/remove entries in wkr.running to match ctrUUIDs returned by a
// probe. Returns true if anything was added or removed.
//
// Caller must have lock.
func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
        alive := map[string]bool{}
        for _, uuid := range ctrUUIDs {
                alive[uuid] = true
                if _, ok := wkr.running[uuid]; ok {
                        // unchanged
                } else if rr, ok := wkr.starting[uuid]; ok {
                        wkr.running[uuid] = rr
                        delete(wkr.starting, uuid)
                        changed = true
                } else {
                        // We didn't start it -- it must have been
                        // started by a previous dispatcher process.
                        wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
                        wkr.running[uuid] = newRemoteRunner(uuid, wkr)
                        changed = true
                }
        }
        for uuid := range wkr.running {
                if !alive[uuid] {
                        wkr.closeRunner(uuid)
                        changed = true
                }
        }
        return
}

// caller must have lock.
func (wkr *worker) closeRunner(uuid string) {
        rr := wkr.running[uuid]
        if rr == nil {
                return
        }
        wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
        delete(wkr.running, uuid)
        rr.Close()

        now := time.Now()
        wkr.updated = now
        wkr.wp.exited[uuid] = now
        if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
                wkr.state = StateIdle
        }
}