8460: Retrieve recent logs and send old matching events if last_log_id given.
[arvados.git] / services / ws / handler.go
1 package main
2
import (
	"encoding/json"
	"io"
	"log"
	"net/http"
	"sync"
	"time"

	"git.curoverse.com/arvados.git/sdk/go/arvados"
)
12
// wsConn is the subset of a websocket connection that the handler
// relies on: a byte stream in both directions, access to the HTTP
// request that initiated the connection, and per-direction I/O
// deadlines.
type wsConn interface {
	// Request returns the HTTP request associated with the
	// connection.
	Request() *http.Request

	// SetReadDeadline and SetWriteDeadline set the deadlines
	// applied to subsequent Read and Write calls respectively.
	SetReadDeadline(time.Time) error
	SetWriteDeadline(time.Time) error

	io.Reader
	io.Writer
}
19
20 type handler struct {
21         Client      arvados.Client
22         PingTimeout time.Duration
23         QueueSize   int
24         NewSession  func(wsConn, arvados.Client) (session, error)
25 }
26
// handlerStats accumulates per-connection counters for the events
// that Handle writes to the client.
type handlerStats struct {
	// QueueDelay is the total time between each event's Received
	// timestamp and the moment its write started. WriteDelay is
	// the total time spent writing events to the connection.
	QueueDelay, WriteDelay time.Duration

	// EventBytes is the total size of the event messages written,
	// and EventCount is how many were written.
	EventBytes, EventCount uint64
}
33
34 func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
35         sess, err := h.NewSession(ws, h.Client)
36         if err != nil {
37                 log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
38                 return
39         }
40
41         queue := make(chan interface{}, h.QueueSize)
42
43         stopped := make(chan struct{})
44         stop := make(chan error, 5)
45
46         go func() {
47                 buf := make([]byte, 2<<20)
48                 for {
49                         select {
50                         case <-stopped:
51                                 return
52                         default:
53                         }
54                         ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
55                         n, err := ws.Read(buf)
56                         sess.debugLogf("received frame: %q", buf[:n])
57                         if err == nil && n == len(buf) {
58                                 err = errFrameTooBig
59                         }
60                         if err != nil {
61                                 if err != io.EOF {
62                                         sess.debugLogf("handler: read: %s", err)
63                                 }
64                                 stop <- err
65                                 return
66                         }
67                         msg := make(map[string]interface{})
68                         err = json.Unmarshal(buf[:n], &msg)
69                         if err != nil {
70                                 sess.debugLogf("handler: unmarshal: %s", err)
71                                 stop <- err
72                                 return
73                         }
74                         for _, buf := range sess.Receive(msg, buf[:n]) {
75                                 queue <- buf
76                         }
77                 }
78         }()
79
80         go func() {
81                 for e := range queue {
82                         if buf, ok := e.([]byte); ok {
83                                 ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
84                                 _, err := ws.Write(buf)
85                                 if err != nil {
86                                         sess.debugLogf("handler: write {}: %s", err)
87                                         stop <- err
88                                         break
89                                 }
90                                 continue
91                         }
92                         e := e.(*event)
93
94                         buf, err := sess.EventMessage(e)
95                         if err != nil {
96                                 sess.debugLogf("EventMessage %d: err %s", err)
97                                 stop <- err
98                                 break
99                         } else if len(buf) == 0 {
100                                 sess.debugLogf("EventMessage %d: skip", e.Serial)
101                                 continue
102                         }
103
104                         sess.debugLogf("handler: send event %d: %q", e.Serial, buf)
105                         ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
106                         t0 := time.Now()
107                         _, err = ws.Write(buf)
108                         if err != nil {
109                                 sess.debugLogf("handler: write: %s", err)
110                                 stop <- err
111                                 break
112                         }
113                         sess.debugLogf("handler: sent event %d", e.Serial)
114                         stats.WriteDelay += time.Since(t0)
115                         stats.QueueDelay += t0.Sub(e.Received)
116                         stats.EventBytes += uint64(len(buf))
117                         stats.EventCount++
118                 }
119                 for _ = range queue {
120                 }
121         }()
122
123         // Filter incoming events against the current subscription
124         // list, and forward matching events to the outgoing message
125         // queue. Close the queue and return when the "stopped"
126         // channel closes or the incoming event stream ends. Shut down
127         // the handler if the outgoing queue fills up.
128         go func() {
129                 send := func(e *event) {
130                         select {
131                         case queue <- e:
132                         default:
133                                 stop <- errQueueFull
134                         }
135                 }
136
137                 ticker := time.NewTicker(h.PingTimeout)
138                 defer ticker.Stop()
139
140                 for {
141                         var e *event
142                         var ok bool
143                         select {
144                         case <-stopped:
145                                 close(queue)
146                                 return
147                         case <-ticker.C:
148                                 // If the outgoing queue is empty,
149                                 // send an empty message. This can
150                                 // help detect a disconnected network
151                                 // socket, and prevent an idle socket
152                                 // from being closed.
153                                 if len(queue) == 0 {
154                                         queue <- []byte(`{}`)
155                                 }
156                                 continue
157                         case e, ok = <-events:
158                                 if !ok {
159                                         close(queue)
160                                         return
161                                 }
162                         }
163                         if sess.Filter(e) {
164                                 send(e)
165                         }
166                 }
167         }()
168
169         <-stop
170         close(stopped)
171
172         return
173 }