18870: Need to declare NODES as array
[arvados.git] / sdk / cwl / arvados_cwl / arvworkflow.py
index 97c5fafe792fc06ce099e6a9bc6934671ace580d..51e7cd8b9e52a2d8ffcc0f890e60f30f6810c359 100644 (file)
@@ -17,16 +17,17 @@ from cwltool.pack import pack
 from cwltool.load_tool import fetch_document, resolve_and_validate_document
 from cwltool.process import shortname
 from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
+from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
 from cwltool.context import LoadingContext
 
 import ruamel.yaml as yaml
 
 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
                      trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
-                     make_builder)
+                     make_builder, arvados_jobs_image)
 from .pathmapper import ArvPathMapper, trim_listing
 from .arvtool import ArvadosCommandTool, set_cluster_target
+from ._version import __version__
 
 from .perf import Perf
 
@@ -36,10 +37,12 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics')
 max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
 sum_res_pars = ("outdirMin", "outdirMax")
 
-def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
-                    submit_runner_ram=0, name=None, merged_map=None):
+def upload_workflow(arvRunner, tool, job_order, project_uuid,
+                    runtimeContext, uuid=None,
+                    submit_runner_ram=0, name=None, merged_map=None,
+                    submit_runner_image=None):
 
-    packed = packed_workflow(arvRunner, tool, merged_map)
+    packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext)
 
     adjustDirObjs(job_order, trim_listing)
     adjustFileObjs(job_order, trim_anonymous_location)
@@ -55,20 +58,30 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
         name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
 
     upload_dependencies(arvRunner, name, tool.doc_loader,
-                        packed, tool.tool["id"], False)
+                        packed, tool.tool["id"], False,
+                        runtimeContext)
+
+    wf_runner_resources = None
+
+    hints = main.get("hints", [])
+    found = False
+    for h in hints:
+        if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
+            wf_runner_resources = h
+            found = True
+            break
+    if not found:
+        wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
+        hints.append(wf_runner_resources)
+
+    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
+                                                                  submit_runner_image or "arvados/jobs:"+__version__,
+                                                                  runtimeContext)
 
     if submit_runner_ram:
-        hints = main.get("hints", [])
-        found = False
-        for h in hints:
-            if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
-                h["ramMin"] = submit_runner_ram
-                found = True
-                break
-        if not found:
-            hints.append({"class": "http://arvados.org/cwl#WorkflowRunnerResources",
-                          "ramMin": submit_runner_ram})
-        main["hints"] = hints
+        wf_runner_resources["ramMin"] = submit_runner_ram
+
+    main["hints"] = hints
 
     body = {
         "workflow": {
@@ -177,6 +190,7 @@ class ArvadosWorkflow(Workflow):
 
         discover_secondary_files(self.arvrunner.fs_access, builder,
                                  self.tool["inputs"], joborder)
+        normalizeFilesDirs(joborder)
 
         with Perf(metrics, "subworkflow upload_deps"):
             upload_dependencies(self.arvrunner,
@@ -184,7 +198,8 @@ class ArvadosWorkflow(Workflow):
                                 self.doc_loader,
                                 joborder,
                                 joborder.get("id", "#"),
-                                False)
+                                False,
+                                runtimeContext)
 
             if self.wf_pdh is None:
                 packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)
@@ -227,7 +242,8 @@ class ArvadosWorkflow(Workflow):
                                     self.doc_loader,
                                     packed,
                                     self.tool["id"],
-                                    False)
+                                    False,
+                                    runtimeContext)
 
                 # Discover files/directories referenced by the
                 # workflow (mainly "default" values)
@@ -291,7 +307,7 @@ class ArvadosWorkflow(Workflow):
             if self.wf_pdh is None:
                 adjustFileObjs(packed, keepmount)
                 adjustDirObjs(packed, keepmount)
-                self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
+                self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext)
 
         self.loadingContext = self.loadingContext.copy()
         self.loadingContext.metadata = self.loadingContext.metadata.copy()