5622304bf4df562e56b1f478f3f3ecb23bcd9333
[arvados.git] / services / ws / session_v0.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "fmt"
7         "log"
8         "net/url"
9         "sync"
10         "time"
11
12         "git.curoverse.com/arvados.git/sdk/go/arvados"
13 )
14
15 var (
16         errQueueFull   = errors.New("client queue full")
17         errFrameTooBig = errors.New("frame too big")
18
19         sendObjectAttributes = []string{"state", "name"}
20
21         v0subscribeOK   = []byte(`{"status":200}`)
22         v0subscribeFail = []byte(`{"status":400}`)
23 )
24
25 type v0session struct {
26         ws            wsConn
27         permChecker   permChecker
28         subscriptions []v0subscribe
29         mtx           sync.Mutex
30         setupOnce     sync.Once
31 }
32
33 func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
34         sess := &v0session{
35                 ws:          ws,
36                 permChecker: NewPermChecker(ac),
37         }
38
39         err := ws.Request().ParseForm()
40         if err != nil {
41                 log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
42                 return nil, err
43         }
44         token := ws.Request().Form.Get("api_token")
45         sess.permChecker.SetToken(token)
46         sess.debugLogf("token = %+q", token)
47
48         return sess, nil
49 }
50
51 func (sess *v0session) debugLogf(s string, args ...interface{}) {
52         args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
53         debugLogf("%s "+s, args...)
54 }
55
56 func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte {
57         sess.debugLogf("received message: %+v", msg)
58         var sub v0subscribe
59         if err := json.Unmarshal(buf, &sub); err != nil {
60                 sess.debugLogf("ignored unrecognized request: %s", err)
61                 return nil
62         }
63         if sub.Method == "subscribe" {
64                 sub.prepare()
65                 sess.debugLogf("subscription: %v", sub)
66                 sess.mtx.Lock()
67                 sess.subscriptions = append(sess.subscriptions, sub)
68                 sess.mtx.Unlock()
69
70                 resp := [][]byte{v0subscribeOK}
71
72                 if sub.LastLogID > 0 {
73                         // Hack 1: use the permission checker's
74                         // Arvados client to retrieve old log IDs. Use
75                         // "created < 2 hours ago" to ensure the
76                         // database query doesn't get too expensive
77                         // when our client gives us last_log_id==1.
78                         ac := sess.permChecker.(*cachingPermChecker).Client
79                         var old arvados.LogList
80                         ac.RequestAndDecode(&old, "GET", "arvados/v1/logs", nil, url.Values{
81                                 "limit": {"1000"},
82                                 "filters": {fmt.Sprintf(
83                                         `[["id",">",%d],["created_at",">","%s"]]`,
84                                         sub.LastLogID,
85                                         time.Now().UTC().Add(-2*time.Hour).Format(time.RFC3339Nano))},
86                         })
87                         for _, log := range old.Items {
88                                 // Hack 2: populate the event's logRow
89                                 // using the API response -- otherwise
90                                 // Detail() would crash because e.db
91                                 // is nil.
92                                 e := &event{
93                                         LogID:    log.ID,
94                                         Received: time.Now(),
95                                         logRow:   &log,
96                                 }
97                                 msg, err := sess.EventMessage(e)
98                                 if err != nil {
99                                         continue
100                                 }
101                                 resp = append(resp, msg)
102                         }
103                 }
104
105                 return [][]byte{v0subscribeOK}
106         }
107         return [][]byte{v0subscribeFail}
108 }
109
110 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
111         detail := e.Detail()
112         if detail == nil {
113                 return nil, nil
114         }
115
116         ok, err := sess.permChecker.Check(detail.ObjectUUID)
117         if err != nil || !ok {
118                 return nil, err
119         }
120
121         msg := map[string]interface{}{
122                 "msgID":             e.Serial,
123                 "id":                detail.ID,
124                 "uuid":              detail.UUID,
125                 "object_uuid":       detail.ObjectUUID,
126                 "object_owner_uuid": detail.ObjectOwnerUUID,
127                 "event_type":        detail.EventType,
128         }
129         if detail.Properties != nil && detail.Properties["text"] != nil {
130                 msg["properties"] = detail.Properties
131         } else {
132                 msgProps := map[string]map[string]interface{}{}
133                 for _, ak := range []string{"old_attributes", "new_attributes"} {
134                         eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
135                         if !ok {
136                                 continue
137                         }
138                         msgAttrs := map[string]interface{}{}
139                         for _, k := range sendObjectAttributes {
140                                 if v, ok := eventAttrs[k]; ok {
141                                         msgAttrs[k] = v
142                                 }
143                         }
144                         msgProps[ak] = msgAttrs
145                 }
146                 msg["properties"] = msgProps
147         }
148         return json.Marshal(msg)
149 }
150
151 func (sess *v0session) Filter(e *event) bool {
152         sess.mtx.Lock()
153         defer sess.mtx.Unlock()
154         for _, sub := range sess.subscriptions {
155                 if sub.match(e) {
156                         return true
157                 }
158         }
159         return false
160 }
161
162 type v0subscribe struct {
163         Method    string
164         Filters   []v0filter
165         LastLogID int64 `json:"last_log_id"`
166
167         funcs []func(*event) bool
168 }
169
170 type v0filter [3]interface{}
171
172 func (sub *v0subscribe) match(e *event) bool {
173         detail := e.Detail()
174         if detail == nil {
175                 return false
176         }
177         debugLogf("sub.match: len(funcs)==%d", len(sub.funcs))
178         for i, f := range sub.funcs {
179                 if !f(e) {
180                         debugLogf("sub.match: failed on func %d", i)
181                         return false
182                 }
183         }
184         return true
185 }
186
187 func (sub *v0subscribe) prepare() {
188         for _, f := range sub.Filters {
189                 if len(f) != 3 {
190                         continue
191                 }
192                 if col, ok := f[0].(string); ok && col == "event_type" {
193                         op, ok := f[1].(string)
194                         if !ok || op != "in" {
195                                 continue
196                         }
197                         arr, ok := f[2].([]interface{})
198                         if !ok {
199                                 continue
200                         }
201                         var strs []string
202                         for _, s := range arr {
203                                 if s, ok := s.(string); ok {
204                                         strs = append(strs, s)
205                                 }
206                         }
207                         sub.funcs = append(sub.funcs, func(e *event) bool {
208                                 debugLogf("event_type func: %v in %v", e.Detail().EventType, strs)
209                                 for _, s := range strs {
210                                         if s == e.Detail().EventType {
211                                                 return true
212                                         }
213                                 }
214                                 return false
215                         })
216                 } else if ok && col == "created_at" {
217                         op, ok := f[1].(string)
218                         if !ok {
219                                 continue
220                         }
221                         tstr, ok := f[2].(string)
222                         if !ok {
223                                 continue
224                         }
225                         t, err := time.Parse(time.RFC3339Nano, tstr)
226                         if err != nil {
227                                 debugLogf("time.Parse(%q): %s", tstr, err)
228                                 continue
229                         }
230                         switch op {
231                         case ">=":
232                                 sub.funcs = append(sub.funcs, func(e *event) bool {
233                                         debugLogf("created_at func: %v >= %v", e.Detail().CreatedAt, t)
234                                         return !e.Detail().CreatedAt.Before(t)
235                                 })
236                         case "<=":
237                                 sub.funcs = append(sub.funcs, func(e *event) bool {
238                                         debugLogf("created_at func: %v <= %v", e.Detail().CreatedAt, t)
239                                         return !e.Detail().CreatedAt.After(t)
240                                 })
241                         case ">":
242                                 sub.funcs = append(sub.funcs, func(e *event) bool {
243                                         debugLogf("created_at func: %v > %v", e.Detail().CreatedAt, t)
244                                         return e.Detail().CreatedAt.After(t)
245                                 })
246                         case "<":
247                                 sub.funcs = append(sub.funcs, func(e *event) bool {
248                                         debugLogf("created_at func: %v < %v", e.Detail().CreatedAt, t)
249                                         return e.Detail().CreatedAt.Before(t)
250                                 })
251                         case "=":
252                                 sub.funcs = append(sub.funcs, func(e *event) bool {
253                                         debugLogf("created_at func: %v = %v", e.Detail().CreatedAt, t)
254                                         return e.Detail().CreatedAt.Equal(t)
255                                 })
256                         }
257                 }
258         }
259 }