1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.curoverse.com/arvados.git/sdk/go/arvados"
18 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
19 "git.curoverse.com/arvados.git/sdk/go/ctxlog"
20 "golang.org/x/net/websocket"
21 check "gopkg.in/check.v1"
25 if os.Getenv("ARVADOS_DEBUG") != "" {
26 ctxlog.SetLevel("debug")
30 var _ = check.Suite(&v0Suite{})
33 serverSuite serverSuite
40 func (s *v0Suite) SetUpTest(c *check.C) {
41 s.serverSuite.SetUpTest(c)
42 go s.serverSuite.srv.Run()
43 s.serverSuite.srv.WaitReady()
45 s.token = arvadostest.ActiveToken
46 s.ignoreLogID = s.lastLogID(c)
49 func (s *v0Suite) TearDownTest(c *check.C) {
51 s.serverSuite.srv.Close()
54 func (s *v0Suite) TearDownSuite(c *check.C) {
55 s.deleteTestObjects(c)
58 func (s *v0Suite) deleteTestObjects(c *check.C) {
59 ac := arvados.NewClientFromEnv()
60 ac.AuthToken = arvadostest.AdminToken
61 for _, path := range s.toDelete {
62 err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
70 func (s *v0Suite) TestFilters(c *check.C) {
71 conn, r, w := s.testClient()
74 cmd := func(method, eventType string, status int) {
75 c.Check(w.Encode(map[string]interface{}{
77 "filters": [][]interface{}{{"event_type", "in", []string{eventType}}},
79 s.expectStatus(c, r, status)
81 cmd("subscribe", "update", 200)
82 cmd("subscribe", "update", 200)
83 cmd("subscribe", "create", 200)
84 cmd("subscribe", "update", 200)
85 cmd("unsubscribe", "blip", 400)
86 cmd("unsubscribe", "create", 200)
87 cmd("unsubscribe", "update", 200)
90 lg := s.expectLog(c, r)
91 c.Check(lg.EventType, check.Equals, "update")
93 cmd("unsubscribe", "update", 200)
94 cmd("unsubscribe", "update", 200)
95 cmd("unsubscribe", "update", 400)
98 func (s *v0Suite) TestLastLogID(c *check.C) {
99 lastID := s.lastLogID(c)
101 checkLogs := func(r *json.Decoder, uuid string) {
102 for _, etype := range []string{"create", "blip", "update"} {
103 lg := s.expectLog(c, r)
104 for lg.ObjectUUID != uuid {
105 lg = s.expectLog(c, r)
107 c.Check(lg.EventType, check.Equals, etype)
111 // Connecting connEarly (before sending the early events) lets
112 // us confirm all of the "early" events have already passed
113 // through the server.
114 connEarly, rEarly, wEarly := s.testClient()
115 defer connEarly.Close()
116 c.Check(wEarly.Encode(map[string]interface{}{
117 "method": "subscribe",
119 s.expectStatus(c, rEarly, 200)
121 // Send the early events.
122 uuidChan := make(chan string, 1)
123 s.emitEvents(uuidChan)
124 uuidEarly := <-uuidChan
126 // Wait for the early events to pass through.
127 checkLogs(rEarly, uuidEarly)
129 // Connect the client that wants to get old events via
131 conn, r, w := s.testClient()
134 c.Check(w.Encode(map[string]interface{}{
135 "method": "subscribe",
136 "last_log_id": lastID,
138 s.expectStatus(c, r, 200)
140 checkLogs(r, uuidEarly)
141 s.emitEvents(uuidChan)
142 checkLogs(r, <-uuidChan)
145 func (s *v0Suite) TestPermission(c *check.C) {
146 conn, r, w := s.testClient()
149 c.Check(w.Encode(map[string]interface{}{
150 "method": "subscribe",
152 s.expectStatus(c, r, 200)
154 uuidChan := make(chan string, 2)
156 s.token = arvadostest.AdminToken
157 s.emitEvents(uuidChan)
158 s.token = arvadostest.ActiveToken
159 s.emitEvents(uuidChan)
162 wrongUUID := <-uuidChan
163 rightUUID := <-uuidChan
164 lg := s.expectLog(c, r)
165 for lg.ObjectUUID != rightUUID {
166 c.Check(lg.ObjectUUID, check.Not(check.Equals), wrongUUID)
167 lg = s.expectLog(c, r)
171 // Two users create private objects; admin deletes both objects; each
172 // user receives a "delete" event for their own object (not for the
173 // other user's object).
174 func (s *v0Suite) TestEventTypeDelete(c *check.C) {
175 clients := []struct {
181 }{{token: arvadostest.ActiveToken}, {token: arvadostest.SpectatorToken}}
182 for i := range clients {
183 uuidChan := make(chan string, 1)
184 s.token = clients[i].token
185 s.emitEvents(uuidChan)
186 clients[i].uuid = <-uuidChan
187 clients[i].conn, clients[i].r, clients[i].w = s.testClient()
189 c.Check(clients[i].w.Encode(map[string]interface{}{
190 "method": "subscribe",
192 s.expectStatus(c, clients[i].r, 200)
195 s.ignoreLogID = s.lastLogID(c)
196 s.deleteTestObjects(c)
198 for _, client := range clients {
199 lg := s.expectLog(c, client.r)
200 c.Check(lg.ObjectUUID, check.Equals, client.uuid)
201 c.Check(lg.EventType, check.Equals, "delete")
205 // Trashing/deleting a collection produces an "update" event with
206 // properties["new_attributes"]["is_trashed"] == true.
207 func (s *v0Suite) TestTrashedCollection(c *check.C) {
208 ac := arvados.NewClientFromEnv()
209 ac.AuthToken = s.token
211 coll := &arvados.Collection{ManifestText: ""}
212 err := ac.RequestAndDecode(coll, "POST", "arvados/v1/collections", s.jsonBody("collection", coll), map[string]interface{}{"ensure_unique_name": true})
213 c.Assert(err, check.IsNil)
214 s.ignoreLogID = s.lastLogID(c)
216 conn, r, w := s.testClient()
219 c.Check(w.Encode(map[string]interface{}{
220 "method": "subscribe",
222 s.expectStatus(c, r, 200)
224 err = ac.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
225 c.Assert(err, check.IsNil)
227 lg := s.expectLog(c, r)
228 c.Check(lg.ObjectUUID, check.Equals, coll.UUID)
229 c.Check(lg.EventType, check.Equals, "update")
230 c.Check(lg.Properties["old_attributes"].(map[string]interface{})["is_trashed"], check.Equals, false)
231 c.Check(lg.Properties["new_attributes"].(map[string]interface{})["is_trashed"], check.Equals, true)
234 func (s *v0Suite) TestSendBadJSON(c *check.C) {
235 conn, r, w := s.testClient()
238 c.Check(w.Encode(map[string]interface{}{
239 "method": "subscribe",
241 s.expectStatus(c, r, 200)
243 _, err := fmt.Fprint(conn, "^]beep\n")
244 c.Check(err, check.IsNil)
245 s.expectStatus(c, r, 400)
247 c.Check(w.Encode(map[string]interface{}{
248 "method": "subscribe",
250 s.expectStatus(c, r, 200)
253 func (s *v0Suite) TestSubscribe(c *check.C) {
254 conn, r, w := s.testClient()
259 err := w.Encode(map[string]interface{}{"21": 12})
260 c.Check(err, check.IsNil)
261 s.expectStatus(c, r, 400)
263 err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
264 c.Check(err, check.IsNil)
265 s.expectStatus(c, r, 200)
267 uuidChan := make(chan string, 1)
268 go s.emitEvents(uuidChan)
271 for _, etype := range []string{"create", "blip", "update"} {
272 lg := s.expectLog(c, r)
273 for lg.ObjectUUID != uuid {
274 lg = s.expectLog(c, r)
276 c.Check(lg.EventType, check.Equals, etype)
280 // Generate some events by creating and updating a workflow object,
281 // and creating a custom log entry (event_type="blip") about the newly
282 // created workflow. If uuidChan is not nil, send the new workflow
283 // UUID to uuidChan as soon as it's known.
284 func (s *v0Suite) emitEvents(uuidChan chan<- string) {
288 ac := arvados.NewClientFromEnv()
289 ac.AuthToken = s.token
290 wf := &arvados.Workflow{
293 err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", wf), map[string]interface{}{"ensure_unique_name": true})
301 err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", &arvados.Log{
304 Properties: map[string]interface{}{
311 err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", wf), nil)
315 s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID, "arvados/v1/logs/"+lg.UUID)
318 func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
319 j, err := json.Marshal(ob)
324 v[rscName] = []string{string(j)}
325 return bytes.NewBufferString(v.Encode())
328 func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
329 msg := map[string]interface{}{}
330 c.Check(r.Decode(&msg), check.IsNil)
331 c.Check(int(msg["status"].(float64)), check.Equals, status)
334 func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
336 ok := make(chan struct{})
338 for lg.ID <= s.ignoreLogID {
339 c.Check(r.Decode(lg), check.IsNil)
344 case <-time.After(10 * time.Second):
351 func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder) {
352 srv := s.serverSuite.srv
353 conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
357 w := json.NewEncoder(conn)
358 r := json.NewDecoder(conn)
362 func (s *v0Suite) lastLogID(c *check.C) uint64 {
364 c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)