8460: Skip non-log events if filtering by event_type.
[arvados.git] / services / ws / session_v0.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "log"
7         "sync"
8
9         "git.curoverse.com/arvados.git/sdk/go/arvados"
10 )
11
// errQueueFull is a sentinel error whose message is "client queue
// full". It is not referenced in the part of the file shown here.
var errQueueFull = errors.New("client queue full")

// errFrameTooBig is a sentinel error whose message is "frame too
// big". It is not referenced in the part of the file shown here.
var errFrameTooBig = errors.New("frame too big")
16
17 type sessionV0 struct {
18         ws          wsConn
19         permChecker permChecker
20         subscribed  map[string]bool
21         eventTypes  map[string]bool
22         mtx         sync.Mutex
23         setupOnce   sync.Once
24 }
25
type (
	// v0subscribe is the shape a client request is decoded into by
	// Receive. Only Method == "subscribe" is acted upon.
	v0subscribe struct {
		Method  string
		Filters []v0filter
	}

	// v0filter is a single filter from a subscription request, kept
	// as a loosely typed list. checkFilters reads element 0 as the
	// column name, element 1 as the operator and element 2 as the
	// operand.
	v0filter []interface{}
)
32
33 func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
34         sess := &sessionV0{
35                 ws:          ws,
36                 permChecker: NewPermChecker(ac),
37                 subscribed:  make(map[string]bool),
38                 eventTypes:  make(map[string]bool),
39         }
40
41         err := ws.Request().ParseForm()
42         if err != nil {
43                 log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
44                 return nil, err
45         }
46         token := ws.Request().Form.Get("api_token")
47         sess.permChecker.SetToken(token)
48         sess.debugLogf("token = %+q", token)
49
50         return sess, nil
51 }
52
53 func (sess *sessionV0) debugLogf(s string, args ...interface{}) {
54         args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
55         debugLogf("%s "+s, args...)
56 }
57
58 // If every client subscription message includes filters consisting
59 // only of [["event_type","in",...]] then send only the requested
60 // event types. Otherwise, clear sess.eventTypes and send all event
61 // types from now on.
62 func (sess *sessionV0) checkFilters(filters []v0filter) {
63         if sess.eventTypes == nil {
64                 // Already received a subscription request without
65                 // event_type filters.
66                 return
67         }
68         eventTypes := sess.eventTypes
69         sess.eventTypes = nil
70         if len(filters) == 0 {
71                 return
72         }
73         useFilters := false
74         for _, f := range filters {
75                 col, ok := f[0].(string)
76                 if !ok || col != "event_type" {
77                         continue
78                 }
79                 op, ok := f[1].(string)
80                 if !ok || op != "in" {
81                         return
82                 }
83                 arr, ok := f[2].([]interface{})
84                 if !ok {
85                         return
86                 }
87                 useFilters = true
88                 for _, s := range arr {
89                         if s, ok := s.(string); ok {
90                                 eventTypes[s] = true
91                         } else {
92                                 return
93                         }
94                 }
95         }
96         if useFilters {
97                 sess.debugLogf("eventTypes %+v", eventTypes)
98                 sess.eventTypes = eventTypes
99         }
100 }
101
102 func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) {
103         sess.debugLogf("received message: %+v", msg)
104         var sub v0subscribe
105         if err := json.Unmarshal(buf, &sub); err != nil {
106                 sess.debugLogf("ignored unrecognized request: %s", err)
107                 return
108         }
109         if sub.Method == "subscribe" {
110                 sess.debugLogf("subscribing to *")
111                 sess.mtx.Lock()
112                 sess.checkFilters(sub.Filters)
113                 sess.subscribed["*"] = true
114                 sess.mtx.Unlock()
115         }
116 }
117
118 func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
119         detail := e.Detail()
120         if detail == nil {
121                 return nil, nil
122         }
123
124         ok, err := sess.permChecker.Check(detail.ObjectUUID)
125         if err != nil || !ok {
126                 return nil, err
127         }
128
129         msg := map[string]interface{}{
130                 "msgID":             e.Serial,
131                 "id":                detail.ID,
132                 "uuid":              detail.UUID,
133                 "object_uuid":       detail.ObjectUUID,
134                 "object_owner_uuid": detail.ObjectOwnerUUID,
135                 "event_type":        detail.EventType,
136         }
137         if detail.Properties != nil && detail.Properties["text"] != nil {
138                 msg["properties"] = detail.Properties
139         }
140         return json.Marshal(msg)
141 }
142
143 func (sess *sessionV0) Filter(e *event) bool {
144         detail := e.Detail()
145         sess.mtx.Lock()
146         defer sess.mtx.Unlock()
147         switch {
148         case sess.eventTypes != nil && detail == nil:
149                 return false
150         case sess.eventTypes != nil && !sess.eventTypes[detail.EventType]:
151                 return false
152         case sess.subscribed["*"]:
153                 return true
154         case detail == nil:
155                 return false
156         case sess.subscribed[detail.ObjectUUID]:
157                 return true
158         case sess.subscribed[detail.ObjectOwnerUUID]:
159                 return true
160         default:
161                 return false
162         }
163 }