1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 "git.curoverse.com/arvados.git/sdk/go/ctxlog"
17 "github.com/Sirupsen/logrus"
18 "golang.org/x/net/websocket"
// wsConn is the connection type handed to the session-creating
// callback in makeServer. The methods listed here give access to the
// originating HTTP request and allow read and write deadlines to be
// set on the connection.
21 type wsConn interface {
23 Request() *http.Request
24 SetReadDeadline(time.Time) error
25 SetWriteDeadline(time.Time) error
// eventSource is passed to the handler for each connection and is
// also used to obtain the DB handle for new sessions and to answer
// the /_health/db check.
30 eventSource eventSource
// newPermChecker is called to obtain a permChecker each time a
// session is created.
31 newPermChecker func() permChecker
// status holds the request counters that ServeHTTP updates and
// Status reads.
40 status routerDebugStatus
// routerDebugStatus holds the router's request counters. ServeHTTP
// updates its ReqsReceived and ReqsActive fields with atomic
// operations.
43 type routerDebugStatus struct {
// debugStatuser is implemented by components that can contribute to
// the debug report. The router's DebugStatus method checks whether
// the event source satisfies it.
48 type debugStatuser interface {
// DebugStatus returns the value to include in the debug report.
49 DebugStatus() interface{}
// setup builds the connection handler and the HTTP routing table.
// ServeHTTP invokes it through setupOnce, so it runs a single time,
// when the first request arrives.
52 func (rtr *router) setup() {
// The handler's ping timeout and queue size come from the router's
// Config.
53 rtr.handler = &handler{
54 PingTimeout: rtr.Config.PingTimeout.Duration(),
55 QueueSize: rtr.Config.ClientEventQueue,
// Websocket endpoints: each path gets its own server, built with the
// session factory for that path (newSessionV0 or newSessionV1).
57 rtr.mux = http.NewServeMux()
58 rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0))
59 rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1))
// JSON reporting endpoints.
60 rtr.mux.Handle("/debug.json", rtr.jsonHandler(rtr.DebugStatus))
61 rtr.mux.Handle("/status.json", rtr.jsonHandler(rtr.Status))
// Health checks are registered on a separate mux; every path under
// /_health/ passes through mgmtAuth before reaching it.
63 health := http.NewServeMux()
64 rtr.mux.Handle("/_health/", rtr.mgmtAuth(health))
// The ping check never reports an error; the db check reports
// whatever the event source's DBHealth returns.
65 health.Handle("/_health/ping", rtr.jsonHandler(rtr.HealthFunc(func() error { return nil })))
66 health.Handle("/_health/db", rtr.jsonHandler(rtr.HealthFunc(rtr.eventSource.DBHealth)))
// makeServer returns a websocket server whose connection handler
// hands each connection to rtr.handler.Handle, creating the session
// with the given factory, and logs fields that include the elapsed
// time.
69 func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
70 return &websocket.Server{
71 Handshake: func(c *websocket.Config, r *http.Request) error {
74 Handler: websocket.Handler(func(ws *websocket.Conn) {
// Use the logger associated with the request's context.
76 log := logger(ws.Request().Context())
79 stats := rtr.handler.Handle(ws, rtr.eventSource,
// The session is built from the connection, the send queue, the
// event source's DB handle, a newly created permChecker, and the
// client section of the router's Config.
80 func(ws wsConn, sendq chan<- interface{}) (session, error) {
81 return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker(), &rtr.Config.Client)
84 log.WithFields(logrus.Fields{
// "elapsed" is the number of seconds since t0.
85 "elapsed": time.Now().Sub(t0).Seconds(),
93 func (rtr *router) newReqID() string {
95 defer rtr.lastReqMtx.Unlock()
96 id := time.Now().UnixNano()
97 if id <= rtr.lastReqID {
98 id = rtr.lastReqID + 1
100 return strconv.FormatInt(id, 36)
// DebugStatus assembles the report served at /debug.json. The
// handler's debug status is stored under "Outgoing"; if the event
// source implements debugStatuser, its status is added under
// "EventSource".
103 func (rtr *router) DebugStatus() interface{} {
104 s := map[string]interface{}{
106 "Outgoing": rtr.handler.DebugStatus(),
// Not every event source can report debug information, so this
// entry is optional.
108 if es, ok := rtr.eventSource.(debugStatuser); ok {
109 s["EventSource"] = es.DebugStatus()
// pingResponseOK is the health-check response body reporting an OK
// status. It is returned by the functions that HealthFunc builds.
114 var pingResponseOK = map[string]string{"health": "OK"}
// HealthFunc adapts a health check f into a function that can be
// passed to jsonHandler. The returned function yields either
// pingResponseOK or a map carrying the check's error message under
// the "error" key.
116 func (rtr *router) HealthFunc(f func() error) func() interface{} {
117 return func() interface{} {
120 return pingResponseOK
122 return map[string]string{
124 "error": err.Error(),
// Status returns the data served at /status.json. "Clients" is the
// current value of the ReqsActive counter, read atomically.
129 func (rtr *router) Status() interface{} {
130 return map[string]interface{}{
131 "Clients": atomic.LoadInt64(&rtr.status.ReqsActive),
// ServeHTTP handles an incoming HTTP request. It performs the
// one-time setup, updates the request counters, attaches a logger
// tagged with a new request ID to the request's context, logs the
// request, and dispatches it to the router's mux.
135 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
// Build the handler and mux on first use.
136 rtr.setupOnce.Do(rtr.setup)
// ReqsReceived counts every request; ReqsActive is incremented here
// and decremented again when this method returns.
137 atomic.AddInt64(&rtr.status.ReqsReceived, 1)
138 atomic.AddInt64(&rtr.status.ReqsActive, 1)
139 defer atomic.AddInt64(&rtr.status.ReqsActive, -1)
// Tag the logger with a new request ID and store it in the context
// of the request that is passed on to the mux.
141 logger := logger(req.Context()).
142 WithField("RequestID", rtr.newReqID())
143 ctx := ctxlog.Context(req.Context(), logger)
144 req = req.WithContext(ctx)
// Log the peer address and any X-Forwarded-For header value.
145 logger.WithFields(logrus.Fields{
146 "remoteAddr": req.RemoteAddr,
147 "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
148 }).Info("accept request")
149 rtr.mux.ServeHTTP(resp, req)
// mgmtAuth wraps h with a management-token check. It responds with
// 404 "disabled" when no ManagementToken is configured, 401 when the
// request has no Authorization header, and 403 when the header is
// not exactly "Bearer " followed by the configured token.
152 func (rtr *router) mgmtAuth(h http.Handler) http.Handler {
153 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// An empty token disables the management endpoints entirely.
154 if rtr.Config.ManagementToken == "" {
155 http.Error(w, "disabled", http.StatusNotFound)
156 } else if ah := r.Header.Get("Authorization"); ah == "" {
157 http.Error(w, "authorization required", http.StatusUnauthorized)
// NOTE(review): this is an ordinary string comparison, not a
// constant-time one; consider crypto/subtle.ConstantTimeCompare if
// timing side channels are a concern for this endpoint.
158 } else if ah != "Bearer "+rtr.Config.ManagementToken {
159 http.Error(w, "authorization error", http.StatusForbidden)
// jsonHandler returns an http.Handler that calls fn and writes the
// result to the response as JSON, with Content-Type set to
// application/json. An encoding failure is logged with the request's
// logger and reported to the client as a 500 "encode failed"
// response.
166 func (rtr *router) jsonHandler(fn func() interface{}) http.Handler {
167 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
168 logger := logger(r.Context())
169 w.Header().Set("Content-Type", "application/json")
// Encode writes directly to the response writer.
170 enc := json.NewEncoder(w)
171 err := enc.Encode(fn())
173 msg := "encode failed"
174 logger.WithError(err).Error(msg)
175 http.Error(w, msg, http.StatusInternalServerError)