Merge branch '18874-merge-wb2'
[arvados.git] / lib / dispatchcloud / worker / worker.go
index e6e817458ebf4606a6dd453561b5224452e53b8f..10a28157e43ee0e496fd82b8705542a58aa0c8ca 100644 (file)
@@ -188,6 +188,14 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
                }
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
+               if wkr.starting[ctr.UUID] != rr {
+                       // Someone else (e.g., wkr.probeAndUpdate() ->
+                       // wkr.updateRunning() or wkr.Close()) already
+                       // moved our runner from wkr.starting to
+                       // wkr.running or deleted it while we were in
+                       // rr.Start().
+                       return
+               }
                now := time.Now()
                wkr.updated = now
                wkr.busy = now
@@ -240,6 +248,7 @@ func (wkr *worker) probeAndUpdate() {
                ctrUUIDs []string
                ok       bool
                stderr   []byte // from probeBooted
+               errLast  error  // from probeBooted or copyRunnerData
        )
 
        switch initialState {
@@ -256,7 +265,8 @@ func (wkr *worker) probeAndUpdate() {
        logger := wkr.logger.WithField("ProbeStart", probeStart)
 
        if !booted {
-               booted, stderr = wkr.probeBooted()
+               stderr, errLast = wkr.probeBooted()
+               booted = errLast == nil
                shouldCopy := booted || initialState == StateUnknown
                if !booted {
                        // Pretend this probe succeeded if another
@@ -273,6 +283,7 @@ func (wkr *worker) probeAndUpdate() {
                        if err != nil {
                                booted = false
                                wkr.logger.WithError(err).WithField("stderr", string(stderrCopy)).Warn("error copying runner binary")
+                               errLast = err
                        }
                }
                if booted {
@@ -315,7 +326,7 @@ func (wkr *worker) probeAndUpdate() {
                                logger.WithFields(logrus.Fields{
                                        "Duration": dur,
                                        "stderr":   string(stderr),
-                               }).Info("boot failed")
+                               }).WithError(errLast).Info("boot failed")
                        }
                }
                return
@@ -466,7 +477,7 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
        return
 }
 
-func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
+func (wkr *worker) probeBooted() (stderr []byte, err error) {
        cmd := wkr.wp.bootProbeCommand
        if cmd == "" {
                cmd = "true"
@@ -498,10 +509,10 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
                        // remain"
                        logger.WithError(err).Warn("boot probe failed")
                }
-               return false, stderr
+               return stderr, err
        }
        logger.Info("boot probe succeeded")
-       return true, stderr
+       return stderr, nil
 }
 
 func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
@@ -544,9 +555,11 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
                // Never shut down.
                return false
        }
-       label, threshold := "", wkr.wp.timeoutProbe
+       prologue, epilogue, threshold := "", "", wkr.wp.timeoutProbe
        if wkr.state == StateUnknown || wkr.state == StateBooting {
-               label, threshold = "new ", wkr.wp.timeoutBooting
+               prologue = "new "
+               epilogue = " -- `arvados-server cloudtest` might help troubleshoot, see https://doc.arvados.org/main/admin/cloudtest.html"
+               threshold = wkr.wp.timeoutBooting
        }
        if dur < threshold {
                return false
@@ -555,7 +568,7 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
                "Duration": dur,
                "Since":    wkr.probed,
                "State":    wkr.state,
-       }).Warnf("%sinstance unresponsive, shutting down", label)
+       }).Warnf("%sinstance unresponsive, shutting down%s", prologue, epilogue)
        wkr.shutdown()
        return true
 }
@@ -662,10 +675,12 @@ func (wkr *worker) Close() {
        for uuid, rr := range wkr.running {
                wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
                rr.Close()
+               delete(wkr.running, uuid)
        }
        for uuid, rr := range wkr.starting {
                wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
                rr.Close()
+               delete(wkr.starting, uuid)
        }
 }