From 1abd17dde53982ed058abf770072123f61ed13af Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Thu, 9 Mar 2023 15:39:00 -0500 Subject: [PATCH] 20187: Test discovery doc cache. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/controller/handler.go | 12 +++ lib/controller/handler_test.go | 161 ++++++++++++++++++++++++++++++++- sdk/go/arvadostest/proxy.go | 7 ++ 3 files changed, 175 insertions(+), 5 deletions(-) diff --git a/lib/controller/handler.go b/lib/controller/handler.go index ce3243f6cb..05b45a0fc0 100644 --- a/lib/controller/handler.go +++ b/lib/controller/handler.go @@ -238,7 +238,19 @@ func (ent *cacheEnt) refresh(path string, do func(*http.Request) (*http.Response // another goroutine refreshed successfully while we // were waiting for refreshLock return header, body, nil + } else if body != nil { + // Cache is present, but expired. We'll try to refresh + // below. Meanwhile, other refresh() calls will queue + // up for refreshLock -- and we don't want them to + // turn into N upstream requests, even if upstream is + // failing. (If we succeed we'll update the expiry + // time again below with the real cacheTTL -- this + // just takes care of the error case.) + ent.mtx.Lock() + ent.refreshAfter = time.Now().Add(time.Second) + ent.mtx.Unlock() } + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute)) defer cancel() // 0.0.0.0:0 is just a placeholder here -- do(), which is diff --git a/lib/controller/handler_test.go b/lib/controller/handler_test.go index 52bc4f907e..4a7e1ad78a 100644 --- a/lib/controller/handler_test.go +++ b/lib/controller/handler_test.go @@ -16,6 +16,7 @@ import ( "net/url" "os" "strings" + "sync" "testing" "time" @@ -37,11 +38,12 @@ func Test(t *testing.T) { var _ = check.Suite(&HandlerSuite{}) type HandlerSuite struct { - cluster *arvados.Cluster - handler *Handler - logbuf *bytes.Buffer - ctx context.Context - cancel context.CancelFunc + cluster *arvados.Cluster + handler *Handler + railsSpy *arvadostest.Proxy + logbuf *bytes.Buffer + ctx context.Context + cancel context.CancelFunc } func (s *HandlerSuite) SetUpTest(c *check.C) { @@ -55,6 +57,8 @@ func (s *HandlerSuite) SetUpTest(c *check.C) { s.cluster.API.RequestTimeout = arvados.Duration(5 * time.Minute) s.cluster.TLS.Insecure = true arvadostest.SetServiceURL(&s.cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST")) + s.railsSpy = arvadostest.NewProxy(c, s.cluster.Services.RailsAPI) + arvadostest.SetServiceURL(&s.cluster.Services.RailsAPI, s.railsSpy.URL.String()) arvadostest.SetServiceURL(&s.cluster.Services.Controller, "http://localhost:/") s.handler = newHandler(s.ctx, s.cluster, "", prometheus.NewRegistry()).(*Handler) } @@ -93,6 +97,153 @@ func (s *HandlerSuite) TestConfigExport(c *check.C) { } } +func (s *HandlerSuite) TestDiscoveryDocCache(c *check.C) { + countRailsReqs := func() int { + n := 0 + for _, req := range s.railsSpy.RequestDumps { + if bytes.Contains(req, []byte("/discovery/v1/apis/arvados/v1/rest")) { + n++ + } + } + return n + } + getDD := func() int { + req := httptest.NewRequest(http.MethodGet, "/discovery/v1/apis/arvados/v1/rest", nil) + resp := httptest.NewRecorder() + s.handler.ServeHTTP(resp, req) + if resp.Code == http.StatusOK { + var dd arvados.DiscoveryDocument + err := json.Unmarshal(resp.Body.Bytes(), &dd) + c.Check(err, check.IsNil) + c.Check(dd.Schemas["Collection"].UUIDPrefix, check.Equals, "4zz18") + } + return resp.Code + } + getDDConcurrently := func(n int, expectCode int, checkArgs ...interface{}) *sync.WaitGroup { + var wg sync.WaitGroup + for i := 0; i < n; i++ { + wg.Add(1) + go func() { + defer wg.Done() + c.Check(getDD(), check.Equals, append([]interface{}{expectCode}, checkArgs...)...) + }() + } + return &wg + } + clearCache := func() { + for path := range s.handler.cache { + s.handler.cache[path] = &cacheEnt{} + } + } + expireCache := func() { + for _, ent := range s.handler.cache { + ent.refreshAfter = time.Now() + } + } + waitPendingUpdates := func() { + for _, ent := range s.handler.cache { + ent.refreshLock.Lock() + defer ent.refreshLock.Unlock() + ent.mtx.Lock() + defer ent.mtx.Unlock() + } + } + + // Easy path: first req fetches, subsequent reqs use cache. + c.Check(countRailsReqs(), check.Equals, 0) + c.Check(getDD(), check.Equals, http.StatusOK) + c.Check(countRailsReqs(), check.Equals, 1) + c.Check(getDD(), check.Equals, http.StatusOK) + c.Check(countRailsReqs(), check.Equals, 1) + c.Check(getDD(), check.Equals, http.StatusOK) + c.Check(countRailsReqs(), check.Equals, 1) + + // To guarantee we have concurrent requests, we set up + // railsSpy to hold up the Handler's outgoing requests until + // we send to (or close) holdReqs. + holdReqs := make(chan struct{}) + s.railsSpy.Director = func(*http.Request) { + <-holdReqs + } + + // Race at startup: first req fetches, other concurrent reqs + // wait for the initial fetch to complete, then all return. + clearCache() + reqsBefore := countRailsReqs() + wg := getDDConcurrently(5, http.StatusOK, check.Commentf("race at startup")) + close(holdReqs) + wg.Wait() + c.Check(countRailsReqs(), check.Equals, reqsBefore+1) + + // Race after expiry: concurrent reqs return the cached data + // but initiate a new fetch in the background. + expireCache() + holdReqs = make(chan struct{}) + wg = getDDConcurrently(5, http.StatusOK, check.Commentf("race after expiry")) + reqsBefore = countRailsReqs() + close(holdReqs) + wg.Wait() + for deadline := time.Now().Add(time.Second); time.Now().Before(deadline) && countRailsReqs() < reqsBefore+1; { + time.Sleep(time.Second / 100) + } + c.Check(countRailsReqs(), check.Equals, reqsBefore+1) + + // Configure railsSpy to return an error when wantError==true. + var wantError bool + s.railsSpy.Director = func(req *http.Request) { + if wantError { + req.Method = "MAKE-COFFEE" + } + } + + // Error at startup (empty cache) => caller gets error, and we + // make an upstream attempt for each incoming request because + // we have nothing better to return + clearCache() + wantError = true + reqsBefore = countRailsReqs() + holdReqs = make(chan struct{}) + wg = getDDConcurrently(5, http.StatusBadGateway, check.Commentf("error at startup")) + close(holdReqs) + wg.Wait() + c.Check(countRailsReqs(), check.Equals, reqsBefore+5) + + // Error condition clears => caller gets OK, cache is warmed + // up + wantError = false + reqsBefore = countRailsReqs() + getDDConcurrently(5, http.StatusOK, check.Commentf("success after errors at startup")).Wait() + c.Check(countRailsReqs(), check.Equals, reqsBefore+1) + + // Error with warm cache => caller gets OK (with no attempt to + // re-fetch) + wantError = true + reqsBefore = countRailsReqs() + getDDConcurrently(5, http.StatusOK, check.Commentf("error with warm cache")).Wait() + c.Check(countRailsReqs(), check.Equals, reqsBefore) + expireCache() + + // Error with expired cache => caller gets OK with stale data + // while the re-fetch is attempted in the background + reqsBefore = countRailsReqs() + holdReqs = make(chan struct{}) + getDDConcurrently(5, http.StatusOK, check.Commentf("error with expired cache")).Wait() + close(holdReqs) + // Only one attempt to re-fetch (holdReqs ensured the first + // update took long enough for the last incoming request to + // arrive) + c.Check(countRailsReqs(), check.Equals, reqsBefore+1) + + waitPendingUpdates() + expireCache() + wantError = false + reqsBefore = countRailsReqs() + holdReqs = make(chan struct{}) + getDDConcurrently(5, http.StatusOK, check.Commentf("refresh cache after error condition clears")).Wait() + close(holdReqs) + c.Check(countRailsReqs(), check.Equals, reqsBefore+1) +} + func (s *HandlerSuite) TestVocabularyExport(c *check.C) { voc := `{ "strict_tags": false, diff --git a/sdk/go/arvadostest/proxy.go b/sdk/go/arvadostest/proxy.go index 48700d8b18..9940ddd3d9 100644 --- a/sdk/go/arvadostest/proxy.go +++ b/sdk/go/arvadostest/proxy.go @@ -26,6 +26,10 @@ type Proxy struct { // A dump of each request that has been proxied. RequestDumps [][]byte + + // If non-nil, func will be called on each incoming request + // before proxying it. + Director func(*http.Request) } // NewProxy returns a new Proxy that saves a dump of each reqeust @@ -63,6 +67,9 @@ func NewProxy(c *check.C, svc arvados.Service) *Proxy { URL: u, } rp.Director = func(r *http.Request) { + if proxy.Director != nil { + proxy.Director(r) + } dump, _ := httputil.DumpRequest(r, true) proxy.RequestDumps = append(proxy.RequestDumps, dump) r.URL.Scheme = target.Scheme -- 2.30.2