10484: Report current client count.
authorTom Clegg <tom@curoverse.com>
Mon, 21 Nov 2016 19:22:59 +0000 (14:22 -0500)
committerTom Clegg <tom@curoverse.com>
Mon, 21 Nov 2016 19:22:59 +0000 (14:22 -0500)
sdk/go/httpserver/request_limiter.go
services/keepstore/handlers.go
services/keepstore/keepstore.go

index 178ffb90f4facbebdfd6809bb1448e84904bc82f..ee35f4748b78ecfabac5c431ac5ad73340e4f300 100644 (file)
@@ -4,18 +4,42 @@ import (
        "net/http"
 )
 
+// RequestCounter is an http.Handler that tracks the number of
+// requests in progress.
+type RequestCounter interface {
+       http.Handler
+
+       // Current() returns the number of requests in progress.
+       Current() int
+
+       // Max() returns the maximum number of concurrent requests
+       // that will be accepted.
+       Max() int
+}
+
 type limiterHandler struct {
        requests chan struct{}
        handler  http.Handler
 }
 
-func NewRequestLimiter(maxRequests int, handler http.Handler) http.Handler {
+// NewRequestLimiter returns a RequestCounter that delegates up to
+// maxRequests at a time to the given handler, and responds 503 to all
+// incoming requests beyond that limit.
+func NewRequestLimiter(maxRequests int, handler http.Handler) RequestCounter {
        return &limiterHandler{
                requests: make(chan struct{}, maxRequests),
                handler:  handler,
        }
 }
 
+func (h *limiterHandler) Current() int {
+       return len(h.requests)
+}
+
+func (h *limiterHandler) Max() int {
+       return cap(h.requests)
+}
+
 func (h *limiterHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        select {
        case h.requests <- struct{}{}:
index ed0d8d7a5bff1721a144db27bcd29273f70c77d3..adaaa361e96177080a9df4e2b2f1d77aac98424d 100644 (file)
@@ -24,14 +24,20 @@ import (
        "sync"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
        log "github.com/Sirupsen/logrus"
 )
 
-// MakeRESTRouter returns a new mux.Router that forwards all Keep
-// requests to the appropriate handlers.
-//
-func MakeRESTRouter() *mux.Router {
+type router struct {
+       *mux.Router
+       limiter httpserver.RequestCounter
+}
+
+// MakeRESTRouter returns a new router that forwards all Keep requests
+// to the appropriate handlers.
+func MakeRESTRouter() *router {
        rest := mux.NewRouter()
+       rtr := &router{Router: rest}
 
        rest.HandleFunc(
                `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
@@ -48,10 +54,10 @@ func MakeRESTRouter() *mux.Router {
        rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
 
        // Internals/debugging info (runtime.MemStats)
-       rest.HandleFunc(`/debug.json`, DebugHandler).Methods("GET", "HEAD")
+       rest.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
 
        // List volumes: path, device number, bytes used/avail.
-       rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
+       rest.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
 
        // Replace the current pull queue.
        rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
@@ -66,7 +72,7 @@ func MakeRESTRouter() *mux.Router {
        // 400 Bad Request.
        rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
 
-       return rest
+       return rtr
 }
 
 // BadRequestHandler is a HandleFunc to address bad requests.
@@ -259,17 +265,19 @@ type volumeStatusEnt struct {
 
 // NodeStatus struct
 type NodeStatus struct {
-       Volumes    []*volumeStatusEnt
-       BufferPool PoolStatus
-       PullQueue  WorkQueueStatus
-       TrashQueue WorkQueueStatus
+       Volumes         []*volumeStatusEnt
+       BufferPool      PoolStatus
+       PullQueue       WorkQueueStatus
+       TrashQueue      WorkQueueStatus
+       RequestsCurrent int
+       RequestsMax     int
 }
 
 var st NodeStatus
 var stLock sync.Mutex
 
 // DebugHandler addresses /debug.json requests.
-func DebugHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
        type debugStats struct {
                MemStats runtime.MemStats
        }
@@ -282,9 +290,9 @@ func DebugHandler(resp http.ResponseWriter, req *http.Request) {
 }
 
 // StatusHandler addresses /status.json requests.
-func StatusHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
        stLock.Lock()
-       readNodeStatus(&st)
+       rtr.readNodeStatus(&st)
        jstat, err := json.Marshal(&st)
        stLock.Unlock()
        if err == nil {
@@ -297,7 +305,7 @@ func StatusHandler(resp http.ResponseWriter, req *http.Request) {
 }
 
 // populate the given NodeStatus struct with current values.
-func readNodeStatus(st *NodeStatus) {
+func (rtr *router) readNodeStatus(st *NodeStatus) {
        vols := KeepVM.AllReadable()
        if cap(st.Volumes) < len(vols) {
                st.Volumes = make([]*volumeStatusEnt, len(vols))
@@ -320,6 +328,10 @@ func readNodeStatus(st *NodeStatus) {
        st.BufferPool.Len = bufs.Len()
        st.PullQueue = getWorkQueueStatus(pullq)
        st.TrashQueue = getWorkQueueStatus(trashq)
+       if rtr.limiter != nil {
+               st.RequestsCurrent = rtr.limiter.Current()
+               st.RequestsMax = rtr.limiter.Max()
+       }
 }
 
 // return a WorkQueueStatus for the given queue. If q is nil (which
index d82c7d0f7a78792b4ccd46558e5ccb159ae463e4..54147959719183141a8e3137d5d1363ec9667e6b 100644 (file)
@@ -150,9 +150,10 @@ func main() {
        KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
        // Middleware stack: logger, MaxRequests limiter, method handlers
-       http.Handle("/", &LoggingRESTRouter{
-               router: httpserver.NewRequestLimiter(theConfig.MaxRequests, MakeRESTRouter()),
-       })
+       router := MakeRESTRouter()
+       limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
+       router.limiter = limiter
+       http.Handle("/", &LoggingRESTRouter{router: limiter})
 
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", theConfig.Listen)