8460: Structured logging.
[arvados.git] / services / ws / session_v0.go
1 package main
2
3 import (
4         "database/sql"
5         "encoding/json"
6         "errors"
7         "sync"
8         "time"
9
10         "git.curoverse.com/arvados.git/sdk/go/arvados"
11         log "github.com/Sirupsen/logrus"
12 )
13
// Sentinel errors. Neither is referenced in this part of the file.
var (
	errQueueFull   = errors.New("client queue full")
	errFrameTooBig = errors.New("frame too big")
)

// sendObjectAttributes lists the attribute names that EventMessage
// copies from an event's "old_attributes" and "new_attributes" into
// the message sent to the client.
var sendObjectAttributes = []string{"state", "name"}

// Canned replies to client requests: Receive answers a "subscribe"
// request with v0subscribeOK and any other method with
// v0subscribeFail.
var (
	v0subscribeOK   = []byte(`{"status":200}`)
	v0subscribeFail = []byte(`{"status":400}`)
)
23
24 type v0session struct {
25         ws            wsConn
26         db            *sql.DB
27         permChecker   permChecker
28         subscriptions []v0subscribe
29         log           *log.Entry
30         mtx           sync.Mutex
31         setupOnce     sync.Once
32 }
33
34 func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
35         sess := &v0session{
36                 ws:          ws,
37                 db:          db,
38                 permChecker: NewPermChecker(ac),
39                 log:         logger(ws.Request().Context()),
40         }
41
42         err := ws.Request().ParseForm()
43         if err != nil {
44                 sess.log.WithError(err).Error("ParseForm failed")
45                 return nil, err
46         }
47         token := ws.Request().Form.Get("api_token")
48         sess.permChecker.SetToken(token)
49         sess.log.WithField("token", token).Debug("set token")
50
51         return sess, nil
52 }
53
54 func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte {
55         sess.log.WithField("data", msg).Debug("received message")
56         var sub v0subscribe
57         if err := json.Unmarshal(buf, &sub); err != nil {
58                 sess.log.WithError(err).Info("ignored invalid request")
59                 return nil
60         }
61         if sub.Method == "subscribe" {
62                 sub.prepare(sess)
63                 sess.log.WithField("sub", sub).Debug("sub prepared")
64                 sess.mtx.Lock()
65                 sess.subscriptions = append(sess.subscriptions, sub)
66                 sess.mtx.Unlock()
67
68                 return append([][]byte{v0subscribeOK}, sub.getOldEvents(sess)...)
69         }
70         return [][]byte{v0subscribeFail}
71 }
72
73 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
74         detail := e.Detail()
75         if detail == nil {
76                 return nil, nil
77         }
78
79         ok, err := sess.permChecker.Check(detail.ObjectUUID)
80         if err != nil || !ok {
81                 return nil, err
82         }
83
84         msg := map[string]interface{}{
85                 "msgID":             e.Serial,
86                 "id":                detail.ID,
87                 "uuid":              detail.UUID,
88                 "object_uuid":       detail.ObjectUUID,
89                 "object_owner_uuid": detail.ObjectOwnerUUID,
90                 "event_type":        detail.EventType,
91         }
92         if detail.Properties != nil && detail.Properties["text"] != nil {
93                 msg["properties"] = detail.Properties
94         } else {
95                 msgProps := map[string]map[string]interface{}{}
96                 for _, ak := range []string{"old_attributes", "new_attributes"} {
97                         eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
98                         if !ok {
99                                 continue
100                         }
101                         msgAttrs := map[string]interface{}{}
102                         for _, k := range sendObjectAttributes {
103                                 if v, ok := eventAttrs[k]; ok {
104                                         msgAttrs[k] = v
105                                 }
106                         }
107                         msgProps[ak] = msgAttrs
108                 }
109                 msg["properties"] = msgProps
110         }
111         return json.Marshal(msg)
112 }
113
114 func (sess *v0session) Filter(e *event) bool {
115         sess.mtx.Lock()
116         defer sess.mtx.Unlock()
117         for _, sub := range sess.subscriptions {
118                 if sub.match(sess, e) {
119                         return true
120                 }
121         }
122         return false
123 }
124
125 func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
126         if sub.LastLogID == 0 {
127                 return
128         }
129         sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
130         // Here we do a "select id" query and queue an event for every
131         // log since the given ID, then use (*event)Detail() to
132         // retrieve the whole row and decide whether to send it. This
133         // approach is very inefficient if the subscriber asks for
134         // last_log_id==1, even if the filters end up matching very
135         // few events.
136         //
137         // To mitigate this, filter on "created > 10 minutes ago" when
138         // retrieving the list of old event IDs to consider.
139         rows, err := sess.db.Query(
140                 `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
141                 sub.LastLogID,
142                 time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
143         if err != nil {
144                 sess.log.WithError(err).Error("db.Query failed")
145                 return
146         }
147         for rows.Next() {
148                 var id uint64
149                 err := rows.Scan(&id)
150                 if err != nil {
151                         sess.log.WithError(err).Error("row Scan failed")
152                         continue
153                 }
154                 e := &event{
155                         LogID:    id,
156                         Received: time.Now(),
157                         db:       sess.db,
158                 }
159                 if !sub.match(sess, e) {
160                         sess.log.WithField("event", e).Debug("skip old event")
161                         continue
162                 }
163                 msg, err := sess.EventMessage(e)
164                 if err != nil {
165                         sess.log.WithError(err).Error("event marshal failed")
166                         continue
167                 }
168                 sess.log.WithField("data", msg).Debug("will queue old event")
169                 msgs = append(msgs, msg)
170         }
171         if err := rows.Err(); err != nil {
172                 sess.log.WithError(err).Error("db.Query failed")
173         }
174         return
175 }
176
177 type v0subscribe struct {
178         Method    string
179         Filters   []v0filter
180         LastLogID int64 `json:"last_log_id"`
181
182         funcs []func(*event) bool
183 }
184
185 type v0filter [3]interface{}
186
187 func (sub *v0subscribe) match(sess *v0session, e *event) bool {
188         log := sess.log.WithField("LogID", e.LogID)
189         detail := e.Detail()
190         if detail == nil {
191                 log.Error("match failed, no detail")
192                 return false
193         }
194         log = log.WithField("funcs", len(sub.funcs))
195         for i, f := range sub.funcs {
196                 if !f(e) {
197                         log.WithField("func", i).Debug("match failed")
198                         return false
199                 }
200         }
201         log.Debug("match passed")
202         return true
203 }
204
205 func (sub *v0subscribe) prepare(sess *v0session) {
206         for _, f := range sub.Filters {
207                 if len(f) != 3 {
208                         continue
209                 }
210                 if col, ok := f[0].(string); ok && col == "event_type" {
211                         op, ok := f[1].(string)
212                         if !ok || op != "in" {
213                                 continue
214                         }
215                         arr, ok := f[2].([]interface{})
216                         if !ok {
217                                 continue
218                         }
219                         var strs []string
220                         for _, s := range arr {
221                                 if s, ok := s.(string); ok {
222                                         strs = append(strs, s)
223                                 }
224                         }
225                         sub.funcs = append(sub.funcs, func(e *event) bool {
226                                 for _, s := range strs {
227                                         if s == e.Detail().EventType {
228                                                 return true
229                                         }
230                                 }
231                                 return false
232                         })
233                 } else if ok && col == "created_at" {
234                         op, ok := f[1].(string)
235                         if !ok {
236                                 continue
237                         }
238                         tstr, ok := f[2].(string)
239                         if !ok {
240                                 continue
241                         }
242                         t, err := time.Parse(time.RFC3339Nano, tstr)
243                         if err != nil {
244                                 sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
245                                 continue
246                         }
247                         var fn func(*event) bool
248                         switch op {
249                         case ">=":
250                                 fn = func(e *event) bool {
251                                         return !e.Detail().CreatedAt.Before(t)
252                                 }
253                         case "<=":
254                                 fn = func(e *event) bool {
255                                         return !e.Detail().CreatedAt.After(t)
256                                 }
257                         case ">":
258                                 fn = func(e *event) bool {
259                                         return e.Detail().CreatedAt.After(t)
260                                 }
261                         case "<":
262                                 fn = func(e *event) bool {
263                                         return e.Detail().CreatedAt.Before(t)
264                                 }
265                         case "=":
266                                 fn = func(e *event) bool {
267                                         return e.Detail().CreatedAt.Equal(t)
268                                 }
269                         default:
270                                 sess.log.WithField("operator", op).Info("bogus operator")
271                                 continue
272                         }
273                         sub.funcs = append(sub.funcs, fn)
274                 }
275         }
276 }