+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
from __future__ import absolute_import
from future import standard_library
standard_library.install_aliases()
self._closing = threading.Event()
self._closing_lock = threading.RLock()
- def run(self):
if self.last_log_id != None:
# Caller supplied the last-seen event ID from a previous
- # connection
- skip_old_events = [["id", ">", str(self.last_log_id)]]
+ # connection.
+ self._skip_old_events = [["id", ">", str(self.last_log_id)]]
else:
# We need to do a reverse-order query to find the most
- # recent event ID (see "if not skip_old_events" below).
- skip_old_events = False
+ # recent event ID (see "if not self._skip_old_events"
+ # in run()).
+ self._skip_old_events = False
+ def run(self):
self.on_event({'status': 200})
while not self._closing.is_set():
for f in self.filters:
for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
try:
- if not skip_old_events:
+ if not self._skip_old_events:
# If the caller didn't provide a known
# recent ID, our first request will ask
# for the single most recent event from
# filter on that same cutoff time, or
# (once we see our first matching event)
# the ID of the last-seen event.
- skip_old_events = [[
+ #
+ # Note: self._skip_old_events must not be
+ # set until the threshold is decided.
+ # Otherwise, tests will be unreliable.
+ filter_by_time = [[
"created_at", ">=",
time.strftime(
"%Y-%m-%dT%H:%M:%SZ",
items = self.api.logs().list(
order="id desc",
limit=1,
- filters=f+skip_old_events).execute()
+ filters=f+filter_by_time).execute()
if items["items"]:
- skip_old_events = [
+ self._skip_old_events = [
["id", ">", str(items["items"][0]["id"])]]
items = {
"items": [],
"items_available": 0,
}
+ else:
+ # No recent events. We can keep using
+ # the same timestamp threshold until
+ # we receive our first new event.
+ self._skip_old_events = filter_by_time
else:
# In this case, either we know the most
# recent matching ID, or we know there
# order.
items = self.api.logs().list(
order="id asc",
- filters=f+skip_old_events).execute()
+ filters=f+self._skip_old_events).execute()
break
except errors.ApiError as error:
pass
_thread.interrupt_main()
return
for i in items["items"]:
- skip_old_events = [["id", ">", str(i["id"])]]
+ self._skip_old_events = [["id", ">", str(i["id"])]]
with self._closing_lock:
if self._closing.is_set():
return