8460: Add event_at and props.{new,old}.{pdh,owner} to websocket v0. Fix fuse crash...
[arvados.git] / services / ws / session_v0.go
1 package main
2
3 import (
4         "database/sql"
5         "encoding/json"
6         "errors"
7         "sync"
8         "sync/atomic"
9         "time"
10
11         "git.curoverse.com/arvados.git/sdk/go/arvados"
12         "github.com/Sirupsen/logrus"
13 )
14
15 var (
16         errQueueFull   = errors.New("client queue full")
17         errFrameTooBig = errors.New("frame too big")
18
19         sendObjectAttributes = []string{"state", "name", "owner_uuid", "portable_data_hash"}
20
21         v0subscribeOK   = []byte(`{"status":200}`)
22         v0subscribeFail = []byte(`{"status":400}`)
23 )
24
25 type v0session struct {
26         ac            *arvados.Client
27         ws            wsConn
28         sendq         chan<- interface{}
29         db            *sql.DB
30         permChecker   permChecker
31         subscriptions []v0subscribe
32         lastMsgID     uint64
33         log           *logrus.Entry
34         mtx           sync.Mutex
35         setupOnce     sync.Once
36 }
37
38 func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) {
39         sess := &v0session{
40                 sendq:       sendq,
41                 ws:          ws,
42                 db:          db,
43                 ac:          ac,
44                 permChecker: pc,
45                 log:         logger(ws.Request().Context()),
46         }
47
48         err := ws.Request().ParseForm()
49         if err != nil {
50                 sess.log.WithError(err).Error("ParseForm failed")
51                 return nil, err
52         }
53         token := ws.Request().Form.Get("api_token")
54         sess.permChecker.SetToken(token)
55         sess.log.WithField("token", token).Debug("set token")
56
57         return sess, nil
58 }
59
60 func (sess *v0session) Receive(buf []byte) error {
61         var sub v0subscribe
62         if err := json.Unmarshal(buf, &sub); err != nil {
63                 sess.log.WithError(err).Info("invalid message from client")
64         } else if sub.Method == "subscribe" {
65                 sub.prepare(sess)
66                 sess.log.WithField("sub", sub).Debug("sub prepared")
67                 sess.sendq <- v0subscribeOK
68                 sess.mtx.Lock()
69                 sess.subscriptions = append(sess.subscriptions, sub)
70                 sess.mtx.Unlock()
71                 sub.sendOldEvents(sess)
72                 return nil
73         } else {
74                 sess.log.WithField("Method", sub.Method).Info("unknown method")
75         }
76         sess.sendq <- v0subscribeFail
77         return nil
78 }
79
80 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
81         detail := e.Detail()
82         if detail == nil {
83                 return nil, nil
84         }
85
86         ok, err := sess.permChecker.Check(detail.ObjectUUID)
87         if err != nil || !ok {
88                 return nil, err
89         }
90
91         kind, _ := sess.ac.KindForUUID(detail.ObjectUUID)
92         msg := map[string]interface{}{
93                 "msgID":             atomic.AddUint64(&sess.lastMsgID, 1),
94                 "id":                detail.ID,
95                 "uuid":              detail.UUID,
96                 "object_uuid":       detail.ObjectUUID,
97                 "object_owner_uuid": detail.ObjectOwnerUUID,
98                 "object_kind":       kind,
99                 "event_type":        detail.EventType,
100                 "event_at":          detail.EventAt,
101         }
102         if detail.Properties != nil && detail.Properties["text"] != nil {
103                 msg["properties"] = detail.Properties
104         } else {
105                 msgProps := map[string]map[string]interface{}{}
106                 for _, ak := range []string{"old_attributes", "new_attributes"} {
107                         eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
108                         if !ok {
109                                 continue
110                         }
111                         msgAttrs := map[string]interface{}{}
112                         for _, k := range sendObjectAttributes {
113                                 if v, ok := eventAttrs[k]; ok {
114                                         msgAttrs[k] = v
115                                 }
116                         }
117                         msgProps[ak] = msgAttrs
118                 }
119                 msg["properties"] = msgProps
120         }
121         return json.Marshal(msg)
122 }
123
124 func (sess *v0session) Filter(e *event) bool {
125         sess.mtx.Lock()
126         defer sess.mtx.Unlock()
127         for _, sub := range sess.subscriptions {
128                 if sub.match(sess, e) {
129                         return true
130                 }
131         }
132         return false
133 }
134
135 func (sub *v0subscribe) sendOldEvents(sess *v0session) {
136         if sub.LastLogID == 0 {
137                 return
138         }
139         sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
140         // Here we do a "select id" query and queue an event for every
141         // log since the given ID, then use (*event)Detail() to
142         // retrieve the whole row and decide whether to send it. This
143         // approach is very inefficient if the subscriber asks for
144         // last_log_id==1, even if the filters end up matching very
145         // few events.
146         //
147         // To mitigate this, filter on "created > 10 minutes ago" when
148         // retrieving the list of old event IDs to consider.
149         rows, err := sess.db.Query(
150                 `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
151                 sub.LastLogID,
152                 time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
153         if err != nil {
154                 sess.log.WithError(err).Error("db.Query failed")
155                 return
156         }
157         for rows.Next() {
158                 var id uint64
159                 err := rows.Scan(&id)
160                 if err != nil {
161                         sess.log.WithError(err).Error("row Scan failed")
162                         continue
163                 }
164                 for len(sess.sendq)*2 > cap(sess.sendq) {
165                         // Ugly... but if we fill up the whole client
166                         // queue with a backlog of old events, a
167                         // single new event will overflow it and
168                         // terminate the connection, and then the
169                         // client will probably reconnect and do the
170                         // same thing all over again.
171                         time.Sleep(100 * time.Millisecond)
172                 }
173                 now := time.Now()
174                 e := &event{
175                         LogID:    id,
176                         Received: now,
177                         Ready:    now,
178                         db:       sess.db,
179                 }
180                 if sub.match(sess, e) {
181                         select {
182                         case sess.sendq <- e:
183                         case <-sess.ws.Request().Context().Done():
184                                 return
185                         }
186                 }
187         }
188         if err := rows.Err(); err != nil {
189                 sess.log.WithError(err).Error("db.Query failed")
190         }
191 }
192
193 type v0subscribe struct {
194         Method    string
195         Filters   []v0filter
196         LastLogID int64 `json:"last_log_id"`
197
198         funcs []func(*event) bool
199 }
200
201 type v0filter [3]interface{}
202
203 func (sub *v0subscribe) match(sess *v0session, e *event) bool {
204         log := sess.log.WithField("LogID", e.LogID)
205         detail := e.Detail()
206         if detail == nil {
207                 log.Error("match failed, no detail")
208                 return false
209         }
210         log = log.WithField("funcs", len(sub.funcs))
211         for i, f := range sub.funcs {
212                 if !f(e) {
213                         log.WithField("func", i).Debug("match failed")
214                         return false
215                 }
216         }
217         log.Debug("match passed")
218         return true
219 }
220
221 func (sub *v0subscribe) prepare(sess *v0session) {
222         for _, f := range sub.Filters {
223                 if len(f) != 3 {
224                         continue
225                 }
226                 if col, ok := f[0].(string); ok && col == "event_type" {
227                         op, ok := f[1].(string)
228                         if !ok || op != "in" {
229                                 continue
230                         }
231                         arr, ok := f[2].([]interface{})
232                         if !ok {
233                                 continue
234                         }
235                         var strs []string
236                         for _, s := range arr {
237                                 if s, ok := s.(string); ok {
238                                         strs = append(strs, s)
239                                 }
240                         }
241                         sub.funcs = append(sub.funcs, func(e *event) bool {
242                                 for _, s := range strs {
243                                         if s == e.Detail().EventType {
244                                                 return true
245                                         }
246                                 }
247                                 return false
248                         })
249                 } else if ok && col == "created_at" {
250                         op, ok := f[1].(string)
251                         if !ok {
252                                 continue
253                         }
254                         tstr, ok := f[2].(string)
255                         if !ok {
256                                 continue
257                         }
258                         t, err := time.Parse(time.RFC3339Nano, tstr)
259                         if err != nil {
260                                 sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
261                                 continue
262                         }
263                         var fn func(*event) bool
264                         switch op {
265                         case ">=":
266                                 fn = func(e *event) bool {
267                                         return !e.Detail().CreatedAt.Before(t)
268                                 }
269                         case "<=":
270                                 fn = func(e *event) bool {
271                                         return !e.Detail().CreatedAt.After(t)
272                                 }
273                         case ">":
274                                 fn = func(e *event) bool {
275                                         return e.Detail().CreatedAt.After(t)
276                                 }
277                         case "<":
278                                 fn = func(e *event) bool {
279                                         return e.Detail().CreatedAt.Before(t)
280                                 }
281                         case "=":
282                                 fn = func(e *event) bool {
283                                         return e.Detail().CreatedAt.Equal(t)
284                                 }
285                         default:
286                                 sess.log.WithField("operator", op).Info("bogus operator")
287                                 continue
288                         }
289                         sub.funcs = append(sub.funcs, fn)
290                 }
291         }
292 }