Merge branch '10669-safe-http-cache'
[arvados.git] / services / ws / event_source.go
1 package main
2
3 import (
4         "context"
5         "database/sql"
6         "strconv"
7         "strings"
8         "sync"
9         "sync/atomic"
10         "time"
11
12         "git.curoverse.com/arvados.git/sdk/go/stats"
13         "github.com/lib/pq"
14 )
15
16 type pgConfig map[string]string
17
18 func (c pgConfig) ConnectionString() string {
19         s := ""
20         for k, v := range c {
21                 s += k
22                 s += "='"
23                 s += strings.Replace(
24                         strings.Replace(v, `\`, `\\`, -1),
25                         `'`, `\'`, -1)
26                 s += "' "
27         }
28         return s
29 }
30
31 type pgEventSource struct {
32         DataSource string
33         QueueSize  int
34
35         db         *sql.DB
36         pqListener *pq.Listener
37         queue      chan *event
38         sinks      map[*pgEventSink]bool
39         mtx        sync.Mutex
40
41         lastQDelay time.Duration
42         eventsIn   uint64
43         eventsOut  uint64
44
45         cancel func()
46
47         setupOnce sync.Once
48         ready     chan bool
49 }
50
51 var _ debugStatuser = (*pgEventSource)(nil)
52
53 func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
54         if et == pq.ListenerEventConnected {
55                 logger(nil).Debug("pgEventSource connected")
56                 return
57         }
58
59         // Until we have a mechanism for catching up on missed events,
60         // we cannot recover from a dropped connection without
61         // breaking our promises to clients.
62         logger(nil).
63                 WithField("eventType", et).
64                 WithError(err).
65                 Error("listener problem")
66         ps.cancel()
67 }
68
69 func (ps *pgEventSource) setup() {
70         ps.ready = make(chan bool)
71 }
72
73 // Close stops listening for new events and disconnects all clients.
74 func (ps *pgEventSource) Close() {
75         ps.WaitReady()
76         ps.cancel()
77 }
78
79 // WaitReady returns when the event listener is connected.
80 func (ps *pgEventSource) WaitReady() {
81         ps.setupOnce.Do(ps.setup)
82         <-ps.ready
83 }
84
85 // Run listens for event notifications on the "logs" channel and sends
86 // them to all subscribers.
87 func (ps *pgEventSource) Run() {
88         logger(nil).Debug("pgEventSource Run starting")
89         defer logger(nil).Debug("pgEventSource Run finished")
90
91         ps.setupOnce.Do(ps.setup)
92         ready := ps.ready
93         defer func() {
94                 if ready != nil {
95                         close(ready)
96                 }
97         }()
98
99         ctx, cancel := context.WithCancel(context.Background())
100         ps.cancel = cancel
101         defer cancel()
102
103         defer func() {
104                 // Disconnect all clients
105                 ps.mtx.Lock()
106                 for sink := range ps.sinks {
107                         close(sink.channel)
108                 }
109                 ps.sinks = nil
110                 ps.mtx.Unlock()
111         }()
112
113         db, err := sql.Open("postgres", ps.DataSource)
114         if err != nil {
115                 logger(nil).WithError(err).Error("sql.Open failed")
116                 return
117         }
118         if err = db.Ping(); err != nil {
119                 logger(nil).WithError(err).Error("db.Ping failed")
120                 return
121         }
122         ps.db = db
123
124         ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
125         err = ps.pqListener.Listen("logs")
126         if err != nil {
127                 logger(nil).WithError(err).Error("pq Listen failed")
128                 return
129         }
130         defer ps.pqListener.Close()
131         logger(nil).Debug("pq Listen setup done")
132
133         close(ready)
134         // Avoid double-close in deferred func
135         ready = nil
136
137         ps.queue = make(chan *event, ps.QueueSize)
138         defer close(ps.queue)
139
140         go func() {
141                 for e := range ps.queue {
142                         // Wait for the "select ... from logs" call to
143                         // finish. This limits max concurrent queries
144                         // to ps.QueueSize. Without this, max
145                         // concurrent queries would be bounded by
146                         // client_count X client_queue_size.
147                         e.Detail()
148
149                         logger(nil).
150                                 WithField("serial", e.Serial).
151                                 WithField("detail", e.Detail()).
152                                 Debug("event ready")
153                         e.Ready = time.Now()
154                         ps.lastQDelay = e.Ready.Sub(e.Received)
155
156                         ps.mtx.Lock()
157                         atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
158                         for sink := range ps.sinks {
159                                 sink.channel <- e
160                         }
161                         ps.mtx.Unlock()
162                 }
163         }()
164
165         var serial uint64
166         ticker := time.NewTicker(time.Minute)
167         defer ticker.Stop()
168         for {
169                 select {
170                 case <-ctx.Done():
171                         logger(nil).Debug("ctx done")
172                         return
173
174                 case <-ticker.C:
175                         logger(nil).Debug("listener ping")
176                         ps.pqListener.Ping()
177
178                 case pqEvent, ok := <-ps.pqListener.Notify:
179                         if !ok {
180                                 logger(nil).Debug("pqListener Notify chan closed")
181                                 return
182                         }
183                         if pqEvent == nil {
184                                 // pq should call listenerProblem
185                                 // itself in addition to sending us a
186                                 // nil event, so this might be
187                                 // superfluous:
188                                 ps.listenerProblem(-1, nil)
189                                 continue
190                         }
191                         if pqEvent.Channel != "logs" {
192                                 logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
193                                 continue
194                         }
195                         logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
196                         if err != nil {
197                                 logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
198                                 continue
199                         }
200                         serial++
201                         e := &event{
202                                 LogID:    logID,
203                                 Received: time.Now(),
204                                 Serial:   serial,
205                                 db:       ps.db,
206                         }
207                         logger(nil).WithField("event", e).Debug("incoming")
208                         atomic.AddUint64(&ps.eventsIn, 1)
209                         ps.queue <- e
210                         go e.Detail()
211                 }
212         }
213 }
214
215 // NewSink subscribes to the event source. NewSink returns an
216 // eventSink, whose Channel() method returns a channel: a pointer to
217 // each subsequent event will be sent to that channel.
218 //
219 // The caller must ensure events are received from the sink channel as
220 // quickly as possible because when one sink stops being ready, all
221 // other sinks block.
222 func (ps *pgEventSource) NewSink() eventSink {
223         sink := &pgEventSink{
224                 channel: make(chan *event, 1),
225                 source:  ps,
226         }
227         ps.mtx.Lock()
228         if ps.sinks == nil {
229                 ps.sinks = make(map[*pgEventSink]bool)
230         }
231         ps.sinks[sink] = true
232         ps.mtx.Unlock()
233         return sink
234 }
235
236 func (ps *pgEventSource) DB() *sql.DB {
237         return ps.db
238 }
239
240 func (ps *pgEventSource) DebugStatus() interface{} {
241         ps.mtx.Lock()
242         defer ps.mtx.Unlock()
243         blocked := 0
244         for sink := range ps.sinks {
245                 blocked += len(sink.channel)
246         }
247         return map[string]interface{}{
248                 "EventsIn":     atomic.LoadUint64(&ps.eventsIn),
249                 "EventsOut":    atomic.LoadUint64(&ps.eventsOut),
250                 "Queue":        len(ps.queue),
251                 "QueueLimit":   cap(ps.queue),
252                 "QueueDelay":   stats.Duration(ps.lastQDelay),
253                 "Sinks":        len(ps.sinks),
254                 "SinksBlocked": blocked,
255         }
256 }
257
258 type pgEventSink struct {
259         channel chan *event
260         source  *pgEventSource
261 }
262
263 func (sink *pgEventSink) Channel() <-chan *event {
264         return sink.channel
265 }
266
267 // Stop sending events to the sink's channel.
268 func (sink *pgEventSink) Stop() {
269         go func() {
270                 // Ensure this sink cannot fill up and block the
271                 // server-side queue (which otherwise could in turn
272                 // block our mtx.Lock() here)
273                 for _ = range sink.channel {
274                 }
275         }()
276         sink.source.mtx.Lock()
277         if _, ok := sink.source.sinks[sink]; ok {
278                 delete(sink.source.sinks, sink)
279                 close(sink.channel)
280         }
281         sink.source.mtx.Unlock()
282 }