19070: Fix tests
[arvados.git] / services / ws / event_source.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package ws
6
7 import (
8         "context"
9         "database/sql"
10         "errors"
11         "fmt"
12         "strconv"
13         "sync"
14         "time"
15
16         "git.arvados.org/arvados.git/sdk/go/stats"
17         "github.com/lib/pq"
18         "github.com/prometheus/client_golang/prometheus"
19         "github.com/sirupsen/logrus"
20 )
21
22 type pgEventSource struct {
23         DataSource   string
24         MaxOpenConns int
25         QueueSize    int
26         Logger       logrus.FieldLogger
27         Reg          *prometheus.Registry
28
29         db         *sql.DB
30         pqListener *pq.Listener
31         queue      chan *event
32         sinks      map[*pgEventSink]bool
33         mtx        sync.Mutex
34
35         lastQDelay time.Duration
36         eventsIn   prometheus.Counter
37         eventsOut  prometheus.Counter
38
39         cancel func()
40
41         setupOnce sync.Once
42         ready     chan bool
43 }
44
45 func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
46         if et == pq.ListenerEventConnected {
47                 ps.Logger.Debug("pgEventSource connected")
48                 return
49         }
50
51         // Until we have a mechanism for catching up on missed events,
52         // we cannot recover from a dropped connection without
53         // breaking our promises to clients.
54         ps.Logger.
55                 WithField("eventType", et).
56                 WithError(err).
57                 Error("listener problem")
58         ps.cancel()
59 }
60
61 func (ps *pgEventSource) setup() {
62         ps.ready = make(chan bool)
63         ps.Reg.MustRegister(prometheus.NewGaugeFunc(
64                 prometheus.GaugeOpts{
65                         Namespace: "arvados",
66                         Subsystem: "ws",
67                         Name:      "queue_len",
68                         Help:      "Current number of events in queue",
69                 }, func() float64 { return float64(len(ps.queue)) }))
70         ps.Reg.MustRegister(prometheus.NewGaugeFunc(
71                 prometheus.GaugeOpts{
72                         Namespace: "arvados",
73                         Subsystem: "ws",
74                         Name:      "queue_cap",
75                         Help:      "Event queue capacity",
76                 }, func() float64 { return float64(cap(ps.queue)) }))
77         ps.Reg.MustRegister(prometheus.NewGaugeFunc(
78                 prometheus.GaugeOpts{
79                         Namespace: "arvados",
80                         Subsystem: "ws",
81                         Name:      "queue_delay",
82                         Help:      "Queue delay of the last emitted event",
83                 }, func() float64 { return ps.lastQDelay.Seconds() }))
84         ps.Reg.MustRegister(prometheus.NewGaugeFunc(
85                 prometheus.GaugeOpts{
86                         Namespace: "arvados",
87                         Subsystem: "ws",
88                         Name:      "sinks",
89                         Help:      "Number of active sinks (connections)",
90                 }, func() float64 { return float64(len(ps.sinks)) }))
91         ps.Reg.MustRegister(prometheus.NewGaugeFunc(
92                 prometheus.GaugeOpts{
93                         Namespace: "arvados",
94                         Subsystem: "ws",
95                         Name:      "sinks_blocked",
96                         Help:      "Number of sinks (connections) that are busy and blocking the main event stream",
97                 }, func() float64 {
98                         ps.mtx.Lock()
99                         defer ps.mtx.Unlock()
100                         blocked := 0
101                         for sink := range ps.sinks {
102                                 blocked += len(sink.channel)
103                         }
104                         return float64(blocked)
105                 }))
106         ps.eventsIn = prometheus.NewCounter(prometheus.CounterOpts{
107                 Namespace: "arvados",
108                 Subsystem: "ws",
109                 Name:      "events_in",
110                 Help:      "Number of events received from postgresql notify channel",
111         })
112         ps.Reg.MustRegister(ps.eventsIn)
113         ps.eventsOut = prometheus.NewCounter(prometheus.CounterOpts{
114                 Namespace: "arvados",
115                 Subsystem: "ws",
116                 Name:      "events_out",
117                 Help:      "Number of events sent to client sessions (before filtering)",
118         })
119         ps.Reg.MustRegister(ps.eventsOut)
120
121         maxConnections := prometheus.NewGauge(prometheus.GaugeOpts{
122                 Namespace: "arvados",
123                 Subsystem: "ws",
124                 Name:      "db_max_connections",
125                 Help:      "Maximum number of open connections to the database",
126         })
127         ps.Reg.MustRegister(maxConnections)
128         openConnections := prometheus.NewGaugeVec(prometheus.GaugeOpts{
129                 Namespace: "arvados",
130                 Subsystem: "ws",
131                 Name:      "db_open_connections",
132                 Help:      "Open connections to the database",
133         }, []string{"inuse"})
134         ps.Reg.MustRegister(openConnections)
135
136         updateDBStats := func() {
137                 stats := ps.db.Stats()
138                 maxConnections.Set(float64(stats.MaxOpenConnections))
139                 openConnections.WithLabelValues("0").Set(float64(stats.Idle))
140                 openConnections.WithLabelValues("1").Set(float64(stats.InUse))
141         }
142         go func() {
143                 <-ps.ready
144                 if ps.db == nil {
145                         return
146                 }
147                 updateDBStats()
148                 for range time.Tick(time.Second) {
149                         updateDBStats()
150                 }
151         }()
152 }
153
154 // Close stops listening for new events and disconnects all clients.
155 func (ps *pgEventSource) Close() {
156         ps.WaitReady()
157         ps.cancel()
158 }
159
160 // WaitReady returns when the event listener is connected.
161 func (ps *pgEventSource) WaitReady() {
162         ps.setupOnce.Do(ps.setup)
163         <-ps.ready
164 }
165
166 // Run listens for event notifications on the "logs" channel and sends
167 // them to all subscribers.
168 func (ps *pgEventSource) Run() {
169         ps.Logger.Debug("pgEventSource Run starting")
170         defer ps.Logger.Debug("pgEventSource Run finished")
171
172         ps.setupOnce.Do(ps.setup)
173         ready := ps.ready
174         defer func() {
175                 if ready != nil {
176                         close(ready)
177                 }
178         }()
179
180         ctx, cancel := context.WithCancel(context.Background())
181         ps.cancel = cancel
182         defer cancel()
183
184         defer func() {
185                 // Disconnect all clients
186                 ps.mtx.Lock()
187                 for sink := range ps.sinks {
188                         close(sink.channel)
189                 }
190                 ps.sinks = nil
191                 ps.mtx.Unlock()
192         }()
193
194         db, err := sql.Open("postgres", ps.DataSource)
195         if err != nil {
196                 ps.Logger.WithError(err).Error("sql.Open failed")
197                 return
198         }
199         if ps.MaxOpenConns <= 0 {
200                 ps.Logger.Warn("no database connection limit configured -- consider setting PostgreSQL.ConnectionPool>0 in arvados-ws configuration file")
201         }
202         db.SetMaxOpenConns(ps.MaxOpenConns)
203         if err = db.Ping(); err != nil {
204                 ps.Logger.WithError(err).Error("db.Ping failed")
205                 return
206         }
207         ps.db = db
208
209         ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
210         err = ps.pqListener.Listen("logs")
211         if err != nil {
212                 ps.Logger.WithError(err).Error("pq Listen failed")
213                 return
214         }
215         defer ps.pqListener.Close()
216         ps.Logger.Debug("pq Listen setup done")
217
218         close(ready)
219         // Avoid double-close in deferred func
220         ready = nil
221
222         ps.queue = make(chan *event, ps.QueueSize)
223         defer close(ps.queue)
224
225         go func() {
226                 for e := range ps.queue {
227                         // Wait for the "select ... from logs" call to
228                         // finish. This limits max concurrent queries
229                         // to ps.QueueSize. Without this, max
230                         // concurrent queries would be bounded by
231                         // client_count X client_queue_size.
232                         e.Detail()
233
234                         ps.Logger.
235                                 WithField("serial", e.Serial).
236                                 WithField("detail", e.Detail()).
237                                 Debug("event ready")
238                         e.Ready = time.Now()
239                         ps.lastQDelay = e.Ready.Sub(e.Received)
240
241                         ps.mtx.Lock()
242                         for sink := range ps.sinks {
243                                 sink.channel <- e
244                                 ps.eventsOut.Inc()
245                         }
246                         ps.mtx.Unlock()
247                 }
248         }()
249
250         var serial uint64
251         ticker := time.NewTicker(time.Minute)
252         defer ticker.Stop()
253         for {
254                 select {
255                 case <-ctx.Done():
256                         ps.Logger.Debug("ctx done")
257                         return
258
259                 case <-ticker.C:
260                         ps.Logger.Debug("listener ping")
261                         err := ps.pqListener.Ping()
262                         if err != nil {
263                                 ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err))
264                                 continue
265                         }
266
267                 case pqEvent, ok := <-ps.pqListener.Notify:
268                         if !ok {
269                                 ps.Logger.Error("pqListener Notify chan closed")
270                                 return
271                         }
272                         if pqEvent == nil {
273                                 // pq should call listenerProblem
274                                 // itself in addition to sending us a
275                                 // nil event, so this might be
276                                 // superfluous:
277                                 ps.listenerProblem(-1, errors.New("pqListener Notify chan received nil event"))
278                                 continue
279                         }
280                         if pqEvent.Channel != "logs" {
281                                 ps.Logger.WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
282                                 continue
283                         }
284                         logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
285                         if err != nil {
286                                 ps.Logger.WithField("pqEvent", pqEvent).Error("bad notify payload")
287                                 continue
288                         }
289                         serial++
290                         e := &event{
291                                 LogID:    logID,
292                                 Received: time.Now(),
293                                 Serial:   serial,
294                                 db:       ps.db,
295                                 logger:   ps.Logger,
296                         }
297                         ps.Logger.WithField("event", e).Debug("incoming")
298                         ps.eventsIn.Inc()
299                         ps.queue <- e
300                         go e.Detail()
301                 }
302         }
303 }
304
305 // NewSink subscribes to the event source. NewSink returns an
306 // eventSink, whose Channel() method returns a channel: a pointer to
307 // each subsequent event will be sent to that channel.
308 //
309 // The caller must ensure events are received from the sink channel as
310 // quickly as possible because when one sink stops being ready, all
311 // other sinks block.
312 func (ps *pgEventSource) NewSink() eventSink {
313         sink := &pgEventSink{
314                 channel: make(chan *event, 1),
315                 source:  ps,
316         }
317         ps.mtx.Lock()
318         if ps.sinks == nil {
319                 ps.sinks = make(map[*pgEventSink]bool)
320         }
321         ps.sinks[sink] = true
322         ps.mtx.Unlock()
323         return sink
324 }
325
326 func (ps *pgEventSource) DB() *sql.DB {
327         ps.WaitReady()
328         return ps.db
329 }
330
331 func (ps *pgEventSource) DBHealth() error {
332         if ps.db == nil {
333                 return errors.New("database not connected")
334         }
335         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
336         defer cancel()
337         var i int
338         return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
339 }
340
341 func (ps *pgEventSource) DebugStatus() interface{} {
342         ps.mtx.Lock()
343         defer ps.mtx.Unlock()
344         blocked := 0
345         for sink := range ps.sinks {
346                 blocked += len(sink.channel)
347         }
348         return map[string]interface{}{
349                 "Queue":        len(ps.queue),
350                 "QueueLimit":   cap(ps.queue),
351                 "QueueDelay":   stats.Duration(ps.lastQDelay),
352                 "Sinks":        len(ps.sinks),
353                 "SinksBlocked": blocked,
354                 "DBStats":      ps.db.Stats(),
355         }
356 }
357
358 type pgEventSink struct {
359         channel chan *event
360         source  *pgEventSource
361 }
362
363 func (sink *pgEventSink) Channel() <-chan *event {
364         return sink.channel
365 }
366
367 // Stop sending events to the sink's channel.
368 func (sink *pgEventSink) Stop() {
369         go func() {
370                 // Ensure this sink cannot fill up and block the
371                 // server-side queue (which otherwise could in turn
372                 // block our mtx.Lock() here)
373                 for range sink.channel {
374                 }
375         }()
376         sink.source.mtx.Lock()
377         if _, ok := sink.source.sinks[sink]; ok {
378                 delete(sink.source.sinks, sink)
379                 close(sink.channel)
380         }
381         sink.source.mtx.Unlock()
382 }