import arvados
import arvados.events
+import arvados.errors
from datetime import datetime, timedelta, tzinfo
import logging
import logging.handlers
self._test_subscribe(
poll_fallback=0.25, expect_type=arvados.events.PollClient, expected=1)
+ def test_subscribe_poll_retry(self):
+ api_mock = mock.MagicMock()
+ n = []
+ def on_ev(ev):
+ n.append(ev)
+
+ error_mock = mock.MagicMock()
+ error_mock.resp.status = 0
+ error_mock._get_reason.return_value = "testing"
+ api_mock.logs().list().execute.side_effect = (arvados.errors.ApiError(error_mock, ""),
+ {"items": [{"id": 1}], "items_available": 1},
+ arvados.errors.ApiError(error_mock, ""),
+ {"items": [{"id": 1}], "items_available": 1})
+ pc = arvados.events.PollClient(api_mock, [], on_ev, 15, None)
+ pc.start()
+ while len(n) < 2:
+ time.sleep(.1)
+ pc.close()
+
def test_subscribe_websocket_with_start_time_past(self):
self._test_subscribe(
poll_fallback=False, expect_type=arvados.events.EventClient,