X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1a17734f7264bc74463e1e6fe115cdad6ec4c521..ba1ec0f0b59ab871b6e4faf5e8ae87809fdb85b6:/services/ws/session_v0.go diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go index 41432821db..364555a740 100644 --- a/services/ws/session_v0.go +++ b/services/ws/session_v0.go @@ -9,37 +9,39 @@ import ( "time" "git.curoverse.com/arvados.git/sdk/go/arvados" - log "github.com/Sirupsen/logrus" + "github.com/Sirupsen/logrus" ) var ( errQueueFull = errors.New("client queue full") errFrameTooBig = errors.New("frame too big") - sendObjectAttributes = []string{"state", "name"} + sendObjectAttributes = []string{"state", "name", "owner_uuid", "portable_data_hash"} v0subscribeOK = []byte(`{"status":200}`) v0subscribeFail = []byte(`{"status":400}`) ) type v0session struct { + ac *arvados.Client ws wsConn sendq chan<- interface{} db *sql.DB permChecker permChecker subscriptions []v0subscribe lastMsgID uint64 - log *log.Entry + log *logrus.Entry mtx sync.Mutex setupOnce sync.Once } -func NewSessionV0(ws wsConn, sendq chan<- interface{}, ac arvados.Client, db *sql.DB) (session, error) { +func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) { sess := &v0session{ sendq: sendq, ws: ws, db: db, - permChecker: NewPermChecker(ac), + ac: ac, + permChecker: pc, log: logger(ws.Request().Context()), } @@ -55,14 +57,11 @@ func NewSessionV0(ws wsConn, sendq chan<- interface{}, ac arvados.Client, db *sq return sess, nil } -func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) { - sess.log.WithField("data", msg).Debug("received message") +func (sess *v0session) Receive(buf []byte) error { var sub v0subscribe if err := json.Unmarshal(buf, &sub); err != nil { - sess.log.WithError(err).Info("ignored invalid request") - return - } - if sub.Method == "subscribe" { + sess.log.WithError(err).Info("invalid message from client") + } else if sub.Method == "subscribe" { sub.prepare(sess) sess.log.WithField("sub", sub).Debug("sub prepared") sess.sendq <- v0subscribeOK @@ -70,9 +69,12 @@ func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) { sess.subscriptions = append(sess.subscriptions, sub) sess.mtx.Unlock() sub.sendOldEvents(sess) - return + return nil + } else { + sess.log.WithField("Method", sub.Method).Info("unknown method") } sess.sendq <- v0subscribeFail + return nil } func (sess *v0session) EventMessage(e *event) ([]byte, error) { @@ -86,13 +88,16 @@ func (sess *v0session) EventMessage(e *event) ([]byte, error) { return nil, err } + kind, _ := sess.ac.KindForUUID(detail.ObjectUUID) msg := map[string]interface{}{ "msgID": atomic.AddUint64(&sess.lastMsgID, 1), "id": detail.ID, "uuid": detail.UUID, "object_uuid": detail.ObjectUUID, "object_owner_uuid": detail.ObjectOwnerUUID, + "object_kind": kind, "event_type": detail.EventType, + "event_at": detail.EventAt, } if detail.Properties != nil && detail.Properties["text"] != nil { msg["properties"] = detail.Properties @@ -165,13 +170,19 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) { // same thing all over again. time.Sleep(100 * time.Millisecond) } + now := time.Now() e := &event{ LogID: id, - Received: time.Now(), + Received: now, + Ready: now, db: sess.db, } if sub.match(sess, e) { - sess.sendq <- e + select { + case sess.sendq <- e: + case <-sess.ws.Request().Context().Done(): + return + } } } if err := rows.Err(); err != nil {