X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d3a6d626ab4534865a14e8a34295a65e92036f37..7ef471d9d0513b0b06cf0007e575f16eda93ec29:/services/ws/handler.go diff --git a/services/ws/handler.go b/services/ws/handler.go index 2b94693610..b07b78cc7c 100644 --- a/services/ws/handler.go +++ b/services/ws/handler.go @@ -1,19 +1,23 @@ package main import ( - "encoding/json" + "context" "io" + "sync" "time" "git.curoverse.com/arvados.git/sdk/go/arvados" - log "github.com/Sirupsen/logrus" + "git.curoverse.com/arvados.git/sdk/go/stats" ) type handler struct { Client arvados.Client PingTimeout time.Duration QueueSize int - NewSession func(wsConn) (session, error) + + mtx sync.Mutex + lastDelay map[chan interface{}]stats.Duration + setupOnce sync.Once } type handlerStats struct { @@ -23,79 +27,97 @@ type handlerStats struct { EventCount uint64 } -func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) { - ctx := contextWithLogger(ws.Request().Context(), log.WithFields(log.Fields{ - "RemoteAddr": ws.Request().RemoteAddr, - })) - sess, err := h.NewSession(ws) - if err != nil { - logger(ctx).WithError(err).Error("NewSession failed") - return - } +func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) { + h.setupOnce.Do(h.setup) + + ctx, cancel := context.WithCancel(ws.Request().Context()) + defer cancel() + log := logger(ctx) + + incoming := eventSource.NewSink() + defer incoming.Stop() queue := make(chan interface{}, h.QueueSize) + h.mtx.Lock() + h.lastDelay[queue] = 0 + h.mtx.Unlock() + defer func() { + h.mtx.Lock() + delete(h.lastDelay, queue) + h.mtx.Unlock() + }() - stopped := make(chan struct{}) - stop := make(chan error, 5) + sess, err := newSession(ws, queue) + if err != nil { + log.WithError(err).Error("newSession failed") + return + } go func() { buf := make([]byte, 2<<20) for { select { - case <-stopped: + case <-ctx.Done(): return default: } ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour)) n, err := ws.Read(buf) - logger(ctx).WithField("frame", string(buf[:n])).Debug("received frame") - if err == nil && n == len(buf) { + buf := buf[:n] + log.WithField("frame", string(buf[:n])).Debug("received frame") + if err == nil && n == cap(buf) { err = errFrameTooBig } if err != nil { if err != io.EOF { - logger(ctx).WithError(err).Info("read error") + log.WithError(err).Info("read error") } - stop <- err + cancel() return } - msg := make(map[string]interface{}) - err = json.Unmarshal(buf[:n], &msg) + err = sess.Receive(buf) if err != nil { - logger(ctx).WithError(err).Info("invalid json from client") - stop <- err + log.WithError(err).Error("sess.Receive() failed") + cancel() return } - for _, buf := range sess.Receive(msg, buf[:n]) { - logger(ctx).WithField("frame", string(buf)).Debug("queued message from sess.Receive") - queue <- buf - } } }() go func() { - for e := range queue { - if buf, ok := e.([]byte); ok { - ws.SetWriteDeadline(time.Now().Add(h.PingTimeout)) - logger(ctx).WithField("frame", string(buf)).Debug("send msg buf") - _, err := ws.Write(buf) + for { + var ok bool + var data interface{} + select { + case <-ctx.Done(): + return + case data, ok = <-queue: + if !ok { + return + } + } + var e *event + var buf []byte + var err error + log := log + + switch data := data.(type) { + case []byte: + buf = data + case *event: + e = data + log = log.WithField("serial", e.Serial) + buf, err = sess.EventMessage(e) if err != nil { - logger(ctx).WithError(err).Error("write failed") - stop <- err + log.WithError(err).Error("EventMessage failed") + cancel() break + } else if len(buf) == 0 { + log.Debug("skip") + continue } - continue - } - e := e.(*event) - log := logger(ctx).WithField("serial", e.Serial) - - buf, err := sess.EventMessage(e) - if err != nil { - log.WithError(err).Error("EventMessage failed") - stop <- err - break - } else if len(buf) == 0 { - log.Debug("skip") + default: + log.WithField("data", data).Error("bad object in client queue") continue } @@ -105,45 +127,35 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) { _, err = ws.Write(buf) if err != nil { log.WithError(err).Error("write failed") - stop <- err + cancel() break } log.Debug("sent") if e != nil { - stats.QueueDelayNs += t0.Sub(e.Received) + hStats.QueueDelayNs += t0.Sub(e.Ready) + h.mtx.Lock() + h.lastDelay[queue] = stats.Duration(time.Since(e.Ready)) + h.mtx.Unlock() } - stats.WriteDelayNs += time.Since(t0) - stats.EventBytes += uint64(len(buf)) - stats.EventCount++ - } - for _ = range queue { + hStats.WriteDelayNs += time.Since(t0) + hStats.EventBytes += uint64(len(buf)) + hStats.EventCount++ } }() // Filter incoming events against the current subscription // list, and forward matching events to the outgoing message - // queue. Close the queue and return when the "stopped" - // channel closes or the incoming event stream ends. Shut down - // the handler if the outgoing queue fills up. + // queue. Close the queue and return when the request context + // is done/cancelled or the incoming event stream ends. Shut + // down the handler if the outgoing queue fills up. go func() { - send := func(e *event) { - select { - case queue <- e: - default: - stop <- errQueueFull - } - } - ticker := time.NewTicker(h.PingTimeout) defer ticker.Stop() for { - var e *event - var ok bool select { - case <-stopped: - close(queue) + case <-ctx.Done(): return case <-ticker.C: // If the outgoing queue is empty, @@ -152,23 +164,67 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) { // socket, and prevent an idle socket // from being closed. if len(queue) == 0 { - queue <- []byte(`{}`) + select { + case queue <- []byte(`{}`): + default: + } } continue - case e, ok = <-events: + case e, ok := <-incoming.Channel(): if !ok { - close(queue) + cancel() + return + } + if !sess.Filter(e) { + continue + } + select { + case queue <- e: + default: + log.WithError(errQueueFull).Error("terminate") + cancel() return } - } - if sess.Filter(e) { - send(e) } } }() - <-stop - close(stopped) - + <-ctx.Done() return } + +func (h *handler) DebugStatus() interface{} { + h.mtx.Lock() + defer h.mtx.Unlock() + + var s struct { + QueueCount int + QueueMin int + QueueMax int + QueueTotal uint64 + QueueDelayMin stats.Duration + QueueDelayMax stats.Duration + } + for q, lastDelay := range h.lastDelay { + s.QueueCount++ + n := len(q) + s.QueueTotal += uint64(n) + if s.QueueMax < n { + s.QueueMax = n + } + if s.QueueMin > n || s.QueueCount == 1 { + s.QueueMin = n + } + if (s.QueueDelayMin > lastDelay || s.QueueDelayMin == 0) && lastDelay > 0 { + s.QueueDelayMin = lastDelay + } + if s.QueueDelayMax < lastDelay { + s.QueueDelayMax = lastDelay + } + } + return &s +} + +func (h *handler) setup() { + h.lastDelay = make(map[chan interface{}]stats.Duration) +}