1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 "git.curoverse.com/arvados.git/sdk/go/arvados"
17 "git.curoverse.com/arvados.git/sdk/go/ctxlog"
18 "git.curoverse.com/arvados.git/sdk/go/health"
19 "github.com/sirupsen/logrus"
20 "golang.org/x/net/websocket"
// wsConn is the connection type passed to session factories (see the
// callback built in makeServer). The methods listed here give access to
// the originating HTTP request and to read/write deadline control.
23 type wsConn interface {
// Request returns the *http.Request associated with the connection.
25 Request() *http.Request
// SetReadDeadline and SetWriteDeadline take an absolute time.
// NOTE(review): presumably same semantics as net.Conn deadlines -- confirm.
26 SetReadDeadline(time.Time) error
27 SetWriteDeadline(time.Time) error
// cluster is the configuration source read by setup
// (API.SendTimeout, API.WebsocketClientEventQueue, ManagementToken).
32 cluster *arvados.Cluster
// eventSource is passed to the handler for every websocket connection
// and also provides DB() for new sessions and DBHealth for the health
// endpoint.
33 eventSource eventSource
// newPermChecker is called each time a session is created, to obtain
// the permChecker given to that session.
34 newPermChecker func() permChecker
// status holds the request counters that ServeHTTP updates with
// sync/atomic operations and that Status reads.
43 status routerDebugStatus
// routerDebugStatus is the type of router.status. Its ReqsReceived and
// ReqsActive counters are modified with atomic.AddInt64 in ServeHTTP, and
// ReqsActive is read with atomic.LoadInt64 in Status.
46 type routerDebugStatus struct {
// debugStatuser is an optional interface: router.DebugStatus type-asserts
// the event source against it and, when the assertion succeeds, includes
// the returned value under the "EventSource" key.
51 type debugStatuser interface {
// DebugStatus returns the value to be reported; it is stored as-is in
// the debug status map.
52 DebugStatus() interface{}
// setup creates rtr.handler and rtr.mux and registers every route served
// by the router. ServeHTTP invokes it through rtr.setupOnce.Do.
55 func (rtr *router) setup() {
// Handler settings are taken from the cluster's API configuration.
56 rtr.handler = &handler{
57 PingTimeout: time.Duration(rtr.cluster.API.SendTimeout),
58 QueueSize: rtr.cluster.API.WebsocketClientEventQueue,
60 rtr.mux = http.NewServeMux()
// Websocket endpoints: each path gets its own session factory.
61 rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0))
62 rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1))
// JSON endpoints backed by the DebugStatus and Status methods.
63 rtr.mux.Handle("/debug.json", rtr.jsonHandler(rtr.DebugStatus))
64 rtr.mux.Handle("/status.json", rtr.jsonHandler(rtr.Status))
// Health endpoint, configured with the cluster's ManagementToken.
66 rtr.mux.Handle("/_health/", &health.Handler{
67 Token: rtr.cluster.ManagementToken,
69 Routes: health.Routes{
// The "db" route is answered by the event source's DBHealth method.
70 "db": rtr.eventSource.DBHealth,
// Log reports err at error level through the logger obtained from the
// request's context.
72 Log: func(r *http.Request, err error) {
74 logger(r.Context()).WithError(err).Error("error")
// makeServer returns a websocket.Server whose connection handler runs
// each connection through rtr.handler, creating the session with the
// given newSession factory.
80 func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
81 return &websocket.Server{
// Handshake is the callback the websocket server invokes with the
// websocket config and the HTTP request before the connection is served.
82 Handshake: func(c *websocket.Config, r *http.Request) error {
85 Handler: websocket.Handler(func(ws *websocket.Conn) {
// Use the logger obtained from the request context.
// NOTE(review): presumably the one ServeHTTP stored there, which carries
// the RequestID field -- confirm against logger().
87 log := logger(ws.Request().Context())
// Handle serves the connection. The callback builds the session from the
// connection, its send queue, the event source's DB, a fresh permission
// checker and a pointer to the router's client.
90 stats := rtr.handler.Handle(ws, rtr.eventSource,
91 func(ws wsConn, sendq chan<- interface{}) (session, error) {
92 return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker(), &rtr.client)
// Log entry written after Handle returns; "elapsed" is the number of
// seconds since t0.
95 log.WithFields(logrus.Fields{
96 "elapsed": time.Now().Sub(t0).Seconds(),
104 func (rtr *router) newReqID() string {
105 rtr.lastReqMtx.Lock()
106 defer rtr.lastReqMtx.Unlock()
107 id := time.Now().UnixNano()
108 if id <= rtr.lastReqID {
109 id = rtr.lastReqID + 1
111 return strconv.FormatInt(id, 36)
// DebugStatus builds a map of debugging information. The handler's own
// DebugStatus is stored under "Outgoing"; if the event source implements
// debugStatuser, its DebugStatus is stored under "EventSource".
114 func (rtr *router) DebugStatus() interface{} {
115 s := map[string]interface{}{
117 "Outgoing": rtr.handler.DebugStatus(),
// The event source entry is optional: it is added only when the type
// assertion succeeds.
119 if es, ok := rtr.eventSource.(debugStatuser); ok {
120 s["EventSource"] = es.DebugStatus()
// Status returns a map whose "Clients" entry is the current value of the
// ReqsActive counter, i.e. the number of requests ServeHTTP has started
// but not yet finished.
125 func (rtr *router) Status() interface{} {
126 return map[string]interface{}{
// Atomic load, matching the atomic updates made in ServeHTTP.
127 "Clients": atomic.LoadInt64(&rtr.status.ReqsActive),
132 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
133 rtr.setupOnce.Do(rtr.setup)
134 atomic.AddInt64(&rtr.status.ReqsReceived, 1)
135 atomic.AddInt64(&rtr.status.ReqsActive, 1)
136 defer atomic.AddInt64(&rtr.status.ReqsActive, -1)
138 logger := logger(req.Context()).
139 WithField("RequestID", rtr.newReqID())
140 ctx := ctxlog.Context(req.Context(), logger)
141 req = req.WithContext(ctx)
142 logger.WithFields(logrus.Fields{
143 "remoteAddr": req.RemoteAddr,
144 "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
145 }).Info("accept request")
146 rtr.mux.ServeHTTP(resp, req)
149 func (rtr *router) jsonHandler(fn func() interface{}) http.Handler {
150 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
151 logger := logger(r.Context())
152 w.Header().Set("Content-Type", "application/json")
153 enc := json.NewEncoder(w)
154 err := enc.Encode(fn())
156 msg := "encode failed"
157 logger.WithError(err).Error(msg)
158 http.Error(w, msg, http.StatusInternalServerError)