3609: --job and --pipeline logging implemented, needs testing
[arvados.git] / sdk / python / arvados / events.py
index beb34545370969ea44cfc05fd5d1e9ca49483279..faf638fb379a78819665952e2518b5f23f46811b 100644 (file)
@@ -1,5 +1,5 @@
 from ws4py.client.threadedclient import WebSocketClient
-import thread
+import threading
 import json
 import os
 import time
@@ -20,11 +20,11 @@ class EventClient(WebSocketClient):
             ssl_options={'cert_reqs': ssl.CERT_REQUIRED}
 
         super(EventClient, self).__init__(url, ssl_options)
-        self.filters = filters
+        self.filters = []
         self.on_event = on_event
 
     def opened(self):
+        """Send the initial subscription request for self.filters.
+
+        Presumably invoked by ws4py once the connection is open -- confirm
+        against the ws4py WebSocketClient documentation.
+
+        NOTE(review): __init__ now assigns self.filters = [] instead of the
+        filters argument, so the request sent here carries an empty filter
+        list; confirm that dropping the constructor's filters is intended.
+        """
-        self.send(json.dumps({"method": "subscribe", "filters": self.filters}))
+        self.subscribe(self.filters)
 
     def received_message(self, m):
         self.on_event(json.loads(str(m)))
@@ -36,6 +36,15 @@ class EventClient(WebSocketClient):
         except:
             pass
 
+    def subscribe(self, filters, last_log_id=None):
+        """Send a "subscribe" request for the given filters.
+
+        Arguments:
+        * filters: value placed in the request's "filters" field.
+        * last_log_id: when not None, included in the request as
+          "last_log_id" (presumably asks the server to replay events logged
+          after that id -- confirm against the websocket API docs).
+        """
+        # Send the caller's filters. Previously the argument was ignored and
+        # self.filters was sent on every request.
+        m = {"method": "subscribe", "filters": filters}
+        if last_log_id is not None:
+            m["last_log_id"] = last_log_id
+        self.send(json.dumps(m))
+
+    def unsubscribe(self, filters):
+        """Send an "unsubscribe" request for the given filters.
+
+        Arguments:
+        * filters: value placed in the request's "filters" field.
+        """
+        # Send the caller's filters. Previously the argument was ignored and
+        # self.filters was sent instead.
+        self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
+
 class PollClient(threading.Thread):
     def __init__(self, api, filters, on_event):
         self.api = api
@@ -59,6 +68,12 @@ class PollClient(threading.Thread):
     def close_connection(self):
         self.loop = False
 
+    def subscribe(self, filters):
+        """Add each element of filters to self.filters.
+
+        Nothing is sent anywhere by this method; it only updates the
+        client's own filter list.
+        """
+        self.filters += filters
+
+    def unsubscribe(self, filters):
+        """Remove each element of filters from self.filters.
+
+        This mirrors subscribe(), which extends self.filters with the
+        individual filters.  Looking up the whole filters list as a single
+        element (the previous behaviour) could never match filters added
+        through subscribe().
+
+        Only the first matching occurrence of each filter is removed.
+
+        Raises ValueError if one of the filters is not currently in
+        self.filters; filters listed before it have already been removed.
+        """
+        # NOTE(review): assumes self.filters is a flat list of individual
+        # filters, as subscribe() treats it -- confirm against __init__.
+        for f in filters:
+            self.filters.remove(f)
+
 def subscribe(api, filters, on_event):
     ws = None
     try:
@@ -67,6 +82,7 @@ def subscribe(api, filters, on_event):
             ws = EventClient(url, filters, on_event)
             ws.connect()
         else:
+            _logger.info("Web sockets not available, falling back to log table polling")
             ws = PollClient(api, filters, on_event)
         return ws
     except Exception: