+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
// Receive websocket frames from the client and pass them to
// sess.Receive().
go func() {
+ defer cancel()
buf := make([]byte, 2<<20)
for {
select {
err = errFrameTooBig
}
if err != nil {
- if err != io.EOF {
+ if err != io.EOF && ctx.Err() == nil {
log.WithError(err).Info("read error")
}
- cancel()
return
}
err = sess.Receive(buf)
if err != nil {
log.WithError(err).Error("sess.Receive() failed")
- cancel()
return
}
}
// sess.EventMessage() as needed, and send them to the client
// as websocket frames.
go func() {
+ defer cancel()
for {
var ok bool
var data interface{}
buf, err = sess.EventMessage(e)
if err != nil {
log.WithError(err).Error("EventMessage failed")
- cancel()
- break
+ return
} else if len(buf) == 0 {
log.Debug("skip")
continue
t0 := time.Now()
_, err = ws.Write(buf)
if err != nil {
- log.WithError(err).Error("write failed")
- cancel()
- break
+ if ctx.Err() == nil {
+ log.WithError(err).Error("write failed")
+ }
+ return
}
log.Debug("sent")
// is done/cancelled or the incoming event stream ends. Shut
// down the handler if the outgoing queue fills up.
go func() {
+ defer cancel()
ticker := time.NewTicker(h.PingTimeout)
defer ticker.Stop()
default:
}
}
- continue
case e, ok := <-incoming.Channel():
if !ok {
- cancel()
return
}
if !sess.Filter(e) {
case queue <- e:
default:
log.WithError(errQueueFull).Error("terminate")
- cancel()
return
}
}