9 "git.curoverse.com/arvados.git/sdk/go/arvados"
13 errQueueFull = errors.New("client queue full")
14 errFrameTooBig = errors.New("frame too big")
17 type sessionV0 struct {
19 permChecker permChecker
20 subscribed map[string]bool
21 eventTypes map[string]bool
26 type v0subscribe struct {
31 type v0filter []interface{}
33 func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
36 permChecker: NewPermChecker(ac),
37 subscribed: make(map[string]bool),
38 eventTypes: make(map[string]bool),
41 err := ws.Request().ParseForm()
43 log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
46 token := ws.Request().Form.Get("api_token")
47 sess.permChecker.SetToken(token)
48 sess.debugLogf("token = %+q", token)
53 func (sess *sessionV0) debugLogf(s string, args ...interface{}) {
54 args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
55 debugLogf("%s "+s, args...)
58 // If every client subscription message includes filters consisting
59 // only of [["event_type","in",...]] then send only the requested
60 // event types. Otherwise, clear sess.eventTypes and send all event
62 func (sess *sessionV0) checkFilters(filters []v0filter) {
63 if sess.eventTypes == nil {
64 // Already received a subscription request without
65 // event_type filters.
68 eventTypes := sess.eventTypes
70 if len(filters) == 0 {
74 for _, f := range filters {
75 col, ok := f[0].(string)
76 if !ok || col != "event_type" {
79 op, ok := f[1].(string)
80 if !ok || op != "in" {
83 arr, ok := f[2].([]interface{})
88 for _, s := range arr {
89 if s, ok := s.(string); ok {
97 sess.debugLogf("eventTypes %+v", eventTypes)
98 sess.eventTypes = eventTypes
102 func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) {
103 sess.debugLogf("received message: %+v", msg)
105 if err := json.Unmarshal(buf, &sub); err != nil {
106 sess.debugLogf("ignored unrecognized request: %s", err)
109 if sub.Method == "subscribe" {
110 sess.debugLogf("subscribing to *")
112 sess.checkFilters(sub.Filters)
113 sess.subscribed["*"] = true
118 func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
124 ok, err := sess.permChecker.Check(detail.ObjectUUID)
125 if err != nil || !ok {
129 msg := map[string]interface{}{
133 "object_uuid": detail.ObjectUUID,
134 "object_owner_uuid": detail.ObjectOwnerUUID,
135 "event_type": detail.EventType,
137 if detail.Properties != nil && detail.Properties["text"] != nil {
138 msg["properties"] = detail.Properties
140 return json.Marshal(msg)
143 func (sess *sessionV0) Filter(e *event) bool {
146 defer sess.mtx.Unlock()
148 case sess.eventTypes != nil && !sess.eventTypes[detail.EventType]:
150 case sess.subscribed["*"]:
154 case sess.subscribed[detail.ObjectUUID]:
156 case sess.subscribed[detail.ObjectOwnerUUID]: