12 "git.curoverse.com/arvados.git/sdk/go/stats"
// pgConfig holds PostgreSQL connection parameters as key/value pairs
// (e.g. host, dbname, user), rendered by ConnectionString below.
16 type pgConfig map[string]string
// ConnectionString renders the map as a libpq-style connection string.
// NOTE(review): most of the body is elided in this view. The visible
// call escapes backslashes in each value; its trailing comma shows it is
// nested inside another call — presumably a second strings.Replace that
// escapes quotes. Confirm against the full source.
18 func (c pgConfig) ConnectionString() string {
24 strings.Replace(v, `\`, `\\`, -1),
// pgEventSource listens for PostgreSQL NOTIFY events on the "logs"
// channel (see Run) and fans them out to subscriber sinks.
// NOTE(review): several fields are elided in this view; only a fragment
// of the struct is visible.
31 type pgEventSource struct {
// Dedicated LISTEN/NOTIFY connection (created in Run).
37 pqListener *pq.Listener
// Active subscribers. Guarded by a mutex elsewhere in the type (see
// NewSink/Stop/DebugStatus, which lock ps.mtx / source.mtx around it).
39 sinks map[*pgEventSink]bool
// Queue delay of the most recently dispatched event (set in Run,
// reported by DebugStatus).
42 lastQDelay time.Duration
// Compile-time assertion that *pgEventSource implements debugStatuser.
52 var _ debugStatuser = (*pgEventSource)(nil)
// listenerProblem is the pq.Listener event callback. A "connected"
// event is merely logged at debug level; any other event is treated as
// fatal for current subscribers, because a dropped LISTEN connection
// means NOTIFY events were missed and cannot currently be replayed.
// NOTE(review): the error-path statements between the visible lines are
// elided in this view.
54 func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
55 if et == pq.ListenerEventConnected {
56 logger(nil).Debug("pgEventSource connected")
60 // Until we have a mechanism for catching up on missed events,
61 // we cannot recover from a dropped connection without
62 // breaking our promises to clients.
64 WithField("eventType", et).
66 Error("listener problem")
// setup performs one-time initialization (always invoked through
// ps.setupOnce — see Run and WaitReady): it creates the "ready"
// channel that WaitReady blocks on.
70 func (ps *pgEventSource) setup() {
71 ps.ready = make(chan bool)
74 // Close stops listening for new events and disconnects all clients.
// NOTE(review): the body is elided in this view — presumably it cancels
// the context created in Run; confirm against the full source.
75 func (ps *pgEventSource) Close() {
80 // WaitReady returns when the event listener is connected.
81 func (ps *pgEventSource) WaitReady() {
// Ensure setup() has run, so ps.ready exists before we wait on it.
// NOTE(review): the wait itself is elided in this view.
82 ps.setupOnce.Do(ps.setup)
86 // Run listens for event notifications on the "logs" channel and sends
87 // them to all subscribers.
// NOTE(review): substantial spans of this function (error returns,
// goroutine boundaries, select arms, closing braces) are elided in this
// view; the added comments describe only what the visible lines show.
88 func (ps *pgEventSource) Run() {
89 logger(nil).Debug("pgEventSource Run starting")
90 defer logger(nil).Debug("pgEventSource Run finished")
// One-time init (creates ps.ready; see setup()).
92 ps.setupOnce.Do(ps.setup)
// Context used to stop the event loop below; canceled on shutdown.
100 ctx, cancel := context.WithCancel(context.Background())
105 // Disconnect all clients
107 for sink := range ps.sinks {
// Connection pool for the per-event "select ... from logs" queries
// (separate from the dedicated pq listener connection created below).
114 db, err := sql.Open("postgres", ps.DataSource)
116 logger(nil).WithError(err).Error("sql.Open failed")
// An unlimited pool can exhaust server connections; warn the operator.
119 if ps.MaxOpenConns <= 0 {
120 logger(nil).Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file")
122 db.SetMaxOpenConns(ps.MaxOpenConns)
123 if err = db.Ping(); err != nil {
124 logger(nil).WithError(err).Error("db.Ping failed")
// LISTEN connection with reconnect backoff from 1s up to 1m; problems
// are reported through ps.listenerProblem.
129 ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
130 err = ps.pqListener.Listen("logs")
132 logger(nil).WithError(err).Error("pq Listen failed")
135 defer ps.pqListener.Close()
136 logger(nil).Debug("pq Listen setup done")
139 // Avoid double-close in deferred func
// Bounded queue of incoming events; filled by the notify loop below,
// drained by the dispatch loop.
142 ps.queue = make(chan *event, ps.QueueSize)
143 defer close(ps.queue)
// Dispatch loop: waits for each event's DB fetch, then fans the event
// out to every sink.
146 for e := range ps.queue {
147 // Wait for the "select ... from logs" call to
148 // finish. This limits max concurrent queries
149 // to ps.QueueSize. Without this, max
150 // concurrent queries would be bounded by
151 // client_count X client_queue_size.
155 WithField("serial", e.Serial).
156 WithField("detail", e.Detail()).
// Track queueing latency for DebugStatus.
159 ps.lastQDelay = e.Ready.Sub(e.Received)
// Count one delivery per sink, then hand the event to each sink.
162 atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
163 for sink := range ps.sinks {
// Notify loop: a minute ticker triggers periodic listener pings to keep
// the LISTEN connection alive.
171 ticker := time.NewTicker(time.Minute)
176 logger(nil).Debug("ctx done")
180 logger(nil).Debug("listener ping")
183 case pqEvent, ok := <-ps.pqListener.Notify:
185 logger(nil).Debug("pqListener Notify chan closed")
189 // pq should call listenerProblem
190 // itself in addition to sending us a
191 // nil event, so this might be
// Treat a nil notification as a (possibly redundant) listener problem.
193 ps.listenerProblem(-1, nil)
196 if pqEvent.Channel != "logs" {
197 logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
// The NOTIFY payload is expected to be the decimal ID of a logs row.
200 logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
202 logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
208 Received: time.Now(),
212 logger(nil).WithField("event", e).Debug("incoming")
213 atomic.AddUint64(&ps.eventsIn, 1)
220 // NewSink subscribes to the event source. NewSink returns an
221 // eventSink, whose Channel() method returns a channel: a pointer to
222 // each subsequent event will be sent to that channel.
224 // The caller must ensure events are received from the sink channel as
225 // quickly as possible because when one sink stops being ready, all
226 // other sinks block.
227 func (ps *pgEventSource) NewSink() eventSink {
// Each sink gets a 1-element buffered channel — hence the back-pressure
// warning in the doc comment above.
228 sink := &pgEventSink{
229 channel: make(chan *event, 1),
// Lazily create the sink set on first subscription, then register.
// NOTE(review): the surrounding ps.mtx lock/unlock and the nil-map
// check are elided in this view.
234 ps.sinks = make(map[*pgEventSink]bool)
236 ps.sinks[sink] = true
// DB returns the event source's database handle.
241 func (ps *pgEventSource) DB() *sql.DB {
245 func (ps *pgEventSource) DBHealth() error {
246 ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
248 return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
// DebugStatus reports internal counters and queue state for the debug
// status endpoint (see the debugStatuser assertion above).
251 func (ps *pgEventSource) DebugStatus() interface{} {
// NOTE(review): the matching ps.mtx.Lock() is elided in this view.
253 defer ps.mtx.Unlock()
// Sum of events sitting undelivered in sink channels.
255 for sink := range ps.sinks {
256 blocked += len(sink.channel)
258 return map[string]interface{}{
259 "EventsIn": atomic.LoadUint64(&ps.eventsIn),
260 "EventsOut": atomic.LoadUint64(&ps.eventsOut),
261 "Queue": len(ps.queue),
262 "QueueLimit": cap(ps.queue),
263 "QueueDelay": stats.Duration(ps.lastQDelay),
264 "Sinks": len(ps.sinks),
265 "SinksBlocked": blocked,
// pgEventSink is a single subscription to a pgEventSource: events are
// delivered on its channel until Stop is called.
// NOTE(review): the channel field is elided in this view.
269 type pgEventSink struct {
// Back-reference to the source, used by Stop to deregister.
271 source *pgEventSource
// Channel returns the receive-only channel on which this sink's events
// are delivered.
274 func (sink *pgEventSink) Channel() <-chan *event {
278 // Stop sending events to the sink's channel.
279 func (sink *pgEventSink) Stop() {
281 // Ensure this sink cannot fill up and block the
282 // server-side queue (which otherwise could in turn
283 // block our mtx.Lock() here)
284 for _ = range sink.channel {
287 sink.source.mtx.Lock()
288 if _, ok := sink.source.sinks[sink]; ok {
289 delete(sink.source.sinks, sink)
292 sink.source.mtx.Unlock()