sdk/ruby
sdk/go/arvadosclient
sdk/go/keepclient
+sdk/go/httpserver
sdk/go/manifest
sdk/go/blockdigest
sdk/go/streamer
gostuff=(
sdk/go/arvadosclient
sdk/go/blockdigest
+ sdk/go/httpserver
sdk/go/manifest
sdk/go/streamer
sdk/go/crunchrunner
-enforce-permissions=false: Enforce permission signatures on requests.
-listen=":25107": Listening address, in the form "host:port". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.
-max-buffers=128: Maximum RAM to use for data buffers, given in multiples of block size (64 MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.
+ -max-requests int
+ Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)
-never-delete=false: If set, nothing will be deleted. HTTP 405 will be returned for valid DELETE requests.
-permission-key-file="": Synonym for -blob-signing-key-file.
-permission-ttl=0: Synonym for -blob-signature-ttl.
--- /dev/null
+package httpserver
+
+import (
+ "net/http"
+)
+
+type limiterHandler struct {
+ requests chan struct{}
+ handler http.Handler
+}
+
+func NewRequestLimiter(maxRequests int, handler http.Handler) http.Handler {
+ return &limiterHandler{
+ requests: make(chan struct{}, maxRequests),
+ handler: handler,
+ }
+}
+
+func (h *limiterHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ select {
+ case h.requests <- struct{}{}:
+ default:
+ // reached max requests
+ resp.WriteHeader(http.StatusServiceUnavailable)
+ return
+ }
+ h.handler.ServeHTTP(resp, req)
+ <-h.requests
+}
--- /dev/null
+package httpserver
+
+import (
+ "net/http"
+ "net/http/httptest"
+ "sync"
+ "testing"
+ "time"
+)
+
+type testHandler struct {
+ inHandler chan struct{}
+ okToProceed chan struct{}
+}
+
+func (h *testHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ h.inHandler <- struct{}{}
+ <-h.okToProceed
+}
+
+func newTestHandler(maxReqs int) *testHandler {
+ return &testHandler{
+ inHandler: make(chan struct{}),
+ okToProceed: make(chan struct{}),
+ }
+}
+
+func TestRequestLimiter1(t *testing.T) {
+ h := newTestHandler(10)
+ l := NewRequestLimiter(1, h)
+ var wg sync.WaitGroup
+ resps := make([]*httptest.ResponseRecorder, 10)
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ resps[i] = httptest.NewRecorder()
+ go func(i int) {
+ l.ServeHTTP(resps[i], &http.Request{})
+ wg.Done()
+ }(i)
+ }
+ done := make(chan struct{})
+ go func() {
+ // Make sure one request has entered the handler
+ <-h.inHandler
+ // Make sure all unsuccessful requests finish (but don't wait
+ // for the one that's still waiting for okToProceed)
+ wg.Add(-1)
+ wg.Wait()
+ // Wait for the last goroutine
+ wg.Add(1)
+ h.okToProceed <- struct{}{}
+ wg.Wait()
+ done <- struct{}{}
+ }()
+ select {
+ case <-done:
+ case <-time.After(10 * time.Second):
+ t.Fatal("test timed out, probably deadlocked")
+ }
+ n200 := 0
+ n503 := 0
+ for i := 0; i < 10; i++ {
+ switch resps[i].Code {
+ case 200:
+ n200++
+ case 503:
+ n503++
+ default:
+ t.Fatalf("Unexpected response code %d", resps[i].Code)
+ }
+ }
+ if n200 != 1 || n503 != 9 {
+ t.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
+ }
+ // Now that all 10 are finished, an 11th request should
+ // succeed.
+ go func() {
+ <-h.inHandler
+ h.okToProceed <- struct{}{}
+ }()
+ resp := httptest.NewRecorder()
+ l.ServeHTTP(resp, &http.Request{})
+ if resp.Code != 200 {
+ t.Errorf("Got status %d on 11th request, want 200", resp.Code)
+ }
+}
+
+func TestRequestLimiter10(t *testing.T) {
+ h := newTestHandler(10)
+ l := NewRequestLimiter(10, h)
+ var wg sync.WaitGroup
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ go func() {
+ l.ServeHTTP(httptest.NewRecorder(), &http.Request{})
+ wg.Done()
+ }()
+ // Make sure the handler starts before we initiate the
+ // next request, but don't let it finish yet.
+ <-h.inHandler
+ }
+ for i := 0; i < 10; i++ {
+ h.okToProceed <- struct{}{}
+ }
+ wg.Wait()
+}
if rt.apiToken != "" {
req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
}
- loggingRouter := MakeLoggingRESTRouter()
+ loggingRouter := MakeRESTRouter()
loggingRouter.ServeHTTP(response, req)
return response
}
"bytes"
"flag"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"io/ioutil"
"log"
blobSigningKeyFile string
permissionTTLSec int
pidfile string
+ maxRequests int
)
flag.StringVar(
&dataManagerTokenFile,
"listen",
DefaultAddr,
"Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
+ flag.IntVar(
+ &maxRequests,
+ "max-requests",
+ 0,
+ "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)")
flag.BoolVar(
&neverDelete,
"never-delete",
}
}
+ if maxRequests <= 0 {
+ maxRequests = maxBuffers * 2
+ log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests)
+ }
+
// Start a round-robin VolumeManager with the volumes we have found.
KeepVM = MakeRRVolumeManager(volumes)
- // Tell the built-in HTTP server to direct all requests to the REST router.
- loggingRouter := MakeLoggingRESTRouter()
- http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
- loggingRouter.ServeHTTP(resp, req)
+ // Middleware stack: logger, maxRequests limiter, method handlers
+ http.Handle("/", &LoggingRESTRouter{
+ httpserver.NewRequestLimiter(maxRequests,
+ MakeRESTRouter()),
})
// Set up a TCP listener.
// LoggingResponseWriter
import (
- "github.com/gorilla/mux"
"log"
"net/http"
"strings"
// LoggingRESTRouter is used to add logging capabilities to mux.Router
type LoggingRESTRouter struct {
- router *mux.Router
-}
-
-// MakeLoggingRESTRouter initializes LoggingRESTRouter
-func MakeLoggingRESTRouter() *LoggingRESTRouter {
- router := MakeRESTRouter()
- return (&LoggingRESTRouter{router})
+ router http.Handler
}
func (loggingRouter *LoggingRESTRouter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {