3609: Refactoring arv-ws into a reusable command module. Working on adding
authorPeter Amstutz <peter.amstutz@curoverse.com>
Fri, 10 Oct 2014 14:17:40 +0000 (10:17 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Fri, 10 Oct 2014 14:17:40 +0000 (10:17 -0400)
ability to monitor pipeline/job log at command line.

sdk/python/arvados/commands/ws.py [new file with mode: 0644]
sdk/python/arvados/events.py
sdk/python/bin/arv-ws

diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
new file mode 100644 (file)
index 0000000..f421d62
--- /dev/null
@@ -0,0 +1,48 @@
+#!/usr/bin/env python
+
+import sys
+import logging
+import argparse
+import arvados
+import json
+from arvados.events import subscribe
+
+def main(arguments=None):
+    logger = logging.getLogger('arvados.arv-ws')
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-u', '--uuid', type=str, default="")
+    parser.add_argument('-f', '--filters', type=str, default="")
+    parser.add_argument('-p', '--pipeline', type=str, default="", help="Print log output from a pipeline and its jobs")
+    parser.add_argument('-j', '--job', type=str, default="", help="Print log output from a job")
+    args = parser.parse_args(arguments)
+
+    filters = []
+    if args.uuid:
+        filters += [ ['object_uuid', '=', args.uuid] ]
+
+    if args.filters:
+        filters += json.loads(args.filters)
+
+    if args.pipeline:
+        filters += [ ['object_uuid', '=', args.pipeline] ]
+
+    if args.job:
+        filters += [ ['object_uuid', '=', args.job] ]
+
+    api = arvados.api('v1', cache=False)
+
+    def on_message(ev):
+        print json.dumps(ev)
+
+    ws = None
+    try:
+        ws = subscribe(api, filters, lambda ev: on_message(ev))
+        ws.run_forever()
+    except KeyboardInterrupt:
+        pass
+    except Exception:
+        logger.exception('')
+    finally:
+        if ws:
+            ws.close_connection()
index b7d610d66e729a9b9cb3a27b29a9abc51395493b..beb34545370969ea44cfc05fd5d1e9ca49483279 100644 (file)
@@ -36,14 +36,43 @@ class EventClient(WebSocketClient):
         except:
             pass
 
+class PollClient(threading.Thread):
+    def __init__(self, api, filters, on_event):
+        self.api = api
+        self.filters = filters
+        self.on_event = on_event
+        items = self.api.logs().list(limit=1, order=json.dumps(["id desc"]), filters=json.dumps(filters)).execute()['items']
+        if len(items) > 0:
+            self.id = items[0]["id"]
+        else:
+            self.id = 0
+        self.loop = True
+
+    def run_forever(self):
+        while self.loop:
+            time.sleep(15)
+            items = self.api.logs().list(limit=1, order=json.dumps(["id asc"]), filters=json.dumps(self.filters+[["id", ">", str(self.id)]])).execute()['items']
+            for i in items:
+                self.id = i['id']
+                self.on_event(i)
+
+    def close_connection(self):
+        self.loop = False
+
 def subscribe(api, filters, on_event):
     ws = None
     try:
-        url = "{}?api_token={}".format(api._rootDesc['websocketUrl'], config.get('ARVADOS_API_TOKEN'))
-        ws = EventClient(url, filters, on_event)
-        ws.connect()
+        if 'websocketUrl' in api._rootDesc:
+            url = "{}?api_token={}".format(api._rootDesc['websocketUrl'], config.get('ARVADOS_API_TOKEN'))
+            ws = EventClient(url, filters, on_event)
+            ws.connect()
+        else:
+            ws = PollClient(api, filters, on_event)
         return ws
     except Exception:
         if (ws):
           ws.close_connection()
-        raise
+        try:
+            return PollClient(api, filters, on_event)
+        except:
+            raise
index ce7f066ec7682ff8b4fe3cdeaa59e0656db68716..4e663cef1bc2bd705ed7651c3185aa743637b4b7 100755 (executable)
@@ -1,30 +1,4 @@
 #!/usr/bin/env python
 
-import sys
-import logging
-import argparse
-import arvados
-from arvados.events import subscribe
-
-logger = logging.getLogger('arvados.arv-ws')
-
-parser = argparse.ArgumentParser()
-parser.add_argument('-u', '--uuid', type=str, default="")
-args = parser.parse_args()
-
-filters = []
-if len(args.uuid)>0: filters = [ ['object_uuid', '=', args.uuid] ]
-
-api = arvados.api('v1', cache=False)
-
-def on_message(ev):
-  print "\n", ev
-
-ws = None
-try:
-  ws = subscribe(api, filters, lambda ev: on_message(ev))
-  ws.run_forever()
-except Exception:
-  logger.exception('')
-  if (ws):
-    ws.close_connection()
+from arvados.commands.ws import main
+main()