Merge branch '21146-pysdk-new-websockets'
[arvados.git] / lib / dispatchcloud / dispatcher_test.go
index 6c057edc70223001a1a8d66a3134158806d559cb..51c2c3d6a35543cf586f30daac39a0822ad5a90e 100644 (file)
@@ -15,6 +15,7 @@ import (
        "net/url"
        "os"
        "sync"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/lib/config"
@@ -72,6 +73,7 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
                        StaleLockTimeout:       arvados.Duration(5 * time.Millisecond),
                        RuntimeEngine:          "stub",
                        MaxDispatchAttempts:    10,
+                       MaximumPriceFactor:     1.5,
                        CloudVMs: arvados.CloudVMsConfig{
                                Driver:               "test",
                                SyncInterval:         arvados.Duration(10 * time.Millisecond),
@@ -160,7 +162,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
        s.disp.setupOnce.Do(s.disp.initialize)
        queue := &test.Queue{
                MaxDispatchAttempts: 5,
-               ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
+               ChooseType: func(ctr *arvados.Container) ([]arvados.InstanceType, error) {
                        return ChooseInstanceType(s.cluster, ctr)
                },
                Logger: ctxlog.TestLogger(c),
@@ -205,10 +207,17 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                finishContainer(ctr)
                return int(rand.Uint32() & 0x3)
        }
-       n := 0
+       var countCapacityErrors int64
+       vmCount := int32(0)
        s.stubDriver.Queue = queue
-       s.stubDriver.SetupVM = func(stubvm *test.StubVM) {
-               n++
+       s.stubDriver.SetupVM = func(stubvm *test.StubVM) error {
+               if pt := stubvm.Instance().ProviderType(); pt == test.InstanceType(6).ProviderType {
+                       c.Logf("test: returning capacity error for instance type %s", pt)
+                       atomic.AddInt64(&countCapacityErrors, 1)
+                       return test.CapacityError{InstanceTypeSpecific: true}
+               }
+               n := atomic.AddInt32(&vmCount, 1)
+               c.Logf("SetupVM: instance %s n=%d", stubvm.Instance(), n)
                stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
                stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
                stubvm.ExecuteContainer = executeContainer
@@ -230,11 +239,12 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                        // 1 instance is completely broken, ensuring
                        // the boot_outcomes{outcome="failure"} metric
                        // is not zero
-                       stubvm.CrunchRunCrashRate = 1
+                       stubvm.Broken = time.Now()
                default:
                        stubvm.CrunchRunCrashRate = 0.1
                        stubvm.ArvMountDeadlockRate = 0.1
                }
+               return nil
        }
        s.stubDriver.Bugf = c.Errorf
 
@@ -270,6 +280,8 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                }
        }
 
+       c.Check(countCapacityErrors, check.Not(check.Equals), int64(0))
+
        req := httptest.NewRequest("GET", "/metrics", nil)
        req.Header.Set("Authorization", "Bearer "+s.cluster.ManagementToken)
        resp := httptest.NewRecorder()