8460: Structured logging.
author: Tom Clegg <tom@curoverse.com>
Thu, 17 Nov 2016 19:10:15 +0000 (14:10 -0500)
committer: Tom Clegg <tom@curoverse.com>
Thu, 17 Nov 2016 19:10:15 +0000 (14:10 -0500)
sdk/python/tests/run_test_server.py
services/ws/config.go
services/ws/event.go
services/ws/handler.go
services/ws/log.go
services/ws/main.go
services/ws/permission.go
services/ws/pg.go
services/ws/router.go
services/ws/session.go
services/ws/session_v0.go

index 5ef5e2a9f5c8b83e8c7ece912ea7eba6624968c8..bd37daa6cdd3c27b8c24252d0ffadb92321044f4 100644 (file)
@@ -379,6 +379,7 @@ Client:
   APIHost: {}
   Insecure: true
 Listen: :{}
+LogLevel: {}
 Postgres:
   host: {}
   dbname: {}
@@ -387,6 +388,7 @@ Postgres:
   sslmode: require
         """.format(os.environ['ARVADOS_API_HOST'],
                    port,
+                   ('info' if os.environ.get('ARVADOS_DEBUG', '') in ['','0'] else 'debug'),
                    _dbconfig('host'),
                    _dbconfig('database'),
                    _dbconfig('username'),
index 9c2e80a1728afba9998d8a1b40d58b1ecd311b8f..e2d69d0c68a2586b64a830b1baaabc56137c6604 100644 (file)
@@ -7,10 +7,11 @@ import (
 )
 
 type Config struct {
-       Client   arvados.Client
-       Postgres pgConfig
-       Listen   string
-       Debug    bool
+       Client    arvados.Client
+       Postgres  pgConfig
+       Listen    string
+       LogLevel  string
+       LogFormat string
 
        PingTimeout      arvados.Duration
        ClientEventQueue int
@@ -30,6 +31,8 @@ func DefaultConfig() Config {
                        "connect_timeout": "30",
                        "sslmode":         "require",
                },
+               LogLevel:         "info",
+               LogFormat:        "json",
                PingTimeout:      arvados.Duration(time.Minute),
                ClientEventQueue: 64,
                ServerEventQueue: 4,
index 77acf4496fa7957e13922787606c6da5ef670b3f..280035b9ef5840e9ff6c11fa8d4517f0ac8f850e 100644 (file)
@@ -2,7 +2,6 @@ package main
 
 import (
        "database/sql"
-       "log"
        "sync"
        "time"
 
@@ -51,12 +50,12 @@ func (e *event) Detail() *arvados.Log {
                &logRow.CreatedAt,
                &propYAML)
        if e.err != nil {
-               log.Printf("retrieving log row %d: %s", e.LogID, e.err)
+               logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("QueryRow failed")
                return nil
        }
        e.err = yaml.Unmarshal(propYAML, &logRow.Properties)
        if e.err != nil {
-               log.Printf("decoding yaml for log row %d: %s", e.LogID, e.err)
+               logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("yaml decode failed")
                return nil
        }
        e.logRow = &logRow
index 1470c6608c73de37fc1b543f6d554d1ded12efe2..2b94693610f1dba95c8a31ab26a2df89bf966eec 100644 (file)
@@ -3,20 +3,12 @@ package main
 import (
        "encoding/json"
        "io"
-       "log"
-       "net/http"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       log "github.com/Sirupsen/logrus"
 )
 
-type wsConn interface {
-       io.ReadWriter
-       Request() *http.Request
-       SetReadDeadline(time.Time) error
-       SetWriteDeadline(time.Time) error
-}
-
 type handler struct {
        Client      arvados.Client
        PingTimeout time.Duration
@@ -25,16 +17,19 @@ type handler struct {
 }
 
 type handlerStats struct {
-       QueueDelay time.Duration
-       WriteDelay time.Duration
-       EventBytes uint64
-       EventCount uint64
+       QueueDelayNs time.Duration
+       WriteDelayNs time.Duration
+       EventBytes   uint64
+       EventCount   uint64
 }
 
 func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
+       ctx := contextWithLogger(ws.Request().Context(), log.WithFields(log.Fields{
+               "RemoteAddr": ws.Request().RemoteAddr,
+       }))
        sess, err := h.NewSession(ws)
        if err != nil {
-               log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
+               logger(ctx).WithError(err).Error("NewSession failed")
                return
        }
 
@@ -53,13 +48,13 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
                        }
                        ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
                        n, err := ws.Read(buf)
-                       sess.debugLogf("received frame: %q", buf[:n])
+                       logger(ctx).WithField("frame", string(buf[:n])).Debug("received frame")
                        if err == nil && n == len(buf) {
                                err = errFrameTooBig
                        }
                        if err != nil {
                                if err != io.EOF {
-                                       sess.debugLogf("handler: read: %s", err)
+                                       logger(ctx).WithError(err).Info("read error")
                                }
                                stop <- err
                                return
@@ -67,12 +62,12 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
                        msg := make(map[string]interface{})
                        err = json.Unmarshal(buf[:n], &msg)
                        if err != nil {
-                               sess.debugLogf("handler: unmarshal: %s", err)
+                               logger(ctx).WithError(err).Info("invalid json from client")
                                stop <- err
                                return
                        }
                        for _, buf := range sess.Receive(msg, buf[:n]) {
-                               sess.debugLogf("handler: to queue: %s", string(buf))
+                               logger(ctx).WithField("frame", string(buf)).Debug("queued message from sess.Receive")
                                queue <- buf
                        }
                }
@@ -82,39 +77,43 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
                for e := range queue {
                        if buf, ok := e.([]byte); ok {
                                ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
-                               sess.debugLogf("handler: send msg: %s", string(buf))
+                               logger(ctx).WithField("frame", string(buf)).Debug("send msg buf")
                                _, err := ws.Write(buf)
                                if err != nil {
-                                       sess.debugLogf("handler: write {}: %s", err)
+                                       logger(ctx).WithError(err).Error("write failed")
                                        stop <- err
                                        break
                                }
                                continue
                        }
                        e := e.(*event)
+                       log := logger(ctx).WithField("serial", e.Serial)
 
                        buf, err := sess.EventMessage(e)
                        if err != nil {
-                               sess.debugLogf("EventMessage %d: err %s", err)
+                               log.WithError(err).Error("EventMessage failed")
                                stop <- err
                                break
                        } else if len(buf) == 0 {
-                               sess.debugLogf("EventMessage %d: skip", e.Serial)
+                               log.Debug("skip")
                                continue
                        }
 
-                       sess.debugLogf("handler: send event %d: %q", e.Serial, buf)
+                       log.WithField("frame", string(buf)).Debug("send event")
                        ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
                        t0 := time.Now()
                        _, err = ws.Write(buf)
                        if err != nil {
-                               sess.debugLogf("handler: write: %s", err)
+                               log.WithError(err).Error("write failed")
                                stop <- err
                                break
                        }
-                       sess.debugLogf("handler: sent event %d", e.Serial)
-                       stats.WriteDelay += time.Since(t0)
-                       stats.QueueDelay += t0.Sub(e.Received)
+                       log.Debug("sent")
+
+                       if e != nil {
+                               stats.QueueDelayNs += t0.Sub(e.Received)
+                       }
+                       stats.WriteDelayNs += time.Since(t0)
                        stats.EventBytes += uint64(len(buf))
                        stats.EventCount++
                }
index 1511691c7ddcc6f6aa752d86bec78972a4f25978..d3aa82d9b2638b4baaa643f841b5a51252810d0a 100644 (file)
@@ -1,41 +1,22 @@
 package main
 
 import (
-       "encoding/json"
-       "fmt"
-       "log"
-       "time"
-)
-
-func init() {
-       log.SetFlags(0)
-}
+       "context"
 
-func errorLogf(f string, args ...interface{}) {
-       log.Print(`{"error":`, string(mustMarshal(fmt.Sprintf(f, args...))), `}`)
-}
+       log "github.com/Sirupsen/logrus"
+)
 
-var debugLogf = func(f string, args ...interface{}) {
-       log.Print(`{"debug":`, string(mustMarshal(fmt.Sprintf(f, args...))), `}`)
-}
+var loggerCtxKey = new(int)
 
-func mustMarshal(v interface{}) []byte {
-       buf, err := json.Marshal(v)
-       if err != nil {
-               panic(err)
-       }
-       return buf
+func contextWithLogger(ctx context.Context, logger *log.Entry) context.Context {
+       return context.WithValue(ctx, loggerCtxKey, logger)
 }
 
-func logj(args ...interface{}) {
-       m := map[string]interface{}{"Time": time.Now().UTC()}
-       for i := 0; i < len(args)-1; i += 2 {
-               m[fmt.Sprintf("%s", args[i])] = args[i+1]
-       }
-       buf, err := json.Marshal(m)
-       if err != nil {
-               errorLogf("logj: %s", err)
-               return
+func logger(ctx context.Context) *log.Entry {
+       if ctx != nil {
+               if logger, ok := ctx.Value(loggerCtxKey).(*log.Entry); ok {
+                       return logger
+               }
        }
-       log.Print(string(buf))
+       return log.WithFields(nil)
 }
index 719128f3f9fa33545d435b0bd922f5f700cd0c3f..c83f8d956a814e77c3255b13ba26ce4078df2f89 100644 (file)
@@ -3,11 +3,11 @@ package main
 import (
        "flag"
        "fmt"
-       "log"
        "net/http"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/config"
+       log "github.com/Sirupsen/logrus"
 )
 
 func main() {
@@ -20,8 +20,24 @@ func main() {
        if err != nil {
                log.Fatal(err)
        }
-       if !cfg.Debug {
-               debugLogf = func(string, ...interface{}) {}
+
+       lvl, err := log.ParseLevel(cfg.LogLevel)
+       if err != nil {
+               log.Fatal(err)
+       }
+       log.SetLevel(lvl)
+       switch cfg.LogFormat {
+       case "text":
+               log.SetFormatter(&log.TextFormatter{
+                       FullTimestamp:   true,
+                       TimestampFormat: time.RFC3339Nano,
+               })
+       case "json":
+               log.SetFormatter(&log.JSONFormatter{
+                       TimestampFormat: time.RFC3339Nano,
+               })
+       default:
+               log.WithField("LogFormat", cfg.LogFormat).Fatal("unknown log format")
        }
 
        if *dumpConfig {
@@ -49,6 +65,6 @@ func main() {
        }
        eventSource.NewSink().Stop()
 
-       log.Printf("listening at %s", srv.Addr)
+       log.WithField("Listen", srv.Addr).Info("listening")
        log.Fatal(srv.ListenAndServe())
 }
index 1dc06b852260e1e6b54b20870491ad363f57699f..30276e4f6fa8ca75d4eabfd7ddc7368a0082d800 100644 (file)
@@ -43,10 +43,13 @@ func (pc *cachingPermChecker) SetToken(token string) {
 }
 
 func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
+       logger := logger(nil).
+               WithField("token", pc.Client.AuthToken).
+               WithField("uuid", uuid)
        pc.tidy()
        now := time.Now()
        if perm, ok := pc.cache[uuid]; ok && now.Sub(perm.Time) < maxPermCacheAge {
-               debugLogf("perm (cached): %+q %+q ...%v", pc.Client.AuthToken, uuid, perm.allowed)
+               logger.WithField("allowed", perm.allowed).Debug("cache hit")
                return perm.allowed, nil
        }
        var buf map[string]interface{}
@@ -61,13 +64,13 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
        var allowed bool
        if err == nil {
                allowed = true
-       } else if txErr, ok := err.(arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound {
+       } else if txErr, ok := err.(*arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound {
                allowed = false
        } else {
-               errorLogf("perm err: %+q %+q: %T %s", pc.Client.AuthToken, uuid, err, err)
+               logger.WithError(err).Error("lookup error")
                return false, err
        }
-       debugLogf("perm: %+q %+q ...%v", pc.Client.AuthToken, uuid, allowed)
+       logger.WithField("allowed", allowed).Debug("cache miss")
        pc.cache[uuid] = cacheEnt{Time: now, allowed: allowed}
        return allowed, nil
 }
index 08fbee1d584c229e3fce106947311af5f4d165c3..206cfeb73bce075ded3b7cf30d8b2363a7e418b8 100644 (file)
@@ -2,8 +2,6 @@ package main
 
 import (
        "database/sql"
-       "fmt"
-       "log"
        "strconv"
        "strings"
        "sync"
@@ -45,10 +43,10 @@ func (ps *pgEventSource) setup() {
 
        db, err := sql.Open("postgres", ps.DataSource)
        if err != nil {
-               log.Fatalf("sql.Open: %s", err)
+               logger(nil).WithError(err).Fatal("sql.Open failed")
        }
        if err = db.Ping(); err != nil {
-               log.Fatalf("db.Ping: %s", err)
+               logger(nil).WithError(err).Fatal("db.Ping failed")
        }
        ps.db = db
 
@@ -58,14 +56,15 @@ func (ps *pgEventSource) setup() {
                        // on missed events, we cannot recover from a
                        // dropped connection without breaking our
                        // promises to clients.
-                       ps.shutdown <- fmt.Errorf("pgEventSource listener problem: %s", err)
+                       logger(nil).WithError(err).Error("listener problem")
+                       ps.shutdown <- err
                }
        })
        err = ps.pqListener.Listen("logs")
        if err != nil {
-               log.Fatal(err)
+               logger(nil).WithError(err).Fatal("pq Listen failed")
        }
-       debugLogf("pgEventSource listening")
+       logger(nil).Debug("pgEventSource listening")
 
        go ps.run()
 }
@@ -81,7 +80,12 @@ func (ps *pgEventSource) run() {
                        // concurrent queries would be bounded by
                        // client_count X client_queue_size.
                        e.Detail()
-                       debugLogf("event %d detail %+v", e.Serial, e.Detail())
+
+                       logger(nil).
+                               WithField("serial", e.Serial).
+                               WithField("detail", e.Detail()).
+                               Debug("event ready")
+
                        ps.mtx.Lock()
                        for sink := range ps.sinks {
                                sink.channel <- e
@@ -97,13 +101,13 @@ func (ps *pgEventSource) run() {
                select {
                case err, ok := <-ps.shutdown:
                        if ok {
-                               debugLogf("shutdown on error: %s", err)
+                               logger(nil).WithError(err).Info("shutdown")
                        }
                        close(eventQueue)
                        return
 
                case <-ticker.C:
-                       debugLogf("pgEventSource listener ping")
+                       logger(nil).Debug("listener ping")
                        ps.pqListener.Ping()
 
                case pqEvent, ok := <-ps.pqListener.Notify:
@@ -116,7 +120,7 @@ func (ps *pgEventSource) run() {
                        }
                        logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
                        if err != nil {
-                               log.Printf("bad notify payload: %+v", pqEvent)
+                               logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
                                continue
                        }
                        serial++
@@ -126,7 +130,7 @@ func (ps *pgEventSource) run() {
                                Serial:   serial,
                                db:       ps.db,
                        }
-                       debugLogf("event %d %+v", e.Serial, e)
+                       logger(nil).WithField("event", e).Debug("incoming")
                        eventQueue <- e
                        go e.Detail()
                }
index 2a4e52e31a9d159805ef306353bd0b29f66f37cb..34656ad91a5f01d09e3034a7c80cb01f29fbfb74 100644 (file)
@@ -2,22 +2,33 @@ package main
 
 import (
        "database/sql"
-       "encoding/json"
-       "log"
+       "io"
        "net/http"
+       "strconv"
        "sync"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       log "github.com/Sirupsen/logrus"
        "golang.org/x/net/websocket"
 )
 
+type wsConn interface {
+       io.ReadWriter
+       Request() *http.Request
+       SetReadDeadline(time.Time) error
+       SetWriteDeadline(time.Time) error
+}
+
 type router struct {
        Config *Config
 
        eventSource eventSource
        mux         *http.ServeMux
        setupOnce   sync.Once
+
+       lastReqID  int64
+       lastReqMtx sync.Mutex
 }
 
 func (rtr *router) setup() {
@@ -30,7 +41,7 @@ func (rtr *router) makeServer(newSession func(wsConn, arvados.Client, *sql.DB) (
        handler := &handler{
                PingTimeout: rtr.Config.PingTimeout.Duration(),
                QueueSize:   rtr.Config.ClientEventQueue,
-               NewSession:  func(ws wsConn) (session, error) {
+               NewSession: func(ws wsConn) (session, error) {
                        return newSession(ws, rtr.Config.Client, rtr.eventSource.DB())
                },
        }
@@ -39,17 +50,16 @@ func (rtr *router) makeServer(newSession func(wsConn, arvados.Client, *sql.DB) (
                        return nil
                },
                Handler: websocket.Handler(func(ws *websocket.Conn) {
-                       logj("Type", "connect",
-                               "RemoteAddr", ws.Request().RemoteAddr)
                        t0 := time.Now()
-
                        sink := rtr.eventSource.NewSink()
+                       logger(ws.Request().Context()).Info("connected")
+
                        stats := handler.Handle(ws, sink.Channel())
 
-                       logj("Type", "disconnect",
-                               "RemoteAddr", ws.Request().RemoteAddr,
-                               "Elapsed", time.Now().Sub(t0).Seconds(),
-                               "Stats", stats)
+                       logger(ws.Request().Context()).WithFields(log.Fields{
+                               "Elapsed": time.Now().Sub(t0).Seconds(),
+                               "Stats":   stats,
+                       }).Info("disconnect")
 
                        sink.Stop()
                        ws.Close()
@@ -57,18 +67,25 @@ func (rtr *router) makeServer(newSession func(wsConn, arvados.Client, *sql.DB) (
        }
 }
 
+func (rtr *router) newReqID() string {
+       rtr.lastReqMtx.Lock()
+       defer rtr.lastReqMtx.Unlock()
+       id := time.Now().UnixNano()
+       if id <= rtr.lastReqID {
+               id = rtr.lastReqID + 1
+       }
+       return strconv.FormatInt(id, 36)
+}
+
 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        rtr.setupOnce.Do(rtr.setup)
-       logj("Type", "request",
-               "RemoteAddr", req.RemoteAddr,
-               "X-Forwarded-For", req.Header.Get("X-Forwarded-For"))
+       logger := logger(req.Context()).
+               WithField("RequestID", rtr.newReqID())
+       ctx := contextWithLogger(req.Context(), logger)
+       req = req.WithContext(ctx)
+       logger.WithFields(log.Fields{
+               "RemoteAddr":      req.RemoteAddr,
+               "X-Forwarded-For": req.Header.Get("X-Forwarded-For"),
+       }).Info("accept request")
        rtr.mux.ServeHTTP(resp, req)
 }
-
-func reqLog(m map[string]interface{}) {
-       j, err := json.Marshal(m)
-       if err != nil {
-               log.Fatal(err)
-       }
-       log.Print(string(j))
-}
index a0658d964cb8d2b97e32e83c22dc39370813fad5..d148f59128da879eb37689120ccdf95700018f5f 100644 (file)
@@ -22,6 +22,4 @@ type session interface {
        // incoming events will be queued. If the event queue fills
        // up, the connection will be dropped.
        EventMessage(*event) ([]byte, error)
-
-       debugLogf(string, ...interface{})
 }
index 33cdb2f3f05a7166e7f18320657eac2fbcb787c2..210c8c5d6f4825ac1d47dc4f9391de232cbbe578 100644 (file)
@@ -4,11 +4,11 @@ import (
        "database/sql"
        "encoding/json"
        "errors"
-       "log"
        "sync"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       log "github.com/Sirupsen/logrus"
 )
 
 var (
@@ -26,6 +26,7 @@ type v0session struct {
        db            *sql.DB
        permChecker   permChecker
        subscriptions []v0subscribe
+       log           *log.Entry
        mtx           sync.Mutex
        setupOnce     sync.Once
 }
@@ -35,35 +36,31 @@ func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
                ws:          ws,
                db:          db,
                permChecker: NewPermChecker(ac),
+               log:         logger(ws.Request().Context()),
        }
 
        err := ws.Request().ParseForm()
        if err != nil {
-               log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
+               sess.log.WithError(err).Error("ParseForm failed")
                return nil, err
        }
        token := ws.Request().Form.Get("api_token")
        sess.permChecker.SetToken(token)
-       sess.debugLogf("token = %+q", token)
+       sess.log.WithField("token", token).Debug("set token")
 
        return sess, nil
 }
 
-func (sess *v0session) debugLogf(s string, args ...interface{}) {
-       args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
-       debugLogf("%s "+s, args...)
-}
-
 func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte {
-       sess.debugLogf("received message: %+v", msg)
+       sess.log.WithField("data", msg).Debug("received message")
        var sub v0subscribe
        if err := json.Unmarshal(buf, &sub); err != nil {
-               sess.debugLogf("ignored unrecognized request: %s", err)
+               sess.log.WithError(err).Info("ignored invalid request")
                return nil
        }
        if sub.Method == "subscribe" {
-               sub.prepare()
-               sess.debugLogf("subscription: %v", sub)
+               sub.prepare(sess)
+               sess.log.WithField("sub", sub).Debug("sub prepared")
                sess.mtx.Lock()
                sess.subscriptions = append(sess.subscriptions, sub)
                sess.mtx.Unlock()
@@ -118,7 +115,7 @@ func (sess *v0session) Filter(e *event) bool {
        sess.mtx.Lock()
        defer sess.mtx.Unlock()
        for _, sub := range sess.subscriptions {
-               if sub.match(e) {
+               if sub.match(sess, e) {
                        return true
                }
        }
@@ -129,7 +126,7 @@ func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
        if sub.LastLogID == 0 {
                return
        }
-       debugLogf("getOldEvents(%d)", sub.LastLogID)
+       sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
        // Here we do a "select id" query and queue an event for every
        // log since the given ID, then use (*event)Detail() to
        // retrieve the whole row and decide whether to send it. This
@@ -144,14 +141,14 @@ func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
                sub.LastLogID,
                time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
        if err != nil {
-               errorLogf("db.Query: %s", err)
+               sess.log.WithError(err).Error("db.Query failed")
                return
        }
        for rows.Next() {
                var id uint64
                err := rows.Scan(&id)
                if err != nil {
-                       errorLogf("Scan: %s", err)
+                       sess.log.WithError(err).Error("row Scan failed")
                        continue
                }
                e := &event{
@@ -159,20 +156,20 @@ func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
                        Received: time.Now(),
                        db:       sess.db,
                }
-               if !sub.match(e) {
-                       debugLogf("skip old event %+v", e)
+               if !sub.match(sess, e) {
+                       sess.log.WithField("event", e).Debug("skip old event")
                        continue
                }
                msg, err := sess.EventMessage(e)
                if err != nil {
-                       debugLogf("event marshal: %s", err)
+                       sess.log.WithError(err).Error("event marshal failed")
                        continue
                }
-               debugLogf("old event: %s", string(msg))
+               sess.log.WithField("data", msg).Debug("will queue old event")
                msgs = append(msgs, msg)
        }
        if err := rows.Err(); err != nil {
-               errorLogf("db.Query: %s", err)
+               sess.log.WithError(err).Error("db.Query failed")
        }
        return
 }
@@ -187,23 +184,25 @@ type v0subscribe struct {
 
 type v0filter [3]interface{}
 
-func (sub *v0subscribe) match(e *event) bool {
+func (sub *v0subscribe) match(sess *v0session, e *event) bool {
+       log := sess.log.WithField("LogID", e.LogID)
        detail := e.Detail()
        if detail == nil {
-               debugLogf("match(%d): failed on no detail", e.LogID)
+               log.Error("match failed, no detail")
                return false
        }
+       log = log.WithField("funcs", len(sub.funcs))
        for i, f := range sub.funcs {
                if !f(e) {
-                       debugLogf("match(%d): failed on func %d", e.LogID, i)
+                       log.WithField("func", i).Debug("match failed")
                        return false
                }
        }
-       debugLogf("match(%d): passed %d funcs", e.LogID, len(sub.funcs))
+       log.Debug("match passed")
        return true
 }
 
-func (sub *v0subscribe) prepare() {
+func (sub *v0subscribe) prepare(sess *v0session) {
        for _, f := range sub.Filters {
                if len(f) != 3 {
                        continue
@@ -224,7 +223,6 @@ func (sub *v0subscribe) prepare() {
                                }
                        }
                        sub.funcs = append(sub.funcs, func(e *event) bool {
-                               debugLogf("event_type func: %v in %v", e.Detail().EventType, strs)
                                for _, s := range strs {
                                        if s == e.Detail().EventType {
                                                return true
@@ -243,36 +241,36 @@ func (sub *v0subscribe) prepare() {
                        }
                        t, err := time.Parse(time.RFC3339Nano, tstr)
                        if err != nil {
-                               debugLogf("time.Parse(%q): %s", tstr, err)
+                               sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
                                continue
                        }
+                       var fn func(*event) bool
                        switch op {
                        case ">=":
-                               sub.funcs = append(sub.funcs, func(e *event) bool {
-                                       debugLogf("created_at func: %v >= %v", e.Detail().CreatedAt, t)
+                               fn = func(e *event) bool {
                                        return !e.Detail().CreatedAt.Before(t)
-                               })
+                               }
                        case "<=":
-                               sub.funcs = append(sub.funcs, func(e *event) bool {
-                                       debugLogf("created_at func: %v <= %v", e.Detail().CreatedAt, t)
+                               fn = func(e *event) bool {
                                        return !e.Detail().CreatedAt.After(t)
-                               })
+                               }
                        case ">":
-                               sub.funcs = append(sub.funcs, func(e *event) bool {
-                                       debugLogf("created_at func: %v > %v", e.Detail().CreatedAt, t)
+                               fn = func(e *event) bool {
                                        return e.Detail().CreatedAt.After(t)
-                               })
+                               }
                        case "<":
-                               sub.funcs = append(sub.funcs, func(e *event) bool {
-                                       debugLogf("created_at func: %v < %v", e.Detail().CreatedAt, t)
+                               fn = func(e *event) bool {
                                        return e.Detail().CreatedAt.Before(t)
-                               })
+                               }
                        case "=":
-                               sub.funcs = append(sub.funcs, func(e *event) bool {
-                                       debugLogf("created_at func: %v = %v", e.Detail().CreatedAt, t)
+                               fn = func(e *event) bool {
                                        return e.Detail().CreatedAt.Equal(t)
-                               })
+                               }
+                       default:
+                               sess.log.WithField("operator", op).Info("bogus operator")
+                               continue
                        }
+                       sub.funcs = append(sub.funcs, fn)
                }
        }
 }