3609: --job and --pipeline logging implemented, needs testing
author Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 10 Oct 2014 14:52:26 +0000 (10:52 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 10 Oct 2014 14:52:26 +0000 (10:52 -0400)
sdk/python/arvados/commands/ws.py
sdk/python/arvados/events.py

index f421d622b05d4e61c7f48c2234dfe6dd3cff7d36..8656f9265c9870710655db08c781a671de560edd 100644 (file)
@@ -13,8 +13,11 @@ def main(arguments=None):
     parser = argparse.ArgumentParser()
     parser.add_argument('-u', '--uuid', type=str, default="")
     parser.add_argument('-f', '--filters', type=str, default="")
-    parser.add_argument('-p', '--pipeline', type=str, default="", help="Print log output from a pipeline and its jobs")
-    parser.add_argument('-j', '--job', type=str, default="", help="Print log output from a job")
+
+    group = parser.add_argument_group('group')
+    group.add_argument('-p', '--pipeline', type=str, default="", help="Print log output from a pipeline and its jobs")
+    group.add_argument('-j', '--job', type=str, default="", help="Print log output from a job")
+
     args = parser.parse_args(arguments)
 
     filters = []
@@ -28,12 +31,50 @@ def main(arguments=None):
         filters += [ ['object_uuid', '=', args.pipeline] ]
 
     if args.job:
-        filters += [ ['object_uuid', '=', args.job] ]
+        # NOTE(review): on_message below also expects "create"/"update" events
+        # when --job is given; this event_type filter would presumably exclude them -- confirm.
+        filters += [ ['object_uuid', '=', args.job], ['event_type', 'in', ['stderr', 'stdout']] ]
 
     api = arvados.api('v1', cache=False)
 
+    # Subscription state shared with on_message.  It lives in a dict because
+    # assigning to an enclosing function's variable from a nested function
+    # would create a new local instead (the file uses Python 2 print syntax).
+    subscribed = {'filters': filters, 'jobs': set()}
     def on_message(ev):
-        print json.dumps(ev)
+        """Handle one event delivered by the event client.
+
+        Without --pipeline/--job the event is printed as JSON.  Otherwise
+        stderr/stdout events print their text, a terminal state closes the
+        connection, and a pipeline update re-subscribes so that the
+        pipeline's current component jobs are followed as well.
+        """
+        if not (args.pipeline or args.job):
+            print json.dumps(ev)
+            return
+
+        if ev['event_type'] in ('stderr', 'stdout'):
+            print ev["properties"]["text"]
+            return
+        if ev["event_type"] not in ("create", "update"):
+            return
+
+        is_pipeline = ev["object_kind"] == "arvados#pipeline_instance"
+        if args.job or is_pipeline:
+            if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+                ws.close_connection()
+        if not is_pipeline:
+            return
+
+        components = ev["properties"]["new_attributes"]["components"]
+        pipeline_jobs = set(c["job"]["uuid"] for c in components.values() if "job" in c)
+        if pipeline_jobs != subscribed['jobs']:
+            # The set of component jobs changed: swap the old subscription for a new one.
+            new_filters = [['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]]
+            ws.unsubscribe(subscribed['filters'])
+            ws.subscribe(new_filters)
+            subscribed['filters'] = new_filters
+            subscribed['jobs'] = pipeline_jobs
 
     ws = None
     try:
index beb34545370969ea44cfc05fd5d1e9ca49483279..faf638fb379a78819665952e2518b5f23f46811b 100644 (file)
@@ -1,5 +1,5 @@
 from ws4py.client.threadedclient import WebSocketClient
-import thread
+import threading
 import json
 import os
 import time
@@ -20,11 +20,11 @@ class EventClient(WebSocketClient):
             ssl_options={'cert_reqs': ssl.CERT_REQUIRED}
 
         super(EventClient, self).__init__(url, ssl_options)
-        self.filters = filters
+        self.filters = []
         self.on_event = on_event
 
     def opened(self):
-        self.send(json.dumps({"method": "subscribe", "filters": self.filters}))
+        self.subscribe(self.filters)
 
     def received_message(self, m):
         self.on_event(json.loads(str(m)))
@@ -36,6 +36,15 @@ class EventClient(WebSocketClient):
         except:
             pass
 
+    def subscribe(self, filters, last_log_id=None):
+        m = {"method": "subscribe", "filters": filters}  # send the filters passed in, not self.filters
+        if last_log_id is not None:
+            m["last_log_id"] = last_log_id
+        self.send(json.dumps(m))
+
+    def unsubscribe(self, filters):
+        self.send(json.dumps({"method": "unsubscribe", "filters": filters}))  # cancel exactly the filters passed in
+
 class PollClient(threading.Thread):
     def __init__(self, api, filters, on_event):
         self.api = api
@@ -59,6 +68,12 @@ class PollClient(threading.Thread):
     def close_connection(self):
         self.loop = False
 
+    def subscribe(self, filters):
+        self.filters += filters  # extends the flat filter list element by element
+
+    def unsubscribe(self, filters):
+        self.filters = [f for f in self.filters if f not in filters]  # drop each filter, mirroring subscribe()
+
 def subscribe(api, filters, on_event):
     ws = None
     try:
@@ -67,6 +82,7 @@ def subscribe(api, filters, on_event):
             ws = EventClient(url, filters, on_event)
             ws.connect()
         else:
+            _logger.info("Web sockets not available, falling back to log table polling")
             ws = PollClient(api, filters, on_event)
         return ws
     except Exception: