11 "git.curoverse.com/arvados.git/sdk/go/arvados"
12 "github.com/Sirupsen/logrus"
16 errQueueFull = errors.New("client queue full")
17 errFrameTooBig = errors.New("frame too big")
19 sendObjectAttributes = []string{"state", "name"}
21 v0subscribeOK = []byte(`{"status":200}`)
22 v0subscribeFail = []byte(`{"status":400}`)
25 type v0session struct {
27 sendq chan<- interface{}
29 permChecker permChecker
30 subscriptions []v0subscribe
37 func NewSessionV0(ws wsConn, sendq chan<- interface{}, ac arvados.Client, db *sql.DB) (session, error) {
42 permChecker: NewPermChecker(ac),
43 log: logger(ws.Request().Context()),
46 err := ws.Request().ParseForm()
48 sess.log.WithError(err).Error("ParseForm failed")
51 token := ws.Request().Form.Get("api_token")
52 sess.permChecker.SetToken(token)
53 sess.log.WithField("token", token).Debug("set token")
58 func (sess *v0session) Receive(buf []byte) error {
60 if err := json.Unmarshal(buf, &sub); err != nil {
61 sess.log.WithError(err).Info("invalid message from client")
62 } else if sub.Method == "subscribe" {
64 sess.log.WithField("sub", sub).Debug("sub prepared")
65 sess.sendq <- v0subscribeOK
67 sess.subscriptions = append(sess.subscriptions, sub)
69 sub.sendOldEvents(sess)
72 sess.log.WithField("Method", sub.Method).Info("unknown method")
74 sess.sendq <- v0subscribeFail
78 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
84 ok, err := sess.permChecker.Check(detail.ObjectUUID)
85 if err != nil || !ok {
89 msg := map[string]interface{}{
90 "msgID": atomic.AddUint64(&sess.lastMsgID, 1),
93 "object_uuid": detail.ObjectUUID,
94 "object_owner_uuid": detail.ObjectOwnerUUID,
95 "event_type": detail.EventType,
97 if detail.Properties != nil && detail.Properties["text"] != nil {
98 msg["properties"] = detail.Properties
100 msgProps := map[string]map[string]interface{}{}
101 for _, ak := range []string{"old_attributes", "new_attributes"} {
102 eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
106 msgAttrs := map[string]interface{}{}
107 for _, k := range sendObjectAttributes {
108 if v, ok := eventAttrs[k]; ok {
112 msgProps[ak] = msgAttrs
114 msg["properties"] = msgProps
116 return json.Marshal(msg)
119 func (sess *v0session) Filter(e *event) bool {
121 defer sess.mtx.Unlock()
122 for _, sub := range sess.subscriptions {
123 if sub.match(sess, e) {
130 func (sub *v0subscribe) sendOldEvents(sess *v0session) {
131 if sub.LastLogID == 0 {
134 sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
135 // Here we do a "select id" query and queue an event for every
136 // log since the given ID, then use (*event)Detail() to
137 // retrieve the whole row and decide whether to send it. This
138 // approach is very inefficient if the subscriber asks for
139 // last_log_id==1, even if the filters end up matching very
142 // To mitigate this, filter on "created > 10 minutes ago" when
143 // retrieving the list of old event IDs to consider.
144 rows, err := sess.db.Query(
145 `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
147 time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
149 sess.log.WithError(err).Error("db.Query failed")
154 err := rows.Scan(&id)
156 sess.log.WithError(err).Error("row Scan failed")
159 for len(sess.sendq)*2 > cap(sess.sendq) {
160 // Ugly... but if we fill up the whole client
161 // queue with a backlog of old events, a
162 // single new event will overflow it and
163 // terminate the connection, and then the
164 // client will probably reconnect and do the
165 // same thing all over again.
166 time.Sleep(100 * time.Millisecond)
170 Received: time.Now(),
173 if sub.match(sess, e) {
177 if err := rows.Err(); err != nil {
178 sess.log.WithError(err).Error("db.Query failed")
182 type v0subscribe struct {
185 LastLogID int64 `json:"last_log_id"`
187 funcs []func(*event) bool
190 type v0filter [3]interface{}
192 func (sub *v0subscribe) match(sess *v0session, e *event) bool {
193 log := sess.log.WithField("LogID", e.LogID)
196 log.Error("match failed, no detail")
199 log = log.WithField("funcs", len(sub.funcs))
200 for i, f := range sub.funcs {
202 log.WithField("func", i).Debug("match failed")
206 log.Debug("match passed")
210 func (sub *v0subscribe) prepare(sess *v0session) {
211 for _, f := range sub.Filters {
215 if col, ok := f[0].(string); ok && col == "event_type" {
216 op, ok := f[1].(string)
217 if !ok || op != "in" {
220 arr, ok := f[2].([]interface{})
225 for _, s := range arr {
226 if s, ok := s.(string); ok {
227 strs = append(strs, s)
230 sub.funcs = append(sub.funcs, func(e *event) bool {
231 for _, s := range strs {
232 if s == e.Detail().EventType {
238 } else if ok && col == "created_at" {
239 op, ok := f[1].(string)
243 tstr, ok := f[2].(string)
247 t, err := time.Parse(time.RFC3339Nano, tstr)
249 sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
252 var fn func(*event) bool
255 fn = func(e *event) bool {
256 return !e.Detail().CreatedAt.Before(t)
259 fn = func(e *event) bool {
260 return !e.Detail().CreatedAt.After(t)
263 fn = func(e *event) bool {
264 return e.Detail().CreatedAt.After(t)
267 fn = func(e *event) bool {
268 return e.Detail().CreatedAt.Before(t)
271 fn = func(e *event) bool {
272 return e.Detail().CreatedAt.Equal(t)
275 sess.log.WithField("operator", op).Info("bogus operator")
278 sub.funcs = append(sub.funcs, fn)