18865: Rearrange code & update comments for clarity.
[arvados.git] / services / ws / router.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package ws
6
7 import (
8         "io"
9         "net/http"
10         "sync"
11         "sync/atomic"
12         "time"
13
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "git.arvados.org/arvados.git/sdk/go/ctxlog"
16         "git.arvados.org/arvados.git/sdk/go/health"
17         "github.com/prometheus/client_golang/prometheus"
18         "github.com/sirupsen/logrus"
19         "golang.org/x/net/websocket"
20 )
21
// wsConn is the subset of websocket connection behavior used by this
// package: byte-stream reads and writes, access to the HTTP request
// that opened the connection, and read/write deadlines.
type wsConn interface {
	io.Reader
	io.Writer

	// Request returns the HTTP request associated with the connection.
	Request() *http.Request

	// SetReadDeadline and SetWriteDeadline set the deadlines for
	// subsequent reads and writes, respectively.
	SetReadDeadline(time.Time) error
	SetWriteDeadline(time.Time) error
}
28
29 type router struct {
30         client         *arvados.Client
31         cluster        *arvados.Cluster
32         eventSource    eventSource
33         newPermChecker func() permChecker
34
35         handler   *handler
36         mux       *http.ServeMux
37         setupOnce sync.Once
38         done      chan struct{}
39         reg       *prometheus.Registry
40 }
41
42 func (rtr *router) setup() {
43         mSockets := prometheus.NewGaugeVec(prometheus.GaugeOpts{
44                 Namespace: "arvados",
45                 Subsystem: "ws",
46                 Name:      "sockets",
47                 Help:      "Number of connected sockets",
48         }, []string{"version"})
49         rtr.reg.MustRegister(mSockets)
50
51         rtr.handler = &handler{
52                 PingTimeout: time.Duration(rtr.cluster.API.SendTimeout),
53                 QueueSize:   rtr.cluster.API.WebsocketClientEventQueue,
54         }
55         rtr.mux = http.NewServeMux()
56         rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0, mSockets.WithLabelValues("0")))
57         rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1, mSockets.WithLabelValues("1")))
58         rtr.mux.Handle("/_health/", &health.Handler{
59                 Token:  rtr.cluster.ManagementToken,
60                 Prefix: "/_health/",
61                 Routes: health.Routes{
62                         "db": rtr.eventSource.DBHealth,
63                 },
64                 Log: func(r *http.Request, err error) {
65                         if err != nil {
66                                 ctxlog.FromContext(r.Context()).WithError(err).Error("error")
67                         }
68                 },
69         })
70 }
71
72 func (rtr *router) makeServer(newSession sessionFactory, gauge prometheus.Gauge) *websocket.Server {
73         var connected int64
74         return &websocket.Server{
75                 Handshake: func(c *websocket.Config, r *http.Request) error {
76                         return nil
77                 },
78                 Handler: websocket.Handler(func(ws *websocket.Conn) {
79                         t0 := time.Now()
80                         logger := ctxlog.FromContext(ws.Request().Context())
81                         atomic.AddInt64(&connected, 1)
82                         gauge.Set(float64(atomic.LoadInt64(&connected)))
83
84                         stats := rtr.handler.Handle(ws, logger, rtr.eventSource,
85                                 func(ws wsConn, sendq chan<- interface{}) (session, error) {
86                                         return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker(), rtr.client)
87                                 })
88
89                         logger.WithFields(logrus.Fields{
90                                 "elapsed": time.Now().Sub(t0).Seconds(),
91                                 "stats":   stats,
92                         }).Info("client disconnected")
93                         ws.Close()
94                         atomic.AddInt64(&connected, -1)
95                         gauge.Set(float64(atomic.LoadInt64(&connected)))
96                 }),
97         }
98 }
99
100 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
101         rtr.setupOnce.Do(rtr.setup)
102         rtr.mux.ServeHTTP(resp, req)
103 }
104
105 func (rtr *router) CheckHealth() error {
106         rtr.setupOnce.Do(rtr.setup)
107         return rtr.eventSource.DBHealth()
108 }
109
110 func (rtr *router) Done() <-chan struct{} {
111         return rtr.done
112 }