14965: Fix for default command args test
[arvados.git] / services / ws / event_source.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "context"
9         "database/sql"
10         "strconv"
11         "sync"
12         "sync/atomic"
13         "time"
14
15         "git.curoverse.com/arvados.git/sdk/go/stats"
16         "github.com/lib/pq"
17 )
18
19 type pgEventSource struct {
20         DataSource   string
21         MaxOpenConns int
22         QueueSize    int
23
24         db         *sql.DB
25         pqListener *pq.Listener
26         queue      chan *event
27         sinks      map[*pgEventSink]bool
28         mtx        sync.Mutex
29
30         lastQDelay time.Duration
31         eventsIn   uint64
32         eventsOut  uint64
33
34         cancel func()
35
36         setupOnce sync.Once
37         ready     chan bool
38 }
39
40 var _ debugStatuser = (*pgEventSource)(nil)
41
42 func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
43         if et == pq.ListenerEventConnected {
44                 logger(nil).Debug("pgEventSource connected")
45                 return
46         }
47
48         // Until we have a mechanism for catching up on missed events,
49         // we cannot recover from a dropped connection without
50         // breaking our promises to clients.
51         logger(nil).
52                 WithField("eventType", et).
53                 WithError(err).
54                 Error("listener problem")
55         ps.cancel()
56 }
57
58 func (ps *pgEventSource) setup() {
59         ps.ready = make(chan bool)
60 }
61
62 // Close stops listening for new events and disconnects all clients.
63 func (ps *pgEventSource) Close() {
64         ps.WaitReady()
65         ps.cancel()
66 }
67
68 // WaitReady returns when the event listener is connected.
69 func (ps *pgEventSource) WaitReady() {
70         ps.setupOnce.Do(ps.setup)
71         <-ps.ready
72 }
73
74 // Run listens for event notifications on the "logs" channel and sends
75 // them to all subscribers.
76 func (ps *pgEventSource) Run() {
77         logger(nil).Debug("pgEventSource Run starting")
78         defer logger(nil).Debug("pgEventSource Run finished")
79
80         ps.setupOnce.Do(ps.setup)
81         ready := ps.ready
82         defer func() {
83                 if ready != nil {
84                         close(ready)
85                 }
86         }()
87
88         ctx, cancel := context.WithCancel(context.Background())
89         ps.cancel = cancel
90         defer cancel()
91
92         defer func() {
93                 // Disconnect all clients
94                 ps.mtx.Lock()
95                 for sink := range ps.sinks {
96                         close(sink.channel)
97                 }
98                 ps.sinks = nil
99                 ps.mtx.Unlock()
100         }()
101
102         db, err := sql.Open("postgres", ps.DataSource)
103         if err != nil {
104                 logger(nil).WithError(err).Error("sql.Open failed")
105                 return
106         }
107         if ps.MaxOpenConns <= 0 {
108                 logger(nil).Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file")
109         }
110         db.SetMaxOpenConns(ps.MaxOpenConns)
111         if err = db.Ping(); err != nil {
112                 logger(nil).WithError(err).Error("db.Ping failed")
113                 return
114         }
115         ps.db = db
116
117         ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
118         err = ps.pqListener.Listen("logs")
119         if err != nil {
120                 logger(nil).WithError(err).Error("pq Listen failed")
121                 return
122         }
123         defer ps.pqListener.Close()
124         logger(nil).Debug("pq Listen setup done")
125
126         close(ready)
127         // Avoid double-close in deferred func
128         ready = nil
129
130         ps.queue = make(chan *event, ps.QueueSize)
131         defer close(ps.queue)
132
133         go func() {
134                 for e := range ps.queue {
135                         // Wait for the "select ... from logs" call to
136                         // finish. This limits max concurrent queries
137                         // to ps.QueueSize. Without this, max
138                         // concurrent queries would be bounded by
139                         // client_count X client_queue_size.
140                         e.Detail()
141
142                         logger(nil).
143                                 WithField("serial", e.Serial).
144                                 WithField("detail", e.Detail()).
145                                 Debug("event ready")
146                         e.Ready = time.Now()
147                         ps.lastQDelay = e.Ready.Sub(e.Received)
148
149                         ps.mtx.Lock()
150                         atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
151                         for sink := range ps.sinks {
152                                 sink.channel <- e
153                         }
154                         ps.mtx.Unlock()
155                 }
156         }()
157
158         var serial uint64
159         ticker := time.NewTicker(time.Minute)
160         defer ticker.Stop()
161         for {
162                 select {
163                 case <-ctx.Done():
164                         logger(nil).Debug("ctx done")
165                         return
166
167                 case <-ticker.C:
168                         logger(nil).Debug("listener ping")
169                         ps.pqListener.Ping()
170
171                 case pqEvent, ok := <-ps.pqListener.Notify:
172                         if !ok {
173                                 logger(nil).Debug("pqListener Notify chan closed")
174                                 return
175                         }
176                         if pqEvent == nil {
177                                 // pq should call listenerProblem
178                                 // itself in addition to sending us a
179                                 // nil event, so this might be
180                                 // superfluous:
181                                 ps.listenerProblem(-1, nil)
182                                 continue
183                         }
184                         if pqEvent.Channel != "logs" {
185                                 logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
186                                 continue
187                         }
188                         logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
189                         if err != nil {
190                                 logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
191                                 continue
192                         }
193                         serial++
194                         e := &event{
195                                 LogID:    logID,
196                                 Received: time.Now(),
197                                 Serial:   serial,
198                                 db:       ps.db,
199                         }
200                         logger(nil).WithField("event", e).Debug("incoming")
201                         atomic.AddUint64(&ps.eventsIn, 1)
202                         ps.queue <- e
203                         go e.Detail()
204                 }
205         }
206 }
207
208 // NewSink subscribes to the event source. NewSink returns an
209 // eventSink, whose Channel() method returns a channel: a pointer to
210 // each subsequent event will be sent to that channel.
211 //
212 // The caller must ensure events are received from the sink channel as
213 // quickly as possible because when one sink stops being ready, all
214 // other sinks block.
215 func (ps *pgEventSource) NewSink() eventSink {
216         sink := &pgEventSink{
217                 channel: make(chan *event, 1),
218                 source:  ps,
219         }
220         ps.mtx.Lock()
221         if ps.sinks == nil {
222                 ps.sinks = make(map[*pgEventSink]bool)
223         }
224         ps.sinks[sink] = true
225         ps.mtx.Unlock()
226         return sink
227 }
228
229 func (ps *pgEventSource) DB() *sql.DB {
230         ps.WaitReady()
231         return ps.db
232 }
233
234 func (ps *pgEventSource) DBHealth() error {
235         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
236         defer cancel()
237         var i int
238         return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
239 }
240
241 func (ps *pgEventSource) DebugStatus() interface{} {
242         ps.mtx.Lock()
243         defer ps.mtx.Unlock()
244         blocked := 0
245         for sink := range ps.sinks {
246                 blocked += len(sink.channel)
247         }
248         return map[string]interface{}{
249                 "EventsIn":     atomic.LoadUint64(&ps.eventsIn),
250                 "EventsOut":    atomic.LoadUint64(&ps.eventsOut),
251                 "Queue":        len(ps.queue),
252                 "QueueLimit":   cap(ps.queue),
253                 "QueueDelay":   stats.Duration(ps.lastQDelay),
254                 "Sinks":        len(ps.sinks),
255                 "SinksBlocked": blocked,
256                 "DBStats":      ps.db.Stats(),
257         }
258 }
259
260 type pgEventSink struct {
261         channel chan *event
262         source  *pgEventSource
263 }
264
265 func (sink *pgEventSink) Channel() <-chan *event {
266         return sink.channel
267 }
268
269 // Stop sending events to the sink's channel.
270 func (sink *pgEventSink) Stop() {
271         go func() {
272                 // Ensure this sink cannot fill up and block the
273                 // server-side queue (which otherwise could in turn
274                 // block our mtx.Lock() here)
275                 for range sink.channel {
276                 }
277         }()
278         sink.source.mtx.Lock()
279         if _, ok := sink.source.sinks[sink]; ok {
280                 delete(sink.source.sinks, sink)
281                 close(sink.channel)
282         }
283         sink.source.mtx.Unlock()
284 }