package main
import (
+ "context"
"io"
"time"
}
// Handle runs one session on ws: it creates a session via
// h.NewSession with an outgoing queue of capacity h.QueueSize, starts
// three goroutines (a reader, a sender, and an incoming-event
// forwarder), blocks until shutdown is signalled, and then returns
// the named result stats. If h.NewSession fails it returns
// immediately with zero-valued stats.
//
// NOTE(review): this text is a patch rather than plain Go. Lines
// prefixed "-" belong to the previous implementation, which
// coordinated shutdown with the "stop" and "stopped" channels; lines
// prefixed "+" belong to the replacement, which uses a cancellable
// context derived from the request context. Some unchanged lines
// appear to be missing from this excerpt (see notes below).
func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats) {
- log := logger(ws.Request().Context())
+ ctx, cancel := context.WithCancel(ws.Request().Context())
+ log := logger(ctx)
queue := make(chan interface{}, h.QueueSize)
sess, err := h.NewSession(ws, queue)
// NOTE(review): in the "+" version this early return does not call
// cancel(); confirm whether a `defer cancel()` is wanted.
if err != nil {
return
}
- stopped := make(chan struct{})
- stop := make(chan error, 5)
-
// Reader goroutine: loops until shutdown is signalled, handing buf
// (a 2 MiB buffer) to sess.Receive on each iteration and requesting
// shutdown on the first error.
// NOTE(review): the statement that fills buf and the opening of the
// error check around the io.EOF test are not visible in this excerpt.
go func() {
buf := make([]byte, 2<<20)
for {
select {
- case <-stopped:
+ case <-ctx.Done():
return
default:
}
if err != io.EOF {
log.WithError(err).Info("read error")
}
- stop <- err
+ cancel()
return
}
err = sess.Receive(buf)
if err != nil {
- stop <- err
+ log.WithError(err).Error("sess.Receive() failed")
+ cancel()
return
}
}
}()
// Sender goroutine: takes items off queue, converts them with
// sess.EventMessage, writes the result to ws, and adds the bytes and
// message count to stats. It requests shutdown and stops on the
// first EventMessage or write error.
// NOTE(review): the code that derives e/buf from data, and the
// branch separating the "skip" case from the write, are not visible
// in this excerpt.
// NOTE(review): stats is updated here, and nothing visible waits for
// this goroutine before Handle returns stats — confirm there is no
// race.
go func() {
- for data := range queue {
+ for {
+ var ok bool
+ var data interface{}
+ select {
+ case <-ctx.Done():
+ return
+ case data, ok = <-queue:
+ if !ok {
+ return
+ }
+ }
var e *event
var buf []byte
var err error
buf, err = sess.EventMessage(e)
if err != nil {
log.WithError(err).Error("EventMessage failed")
- stop <- err
+ cancel()
break
} else if len(buf) == 0 {
log.Debug("skip")
_, err = ws.Write(buf)
if err != nil {
log.WithError(err).Error("write failed")
- stop <- err
+ cancel()
break
}
log.Debug("sent")
stats.EventBytes += uint64(len(buf))
stats.EventCount++
}
- for _ = range queue {
- // Ensure queue can't fill up and block other
- // goroutines after we hit a write error.
- }
}()
// Filter incoming events against the current subscription
// list, and forward matching events to the outgoing message
- // queue. Close the queue and return when the "stopped"
- // channel closes or the incoming event stream ends. Shut down
- // the handler if the outgoing queue fills up.
+ // queue. Close the queue and return when the request context
+ // is done/cancelled or the incoming event stream ends. Shut
+ // down the handler if the outgoing queue fills up.
// NOTE(review): the "+" version below no longer calls close(queue)
// (both close calls are on "-" lines), so the "Close the queue"
// wording above looks stale for the new code.
go func() {
ticker := time.NewTicker(h.PingTimeout)
defer ticker.Stop()
for {
select {
- case <-stopped:
- close(queue)
+ case <-ctx.Done():
return
case <-ticker.C:
// If the outgoing queue is empty, queue an
// empty {} message for the socket, and
// prevent an idle socket from being closed.
// The "+" version uses a non-blocking send,
// so the ping is dropped if the queue cannot
// accept it right now.
if len(queue) == 0 {
- queue <- []byte(`{}`)
+ select {
+ case queue <- []byte(`{}`):
+ default:
+ }
}
continue
case e, ok := <-incoming:
if !ok {
- close(queue)
+ cancel()
return
}
if !sess.Filter(e) {
select {
case queue <- e:
default:
- stop <- errQueueFull
+ log.WithError(errQueueFull).Error("terminate")
+ cancel()
+ return
}
}
}
}()
// Block until shutdown is signalled: the first error sent on stop in
// the "-" version, cancellation of ctx in the "+" version.
- <-stop
- close(stopped)
-
+ <-ctx.Done()
return
}