Merge branch '8931-event-thread-catch-exceptions' closes #8931
author Peter Amstutz <peter.amstutz@curoverse.com>
Tue, 26 Apr 2016 17:00:25 +0000 (13:00 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Tue, 26 Apr 2016 17:00:25 +0000 (13:00 -0400)
sdk/python/arvados/events.py
sdk/python/arvados/retry.py
sdk/python/tests/test_retry.py
sdk/python/tests/test_websockets.py

index 79960c43bf559161038eb4006662ecbc6314c113..d88897f1234329b5294bedb1da4e4104f9720b4e 100644 (file)
@@ -1,9 +1,11 @@
 import arvados
 import config
 import errors
+from retry import RetryLoop
 
 import logging
 import json
+import thread
 import threading
 import time
 import os
@@ -105,19 +107,27 @@ class EventClient(object):
     def on_event(self, m):
         if m.get('id') != None:
             self.last_log_id = m.get('id')
-        self.on_event_cb(m)
+        try:
+            self.on_event_cb(m)
+        except Exception as e:
+            _logger.exception("Unexpected exception from event callback.")
+            thread.interrupt_main()
 
     def on_closed(self):
         if self.is_closed == False:
             _logger.warn("Unexpected close. Reconnecting.")
-            self.ec = _EventClient(self.url, self.filters, self.on_event, self.last_log_id, self.on_closed)
-            while True:
-              try:
-                  self.ec.connect()
-                  break
-              except Exception as e:
-                  _logger.warn("Error '%s' during websocket reconnect. Will retry after 5s.", e, exc_info=e)
-                  time.sleep(5)
+            for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
+                try:
+                    self.ec = _EventClient(self.url, self.filters, self.on_event, self.last_log_id, self.on_closed)
+                    self.ec.connect()
+                    break
+                except Exception as e:
+                    _logger.warn("Error '%s' during websocket reconnect.", e)
+            if tries_left == 0:
+                _logger.exception("EventClient thread could not contact websocket server.")
+                self.is_closed = True
+                thread.interrupt_main()
+                return
 
 
 class PollClient(threading.Thread):
@@ -141,7 +151,21 @@ class PollClient(threading.Thread):
             self.id = self.last_log_id
         else:
             for f in self.filters:
-                items = self.api.logs().list(limit=1, order="id desc", filters=f).execute()['items']
+                for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
+                    try:
+                        items = self.api.logs().list(limit=1, order="id desc", filters=f).execute()['items']
+                        break
+                    except errors.ApiError as error:
+                        pass
+                    else:
+                        tries_left = 0
+                        break
+                if tries_left == 0:
+                    _logger.exception("PollClient thread could not contact API server.")
+                    with self._closing_lock:
+                        self._closing.set()
+                    thread.interrupt_main()
+                    return
                 if items:
                     if items[0]['id'] > self.id:
                         self.id = items[0]['id']
@@ -152,14 +176,32 @@ class PollClient(threading.Thread):
             max_id = self.id
             moreitems = False
             for f in self.filters:
-                items = self.api.logs().list(order="id asc", filters=f+[["id", ">", str(self.id)]]).execute()
+                for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
+                    try:
+                        items = self.api.logs().list(order="id asc", filters=f+[["id", ">", str(self.id)]]).execute()
+                        break
+                    except errors.ApiError as error:
+                        pass
+                    else:
+                        tries_left = 0
+                        break
+                if tries_left == 0:
+                    _logger.exception("PollClient thread could not contact API server.")
+                    with self._closing_lock:
+                        self._closing.set()
+                    thread.interrupt_main()
+                    return
                 for i in items["items"]:
                     if i['id'] > max_id:
                         max_id = i['id']
                     with self._closing_lock:
                         if self._closing.is_set():
                             return
-                        self.on_event(i)
+                        try:
+                            self.on_event(i)
+                        except Exception as e:
+                            _logger.exception("Unexpected exception from event callback.")
+                            thread.interrupt_main()
                 if items["items_available"] > len(items["items"]):
                     moreitems = True
             self.id = max_id
index d8f5317d2c4c160c833339929302edaae679d6f5..5ba4f4ea41016a6225ebb3fca194265e56b56a0b 100644 (file)
@@ -31,7 +31,8 @@ class RetryLoop(object):
             return loop.last_result()
     """
     def __init__(self, num_retries, success_check=lambda r: True,
-                 backoff_start=0, backoff_growth=2, save_results=1):
+                 backoff_start=0, backoff_growth=2, save_results=1,
+                 max_wait=60):
         """Construct a new RetryLoop.
 
         Arguments:
@@ -50,11 +51,13 @@ class RetryLoop(object):
         * save_results: Specify a number to save the last N results
           that the loop recorded.  These records are available through
           the results attribute, oldest first.  Default 1.
+        * max_wait: Maximum number of seconds to wait between retries.
         """
         self.tries_left = num_retries + 1
         self.check_result = success_check
         self.backoff_wait = backoff_start
         self.backoff_growth = backoff_growth
+        self.max_wait = max_wait
         self.next_start_time = 0
         self.results = deque(maxlen=save_results)
         self._running = None
@@ -76,6 +79,8 @@ class RetryLoop(object):
             wait_time = max(0, self.next_start_time - time.time())
             time.sleep(wait_time)
             self.backoff_wait *= self.backoff_growth
+            if self.backoff_wait > self.max_wait:
+                self.backoff_wait = self.max_wait
         self.next_start_time = time.time() + self.backoff_wait
         self.tries_left -= 1
         return self.tries_left
index c41c42e762cd5e8f856444926716b0c274735bb5..cc12f39a355ef9b97a85a34ee5989e3bae38a744 100644 (file)
@@ -141,7 +141,7 @@ class RetryLoopBackoffTestCase(unittest.TestCase, RetryLoopTestMixin):
 
     def test_backoff_multiplier(self, sleep_mock, time_mock):
         self.run_loop(5, 500, 501, 502, 503, 504, 505,
-                      backoff_start=5, backoff_growth=10)
+                      backoff_start=5, backoff_growth=10, max_wait=1000000000)
         self.check_backoff(sleep_mock, 5, 9)
 
 
index 907dd93100e645082b06a55841fffcdc0ff350f4..d122a1cf42570a71b9a953f2a15636b6cd50ee33 100644 (file)
@@ -1,5 +1,6 @@
 import arvados
 import arvados.events
+import arvados.errors
 from datetime import datetime, timedelta, tzinfo
 import logging
 import logging.handlers
@@ -78,6 +79,25 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         self._test_subscribe(
             poll_fallback=0.25, expect_type=arvados.events.PollClient, expected=1)
 
+    def test_subscribe_poll_retry(self):
+        api_mock = mock.MagicMock()
+        n = []
+        def on_ev(ev):
+            n.append(ev)
+
+        error_mock = mock.MagicMock()
+        error_mock.resp.status = 0
+        error_mock._get_reason.return_value = "testing"
+        api_mock.logs().list().execute.side_effect = (arvados.errors.ApiError(error_mock, ""),
+                                                      {"items": [{"id": 1}], "items_available": 1},
+                                                      arvados.errors.ApiError(error_mock, ""),
+                                                      {"items": [{"id": 1}], "items_available": 1})
+        pc = arvados.events.PollClient(api_mock, [], on_ev, 15, None)
+        pc.start()
+        while len(n) < 2:
+            time.sleep(.1)
+        pc.close()
+
     def test_subscribe_websocket_with_start_time_past(self):
         self._test_subscribe(
             poll_fallback=False, expect_type=arvados.events.EventClient,
@@ -222,6 +242,6 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
 
         # verify log messages to ensure retry happened
         log_messages = logstream.getvalue()
-        found = log_messages.find("Error 'EventClient.connect error' during websocket reconnect. Will retry")
+        found = log_messages.find("Error 'EventClient.connect error' during websocket reconnect.")
         self.assertNotEqual(found, -1)
         rootLogger.removeHandler(streamHandler)