8460: Send {"status":200} messages. Bring up ws server for Python SDK tests.
[arvados.git] / services / ws / session_v0.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "log"
7         "sync"
8
9         "git.curoverse.com/arvados.git/sdk/go/arvados"
10 )
11
12 var (
13         errQueueFull   = errors.New("client queue full")
14         errFrameTooBig = errors.New("frame too big")
15
16         sendObjectAttributes = []string{"state", "name"}
17 )
18
19 type sessionV0 struct {
20         ws          wsConn
21         permChecker permChecker
22         subscribed  map[string]bool
23         eventTypes  map[string]bool
24         mtx         sync.Mutex
25         setupOnce   sync.Once
26 }
27
28 type v0subscribe struct {
29         Method  string
30         Filters []v0filter
31 }
32
33 type v0filter []interface{}
34
35 func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
36         sess := &sessionV0{
37                 ws:          ws,
38                 permChecker: NewPermChecker(ac),
39                 subscribed:  make(map[string]bool),
40                 eventTypes:  make(map[string]bool),
41         }
42
43         err := ws.Request().ParseForm()
44         if err != nil {
45                 log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
46                 return nil, err
47         }
48         token := ws.Request().Form.Get("api_token")
49         sess.permChecker.SetToken(token)
50         sess.debugLogf("token = %+q", token)
51
52         return sess, nil
53 }
54
55 func (sess *sessionV0) debugLogf(s string, args ...interface{}) {
56         args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
57         debugLogf("%s "+s, args...)
58 }
59
60 // If every client subscription message includes filters consisting
61 // only of [["event_type","in",...]] then send only the requested
62 // event types. Otherwise, clear sess.eventTypes and send all event
63 // types from now on.
64 func (sess *sessionV0) checkFilters(filters []v0filter) {
65         if sess.eventTypes == nil {
66                 // Already received a subscription request without
67                 // event_type filters.
68                 return
69         }
70         eventTypes := sess.eventTypes
71         sess.eventTypes = nil
72         if len(filters) == 0 {
73                 return
74         }
75         useFilters := false
76         for _, f := range filters {
77                 col, ok := f[0].(string)
78                 if !ok || col != "event_type" {
79                         continue
80                 }
81                 op, ok := f[1].(string)
82                 if !ok || op != "in" {
83                         return
84                 }
85                 arr, ok := f[2].([]interface{})
86                 if !ok {
87                         return
88                 }
89                 useFilters = true
90                 for _, s := range arr {
91                         if s, ok := s.(string); ok {
92                                 eventTypes[s] = true
93                         } else {
94                                 return
95                         }
96                 }
97         }
98         if useFilters {
99                 sess.debugLogf("eventTypes %+v", eventTypes)
100                 sess.eventTypes = eventTypes
101         }
102 }
103
104 func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) []byte {
105         sess.debugLogf("received message: %+v", msg)
106         var sub v0subscribe
107         if err := json.Unmarshal(buf, &sub); err != nil {
108                 sess.debugLogf("ignored unrecognized request: %s", err)
109                 return nil
110         }
111         if sub.Method == "subscribe" {
112                 sess.debugLogf("subscribing to *")
113                 sess.mtx.Lock()
114                 sess.checkFilters(sub.Filters)
115                 sess.subscribed["*"] = true
116                 sess.mtx.Unlock()
117                 return []byte(`{"status":200}`)
118         }
119         return []byte(`{"status":400}`)
120 }
121
122 func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
123         detail := e.Detail()
124         if detail == nil {
125                 return nil, nil
126         }
127
128         ok, err := sess.permChecker.Check(detail.ObjectUUID)
129         if err != nil || !ok {
130                 return nil, err
131         }
132
133         msg := map[string]interface{}{
134                 "msgID":             e.Serial,
135                 "id":                detail.ID,
136                 "uuid":              detail.UUID,
137                 "object_uuid":       detail.ObjectUUID,
138                 "object_owner_uuid": detail.ObjectOwnerUUID,
139                 "event_type":        detail.EventType,
140         }
141         if detail.Properties != nil && detail.Properties["text"] != nil {
142                 msg["properties"] = detail.Properties
143         } else {
144                 msgProps := map[string]map[string]interface{}{}
145                 for _, ak := range []string{"old_attributes", "new_attributes"} {
146                         eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
147                         if !ok {
148                                 continue
149                         }
150                         msgAttrs := map[string]interface{}{}
151                         for _, k := range sendObjectAttributes {
152                                 if v, ok := eventAttrs[k]; ok {
153                                         msgAttrs[k] = v
154                                 }
155                         }
156                         msgProps[ak] = msgAttrs
157                 }
158                 msg["properties"] = msgProps
159         }
160         return json.Marshal(msg)
161 }
162
163 func (sess *sessionV0) Filter(e *event) bool {
164         detail := e.Detail()
165         sess.mtx.Lock()
166         defer sess.mtx.Unlock()
167         switch {
168         case sess.eventTypes != nil && detail == nil:
169                 return false
170         case sess.eventTypes != nil && !sess.eventTypes[detail.EventType]:
171                 return false
172         case sess.subscribed["*"]:
173                 return true
174         case detail == nil:
175                 return false
176         case sess.subscribed[detail.ObjectUUID]:
177                 return true
178         case sess.subscribed[detail.ObjectOwnerUUID]:
179                 return true
180         default:
181                 return false
182         }
183 }