X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/18e79caebbda5a4350462a2b22e5af36f8e4b699..40dcd44173ea6d6cdc53206766875151eac7b0fd:/lib/controller/handler_test.go

diff --git a/lib/controller/handler_test.go b/lib/controller/handler_test.go
index 1af3ba3626..0c50a6c4be 100644
--- a/lib/controller/handler_test.go
+++ b/lib/controller/handler_test.go
@@ -16,6 +16,7 @@ import (
 	"net/url"
 	"os"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -37,11 +38,12 @@ func Test(t *testing.T) {
 var _ = check.Suite(&HandlerSuite{})
 
 type HandlerSuite struct {
-	cluster *arvados.Cluster
-	handler *Handler
-	logbuf  *bytes.Buffer
-	ctx     context.Context
-	cancel  context.CancelFunc
+	cluster  *arvados.Cluster
+	handler  *Handler
+	railsSpy *arvadostest.Proxy
+	logbuf   *bytes.Buffer
+	ctx      context.Context
+	cancel   context.CancelFunc
 }
 
 func (s *HandlerSuite) SetUpTest(c *check.C) {
@@ -55,6 +57,8 @@ func (s *HandlerSuite) SetUpTest(c *check.C) {
 	s.cluster.API.RequestTimeout = arvados.Duration(5 * time.Minute)
 	s.cluster.TLS.Insecure = true
 	arvadostest.SetServiceURL(&s.cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
+	s.railsSpy = arvadostest.NewProxy(c, s.cluster.Services.RailsAPI)
+	arvadostest.SetServiceURL(&s.cluster.Services.RailsAPI, s.railsSpy.URL.String())
 	arvadostest.SetServiceURL(&s.cluster.Services.Controller, "http://localhost:/")
 	s.handler = newHandler(s.ctx, s.cluster, "", prometheus.NewRegistry()).(*Handler)
 }
@@ -93,6 +97,204 @@ func (s *HandlerSuite) TestConfigExport(c *check.C) {
 	}
 }
 
+func (s *HandlerSuite) TestDiscoveryDocCache(c *check.C) {
+	countRailsReqs := func() int {
+		n := 0
+		for _, req := range s.railsSpy.RequestDumps {
+			if bytes.Contains(req, []byte("/discovery/v1/apis/arvados/v1/rest")) {
+				n++
+			}
+		}
+		return n
+	}
+	getDD := func() int {
+		req := httptest.NewRequest(http.MethodGet, "/discovery/v1/apis/arvados/v1/rest", nil)
+		resp := httptest.NewRecorder()
+		s.handler.ServeHTTP(resp, req)
+		if resp.Code == http.StatusOK {
+			var dd arvados.DiscoveryDocument
+			err := json.Unmarshal(resp.Body.Bytes(), &dd)
+			c.Check(err, check.IsNil)
+			c.Check(dd.Schemas["Collection"].UUIDPrefix, check.Equals, "4zz18")
+		}
+		return resp.Code
+	}
+	getDDConcurrently := func(n int, expectCode int, checkArgs ...interface{}) *sync.WaitGroup {
+		var wg sync.WaitGroup
+		for i := 0; i < n; i++ {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				c.Check(getDD(), check.Equals, append([]interface{}{expectCode}, checkArgs...)...)
+			}()
+		}
+		return &wg
+	}
+	clearCache := func() {
+		for _, ent := range s.handler.cache {
+			ent.refreshLock.Lock()
+			ent.mtx.Lock()
+			ent.body, ent.header, ent.refreshAfter = nil, nil, time.Time{}
+			ent.mtx.Unlock()
+			ent.refreshLock.Unlock()
+		}
+	}
+	waitPendingUpdates := func() {
+		for _, ent := range s.handler.cache {
+			ent.refreshLock.Lock()
+			defer ent.refreshLock.Unlock()
+			ent.mtx.Lock()
+			defer ent.mtx.Unlock()
+		}
+	}
+	refreshNow := func() {
+		waitPendingUpdates()
+		for _, ent := range s.handler.cache {
+			ent.refreshAfter = time.Now()
+		}
+	}
+	expireNow := func() {
+		waitPendingUpdates()
+		for _, ent := range s.handler.cache {
+			ent.expireAfter = time.Now()
+		}
+	}
+
+	// Easy path: first req fetches, subsequent reqs use cache.
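+	// (countRailsReqs counts the requests recorded by the railsSpy
+	// proxy, so it serves as a cache-miss counter here: it should
+	// only increase when the handler actually fetches upstream.)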
+	c.Check(countRailsReqs(), check.Equals, 0)
+	c.Check(getDD(), check.Equals, http.StatusOK)
+	c.Check(countRailsReqs(), check.Equals, 1)
+	c.Check(getDD(), check.Equals, http.StatusOK)
+	c.Check(countRailsReqs(), check.Equals, 1)
+	c.Check(getDD(), check.Equals, http.StatusOK)
+	c.Check(countRailsReqs(), check.Equals, 1)
+
+	// To guarantee we have concurrent requests, we set up
+	// railsSpy to hold up the Handler's outgoing requests until
+	// we send to (or close) holdReqs.
+	holdReqs := make(chan struct{})
+	s.railsSpy.Director = func(*http.Request) {
+		<-holdReqs
+	}
+
+	// Race at startup: first req fetches, other concurrent reqs
+	// wait for the initial fetch to complete, then all return.
+	clearCache()
+	reqsBefore := countRailsReqs()
+	wg := getDDConcurrently(5, http.StatusOK, check.Commentf("race at startup"))
+	close(holdReqs)
+	wg.Wait()
+	c.Check(countRailsReqs(), check.Equals, reqsBefore+1)
+
+	// Race after expiry: concurrent reqs return the cached data
+	// but initiate a new fetch in the background.
+	refreshNow()
+	holdReqs = make(chan struct{})
+	wg = getDDConcurrently(5, http.StatusOK, check.Commentf("race after expiry"))
+	reqsBefore = countRailsReqs()
+	close(holdReqs)
+	wg.Wait()
+	for deadline := time.Now().Add(time.Second); time.Now().Before(deadline) && countRailsReqs() < reqsBefore+1; {
+		time.Sleep(time.Second / 100)
+	}
+	c.Check(countRailsReqs(), check.Equals, reqsBefore+1)
+
+	// Configure railsSpy to return an error or bad content
+	// depending on flags.
+	var wantError, wantBadContent bool
+	s.railsSpy.Director = func(req *http.Request) {
+		if wantError {
+			req.Method = "MAKE-COFFEE"
+		} else if wantBadContent {
+			req.URL.Path = "/_health/ping"
+			req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
+		}
+	}
+
+	// Error at startup (empty cache) => caller gets error, and we
+	// make an upstream attempt for each incoming request because
+	// we have nothing better to return
+	clearCache()
+	wantError, wantBadContent = true, false
+	reqsBefore = countRailsReqs()
+	holdReqs = make(chan struct{})
+	wg = getDDConcurrently(5, http.StatusBadGateway, check.Commentf("error at startup"))
+	close(holdReqs)
+	wg.Wait()
+	c.Check(countRailsReqs(), check.Equals, reqsBefore+5)
+
+	// Response status is OK but body is not a discovery document
+	wantError, wantBadContent = false, true
+	reqsBefore = countRailsReqs()
+	c.Check(getDD(), check.Equals, http.StatusBadGateway)
+	c.Check(countRailsReqs(), check.Equals, reqsBefore+1)
+
+	// Error condition clears => caller gets OK, cache is warmed
+	// up
+	wantError, wantBadContent = false, false
+	reqsBefore = countRailsReqs()
+	getDDConcurrently(5, http.StatusOK, check.Commentf("success after errors at startup")).Wait()
+	c.Check(countRailsReqs(), check.Equals, reqsBefore+1)
+
+	// Error with warm cache => caller gets OK (with no attempt to
+	// re-fetch)
+	wantError, wantBadContent = true, false
+	reqsBefore = countRailsReqs()
+	getDDConcurrently(5, http.StatusOK, check.Commentf("error with warm cache")).Wait()
+	c.Check(countRailsReqs(), check.Equals, reqsBefore)
+
+	// Error with stale cache => caller gets OK with stale data
+	// while the re-fetch is attempted in the background
+	refreshNow()
+	wantError, wantBadContent = true, false
+	reqsBefore = countRailsReqs()
+	holdReqs = make(chan struct{})
+	getDDConcurrently(5, http.StatusOK, check.Commentf("error with stale cache")).Wait()
+	close(holdReqs)
+	// Only one attempt to re-fetch (holdReqs ensured the first
+	// update took long enough for the last incoming request to
+	// arrive)
+	c.Check(countRailsReqs(), check.Equals, reqsBefore+1)
+
+	refreshNow()
+	wantError, wantBadContent = false, false
+	reqsBefore = countRailsReqs()
+	holdReqs = make(chan struct{})
+	getDDConcurrently(5, http.StatusOK, check.Commentf("refresh cache after error condition clears")).Wait()
+	close(holdReqs)
+	waitPendingUpdates()
+	c.Check(countRailsReqs(), check.Equals, reqsBefore+1)
+
+	// Make sure expireAfter is getting set
+	waitPendingUpdates()
+	exp := s.handler.cache["/discovery/v1/apis/arvados/v1/rest"].expireAfter.Sub(time.Now())
+	c.Check(exp > cacheTTL, check.Equals, true)
+	c.Check(exp < cacheExpire, check.Equals, true)
+
+	// After the cache *expires* it behaves as if uninitialized:
+	// each incoming request does a new upstream request until one
+	// succeeds.
+	//
+	// First check failure after expiry:
+	expireNow()
+	wantError, wantBadContent = true, false
+	reqsBefore = countRailsReqs()
+	holdReqs = make(chan struct{})
+	wg = getDDConcurrently(5, http.StatusBadGateway, check.Commentf("error after expiry"))
+	close(holdReqs)
+	wg.Wait()
+	c.Check(countRailsReqs(), check.Equals, reqsBefore+5)
+
+	// Success after expiry:
+	wantError, wantBadContent = false, false
+	reqsBefore = countRailsReqs()
+	holdReqs = make(chan struct{})
+	wg = getDDConcurrently(5, http.StatusOK, check.Commentf("success after expiry"))
+	close(holdReqs)
+	wg.Wait()
+	c.Check(countRailsReqs(), check.Equals, reqsBefore+1)
+}
+
 func (s *HandlerSuite) TestVocabularyExport(c *check.C) {
 	voc := `{
 		"strict_tags": false,
@@ -210,7 +412,7 @@ func (s *HandlerSuite) TestProxyDiscoveryDoc(c *check.C) {
 // etc.
 func (s *HandlerSuite) TestRequestCancel(c *check.C) {
 	ctx, cancel := context.WithCancel(context.Background())
-	req := httptest.NewRequest("GET", "/discovery/v1/apis/arvados/v1/rest", nil).WithContext(ctx)
+	req := httptest.NewRequest("GET", "/static/login_failure", nil).WithContext(ctx)
 	resp := httptest.NewRecorder()
 	cancel()
 	s.handler.ServeHTTP(resp, req)
@@ -437,7 +639,7 @@ func (s *HandlerSuite) TestGetObjects(c *check.C) {
 	testCases := map[string]map[string]bool{
 		"api_clients/" + arvadostest.TrustedWorkbenchAPIClientUUID:     nil,
 		"api_client_authorizations/" + auth.UUID:                       {"href": true, "modified_by_client_uuid": true, "modified_by_user_uuid": true},
-		"authorized_keys/" + arvadostest.AdminAuthorizedKeysUUID:       nil,
+		"authorized_keys/" + arvadostest.AdminAuthorizedKeysUUID:       {"href": true},
 		"collections/" + arvadostest.CollectionWithUniqueWordsUUID:     {"href": true},
 		"containers/" + arvadostest.RunningContainerUUID:               nil,
 		"container_requests/" + arvadostest.QueuedContainerRequestUUID: nil,
@@ -563,3 +765,59 @@ func (s *HandlerSuite) TestLogActivity(c *check.C) {
 		c.Check(rows, check.Equals, 1, check.Commentf("expect 1 row for user uuid %s", userUUID))
 	}
 }
+
+func (s *HandlerSuite) TestLogLimiting(c *check.C) {
+	s.handler.Cluster.API.MaxConcurrentRequests = 2
+	s.handler.Cluster.API.LogCreateRequestFraction = 0.5
+
+	logreq := httptest.NewRequest("POST", "/arvados/v1/logs", strings.NewReader(`{
+			"log": {
+				"event_type": "test"
+			}
+		}`))
+	logreq.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+
+	// Log create succeeds
+	for i := 0; i < 2; i++ {
+		resp := httptest.NewRecorder()
+		s.handler.ServeHTTP(resp, logreq)
+		c.Check(resp.Code, check.Equals, http.StatusOK)
+		var lg arvados.Log
+		err := json.Unmarshal(resp.Body.Bytes(), &lg)
+		c.Check(err, check.IsNil)
+		c.Check(lg.UUID, check.Matches, "zzzzz-57u5n-.*")
+	}
+
+	// Pretend there's a log create in flight
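+	// by filling a slot in the handler's limitLogCreate channel
+	// ourselves (with MaxConcurrentRequests=2 and
+	// LogCreateRequestFraction=0.5 the limiter presumably has
+	// capacity for a single in-flight log create, so the next
+	// one should be rejected).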
+	s.handler.limitLogCreate <- struct{}{}
+
+	// Log create should be rejected now
+	resp := httptest.NewRecorder()
+	s.handler.ServeHTTP(resp, logreq)
+	c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
+
+	// Other requests still succeed
+	req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
+	req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+	resp = httptest.NewRecorder()
+	s.handler.ServeHTTP(resp, req)
+	c.Check(resp.Code, check.Equals, http.StatusOK)
+	var u arvados.User
+	err := json.Unmarshal(resp.Body.Bytes(), &u)
+	c.Check(err, check.IsNil)
+	c.Check(u.UUID, check.Equals, arvadostest.ActiveUserUUID)
+
+	// log create still fails
+	resp = httptest.NewRecorder()
+	s.handler.ServeHTTP(resp, logreq)
+	c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
+
+	// Pretend in-flight log is done
+	<-s.handler.limitLogCreate
+
+	// log create succeeds again
+	resp = httptest.NewRecorder()
+	s.handler.ServeHTTP(resp, logreq)
+	c.Check(resp.Code, check.Equals, http.StatusOK)
+
+}