X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/7cb536fa58d8cc837b4cb59680c7355a1687648b..d6b6b39bfe67926490506125c88f3567e45e7dcc:/services/ws/pg.go?ds=sidebyside diff --git a/services/ws/pg.go b/services/ws/pg.go index 6bce668fca..a5af9f765b 100644 --- a/services/ws/pg.go +++ b/services/ws/pg.go @@ -2,7 +2,9 @@ package main import ( "database/sql" + "fmt" "log" + "strconv" "strings" "sync" "time" @@ -26,46 +28,51 @@ func (c pgConfig) ConnectionString() string { } type pgEventSource struct { - PgConfig pgConfig - QueueSize int + DataSource string + QueueSize int + db *sql.DB pqListener *pq.Listener sinks map[*pgEventSink]bool setupOnce sync.Once mtx sync.Mutex + shutdown chan error } func (ps *pgEventSource) setup() { + ps.shutdown = make(chan error, 1) ps.sinks = make(map[*pgEventSink]bool) - go ps.run() -} -func (ps *pgEventSource) run() { - db, err := sql.Open("postgres", ps.PgConfig.ConnectionString()) + db, err := sql.Open("postgres", ps.DataSource) if err != nil { - log.Fatal(err) + log.Fatalf("sql.Open: %s", err) + } + if err = db.Ping(); err != nil { + log.Fatalf("db.Ping: %s", err) } + ps.db = db - listener := pq.NewListener(ps.PgConfig.ConnectionString(), time.Second, time.Minute, func(ev pq.ListenerEventType, err error) { + ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) { if err != nil { // Until we have a mechanism for catching up // on missed events, we cannot recover from a // dropped connection without breaking our // promises to clients. - log.Fatal(err) + ps.shutdown <- fmt.Errorf("pgEventSource listener problem: %s", err) } }) - err = listener.Listen("logs") + err = ps.pqListener.Listen("logs") if err != nil { log.Fatal(err) } - go func() { - for _ = range time.NewTicker(time.Minute).C { - listener.Ping() - } - }() + debugLogf("pgEventSource listening") + + go ps.run() +} +func (ps *pgEventSource) run() { eventQueue := make(chan *event, ps.QueueSize) + go func() { for e := range eventQueue { // Wait for the "select ... from logs" call to @@ -73,8 +80,8 @@ func (ps *pgEventSource) run() { // to ps.QueueSize. Without this, max // concurrent queries would be bounded by // client_count X client_queue_size. - e.Detail(db) - debugLogf("%+v", e) + e.Detail() + debugLogf("event %d detail %+v", e.Serial, e.Detail()) ps.mtx.Lock() for sink := range ps.sinks { sink.channel <- e @@ -84,36 +91,59 @@ func (ps *pgEventSource) run() { }() var serial uint64 - for pqEvent := range listener.Notify { - if pqEvent.Channel != "logs" { - continue - } - serial++ - e := &event{ - LogUUID: pqEvent.Extra, - Received: time.Now(), - Serial: serial, + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + for { + select { + case err, ok := <-ps.shutdown: + if ok { + debugLogf("shutdown on error: %s", err) + } + close(eventQueue) + return + + case <-ticker.C: + debugLogf("pgEventSource listener ping") + ps.pqListener.Ping() + + case pqEvent, ok := <-ps.pqListener.Notify: + if !ok { + close(eventQueue) + return + } + if pqEvent.Channel != "logs" { + continue + } + logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64) + if err != nil { + log.Printf("bad notify payload: %+v", pqEvent) + continue + } + serial++ + e := &event{ + LogID: logID, + Received: time.Now(), + Serial: serial, + db: ps.db, + } + debugLogf("event %d %+v", e.Serial, e) + eventQueue <- e + go e.Detail() } - debugLogf("%+v", e) - eventQueue <- e - go e.Detail(db) } } -// NewSink subscribes to the event source. If c is not nil, it will be -// used as the event channel. Otherwise, a new channel will be -// created. Either way, the sink channel will be returned by the -// Channel() method of the returned eventSink. All subsequent events -// will be sent to the sink channel. The caller must ensure events are -// received from the sink channel as quickly as possible: when one -// sink blocks, all other sinks also block. -func (ps *pgEventSource) NewSink(c chan *event) eventSink { +// NewSink subscribes to the event source. NewSink returns an +// eventSink, whose Channel() method returns a channel: a pointer to +// each subsequent event will be sent to that channel. +// +// The caller must ensure events are received from the sink channel as +// quickly as possible because when one sink stops being ready, all +// other sinks block. +func (ps *pgEventSource) NewSink() eventSink { ps.setupOnce.Do(ps.setup) - if c == nil { - c = make(chan *event, 1) - } sink := &pgEventSink{ - channel: c, + channel: make(chan *event, 1), source: ps, } ps.mtx.Lock() @@ -136,7 +166,8 @@ func (sink *pgEventSink) Stop() { // Ensure this sink cannot fill up and block the // server-side queue (which otherwise could in turn // block our mtx.Lock() here) - for _ = range sink.channel {} + for _ = range sink.channel { + } }() sink.source.mtx.Lock() delete(sink.source.sinks, sink)