From 9e6bcccc19bd00d76bf536769037a9a978207180 Mon Sep 17 00:00:00 2001
From: Tom Clegg
Date: Wed, 27 Dec 2023 17:50:45 -0500
Subject: [PATCH] 21285: Use separate request limit/queue for gateway tunnel
 requests.

Arvados-DCO-1.1-Signed-off-by: Tom Clegg
---
 lib/config/config.default.yml             |  10 ++
 lib/config/export.go                      |   1 +
 lib/service/cmd.go                        | 101 ++++++++-----
 lib/service/cmd_test.go                   |  87 +++++++++--
 sdk/go/arvados/config.go                  |   1 +
 sdk/go/httpserver/request_limiter.go      | 167 ++++++++++++----------
 sdk/go/httpserver/request_limiter_test.go |  49 ++++---
 7 files changed, 275 insertions(+), 141 deletions(-)

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 05bc1309cd..3924090ca9 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -231,6 +231,10 @@ Clusters:
       # also effectively limited by MaxConcurrentRailsRequests (see
       # below) because most controller requests proxy through to the
       # RailsAPI service.
+      #
+      # HTTP proxies and load balancers downstream of arvados services
+      # should be configured to allow at least {MaxConcurrentRequests +
+      # MaxQueuedRequests + MaxGatewayTunnels} concurrent requests.
       MaxConcurrentRequests: 64
 
       # Maximum number of concurrent requests to process concurrently
@@ -250,6 +254,12 @@ Clusters:
       # the incoming request queue before returning 503.
       MaxQueueTimeForLockRequests: 2s
 
+      # Maximum number of active gateway tunnel connections. A slot is
+      # consumed by each running container, and by each incoming
+      # "container shell" connection. These do not count toward
+      # MaxConcurrentRequests.
+      MaxGatewayTunnels: 1000
+
       # Fraction of MaxConcurrentRequests that can be "log create"
       # messages at any given time. This is to prevent logging
       # updates from crowding out more important requests.
diff --git a/lib/config/export.go b/lib/config/export.go
index e51e6fc32c..4b6c142ff2 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -70,6 +70,7 @@ var whitelist = map[string]bool{
 	"API.LogCreateRequestFraction":   false,
 	"API.MaxConcurrentRailsRequests": false,
 	"API.MaxConcurrentRequests":      false,
+	"API.MaxGatewayTunnels":          false,
 	"API.MaxIndexDatabaseRead":       false,
 	"API.MaxItemsPerResponse":        true,
 	"API.MaxKeepBlobBuffers":         false,
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index 725f86f3bd..e40b47acbb 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -148,32 +148,13 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 		return 1
 	}
 
-	maxReqs := cluster.API.MaxConcurrentRequests
-	if maxRails := cluster.API.MaxConcurrentRailsRequests; maxRails > 0 &&
-		(maxRails < maxReqs || maxReqs == 0) &&
-		strings.HasSuffix(prog, "controller") {
-		// Ideally, we would accept up to
-		// MaxConcurrentRequests, and apply the
-		// MaxConcurrentRailsRequests limit only for requests
-		// that require calling upstream to RailsAPI. But for
-		// now we make the simplifying assumption that every
-		// controller request causes an upstream RailsAPI
-		// request.
- maxReqs = maxRails - } instrumented := httpserver.Instrument(reg, log, httpserver.HandlerWithDeadline(cluster.API.RequestTimeout.Duration(), httpserver.AddRequestIDs( httpserver.Inspect(reg, cluster.ManagementToken, httpserver.LogRequests( interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth, - &httpserver.RequestLimiter{ - Handler: handler, - MaxConcurrent: maxReqs, - MaxQueue: cluster.API.MaxQueuedRequests, - MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(), - Priority: c.requestPriority, - Registry: reg})))))) + c.requestLimiter(handler, cluster, reg))))))) srv := &httpserver.Server{ Server: http.Server{ Handler: ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)), @@ -212,7 +193,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout <-handler.Done() srv.Close() }() - go c.requestQueueDumpCheck(cluster, maxReqs, prog, reg, &srv.Server, logger) + go c.requestQueueDumpCheck(cluster, prog, reg, &srv.Server, logger) err = srv.Wait() if err != nil { return 1 @@ -221,12 +202,13 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout } // If SystemLogs.RequestQueueDumpDirectory is set, monitor the -// server's incoming HTTP request queue size. When it exceeds 90% of -// API.MaxConcurrentRequests, write the /_inspect/requests data to a -// JSON file in the specified directory. -func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, maxReqs int, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) { +// server's incoming HTTP request limiters. When the number of +// concurrent requests in any queue ("api" or "tunnel") exceeds 90% of +// its maximum slots, write the /_inspect/requests data to a JSON file +// in the specified directory. 
+func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) { outdir := cluster.SystemLogs.RequestQueueDumpDirectory - if outdir == "" || cluster.ManagementToken == "" || maxReqs < 1 { + if outdir == "" || cluster.ManagementToken == "" { return } logger = logger.WithField("worker", "RequestQueueDump") @@ -237,16 +219,29 @@ func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, maxReqs int, p logger.WithError(err).Warn("error getting metrics") continue } - dump := false + cur := map[string]int{} // queue label => current + max := map[string]int{} // queue label => max for _, mf := range mfs { - if mf.Name != nil && *mf.Name == "arvados_concurrent_requests" && len(mf.Metric) == 1 { - n := int(mf.Metric[0].GetGauge().GetValue()) - if n > 0 && n >= maxReqs*9/10 { - dump = true - break + for _, m := range mf.GetMetric() { + for _, ml := range m.GetLabel() { + if ml.GetName() == "queue" { + n := int(m.GetGauge().GetValue()) + if name := mf.GetName(); name == "arvados_concurrent_requests" { + cur[*ml.Value] = n + } else if name == "arvados_max_concurrent_requests" { + max[*ml.Value] = n + } + } } } } + dump := false + for queue, n := range cur { + if n > 0 && max[queue] > 0 && n >= max[queue]*9/10 { + dump = true + break + } + } if dump { req, err := http.NewRequest("GET", "/_inspect/requests", nil) if err != nil { @@ -269,6 +264,48 @@ func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, maxReqs int, p } } +// Set up a httpserver.RequestLimiter with separate queues/streams for +// API requests (obeying MaxConcurrentRequests etc) and gateway tunnel +// requests (obeying MaxGatewayTunnels). +func (c *command) requestLimiter(handler http.Handler, cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler { + maxReqs := cluster.API.MaxConcurrentRequests + if maxRails := cluster.API.MaxConcurrentRailsRequests; maxRails > 0 && + (maxRails < maxReqs || maxReqs == 0) && + c.svcName == arvados.ServiceNameController { + // Ideally, we would accept up to + // MaxConcurrentRequests, and apply the + // MaxConcurrentRailsRequests limit only for requests + // that require calling upstream to RailsAPI. But for + // now we make the simplifying assumption that every + // controller request causes an upstream RailsAPI + // request. 
+ maxReqs = maxRails + } + rqAPI := &httpserver.RequestQueue{ + Label: "api", + MaxConcurrent: maxReqs, + MaxQueue: cluster.API.MaxQueuedRequests, + MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(), + } + rqTunnel := &httpserver.RequestQueue{ + Label: "tunnel", + MaxConcurrent: cluster.API.MaxGatewayTunnels, + MaxQueue: 0, + } + return &httpserver.RequestLimiter{ + Handler: handler, + Priority: c.requestPriority, + Registry: reg, + Queue: func(req *http.Request) *httpserver.RequestQueue { + if strings.HasPrefix(req.URL.Path, "/arvados/v1/connect/") { + return rqTunnel + } else { + return rqAPI + } + }, + } +} + func (c *command) requestPriority(req *http.Request, queued time.Time) int64 { switch { case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/containers/") && strings.HasSuffix(req.URL.Path, "/lock"): diff --git a/lib/service/cmd_test.go b/lib/service/cmd_test.go index 08b3a239dc..0266752f38 100644 --- a/lib/service/cmd_test.go +++ b/lib/service/cmd_test.go @@ -17,6 +17,8 @@ import ( "net/url" "os" "strings" + "sync" + "sync/atomic" "testing" "time" @@ -198,15 +200,15 @@ func (*Suite) TestCommand(c *check.C) { c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`) } -func (s *Suite) TestDumpRequestsKeepweb(c *check.C) { - s.testDumpRequests(c, arvados.ServiceNameKeepweb, "MaxConcurrentRequests") +func (s *Suite) TestRequestLimitsAndDumpRequests_Keepweb(c *check.C) { + s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameKeepweb, "MaxConcurrentRequests") } -func (s *Suite) TestDumpRequestsController(c *check.C) { - s.testDumpRequests(c, arvados.ServiceNameController, "MaxConcurrentRailsRequests") +func (s *Suite) TestRequestLimitsAndDumpRequests_Controller(c *check.C) { + s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameController, "MaxConcurrentRailsRequests") } -func (*Suite) testDumpRequests(c *check.C, serviceName arvados.ServiceName, maxReqsConfigKey string) { +func (*Suite) testRequestLimitAndDumpRequests(c *check.C, serviceName arvados.ServiceName, maxReqsConfigKey string) { defer func(orig time.Duration) { requestQueueDumpCheckInterval = orig }(requestQueueDumpCheckInterval) requestQueueDumpCheckInterval = time.Second / 10 @@ -218,6 +220,7 @@ func (*Suite) testDumpRequests(c *check.C, serviceName arvados.ServiceName, maxR defer cf.Close() max := 24 + maxTunnels := 30 fmt.Fprintf(cf, ` Clusters: zzzzz: @@ -225,7 +228,8 @@ Clusters: ManagementToken: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb API: `+maxReqsConfigKey+`: %d - MaxQueuedRequests: 0 + MaxQueuedRequests: 1 + MaxGatewayTunnels: %d SystemLogs: {RequestQueueDumpDirectory: %q} Services: Controller: @@ -234,14 +238,18 @@ Clusters: WebDAV: ExternalURL: "http://localhost:`+port+`" InternalURLs: {"http://localhost:`+port+`": {}} -`, max, tmpdir) +`, max, maxTunnels, tmpdir) cf.Close() started := make(chan bool, max+1) hold := make(chan bool) handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - started <- true - <-hold + if strings.HasPrefix(r.URL.Path, "/arvados/v1/connect") { + <-hold + } else { + started <- true + <-hold + } }) healthCheck := make(chan bool, 1) ctx, cancel := context.WithCancel(context.Background()) @@ -267,15 +275,50 @@ Clusters: } client := http.Client{} deadline := time.Now().Add(time.Second * 2) + var activeReqs sync.WaitGroup + + // Start some API reqs + var apiResp200, apiResp503 int64 for i := 0; i < max+1; i++ { + activeReqs.Add(1) go func() { + defer activeReqs.Done() resp, err := 
client.Get("http://localhost:" + port + "/testpath") for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) { time.Sleep(time.Second / 100) resp, err = client.Get("http://localhost:" + port + "/testpath") } if c.Check(err, check.IsNil) { - c.Logf("resp StatusCode %d", resp.StatusCode) + if resp.StatusCode == http.StatusOK { + atomic.AddInt64(&apiResp200, 1) + } else if resp.StatusCode == http.StatusServiceUnavailable { + atomic.AddInt64(&apiResp503, 1) + } + } + }() + } + + // Start some gateway tunnel reqs that don't count toward our + // API req limit + extraTunnelReqs := 20 + var tunnelResp200, tunnelResp503 int64 + for i := 0; i < maxTunnels+extraTunnelReqs; i++ { + activeReqs.Add(1) + go func() { + defer activeReqs.Done() + resp, err := client.Get("http://localhost:" + port + "/arvados/v1/connect/...") + for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) { + time.Sleep(time.Second / 100) + resp, err = client.Get("http://localhost:" + port + "/arvados/v1/connect/...") + } + if c.Check(err, check.IsNil) { + if resp.StatusCode == http.StatusOK { + atomic.AddInt64(&tunnelResp200, 1) + } else if resp.StatusCode == http.StatusServiceUnavailable { + atomic.AddInt64(&tunnelResp503, 1) + } else { + c.Errorf("tunnel response code %d", resp.StatusCode) + } } }() } @@ -300,6 +343,20 @@ Clusters: var loaded []struct{ URL string } err = json.Unmarshal(j, &loaded) c.Check(err, check.IsNil) + + for i := 0; i < len(loaded); i++ { + if strings.HasPrefix(loaded[i].URL, "/arvados/v1/connect/") { + // Filter out a gateway tunnel req + // that doesn't count toward our API + // req limit + if i < len(loaded)-1 { + copy(loaded[i:], loaded[i+1:]) + i-- + } + loaded = loaded[:len(loaded)-1] + } + } + if len(loaded) < max { // Dumped when #requests was >90% but <100% of // limit. 
If we stop now, we won't be able to @@ -309,7 +366,7 @@ Clusters: c.Logf("loaded dumped requests, but len %d < max %d -- still waiting", len(loaded), max) continue } - c.Check(loaded, check.HasLen, max) + c.Check(loaded, check.HasLen, max+1) c.Check(loaded[0].URL, check.Equals, "/testpath") break } @@ -328,7 +385,8 @@ Clusters: c.Check(err, check.IsNil) switch path { case "/metrics": - c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests `+fmt.Sprintf("%d", max)+`\n.*`) + c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests{queue="api"} `+fmt.Sprintf("%d", max)+`\n.*`) + c.Check(string(buf), check.Matches, `(?ms).*arvados_queued_requests{priority="normal",queue="api"} 1\n.*`) case "/_inspect/requests": c.Check(string(buf), check.Matches, `(?ms).*"URL":"/testpath".*`) default: @@ -336,6 +394,11 @@ Clusters: } } close(hold) + activeReqs.Wait() + c.Check(int(apiResp200), check.Equals, max+1) + c.Check(int(apiResp503), check.Equals, 0) + c.Check(int(tunnelResp200), check.Equals, maxTunnels) + c.Check(int(tunnelResp503), check.Equals, extraTunnelReqs) cancel() } diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go index 6301ed047a..16d789daf5 100644 --- a/sdk/go/arvados/config.go +++ b/sdk/go/arvados/config.go @@ -102,6 +102,7 @@ type Cluster struct { MaxConcurrentRailsRequests int MaxConcurrentRequests int MaxQueuedRequests int + MaxGatewayTunnels int MaxQueueTimeForLockRequests Duration LogCreateRequestFraction float64 MaxKeepBlobBuffers int diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go index 9d501ab0eb..1e3316ed48 100644 --- a/sdk/go/httpserver/request_limiter.go +++ b/sdk/go/httpserver/request_limiter.go @@ -34,13 +34,8 @@ const metricsUpdateInterval = time.Second type RequestLimiter struct { Handler http.Handler - // Maximum number of requests being handled at once. Beyond - // this limit, requests will be queued. - MaxConcurrent int - - // Maximum number of requests in the queue. Beyond this limit, - // the lowest priority requests will return 503. - MaxQueue int + // Queue determines which queue a request is assigned to. + Queue func(req *http.Request) *RequestQueue // Priority determines queue ordering. Requests with higher // priority are handled first. Requests with equal priority @@ -48,11 +43,6 @@ type RequestLimiter struct { // handled FIFO. Priority func(req *http.Request, queued time.Time) int64 - // Return 503 for any request for which Priority() returns - // MinPriority if it spends longer than this in the queue - // before starting processing. - MaxQueueTimeForMinPriority time.Duration - // "concurrent_requests", "max_concurrent_requests", // "queued_requests", and "max_queued_requests" metrics are // registered with Registry, if it is not nil. @@ -63,11 +53,32 @@ type RequestLimiter struct { mQueueTimeout *prometheus.SummaryVec mQueueUsage *prometheus.GaugeVec mtx sync.Mutex - handling int - queue queue + rqs map[*RequestQueue]bool // all RequestQueues in use +} + +type RequestQueue struct { + // Label for metrics. No two queues should have the same label. + Label string + + // Maximum number of requests being handled at once. Beyond + // this limit, requests will be queued. + MaxConcurrent int + + // Maximum number of requests in the queue. Beyond this limit, + // the lowest priority requests will return 503. + MaxQueue int + + // Return 503 for any request for which Priority() returns + // MinPriority if it spends longer than this in the queue + // before starting processing. 
+ MaxQueueTimeForMinPriority time.Duration + + queue queue + handling int } type qent struct { + rq *RequestQueue queued time.Time priority int64 heappos int @@ -121,101 +132,96 @@ func (h *queue) remove(i int) { func (rl *RequestLimiter) setup() { if rl.Registry != nil { - rl.Registry.MustRegister(prometheus.NewGaugeFunc( - prometheus.GaugeOpts{ - Namespace: "arvados", - Name: "concurrent_requests", - Help: "Number of requests in progress", - }, - func() float64 { - rl.mtx.Lock() - defer rl.mtx.Unlock() - return float64(rl.handling) - }, - )) - rl.Registry.MustRegister(prometheus.NewGaugeFunc( - prometheus.GaugeOpts{ - Namespace: "arvados", - Name: "max_concurrent_requests", - Help: "Maximum number of concurrent requests", - }, - func() float64 { return float64(rl.MaxConcurrent) }, - )) + mCurrentReqs := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "arvados", + Name: "concurrent_requests", + Help: "Number of requests in progress", + }, []string{"queue"}) + rl.Registry.MustRegister(mCurrentReqs) + mMaxReqs := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "arvados", + Name: "max_concurrent_requests", + Help: "Maximum number of concurrent requests", + }, []string{"queue"}) + rl.Registry.MustRegister(mMaxReqs) + mMaxQueue := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "arvados", + Name: "max_queued_requests", + Help: "Maximum number of queued requests", + }, []string{"queue"}) + rl.Registry.MustRegister(mMaxQueue) rl.mQueueUsage = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "arvados", Name: "queued_requests", Help: "Number of requests in queue", - }, []string{"priority"}) + }, []string{"queue", "priority"}) rl.Registry.MustRegister(rl.mQueueUsage) - rl.Registry.MustRegister(prometheus.NewGaugeFunc( - prometheus.GaugeOpts{ - Namespace: "arvados", - Name: "max_queued_requests", - Help: "Maximum number of queued requests", - }, - func() float64 { return float64(rl.MaxQueue) }, - )) rl.mQueueDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{ Namespace: "arvados", Name: "queue_delay_seconds", Help: "Time spent in the incoming request queue before start of processing", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, - }, []string{"priority"}) + }, []string{"queue", "priority"}) rl.Registry.MustRegister(rl.mQueueDelay) rl.mQueueTimeout = prometheus.NewSummaryVec(prometheus.SummaryOpts{ Namespace: "arvados", Name: "queue_timeout_seconds", Help: "Time spent in the incoming request queue before client timed out or disconnected", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, - }, []string{"priority"}) + }, []string{"queue", "priority"}) rl.Registry.MustRegister(rl.mQueueTimeout) go func() { for range time.NewTicker(metricsUpdateInterval).C { - var low, normal, high int rl.mtx.Lock() - for _, ent := range rl.queue { - switch { - case ent.priority < 0: - low++ - case ent.priority > 0: - high++ - default: - normal++ + for rq := range rl.rqs { + var low, normal, high int + for _, ent := range rq.queue { + switch { + case ent.priority < 0: + low++ + case ent.priority > 0: + high++ + default: + normal++ + } } + mCurrentReqs.WithLabelValues(rq.Label).Set(float64(rq.handling)) + mMaxReqs.WithLabelValues(rq.Label).Set(float64(rq.MaxConcurrent)) + mMaxQueue.WithLabelValues(rq.Label).Set(float64(rq.MaxQueue)) + rl.mQueueUsage.WithLabelValues(rq.Label, "low").Set(float64(low)) + rl.mQueueUsage.WithLabelValues(rq.Label, "normal").Set(float64(normal)) + rl.mQueueUsage.WithLabelValues(rq.Label, 
"high").Set(float64(high)) } rl.mtx.Unlock() - rl.mQueueUsage.WithLabelValues("low").Set(float64(low)) - rl.mQueueUsage.WithLabelValues("normal").Set(float64(normal)) - rl.mQueueUsage.WithLabelValues("high").Set(float64(high)) } }() } } // caller must have lock -func (rl *RequestLimiter) runqueue() { +func (rq *RequestQueue) runqueue() { // Handle entries from the queue as capacity permits - for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) { - rl.handling++ - ent := rl.queue.removeMax() + for len(rq.queue) > 0 && (rq.MaxConcurrent == 0 || rq.handling < rq.MaxConcurrent) { + rq.handling++ + ent := rq.queue.removeMax() ent.ready <- true } } // If the queue is too full, fail and remove the lowest-priority // entry. Caller must have lock. Queue must not be empty. -func (rl *RequestLimiter) trimqueue() { - if len(rl.queue) <= rl.MaxQueue { +func (rq *RequestQueue) trimqueue() { + if len(rq.queue) <= rq.MaxQueue { return } min := 0 - for i := range rl.queue { - if i == 0 || rl.queue.Less(min, i) { + for i := range rq.queue { + if i == 0 || rq.queue.Less(min, i) { min = i } } - rl.queue[min].ready <- false - rl.queue.remove(min) + rq.queue[min].ready <- false + rq.queue.remove(min) } func (rl *RequestLimiter) enqueue(req *http.Request) *qent { @@ -227,19 +233,24 @@ func (rl *RequestLimiter) enqueue(req *http.Request) *qent { priority = rl.Priority(req, qtime) } ent := &qent{ + rq: rl.Queue(req), queued: qtime, priority: priority, ready: make(chan bool, 1), heappos: -1, } - if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling { + if rl.rqs == nil { + rl.rqs = map[*RequestQueue]bool{} + } + rl.rqs[ent.rq] = true + if ent.rq.MaxConcurrent == 0 || ent.rq.MaxConcurrent > ent.rq.handling { // fast path, skip the queue - rl.handling++ + ent.rq.handling++ ent.ready <- true return ent } - rl.queue.add(ent) - rl.trimqueue() + ent.rq.queue.add(ent) + ent.rq.trimqueue() return ent } @@ -247,7 +258,7 @@ func (rl *RequestLimiter) remove(ent *qent) { rl.mtx.Lock() defer rl.mtx.Unlock() if ent.heappos >= 0 { - rl.queue.remove(ent.heappos) + ent.rq.queue.remove(ent.heappos) ent.ready <- false } } @@ -255,14 +266,14 @@ func (rl *RequestLimiter) remove(ent *qent) { func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) { rl.setupOnce.Do(rl.setup) ent := rl.enqueue(req) - SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority}) + SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority, "queue": ent.rq.Label}) if ent.priority == MinPriority { // Note that MaxQueueTime==0 does not cancel a req // that skips the queue, because in that case // rl.enqueue() has already fired ready<-true and // rl.remove() is a no-op. go func() { - time.Sleep(rl.MaxQueueTimeForMinPriority) + time.Sleep(ent.rq.MaxQueueTimeForMinPriority) rl.remove(ent) }() } @@ -273,7 +284,7 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) // we still need to wait for ent.ready, because // sometimes runqueue() will have already decided to // send true before our rl.remove() call, and in that - // case we'll need to decrement rl.handling below. + // case we'll need to decrement ent.rq.handling below. 
ok = <-ent.ready case ok = <-ent.ready: } @@ -298,7 +309,7 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) default: qlabel = "normal" } - series.WithLabelValues(qlabel).Observe(time.Now().Sub(ent.queued).Seconds()) + series.WithLabelValues(ent.rq.Label, qlabel).Observe(time.Now().Sub(ent.queued).Seconds()) } if !ok { @@ -308,9 +319,9 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) defer func() { rl.mtx.Lock() defer rl.mtx.Unlock() - rl.handling-- + ent.rq.handling-- // unblock the next waiting request - rl.runqueue() + ent.rq.runqueue() }() rl.Handler.ServeHTTP(resp, req) } diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go index 55f13b4625..7366e1426b 100644 --- a/sdk/go/httpserver/request_limiter_test.go +++ b/sdk/go/httpserver/request_limiter_test.go @@ -34,7 +34,11 @@ func newTestHandler() *testHandler { func (s *Suite) TestRequestLimiter1(c *check.C) { h := newTestHandler() - l := RequestLimiter{MaxConcurrent: 1, Handler: h} + rq := &RequestQueue{ + MaxConcurrent: 1} + l := RequestLimiter{ + Queue: func(*http.Request) *RequestQueue { return rq }, + Handler: h} var wg sync.WaitGroup resps := make([]*httptest.ResponseRecorder, 10) for i := 0; i < 10; i++ { @@ -94,7 +98,11 @@ func (s *Suite) TestRequestLimiter1(c *check.C) { func (*Suite) TestRequestLimiter10(c *check.C) { h := newTestHandler() - l := RequestLimiter{MaxConcurrent: 10, Handler: h} + rq := &RequestQueue{ + MaxConcurrent: 10} + l := RequestLimiter{ + Queue: func(*http.Request) *RequestQueue { return rq }, + Handler: h} var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) @@ -114,29 +122,32 @@ func (*Suite) TestRequestLimiter10(c *check.C) { func (*Suite) TestRequestLimiterQueuePriority(c *check.C) { h := newTestHandler() - rl := RequestLimiter{ + rq := &RequestQueue{ MaxConcurrent: 1000, MaxQueue: 200, - Handler: h, + } + rl := RequestLimiter{ + Handler: h, + Queue: func(*http.Request) *RequestQueue { return rq }, Priority: func(r *http.Request, _ time.Time) int64 { p, _ := strconv.ParseInt(r.Header.Get("Priority"), 10, 64) return p }} c.Logf("starting initial requests") - for i := 0; i < rl.MaxConcurrent; i++ { + for i := 0; i < rq.MaxConcurrent; i++ { go func() { rl.ServeHTTP(httptest.NewRecorder(), &http.Request{Header: http.Header{"No-Priority": {"x"}}}) }() } c.Logf("waiting for initial requests to consume all MaxConcurrent slots") - for i := 0; i < rl.MaxConcurrent; i++ { + for i := 0; i < rq.MaxConcurrent; i++ { <-h.inHandler } - c.Logf("starting %d priority=MinPriority requests (should respond 503 immediately)", rl.MaxQueue) + c.Logf("starting %d priority=MinPriority requests (should respond 503 immediately)", rq.MaxQueue) var wgX sync.WaitGroup - for i := 0; i < rl.MaxQueue; i++ { + for i := 0; i < rq.MaxQueue; i++ { wgX.Add(1) go func() { defer wgX.Done() @@ -147,13 +158,13 @@ func (*Suite) TestRequestLimiterQueuePriority(c *check.C) { } wgX.Wait() - c.Logf("starting %d priority=MinPriority requests (should respond 503 after 100 ms)", rl.MaxQueue) + c.Logf("starting %d priority=MinPriority requests (should respond 503 after 100 ms)", rq.MaxQueue) // Usage docs say the caller isn't allowed to change fields // after first use, but we secretly know it's OK to change // this field on the fly as long as no requests are arriving // concurrently. 
- rl.MaxQueueTimeForMinPriority = time.Millisecond * 100 - for i := 0; i < rl.MaxQueue; i++ { + rq.MaxQueueTimeForMinPriority = time.Millisecond * 100 + for i := 0; i < rq.MaxQueue; i++ { wgX.Add(1) go func() { defer wgX.Done() @@ -162,17 +173,17 @@ func (*Suite) TestRequestLimiterQueuePriority(c *check.C) { rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}}) c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable) elapsed := time.Since(t0) - c.Check(elapsed > rl.MaxQueueTimeForMinPriority, check.Equals, true) - c.Check(elapsed < rl.MaxQueueTimeForMinPriority*10, check.Equals, true) + c.Check(elapsed > rq.MaxQueueTimeForMinPriority, check.Equals, true) + c.Check(elapsed < rq.MaxQueueTimeForMinPriority*10, check.Equals, true) }() } wgX.Wait() - c.Logf("starting %d priority=1 and %d priority=1 requests", rl.MaxQueue, rl.MaxQueue) + c.Logf("starting %d priority=1 and %d priority=1 requests", rq.MaxQueue, rq.MaxQueue) var wg1, wg2 sync.WaitGroup - wg1.Add(rl.MaxQueue) - wg2.Add(rl.MaxQueue) - for i := 0; i < rl.MaxQueue*2; i++ { + wg1.Add(rq.MaxQueue) + wg2.Add(rq.MaxQueue) + for i := 0; i < rq.MaxQueue*2; i++ { i := i go func() { pri := (i & 1) + 1 @@ -192,12 +203,12 @@ func (*Suite) TestRequestLimiterQueuePriority(c *check.C) { wg1.Wait() c.Logf("allowing initial requests to proceed") - for i := 0; i < rl.MaxConcurrent; i++ { + for i := 0; i < rq.MaxConcurrent; i++ { h.okToProceed <- struct{}{} } c.Logf("allowing queued priority=2 requests to proceed") - for i := 0; i < rl.MaxQueue; i++ { + for i := 0; i < rq.MaxQueue; i++ { <-h.inHandler h.okToProceed <- struct{}{} } -- 2.30.2