projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
13804: Smarter shutdown behavior WIP
[arvados.git]
/
services
/
ws
/
handler.go
diff --git
a/services/ws/handler.go
b/services/ws/handler.go
index dace39be35952c99d55e9cc89f4cefcde3978663..d527c39ba1c4eeb12c0cbae63526150da27f096d 100644
(file)
--- a/
services/ws/handler.go
+++ b/
services/ws/handler.go
@@
-1,13
+1,17
@@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
"context"
package main
import (
"context"
- "fmt"
"io"
"sync"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"io"
"sync"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/stats"
)
type handler struct {
)
type handler struct {
@@
-16,7
+20,7
@@
type handler struct {
QueueSize int
mtx sync.Mutex
QueueSize int
mtx sync.Mutex
- lastDelay map[chan interface{}]
time
.Duration
+ lastDelay map[chan interface{}]
stats
.Duration
setupOnce sync.Once
}
setupOnce sync.Once
}
@@
-27,10
+31,11
@@
type handlerStats struct {
EventCount uint64
}
EventCount uint64
}
-func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (
s
tats handlerStats) {
+func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (
hS
tats handlerStats) {
h.setupOnce.Do(h.setup)
ctx, cancel := context.WithCancel(ws.Request().Context())
h.setupOnce.Do(h.setup)
ctx, cancel := context.WithCancel(ws.Request().Context())
+ defer cancel()
log := logger(ctx)
incoming := eventSource.NewSink()
log := logger(ctx)
incoming := eventSource.NewSink()
@@
-52,7
+57,10
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
return
}
return
}
+ // Receive websocket frames from the client and pass them to
+ // sess.Receive().
go func() {
go func() {
+ defer cancel()
buf := make([]byte, 2<<20)
for {
select {
buf := make([]byte, 2<<20)
for {
select {
@@
-68,22
+76,24
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
err = errFrameTooBig
}
if err != nil {
err = errFrameTooBig
}
if err != nil {
- if err != io.EOF {
+ if err != io.EOF
&& ctx.Err() == nil
{
log.WithError(err).Info("read error")
}
log.WithError(err).Info("read error")
}
- cancel()
return
}
err = sess.Receive(buf)
if err != nil {
log.WithError(err).Error("sess.Receive() failed")
return
}
err = sess.Receive(buf)
if err != nil {
log.WithError(err).Error("sess.Receive() failed")
- cancel()
return
}
}
}()
return
}
}
}()
+ // Take items from the outgoing queue, serialize them using
+ // sess.EventMessage() as needed, and send them to the client
+ // as websocket frames.
go func() {
go func() {
+ defer cancel()
for {
var ok bool
var data interface{}
for {
var ok bool
var data interface{}
@@
-109,8
+119,7
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
buf, err = sess.EventMessage(e)
if err != nil {
log.WithError(err).Error("EventMessage failed")
buf, err = sess.EventMessage(e)
if err != nil {
log.WithError(err).Error("EventMessage failed")
- cancel()
- break
+ return
} else if len(buf) == 0 {
log.Debug("skip")
continue
} else if len(buf) == 0 {
log.Debug("skip")
continue
@@
-125,21
+134,22
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
t0 := time.Now()
_, err = ws.Write(buf)
if err != nil {
t0 := time.Now()
_, err = ws.Write(buf)
if err != nil {
- log.WithError(err).Error("write failed")
- cancel()
- break
+ if ctx.Err() == nil {
+ log.WithError(err).Error("write failed")
+ }
+ return
}
log.Debug("sent")
if e != nil {
}
log.Debug("sent")
if e != nil {
-
s
tats.QueueDelayNs += t0.Sub(e.Ready)
+
hS
tats.QueueDelayNs += t0.Sub(e.Ready)
h.mtx.Lock()
h.mtx.Lock()
- h.lastDelay[queue] =
time.Since(e.Ready
)
+ h.lastDelay[queue] =
stats.Duration(time.Since(e.Ready)
)
h.mtx.Unlock()
}
h.mtx.Unlock()
}
-
s
tats.WriteDelayNs += time.Since(t0)
-
s
tats.EventBytes += uint64(len(buf))
-
s
tats.EventCount++
+
hS
tats.WriteDelayNs += time.Since(t0)
+
hS
tats.EventBytes += uint64(len(buf))
+
hS
tats.EventCount++
}
}()
}
}()
@@
-149,6
+159,7
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
// is done/cancelled or the incoming event stream ends. Shut
// down the handler if the outgoing queue fills up.
go func() {
// is done/cancelled or the incoming event stream ends. Shut
// down the handler if the outgoing queue fills up.
go func() {
+ defer cancel()
ticker := time.NewTicker(h.PingTimeout)
defer ticker.Stop()
ticker := time.NewTicker(h.PingTimeout)
defer ticker.Stop()
@@
-168,10
+179,8
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
default:
}
}
default:
}
}
- continue
case e, ok := <-incoming.Channel():
if !ok {
case e, ok := <-incoming.Channel():
if !ok {
- cancel()
return
}
if !sess.Filter(e) {
return
}
if !sess.Filter(e) {
@@
-181,7
+190,6
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
case queue <- e:
default:
log.WithError(errQueueFull).Error("terminate")
case queue <- e:
default:
log.WithError(errQueueFull).Error("terminate")
- cancel()
return
}
}
return
}
}
@@
-192,7
+200,7
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
return
}
return
}
-func (h *handler) Status() interface{} {
+func (h *handler)
Debug
Status() interface{} {
h.mtx.Lock()
defer h.mtx.Unlock()
h.mtx.Lock()
defer h.mtx.Unlock()
@@
-201,10
+209,8
@@
func (h *handler) Status() interface{} {
QueueMin int
QueueMax int
QueueTotal uint64
QueueMin int
QueueMax int
QueueTotal uint64
- queueDelayMin time.Duration
- QueueDelayMin string
- queueDelayMax time.Duration
- QueueDelayMax string
+ QueueDelayMin stats.Duration
+ QueueDelayMax stats.Duration
}
for q, lastDelay := range h.lastDelay {
s.QueueCount++
}
for q, lastDelay := range h.lastDelay {
s.QueueCount++
@@
-216,18
+222,16
@@
func (h *handler) Status() interface{} {
if s.QueueMin > n || s.QueueCount == 1 {
s.QueueMin = n
}
if s.QueueMin > n || s.QueueCount == 1 {
s.QueueMin = n
}
- if (s.
queueDelayMin > lastDelay || s.q
ueueDelayMin == 0) && lastDelay > 0 {
- s.
q
ueueDelayMin = lastDelay
+ if (s.
QueueDelayMin > lastDelay || s.Q
ueueDelayMin == 0) && lastDelay > 0 {
+ s.
Q
ueueDelayMin = lastDelay
}
}
- if s.
q
ueueDelayMax < lastDelay {
- s.
q
ueueDelayMax = lastDelay
+ if s.
Q
ueueDelayMax < lastDelay {
+ s.
Q
ueueDelayMax = lastDelay
}
}
}
}
- s.QueueDelayMin = fmt.Sprintf("%.06f", s.queueDelayMin.Seconds())
- s.QueueDelayMax = fmt.Sprintf("%.06f", s.queueDelayMax.Seconds())
return &s
}
func (h *handler) setup() {
return &s
}
func (h *handler) setup() {
- h.lastDelay = make(map[chan interface{}]
time
.Duration)
+ h.lastDelay = make(map[chan interface{}]
stats
.Duration)
}
}