8460: Add godoc page.
[arvados.git] / services / ws / router.go
index 19f7d1878c2c869efe8b3c5ecd4c531046a5935d..073a398a29a0d88480bb3c1f52a5848abf4f2fcc 100644 (file)
@@ -26,6 +26,7 @@ type router struct {
        eventSource    eventSource
        newPermChecker func() permChecker
 
+       handler   *handler
        mux       *http.ServeMux
        setupOnce sync.Once
 
@@ -36,7 +37,8 @@ type router struct {
 }
 
 type routerStatus struct {
-       Connections int64
+       ReqsReceived int64
+       ReqsActive   int64
 }
 
 type Statuser interface {
@@ -46,6 +48,10 @@ type Statuser interface {
 type sessionFactory func(wsConn, chan<- interface{}, *sql.DB, permChecker) (session, error)
 
 func (rtr *router) setup() {
+       rtr.handler = &handler{
+               PingTimeout: rtr.Config.PingTimeout.Duration(),
+               QueueSize:   rtr.Config.ClientEventQueue,
+       }
        rtr.mux = http.NewServeMux()
        rtr.mux.Handle("/websocket", rtr.makeServer(NewSessionV0))
        rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1))
@@ -53,30 +59,24 @@ func (rtr *router) setup() {
 }
 
 func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
-       handler := &handler{
-               PingTimeout: rtr.Config.PingTimeout.Duration(),
-               QueueSize:   rtr.Config.ClientEventQueue,
-               NewSession: func(ws wsConn, sendq chan<- interface{}) (session, error) {
-                       return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker())
-               },
-       }
        return &websocket.Server{
                Handshake: func(c *websocket.Config, r *http.Request) error {
                        return nil
                },
                Handler: websocket.Handler(func(ws *websocket.Conn) {
                        t0 := time.Now()
-                       sink := rtr.eventSource.NewSink()
                        log := logger(ws.Request().Context())
                        log.Info("connected")
 
-                       stats := handler.Handle(ws, sink.Channel())
+                       stats := rtr.handler.Handle(ws, rtr.eventSource,
+                               func(ws wsConn, sendq chan<- interface{}) (session, error) {
+                                       return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker())
+                               })
 
                        log.WithFields(logrus.Fields{
                                "Elapsed": time.Now().Sub(t0).Seconds(),
                                "Stats":   stats,
                        }).Info("disconnect")
-                       sink.Stop()
                        ws.Close()
                }),
        }
@@ -94,7 +94,8 @@ func (rtr *router) newReqID() string {
 
 func (rtr *router) Status() interface{} {
        s := map[string]interface{}{
-               "Router": rtr.status,
+               "HTTP":     rtr.status,
+               "Outgoing": rtr.handler.Status(),
        }
        if es, ok := rtr.eventSource.(Statuser); ok {
                s["EventSource"] = es.Status()
@@ -115,8 +116,9 @@ func (rtr *router) serveStatus(resp http.ResponseWriter, req *http.Request) {
 
 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        rtr.setupOnce.Do(rtr.setup)
-       atomic.AddInt64(&rtr.status.Connections, 1)
-       defer atomic.AddInt64(&rtr.status.Connections, -1)
+       atomic.AddInt64(&rtr.status.ReqsReceived, 1)
+       atomic.AddInt64(&rtr.status.ReqsActive, 1)
+       defer atomic.AddInt64(&rtr.status.ReqsActive, -1)
 
        logger := logger(req.Context()).
                WithField("RequestID", rtr.newReqID())