3021: Propagate unhandled exceptions back to the caller instead of returning None...
author Tom Clegg <tom@curoverse.com>
Mon, 2 Feb 2015 00:21:02 +0000 (19:21 -0500)
committer Tom Clegg <tom@curoverse.com>
Mon, 2 Feb 2015 00:21:20 +0000 (19:21 -0500)
sdk/python/arvados/errors.py
sdk/python/arvados/events.py

index f70fa17149711d1d869af4efa104d638b8d64ebc..16f4096572c422cf37a93a7110b19c45aa8f1061 100644 (file)
@@ -74,3 +74,5 @@ class NoKeepServersError(Exception):
     pass
 class StaleWriterStateError(Exception):
     pass
+class FeatureNotEnabledError(Exception):  # raised when the server lacks a requested feature (e.g. no websocket endpoint advertised)
+    pass
index 94dd62b4ed1370cd239106c78abbbf8b249cb832..268692637afb7a45721322f1617b3aea50788d60 100644 (file)
@@ -1,13 +1,15 @@
-from ws4py.client.threadedclient import WebSocketClient
-import threading
+import arvados
+import config
+import errors
+
+import logging
 import json
-import os
+import threading
 import time
-import ssl
+import os
 import re
-import config
-import logging
-import arvados
+import ssl
+from ws4py.client.threadedclient import WebSocketClient
 
 _logger = logging.getLogger('arvados.events')
 
@@ -110,6 +112,22 @@ class PollClient(threading.Thread):
         del self.filters[self.filters.index(filters)]
 
 
+def _subscribe_websocket(api, filters, on_event):  # return a connected EventClient, or raise; never returns None
+    endpoint = api._rootDesc.get('websocketUrl', None)  # None when the root description has no 'websocketUrl' key
+    if not endpoint:
+        raise errors.FeatureNotEnabledError(
+            "Server does not advertise a websocket endpoint")
+    uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)  # the API token is sent as a query parameter
+    client = EventClient(uri_with_token, filters, on_event)
+    ok = False  # set to True only after connect() returns without raising
+    try:
+        client.connect()
+        ok = True
+        return client
+    finally:
+        if not ok:  # connect() raised: close the client here, then the exception continues to the caller
+            client.close_connection()
+
 def subscribe(api, filters, on_event, poll_fallback=15):
     '''
     api: a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe().
@@ -117,22 +135,13 @@ def subscribe(api, filters, on_event, poll_fallback=15):
     on_event: The callback when a message is received.
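-    poll_fallback: If websockets are not available, fall back to polling every N seconds.  If poll_fallback=False, this will return None if websockets are not available.
+    poll_fallback: If websockets are not available, fall back to polling every N seconds.  If poll_fallback=False, this will raise an exception (errors.FeatureNotEnabledError when the server does not advertise a websocket endpoint) instead of falling back.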
     '''
-    ws = None
-    if 'websocketUrl' in api._rootDesc:
-        try:
-            url = "{}?api_token={}".format(api._rootDesc['websocketUrl'], api.api_token)
-            ws = EventClient(url, filters, on_event)
-            ws.connect()
-            return ws
-        except Exception as e:
-            _logger.warn("Got exception %s trying to connect to websockets at %s" % (e, api._rootDesc['websocketUrl']))
-            if ws:
-                ws.close_connection()
-    if poll_fallback:
-        _logger.warn("Websockets not available, falling back to log table polling")
-        p = PollClient(api, filters, on_event, poll_fallback)
-        p.start()
-        return p
-    else:
-        _logger.error("Websockets not available")
-        return None
+    if not poll_fallback:
+        return _subscribe_websocket(api, filters, on_event)
+
+    try:
+        return _subscribe_websocket(api, filters, on_event)
+    except Exception as e:
+        _logger.warn("Falling back to polling after websocket error: %s" % e)
+    p = PollClient(api, filters, on_event, poll_fallback)
+    p.start()
+    return p