Arvados-DCO-1.1-Signed-off-by: Radhika Chippada <radhika@curoverse.com>
[arvados.git] / services / ws / handler.go
index dace39be35952c99d55e9cc89f4cefcde3978663..f9f7f53edc58430f231e9a52d5d95bb1a025084a 100644 (file)
@@ -1,13 +1,17 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
        "context"
-       "fmt"
        "io"
        "sync"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/stats"
 )
 
 type handler struct {
@@ -16,7 +20,7 @@ type handler struct {
        QueueSize   int
 
        mtx       sync.Mutex
-       lastDelay map[chan interface{}]time.Duration
+       lastDelay map[chan interface{}]stats.Duration
        setupOnce sync.Once
 }
 
@@ -27,10 +31,11 @@ type handlerStats struct {
        EventCount   uint64
 }
 
-func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (stats handlerStats) {
+func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) {
        h.setupOnce.Do(h.setup)
 
        ctx, cancel := context.WithCancel(ws.Request().Context())
+       defer cancel()
        log := logger(ctx)
 
        incoming := eventSource.NewSink()
@@ -52,6 +57,8 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                return
        }
 
+       // Receive websocket frames from the client and pass them to
+       // sess.Receive().
        go func() {
                buf := make([]byte, 2<<20)
                for {
@@ -83,6 +90,9 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                }
        }()
 
+       // Take items from the outgoing queue, serialize them using
+       // sess.EventMessage() as needed, and send them to the client
+       // as websocket frames.
        go func() {
                for {
                        var ok bool
@@ -132,14 +142,14 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                        log.Debug("sent")
 
                        if e != nil {
-                               stats.QueueDelayNs += t0.Sub(e.Ready)
+                               hStats.QueueDelayNs += t0.Sub(e.Ready)
                                h.mtx.Lock()
-                               h.lastDelay[queue] = time.Since(e.Ready)
+                               h.lastDelay[queue] = stats.Duration(time.Since(e.Ready))
                                h.mtx.Unlock()
                        }
-                       stats.WriteDelayNs += time.Since(t0)
-                       stats.EventBytes += uint64(len(buf))
-                       stats.EventCount++
+                       hStats.WriteDelayNs += time.Since(t0)
+                       hStats.EventBytes += uint64(len(buf))
+                       hStats.EventCount++
                }
        }()
 
@@ -192,7 +202,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
        return
 }
 
-func (h *handler) Status() interface{} {
+func (h *handler) DebugStatus() interface{} {
        h.mtx.Lock()
        defer h.mtx.Unlock()
 
@@ -201,10 +211,8 @@ func (h *handler) Status() interface{} {
                QueueMin      int
                QueueMax      int
                QueueTotal    uint64
-               queueDelayMin time.Duration
-               QueueDelayMin string
-               queueDelayMax time.Duration
-               QueueDelayMax string
+               QueueDelayMin stats.Duration
+               QueueDelayMax stats.Duration
        }
        for q, lastDelay := range h.lastDelay {
                s.QueueCount++
@@ -216,18 +224,16 @@ func (h *handler) Status() interface{} {
                if s.QueueMin > n || s.QueueCount == 1 {
                        s.QueueMin = n
                }
-               if (s.queueDelayMin > lastDelay || s.queueDelayMin == 0) && lastDelay > 0 {
-                       s.queueDelayMin = lastDelay
+               if (s.QueueDelayMin > lastDelay || s.QueueDelayMin == 0) && lastDelay > 0 {
+                       s.QueueDelayMin = lastDelay
                }
-               if s.queueDelayMax < lastDelay {
-                       s.queueDelayMax = lastDelay
+               if s.QueueDelayMax < lastDelay {
+                       s.QueueDelayMax = lastDelay
                }
        }
-       s.QueueDelayMin = fmt.Sprintf("%.06f", s.queueDelayMin.Seconds())
-       s.QueueDelayMax = fmt.Sprintf("%.06f", s.queueDelayMax.Seconds())
        return &s
 }
 
 func (h *handler) setup() {
-       h.lastDelay = make(map[chan interface{}]time.Duration)
+       h.lastDelay = make(map[chan interface{}]stats.Duration)
 }