package httpserver
import (
+ "math/rand"
+ "net/http"
"strconv"
"sync"
"time"
// Prefix is prepended to each returned ID.
Prefix string
- lastID int64
- mtx sync.Mutex
+ mtx sync.Mutex
+ src rand.Source
}
// Next returns a new ID string. It is safe to call Next from multiple
// goroutines.
func (g *IDGenerator) Next() string {
- id := time.Now().UnixNano()
g.mtx.Lock()
- if id <= g.lastID {
- id = g.lastID + 1
+ defer g.mtx.Unlock()
+ if g.src == nil {
+ g.src = rand.NewSource(time.Now().UnixNano())
}
- g.lastID = id
- g.mtx.Unlock()
- return g.Prefix + strconv.FormatInt(id, 36)
+ a, b := g.src.Int63(), g.src.Int63()
+ id := strconv.FormatInt(a, 36) + strconv.FormatInt(b, 36)
+ for len(id) > 20 {
+ id = id[:20]
+ }
+ return g.Prefix + id
+}
+
+// AddRequestIDs wraps an http.Handler, adding an X-Request-Id header
+// to each request that doesn't already have one.
+func AddRequestIDs(h http.Handler) http.Handler {
+ gen := &IDGenerator{Prefix: "req-"}
+ return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ if req.Header.Get("X-Request-Id") == "" {
+ req.Header.Set("X-Request-Id", gen.Next())
+ }
+ h.ServeHTTP(w, req)
+ })
}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package httpserver
+
+import (
+ "context"
+ "net/http"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/stats"
+ log "github.com/Sirupsen/logrus"
+)
+
+type contextKey struct {
+ name string
+}
+
+var requestTimeContextKey = contextKey{"requestTime"}
+
+// LogRequests wraps an http.Handler, logging each request and
+// response via logrus.
+func LogRequests(h http.Handler) http.Handler {
+ return http.HandlerFunc(func(wrapped http.ResponseWriter, req *http.Request) {
+ w := &responseTimer{ResponseWriter: WrapResponseWriter(wrapped)}
+ req = req.WithContext(context.WithValue(req.Context(), &requestTimeContextKey, time.Now()))
+ lgr := log.WithFields(log.Fields{
+ "RequestID": req.Header.Get("X-Request-Id"),
+ "remoteAddr": req.RemoteAddr,
+ "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
+ "reqMethod": req.Method,
+ "reqPath": req.URL.Path[1:],
+ "reqBytes": req.ContentLength,
+ })
+ logRequest(w, req, lgr)
+ defer logResponse(w, req, lgr)
+ h.ServeHTTP(w, req)
+ })
+}
+
+func logRequest(w *responseTimer, req *http.Request, lgr *log.Entry) {
+ lgr.Info("request")
+}
+
+func logResponse(w *responseTimer, req *http.Request, lgr *log.Entry) {
+ if tStart, ok := req.Context().Value(&requestTimeContextKey).(time.Time); ok {
+ tDone := time.Now()
+ lgr = lgr.WithFields(log.Fields{
+ "timeTotal": stats.Duration(tDone.Sub(tStart)),
+ "timeToStatus": stats.Duration(w.writeTime.Sub(tStart)),
+ "timeWriteBody": stats.Duration(tDone.Sub(w.writeTime)),
+ })
+ }
+ lgr.WithFields(log.Fields{
+ "respStatusCode": w.WroteStatus(),
+ "respStatus": http.StatusText(w.WroteStatus()),
+ "respBytes": w.WroteBodyBytes(),
+ }).Info("response")
+}
+
+type responseTimer struct {
+ ResponseWriter
+ wrote bool
+ writeTime time.Time
+}
+
+func (rt *responseTimer) WriteHeader(code int) {
+ if !rt.wrote {
+ rt.wrote = true
+ rt.writeTime = time.Now()
+ }
+ rt.ResponseWriter.WriteHeader(code)
+}
+
+func (rt *responseTimer) Write(p []byte) (int, error) {
+ if !rt.wrote {
+ rt.wrote = true
+ rt.writeTime = time.Now()
+ }
+ return rt.ResponseWriter.Write(p)
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package httpserver
+
+import (
+ "bytes"
+ "encoding/json"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "testing"
+ "time"
+
+ log "github.com/Sirupsen/logrus"
+ check "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
+
+var _ = check.Suite(&Suite{})
+
+type Suite struct{}
+
+func (s *Suite) TestLogRequests(c *check.C) {
+ defer log.SetOutput(os.Stdout)
+ captured := &bytes.Buffer{}
+ log.SetOutput(captured)
+ log.SetFormatter(&log.JSONFormatter{
+ TimestampFormat: time.RFC3339Nano,
+ })
+ h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ w.Write([]byte("hello world"))
+ })
+ req, err := http.NewRequest("GET", "https://foo.example/bar", nil)
+ req.Header.Set("X-Forwarded-For", "1.2.3.4:12345")
+ c.Assert(err, check.IsNil)
+ resp := httptest.NewRecorder()
+ AddRequestIDs(LogRequests(h)).ServeHTTP(resp, req)
+
+ dec := json.NewDecoder(captured)
+
+ gotReq := make(map[string]interface{})
+ err = dec.Decode(&gotReq)
+ c.Logf("%#v", gotReq)
+ c.Check(gotReq["RequestID"], check.Matches, "req-[a-z0-9]{20}")
+ c.Check(gotReq["reqForwardedFor"], check.Equals, "1.2.3.4:12345")
+ c.Check(gotReq["msg"], check.Equals, "request")
+
+ gotResp := make(map[string]interface{})
+ err = dec.Decode(&gotResp)
+ c.Logf("%#v", gotResp)
+ c.Check(gotResp["RequestID"], check.Equals, gotReq["RequestID"])
+ c.Check(gotResp["reqForwardedFor"], check.Equals, "1.2.3.4:12345")
+ c.Check(gotResp["msg"], check.Equals, "response")
+
+ c.Assert(gotResp["time"], check.FitsTypeOf, "")
+ _, err = time.Parse(time.RFC3339Nano, gotResp["time"].(string))
+ c.Check(err, check.IsNil)
+
+ for _, key := range []string{"timeToStatus", "timeWriteBody", "timeTotal"} {
+ c.Assert(gotResp[key], check.FitsTypeOf, float64(0))
+ c.Check(gotResp[key].(float64), check.Not(check.Equals), float64(0))
+ }
+}
"fmt"
"io"
"io/ioutil"
- "log"
"net"
"net/http"
"os"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/config"
"git.curoverse.com/arvados.git/sdk/go/health"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ log "github.com/Sirupsen/logrus"
"github.com/coreos/go-systemd/daemon"
"github.com/ghodss/yaml"
"github.com/gorilla/mux"
router http.Handler
)
+const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
+
func main() {
+ log.SetFormatter(&log.JSONFormatter{
+ TimestampFormat: rfc3339NanoFixed,
+ })
+
cfg := DefaultConfig()
flagset := flag.NewFlagSet("keepproxy", flag.ExitOnError)
// Start serving requests.
router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout), cfg.ManagementToken)
- http.Serve(listener, router)
+ http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(router)))
log.Println("shutting down")
}
Timeout: h.timeout,
Transport: h.transport,
},
- proto: req.Proto,
+ proto: req.Proto,
+ requestID: req.Header.Get("X-Request-Id"),
}
return &kc
}
"errors"
"fmt"
"io/ioutil"
- "log"
"math/rand"
"net/http"
"net/http/httptest"
time.Sleep(ms * time.Millisecond)
}
if listener == nil {
- log.Fatalf("Timed out waiting for listener to start")
+ panic("Timed out waiting for listener to start")
}
}
{
_, _, err := kc.Ask(hash)
c.Check(err, Equals, keepclient.BlockNotFound)
- log.Print("Finished Ask (expected BlockNotFound)")
+ c.Log("Finished Ask (expected BlockNotFound)")
}
{
reader, _, _, err := kc.Get(hash)
c.Check(reader, Equals, nil)
c.Check(err, Equals, keepclient.BlockNotFound)
- log.Print("Finished Get (expected BlockNotFound)")
+ c.Log("Finished Get (expected BlockNotFound)")
}
// Note in bug #5309 among other errors keepproxy would set
c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
- log.Print("Finished PutB (expected success)")
+ c.Log("Finished PutB (expected success)")
}
{
blocklen, _, err := kc.Ask(hash2)
c.Assert(err, Equals, nil)
c.Check(blocklen, Equals, int64(3))
- log.Print("Finished Ask (expected success)")
+ c.Log("Finished Ask (expected success)")
}
{
all, err := ioutil.ReadAll(reader)
c.Check(all, DeepEquals, []byte("foo"))
c.Check(blocklen, Equals, int64(3))
- log.Print("Finished Get (expected success)")
+ c.Log("Finished Get (expected success)")
}
{
c.Check(hash2, Matches, `^d41d8cd98f00b204e9800998ecf8427e\+0(\+.+)?$`)
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
- log.Print("Finished PutB zero block")
+ c.Log("Finished PutB zero block")
}
{
all, err := ioutil.ReadAll(reader)
c.Check(all, DeepEquals, []byte(""))
c.Check(blocklen, Equals, int64(0))
- log.Print("Finished Get zero block")
+ c.Log("Finished Get zero block")
}
}
errNotFound, _ := err.(keepclient.ErrNotFound)
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
- log.Print("Ask 1")
+ c.Log("Ask 1")
}
{
c.Check(hash2, Equals, "")
c.Check(rep, Equals, 0)
c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New("")))
- log.Print("PutB")
+ c.Log("PutB")
}
{
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
c.Check(blocklen, Equals, int64(0))
- log.Print("Ask 2")
+ c.Log("Ask 2")
}
{
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
c.Check(blocklen, Equals, int64(0))
- log.Print("Get")
+ c.Log("Get")
}
}
errNotFound, _ := err.(keepclient.ErrNotFound)
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
- log.Print("Ask 1")
+ c.Log("Ask 1")
}
{
c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
- log.Print("PutB")
+ c.Log("PutB")
}
{
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
c.Check(blocklen, Equals, int64(0))
- log.Print("Ask 2")
+ c.Log("Ask 2")
}
{
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
c.Check(blocklen, Equals, int64(0))
- log.Print("Get")
+ c.Log("Get")
}
}
var viaAlias = "keepproxy"
type proxyClient struct {
- client keepclient.HTTPClient
- proto string
+ client keepclient.HTTPClient
+ proto string
+ requestID string
}
func (pc *proxyClient) Do(req *http.Request) (*http.Response, error) {
req.Header.Add("Via", pc.proto+" "+viaAlias)
+ req.Header.Add("X-Request-Id", pc.requestID)
return pc.client.Do(req)
}
ok := make(chan struct{})
go func() {
req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
- (&LoggingRESTRouter{router: MakeRESTRouter()}).ServeHTTP(resp, req)
+ MakeRESTRouter().ServeHTTP(resp, req)
ok <- struct{}{}
}()
// Start a round-robin VolumeManager with the volumes we have found.
KeepVM = MakeRRVolumeManager(theConfig.Volumes)
- // Middleware stack: logger, MaxRequests limiter, method handlers
+ // Middleware/handler stack
router := MakeRESTRouter()
limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
router.limiter = limiter
- http.Handle("/", &LoggingRESTRouter{router: limiter})
+ http.Handle("/", httpserver.AddRequestIDs(httpserver.LogRequests(limiter)))
// Set up a TCP listener.
listener, err := net.Listen("tcp", theConfig.Listen)
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-// LoggingRESTRouter
-// LoggingResponseWriter
-
-import (
- "context"
- "net/http"
- "strings"
- "time"
-
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
- "git.curoverse.com/arvados.git/sdk/go/stats"
- log "github.com/Sirupsen/logrus"
-)
-
-// LoggingResponseWriter has anonymous fields ResponseWriter and ResponseBody
-type LoggingResponseWriter struct {
- Status int
- Length int
- http.ResponseWriter
- ResponseBody string
- sentHdr time.Time
-}
-
-// CloseNotify implements http.CloseNotifier.
-func (resp *LoggingResponseWriter) CloseNotify() <-chan bool {
- wrapped, ok := resp.ResponseWriter.(http.CloseNotifier)
- if !ok {
- // If upstream doesn't implement CloseNotifier, we can
- // satisfy the interface by returning a channel that
- // never sends anything (the interface doesn't
- // guarantee that anything will ever be sent on the
- // channel even if the client disconnects).
- return nil
- }
- return wrapped.CloseNotify()
-}
-
-// WriteHeader writes header to ResponseWriter
-func (resp *LoggingResponseWriter) WriteHeader(code int) {
- if resp.sentHdr == zeroTime {
- resp.sentHdr = time.Now()
- }
- resp.Status = code
- resp.ResponseWriter.WriteHeader(code)
-}
-
-var zeroTime time.Time
-
-func (resp *LoggingResponseWriter) Write(data []byte) (int, error) {
- if resp.Length == 0 && len(data) > 0 && resp.sentHdr == zeroTime {
- resp.sentHdr = time.Now()
- }
- resp.Length += len(data)
- if resp.Status >= 400 {
- resp.ResponseBody += string(data)
- }
- return resp.ResponseWriter.Write(data)
-}
-
-// LoggingRESTRouter is used to add logging capabilities to mux.Router
-type LoggingRESTRouter struct {
- router http.Handler
- idGenerator httpserver.IDGenerator
-}
-
-func (loggingRouter *LoggingRESTRouter) ServeHTTP(wrappedResp http.ResponseWriter, req *http.Request) {
- tStart := time.Now()
-
- // Attach a requestID-aware logger to the request context.
- lgr := log.WithField("RequestID", loggingRouter.idGenerator.Next())
- ctx := context.WithValue(req.Context(), "logger", lgr)
- req = req.WithContext(ctx)
-
- lgr = lgr.WithFields(log.Fields{
- "remoteAddr": req.RemoteAddr,
- "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
- "reqMethod": req.Method,
- "reqPath": req.URL.Path[1:],
- "reqBytes": req.ContentLength,
- })
- lgr.Debug("request")
-
- resp := LoggingResponseWriter{http.StatusOK, 0, wrappedResp, "", zeroTime}
- loggingRouter.router.ServeHTTP(&resp, req)
- tDone := time.Now()
-
- statusText := http.StatusText(resp.Status)
- if resp.Status >= 400 {
- statusText = strings.Replace(resp.ResponseBody, "\n", "", -1)
- }
- if resp.sentHdr == zeroTime {
- // Nobody changed status or wrote any data, i.e., we
- // returned a 200 response with no body.
- resp.sentHdr = tDone
- }
-
- lgr.WithFields(log.Fields{
- "timeTotal": stats.Duration(tDone.Sub(tStart)),
- "timeToStatus": stats.Duration(resp.sentHdr.Sub(tStart)),
- "timeWriteBody": stats.Duration(tDone.Sub(resp.sentHdr)),
- "respStatusCode": resp.Status,
- "respStatus": statusText,
- "respBytes": resp.Length,
- }).Info("response")
-}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
- "net/http"
- "testing"
-)
-
-func TestLoggingResponseWriterImplementsCloseNotifier(t *testing.T) {
- http.ResponseWriter(&LoggingResponseWriter{}).(http.CloseNotifier).CloseNotify()
-}
s3RaceWindow time.Duration
s3ACL = s3.Private
+
+ zeroTime time.Time
)
const (