from cwltool.load_tool import fetch_document, resolve_and_validate_document
from cwltool.process import shortname
from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
+from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.context import LoadingContext
import ruamel.yaml as yaml
from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
- make_builder)
+ make_builder, arvados_jobs_image)
from .pathmapper import ArvPathMapper, trim_listing
from .arvtool import ArvadosCommandTool, set_cluster_target
+from ._version import __version__
from .perf import Perf
sum_res_pars = ("outdirMin", "outdirMax")
def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
- submit_runner_ram=0, name=None, merged_map=None):
+ submit_runner_ram=0, name=None, merged_map=None,
+ submit_runner_image=None):
packed = packed_workflow(arvRunner, tool, merged_map)
upload_dependencies(arvRunner, name, tool.doc_loader,
packed, tool.tool["id"], False)
+ wf_runner_resources = None
+
+ hints = main.get("hints", [])
+ found = False
+ for h in hints:
+ if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
+ wf_runner_resources = h
+ found = True
+ break
+ if not found:
+ wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
+ hints.append(wf_runner_resources)
+
+ wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner, submit_runner_image or "arvados/jobs:"+__version__)
+
if submit_runner_ram:
- hints = main.get("hints", [])
- found = False
- for h in hints:
- if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
- h["ramMin"] = submit_runner_ram
- found = True
- break
- if not found:
- hints.append({"class": "http://arvados.org/cwl#WorkflowRunnerResources",
- "ramMin": submit_runner_ram})
- main["hints"] = hints
+ wf_runner_resources["ramMin"] = submit_runner_ram
+
+ main["hints"] = hints
body = {
"workflow": {
runtimeContext = runtimeContext.copy()
runtimeContext.toplevel = True # Preserve behavior for #13365
- builder = make_builder({shortname(k): v for k,v in viewitems(joborder)}, self.hints, self.requirements, runtimeContext)
+ builder = make_builder({shortname(k): v for k,v in viewitems(joborder)}, self.hints, self.requirements,
+ runtimeContext, self.metadata)
runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)
def job(self, joborder, output_callback, runtimeContext):
- builder = make_builder(joborder, self.hints, self.requirements, runtimeContext)
+ builder = make_builder(joborder, self.hints, self.requirements, runtimeContext, self.metadata)
runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")