From cf182ddd634cc754c5444ad4291f6027fbf07acf Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Thu, 19 Oct 2023 14:44:52 -0400 Subject: [PATCH] Merge branch '20825-cwl-separate-runner' refs #20825 Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- sdk/cwl/arvados_cwl/arv-cwl-schema-v1.2.yml | 19 ++++++ sdk/cwl/arvados_cwl/arvcontainer.py | 4 +- sdk/cwl/arvados_cwl/arvworkflow.py | 59 ++++++++++++++---- sdk/cwl/arvados_cwl/context.py | 1 + sdk/cwl/arvados_cwl/executor.py | 5 +- sdk/cwl/arvados_cwl/runner.py | 4 +- sdk/cwl/tests/arvados-tests.yml | 6 ++ sdk/cwl/tests/wf/runseparate-wf.cwl | 68 +++++++++++++++++++++ 8 files changed, 152 insertions(+), 14 deletions(-) create mode 100644 sdk/cwl/tests/wf/runseparate-wf.cwl diff --git a/sdk/cwl/arvados_cwl/arv-cwl-schema-v1.2.yml b/sdk/cwl/arvados_cwl/arv-cwl-schema-v1.2.yml index f4246ed70a..389add4104 100644 --- a/sdk/cwl/arvados_cwl/arv-cwl-schema-v1.2.yml +++ b/sdk/cwl/arvados_cwl/arv-cwl-schema-v1.2.yml @@ -429,3 +429,22 @@ $graph: doc: | If the container failed on its first run, re-submit the container with the RAM request multiplied by this factor. + +- name: SeparateRunner + type: record + extends: cwl:ProcessRequirement + inVocab: false + doc: | + Indicates that a subworkflow should run in a separate + arvados-cwl-runner process. + fields: + - name: class + type: string + doc: "Always 'arv:SeparateRunner'" + jsonldPredicate: + _id: "@type" + _type: "@vocab" + - name: runnerProcessName + type: ['null', string, cwl:Expression] + doc: | + Custom name to use for the runner process diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index ea7c9f7a33..6e3e42975e 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -593,7 +593,7 @@ class RunnerContainer(Runner): "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)), "API": True }, - "use_existing": False, # Never reuse the runner container - see #15497. + "use_existing": self.reuse_runner, "properties": {} } @@ -617,6 +617,8 @@ class RunnerContainer(Runner): "content": packed } container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33] + elif self.embedded_tool.tool.get("id", "").startswith("file:"): + raise WorkflowException("Tool id '%s' is a local file but expected keep: or arvwf:" % self.embedded_tool.tool.get("id")) else: main = self.loadingContext.loader.idx["_:main"] if main.get("id") == "_:main": diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py index 3ad2c6419a..c592b83dc7 100644 --- a/sdk/cwl/arvados_cwl/arvworkflow.py +++ b/sdk/cwl/arvados_cwl/arvworkflow.py @@ -38,6 +38,7 @@ import ruamel.yaml as yaml from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection, trim_anonymous_location, remove_redundant_fields, discover_secondary_files, make_builder, arvados_jobs_image, FileUpdates) +from .arvcontainer import RunnerContainer from .pathmapper import ArvPathMapper, trim_listing from .arvtool import ArvadosCommandTool, set_cluster_target from ._version import __version__ @@ -147,7 +148,7 @@ def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, def rel_ref(s, baseuri, urlexpander, merged_map, jobmapper): - if s.startswith("keep:"): + if s.startswith("keep:") or s.startswith("arvwf:"): return s uri = urlexpander(s, baseuri) @@ -616,17 +617,8 @@ class ArvadosWorkflow(Workflow): super(ArvadosWorkflow, self).__init__(toolpath_object, self.loadingContext) self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget") - def job(self, joborder, output_callback, runtimeContext): - - builder = make_builder(joborder, self.hints, self.requirements, runtimeContext, self.metadata) - runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext) - - req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer") - if not req: - return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext) - - # RunInSingleContainer is true + def runInSingleContainer(self, joborder, output_callback, runtimeContext, builder): with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)): if "id" not in self.tool: raise WorkflowException("%s object must have 'id'" % (self.tool["class"])) @@ -789,6 +781,51 @@ class ArvadosWorkflow(Workflow): }) return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext) + + def separateRunner(self, joborder, output_callback, runtimeContext, req, builder): + + name = runtimeContext.name + + rpn = req.get("runnerProcessName") + if rpn: + name = builder.do_eval(rpn) + + return RunnerContainer(self.arvrunner, + self, + self.loadingContext, + runtimeContext.enable_reuse, + None, + None, + submit_runner_ram=runtimeContext.submit_runner_ram, + name=name, + on_error=runtimeContext.on_error, + submit_runner_image=runtimeContext.submit_runner_image, + intermediate_output_ttl=runtimeContext.intermediate_output_ttl, + merged_map=None, + priority=runtimeContext.priority, + secret_store=self.arvrunner.secret_store, + collection_cache_size=runtimeContext.collection_cache_size, + collection_cache_is_default=self.arvrunner.should_estimate_cache_size, + git_info=runtimeContext.git_info, + reuse_runner=True).job(joborder, output_callback, runtimeContext) + + + def job(self, joborder, output_callback, runtimeContext): + + builder = make_builder(joborder, self.hints, self.requirements, runtimeContext, self.metadata) + runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext) + + req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer") + if req: + return self.runInSingleContainer(joborder, output_callback, runtimeContext, builder) + + req, _ = self.get_requirement("http://arvados.org/cwl#SeparateRunner") + if req: + return self.separateRunner(joborder, output_callback, runtimeContext, req, builder) + + return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext) + + def make_workflow_step(self, toolpath_object, # type: Dict[Text, Any] pos, # type: int diff --git a/sdk/cwl/arvados_cwl/context.py b/sdk/cwl/arvados_cwl/context.py index dd64879b9f..0439cb5b15 100644 --- a/sdk/cwl/arvados_cwl/context.py +++ b/sdk/cwl/arvados_cwl/context.py @@ -45,6 +45,7 @@ class ArvRuntimeContext(RuntimeContext): self.prefer_cached_downloads = False self.cached_docker_lookups = {} self.print_keep_deps = False + self.git_info = {} super(ArvRuntimeContext, self).__init__(kwargs) diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py index 677e10d265..2db6a9bfe2 100644 --- a/sdk/cwl/arvados_cwl/executor.py +++ b/sdk/cwl/arvados_cwl/executor.py @@ -603,6 +603,8 @@ The 'jobs' API is no longer supported. if git_info[g]: logger.info(" %s: %s", g.split("#", 1)[1], git_info[g]) + runtimeContext.git_info = git_info + workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"] workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"] controller = self.api.config()["Services"]["Controller"]["ExternalURL"] @@ -874,7 +876,8 @@ The 'jobs' API is no longer supported. if (self.task_queue.in_flight + len(self.processes)) > 0: self.workflow_eval_lock.wait(3) else: - logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.") + if self.final_status is None: + logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.") break if self.stop_polling.is_set(): diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index f6aab4b93f..f52768d3d3 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -828,7 +828,8 @@ class Runner(Process): priority=None, secret_store=None, collection_cache_size=256, collection_cache_is_default=True, - git_info=None): + git_info=None, + reuse_runner=False): self.loadingContext = loadingContext.copy() @@ -861,6 +862,7 @@ class Runner(Process): self.enable_dev = self.loadingContext.enable_dev self.git_info = git_info self.fast_parser = self.loadingContext.fast_parser + self.reuse_runner = reuse_runner self.submit_runner_cores = 1 self.submit_runner_ram = 1024 # defaut 1 GiB diff --git a/sdk/cwl/tests/arvados-tests.yml b/sdk/cwl/tests/arvados-tests.yml index a93c64a224..e0bdd8a5a3 100644 --- a/sdk/cwl/tests/arvados-tests.yml +++ b/sdk/cwl/tests/arvados-tests.yml @@ -494,3 +494,9 @@ output: {} tool: oom/19975-oom3.cwl doc: "Test feature 19975 - retry on custom error" + +- job: null + output: + out: out + tool: wf/runseparate-wf.cwl + doc: "test arv:SeparateRunner" diff --git a/sdk/cwl/tests/wf/runseparate-wf.cwl b/sdk/cwl/tests/wf/runseparate-wf.cwl new file mode 100644 index 0000000000..e4ab627256 --- /dev/null +++ b/sdk/cwl/tests/wf/runseparate-wf.cwl @@ -0,0 +1,68 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +class: Workflow +cwlVersion: v1.0 +$namespaces: + arv: "http://arvados.org/cwl#" +inputs: + sleeptime: + type: int + default: 5 + fileblub: + type: File + default: + class: File + location: keep:d7514270f356df848477718d58308cc4+94/a + secondaryFiles: + - class: File + location: keep:d7514270f356df848477718d58308cc4+94/b +outputs: + out: + type: string + outputSource: substep/out +requirements: + SubworkflowFeatureRequirement: {} + ScatterFeatureRequirement: {} + InlineJavascriptRequirement: {} + StepInputExpressionRequirement: {} +steps: + substep: + in: + sleeptime: sleeptime + fileblub: fileblub + out: [out] + hints: + - class: arv:SeparateRunner + runnerProcessName: $("sleeptime "+inputs.sleeptime) + - class: DockerRequirement + dockerPull: arvados/jobs:2.2.2 + run: + class: Workflow + id: mysub + inputs: + fileblub: File + sleeptime: int + outputs: + out: + type: string + outputSource: sleep1/out + steps: + sleep1: + in: + fileblub: fileblub + out: [out] + run: + class: CommandLineTool + id: subtool + inputs: + fileblub: + type: File + inputBinding: {position: 1} + outputs: + out: + type: string + outputBinding: + outputEval: 'out' + baseCommand: cat -- 2.30.2