8784: Fix test for latest firefox.
[arvados.git] / services / ws / event_source.go
1 package main
2
3 import (
4         "context"
5         "database/sql"
6         "strconv"
7         "strings"
8         "sync"
9         "sync/atomic"
10         "time"
11
12         "git.curoverse.com/arvados.git/sdk/go/stats"
13         "github.com/lib/pq"
14 )
15
16 type pgConfig map[string]string
17
18 func (c pgConfig) ConnectionString() string {
19         s := ""
20         for k, v := range c {
21                 s += k
22                 s += "='"
23                 s += strings.Replace(
24                         strings.Replace(v, `\`, `\\`, -1),
25                         `'`, `\'`, -1)
26                 s += "' "
27         }
28         return s
29 }
30
31 type pgEventSource struct {
32         DataSource   string
33         MaxOpenConns int
34         QueueSize    int
35
36         db         *sql.DB
37         pqListener *pq.Listener
38         queue      chan *event
39         sinks      map[*pgEventSink]bool
40         mtx        sync.Mutex
41
42         lastQDelay time.Duration
43         eventsIn   uint64
44         eventsOut  uint64
45
46         cancel func()
47
48         setupOnce sync.Once
49         ready     chan bool
50 }
51
52 var _ debugStatuser = (*pgEventSource)(nil)
53
54 func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
55         if et == pq.ListenerEventConnected {
56                 logger(nil).Debug("pgEventSource connected")
57                 return
58         }
59
60         // Until we have a mechanism for catching up on missed events,
61         // we cannot recover from a dropped connection without
62         // breaking our promises to clients.
63         logger(nil).
64                 WithField("eventType", et).
65                 WithError(err).
66                 Error("listener problem")
67         ps.cancel()
68 }
69
70 func (ps *pgEventSource) setup() {
71         ps.ready = make(chan bool)
72 }
73
74 // Close stops listening for new events and disconnects all clients.
75 func (ps *pgEventSource) Close() {
76         ps.WaitReady()
77         ps.cancel()
78 }
79
80 // WaitReady returns when the event listener is connected.
81 func (ps *pgEventSource) WaitReady() {
82         ps.setupOnce.Do(ps.setup)
83         <-ps.ready
84 }
85
86 // Run listens for event notifications on the "logs" channel and sends
87 // them to all subscribers.
88 func (ps *pgEventSource) Run() {
89         logger(nil).Debug("pgEventSource Run starting")
90         defer logger(nil).Debug("pgEventSource Run finished")
91
92         ps.setupOnce.Do(ps.setup)
93         ready := ps.ready
94         defer func() {
95                 if ready != nil {
96                         close(ready)
97                 }
98         }()
99
100         ctx, cancel := context.WithCancel(context.Background())
101         ps.cancel = cancel
102         defer cancel()
103
104         defer func() {
105                 // Disconnect all clients
106                 ps.mtx.Lock()
107                 for sink := range ps.sinks {
108                         close(sink.channel)
109                 }
110                 ps.sinks = nil
111                 ps.mtx.Unlock()
112         }()
113
114         db, err := sql.Open("postgres", ps.DataSource)
115         if err != nil {
116                 logger(nil).WithError(err).Error("sql.Open failed")
117                 return
118         }
119         if ps.MaxOpenConns <= 0 {
120                 logger(nil).Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file")
121         }
122         db.SetMaxOpenConns(ps.MaxOpenConns)
123         if err = db.Ping(); err != nil {
124                 logger(nil).WithError(err).Error("db.Ping failed")
125                 return
126         }
127         ps.db = db
128
129         ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
130         err = ps.pqListener.Listen("logs")
131         if err != nil {
132                 logger(nil).WithError(err).Error("pq Listen failed")
133                 return
134         }
135         defer ps.pqListener.Close()
136         logger(nil).Debug("pq Listen setup done")
137
138         close(ready)
139         // Avoid double-close in deferred func
140         ready = nil
141
142         ps.queue = make(chan *event, ps.QueueSize)
143         defer close(ps.queue)
144
145         go func() {
146                 for e := range ps.queue {
147                         // Wait for the "select ... from logs" call to
148                         // finish. This limits max concurrent queries
149                         // to ps.QueueSize. Without this, max
150                         // concurrent queries would be bounded by
151                         // client_count X client_queue_size.
152                         e.Detail()
153
154                         logger(nil).
155                                 WithField("serial", e.Serial).
156                                 WithField("detail", e.Detail()).
157                                 Debug("event ready")
158                         e.Ready = time.Now()
159                         ps.lastQDelay = e.Ready.Sub(e.Received)
160
161                         ps.mtx.Lock()
162                         atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
163                         for sink := range ps.sinks {
164                                 sink.channel <- e
165                         }
166                         ps.mtx.Unlock()
167                 }
168         }()
169
170         var serial uint64
171         ticker := time.NewTicker(time.Minute)
172         defer ticker.Stop()
173         for {
174                 select {
175                 case <-ctx.Done():
176                         logger(nil).Debug("ctx done")
177                         return
178
179                 case <-ticker.C:
180                         logger(nil).Debug("listener ping")
181                         ps.pqListener.Ping()
182
183                 case pqEvent, ok := <-ps.pqListener.Notify:
184                         if !ok {
185                                 logger(nil).Debug("pqListener Notify chan closed")
186                                 return
187                         }
188                         if pqEvent == nil {
189                                 // pq should call listenerProblem
190                                 // itself in addition to sending us a
191                                 // nil event, so this might be
192                                 // superfluous:
193                                 ps.listenerProblem(-1, nil)
194                                 continue
195                         }
196                         if pqEvent.Channel != "logs" {
197                                 logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
198                                 continue
199                         }
200                         logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
201                         if err != nil {
202                                 logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
203                                 continue
204                         }
205                         serial++
206                         e := &event{
207                                 LogID:    logID,
208                                 Received: time.Now(),
209                                 Serial:   serial,
210                                 db:       ps.db,
211                         }
212                         logger(nil).WithField("event", e).Debug("incoming")
213                         atomic.AddUint64(&ps.eventsIn, 1)
214                         ps.queue <- e
215                         go e.Detail()
216                 }
217         }
218 }
219
220 // NewSink subscribes to the event source. NewSink returns an
221 // eventSink, whose Channel() method returns a channel: a pointer to
222 // each subsequent event will be sent to that channel.
223 //
224 // The caller must ensure events are received from the sink channel as
225 // quickly as possible because when one sink stops being ready, all
226 // other sinks block.
227 func (ps *pgEventSource) NewSink() eventSink {
228         sink := &pgEventSink{
229                 channel: make(chan *event, 1),
230                 source:  ps,
231         }
232         ps.mtx.Lock()
233         if ps.sinks == nil {
234                 ps.sinks = make(map[*pgEventSink]bool)
235         }
236         ps.sinks[sink] = true
237         ps.mtx.Unlock()
238         return sink
239 }
240
241 func (ps *pgEventSource) DB() *sql.DB {
242         return ps.db
243 }
244
245 func (ps *pgEventSource) DebugStatus() interface{} {
246         ps.mtx.Lock()
247         defer ps.mtx.Unlock()
248         blocked := 0
249         for sink := range ps.sinks {
250                 blocked += len(sink.channel)
251         }
252         return map[string]interface{}{
253                 "EventsIn":     atomic.LoadUint64(&ps.eventsIn),
254                 "EventsOut":    atomic.LoadUint64(&ps.eventsOut),
255                 "Queue":        len(ps.queue),
256                 "QueueLimit":   cap(ps.queue),
257                 "QueueDelay":   stats.Duration(ps.lastQDelay),
258                 "Sinks":        len(ps.sinks),
259                 "SinksBlocked": blocked,
260         }
261 }
262
263 type pgEventSink struct {
264         channel chan *event
265         source  *pgEventSource
266 }
267
268 func (sink *pgEventSink) Channel() <-chan *event {
269         return sink.channel
270 }
271
272 // Stop sending events to the sink's channel.
273 func (sink *pgEventSink) Stop() {
274         go func() {
275                 // Ensure this sink cannot fill up and block the
276                 // server-side queue (which otherwise could in turn
277                 // block our mtx.Lock() here)
278                 for _ = range sink.channel {
279                 }
280         }()
281         sink.source.mtx.Lock()
282         if _, ok := sink.source.sinks[sink]; ok {
283                 delete(sink.source.sinks, sink)
284                 close(sink.channel)
285         }
286         sink.source.mtx.Unlock()
287 }