// Receive websocket frames from the client and pass them to
// sess.Receive().
go func() {
+ defer cancel()
buf := make([]byte, 2<<20)
for {
select {
if err != io.EOF {
log.WithError(err).Info("read error")
}
- cancel()
return
}
err = sess.Receive(buf)
if err != nil {
log.WithError(err).Error("sess.Receive() failed")
- cancel()
return
}
}
// sess.EventMessage() as needed, and send them to the client
// as websocket frames.
go func() {
+ defer cancel()
for {
var ok bool
var data interface{}
buf, err = sess.EventMessage(e)
if err != nil {
log.WithError(err).Error("EventMessage failed")
- cancel()
- break
+ return
} else if len(buf) == 0 {
log.Debug("skip")
continue
_, err = ws.Write(buf)
if err != nil {
log.WithError(err).Error("write failed")
- cancel()
- break
+ return
}
log.Debug("sent")
// is done/cancelled or the incoming event stream ends. Shut
// down the handler if the outgoing queue fills up.
go func() {
+ defer cancel()
ticker := time.NewTicker(h.PingTimeout)
defer ticker.Stop()
continue
case e, ok := <-incoming.Channel():
if !ok {
- cancel()
return
}
if !sess.Filter(e) {
case queue <- e:
default:
log.WithError(errQueueFull).Error("terminate")
- cancel()
return
}
}