1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 "git.arvados.org/arvados.git/lib/cmd"
17 "git.arvados.org/arvados.git/sdk/go/arvados"
18 "git.arvados.org/arvados.git/sdk/go/ctxlog"
19 "git.arvados.org/arvados.git/sdk/go/health"
20 "github.com/sirupsen/logrus"
21 "golang.org/x/net/websocket"
24 type wsConn interface {
26 Request() *http.Request
27 SetReadDeadline(time.Time) error
28 SetWriteDeadline(time.Time) error
32 client *arvados.Client
33 cluster *arvados.Cluster
34 eventSource eventSource
35 newPermChecker func() permChecker
44 status routerDebugStatus
47 type routerDebugStatus struct {
52 type debugStatuser interface {
53 DebugStatus() interface{}
56 func (rtr *router) setup() {
57 rtr.handler = &handler{
58 PingTimeout: time.Duration(rtr.cluster.API.SendTimeout),
59 QueueSize: rtr.cluster.API.WebsocketClientEventQueue,
61 rtr.mux = http.NewServeMux()
62 rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0))
63 rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1))
64 rtr.mux.Handle("/debug.json", rtr.jsonHandler(rtr.DebugStatus))
65 rtr.mux.Handle("/status.json", rtr.jsonHandler(rtr.Status))
67 rtr.mux.Handle("/_health/", &health.Handler{
68 Token: rtr.cluster.ManagementToken,
70 Routes: health.Routes{
71 "db": rtr.eventSource.DBHealth,
73 Log: func(r *http.Request, err error) {
75 ctxlog.FromContext(r.Context()).WithError(err).Error("error")
81 func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
82 return &websocket.Server{
83 Handshake: func(c *websocket.Config, r *http.Request) error {
86 Handler: websocket.Handler(func(ws *websocket.Conn) {
88 logger := ctxlog.FromContext(ws.Request().Context())
89 logger.Info("connected")
91 stats := rtr.handler.Handle(ws, logger, rtr.eventSource,
92 func(ws wsConn, sendq chan<- interface{}) (session, error) {
93 return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker(), rtr.client)
96 logger.WithFields(logrus.Fields{
97 "elapsed": time.Now().Sub(t0).Seconds(),
105 func (rtr *router) newReqID() string {
106 rtr.lastReqMtx.Lock()
107 defer rtr.lastReqMtx.Unlock()
108 id := time.Now().UnixNano()
109 if id <= rtr.lastReqID {
110 id = rtr.lastReqID + 1
112 return strconv.FormatInt(id, 36)
115 func (rtr *router) DebugStatus() interface{} {
116 s := map[string]interface{}{
118 "Outgoing": rtr.handler.DebugStatus(),
120 if es, ok := rtr.eventSource.(debugStatuser); ok {
121 s["EventSource"] = es.DebugStatus()
126 func (rtr *router) Status() interface{} {
127 return map[string]interface{}{
128 "Clients": atomic.LoadInt64(&rtr.status.ReqsActive),
129 "Version": cmd.Version.String(),
133 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
134 rtr.setupOnce.Do(rtr.setup)
135 atomic.AddInt64(&rtr.status.ReqsReceived, 1)
136 atomic.AddInt64(&rtr.status.ReqsActive, 1)
137 defer atomic.AddInt64(&rtr.status.ReqsActive, -1)
139 logger := ctxlog.FromContext(req.Context()).
140 WithField("RequestID", rtr.newReqID())
141 ctx := ctxlog.Context(req.Context(), logger)
142 req = req.WithContext(ctx)
143 logger.WithFields(logrus.Fields{
144 "remoteAddr": req.RemoteAddr,
145 "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
146 }).Info("accept request")
147 rtr.mux.ServeHTTP(resp, req)
150 func (rtr *router) jsonHandler(fn func() interface{}) http.Handler {
151 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
152 logger := ctxlog.FromContext(r.Context())
153 w.Header().Set("Content-Type", "application/json")
154 enc := json.NewEncoder(w)
155 err := enc.Encode(fn())
157 msg := "encode failed"
158 logger.WithError(err).Error(msg)
159 http.Error(w, msg, http.StatusInternalServerError)
164 func (rtr *router) CheckHealth() error {
165 rtr.setupOnce.Do(rtr.setup)
166 return rtr.eventSource.DBHealth()