package worker
import (
- "bytes"
+ "crypto/rand"
+ "errors"
+ "fmt"
"io"
"sort"
"strings"
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- "github.com/Sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
+ "golang.org/x/crypto/ssh"
)
const (
- tagKeyInstanceType = "InstanceType"
- tagKeyHold = "Hold"
+ tagKeyInstanceType = "InstanceType"
+ tagKeyIdleBehavior = "IdleBehavior"
+ tagKeyInstanceSecret = "InstanceSecret"
)
// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
- Instance string
- Price float64
- ArvadosInstanceType string
- ProviderInstanceType string
- LastContainerUUID string
- LastBusy time.Time
- WorkerState string
+ Instance cloud.InstanceID `json:"instance"`
+ Address string `json:"address"`
+ Price float64 `json:"price"`
+ ArvadosInstanceType string `json:"arvados_instance_type"`
+ ProviderInstanceType string `json:"provider_instance_type"`
+ LastContainerUUID string `json:"last_container_uuid"`
+ LastBusy time.Time `json:"last_busy"`
+ WorkerState string `json:"worker_state"`
+ IdleBehavior IdleBehavior `json:"idle_behavior"`
}
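
For illustration (not part of this diff): a minimal sketch of how the new JSON tags render, e.g. in a management API response. The function name and field values are invented, `encoding/json` and `fmt` are assumed to be imported, and IdleBehavior is assumed to be a string-based type so it marshals as its tag value.

    func exampleInstanceViewJSON() {
            v := InstanceView{
                    Instance:            cloud.InstanceID("inst-example"),
                    Address:             "10.0.0.1",
                    Price:               0.24,
                    ArvadosInstanceType: "Standard2",
                    WorkerState:         "idle",
                    IdleBehavior:        IdleBehaviorRun,
            }
            buf, err := json.Marshal(v)
            if err != nil {
                    panic(err)
            }
            fmt.Println(string(buf))
            // {"instance":"inst-example","address":"10.0.0.1","price":0.24,
            //  "arvados_instance_type":"Standard2",...,"worker_state":"idle",
            //  "idle_behavior":"run"}
    }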
// An Executor executes shell commands on a remote host.
type Executor interface {
// Run cmd on the current target.
- Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
+ Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
// Use the given target for subsequent operations. The new
// target is the same host as the previous target, but it
defaultTimeoutIdle = time.Minute
defaultTimeoutBooting = time.Minute * 10
defaultTimeoutProbe = time.Minute * 10
+ defaultTimeoutShutdown = time.Second * 10
+ defaultTimeoutTERM = time.Minute * 2
+ defaultTimeoutSignal = time.Second * 5
+
+ // Time after a quota error to try again anyway, even if no
+ // instances have been shut down.
+ quotaErrorTTL = time.Minute
+
+ // Time between "X failed because rate limiting" messages
+ logRateLimitErrorInterval = time.Second * 10
)
func duration(conf arvados.Duration, def time.Duration) time.Duration {
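// (duration's body is elided from this hunk; presumably it returns
// time.Duration(conf) when conf > 0, and def otherwise)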
//
// New instances are configured and set up according to the given
// cluster configuration.
-func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
wp := &Pool{
logger: logger,
- instanceSet: instanceSet,
+ arvClient: arvClient,
+ instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
newExecutor: newExecutor,
bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
imageID: cloud.ImageID(cluster.CloudVMs.ImageID),
timeoutIdle: duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+ timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+ timeoutTERM: duration(cluster.Dispatch.TimeoutTERM, defaultTimeoutTERM),
+ timeoutSignal: duration(cluster.Dispatch.TimeoutSignal, defaultTimeoutSignal),
+ installPublicKey: installPublicKey,
+ stop: make(chan bool),
}
wp.registerMetrics(reg)
go func() {
type Pool struct {
// configuration
logger logrus.FieldLogger
- instanceSet cloud.InstanceSet
+ arvClient *arvados.Client
+ instanceSet *throttledInstanceSet
newExecutor func(cloud.Instance) Executor
bootProbeCommand string
imageID cloud.ImageID
timeoutIdle time.Duration
timeoutBooting time.Duration
timeoutProbe time.Duration
+ timeoutShutdown time.Duration
+ timeoutTERM time.Duration
+ timeoutSignal time.Duration
+ installPublicKey ssh.PublicKey
// private state
subscribers map[<-chan struct{}]chan<- struct{}
- creating map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return
+ creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
workers map[cloud.InstanceID]*worker
loaded bool // loaded list of instances from InstanceSet at least once
exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
atQuotaUntil time.Time
+ atQuotaErr cloud.QuotaError
stop chan bool
mtx sync.RWMutex
setupOnce sync.Once
- mInstances prometheus.Gauge
mContainersRunning prometheus.Gauge
- mVCPUs prometheus.Gauge
- mVCPUsInuse prometheus.Gauge
- mMemory prometheus.Gauge
- mMemoryInuse prometheus.Gauge
-}
-
-type worker struct {
- state State
- instance cloud.Instance
- executor Executor
- instType arvados.InstanceType
- vcpus int64
- memory int64
- probed time.Time
- updated time.Time
- busy time.Time
- lastUUID string
- running map[string]struct{}
- starting map[string]struct{}
- probing chan struct{}
-}
-
-// Subscribe returns a channel that becomes ready whenever a worker's
-// state changes.
+ mInstances *prometheus.GaugeVec
+ mInstancesPrice *prometheus.GaugeVec
+ mVCPUs *prometheus.GaugeVec
+ mMemory *prometheus.GaugeVec
+}
+
+type createCall struct {
+ time time.Time
+ instanceType arvados.InstanceType
+}
+
+// Subscribe returns a buffered channel that becomes ready after any
+// change to the pool's state that could have scheduling implications:
+// a worker's state changes, a new worker appears, the cloud
+// provider's API rate limiting period ends, etc.
+//
+// Additional events that occur while the channel is already ready
+// will be dropped, so it is OK if the caller services the channel
+// slowly.
//
// Example:
//
// ch := wp.Subscribe()
// defer wp.Unsubscribe(ch)
// for range ch {
-// // ...try scheduling some work...
+// tryScheduling(wp)
// if done {
// break
// }
}
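
The subscribe/notify machinery itself is outside this hunk. For illustration, a plausible sketch of the pattern the comment above describes: each subscriber holds a 1-buffered channel, and notification is a non-blocking send, so events arriving while the channel is already ready are coalesced rather than queued. The method name is invented; it stands in for the package's actual notify().

    func (wp *Pool) notifySketch() {
            wp.mtx.RLock()
            defer wp.mtx.RUnlock()
            for _, send := range wp.subscribers {
                    select {
                    case send <- struct{}{}:
                    default:
                            // subscriber already has an event pending; drop this
                            // one, per the "OK to service slowly" contract above
                    }
            }
    }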
// Unallocated returns the number of unallocated (creating + booting +
-// idle + unknown) workers for each instance type.
+// idle + unknown) workers for each instance type. Workers in
+// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
wp.setupOnce.Do(wp.setup)
wp.mtx.RLock()
defer wp.mtx.RUnlock()
- u := map[arvados.InstanceType]int{}
- for it, c := range wp.creating {
- u[it] = c
+ unalloc := map[arvados.InstanceType]int{}
+ creating := map[arvados.InstanceType]int{}
+ oldestCreate := map[arvados.InstanceType]time.Time{}
+ for _, cc := range wp.creating {
+ it := cc.instanceType
+ creating[it]++
+ if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
+ oldestCreate[it] = cc.time
+ }
}
for _, wkr := range wp.workers {
- if len(wkr.running)+len(wkr.starting) == 0 && (wkr.state == StateRunning || wkr.state == StateBooting || wkr.state == StateUnknown) {
- u[wkr.instType]++
+ // Skip workers that are not expected to become
+ // available soon. Note len(wkr.running)>0 is not
+ // redundant here: it can be true even in
+ // StateUnknown.
+ if wkr.state == StateShutdown ||
+ wkr.state == StateRunning ||
+ wkr.idleBehavior != IdleBehaviorRun ||
+ len(wkr.running) > 0 {
+ continue
+ }
+ it := wkr.instType
+ unalloc[it]++
+ if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
+ // If up to N new workers appear in
+ // Instances() while we are waiting for N
+ // Create() calls to complete, we assume we're
+ // just seeing a race between Instances() and
+ // Create() responses.
+ //
+ // The other common reason why nodes have
+ // state==Unknown is that they appeared at
+ // startup, before any Create calls. They
+ // don't match the above timing condition, so
+ // we never mistakenly attribute them to
+ // pending Create calls.
+ creating[it]--
}
}
- return u
+ for it, c := range creating {
+ unalloc[it] += c
+ }
+ return unalloc
}
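
For illustration (not part of this diff), a caller's-eye view: each pending Create call and each idle/booting worker counts once per instance type, so the scheduler can treat them interchangeably.

    for it, n := range wp.Unallocated() {
            fmt.Printf("%s: %d workers available or expected soon\n", it.Name, n)
    }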
// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
-func (wp *Pool) Create(it arvados.InstanceType) error {
+//
+// Create returns false if a pre-existing error state prevents it from
+// even attempting to create a new instance. Those errors are logged
+// by the Pool, so the caller does not need to log anything in such
+// cases.
+func (wp *Pool) Create(it arvados.InstanceType) bool {
logger := wp.logger.WithField("InstanceType", it.Name)
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
defer wp.mtx.Unlock()
- tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
- wp.creating[it]++
+ if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
+ return false
+ }
+ now := time.Now()
+ secret := randomHex(instanceSecretLength)
+ wp.creating[secret] = createCall{time: now, instanceType: it}
go func() {
defer wp.notify()
- inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
+ tags := cloud.InstanceTags{
+ tagKeyInstanceType: it.Name,
+ tagKeyIdleBehavior: string(IdleBehaviorRun),
+ tagKeyInstanceSecret: secret,
+ }
+ initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
+ inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
wp.mtx.Lock()
defer wp.mtx.Unlock()
- wp.creating[it]--
- if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
- wp.atQuotaUntil = time.Now().Add(time.Minute)
- }
+ // delete() is deferred so the updateWorker() call
+ // below knows to use StateBooting when adding a new
+ // worker.
+ defer delete(wp.creating, secret)
if err != nil {
+ if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+ wp.atQuotaErr = err
+ wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+ time.AfterFunc(quotaErrorTTL, wp.notify)
+ }
logger.WithError(err).Error("create failed")
+ wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
return
}
- wp.updateWorker(inst, it, StateBooting)
+ wp.updateWorker(inst, it)
}()
- return nil
+ return true
}
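
For illustration (not part of this diff): a hypothetical scheduling-loop fragment showing the intended Create contract. `queued` and `typeFor` are invented placeholders for the caller's container queue and instance-type chooser.

    unalloc := wp.Unallocated()
    for _, ctr := range queued {
            it := typeFor(ctr)
            if unalloc[it] > 0 {
                    unalloc[it]--
                    continue
            }
            if !wp.Create(it) {
                    // At quota or rate-limited. Create has already logged the
                    // reason, and subscribers are notified when the condition
                    // may have cleared, so just stop trying for now.
                    break
            }
            unalloc[it]++ // the pending Create call counts as unallocated
    }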
// AtQuota returns true if Create is not expected to work at the
// moment.
return time.Now().Before(wp.atQuotaUntil)
}
-// Add or update worker attached to the given instance. Use
-// initialState if a new worker is created. Caller must have lock.
+// SetIdleBehavior determines how the indicated instance will behave
+// when it has no containers running.
+func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ wkr, ok := wp.workers[id]
+ if !ok {
+ return errors.New("requested instance does not exist")
+ }
+ wkr.setIdleBehavior(idleBehavior)
+ return nil
+}
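
For illustration (not part of this diff): hypothetical use from a management-API handler, draining an instance before maintenance. `instID` and `logger` are placeholders, and an IdleBehaviorDrain value is assumed to be defined alongside IdleBehaviorRun/IdleBehaviorHold (the hold/drain modes mentioned above).

    if err := wp.SetIdleBehavior(instID, IdleBehaviorDrain); err != nil {
            logger.WithError(err).Warn("cannot drain instance")
    }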
+
+// Add or update worker attached to the given instance.
//
-// Returns true when a new worker is created.
-func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) bool {
+// The second return value is true if a new worker is created.
+//
+// A newly added instance has state=StateBooting if its tags match an
+// entry in wp.creating, otherwise StateUnknown.
+//
+// Caller must have lock.
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
+ inst = tagVerifier{inst}
id := inst.ID()
- if wp.workers[id] != nil {
- wp.workers[id].executor.SetTarget(inst)
- wp.workers[id].instance = inst
- wp.workers[id].updated = time.Now()
- if initialState == StateBooting && wp.workers[id].state == StateUnknown {
- wp.workers[id].state = StateBooting
- }
- return false
+ if wkr := wp.workers[id]; wkr != nil {
+ wkr.executor.SetTarget(inst)
+ wkr.instance = inst
+ wkr.updated = time.Now()
+ wkr.saveTags()
+ return wkr, false
}
- if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
- initialState = StateHold
+
+ state := StateUnknown
+ if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok {
+ state = StateBooting
+ }
+
+ // If an instance has a valid IdleBehavior tag when it first
+ // appears, initialize the new worker accordingly (this is how
+ // we restore IdleBehavior that was set by a prior dispatch
+ // process); otherwise, default to "run". After this,
+ // wkr.idleBehavior is the source of truth, and will only be
+ // changed via SetIdleBehavior().
+ idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
+ if !validIdleBehavior[idleBehavior] {
+ idleBehavior = IdleBehaviorRun
}
- wp.logger.WithFields(logrus.Fields{
+
+ logger := wp.logger.WithFields(logrus.Fields{
"InstanceType": it.Name,
- "Instance": inst,
- "State": initialState,
+ "Instance": inst.ID(),
+ "Address": inst.Address(),
+ })
+ logger.WithFields(logrus.Fields{
+ "State": state,
+ "IdleBehavior": idleBehavior,
}).Infof("instance appeared in cloud")
now := time.Now()
- wp.workers[id] = &worker{
- executor: wp.newExecutor(inst),
- state: initialState,
- instance: inst,
- instType: it,
- probed: now,
- busy: now,
- updated: now,
- running: make(map[string]struct{}),
- starting: make(map[string]struct{}),
- probing: make(chan struct{}, 1),
- }
- return true
+ wkr := &worker{
+ mtx: &wp.mtx,
+ wp: wp,
+ logger: logger,
+ executor: wp.newExecutor(inst),
+ state: state,
+ idleBehavior: idleBehavior,
+ instance: inst,
+ instType: it,
+ appeared: now,
+ probed: now,
+ busy: now,
+ updated: now,
+ running: make(map[string]*remoteRunner),
+ starting: make(map[string]*remoteRunner),
+ probing: make(chan struct{}, 1),
+ }
+ wp.workers[id] = wkr
+ return wkr, true
}
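
The tagVerifier wrapper applied above is defined elsewhere in this package. A rough sketch (not part of this diff) of what it is assumed to do when verifying an SSH host: read back the secret that Create's InitCommand wrote at boot, and reject the instance if it does not match the tagKeyInstanceSecret tag. The function name and error value are invented.

    func verifySecretSketch(inst cloud.Instance, client *ssh.Client) error {
            expect := inst.Tags()[tagKeyInstanceSecret]
            session, err := client.NewSession()
            if err != nil {
                    return err
            }
            defer session.Close()
            out, err := session.Output("cat " + instanceSecretFilename)
            if err != nil {
                    return err
            }
            if strings.TrimSpace(string(out)) != expect {
                    return errors.New("instance secret mismatch")
            }
            return nil
    }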
// Shutdown shuts down a worker with the given type, or returns false
defer wp.mtx.Unlock()
logger := wp.logger.WithField("InstanceType", it.Name)
logger.Info("shutdown requested")
- for _, tryState := range []State{StateBooting, StateRunning} {
+ for _, tryState := range []State{StateBooting, StateIdle} {
// TODO: shutdown the worker with the longest idle
- // time (Running) or the earliest create time
- // (Booting)
+ // time (Idle) or the earliest create time (Booting)
for _, wkr := range wp.workers {
- if wkr.state != tryState || len(wkr.running)+len(wkr.starting) > 0 {
- continue
+ if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
+ logger.WithField("Instance", wkr.instance).Info("shutting down")
+ wkr.shutdown()
+ return true
}
- if wkr.instType != it {
- continue
- }
- logger = logger.WithField("Instance", wkr.instance)
- logger.Info("shutting down")
- wp.shutdown(wkr, logger)
- return true
}
}
return false
}
-// caller must have lock
-func (wp *Pool) shutdown(wkr *worker, logger logrus.FieldLogger) {
- wkr.updated = time.Now()
- wkr.state = StateShutdown
- go func() {
- err := wkr.instance.Destroy()
- if err != nil {
- logger.WithError(err).WithField("Instance", wkr.instance).Warn("shutdown failed")
- return
- }
- wp.mtx.Lock()
- wp.atQuotaUntil = time.Now()
- wp.mtx.Unlock()
- wp.notify()
- }()
-}
-
-// Workers returns the current number of workers in each state.
-func (wp *Pool) Workers() map[State]int {
+// CountWorkers returns the current number of workers in each state.
+//
+// CountWorkers blocks, if necessary, until the initial instance list
+// has been loaded from the cloud provider.
+func (wp *Pool) CountWorkers() map[State]int {
wp.setupOnce.Do(wp.setup)
+ wp.waitUntilLoaded()
wp.mtx.Lock()
defer wp.mtx.Unlock()
r := map[State]int{}
}
// Running returns the container UUIDs being prepared/run on workers.
+//
+// In the returned map, the time value indicates when the Pool
+// observed that the container process had exited. A container that
+// has not yet exited has a zero time value. The caller should use
+// KillContainer() to garbage-collect the entries for exited
+// containers.
func (wp *Pool) Running() map[string]time.Time {
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
- logger := wp.logger.WithFields(logrus.Fields{
- "InstanceType": it.Name,
- "ContainerUUID": ctr.UUID,
- "Priority": ctr.Priority,
- })
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
defer wp.mtx.Unlock()
var wkr *worker
for _, w := range wp.workers {
- if w.instType == it && w.state == StateRunning && len(w.running)+len(w.starting) == 0 {
+ if w.instType == it && w.state == StateIdle {
if wkr == nil || w.busy.After(wkr.busy) {
wkr = w
}
if wkr == nil {
return false
}
- logger = logger.WithField("Instance", wkr.instance)
- logger.Debug("starting container")
- wkr.starting[ctr.UUID] = struct{}{}
- go func() {
- stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
- wp.mtx.Lock()
- defer wp.mtx.Unlock()
- now := time.Now()
- wkr.updated = now
- wkr.busy = now
- delete(wkr.starting, ctr.UUID)
- wkr.running[ctr.UUID] = struct{}{}
- wkr.lastUUID = ctr.UUID
- if err != nil {
- logger.WithField("stdout", string(stdout)).
- WithField("stderr", string(stderr)).
- WithError(err).
- Error("error starting crunch-run process")
- // Leave uuid in wkr.running, though: it's
- // possible the error was just a communication
- // failure and the process was in fact
- // started. Wait for next probe to find out.
- return
- }
- logger.Info("crunch-run process started")
- wkr.lastUUID = ctr.UUID
- }()
+ wkr.startContainer(ctr)
return true
}
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string) {
+func (wp *Pool) KillContainer(uuid string, reason string) {
wp.mtx.Lock()
defer wp.mtx.Unlock()
+ logger := wp.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "Reason": reason,
+ })
if _, ok := wp.exited[uuid]; ok {
- wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+ logger.Debug("clearing placeholder for exited crunch-run process")
delete(wp.exited, uuid)
return
}
for _, wkr := range wp.workers {
- if _, ok := wkr.running[uuid]; ok {
- go wp.kill(wkr, uuid)
+ rr := wkr.running[uuid]
+ if rr == nil {
+ rr = wkr.starting[uuid]
+ }
+ if rr != nil {
+ rr.Kill(reason)
return
}
}
- wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
-}
-
-func (wp *Pool) kill(wkr *worker, uuid string) {
- logger := wp.logger.WithFields(logrus.Fields{
- "ContainerUUID": uuid,
- "Instance": wkr.instance,
- })
- logger.Debug("killing process")
- stdout, stderr, err := wkr.executor.Execute("crunch-run --kill "+uuid, nil)
- if err != nil {
- logger.WithFields(logrus.Fields{
- "stderr": string(stderr),
- "stdout": string(stdout),
- "error": err,
- }).Warn("kill failed")
- return
- }
- logger.Debug("killing process succeeded")
- wp.mtx.Lock()
- defer wp.mtx.Unlock()
- if _, ok := wkr.running[uuid]; ok {
- delete(wkr.running, uuid)
- wkr.updated = time.Now()
- go wp.notify()
- }
+ logger.Debug("cannot kill: already disappeared")
}
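
For illustration (not part of this diff): per the Running() doc above, a caller's garbage-collection pass over exited crunch-run processes might look like this.

    for uuid, exited := range wp.Running() {
            if !exited.IsZero() {
                    // crunch-run has exited; acknowledge so the entry is dropped.
                    wp.KillContainer(uuid, "crunch-run exited at "+exited.Format(time.RFC3339))
            }
    }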
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
if reg == nil {
reg = prometheus.NewRegistry()
}
- wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "instances_total",
- Help: "Number of cloud VMs including pending, booting, running, held, and shutting down.",
- })
- reg.MustRegister(wp.mInstances)
wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "containers_running",
Help: "Number of containers reported running by cloud VMs.",
})
reg.MustRegister(wp.mContainersRunning)
-
- wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
+ wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_total",
+ Help: "Number of cloud VMs.",
+ }, []string{"category"})
+ reg.MustRegister(wp.mInstances)
+ wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_price",
+ Help: "Price of cloud VMs.",
+ }, []string{"category"})
+ reg.MustRegister(wp.mInstancesPrice)
+ wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "vcpus_total",
Help: "Total VCPUs on all cloud VMs.",
- })
+ }, []string{"category"})
reg.MustRegister(wp.mVCPUs)
- wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "vcpus_inuse",
- Help: "VCPUs on cloud VMs that are running containers.",
- })
- reg.MustRegister(wp.mVCPUsInuse)
- wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
+ wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "memory_bytes_total",
Help: "Total memory on all cloud VMs.",
- })
+ }, []string{"category"})
reg.MustRegister(wp.mMemory)
- wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "memory_bytes_inuse",
- Help: "Memory on cloud VMs that are running containers.",
- })
- reg.MustRegister(wp.mMemoryInuse)
}
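
Illustrative scrape output for the new per-category gauges (values invented; metric names follow from the Namespace/Subsystem/Name fields above):

    arvados_dispatchcloud_instances_total{category="idle"} 3
    arvados_dispatchcloud_instances_total{category="inuse"} 12
    arvados_dispatchcloud_instances_price{category="inuse"} 2.88
    arvados_dispatchcloud_vcpus_total{category="inuse"} 96
    arvados_dispatchcloud_memory_bytes_total{category="inuse"} 4.1e+11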
func (wp *Pool) runMetrics() {
ch := wp.Subscribe()
defer wp.Unsubscribe(ch)
+ wp.updateMetrics()
for range ch {
wp.updateMetrics()
}
wp.mtx.RLock()
defer wp.mtx.RUnlock()
- var alloc, cpu, cpuInuse, mem, memInuse int64
+ instances := map[string]int64{}
+ price := map[string]float64{}
+ cpu := map[string]int64{}
+ mem := map[string]int64{}
+ var running int64
for _, wkr := range wp.workers {
- cpu += int64(wkr.instType.VCPUs)
- mem += int64(wkr.instType.RAM)
- if len(wkr.running)+len(wkr.starting) == 0 {
- continue
+ var cat string
+ switch {
+ case len(wkr.running)+len(wkr.starting) > 0:
+ cat = "inuse"
+ case wkr.idleBehavior == IdleBehaviorHold:
+ cat = "hold"
+ case wkr.state == StateBooting:
+ cat = "booting"
+ case wkr.state == StateUnknown:
+ cat = "unknown"
+ default:
+ cat = "idle"
}
- alloc += int64(len(wkr.running) + len(wkr.starting))
- cpuInuse += int64(wkr.instType.VCPUs)
- memInuse += int64(wkr.instType.RAM)
+ instances[cat]++
+ price[cat] += wkr.instType.Price
+ cpu[cat] += int64(wkr.instType.VCPUs)
+ mem[cat] += int64(wkr.instType.RAM)
+ running += int64(len(wkr.running) + len(wkr.starting))
+ }
+ for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
+ wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
+ wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
+ wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
+ wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
}
- wp.mInstances.Set(float64(len(wp.workers)))
- wp.mContainersRunning.Set(float64(alloc))
- wp.mVCPUs.Set(float64(cpu))
- wp.mMemory.Set(float64(mem))
- wp.mVCPUsInuse.Set(float64(cpuInuse))
- wp.mMemoryInuse.Set(float64(memInuse))
+ wp.mContainersRunning.Set(float64(running))
}
func (wp *Pool) runProbes() {
workers = workers[:0]
wp.mtx.Lock()
for id, wkr := range wp.workers {
- if wkr.state == StateShutdown || wp.shutdownIfIdle(wkr) {
+ if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
continue
}
workers = append(workers, id)
wp.mtx.Lock()
wkr, ok := wp.workers[id]
wp.mtx.Unlock()
- if !ok || wkr.state == StateShutdown {
- // Deleted/shutdown while we
- // were probing others
+ if !ok {
+ // Deleted while we were probing
+ // others
continue
}
- select {
- case wkr.probing <- struct{}{}:
- go func() {
- wp.probeAndUpdate(wkr)
- <-wkr.probing
- }()
- default:
- wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
- }
+ go wkr.ProbeAndUpdate()
select {
case <-wp.stop:
return
}
}
-// caller must have lock.
-func (wp *Pool) shutdownIfBroken(wkr *worker, dur time.Duration) {
- if wkr.state == StateHold {
- return
- }
- label, threshold := "", wp.timeoutProbe
- if wkr.state == StateBooting {
- label, threshold = "new ", wp.timeoutBooting
- }
- if dur < threshold {
- return
- }
- wp.logger.WithFields(logrus.Fields{
- "Instance": wkr.instance,
- "Duration": dur,
- "Since": wkr.probed,
- "State": wkr.state,
- }).Warnf("%sinstance unresponsive, shutting down", label)
- wp.shutdown(wkr, wp.logger)
-}
-
-// caller must have lock.
-func (wp *Pool) shutdownIfIdle(wkr *worker) bool {
- if len(wkr.running)+len(wkr.starting) > 0 || wkr.state != StateRunning {
- return false
- }
- age := time.Since(wkr.busy)
- if age < wp.timeoutIdle {
- return false
- }
- logger := wp.logger.WithFields(logrus.Fields{
- "Age": age,
- "Instance": wkr.instance,
- })
- logger.Info("shutdown idle worker")
- wp.shutdown(wkr, logger)
- return true
-}
-
// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
for _, w := range wp.workers {
r = append(r, InstanceView{
- Instance: w.instance.String(),
+ Instance: w.instance.ID(),
+ Address: w.instance.Address(),
Price: w.instType.Price,
ArvadosInstanceType: w.instType.Name,
ProviderInstanceType: w.instType.ProviderType,
LastContainerUUID: w.lastUUID,
LastBusy: w.busy,
WorkerState: w.state.String(),
+ IdleBehavior: w.idleBehavior,
})
}
wp.mtx.Unlock()
sort.Slice(r, func(i, j int) bool {
- return strings.Compare(r[i].Instance, r[j].Instance) < 0
+ return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
})
return r
}
+// KillInstance destroys a cloud VM instance. It returns an error if
+// the given instance does not exist.
+func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ wkr, ok := wp.workers[id]
+ if !ok {
+ return errors.New("instance not found")
+ }
+ wkr.logger.WithField("Reason", reason).Info("shutting down")
+ wkr.shutdown()
+ return nil
+}
+
func (wp *Pool) setup() {
- wp.creating = map[arvados.InstanceType]int{}
+ wp.creating = map[string]createCall{}
wp.exited = map[string]time.Time{}
wp.workers = map[cloud.InstanceID]*worker{}
wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
func (wp *Pool) getInstancesAndSync() error {
wp.setupOnce.Do(wp.setup)
+ if err := wp.instanceSet.throttleInstances.Error(); err != nil {
+ return err
+ }
wp.logger.Debug("getting instance list")
threshold := time.Now()
instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
if err != nil {
+ wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
return err
}
wp.sync(threshold, instances)
wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
continue
}
- if wp.updateWorker(inst, it, StateUnknown) {
+ if wkr, isNew := wp.updateWorker(inst, it); isNew {
notify = true
+ } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
+ wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
+ wkr.shutdown()
}
}
continue
}
logger := wp.logger.WithFields(logrus.Fields{
- "Instance": wkr.instance,
+ "Instance": wkr.instance.ID(),
"WorkerState": wkr.state,
})
logger.Info("instance disappeared in cloud")
delete(wp.workers, id)
- go wkr.executor.Close()
+ go wkr.Close()
notify = true
}
if !wp.loaded {
+ notify = true
wp.loaded = true
wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
}
}
}
-// should be called in a new goroutine
-func (wp *Pool) probeAndUpdate(wkr *worker) {
- logger := wp.logger.WithField("Instance", wkr.instance)
- wp.mtx.Lock()
- updated := wkr.updated
- needProbeRunning := wkr.state == StateRunning
- needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
- wp.mtx.Unlock()
- if !needProbeBooted && !needProbeRunning {
- return
- }
-
- var (
- ctrUUIDs []string
- ok bool
- stderr []byte
- )
- if needProbeBooted {
- ok, stderr = wp.probeBooted(wkr)
- wp.mtx.Lock()
- if ok && (wkr.state == StateUnknown || wkr.state == StateBooting) {
- wkr.state = StateRunning
- wkr.probed = time.Now()
- logger.Info("instance booted")
- go wp.notify()
- }
- needProbeRunning = wkr.state == StateRunning
- wp.mtx.Unlock()
- }
- if needProbeRunning {
- ctrUUIDs, ok, stderr = wp.probeRunning(wkr)
- }
- logger = logger.WithField("stderr", string(stderr))
- wp.mtx.Lock()
- defer wp.mtx.Unlock()
- if !ok {
- if wkr.state == StateShutdown {
- // Skip the logging noise if shutdown was
- // initiated during probe.
- return
- }
- dur := time.Since(wkr.probed)
- logger := logger.WithFields(logrus.Fields{
- "Duration": dur,
- "State": wkr.state,
- })
- if wkr.state == StateBooting {
- logger.Debug("new instance not responding")
- } else {
- logger.Info("instance not responding")
- }
- wp.shutdownIfBroken(wkr, dur)
- return
- }
-
- updateTime := time.Now()
- wkr.probed = updateTime
-
- if updated != wkr.updated {
- // Worker was updated after the probe began, so
- // wkr.running might have a container UUID that was
- // not yet running when ctrUUIDs was generated. Leave
- // wkr.running alone and wait for the next probe to
- // catch up on any changes.
- return
- }
-
- if len(ctrUUIDs) > 0 {
- wkr.busy = updateTime
- wkr.lastUUID = ctrUUIDs[0]
- } else if len(wkr.running) > 0 {
- // Actual last-busy time was sometime between wkr.busy
- // and now. Now is the earliest opportunity to take
- // advantage of the non-busy state, though.
- wkr.busy = updateTime
- }
- running := map[string]struct{}{}
- changed := false
- for _, uuid := range ctrUUIDs {
- running[uuid] = struct{}{}
- if _, ok := wkr.running[uuid]; !ok {
- changed = true
- }
- }
- for uuid := range wkr.running {
- if _, ok := running[uuid]; !ok {
- logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
- wp.exited[uuid] = updateTime
- changed = true
- }
- }
- if changed {
- wkr.running = running
- wkr.updated = updateTime
- go wp.notify()
- }
-}
-
-func (wp *Pool) probeRunning(wkr *worker) (running []string, ok bool, stderr []byte) {
- cmd := "crunch-run --list"
- stdout, stderr, err := wkr.executor.Execute(cmd, nil)
- if err != nil {
- wp.logger.WithFields(logrus.Fields{
- "Instance": wkr.instance,
- "Command": cmd,
- "stdout": string(stdout),
- "stderr": string(stderr),
- }).WithError(err).Warn("probe failed")
- return nil, false, stderr
- }
- stdout = bytes.TrimRight(stdout, "\n")
- if len(stdout) == 0 {
- return nil, true, stderr
+func (wp *Pool) waitUntilLoaded() {
+ ch := wp.Subscribe()
+ defer wp.Unsubscribe(ch)
+ wp.mtx.RLock()
+ defer wp.mtx.RUnlock()
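+ // Drop the read lock while waiting for each notification so the
+ // sync goroutine can take the write lock and set wp.loaded.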
+ for !wp.loaded {
+ wp.mtx.RUnlock()
+ <-ch
+ wp.mtx.RLock()
}
- return strings.Split(string(stdout), "\n"), true, stderr
}
-func (wp *Pool) probeBooted(wkr *worker) (ok bool, stderr []byte) {
- cmd := wp.bootProbeCommand
- if cmd == "" {
- cmd = "true"
- }
- stdout, stderr, err := wkr.executor.Execute(cmd, nil)
- logger := wp.logger.WithFields(logrus.Fields{
- "Instance": wkr.instance,
- "Command": cmd,
- "stdout": string(stdout),
- "stderr": string(stderr),
- })
+// randomHex returns a random string of n hexadecimal digits (n*4
+// random bits). n must be even.
+func randomHex(n int) string {
+ buf := make([]byte, n/2)
+ _, err := rand.Read(buf)
if err != nil {
- logger.WithError(err).Debug("boot probe failed")
- return false, stderr
+ panic(err)
}
- logger.Info("boot probe succeeded")
- return true, stderr
+ return fmt.Sprintf("%x", buf)
}