closes #4921
[arvados.git] / services / ws / session_v0_test.go
1 package main
2
3 import (
4         "bytes"
5         "encoding/json"
6         "fmt"
7         "io"
8         "net/url"
9         "os"
10         "time"
11
12         "git.curoverse.com/arvados.git/sdk/go/arvados"
13         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
14         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
15         "golang.org/x/net/websocket"
16         check "gopkg.in/check.v1"
17 )
18
19 func init() {
20         if os.Getenv("ARVADOS_DEBUG") != "" {
21                 ctxlog.SetLevel("debug")
22         }
23 }
24
25 var _ = check.Suite(&v0Suite{})
26
27 type v0Suite struct {
28         token    string
29         toDelete []string
30 }
31
32 func (s *v0Suite) SetUpTest(c *check.C) {
33         s.token = arvadostest.ActiveToken
34 }
35
36 func (s *v0Suite) TearDownSuite(c *check.C) {
37         ac := arvados.NewClientFromEnv()
38         ac.AuthToken = arvadostest.AdminToken
39         for _, path := range s.toDelete {
40                 err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
41                 if err != nil {
42                         panic(err)
43                 }
44         }
45 }
46
47 func (s *v0Suite) TestFilters(c *check.C) {
48         srv, conn, r, w := s.testClient()
49         defer srv.Close()
50         defer conn.Close()
51
52         c.Check(w.Encode(map[string]interface{}{
53                 "method":  "subscribe",
54                 "filters": [][]interface{}{{"event_type", "in", []string{"update"}}},
55         }), check.IsNil)
56         s.expectStatus(c, r, 200)
57
58         go s.emitEvents(nil)
59         lg := s.expectLog(c, r)
60         c.Check(lg.EventType, check.Equals, "update")
61 }
62
63 func (s *v0Suite) TestLastLogID(c *check.C) {
64         var lastID uint64
65         c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
66
67         srv, conn, r, w := s.testClient()
68         defer srv.Close()
69         defer conn.Close()
70
71         uuidChan := make(chan string, 2)
72         s.emitEvents(uuidChan)
73
74         c.Check(w.Encode(map[string]interface{}{
75                 "method":      "subscribe",
76                 "last_log_id": lastID,
77         }), check.IsNil)
78         s.expectStatus(c, r, 200)
79
80         go func() {
81                 s.emitEvents(uuidChan)
82                 close(uuidChan)
83         }()
84
85         done := make(chan bool)
86         go func() {
87                 for uuid := range uuidChan {
88                         for _, etype := range []string{"create", "blip", "update"} {
89                                 lg := s.expectLog(c, r)
90                                 c.Check(lg.ObjectUUID, check.Equals, uuid)
91                                 c.Check(lg.EventType, check.Equals, etype)
92                         }
93                 }
94                 close(done)
95         }()
96
97         select {
98         case <-time.After(10 * time.Second):
99                 c.Fatal("timeout")
100         case <-done:
101         }
102 }
103
104 func (s *v0Suite) TestPermission(c *check.C) {
105         srv, conn, r, w := s.testClient()
106         defer srv.Close()
107         defer conn.Close()
108
109         c.Check(w.Encode(map[string]interface{}{
110                 "method": "subscribe",
111         }), check.IsNil)
112         s.expectStatus(c, r, 200)
113
114         uuidChan := make(chan string, 1)
115         go func() {
116                 s.token = arvadostest.AdminToken
117                 s.emitEvents(nil)
118                 s.token = arvadostest.ActiveToken
119                 s.emitEvents(uuidChan)
120         }()
121
122         lg := s.expectLog(c, r)
123         c.Check(lg.ObjectUUID, check.Equals, <-uuidChan)
124 }
125
126 func (s *v0Suite) TestSendBadJSON(c *check.C) {
127         srv, conn, r, w := s.testClient()
128         defer srv.Close()
129         defer conn.Close()
130
131         c.Check(w.Encode(map[string]interface{}{
132                 "method": "subscribe",
133         }), check.IsNil)
134         s.expectStatus(c, r, 200)
135
136         _, err := fmt.Fprint(conn, "^]beep\n")
137         c.Check(err, check.IsNil)
138         s.expectStatus(c, r, 400)
139
140         c.Check(w.Encode(map[string]interface{}{
141                 "method": "subscribe",
142         }), check.IsNil)
143         s.expectStatus(c, r, 200)
144 }
145
146 func (s *v0Suite) TestSubscribe(c *check.C) {
147         srv, conn, r, w := s.testClient()
148         defer srv.Close()
149         defer conn.Close()
150
151         s.emitEvents(nil)
152
153         err := w.Encode(map[string]interface{}{"21": 12})
154         c.Check(err, check.IsNil)
155         s.expectStatus(c, r, 400)
156
157         err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
158         c.Check(err, check.IsNil)
159         s.expectStatus(c, r, 200)
160
161         uuidChan := make(chan string, 1)
162         go s.emitEvents(uuidChan)
163         uuid := <-uuidChan
164
165         for _, etype := range []string{"create", "blip", "update"} {
166                 lg := s.expectLog(c, r)
167                 c.Check(lg.ObjectUUID, check.Equals, uuid)
168                 c.Check(lg.EventType, check.Equals, etype)
169         }
170 }
171
172 // Generate some events by creating and updating a workflow object,
173 // and creating a custom log entry (event_type="blip") about the newly
174 // created workflow. If uuidChan is not nil, send the new workflow
175 // UUID to uuidChan as soon as it's known.
176 func (s *v0Suite) emitEvents(uuidChan chan<- string) {
177         ac := arvados.NewClientFromEnv()
178         ac.AuthToken = s.token
179         wf := &arvados.Workflow{
180                 Name: "ws_test",
181         }
182         err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", wf), map[string]interface{}{"ensure_unique_name": true})
183         if err != nil {
184                 panic(err)
185         }
186         if uuidChan != nil {
187                 uuidChan <- wf.UUID
188         }
189         lg := &arvados.Log{}
190         err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", &arvados.Log{
191                 ObjectUUID: wf.UUID,
192                 EventType:  "blip",
193                 Properties: map[string]interface{}{
194                         "beep": "boop",
195                 },
196         }), nil)
197         if err != nil {
198                 panic(err)
199         }
200         err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", wf), nil)
201         if err != nil {
202                 panic(err)
203         }
204         s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID, "arvados/v1/logs/"+lg.UUID)
205 }
206
207 func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
208         j, err := json.Marshal(ob)
209         if err != nil {
210                 panic(err)
211         }
212         v := url.Values{}
213         v[rscName] = []string{string(j)}
214         return bytes.NewBufferString(v.Encode())
215 }
216
217 func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
218         msg := map[string]interface{}{}
219         c.Check(r.Decode(&msg), check.IsNil)
220         c.Check(int(msg["status"].(float64)), check.Equals, status)
221 }
222
223 func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
224         lg := &arvados.Log{}
225         c.Check(r.Decode(lg), check.IsNil)
226         return lg
227 }
228
229 func (s *v0Suite) testClient() (*server, *websocket.Conn, *json.Decoder, *json.Encoder) {
230         srv := newTestServer()
231         conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
232         if err != nil {
233                 panic(err)
234         }
235         w := json.NewEncoder(conn)
236         r := json.NewDecoder(conn)
237         return srv, conn, r, w
238 }