X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/83f085a0c405695d9840a0542eb9746e7b5d3f6f..5ffb79040668114c58bf35c3e18a8302b8d94445:/services/ws/session_v0.go diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go index 15efc1d080..2035acbd5d 100644 --- a/services/ws/session_v0.go +++ b/services/ws/session_v0.go @@ -5,6 +5,7 @@ import ( "errors" "log" "sync" + "time" "git.curoverse.com/arvados.git/sdk/go/arvados" ) @@ -12,21 +13,22 @@ import ( var ( errQueueFull = errors.New("client queue full") errFrameTooBig = errors.New("frame too big") + + sendObjectAttributes = []string{"state", "name"} ) -type sessionV0 struct { - ws wsConn - proxyClient *proxyClient - subscribed map[string]bool - mtx sync.Mutex - setupOnce sync.Once +type v0session struct { + ws wsConn + permChecker permChecker + subscriptions []v0subscribe + mtx sync.Mutex + setupOnce sync.Once } func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) { - sess := &sessionV0{ + sess := &v0session{ ws: ws, - proxyClient: NewProxyClient(ac), - subscribed: make(map[string]bool), + permChecker: NewPermChecker(ac), } err := ws.Request().ParseForm() @@ -35,57 +37,180 @@ func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) { return nil, err } token := ws.Request().Form.Get("api_token") - sess.proxyClient.SetToken(token) - sess.debugLogf("handlerV0: token = %+q", token) + sess.permChecker.SetToken(token) + sess.debugLogf("token = %+q", token) return sess, nil } -func (sess *sessionV0) debugLogf(s string, args ...interface{}) { +func (sess *v0session) debugLogf(s string, args ...interface{}) { args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...) debugLogf("%s "+s, args...) } -func (sess *sessionV0) Receive(msg map[string]interface{}) { +func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) []byte { sess.debugLogf("received message: %+v", msg) - sess.debugLogf("subscribing to *") - sess.subscribed["*"] = true + var sub v0subscribe + if err := json.Unmarshal(buf, &sub); err != nil { + sess.debugLogf("ignored unrecognized request: %s", err) + return nil + } + if sub.Method == "subscribe" { + sub.prepare() + sess.debugLogf("subscription: %v", sub) + sess.mtx.Lock() + sess.subscriptions = append(sess.subscriptions, sub) + sess.mtx.Unlock() + return []byte(`{"status":200}`) + } + return []byte(`{"status":400}`) } -func (sess *sessionV0) EventMessage(e *event) ([]byte, error) { +func (sess *v0session) EventMessage(e *event) ([]byte, error) { detail := e.Detail() if detail == nil { return nil, nil } - ok, err := sess.proxyClient.CheckReadPermission(detail.UUID) + + ok, err := sess.permChecker.Check(detail.ObjectUUID) if err != nil || !ok { return nil, err } - return json.Marshal(map[string]interface{}{ + msg := map[string]interface{}{ "msgID": e.Serial, "id": detail.ID, "uuid": detail.UUID, "object_uuid": detail.ObjectUUID, "object_owner_uuid": detail.ObjectOwnerUUID, "event_type": detail.EventType, - }) + } + if detail.Properties != nil && detail.Properties["text"] != nil { + msg["properties"] = detail.Properties + } else { + msgProps := map[string]map[string]interface{}{} + for _, ak := range []string{"old_attributes", "new_attributes"} { + eventAttrs, ok := detail.Properties[ak].(map[string]interface{}) + if !ok { + continue + } + msgAttrs := map[string]interface{}{} + for _, k := range sendObjectAttributes { + if v, ok := eventAttrs[k]; ok { + msgAttrs[k] = v + } + } + msgProps[ak] = msgAttrs + } + msg["properties"] = msgProps + } + return json.Marshal(msg) } -func (sess *sessionV0) Filter(e *event) bool { - detail := e.Detail() +func (sess *v0session) Filter(e *event) bool { sess.mtx.Lock() defer sess.mtx.Unlock() - switch { - case sess.subscribed["*"]: - return true - case detail == nil: - return false - case sess.subscribed[detail.ObjectUUID]: - return true - case sess.subscribed[detail.ObjectOwnerUUID]: - return true - default: + for _, sub := range sess.subscriptions { + if sub.match(e) { + return true + } + } + return false +} + +type v0subscribe struct { + Method string + Filters []v0filter + funcs []func(*event) bool +} + +type v0filter [3]interface{} + +func (sub *v0subscribe) match(e *event) bool { + detail := e.Detail() + if detail == nil { return false } + debugLogf("sub.match: len(funcs)==%d", len(sub.funcs)) + for i, f := range sub.funcs { + if !f(e) { + debugLogf("sub.match: failed on func %d", i) + return false + } + } + return true +} + +func (sub *v0subscribe) prepare() { + for _, f := range sub.Filters { + if len(f) != 3 { + continue + } + if col, ok := f[0].(string); ok && col == "event_type" { + op, ok := f[1].(string) + if !ok || op != "in" { + continue + } + arr, ok := f[2].([]interface{}) + if !ok { + continue + } + var strs []string + for _, s := range arr { + if s, ok := s.(string); ok { + strs = append(strs, s) + } + } + sub.funcs = append(sub.funcs, func(e *event) bool { + debugLogf("event_type func: %v in %v", e.Detail().EventType, strs) + for _, s := range strs { + if s == e.Detail().EventType { + return true + } + } + return false + }) + } else if ok && col == "created_at" { + op, ok := f[1].(string) + if !ok { + continue + } + tstr, ok := f[2].(string) + if !ok { + continue + } + t, err := time.Parse(time.RFC3339Nano, tstr) + if err != nil { + debugLogf("time.Parse(%q): %s", tstr, err) + continue + } + switch op { + case ">=": + sub.funcs = append(sub.funcs, func(e *event) bool { + debugLogf("created_at func: %v >= %v", e.Detail().CreatedAt, t) + return !e.Detail().CreatedAt.Before(t) + }) + case "<=": + sub.funcs = append(sub.funcs, func(e *event) bool { + debugLogf("created_at func: %v <= %v", e.Detail().CreatedAt, t) + return !e.Detail().CreatedAt.After(t) + }) + case ">": + sub.funcs = append(sub.funcs, func(e *event) bool { + debugLogf("created_at func: %v > %v", e.Detail().CreatedAt, t) + return e.Detail().CreatedAt.After(t) + }) + case "<": + sub.funcs = append(sub.funcs, func(e *event) bool { + debugLogf("created_at func: %v < %v", e.Detail().CreatedAt, t) + return e.Detail().CreatedAt.Before(t) + }) + case "=": + sub.funcs = append(sub.funcs, func(e *event) bool { + debugLogf("created_at func: %v = %v", e.Detail().CreatedAt, t) + return e.Detail().CreatedAt.Equal(t) + }) + } + } + } }