8460: Ping clients only when read times out and outgoing queue is empty.
[arvados.git] / services / ws / pg.go
1 package main
2
3 import (
4         "database/sql"
5         "log"
6         "strings"
7         "sync"
8         "time"
9
10         "github.com/lib/pq"
11 )
12
13 type pgConfig map[string]string
14
15 func (c pgConfig) ConnectionString() string {
16         s := ""
17         for k, v := range c {
18                 s += k
19                 s += "='"
20                 s += strings.Replace(
21                         strings.Replace(v, `\`, `\\`, -1),
22                         `'`, `\'`, -1)
23                 s += "' "
24         }
25         return s
26 }
27
28 type pgEventSource struct {
29         PgConfig  pgConfig
30         QueueSize int
31
32         pqListener *pq.Listener
33         sinks      map[*pgEventSink]bool
34         setupOnce  sync.Once
35         mtx        sync.Mutex
36 }
37
38 func (ps *pgEventSource) setup() {
39         ps.sinks = make(map[*pgEventSink]bool)
40         go ps.run()
41 }
42
43 func (ps *pgEventSource) run() {
44         db, err := sql.Open("postgres", ps.PgConfig.ConnectionString())
45         if err != nil {
46                 log.Fatal(err)
47         }
48
49         listener := pq.NewListener(ps.PgConfig.ConnectionString(), time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
50                 if err != nil {
51                         // Until we have a mechanism for catching up
52                         // on missed events, we cannot recover from a
53                         // dropped connection without breaking our
54                         // promises to clients.
55                         log.Fatal(err)
56                 }
57         })
58         err = listener.Listen("logs")
59         if err != nil {
60                 log.Fatal(err)
61         }
62         go func() {
63                 for _ = range time.NewTicker(time.Minute).C {
64                         listener.Ping()
65                 }
66         }()
67
68         eventQueue := make(chan *event, ps.QueueSize)
69         go func() {
70                 for e := range eventQueue {
71                         // Wait for the "select ... from logs" call to
72                         // finish. This limits max concurrent queries
73                         // to ps.QueueSize. Without this, max
74                         // concurrent queries would be bounded by
75                         // client_count X client_queue_size.
76                         e.Detail()
77                         debugLogf("%+v", e)
78                         ps.mtx.Lock()
79                         for sink := range ps.sinks {
80                                 sink.channel <- e
81                         }
82                         ps.mtx.Unlock()
83                 }
84         }()
85
86         var serial uint64
87         for pqEvent := range listener.Notify {
88                 if pqEvent.Channel != "logs" {
89                         continue
90                 }
91                 serial++
92                 e := &event{
93                         LogUUID:  pqEvent.Extra,
94                         Received: time.Now(),
95                         Serial:   serial,
96                         db:       db,
97                 }
98                 debugLogf("%+v", e)
99                 eventQueue <- e
100                 go e.Detail()
101         }
102 }
103
104 // NewSink subscribes to the event source. If c is not nil, it will be
105 // used as the event channel. Otherwise, a new channel will be
106 // created. Either way, the sink channel will be returned by the
107 // Channel() method of the returned eventSink. All subsequent events
108 // will be sent to the sink channel. The caller must ensure events are
109 // received from the sink channel as quickly as possible: when one
110 // sink blocks, all other sinks also block.
111 func (ps *pgEventSource) NewSink(c chan *event) eventSink {
112         ps.setupOnce.Do(ps.setup)
113         if c == nil {
114                 c = make(chan *event, 1)
115         }
116         sink := &pgEventSink{
117                 channel: c,
118                 source:  ps,
119         }
120         ps.mtx.Lock()
121         ps.sinks[sink] = true
122         ps.mtx.Unlock()
123         return sink
124 }
125
126 type pgEventSink struct {
127         channel chan *event
128         source  *pgEventSource
129 }
130
131 func (sink *pgEventSink) Channel() <-chan *event {
132         return sink.channel
133 }
134
135 func (sink *pgEventSink) Stop() {
136         go func() {
137                 // Ensure this sink cannot fill up and block the
138                 // server-side queue (which otherwise could in turn
139                 // block our mtx.Lock() here)
140                 for _ = range sink.channel {
141                 }
142         }()
143         sink.source.mtx.Lock()
144         delete(sink.source.sinks, sink)
145         sink.source.mtx.Unlock()
146         close(sink.channel)
147 }