9 "git.curoverse.com/arvados.git/sdk/go/arvados"
13 errQueueFull = errors.New("client queue full")
14 errFrameTooBig = errors.New("frame too big")
16 sendObjectAttributes = []string{"state", "name"}
19 type sessionV0 struct {
21 permChecker permChecker
22 subscribed map[string]bool
23 eventTypes map[string]bool
28 type v0subscribe struct {
33 type v0filter []interface{}
35 func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
38 permChecker: NewPermChecker(ac),
39 subscribed: make(map[string]bool),
40 eventTypes: make(map[string]bool),
43 err := ws.Request().ParseForm()
45 log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
48 token := ws.Request().Form.Get("api_token")
49 sess.permChecker.SetToken(token)
50 sess.debugLogf("token = %+q", token)
55 func (sess *sessionV0) debugLogf(s string, args ...interface{}) {
56 args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
57 debugLogf("%s "+s, args...)
60 // If every client subscription message includes filters consisting
61 // only of [["event_type","in",...]] then send only the requested
62 // event types. Otherwise, clear sess.eventTypes and send all event
64 func (sess *sessionV0) checkFilters(filters []v0filter) {
65 if sess.eventTypes == nil {
66 // Already received a subscription request without
67 // event_type filters.
70 eventTypes := sess.eventTypes
72 if len(filters) == 0 {
76 for _, f := range filters {
77 col, ok := f[0].(string)
78 if !ok || col != "event_type" {
81 op, ok := f[1].(string)
82 if !ok || op != "in" {
85 arr, ok := f[2].([]interface{})
90 for _, s := range arr {
91 if s, ok := s.(string); ok {
99 sess.debugLogf("eventTypes %+v", eventTypes)
100 sess.eventTypes = eventTypes
104 func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) []byte {
105 sess.debugLogf("received message: %+v", msg)
107 if err := json.Unmarshal(buf, &sub); err != nil {
108 sess.debugLogf("ignored unrecognized request: %s", err)
111 if sub.Method == "subscribe" {
112 sess.debugLogf("subscribing to *")
114 sess.checkFilters(sub.Filters)
115 sess.subscribed["*"] = true
117 return []byte(`{"status":200}`)
119 return []byte(`{"status":400}`)
122 func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
128 ok, err := sess.permChecker.Check(detail.ObjectUUID)
129 if err != nil || !ok {
133 msg := map[string]interface{}{
137 "object_uuid": detail.ObjectUUID,
138 "object_owner_uuid": detail.ObjectOwnerUUID,
139 "event_type": detail.EventType,
141 if detail.Properties != nil && detail.Properties["text"] != nil {
142 msg["properties"] = detail.Properties
144 msgProps := map[string]map[string]interface{}{}
145 for _, ak := range []string{"old_attributes", "new_attributes"} {
146 eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
150 msgAttrs := map[string]interface{}{}
151 for _, k := range sendObjectAttributes {
152 if v, ok := eventAttrs[k]; ok {
156 msgProps[ak] = msgAttrs
158 msg["properties"] = msgProps
160 return json.Marshal(msg)
163 func (sess *sessionV0) Filter(e *event) bool {
166 defer sess.mtx.Unlock()
168 case sess.eventTypes != nil && detail == nil:
170 case sess.eventTypes != nil && !sess.eventTypes[detail.EventType]:
172 case sess.subscribed["*"]:
176 case sess.subscribed[detail.ObjectUUID]:
178 case sess.subscribed[detail.ObjectOwnerUUID]: