X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a0398ebd1c50b1be2433c109af6bb0d263c54ea5..d6446b03e2f5d5079a870bdd7b963456dc12b485:/lib/dispatchcloud/dispatcher_test.go diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go index 9f1eb098e0..33d7f4e9ac 100644 --- a/lib/dispatchcloud/dispatcher_test.go +++ b/lib/dispatchcloud/dispatcher_test.go @@ -6,15 +6,19 @@ package dispatchcloud import ( "context" + "crypto/tls" "encoding/json" "io/ioutil" "math/rand" "net/http" "net/http/httptest" + "net/url" "os" "sync" + "sync/atomic" "time" + "git.arvados.org/arvados.git/lib/config" "git.arvados.org/arvados.git/lib/dispatchcloud/test" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadostest" @@ -27,11 +31,12 @@ import ( var _ = check.Suite(&DispatcherSuite{}) type DispatcherSuite struct { - ctx context.Context - cancel context.CancelFunc - cluster *arvados.Cluster - stubDriver *test.StubDriver - disp *dispatcher + ctx context.Context + cancel context.CancelFunc + cluster *arvados.Cluster + stubDriver *test.StubDriver + disp *dispatcher + error503Server *httptest.Server } func (s *DispatcherSuite) SetUpTest(c *check.C) { @@ -45,15 +50,30 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) { s.stubDriver = &test.StubDriver{ HostKey: hostpriv, AuthorizedKeys: []ssh.PublicKey{dispatchpub}, + ErrorRateCreate: 0.1, ErrorRateDestroy: 0.1, MinTimeBetweenCreateCalls: time.Millisecond, + QuotaMaxInstances: 10, } + // We need the postgresql connection info from the integration + // test config. + cfg, err := config.NewLoader(nil, ctxlog.FromContext(s.ctx)).Load() + c.Assert(err, check.IsNil) + testcluster, err := cfg.GetCluster("") + c.Assert(err, check.IsNil) + s.cluster = &arvados.Cluster{ ManagementToken: "test-management-token", + PostgreSQL: testcluster.PostgreSQL, Containers: arvados.ContainersConfig{ - DispatchPrivateKey: string(dispatchprivraw), - StaleLockTimeout: arvados.Duration(5 * time.Millisecond), + CrunchRunCommand: "crunch-run", + CrunchRunArgumentsList: []string{"--foo", "--extra='args'"}, + DispatchPrivateKey: string(dispatchprivraw), + StaleLockTimeout: arvados.Duration(5 * time.Millisecond), + RuntimeEngine: "stub", + MaxDispatchAttempts: 10, + MaximumPriceFactor: 1.5, CloudVMs: arvados.CloudVMsConfig{ Driver: "test", SyncInterval: arvados.Duration(10 * time.Millisecond), @@ -62,6 +82,7 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) { TimeoutProbe: arvados.Duration(15 * time.Millisecond), TimeoutShutdown: arvados.Duration(5 * time.Millisecond), MaxCloudOpsPerSecond: 500, + InitialQuotaEstimate: 8, PollInterval: arvados.Duration(5 * time.Millisecond), ProbeInterval: arvados.Duration(5 * time.Millisecond), MaxProbesPerSecond: 1000, @@ -86,7 +107,19 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) { arvadostest.SetServiceURL(&s.cluster.Services.Controller, "https://"+os.Getenv("ARVADOS_API_HOST")+"/") arvClient, err := arvados.NewClientFromConfig(s.cluster) - c.Check(err, check.IsNil) + c.Assert(err, check.IsNil) + // Disable auto-retry + arvClient.Timeout = 0 + + s.error503Server = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + c.Logf("503 stub: returning 503") + w.WriteHeader(http.StatusServiceUnavailable) + })) + arvClient.Client = &http.Client{ + Transport: &http.Transport{ + Proxy: s.arvClientProxy(c), + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true}}} s.disp = &dispatcher{ Cluster: s.cluster, @@ -103,6 +136,21 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) { func (s *DispatcherSuite) TearDownTest(c *check.C) { s.cancel() s.disp.Close() + s.error503Server.Close() +} + +// Intercept outgoing API requests for "/503" and respond HTTP +// 503. This lets us force (*arvados.Client)Last503() to return +// something. +func (s *DispatcherSuite) arvClientProxy(c *check.C) func(*http.Request) (*url.URL, error) { + return func(req *http.Request) (*url.URL, error) { + if req.URL.Path == "/503" { + c.Logf("arvClientProxy: proxying to 503 stub") + return url.Parse(s.error503Server.URL) + } else { + return nil, nil + } + } } // DispatchToStubDriver checks that the dispatcher wires everything @@ -113,7 +161,8 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { Drivers["test"] = s.stubDriver s.disp.setupOnce.Do(s.disp.initialize) queue := &test.Queue{ - ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) { + MaxDispatchAttempts: 5, + ChooseType: func(ctr *arvados.Container) ([]arvados.InstanceType, error) { return ChooseInstanceType(s.cluster, ctr) }, Logger: ctxlog.TestLogger(c), @@ -145,6 +194,11 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { return } delete(waiting, ctr.UUID) + if len(waiting) == 100 { + // trigger scheduler maxConcurrency limit + c.Logf("test: requesting 503 in order to trigger maxConcurrency limit") + s.disp.ArvClient.RequestAndDecode(nil, "GET", "503", nil, nil) + } if len(waiting) == 0 { close(done) } @@ -153,25 +207,43 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { finishContainer(ctr) return int(rand.Uint32() & 0x3) } + var countCapacityErrors int64 n := 0 s.stubDriver.Queue = queue - s.stubDriver.SetupVM = func(stubvm *test.StubVM) { + s.stubDriver.SetupVM = func(stubvm *test.StubVM) error { + if pt := stubvm.Instance().ProviderType(); pt == test.InstanceType(6).ProviderType { + c.Logf("test: returning capacity error for instance type %s", pt) + atomic.AddInt64(&countCapacityErrors, 1) + return test.CapacityError{InstanceTypeSpecific: true} + } n++ stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond)))) stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond))) stubvm.ExecuteContainer = executeContainer stubvm.CrashRunningContainer = finishContainer - switch n % 7 { - case 0: + stubvm.ExtraCrunchRunArgs = "'--runtime-engine=stub' '--foo' '--extra='\\''args'\\'''" + switch { + case n%7 == 0: + // some instances start out OK but then stop + // running any commands stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond) - case 1: + case n%7 == 1: + // some instances never pass a run-probe stubvm.CrunchRunMissing = true - case 2: + case n%7 == 2: + // some instances start out OK but then start + // reporting themselves as broken stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)) * time.Millisecond) + case n == 3: + // 1 instance is completely broken, ensuring + // the boot_outcomes{outcome="failure"} metric + // is not zero + stubvm.CrunchRunCrashRate = 1 default: stubvm.CrunchRunCrashRate = 0.1 stubvm.ArvMountDeadlockRate = 0.1 } + return nil } s.stubDriver.Bugf = c.Errorf @@ -180,12 +252,18 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { err := s.disp.CheckHealth() c.Check(err, check.IsNil) - select { - case <-done: - c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start)) - case <-time.After(10 * time.Second): - c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting) + for len(waiting) > 0 { + waswaiting := len(waiting) + select { + case <-done: + // loop will end because len(waiting)==0 + case <-time.After(5 * time.Second): + if len(waiting) >= waswaiting { + c.Fatalf("timed out; no progress in 5 s while waiting for %d containers: %q", len(waiting), waiting) + } + } } + c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start)) deadline := time.Now().Add(5 * time.Second) for range time.NewTicker(10 * time.Millisecond).C { @@ -201,6 +279,8 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { } } + c.Check(countCapacityErrors, check.Not(check.Equals), int64(0)) + req := httptest.NewRequest("GET", "/metrics", nil) req.Header.Set("Authorization", "Bearer "+s.cluster.ManagementToken) resp := httptest.NewRecorder() @@ -211,7 +291,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="0",operation="Destroy"} [^0].*`) c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="1",operation="Create"} [^0].*`) c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="1",operation="List"} 0\n.*`) - c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="aborted"} 0.*`) + c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="aborted"} [0-9]+\n.*`) c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="disappeared"} [^0].*`) c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="failure"} [^0].*`) c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="success"} [^0].*`) @@ -227,6 +307,12 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { c.Check(resp.Body.String(), check.Matches, `(?ms).*time_from_shutdown_request_to_disappearance_seconds_sum [0-9.]*`) c.Check(resp.Body.String(), check.Matches, `(?ms).*time_from_queue_to_crunch_run_seconds_count [0-9]*`) c.Check(resp.Body.String(), check.Matches, `(?ms).*time_from_queue_to_crunch_run_seconds_sum [0-9e+.]*`) + c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_count{outcome="success"} [0-9]*`) + c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_sum{outcome="success"} [0-9e+.]*`) + c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_count{outcome="fail"} [0-9]*`) + c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_sum{outcome="fail"} [0-9e+.]*`) + c.Check(resp.Body.String(), check.Matches, `(?ms).*last_503_time [1-9][0-9e+.]*`) + c.Check(resp.Body.String(), check.Matches, `(?ms).*max_concurrent_containers [1-9][0-9e+.]*`) } func (s *DispatcherSuite) TestAPIPermissions(c *check.C) { @@ -303,6 +389,7 @@ func (s *DispatcherSuite) TestInstancesAPI(c *check.C) { sr := getInstances() c.Check(len(sr.Items), check.Equals, 0) + s.stubDriver.ErrorRateCreate = 0 ch := s.disp.pool.Subscribe() defer s.disp.pool.Unsubscribe(ch) ok := s.disp.pool.Create(test.InstanceType(1))