18842: Clean up keep cache set logic a little more
[arvados.git] / sdk / python / arvados / events.py
index 6b3b21f82a7acd07554558ab77be751a4cec268c..e53e4980a86f01a595649331d020c6b87e823e6a 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 from __future__ import absolute_import
 from future import standard_library
 standard_library.install_aliases()
@@ -159,16 +163,17 @@ class PollClient(threading.Thread):
         self._closing = threading.Event()
         self._closing_lock = threading.RLock()
 
-    def run(self):
         if self.last_log_id != None:
             # Caller supplied the last-seen event ID from a previous
-            # connection
-            skip_old_events = [["id", ">", str(self.last_log_id)]]
+            # connection.
+            self._skip_old_events = [["id", ">", str(self.last_log_id)]]
         else:
             # We need to do a reverse-order query to find the most
-            # recent event ID (see "if not skip_old_events" below).
-            skip_old_events = False
+            # recent event ID (see "if not self._skip_old_events"
+            # in run()).
+            self._skip_old_events = False
 
+    def run(self):
         self.on_event({'status': 200})
 
         while not self._closing.is_set():
@@ -176,7 +181,7 @@ class PollClient(threading.Thread):
             for f in self.filters:
                 for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
                     try:
-                        if not skip_old_events:
+                        if not self._skip_old_events:
                             # If the caller didn't provide a known
                             # recent ID, our first request will ask
                             # for the single most recent event from
@@ -194,7 +199,11 @@ class PollClient(threading.Thread):
                             # filter on that same cutoff time, or
                             # (once we see our first matching event)
                             # the ID of the last-seen event.
-                            skip_old_events = [[
+                            #
+                            # Note: self._skip_old_events must not be
+                            # set until the threshold is decided.
+                            # Otherwise, tests will be unreliable.
+                            filter_by_time = [[
                                 "created_at", ">=",
                                 time.strftime(
                                     "%Y-%m-%dT%H:%M:%SZ",
@@ -202,14 +211,19 @@ class PollClient(threading.Thread):
                             items = self.api.logs().list(
                                 order="id desc",
                                 limit=1,
-                                filters=f+skip_old_events).execute()
+                                filters=f+filter_by_time).execute()
                             if items["items"]:
-                                skip_old_events = [
+                                self._skip_old_events = [
                                     ["id", ">", str(items["items"][0]["id"])]]
                                 items = {
                                     "items": [],
                                     "items_available": 0,
                                 }
+                            else:
+                                # No recent events. We can keep using
+                                # the same timestamp threshold until
+                                # we receive our first new event.
+                                self._skip_old_events = filter_by_time
                         else:
                             # In this case, either we know the most
                             # recent matching ID, or we know there
@@ -219,7 +233,7 @@ class PollClient(threading.Thread):
                             # order.
                             items = self.api.logs().list(
                                 order="id asc",
-                                filters=f+skip_old_events).execute()
+                                filters=f+self._skip_old_events).execute()
                         break
                     except errors.ApiError as error:
                         pass
@@ -233,7 +247,7 @@ class PollClient(threading.Thread):
                     _thread.interrupt_main()
                     return
                 for i in items["items"]:
-                    skip_old_events = [["id", ">", str(i["id"])]]
+                    self._skip_old_events = [["id", ">", str(i["id"])]]
                     with self._closing_lock:
                         if self._closing.is_set():
                             return