11 "git.curoverse.com/arvados.git/sdk/go/arvados"
15 errQueueFull = errors.New("client queue full")
16 errFrameTooBig = errors.New("frame too big")
18 sendObjectAttributes = []string{"state", "name"}
20 v0subscribeOK = []byte(`{"status":200}`)
21 v0subscribeFail = []byte(`{"status":400}`)
24 type v0session struct {
27 permChecker permChecker
28 subscriptions []v0subscribe
33 func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
37 permChecker: NewPermChecker(ac),
40 err := ws.Request().ParseForm()
42 log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
45 token := ws.Request().Form.Get("api_token")
46 sess.permChecker.SetToken(token)
47 sess.debugLogf("token = %+q", token)
52 func (sess *v0session) debugLogf(s string, args ...interface{}) {
53 args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
54 debugLogf("%s "+s, args...)
57 func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte {
58 sess.debugLogf("received message: %+v", msg)
60 if err := json.Unmarshal(buf, &sub); err != nil {
61 sess.debugLogf("ignored unrecognized request: %s", err)
64 if sub.Method == "subscribe" {
66 sess.debugLogf("subscription: %v", sub)
68 sess.subscriptions = append(sess.subscriptions, sub)
71 return append([][]byte{v0subscribeOK}, sub.getOldEvents(sess)...)
73 return [][]byte{v0subscribeFail}
76 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
82 ok, err := sess.permChecker.Check(detail.ObjectUUID)
83 if err != nil || !ok {
87 msg := map[string]interface{}{
91 "object_uuid": detail.ObjectUUID,
92 "object_owner_uuid": detail.ObjectOwnerUUID,
93 "event_type": detail.EventType,
95 if detail.Properties != nil && detail.Properties["text"] != nil {
96 msg["properties"] = detail.Properties
98 msgProps := map[string]map[string]interface{}{}
99 for _, ak := range []string{"old_attributes", "new_attributes"} {
100 eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
104 msgAttrs := map[string]interface{}{}
105 for _, k := range sendObjectAttributes {
106 if v, ok := eventAttrs[k]; ok {
110 msgProps[ak] = msgAttrs
112 msg["properties"] = msgProps
114 return json.Marshal(msg)
117 func (sess *v0session) Filter(e *event) bool {
119 defer sess.mtx.Unlock()
120 for _, sub := range sess.subscriptions {
128 func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
129 if sub.LastLogID == 0 {
132 debugLogf("getOldEvents(%d)", sub.LastLogID)
133 // Here we do a "select id" query and queue an event for every
134 // log since the given ID, then use (*event)Detail() to
135 // retrieve the whole row and decide whether to send it. This
136 // approach is very inefficient if the subscriber asks for
137 // last_log_id==1, even if the filters end up matching very
140 // To mitigate this, filter on "created > 10 minutes ago" when
141 // retrieving the list of old event IDs to consider.
142 rows, err := sess.db.Query(
143 `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
145 time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
147 errorLogf("db.Query: %s", err)
152 err := rows.Scan(&id)
154 errorLogf("Scan: %s", err)
159 Received: time.Now(),
163 debugLogf("skip old event %+v", e)
166 msg, err := sess.EventMessage(e)
168 debugLogf("event marshal: %s", err)
171 debugLogf("old event: %s", string(msg))
172 msgs = append(msgs, msg)
174 if err := rows.Err(); err != nil {
175 errorLogf("db.Query: %s", err)
180 type v0subscribe struct {
183 LastLogID int64 `json:"last_log_id"`
185 funcs []func(*event) bool
188 type v0filter [3]interface{}
190 func (sub *v0subscribe) match(e *event) bool {
193 debugLogf("match(%d): failed on no detail", e.LogID)
196 for i, f := range sub.funcs {
198 debugLogf("match(%d): failed on func %d", e.LogID, i)
202 debugLogf("match(%d): passed %d funcs", e.LogID, len(sub.funcs))
206 func (sub *v0subscribe) prepare() {
207 for _, f := range sub.Filters {
211 if col, ok := f[0].(string); ok && col == "event_type" {
212 op, ok := f[1].(string)
213 if !ok || op != "in" {
216 arr, ok := f[2].([]interface{})
221 for _, s := range arr {
222 if s, ok := s.(string); ok {
223 strs = append(strs, s)
226 sub.funcs = append(sub.funcs, func(e *event) bool {
227 debugLogf("event_type func: %v in %v", e.Detail().EventType, strs)
228 for _, s := range strs {
229 if s == e.Detail().EventType {
235 } else if ok && col == "created_at" {
236 op, ok := f[1].(string)
240 tstr, ok := f[2].(string)
244 t, err := time.Parse(time.RFC3339Nano, tstr)
246 debugLogf("time.Parse(%q): %s", tstr, err)
251 sub.funcs = append(sub.funcs, func(e *event) bool {
252 debugLogf("created_at func: %v >= %v", e.Detail().CreatedAt, t)
253 return !e.Detail().CreatedAt.Before(t)
256 sub.funcs = append(sub.funcs, func(e *event) bool {
257 debugLogf("created_at func: %v <= %v", e.Detail().CreatedAt, t)
258 return !e.Detail().CreatedAt.After(t)
261 sub.funcs = append(sub.funcs, func(e *event) bool {
262 debugLogf("created_at func: %v > %v", e.Detail().CreatedAt, t)
263 return e.Detail().CreatedAt.After(t)
266 sub.funcs = append(sub.funcs, func(e *event) bool {
267 debugLogf("created_at func: %v < %v", e.Detail().CreatedAt, t)
268 return e.Detail().CreatedAt.Before(t)
271 sub.funcs = append(sub.funcs, func(e *event) bool {
272 debugLogf("created_at func: %v = %v", e.Detail().CreatedAt, t)
273 return e.Detail().CreatedAt.Equal(t)