11002: Merge branch 'master' into 11002-arvput-crash-fix
[arvados.git] / services / ws / router.go
1 package main
2
3 import (
4         "encoding/json"
5         "io"
6         "net/http"
7         "strconv"
8         "sync"
9         "sync/atomic"
10         "time"
11
12         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
13         "github.com/Sirupsen/logrus"
14         "golang.org/x/net/websocket"
15 )
16
17 type wsConn interface {
18         io.ReadWriter
19         Request() *http.Request
20         SetReadDeadline(time.Time) error
21         SetWriteDeadline(time.Time) error
22 }
23
24 type router struct {
25         Config         *wsConfig
26         eventSource    eventSource
27         newPermChecker func() permChecker
28
29         handler   *handler
30         mux       *http.ServeMux
31         setupOnce sync.Once
32
33         lastReqID  int64
34         lastReqMtx sync.Mutex
35
36         status routerDebugStatus
37 }
38
39 type routerDebugStatus struct {
40         ReqsReceived int64
41         ReqsActive   int64
42 }
43
44 type debugStatuser interface {
45         DebugStatus() interface{}
46 }
47
48 func (rtr *router) setup() {
49         rtr.handler = &handler{
50                 PingTimeout: rtr.Config.PingTimeout.Duration(),
51                 QueueSize:   rtr.Config.ClientEventQueue,
52         }
53         rtr.mux = http.NewServeMux()
54         rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0))
55         rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1))
56         rtr.mux.HandleFunc("/debug.json", jsonHandler(rtr.DebugStatus))
57         rtr.mux.HandleFunc("/status.json", jsonHandler(rtr.Status))
58 }
59
60 func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
61         return &websocket.Server{
62                 Handshake: func(c *websocket.Config, r *http.Request) error {
63                         return nil
64                 },
65                 Handler: websocket.Handler(func(ws *websocket.Conn) {
66                         t0 := time.Now()
67                         log := logger(ws.Request().Context())
68                         log.Info("connected")
69
70                         stats := rtr.handler.Handle(ws, rtr.eventSource,
71                                 func(ws wsConn, sendq chan<- interface{}) (session, error) {
72                                         return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker(), &rtr.Config.Client)
73                                 })
74
75                         log.WithFields(logrus.Fields{
76                                 "elapsed": time.Now().Sub(t0).Seconds(),
77                                 "stats":   stats,
78                         }).Info("disconnect")
79                         ws.Close()
80                 }),
81         }
82 }
83
84 func (rtr *router) newReqID() string {
85         rtr.lastReqMtx.Lock()
86         defer rtr.lastReqMtx.Unlock()
87         id := time.Now().UnixNano()
88         if id <= rtr.lastReqID {
89                 id = rtr.lastReqID + 1
90         }
91         return strconv.FormatInt(id, 36)
92 }
93
94 func (rtr *router) DebugStatus() interface{} {
95         s := map[string]interface{}{
96                 "HTTP":     rtr.status,
97                 "Outgoing": rtr.handler.DebugStatus(),
98         }
99         if es, ok := rtr.eventSource.(debugStatuser); ok {
100                 s["EventSource"] = es.DebugStatus()
101         }
102         return s
103 }
104
105 func (rtr *router) Status() interface{} {
106         return map[string]interface{}{
107                 "Clients": atomic.LoadInt64(&rtr.status.ReqsActive),
108         }
109 }
110
111 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
112         rtr.setupOnce.Do(rtr.setup)
113         atomic.AddInt64(&rtr.status.ReqsReceived, 1)
114         atomic.AddInt64(&rtr.status.ReqsActive, 1)
115         defer atomic.AddInt64(&rtr.status.ReqsActive, -1)
116
117         logger := logger(req.Context()).
118                 WithField("RequestID", rtr.newReqID())
119         ctx := ctxlog.Context(req.Context(), logger)
120         req = req.WithContext(ctx)
121         logger.WithFields(logrus.Fields{
122                 "remoteAddr":      req.RemoteAddr,
123                 "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
124         }).Info("accept request")
125         rtr.mux.ServeHTTP(resp, req)
126 }
127
128 func jsonHandler(fn func() interface{}) http.HandlerFunc {
129         return func(resp http.ResponseWriter, req *http.Request) {
130                 logger := logger(req.Context())
131                 resp.Header().Set("Content-Type", "application/json")
132                 enc := json.NewEncoder(resp)
133                 err := enc.Encode(fn())
134                 if err != nil {
135                         msg := "encode failed"
136                         logger.WithError(err).Error(msg)
137                         http.Error(resp, msg, http.StatusInternalServerError)
138                 }
139         }
140 }