self.assertEqual(200, events.get(True, 5)['status'])
human = arvados.api('v1').humans().create(body={}).execute()
- log_object_uuids = []
- for i in range(0, expected):
- log_object_uuids.append(events.get(True, 5)['object_uuid'])
-
+ want_uuids = []
if expected > 0:
- self.assertIn(human['uuid'], log_object_uuids)
-
+ want_uuids.append(human['uuid'])
if expected > 1:
- self.assertIn(ancestor['uuid'], log_object_uuids)
+ want_uuids.append(ancestor['uuid'])
+ log_object_uuids = []
+ while set(want_uuids) - set(log_object_uuids):
+ log_object_uuids.append(events.get(True, 5)['object_uuid'])
- with self.assertRaises(Queue.Empty):
- # assertEqual just serves to show us what unexpected thing
- # comes out of the queue when the assertRaises fails; when
- # the test passes, this assertEqual doesn't get called.
- self.assertEqual(events.get(True, 2), None)
+ if expected < 2:
+ with self.assertRaises(Queue.Empty):
+ # assertEqual just serves to show us what unexpected
+ # thing comes out of the queue when the assertRaises
+ # fails; when the test passes, this assertEqual
+ # doesn't get called.
+ self.assertEqual(events.get(True, 2), None)
def test_subscribe_websocket(self):
self._test_subscribe(
return time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(t)) + self.isotz(-time.timezone/60)
def isotz(self, offset):
- """Convert minutes-east-of-UTC to ISO8601 time zone designator"""
- return '{:+03d}{:02d}'.format(offset/60, offset%60)
+ """Convert minutes-east-of-UTC to RFC3339- and ISO-compatible time zone designator"""
+ return '{:+03d}:{:02d}'.format(offset/60, offset%60)
# Test websocket reconnection on (un)execpted close
def _test_websocket_reconnect(self, close_unexpected):
def __init__(self):
self.logs = []
self.lock = threading.Lock()
+ self.api_called = threading.Event()
def add(self, log):
with self.lock:
self.logs.append(log)
def return_list(self, num_retries=None):
+ self.api_called.set()
+ args, kwargs = self.list_func.call_args_list[-1]
+ filters = kwargs.get('filters', [])
+ if not any(True for f in filters if f[0] == 'id' and f[1] == '>'):
+ # No 'id' filter was given -- this must be the probe
+ # to determine the most recent id.
+ return {'items': [{'id': 1}], 'items_available': 1}
with self.lock:
retval = self.logs
self.logs = []
self.logs = self.MockLogs()
self.arv = mock.MagicMock(name='arvados.api()')
self.arv.logs().list().execute.side_effect = self.logs.return_list
- self.callback_called = threading.Event()
+ # our MockLogs object's "execute" stub will need to inspect
+ # the call history to determine X in
+ # ....logs().list(filters=X).execute():
+ self.logs.list_func = self.arv.logs().list
+ self.status_ok = threading.Event()
+ self.event_received = threading.Event()
self.recv_events = []
def tearDown(self):
self.client.close(timeout=None)
def callback(self, event):
- self.recv_events.append(event)
- self.callback_called.set()
+ if event.get('status') == 200:
+ self.status_ok.set()
+ else:
+ self.recv_events.append(event)
+ self.event_received.set()
def build_client(self, filters=None, callback=None, last_log_id=None, poll_time=99):
if filters is None:
self.logs.add({'id': 123})
self.build_client(poll_time=.01)
self.client.start()
- self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT))
- self.callback_called.clear()
+ self.assertTrue(self.status_ok.wait(self.TEST_TIMEOUT))
+ self.assertTrue(self.event_received.wait(self.TEST_TIMEOUT))
+ self.event_received.clear()
self.logs.add(test_log.copy())
- self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT))
- self.client.close(timeout=None)
+ self.assertTrue(self.event_received.wait(self.TEST_TIMEOUT))
self.assertIn(test_log, self.recv_events)
def test_subscribe(self):
client_filter = ['kind', '=', 'arvados#test']
self.build_client()
+ self.client.unsubscribe([])
self.client.subscribe([client_filter[:]])
self.client.start()
- self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT))
- self.client.close(timeout=None)
+ self.assertTrue(self.status_ok.wait(self.TEST_TIMEOUT))
+ self.assertTrue(self.logs.api_called.wait(self.TEST_TIMEOUT))
self.assertTrue(self.was_filter_used(client_filter))
def test_unsubscribe(self):
- client_filter = ['kind', '=', 'arvados#test']
- self.build_client()
- self.client.subscribe([client_filter[:]])
- self.client.unsubscribe([client_filter[:]])
+ should_filter = ['foo', '=', 'foo']
+ should_not_filter = ['foo', '=', 'bar']
+ self.build_client(poll_time=0.01)
+ self.client.unsubscribe([])
+ self.client.subscribe([should_not_filter[:]])
+ self.client.subscribe([should_filter[:]])
+ self.client.unsubscribe([should_not_filter[:]])
self.client.start()
- self.client.close(timeout=None)
- self.assertFalse(self.was_filter_used(client_filter))
+ self.logs.add({'id': 123})
+ self.assertTrue(self.status_ok.wait(self.TEST_TIMEOUT))
+ self.assertTrue(self.event_received.wait(self.TEST_TIMEOUT))
+ self.assertTrue(self.was_filter_used(should_filter))
+ self.assertFalse(self.was_filter_used(should_not_filter))
def test_run_forever(self):
self.build_client()
self.client.start()
forever_thread = threading.Thread(target=self.client.run_forever)
forever_thread.start()
- self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT))
+ self.assertTrue(self.status_ok.wait(self.TEST_TIMEOUT))
self.assertTrue(forever_thread.is_alive())
self.client.close()
forever_thread.join()
+ del self.client