X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/7c013f919b5db9336833dbe855349600449a993d..cac999f47113207f1b05405ee24daab19ac97de4:/lib/dispatchcloud/dispatcher_test.go diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go index a9ed95c7c3..d651e73a67 100644 --- a/lib/dispatchcloud/dispatcher_test.go +++ b/lib/dispatchcloud/dispatcher_test.go @@ -8,13 +8,16 @@ import ( "context" "crypto/tls" "encoding/json" + "fmt" "io/ioutil" "math/rand" "net/http" "net/http/httptest" "net/url" "os" + "strings" "sync" + "sync/atomic" "time" "git.arvados.org/arvados.git/lib/config" @@ -49,8 +52,10 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) { s.stubDriver = &test.StubDriver{ HostKey: hostpriv, AuthorizedKeys: []ssh.PublicKey{dispatchpub}, + ErrorRateCreate: 0.1, ErrorRateDestroy: 0.1, MinTimeBetweenCreateCalls: time.Millisecond, + QuotaMaxInstances: 10, } // We need the postgresql connection info from the integration @@ -69,6 +74,8 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) { DispatchPrivateKey: string(dispatchprivraw), StaleLockTimeout: arvados.Duration(5 * time.Millisecond), RuntimeEngine: "stub", + MaxDispatchAttempts: 10, + MaximumPriceFactor: 1.5, CloudVMs: arvados.CloudVMsConfig{ Driver: "test", SyncInterval: arvados.Duration(10 * time.Millisecond), @@ -77,6 +84,7 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) { TimeoutProbe: arvados.Duration(15 * time.Millisecond), TimeoutShutdown: arvados.Duration(5 * time.Millisecond), MaxCloudOpsPerSecond: 500, + InitialQuotaEstimate: 8, PollInterval: arvados.Duration(5 * time.Millisecond), ProbeInterval: arvados.Duration(5 * time.Millisecond), MaxProbesPerSecond: 1000, @@ -101,9 +109,14 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) { arvadostest.SetServiceURL(&s.cluster.Services.Controller, "https://"+os.Getenv("ARVADOS_API_HOST")+"/") arvClient, err := arvados.NewClientFromConfig(s.cluster) - c.Check(err, check.IsNil) + c.Assert(err, check.IsNil) + // Disable auto-retry + arvClient.Timeout = 0 - s.error503Server = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusServiceUnavailable) })) + s.error503Server = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + c.Logf("503 stub: returning 503") + w.WriteHeader(http.StatusServiceUnavailable) + })) arvClient.Client = &http.Client{ Transport: &http.Transport{ Proxy: s.arvClientProxy(c), @@ -116,6 +129,10 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) { ArvClient: arvClient, AuthToken: arvadostest.AdminToken, Registry: prometheus.NewRegistry(), + // Providing a stub queue here prevents + // disp.initialize() from making a real one that uses + // the integration test servers/database. + queue: &test.Queue{}, } // Test cases can modify s.cluster before calling // initialize(), and then modify private state before calling @@ -134,6 +151,7 @@ func (s *DispatcherSuite) TearDownTest(c *check.C) { func (s *DispatcherSuite) arvClientProxy(c *check.C) func(*http.Request) (*url.URL, error) { return func(req *http.Request) (*url.URL, error) { if req.URL.Path == "/503" { + c.Logf("arvClientProxy: proxying to 503 stub") return url.Parse(s.error503Server.URL) } else { return nil, nil @@ -147,9 +165,9 @@ func (s *DispatcherSuite) arvClientProxy(c *check.C) func(*http.Request) (*url.U // artificial errors in order to exercise a variety of code paths. 
func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { Drivers["test"] = s.stubDriver - s.disp.setupOnce.Do(s.disp.initialize) queue := &test.Queue{ - ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) { + MaxDispatchAttempts: 5, + ChooseType: func(ctr *arvados.Container) ([]arvados.InstanceType, error) { return ChooseInstanceType(s.cluster, ctr) }, Logger: ctxlog.TestLogger(c), @@ -166,6 +184,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { }) } s.disp.queue = queue + s.disp.setupOnce.Do(s.disp.initialize) var mtx sync.Mutex done := make(chan struct{}) @@ -183,6 +202,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { delete(waiting, ctr.UUID) if len(waiting) == 100 { // trigger scheduler maxConcurrency limit + c.Logf("test: requesting 503 in order to trigger maxConcurrency limit") s.disp.ArvClient.RequestAndDecode(nil, "GET", "503", nil, nil) } if len(waiting) == 0 { @@ -193,26 +213,51 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { finishContainer(ctr) return int(rand.Uint32() & 0x3) } - n := 0 + var type4BrokenUntil time.Time + var countCapacityErrors int64 + vmCount := int32(0) s.stubDriver.Queue = queue - s.stubDriver.SetupVM = func(stubvm *test.StubVM) { - n++ + s.stubDriver.SetupVM = func(stubvm *test.StubVM) error { + if pt := stubvm.Instance().ProviderType(); pt == test.InstanceType(6).ProviderType { + c.Logf("test: returning capacity error for instance type %s", pt) + atomic.AddInt64(&countCapacityErrors, 1) + return test.CapacityError{InstanceTypeSpecific: true} + } + n := atomic.AddInt32(&vmCount, 1) + c.Logf("SetupVM: instance %s n=%d", stubvm.Instance(), n) stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond)))) stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond))) stubvm.ExecuteContainer = executeContainer stubvm.CrashRunningContainer = finishContainer stubvm.ExtraCrunchRunArgs = "'--runtime-engine=stub' '--foo' '--extra='\\''args'\\'''" - switch n % 7 { - case 0: + switch { + case stubvm.Instance().ProviderType() == test.InstanceType(4).ProviderType && + (type4BrokenUntil.IsZero() || time.Now().Before(type4BrokenUntil)): + // Initially (at least 2*TimeoutBooting), all + // instances of this type are completely + // broken. This ensures the + // boot_outcomes{outcome="failure"} metric is + // not zero. 
+			stubvm.Broken = time.Now()
+			if type4BrokenUntil.IsZero() {
+				type4BrokenUntil = time.Now().Add(2 * s.cluster.Containers.CloudVMs.TimeoutBooting.Duration())
+			}
+		case n%7 == 0:
+			// some instances start out OK but then stop
+			// running any commands
 			stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
-		case 1:
+		case n%7 == 1:
+			// some instances never pass a run-probe
 			stubvm.CrunchRunMissing = true
-		case 2:
+		case n%7 == 2:
+			// some instances start out OK but then start
+			// reporting themselves as broken
 			stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)) * time.Millisecond)
 		default:
 			stubvm.CrunchRunCrashRate = 0.1
 			stubvm.ArvMountDeadlockRate = 0.1
 		}
+		return nil
 	}
 	s.stubDriver.Bugf = c.Errorf
@@ -226,9 +271,9 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
 		select {
 		case <-done:
 			// loop will end because len(waiting)==0
-		case <-time.After(3 * time.Second):
+		case <-time.After(5 * time.Second):
 			if len(waiting) >= waswaiting {
-				c.Fatalf("timed out; no progress in 3s while waiting for %d containers: %q", len(waiting), waiting)
+				c.Fatalf("timed out; no progress in 5s while waiting for %d containers: %q", len(waiting), waiting)
 			}
 		}
 	}
@@ -248,6 +293,8 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
 		}
 	}

+	c.Check(countCapacityErrors, check.Not(check.Equals), int64(0))
+
 	req := httptest.NewRequest("GET", "/metrics", nil)
 	req.Header.Set("Authorization", "Bearer "+s.cluster.ManagementToken)
 	resp := httptest.NewRecorder()
@@ -282,11 +329,10 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
 	c.Check(resp.Body.String(), check.Matches, `(?ms).*max_concurrent_containers [1-9][0-9e+.]*`)
 }

-func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
+func (s *DispatcherSuite) TestManagementAPI_Permissions(c *check.C) {
 	s.cluster.ManagementToken = "abcdefgh"
 	Drivers["test"] = s.stubDriver
 	s.disp.setupOnce.Do(s.disp.initialize)
-	s.disp.queue = &test.Queue{}
 	go s.disp.run()

 	for _, token := range []string{"abc", ""} {
@@ -304,11 +350,10 @@ func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
 	}
 }

-func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
+func (s *DispatcherSuite) TestManagementAPI_Disabled(c *check.C) {
 	s.cluster.ManagementToken = ""
 	Drivers["test"] = s.stubDriver
 	s.disp.setupOnce.Do(s.disp.initialize)
-	s.disp.queue = &test.Queue{}
 	go s.disp.run()

 	for _, token := range []string{"abc", ""} {
@@ -322,13 +367,121 @@ func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
 	}
 }

-func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
+func (s *DispatcherSuite) TestManagementAPI_Containers(c *check.C) {
+	s.cluster.ManagementToken = "abcdefgh"
+	s.cluster.Containers.CloudVMs.InitialQuotaEstimate = 4
+	Drivers["test"] = s.stubDriver
+	queue := &test.Queue{
+		MaxDispatchAttempts: 5,
+		ChooseType: func(ctr *arvados.Container) ([]arvados.InstanceType, error) {
+			return ChooseInstanceType(s.cluster, ctr)
+		},
+		Logger: ctxlog.TestLogger(c),
+	}
+	s.stubDriver.Queue = queue
+	s.stubDriver.QuotaMaxInstances = 4
+	s.stubDriver.SetupVM = func(stubvm *test.StubVM) error {
+		if stubvm.Instance().ProviderType() >= test.InstanceType(4).ProviderType {
+			return test.CapacityError{InstanceTypeSpecific: true}
+		}
+		stubvm.ExecuteContainer = func(ctr arvados.Container) int {
+			time.Sleep(5 * time.Second)
+			return 0
+		}
+		return nil
+	}
+	s.disp.queue = queue
+	s.disp.setupOnce.Do(s.disp.initialize)
+
+	go s.disp.run()
+
+	type queueEnt struct {
+		Container        arvados.Container
+		InstanceType     arvados.InstanceType `json:"instance_type"`
+		SchedulingStatus string               `json:"scheduling_status"`
+	}
+	type containersResponse struct {
+		Items []queueEnt
+	}
+	getContainers := func() containersResponse {
+		schedQueueRefresh = time.Millisecond
+		req := httptest.NewRequest("GET", "/arvados/v1/dispatch/containers", nil)
+		req.Header.Set("Authorization", "Bearer abcdefgh")
+		resp := httptest.NewRecorder()
+		s.disp.ServeHTTP(resp, req)
+		var cresp containersResponse
+		c.Check(resp.Code, check.Equals, http.StatusOK)
+		err := json.Unmarshal(resp.Body.Bytes(), &cresp)
+		c.Check(err, check.IsNil)
+		return cresp
+	}

+	c.Check(getContainers().Items, check.HasLen, 0)
+
+	for i := 0; i < 20; i++ {
+		queue.Containers = append(queue.Containers, arvados.Container{
+			UUID:     test.ContainerUUID(i),
+			State:    arvados.ContainerStateQueued,
+			Priority: int64(100 - i),
+			RuntimeConstraints: arvados.RuntimeConstraints{
+				RAM:   int64(i%3+1) << 30,
+				VCPUs: i%8 + 1,
+			},
+		})
+	}
+	queue.Update()
+
+	expect := `
+ 0 zzzzz-dz642-000000000000000 (Running) ""
+ 1 zzzzz-dz642-000000000000001 (Running) ""
+ 2 zzzzz-dz642-000000000000002 (Locked) "waiting for suitable instance type to become available: queue position 1"
+ 3 zzzzz-dz642-000000000000003 (Locked) "waiting for suitable instance type to become available: queue position 2"
+ 4 zzzzz-dz642-000000000000004 (Queued) "waiting while cluster is running at capacity: queue position 3"
+ 5 zzzzz-dz642-000000000000005 (Queued) "waiting while cluster is running at capacity: queue position 4"
+ 6 zzzzz-dz642-000000000000006 (Queued) "waiting while cluster is running at capacity: queue position 5"
+ 7 zzzzz-dz642-000000000000007 (Queued) "waiting while cluster is running at capacity: queue position 6"
+ 8 zzzzz-dz642-000000000000008 (Queued) "waiting while cluster is running at capacity: queue position 7"
+ 9 zzzzz-dz642-000000000000009 (Queued) "waiting while cluster is running at capacity: queue position 8"
+10 zzzzz-dz642-000000000000010 (Queued) "waiting while cluster is running at capacity: queue position 9"
+11 zzzzz-dz642-000000000000011 (Queued) "waiting while cluster is running at capacity: queue position 10"
+12 zzzzz-dz642-000000000000012 (Queued) "waiting while cluster is running at capacity: queue position 11"
+13 zzzzz-dz642-000000000000013 (Queued) "waiting while cluster is running at capacity: queue position 12"
+14 zzzzz-dz642-000000000000014 (Queued) "waiting while cluster is running at capacity: queue position 13"
+15 zzzzz-dz642-000000000000015 (Queued) "waiting while cluster is running at capacity: queue position 14"
+16 zzzzz-dz642-000000000000016 (Queued) "waiting while cluster is running at capacity: queue position 15"
+17 zzzzz-dz642-000000000000017 (Queued) "waiting while cluster is running at capacity: queue position 16"
+18 zzzzz-dz642-000000000000018 (Queued) "waiting while cluster is running at capacity: queue position 17"
+19 zzzzz-dz642-000000000000019 (Queued) "waiting while cluster is running at capacity: queue position 18"
+`
+	sequence := make(map[string][]string)
+	var summary string
+	for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); time.Sleep(time.Millisecond) {
+		cresp := getContainers()
+		summary = "\n"
+		for i, ent := range cresp.Items {
+			summary += fmt.Sprintf("% 2d %s (%s) %q\n", i, ent.Container.UUID, ent.Container.State, ent.SchedulingStatus)
+			s := sequence[ent.Container.UUID]
+			if len(s) == 0 || s[len(s)-1] != ent.SchedulingStatus {
+				sequence[ent.Container.UUID] = append(s, ent.SchedulingStatus)
+			}
+		}
+		if summary == expect {
+			break
+		}
+	}
+	c.Check(summary, check.Equals, expect)
+	for i := 0; i < 5; i++ {
+		c.Logf("sequence for container %d:\n... %s", i, strings.Join(sequence[test.ContainerUUID(i)], "\n... "))
+	}
+}
+
+func (s *DispatcherSuite) TestManagementAPI_Instances(c *check.C) {
 	s.cluster.ManagementToken = "abcdefgh"
 	s.cluster.Containers.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
 	Drivers["test"] = s.stubDriver
 	s.disp.setupOnce.Do(s.disp.initialize)
-	s.disp.queue = &test.Queue{}
 	go s.disp.run()
+	defer s.disp.Close()

 	type instance struct {
 		Instance string
@@ -356,6 +509,7 @@ func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
 	sr := getInstances()
 	c.Check(len(sr.Items), check.Equals, 0)

+	s.stubDriver.ErrorRateCreate = 0
 	ch := s.disp.pool.Subscribe()
 	defer s.disp.pool.Unsubscribe(ch)
 	ok := s.disp.pool.Create(test.InstanceType(1))
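
A note on the counter change in TestDispatchToStubDriver: the old SetupVM hook numbered instances with a bare n++, which is only safe when VMs are set up one at a time. With ErrorRateCreate and QuotaMaxInstances in play, creates can overlap, so the diff switches to sync/atomic counters (vmCount, countCapacityErrors). Below is a minimal, self-contained sketch of that pattern; the names are illustrative, not from the Arvados codebase.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var setupCalls int32 // incremented by every concurrent SetupVM-style hook
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// AddInt32 returns the new value, so each goroutine gets a
			// unique sequence number without taking a lock -- the same
			// property the test uses to pick a per-VM failure mode (n%7).
			n := atomic.AddInt32(&setupCalls, 1)
			_ = n % 7 // e.g. choose a behavior by sequence number
		}()
	}
	wg.Wait()
	fmt.Println(atomic.LoadInt32(&setupCalls)) // always 100; a plain n++ would race
}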
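The SetupVM signature change (it now returns error) is what lets the stub driver simulate a cloud capacity shortage: returning test.CapacityError{InstanceTypeSpecific: true} makes instance creation fail for that instance type only, which the test later asserts on via countCapacityErrors. A rough sketch of how a caller can branch on such a typed error follows; it uses a local capacityError type as an illustrative stand-in for test.CapacityError, not the real dispatchcloud logic.

package main

import (
	"errors"
	"fmt"
)

// capacityError is an illustrative stand-in for the typed error the stub
// driver returns in the diff above.
type capacityError struct{ instanceTypeSpecific bool }

func (e capacityError) Error() string { return "insufficient capacity" }

func createInstance(providerType string) error {
	if providerType == "type6" {
		return capacityError{instanceTypeSpecific: true}
	}
	return nil
}

func main() {
	err := createInstance("type6")
	var ce capacityError
	if errors.As(err, &ce) {
		if ce.instanceTypeSpecific {
			// Only this instance type is exhausted: retry the container
			// on the next cheapest eligible type.
			fmt.Println("marking type6 unavailable, trying another type")
		} else {
			// Cloud-wide capacity problem: back off all instance creation.
			fmt.Println("backing off all instance creation")
		}
	}
}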
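Relatedly, ChooseType now returns a slice of instance types rather than a single one, which pairs with the new MaximumPriceFactor: 1.5 setting: as I understand the feature, when the cheapest suitable type is unavailable the dispatcher may fall back to other types costing up to 1.5x the cheapest suitable price. A hedged sketch of that selection rule, with illustrative type names and menu (not the actual ChooseInstanceType implementation):

package main

import (
	"fmt"
	"sort"
)

type instanceType struct {
	Name  string
	RAM   int64
	VCPUs int
	Price float64
}

// eligibleTypes returns every type that satisfies the constraints and costs
// no more than maxPriceFactor times the cheapest satisfying type, sorted
// cheapest first.
func eligibleTypes(menu []instanceType, ram int64, vcpus int, maxPriceFactor float64) []instanceType {
	var fit []instanceType
	for _, it := range menu {
		if it.RAM >= ram && it.VCPUs >= vcpus {
			fit = append(fit, it)
		}
	}
	if len(fit) == 0 {
		return nil
	}
	sort.Slice(fit, func(i, j int) bool { return fit[i].Price < fit[j].Price })
	limit := fit[0].Price * maxPriceFactor
	n := 0
	for _, it := range fit {
		if it.Price <= limit {
			fit[n] = it
			n++
		}
	}
	return fit[:n]
}

func main() {
	menu := []instanceType{
		{"small", 4 << 30, 2, 0.10},
		{"medium", 8 << 30, 4, 0.14},
		{"large", 16 << 30, 8, 0.40},
	}
	// With factor 1.5: small (0.10) and medium (0.14) qualify; large (0.40) does not.
	fmt.Println(eligibleTypes(menu, 2<<30, 2, 1.5))
}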