18842: Clean up keep cache set logic a little more
[arvados.git] / sdk / python / arvados / events.py
index cf26f9e8addb7b6e45e2af87a2504947878c7c6c..e53e4980a86f01a595649331d020c6b87e823e6a 100644 (file)
@@ -1,11 +1,20 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import absolute_import
+from future import standard_library
+standard_library.install_aliases()
+from builtins import str
+from builtins import object
 import arvados
-import config
-import errors
-from retry import RetryLoop
+from . import config
+from . import errors
+from .retry import RetryLoop
 
 import logging
 import json
-import thread
+import _thread
 import threading
 import time
 import os
@@ -115,22 +124,22 @@ class EventClient(object):
             self.on_event_cb(m)
         except Exception as e:
             _logger.exception("Unexpected exception from event callback.")
-            thread.interrupt_main()
+            _thread.interrupt_main()
 
     def on_closed(self):
         if not self.is_closed.is_set():
-            _logger.warn("Unexpected close. Reconnecting.")
+            _logger.warning("Unexpected close. Reconnecting.")
             for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
                 try:
                     self._setup_event_client()
-                    _logger.warn("Reconnect successful.")
+                    _logger.warning("Reconnect successful.")
                     break
                 except Exception as e:
-                    _logger.warn("Error '%s' during websocket reconnect.", e)
+                    _logger.warning("Error '%s' during websocket reconnect.", e)
             if tries_left == 0:
                 _logger.exception("EventClient thread could not contact websocket server.")
                 self.is_closed.set()
-                thread.interrupt_main()
+                _thread.interrupt_main()
                 return
 
     def run_forever(self):
@@ -154,16 +163,17 @@ class PollClient(threading.Thread):
         self._closing = threading.Event()
         self._closing_lock = threading.RLock()
 
-    def run(self):
         if self.last_log_id != None:
             # Caller supplied the last-seen event ID from a previous
-            # connection
-            skip_old_events = [["id", ">", str(self.last_log_id)]]
+            # connection.
+            self._skip_old_events = [["id", ">", str(self.last_log_id)]]
         else:
             # We need to do a reverse-order query to find the most
-            # recent event ID (see "if not skip_old_events" below).
-            skip_old_events = False
+            # recent event ID (see "if not self._skip_old_events"
+            # in run()).
+            self._skip_old_events = False
 
+    def run(self):
         self.on_event({'status': 200})
 
         while not self._closing.is_set():
@@ -171,7 +181,7 @@ class PollClient(threading.Thread):
             for f in self.filters:
                 for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
                     try:
-                        if not skip_old_events:
+                        if not self._skip_old_events:
                             # If the caller didn't provide a known
                             # recent ID, our first request will ask
                             # for the single most recent event from
@@ -189,7 +199,11 @@ class PollClient(threading.Thread):
                             # filter on that same cutoff time, or
                             # (once we see our first matching event)
                             # the ID of the last-seen event.
-                            skip_old_events = [[
+                            #
+                            # Note: self._skip_old_events must not be
+                            # set until the threshold is decided.
+                            # Otherwise, tests will be unreliable.
+                            filter_by_time = [[
                                 "created_at", ">=",
                                 time.strftime(
                                     "%Y-%m-%dT%H:%M:%SZ",
@@ -197,14 +211,19 @@ class PollClient(threading.Thread):
                             items = self.api.logs().list(
                                 order="id desc",
                                 limit=1,
-                                filters=f+skip_old_events).execute()
+                                filters=f+filter_by_time).execute()
                             if items["items"]:
-                                skip_old_events = [
+                                self._skip_old_events = [
                                     ["id", ">", str(items["items"][0]["id"])]]
                                 items = {
                                     "items": [],
                                     "items_available": 0,
                                 }
+                            else:
+                                # No recent events. We can keep using
+                                # the same timestamp threshold until
+                                # we receive our first new event.
+                                self._skip_old_events = filter_by_time
                         else:
                             # In this case, either we know the most
                             # recent matching ID, or we know there
@@ -214,7 +233,7 @@ class PollClient(threading.Thread):
                             # order.
                             items = self.api.logs().list(
                                 order="id asc",
-                                filters=f+skip_old_events).execute()
+                                filters=f+self._skip_old_events).execute()
                         break
                     except errors.ApiError as error:
                         pass
@@ -225,10 +244,10 @@ class PollClient(threading.Thread):
                     _logger.exception("PollClient thread could not contact API server.")
                     with self._closing_lock:
                         self._closing.set()
-                    thread.interrupt_main()
+                    _thread.interrupt_main()
                     return
                 for i in items["items"]:
-                    skip_old_events = [["id", ">", str(i["id"])]]
+                    self._skip_old_events = [["id", ">", str(i["id"])]]
                     with self._closing_lock:
                         if self._closing.is_set():
                             return
@@ -236,7 +255,7 @@ class PollClient(threading.Thread):
                             self.on_event(i)
                         except Exception as e:
                             _logger.exception("Unexpected exception from event callback.")
-                            thread.interrupt_main()
+                            _thread.interrupt_main()
                 if items["items_available"] > len(items["items"]):
                     moreitems = True
             if not moreitems:
@@ -288,7 +307,7 @@ def _subscribe_websocket(api, filters, on_event, last_log_id=None):
     try:
         client = EventClient(uri_with_token, filters, on_event, last_log_id)
     except Exception:
-        _logger.warn("Failed to connect to websockets on %s" % endpoint)
+        _logger.warning("Failed to connect to websockets on %s" % endpoint)
         raise
     else:
         return client
@@ -317,7 +336,7 @@ def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
         else:
             _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
     except Exception as e:
-        _logger.warn("Falling back to polling after websocket error: %s" % e)
+        _logger.warning("Falling back to polling after websocket error: %s" % e)
     p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
     p.start()
     return p