From 7cb536fa58d8cc837b4cb59680c7355a1687648b Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Sun, 13 Nov 2016 19:43:46 -0500 Subject: [PATCH] 8460: Receive events and broadcast to clients. --- services/ws/config.go | 1 + services/ws/event.go | 50 ++++++++++++++++ services/ws/handler.go | 9 +++ services/ws/handler_v0.go | 6 +- services/ws/handler_v1.go | 6 +- services/ws/main.go | 10 +++- services/ws/pg.go | 117 +++++++++++++++++++++++++++++++++++--- services/ws/router.go | 21 ++++--- 8 files changed, 200 insertions(+), 20 deletions(-) create mode 100644 services/ws/handler.go diff --git a/services/ws/config.go b/services/ws/config.go index 60731f9b23..9a2bb3c979 100644 --- a/services/ws/config.go +++ b/services/ws/config.go @@ -8,6 +8,7 @@ type Config struct { Client arvados.Client Postgres pgConfig Listen string + Debug bool ClientEventQueue int ServerEventQueue int diff --git a/services/ws/event.go b/services/ws/event.go index 26cdb3b534..1634a7a18a 100644 --- a/services/ws/event.go +++ b/services/ws/event.go @@ -1,4 +1,54 @@ package main +import ( + "database/sql" + "log" + "sync" + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +type eventSink interface { + Channel() <-chan *event + Stop() +} + +type eventSource interface { + NewSink(chan *event) eventSink +} + type event struct { + LogUUID string + Received time.Time + Serial uint64 + + logRow *arvados.Log + err error + mtx sync.Mutex +} + +// Detail returns the database row corresponding to the event. It can +// be called safely from multiple goroutines. Only one attempt will be +// made. If the database row cannot be retrieved, Detail returns nil. +func (e *event) Detail(db *sql.DB) *arvados.Log { + e.mtx.Lock() + defer e.mtx.Unlock() + if e.logRow != nil || e.err != nil { + return e.logRow + } + var logRow arvados.Log + var oldAttrs, newAttrs []byte + e.err = db.QueryRow(`SELECT uuid, object_uuid, object_owner_uuid, event_type, created_at, old_attributes, new_attributes FROM logs WHERE uuid = ?`, e.LogUUID).Scan( + &logRow.UUID, + &logRow.ObjectUUID, + &logRow.ObjectOwnerUUID, + &logRow.EventType, + &logRow.CreatedAt, + &oldAttrs, + &newAttrs) + if e.err != nil { + log.Printf("retrieving log row %s: %s", e.LogUUID, e.err) + } + return e.logRow } diff --git a/services/ws/handler.go b/services/ws/handler.go new file mode 100644 index 0000000000..c7681431f5 --- /dev/null +++ b/services/ws/handler.go @@ -0,0 +1,9 @@ +package main + +import ( + "io" +) + +type handler interface { + Handle(io.ReadWriter, <-chan *event) +} diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go index b53f950461..40ab75b6ef 100644 --- a/services/ws/handler_v0.go +++ b/services/ws/handler_v0.go @@ -4,5 +4,9 @@ import ( "io" ) -func handlerV0(ws io.ReadWriter) { +type handlerV0 struct { + QueueSize int +} + +func (h *handlerV0) Handle(ws io.ReadWriter, events <-chan *event) { } diff --git a/services/ws/handler_v1.go b/services/ws/handler_v1.go index fcf85dd22a..4b3f12b908 100644 --- a/services/ws/handler_v1.go +++ b/services/ws/handler_v1.go @@ -4,5 +4,9 @@ import ( "io" ) -func handlerV1(ws io.ReadWriter) { +type handlerV1 struct { + QueueSize int +} + +func (h *handlerV1) Handle(ws io.ReadWriter, events <-chan *event) { } diff --git a/services/ws/main.go b/services/ws/main.go index 9a24b312b0..0f978231b9 100644 --- a/services/ws/main.go +++ b/services/ws/main.go @@ -10,6 +10,8 @@ import ( "git.curoverse.com/arvados.git/sdk/go/config" ) +var debugLogf = func(string, ...interface{}) {} + func main() { configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file") dumpConfig := flag.Bool("dump-config", false, "show current configuration and exit") @@ -20,6 +22,9 @@ func main() { if err != nil { log.Fatal(err) } + if cfg.Debug { + debugLogf = log.Printf + } if *dumpConfig { txt, err := config.Dump(&cfg) @@ -36,10 +41,11 @@ func main() { WriteTimeout: time.Minute, MaxHeaderBytes: 1 << 20, Handler: &router{ - EventSource: (&pgEventSource{ + Config: &cfg, + eventSource: &pgEventSource{ PgConfig: cfg.Postgres, QueueSize: cfg.ServerEventQueue, - }).EventSource(), + }, }, } log.Fatal(srv.ListenAndServe()) diff --git a/services/ws/pg.go b/services/ws/pg.go index 01b5bff877..6bce668fca 100644 --- a/services/ws/pg.go +++ b/services/ws/pg.go @@ -5,8 +5,9 @@ import ( "log" "strings" "sync" + "time" - _ "github.com/lib/pq" + "github.com/lib/pq" ) type pgConfig map[string]string @@ -28,19 +29,117 @@ type pgEventSource struct { PgConfig pgConfig QueueSize int - db *sql.DB - setupOnce sync.Once + pqListener *pq.Listener + sinks map[*pgEventSink]bool + setupOnce sync.Once + mtx sync.Mutex } -func (es *pgEventSource) setup() { - db, err := sql.Open("postgres", es.PgConfig.ConnectionString()) +func (ps *pgEventSource) setup() { + ps.sinks = make(map[*pgEventSink]bool) + go ps.run() +} + +func (ps *pgEventSource) run() { + db, err := sql.Open("postgres", ps.PgConfig.ConnectionString()) if err != nil { log.Fatal(err) } - es.db = db + + listener := pq.NewListener(ps.PgConfig.ConnectionString(), time.Second, time.Minute, func(ev pq.ListenerEventType, err error) { + if err != nil { + // Until we have a mechanism for catching up + // on missed events, we cannot recover from a + // dropped connection without breaking our + // promises to clients. + log.Fatal(err) + } + }) + err = listener.Listen("logs") + if err != nil { + log.Fatal(err) + } + go func() { + for _ = range time.NewTicker(time.Minute).C { + listener.Ping() + } + }() + + eventQueue := make(chan *event, ps.QueueSize) + go func() { + for e := range eventQueue { + // Wait for the "select ... from logs" call to + // finish. This limits max concurrent queries + // to ps.QueueSize. Without this, max + // concurrent queries would be bounded by + // client_count X client_queue_size. + e.Detail(db) + debugLogf("%+v", e) + ps.mtx.Lock() + for sink := range ps.sinks { + sink.channel <- e + } + ps.mtx.Unlock() + } + }() + + var serial uint64 + for pqEvent := range listener.Notify { + if pqEvent.Channel != "logs" { + continue + } + serial++ + e := &event{ + LogUUID: pqEvent.Extra, + Received: time.Now(), + Serial: serial, + } + debugLogf("%+v", e) + eventQueue <- e + go e.Detail(db) + } +} + +// NewSink subscribes to the event source. If c is not nil, it will be +// used as the event channel. Otherwise, a new channel will be +// created. Either way, the sink channel will be returned by the +// Channel() method of the returned eventSink. All subsequent events +// will be sent to the sink channel. The caller must ensure events are +// received from the sink channel as quickly as possible: when one +// sink blocks, all other sinks also block. +func (ps *pgEventSource) NewSink(c chan *event) eventSink { + ps.setupOnce.Do(ps.setup) + if c == nil { + c = make(chan *event, 1) + } + sink := &pgEventSink{ + channel: c, + source: ps, + } + ps.mtx.Lock() + ps.sinks[sink] = true + ps.mtx.Unlock() + return sink +} + +type pgEventSink struct { + channel chan *event + source *pgEventSource +} + +func (sink *pgEventSink) Channel() <-chan *event { + return sink.channel } -func (es *pgEventSource) EventSource() <-chan event { - es.setupOnce.Do(es.setup) - return nil +func (sink *pgEventSink) Stop() { + go func() { + // Ensure this sink cannot fill up and block the + // server-side queue (which otherwise could in turn + // block our mtx.Lock() here) + for _ = range sink.channel {} + }() + sink.source.mtx.Lock() + delete(sink.source.sinks, sink) + sink.source.mtx.Unlock() + close(sink.channel) } diff --git a/services/ws/router.go b/services/ws/router.go index 8b7658e4de..01c1477443 100644 --- a/services/ws/router.go +++ b/services/ws/router.go @@ -1,7 +1,6 @@ package main import ( - "io" "log" "net/http" "sync" @@ -10,25 +9,33 @@ import ( ) type router struct { - EventSource <-chan event + Config *Config + + eventSource eventSource mux *http.ServeMux setupOnce sync.Once } func (rtr *router) setup() { rtr.mux = http.NewServeMux() - rtr.mux.Handle("/websocket", makeServer(handlerV0)) - rtr.mux.Handle("/arvados/v1/events.ws", makeServer(handlerV1)) + rtr.mux.Handle("/websocket", rtr.makeServer(&handlerV0{ + QueueSize: rtr.Config.ClientEventQueue, + })) + rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(&handlerV1{ + QueueSize: rtr.Config.ClientEventQueue, + })) } -func makeServer(handler func(io.ReadWriter)) websocket.Server { - return websocket.Server{ +func (rtr *router) makeServer(handler handler) *websocket.Server { + return &websocket.Server{ Handshake: func(c *websocket.Config, r *http.Request) error { return nil }, Handler: websocket.Handler(func(ws *websocket.Conn) { log.Printf("socket request: %+v", ws.Request()) - handler(ws) + sink := rtr.eventSource.NewSink(nil) + handler.Handle(ws, sink.Channel()) + sink.Stop() ws.Close() log.Printf("socket disconnect: %+v", ws.Request().RemoteAddr) }), -- 2.30.2