10813: Merge branch 'master' into 10813-arv-put-six-threads
[arvados.git] / services / ws / event_source.go
1 package main
2
3 import (
4         "database/sql"
5         "strconv"
6         "strings"
7         "sync"
8         "sync/atomic"
9         "time"
10
11         "git.curoverse.com/arvados.git/sdk/go/stats"
12         "github.com/lib/pq"
13 )
14
15 type pgConfig map[string]string
16
17 func (c pgConfig) ConnectionString() string {
18         s := ""
19         for k, v := range c {
20                 s += k
21                 s += "='"
22                 s += strings.Replace(
23                         strings.Replace(v, `\`, `\\`, -1),
24                         `'`, `\'`, -1)
25                 s += "' "
26         }
27         return s
28 }
29
30 type pgEventSource struct {
31         DataSource string
32         QueueSize  int
33
34         db         *sql.DB
35         pqListener *pq.Listener
36         queue      chan *event
37         sinks      map[*pgEventSink]bool
38         setupOnce  sync.Once
39         mtx        sync.Mutex
40         shutdown   chan error
41
42         lastQDelay time.Duration
43         eventsIn   uint64
44         eventsOut  uint64
45 }
46
47 var _ debugStatuser = (*pgEventSource)(nil)
48
49 func (ps *pgEventSource) setup() {
50         ps.shutdown = make(chan error, 1)
51         ps.sinks = make(map[*pgEventSink]bool)
52
53         db, err := sql.Open("postgres", ps.DataSource)
54         if err != nil {
55                 logger(nil).WithError(err).Fatal("sql.Open failed")
56         }
57         if err = db.Ping(); err != nil {
58                 logger(nil).WithError(err).Fatal("db.Ping failed")
59         }
60         ps.db = db
61
62         ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
63                 if err != nil {
64                         // Until we have a mechanism for catching up
65                         // on missed events, we cannot recover from a
66                         // dropped connection without breaking our
67                         // promises to clients.
68                         logger(nil).WithError(err).Error("listener problem")
69                         ps.shutdown <- err
70                 }
71         })
72         err = ps.pqListener.Listen("logs")
73         if err != nil {
74                 logger(nil).WithError(err).Fatal("pq Listen failed")
75         }
76         logger(nil).Debug("pgEventSource listening")
77
78         go ps.run()
79 }
80
81 func (ps *pgEventSource) run() {
82         ps.queue = make(chan *event, ps.QueueSize)
83
84         go func() {
85                 for e := range ps.queue {
86                         // Wait for the "select ... from logs" call to
87                         // finish. This limits max concurrent queries
88                         // to ps.QueueSize. Without this, max
89                         // concurrent queries would be bounded by
90                         // client_count X client_queue_size.
91                         e.Detail()
92
93                         logger(nil).
94                                 WithField("serial", e.Serial).
95                                 WithField("detail", e.Detail()).
96                                 Debug("event ready")
97                         e.Ready = time.Now()
98                         ps.lastQDelay = e.Ready.Sub(e.Received)
99
100                         ps.mtx.Lock()
101                         atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
102                         for sink := range ps.sinks {
103                                 sink.channel <- e
104                         }
105                         ps.mtx.Unlock()
106                 }
107         }()
108
109         var serial uint64
110         ticker := time.NewTicker(time.Minute)
111         defer ticker.Stop()
112         for {
113                 select {
114                 case err, ok := <-ps.shutdown:
115                         if ok {
116                                 logger(nil).WithError(err).Info("shutdown")
117                         }
118                         close(ps.queue)
119                         return
120
121                 case <-ticker.C:
122                         logger(nil).Debug("listener ping")
123                         ps.pqListener.Ping()
124
125                 case pqEvent, ok := <-ps.pqListener.Notify:
126                         if !ok {
127                                 close(ps.queue)
128                                 return
129                         }
130                         if pqEvent.Channel != "logs" {
131                                 continue
132                         }
133                         logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
134                         if err != nil {
135                                 logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
136                                 continue
137                         }
138                         serial++
139                         e := &event{
140                                 LogID:    logID,
141                                 Received: time.Now(),
142                                 Serial:   serial,
143                                 db:       ps.db,
144                         }
145                         logger(nil).WithField("event", e).Debug("incoming")
146                         atomic.AddUint64(&ps.eventsIn, 1)
147                         ps.queue <- e
148                         go e.Detail()
149                 }
150         }
151 }
152
153 // NewSink subscribes to the event source. NewSink returns an
154 // eventSink, whose Channel() method returns a channel: a pointer to
155 // each subsequent event will be sent to that channel.
156 //
157 // The caller must ensure events are received from the sink channel as
158 // quickly as possible because when one sink stops being ready, all
159 // other sinks block.
160 func (ps *pgEventSource) NewSink() eventSink {
161         ps.setupOnce.Do(ps.setup)
162         sink := &pgEventSink{
163                 channel: make(chan *event, 1),
164                 source:  ps,
165         }
166         ps.mtx.Lock()
167         ps.sinks[sink] = true
168         ps.mtx.Unlock()
169         return sink
170 }
171
172 func (ps *pgEventSource) DB() *sql.DB {
173         ps.setupOnce.Do(ps.setup)
174         return ps.db
175 }
176
177 func (ps *pgEventSource) DebugStatus() interface{} {
178         ps.mtx.Lock()
179         defer ps.mtx.Unlock()
180         blocked := 0
181         for sink := range ps.sinks {
182                 blocked += len(sink.channel)
183         }
184         return map[string]interface{}{
185                 "EventsIn":     atomic.LoadUint64(&ps.eventsIn),
186                 "EventsOut":    atomic.LoadUint64(&ps.eventsOut),
187                 "Queue":        len(ps.queue),
188                 "QueueLimit":   cap(ps.queue),
189                 "QueueDelay":   stats.Duration(ps.lastQDelay),
190                 "Sinks":        len(ps.sinks),
191                 "SinksBlocked": blocked,
192         }
193 }
194
195 type pgEventSink struct {
196         channel chan *event
197         source  *pgEventSource
198 }
199
200 func (sink *pgEventSink) Channel() <-chan *event {
201         return sink.channel
202 }
203
204 func (sink *pgEventSink) Stop() {
205         go func() {
206                 // Ensure this sink cannot fill up and block the
207                 // server-side queue (which otherwise could in turn
208                 // block our mtx.Lock() here)
209                 for _ = range sink.channel {
210                 }
211         }()
212         sink.source.mtx.Lock()
213         delete(sink.source.sinks, sink)
214         sink.source.mtx.Unlock()
215         close(sink.channel)
216 }