import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
shortname, Process, fill_in_defaults)
-from cwltool.load_tool import fetch_document
+from cwltool.load_tool import fetch_document, jobloaderctx
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate
+import schema_salad.ref_resolver
import arvados.collection
import arvados.util
upload_docker(arvrunner, s.embedded_tool, runtimeContext)
-def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
"""Create a packed workflow.
A "packed" workflow is one where all the components have been combined into a single document."""
for l in v:
visit(l, cur_id)
visit(packed, None)
+
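+ # Record git provenance (when collected) as top-level fields of the
+ # packed document; keys are expected to be fully qualified, e.g.
+ # "http://arvados.org/cwl#gitCommit" (illustrative key name).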
+ if git_info:
+ for g in git_info:
+ packed[g] = git_info[g]
+
return packed
tool.tool["inputs"],
job_order)
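+ # Resolve the job order with a separate schema-salad Loader built from
+ # cwltool's job loader context, rather than the tool's document loader,
+ # while reusing the tool's fetcher so file resolution behaves the same.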
+ _jobloaderctx = jobloaderctx.copy()
+ jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
+
jobmapper = upload_dependencies(arvrunner,
name,
- tool.doc_loader,
+ jobloader,
job_order,
job_order.get("id", "#"),
False,
merged_map = {}
tool_dep_cache = {}
+
+ todo = []
+
+ # Standard traversal is top-down, but we want to go bottom-up, so
+ # use the visitor to accumulate a list of nodes to visit, then
+ # visit them in reverse order.
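+ # For example, if workflow W embeds subworkflow S which runs tool T,
+ # the visitor yields W, S, T; iterating the list in reverse uploads
+ # T's dependencies first, so S and then W resolve against content
+ # that has already been uploaded.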
def upload_tool_deps(deptool):
if "id" in deptool:
- discovered_secondaryfiles = {}
- with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
- pm = upload_dependencies(arvrunner,
- "%s dependencies" % (shortname(deptool["id"])),
- document_loader,
- deptool,
- deptool["id"],
- False,
- runtimeContext,
- include_primary=False,
- discovered_secondaryfiles=discovered_secondaryfiles,
- cache=tool_dep_cache)
- document_loader.idx[deptool["id"]] = deptool
- toolmap = {}
- for k,v in pm.items():
- toolmap[k] = v.resolved
- merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+ todo.append(deptool)
tool.visit(upload_tool_deps)
+ for deptool in reversed(todo):
+ discovered_secondaryfiles = {}
+ with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
+ pm = upload_dependencies(arvrunner,
+ "%s dependencies" % (shortname(deptool["id"])),
+ document_loader,
+ deptool,
+ deptool["id"],
+ False,
+ runtimeContext,
+ include_primary=False,
+ discovered_secondaryfiles=discovered_secondaryfiles,
+ cache=tool_dep_cache)
+ document_loader.idx[deptool["id"]] = deptool
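+ # Collapse the path mapper into a plain {source: resolved location}
+ # dict and record it, along with any discovered secondary files,
+ # under this tool's document id.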
+ toolmap = {}
+ for k, v in pm.items():
+ toolmap[k] = v.resolved
+ merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+
return merged_map
def arvados_jobs_image(arvrunner, img, runtimeContext):
intermediate_output_ttl=0, merged_map=None,
priority=None, secret_store=None,
collection_cache_size=256,
- collection_cache_is_default=True):
+ collection_cache_is_default=True,
+ git_info=None):
loadingContext = loadingContext.copy()
loadingContext.metadata = updated_tool.metadata.copy()
self.priority = priority
self.secret_store = secret_store
self.enable_dev = loadingContext.enable_dev
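+ # git provenance metadata collected at submit time; packed_workflow()
+ # merges these fields into the packed document.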
+ self.git_info = git_info
self.submit_runner_cores = 1
self.submit_runner_ram = 1024  # default 1 GiB