From 104b2b5a46844e94a37c332b4ddd5a861dd8d63d Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Mon, 14 Nov 2016 10:38:14 -0500 Subject: [PATCH] 8460: Check permissions. --- sdk/go/arvados/client.go | 77 +++++++++++++++++++++++++++++++++++-- services/ws/config.go | 4 +- services/ws/event.go | 21 ++++++---- services/ws/handler.go | 1 + services/ws/handler_v0.go | 58 ++++++++++++++++++++++------ services/ws/handler_v1.go | 3 ++ services/ws/main.go | 12 +++--- services/ws/pg.go | 37 ++++++++++-------- services/ws/proxy_client.go | 41 ++++++++++++++++++++ services/ws/router.go | 4 +- 10 files changed, 213 insertions(+), 45 deletions(-) create mode 100644 services/ws/proxy_client.go diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go index 36f4eb52ae..0c18d38974 100644 --- a/sdk/go/arvados/client.go +++ b/sdk/go/arvados/client.go @@ -41,6 +41,8 @@ type Client struct { // callers who use a Client to initialize an // arvadosclient.ArvadosClient.) KeepServiceURIs []string `json:",omitempty"` + + dd *DiscoveryDocument } // The default http.Client used by a Client with Insecure==true and @@ -198,14 +200,83 @@ func (c *Client) apiURL(path string) string { // DiscoveryDocument is the Arvados server's description of itself. type DiscoveryDocument struct { - DefaultCollectionReplication int `json:"defaultCollectionReplication"` - BlobSignatureTTL int64 `json:"blobSignatureTtl"` + BasePath string `json:"basePath"` + DefaultCollectionReplication int `json:"defaultCollectionReplication"` + BlobSignatureTTL int64 `json:"blobSignatureTtl"` + Schemas map[string]Schema `json:"schemas"` + Resources map[string]Resource `json:"resources"` +} + +type Resource struct { + Methods map[string]ResourceMethod `json:"methods"` +} + +type ResourceMethod struct { + HTTPMethod string `json:"httpMethod"` + Path string `json:"path"` + Response MethodResponse `json:"response"` +} + +type MethodResponse struct { + Ref string `json:"$ref"` +} + +type Schema struct { + UUIDPrefix string `json:"uuidPrefix"` } // DiscoveryDocument returns a *DiscoveryDocument. The returned object // should not be modified: the same object may be returned by // subsequent calls. func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) { + if c.dd != nil { + return c.dd, nil + } var dd DiscoveryDocument - return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil) + err := c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil) + if err != nil { + return nil, err + } + c.dd = &dd + return c.dd, nil +} + +func (c *Client) PathForUUID(method, uuid string) (string, error) { + if len(uuid) != 27 { + return "", fmt.Errorf("invalid UUID: %q", uuid) + } + dd, err := c.DiscoveryDocument() + if err != nil { + return "", err + } + infix := uuid[6:11] + var model string + for m, s := range dd.Schemas { + if s.UUIDPrefix == infix { + model = m + break + } + } + if model == "" { + return "", fmt.Errorf("unrecognized UUID infix: %q", infix) + } + var resource string + for r, rsc := range dd.Resources { + if rsc.Methods["get"].Response.Ref == model { + resource = r + break + } + } + if resource == "" { + return "", fmt.Errorf("no resource for model: %q", model) + } + m, ok := dd.Resources[resource].Methods[method] + if !ok { + return "", fmt.Errorf("no method %q for resource %q", method, resource) + } + path := dd.BasePath + strings.Replace(m.Path, "{uuid}", uuid, -1) + if path[0] == '/' { + path = path[1:] + } + return path, nil } diff --git a/services/ws/config.go b/services/ws/config.go index 3e3d91f292..9c2e80a172 100644 --- a/services/ws/config.go +++ b/services/ws/config.go @@ -23,12 +23,12 @@ func DefaultConfig() Config { APIHost: "localhost:443", }, Postgres: pgConfig{ - "dbname": "arvados_test", + "dbname": "arvados_production", "user": "arvados", "password": "xyzzy", "host": "localhost", "connect_timeout": "30", - "sslmode": "disable", + "sslmode": "require", }, PingTimeout: arvados.Duration(time.Minute), ClientEventQueue: 64, diff --git a/services/ws/event.go b/services/ws/event.go index b6dda4968b..e34b6b4b58 100644 --- a/services/ws/event.go +++ b/services/ws/event.go @@ -7,6 +7,7 @@ import ( "time" "git.curoverse.com/arvados.git/sdk/go/arvados" + "github.com/ghodss/yaml" ) type eventSink interface { @@ -15,11 +16,11 @@ type eventSink interface { } type eventSource interface { - NewSink(chan *event) eventSink + NewSink() eventSink } type event struct { - LogUUID string + LogID uint64 Received time.Time Serial uint64 @@ -39,18 +40,24 @@ func (e *event) Detail() *arvados.Log { return e.logRow } var logRow arvados.Log - var oldAttrs, newAttrs []byte - e.err = e.db.QueryRow(`SELECT id, uuid, object_uuid, object_owner_uuid, event_type, created_at, old_attributes, new_attributes FROM logs WHERE uuid = ?`, e.LogUUID).Scan( + var propYAML []byte + e.err = e.db.QueryRow(`SELECT id, uuid, object_uuid, object_owner_uuid, event_type, created_at, properties FROM logs WHERE id = $1`, e.LogID).Scan( &logRow.ID, &logRow.UUID, &logRow.ObjectUUID, &logRow.ObjectOwnerUUID, &logRow.EventType, &logRow.CreatedAt, - &oldAttrs, - &newAttrs) + &propYAML) if e.err != nil { - log.Printf("retrieving log row %s: %s", e.LogUUID, e.err) + log.Printf("retrieving log row %d: %s", e.LogID, e.err) + return nil } + e.err = yaml.Unmarshal(propYAML, &logRow.Properties) + if e.err != nil { + log.Printf("decoding yaml for log row %d: %s", e.LogID, e.err) + return nil + } + e.logRow = &logRow return e.logRow } diff --git a/services/ws/handler.go b/services/ws/handler.go index fe47a62ccf..ba8f945dfc 100644 --- a/services/ws/handler.go +++ b/services/ws/handler.go @@ -14,6 +14,7 @@ type wsConn interface { io.ReadWriter Request() *http.Request SetReadDeadline(time.Time) error + SetWriteDeadline(time.Time) error } type timeouter interface { diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go index c728d121f3..eb076b5bb7 100644 --- a/services/ws/handler_v0.go +++ b/services/ws/handler_v0.go @@ -7,6 +7,8 @@ import ( "log" "sync" "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" ) var ( @@ -15,6 +17,7 @@ var ( ) type handlerV0 struct { + Client arvados.Client PingTimeout time.Duration QueueSize int } @@ -29,6 +32,18 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) { mtx := sync.Mutex{} subscribed := make(map[string]bool) + proxyClient := NewProxyClient(h.Client) + { + err := ws.Request().ParseForm() + if err != nil { + log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err) + return + } + token := ws.Request().Form.Get("api_token") + h.debugLogf(ws, "handlerV0: token = %+q", token) + proxyClient.SetToken(token) + } + stopped := make(chan struct{}) stop := make(chan error, 5) @@ -40,21 +55,13 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) { return default: } - ws.SetReadDeadline(time.Now().Add(h.PingTimeout)) + ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour)) n, err := ws.Read(buf) h.debugLogf(ws, "received frame: %q", buf[:n]) if err == nil && n == len(buf) { err = errFrameTooBig } if err, ok := err.(timeouter); ok && err.Timeout() { - // If the outgoing queue is empty, - // send an empty message. This can - // help detect a disconnected network - // socket, and prevent an idle socket - // from being closed. - if len(queue) == 0 { - queue <- nil - } continue } if err != nil { @@ -80,6 +87,7 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) { go func() { for e := range queue { if e == nil { + ws.SetWriteDeadline(time.Now().Add(h.PingTimeout)) _, err := ws.Write([]byte("{}\n")) if err != nil { h.debugLogf(ws, "handlerV0: write: %s", err) @@ -92,7 +100,18 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) { if detail == nil { continue } - // FIXME: check permission + + ok, err := proxyClient.CheckReadPermission(detail.UUID) + if err != nil { + log.Printf("CheckReadPermission: %s", err) + stop <- err + break + } + if !ok { + h.debugLogf(ws, "handlerV0: skip event %d", e.Serial) + continue + } + buf, err := json.Marshal(map[string]interface{}{ "msgID": e.Serial, "id": detail.ID, @@ -105,14 +124,18 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) { log.Printf("error encoding: ", err) continue } + h.debugLogf(ws, "handlerV0: send event %d: %q", e.Serial, buf) + ws.SetWriteDeadline(time.Now().Add(h.PingTimeout)) _, err = ws.Write(append(buf, byte('\n'))) if err != nil { h.debugLogf(ws, "handlerV0: write: %s", err) stop <- err break } + h.debugLogf(ws, "handlerV0: sent event %d", e.Serial) + } + for _ = range queue { } - for _ = range queue {} }() // Filter incoming events against the current subscription @@ -129,6 +152,9 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) { } } + ticker := time.NewTicker(h.PingTimeout) + defer ticker.Stop() + for { var e *event var ok bool @@ -136,6 +162,16 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) { case <-stopped: close(queue) return + case <-ticker.C: + // If the outgoing queue is empty, + // send an empty message. This can + // help detect a disconnected network + // socket, and prevent an idle socket + // from being closed. + if len(queue) == 0 { + queue <- nil + } + continue case e, ok = <-events: if !ok { close(queue) diff --git a/services/ws/handler_v1.go b/services/ws/handler_v1.go index 4160d8696d..1b8549e26f 100644 --- a/services/ws/handler_v1.go +++ b/services/ws/handler_v1.go @@ -2,9 +2,12 @@ package main import ( "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" ) type handlerV1 struct { + Client arvados.Client PingTimeout time.Duration QueueSize int } diff --git a/services/ws/main.go b/services/ws/main.go index 0f978231b9..28662440d0 100644 --- a/services/ws/main.go +++ b/services/ws/main.go @@ -35,18 +35,20 @@ func main() { return } + eventSource := &pgEventSource{ + PgConfig: cfg.Postgres, + QueueSize: cfg.ServerEventQueue, + } srv := &http.Server{ Addr: cfg.Listen, ReadTimeout: time.Minute, WriteTimeout: time.Minute, MaxHeaderBytes: 1 << 20, Handler: &router{ - Config: &cfg, - eventSource: &pgEventSource{ - PgConfig: cfg.Postgres, - QueueSize: cfg.ServerEventQueue, - }, + Config: &cfg, + eventSource: eventSource, }, } + eventSource.NewSink().Stop() log.Fatal(srv.ListenAndServe()) } diff --git a/services/ws/pg.go b/services/ws/pg.go index 51bc92ca67..5e8e63e01f 100644 --- a/services/ws/pg.go +++ b/services/ws/pg.go @@ -3,6 +3,7 @@ package main import ( "database/sql" "log" + "strconv" "strings" "sync" "time" @@ -52,15 +53,17 @@ func (ps *pgEventSource) run() { // on missed events, we cannot recover from a // dropped connection without breaking our // promises to clients. - log.Fatal(err) + log.Fatalf("pgEventSource listener problem: %s", err) } }) err = listener.Listen("logs") if err != nil { log.Fatal(err) } + debugLogf("pgEventSource listening") go func() { for _ = range time.NewTicker(time.Minute).C { + debugLogf("pgEventSource listener ping") listener.Ping() } }() @@ -74,7 +77,7 @@ func (ps *pgEventSource) run() { // concurrent queries would be bounded by // client_count X client_queue_size. e.Detail() - debugLogf("%+v", e) + debugLogf("event %d detail %+v", e.Serial, e.Detail()) ps.mtx.Lock() for sink := range ps.sinks { sink.channel <- e @@ -88,33 +91,35 @@ func (ps *pgEventSource) run() { if pqEvent.Channel != "logs" { continue } + logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64) + if err != nil { + log.Printf("bad notify payload: %+v", pqEvent) + continue + } serial++ e := &event{ - LogUUID: pqEvent.Extra, + LogID: logID, Received: time.Now(), Serial: serial, db: db, } - debugLogf("%+v", e) + debugLogf("event %d %+v", e.Serial, e) eventQueue <- e go e.Detail() } } -// NewSink subscribes to the event source. If c is not nil, it will be -// used as the event channel. Otherwise, a new channel will be -// created. Either way, the sink channel will be returned by the -// Channel() method of the returned eventSink. All subsequent events -// will be sent to the sink channel. The caller must ensure events are -// received from the sink channel as quickly as possible: when one -// sink blocks, all other sinks also block. -func (ps *pgEventSource) NewSink(c chan *event) eventSink { +// NewSink subscribes to the event source. NewSink returns an +// eventSink, whose Channel() method returns a channel: a pointer to +// each subsequent event will be sent to that channel. +// +// The caller must ensure events are received from the sink channel as +// quickly as possible because when one sink stops being ready, all +// other sinks block. +func (ps *pgEventSource) NewSink() eventSink { ps.setupOnce.Do(ps.setup) - if c == nil { - c = make(chan *event, 1) - } sink := &pgEventSink{ - channel: c, + channel: make(chan *event, 1), source: ps, } ps.mtx.Lock() diff --git a/services/ws/proxy_client.go b/services/ws/proxy_client.go new file mode 100644 index 0000000000..28be2e2554 --- /dev/null +++ b/services/ws/proxy_client.go @@ -0,0 +1,41 @@ +package main + +import ( + "net/http" + "net/url" + + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +type proxyClient struct { + *arvados.Client +} + +func NewProxyClient(ac arvados.Client) *proxyClient { + ac.AuthToken = "" + return &proxyClient{ + Client: &ac, + } +} + +func (pc *proxyClient) SetToken(token string) { + pc.Client.AuthToken = token +} + +func (pc *proxyClient) CheckReadPermission(uuid string) (bool, error) { + var buf map[string]interface{} + path, err := pc.PathForUUID("get", uuid) + if err != nil { + return false, err + } + err = pc.RequestAndDecode(&buf, "GET", path, nil, url.Values{ + "select": {`["uuid"]`}, + }) + if err, ok := err.(arvados.TransactionError); ok && err.StatusCode == http.StatusNotFound { + return false, nil + } + if err != nil { + return false, err + } + return true, nil +} diff --git a/services/ws/router.go b/services/ws/router.go index 685b6132ab..30f93eab3a 100644 --- a/services/ws/router.go +++ b/services/ws/router.go @@ -21,10 +21,12 @@ type router struct { func (rtr *router) setup() { rtr.mux = http.NewServeMux() rtr.mux.Handle("/websocket", rtr.makeServer(&handlerV0{ + Client: rtr.Config.Client, PingTimeout: rtr.Config.PingTimeout.Duration(), QueueSize: rtr.Config.ClientEventQueue, })) rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(&handlerV1{ + Client: rtr.Config.Client, PingTimeout: rtr.Config.PingTimeout.Duration(), QueueSize: rtr.Config.ClientEventQueue, })) @@ -37,7 +39,7 @@ func (rtr *router) makeServer(handler handler) *websocket.Server { }, Handler: websocket.Handler(func(ws *websocket.Conn) { log.Printf("%v accepted", ws.Request().RemoteAddr) - sink := rtr.eventSource.NewSink(nil) + sink := rtr.eventSource.NewSink() handler.Handle(ws, sink.Channel()) sink.Stop() ws.Close() -- 2.39.5