8460: Send events.
[arvados.git] / services / ws / handler_v0.go
1 package main
2
3 import (
4         "encoding/json"
5         "log"
6         "sync"
7
8         "golang.org/x/net/websocket"
9 )
10
// handlerV0 holds the per-handler settings used when serving websocket
// connections through its Handle method.
type handlerV0 struct {
	// QueueSize is the capacity of the buffered channel of outgoing
	// events that Handle allocates for each connection.
	QueueSize int
}
14
15 func (h *handlerV0) debugLogf(ws *websocket.Conn, s string, args ...interface{}) {
16         args = append([]interface{}{ws.Request().RemoteAddr}, args...)
17         debugLogf("%s "+s, args...)
18 }
19
20 func (h *handlerV0) Handle(ws *websocket.Conn, events <-chan *event) {
21         done := make(chan struct{}, 3)
22         queue := make(chan *event, h.QueueSize)
23         mtx := sync.Mutex{}
24         subscribed := make(map[string]bool)
25         go func() {
26                 buf := make([]byte, 2<<20)
27                 for {
28                         n, err := ws.Read(buf)
29                         h.debugLogf(ws, "received frame: %q", buf[:n])
30                         if err != nil || n == len(buf) {
31                                 break
32                         }
33                         msg := make(map[string]interface{})
34                         err = json.Unmarshal(buf[:n], &msg)
35                         if err != nil {
36                                 break
37                         }
38                         h.debugLogf(ws, "received message: %+v", msg)
39                         h.debugLogf(ws, "subscribing to *")
40                         subscribed["*"] = true
41                 }
42                 done <- struct{}{}
43         }()
44         go func(queue <-chan *event) {
45                 for e := range queue {
46                         detail := e.Detail(nil)
47                         if detail == nil {
48                                 continue
49                         }
50                         // FIXME: check permission
51                         buf, err := json.Marshal(map[string]interface{}{
52                                 "msgID":             e.Serial,
53                                 "id":                detail.ID,
54                                 "uuid":              detail.UUID,
55                                 "object_uuid":       detail.ObjectUUID,
56                                 "object_owner_uuid": detail.ObjectOwnerUUID,
57                                 "event_type":        detail.EventType,
58                         })
59                         if err != nil {
60                                 log.Printf("error encoding: ", err)
61                                 continue
62                         }
63                         _, err = ws.Write(append(buf, byte('\n')))
64                         if  err != nil {
65                                 h.debugLogf(ws, "handlerV0: write: %s", err)
66                                 break
67                         }
68                 }
69                 done <- struct{}{}
70         }(queue)
71         go func() {
72                 send := func(e *event) {
73                         if queue == nil {
74                                 return
75                         }
76                         select {
77                         case queue <- e:
78                         default:
79                                 close(queue)
80                                 queue = nil
81                                 done <- struct{}{}
82                         }
83                 }
84                 for e := range events {
85                         detail := e.Detail(nil)
86                         mtx.Lock()
87                         switch {
88                         case subscribed["*"]:
89                                 send(e)
90                         case detail == nil:
91                         case subscribed[detail.ObjectUUID]:
92                                 send(e)
93                         case subscribed[detail.ObjectOwnerUUID]:
94                                 send(e)
95                         }
96                         mtx.Unlock()
97                 }
98                 done <- struct{}{}
99         }()
100         <-done
101 }