1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 "git.arvados.org/arvados.git/sdk/go/arvados"
17 "git.arvados.org/arvados.git/sdk/go/ctxlog"
18 "github.com/sirupsen/logrus"
// Package-level values used by the v0 session code.
// NOTE(review): the enclosing declaration keyword and some entries are
// not visible in this excerpt.
//
// Sentinel errors; the places that return them are not visible here.
22 errQueueFull = errors.New("client queue full")
23 errFrameTooBig = errors.New("frame too big")
25 // Send clients only these keys from the
26 // log.properties.old_attributes and
27 // log.properties.new_attributes hashes.
28 sendObjectAttributes = []string{
33 "requesting_container_uuid",
// Canned JSON replies queued on the session's send queue by Receive:
// the 200 reply in its subscribe and unsubscribe branches, the 400
// reply after an unknown method is logged (presumably on other failure
// paths too -- the intervening lines are not visible).
37 v0subscribeOK = []byte(`{"status":200}`)
38 v0subscribeFail = []byte(`{"status":400}`)
// v0session holds the per-connection state of a v0 session.
// NOTE(review): only some fields are visible in this excerpt; the
// methods in this file also reference sess.ws, sess.db, sess.ac,
// sess.mtx and sess.lastMsgID.
41 type v0session struct {
// sendq is the send-only channel on which replies and events are
// queued for the client.
44 sendq chan<- interface{}
// permChecker receives the client's token in newSessionV0 and is asked
// by EventMessage whether the client may see an event's target UUID.
46 permChecker permChecker
// subscriptions is appended to by Receive on "subscribe", trimmed on
// "unsubscribe", and ranged over by Filter.
47 subscriptions []v0subscribe
// log is the logger taken from the request context in newSessionV0.
49 log logrus.FieldLogger
// Review summary of the visible body: the session logger is taken from
// the request context, the request form is parsed (a failure is logged
// as "ParseForm failed"), and the "api_token" form value is handed to
// the permission checker. The return statements are not visible here.
54 // newSessionV0 returns a v0 session: a partial port of the Rails/puma
55 // implementation, with just enough functionality to support Workbench
57 func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) {
64 log: ctxlog.FromContext(ws.Request().Context()),
67 err := ws.Request().ParseForm()
69 sess.log.WithError(err).Error("ParseForm failed")
// Form.Get returns "" when the parameter is absent, so an empty token
// may be passed to SetToken.
72 token := ws.Request().Form.Get("api_token")
73 sess.permChecker.SetToken(token)
// NOTE(review): this writes the raw API token to the debug log;
// consider redacting or truncating it.
74 sess.log.WithField("token", token).Debug("set token")
// Receive handles one message from the client. buf is decoded as JSON
// into a subscription request and dispatched on its Method field:
// "subscribe" appends the request to sess.subscriptions, "unsubscribe"
// removes an entry whose Filters are deeply equal to the request's, and
// any other method is logged as unknown.
// NOTE(review): the return statements, the locking (if any) around
// sess.subscriptions, and the exact branch boundaries are on lines not
// visible in this excerpt.
79 func (sess *v0session) Receive(buf []byte) error {
// A decode failure is logged at Info level rather than Error.
81 if err := json.Unmarshal(buf, &sub); err != nil {
82 sess.log.WithError(err).Info("invalid message from client")
83 } else if sub.Method == "subscribe" {
85 sess.log.WithField("sub", sub).Debug("sub prepared")
// Acknowledge the subscription to the client.
86 sess.sendq <- v0subscribeOK
88 sess.subscriptions = append(sess.subscriptions, sub)
// Replay recent stored events for subscribers that supplied a
// last_log_id (handled inside sendOldEvents).
90 sub.sendOldEvents(sess)
92 } else if sub.Method == "unsubscribe" {
95 for i, s := range sess.subscriptions {
// Subscriptions are identified by their filter list only.
96 if !reflect.DeepEqual(s.Filters, sub.Filters) {
// Delete element i: shift the tail left by one slot, then drop the
// now-duplicated last element.
99 copy(sess.subscriptions[i:], sess.subscriptions[i+1:])
100 sess.subscriptions = sess.subscriptions[:len(sess.subscriptions)-1]
105 sess.log.WithField("sub", sub).WithField("found", found).Debug("unsubscribe")
107 sess.sendq <- v0subscribeOK
111 sess.log.WithField("Method", sub.Method).Info("unknown method")
// Tell the client the request was rejected.
113 sess.sendq <- v0subscribeFail
// EventMessage builds the JSON message sent to the client for event e.
// The visible code checks permission on the event's target UUID, then
// marshals a map holding a per-session message ID, the object and owner
// UUIDs, the event type and time, and a "properties" entry.
// NOTE(review): how detail is obtained from e, and what is returned
// when the permission check fails, are on lines not visible here.
117 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
123 var permTarget string
124 if detail.EventType == "delete" {
125 // It's pointless to check permission by reading
126 // ObjectUUID if it has just been deleted, but if the
127 // client has permission on the parent project then
128 // it's OK to send the event.
129 permTarget = detail.ObjectOwnerUUID
131 permTarget = detail.ObjectUUID
133 ok, err := sess.permChecker.Check(sess.ws.Request().Context(), permTarget)
// A permission-check error takes the same path as a denial.
134 if err != nil || !ok {
// The error result of KindForUUID is discarded.
138 kind, _ := sess.ac.KindForUUID(detail.ObjectUUID)
139 msg := map[string]interface{}{
// msgID is a per-session counter, incremented atomically.
140 "msgID": atomic.AddUint64(&sess.lastMsgID, 1),
143 "object_uuid": detail.ObjectUUID,
144 "object_owner_uuid": detail.ObjectOwnerUUID,
146 "event_type": detail.EventType,
147 "event_at": detail.EventAt,
// Events whose properties carry a non-nil "text" value have their
// properties passed through unchanged.
149 if detail.Properties != nil && detail.Properties["text"] != nil {
150 msg["properties"] = detail.Properties
// Otherwise (presumably an else branch; the separating line is not
// visible) a reduced properties map is built: for old_attributes and
// new_attributes, only keys listed in sendObjectAttributes are looked
// up in the event's attributes.
152 msgProps := map[string]map[string]interface{}{}
153 for _, ak := range []string{"old_attributes", "new_attributes"} {
// Indexing a nil Properties map yields nil, so the assertion simply
// reports ok == false in that case.
154 eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
158 msgAttrs := map[string]interface{}{}
159 for _, k := range sendObjectAttributes {
160 if v, ok := eventAttrs[k]; ok {
164 msgProps[ak] = msgAttrs
166 msg["properties"] = msgProps
168 return json.Marshal(msg)
// Filter checks e against each of the session's subscriptions using
// sub.match.
// NOTE(review): the return statements and the Lock call paired with the
// deferred Unlock are not visible in this excerpt; presumably the
// result is true as soon as any subscription matches.
171 func (sess *v0session) Filter(e *event) bool {
173 defer sess.mtx.Unlock()
174 for _, sub := range sess.subscriptions {
175 if sub.match(sess, e) {
// sendOldEvents replays already-logged events to a subscriber. It
// collects log IDs from the database, then offers each event that
// passes sub.match to sess.sendq, giving up if the request context
// ends.
// NOTE(review): the construction of each event from its ID and the
// early-return statements are on lines not visible in this excerpt.
182 func (sub *v0subscribe) sendOldEvents(sess *v0session) {
// A zero LastLogID is special-cased (presumably "nothing to replay").
183 if sub.LastLogID == 0 {
186 sess.log.WithField("LastLogID", sub.LastLogID).Debug("sendOldEvents")
187 // Here we do a "select id" query and queue an event for every
188 // log since the given ID, then use (*event)Detail() to
189 // retrieve the whole row and decide whether to send it. This
190 // approach is very inefficient if the subscriber asks for
191 // last_log_id==1, even if the filters end up matching very
194 // To mitigate this, filter on "created > 10 minutes ago" when
195 // retrieving the list of old event IDs to consider.
196 rows, err := sess.db.Query(
197 `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
// $2 is the cutoff timestamp: now minus ten minutes, in UTC, formatted
// as RFC 3339 with nanoseconds. The $1 argument is not visible here.
199 time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
201 sess.log.WithError(err).Error("sendOldEvents db.Query failed")
208 err := rows.Scan(&id)
210 sess.log.WithError(err).Error("sendOldEvents row Scan failed")
213 ids = append(ids, id)
// NOTE(review): an iteration error from rows.Err() is logged with the
// same "db.Query failed" text as the Query call above, so the two
// failures cannot be told apart in the logs.
215 if err := rows.Err(); err != nil {
216 sess.log.WithError(err).Error("sendOldEvents db.Query failed")
220 for _, id := range ids {
// Poll every 100ms until the send queue is at most half full.
221 for len(sess.sendq)*2 > cap(sess.sendq) {
222 // Ugly... but if we fill up the whole client
223 // queue with a backlog of old events, a
224 // single new event will overflow it and
225 // terminate the connection, and then the
226 // client will probably reconnect and do the
227 // same thing all over again.
228 time.Sleep(100 * time.Millisecond)
229 if sess.ws.Request().Context().Err() != nil {
230 // Session terminated while we were sleeping
241 if sub.match(sess, e) {
// Either the event is enqueued, or the request context ends first.
243 case sess.sendq <- e:
244 case <-sess.ws.Request().Context().Done():
// v0subscribe is one client request as decoded from JSON by Receive.
// NOTE(review): only some fields are visible in this excerpt; code in
// this file also reads sub.Method and sub.Filters.
251 type v0subscribe struct {
// LastLogID is decoded from the "last_log_id" JSON key. sendOldEvents
// special-cases zero and otherwise logs it before querying old events.
254 LastLogID int64 `json:"last_log_id"`
// funcs holds the filter predicates: prepare appends to it and match
// ranges over it. Being unexported, it is ignored by encoding/json.
256 funcs []func(*event) bool
// v0filter is a single filter condition: a three-element array of
// untyped values. (*v0subscribe).prepare type-asserts element 0 to a
// string column name ("event_type" or "created_at"), element 1 to a
// string operator, and element 2 to the operand (a list for
// "event_type", a timestamp string for "created_at").
259 type v0filter [3]interface{}
// match evaluates sub's prepared filter funcs against e and logs the
// outcome at debug level: "match failed" tagged with the index of a
// func, or "match passed" after the loop. It also logs an error reading
// "match failed, no detail" under a condition not visible here.
// NOTE(review): the return statements are on lines not visible in this
// excerpt; presumably false on any failure and true when every func
// accepts the event.
261 func (sub *v0subscribe) match(sess *v0session, e *event) bool {
// Every log line emitted below carries the event's LogID.
262 log := sess.log.WithField("LogID", e.LogID)
265 log.Error("match failed, no detail")
268 log = log.WithField("funcs", len(sub.funcs))
269 for i, f := range sub.funcs {
271 log.WithField("func", i).Debug("match failed")
275 log.Debug("match passed")
279 func (sub *v0subscribe) prepare(sess *v0session) {
280 for _, f := range sub.Filters {
284 if col, ok := f[0].(string); ok && col == "event_type" {
285 op, ok := f[1].(string)
286 if !ok || op != "in" {
289 arr, ok := f[2].([]interface{})
294 for _, s := range arr {
295 if s, ok := s.(string); ok {
296 strs = append(strs, s)
299 sub.funcs = append(sub.funcs, func(e *event) bool {
300 for _, s := range strs {
301 if s == e.Detail().EventType {
307 } else if ok && col == "created_at" {
308 op, ok := f[1].(string)
312 tstr, ok := f[2].(string)
316 t, err := time.Parse(time.RFC3339Nano, tstr)
318 sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
321 var fn func(*event) bool
324 fn = func(e *event) bool {
325 return !e.Detail().CreatedAt.Before(t)
328 fn = func(e *event) bool {
329 return !e.Detail().CreatedAt.After(t)
332 fn = func(e *event) bool {
333 return e.Detail().CreatedAt.After(t)
336 fn = func(e *event) bool {
337 return e.Detail().CreatedAt.Before(t)
340 fn = func(e *event) bool {
341 return e.Detail().CreatedAt.Equal(t)
344 sess.log.WithField("operator", op).Info("bogus operator")
347 sub.funcs = append(sub.funcs, fn)