// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
wp := &Pool{
- logger: logger,
- arvClient: arvClient,
- instanceSetID: instanceSetID,
- instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
- newExecutor: newExecutor,
- bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
- runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
- imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
- instanceTypes: cluster.InstanceTypes,
- maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
- probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
- syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
- timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
- timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
- timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
- timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
- timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
- timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
- installPublicKey: installPublicKey,
- tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
- stop: make(chan bool),
+ logger: logger,
+ arvClient: arvClient,
+ instanceSetID: instanceSetID,
+ instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
+ newExecutor: newExecutor,
+ bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
+ runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
+ imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
+ instanceTypes: cluster.InstanceTypes,
+ maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
+ maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
+ probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
+ syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
+ timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
+ timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
+ timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+ timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+ timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
+ timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
+ installPublicKey: installPublicKey,
+ tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
+ stop: make(chan bool),
}
wp.registerMetrics(reg)
go func() {
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
// configuration
- logger logrus.FieldLogger
- arvClient *arvados.Client
- instanceSetID cloud.InstanceSetID
- instanceSet *throttledInstanceSet
- newExecutor func(cloud.Instance) Executor
- bootProbeCommand string
- runnerSource string
- imageID cloud.ImageID
- instanceTypes map[string]arvados.InstanceType
- syncInterval time.Duration
- probeInterval time.Duration
- maxProbesPerSecond int
- timeoutIdle time.Duration
- timeoutBooting time.Duration
- timeoutProbe time.Duration
- timeoutShutdown time.Duration
- timeoutTERM time.Duration
- timeoutSignal time.Duration
- installPublicKey ssh.PublicKey
- tagKeyPrefix string
+ logger logrus.FieldLogger
+ arvClient *arvados.Client
+ instanceSetID cloud.InstanceSetID
+ instanceSet *throttledInstanceSet
+ newExecutor func(cloud.Instance) Executor
+ bootProbeCommand string
+ runnerSource string
+ imageID cloud.ImageID
+ instanceTypes map[string]arvados.InstanceType
+ syncInterval time.Duration
+ probeInterval time.Duration
+ maxProbesPerSecond int
+ maxConcurrentInstanceCreateOps int
+ timeoutIdle time.Duration
+ timeoutBooting time.Duration
+ timeoutProbe time.Duration
+ timeoutShutdown time.Duration
+ timeoutTERM time.Duration
+ timeoutSignal time.Duration
+ installPublicKey ssh.PublicKey
+ tagKeyPrefix string
// private state
subscribers map[<-chan struct{}]chan<- struct{}
runnerMD5 [md5.Size]byte
runnerCmd string
- throttleCreate throttle
- throttleInstances throttle
-
mContainersRunning prometheus.Gauge
mInstances *prometheus.GaugeVec
mInstancesPrice *prometheus.GaugeVec
}
wp.mtx.Lock()
defer wp.mtx.Unlock()
- if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
+ if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
+ return false
+ }
+ // The maxConcurrentInstanceCreateOps knob throttles the number of node create
+ // requests in flight. It was added to work around a limitation in Azure's
+ // managed disks, which support no more than 20 concurrent node creation
+ // requests from a single disk image (cf.
+ // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
+ // The code assumes that node creation, from Azure's perspective, means the
+ // period until the instance appears in the "get all instances" list.
+ if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
+ logger.Info("reached MaxConcurrentInstanceCreateOps")
+ wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
return false
}
now := time.Now()
}
}
+func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
+ logger := ctxlog.TestLogger(c)
+ driver := test.StubDriver{HoldCloudOps: true}
+ instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+ c.Assert(err, check.IsNil)
+
+ type1 := test.InstanceType(1)
+ pool := &Pool{
+ logger: logger,
+ instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
+ maxConcurrentInstanceCreateOps: 1,
+ instanceTypes: arvados.InstanceTypeMap{
+ type1.Name: type1,
+ },
+ }
+
+ c.Check(pool.Unallocated()[type1], check.Equals, 0)
+ res := pool.Create(type1)
+ c.Check(pool.Unallocated()[type1], check.Equals, 1)
+ c.Check(res, check.Equals, true)
+
+ res = pool.Create(type1)
+ c.Check(pool.Unallocated()[type1], check.Equals, 1)
+ c.Check(res, check.Equals, false)
+
+ pool.instanceSet.throttleCreate.err = nil
+ pool.maxConcurrentInstanceCreateOps = 2
+
+ res = pool.Create(type1)
+ c.Check(pool.Unallocated()[type1], check.Equals, 2)
+ c.Check(res, check.Equals, true)
+
+ pool.instanceSet.throttleCreate.err = nil
+ pool.maxConcurrentInstanceCreateOps = 0
+
+ res = pool.Create(type1)
+ c.Check(pool.Unallocated()[type1], check.Equals, 3)
+ c.Check(res, check.Equals, true)
+}
+
func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
logger := ctxlog.TestLogger(c)
driver := test.StubDriver{HoldCloudOps: true}