7475: Merge branch 'master' into 7475-nodemgr-unsatisfiable-job-comms
[arvados.git] / services / ws / session_v0.go
index 2bcce6073e5ffc575934d7e5ec33c55bc6616c25..58c64231cb53c1204ceed70b0ea030a7050ebb95 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
@@ -16,13 +20,23 @@ var (
        errQueueFull   = errors.New("client queue full")
        errFrameTooBig = errors.New("frame too big")
 
-       sendObjectAttributes = []string{"state", "name"}
+       // Send clients only these keys from the
+       // log.properties.old_attributes and
+       // log.properties.new_attributes hashes.
+       sendObjectAttributes = []string{
+               "is_trashed",
+               "name",
+               "owner_uuid",
+               "portable_data_hash",
+               "state",
+       }
 
        v0subscribeOK   = []byte(`{"status":200}`)
        v0subscribeFail = []byte(`{"status":400}`)
 )
 
 type v0session struct {
+       ac            *arvados.Client
        ws            wsConn
        sendq         chan<- interface{}
        db            *sql.DB
@@ -34,12 +48,16 @@ type v0session struct {
        setupOnce     sync.Once
 }
 
-func NewSessionV0(ws wsConn, sendq chan<- interface{}, ac arvados.Client, db *sql.DB) (session, error) {
+// newSessionV0 returns a v0 session: a partial port of the Rails/puma
+// implementation, with just enough functionality to support Workbench
+// and arv-mount.
+func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) {
        sess := &v0session{
                sendq:       sendq,
                ws:          ws,
                db:          db,
-               permChecker: NewPermChecker(ac),
+               ac:          ac,
+               permChecker: pc,
                log:         logger(ws.Request().Context()),
        }
 
@@ -81,18 +99,31 @@ func (sess *v0session) EventMessage(e *event) ([]byte, error) {
                return nil, nil
        }
 
-       ok, err := sess.permChecker.Check(detail.ObjectUUID)
+       var permTarget string
+       if detail.EventType == "delete" {
+               // It's pointless to check permission by reading
+               // ObjectUUID if it has just been deleted, but if the
+               // client has permission on the parent project then
+               // it's OK to send the event.
+               permTarget = detail.ObjectOwnerUUID
+       } else {
+               permTarget = detail.ObjectUUID
+       }
+       ok, err := sess.permChecker.Check(permTarget)
        if err != nil || !ok {
                return nil, err
        }
 
+       kind, _ := sess.ac.KindForUUID(detail.ObjectUUID)
        msg := map[string]interface{}{
                "msgID":             atomic.AddUint64(&sess.lastMsgID, 1),
                "id":                detail.ID,
                "uuid":              detail.UUID,
                "object_uuid":       detail.ObjectUUID,
                "object_owner_uuid": detail.ObjectOwnerUUID,
+               "object_kind":       kind,
                "event_type":        detail.EventType,
+               "event_at":          detail.EventAt,
        }
        if detail.Properties != nil && detail.Properties["text"] != nil {
                msg["properties"] = detail.Properties
@@ -131,7 +162,7 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
        if sub.LastLogID == 0 {
                return
        }
-       sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
+       sess.log.WithField("LastLogID", sub.LastLogID).Debug("sendOldEvents")
        // Here we do a "select id" query and queue an event for every
         // log since the given ID, then use (*event).Detail() to
        // retrieve the whole row and decide whether to send it. This
@@ -146,16 +177,26 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                sub.LastLogID,
                time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
        if err != nil {
-               sess.log.WithError(err).Error("db.Query failed")
+               sess.log.WithError(err).Error("sendOldEvents db.Query failed")
                return
        }
+
+       var ids []uint64
        for rows.Next() {
                var id uint64
                err := rows.Scan(&id)
                if err != nil {
-                       sess.log.WithError(err).Error("row Scan failed")
+                       sess.log.WithError(err).Error("sendOldEvents row Scan failed")
                        continue
                }
+               ids = append(ids, id)
+       }
+       if err := rows.Err(); err != nil {
+               sess.log.WithError(err).Error("sendOldEvents db.Query failed")
+       }
+       rows.Close()
+
+       for _, id := range ids {
                for len(sess.sendq)*2 > cap(sess.sendq) {
                        // Ugly... but if we fill up the whole client
                        // queue with a backlog of old events, a
@@ -165,18 +206,21 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                        // same thing all over again.
                        time.Sleep(100 * time.Millisecond)
                }
+               now := time.Now()
                e := &event{
                        LogID:    id,
-                       Received: time.Now(),
+                       Received: now,
+                       Ready:    now,
                        db:       sess.db,
                }
                if sub.match(sess, e) {
-                       sess.sendq <- e
+                       select {
+                       case sess.sendq <- e:
+                       case <-sess.ws.Request().Context().Done():
+                               return
+                       }
                }
        }
-       if err := rows.Err(); err != nil {
-               sess.log.WithError(err).Error("db.Query failed")
-       }
 }
 
 type v0subscribe struct {