8460: Allow session Receive handler to queue multiple messages.
[arvados.git] / services / ws / session_v0.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "log"
7         "sync"
8         "time"
9
10         "git.curoverse.com/arvados.git/sdk/go/arvados"
11 )
12
// Sentinel errors. Neither is referenced in this file; presumably
// they are returned by the connection-handling code elsewhere in the
// package -- TODO confirm.
var (
	errFrameTooBig = errors.New("frame too big")
	errQueueFull   = errors.New("client queue full")
)

// sendObjectAttributes names the object attributes that EventMessage
// copies out of an event's "old_attributes" and "new_attributes"
// properties. Attributes not listed here are not sent to the client.
var sendObjectAttributes = []string{"state", "name"}

// Canned JSON replies returned by Receive: v0subscribeOK for an
// accepted "subscribe" request, v0subscribeFail for any other method.
var (
	v0subscribeFail = []byte(`{"status":400}`)
	v0subscribeOK   = []byte(`{"status":200}`)
)
22
// v0session is the session implementation returned by NewSessionV0.
// It holds the state kept for one client connection.
type v0session struct {
	// ws is the client connection. Its Request() supplies the remote
	// address used in log messages and the form values read by
	// NewSessionV0.
	ws            wsConn
	// permChecker is consulted by EventMessage, with the event's
	// object UUID, before an event is sent to the client.
	permChecker   permChecker
	// subscriptions are the requests accepted by Receive. Access is
	// guarded by mtx.
	subscriptions []v0subscribe
	// mtx protects subscriptions (Receive appends, Filter reads).
	mtx           sync.Mutex
	// setupOnce is not referenced anywhere in the visible part of
	// this file -- TODO confirm whether it is still needed.
	setupOnce     sync.Once
}
30
31 func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
32         sess := &v0session{
33                 ws:          ws,
34                 permChecker: NewPermChecker(ac),
35         }
36
37         err := ws.Request().ParseForm()
38         if err != nil {
39                 log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
40                 return nil, err
41         }
42         token := ws.Request().Form.Get("api_token")
43         sess.permChecker.SetToken(token)
44         sess.debugLogf("token = %+q", token)
45
46         return sess, nil
47 }
48
49 func (sess *v0session) debugLogf(s string, args ...interface{}) {
50         args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
51         debugLogf("%s "+s, args...)
52 }
53
54 func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte {
55         sess.debugLogf("received message: %+v", msg)
56         var sub v0subscribe
57         if err := json.Unmarshal(buf, &sub); err != nil {
58                 sess.debugLogf("ignored unrecognized request: %s", err)
59                 return nil
60         }
61         if sub.Method == "subscribe" {
62                 sub.prepare()
63                 sess.debugLogf("subscription: %v", sub)
64                 sess.mtx.Lock()
65                 sess.subscriptions = append(sess.subscriptions, sub)
66                 sess.mtx.Unlock()
67                 return [][]byte{v0subscribeOK}
68         }
69         return [][]byte{v0subscribeFail}
70 }
71
72 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
73         detail := e.Detail()
74         if detail == nil {
75                 return nil, nil
76         }
77
78         ok, err := sess.permChecker.Check(detail.ObjectUUID)
79         if err != nil || !ok {
80                 return nil, err
81         }
82
83         msg := map[string]interface{}{
84                 "msgID":             e.Serial,
85                 "id":                detail.ID,
86                 "uuid":              detail.UUID,
87                 "object_uuid":       detail.ObjectUUID,
88                 "object_owner_uuid": detail.ObjectOwnerUUID,
89                 "event_type":        detail.EventType,
90         }
91         if detail.Properties != nil && detail.Properties["text"] != nil {
92                 msg["properties"] = detail.Properties
93         } else {
94                 msgProps := map[string]map[string]interface{}{}
95                 for _, ak := range []string{"old_attributes", "new_attributes"} {
96                         eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
97                         if !ok {
98                                 continue
99                         }
100                         msgAttrs := map[string]interface{}{}
101                         for _, k := range sendObjectAttributes {
102                                 if v, ok := eventAttrs[k]; ok {
103                                         msgAttrs[k] = v
104                                 }
105                         }
106                         msgProps[ak] = msgAttrs
107                 }
108                 msg["properties"] = msgProps
109         }
110         return json.Marshal(msg)
111 }
112
113 func (sess *v0session) Filter(e *event) bool {
114         sess.mtx.Lock()
115         defer sess.mtx.Unlock()
116         for _, sub := range sess.subscriptions {
117                 if sub.match(e) {
118                         return true
119                 }
120         }
121         return false
122 }
123
// v0subscribe is a client request as decoded from JSON by Receive.
type v0subscribe struct {
	// Method is the requested action. Receive accepts only
	// "subscribe".
	Method  string
	// Filters are the filter conditions supplied by the client.
	Filters []v0filter
	// funcs are the predicates that prepare builds from Filters.
	// match requires every one of them to return true.
	funcs   []func(*event) bool
}
129
// v0filter is a single filter condition. prepare interprets the three
// elements as column name, operator and operand, in that order.
type v0filter [3]interface{}
131
132 func (sub *v0subscribe) match(e *event) bool {
133         detail := e.Detail()
134         if detail == nil {
135                 return false
136         }
137         debugLogf("sub.match: len(funcs)==%d", len(sub.funcs))
138         for i, f := range sub.funcs {
139                 if !f(e) {
140                         debugLogf("sub.match: failed on func %d", i)
141                         return false
142                 }
143         }
144         return true
145 }
146
147 func (sub *v0subscribe) prepare() {
148         for _, f := range sub.Filters {
149                 if len(f) != 3 {
150                         continue
151                 }
152                 if col, ok := f[0].(string); ok && col == "event_type" {
153                         op, ok := f[1].(string)
154                         if !ok || op != "in" {
155                                 continue
156                         }
157                         arr, ok := f[2].([]interface{})
158                         if !ok {
159                                 continue
160                         }
161                         var strs []string
162                         for _, s := range arr {
163                                 if s, ok := s.(string); ok {
164                                         strs = append(strs, s)
165                                 }
166                         }
167                         sub.funcs = append(sub.funcs, func(e *event) bool {
168                                 debugLogf("event_type func: %v in %v", e.Detail().EventType, strs)
169                                 for _, s := range strs {
170                                         if s == e.Detail().EventType {
171                                                 return true
172                                         }
173                                 }
174                                 return false
175                         })
176                 } else if ok && col == "created_at" {
177                         op, ok := f[1].(string)
178                         if !ok {
179                                 continue
180                         }
181                         tstr, ok := f[2].(string)
182                         if !ok {
183                                 continue
184                         }
185                         t, err := time.Parse(time.RFC3339Nano, tstr)
186                         if err != nil {
187                                 debugLogf("time.Parse(%q): %s", tstr, err)
188                                 continue
189                         }
190                         switch op {
191                         case ">=":
192                                 sub.funcs = append(sub.funcs, func(e *event) bool {
193                                         debugLogf("created_at func: %v >= %v", e.Detail().CreatedAt, t)
194                                         return !e.Detail().CreatedAt.Before(t)
195                                 })
196                         case "<=":
197                                 sub.funcs = append(sub.funcs, func(e *event) bool {
198                                         debugLogf("created_at func: %v <= %v", e.Detail().CreatedAt, t)
199                                         return !e.Detail().CreatedAt.After(t)
200                                 })
201                         case ">":
202                                 sub.funcs = append(sub.funcs, func(e *event) bool {
203                                         debugLogf("created_at func: %v > %v", e.Detail().CreatedAt, t)
204                                         return e.Detail().CreatedAt.After(t)
205                                 })
206                         case "<":
207                                 sub.funcs = append(sub.funcs, func(e *event) bool {
208                                         debugLogf("created_at func: %v < %v", e.Detail().CreatedAt, t)
209                                         return e.Detail().CreatedAt.Before(t)
210                                 })
211                         case "=":
212                                 sub.funcs = append(sub.funcs, func(e *event) bool {
213                                         debugLogf("created_at func: %v = %v", e.Detail().CreatedAt, t)
214                                         return e.Detail().CreatedAt.Equal(t)
215                                 })
216                         }
217                 }
218         }
219 }