8460: Support created_at filters.
[arvados.git] / services / ws / session_v0.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "log"
7         "sync"
8         "time"
9
10         "git.curoverse.com/arvados.git/sdk/go/arvados"
11 )
12
13 var (
14         errQueueFull   = errors.New("client queue full")
15         errFrameTooBig = errors.New("frame too big")
16
17         sendObjectAttributes = []string{"state", "name"}
18 )
19
20 type v0session struct {
21         ws            wsConn
22         permChecker   permChecker
23         subscriptions []v0subscribe
24         mtx           sync.Mutex
25         setupOnce     sync.Once
26 }
27
28 func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
29         sess := &v0session{
30                 ws:          ws,
31                 permChecker: NewPermChecker(ac),
32         }
33
34         err := ws.Request().ParseForm()
35         if err != nil {
36                 log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
37                 return nil, err
38         }
39         token := ws.Request().Form.Get("api_token")
40         sess.permChecker.SetToken(token)
41         sess.debugLogf("token = %+q", token)
42
43         return sess, nil
44 }
45
46 func (sess *v0session) debugLogf(s string, args ...interface{}) {
47         args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
48         debugLogf("%s "+s, args...)
49 }
50
51 func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) []byte {
52         sess.debugLogf("received message: %+v", msg)
53         var sub v0subscribe
54         if err := json.Unmarshal(buf, &sub); err != nil {
55                 sess.debugLogf("ignored unrecognized request: %s", err)
56                 return nil
57         }
58         if sub.Method == "subscribe" {
59                 sub.prepare()
60                 sess.debugLogf("subscription: %v", sub)
61                 sess.mtx.Lock()
62                 sess.subscriptions = append(sess.subscriptions, sub)
63                 sess.mtx.Unlock()
64                 return []byte(`{"status":200}`)
65         }
66         return []byte(`{"status":400}`)
67 }
68
69 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
70         detail := e.Detail()
71         if detail == nil {
72                 return nil, nil
73         }
74
75         ok, err := sess.permChecker.Check(detail.ObjectUUID)
76         if err != nil || !ok {
77                 return nil, err
78         }
79
80         msg := map[string]interface{}{
81                 "msgID":             e.Serial,
82                 "id":                detail.ID,
83                 "uuid":              detail.UUID,
84                 "object_uuid":       detail.ObjectUUID,
85                 "object_owner_uuid": detail.ObjectOwnerUUID,
86                 "event_type":        detail.EventType,
87         }
88         if detail.Properties != nil && detail.Properties["text"] != nil {
89                 msg["properties"] = detail.Properties
90         } else {
91                 msgProps := map[string]map[string]interface{}{}
92                 for _, ak := range []string{"old_attributes", "new_attributes"} {
93                         eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
94                         if !ok {
95                                 continue
96                         }
97                         msgAttrs := map[string]interface{}{}
98                         for _, k := range sendObjectAttributes {
99                                 if v, ok := eventAttrs[k]; ok {
100                                         msgAttrs[k] = v
101                                 }
102                         }
103                         msgProps[ak] = msgAttrs
104                 }
105                 msg["properties"] = msgProps
106         }
107         return json.Marshal(msg)
108 }
109
110 func (sess *v0session) Filter(e *event) bool {
111         sess.mtx.Lock()
112         defer sess.mtx.Unlock()
113         for _, sub := range sess.subscriptions {
114                 if sub.match(e) {
115                         return true
116                 }
117         }
118         return false
119 }
120
121 type v0subscribe struct {
122         Method  string
123         Filters []v0filter
124         funcs   []func(*event) bool
125 }
126
127 type v0filter [3]interface{}
128
129 func (sub *v0subscribe) match(e *event) bool {
130         detail := e.Detail()
131         if detail == nil {
132                 return false
133         }
134         debugLogf("sub.match: len(funcs)==%d", len(sub.funcs))
135         for i, f := range sub.funcs {
136                 if !f(e) {
137                         debugLogf("sub.match: failed on func %d", i)
138                         return false
139                 }
140         }
141         return true
142 }
143
144 func (sub *v0subscribe) prepare() {
145         for _, f := range sub.Filters {
146                 if len(f) != 3 {
147                         continue
148                 }
149                 if col, ok := f[0].(string); ok && col == "event_type" {
150                         op, ok := f[1].(string)
151                         if !ok || op != "in" {
152                                 continue
153                         }
154                         arr, ok := f[2].([]interface{})
155                         if !ok {
156                                 continue
157                         }
158                         var strs []string
159                         for _, s := range arr {
160                                 if s, ok := s.(string); ok {
161                                         strs = append(strs, s)
162                                 }
163                         }
164                         sub.funcs = append(sub.funcs, func(e *event) bool {
165                                 debugLogf("event_type func: %v in %v", e.Detail().EventType, strs)
166                                 for _, s := range strs {
167                                         if s == e.Detail().EventType {
168                                                 return true
169                                         }
170                                 }
171                                 return false
172                         })
173                 } else if ok && col == "created_at" {
174                         op, ok := f[1].(string)
175                         if !ok {
176                                 continue
177                         }
178                         tstr, ok := f[2].(string)
179                         if !ok {
180                                 continue
181                         }
182                         t, err := time.Parse(time.RFC3339Nano, tstr)
183                         if err != nil {
184                                 debugLogf("time.Parse(%q): %s", tstr, err)
185                                 continue
186                         }
187                         switch op {
188                         case ">=":
189                                 sub.funcs = append(sub.funcs, func(e *event) bool {
190                                         debugLogf("created_at func: %v >= %v", e.Detail().CreatedAt, t)
191                                         return !e.Detail().CreatedAt.Before(t)
192                                 })
193                         case "<=":
194                                 sub.funcs = append(sub.funcs, func(e *event) bool {
195                                         debugLogf("created_at func: %v <= %v", e.Detail().CreatedAt, t)
196                                         return !e.Detail().CreatedAt.After(t)
197                                 })
198                         case ">":
199                                 sub.funcs = append(sub.funcs, func(e *event) bool {
200                                         debugLogf("created_at func: %v > %v", e.Detail().CreatedAt, t)
201                                         return e.Detail().CreatedAt.After(t)
202                                 })
203                         case "<":
204                                 sub.funcs = append(sub.funcs, func(e *event) bool {
205                                         debugLogf("created_at func: %v < %v", e.Detail().CreatedAt, t)
206                                         return e.Detail().CreatedAt.Before(t)
207                                 })
208                         case "=":
209                                 sub.funcs = append(sub.funcs, func(e *event) bool {
210                                         debugLogf("created_at func: %v = %v", e.Detail().CreatedAt, t)
211                                         return e.Detail().CreatedAt.Equal(t)
212                                 })
213                         }
214                 }
215         }
216 }