X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e62dc0ffea0983f6a17ef16b368a0dceb62b98ea..be8ed479042df4fdefe1fd18c1e2e984e1c99bc0:/lib/dispatchcloud/worker/pool.go

diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index fc3301d863..1665a1e43d 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -5,6 +5,7 @@
 package worker

 import (
+    "errors"
     "io"
     "sort"
     "strings"
@@ -19,24 +20,25 @@ import (

 const (
     tagKeyInstanceType = "InstanceType"
-    tagKeyHold         = "Hold"
+    tagKeyIdleBehavior = "IdleBehavior"
 )

 // An InstanceView shows a worker's current state and recent activity.
 type InstanceView struct {
-    Instance             string
-    Price                float64
-    ArvadosInstanceType  string
-    ProviderInstanceType string
-    LastContainerUUID    string
-    LastBusy             time.Time
-    WorkerState          string
+    Instance             cloud.InstanceID `json:"instance"`
+    Price                float64          `json:"price"`
+    ArvadosInstanceType  string           `json:"arvados_instance_type"`
+    ProviderInstanceType string           `json:"provider_instance_type"`
+    LastContainerUUID    string           `json:"last_container_uuid"`
+    LastBusy             time.Time        `json:"last_busy"`
+    WorkerState          string           `json:"worker_state"`
+    IdleBehavior         IdleBehavior     `json:"idle_behavior"`
 }

 // An Executor executes shell commands on a remote host.
 type Executor interface {
     // Run cmd on the current target.
-    Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
+    Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

     // Use the given target for subsequent operations. The new
     // target is the same host as the previous target, but it
@@ -61,6 +63,13 @@ const (
     defaultTimeoutBooting  = time.Minute * 10
     defaultTimeoutProbe    = time.Minute * 10
     defaultTimeoutShutdown = time.Second * 10
+
+    // Time after a quota error to try again anyway, even if no
+    // instances have been shut down.
+    quotaErrorTTL = time.Minute
+
+    // Time between "X failed because of rate limiting" messages
+    logRateLimitErrorInterval = time.Second * 10
 )

 func duration(conf arvados.Duration, def time.Duration) time.Duration {
@@ -75,10 +84,11 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration {
 //
 // New instances are configured and set up according to the given
 // cluster configuration.
-func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
     wp := &Pool{
         logger:             logger,
-        instanceSet:        instanceSet,
+        arvClient:          arvClient,
+        instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
         newExecutor:        newExecutor,
         bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
         imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
@@ -107,7 +117,8 @@ func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cl
 type Pool struct {
     // configuration
     logger             logrus.FieldLogger
-    instanceSet        cloud.InstanceSet
+    arvClient          *arvados.Client
+    instanceSet        *throttledInstanceSet
     newExecutor        func(cloud.Instance) Executor
     bootProbeCommand   string
     imageID            cloud.ImageID
@@ -132,7 +143,11 @@ type Pool struct {
     mtx          sync.RWMutex
     setupOnce    sync.Once

+    throttleCreate    throttle
+    throttleInstances throttle
+
     mInstances         prometheus.Gauge
+    mInstancesPrice    prometheus.Gauge
     mContainersRunning prometheus.Gauge
     mVCPUs             prometheus.Gauge
     mVCPUsInuse        prometheus.Gauge
@@ -140,15 +155,21 @@ type Pool struct {
     mMemoryInuse prometheus.Gauge
 }

-// Subscribe returns a channel that becomes ready whenever a worker's
-// state changes.
+// Subscribe returns a buffered channel that becomes ready after any
+// change to the pool's state that could have scheduling implications:
+// a worker's state changes, a new worker appears, the cloud
+// provider's API rate limiting period ends, etc.
+//
+// Additional events that occur while the channel is already ready
+// will be dropped, so it is OK if the caller services the channel
+// slowly.
 //
 // Example:
 //
 //    ch := wp.Subscribe()
 //    defer wp.Unsubscribe(ch)
 //    for range ch {
-//        // ...try scheduling some work...
+//        tryScheduling(wp)
 //        if done {
 //            break
 //        }
@@ -171,7 +192,8 @@ func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
 }

 // Unallocated returns the number of unallocated (creating + booting +
-// idle + unknown) workers for each instance type.
+// idle + unknown) workers for each instance type. Workers in
+// hold/drain mode are not included.
 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
     wp.setupOnce.Do(wp.setup)
     wp.mtx.RLock()
@@ -182,7 +204,7 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
         creating[it] = len(times)
     }
     for _, wkr := range wp.workers {
-        if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) {
+        if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) || wkr.idleBehavior != IdleBehaviorRun {
             continue
         }
         it := wkr.instType
@@ -212,15 +234,23 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 // Create a new instance with the given type, and add it to the worker
 // pool. The worker is added immediately; instance creation runs in
 // the background.
-func (wp *Pool) Create(it arvados.InstanceType) error {
+//
+// Create returns false if a pre-existing error state prevents it from
+// even attempting to create a new instance. Those errors are logged
+// by the Pool, so the caller does not need to log anything in such
+// cases.
+func (wp *Pool) Create(it arvados.InstanceType) bool {
     logger := wp.logger.WithField("InstanceType", it.Name)
     wp.setupOnce.Do(wp.setup)
     wp.mtx.Lock()
     defer wp.mtx.Unlock()
-    if time.Now().Before(wp.atQuotaUntil) {
-        return wp.atQuotaErr
+    if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
+        return false
+    }
+    tags := cloud.InstanceTags{
+        tagKeyInstanceType: it.Name,
+        tagKeyIdleBehavior: string(IdleBehaviorRun),
     }
-    tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
     now := time.Now()
     wp.creating[it] = append(wp.creating[it], now)
     go func() {
@@ -236,17 +266,19 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
                 break
             }
         }
-        if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
-            wp.atQuotaErr = err
-            wp.atQuotaUntil = time.Now().Add(time.Minute)
-        }
         if err != nil {
+            if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+                wp.atQuotaErr = err
+                wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+                time.AfterFunc(quotaErrorTTL, wp.notify)
+            }
             logger.WithError(err).Error("create failed")
+            wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
             return
         }
         wp.updateWorker(inst, it, StateBooting)
     }()
-    return nil
+    return true
 }

 // AtQuota returns true if Create is not expected to work at the
@@ -257,6 +289,21 @@ func (wp *Pool) AtQuota() bool {
     return time.Now().Before(wp.atQuotaUntil)
 }

+// SetIdleBehavior determines how the indicated instance will behave
+// when it has no containers running.
+func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
+    wp.mtx.Lock()
+    defer wp.mtx.Unlock()
+    wkr, ok := wp.workers[id]
+    if !ok {
+        return errors.New("requested instance does not exist")
+    }
+    wkr.idleBehavior = idleBehavior
+    wkr.saveTags()
+    wkr.shutdownIfIdle()
+    return nil
+}
+
 // Add or update worker attached to the given instance. Use
 // initialState if a new worker is created.
 //
@@ -272,32 +319,46 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
         if initialState == StateBooting && wkr.state == StateUnknown {
             wkr.state = StateBooting
         }
+        wkr.saveTags()
         return wkr, false
     }
-    if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
-        initialState = StateHold
+
+    // If an instance has a valid IdleBehavior tag when it first
+    // appears, initialize the new worker accordingly (this is how
+    // we restore IdleBehavior that was set by a prior dispatch
+    // process); otherwise, default to "run". After this,
+    // wkr.idleBehavior is the source of truth, and will only be
+    // changed via SetIdleBehavior().
+    idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
+    if !validIdleBehavior[idleBehavior] {
+        idleBehavior = IdleBehaviorRun
     }
+
     logger := wp.logger.WithFields(logrus.Fields{
         "InstanceType": it.Name,
         "Instance":     inst,
     })
-    logger.WithField("State", initialState).Infof("instance appeared in cloud")
+    logger.WithFields(logrus.Fields{
+        "State":        initialState,
+        "IdleBehavior": idleBehavior,
+    }).Infof("instance appeared in cloud")
     now := time.Now()
     wkr := &worker{
-        mtx:      &wp.mtx,
-        wp:       wp,
-        logger:   logger,
-        executor: wp.newExecutor(inst),
-        state:    initialState,
-        instance: inst,
-        instType: it,
-        appeared: now,
-        probed:   now,
-        busy:     now,
-        updated:  now,
-        running:  make(map[string]struct{}),
-        starting: make(map[string]struct{}),
-        probing:  make(chan struct{}, 1),
+        mtx:          &wp.mtx,
+        wp:           wp,
+        logger:       logger,
+        executor:     wp.newExecutor(inst),
+        state:        initialState,
+        idleBehavior: idleBehavior,
+        instance:     inst,
+        instType:     it,
+        appeared:     now,
+        probed:       now,
+        busy:         now,
+        updated:      now,
+        running:      make(map[string]struct{}),
+        starting:     make(map[string]struct{}),
+        probing:      make(chan struct{}, 1),
     }
     wp.workers[id] = wkr
     return wkr, true
@@ -320,7 +381,7 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
         // TODO: shutdown the worker with the longest idle
         // time (Idle) or the earliest create time (Booting)
         for _, wkr := range wp.workers {
-            if wkr.state == tryState && wkr.instType == it {
+            if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
                 logger.WithField("Instance", wkr.instance).Info("shutting down")
                 wkr.shutdown()
                 return true
@@ -343,6 +404,12 @@ func (wp *Pool) CountWorkers() map[State]int {
 }

 // Running returns the container UUIDs being prepared/run on workers.
+//
+// In the returned map, the time value indicates when the Pool
+// observed that the container process had exited. A container that
+// has not yet exited has a zero time value. The caller should use
+// KillContainer() to garbage-collect the entries for exited
+// containers.
 func (wp *Pool) Running() map[string]time.Time {
     wp.setupOnce.Do(wp.setup)
     wp.mtx.Lock()
@@ -411,7 +478,7 @@ func (wp *Pool) kill(wkr *worker, uuid string) {
         "Instance":      wkr.instance,
     })
     logger.Debug("killing process")
-    stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil)
+    stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
     if err != nil {
         logger.WithFields(logrus.Fields{
             "stderr": string(stderr),
@@ -444,6 +511,13 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
         Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
     })
     reg.MustRegister(wp.mInstances)
+    wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
+        Namespace: "arvados",
+        Subsystem: "dispatchcloud",
+        Name:      "instances_price_total",
+        Help:      "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
+    })
+    reg.MustRegister(wp.mInstancesPrice)
     wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
         Namespace: "arvados",
         Subsystem: "dispatchcloud",
@@ -494,8 +568,10 @@ func (wp *Pool) updateMetrics() {
     wp.mtx.RLock()
     defer wp.mtx.RUnlock()

+    var price float64
     var alloc, cpu, cpuInuse, mem, memInuse int64
     for _, wkr := range wp.workers {
+        price += wkr.instType.Price
         cpu += int64(wkr.instType.VCPUs)
         mem += int64(wkr.instType.RAM)
         if len(wkr.running)+len(wkr.starting) == 0 {
@@ -506,6 +582,7 @@ func (wp *Pool) updateMetrics() {
         memInuse += int64(wkr.instType.RAM)
     }
     wp.mInstances.Set(float64(len(wp.workers)))
+    wp.mInstancesPrice.Set(price)
     wp.mContainersRunning.Set(float64(alloc))
     wp.mVCPUs.Set(float64(cpu))
     wp.mMemory.Set(float64(mem))
@@ -588,18 +665,19 @@ func (wp *Pool) Instances() []InstanceView {
     wp.mtx.Lock()
     for _, w := range wp.workers {
         r = append(r, InstanceView{
-            Instance:             w.instance.String(),
+            Instance:             w.instance.ID(),
             Price:                w.instType.Price,
             ArvadosInstanceType:  w.instType.Name,
             ProviderInstanceType: w.instType.ProviderType,
             LastContainerUUID:    w.lastUUID,
             LastBusy:             w.busy,
             WorkerState:          w.state.String(),
+            IdleBehavior:         w.idleBehavior,
         })
     }
     wp.mtx.Unlock()
     sort.Slice(r, func(i, j int) bool {
-        return strings.Compare(r[i].Instance, r[j].Instance) < 0
+        return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
     })
     return r
 }
@@ -624,10 +702,14 @@ func (wp *Pool) notify() {

 func (wp *Pool) getInstancesAndSync() error {
     wp.setupOnce.Do(wp.setup)
+    if err := wp.instanceSet.throttleInstances.Error(); err != nil {
+        return err
+    }
     wp.logger.Debug("getting instance list")
     threshold := time.Now()
     instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
     if err != nil {
+        wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
         return err
     }
     wp.sync(threshold, instances)
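
The hunks above change Create to report success as a bool instead of returning an error, and move retry timing onto the Subscribe channel (time.AfterFunc(quotaErrorTTL, wp.notify) and the notify callback passed to CheckRateLimitError). The sketch below is illustrative only and is not part of this commit: the helper name ensureUnallocated and the SDK import path are assumptions, while Pool, Subscribe, Unsubscribe, Unallocated, and Create come from the diff itself.

// Illustrative sketch, not part of the commit above. Assumes placement in the
// same "worker" package as pool.go; the arvados import path is an assumption
// about this revision's SDK layout.
package worker

import (
    "git.curoverse.com/arvados.git/sdk/go/arvados"
)

// ensureUnallocated is a hypothetical helper showing the calling pattern
// implied by the new API: Create reports success as a bool, and the caller
// waits on the Subscribe channel instead of handling quota or rate-limit
// errors itself.
func ensureUnallocated(wp *Pool, it arvados.InstanceType, want int) {
    ch := wp.Subscribe()
    defer wp.Unsubscribe(ch)
    for wp.Unallocated()[it] < want {
        if wp.Create(it) {
            // Create records the pending instance immediately, so the
            // next Unallocated() call already counts it.
            continue
        }
        // Create returned false: a quota or rate-limit error is still in
        // effect. The Pool has already logged it and will notify ch when
        // it is worth trying again.
        <-ch
    }
}

A caller that wants to take a node out of service would instead use wp.SetIdleBehavior(id, IdleBehaviorHold), which the updated Shutdown and Unallocated code above respects.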