Merge branch '8784-dir-listings'
[arvados.git] / services / ws / session_v0.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "database/sql"
9         "encoding/json"
10         "errors"
11         "sync"
12         "sync/atomic"
13         "time"
14
15         "git.curoverse.com/arvados.git/sdk/go/arvados"
16         "github.com/Sirupsen/logrus"
17 )
18
// Sentinel errors declared for the websocket session code.
var (
	errQueueFull   = errors.New("client queue full")
	errFrameTooBig = errors.New("frame too big")
)

// sendObjectAttributes lists the attribute names that are copied from
// an event's old_attributes/new_attributes into an outgoing v0
// message; all other attributes are dropped.
var sendObjectAttributes = []string{"state", "name", "owner_uuid", "portable_data_hash"}

// Canned JSON responses queued for the client after a request has
// been handled: OK for an accepted subscribe request, Fail for
// anything else.
var (
	v0subscribeOK   = []byte(`{"status":200}`)
	v0subscribeFail = []byte(`{"status":400}`)
)
28
29 type v0session struct {
30         ac            *arvados.Client
31         ws            wsConn
32         sendq         chan<- interface{}
33         db            *sql.DB
34         permChecker   permChecker
35         subscriptions []v0subscribe
36         lastMsgID     uint64
37         log           *logrus.Entry
38         mtx           sync.Mutex
39         setupOnce     sync.Once
40 }
41
42 // newSessionV0 returns a v0 session: a partial port of the Rails/puma
43 // implementation, with just enough functionality to support Workbench
44 // and arv-mount.
45 func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) {
46         sess := &v0session{
47                 sendq:       sendq,
48                 ws:          ws,
49                 db:          db,
50                 ac:          ac,
51                 permChecker: pc,
52                 log:         logger(ws.Request().Context()),
53         }
54
55         err := ws.Request().ParseForm()
56         if err != nil {
57                 sess.log.WithError(err).Error("ParseForm failed")
58                 return nil, err
59         }
60         token := ws.Request().Form.Get("api_token")
61         sess.permChecker.SetToken(token)
62         sess.log.WithField("token", token).Debug("set token")
63
64         return sess, nil
65 }
66
67 func (sess *v0session) Receive(buf []byte) error {
68         var sub v0subscribe
69         if err := json.Unmarshal(buf, &sub); err != nil {
70                 sess.log.WithError(err).Info("invalid message from client")
71         } else if sub.Method == "subscribe" {
72                 sub.prepare(sess)
73                 sess.log.WithField("sub", sub).Debug("sub prepared")
74                 sess.sendq <- v0subscribeOK
75                 sess.mtx.Lock()
76                 sess.subscriptions = append(sess.subscriptions, sub)
77                 sess.mtx.Unlock()
78                 sub.sendOldEvents(sess)
79                 return nil
80         } else {
81                 sess.log.WithField("Method", sub.Method).Info("unknown method")
82         }
83         sess.sendq <- v0subscribeFail
84         return nil
85 }
86
87 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
88         detail := e.Detail()
89         if detail == nil {
90                 return nil, nil
91         }
92
93         ok, err := sess.permChecker.Check(detail.ObjectUUID)
94         if err != nil || !ok {
95                 return nil, err
96         }
97
98         kind, _ := sess.ac.KindForUUID(detail.ObjectUUID)
99         msg := map[string]interface{}{
100                 "msgID":             atomic.AddUint64(&sess.lastMsgID, 1),
101                 "id":                detail.ID,
102                 "uuid":              detail.UUID,
103                 "object_uuid":       detail.ObjectUUID,
104                 "object_owner_uuid": detail.ObjectOwnerUUID,
105                 "object_kind":       kind,
106                 "event_type":        detail.EventType,
107                 "event_at":          detail.EventAt,
108         }
109         if detail.Properties != nil && detail.Properties["text"] != nil {
110                 msg["properties"] = detail.Properties
111         } else {
112                 msgProps := map[string]map[string]interface{}{}
113                 for _, ak := range []string{"old_attributes", "new_attributes"} {
114                         eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
115                         if !ok {
116                                 continue
117                         }
118                         msgAttrs := map[string]interface{}{}
119                         for _, k := range sendObjectAttributes {
120                                 if v, ok := eventAttrs[k]; ok {
121                                         msgAttrs[k] = v
122                                 }
123                         }
124                         msgProps[ak] = msgAttrs
125                 }
126                 msg["properties"] = msgProps
127         }
128         return json.Marshal(msg)
129 }
130
131 func (sess *v0session) Filter(e *event) bool {
132         sess.mtx.Lock()
133         defer sess.mtx.Unlock()
134         for _, sub := range sess.subscriptions {
135                 if sub.match(sess, e) {
136                         return true
137                 }
138         }
139         return false
140 }
141
142 func (sub *v0subscribe) sendOldEvents(sess *v0session) {
143         if sub.LastLogID == 0 {
144                 return
145         }
146         sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
147         // Here we do a "select id" query and queue an event for every
148         // log since the given ID, then use (*event)Detail() to
149         // retrieve the whole row and decide whether to send it. This
150         // approach is very inefficient if the subscriber asks for
151         // last_log_id==1, even if the filters end up matching very
152         // few events.
153         //
154         // To mitigate this, filter on "created > 10 minutes ago" when
155         // retrieving the list of old event IDs to consider.
156         rows, err := sess.db.Query(
157                 `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
158                 sub.LastLogID,
159                 time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
160         if err != nil {
161                 sess.log.WithError(err).Error("db.Query failed")
162                 return
163         }
164         defer rows.Close()
165         for rows.Next() {
166                 var id uint64
167                 err := rows.Scan(&id)
168                 if err != nil {
169                         sess.log.WithError(err).Error("row Scan failed")
170                         continue
171                 }
172                 for len(sess.sendq)*2 > cap(sess.sendq) {
173                         // Ugly... but if we fill up the whole client
174                         // queue with a backlog of old events, a
175                         // single new event will overflow it and
176                         // terminate the connection, and then the
177                         // client will probably reconnect and do the
178                         // same thing all over again.
179                         time.Sleep(100 * time.Millisecond)
180                 }
181                 now := time.Now()
182                 e := &event{
183                         LogID:    id,
184                         Received: now,
185                         Ready:    now,
186                         db:       sess.db,
187                 }
188                 if sub.match(sess, e) {
189                         select {
190                         case sess.sendq <- e:
191                         case <-sess.ws.Request().Context().Done():
192                                 return
193                         }
194                 }
195         }
196         if err := rows.Err(); err != nil {
197                 sess.log.WithError(err).Error("db.Query failed")
198         }
199 }
200
201 type v0subscribe struct {
202         Method    string
203         Filters   []v0filter
204         LastLogID int64 `json:"last_log_id"`
205
206         funcs []func(*event) bool
207 }
208
209 type v0filter [3]interface{}
210
211 func (sub *v0subscribe) match(sess *v0session, e *event) bool {
212         log := sess.log.WithField("LogID", e.LogID)
213         detail := e.Detail()
214         if detail == nil {
215                 log.Error("match failed, no detail")
216                 return false
217         }
218         log = log.WithField("funcs", len(sub.funcs))
219         for i, f := range sub.funcs {
220                 if !f(e) {
221                         log.WithField("func", i).Debug("match failed")
222                         return false
223                 }
224         }
225         log.Debug("match passed")
226         return true
227 }
228
229 func (sub *v0subscribe) prepare(sess *v0session) {
230         for _, f := range sub.Filters {
231                 if len(f) != 3 {
232                         continue
233                 }
234                 if col, ok := f[0].(string); ok && col == "event_type" {
235                         op, ok := f[1].(string)
236                         if !ok || op != "in" {
237                                 continue
238                         }
239                         arr, ok := f[2].([]interface{})
240                         if !ok {
241                                 continue
242                         }
243                         var strs []string
244                         for _, s := range arr {
245                                 if s, ok := s.(string); ok {
246                                         strs = append(strs, s)
247                                 }
248                         }
249                         sub.funcs = append(sub.funcs, func(e *event) bool {
250                                 for _, s := range strs {
251                                         if s == e.Detail().EventType {
252                                                 return true
253                                         }
254                                 }
255                                 return false
256                         })
257                 } else if ok && col == "created_at" {
258                         op, ok := f[1].(string)
259                         if !ok {
260                                 continue
261                         }
262                         tstr, ok := f[2].(string)
263                         if !ok {
264                                 continue
265                         }
266                         t, err := time.Parse(time.RFC3339Nano, tstr)
267                         if err != nil {
268                                 sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
269                                 continue
270                         }
271                         var fn func(*event) bool
272                         switch op {
273                         case ">=":
274                                 fn = func(e *event) bool {
275                                         return !e.Detail().CreatedAt.Before(t)
276                                 }
277                         case "<=":
278                                 fn = func(e *event) bool {
279                                         return !e.Detail().CreatedAt.After(t)
280                                 }
281                         case ">":
282                                 fn = func(e *event) bool {
283                                         return e.Detail().CreatedAt.After(t)
284                                 }
285                         case "<":
286                                 fn = func(e *event) bool {
287                                         return e.Detail().CreatedAt.Before(t)
288                                 }
289                         case "=":
290                                 fn = func(e *event) bool {
291                                         return e.Detail().CreatedAt.Equal(t)
292                                 }
293                         default:
294                                 sess.log.WithField("operator", op).Info("bogus operator")
295                                 continue
296                         }
297                         sub.funcs = append(sub.funcs, fn)
298                 }
299         }
300 }