8460: Merge branch 'master' into 8460-websocket-go
[arvados.git] / services / ws / session_v0.go
1 package main
2
3 import (
4         "database/sql"
5         "encoding/json"
6         "errors"
7         "sync"
8         "sync/atomic"
9         "time"
10
11         "github.com/Sirupsen/logrus"
12 )
13
14 var (
15         errQueueFull   = errors.New("client queue full")
16         errFrameTooBig = errors.New("frame too big")
17
18         sendObjectAttributes = []string{"state", "name"}
19
20         v0subscribeOK   = []byte(`{"status":200}`)
21         v0subscribeFail = []byte(`{"status":400}`)
22 )
23
24 type v0session struct {
25         ws            wsConn
26         sendq         chan<- interface{}
27         db            *sql.DB
28         permChecker   permChecker
29         subscriptions []v0subscribe
30         lastMsgID     uint64
31         log           *logrus.Entry
32         mtx           sync.Mutex
33         setupOnce     sync.Once
34 }
35
36 func NewSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker) (session, error) {
37         sess := &v0session{
38                 sendq:       sendq,
39                 ws:          ws,
40                 db:          db,
41                 permChecker: pc,
42                 log:         logger(ws.Request().Context()),
43         }
44
45         err := ws.Request().ParseForm()
46         if err != nil {
47                 sess.log.WithError(err).Error("ParseForm failed")
48                 return nil, err
49         }
50         token := ws.Request().Form.Get("api_token")
51         sess.permChecker.SetToken(token)
52         sess.log.WithField("token", token).Debug("set token")
53
54         return sess, nil
55 }
56
57 func (sess *v0session) Receive(buf []byte) error {
58         var sub v0subscribe
59         if err := json.Unmarshal(buf, &sub); err != nil {
60                 sess.log.WithError(err).Info("invalid message from client")
61         } else if sub.Method == "subscribe" {
62                 sub.prepare(sess)
63                 sess.log.WithField("sub", sub).Debug("sub prepared")
64                 sess.sendq <- v0subscribeOK
65                 sess.mtx.Lock()
66                 sess.subscriptions = append(sess.subscriptions, sub)
67                 sess.mtx.Unlock()
68                 sub.sendOldEvents(sess)
69                 return nil
70         } else {
71                 sess.log.WithField("Method", sub.Method).Info("unknown method")
72         }
73         sess.sendq <- v0subscribeFail
74         return nil
75 }
76
77 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
78         detail := e.Detail()
79         if detail == nil {
80                 return nil, nil
81         }
82
83         ok, err := sess.permChecker.Check(detail.ObjectUUID)
84         if err != nil || !ok {
85                 return nil, err
86         }
87
88         msg := map[string]interface{}{
89                 "msgID":             atomic.AddUint64(&sess.lastMsgID, 1),
90                 "id":                detail.ID,
91                 "uuid":              detail.UUID,
92                 "object_uuid":       detail.ObjectUUID,
93                 "object_owner_uuid": detail.ObjectOwnerUUID,
94                 "event_type":        detail.EventType,
95         }
96         if detail.Properties != nil && detail.Properties["text"] != nil {
97                 msg["properties"] = detail.Properties
98         } else {
99                 msgProps := map[string]map[string]interface{}{}
100                 for _, ak := range []string{"old_attributes", "new_attributes"} {
101                         eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
102                         if !ok {
103                                 continue
104                         }
105                         msgAttrs := map[string]interface{}{}
106                         for _, k := range sendObjectAttributes {
107                                 if v, ok := eventAttrs[k]; ok {
108                                         msgAttrs[k] = v
109                                 }
110                         }
111                         msgProps[ak] = msgAttrs
112                 }
113                 msg["properties"] = msgProps
114         }
115         return json.Marshal(msg)
116 }
117
118 func (sess *v0session) Filter(e *event) bool {
119         sess.mtx.Lock()
120         defer sess.mtx.Unlock()
121         for _, sub := range sess.subscriptions {
122                 if sub.match(sess, e) {
123                         return true
124                 }
125         }
126         return false
127 }
128
129 func (sub *v0subscribe) sendOldEvents(sess *v0session) {
130         if sub.LastLogID == 0 {
131                 return
132         }
133         sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
134         // Here we do a "select id" query and queue an event for every
135         // log since the given ID, then use (*event)Detail() to
136         // retrieve the whole row and decide whether to send it. This
137         // approach is very inefficient if the subscriber asks for
138         // last_log_id==1, even if the filters end up matching very
139         // few events.
140         //
141         // To mitigate this, filter on "created > 10 minutes ago" when
142         // retrieving the list of old event IDs to consider.
143         rows, err := sess.db.Query(
144                 `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
145                 sub.LastLogID,
146                 time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
147         if err != nil {
148                 sess.log.WithError(err).Error("db.Query failed")
149                 return
150         }
151         for rows.Next() {
152                 var id uint64
153                 err := rows.Scan(&id)
154                 if err != nil {
155                         sess.log.WithError(err).Error("row Scan failed")
156                         continue
157                 }
158                 for len(sess.sendq)*2 > cap(sess.sendq) {
159                         // Ugly... but if we fill up the whole client
160                         // queue with a backlog of old events, a
161                         // single new event will overflow it and
162                         // terminate the connection, and then the
163                         // client will probably reconnect and do the
164                         // same thing all over again.
165                         time.Sleep(100 * time.Millisecond)
166                 }
167                 now := time.Now()
168                 e := &event{
169                         LogID:    id,
170                         Received: now,
171                         Ready:    now,
172                         db:       sess.db,
173                 }
174                 if sub.match(sess, e) {
175                         select {
176                         case sess.sendq <- e:
177                         case <-sess.ws.Request().Context().Done():
178                                 return
179                         }
180                 }
181         }
182         if err := rows.Err(); err != nil {
183                 sess.log.WithError(err).Error("db.Query failed")
184         }
185 }
186
187 type v0subscribe struct {
188         Method    string
189         Filters   []v0filter
190         LastLogID int64 `json:"last_log_id"`
191
192         funcs []func(*event) bool
193 }
194
195 type v0filter [3]interface{}
196
197 func (sub *v0subscribe) match(sess *v0session, e *event) bool {
198         log := sess.log.WithField("LogID", e.LogID)
199         detail := e.Detail()
200         if detail == nil {
201                 log.Error("match failed, no detail")
202                 return false
203         }
204         log = log.WithField("funcs", len(sub.funcs))
205         for i, f := range sub.funcs {
206                 if !f(e) {
207                         log.WithField("func", i).Debug("match failed")
208                         return false
209                 }
210         }
211         log.Debug("match passed")
212         return true
213 }
214
215 func (sub *v0subscribe) prepare(sess *v0session) {
216         for _, f := range sub.Filters {
217                 if len(f) != 3 {
218                         continue
219                 }
220                 if col, ok := f[0].(string); ok && col == "event_type" {
221                         op, ok := f[1].(string)
222                         if !ok || op != "in" {
223                                 continue
224                         }
225                         arr, ok := f[2].([]interface{})
226                         if !ok {
227                                 continue
228                         }
229                         var strs []string
230                         for _, s := range arr {
231                                 if s, ok := s.(string); ok {
232                                         strs = append(strs, s)
233                                 }
234                         }
235                         sub.funcs = append(sub.funcs, func(e *event) bool {
236                                 for _, s := range strs {
237                                         if s == e.Detail().EventType {
238                                                 return true
239                                         }
240                                 }
241                                 return false
242                         })
243                 } else if ok && col == "created_at" {
244                         op, ok := f[1].(string)
245                         if !ok {
246                                 continue
247                         }
248                         tstr, ok := f[2].(string)
249                         if !ok {
250                                 continue
251                         }
252                         t, err := time.Parse(time.RFC3339Nano, tstr)
253                         if err != nil {
254                                 sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
255                                 continue
256                         }
257                         var fn func(*event) bool
258                         switch op {
259                         case ">=":
260                                 fn = func(e *event) bool {
261                                         return !e.Detail().CreatedAt.Before(t)
262                                 }
263                         case "<=":
264                                 fn = func(e *event) bool {
265                                         return !e.Detail().CreatedAt.After(t)
266                                 }
267                         case ">":
268                                 fn = func(e *event) bool {
269                                         return e.Detail().CreatedAt.After(t)
270                                 }
271                         case "<":
272                                 fn = func(e *event) bool {
273                                         return e.Detail().CreatedAt.Before(t)
274                                 }
275                         case "=":
276                                 fn = func(e *event) bool {
277                                         return e.Detail().CreatedAt.Equal(t)
278                                 }
279                         default:
280                                 sess.log.WithField("operator", op).Info("bogus operator")
281                                 continue
282                         }
283                         sub.funcs = append(sub.funcs, fn)
284                 }
285         }
286 }