1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "git.arvados.org/arvados.git/sdk/go/ctxlog"
16 "git.arvados.org/arvados.git/sdk/go/health"
17 "github.com/prometheus/client_golang/prometheus"
18 "github.com/sirupsen/logrus"
19 "golang.org/x/net/websocket"
22 type wsConn interface {
24 Request() *http.Request
25 SetReadDeadline(time.Time) error
26 SetWriteDeadline(time.Time) error
30 client *arvados.Client
31 cluster *arvados.Cluster
32 eventSource eventSource
33 newPermChecker func() permChecker
39 reg *prometheus.Registry
42 func (rtr *router) setup() {
43 mSockets := prometheus.NewGaugeVec(prometheus.GaugeOpts{
47 Help: "Number of connected sockets",
48 }, []string{"version"})
49 rtr.reg.MustRegister(mSockets)
51 rtr.handler = &handler{
52 PingTimeout: time.Duration(rtr.cluster.API.SendTimeout),
53 QueueSize: rtr.cluster.API.WebsocketClientEventQueue,
55 rtr.mux = http.NewServeMux()
56 rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0, mSockets.WithLabelValues("0")))
57 rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1, mSockets.WithLabelValues("1")))
58 rtr.mux.Handle("/_health/", &health.Handler{
59 Token: rtr.cluster.ManagementToken,
61 Routes: health.Routes{
62 "db": rtr.eventSource.DBHealth,
64 Log: func(r *http.Request, err error) {
66 ctxlog.FromContext(r.Context()).WithError(err).Error("error")
72 func (rtr *router) makeServer(newSession sessionFactory, gauge prometheus.Gauge) *websocket.Server {
74 return &websocket.Server{
75 Handshake: func(c *websocket.Config, r *http.Request) error {
78 Handler: websocket.Handler(func(ws *websocket.Conn) {
80 logger := ctxlog.FromContext(ws.Request().Context())
81 atomic.AddInt64(&connected, 1)
82 gauge.Set(float64(atomic.LoadInt64(&connected)))
84 stats := rtr.handler.Handle(ws, logger, rtr.eventSource,
85 func(ws wsConn, sendq chan<- interface{}) (session, error) {
86 return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker(), rtr.client)
89 logger.WithFields(logrus.Fields{
90 "elapsed": time.Now().Sub(t0).Seconds(),
92 }).Info("client disconnected")
94 atomic.AddInt64(&connected, -1)
95 gauge.Set(float64(atomic.LoadInt64(&connected)))
100 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
101 rtr.setupOnce.Do(rtr.setup)
102 rtr.mux.ServeHTTP(resp, req)
105 func (rtr *router) CheckHealth() error {
106 rtr.setupOnce.Do(rtr.setup)
107 return rtr.eventSource.DBHealth()
110 func (rtr *router) Done() <-chan struct{} {