8460: Accept 403 response from API as "not permitted", instead of "unexpected error".
[arvados.git] / services / ws / handler.go
1 package main
2
import (
	"io"
	"sync"
	"time"

	"git.curoverse.com/arvados.git/sdk/go/arvados"
)
9
10 type handler struct {
11         Client      arvados.Client
12         PingTimeout time.Duration
13         QueueSize   int
14         NewSession  func(wsConn, chan<- interface{}) (session, error)
15 }
16
// handlerStats summarizes the traffic Handle sent on one connection.
//
// QueueDelayNs is the total time between each event's Received
// timestamp and the start of the write that delivered it.
// WriteDelayNs is the total time spent writing messages. Despite the
// "Ns" suffix, both are time.Duration values.
//
// EventBytes and EventCount are the total size and number of messages
// written successfully; keepalive messages are counted too.
type handlerStats struct {
	QueueDelayNs, WriteDelayNs time.Duration
	EventBytes, EventCount     uint64
}
23
24 func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats) {
25         log := logger(ws.Request().Context())
26         queue := make(chan interface{}, h.QueueSize)
27         sess, err := h.NewSession(ws, queue)
28         if err != nil {
29                 log.WithError(err).Error("NewSession failed")
30                 return
31         }
32
33         stopped := make(chan struct{})
34         stop := make(chan error, 5)
35
36         go func() {
37                 buf := make([]byte, 2<<20)
38                 for {
39                         select {
40                         case <-stopped:
41                                 return
42                         default:
43                         }
44                         ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
45                         n, err := ws.Read(buf)
46                         buf := buf[:n]
47                         log.WithField("frame", string(buf[:n])).Debug("received frame")
48                         if err == nil && n == cap(buf) {
49                                 err = errFrameTooBig
50                         }
51                         if err != nil {
52                                 if err != io.EOF {
53                                         log.WithError(err).Info("read error")
54                                 }
55                                 stop <- err
56                                 return
57                         }
58                         err = sess.Receive(buf)
59                         if err != nil {
60                                 stop <- err
61                                 return
62                         }
63                 }
64         }()
65
66         go func() {
67                 for data := range queue {
68                         var e *event
69                         var buf []byte
70                         var err error
71                         log := log
72
73                         switch data := data.(type) {
74                         case []byte:
75                                 buf = data
76                         case *event:
77                                 e = data
78                                 log = log.WithField("serial", e.Serial)
79                                 buf, err = sess.EventMessage(e)
80                                 if err != nil {
81                                         log.WithError(err).Error("EventMessage failed")
82                                         stop <- err
83                                         break
84                                 } else if len(buf) == 0 {
85                                         log.Debug("skip")
86                                         continue
87                                 }
88                         default:
89                                 log.WithField("data", data).Error("bad object in client queue")
90                                 continue
91                         }
92
93                         log.WithField("frame", string(buf)).Debug("send event")
94                         ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
95                         t0 := time.Now()
96                         _, err = ws.Write(buf)
97                         if err != nil {
98                                 log.WithError(err).Error("write failed")
99                                 stop <- err
100                                 break
101                         }
102                         log.Debug("sent")
103
104                         if e != nil {
105                                 stats.QueueDelayNs += t0.Sub(e.Received)
106                         }
107                         stats.WriteDelayNs += time.Since(t0)
108                         stats.EventBytes += uint64(len(buf))
109                         stats.EventCount++
110                 }
111                 for _ = range queue {
112                         // Ensure queue can't fill up and block other
113                         // goroutines after we hit a write error.
114                 }
115         }()
116
117         // Filter incoming events against the current subscription
118         // list, and forward matching events to the outgoing message
119         // queue. Close the queue and return when the "stopped"
120         // channel closes or the incoming event stream ends. Shut down
121         // the handler if the outgoing queue fills up.
122         go func() {
123                 ticker := time.NewTicker(h.PingTimeout)
124                 defer ticker.Stop()
125
126                 for {
127                         select {
128                         case <-stopped:
129                                 close(queue)
130                                 return
131                         case <-ticker.C:
132                                 // If the outgoing queue is empty,
133                                 // send an empty message. This can
134                                 // help detect a disconnected network
135                                 // socket, and prevent an idle socket
136                                 // from being closed.
137                                 if len(queue) == 0 {
138                                         queue <- []byte(`{}`)
139                                 }
140                                 continue
141                         case e, ok := <-incoming:
142                                 if !ok {
143                                         close(queue)
144                                         return
145                                 }
146                                 if !sess.Filter(e) {
147                                         continue
148                                 }
149                                 select {
150                                 case queue <- e:
151                                 default:
152                                         stop <- errQueueFull
153                                 }
154                         }
155                 }
156         }()
157
158         <-stop
159         close(stopped)
160
161         return
162 }