16217: Refactor ws to use lib/service.
[arvados.git] / services / ws / event_source.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package ws
6
7 import (
8         "context"
9         "database/sql"
10         "errors"
11         "fmt"
12         "strconv"
13         "sync"
14         "sync/atomic"
15         "time"
16
17         "git.arvados.org/arvados.git/sdk/go/stats"
18         "github.com/lib/pq"
19         "github.com/sirupsen/logrus"
20 )
21
// pgEventSource listens for PostgreSQL notifications on the "logs"
// channel (each payload is a log ID) and fans the resulting events out
// to every subscribed sink.
type pgEventSource struct {
	// DataSource is the PostgreSQL connection string; Run uses it for
	// both the sql.DB pool and the pq.Listener.
	DataSource string
	// MaxOpenConns is passed to (*sql.DB).SetMaxOpenConns; Run logs a
	// warning when it is <= 0 (no connection limit).
	MaxOpenConns int
	// QueueSize is the buffer size of the internal event queue, which
	// also bounds the number of concurrent event-detail queries.
	QueueSize int
	Logger    logrus.FieldLogger

	// db is set by Run once db.Ping succeeds.
	db *sql.DB
	// pqListener is set by Run and closed when Run returns.
	pqListener *pq.Listener
	// queue is created by Run; events wait here until their details
	// are loaded and they can be delivered to sinks.
	queue chan *event
	// sinks is the set of subscribers; guarded by mtx.
	sinks map[*pgEventSink]bool
	mtx   sync.Mutex

	// lastQDelay is the Received->Ready delay of the most recently
	// dispatched event (reported by DebugStatus).
	lastQDelay time.Duration
	// eventsIn and eventsOut are accessed with sync/atomic. eventsOut
	// counts deliveries (one per sink per event).
	eventsIn  uint64
	eventsOut uint64

	// cancel cancels Run's context. It is set by Run and called by
	// Close and listenerProblem.
	cancel func()

	// ready is allocated once (via setupOnce) and closed by Run when
	// the listener is set up or when Run returns; WaitReady blocks on
	// it.
	setupOnce sync.Once
	ready     chan bool
}

// Compile-time check that *pgEventSource implements debugStatuser.
var _ debugStatuser = (*pgEventSource)(nil)
45
46 func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
47         if et == pq.ListenerEventConnected {
48                 ps.Logger.Debug("pgEventSource connected")
49                 return
50         }
51
52         // Until we have a mechanism for catching up on missed events,
53         // we cannot recover from a dropped connection without
54         // breaking our promises to clients.
55         ps.Logger.
56                 WithField("eventType", et).
57                 WithError(err).
58                 Error("listener problem")
59         ps.cancel()
60 }
61
62 func (ps *pgEventSource) setup() {
63         ps.ready = make(chan bool)
64 }
65
66 // Close stops listening for new events and disconnects all clients.
67 func (ps *pgEventSource) Close() {
68         ps.WaitReady()
69         ps.cancel()
70 }
71
72 // WaitReady returns when the event listener is connected.
73 func (ps *pgEventSource) WaitReady() {
74         ps.setupOnce.Do(ps.setup)
75         <-ps.ready
76 }
77
78 // Run listens for event notifications on the "logs" channel and sends
79 // them to all subscribers.
80 func (ps *pgEventSource) Run() {
81         ps.Logger.Debug("pgEventSource Run starting")
82         defer ps.Logger.Debug("pgEventSource Run finished")
83
84         ps.setupOnce.Do(ps.setup)
85         ready := ps.ready
86         defer func() {
87                 if ready != nil {
88                         close(ready)
89                 }
90         }()
91
92         ctx, cancel := context.WithCancel(context.Background())
93         ps.cancel = cancel
94         defer cancel()
95
96         defer func() {
97                 // Disconnect all clients
98                 ps.mtx.Lock()
99                 for sink := range ps.sinks {
100                         close(sink.channel)
101                 }
102                 ps.sinks = nil
103                 ps.mtx.Unlock()
104         }()
105
106         db, err := sql.Open("postgres", ps.DataSource)
107         if err != nil {
108                 ps.Logger.WithError(err).Error("sql.Open failed")
109                 return
110         }
111         if ps.MaxOpenConns <= 0 {
112                 ps.Logger.Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file")
113         }
114         db.SetMaxOpenConns(ps.MaxOpenConns)
115         if err = db.Ping(); err != nil {
116                 ps.Logger.WithError(err).Error("db.Ping failed")
117                 return
118         }
119         ps.db = db
120
121         ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
122         err = ps.pqListener.Listen("logs")
123         if err != nil {
124                 ps.Logger.WithError(err).Error("pq Listen failed")
125                 return
126         }
127         defer ps.pqListener.Close()
128         ps.Logger.Debug("pq Listen setup done")
129
130         close(ready)
131         // Avoid double-close in deferred func
132         ready = nil
133
134         ps.queue = make(chan *event, ps.QueueSize)
135         defer close(ps.queue)
136
137         go func() {
138                 for e := range ps.queue {
139                         // Wait for the "select ... from logs" call to
140                         // finish. This limits max concurrent queries
141                         // to ps.QueueSize. Without this, max
142                         // concurrent queries would be bounded by
143                         // client_count X client_queue_size.
144                         e.Detail()
145
146                         ps.Logger.
147                                 WithField("serial", e.Serial).
148                                 WithField("detail", e.Detail()).
149                                 Debug("event ready")
150                         e.Ready = time.Now()
151                         ps.lastQDelay = e.Ready.Sub(e.Received)
152
153                         ps.mtx.Lock()
154                         atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
155                         for sink := range ps.sinks {
156                                 sink.channel <- e
157                         }
158                         ps.mtx.Unlock()
159                 }
160         }()
161
162         var serial uint64
163         ticker := time.NewTicker(time.Minute)
164         defer ticker.Stop()
165         for {
166                 select {
167                 case <-ctx.Done():
168                         ps.Logger.Debug("ctx done")
169                         return
170
171                 case <-ticker.C:
172                         ps.Logger.Debug("listener ping")
173                         err := ps.pqListener.Ping()
174                         if err != nil {
175                                 ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err))
176                                 continue
177                         }
178
179                 case pqEvent, ok := <-ps.pqListener.Notify:
180                         if !ok {
181                                 ps.Logger.Error("pqListener Notify chan closed")
182                                 return
183                         }
184                         if pqEvent == nil {
185                                 // pq should call listenerProblem
186                                 // itself in addition to sending us a
187                                 // nil event, so this might be
188                                 // superfluous:
189                                 ps.listenerProblem(-1, errors.New("pqListener Notify chan received nil event"))
190                                 continue
191                         }
192                         if pqEvent.Channel != "logs" {
193                                 ps.Logger.WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
194                                 continue
195                         }
196                         logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
197                         if err != nil {
198                                 ps.Logger.WithField("pqEvent", pqEvent).Error("bad notify payload")
199                                 continue
200                         }
201                         serial++
202                         e := &event{
203                                 LogID:    logID,
204                                 Received: time.Now(),
205                                 Serial:   serial,
206                                 db:       ps.db,
207                                 logger:   ps.Logger,
208                         }
209                         ps.Logger.WithField("event", e).Debug("incoming")
210                         atomic.AddUint64(&ps.eventsIn, 1)
211                         ps.queue <- e
212                         go e.Detail()
213                 }
214         }
215 }
216
217 // NewSink subscribes to the event source. NewSink returns an
218 // eventSink, whose Channel() method returns a channel: a pointer to
219 // each subsequent event will be sent to that channel.
220 //
221 // The caller must ensure events are received from the sink channel as
222 // quickly as possible because when one sink stops being ready, all
223 // other sinks block.
224 func (ps *pgEventSource) NewSink() eventSink {
225         sink := &pgEventSink{
226                 channel: make(chan *event, 1),
227                 source:  ps,
228         }
229         ps.mtx.Lock()
230         if ps.sinks == nil {
231                 ps.sinks = make(map[*pgEventSink]bool)
232         }
233         ps.sinks[sink] = true
234         ps.mtx.Unlock()
235         return sink
236 }
237
238 func (ps *pgEventSource) DB() *sql.DB {
239         ps.WaitReady()
240         return ps.db
241 }
242
243 func (ps *pgEventSource) DBHealth() error {
244         if ps.db == nil {
245                 return errors.New("database not connected")
246         }
247         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
248         defer cancel()
249         var i int
250         return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
251 }
252
253 func (ps *pgEventSource) DebugStatus() interface{} {
254         ps.mtx.Lock()
255         defer ps.mtx.Unlock()
256         blocked := 0
257         for sink := range ps.sinks {
258                 blocked += len(sink.channel)
259         }
260         return map[string]interface{}{
261                 "EventsIn":     atomic.LoadUint64(&ps.eventsIn),
262                 "EventsOut":    atomic.LoadUint64(&ps.eventsOut),
263                 "Queue":        len(ps.queue),
264                 "QueueLimit":   cap(ps.queue),
265                 "QueueDelay":   stats.Duration(ps.lastQDelay),
266                 "Sinks":        len(ps.sinks),
267                 "SinksBlocked": blocked,
268                 "DBStats":      ps.db.Stats(),
269         }
270 }
271
// pgEventSink is a single subscription to a pgEventSource, created by
// NewSink.
type pgEventSink struct {
	// channel receives a pointer to each event the source dispatches.
	// It is closed by Stop, or when the source's Run returns.
	channel chan *event
	// source is the pgEventSource this sink is registered with.
	source  *pgEventSource
}
276
277 func (sink *pgEventSink) Channel() <-chan *event {
278         return sink.channel
279 }
280
281 // Stop sending events to the sink's channel.
282 func (sink *pgEventSink) Stop() {
283         go func() {
284                 // Ensure this sink cannot fill up and block the
285                 // server-side queue (which otherwise could in turn
286                 // block our mtx.Lock() here)
287                 for range sink.channel {
288                 }
289         }()
290         sink.source.mtx.Lock()
291         if _, ok := sink.source.sinks[sink]; ok {
292                 delete(sink.source.sinks, sink)
293                 close(sink.channel)
294         }
295         sink.source.mtx.Unlock()
296 }