projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
17774: Add non-empty mounts to test fixtures.
[arvados.git]
/
services
/
ws
/
handler.go
diff --git
a/services/ws/handler.go
b/services/ws/handler.go
index ca9231c986de0f75e0655e506b8590c1b17d3d84..912643ad97c6374006b3fd4b00f90d340157d687 100644
(file)
--- a/
services/ws/handler.go
+++ b/
services/ws/handler.go
@@
-1,4
+1,8
@@
-package main
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package ws
import (
"context"
import (
"context"
@@
-6,8
+10,9
@@
import (
"sync"
"time"
"sync"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/stats"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/stats"
+ "github.com/sirupsen/logrus"
)
type handler struct {
)
type handler struct {
@@
-27,11
+32,11
@@
type handlerStats struct {
EventCount uint64
}
EventCount uint64
}
-func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) {
+func (h *handler) Handle(ws wsConn, logger logrus.FieldLogger, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) {
h.setupOnce.Do(h.setup)
ctx, cancel := context.WithCancel(ws.Request().Context())
h.setupOnce.Do(h.setup)
ctx, cancel := context.WithCancel(ws.Request().Context())
- log := logger(ctx)
+ defer cancel()
incoming := eventSource.NewSink()
defer incoming.Stop()
incoming := eventSource.NewSink()
defer incoming.Stop()
@@
-48,11
+53,14
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
sess, err := newSession(ws, queue)
if err != nil {
sess, err := newSession(ws, queue)
if err != nil {
- log.WithError(err).Error("newSession failed")
+ logger.WithError(err).Error("newSession failed")
return
}
return
}
+ // Receive websocket frames from the client and pass them to
+ // sess.Receive().
go func() {
go func() {
+ defer cancel()
buf := make([]byte, 2<<20)
for {
select {
buf := make([]byte, 2<<20)
for {
select {
@@
-63,27
+71,29
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
n, err := ws.Read(buf)
buf := buf[:n]
ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
n, err := ws.Read(buf)
buf := buf[:n]
- log.WithField("frame", string(buf[:n])).Debug("received frame")
+ logger.WithField("frame", string(buf[:n])).Debug("received frame")
if err == nil && n == cap(buf) {
err = errFrameTooBig
}
if err != nil {
if err == nil && n == cap(buf) {
err = errFrameTooBig
}
if err != nil {
- if err != io.EOF {
- log.WithError(err).Info("read error")
+ if err != io.EOF && ctx.Err() == nil {
+ logger.WithError(err).Info("read error")
}
}
- cancel()
return
}
err = sess.Receive(buf)
if err != nil {
return
}
err = sess.Receive(buf)
if err != nil {
- log.WithError(err).Error("sess.Receive() failed")
- cancel()
+ logger.WithError(err).Error("sess.Receive() failed")
return
}
}
}()
return
}
}
}()
+ // Take items from the outgoing queue, serialize them using
+ // sess.EventMessage() as needed, and send them to the client
+ // as websocket frames.
go func() {
go func() {
+ defer cancel()
for {
var ok bool
var data interface{}
for {
var ok bool
var data interface{}
@@
-98,38
+108,38
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
var e *event
var buf []byte
var err error
var e *event
var buf []byte
var err error
- log := log
+ logger := logger
switch data := data.(type) {
case []byte:
buf = data
case *event:
e = data
switch data := data.(type) {
case []byte:
buf = data
case *event:
e = data
- log = log.WithField("serial", e.Serial)
+ logger = logger.WithField("serial", e.Serial)
buf, err = sess.EventMessage(e)
if err != nil {
buf, err = sess.EventMessage(e)
if err != nil {
- log.WithError(err).Error("EventMessage failed")
- cancel()
- break
+ logger.WithError(err).Error("EventMessage failed")
+ return
} else if len(buf) == 0 {
} else if len(buf) == 0 {
- log.Debug("skip")
+ logger.Debug("skip")
continue
}
default:
continue
}
default:
- log.WithField("data", data).Error("bad object in client queue")
+ logger.WithField("data", data).Error("bad object in client queue")
continue
}
continue
}
- log.WithField("frame", string(buf)).Debug("send event")
+ logger.WithField("frame", string(buf)).Debug("send event")
ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
t0 := time.Now()
_, err = ws.Write(buf)
if err != nil {
ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
t0 := time.Now()
_, err = ws.Write(buf)
if err != nil {
- log.WithError(err).Error("write failed")
- cancel()
- break
+ if ctx.Err() == nil {
+ logger.WithError(err).Error("write failed")
+ }
+ return
}
}
- log.Debug("sent")
+ logger.Debug("sent")
if e != nil {
hStats.QueueDelayNs += t0.Sub(e.Ready)
if e != nil {
hStats.QueueDelayNs += t0.Sub(e.Ready)
@@
-149,6
+159,7
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
// is done/cancelled or the incoming event stream ends. Shut
// down the handler if the outgoing queue fills up.
go func() {
// is done/cancelled or the incoming event stream ends. Shut
// down the handler if the outgoing queue fills up.
go func() {
+ defer cancel()
ticker := time.NewTicker(h.PingTimeout)
defer ticker.Stop()
ticker := time.NewTicker(h.PingTimeout)
defer ticker.Stop()
@@
-168,10
+179,8
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
default:
}
}
default:
}
}
- continue
case e, ok := <-incoming.Channel():
if !ok {
case e, ok := <-incoming.Channel():
if !ok {
- cancel()
return
}
if !sess.Filter(e) {
return
}
if !sess.Filter(e) {
@@
-180,8
+189,7
@@
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
select {
case queue <- e:
default:
select {
case queue <- e:
default:
- log.WithError(errQueueFull).Error("terminate")
- cancel()
+ logger.WithError(errQueueFull).Error("terminate")
return
}
}
return
}
}