CloudVMs:
ImageID: "zzzzz-compute-v1597349873"
Driver: azure
+ # (azure) managed disks: set MaxConcurrentInstanceCreateOps to 20 to avoid timeouts; cf.
+ # https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image
+ MaxConcurrentInstanceCreateOps: 20
DriverParameters:
# Credentials.
SubscriptionID: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
# unlimited).
MaxCloudOpsPerSecond: 0
+ # Maximum concurrent node creation operations (0 = unlimited). Azure
+ # recommends a limit when many VMs are created from a single managed
+ # disk image (see
+ # https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image),
+ # and the setting can be used with other cloud providers too, if desired.
+ MaxConcurrentInstanceCreateOps: 0
+
# Interval between cloud provider syncs/updates ("list all
# instances").
SyncInterval: 1m
VocabularyURL: ""
FileViewersConfigURL: ""
+ # Idle time after which the user's session will be automatically
+ # closed. This feature is disabled when set to zero.
+ IdleTimeout: 0s
+
# Workbench welcome screen: HTML text that will be
# incorporated directly onto the page.
WelcomePageHTML: |
"Login.Test": true,
"Login.Test.Enable": true,
"Login.Test.Users": false,
- "Login.TokenLifetime": false,
+ "Login.TokenLifetime": true,
"Mail": true,
"Mail.EmailFrom": false,
"Mail.IssueReporterEmailFrom": false,
"Workbench.EnableGettingStartedPopup": true,
"Workbench.EnablePublicProjectsPage": true,
"Workbench.FileViewersConfigURL": true,
+ "Workbench.IdleTimeout": true,
"Workbench.InactivePageHTML": true,
"Workbench.LogViewerMaxBytes": true,
"Workbench.MultiSiteSearch": true,
# unlimited).
MaxCloudOpsPerSecond: 0
+ # Maximum concurrent node creation operations (0 = unlimited). Azure
+ # recommends a limit when many VMs are created from a single managed
+ # disk image (see
+ # https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image),
+ # and the setting can be used with other cloud providers too, if desired.
+ MaxConcurrentInstanceCreateOps: 0
+
# Interval between cloud provider syncs/updates ("list all
# instances").
SyncInterval: 1m
VocabularyURL: ""
FileViewersConfigURL: ""
+ # Idle time after which the user's session will be automatically
+ # closed. This feature is disabled when set to zero.
+ IdleTimeout: 0s
+
# Workbench welcome screen: HTML text that will be
# incorporated directly onto the page.
WelcomePageHTML: |
if unalloc[it] > 0 {
unalloc[it]--
} else if sch.pool.AtQuota() {
- logger.Debug("not starting: AtQuota and no unalloc workers")
+ // Don't let lower-priority containers
+ // starve this one by keeping idle
+ // workers alive on different
+ // instance types.
+ logger.Debug("unlocking: AtQuota and no unalloc workers")
+ sch.queue.Unlock(ctr.UUID)
overquota = sorted[i:]
break tryrun
+ } else if logger.Info("creating new instance"); sch.pool.Create(it) {
+ // Success. (Note pool.Create works
+ // asynchronously and does its own
+ // logging, so we don't need to.)
} else {
- logger.Info("creating new instance")
- if !sch.pool.Create(it) {
- // (Note pool.Create works
- // asynchronously and logs its
- // own failures, so we don't
- // need to log this as a
- // failure.)
-
- sch.queue.Unlock(ctr.UUID)
- // Don't let lower-priority
- // containers starve this one
- // by using keeping idle
- // workers alive on different
- // instance types. TODO:
- // avoid getting starved here
- // if instances of a specific
- // type always fail.
- overquota = sorted[i:]
- break tryrun
- }
+ // Failed despite not being at quota,
+ // e.g., cloud ops throttled. TODO:
+ // avoid getting starved here if
+ // instances of a specific type always
+ // fail.
+ continue
}
if dontstart[it] {
idle map[arvados.InstanceType]int
unknown map[arvados.InstanceType]int
running map[string]time.Time
- atQuota bool
+ quota int
canCreate int
creates []arvados.InstanceType
starts []string
sync.Mutex
}
-func (p *stubPool) AtQuota() bool { return p.atQuota }
+func (p *stubPool) AtQuota() bool {
+ p.Lock()
+ defer p.Unlock()
+ return len(p.unalloc)+len(p.running)+len(p.unknown) >= p.quota
+}
func (p *stubPool) Subscribe() <-chan struct{} { return p.notify }
func (p *stubPool) Unsubscribe(<-chan struct{}) {}
func (p *stubPool) Running() map[string]time.Time {
type SchedulerSuite struct{}
-// Assign priority=4 container to idle node. Create a new instance for
-// the priority=3 container. Don't try to start any priority<3
-// containers because priority=3 container didn't start
-// immediately. Don't try to create any other nodes after the failed
-// create.
+// Assign priority=4 container to idle node. Create new instances for
+// the priority=3, 2, 1 containers.
func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
queue := test.Queue{
}
queue.Update()
pool := stubPool{
+ quota: 1000,
unalloc: map[arvados.InstanceType]int{
test.InstanceType(1): 1,
test.InstanceType(2): 2,
canCreate: 0,
}
New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
- c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
+ c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
c.Check(pool.running, check.HasLen, 1)
for uuid := range pool.running {
}
}
-// If Create() fails, shutdown some nodes, and don't call Create()
-// again. Don't call Create() at all if AtQuota() is true.
+// If pool.AtQuota() is true, shut down some unalloc nodes, and don't
+// call Create().
func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
- for quota := 0; quota < 2; quota++ {
+ for quota := 1; quota < 3; quota++ {
c.Logf("quota=%d", quota)
shouldCreate := []arvados.InstanceType{}
- for i := 0; i < quota; i++ {
+ for i := 1; i < quota; i++ {
shouldCreate = append(shouldCreate, test.InstanceType(3))
}
queue := test.Queue{
}
queue.Update()
pool := stubPool{
- atQuota: quota == 0,
+ quota: quota,
unalloc: map[arvados.InstanceType]int{
test.InstanceType(2): 2,
},
}
New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, shouldCreate)
- c.Check(pool.starts, check.DeepEquals, []string{})
- c.Check(pool.shutdowns, check.Not(check.Equals), 0)
+ if len(shouldCreate) == 0 {
+ c.Check(pool.starts, check.DeepEquals, []string{})
+ c.Check(pool.shutdowns, check.Not(check.Equals), 0)
+ } else {
+ c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
+ c.Check(pool.shutdowns, check.Equals, 0)
+ }
}
}
func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
pool := stubPool{
+ quota: 1000,
unalloc: map[arvados.InstanceType]int{
test.InstanceType(1): 2,
test.InstanceType(2): 2,
func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
pool := stubPool{
+ quota: 1000,
unalloc: map[arvados.InstanceType]int{
test.InstanceType(2): 0,
},
svm.Lock()
defer svm.Unlock()
if svm.running[uuid] != pid {
- if !completed {
- bugf := svm.sis.driver.Bugf
- if bugf == nil {
- bugf = logger.Warnf
- }
- bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s]==%d", pid, uuid, svm.running[uuid])
+ bugf := svm.sis.driver.Bugf
+ if bugf == nil {
+ bugf = logger.Warnf
}
+ bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s]==%d", pid, uuid, svm.running[uuid])
} else {
delete(svm.running, uuid)
}
time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
svm.Lock()
- killed := svm.running[uuid] != pid
+ killed := svm.killing[uuid]
svm.Unlock()
if killed || wantCrashEarly {
return
}
if strings.HasPrefix(command, "crunch-run --kill ") {
svm.Lock()
- pid, running := svm.running[uuid]
- if running && !svm.killing[uuid] {
+ _, running := svm.running[uuid]
+ if running {
svm.killing[uuid] = true
- go func() {
- time.Sleep(time.Duration(math_rand.Float64()*30) * time.Millisecond)
- svm.Lock()
- defer svm.Unlock()
- if svm.running[uuid] == pid {
- // Kill only if the running entry
- // hasn't since been killed and
- // replaced with a different one.
- delete(svm.running, uuid)
- }
- delete(svm.killing, uuid)
- }()
svm.Unlock()
time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
svm.Lock()
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
wp := &Pool{
- logger: logger,
- arvClient: arvClient,
- instanceSetID: instanceSetID,
- instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
- newExecutor: newExecutor,
- bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
- runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
- imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
- instanceTypes: cluster.InstanceTypes,
- maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
- probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
- syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
- timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
- timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
- timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
- timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
- timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
- timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
- installPublicKey: installPublicKey,
- tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
- stop: make(chan bool),
+ logger: logger,
+ arvClient: arvClient,
+ instanceSetID: instanceSetID,
+ instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
+ newExecutor: newExecutor,
+ bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
+ runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
+ imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
+ instanceTypes: cluster.InstanceTypes,
+ maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
+ maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
+ probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
+ syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
+ timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
+ timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
+ timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+ timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+ timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
+ timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
+ installPublicKey: installPublicKey,
+ tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
+ stop: make(chan bool),
}
wp.registerMetrics(reg)
go func() {
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
// configuration
- logger logrus.FieldLogger
- arvClient *arvados.Client
- instanceSetID cloud.InstanceSetID
- instanceSet *throttledInstanceSet
- newExecutor func(cloud.Instance) Executor
- bootProbeCommand string
- runnerSource string
- imageID cloud.ImageID
- instanceTypes map[string]arvados.InstanceType
- syncInterval time.Duration
- probeInterval time.Duration
- maxProbesPerSecond int
- timeoutIdle time.Duration
- timeoutBooting time.Duration
- timeoutProbe time.Duration
- timeoutShutdown time.Duration
- timeoutTERM time.Duration
- timeoutSignal time.Duration
- installPublicKey ssh.PublicKey
- tagKeyPrefix string
+ logger logrus.FieldLogger
+ arvClient *arvados.Client
+ instanceSetID cloud.InstanceSetID
+ instanceSet *throttledInstanceSet
+ newExecutor func(cloud.Instance) Executor
+ bootProbeCommand string
+ runnerSource string
+ imageID cloud.ImageID
+ instanceTypes map[string]arvados.InstanceType
+ syncInterval time.Duration
+ probeInterval time.Duration
+ maxProbesPerSecond int
+ maxConcurrentInstanceCreateOps int
+ timeoutIdle time.Duration
+ timeoutBooting time.Duration
+ timeoutProbe time.Duration
+ timeoutShutdown time.Duration
+ timeoutTERM time.Duration
+ timeoutSignal time.Duration
+ installPublicKey ssh.PublicKey
+ tagKeyPrefix string
// private state
subscribers map[<-chan struct{}]chan<- struct{}
runnerMD5 [md5.Size]byte
runnerCmd string
- throttleCreate throttle
- throttleInstances throttle
-
mContainersRunning prometheus.Gauge
mInstances *prometheus.GaugeVec
mInstancesPrice *prometheus.GaugeVec
}
wp.mtx.Lock()
defer wp.mtx.Unlock()
- if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
+ if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
+ return false
+ }
+ // The maxConcurrentInstanceCreateOps knob throttles the number of node create
+ // requests in flight. It was added to work around a limitation in Azure's
+ // managed disks, which support no more than 20 concurrent node creation
+ // requests from a single disk image (cf.
+ // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
+ // The code assumes that, from Azure's perspective, a node creation operation
+ // lasts until the new instance appears in the "get all instances" list.
+ if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
+ logger.Info("reached MaxConcurrentInstanceCreateOps")
+ wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
return false
}
now := time.Now()
}
}
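For readers unfamiliar with the worker pool internals: the check above counts entries in wp.creating, i.e., Create() calls whose instances have not yet shown up in the cloud provider's instance list. A minimal standalone sketch of that bookkeeping pattern (hypothetical names, not the actual pool code) looks like this:

package sketch

import (
	"sync"
	"time"
)

// createThrottle tracks create calls that are still "in flight", i.e., not
// yet visible in the provider's "list all instances" response.
type createThrottle struct {
	mtx      sync.Mutex
	creating map[string]time.Time // instance tag -> time the create call started
	max      int                  // 0 = unlimited
}

// tryCreate reports whether another create call may start now, and records
// it if so.
func (t *createThrottle) tryCreate(tag string) bool {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	if t.creating == nil {
		t.creating = map[string]time.Time{}
	}
	if t.max > 0 && len(t.creating) >= t.max {
		// Too many create calls still in flight.
		return false
	}
	t.creating[tag] = time.Now()
	return true
}

// sync is called with the tags of instances that now appear in the cloud
// provider's instance list; their create operations are no longer in flight.
func (t *createThrottle) sync(seen []string) {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	for _, tag := range seen {
		delete(t.creating, tag)
	}
}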
+func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
+ logger := ctxlog.TestLogger(c)
+ driver := test.StubDriver{HoldCloudOps: true}
+ instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+ c.Assert(err, check.IsNil)
+
+ type1 := test.InstanceType(1)
+ pool := &Pool{
+ logger: logger,
+ instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
+ maxConcurrentInstanceCreateOps: 1,
+ instanceTypes: arvados.InstanceTypeMap{
+ type1.Name: type1,
+ },
+ }
+
+ c.Check(pool.Unallocated()[type1], check.Equals, 0)
+ res := pool.Create(type1)
+ c.Check(pool.Unallocated()[type1], check.Equals, 1)
+ c.Check(res, check.Equals, true)
+
+ res = pool.Create(type1)
+ c.Check(pool.Unallocated()[type1], check.Equals, 1)
+ c.Check(res, check.Equals, false)
+
+ pool.instanceSet.throttleCreate.err = nil
+ pool.maxConcurrentInstanceCreateOps = 2
+
+ res = pool.Create(type1)
+ c.Check(pool.Unallocated()[type1], check.Equals, 2)
+ c.Check(res, check.Equals, true)
+
+ pool.instanceSet.throttleCreate.err = nil
+ pool.maxConcurrentInstanceCreateOps = 0
+
+ res = pool.Create(type1)
+ c.Check(pool.Unallocated()[type1], check.Equals, 3)
+ c.Check(res, check.Equals, true)
+}
+
func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
logger := ctxlog.TestLogger(c)
driver := test.StubDriver{HoldCloudOps: true}
InactivePageHTML string
SSHHelpPageHTML string
SSHHelpHostSuffix string
+ IdleTimeout Duration
}
ForceLegacyAPI14 bool
type CloudVMsConfig struct {
Enable bool
- BootProbeCommand string
- DeployRunnerBinary string
- ImageID string
- MaxCloudOpsPerSecond int
- MaxProbesPerSecond int
- PollInterval Duration
- ProbeInterval Duration
- SSHPort string
- SyncInterval Duration
- TimeoutBooting Duration
- TimeoutIdle Duration
- TimeoutProbe Duration
- TimeoutShutdown Duration
- TimeoutSignal Duration
- TimeoutTERM Duration
- ResourceTags map[string]string
- TagKeyPrefix string
+ BootProbeCommand string
+ DeployRunnerBinary string
+ ImageID string
+ MaxCloudOpsPerSecond int
+ MaxProbesPerSecond int
+ MaxConcurrentInstanceCreateOps int
+ PollInterval Duration
+ ProbeInterval Duration
+ SSHPort string
+ SyncInterval Duration
+ TimeoutBooting Duration
+ TimeoutIdle Duration
+ TimeoutProbe Duration
+ TimeoutShutdown Duration
+ TimeoutSignal Duration
+ TimeoutTERM Duration
+ ResourceTags map[string]string
+ TagKeyPrefix string
Driver string
DriverParameters json.RawMessage
'future',
'google-api-python-client >=1.6.2, <1.7',
'httplib2 >=0.9.2',
- 'pycurl >=7.19.5.1',
+ 'pycurl >=7.19.5.1, <7.43.0.4', # 7.43.0.4 removes support for python2
'ruamel.yaml >=0.15.54, <=0.16.5',
'setuptools',
'ws4py >=0.4.2',
// CommonPrefixes is nil, which confuses some clients.
// Fix by using this nested struct instead.
CommonPrefixes []commonPrefix
+ // Similarly, we need omitempty here, because an empty
+ // tag confuses some clients (e.g.,
+ // github.com/aws/aws-sdk-net never terminates its
+ // paging loop).
+ NextMarker string `xml:"NextMarker,omitempty"`
}
resp := listResp{
ListResp: s3.ListResp{
c.Check(err, check.IsNil)
// HeadObject
- exists, err = bucket.Exists(prefix + "sailboat.txt")
+ resp, err := bucket.Head(prefix+"sailboat.txt", nil)
c.Check(err, check.IsNil)
- c.Check(exists, check.Equals, true)
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ c.Check(resp.ContentLength, check.Equals, int64(4))
}
func (s *IntegrationSuite) TestS3CollectionPutObjectSuccess(c *check.C) {
c.Check(string(buf), check.Not(check.Matches), `(?ms).*CommonPrefixes.*`)
}
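As a quick standalone illustration of the omitempty change above (not part of the handler; type names are illustrative): with encoding/xml, a string field without omitempty is always serialized, so an empty marker comes out as an empty <NextMarker></NextMarker> element, which is exactly what makes clients like aws-sdk-net loop forever; with omitempty the element is dropped when the marker is empty. The new TestS3ListNoNextMarker below checks the same thing at the HTTP level.

package main

import (
	"encoding/xml"
	"fmt"
)

type withOmitempty struct {
	XMLName    xml.Name `xml:"ListBucketResult"`
	NextMarker string   `xml:"NextMarker,omitempty"`
}

type withoutOmitempty struct {
	XMLName    xml.Name `xml:"ListBucketResult"`
	NextMarker string   `xml:"NextMarker"`
}

func main() {
	a, _ := xml.Marshal(withOmitempty{})
	fmt.Println(string(a)) // <ListBucketResult></ListBucketResult>

	b, _ := xml.Marshal(withoutOmitempty{})
	fmt.Println(string(b)) // <ListBucketResult><NextMarker></NextMarker></ListBucketResult>

	c, _ := xml.Marshal(withOmitempty{NextMarker: "sailboat.txt"})
	fmt.Println(string(c)) // <ListBucketResult><NextMarker>sailboat.txt</NextMarker></ListBucketResult>
}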
+// If there is no delimiter in the request, or the results are not
+// truncated, the NextMarker XML tag should not appear in the response
+// body.
+func (s *IntegrationSuite) TestS3ListNoNextMarker(c *check.C) {
+ stage := s.s3setup(c)
+ defer stage.teardown(c)
+
+ for _, query := range []string{"prefix=e&delimiter=/", ""} {
+ req, err := http.NewRequest("GET", stage.collbucket.URL("/"), nil)
+ c.Assert(err, check.IsNil)
+ req.Header.Set("Authorization", "AWS "+arvadostest.ActiveTokenV2+":none")
+ req.URL.RawQuery = query
+ resp, err := http.DefaultClient.Do(req)
+ c.Assert(err, check.IsNil)
+ buf, err := ioutil.ReadAll(resp.Body)
+ c.Assert(err, check.IsNil)
+ c.Check(string(buf), check.Not(check.Matches), `(?ms).*NextMarker.*`)
+ }
+}
+
func (s *IntegrationSuite) TestS3CollectionList(c *check.C) {
stage := s.s3setup(c)
defer stage.teardown(c)