X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/513804e1a2bf43329dc7d37ee9374f3e02ffe169..7a233da0326bee6f4f6448528707c0cf8925d2ea:/sdk/cwl/arvados_cwl/runner.py

diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 225f4ae60e..4861039198 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -53,13 +53,14 @@ from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                              shortname, Process, fill_in_defaults)
-from cwltool.load_tool import fetch_document
+from cwltool.load_tool import fetch_document, jobloaderctx
 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.builder import substitute
 from cwltool.pack import pack
 from cwltool.update import INTERNAL_VERSION
 from cwltool.builder import Builder
 import schema_salad.validate as validate
+import schema_salad.ref_resolver
 
 import arvados.collection
 import arvados.util
@@ -604,7 +605,7 @@ def upload_docker(arvrunner, tool, runtimeContext):
             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
 
 
-def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
     """Create a packed workflow.
 
     A "packed" workflow is one where all the components have been combined into a single document."""
@@ -644,6 +645,11 @@ def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
                 for l in v:
                     visit(l, cur_id)
     visit(packed, None)
+
+    if git_info:
+        for g in git_info:
+            packed[g] = git_info[g]
+
     return packed
 
 
@@ -689,9 +695,12 @@ def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
                              tool.tool["inputs"],
                              job_order)
 
+    _jobloaderctx = jobloaderctx.copy()
+    jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
+
     jobmapper = upload_dependencies(arvrunner,
                                     name,
-                                    tool.doc_loader,
+                                    jobloader,
                                     job_order,
                                     job_order.get("id", "#"),
                                     False,
@@ -719,28 +728,37 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext):
     merged_map = {}
     tool_dep_cache = {}
+
+    todo = []
+
+    # Standard traversal is top down, we want to go bottom up, so use
+    # the visitor to accumulate a list of nodes to visit, then
+    # visit them in reverse order.
     def upload_tool_deps(deptool):
         if "id" in deptool:
-            discovered_secondaryfiles = {}
-            with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
-                pm = upload_dependencies(arvrunner,
-                                         "%s dependencies" % (shortname(deptool["id"])),
-                                         document_loader,
-                                         deptool,
-                                         deptool["id"],
-                                         False,
-                                         runtimeContext,
-                                         include_primary=False,
-                                         discovered_secondaryfiles=discovered_secondaryfiles,
-                                         cache=tool_dep_cache)
-            document_loader.idx[deptool["id"]] = deptool
-            toolmap = {}
-            for k,v in pm.items():
-                toolmap[k] = v.resolved
-            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+            todo.append(deptool)
 
     tool.visit(upload_tool_deps)
 
+    for deptool in reversed(todo):
+        discovered_secondaryfiles = {}
+        with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
+            pm = upload_dependencies(arvrunner,
+                                     "%s dependencies" % (shortname(deptool["id"])),
+                                     document_loader,
+                                     deptool,
+                                     deptool["id"],
+                                     False,
+                                     runtimeContext,
+                                     include_primary=False,
+                                     discovered_secondaryfiles=discovered_secondaryfiles,
+                                     cache=tool_dep_cache)
+        document_loader.idx[deptool["id"]] = deptool
+        toolmap = {}
+        for k,v in pm.items():
+            toolmap[k] = v.resolved
+        merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+
     return merged_map
 
 
 def arvados_jobs_image(arvrunner, img, runtimeContext):
@@ -794,7 +812,8 @@ class Runner(Process):
                  intermediate_output_ttl=0, merged_map=None,
                  priority=None, secret_store=None,
                  collection_cache_size=256,
-                 collection_cache_is_default=True):
+                 collection_cache_is_default=True,
+                 git_info=None):
 
         loadingContext = loadingContext.copy()
         loadingContext.metadata = updated_tool.metadata.copy()
@@ -823,6 +842,7 @@ class Runner(Process):
         self.priority = priority
         self.secret_store = secret_store
         self.enable_dev = loadingContext.enable_dev
+        self.git_info = git_info
 
         self.submit_runner_cores = 1
         self.submit_runner_ram = 1024  # defaut 1 GiB
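
The new comment in upload_workflow_deps describes an accumulate-then-reverse traversal: the visitor only records nodes while cwltool walks the document top down, and the upload work then runs over the recorded list in reverse, so the most deeply nested tools are handled before the workflows that embed them. Below is a minimal standalone sketch of that pattern; it is illustrative only and not part of the diff (the toy document, the visit helper, and the print call are assumptions, not Arvados code).

# Sketch of the accumulate-then-reverse pattern shown in the diff above.

def visit(node, visitor):
    """Top-down walk: call visitor on every dict, then recurse into children."""
    if isinstance(node, dict):
        visitor(node)
        for child in node.values():
            visit(child, visitor)
    elif isinstance(node, list):
        for child in node:
            visit(child, visitor)

doc = {"id": "#main", "steps": [{"id": "#step1", "run": {"id": "#subtool"}}]}

# Accumulate nodes in top-down order instead of processing them immediately.
todo = []
visit(doc, lambda n: todo.append(n) if "id" in n else None)

# Reversing the accumulated list yields a bottom-up processing order:
# the deepest node ("#subtool") is handled before the documents that embed it.
for node in reversed(todo):
    print("processing", node["id"])   # -> #subtool, #step1, #main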