Merge branch '20825-cwl-separate-runner' refs #20825
[arvados.git] / services / ws / event_source.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package ws
6
7 import (
8         "context"
9         "database/sql"
10         "errors"
11         "fmt"
12         "strconv"
13         "sync"
14         "time"
15
16         "git.arvados.org/arvados.git/sdk/go/stats"
17         "github.com/lib/pq"
18         "github.com/prometheus/client_golang/prometheus"
19         "github.com/sirupsen/logrus"
20 )
21
22 var (
23         listenerPingInterval = time.Minute
24         testSlowPing         = false
25 )
26
27 type pgEventSource struct {
28         DataSource   string
29         MaxOpenConns int
30         QueueSize    int
31         Logger       logrus.FieldLogger
32         Reg          *prometheus.Registry
33
34         db         *sql.DB
35         pqListener *pq.Listener
36         queue      chan *event
37         sinks      map[*pgEventSink]bool
38         mtx        sync.Mutex
39
40         lastQDelay time.Duration
41         eventsIn   prometheus.Counter
42         eventsOut  prometheus.Counter
43
44         cancel func()
45
46         setupOnce sync.Once
47         ready     chan bool
48 }
49
50 func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
51         if et == pq.ListenerEventConnected {
52                 ps.Logger.Debug("pgEventSource connected")
53                 return
54         }
55
56         // Until we have a mechanism for catching up on missed events,
57         // we cannot recover from a dropped connection without
58         // breaking our promises to clients.
59         ps.Logger.
60                 WithField("eventType", et).
61                 WithError(err).
62                 Error("listener problem")
63         ps.cancel()
64 }
65
66 func (ps *pgEventSource) setup() {
67         ps.ready = make(chan bool)
68         ps.Reg.MustRegister(prometheus.NewGaugeFunc(
69                 prometheus.GaugeOpts{
70                         Namespace: "arvados",
71                         Subsystem: "ws",
72                         Name:      "queue_len",
73                         Help:      "Current number of events in queue",
74                 }, func() float64 { return float64(len(ps.queue)) }))
75         ps.Reg.MustRegister(prometheus.NewGaugeFunc(
76                 prometheus.GaugeOpts{
77                         Namespace: "arvados",
78                         Subsystem: "ws",
79                         Name:      "queue_cap",
80                         Help:      "Event queue capacity",
81                 }, func() float64 { return float64(cap(ps.queue)) }))
82         ps.Reg.MustRegister(prometheus.NewGaugeFunc(
83                 prometheus.GaugeOpts{
84                         Namespace: "arvados",
85                         Subsystem: "ws",
86                         Name:      "queue_delay",
87                         Help:      "Queue delay of the last emitted event",
88                 }, func() float64 { return ps.lastQDelay.Seconds() }))
89         ps.Reg.MustRegister(prometheus.NewGaugeFunc(
90                 prometheus.GaugeOpts{
91                         Namespace: "arvados",
92                         Subsystem: "ws",
93                         Name:      "sinks",
94                         Help:      "Number of active sinks (connections)",
95                 }, func() float64 { return float64(len(ps.sinks)) }))
96         ps.Reg.MustRegister(prometheus.NewGaugeFunc(
97                 prometheus.GaugeOpts{
98                         Namespace: "arvados",
99                         Subsystem: "ws",
100                         Name:      "sinks_blocked",
101                         Help:      "Number of sinks (connections) that are busy and blocking the main event stream",
102                 }, func() float64 {
103                         ps.mtx.Lock()
104                         defer ps.mtx.Unlock()
105                         blocked := 0
106                         for sink := range ps.sinks {
107                                 blocked += len(sink.channel)
108                         }
109                         return float64(blocked)
110                 }))
111         ps.eventsIn = prometheus.NewCounter(prometheus.CounterOpts{
112                 Namespace: "arvados",
113                 Subsystem: "ws",
114                 Name:      "events_in",
115                 Help:      "Number of events received from postgresql notify channel",
116         })
117         ps.Reg.MustRegister(ps.eventsIn)
118         ps.eventsOut = prometheus.NewCounter(prometheus.CounterOpts{
119                 Namespace: "arvados",
120                 Subsystem: "ws",
121                 Name:      "events_out",
122                 Help:      "Number of events sent to client sessions (before filtering)",
123         })
124         ps.Reg.MustRegister(ps.eventsOut)
125
126         maxConnections := prometheus.NewGauge(prometheus.GaugeOpts{
127                 Namespace: "arvados",
128                 Subsystem: "ws",
129                 Name:      "db_max_connections",
130                 Help:      "Maximum number of open connections to the database",
131         })
132         ps.Reg.MustRegister(maxConnections)
133         openConnections := prometheus.NewGaugeVec(prometheus.GaugeOpts{
134                 Namespace: "arvados",
135                 Subsystem: "ws",
136                 Name:      "db_open_connections",
137                 Help:      "Open connections to the database",
138         }, []string{"inuse"})
139         ps.Reg.MustRegister(openConnections)
140
141         updateDBStats := func() {
142                 stats := ps.db.Stats()
143                 maxConnections.Set(float64(stats.MaxOpenConnections))
144                 openConnections.WithLabelValues("0").Set(float64(stats.Idle))
145                 openConnections.WithLabelValues("1").Set(float64(stats.InUse))
146         }
147         go func() {
148                 <-ps.ready
149                 if ps.db == nil {
150                         return
151                 }
152                 updateDBStats()
153                 for range time.Tick(time.Second) {
154                         updateDBStats()
155                 }
156         }()
157 }
158
159 // Close stops listening for new events and disconnects all clients.
160 func (ps *pgEventSource) Close() {
161         ps.WaitReady()
162         ps.cancel()
163 }
164
165 // WaitReady returns when the event listener is connected.
166 func (ps *pgEventSource) WaitReady() {
167         ps.setupOnce.Do(ps.setup)
168         <-ps.ready
169 }
170
171 // Run listens for event notifications on the "logs" channel and sends
172 // them to all subscribers.
173 func (ps *pgEventSource) Run() {
174         ps.Logger.Debug("pgEventSource Run starting")
175         defer ps.Logger.Debug("pgEventSource Run finished")
176
177         ps.setupOnce.Do(ps.setup)
178         ready := ps.ready
179         defer func() {
180                 if ready != nil {
181                         close(ready)
182                 }
183         }()
184
185         ctx, cancel := context.WithCancel(context.Background())
186         ps.cancel = cancel
187         defer cancel()
188
189         defer func() {
190                 // Disconnect all clients
191                 ps.mtx.Lock()
192                 for sink := range ps.sinks {
193                         close(sink.channel)
194                 }
195                 ps.sinks = nil
196                 ps.mtx.Unlock()
197         }()
198
199         db, err := sql.Open("postgres", ps.DataSource)
200         if err != nil {
201                 ps.Logger.WithError(err).Error("sql.Open failed")
202                 return
203         }
204         if ps.MaxOpenConns <= 0 {
205                 ps.Logger.Warn("no database connection limit configured -- consider setting PostgreSQL.ConnectionPool>0 in arvados-ws configuration file")
206         }
207         db.SetMaxOpenConns(ps.MaxOpenConns)
208         if err = db.Ping(); err != nil {
209                 ps.Logger.WithError(err).Error("db.Ping failed")
210                 return
211         }
212         ps.db = db
213
214         ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
215         err = ps.pqListener.Listen("logs")
216         if err != nil {
217                 ps.Logger.WithError(err).Error("pq Listen failed")
218                 return
219         }
220         defer ps.pqListener.Close()
221         ps.Logger.Debug("pq Listen setup done")
222
223         close(ready)
224         // Avoid double-close in deferred func
225         ready = nil
226
227         ps.queue = make(chan *event, ps.QueueSize)
228         defer close(ps.queue)
229
230         go func() {
231                 for e := range ps.queue {
232                         // Wait for the "select ... from logs" call to
233                         // finish. This limits max concurrent queries
234                         // to ps.QueueSize. Without this, max
235                         // concurrent queries would be bounded by
236                         // client_count X client_queue_size.
237                         e.Detail()
238
239                         ps.Logger.
240                                 WithField("serial", e.Serial).
241                                 WithField("detail", e.Detail()).
242                                 Debug("event ready")
243                         e.Ready = time.Now()
244                         ps.lastQDelay = e.Ready.Sub(e.Received)
245
246                         ps.mtx.Lock()
247                         for sink := range ps.sinks {
248                                 sink.channel <- e
249                                 ps.eventsOut.Inc()
250                         }
251                         ps.mtx.Unlock()
252                 }
253         }()
254
255         var serial uint64
256
257         go func() {
258                 ticker := time.NewTicker(listenerPingInterval)
259                 defer ticker.Stop()
260                 for {
261                         select {
262                         case <-ctx.Done():
263                                 ps.Logger.Debug("ctx done")
264                                 return
265
266                         case <-ticker.C:
267                                 ps.Logger.Debug("listener ping")
268                                 if testSlowPing {
269                                         time.Sleep(time.Second / 2)
270                                 }
271                                 err := ps.pqListener.Ping()
272                                 if err != nil {
273                                         ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err))
274                                         continue
275                                 }
276                         }
277                 }
278         }()
279
280         for {
281                 select {
282                 case <-ctx.Done():
283                         ps.Logger.Debug("ctx done")
284                         return
285
286                 case pqEvent, ok := <-ps.pqListener.Notify:
287                         if !ok {
288                                 ps.Logger.Error("pqListener Notify chan closed")
289                                 return
290                         }
291                         if pqEvent == nil {
292                                 // pq should call listenerProblem
293                                 // itself in addition to sending us a
294                                 // nil event, so this might be
295                                 // superfluous:
296                                 ps.listenerProblem(-1, errors.New("pqListener Notify chan received nil event"))
297                                 continue
298                         }
299                         if pqEvent.Channel != "logs" {
300                                 ps.Logger.WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
301                                 continue
302                         }
303                         logID, err := strconv.ParseInt(pqEvent.Extra, 10, 64)
304                         if err != nil {
305                                 ps.Logger.WithField("pqEvent", pqEvent).Error("bad notify payload")
306                                 continue
307                         }
308                         serial++
309                         e := &event{
310                                 LogID:    logID,
311                                 Received: time.Now(),
312                                 Serial:   serial,
313                                 db:       ps.db,
314                                 logger:   ps.Logger,
315                         }
316                         ps.Logger.WithField("event", e).Debug("incoming")
317                         ps.eventsIn.Inc()
318                         ps.queue <- e
319                         go e.Detail()
320                 }
321         }
322 }
323
324 // NewSink subscribes to the event source. NewSink returns an
325 // eventSink, whose Channel() method returns a channel: a pointer to
326 // each subsequent event will be sent to that channel.
327 //
328 // The caller must ensure events are received from the sink channel as
329 // quickly as possible because when one sink stops being ready, all
330 // other sinks block.
331 func (ps *pgEventSource) NewSink() eventSink {
332         sink := &pgEventSink{
333                 channel: make(chan *event, 1),
334                 source:  ps,
335         }
336         ps.mtx.Lock()
337         if ps.sinks == nil {
338                 ps.sinks = make(map[*pgEventSink]bool)
339         }
340         ps.sinks[sink] = true
341         ps.mtx.Unlock()
342         return sink
343 }
344
345 func (ps *pgEventSource) DB() *sql.DB {
346         ps.WaitReady()
347         return ps.db
348 }
349
350 func (ps *pgEventSource) DBHealth() error {
351         if ps.db == nil {
352                 return errors.New("database not connected")
353         }
354         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
355         defer cancel()
356         var i int
357         return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
358 }
359
360 func (ps *pgEventSource) DebugStatus() interface{} {
361         ps.mtx.Lock()
362         defer ps.mtx.Unlock()
363         blocked := 0
364         for sink := range ps.sinks {
365                 blocked += len(sink.channel)
366         }
367         return map[string]interface{}{
368                 "Queue":        len(ps.queue),
369                 "QueueLimit":   cap(ps.queue),
370                 "QueueDelay":   stats.Duration(ps.lastQDelay),
371                 "Sinks":        len(ps.sinks),
372                 "SinksBlocked": blocked,
373                 "DBStats":      ps.db.Stats(),
374         }
375 }
376
377 type pgEventSink struct {
378         channel chan *event
379         source  *pgEventSource
380 }
381
382 func (sink *pgEventSink) Channel() <-chan *event {
383         return sink.channel
384 }
385
386 // Stop sending events to the sink's channel.
387 func (sink *pgEventSink) Stop() {
388         go func() {
389                 // Ensure this sink cannot fill up and block the
390                 // server-side queue (which otherwise could in turn
391                 // block our mtx.Lock() here)
392                 for range sink.channel {
393                 }
394         }()
395         sink.source.mtx.Lock()
396         if _, ok := sink.source.sinks[sink]; ok {
397                 delete(sink.source.sinks, sink)
398                 close(sink.channel)
399         }
400         sink.source.mtx.Unlock()
401 }