1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
15 "git.curoverse.com/arvados.git/sdk/go/arvados"
16 "github.com/Sirupsen/logrus"
// Errors with fixed messages for a full client queue and for a frame that
// is too big.
// NOTE(review): the code that returns them is not visible in this excerpt.
20 errQueueFull = errors.New("client queue full")
21 errFrameTooBig = errors.New("frame too big")
23 // Send clients only these keys from the
24 // log.properties.old_attributes and
25 // log.properties.new_attributes hashes.
// EventMessage iterates over this list when it copies attributes into an
// outgoing message. The list entries are not visible in this excerpt.
26 sendObjectAttributes = []string{
// Canned JSON payloads that Receive queues on the session's send channel:
// v0subscribeOK carries status 200 and v0subscribeFail carries status 400.
34 v0subscribeOK = []byte(`{"status":200}`)
35 v0subscribeFail = []byte(`{"status":400}`)
// v0session holds the state of one v0 client session.
// NOTE(review): only some of its fields are visible in this excerpt; other
// fields (for example log, db, ws, ac, mtx and lastMsgID) are used by the
// methods below but declared on lines not shown.
38 type v0session struct {
// sendq is a send-only channel. Receive queues the canned subscribe replies
// on it and sendOldEvents queues events on it.
41 sendq chan<- interface{}
// permChecker receives the client's token in newSessionV0 (SetToken) and is
// asked about each event's object UUID in EventMessage (Check).
43 permChecker permChecker
// subscriptions collects the subscriptions appended by Receive; Filter
// ranges over it to test incoming events.
44 subscriptions []v0subscribe
51 // newSessionV0 returns a v0 session: a partial port of the Rails/puma
52 // implementation, with just enough functionality to support Workbench
// NOTE(review): the end of the sentence above is on a line not visible in
// this excerpt.
//
// The visible part of the constructor derives the session logger from the
// websocket request's context, parses the request form, and passes the
// "api_token" form value to the permission checker.
54 func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) {
// The session logger comes from the request context.
61 log: logger(ws.Request().Context()),
// The form must be parsed before Form.Get can be used below; a parse
// failure is logged at error level.
64 err := ws.Request().ParseForm()
66 sess.log.WithError(err).Error("ParseForm failed")
// The client's token is read from the "api_token" form value and handed to
// the permission checker.
69 token := ws.Request().Form.Get("api_token")
70 sess.permChecker.SetToken(token)
// NOTE(review): this writes the raw API token to the debug log — confirm
// that exposing the credential in logs is acceptable.
71 sess.log.WithField("token", token).Debug("set token")
// Receive handles one message from the client. The buffer is decoded as
// JSON into sub (declared on a line not shown; presumably a v0subscribe).
// A "subscribe" message is acknowledged with v0subscribeOK, recorded in
// sess.subscriptions, and followed by a replay of old events.
// NOTE(review): the return statements and some branch boundaries are not
// visible in this excerpt.
76 func (sess *v0session) Receive(buf []byte) error {
// Messages that are not valid JSON are logged at info level.
78 if err := json.Unmarshal(buf, &sub); err != nil {
79 sess.log.WithError(err).Info("invalid message from client")
80 } else if sub.Method == "subscribe" {
// Acknowledge the subscription by queueing the status-200 reply.
82 sess.log.WithField("sub", sub).Debug("sub prepared")
83 sess.sendq <- v0subscribeOK
// Record the subscription so Filter can test later events against it.
85 sess.subscriptions = append(sess.subscriptions, sub)
// Replay the backlog the subscriber asked for, if any.
87 sub.sendOldEvents(sess)
// Any other method name is logged at info level.
90 sess.log.WithField("Method", sub.Method).Info("unknown method")
// Queue the status-400 reply. NOTE(review): presumably reached for both the
// invalid-JSON and unknown-method cases — confirm against the full source.
92 sess.sendq <- v0subscribeFail
// EventMessage builds the JSON message sent to the client for event e.
// It checks permission on the event's object UUID, assembles a map of
// message fields and returns the result of json.Marshal on that map.
// NOTE(review): detail is assigned on a line not shown (presumably from
// e.Detail()), and the result of a failed permission check is not visible.
96 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
// Ask the permission checker about the object this event refers to; an
// error and a negative answer take the same branch.
102 ok, err := sess.permChecker.Check(detail.ObjectUUID)
103 if err != nil || !ok {
// The error returned by KindForUUID is deliberately discarded here.
107 kind, _ := sess.ac.KindForUUID(detail.ObjectUUID)
108 msg := map[string]interface{}{
// msgID is produced by atomically incrementing the session's lastMsgID.
109 "msgID": atomic.AddUint64(&sess.lastMsgID, 1),
112 "object_uuid": detail.ObjectUUID,
113 "object_owner_uuid": detail.ObjectOwnerUUID,
115 "event_type": detail.EventType,
116 "event_at": detail.EventAt,
// When the properties contain a non-nil "text" entry, the whole properties
// map is sent as is.
118 if detail.Properties != nil && detail.Properties["text"] != nil {
119 msg["properties"] = detail.Properties
// Build a reduced properties map from the old_attributes and new_attributes
// hashes, looking up only the keys named in sendObjectAttributes.
// NOTE(review): presumably this is the else branch of the check above.
121 msgProps := map[string]map[string]interface{}{}
122 for _, ak := range []string{"old_attributes", "new_attributes"} {
// The attribute hash is only usable if it is a map[string]interface{}.
123 eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
127 msgAttrs := map[string]interface{}{}
128 for _, k := range sendObjectAttributes {
129 if v, ok := eventAttrs[k]; ok {
133 msgProps[ak] = msgAttrs
135 msg["properties"] = msgProps
137 return json.Marshal(msg)
// Filter tests event e against the session's subscriptions by calling
// match on each one.
// NOTE(review): the return statements are not visible in this excerpt;
// presumably it returns true on the first match and false otherwise.
140 func (sess *v0session) Filter(e *event) bool {
// The deferred Unlock implies sess.mtx is locked on a line not shown, so
// the subscriptions slice is read under the mutex.
142 defer sess.mtx.Unlock()
143 for _, sub := range sess.subscriptions {
144 if sub.match(sess, e) {
// sendOldEvents replays log rows that already exist in the database: it
// selects the IDs of rows matching the query below and sends an event on
// sess.sendq for each one that passes sub.match.
// NOTE(review): the body of the LastLogID == 0 check is not visible here;
// presumably it returns early because no backlog was requested — confirm.
151 func (sub *v0subscribe) sendOldEvents(sess *v0session) {
152 if sub.LastLogID == 0 {
155 sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
156 // Here we do a "select id" query and queue an event for every
157 // log since the given ID, then use (*event).Detail() to
158 // retrieve the whole row and decide whether to send it. This
159 // approach is very inefficient if the subscriber asks for
160 // last_log_id==1, even if the filters end up matching very
163 // To mitigate this, filter on "created > 10 minutes ago" when
164 // retrieving the list of old event IDs to consider.
165 rows, err := sess.db.Query(
166 `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
// The second query parameter is the current UTC time minus ten minutes,
// formatted as RFC3339Nano. The first parameter is on a line not shown
// (presumably sub.LastLogID — confirm).
168 time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
170 sess.log.WithError(err).Error("db.Query failed")
// Each row yields one log ID.
176 err := rows.Scan(&id)
178 sess.log.WithError(err).Error("row Scan failed")
// Wait, in 100ms steps, while the send queue is more than half full.
// NOTE(review): this wait loop only sleeps; unlike the send below, it does
// not watch the request context for cancellation.
181 for len(sess.sendq)*2 > cap(sess.sendq) {
182 // Ugly... but if we fill up the whole client
183 // queue with a backlog of old events, a
184 // single new event will overflow it and
185 // terminate the connection, and then the
186 // client will probably reconnect and do the
187 // same thing all over again.
188 time.Sleep(100 * time.Millisecond)
// Only events accepted by this subscription's filters are queued. The
// event value e is built on lines not shown.
197 if sub.match(sess, e) {
// The send is paired with the request context's Done channel, so a blocked
// send does not outlive the request. What happens on cancellation is on a
// line not shown.
199 case sess.sendq <- e:
200 case <-sess.ws.Request().Context().Done():
// NOTE(review): a rows.Err() failure is reported with the same
// "db.Query failed" message used for the Query call above, so the two
// cases cannot be told apart in the log.
205 if err := rows.Err(); err != nil {
206 sess.log.WithError(err).Error("db.Query failed")
// v0subscribe is the message a client sends to subscribe; Receive decodes
// it from JSON.
// NOTE(review): only some fields are visible in this excerpt. Method and
// Filters are used elsewhere in the file but declared on lines not shown.
210 type v0subscribe struct {
// LastLogID is decoded from the "last_log_id" JSON key. sendOldEvents
// checks it against zero and logs it before querying for old events.
213 LastLogID int64 `json:"last_log_id"`
// funcs holds the predicates that prepare builds from the subscription's
// filters; match iterates over them for each event.
215 funcs []func(*event) bool
// v0filter is a three-element array of arbitrary values.
// NOTE(review): presumably this is the element type of v0subscribe.Filters;
// prepare reads each filter's element 0 as a column name, element 1 as an
// operator and element 2 as the operand — confirm against the full source.
218 type v0filter [3]interface{}
// match evaluates the subscription's predicates (sub.funcs) against event
// e and logs the outcome, tagging every log entry with the event's LogID.
// NOTE(review): the conditions and return statements are on lines not shown
// here; the log messages suggest it returns false when the event has no
// detail or a predicate rejects it, and true otherwise — confirm.
220 func (sub *v0subscribe) match(sess *v0session, e *event) bool {
221 log := sess.log.WithField("LogID", e.LogID)
// Logged at error level when no detail is available for the event.
224 log.Error("match failed, no detail")
// Record how many predicates this subscription has.
227 log = log.WithField("funcs", len(sub.funcs))
228 for i, f := range sub.funcs {
// The index of the predicate involved is attached to the debug entry.
230 log.WithField("func", i).Debug("match failed")
234 log.Debug("match passed")
// prepare turns the subscription's filters into predicate functions and
// appends them to sub.funcs. The visible code handles two columns:
// "event_type" with the "in" operator and a list operand, and "created_at"
// with a string operator and an RFC3339Nano timestamp operand.
// NOTE(review): several lines, including the operator case labels and the
// end of the function, are not visible in this excerpt.
238 func (sub *v0subscribe) prepare(sess *v0session) {
239 for _, f := range sub.Filters {
// Element 0 of a filter is the column name; it must be a string.
243 if col, ok := f[0].(string); ok && col == "event_type" {
// For event_type the operator is checked against "in".
244 op, ok := f[1].(string)
245 if !ok || op != "in" {
// The operand must be a JSON array.
248 arr, ok := f[2].([]interface{})
// Only the string elements of the array are collected.
253 for _, s := range arr {
254 if s, ok := s.(string); ok {
255 strs = append(strs, s)
// The predicate compares the event's type with each collected string.
258 sub.funcs = append(sub.funcs, func(e *event) bool {
259 for _, s := range strs {
260 if s == e.Detail().EventType {
// col and ok here are the variables declared in the if statement above;
// they stay in scope for this else branch.
266 } else if ok && col == "created_at" {
267 op, ok := f[1].(string)
// The operand is a timestamp string parsed with the RFC3339Nano layout; a
// parse failure is logged at info level together with the offending value.
271 tstr, ok := f[2].(string)
275 t, err := time.Parse(time.RFC3339Nano, tstr)
277 sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
// One of the comparisons below is chosen according to the operator. The
// case labels are on lines not shown.
280 var fn func(*event) bool
// True when CreatedAt is not before t (at or after t).
283 fn = func(e *event) bool {
284 return !e.Detail().CreatedAt.Before(t)
// True when CreatedAt is not after t (at or before t).
287 fn = func(e *event) bool {
288 return !e.Detail().CreatedAt.After(t)
// True when CreatedAt is strictly after t.
291 fn = func(e *event) bool {
292 return e.Detail().CreatedAt.After(t)
// True when CreatedAt is strictly before t.
295 fn = func(e *event) bool {
296 return e.Detail().CreatedAt.Before(t)
// True when CreatedAt is the same instant as t.
299 fn = func(e *event) bool {
300 return e.Detail().CreatedAt.Equal(t)
// An operator that is not handled is logged at info level.
303 sess.log.WithField("operator", op).Info("bogus operator")
306 sub.funcs = append(sub.funcs, fn)