10764: Test v0 session.
[arvados.git] / services / ws / event_source.go
1 package main
2
3 import (
4         "context"
5         "database/sql"
6         "strconv"
7         "strings"
8         "sync"
9         "sync/atomic"
10         "time"
11
12         "git.curoverse.com/arvados.git/sdk/go/stats"
13         "github.com/lib/pq"
14 )
15
16 type pgConfig map[string]string
17
18 func (c pgConfig) ConnectionString() string {
19         s := ""
20         for k, v := range c {
21                 s += k
22                 s += "='"
23                 s += strings.Replace(
24                         strings.Replace(v, `\`, `\\`, -1),
25                         `'`, `\'`, -1)
26                 s += "' "
27         }
28         return s
29 }
30
31 type pgEventSource struct {
32         DataSource string
33         QueueSize  int
34
35         db         *sql.DB
36         pqListener *pq.Listener
37         queue      chan *event
38         sinks      map[*pgEventSink]bool
39         mtx        sync.Mutex
40
41         lastQDelay time.Duration
42         eventsIn   uint64
43         eventsOut  uint64
44
45         cancel func()
46
47         setupOnce sync.Once
48         ready     chan bool
49 }
50
51 var _ debugStatuser = (*pgEventSource)(nil)
52
53 func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
54         if et == pq.ListenerEventConnected {
55                 logger(nil).Debug("pgEventSource connected")
56                 return
57         }
58
59         // Until we have a mechanism for catching up on missed events,
60         // we cannot recover from a dropped connection without
61         // breaking our promises to clients.
62         logger(nil).
63                 WithField("eventType", et).
64                 WithError(err).
65                 Error("listener problem")
66         ps.cancel()
67 }
68
69 func (ps *pgEventSource) setup() {
70         ps.ready = make(chan bool)
71 }
72
73 // waitReady returns when private fields (cancel, db) are available
74 // for tests to use.
75 func (ps *pgEventSource) waitReady() {
76         ps.setupOnce.Do(ps.setup)
77         <-ps.ready
78 }
79
80 // Run listens for event notifications on the "logs" channel and sends
81 // them to all subscribers.
82 func (ps *pgEventSource) Run() {
83         logger(nil).Debug("pgEventSource Run starting")
84         defer logger(nil).Debug("pgEventSource Run finished")
85
86         ps.setupOnce.Do(ps.setup)
87
88         ctx, cancel := context.WithCancel(context.Background())
89         ps.cancel = cancel
90         defer cancel()
91
92         defer func() {
93                 // Disconnect all clients
94                 ps.mtx.Lock()
95                 for sink := range ps.sinks {
96                         close(sink.channel)
97                 }
98                 ps.sinks = nil
99                 ps.mtx.Unlock()
100         }()
101
102         db, err := sql.Open("postgres", ps.DataSource)
103         if err != nil {
104                 logger(nil).WithError(err).Fatal("sql.Open failed")
105                 return
106         }
107         if err = db.Ping(); err != nil {
108                 logger(nil).WithError(err).Fatal("db.Ping failed")
109                 return
110         }
111         ps.db = db
112
113         ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
114         err = ps.pqListener.Listen("logs")
115         if err != nil {
116                 logger(nil).WithError(err).Fatal("pq Listen failed")
117         }
118         defer ps.pqListener.Close()
119         logger(nil).Debug("pq Listen setup done")
120
121         close(ps.ready)
122
123         ps.queue = make(chan *event, ps.QueueSize)
124         defer close(ps.queue)
125
126         go func() {
127                 for e := range ps.queue {
128                         // Wait for the "select ... from logs" call to
129                         // finish. This limits max concurrent queries
130                         // to ps.QueueSize. Without this, max
131                         // concurrent queries would be bounded by
132                         // client_count X client_queue_size.
133                         e.Detail()
134
135                         logger(nil).
136                                 WithField("serial", e.Serial).
137                                 WithField("detail", e.Detail()).
138                                 Debug("event ready")
139                         e.Ready = time.Now()
140                         ps.lastQDelay = e.Ready.Sub(e.Received)
141
142                         ps.mtx.Lock()
143                         atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
144                         for sink := range ps.sinks {
145                                 sink.channel <- e
146                         }
147                         ps.mtx.Unlock()
148                 }
149         }()
150
151         var serial uint64
152         ticker := time.NewTicker(time.Minute)
153         defer ticker.Stop()
154         for {
155                 select {
156                 case <-ctx.Done():
157                         logger(nil).Debug("ctx done")
158                         return
159
160                 case <-ticker.C:
161                         logger(nil).Debug("listener ping")
162                         ps.pqListener.Ping()
163
164                 case pqEvent, ok := <-ps.pqListener.Notify:
165                         if !ok {
166                                 logger(nil).Debug("pqListener Notify chan closed")
167                                 return
168                         }
169                         if pqEvent == nil {
170                                 // pq should call listenerProblem
171                                 // itself in addition to sending us a
172                                 // nil event, so this might be
173                                 // superfluous:
174                                 ps.listenerProblem(-1, nil)
175                                 continue
176                         }
177                         if pqEvent.Channel != "logs" {
178                                 logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
179                                 continue
180                         }
181                         logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
182                         if err != nil {
183                                 logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
184                                 continue
185                         }
186                         serial++
187                         e := &event{
188                                 LogID:    logID,
189                                 Received: time.Now(),
190                                 Serial:   serial,
191                                 db:       ps.db,
192                         }
193                         logger(nil).WithField("event", e).Debug("incoming")
194                         atomic.AddUint64(&ps.eventsIn, 1)
195                         ps.queue <- e
196                         go e.Detail()
197                 }
198         }
199 }
200
201 // NewSink subscribes to the event source. NewSink returns an
202 // eventSink, whose Channel() method returns a channel: a pointer to
203 // each subsequent event will be sent to that channel.
204 //
205 // The caller must ensure events are received from the sink channel as
206 // quickly as possible because when one sink stops being ready, all
207 // other sinks block.
208 func (ps *pgEventSource) NewSink() eventSink {
209         sink := &pgEventSink{
210                 channel: make(chan *event, 1),
211                 source:  ps,
212         }
213         ps.mtx.Lock()
214         if ps.sinks == nil {
215                 ps.sinks = make(map[*pgEventSink]bool)
216         }
217         ps.sinks[sink] = true
218         ps.mtx.Unlock()
219         return sink
220 }
221
222 func (ps *pgEventSource) DB() *sql.DB {
223         return ps.db
224 }
225
226 func (ps *pgEventSource) DebugStatus() interface{} {
227         ps.mtx.Lock()
228         defer ps.mtx.Unlock()
229         blocked := 0
230         for sink := range ps.sinks {
231                 blocked += len(sink.channel)
232         }
233         return map[string]interface{}{
234                 "EventsIn":     atomic.LoadUint64(&ps.eventsIn),
235                 "EventsOut":    atomic.LoadUint64(&ps.eventsOut),
236                 "Queue":        len(ps.queue),
237                 "QueueLimit":   cap(ps.queue),
238                 "QueueDelay":   stats.Duration(ps.lastQDelay),
239                 "Sinks":        len(ps.sinks),
240                 "SinksBlocked": blocked,
241         }
242 }
243
244 type pgEventSink struct {
245         channel chan *event
246         source  *pgEventSource
247 }
248
249 func (sink *pgEventSink) Channel() <-chan *event {
250         return sink.channel
251 }
252
253 // Stop sending events to the sink's channel.
254 func (sink *pgEventSink) Stop() {
255         go func() {
256                 // Ensure this sink cannot fill up and block the
257                 // server-side queue (which otherwise could in turn
258                 // block our mtx.Lock() here)
259                 for _ = range sink.channel {
260                 }
261         }()
262         sink.source.mtx.Lock()
263         if _, ok := sink.source.sinks[sink]; ok {
264                 delete(sink.source.sinks, sink)
265                 close(sink.channel)
266         }
267         sink.source.mtx.Unlock()
268 }