package worker
import (
+ "bytes"
+ "strings"
+ "sync"
"time"
+
+ "git.curoverse.com/arvados.git/lib/cloud"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/Sirupsen/logrus"
)
// State indicates whether a worker is available to do work, and (if
func (s State) MarshalText() ([]byte, error) {
return []byte(stateString[s]), nil
}
+
+type worker struct {
+ logger logrus.FieldLogger
+ executor Executor
+ wp *Pool
+
+ mtx sync.Locker // must be wp's Locker.
+ state State
+ instance cloud.Instance
+ instType arvados.InstanceType
+ vcpus int64
+ memory int64
+ probed time.Time
+ updated time.Time
+ busy time.Time
+ lastUUID string
+ running map[string]struct{} // remember to update state idle<->running when this changes
+ starting map[string]struct{} // remember to update state idle<->running when this changes
+ probing chan struct{}
+}
+
+// caller must have lock.
+func (wkr *worker) startContainer(ctr arvados.Container) {
+ logger := wkr.logger.WithFields(logrus.Fields{
+ "ContainerUUID": ctr.UUID,
+ "Priority": ctr.Priority,
+ })
+ logger = logger.WithField("Instance", wkr.instance)
+ logger.Debug("starting container")
+ wkr.starting[ctr.UUID] = struct{}{}
+ wkr.state = StateRunning
+ go func() {
+ stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ now := time.Now()
+ wkr.updated = now
+ wkr.busy = now
+ delete(wkr.starting, ctr.UUID)
+ wkr.running[ctr.UUID] = struct{}{}
+ wkr.lastUUID = ctr.UUID
+ if err != nil {
+ logger.WithField("stdout", string(stdout)).
+ WithField("stderr", string(stderr)).
+ WithError(err).
+ Error("error starting crunch-run process")
+ // Leave uuid in wkr.running, though: it's
+ // possible the error was just a communication
+ // failure and the process was in fact
+ // started. Wait for next probe to find out.
+ return
+ }
+ logger.Info("crunch-run process started")
+ wkr.lastUUID = ctr.UUID
+ }()
+}
+
+// ProbeAndUpdate conducts appropriate boot/running probes (if any)
+// for the worker's curent state. If a previous probe is still
+// running, it does nothing.
+//
+// It should be called in a new goroutine.
+func (wkr *worker) ProbeAndUpdate() {
+ select {
+ case wkr.probing <- struct{}{}:
+ wkr.probeAndUpdate()
+ <-wkr.probing
+ default:
+ wkr.logger.Debug("still waiting for last probe to finish")
+ }
+}
+
+// should be called in a new goroutine
+func (wkr *worker) probeAndUpdate() {
+ wkr.mtx.Lock()
+ updated := wkr.updated
+ needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
+ needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
+ wkr.mtx.Unlock()
+ if !needProbeBooted && !needProbeRunning {
+ return
+ }
+
+ var (
+ ctrUUIDs []string
+ ok bool
+ stderr []byte
+ )
+ if needProbeBooted {
+ ok, stderr = wkr.probeBooted()
+ wkr.mtx.Lock()
+ if ok || wkr.state == StateRunning || wkr.state == StateIdle {
+ wkr.logger.Info("instance booted; will try probeRunning")
+ needProbeRunning = true
+ }
+ wkr.mtx.Unlock()
+ }
+ if needProbeRunning {
+ ctrUUIDs, ok, stderr = wkr.probeRunning()
+ }
+ logger := wkr.logger.WithField("stderr", string(stderr))
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ if !ok {
+ if wkr.state == StateShutdown && wkr.updated.After(updated) {
+ // Skip the logging noise if shutdown was
+ // initiated during probe.
+ return
+ }
+ dur := time.Since(wkr.probed)
+ logger := logger.WithFields(logrus.Fields{
+ "Duration": dur,
+ "State": wkr.state,
+ })
+ if wkr.state == StateBooting {
+ logger.Debug("new instance not responding")
+ } else {
+ logger.Info("instance not responding")
+ }
+ wkr.shutdownIfBroken(dur)
+ return
+ }
+
+ updateTime := time.Now()
+ wkr.probed = updateTime
+
+ if updated != wkr.updated {
+ // Worker was updated after the probe began, so
+ // wkr.running might have a container UUID that was
+ // not yet running when ctrUUIDs was generated. Leave
+ // wkr.running alone and wait for the next probe to
+ // catch up on any changes.
+ return
+ }
+
+ if len(ctrUUIDs) > 0 {
+ wkr.busy = updateTime
+ wkr.lastUUID = ctrUUIDs[0]
+ } else if len(wkr.running) > 0 {
+ // Actual last-busy time was sometime between wkr.busy
+ // and now. Now is the earliest opportunity to take
+ // advantage of the non-busy state, though.
+ wkr.busy = updateTime
+ }
+ running := map[string]struct{}{}
+ changed := false
+ for _, uuid := range ctrUUIDs {
+ running[uuid] = struct{}{}
+ if _, ok := wkr.running[uuid]; !ok {
+ changed = true
+ }
+ }
+ for uuid := range wkr.running {
+ if _, ok := running[uuid]; !ok {
+ logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
+ wkr.wp.notifyExited(uuid, updateTime)
+ changed = true
+ }
+ }
+ if wkr.state == StateUnknown || wkr.state == StateBooting {
+ wkr.state = StateIdle
+ changed = true
+ }
+ if changed {
+ wkr.running = running
+ if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
+ wkr.state = StateRunning
+ } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
+ wkr.state = StateIdle
+ }
+ wkr.updated = updateTime
+ go wkr.wp.notify()
+ }
+}
+
+func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
+ cmd := "crunch-run --list"
+ stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+ if err != nil {
+ wkr.logger.WithFields(logrus.Fields{
+ "Command": cmd,
+ "stdout": string(stdout),
+ "stderr": string(stderr),
+ }).WithError(err).Warn("probe failed")
+ return nil, false, stderr
+ }
+ stdout = bytes.TrimRight(stdout, "\n")
+ if len(stdout) == 0 {
+ return nil, true, stderr
+ }
+ return strings.Split(string(stdout), "\n"), true, stderr
+}
+
+func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
+ cmd := wkr.wp.bootProbeCommand
+ if cmd == "" {
+ cmd = "true"
+ }
+ stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+ logger := wkr.logger.WithFields(logrus.Fields{
+ "Command": cmd,
+ "stdout": string(stdout),
+ "stderr": string(stderr),
+ })
+ if err != nil {
+ logger.WithError(err).Debug("boot probe failed")
+ return false, stderr
+ }
+ logger.Info("boot probe succeeded")
+ return true, stderr
+}
+
+// caller must have lock.
+func (wkr *worker) shutdownIfBroken(dur time.Duration) {
+ if wkr.state == StateHold {
+ return
+ }
+ label, threshold := "", wkr.wp.timeoutProbe
+ if wkr.state == StateBooting {
+ label, threshold = "new ", wkr.wp.timeoutBooting
+ }
+ if dur < threshold {
+ return
+ }
+ wkr.logger.WithFields(logrus.Fields{
+ "Duration": dur,
+ "Since": wkr.probed,
+ "State": wkr.state,
+ }).Warnf("%sinstance unresponsive, shutting down", label)
+ wkr.shutdown()
+}
+
+// caller must have lock.
+func (wkr *worker) shutdownIfIdle() bool {
+ if wkr.state != StateIdle {
+ return false
+ }
+ age := time.Since(wkr.busy)
+ if age < wkr.wp.timeoutIdle {
+ return false
+ }
+ wkr.logger.WithField("Age", age).Info("shutdown idle worker")
+ wkr.shutdown()
+ return true
+}
+
+// caller must have lock
+func (wkr *worker) shutdown() {
+ wkr.updated = time.Now()
+ wkr.state = StateShutdown
+ go func() {
+ err := wkr.instance.Destroy()
+ if err != nil {
+ wkr.logger.WithError(err).Warn("shutdown failed")
+ return
+ }
+ }()
+}