8460: Fix up logging.
[arvados.git] / services / ws / session_v0.go
1 package main
2
3 import (
4         "database/sql"
5         "encoding/json"
6         "errors"
7         "sync"
8         "sync/atomic"
9         "time"
10
11         "git.curoverse.com/arvados.git/sdk/go/arvados"
12         "github.com/Sirupsen/logrus"
13 )
14
15 var (
16         errQueueFull   = errors.New("client queue full")
17         errFrameTooBig = errors.New("frame too big")
18
19         sendObjectAttributes = []string{"state", "name"}
20
21         v0subscribeOK   = []byte(`{"status":200}`)
22         v0subscribeFail = []byte(`{"status":400}`)
23 )
24
25 type v0session struct {
26         ws            wsConn
27         sendq         chan<- interface{}
28         db            *sql.DB
29         permChecker   permChecker
30         subscriptions []v0subscribe
31         lastMsgID     uint64
32         log           *logrus.Entry
33         mtx           sync.Mutex
34         setupOnce     sync.Once
35 }
36
37 func NewSessionV0(ws wsConn, sendq chan<- interface{}, ac arvados.Client, db *sql.DB) (session, error) {
38         sess := &v0session{
39                 sendq:       sendq,
40                 ws:          ws,
41                 db:          db,
42                 permChecker: NewPermChecker(ac),
43                 log:         logger(ws.Request().Context()),
44         }
45
46         err := ws.Request().ParseForm()
47         if err != nil {
48                 sess.log.WithError(err).Error("ParseForm failed")
49                 return nil, err
50         }
51         token := ws.Request().Form.Get("api_token")
52         sess.permChecker.SetToken(token)
53         sess.log.WithField("token", token).Debug("set token")
54
55         return sess, nil
56 }
57
58 func (sess *v0session) Receive(buf []byte) error {
59         var sub v0subscribe
60         if err := json.Unmarshal(buf, &sub); err != nil {
61                 sess.log.WithError(err).Info("invalid message from client")
62         } else if sub.Method == "subscribe" {
63                 sub.prepare(sess)
64                 sess.log.WithField("sub", sub).Debug("sub prepared")
65                 sess.sendq <- v0subscribeOK
66                 sess.mtx.Lock()
67                 sess.subscriptions = append(sess.subscriptions, sub)
68                 sess.mtx.Unlock()
69                 sub.sendOldEvents(sess)
70                 return nil
71         } else {
72                 sess.log.WithField("Method", sub.Method).Info("unknown method")
73         }
74         sess.sendq <- v0subscribeFail
75         return nil
76 }
77
78 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
79         detail := e.Detail()
80         if detail == nil {
81                 return nil, nil
82         }
83
84         ok, err := sess.permChecker.Check(detail.ObjectUUID)
85         if err != nil || !ok {
86                 return nil, err
87         }
88
89         msg := map[string]interface{}{
90                 "msgID":             atomic.AddUint64(&sess.lastMsgID, 1),
91                 "id":                detail.ID,
92                 "uuid":              detail.UUID,
93                 "object_uuid":       detail.ObjectUUID,
94                 "object_owner_uuid": detail.ObjectOwnerUUID,
95                 "event_type":        detail.EventType,
96         }
97         if detail.Properties != nil && detail.Properties["text"] != nil {
98                 msg["properties"] = detail.Properties
99         } else {
100                 msgProps := map[string]map[string]interface{}{}
101                 for _, ak := range []string{"old_attributes", "new_attributes"} {
102                         eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
103                         if !ok {
104                                 continue
105                         }
106                         msgAttrs := map[string]interface{}{}
107                         for _, k := range sendObjectAttributes {
108                                 if v, ok := eventAttrs[k]; ok {
109                                         msgAttrs[k] = v
110                                 }
111                         }
112                         msgProps[ak] = msgAttrs
113                 }
114                 msg["properties"] = msgProps
115         }
116         return json.Marshal(msg)
117 }
118
119 func (sess *v0session) Filter(e *event) bool {
120         sess.mtx.Lock()
121         defer sess.mtx.Unlock()
122         for _, sub := range sess.subscriptions {
123                 if sub.match(sess, e) {
124                         return true
125                 }
126         }
127         return false
128 }
129
130 func (sub *v0subscribe) sendOldEvents(sess *v0session) {
131         if sub.LastLogID == 0 {
132                 return
133         }
134         sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
135         // Here we do a "select id" query and queue an event for every
136         // log since the given ID, then use (*event)Detail() to
137         // retrieve the whole row and decide whether to send it. This
138         // approach is very inefficient if the subscriber asks for
139         // last_log_id==1, even if the filters end up matching very
140         // few events.
141         //
142         // To mitigate this, filter on "created > 10 minutes ago" when
143         // retrieving the list of old event IDs to consider.
144         rows, err := sess.db.Query(
145                 `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
146                 sub.LastLogID,
147                 time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
148         if err != nil {
149                 sess.log.WithError(err).Error("db.Query failed")
150                 return
151         }
152         for rows.Next() {
153                 var id uint64
154                 err := rows.Scan(&id)
155                 if err != nil {
156                         sess.log.WithError(err).Error("row Scan failed")
157                         continue
158                 }
159                 for len(sess.sendq)*2 > cap(sess.sendq) {
160                         // Ugly... but if we fill up the whole client
161                         // queue with a backlog of old events, a
162                         // single new event will overflow it and
163                         // terminate the connection, and then the
164                         // client will probably reconnect and do the
165                         // same thing all over again.
166                         time.Sleep(100 * time.Millisecond)
167                 }
168                 e := &event{
169                         LogID:    id,
170                         Received: time.Now(),
171                         db:       sess.db,
172                 }
173                 if sub.match(sess, e) {
174                         sess.sendq <- e
175                 }
176         }
177         if err := rows.Err(); err != nil {
178                 sess.log.WithError(err).Error("db.Query failed")
179         }
180 }
181
182 type v0subscribe struct {
183         Method    string
184         Filters   []v0filter
185         LastLogID int64 `json:"last_log_id"`
186
187         funcs []func(*event) bool
188 }
189
190 type v0filter [3]interface{}
191
192 func (sub *v0subscribe) match(sess *v0session, e *event) bool {
193         log := sess.log.WithField("LogID", e.LogID)
194         detail := e.Detail()
195         if detail == nil {
196                 log.Error("match failed, no detail")
197                 return false
198         }
199         log = log.WithField("funcs", len(sub.funcs))
200         for i, f := range sub.funcs {
201                 if !f(e) {
202                         log.WithField("func", i).Debug("match failed")
203                         return false
204                 }
205         }
206         log.Debug("match passed")
207         return true
208 }
209
210 func (sub *v0subscribe) prepare(sess *v0session) {
211         for _, f := range sub.Filters {
212                 if len(f) != 3 {
213                         continue
214                 }
215                 if col, ok := f[0].(string); ok && col == "event_type" {
216                         op, ok := f[1].(string)
217                         if !ok || op != "in" {
218                                 continue
219                         }
220                         arr, ok := f[2].([]interface{})
221                         if !ok {
222                                 continue
223                         }
224                         var strs []string
225                         for _, s := range arr {
226                                 if s, ok := s.(string); ok {
227                                         strs = append(strs, s)
228                                 }
229                         }
230                         sub.funcs = append(sub.funcs, func(e *event) bool {
231                                 for _, s := range strs {
232                                         if s == e.Detail().EventType {
233                                                 return true
234                                         }
235                                 }
236                                 return false
237                         })
238                 } else if ok && col == "created_at" {
239                         op, ok := f[1].(string)
240                         if !ok {
241                                 continue
242                         }
243                         tstr, ok := f[2].(string)
244                         if !ok {
245                                 continue
246                         }
247                         t, err := time.Parse(time.RFC3339Nano, tstr)
248                         if err != nil {
249                                 sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
250                                 continue
251                         }
252                         var fn func(*event) bool
253                         switch op {
254                         case ">=":
255                                 fn = func(e *event) bool {
256                                         return !e.Detail().CreatedAt.Before(t)
257                                 }
258                         case "<=":
259                                 fn = func(e *event) bool {
260                                         return !e.Detail().CreatedAt.After(t)
261                                 }
262                         case ">":
263                                 fn = func(e *event) bool {
264                                         return e.Detail().CreatedAt.After(t)
265                                 }
266                         case "<":
267                                 fn = func(e *event) bool {
268                                         return e.Detail().CreatedAt.Before(t)
269                                 }
270                         case "=":
271                                 fn = func(e *event) bool {
272                                         return e.Detail().CreatedAt.Equal(t)
273                                 }
274                         default:
275                                 sess.log.WithField("operator", op).Info("bogus operator")
276                                 continue
277                         }
278                         sub.funcs = append(sub.funcs, fn)
279                 }
280         }
281 }