Merge branch '16053-install-deps'
author Tom Clegg <tom@tomclegg.ca>
Tue, 7 Apr 2020 19:25:16 +0000 (15:25 -0400)
committer Tom Clegg <tom@tomclegg.ca>
Tue, 7 Apr 2020 19:25:16 +0000 (15:25 -0400)
refs #16053

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

37 files changed:
build/run-build-packages.sh
cmd/arvados-server/arvados-ws.service [moved from services/ws/arvados-ws.service with 94% similarity]
cmd/arvados-server/cmd.go
doc/admin/metrics.html.textile.liquid
go.mod
lib/boot/supervisor.go
lib/controller/handler.go
lib/dispatchcloud/dispatcher.go
lib/service/cmd.go
lib/service/cmd_test.go
lib/service/error.go
sdk/go/arvados/config.go
sdk/go/arvados/config_test.go
sdk/go/health/aggregator.go
sdk/go/httpserver/logger.go
sdk/python/tests/run_test_server.py
services/keep-balance/server.go
services/keepstore/command.go
services/ws/doc.go
services/ws/event.go
services/ws/event_source.go
services/ws/event_source_test.go
services/ws/event_test.go
services/ws/gocheck_test.go
services/ws/handler.go
services/ws/main.go [deleted file]
services/ws/permission.go
services/ws/permission_test.go
services/ws/router.go
services/ws/server.go [deleted file]
services/ws/service.go [new file with mode: 0644]
services/ws/service_test.go [moved from services/ws/server_test.go with 55% similarity]
services/ws/session.go
services/ws/session_v0.go
services/ws/session_v0_test.go
services/ws/session_v1.go
tools/arvbox/lib/arvbox/docker/service/websockets/run-service

index 4faa1c6b0d4b0e83d12d27b997615fbf78031284..3ba1dcc05e8776fc57a205e2deb79a0224a8e370 100755 (executable)
@@ -308,7 +308,7 @@ package_go_binary services/keepstore keepstore \
     "Keep storage daemon, accessible to clients on the LAN"
 package_go_binary services/keep-web keep-web \
     "Static web hosting service for user data stored in Arvados Keep"
-package_go_binary services/ws arvados-ws \
+package_go_binary cmd/arvados-server arvados-ws \
     "Arvados Websocket server"
 package_go_binary tools/sync-groups arvados-sync-groups \
     "Synchronize remote groups into Arvados from an external source"
similarity index 94%
rename from services/ws/arvados-ws.service
rename to cmd/arvados-server/arvados-ws.service
index 36624c78779c02cfde829323551ca9c2cb19eda3..aebc56a79f333b19f061f5f0aadce793e799529c 100644 (file)
@@ -6,6 +6,7 @@
 Description=Arvados websocket server
 Documentation=https://doc.arvados.org/
 After=network.target
+AssertPathExists=/etc/arvados/config.yml
 
 # systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
 StartLimitInterval=0
index 328599826f79cdf95f49965219114b9f156ec081..fcea2223da70d5a174ee74b8281ebd3d20e0b503 100644 (file)
@@ -15,6 +15,7 @@ import (
        "git.arvados.org/arvados.git/lib/crunchrun"
        "git.arvados.org/arvados.git/lib/dispatchcloud"
        "git.arvados.org/arvados.git/lib/install"
+       "git.arvados.org/arvados.git/services/ws"
 )
 
 var (
@@ -26,12 +27,13 @@ var (
                "boot":            boot.Command,
                "cloudtest":       cloudtest.Command,
                "config-check":    config.CheckCommand,
-               "config-dump":     config.DumpCommand,
                "config-defaults": config.DumpDefaultsCommand,
+               "config-dump":     config.DumpCommand,
                "controller":      controller.Command,
                "crunch-run":      crunchrun.Command,
                "dispatch-cloud":  dispatchcloud.Command,
                "install":         install.Command,
+               "ws":              ws.Command,
        })
 )
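
Registering "ws" here is what makes "arvados-server ws" start the websocket service in place of the old standalone arvados-ws binary. Below is a minimal sketch of the dispatch pattern, with hypothetical names; the real routing is presumably done by the arvados cmd package, and the RunCommand signature mirrors the one visible in lib/service/cmd.go further down in this diff.

package example

import (
	"fmt"
	"io"
	"os"
)

// handler mirrors the RunCommand signature used by arvados-server
// subcommands (compare lib/service/cmd.go below); the interface and
// dispatch function here are illustrative only.
type handler interface {
	RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int
}

// dispatch picks a subcommand handler by name, a simplified stand-in
// for what the arvados cmd package does with the map above.
func dispatch(handlers map[string]handler, args []string) int {
	if len(args) < 2 {
		fmt.Fprintln(os.Stderr, "usage: arvados-server <subcommand> [options]")
		return 2
	}
	h, ok := handlers[args[1]]
	if !ok {
		fmt.Fprintf(os.Stderr, "unknown subcommand %q\n", args[1])
		return 2
	}
	return h.RunCommand(args[0]+" "+args[1], args[2:], os.Stdin, os.Stdout, os.Stderr)
}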
 
index 9616d4add43a44105d78fbf5ff6f4ae9b8e1c3cd..a6a0862c4f1d1383a44a80832b42cebaafd7f569 100644 (file)
@@ -36,7 +36,7 @@ table(table table-bordered table-condensed table-hover).
 |arvados-dispatch-cloud|✓|
 |arvados-git-httpd||
 |arvados-node-manager||
-|arvados-ws||
+|arvados-ws|✓|
 |composer||
 |keepproxy||
 |keepstore|✓|
diff --git a/go.mod b/go.mod
index 2cc5e89eb1fe68c88335e2ba2e2906e1bb2d9c33..48b1c725a5986bfe249d3444ed5698a9ad193bd3 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -52,7 +52,7 @@ require (
        golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550
        golang.org/x/net v0.0.0-20190620200207-3b0461eec859
        golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
-       golang.org/x/sys v0.0.0-20191105231009-c1f44814a5cd // indirect
+       golang.org/x/sys v0.0.0-20191105231009-c1f44814a5cd
        google.golang.org/api v0.13.0
        gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405
        gopkg.in/square/go-jose.v2 v2.3.1
index 9f50013e39841517fb486f35d226ea344b359946..7f5d6a9baae2dd4eaa2b2e66fea9585f7be3bdc1 100644 (file)
@@ -182,7 +182,7 @@ func (super *Supervisor) run(cfg *arvados.Config) error {
                runGoProgram{src: "services/keepproxy", svc: super.cluster.Services.Keepproxy, depends: []supervisedTask{runPassenger{src: "services/api"}}},
                runGoProgram{src: "services/keepstore", svc: super.cluster.Services.Keepstore},
                runGoProgram{src: "services/keep-web", svc: super.cluster.Services.WebDAV},
-               runGoProgram{src: "services/ws", svc: super.cluster.Services.Websocket, depends: []supervisedTask{runPostgreSQL{}}},
+               runServiceCommand{name: "ws", svc: super.cluster.Services.Websocket, depends: []supervisedTask{runPostgreSQL{}}},
                installPassenger{src: "services/api"},
                runPassenger{src: "services/api", svc: super.cluster.Services.RailsAPI, depends: []supervisedTask{createCertificates{}, runPostgreSQL{}, installPassenger{src: "services/api"}}},
                installPassenger{src: "apps/workbench", depends: []supervisedTask{installPassenger{src: "services/api"}}}, // dependency ensures workbench doesn't delay api startup
index e3869261a160b4e42d147574410f15b9641b4e9d..3929a1103fb074b35234af17926aa71cb98df76d 100644 (file)
@@ -67,6 +67,10 @@ func (h *Handler) CheckHealth() error {
        return err
 }
 
+func (h *Handler) Done() <-chan struct{} {
+       return nil
+}
+
 func neverRedirect(*http.Request, []*http.Request) error { return http.ErrUseLastResponse }
 
 func (h *Handler) setup() {
index 4023896f7933dbbd489387405419097dc083434e..02b6c976aec825f810eab3cca43488c808d5cc4e 100644 (file)
@@ -82,6 +82,11 @@ func (disp *dispatcher) CheckHealth() error {
        return disp.pool.CheckHealth()
 }
 
+// Done implements service.Handler.
+func (disp *dispatcher) Done() <-chan struct{} {
+       return disp.stopped
+}
+
 // Stop dispatching containers and release resources. Typically used
 // in tests.
 func (disp *dispatcher) Close() {
index 7f2f78ee9a9f7224aac4aacba94148497f292a5e..1e7a9a36edd3a8142192d14bfcfbf12885e1e857 100644 (file)
@@ -29,6 +29,7 @@ import (
 type Handler interface {
        http.Handler
        CheckHealth() error
+       Done() <-chan struct{}
 }
 
 type NewHandlerFunc func(_ context.Context, _ *arvados.Cluster, token string, registry *prometheus.Registry) Handler
@@ -148,9 +149,15 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                logger.WithError(err).Errorf("error notifying init daemon")
        }
        go func() {
+               // Shut down server if caller cancels context
                <-ctx.Done()
                srv.Close()
        }()
+       go func() {
+               // Shut down server if handler dies
+               <-handler.Done()
+               srv.Close()
+       }()
        err = srv.Wait()
        if err != nil {
                return 1
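
The Done() method added to service.Handler lets a handler tell the generic service wrapper that it has stopped, so the wrapper closes the HTTP server and RunCommand returns. A handler that never stops on its own can return nil, because receiving from a nil channel blocks forever. A minimal sketch (not code from this commit) of a handler implementing the extended interface:

package example

import "net/http"

// exampleHandler satisfies the extended interface: http.Handler,
// CheckHealth, and Done.
type exampleHandler struct {
	done chan struct{}
	err  error
}

func (h *exampleHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
}

func (h *exampleHandler) CheckHealth() error { return h.err }

// Done is closed when background work stops; the service wrapper's
// "<-handler.Done()" goroutine then calls srv.Close(). A handler that
// never stops on its own may return nil instead, since receiving from
// a nil channel blocks forever.
func (h *exampleHandler) Done() <-chan struct{} { return h.done }

// runBackground would be started by the handler's constructor.
func (h *exampleHandler) runBackground() {
	defer close(h.done) // signals the service wrapper to shut down
	// ... long-running work; on fatal error, record it in h.err and return
}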
index 86039c4dd1fa111d2de292676f4773c9bdc203a1..ec7834972c2609aeb5e4cd14099d35367a7e3c09 100644 (file)
@@ -135,6 +135,7 @@ type testHandler struct {
        healthCheck chan bool
 }
 
+func (th *testHandler) Done() <-chan struct{}                            { return nil }
 func (th *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { th.handler.ServeHTTP(w, r) }
 func (th *testHandler) CheckHealth() error {
        ctxlog.FromContext(th.ctx).Info("CheckHealth called")
index c4049f7064d70a5a88c654be10cff478bb0f42f3..a4d7370d1b8d0d6a3b630025a181a56c029ed3b0 100644 (file)
@@ -36,3 +36,15 @@ func (eh errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 func (eh errorHandler) CheckHealth() error {
        return eh.err
 }
+
+// Done returns a closed channel to indicate the service has
+// stopped/failed.
+func (eh errorHandler) Done() <-chan struct{} {
+       return doneChannel
+}
+
+var doneChannel = func() <-chan struct{} {
+       done := make(chan struct{})
+       close(done)
+       return done
+}()
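
errorHandler returns a channel that is closed once at package init, so any goroutine blocked on <-Done() is released immediately and the server shuts down right away for a handler constructed in a failed state. A small runnable sketch of the idiom, with hypothetical names:

package main

import "fmt"

// doneNow returns an already-closed channel, the same idiom
// errorHandler uses above, so a caller blocked on <-Done() is
// released immediately.
func doneNow() <-chan struct{} {
	c := make(chan struct{})
	close(c)
	return c
}

func main() {
	<-doneNow() // does not block: the channel is already closed
	fmt.Println("handler reported itself stopped; shutting down server")
}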
index a70980cbde232cc562155bbfb813d0f1cf32afbc..79e47ba5d1648a096d455bd46eb86d0cd200c8d0 100644 (file)
@@ -421,6 +421,24 @@ var errDuplicateInstanceTypeName = errors.New("duplicate instance type name")
 // UnmarshalJSON handles old config files that provide an array of
 // instance types instead of a hash.
 func (it *InstanceTypeMap) UnmarshalJSON(data []byte) error {
+       fixup := func(t InstanceType) (InstanceType, error) {
+               if t.ProviderType == "" {
+                       t.ProviderType = t.Name
+               }
+               if t.Scratch == 0 {
+                       t.Scratch = t.IncludedScratch + t.AddedScratch
+               } else if t.AddedScratch == 0 {
+                       t.AddedScratch = t.Scratch - t.IncludedScratch
+               } else if t.IncludedScratch == 0 {
+                       t.IncludedScratch = t.Scratch - t.AddedScratch
+               }
+
+               if t.Scratch != (t.IncludedScratch + t.AddedScratch) {
+                       return t, fmt.Errorf("InstanceType %q: Scratch != (IncludedScratch + AddedScratch)", t.Name)
+               }
+               return t, nil
+       }
+
        if len(data) > 0 && data[0] == '[' {
                var arr []InstanceType
                err := json.Unmarshal(data, &arr)
@@ -436,19 +454,9 @@ func (it *InstanceTypeMap) UnmarshalJSON(data []byte) error {
                        if _, ok := (*it)[t.Name]; ok {
                                return errDuplicateInstanceTypeName
                        }
-                       if t.ProviderType == "" {
-                               t.ProviderType = t.Name
-                       }
-                       if t.Scratch == 0 {
-                               t.Scratch = t.IncludedScratch + t.AddedScratch
-                       } else if t.AddedScratch == 0 {
-                               t.AddedScratch = t.Scratch - t.IncludedScratch
-                       } else if t.IncludedScratch == 0 {
-                               t.IncludedScratch = t.Scratch - t.AddedScratch
-                       }
-
-                       if t.Scratch != (t.IncludedScratch + t.AddedScratch) {
-                               return fmt.Errorf("%v: Scratch != (IncludedScratch + AddedScratch)", t.Name)
+                       t, err := fixup(t)
+                       if err != nil {
+                               return err
                        }
                        (*it)[t.Name] = t
                }
@@ -464,8 +472,9 @@ func (it *InstanceTypeMap) UnmarshalJSON(data []byte) error {
        *it = InstanceTypeMap(hash)
        for name, t := range *it {
                t.Name = name
-               if t.ProviderType == "" {
-                       t.ProviderType = name
+               t, err := fixup(t)
+               if err != nil {
+                       return err
                }
                (*it)[name] = t
        }
index b984cb5669ce851f2ec1f136a9c96bfb0d06b832..e4d26e03fd3f8101ad339f648b1efbaa56208437 100644 (file)
@@ -45,3 +45,29 @@ func (s *ConfigSuite) TestInstanceTypeSize(c *check.C) {
        c.Check(int64(it.Scratch), check.Equals, int64(4000000000))
        c.Check(int64(it.RAM), check.Equals, int64(4294967296))
 }
+
+func (s *ConfigSuite) TestInstanceTypeFixup(c *check.C) {
+       for _, confdata := range []string{
+               // Current format: map of entries
+               `{foo4: {IncludedScratch: 4GB}, foo8: {ProviderType: foo_8, Scratch: 8GB}}`,
+               // Legacy format: array of entries with key in "Name" field
+               `[{Name: foo4, IncludedScratch: 4GB}, {Name: foo8, ProviderType: foo_8, Scratch: 8GB}]`,
+       } {
+               c.Log(confdata)
+               var itm InstanceTypeMap
+               err := yaml.Unmarshal([]byte(confdata), &itm)
+               c.Check(err, check.IsNil)
+
+               c.Check(itm["foo4"].Name, check.Equals, "foo4")
+               c.Check(itm["foo4"].ProviderType, check.Equals, "foo4")
+               c.Check(itm["foo4"].Scratch, check.Equals, ByteSize(4000000000))
+               c.Check(itm["foo4"].AddedScratch, check.Equals, ByteSize(0))
+               c.Check(itm["foo4"].IncludedScratch, check.Equals, ByteSize(4000000000))
+
+               c.Check(itm["foo8"].Name, check.Equals, "foo8")
+               c.Check(itm["foo8"].ProviderType, check.Equals, "foo_8")
+               c.Check(itm["foo8"].Scratch, check.Equals, ByteSize(8000000000))
+               c.Check(itm["foo8"].AddedScratch, check.Equals, ByteSize(8000000000))
+               c.Check(itm["foo8"].IncludedScratch, check.Equals, ByteSize(0))
+       }
+}
index a0284e8f247a60f8d2fd57b752f37a800d54c222..794adabdd3926b6b04036a6c62b1044f2e8f13d5 100644 (file)
@@ -46,6 +46,10 @@ func (agg *Aggregator) CheckHealth() error {
        return nil
 }
 
+func (agg *Aggregator) Done() <-chan struct{} {
+       return nil
+}
+
 func (agg *Aggregator) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        agg.setupOnce.Do(agg.setup)
        sendErr := func(statusCode int, err error) {
index 8886f9517dfd5983032235e713a000f5615880b7..59981e3e55265be4eed1827d3570391533ac3a30 100644 (file)
@@ -53,10 +53,22 @@ func LogRequests(h http.Handler) http.Handler {
 
                logRequest(w, req, lgr)
                defer logResponse(w, req, lgr)
-               h.ServeHTTP(w, req)
+               h.ServeHTTP(rewrapResponseWriter(w, wrapped), req)
        })
 }
 
+// Rewrap w to restore additional interfaces provided by wrapped.
+func rewrapResponseWriter(w http.ResponseWriter, wrapped http.ResponseWriter) http.ResponseWriter {
+       if hijacker, ok := wrapped.(http.Hijacker); ok {
+               return struct {
+                       http.ResponseWriter
+                       http.Hijacker
+               }{w, hijacker}
+       } else {
+               return w
+       }
+}
+
 func Logger(req *http.Request) logrus.FieldLogger {
        return ctxlog.FromContext(req.Context())
 }
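
Wrapping a ResponseWriter for logging hides optional interfaces such as http.Hijacker, and the websocket handshake needs Hijacker to take over the underlying TCP connection; rewrapResponseWriter restores it. A sketch of the same pattern in isolation (hypothetical middleware, not the code above):

package example

import "net/http"

// loggingWriter is a hypothetical wrapper that records the response
// status; like any struct embedding only http.ResponseWriter, it hides
// optional interfaces such as http.Hijacker from type assertions.
type loggingWriter struct {
	http.ResponseWriter
	status int
}

func (w *loggingWriter) WriteHeader(code int) {
	w.status = code
	w.ResponseWriter.WriteHeader(code)
}

// middleware shows why the rewrap step matters: a websocket handshake
// type-asserts the writer to http.Hijacker, so the wrapper must be
// recombined with the original writer's Hijacker before the request is
// passed along, otherwise the upgrade fails.
func middleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		lw := &loggingWriter{ResponseWriter: w}
		var out http.ResponseWriter = lw
		if hj, ok := w.(http.Hijacker); ok {
			out = struct {
				http.ResponseWriter
				http.Hijacker
			}{lw, hj}
		}
		next.ServeHTTP(out, req)
	})
}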
index 4f4d13858d3bcd72feb0c1f95a1b19fb536763b8..22d4f62ea0fd1bf5a8d6718e2d410b79d5377d72 100644 (file)
@@ -430,7 +430,8 @@ def run_ws():
     stop_ws()
     port = internal_port_from_config("Websocket")
     logf = open(_logfilename('ws'), 'a')
-    ws = subprocess.Popen(["ws"],
+    ws = subprocess.Popen(
+        ["arvados-server", "ws"],
         stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
     with open(_pidfile('ws'), 'w') as f:
         f.write(str(ws.pid))
index 05658b5e5d7f17a4dcd9bd099d81d68950f97a8f..9801a3fd45d5d13ec40bf661c59b4de5156cfeed 100644 (file)
@@ -53,6 +53,11 @@ func (srv *Server) CheckHealth() error {
        return nil
 }
 
+// Done implements service.Handler.
+func (srv *Server) Done() <-chan struct{} {
+       return nil
+}
+
 func (srv *Server) run() {
        var err error
        if srv.RunOptions.Once {
index e0509393cff077e119b6ea7b975d10f90115d536..be2639773650fe5312ef80759cc427bd5cf0c14c 100644 (file)
@@ -132,6 +132,10 @@ func (h *handler) CheckHealth() error {
        return h.err
 }
 
+func (h *handler) Done() <-chan struct{} {
+       return nil
+}
+
 func newHandlerOrErrorHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
        var h handler
        serviceURL, ok := service.URLFromContext(ctx)
index 806c3355da6c693350493a7471bc59e270bfb1e3..6a86cbe7a8307e1683dbd09ea506bc8cd79f52e3 100644 (file)
 // Developer info
 //
 // See https://dev.arvados.org/projects/arvados/wiki/Hacking_websocket_server.
-//
-// Usage
-//
-//     arvados-ws [-legacy-ws-config /etc/arvados/ws/ws.yml] [-dump-config]
-//
-// Options
-//
-// -legacy-ws-config path
-//
-// Load legacy configuration from the given file instead of the default
-// /etc/arvados/ws/ws.yml, legacy config overrides the clusterwide config.yml.
-//
-// -dump-config
-//
-// Print the loaded configuration to stdout and exit.
-//
-// Logs
-//
-// Logs are printed to stderr, formatted as JSON.
-//
-// A log is printed each time a client connects or disconnects.
-//
-// Enable additional logs by configuring:
-//
-//     LogLevel: debug
-//
-// Runtime status
-//
-// GET /debug.json responds with debug stats.
-//
-// GET /status.json responds with health check results and
-// activity/usage metrics.
-package main
+package ws
index ae545c092cf8ddece45cfbebdddb542e08de16b4..c989c0ca559b1a1cff472b2cc1bdb95b4fd021ce 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package ws
 
 import (
        "database/sql"
@@ -11,6 +11,7 @@ import (
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/ghodss/yaml"
+       "github.com/sirupsen/logrus"
 )
 
 type eventSink interface {
@@ -31,6 +32,7 @@ type event struct {
        Serial   uint64
 
        db     *sql.DB
+       logger logrus.FieldLogger
        logRow *arvados.Log
        err    error
        mtx    sync.Mutex
@@ -57,12 +59,12 @@ func (e *event) Detail() *arvados.Log {
                &logRow.CreatedAt,
                &propYAML)
        if e.err != nil {
-               logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("QueryRow failed")
+               e.logger.WithField("LogID", e.LogID).WithError(e.err).Error("QueryRow failed")
                return nil
        }
        e.err = yaml.Unmarshal(propYAML, &logRow.Properties)
        if e.err != nil {
-               logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("yaml decode failed")
+               e.logger.WithField("LogID", e.LogID).WithError(e.err).Error("yaml decode failed")
                return nil
        }
        e.logRow = &logRow
index 3a82bf62b3e9351a95d2abe4c56ae942fededa4c..3593c3aebd58ceae6932e9667eca43aba8a8c0cf 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package ws
 
 import (
        "context"
@@ -11,17 +11,20 @@ import (
        "fmt"
        "strconv"
        "sync"
-       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/stats"
        "github.com/lib/pq"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
 )
 
 type pgEventSource struct {
        DataSource   string
        MaxOpenConns int
        QueueSize    int
+       Logger       logrus.FieldLogger
+       Reg          *prometheus.Registry
 
        db         *sql.DB
        pqListener *pq.Listener
@@ -30,8 +33,8 @@ type pgEventSource struct {
        mtx        sync.Mutex
 
        lastQDelay time.Duration
-       eventsIn   uint64
-       eventsOut  uint64
+       eventsIn   prometheus.Counter
+       eventsOut  prometheus.Counter
 
        cancel func()
 
@@ -39,18 +42,16 @@ type pgEventSource struct {
        ready     chan bool
 }
 
-var _ debugStatuser = (*pgEventSource)(nil)
-
 func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
        if et == pq.ListenerEventConnected {
-               logger(nil).Debug("pgEventSource connected")
+               ps.Logger.Debug("pgEventSource connected")
                return
        }
 
        // Until we have a mechanism for catching up on missed events,
        // we cannot recover from a dropped connection without
        // breaking our promises to clients.
-       logger(nil).
+       ps.Logger.
                WithField("eventType", et).
                WithError(err).
                Error("listener problem")
@@ -59,6 +60,95 @@ func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
 
 func (ps *pgEventSource) setup() {
        ps.ready = make(chan bool)
+       ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "ws",
+                       Name:      "queue_len",
+                       Help:      "Current number of events in queue",
+               }, func() float64 { return float64(len(ps.queue)) }))
+       ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "ws",
+                       Name:      "queue_cap",
+                       Help:      "Event queue capacity",
+               }, func() float64 { return float64(cap(ps.queue)) }))
+       ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "ws",
+                       Name:      "queue_delay",
+                       Help:      "Queue delay of the last emitted event",
+               }, func() float64 { return ps.lastQDelay.Seconds() }))
+       ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "ws",
+                       Name:      "sinks",
+                       Help:      "Number of active sinks (connections)",
+               }, func() float64 { return float64(len(ps.sinks)) }))
+       ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "ws",
+                       Name:      "sinks_blocked",
+                       Help:      "Number of sinks (connections) that are busy and blocking the main event stream",
+               }, func() float64 {
+                       ps.mtx.Lock()
+                       defer ps.mtx.Unlock()
+                       blocked := 0
+                       for sink := range ps.sinks {
+                               blocked += len(sink.channel)
+                       }
+                       return float64(blocked)
+               }))
+       ps.eventsIn = prometheus.NewCounter(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "ws",
+               Name:      "events_in",
+               Help:      "Number of events received from postgresql notify channel",
+       })
+       ps.Reg.MustRegister(ps.eventsIn)
+       ps.eventsOut = prometheus.NewCounter(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "ws",
+               Name:      "events_out",
+               Help:      "Number of events sent to client sessions (before filtering)",
+       })
+       ps.Reg.MustRegister(ps.eventsOut)
+
+       maxConnections := prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "ws",
+               Name:      "db_max_connections",
+               Help:      "Maximum number of open connections to the database",
+       })
+       ps.Reg.MustRegister(maxConnections)
+       openConnections := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "ws",
+               Name:      "db_open_connections",
+               Help:      "Open connections to the database",
+       }, []string{"inuse"})
+       ps.Reg.MustRegister(openConnections)
+
+       updateDBStats := func() {
+               stats := ps.db.Stats()
+               maxConnections.Set(float64(stats.MaxOpenConnections))
+               openConnections.WithLabelValues("0").Set(float64(stats.Idle))
+               openConnections.WithLabelValues("1").Set(float64(stats.InUse))
+       }
+       go func() {
+               <-ps.ready
+               if ps.db == nil {
+                       return
+               }
+               updateDBStats()
+               for range time.Tick(time.Second) {
+                       updateDBStats()
+               }
+       }()
 }
 
 // Close stops listening for new events and disconnects all clients.
@@ -76,8 +166,8 @@ func (ps *pgEventSource) WaitReady() {
 // Run listens for event notifications on the "logs" channel and sends
 // them to all subscribers.
 func (ps *pgEventSource) Run() {
-       logger(nil).Debug("pgEventSource Run starting")
-       defer logger(nil).Debug("pgEventSource Run finished")
+       ps.Logger.Debug("pgEventSource Run starting")
+       defer ps.Logger.Debug("pgEventSource Run finished")
 
        ps.setupOnce.Do(ps.setup)
        ready := ps.ready
@@ -103,15 +193,15 @@ func (ps *pgEventSource) Run() {
 
        db, err := sql.Open("postgres", ps.DataSource)
        if err != nil {
-               logger(nil).WithError(err).Error("sql.Open failed")
+               ps.Logger.WithError(err).Error("sql.Open failed")
                return
        }
        if ps.MaxOpenConns <= 0 {
-               logger(nil).Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file")
+               ps.Logger.Warn("no database connection limit configured -- consider setting PostgreSQL.ConnectionPool>0 in arvados-ws configuration file")
        }
        db.SetMaxOpenConns(ps.MaxOpenConns)
        if err = db.Ping(); err != nil {
-               logger(nil).WithError(err).Error("db.Ping failed")
+               ps.Logger.WithError(err).Error("db.Ping failed")
                return
        }
        ps.db = db
@@ -119,11 +209,11 @@ func (ps *pgEventSource) Run() {
        ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
        err = ps.pqListener.Listen("logs")
        if err != nil {
-               logger(nil).WithError(err).Error("pq Listen failed")
+               ps.Logger.WithError(err).Error("pq Listen failed")
                return
        }
        defer ps.pqListener.Close()
-       logger(nil).Debug("pq Listen setup done")
+       ps.Logger.Debug("pq Listen setup done")
 
        close(ready)
        // Avoid double-close in deferred func
@@ -141,7 +231,7 @@ func (ps *pgEventSource) Run() {
                        // client_count X client_queue_size.
                        e.Detail()
 
-                       logger(nil).
+                       ps.Logger.
                                WithField("serial", e.Serial).
                                WithField("detail", e.Detail()).
                                Debug("event ready")
@@ -149,9 +239,9 @@ func (ps *pgEventSource) Run() {
                        ps.lastQDelay = e.Ready.Sub(e.Received)
 
                        ps.mtx.Lock()
-                       atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
                        for sink := range ps.sinks {
                                sink.channel <- e
+                               ps.eventsOut.Inc()
                        }
                        ps.mtx.Unlock()
                }
@@ -163,11 +253,11 @@ func (ps *pgEventSource) Run() {
        for {
                select {
                case <-ctx.Done():
-                       logger(nil).Debug("ctx done")
+                       ps.Logger.Debug("ctx done")
                        return
 
                case <-ticker.C:
-                       logger(nil).Debug("listener ping")
+                       ps.Logger.Debug("listener ping")
                        err := ps.pqListener.Ping()
                        if err != nil {
                                ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err))
@@ -176,7 +266,7 @@ func (ps *pgEventSource) Run() {
 
                case pqEvent, ok := <-ps.pqListener.Notify:
                        if !ok {
-                               logger(nil).Error("pqListener Notify chan closed")
+                               ps.Logger.Error("pqListener Notify chan closed")
                                return
                        }
                        if pqEvent == nil {
@@ -188,12 +278,12 @@ func (ps *pgEventSource) Run() {
                                continue
                        }
                        if pqEvent.Channel != "logs" {
-                               logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
+                               ps.Logger.WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
                                continue
                        }
                        logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
                        if err != nil {
-                               logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
+                               ps.Logger.WithField("pqEvent", pqEvent).Error("bad notify payload")
                                continue
                        }
                        serial++
@@ -202,9 +292,10 @@ func (ps *pgEventSource) Run() {
                                Received: time.Now(),
                                Serial:   serial,
                                db:       ps.db,
+                               logger:   ps.Logger,
                        }
-                       logger(nil).WithField("event", e).Debug("incoming")
-                       atomic.AddUint64(&ps.eventsIn, 1)
+                       ps.Logger.WithField("event", e).Debug("incoming")
+                       ps.eventsIn.Inc()
                        ps.queue <- e
                        go e.Detail()
                }
@@ -238,6 +329,9 @@ func (ps *pgEventSource) DB() *sql.DB {
 }
 
 func (ps *pgEventSource) DBHealth() error {
+       if ps.db == nil {
+               return errors.New("database not connected")
+       }
        ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
        defer cancel()
        var i int
@@ -252,8 +346,6 @@ func (ps *pgEventSource) DebugStatus() interface{} {
                blocked += len(sink.channel)
        }
        return map[string]interface{}{
-               "EventsIn":     atomic.LoadUint64(&ps.eventsIn),
-               "EventsOut":    atomic.LoadUint64(&ps.eventsOut),
                "Queue":        len(ps.queue),
                "QueueLimit":   cap(ps.queue),
                "QueueDelay":   stats.Duration(ps.lastQDelay),
index 98a9e8b9785b40dbd8f5314bcedb98bd083efe44..b7b8ac3006f3fa6af19de31737af82129dbf8642 100644 (file)
@@ -2,17 +2,17 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package ws
 
 import (
        "database/sql"
        "fmt"
-       "os"
-       "path/filepath"
        "sync"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -21,7 +21,7 @@ var _ = check.Suite(&eventSourceSuite{})
 type eventSourceSuite struct{}
 
 func testDBConfig() arvados.PostgreSQLConnection {
-       cfg, err := arvados.GetConfig(filepath.Join(os.Getenv("WORKSPACE"), "tmp", "arvados.yml"))
+       cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
        if err != nil {
                panic(err)
        }
@@ -46,6 +46,8 @@ func (*eventSourceSuite) TestEventSource(c *check.C) {
        pges := &pgEventSource{
                DataSource: cfg.String(),
                QueueSize:  4,
+               Logger:     ctxlog.TestLogger(c),
+               Reg:        prometheus.NewRegistry(),
        }
        go pges.Run()
        sinks := make([]eventSink, 18)
index dc324464ec3d15f4b473b5d9b91f3557c7a90abd..4665dfcd9ee9208fcb71794189ba115d0285fa55 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package ws
 
 import check "gopkg.in/check.v1"
 
index ea8dfc30c94c94e19308192c8c6713f745ce3a9b..df1ca7ab31c292280ab8a72c2f56155ef4c68e84 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package ws
 
 import (
        "testing"
@@ -13,3 +13,7 @@ import (
 func TestGocheck(t *testing.T) {
        check.TestingT(t)
 }
+
+func init() {
+       testMode = true
+}
index 913b1ee8000cbd274039483df70bad7896d52df5..912643ad97c6374006b3fd4b00f90d340157d687 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package ws
 
 import (
        "context"
@@ -12,6 +12,7 @@ import (
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/stats"
+       "github.com/sirupsen/logrus"
 )
 
 type handler struct {
@@ -31,12 +32,11 @@ type handlerStats struct {
        EventCount   uint64
 }
 
-func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) {
+func (h *handler) Handle(ws wsConn, logger logrus.FieldLogger, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) {
        h.setupOnce.Do(h.setup)
 
        ctx, cancel := context.WithCancel(ws.Request().Context())
        defer cancel()
-       log := logger(ctx)
 
        incoming := eventSource.NewSink()
        defer incoming.Stop()
@@ -53,7 +53,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
 
        sess, err := newSession(ws, queue)
        if err != nil {
-               log.WithError(err).Error("newSession failed")
+               logger.WithError(err).Error("newSession failed")
                return
        }
 
@@ -71,19 +71,19 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                        ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
                        n, err := ws.Read(buf)
                        buf := buf[:n]
-                       log.WithField("frame", string(buf[:n])).Debug("received frame")
+                       logger.WithField("frame", string(buf[:n])).Debug("received frame")
                        if err == nil && n == cap(buf) {
                                err = errFrameTooBig
                        }
                        if err != nil {
                                if err != io.EOF && ctx.Err() == nil {
-                                       log.WithError(err).Info("read error")
+                                       logger.WithError(err).Info("read error")
                                }
                                return
                        }
                        err = sess.Receive(buf)
                        if err != nil {
-                               log.WithError(err).Error("sess.Receive() failed")
+                               logger.WithError(err).Error("sess.Receive() failed")
                                return
                        }
                }
@@ -108,38 +108,38 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                        var e *event
                        var buf []byte
                        var err error
-                       log := log
+                       logger := logger
 
                        switch data := data.(type) {
                        case []byte:
                                buf = data
                        case *event:
                                e = data
-                               log = log.WithField("serial", e.Serial)
+                               logger = logger.WithField("serial", e.Serial)
                                buf, err = sess.EventMessage(e)
                                if err != nil {
-                                       log.WithError(err).Error("EventMessage failed")
+                                       logger.WithError(err).Error("EventMessage failed")
                                        return
                                } else if len(buf) == 0 {
-                                       log.Debug("skip")
+                                       logger.Debug("skip")
                                        continue
                                }
                        default:
-                               log.WithField("data", data).Error("bad object in client queue")
+                               logger.WithField("data", data).Error("bad object in client queue")
                                continue
                        }
 
-                       log.WithField("frame", string(buf)).Debug("send event")
+                       logger.WithField("frame", string(buf)).Debug("send event")
                        ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
                        t0 := time.Now()
                        _, err = ws.Write(buf)
                        if err != nil {
                                if ctx.Err() == nil {
-                                       log.WithError(err).Error("write failed")
+                                       logger.WithError(err).Error("write failed")
                                }
                                return
                        }
-                       log.Debug("sent")
+                       logger.Debug("sent")
 
                        if e != nil {
                                hStats.QueueDelayNs += t0.Sub(e.Ready)
@@ -189,7 +189,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                select {
                                case queue <- e:
                                default:
-                                       log.WithError(errQueueFull).Error("terminate")
+                                       logger.WithError(errQueueFull).Error("terminate")
                                        return
                                }
                        }
diff --git a/services/ws/main.go b/services/ws/main.go
deleted file mode 100644 (file)
index 5b42c44..0000000
+++ /dev/null
@@ -1,77 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
-       "flag"
-       "fmt"
-       "os"
-
-       "git.arvados.org/arvados.git/lib/config"
-       "git.arvados.org/arvados.git/sdk/go/arvados"
-       "git.arvados.org/arvados.git/sdk/go/ctxlog"
-       "github.com/ghodss/yaml"
-       "github.com/sirupsen/logrus"
-)
-
-var logger = ctxlog.FromContext
-var version = "dev"
-
-func configure(log logrus.FieldLogger, args []string) *arvados.Cluster {
-       flags := flag.NewFlagSet(args[0], flag.ExitOnError)
-       dumpConfig := flags.Bool("dump-config", false, "show current configuration and exit")
-       getVersion := flags.Bool("version", false, "Print version information and exit.")
-
-       loader := config.NewLoader(nil, log)
-       loader.SetupFlags(flags)
-       args = loader.MungeLegacyConfigArgs(log, args[1:], "-legacy-ws-config")
-
-       flags.Parse(args)
-
-       // Print version information if requested
-       if *getVersion {
-               fmt.Printf("arvados-ws %s\n", version)
-               return nil
-       }
-
-       cfg, err := loader.Load()
-       if err != nil {
-               log.Fatal(err)
-       }
-
-       cluster, err := cfg.GetCluster("")
-       if err != nil {
-               log.Fatal(err)
-       }
-
-       ctxlog.SetLevel(cluster.SystemLogs.LogLevel)
-       ctxlog.SetFormat(cluster.SystemLogs.Format)
-
-       if *dumpConfig {
-               out, err := yaml.Marshal(cfg)
-               if err != nil {
-                       log.Fatal(err)
-               }
-               _, err = os.Stdout.Write(out)
-               if err != nil {
-                       log.Fatal(err)
-               }
-               return nil
-       }
-       return cluster
-}
-
-func main() {
-       log := logger(nil)
-
-       cluster := configure(log, os.Args)
-       if cluster == nil {
-               return
-       }
-
-       log.Printf("arvados-ws %s started", version)
-       srv := &server{cluster: cluster}
-       log.Fatal(srv.Run())
-}
index 745d28f9523f36ca83afa0b29e9511e6f98176f9..ac895f80e5fd7ae7933558fbfa6e6acb97a6c7b0 100644 (file)
@@ -2,14 +2,16 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package ws
 
 import (
+       "context"
        "net/http"
        "net/url"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
 )
 
 const (
@@ -19,7 +21,7 @@ const (
 
 type permChecker interface {
        SetToken(token string)
-       Check(uuid string) (bool, error)
+       Check(ctx context.Context, uuid string) (bool, error)
 }
 
 func newPermChecker(ac arvados.Client) permChecker {
@@ -54,9 +56,9 @@ func (pc *cachingPermChecker) SetToken(token string) {
        pc.cache = make(map[string]cacheEnt)
 }
 
-func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
+func (pc *cachingPermChecker) Check(ctx context.Context, uuid string) (bool, error) {
        pc.nChecks++
-       logger := logger(nil).
+       logger := ctxlog.FromContext(ctx).
                WithField("token", pc.Client.AuthToken).
                WithField("uuid", uuid)
        pc.tidy()
index 5f972551ffe8ffeaa4e11ec81573ae46425591d3..023656c01fd93dc3a912283682ffc9eda59c7e6b 100644 (file)
@@ -2,9 +2,11 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package ws
 
 import (
+       "context"
+
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
        check "gopkg.in/check.v1"
@@ -22,19 +24,19 @@ func (s *permSuite) TestCheck(c *check.C) {
        }
        wantError := func(uuid string) {
                c.Log(uuid)
-               ok, err := pc.Check(uuid)
+               ok, err := pc.Check(context.Background(), uuid)
                c.Check(ok, check.Equals, false)
                c.Check(err, check.NotNil)
        }
        wantYes := func(uuid string) {
                c.Log(uuid)
-               ok, err := pc.Check(uuid)
+               ok, err := pc.Check(context.Background(), uuid)
                c.Check(ok, check.Equals, true)
                c.Check(err, check.IsNil)
        }
        wantNo := func(uuid string) {
                c.Log(uuid)
-               ok, err := pc.Check(uuid)
+               ok, err := pc.Check(context.Background(), uuid)
                c.Check(ok, check.Equals, false)
                c.Check(err, check.IsNil)
        }
@@ -67,7 +69,7 @@ func (s *permSuite) TestCheck(c *check.C) {
        pc.SetToken(arvadostest.ActiveToken)
 
        c.Log("...network error")
-       pc.Client.APIHost = "127.0.0.1:discard"
+       pc.Client.APIHost = "127.0.0.1:9"
        wantError(arvadostest.UserAgreementCollection)
        wantError(arvadostest.FooBarDirCollection)
 
index f8c273c5141b6f76f73b28c3c2c5d995f0df94dd..878c282f8a6c57f17192b777faba760485757b86 100644 (file)
@@ -2,13 +2,11 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package ws
 
 import (
-       "encoding/json"
        "io"
        "net/http"
-       "strconv"
        "sync"
        "sync/atomic"
        "time"
@@ -16,6 +14,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/health"
+       "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        "golang.org/x/net/websocket"
 )
@@ -28,7 +27,7 @@ type wsConn interface {
 }
 
 type router struct {
-       client         arvados.Client
+       client         *arvados.Client
        cluster        *arvados.Cluster
        eventSource    eventSource
        newPermChecker func() permChecker
@@ -36,33 +35,26 @@ type router struct {
        handler   *handler
        mux       *http.ServeMux
        setupOnce sync.Once
-
-       lastReqID  int64
-       lastReqMtx sync.Mutex
-
-       status routerDebugStatus
-}
-
-type routerDebugStatus struct {
-       ReqsReceived int64
-       ReqsActive   int64
-}
-
-type debugStatuser interface {
-       DebugStatus() interface{}
+       done      chan struct{}
+       reg       *prometheus.Registry
 }
 
 func (rtr *router) setup() {
+       mSockets := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "ws",
+               Name:      "sockets",
+               Help:      "Number of connected sockets",
+       }, []string{"version"})
+       rtr.reg.MustRegister(mSockets)
+
        rtr.handler = &handler{
                PingTimeout: time.Duration(rtr.cluster.API.SendTimeout),
                QueueSize:   rtr.cluster.API.WebsocketClientEventQueue,
        }
        rtr.mux = http.NewServeMux()
-       rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0))
-       rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1))
-       rtr.mux.Handle("/debug.json", rtr.jsonHandler(rtr.DebugStatus))
-       rtr.mux.Handle("/status.json", rtr.jsonHandler(rtr.Status))
-
+       rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0, mSockets.WithLabelValues("0")))
+       rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1, mSockets.WithLabelValues("1")))
        rtr.mux.Handle("/_health/", &health.Handler{
                Token:  rtr.cluster.ManagementToken,
                Prefix: "/_health/",
@@ -71,91 +63,50 @@ func (rtr *router) setup() {
                },
                Log: func(r *http.Request, err error) {
                        if err != nil {
-                               logger(r.Context()).WithError(err).Error("error")
+                               ctxlog.FromContext(r.Context()).WithError(err).Error("error")
                        }
                },
        })
 }
 
-func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
+func (rtr *router) makeServer(newSession sessionFactory, gauge prometheus.Gauge) *websocket.Server {
+       var connected int64
        return &websocket.Server{
                Handshake: func(c *websocket.Config, r *http.Request) error {
                        return nil
                },
                Handler: websocket.Handler(func(ws *websocket.Conn) {
                        t0 := time.Now()
-                       log := logger(ws.Request().Context())
-                       log.Info("connected")
+                       logger := ctxlog.FromContext(ws.Request().Context())
+                       atomic.AddInt64(&connected, 1)
+                       gauge.Set(float64(atomic.LoadInt64(&connected)))
 
-                       stats := rtr.handler.Handle(ws, rtr.eventSource,
+                       stats := rtr.handler.Handle(ws, logger, rtr.eventSource,
                                func(ws wsConn, sendq chan<- interface{}) (session, error) {
-                                       return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker(), &rtr.client)
+                                       return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker(), rtr.client)
                                })
 
-                       log.WithFields(logrus.Fields{
+                       logger.WithFields(logrus.Fields{
                                "elapsed": time.Now().Sub(t0).Seconds(),
                                "stats":   stats,
-                       }).Info("disconnect")
+                       }).Info("client disconnected")
                        ws.Close()
+                       atomic.AddInt64(&connected, -1)
+                       gauge.Set(float64(atomic.LoadInt64(&connected)))
                }),
        }
 }
 
-func (rtr *router) newReqID() string {
-       rtr.lastReqMtx.Lock()
-       defer rtr.lastReqMtx.Unlock()
-       id := time.Now().UnixNano()
-       if id <= rtr.lastReqID {
-               id = rtr.lastReqID + 1
-       }
-       return strconv.FormatInt(id, 36)
-}
-
-func (rtr *router) DebugStatus() interface{} {
-       s := map[string]interface{}{
-               "HTTP":     rtr.status,
-               "Outgoing": rtr.handler.DebugStatus(),
-       }
-       if es, ok := rtr.eventSource.(debugStatuser); ok {
-               s["EventSource"] = es.DebugStatus()
-       }
-       return s
-}
-
-func (rtr *router) Status() interface{} {
-       return map[string]interface{}{
-               "Clients": atomic.LoadInt64(&rtr.status.ReqsActive),
-               "Version": version,
-       }
-}
-
 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        rtr.setupOnce.Do(rtr.setup)
-       atomic.AddInt64(&rtr.status.ReqsReceived, 1)
-       atomic.AddInt64(&rtr.status.ReqsActive, 1)
-       defer atomic.AddInt64(&rtr.status.ReqsActive, -1)
-
-       logger := logger(req.Context()).
-               WithField("RequestID", rtr.newReqID())
-       ctx := ctxlog.Context(req.Context(), logger)
-       req = req.WithContext(ctx)
-       logger.WithFields(logrus.Fields{
-               "remoteAddr":      req.RemoteAddr,
-               "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
-       }).Info("accept request")
        rtr.mux.ServeHTTP(resp, req)
 }
 
-func (rtr *router) jsonHandler(fn func() interface{}) http.Handler {
-       return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-               logger := logger(r.Context())
-               w.Header().Set("Content-Type", "application/json")
-               enc := json.NewEncoder(w)
-               err := enc.Encode(fn())
-               if err != nil {
-                       msg := "encode failed"
-                       logger.WithError(err).Error(msg)
-                       http.Error(w, msg, http.StatusInternalServerError)
-               }
-       })
+func (rtr *router) CheckHealth() error {
+       rtr.setupOnce.Do(rtr.setup)
+       return rtr.eventSource.DBHealth()
+}
+
+func (rtr *router) Done() <-chan struct{} {
+       return rtr.done
 }
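
The router exports connected-socket counts as a GaugeVec labeled by protocol version, setting the gauge from an atomic per-label counter on connect and disconnect. A compact sketch of the same pattern (illustrative names, not the metric registered above):

package example

import (
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

// connectionGauge sketches the sockets metric pattern: a GaugeVec
// labeled by protocol version, updated from an atomic counter as
// clients connect and disconnect.
func connectionGauge(reg *prometheus.Registry) (connect, disconnect func()) {
	sockets := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "example",
		Subsystem: "ws",
		Name:      "sockets",
		Help:      "Number of connected sockets",
	}, []string{"version"})
	reg.MustRegister(sockets)
	gauge := sockets.WithLabelValues("0")
	var n int64
	connect = func() { gauge.Set(float64(atomic.AddInt64(&n, 1))) }
	disconnect = func() { gauge.Set(float64(atomic.AddInt64(&n, -1))) }
	return
}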
diff --git a/services/ws/server.go b/services/ws/server.go
deleted file mode 100644 (file)
index 9747ea1..0000000
+++ /dev/null
@@ -1,89 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
-       "net"
-       "net/http"
-       "sync"
-       "time"
-
-       "git.arvados.org/arvados.git/sdk/go/arvados"
-       "github.com/coreos/go-systemd/daemon"
-)
-
-type server struct {
-       httpServer  *http.Server
-       listener    net.Listener
-       cluster     *arvados.Cluster
-       eventSource *pgEventSource
-       setupOnce   sync.Once
-}
-
-func (srv *server) Close() {
-       srv.WaitReady()
-       srv.eventSource.Close()
-       srv.httpServer.Close()
-       srv.listener.Close()
-}
-
-func (srv *server) WaitReady() {
-       srv.setupOnce.Do(srv.setup)
-       srv.eventSource.WaitReady()
-}
-
-func (srv *server) Run() error {
-       srv.setupOnce.Do(srv.setup)
-       return srv.httpServer.Serve(srv.listener)
-}
-
-func (srv *server) setup() {
-       log := logger(nil)
-
-       var listen arvados.URL
-       for listen, _ = range srv.cluster.Services.Websocket.InternalURLs {
-               break
-       }
-       ln, err := net.Listen("tcp", listen.Host)
-       if err != nil {
-               log.WithField("Listen", listen).Fatal(err)
-       }
-       log.WithField("Listen", ln.Addr().String()).Info("listening")
-
-       client := arvados.Client{}
-       client.APIHost = srv.cluster.Services.Controller.ExternalURL.Host
-       client.AuthToken = srv.cluster.SystemRootToken
-       client.Insecure = srv.cluster.TLS.Insecure
-
-       srv.listener = ln
-       srv.eventSource = &pgEventSource{
-               DataSource:   srv.cluster.PostgreSQL.Connection.String(),
-               MaxOpenConns: srv.cluster.PostgreSQL.ConnectionPool,
-               QueueSize:    srv.cluster.API.WebsocketServerEventQueue,
-       }
-
-       srv.httpServer = &http.Server{
-               Addr:           listen.Host,
-               ReadTimeout:    time.Minute,
-               WriteTimeout:   time.Minute,
-               MaxHeaderBytes: 1 << 20,
-               Handler: &router{
-                       cluster:        srv.cluster,
-                       client:         client,
-                       eventSource:    srv.eventSource,
-                       newPermChecker: func() permChecker { return newPermChecker(client) },
-               },
-       }
-
-       go func() {
-               srv.eventSource.Run()
-               log.Info("event source stopped")
-               srv.Close()
-       }()
-
-       if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
-               log.WithError(err).Warn("error notifying init daemon")
-       }
-}
diff --git a/services/ws/service.go b/services/ws/service.go
new file mode 100644 (file)
index 0000000..761e22e
--- /dev/null
@@ -0,0 +1,53 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package ws
+
+import (
+       "context"
+       "fmt"
+
+       "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/service"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/prometheus/client_golang/prometheus"
+)
+
+var testMode = false
+
+var Command cmd.Handler = service.Command(arvados.ServiceNameWebsocket, newHandler)
+
+func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
+       client, err := arvados.NewClientFromConfig(cluster)
+       if err != nil {
+               return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
+       }
+       eventSource := &pgEventSource{
+               DataSource:   cluster.PostgreSQL.Connection.String(),
+               MaxOpenConns: cluster.PostgreSQL.ConnectionPool,
+               QueueSize:    cluster.API.WebsocketServerEventQueue,
+               Logger:       ctxlog.FromContext(ctx),
+               Reg:          reg,
+       }
+       done := make(chan struct{})
+       go func() {
+               eventSource.Run()
+               ctxlog.FromContext(ctx).Error("event source stopped")
+               close(done)
+       }()
+       eventSource.WaitReady()
+       if err := eventSource.DBHealth(); err != nil {
+               return service.ErrorHandler(ctx, cluster, err)
+       }
+       rtr := &router{
+               cluster:        cluster,
+               client:         client,
+               eventSource:    eventSource,
+               newPermChecker: func() permChecker { return newPermChecker(*client) },
+               done:           done,
+               reg:            reg,
+       }
+       return rtr
+}
similarity index 55%
rename from services/ws/server_test.go
rename to services/ws/service_test.go
index 88279ec9b2de83cd28bc191815bd1fa274cfec80..7213dcad2a9ddbb967991d70a2f9b094ce317b98 100644 (file)
@@ -2,39 +2,61 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package ws
 
 import (
-       "encoding/json"
+       "bytes"
+       "context"
+       "flag"
        "io/ioutil"
        "net/http"
+       "net/http/httptest"
        "os"
+       "strings"
        "sync"
        "time"
 
        "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/lib/service"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "git.arvados.org/arvados.git/sdk/go/httpserver"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
 )
 
-var _ = check.Suite(&serverSuite{})
+var _ = check.Suite(&serviceSuite{})
 
-type serverSuite struct {
+type serviceSuite struct {
+       handler service.Handler
+       reg     *prometheus.Registry
+       srv     *httptest.Server
        cluster *arvados.Cluster
-       srv     *server
        wg      sync.WaitGroup
 }
 
-func (s *serverSuite) SetUpTest(c *check.C) {
+func (s *serviceSuite) SetUpTest(c *check.C) {
        var err error
        s.cluster, err = s.testConfig(c)
        c.Assert(err, check.IsNil)
-       s.srv = &server{cluster: s.cluster}
 }
 
-func (*serverSuite) testConfig(c *check.C) (*arvados.Cluster, error) {
+func (s *serviceSuite) start(c *check.C) {
+       s.reg = prometheus.NewRegistry()
+       s.handler = newHandler(context.Background(), s.cluster, "", s.reg)
+       instrumented := httpserver.Instrument(s.reg, ctxlog.TestLogger(c), s.handler)
+       s.srv = httptest.NewServer(instrumented.ServeAPI(s.cluster.ManagementToken, instrumented))
+}
+
+func (s *serviceSuite) TearDownTest(c *check.C) {
+       if s.srv != nil {
+               s.srv.Close()
+       }
+}
+
+func (*serviceSuite) testConfig(c *check.C) (*arvados.Cluster, error) {
        ldr := config.NewLoader(nil, ctxlog.TestLogger(c))
        cfg, err := ldr.Load()
        if err != nil {
@@ -49,47 +71,30 @@ func (*serverSuite) testConfig(c *check.C) (*arvados.Cluster, error) {
        cluster.SystemRootToken = client.AuthToken
        cluster.TLS.Insecure = client.Insecure
        cluster.PostgreSQL.Connection = testDBConfig()
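+       // Use a distinctive pool size so TestMetrics can assert on
+       // arvados_ws_db_max_connections.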
+       cluster.PostgreSQL.ConnectionPool = 12
        cluster.Services.Websocket.InternalURLs = map[arvados.URL]arvados.ServiceInstance{arvados.URL{Host: ":"}: arvados.ServiceInstance{}}
        cluster.ManagementToken = arvadostest.ManagementToken
        return cluster, nil
 }
 
-// TestBadDB ensures Run() returns an error (instead of panicking or
-// deadlocking) if it can't connect to the database server at startup.
-func (s *serverSuite) TestBadDB(c *check.C) {
+// TestBadDB ensures the server returns an error (instead of panicking
+// or deadlocking) if it can't connect to the database server at
+// startup.
+func (s *serviceSuite) TestBadDB(c *check.C) {
        s.cluster.PostgreSQL.Connection["password"] = "1234"
-
-       var wg sync.WaitGroup
-       wg.Add(1)
-       go func() {
-               err := s.srv.Run()
-               c.Check(err, check.NotNil)
-               wg.Done()
-       }()
-       wg.Add(1)
-       go func() {
-               s.srv.WaitReady()
-               wg.Done()
-       }()
-
-       done := make(chan bool)
-       go func() {
-               wg.Wait()
-               close(done)
-       }()
-       select {
-       case <-done:
-       case <-time.After(10 * time.Second):
-               c.Fatal("timeout")
-       }
+       s.start(c)
+       resp, err := http.Get(s.srv.URL)
+       c.Check(err, check.IsNil)
+       c.Check(resp.StatusCode, check.Equals, http.StatusInternalServerError)
+       c.Check(s.handler.CheckHealth(), check.ErrorMatches, "database not connected")
 }
 
-func (s *serverSuite) TestHealth(c *check.C) {
-       go s.srv.Run()
-       defer s.srv.Close()
-       s.srv.WaitReady()
+func (s *serviceSuite) TestHealth(c *check.C) {
+       s.start(c)
        for _, token := range []string{"", "foo", s.cluster.ManagementToken} {
-               req, err := http.NewRequest("GET", "http://"+s.srv.listener.Addr().String()+"/_health/ping", nil)
+               req, err := http.NewRequest("GET", s.srv.URL+"/_health/ping", nil)
                c.Assert(err, check.IsNil)
                if token != "" {
                        req.Header.Add("Authorization", "Bearer "+token)
@@ -107,30 +112,38 @@ func (s *serverSuite) TestHealth(c *check.C) {
        }
 }
 
-func (s *serverSuite) TestStatus(c *check.C) {
-       go s.srv.Run()
-       defer s.srv.Close()
-       s.srv.WaitReady()
-       req, err := http.NewRequest("GET", "http://"+s.srv.listener.Addr().String()+"/status.json", nil)
-       c.Assert(err, check.IsNil)
-       resp, err := http.DefaultClient.Do(req)
-       c.Check(err, check.IsNil)
-       c.Check(resp.StatusCode, check.Equals, http.StatusOK)
-       var status map[string]interface{}
-       err = json.NewDecoder(resp.Body).Decode(&status)
-       c.Check(err, check.IsNil)
-       c.Check(status["Version"], check.Not(check.Equals), "")
+func (s *serviceSuite) TestMetrics(c *check.C) {
+       s.start(c)
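+       // Ping the database once so the connection pool exists and its
+       // stats can be reported in the metrics checked below.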
+       s.handler.CheckHealth()
+       for deadline := time.Now().Add(time.Second); ; {
+               req, err := http.NewRequest("GET", s.srv.URL+"/metrics", nil)
+               c.Assert(err, check.IsNil)
+               req.Header.Set("Authorization", "Bearer "+s.cluster.ManagementToken)
+               resp, err := http.DefaultClient.Do(req)
+               c.Check(err, check.IsNil)
+               c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+               text, err := ioutil.ReadAll(resp.Body)
+               c.Check(err, check.IsNil)
+               if strings.Contains(string(text), "_db_max_connections 0\n") {
+                       // wait for the first db stats update
+                       if time.Now().After(deadline) {
+                               c.Fatal("timed out")
+                       }
+                       time.Sleep(time.Second / 50)
+                       continue
+               }
+               c.Check(string(text), check.Matches, `(?ms).*\narvados_ws_db_max_connections 12\n.*`)
+               c.Check(string(text), check.Matches, `(?ms).*\narvados_ws_db_open_connections\{inuse="0"\} \d+\n.*`)
+               c.Check(string(text), check.Matches, `(?ms).*\narvados_ws_db_open_connections\{inuse="1"\} \d+\n.*`)
+               break
+       }
 }
 
-func (s *serverSuite) TestHealthDisabled(c *check.C) {
+func (s *serviceSuite) TestHealthDisabled(c *check.C) {
        s.cluster.ManagementToken = ""
-
-       go s.srv.Run()
-       defer s.srv.Close()
-       s.srv.WaitReady()
-
+       s.start(c)
        for _, token := range []string{"", "foo", arvadostest.ManagementToken} {
-               req, err := http.NewRequest("GET", "http://"+s.srv.listener.Addr().String()+"/_health/ping", nil)
+               req, err := http.NewRequest("GET", s.srv.URL+"/_health/ping", nil)
                c.Assert(err, check.IsNil)
                req.Header.Add("Authorization", "Bearer "+token)
                resp, err := http.DefaultClient.Do(req)
@@ -139,7 +152,7 @@ func (s *serverSuite) TestHealthDisabled(c *check.C) {
        }
 }
 
-func (s *serverSuite) TestLoadLegacyConfig(c *check.C) {
+func (s *serviceSuite) TestLoadLegacyConfig(c *check.C) {
        content := []byte(`
 Client:
   APIHost: example.com
@@ -175,7 +188,14 @@ ManagementToken: qqqqq
                c.Error(err)
 
        }
-       cluster := configure(logger(nil), []string{"arvados-ws", "-config", tmpfile.Name()})
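+       // Load the legacy file the way the command line does:
+       // MungeLegacyConfigArgs rewrites -config into the loader's
+       // -legacy-ws-config flag.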
+       ldr := config.NewLoader(&bytes.Buffer{}, logrus.New())
+       flagset := flag.NewFlagSet("", flag.ContinueOnError)
+       ldr.SetupFlags(flagset)
+       flagset.Parse(ldr.MungeLegacyConfigArgs(ctxlog.TestLogger(c), []string{"-config", tmpfile.Name()}, "-legacy-ws-config"))
+       cfg, err := ldr.Load()
+       c.Check(err, check.IsNil)
+       cluster, err := cfg.GetCluster("")
+       c.Check(err, check.IsNil)
        c.Check(cluster, check.NotNil)
 
        c.Check(cluster.Services.Controller.ExternalURL, check.Equals, arvados.URL{Scheme: "https", Host: "example.com"})
index 53b02146d560fe3eb4d045227277d60a8c6e072b..c0cfbd6d02f6ff37083f426c85084effae45f212 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package ws
 
 import (
        "database/sql"
index b0f40371ffeb0ba12c5d3d1e1326d320fb6dbb51..309352b39edbd329aa031ec0c6194791341acec9 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package ws
 
 import (
        "database/sql"
@@ -14,6 +14,7 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/sirupsen/logrus"
 )
 
@@ -59,7 +60,7 @@ func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecke
                db:          db,
                ac:          ac,
                permChecker: pc,
-               log:         logger(ws.Request().Context()),
+               log:         ctxlog.FromContext(ws.Request().Context()),
        }
 
        err := ws.Request().ParseForm()
@@ -128,7 +129,7 @@ func (sess *v0session) EventMessage(e *event) ([]byte, error) {
        } else {
                permTarget = detail.ObjectUUID
        }
-       ok, err := sess.permChecker.Check(permTarget)
+       ok, err := sess.permChecker.Check(sess.ws.Request().Context(), permTarget)
        if err != nil || !ok {
                return nil, err
        }
index bd70b44459dd79b5f22b0c08074b2d4bf480d76f..7986cc7b08f95598ae4756be0aa1ca3dea2e2f7b 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package ws
 
 import (
        "bytes"
@@ -11,6 +11,7 @@ import (
        "io"
        "net/url"
        "os"
+       "strings"
        "sync"
        "time"
 
@@ -30,17 +31,16 @@ func init() {
 var _ = check.Suite(&v0Suite{})
 
 type v0Suite struct {
-       serverSuite serverSuite
-       token       string
-       toDelete    []string
-       wg          sync.WaitGroup
-       ignoreLogID uint64
+       serviceSuite serviceSuite
+       token        string
+       toDelete     []string
+       wg           sync.WaitGroup
+       ignoreLogID  uint64
 }
 
 func (s *v0Suite) SetUpTest(c *check.C) {
-       s.serverSuite.SetUpTest(c)
-       go s.serverSuite.srv.Run()
-       s.serverSuite.srv.WaitReady()
+       s.serviceSuite.SetUpTest(c)
+       s.serviceSuite.start(c)
 
        s.token = arvadostest.ActiveToken
        s.ignoreLogID = s.lastLogID(c)
@@ -48,7 +48,7 @@ func (s *v0Suite) SetUpTest(c *check.C) {
 
 func (s *v0Suite) TearDownTest(c *check.C) {
        s.wg.Wait()
-       s.serverSuite.srv.Close()
+       s.serviceSuite.TearDownTest(c)
 }
 
 func (s *v0Suite) TearDownSuite(c *check.C) {
@@ -353,8 +353,8 @@ func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
 }
 
 func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder) {
-       srv := s.serverSuite.srv
-       conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
+       srv := s.serviceSuite.srv
+       conn, err := websocket.Dial(strings.Replace(srv.URL, "http", "ws", 1)+"/websocket?api_token="+s.token, "", srv.URL)
        if err != nil {
                panic(err)
        }
index 58f77df430201f79e71f66209711a740dff8a016..60b980d58e2f8f8a9acc67362deb7d7beff21350 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package ws
 
 import (
        "database/sql"
index d2585a6666c270de61514d007257d648eecd2287..efa2e08a7a7f34c3a04ee4c213931ed37a4f65ab 100755 (executable)
@@ -7,34 +7,14 @@ exec 2>&1
 set -ex -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
-
-if test -s /var/lib/arvados/api_rails_env ; then
-  RAILS_ENV=$(cat /var/lib/arvados/api_rails_env)
-else
-  RAILS_ENV=development
-fi
-
 . /usr/local/lib/arvbox/go-setup.sh
 
-flock /var/lib/gopath/gopath.lock go install "git.arvados.org/arvados.git/services/ws"
-install $GOPATH/bin/ws /usr/local/bin/arvados-ws
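+# arvados-ws is now provided by the combined arvados-server binary;
+# install a symlink instead of a separately built binary.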
+(cd /usr/local/bin && ln -sf arvados-server arvados-ws)
 
 if test "$1" = "--only-deps" ; then
     exit
 fi
 
-database_pw=$(cat /var/lib/arvados/api_database_pw)
-
-cat >/var/lib/arvados/arvados-ws.yml <<EOF
-Client:
-  APIHost: $localip:${services[controller-ssl]}
-  Insecure: false
-Postgres:
-  dbname: arvados_$RAILS_ENV
-  user: arvados
-  password: $database_pw
-  host: localhost
-Listen: localhost:${services[websockets]}
-EOF
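+# Generate the shared cluster config; the legacy arvados-ws.yml is no longer used.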
+/usr/local/lib/arvbox/runsu.sh flock /var/lib/arvados/cluster_config.yml.lock /usr/local/lib/arvbox/cluster-config.sh
 
-exec /usr/local/bin/arvados-ws -config /var/lib/arvados/arvados-ws.yml
+exec /usr/local/lib/arvbox/runsu.sh /usr/local/bin/arvados-ws