additional_filters=[['created_at', '>=', lastHour.strftime('%Y-%m-%d')]])
@mock.patch('arvados.events.EventClient.__init__')
- def test_poll_websocket_with_start_time_date_only(self, event_client_constr):
+ def test_poll_with_start_time_date_only(self, event_client_constr):
+ event_client_constr.side_effect = Exception('All is well')
lastHour = datetime.today() - timedelta(hours = 1)
self._test_subscribe(
poll_fallback=1, expect_type=arvados.events.PollClient, last_log_id=1,
@mock.patch('arvados.events.EventClient.__init__')
def test_subscribe_poll_with_start_time_last_hour(self, event_client_constr):
+ event_client_constr.side_effect = Exception('All is well')
lastHour = datetime.today() - timedelta(hours = 1)
self._test_subscribe(
poll_fallback=1, expect_type=arvados.events.PollClient, last_log_id=1,
@mock.patch('arvados.events.EventClient.__init__')
def test_subscribe_poll_with_start_time_next_hour(self, event_client_constr):
+ event_client_constr.side_effect = Exception('All is well')
nextHour = datetime.today() + timedelta(hours = 1)
with self.assertRaises(Queue.Empty):
self._test_subscribe(
@mock.patch('arvados.events.EventClient.__init__')
def test_subscribe_poll_with_start_time_tomorrow(self, event_client_constr):
+ event_client_constr.side_effect = Exception('All is well')
tomorrow = datetime.today() + timedelta(hours = 24)
with self.assertRaises(Queue.Empty):
self._test_subscribe(