12055: Code simplification.
[arvados.git] / services / ws / session_v0.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "database/sql"
9         "encoding/json"
10         "errors"
11         "sync"
12         "sync/atomic"
13         "time"
14
15         "git.curoverse.com/arvados.git/sdk/go/arvados"
16         "github.com/Sirupsen/logrus"
17 )
18
// Sentinel errors for client queue overflow and oversized frames.
var (
	errQueueFull   = errors.New("client queue full")
	errFrameTooBig = errors.New("frame too big")
)

// sendObjectAttributes is the whitelist of keys copied to clients
// from the log.properties.old_attributes and
// log.properties.new_attributes hashes. Every other key is dropped.
var sendObjectAttributes = []string{
	"is_trashed",
	"name",
	"owner_uuid",
	"portable_data_hash",
	"state",
}

// Canned responses queued by Receive after handling a client
// message: OK for an accepted subscribe request, Fail for anything
// else.
var (
	v0subscribeOK   = []byte(`{"status":200}`)
	v0subscribeFail = []byte(`{"status":400}`)
)
37
38 type v0session struct {
39         ac            *arvados.Client
40         ws            wsConn
41         sendq         chan<- interface{}
42         db            *sql.DB
43         permChecker   permChecker
44         subscriptions []v0subscribe
45         lastMsgID     uint64
46         log           *logrus.Entry
47         mtx           sync.Mutex
48         setupOnce     sync.Once
49 }
50
51 // newSessionV0 returns a v0 session: a partial port of the Rails/puma
52 // implementation, with just enough functionality to support Workbench
53 // and arv-mount.
54 func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) {
55         sess := &v0session{
56                 sendq:       sendq,
57                 ws:          ws,
58                 db:          db,
59                 ac:          ac,
60                 permChecker: pc,
61                 log:         logger(ws.Request().Context()),
62         }
63
64         err := ws.Request().ParseForm()
65         if err != nil {
66                 sess.log.WithError(err).Error("ParseForm failed")
67                 return nil, err
68         }
69         token := ws.Request().Form.Get("api_token")
70         sess.permChecker.SetToken(token)
71         sess.log.WithField("token", token).Debug("set token")
72
73         return sess, nil
74 }
75
76 func (sess *v0session) Receive(buf []byte) error {
77         var sub v0subscribe
78         if err := json.Unmarshal(buf, &sub); err != nil {
79                 sess.log.WithError(err).Info("invalid message from client")
80         } else if sub.Method == "subscribe" {
81                 sub.prepare(sess)
82                 sess.log.WithField("sub", sub).Debug("sub prepared")
83                 sess.sendq <- v0subscribeOK
84                 sess.mtx.Lock()
85                 sess.subscriptions = append(sess.subscriptions, sub)
86                 sess.mtx.Unlock()
87                 sub.sendOldEvents(sess)
88                 return nil
89         } else {
90                 sess.log.WithField("Method", sub.Method).Info("unknown method")
91         }
92         sess.sendq <- v0subscribeFail
93         return nil
94 }
95
96 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
97         detail := e.Detail()
98         if detail == nil {
99                 return nil, nil
100         }
101
102         var permTarget string
103         if detail.EventType == "delete" {
104                 // It's pointless to check permission by reading
105                 // ObjectUUID if it has just been deleted, but if the
106                 // client has permission on the parent project then
107                 // it's OK to send the event.
108                 permTarget = detail.ObjectOwnerUUID
109         } else {
110                 permTarget = detail.ObjectUUID
111         }
112         ok, err := sess.permChecker.Check(permTarget)
113         if err != nil || !ok {
114                 return nil, err
115         }
116
117         kind, _ := sess.ac.KindForUUID(detail.ObjectUUID)
118         msg := map[string]interface{}{
119                 "msgID":             atomic.AddUint64(&sess.lastMsgID, 1),
120                 "id":                detail.ID,
121                 "uuid":              detail.UUID,
122                 "object_uuid":       detail.ObjectUUID,
123                 "object_owner_uuid": detail.ObjectOwnerUUID,
124                 "object_kind":       kind,
125                 "event_type":        detail.EventType,
126                 "event_at":          detail.EventAt,
127         }
128         if detail.Properties != nil && detail.Properties["text"] != nil {
129                 msg["properties"] = detail.Properties
130         } else {
131                 msgProps := map[string]map[string]interface{}{}
132                 for _, ak := range []string{"old_attributes", "new_attributes"} {
133                         eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
134                         if !ok {
135                                 continue
136                         }
137                         msgAttrs := map[string]interface{}{}
138                         for _, k := range sendObjectAttributes {
139                                 if v, ok := eventAttrs[k]; ok {
140                                         msgAttrs[k] = v
141                                 }
142                         }
143                         msgProps[ak] = msgAttrs
144                 }
145                 msg["properties"] = msgProps
146         }
147         return json.Marshal(msg)
148 }
149
150 func (sess *v0session) Filter(e *event) bool {
151         sess.mtx.Lock()
152         defer sess.mtx.Unlock()
153         for _, sub := range sess.subscriptions {
154                 if sub.match(sess, e) {
155                         return true
156                 }
157         }
158         return false
159 }
160
// sendOldEvents replays past log entries to a client that supplied a
// non-zero last_log_id in its subscribe request. It selects the IDs
// of logs newer than LastLogID (and created within the last 10
// minutes), and queues an event on sess.sendq for each one that
// matches this subscription.
//
// Database errors are logged and not returned. The function returns
// early if the websocket request's context is done.
func (sub *v0subscribe) sendOldEvents(sess *v0session) {
        if sub.LastLogID == 0 {
                return
        }
        sess.log.WithField("LastLogID", sub.LastLogID).Debug("sendOldEvents")
        // Here we do a "select id" query and queue an event for every
        // log since the given ID, then use (*event)Detail() to
        // retrieve the whole row and decide whether to send it. This
        // approach is very inefficient if the subscriber asks for
        // last_log_id==1, even if the filters end up matching very
        // few events.
        //
        // To mitigate this, filter on "created > 10 minutes ago" when
        // retrieving the list of old event IDs to consider.
        rows, err := sess.db.Query(
                `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
                sub.LastLogID,
                time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
        if err != nil {
                sess.log.WithError(err).Error("sendOldEvents db.Query failed")
                return
        }

        // Collect all IDs first so the result set can be closed
        // before the (possibly slow) sending loop below starts.
        var ids []uint64
        for rows.Next() {
                var id uint64
                err := rows.Scan(&id)
                if err != nil {
                        // A row that cannot be scanned is logged and skipped.
                        sess.log.WithError(err).Error("sendOldEvents row Scan failed")
                        continue
                }
                ids = append(ids, id)
        }
        // An iteration error is logged; the IDs collected so far are
        // still processed.
        if err := rows.Err(); err != nil {
                sess.log.WithError(err).Error("sendOldEvents db.Query failed")
        }
        rows.Close()

        for _, id := range ids {
                // Wait, polling every 100ms, while the send queue is
                // more than half full.
                for len(sess.sendq)*2 > cap(sess.sendq) {
                        // Ugly... but if we fill up the whole client
                        // queue with a backlog of old events, a
                        // single new event will overflow it and
                        // terminate the connection, and then the
                        // client will probably reconnect and do the
                        // same thing all over again.
                        time.Sleep(100 * time.Millisecond)
                        if sess.ws.Request().Context().Err() != nil {
                                // Session terminated while we were sleeping
                                return
                        }
                }
                now := time.Now()
                e := &event{
                        LogID:    id,
                        Received: now,
                        Ready:    now,
                        db:       sess.db,
                }
                if sub.match(sess, e) {
                        // Queue the event, or give up if the request
                        // context is done before the queue accepts it.
                        select {
                        case sess.sendq <- e:
                        case <-sess.ws.Request().Context().Done():
                                return
                        }
                }
        }
}
229
// v0subscribe is a message received from a v0 client, decoded from
// JSON in (*v0session)Receive. Receive only acts on messages whose
// Method is "subscribe".
type v0subscribe struct {
        // Method is the requested operation.
        Method    string
        // Filters are the raw filters supplied by the client; prepare
        // converts the supported ones into entries in funcs.
        Filters   []v0filter
        // LastLogID, when non-zero, asks sendOldEvents to replay logs
        // with a greater ID.
        LastLogID int64 `json:"last_log_id"`

        // funcs are the predicates built by prepare. match requires
        // every one of them to accept an event.
        funcs []func(*event) bool
}
237
// v0filter is one client-supplied filter. prepare reads element 0 as
// a column name, element 1 as an operator, and element 2 as the
// operand.
type v0filter [3]interface{}
239
240 func (sub *v0subscribe) match(sess *v0session, e *event) bool {
241         log := sess.log.WithField("LogID", e.LogID)
242         detail := e.Detail()
243         if detail == nil {
244                 log.Error("match failed, no detail")
245                 return false
246         }
247         log = log.WithField("funcs", len(sub.funcs))
248         for i, f := range sub.funcs {
249                 if !f(e) {
250                         log.WithField("func", i).Debug("match failed")
251                         return false
252                 }
253         }
254         log.Debug("match passed")
255         return true
256 }
257
258 func (sub *v0subscribe) prepare(sess *v0session) {
259         for _, f := range sub.Filters {
260                 if len(f) != 3 {
261                         continue
262                 }
263                 if col, ok := f[0].(string); ok && col == "event_type" {
264                         op, ok := f[1].(string)
265                         if !ok || op != "in" {
266                                 continue
267                         }
268                         arr, ok := f[2].([]interface{})
269                         if !ok {
270                                 continue
271                         }
272                         var strs []string
273                         for _, s := range arr {
274                                 if s, ok := s.(string); ok {
275                                         strs = append(strs, s)
276                                 }
277                         }
278                         sub.funcs = append(sub.funcs, func(e *event) bool {
279                                 for _, s := range strs {
280                                         if s == e.Detail().EventType {
281                                                 return true
282                                         }
283                                 }
284                                 return false
285                         })
286                 } else if ok && col == "created_at" {
287                         op, ok := f[1].(string)
288                         if !ok {
289                                 continue
290                         }
291                         tstr, ok := f[2].(string)
292                         if !ok {
293                                 continue
294                         }
295                         t, err := time.Parse(time.RFC3339Nano, tstr)
296                         if err != nil {
297                                 sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
298                                 continue
299                         }
300                         var fn func(*event) bool
301                         switch op {
302                         case ">=":
303                                 fn = func(e *event) bool {
304                                         return !e.Detail().CreatedAt.Before(t)
305                                 }
306                         case "<=":
307                                 fn = func(e *event) bool {
308                                         return !e.Detail().CreatedAt.After(t)
309                                 }
310                         case ">":
311                                 fn = func(e *event) bool {
312                                         return e.Detail().CreatedAt.After(t)
313                                 }
314                         case "<":
315                                 fn = func(e *event) bool {
316                                         return e.Detail().CreatedAt.Before(t)
317                                 }
318                         case "=":
319                                 fn = func(e *event) bool {
320                                         return e.Detail().CreatedAt.Equal(t)
321                                 }
322                         default:
323                                 sess.log.WithField("operator", op).Info("bogus operator")
324                                 continue
325                         }
326                         sub.funcs = append(sub.funcs, fn)
327                 }
328         }
329 }