Merge branch '11901-ws-db-conns'
author Tom Clegg <tom@curoverse.com>
Tue, 27 Jun 2017 18:02:10 +0000 (14:02 -0400)
committer Tom Clegg <tom@curoverse.com>
Tue, 27 Jun 2017 18:02:10 +0000 (14:02 -0400)
closes #11901

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curoverse.com>

sdk/go/arvadostest/fixtures.go
services/ws/config.go
services/ws/event.go
services/ws/event_source.go
services/ws/event_source_test.go
services/ws/router.go
services/ws/server_test.go
services/ws/session_v0.go
services/ws/session_v0_test.go

index 7e21da4982b3ecb2325f3117008e15a7a8513a7d..cdab4633b45b8cba0922f0a185aab1948c746342 100644 (file)
@@ -7,6 +7,7 @@ const (
        AdminToken              = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
        AnonymousToken          = "4kg6k6lzmp9kj4cpkcoxie964cmvjahbt4fod9zru44k4jqdmi"
        DataManagerToken        = "320mkve8qkswstz7ff61glpk3mhgghmg67wmic7elw4z41pke1"
+       ManagementToken         = "jg3ajndnq63sywcd50gbs5dskdc9ckkysb0nsqmfz08nwf17nl"
        ActiveUserUUID          = "zzzzz-tpzed-xurymjxw79nv3jz"
        SpectatorUserUUID       = "zzzzz-tpzed-l1s2piq4t4mps8r"
        UserAgreementCollection = "zzzzz-4zz18-uukreo9rbgwsujr" // user_agreement_in_anonymously_accessible_project
index 79c2f232daf5059c4b51f7d1bb2fa27a592c5f64..cf82cf8e1064cae202132f8ee97056b72fef2e41 100644 (file)
@@ -17,6 +17,8 @@ type wsConfig struct {
        PingTimeout      arvados.Duration
        ClientEventQueue int
        ServerEventQueue int
+
+       ManagementToken string
 }
 
 func defaultConfig() wsConfig {
index 304f86bbd0583146c4f88a4499d98d872bdab8ea..fd280aebb91aab14700d1ef93395e9ca31becfbd 100644 (file)
@@ -17,6 +17,7 @@ type eventSink interface {
 type eventSource interface {
        NewSink() eventSink
        DB() *sql.DB
+       // DBHealth returns nil if the database is usable and a non-nil
+       // error otherwise (pgEventSource runs "SELECT 1" with a one-second
+       // deadline). The router reports the result at /_health/db.
+       DBHealth() error
 }
 
 type event struct {
index 7c1b58492dd030ef6b579abe8b699d787a758cc2..daf9a94cc180d8503bca99bedd8a3c6b946c09e6 100644 (file)
@@ -242,6 +242,12 @@ func (ps *pgEventSource) DB() *sql.DB {
        return ps.db
 }
 
+// DBHealth reports whether the database answers a trivial query
+// ("SELECT 1") within one second. It returns nil on success, otherwise
+// the error returned by the query or by scanning its result.
+func (ps *pgEventSource) DBHealth() error {
+       ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+       // Release the context's timer as soon as the query returns instead
+       // of leaving it alive until the deadline (go vet: lostcancel).
+       defer cancel()
+       var i int
+       return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
+}
+
 func (ps *pgEventSource) DebugStatus() interface{} {
        ps.mtx.Lock()
        defer ps.mtx.Unlock()
@@ -257,6 +263,7 @@ func (ps *pgEventSource) DebugStatus() interface{} {
                "QueueDelay":   stats.Duration(ps.lastQDelay),
                "Sinks":        len(ps.sinks),
                "SinksBlocked": blocked,
+               "DBStats":      ps.db.Stats(),
        }
 }
 
index b157cfa0eb9cf64cb1d7fc5566ea5649246e8a29..94e3ba3ea0e9ac61e376a4fad212182f5de15a0e 100644 (file)
@@ -105,4 +105,6 @@ func (*eventSourceSuite) TestEventSource(c *check.C) {
        case <-time.After(10 * time.Second):
                c.Fatal("timed out")
        }
+
+       c.Check(pges.DBHealth(), check.IsNil)
 }
index 15b825f2abfa293f8ba2734f57508ba9306a03af..77744974d32b7bb45905268461280e8444ca91e6 100644 (file)
@@ -53,8 +53,13 @@ func (rtr *router) setup() {
        rtr.mux = http.NewServeMux()
        rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0))
        rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1))
-       rtr.mux.HandleFunc("/debug.json", jsonHandler(rtr.DebugStatus))
-       rtr.mux.HandleFunc("/status.json", jsonHandler(rtr.Status))
+       rtr.mux.Handle("/debug.json", rtr.jsonHandler(rtr.DebugStatus))
+       rtr.mux.Handle("/status.json", rtr.jsonHandler(rtr.Status))
+
+       health := http.NewServeMux()
+       rtr.mux.Handle("/_health/", rtr.mgmtAuth(health))
+       health.Handle("/_health/ping", rtr.jsonHandler(rtr.HealthFunc(func() error { return nil })))
+       health.Handle("/_health/db", rtr.jsonHandler(rtr.HealthFunc(rtr.eventSource.DBHealth)))
 }
 
 func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
@@ -102,6 +107,21 @@ func (rtr *router) DebugStatus() interface{} {
        return s
 }
 
+// pingResponseOK is the value reported by a passing health check. It is
+// shared by every caller, so it must not be modified.
+var pingResponseOK = map[string]string{"health": "OK"}
+
+// HealthFunc adapts the health check f into a func() interface{} that
+// can be served by rtr.jsonHandler. The returned func yields
+// {"health":"OK"} when f returns nil, and otherwise
+// {"health":"ERROR","error":<message of f's error>}.
+func (rtr *router) HealthFunc(f func() error) func() interface{} {
+       return func() interface{} {
+               if err := f(); err != nil {
+                       return map[string]string{
+                               "health": "ERROR",
+                               "error":  err.Error(),
+                       }
+               }
+               return pingResponseOK
+       }
+}
+
 func (rtr *router) Status() interface{} {
        return map[string]interface{}{
                "Clients": atomic.LoadInt64(&rtr.status.ReqsActive),
@@ -125,16 +145,30 @@ func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        rtr.mux.ServeHTTP(resp, req)
 }
 
-func jsonHandler(fn func() interface{}) http.HandlerFunc {
-       return func(resp http.ResponseWriter, req *http.Request) {
-               logger := logger(req.Context())
-               resp.Header().Set("Content-Type", "application/json")
-               enc := json.NewEncoder(resp)
+func (rtr *router) mgmtAuth(h http.Handler) http.Handler {
+       return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               if rtr.Config.ManagementToken == "" {
+                       http.Error(w, "disabled", http.StatusNotFound)
+               } else if ah := r.Header.Get("Authorization"); ah == "" {
+                       http.Error(w, "authorization required", http.StatusUnauthorized)
+               } else if ah != "Bearer "+rtr.Config.ManagementToken {
+                       http.Error(w, "authorization error", http.StatusForbidden)
+               } else {
+                       h.ServeHTTP(w, r)
+               }
+       })
+}
+
+// jsonHandler returns an http.Handler that calls fn on each request and
+// writes its result as JSON with Content-Type application/json. If
+// encoding fails, the error is logged and http.Error is called with
+// status 500 and the message "encode failed".
+func (rtr *router) jsonHandler(fn func() interface{}) http.Handler {
+       return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               logger := logger(r.Context())
+               w.Header().Set("Content-Type", "application/json")
+               enc := json.NewEncoder(w)
                err := enc.Encode(fn())
                if err != nil {
                        msg := "encode failed"
                        logger.WithError(err).Error(msg)
-                       http.Error(resp, msg, http.StatusInternalServerError)
+                       http.Error(w, msg, http.StatusInternalServerError)
                }
-       }
+       })
 }
index d74f7dff4283ef071fb4e1ae6cb4d8e5db1c9f07..3e19b690b55234861a3464b73a2c736a784219e5 100644 (file)
@@ -1,43 +1,53 @@
 package main
 
 import (
+       "io/ioutil"
+       "net/http"
        "sync"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        check "gopkg.in/check.v1"
 )
 
 var _ = check.Suite(&serverSuite{})
 
+// serverSuite tests the websocket server as a whole. Before each test,
+// SetUpTest creates a fresh config and an unstarted server built from it.
 type serverSuite struct {
+       // cfg is the server's configuration. srv was built with this same
+       // pointer, so tests may adjust cfg before calling srv.Run.
+       cfg *wsConfig
+       // srv is the server under test; tests start it with srv.Run.
+       srv *server
+       // NOTE(review): wg is not referenced by any code visible in this
+       // change (TestBadDB declares its own local WaitGroup).
+       wg  sync.WaitGroup
 }
 
-func testConfig() *wsConfig {
+func (s *serverSuite) SetUpTest(c *check.C) {
+       s.cfg = s.testConfig()
+       s.srv = &server{wsConfig: s.cfg}
+}
+
+// testConfig returns defaultConfig() adjusted for tests:
+//   - the API client comes from the environment (arvados.NewClientFromEnv);
+//   - Postgres settings come from testDBConfig();
+//   - it listens on ":" with no fixed port, and tests read the bound
+//     address from srv.listener;
+//   - the management token is set to arvadostest.ManagementToken.
+func (*serverSuite) testConfig() *wsConfig {
        cfg := defaultConfig()
        cfg.Client = *(arvados.NewClientFromEnv())
        cfg.Postgres = testDBConfig()
        cfg.Listen = ":"
+       // Enables the /_health/ endpoints; mgmtAuth answers 404 when empty.
+       cfg.ManagementToken = arvadostest.ManagementToken
        return &cfg
 }
 
 // TestBadDB ensures Run() returns an error (instead of panicking or
 // deadlocking) if it can't connect to the database server at startup.
 func (s *serverSuite) TestBadDB(c *check.C) {
-       cfg := testConfig()
-       cfg.Postgres["password"] = "1234"
-       srv := &server{wsConfig: cfg}
+       s.cfg.Postgres["password"] = "1234"
 
        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
-               err := srv.Run()
+               err := s.srv.Run()
                c.Check(err, check.NotNil)
                wg.Done()
        }()
        wg.Add(1)
        go func() {
-               srv.WaitReady()
+               s.srv.WaitReady()
                wg.Done()
        }()
 
@@ -53,9 +63,42 @@ func (s *serverSuite) TestBadDB(c *check.C) {
        }
 }
 
-func newTestServer() *server {
-       srv := &server{wsConfig: testConfig()}
-       go srv.Run()
-       srv.WaitReady()
-       return srv
+func (s *serverSuite) TestHealth(c *check.C) {
+       go s.srv.Run()
+       defer s.srv.Close()
+       s.srv.WaitReady()
+       for _, token := range []string{"", "foo", s.cfg.ManagementToken} {
+               req, err := http.NewRequest("GET", "http://"+s.srv.listener.Addr().String()+"/_health/ping", nil)
+               c.Assert(err, check.IsNil)
+               if token != "" {
+                       req.Header.Add("Authorization", "Bearer "+token)
+               }
+               resp, err := http.DefaultClient.Do(req)
+               c.Check(err, check.IsNil)
+               if token == s.cfg.ManagementToken {
+                       c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+                       buf, err := ioutil.ReadAll(resp.Body)
+                       c.Check(err, check.IsNil)
+                       c.Check(string(buf), check.Equals, `{"health":"OK"}`+"\n")
+               } else {
+                       c.Check(resp.StatusCode, check.Not(check.Equals), http.StatusOK)
+               }
+       }
+}
+
+func (s *serverSuite) TestHealthDisabled(c *check.C) {
+       s.cfg.ManagementToken = ""
+
+       go s.srv.Run()
+       defer s.srv.Close()
+       s.srv.WaitReady()
+
+       for _, token := range []string{"", "foo", arvadostest.ManagementToken} {
+               req, err := http.NewRequest("GET", "http://"+s.srv.listener.Addr().String()+"/_health/ping", nil)
+               c.Assert(err, check.IsNil)
+               req.Header.Add("Authorization", "Bearer "+token)
+               resp, err := http.DefaultClient.Do(req)
+               c.Check(err, check.IsNil)
+               c.Check(resp.StatusCode, check.Equals, http.StatusNotFound)
+       }
 }
index 44e2a1deb5843f5909fc6f9157bbde1ca045e424..f8645eb887ed735fea07845f2b4ea0c1a5ae4da7 100644 (file)
@@ -157,6 +157,7 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                sess.log.WithError(err).Error("db.Query failed")
                return
        }
+       defer rows.Close()
        for rows.Next() {
                var id uint64
                err := rows.Scan(&id)
index 85e36560e8d19d8364a8b531b89b89a356948bd0..f6fe3f60e6bcd929ac917bc469d3b86cf2a542c7 100644 (file)
@@ -25,11 +25,13 @@ func init() {
 var _ = check.Suite(&v0Suite{})
 
 type v0Suite struct {
-       token    string
-       toDelete []string
+       // serverSuite supplies the per-test config and server. It is a
+       // named field, so v0Suite.SetUpTest initializes it explicitly.
+       serverSuite serverSuite
+       token       string
+       toDelete    []string
 }
 
 func (s *v0Suite) SetUpTest(c *check.C) {
+       // Create a fresh config and unstarted server for this test;
+       // testClient starts s.serverSuite.srv.
+       s.serverSuite.SetUpTest(c)
        s.token = arvadostest.ActiveToken
 }
 
@@ -227,7 +229,9 @@ func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
 }
 
 func (s *v0Suite) testClient() (*server, *websocket.Conn, *json.Decoder, *json.Encoder) {
-       srv := newTestServer()
+       go s.serverSuite.srv.Run()
+       s.serverSuite.srv.WaitReady()
+       srv := s.serverSuite.srv
        conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
        if err != nil {
                panic(err)