package worker
import (
- "context"
+ "errors"
"io"
"sort"
"strings"
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- "github.com/Sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
)
const (
+ // Key of the cloud instance tag that carries the Arvados instance
+ // type name. Create sets it to it.Name on every new instance.
tagKeyInstanceType = "InstanceType"
- tagKeyHold = "Hold"
+ // Key of the cloud instance tag that carries a worker's
+ // IdleBehavior. Create sets it to IdleBehaviorRun, and the tag is
+ // read back when an instance first appears in the pool.
+ tagKeyIdleBehavior = "IdleBehavior"
)
// An InstanceView shows a worker's current state and recent activity.
+//
+// The json tags define the field names used when a view is
+// serialized, so renaming or reordering fields changes that output.
type InstanceView struct {
- Instance string
- Price float64
- ArvadosInstanceType string
- ProviderInstanceType string
- LastContainerUUID string
- LastBusy time.Time
- WorkerState string
+ // ID of the worker's cloud instance.
+ Instance cloud.InstanceID `json:"instance"`
+ // Price of the worker's instance type.
+ Price float64 `json:"price"`
+ // Name of the worker's Arvados instance type.
+ ArvadosInstanceType string `json:"arvados_instance_type"`
+ // ProviderType of the worker's instance type.
+ ProviderInstanceType string `json:"provider_instance_type"`
+ // The worker's lastUUID value (presumably the most recent
+ // container started on it -- confirm against the worker code).
+ LastContainerUUID string `json:"last_container_uuid"`
+ // The worker's busy timestamp; it is initialized to the time the
+ // worker first appeared in the pool.
+ LastBusy time.Time `json:"last_busy"`
+ // String form of the worker's state.
+ WorkerState string `json:"worker_state"`
+ // The worker's current idle behavior.
+ IdleBehavior IdleBehavior `json:"idle_behavior"`
}
// An Executor executes shell commands on a remote host.
type Executor interface {
// Run cmd on the current target.
- Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
+ Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
// Use the given target for subsequent operations. The new
// target is the same host as the previous target, but it
defaultTimeoutBooting = time.Minute * 10
defaultTimeoutProbe = time.Minute * 10
defaultTimeoutShutdown = time.Second * 10
+
+ // Time after a quota error to try again anyway, even if no
+ // instances have been shut down.
+ quotaErrorTTL = time.Minute
+
+ // Time between "X failed because rate limiting" messages
+ logRateLimitErrorInterval = time.Second * 10
)
func duration(conf arvados.Duration, def time.Duration) time.Duration {
//
// New instances are configured and set up according to the given
// cluster configuration.
-func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
wp := &Pool{
logger: logger,
- instanceSet: instanceSet,
+ arvClient: arvClient,
+ instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
newExecutor: newExecutor,
bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
imageID: cloud.ImageID(cluster.CloudVMs.ImageID),
type Pool struct {
// configuration
logger logrus.FieldLogger
- instanceSet cloud.InstanceSet
+ // Arvados API client supplied to NewPool.
+ arvClient *arvados.Client
+ // The cloud.InstanceSet given to NewPool, wrapped in a
+ // throttledInstanceSet.
+ instanceSet *throttledInstanceSet
newExecutor func(cloud.Instance) Executor
bootProbeCommand string
imageID cloud.ImageID
mtx sync.RWMutex
setupOnce sync.Once
+ // NOTE(review): rate-limit errors are recorded on
+ // instanceSet.throttleCreate and instanceSet.throttleInstances;
+ // nothing in the visible code writes to these Pool-level fields.
+ // Confirm whether they are still needed.
+ throttleCreate throttle
+ throttleInstances throttle
+
mInstances prometheus.Gauge
+ // Gauge "instances_price_total": set to the sum of instType.Price
+ // over all workers each time metrics are updated.
+ mInstancesPrice prometheus.Gauge
mContainersRunning prometheus.Gauge
mVCPUs prometheus.Gauge
mVCPUsInuse prometheus.Gauge
mMemoryInuse prometheus.Gauge
}
-// Subscribe returns a channel that becomes ready whenever a worker's
-// state changes.
+// Subscribe returns a buffered channel that becomes ready after any
+// change to the pool's state that could have scheduling implications:
+// a worker's state changes, a new worker appears, the cloud
+// provider's API rate limiting period ends, etc.
+//
+// Additional events that occur while the channel is already ready
+// will be dropped, so it is OK if the caller services the channel
+// slowly.
//
// Example:
//
// ch := wp.Subscribe()
// defer wp.Unsubscribe(ch)
// for range ch {
-// // ...try scheduling some work...
+// tryScheduling(wp)
// if done {
// break
// }
}
// Unallocated returns the number of unallocated (creating + booting +
-// idle + unknown) workers for each instance type.
+// idle + unknown) workers for each instance type. Workers in
+// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
wp.setupOnce.Do(wp.setup)
wp.mtx.RLock()
creating[it] = len(times)
}
for _, wkr := range wp.workers {
- if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) {
+ if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) || wkr.idleBehavior != IdleBehaviorRun {
continue
}
it := wkr.instType
// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
-func (wp *Pool) Create(it arvados.InstanceType) error {
+//
+// Create returns false if a pre-existing error state prevents it from
+// even attempting to create a new instance. Those errors are logged
+// by the Pool, so the caller does not need to log anything in such
+// cases.
+//
+// The pre-existing error states are: a quota error was seen less than
+// quotaErrorTTL ago, or the instance set's create throttle currently
+// reports an error. In every other case Create returns true, and any
+// failure of the background create call is logged, not returned.
+func (wp *Pool) Create(it arvados.InstanceType) bool {
logger := wp.logger.WithField("InstanceType", it.Name)
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
defer wp.mtx.Unlock()
- if time.Now().Before(wp.atQuotaUntil) {
- return wp.atQuotaErr
+ // The throttle consulted here must be the one that the failure
+ // path below feeds via CheckRateLimitError, i.e. the throttle on
+ // wp.instanceSet. Checking a different throttle would mean a
+ // rate-limit error never suppresses later attempts.
+ if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
+ return false
+ }
+ tags := cloud.InstanceTags{
+ tagKeyInstanceType: it.Name,
+ tagKeyIdleBehavior: string(IdleBehaviorRun),
}
- tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
now := time.Now()
wp.creating[it] = append(wp.creating[it], now)
go func() {
defer wp.notify()
- inst, err := wp.instanceSet.Create(context.TODO(), it, wp.imageID, tags, nil)
+ inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
wp.mtx.Lock()
defer wp.mtx.Unlock()
// Remove our timestamp marker from wp.creating
break
}
}
- if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
- wp.atQuotaErr = err
- wp.atQuotaUntil = time.Now().Add(time.Minute)
- }
if err != nil {
+ if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+ wp.atQuotaErr = err
+ wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+ // Call wp.notify again once the quota hold expires.
+ time.AfterFunc(quotaErrorTTL, wp.notify)
+ }
logger.WithError(err).Error("create failed")
+ wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
return
}
wp.updateWorker(inst, it, StateBooting)
}()
- return nil
+ return true
}
// AtQuota returns true if Create is not expected to work at the
return time.Now().Before(wp.atQuotaUntil)
}
+// SetIdleBehavior determines how the indicated instance will behave
+// when it has no containers running.
+//
+// It returns an error, leaving the pool unchanged, if idleBehavior is
+// not one of the values accepted by validIdleBehavior or if the pool
+// has no worker for the given instance ID. Otherwise it records the
+// new behavior on the worker, then calls the worker's saveTags and
+// shutdownIfIdle methods, and returns nil.
+func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ // Reject unknown values up front. An unrecognized behavior would
+ // be neither IdleBehaviorRun nor IdleBehaviorHold, leaving the
+ // worker in an undefined mode.
+ if !validIdleBehavior[idleBehavior] {
+ return errors.New("invalid idle behavior: " + string(idleBehavior))
+ }
+ wkr, ok := wp.workers[id]
+ if !ok {
+ return errors.New("requested instance does not exist")
+ }
+ wkr.idleBehavior = idleBehavior
+ wkr.saveTags()
+ wkr.shutdownIfIdle()
+ return nil
+}
+
+
// Add or update worker attached to the given instance. Use
// initialState if a new worker is created.
//
if initialState == StateBooting && wkr.state == StateUnknown {
wkr.state = StateBooting
}
+ wkr.saveTags()
return wkr, false
}
- if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
- initialState = StateHold
+
+ // If an instance has a valid IdleBehavior tag when it first
+ // appears, initialize the new worker accordingly (this is how
+ // we restore IdleBehavior that was set by a prior dispatch
+ // process); otherwise, default to "run". After this,
+ // wkr.idleBehavior is the source of truth, and will only be
+ // changed via SetIdleBehavior().
+ idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
+ if !validIdleBehavior[idleBehavior] {
+ idleBehavior = IdleBehaviorRun
}
+
logger := wp.logger.WithFields(logrus.Fields{
"InstanceType": it.Name,
"Instance": inst,
})
- logger.WithField("State", initialState).Infof("instance appeared in cloud")
+ logger.WithFields(logrus.Fields{
+ "State": initialState,
+ "IdleBehavior": idleBehavior,
+ }).Infof("instance appeared in cloud")
now := time.Now()
wkr := &worker{
- mtx: &wp.mtx,
- wp: wp,
- logger: logger,
- executor: wp.newExecutor(inst),
- state: initialState,
- instance: inst,
- instType: it,
- appeared: now,
- probed: now,
- busy: now,
- updated: now,
- running: make(map[string]struct{}),
- starting: make(map[string]struct{}),
- probing: make(chan struct{}, 1),
+ mtx: &wp.mtx,
+ wp: wp,
+ logger: logger,
+ executor: wp.newExecutor(inst),
+ state: initialState,
+ idleBehavior: idleBehavior,
+ instance: inst,
+ instType: it,
+ appeared: now,
+ probed: now,
+ busy: now,
+ updated: now,
+ running: make(map[string]struct{}),
+ starting: make(map[string]struct{}),
+ probing: make(chan struct{}, 1),
}
wp.workers[id] = wkr
return wkr, true
// TODO: shutdown the worker with the longest idle
// time (Idle) or the earliest create time (Booting)
for _, wkr := range wp.workers {
- if wkr.state == tryState && wkr.instType == it {
+ if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
logger.WithField("Instance", wkr.instance).Info("shutting down")
wkr.shutdown()
return true
}
// Running returns the container UUIDs being prepared/run on workers.
+//
+// In the returned map, the time value indicates when the Pool
+// observed that the container process had exited. A container that
+// has not yet exited has a zero time value. The caller should use
+// KillContainer() to garbage-collect the entries for exited
+// containers.
func (wp *Pool) Running() map[string]time.Time {
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
"Instance": wkr.instance,
})
logger.Debug("killing process")
- stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil)
+ stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
if err != nil {
logger.WithFields(logrus.Fields{
"stderr": string(stderr),
Help: "Number of cloud VMs including pending, booting, running, held, and shutting down.",
})
reg.MustRegister(wp.mInstances)
+ wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_price_total",
+ Help: "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
+ })
+ reg.MustRegister(wp.mInstancesPrice)
wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
wp.mtx.RLock()
defer wp.mtx.RUnlock()
+ var price float64
var alloc, cpu, cpuInuse, mem, memInuse int64
for _, wkr := range wp.workers {
+ price += wkr.instType.Price
cpu += int64(wkr.instType.VCPUs)
mem += int64(wkr.instType.RAM)
if len(wkr.running)+len(wkr.starting) == 0 {
memInuse += int64(wkr.instType.RAM)
}
wp.mInstances.Set(float64(len(wp.workers)))
+ wp.mInstancesPrice.Set(price)
wp.mContainersRunning.Set(float64(alloc))
wp.mVCPUs.Set(float64(cpu))
wp.mMemory.Set(float64(mem))
wp.mtx.Lock()
for _, w := range wp.workers {
r = append(r, InstanceView{
- Instance: w.instance.String(),
+ Instance: w.instance.ID(),
Price: w.instType.Price,
ArvadosInstanceType: w.instType.Name,
ProviderInstanceType: w.instType.ProviderType,
LastContainerUUID: w.lastUUID,
LastBusy: w.busy,
WorkerState: w.state.String(),
+ IdleBehavior: w.idleBehavior,
})
}
wp.mtx.Unlock()
sort.Slice(r, func(i, j int) bool {
- return strings.Compare(r[i].Instance, r[j].Instance) < 0
+ return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
})
return r
}
func (wp *Pool) getInstancesAndSync() error {
wp.setupOnce.Do(wp.setup)
+ if err := wp.instanceSet.throttleInstances.Error(); err != nil {
+ return err
+ }
wp.logger.Debug("getting instance list")
threshold := time.Now()
- instances, err := wp.instanceSet.Instances(context.TODO(), cloud.InstanceTags{})
+ instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
if err != nil {
+ wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
return err
}
wp.sync(threshold, instances)