Merge branch '8784-dir-listings'
[arvados.git] / services / ws / router.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "encoding/json"
9         "io"
10         "net/http"
11         "strconv"
12         "sync"
13         "sync/atomic"
14         "time"
15
16         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
17         "github.com/Sirupsen/logrus"
18         "golang.org/x/net/websocket"
19 )
20
21 type wsConn interface {
22         io.ReadWriter
23         Request() *http.Request
24         SetReadDeadline(time.Time) error
25         SetWriteDeadline(time.Time) error
26 }
27
28 type router struct {
29         Config         *wsConfig
30         eventSource    eventSource
31         newPermChecker func() permChecker
32
33         handler   *handler
34         mux       *http.ServeMux
35         setupOnce sync.Once
36
37         lastReqID  int64
38         lastReqMtx sync.Mutex
39
40         status routerDebugStatus
41 }
42
43 type routerDebugStatus struct {
44         ReqsReceived int64
45         ReqsActive   int64
46 }
47
48 type debugStatuser interface {
49         DebugStatus() interface{}
50 }
51
52 func (rtr *router) setup() {
53         rtr.handler = &handler{
54                 PingTimeout: rtr.Config.PingTimeout.Duration(),
55                 QueueSize:   rtr.Config.ClientEventQueue,
56         }
57         rtr.mux = http.NewServeMux()
58         rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0))
59         rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1))
60         rtr.mux.Handle("/debug.json", rtr.jsonHandler(rtr.DebugStatus))
61         rtr.mux.Handle("/status.json", rtr.jsonHandler(rtr.Status))
62
63         health := http.NewServeMux()
64         rtr.mux.Handle("/_health/", rtr.mgmtAuth(health))
65         health.Handle("/_health/ping", rtr.jsonHandler(rtr.HealthFunc(func() error { return nil })))
66         health.Handle("/_health/db", rtr.jsonHandler(rtr.HealthFunc(rtr.eventSource.DBHealth)))
67 }
68
69 func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
70         return &websocket.Server{
71                 Handshake: func(c *websocket.Config, r *http.Request) error {
72                         return nil
73                 },
74                 Handler: websocket.Handler(func(ws *websocket.Conn) {
75                         t0 := time.Now()
76                         log := logger(ws.Request().Context())
77                         log.Info("connected")
78
79                         stats := rtr.handler.Handle(ws, rtr.eventSource,
80                                 func(ws wsConn, sendq chan<- interface{}) (session, error) {
81                                         return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker(), &rtr.Config.Client)
82                                 })
83
84                         log.WithFields(logrus.Fields{
85                                 "elapsed": time.Now().Sub(t0).Seconds(),
86                                 "stats":   stats,
87                         }).Info("disconnect")
88                         ws.Close()
89                 }),
90         }
91 }
92
93 func (rtr *router) newReqID() string {
94         rtr.lastReqMtx.Lock()
95         defer rtr.lastReqMtx.Unlock()
96         id := time.Now().UnixNano()
97         if id <= rtr.lastReqID {
98                 id = rtr.lastReqID + 1
99         }
100         return strconv.FormatInt(id, 36)
101 }
102
103 func (rtr *router) DebugStatus() interface{} {
104         s := map[string]interface{}{
105                 "HTTP":     rtr.status,
106                 "Outgoing": rtr.handler.DebugStatus(),
107         }
108         if es, ok := rtr.eventSource.(debugStatuser); ok {
109                 s["EventSource"] = es.DebugStatus()
110         }
111         return s
112 }
113
114 var pingResponseOK = map[string]string{"health": "OK"}
115
116 func (rtr *router) HealthFunc(f func() error) func() interface{} {
117         return func() interface{} {
118                 err := f()
119                 if err == nil {
120                         return pingResponseOK
121                 }
122                 return map[string]string{
123                         "health": "ERROR",
124                         "error":  err.Error(),
125                 }
126         }
127 }
128
129 func (rtr *router) Status() interface{} {
130         return map[string]interface{}{
131                 "Clients": atomic.LoadInt64(&rtr.status.ReqsActive),
132         }
133 }
134
135 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
136         rtr.setupOnce.Do(rtr.setup)
137         atomic.AddInt64(&rtr.status.ReqsReceived, 1)
138         atomic.AddInt64(&rtr.status.ReqsActive, 1)
139         defer atomic.AddInt64(&rtr.status.ReqsActive, -1)
140
141         logger := logger(req.Context()).
142                 WithField("RequestID", rtr.newReqID())
143         ctx := ctxlog.Context(req.Context(), logger)
144         req = req.WithContext(ctx)
145         logger.WithFields(logrus.Fields{
146                 "remoteAddr":      req.RemoteAddr,
147                 "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
148         }).Info("accept request")
149         rtr.mux.ServeHTTP(resp, req)
150 }
151
152 func (rtr *router) mgmtAuth(h http.Handler) http.Handler {
153         return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
154                 if rtr.Config.ManagementToken == "" {
155                         http.Error(w, "disabled", http.StatusNotFound)
156                 } else if ah := r.Header.Get("Authorization"); ah == "" {
157                         http.Error(w, "authorization required", http.StatusUnauthorized)
158                 } else if ah != "Bearer "+rtr.Config.ManagementToken {
159                         http.Error(w, "authorization error", http.StatusForbidden)
160                 } else {
161                         h.ServeHTTP(w, r)
162                 }
163         })
164 }
165
166 func (rtr *router) jsonHandler(fn func() interface{}) http.Handler {
167         return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
168                 logger := logger(r.Context())
169                 w.Header().Set("Content-Type", "application/json")
170                 enc := json.NewEncoder(w)
171                 err := enc.Encode(fn())
172                 if err != nil {
173                         msg := "encode failed"
174                         logger.WithError(err).Error(msg)
175                         http.Error(w, msg, http.StatusInternalServerError)
176                 }
177         })
178 }