Client arvados.Client
PingTimeout time.Duration
QueueSize int
- NewSession func(wsConn, arvados.Client) (session, error)
+ NewSession func(wsConn) (session, error)
}
+// handlerStats accumulates per-connection statistics returned by
+// (*handler)Handle. NOTE(review): no fields are visible in this hunk;
+// confirm whether counters are added elsewhere in the patch series.
type handlerStats struct {
}
func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
- sess, err := h.NewSession(ws, h.Client)
+ sess, err := h.NewSession(ws)
if err != nil {
log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
return
return
}
for _, buf := range sess.Receive(msg, buf[:n]) {
+ sess.debugLogf("handler: to queue: %s", string(buf))
queue <- buf
}
}
for e := range queue {
if buf, ok := e.([]byte); ok {
ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+ sess.debugLogf("handler: send msg: %s", string(buf))
_, err := ws.Write(buf)
if err != nil {
sess.debugLogf("handler: write {}: %s", err)
pc.tidy()
now := time.Now()
if perm, ok := pc.cache[uuid]; ok && now.Sub(perm.Time) < maxPermCacheAge {
- debugLogf("perm (cached): %+q %+q => %v", pc.Client.AuthToken, uuid, perm.allowed)
+ debugLogf("perm (cached): %+q %+q ...%v", pc.Client.AuthToken, uuid, perm.allowed)
return perm.allowed, nil
}
var buf map[string]interface{}
} else if txErr, ok := err.(arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound {
allowed = false
} else {
- errorLogf("perm err: %+q %+q: %s", pc.Client.AuthToken, uuid, err)
+ errorLogf("perm err: %+q %+q: %T %s", pc.Client.AuthToken, uuid, err, err)
return false, err
}
- debugLogf("perm: %+q %+q => %v", pc.Client.AuthToken, uuid, allowed)
+ debugLogf("perm: %+q %+q ...%v", pc.Client.AuthToken, uuid, allowed)
pc.cache[uuid] = cacheEnt{Time: now, allowed: allowed}
return allowed, nil
}
package main
import (
+ "database/sql"
"encoding/json"
"log"
"net/http"
rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1))
}
-func (rtr *router) makeServer(newSession func(wsConn, arvados.Client) (session, error)) *websocket.Server {
+func (rtr *router) makeServer(newSession func(wsConn, arvados.Client, *sql.DB) (session, error)) *websocket.Server {
handler := &handler{
- Client: rtr.Config.Client,
PingTimeout: rtr.Config.PingTimeout.Duration(),
QueueSize: rtr.Config.ClientEventQueue,
- NewSession: newSession,
+ NewSession: func(ws wsConn) (session, error) {
+ return newSession(ws, rtr.Config.Client, rtr.eventSource.DB())
+ },
}
return &websocket.Server{
Handshake: func(c *websocket.Config, r *http.Request) error {
package main
import (
+ "database/sql"
"encoding/json"
"errors"
- "fmt"
"log"
- "net/url"
"sync"
"time"
type v0session struct {
ws wsConn
+ db *sql.DB
permChecker permChecker
subscriptions []v0subscribe
mtx sync.Mutex
setupOnce sync.Once
}
-func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
+func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
sess := &v0session{
ws: ws,
+ db: db,
permChecker: NewPermChecker(ac),
}
sess.subscriptions = append(sess.subscriptions, sub)
sess.mtx.Unlock()
- resp := [][]byte{v0subscribeOK}
-
- if sub.LastLogID > 0 {
- // Hack 1: use the permission checker's
- // Arvados client to retrieve old log IDs. Use
- // "created < 2 hours ago" to ensure the
- // database query doesn't get too expensive
- // when our client gives us last_log_id==1.
- ac := sess.permChecker.(*cachingPermChecker).Client
- var old arvados.LogList
- ac.RequestAndDecode(&old, "GET", "arvados/v1/logs", nil, url.Values{
- "limit": {"1000"},
- "filters": {fmt.Sprintf(
- `[["id",">",%d],["created_at",">","%s"]]`,
- sub.LastLogID,
- time.Now().UTC().Add(-2*time.Hour).Format(time.RFC3339Nano))},
- })
- for _, log := range old.Items {
- // Hack 2: populate the event's logRow
- // using the API response -- otherwise
- // Detail() would crash because e.db
- // is nil.
- e := &event{
- LogID: log.ID,
- Received: time.Now(),
- logRow: &log,
- }
- msg, err := sess.EventMessage(e)
- if err != nil {
- continue
- }
- resp = append(resp, msg)
- }
- }
-
- return [][]byte{v0subscribeOK}
+ return append([][]byte{v0subscribeOK}, sub.getOldEvents(sess)...)
}
return [][]byte{v0subscribeFail}
}
return false
}
+// getOldEvents returns the backlog of event messages this subscriber
+// missed: one message per log row with id > sub.LastLogID that also
+// passes the subscription's filters. It returns nothing when the
+// client did not request a backlog (LastLogID == 0) or when the
+// database query fails (the error is logged, not propagated).
+func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
+	if sub.LastLogID == 0 {
+		return
+	}
+	debugLogf("getOldEvents(%d)", sub.LastLogID)
+	// Here we do a "select id" query and queue an event for every
+	// log since the given ID, then use (*event)Detail() to
+	// retrieve the whole row and decide whether to send it. This
+	// approach is very inefficient if the subscriber asks for
+	// last_log_id==1, even if the filters end up matching very
+	// few events.
+	//
+	// To mitigate this, filter on "created > 10 minutes ago" when
+	// retrieving the list of old event IDs to consider.
+	rows, err := sess.db.Query(
+		`SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
+		sub.LastLogID,
+		time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
+	if err != nil {
+		errorLogf("db.Query: %s", err)
+		return
+	}
+	// Release the connection even if a callee panics or a future
+	// edit returns before the result set is fully consumed;
+	// relying on rows.Next() returning false to auto-close is
+	// fragile.
+	defer rows.Close()
+	for rows.Next() {
+		var id uint64
+		err := rows.Scan(&id)
+		if err != nil {
+			errorLogf("Scan: %s", err)
+			continue
+		}
+		// Detail() will fetch the full row lazily via e.db.
+		e := &event{
+			LogID:    id,
+			Received: time.Now(),
+			db:       sess.db,
+		}
+		if !sub.match(e) {
+			debugLogf("skip old event %+v", e)
+			continue
+		}
+		msg, err := sess.EventMessage(e)
+		if err != nil {
+			debugLogf("event marshal: %s", err)
+			continue
+		}
+		debugLogf("old event: %s", string(msg))
+		msgs = append(msgs, msg)
+	}
+	if err := rows.Err(); err != nil {
+		errorLogf("db.Query: %s", err)
+	}
+	return
+}
+
type v0subscribe struct {
Method string
Filters []v0filter
+// match reports whether event e satisfies every compiled filter
+// function of this subscription. An event whose log row cannot be
+// retrieved (Detail() == nil) never matches.
func (sub *v0subscribe) match(e *event) bool {
detail := e.Detail()
if detail == nil {
+debugLogf("match(%d): failed on no detail", e.LogID)
return false
}
-debugLogf("sub.match: len(funcs)==%d", len(sub.funcs))
+// All funcs must pass; bail out on the first failure.
for i, f := range sub.funcs {
if !f(e) {
-debugLogf("sub.match: failed on func %d", i)
+debugLogf("match(%d): failed on func %d", e.LogID, i)
return false
}
}
+debugLogf("match(%d): passed %d funcs", e.LogID, len(sub.funcs))
return true
}