From dfb995ab9ea0f1d8808c812870db717164ac95f4 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Wed, 16 Nov 2016 14:09:15 -0500 Subject: [PATCH] 8460: Return recent events if last_log_id given. --- services/ws/event.go | 1 + services/ws/handler.go | 6 ++- services/ws/permission.go | 6 +-- services/ws/pg.go | 5 ++ services/ws/router.go | 8 +-- services/ws/session_v0.go | 101 ++++++++++++++++++++++---------------- services/ws/session_v1.go | 3 +- 7 files changed, 80 insertions(+), 50 deletions(-) diff --git a/services/ws/event.go b/services/ws/event.go index 09c9d0f0a4..77acf4496f 100644 --- a/services/ws/event.go +++ b/services/ws/event.go @@ -17,6 +17,7 @@ type eventSink interface { type eventSource interface { NewSink() eventSink + DB() *sql.DB } type event struct { diff --git a/services/ws/handler.go b/services/ws/handler.go index 59d690f6d4..1470c6608c 100644 --- a/services/ws/handler.go +++ b/services/ws/handler.go @@ -21,7 +21,7 @@ type handler struct { Client arvados.Client PingTimeout time.Duration QueueSize int - NewSession func(wsConn, arvados.Client) (session, error) + NewSession func(wsConn) (session, error) } type handlerStats struct { @@ -32,7 +32,7 @@ type handlerStats struct { } func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) { - sess, err := h.NewSession(ws, h.Client) + sess, err := h.NewSession(ws) if err != nil { log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err) return @@ -72,6 +72,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) { return } for _, buf := range sess.Receive(msg, buf[:n]) { + sess.debugLogf("handler: to queue: %s", string(buf)) queue <- buf } } @@ -81,6 +82,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) { for e := range queue { if buf, ok := e.([]byte); ok { ws.SetWriteDeadline(time.Now().Add(h.PingTimeout)) + sess.debugLogf("handler: send msg: %s", string(buf)) _, err := ws.Write(buf) if err != nil { sess.debugLogf("handler: write {}: %s", err) diff --git a/services/ws/permission.go b/services/ws/permission.go index 090e5ff7ca..1dc06b8522 100644 --- a/services/ws/permission.go +++ b/services/ws/permission.go @@ -46,7 +46,7 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) { pc.tidy() now := time.Now() if perm, ok := pc.cache[uuid]; ok && now.Sub(perm.Time) < maxPermCacheAge { - debugLogf("perm (cached): %+q %+q => %v", pc.Client.AuthToken, uuid, perm.allowed) + debugLogf("perm (cached): %+q %+q ...%v", pc.Client.AuthToken, uuid, perm.allowed) return perm.allowed, nil } var buf map[string]interface{} @@ -64,10 +64,10 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) { } else if txErr, ok := err.(arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound { allowed = false } else { - errorLogf("perm err: %+q %+q: %s", pc.Client.AuthToken, uuid, err) + errorLogf("perm err: %+q %+q: %T %s", pc.Client.AuthToken, uuid, err, err) return false, err } - debugLogf("perm: %+q %+q => %v", pc.Client.AuthToken, uuid, allowed) + debugLogf("perm: %+q %+q ...%v", pc.Client.AuthToken, uuid, allowed) pc.cache[uuid] = cacheEnt{Time: now, allowed: allowed} return allowed, nil } diff --git a/services/ws/pg.go b/services/ws/pg.go index a5af9f765b..08fbee1d58 100644 --- a/services/ws/pg.go +++ b/services/ws/pg.go @@ -152,6 +152,11 @@ func (ps *pgEventSource) NewSink() eventSink { return sink } +func (ps *pgEventSource) DB() *sql.DB { + ps.setupOnce.Do(ps.setup) + return ps.db +} + type pgEventSink struct { channel chan *event source *pgEventSource diff --git a/services/ws/router.go b/services/ws/router.go index ba8b46bb77..2a4e52e31a 100644 --- a/services/ws/router.go +++ b/services/ws/router.go @@ -1,6 +1,7 @@ package main import ( + "database/sql" "encoding/json" "log" "net/http" @@ -25,12 +26,13 @@ func (rtr *router) setup() { rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1)) } -func (rtr *router) makeServer(newSession func(wsConn, arvados.Client) (session, error)) *websocket.Server { +func (rtr *router) makeServer(newSession func(wsConn, arvados.Client, *sql.DB) (session, error)) *websocket.Server { handler := &handler{ - Client: rtr.Config.Client, PingTimeout: rtr.Config.PingTimeout.Duration(), QueueSize: rtr.Config.ClientEventQueue, - NewSession: newSession, + NewSession: func(ws wsConn) (session, error) { + return newSession(ws, rtr.Config.Client, rtr.eventSource.DB()) + }, } return &websocket.Server{ Handshake: func(c *websocket.Config, r *http.Request) error { diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go index 5622304bf4..33cdb2f3f0 100644 --- a/services/ws/session_v0.go +++ b/services/ws/session_v0.go @@ -1,11 +1,10 @@ package main import ( + "database/sql" "encoding/json" "errors" - "fmt" "log" - "net/url" "sync" "time" @@ -24,15 +23,17 @@ var ( type v0session struct { ws wsConn + db *sql.DB permChecker permChecker subscriptions []v0subscribe mtx sync.Mutex setupOnce sync.Once } -func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) { +func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) { sess := &v0session{ ws: ws, + db: db, permChecker: NewPermChecker(ac), } @@ -67,42 +68,7 @@ func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte sess.subscriptions = append(sess.subscriptions, sub) sess.mtx.Unlock() - resp := [][]byte{v0subscribeOK} - - if sub.LastLogID > 0 { - // Hack 1: use the permission checker's - // Arvados client to retrieve old log IDs. Use - // "created < 2 hours ago" to ensure the - // database query doesn't get too expensive - // when our client gives us last_log_id==1. - ac := sess.permChecker.(*cachingPermChecker).Client - var old arvados.LogList - ac.RequestAndDecode(&old, "GET", "arvados/v1/logs", nil, url.Values{ - "limit": {"1000"}, - "filters": {fmt.Sprintf( - `[["id",">",%d],["created_at",">","%s"]]`, - sub.LastLogID, - time.Now().UTC().Add(-2*time.Hour).Format(time.RFC3339Nano))}, - }) - for _, log := range old.Items { - // Hack 2: populate the event's logRow - // using the API response -- otherwise - // Detail() would crash because e.db - // is nil. - e := &event{ - LogID: log.ID, - Received: time.Now(), - logRow: &log, - } - msg, err := sess.EventMessage(e) - if err != nil { - continue - } - resp = append(resp, msg) - } - } - - return [][]byte{v0subscribeOK} + return append([][]byte{v0subscribeOK}, sub.getOldEvents(sess)...) } return [][]byte{v0subscribeFail} } @@ -159,6 +125,58 @@ func (sess *v0session) Filter(e *event) bool { return false } +func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) { + if sub.LastLogID == 0 { + return + } + debugLogf("getOldEvents(%d)", sub.LastLogID) + // Here we do a "select id" query and queue an event for every + // log since the given ID, then use (*event)Detail() to + // retrieve the whole row and decide whether to send it. This + // approach is very inefficient if the subscriber asks for + // last_log_id==1, even if the filters end up matching very + // few events. + // + // To mitigate this, filter on "created > 10 minutes ago" when + // retrieving the list of old event IDs to consider. + rows, err := sess.db.Query( + `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`, + sub.LastLogID, + time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano)) + if err != nil { + errorLogf("db.Query: %s", err) + return + } + for rows.Next() { + var id uint64 + err := rows.Scan(&id) + if err != nil { + errorLogf("Scan: %s", err) + continue + } + e := &event{ + LogID: id, + Received: time.Now(), + db: sess.db, + } + if !sub.match(e) { + debugLogf("skip old event %+v", e) + continue + } + msg, err := sess.EventMessage(e) + if err != nil { + debugLogf("event marshal: %s", err) + continue + } + debugLogf("old event: %s", string(msg)) + msgs = append(msgs, msg) + } + if err := rows.Err(); err != nil { + errorLogf("db.Query: %s", err) + } + return +} + type v0subscribe struct { Method string Filters []v0filter @@ -172,15 +190,16 @@ type v0filter [3]interface{} func (sub *v0subscribe) match(e *event) bool { detail := e.Detail() if detail == nil { + debugLogf("match(%d): failed on no detail", e.LogID) return false } - debugLogf("sub.match: len(funcs)==%d", len(sub.funcs)) for i, f := range sub.funcs { if !f(e) { - debugLogf("sub.match: failed on func %d", i) + debugLogf("match(%d): failed on func %d", e.LogID, i) return false } } + debugLogf("match(%d): passed %d funcs", e.LogID, len(sub.funcs)) return true } diff --git a/services/ws/session_v1.go b/services/ws/session_v1.go index bc09ed0db7..60d12c449c 100644 --- a/services/ws/session_v1.go +++ b/services/ws/session_v1.go @@ -1,11 +1,12 @@ package main import ( + "database/sql" "errors" "git.curoverse.com/arvados.git/sdk/go/arvados" ) -func NewSessionV1(ws wsConn, ac arvados.Client) (session, error) { +func NewSessionV1(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) { return nil, errors.New("Not implemented") } -- 2.30.2