Merge branch '16265-security-updates' into dependabot/bundler/apps/workbench/loofah...
[arvados.git] / services / ws / router.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "encoding/json"
9         "io"
10         "net/http"
11         "strconv"
12         "sync"
13         "sync/atomic"
14         "time"
15
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         "git.arvados.org/arvados.git/sdk/go/ctxlog"
18         "git.arvados.org/arvados.git/sdk/go/health"
19         "github.com/sirupsen/logrus"
20         "golang.org/x/net/websocket"
21 )
22
23 type wsConn interface {
24         io.ReadWriter
25         Request() *http.Request
26         SetReadDeadline(time.Time) error
27         SetWriteDeadline(time.Time) error
28 }
29
30 type router struct {
31         client         arvados.Client
32         cluster        *arvados.Cluster
33         eventSource    eventSource
34         newPermChecker func() permChecker
35
36         handler   *handler
37         mux       *http.ServeMux
38         setupOnce sync.Once
39
40         lastReqID  int64
41         lastReqMtx sync.Mutex
42
43         status routerDebugStatus
44 }
45
46 type routerDebugStatus struct {
47         ReqsReceived int64
48         ReqsActive   int64
49 }
50
51 type debugStatuser interface {
52         DebugStatus() interface{}
53 }
54
55 func (rtr *router) setup() {
56         rtr.handler = &handler{
57                 PingTimeout: time.Duration(rtr.cluster.API.SendTimeout),
58                 QueueSize:   rtr.cluster.API.WebsocketClientEventQueue,
59         }
60         rtr.mux = http.NewServeMux()
61         rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0))
62         rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1))
63         rtr.mux.Handle("/debug.json", rtr.jsonHandler(rtr.DebugStatus))
64         rtr.mux.Handle("/status.json", rtr.jsonHandler(rtr.Status))
65
66         rtr.mux.Handle("/_health/", &health.Handler{
67                 Token:  rtr.cluster.ManagementToken,
68                 Prefix: "/_health/",
69                 Routes: health.Routes{
70                         "db": rtr.eventSource.DBHealth,
71                 },
72                 Log: func(r *http.Request, err error) {
73                         if err != nil {
74                                 logger(r.Context()).WithError(err).Error("error")
75                         }
76                 },
77         })
78 }
79
80 func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
81         return &websocket.Server{
82                 Handshake: func(c *websocket.Config, r *http.Request) error {
83                         return nil
84                 },
85                 Handler: websocket.Handler(func(ws *websocket.Conn) {
86                         t0 := time.Now()
87                         log := logger(ws.Request().Context())
88                         log.Info("connected")
89
90                         stats := rtr.handler.Handle(ws, rtr.eventSource,
91                                 func(ws wsConn, sendq chan<- interface{}) (session, error) {
92                                         return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker(), &rtr.client)
93                                 })
94
95                         log.WithFields(logrus.Fields{
96                                 "elapsed": time.Now().Sub(t0).Seconds(),
97                                 "stats":   stats,
98                         }).Info("disconnect")
99                         ws.Close()
100                 }),
101         }
102 }
103
104 func (rtr *router) newReqID() string {
105         rtr.lastReqMtx.Lock()
106         defer rtr.lastReqMtx.Unlock()
107         id := time.Now().UnixNano()
108         if id <= rtr.lastReqID {
109                 id = rtr.lastReqID + 1
110         }
111         return strconv.FormatInt(id, 36)
112 }
113
114 func (rtr *router) DebugStatus() interface{} {
115         s := map[string]interface{}{
116                 "HTTP":     rtr.status,
117                 "Outgoing": rtr.handler.DebugStatus(),
118         }
119         if es, ok := rtr.eventSource.(debugStatuser); ok {
120                 s["EventSource"] = es.DebugStatus()
121         }
122         return s
123 }
124
125 func (rtr *router) Status() interface{} {
126         return map[string]interface{}{
127                 "Clients": atomic.LoadInt64(&rtr.status.ReqsActive),
128                 "Version": version,
129         }
130 }
131
132 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
133         rtr.setupOnce.Do(rtr.setup)
134         atomic.AddInt64(&rtr.status.ReqsReceived, 1)
135         atomic.AddInt64(&rtr.status.ReqsActive, 1)
136         defer atomic.AddInt64(&rtr.status.ReqsActive, -1)
137
138         logger := logger(req.Context()).
139                 WithField("RequestID", rtr.newReqID())
140         ctx := ctxlog.Context(req.Context(), logger)
141         req = req.WithContext(ctx)
142         logger.WithFields(logrus.Fields{
143                 "remoteAddr":      req.RemoteAddr,
144                 "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
145         }).Info("accept request")
146         rtr.mux.ServeHTTP(resp, req)
147 }
148
149 func (rtr *router) jsonHandler(fn func() interface{}) http.Handler {
150         return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
151                 logger := logger(r.Context())
152                 w.Header().Set("Content-Type", "application/json")
153                 enc := json.NewEncoder(w)
154                 err := enc.Encode(fn())
155                 if err != nil {
156                         msg := "encode failed"
157                         logger.WithError(err).Error(msg)
158                         http.Error(w, msg, http.StatusInternalServerError)
159                 }
160         })
161 }