17417: Merge branch 'main' into 17417-add-arm64
[arvados.git] / sdk / cwl / arvados_cwl / arvworkflow.py
index ba60333648eabdb4d308503b3e8ac20b88ad34e1..4fe82a6fe1d6fc32f709dd909577da7010970e07 100644 (file)
@@ -17,16 +17,17 @@ from cwltool.pack import pack
 from cwltool.load_tool import fetch_document, resolve_and_validate_document
 from cwltool.process import shortname
 from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
+from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
 from cwltool.context import LoadingContext
 
 import ruamel.yaml as yaml
 
 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
                      trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
-                     make_builder)
+                     make_builder, arvados_jobs_image)
 from .pathmapper import ArvPathMapper, trim_listing
 from .arvtool import ArvadosCommandTool, set_cluster_target
+from ._version import __version__
 
 from .perf import Perf
 
@@ -37,7 +38,8 @@ max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdir
 sum_res_pars = ("outdirMin", "outdirMax")
 
 def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
-                    submit_runner_ram=0, name=None, merged_map=None):
+                    submit_runner_ram=0, name=None, merged_map=None,
+                    submit_runner_image=None):
 
     packed = packed_workflow(arvRunner, tool, merged_map)
 
@@ -57,18 +59,25 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
     upload_dependencies(arvRunner, name, tool.doc_loader,
                         packed, tool.tool["id"], False)
 
+    wf_runner_resources = None
+
+    hints = main.get("hints", [])
+    found = False
+    for h in hints:
+        if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
+            wf_runner_resources = h
+            found = True
+            break
+    if not found:
+        wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
+        hints.append(wf_runner_resources)
+
+    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner, submit_runner_image or "arvados/jobs:"+__version__)
+
     if submit_runner_ram:
-        hints = main.get("hints", [])
-        found = False
-        for h in hints:
-            if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
-                h["ramMin"] = submit_runner_ram
-                found = True
-                break
-        if not found:
-            hints.append({"class": "http://arvados.org/cwl#WorkflowRunnerResources",
-                          "ramMin": submit_runner_ram})
-        main["hints"] = hints
+        wf_runner_resources["ramMin"] = submit_runner_ram
+
+    main["hints"] = hints
 
     body = {
         "workflow": {
@@ -141,7 +150,8 @@ class ArvadosWorkflowStep(WorkflowStep):
         runtimeContext = runtimeContext.copy()
         runtimeContext.toplevel = True  # Preserve behavior for #13365
 
-        builder = make_builder({shortname(k): v for k,v in viewitems(joborder)}, self.hints, self.requirements, runtimeContext)
+        builder = make_builder({shortname(k): v for k,v in viewitems(joborder)}, self.hints, self.requirements,
+                               runtimeContext, self.metadata)
         runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
         return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)
 
@@ -161,7 +171,7 @@ class ArvadosWorkflow(Workflow):
 
     def job(self, joborder, output_callback, runtimeContext):
 
-        builder = make_builder(joborder, self.hints, self.requirements, runtimeContext)
+        builder = make_builder(joborder, self.hints, self.requirements, runtimeContext, self.metadata)
         runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
 
         req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
@@ -176,6 +186,7 @@ class ArvadosWorkflow(Workflow):
 
         discover_secondary_files(self.arvrunner.fs_access, builder,
                                  self.tool["inputs"], joborder)
+        normalizeFilesDirs(joborder)
 
         with Perf(metrics, "subworkflow upload_deps"):
             upload_dependencies(self.arvrunner,
@@ -186,7 +197,7 @@ class ArvadosWorkflow(Workflow):
                                 False)
 
             if self.wf_pdh is None:
-                packed = pack(self.doc_loader, self.tool, self.tool["id"], self.metadata)
+                packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)
 
                 for p in packed["$graph"]:
                     if p["id"] == "#main":