1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 "git.curoverse.com/arvados.git/sdk/go/stats"
20 type pgConfig map[string]string
22 func (c pgConfig) ConnectionString() string {
28 strings.Replace(v, `\`, `\\`, -1),
35 type pgEventSource struct {
41 pqListener *pq.Listener
43 sinks map[*pgEventSink]bool
46 lastQDelay time.Duration
56 var _ debugStatuser = (*pgEventSource)(nil)
58 func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
59 if et == pq.ListenerEventConnected {
60 logger(nil).Debug("pgEventSource connected")
64 // Until we have a mechanism for catching up on missed events,
65 // we cannot recover from a dropped connection without
66 // breaking our promises to clients.
68 WithField("eventType", et).
70 Error("listener problem")
74 func (ps *pgEventSource) setup() {
75 ps.ready = make(chan bool)
78 // Close stops listening for new events and disconnects all clients.
79 func (ps *pgEventSource) Close() {
84 // WaitReady returns when the event listener is connected.
85 func (ps *pgEventSource) WaitReady() {
86 ps.setupOnce.Do(ps.setup)
90 // Run listens for event notifications on the "logs" channel and sends
91 // them to all subscribers.
92 func (ps *pgEventSource) Run() {
93 logger(nil).Debug("pgEventSource Run starting")
94 defer logger(nil).Debug("pgEventSource Run finished")
96 ps.setupOnce.Do(ps.setup)
104 ctx, cancel := context.WithCancel(context.Background())
109 // Disconnect all clients
111 for sink := range ps.sinks {
118 db, err := sql.Open("postgres", ps.DataSource)
120 logger(nil).WithError(err).Error("sql.Open failed")
123 if ps.MaxOpenConns <= 0 {
124 logger(nil).Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file")
126 db.SetMaxOpenConns(ps.MaxOpenConns)
127 if err = db.Ping(); err != nil {
128 logger(nil).WithError(err).Error("db.Ping failed")
133 ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
134 err = ps.pqListener.Listen("logs")
136 logger(nil).WithError(err).Error("pq Listen failed")
139 defer ps.pqListener.Close()
140 logger(nil).Debug("pq Listen setup done")
143 // Avoid double-close in deferred func
146 ps.queue = make(chan *event, ps.QueueSize)
147 defer close(ps.queue)
150 for e := range ps.queue {
151 // Wait for the "select ... from logs" call to
152 // finish. This limits max concurrent queries
153 // to ps.QueueSize. Without this, max
154 // concurrent queries would be bounded by
155 // client_count X client_queue_size.
159 WithField("serial", e.Serial).
160 WithField("detail", e.Detail()).
163 ps.lastQDelay = e.Ready.Sub(e.Received)
166 atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
167 for sink := range ps.sinks {
175 ticker := time.NewTicker(time.Minute)
180 logger(nil).Debug("ctx done")
184 logger(nil).Debug("listener ping")
187 case pqEvent, ok := <-ps.pqListener.Notify:
189 logger(nil).Debug("pqListener Notify chan closed")
193 // pq should call listenerProblem
194 // itself in addition to sending us a
195 // nil event, so this might be
197 ps.listenerProblem(-1, nil)
200 if pqEvent.Channel != "logs" {
201 logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
204 logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
206 logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
212 Received: time.Now(),
216 logger(nil).WithField("event", e).Debug("incoming")
217 atomic.AddUint64(&ps.eventsIn, 1)
224 // NewSink subscribes to the event source. NewSink returns an
225 // eventSink, whose Channel() method returns a channel: a pointer to
226 // each subsequent event will be sent to that channel.
228 // The caller must ensure events are received from the sink channel as
229 // quickly as possible because when one sink stops being ready, all
230 // other sinks block.
231 func (ps *pgEventSource) NewSink() eventSink {
232 sink := &pgEventSink{
233 channel: make(chan *event, 1),
238 ps.sinks = make(map[*pgEventSink]bool)
240 ps.sinks[sink] = true
245 func (ps *pgEventSource) DB() *sql.DB {
249 func (ps *pgEventSource) DBHealth() error {
250 ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
252 return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
255 func (ps *pgEventSource) DebugStatus() interface{} {
257 defer ps.mtx.Unlock()
259 for sink := range ps.sinks {
260 blocked += len(sink.channel)
262 return map[string]interface{}{
263 "EventsIn": atomic.LoadUint64(&ps.eventsIn),
264 "EventsOut": atomic.LoadUint64(&ps.eventsOut),
265 "Queue": len(ps.queue),
266 "QueueLimit": cap(ps.queue),
267 "QueueDelay": stats.Duration(ps.lastQDelay),
268 "Sinks": len(ps.sinks),
269 "SinksBlocked": blocked,
270 "DBStats": ps.db.Stats(),
274 type pgEventSink struct {
276 source *pgEventSource
279 func (sink *pgEventSink) Channel() <-chan *event {
283 // Stop sending events to the sink's channel.
284 func (sink *pgEventSink) Stop() {
286 // Ensure this sink cannot fill up and block the
287 // server-side queue (which otherwise could in turn
288 // block our mtx.Lock() here)
289 for _ = range sink.channel {
292 sink.source.mtx.Lock()
293 if _, ok := sink.source.sinks[sink]; ok {
294 delete(sink.source.sinks, sink)
297 sink.source.mtx.Unlock()