"Keep storage daemon, accessible to clients on the LAN"
package_go_binary services/keep-web keep-web \
"Static web hosting service for user data stored in Arvados Keep"
-package_go_binary services/ws arvados-ws \
+package_go_binary cmd/arvados-server arvados-ws \
"Arvados Websocket server"
package_go_binary tools/sync-groups arvados-sync-groups \
"Synchronize remote groups into Arvados from an external source"
Description=Arvados websocket server
Documentation=https://doc.arvados.org/
After=network.target
+AssertPathExists=/etc/arvados/config.yml
# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
StartLimitInterval=0
"git.arvados.org/arvados.git/lib/crunchrun"
"git.arvados.org/arvados.git/lib/dispatchcloud"
"git.arvados.org/arvados.git/lib/install"
+ "git.arvados.org/arvados.git/services/ws"
)
var (
"boot": boot.Command,
"cloudtest": cloudtest.Command,
"config-check": config.CheckCommand,
- "config-dump": config.DumpCommand,
"config-defaults": config.DumpDefaultsCommand,
+ "config-dump": config.DumpCommand,
"controller": controller.Command,
"crunch-run": crunchrun.Command,
"dispatch-cloud": dispatchcloud.Command,
"install": install.Command,
+ "ws": ws.Command,
})
)
|arvados-dispatch-cloud|✓|
|arvados-git-httpd||
|arvados-node-manager||
-|arvados-ws||
+|arvados-ws|✓|
|composer||
|keepproxy||
|keepstore|✓|
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550
golang.org/x/net v0.0.0-20190620200207-3b0461eec859
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
- golang.org/x/sys v0.0.0-20191105231009-c1f44814a5cd // indirect
+ golang.org/x/sys v0.0.0-20191105231009-c1f44814a5cd
google.golang.org/api v0.13.0
gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405
gopkg.in/square/go-jose.v2 v2.3.1
runGoProgram{src: "services/keepproxy", svc: super.cluster.Services.Keepproxy, depends: []supervisedTask{runPassenger{src: "services/api"}}},
runGoProgram{src: "services/keepstore", svc: super.cluster.Services.Keepstore},
runGoProgram{src: "services/keep-web", svc: super.cluster.Services.WebDAV},
- runGoProgram{src: "services/ws", svc: super.cluster.Services.Websocket, depends: []supervisedTask{runPostgreSQL{}}},
+ runServiceCommand{name: "ws", svc: super.cluster.Services.Websocket, depends: []supervisedTask{runPostgreSQL{}}},
installPassenger{src: "services/api"},
runPassenger{src: "services/api", svc: super.cluster.Services.RailsAPI, depends: []supervisedTask{createCertificates{}, runPostgreSQL{}, installPassenger{src: "services/api"}}},
installPassenger{src: "apps/workbench", depends: []supervisedTask{installPassenger{src: "services/api"}}}, // dependency ensures workbench doesn't delay api startup
return err
}
+func (h *Handler) Done() <-chan struct{} {
+ return nil
+}
+
func neverRedirect(*http.Request, []*http.Request) error { return http.ErrUseLastResponse }
func (h *Handler) setup() {
return disp.pool.CheckHealth()
}
+// Done implements service.Handler.
+func (disp *dispatcher) Done() <-chan struct{} {
+ return disp.stopped
+}
+
// Stop dispatching containers and release resources. Typically used
// in tests.
func (disp *dispatcher) Close() {
type Handler interface {
http.Handler
CheckHealth() error
+ Done() <-chan struct{}
}
type NewHandlerFunc func(_ context.Context, _ *arvados.Cluster, token string, registry *prometheus.Registry) Handler
logger.WithError(err).Errorf("error notifying init daemon")
}
go func() {
+ // Shut down server if caller cancels context
<-ctx.Done()
srv.Close()
}()
+ go func() {
+	// Shut down server if handler stops or fails (handler.Done() channel closes)
+ <-handler.Done()
+ srv.Close()
+ }()
err = srv.Wait()
if err != nil {
return 1
healthCheck chan bool
}
+func (th *testHandler) Done() <-chan struct{} { return nil }
func (th *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { th.handler.ServeHTTP(w, r) }
func (th *testHandler) CheckHealth() error {
ctxlog.FromContext(th.ctx).Info("CheckHealth called")
func (eh errorHandler) CheckHealth() error {
return eh.err
}
+
+// Done returns a closed channel to indicate the service has
+// stopped/failed.
+func (eh errorHandler) Done() <-chan struct{} {
+ return doneChannel
+}
+
+var doneChannel = func() <-chan struct{} {
+ done := make(chan struct{})
+ close(done)
+ return done
+}()
// UnmarshalJSON handles old config files that provide an array of
// instance types instead of a hash.
func (it *InstanceTypeMap) UnmarshalJSON(data []byte) error {
+ fixup := func(t InstanceType) (InstanceType, error) {
+ if t.ProviderType == "" {
+ t.ProviderType = t.Name
+ }
+ if t.Scratch == 0 {
+ t.Scratch = t.IncludedScratch + t.AddedScratch
+ } else if t.AddedScratch == 0 {
+ t.AddedScratch = t.Scratch - t.IncludedScratch
+ } else if t.IncludedScratch == 0 {
+ t.IncludedScratch = t.Scratch - t.AddedScratch
+ }
+
+ if t.Scratch != (t.IncludedScratch + t.AddedScratch) {
+ return t, fmt.Errorf("InstanceType %q: Scratch != (IncludedScratch + AddedScratch)", t.Name)
+ }
+ return t, nil
+ }
+
if len(data) > 0 && data[0] == '[' {
var arr []InstanceType
err := json.Unmarshal(data, &arr)
if _, ok := (*it)[t.Name]; ok {
return errDuplicateInstanceTypeName
}
- if t.ProviderType == "" {
- t.ProviderType = t.Name
- }
- if t.Scratch == 0 {
- t.Scratch = t.IncludedScratch + t.AddedScratch
- } else if t.AddedScratch == 0 {
- t.AddedScratch = t.Scratch - t.IncludedScratch
- } else if t.IncludedScratch == 0 {
- t.IncludedScratch = t.Scratch - t.AddedScratch
- }
-
- if t.Scratch != (t.IncludedScratch + t.AddedScratch) {
- return fmt.Errorf("%v: Scratch != (IncludedScratch + AddedScratch)", t.Name)
+ t, err := fixup(t)
+ if err != nil {
+ return err
}
(*it)[t.Name] = t
}
*it = InstanceTypeMap(hash)
for name, t := range *it {
t.Name = name
- if t.ProviderType == "" {
- t.ProviderType = name
+ t, err := fixup(t)
+ if err != nil {
+ return err
}
(*it)[name] = t
}
c.Check(int64(it.Scratch), check.Equals, int64(4000000000))
c.Check(int64(it.RAM), check.Equals, int64(4294967296))
}
+
+func (s *ConfigSuite) TestInstanceTypeFixup(c *check.C) {
+ for _, confdata := range []string{
+ // Current format: map of entries
+ `{foo4: {IncludedScratch: 4GB}, foo8: {ProviderType: foo_8, Scratch: 8GB}}`,
+ // Legacy format: array of entries with key in "Name" field
+ `[{Name: foo4, IncludedScratch: 4GB}, {Name: foo8, ProviderType: foo_8, Scratch: 8GB}]`,
+ } {
+ c.Log(confdata)
+ var itm InstanceTypeMap
+ err := yaml.Unmarshal([]byte(confdata), &itm)
+ c.Check(err, check.IsNil)
+
+ c.Check(itm["foo4"].Name, check.Equals, "foo4")
+ c.Check(itm["foo4"].ProviderType, check.Equals, "foo4")
+ c.Check(itm["foo4"].Scratch, check.Equals, ByteSize(4000000000))
+ c.Check(itm["foo4"].AddedScratch, check.Equals, ByteSize(0))
+ c.Check(itm["foo4"].IncludedScratch, check.Equals, ByteSize(4000000000))
+
+ c.Check(itm["foo8"].Name, check.Equals, "foo8")
+ c.Check(itm["foo8"].ProviderType, check.Equals, "foo_8")
+ c.Check(itm["foo8"].Scratch, check.Equals, ByteSize(8000000000))
+ c.Check(itm["foo8"].AddedScratch, check.Equals, ByteSize(8000000000))
+ c.Check(itm["foo8"].IncludedScratch, check.Equals, ByteSize(0))
+ }
+}
return nil
}
+func (agg *Aggregator) Done() <-chan struct{} {
+ return nil
+}
+
func (agg *Aggregator) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
agg.setupOnce.Do(agg.setup)
sendErr := func(statusCode int, err error) {
logRequest(w, req, lgr)
defer logResponse(w, req, lgr)
- h.ServeHTTP(w, req)
+ h.ServeHTTP(rewrapResponseWriter(w, wrapped), req)
})
}
+// rewrapResponseWriter rewraps w to restore any additional interfaces
+// (e.g., http.Hijacker) provided by wrapped.
+func rewrapResponseWriter(w http.ResponseWriter, wrapped http.ResponseWriter) http.ResponseWriter {
+ if hijacker, ok := wrapped.(http.Hijacker); ok {
+ return struct {
+ http.ResponseWriter
+ http.Hijacker
+ }{w, hijacker}
+ } else {
+ return w
+ }
+}
+
func Logger(req *http.Request) logrus.FieldLogger {
return ctxlog.FromContext(req.Context())
}
stop_ws()
port = internal_port_from_config("Websocket")
logf = open(_logfilename('ws'), 'a')
- ws = subprocess.Popen(["ws"],
+ ws = subprocess.Popen(
+ ["arvados-server", "ws"],
stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
with open(_pidfile('ws'), 'w') as f:
f.write(str(ws.pid))
return nil
}
+// Done implements service.Handler.
+func (srv *Server) Done() <-chan struct{} {
+ return nil
+}
+
func (srv *Server) run() {
var err error
if srv.RunOptions.Once {
return h.err
}
+func (h *handler) Done() <-chan struct{} {
+ return nil
+}
+
func newHandlerOrErrorHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
var h handler
serviceURL, ok := service.URLFromContext(ctx)
// Developer info
//
// See https://dev.arvados.org/projects/arvados/wiki/Hacking_websocket_server.
-//
-// Usage
-//
-// arvados-ws [-legacy-ws-config /etc/arvados/ws/ws.yml] [-dump-config]
-//
-// Options
-//
-// -legacy-ws-config path
-//
-// Load legacy configuration from the given file instead of the default
-// /etc/arvados/ws/ws.yml, legacy config overrides the clusterwide config.yml.
-//
-// -dump-config
-//
-// Print the loaded configuration to stdout and exit.
-//
-// Logs
-//
-// Logs are printed to stderr, formatted as JSON.
-//
-// A log is printed each time a client connects or disconnects.
-//
-// Enable additional logs by configuring:
-//
-// LogLevel: debug
-//
-// Runtime status
-//
-// GET /debug.json responds with debug stats.
-//
-// GET /status.json responds with health check results and
-// activity/usage metrics.
-package main
+package ws
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package ws
import (
"database/sql"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/ghodss/yaml"
+ "github.com/sirupsen/logrus"
)
type eventSink interface {
Serial uint64
db *sql.DB
+ logger logrus.FieldLogger
logRow *arvados.Log
err error
mtx sync.Mutex
&logRow.CreatedAt,
&propYAML)
if e.err != nil {
- logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("QueryRow failed")
+ e.logger.WithField("LogID", e.LogID).WithError(e.err).Error("QueryRow failed")
return nil
}
e.err = yaml.Unmarshal(propYAML, &logRow.Properties)
if e.err != nil {
- logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("yaml decode failed")
+ e.logger.WithField("LogID", e.LogID).WithError(e.err).Error("yaml decode failed")
return nil
}
e.logRow = &logRow
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package ws
import (
"context"
"fmt"
"strconv"
"sync"
- "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/stats"
"github.com/lib/pq"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
)
type pgEventSource struct {
DataSource string
MaxOpenConns int
QueueSize int
+ Logger logrus.FieldLogger
+ Reg *prometheus.Registry
db *sql.DB
pqListener *pq.Listener
mtx sync.Mutex
lastQDelay time.Duration
- eventsIn uint64
- eventsOut uint64
+ eventsIn prometheus.Counter
+ eventsOut prometheus.Counter
cancel func()
ready chan bool
}
-var _ debugStatuser = (*pgEventSource)(nil)
-
func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
if et == pq.ListenerEventConnected {
- logger(nil).Debug("pgEventSource connected")
+ ps.Logger.Debug("pgEventSource connected")
return
}
// Until we have a mechanism for catching up on missed events,
// we cannot recover from a dropped connection without
// breaking our promises to clients.
- logger(nil).
+ ps.Logger.
WithField("eventType", et).
WithError(err).
Error("listener problem")
func (ps *pgEventSource) setup() {
ps.ready = make(chan bool)
+ ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "ws",
+ Name: "queue_len",
+ Help: "Current number of events in queue",
+ }, func() float64 { return float64(len(ps.queue)) }))
+ ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "ws",
+ Name: "queue_cap",
+ Help: "Event queue capacity",
+ }, func() float64 { return float64(cap(ps.queue)) }))
+ ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "ws",
+ Name: "queue_delay",
+ Help: "Queue delay of the last emitted event",
+ }, func() float64 { return ps.lastQDelay.Seconds() }))
+ ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "ws",
+ Name: "sinks",
+ Help: "Number of active sinks (connections)",
+ }, func() float64 { return float64(len(ps.sinks)) }))
+ ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "ws",
+ Name: "sinks_blocked",
+ Help: "Number of sinks (connections) that are busy and blocking the main event stream",
+ }, func() float64 {
+ ps.mtx.Lock()
+ defer ps.mtx.Unlock()
+ blocked := 0
+ for sink := range ps.sinks {
+ blocked += len(sink.channel)
+ }
+ return float64(blocked)
+ }))
+ ps.eventsIn = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "ws",
+ Name: "events_in",
+ Help: "Number of events received from postgresql notify channel",
+ })
+ ps.Reg.MustRegister(ps.eventsIn)
+ ps.eventsOut = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "ws",
+ Name: "events_out",
+ Help: "Number of events sent to client sessions (before filtering)",
+ })
+ ps.Reg.MustRegister(ps.eventsOut)
+
+ maxConnections := prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "ws",
+ Name: "db_max_connections",
+ Help: "Maximum number of open connections to the database",
+ })
+ ps.Reg.MustRegister(maxConnections)
+ openConnections := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "ws",
+ Name: "db_open_connections",
+ Help: "Open connections to the database",
+ }, []string{"inuse"})
+ ps.Reg.MustRegister(openConnections)
+
+ updateDBStats := func() {
+ stats := ps.db.Stats()
+ maxConnections.Set(float64(stats.MaxOpenConnections))
+ openConnections.WithLabelValues("0").Set(float64(stats.Idle))
+ openConnections.WithLabelValues("1").Set(float64(stats.InUse))
+ }
+ go func() {
+ <-ps.ready
+ if ps.db == nil {
+ return
+ }
+ updateDBStats()
+ for range time.Tick(time.Second) {
+ updateDBStats()
+ }
+ }()
}
// Close stops listening for new events and disconnects all clients.
// Run listens for event notifications on the "logs" channel and sends
// them to all subscribers.
func (ps *pgEventSource) Run() {
- logger(nil).Debug("pgEventSource Run starting")
- defer logger(nil).Debug("pgEventSource Run finished")
+ ps.Logger.Debug("pgEventSource Run starting")
+ defer ps.Logger.Debug("pgEventSource Run finished")
ps.setupOnce.Do(ps.setup)
ready := ps.ready
db, err := sql.Open("postgres", ps.DataSource)
if err != nil {
- logger(nil).WithError(err).Error("sql.Open failed")
+ ps.Logger.WithError(err).Error("sql.Open failed")
return
}
if ps.MaxOpenConns <= 0 {
- logger(nil).Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file")
+ ps.Logger.Warn("no database connection limit configured -- consider setting PostgreSQL.ConnectionPool>0 in arvados-ws configuration file")
}
db.SetMaxOpenConns(ps.MaxOpenConns)
if err = db.Ping(); err != nil {
- logger(nil).WithError(err).Error("db.Ping failed")
+ ps.Logger.WithError(err).Error("db.Ping failed")
return
}
ps.db = db
ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
err = ps.pqListener.Listen("logs")
if err != nil {
- logger(nil).WithError(err).Error("pq Listen failed")
+ ps.Logger.WithError(err).Error("pq Listen failed")
return
}
defer ps.pqListener.Close()
- logger(nil).Debug("pq Listen setup done")
+ ps.Logger.Debug("pq Listen setup done")
close(ready)
// Avoid double-close in deferred func
// client_count X client_queue_size.
e.Detail()
- logger(nil).
+ ps.Logger.
WithField("serial", e.Serial).
WithField("detail", e.Detail()).
Debug("event ready")
ps.lastQDelay = e.Ready.Sub(e.Received)
ps.mtx.Lock()
- atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
for sink := range ps.sinks {
sink.channel <- e
+ ps.eventsOut.Inc()
}
ps.mtx.Unlock()
}
for {
select {
case <-ctx.Done():
- logger(nil).Debug("ctx done")
+ ps.Logger.Debug("ctx done")
return
case <-ticker.C:
- logger(nil).Debug("listener ping")
+ ps.Logger.Debug("listener ping")
err := ps.pqListener.Ping()
if err != nil {
ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err))
case pqEvent, ok := <-ps.pqListener.Notify:
if !ok {
- logger(nil).Error("pqListener Notify chan closed")
+ ps.Logger.Error("pqListener Notify chan closed")
return
}
if pqEvent == nil {
continue
}
if pqEvent.Channel != "logs" {
- logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
+ ps.Logger.WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
continue
}
logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
if err != nil {
- logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
+ ps.Logger.WithField("pqEvent", pqEvent).Error("bad notify payload")
continue
}
serial++
Received: time.Now(),
Serial: serial,
db: ps.db,
+ logger: ps.Logger,
}
- logger(nil).WithField("event", e).Debug("incoming")
- atomic.AddUint64(&ps.eventsIn, 1)
+ ps.Logger.WithField("event", e).Debug("incoming")
+ ps.eventsIn.Inc()
ps.queue <- e
go e.Detail()
}
}
func (ps *pgEventSource) DBHealth() error {
+ if ps.db == nil {
+ return errors.New("database not connected")
+ }
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
defer cancel()
var i int
blocked += len(sink.channel)
}
return map[string]interface{}{
- "EventsIn": atomic.LoadUint64(&ps.eventsIn),
- "EventsOut": atomic.LoadUint64(&ps.eventsOut),
"Queue": len(ps.queue),
"QueueLimit": cap(ps.queue),
"QueueDelay": stats.Duration(ps.lastQDelay),
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package ws
import (
"database/sql"
"fmt"
- "os"
- "path/filepath"
"sync"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
type eventSourceSuite struct{}
func testDBConfig() arvados.PostgreSQLConnection {
- cfg, err := arvados.GetConfig(filepath.Join(os.Getenv("WORKSPACE"), "tmp", "arvados.yml"))
+ cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
if err != nil {
panic(err)
}
pges := &pgEventSource{
DataSource: cfg.String(),
QueueSize: 4,
+ Logger: ctxlog.TestLogger(c),
+ Reg: prometheus.NewRegistry(),
}
go pges.Run()
sinks := make([]eventSink, 18)
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package ws
import check "gopkg.in/check.v1"
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package ws
import (
"testing"
func TestGocheck(t *testing.T) {
check.TestingT(t)
}
+
+func init() {
+ testMode = true
+}
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package ws
import (
"context"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/stats"
+ "github.com/sirupsen/logrus"
)
type handler struct {
EventCount uint64
}
-func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) {
+func (h *handler) Handle(ws wsConn, logger logrus.FieldLogger, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) {
h.setupOnce.Do(h.setup)
ctx, cancel := context.WithCancel(ws.Request().Context())
defer cancel()
- log := logger(ctx)
incoming := eventSource.NewSink()
defer incoming.Stop()
sess, err := newSession(ws, queue)
if err != nil {
- log.WithError(err).Error("newSession failed")
+ logger.WithError(err).Error("newSession failed")
return
}
ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
n, err := ws.Read(buf)
buf := buf[:n]
- log.WithField("frame", string(buf[:n])).Debug("received frame")
+ logger.WithField("frame", string(buf[:n])).Debug("received frame")
if err == nil && n == cap(buf) {
err = errFrameTooBig
}
if err != nil {
if err != io.EOF && ctx.Err() == nil {
- log.WithError(err).Info("read error")
+ logger.WithError(err).Info("read error")
}
return
}
err = sess.Receive(buf)
if err != nil {
- log.WithError(err).Error("sess.Receive() failed")
+ logger.WithError(err).Error("sess.Receive() failed")
return
}
}
var e *event
var buf []byte
var err error
- log := log
+ logger := logger
switch data := data.(type) {
case []byte:
buf = data
case *event:
e = data
- log = log.WithField("serial", e.Serial)
+ logger = logger.WithField("serial", e.Serial)
buf, err = sess.EventMessage(e)
if err != nil {
- log.WithError(err).Error("EventMessage failed")
+ logger.WithError(err).Error("EventMessage failed")
return
} else if len(buf) == 0 {
- log.Debug("skip")
+ logger.Debug("skip")
continue
}
default:
- log.WithField("data", data).Error("bad object in client queue")
+ logger.WithField("data", data).Error("bad object in client queue")
continue
}
- log.WithField("frame", string(buf)).Debug("send event")
+ logger.WithField("frame", string(buf)).Debug("send event")
ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
t0 := time.Now()
_, err = ws.Write(buf)
if err != nil {
if ctx.Err() == nil {
- log.WithError(err).Error("write failed")
+ logger.WithError(err).Error("write failed")
}
return
}
- log.Debug("sent")
+ logger.Debug("sent")
if e != nil {
hStats.QueueDelayNs += t0.Sub(e.Ready)
select {
case queue <- e:
default:
- log.WithError(errQueueFull).Error("terminate")
+ logger.WithError(errQueueFull).Error("terminate")
return
}
}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
- "flag"
- "fmt"
- "os"
-
- "git.arvados.org/arvados.git/lib/config"
- "git.arvados.org/arvados.git/sdk/go/arvados"
- "git.arvados.org/arvados.git/sdk/go/ctxlog"
- "github.com/ghodss/yaml"
- "github.com/sirupsen/logrus"
-)
-
-var logger = ctxlog.FromContext
-var version = "dev"
-
-func configure(log logrus.FieldLogger, args []string) *arvados.Cluster {
- flags := flag.NewFlagSet(args[0], flag.ExitOnError)
- dumpConfig := flags.Bool("dump-config", false, "show current configuration and exit")
- getVersion := flags.Bool("version", false, "Print version information and exit.")
-
- loader := config.NewLoader(nil, log)
- loader.SetupFlags(flags)
- args = loader.MungeLegacyConfigArgs(log, args[1:], "-legacy-ws-config")
-
- flags.Parse(args)
-
- // Print version information if requested
- if *getVersion {
- fmt.Printf("arvados-ws %s\n", version)
- return nil
- }
-
- cfg, err := loader.Load()
- if err != nil {
- log.Fatal(err)
- }
-
- cluster, err := cfg.GetCluster("")
- if err != nil {
- log.Fatal(err)
- }
-
- ctxlog.SetLevel(cluster.SystemLogs.LogLevel)
- ctxlog.SetFormat(cluster.SystemLogs.Format)
-
- if *dumpConfig {
- out, err := yaml.Marshal(cfg)
- if err != nil {
- log.Fatal(err)
- }
- _, err = os.Stdout.Write(out)
- if err != nil {
- log.Fatal(err)
- }
- return nil
- }
- return cluster
-}
-
-func main() {
- log := logger(nil)
-
- cluster := configure(log, os.Args)
- if cluster == nil {
- return
- }
-
- log.Printf("arvados-ws %s started", version)
- srv := &server{cluster: cluster}
- log.Fatal(srv.Run())
-}
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package ws
import (
+ "context"
"net/http"
"net/url"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
)
const (
type permChecker interface {
SetToken(token string)
- Check(uuid string) (bool, error)
+ Check(ctx context.Context, uuid string) (bool, error)
}
func newPermChecker(ac arvados.Client) permChecker {
pc.cache = make(map[string]cacheEnt)
}
-func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
+func (pc *cachingPermChecker) Check(ctx context.Context, uuid string) (bool, error) {
pc.nChecks++
- logger := logger(nil).
+ logger := ctxlog.FromContext(ctx).
WithField("token", pc.Client.AuthToken).
WithField("uuid", uuid)
pc.tidy()
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package ws
import (
+ "context"
+
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
check "gopkg.in/check.v1"
}
wantError := func(uuid string) {
c.Log(uuid)
- ok, err := pc.Check(uuid)
+ ok, err := pc.Check(context.Background(), uuid)
c.Check(ok, check.Equals, false)
c.Check(err, check.NotNil)
}
wantYes := func(uuid string) {
c.Log(uuid)
- ok, err := pc.Check(uuid)
+ ok, err := pc.Check(context.Background(), uuid)
c.Check(ok, check.Equals, true)
c.Check(err, check.IsNil)
}
wantNo := func(uuid string) {
c.Log(uuid)
- ok, err := pc.Check(uuid)
+ ok, err := pc.Check(context.Background(), uuid)
c.Check(ok, check.Equals, false)
c.Check(err, check.IsNil)
}
pc.SetToken(arvadostest.ActiveToken)
c.Log("...network error")
- pc.Client.APIHost = "127.0.0.1:discard"
+ pc.Client.APIHost = "127.0.0.1:9"
wantError(arvadostest.UserAgreementCollection)
wantError(arvadostest.FooBarDirCollection)
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package ws
import (
- "encoding/json"
"io"
"net/http"
- "strconv"
"sync"
"sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/health"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"golang.org/x/net/websocket"
)
}
type router struct {
- client arvados.Client
+ client *arvados.Client
cluster *arvados.Cluster
eventSource eventSource
newPermChecker func() permChecker
handler *handler
mux *http.ServeMux
setupOnce sync.Once
-
- lastReqID int64
- lastReqMtx sync.Mutex
-
- status routerDebugStatus
-}
-
-type routerDebugStatus struct {
- ReqsReceived int64
- ReqsActive int64
-}
-
-type debugStatuser interface {
- DebugStatus() interface{}
+ done chan struct{}
+ reg *prometheus.Registry
}
func (rtr *router) setup() {
+ mSockets := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "ws",
+ Name: "sockets",
+ Help: "Number of connected sockets",
+ }, []string{"version"})
+ rtr.reg.MustRegister(mSockets)
+
rtr.handler = &handler{
PingTimeout: time.Duration(rtr.cluster.API.SendTimeout),
QueueSize: rtr.cluster.API.WebsocketClientEventQueue,
}
rtr.mux = http.NewServeMux()
- rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0))
- rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1))
- rtr.mux.Handle("/debug.json", rtr.jsonHandler(rtr.DebugStatus))
- rtr.mux.Handle("/status.json", rtr.jsonHandler(rtr.Status))
-
+ rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0, mSockets.WithLabelValues("0")))
+ rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1, mSockets.WithLabelValues("1")))
rtr.mux.Handle("/_health/", &health.Handler{
Token: rtr.cluster.ManagementToken,
Prefix: "/_health/",
},
Log: func(r *http.Request, err error) {
if err != nil {
- logger(r.Context()).WithError(err).Error("error")
+ ctxlog.FromContext(r.Context()).WithError(err).Error("error")
}
},
})
}
-func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
+func (rtr *router) makeServer(newSession sessionFactory, gauge prometheus.Gauge) *websocket.Server {
+ var connected int64
return &websocket.Server{
Handshake: func(c *websocket.Config, r *http.Request) error {
return nil
},
Handler: websocket.Handler(func(ws *websocket.Conn) {
t0 := time.Now()
- log := logger(ws.Request().Context())
- log.Info("connected")
+ logger := ctxlog.FromContext(ws.Request().Context())
+ atomic.AddInt64(&connected, 1)
+ gauge.Set(float64(atomic.LoadInt64(&connected)))
- stats := rtr.handler.Handle(ws, rtr.eventSource,
+ stats := rtr.handler.Handle(ws, logger, rtr.eventSource,
func(ws wsConn, sendq chan<- interface{}) (session, error) {
- return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker(), &rtr.client)
+ return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker(), rtr.client)
})
- log.WithFields(logrus.Fields{
+ logger.WithFields(logrus.Fields{
"elapsed": time.Now().Sub(t0).Seconds(),
"stats": stats,
- }).Info("disconnect")
+ }).Info("client disconnected")
ws.Close()
+ atomic.AddInt64(&connected, -1)
+ gauge.Set(float64(atomic.LoadInt64(&connected)))
}),
}
}
-func (rtr *router) newReqID() string {
- rtr.lastReqMtx.Lock()
- defer rtr.lastReqMtx.Unlock()
- id := time.Now().UnixNano()
- if id <= rtr.lastReqID {
- id = rtr.lastReqID + 1
- }
- return strconv.FormatInt(id, 36)
-}
-
-func (rtr *router) DebugStatus() interface{} {
- s := map[string]interface{}{
- "HTTP": rtr.status,
- "Outgoing": rtr.handler.DebugStatus(),
- }
- if es, ok := rtr.eventSource.(debugStatuser); ok {
- s["EventSource"] = es.DebugStatus()
- }
- return s
-}
-
-func (rtr *router) Status() interface{} {
- return map[string]interface{}{
- "Clients": atomic.LoadInt64(&rtr.status.ReqsActive),
- "Version": version,
- }
-}
-
func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
rtr.setupOnce.Do(rtr.setup)
- atomic.AddInt64(&rtr.status.ReqsReceived, 1)
- atomic.AddInt64(&rtr.status.ReqsActive, 1)
- defer atomic.AddInt64(&rtr.status.ReqsActive, -1)
-
- logger := logger(req.Context()).
- WithField("RequestID", rtr.newReqID())
- ctx := ctxlog.Context(req.Context(), logger)
- req = req.WithContext(ctx)
- logger.WithFields(logrus.Fields{
- "remoteAddr": req.RemoteAddr,
- "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
- }).Info("accept request")
rtr.mux.ServeHTTP(resp, req)
}
-func (rtr *router) jsonHandler(fn func() interface{}) http.Handler {
- return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- logger := logger(r.Context())
- w.Header().Set("Content-Type", "application/json")
- enc := json.NewEncoder(w)
- err := enc.Encode(fn())
- if err != nil {
- msg := "encode failed"
- logger.WithError(err).Error(msg)
- http.Error(w, msg, http.StatusInternalServerError)
- }
- })
+func (rtr *router) CheckHealth() error {
+ rtr.setupOnce.Do(rtr.setup)
+ return rtr.eventSource.DBHealth()
+}
+
+func (rtr *router) Done() <-chan struct{} {
+ return rtr.done
}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
- "net"
- "net/http"
- "sync"
- "time"
-
- "git.arvados.org/arvados.git/sdk/go/arvados"
- "github.com/coreos/go-systemd/daemon"
-)
-
-type server struct {
- httpServer *http.Server
- listener net.Listener
- cluster *arvados.Cluster
- eventSource *pgEventSource
- setupOnce sync.Once
-}
-
-func (srv *server) Close() {
- srv.WaitReady()
- srv.eventSource.Close()
- srv.httpServer.Close()
- srv.listener.Close()
-}
-
-func (srv *server) WaitReady() {
- srv.setupOnce.Do(srv.setup)
- srv.eventSource.WaitReady()
-}
-
-func (srv *server) Run() error {
- srv.setupOnce.Do(srv.setup)
- return srv.httpServer.Serve(srv.listener)
-}
-
-func (srv *server) setup() {
- log := logger(nil)
-
- var listen arvados.URL
- for listen, _ = range srv.cluster.Services.Websocket.InternalURLs {
- break
- }
- ln, err := net.Listen("tcp", listen.Host)
- if err != nil {
- log.WithField("Listen", listen).Fatal(err)
- }
- log.WithField("Listen", ln.Addr().String()).Info("listening")
-
- client := arvados.Client{}
- client.APIHost = srv.cluster.Services.Controller.ExternalURL.Host
- client.AuthToken = srv.cluster.SystemRootToken
- client.Insecure = srv.cluster.TLS.Insecure
-
- srv.listener = ln
- srv.eventSource = &pgEventSource{
- DataSource: srv.cluster.PostgreSQL.Connection.String(),
- MaxOpenConns: srv.cluster.PostgreSQL.ConnectionPool,
- QueueSize: srv.cluster.API.WebsocketServerEventQueue,
- }
-
- srv.httpServer = &http.Server{
- Addr: listen.Host,
- ReadTimeout: time.Minute,
- WriteTimeout: time.Minute,
- MaxHeaderBytes: 1 << 20,
- Handler: &router{
- cluster: srv.cluster,
- client: client,
- eventSource: srv.eventSource,
- newPermChecker: func() permChecker { return newPermChecker(client) },
- },
- }
-
- go func() {
- srv.eventSource.Run()
- log.Info("event source stopped")
- srv.Close()
- }()
-
- if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
- log.WithError(err).Warn("error notifying init daemon")
- }
-}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package ws
+
+import (
+ "context"
+ "fmt"
+
+ "git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/service"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+var testMode = false
+
+var Command cmd.Handler = service.Command(arvados.ServiceNameWebsocket, newHandler)
+
+func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
+ client, err := arvados.NewClientFromConfig(cluster)
+ if err != nil {
+ return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
+ }
+ eventSource := &pgEventSource{
+ DataSource: cluster.PostgreSQL.Connection.String(),
+ MaxOpenConns: cluster.PostgreSQL.ConnectionPool,
+ QueueSize: cluster.API.WebsocketServerEventQueue,
+ Logger: ctxlog.FromContext(ctx),
+ Reg: reg,
+ }
+ done := make(chan struct{})
+ go func() {
+ eventSource.Run()
+ ctxlog.FromContext(ctx).Error("event source stopped")
+ close(done)
+ }()
+ eventSource.WaitReady()
+ if err := eventSource.DBHealth(); err != nil {
+ return service.ErrorHandler(ctx, cluster, err)
+ }
+ rtr := &router{
+ cluster: cluster,
+ client: client,
+ eventSource: eventSource,
+ newPermChecker: func() permChecker { return newPermChecker(*client) },
+ done: done,
+ reg: reg,
+ }
+ return rtr
+}
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package ws
import (
- "encoding/json"
+ "bytes"
+ "context"
+ "flag"
"io/ioutil"
"net/http"
+ "net/http/httptest"
"os"
+ "strings"
"sync"
"time"
"git.arvados.org/arvados.git/lib/config"
+ "git.arvados.org/arvados.git/lib/service"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "git.arvados.org/arvados.git/sdk/go/httpserver"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
-var _ = check.Suite(&serverSuite{})
+var _ = check.Suite(&serviceSuite{})
-type serverSuite struct {
+type serviceSuite struct {
+ handler service.Handler
+ reg *prometheus.Registry
+ srv *httptest.Server
cluster *arvados.Cluster
- srv *server
wg sync.WaitGroup
}
-func (s *serverSuite) SetUpTest(c *check.C) {
+func (s *serviceSuite) SetUpTest(c *check.C) {
var err error
s.cluster, err = s.testConfig(c)
c.Assert(err, check.IsNil)
- s.srv = &server{cluster: s.cluster}
}
-func (*serverSuite) testConfig(c *check.C) (*arvados.Cluster, error) {
+// start instantiates the ws handler for s.cluster, wraps it with
+// prometheus instrumentation, and serves it via an httptest server
+// so tests can exercise it over real HTTP.
+func (s *serviceSuite) start(c *check.C) {
+	s.reg = prometheus.NewRegistry()
+	s.handler = newHandler(context.Background(), s.cluster, "", s.reg)
+	instrumented := httpserver.Instrument(s.reg, ctxlog.TestLogger(c), s.handler)
+	s.srv = httptest.NewServer(instrumented.ServeAPI(s.cluster.ManagementToken, instrumented))
+}
+
+// TearDownTest shuts down the test server, if start() was called.
+func (s *serviceSuite) TearDownTest(c *check.C) {
+	if s.srv != nil {
+		s.srv.Close()
+	}
+}
+
+func (*serviceSuite) testConfig(c *check.C) (*arvados.Cluster, error) {
ldr := config.NewLoader(nil, ctxlog.TestLogger(c))
cfg, err := ldr.Load()
if err != nil {
cluster.SystemRootToken = client.AuthToken
cluster.TLS.Insecure = client.Insecure
cluster.PostgreSQL.Connection = testDBConfig()
+ cluster.PostgreSQL.ConnectionPool = 12
cluster.Services.Websocket.InternalURLs = map[arvados.URL]arvados.ServiceInstance{arvados.URL{Host: ":"}: arvados.ServiceInstance{}}
cluster.ManagementToken = arvadostest.ManagementToken
return cluster, nil
}
-// TestBadDB ensures Run() returns an error (instead of panicking or
-// deadlocking) if it can't connect to the database server at startup.
-func (s *serverSuite) TestBadDB(c *check.C) {
+// TestBadDB ensures the server returns an error (instead of panicking
+// or deadlocking) if it can't connect to the database server at
+// startup.
+func (s *serviceSuite) TestBadDB(c *check.C) {
 	s.cluster.PostgreSQL.Connection["password"] = "1234"
-
-	var wg sync.WaitGroup
-	wg.Add(1)
-	go func() {
-		err := s.srv.Run()
-		c.Check(err, check.NotNil)
-		wg.Done()
-	}()
-	wg.Add(1)
-	go func() {
-		s.srv.WaitReady()
-		wg.Done()
-	}()
-
-	done := make(chan bool)
-	go func() {
-		wg.Wait()
-		close(done)
-	}()
-	select {
-	case <-done:
-	case <-time.After(10 * time.Second):
-		c.Fatal("timeout")
-	}
+	s.start(c)
+	resp, err := http.Get(s.srv.URL)
+	c.Check(err, check.IsNil)
+	defer resp.Body.Close()
+	c.Check(resp.StatusCode, check.Equals, http.StatusInternalServerError)
+	// The handler's health check should surface the broken DB
+	// connection rather than hanging.
+	c.Check(s.handler.CheckHealth(), check.ErrorMatches, "database not connected")
 }
-func (s *serverSuite) TestHealth(c *check.C) {
- go s.srv.Run()
- defer s.srv.Close()
- s.srv.WaitReady()
+func (s *serviceSuite) TestHealth(c *check.C) {
+ s.start(c)
for _, token := range []string{"", "foo", s.cluster.ManagementToken} {
- req, err := http.NewRequest("GET", "http://"+s.srv.listener.Addr().String()+"/_health/ping", nil)
+ req, err := http.NewRequest("GET", s.srv.URL+"/_health/ping", nil)
c.Assert(err, check.IsNil)
if token != "" {
req.Header.Add("Authorization", "Bearer "+token)
}
}
-func (s *serverSuite) TestStatus(c *check.C) {
- go s.srv.Run()
- defer s.srv.Close()
- s.srv.WaitReady()
- req, err := http.NewRequest("GET", "http://"+s.srv.listener.Addr().String()+"/status.json", nil)
- c.Assert(err, check.IsNil)
- resp, err := http.DefaultClient.Do(req)
- c.Check(err, check.IsNil)
- c.Check(resp.StatusCode, check.Equals, http.StatusOK)
- var status map[string]interface{}
- err = json.NewDecoder(resp.Body).Decode(&status)
- c.Check(err, check.IsNil)
- c.Check(status["Version"], check.Not(check.Equals), "")
+// TestMetrics polls /metrics (management token required) until the
++// first db stats update lands, then checks the ws db gauges.
+func (s *serviceSuite) TestMetrics(c *check.C) {
+	s.start(c)
+	s.handler.CheckHealth()
+	for deadline := time.Now().Add(time.Second); ; {
+		req, err := http.NewRequest("GET", s.srv.URL+"/metrics", nil)
+		c.Assert(err, check.IsNil)
+		req.Header.Set("Authorization", "Bearer "+s.cluster.ManagementToken)
+		resp, err := http.DefaultClient.Do(req)
+		c.Check(err, check.IsNil)
+		c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+		text, err := ioutil.ReadAll(resp.Body)
+		// Close each response so the transport can reuse the
+		// connection while we poll.
+		resp.Body.Close()
+		c.Check(err, check.IsNil)
+		if strings.Contains(string(text), "_db_max_connections 0\n") {
+			// wait for the first db stats update
+			if time.Now().After(deadline) {
+				c.Fatal("timed out")
+			}
+			time.Sleep(time.Second / 50)
+			continue
+		}
+		c.Check(string(text), check.Matches, `(?ms).*\narvados_ws_db_max_connections 12\n.*`)
+		c.Check(string(text), check.Matches, `(?ms).*\narvados_ws_db_open_connections\{inuse="0"\} \d+\n.*`)
+		c.Check(string(text), check.Matches, `(?ms).*\narvados_ws_db_open_connections\{inuse="1"\} \d+\n.*`)
+		break
+	}
 }
-func (s *serverSuite) TestHealthDisabled(c *check.C) {
+func (s *serviceSuite) TestHealthDisabled(c *check.C) {
s.cluster.ManagementToken = ""
-
- go s.srv.Run()
- defer s.srv.Close()
- s.srv.WaitReady()
-
+ s.start(c)
for _, token := range []string{"", "foo", arvadostest.ManagementToken} {
- req, err := http.NewRequest("GET", "http://"+s.srv.listener.Addr().String()+"/_health/ping", nil)
+ req, err := http.NewRequest("GET", s.srv.URL+"/_health/ping", nil)
c.Assert(err, check.IsNil)
req.Header.Add("Authorization", "Bearer "+token)
resp, err := http.DefaultClient.Do(req)
}
}
-func (s *serverSuite) TestLoadLegacyConfig(c *check.C) {
+func (s *serviceSuite) TestLoadLegacyConfig(c *check.C) {
content := []byte(`
Client:
APIHost: example.com
c.Error(err)
}
- cluster := configure(logger(nil), []string{"arvados-ws", "-config", tmpfile.Name()})
+ ldr := config.NewLoader(&bytes.Buffer{}, logrus.New())
+ flagset := flag.NewFlagSet("", flag.ContinueOnError)
+ ldr.SetupFlags(flagset)
+ flagset.Parse(ldr.MungeLegacyConfigArgs(ctxlog.TestLogger(c), []string{"-config", tmpfile.Name()}, "-legacy-ws-config"))
+ cfg, err := ldr.Load()
+ c.Check(err, check.IsNil)
+ cluster, err := cfg.GetCluster("")
+ c.Check(err, check.IsNil)
c.Check(cluster, check.NotNil)
c.Check(cluster.Services.Controller.ExternalURL, check.Equals, arvados.URL{Scheme: "https", Host: "example.com"})
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package ws
import (
"database/sql"
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package ws
import (
"database/sql"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/sirupsen/logrus"
)
db: db,
ac: ac,
permChecker: pc,
- log: logger(ws.Request().Context()),
+ log: ctxlog.FromContext(ws.Request().Context()),
}
err := ws.Request().ParseForm()
} else {
permTarget = detail.ObjectUUID
}
- ok, err := sess.permChecker.Check(permTarget)
+ ok, err := sess.permChecker.Check(sess.ws.Request().Context(), permTarget)
if err != nil || !ok {
return nil, err
}
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package ws
import (
"bytes"
"io"
"net/url"
"os"
+ "strings"
"sync"
"time"
var _ = check.Suite(&v0Suite{})
type v0Suite struct {
- serverSuite serverSuite
- token string
- toDelete []string
- wg sync.WaitGroup
- ignoreLogID uint64
+ serviceSuite serviceSuite
+ token string
+ toDelete []string
+ wg sync.WaitGroup
+ ignoreLogID uint64
}
func (s *v0Suite) SetUpTest(c *check.C) {
- s.serverSuite.SetUpTest(c)
- go s.serverSuite.srv.Run()
- s.serverSuite.srv.WaitReady()
+ s.serviceSuite.SetUpTest(c)
+ s.serviceSuite.start(c)
s.token = arvadostest.ActiveToken
s.ignoreLogID = s.lastLogID(c)
func (s *v0Suite) TearDownTest(c *check.C) {
s.wg.Wait()
- s.serverSuite.srv.Close()
+ s.serviceSuite.TearDownTest(c)
}
func (s *v0Suite) TearDownSuite(c *check.C) {
}
func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder) {
- srv := s.serverSuite.srv
- conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
+ srv := s.serviceSuite.srv
+ conn, err := websocket.Dial(strings.Replace(srv.URL, "http", "ws", 1)+"/websocket?api_token="+s.token, "", srv.URL)
if err != nil {
panic(err)
}
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package ws
import (
"database/sql"
set -ex -o pipefail
. /usr/local/lib/arvbox/common.sh
-
-if test -s /var/lib/arvados/api_rails_env ; then
- RAILS_ENV=$(cat /var/lib/arvados/api_rails_env)
-else
- RAILS_ENV=development
-fi
-
 . /usr/local/lib/arvbox/go-setup.sh
-flock /var/lib/gopath/gopath.lock go install "git.arvados.org/arvados.git/services/ws"
-install $GOPATH/bin/ws /usr/local/bin/arvados-ws
+# arvados-ws is now a subcommand of the combined arvados-server
+# binary, so install it as a symlink instead of a separate build.
+(cd /usr/local/bin && ln -sf arvados-server arvados-ws)
if test "$1" = "--only-deps" ; then
exit
fi
-database_pw=$(cat /var/lib/arvados/api_database_pw)
-
-cat >/var/lib/arvados/arvados-ws.yml <<EOF
-Client:
- APIHost: $localip:${services[controller-ssl]}
- Insecure: false
-Postgres:
- dbname: arvados_$RAILS_ENV
- user: arvados
- password: $database_pw
- host: localhost
-Listen: localhost:${services[websockets]}
-EOF
+/usr/local/lib/arvbox/runsu.sh flock /var/lib/arvados/cluster_config.yml.lock /usr/local/lib/arvbox/cluster-config.sh
-exec /usr/local/bin/arvados-ws -config /var/lib/arvados/arvados-ws.yml
+exec /usr/local/lib/arvbox/runsu.sh /usr/local/bin/arvados-ws