X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/88a29cd091468feb98e5cd541c560f4d35bca716..d0dd4abf2e364ad94c3bb8ad227faee28edda153:/sdk/cwl/arvados_cwl/__init__.py diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 4701b4d8f1..e4f5ceab74 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -23,6 +23,7 @@ import cwltool.main import cwltool.workflow import cwltool.process from schema_salad.sourceline import SourceLine +import schema_salad.validate as validate import arvados import arvados.config @@ -42,7 +43,7 @@ from ._version import __version__ from cwltool.pack import pack from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing -from cwltool.draft2tool import compute_checksums +from cwltool.command_line_tool import compute_checksums from arvados.api import OrderedJsonModel logger = logging.getLogger('arvados.cwl-runner') @@ -53,6 +54,8 @@ arvados.log_handler.setFormatter(logging.Formatter( '%(asctime)s %(name)s %(levelname)s: %(message)s', '%Y-%m-%d %H:%M:%S')) +DEFAULT_PRIORITY = 500 + class ArvCwlRunner(object): """Execute a CWL tool or workflow, submit work (using either jobs or containers API), wait for them to complete, and report output. @@ -234,6 +237,8 @@ class ArvCwlRunner(object): if not obj.get("dockerOutputDirectory").startswith('/'): raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError( "Option 'dockerOutputDirectory' must be an absolute path.") + if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers": + raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs") for v in obj.itervalues(): self.check_features(v) elif isinstance(obj, list): @@ -351,7 +356,7 @@ class ArvCwlRunner(object): make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, collection_cache=self.collection_cache) self.fs_access = make_fs_access(kwargs["basedir"]) - + self.secret_store = kwargs.get("secret_store") self.trash_intermediate = kwargs["trash_intermediate"] if self.trash_intermediate and self.work_api != "containers": @@ -415,16 +420,21 @@ class ArvCwlRunner(object): if self.work_api == "containers": if self.ignore_docker_for_reuse: - raise validate.ValidationException("--ignore-docker-for-reuse not supported with containers API.") + raise Exception("--ignore-docker-for-reuse not supported with containers API.") kwargs["outdir"] = "/var/spool/cwl" kwargs["docker_outdir"] = "/var/spool/cwl" kwargs["tmpdir"] = "/tmp" kwargs["docker_tmpdir"] = "/tmp" elif self.work_api == "jobs": + if kwargs["priority"] != DEFAULT_PRIORITY: + raise Exception("--priority not implemented for jobs API.") kwargs["outdir"] = "$(task.outdir)" kwargs["docker_outdir"] = "$(task.outdir)" kwargs["tmpdir"] = "$(task.tmpdir)" + if kwargs["priority"] < 1 or kwargs["priority"] > 1000: + raise Exception("--priority must be in the range 1..1000.") + runnerjob = None if kwargs.get("submit"): # Submit a runner job to run the workflow for us. @@ -443,7 +453,9 @@ class ArvCwlRunner(object): on_error=kwargs.get("on_error"), submit_runner_image=kwargs.get("submit_runner_image"), intermediate_output_ttl=kwargs.get("intermediate_output_ttl"), - merged_map=merged_map) + merged_map=merged_map, + priority=kwargs.get("priority"), + secret_store=self.secret_store) elif self.work_api == "jobs": runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, @@ -581,7 +593,7 @@ def arg_parser(): # type: () -> argparse.ArgumentParser exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--print-dot", action="store_true", help="Print workflow visualization in graphviz format and exit") - exgroup.add_argument("--version", action="store_true", help="Print version and exit") + exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring()) exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.") exgroup = parser.add_mutually_exclusive_group() @@ -663,6 +675,10 @@ def arg_parser(): # type: () -> argparse.ArgumentParser help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).", default=0) + parser.add_argument("--priority", type=int, + help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)", + default=DEFAULT_PRIORITY) + exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--trash-intermediate", action="store_true", default=False, dest="trash_intermediate", @@ -671,14 +687,14 @@ def arg_parser(): # type: () -> argparse.ArgumentParser default=False, dest="trash_intermediate", help="Do not trash intermediate outputs (default).") - parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute") + parser.add_argument("workflow", type=str, default=None, help="The workflow to execute") parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.") return parser def add_arv_hints(): - cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*") - cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE + cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*") + cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml') use_custom_schema("v1.0", "http://arvados.org/cwl", res.read()) res.close() @@ -699,10 +715,6 @@ def main(args, stdout, stderr, api_client=None, keep_client=None): job_order_object = None arvargs = parser.parse_args(args) - if arvargs.version: - print versionstring() - return - if arvargs.update_workflow: if arvargs.update_workflow.find('-7fd4e-') == 5: want_api = 'containers'