1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.curoverse.com/arvados.git/sdk/go/arvados"
18 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
19 "git.curoverse.com/arvados.git/sdk/go/ctxlog"
20 "golang.org/x/net/websocket"
21 check "gopkg.in/check.v1"
25 if os.Getenv("ARVADOS_DEBUG") != "" {
26 ctxlog.SetLevel("debug")
// Register v0Suite with the gocheck framework so its fixture and Test*
// methods are picked up by the test runner.
30 var _ = check.Suite(&v0Suite{})
33 serverSuite serverSuite
// SetUpTest prepares a running server for a single test: it runs the
// serverSuite field's own SetUpTest, starts srv.Run in a goroutine and
// waits on srv.WaitReady. It then makes the active user's token the
// default for clients and API requests, and records the current highest
// log ID so that expectLog skips log entries that predate the test.
40 func (s *v0Suite) SetUpTest(c *check.C) {
41 s.serverSuite.SetUpTest(c)
// Run is started in the background; WaitReady is called before any test
// code touches the server.
42 go s.serverSuite.srv.Run()
43 s.serverSuite.srv.WaitReady()
45 s.token = arvadostest.ActiveToken
// Logs with ID <= ignoreLogID are discarded by expectLog.
46 s.ignoreLogID = s.lastLogID(c)
// TearDownTest closes the server that SetUpTest started for this test.
// NOTE(review): any other per-test cleanup is not visible in this
// excerpt.
49 func (s *v0Suite) TearDownTest(c *check.C) {
51 s.serverSuite.srv.Close()
// TearDownSuite removes the API objects recorded in s.toDelete once the
// whole suite has finished.
54 func (s *v0Suite) TearDownSuite(c *check.C) {
55 s.deleteTestObjects(c)
// deleteTestObjects issues a DELETE request for every API path recorded
// in s.toDelete, using an API client built from the environment and
// authenticated with the admin token.
// NOTE(review): what is done with err, and whether s.toDelete is reset
// afterwards, is not visible in this excerpt.
58 func (s *v0Suite) deleteTestObjects(c *check.C) {
59 ac := arvados.NewClientFromEnv()
// Use the admin token regardless of which token created the objects.
60 ac.AuthToken = arvadostest.AdminToken
61 for _, path := range s.toDelete {
// The response body is not decoded (nil destination).
62 err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
// TestFilters subscribes with a filter that admits only logs whose
// event_type is "update", checks that the server acknowledges the
// subscription with status 200, and checks that the next log delivered
// has event type "update".
// NOTE(review): the step that generates events is not visible in this
// excerpt; presumably emitEvents runs between subscribing and reading.
70 func (s *v0Suite) TestFilters(c *check.C) {
71 conn, r, w := s.testClient()
74 c.Check(w.Encode(map[string]interface{}{
75 "method": "subscribe",
// Filter in [attribute, operator, operand] form: event_type in ["update"].
76 "filters": [][]interface{}{{"event_type", "in", []string{"update"}}},
78 s.expectStatus(c, r, 200)
81 lg := s.expectLog(c, r)
82 c.Check(lg.EventType, check.Equals, "update")
// TestLastLogID checks that a client subscribing with "last_log_id"
// receives events logged after that ID but before it connected, and
// then continues to receive new events.
//
// Sequence: note the current highest log ID; emit an "early" batch of
// events and wait until another subscriber has seen it; connect a second
// client that subscribes with last_log_id set to the noted ID; expect
// that client to receive the early batch, followed by a batch emitted
// after it subscribed.
85 func (s *v0Suite) TestLastLogID(c *check.C) {
86 lastID := s.lastLogID(c)
// checkLogs reads from r until it has seen, in order, a "create", a
// "blip" and an "update" log for the object with the given uuid. Logs
// about other objects are skipped.
88 checkLogs := func(r *json.Decoder, uuid string) {
89 for _, etype := range []string{"create", "blip", "update"} {
90 lg := s.expectLog(c, r)
91 for lg.ObjectUUID != uuid {
92 lg = s.expectLog(c, r)
94 c.Check(lg.EventType, check.Equals, etype)
98 // Connecting connEarly (before sending the early events) lets
99 // us confirm all of the "early" events have already passed
100 // through the server.
101 connEarly, rEarly, wEarly := s.testClient()
102 defer connEarly.Close()
103 c.Check(wEarly.Encode(map[string]interface{}{
104 "method": "subscribe",
106 s.expectStatus(c, rEarly, 200)
108 // Send the early events.
// emitEvents is called synchronously; the channel has capacity 1 so the
// UUID send does not block.
109 uuidChan := make(chan string, 1)
110 s.emitEvents(uuidChan)
111 uuidEarly := <-uuidChan
113 // Wait for the early events to pass through.
114 checkLogs(rEarly, uuidEarly)
116 // Connect the client that wants to get old events via
// the "last_log_id" field of its subscribe request.
118 conn, r, w := s.testClient()
121 c.Check(w.Encode(map[string]interface{}{
122 "method": "subscribe",
123 "last_log_id": lastID,
125 s.expectStatus(c, r, 200)
// First the early batch (logged before this client connected), then a
// fresh batch emitted now.
127 checkLogs(r, uuidEarly)
128 s.emitEvents(uuidChan)
129 checkLogs(r, <-uuidChan)
// TestPermission checks that a subscriber is not sent events about
// another user's object. A client connects with the current s.token
// (the active user's token, as set in SetUpTest) and subscribes. Events
// are then emitted as admin and as the active user, and the client must
// reach a log for the active user's object without ever seeing a log
// for the admin's object.
132 func (s *v0Suite) TestPermission(c *check.C) {
133 conn, r, w := s.testClient()
136 c.Check(w.Encode(map[string]interface{}{
137 "method": "subscribe",
139 s.expectStatus(c, r, 200)
// Capacity 2: one UUID per emitEvents call below, so neither send blocks.
141 uuidChan := make(chan string, 2)
// Emit as admin first, then restore the active user's token and emit
// again. The calls are synchronous, so UUIDs are queued in that order.
143 s.token = arvadostest.AdminToken
144 s.emitEvents(uuidChan)
145 s.token = arvadostest.ActiveToken
146 s.emitEvents(uuidChan)
// First UUID queued is the admin-created object, second is the active
// user's.
149 wrongUUID := <-uuidChan
150 rightUUID := <-uuidChan
151 lg := s.expectLog(c, r)
// Read until a log for the active user's object arrives; every log seen
// before that must not refer to the admin's object.
152 for lg.ObjectUUID != rightUUID {
153 c.Check(lg.ObjectUUID, check.Not(check.Equals), wrongUUID)
154 lg = s.expectLog(c, r)
158 // Two users create private objects; admin deletes both objects; each
159 // user receives a "delete" event for their own object (not for the
160 // other user's object).
161 func (s *v0Suite) TestEventTypeDelete(c *check.C) {
// One entry per user; the remaining fields (uuid, conn, r, w) are filled
// in by the loop below.
162 clients := []struct {
168 }{{token: arvadostest.ActiveToken}, {token: arvadostest.SpectatorToken}}
169 for i := range clients {
170 uuidChan := make(chan string, 1)
// emitEvents and testClient both read s.token, so switch it to this
// user's token before creating the object and opening the connection.
171 s.token = clients[i].token
172 s.emitEvents(uuidChan)
173 clients[i].uuid = <-uuidChan
174 clients[i].conn, clients[i].r, clients[i].w = s.testClient()
176 c.Check(clients[i].w.Encode(map[string]interface{}{
177 "method": "subscribe",
179 s.expectStatus(c, clients[i].r, 200)
// Skip every log written so far, so the first log expectLog returns for
// each client comes from the deletions below.
182 s.ignoreLogID = s.lastLogID(c)
// deleteTestObjects authenticates with the admin token.
183 s.deleteTestObjects(c)
185 for _, client := range clients {
186 lg := s.expectLog(c, client.r)
187 c.Check(lg.ObjectUUID, check.Equals, client.uuid)
188 c.Check(lg.EventType, check.Equals, "delete")
192 // Trashing/deleting a collection produces an "update" event with
193 // properties["new_attributes"]["is_trashed"] == true.
194 func (s *v0Suite) TestTrashedCollection(c *check.C) {
195 ac := arvados.NewClientFromEnv()
196 ac.AuthToken = s.token
// Create an empty collection to delete later; the response is decoded
// back into coll, which is where coll.UUID comes from.
198 coll := &arvados.Collection{ManifestText: ""}
199 err := ac.RequestAndDecode(coll, "POST", "arvados/v1/collections", s.jsonBody("collection", coll), map[string]interface{}{"ensure_unique_name": true})
200 c.Assert(err, check.IsNil)
// Ignore logs written up to this point, including any produced by the
// create request above.
201 s.ignoreLogID = s.lastLogID(c)
203 conn, r, w := s.testClient()
206 c.Check(w.Encode(map[string]interface{}{
207 "method": "subscribe",
209 s.expectStatus(c, r, 200)
211 err = ac.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
212 c.Assert(err, check.IsNil)
// The DELETE is expected to surface as an "update" log, not a "delete".
214 lg := s.expectLog(c, r)
215 c.Check(lg.ObjectUUID, check.Equals, coll.UUID)
216 c.Check(lg.EventType, check.Equals, "update")
// Unchecked type assertions: these panic if old_attributes or
// new_attributes is missing or is not a JSON object.
217 c.Check(lg.Properties["old_attributes"].(map[string]interface{})["is_trashed"], check.Equals, false)
218 c.Check(lg.Properties["new_attributes"].(map[string]interface{})["is_trashed"], check.Equals, true)
// TestSendBadJSON checks that a malformed message is answered with
// status 400 and that the same connection still accepts a valid
// subscribe request afterwards.
221 func (s *v0Suite) TestSendBadJSON(c *check.C) {
222 conn, r, w := s.testClient()
225 c.Check(w.Encode(map[string]interface{}{
226 "method": "subscribe",
228 s.expectStatus(c, r, 200)
// Write raw non-JSON text directly to the connection, bypassing the
// JSON encoder.
230 _, err := fmt.Fprint(conn, "^]beep\n")
231 c.Check(err, check.IsNil)
232 s.expectStatus(c, r, 400)
// A well-formed request on the same connection must still succeed.
234 c.Check(w.Encode(map[string]interface{}{
235 "method": "subscribe",
237 s.expectStatus(c, r, 200)
// TestSubscribe exercises the basic subscribe flow: a message with no
// "method" key is answered with status 400, a subscribe request with an
// empty filter list is answered with 200, and the subscriber then
// receives "create", "blip" and "update" logs, in that order, for a
// newly created object.
240 func (s *v0Suite) TestSubscribe(c *check.C) {
241 conn, r, w := s.testClient()
// Valid JSON, but not a recognizable request.
246 err := w.Encode(map[string]interface{}{"21": 12})
247 c.Check(err, check.IsNil)
248 s.expectStatus(c, r, 400)
250 err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
251 c.Check(err, check.IsNil)
252 s.expectStatus(c, r, 200)
// Events are emitted in a separate goroutine while this goroutine reads
// the resulting logs.
254 uuidChan := make(chan string, 1)
255 go s.emitEvents(uuidChan)
// NOTE(review): uuid is declared on a line not visible in this excerpt,
// presumably received from uuidChan.
258 for _, etype := range []string{"create", "blip", "update"} {
259 lg := s.expectLog(c, r)
// Skip logs that refer to other objects.
260 for lg.ObjectUUID != uuid {
261 lg = s.expectLog(c, r)
263 c.Check(lg.EventType, check.Equals, etype)
267 // Generate some events by creating and updating a workflow object,
268 // and creating a custom log entry (event_type="blip") about the newly
269 // created workflow. If uuidChan is not nil, send the new workflow
270 // UUID to uuidChan as soon as it's known.
271 func (s *v0Suite) emitEvents(uuidChan chan<- string) {
// Requests are made with whatever token s.token holds at call time, so
// callers choose the acting user by setting s.token first.
275 ac := arvados.NewClientFromEnv()
276 ac.AuthToken = s.token
277 wf := &arvados.Workflow{
// Create the workflow; the response is decoded back into wf.
280 err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", wf), map[string]interface{}{"ensure_unique_name": true})
// Create the custom log entry; the response is decoded into lg.
288 err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", &arvados.Log{
291 Properties: map[string]interface{}{
// Update the workflow.
298 err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", wf), nil)
// Remember both objects so deleteTestObjects can remove them later.
302 s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID, "arvados/v1/logs/"+lg.UUID)
// jsonBody builds a request body for the API client: ob is marshalled
// to JSON and stored as the single value of the key rscName, and the
// result of v.Encode() is returned as a reader.
// NOTE(review): v's declaration and the handling of err are not visible
// in this excerpt; v is presumably a url.Values, which would make the
// body form-encoded.
305 func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
306 j, err := json.Marshal(ob)
311 v[rscName] = []string{string(j)}
312 return bytes.NewBufferString(v.Encode())
// expectStatus reads the next JSON message from r and checks that its
// "status" field equals status.
315 func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
316 msg := map[string]interface{}{}
317 c.Check(r.Decode(&msg), check.IsNil)
// encoding/json decodes JSON numbers into float64 when the target is an
// interface{} value. The type assertion is unchecked, so a message with
// no numeric "status" field panics here instead of failing the check.
318 c.Check(int(msg["status"].(float64)), check.Equals, status)
// expectLog reads log messages from r until it finds one whose ID is
// greater than s.ignoreLogID, and returns it.
// NOTE(review): several lines are not visible in this excerpt. From the
// ok channel and the time.After case, the decode loop appears to run in
// a goroutine while the caller waits up to 10 seconds for it to finish;
// confirm against the full file.
321 func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
323 ok := make(chan struct{})
// Keep decoding while the log's ID is at or below ignoreLogID.
325 for lg.ID <= s.ignoreLogID {
326 c.Check(r.Decode(lg), check.IsNil)
// Timeout branch of the wait.
331 case <-time.After(10 * time.Second):
// testClient dials a websocket connection to the test server's
// /websocket endpoint, passing s.token as the api_token query parameter
// and the server's own http:// address as the origin. It sets up a JSON
// decoder and encoder over the connection for reading and writing
// messages.
// NOTE(review): the handling of err and the return statement are not
// visible in this excerpt.
338 func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder) {
339 srv := s.serverSuite.srv
// Arguments to Dial: URL, protocol (empty), origin.
340 conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
344 w := json.NewEncoder(conn)
345 r := json.NewDecoder(conn)
// lastLogID queries the test database for the largest id in the logs
// table. The test is aborted (c.Assert) if the query or scan fails.
// NOTE(review): lastID's declaration and the return statement are not
// visible in this excerpt. If the table were empty, MAX(id) would be
// NULL; whether the scan target tolerates that depends on lastID's type.
349 func (s *v0Suite) lastLogID(c *check.C) uint64 {
351 c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)