10 "git.curoverse.com/arvados.git/sdk/go/arvados"
// Sentinel error values. The code that returns them is not part of
// this excerpt.
14 errQueueFull = errors.New("client queue full")
15 errFrameTooBig = errors.New("frame too big")
// sendObjectAttributes lists the attribute keys that EventMessage looks
// up in an event's "old_attributes" and "new_attributes" properties.
17 sendObjectAttributes = []string{"state", "name"}
// v0session holds the state of one session built by NewSessionV0.
// NOTE(review): methods also reference sess.ws and sess.mtx; those
// field declarations are not visible in this excerpt.
20 type v0session struct {
// permChecker receives the client's API token in NewSessionV0 and is
// consulted by EventMessage with each event's object UUID.
22 permChecker permChecker
// subscriptions accumulates the "subscribe" requests appended by Receive.
23 subscriptions []v0subscribe
// NewSessionV0 creates a v0 session for the connection ws, using ac to
// construct the session's permission checker.
//
// The API token is taken from the "api_token" form value of the
// connection's request and handed to the permission checker.
28 func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
31 permChecker: NewPermChecker(ac),
// Form is read below, so the form has to be parsed first (assumes
// Request() returns an *http.Request -- TODO confirm).
34 err := ws.Request().ParseForm()
// Parse failures are logged together with the client's remote address.
36 log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
39 token := ws.Request().Form.Get("api_token")
40 sess.permChecker.SetToken(token)
// NOTE(review): this passes the API token itself to the debug log;
// confirm that is acceptable or redact it.
41 sess.debugLogf("token = %+q", token)
// debugLogf forwards to the package-level debugLogf, prefixing the
// message with the remote address of the session's request.
//
// s is a format string; args are its arguments.
46 func (sess *v0session) debugLogf(s string, args ...interface{}) {
// Put RemoteAddr first so it lines up with the "%s " added to the format.
47 args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
48 debugLogf("%s "+s, args...)
// Receive handles one incoming client message and returns the JSON
// reply to send back.
//
// msg is the decoded message (used here only for logging); buf is the
// raw message, which is decoded again into sub.
//
// A message whose Method is "subscribe" is appended to
// sess.subscriptions and answered with {"status":200}. The other
// visible reply is {"status":400}.
// NOTE(review): the declaration of sub, the return taken after a
// decode failure, and any locking around the append are on lines not
// shown in this excerpt.
51 func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) []byte {
52 sess.debugLogf("received message: %+v", msg)
54 if err := json.Unmarshal(buf, &sub); err != nil {
55 sess.debugLogf("ignored unrecognized request: %s", err)
58 if sub.Method == "subscribe" {
60 sess.debugLogf("subscription: %v", sub)
62 sess.subscriptions = append(sess.subscriptions, sub)
64 return []byte(`{"status":200}`)
66 return []byte(`{"status":400}`)
// EventMessage builds the JSON message describing event e.
//
// The session's permission checker is consulted with the event's
// object UUID first; a check error or a negative answer takes an early
// branch whose body is not visible in this excerpt.
//
// The message carries object_uuid, object_owner_uuid and event_type
// (other keys may be set on lines not shown) plus a "properties" value
// chosen as described below.
69 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
75 ok, err := sess.permChecker.Check(detail.ObjectUUID)
76 if err != nil || !ok {
80 msg := map[string]interface{}{
84 "object_uuid": detail.ObjectUUID,
85 "object_owner_uuid": detail.ObjectOwnerUUID,
86 "event_type": detail.EventType,
// Events whose properties contain a non-nil "text" entry pass their
// properties through unchanged.
88 if detail.Properties != nil && detail.Properties["text"] != nil {
89 msg["properties"] = detail.Properties
// Presumably the else branch (the line between is not shown): build a
// reduced properties map from old_attributes/new_attributes, visiting
// only the keys named in sendObjectAttributes.
91 msgProps := map[string]map[string]interface{}{}
92 for _, ak := range []string{"old_attributes", "new_attributes"} {
93 eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
97 msgAttrs := map[string]interface{}{}
98 for _, k := range sendObjectAttributes {
99 if v, ok := eventAttrs[k]; ok {
103 msgProps[ak] = msgAttrs
105 msg["properties"] = msgProps
107 return json.Marshal(msg)
// Filter reports a boolean decision about event e after walking the
// session's subscriptions.
// NOTE(review): the per-subscription test and the returned values are
// on lines not shown; presumably true means some subscription matches
// e -- confirm against the full file.
110 func (sess *v0session) Filter(e *event) bool {
// Release sess.mtx on return; the matching Lock call is not visible here.
112 defer sess.mtx.Unlock()
113 for _, sub := range sess.subscriptions {
// v0subscribe is the request decoded from a client message in Receive.
// NOTE(review): other code reads sub.Method and sub.Filters; those
// field declarations are not visible in this excerpt.
121 type v0subscribe struct {
// funcs holds the predicates appended by prepare and iterated by match.
124 funcs []func(*event) bool
// v0filter is a three-element filter.
// NOTE(review): prepare reads its filters as [column, operator,
// operand]; presumably those filters are v0filter values -- confirm
// against the Filters field declaration.
127 type v0filter [3]interface{}
// match evaluates the subscription's predicate funcs against event e.
// NOTE(review): the predicate call and the return statements are on
// lines not shown; the "failed on func" log message suggests the first
// failing predicate ends the match -- confirm against the full file.
129 func (sub *v0subscribe) match(e *event) bool {
134 debugLogf("sub.match: len(funcs)==%d", len(sub.funcs))
135 for i, f := range sub.funcs {
137 debugLogf("sub.match: failed on func %d", i)
// prepare translates sub.Filters into predicate funcs appended to
// sub.funcs.
//
// Two filter columns are handled in the visible code:
//   - "event_type" with operator "in" and an array operand: the
//     predicate compares the event's EventType with the operand's
//     string elements.
//   - "created_at" with a timestamp string operand: the predicate
//     compares the event's CreatedAt with the parsed time; the log
//     strings show the comparisons >=, <=, >, < and =.
//
// NOTE(review): the operator dispatch for created_at, the handling of
// rejected filters, and the predicates' return statements for
// event_type are on lines not shown in this excerpt.
144 func (sub *v0subscribe) prepare() {
145 for _, f := range sub.Filters {
// f[0] is the column name, f[1] the operator, f[2] the operand.
149 if col, ok := f[0].(string); ok && col == "event_type" {
150 op, ok := f[1].(string)
// Only the "in" operator passes this guard.
151 if !ok || op != "in" {
154 arr, ok := f[2].([]interface{})
// Collect the operand's string elements; other elements are not appended.
159 for _, s := range arr {
160 if s, ok := s.(string); ok {
161 strs = append(strs, s)
164 sub.funcs = append(sub.funcs, func(e *event) bool {
165 debugLogf("event_type func: %v in %v", e.Detail().EventType, strs)
166 for _, s := range strs {
167 if s == e.Detail().EventType {
173 } else if ok && col == "created_at" {
174 op, ok := f[1].(string)
178 tstr, ok := f[2].(string)
// The operand is parsed with the time.RFC3339Nano layout; a parse
// failure is logged.
182 t, err := time.Parse(time.RFC3339Nano, tstr)
184 debugLogf("time.Parse(%q): %s", tstr, err)
// >= : CreatedAt is not before t.
189 sub.funcs = append(sub.funcs, func(e *event) bool {
190 debugLogf("created_at func: %v >= %v", e.Detail().CreatedAt, t)
191 return !e.Detail().CreatedAt.Before(t)
// <= : CreatedAt is not after t.
194 sub.funcs = append(sub.funcs, func(e *event) bool {
195 debugLogf("created_at func: %v <= %v", e.Detail().CreatedAt, t)
196 return !e.Detail().CreatedAt.After(t)
// > : CreatedAt is strictly after t.
199 sub.funcs = append(sub.funcs, func(e *event) bool {
200 debugLogf("created_at func: %v > %v", e.Detail().CreatedAt, t)
201 return e.Detail().CreatedAt.After(t)
// < : CreatedAt is strictly before t.
204 sub.funcs = append(sub.funcs, func(e *event) bool {
205 debugLogf("created_at func: %v < %v", e.Detail().CreatedAt, t)
206 return e.Detail().CreatedAt.Before(t)
// = : CreatedAt is the same instant as t.
209 sub.funcs = append(sub.funcs, func(e *event) bool {
210 debugLogf("created_at func: %v = %v", e.Detail().CreatedAt, t)
211 return e.Detail().CreatedAt.Equal(t)