8460: Support created_at filters.
[arvados.git] / services / ws / handler.go
1 package main
2
3 import (
4         "encoding/json"
5         "io"
6         "log"
7         "net/http"
8         "time"
9
10         "git.curoverse.com/arvados.git/sdk/go/arvados"
11 )
12
// wsConn is the set of connection operations the handler relies on:
// reading and writing frames, looking up the HTTP request that
// initiated the connection, and setting read/write deadlines.
type wsConn interface {
	io.Reader
	io.Writer

	// Request returns the HTTP request associated with the
	// connection.
	Request() *http.Request

	// SetReadDeadline and SetWriteDeadline set the time after
	// which pending reads and writes should give up.
	SetReadDeadline(time.Time) error
	SetWriteDeadline(time.Time) error
}
19
// handler holds the settings used by Handle to run one websocket
// session per connection.
type handler struct {
	// Client is passed to NewSession for each new connection.
	Client arvados.Client
	// PingTimeout is used both as the write deadline for each
	// outgoing message and as the interval at which an empty "{}"
	// message is queued when the outgoing queue is idle.
	PingTimeout time.Duration
	// QueueSize is the capacity of the per-connection outgoing
	// message queue.
	QueueSize int
	// NewSession creates the session for a connection. Handle
	// calls it once, before starting any I/O.
	NewSession func(wsConn, arvados.Client) (session, error)
}
26
// handlerStats summarizes the events delivered during one call to
// Handle. Only event messages are counted; ping ("{}") messages and
// responses queued by the session are not.
//
// QueueDelay is the total time between each event's Received
// timestamp and the start of its write. WriteDelay is the total time
// taken to write events to the connection. EventBytes and EventCount
// are the total size and number of event messages written.
type handlerStats struct {
	QueueDelay, WriteDelay time.Duration
	EventBytes, EventCount uint64
}
33
34 func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
35         sess, err := h.NewSession(ws, h.Client)
36         if err != nil {
37                 log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
38                 return
39         }
40
41         queue := make(chan interface{}, h.QueueSize)
42
43         stopped := make(chan struct{})
44         stop := make(chan error, 5)
45
46         go func() {
47                 buf := make([]byte, 2<<20)
48                 for {
49                         select {
50                         case <-stopped:
51                                 return
52                         default:
53                         }
54                         ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
55                         n, err := ws.Read(buf)
56                         sess.debugLogf("received frame: %q", buf[:n])
57                         if err == nil && n == len(buf) {
58                                 err = errFrameTooBig
59                         }
60                         if err != nil {
61                                 if err != io.EOF {
62                                         sess.debugLogf("handler: read: %s", err)
63                                 }
64                                 stop <- err
65                                 return
66                         }
67                         msg := make(map[string]interface{})
68                         err = json.Unmarshal(buf[:n], &msg)
69                         if err != nil {
70                                 sess.debugLogf("handler: unmarshal: %s", err)
71                                 stop <- err
72                                 return
73                         }
74                         e := sess.Receive(msg, buf[:n])
75                         if e != nil {
76                                 queue <- e
77                         }
78                 }
79         }()
80
81         go func() {
82                 for e := range queue {
83                         if buf, ok := e.([]byte); ok {
84                                 ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
85                                 _, err := ws.Write(buf)
86                                 if err != nil {
87                                         sess.debugLogf("handler: write {}: %s", err)
88                                         stop <- err
89                                         break
90                                 }
91                                 continue
92                         }
93                         e := e.(*event)
94
95                         buf, err := sess.EventMessage(e)
96                         if err != nil {
97                                 sess.debugLogf("EventMessage %d: err %s", err)
98                                 stop <- err
99                                 break
100                         } else if len(buf) == 0 {
101                                 sess.debugLogf("EventMessage %d: skip", e.Serial)
102                                 continue
103                         }
104
105                         sess.debugLogf("handler: send event %d: %q", e.Serial, buf)
106                         ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
107                         t0 := time.Now()
108                         _, err = ws.Write(buf)
109                         if err != nil {
110                                 sess.debugLogf("handler: write: %s", err)
111                                 stop <- err
112                                 break
113                         }
114                         sess.debugLogf("handler: sent event %d", e.Serial)
115                         stats.WriteDelay += time.Since(t0)
116                         stats.QueueDelay += t0.Sub(e.Received)
117                         stats.EventBytes += uint64(len(buf))
118                         stats.EventCount++
119                 }
120                 for _ = range queue {
121                 }
122         }()
123
124         // Filter incoming events against the current subscription
125         // list, and forward matching events to the outgoing message
126         // queue. Close the queue and return when the "stopped"
127         // channel closes or the incoming event stream ends. Shut down
128         // the handler if the outgoing queue fills up.
129         go func() {
130                 send := func(e *event) {
131                         select {
132                         case queue <- e:
133                         default:
134                                 stop <- errQueueFull
135                         }
136                 }
137
138                 ticker := time.NewTicker(h.PingTimeout)
139                 defer ticker.Stop()
140
141                 for {
142                         var e *event
143                         var ok bool
144                         select {
145                         case <-stopped:
146                                 close(queue)
147                                 return
148                         case <-ticker.C:
149                                 // If the outgoing queue is empty,
150                                 // send an empty message. This can
151                                 // help detect a disconnected network
152                                 // socket, and prevent an idle socket
153                                 // from being closed.
154                                 if len(queue) == 0 {
155                                         queue <- []byte(`{}`)
156                                 }
157                                 continue
158                         case e, ok = <-events:
159                                 if !ok {
160                                         close(queue)
161                                         return
162                                 }
163                         }
164                         if sess.Filter(e) {
165                                 send(e)
166                         }
167                 }
168         }()
169
170         <-stop
171         close(stopped)
172
173         return
174 }