// Copyright (C) The Arvados Authors. All rights reserved.
// SPDX-License-Identifier: AGPL-3.0

package worker

import (


const (
	// maxPingFailTime is a fixed duration of 10 minutes.
	// NOTE(review): it is not referenced in this part of the file; the
	// name suggests a limit on how long pings may keep failing --
	// confirm at the use site.
	// TODO: configurable
	maxPingFailTime = 10 * time.Minute

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
//
// The constants below are numbered with iota, so StateUnknown is the
// zero value of State.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance

// stateString maps each State constant to the lowercase name that
// State.String and State.MarshalText return for it.
var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",

// String implements fmt.Stringer. It returns the name listed for s in
// stateString, or the empty string if s has no entry there.
func (s State) String() string {
	return stateString[s]

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
//
// The text is looked up in stateString (so it is empty for a State
// that has no entry there), and the returned error is always nil.
func (s State) MarshalText() ([]byte, error) {
	return []byte(stateString[s]), nil

// BootOutcome is the result of a worker boot. It is used as a label in a metric.
//
// Because the underlying type is string, string(outcome) yields the
// literal assigned to each constant below.
type BootOutcome string

const (
	BootOutcomeFailed      BootOutcome = "failure"
	BootOutcomeSucceeded   BootOutcome = "success"
	BootOutcomeAborted     BootOutcome = "aborted"
	BootOutcomeDisappeared BootOutcome = "disappeared"

// validBootOutcomes is the set of recognized BootOutcome values: it
// has a true entry for each of the four BootOutcome constants, so a
// lookup of any other value yields false.
var validBootOutcomes = map[BootOutcome]bool{
	BootOutcomeFailed:      true,
	BootOutcomeSucceeded:   true,
	BootOutcomeAborted:     true,
	BootOutcomeDisappeared: true,

// IdleBehavior indicates the behavior desired when a node becomes idle.
//
// Because the underlying type is string, string(behavior) yields the
// literal assigned to each constant below.
type IdleBehavior string

const (
	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle

// validIdleBehavior is the set of recognized IdleBehavior values: it
// has a true entry for each of the three IdleBehavior constants, so a
// lookup of any other value yields false.
var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,

// worker holds everything tracked for a single cloud instance
// (referenced by the instance field) on behalf of the Pool wp.
//
// Notes on selected fields, based on how they are used in this file:
//   - updated is set to the current time when the worker's state or
//     container maps are changed (startContainer, probeAndUpdate,
//     shutdown, closeRunner). probeAndUpdate compares it before and
//     after probing to detect changes made while the probe ran.
//   - probed is set by probeAndUpdate when it applies probe results;
//     the time elapsed since then is what shutdownIfBroken receives.
//   - busy is set when a container is started and when a probe
//     reports running containers (or finds that the last ones just
//     went away); idle time is measured from it.
//   - destroyed is set by shutdown.
//   - lastUUID is the UUID of the most recently started container, or
//     the first UUID reported by the latest probe.
//   - starting and running are keyed by container UUID. A runner is
//     placed in starting by startContainer and later moved to running.
//   - probing is used by ProbeAndUpdate to avoid overlapping probes.
//   - staleRunLockSince is the time probeRunning first saw a stale
//     run lock; it is reset to the zero time when a probe sees none.
//   - bootOutcomeReported and timeToReadyReported are set by
//     reportBootOutcome and
//     reportTimeBetweenFirstSSHAndReadyForContainer respectively.
type worker struct {
	logger   logrus.FieldLogger
	executor Executor
	wp       *Pool

	mtx                 sync.Locker // must be wp's Locker.
	state               State
	idleBehavior        IdleBehavior
	instance            cloud.Instance
	instType            arvados.InstanceType
	vcpus               int64
	memory              int64
	appeared            time.Time
	probed              time.Time
	updated             time.Time
	busy                time.Time
	destroyed           time.Time
	firstSSHConnection  time.Time
	lastUUID            string
	running             map[string]*remoteRunner // remember to update state idle<->running when this changes
	starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
	probing             chan struct{}
	bootOutcomeReported bool
	timeToReadyReported bool
	staleRunLockSince   time.Time

// onUnkillable is called with the UUID of a container that could not
// be killed. It logs a warning tagged with that UUID; the message
// depends on whether the worker's idle behavior is Hold.
//
// wkr.mtx is unlocked when the function returns.
func (wkr *worker) onUnkillable(uuid string) {
	defer wkr.mtx.Unlock()
	logger := wkr.logger.WithField("ContainerUUID", uuid)
	if wkr.idleBehavior == IdleBehaviorHold {
		logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
	logger.Warn("unkillable container, draining worker")

// onKilled is called with the UUID of a container.
// NOTE(review): presumably one whose crunch-run process has been
// killed -- confirm against the caller.
//
// It invokes the pool's notify method in a separate goroutine, and
// unlocks wkr.mtx when it returns.
func (wkr *worker) onKilled(uuid string) {
	defer wkr.mtx.Unlock()
	go wkr.wp.notify()

// reportBootOutcome records the given outcome of this worker's boot.
//
// It consults wkr.bootOutcomeReported before doing anything else and
// sets it at the end, so the outcome is presumably recorded at most
// once per worker. The pool's mBootOutcomes metric is only touched if
// it is non-nil.
//
// caller must have lock.
func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
	if wkr.bootOutcomeReported {
	if wkr.wp.mBootOutcomes != nil {
	wkr.bootOutcomeReported = true

// reportTimeBetweenFirstSSHAndReadyForContainer reports a timing
// metric for this worker.
// NOTE(review): judging by the name, the value is the time from the
// first SSH connection (wkr.firstSSHConnection) until the worker is
// ready to run containers -- confirm.
//
// It consults wkr.timeToReadyReported before doing anything else and
// sets it at the end, so the metric is presumably reported at most
// once per worker. Metrics are only touched after checking that the
// pool's mTimeToSSH is non-nil.
//
// caller must have lock.
func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
	if wkr.timeToReadyReported {
	if wkr.wp.mTimeToSSH != nil {
	wkr.timeToReadyReported = true

// setIdleBehavior logs the requested idle behavior at Info level and
// stores it in wkr.idleBehavior.
//
// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
	wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
	wkr.idleBehavior = idleBehavior

// startContainer begins running ctr on this worker.
//
// It creates a remoteRunner for the container, records it in
// wkr.starting, and switches the worker to StateRunning (notifying
// the pool) if it was in any other state. The remaining work happens
// in a new goroutine, which moves the runner from wkr.starting to
// wkr.running and updates wkr.updated, wkr.busy and wkr.lastUUID.
//
// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
	logger := wkr.logger.WithFields(logrus.Fields{
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	logger.Debug("starting container")
	rr := newRemoteRunner(ctr.UUID, wkr)
	// Track the runner as "starting" until the goroutine below
	// promotes it to "running".
	wkr.starting[ctr.UUID] = rr
	if wkr.state != StateRunning {
		wkr.state = StateRunning
		go wkr.wp.notify()
	go func() {
		if wkr.wp.mTimeFromQueueToCrunchRun != nil {
		// wkr.mtx is released when this goroutine finishes.
		defer wkr.mtx.Unlock()
		now := time.Now()
		wkr.updated = now
		wkr.busy = now
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = rr
		wkr.lastUUID = ctr.UUID

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
	// Sending on wkr.probing claims the right to probe.
	// NOTE(review): presumably the channel has a buffer of one, so
	// the send only succeeds when no other probe is in progress --
	// confirm where the channel is created.
	select {
	case wkr.probing <- struct{}{}:
		wkr.logger.Debug("still waiting for last probe to finish")

// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
	// Snapshot the update time and state before probing, so that
	// changes made to the worker while the probes run can be detected
	// further down.
	updated := wkr.updated
	initialState := wkr.state

	var (
		booted   bool
		ctrUUIDs []string
		ok       bool
		stderr   []byte // from probeBooted

	// Decide from the snapshot whether a boot probe is still needed:
	// a worker that is already idle or running counts as booted.
	switch initialState {
	case StateShutdown:
	case StateIdle, StateRunning:
		booted = true
	case StateUnknown, StateBooting:
		panic(fmt.Sprintf("unknown state %s", initialState))

	probeStart := time.Now()
	logger := wkr.logger.WithField("ProbeStart", probeStart)

	if !booted {
		booted, stderr = wkr.probeBooted()
		if !booted {
			// Pretend this probe succeeded if another
			// concurrent attempt succeeded.
			booted = wkr.state == StateRunning || wkr.state == StateIdle
		if booted {
			logger.Info("instance booted; will try probeRunning")
	reportedBroken := false
	if booted || wkr.state == StateUnknown {
		ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
	defer wkr.mtx.Unlock()
	// A "broken" report is only acted on for workers whose idle
	// behavior is Run.
	if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
		logger.Info("probe reported broken instance")
	// Failure handling: the run probe failed, or the instance has not
	// booted and no containers are known or reported on it.
	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
		// Using the start time of the probe as the timeout
		// threshold ensures we always initiate at least one
		// probe attempt after the boot/probe timeout expires
		// (otherwise, a slow probe failure could cause us to
		// shutdown an instance even though it did in fact
		// boot/recover before the timeout expired).
		dur := probeStart.Sub(wkr.probed)
		if wkr.shutdownIfBroken(dur) {
			// stderr from failed run-probes will have
			// been logged already, but boot-probe
			// failures are normal so they are logged only
			// at Debug level. This is our chance to log
			// some evidence about why the node never
			// booted, even in non-debug mode.
			if !booted {
					"Duration": dur,
					"stderr":   string(stderr),
				}).Info("boot failed")

	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
			"updated":     updated,
			"wkr.updated": wkr.updated,
		}).Debug("skipping worker state update due to probe/sync race")

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime

	// Reconcile wkr.running with the probe results; changed records
	// whether anything was added or removed.
	changed := wkr.updateRunning(ctrUUIDs)

	// Update state if this was the first successful boot-probe.
	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
		if wkr.state == StateBooting {
		// Note: this will change again below if
		// len(wkr.starting)+len(wkr.running) > 0.
		wkr.state = StateIdle
		changed = true

	// If wkr.state and wkr.running aren't changing then there's
	// no need to log anything, notify the scheduler, move state
	// back and forth between idle/running, etc.
	if !changed {

	// Log whenever a run-probe reveals crunch-run processes
	// appearing/disappearing before boot-probe succeeds.
	if wkr.state == StateUnknown && changed {
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("crunch-run probe succeeded, but boot probe is still failing")

	// Keep wkr.state consistent with whether any containers are
	// starting or running.
	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
		wkr.state = StateRunning
	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
		wkr.state = StateIdle
	wkr.updated = updateTime
	// Announce the transition into service only when this probe was
	// the one that found the instance booted.
	if booted && (initialState == StateUnknown || initialState == StateBooting) {
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("probes succeeded, instance is in service")
	go wkr.wp.notify()

// probeRunning runs the pool's runner command with " --list" on the
// instance (prefixed with "sudo " unless the remote user is root) and
// parses its output.
//
// Return values:
//   - running: the output lines that start with the cluster ID and
//     consist of a single token, i.e. the container UUIDs reported
//     as running.
//   - reportsBroken: true if the output contains a "broken" line, or
//     if stale run locks have been reported for longer than the
//     pool's timeoutStaleRunLock.
//   - ok: set to true after the command's error result has been
//     checked.
//
// wkr.staleRunLockSince is updated as a side effect, and wkr.mtx is
// unlocked when the function returns.
func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
	cmd := wkr.wp.runnerCmd + " --list"
	if u := wkr.instance.RemoteUser(); u != "root" {
		cmd = "sudo " + cmd
	before := time.Now()
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	if err != nil {
			"Command": cmd,
			"stdout":  string(stdout),
			"stderr":  string(stderr),
		}).WithError(err).Warn("probe failed")
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	}).Debug("probe succeeded")
	ok = true

	staleRunLock := false
	for _, s := range strings.Split(string(stdout), "\n") {
		// Each line of the "crunch-run --list" output is one
		// of the following:
		// * a container UUID, indicating that processes
		//   related to that container are currently running.
		//   Optionally followed by " stale", indicating that
		//   the crunch-run process itself has exited (the
		//   remaining process is probably arv-mount).
		// * the string "broken", indicating that the instance
		//   appears incapable of starting containers.
		// See ListProcesses() in lib/crunchrun/background.go.
		if s == "" {
			// empty string following final newline
		} else if s == "broken" {
			reportsBroken = true
		} else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
			// Ignore crunch-run processes that belong to
			// a different cluster (e.g., a single host
			// running multiple clusters with the loopback
			// driver)
		} else if toks := strings.Split(s, " "); len(toks) == 1 {
			running = append(running, s)
		} else if toks[1] == "stale" {
			wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
			staleRunLock = true
	defer wkr.mtx.Unlock()
	// Track how long stale run locks have been reported without
	// interruption: clear the timestamp when this probe saw none,
	// start the clock on the first sighting, and report the instance
	// as broken once the pool's timeout is exceeded.
	if !staleRunLock {
		wkr.staleRunLockSince = time.Time{}
	} else if wkr.staleRunLockSince.IsZero() {
		wkr.staleRunLockSince = time.Now()
	} else if dur := time.Now().Sub(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
		wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
		reportsBroken = true

// probeBooted runs the pool's boot probe command on the instance
// (or "true" if no command is configured). If that succeeds, it
// loads the runner binary via wp.loadRunnerData and, when there is
// runner data, installs it on the instance with copyRunnerData.
//
// ok is true only if every step succeeded. stderr is the boot probe
// command's stderr, with copyRunnerData's stderr appended on success;
// when copyRunnerData fails, only its stderr is returned.
func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
	cmd := wkr.wp.bootProbeCommand
	if cmd == "" {
		cmd = "true"
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	logger := wkr.logger.WithFields(logrus.Fields{
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	if err != nil {
		// Logged at Debug level only, unlike the failures below.
		logger.WithError(err).Debug("boot probe failed")
		return false, stderr
	logger.Info("boot probe succeeded")
	if err = wkr.wp.loadRunnerData(); err != nil {
		wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
		return false, stderr
	} else if len(wkr.wp.runnerData) == 0 {
		// Assume crunch-run is already installed
	} else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
		wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
		return false, stderr2
	} else {
		stderr = append(stderr, stderr2...)
	return true, stderr

// copyRunnerData installs the pool's runner binary (wp.runnerData) at
// the path wp.runnerCmd on the instance.
//
// It first runs md5sum on the existing remote file and compares the
// output with the expected hash (wp.runnerMD5); a match is logged as
// "already exists ... with correct hash". The install command creates
// the destination directory and file, sets mode 0755 on both, and
// writes the binary (supplied on stdin) to the file. It is wrapped in
// "sudo sh -c" unless the remote user is root.
//
// stdout, stderr and err hold the results of the most recently
// executed remote command.
func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
	hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
	dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
	logger := wkr.logger.WithFields(logrus.Fields{
		"hash": hash,
		"path": wkr.wp.runnerCmd,

	stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
	// The expected stdout has the md5sum output layout: hash, two
	// spaces, path, newline.
	if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
		logger.Info("runner binary already exists on worker, with correct hash")

	// Note touch+chmod come before writing data, to avoid the
	// possibility of md5 being correct while file mode is
	// incorrect.
	cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
	if wkr.instance.RemoteUser() != "root" {
		// Escape embedded single quotes so the whole script can be
		// passed to sh -c as one single-quoted argument.
		cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
	logger.WithField("cmd", cmd).Info("installing runner binary on worker")
	stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))

// shutdownIfBroken decides whether the instance has been unresponsive
// for too long, where dur is how long it has been unresponsive.
//
// It returns false if the worker's idle behavior is Hold, or if dur
// is below the applicable threshold: the pool's timeoutBooting while
// the worker is in StateUnknown or StateBooting, and timeoutProbe
// otherwise. Otherwise it logs a warning that the instance is
// unresponsive and being shut down, and returns true.
//
// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		// Never shut down.
		return false
	// label is only used as a prefix in the log message below.
	label, threshold := "", wkr.wp.timeoutProbe
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		label, threshold = "new ", wkr.wp.timeoutBooting
	if dur < threshold {
		return false
		"Duration": dur,
		"Since":    wkr.probed,
		"State":    wkr.state,
	}).Warnf("%sinstance unresponsive, shutting down", label)
	return true

// Returns true if the instance is eligible for shutdown: either it's
// been idle too long, or idleBehavior=Drain and nothing is running.
//
// In detail: never with idleBehavior=Hold. In StateBooting, only when
// draining. In StateIdle, when draining or when the time since
// wkr.busy has reached the pool's timeoutIdle. In StateRunning, only
// when draining and every runner in wkr.running and wkr.starting has
// its givenup flag set.
//
// caller must have lock.
func (wkr *worker) eligibleForShutdown() bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		return false
	draining := wkr.idleBehavior == IdleBehaviorDrain
	switch wkr.state {
	case StateBooting:
		return draining
	case StateIdle:
		return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
	case StateRunning:
		if !draining {
			return false
		// Any runner that has not given up keeps the worker alive.
		for _, rr := range wkr.running {
			if !rr.givenup {
				return false
		for _, rr := range wkr.starting {
			if !rr.givenup {
				return false
		// draining, and all remaining runners are just trying
		// to force-kill their crunch-run procs
		return true
		return false

// shutdownIfIdle returns false if eligibleForShutdown says the worker
// is not eligible for shutdown. Otherwise it logs "shutdown worker"
// at Info level, along with the worker's state, idle duration (time
// since wkr.busy) and idle behavior, and returns true.
//
// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
	if !wkr.eligibleForShutdown() {
		return false
		"State":        wkr.state,
		"IdleDuration": stats.Duration(time.Since(wkr.busy)),
		"IdleBehavior": wkr.idleBehavior,
	}).Info("shutdown worker")
	return true

// shutdown marks the worker as shut down and destroys its instance.
//
// It sets wkr.updated and wkr.destroyed to the current time, sets the
// state to StateShutdown, and notifies the pool asynchronously. The
// instance's Destroy method is called in a separate goroutine; a
// failure there is logged as a warning and not otherwise reported.
//
// caller must have lock.
func (wkr *worker) shutdown() {
	now := time.Now()
	wkr.updated = now
	wkr.destroyed = now
	wkr.state = StateShutdown
	go wkr.wp.notify()
	go func() {
		err := wkr.instance.Destroy()
		if err != nil {
			wkr.logger.WithError(err).Warn("shutdown failed")

// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
//
// The tags managed here are the instance type name and the idle
// behavior, each stored under the pool's tag key prefix. When either
// differs from the instance's current tags, SetTags is called in a
// separate goroutine; a failure there is logged as a warning.
func (wkr *worker) saveTags() {
	instance := wkr.instance
	tags := instance.Tags()
	update := cloud.InstanceTags{
		wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
		wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
	save := false
	// Merge the desired values into the tags returned by the
	// instance, noting whether anything actually changed.
	for k, v := range update {
		if tags[k] != v {
			tags[k] = v
			save = true
	if save {
		go func() {
			err := instance.SetTags(tags)
			if err != nil {
				wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")

// Close logs every runner in wkr.running and wkr.starting as an
// abandoned crunch-run process, and closes the worker's executor.
//
// The executor's Close is deferred before the mutex unlock, so it
// runs after the mutex has been released.
func (wkr *worker) Close() {
	// This might take time, so do it after unlocking mtx.
	defer wkr.executor.Close()

	defer wkr.mtx.Unlock()
	for uuid, rr := range wkr.running {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
	for uuid, rr := range wkr.starting {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")

// Add/remove entries in wkr.running to match ctrUUIDs returned by a
// probe. Returns true if anything was added or removed.
//
// A reported UUID found in wkr.starting is moved to wkr.running; one
// not known at all gets a new remoteRunner in wkr.running. Entries of
// wkr.running that the probe did not report also count as a change.
//
// Caller must have lock.
func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
	// alive is the set of UUIDs reported by the probe.
	alive := map[string]bool{}
	for _, uuid := range ctrUUIDs {
		alive[uuid] = true
		if _, ok := wkr.running[uuid]; ok {
			// unchanged
		} else if rr, ok := wkr.starting[uuid]; ok {
			// The container we were starting is now reported
			// by the probe: promote it to running.
			wkr.running[uuid] = rr
			delete(wkr.starting, uuid)
			changed = true
		} else {
			// We didn't start it -- it must have been
			// started by a previous dispatcher process.
			wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
			wkr.running[uuid] = newRemoteRunner(uuid, wkr)
			changed = true
	for uuid := range wkr.running {
		if !alive[uuid] {
			changed = true

// closeRunner handles the end of the crunch-run process for the
// container with the given UUID.
//
// It looks up the runner in wkr.running (checking for a missing
// entry first), logs "crunch-run process ended", removes the entry,
// sets wkr.updated and the pool's exited[uuid] to the current time,
// and switches the worker from StateRunning to StateIdle when no
// runners remain in wkr.running or wkr.starting.
//
// caller must have lock.
func (wkr *worker) closeRunner(uuid string) {
	rr := wkr.running[uuid]
	if rr == nil {
	wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
	delete(wkr.running, uuid)

	now := time.Now()
	wkr.updated = now
	wkr.wp.exited[uuid] = now
	if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
		wkr.state = StateIdle