8460: Refactor session logic (subscription protocol) out of handler (queueing and...
[arvados.git] / services / ws / session_v0.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "log"
7         "sync"
8
9         "git.curoverse.com/arvados.git/sdk/go/arvados"
10 )
11
12 var (
13         errQueueFull   = errors.New("client queue full")
14         errFrameTooBig = errors.New("frame too big")
15 )
16
17 type sessionV0 struct {
18         ws          wsConn
19         proxyClient *proxyClient
20         subscribed  map[string]bool
21         mtx         sync.Mutex
22         setupOnce   sync.Once
23 }
24
25 func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
26         sess := &sessionV0{
27                 ws:          ws,
28                 proxyClient: NewProxyClient(ac),
29                 subscribed:  make(map[string]bool),
30         }
31
32         err := ws.Request().ParseForm()
33         if err != nil {
34                 log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
35                 return nil, err
36         }
37         token := ws.Request().Form.Get("api_token")
38         sess.proxyClient.SetToken(token)
39         sess.debugLogf("handlerV0: token = %+q", token)
40
41         return sess, nil
42 }
43
44 func (sess *sessionV0) debugLogf(s string, args ...interface{}) {
45         args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
46         debugLogf("%s "+s, args...)
47 }
48
49 func (sess *sessionV0) Receive(msg map[string]interface{}) {
50         sess.debugLogf("received message: %+v", msg)
51         sess.debugLogf("subscribing to *")
52         sess.subscribed["*"] = true
53 }
54
55 func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
56         detail := e.Detail()
57         if detail == nil {
58                 return nil, nil
59         }
60         ok, err := sess.proxyClient.CheckReadPermission(detail.UUID)
61         if err != nil || !ok {
62                 return nil, err
63         }
64
65         return json.Marshal(map[string]interface{}{
66                 "msgID":             e.Serial,
67                 "id":                detail.ID,
68                 "uuid":              detail.UUID,
69                 "object_uuid":       detail.ObjectUUID,
70                 "object_owner_uuid": detail.ObjectOwnerUUID,
71                 "event_type":        detail.EventType,
72         })
73 }
74
75 func (sess *sessionV0) Filter(e *event) bool {
76         detail := e.Detail()
77         sess.mtx.Lock()
78         defer sess.mtx.Unlock()
79         switch {
80         case sess.subscribed["*"]:
81                 return true
82         case detail == nil:
83                 return false
84         case sess.subscribed[detail.ObjectUUID]:
85                 return true
86         case sess.subscribed[detail.ObjectOwnerUUID]:
87                 return true
88         default:
89                 return false
90         }
91 }