Merge branch '13078-badconstraints'
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 628b6aea69913da1944abea1d886a14d63db3d7a..aee928d0e3d7ce107320005ef28c2cc712be948f 100644 (file)
@@ -23,6 +23,7 @@ import cwltool.main
 import cwltool.workflow
 import cwltool.process
 from schema_salad.sourceline import SourceLine
+import schema_salad.validate as validate
 
 import arvados
 import arvados.config
@@ -53,6 +54,8 @@ arvados.log_handler.setFormatter(logging.Formatter(
         '%(asctime)s %(name)s %(levelname)s: %(message)s',
         '%Y-%m-%d %H:%M:%S'))
 
+DEFAULT_PRIORITY = 500
+
 class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
     containers API), wait for them to complete, and report output.
@@ -415,16 +418,21 @@ class ArvCwlRunner(object):
 
         if self.work_api == "containers":
             if self.ignore_docker_for_reuse:
-                raise validate.ValidationException("--ignore-docker-for-reuse not supported with containers API.")
+                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
             kwargs["outdir"] = "/var/spool/cwl"
             kwargs["docker_outdir"] = "/var/spool/cwl"
             kwargs["tmpdir"] = "/tmp"
             kwargs["docker_tmpdir"] = "/tmp"
         elif self.work_api == "jobs":
+            if kwargs["priority"] != DEFAULT_PRIORITY:
+                raise Exception("--priority not implemented for jobs API.")
             kwargs["outdir"] = "$(task.outdir)"
             kwargs["docker_outdir"] = "$(task.outdir)"
             kwargs["tmpdir"] = "$(task.tmpdir)"
 
+        if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
+            raise Exception("--priority must be in the range 1..1000.")
+
         runnerjob = None
         if kwargs.get("submit"):
             # Submit a runner job to run the workflow for us.
@@ -443,7 +451,8 @@ class ArvCwlRunner(object):
                                                 on_error=kwargs.get("on_error"),
                                                 submit_runner_image=kwargs.get("submit_runner_image"),
                                                 intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
-                                                merged_map=merged_map)
+                                                merged_map=merged_map,
+                                                priority=kwargs.get("priority"))
             elif self.work_api == "jobs":
                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                       self.output_name,
@@ -581,7 +590,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--print-dot", action="store_true",
                          help="Print workflow visualization in graphviz format and exit")
-    exgroup.add_argument("--version", action="store_true", help="Print version and exit")
+    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
 
     exgroup = parser.add_mutually_exclusive_group()
@@ -663,6 +672,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
                         default=0)
 
+    parser.add_argument("--priority", type=int,
+                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
+                        default=DEFAULT_PRIORITY)
+
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
@@ -671,7 +684,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")
 
-    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
+    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
 
     return parser
@@ -699,10 +712,6 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
     job_order_object = None
     arvargs = parser.parse_args(args)
 
-    if arvargs.version:
-        print versionstring()
-        return
-
     if arvargs.update_workflow:
         if arvargs.update_workflow.find('-7fd4e-') == 5:
             want_api = 'containers'