9066: Add keepstore -max-requests argument.
authorTom Clegg <tom@curoverse.com>
Thu, 28 Apr 2016 13:11:33 +0000 (09:11 -0400)
committerTom Clegg <tom@curoverse.com>
Fri, 29 Apr 2016 16:59:54 +0000 (12:59 -0400)
build/run-tests.sh
doc/install/install-keepstore.html.textile.liquid
sdk/go/httpserver/request_limiter.go [new file with mode: 0644]
sdk/go/httpserver/request_limiter_test.go [new file with mode: 0644]
services/keepstore/handler_test.go
services/keepstore/keepstore.go
services/keepstore/logging_router.go

index 53df93c55810c4a9dbfbb2c87352ffbd5921e2a6..c94f831a36a5e41d69d524c6eb723ade34ca6549 100755 (executable)
@@ -81,6 +81,7 @@ sdk/python
 sdk/ruby
 sdk/go/arvadosclient
 sdk/go/keepclient
+sdk/go/httpserver
 sdk/go/manifest
 sdk/go/blockdigest
 sdk/go/streamer
@@ -703,6 +704,7 @@ declare -a gostuff
 gostuff=(
     sdk/go/arvadosclient
     sdk/go/blockdigest
+    sdk/go/httpserver
     sdk/go/manifest
     sdk/go/streamer
     sdk/go/crunchrunner
index b211ce60bf49cbd88221a60c0dd7eb4e3164f5ad..6548422f4f8d0492cfac61a25257c365f238bcde 100644 (file)
@@ -47,6 +47,8 @@ Usage of ./keepstore:
   -enforce-permissions=false: Enforce permission signatures on requests.
   -listen=":25107": Listening address, in the form "host:port". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.
   -max-buffers=128: Maximum RAM to use for data buffers, given in multiples of block size (64 MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.
+  -max-requests int
+   Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)
   -never-delete=false: If set, nothing will be deleted. HTTP 405 will be returned for valid DELETE requests.
   -permission-key-file="": Synonym for -blob-signing-key-file.
   -permission-ttl=0: Synonym for -blob-signature-ttl.
diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
new file mode 100644 (file)
index 0000000..178ffb9
--- /dev/null
@@ -0,0 +1,29 @@
+package httpserver
+
+import (
+       "net/http"
+)
+
+type limiterHandler struct {
+       requests chan struct{}
+       handler  http.Handler
+}
+
+func NewRequestLimiter(maxRequests int, handler http.Handler) http.Handler {
+       return &limiterHandler{
+               requests: make(chan struct{}, maxRequests),
+               handler:  handler,
+       }
+}
+
+func (h *limiterHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       select {
+       case h.requests <- struct{}{}:
+       default:
+               // reached max requests
+               resp.WriteHeader(http.StatusServiceUnavailable)
+               return
+       }
+       h.handler.ServeHTTP(resp, req)
+       <-h.requests
+}
diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go
new file mode 100644 (file)
index 0000000..a8cc806
--- /dev/null
@@ -0,0 +1,106 @@
+package httpserver
+
+import (
+       "net/http"
+       "net/http/httptest"
+       "sync"
+       "testing"
+       "time"
+)
+
+type testHandler struct {
+       inHandler   chan struct{}
+       okToProceed chan struct{}
+}
+
+func (h *testHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       h.inHandler <- struct{}{}
+       <-h.okToProceed
+}
+
+func newTestHandler(maxReqs int) *testHandler {
+       return &testHandler{
+               inHandler:   make(chan struct{}),
+               okToProceed: make(chan struct{}),
+       }
+}
+
+func TestRequestLimiter1(t *testing.T) {
+       h := newTestHandler(10)
+       l := NewRequestLimiter(1, h)
+       var wg sync.WaitGroup
+       resps := make([]*httptest.ResponseRecorder, 10)
+       for i := 0; i < 10; i++ {
+               wg.Add(1)
+               resps[i] = httptest.NewRecorder()
+               go func(i int) {
+                       l.ServeHTTP(resps[i], &http.Request{})
+                       wg.Done()
+               }(i)
+       }
+       done := make(chan struct{})
+       go func() {
+               // Make sure one request has entered the handler
+               <-h.inHandler
+               // Make sure all unsuccessful requests finish (but don't wait
+               // for the one that's still waiting for okToProceed)
+               wg.Add(-1)
+               wg.Wait()
+               // Wait for the last goroutine
+               wg.Add(1)
+               h.okToProceed <- struct{}{}
+               wg.Wait()
+               done <- struct{}{}
+       }()
+       select {
+       case <-done:
+       case <-time.After(10 * time.Second):
+               t.Fatal("test timed out, probably deadlocked")
+       }
+       n200 := 0
+       n503 := 0
+       for i := 0; i < 10; i++ {
+               switch resps[i].Code {
+               case 200:
+                       n200++
+               case 503:
+                       n503++
+               default:
+                       t.Fatalf("Unexpected response code %d", resps[i].Code)
+               }
+       }
+       if n200 != 1 || n503 != 9 {
+               t.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
+       }
+       // Now that all 10 are finished, an 11th request should
+       // succeed.
+       go func() {
+               <-h.inHandler
+               h.okToProceed <- struct{}{}
+       }()
+       resp := httptest.NewRecorder()
+       l.ServeHTTP(resp, &http.Request{})
+       if resp.Code != 200 {
+               t.Errorf("Got status %d on 11th request, want 200", resp.Code)
+       }
+}
+
+func TestRequestLimiter10(t *testing.T) {
+       h := newTestHandler(10)
+       l := NewRequestLimiter(10, h)
+       var wg sync.WaitGroup
+       for i := 0; i < 10; i++ {
+               wg.Add(1)
+               go func() {
+                       l.ServeHTTP(httptest.NewRecorder(), &http.Request{})
+                       wg.Done()
+               }()
+               // Make sure the handler starts before we initiate the
+               // next request, but don't let it finish yet.
+               <-h.inHandler
+       }
+       for i := 0; i < 10; i++ {
+               h.okToProceed <- struct{}{}
+       }
+       wg.Wait()
+}
index a7675fb1dcfbea5782a40ef6fc5b0d0c8bd93a8f..33d585ae1e69f18868f7eb57278637c7f855761b 100644 (file)
@@ -814,7 +814,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
        if rt.apiToken != "" {
                req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
        }
-       loggingRouter := MakeLoggingRESTRouter()
+       loggingRouter := MakeRESTRouter()
        loggingRouter.ServeHTTP(response, req)
        return response
 }
index b17cc79e17d5bf540894801b648f5d60d4d5bdc9..93ee43c446cf96624a09a0ff7660d198cacdd3cd 100644 (file)
@@ -4,6 +4,7 @@ import (
        "bytes"
        "flag"
        "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "io/ioutil"
        "log"
@@ -145,6 +146,7 @@ func main() {
                blobSigningKeyFile   string
                permissionTTLSec     int
                pidfile              string
+               maxRequests          int
        )
        flag.StringVar(
                &dataManagerTokenFile,
@@ -162,6 +164,11 @@ func main() {
                "listen",
                DefaultAddr,
                "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
+       flag.IntVar(
+               &maxRequests,
+               "max-requests",
+               0,
+               "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)")
        flag.BoolVar(
                &neverDelete,
                "never-delete",
@@ -302,13 +309,18 @@ func main() {
                }
        }
 
+       if maxRequests <= 0 {
+               maxRequests = maxBuffers * 2
+               log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests)
+       }
+
        // Start a round-robin VolumeManager with the volumes we have found.
        KeepVM = MakeRRVolumeManager(volumes)
 
-       // Tell the built-in HTTP server to direct all requests to the REST router.
-       loggingRouter := MakeLoggingRESTRouter()
-       http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
-               loggingRouter.ServeHTTP(resp, req)
+       // Middleware stack: logger, maxRequests limiter, method handlers
+       http.Handle("/", &LoggingRESTRouter{
+               httpserver.NewRequestLimiter(maxRequests,
+                       MakeRESTRouter()),
        })
 
        // Set up a TCP listener.
index 9edfb6e69d15356bd7bee516e8993e0d98953bc5..a93b72cf611cfc4dfa6283b8bd249c02db52ca19 100644 (file)
@@ -4,7 +4,6 @@ package main
 // LoggingResponseWriter
 
 import (
-       "github.com/gorilla/mux"
        "log"
        "net/http"
        "strings"
@@ -44,13 +43,7 @@ func (loggingWriter *LoggingResponseWriter) Write(data []byte) (int, error) {
 
 // LoggingRESTRouter is used to add logging capabilities to mux.Router
 type LoggingRESTRouter struct {
-       router *mux.Router
-}
-
-// MakeLoggingRESTRouter initializes LoggingRESTRouter
-func MakeLoggingRESTRouter() *LoggingRESTRouter {
-       router := MakeRESTRouter()
-       return (&LoggingRESTRouter{router})
+       router http.Handler
 }
 
 func (loggingRouter *LoggingRESTRouter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {