1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
22 "git.arvados.org/arvados.git/sdk/go/arvados"
23 "git.arvados.org/arvados.git/sdk/go/arvadostest"
24 "git.arvados.org/arvados.git/sdk/go/ctxlog"
25 "golang.org/x/net/websocket"
26 check "gopkg.in/check.v1"
30 if os.Getenv("ARVADOS_DEBUG") != "" {
31 ctxlog.SetLevel("debug")
35 var _ = check.Suite(&v0Suite{})
38 serviceSuite serviceSuite
45 func (s *v0Suite) SetUpTest(c *check.C) {
46 s.serviceSuite.SetUpTest(c)
47 s.serviceSuite.start(c)
49 s.token = arvadostest.ActiveToken
50 s.ignoreLogID = s.lastLogID(c)
53 func (s *v0Suite) TearDownTest(c *check.C) {
55 s.serviceSuite.TearDownTest(c)
58 func (s *v0Suite) TearDownSuite(c *check.C) {
59 s.deleteTestObjects(c)
62 func (s *v0Suite) deleteTestObjects(c *check.C) {
63 ac := arvados.NewClientFromEnv()
64 ac.AuthToken = arvadostest.AdminToken
65 for _, path := range s.toDelete {
66 err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
67 c.Check(err, check.IsNil)
72 func (s *v0Suite) TestFilters(c *check.C) {
73 conn, r, w, err := s.testClient()
74 c.Assert(err, check.IsNil)
77 cmd := func(method, eventType string, status int) {
78 c.Check(w.Encode(map[string]interface{}{
80 "filters": [][]interface{}{{"event_type", "in", []string{eventType}}},
82 s.expectStatus(c, r, status)
84 cmd("subscribe", "update", 200)
85 cmd("subscribe", "update", 200)
86 cmd("subscribe", "create", 200)
87 cmd("subscribe", "update", 200)
88 cmd("unsubscribe", "blip", 400)
89 cmd("unsubscribe", "create", 200)
90 cmd("unsubscribe", "update", 200)
92 go s.emitEvents(c, nil, nil)
93 lg := s.expectLog(c, r)
94 c.Check(lg.EventType, check.Equals, "update")
96 cmd("unsubscribe", "update", 200)
97 cmd("unsubscribe", "update", 200)
98 cmd("unsubscribe", "update", 400)
101 func (s *v0Suite) TestLastLogID(c *check.C) {
102 lastID := s.lastLogID(c)
104 checkLogs := func(r *json.Decoder, uuid string) {
105 for _, etype := range []string{"create", "blip", "update"} {
106 lg := s.expectLog(c, r)
107 for lg.ObjectUUID != uuid {
108 lg = s.expectLog(c, r)
110 c.Check(lg.EventType, check.Equals, etype)
114 // Connecting connEarly (before sending the early events) lets
115 // us confirm all of the "early" events have already passed
116 // through the server.
117 connEarly, rEarly, wEarly, err := s.testClient()
118 c.Assert(err, check.IsNil)
119 defer connEarly.Close()
120 c.Check(wEarly.Encode(map[string]interface{}{
121 "method": "subscribe",
123 s.expectStatus(c, rEarly, 200)
125 // Send the early events.
126 uuidChan := make(chan string, 1)
127 s.emitEvents(c, uuidChan, nil)
128 uuidEarly := <-uuidChan
130 // Wait for the early events to pass through.
131 checkLogs(rEarly, uuidEarly)
133 // Connect the client that wants to get old events via
135 conn, r, w, err := s.testClient()
136 c.Assert(err, check.IsNil)
139 c.Check(w.Encode(map[string]interface{}{
140 "method": "subscribe",
141 "last_log_id": lastID,
143 s.expectStatus(c, r, 200)
145 checkLogs(r, uuidEarly)
146 s.emitEvents(c, uuidChan, nil)
147 checkLogs(r, <-uuidChan)
150 func (s *v0Suite) TestPermission(c *check.C) {
151 conn, r, w, err := s.testClient()
152 c.Assert(err, check.IsNil)
155 c.Check(w.Encode(map[string]interface{}{
156 "method": "subscribe",
158 s.expectStatus(c, r, 200)
160 uuidChan := make(chan string, 2)
162 s.token = arvadostest.AdminToken
163 s.emitEvents(c, uuidChan, nil)
164 s.token = arvadostest.ActiveToken
165 s.emitEvents(c, uuidChan, nil)
168 wrongUUID := <-uuidChan
169 rightUUID := <-uuidChan
170 lg := s.expectLog(c, r)
171 for lg.ObjectUUID != rightUUID {
172 c.Check(lg.ObjectUUID, check.Not(check.Equals), wrongUUID)
173 lg = s.expectLog(c, r)
177 // Two users create private objects; admin deletes both objects; each
178 // user receives a "delete" event for their own object (not for the
179 // other user's object).
180 func (s *v0Suite) TestEventTypeDelete(c *check.C) {
181 clients := []struct {
187 }{{token: arvadostest.ActiveToken}, {token: arvadostest.SpectatorToken}}
188 for i := range clients {
189 uuidChan := make(chan string, 1)
190 s.token = clients[i].token
191 s.emitEvents(c, uuidChan, nil)
192 clients[i].uuid = <-uuidChan
195 clients[i].conn, clients[i].r, clients[i].w, err = s.testClient()
196 c.Assert(err, check.IsNil)
198 c.Check(clients[i].w.Encode(map[string]interface{}{
199 "method": "subscribe",
201 s.expectStatus(c, clients[i].r, 200)
204 s.ignoreLogID = s.lastLogID(c)
205 s.deleteTestObjects(c)
207 for _, client := range clients {
208 lg := s.expectLog(c, client.r)
209 c.Check(lg.ObjectUUID, check.Equals, client.uuid)
210 c.Check(lg.EventType, check.Equals, "delete")
214 func (s *v0Suite) TestEventPropertiesFields(c *check.C) {
215 ac := arvados.NewClientFromEnv()
216 ac.AuthToken = s.token
218 conn, r, w, err := s.testClient()
219 c.Assert(err, check.IsNil)
222 c.Check(w.Encode(map[string]interface{}{
223 "method": "subscribe",
224 "filters": [][]string{{"object_uuid", "=", arvadostest.RunningContainerUUID}},
226 s.expectStatus(c, r, 200)
228 err = ac.RequestAndDecode(nil, "POST", "arvados/v1/logs", s.jsonBody("log", map[string]interface{}{
229 "object_uuid": arvadostest.RunningContainerUUID,
230 "event_type": "update",
231 "properties": map[string]interface{}{
232 "new_attributes": map[string]interface{}{
234 "requesting_container_uuid": "uuidvalue",
235 "state": "statevalue",
239 c.Assert(err, check.IsNil)
241 lg := s.expectLog(c, r)
242 c.Check(lg.ObjectUUID, check.Equals, arvadostest.RunningContainerUUID)
243 c.Check(lg.EventType, check.Equals, "update")
244 c.Check(lg.Properties["new_attributes"].(map[string]interface{})["requesting_container_uuid"], check.Equals, "uuidvalue")
245 c.Check(lg.Properties["new_attributes"].(map[string]interface{})["name"], check.Equals, "namevalue")
246 c.Check(lg.Properties["new_attributes"].(map[string]interface{})["state"], check.Equals, "statevalue")
249 // Trashing/deleting a collection produces an "update" event with
250 // properties["new_attributes"]["is_trashed"] == true.
251 func (s *v0Suite) TestTrashedCollection(c *check.C) {
252 ac := arvados.NewClientFromEnv()
253 ac.AuthToken = s.token
255 var coll arvados.Collection
256 err := ac.RequestAndDecode(&coll, "POST", "arvados/v1/collections", s.jsonBody("collection", `{"manifest_text":""}`), map[string]interface{}{"ensure_unique_name": true})
257 c.Assert(err, check.IsNil)
258 s.ignoreLogID = s.lastLogID(c)
260 conn, r, w, err := s.testClient()
261 c.Assert(err, check.IsNil)
264 c.Check(w.Encode(map[string]interface{}{
265 "method": "subscribe",
267 s.expectStatus(c, r, 200)
269 err = ac.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
270 c.Assert(err, check.IsNil)
272 lg := s.expectLog(c, r)
273 c.Check(lg.ObjectUUID, check.Equals, coll.UUID)
274 c.Check(lg.EventType, check.Equals, "update")
275 c.Check(lg.Properties["old_attributes"].(map[string]interface{})["is_trashed"], check.Equals, false)
276 c.Check(lg.Properties["new_attributes"].(map[string]interface{})["is_trashed"], check.Equals, true)
279 func (s *v0Suite) TestSendBadJSON(c *check.C) {
280 conn, r, w, err := s.testClient()
281 c.Assert(err, check.IsNil)
284 c.Check(w.Encode(map[string]interface{}{
285 "method": "subscribe",
287 s.expectStatus(c, r, 200)
289 _, err = fmt.Fprint(conn, "^]beep\n")
290 c.Check(err, check.IsNil)
291 s.expectStatus(c, r, 400)
293 c.Check(w.Encode(map[string]interface{}{
294 "method": "subscribe",
296 s.expectStatus(c, r, 200)
299 func (s *v0Suite) TestSubscribe(c *check.C) {
300 conn, r, w, err := s.testClient()
301 c.Assert(err, check.IsNil)
304 s.emitEvents(c, nil, nil)
306 err = w.Encode(map[string]interface{}{"21": 12})
307 c.Check(err, check.IsNil)
308 s.expectStatus(c, r, 400)
310 err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
311 c.Check(err, check.IsNil)
312 s.expectStatus(c, r, 200)
314 uuidChan := make(chan string, 1)
315 go s.emitEvents(c, uuidChan, nil)
318 for _, etype := range []string{"create", "blip", "update"} {
319 lg := s.expectLog(c, r)
320 for lg.ObjectUUID != uuid {
321 lg = s.expectLog(c, r)
323 c.Check(lg.EventType, check.Equals, etype)
327 func (s *v0Suite) TestManyEventsAndSubscribers(c *check.C) {
328 // Frequent slow listener pings create the conditions for a
329 // deadlock issue with the lib/pq example listener usage.
331 // Specifically: a lib/pq/example/listen-style event loop can
332 // deadlock if enough (~32) server notifications arrive after
333 // the event loop decides to call Ping (e.g., while
334 // listener.Ping() is waiting for a response from the server,
335 // or in the time.Sleep() invoked by testSlowPing).
337 // (*ListenerConn)listenerConnLoop() doesn't see the server's
338 // ping response until it finishes sending a previous
339 // notification through its internal queue to
340 // (*Listener)listenerConnLoop(), which is blocked on sending
341 // to our Notify channel, which is blocked on waiting for the
343 defer func(d time.Duration) {
344 listenerPingInterval = d
346 }(listenerPingInterval)
347 listenerPingInterval = time.Second / 2
349 // Restart the test server in order to get one that uses our
354 done := make(chan struct{})
356 go s.emitEvents(c, nil, done)
358 // We will expect to receive at least one event during each
359 // one-second interval while the test is running.
362 receivedPerSecond := make([]int64, seconds)
364 ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Duration(seconds)*time.Second))
366 for clientID := 0; clientID < 100; clientID++ {
369 for ctx.Err() == nil {
370 conn, r, w, err := s.testClient()
371 if ctx.Err() != nil {
374 c.Assert(err, check.IsNil)
376 err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
377 if ctx.Err() != nil {
380 c.Check(err, check.IsNil)
381 s.expectStatus(c, r, 200)
383 if clientID%10 == 0 {
385 time.Sleep(time.Second / 20)
386 } else if rand.Float64() < 0.01 {
387 // disconnect+reconnect
392 if ctx.Err() != nil {
395 if errors.Is(err, io.EOF) {
398 c.Check(err, check.IsNil)
399 if i := int(time.Since(t0) / time.Second); i < seconds {
400 atomic.AddInt64(&receivedPerSecond[i], 1)
409 for i, n := range receivedPerSecond {
410 c.Logf("t<%d n=%d", i+1, n)
411 c.Check(int64(n), check.Not(check.Equals), int64(0))
415 // Generate some events by creating and updating a workflow object,
416 // and creating a custom log entry (event_type="blip") about the newly
419 // If uuidChan is not nil, send the new workflow UUID to uuidChan as
420 // soon as it's known.
422 // If done is not nil, keep generating events until done receives or
424 func (s *v0Suite) emitEvents(c *check.C, uuidChan chan<- string, done <-chan struct{}) {
428 ac := arvados.NewClientFromEnv()
429 ac.AuthToken = s.token
430 wf := &arvados.Workflow{
433 err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", `{"name":"ws_test"}`), map[string]interface{}{"ensure_unique_name": true})
434 c.Assert(err, check.IsNil)
435 s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID)
441 err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", map[string]interface{}{
442 "object_uuid": wf.UUID,
443 "event_type": "blip",
444 "properties": map[string]interface{}{
448 s.toDelete = append(s.toDelete, "arvados/v1/logs/"+lg.UUID)
454 time.Sleep(100 * time.Millisecond)
464 err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", `{"name":"ws_test"}`), nil)
470 func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
471 val, ok := ob.(string)
473 j, err := json.Marshal(ob)
480 v[rscName] = []string{val}
481 return bytes.NewBufferString(v.Encode())
484 func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
485 msg := map[string]interface{}{}
486 c.Check(r.Decode(&msg), check.IsNil)
487 c.Check(int(msg["status"].(float64)), check.Equals, status)
490 func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
492 ok := make(chan struct{})
495 for lg.ID <= s.ignoreLogID {
496 c.Assert(r.Decode(lg), check.IsNil)
500 case <-time.After(10 * time.Second):
509 func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder, error) {
510 srv := s.serviceSuite.srv
511 conn, err := websocket.Dial(strings.Replace(srv.URL, "http", "ws", 1)+"/websocket?api_token="+s.token, "", srv.URL)
513 return nil, nil, nil, err
515 w := json.NewEncoder(conn)
516 r := json.NewDecoder(conn)
517 return conn, r, w, nil
520 func (s *v0Suite) lastLogID(c *check.C) int64 {
522 c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)