Merge branch '8784-dir-listings'
[arvados.git] / services / ws / event_source.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "context"
9         "database/sql"
10         "strconv"
11         "strings"
12         "sync"
13         "sync/atomic"
14         "time"
15
16         "git.curoverse.com/arvados.git/sdk/go/stats"
17         "github.com/lib/pq"
18 )
19
20 type pgConfig map[string]string
21
22 func (c pgConfig) ConnectionString() string {
23         s := ""
24         for k, v := range c {
25                 s += k
26                 s += "='"
27                 s += strings.Replace(
28                         strings.Replace(v, `\`, `\\`, -1),
29                         `'`, `\'`, -1)
30                 s += "' "
31         }
32         return s
33 }
34
35 type pgEventSource struct {
36         DataSource   string
37         MaxOpenConns int
38         QueueSize    int
39
40         db         *sql.DB
41         pqListener *pq.Listener
42         queue      chan *event
43         sinks      map[*pgEventSink]bool
44         mtx        sync.Mutex
45
46         lastQDelay time.Duration
47         eventsIn   uint64
48         eventsOut  uint64
49
50         cancel func()
51
52         setupOnce sync.Once
53         ready     chan bool
54 }
55
56 var _ debugStatuser = (*pgEventSource)(nil)
57
58 func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
59         if et == pq.ListenerEventConnected {
60                 logger(nil).Debug("pgEventSource connected")
61                 return
62         }
63
64         // Until we have a mechanism for catching up on missed events,
65         // we cannot recover from a dropped connection without
66         // breaking our promises to clients.
67         logger(nil).
68                 WithField("eventType", et).
69                 WithError(err).
70                 Error("listener problem")
71         ps.cancel()
72 }
73
74 func (ps *pgEventSource) setup() {
75         ps.ready = make(chan bool)
76 }
77
78 // Close stops listening for new events and disconnects all clients.
79 func (ps *pgEventSource) Close() {
80         ps.WaitReady()
81         ps.cancel()
82 }
83
84 // WaitReady returns when the event listener is connected.
85 func (ps *pgEventSource) WaitReady() {
86         ps.setupOnce.Do(ps.setup)
87         <-ps.ready
88 }
89
90 // Run listens for event notifications on the "logs" channel and sends
91 // them to all subscribers.
92 func (ps *pgEventSource) Run() {
93         logger(nil).Debug("pgEventSource Run starting")
94         defer logger(nil).Debug("pgEventSource Run finished")
95
96         ps.setupOnce.Do(ps.setup)
97         ready := ps.ready
98         defer func() {
99                 if ready != nil {
100                         close(ready)
101                 }
102         }()
103
104         ctx, cancel := context.WithCancel(context.Background())
105         ps.cancel = cancel
106         defer cancel()
107
108         defer func() {
109                 // Disconnect all clients
110                 ps.mtx.Lock()
111                 for sink := range ps.sinks {
112                         close(sink.channel)
113                 }
114                 ps.sinks = nil
115                 ps.mtx.Unlock()
116         }()
117
118         db, err := sql.Open("postgres", ps.DataSource)
119         if err != nil {
120                 logger(nil).WithError(err).Error("sql.Open failed")
121                 return
122         }
123         if ps.MaxOpenConns <= 0 {
124                 logger(nil).Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file")
125         }
126         db.SetMaxOpenConns(ps.MaxOpenConns)
127         if err = db.Ping(); err != nil {
128                 logger(nil).WithError(err).Error("db.Ping failed")
129                 return
130         }
131         ps.db = db
132
133         ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
134         err = ps.pqListener.Listen("logs")
135         if err != nil {
136                 logger(nil).WithError(err).Error("pq Listen failed")
137                 return
138         }
139         defer ps.pqListener.Close()
140         logger(nil).Debug("pq Listen setup done")
141
142         close(ready)
143         // Avoid double-close in deferred func
144         ready = nil
145
146         ps.queue = make(chan *event, ps.QueueSize)
147         defer close(ps.queue)
148
149         go func() {
150                 for e := range ps.queue {
151                         // Wait for the "select ... from logs" call to
152                         // finish. This limits max concurrent queries
153                         // to ps.QueueSize. Without this, max
154                         // concurrent queries would be bounded by
155                         // client_count X client_queue_size.
156                         e.Detail()
157
158                         logger(nil).
159                                 WithField("serial", e.Serial).
160                                 WithField("detail", e.Detail()).
161                                 Debug("event ready")
162                         e.Ready = time.Now()
163                         ps.lastQDelay = e.Ready.Sub(e.Received)
164
165                         ps.mtx.Lock()
166                         atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
167                         for sink := range ps.sinks {
168                                 sink.channel <- e
169                         }
170                         ps.mtx.Unlock()
171                 }
172         }()
173
174         var serial uint64
175         ticker := time.NewTicker(time.Minute)
176         defer ticker.Stop()
177         for {
178                 select {
179                 case <-ctx.Done():
180                         logger(nil).Debug("ctx done")
181                         return
182
183                 case <-ticker.C:
184                         logger(nil).Debug("listener ping")
185                         ps.pqListener.Ping()
186
187                 case pqEvent, ok := <-ps.pqListener.Notify:
188                         if !ok {
189                                 logger(nil).Debug("pqListener Notify chan closed")
190                                 return
191                         }
192                         if pqEvent == nil {
193                                 // pq should call listenerProblem
194                                 // itself in addition to sending us a
195                                 // nil event, so this might be
196                                 // superfluous:
197                                 ps.listenerProblem(-1, nil)
198                                 continue
199                         }
200                         if pqEvent.Channel != "logs" {
201                                 logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
202                                 continue
203                         }
204                         logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
205                         if err != nil {
206                                 logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
207                                 continue
208                         }
209                         serial++
210                         e := &event{
211                                 LogID:    logID,
212                                 Received: time.Now(),
213                                 Serial:   serial,
214                                 db:       ps.db,
215                         }
216                         logger(nil).WithField("event", e).Debug("incoming")
217                         atomic.AddUint64(&ps.eventsIn, 1)
218                         ps.queue <- e
219                         go e.Detail()
220                 }
221         }
222 }
223
224 // NewSink subscribes to the event source. NewSink returns an
225 // eventSink, whose Channel() method returns a channel: a pointer to
226 // each subsequent event will be sent to that channel.
227 //
228 // The caller must ensure events are received from the sink channel as
229 // quickly as possible because when one sink stops being ready, all
230 // other sinks block.
231 func (ps *pgEventSource) NewSink() eventSink {
232         sink := &pgEventSink{
233                 channel: make(chan *event, 1),
234                 source:  ps,
235         }
236         ps.mtx.Lock()
237         if ps.sinks == nil {
238                 ps.sinks = make(map[*pgEventSink]bool)
239         }
240         ps.sinks[sink] = true
241         ps.mtx.Unlock()
242         return sink
243 }
244
245 func (ps *pgEventSource) DB() *sql.DB {
246         return ps.db
247 }
248
249 func (ps *pgEventSource) DBHealth() error {
250         ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
251         var i int
252         return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
253 }
254
255 func (ps *pgEventSource) DebugStatus() interface{} {
256         ps.mtx.Lock()
257         defer ps.mtx.Unlock()
258         blocked := 0
259         for sink := range ps.sinks {
260                 blocked += len(sink.channel)
261         }
262         return map[string]interface{}{
263                 "EventsIn":     atomic.LoadUint64(&ps.eventsIn),
264                 "EventsOut":    atomic.LoadUint64(&ps.eventsOut),
265                 "Queue":        len(ps.queue),
266                 "QueueLimit":   cap(ps.queue),
267                 "QueueDelay":   stats.Duration(ps.lastQDelay),
268                 "Sinks":        len(ps.sinks),
269                 "SinksBlocked": blocked,
270                 "DBStats":      ps.db.Stats(),
271         }
272 }
273
274 type pgEventSink struct {
275         channel chan *event
276         source  *pgEventSource
277 }
278
279 func (sink *pgEventSink) Channel() <-chan *event {
280         return sink.channel
281 }
282
283 // Stop sending events to the sink's channel.
284 func (sink *pgEventSink) Stop() {
285         go func() {
286                 // Ensure this sink cannot fill up and block the
287                 // server-side queue (which otherwise could in turn
288                 // block our mtx.Lock() here)
289                 for _ = range sink.channel {
290                 }
291         }()
292         sink.source.mtx.Lock()
293         if _, ok := sink.source.sinks[sink]; ok {
294                 delete(sink.source.sinks, sink)
295                 close(sink.channel)
296         }
297         sink.source.mtx.Unlock()
298 }