8460: Hide *websocket.Conn behind interface.
[arvados.git] / services / ws / handler_v0.go
1 package main
2
3 import (
4         "encoding/json"
5         "log"
6         "sync"
7 )
8
9 type handlerV0 struct {
10         QueueSize int
11 }
12
13 func (h *handlerV0) debugLogf(ws wsConn, s string, args ...interface{}) {
14         args = append([]interface{}{ws.Request().RemoteAddr}, args...)
15         debugLogf("%s "+s, args...)
16 }
17
18 func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
19         done := make(chan struct{}, 3)
20         queue := make(chan *event, h.QueueSize)
21         mtx := sync.Mutex{}
22         subscribed := make(map[string]bool)
23         go func() {
24                 buf := make([]byte, 2<<20)
25                 for {
26                         n, err := ws.Read(buf)
27                         h.debugLogf(ws, "received frame: %q", buf[:n])
28                         if err != nil || n == len(buf) {
29                                 break
30                         }
31                         msg := make(map[string]interface{})
32                         err = json.Unmarshal(buf[:n], &msg)
33                         if err != nil {
34                                 break
35                         }
36                         h.debugLogf(ws, "received message: %+v", msg)
37                         h.debugLogf(ws, "subscribing to *")
38                         subscribed["*"] = true
39                 }
40                 done <- struct{}{}
41         }()
42         go func(queue <-chan *event) {
43                 for e := range queue {
44                         detail := e.Detail(nil)
45                         if detail == nil {
46                                 continue
47                         }
48                         // FIXME: check permission
49                         buf, err := json.Marshal(map[string]interface{}{
50                                 "msgID":             e.Serial,
51                                 "id":                detail.ID,
52                                 "uuid":              detail.UUID,
53                                 "object_uuid":       detail.ObjectUUID,
54                                 "object_owner_uuid": detail.ObjectOwnerUUID,
55                                 "event_type":        detail.EventType,
56                         })
57                         if err != nil {
58                                 log.Printf("error encoding: ", err)
59                                 continue
60                         }
61                         _, err = ws.Write(append(buf, byte('\n')))
62                         if  err != nil {
63                                 h.debugLogf(ws, "handlerV0: write: %s", err)
64                                 break
65                         }
66                 }
67                 done <- struct{}{}
68         }(queue)
69         go func() {
70                 send := func(e *event) {
71                         if queue == nil {
72                                 return
73                         }
74                         select {
75                         case queue <- e:
76                         default:
77                                 close(queue)
78                                 queue = nil
79                                 done <- struct{}{}
80                         }
81                 }
82                 for e := range events {
83                         detail := e.Detail(nil)
84                         mtx.Lock()
85                         switch {
86                         case subscribed["*"]:
87                                 send(e)
88                         case detail == nil:
89                         case subscribed[detail.ObjectUUID]:
90                                 send(e)
91                         case subscribed[detail.ObjectOwnerUUID]:
92                                 send(e)
93                         }
94                         mtx.Unlock()
95                 }
96                 done <- struct{}{}
97         }()
98         <-done
99 }