Merge branch 'master' into 6588-split-manage-account
[arvados.git] / services / api / test / integration / websocket_test.rb
1 require 'test_helper'
2 require 'websocket_runner'
3 require 'oj'
4 require 'database_cleaner'
5
6 DatabaseCleaner.strategy = :deletion
7
8 class WebsocketTest < ActionDispatch::IntegrationTest
9   self.use_transactional_fixtures = false
10
11   setup do
12     DatabaseCleaner.start
13   end
14
15   teardown do
16     DatabaseCleaner.clean
17   end
18
19   def ws_helper (token = nil, timeout = true)
20     opened = false
21     close_status = nil
22     too_long = false
23
24     EM.run {
25       if token
26         ws = Faye::WebSocket::Client.new("ws://localhost:3002/websocket?api_token=#{api_client_authorizations(token).api_token}")
27       else
28         ws = Faye::WebSocket::Client.new("ws://localhost:3002/websocket")
29       end
30
31       ws.on :open do |event|
32         opened = true
33         if timeout
34           EM::Timer.new 4 do
35             too_long = true if close_status.nil?
36             EM.stop_event_loop
37           end
38         end
39       end
40
41       ws.on :close do |event|
42         close_status = [:close, event.code, event.reason]
43         EM.stop_event_loop
44       end
45
46       yield ws
47     }
48
49     assert opened, "Should have opened web socket"
50     assert (not too_long), "Test took too long"
51     assert_equal 1000, close_status[1], "Connection closed unexpectedly (check log for errors)"
52   end
53
54   test "connect with no token" do
55     status = nil
56
57     ws_helper do |ws|
58       ws.on :message do |event|
59         d = Oj.load event.data
60         status = d["status"]
61         ws.close
62       end
63     end
64
65     assert_equal 401, status
66   end
67
68
69   test "connect, subscribe and get response" do
70     status = nil
71
72     ws_helper :admin do |ws|
73       ws.on :open do |event|
74         ws.send ({method: 'subscribe'}.to_json)
75       end
76
77       ws.on :message do |event|
78         d = Oj.load event.data
79         status = d["status"]
80         ws.close
81       end
82     end
83
84     assert_equal 200, status
85   end
86
87   def subscribe_test
88     state = 1
89     spec = nil
90     ev_uuid = nil
91
92     authorize_with :admin
93
94     ws_helper :admin do |ws|
95       ws.on :open do |event|
96         ws.send ({method: 'subscribe'}.to_json)
97       end
98
99       ws.on :message do |event|
100         d = Oj.load event.data
101         case state
102         when 1
103           assert_equal 200, d["status"]
104           spec = Specimen.create
105           state = 2
106         when 2
107           ev_uuid = d["object_uuid"]
108           ws.close
109         end
110       end
111
112     end
113
114     assert_not_nil spec
115     assert_equal spec.uuid, ev_uuid
116   end
117
118   test "connect, subscribe, get event" do
119     subscribe_test()
120   end
121
122   test "connect, subscribe, get two events" do
123     state = 1
124     spec = nil
125     human = nil
126     spec_ev_uuid = nil
127     human_ev_uuid = nil
128
129     authorize_with :admin
130
131     ws_helper :admin do |ws|
132       ws.on :open do |event|
133         ws.send ({method: 'subscribe'}.to_json)
134       end
135
136       ws.on :message do |event|
137         d = Oj.load event.data
138         case state
139         when 1
140           assert_equal 200, d["status"]
141           spec = Specimen.create
142           human = Human.create
143           state = 2
144         when 2
145           spec_ev_uuid = d["object_uuid"]
146           state = 3
147         when 3
148           human_ev_uuid = d["object_uuid"]
149           state = 4
150           ws.close
151         when 4
152           assert false, "Should not get any more events"
153         end
154       end
155
156     end
157
158     assert_not_nil spec
159     assert_not_nil human
160     assert_equal spec.uuid, spec_ev_uuid
161     assert_equal human.uuid, human_ev_uuid
162   end
163
164   test "connect, subscribe, filter events" do
165     state = 1
166     human = nil
167     human_ev_uuid = nil
168
169     authorize_with :admin
170
171     ws_helper :admin do |ws|
172       ws.on :open do |event|
173         ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
174       end
175
176       ws.on :message do |event|
177         d = Oj.load event.data
178         case state
179         when 1
180           assert_equal 200, d["status"]
181           Specimen.create
182           human = Human.create
183           state = 2
184         when 2
185           human_ev_uuid = d["object_uuid"]
186           state = 3
187           ws.close
188         when 3
189           assert false, "Should not get any more events"
190         end
191       end
192
193     end
194
195     assert_not_nil human
196     assert_equal human.uuid, human_ev_uuid
197   end
198
199
200   test "connect, subscribe, multiple filters" do
201     state = 1
202     spec = nil
203     human = nil
204     spec_ev_uuid = nil
205     human_ev_uuid = nil
206
207     authorize_with :admin
208
209     ws_helper :admin do |ws|
210       ws.on :open do |event|
211         ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
212         ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#specimen']]}.to_json)
213       end
214
215       ws.on :message do |event|
216         d = Oj.load event.data
217         case state
218         when 1
219           assert_equal 200, d["status"]
220           state = 2
221         when 2
222           assert_equal 200, d["status"]
223           spec = Specimen.create
224           Trait.create # not part of filters, should not be received
225           human = Human.create
226           state = 3
227         when 3
228           spec_ev_uuid = d["object_uuid"]
229           state = 4
230         when 4
231           human_ev_uuid = d["object_uuid"]
232           state = 5
233           ws.close
234         when 5
235           assert false, "Should not get any more events"
236         end
237       end
238
239     end
240
241     assert_not_nil spec
242     assert_not_nil human
243     assert_equal spec.uuid, spec_ev_uuid
244     assert_equal human.uuid, human_ev_uuid
245   end
246
247
248   test "connect, subscribe, compound filter" do
249     state = 1
250     t1 = nil
251
252     authorize_with :admin
253
254     ws_helper :admin do |ws|
255       ws.on :open do |event|
256         ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#trait'], ['event_type', '=', 'update']]}.to_json)
257       end
258
259       ws.on :message do |event|
260         d = Oj.load event.data
261         case state
262         when 1
263           assert_equal 200, d["status"]
264           t1 = Trait.create("name" => "foo")
265           t1.name = "bar"
266           t1.save!
267           state = 2
268          when 2
269           assert_equal 'update', d['event_type']
270           state = 3
271           ws.close
272         when 3
273           assert false, "Should not get any more events"
274         end
275       end
276
277     end
278
279     assert_equal 3, state
280     assert_not_nil t1
281   end
282
283   test "connect, subscribe, ask events starting at seq num" do
284     state = 1
285     human = nil
286     human_ev_uuid = nil
287
288     authorize_with :admin
289
290     lastid = logs(:admin_changes_specimen).id
291     l1 = nil
292     l2 = nil
293
294     ws_helper :admin do |ws|
295       ws.on :open do |event|
296         ws.send ({method: 'subscribe', last_log_id: lastid}.to_json)
297       end
298
299       ws.on :message do |event|
300         d = Oj.load event.data
301         case state
302         when 1
303           assert_equal 200, d["status"]
304           state = 2
305         when 2
306           l1 = d["object_uuid"]
307           assert_not_nil l1, "Unexpected message: #{d}"
308           state = 3
309         when 3
310           l2 = d["object_uuid"]
311           assert_not_nil l2, "Unexpected message: #{d}"
312           state = 4
313           ws.close
314         when 4
315           assert false, "Should not get any more events"
316         end
317       end
318     end
319
320     expect_next_logs = Log.where('id > ?', lastid).order('id asc')
321     assert_equal expect_next_logs[0].object_uuid, l1
322     assert_equal expect_next_logs[1].object_uuid, l2
323   end
324
325   test "connect, subscribe, get event, unsubscribe" do
326     state = 1
327     spec = nil
328     spec_ev_uuid = nil
329     filter_id = nil
330
331     authorize_with :admin
332
333     ws_helper :admin, false do |ws|
334       ws.on :open do |event|
335         ws.send ({method: 'subscribe'}.to_json)
336         EM::Timer.new 3 do
337           # Set a time limit on the test because after unsubscribing the server
338           # still has to process the next event (and then hopefully correctly
339           # decides not to send it because we unsubscribed.)
340           ws.close
341         end
342       end
343
344       ws.on :message do |event|
345         d = Oj.load event.data
346         case state
347         when 1
348           assert_equal 200, d["status"]
349           spec = Specimen.create
350           state = 2
351         when 2
352           spec_ev_uuid = d["object_uuid"]
353           ws.send ({method: 'unsubscribe'}.to_json)
354
355           EM::Timer.new 1 do
356             Specimen.create
357           end
358
359           state = 3
360         when 3
361           assert_equal 200, d["status"]
362           state = 4
363         when 4
364           assert false, "Should not get any more events"
365         end
366       end
367
368     end
369
370     assert_not_nil spec
371     assert_equal spec.uuid, spec_ev_uuid
372   end
373
374   test "connect, subscribe, get event, unsubscribe with filter" do
375     state = 1
376     spec = nil
377     spec_ev_uuid = nil
378
379     authorize_with :admin
380
381     ws_helper :admin, false do |ws|
382       ws.on :open do |event|
383         ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
384         EM::Timer.new 3 do
385           # Set a time limit on the test because after unsubscribing the server
386           # still has to process the next event (and then hopefully correctly
387           # decides not to send it because we unsubscribed.)
388           ws.close
389         end
390       end
391
392       ws.on :message do |event|
393         d = Oj.load event.data
394         case state
395         when 1
396           assert_equal 200, d["status"]
397           spec = Human.create
398           state = 2
399         when 2
400           spec_ev_uuid = d["object_uuid"]
401           ws.send ({method: 'unsubscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
402
403           EM::Timer.new 1 do
404             Human.create
405           end
406
407           state = 3
408         when 3
409           assert_equal 200, d["status"]
410           state = 4
411         when 4
412           assert false, "Should not get any more events"
413         end
414       end
415
416     end
417
418     assert_not_nil spec
419     assert_equal spec.uuid, spec_ev_uuid
420   end
421
422
423   test "connect, subscribe, get event, try to unsubscribe with bogus filter" do
424     state = 1
425     spec = nil
426     spec_ev_uuid = nil
427     human = nil
428     human_ev_uuid = nil
429
430     authorize_with :admin
431
432     ws_helper :admin do |ws|
433       ws.on :open do |event|
434         ws.send ({method: 'subscribe'}.to_json)
435       end
436
437       ws.on :message do |event|
438         d = Oj.load event.data
439         case state
440         when 1
441           assert_equal 200, d["status"]
442           spec = Specimen.create
443           state = 2
444         when 2
445           spec_ev_uuid = d["object_uuid"]
446           ws.send ({method: 'unsubscribe', filters: [['foo', 'bar', 'baz']]}.to_json)
447
448           EM::Timer.new 1 do
449             human = Human.create
450           end
451
452           state = 3
453         when 3
454           assert_equal 404, d["status"]
455           state = 4
456         when 4
457           human_ev_uuid = d["object_uuid"]
458           state = 5
459           ws.close
460         when 5
461           assert false, "Should not get any more events"
462         end
463       end
464
465     end
466
467     assert_not_nil spec
468     assert_not_nil human
469     assert_equal spec.uuid, spec_ev_uuid
470     assert_equal human.uuid, human_ev_uuid
471   end
472
473
474
475   test "connected, not subscribed, no event" do
476     authorize_with :admin
477
478     ws_helper :admin, false do |ws|
479       ws.on :open do |event|
480         EM::Timer.new 1 do
481           Specimen.create
482         end
483
484         EM::Timer.new 3 do
485           ws.close
486         end
487       end
488
489       ws.on :message do |event|
490         assert false, "Should not get any messages, message was #{event.data}"
491       end
492     end
493   end
494
495   test "connected, not authorized to see event" do
496     state = 1
497
498     authorize_with :admin
499
500     ws_helper :active, false do |ws|
501       ws.on :open do |event|
502         ws.send ({method: 'subscribe'}.to_json)
503
504         EM::Timer.new 3 do
505           ws.close
506         end
507       end
508
509       ws.on :message do |event|
510         d = Oj.load event.data
511         case state
512         when 1
513           assert_equal 200, d["status"]
514           Specimen.create
515           state = 2
516         when 2
517           assert false, "Should not get any messages, message was #{event.data}"
518         end
519       end
520
521     end
522
523   end
524
525   test "connect, try bogus method" do
526     status = nil
527
528     ws_helper :admin do |ws|
529       ws.on :open do |event|
530         ws.send ({method: 'frobnabble'}.to_json)
531       end
532
533       ws.on :message do |event|
534         d = Oj.load event.data
535         status = d["status"]
536         ws.close
537       end
538     end
539
540     assert_equal 400, status
541   end
542
543   test "connect, missing method" do
544     status = nil
545
546     ws_helper :admin do |ws|
547       ws.on :open do |event|
548         ws.send ({fizzbuzz: 'frobnabble'}.to_json)
549       end
550
551       ws.on :message do |event|
552         d = Oj.load event.data
553         status = d["status"]
554         ws.close
555       end
556     end
557
558     assert_equal 400, status
559   end
560
561   test "connect, send malformed request" do
562     status = nil
563
564     ws_helper :admin do |ws|
565       ws.on :open do |event|
566         ws.send '<XML4EVER></XML4EVER>'
567       end
568
569       ws.on :message do |event|
570         d = Oj.load event.data
571         status = d["status"]
572         ws.close
573       end
574     end
575
576     assert_equal 400, status
577   end
578
579
580   test "connect, try subscribe too many filters" do
581     state = 1
582
583     authorize_with :admin
584
585     ws_helper :admin do |ws|
586       ws.on :open do |event|
587         (1..17).each do |i|
588           ws.send ({method: 'subscribe', filters: [['object_uuid', '=', i]]}.to_json)
589         end
590       end
591
592       ws.on :message do |event|
593         d = Oj.load event.data
594         case state
595         when (1..EventBus::MAX_FILTERS)
596           assert_equal 200, d["status"]
597           state += 1
598         when (EventBus::MAX_FILTERS+1)
599           assert_equal 403, d["status"]
600           ws.close
601         end
602       end
603
604     end
605
606     assert_equal 17, state
607
608   end
609
610   test "connect, subscribe, lots of events" do
611     state = 1
612     event_count = 0
613     log_start = Log.order(:id).last.id
614
615     authorize_with :admin
616
617     ws_helper :admin, false do |ws|
618       EM::Timer.new 45 do
619         # Needs a longer timeout than the default
620         ws.close
621       end
622
623       ws.on :open do |event|
624         ws.send ({method: 'subscribe'}.to_json)
625       end
626
627       ws.on :message do |event|
628         d = Oj.load event.data
629         case state
630         when 1
631           assert_equal 200, d["status"]
632           ActiveRecord::Base.transaction do
633             (1..202).each do
634               spec = Specimen.create
635             end
636           end
637           state = 2
638         when 2
639           event_count += 1
640           assert_equal d['id'], event_count+log_start
641           if event_count == 202
642             ws.close
643           end
644         end
645       end
646
647     end
648
649     assert_equal 202, event_count
650   end
651
652
653   test "connect, subscribe with invalid filter" do
654     state = 1
655     human = nil
656     human_ev_uuid = nil
657
658     authorize_with :admin
659
660     ws_helper :admin do |ws|
661       ws.on :open do |event|
662         # test that #6451 is fixed (invalid filter crashes websockets)
663         ws.send ({method: 'subscribe', filters: [['object_blarg', 'is_a', 'arvados#human']]}.to_json)
664       end
665
666       ws.on :message do |event|
667         d = Oj.load event.data
668         case state
669         when 1
670           assert_equal 200, d["status"]
671           Specimen.create
672           human = Human.create
673           state = 2
674         when 2
675           assert_equal 500, d["status"]
676           state = 3
677           ws.close
678         when 3
679           assert false, "Should not get any more events"
680         end
681       end
682
683     end
684
685     assert_equal 3, state
686
687     # Try connecting again, ensure that websockets server is still running and
688     # didn't crash per #6451
689     subscribe_test()
690
691   end
692
693
694 end