1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
13 "git.arvados.org/arvados.git/sdk/go/arvados"
14 "git.arvados.org/arvados.git/sdk/go/stats"
15 "github.com/sirupsen/logrus"
// PingTimeout is used by Handle both as the write deadline applied
// before each outgoing frame and as the interval of its ticker.
20 PingTimeout time.Duration
// lastDelay has one entry per outgoing queue registered by Handle.
// An entry starts at 0 and is set to time.Since(e.Ready) for an
// event e taken from that queue; DebugStatus aggregates the values.
24 lastDelay map[chan interface{}]stats.Duration
// handlerStats holds the statistics that handler.Handle accumulates
// for one connection and returns to its caller.
28 type handlerStats struct {
// QueueDelayNs is the running sum of t0.Sub(e.Ready) over events
// handled by Handle.
// NOTE(review): t0 is presumably captured just before the write,
// making this the time each event spent queued — confirm.
29 QueueDelayNs time.Duration
// WriteDelayNs is the running sum of time.Since(t0), taken after the
// frame has been written.
30 WriteDelayNs time.Duration
// Handle services a single websocket connection. It creates a session
// with newSession, passes frames read from ws to sess.Receive, writes
// items taken from the outgoing queue to ws, and receives events from
// a sink obtained from eventSource. Statistics gathered along the way
// are returned in hStats.
35 func (h *handler) Handle(ws wsConn, logger logrus.FieldLogger, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) {
// Run h.setup at most once so that h.lastDelay is allocated before it
// is written below.
36 h.setupOnce.Do(h.setup)
// Derive a cancellable context from the client's HTTP request.
38 ctx, cancel := context.WithCancel(ws.Request().Context())
// Outgoing message queue, buffered to h.QueueSize items.
41 queue := make(chan interface{}, h.QueueSize)
// Register the queue in h.lastDelay, which DebugStatus iterates over.
// NOTE(review): h.lastDelay is shared by every Handle call and by
// DebugStatus — confirm each access is made while holding a lock.
43 h.lastDelay[queue] = 0
// Remove this connection's entry from h.lastDelay.
47 delete(h.lastDelay, queue)
// The session is given the send-only side of the outgoing queue.
51 sess, err := newSession(ws, queue)
53 logger.WithError(err).Error("newSession failed")
57 // Receive websocket frames from the client and pass them to
// Read buffer of 2<<20 bytes (2 MiB).
61 buf := make([]byte, 2<<20)
// The read deadline is set one year ahead, i.e. effectively no
// deadline for reads.
68 ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
69 n, err := ws.Read(buf)
71 logger.WithField("frame", string(buf[:n])).Debug("received frame")
// A successful read that fills the buffer to capacity is singled out.
// NOTE(review): presumably treated as an oversized frame — confirm.
72 if err == nil && n == cap(buf) {
// Only log read errors other than io.EOF, and only while ctx has not
// been cancelled.
76 if err != io.EOF && ctx.Err() == nil {
77 logger.WithError(err).Info("read error")
81 err = sess.Receive(buf)
83 logger.WithError(err).Error("sess.Receive() failed")
89 // Take items from the outgoing queue, serialize them using
90 // sess.EventMessage() as needed, and send them to the client
91 // as websocket frames.
100 case data, ok = <-queue:
// Dispatch on the dynamic type of the queued item.
110 switch data := data.(type) {
// This reassigns logger, so later log calls made through this
// variable carry the "serial" field.
115 logger = logger.WithField("serial", e.Serial)
116 buf, err = sess.EventMessage(e)
118 logger.WithError(err).Error("EventMessage failed")
// EventMessage succeeded but produced an empty message.
120 } else if len(buf) == 0 {
125 logger.WithField("data", data).Error("bad object in client queue")
129 logger.WithField("frame", string(buf)).Debug("send event")
// Bound the write by PingTimeout.
130 ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
132 _, err = ws.Write(buf)
// Do not log the write failure if ctx is already done.
134 if ctx.Err() == nil {
135 logger.WithError(err).Error("write failed")
142 hStats.QueueDelayNs += t0.Sub(e.Ready)
// Record this queue's most recent delay for DebugStatus.
144 h.lastDelay[queue] = stats.Duration(time.Since(e.Ready))
147 hStats.WriteDelayNs += time.Since(t0)
// Count the bytes of the frame just handled.
148 hStats.EventBytes += uint64(len(buf))
153 // Filter incoming events against the current subscription
154 // list, and forward matching events to the outgoing message
155 // queue. Close the queue and return when the request context
156 // is done/cancelled or the incoming event stream ends. Shut
157 // down the handler if the outgoing queue fills up.
160 ticker := time.NewTicker(h.PingTimeout)
// Obtain a sink from the event source and stop it when the enclosing
// function returns.
163 incoming := eventSource.NewSink()
164 defer incoming.Stop()
171 // If the outgoing queue is empty,
172 // send an empty message. This can
173 // help detect a disconnected network
174 // socket, and prevent an idle socket
175 // from being closed.
// The empty message is the JSON object `{}`.
178 case queue <- []byte(`{}`):
182 case e, ok := <-incoming.Channel():
192 logger.WithError(errQueueFull).Error("terminate")
// DebugStatus walks every queue registered in h.lastDelay and
// aggregates queue-size and queue-delay figures (total, minimum,
// maximum) into a status value.
203 func (h *handler) DebugStatus() interface{} {
212 QueueDelayMin stats.Duration
213 QueueDelayMax stats.Duration
// Visit each registered queue together with its last recorded delay.
215 for q, lastDelay := range h.lastDelay {
// NOTE(review): n is presumably the current length of q — confirm.
218 s.QueueTotal += uint64(n)
// Track the smallest n. The QueueCount == 1 test forces an update;
// presumably it marks the first queue visited — confirm.
222 if s.QueueMin > n || s.QueueCount == 1 {
// Only positive delays are considered for the minimum; a
// QueueDelayMin of 0 is treated as "not set yet".
225 if (s.QueueDelayMin > lastDelay || s.QueueDelayMin == 0) && lastDelay > 0 {
226 s.QueueDelayMin = lastDelay
// Track the largest delay seen.
228 if s.QueueDelayMax < lastDelay {
229 s.QueueDelayMax = lastDelay
// setup allocates the lastDelay map. Handle invokes it through
// h.setupOnce.
235 func (h *handler) setup() {
236 h.lastDelay = make(map[chan interface{}]stats.Duration)