Merge branch 'master' into 10858-build-packages-improvements
[arvados.git] / sdk / python / tests / test_events.py
index 7e8c84ec11279495d55fd47770378847886ae76e..88e528362143b04b1477aef0b4af3c2af1d76b0a 100644 (file)
@@ -291,12 +291,20 @@ class PollClientTestCase(unittest.TestCase):
         def __init__(self):
             self.logs = []
             self.lock = threading.Lock()
+            self.api_called = threading.Event()  # set by return_list() on every call
 
         def add(self, log):
             with self.lock:
                 self.logs.append(log)
 
         def return_list(self, num_retries=None):
+            self.api_called.set()
+            args, kwargs = self.list_func.call_args_list[-1]
+            filters = kwargs.get('filters', [])
+            if not any(True for f in filters if f[0] == 'id' and f[1] == '>'):
+                # No 'id' filter was given -- this must be the probe
+                # to determine the most recent id.
+                return {'items': [{'id': 1}], 'items_available': 1}
             with self.lock:
                 retval = self.logs
                 self.logs = []
@@ -306,7 +314,12 @@ class PollClientTestCase(unittest.TestCase):
         self.logs = self.MockLogs()
         self.arv = mock.MagicMock(name='arvados.api()')
         self.arv.logs().list().execute.side_effect = self.logs.return_list
-        self.callback_called = threading.Event()
+        # Our MockLogs object's "execute" stub needs to inspect the
+        # mock's call history to determine X in
+        # ...logs().list(filters=X).execute():
+        self.logs.list_func = self.arv.logs().list
+        self.status_ok = threading.Event()
+        self.event_received = threading.Event()
         self.recv_events = []
 
     def tearDown(self):
@@ -314,8 +327,11 @@ class PollClientTestCase(unittest.TestCase):
             self.client.close(timeout=None)
 
     def callback(self, event):
-        self.recv_events.append(event)
-        self.callback_called.set()
+        if event.get('status') == 200:  # status message: flag it, don't record it
+            self.status_ok.set()
+        else:
+            self.recv_events.append(event)  # everything else is kept for assertions
+            self.event_received.set()
 
     def build_client(self, filters=None, callback=None, last_log_id=None, poll_time=99):
         if filters is None:
@@ -334,37 +350,45 @@ class PollClientTestCase(unittest.TestCase):
         self.logs.add({'id': 123})
         self.build_client(poll_time=.01)
         self.client.start()
-        self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT))
-        self.callback_called.clear()
+        self.assertTrue(self.status_ok.wait(self.TEST_TIMEOUT))
+        self.assertTrue(self.event_received.wait(self.TEST_TIMEOUT))
+        self.event_received.clear()
         self.logs.add(test_log.copy())
-        self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT))
-        self.client.close(timeout=None)
+        self.assertTrue(self.event_received.wait(self.TEST_TIMEOUT))
         self.assertIn(test_log, self.recv_events)
 
     def test_subscribe(self):
         client_filter = ['kind', '=', 'arvados#test']
         self.build_client()
+        self.client.unsubscribe([])  # NOTE(review): presumably drops build_client's default filter -- confirm
         self.client.subscribe([client_filter[:]])
         self.client.start()
-        self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT))
-        self.client.close(timeout=None)
+        self.assertTrue(self.status_ok.wait(self.TEST_TIMEOUT))  # callback() saw a status-200 event
+        self.assertTrue(self.logs.api_called.wait(self.TEST_TIMEOUT))  # mocked execute() has run
         self.assertTrue(self.was_filter_used(client_filter))
 
     def test_unsubscribe(self):
-        client_filter = ['kind', '=', 'arvados#test']
-        self.build_client()
-        self.client.subscribe([client_filter[:]])
-        self.client.unsubscribe([client_filter[:]])
+        should_filter = ['foo', '=', 'foo']  # subscribed and kept
+        should_not_filter = ['foo', '=', 'bar']  # subscribed, then unsubscribed before start()
+        self.build_client(poll_time=0.01)
+        self.client.unsubscribe([])  # NOTE(review): presumably drops build_client's default filter -- confirm
+        self.client.subscribe([should_not_filter[:]])
+        self.client.subscribe([should_filter[:]])
+        self.client.unsubscribe([should_not_filter[:]])
         self.client.start()
-        self.client.close(timeout=None)
-        self.assertFalse(self.was_filter_used(client_filter))
+        self.logs.add({'id': 123})  # queue a log entry in the mock
+        self.assertTrue(self.status_ok.wait(self.TEST_TIMEOUT))  # callback() saw a status-200 event
+        self.assertTrue(self.event_received.wait(self.TEST_TIMEOUT))  # callback() saw a non-status event
+        self.assertTrue(self.was_filter_used(should_filter))
+        self.assertFalse(self.was_filter_used(should_not_filter))
 
     def test_run_forever(self):
         self.build_client()
         self.client.start()
         forever_thread = threading.Thread(target=self.client.run_forever)
         forever_thread.start()
-        self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT))
+        self.assertTrue(self.status_ok.wait(self.TEST_TIMEOUT))  # callback() saw a status-200 event
         self.assertTrue(forever_thread.is_alive())
         self.client.close()
         forever_thread.join()
+        del self.client  # NOTE(review): presumably stops tearDown() closing the client again -- confirm