+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
"database/sql"
"encoding/json"
"errors"
+ "reflect"
"sync"
"sync/atomic"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/Sirupsen/logrus"
)
errQueueFull = errors.New("client queue full")
errFrameTooBig = errors.New("frame too big")
- sendObjectAttributes = []string{"state", "name"}
+ // Send clients only these keys from the
+ // log.properties.old_attributes and
+ // log.properties.new_attributes hashes.
+ sendObjectAttributes = []string{
+ "is_trashed",
+ "name",
+ "owner_uuid",
+ "portable_data_hash",
+ "state",
+ }
v0subscribeOK = []byte(`{"status":200}`)
v0subscribeFail = []byte(`{"status":400}`)
)
type v0session struct {
+ ac *arvados.Client
ws wsConn
sendq chan<- interface{}
db *sql.DB
setupOnce sync.Once
}
-func NewSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker) (session, error) {
+// newSessionV0 returns a v0 session: a partial port of the Rails/puma
+// implementation, with just enough functionality to support Workbench
+// and arv-mount.
+func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) {
sess := &v0session{
sendq: sendq,
ws: ws,
db: db,
+ ac: ac,
permChecker: pc,
log: logger(ws.Request().Context()),
}
sess.mtx.Unlock()
sub.sendOldEvents(sess)
return nil
+ } else if sub.Method == "unsubscribe" {
+ sess.mtx.Lock()
+ found := false
+ for i, s := range sess.subscriptions {
+ if !reflect.DeepEqual(s.Filters, sub.Filters) {
+ continue
+ }
+ copy(sess.subscriptions[i:], sess.subscriptions[i+1:])
+ sess.subscriptions = sess.subscriptions[:len(sess.subscriptions)-1]
+ found = true
+ break
+ }
+ sess.mtx.Unlock()
+ sess.log.WithField("sub", sub).WithField("found", found).Debug("unsubscribe")
+ if found {
+ sess.sendq <- v0subscribeOK
+ return nil
+ }
} else {
sess.log.WithField("Method", sub.Method).Info("unknown method")
}
return nil, nil
}
- ok, err := sess.permChecker.Check(detail.ObjectUUID)
+ var permTarget string
+ if detail.EventType == "delete" {
+ // It's pointless to check permission by reading
+ // ObjectUUID if it has just been deleted, but if the
+ // client has permission on the parent project then
+ // it's OK to send the event.
+ permTarget = detail.ObjectOwnerUUID
+ } else {
+ permTarget = detail.ObjectUUID
+ }
+ ok, err := sess.permChecker.Check(permTarget)
if err != nil || !ok {
return nil, err
}
+ kind, _ := sess.ac.KindForUUID(detail.ObjectUUID)
msg := map[string]interface{}{
"msgID": atomic.AddUint64(&sess.lastMsgID, 1),
"id": detail.ID,
"uuid": detail.UUID,
"object_uuid": detail.ObjectUUID,
"object_owner_uuid": detail.ObjectOwnerUUID,
+ "object_kind": kind,
"event_type": detail.EventType,
+ "event_at": detail.EventAt,
}
if detail.Properties != nil && detail.Properties["text"] != nil {
msg["properties"] = detail.Properties
if sub.LastLogID == 0 {
return
}
- sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
+ sess.log.WithField("LastLogID", sub.LastLogID).Debug("sendOldEvents")
// Here we do a "select id" query and queue an event for every
// log since the given ID, then use (*event)Detail() to
// retrieve the whole row and decide whether to send it. This
sub.LastLogID,
time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
if err != nil {
- sess.log.WithError(err).Error("db.Query failed")
+ sess.log.WithError(err).Error("sendOldEvents db.Query failed")
return
}
+
+ var ids []uint64
for rows.Next() {
var id uint64
err := rows.Scan(&id)
if err != nil {
- sess.log.WithError(err).Error("row Scan failed")
+ sess.log.WithError(err).Error("sendOldEvents row Scan failed")
continue
}
+ ids = append(ids, id)
+ }
+ if err := rows.Err(); err != nil {
+ sess.log.WithError(err).Error("sendOldEvents db.Query failed")
+ }
+ rows.Close()
+
+ for _, id := range ids {
for len(sess.sendq)*2 > cap(sess.sendq) {
// Ugly... but if we fill up the whole client
// queue with a backlog of old events, a
// client will probably reconnect and do the
// same thing all over again.
time.Sleep(100 * time.Millisecond)
+ if sess.ws.Request().Context().Err() != nil {
+ // Session terminated while we were sleeping
+ return
+ }
}
now := time.Now()
e := &event{
}
}
}
- if err := rows.Err(); err != nil {
- sess.log.WithError(err).Error("db.Query failed")
- }
}
type v0subscribe struct {