10903: support need_transaction for job and pipeline_instance cancel methods.
[arvados.git] / services / ws / session_v0.go
1 package main
2
3 import (
4         "database/sql"
5         "encoding/json"
6         "errors"
7         "sync"
8         "sync/atomic"
9         "time"
10
11         "git.curoverse.com/arvados.git/sdk/go/arvados"
12         "github.com/Sirupsen/logrus"
13 )
14
15 var (
16         errQueueFull   = errors.New("client queue full")
17         errFrameTooBig = errors.New("frame too big")
18
19         sendObjectAttributes = []string{"state", "name", "owner_uuid", "portable_data_hash"}
20
21         v0subscribeOK   = []byte(`{"status":200}`)
22         v0subscribeFail = []byte(`{"status":400}`)
23 )
24
25 type v0session struct {
26         ac            *arvados.Client
27         ws            wsConn
28         sendq         chan<- interface{}
29         db            *sql.DB
30         permChecker   permChecker
31         subscriptions []v0subscribe
32         lastMsgID     uint64
33         log           *logrus.Entry
34         mtx           sync.Mutex
35         setupOnce     sync.Once
36 }
37
38 // newSessionV0 returns a v0 session: a partial port of the Rails/puma
39 // implementation, with just enough functionality to support Workbench
40 // and arv-mount.
41 func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) {
42         sess := &v0session{
43                 sendq:       sendq,
44                 ws:          ws,
45                 db:          db,
46                 ac:          ac,
47                 permChecker: pc,
48                 log:         logger(ws.Request().Context()),
49         }
50
51         err := ws.Request().ParseForm()
52         if err != nil {
53                 sess.log.WithError(err).Error("ParseForm failed")
54                 return nil, err
55         }
56         token := ws.Request().Form.Get("api_token")
57         sess.permChecker.SetToken(token)
58         sess.log.WithField("token", token).Debug("set token")
59
60         return sess, nil
61 }
62
63 func (sess *v0session) Receive(buf []byte) error {
64         var sub v0subscribe
65         if err := json.Unmarshal(buf, &sub); err != nil {
66                 sess.log.WithError(err).Info("invalid message from client")
67         } else if sub.Method == "subscribe" {
68                 sub.prepare(sess)
69                 sess.log.WithField("sub", sub).Debug("sub prepared")
70                 sess.sendq <- v0subscribeOK
71                 sess.mtx.Lock()
72                 sess.subscriptions = append(sess.subscriptions, sub)
73                 sess.mtx.Unlock()
74                 sub.sendOldEvents(sess)
75                 return nil
76         } else {
77                 sess.log.WithField("Method", sub.Method).Info("unknown method")
78         }
79         sess.sendq <- v0subscribeFail
80         return nil
81 }
82
83 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
84         detail := e.Detail()
85         if detail == nil {
86                 return nil, nil
87         }
88
89         ok, err := sess.permChecker.Check(detail.ObjectUUID)
90         if err != nil || !ok {
91                 return nil, err
92         }
93
94         kind, _ := sess.ac.KindForUUID(detail.ObjectUUID)
95         msg := map[string]interface{}{
96                 "msgID":             atomic.AddUint64(&sess.lastMsgID, 1),
97                 "id":                detail.ID,
98                 "uuid":              detail.UUID,
99                 "object_uuid":       detail.ObjectUUID,
100                 "object_owner_uuid": detail.ObjectOwnerUUID,
101                 "object_kind":       kind,
102                 "event_type":        detail.EventType,
103                 "event_at":          detail.EventAt,
104         }
105         if detail.Properties != nil && detail.Properties["text"] != nil {
106                 msg["properties"] = detail.Properties
107         } else {
108                 msgProps := map[string]map[string]interface{}{}
109                 for _, ak := range []string{"old_attributes", "new_attributes"} {
110                         eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
111                         if !ok {
112                                 continue
113                         }
114                         msgAttrs := map[string]interface{}{}
115                         for _, k := range sendObjectAttributes {
116                                 if v, ok := eventAttrs[k]; ok {
117                                         msgAttrs[k] = v
118                                 }
119                         }
120                         msgProps[ak] = msgAttrs
121                 }
122                 msg["properties"] = msgProps
123         }
124         return json.Marshal(msg)
125 }
126
127 func (sess *v0session) Filter(e *event) bool {
128         sess.mtx.Lock()
129         defer sess.mtx.Unlock()
130         for _, sub := range sess.subscriptions {
131                 if sub.match(sess, e) {
132                         return true
133                 }
134         }
135         return false
136 }
137
138 func (sub *v0subscribe) sendOldEvents(sess *v0session) {
139         if sub.LastLogID == 0 {
140                 return
141         }
142         sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
143         // Here we do a "select id" query and queue an event for every
144         // log since the given ID, then use (*event)Detail() to
145         // retrieve the whole row and decide whether to send it. This
146         // approach is very inefficient if the subscriber asks for
147         // last_log_id==1, even if the filters end up matching very
148         // few events.
149         //
150         // To mitigate this, filter on "created > 10 minutes ago" when
151         // retrieving the list of old event IDs to consider.
152         rows, err := sess.db.Query(
153                 `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
154                 sub.LastLogID,
155                 time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
156         if err != nil {
157                 sess.log.WithError(err).Error("db.Query failed")
158                 return
159         }
160         for rows.Next() {
161                 var id uint64
162                 err := rows.Scan(&id)
163                 if err != nil {
164                         sess.log.WithError(err).Error("row Scan failed")
165                         continue
166                 }
167                 for len(sess.sendq)*2 > cap(sess.sendq) {
168                         // Ugly... but if we fill up the whole client
169                         // queue with a backlog of old events, a
170                         // single new event will overflow it and
171                         // terminate the connection, and then the
172                         // client will probably reconnect and do the
173                         // same thing all over again.
174                         time.Sleep(100 * time.Millisecond)
175                 }
176                 now := time.Now()
177                 e := &event{
178                         LogID:    id,
179                         Received: now,
180                         Ready:    now,
181                         db:       sess.db,
182                 }
183                 if sub.match(sess, e) {
184                         select {
185                         case sess.sendq <- e:
186                         case <-sess.ws.Request().Context().Done():
187                                 return
188                         }
189                 }
190         }
191         if err := rows.Err(); err != nil {
192                 sess.log.WithError(err).Error("db.Query failed")
193         }
194 }
195
196 type v0subscribe struct {
197         Method    string
198         Filters   []v0filter
199         LastLogID int64 `json:"last_log_id"`
200
201         funcs []func(*event) bool
202 }
203
204 type v0filter [3]interface{}
205
206 func (sub *v0subscribe) match(sess *v0session, e *event) bool {
207         log := sess.log.WithField("LogID", e.LogID)
208         detail := e.Detail()
209         if detail == nil {
210                 log.Error("match failed, no detail")
211                 return false
212         }
213         log = log.WithField("funcs", len(sub.funcs))
214         for i, f := range sub.funcs {
215                 if !f(e) {
216                         log.WithField("func", i).Debug("match failed")
217                         return false
218                 }
219         }
220         log.Debug("match passed")
221         return true
222 }
223
224 func (sub *v0subscribe) prepare(sess *v0session) {
225         for _, f := range sub.Filters {
226                 if len(f) != 3 {
227                         continue
228                 }
229                 if col, ok := f[0].(string); ok && col == "event_type" {
230                         op, ok := f[1].(string)
231                         if !ok || op != "in" {
232                                 continue
233                         }
234                         arr, ok := f[2].([]interface{})
235                         if !ok {
236                                 continue
237                         }
238                         var strs []string
239                         for _, s := range arr {
240                                 if s, ok := s.(string); ok {
241                                         strs = append(strs, s)
242                                 }
243                         }
244                         sub.funcs = append(sub.funcs, func(e *event) bool {
245                                 for _, s := range strs {
246                                         if s == e.Detail().EventType {
247                                                 return true
248                                         }
249                                 }
250                                 return false
251                         })
252                 } else if ok && col == "created_at" {
253                         op, ok := f[1].(string)
254                         if !ok {
255                                 continue
256                         }
257                         tstr, ok := f[2].(string)
258                         if !ok {
259                                 continue
260                         }
261                         t, err := time.Parse(time.RFC3339Nano, tstr)
262                         if err != nil {
263                                 sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
264                                 continue
265                         }
266                         var fn func(*event) bool
267                         switch op {
268                         case ">=":
269                                 fn = func(e *event) bool {
270                                         return !e.Detail().CreatedAt.Before(t)
271                                 }
272                         case "<=":
273                                 fn = func(e *event) bool {
274                                         return !e.Detail().CreatedAt.After(t)
275                                 }
276                         case ">":
277                                 fn = func(e *event) bool {
278                                         return e.Detail().CreatedAt.After(t)
279                                 }
280                         case "<":
281                                 fn = func(e *event) bool {
282                                         return e.Detail().CreatedAt.Before(t)
283                                 }
284                         case "=":
285                                 fn = func(e *event) bool {
286                                         return e.Detail().CreatedAt.Equal(t)
287                                 }
288                         default:
289                                 sess.log.WithField("operator", op).Info("bogus operator")
290                                 continue
291                         }
292                         sub.funcs = append(sub.funcs, fn)
293                 }
294         }
295 }