services/ws/event_source.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package main

import (
        "context"
        "database/sql"
        "errors"
        "fmt"
        "strconv"
        "sync"
        "sync/atomic"
        "time"

        "git.arvados.org/arvados.git/sdk/go/stats"
        "github.com/lib/pq"
)

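// pgEventSource listens for PostgreSQL NOTIFY events on the "logs"
// channel and fans each one out to all subscribed sinks.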
type pgEventSource struct {
        DataSource   string
        MaxOpenConns int
        QueueSize    int

        db         *sql.DB
        pqListener *pq.Listener
        queue      chan *event
        sinks      map[*pgEventSink]bool
        mtx        sync.Mutex

        lastQDelay time.Duration
        eventsIn   uint64
        eventsOut  uint64

        cancel func()

        setupOnce sync.Once
        ready     chan bool
}

var _ debugStatuser = (*pgEventSource)(nil)

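// listenerProblem is the pq.Listener event callback. Only the initial
// connection event is benign; anything else (including a reconnect,
// which implies events were missed) aborts Run via ps.cancel.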
func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
        if et == pq.ListenerEventConnected {
                logger(nil).Debug("pgEventSource connected")
                return
        }

        // Until we have a mechanism for catching up on missed events,
        // we cannot recover from a dropped connection without
        // breaking our promises to clients.
        logger(nil).
                WithField("eventType", et).
                WithError(err).
                Error("listener problem")
        ps.cancel()
}

func (ps *pgEventSource) setup() {
        ps.ready = make(chan bool)
}

// Close stops listening for new events and disconnects all clients.
func (ps *pgEventSource) Close() {
        ps.WaitReady()
        ps.cancel()
}

// WaitReady returns when the event listener is connected.
func (ps *pgEventSource) WaitReady() {
        ps.setupOnce.Do(ps.setup)
        <-ps.ready
}
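
// Usage sketch (hypothetical, not part of this file): Run is typically
// started in its own goroutine, and callers block on WaitReady before
// relying on the listener. Here dsn is an assumed connection string
// and the queue size is arbitrary:
//
//    ps := &pgEventSource{DataSource: dsn, QueueSize: 64}
//    go ps.Run()
//    ps.WaitReady() // returns once pq Listen setup is done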

// Run listens for event notifications on the "logs" channel and sends
// them to all subscribers.
func (ps *pgEventSource) Run() {
        logger(nil).Debug("pgEventSource Run starting")
        defer logger(nil).Debug("pgEventSource Run finished")

        ps.setupOnce.Do(ps.setup)
        ready := ps.ready
        defer func() {
                if ready != nil {
                        close(ready)
                }
        }()

        ctx, cancel := context.WithCancel(context.Background())
        ps.cancel = cancel
        defer cancel()

        defer func() {
                // Disconnect all clients
                ps.mtx.Lock()
                for sink := range ps.sinks {
                        close(sink.channel)
                }
                ps.sinks = nil
                ps.mtx.Unlock()
        }()

        db, err := sql.Open("postgres", ps.DataSource)
        if err != nil {
                logger(nil).WithError(err).Error("sql.Open failed")
                return
        }
        if ps.MaxOpenConns <= 0 {
                logger(nil).Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file")
        }
        db.SetMaxOpenConns(ps.MaxOpenConns)
        if err = db.Ping(); err != nil {
                logger(nil).WithError(err).Error("db.Ping failed")
                return
        }
        ps.db = db

        ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
        err = ps.pqListener.Listen("logs")
        if err != nil {
                logger(nil).WithError(err).Error("pq Listen failed")
                return
        }
        defer ps.pqListener.Close()
        logger(nil).Debug("pq Listen setup done")

        close(ready)
        // Avoid double-close in deferred func
        ready = nil

        ps.queue = make(chan *event, ps.QueueSize)
        defer close(ps.queue)

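        // Dispatch goroutine: deliver each queued event, in order, to
        // every connected sink.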
        go func() {
                for e := range ps.queue {
                        // Wait for the "select ... from logs" call to
                        // finish. This limits max concurrent queries
                        // to ps.QueueSize. Without this, max
                        // concurrent queries would be bounded by
                        // client_count X client_queue_size.
                        e.Detail()

                        logger(nil).
                                WithField("serial", e.Serial).
                                WithField("detail", e.Detail()).
                                Debug("event ready")
                        e.Ready = time.Now()
                        ps.lastQDelay = e.Ready.Sub(e.Received)

                        ps.mtx.Lock()
                        atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
                        for sink := range ps.sinks {
                                sink.channel <- e
                        }
                        ps.mtx.Unlock()
                }
        }()

        var serial uint64
        ticker := time.NewTicker(time.Minute)
        defer ticker.Stop()
        for {
                select {
                case <-ctx.Done():
                        logger(nil).Debug("ctx done")
                        return

                case <-ticker.C:
                        logger(nil).Debug("listener ping")
                        err := ps.pqListener.Ping()
                        if err != nil {
                                ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err))
                                continue
                        }

                case pqEvent, ok := <-ps.pqListener.Notify:
                        if !ok {
                                logger(nil).Error("pqListener Notify chan closed")
                                return
                        }
                        if pqEvent == nil {
                                // pq should call listenerProblem
                                // itself in addition to sending us a
                                // nil event, so this might be
                                // superfluous:
                                ps.listenerProblem(-1, errors.New("pqListener Notify chan received nil event"))
                                continue
                        }
                        if pqEvent.Channel != "logs" {
                                logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
                                continue
                        }
                        logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
                        if err != nil {
                                logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
                                continue
                        }
                        serial++
                        e := &event{
                                LogID:    logID,
                                Received: time.Now(),
                                Serial:   serial,
                                db:       ps.db,
                        }
                        logger(nil).WithField("event", e).Debug("incoming")
                        atomic.AddUint64(&ps.eventsIn, 1)
                        ps.queue <- e
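                        // Fetch the event detail in the background so
                        // it is likely to be ready when the dispatch
                        // goroutine calls e.Detail() above.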
                        go e.Detail()
                }
        }
}

// NewSink subscribes to the event source: a pointer to each subsequent
// event is sent to the channel returned by the sink's Channel()
// method.
//
// The caller must receive events from the sink channel as quickly as
// possible: when one sink stops accepting events, all other sinks
// block.
func (ps *pgEventSource) NewSink() eventSink {
        sink := &pgEventSink{
                channel: make(chan *event, 1),
                source:  ps,
        }
        ps.mtx.Lock()
        if ps.sinks == nil {
                ps.sinks = make(map[*pgEventSink]bool)
        }
        ps.sinks[sink] = true
        ps.mtx.Unlock()
        return sink
}
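
// Consumption sketch (hypothetical, not part of this file), assuming
// the eventSink interface exposes Channel and Stop the way *pgEventSink
// does; forward is an assumed consumer that must keep up to avoid
// blocking other sinks:
//
//    sink := ps.NewSink()
//    defer sink.Stop()
//    for e := range sink.Channel() {
//        forward(e)
//    }
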
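// DB returns the database handle used for event detail lookups,
// blocking until Run has finished connecting.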
func (ps *pgEventSource) DB() *sql.DB {
        ps.WaitReady()
        return ps.db
}

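// DBHealth checks that the database can answer a trivial query within
// one second.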
func (ps *pgEventSource) DBHealth() error {
        ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
        defer cancel()
        var i int
        return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
}

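// DebugStatus implements debugStatuser: it reports event counters,
// queue depth, and how many undelivered events are sitting in sink
// channels.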
func (ps *pgEventSource) DebugStatus() interface{} {
        ps.mtx.Lock()
        defer ps.mtx.Unlock()
        blocked := 0
        for sink := range ps.sinks {
                blocked += len(sink.channel)
        }
        return map[string]interface{}{
                "EventsIn":     atomic.LoadUint64(&ps.eventsIn),
                "EventsOut":    atomic.LoadUint64(&ps.eventsOut),
                "Queue":        len(ps.queue),
                "QueueLimit":   cap(ps.queue),
                "QueueDelay":   stats.Duration(ps.lastQDelay),
                "Sinks":        len(ps.sinks),
                "SinksBlocked": blocked,
                "DBStats":      ps.db.Stats(),
        }
}

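// pgEventSink is the eventSink implementation returned by
// (*pgEventSource).NewSink.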
type pgEventSink struct {
        channel chan *event
        source  *pgEventSource
}

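// Channel returns the channel on which the sink receives a pointer to
// each event.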
func (sink *pgEventSink) Channel() <-chan *event {
        return sink.channel
}

// Stop stops sending events to the sink's channel and removes the sink
// from the event source.
func (sink *pgEventSink) Stop() {
        go func() {
                // Ensure this sink cannot fill up and block the
                // server-side queue (which otherwise could in turn
                // block our mtx.Lock() here)
                for range sink.channel {
                }
        }()
        sink.source.mtx.Lock()
        if _, ok := sink.source.sinks[sink]; ok {
                delete(sink.source.sinks, sink)
                close(sink.channel)
        }
        sink.source.mtx.Unlock()
}