X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5ffb79040668114c58bf35c3e18a8302b8d94445..b9f0177e6a477a518fd5a89156fafde57f2dddf8:/services/ws/session_v0.go diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go index 2035acbd5d..2b108a2b5c 100644 --- a/services/ws/session_v0.go +++ b/services/ws/session_v0.go @@ -1,13 +1,15 @@ package main import ( + "database/sql" "encoding/json" "errors" - "log" "sync" + "sync/atomic" "time" "git.curoverse.com/arvados.git/sdk/go/arvados" + "github.com/Sirupsen/logrus" ) var ( @@ -15,55 +17,64 @@ var ( errFrameTooBig = errors.New("frame too big") sendObjectAttributes = []string{"state", "name"} + + v0subscribeOK = []byte(`{"status":200}`) + v0subscribeFail = []byte(`{"status":400}`) ) type v0session struct { + ac *arvados.Client ws wsConn + sendq chan<- interface{} + db *sql.DB permChecker permChecker subscriptions []v0subscribe + lastMsgID uint64 + log *logrus.Entry mtx sync.Mutex setupOnce sync.Once } -func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) { +func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) { sess := &v0session{ + sendq: sendq, ws: ws, - permChecker: NewPermChecker(ac), + db: db, + ac: ac, + permChecker: pc, + log: logger(ws.Request().Context()), } err := ws.Request().ParseForm() if err != nil { - log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err) + sess.log.WithError(err).Error("ParseForm failed") return nil, err } token := ws.Request().Form.Get("api_token") sess.permChecker.SetToken(token) - sess.debugLogf("token = %+q", token) + sess.log.WithField("token", token).Debug("set token") return sess, nil } -func (sess *v0session) debugLogf(s string, args ...interface{}) { - args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...) - debugLogf("%s "+s, args...) 
-} - -func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) []byte { - sess.debugLogf("received message: %+v", msg) +func (sess *v0session) Receive(buf []byte) error { var sub v0subscribe if err := json.Unmarshal(buf, &sub); err != nil { - sess.debugLogf("ignored unrecognized request: %s", err) - return nil - } - if sub.Method == "subscribe" { - sub.prepare() - sess.debugLogf("subscription: %v", sub) + sess.log.WithError(err).Info("invalid message from client") + } else if sub.Method == "subscribe" { + sub.prepare(sess) + sess.log.WithField("sub", sub).Debug("sub prepared") + sess.sendq <- v0subscribeOK sess.mtx.Lock() sess.subscriptions = append(sess.subscriptions, sub) sess.mtx.Unlock() - return []byte(`{"status":200}`) + sub.sendOldEvents(sess) + return nil + } else { + sess.log.WithField("Method", sub.Method).Info("unknown method") } - return []byte(`{"status":400}`) + sess.sendq <- v0subscribeFail + return nil } func (sess *v0session) EventMessage(e *event) ([]byte, error) { @@ -77,12 +88,14 @@ func (sess *v0session) EventMessage(e *event) ([]byte, error) { return nil, err } + kind, _ := sess.ac.KindForUUID(detail.ObjectUUID) msg := map[string]interface{}{ - "msgID": e.Serial, + "msgID": atomic.AddUint64(&sess.lastMsgID, 1), "id": detail.ID, "uuid": detail.UUID, "object_uuid": detail.ObjectUUID, "object_owner_uuid": detail.ObjectOwnerUUID, + "object_kind": kind, "event_type": detail.EventType, } if detail.Properties != nil && detail.Properties["text"] != nil { @@ -111,37 +124,100 @@ func (sess *v0session) Filter(e *event) bool { sess.mtx.Lock() defer sess.mtx.Unlock() for _, sub := range sess.subscriptions { - if sub.match(e) { + if sub.match(sess, e) { return true } } return false } +func (sub *v0subscribe) sendOldEvents(sess *v0session) { + if sub.LastLogID == 0 { + return + } + sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents") + // Here we do a "select id" query and queue an event for every + // log since the given ID, then use (*event)Detail() to + // retrieve the whole row and decide whether to send it. This + // approach is very inefficient if the subscriber asks for + // last_log_id==1, even if the filters end up matching very + // few events. + // + // To mitigate this, filter on "created > 10 minutes ago" when + // retrieving the list of old event IDs to consider. + rows, err := sess.db.Query( + `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`, + sub.LastLogID, + time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano)) + if err != nil { + sess.log.WithError(err).Error("db.Query failed") + return + } + for rows.Next() { + var id uint64 + err := rows.Scan(&id) + if err != nil { + sess.log.WithError(err).Error("row Scan failed") + continue + } + for len(sess.sendq)*2 > cap(sess.sendq) { + // Ugly... but if we fill up the whole client + // queue with a backlog of old events, a + // single new event will overflow it and + // terminate the connection, and then the + // client will probably reconnect and do the + // same thing all over again. 
+ time.Sleep(100 * time.Millisecond) + } + now := time.Now() + e := &event{ + LogID: id, + Received: now, + Ready: now, + db: sess.db, + } + if sub.match(sess, e) { + select { + case sess.sendq <- e: + case <-sess.ws.Request().Context().Done(): + return + } + } + } + if err := rows.Err(); err != nil { + sess.log.WithError(err).Error("db.Query failed") + } +} + type v0subscribe struct { - Method string - Filters []v0filter - funcs []func(*event) bool + Method string + Filters []v0filter + LastLogID int64 `json:"last_log_id"` + + funcs []func(*event) bool } type v0filter [3]interface{} -func (sub *v0subscribe) match(e *event) bool { +func (sub *v0subscribe) match(sess *v0session, e *event) bool { + log := sess.log.WithField("LogID", e.LogID) detail := e.Detail() if detail == nil { + log.Error("match failed, no detail") return false } - debugLogf("sub.match: len(funcs)==%d", len(sub.funcs)) + log = log.WithField("funcs", len(sub.funcs)) for i, f := range sub.funcs { if !f(e) { - debugLogf("sub.match: failed on func %d", i) + log.WithField("func", i).Debug("match failed") return false } } + log.Debug("match passed") return true } -func (sub *v0subscribe) prepare() { +func (sub *v0subscribe) prepare(sess *v0session) { for _, f := range sub.Filters { if len(f) != 3 { continue @@ -162,7 +238,6 @@ func (sub *v0subscribe) prepare() { } } sub.funcs = append(sub.funcs, func(e *event) bool { - debugLogf("event_type func: %v in %v", e.Detail().EventType, strs) for _, s := range strs { if s == e.Detail().EventType { return true @@ -181,36 +256,36 @@ func (sub *v0subscribe) prepare() { } t, err := time.Parse(time.RFC3339Nano, tstr) if err != nil { - debugLogf("time.Parse(%q): %s", tstr, err) + sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed") continue } + var fn func(*event) bool switch op { case ">=": - sub.funcs = append(sub.funcs, func(e *event) bool { - debugLogf("created_at func: %v >= %v", e.Detail().CreatedAt, t) + fn = func(e *event) bool { return !e.Detail().CreatedAt.Before(t) - }) + } case "<=": - sub.funcs = append(sub.funcs, func(e *event) bool { - debugLogf("created_at func: %v <= %v", e.Detail().CreatedAt, t) + fn = func(e *event) bool { return !e.Detail().CreatedAt.After(t) - }) + } case ">": - sub.funcs = append(sub.funcs, func(e *event) bool { - debugLogf("created_at func: %v > %v", e.Detail().CreatedAt, t) + fn = func(e *event) bool { return e.Detail().CreatedAt.After(t) - }) + } case "<": - sub.funcs = append(sub.funcs, func(e *event) bool { - debugLogf("created_at func: %v < %v", e.Detail().CreatedAt, t) + fn = func(e *event) bool { return e.Detail().CreatedAt.Before(t) - }) + } case "=": - sub.funcs = append(sub.funcs, func(e *event) bool { - debugLogf("created_at func: %v = %v", e.Detail().CreatedAt, t) + fn = func(e *event) bool { return e.Detail().CreatedAt.Equal(t) - }) + } + default: + sess.log.WithField("operator", op).Info("bogus operator") + continue } + sub.funcs = append(sub.funcs, fn) } } }
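
Not part of the commit above: a minimal client-side sketch (Go, using golang.org/x/net/websocket) of the v0 wire protocol this session type implements, i.e. send a "subscribe" message, read the {"status":200} or {"status":400} ack that Receive now pushes onto sendq, then stream events. The host, endpoint path (/websocket), and api_token value are placeholder assumptions; the message fields (method, filters, last_log_id) and the event attributes mirror the v0subscribe struct and EventMessage in the diff.

package main

import (
	"fmt"
	"log"

	"golang.org/x/net/websocket"
)

func main() {
	// Placeholder URL and token; the api_token query parameter matches
	// what newSessionV0 reads via ws.Request().Form.Get("api_token").
	url := "ws://ws.example.com/websocket?api_token=your-token-here"
	conn, err := websocket.Dial(url, "", "http://example.com/")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Field names mirror v0subscribe: method, filters, last_log_id.
	// last_log_id > 0 asks the server to replay recent old events
	// (sendOldEvents); 0 skips the replay.
	sub := map[string]interface{}{
		"method": "subscribe",
		"filters": [][]interface{}{
			{"event_type", "in", []string{"create", "update"}},
			{"created_at", ">=", "2017-01-01T00:00:00Z"},
		},
		"last_log_id": 0,
	}
	if err := websocket.JSON.Send(conn, sub); err != nil {
		log.Fatal(err)
	}

	// The first reply is the subscribe ack; later messages are events
	// carrying msgID, object_uuid, object_kind, event_type, and so on.
	for {
		var msg map[string]interface{}
		if err := websocket.JSON.Receive(conn, &msg); err != nil {
			log.Fatal(err)
		}
		fmt.Println(msg)
	}
}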
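
A small standalone sketch of the created_at filter compilation that prepare() performs: each ["created_at", op, timestamp] filter is parsed once with time.Parse(time.RFC3339Nano) and turned into a closure over the parsed time, and unknown operators are rejected (the new default branch above). The helper name compileCreatedAtFilter is illustrative, not from the source.

package main

import (
	"fmt"
	"time"
)

// compileCreatedAtFilter returns a predicate over an event's CreatedAt,
// using the same operator semantics as (*v0subscribe)prepare.
func compileCreatedAtFilter(op, tstr string) (func(time.Time) bool, error) {
	t, err := time.Parse(time.RFC3339Nano, tstr)
	if err != nil {
		return nil, err
	}
	switch op {
	case ">=":
		return func(created time.Time) bool { return !created.Before(t) }, nil
	case "<=":
		return func(created time.Time) bool { return !created.After(t) }, nil
	case ">":
		return func(created time.Time) bool { return created.After(t) }, nil
	case "<":
		return func(created time.Time) bool { return created.Before(t) }, nil
	case "=":
		return func(created time.Time) bool { return created.Equal(t) }, nil
	}
	return nil, fmt.Errorf("bogus operator %q", op)
}

func main() {
	match, err := compileCreatedAtFilter(">=", "2017-01-01T00:00:00Z")
	if err != nil {
		panic(err)
	}
	fmt.Println(match(time.Now())) // true: now is not before 2017-01-01
}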