10 "git.curoverse.com/arvados.git/sdk/go/arvados"
11 log "github.com/Sirupsen/logrus"
// Sentinel errors. Their call sites are not part of this excerpt.
15 errQueueFull = errors.New("client queue full")
16 errFrameTooBig = errors.New("frame too big")
// Attribute names that EventMessage looks up in an event's
// "old_attributes" / "new_attributes" maps when building the reduced
// "properties" payload.
18 sendObjectAttributes = []string{"state", "name"}
// Canned JSON replies used by Receive: v0subscribeOK is the first element of
// the reply to a "subscribe" request, v0subscribeFail is the reply returned
// after the subscribe branch.
20 v0subscribeOK = []byte(`{"status":200}`)
21 v0subscribeFail = []byte(`{"status":400}`)
// v0session holds the state of one v0 session created by NewSessionV0.
// Only two fields are visible in this excerpt; the methods below also use
// sess.log, sess.db and sess.mtx, so those fields presumably live on lines
// not shown here.
24 type v0session struct {
// permChecker is given the client's api_token in NewSessionV0 and is
// consulted by EventMessage before an event is serialized.
27 permChecker permChecker
// subscriptions accumulates the subscribe requests accepted by Receive;
// Filter iterates over it.
28 subscriptions []v0subscribe
// NewSessionV0 builds a session for the websocket connection ws.
//
// Visible steps: a permission checker is created from ac, the logger is
// obtained via logger(...) from the request's context, the request form is
// parsed (a failure is logged as "ParseForm failed"), and the "api_token"
// form value is handed to the permission checker.
//
// NOTE(review): db is presumably stored on the session (getOldEvents reads
// sess.db), and what is returned after a ParseForm failure is on a line not
// shown in this excerpt — confirm against the full file.
34 func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
38 permChecker: NewPermChecker(ac),
39 log: logger(ws.Request().Context()),
42 err := ws.Request().ParseForm()
44 sess.log.WithError(err).Error("ParseForm failed")
// The token is read from the parsed request form.
47 token := ws.Request().Form.Get("api_token")
48 sess.permChecker.SetToken(token)
// NOTE(review): this writes the raw API token into the debug log.
49 sess.log.WithField("token", token).Debug("set token")
// Receive handles one message from the client and returns the replies to
// send back.
//
// In the visible lines msg is only used for debug logging; buf is decoded as
// JSON into sub (declared on a line not shown, presumably a v0subscribe).
// A decode error is logged at Info level as "ignored invalid request"; what is
// returned in that case is not visible here.
//
// When sub.Method is "subscribe", the subscription is appended to
// sess.subscriptions and the reply is v0subscribeOK followed by whatever
// sub.getOldEvents returns. The last visible return yields v0subscribeFail.
54 func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte {
55 sess.log.WithField("data", msg).Debug("received message")
57 if err := json.Unmarshal(buf, &sub); err != nil {
58 sess.log.WithError(err).Info("ignored invalid request")
61 if sub.Method == "subscribe" {
// The log text suggests sub.prepare has run by this point (call not shown).
63 sess.log.WithField("sub", sub).Debug("sub prepared")
65 sess.subscriptions = append(sess.subscriptions, sub)
// The OK status comes first, then any catch-up messages for older logs.
68 return append([][]byte{v0subscribeOK}, sub.getOldEvents(sess)...)
70 return [][]byte{v0subscribeFail}
// EventMessage serializes e into the JSON message sent to the client.
//
// The permission checker is asked about detail.ObjectUUID first; the branch
// taken when the check returns an error or a negative answer is not visible
// in this excerpt.
//
// The message carries "object_uuid", "object_owner_uuid" and "event_type"
// (further keys may be set on lines not shown). For "properties":
//   - if detail.Properties has a non-nil "text" entry, the whole Properties
//     map is attached unchanged;
//   - a reduced map keyed by "old_attributes"/"new_attributes" is also built
//     using the sendObjectAttributes keys. NOTE(review): this is presumably
//     the else branch of the check above — the else itself is not visible.
//
// Returns the result of json.Marshal on the assembled message.
73 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
79 ok, err := sess.permChecker.Check(detail.ObjectUUID)
80 if err != nil || !ok {
84 msg := map[string]interface{}{
88 "object_uuid": detail.ObjectUUID,
89 "object_owner_uuid": detail.ObjectOwnerUUID,
90 "event_type": detail.EventType,
// Properties that include a "text" entry are passed through as a whole.
92 if detail.Properties != nil && detail.Properties["text"] != nil {
93 msg["properties"] = detail.Properties
// Reduced form: one inner map per attribute set.
95 msgProps := map[string]map[string]interface{}{}
96 for _, ak := range []string{"old_attributes", "new_attributes"} {
// The type assertion only succeeds when the entry is a JSON-style object.
97 eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
101 msgAttrs := map[string]interface{}{}
102 for _, k := range sendObjectAttributes {
103 if v, ok := eventAttrs[k]; ok {
107 msgProps[ak] = msgAttrs
109 msg["properties"] = msgProps
111 return json.Marshal(msg)
// Filter tests e against each entry of sess.subscriptions with sub.match.
// The unlock of sess.mtx is deferred; the matching Lock call and the return
// statements are on lines not shown in this excerpt.
114 func (sess *v0session) Filter(e *event) bool {
116 defer sess.mtx.Unlock()
117 for _, sub := range sess.subscriptions {
118 if sub.match(sess, e) {
// getOldEvents gathers messages for logs newer than sub.LastLogID.
//
// It selects log ids greater than the query's first parameter and created
// after a cutoff of ten minutes before now, ordered by id. For each row that
// passes sub.match, the output of sess.EventMessage is appended to msgs.
//
// A LastLogID of 0 is special-cased (the branch body is not visible,
// presumably an early return). Query, scan and marshal failures are logged
// at Error level; how each one alters control flow is on lines not shown.
//
// NOTE(review): no rows.Close() is visible in this excerpt — verify that it
// exists or that the row loop always runs to exhaustion.
125 func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
126 if sub.LastLogID == 0 {
129 sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
130 // Here we do a "select id" query and queue an event for every
131 // log since the given ID, then use (*event)Detail() to
132 // retrieve the whole row and decide whether to send it. This
133 // approach is very inefficient if the subscriber asks for
134 // last_log_id==1, even if the filters end up matching very
137 // To mitigate this, filter on "created > 10 minutes ago" when
138 // retrieving the list of old event IDs to consider.
139 rows, err := sess.db.Query(
140 `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
// $2: the cutoff is passed as a UTC timestamp string in RFC3339Nano format.
142 time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
144 sess.log.WithError(err).Error("db.Query failed")
149 err := rows.Scan(&id)
151 sess.log.WithError(err).Error("row Scan failed")
// The event's Received time is the time of this replay.
156 Received: time.Now(),
159 if !sub.match(sess, e) {
160 sess.log.WithField("event", e).Debug("skip old event")
163 msg, err := sess.EventMessage(e)
165 sess.log.WithError(err).Error("event marshal failed")
168 sess.log.WithField("data", msg).Debug("will queue old event")
169 msgs = append(msgs, msg)
// NOTE(review): an iteration error is logged with the same "db.Query failed"
// text as the query error above, so the two cannot be told apart in logs.
171 if err := rows.Err(); err != nil {
172 sess.log.WithError(err).Error("db.Query failed")
// v0subscribe is a client request decoded from JSON by Receive. Other code in
// this file also reads sub.Method and sub.Filters, so those fields presumably
// sit on lines not shown in this excerpt.
177 type v0subscribe struct {
// LastLogID is decoded from "last_log_id"; getOldEvents special-cases 0.
180 LastLogID int64 `json:"last_log_id"`
// funcs holds the predicates appended by prepare; match runs them in order.
// Being unexported, the field is ignored by encoding/json.
182 funcs []func(*event) bool
// v0filter is a three-element filter. prepare reads element 0 as the column
// name, element 1 as the operator and element 2 as the operand.
185 type v0filter [3]interface{}
// match evaluates the subscription's predicates against e.
//
// Visible behavior: every log entry is tagged with e.LogID; one branch logs
// "match failed, no detail" at Error level (its condition is not shown,
// presumably a missing e.Detail()); each function in sub.funcs is visited in
// order and a failure is logged with the function's index; "match passed" is
// logged at the end. The return statements are on lines not shown.
187 func (sub *v0subscribe) match(sess *v0session, e *event) bool {
188 log := sess.log.WithField("LogID", e.LogID)
191 log.Error("match failed, no detail")
// Record how many predicates this subscription has for the logs below.
194 log = log.WithField("funcs", len(sub.funcs))
195 for i, f := range sub.funcs {
197 log.WithField("func", i).Debug("match failed")
201 log.Debug("match passed")
// prepare converts sub.Filters into predicate functions appended to
// sub.funcs. Two column names are handled in the visible code:
//   - "event_type": the operator must be the string "in" and the operand a
//     list; the predicate compares e.Detail().EventType with each string
//     collected from the list.
//   - "created_at": the operand must be a string that parses as RFC3339Nano;
//     the predicate compares e.Detail().CreatedAt with the parsed time.
//
// NOTE(review): what happens to a filter that fails one of the checks (skip
// vs. abort), and the operator strings selecting each created_at comparison,
// are on lines not shown in this excerpt. The function may also continue past
// the last visible line.
205 func (sub *v0subscribe) prepare(sess *v0session) {
206 for _, f := range sub.Filters {
210 if col, ok := f[0].(string); ok && col == "event_type" {
211 op, ok := f[1].(string)
212 if !ok || op != "in" {
215 arr, ok := f[2].([]interface{})
// Only the string elements of the operand list are collected.
220 for _, s := range arr {
221 if s, ok := s.(string); ok {
222 strs = append(strs, s)
225 sub.funcs = append(sub.funcs, func(e *event) bool {
226 for _, s := range strs {
227 if s == e.Detail().EventType {
233 } else if ok && col == "created_at" {
234 op, ok := f[1].(string)
238 tstr, ok := f[2].(string)
242 t, err := time.Parse(time.RFC3339Nano, tstr)
244 sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
247 var fn func(*event) bool
// CreatedAt is at or after t.
250 fn = func(e *event) bool {
251 return !e.Detail().CreatedAt.Before(t)
// CreatedAt is at or before t.
254 fn = func(e *event) bool {
255 return !e.Detail().CreatedAt.After(t)
// CreatedAt is strictly after t.
258 fn = func(e *event) bool {
259 return e.Detail().CreatedAt.After(t)
// CreatedAt is strictly before t.
262 fn = func(e *event) bool {
263 return e.Detail().CreatedAt.Before(t)
// CreatedAt is the same instant as t.
266 fn = func(e *event) bool {
267 return e.Detail().CreatedAt.Equal(t)
// An operator that matches none of the cases is reported at Info level.
270 sess.log.WithField("operator", op).Info("bogus operator")
273 sub.funcs = append(sub.funcs, fn)