8 from datetime import datetime, timedelta
11 class WebsocketTest(run_test_server.TestCaseWithServers):
20 super(WebsocketTest, self).tearDown()
def _test_subscribe(self, poll_fallback, expect_type, last_log_id=None, additional_filters=None, expected=1):
    """Subscribe to events about 'human' objects and check what is delivered.

    One human object is created before subscribing (the "ancestor") and
    one after.  Events are pushed onto a local queue by the subscription
    callback and inspected here.

    Arguments:
    poll_fallback -- passed straight through to arvados.events.subscribe().
    expect_type -- class the object returned by subscribe() must be an
        instance of.
    last_log_id -- passed straight through to subscribe().  When None,
        only the event for the object created after subscribing is
        expected.
    additional_filters -- optional list of filter clauses appended to the
        base ['object_uuid', 'is_a', 'arvados#human'] filter.
    expected -- when 0, the single-event branch is used even if
        last_log_id is set; callers in this file wrap such calls in
        assertRaises(Queue.Empty).

    Raises Queue.Empty when the status message, or the single expected
    event, does not arrive within 10 seconds.
    """
    run_test_server.authorize_with('active')
    events = Queue.Queue(100)

    # Build a new list rather than extending in place so the caller's
    # additional_filters list is never mutated.
    filters = [['object_uuid', 'is_a', 'arvados#human']]
    if additional_filters:
        filters = filters + additional_filters

    # Create an extra object before subscribing; the multi-event branch
    # below verifies that its event is delivered as well.
    ancestor = arvados.api('v1').humans().create(body={}).execute()

    self.ws = arvados.events.subscribe(
        arvados.api('v1'), filters,
        events.put, poll_fallback=poll_fallback, last_log_id=last_log_id)
    self.assertIsInstance(self.ws, expect_type)

    # The first queued message must carry status 200 (presumably the
    # subscription acknowledgement -- confirm against the events API).
    self.assertEqual(200, events.get(True, 10)['status'])
    human = arvados.api('v1').humans().create(body={}).execute()

    if last_log_id is None or expected == 0:
        # Exactly one event is expected: the object created after
        # subscribing.  If nothing arrives, get() raises Queue.Empty.
        self.assertEqual(human['uuid'], events.get(True, 10)['object_uuid'])
        self.assertTrue(events.empty(), "got more events than expected")
    else:
        # NOTE(review): the else/try/except scaffolding in this branch is
        # reconstructed -- the visible source lacks the lines that
        # initialise log_events and end the loop.  Confirm against the
        # full file.
        log_events = []
        for _ in range(10):
            try:
                event = events.get(True, 10)
            except Queue.Empty:
                # Nothing more arrived within the timeout: stop collecting.
                break
            self.assertIsNotNone(event['object_uuid'])
            log_events.append(event['object_uuid'])

        # Both the pre-subscription and post-subscription objects must
        # show up among the collected events.
        self.assertGreater(len(log_events), 1)
        self.assertIn(human['uuid'], log_events)
        self.assertIn(ancestor['uuid'], log_events)
def test_subscribe_websocket(self):
    """Subscribe with poll fallback disabled and expect an EventClient."""
    self._test_subscribe(
        expect_type=arvados.events.EventClient,
        poll_fallback=False)
def test_subscribe_websocket_with_start_time_today(self):
    """Subscribe with last_log_id=1 and a created_at >= today's date filter."""
    # NOTE(review): datetime.today() is naive local time; this presumes the
    # server interprets zoneless timestamps compatibly -- confirm.
    today = datetime.today().strftime('%Y-%m-%d')
    created_since_today = ['created_at', '>=', today]
    self._test_subscribe(
        poll_fallback=False,
        expect_type=arvados.events.EventClient,
        last_log_id=1,
        additional_filters=[created_since_today])
def test_subscribe_websocket_with_start_time_last_hour(self):
    """Subscribe with last_log_id=1 and a created_at filter one hour in the past."""
    # NOTE(review): naive local time; presumes the server interprets
    # zoneless timestamps compatibly -- confirm.
    one_hour_ago = datetime.today() - timedelta(hours=1)
    created_since = ['created_at', '>=', one_hour_ago.strftime('%Y-%m-%d %H:%M:%S')]
    self._test_subscribe(
        poll_fallback=False,
        expect_type=arvados.events.EventClient,
        last_log_id=1,
        additional_filters=[created_since])
def test_subscribe_websocket_with_start_time_next_hour(self):
    """With a created_at filter one hour ahead, the helper must raise Queue.Empty."""
    # NOTE(review): naive local time; presumes the server interprets
    # zoneless timestamps compatibly -- confirm.
    one_hour_ahead = datetime.today() + timedelta(hours=1)
    created_since = ['created_at', '>=', one_hour_ahead.strftime('%Y-%m-%d %H:%M:%S')]
    with self.assertRaises(Queue.Empty):
        self._test_subscribe(
            poll_fallback=False,
            expect_type=arvados.events.EventClient,
            last_log_id=1,
            additional_filters=[created_since],
            expected=0)
def test_subscribe_websocket_with_start_time_tomorrow(self):
    """With a created_at filter on tomorrow's date, the helper must raise Queue.Empty."""
    # NOTE(review): naive local time; presumes the server interprets
    # zoneless timestamps compatibly -- confirm.
    next_day = datetime.today() + timedelta(days=1)
    created_since = ['created_at', '>=', next_day.strftime('%Y-%m-%d')]
    with self.assertRaises(Queue.Empty):
        self._test_subscribe(
            poll_fallback=False,
            expect_type=arvados.events.EventClient,
            last_log_id=1,
            additional_filters=[created_since],
            expected=0)
@mock.patch('arvados.events.EventClient.__init__')
def test_subscribe_poll(self, event_client_constr):
    """With EventClient construction failing, subscribe() must yield a PollClient."""
    # Every attempt to construct an EventClient raises, so a successful
    # subscription cannot be an EventClient (presumably exercising the
    # poll fallback path -- confirm against arvados.events.subscribe).
    event_client_constr.side_effect = Exception('All is well')
    self._test_subscribe(
        expect_type=arvados.events.PollClient,
        poll_fallback=1)