8460: Return recent events if last_log_id given.
author Tom Clegg <tom@curoverse.com>
Wed, 16 Nov 2016 19:09:15 +0000 (14:09 -0500)
committer Tom Clegg <tom@curoverse.com>
Wed, 16 Nov 2016 19:09:15 +0000 (14:09 -0500)
services/ws/event.go
services/ws/handler.go
services/ws/permission.go
services/ws/pg.go
services/ws/router.go
services/ws/session_v0.go
services/ws/session_v1.go

index 09c9d0f0a4ed3b88a221d45fc5e267d41ca4dcee..77acf4496fa7957e13922787606c6da5ef670b3f 100644 (file)
@@ -17,6 +17,7 @@ type eventSink interface {
 
 type eventSource interface {
        NewSink() eventSink
+       DB() *sql.DB
 }
 
 type event struct {
index 59d690f6d40068ff4806f9c587a915ff7352e09f..1470c6608c73de37fc1b543f6d554d1ded12efe2 100644 (file)
@@ -21,7 +21,7 @@ type handler struct {
        Client      arvados.Client
        PingTimeout time.Duration
        QueueSize   int
-       NewSession  func(wsConn, arvados.Client) (session, error)
+       NewSession  func(wsConn) (session, error)
 }
 
 type handlerStats struct {
@@ -32,7 +32,7 @@ type handlerStats struct {
 }
 
 func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
-       sess, err := h.NewSession(ws, h.Client)
+       sess, err := h.NewSession(ws)
        if err != nil {
                log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
                return
@@ -72,6 +72,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
                                return
                        }
                        for _, buf := range sess.Receive(msg, buf[:n]) {
+                               sess.debugLogf("handler: to queue: %s", string(buf))
                                queue <- buf
                        }
                }
@@ -81,6 +82,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
                for e := range queue {
                        if buf, ok := e.([]byte); ok {
                                ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+                               sess.debugLogf("handler: send msg: %s", string(buf))
                                _, err := ws.Write(buf)
                                if err != nil {
                                        sess.debugLogf("handler: write {}: %s", err)
index 090e5ff7caf86809c72241679b2d6441b01e8cb5..1dc06b852260e1e6b54b20870491ad363f57699f 100644 (file)
@@ -46,7 +46,7 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
        pc.tidy()
        now := time.Now()
        if perm, ok := pc.cache[uuid]; ok && now.Sub(perm.Time) < maxPermCacheAge {
-               debugLogf("perm (cached): %+q %+q => %v", pc.Client.AuthToken, uuid, perm.allowed)
+               debugLogf("perm (cached): %+q %+q ...%v", pc.Client.AuthToken, uuid, perm.allowed)
                return perm.allowed, nil
        }
        var buf map[string]interface{}
@@ -64,10 +64,10 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
        } else if txErr, ok := err.(arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound {
                allowed = false
        } else {
-               errorLogf("perm err: %+q %+q: %s", pc.Client.AuthToken, uuid, err)
+               errorLogf("perm err: %+q %+q: %T %s", pc.Client.AuthToken, uuid, err, err)
                return false, err
        }
-       debugLogf("perm: %+q %+q => %v", pc.Client.AuthToken, uuid, allowed)
+       debugLogf("perm: %+q %+q ...%v", pc.Client.AuthToken, uuid, allowed)
        pc.cache[uuid] = cacheEnt{Time: now, allowed: allowed}
        return allowed, nil
 }
index a5af9f765ba48f9187bb79322774cd252756fe1a..08fbee1d584c229e3fce106947311af5f4d165c3 100644 (file)
@@ -152,6 +152,11 @@ func (ps *pgEventSource) NewSink() eventSink {
        return sink
 }
 
+func (ps *pgEventSource) DB() *sql.DB {
+       ps.setupOnce.Do(ps.setup)
+       return ps.db
+}
+
 type pgEventSink struct {
        channel chan *event
        source  *pgEventSource
index ba8b46bb775098c9894a544087bc14ea355c4d54..2a4e52e31a9d159805ef306353bd0b29f66f37cb 100644 (file)
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "database/sql"
        "encoding/json"
        "log"
        "net/http"
@@ -25,12 +26,13 @@ func (rtr *router) setup() {
        rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1))
 }
 
-func (rtr *router) makeServer(newSession func(wsConn, arvados.Client) (session, error)) *websocket.Server {
+func (rtr *router) makeServer(newSession func(wsConn, arvados.Client, *sql.DB) (session, error)) *websocket.Server {
        handler := &handler{
-               Client:      rtr.Config.Client,
                PingTimeout: rtr.Config.PingTimeout.Duration(),
                QueueSize:   rtr.Config.ClientEventQueue,
-               NewSession:  newSession,
+               NewSession:  func(ws wsConn) (session, error) {
+                       return newSession(ws, rtr.Config.Client, rtr.eventSource.DB())
+               },
        }
        return &websocket.Server{
                Handshake: func(c *websocket.Config, r *http.Request) error {
index 5622304bf4df562e56b1f478f3f3ecb23bcd9333..33cdb2f3f05a7166e7f18320657eac2fbcb787c2 100644 (file)
@@ -1,11 +1,10 @@
 package main
 
 import (
+       "database/sql"
        "encoding/json"
        "errors"
-       "fmt"
        "log"
-       "net/url"
        "sync"
        "time"
 
@@ -24,15 +23,17 @@ var (
 
 type v0session struct {
        ws            wsConn
+       db            *sql.DB
        permChecker   permChecker
        subscriptions []v0subscribe
        mtx           sync.Mutex
        setupOnce     sync.Once
 }
 
-func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
+func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
        sess := &v0session{
                ws:          ws,
+               db:          db,
                permChecker: NewPermChecker(ac),
        }
 
@@ -67,42 +68,7 @@ func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte
                sess.subscriptions = append(sess.subscriptions, sub)
                sess.mtx.Unlock()
 
-               resp := [][]byte{v0subscribeOK}
-
-               if sub.LastLogID > 0 {
-                       // Hack 1: use the permission checker's
-                       // Arvados client to retrieve old log IDs. Use
-                       // "created < 2 hours ago" to ensure the
-                       // database query doesn't get too expensive
-                       // when our client gives us last_log_id==1.
-                       ac := sess.permChecker.(*cachingPermChecker).Client
-                       var old arvados.LogList
-                       ac.RequestAndDecode(&old, "GET", "arvados/v1/logs", nil, url.Values{
-                               "limit": {"1000"},
-                               "filters": {fmt.Sprintf(
-                                       `[["id",">",%d],["created_at",">","%s"]]`,
-                                       sub.LastLogID,
-                                       time.Now().UTC().Add(-2*time.Hour).Format(time.RFC3339Nano))},
-                       })
-                       for _, log := range old.Items {
-                               // Hack 2: populate the event's logRow
-                               // using the API response -- otherwise
-                               // Detail() would crash because e.db
-                               // is nil.
-                               e := &event{
-                                       LogID:    log.ID,
-                                       Received: time.Now(),
-                                       logRow:   &log,
-                               }
-                               msg, err := sess.EventMessage(e)
-                               if err != nil {
-                                       continue
-                               }
-                               resp = append(resp, msg)
-                       }
-               }
-
-               return [][]byte{v0subscribeOK}
+               return append([][]byte{v0subscribeOK}, sub.getOldEvents(sess)...)
        }
        return [][]byte{v0subscribeFail}
 }
@@ -159,6 +125,58 @@ func (sess *v0session) Filter(e *event) bool {
        return false
 }
 
+func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
+       if sub.LastLogID == 0 {
+               return
+       }
+       debugLogf("getOldEvents(%d)", sub.LastLogID)
+       // Here we do a "select id" query and queue an event for every
+       // log since the given ID, then use (*event).Detail() to
+       // retrieve the whole row and decide whether to send it. This
+       // approach is very inefficient if the subscriber asks for
+       // last_log_id==1, even if the filters end up matching very
+       // few events.
+       //
+       // To mitigate this, only consider logs created within the last
+       // 10 minutes when retrieving the list of old event IDs.
+       rows, err := sess.db.Query(
+               `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
+               sub.LastLogID,
+               time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
+       if err != nil {
+               errorLogf("db.Query: %s", err)
+               return
+       }
+       for rows.Next() {
+               var id uint64
+               err := rows.Scan(&id)
+               if err != nil {
+                       errorLogf("Scan: %s", err)
+                       continue
+               }
+               e := &event{
+                       LogID:    id,
+                       Received: time.Now(),
+                       db:       sess.db,
+               }
+               if !sub.match(e) {
+                       debugLogf("skip old event %+v", e)
+                       continue
+               }
+               msg, err := sess.EventMessage(e)
+               if err != nil {
+                       debugLogf("event marshal: %s", err)
+                       continue
+               }
+               debugLogf("old event: %s", string(msg))
+               msgs = append(msgs, msg)
+       }
+       if err := rows.Err(); err != nil {
+               errorLogf("db.Query: %s", err)
+       }
+       return
+}
+
 type v0subscribe struct {
        Method    string
        Filters   []v0filter
@@ -172,15 +190,16 @@ type v0filter [3]interface{}
 func (sub *v0subscribe) match(e *event) bool {
        detail := e.Detail()
        if detail == nil {
+               debugLogf("match(%d): failed on no detail", e.LogID)
                return false
        }
-       debugLogf("sub.match: len(funcs)==%d", len(sub.funcs))
        for i, f := range sub.funcs {
                if !f(e) {
-                       debugLogf("sub.match: failed on func %d", i)
+                       debugLogf("match(%d): failed on func %d", e.LogID, i)
                        return false
                }
        }
+       debugLogf("match(%d): passed %d funcs", e.LogID, len(sub.funcs))
        return true
 }
 
index bc09ed0db78f7cf59eb129abf7452edb1d6364b4..60d12c449cfb6f78f36cab87b6527dccf4d610ac 100644 (file)
@@ -1,11 +1,12 @@
 package main
 
 import (
+       "database/sql"
        "errors"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
-func NewSessionV1(ws wsConn, ac arvados.Client) (session, error) {
+func NewSessionV1(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
        return nil, errors.New("Not implemented")
 }