// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"bytes"
	"fmt"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/stats"
	"github.com/sirupsen/logrus"
)

const (
	// TODO: configurable
	maxPingFailTime = 10 * time.Minute
)

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
)

var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
	return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
	return []byte(stateString[s]), nil
}

// BootOutcome is the result of a worker boot. It is used as a label in a metric.
type BootOutcome string

const (
	BootOutcomeFailed      BootOutcome = "failure"
	BootOutcomeSucceeded   BootOutcome = "success"
	BootOutcomeAborted     BootOutcome = "aborted"
	BootOutcomeDisappeared BootOutcome = "disappeared"
)

var validBootOutcomes = map[BootOutcome]bool{
	BootOutcomeFailed:      true,
	BootOutcomeSucceeded:   true,
	BootOutcomeAborted:     true,
	BootOutcomeDisappeared: true,
}

// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
)

var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,
}

type worker struct {
	logger   logrus.FieldLogger
	executor Executor
	wp       *Pool

	mtx                 sync.Locker // must be wp's Locker.
	state               State
	idleBehavior        IdleBehavior
	instance            cloud.Instance
	instType            arvados.InstanceType
	vcpus               int64
	memory              int64
	appeared            time.Time
	probed              time.Time
	updated             time.Time
	busy                time.Time
	destroyed           time.Time
	lastUUID            string
	running             map[string]*remoteRunner // remember to update state idle<->running when this changes
	starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
	probing             chan struct{}
	bootOutcomeReported bool
}

func (wkr *worker) onUnkillable(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	logger := wkr.logger.WithField("ContainerUUID", uuid)
	if wkr.idleBehavior == IdleBehaviorHold {
		logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
		return
	}
	logger.Warn("unkillable container, draining worker")
	wkr.setIdleBehavior(IdleBehaviorDrain)
}

func (wkr *worker) onKilled(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	wkr.closeRunner(uuid)
	go wkr.wp.notify()
}

// caller must have lock.
func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
	if wkr.bootOutcomeReported {
		return
	}
	if wkr.wp.mBootOutcomes != nil {
		wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
	}
	wkr.bootOutcomeReported = true
}

// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
	wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
	wkr.idleBehavior = idleBehavior
	wkr.saveTags()
	wkr.shutdownIfIdle()
}

// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
	logger := wkr.logger.WithFields(logrus.Fields{
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	})
	logger.Debug("starting container")
	rr := newRemoteRunner(ctr.UUID, wkr)
	wkr.starting[ctr.UUID] = rr
	if wkr.state != StateRunning {
		wkr.state = StateRunning
		go wkr.wp.notify()
	}
	go func() {
		rr.Start()
		wkr.mtx.Lock()
		defer wkr.mtx.Unlock()
		now := time.Now()
		wkr.updated = now
		wkr.busy = now
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = rr
		wkr.lastUUID = ctr.UUID
	}()
}

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's curent state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
	select {
	case wkr.probing <- struct{}{}:
		wkr.probeAndUpdate()
		<-wkr.probing
	default:
		wkr.logger.Debug("still waiting for last probe to finish")
	}
}

// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
	wkr.mtx.Lock()
	updated := wkr.updated
	initialState := wkr.state
	wkr.mtx.Unlock()

	var (
		booted   bool
		ctrUUIDs []string
		ok       bool
		stderr   []byte // from probeBooted
	)

	switch initialState {
	case StateShutdown:
		return
	case StateIdle, StateRunning:
		booted = true
	case StateUnknown, StateBooting:
	default:
		panic(fmt.Sprintf("unknown state %s", initialState))
	}

	probeStart := time.Now()
	logger := wkr.logger.WithField("ProbeStart", probeStart)

	if !booted {
		booted, stderr = wkr.probeBooted()
		if !booted {
			// Pretend this probe succeeded if another
			// concurrent attempt succeeded.
			wkr.mtx.Lock()
			booted = wkr.state == StateRunning || wkr.state == StateIdle
			wkr.mtx.Unlock()
		}
		if booted {
			logger.Info("instance booted; will try probeRunning")
		}
	}
	reportedBroken := false
	if booted || wkr.state == StateUnknown {
		ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
	}
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
		logger.Info("probe reported broken instance")
		wkr.reportBootOutcome(BootOutcomeFailed)
		wkr.setIdleBehavior(IdleBehaviorDrain)
	}
	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
			return
		}
		// Using the start time of the probe as the timeout
		// threshold ensures we always initiate at least one
		// probe attempt after the boot/probe timeout expires
		// (otherwise, a slow probe failure could cause us to
		// shutdown an instance even though it did in fact
		// boot/recover before the timeout expired).
		dur := probeStart.Sub(wkr.probed)
		if wkr.shutdownIfBroken(dur) {
			// stderr from failed run-probes will have
			// been logged already, but boot-probe
			// failures are normal so they are logged only
			// at Debug level. This is our chance to log
			// some evidence about why the node never
			// booted, even in non-debug mode.
			if !booted {
				wkr.reportBootOutcome(BootOutcomeFailed)
				logger.WithFields(logrus.Fields{
					"Duration": dur,
					"stderr":   string(stderr),
				}).Info("boot failed")
			}
		}
		return
	}

	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		return
	}

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime
	}

	changed := wkr.updateRunning(ctrUUIDs)

	// Update state if this was the first successful boot-probe.
	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
		// Note: this will change again below if
		// len(wkr.starting)+len(wkr.running) > 0.
		wkr.state = StateIdle
		changed = true
	}

	// If wkr.state and wkr.running aren't changing then there's
	// no need to log anything, notify the scheduler, move state
	// back and forth between idle/running, etc.
	if !changed {
		return
	}

	// Log whenever a run-probe reveals crunch-run processes
	// appearing/disappearing before boot-probe succeeds.
	if wkr.state == StateUnknown && changed {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("crunch-run probe succeeded, but boot probe is still failing")
	}

	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
		wkr.state = StateRunning
	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
		wkr.state = StateIdle
	}
	wkr.updated = updateTime
	if booted && (initialState == StateUnknown || initialState == StateBooting) {
		wkr.reportBootOutcome(BootOutcomeSucceeded)
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("probes succeeded, instance is in service")
	}
	go wkr.wp.notify()
}

func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
	cmd := wkr.wp.runnerCmd + " --list"
	if u := wkr.instance.RemoteUser(); u != "root" {
		cmd = "sudo " + cmd
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	if err != nil {
		wkr.logger.WithFields(logrus.Fields{
			"Command": cmd,
			"stdout":  string(stdout),
			"stderr":  string(stderr),
		}).WithError(err).Warn("probe failed")
		return
	}
	ok = true
	for _, s := range strings.Split(string(stdout), "\n") {
		if s == "broken" {
			reportsBroken = true
		} else if s != "" {
			running = append(running, s)
		}
	}
	return
}

func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
	cmd := wkr.wp.bootProbeCommand
	if cmd == "" {
		cmd = "true"
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	logger := wkr.logger.WithFields(logrus.Fields{
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	})
	if err != nil {
		logger.WithError(err).Debug("boot probe failed")
		return false, stderr
	}
	logger.Info("boot probe succeeded")
	if err = wkr.wp.loadRunnerData(); err != nil {
		wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
		return false, stderr
	} else if len(wkr.wp.runnerData) == 0 {
		// Assume crunch-run is already installed
	} else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
		wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
		return false, stderr2
	} else {
		stderr = append(stderr, stderr2...)
	}
	return true, stderr
}

func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
	hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
	dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
	logger := wkr.logger.WithFields(logrus.Fields{
		"hash": hash,
		"path": wkr.wp.runnerCmd,
	})

	stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
	if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
		logger.Info("runner binary already exists on worker, with correct hash")
		return
	}

	// Note touch+chmod come before writing data, to avoid the
	// possibility of md5 being correct while file mode is
	// incorrect.
	cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
	if wkr.instance.RemoteUser() != "root" {
		cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
	}
	logger.WithField("cmd", cmd).Info("installing runner binary on worker")
	stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
	return
}

// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		// Never shut down.
		return false
	}
	label, threshold := "", wkr.wp.timeoutProbe
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		label, threshold = "new ", wkr.wp.timeoutBooting
	}
	if dur < threshold {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"Duration": dur,
		"Since":    wkr.probed,
		"State":    wkr.state,
	}).Warnf("%sinstance unresponsive, shutting down", label)
	wkr.shutdown()
	return true
}

// Returns true if the instance is eligible for shutdown: either it's
// been idle too long, or idleBehavior=Drain and nothing is running.
//
// caller must have lock.
func (wkr *worker) eligibleForShutdown() bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		return false
	}
	draining := wkr.idleBehavior == IdleBehaviorDrain
	switch wkr.state {
	case StateBooting:
		return draining
	case StateIdle:
		return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
	case StateRunning:
		if !draining {
			return false
		}
		for _, rr := range wkr.running {
			if !rr.givenup {
				return false
			}
		}
		for _, rr := range wkr.starting {
			if !rr.givenup {
				return false
			}
		}
		// draining, and all remaining runners are just trying
		// to force-kill their crunch-run procs
		return true
	default:
		return false
	}
}

// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
	if !wkr.eligibleForShutdown() {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"State":        wkr.state,
		"IdleDuration": stats.Duration(time.Since(wkr.busy)),
		"IdleBehavior": wkr.idleBehavior,
	}).Info("shutdown worker")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return true
}

// caller must have lock.
func (wkr *worker) shutdown() {
	now := time.Now()
	wkr.updated = now
	wkr.destroyed = now
	wkr.state = StateShutdown
	go wkr.wp.notify()
	go func() {
		err := wkr.instance.Destroy()
		if err != nil {
			wkr.logger.WithError(err).Warn("shutdown failed")
			return
		}
	}()
}

// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
	instance := wkr.instance
	tags := instance.Tags()
	update := cloud.InstanceTags{
		wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
		wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
	}
	save := false
	for k, v := range update {
		if tags[k] != v {
			tags[k] = v
			save = true
		}
	}
	if save {
		go func() {
			err := instance.SetTags(tags)
			if err != nil {
				wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
			}
		}()
	}
}

func (wkr *worker) Close() {
	// This might take time, so do it after unlocking mtx.
	defer wkr.executor.Close()

	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	for uuid, rr := range wkr.running {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
		rr.Close()
	}
	for uuid, rr := range wkr.starting {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
		rr.Close()
	}
}

// Add/remove entries in wkr.running to match ctrUUIDs returned by a
// probe. Returns true if anything was added or removed.
//
// Caller must have lock.
func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
	alive := map[string]bool{}
	for _, uuid := range ctrUUIDs {
		alive[uuid] = true
		if _, ok := wkr.running[uuid]; ok {
			// unchanged
		} else if rr, ok := wkr.starting[uuid]; ok {
			wkr.running[uuid] = rr
			delete(wkr.starting, uuid)
			changed = true
		} else {
			// We didn't start it -- it must have been
			// started by a previous dispatcher process.
			wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
			wkr.running[uuid] = newRemoteRunner(uuid, wkr)
			changed = true
		}
	}
	for uuid := range wkr.running {
		if !alive[uuid] {
			wkr.closeRunner(uuid)
			changed = true
		}
	}
	return
}

// caller must have lock.
func (wkr *worker) closeRunner(uuid string) {
	rr := wkr.running[uuid]
	if rr == nil {
		return
	}
	wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
	delete(wkr.running, uuid)
	rr.Close()

	now := time.Now()
	wkr.updated = now
	wkr.wp.exited[uuid] = now
	if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
		wkr.state = StateIdle
	}
}