// instances have been shutdown.
quotaErrorTTL = time.Minute
+ // Time after a capacity error to try again
+ capacityErrorTTL = time.Minute
+
// Time between "X failed because rate limiting" messages
logRateLimitErrorInterval = time.Second * 10
)
runnerArgs []string // extra args passed to crunch-run
// private state
- subscribers map[<-chan struct{}]chan<- struct{}
- creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
- workers map[cloud.InstanceID]*worker
- loaded bool // loaded list of instances from InstanceSet at least once
- exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
- atQuotaUntil time.Time
- atQuotaErr cloud.QuotaError
- stop chan bool
- mtx sync.RWMutex
- setupOnce sync.Once
- runnerData []byte
- runnerMD5 [md5.Size]byte
- runnerCmd string
+ subscribers map[<-chan struct{}]chan<- struct{}
+ creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
+ workers map[cloud.InstanceID]*worker
+ loaded bool // loaded list of instances from InstanceSet at least once
+ exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
+ atQuotaUntilFewerInstances int
+ atQuotaUntil time.Time
+ atQuotaErr cloud.QuotaError
+ atCapacityUntil map[string]time.Time
+ stop chan bool
+ mtx sync.RWMutex
+ setupOnce sync.Once
+ runnerData []byte
+ runnerMD5 [md5.Size]byte
+ runnerCmd string
mContainersRunning prometheus.Gauge
mInstances *prometheus.GaugeVec
// Boot probe is certain to fail.
return false
}
- wp.mtx.Lock()
- defer wp.mtx.Unlock()
- if time.Now().Before(wp.atQuotaUntil) ||
- wp.instanceSet.throttleCreate.Error() != nil ||
- (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) {
+ if wp.AtCapacity(it) || wp.AtQuota() || wp.instanceSet.throttleCreate.Error() != nil {
return false
}
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
// The maxConcurrentInstanceCreateOps knob throttles the number of node create
// requests in flight. It was added to work around a limitation in Azure's
// managed disks, which support no more than 20 concurrent node creation
if err != nil {
if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
wp.atQuotaErr = err
- wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
- time.AfterFunc(quotaErrorTTL, wp.notify)
+ n := len(wp.workers) + len(wp.creating) - 1
+ if n < 1 {
+ // Quota error with no
+ // instances running --
+ // nothing to do but wait
+ wp.atQuotaUntilFewerInstances = 0
+ wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+ time.AfterFunc(quotaErrorTTL, wp.notify)
+ logger.WithField("atQuotaUntil", wp.atQuotaUntil).Info("quota error with 0 running -- waiting for quotaErrorTTL")
+ } else if n < wp.atQuotaUntilFewerInstances || wp.atQuotaUntilFewerInstances == 0 {
+ // Quota error with N
+ // instances running -- report
+ // AtQuota until some
+ // instances shut down
+ wp.atQuotaUntilFewerInstances = n
+ wp.atQuotaUntil = time.Time{}
+ logger.WithField("atQuotaUntilFewerInstances", n).Info("quota error -- waiting for next instance shutdown")
+ }
+ }
+ if err, ok := err.(cloud.CapacityError); ok && err.IsCapacityError() {
+ capKey := it.ProviderType
+ if !err.IsInstanceTypeSpecific() {
+ // set capacity flag for all
+ // instance types
+ capKey = ""
+ }
+ if wp.atCapacityUntil == nil {
+ wp.atCapacityUntil = map[string]time.Time{}
+ }
+ wp.atCapacityUntil[capKey] = time.Now().Add(capacityErrorTTL)
+ time.AfterFunc(capacityErrorTTL, wp.notify)
}
logger.WithError(err).Error("create failed")
wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
return true
}
+// AtCapacity reports whether Create() for the given instance type is
+// currently expected to fail: either that type's ProviderType or the
+// all-types entry (key "") has an unexpired deadline recorded in
+// atCapacityUntil.
+func (wp *Pool) AtCapacity(it arvados.InstanceType) bool {
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	// Check the type-specific entry first, then the "" entry that
+	// covers all instance types.
+	for _, capKey := range [...]string{it.ProviderType, ""} {
+		until, found := wp.atCapacityUntil[capKey]
+		if found && time.Now().Before(until) {
+			return true
+		}
+	}
+	return false
+}
+
// AtQuota returns true if Create is not expected to work at the
// moment (e.g., cloud provider has reported quota errors, or we are
// already at our own configured quota).
func (wp *Pool) AtQuota() bool {
wp.mtx.Lock()
defer wp.mtx.Unlock()
- return time.Now().Before(wp.atQuotaUntil) || (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
+ // At quota if any of these holds: a quota error was seen while
+ // instances existed and the instance count has not dropped since
+ // (atQuotaUntilFewerInstances > 0); the atQuotaUntil deadline
+ // has not passed yet; or maxInstances is set and the number of
+ // workers plus pending creates has reached it.
+ return wp.atQuotaUntilFewerInstances > 0 ||
+ time.Now().Before(wp.atQuotaUntil) ||
+ (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
}
// SetIdleBehavior determines how the indicated instance will behave
// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+ wp.setupOnce.Do(wp.setup)
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
wkr, ok := wp.workers[id]
if !ok {
return errors.New("instance not found")
notify = true
}
+ if wp.atQuotaUntilFewerInstances > len(wp.workers)+len(wp.creating) {
+ // After syncing, there are fewer instances (including
+ // pending creates) than there were last time we saw a
+ // quota error. This might mean it's now possible to
+ // create new instances. Reset our "at quota" state.
+ wp.atQuotaUntilFewerInstances = 0
+ }
+
if !wp.loaded {
notify = true
wp.loaded = true