1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 "git.curoverse.com/arvados.git/sdk/go/ctxlog"
17 "git.curoverse.com/arvados.git/sdk/go/health"
18 "github.com/Sirupsen/logrus"
19 "golang.org/x/net/websocket"
22 type wsConn interface {
24 Request() *http.Request
25 SetReadDeadline(time.Time) error
26 SetWriteDeadline(time.Time) error
31 eventSource eventSource
32 newPermChecker func() permChecker
41 status routerDebugStatus
44 type routerDebugStatus struct {
49 type debugStatuser interface {
50 DebugStatus() interface{}
53 func (rtr *router) setup() {
54 rtr.handler = &handler{
55 PingTimeout: rtr.Config.PingTimeout.Duration(),
56 QueueSize: rtr.Config.ClientEventQueue,
58 rtr.mux = http.NewServeMux()
59 rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0))
60 rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1))
61 rtr.mux.Handle("/debug.json", rtr.jsonHandler(rtr.DebugStatus))
62 rtr.mux.Handle("/status.json", rtr.jsonHandler(rtr.Status))
64 rtr.mux.Handle("/_health/", &health.Handler{
65 Token: rtr.Config.ManagementToken,
67 Routes: health.Routes{
68 "db": rtr.eventSource.DBHealth,
70 Log: func(r *http.Request, err error) {
72 logger(r.Context()).WithError(err).Error("error")
78 func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
79 return &websocket.Server{
80 Handshake: func(c *websocket.Config, r *http.Request) error {
83 Handler: websocket.Handler(func(ws *websocket.Conn) {
85 log := logger(ws.Request().Context())
88 stats := rtr.handler.Handle(ws, rtr.eventSource,
89 func(ws wsConn, sendq chan<- interface{}) (session, error) {
90 return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker(), &rtr.Config.Client)
93 log.WithFields(logrus.Fields{
94 "elapsed": time.Now().Sub(t0).Seconds(),
102 func (rtr *router) newReqID() string {
103 rtr.lastReqMtx.Lock()
104 defer rtr.lastReqMtx.Unlock()
105 id := time.Now().UnixNano()
106 if id <= rtr.lastReqID {
107 id = rtr.lastReqID + 1
109 return strconv.FormatInt(id, 36)
112 func (rtr *router) DebugStatus() interface{} {
113 s := map[string]interface{}{
115 "Outgoing": rtr.handler.DebugStatus(),
117 if es, ok := rtr.eventSource.(debugStatuser); ok {
118 s["EventSource"] = es.DebugStatus()
123 func (rtr *router) Status() interface{} {
124 return map[string]interface{}{
125 "Clients": atomic.LoadInt64(&rtr.status.ReqsActive),
129 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
130 rtr.setupOnce.Do(rtr.setup)
131 atomic.AddInt64(&rtr.status.ReqsReceived, 1)
132 atomic.AddInt64(&rtr.status.ReqsActive, 1)
133 defer atomic.AddInt64(&rtr.status.ReqsActive, -1)
135 logger := logger(req.Context()).
136 WithField("RequestID", rtr.newReqID())
137 ctx := ctxlog.Context(req.Context(), logger)
138 req = req.WithContext(ctx)
139 logger.WithFields(logrus.Fields{
140 "remoteAddr": req.RemoteAddr,
141 "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
142 }).Info("accept request")
143 rtr.mux.ServeHTTP(resp, req)
146 func (rtr *router) jsonHandler(fn func() interface{}) http.Handler {
147 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
148 logger := logger(r.Context())
149 w.Header().Set("Content-Type", "application/json")
150 enc := json.NewEncoder(w)
151 err := enc.Encode(fn())
153 msg := "encode failed"
154 logger.WithError(err).Error(msg)
155 http.Error(w, msg, http.StatusInternalServerError)