X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/570e4e953abc93ee6c9a3fc951854be615ff169a..72ce6ed68a7af7de8f4afbee6b7cfb73763a5c3f:/lib/controller/handler_test.go diff --git a/lib/controller/handler_test.go b/lib/controller/handler_test.go index 9b71c349a4..fcd70d7cc5 100644 --- a/lib/controller/handler_test.go +++ b/lib/controller/handler_test.go @@ -5,18 +5,22 @@ package controller import ( + "bytes" "context" "crypto/tls" "encoding/json" + "io" "io/ioutil" "net/http" "net/http/httptest" "net/url" "os" "strings" + "sync" "testing" "time" + "git.arvados.org/arvados.git/lib/controller/rpc" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadostest" "git.arvados.org/arvados.git/sdk/go/auth" @@ -34,15 +38,18 @@ func Test(t *testing.T) { var _ = check.Suite(&HandlerSuite{}) type HandlerSuite struct { - cluster *arvados.Cluster - handler http.Handler - ctx context.Context - cancel context.CancelFunc + cluster *arvados.Cluster + handler *Handler + railsSpy *arvadostest.Proxy + logbuf *bytes.Buffer + ctx context.Context + cancel context.CancelFunc } func (s *HandlerSuite) SetUpTest(c *check.C) { + s.logbuf = &bytes.Buffer{} s.ctx, s.cancel = context.WithCancel(context.Background()) - s.ctx = ctxlog.Context(s.ctx, ctxlog.New(os.Stderr, "json", "debug")) + s.ctx = ctxlog.Context(s.ctx, ctxlog.New(io.MultiWriter(os.Stderr, s.logbuf), "json", "debug")) s.cluster = &arvados.Cluster{ ClusterID: "zzzzz", PostgreSQL: integrationTestCluster().PostgreSQL, @@ -50,8 +57,10 @@ func (s *HandlerSuite) SetUpTest(c *check.C) { s.cluster.API.RequestTimeout = arvados.Duration(5 * time.Minute) s.cluster.TLS.Insecure = true arvadostest.SetServiceURL(&s.cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST")) + s.railsSpy = arvadostest.NewProxy(c, s.cluster.Services.RailsAPI) + arvadostest.SetServiceURL(&s.cluster.Services.RailsAPI, s.railsSpy.URL.String()) arvadostest.SetServiceURL(&s.cluster.Services.Controller, "http://localhost:/") - s.handler = newHandler(s.ctx, s.cluster, "", prometheus.NewRegistry()) + s.handler = newHandler(s.ctx, s.cluster, "", prometheus.NewRegistry()).(*Handler) } func (s *HandlerSuite) TearDownTest(c *check.C) { @@ -88,6 +97,302 @@ func (s *HandlerSuite) TestConfigExport(c *check.C) { } } +func (s *HandlerSuite) TestDiscoveryDocCache(c *check.C) { + countRailsReqs := func() int { + n := 0 + for _, req := range s.railsSpy.RequestDumps { + if bytes.Contains(req, []byte("/discovery/v1/apis/arvados/v1/rest")) { + n++ + } + } + return n + } + getDD := func() int { + req := httptest.NewRequest(http.MethodGet, "/discovery/v1/apis/arvados/v1/rest", nil) + resp := httptest.NewRecorder() + s.handler.ServeHTTP(resp, req) + if resp.Code == http.StatusOK { + var dd arvados.DiscoveryDocument + err := json.Unmarshal(resp.Body.Bytes(), &dd) + c.Check(err, check.IsNil) + c.Check(dd.Schemas["Collection"].UUIDPrefix, check.Equals, "4zz18") + } + return resp.Code + } + getDDConcurrently := func(n int, expectCode int, checkArgs ...interface{}) *sync.WaitGroup { + var wg sync.WaitGroup + for i := 0; i < n; i++ { + wg.Add(1) + go func() { + defer wg.Done() + c.Check(getDD(), check.Equals, append([]interface{}{expectCode}, checkArgs...)...) + }() + } + return &wg + } + clearCache := func() { + for _, ent := range s.handler.cache { + ent.refreshLock.Lock() + ent.mtx.Lock() + ent.body, ent.header, ent.refreshAfter = nil, nil, time.Time{} + ent.mtx.Unlock() + ent.refreshLock.Unlock() + } + } + waitPendingUpdates := func() { + for _, ent := range s.handler.cache { + ent.refreshLock.Lock() + defer ent.refreshLock.Unlock() + ent.mtx.Lock() + defer ent.mtx.Unlock() + } + } + refreshNow := func() { + waitPendingUpdates() + for _, ent := range s.handler.cache { + ent.refreshAfter = time.Now() + } + } + expireNow := func() { + waitPendingUpdates() + for _, ent := range s.handler.cache { + ent.expireAfter = time.Now() + } + } + + // Easy path: first req fetches, subsequent reqs use cache. + c.Check(countRailsReqs(), check.Equals, 0) + c.Check(getDD(), check.Equals, http.StatusOK) + c.Check(countRailsReqs(), check.Equals, 1) + c.Check(getDD(), check.Equals, http.StatusOK) + c.Check(countRailsReqs(), check.Equals, 1) + c.Check(getDD(), check.Equals, http.StatusOK) + c.Check(countRailsReqs(), check.Equals, 1) + + // To guarantee we have concurrent requests, we set up + // railsSpy to hold up the Handler's outgoing requests until + // we send to (or close) holdReqs. + holdReqs := make(chan struct{}) + s.railsSpy.Director = func(*http.Request) { + <-holdReqs + } + + // Race at startup: first req fetches, other concurrent reqs + // wait for the initial fetch to complete, then all return. + clearCache() + reqsBefore := countRailsReqs() + wg := getDDConcurrently(5, http.StatusOK, check.Commentf("race at startup")) + close(holdReqs) + wg.Wait() + c.Check(countRailsReqs(), check.Equals, reqsBefore+1) + + // Race after expiry: concurrent reqs return the cached data + // but initiate a new fetch in the background. + refreshNow() + holdReqs = make(chan struct{}) + wg = getDDConcurrently(5, http.StatusOK, check.Commentf("race after expiry")) + reqsBefore = countRailsReqs() + close(holdReqs) + wg.Wait() + for deadline := time.Now().Add(time.Second); time.Now().Before(deadline) && countRailsReqs() < reqsBefore+1; { + time.Sleep(time.Second / 100) + } + c.Check(countRailsReqs(), check.Equals, reqsBefore+1) + + // Configure railsSpy to return an error or bad content + // depending on flags. + var wantError, wantBadContent bool + s.railsSpy.Director = func(req *http.Request) { + if wantError { + req.Method = "MAKE-COFFEE" + } else if wantBadContent { + req.URL.Path = "/_health/ping" + req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken) + } + } + + // Error at startup (empty cache) => caller gets error, and we + // make an upstream attempt for each incoming request because + // we have nothing better to return + clearCache() + wantError, wantBadContent = true, false + reqsBefore = countRailsReqs() + holdReqs = make(chan struct{}) + wg = getDDConcurrently(5, http.StatusBadGateway, check.Commentf("error at startup")) + close(holdReqs) + wg.Wait() + c.Check(countRailsReqs(), check.Equals, reqsBefore+5) + + // Response status is OK but body is not a discovery document + wantError, wantBadContent = false, true + reqsBefore = countRailsReqs() + c.Check(getDD(), check.Equals, http.StatusBadGateway) + c.Check(countRailsReqs(), check.Equals, reqsBefore+1) + + // Error condition clears => caller gets OK, cache is warmed + // up + wantError, wantBadContent = false, false + reqsBefore = countRailsReqs() + getDDConcurrently(5, http.StatusOK, check.Commentf("success after errors at startup")).Wait() + c.Check(countRailsReqs(), check.Equals, reqsBefore+1) + + // Error with warm cache => caller gets OK (with no attempt to + // re-fetch) + wantError, wantBadContent = true, false + reqsBefore = countRailsReqs() + getDDConcurrently(5, http.StatusOK, check.Commentf("error with warm cache")).Wait() + c.Check(countRailsReqs(), check.Equals, reqsBefore) + + // Error with stale cache => caller gets OK with stale data + // while the re-fetch is attempted in the background + refreshNow() + wantError, wantBadContent = true, false + reqsBefore = countRailsReqs() + holdReqs = make(chan struct{}) + getDDConcurrently(5, http.StatusOK, check.Commentf("error with stale cache")).Wait() + close(holdReqs) + // Only one attempt to re-fetch (holdReqs ensured the first + // update took long enough for the last incoming request to + // arrive) + c.Check(countRailsReqs(), check.Equals, reqsBefore+1) + + refreshNow() + wantError, wantBadContent = false, false + reqsBefore = countRailsReqs() + holdReqs = make(chan struct{}) + getDDConcurrently(5, http.StatusOK, check.Commentf("refresh cache after error condition clears")).Wait() + close(holdReqs) + waitPendingUpdates() + c.Check(countRailsReqs(), check.Equals, reqsBefore+1) + + // Make sure expireAfter is getting set + waitPendingUpdates() + exp := s.handler.cache["/discovery/v1/apis/arvados/v1/rest"].expireAfter.Sub(time.Now()) + c.Check(exp > cacheTTL, check.Equals, true) + c.Check(exp < cacheExpire, check.Equals, true) + + // After the cache *expires* it behaves as if uninitialized: + // each incoming request does a new upstream request until one + // succeeds. + // + // First check failure after expiry: + expireNow() + wantError, wantBadContent = true, false + reqsBefore = countRailsReqs() + holdReqs = make(chan struct{}) + wg = getDDConcurrently(5, http.StatusBadGateway, check.Commentf("error after expiry")) + close(holdReqs) + wg.Wait() + c.Check(countRailsReqs(), check.Equals, reqsBefore+5) + + // Success after expiry: + wantError, wantBadContent = false, false + reqsBefore = countRailsReqs() + holdReqs = make(chan struct{}) + wg = getDDConcurrently(5, http.StatusOK, check.Commentf("success after expiry")) + close(holdReqs) + wg.Wait() + c.Check(countRailsReqs(), check.Equals, reqsBefore+1) +} + +func (s *HandlerSuite) TestVocabularyExport(c *check.C) { + voc := `{ + "strict_tags": false, + "tags": { + "IDTAGIMPORTANCE": { + "strict": false, + "labels": [{"label": "Importance"}], + "values": { + "HIGH": { + "labels": [{"label": "High"}] + }, + "LOW": { + "labels": [{"label": "Low"}] + } + } + } + } + }` + f, err := os.CreateTemp("", "test-vocabulary-*.json") + c.Assert(err, check.IsNil) + defer os.Remove(f.Name()) + _, err = f.WriteString(voc) + c.Assert(err, check.IsNil) + f.Close() + s.cluster.API.VocabularyPath = f.Name() + for _, method := range []string{"GET", "OPTIONS"} { + c.Log(c.TestName()+" ", method) + req := httptest.NewRequest(method, "/arvados/v1/vocabulary", nil) + resp := httptest.NewRecorder() + s.handler.ServeHTTP(resp, req) + c.Log(resp.Body.String()) + if !c.Check(resp.Code, check.Equals, http.StatusOK) { + continue + } + c.Check(resp.Header().Get("Access-Control-Allow-Origin"), check.Equals, `*`) + c.Check(resp.Header().Get("Access-Control-Allow-Methods"), check.Matches, `.*\bGET\b.*`) + c.Check(resp.Header().Get("Access-Control-Allow-Headers"), check.Matches, `.+`) + if method == "OPTIONS" { + c.Check(resp.Body.String(), check.HasLen, 0) + continue + } + var expectedVoc, receivedVoc *arvados.Vocabulary + err := json.Unmarshal([]byte(voc), &expectedVoc) + c.Check(err, check.IsNil) + err = json.Unmarshal(resp.Body.Bytes(), &receivedVoc) + c.Check(err, check.IsNil) + c.Check(receivedVoc, check.DeepEquals, expectedVoc) + } +} + +func (s *HandlerSuite) TestVocabularyFailedCheckStatus(c *check.C) { + voc := `{ + "strict_tags": false, + "tags": { + "IDTAGIMPORTANCE": { + "strict": true, + "labels": [{"label": "Importance"}], + "values": { + "HIGH": { + "labels": [{"label": "High"}] + }, + "LOW": { + "labels": [{"label": "Low"}] + } + } + } + } + }` + f, err := os.CreateTemp("", "test-vocabulary-*.json") + c.Assert(err, check.IsNil) + defer os.Remove(f.Name()) + _, err = f.WriteString(voc) + c.Assert(err, check.IsNil) + f.Close() + s.cluster.API.VocabularyPath = f.Name() + + req := httptest.NewRequest("POST", "/arvados/v1/collections", + strings.NewReader(`{ + "collection": { + "properties": { + "IDTAGIMPORTANCE": "Critical" + } + } + }`)) + req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken) + req.Header.Set("Content-type", "application/json") + + resp := httptest.NewRecorder() + s.handler.ServeHTTP(resp, req) + c.Log(resp.Body.String()) + c.Assert(resp.Code, check.Equals, http.StatusBadRequest) + var jresp httpserver.ErrorResponse + err = json.Unmarshal(resp.Body.Bytes(), &jresp) + c.Check(err, check.IsNil) + c.Assert(len(jresp.Errors), check.Equals, 1) + c.Check(jresp.Errors[0], check.Matches, `.*tag value.*is not valid for key.*`) +} + func (s *HandlerSuite) TestProxyDiscoveryDoc(c *check.C) { req := httptest.NewRequest("GET", "/discovery/v1/apis/arvados/v1/rest", nil) resp := httptest.NewRecorder() @@ -102,17 +407,21 @@ func (s *HandlerSuite) TestProxyDiscoveryDoc(c *check.C) { c.Check(len(dd.Schemas), check.Not(check.Equals), 0) } -func (s *HandlerSuite) TestRequestTimeout(c *check.C) { - s.cluster.API.RequestTimeout = arvados.Duration(time.Nanosecond) - req := httptest.NewRequest("GET", "/discovery/v1/apis/arvados/v1/rest", nil) +// Handler should give up and exit early if request context is +// cancelled due to client hangup, httpserver.HandlerWithDeadline, +// etc. +func (s *HandlerSuite) TestRequestCancel(c *check.C) { + ctx, cancel := context.WithCancel(context.Background()) + req := httptest.NewRequest("GET", "/static/login_failure", nil).WithContext(ctx) resp := httptest.NewRecorder() + cancel() s.handler.ServeHTTP(resp, req) c.Check(resp.Code, check.Equals, http.StatusBadGateway) var jresp httpserver.ErrorResponse err := json.Unmarshal(resp.Body.Bytes(), &jresp) c.Check(err, check.IsNil) c.Assert(len(jresp.Errors), check.Equals, 1) - c.Check(jresp.Errors[0], check.Matches, `.*context deadline exceeded.*`) + c.Check(jresp.Errors[0], check.Matches, `.*context canceled`) } func (s *HandlerSuite) TestProxyWithoutToken(c *check.C) { @@ -165,20 +474,22 @@ func (s *HandlerSuite) TestProxyNotFound(c *check.C) { } func (s *HandlerSuite) TestLogoutGoogle(c *check.C) { + s.cluster.Services.Workbench2.ExternalURL = arvados.URL{Scheme: "https", Host: "wb2.example", Path: "/"} s.cluster.Login.Google.Enable = true s.cluster.Login.Google.ClientID = "test" - req := httptest.NewRequest("GET", "https://0.0.0.0:1/logout?return_to=https://example.com/foo", nil) + req := httptest.NewRequest("GET", "https://0.0.0.0:1/logout?return_to=https://wb2.example/", nil) resp := httptest.NewRecorder() s.handler.ServeHTTP(resp, req) if !c.Check(resp.Code, check.Equals, http.StatusFound) { c.Log(resp.Body.String()) } - c.Check(resp.Header().Get("Location"), check.Equals, "https://example.com/foo") + c.Check(resp.Header().Get("Location"), check.Equals, "https://wb2.example/") } func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) { + c.Assert(s.handler.CheckHealth(), check.IsNil) req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil) - user, ok, err := s.handler.(*Handler).validateAPItoken(req, arvadostest.ActiveToken) + user, ok, err := s.handler.validateAPItoken(req, arvadostest.ActiveToken) c.Assert(err, check.IsNil) c.Check(ok, check.Equals, true) c.Check(user.Authorization.UUID, check.Equals, arvadostest.ActiveTokenUUID) @@ -188,8 +499,9 @@ func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) { } func (s *HandlerSuite) TestValidateV2APIToken(c *check.C) { + c.Assert(s.handler.CheckHealth(), check.IsNil) req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil) - user, ok, err := s.handler.(*Handler).validateAPItoken(req, arvadostest.ActiveTokenV2) + user, ok, err := s.handler.validateAPItoken(req, arvadostest.ActiveTokenV2) c.Assert(err, check.IsNil) c.Check(ok, check.Equals, true) c.Check(user.Authorization.UUID, check.Equals, arvadostest.ActiveTokenUUID) @@ -219,13 +531,24 @@ func (s *HandlerSuite) TestValidateRemoteToken(c *check.C) { } } +func (s *HandlerSuite) TestLogTokenUUID(c *check.C) { + req := httptest.NewRequest("GET", "https://0.0.0.0/arvados/v1/users/current", nil) + req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveTokenV2) + req = req.WithContext(s.ctx) + resp := httptest.NewRecorder() + httpserver.LogRequests(s.handler).ServeHTTP(resp, req) + c.Check(resp.Code, check.Equals, http.StatusOK) + c.Check(s.logbuf.String(), check.Matches, `(?ms).*"tokenUUIDs":\["`+strings.Split(arvadostest.ActiveTokenV2, "/")[1]+`"\].*`) +} + func (s *HandlerSuite) TestCreateAPIToken(c *check.C) { + c.Assert(s.handler.CheckHealth(), check.IsNil) req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil) - auth, err := s.handler.(*Handler).createAPItoken(req, arvadostest.ActiveUserUUID, nil) + auth, err := s.handler.createAPItoken(req, arvadostest.ActiveUserUUID, nil) c.Assert(err, check.IsNil) c.Check(auth.Scopes, check.DeepEquals, []string{"all"}) - user, ok, err := s.handler.(*Handler).validateAPItoken(req, auth.TokenV2()) + user, ok, err := s.handler.validateAPItoken(req, auth.TokenV2()) c.Assert(err, check.IsNil) c.Check(ok, check.Equals, true) c.Check(user.Authorization.UUID, check.Equals, auth.UUID) @@ -245,7 +568,7 @@ func (s *HandlerSuite) CheckObjectType(c *check.C, url string, token string, ski resp := httptest.NewRecorder() s.handler.ServeHTTP(resp, req) c.Assert(resp.Code, check.Equals, http.StatusOK, - check.Commentf("Wasn't able to get data from the controller at %q", url)) + check.Commentf("Wasn't able to get data from the controller at %q: %q", url, resp.Body.String())) err = json.Unmarshal(resp.Body.Bytes(), &proxied) c.Check(err, check.Equals, nil) @@ -269,16 +592,14 @@ func (s *HandlerSuite) CheckObjectType(c *check.C, url string, token string, ski for k := range direct { if _, ok := skippedFields[k]; ok { continue - } else if val, ok := proxied[k]; ok { - if direct["kind"] == "arvados#collection" && k == "manifest_text" { - // Tokens differ from request to request - c.Check(strings.Split(val.(string), "+A")[0], check.Equals, strings.Split(direct[k].(string), "+A")[0]) - } else { - c.Check(val, check.DeepEquals, direct[k], - check.Commentf("RailsAPI %s key %q's value %q differs from controller's %q.", direct["kind"], k, direct[k], val)) - } - } else { + } else if val, ok := proxied[k]; !ok { c.Errorf("%s's key %q missing on controller's response.", direct["kind"], k) + } else if direct["kind"] == "arvados#collection" && k == "manifest_text" { + // Tokens differ from request to request + c.Check(strings.Split(val.(string), "+A")[0], check.Equals, strings.Split(direct[k].(string), "+A")[0]) + } else { + c.Check(val, check.DeepEquals, direct[k], + check.Commentf("RailsAPI %s key %q's value %q differs from controller's %q.", direct["kind"], k, direct[k], val)) } } } @@ -294,10 +615,30 @@ func (s *HandlerSuite) TestGetObjects(c *check.C) { json.Unmarshal(resp.Body.Bytes(), &ksList) c.Assert(len(ksList.Items), check.Not(check.Equals), 0) ksUUID := ksList.Items[0].UUID + // Create a new token for the test user so that we're not comparing + // the ones from the fixtures. + req = httptest.NewRequest("POST", "/arvados/v1/api_client_authorizations", + strings.NewReader(`{ + "api_client_authorization": { + "owner_uuid": "`+arvadostest.AdminUserUUID+`", + "created_by_ip_address": "::1", + "last_used_by_ip_address": "::1", + "default_owner_uuid": "`+arvadostest.AdminUserUUID+`" + } + }`)) + req.Header.Set("Authorization", "Bearer "+arvadostest.SystemRootToken) + req.Header.Set("Content-type", "application/json") + resp = httptest.NewRecorder() + s.handler.ServeHTTP(resp, req) + c.Assert(resp.Code, check.Equals, http.StatusOK, + check.Commentf("%s", resp.Body.String())) + var auth arvados.APIClientAuthorization + json.Unmarshal(resp.Body.Bytes(), &auth) + c.Assert(auth.UUID, check.Not(check.Equals), "") testCases := map[string]map[string]bool{ "api_clients/" + arvadostest.TrustedWorkbenchAPIClientUUID: nil, - "api_client_authorizations/" + arvadostest.AdminTokenUUID: nil, + "api_client_authorizations/" + auth.UUID: {"href": true, "modified_by_client_uuid": true, "modified_by_user_uuid": true}, "authorized_keys/" + arvadostest.AdminAuthorizedKeysUUID: nil, "collections/" + arvadostest.CollectionWithUniqueWordsUUID: {"href": true}, "containers/" + arvadostest.RunningContainerUUID: nil, @@ -313,7 +654,8 @@ func (s *HandlerSuite) TestGetObjects(c *check.C) { "workflows/" + arvadostest.WorkflowWithDefinitionYAMLUUID: nil, } for url, skippedFields := range testCases { - s.CheckObjectType(c, "/arvados/v1/"+url, arvadostest.AdminToken, skippedFields) + c.Logf("Testing %q", url) + s.CheckObjectType(c, "/arvados/v1/"+url, auth.TokenV2(), skippedFields) } } @@ -332,3 +674,150 @@ func (s *HandlerSuite) TestRedactRailsAPIHostFromErrors(c *check.C) { c.Check(jresp.Errors[0], check.Matches, `.*//railsapi\.internal/arvados/v1/collections/.*: 404 Not Found.*`) c.Check(jresp.Errors[0], check.Not(check.Matches), `(?ms).*127.0.0.1.*`) } + +func (s *HandlerSuite) TestTrashSweep(c *check.C) { + s.cluster.SystemRootToken = arvadostest.SystemRootToken + s.cluster.Collections.TrashSweepInterval = arvados.Duration(time.Second / 10) + s.handler.CheckHealth() + ctx := auth.NewContext(s.ctx, &auth.Credentials{Tokens: []string{arvadostest.ActiveTokenV2}}) + coll, err := s.handler.federation.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{"name": "test trash sweep"}, EnsureUniqueName: true}) + c.Assert(err, check.IsNil) + defer s.handler.federation.CollectionDelete(ctx, arvados.DeleteOptions{UUID: coll.UUID}) + db, err := s.handler.dbConnector.GetDB(s.ctx) + c.Assert(err, check.IsNil) + _, err = db.ExecContext(s.ctx, `update collections set trash_at = $1, delete_at = $2 where uuid = $3`, time.Now().UTC().Add(time.Second/10), time.Now().UTC().Add(time.Hour), coll.UUID) + c.Assert(err, check.IsNil) + deadline := time.Now().Add(5 * time.Second) + for { + if time.Now().After(deadline) { + c.Log("timed out") + c.FailNow() + } + updated, err := s.handler.federation.CollectionGet(ctx, arvados.GetOptions{UUID: coll.UUID, IncludeTrash: true}) + c.Assert(err, check.IsNil) + if updated.IsTrashed { + break + } + time.Sleep(time.Second / 10) + } +} + +func (s *HandlerSuite) TestContainerLogSweep(c *check.C) { + s.cluster.SystemRootToken = arvadostest.SystemRootToken + s.cluster.Containers.Logging.SweepInterval = arvados.Duration(time.Second / 10) + s.handler.CheckHealth() + ctx := auth.NewContext(s.ctx, &auth.Credentials{Tokens: []string{arvadostest.ActiveTokenV2}}) + logentry, err := s.handler.federation.LogCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{ + "object_uuid": arvadostest.CompletedContainerUUID, + "event_type": "stderr", + "properties": map[string]interface{}{ + "text": "test trash sweep\n", + }, + }}) + c.Assert(err, check.IsNil) + defer s.handler.federation.LogDelete(ctx, arvados.DeleteOptions{UUID: logentry.UUID}) + deadline := time.Now().Add(5 * time.Second) + for { + if time.Now().After(deadline) { + c.Log("timed out") + c.FailNow() + } + logentries, err := s.handler.federation.LogList(ctx, arvados.ListOptions{Filters: []arvados.Filter{{"uuid", "=", logentry.UUID}}, Limit: -1}) + c.Assert(err, check.IsNil) + if len(logentries.Items) == 0 { + break + } + time.Sleep(time.Second / 10) + } +} + +func (s *HandlerSuite) TestLogActivity(c *check.C) { + s.cluster.SystemRootToken = arvadostest.SystemRootToken + s.cluster.Users.ActivityLoggingPeriod = arvados.Duration(24 * time.Hour) + s.handler.CheckHealth() + + testServer := newServerFromIntegrationTestEnv(c) + testServer.Server.Handler = httpserver.AddRequestIDs(httpserver.LogRequests(s.handler)) + c.Assert(testServer.Start(), check.IsNil) + defer testServer.Close() + + u, _ := url.Parse("http://" + testServer.Addr) + client := rpc.NewConn(s.cluster.ClusterID, u, true, rpc.PassthroughTokenProvider) + + starttime := time.Now() + for i := 0; i < 4; i++ { + for _, token := range []string{ + arvadostest.ActiveTokenV2, + arvadostest.ActiveToken, + arvadostest.SpectatorToken, + } { + ctx := auth.NewContext(s.ctx, &auth.Credentials{Tokens: []string{token}}) + _, err := client.CollectionList(ctx, arvados.ListOptions{}) + c.Assert(err, check.IsNil) + } + } + db, err := s.handler.dbConnector.GetDB(s.ctx) + c.Assert(err, check.IsNil) + for _, userUUID := range []string{arvadostest.ActiveUserUUID, arvadostest.SpectatorUserUUID} { + var rows int + err = db.QueryRowContext(s.ctx, `select count(uuid) from logs where object_uuid = $1 and event_at > $2`, arvadostest.ActiveUserUUID, starttime.UTC()).Scan(&rows) + c.Assert(err, check.IsNil) + c.Check(rows, check.Equals, 1, check.Commentf("expect 1 row for user uuid %s", userUUID)) + } +} + +func (s *HandlerSuite) TestLogLimiting(c *check.C) { + s.handler.Cluster.API.MaxConcurrentRequests = 2 + s.handler.Cluster.API.LogCreateRequestFraction = 0.5 + + logreq := httptest.NewRequest("POST", "/arvados/v1/logs", strings.NewReader(`{ + "log": { + "event_type": "test" + } + }`)) + logreq.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken) + + // Log create succeeds + for i := 0; i < 2; i++ { + resp := httptest.NewRecorder() + s.handler.ServeHTTP(resp, logreq) + c.Check(resp.Code, check.Equals, http.StatusOK) + var lg arvados.Log + err := json.Unmarshal(resp.Body.Bytes(), &lg) + c.Check(err, check.IsNil) + c.Check(lg.UUID, check.Matches, "zzzzz-57u5n-.*") + } + + // Pretend there's a log create in flight + s.handler.limitLogCreate <- struct{}{} + + // Log create should be rejected now + resp := httptest.NewRecorder() + s.handler.ServeHTTP(resp, logreq) + c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable) + + // Other requests still succeed + req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil) + req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken) + resp = httptest.NewRecorder() + s.handler.ServeHTTP(resp, req) + c.Check(resp.Code, check.Equals, http.StatusOK) + var u arvados.User + err := json.Unmarshal(resp.Body.Bytes(), &u) + c.Check(err, check.IsNil) + c.Check(u.UUID, check.Equals, arvadostest.ActiveUserUUID) + + // log create still fails + resp = httptest.NewRecorder() + s.handler.ServeHTTP(resp, logreq) + c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable) + + // Pretend in-flight log is done + <-s.handler.limitLogCreate + + // log create succeeds again + resp = httptest.NewRecorder() + s.handler.ServeHTTP(resp, logreq) + c.Check(resp.Code, check.Equals, http.StatusOK) + +}