- client.connect()
- ok = True
- return client
- finally:
- if not ok:
- client.close_connection()
-
-def subscribe(api, filters, on_event, poll_fallback=15):
- '''
- api: a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe().
- filters: Initial subscription filters.
- on_event: The callback when a message is received.
- poll_fallback: If websockets are not available, fall back to polling every N seconds. If poll_fallback=False, this will return None if websockets are not available.
- '''
+ uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
+ client = EventClient(uri_with_token, filters, on_event, last_log_id)
+ ok = False
+ try:
+ client.connect()
+ ok = True
+ return client
+ finally:
+ if not ok:
+ client.close_connection()
+ except:
+ _logger.warn("Failed to connect to websockets on %s" % endpoint)
+ raise
+
+
+def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
+ """
+ :api:
+ a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe().
+ :filters:
+ Initial subscription filters.
+ :on_event:
+ The callback when a message is received.
+ :poll_fallback:
+ If websockets are not available, fall back to polling every N seconds. If poll_fallback=False, this will return None if websockets are not available.
+ :last_log_id:
+ Log rows that are newer than the log id
+ """
+