Merge branch 'master' into 6473-fetch-events-starting-at
[arvados.git] / sdk / python / arvados / events.py
index b1872002ebe5cd4b6df40fc102d137a61acbb9d1..e319be38ce4eb53b3880d0d5684f2eca16dcbf0e 100644 (file)
@@ -53,7 +53,7 @@ class EventClient(WebSocketClient):
         self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
 
 class PollClient(threading.Thread):
-    def __init__(self, api, filters, on_event, poll_time):
+    def __init__(self, api, filters, on_event, poll_time, last_log_id):
         super(PollClient, self).__init__()
         self.api = api
         if filters:
@@ -64,14 +64,18 @@ class PollClient(threading.Thread):
         self.poll_time = poll_time
         self.daemon = True
         self.stop = threading.Event()
+        self.last_log_id = last_log_id
 
     def run(self):
         self.id = 0
-        for f in self.filters:
-            items = self.api.logs().list(limit=1, order="id desc", filters=f).execute()['items']
-            if items:
-                if items[0]['id'] > self.id:
-                    self.id = items[0]['id']
+        if self.last_log_id != None:
+            self.id = self.last_log_id
+        else:
+            for f in self.filters:
+                items = self.api.logs().list(limit=1, order="id desc", filters=f).execute()['items']
+                if items:
+                    if items[0]['id'] > self.id:
+                        self.id = items[0]['id']
 
         self.on_event({'status': 200})
 
@@ -147,6 +151,6 @@ def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
         return _subscribe_websocket(api, filters, on_event, last_log_id)
     except Exception as e:
         _logger.warn("Falling back to polling after websocket error: %s" % e)
-    p = PollClient(api, filters, on_event, poll_fallback)
+    p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
     p.start()
     return p