20649: Fix panic on race, worker shutdown vs. container startup.
[arvados.git] / lib/dispatchcloud/worker/worker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "encoding/json"
10         "errors"
11         "fmt"
12         "io"
13         "net"
14         "path/filepath"
15         "strings"
16         "sync"
17         "time"
18
19         "git.arvados.org/arvados.git/lib/cloud"
20         "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         "git.arvados.org/arvados.git/sdk/go/stats"
23         "github.com/sirupsen/logrus"
24         "golang.org/x/crypto/ssh"
25 )
26
27 const (
28         // TODO: configurable
29         maxPingFailTime = 10 * time.Minute
30 )
31
32 // State indicates whether a worker is available to do work, and (if
33 // not) whether/when it is expected to become ready.
34 type State int
35
36 const (
37         StateUnknown  State = iota // might be running a container already
38         StateBooting               // instance is booting
39         StateIdle                  // instance booted, no containers are running
40         StateRunning               // instance is running one or more containers
41         StateShutdown              // worker has stopped monitoring the instance
42 )
43
44 var stateString = map[State]string{
45         StateUnknown:  "unknown",
46         StateBooting:  "booting",
47         StateIdle:     "idle",
48         StateRunning:  "running",
49         StateShutdown: "shutdown",
50 }
51
52 // String implements fmt.Stringer.
53 func (s State) String() string {
54         return stateString[s]
55 }
56
57 // MarshalText implements encoding.TextMarshaler so a JSON encoding of
58 // map[State]anything uses the state's string representation.
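//
// For example (a minimal illustrative sketch, not code from this package):
//
//	counts := map[State]int{StateIdle: 3, StateRunning: 1}
//	buf, _ := json.Marshal(counts) // `{"idle":3,"running":1}`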
59 func (s State) MarshalText() ([]byte, error) {
60         return []byte(stateString[s]), nil
61 }
62
63 // BootOutcome is the result of a worker boot. It is used as a label in a metric.
64 type BootOutcome string
65
66 const (
67         BootOutcomeFailed      BootOutcome = "failure"
68         BootOutcomeSucceeded   BootOutcome = "success"
69         BootOutcomeAborted     BootOutcome = "aborted"
70         BootOutcomeDisappeared BootOutcome = "disappeared"
71 )
72
73 var validBootOutcomes = map[BootOutcome]bool{
74         BootOutcomeFailed:      true,
75         BootOutcomeSucceeded:   true,
76         BootOutcomeAborted:     true,
77         BootOutcomeDisappeared: true,
78 }
79
80 // IdleBehavior indicates the behavior desired when a node becomes idle.
81 type IdleBehavior string
82
83 const (
84         IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shut down on idle timeout
85         IdleBehaviorHold  IdleBehavior = "hold"  // don't shut down or run more containers
86         IdleBehaviorDrain IdleBehavior = "drain" // shut down immediately when idle
87 )
88
89 var validIdleBehavior = map[IdleBehavior]bool{
90         IdleBehaviorRun:   true,
91         IdleBehaviorHold:  true,
92         IdleBehaviorDrain: true,
93 }
94
95 type worker struct {
96         logger   logrus.FieldLogger
97         executor Executor
98         wp       *Pool
99
100         mtx                 sync.Locker // must be wp's Locker.
101         state               State
102         idleBehavior        IdleBehavior
103         instance            cloud.Instance
104         instType            arvados.InstanceType
105         vcpus               int64
106         memory              int64
107         appeared            time.Time
108         probed              time.Time
109         updated             time.Time
110         busy                time.Time
111         destroyed           time.Time
112         firstSSHConnection  time.Time
113         lastUUID            string
114         running             map[string]*remoteRunner // remember to update state idle<->running when this changes
115         starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
116         probing             chan struct{}
117         bootOutcomeReported bool
118         timeToReadyReported bool
119         staleRunLockSince   time.Time
120 }
121
122 func (wkr *worker) onUnkillable(uuid string) {
123         wkr.mtx.Lock()
124         defer wkr.mtx.Unlock()
125         logger := wkr.logger.WithField("ContainerUUID", uuid)
126         if wkr.idleBehavior == IdleBehaviorHold {
127                 logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
128                 return
129         }
130         logger.Warn("unkillable container, draining worker")
131         wkr.setIdleBehavior(IdleBehaviorDrain)
132 }
133
134 func (wkr *worker) onKilled(uuid string) {
135         wkr.mtx.Lock()
136         defer wkr.mtx.Unlock()
137         wkr.closeRunner(uuid)
138         go wkr.wp.notify()
139 }
140
141 // caller must have lock.
142 func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
143         if wkr.bootOutcomeReported {
144                 return
145         }
146         if wkr.wp.mBootOutcomes != nil {
147                 wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
148         }
149         wkr.bootOutcomeReported = true
150 }
151
152 // caller must have lock.
153 func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
154         if wkr.timeToReadyReported {
155                 return
156         }
157         if wkr.wp.mTimeToReadyForContainer != nil {
158                 wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
159         }
160         wkr.timeToReadyReported = true
161 }
162
163 // caller must have lock.
164 func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
165         wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
166         wkr.idleBehavior = idleBehavior
167         wkr.saveTags()
168         wkr.shutdownIfIdle()
169 }
170
171 // caller must have lock.
172 func (wkr *worker) startContainer(ctr arvados.Container) {
173         logger := wkr.logger.WithFields(logrus.Fields{
174                 "ContainerUUID": ctr.UUID,
175                 "Priority":      ctr.Priority,
176         })
177         logger.Debug("starting container")
178         rr := newRemoteRunner(ctr.UUID, wkr)
179         wkr.starting[ctr.UUID] = rr
180         if wkr.state != StateRunning {
181                 wkr.state = StateRunning
182                 go wkr.wp.notify()
183         }
184         go func() {
185                 rr.Start()
186                 if wkr.wp.mTimeFromQueueToCrunchRun != nil {
187                         wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds())
188                 }
189                 wkr.mtx.Lock()
190                 defer wkr.mtx.Unlock()
191                 if wkr.starting[ctr.UUID] != rr {
192                         // Someone else (e.g., wkr.probeAndUpdate() ->
193                         // wkr.updateRunning() or wkr.Close()) already
194                         // moved our runner from wkr.starting to
195                         // wkr.running or deleted it while we were in
196                         // rr.Start().
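                        //
                        // Dropping our reference here, instead of adding
                        // rr to wkr.running, avoids resurrecting a runner
                        // that a concurrent Close()/shutdown has already
                        // abandoned.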
197                         return
198                 }
199                 now := time.Now()
200                 wkr.updated = now
201                 wkr.busy = now
202                 delete(wkr.starting, ctr.UUID)
203                 wkr.running[ctr.UUID] = rr
204                 wkr.lastUUID = ctr.UUID
205         }()
206 }
207
208 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
209 // for the worker's current state. If a previous probe is still
210 // running, it does nothing.
211 //
212 // It should be called in a new goroutine.
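//
// The wkr.probing channel is used as a non-blocking try-lock (it must be
// buffered with capacity 1 for this to work): a successful send claims the
// slot and the receive after probeAndUpdate releases it; if the send would
// block, another probe is already in flight and this call is a no-op.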
213 func (wkr *worker) ProbeAndUpdate() {
214         select {
215         case wkr.probing <- struct{}{}:
216                 wkr.probeAndUpdate()
217                 <-wkr.probing
218         default:
219                 wkr.logger.Debug("still waiting for last probe to finish")
220         }
221 }
222
223 // probeAndUpdate calls probeBooted and/or probeRunning if needed, and
224 // updates state accordingly.
225 //
226 // In StateUnknown: Call both probeBooted and probeRunning.
227 // In StateBooting: Call probeBooted; if successful, call probeRunning.
228 // In StateRunning: Call probeRunning.
229 // In StateIdle: Call probeRunning.
230 // In StateShutdown: Do nothing.
231 //
232 // If both probes succeed, wkr.state changes to
233 // StateIdle/StateRunning.
234 //
235 // If probeRunning succeeds, wkr.running is updated. (This means
236 // wkr.running might be non-empty even in StateUnknown, if the boot
237 // probe failed.)
238 //
239 // probeAndUpdate should be called in a new goroutine.
240 func (wkr *worker) probeAndUpdate() {
241         wkr.mtx.Lock()
242         updated := wkr.updated
243         initialState := wkr.state
244         wkr.mtx.Unlock()
245
246         var (
247                 booted   bool
248                 ctrUUIDs []string
249                 ok       bool
250                 stderr   []byte // from probeBooted
251                 errLast  error  // from probeBooted or copyRunnerData
252         )
253
254         switch initialState {
255         case StateShutdown:
256                 return
257         case StateIdle, StateRunning:
258                 booted = true
259         case StateUnknown, StateBooting:
260         default:
261                 panic(fmt.Sprintf("unknown state %s", initialState))
262         }
263
264         probeStart := time.Now()
265         logger := wkr.logger.WithField("ProbeStart", probeStart)
266
267         if !booted {
268                 stderr, errLast = wkr.probeBooted()
269                 booted = errLast == nil
270                 shouldCopy := booted || initialState == StateUnknown
271                 if !booted {
272                         // Pretend this probe succeeded if another
273                         // concurrent attempt succeeded.
274                         wkr.mtx.Lock()
275                         if wkr.state == StateRunning || wkr.state == StateIdle {
276                                 booted = true
277                                 shouldCopy = false
278                         }
279                         wkr.mtx.Unlock()
280                 }
281                 if shouldCopy {
282                         _, stderrCopy, err := wkr.copyRunnerData()
283                         if err != nil {
284                                 booted = false
285                                 wkr.logger.WithError(err).WithField("stderr", string(stderrCopy)).Warn("error copying runner binary")
286                                 errLast = err
287                         }
288                 }
289                 if booted {
290                         logger.Info("instance booted; will try probeRunning")
291                 }
292         }
293         reportedBroken := false
294         if booted || initialState == StateUnknown {
295                 ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
296         }
297         wkr.mtx.Lock()
298         defer wkr.mtx.Unlock()
299         if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
300                 logger.Info("probe reported broken instance")
301                 wkr.reportBootOutcome(BootOutcomeFailed)
302                 wkr.setIdleBehavior(IdleBehaviorDrain)
303         }
304         if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
305                 if wkr.state == StateShutdown && wkr.updated.After(updated) {
306                         // Skip the logging noise if shutdown was
307                         // initiated during probe.
308                         return
309                 }
310                 // Using the start time of the probe as the timeout
311                 // threshold ensures we always initiate at least one
312                 // probe attempt after the boot/probe timeout expires
313                 // (otherwise, a slow probe failure could cause us to
314                 // shut down an instance even though it did in fact
315                 // boot/recover before the timeout expired).
316                 dur := probeStart.Sub(wkr.probed)
317                 if wkr.shutdownIfBroken(dur) {
318                         // stderr from failed run-probes will have
319                         // been logged already, but some boot-probe
320                         // failures are normal so they are logged only
321                         // at Debug level. This may be our chance to
322                         // log some evidence about why the node never
323                         // booted, even in non-debug mode.
324                         if !booted {
325                                 wkr.reportBootOutcome(BootOutcomeFailed)
326                                 logger.WithFields(logrus.Fields{
327                                         "Duration": dur,
328                                         "stderr":   string(stderr),
329                                 }).WithError(errLast).Info("boot failed")
330                         }
331                 }
332                 return
333         }
334
335         updateTime := time.Now()
336         wkr.probed = updateTime
337
338         if updated != wkr.updated {
339                 // Worker was updated after the probe began, so
340                 // wkr.running might have a container UUID that was
341                 // not yet running when ctrUUIDs was generated. Leave
342                 // wkr.running alone and wait for the next probe to
343                 // catch up on any changes.
344                 logger.WithFields(logrus.Fields{
345                         "updated":     updated,
346                         "wkr.updated": wkr.updated,
347                 }).Debug("skipping worker state update due to probe/sync race")
348                 return
349         }
350
351         if len(ctrUUIDs) > 0 {
352                 wkr.busy = updateTime
353                 wkr.lastUUID = ctrUUIDs[0]
354         } else if len(wkr.running) > 0 {
355                 // Actual last-busy time was sometime between wkr.busy
356                 // and now. Now is the earliest opportunity to take
357                 // advantage of the non-busy state, though.
358                 wkr.busy = updateTime
359         }
360
361         changed := wkr.updateRunning(ctrUUIDs)
362
363         // Update state if this was the first successful boot-probe.
364         if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
365                 if wkr.state == StateBooting {
366                         wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
367                 }
368                 // Note: this will change again below if
369                 // len(wkr.starting)+len(wkr.running) > 0.
370                 wkr.state = StateIdle
371                 changed = true
372         }
373
374         // If wkr.state and wkr.running aren't changing then there's
375         // no need to log anything, notify the scheduler, move state
376         // back and forth between idle/running, etc.
377         if !changed {
378                 return
379         }
380
381         // Log whenever a run-probe reveals crunch-run processes
382         // appearing/disappearing before boot-probe succeeds.
383         if wkr.state == StateUnknown && changed {
384                 logger.WithFields(logrus.Fields{
385                         "RunningContainers": len(wkr.running),
386                         "State":             wkr.state,
387                 }).Info("crunch-run probe succeeded, but boot probe is still failing")
388         }
389
390         if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
391                 wkr.state = StateRunning
392         } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
393                 wkr.state = StateIdle
394         }
395         wkr.updated = updateTime
396         if booted && (initialState == StateUnknown || initialState == StateBooting) {
397                 wkr.reportBootOutcome(BootOutcomeSucceeded)
398                 logger.WithFields(logrus.Fields{
399                         "RunningContainers": len(wkr.running),
400                         "State":             wkr.state,
401                 }).Info("probes succeeded, instance is in service")
402         }
403         go wkr.wp.notify()
404 }
405
406 func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
407         cmd := wkr.wp.runnerCmd + " --list"
408         if u := wkr.instance.RemoteUser(); u != "root" {
409                 cmd = "sudo " + cmd
410         }
411         before := time.Now()
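        // Pass the instance's price history (if any) to crunch-run on
        // stdin, encoded as JSON.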
412         var stdin io.Reader
413         if prices := wkr.instance.PriceHistory(wkr.instType); len(prices) > 0 {
414                 j, _ := json.Marshal(prices)
415                 stdin = bytes.NewReader(j)
416         }
417         stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
418         if err != nil {
419                 wkr.logger.WithFields(logrus.Fields{
420                         "Command": cmd,
421                         "stdout":  string(stdout),
422                         "stderr":  string(stderr),
423                 }).WithError(err).Warn("probe failed")
424                 wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Since(before).Seconds())
425                 return
426         }
427         wkr.logger.WithFields(logrus.Fields{
428                 "Command": cmd,
429                 "stdout":  string(stdout),
430                 "stderr":  string(stderr),
431         }).Debug("probe succeeded")
432         wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Since(before).Seconds())
433         ok = true
434
435         staleRunLock := false
436         for _, s := range strings.Split(string(stdout), "\n") {
437                 // Each line of the "crunch-run --list" output is one
438                 // of the following:
439                 //
440                 // * a container UUID, indicating that processes
441                 //   related to that container are currently running.
442                 //   Optionally followed by " stale", indicating that
443                 //   the crunch-run process itself has exited (the
444                 //   remaining process is probably arv-mount).
445                 //
446                 // * the string "broken", indicating that the instance
447                 //   appears incapable of starting containers.
448                 //
449                 // See ListProcesses() in lib/crunchrun/background.go.
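                //
                // Example output (hypothetical UUIDs):
                //
                //   zzzzz-dz642-1234567890abcde
                //   zzzzz-dz642-fghijklmnopqrst stale
                //   broken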
450                 if s == "" {
451                         // empty string following final newline
452                 } else if s == "broken" {
453                         reportsBroken = true
454                 } else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
455                         // Ignore crunch-run processes that belong to
456                         // a different cluster (e.g., a single host
457                         // running multiple clusters with the loopback
458                         // driver)
459                         continue
460                 } else if toks := strings.Split(s, " "); len(toks) == 1 {
461                         running = append(running, s)
462                 } else if toks[1] == "stale" {
463                         wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
464                         staleRunLock = true
465                 }
466         }
467         wkr.mtx.Lock()
468         defer wkr.mtx.Unlock()
469         if !staleRunLock {
470                 wkr.staleRunLockSince = time.Time{}
471         } else if wkr.staleRunLockSince.IsZero() {
472                 wkr.staleRunLockSince = time.Now()
473         } else if dur := time.Since(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
474                 wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
475                 reportsBroken = true
476         }
477         return
478 }
479
480 func (wkr *worker) probeBooted() (stderr []byte, err error) {
481         cmd := wkr.wp.bootProbeCommand
482         if cmd == "" {
483                 cmd = "true"
484         }
485         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
486         logger := wkr.logger.WithFields(logrus.Fields{
487                 "Command": cmd,
488                 "stdout":  string(stdout),
489                 "stderr":  string(stderr),
490         })
491         if err != nil {
492                 if errors.Is(err, sshexecutor.ErrNoAddress) ||
493                         errors.As(err, new(*net.OpError)) ||
494                         errors.As(err, new(*ssh.ExitError)) {
495                         // These errors are expected while the
496                         // instance is booting, so we only log them at
497                         // debug level.
498                         logger.WithError(err).Debug("boot probe failed")
499                 } else {
500                         // Other errors are more likely to indicate a
501                         // configuration problem, and it's more
502                         // sysadmin-friendly to show them right away
503                         // instead of waiting until boot timeout and
504                         // only showing the last error.
505                         //
506                         // Example: "ssh: handshake failed: ssh:
507                         // unable to authenticate, attempted methods
508                         // [none publickey], no supported methods
509                         // remain"
510                         logger.WithError(err).Warn("boot probe failed")
511                 }
512                 return stderr, err
513         }
514         logger.Info("boot probe succeeded")
515         return stderr, nil
516 }
517
518 func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
519         if err = wkr.wp.loadRunnerData(); err != nil {
520                 wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
521                 return
522         } else if len(wkr.wp.runnerData) == 0 {
523                 // Assume crunch-run is already installed
524                 return
525         }
526
527         hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
528         dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
529         logger := wkr.logger.WithFields(logrus.Fields{
530                 "hash": hash,
531                 "path": wkr.wp.runnerCmd,
532         })
533
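        // md5sum prints "<hash>  <path>\n" (two spaces between the fields),
        // so a byte-for-byte comparison against that exact form tells us
        // whether the expected binary is already installed.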
534         stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
535         if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
536                 logger.Info("runner binary already exists on worker, with correct hash")
537                 return
538         }
539
540         // Note touch+chmod come before writing data, to avoid the
541         // possibility of md5 being correct while file mode is
542         // incorrect.
543         cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
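        // For non-root workers, wrap the script in `sudo sh -c '...'`,
        // escaping each embedded single quote as '\'' so the script
        // survives the extra level of quoting.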
544         if wkr.instance.RemoteUser() != "root" {
545                 cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
546         }
547         logger.WithField("cmd", cmd).Info("installing runner binary on worker")
548         stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
549         return
550 }
551
552 // caller must have lock.
553 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
554         if wkr.idleBehavior == IdleBehaviorHold {
555                 // Never shut down.
556                 return false
557         }
558         label, threshold := "", wkr.wp.timeoutProbe
559         if wkr.state == StateUnknown || wkr.state == StateBooting {
560                 label, threshold = "new ", wkr.wp.timeoutBooting
561         }
562         if dur < threshold {
563                 return false
564         }
565         wkr.logger.WithFields(logrus.Fields{
566                 "Duration": dur,
567                 "Since":    wkr.probed,
568                 "State":    wkr.state,
569         }).Warnf("%sinstance unresponsive, shutting down", label)
570         wkr.shutdown()
571         return true
572 }
573
574 // Returns true if the instance is eligible for shutdown: either it's
575 // been idle too long, or idleBehavior=Drain and nothing is running.
576 //
577 // caller must have lock.
578 func (wkr *worker) eligibleForShutdown() bool {
579         if wkr.idleBehavior == IdleBehaviorHold {
580                 return false
581         }
582         draining := wkr.idleBehavior == IdleBehaviorDrain
583         switch wkr.state {
584         case StateBooting:
585                 return draining
586         case StateIdle:
587                 return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
588         case StateRunning:
589                 if !draining {
590                         return false
591                 }
592                 for _, rr := range wkr.running {
593                         if !rr.givenup {
594                                 return false
595                         }
596                 }
597                 for _, rr := range wkr.starting {
598                         if !rr.givenup {
599                                 return false
600                         }
601                 }
602                 // draining, and all remaining runners are just trying
603                 // to force-kill their crunch-run procs
604                 return true
605         default:
606                 return false
607         }
608 }
609
610 // caller must have lock.
611 func (wkr *worker) shutdownIfIdle() bool {
612         if !wkr.eligibleForShutdown() {
613                 return false
614         }
615         wkr.logger.WithFields(logrus.Fields{
616                 "State":        wkr.state,
617                 "IdleDuration": stats.Duration(time.Since(wkr.busy)),
618                 "IdleBehavior": wkr.idleBehavior,
619         }).Info("shutdown worker")
620         wkr.reportBootOutcome(BootOutcomeAborted)
621         wkr.shutdown()
622         return true
623 }
624
625 // caller must have lock.
626 func (wkr *worker) shutdown() {
627         now := time.Now()
628         wkr.updated = now
629         wkr.destroyed = now
630         wkr.state = StateShutdown
631         go wkr.wp.notify()
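        // instance.Destroy() calls out to the cloud provider and may be
        // slow, so run it in a goroutine instead of holding the caller's
        // lock while it completes.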
632         go func() {
633                 err := wkr.instance.Destroy()
634                 if err != nil {
635                         wkr.logger.WithError(err).Warn("shutdown failed")
636                         return
637                 }
638         }()
639 }
640
641 // Save worker tags to cloud provider metadata, if they don't already
642 // match. Caller must have lock.
643 func (wkr *worker) saveTags() {
644         instance := wkr.instance
645         tags := instance.Tags()
646         update := cloud.InstanceTags{
647                 wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
648                 wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
649         }
650         save := false
651         for k, v := range update {
652                 if tags[k] != v {
653                         tags[k] = v
654                         save = true
655                 }
656         }
657         if save {
658                 go func() {
659                         err := instance.SetTags(tags)
660                         if err != nil {
661                                 wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
662                         }
663                 }()
664         }
665 }
666
667 func (wkr *worker) Close() {
668         // This might take time, so do it after unlocking mtx.
669         defer wkr.executor.Close()
670
671         wkr.mtx.Lock()
672         defer wkr.mtx.Unlock()
673         for uuid, rr := range wkr.running {
674                 wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
675                 rr.Close()
676                 delete(wkr.running, uuid)
677         }
678         for uuid, rr := range wkr.starting {
679                 wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
680                 rr.Close()
681                 delete(wkr.starting, uuid)
682         }
683 }
684
685 // Add/remove entries in wkr.running to match ctrUUIDs returned by a
686 // probe. Returns true if anything was added or removed.
687 //
688 // Caller must have lock.
689 func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
690         alive := map[string]bool{}
691         for _, uuid := range ctrUUIDs {
692                 alive[uuid] = true
693                 if _, ok := wkr.running[uuid]; ok {
694                         // unchanged
695                 } else if rr, ok := wkr.starting[uuid]; ok {
696                         wkr.running[uuid] = rr
697                         delete(wkr.starting, uuid)
698                         changed = true
699                 } else {
700                         // We didn't start it -- it must have been
701                         // started by a previous dispatcher process.
702                         wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
703                         wkr.running[uuid] = newRemoteRunner(uuid, wkr)
704                         changed = true
705                 }
706         }
707         for uuid := range wkr.running {
708                 if !alive[uuid] {
709                         wkr.closeRunner(uuid)
710                         changed = true
711                 }
712         }
713         return
714 }
715
716 // caller must have lock.
717 func (wkr *worker) closeRunner(uuid string) {
718         rr := wkr.running[uuid]
719         if rr == nil {
720                 return
721         }
722         wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
723         delete(wkr.running, uuid)
724         rr.Close()
725
726         now := time.Now()
727         wkr.updated = now
728         wkr.wp.exited[uuid] = now
729         if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
730                 wkr.state = StateIdle
731         }
732 }