import arvados
import config
import errors
+from retry import RetryLoop
import logging
import json
+import thread
import threading
import time
import os
def on_event(self, m):
    """Record the event's log id, then pass the event to the user callback.

    Arguments:
    * m: The received event, a mapping. When it carries an 'id' that
      is not None, that value is stored in self.last_log_id.

    The callback runs for every event, with or without an 'id'.  Any
    Exception it raises is logged and followed by
    thread.interrupt_main(), so the failure surfaces in the main thread
    instead of propagating out of this method.
    """
    event_id = m.get('id')
    if event_id is not None:
        self.last_log_id = event_id
    try:
        self.on_event_cb(m)
    except Exception:
        _logger.exception("Unexpected exception from event callback.")
        thread.interrupt_main()
def on_closed(self):
    """Reconnect to the websocket server after an unexpected close.

    Returns immediately when self.is_closed is already set.  Otherwise
    a fresh _EventClient is built and connected inside a RetryLoop; the
    new client is given self.last_log_id and this method as its close
    handler.

    If every attempt raises, the failure is logged, self.is_closed is
    set to True and thread.interrupt_main() is called so the main
    thread learns that the event stream is gone.
    """
    if self.is_closed:
        return
    _logger.warning("Unexpected close. Reconnecting.")
    for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
        try:
            self.ec = _EventClient(self.url, self.filters, self.on_event,
                                   self.last_log_id, self.on_closed)
            self.ec.connect()
            break
        except Exception as e:
            _logger.warning("Error '%s' during websocket reconnect.", e)
    else:
        # Reached only when the loop ran out of attempts without a
        # successful connect.  Testing the loop counter against 0 after
        # the loop would also fire when the final attempt succeeded,
        # because the counter is already 0 on that pass.
        _logger.exception("EventClient thread could not contact websocket server.")
        self.is_closed = True
        thread.interrupt_main()
        return
class PollClient(threading.Thread):
self.id = self.last_log_id
else:
for f in self.filters:
- items = self.api.logs().list(limit=1, order="id desc", filters=f).execute()['items']
+ for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
+ try:
+ items = self.api.logs().list(limit=1, order="id desc", filters=f).execute()['items']
+ break
+ except errors.ApiError as error:
+ pass
+ else:
+ tries_left = 0
+ break
+ if tries_left == 0:
+ _logger.exception("PollClient thread could not contact API server.")
+ with self._closing_lock:
+ self._closing.set()
+ thread.interrupt_main()
+ return
if items:
if items[0]['id'] > self.id:
self.id = items[0]['id']
max_id = self.id
moreitems = False
for f in self.filters:
- items = self.api.logs().list(order="id asc", filters=f+[["id", ">", str(self.id)]]).execute()
+ for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
+ try:
+ items = self.api.logs().list(order="id asc", filters=f+[["id", ">", str(self.id)]]).execute()
+ break
+ except errors.ApiError as error:
+ pass
+ else:
+ tries_left = 0
+ break
+ if tries_left == 0:
+ _logger.exception("PollClient thread could not contact API server.")
+ with self._closing_lock:
+ self._closing.set()
+ thread.interrupt_main()
+ return
for i in items["items"]:
if i['id'] > max_id:
max_id = i['id']
with self._closing_lock:
if self._closing.is_set():
return
- self.on_event(i)
+ try:
+ self.on_event(i)
+ except Exception as e:
+ _logger.exception("Unexpected exception from event callback.")
+ thread.interrupt_main()
if items["items_available"] > len(items["items"]):
moreitems = True
self.id = max_id
return loop.last_result()
"""
def __init__(self, num_retries, success_check=lambda r: True,
- backoff_start=0, backoff_growth=2, save_results=1):
+ backoff_start=0, backoff_growth=2, save_results=1,
+ max_wait=60):
"""Construct a new RetryLoop.
Arguments:
* save_results: Specify a number to save the last N results
that the loop recorded. These records are available through
the results attribute, oldest first. Default 1.
+ * max_wait: Maximum number of seconds to wait between retries.
"""
self.tries_left = num_retries + 1
self.check_result = success_check
self.backoff_wait = backoff_start
self.backoff_growth = backoff_growth
+ self.max_wait = max_wait
self.next_start_time = 0
self.results = deque(maxlen=save_results)
self._running = None
wait_time = max(0, self.next_start_time - time.time())
time.sleep(wait_time)
self.backoff_wait *= self.backoff_growth
+ if self.backoff_wait > self.max_wait:
+ self.backoff_wait = self.max_wait
self.next_start_time = time.time() + self.backoff_wait
self.tries_left -= 1
return self.tries_left
def test_backoff_multiplier(self, sleep_mock, time_mock):
    """Run a loop with backoff_start=5 and backoff_growth=10, then check the recorded sleeps."""
    # max_wait is set far above any wait this loop can compute, so the
    # cap on backoff does not clamp the growth being checked here.
    loop_options = dict(backoff_start=5, backoff_growth=10, max_wait=10 ** 9)
    self.run_loop(5, 500, 501, 502, 503, 504, 505, **loop_options)
    self.check_backoff(sleep_mock, 5, 9)
import arvados
import arvados.events
+import arvados.errors
from datetime import datetime, timedelta, tzinfo
import logging
import logging.handlers
self._test_subscribe(
poll_fallback=0.25, expect_type=arvados.events.PollClient, expected=1)
def test_subscribe_poll_retry(self):
    """PollClient still delivers events when API calls intermittently fail.

    The mocked logs().list().execute() alternates an ApiError with a
    successful response, so the client has to retry in order to make
    progress.  The test waits for two callback invocations.
    """
    api_mock = mock.MagicMock()
    received = []

    def on_ev(ev):
        received.append(ev)

    error_mock = mock.MagicMock()
    error_mock.resp.status = 0
    error_mock._get_reason.return_value = "testing"
    api_mock.logs().list().execute.side_effect = (
        arvados.errors.ApiError(error_mock, ""),
        {"items": [{"id": 1}], "items_available": 1},
        arvados.errors.ApiError(error_mock, ""),
        {"items": [{"id": 1}], "items_available": 1},
    )
    pc = arvados.events.PollClient(api_mock, [], on_ev, 15, None)
    pc.start()
    try:
        # Bound the wait: without a deadline a regression in the retry
        # path would hang the test run instead of failing it.
        deadline = time.time() + 10
        while len(received) < 2 and time.time() < deadline:
            time.sleep(.1)
    finally:
        # Always stop the polling thread, even if the wait was interrupted.
        pc.close()
    self.assertGreaterEqual(
        len(received), 2,
        "PollClient delivered %d event(s) before the deadline" % len(received))
def test_subscribe_websocket_with_start_time_past(self):
self._test_subscribe(
poll_fallback=False, expect_type=arvados.events.EventClient,
# verify log messages to ensure retry happened
log_messages = logstream.getvalue()
- found = log_messages.find("Error 'EventClient.connect error' during websocket reconnect. Will retry")
+ found = log_messages.find("Error 'EventClient.connect error' during websocket reconnect.")
self.assertNotEqual(found, -1)
rootLogger.removeHandler(streamHandler)