1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 "git.arvados.org/arvados.git/lib/cmd"
17 "git.arvados.org/arvados.git/sdk/go/arvados"
18 "git.arvados.org/arvados.git/sdk/go/ctxlog"
19 "git.arvados.org/arvados.git/sdk/go/health"
20 "github.com/sirupsen/logrus"
21 "golang.org/x/net/websocket"
// wsConn is the connection interface handed to session factories
// (see makeServer). It exposes the HTTP request behind the connection
// and setters for the read and write deadlines.
24 type wsConn interface {
26 Request() *http.Request
27 SetReadDeadline(time.Time) error
28 SetWriteDeadline(time.Time) error
// client is passed to every new session (see makeServer).
32 client *arvados.Client
// cluster supplies API.SendTimeout, API.WebsocketClientEventQueue and
// ManagementToken when setup builds the handler and health endpoint.
33 cluster *arvados.Cluster
// eventSource provides DB() and DBHealth() and is passed to the
// handler for each websocket connection.
34 eventSource eventSource
// newPermChecker is called whenever a session is created, so each
// session receives its own permChecker.
35 newPermChecker func() permChecker
// status holds the request counters that ServeHTTP updates with
// sync/atomic operations.
45 status routerDebugStatus
// routerDebugStatus holds the router's request counters. ServeHTTP and
// Status access its ReqsReceived and ReqsActive fields through
// sync/atomic.
48 type routerDebugStatus struct {
// debugStatuser is implemented by components that can report their
// own debugging status. router.DebugStatus checks whether the event
// source implements it and, if so, publishes the result under the
// "EventSource" key.
53 type debugStatuser interface {
54 DebugStatus() interface{}
// setup initializes rtr.handler and rtr.mux and registers all HTTP
// routes. It is run through rtr.setupOnce (see ServeHTTP and
// CheckHealth) rather than being called directly.
57 func (rtr *router) setup() {
// The handler's ping timeout and per-client queue size come from the
// cluster's API configuration.
58 rtr.handler = &handler{
59 PingTimeout: time.Duration(rtr.cluster.API.SendTimeout),
60 QueueSize: rtr.cluster.API.WebsocketClientEventQueue,
62 rtr.mux = http.NewServeMux()
// Websocket endpoints, backed by the V0 and V1 session factories.
63 rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0))
64 rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1))
// JSON endpoints serving DebugStatus() and Status().
65 rtr.mux.Handle("/debug.json", rtr.jsonHandler(rtr.DebugStatus))
66 rtr.mux.Handle("/status.json", rtr.jsonHandler(rtr.Status))
// Health endpoint: the "db" check is delegated to the event source.
68 rtr.mux.Handle("/_health/", &health.Handler{
69 Token: rtr.cluster.ManagementToken,
71 Routes: health.Routes{
72 "db": rtr.eventSource.DBHealth,
// Errors are reported through the logger attached to the request
// context.
74 Log: func(r *http.Request, err error) {
76 ctxlog.FromContext(r.Context()).WithError(err).Error("error")
// makeServer returns a websocket.Server for one session type. For each
// connection its handler logs "connected", runs rtr.handler.Handle
// with sessions built by newSession, and then logs "disconnect"
// together with the elapsed time.
82 func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
83 return &websocket.Server{
84 Handshake: func(c *websocket.Config, r *http.Request) error {
87 Handler: websocket.Handler(func(ws *websocket.Conn) {
// The logger comes from the request context, so it carries whatever
// fields ServeHTTP attached (e.g. RequestID).
89 logger := ctxlog.FromContext(ws.Request().Context())
90 logger.Info("connected")
// Session factory: binds the event source's DB, a fresh permission
// checker and the API client to each new session.
92 stats := rtr.handler.Handle(ws, logger, rtr.eventSource,
93 func(ws wsConn, sendq chan<- interface{}) (session, error) {
94 return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker(), rtr.client)
97 logger.WithFields(logrus.Fields{
98 "elapsed": time.Now().Sub(t0).Seconds(),
100 }).Info("disconnect")
106 func (rtr *router) newReqID() string {
107 rtr.lastReqMtx.Lock()
108 defer rtr.lastReqMtx.Unlock()
109 id := time.Now().UnixNano()
110 if id <= rtr.lastReqID {
111 id = rtr.lastReqID + 1
113 return strconv.FormatInt(id, 36)
// DebugStatus assembles debugging information as a map: the handler's
// own DebugStatus() under "Outgoing" and, when the event source
// implements debugStatuser, its DebugStatus() under "EventSource".
// setup serves it at /debug.json.
116 func (rtr *router) DebugStatus() interface{} {
117 s := map[string]interface{}{
119 "Outgoing": rtr.handler.DebugStatus(),
// The event source's status is optional: it is included only if the
// concrete type provides it.
121 if es, ok := rtr.eventSource.(debugStatuser); ok {
122 s["EventSource"] = es.DebugStatus()
127 func (rtr *router) Status() interface{} {
128 return map[string]interface{}{
129 "Clients": atomic.LoadInt64(&rtr.status.ReqsActive),
130 "Version": cmd.Version.String(),
134 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
135 rtr.setupOnce.Do(rtr.setup)
136 atomic.AddInt64(&rtr.status.ReqsReceived, 1)
137 atomic.AddInt64(&rtr.status.ReqsActive, 1)
138 defer atomic.AddInt64(&rtr.status.ReqsActive, -1)
140 logger := ctxlog.FromContext(req.Context()).
141 WithField("RequestID", rtr.newReqID())
142 ctx := ctxlog.Context(req.Context(), logger)
143 req = req.WithContext(ctx)
144 logger.WithFields(logrus.Fields{
145 "remoteAddr": req.RemoteAddr,
146 "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
147 }).Info("accept request")
148 rtr.mux.ServeHTTP(resp, req)
// jsonHandler returns an http.Handler that sets Content-Type to
// application/json and writes the JSON encoding of fn()'s result to
// the response. On the failure path it logs the error with the
// request's logger and replies with a 500 status and the message
// "encode failed".
151 func (rtr *router) jsonHandler(fn func() interface{}) http.Handler {
152 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
153 logger := ctxlog.FromContext(r.Context())
154 w.Header().Set("Content-Type", "application/json")
155 enc := json.NewEncoder(w)
156 err := enc.Encode(fn())
// NOTE(review): if part of the body has already been written when the
// failure occurs, http.Error can no longer change the status code.
158 msg := "encode failed"
159 logger.WithError(err).Error(msg)
160 http.Error(w, msg, http.StatusInternalServerError)
165 func (rtr *router) CheckHealth() error {
166 rtr.setupOnce.Do(rtr.setup)
167 return rtr.eventSource.DBHealth()
170 func (rtr *router) Done() <-chan struct{} {