X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c52c3788fbbf161db40118261c4cfff52ebf8ceb..5b54a802c12f1bfd78fe08a3015870f85886032d:/services/ws/session_v0.go?ds=sidebyside diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go index 2bcce6073e..58c64231cb 100644 --- a/services/ws/session_v0.go +++ b/services/ws/session_v0.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( @@ -16,13 +20,23 @@ var ( errQueueFull = errors.New("client queue full") errFrameTooBig = errors.New("frame too big") - sendObjectAttributes = []string{"state", "name"} + // Send clients only these keys from the + // log.properties.old_attributes and + // log.properties.new_attributes hashes. + sendObjectAttributes = []string{ + "is_trashed", + "name", + "owner_uuid", + "portable_data_hash", + "state", + } v0subscribeOK = []byte(`{"status":200}`) v0subscribeFail = []byte(`{"status":400}`) ) type v0session struct { + ac *arvados.Client ws wsConn sendq chan<- interface{} db *sql.DB @@ -34,12 +48,16 @@ type v0session struct { setupOnce sync.Once } -func NewSessionV0(ws wsConn, sendq chan<- interface{}, ac arvados.Client, db *sql.DB) (session, error) { +// newSessionV0 returns a v0 session: a partial port of the Rails/puma +// implementation, with just enough functionality to support Workbench +// and arv-mount. +func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) { sess := &v0session{ sendq: sendq, ws: ws, db: db, - permChecker: NewPermChecker(ac), + ac: ac, + permChecker: pc, log: logger(ws.Request().Context()), } @@ -81,18 +99,31 @@ func (sess *v0session) EventMessage(e *event) ([]byte, error) { return nil, nil } - ok, err := sess.permChecker.Check(detail.ObjectUUID) + var permTarget string + if detail.EventType == "delete" { + // It's pointless to check permission by reading + // ObjectUUID if it has just been deleted, but if the + // client has permission on the parent project then + // it's OK to send the event. + permTarget = detail.ObjectOwnerUUID + } else { + permTarget = detail.ObjectUUID + } + ok, err := sess.permChecker.Check(permTarget) if err != nil || !ok { return nil, err } + kind, _ := sess.ac.KindForUUID(detail.ObjectUUID) msg := map[string]interface{}{ "msgID": atomic.AddUint64(&sess.lastMsgID, 1), "id": detail.ID, "uuid": detail.UUID, "object_uuid": detail.ObjectUUID, "object_owner_uuid": detail.ObjectOwnerUUID, + "object_kind": kind, "event_type": detail.EventType, + "event_at": detail.EventAt, } if detail.Properties != nil && detail.Properties["text"] != nil { msg["properties"] = detail.Properties @@ -131,7 +162,7 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) { if sub.LastLogID == 0 { return } - sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents") + sess.log.WithField("LastLogID", sub.LastLogID).Debug("sendOldEvents") // Here we do a "select id" query and queue an event for every // log since the given ID, then use (*event)Detail() to // retrieve the whole row and decide whether to send it. This @@ -146,16 +177,26 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) { sub.LastLogID, time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano)) if err != nil { - sess.log.WithError(err).Error("db.Query failed") + sess.log.WithError(err).Error("sendOldEvents db.Query failed") return } + + var ids []uint64 for rows.Next() { var id uint64 err := rows.Scan(&id) if err != nil { - sess.log.WithError(err).Error("row Scan failed") + sess.log.WithError(err).Error("sendOldEvents row Scan failed") continue } + ids = append(ids, id) + } + if err := rows.Err(); err != nil { + sess.log.WithError(err).Error("sendOldEvents db.Query failed") + } + rows.Close() + + for _, id := range ids { for len(sess.sendq)*2 > cap(sess.sendq) { // Ugly... but if we fill up the whole client // queue with a backlog of old events, a @@ -165,18 +206,21 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) { // same thing all over again. time.Sleep(100 * time.Millisecond) } + now := time.Now() e := &event{ LogID: id, - Received: time.Now(), + Received: now, + Ready: now, db: sess.db, } if sub.match(sess, e) { - sess.sendq <- e + select { + case sess.sendq <- e: + case <-sess.ws.Request().Context().Done(): + return + } } } - if err := rows.Err(); err != nil { - sess.log.WithError(err).Error("db.Query failed") - } } type v0subscribe struct {