1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
22 "git.arvados.org/arvados.git/sdk/go/arvados"
23 "git.arvados.org/arvados.git/sdk/go/arvadostest"
24 "git.arvados.org/arvados.git/sdk/go/ctxlog"
25 "golang.org/x/net/websocket"
26 check "gopkg.in/check.v1"
30 if os.Getenv("ARVADOS_DEBUG") != "" {
31 ctxlog.SetLevel("debug")
// Register v0Suite with gocheck so that its Test* methods are run.
35 var _ = check.Suite(&v0Suite{})
38 serviceSuite serviceSuite
// SetUpTest runs the serviceSuite field's per-test setup and starts it,
// selects the active user's token for subsequent requests, and records
// the current highest log ID in s.ignoreLogID.
45 func (s *v0Suite) SetUpTest(c *check.C) {
46 s.serviceSuite.SetUpTest(c)
47 s.serviceSuite.start(c)
49 s.token = arvadostest.ActiveToken
// expectLog keeps decoding while the log ID is <= s.ignoreLogID, so
// logs that already exist at this point are skipped by the tests.
50 s.ignoreLogID = s.lastLogID(c)
// TearDownTest runs the serviceSuite field's per-test teardown.
53 func (s *v0Suite) TearDownTest(c *check.C) {
55 s.serviceSuite.TearDownTest(c)
// TearDownSuite deletes the API objects whose paths were collected in
// s.toDelete.
58 func (s *v0Suite) TearDownSuite(c *check.C) {
59 s.deleteTestObjects(c)
// deleteTestObjects sends a DELETE request, authenticated with the admin
// token, for every path in s.toDelete. A failed request is reported with
// c.Check, so the remaining paths are still attempted.
62 func (s *v0Suite) deleteTestObjects(c *check.C) {
63 ac := arvados.NewClientFromEnv()
// Use the admin token so objects created by any test user can be deleted.
64 ac.AuthToken = arvadostest.AdminToken
65 for _, path := range s.toDelete {
66 err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
67 c.Check(err, check.IsNil)
// TestFilters sends a sequence of subscribe/unsubscribe commands that
// carry an event_type filter and checks the status the server reports for
// each one, then checks which event type is still delivered.
72 func (s *v0Suite) TestFilters(c *check.C) {
73 conn, r, w, err := s.testClient()
74 c.Assert(err, check.IsNil)
// cmd sends one command with an `event_type in [eventType]` filter and
// checks that the server's reply has the given status.
77 cmd := func(method, eventType string, status int) {
78 c.Check(w.Encode(map[string]interface{}{
80 "filters": [][]interface{}{{"event_type", "in", []string{eventType}}},
82 s.expectStatus(c, r, status)
// Identical subscriptions are each expected to be accepted (200).
84 cmd("subscribe", "update", 200)
85 cmd("subscribe", "update", 200)
86 cmd("subscribe", "create", 200)
87 cmd("subscribe", "update", 200)
// No "blip" subscription was made above; the test expects 400 here.
88 cmd("unsubscribe", "blip", 400)
89 cmd("unsubscribe", "create", 200)
90 cmd("unsubscribe", "update", 200)
// Three "update" subscriptions were made and one removed, and the "create"
// subscription is gone, so the first log received is expected to be an
// "update".
92 go s.emitEvents(c, nil, nil)
93 lg := s.expectLog(c, r)
94 c.Check(lg.EventType, check.Equals, "update")
// Two more "update" unsubscribes are expected to succeed; the third is
// expected to be rejected with 400.
96 cmd("unsubscribe", "update", 200)
97 cmd("unsubscribe", "update", 200)
98 cmd("unsubscribe", "update", 400)
// TestLastLogID checks that a client which subscribes with "last_log_id"
// set to a log ID recorded before some events were emitted receives those
// earlier events, followed by events emitted after it subscribed.
101 func (s *v0Suite) TestLastLogID(c *check.C) {
// Highest log ID before any of this test's events exist.
102 lastID := s.lastLogID(c)
// checkLogs reads logs from r, skipping logs about other objects, and
// checks that the logs about uuid arrive with event types "create",
// "blip", "update", in that order.
104 checkLogs := func(r *json.Decoder, uuid string) {
105 for _, etype := range []string{"create", "blip", "update"} {
106 lg := s.expectLog(c, r)
107 for lg.ObjectUUID != uuid {
108 lg = s.expectLog(c, r)
110 c.Check(lg.EventType, check.Equals, etype)
114 // Connecting connEarly (before sending the early events) lets
115 // us confirm all of the "early" events have already passed
116 // through the server.
117 connEarly, rEarly, wEarly, err := s.testClient()
118 c.Assert(err, check.IsNil)
119 defer connEarly.Close()
120 c.Check(wEarly.Encode(map[string]interface{}{
121 "method": "subscribe",
123 s.expectStatus(c, rEarly, 200)
125 // Send the early events.
126 uuidChan := make(chan string, 1)
127 s.emitEvents(c, uuidChan, nil)
128 uuidEarly := <-uuidChan
130 // Wait for the early events to pass through.
131 checkLogs(rEarly, uuidEarly)
133 // Connect the client that wants to get old events via
135 conn, r, w, err := s.testClient()
136 c.Assert(err, check.IsNil)
139 c.Check(w.Encode(map[string]interface{}{
140 "method": "subscribe",
141 "last_log_id": lastID,
143 s.expectStatus(c, r, 200)
// The late client must first see the early object's logs, then the logs
// of a newly emitted object.
145 checkLogs(r, uuidEarly)
146 s.emitEvents(c, uuidChan, nil)
147 checkLogs(r, <-uuidChan)
// TestPermission emits events first with the admin token and then with the
// active user's token, and checks that the subscribed client reaches the
// log for the second object without receiving any log about the first.
150 func (s *v0Suite) TestPermission(c *check.C) {
// The client connects with s.token as set by SetUpTest (the active user).
151 conn, r, w, err := s.testClient()
152 c.Assert(err, check.IsNil)
155 c.Check(w.Encode(map[string]interface{}{
156 "method": "subscribe",
158 s.expectStatus(c, r, 200)
// Buffer of 2 so both emitEvents calls can report a UUID before either
// is received below.
160 uuidChan := make(chan string, 2)
162 s.token = arvadostest.AdminToken
163 s.emitEvents(c, uuidChan, nil)
164 s.token = arvadostest.ActiveToken
165 s.emitEvents(c, uuidChan, nil)
// UUIDs arrive in emission order: admin's object first, then the active
// user's object.
168 wrongUUID := <-uuidChan
169 rightUUID := <-uuidChan
170 lg := s.expectLog(c, r)
171 for lg.ObjectUUID != rightUUID {
172 c.Check(lg.ObjectUUID, check.Not(check.Equals), wrongUUID)
173 lg = s.expectLog(c, r)
177 // Two users create private objects; admin deletes both objects; each
178 // user receives a "delete" event for their own object (not for the
179 // other user's object).
180 func (s *v0Suite) TestEventTypeDelete(c *check.C) {
181 clients := []struct {
187 }{{token: arvadostest.ActiveToken}, {token: arvadostest.SpectatorToken}}
// For each user: emit events with that user's token, remember the UUID
// reported by emitEvents, then open a subscribed connection.
188 for i := range clients {
189 uuidChan := make(chan string, 1)
190 s.token = clients[i].token
191 s.emitEvents(c, uuidChan, nil)
192 clients[i].uuid = <-uuidChan
195 clients[i].conn, clients[i].r, clients[i].w, err = s.testClient()
196 c.Assert(err, check.IsNil)
198 c.Check(clients[i].w.Encode(map[string]interface{}{
199 "method": "subscribe",
201 s.expectStatus(c, clients[i].r, 200)
// Skip every log that exists so far, so that expectLog below only
// returns logs produced by the deletions.
204 s.ignoreLogID = s.lastLogID(c)
// deleteTestObjects performs the deletions with the admin token.
205 s.deleteTestObjects(c)
207 for _, client := range clients {
208 lg := s.expectLog(c, client.r)
209 c.Check(lg.ObjectUUID, check.Equals, client.uuid)
210 c.Check(lg.EventType, check.Equals, "delete")
214 // Trashing/deleting a collection produces an "update" event with
215 // properties["new_attributes"]["is_trashed"] == true.
216 func (s *v0Suite) TestTrashedCollection(c *check.C) {
217 ac := arvados.NewClientFromEnv()
218 ac.AuthToken = s.token
// Create a collection with an empty manifest, using the current token.
220 var coll arvados.Collection
221 err := ac.RequestAndDecode(&coll, "POST", "arvados/v1/collections", s.jsonBody("collection", `{"manifest_text":""}`), map[string]interface{}{"ensure_unique_name": true})
222 c.Assert(err, check.IsNil)
// Skip logs that exist so far (including any from the creation above).
223 s.ignoreLogID = s.lastLogID(c)
225 conn, r, w, err := s.testClient()
226 c.Assert(err, check.IsNil)
229 c.Check(w.Encode(map[string]interface{}{
230 "method": "subscribe",
232 s.expectStatus(c, r, 200)
// Issue the DELETE request that should trigger the "update" event.
234 err = ac.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
235 c.Assert(err, check.IsNil)
237 lg := s.expectLog(c, r)
238 c.Check(lg.ObjectUUID, check.Equals, coll.UUID)
239 c.Check(lg.EventType, check.Equals, "update")
// The unchecked type assertions below panic if either attribute set is
// missing from the log's properties or is not a JSON object.
240 c.Check(lg.Properties["old_attributes"].(map[string]interface{})["is_trashed"], check.Equals, false)
241 c.Check(lg.Properties["new_attributes"].(map[string]interface{})["is_trashed"], check.Equals, true)
// TestSendBadJSON checks that the server answers a message that is not
// valid JSON with status 400, and that the same connection still accepts a
// subscribe command afterwards.
244 func (s *v0Suite) TestSendBadJSON(c *check.C) {
245 conn, r, w, err := s.testClient()
246 c.Assert(err, check.IsNil)
249 c.Check(w.Encode(map[string]interface{}{
250 "method": "subscribe",
252 s.expectStatus(c, r, 200)
// Write raw, non-JSON text directly to the connection.
254 _, err = fmt.Fprint(conn, "^]beep\n")
255 c.Check(err, check.IsNil)
256 s.expectStatus(c, r, 400)
258 c.Check(w.Encode(map[string]interface{}{
259 "method": "subscribe",
261 s.expectStatus(c, r, 200)
// TestSubscribe checks that a message with no "method" key gets status
// 400, that a subscribe with an empty filter list gets status 200, and
// that the "create", "blip" and "update" logs for a subsequently emitted
// object then arrive in that order.
264 func (s *v0Suite) TestSubscribe(c *check.C) {
265 conn, r, w, err := s.testClient()
266 c.Assert(err, check.IsNil)
// Events emitted here happen before any subscription has been made.
269 s.emitEvents(c, nil, nil)
271 err = w.Encode(map[string]interface{}{"21": 12})
272 c.Check(err, check.IsNil)
273 s.expectStatus(c, r, 400)
275 err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
276 c.Check(err, check.IsNil)
277 s.expectStatus(c, r, 200)
279 uuidChan := make(chan string, 1)
280 go s.emitEvents(c, uuidChan, nil)
// Logs about other objects are skipped; only the logs whose ObjectUUID
// matches uuid are checked against the expected event types.
283 for _, etype := range []string{"create", "blip", "update"} {
284 lg := s.expectLog(c, r)
285 for lg.ObjectUUID != uuid {
286 lg = s.expectLog(c, r)
288 c.Check(lg.EventType, check.Equals, etype)
// TestManyEventsAndSubscribers shortens listenerPingInterval, generates
// events in the background, runs 100 subscribing clients, and checks that
// at least one event was received during each one-second interval of the
// test.
292 func (s *v0Suite) TestManyEventsAndSubscribers(c *check.C) {
293 // Frequent slow listener pings create the conditions for a
294 // deadlock issue with the lib/pq example listener usage.
296 // Specifically: a lib/pq/example/listen-style event loop can
297 // deadlock if enough (~32) server notifications arrive after
298 // the event loop decides to call Ping (e.g., while
299 // listener.Ping() is waiting for a response from the server,
300 // or in the time.Sleep() invoked by testSlowPing).
302 // (*ListenerConn)listenerConnLoop() doesn't see the server's
303 // ping response until it finishes sending a previous
304 // notification through its internal queue to
305 // (*Listener)listenerConnLoop(), which is blocked on sending
306 // to our Notify channel, which is blocked on waiting for the
// Restore the original listenerPingInterval when the test returns.
308 defer func(d time.Duration) {
309 listenerPingInterval = d
311 }(listenerPingInterval)
312 listenerPingInterval = time.Second / 2
314 // Restart the test server in order to get one that uses our
319 done := make(chan struct{})
// Generate events in the background; see emitEvents for how done is used.
321 go s.emitEvents(c, nil, done)
323 // We will expect to receive at least one event during each
324 // one-second interval while the test is running.
// receivedPerSecond[i] counts events received during second i of the test.
327 receivedPerSecond := make([]int64, seconds)
// ctx expires after `seconds` seconds; the client loops stop once it does.
329 ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Duration(seconds)*time.Second))
331 for clientID := 0; clientID < 100; clientID++ {
334 for ctx.Err() == nil {
335 conn, r, w, err := s.testClient()
336 if ctx.Err() != nil {
339 c.Assert(err, check.IsNil)
341 err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
342 if ctx.Err() != nil {
345 c.Check(err, check.IsNil)
346 s.expectStatus(c, r, 200)
// Every tenth client sleeps for 50ms here (presumably to act as a slow
// reader -- TODO confirm).
348 if clientID%10 == 0 {
350 time.Sleep(time.Second / 20)
351 } else if rand.Float64() < 0.01 {
352 // disconnect+reconnect
357 if ctx.Err() != nil {
360 if errors.Is(err, io.EOF) {
363 c.Check(err, check.IsNil)
// Count the event in the bucket for the second in which it arrived;
// events arriving after the last bucket are not counted.
364 if i := int(time.Since(t0) / time.Second); i < seconds {
365 atomic.AddInt64(&receivedPerSecond[i], 1)
// Log the per-second counts and require every one to be non-zero.
374 for i, n := range receivedPerSecond {
375 c.Logf("t<%d n=%d", i+1, n)
376 c.Check(int64(n), check.Not(check.Equals), int64(0))
380 // Generate some events by creating and updating a workflow object,
381 // and creating a custom log entry (event_type="blip") about the newly
384 // If uuidChan is not nil, send the new workflow UUID to uuidChan as
385 // soon as it's known.
387 // If done is not nil, keep generating events until done receives or
//
// NOTE(review): some tests start this method with `go`, and it appends to
// s.toDelete without any synchronization visible here -- confirm that
// s.toDelete is never accessed concurrently.
389 func (s *v0Suite) emitEvents(c *check.C, uuidChan chan<- string, done <-chan struct{}) {
// Requests are made with whatever token s.token holds at call time.
393 ac := arvados.NewClientFromEnv()
394 ac.AuthToken = s.token
395 wf := &arvados.Workflow{
398 err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", `{"name":"ws_test"}`), map[string]interface{}{"ensure_unique_name": true})
399 c.Assert(err, check.IsNil)
// Remember the workflow so deleteTestObjects can remove it later.
400 s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID)
// Create the custom "blip" log entry about the new workflow.
406 err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", map[string]interface{}{
407 "object_uuid": wf.UUID,
408 "event_type": "blip",
409 "properties": map[string]interface{}{
// Remember the log entry for cleanup as well.
413 s.toDelete = append(s.toDelete, "arvados/v1/logs/"+lg.UUID)
419 time.Sleep(100 * time.Millisecond)
// Update the workflow via PUT.
429 err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", `{"name":"ws_test"}`), nil)
// jsonBody returns a reader over an encoded request body in which the key
// rscName carries ob.
//
// NOTE(review): the type assertion and the json.Marshal call suggest that
// a string ob is used verbatim and any other value is JSON-encoded first;
// the branching between the two is not visible here -- confirm.
435 func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
436 val, ok := ob.(string)
438 j, err := json.Marshal(ob)
// The body has a single value under rscName.
445 v[rscName] = []string{val}
446 return bytes.NewBufferString(v.Encode())
// expectStatus decodes the next JSON message from r and checks that its
// "status" field equals status.
449 func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
450 msg := map[string]interface{}{}
451 c.Check(r.Decode(&msg), check.IsNil)
// encoding/json decodes JSON numbers into float64 when the target is an
// interface value. The unchecked assertion panics if "status" is missing
// or is not a number (including when the Decode above failed).
452 c.Check(int(msg["status"].(float64)), check.Equals, status)
// expectLog decodes log messages from r until it has one whose ID is
// greater than s.ignoreLogID, asserting that each decode succeeds.
//
// NOTE(review): the 10-second time.After case presumably bounds how long
// the caller waits for that log; the enclosing select and the return
// statement are not visible here -- confirm.
455 func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
457 ok := make(chan struct{})
// Keep reading while the decoded log is one the test should ignore.
460 for lg.ID <= s.ignoreLogID {
461 c.Assert(r.Decode(lg), check.IsNil)
465 case <-time.After(10 * time.Second):
// testClient opens a websocket connection to the test server's /websocket
// endpoint, passing s.token as the api_token query parameter, and returns
// the connection together with a JSON decoder and a JSON encoder that read
// from and write to it. When an error is returned, the other results are
// nil.
474 func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder, error) {
475 srv := s.serviceSuite.srv
// Replacing the first "http" with "ws" turns http:// into ws:// (and
// https:// into wss://). The server's own URL is passed as the origin.
476 conn, err := websocket.Dial(strings.Replace(srv.URL, "http", "ws", 1)+"/websocket?api_token="+s.token, "", srv.URL)
478 return nil, nil, nil, err
480 w := json.NewEncoder(conn)
481 r := json.NewDecoder(conn)
482 return conn, r, w, nil
// lastLogID queries the test database for the highest id in the logs
// table and scans it into lastID, asserting that the query and scan
// succeed.
485 func (s *v0Suite) lastLogID(c *check.C) int64 {
487 c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)