10 "git.curoverse.com/arvados.git/sdk/go/arvados"
13 type wsConn interface {
15 Request() *http.Request
16 SetReadDeadline(time.Time) error
17 SetWriteDeadline(time.Time) error
22 PingTimeout time.Duration
24 NewSession func(wsConn, arvados.Client) (session, error)
27 func (h *handler) Handle(ws wsConn, events <-chan *event) {
28 sess, err := h.NewSession(ws, h.Client)
30 log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
34 queue := make(chan *event, h.QueueSize)
36 stopped := make(chan struct{})
37 stop := make(chan error, 5)
40 buf := make([]byte, 2<<20)
47 ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
48 n, err := ws.Read(buf)
49 sess.debugLogf("received frame: %q", buf[:n])
50 if err == nil && n == len(buf) {
55 sess.debugLogf("handler: read: %s", err)
60 msg := make(map[string]interface{})
61 err = json.Unmarshal(buf[:n], &msg)
63 sess.debugLogf("handler: unmarshal: %s", err)
67 sess.Receive(msg, buf[:n])
72 for e := range queue {
74 ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
75 _, err := ws.Write([]byte("{}"))
77 sess.debugLogf("handler: write {}: %s", err)
84 buf, err := sess.EventMessage(e)
86 sess.debugLogf("EventMessage %d: err %s", e.Serial, err)
89 } else if len(buf) == 0 {
90 sess.debugLogf("EventMessage %d: skip", e.Serial)
94 sess.debugLogf("handler: send event %d: %q", e.Serial, buf)
95 ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
96 _, err = ws.Write(buf)
98 sess.debugLogf("handler: write: %s", err)
102 sess.debugLogf("handler: sent event %d", e.Serial)
104 for _ = range queue {
108 // Filter incoming events against the current subscription
109 // list, and forward matching events to the outgoing message
110 // queue. Close the queue and return when the "stopped"
111 // channel closes or the incoming event stream ends. Shut down
112 // the handler if the outgoing queue fills up.
114 send := func(e *event) {
122 ticker := time.NewTicker(h.PingTimeout)
133 // If the outgoing queue is empty,
134 // send an empty message. This can
135 // help detect a disconnected network
136 // socket, and prevent an idle socket
137 // from being closed.
142 case e, ok = <-events: