8460: Cache negative permission.
[arvados.git] / services / ws / handler.go
1 package main
2
3 import (
4         "encoding/json"
5         "io"
6         "log"
7         "net/http"
8         "time"
9
10         "git.curoverse.com/arvados.git/sdk/go/arvados"
11 )
12
13 type wsConn interface {
14         io.ReadWriter
15         Request() *http.Request
16         SetReadDeadline(time.Time) error
17         SetWriteDeadline(time.Time) error
18 }
19
20 type handler struct {
21         Client      arvados.Client
22         PingTimeout time.Duration
23         QueueSize   int
24         NewSession  func(wsConn, arvados.Client) (session, error)
25 }
26
27 func (h *handler) Handle(ws wsConn, events <-chan *event) {
28         sess, err := h.NewSession(ws, h.Client)
29         if err != nil {
30                 log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
31                 return
32         }
33
34         queue := make(chan *event, h.QueueSize)
35
36         stopped := make(chan struct{})
37         stop := make(chan error, 5)
38
39         go func() {
40                 buf := make([]byte, 2<<20)
41                 for {
42                         select {
43                         case <-stopped:
44                                 return
45                         default:
46                         }
47                         ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
48                         n, err := ws.Read(buf)
49                         sess.debugLogf("received frame: %q", buf[:n])
50                         if err == nil && n == len(buf) {
51                                 err = errFrameTooBig
52                         }
53                         if err != nil {
54                                 if err != io.EOF {
55                                         sess.debugLogf("handler: read: %s", err)
56                                 }
57                                 stop <- err
58                                 return
59                         }
60                         msg := make(map[string]interface{})
61                         err = json.Unmarshal(buf[:n], &msg)
62                         if err != nil {
63                                 sess.debugLogf("handler: unmarshal: %s", err)
64                                 stop <- err
65                                 return
66                         }
67                         sess.Receive(msg, buf[:n])
68                 }
69         }()
70
71         go func() {
72                 for e := range queue {
73                         if e == nil {
74                                 ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
75                                 _, err := ws.Write([]byte("{}"))
76                                 if err != nil {
77                                         sess.debugLogf("handler: write {}: %s", err)
78                                         stop <- err
79                                         break
80                                 }
81                                 continue
82                         }
83
84                         buf, err := sess.EventMessage(e)
85                         if err != nil {
86                                 sess.debugLogf("EventMessage %d: err %s", err)
87                                 stop <- err
88                                 break
89                         } else if len(buf) == 0 {
90                                 sess.debugLogf("EventMessage %d: skip", e.Serial)
91                                 continue
92                         }
93
94                         sess.debugLogf("handler: send event %d: %q", e.Serial, buf)
95                         ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
96                         _, err = ws.Write(buf)
97                         if err != nil {
98                                 sess.debugLogf("handler: write: %s", err)
99                                 stop <- err
100                                 break
101                         }
102                         sess.debugLogf("handler: sent event %d", e.Serial)
103                 }
104                 for _ = range queue {
105                 }
106         }()
107
108         // Filter incoming events against the current subscription
109         // list, and forward matching events to the outgoing message
110         // queue. Close the queue and return when the "stopped"
111         // channel closes or the incoming event stream ends. Shut down
112         // the handler if the outgoing queue fills up.
113         go func() {
114                 send := func(e *event) {
115                         select {
116                         case queue <- e:
117                         default:
118                                 stop <- errQueueFull
119                         }
120                 }
121
122                 ticker := time.NewTicker(h.PingTimeout)
123                 defer ticker.Stop()
124
125                 for {
126                         var e *event
127                         var ok bool
128                         select {
129                         case <-stopped:
130                                 close(queue)
131                                 return
132                         case <-ticker.C:
133                                 // If the outgoing queue is empty,
134                                 // send an empty message. This can
135                                 // help detect a disconnected network
136                                 // socket, and prevent an idle socket
137                                 // from being closed.
138                                 if len(queue) == 0 {
139                                         queue <- nil
140                                 }
141                                 continue
142                         case e, ok = <-events:
143                                 if !ok {
144                                         close(queue)
145                                         return
146                                 }
147                         }
148                         if sess.Filter(e) {
149                                 send(e)
150                         }
151                 }
152         }()
153
154         <-stop
155         close(stopped)
156 }