10 "git.curoverse.com/arvados.git/sdk/go/arvados"
14 errQueueFull = errors.New("client queue full")
15 errFrameTooBig = errors.New("frame too big")
17 sendObjectAttributes = []string{"state", "name"}
19 v0subscribeOK = []byte(`{"status":200}`)
20 v0subscribeFail = []byte(`{"status":400}`)
23 type v0session struct {
25 permChecker permChecker
26 subscriptions []v0subscribe
31 func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
34 permChecker: NewPermChecker(ac),
37 err := ws.Request().ParseForm()
39 log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
42 token := ws.Request().Form.Get("api_token")
43 sess.permChecker.SetToken(token)
44 sess.debugLogf("token = %+q", token)
49 func (sess *v0session) debugLogf(s string, args ...interface{}) {
50 args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
51 debugLogf("%s "+s, args...)
54 func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte {
55 sess.debugLogf("received message: %+v", msg)
57 if err := json.Unmarshal(buf, &sub); err != nil {
58 sess.debugLogf("ignored unrecognized request: %s", err)
61 if sub.Method == "subscribe" {
63 sess.debugLogf("subscription: %v", sub)
65 sess.subscriptions = append(sess.subscriptions, sub)
67 return [][]byte{v0subscribeOK}
69 return [][]byte{v0subscribeFail}
72 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
78 ok, err := sess.permChecker.Check(detail.ObjectUUID)
79 if err != nil || !ok {
83 msg := map[string]interface{}{
87 "object_uuid": detail.ObjectUUID,
88 "object_owner_uuid": detail.ObjectOwnerUUID,
89 "event_type": detail.EventType,
91 if detail.Properties != nil && detail.Properties["text"] != nil {
92 msg["properties"] = detail.Properties
94 msgProps := map[string]map[string]interface{}{}
95 for _, ak := range []string{"old_attributes", "new_attributes"} {
96 eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
100 msgAttrs := map[string]interface{}{}
101 for _, k := range sendObjectAttributes {
102 if v, ok := eventAttrs[k]; ok {
106 msgProps[ak] = msgAttrs
108 msg["properties"] = msgProps
110 return json.Marshal(msg)
113 func (sess *v0session) Filter(e *event) bool {
115 defer sess.mtx.Unlock()
116 for _, sub := range sess.subscriptions {
124 type v0subscribe struct {
127 funcs []func(*event) bool
130 type v0filter [3]interface{}
132 func (sub *v0subscribe) match(e *event) bool {
137 debugLogf("sub.match: len(funcs)==%d", len(sub.funcs))
138 for i, f := range sub.funcs {
140 debugLogf("sub.match: failed on func %d", i)
147 func (sub *v0subscribe) prepare() {
148 for _, f := range sub.Filters {
152 if col, ok := f[0].(string); ok && col == "event_type" {
153 op, ok := f[1].(string)
154 if !ok || op != "in" {
157 arr, ok := f[2].([]interface{})
162 for _, s := range arr {
163 if s, ok := s.(string); ok {
164 strs = append(strs, s)
167 sub.funcs = append(sub.funcs, func(e *event) bool {
168 debugLogf("event_type func: %v in %v", e.Detail().EventType, strs)
169 for _, s := range strs {
170 if s == e.Detail().EventType {
176 } else if ok && col == "created_at" {
177 op, ok := f[1].(string)
181 tstr, ok := f[2].(string)
185 t, err := time.Parse(time.RFC3339Nano, tstr)
187 debugLogf("time.Parse(%q): %s", tstr, err)
192 sub.funcs = append(sub.funcs, func(e *event) bool {
193 debugLogf("created_at func: %v >= %v", e.Detail().CreatedAt, t)
194 return !e.Detail().CreatedAt.Before(t)
197 sub.funcs = append(sub.funcs, func(e *event) bool {
198 debugLogf("created_at func: %v <= %v", e.Detail().CreatedAt, t)
199 return !e.Detail().CreatedAt.After(t)
202 sub.funcs = append(sub.funcs, func(e *event) bool {
203 debugLogf("created_at func: %v > %v", e.Detail().CreatedAt, t)
204 return e.Detail().CreatedAt.After(t)
207 sub.funcs = append(sub.funcs, func(e *event) bool {
208 debugLogf("created_at func: %v < %v", e.Detail().CreatedAt, t)
209 return e.Detail().CreatedAt.Before(t)
212 sub.funcs = append(sub.funcs, func(e *event) bool {
213 debugLogf("created_at func: %v = %v", e.Detail().CreatedAt, t)
214 return e.Detail().CreatedAt.Equal(t)