// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/stats"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)

const (
	// TODO: configurable
	maxPingFailTime = 10 * time.Minute
)

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
)

var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
	return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
	return []byte(stateString[s]), nil
}

// BootOutcome is the result of a worker boot. It is used as a label in a metric.
type BootOutcome string

const (
	BootOutcomeFailed      BootOutcome = "failure"
	BootOutcomeSucceeded   BootOutcome = "success"
	BootOutcomeAborted     BootOutcome = "aborted"
	BootOutcomeDisappeared BootOutcome = "disappeared"
)

var validBootOutcomes = map[BootOutcome]bool{
	BootOutcomeFailed:      true,
	BootOutcomeSucceeded:   true,
	BootOutcomeAborted:     true,
	BootOutcomeDisappeared: true,
}

// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shut down on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shut down or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shut down immediately when idle
)

var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,
}

type worker struct {
	logger   logrus.FieldLogger
	executor Executor
	wp       *Pool

	mtx                 sync.Locker // must be wp's Locker.
	state               State
	idleBehavior        IdleBehavior
	instance            cloud.Instance
	instType            arvados.InstanceType
	vcpus               int64
	memory              int64
	appeared            time.Time
	probed              time.Time
	updated             time.Time
	busy                time.Time
	destroyed           time.Time
	firstSSHConnection  time.Time
	lastUUID            string
	running             map[string]*remoteRunner // remember to update state idle<->running when this changes
	starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
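	// probing is used by ProbeAndUpdate as a non-blocking
	// try-lock so at most one probe runs at a time.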
	probing             chan struct{}
	bootOutcomeReported bool
	timeToReadyReported bool
	staleRunLockSince   time.Time
}

func (wkr *worker) onUnkillable(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	logger := wkr.logger.WithField("ContainerUUID", uuid)
	if wkr.idleBehavior == IdleBehaviorHold {
		logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
		return
	}
	logger.Warn("unkillable container, draining worker")
	wkr.setIdleBehavior(IdleBehaviorDrain)
}

func (wkr *worker) onKilled(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	wkr.closeRunner(uuid)
	go wkr.wp.notify()
}

// caller must have lock.
func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
	if wkr.bootOutcomeReported {
		return
	}
	if wkr.wp.mBootOutcomes != nil {
		wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
	}
	wkr.bootOutcomeReported = true
}

// caller must have lock.
func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
	if wkr.timeToReadyReported {
		return
	}
	if wkr.wp.mTimeToReadyForContainer != nil {
		wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
	}
	wkr.timeToReadyReported = true
}

// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
	wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
	wkr.idleBehavior = idleBehavior
	wkr.saveTags()
	wkr.shutdownIfIdle()
}

// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
	logger := wkr.logger.WithFields(logrus.Fields{
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	})
	logger.Debug("starting container")
	rr := newRemoteRunner(ctr.UUID, wkr)
	wkr.starting[ctr.UUID] = rr
	if wkr.state != StateRunning {
		wkr.state = StateRunning
		go wkr.wp.notify()
	}
	go func() {
		rr.Start()
		if wkr.wp.mTimeFromQueueToCrunchRun != nil {
			wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds())
		}
		wkr.mtx.Lock()
		defer wkr.mtx.Unlock()
		if wkr.starting[ctr.UUID] != rr {
			// Someone else (e.g., wkr.probeAndUpdate() ->
			// wkr.updateRunning() or wkr.Close()) already
			// moved our runner from wkr.starting to
			// wkr.running or deleted it while we were in
			// rr.Start().
			return
		}
		now := time.Now()
		wkr.updated = now
		wkr.busy = now
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = rr
		wkr.lastUUID = ctr.UUID
	}()
}

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
	select {
	case wkr.probing <- struct{}{}:
		wkr.probeAndUpdate()
		<-wkr.probing
	default:
		wkr.logger.Debug("still waiting for last probe to finish")
	}
}

// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
	wkr.mtx.Lock()
	updated := wkr.updated
	initialState := wkr.state
	wkr.mtx.Unlock()

	var (
		booted   bool
		ctrUUIDs []string
		ok       bool
		stderr   []byte // from probeBooted
		errLast  error  // from probeBooted or copyRunnerData
	)

	switch initialState {
	case StateShutdown:
		return
	case StateIdle, StateRunning:
		booted = true
	case StateUnknown, StateBooting:
	default:
		panic(fmt.Sprintf("unknown state %s", initialState))
	}

	probeStart := time.Now()
	logger := wkr.logger.WithField("ProbeStart", probeStart)

	if !booted {
		stderr, errLast = wkr.probeBooted()
		booted = errLast == nil
		shouldCopy := booted || initialState == StateUnknown
		if !booted {
			// Pretend this probe succeeded if another
			// concurrent attempt succeeded.
			wkr.mtx.Lock()
			if wkr.state == StateRunning || wkr.state == StateIdle {
				booted = true
				shouldCopy = false
			}
			wkr.mtx.Unlock()
		}
		if shouldCopy {
			_, stderrCopy, err := wkr.copyRunnerData()
			if err != nil {
				booted = false
				wkr.logger.WithError(err).WithField("stderr", string(stderrCopy)).Warn("error copying runner binary")
				errLast = err
			}
		}
		if booted {
			logger.Info("instance booted; will try probeRunning")
		}
	}
	reportedBroken := false
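	// Even when the boot probe failed, a worker in StateUnknown
	// gets a run probe: it may have been set up by a previous
	// dispatcher process and still be running containers.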
	if booted || initialState == StateUnknown {
		ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
	}
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
		logger.Info("probe reported broken instance")
		wkr.reportBootOutcome(BootOutcomeFailed)
		wkr.setIdleBehavior(IdleBehaviorDrain)
	}
	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
			return
		}
		// Using the start time of the probe as the timeout
		// threshold ensures we always initiate at least one
		// probe attempt after the boot/probe timeout expires
		// (otherwise, a slow probe failure could cause us to
		// shut down an instance even though it did in fact
		// boot/recover before the timeout expired).
		dur := probeStart.Sub(wkr.probed)
		if wkr.shutdownIfBroken(dur) {
			// stderr from failed run-probes will have
			// been logged already, but some boot-probe
			// failures are normal so they are logged only
			// at Debug level. This may be our chance to
			// log some evidence about why the node never
			// booted, even in non-debug mode.
			if !booted {
				wkr.reportBootOutcome(BootOutcomeFailed)
				logger.WithFields(logrus.Fields{
					"Duration": dur,
					"stderr":   string(stderr),
				}).WithError(errLast).Info("boot failed")
			}
		}
		return
	}

	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		logger.WithFields(logrus.Fields{
			"updated":     updated,
			"wkr.updated": wkr.updated,
		}).Debug("skipping worker state update due to probe/sync race")
		return
	}

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime
	}

	changed := wkr.updateRunning(ctrUUIDs)

	// Update state if this was the first successful boot-probe.
	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
		if wkr.state == StateBooting {
			wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
		}
		// Note: this will change again below if
		// len(wkr.starting)+len(wkr.running) > 0.
		wkr.state = StateIdle
		changed = true
	}

	// If wkr.state and wkr.running aren't changing then there's
	// no need to log anything, notify the scheduler, move state
	// back and forth between idle/running, etc.
	if !changed {
		return
	}

	// Log whenever a run-probe reveals crunch-run processes
	// appearing/disappearing before boot-probe succeeds.
	if wkr.state == StateUnknown && changed {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("crunch-run probe succeeded, but boot probe is still failing")
	}

	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
		wkr.state = StateRunning
	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
		wkr.state = StateIdle
	}
	wkr.updated = updateTime
	if booted && (initialState == StateUnknown || initialState == StateBooting) {
		wkr.reportBootOutcome(BootOutcomeSucceeded)
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("probes succeeded, instance is in service")
	}
	go wkr.wp.notify()
}

func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
	cmd := wkr.wp.runnerCmd + " --list"
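	// When the configured remote user is not root, crunch-run
	// must be invoked via sudo (copyRunnerData installs the
	// binary the same way).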
	if u := wkr.instance.RemoteUser(); u != "root" {
		cmd = "sudo " + cmd
	}
	before := time.Now()
	var stdin io.Reader
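	// Send the instance's price history to crunch-run on stdin,
	// presumably so it can keep per-container cost accounting up
	// to date.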
	if prices := wkr.instance.PriceHistory(wkr.instType); len(prices) > 0 {
		j, _ := json.Marshal(prices)
		stdin = bytes.NewReader(j)
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
	if err != nil {
		wkr.logger.WithFields(logrus.Fields{
			"Command": cmd,
			"stdout":  string(stdout),
			"stderr":  string(stderr),
		}).WithError(err).Warn("probe failed")
		wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Since(before).Seconds())
		return
	}
	wkr.logger.WithFields(logrus.Fields{
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	}).Debug("probe succeeded")
	wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Since(before).Seconds())
	ok = true

	staleRunLock := false
	for _, s := range strings.Split(string(stdout), "\n") {
		// Each line of the "crunch-run --list" output is one
		// of the following:
		//
		// * a container UUID, indicating that processes
		//   related to that container are currently running.
		//   Optionally followed by " stale", indicating that
		//   the crunch-run process itself has exited (the
		//   remaining process is probably arv-mount).
		//
		// * the string "broken", indicating that the instance
		//   appears incapable of starting containers.
		//
		// See ListProcesses() in lib/crunchrun/background.go.
		if s == "" {
			// empty string following final newline
		} else if s == "broken" {
			reportsBroken = true
		} else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
			// Ignore crunch-run processes that belong to
			// a different cluster (e.g., a single host
			// running multiple clusters with the loopback
			// driver)
			continue
		} else if toks := strings.Split(s, " "); len(toks) == 1 {
			running = append(running, s)
		} else if toks[1] == "stale" {
			wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
			staleRunLock = true
		}
	}
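	// Track how long a stale run lock has been reported. A brief
	// stale lock is tolerated (the leftover process is probably
	// arv-mount finishing up), but one that persists longer than
	// timeoutStaleRunLock marks the instance as broken.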
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if !staleRunLock {
		wkr.staleRunLockSince = time.Time{}
	} else if wkr.staleRunLockSince.IsZero() {
		wkr.staleRunLockSince = time.Now()
	} else if dur := time.Since(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
		wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
		reportsBroken = true
	}
	return
}

func (wkr *worker) probeBooted() (stderr []byte, err error) {
	cmd := wkr.wp.bootProbeCommand
	if cmd == "" {
		cmd = "true"
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	logger := wkr.logger.WithFields(logrus.Fields{
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	})
	if err != nil {
		if errors.Is(err, sshexecutor.ErrNoAddress) ||
			errors.As(err, new(*net.OpError)) ||
			errors.As(err, new(*ssh.ExitError)) {
			// These errors are expected while the
			// instance is booting, so we only log them at
			// debug level.
			logger.WithError(err).Debug("boot probe failed")
		} else {
			// Other errors are more likely to indicate a
			// configuration problem, and it's more
			// sysadmin-friendly to show them right away
			// instead of waiting until boot timeout and
			// only showing the last error.
			//
			// Example: "ssh: handshake failed: ssh:
			// unable to authenticate, attempted methods
			// [none publickey], no supported methods
			// remain"
			logger.WithError(err).Warn("boot probe failed")
		}
		return stderr, err
	}
	logger.Info("boot probe succeeded")
	return stderr, nil
}

func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
	if err = wkr.wp.loadRunnerData(); err != nil {
		wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
		return
	} else if len(wkr.wp.runnerData) == 0 {
		// Assume crunch-run is already installed
		return
	}

	hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
	dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
	logger := wkr.logger.WithFields(logrus.Fields{
		"hash": hash,
		"path": wkr.wp.runnerCmd,
	})

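	// md5sum prints "<hash>  <path>"; if stdout matches exactly
	// (and stderr is empty), the correct binary is already
	// installed and the copy can be skipped.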
	stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
	if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
		logger.Info("runner binary already exists on worker, with correct hash")
		return
	}

	// Note touch+chmod come before writing data, to avoid the
	// possibility of md5 being correct while file mode is
	// incorrect.
	cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
	if wkr.instance.RemoteUser() != "root" {
		cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
	}
	logger.WithField("cmd", cmd).Info("installing runner binary on worker")
	stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
	return
}

// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		// Never shut down.
		return false
	}
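	// Instances that haven't finished booting are judged against
	// the boot timeout, and the log message points at
	// `arvados-server cloudtest` for troubleshooting.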
	prologue, epilogue, threshold := "", "", wkr.wp.timeoutProbe
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		prologue = "new "
		epilogue = " -- `arvados-server cloudtest` might help troubleshoot, see https://doc.arvados.org/main/admin/cloudtest.html"
		threshold = wkr.wp.timeoutBooting
	}
	if dur < threshold {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"Duration": dur,
		"Since":    wkr.probed,
		"State":    wkr.state,
	}).Warnf("%sinstance unresponsive, shutting down%s", prologue, epilogue)
	wkr.shutdown()
	return true
}

// Returns true if the instance is eligible for shutdown: either it's
// been idle too long, or idleBehavior=Drain and nothing is running.
//
// caller must have lock.
func (wkr *worker) eligibleForShutdown() bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		return false
	}
	draining := wkr.idleBehavior == IdleBehaviorDrain
	switch wkr.state {
	case StateBooting:
		return draining
	case StateIdle:
		return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
	case StateRunning:
		if !draining {
			return false
		}
		for _, rr := range wkr.running {
			if !rr.givenup {
				return false
			}
		}
		for _, rr := range wkr.starting {
			if !rr.givenup {
				return false
			}
		}
		// draining, and all remaining runners are just trying
		// to force-kill their crunch-run procs
		return true
	default:
		return false
	}
}

// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
	if !wkr.eligibleForShutdown() {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"State":        wkr.state,
		"IdleDuration": stats.Duration(time.Since(wkr.busy)),
		"IdleBehavior": wkr.idleBehavior,
	}).Info("shutdown worker")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return true
}

// caller must have lock.
func (wkr *worker) shutdown() {
	now := time.Now()
	wkr.updated = now
	wkr.destroyed = now
	wkr.state = StateShutdown
	go wkr.wp.notify()
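	// Destroy the cloud instance in the background: the caller
	// holds the pool lock, and Destroy may involve a slow cloud
	// API call.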
	go func() {
		err := wkr.instance.Destroy()
		if err != nil {
			wkr.logger.WithError(err).Warn("shutdown failed")
			return
		}
	}()
}

// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
	instance := wkr.instance
	tags := instance.Tags()
	update := cloud.InstanceTags{
		wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
		wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
	}
	save := false
	for k, v := range update {
		if tags[k] != v {
			tags[k] = v
			save = true
		}
	}
	if save {
		go func() {
			err := instance.SetTags(tags)
			if err != nil {
				wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
			}
		}()
	}
}

func (wkr *worker) Close() {
	// This might take time, so do it after unlocking mtx.
	defer wkr.executor.Close()

	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	for uuid, rr := range wkr.running {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
		rr.Close()
		delete(wkr.running, uuid)
	}
	for uuid, rr := range wkr.starting {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
		rr.Close()
		delete(wkr.starting, uuid)
	}
}

// Add/remove entries in wkr.running to match ctrUUIDs returned by a
// probe. Returns true if anything was added or removed.
//
// Caller must have lock.
func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
	alive := map[string]bool{}
	for _, uuid := range ctrUUIDs {
		alive[uuid] = true
		if _, ok := wkr.running[uuid]; ok {
			// unchanged
		} else if rr, ok := wkr.starting[uuid]; ok {
			wkr.running[uuid] = rr
			delete(wkr.starting, uuid)
			changed = true
		} else {
			// We didn't start it -- it must have been
			// started by a previous dispatcher process.
			wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
			wkr.running[uuid] = newRemoteRunner(uuid, wkr)
			changed = true
		}
	}
	for uuid := range wkr.running {
		if !alive[uuid] {
			wkr.closeRunner(uuid)
			changed = true
		}
	}
	return
}

// caller must have lock.
func (wkr *worker) closeRunner(uuid string) {
	rr := wkr.running[uuid]
	if rr == nil {
		return
	}
	wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
	delete(wkr.running, uuid)
	rr.Close()

	now := time.Now()
	wkr.updated = now
	wkr.wp.exited[uuid] = now
	if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
		wkr.state = StateIdle
	}
}