12 "git.curoverse.com/arvados.git/sdk/go/arvados"
// Sentinel errors. Their call sites are not visible in this excerpt.
16 errQueueFull = errors.New("client queue full")
17 errFrameTooBig = errors.New("frame too big")
// sendObjectAttributes lists the attribute keys that EventMessage looks up
// in an event's "old_attributes" / "new_attributes" properties.
19 sendObjectAttributes = []string{"state", "name"}
// Canned JSON replies returned by Receive: v0subscribeOK for a "subscribe"
// request, v0subscribeFail for any other method.
21 v0subscribeOK = []byte(`{"status":200}`)
22 v0subscribeFail = []byte(`{"status":400}`)
// v0session holds the state of one v0 session: the permission checker used
// to authorize events and the subscriptions registered so far.
//
// The methods below also use sess.ws and sess.mtx; those fields are declared
// on lines not shown in this excerpt.
25 type v0session struct {
// permChecker decides whether the session's token may see a given object UUID.
27 permChecker permChecker
// subscriptions is appended to by Receive and iterated by Filter.
28 subscriptions []v0subscribe
// NewSessionV0 creates a v0 session for the connection ws. The permission
// checker is built from the Arvados client ac, and its token is taken from
// the "api_token" form value of the connection's request.
33 func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
36 permChecker: NewPermChecker(ac),
// Parse the request form so that Form.Get below can read the token.
39 err := ws.Request().ParseForm()
// A ParseForm failure is logged together with the peer's remote address.
// NOTE(review): what is returned in that case is on lines not shown here.
41 log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
44 token := ws.Request().Form.Get("api_token")
45 sess.permChecker.SetToken(token)
// NOTE(review): this writes the raw API token to the debug log; consider
// redacting it if debug logging can be enabled in production.
46 sess.debugLogf("token = %+q", token)
// debugLogf forwards to the package-level debugLogf, prefixing the message
// with the remote address of the session's request.
51 func (sess *v0session) debugLogf(s string, args ...interface{}) {
// Prepend the remote address so it fills the leading "%s " added below.
52 args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
53 debugLogf("%s "+s, args...)
// Receive handles one incoming client message. buf is decoded as JSON into
// a subscription request (sub); msg is only used for debug logging in the
// visible code. The result is a list of encoded frames to send back: a
// "subscribe" request yields v0subscribeOK, any other method yields
// v0subscribeFail.
56 func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte {
57 sess.debugLogf("received message: %+v", msg)
// A message that does not decode is logged as ignored.
// NOTE(review): the value returned in that case is on a line not shown here.
59 if err := json.Unmarshal(buf, &sub); err != nil {
60 sess.debugLogf("ignored unrecognized request: %s", err)
63 if sub.Method == "subscribe" {
65 sess.debugLogf("subscription: %v", sub)
// NOTE(review): Filter reads sess.subscriptions while holding sess.mtx;
// confirm this append is guarded by the same mutex (surrounding lines not shown).
67 sess.subscriptions = append(sess.subscriptions, sub)
// resp starts with the OK status; replayed events are appended after it.
70 resp := [][]byte{v0subscribeOK}
// A positive last_log_id asks for older log entries to be fetched and replayed.
72 if sub.LastLogID > 0 {
73 // Hack 1: use the permission checker's
74 // Arvados client to retrieve old log IDs. Use
75 // "created < 2 hours ago" to ensure the
76 // database query doesn't get too expensive
77 // when our client gives us last_log_id==1.
// NOTE(review): unchecked type assertion; this panics if permChecker is
// ever something other than *cachingPermChecker.
78 ac := sess.permChecker.(*cachingPermChecker).Client
79 var old arvados.LogList
// NOTE(review): the result of RequestAndDecode is discarded here, so a
// failed request would leave old empty without any indication; confirm
// that is intended.
80 ac.RequestAndDecode(&old, "GET", "arvados/v1/logs", nil, url.Values{
82 "filters": {fmt.Sprintf(
83 `[["id",">",%d],["created_at",">","%s"]]`,
85 time.Now().UTC().Add(-2*time.Hour).Format(time.RFC3339Nano))},
// Turn each retrieved log entry into an outgoing message.
87 for _, log := range old.Items {
88 // Hack 2: populate the event's logRow
89 // using the API response -- otherwise
90 // Detail() would crash because e.db
// EventMessage applies the permission check and encodes the event.
97 msg, err := sess.EventMessage(e)
101 resp = append(resp, msg)
// NOTE(review): resp is built up above, but this return yields only
// v0subscribeOK. Unless resp is returned on a line not shown here, the
// replayed messages are dropped — confirm.
105 return [][]byte{v0subscribeOK}
// Any method other than "subscribe" is rejected.
107 return [][]byte{v0subscribeFail}
// EventMessage builds the JSON message sent to the client for event e.
// It checks the session's permission on the event's object UUID, then
// assembles a map with the object's identifiers, the event type and a
// "properties" entry, and returns it marshalled as JSON.
110 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
// detail is obtained on lines not shown here (presumably from e.Detail() — confirm).
116 ok, err := sess.permChecker.Check(detail.ObjectUUID)
// NOTE(review): the value returned when the check errors or denies access
// is on lines not shown here.
117 if err != nil || !ok {
// Fields of the outgoing message; further keys may be set on lines not shown.
121 msg := map[string]interface{}{
125 "object_uuid": detail.ObjectUUID,
126 "object_owner_uuid": detail.ObjectOwnerUUID,
127 "event_type": detail.EventType,
// Properties carrying a non-nil "text" entry are passed through unmodified.
129 if detail.Properties != nil && detail.Properties["text"] != nil {
130 msg["properties"] = detail.Properties
// Otherwise "properties" is rebuilt from the old/new attribute maps,
// looking up only the keys listed in sendObjectAttributes.
132 msgProps := map[string]map[string]interface{}{}
133 for _, ak := range []string{"old_attributes", "new_attributes"} {
// Handling of a missing or non-map attribute set is on lines not shown.
134 eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
138 msgAttrs := map[string]interface{}{}
139 for _, k := range sendObjectAttributes {
// Presumably copies v into msgAttrs[k]; the statement itself is not shown.
140 if v, ok := eventAttrs[k]; ok {
144 msgProps[ak] = msgAttrs
146 msg["properties"] = msgProps
148 return json.Marshal(msg)
// Filter reports a boolean for event e after walking the session's
// subscriptions. Presumably it is true when at least one subscription
// matches e; the loop body is not shown, so confirm.
151 func (sess *v0session) Filter(e *event) bool {
// sess.mtx is released on return; the matching Lock is on a line not shown.
153 defer sess.mtx.Unlock()
154 for _, sub := range sess.subscriptions {
// v0subscribe is a subscription request as decoded from client JSON by
// Receive. The Method and Filters fields read by Receive and prepare are
// declared on lines not shown in this excerpt.
162 type v0subscribe struct {
// LastLogID is decoded from "last_log_id"; a positive value makes Receive
// fetch log entries with a greater id.
165 LastLogID int64 `json:"last_log_id"`
// funcs holds the predicates built by prepare and evaluated by match.
// The field is unexported, so encoding/json does not populate it.
167 funcs []func(*event) bool
// v0filter is a three-element filter clause. prepare reads element 0 as a
// column name, element 1 as an operator and element 2 as the operand
// (assuming sub.Filters is a slice of v0filter — its declaration is not shown).
170 type v0filter [3]interface{}
// match evaluates the subscription's predicate funcs against event e,
// logging the index of a func that fails. The return statements are not
// shown; presumably a failing func yields false — confirm.
172 func (sub *v0subscribe) match(e *event) bool {
177 debugLogf("sub.match: len(funcs)==%d", len(sub.funcs))
178 for i, f := range sub.funcs {
180 debugLogf("sub.match: failed on func %d", i)
// prepare translates the subscription's Filters into predicate funcs
// appended to sub.funcs. Two columns are handled in the visible code:
// "event_type" with the "in" operator, and "created_at" with time
// comparisons. How other columns, operators or malformed operands are
// handled is on lines not shown here.
187 func (sub *v0subscribe) prepare() {
188 for _, f := range sub.Filters {
// f[0] is the column name, f[1] the operator, f[2] the operand.
192 if col, ok := f[0].(string); ok && col == "event_type" {
193 op, ok := f[1].(string)
// Only the "in" operator is accepted for event_type.
194 if !ok || op != "in" {
// The operand must be a JSON array.
197 arr, ok := f[2].([]interface{})
// Keep only the string elements of the array; others are skipped.
202 for _, s := range arr {
203 if s, ok := s.(string); ok {
204 strs = append(strs, s)
// Predicate: compares the event's type against each collected string.
207 sub.funcs = append(sub.funcs, func(e *event) bool {
208 debugLogf("event_type func: %v in %v", e.Detail().EventType, strs)
209 for _, s := range strs {
210 if s == e.Detail().EventType {
216 } else if ok && col == "created_at" {
217 op, ok := f[1].(string)
// The operand must be a string holding an RFC3339Nano timestamp.
221 tstr, ok := f[2].(string)
225 t, err := time.Parse(time.RFC3339Nano, tstr)
// A timestamp that fails to parse is logged.
227 debugLogf("time.Parse(%q): %s", tstr, err)
// One predicate per comparison follows. The code selecting among them
// (presumably a switch on op) is not shown — confirm.
// created_at >= t
232 sub.funcs = append(sub.funcs, func(e *event) bool {
233 debugLogf("created_at func: %v >= %v", e.Detail().CreatedAt, t)
234 return !e.Detail().CreatedAt.Before(t)
// created_at <= t
237 sub.funcs = append(sub.funcs, func(e *event) bool {
238 debugLogf("created_at func: %v <= %v", e.Detail().CreatedAt, t)
239 return !e.Detail().CreatedAt.After(t)
// created_at > t
242 sub.funcs = append(sub.funcs, func(e *event) bool {
243 debugLogf("created_at func: %v > %v", e.Detail().CreatedAt, t)
244 return e.Detail().CreatedAt.After(t)
// created_at < t
247 sub.funcs = append(sub.funcs, func(e *event) bool {
248 debugLogf("created_at func: %v < %v", e.Detail().CreatedAt, t)
249 return e.Detail().CreatedAt.Before(t)
// created_at == t (compared as instants with Time.Equal)
252 sub.funcs = append(sub.funcs, func(e *event) bool {
253 debugLogf("created_at func: %v = %v", e.Detail().CreatedAt, t)
254 return e.Detail().CreatedAt.Equal(t)