# Command-line interface: two general filtering options plus a titled
# section holding the log-following options.
parser = argparse.ArgumentParser()
for short_flag, long_flag in (('-u', '--uuid'), ('-f', '--filters')):
    parser.add_argument(short_flag, long_flag, type=str, default="")

# NOTE(review): add_argument_group() only groups the options in --help
# output; it does not stop -p and -j from being given together. Confirm
# whether add_mutually_exclusive_group() was intended.
group = parser.add_argument_group('group')
log_options = (
    ('-p', '--pipeline', "Print log output from a pipeline and its jobs"),
    ('-j', '--job', "Print log output from a job"),
)
for short_flag, long_flag, description in log_options:
    group.add_argument(short_flag, long_flag, type=str, default="", help=description)

args = parser.parse_args(arguments)
filters = []
filters += [ ['object_uuid', '=', args.pipeline] ]
if args.job:
    # Append two separate filter clauses. The previous form,
    # `filters += [clause_a], clause_b`, extended the list with a tuple, so
    # the object_uuid clause ended up wrapped in an extra list and was no
    # longer a valid [attribute, operator, operand] triple.
    # NOTE(review): on_message also reacts to "create"/"update" events in
    # job mode; confirm the event_type clause does not filter those out.
    filters += [
        ['object_uuid', '=', args.job],
        ['event_type', 'in', ['stderr', 'stdout']],
    ]
api = arvados.api('v1', cache=False)
+ known_component_jobs = set()
def on_message(ev):
    """Handle one event delivered by the event client.

    When --pipeline or --job was given, the text of stdout/stderr events is
    printed, the connection is closed once a "create"/"update" event reports
    the state Complete, Failed or Cancelled, and the subscription is
    replaced whenever the set of component jobs of the pipeline changes.
    Otherwise the raw event is printed as JSON.

    Reads ``args`` and ``ws`` and updates ``filters`` and
    ``known_component_jobs`` from the enclosing scope.
    """
    if not (args.pipeline or args.job):
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print(json.dumps(ev))
        return

    event_type = ev['event_type']
    if event_type in ('stderr', 'stdout'):
        print(ev["properties"]["text"])
        return
    if event_type not in ("create", "update"):
        return

    is_pipeline = ev["object_kind"] == "arvados#pipeline_instance"

    if args.job or is_pipeline:
        state = ev["properties"]["new_attributes"]["state"]
        if state in ("Complete", "Failed", "Cancelled"):
            ws.close_connection()

    if not is_pipeline:
        return

    components = ev["properties"]["new_attributes"]["components"]
    pipeline_jobs = set(
        component["job"]["uuid"]
        for component in components.values()
        if "job" in component
    )
    if known_component_jobs == pipeline_jobs:
        return

    new_filters = [['object_uuid', 'in', [args.pipeline] + sorted(pipeline_jobs)]]
    # Hand the client copies so it never aliases the list mutated below.
    ws.unsubscribe(list(filters))
    # Update the shared state in place: rebinding `filters` or
    # `known_component_jobs` here would make them local to this function
    # and raise UnboundLocalError on the reads above.
    filters[:] = new_filters
    ws.subscribe(list(new_filters))
    known_component_jobs.clear()
    known_component_jobs.update(pipeline_jobs)
ws = None
try:
from ws4py.client.threadedclient import WebSocketClient
-import thread
+import threading
import json
import os
import time
ssl_options={'cert_reqs': ssl.CERT_REQUIRED}
super(EventClient, self).__init__(url, ssl_options)
- self.filters = filters
+ self.filters = []
self.on_event = on_event
def opened(self):
    """Send the initial subscribe request using the filters stored on the client."""
    initial_filters = self.filters
    self.subscribe(initial_filters)
def received_message(self, m):
    """Decode an incoming message as JSON and pass the result to ``on_event``."""
    payload = str(m)
    event = json.loads(payload)
    self.on_event(event)
except:
pass
def subscribe(self, filters, last_log_id=None):
    """Send a "subscribe" request for *filters* over the connection.

    filters: the filter list to put in the request. This is the argument
        itself; the previous code ignored it and always sent
        ``self.filters``.
    last_log_id: when not None, included in the request as "last_log_id".
    """
    request = {"method": "subscribe", "filters": filters}
    if last_log_id is not None:
        request["last_log_id"] = last_log_id
    self.send(json.dumps(request))
+
def unsubscribe(self, filters):
    """Send an "unsubscribe" request for *filters* over the connection.

    The request carries the *filters* argument; the previous code ignored
    it and always sent ``self.filters``.
    """
    request = {"method": "unsubscribe", "filters": filters}
    self.send(json.dumps(request))
+
class PollClient(threading.Thread):
def __init__(self, api, filters, on_event):
self.api = api
def close_connection(self):
    """Clear the ``loop`` flag on this client."""
    # presumably the polling loop checks this flag and stops -- TODO confirm
    self.loop = False
def subscribe(self, filters):
    """Append every clause in *filters* to this client's filter list, in place."""
    self.filters.extend(filters)
+
def unsubscribe(self, filters):
    """Remove *filters* from this client's filter list.

    subscribe() adds clauses one by one, so the clauses of *filters* are
    removed one by one as well. The previous code looked up the whole
    *filters* list as a single element and raised ValueError for anything
    subscribe() had added. A *filters* list stored as one entry is still
    removed as a unit, and clauses that are not present are ignored.
    """
    if filters in self.filters:
        # The whole filter list is stored as a single entry.
        self.filters.remove(filters)
        return
    # Iterate over a copy in case the caller passed self.filters itself.
    for clause in list(filters):
        if clause in self.filters:
            self.filters.remove(clause)
+
def subscribe(api, filters, on_event):
ws = None
try:
ws = EventClient(url, filters, on_event)
ws.connect()
else:
+ _logger.info("Web sockets not available, falling back to log table polling")
ws = PollClient(api, filters, on_event)
return ws
except Exception: