20978: Merge branch 'main' into 20978-instance-types
[arvados.git] / lib / dispatchcloud / dispatcher_test.go
index 9d7d3736b05f15ac04c189bee9993b21bb8c35a1..33d7f4e9acd2e5e15293faffefd4d2667e2899f7 100644 (file)
@@ -15,6 +15,7 @@ import (
        "net/url"
        "os"
        "sync"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/lib/config"
@@ -49,8 +50,10 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
        s.stubDriver = &test.StubDriver{
                HostKey:                   hostpriv,
                AuthorizedKeys:            []ssh.PublicKey{dispatchpub},
+               ErrorRateCreate:           0.1, // presumably fails ~10% of Create calls (cf. ErrorRateDestroy) — confirm in test.StubDriver
                ErrorRateDestroy:          0.1,
                MinTimeBetweenCreateCalls: time.Millisecond,
+               QuotaMaxInstances:         10, // presumably a simulated provider quota, set above InitialQuotaEstimate (8) — confirm
        }
 
        // We need the postgresql connection info from the integration
@@ -69,6 +72,8 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
                        DispatchPrivateKey:     string(dispatchprivraw),
                        StaleLockTimeout:       arvados.Duration(5 * time.Millisecond),
                        RuntimeEngine:          "stub",
+                       MaxDispatchAttempts:    10,
+                       MaximumPriceFactor:     1.5, // presumably allows fallback instance types up to 1.5x the cheapest match — confirm
                        CloudVMs: arvados.CloudVMsConfig{
                                Driver:               "test",
                                SyncInterval:         arvados.Duration(10 * time.Millisecond),
@@ -77,6 +82,7 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
                                TimeoutProbe:         arvados.Duration(15 * time.Millisecond),
                                TimeoutShutdown:      arvados.Duration(5 * time.Millisecond),
                                MaxCloudOpsPerSecond: 500,
+                               InitialQuotaEstimate: 8,
                                PollInterval:         arvados.Duration(5 * time.Millisecond),
                                ProbeInterval:        arvados.Duration(5 * time.Millisecond),
                                MaxProbesPerSecond:   1000,
@@ -102,9 +108,13 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
 
        arvClient, err := arvados.NewClientFromConfig(s.cluster)
        c.Assert(err, check.IsNil)
-       arvClient.Timeout = 0 // disable auto-retry
+       // Disable auto-retry, so requests routed to the 503 stub below fail immediately instead of being retried.
+       arvClient.Timeout = 0
 
-       s.error503Server = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusServiceUnavailable) }))
+       s.error503Server = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               c.Logf("503 stub: returning 503")
+               w.WriteHeader(http.StatusServiceUnavailable)
+       }))
        arvClient.Client = &http.Client{
                Transport: &http.Transport{
                        Proxy: s.arvClientProxy(c),
@@ -135,6 +145,7 @@ func (s *DispatcherSuite) TearDownTest(c *check.C) {
 func (s *DispatcherSuite) arvClientProxy(c *check.C) func(*http.Request) (*url.URL, error) {
        return func(req *http.Request) (*url.URL, error) {
                if req.URL.Path == "/503" {
+                       c.Logf("arvClientProxy: proxying to 503 stub") // only "/503" is diverted; other paths get a nil proxy URL (direct connection)
                        return url.Parse(s.error503Server.URL)
                } else {
                        return nil, nil
@@ -150,7 +161,8 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
        Drivers["test"] = s.stubDriver
        s.disp.setupOnce.Do(s.disp.initialize)
        queue := &test.Queue{
-               ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
+               MaxDispatchAttempts: 5,
+               ChooseType: func(ctr *arvados.Container) ([]arvados.InstanceType, error) {
                        return ChooseInstanceType(s.cluster, ctr)
                },
                Logger: ctxlog.TestLogger(c),
@@ -184,6 +196,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                delete(waiting, ctr.UUID)
                if len(waiting) == 100 {
                        // trigger scheduler maxConcurrency limit
+                       c.Logf("test: requesting 503 in order to trigger maxConcurrency limit")
                        s.disp.ArvClient.RequestAndDecode(nil, "GET", "503", nil, nil)
                }
                if len(waiting) == 0 {
@@ -194,26 +207,43 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                finishContainer(ctr)
                return int(rand.Uint32() & 0x3)
        }
+       var countCapacityErrors int64 // incremented via atomic.AddInt64 in SetupVM; read only via atomic.LoadInt64
        n := 0
        s.stubDriver.Queue = queue
-       s.stubDriver.SetupVM = func(stubvm *test.StubVM) {
+       s.stubDriver.SetupVM = func(stubvm *test.StubVM) error {
+               if pt := stubvm.Instance().ProviderType(); pt == test.InstanceType(6).ProviderType {
+                       c.Logf("test: returning capacity error for instance type %s", pt)
+                       atomic.AddInt64(&countCapacityErrors, 1)
+                       return test.CapacityError{InstanceTypeSpecific: true}
+               }
                n++
                stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
                stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
                stubvm.ExecuteContainer = executeContainer
                stubvm.CrashRunningContainer = finishContainer
                stubvm.ExtraCrunchRunArgs = "'--runtime-engine=stub' '--foo' '--extra='\\''args'\\'''"
-               switch n % 7 {
-               case 0:
+               switch {
+               case n%7 == 0:
+                       // some instances start out OK but then stop
+                       // running any commands
                        stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
-               case 1:
+               case n%7 == 1:
+                       // some instances never pass a run-probe
                        stubvm.CrunchRunMissing = true
-               case 2:
+               case n%7 == 2:
+                       // some instances start out OK but then start
+                       // reporting themselves as broken
                        stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)) * time.Millisecond)
+               case n == 3:
+                       // 1 instance is completely broken, ensuring
+                       // the boot_outcomes{outcome="failure"} metric
+                       // is not zero
+                       stubvm.CrunchRunCrashRate = 1
                default:
                        stubvm.CrunchRunCrashRate = 0.1
                        stubvm.ArvMountDeadlockRate = 0.1
                }
+               return nil
        }
        s.stubDriver.Bugf = c.Errorf
 
@@ -227,9 +257,9 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                select {
                case <-done:
                        // loop will end because len(waiting)==0
-               case <-time.After(3 * time.Second):
+               case <-time.After(5 * time.Second):
                        if len(waiting) >= waswaiting {
-                               c.Fatalf("timed out; no progress in 3s while waiting for %d containers: %q", len(waiting), waiting)
+                               c.Fatalf("timed out; no progress in 5s while waiting for %d containers: %q", len(waiting), waiting)
                        }
                }
        }
@@ -249,6 +279,8 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                }
        }
 
+       c.Check(atomic.LoadInt64(&countCapacityErrors), check.Not(check.Equals), int64(0)) // atomic read pairs with the AddInt64 in SetupVM
+
        req := httptest.NewRequest("GET", "/metrics", nil)
        req.Header.Set("Authorization", "Bearer "+s.cluster.ManagementToken)
        resp := httptest.NewRecorder()
@@ -357,6 +389,7 @@ func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
        sr := getInstances()
        c.Check(len(sr.Items), check.Equals, 0)
 
+       s.stubDriver.ErrorRateCreate = 0 // SetUpTest sets ErrorRateCreate to 0.1; zero it so pool.Create below is presumably not subject to injected failures
        ch := s.disp.pool.Subscribe()
        defer s.disp.pool.Unsubscribe(ch)
        ok := s.disp.pool.Create(test.InstanceType(1))