Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index 1544d05cd70660c6e046ef80073b7c80fb7c52c2..4861039198a18c36dbd0ae6d805be060cff1e224 100644 (file)
@@ -53,13 +53,14 @@ from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                              shortname, Process, fill_in_defaults)
-from cwltool.load_tool import fetch_document
+from cwltool.load_tool import fetch_document, jobloaderctx
 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.builder import substitute
 from cwltool.pack import pack
 from cwltool.update import INTERNAL_VERSION
 from cwltool.builder import Builder
 import schema_salad.validate as validate
+import schema_salad.ref_resolver
 
 import arvados.collection
 import arvados.util
@@ -694,9 +695,12 @@ def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
                              tool.tool["inputs"],
                              job_order)
 
+    _jobloaderctx = jobloaderctx.copy()
+    jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
+
     jobmapper = upload_dependencies(arvrunner,
                                     name,
-                                    tool.doc_loader,
+                                    jobloader,
                                     job_order,
                                     job_order.get("id", "#"),
                                     False,
@@ -724,28 +728,37 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext):
 
     merged_map = {}
     tool_dep_cache = {}
+
+    todo = []
+
+    # Standard traversal is top down, we want to go bottom up, so use
+    # the visitor to accumalate a list of nodes to visit, then
+    # visit them in reverse order.
     def upload_tool_deps(deptool):
         if "id" in deptool:
-            discovered_secondaryfiles = {}
-            with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
-                pm = upload_dependencies(arvrunner,
-                                         "%s dependencies" % (shortname(deptool["id"])),
-                                         document_loader,
-                                         deptool,
-                                         deptool["id"],
-                                         False,
-                                         runtimeContext,
-                                         include_primary=False,
-                                         discovered_secondaryfiles=discovered_secondaryfiles,
-                                         cache=tool_dep_cache)
-            document_loader.idx[deptool["id"]] = deptool
-            toolmap = {}
-            for k,v in pm.items():
-                toolmap[k] = v.resolved
-            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+            todo.append(deptool)
 
     tool.visit(upload_tool_deps)
 
+    for deptool in reversed(todo):
+        discovered_secondaryfiles = {}
+        with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
+            pm = upload_dependencies(arvrunner,
+                                     "%s dependencies" % (shortname(deptool["id"])),
+                                     document_loader,
+                                     deptool,
+                                     deptool["id"],
+                                     False,
+                                     runtimeContext,
+                                     include_primary=False,
+                                     discovered_secondaryfiles=discovered_secondaryfiles,
+                                     cache=tool_dep_cache)
+        document_loader.idx[deptool["id"]] = deptool
+        toolmap = {}
+        for k,v in pm.items():
+            toolmap[k] = v.resolved
+        merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+
     return merged_map
 
 def arvados_jobs_image(arvrunner, img, runtimeContext):