7 "git.curoverse.com/arvados.git/sdk/go/arvados"
12 PingTimeout time.Duration
14 NewSession func(wsConn, chan<- interface{}) (session, error)
17 type handlerStats struct {
18 QueueDelayNs time.Duration
19 WriteDelayNs time.Duration
24 func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats) {
25 log := logger(ws.Request().Context())
26 queue := make(chan interface{}, h.QueueSize)
27 sess, err := h.NewSession(ws, queue)
29 log.WithError(err).Error("NewSession failed")
33 stopped := make(chan struct{})
34 stop := make(chan error, 5)
37 buf := make([]byte, 2<<20)
44 ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
45 n, err := ws.Read(buf)
47 log.WithField("frame", string(buf[:n])).Debug("received frame")
48 if err == nil && n == cap(buf) {
53 log.WithError(err).Info("read error")
58 err = sess.Receive(buf)
67 for data := range queue {
73 switch data := data.(type) {
78 log = log.WithField("serial", e.Serial)
79 buf, err = sess.EventMessage(e)
81 log.WithError(err).Error("EventMessage failed")
84 } else if len(buf) == 0 {
89 log.WithField("data", data).Error("bad object in client queue")
93 log.WithField("frame", string(buf)).Debug("send event")
94 ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
96 _, err = ws.Write(buf)
98 log.WithError(err).Error("write failed")
105 stats.QueueDelayNs += t0.Sub(e.Received)
107 stats.WriteDelayNs += time.Since(t0)
108 stats.EventBytes += uint64(len(buf))
111 for _ = range queue {
112 // Ensure queue can't fill up and block other
113 // goroutines after we hit a write error.
117 // Filter incoming events against the current subscription
118 // list, and forward matching events to the outgoing message
119 // queue. Close the queue and return when the "stopped"
120 // channel closes or the incoming event stream ends. Shut down
121 // the handler if the outgoing queue fills up.
123 ticker := time.NewTicker(h.PingTimeout)
132 // If the outgoing queue is empty,
133 // send an empty message. This can
134 // help detect a disconnected network
135 // socket, and prevent an idle socket
136 // from being closed.
138 queue <- []byte(`{}`)
141 case e, ok := <-incoming: