18700: Add workbench2 to arvados-boot.
[arvados.git] / sdk / python / arvados / events.py
index c58abe52be48730e56edb70d5e5109a0e3847592..e53e4980a86f01a595649331d020c6b87e823e6a 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 from __future__ import absolute_import
 from future import standard_library
 standard_library.install_aliases()
@@ -124,14 +128,14 @@ class EventClient(object):
 
     def on_closed(self):
         if not self.is_closed.is_set():
-            _logger.warn("Unexpected close. Reconnecting.")
+            _logger.warning("Unexpected close. Reconnecting.")
             for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
                 try:
                     self._setup_event_client()
-                    _logger.warn("Reconnect successful.")
+                    _logger.warning("Reconnect successful.")
                     break
                 except Exception as e:
-                    _logger.warn("Error '%s' during websocket reconnect.", e)
+                    _logger.warning("Error '%s' during websocket reconnect.", e)
             if tries_left == 0:
                 _logger.exception("EventClient thread could not contact websocket server.")
                 self.is_closed.set()
@@ -159,16 +163,17 @@ class PollClient(threading.Thread):
         self._closing = threading.Event()
         self._closing_lock = threading.RLock()
 
-    def run(self):
         if self.last_log_id != None:
             # Caller supplied the last-seen event ID from a previous
-            # connection
-            skip_old_events = [["id", ">", str(self.last_log_id)]]
+            # connection.
+            self._skip_old_events = [["id", ">", str(self.last_log_id)]]
         else:
             # We need to do a reverse-order query to find the most
-            # recent event ID (see "if not skip_old_events" below).
-            skip_old_events = False
+            # recent event ID (see "if not self._skip_old_events"
+            # in run()).
+            self._skip_old_events = False
 
+    def run(self):
         self.on_event({'status': 200})
 
         while not self._closing.is_set():
@@ -176,7 +181,7 @@ class PollClient(threading.Thread):
             for f in self.filters:
                 for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
                     try:
-                        if not skip_old_events:
+                        if not self._skip_old_events:
                             # If the caller didn't provide a known
                             # recent ID, our first request will ask
                             # for the single most recent event from
@@ -194,7 +199,11 @@ class PollClient(threading.Thread):
                             # filter on that same cutoff time, or
                             # (once we see our first matching event)
                             # the ID of the last-seen event.
-                            skip_old_events = [[
+                            #
+                            # Note: self._skip_old_events must not be
+                            # set until the threshold is decided.
+                            # Otherwise, tests will be unreliable.
+                            filter_by_time = [[
                                 "created_at", ">=",
                                 time.strftime(
                                     "%Y-%m-%dT%H:%M:%SZ",
@@ -202,14 +211,19 @@ class PollClient(threading.Thread):
                             items = self.api.logs().list(
                                 order="id desc",
                                 limit=1,
-                                filters=f+skip_old_events).execute()
+                                filters=f+filter_by_time).execute()
                             if items["items"]:
-                                skip_old_events = [
+                                self._skip_old_events = [
                                     ["id", ">", str(items["items"][0]["id"])]]
                                 items = {
                                     "items": [],
                                     "items_available": 0,
                                 }
+                            else:
+                                # No recent events. We can keep using
+                                # the same timestamp threshold until
+                                # we receive our first new event.
+                                self._skip_old_events = filter_by_time
                         else:
                             # In this case, either we know the most
                             # recent matching ID, or we know there
@@ -219,7 +233,7 @@ class PollClient(threading.Thread):
                             # order.
                             items = self.api.logs().list(
                                 order="id asc",
-                                filters=f+skip_old_events).execute()
+                                filters=f+self._skip_old_events).execute()
                         break
                     except errors.ApiError as error:
                         pass
@@ -233,7 +247,7 @@ class PollClient(threading.Thread):
                     _thread.interrupt_main()
                     return
                 for i in items["items"]:
-                    skip_old_events = [["id", ">", str(i["id"])]]
+                    self._skip_old_events = [["id", ">", str(i["id"])]]
                     with self._closing_lock:
                         if self._closing.is_set():
                             return
@@ -293,7 +307,7 @@ def _subscribe_websocket(api, filters, on_event, last_log_id=None):
     try:
         client = EventClient(uri_with_token, filters, on_event, last_log_id)
     except Exception:
-        _logger.warn("Failed to connect to websockets on %s" % endpoint)
+        _logger.warning("Failed to connect to websockets on %s" % endpoint)
         raise
     else:
         return client
@@ -322,7 +336,7 @@ def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
         else:
             _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
     except Exception as e:
-        _logger.warn("Falling back to polling after websocket error: %s" % e)
+        _logger.warning("Falling back to polling after websocket error: %s" % e)
     p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
     p.start()
     return p