Merge branch '11917-dont-clear-cache'
[arvados.git] / services / ws / session_v0_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "io"
12         "net/url"
13         "os"
14         "time"
15
16         "git.curoverse.com/arvados.git/sdk/go/arvados"
17         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
18         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
19         "golang.org/x/net/websocket"
20         check "gopkg.in/check.v1"
21 )
22
23 func init() {
24         if os.Getenv("ARVADOS_DEBUG") != "" {
25                 ctxlog.SetLevel("debug")
26         }
27 }
28
29 var _ = check.Suite(&v0Suite{})
30
31 type v0Suite struct {
32         serverSuite serverSuite
33         token       string
34         toDelete    []string
35 }
36
37 func (s *v0Suite) SetUpTest(c *check.C) {
38         s.serverSuite.SetUpTest(c)
39         s.token = arvadostest.ActiveToken
40 }
41
42 func (s *v0Suite) TearDownSuite(c *check.C) {
43         ac := arvados.NewClientFromEnv()
44         ac.AuthToken = arvadostest.AdminToken
45         for _, path := range s.toDelete {
46                 err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
47                 if err != nil {
48                         panic(err)
49                 }
50         }
51 }
52
53 func (s *v0Suite) TestFilters(c *check.C) {
54         srv, conn, r, w := s.testClient()
55         defer srv.Close()
56         defer conn.Close()
57
58         c.Check(w.Encode(map[string]interface{}{
59                 "method":  "subscribe",
60                 "filters": [][]interface{}{{"event_type", "in", []string{"update"}}},
61         }), check.IsNil)
62         s.expectStatus(c, r, 200)
63
64         go s.emitEvents(nil)
65         lg := s.expectLog(c, r)
66         c.Check(lg.EventType, check.Equals, "update")
67 }
68
69 func (s *v0Suite) TestLastLogID(c *check.C) {
70         var lastID uint64
71         c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
72
73         srv, conn, r, w := s.testClient()
74         defer srv.Close()
75         defer conn.Close()
76
77         uuidChan := make(chan string, 2)
78         s.emitEvents(uuidChan)
79
80         c.Check(w.Encode(map[string]interface{}{
81                 "method":      "subscribe",
82                 "last_log_id": lastID,
83         }), check.IsNil)
84         s.expectStatus(c, r, 200)
85
86         avoidRace := make(chan struct{}, cap(uuidChan))
87         go func() {
88                 // When last_log_id is given, although v0session sends
89                 // old events in order, and sends new events in order,
90                 // it doesn't necessarily finish sending all old
91                 // events before sending any new events. To avoid
92                 // hitting this bug in the test, we wait for the old
93                 // events to arrive before emitting any new events.
94                 <-avoidRace
95                 s.emitEvents(uuidChan)
96                 close(uuidChan)
97         }()
98
99         go func() {
100                 for uuid := range uuidChan {
101                         for _, etype := range []string{"create", "blip", "update"} {
102                                 lg := s.expectLog(c, r)
103                                 for lg.ObjectUUID != uuid {
104                                         lg = s.expectLog(c, r)
105                                 }
106                                 c.Check(lg.EventType, check.Equals, etype)
107                         }
108                         avoidRace <- struct{}{}
109                 }
110         }()
111 }
112
113 func (s *v0Suite) TestPermission(c *check.C) {
114         srv, conn, r, w := s.testClient()
115         defer srv.Close()
116         defer conn.Close()
117
118         c.Check(w.Encode(map[string]interface{}{
119                 "method": "subscribe",
120         }), check.IsNil)
121         s.expectStatus(c, r, 200)
122
123         uuidChan := make(chan string, 2)
124         go func() {
125                 s.token = arvadostest.AdminToken
126                 s.emitEvents(uuidChan)
127                 s.token = arvadostest.ActiveToken
128                 s.emitEvents(uuidChan)
129         }()
130
131         wrongUUID := <-uuidChan
132         rightUUID := <-uuidChan
133         lg := s.expectLog(c, r)
134         for lg.ObjectUUID != rightUUID {
135                 c.Check(lg.ObjectUUID, check.Not(check.Equals), wrongUUID)
136                 lg = s.expectLog(c, r)
137         }
138 }
139
140 func (s *v0Suite) TestSendBadJSON(c *check.C) {
141         srv, conn, r, w := s.testClient()
142         defer srv.Close()
143         defer conn.Close()
144
145         c.Check(w.Encode(map[string]interface{}{
146                 "method": "subscribe",
147         }), check.IsNil)
148         s.expectStatus(c, r, 200)
149
150         _, err := fmt.Fprint(conn, "^]beep\n")
151         c.Check(err, check.IsNil)
152         s.expectStatus(c, r, 400)
153
154         c.Check(w.Encode(map[string]interface{}{
155                 "method": "subscribe",
156         }), check.IsNil)
157         s.expectStatus(c, r, 200)
158 }
159
160 func (s *v0Suite) TestSubscribe(c *check.C) {
161         srv, conn, r, w := s.testClient()
162         defer srv.Close()
163         defer conn.Close()
164
165         s.emitEvents(nil)
166
167         err := w.Encode(map[string]interface{}{"21": 12})
168         c.Check(err, check.IsNil)
169         s.expectStatus(c, r, 400)
170
171         err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
172         c.Check(err, check.IsNil)
173         s.expectStatus(c, r, 200)
174
175         uuidChan := make(chan string, 1)
176         go s.emitEvents(uuidChan)
177         uuid := <-uuidChan
178
179         for _, etype := range []string{"create", "blip", "update"} {
180                 lg := s.expectLog(c, r)
181                 for lg.ObjectUUID != uuid {
182                         lg = s.expectLog(c, r)
183                 }
184                 c.Check(lg.EventType, check.Equals, etype)
185         }
186 }
187
188 // Generate some events by creating and updating a workflow object,
189 // and creating a custom log entry (event_type="blip") about the newly
190 // created workflow. If uuidChan is not nil, send the new workflow
191 // UUID to uuidChan as soon as it's known.
192 func (s *v0Suite) emitEvents(uuidChan chan<- string) {
193         ac := arvados.NewClientFromEnv()
194         ac.AuthToken = s.token
195         wf := &arvados.Workflow{
196                 Name: "ws_test",
197         }
198         err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", wf), map[string]interface{}{"ensure_unique_name": true})
199         if err != nil {
200                 panic(err)
201         }
202         if uuidChan != nil {
203                 uuidChan <- wf.UUID
204         }
205         lg := &arvados.Log{}
206         err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", &arvados.Log{
207                 ObjectUUID: wf.UUID,
208                 EventType:  "blip",
209                 Properties: map[string]interface{}{
210                         "beep": "boop",
211                 },
212         }), nil)
213         if err != nil {
214                 panic(err)
215         }
216         err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", wf), nil)
217         if err != nil {
218                 panic(err)
219         }
220         s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID, "arvados/v1/logs/"+lg.UUID)
221 }
222
223 func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
224         j, err := json.Marshal(ob)
225         if err != nil {
226                 panic(err)
227         }
228         v := url.Values{}
229         v[rscName] = []string{string(j)}
230         return bytes.NewBufferString(v.Encode())
231 }
232
233 func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
234         msg := map[string]interface{}{}
235         c.Check(r.Decode(&msg), check.IsNil)
236         c.Check(int(msg["status"].(float64)), check.Equals, status)
237 }
238
239 func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
240         lg := &arvados.Log{}
241         ok := make(chan struct{})
242         go func() {
243                 c.Check(r.Decode(lg), check.IsNil)
244                 close(ok)
245         }()
246         select {
247         case <-time.After(10 * time.Second):
248                 panic("timed out")
249         case <-ok:
250                 return lg
251         }
252 }
253
254 func (s *v0Suite) testClient() (*server, *websocket.Conn, *json.Decoder, *json.Encoder) {
255         go s.serverSuite.srv.Run()
256         s.serverSuite.srv.WaitReady()
257         srv := s.serverSuite.srv
258         conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
259         if err != nil {
260                 panic(err)
261         }
262         w := json.NewEncoder(conn)
263         r := json.NewDecoder(conn)
264         return srv, conn, r, w
265 }