Allow multiple clusters to use loopback driver on same host.
[arvados.git] / lib / dispatchcloud / worker / worker.go
index 03ab15176f5297b85182d3689b71f5a3f0195004..b01a820cd619b172538b725d689d0323897611d5 100644 (file)
@@ -5,14 +5,16 @@
 package worker
 
 import (
+       "bytes"
        "fmt"
+       "path/filepath"
        "strings"
        "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/lib/cloud"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/stats"
+       "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/stats"
        "github.com/sirupsen/logrus"
 )
 
@@ -52,6 +54,23 @@ func (s State) MarshalText() ([]byte, error) {
        return []byte(stateString[s]), nil
 }
 
+// BootOutcome is the result of a worker boot. It is used as a label in a metric.
+type BootOutcome string
+
+const (
+       BootOutcomeFailed      BootOutcome = "failure"
+       BootOutcomeSucceeded   BootOutcome = "success"
+       BootOutcomeAborted     BootOutcome = "aborted"
+       BootOutcomeDisappeared BootOutcome = "disappeared"
+)
+
+var validBootOutcomes = map[BootOutcome]bool{
+       BootOutcomeFailed:      true,
+       BootOutcomeSucceeded:   true,
+       BootOutcomeAborted:     true,
+       BootOutcomeDisappeared: true,
+}
+
 // IdleBehavior indicates the behavior desired when a node becomes idle.
 type IdleBehavior string
 
@@ -72,22 +91,26 @@ type worker struct {
        executor Executor
        wp       *Pool
 
-       mtx          sync.Locker // must be wp's Locker.
-       state        State
-       idleBehavior IdleBehavior
-       instance     cloud.Instance
-       instType     arvados.InstanceType
-       vcpus        int64
-       memory       int64
-       appeared     time.Time
-       probed       time.Time
-       updated      time.Time
-       busy         time.Time
-       destroyed    time.Time
-       lastUUID     string
-       running      map[string]*remoteRunner // remember to update state idle<->running when this changes
-       starting     map[string]*remoteRunner // remember to update state idle<->running when this changes
-       probing      chan struct{}
+       mtx                 sync.Locker // must be wp's Locker.
+       state               State
+       idleBehavior        IdleBehavior
+       instance            cloud.Instance
+       instType            arvados.InstanceType
+       vcpus               int64
+       memory              int64
+       appeared            time.Time
+       probed              time.Time
+       updated             time.Time
+       busy                time.Time
+       destroyed           time.Time
+       firstSSHConnection  time.Time
+       lastUUID            string
+       running             map[string]*remoteRunner // remember to update state idle<->running when this changes
+       starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
+       probing             chan struct{}
+       bootOutcomeReported bool
+       timeToReadyReported bool
+       staleRunLockSince   time.Time
 }
 
 func (wkr *worker) onUnkillable(uuid string) {
@@ -109,6 +132,28 @@ func (wkr *worker) onKilled(uuid string) {
        go wkr.wp.notify()
 }
 
+// caller must have lock.
+func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
+       if wkr.bootOutcomeReported {
+               return
+       }
+       if wkr.wp.mBootOutcomes != nil {
+               wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
+       }
+       wkr.bootOutcomeReported = true
+}
+
+// caller must have lock.
+func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
+       if wkr.timeToReadyReported {
+               return
+       }
+       if wkr.wp.mTimeToSSH != nil {
+               wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
+       }
+       wkr.timeToReadyReported = true
+}
+
 // caller must have lock.
 func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
        wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
@@ -132,6 +177,9 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
        }
        go func() {
                rr.Start()
+               if wkr.wp.mTimeFromQueueToCrunchRun != nil {
+                       wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds())
+               }
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
@@ -144,7 +192,7 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
 }
 
 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
-// for the worker's curent state. If a previous probe is still
+// for the worker's current state. If a previous probe is still
 // running, it does nothing.
 //
 // It should be called in a new goroutine.
@@ -222,6 +270,7 @@ func (wkr *worker) probeAndUpdate() {
        defer wkr.mtx.Unlock()
        if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
                logger.Info("probe reported broken instance")
+               wkr.reportBootOutcome(BootOutcomeFailed)
                wkr.setIdleBehavior(IdleBehaviorDrain)
        }
        if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
@@ -245,6 +294,7 @@ func (wkr *worker) probeAndUpdate() {
                        // some evidence about why the node never
                        // booted, even in non-debug mode.
                        if !booted {
+                               wkr.reportBootOutcome(BootOutcomeFailed)
                                logger.WithFields(logrus.Fields{
                                        "Duration": dur,
                                        "stderr":   string(stderr),
@@ -263,6 +313,10 @@ func (wkr *worker) probeAndUpdate() {
                // not yet running when ctrUUIDs was generated. Leave
                // wkr.running alone and wait for the next probe to
                // catch up on any changes.
+               logger.WithFields(logrus.Fields{
+                       "updated":     updated,
+                       "wkr.updated": wkr.updated,
+               }).Debug("skipping worker state update due to probe/sync race")
                return
        }
 
@@ -280,6 +334,9 @@ func (wkr *worker) probeAndUpdate() {
 
        // Update state if this was the first successful boot-probe.
        if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
+               if wkr.state == StateBooting {
+                       wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
+               }
                // Note: this will change again below if
                // len(wkr.starting)+len(wkr.running) > 0.
                wkr.state = StateIdle
@@ -309,6 +366,7 @@ func (wkr *worker) probeAndUpdate() {
        }
        wkr.updated = updateTime
        if booted && (initialState == StateUnknown || initialState == StateBooting) {
+               wkr.reportBootOutcome(BootOutcomeSucceeded)
                logger.WithFields(logrus.Fields{
                        "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
@@ -318,10 +376,11 @@ func (wkr *worker) probeAndUpdate() {
 }
 
 func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
-       cmd := "crunch-run --list"
+       cmd := wkr.wp.runnerCmd + " --list"
        if u := wkr.instance.RemoteUser(); u != "root" {
                cmd = "sudo " + cmd
        }
+       before := time.Now()
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
@@ -329,16 +388,59 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
+               wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Now().Sub(before).Seconds())
                return
        }
+       wkr.logger.WithFields(logrus.Fields{
+               "Command": cmd,
+               "stdout":  string(stdout),
+               "stderr":  string(stderr),
+       }).Debug("probe succeeded")
+       wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Now().Sub(before).Seconds())
        ok = true
+
+       staleRunLock := false
        for _, s := range strings.Split(string(stdout), "\n") {
-               if s == "broken" {
+               // Each line of the "crunch-run --list" output is one
+               // of the following:
+               //
+               // * a container UUID, indicating that processes
+               //   related to that container are currently running.
+               //   Optionally followed by " stale", indicating that
+               //   the crunch-run process itself has exited (the
+               //   remaining process is probably arv-mount).
+               //
+               // * the string "broken", indicating that the instance
+               //   appears incapable of starting containers.
+               //
+               // See ListProcesses() in lib/crunchrun/background.go.
+               if s == "" {
+                       // empty string following final newline
+               } else if s == "broken" {
                        reportsBroken = true
-               } else if s != "" {
+               } else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
+                       // Ignore crunch-run processes that belong to
+                       // a different cluster (e.g., a single host
+                       // running multiple clusters with the loopback
+                       // driver)
+                       continue
+               } else if toks := strings.Split(s, " "); len(toks) == 1 {
                        running = append(running, s)
+               } else if toks[1] == "stale" {
+                       wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
+                       staleRunLock = true
                }
        }
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       if !staleRunLock {
+               wkr.staleRunLockSince = time.Time{}
+       } else if wkr.staleRunLockSince.IsZero() {
+               wkr.staleRunLockSince = time.Now()
+       } else if dur := time.Now().Sub(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
+               wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
+               reportsBroken = true
+       }
        return
 }
 
@@ -358,9 +460,46 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
                return false, stderr
        }
        logger.Info("boot probe succeeded")
+       if err = wkr.wp.loadRunnerData(); err != nil {
+               wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
+               return false, stderr
+       } else if len(wkr.wp.runnerData) == 0 {
+               // Assume crunch-run is already installed
+       } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
+               wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
+               return false, stderr2
+       } else {
+               stderr = append(stderr, stderr2...)
+       }
        return true, stderr
 }
 
+func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
+       hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
+       dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
+       logger := wkr.logger.WithFields(logrus.Fields{
+               "hash": hash,
+               "path": wkr.wp.runnerCmd,
+       })
+
+       stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
+       if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
+               logger.Info("runner binary already exists on worker, with correct hash")
+               return
+       }
+
+       // Note touch+chmod come before writing data, to avoid the
+       // possibility of md5 being correct while file mode is
+       // incorrect.
+       cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
+       if wkr.instance.RemoteUser() != "root" {
+               cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
+       }
+       logger.WithField("cmd", cmd).Info("installing runner binary on worker")
+       stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
+       return
+}
+
 // caller must have lock.
 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
        if wkr.idleBehavior == IdleBehaviorHold {
@@ -429,6 +568,7 @@ func (wkr *worker) shutdownIfIdle() bool {
                "IdleDuration": stats.Duration(time.Since(wkr.busy)),
                "IdleBehavior": wkr.idleBehavior,
        }).Info("shutdown worker")
+       wkr.reportBootOutcome(BootOutcomeAborted)
        wkr.shutdown()
        return true
 }