projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge branch 'master' into 9998-unsigned_manifest
[arvados.git]
/
services
/
ws
/
handler.go
diff --git
a/services/ws/handler.go
b/services/ws/handler.go
index d2c119acfeab53b5a4ed799455e8802114c7c9d1..72291900fac52c4192f4db971e29bd736be5c189 100644
(file)
--- a/
services/ws/handler.go
+++ b/
services/ws/handler.go
@@
-7,6
+7,7
@@
import (
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/stats"
)
type handler struct {
)
type handler struct {
@@
-15,7
+16,7
@@
type handler struct {
QueueSize int
mtx sync.Mutex
QueueSize int
mtx sync.Mutex
- queues map[chan interface{}]struct{}
+ lastDelay map[chan interface{}]stats.Duration
setupOnce sync.Once
}
setupOnce sync.Once
}
@@
-26,10
+27,11
@@
type handlerStats struct {
EventCount uint64
}
EventCount uint64
}
-func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (
s
tats handlerStats) {
+func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (
hS
tats handlerStats) {
h.setupOnce.Do(h.setup)
ctx, cancel := context.WithCancel(ws.Request().Context())
h.setupOnce.Do(h.setup)
ctx, cancel := context.WithCancel(ws.Request().Context())
+ defer cancel()
log := logger(ctx)
incoming := eventSource.NewSink()
log := logger(ctx)
incoming := eventSource.NewSink()
@@
-37,11
+39,11
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
queue := make(chan interface{}, h.QueueSize)
h.mtx.Lock()
queue := make(chan interface{}, h.QueueSize)
h.mtx.Lock()
- h.
queues[queue] = struct{}{}
+ h.
lastDelay[queue] = 0
h.mtx.Unlock()
defer func() {
h.mtx.Lock()
h.mtx.Unlock()
defer func() {
h.mtx.Lock()
- delete(h.
queues
, queue)
+ delete(h.
lastDelay
, queue)
h.mtx.Unlock()
}()
h.mtx.Unlock()
}()
@@
-51,6
+53,8
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
return
}
return
}
+ // Receive websocket frames from the client and pass them to
+ // sess.Receive().
go func() {
buf := make([]byte, 2<<20)
for {
go func() {
buf := make([]byte, 2<<20)
for {
@@
-82,6
+86,9
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
}
}()
}
}()
+ // Take items from the outgoing queue, serialize them using
+ // sess.EventMessage() as needed, and send them to the client
+ // as websocket frames.
go func() {
for {
var ok bool
go func() {
for {
var ok bool
@@
-131,11
+138,14
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
log.Debug("sent")
if e != nil {
log.Debug("sent")
if e != nil {
- stats.QueueDelayNs += t0.Sub(e.Received)
+ hStats.QueueDelayNs += t0.Sub(e.Ready)
+ h.mtx.Lock()
+ h.lastDelay[queue] = stats.Duration(time.Since(e.Ready))
+ h.mtx.Unlock()
}
}
-
s
tats.WriteDelayNs += time.Since(t0)
-
s
tats.EventBytes += uint64(len(buf))
-
s
tats.EventCount++
+
hS
tats.WriteDelayNs += time.Since(t0)
+
hS
tats.EventBytes += uint64(len(buf))
+
hS
tats.EventCount++
}
}()
}
}()
@@
-188,26
+198,38
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
return
}
return
}
-func (h *handler) Status() interface{} {
+func (h *handler)
Debug
Status() interface{} {
h.mtx.Lock()
defer h.mtx.Unlock()
var s struct {
h.mtx.Lock()
defer h.mtx.Unlock()
var s struct {
- QueueCount int
- QueueMax int
- QueueTotal uint64
+ QueueCount int
+ QueueMin int
+ QueueMax int
+ QueueTotal uint64
+ QueueDelayMin stats.Duration
+ QueueDelayMax stats.Duration
}
}
- for q := range h.queues {
+ for q, lastDelay := range h.lastDelay {
+ s.QueueCount++
n := len(q)
s.QueueTotal += uint64(n)
if s.QueueMax < n {
s.QueueMax = n
}
n := len(q)
s.QueueTotal += uint64(n)
if s.QueueMax < n {
s.QueueMax = n
}
+ if s.QueueMin > n || s.QueueCount == 1 {
+ s.QueueMin = n
+ }
+ if (s.QueueDelayMin > lastDelay || s.QueueDelayMin == 0) && lastDelay > 0 {
+ s.QueueDelayMin = lastDelay
+ }
+ if s.QueueDelayMax < lastDelay {
+ s.QueueDelayMax = lastDelay
+ }
}
}
- s.QueueCount = len(h.queues)
return &s
}
func (h *handler) setup() {
return &s
}
func (h *handler) setup() {
- h.
queues = make(map[chan interface{}]struct{}
)
+ h.
lastDelay = make(map[chan interface{}]stats.Duration
)
}
}