projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
17962: Add line about printing to stdout on description
[arvados.git]
/
sdk
/
python
/
arvados
/
events.py
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index b385761f7b7b1575dc576e783b3ae61e45eeb2cf..e53e4980a86f01a595649331d020c6b87e823e6a 100644
(file)
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -1,4 +1,12 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
from __future__ import absolute_import
from __future__ import absolute_import
+from future import standard_library
+standard_library.install_aliases()
+from builtins import str
+from builtins import object
import arvados
from . import config
from . import errors
import arvados
from . import config
from . import errors
@@ -6,7 +14,7 @@ from .retry import RetryLoop
import logging
import json
import logging
import json
-import thread
+import _thread
import threading
import time
import os
import threading
import time
import os
@@
-116,22
+124,22
@@
class EventClient(object):
self.on_event_cb(m)
except Exception as e:
_logger.exception("Unexpected exception from event callback.")
self.on_event_cb(m)
except Exception as e:
_logger.exception("Unexpected exception from event callback.")
- thread.interrupt_main()
+ _thread.interrupt_main()
def on_closed(self):
if not self.is_closed.is_set():
def on_closed(self):
if not self.is_closed.is_set():
- _logger.warn("Unexpected close. Reconnecting.")
+ _logger.warning("Unexpected close. Reconnecting.")
for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
try:
self._setup_event_client()
for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
try:
self._setup_event_client()
- _logger.warn("Reconnect successful.")
+ _logger.warning("Reconnect successful.")
break
except Exception as e:
break
except Exception as e:
- _logger.warn("Error '%s' during websocket reconnect.", e)
+ _logger.warning("Error '%s' during websocket reconnect.", e)
if tries_left == 0:
_logger.exception("EventClient thread could not contact websocket server.")
self.is_closed.set()
if tries_left == 0:
_logger.exception("EventClient thread could not contact websocket server.")
self.is_closed.set()
- thread.interrupt_main()
+ _thread.interrupt_main()
return
def run_forever(self):
return
def run_forever(self):
@@ -155,16 +163,17 @@ class PollClient(threading.Thread):
self._closing = threading.Event()
self._closing_lock = threading.RLock()
self._closing = threading.Event()
self._closing_lock = threading.RLock()
- def run(self):
if self.last_log_id != None:
# Caller supplied the last-seen event ID from a previous
if self.last_log_id != None:
# Caller supplied the last-seen event ID from a previous
- # connection
- skip_old_events = [["id", ">", str(self.last_log_id)]]
+ # connection.
+ self._skip_old_events = [["id", ">", str(self.last_log_id)]]
else:
# We need to do a reverse-order query to find the most
else:
# We need to do a reverse-order query to find the most
- # recent event ID (see "if not skip_old_events" below).
- skip_old_events = False
+ # recent event ID (see "if not self._skip_old_events"
+ # in run()).
+ self._skip_old_events = False
+ def run(self):
self.on_event({'status': 200})
while not self._closing.is_set():
self.on_event({'status': 200})
while not self._closing.is_set():
@@ -172,7 +181,7 @@ class PollClient(threading.Thread):
for f in self.filters:
for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
try:
for f in self.filters:
for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
try:
- if not skip_old_events:
+ if not self._skip_old_events:
# If the caller didn't provide a known
# recent ID, our first request will ask
# for the single most recent event from
# If the caller didn't provide a known
# recent ID, our first request will ask
# for the single most recent event from
@@ -190,7 +199,11 @@ class PollClient(threading.Thread):
# filter on that same cutoff time, or
# (once we see our first matching event)
# the ID of the last-seen event.
# filter on that same cutoff time, or
# (once we see our first matching event)
# the ID of the last-seen event.
- skip_old_events = [[
+ #
+ # Note: self._skip_old_events must not be
+ # set until the threshold is decided.
+ # Otherwise, tests will be unreliable.
+ filter_by_time = [[
"created_at", ">=",
time.strftime(
"%Y-%m-%dT%H:%M:%SZ",
"created_at", ">=",
time.strftime(
"%Y-%m-%dT%H:%M:%SZ",
@@ -198,14 +211,19 @@ class PollClient(threading.Thread):
items = self.api.logs().list(
order="id desc",
limit=1,
items = self.api.logs().list(
order="id desc",
limit=1,
- filters=f+skip_old_events).execute()
+ filters=f+filter_by_time).execute()
if items["items"]:
if items["items"]:
- skip_old_events = [
+ self._skip_old_events = [
["id", ">", str(items["items"][0]["id"])]]
items = {
"items": [],
"items_available": 0,
}
["id", ">", str(items["items"][0]["id"])]]
items = {
"items": [],
"items_available": 0,
}
+ else:
+ # No recent events. We can keep using
+ # the same timestamp threshold until
+ # we receive our first new event.
+ self._skip_old_events = filter_by_time
else:
# In this case, either we know the most
# recent matching ID, or we know there
else:
# In this case, either we know the most
# recent matching ID, or we know there
@@ -215,7 +233,7 @@ class PollClient(threading.Thread):
# order.
items = self.api.logs().list(
order="id asc",
# order.
items = self.api.logs().list(
order="id asc",
- filters=f+skip_old_events).execute()
+ filters=f+self._skip_old_events).execute()
break
except errors.ApiError as error:
pass
break
except errors.ApiError as error:
pass
@@ -226,10 +244,10 @@ class PollClient(threading.Thread):
_logger.exception("PollClient thread could not contact API server.")
with self._closing_lock:
self._closing.set()
_logger.exception("PollClient thread could not contact API server.")
with self._closing_lock:
self._closing.set()
- thread.interrupt_main()
+ _thread.interrupt_main()
return
for i in items["items"]:
return
for i in items["items"]:
- skip_old_events = [["id", ">", str(i["id"])]]
+ self._skip_old_events = [["id", ">", str(i["id"])]]
with self._closing_lock:
if self._closing.is_set():
return
with self._closing_lock:
if self._closing.is_set():
return
@@ -237,7 +255,7 @@ class PollClient(threading.Thread):
self.on_event(i)
except Exception as e:
_logger.exception("Unexpected exception from event callback.")
self.on_event(i)
except Exception as e:
_logger.exception("Unexpected exception from event callback.")
- thread.interrupt_main()
+ _thread.interrupt_main()
if items["items_available"] > len(items["items"]):
moreitems = True
if not moreitems:
if items["items_available"] > len(items["items"]):
moreitems = True
if not moreitems:
@@ -289,7 +307,7 @@ def _subscribe_websocket(api, filters, on_event, last_log_id=None):
try:
client = EventClient(uri_with_token, filters, on_event, last_log_id)
except Exception:
try:
client = EventClient(uri_with_token, filters, on_event, last_log_id)
except Exception:
- _logger.warn("Failed to connect to websockets on %s" % endpoint)
+ _logger.warning("Failed to connect to websockets on %s" % endpoint)
raise
else:
return client
raise
else:
return client
@@ -318,7 +336,7 @@ def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
else:
_logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
except Exception as e:
else:
_logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
except Exception as e:
- _logger.warn("Falling back to polling after websocket error: %s" % e)
+ _logger.warning("Falling back to polling after websocket error: %s" % e)
p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
p.start()
return p
p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
p.start()
return p