Merge branch 'master' into 9998-unsigned_manifest
[arvados.git] / services / api / test / integration / websocket_test.rb
1 require 'test_helper'
2 require 'oj'
3 require 'database_cleaner'
4
5 DatabaseCleaner.strategy = :deletion
6
7 class WebsocketTest < ActionDispatch::IntegrationTest
8   self.use_transactional_fixtures = false
9
10   setup do
11     DatabaseCleaner.start
12   end
13
14   teardown do
15     DatabaseCleaner.clean
16   end
17
18   def self.startup
19     s = TCPServer.new('0.0.0.0', 0)
20     @@port = s.addr[1]
21     s.close
22     @@pidfile = "tmp/pids/passenger.#{@@port}.pid"
23     DatabaseCleaner.start
24     Dir.chdir(Rails.root) do |apidir|
25       # Only passenger seems to be able to run the websockets server
26       # successfully.
27       _system('passenger', 'start', '-d',
28               "-p#{@@port}",
29               "--log-file", "/dev/stderr",
30               "--pid-file", @@pidfile)
31       timeout = Time.now.tv_sec + 10
32       begin
33         sleep 0.2
34         begin
35           server_pid = IO.read(@@pidfile).to_i
36           good_pid = (server_pid > 0) and (Process.kill(0, pid) rescue false)
37         rescue Errno::ENOENT
38           good_pid = false
39         end
40       end while (not good_pid) and (Time.now.tv_sec < timeout)
41       if not good_pid
42         raise RuntimeError, "could not find API server Rails pid"
43       end
44       STDERR.puts "Started websocket server on port #{@@port} with pid #{server_pid}"
45     end
46   end
47
48   def self.shutdown
49     Dir.chdir(Rails.root) do
50       _system('passenger', 'stop', "-p#{@@port}",
51               "--pid-file", @@pidfile)
52     end
53     # DatabaseCleaner leaves the database empty. Prefer to leave it full.
54     dc = DatabaseController.new
55     dc.define_singleton_method :render do |*args| end
56     dc.reset
57   end
58
59   def self._system(*cmd)
60     Bundler.with_clean_env do
61       env = {
62         'ARVADOS_WEBSOCKETS' => 'ws-only',
63         'RAILS_ENV' => 'test',
64       }
65       if not system(env, *cmd)
66         raise RuntimeError, "Command exited #{$?}: #{cmd.inspect}"
67       end
68     end
69   end
70
71   def ws_helper(token: nil, timeout: 8)
72     opened = false
73     close_status = nil
74     too_long = false
75
76     EM.run do
77       if token
78         ws = Faye::WebSocket::Client.new("ws://localhost:#{@@port}/websocket?api_token=#{api_client_authorizations(token).api_token}")
79       else
80         ws = Faye::WebSocket::Client.new("ws://localhost:#{@@port}/websocket")
81       end
82
83       ws.on :open do |event|
84         opened = true
85         if timeout
86           EM::Timer.new(timeout) do
87             too_long = true if close_status.nil?
88             EM.stop_event_loop
89           end
90         end
91       end
92
93       ws.on :error do |event|
94         STDERR.puts "websocket client error: #{event.inspect}"
95       end
96
97       ws.on :close do |event|
98         close_status = [:close, event.code, event.reason]
99         EM.stop_event_loop
100       end
101
102       yield ws
103     end
104
105     assert opened, "Should have opened web socket"
106     assert (not too_long), "Test took too long"
107     assert_equal 1000, close_status[1], "Connection closed unexpectedly (check log for errors)"
108   end
109
110   test "connect with no token" do
111     status = nil
112
113     ws_helper do |ws|
114       ws.on :message do |event|
115         d = Oj.strict_load event.data
116         status = d["status"]
117         ws.close
118       end
119     end
120
121     assert_equal 401, status
122   end
123
124   test "connect, subscribe and get response" do
125     status = nil
126
127     ws_helper(token: :active) do |ws|
128       ws.on :open do |event|
129         ws.send ({method: 'subscribe'}.to_json)
130       end
131
132       ws.on :message do |event|
133         d = Oj.strict_load event.data
134         status = d["status"]
135         ws.close
136       end
137     end
138
139     assert_equal 200, status
140   end
141
142   def subscribe_test
143     state = 1
144     spec = nil
145     ev_uuid = nil
146
147     authorize_with :active
148
149     ws_helper(token: :active) do |ws|
150       ws.on :open do |event|
151         ws.send ({method: 'subscribe'}.to_json)
152       end
153
154       ws.on :message do |event|
155         d = Oj.strict_load event.data
156         case state
157         when 1
158           assert_equal 200, d["status"]
159           spec = Specimen.create
160           state = 2
161         when 2
162           ev_uuid = d["object_uuid"]
163           ws.close
164         end
165       end
166
167     end
168
169     assert_not_nil spec
170     assert_equal spec.uuid, ev_uuid
171   end
172
173   test "connect, subscribe, get event" do
174     subscribe_test()
175   end
176
177   test "connect, subscribe, get two events" do
178     state = 1
179     spec = nil
180     human = nil
181     spec_ev_uuid = nil
182     human_ev_uuid = nil
183
184     authorize_with :active
185
186     ws_helper(token: :active) do |ws|
187       ws.on :open do |event|
188         ws.send ({method: 'subscribe'}.to_json)
189       end
190
191       ws.on :message do |event|
192         d = Oj.strict_load event.data
193         case state
194         when 1
195           assert_equal 200, d["status"]
196           spec = Specimen.create
197           human = Human.create
198           state = 2
199         when 2
200           spec_ev_uuid = d["object_uuid"]
201           state = 3
202         when 3
203           human_ev_uuid = d["object_uuid"]
204           state = 4
205           ws.close
206         when 4
207           assert false, "Should not get any more events"
208         end
209       end
210
211     end
212
213     assert_not_nil spec
214     assert_not_nil human
215     assert_equal spec.uuid, spec_ev_uuid
216     assert_equal human.uuid, human_ev_uuid
217   end
218
219   test "connect, subscribe, filter events" do
220     state = 1
221     human = nil
222     human_ev_uuid = nil
223
224     authorize_with :active
225
226     ws_helper(token: :active) do |ws|
227       ws.on :open do |event|
228         ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
229       end
230
231       ws.on :message do |event|
232         d = Oj.strict_load event.data
233         case state
234         when 1
235           assert_equal 200, d["status"]
236           Specimen.create
237           human = Human.create
238           state = 2
239         when 2
240           human_ev_uuid = d["object_uuid"]
241           state = 3
242           ws.close
243         when 3
244           assert false, "Should not get any more events"
245         end
246       end
247
248     end
249
250     assert_not_nil human
251     assert_equal human.uuid, human_ev_uuid
252   end
253
254
255   test "connect, subscribe, multiple filters" do
256     state = 1
257     spec = nil
258     human = nil
259     spec_ev_uuid = nil
260     human_ev_uuid = nil
261
262     authorize_with :active
263
264     ws_helper(token: :active) do |ws|
265       ws.on :open do |event|
266         ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
267         ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#specimen']]}.to_json)
268       end
269
270       ws.on :message do |event|
271         d = Oj.strict_load event.data
272         case state
273         when 1
274           assert_equal 200, d["status"]
275           state = 2
276         when 2
277           assert_equal 200, d["status"]
278           spec = Specimen.create
279           Trait.create # not part of filters, should not be received
280           human = Human.create
281           state = 3
282         when 3
283           spec_ev_uuid = d["object_uuid"]
284           state = 4
285         when 4
286           human_ev_uuid = d["object_uuid"]
287           state = 5
288           ws.close
289         when 5
290           assert false, "Should not get any more events"
291         end
292       end
293
294     end
295
296     assert_not_nil spec
297     assert_not_nil human
298     assert_equal spec.uuid, spec_ev_uuid
299     assert_equal human.uuid, human_ev_uuid
300   end
301
302
303   test "connect, subscribe, compound filter" do
304     state = 1
305     t1 = nil
306
307     authorize_with :active
308
309     ws_helper(token: :active) do |ws|
310       ws.on :open do |event|
311         ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#trait'], ['event_type', '=', 'update']]}.to_json)
312       end
313
314       ws.on :message do |event|
315         d = Oj.strict_load event.data
316         case state
317         when 1
318           assert_equal 200, d["status"]
319           t1 = Trait.create("name" => "foo")
320           t1.name = "bar"
321           t1.save!
322           state = 2
323          when 2
324           assert_equal 'update', d['event_type']
325           state = 3
326           ws.close
327         when 3
328           assert false, "Should not get any more events"
329         end
330       end
331
332     end
333
334     assert_equal 3, state
335     assert_not_nil t1
336   end
337
338   test "connect, subscribe, ask events starting at seq num" do
339     state = 1
340
341     authorize_with :active
342
343     lastid = logs(:admin_changes_specimen).id
344     l1 = nil
345     l2 = nil
346
347     ws_helper(token: :active) do |ws|
348       ws.on :open do |event|
349         ws.send ({method: 'subscribe', last_log_id: lastid}.to_json)
350       end
351
352       ws.on :message do |event|
353         d = Oj.strict_load event.data
354         case state
355         when 1
356           assert_equal 200, d["status"]
357           state = 2
358         when 2
359           l1 = d["object_uuid"]
360           assert_not_nil l1, "Unexpected message: #{d}"
361           state = 3
362         when 3
363           l2 = d["object_uuid"]
364           assert_not_nil l2, "Unexpected message: #{d}"
365           state = 4
366           ws.close
367         when 4
368           assert false, "Should not get any more events"
369         end
370       end
371     end
372
373     expect_next_logs = Log.where('id > ?', lastid).order('id asc')
374     assert_equal expect_next_logs[0].object_uuid, l1
375     assert_equal expect_next_logs[1].object_uuid, l2
376   end
377
378   slow_test "connect, subscribe, get event, unsubscribe" do
379     state = 1
380     spec = nil
381     spec_ev_uuid = nil
382
383     authorize_with :active
384
385     ws_helper(token: :active, timeout: false) do |ws|
386       ws.on :open do |event|
387         ws.send ({method: 'subscribe'}.to_json)
388         EM::Timer.new 3 do
389           # Set a time limit on the test because after unsubscribing the server
390           # still has to process the next event (and then hopefully correctly
391           # decides not to send it because we unsubscribed.)
392           ws.close
393         end
394       end
395
396       ws.on :message do |event|
397         d = Oj.strict_load event.data
398         case state
399         when 1
400           assert_equal 200, d["status"]
401           spec = Specimen.create
402           state = 2
403         when 2
404           spec_ev_uuid = d["object_uuid"]
405           ws.send ({method: 'unsubscribe'}.to_json)
406
407           EM::Timer.new 1 do
408             Specimen.create
409           end
410
411           state = 3
412         when 3
413           assert_equal 200, d["status"]
414           state = 4
415         when 4
416           assert false, "Should not get any more events"
417         end
418       end
419
420     end
421
422     assert_not_nil spec
423     assert_equal spec.uuid, spec_ev_uuid
424   end
425
426   slow_test "connect, subscribe, get event, unsubscribe with filter" do
427     state = 1
428     spec = nil
429     spec_ev_uuid = nil
430
431     authorize_with :active
432
433     ws_helper(token: :active, timeout: false) do |ws|
434       ws.on :open do |event|
435         ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
436         EM::Timer.new 6 do
437           # Set a time limit on the test because after unsubscribing the server
438           # still has to process the next event (and then hopefully correctly
439           # decides not to send it because we unsubscribed.)
440           ws.close
441         end
442       end
443
444       ws.on :message do |event|
445         d = Oj.strict_load event.data
446         case state
447         when 1
448           assert_equal 200, d["status"]
449           spec = Human.create
450           state = 2
451         when 2
452           spec_ev_uuid = d["object_uuid"]
453           ws.send ({method: 'unsubscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
454
455           EM::Timer.new 1 do
456             Human.create
457           end
458
459           state = 3
460         when 3
461           assert_equal 200, d["status"]
462           state = 4
463         when 4
464           assert false, "Should not get any more events"
465         end
466       end
467
468     end
469
470     assert_not_nil spec
471     assert_equal spec.uuid, spec_ev_uuid
472   end
473
474
475   slow_test "connect, subscribe, get event, try to unsubscribe with bogus filter" do
476     state = 1
477     spec = nil
478     spec_ev_uuid = nil
479     human = nil
480     human_ev_uuid = nil
481
482     authorize_with :active
483
484     ws_helper(token: :active) do |ws|
485       ws.on :open do |event|
486         ws.send ({method: 'subscribe'}.to_json)
487       end
488
489       ws.on :message do |event|
490         d = Oj.strict_load event.data
491         case state
492         when 1
493           assert_equal 200, d["status"]
494           spec = Specimen.create
495           state = 2
496         when 2
497           spec_ev_uuid = d["object_uuid"]
498           ws.send ({method: 'unsubscribe', filters: [['foo', 'bar', 'baz']]}.to_json)
499
500           EM::Timer.new 1 do
501             human = Human.create
502           end
503
504           state = 3
505         when 3
506           assert_equal 404, d["status"]
507           state = 4
508         when 4
509           human_ev_uuid = d["object_uuid"]
510           state = 5
511           ws.close
512         when 5
513           assert false, "Should not get any more events"
514         end
515       end
516
517     end
518
519     assert_not_nil spec
520     assert_not_nil human
521     assert_equal spec.uuid, spec_ev_uuid
522     assert_equal human.uuid, human_ev_uuid
523   end
524
525   slow_test "connected, not subscribed, no event" do
526     authorize_with :active
527
528     ws_helper(token: :active, timeout: false) do |ws|
529       ws.on :open do |event|
530         EM::Timer.new 1 do
531           Specimen.create
532         end
533
534         EM::Timer.new 3 do
535           ws.close
536         end
537       end
538
539       ws.on :message do |event|
540         assert false, "Should not get any messages, message was #{event.data}"
541       end
542     end
543   end
544
545   slow_test "connected, not authorized to see event" do
546     state = 1
547
548     authorize_with :admin
549
550     ws_helper(token: :active, timeout: false) do |ws|
551       ws.on :open do |event|
552         ws.send ({method: 'subscribe'}.to_json)
553
554         EM::Timer.new 3 do
555           ws.close
556         end
557       end
558
559       ws.on :message do |event|
560         d = Oj.strict_load event.data
561         case state
562         when 1
563           assert_equal 200, d["status"]
564           Specimen.create
565           state = 2
566         when 2
567           assert false, "Should not get any messages, message was #{event.data}"
568         end
569       end
570
571     end
572
573   end
574
575   test "connect, try bogus method" do
576     status = nil
577
578     ws_helper(token: :active) do |ws|
579       ws.on :open do |event|
580         ws.send ({method: 'frobnabble'}.to_json)
581       end
582
583       ws.on :message do |event|
584         d = Oj.strict_load event.data
585         status = d["status"]
586         ws.close
587       end
588     end
589
590     assert_equal 400, status
591   end
592
593   test "connect, missing method" do
594     status = nil
595
596     ws_helper(token: :active) do |ws|
597       ws.on :open do |event|
598         ws.send ({fizzbuzz: 'frobnabble'}.to_json)
599       end
600
601       ws.on :message do |event|
602         d = Oj.strict_load event.data
603         status = d["status"]
604         ws.close
605       end
606     end
607
608     assert_equal 400, status
609   end
610
611   test "connect, send malformed request" do
612     status = nil
613
614     ws_helper(token: :active) do |ws|
615       ws.on :open do |event|
616         ws.send '<XML4EVER></XML4EVER>'
617       end
618
619       ws.on :message do |event|
620         d = Oj.strict_load event.data
621         status = d["status"]
622         ws.close
623       end
624     end
625
626     assert_equal 400, status
627   end
628
629
630   test "connect, try subscribe too many filters" do
631     state = 1
632
633     authorize_with :active
634
635     ws_helper(token: :active) do |ws|
636       ws.on :open do |event|
637         (1..17).each do |i|
638           ws.send ({method: 'subscribe', filters: [['object_uuid', '=', i]]}.to_json)
639         end
640       end
641
642       ws.on :message do |event|
643         d = Oj.strict_load event.data
644         case state
645         when (1..Rails.configuration.websocket_max_filters)
646           assert_equal 200, d["status"]
647           state += 1
648         when (Rails.configuration.websocket_max_filters+1)
649           assert_equal 403, d["status"]
650           ws.close
651         end
652       end
653
654     end
655
656     assert_equal Rails.configuration.websocket_max_filters+1, state
657
658   end
659
660   slow_test "connect, subscribe, lots of events" do
661     state = 1
662     event_count = 0
663     log_start = Log.order(:id).last.id
664
665     authorize_with :active
666
667     ws_helper(token: :active, timeout: false) do |ws|
668       EM::Timer.new 45 do
669         # Needs a longer timeout than the default
670         ws.close
671       end
672
673       ws.on :open do |event|
674         ws.send ({method: 'subscribe'}.to_json)
675       end
676
677       ws.on :message do |event|
678         d = Oj.strict_load event.data
679         case state
680         when 1
681           assert_equal 200, d["status"]
682           ActiveRecord::Base.transaction do
683             (1..202).each do
684               Specimen.create
685             end
686           end
687           state = 2
688         when 2
689           event_count += 1
690           assert_equal d['id'], event_count+log_start
691           if event_count == 202
692             ws.close
693           end
694         end
695       end
696
697     end
698
699     assert_equal 202, event_count
700   end
701
702
703   test "connect, subscribe with invalid filter" do
704     state = 1
705
706     authorize_with :active
707
708     ws_helper(token: :active) do |ws|
709       ws.on :open do |event|
710         # test that #6451 is fixed (invalid filter crashes websockets)
711         ws.send ({method: 'subscribe', filters: [['object_blarg', 'is_a', 'arvados#human']]}.to_json)
712       end
713
714       ws.on :message do |event|
715         d = Oj.strict_load event.data
716         case state
717         when 1
718           assert_equal 200, d["status"]
719           Specimen.create
720           Human.create
721           state = 2
722         when 2
723           assert_equal 500, d["status"]
724           state = 3
725           ws.close
726         when 3
727           assert false, "Should not get any more events"
728         end
729       end
730
731     end
732
733     assert_equal 3, state
734
735     # Try connecting again, ensure that websockets server is still running and
736     # didn't crash per #6451
737     subscribe_test()
738
739   end
740
741
742 end