Allow multiple clusters to use loopback driver on same host.
[arvados.git] / lib / dispatchcloud / worker / worker.go
index 9199d4bafe764d806312638328cf13fd3b422e4d..b01a820cd619b172538b725d689d0323897611d5 100644 (file)
@@ -110,6 +110,7 @@ type worker struct {
        probing             chan struct{}
        bootOutcomeReported bool
        timeToReadyReported bool
+       staleRunLockSince   time.Time
 }
 
 func (wkr *worker) onUnkillable(uuid string) {
@@ -176,6 +177,9 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
        }
        go func() {
                rr.Start()
+               if wkr.wp.mTimeFromQueueToCrunchRun != nil {
+                       wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds())
+               }
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
@@ -188,7 +192,7 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
 }
 
 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
-// for the worker's curent state. If a previous probe is still
+// for the worker's current state. If a previous probe is still
 // running, it does nothing.
 //
 // It should be called in a new goroutine.
@@ -309,6 +313,10 @@ func (wkr *worker) probeAndUpdate() {
                // not yet running when ctrUUIDs was generated. Leave
                // wkr.running alone and wait for the next probe to
                // catch up on any changes.
+               logger.WithFields(logrus.Fields{
+                       "updated":     updated,
+                       "wkr.updated": wkr.updated,
+               }).Debug("skipping worker state update due to probe/sync race")
                return
        }
 
@@ -372,6 +380,7 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
        if u := wkr.instance.RemoteUser(); u != "root" {
                cmd = "sudo " + cmd
        }
+       before := time.Now()
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
@@ -379,16 +388,59 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
+               wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Now().Sub(before).Seconds())
                return
        }
+       wkr.logger.WithFields(logrus.Fields{
+               "Command": cmd,
+               "stdout":  string(stdout),
+               "stderr":  string(stderr),
+       }).Debug("probe succeeded")
+       wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Now().Sub(before).Seconds())
        ok = true
+
+       staleRunLock := false
        for _, s := range strings.Split(string(stdout), "\n") {
-               if s == "broken" {
+               // Each line of the "crunch-run --list" output is one
+               // of the following:
+               //
+               // * a container UUID, indicating that processes
+               //   related to that container are currently running.
+               //   Optionally followed by " stale", indicating that
+               //   the crunch-run process itself has exited (the
+               //   remaining process is probably arv-mount).
+               //
+               // * the string "broken", indicating that the instance
+               //   appears incapable of starting containers.
+               //
+               // See ListProcesses() in lib/crunchrun/background.go.
+               if s == "" {
+                       // empty string following final newline
+               } else if s == "broken" {
                        reportsBroken = true
-               } else if s != "" {
+               } else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
+                       // Ignore crunch-run processes that belong to
+                       // a different cluster (e.g., a single host
+                       // running multiple clusters with the loopback
+                       // driver)
+                       continue
+               } else if toks := strings.Split(s, " "); len(toks) == 1 {
                        running = append(running, s)
+               } else if toks[1] == "stale" {
+                       wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
+                       staleRunLock = true
                }
        }
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       if !staleRunLock {
+               wkr.staleRunLockSince = time.Time{}
+       } else if wkr.staleRunLockSince.IsZero() {
+               wkr.staleRunLockSince = time.Now()
+       } else if dur := time.Now().Sub(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
+               wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
+               reportsBroken = true
+       }
        return
 }