14807: Merge branch 'master'
[arvados.git] / lib / dispatchcloud / worker / worker.go
index baa56addeaab2cac4bbb0acb2a804d6828f7496a..9be9f41f43b7ef51cbb1d1257e4ac39f642472aa 100644 (file)
@@ -6,6 +6,7 @@ package worker
 
 import (
        "bytes"
+       "encoding/json"
        "fmt"
        "strings"
        "sync"
@@ -13,6 +14,7 @@ import (
 
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/stats"
        "github.com/sirupsen/logrus"
 )
 
@@ -96,7 +98,7 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
        })
-       logger = logger.WithField("Instance", wkr.instance)
+       logger = logger.WithField("Instance", wkr.instance.ID())
        logger.Debug("starting container")
        wkr.starting[ctr.UUID] = struct{}{}
        wkr.state = StateRunning
@@ -105,7 +107,19 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
                        "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
                        "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
                }
-               stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
+               if wkr.wp.arvClient.Insecure {
+                       env["ARVADOS_API_HOST_INSECURE"] = "1"
+               }
+               envJSON, err := json.Marshal(env)
+               if err != nil {
+                       panic(err)
+               }
+               stdin := bytes.NewBuffer(envJSON)
+               cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
+               if u := wkr.instance.RemoteUser(); u != "root" {
+                       cmd = "sudo " + cmd
+               }
+               stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
@@ -172,7 +186,7 @@ func (wkr *worker) probeAndUpdate() {
                booted   bool
                ctrUUIDs []string
                ok       bool
-               stderr   []byte
+               stderr   []byte // from probeBooted
        )
 
        switch initialState {
@@ -185,6 +199,9 @@ func (wkr *worker) probeAndUpdate() {
                panic(fmt.Sprintf("unknown state %s", initialState))
        }
 
+       probeStart := time.Now()
+       logger := wkr.logger.WithField("ProbeStart", probeStart)
+
        if !booted {
                booted, stderr = wkr.probeBooted()
                if !booted {
@@ -193,14 +210,14 @@ func (wkr *worker) probeAndUpdate() {
                        wkr.mtx.Lock()
                        booted = wkr.state == StateRunning || wkr.state == StateIdle
                        wkr.mtx.Unlock()
-               } else {
-                       wkr.logger.Info("instance booted; will try probeRunning")
+               }
+               if booted {
+                       logger.Info("instance booted; will try probeRunning")
                }
        }
        if booted || wkr.state == StateUnknown {
-               ctrUUIDs, ok, stderr = wkr.probeRunning()
+               ctrUUIDs, ok = wkr.probeRunning()
        }
-       logger := wkr.logger.WithField("stderr", string(stderr))
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
@@ -209,20 +226,27 @@ func (wkr *worker) probeAndUpdate() {
                        // initiated during probe.
                        return
                }
-               dur := time.Since(wkr.probed)
-               logger := logger.WithFields(logrus.Fields{
-                       "Duration": dur,
-                       "State":    wkr.state,
-               })
-               if !booted {
-                       // While we're polling the VM to see if it's
-                       // finished booting, failures are not
-                       // noteworthy, so we log at Debug level.
-                       logger.Debug("new instance not responding")
-               } else {
-                       logger.Info("instance not responding")
+               // Using the start time of the probe as the timeout
+               // threshold ensures we always initiate at least one
+               // probe attempt after the boot/probe timeout expires
+               // (otherwise, a slow probe failure could cause us to
+               // shutdown an instance even though it did in fact
+               // boot/recover before the timeout expired).
+               dur := probeStart.Sub(wkr.probed)
+               if wkr.shutdownIfBroken(dur) {
+                       // stderr from failed run-probes will have
+                       // been logged already, but boot-probe
+                       // failures are normal so they are logged only
+                       // at Debug level. This is our chance to log
+                       // some evidence about why the node never
+                       // booted, even in non-debug mode.
+                       if !booted {
+                               logger.WithFields(logrus.Fields{
+                                       "Duration": dur,
+                                       "stderr":   string(stderr),
+                               }).Info("boot failed")
+                       }
                }
-               wkr.shutdownIfBroken(dur)
                return
        }
 
@@ -247,11 +271,21 @@ func (wkr *worker) probeAndUpdate() {
                // advantage of the non-busy state, though.
                wkr.busy = updateTime
        }
-       running := map[string]struct{}{}
        changed := false
+
+       // Build a new "running" map. Set changed=true if it differs
+       // from the existing map (wkr.running) to ensure the scheduler
+       // gets notified below.
+       running := map[string]struct{}{}
        for _, uuid := range ctrUUIDs {
                running[uuid] = struct{}{}
                if _, ok := wkr.running[uuid]; !ok {
+                       if _, ok := wkr.starting[uuid]; !ok {
+                               // We didn't start it -- it must have
+                               // been started by a previous
+                               // dispatcher process.
+                               logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
+                       }
                        changed = true
                }
        }
@@ -262,20 +296,30 @@ func (wkr *worker) probeAndUpdate() {
                        changed = true
                }
        }
+
+       // Update state if this was the first successful boot-probe.
        if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
                // Note: this will change again below if
                // len(wkr.starting)+len(wkr.running) > 0.
                wkr.state = StateIdle
                changed = true
-       } else if wkr.state == StateUnknown && len(running) != len(wkr.running) {
+       }
+
+       // If wkr.state and wkr.running aren't changing then there's
+       // no need to log anything, notify the scheduler, move state
+       // back and forth between idle/running, etc.
+       if !changed {
+               return
+       }
+
+       // Log whenever a run-probe reveals crunch-run processes
+       // appearing/disappearing before boot-probe succeeds.
+       if wkr.state == StateUnknown && len(running) != len(wkr.running) {
                logger.WithFields(logrus.Fields{
                        "RunningContainers": len(running),
                        "State":             wkr.state,
                }).Info("crunch-run probe succeeded, but boot probe is still failing")
        }
-       if !changed {
-               return
-       }
 
        wkr.running = running
        if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
@@ -293,8 +337,11 @@ func (wkr *worker) probeAndUpdate() {
        go wkr.wp.notify()
 }
 
-func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
+func (wkr *worker) probeRunning() (running []string, ok bool) {
        cmd := "crunch-run --list"
+       if u := wkr.instance.RemoteUser(); u != "root" {
+               cmd = "sudo " + cmd
+       }
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
@@ -302,13 +349,13 @@ func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
-               return nil, false, stderr
+               return nil, false
        }
        stdout = bytes.TrimRight(stdout, "\n")
        if len(stdout) == 0 {
-               return nil, true, stderr
+               return nil, true
        }
-       return strings.Split(string(stdout), "\n"), true, stderr
+       return strings.Split(string(stdout), "\n"), true
 }
 
 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
@@ -331,16 +378,17 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
 }
 
 // caller must have lock.
-func (wkr *worker) shutdownIfBroken(dur time.Duration) {
+func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
        if wkr.idleBehavior == IdleBehaviorHold {
-               return
+               // Never shut down.
+               return false
        }
        label, threshold := "", wkr.wp.timeoutProbe
        if wkr.state == StateUnknown || wkr.state == StateBooting {
                label, threshold = "new ", wkr.wp.timeoutBooting
        }
        if dur < threshold {
-               return
+               return false
        }
        wkr.logger.WithFields(logrus.Fields{
                "Duration": dur,
@@ -348,22 +396,28 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) {
                "State":    wkr.state,
        }).Warnf("%sinstance unresponsive, shutting down", label)
        wkr.shutdown()
+       return true
 }
 
 // caller must have lock.
 func (wkr *worker) shutdownIfIdle() bool {
        if wkr.idleBehavior == IdleBehaviorHold {
-               return false
-       }
-       if !(wkr.state == StateIdle || (wkr.state == StateBooting && wkr.idleBehavior == IdleBehaviorDrain)) {
+               // Never shut down.
                return false
        }
        age := time.Since(wkr.busy)
-       if wkr.idleBehavior != IdleBehaviorDrain && age < wkr.wp.timeoutIdle {
+
+       old := age >= wkr.wp.timeoutIdle
+       draining := wkr.idleBehavior == IdleBehaviorDrain
+       shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
+               (draining && wkr.state == StateBooting)
+       if !shouldShutdown {
                return false
        }
+
        wkr.logger.WithFields(logrus.Fields{
-               "Age":          age,
+               "State":        wkr.state,
+               "IdleDuration": stats.Duration(age),
                "IdleBehavior": wkr.idleBehavior,
        }).Info("shutdown idle worker")
        wkr.shutdown()
@@ -390,22 +444,31 @@ func (wkr *worker) shutdown() {
 // match. Caller must have lock.
 func (wkr *worker) saveTags() {
        instance := wkr.instance
-       have := instance.Tags()
-       want := cloud.InstanceTags{
+       // Copy the current tags instead of modifying the map
+       // returned by Tags(): that map may be nil (assigning to
+       // it would panic) or may be shared with the cloud driver,
+       // and SetTags below uses it from another goroutine.
+       tags := cloud.InstanceTags{}
+       for k, v := range instance.Tags() {
+               tags[k] = v
+       }
+       update := cloud.InstanceTags{
                tagKeyInstanceType: wkr.instType.Name,
                tagKeyIdleBehavior: string(wkr.idleBehavior),
        }
-       go func() {
-               for k, v := range want {
-                       if v == have[k] {
-                               continue
-                       }
-                       err := instance.SetTags(want)
+       save := false
+       for k, v := range update {
+               if tags[k] != v {
+                       tags[k] = v
+                       save = true
+               }
+       }
+       if save {
+               go func() {
+                       err := instance.SetTags(tags)
                        if err != nil {
-                               wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags")
+                               wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
                        }
-                       break
-
-               }
-       }()
+               }()
+       }
 }