X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f02109541ae4f462c4d1838d78ad06cf318098ce..3e5e5fa67a8f548397bda99449c7f7b19fadc641:/sdk/python/arvados/commands/ws.py diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py index d399eb5d38..b6ad029ee8 100644 --- a/sdk/python/arvados/commands/ws.py +++ b/sdk/python/arvados/commands/ws.py @@ -5,9 +5,9 @@ import logging import argparse import arvados import json -import time from arvados.events import subscribe import signal +import subprocess def main(arguments=None): logger = logging.getLogger('arvados.arv-ws') @@ -23,6 +23,9 @@ def main(arguments=None): group = parser.add_mutually_exclusive_group() group.add_argument('-p', '--pipeline', type=str, default="", help="Supply pipeline uuid, print log output from pipeline and its jobs") group.add_argument('-j', '--job', type=str, default="", help="Supply job uuid, print log output from jobs") + group.add_argument('--project', type=str, default="", help="Monitor change events for a specific project") + + parser.add_argument('--command', type=str, default="", help="Command to run when project event occurs") args = parser.parse_args(arguments) @@ -72,20 +75,48 @@ def main(arguments=None): elif ev["event_type"] in ("create", "update"): if ev["object_kind"] == "arvados#pipelineInstance": update_subscribed_components(ev["properties"]["new_attributes"]["components"]) - elif 'status' in ev and ev['status'] == 200: - pass - else: + + if ev["object_kind"] == "arvados#pipelineInstance" and args.pipeline: + if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Paused"): + ws.close() + + if ev["object_kind"] == "arvados#job" and args.job: + if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"): + ws.close() + return + + if 'status' in ev and ev['status'] == 200: + return + + if args.project: + old_attr = None + if 'old_attributes' in ev['properties'] and ev['properties']['old_attributes']: + old_attr = ev['properties']['old_attributes'] + if args.project not in (ev['properties']['new_attributes']['owner_uuid'], + old_attr['owner_uuid'] if old_attr else None): + return + + if args.command: + et = ev['event_type'] + if ev['event_type'] == 'update' and ev['properties']['new_attributes']['owner_uuid'] != ev['properties']['old_attributes']['owner_uuid']: + if args.project == ev['properties']['new_attributes']['owner_uuid']: + et = 'add' + else: + et = 'remove' + subprocess.call([args.command, args.project, et, ev['object_uuid']]) + return + print json.dumps(ev) try: - ws = subscribe(api, filters, on_message, poll_fallback=args.poll_interval) + ws = subscribe(arvados.api('v1', cache=False), filters, on_message, poll_fallback=args.poll_interval) if ws: if args.pipeline: c = api.pipeline_instances().get(uuid=args.pipeline).execute() update_subscribed_components(c["components"]) - - while True: - signal.pause() + if c["state"] in ("Complete", "Failed", "Paused"): + ws.close() + ws.run_forever() except KeyboardInterrupt: pass except Exception as e: