Allow multiple clusters to use loopback driver on same host.
[arvados.git] / lib / dispatchcloud / worker / worker.go
index f70e2099843fdb0fdb2f511f49b8ea58fb05c6e8..b01a820cd619b172538b725d689d0323897611d5 100644 (file)
@@ -6,14 +6,15 @@ package worker
 
 import (
        "bytes"
-       "encoding/json"
        "fmt"
+       "path/filepath"
        "strings"
        "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/lib/cloud"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/stats"
        "github.com/sirupsen/logrus"
 )
 
@@ -53,6 +54,23 @@ func (s State) MarshalText() ([]byte, error) {
        return []byte(stateString[s]), nil
 }
 
+// BootOutcome is the result of a worker boot. It is used as a label in a metric.
+type BootOutcome string
+
+const (
+       BootOutcomeFailed      BootOutcome = "failure"
+       BootOutcomeSucceeded   BootOutcome = "success"
+       BootOutcomeAborted     BootOutcome = "aborted"
+       BootOutcomeDisappeared BootOutcome = "disappeared"
+)
+
+var validBootOutcomes = map[BootOutcome]bool{
+       BootOutcomeFailed:      true,
+       BootOutcomeSucceeded:   true,
+       BootOutcomeAborted:     true,
+       BootOutcomeDisappeared: true,
+}
+
 // IdleBehavior indicates the behavior desired when a node becomes idle.
 type IdleBehavior string
 
@@ -73,22 +91,75 @@ type worker struct {
        executor Executor
        wp       *Pool
 
-       mtx          sync.Locker // must be wp's Locker.
-       state        State
-       idleBehavior IdleBehavior
-       instance     cloud.Instance
-       instType     arvados.InstanceType
-       vcpus        int64
-       memory       int64
-       appeared     time.Time
-       probed       time.Time
-       updated      time.Time
-       busy         time.Time
-       destroyed    time.Time
-       lastUUID     string
-       running      map[string]struct{} // remember to update state idle<->running when this changes
-       starting     map[string]struct{} // remember to update state idle<->running when this changes
-       probing      chan struct{}
+       mtx                 sync.Locker // must be wp's Locker.
+       state               State
+       idleBehavior        IdleBehavior
+       instance            cloud.Instance
+       instType            arvados.InstanceType
+       vcpus               int64
+       memory              int64
+       appeared            time.Time
+       probed              time.Time
+       updated             time.Time
+       busy                time.Time
+       destroyed           time.Time
+       firstSSHConnection  time.Time
+       lastUUID            string
+       running             map[string]*remoteRunner // remember to update state idle<->running when this changes
+       starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
+       probing             chan struct{}
+       bootOutcomeReported bool
+       timeToReadyReported bool
+       staleRunLockSince   time.Time
+}
+
+func (wkr *worker) onUnkillable(uuid string) {
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       logger := wkr.logger.WithField("ContainerUUID", uuid)
+       if wkr.idleBehavior == IdleBehaviorHold {
+               logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
+               return
+       }
+       logger.Warn("unkillable container, draining worker")
+       wkr.setIdleBehavior(IdleBehaviorDrain)
+}
+
+func (wkr *worker) onKilled(uuid string) {
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       wkr.closeRunner(uuid)
+       go wkr.wp.notify()
+}
+
+// caller must have lock.
+func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
+       if wkr.bootOutcomeReported {
+               return
+       }
+       if wkr.wp.mBootOutcomes != nil {
+               wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
+       }
+       wkr.bootOutcomeReported = true
+}
+
+// caller must have lock.
+func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
+       if wkr.timeToReadyReported {
+               return
+       }
+       if wkr.wp.mTimeToSSH != nil {
+               wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
+       }
+       wkr.timeToReadyReported = true
+}
+
+// caller must have lock.
+func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
+       wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
+       wkr.idleBehavior = idleBehavior
+       wkr.saveTags()
+       wkr.shutdownIfIdle()
 }
 
 // caller must have lock.
@@ -97,54 +168,31 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
        })
-       logger = logger.WithField("Instance", wkr.instance.ID())
        logger.Debug("starting container")
-       wkr.starting[ctr.UUID] = struct{}{}
-       wkr.state = StateRunning
+       rr := newRemoteRunner(ctr.UUID, wkr)
+       wkr.starting[ctr.UUID] = rr
+       if wkr.state != StateRunning {
+               wkr.state = StateRunning
+               go wkr.wp.notify()
+       }
        go func() {
-               env := map[string]string{
-                       "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
-                       "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
-               }
-               if wkr.wp.arvClient.Insecure {
-                       env["ARVADOS_API_HOST_INSECURE"] = "1"
-               }
-               envJSON, err := json.Marshal(env)
-               if err != nil {
-                       panic(err)
+               rr.Start()
+               if wkr.wp.mTimeFromQueueToCrunchRun != nil {
+                       wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds())
                }
-               stdin := bytes.NewBuffer(envJSON)
-               cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
-               if u := wkr.instance.RemoteUser(); u != "root" {
-                       cmd = "sudo " + cmd
-               }
-               stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
                wkr.updated = now
                wkr.busy = now
                delete(wkr.starting, ctr.UUID)
-               wkr.running[ctr.UUID] = struct{}{}
-               wkr.lastUUID = ctr.UUID
-               if err != nil {
-                       logger.WithField("stdout", string(stdout)).
-                               WithField("stderr", string(stderr)).
-                               WithError(err).
-                               Error("error starting crunch-run process")
-                       // Leave uuid in wkr.running, though: it's
-                       // possible the error was just a communication
-                       // failure and the process was in fact
-                       // started.  Wait for next probe to find out.
-                       return
-               }
-               logger.Info("crunch-run process started")
+               wkr.running[ctr.UUID] = rr
                wkr.lastUUID = ctr.UUID
        }()
 }
 
 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
-// for the worker's curent state. If a previous probe is still
+// for the worker's current state. If a previous probe is still
 // running, it does nothing.
 //
 // It should be called in a new goroutine.
@@ -214,11 +262,17 @@ func (wkr *worker) probeAndUpdate() {
                        logger.Info("instance booted; will try probeRunning")
                }
        }
+       reportedBroken := false
        if booted || wkr.state == StateUnknown {
-               ctrUUIDs, ok = wkr.probeRunning()
+               ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
        }
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
+       if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
+               logger.Info("probe reported broken instance")
+               wkr.reportBootOutcome(BootOutcomeFailed)
+               wkr.setIdleBehavior(IdleBehaviorDrain)
+       }
        if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
                if wkr.state == StateShutdown && wkr.updated.After(updated) {
                        // Skip the logging noise if shutdown was
@@ -240,6 +294,7 @@ func (wkr *worker) probeAndUpdate() {
                        // some evidence about why the node never
                        // booted, even in non-debug mode.
                        if !booted {
+                               wkr.reportBootOutcome(BootOutcomeFailed)
                                logger.WithFields(logrus.Fields{
                                        "Duration": dur,
                                        "stderr":   string(stderr),
@@ -258,6 +313,10 @@ func (wkr *worker) probeAndUpdate() {
                // not yet running when ctrUUIDs was generated. Leave
                // wkr.running alone and wait for the next probe to
                // catch up on any changes.
+               logger.WithFields(logrus.Fields{
+                       "updated":     updated,
+                       "wkr.updated": wkr.updated,
+               }).Debug("skipping worker state update due to probe/sync race")
                return
        }
 
@@ -270,34 +329,14 @@ func (wkr *worker) probeAndUpdate() {
                // advantage of the non-busy state, though.
                wkr.busy = updateTime
        }
-       changed := false
 
-       // Build a new "running" map. Set changed=true if it differs
-       // from the existing map (wkr.running) to ensure the scheduler
-       // gets notified below.
-       running := map[string]struct{}{}
-       for _, uuid := range ctrUUIDs {
-               running[uuid] = struct{}{}
-               if _, ok := wkr.running[uuid]; !ok {
-                       if _, ok := wkr.starting[uuid]; !ok {
-                               // We didn't start it -- it must have
-                               // been started by a previous
-                               // dispatcher process.
-                               logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
-                       }
-                       changed = true
-               }
-       }
-       for uuid := range wkr.running {
-               if _, ok := running[uuid]; !ok {
-                       logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
-                       wkr.wp.notifyExited(uuid, updateTime)
-                       changed = true
-               }
-       }
+       changed := wkr.updateRunning(ctrUUIDs)
 
        // Update state if this was the first successful boot-probe.
        if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
+               if wkr.state == StateBooting {
+                       wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
+               }
                // Note: this will change again below if
                // len(wkr.starting)+len(wkr.running) > 0.
                wkr.state = StateIdle
@@ -313,14 +352,13 @@ func (wkr *worker) probeAndUpdate() {
 
        // Log whenever a run-probe reveals crunch-run processes
        // appearing/disappearing before boot-probe succeeds.
-       if wkr.state == StateUnknown && len(running) != len(wkr.running) {
+       if wkr.state == StateUnknown && changed {
                logger.WithFields(logrus.Fields{
-                       "RunningContainers": len(running),
+                       "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("crunch-run probe succeeded, but boot probe is still failing")
        }
 
-       wkr.running = running
        if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
                wkr.state = StateRunning
        } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
@@ -328,19 +366,21 @@ func (wkr *worker) probeAndUpdate() {
        }
        wkr.updated = updateTime
        if booted && (initialState == StateUnknown || initialState == StateBooting) {
+               wkr.reportBootOutcome(BootOutcomeSucceeded)
                logger.WithFields(logrus.Fields{
-                       "RunningContainers": len(running),
+                       "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("probes succeeded, instance is in service")
        }
        go wkr.wp.notify()
 }
 
-func (wkr *worker) probeRunning() (running []string, ok bool) {
-       cmd := "crunch-run --list"
+func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
+       cmd := wkr.wp.runnerCmd + " --list"
        if u := wkr.instance.RemoteUser(); u != "root" {
                cmd = "sudo " + cmd
        }
+       before := time.Now()
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
@@ -348,13 +388,60 @@ func (wkr *worker) probeRunning() (running []string, ok bool) {
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
-               return nil, false
+               wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Now().Sub(before).Seconds())
+               return
        }
-       stdout = bytes.TrimRight(stdout, "\n")
-       if len(stdout) == 0 {
-               return nil, true
+       wkr.logger.WithFields(logrus.Fields{
+               "Command": cmd,
+               "stdout":  string(stdout),
+               "stderr":  string(stderr),
+       }).Debug("probe succeeded")
+       wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Now().Sub(before).Seconds())
+       ok = true
+
+       staleRunLock := false
+       for _, s := range strings.Split(string(stdout), "\n") {
+               // Each line of the "crunch-run --list" output is one
+               // of the following:
+               //
+               // * a container UUID, indicating that processes
+               //   related to that container are currently running.
+               //   Optionally followed by " stale", indicating that
+               //   the crunch-run process itself has exited (the
+               //   remaining process is probably arv-mount).
+               //
+               // * the string "broken", indicating that the instance
+               //   appears incapable of starting containers.
+               //
+               // See ListProcesses() in lib/crunchrun/background.go.
+               if s == "" {
+                       // empty string following final newline
+               } else if s == "broken" {
+                       reportsBroken = true
+               } else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
+                       // Ignore crunch-run processes that belong to
+                       // a different cluster (e.g., a single host
+                       // running multiple clusters with the loopback
+                       // driver)
+                       continue
+               } else if toks := strings.Split(s, " "); len(toks) == 1 {
+                       running = append(running, s)
+               } else if toks[1] == "stale" {
+                       wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
+                       staleRunLock = true
+               }
+       }
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       if !staleRunLock {
+               wkr.staleRunLockSince = time.Time{}
+       } else if wkr.staleRunLockSince.IsZero() {
+               wkr.staleRunLockSince = time.Now()
+       } else if dur := time.Now().Sub(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
+               wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
+               reportsBroken = true
        }
-       return strings.Split(string(stdout), "\n"), true
+       return
 }
 
 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
@@ -373,9 +460,46 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
                return false, stderr
        }
        logger.Info("boot probe succeeded")
+       if err = wkr.wp.loadRunnerData(); err != nil {
+               wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
+               return false, stderr
+       } else if len(wkr.wp.runnerData) == 0 {
+               // Assume crunch-run is already installed
+       } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
+               wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
+               return false, stderr2
+       } else {
+               stderr = append(stderr, stderr2...)
+       }
        return true, stderr
 }
 
+func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
+       hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
+       dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
+       logger := wkr.logger.WithFields(logrus.Fields{
+               "hash": hash,
+               "path": wkr.wp.runnerCmd,
+       })
+
+       stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
+       if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
+               logger.Info("runner binary already exists on worker, with correct hash")
+               return
+       }
+
+       // Note touch+chmod come before writing data, to avoid the
+       // possibility of md5 being correct while file mode is
+       // incorrect.
+       cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
+       if wkr.instance.RemoteUser() != "root" {
+               cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
+       }
+       logger.WithField("cmd", cmd).Info("installing runner binary on worker")
+       stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
+       return
+}
+
 // caller must have lock.
 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
        if wkr.idleBehavior == IdleBehaviorHold {
@@ -398,27 +522,53 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
        return true
 }
 
+// Returns true if the instance is eligible for shutdown: either it's
+// been idle too long, or idleBehavior=Drain and nothing is running.
+//
 // caller must have lock.
-func (wkr *worker) shutdownIfIdle() bool {
+func (wkr *worker) eligibleForShutdown() bool {
        if wkr.idleBehavior == IdleBehaviorHold {
-               // Never shut down.
                return false
        }
-       age := time.Since(wkr.busy)
-
-       old := age >= wkr.wp.timeoutIdle
        draining := wkr.idleBehavior == IdleBehaviorDrain
-       shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
-               (draining && wkr.state == StateBooting)
-       if !shouldShutdown {
+       switch wkr.state {
+       case StateBooting:
+               return draining
+       case StateIdle:
+               return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
+       case StateRunning:
+               if !draining {
+                       return false
+               }
+               for _, rr := range wkr.running {
+                       if !rr.givenup {
+                               return false
+                       }
+               }
+               for _, rr := range wkr.starting {
+                       if !rr.givenup {
+                               return false
+                       }
+               }
+               // draining, and all remaining runners are just trying
+               // to force-kill their crunch-run procs
+               return true
+       default:
                return false
        }
+}
 
+// caller must have lock.
+func (wkr *worker) shutdownIfIdle() bool {
+       if !wkr.eligibleForShutdown() {
+               return false
+       }
        wkr.logger.WithFields(logrus.Fields{
                "State":        wkr.state,
-               "Age":          age,
+               "IdleDuration": stats.Duration(time.Since(wkr.busy)),
                "IdleBehavior": wkr.idleBehavior,
-       }).Info("shutdown idle worker")
+       }).Info("shutdown worker")
+       wkr.reportBootOutcome(BootOutcomeAborted)
        wkr.shutdown()
        return true
 }
@@ -445,8 +595,8 @@ func (wkr *worker) saveTags() {
        instance := wkr.instance
        tags := instance.Tags()
        update := cloud.InstanceTags{
-               tagKeyInstanceType: wkr.instType.Name,
-               tagKeyIdleBehavior: string(wkr.idleBehavior),
+               wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
+               wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
        }
        save := false
        for k, v := range update {
@@ -464,3 +614,68 @@ func (wkr *worker) saveTags() {
                }()
        }
 }
+
+func (wkr *worker) Close() {
+       // This might take time, so do it after unlocking mtx.
+       defer wkr.executor.Close()
+
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       for uuid, rr := range wkr.running {
+               wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+               rr.Close()
+       }
+       for uuid, rr := range wkr.starting {
+               wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+               rr.Close()
+       }
+}
+
+// Add/remove entries in wkr.running to match ctrUUIDs returned by a
+// probe. Returns true if anything was added or removed.
+//
+// Caller must have lock.
+func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
+       alive := map[string]bool{}
+       for _, uuid := range ctrUUIDs {
+               alive[uuid] = true
+               if _, ok := wkr.running[uuid]; ok {
+                       // unchanged
+               } else if rr, ok := wkr.starting[uuid]; ok {
+                       wkr.running[uuid] = rr
+                       delete(wkr.starting, uuid)
+                       changed = true
+               } else {
+                       // We didn't start it -- it must have been
+                       // started by a previous dispatcher process.
+                       wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
+                       wkr.running[uuid] = newRemoteRunner(uuid, wkr)
+                       changed = true
+               }
+       }
+       for uuid := range wkr.running {
+               if !alive[uuid] {
+                       wkr.closeRunner(uuid)
+                       changed = true
+               }
+       }
+       return
+}
+
+// caller must have lock.
+func (wkr *worker) closeRunner(uuid string) {
+       rr := wkr.running[uuid]
+       if rr == nil {
+               return
+       }
+       wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
+       delete(wkr.running, uuid)
+       rr.Close()
+
+       now := time.Now()
+       wkr.updated = now
+       wkr.wp.exited[uuid] = now
+       if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+               wkr.state = StateIdle
+       }
+}