Merge branch '10669-safe-http-cache'
[arvados.git] / services / ws / handler.go
index 91a77022d6030aef29bef64b15ad6951674188ba..72291900fac52c4192f4db971e29bd736be5c189 100644 (file)
@@ -3,16 +3,21 @@ package main
 import (
        "context"
        "io"
+       "sync"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/stats"
 )
 
+// handler holds the settings and the shared bookkeeping used by
+// Handle and DebugStatus.
 type handler struct {
        Client      arvados.Client
        PingTimeout time.Duration
        QueueSize   int
-       NewSession  func(wsConn, chan<- interface{}) (session, error)
+
+       // mtx is held around every access to lastDelay.
+       mtx       sync.Mutex
+       // lastDelay has one entry per outgoing queue created by a
+       // running Handle call. The value starts at zero and is
+       // updated after an event is sent to the time elapsed since
+       // that event's Ready timestamp.
+       lastDelay map[chan interface{}]stats.Duration
+       // setupOnce makes Handle run setup (which allocates
+       // lastDelay) only once.
+       setupOnce sync.Once
 }
 
 type handlerStats struct {
@@ -22,16 +27,34 @@ type handlerStats struct {
        EventCount   uint64
 }
 
-func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats) {
+func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) {
+       h.setupOnce.Do(h.setup)
+
        ctx, cancel := context.WithCancel(ws.Request().Context())
+       defer cancel()
        log := logger(ctx)
+
+       incoming := eventSource.NewSink()
+       defer incoming.Stop()
+
        queue := make(chan interface{}, h.QueueSize)
-       sess, err := h.NewSession(ws, queue)
+       h.mtx.Lock()
+       h.lastDelay[queue] = 0
+       h.mtx.Unlock()
+       defer func() {
+               h.mtx.Lock()
+               delete(h.lastDelay, queue)
+               h.mtx.Unlock()
+       }()
+
+       sess, err := newSession(ws, queue)
        if err != nil {
-               log.WithError(err).Error("NewSession failed")
+               log.WithError(err).Error("newSession failed")
                return
        }
 
+       // Receive websocket frames from the client and pass them to
+       // sess.Receive().
        go func() {
                buf := make([]byte, 2<<20)
                for {
@@ -63,6 +86,9 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
                }
        }()
 
+       // Take items from the outgoing queue, serialize them using
+       // sess.EventMessage() as needed, and send them to the client
+       // as websocket frames.
        go func() {
                for {
                        var ok bool
@@ -112,11 +138,14 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
                        log.Debug("sent")
 
                        if e != nil {
-                               stats.QueueDelayNs += t0.Sub(e.Received)
+                               hStats.QueueDelayNs += t0.Sub(e.Ready)
+                               h.mtx.Lock()
+                               h.lastDelay[queue] = stats.Duration(time.Since(e.Ready))
+                               h.mtx.Unlock()
                        }
-                       stats.WriteDelayNs += time.Since(t0)
-                       stats.EventBytes += uint64(len(buf))
-                       stats.EventCount++
+                       hStats.WriteDelayNs += time.Since(t0)
+                       hStats.EventBytes += uint64(len(buf))
+                       hStats.EventCount++
                }
        }()
 
@@ -146,7 +175,7 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
                                        }
                                }
                                continue
-                       case e, ok := <-incoming:
+                       case e, ok := <-incoming.Channel():
                                if !ok {
                                        cancel()
                                        return
@@ -168,3 +197,39 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
        <-ctx.Done()
        return
 }
+
+func (h *handler) DebugStatus() interface{} {
+       h.mtx.Lock()
+       defer h.mtx.Unlock()
+
+       var s struct {
+               QueueCount    int
+               QueueMin      int
+               QueueMax      int
+               QueueTotal    uint64
+               QueueDelayMin stats.Duration
+               QueueDelayMax stats.Duration
+       }
+       for q, lastDelay := range h.lastDelay {
+               s.QueueCount++
+               n := len(q)
+               s.QueueTotal += uint64(n)
+               if s.QueueMax < n {
+                       s.QueueMax = n
+               }
+               if s.QueueMin > n || s.QueueCount == 1 {
+                       s.QueueMin = n
+               }
+               if (s.QueueDelayMin > lastDelay || s.QueueDelayMin == 0) && lastDelay > 0 {
+                       s.QueueDelayMin = lastDelay
+               }
+               if s.QueueDelayMax < lastDelay {
+                       s.QueueDelayMax = lastDelay
+               }
+       }
+       return &s
+}
+
+func (h *handler) setup() {
+       h.lastDelay = make(map[chan interface{}]stats.Duration)
+}