// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "bytes"
        "encoding/json"
        "errors"
        "fmt"
        "io"
        "net"
        "path/filepath"
        "strings"
        "sync"
        "time"

        "git.arvados.org/arvados.git/lib/cloud"
        "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/stats"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
)

const (
        // TODO: configurable
        maxPingFailTime = 10 * time.Minute
)

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
        StateUnknown  State = iota // might be running a container already
        StateBooting               // instance is booting
        StateIdle                  // instance booted, no containers are running
        StateRunning               // instance is running one or more containers
        StateShutdown              // worker has stopped monitoring the instance
)

var stateString = map[State]string{
        StateUnknown:  "unknown",
        StateBooting:  "booting",
        StateIdle:     "idle",
        StateRunning:  "running",
        StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
        return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
        return []byte(stateString[s]), nil
}

// BootOutcome is the result of a worker boot. It is used as a label in a metric.
type BootOutcome string

const (
        BootOutcomeFailed      BootOutcome = "failure"
        BootOutcomeSucceeded   BootOutcome = "success"
        BootOutcomeAborted     BootOutcome = "aborted"
        BootOutcomeDisappeared BootOutcome = "disappeared"
)

var validBootOutcomes = map[BootOutcome]bool{
        BootOutcomeFailed:      true,
        BootOutcomeSucceeded:   true,
        BootOutcomeAborted:     true,
        BootOutcomeDisappeared: true,
}

// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
        IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shut down on idle timeout
        IdleBehaviorHold  IdleBehavior = "hold"  // don't shut down or run more containers
        IdleBehaviorDrain IdleBehavior = "drain" // shut down immediately when idle
)

var validIdleBehavior = map[IdleBehavior]bool{
        IdleBehaviorRun:   true,
        IdleBehaviorHold:  true,
        IdleBehaviorDrain: true,
}

type worker struct {
        logger   logrus.FieldLogger
        executor Executor
        wp       *Pool

        mtx                 sync.Locker // must be wp's Locker.
        state               State
        idleBehavior        IdleBehavior
        instance            cloud.Instance
        instType            arvados.InstanceType
        vcpus               int64
        memory              int64
        appeared            time.Time
        probed              time.Time
        updated             time.Time
        busy                time.Time
        destroyed           time.Time
        firstSSHConnection  time.Time
        lastUUID            string
        running             map[string]*remoteRunner // remember to update state idle<->running when this changes
        starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
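        // probing acts as a non-blocking try-lock: ProbeAndUpdate
        // sends a token before probing and receives it afterwards, so
        // at most one probe runs at a time.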
        probing             chan struct{}
        bootOutcomeReported bool
        timeToReadyReported bool
        staleRunLockSince   time.Time
}

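// onUnkillable is called when the crunch-run process for a container
// cannot be killed. Unless the worker's IdleBehavior is Hold, the
// worker is switched to IdleBehaviorDrain so it will eventually be
// shut down.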
func (wkr *worker) onUnkillable(uuid string) {
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        logger := wkr.logger.WithField("ContainerUUID", uuid)
        if wkr.idleBehavior == IdleBehaviorHold {
                logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
                return
        }
        logger.Warn("unkillable container, draining worker")
        wkr.setIdleBehavior(IdleBehaviorDrain)
}

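// onKilled is called after the crunch-run process for a container has
// ended: it closes the runner's bookkeeping and notifies the worker
// pool.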
func (wkr *worker) onKilled(uuid string) {
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        wkr.closeRunner(uuid)
        go wkr.wp.notify()
}

// caller must have lock.
func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
        if wkr.bootOutcomeReported {
                return
        }
        if wkr.wp.mBootOutcomes != nil {
                wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
        }
        wkr.bootOutcomeReported = true
}

// caller must have lock.
func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
        if wkr.timeToReadyReported {
                return
        }
        if wkr.wp.mTimeToReadyForContainer != nil {
                wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
        }
        wkr.timeToReadyReported = true
}

// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
        wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
        wkr.idleBehavior = idleBehavior
        wkr.saveTags()
        wkr.shutdownIfIdle()
}

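// startContainer creates a remoteRunner for ctr, marks the worker as
// running, and launches crunch-run in a new goroutine, moving the
// runner from wkr.starting to wkr.running once it has started.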
// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
        logger := wkr.logger.WithFields(logrus.Fields{
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
        })
        logger.Debug("starting container")
        rr := newRemoteRunner(ctr.UUID, wkr)
        wkr.starting[ctr.UUID] = rr
        if wkr.state != StateRunning {
                wkr.state = StateRunning
                go wkr.wp.notify()
        }
        go func() {
                rr.Start()
                if wkr.wp.mTimeFromQueueToCrunchRun != nil {
                        wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds())
                }
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
                wkr.updated = now
                wkr.busy = now
                delete(wkr.starting, ctr.UUID)
                wkr.running[ctr.UUID] = rr
                wkr.lastUUID = ctr.UUID
        }()
}

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
        select {
        case wkr.probing <- struct{}{}:
                wkr.probeAndUpdate()
                <-wkr.probing
        default:
                wkr.logger.Debug("still waiting for last probe to finish")
        }
}

// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
        wkr.mtx.Lock()
        updated := wkr.updated
        initialState := wkr.state
        wkr.mtx.Unlock()

        var (
                booted   bool
                ctrUUIDs []string
                ok       bool
                stderr   []byte // from probeBooted
                errLast  error  // from probeBooted or copyRunnerData
        )

        switch initialState {
        case StateShutdown:
                return
        case StateIdle, StateRunning:
                booted = true
        case StateUnknown, StateBooting:
        default:
                panic(fmt.Sprintf("unknown state %s", initialState))
        }

        probeStart := time.Now()
        logger := wkr.logger.WithField("ProbeStart", probeStart)

        if !booted {
                stderr, errLast = wkr.probeBooted()
                booted = errLast == nil
                shouldCopy := booted || initialState == StateUnknown
                if !booted {
                        // Pretend this probe succeeded if another
                        // concurrent attempt succeeded.
                        wkr.mtx.Lock()
                        if wkr.state == StateRunning || wkr.state == StateIdle {
                                booted = true
                                shouldCopy = false
                        }
                        wkr.mtx.Unlock()
                }
                if shouldCopy {
                        _, stderrCopy, err := wkr.copyRunnerData()
                        if err != nil {
                                booted = false
                                wkr.logger.WithError(err).WithField("stderr", string(stderrCopy)).Warn("error copying runner binary")
                                errLast = err
                        }
                }
                if booted {
                        logger.Info("instance booted; will try probeRunning")
                }
        }
        reportedBroken := false
        if booted || initialState == StateUnknown {
                ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
        }
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
                logger.Info("probe reported broken instance")
                wkr.reportBootOutcome(BootOutcomeFailed)
                wkr.setIdleBehavior(IdleBehaviorDrain)
        }
        if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
                if wkr.state == StateShutdown && wkr.updated.After(updated) {
                        // Skip the logging noise if shutdown was
                        // initiated during probe.
                        return
                }
                // Using the start time of the probe as the timeout
                // threshold ensures we always initiate at least one
                // probe attempt after the boot/probe timeout expires
                // (otherwise, a slow probe failure could cause us to
                // shut down an instance even though it did in fact
                // boot/recover before the timeout expired).
                dur := probeStart.Sub(wkr.probed)
                if wkr.shutdownIfBroken(dur) {
                        // stderr from failed run-probes will have
                        // been logged already, but some boot-probe
                        // failures are normal so they are logged only
                        // at Debug level. This may be our chance to
                        // log some evidence about why the node never
                        // booted, even in non-debug mode.
                        if !booted {
                                wkr.reportBootOutcome(BootOutcomeFailed)
                                logger.WithFields(logrus.Fields{
                                        "Duration": dur,
                                        "stderr":   string(stderr),
                                }).WithError(errLast).Info("boot failed")
                        }
                }
                return
        }

        updateTime := time.Now()
        wkr.probed = updateTime

        if updated != wkr.updated {
                // Worker was updated after the probe began, so
                // wkr.running might have a container UUID that was
                // not yet running when ctrUUIDs was generated. Leave
                // wkr.running alone and wait for the next probe to
                // catch up on any changes.
                logger.WithFields(logrus.Fields{
                        "updated":     updated,
                        "wkr.updated": wkr.updated,
                }).Debug("skipping worker state update due to probe/sync race")
                return
        }

        if len(ctrUUIDs) > 0 {
                wkr.busy = updateTime
                wkr.lastUUID = ctrUUIDs[0]
        } else if len(wkr.running) > 0 {
                // Actual last-busy time was sometime between wkr.busy
                // and now. Now is the earliest opportunity to take
                // advantage of the non-busy state, though.
                wkr.busy = updateTime
        }

        changed := wkr.updateRunning(ctrUUIDs)

        // Update state if this was the first successful boot-probe.
        if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
                if wkr.state == StateBooting {
                        wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
                }
                // Note: this will change again below if
                // len(wkr.starting)+len(wkr.running) > 0.
                wkr.state = StateIdle
                changed = true
        }

        // If wkr.state and wkr.running aren't changing then there's
        // no need to log anything, notify the scheduler, move state
        // back and forth between idle/running, etc.
        if !changed {
                return
        }

        // Log whenever a run-probe reveals crunch-run processes
        // appearing/disappearing before boot-probe succeeds.
        if wkr.state == StateUnknown && changed {
                logger.WithFields(logrus.Fields{
                        "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("crunch-run probe succeeded, but boot probe is still failing")
        }

        if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
                wkr.state = StateRunning
        } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
                wkr.state = StateIdle
        }
        wkr.updated = updateTime
        if booted && (initialState == StateUnknown || initialState == StateBooting) {
                wkr.reportBootOutcome(BootOutcomeSucceeded)
                logger.WithFields(logrus.Fields{
                        "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("probes succeeded, instance is in service")
        }
        go wkr.wp.notify()
}

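// probeRunning runs "crunch-run --list" on the instance (passing the
// instance's price history as JSON on stdin) and reports which
// container UUIDs are running there, whether the instance reports
// itself broken, and whether the probe itself succeeded. A run lock
// that stays stale longer than timeoutStaleRunLock is also treated as
// a broken instance.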
func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
        cmd := wkr.wp.runnerCmd + " --list"
        if u := wkr.instance.RemoteUser(); u != "root" {
                cmd = "sudo " + cmd
        }
        before := time.Now()
        var stdin io.Reader
        if prices := wkr.instance.PriceHistory(wkr.instType); len(prices) > 0 {
                j, _ := json.Marshal(prices)
                stdin = bytes.NewReader(j)
        }
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
                        "Command": cmd,
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
                wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Since(before).Seconds())
                return
        }
        wkr.logger.WithFields(logrus.Fields{
                "Command": cmd,
                "stdout":  string(stdout),
                "stderr":  string(stderr),
        }).Debug("probe succeeded")
        wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Since(before).Seconds())
        ok = true

        staleRunLock := false
        for _, s := range strings.Split(string(stdout), "\n") {
                // Each line of the "crunch-run --list" output is one
                // of the following:
                //
                // * a container UUID, indicating that processes
                //   related to that container are currently running.
                //   Optionally followed by " stale", indicating that
                //   the crunch-run process itself has exited (the
                //   remaining process is probably arv-mount).
                //
                // * the string "broken", indicating that the instance
                //   appears incapable of starting containers.
                //
                // See ListProcesses() in lib/crunchrun/background.go.
                if s == "" {
                        // empty string following final newline
                } else if s == "broken" {
                        reportsBroken = true
                } else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
                        // Ignore crunch-run processes that belong to
                        // a different cluster (e.g., a single host
                        // running multiple clusters with the loopback
                        // driver)
                        continue
                } else if toks := strings.Split(s, " "); len(toks) == 1 {
                        running = append(running, s)
                } else if toks[1] == "stale" {
                        wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
                        staleRunLock = true
                }
        }
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        if !staleRunLock {
                wkr.staleRunLockSince = time.Time{}
        } else if wkr.staleRunLockSince.IsZero() {
                wkr.staleRunLockSince = time.Now()
        } else if dur := time.Since(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
                wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
                reportsBroken = true
        }
        return
}

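// probeBooted runs the configured boot probe command on the instance
// (or "true" if none is configured) and returns its stderr along with
// any execution error.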
func (wkr *worker) probeBooted() (stderr []byte, err error) {
        cmd := wkr.wp.bootProbeCommand
        if cmd == "" {
                cmd = "true"
        }
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        logger := wkr.logger.WithFields(logrus.Fields{
                "Command": cmd,
                "stdout":  string(stdout),
                "stderr":  string(stderr),
        })
        if err != nil {
                if errors.Is(err, sshexecutor.ErrNoAddress) ||
                        errors.As(err, new(*net.OpError)) ||
                        errors.As(err, new(*ssh.ExitError)) {
                        // These errors are expected while the
                        // instance is booting, so we only log them at
                        // debug level.
                        logger.WithError(err).Debug("boot probe failed")
                } else {
                        // Other errors are more likely to indicate a
                        // configuration problem, and it's more
                        // sysadmin-friendly to show them right away
                        // instead of waiting until boot timeout and
                        // only showing the last error.
                        //
                        // Example: "ssh: handshake failed: ssh:
                        // unable to authenticate, attempted methods
                        // [none publickey], no supported methods
                        // remain"
                        logger.WithError(err).Warn("boot probe failed")
                }
                return stderr, err
        }
        logger.Info("boot probe succeeded")
        return stderr, nil
}

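// copyRunnerData installs the crunch-run binary on the instance,
// unless a file with the expected md5 hash already exists at
// wkr.wp.runnerCmd or no runner binary is configured.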
func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
        if err = wkr.wp.loadRunnerData(); err != nil {
                wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
                return
        } else if len(wkr.wp.runnerData) == 0 {
                // Assume crunch-run is already installed
                return
        }

        hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
        dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
        logger := wkr.logger.WithFields(logrus.Fields{
                "hash": hash,
                "path": wkr.wp.runnerCmd,
        })

        stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
        if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
                logger.Info("runner binary already exists on worker, with correct hash")
                return
        }

        // Note touch+chmod come before writing data, to avoid the
        // possibility of md5 being correct while file mode is
        // incorrect.
        cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
        if wkr.instance.RemoteUser() != "root" {
                cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
        }
        logger.WithField("cmd", cmd).Info("installing runner binary on worker")
        stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
        return
}

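// shutdownIfBroken shuts down the worker if it has been unresponsive
// for longer than the applicable timeout (timeoutBooting while
// booting, timeoutProbe otherwise), unless IdleBehavior is Hold.
// Returns true if a shutdown was initiated.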
// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
        if wkr.idleBehavior == IdleBehaviorHold {
                // Never shut down.
                return false
        }
        label, threshold := "", wkr.wp.timeoutProbe
        if wkr.state == StateUnknown || wkr.state == StateBooting {
                label, threshold = "new ", wkr.wp.timeoutBooting
        }
        if dur < threshold {
                return false
        }
        wkr.logger.WithFields(logrus.Fields{
                "Duration": dur,
                "Since":    wkr.probed,
                "State":    wkr.state,
        }).Warnf("%sinstance unresponsive, shutting down", label)
        wkr.shutdown()
        return true
}

// Returns true if the instance is eligible for shutdown: either it's
// been idle too long, or idleBehavior=Drain and nothing is running.
//
// caller must have lock.
func (wkr *worker) eligibleForShutdown() bool {
        if wkr.idleBehavior == IdleBehaviorHold {
                return false
        }
        draining := wkr.idleBehavior == IdleBehaviorDrain
        switch wkr.state {
        case StateBooting:
                return draining
        case StateIdle:
                return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
        case StateRunning:
                if !draining {
                        return false
                }
                for _, rr := range wkr.running {
                        if !rr.givenup {
                                return false
                        }
                }
                for _, rr := range wkr.starting {
                        if !rr.givenup {
                                return false
                        }
                }
                // draining, and all remaining runners are just trying
                // to force-kill their crunch-run procs
                return true
        default:
                return false
        }
}

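// shutdownIfIdle shuts the worker down if eligibleForShutdown says
// so, reporting the boot outcome as aborted. Returns true if a
// shutdown was initiated.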
// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
        if !wkr.eligibleForShutdown() {
                return false
        }
        wkr.logger.WithFields(logrus.Fields{
                "State":        wkr.state,
                "IdleDuration": stats.Duration(time.Since(wkr.busy)),
                "IdleBehavior": wkr.idleBehavior,
        }).Info("shutdown worker")
        wkr.reportBootOutcome(BootOutcomeAborted)
        wkr.shutdown()
        return true
}

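// shutdown marks the worker as destroyed and destroys the cloud
// instance in a separate goroutine.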
// caller must have lock.
func (wkr *worker) shutdown() {
        now := time.Now()
        wkr.updated = now
        wkr.destroyed = now
        wkr.state = StateShutdown
        go wkr.wp.notify()
        go func() {
                err := wkr.instance.Destroy()
                if err != nil {
                        wkr.logger.WithError(err).Warn("shutdown failed")
                        return
                }
        }()
}

// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
        instance := wkr.instance
        tags := instance.Tags()
        update := cloud.InstanceTags{
                wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
                wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
        }
        save := false
        for k, v := range update {
                if tags[k] != v {
                        tags[k] = v
                        save = true
                }
        }
        if save {
                go func() {
                        err := instance.SetTags(tags)
                        if err != nil {
                                wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
                        }
                }()
        }
}

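// Close abandons any running or starting crunch-run processes and
// closes the worker's executor connection. It does not shut down the
// instance.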
func (wkr *worker) Close() {
        // This might take time, so do it after unlocking mtx.
        defer wkr.executor.Close()

        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        for uuid, rr := range wkr.running {
                wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
                rr.Close()
        }
        for uuid, rr := range wkr.starting {
                wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
                rr.Close()
        }
}

// Add/remove entries in wkr.running to match ctrUUIDs returned by a
// probe. Returns true if anything was added or removed.
//
// Caller must have lock.
func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
        alive := map[string]bool{}
        for _, uuid := range ctrUUIDs {
                alive[uuid] = true
                if _, ok := wkr.running[uuid]; ok {
                        // unchanged
                } else if rr, ok := wkr.starting[uuid]; ok {
                        wkr.running[uuid] = rr
                        delete(wkr.starting, uuid)
                        changed = true
                } else {
                        // We didn't start it -- it must have been
                        // started by a previous dispatcher process.
                        wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
                        wkr.running[uuid] = newRemoteRunner(uuid, wkr)
                        changed = true
                }
        }
        for uuid := range wkr.running {
                if !alive[uuid] {
                        wkr.closeRunner(uuid)
                        changed = true
                }
        }
        return
}

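// closeRunner stops tracking the runner for the given container,
// records its exit time in the pool, and returns the worker to
// StateIdle if nothing else is running or starting.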
// caller must have lock.
func (wkr *worker) closeRunner(uuid string) {
        rr := wkr.running[uuid]
        if rr == nil {
                return
        }
        wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
        delete(wkr.running, uuid)
        rr.Close()

        now := time.Now()
        wkr.updated = now
        wkr.wp.exited[uuid] = now
        if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
                wkr.state = StateIdle
        }
}