9945: Merge branch 'master' into 9945-make-python-package-dependency-free
[arvados.git] / services / ws / handler.go
index d2c119acfeab53b5a4ed799455e8802114c7c9d1..d527c39ba1c4eeb12c0cbae63526150da27f096d 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
@@ -7,6 +11,7 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/stats"
 )
 
 type handler struct {
@@ -15,7 +20,7 @@ type handler struct {
        QueueSize   int
 
        mtx       sync.Mutex
-       queues    map[chan interface{}]struct{}
+       lastDelay map[chan interface{}]stats.Duration
        setupOnce sync.Once
 }
 
@@ -26,10 +31,11 @@ type handlerStats struct {
        EventCount   uint64
 }
 
-func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (stats handlerStats) {
+func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) {
        h.setupOnce.Do(h.setup)
 
        ctx, cancel := context.WithCancel(ws.Request().Context())
+       defer cancel()
        log := logger(ctx)
 
        incoming := eventSource.NewSink()
@@ -37,11 +43,11 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
 
        queue := make(chan interface{}, h.QueueSize)
        h.mtx.Lock()
-       h.queues[queue] = struct{}{}
+       h.lastDelay[queue] = 0
        h.mtx.Unlock()
        defer func() {
                h.mtx.Lock()
-               delete(h.queues, queue)
+               delete(h.lastDelay, queue)
                h.mtx.Unlock()
        }()
 
@@ -51,7 +57,10 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                return
        }
 
+       // Receive websocket frames from the client and pass them to
+       // sess.Receive().
        go func() {
+               defer cancel()
                buf := make([]byte, 2<<20)
                for {
                        select {
@@ -67,22 +76,24 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                err = errFrameTooBig
                        }
                        if err != nil {
-                               if err != io.EOF {
+                               if err != io.EOF && ctx.Err() == nil {
                                        log.WithError(err).Info("read error")
                                }
-                               cancel()
                                return
                        }
                        err = sess.Receive(buf)
                        if err != nil {
                                log.WithError(err).Error("sess.Receive() failed")
-                               cancel()
                                return
                        }
                }
        }()
 
+       // Take items from the outgoing queue, serialize them using
+       // sess.EventMessage() as needed, and send them to the client
+       // as websocket frames.
        go func() {
+               defer cancel()
                for {
                        var ok bool
                        var data interface{}
@@ -108,8 +119,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                buf, err = sess.EventMessage(e)
                                if err != nil {
                                        log.WithError(err).Error("EventMessage failed")
-                                       cancel()
-                                       break
+                                       return
                                } else if len(buf) == 0 {
                                        log.Debug("skip")
                                        continue
@@ -124,18 +134,22 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                        t0 := time.Now()
                        _, err = ws.Write(buf)
                        if err != nil {
-                               log.WithError(err).Error("write failed")
-                               cancel()
-                               break
+                               if ctx.Err() == nil {
+                                       log.WithError(err).Error("write failed")
+                               }
+                               return
                        }
                        log.Debug("sent")
 
                        if e != nil {
-                               stats.QueueDelayNs += t0.Sub(e.Received)
+                               hStats.QueueDelayNs += t0.Sub(e.Ready)
+                               h.mtx.Lock()
+                               h.lastDelay[queue] = stats.Duration(time.Since(e.Ready))
+                               h.mtx.Unlock()
                        }
-                       stats.WriteDelayNs += time.Since(t0)
-                       stats.EventBytes += uint64(len(buf))
-                       stats.EventCount++
+                       hStats.WriteDelayNs += time.Since(t0)
+                       hStats.EventBytes += uint64(len(buf))
+                       hStats.EventCount++
                }
        }()
 
@@ -145,6 +159,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
        // is done/cancelled or the incoming event stream ends. Shut
        // down the handler if the outgoing queue fills up.
        go func() {
+               defer cancel()
                ticker := time.NewTicker(h.PingTimeout)
                defer ticker.Stop()
 
@@ -164,10 +179,8 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                        default:
                                        }
                                }
-                               continue
                        case e, ok := <-incoming.Channel():
                                if !ok {
-                                       cancel()
                                        return
                                }
                                if !sess.Filter(e) {
@@ -177,7 +190,6 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                case queue <- e:
                                default:
                                        log.WithError(errQueueFull).Error("terminate")
-                                       cancel()
                                        return
                                }
                        }
@@ -188,26 +200,38 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
        return
 }
 
-func (h *handler) Status() interface{} {
+func (h *handler) DebugStatus() interface{} {
        h.mtx.Lock()
        defer h.mtx.Unlock()
 
        var s struct {
-               QueueCount int
-               QueueMax   int
-               QueueTotal uint64
+               QueueCount    int
+               QueueMin      int
+               QueueMax      int
+               QueueTotal    uint64
+               QueueDelayMin stats.Duration
+               QueueDelayMax stats.Duration
        }
-       for q := range h.queues {
+       for q, lastDelay := range h.lastDelay {
+               s.QueueCount++
                n := len(q)
                s.QueueTotal += uint64(n)
                if s.QueueMax < n {
                        s.QueueMax = n
                }
+               if s.QueueMin > n || s.QueueCount == 1 {
+                       s.QueueMin = n
+               }
+               if (s.QueueDelayMin > lastDelay || s.QueueDelayMin == 0) && lastDelay > 0 {
+                       s.QueueDelayMin = lastDelay
+               }
+               if s.QueueDelayMax < lastDelay {
+                       s.QueueDelayMax = lastDelay
+               }
        }
-       s.QueueCount = len(h.queues)
        return &s
 }
 
 func (h *handler) setup() {
-       h.queues = make(map[chan interface{}]struct{})
+       h.lastDelay = make(map[chan interface{}]stats.Duration)
 }