10764: De-duplicate real/test server startup. Add test for broken config.
authorTom Clegg <tom@curoverse.com>
Wed, 1 Mar 2017 15:58:16 +0000 (10:58 -0500)
committerTom Clegg <tom@curoverse.com>
Wed, 1 Mar 2017 15:58:16 +0000 (10:58 -0500)
services/ws/event_source.go
services/ws/event_source_test.go
services/ws/main.go
services/ws/server.go [new file with mode: 0644]
services/ws/server_test.go [new file with mode: 0644]
services/ws/session_v0_test.go

index ed1ac0db97a29f8bd3c9e53791e00e2eda6565dd..fe1876cc2788e9ccaaaff2f1f21d3116b3974b38 100644 (file)
@@ -70,9 +70,14 @@ func (ps *pgEventSource) setup() {
        ps.ready = make(chan bool)
 }
 
-// waitReady returns when private fields (cancel, db) are available
-// for tests to use.
-func (ps *pgEventSource) waitReady() {
+// Close stops listening for new events and disconnects all clients.
+func (ps *pgEventSource) Close() {
+       ps.WaitReady()
+       ps.cancel()
+}
+
+// WaitReady returns when the event listener is connected.
+func (ps *pgEventSource) WaitReady() {
        ps.setupOnce.Do(ps.setup)
        <-ps.ready
 }
@@ -84,6 +89,12 @@ func (ps *pgEventSource) Run() {
        defer logger(nil).Debug("pgEventSource Run finished")
 
        ps.setupOnce.Do(ps.setup)
+       ready := ps.ready
+       defer func() {
+               if ready != nil {
+                       close(ready)
+               }
+       }()
 
        ctx, cancel := context.WithCancel(context.Background())
        ps.cancel = cancel
@@ -101,11 +112,11 @@ func (ps *pgEventSource) Run() {
 
        db, err := sql.Open("postgres", ps.DataSource)
        if err != nil {
-               logger(nil).WithError(err).Fatal("sql.Open failed")
+               logger(nil).WithError(err).Error("sql.Open failed")
                return
        }
        if err = db.Ping(); err != nil {
-               logger(nil).WithError(err).Fatal("db.Ping failed")
+               logger(nil).WithError(err).Error("db.Ping failed")
                return
        }
        ps.db = db
@@ -113,12 +124,15 @@ func (ps *pgEventSource) Run() {
        ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
        err = ps.pqListener.Listen("logs")
        if err != nil {
-               logger(nil).WithError(err).Fatal("pq Listen failed")
+               logger(nil).WithError(err).Error("pq Listen failed")
+               return
        }
        defer ps.pqListener.Close()
        logger(nil).Debug("pq Listen setup done")
 
-       close(ps.ready)
+       close(ready)
+       // Avoid double-close in deferred func
+       ready = nil
 
        ps.queue = make(chan *event, ps.QueueSize)
        defer close(ps.queue)
index 675ac900858aaf5ab50f90683ae32cc40361f8d3..b157cfa0eb9cf64cb1d7fc5566ea5649246e8a29 100644 (file)
@@ -57,7 +57,7 @@ func (*eventSourceSuite) TestEventSource(c *check.C) {
                sinks[i] = pges.NewSink()
        }
 
-       pges.waitReady()
+       pges.WaitReady()
        defer pges.cancel()
 
        done := make(chan bool, 1)
index 9eee8130af9d9b4b8fe831aa48865f5abd64808f..b2b86705cc1b40cef16edd0060fc13977e41e17d 100644 (file)
@@ -3,12 +3,9 @@ package main
 import (
        "flag"
        "fmt"
-       "net/http"
-       "time"
 
        "git.curoverse.com/arvados.git/sdk/go/config"
        "git.curoverse.com/arvados.git/sdk/go/ctxlog"
-       "github.com/coreos/go-systemd/daemon"
 )
 
 var logger = ctxlog.FromContext
@@ -39,31 +36,6 @@ func main() {
        }
 
        log.Info("started")
-       eventSource := &pgEventSource{
-               DataSource: cfg.Postgres.ConnectionString(),
-               QueueSize:  cfg.ServerEventQueue,
-       }
-       srv := &http.Server{
-               Addr:           cfg.Listen,
-               ReadTimeout:    time.Minute,
-               WriteTimeout:   time.Minute,
-               MaxHeaderBytes: 1 << 20,
-               Handler: &router{
-                       Config:         &cfg,
-                       eventSource:    eventSource,
-                       newPermChecker: func() permChecker { return newPermChecker(cfg.Client) },
-               },
-       }
-
-       go func() {
-               eventSource.Run()
-               log.Fatal("event source stopped")
-       }()
-
-       if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
-               log.WithError(err).Warn("error notifying init daemon")
-       }
-
-       log.WithField("Listen", srv.Addr).Info("listening")
-       log.Fatal(srv.ListenAndServe())
+       srv := &server{wsConfig: &cfg}
+       log.Fatal(srv.Run())
 }
diff --git a/services/ws/server.go b/services/ws/server.go
new file mode 100644 (file)
index 0000000..8870ca1
--- /dev/null
@@ -0,0 +1,71 @@
+package main
+
+import (
+       "net"
+       "net/http"
+       "sync"
+       "time"
+
+       "github.com/coreos/go-systemd/daemon"
+)
+
+type server struct {
+       httpServer  *http.Server
+       listener    net.Listener
+       wsConfig    *wsConfig
+       eventSource *pgEventSource
+       setupOnce   sync.Once
+}
+
+func (srv *server) Close() {
+       srv.WaitReady()
+       srv.eventSource.Close()
+       srv.listener.Close()
+}
+
+func (srv *server) WaitReady() {
+       srv.setupOnce.Do(srv.setup)
+       srv.eventSource.WaitReady()
+}
+
+func (srv *server) Run() error {
+       srv.setupOnce.Do(srv.setup)
+       return srv.httpServer.Serve(srv.listener)
+}
+
+func (srv *server) setup() {
+       log := logger(nil)
+
+       ln, err := net.Listen("tcp", srv.wsConfig.Listen)
+       if err != nil {
+               log.WithField("Listen", srv.wsConfig.Listen).Fatal(err)
+       }
+       log.WithField("Listen", ln.Addr().String()).Info("listening")
+
+       srv.listener = ln
+       srv.eventSource = &pgEventSource{
+               DataSource: srv.wsConfig.Postgres.ConnectionString(),
+               QueueSize:  srv.wsConfig.ServerEventQueue,
+       }
+       srv.httpServer = &http.Server{
+               Addr:           srv.wsConfig.Listen,
+               ReadTimeout:    time.Minute,
+               WriteTimeout:   time.Minute,
+               MaxHeaderBytes: 1 << 20,
+               Handler: &router{
+                       Config:         srv.wsConfig,
+                       eventSource:    srv.eventSource,
+                       newPermChecker: func() permChecker { return newPermChecker(srv.wsConfig.Client) },
+               },
+       }
+
+       go func() {
+               srv.eventSource.Run()
+               log.Info("event source stopped")
+               srv.Close()
+       }()
+
+       if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
+               log.WithError(err).Warn("error notifying init daemon")
+       }
+}
diff --git a/services/ws/server_test.go b/services/ws/server_test.go
new file mode 100644 (file)
index 0000000..d74f7df
--- /dev/null
@@ -0,0 +1,61 @@
+package main
+
+import (
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&serverSuite{})
+
+type serverSuite struct {
+}
+
+func testConfig() *wsConfig {
+       cfg := defaultConfig()
+       cfg.Client = *(arvados.NewClientFromEnv())
+       cfg.Postgres = testDBConfig()
+       cfg.Listen = ":"
+       return &cfg
+}
+
+// TestBadDB ensures Run() returns an error (instead of panicking or
+// deadlocking) if it can't connect to the database server at startup.
+func (s *serverSuite) TestBadDB(c *check.C) {
+       cfg := testConfig()
+       cfg.Postgres["password"] = "1234"
+       srv := &server{wsConfig: cfg}
+
+       var wg sync.WaitGroup
+       wg.Add(1)
+       go func() {
+               err := srv.Run()
+               c.Check(err, check.NotNil)
+               wg.Done()
+       }()
+       wg.Add(1)
+       go func() {
+               srv.WaitReady()
+               wg.Done()
+       }()
+
+       done := make(chan bool)
+       go func() {
+               wg.Wait()
+               close(done)
+       }()
+       select {
+       case <-done:
+       case <-time.After(10 * time.Second):
+               c.Fatal("timeout")
+       }
+}
+
+func newTestServer() *server {
+       srv := &server{wsConfig: testConfig()}
+       go srv.Run()
+       srv.WaitReady()
+       return srv
+}
index d8a2b698f44d1713a708d3f13c7b17f564263d10..85e36560e8d19d8364a8b531b89b89a356948bd0 100644 (file)
@@ -5,8 +5,6 @@ import (
        "encoding/json"
        "fmt"
        "io"
-       "net"
-       "net/http"
        "net/url"
        "os"
        "time"
@@ -228,9 +226,9 @@ func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
        return lg
 }
 
-func (s *v0Suite) testClient() (*testServer, *websocket.Conn, *json.Decoder, *json.Encoder) {
+func (s *v0Suite) testClient() (*server, *websocket.Conn, *json.Decoder, *json.Encoder) {
        srv := newTestServer()
-       conn, err := websocket.Dial("ws://"+srv.addr+"/websocket?api_token="+s.token, "", "http://"+srv.addr)
+       conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
        if err != nil {
                panic(err)
        }
@@ -238,47 +236,3 @@ func (s *v0Suite) testClient() (*testServer, *websocket.Conn, *json.Decoder, *js
        r := json.NewDecoder(conn)
        return srv, conn, r, w
 }
-
-type testServer struct {
-       http.Server
-       addr string
-       ln   net.Listener
-       pges *pgEventSource
-}
-
-func (srv *testServer) Close() {
-       srv.ln.Close()
-       srv.pges.cancel()
-}
-
-func newTestServer() *testServer {
-       ln, err := net.Listen("tcp", ":")
-       if err != nil {
-               panic(err)
-       }
-       cfg := defaultConfig()
-       cfg.Client = *(arvados.NewClientFromEnv())
-       pges := &pgEventSource{
-               DataSource: testDBConfig().ConnectionString(),
-               QueueSize:  4,
-       }
-       srv := &testServer{
-               Server: http.Server{
-                       Addr:         ":",
-                       ReadTimeout:  10 * time.Second,
-                       WriteTimeout: 10 * time.Second,
-                       Handler: &router{
-                               Config:         &cfg,
-                               eventSource:    pges,
-                               newPermChecker: func() permChecker { return newPermChecker(cfg.Client) },
-                       },
-               },
-               addr: ln.Addr().String(),
-               ln:   ln,
-               pges: pges,
-       }
-       go pges.Run()
-       go srv.Serve(ln)
-       pges.waitReady()
-       return srv
-}