22235: adjusted collection test
[arvados.git] / services / ws / session_v0.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package ws
6
7 import (
8         "database/sql"
9         "encoding/json"
10         "errors"
11         "reflect"
12         "sync"
13         "sync/atomic"
14         "time"
15
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         "git.arvados.org/arvados.git/sdk/go/ctxlog"
18         "github.com/sirupsen/logrus"
19 )
20
21 var (
22         errQueueFull   = errors.New("client queue full")
23         errFrameTooBig = errors.New("frame too big")
24
25         // Send clients only these keys from the
26         // log.properties.old_attributes and
27         // log.properties.new_attributes hashes.
28         sendObjectAttributes = []string{
29                 "is_trashed",
30                 "name",
31                 "owner_uuid",
32                 "portable_data_hash",
33                 "requesting_container_uuid",
34                 "state",
35         }
36
37         v0subscribeOK   = []byte(`{"status":200}`)
38         v0subscribeFail = []byte(`{"status":400}`)
39 )
40
41 type v0session struct {
42         ac            *arvados.Client
43         ws            wsConn
44         sendq         chan<- interface{}
45         db            *sql.DB
46         permChecker   permChecker
47         subscriptions []v0subscribe
48         lastMsgID     uint64
49         log           logrus.FieldLogger
50         mtx           sync.Mutex
51         setupOnce     sync.Once
52 }
53
54 // newSessionV0 returns a v0 session: a partial port of the Rails/puma
55 // implementation, with just enough functionality to support Workbench
56 // and arv-mount.
57 func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) {
58         sess := &v0session{
59                 sendq:       sendq,
60                 ws:          ws,
61                 db:          db,
62                 ac:          ac,
63                 permChecker: pc,
64                 log:         ctxlog.FromContext(ws.Request().Context()),
65         }
66
67         err := ws.Request().ParseForm()
68         if err != nil {
69                 sess.log.WithError(err).Error("ParseForm failed")
70                 return nil, err
71         }
72         token := ws.Request().Form.Get("api_token")
73         sess.permChecker.SetToken(token)
74         sess.log.WithField("token", token).Debug("set token")
75
76         return sess, nil
77 }
78
79 func (sess *v0session) Receive(buf []byte) error {
80         var sub v0subscribe
81         if err := json.Unmarshal(buf, &sub); err != nil {
82                 sess.log.WithError(err).Info("invalid message from client")
83         } else if sub.Method == "subscribe" {
84                 sub.prepare(sess)
85                 sess.log.WithField("sub", sub).Debug("sub prepared")
86                 sess.sendq <- v0subscribeOK
87                 sess.mtx.Lock()
88                 sess.subscriptions = append(sess.subscriptions, sub)
89                 sess.mtx.Unlock()
90                 sub.sendOldEvents(sess)
91                 return nil
92         } else if sub.Method == "unsubscribe" {
93                 sess.mtx.Lock()
94                 found := false
95                 for i, s := range sess.subscriptions {
96                         if !reflect.DeepEqual(s.Filters, sub.Filters) {
97                                 continue
98                         }
99                         copy(sess.subscriptions[i:], sess.subscriptions[i+1:])
100                         sess.subscriptions = sess.subscriptions[:len(sess.subscriptions)-1]
101                         found = true
102                         break
103                 }
104                 sess.mtx.Unlock()
105                 sess.log.WithField("sub", sub).WithField("found", found).Debug("unsubscribe")
106                 if found {
107                         sess.sendq <- v0subscribeOK
108                         return nil
109                 }
110         } else {
111                 sess.log.WithField("Method", sub.Method).Info("unknown method")
112         }
113         sess.sendq <- v0subscribeFail
114         return nil
115 }
116
117 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
118         detail := e.Detail()
119         if detail == nil {
120                 return nil, nil
121         }
122
123         var permTarget string
124         if detail.EventType == "delete" {
125                 // It's pointless to check permission by reading
126                 // ObjectUUID if it has just been deleted, but if the
127                 // client has permission on the parent project then
128                 // it's OK to send the event.
129                 permTarget = detail.ObjectOwnerUUID
130         } else {
131                 permTarget = detail.ObjectUUID
132         }
133         ok, err := sess.permChecker.Check(sess.ws.Request().Context(), permTarget)
134         if err != nil || !ok {
135                 return nil, err
136         }
137
138         kind, _ := sess.ac.KindForUUID(detail.ObjectUUID)
139         msg := map[string]interface{}{
140                 "msgID":             atomic.AddUint64(&sess.lastMsgID, 1),
141                 "id":                detail.ID,
142                 "uuid":              detail.UUID,
143                 "object_uuid":       detail.ObjectUUID,
144                 "object_owner_uuid": detail.ObjectOwnerUUID,
145                 "object_kind":       kind,
146                 "event_type":        detail.EventType,
147                 "event_at":          detail.EventAt,
148         }
149         if detail.Properties != nil && detail.Properties["text"] != nil {
150                 msg["properties"] = detail.Properties
151         } else {
152                 msgProps := map[string]map[string]interface{}{}
153                 for _, ak := range []string{"old_attributes", "new_attributes"} {
154                         eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
155                         if !ok {
156                                 continue
157                         }
158                         msgAttrs := map[string]interface{}{}
159                         for _, k := range sendObjectAttributes {
160                                 if v, ok := eventAttrs[k]; ok {
161                                         msgAttrs[k] = v
162                                 }
163                         }
164                         msgProps[ak] = msgAttrs
165                 }
166                 msg["properties"] = msgProps
167         }
168         return json.Marshal(msg)
169 }
170
171 func (sess *v0session) Filter(e *event) bool {
172         sess.mtx.Lock()
173         defer sess.mtx.Unlock()
174         for _, sub := range sess.subscriptions {
175                 if sub.match(sess, e) {
176                         return true
177                 }
178         }
179         return false
180 }
181
182 func (sub *v0subscribe) sendOldEvents(sess *v0session) {
183         if sub.LastLogID == 0 {
184                 return
185         }
186         sess.log.WithField("LastLogID", sub.LastLogID).Debug("sendOldEvents")
187         // Here we do a "select id" query and queue an event for every
188         // log since the given ID, then use (*event)Detail() to
189         // retrieve the whole row and decide whether to send it. This
190         // approach is very inefficient if the subscriber asks for
191         // last_log_id==1, even if the filters end up matching very
192         // few events.
193         //
194         // To mitigate this, filter on "created > 10 minutes ago" when
195         // retrieving the list of old event IDs to consider.
196         rows, err := sess.db.Query(
197                 `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
198                 sub.LastLogID,
199                 time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
200         if err != nil {
201                 sess.log.WithError(err).Error("sendOldEvents db.Query failed")
202                 return
203         }
204
205         var ids []int64
206         for rows.Next() {
207                 var id int64
208                 err := rows.Scan(&id)
209                 if err != nil {
210                         sess.log.WithError(err).Error("sendOldEvents row Scan failed")
211                         continue
212                 }
213                 ids = append(ids, id)
214         }
215         if err := rows.Err(); err != nil {
216                 sess.log.WithError(err).Error("sendOldEvents db.Query failed")
217         }
218         rows.Close()
219
220         for _, id := range ids {
221                 for len(sess.sendq)*2 > cap(sess.sendq) {
222                         // Ugly... but if we fill up the whole client
223                         // queue with a backlog of old events, a
224                         // single new event will overflow it and
225                         // terminate the connection, and then the
226                         // client will probably reconnect and do the
227                         // same thing all over again.
228                         time.Sleep(100 * time.Millisecond)
229                         if sess.ws.Request().Context().Err() != nil {
230                                 // Session terminated while we were sleeping
231                                 return
232                         }
233                 }
234                 now := time.Now()
235                 e := &event{
236                         LogID:    id,
237                         Received: now,
238                         Ready:    now,
239                         db:       sess.db,
240                 }
241                 if sub.match(sess, e) {
242                         select {
243                         case sess.sendq <- e:
244                         case <-sess.ws.Request().Context().Done():
245                                 return
246                         }
247                 }
248         }
249 }
250
251 type v0subscribe struct {
252         Method    string
253         Filters   []v0filter
254         LastLogID int64 `json:"last_log_id"`
255
256         funcs []func(*event) bool
257 }
258
259 type v0filter [3]interface{}
260
261 func (sub *v0subscribe) match(sess *v0session, e *event) bool {
262         log := sess.log.WithField("LogID", e.LogID)
263         detail := e.Detail()
264         if detail == nil {
265                 log.Error("match failed, no detail")
266                 return false
267         }
268         log = log.WithField("funcs", len(sub.funcs))
269         for i, f := range sub.funcs {
270                 if !f(e) {
271                         log.WithField("func", i).Debug("match failed")
272                         return false
273                 }
274         }
275         log.Debug("match passed")
276         return true
277 }
278
279 func (sub *v0subscribe) prepare(sess *v0session) {
280         for _, f := range sub.Filters {
281                 if len(f) != 3 {
282                         continue
283                 }
284                 if col, ok := f[0].(string); ok && col == "event_type" {
285                         op, ok := f[1].(string)
286                         if !ok || op != "in" {
287                                 continue
288                         }
289                         arr, ok := f[2].([]interface{})
290                         if !ok {
291                                 continue
292                         }
293                         var strs []string
294                         for _, s := range arr {
295                                 if s, ok := s.(string); ok {
296                                         strs = append(strs, s)
297                                 }
298                         }
299                         sub.funcs = append(sub.funcs, func(e *event) bool {
300                                 for _, s := range strs {
301                                         if s == e.Detail().EventType {
302                                                 return true
303                                         }
304                                 }
305                                 return false
306                         })
307                 } else if ok && col == "created_at" {
308                         op, ok := f[1].(string)
309                         if !ok {
310                                 continue
311                         }
312                         tstr, ok := f[2].(string)
313                         if !ok {
314                                 continue
315                         }
316                         t, err := time.Parse(time.RFC3339Nano, tstr)
317                         if err != nil {
318                                 sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
319                                 continue
320                         }
321                         var fn func(*event) bool
322                         switch op {
323                         case ">=":
324                                 fn = func(e *event) bool {
325                                         return !e.Detail().CreatedAt.Before(t)
326                                 }
327                         case "<=":
328                                 fn = func(e *event) bool {
329                                         return !e.Detail().CreatedAt.After(t)
330                                 }
331                         case ">":
332                                 fn = func(e *event) bool {
333                                         return e.Detail().CreatedAt.After(t)
334                                 }
335                         case "<":
336                                 fn = func(e *event) bool {
337                                         return e.Detail().CreatedAt.Before(t)
338                                 }
339                         case "=":
340                                 fn = func(e *event) bool {
341                                         return e.Detail().CreatedAt.Equal(t)
342                                 }
343                         default:
344                                 sess.log.WithField("operator", op).Info("bogus operator")
345                                 continue
346                         }
347                         sub.funcs = append(sub.funcs, fn)
348                 }
349         }
350 }