21123: Fix inadvertent use of run-tests db in dispatcher tests.
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 6a8adcad908938a5bcd414d6873a050c1857e419..3c23d81d58561152e064051ebd0117f31f9ddbdd 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -8,13 +8,16 @@ import (
        "context"
        "crypto/tls"
        "encoding/json"
+       "fmt"
        "io/ioutil"
        "math/rand"
        "net/http"
        "net/http/httptest"
        "net/url"
        "os"
+       "strings"
        "sync"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/lib/config"
@@ -49,8 +52,10 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
        s.stubDriver = &test.StubDriver{
                HostKey:                   hostpriv,
                AuthorizedKeys:            []ssh.PublicKey{dispatchpub},
+               ErrorRateCreate:           0.1,
                ErrorRateDestroy:          0.1,
                MinTimeBetweenCreateCalls: time.Millisecond,
+               QuotaMaxInstances:         10,
        }
 
        // We need the postgresql connection info from the integration
@@ -70,6 +75,7 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
                        StaleLockTimeout:       arvados.Duration(5 * time.Millisecond),
                        RuntimeEngine:          "stub",
                        MaxDispatchAttempts:    10,
+                       MaximumPriceFactor:     1.5,
                        CloudVMs: arvados.CloudVMsConfig{
                                Driver:               "test",
                                SyncInterval:         arvados.Duration(10 * time.Millisecond),
@@ -78,6 +84,7 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
                                TimeoutProbe:         arvados.Duration(15 * time.Millisecond),
                                TimeoutShutdown:      arvados.Duration(5 * time.Millisecond),
                                MaxCloudOpsPerSecond: 500,
+                               InitialQuotaEstimate: 8,
                                PollInterval:         arvados.Duration(5 * time.Millisecond),
                                ProbeInterval:        arvados.Duration(5 * time.Millisecond),
                                MaxProbesPerSecond:   1000,
@@ -122,6 +129,10 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
                ArvClient: arvClient,
                AuthToken: arvadostest.AdminToken,
                Registry:  prometheus.NewRegistry(),
+               // Providing a stub queue here prevents
+               // disp.initialize() from making a real one that uses
+               // the integration test servers/database.
+               queue: &test.Queue{},
        }
        // Test cases can modify s.cluster before calling
        // initialize(), and then modify private state before calling
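
[Note: the stub-queue injection above relies on disp.initialize() building the real, database-backed queue only when no queue has been provided. A minimal sketch of that lazy-construction pattern; the helper and type names below are illustrative stand-ins, not the actual arvados API:

    // Sketch only: field and helper names are assumptions.
    type containerQueue interface{} // stand-in for the real queue interface

    type dispatcher struct {
        queue containerQueue
        // ...other fields elided
    }

    func (disp *dispatcher) initialize() {
        if disp.queue == nil {
            // The real queue polls the controller API, which in the
            // integration-test environment is backed by the shared
            // test database -- exactly what SetUpTest avoids by
            // injecting &test.Queue{} above.
            disp.queue = newRealQueue() // hypothetical constructor
        }
        // ...remaining setup elided
    }

    func newRealQueue() containerQueue { return nil } // placeholder
]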
@@ -154,10 +165,9 @@ func (s *DispatcherSuite) arvClientProxy(c *check.C) func(*http.Request) (*url.URL, error) {
 // artificial errors in order to exercise a variety of code paths.
 func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
        Drivers["test"] = s.stubDriver
-       s.disp.setupOnce.Do(s.disp.initialize)
        queue := &test.Queue{
                MaxDispatchAttempts: 5,
-               ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
+               ChooseType: func(ctr *arvados.Container) ([]arvados.InstanceType, error) {
                        return ChooseInstanceType(s.cluster, ctr)
                },
                Logger: ctxlog.TestLogger(c),
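
[Note: ChooseType now returns []arvados.InstanceType rather than a single type, which fits the MaximumPriceFactor: 1.5 setting added in SetUpTest: the dispatcher can presumably fall back to costlier-but-still-acceptable types when the cheapest one is unavailable. An illustrative sketch of such a filter, not the actual ChooseInstanceType implementation:

    // Assumes "candidates" is already filtered to types satisfying the
    // container's RAM/VCPU constraints and sorted by ascending Price.
    // arvados.InstanceType is from git.arvados.org/arvados.git/sdk/go/arvados.
    func withinPriceFactor(candidates []arvados.InstanceType, maxPriceFactor float64) []arvados.InstanceType {
        var acceptable []arvados.InstanceType
        for _, it := range candidates {
            if len(acceptable) > 0 && it.Price > candidates[0].Price*maxPriceFactor {
                break // too expensive relative to the cheapest option
            }
            acceptable = append(acceptable, it)
        }
        return acceptable
    }
]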
@@ -174,6 +184,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                })
        }
        s.disp.queue = queue
+       s.disp.setupOnce.Do(s.disp.initialize)
 
        var mtx sync.Mutex
        done := make(chan struct{})
@@ -202,26 +213,51 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                finishContainer(ctr)
                return int(rand.Uint32() & 0x3)
        }
-       n := 0
+       var type4BrokenUntil time.Time
+       var countCapacityErrors int64
+       vmCount := int32(0)
        s.stubDriver.Queue = queue
-       s.stubDriver.SetupVM = func(stubvm *test.StubVM) {
-               n++
+       s.stubDriver.SetupVM = func(stubvm *test.StubVM) error {
+               if pt := stubvm.Instance().ProviderType(); pt == test.InstanceType(6).ProviderType {
+                       c.Logf("test: returning capacity error for instance type %s", pt)
+                       atomic.AddInt64(&countCapacityErrors, 1)
+                       return test.CapacityError{InstanceTypeSpecific: true}
+               }
+               n := atomic.AddInt32(&vmCount, 1)
+               c.Logf("SetupVM: instance %s n=%d", stubvm.Instance(), n)
                stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
                stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
                stubvm.ExecuteContainer = executeContainer
                stubvm.CrashRunningContainer = finishContainer
                stubvm.ExtraCrunchRunArgs = "'--runtime-engine=stub' '--foo' '--extra='\\''args'\\'''"
-               switch n % 7 {
-               case 0:
+               switch {
+               case stubvm.Instance().ProviderType() == test.InstanceType(4).ProviderType &&
+                       (type4BrokenUntil.IsZero() || time.Now().Before(type4BrokenUntil)):
+                       // Initially (at least 2*TimeoutBooting), all
+                       // instances of this type are completely
+                       // broken. This ensures the
+                       // boot_outcomes{outcome="failure"} metric is
+                       // not zero.
+                       stubvm.Broken = time.Now()
+                       if type4BrokenUntil.IsZero() {
+                               type4BrokenUntil = time.Now().Add(2 * s.cluster.Containers.CloudVMs.TimeoutBooting.Duration())
+                       }
+               case n%7 == 0:
+                       // some instances start out OK but then stop
+                       // running any commands
                        stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
-               case 1:
+               case n%7 == 1:
+                       // some instances never pass a run-probe
                        stubvm.CrunchRunMissing = true
-               case 2:
+               case n%7 == 2:
+                       // some instances start out OK but then start
+                       // reporting themselves as broken
                        stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)) * time.Millisecond)
                default:
                        stubvm.CrunchRunCrashRate = 0.1
                        stubvm.ArvMountDeadlockRate = 0.1
                }
+               return nil
        }
        s.stubDriver.Bugf = c.Errorf
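
[Note: test.CapacityError, returned from SetupVM above, presumably satisfies a capacity-error interface that the worker pool inspects to decide whether a single instance type, or the whole cloud, is out of capacity. A plausible shape for such a type, not verified against lib/cloud:

    // Assumption: the pool type-asserts create errors against methods
    // roughly like these.
    type CapacityError struct {
        InstanceTypeSpecific bool
    }

    func (e CapacityError) Error() string                { return "insufficient capacity" }
    func (e CapacityError) IsCapacityError() bool        { return true }
    func (e CapacityError) IsInstanceTypeSpecific() bool { return e.InstanceTypeSpecific }
]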
 
@@ -257,6 +293,8 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                }
        }
 
+       c.Check(atomic.LoadInt64(&countCapacityErrors), check.Not(check.Equals), int64(0))
+
        req := httptest.NewRequest("GET", "/metrics", nil)
        req.Header.Set("Authorization", "Bearer "+s.cluster.ManagementToken)
        resp := httptest.NewRecorder()
@@ -291,11 +329,10 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
        c.Check(resp.Body.String(), check.Matches, `(?ms).*max_concurrent_containers [1-9][0-9e+.]*`)
 }
 
-func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
+func (s *DispatcherSuite) TestManagementAPI_Permissions(c *check.C) {
        s.cluster.ManagementToken = "abcdefgh"
        Drivers["test"] = s.stubDriver
        s.disp.setupOnce.Do(s.disp.initialize)
-       s.disp.queue = &test.Queue{}
        go s.disp.run()
 
        for _, token := range []string{"abc", ""} {
@@ -313,11 +350,10 @@ func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
        }
 }
 
-func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
+func (s *DispatcherSuite) TestManagementAPI_Disabled(c *check.C) {
        s.cluster.ManagementToken = ""
        Drivers["test"] = s.stubDriver
        s.disp.setupOnce.Do(s.disp.initialize)
-       s.disp.queue = &test.Queue{}
        go s.disp.run()
 
        for _, token := range []string{"abc", ""} {
@@ -331,13 +367,121 @@ func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
        }
 }
 
-func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
+func (s *DispatcherSuite) TestManagementAPI_Containers(c *check.C) {
+       s.cluster.ManagementToken = "abcdefgh"
+       s.cluster.Containers.CloudVMs.InitialQuotaEstimate = 4
+       Drivers["test"] = s.stubDriver
+       queue := &test.Queue{
+               MaxDispatchAttempts: 5,
+               ChooseType: func(ctr *arvados.Container) ([]arvados.InstanceType, error) {
+                       return ChooseInstanceType(s.cluster, ctr)
+               },
+               Logger: ctxlog.TestLogger(c),
+       }
+       s.stubDriver.Queue = queue
+       s.stubDriver.QuotaMaxInstances = 4
+       s.stubDriver.SetupVM = func(stubvm *test.StubVM) error {
+               if stubvm.Instance().ProviderType() >= test.InstanceType(4).ProviderType {
+                       return test.CapacityError{InstanceTypeSpecific: true}
+               }
+               stubvm.ExecuteContainer = func(ctr arvados.Container) int {
+                       time.Sleep(5 * time.Second)
+                       return 0
+               }
+               return nil
+       }
+       s.disp.queue = queue
+       s.disp.setupOnce.Do(s.disp.initialize)
+
+       go s.disp.run()
+
+       type queueEnt struct {
+               Container        arvados.Container
+               InstanceType     arvados.InstanceType `json:"instance_type"`
+               SchedulingStatus string               `json:"scheduling_status"`
+       }
+       type containersResponse struct {
+               Items []queueEnt
+       }
+       getContainers := func() containersResponse {
+               sQueueRefresh = time.Millisecond
+               req := httptest.NewRequest("GET", "/arvados/v1/dispatch/containers", nil)
+               req.Header.Set("Authorization", "Bearer abcdefgh")
+               resp := httptest.NewRecorder()
+               s.disp.ServeHTTP(resp, req)
+               var cresp containersResponse
+               c.Check(resp.Code, check.Equals, http.StatusOK)
+               err := json.Unmarshal(resp.Body.Bytes(), &cresp)
+               c.Check(err, check.IsNil)
+               return cresp
+       }
+
+       c.Check(getContainers().Items, check.HasLen, 0)
+
+       for i := 0; i < 20; i++ {
+               queue.Containers = append(queue.Containers, arvados.Container{
+                       UUID:     test.ContainerUUID(i),
+                       State:    arvados.ContainerStateQueued,
+                       Priority: int64(100 - i),
+                       RuntimeConstraints: arvados.RuntimeConstraints{
+                               RAM:   int64(i%3+1) << 30,
+                               VCPUs: i%8 + 1,
+                       },
+               })
+       }
+       queue.Update()
+
+       expect := `
+ 0 zzzzz-dz642-000000000000000 (Running) ""
+ 1 zzzzz-dz642-000000000000001 (Running) ""
+ 2 zzzzz-dz642-000000000000002 (Locked) "waiting for suitable instance type to become available: queue position 1"
+ 3 zzzzz-dz642-000000000000003 (Locked) "waiting for suitable instance type to become available: queue position 2"
+ 4 zzzzz-dz642-000000000000004 (Queued) "waiting while cluster is running at capacity: queue position 3"
+ 5 zzzzz-dz642-000000000000005 (Queued) "waiting while cluster is running at capacity: queue position 4"
+ 6 zzzzz-dz642-000000000000006 (Queued) "waiting while cluster is running at capacity: queue position 5"
+ 7 zzzzz-dz642-000000000000007 (Queued) "waiting while cluster is running at capacity: queue position 6"
+ 8 zzzzz-dz642-000000000000008 (Queued) "waiting while cluster is running at capacity: queue position 7"
+ 9 zzzzz-dz642-000000000000009 (Queued) "waiting while cluster is running at capacity: queue position 8"
+ 10 zzzzz-dz642-000000000000010 (Queued) "waiting while cluster is running at capacity: queue position 9"
+ 11 zzzzz-dz642-000000000000011 (Queued) "waiting while cluster is running at capacity: queue position 10"
+ 12 zzzzz-dz642-000000000000012 (Queued) "waiting while cluster is running at capacity: queue position 11"
+ 13 zzzzz-dz642-000000000000013 (Queued) "waiting while cluster is running at capacity: queue position 12"
+ 14 zzzzz-dz642-000000000000014 (Queued) "waiting while cluster is running at capacity: queue position 13"
+ 15 zzzzz-dz642-000000000000015 (Queued) "waiting while cluster is running at capacity: queue position 14"
+ 16 zzzzz-dz642-000000000000016 (Queued) "waiting while cluster is running at capacity: queue position 15"
+ 17 zzzzz-dz642-000000000000017 (Queued) "waiting while cluster is running at capacity: queue position 16"
+ 18 zzzzz-dz642-000000000000018 (Queued) "waiting while cluster is running at capacity: queue position 17"
+ 19 zzzzz-dz642-000000000000019 (Queued) "waiting while cluster is running at capacity: queue position 18"
+`
+       sequence := make(map[string][]string)
+       var summary string
+       for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); time.Sleep(time.Millisecond) {
+               cresp := getContainers()
+               summary = "\n"
+               for i, ent := range cresp.Items {
+                       summary += fmt.Sprintf("% 2d %s (%s) %q\n", i, ent.Container.UUID, ent.Container.State, ent.SchedulingStatus)
+                       s := sequence[ent.Container.UUID]
+                       if len(s) == 0 || s[len(s)-1] != ent.SchedulingStatus {
+                               sequence[ent.Container.UUID] = append(s, ent.SchedulingStatus)
+                       }
+               }
+               if summary == expect {
+                       break
+               }
+       }
+       c.Check(summary, check.Equals, expect)
+       for i := 0; i < 5; i++ {
+               c.Logf("sequence for container %d:\n... %s", i, strings.Join(sequence[test.ContainerUUID(i)], "\n... "))
+       }
+}
+
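[Note: the expect string above pins down the scheduling_status text served by the dispatcher's management API. The same endpoint can be queried outside the test suite; a hedged standalone sketch, where the dispatcher URL and token are placeholders for a real deployment's address and management token:

    package main

    import (
        "fmt"
        "io"
        "net/http"
    )

    func main() {
        // URL and token below are placeholders.
        req, err := http.NewRequest("GET", "https://dispatch.example/arvados/v1/dispatch/containers", nil)
        if err != nil {
            panic(err)
        }
        req.Header.Set("Authorization", "Bearer abcdefgh")
        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()
        body, _ := io.ReadAll(resp.Body)
        // Body is JSON with an items list of {container, instance_type,
        // scheduling_status} entries, as decoded by the test above.
        fmt.Println(resp.Status, string(body))
    }
]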
+func (s *DispatcherSuite) TestManagementAPI_Instances(c *check.C) {
        s.cluster.ManagementToken = "abcdefgh"
        s.cluster.Containers.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
        Drivers["test"] = s.stubDriver
        s.disp.setupOnce.Do(s.disp.initialize)
-       s.disp.queue = &test.Queue{}
        go s.disp.run()
+       defer s.disp.Close()
 
        type instance struct {
                Instance             string
@@ -365,6 +509,7 @@ func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
        sr := getInstances()
        c.Check(len(sr.Items), check.Equals, 0)
 
+       s.stubDriver.ErrorRateCreate = 0
        ch := s.disp.pool.Subscribe()
        defer s.disp.pool.Unsubscribe(ch)
        ok := s.disp.pool.Create(test.InstanceType(1))