X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/861cff5ffc2eb2739573b8991fca1cfdc388377f..8a56164a8a9d3c7b3b58df77d1e5aa3f1d6cad12:/services/keepstore/keepstore.go diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index 40e62c5c50..d7da67c348 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -4,6 +4,8 @@ import ( "bytes" "flag" "fmt" + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/httpserver" "git.curoverse.com/arvados.git/sdk/go/keepclient" "io/ioutil" "log" @@ -90,6 +92,7 @@ var ( TooLongError = &KeepError{413, "Block is too large"} MethodDisabledError = &KeepError{405, "Method disabled"} ErrNotImplemented = &KeepError{500, "Unsupported configuration"} + ErrClientDisconnect = &KeepError{503, "Client disconnected"} ) func (e *KeepError) Error() string { @@ -145,6 +148,7 @@ func main() { blobSigningKeyFile string permissionTTLSec int pidfile string + maxRequests int ) flag.StringVar( &dataManagerTokenFile, @@ -162,6 +166,11 @@ func main() { "listen", DefaultAddr, "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.") + flag.IntVar( + &maxRequests, + "max-requests", + 0, + "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)") flag.BoolVar( &neverDelete, "never-delete", @@ -189,7 +198,7 @@ func main() { &permissionTTLSec, "blob-signature-ttl", int(time.Duration(2*7*24*time.Hour).Seconds()), - "Lifetime of blob permission signatures. "+ + "Lifetime of blob permission signatures. Modifying the ttl will invalidate all existing signatures. "+ "See services/api/config/application.default.yml.") flag.BoolVar( &flagSerializeIO, @@ -302,13 +311,18 @@ func main() { } } + if maxRequests <= 0 { + maxRequests = maxBuffers * 2 + log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests) + } + // Start a round-robin VolumeManager with the volumes we have found. KeepVM = MakeRRVolumeManager(volumes) - // Tell the built-in HTTP server to direct all requests to the REST router. - loggingRouter := MakeLoggingRESTRouter() - http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) { - loggingRouter.ServeHTTP(resp, req) + // Middleware stack: logger, maxRequests limiter, method handlers + http.Handle("/", &LoggingRESTRouter{ + httpserver.NewRequestLimiter(maxRequests, + MakeRESTRouter()), }) // Set up a TCP listener. @@ -318,8 +332,12 @@ func main() { } // Initialize Pull queue and worker + arv, err := arvadosclient.MakeArvadosClient() + if err != nil { + log.Fatalf("MakeArvadosClient: %s", err) + } keepClient := &keepclient.KeepClient{ - Arvados: nil, + Arvados: &arv, Want_replicas: 1, Client: &http.Client{}, }