8460: Add event_at and props.{new,old}.{pdh,owner} to websocket v0. Fix fuse crash...
[arvados.git] / services / ws / session_v0.go
index 41432821db186d61f56e2ad40044d578238e45dc..364555a74087461e799b6d7925dc703e4be3c188 100644 (file)
@@ -9,37 +9,39 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       log "github.com/Sirupsen/logrus"
+       "github.com/Sirupsen/logrus"
 )
 
 var (
        errQueueFull   = errors.New("client queue full")
        errFrameTooBig = errors.New("frame too big")
 
-       sendObjectAttributes = []string{"state", "name"}
+       sendObjectAttributes = []string{"state", "name", "owner_uuid", "portable_data_hash"}
 
        v0subscribeOK   = []byte(`{"status":200}`)
        v0subscribeFail = []byte(`{"status":400}`)
 )
 
 type v0session struct {
+       ac            *arvados.Client
        ws            wsConn
        sendq         chan<- interface{}
        db            *sql.DB
        permChecker   permChecker
        subscriptions []v0subscribe
        lastMsgID     uint64
-       log           *log.Entry
+       log           *logrus.Entry
        mtx           sync.Mutex
        setupOnce     sync.Once
 }
 
-func NewSessionV0(ws wsConn, sendq chan<- interface{}, ac arvados.Client, db *sql.DB) (session, error) {
+func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) {
        sess := &v0session{
                sendq:       sendq,
                ws:          ws,
                db:          db,
-               permChecker: NewPermChecker(ac),
+               ac:          ac,
+               permChecker: pc,
                log:         logger(ws.Request().Context()),
        }
 
@@ -55,14 +57,11 @@ func NewSessionV0(ws wsConn, sendq chan<- interface{}, ac arvados.Client, db *sq
        return sess, nil
 }
 
-func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) {
-       sess.log.WithField("data", msg).Debug("received message")
+func (sess *v0session) Receive(buf []byte) error {
        var sub v0subscribe
        if err := json.Unmarshal(buf, &sub); err != nil {
-               sess.log.WithError(err).Info("ignored invalid request")
-               return
-       }
-       if sub.Method == "subscribe" {
+               sess.log.WithError(err).Info("invalid message from client")
+       } else if sub.Method == "subscribe" {
                sub.prepare(sess)
                sess.log.WithField("sub", sub).Debug("sub prepared")
                sess.sendq <- v0subscribeOK
@@ -70,9 +69,12 @@ func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) {
                sess.subscriptions = append(sess.subscriptions, sub)
                sess.mtx.Unlock()
                sub.sendOldEvents(sess)
-               return
+               return nil
+       } else {
+               sess.log.WithField("Method", sub.Method).Info("unknown method")
        }
        sess.sendq <- v0subscribeFail
+       return nil
 }
 
 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
@@ -86,13 +88,16 @@ func (sess *v0session) EventMessage(e *event) ([]byte, error) {
                return nil, err
        }
 
+       kind, _ := sess.ac.KindForUUID(detail.ObjectUUID)
        msg := map[string]interface{}{
                "msgID":             atomic.AddUint64(&sess.lastMsgID, 1),
                "id":                detail.ID,
                "uuid":              detail.UUID,
                "object_uuid":       detail.ObjectUUID,
                "object_owner_uuid": detail.ObjectOwnerUUID,
+               "object_kind":       kind,
                "event_type":        detail.EventType,
+               "event_at":          detail.EventAt,
        }
        if detail.Properties != nil && detail.Properties["text"] != nil {
                msg["properties"] = detail.Properties
@@ -165,13 +170,19 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                        // same thing all over again.
                        time.Sleep(100 * time.Millisecond)
                }
+               now := time.Now()
                e := &event{
                        LogID:    id,
-                       Received: time.Now(),
+                       Received: now,
+                       Ready:    now,
                        db:       sess.db,
                }
                if sub.match(sess, e) {
-                       sess.sendq <- e
+                       select {
+                       case sess.sendq <- e:
+                       case <-sess.ws.Request().Context().Done():
+                               return
+                       }
                }
        }
        if err := rows.Err(); err != nil {