8460: Send selected old/new attributes with v0 events.
[arvados.git] / services / ws / session_v0.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "log"
7         "sync"
8
9         "git.curoverse.com/arvados.git/sdk/go/arvados"
10 )
11
// errQueueFull and errFrameTooBig are sentinel errors. They are not
// referenced in this part of the file; presumably they are returned by
// the connection handling code elsewhere in the package.
var errQueueFull = errors.New("client queue full")
var errFrameTooBig = errors.New("frame too big")

// sendObjectAttributes lists the object attribute names that
// EventMessage copies from an event's "old_attributes" and
// "new_attributes" properties into outgoing v0 messages.
var sendObjectAttributes = []string{"state", "name"}
18
// sessionV0 holds the per-connection state of a v0 websocket session.
type sessionV0 struct {
        // ws is the client connection; its Request() supplies the
        // remote address used in log messages and the api_token form
        // value.
        ws          wsConn
        // permChecker decides whether the session's token may see the
        // object an event refers to (see EventMessage).
        permChecker permChecker
        // subscribed is the set of subscription keys. Receive sets
        // "*"; Filter also looks up object and owner UUIDs.
        subscribed  map[string]bool
        // eventTypes is the set of event types to deliver. A nil map
        // means event types are not restricted (see checkFilters and
        // Filter).
        eventTypes  map[string]bool
        // mtx is held by Receive and Filter while they access
        // subscribed and eventTypes.
        mtx         sync.Mutex
        // setupOnce is not used in this part of the file.
        setupOnce   sync.Once
}
27
// v0subscribe is the shape Receive decodes incoming client messages
// into with json.Unmarshal. Only messages whose Method is "subscribe"
// are acted on.
type v0subscribe struct {
        // Method is the requested operation, e.g. "subscribe".
        Method  string
        // Filters are the filters supplied with the request; they are
        // passed to checkFilters.
        Filters []v0filter
}
32
// v0filter is a single client-supplied filter, decoded from a JSON
// array. checkFilters reads element 0 as the column name, element 1
// as the operator and element 2 as the operand.
type v0filter []interface{}
34
35 func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
36         sess := &sessionV0{
37                 ws:          ws,
38                 permChecker: NewPermChecker(ac),
39                 subscribed:  make(map[string]bool),
40                 eventTypes:  make(map[string]bool),
41         }
42
43         err := ws.Request().ParseForm()
44         if err != nil {
45                 log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
46                 return nil, err
47         }
48         token := ws.Request().Form.Get("api_token")
49         sess.permChecker.SetToken(token)
50         sess.debugLogf("token = %+q", token)
51
52         return sess, nil
53 }
54
55 func (sess *sessionV0) debugLogf(s string, args ...interface{}) {
56         args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
57         debugLogf("%s "+s, args...)
58 }
59
60 // If every client subscription message includes filters consisting
61 // only of [["event_type","in",...]] then send only the requested
62 // event types. Otherwise, clear sess.eventTypes and send all event
63 // types from now on.
64 func (sess *sessionV0) checkFilters(filters []v0filter) {
65         if sess.eventTypes == nil {
66                 // Already received a subscription request without
67                 // event_type filters.
68                 return
69         }
70         eventTypes := sess.eventTypes
71         sess.eventTypes = nil
72         if len(filters) == 0 {
73                 return
74         }
75         useFilters := false
76         for _, f := range filters {
77                 col, ok := f[0].(string)
78                 if !ok || col != "event_type" {
79                         continue
80                 }
81                 op, ok := f[1].(string)
82                 if !ok || op != "in" {
83                         return
84                 }
85                 arr, ok := f[2].([]interface{})
86                 if !ok {
87                         return
88                 }
89                 useFilters = true
90                 for _, s := range arr {
91                         if s, ok := s.(string); ok {
92                                 eventTypes[s] = true
93                         } else {
94                                 return
95                         }
96                 }
97         }
98         if useFilters {
99                 sess.debugLogf("eventTypes %+v", eventTypes)
100                 sess.eventTypes = eventTypes
101         }
102 }
103
104 func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) {
105         sess.debugLogf("received message: %+v", msg)
106         var sub v0subscribe
107         if err := json.Unmarshal(buf, &sub); err != nil {
108                 sess.debugLogf("ignored unrecognized request: %s", err)
109                 return
110         }
111         if sub.Method == "subscribe" {
112                 sess.debugLogf("subscribing to *")
113                 sess.mtx.Lock()
114                 sess.checkFilters(sub.Filters)
115                 sess.subscribed["*"] = true
116                 sess.mtx.Unlock()
117         }
118 }
119
120 func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
121         detail := e.Detail()
122         if detail == nil {
123                 return nil, nil
124         }
125
126         ok, err := sess.permChecker.Check(detail.ObjectUUID)
127         if err != nil || !ok {
128                 return nil, err
129         }
130
131         msg := map[string]interface{}{
132                 "msgID":             e.Serial,
133                 "id":                detail.ID,
134                 "uuid":              detail.UUID,
135                 "object_uuid":       detail.ObjectUUID,
136                 "object_owner_uuid": detail.ObjectOwnerUUID,
137                 "event_type":        detail.EventType,
138         }
139         if detail.Properties != nil && detail.Properties["text"] != nil {
140                 msg["properties"] = detail.Properties
141         } else {
142                 msgProps := map[string]map[string]interface{}{}
143                 for _, ak := range []string{"old_attributes", "new_attributes"} {
144                         eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
145                         if !ok {
146                                 continue
147                         }
148                         msgAttrs := map[string]interface{}{}
149                         for _, k := range sendObjectAttributes {
150                                 if v, ok := eventAttrs[k]; ok {
151                                         msgAttrs[k] = v
152                                 }
153                         }
154                         msgProps[ak] = msgAttrs
155                 }
156                 msg["properties"] = msgProps
157         }
158         return json.Marshal(msg)
159 }
160
161 func (sess *sessionV0) Filter(e *event) bool {
162         detail := e.Detail()
163         sess.mtx.Lock()
164         defer sess.mtx.Unlock()
165         switch {
166         case sess.eventTypes != nil && detail == nil:
167                 return false
168         case sess.eventTypes != nil && !sess.eventTypes[detail.EventType]:
169                 return false
170         case sess.subscribed["*"]:
171                 return true
172         case detail == nil:
173                 return false
174         case sess.subscribed[detail.ObjectUUID]:
175                 return true
176         case sess.subscribed[detail.ObjectOwnerUUID]:
177                 return true
178         default:
179                 return false
180         }
181 }