Merge branch 'master' into 10172-crunch2-container-output
[arvados.git] / services / api / test / integration / websocket_test.rb
1 require 'test_helper'
2 require 'websocket_runner'
3 require 'oj'
4 require 'database_cleaner'
5
6 DatabaseCleaner.strategy = :deletion
7
8 class WebsocketTest < ActionDispatch::IntegrationTest
9   self.use_transactional_fixtures = false
10
11   setup do
12     DatabaseCleaner.start
13   end
14
15   teardown do
16     DatabaseCleaner.clean
17   end
18
19   def ws_helper (token = nil, timeout = true)
20     opened = false
21     close_status = nil
22     too_long = false
23
24     EM.run {
25       if token
26         ws = Faye::WebSocket::Client.new("ws://localhost:#{WEBSOCKET_PORT}/websocket?api_token=#{api_client_authorizations(token).api_token}")
27       else
28         ws = Faye::WebSocket::Client.new("ws://localhost:#{WEBSOCKET_PORT}/websocket")
29       end
30
31       ws.on :open do |event|
32         opened = true
33         if timeout
34           EM::Timer.new 8 do
35             too_long = true if close_status.nil?
36             EM.stop_event_loop
37           end
38         end
39       end
40
41       ws.on :close do |event|
42         close_status = [:close, event.code, event.reason]
43         EM.stop_event_loop
44       end
45
46       yield ws
47     }
48
49     assert opened, "Should have opened web socket"
50     assert (not too_long), "Test took too long"
51     assert_equal 1000, close_status[1], "Connection closed unexpectedly (check log for errors)"
52   end
53
54   test "connect with no token" do
55     status = nil
56
57     ws_helper do |ws|
58       ws.on :message do |event|
59         d = Oj.strict_load event.data
60         status = d["status"]
61         ws.close
62       end
63     end
64
65     assert_equal 401, status
66   end
67
68
69   test "connect, subscribe and get response" do
70     status = nil
71
72     ws_helper :active do |ws|
73       ws.on :open do |event|
74         ws.send ({method: 'subscribe'}.to_json)
75       end
76
77       ws.on :message do |event|
78         d = Oj.strict_load event.data
79         status = d["status"]
80         ws.close
81       end
82     end
83
84     assert_equal 200, status
85   end
86
87   def subscribe_test
88     state = 1
89     spec = nil
90     ev_uuid = nil
91
92     authorize_with :active
93
94     ws_helper :active do |ws|
95       ws.on :open do |event|
96         ws.send ({method: 'subscribe'}.to_json)
97       end
98
99       ws.on :message do |event|
100         d = Oj.strict_load event.data
101         case state
102         when 1
103           assert_equal 200, d["status"]
104           spec = Specimen.create
105           state = 2
106         when 2
107           ev_uuid = d["object_uuid"]
108           ws.close
109         end
110       end
111
112     end
113
114     assert_not_nil spec
115     assert_equal spec.uuid, ev_uuid
116   end
117
118   test "connect, subscribe, get event" do
119     subscribe_test()
120   end
121
122   test "connect, subscribe, get two events" do
123     state = 1
124     spec = nil
125     human = nil
126     spec_ev_uuid = nil
127     human_ev_uuid = nil
128
129     authorize_with :active
130
131     ws_helper :active do |ws|
132       ws.on :open do |event|
133         ws.send ({method: 'subscribe'}.to_json)
134       end
135
136       ws.on :message do |event|
137         d = Oj.strict_load event.data
138         case state
139         when 1
140           assert_equal 200, d["status"]
141           spec = Specimen.create
142           human = Human.create
143           state = 2
144         when 2
145           spec_ev_uuid = d["object_uuid"]
146           state = 3
147         when 3
148           human_ev_uuid = d["object_uuid"]
149           state = 4
150           ws.close
151         when 4
152           assert false, "Should not get any more events"
153         end
154       end
155
156     end
157
158     assert_not_nil spec
159     assert_not_nil human
160     assert_equal spec.uuid, spec_ev_uuid
161     assert_equal human.uuid, human_ev_uuid
162   end
163
164   test "connect, subscribe, filter events" do
165     state = 1
166     human = nil
167     human_ev_uuid = nil
168
169     authorize_with :active
170
171     ws_helper :active do |ws|
172       ws.on :open do |event|
173         ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
174       end
175
176       ws.on :message do |event|
177         d = Oj.strict_load event.data
178         case state
179         when 1
180           assert_equal 200, d["status"]
181           Specimen.create
182           human = Human.create
183           state = 2
184         when 2
185           human_ev_uuid = d["object_uuid"]
186           state = 3
187           ws.close
188         when 3
189           assert false, "Should not get any more events"
190         end
191       end
192
193     end
194
195     assert_not_nil human
196     assert_equal human.uuid, human_ev_uuid
197   end
198
199
200   test "connect, subscribe, multiple filters" do
201     state = 1
202     spec = nil
203     human = nil
204     spec_ev_uuid = nil
205     human_ev_uuid = nil
206
207     authorize_with :active
208
209     ws_helper :active do |ws|
210       ws.on :open do |event|
211         ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
212         ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#specimen']]}.to_json)
213       end
214
215       ws.on :message do |event|
216         d = Oj.strict_load event.data
217         case state
218         when 1
219           assert_equal 200, d["status"]
220           state = 2
221         when 2
222           assert_equal 200, d["status"]
223           spec = Specimen.create
224           Trait.create # not part of filters, should not be received
225           human = Human.create
226           state = 3
227         when 3
228           spec_ev_uuid = d["object_uuid"]
229           state = 4
230         when 4
231           human_ev_uuid = d["object_uuid"]
232           state = 5
233           ws.close
234         when 5
235           assert false, "Should not get any more events"
236         end
237       end
238
239     end
240
241     assert_not_nil spec
242     assert_not_nil human
243     assert_equal spec.uuid, spec_ev_uuid
244     assert_equal human.uuid, human_ev_uuid
245   end
246
247
248   test "connect, subscribe, compound filter" do
249     state = 1
250     t1 = nil
251
252     authorize_with :active
253
254     ws_helper :active do |ws|
255       ws.on :open do |event|
256         ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#trait'], ['event_type', '=', 'update']]}.to_json)
257       end
258
259       ws.on :message do |event|
260         d = Oj.strict_load event.data
261         case state
262         when 1
263           assert_equal 200, d["status"]
264           t1 = Trait.create("name" => "foo")
265           t1.name = "bar"
266           t1.save!
267           state = 2
268          when 2
269           assert_equal 'update', d['event_type']
270           state = 3
271           ws.close
272         when 3
273           assert false, "Should not get any more events"
274         end
275       end
276
277     end
278
279     assert_equal 3, state
280     assert_not_nil t1
281   end
282
283   test "connect, subscribe, ask events starting at seq num" do
284     state = 1
285     human = nil
286     human_ev_uuid = nil
287
288     authorize_with :active
289
290     lastid = logs(:admin_changes_specimen).id
291     l1 = nil
292     l2 = nil
293
294     ws_helper :active do |ws|
295       ws.on :open do |event|
296         ws.send ({method: 'subscribe', last_log_id: lastid}.to_json)
297       end
298
299       ws.on :message do |event|
300         d = Oj.strict_load event.data
301         case state
302         when 1
303           assert_equal 200, d["status"]
304           state = 2
305         when 2
306           l1 = d["object_uuid"]
307           assert_not_nil l1, "Unexpected message: #{d}"
308           state = 3
309         when 3
310           l2 = d["object_uuid"]
311           assert_not_nil l2, "Unexpected message: #{d}"
312           state = 4
313           ws.close
314         when 4
315           assert false, "Should not get any more events"
316         end
317       end
318     end
319
320     expect_next_logs = Log.where('id > ?', lastid).order('id asc')
321     assert_equal expect_next_logs[0].object_uuid, l1
322     assert_equal expect_next_logs[1].object_uuid, l2
323   end
324
325   test "connect, subscribe, get event, unsubscribe" do
326     slow_test
327     state = 1
328     spec = nil
329     spec_ev_uuid = nil
330     filter_id = nil
331
332     authorize_with :active
333
334     ws_helper :active, false do |ws|
335       ws.on :open do |event|
336         ws.send ({method: 'subscribe'}.to_json)
337         EM::Timer.new 3 do
338           # Set a time limit on the test because after unsubscribing the server
339           # still has to process the next event (and then hopefully correctly
340           # decides not to send it because we unsubscribed.)
341           ws.close
342         end
343       end
344
345       ws.on :message do |event|
346         d = Oj.strict_load event.data
347         case state
348         when 1
349           assert_equal 200, d["status"]
350           spec = Specimen.create
351           state = 2
352         when 2
353           spec_ev_uuid = d["object_uuid"]
354           ws.send ({method: 'unsubscribe'}.to_json)
355
356           EM::Timer.new 1 do
357             Specimen.create
358           end
359
360           state = 3
361         when 3
362           assert_equal 200, d["status"]
363           state = 4
364         when 4
365           assert false, "Should not get any more events"
366         end
367       end
368
369     end
370
371     assert_not_nil spec
372     assert_equal spec.uuid, spec_ev_uuid
373   end
374
375   test "connect, subscribe, get event, unsubscribe with filter" do
376     slow_test
377     state = 1
378     spec = nil
379     spec_ev_uuid = nil
380
381     authorize_with :active
382
383     ws_helper :active, false do |ws|
384       ws.on :open do |event|
385         ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
386         EM::Timer.new 6 do
387           # Set a time limit on the test because after unsubscribing the server
388           # still has to process the next event (and then hopefully correctly
389           # decides not to send it because we unsubscribed.)
390           ws.close
391         end
392       end
393
394       ws.on :message do |event|
395         d = Oj.strict_load event.data
396         case state
397         when 1
398           assert_equal 200, d["status"]
399           spec = Human.create
400           state = 2
401         when 2
402           spec_ev_uuid = d["object_uuid"]
403           ws.send ({method: 'unsubscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
404
405           EM::Timer.new 1 do
406             Human.create
407           end
408
409           state = 3
410         when 3
411           assert_equal 200, d["status"]
412           state = 4
413         when 4
414           assert false, "Should not get any more events"
415         end
416       end
417
418     end
419
420     assert_not_nil spec
421     assert_equal spec.uuid, spec_ev_uuid
422   end
423
424
425   test "connect, subscribe, get event, try to unsubscribe with bogus filter" do
426     slow_test
427     state = 1
428     spec = nil
429     spec_ev_uuid = nil
430     human = nil
431     human_ev_uuid = nil
432
433     authorize_with :active
434
435     ws_helper :active do |ws|
436       ws.on :open do |event|
437         ws.send ({method: 'subscribe'}.to_json)
438       end
439
440       ws.on :message do |event|
441         d = Oj.strict_load event.data
442         case state
443         when 1
444           assert_equal 200, d["status"]
445           spec = Specimen.create
446           state = 2
447         when 2
448           spec_ev_uuid = d["object_uuid"]
449           ws.send ({method: 'unsubscribe', filters: [['foo', 'bar', 'baz']]}.to_json)
450
451           EM::Timer.new 1 do
452             human = Human.create
453           end
454
455           state = 3
456         when 3
457           assert_equal 404, d["status"]
458           state = 4
459         when 4
460           human_ev_uuid = d["object_uuid"]
461           state = 5
462           ws.close
463         when 5
464           assert false, "Should not get any more events"
465         end
466       end
467
468     end
469
470     assert_not_nil spec
471     assert_not_nil human
472     assert_equal spec.uuid, spec_ev_uuid
473     assert_equal human.uuid, human_ev_uuid
474   end
475
476
477
478   test "connected, not subscribed, no event" do
479     slow_test
480     authorize_with :active
481
482     ws_helper :active, false do |ws|
483       ws.on :open do |event|
484         EM::Timer.new 1 do
485           Specimen.create
486         end
487
488         EM::Timer.new 3 do
489           ws.close
490         end
491       end
492
493       ws.on :message do |event|
494         assert false, "Should not get any messages, message was #{event.data}"
495       end
496     end
497   end
498
499   test "connected, not authorized to see event" do
500     slow_test
501     state = 1
502
503     authorize_with :admin
504
505     ws_helper :active, false do |ws|
506       ws.on :open do |event|
507         ws.send ({method: 'subscribe'}.to_json)
508
509         EM::Timer.new 3 do
510           ws.close
511         end
512       end
513
514       ws.on :message do |event|
515         d = Oj.strict_load event.data
516         case state
517         when 1
518           assert_equal 200, d["status"]
519           Specimen.create
520           state = 2
521         when 2
522           assert false, "Should not get any messages, message was #{event.data}"
523         end
524       end
525
526     end
527
528   end
529
530   test "connect, try bogus method" do
531     status = nil
532
533     ws_helper :active do |ws|
534       ws.on :open do |event|
535         ws.send ({method: 'frobnabble'}.to_json)
536       end
537
538       ws.on :message do |event|
539         d = Oj.strict_load event.data
540         status = d["status"]
541         ws.close
542       end
543     end
544
545     assert_equal 400, status
546   end
547
548   test "connect, missing method" do
549     status = nil
550
551     ws_helper :active do |ws|
552       ws.on :open do |event|
553         ws.send ({fizzbuzz: 'frobnabble'}.to_json)
554       end
555
556       ws.on :message do |event|
557         d = Oj.strict_load event.data
558         status = d["status"]
559         ws.close
560       end
561     end
562
563     assert_equal 400, status
564   end
565
566   test "connect, send malformed request" do
567     status = nil
568
569     ws_helper :active do |ws|
570       ws.on :open do |event|
571         ws.send '<XML4EVER></XML4EVER>'
572       end
573
574       ws.on :message do |event|
575         d = Oj.strict_load event.data
576         status = d["status"]
577         ws.close
578       end
579     end
580
581     assert_equal 400, status
582   end
583
584
585   test "connect, try subscribe too many filters" do
586     state = 1
587
588     authorize_with :active
589
590     ws_helper :active do |ws|
591       ws.on :open do |event|
592         (1..17).each do |i|
593           ws.send ({method: 'subscribe', filters: [['object_uuid', '=', i]]}.to_json)
594         end
595       end
596
597       ws.on :message do |event|
598         d = Oj.strict_load event.data
599         case state
600         when (1..Rails.configuration.websocket_max_filters)
601           assert_equal 200, d["status"]
602           state += 1
603         when (Rails.configuration.websocket_max_filters+1)
604           assert_equal 403, d["status"]
605           ws.close
606         end
607       end
608
609     end
610
611     assert_equal Rails.configuration.websocket_max_filters+1, state
612
613   end
614
615   test "connect, subscribe, lots of events" do
616     slow_test
617     state = 1
618     event_count = 0
619     log_start = Log.order(:id).last.id
620
621     authorize_with :active
622
623     ws_helper :active, false do |ws|
624       EM::Timer.new 45 do
625         # Needs a longer timeout than the default
626         ws.close
627       end
628
629       ws.on :open do |event|
630         ws.send ({method: 'subscribe'}.to_json)
631       end
632
633       ws.on :message do |event|
634         d = Oj.strict_load event.data
635         case state
636         when 1
637           assert_equal 200, d["status"]
638           ActiveRecord::Base.transaction do
639             (1..202).each do
640               spec = Specimen.create
641             end
642           end
643           state = 2
644         when 2
645           event_count += 1
646           assert_equal d['id'], event_count+log_start
647           if event_count == 202
648             ws.close
649           end
650         end
651       end
652
653     end
654
655     assert_equal 202, event_count
656   end
657
658
659   test "connect, subscribe with invalid filter" do
660     state = 1
661     human = nil
662     human_ev_uuid = nil
663
664     authorize_with :active
665
666     ws_helper :active do |ws|
667       ws.on :open do |event|
668         # test that #6451 is fixed (invalid filter crashes websockets)
669         ws.send ({method: 'subscribe', filters: [['object_blarg', 'is_a', 'arvados#human']]}.to_json)
670       end
671
672       ws.on :message do |event|
673         d = Oj.strict_load event.data
674         case state
675         when 1
676           assert_equal 200, d["status"]
677           Specimen.create
678           human = Human.create
679           state = 2
680         when 2
681           assert_equal 500, d["status"]
682           state = 3
683           ws.close
684         when 3
685           assert false, "Should not get any more events"
686         end
687       end
688
689     end
690
691     assert_equal 3, state
692
693     # Try connecting again, ensure that websockets server is still running and
694     # didn't crash per #6451
695     subscribe_test()
696
697   end
698
699
700 end