Merge branch '8784-dir-listings'
[arvados.git] / services / ws / handler.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "context"
9         "io"
10         "sync"
11         "time"
12
13         "git.curoverse.com/arvados.git/sdk/go/arvados"
14         "git.curoverse.com/arvados.git/sdk/go/stats"
15 )
16
17 type handler struct {
18         Client      arvados.Client
19         PingTimeout time.Duration
20         QueueSize   int
21
22         mtx       sync.Mutex
23         lastDelay map[chan interface{}]stats.Duration
24         setupOnce sync.Once
25 }
26
27 type handlerStats struct {
28         QueueDelayNs time.Duration
29         WriteDelayNs time.Duration
30         EventBytes   uint64
31         EventCount   uint64
32 }
33
34 func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) {
35         h.setupOnce.Do(h.setup)
36
37         ctx, cancel := context.WithCancel(ws.Request().Context())
38         defer cancel()
39         log := logger(ctx)
40
41         incoming := eventSource.NewSink()
42         defer incoming.Stop()
43
44         queue := make(chan interface{}, h.QueueSize)
45         h.mtx.Lock()
46         h.lastDelay[queue] = 0
47         h.mtx.Unlock()
48         defer func() {
49                 h.mtx.Lock()
50                 delete(h.lastDelay, queue)
51                 h.mtx.Unlock()
52         }()
53
54         sess, err := newSession(ws, queue)
55         if err != nil {
56                 log.WithError(err).Error("newSession failed")
57                 return
58         }
59
60         // Receive websocket frames from the client and pass them to
61         // sess.Receive().
62         go func() {
63                 buf := make([]byte, 2<<20)
64                 for {
65                         select {
66                         case <-ctx.Done():
67                                 return
68                         default:
69                         }
70                         ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
71                         n, err := ws.Read(buf)
72                         buf := buf[:n]
73                         log.WithField("frame", string(buf[:n])).Debug("received frame")
74                         if err == nil && n == cap(buf) {
75                                 err = errFrameTooBig
76                         }
77                         if err != nil {
78                                 if err != io.EOF {
79                                         log.WithError(err).Info("read error")
80                                 }
81                                 cancel()
82                                 return
83                         }
84                         err = sess.Receive(buf)
85                         if err != nil {
86                                 log.WithError(err).Error("sess.Receive() failed")
87                                 cancel()
88                                 return
89                         }
90                 }
91         }()
92
93         // Take items from the outgoing queue, serialize them using
94         // sess.EventMessage() as needed, and send them to the client
95         // as websocket frames.
96         go func() {
97                 for {
98                         var ok bool
99                         var data interface{}
100                         select {
101                         case <-ctx.Done():
102                                 return
103                         case data, ok = <-queue:
104                                 if !ok {
105                                         return
106                                 }
107                         }
108                         var e *event
109                         var buf []byte
110                         var err error
111                         log := log
112
113                         switch data := data.(type) {
114                         case []byte:
115                                 buf = data
116                         case *event:
117                                 e = data
118                                 log = log.WithField("serial", e.Serial)
119                                 buf, err = sess.EventMessage(e)
120                                 if err != nil {
121                                         log.WithError(err).Error("EventMessage failed")
122                                         cancel()
123                                         break
124                                 } else if len(buf) == 0 {
125                                         log.Debug("skip")
126                                         continue
127                                 }
128                         default:
129                                 log.WithField("data", data).Error("bad object in client queue")
130                                 continue
131                         }
132
133                         log.WithField("frame", string(buf)).Debug("send event")
134                         ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
135                         t0 := time.Now()
136                         _, err = ws.Write(buf)
137                         if err != nil {
138                                 log.WithError(err).Error("write failed")
139                                 cancel()
140                                 break
141                         }
142                         log.Debug("sent")
143
144                         if e != nil {
145                                 hStats.QueueDelayNs += t0.Sub(e.Ready)
146                                 h.mtx.Lock()
147                                 h.lastDelay[queue] = stats.Duration(time.Since(e.Ready))
148                                 h.mtx.Unlock()
149                         }
150                         hStats.WriteDelayNs += time.Since(t0)
151                         hStats.EventBytes += uint64(len(buf))
152                         hStats.EventCount++
153                 }
154         }()
155
156         // Filter incoming events against the current subscription
157         // list, and forward matching events to the outgoing message
158         // queue. Close the queue and return when the request context
159         // is done/cancelled or the incoming event stream ends. Shut
160         // down the handler if the outgoing queue fills up.
161         go func() {
162                 ticker := time.NewTicker(h.PingTimeout)
163                 defer ticker.Stop()
164
165                 for {
166                         select {
167                         case <-ctx.Done():
168                                 return
169                         case <-ticker.C:
170                                 // If the outgoing queue is empty,
171                                 // send an empty message. This can
172                                 // help detect a disconnected network
173                                 // socket, and prevent an idle socket
174                                 // from being closed.
175                                 if len(queue) == 0 {
176                                         select {
177                                         case queue <- []byte(`{}`):
178                                         default:
179                                         }
180                                 }
181                                 continue
182                         case e, ok := <-incoming.Channel():
183                                 if !ok {
184                                         cancel()
185                                         return
186                                 }
187                                 if !sess.Filter(e) {
188                                         continue
189                                 }
190                                 select {
191                                 case queue <- e:
192                                 default:
193                                         log.WithError(errQueueFull).Error("terminate")
194                                         cancel()
195                                         return
196                                 }
197                         }
198                 }
199         }()
200
201         <-ctx.Done()
202         return
203 }
204
205 func (h *handler) DebugStatus() interface{} {
206         h.mtx.Lock()
207         defer h.mtx.Unlock()
208
209         var s struct {
210                 QueueCount    int
211                 QueueMin      int
212                 QueueMax      int
213                 QueueTotal    uint64
214                 QueueDelayMin stats.Duration
215                 QueueDelayMax stats.Duration
216         }
217         for q, lastDelay := range h.lastDelay {
218                 s.QueueCount++
219                 n := len(q)
220                 s.QueueTotal += uint64(n)
221                 if s.QueueMax < n {
222                         s.QueueMax = n
223                 }
224                 if s.QueueMin > n || s.QueueCount == 1 {
225                         s.QueueMin = n
226                 }
227                 if (s.QueueDelayMin > lastDelay || s.QueueDelayMin == 0) && lastDelay > 0 {
228                         s.QueueDelayMin = lastDelay
229                 }
230                 if s.QueueDelayMax < lastDelay {
231                         s.QueueDelayMax = lastDelay
232                 }
233         }
234         return &s
235 }
236
237 func (h *handler) setup() {
238         h.lastDelay = make(map[chan interface{}]stats.Duration)
239 }