+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
"context"
- "fmt"
"io"
"sync"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/stats"
)
// handler carries the state used by Handle, DebugStatus and setup.
type handler struct {
// NOTE(review): QueueSize is not referenced anywhere in the visible
// excerpt; presumably it bounds the per-connection outgoing queue --
// confirm against the elided part of Handle.
QueueSize int
// mtx is held around every visible access to lastDelay: the write in
// Handle and the iteration in DebugStatus.
mtx sync.Mutex
// lastDelay maps a queue channel to the most recent time.Since(e.Ready)
// value recorded for it by Handle. This change switches the value type
// from time.Duration to stats.Duration.
- lastDelay map[chan interface{}]time.Duration
+ lastDelay map[chan interface{}]stats.Duration
// setupOnce makes sure setup (which allocates lastDelay) runs at most
// once; Handle triggers it on entry.
setupOnce sync.Once
}
EventCount uint64
}
// Handle services the websocket connection ws and returns counters for it
// in the named result hStats (QueueDelayNs, WriteDelayNs, EventBytes and
// EventCount are updated in the visible code).
//
// The named result was renamed from stats to hStats because this change
// imports the package "stats" and uses stats.Duration inside the body; a
// result variable called stats would shadow the package name here.
//
// NOTE(review): large parts of the body are elided in this excerpt (the
// definitions of queue, e, t0 and buf in the sender goroutine, the use of
// newSession, and whatever makes the function wait before returning), so
// the comments below only describe what is visible.
-func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (stats handlerStats) {
+func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) {
// Allocate h.lastDelay before the first write to it below.
h.setupOnce.Do(h.setup)
ctx, cancel := context.WithCancel(ws.Request().Context())
// Added by this change: cancel the derived context on every return path.
+ defer cancel()
log := logger(ctx)
incoming := eventSource.NewSink()
return
}
+ // Receive websocket frames from the client and pass them to
+ // sess.Receive().
go func() {
buf := make([]byte, 2<<20)
for {
}
}()
+ // Take items from the outgoing queue, serialize them using
+ // sess.EventMessage() as needed, and send them to the client
+ // as websocket frames.
// NOTE(review): this goroutine writes to the named result hStats; the
// synchronization between these writes and the final return is not
// visible in this excerpt -- confirm there is no data race.
go func() {
for {
var ok bool
log.Debug("sent")
if e != nil {
- stats.QueueDelayNs += t0.Sub(e.Ready)
+ hStats.QueueDelayNs += t0.Sub(e.Ready)
// lastDelay is also read by DebugStatus, so update it under h.mtx.
h.mtx.Lock()
- h.lastDelay[queue] = time.Since(e.Ready)
+ h.lastDelay[queue] = stats.Duration(time.Since(e.Ready))
h.mtx.Unlock()
}
- stats.WriteDelayNs += time.Since(t0)
- stats.EventBytes += uint64(len(buf))
- stats.EventCount++
+ hStats.WriteDelayNs += time.Since(t0)
+ hStats.EventBytes += uint64(len(buf))
+ hStats.EventCount++
}
}()
return
}
// DebugStatus (renamed from Status by this change) walks h.lastDelay while
// holding h.mtx and returns a pointer to a summary struct s containing
// queue counts/sizes and the smallest and largest recorded delay.
//
// Before this change the delays were accumulated in unexported
// time.Duration fields and then formatted as seconds with "%.06f" into
// string fields; now the exported fields hold stats.Duration values
// directly and the fmt.Sprintf step is removed.
// NOTE(review): how stats.Duration is rendered to callers is not visible
// here -- confirm the output format is still acceptable to consumers.
-func (h *handler) Status() interface{} {
+func (h *handler) DebugStatus() interface{} {
// Held until return so the map cannot change during the iteration below.
h.mtx.Lock()
defer h.mtx.Unlock()
QueueMin int
QueueMax int
QueueTotal uint64
- queueDelayMin time.Duration
- QueueDelayMin string
- queueDelayMax time.Duration
- QueueDelayMax string
+ QueueDelayMin stats.Duration
+ QueueDelayMax stats.Duration
}
for q, lastDelay := range h.lastDelay {
s.QueueCount++
// NOTE(review): n is defined in elided code; presumably it is the
// current length of q -- confirm.
// The QueueCount == 1 test seeds QueueMin from the first entry.
if s.QueueMin > n || s.QueueCount == 1 {
s.QueueMin = n
}
// A zero QueueDelayMin means "not set yet"; delays that are not
// positive are never taken as the minimum.
- if (s.queueDelayMin > lastDelay || s.queueDelayMin == 0) && lastDelay > 0 {
- s.queueDelayMin = lastDelay
+ if (s.QueueDelayMin > lastDelay || s.QueueDelayMin == 0) && lastDelay > 0 {
+ s.QueueDelayMin = lastDelay
}
- if s.queueDelayMax < lastDelay {
- s.queueDelayMax = lastDelay
+ if s.QueueDelayMax < lastDelay {
+ s.QueueDelayMax = lastDelay
}
}
- s.QueueDelayMin = fmt.Sprintf("%.06f", s.queueDelayMin.Seconds())
- s.QueueDelayMax = fmt.Sprintf("%.06f", s.queueDelayMax.Seconds())
return &s
}
// setup allocates the lastDelay map so that later writes to it do not hit
// a nil map. Handle invokes it through h.setupOnce, so it runs at most
// once per handler. This change only switches the map's value type from
// time.Duration to stats.Duration.
func (h *handler) setup() {
- h.lastDelay = make(map[chan interface{}]time.Duration)
+ h.lastDelay = make(map[chan interface{}]stats.Duration)
}