ps.ready = make(chan bool)
}
-// waitReady returns when private fields (cancel, db) are available
-// for tests to use.
-func (ps *pgEventSource) waitReady() {
+// Close stops listening for new events and disconnects all clients.
+func (ps *pgEventSource) Close() {
+ ps.WaitReady()
+ ps.cancel()
+}
+
+// WaitReady returns when the event listener is connected.
+func (ps *pgEventSource) WaitReady() {
ps.setupOnce.Do(ps.setup)
<-ps.ready
}
defer logger(nil).Debug("pgEventSource Run finished")
ps.setupOnce.Do(ps.setup)
+ ready := ps.ready
+ defer func() {
+ if ready != nil {
+ close(ready)
+ }
+ }()
ctx, cancel := context.WithCancel(context.Background())
ps.cancel = cancel
db, err := sql.Open("postgres", ps.DataSource)
if err != nil {
- logger(nil).WithError(err).Fatal("sql.Open failed")
+ logger(nil).WithError(err).Error("sql.Open failed")
return
}
if err = db.Ping(); err != nil {
- logger(nil).WithError(err).Fatal("db.Ping failed")
+ logger(nil).WithError(err).Error("db.Ping failed")
return
}
ps.db = db
ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
err = ps.pqListener.Listen("logs")
if err != nil {
- logger(nil).WithError(err).Fatal("pq Listen failed")
+ logger(nil).WithError(err).Error("pq Listen failed")
+ return
}
defer ps.pqListener.Close()
logger(nil).Debug("pq Listen setup done")
- close(ps.ready)
+ close(ready)
+ // Avoid double-close in deferred func
+ ready = nil
ps.queue = make(chan *event, ps.QueueSize)
defer close(ps.queue)
sinks[i] = pges.NewSink()
}
- pges.waitReady()
+ pges.WaitReady()
defer pges.cancel()
done := make(chan bool, 1)
import (
"flag"
"fmt"
- "net/http"
- "time"
"git.curoverse.com/arvados.git/sdk/go/config"
"git.curoverse.com/arvados.git/sdk/go/ctxlog"
- "github.com/coreos/go-systemd/daemon"
)
var logger = ctxlog.FromContext
}
log.Info("started")
- eventSource := &pgEventSource{
- DataSource: cfg.Postgres.ConnectionString(),
- QueueSize: cfg.ServerEventQueue,
- }
- srv := &http.Server{
- Addr: cfg.Listen,
- ReadTimeout: time.Minute,
- WriteTimeout: time.Minute,
- MaxHeaderBytes: 1 << 20,
- Handler: &router{
- Config: &cfg,
- eventSource: eventSource,
- newPermChecker: func() permChecker { return newPermChecker(cfg.Client) },
- },
- }
-
- go func() {
- eventSource.Run()
- log.Fatal("event source stopped")
- }()
-
- if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
- log.WithError(err).Warn("error notifying init daemon")
- }
-
- log.WithField("Listen", srv.Addr).Info("listening")
- log.Fatal(srv.ListenAndServe())
+ srv := &server{wsConfig: &cfg}
+ log.Fatal(srv.Run())
}
--- /dev/null
+package main
+
+import (
+ "net"
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/coreos/go-systemd/daemon"
+)
+
+type server struct {
+ httpServer *http.Server
+ listener net.Listener
+ wsConfig *wsConfig
+ eventSource *pgEventSource
+ setupOnce sync.Once
+}
+
+func (srv *server) Close() {
+ srv.WaitReady()
+ srv.eventSource.Close()
+ srv.listener.Close()
+}
+
+func (srv *server) WaitReady() {
+ srv.setupOnce.Do(srv.setup)
+ srv.eventSource.WaitReady()
+}
+
+func (srv *server) Run() error {
+ srv.setupOnce.Do(srv.setup)
+ return srv.httpServer.Serve(srv.listener)
+}
+
+func (srv *server) setup() {
+ log := logger(nil)
+
+ ln, err := net.Listen("tcp", srv.wsConfig.Listen)
+ if err != nil {
+ log.WithField("Listen", srv.wsConfig.Listen).Fatal(err)
+ }
+ log.WithField("Listen", ln.Addr().String()).Info("listening")
+
+ srv.listener = ln
+ srv.eventSource = &pgEventSource{
+ DataSource: srv.wsConfig.Postgres.ConnectionString(),
+ QueueSize: srv.wsConfig.ServerEventQueue,
+ }
+ srv.httpServer = &http.Server{
+ Addr: srv.wsConfig.Listen,
+ ReadTimeout: time.Minute,
+ WriteTimeout: time.Minute,
+ MaxHeaderBytes: 1 << 20,
+ Handler: &router{
+ Config: srv.wsConfig,
+ eventSource: srv.eventSource,
+ newPermChecker: func() permChecker { return newPermChecker(srv.wsConfig.Client) },
+ },
+ }
+
+ go func() {
+ srv.eventSource.Run()
+ log.Info("event source stopped")
+ srv.Close()
+ }()
+
+ if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
+ log.WithError(err).Warn("error notifying init daemon")
+ }
+}
--- /dev/null
+package main
+
+import (
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&serverSuite{})
+
+type serverSuite struct {
+}
+
+func testConfig() *wsConfig {
+ cfg := defaultConfig()
+ cfg.Client = *(arvados.NewClientFromEnv())
+ cfg.Postgres = testDBConfig()
+ cfg.Listen = ":"
+ return &cfg
+}
+
+// TestBadDB ensures Run() returns an error (instead of panicking or
+// deadlocking) if it can't connect to the database server at startup.
+func (s *serverSuite) TestBadDB(c *check.C) {
+ cfg := testConfig()
+ cfg.Postgres["password"] = "1234"
+ srv := &server{wsConfig: cfg}
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ err := srv.Run()
+ c.Check(err, check.NotNil)
+ wg.Done()
+ }()
+ wg.Add(1)
+ go func() {
+ srv.WaitReady()
+ wg.Done()
+ }()
+
+ done := make(chan bool)
+ go func() {
+ wg.Wait()
+ close(done)
+ }()
+ select {
+ case <-done:
+ case <-time.After(10 * time.Second):
+ c.Fatal("timeout")
+ }
+}
+
+func newTestServer() *server {
+ srv := &server{wsConfig: testConfig()}
+ go srv.Run()
+ srv.WaitReady()
+ return srv
+}
"encoding/json"
"fmt"
"io"
- "net"
- "net/http"
"net/url"
"os"
"time"
return lg
}
-func (s *v0Suite) testClient() (*testServer, *websocket.Conn, *json.Decoder, *json.Encoder) {
+func (s *v0Suite) testClient() (*server, *websocket.Conn, *json.Decoder, *json.Encoder) {
srv := newTestServer()
- conn, err := websocket.Dial("ws://"+srv.addr+"/websocket?api_token="+s.token, "", "http://"+srv.addr)
+ conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
if err != nil {
panic(err)
}
r := json.NewDecoder(conn)
return srv, conn, r, w
}
-
-type testServer struct {
- http.Server
- addr string
- ln net.Listener
- pges *pgEventSource
-}
-
-func (srv *testServer) Close() {
- srv.ln.Close()
- srv.pges.cancel()
-}
-
-func newTestServer() *testServer {
- ln, err := net.Listen("tcp", ":")
- if err != nil {
- panic(err)
- }
- cfg := defaultConfig()
- cfg.Client = *(arvados.NewClientFromEnv())
- pges := &pgEventSource{
- DataSource: testDBConfig().ConnectionString(),
- QueueSize: 4,
- }
- srv := &testServer{
- Server: http.Server{
- Addr: ":",
- ReadTimeout: 10 * time.Second,
- WriteTimeout: 10 * time.Second,
- Handler: &router{
- Config: &cfg,
- eventSource: pges,
- newPermChecker: func() permChecker { return newPermChecker(cfg.Client) },
- },
- },
- addr: ln.Addr().String(),
- ln: ln,
- pges: pges,
- }
- go pges.Run()
- go srv.Serve(ln)
- pges.waitReady()
- return srv
-}