1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.arvados.org/arvados.git/sdk/go/stats"
19 "github.com/sirupsen/logrus"
// pgEventSource listens for PostgreSQL NOTIFY events on the "logs"
// channel and fans each event out to all subscribed sinks.
// NOTE(review): this excerpt is fragmentary — several fields (e.g.
// DataSource, QueueSize, mtx, db, queue, ready, setupOnce, eventsIn,
// eventsOut referenced by the methods below) are not visible here.
22 type pgEventSource struct {
// Logger receives all debug/error output for this event source.
26 Logger logrus.FieldLogger
// pqListener is the LISTEN/NOTIFY connection created in Run.
29 pqListener *pq.Listener
// sinks is the set of active subscribers; accessed under ps.mtx
// (see DebugStatus / Stop), so presumably all mutation is too.
31 sinks map[*pgEventSink]bool
// lastQDelay is the Ready-minus-Received latency of the most
// recently dispatched event; reported by DebugStatus as "QueueDelay".
34 lastQDelay time.Duration

// Compile-time check that *pgEventSource satisfies debugStatuser.
44 var _ debugStatuser = (*pgEventSource)(nil)
// listenerProblem is the pq.Listener event callback. A "connected"
// event is merely logged at debug level; any other event type is
// treated as an unrecoverable listener problem and logged as an
// error. NOTE(review): lines between the debug log and the error log
// (likely an early return and the WithError/shutdown calls) are not
// visible in this excerpt.
46 func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
47 if et == pq.ListenerEventConnected {
48 ps.Logger.Debug("pgEventSource connected")
52 // Until we have a mechanism for catching up on missed events,
53 // we cannot recover from a dropped connection without
54 // breaking our promises to clients.
56 WithField("eventType", et).
58 Error("listener problem")
// setup performs one-time initialization; it is invoked exclusively
// through ps.setupOnce.Do (see WaitReady and Run). The visible work
// is creating the ready channel that WaitReady presumably blocks on.
62 func (ps *pgEventSource) setup() {
63 ps.ready = make(chan bool)
66 // Close stops listening for new events and disconnects all clients.
// NOTE(review): the body is not visible in this excerpt; Run's
// deferred cleanup ("Avoid double-close in deferred func") suggests
// it closes a shutdown channel — confirm against the full file.
67 func (ps *pgEventSource) Close() {
72 // WaitReady returns when the event listener is connected.
73 func (ps *pgEventSource) WaitReady() {
// Ensure setup has run exactly once before waiting; presumably a
// receive on ps.ready follows on a line not visible in this excerpt.
74 ps.setupOnce.Do(ps.setup)
78 // Run listens for event notifications on the "logs" channel and sends
79 // them to all subscribers.
//
// NOTE(review): this excerpt omits many lines (error returns after
// the logged failures, the dispatch goroutine wrapper, the select
// statement framing, sink send loops, etc.); comments below describe
// only what the visible lines establish.
80 func (ps *pgEventSource) Run() {
81 ps.Logger.Debug("pgEventSource Run starting")
82 defer ps.Logger.Debug("pgEventSource Run finished")

// setup runs at most once even if both WaitReady and Run call it.
84 ps.setupOnce.Do(ps.setup)

// Context used to stop the ticker/notify loop below; cancel is
// presumably deferred on a line not visible here.
92 ctx, cancel := context.WithCancel(context.Background())

97 // Disconnect all clients
99 for sink := range ps.sinks {

// Open and sanity-check the SQL connection used by event handlers.
106 db, err := sql.Open("postgres", ps.DataSource)
108 ps.Logger.WithError(err).Error("sql.Open failed")
111 if ps.MaxOpenConns <= 0 {
112 ps.Logger.Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file")
114 db.SetMaxOpenConns(ps.MaxOpenConns)
115 if err = db.Ping(); err != nil {
116 ps.Logger.WithError(err).Error("db.Ping failed")

// Start LISTENing on the "logs" channel; listenerProblem is the
// pq callback for connection state changes and errors.
121 ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
122 err = ps.pqListener.Listen("logs")
124 ps.Logger.WithError(err).Error("pq Listen failed")
127 defer ps.pqListener.Close()
128 ps.Logger.Debug("pq Listen setup done")

131 // Avoid double-close in deferred func

// Buffered queue decouples notification receipt from dispatch;
// capacity comes from configuration (ps.QueueSize).
134 ps.queue = make(chan *event, ps.QueueSize)
135 defer close(ps.queue)

// Dispatch loop: consume queued events and fan each one out.
138 for e := range ps.queue {
139 // Wait for the "select ... from logs" call to
140 // finish. This limits max concurrent queries
141 // to ps.QueueSize. Without this, max
142 // concurrent queries would be bounded by
143 // client_count X client_queue_size.
147 WithField("serial", e.Serial).
148 WithField("detail", e.Detail()).

// Record queue latency for DebugStatus ("QueueDelay").
151 ps.lastQDelay = e.Ready.Sub(e.Received)

// Count one outgoing copy per subscribed sink.
154 atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
155 for sink := range ps.sinks {

// Periodic keepalive: ping the listener once a minute so a dead
// connection is detected even when no notifications arrive.
163 ticker := time.NewTicker(time.Minute)
168 ps.Logger.Debug("ctx done")
172 ps.Logger.Debug("listener ping")
173 err := ps.pqListener.Ping()
175 ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err))

// Main notify path: each NOTIFY becomes an *event on ps.queue.
179 case pqEvent, ok := <-ps.pqListener.Notify:
181 ps.Logger.Error("pqListener Notify chan closed")
185 // pq should call listenerProblem
186 // itself in addition to sending us a
187 // nil event, so this might be
189 ps.listenerProblem(-1, errors.New("pqListener Notify chan received nil event"))
// Ignore notifications from channels other than "logs".
192 if pqEvent.Channel != "logs" {
193 ps.Logger.WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
// The NOTIFY payload (Extra) is expected to be a decimal log ID.
196 logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
198 ps.Logger.WithField("pqEvent", pqEvent).Error("bad notify payload")
204 Received: time.Now(),
209 ps.Logger.WithField("event", e).Debug("incoming")
210 atomic.AddUint64(&ps.eventsIn, 1)
217 // NewSink subscribes to the event source. NewSink returns an
218 // eventSink, whose Channel() method returns a channel: a pointer to
219 // each subsequent event will be sent to that channel.
221 // The caller must ensure events are received from the sink channel as
222 // quickly as possible because when one sink stops being ready, all
223 // other sinks block.
224 func (ps *pgEventSource) NewSink() eventSink {
225 sink := &pgEventSink{
// Buffer of 1 so the dispatcher can stay one event ahead of a
// momentarily-busy client before blocking.
226 channel: make(chan *event, 1),
// Lazily create the sinks map on first subscription; registration
// is presumably done under ps.mtx (lock lines not visible here).
231 ps.sinks = make(map[*pgEventSink]bool)
233 ps.sinks[sink] = true
// DB returns the event source's database handle. NOTE(review): the
// body is not visible in this excerpt; presumably it returns ps.db.
238 func (ps *pgEventSource) DB() *sql.DB {
// DBHealth reports whether the database is reachable: an error if no
// connection exists yet, otherwise the result of a trivial "SELECT 1"
// bounded by a one-second deadline. NOTE(review): the `defer cancel()`
// and the declaration of `i` are on lines not visible in this excerpt.
243 func (ps *pgEventSource) DBHealth() error {
245 return errors.New("database not connected")
247 ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
250 return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
// DebugStatus returns a point-in-time snapshot of event-source
// internals (counters, queue depth, sink backlog, DB pool stats) for
// the debug/status endpoint. Sink inspection happens under ps.mtx.
253 func (ps *pgEventSource) DebugStatus() interface{} {
255 defer ps.mtx.Unlock()
// Sum of undelivered events buffered across all sink channels.
257 for sink := range ps.sinks {
258 blocked += len(sink.channel)
260 return map[string]interface{}{
261 "EventsIn": atomic.LoadUint64(&ps.eventsIn),
262 "EventsOut": atomic.LoadUint64(&ps.eventsOut),
263 "Queue": len(ps.queue),
264 "QueueLimit": cap(ps.queue),
265 "QueueDelay": stats.Duration(ps.lastQDelay),
266 "Sinks": len(ps.sinks),
267 "SinksBlocked": blocked,
268 "DBStats": ps.db.Stats(),
// pgEventSink is a single subscription to a pgEventSource; events are
// delivered on its channel (field not visible in this excerpt) and it
// unregisters itself from source.sinks in Stop.
272 type pgEventSink struct {
// source is the pgEventSource this sink is registered with.
274 source *pgEventSource
// Channel returns the receive side of the sink's event channel.
// NOTE(review): the body is not visible in this excerpt; presumably
// it returns sink.channel.
277 func (sink *pgEventSink) Channel() <-chan *event {
281 // Stop sending events to the sink's channel.
// Stop drains the sink's channel (so a full sink cannot deadlock the
// dispatcher, which would in turn block the mtx.Lock below), then
// removes the sink from the source's registry under the mutex.
// NOTE(review): the goroutine/closure framing around the drain loop
// is on lines not visible in this excerpt.
282 func (sink *pgEventSink) Stop() {
284 // Ensure this sink cannot fill up and block the
285 // server-side queue (which otherwise could in turn
286 // block our mtx.Lock() here)
287 for range sink.channel {
// Deregister under the lock; the membership check guards against a
// double Stop deleting twice (delete on a missing key is harmless,
// so presumably something else also happens in this branch).
290 sink.source.mtx.Lock()
291 if _, ok := sink.source.sinks[sink]; ok {
292 delete(sink.source.sinks, sink)
295 sink.source.mtx.Unlock()