import arvados
import config
import errors
+from retry import RetryLoop
import logging
import json
+import thread
import threading
import time
import os
def on_event(self, m):
    """Remember the id of an incoming message and hand it to the callback.

    m is a mapping. When it carries a non-None 'id', that value is stored
    in self.last_log_id before the callback runs (on_closed passes
    last_log_id to the replacement client when it reconnects).

    Any Exception raised by self.on_event_cb is logged and converted into
    an interrupt of the main thread rather than propagating to the caller.
    """
    log_id = m.get('id')
    if log_id is not None:
        self.last_log_id = log_id
    try:
        self.on_event_cb(m)
    except Exception:
        _logger.exception("Unexpected exception from event callback.")
        # Surface the failure in the main thread instead of losing it here.
        thread.interrupt_main()
def on_closed(self):
    """Try to re-establish the websocket connection after a close.

    Does nothing when self.is_closed is already set. Otherwise a new
    _EventClient is built from the stored url, filters and last_log_id and
    connected, retrying under a RetryLoop (num_retries=25,
    backoff_start=.1, max_wait=15).

    If no attempt connects, the failure is logged, self.is_closed is set
    to True and the main thread is interrupted.
    """
    if self.is_closed:
        # Already flagged closed: no reconnect is attempted.
        return

    _logger.warn("Unexpected close. Reconnecting.")
    for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
        try:
            self.ec = _EventClient(self.url, self.filters, self.on_event,
                                   self.last_log_id, self.on_closed)
            self.ec.connect()
            break
        except Exception as e:
            _logger.warn("Error '%s' during websocket reconnect.", e)
    else:
        # Reached only when no attempt hit `break`. Checking the loop
        # variable against 0 instead would also fire when the final
        # attempt succeeds, shutting down a client that just reconnected.
        # NOTE(review): exception() is called outside an except block; what
        # traceback it attaches depends on the interpreter -- confirm.
        _logger.exception("EventClient thread could not contact websocket server.")
        self.is_closed = True
        thread.interrupt_main()
class PollClient(threading.Thread):
self.id = self.last_log_id
else:
for f in self.filters:
- items = self.api.logs().list(limit=1, order="id desc", filters=f).execute()['items']
+ for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
+ try:
+ items = self.api.logs().list(limit=1, order="id desc", filters=f).execute()['items']
+ break
+ except errors.ApiError as error:
+ pass
+ else:
+ tries_left = 0
+ break
+ if tries_left == 0:
+ _logger.exception("PollClient thread could not contact API server.")
+ with self._closing_lock:
+ self._closing.set()
+ thread.interrupt_main()
+ return
if items:
if items[0]['id'] > self.id:
self.id = items[0]['id']
max_id = self.id
moreitems = False
for f in self.filters:
- items = self.api.logs().list(order="id asc", filters=f+[["id", ">", str(self.id)]]).execute()
+ for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
+ try:
+ items = self.api.logs().list(order="id asc", filters=f+[["id", ">", str(self.id)]]).execute()
+ break
+ except errors.ApiError as error:
+ pass
+ else:
+ tries_left = 0
+ break
+ if tries_left == 0:
+ _logger.exception("PollClient thread could not contact API server.")
+ with self._closing_lock:
+ self._closing.set()
+ thread.interrupt_main()
+ return
for i in items["items"]:
if i['id'] > max_id:
max_id = i['id']
with self._closing_lock:
if self._closing.is_set():
return
- self.on_event(i)
+ try:
+ self.on_event(i)
+ except Exception as e:
+ _logger.exception("Unexpected exception from event callback.")
+ thread.interrupt_main()
if items["items_available"] > len(items["items"]):
moreitems = True
self.id = max_id