X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f675fb2c202516021b961b5aa2de4528ba9f0d1f..2686feb4bf08e16538c376b0dfc88580e1bd8998:/services/ws/session_v0.go diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go index 33cdb2f3f0..309352b39e 100644 --- a/services/ws/session_v0.go +++ b/services/ws/session_v0.go @@ -1,76 +1,116 @@ -package main +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package ws import ( "database/sql" "encoding/json" "errors" - "log" + "reflect" "sync" + "sync/atomic" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/ctxlog" + "github.com/sirupsen/logrus" ) var ( errQueueFull = errors.New("client queue full") errFrameTooBig = errors.New("frame too big") - sendObjectAttributes = []string{"state", "name"} + // Send clients only these keys from the + // log.properties.old_attributes and + // log.properties.new_attributes hashes. + sendObjectAttributes = []string{ + "is_trashed", + "name", + "owner_uuid", + "portable_data_hash", + "state", + } v0subscribeOK = []byte(`{"status":200}`) v0subscribeFail = []byte(`{"status":400}`) ) type v0session struct { + ac *arvados.Client ws wsConn + sendq chan<- interface{} db *sql.DB permChecker permChecker subscriptions []v0subscribe + lastMsgID uint64 + log logrus.FieldLogger mtx sync.Mutex setupOnce sync.Once } -func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) { +// newSessionV0 returns a v0 session: a partial port of the Rails/puma +// implementation, with just enough functionality to support Workbench +// and arv-mount. +func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) { sess := &v0session{ + sendq: sendq, ws: ws, db: db, - permChecker: NewPermChecker(ac), + ac: ac, + permChecker: pc, + log: ctxlog.FromContext(ws.Request().Context()), } err := ws.Request().ParseForm() if err != nil { - log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err) + sess.log.WithError(err).Error("ParseForm failed") return nil, err } token := ws.Request().Form.Get("api_token") sess.permChecker.SetToken(token) - sess.debugLogf("token = %+q", token) + sess.log.WithField("token", token).Debug("set token") return sess, nil } -func (sess *v0session) debugLogf(s string, args ...interface{}) { - args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...) - debugLogf("%s "+s, args...) -} - -func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte { - sess.debugLogf("received message: %+v", msg) +func (sess *v0session) Receive(buf []byte) error { var sub v0subscribe if err := json.Unmarshal(buf, &sub); err != nil { - sess.debugLogf("ignored unrecognized request: %s", err) - return nil - } - if sub.Method == "subscribe" { - sub.prepare() - sess.debugLogf("subscription: %v", sub) + sess.log.WithError(err).Info("invalid message from client") + } else if sub.Method == "subscribe" { + sub.prepare(sess) + sess.log.WithField("sub", sub).Debug("sub prepared") + sess.sendq <- v0subscribeOK sess.mtx.Lock() sess.subscriptions = append(sess.subscriptions, sub) sess.mtx.Unlock() - - return append([][]byte{v0subscribeOK}, sub.getOldEvents(sess)...) + sub.sendOldEvents(sess) + return nil + } else if sub.Method == "unsubscribe" { + sess.mtx.Lock() + found := false + for i, s := range sess.subscriptions { + if !reflect.DeepEqual(s.Filters, sub.Filters) { + continue + } + copy(sess.subscriptions[i:], sess.subscriptions[i+1:]) + sess.subscriptions = sess.subscriptions[:len(sess.subscriptions)-1] + found = true + break + } + sess.mtx.Unlock() + sess.log.WithField("sub", sub).WithField("found", found).Debug("unsubscribe") + if found { + sess.sendq <- v0subscribeOK + return nil + } + } else { + sess.log.WithField("Method", sub.Method).Info("unknown method") } - return [][]byte{v0subscribeFail} + sess.sendq <- v0subscribeFail + return nil } func (sess *v0session) EventMessage(e *event) ([]byte, error) { @@ -79,18 +119,31 @@ func (sess *v0session) EventMessage(e *event) ([]byte, error) { return nil, nil } - ok, err := sess.permChecker.Check(detail.ObjectUUID) + var permTarget string + if detail.EventType == "delete" { + // It's pointless to check permission by reading + // ObjectUUID if it has just been deleted, but if the + // client has permission on the parent project then + // it's OK to send the event. + permTarget = detail.ObjectOwnerUUID + } else { + permTarget = detail.ObjectUUID + } + ok, err := sess.permChecker.Check(sess.ws.Request().Context(), permTarget) if err != nil || !ok { return nil, err } + kind, _ := sess.ac.KindForUUID(detail.ObjectUUID) msg := map[string]interface{}{ - "msgID": e.Serial, + "msgID": atomic.AddUint64(&sess.lastMsgID, 1), "id": detail.ID, "uuid": detail.UUID, "object_uuid": detail.ObjectUUID, "object_owner_uuid": detail.ObjectOwnerUUID, + "object_kind": kind, "event_type": detail.EventType, + "event_at": detail.EventAt, } if detail.Properties != nil && detail.Properties["text"] != nil { msg["properties"] = detail.Properties @@ -118,18 +171,18 @@ func (sess *v0session) Filter(e *event) bool { sess.mtx.Lock() defer sess.mtx.Unlock() for _, sub := range sess.subscriptions { - if sub.match(e) { + if sub.match(sess, e) { return true } } return false } -func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) { +func (sub *v0subscribe) sendOldEvents(sess *v0session) { if sub.LastLogID == 0 { return } - debugLogf("getOldEvents(%d)", sub.LastLogID) + sess.log.WithField("LastLogID", sub.LastLogID).Debug("sendOldEvents") // Here we do a "select id" query and queue an event for every // log since the given ID, then use (*event)Detail() to // retrieve the whole row and decide whether to send it. This @@ -144,37 +197,54 @@ func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) { sub.LastLogID, time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano)) if err != nil { - errorLogf("db.Query: %s", err) + sess.log.WithError(err).Error("sendOldEvents db.Query failed") return } + + var ids []uint64 for rows.Next() { var id uint64 err := rows.Scan(&id) if err != nil { - errorLogf("Scan: %s", err) + sess.log.WithError(err).Error("sendOldEvents row Scan failed") continue } + ids = append(ids, id) + } + if err := rows.Err(); err != nil { + sess.log.WithError(err).Error("sendOldEvents db.Query failed") + } + rows.Close() + + for _, id := range ids { + for len(sess.sendq)*2 > cap(sess.sendq) { + // Ugly... but if we fill up the whole client + // queue with a backlog of old events, a + // single new event will overflow it and + // terminate the connection, and then the + // client will probably reconnect and do the + // same thing all over again. + time.Sleep(100 * time.Millisecond) + if sess.ws.Request().Context().Err() != nil { + // Session terminated while we were sleeping + return + } + } + now := time.Now() e := &event{ LogID: id, - Received: time.Now(), + Received: now, + Ready: now, db: sess.db, } - if !sub.match(e) { - debugLogf("skip old event %+v", e) - continue - } - msg, err := sess.EventMessage(e) - if err != nil { - debugLogf("event marshal: %s", err) - continue + if sub.match(sess, e) { + select { + case sess.sendq <- e: + case <-sess.ws.Request().Context().Done(): + return + } } - debugLogf("old event: %s", string(msg)) - msgs = append(msgs, msg) - } - if err := rows.Err(); err != nil { - errorLogf("db.Query: %s", err) } - return } type v0subscribe struct { @@ -187,23 +257,25 @@ type v0subscribe struct { type v0filter [3]interface{} -func (sub *v0subscribe) match(e *event) bool { +func (sub *v0subscribe) match(sess *v0session, e *event) bool { + log := sess.log.WithField("LogID", e.LogID) detail := e.Detail() if detail == nil { - debugLogf("match(%d): failed on no detail", e.LogID) + log.Error("match failed, no detail") return false } + log = log.WithField("funcs", len(sub.funcs)) for i, f := range sub.funcs { if !f(e) { - debugLogf("match(%d): failed on func %d", e.LogID, i) + log.WithField("func", i).Debug("match failed") return false } } - debugLogf("match(%d): passed %d funcs", e.LogID, len(sub.funcs)) + log.Debug("match passed") return true } -func (sub *v0subscribe) prepare() { +func (sub *v0subscribe) prepare(sess *v0session) { for _, f := range sub.Filters { if len(f) != 3 { continue @@ -224,7 +296,6 @@ func (sub *v0subscribe) prepare() { } } sub.funcs = append(sub.funcs, func(e *event) bool { - debugLogf("event_type func: %v in %v", e.Detail().EventType, strs) for _, s := range strs { if s == e.Detail().EventType { return true @@ -243,36 +314,36 @@ func (sub *v0subscribe) prepare() { } t, err := time.Parse(time.RFC3339Nano, tstr) if err != nil { - debugLogf("time.Parse(%q): %s", tstr, err) + sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed") continue } + var fn func(*event) bool switch op { case ">=": - sub.funcs = append(sub.funcs, func(e *event) bool { - debugLogf("created_at func: %v >= %v", e.Detail().CreatedAt, t) + fn = func(e *event) bool { return !e.Detail().CreatedAt.Before(t) - }) + } case "<=": - sub.funcs = append(sub.funcs, func(e *event) bool { - debugLogf("created_at func: %v <= %v", e.Detail().CreatedAt, t) + fn = func(e *event) bool { return !e.Detail().CreatedAt.After(t) - }) + } case ">": - sub.funcs = append(sub.funcs, func(e *event) bool { - debugLogf("created_at func: %v > %v", e.Detail().CreatedAt, t) + fn = func(e *event) bool { return e.Detail().CreatedAt.After(t) - }) + } case "<": - sub.funcs = append(sub.funcs, func(e *event) bool { - debugLogf("created_at func: %v < %v", e.Detail().CreatedAt, t) + fn = func(e *event) bool { return e.Detail().CreatedAt.Before(t) - }) + } case "=": - sub.funcs = append(sub.funcs, func(e *event) bool { - debugLogf("created_at func: %v = %v", e.Detail().CreatedAt, t) + fn = func(e *event) bool { return e.Detail().CreatedAt.Equal(t) - }) + } + default: + sess.log.WithField("operator", op).Info("bogus operator") + continue } + sub.funcs = append(sub.funcs, fn) } } }