Merge branch '12167-keep-request-id'
author Tom Clegg <tclegg@veritasgenetics.com>
Thu, 30 Nov 2017 14:38:29 +0000 (09:38 -0500)
committer Tom Clegg <tclegg@veritasgenetics.com>
Thu, 30 Nov 2017 14:38:29 +0000 (09:38 -0500)
refs #12167

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

sdk/go/httpserver/id_generator.go
sdk/go/httpserver/logger.go [new file with mode: 0644]
sdk/go/httpserver/logger_test.go [new file with mode: 0644]
services/keepproxy/keepproxy.go
services/keepproxy/keepproxy_test.go
services/keepproxy/proxy_client.go
services/keepstore/handler_test.go
services/keepstore/keepstore.go
services/keepstore/logging_router.go [deleted file]
services/keepstore/logging_router_test.go [deleted file]
services/keepstore/s3_volume.go

index 18fd91f560227ee1c40fec2bce64126bc0c3b827..d2c3a41f2108e2bc852f56119b747a7ec9423e7a 100644 (file)
@@ -5,6 +5,8 @@
 package httpserver
 
 import (
+       "math/rand"
+       "net/http"
        "strconv"
        "sync"
        "time"
@@ -17,19 +19,34 @@ type IDGenerator struct {
        // Prefix is prepended to each returned ID.
        Prefix string
 
-       lastID int64
-       mtx    sync.Mutex
+       mtx sync.Mutex
+       src rand.Source
 }
 
 // Next returns a new ID string. It is safe to call Next from multiple
 // goroutines.
 func (g *IDGenerator) Next() string {
-       id := time.Now().UnixNano()
        g.mtx.Lock()
-       if id <= g.lastID {
-               id = g.lastID + 1
+       defer g.mtx.Unlock()
+       if g.src == nil {
+               g.src = rand.NewSource(time.Now().UnixNano())
        }
-       g.lastID = id
-       g.mtx.Unlock()
-       return g.Prefix + strconv.FormatInt(id, 36)
+       a, b := g.src.Int63(), g.src.Int63()
+       id := strconv.FormatInt(a, 36) + strconv.FormatInt(b, 36)
+       for len(id) > 20 {
+               id = id[:20]
+       }
+       return g.Prefix + id
+}
+
+// AddRequestIDs wraps an http.Handler, adding an X-Request-Id header
+// to each request that doesn't already have one.
+func AddRequestIDs(h http.Handler) http.Handler {
+       gen := &IDGenerator{Prefix: "req-"}
+       return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+               if req.Header.Get("X-Request-Id") == "" {
+                       req.Header.Set("X-Request-Id", gen.Next())
+               }
+               h.ServeHTTP(w, req)
+       })
 }
diff --git a/sdk/go/httpserver/logger.go b/sdk/go/httpserver/logger.go
new file mode 100644 (file)
index 0000000..decb2ff
--- /dev/null
@@ -0,0 +1,82 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package httpserver
+
+import (
+       "context"
+       "net/http"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/stats"
+       log "github.com/Sirupsen/logrus"
+)
+
+type contextKey struct {
+       name string
+}
+
+var requestTimeContextKey = contextKey{"requestTime"}
+
+// LogRequests wraps an http.Handler, logging each request and
+// response via logrus.
+func LogRequests(h http.Handler) http.Handler {
+       return http.HandlerFunc(func(wrapped http.ResponseWriter, req *http.Request) {
+               w := &responseTimer{ResponseWriter: WrapResponseWriter(wrapped)}
+               req = req.WithContext(context.WithValue(req.Context(), &requestTimeContextKey, time.Now()))
+               lgr := log.WithFields(log.Fields{
+                       "RequestID":       req.Header.Get("X-Request-Id"),
+                       "remoteAddr":      req.RemoteAddr,
+                       "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
+                       "reqMethod":       req.Method,
+                       "reqPath":         req.URL.Path[1:],
+                       "reqBytes":        req.ContentLength,
+               })
+               logRequest(w, req, lgr)
+               defer logResponse(w, req, lgr)
+               h.ServeHTTP(w, req)
+       })
+}
+
+func logRequest(w *responseTimer, req *http.Request, lgr *log.Entry) {
+       lgr.Info("request")
+}
+
+func logResponse(w *responseTimer, req *http.Request, lgr *log.Entry) {
+       if tStart, ok := req.Context().Value(&requestTimeContextKey).(time.Time); ok {
+               tDone := time.Now()
+               lgr = lgr.WithFields(log.Fields{
+                       "timeTotal":     stats.Duration(tDone.Sub(tStart)),
+                       "timeToStatus":  stats.Duration(w.writeTime.Sub(tStart)),
+                       "timeWriteBody": stats.Duration(tDone.Sub(w.writeTime)),
+               })
+       }
+       lgr.WithFields(log.Fields{
+               "respStatusCode": w.WroteStatus(),
+               "respStatus":     http.StatusText(w.WroteStatus()),
+               "respBytes":      w.WroteBodyBytes(),
+       }).Info("response")
+}
+
+type responseTimer struct {
+       ResponseWriter
+       wrote     bool
+       writeTime time.Time
+}
+
+func (rt *responseTimer) WriteHeader(code int) {
+       if !rt.wrote {
+               rt.wrote = true
+               rt.writeTime = time.Now()
+       }
+       rt.ResponseWriter.WriteHeader(code)
+}
+
+func (rt *responseTimer) Write(p []byte) (int, error) {
+       if !rt.wrote {
+               rt.wrote = true
+               rt.writeTime = time.Now()
+       }
+       return rt.ResponseWriter.Write(p)
+}
diff --git a/sdk/go/httpserver/logger_test.go b/sdk/go/httpserver/logger_test.go
new file mode 100644 (file)
index 0000000..bbcafa1
--- /dev/null
@@ -0,0 +1,68 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package httpserver
+
+import (
+       "bytes"
+       "encoding/json"
+       "net/http"
+       "net/http/httptest"
+       "os"
+       "testing"
+       "time"
+
+       log "github.com/Sirupsen/logrus"
+       check "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
+
+var _ = check.Suite(&Suite{})
+
+type Suite struct{}
+
+func (s *Suite) TestLogRequests(c *check.C) {
+       defer log.SetOutput(os.Stdout)
+       captured := &bytes.Buffer{}
+       log.SetOutput(captured)
+       log.SetFormatter(&log.JSONFormatter{
+               TimestampFormat: time.RFC3339Nano,
+       })
+       h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+               w.Write([]byte("hello world"))
+       })
+       req, err := http.NewRequest("GET", "https://foo.example/bar", nil)
+       req.Header.Set("X-Forwarded-For", "1.2.3.4:12345")
+       c.Assert(err, check.IsNil)
+       resp := httptest.NewRecorder()
+       AddRequestIDs(LogRequests(h)).ServeHTTP(resp, req)
+
+       dec := json.NewDecoder(captured)
+
+       gotReq := make(map[string]interface{})
+       err = dec.Decode(&gotReq)
+       c.Logf("%#v", gotReq)
+       c.Check(gotReq["RequestID"], check.Matches, "req-[a-z0-9]{20}")
+       c.Check(gotReq["reqForwardedFor"], check.Equals, "1.2.3.4:12345")
+       c.Check(gotReq["msg"], check.Equals, "request")
+
+       gotResp := make(map[string]interface{})
+       err = dec.Decode(&gotResp)
+       c.Logf("%#v", gotResp)
+       c.Check(gotResp["RequestID"], check.Equals, gotReq["RequestID"])
+       c.Check(gotResp["reqForwardedFor"], check.Equals, "1.2.3.4:12345")
+       c.Check(gotResp["msg"], check.Equals, "response")
+
+       c.Assert(gotResp["time"], check.FitsTypeOf, "")
+       _, err = time.Parse(time.RFC3339Nano, gotResp["time"].(string))
+       c.Check(err, check.IsNil)
+
+       for _, key := range []string{"timeToStatus", "timeWriteBody", "timeTotal"} {
+               c.Assert(gotResp[key], check.FitsTypeOf, float64(0))
+               c.Check(gotResp[key].(float64), check.Not(check.Equals), float64(0))
+       }
+}
index 3d1b4476255d0cadf2274ad12b9aae78f164f72f..e2a6221f10e28981ea0c3f64fa5ce6d52ac718ea 100644 (file)
@@ -10,7 +10,6 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "net"
        "net/http"
        "os"
@@ -25,7 +24,9 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/config"
        "git.curoverse.com/arvados.git/sdk/go/health"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       log "github.com/Sirupsen/logrus"
        "github.com/coreos/go-systemd/daemon"
        "github.com/ghodss/yaml"
        "github.com/gorilla/mux"
@@ -55,7 +56,13 @@ var (
        router   http.Handler
 )
 
+const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
+
 func main() {
+       log.SetFormatter(&log.JSONFormatter{
+               TimestampFormat: rfc3339NanoFixed,
+       })
+
        cfg := DefaultConfig()
 
        flagset := flag.NewFlagSet("keepproxy", flag.ExitOnError)
@@ -164,7 +171,7 @@ func main() {
 
        // Start serving requests.
        router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout), cfg.ManagementToken)
-       http.Serve(listener, router)
+       http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(router)))
 
        log.Println("shutting down")
 }
@@ -596,7 +603,8 @@ func (h *proxyHandler) makeKeepClient(req *http.Request) *keepclient.KeepClient
                        Timeout:   h.timeout,
                        Transport: h.transport,
                },
-               proto: req.Proto,
+               proto:     req.Proto,
+               requestID: req.Header.Get("X-Request-Id"),
        }
        return &kc
 }
index 23bb2bd216c1f1080d246e63a47252564e4d9bf4..a7b608b69c462fd4149f932c16dbabcaddbca6c6 100644 (file)
@@ -10,7 +10,6 @@ import (
        "errors"
        "fmt"
        "io/ioutil"
-       "log"
        "math/rand"
        "net/http"
        "net/http/httptest"
@@ -57,7 +56,7 @@ func waitForListener() {
                time.Sleep(ms * time.Millisecond)
        }
        if listener == nil {
-               log.Fatalf("Timed out waiting for listener to start")
+               panic("Timed out waiting for listener to start")
        }
 }
 
@@ -255,14 +254,14 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
        {
                _, _, err := kc.Ask(hash)
                c.Check(err, Equals, keepclient.BlockNotFound)
-               log.Print("Finished Ask (expected BlockNotFound)")
+               c.Log("Finished Ask (expected BlockNotFound)")
        }
 
        {
                reader, _, _, err := kc.Get(hash)
                c.Check(reader, Equals, nil)
                c.Check(err, Equals, keepclient.BlockNotFound)
-               log.Print("Finished Get (expected BlockNotFound)")
+               c.Log("Finished Get (expected BlockNotFound)")
        }
 
        // Note in bug #5309 among other errors keepproxy would set
@@ -281,14 +280,14 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
                c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
                c.Check(rep, Equals, 2)
                c.Check(err, Equals, nil)
-               log.Print("Finished PutB (expected success)")
+               c.Log("Finished PutB (expected success)")
        }
 
        {
                blocklen, _, err := kc.Ask(hash2)
                c.Assert(err, Equals, nil)
                c.Check(blocklen, Equals, int64(3))
-               log.Print("Finished Ask (expected success)")
+               c.Log("Finished Ask (expected success)")
        }
 
        {
@@ -297,7 +296,7 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
                all, err := ioutil.ReadAll(reader)
                c.Check(all, DeepEquals, []byte("foo"))
                c.Check(blocklen, Equals, int64(3))
-               log.Print("Finished Get (expected success)")
+               c.Log("Finished Get (expected success)")
        }
 
        {
@@ -307,7 +306,7 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
                c.Check(hash2, Matches, `^d41d8cd98f00b204e9800998ecf8427e\+0(\+.+)?$`)
                c.Check(rep, Equals, 2)
                c.Check(err, Equals, nil)
-               log.Print("Finished PutB zero block")
+               c.Log("Finished PutB zero block")
        }
 
        {
@@ -316,7 +315,7 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
                all, err := ioutil.ReadAll(reader)
                c.Check(all, DeepEquals, []byte(""))
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Finished Get zero block")
+               c.Log("Finished Get zero block")
        }
 }
 
@@ -331,7 +330,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
                errNotFound, _ := err.(keepclient.ErrNotFound)
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
-               log.Print("Ask 1")
+               c.Log("Ask 1")
        }
 
        {
@@ -339,7 +338,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
                c.Check(hash2, Equals, "")
                c.Check(rep, Equals, 0)
                c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New("")))
-               log.Print("PutB")
+               c.Log("PutB")
        }
 
        {
@@ -348,7 +347,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Ask 2")
+               c.Log("Ask 2")
        }
 
        {
@@ -357,7 +356,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Get")
+               c.Log("Get")
        }
 }
 
@@ -372,7 +371,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
                errNotFound, _ := err.(keepclient.ErrNotFound)
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
-               log.Print("Ask 1")
+               c.Log("Ask 1")
        }
 
        {
@@ -380,7 +379,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
                c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
                c.Check(rep, Equals, 2)
                c.Check(err, Equals, nil)
-               log.Print("PutB")
+               c.Log("PutB")
        }
 
        {
@@ -389,7 +388,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Ask 2")
+               c.Log("Ask 2")
        }
 
        {
@@ -398,7 +397,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Get")
+               c.Log("Get")
        }
 }
 
index 0faf4aea0e3c35354e30dc33f1e7005d491ab4d5..3fa2671df58331bb52c16651ded3924e9390ee90 100644 (file)
@@ -13,11 +13,13 @@ import (
 var viaAlias = "keepproxy"
 
 type proxyClient struct {
-       client keepclient.HTTPClient
-       proto  string
+       client    keepclient.HTTPClient
+       proto     string
+       requestID string
 }
 
 func (pc *proxyClient) Do(req *http.Request) (*http.Response, error) {
        req.Header.Add("Via", pc.proto+" "+viaAlias)
+       req.Header.Add("X-Request-Id", pc.requestID)
        return pc.client.Do(req)
 }
index 424910dfa8b2af4a0d22ad9e91bd4eab20076af7..4d042a70dd376ea1e04bdff16283ab80669dfd0f 100644 (file)
@@ -975,7 +975,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
        ok := make(chan struct{})
        go func() {
                req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
-               (&LoggingRESTRouter{router: MakeRESTRouter()}).ServeHTTP(resp, req)
+               MakeRESTRouter().ServeHTTP(resp, req)
                ok <- struct{}{}
        }()
 
index 921176dbbe93f481f497af476f176c2116ef3bce..e422179f643e9cad438742cd2aa450d52ae84fa4 100644 (file)
@@ -147,11 +147,11 @@ func main() {
        // Start a round-robin VolumeManager with the volumes we have found.
        KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
-       // Middleware stack: logger, MaxRequests limiter, method handlers
+       // Middleware/handler stack
        router := MakeRESTRouter()
        limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
        router.limiter = limiter
-       http.Handle("/", &LoggingRESTRouter{router: limiter})
+       http.Handle("/", httpserver.AddRequestIDs(httpserver.LogRequests(limiter)))
 
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", theConfig.Listen)
diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go
deleted file mode 100644 (file)
index 63c28a2..0000000
+++ /dev/null
@@ -1,111 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-// LoggingRESTRouter
-// LoggingResponseWriter
-
-import (
-       "context"
-       "net/http"
-       "strings"
-       "time"
-
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
-       "git.curoverse.com/arvados.git/sdk/go/stats"
-       log "github.com/Sirupsen/logrus"
-)
-
-// LoggingResponseWriter has anonymous fields ResponseWriter and ResponseBody
-type LoggingResponseWriter struct {
-       Status int
-       Length int
-       http.ResponseWriter
-       ResponseBody string
-       sentHdr      time.Time
-}
-
-// CloseNotify implements http.CloseNotifier.
-func (resp *LoggingResponseWriter) CloseNotify() <-chan bool {
-       wrapped, ok := resp.ResponseWriter.(http.CloseNotifier)
-       if !ok {
-               // If upstream doesn't implement CloseNotifier, we can
-               // satisfy the interface by returning a channel that
-               // never sends anything (the interface doesn't
-               // guarantee that anything will ever be sent on the
-               // channel even if the client disconnects).
-               return nil
-       }
-       return wrapped.CloseNotify()
-}
-
-// WriteHeader writes header to ResponseWriter
-func (resp *LoggingResponseWriter) WriteHeader(code int) {
-       if resp.sentHdr == zeroTime {
-               resp.sentHdr = time.Now()
-       }
-       resp.Status = code
-       resp.ResponseWriter.WriteHeader(code)
-}
-
-var zeroTime time.Time
-
-func (resp *LoggingResponseWriter) Write(data []byte) (int, error) {
-       if resp.Length == 0 && len(data) > 0 && resp.sentHdr == zeroTime {
-               resp.sentHdr = time.Now()
-       }
-       resp.Length += len(data)
-       if resp.Status >= 400 {
-               resp.ResponseBody += string(data)
-       }
-       return resp.ResponseWriter.Write(data)
-}
-
-// LoggingRESTRouter is used to add logging capabilities to mux.Router
-type LoggingRESTRouter struct {
-       router      http.Handler
-       idGenerator httpserver.IDGenerator
-}
-
-func (loggingRouter *LoggingRESTRouter) ServeHTTP(wrappedResp http.ResponseWriter, req *http.Request) {
-       tStart := time.Now()
-
-       // Attach a requestID-aware logger to the request context.
-       lgr := log.WithField("RequestID", loggingRouter.idGenerator.Next())
-       ctx := context.WithValue(req.Context(), "logger", lgr)
-       req = req.WithContext(ctx)
-
-       lgr = lgr.WithFields(log.Fields{
-               "remoteAddr":      req.RemoteAddr,
-               "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
-               "reqMethod":       req.Method,
-               "reqPath":         req.URL.Path[1:],
-               "reqBytes":        req.ContentLength,
-       })
-       lgr.Debug("request")
-
-       resp := LoggingResponseWriter{http.StatusOK, 0, wrappedResp, "", zeroTime}
-       loggingRouter.router.ServeHTTP(&resp, req)
-       tDone := time.Now()
-
-       statusText := http.StatusText(resp.Status)
-       if resp.Status >= 400 {
-               statusText = strings.Replace(resp.ResponseBody, "\n", "", -1)
-       }
-       if resp.sentHdr == zeroTime {
-               // Nobody changed status or wrote any data, i.e., we
-               // returned a 200 response with no body.
-               resp.sentHdr = tDone
-       }
-
-       lgr.WithFields(log.Fields{
-               "timeTotal":      stats.Duration(tDone.Sub(tStart)),
-               "timeToStatus":   stats.Duration(resp.sentHdr.Sub(tStart)),
-               "timeWriteBody":  stats.Duration(tDone.Sub(resp.sentHdr)),
-               "respStatusCode": resp.Status,
-               "respStatus":     statusText,
-               "respBytes":      resp.Length,
-       }).Info("response")
-}
diff --git a/services/keepstore/logging_router_test.go b/services/keepstore/logging_router_test.go
deleted file mode 100644 (file)
index 6ca48dc..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
-       "net/http"
-       "testing"
-)
-
-func TestLoggingResponseWriterImplementsCloseNotifier(t *testing.T) {
-       http.ResponseWriter(&LoggingResponseWriter{}).(http.CloseNotifier).CloseNotify()
-}
index e6a53d06c6c297ab343f697dd7c5f68a40e8355e..61e69f9096503eb88cf9328af5e85f129dba4226 100644 (file)
@@ -45,6 +45,8 @@ var (
        s3RaceWindow    time.Duration
 
        s3ACL = s3.Private
+
+       zeroTime time.Time
 )
 
 const (