projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
20930: Concurrency optimization.
[arvados.git]
/
services
/
ws
/
handler.go
diff --git
a/services/ws/handler.go
b/services/ws/handler.go
index b07b78cc7c3fbbf7c69b39111f1959bb5da96921..8b6e9b97728257e12e86cd6fe306965be2e0a981 100644
(file)
--- a/
services/ws/handler.go
+++ b/
services/ws/handler.go
@@
-1,4
+1,8
@@
-package main
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package ws
import (
"context"
import (
"context"
@@
-6,8
+10,9
@@
import (
"sync"
"time"
"sync"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/stats"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/stats"
+ "github.com/sirupsen/logrus"
)
type handler struct {
)
type handler struct {
@@
-27,15
+32,11
@@
type handlerStats struct {
EventCount uint64
}
EventCount uint64
}
-func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) {
+func (h *handler) Handle(ws wsConn,
logger logrus.FieldLogger,
eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) {
h.setupOnce.Do(h.setup)
ctx, cancel := context.WithCancel(ws.Request().Context())
defer cancel()
h.setupOnce.Do(h.setup)
ctx, cancel := context.WithCancel(ws.Request().Context())
defer cancel()
- log := logger(ctx)
-
- incoming := eventSource.NewSink()
- defer incoming.Stop()
queue := make(chan interface{}, h.QueueSize)
h.mtx.Lock()
queue := make(chan interface{}, h.QueueSize)
h.mtx.Lock()
@@
-49,11
+50,14
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
sess, err := newSession(ws, queue)
if err != nil {
sess, err := newSession(ws, queue)
if err != nil {
- log.WithError(err).Error("newSession failed")
+ log
ger
.WithError(err).Error("newSession failed")
return
}
return
}
+ // Receive websocket frames from the client and pass them to
+ // sess.Receive().
go func() {
go func() {
+ defer cancel()
buf := make([]byte, 2<<20)
for {
select {
buf := make([]byte, 2<<20)
for {
select {
@@
-64,27
+68,29
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
n, err := ws.Read(buf)
buf := buf[:n]
ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
n, err := ws.Read(buf)
buf := buf[:n]
- log.WithField("frame", string(buf[:n])).Debug("received frame")
+ log
ger
.WithField("frame", string(buf[:n])).Debug("received frame")
if err == nil && n == cap(buf) {
err = errFrameTooBig
}
if err != nil {
if err == nil && n == cap(buf) {
err = errFrameTooBig
}
if err != nil {
- if err != io.EOF {
- log.WithError(err).Info("read error")
+ if err != io.EOF
&& ctx.Err() == nil
{
+ log
ger
.WithError(err).Info("read error")
}
}
- cancel()
return
}
err = sess.Receive(buf)
if err != nil {
return
}
err = sess.Receive(buf)
if err != nil {
- log.WithError(err).Error("sess.Receive() failed")
- cancel()
+ logger.WithError(err).Error("sess.Receive() failed")
return
}
}
}()
return
}
}
}()
+ // Take items from the outgoing queue, serialize them using
+ // sess.EventMessage() as needed, and send them to the client
+ // as websocket frames.
go func() {
go func() {
+ defer cancel()
for {
var ok bool
var data interface{}
for {
var ok bool
var data interface{}
@@
-99,38
+105,38
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
var e *event
var buf []byte
var err error
var e *event
var buf []byte
var err error
- log
:= log
+ log
ger := logger
switch data := data.(type) {
case []byte:
buf = data
case *event:
e = data
switch data := data.(type) {
case []byte:
buf = data
case *event:
e = data
- log
= log
.WithField("serial", e.Serial)
+ log
ger = logger
.WithField("serial", e.Serial)
buf, err = sess.EventMessage(e)
if err != nil {
buf, err = sess.EventMessage(e)
if err != nil {
- log.WithError(err).Error("EventMessage failed")
- cancel()
- break
+ logger.WithError(err).Error("EventMessage failed")
+ return
} else if len(buf) == 0 {
} else if len(buf) == 0 {
- log.Debug("skip")
+ log
ger
.Debug("skip")
continue
}
default:
continue
}
default:
- log.WithField("data", data).Error("bad object in client queue")
+ log
ger
.WithField("data", data).Error("bad object in client queue")
continue
}
continue
}
- log.WithField("frame", string(buf)).Debug("send event")
+ log
ger
.WithField("frame", string(buf)).Debug("send event")
ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
t0 := time.Now()
_, err = ws.Write(buf)
if err != nil {
ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
t0 := time.Now()
_, err = ws.Write(buf)
if err != nil {
- log.WithError(err).Error("write failed")
- cancel()
- break
+ if ctx.Err() == nil {
+ logger.WithError(err).Error("write failed")
+ }
+ return
}
}
- log.Debug("sent")
+ log
ger
.Debug("sent")
if e != nil {
hStats.QueueDelayNs += t0.Sub(e.Ready)
if e != nil {
hStats.QueueDelayNs += t0.Sub(e.Ready)
@@
-150,9
+156,13
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
// is done/cancelled or the incoming event stream ends. Shut
// down the handler if the outgoing queue fills up.
go func() {
// is done/cancelled or the incoming event stream ends. Shut
// down the handler if the outgoing queue fills up.
go func() {
+ defer cancel()
ticker := time.NewTicker(h.PingTimeout)
defer ticker.Stop()
ticker := time.NewTicker(h.PingTimeout)
defer ticker.Stop()
+ incoming := eventSource.NewSink()
+ defer incoming.Stop()
+
for {
select {
case <-ctx.Done():
for {
select {
case <-ctx.Done():
@@
-169,10
+179,8
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
default:
}
}
default:
}
}
- continue
case e, ok := <-incoming.Channel():
if !ok {
case e, ok := <-incoming.Channel():
if !ok {
- cancel()
return
}
if !sess.Filter(e) {
return
}
if !sess.Filter(e) {
@@
-181,8
+189,7
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
select {
case queue <- e:
default:
select {
case queue <- e:
default:
- log.WithError(errQueueFull).Error("terminate")
- cancel()
+ logger.WithError(errQueueFull).Error("terminate")
return
}
}
return
}
}