10 "git.curoverse.com/arvados.git/sdk/go/arvados"
13 type wsConn interface {
15 Request() *http.Request
16 SetReadDeadline(time.Time) error
17 SetWriteDeadline(time.Time) error
22 PingTimeout time.Duration
24 NewSession func(wsConn, arvados.Client) (session, error)
27 type handlerStats struct {
28 QueueDelay time.Duration
29 WriteDelay time.Duration
34 func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
35 sess, err := h.NewSession(ws, h.Client)
37 log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
41 queue := make(chan interface{}, h.QueueSize)
43 stopped := make(chan struct{})
44 stop := make(chan error, 5)
47 buf := make([]byte, 2<<20)
54 ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
55 n, err := ws.Read(buf)
56 sess.debugLogf("received frame: %q", buf[:n])
57 if err == nil && n == len(buf) {
62 sess.debugLogf("handler: read: %s", err)
67 msg := make(map[string]interface{})
68 err = json.Unmarshal(buf[:n], &msg)
70 sess.debugLogf("handler: unmarshal: %s", err)
74 e := sess.Receive(msg, buf[:n])
82 for e := range queue {
83 if buf, ok := e.([]byte); ok {
84 ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
85 _, err := ws.Write(buf)
87 sess.debugLogf("handler: write {}: %s", err)
95 buf, err := sess.EventMessage(e)
97 sess.debugLogf("EventMessage %d: err %s", err)
100 } else if len(buf) == 0 {
101 sess.debugLogf("EventMessage %d: skip", e.Serial)
105 sess.debugLogf("handler: send event %d: %q", e.Serial, buf)
106 ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
108 _, err = ws.Write(buf)
110 sess.debugLogf("handler: write: %s", err)
114 sess.debugLogf("handler: sent event %d", e.Serial)
115 stats.WriteDelay += time.Since(t0)
116 stats.QueueDelay += t0.Sub(e.Received)
117 stats.EventBytes += uint64(len(buf))
120 for _ = range queue {
124 // Filter incoming events against the current subscription
125 // list, and forward matching events to the outgoing message
126 // queue. Close the queue and return when the "stopped"
127 // channel closes or the incoming event stream ends. Shut down
128 // the handler if the outgoing queue fills up.
130 send := func(e *event) {
138 ticker := time.NewTicker(h.PingTimeout)
149 // If the outgoing queue is empty,
150 // send an empty message. This can
151 // help detect a disconnected network
152 // socket, and prevent an idle socket
153 // from being closed.
155 queue <- []byte(`{}`)
158 case e, ok = <-events: