Help: "Open connections to the database",
}, []string{"inuse"})
ps.Reg.MustRegister(openConnections)
+
+ updateDBStats := func() { // copies one ps.db.Stats() snapshot into the two gauges
+ stats := ps.db.Stats()
+ maxConnections.Set(float64(stats.MaxOpenConnections))
+ openConnections.WithLabelValues("0").Set(float64(stats.Idle)) // inuse="0" carries the idle count
+ openConnections.WithLabelValues("1").Set(float64(stats.InUse)) // inuse="1" carries the in-use count
+ }
go func() {
<-ps.ready
if ps.db == nil {
return
}
+ updateDBStats() // publish once as soon as ps.ready fires, instead of waiting for the first tick
for range time.Tick(time.Second) {
- stats := ps.db.Stats()
- maxConnections.Set(float64(stats.MaxOpenConnections))
- openConnections.WithLabelValues("0").Set(float64(stats.Idle))
- openConnections.WithLabelValues("1").Set(float64(stats.InUse))
+ updateDBStats() // then refresh on every one-second tick; this loop has no exit condition
}
}()
}
return
}
if ps.MaxOpenConns <= 0 {
- ps.Logger.Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file")
+ ps.Logger.Warn("no database connection limit configured -- consider setting PostgreSQL.ConnectionPool>0 in arvados-ws configuration file")
}
db.SetMaxOpenConns(ps.MaxOpenConns)
if err = db.Ping(); err != nil {
check "gopkg.in/check.v1"
)
-var _ debugStatuser = (*pgEventSource)(nil)
-
var _ = check.Suite(&eventSourceSuite{})
type eventSourceSuite struct{}
package ws
import (
- "encoding/json"
"io"
"net/http"
- "strconv"
"sync"
"sync/atomic"
"time"
- "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/health"
setupOnce sync.Once
done chan struct{}
reg *prometheus.Registry
-
- lastReqID int64
- lastReqMtx sync.Mutex
-
- status routerDebugStatus
-}
-
-type routerDebugStatus struct {
- ReqsReceived int64
- ReqsActive int64
-}
-
-type debugStatuser interface {
- DebugStatus() interface{}
}
func (rtr *router) setup() {
QueueSize: rtr.cluster.API.WebsocketClientEventQueue,
}
rtr.mux = http.NewServeMux()
- rtr.mux.Handle("/debug.json", rtr.jsonHandler(rtr.DebugStatus))
- rtr.mux.Handle("/status.json", rtr.jsonHandler(rtr.Status))
-
rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0, mSockets.WithLabelValues("0")))
rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1, mSockets.WithLabelValues("1")))
rtr.mux.Handle("/_health/", &health.Handler{
Handler: websocket.Handler(func(ws *websocket.Conn) {
t0 := time.Now()
logger := ctxlog.FromContext(ws.Request().Context())
- logger.Info("connected")
atomic.AddInt64(&connected, 1)
gauge.Set(float64(atomic.LoadInt64(&connected)))
logger.WithFields(logrus.Fields{
"elapsed": time.Now().Sub(t0).Seconds(),
"stats": stats,
- }).Info("disconnect")
+ }).Info("client disconnected")
ws.Close()
atomic.AddInt64(&connected, -1)
gauge.Set(float64(atomic.LoadInt64(&connected)))
}
}
-func (rtr *router) newReqID() string {
- rtr.lastReqMtx.Lock()
- defer rtr.lastReqMtx.Unlock()
- id := time.Now().UnixNano()
- if id <= rtr.lastReqID {
- id = rtr.lastReqID + 1
- }
- return strconv.FormatInt(id, 36)
-}
-
-func (rtr *router) DebugStatus() interface{} {
- s := map[string]interface{}{
- "HTTP": rtr.status,
- "Outgoing": rtr.handler.DebugStatus(),
- }
- if es, ok := rtr.eventSource.(debugStatuser); ok {
- s["EventSource"] = es.DebugStatus()
- }
- return s
-}
-
-func (rtr *router) Status() interface{} {
- return map[string]interface{}{
- "Clients": atomic.LoadInt64(&rtr.status.ReqsActive),
- "Version": cmd.Version.String(),
- }
-}
-
func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
rtr.setupOnce.Do(rtr.setup) // sync.Once: rtr.setup runs on the first call only
- atomic.AddInt64(&rtr.status.ReqsReceived, 1)
- atomic.AddInt64(&rtr.status.ReqsActive, 1)
- defer atomic.AddInt64(&rtr.status.ReqsActive, -1)
-
- logger := ctxlog.FromContext(req.Context()).
- WithField("RequestID", rtr.newReqID())
- ctx := ctxlog.Context(req.Context(), logger)
- req = req.WithContext(ctx)
- logger.WithFields(logrus.Fields{
- "remoteAddr": req.RemoteAddr,
- "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
- }).Info("accept request")
rtr.mux.ServeHTTP(resp, req) // hand the request to the mux built in rtr.setup
}
-func (rtr *router) jsonHandler(fn func() interface{}) http.Handler {
- return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- logger := ctxlog.FromContext(r.Context())
- w.Header().Set("Content-Type", "application/json")
- enc := json.NewEncoder(w)
- err := enc.Encode(fn())
- if err != nil {
- msg := "encode failed"
- logger.WithError(err).Error(msg)
- http.Error(w, msg, http.StatusInternalServerError)
- }
- })
-}
-
func (rtr *router) CheckHealth() error {
rtr.setupOnce.Do(rtr.setup)
return rtr.eventSource.DBHealth()
import (
"bytes"
"context"
- "encoding/json"
"flag"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
+ "strings"
"sync"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "git.arvados.org/arvados.git/sdk/go/httpserver"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
type serviceSuite struct {
handler service.Handler
+ reg *prometheus.Registry
srv *httptest.Server
cluster *arvados.Cluster
wg sync.WaitGroup
c.Assert(err, check.IsNil)
}
-func (s *serviceSuite) start() {
- s.handler = newHandler(context.Background(), s.cluster, "", prometheus.NewRegistry())
- s.srv = httptest.NewServer(s.handler)
+func (s *serviceSuite) start(c *check.C) { // c is needed for ctxlog.TestLogger below
+ s.reg = prometheus.NewRegistry() // kept on the suite; the same registry goes to newHandler and to Instrument
+ s.handler = newHandler(context.Background(), s.cluster, "", s.reg)
+ instrumented := httpserver.Instrument(s.reg, ctxlog.TestLogger(c), s.handler)
+ s.srv = httptest.NewServer(instrumented.ServeAPI(s.cluster.ManagementToken, instrumented)) // NOTE(review): presumably ServeAPI is what serves /metrics behind the management token — confirm in httpserver
}
func (s *serviceSuite) TearDownTest(c *check.C) {
cluster.SystemRootToken = client.AuthToken
cluster.TLS.Insecure = client.Insecure
cluster.PostgreSQL.Connection = testDBConfig()
+ cluster.PostgreSQL.ConnectionPool = 12
cluster.Services.Websocket.InternalURLs = map[arvados.URL]arvados.ServiceInstance{arvados.URL{Host: ":"}: arvados.ServiceInstance{}}
cluster.ManagementToken = arvadostest.ManagementToken
return cluster, nil
// startup.
func (s *serviceSuite) TestBadDB(c *check.C) {
s.cluster.PostgreSQL.Connection["password"] = "1234"
- s.start()
+ s.start(c) // start now takes the *check.C
resp, err := http.Get(s.srv.URL) // NOTE(review): resp.Body is never closed in this test
c.Check(err, check.IsNil)
c.Check(resp.StatusCode, check.Equals, http.StatusInternalServerError) // expects a 500 after the password override above
}
func (s *serviceSuite) TestHealth(c *check.C) {
- s.start()
+ s.start(c)
for _, token := range []string{"", "foo", s.cluster.ManagementToken} {
req, err := http.NewRequest("GET", s.srv.URL+"/_health/ping", nil)
c.Assert(err, check.IsNil)
}
}
-func (s *serviceSuite) TestStatus(c *check.C) {
- s.start()
- req, err := http.NewRequest("GET", s.srv.URL+"/status.json", nil)
- c.Assert(err, check.IsNil)
- resp, err := http.DefaultClient.Do(req)
- c.Check(err, check.IsNil)
- c.Check(resp.StatusCode, check.Equals, http.StatusOK)
- var status map[string]interface{}
- err = json.NewDecoder(resp.Body).Decode(&status)
- c.Check(err, check.IsNil)
- c.Check(status["Version"], check.Not(check.Equals), "")
+// TestMetrics fetches /metrics with the management token, retrying for
+// up to one second while the response still contains
+// "_db_max_connections 0", then checks that the max-connections gauge
+// reads 12 and that both inuse="0" and inuse="1" series of
+// arvados_ws_db_open_connections are present.
+func (s *serviceSuite) TestMetrics(c *check.C) {
+ s.start(c)
+ // NOTE(review): the returned error is ignored; presumably this call is
+ // made only to trigger handler setup -- confirm.
+ s.handler.CheckHealth()
+ for deadline := time.Now().Add(time.Second); ; {
+ req, err := http.NewRequest("GET", s.srv.URL+"/metrics", nil)
+ c.Assert(err, check.IsNil)
+ req.Header.Set("Authorization", "Bearer "+s.cluster.ManagementToken)
+ resp, err := http.DefaultClient.Do(req)
+ // Assert, not Check: resp is nil when Do fails, and the next
+ // line would dereference it.
+ c.Assert(err, check.IsNil)
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ text, err := ioutil.ReadAll(resp.Body)
+ // Close on every iteration; a defer would pile up across retries.
+ resp.Body.Close()
+ c.Check(err, check.IsNil)
+ if strings.Contains(string(text), "_db_max_connections 0\n") {
+ // wait for the first db stats update
+ if time.Now().After(deadline) {
+ c.Fatal("timed out")
+ }
+ time.Sleep(time.Second / 50)
+ continue
+ }
+ c.Check(string(text), check.Matches, `(?ms).*\narvados_ws_db_max_connections 12\n.*`)
+ c.Check(string(text), check.Matches, `(?ms).*\narvados_ws_db_open_connections\{inuse="0"\} \d+\n.*`)
+ c.Check(string(text), check.Matches, `(?ms).*\narvados_ws_db_open_connections\{inuse="1"\} \d+\n.*`)
+ break
+ }
}
func (s *serviceSuite) TestHealthDisabled(c *check.C) {
s.cluster.ManagementToken = ""
- s.start()
+ s.start(c)
for _, token := range []string{"", "foo", arvadostest.ManagementToken} {
req, err := http.NewRequest("GET", s.srv.URL+"/_health/ping", nil)
c.Assert(err, check.IsNil)
func (s *v0Suite) SetUpTest(c *check.C) {
s.serviceSuite.SetUpTest(c)
- s.serviceSuite.start()
+ s.serviceSuite.start(c)
s.token = arvadostest.ActiveToken
s.ignoreLogID = s.lastLogID(c)