Merge branch '21535-multi-wf-delete'
[arvados.git] / sdk / python / arvados / commands / ws.py
index aa7dd05b23314722daa08015afb5d91f834f3445..3508682399b84850f71719366502ee431429f20b 100644 (file)
@@ -1,17 +1,22 @@
-from __future__ import print_function
-import sys
-import logging
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 import argparse
-import arvados
 import json
+import logging
+import signal
+import sys
+
+import arvados
 from arvados.events import subscribe
 from arvados._version import __version__
-import signal
+from . import _util as arv_cmd
 
 def main(arguments=None):
     logger = logging.getLogger('arvados.arv-ws')
 
-    parser = argparse.ArgumentParser()
+    parser = argparse.ArgumentParser(parents=[arv_cmd.retry_opt])
     parser.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
@@ -46,12 +51,13 @@ def main(arguments=None):
             if "job" in components[c]:
                 pipeline_jobs.add(components[c]["job"]["uuid"])
         if known_component_jobs != pipeline_jobs:
+            new_filters = [['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]]
+            ws.subscribe(new_filters)
             ws.unsubscribe(filters)
-            filters = [['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]]
-            ws.subscribe([['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]])
+            filters = new_filters
             known_component_jobs = pipeline_jobs
 
-    api = arvados.api('v1')
+    api = arvados.api('v1', num_retries=args.retries)
 
     if args.uuid:
         filters += [ ['object_uuid', '=', args.uuid] ]
@@ -84,7 +90,8 @@ def main(arguments=None):
                 sys.stdout.write(ev["properties"]["text"])
             elif ev["event_type"] in ("create", "update"):
                 if ev["object_kind"] == "arvados#pipelineInstance":
-                    update_subscribed_components(ev["properties"]["new_attributes"]["components"])
+                    c = api.pipeline_instances().get(uuid=ev["object_uuid"]).execute()
+                    update_subscribed_components(c["components"])
 
                 if ev["object_kind"] == "arvados#pipelineInstance" and args.pipeline:
                     if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Paused"):