X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b334b065b36357dd08099adad9835f4aa7075337..7a233da0326bee6f4f6448528707c0cf8925d2ea:/sdk/cwl/arvados_cwl/runner.py

diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index d2486c164b..4861039198 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -53,13 +53,14 @@ from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                              shortname, Process, fill_in_defaults)
-from cwltool.load_tool import fetch_document
+from cwltool.load_tool import fetch_document, jobloaderctx
 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.builder import substitute
 from cwltool.pack import pack
 from cwltool.update import INTERNAL_VERSION
 from cwltool.builder import Builder
 import schema_salad.validate as validate
+import schema_salad.ref_resolver
 
 import arvados.collection
 import arvados.util
@@ -253,23 +254,33 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
 
             if sfname is None:
                 continue
 
-            p_location = primary["location"]
-            if "/" in p_location:
-                sfpath = (
-                    p_location[0 : p_location.rindex("/") + 1]
-                    + sfname
-                )
+            if isinstance(sfname, str):
+                p_location = primary["location"]
+                if "/" in p_location:
+                    sfpath = (
+                        p_location[0 : p_location.rindex("/") + 1]
+                        + sfname
+                    )
 
         required = builder.do_eval(required, context=primary)
 
-        if fsaccess.exists(sfpath):
-            if pattern is not None:
-                found.append({"location": sfpath, "class": "File"})
-            else:
-                found.append(sf)
-        elif required:
-            raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
-                "Required secondary file '%s' does not exist" % sfpath)
+        if isinstance(sfname, list) or isinstance(sfname, dict):
+            each = aslist(sfname)
+            for e in each:
+                if required and not fsaccess.exists(e.get("location")):
+                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                        "Required secondary file '%s' does not exist" % e.get("location"))
+            found.extend(each)
+
+        if isinstance(sfname, str):
+            if fsaccess.exists(sfpath):
+                if pattern is not None:
+                    found.append({"location": sfpath, "class": "File"})
+                else:
+                    found.append(sf)
+            elif required:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Required secondary file '%s' does not exist" % sfpath)
 
     primary["secondaryFiles"] = cmap(found)
     if discovered is not None:
@@ -417,8 +428,14 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     normalizeFilesDirs(sc)
 
-    if include_primary and "id" in workflowobj:
-        sc.append({"class": "File", "location": workflowobj["id"]})
+    if "id" in workflowobj:
+        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+        if include_primary:
+            # make sure it's included
+            sc.append({"class": "File", "location": defrg})
+        else:
+            # make sure it's excluded
+            sc = [d for d in sc if d.get("location") != defrg]
 
     def visit_default(obj):
         def defaults_are_optional(f):
@@ -588,7 +605,7 @@ def upload_docker(arvrunner, tool, runtimeContext):
             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
 
 
-def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
     """Create a packed workflow.
 
     A "packed" workflow is one where all the components have been combined into a single document."""
@@ -628,6 +645,11 @@ def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
             for l in v:
                 visit(l, cur_id)
 
     visit(packed, None)
+
+    if git_info:
+        for g in git_info:
+            packed[g] = git_info[g]
+
     return packed
 
@@ -673,9 +695,12 @@ def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
                 tool.tool["inputs"],
                 job_order)
 
+    _jobloaderctx = jobloaderctx.copy()
+    jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
+
     jobmapper = upload_dependencies(arvrunner,
                                     name,
-                                    tool.doc_loader,
+                                    jobloader,
                                     job_order,
                                     job_order.get("id", "#"),
                                     False,
@@ -703,28 +728,37 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext):
 
     merged_map = {}
     tool_dep_cache = {}
+
+    todo = []
+
+    # Standard traversal is top down, we want to go bottom up, so use
+    # the visitor to accumulate a list of nodes to visit, then
+    # visit them in reverse order.
     def upload_tool_deps(deptool):
         if "id" in deptool:
-            discovered_secondaryfiles = {}
-            with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
-                pm = upload_dependencies(arvrunner,
-                                         "%s dependencies" % (shortname(deptool["id"])),
-                                         document_loader,
-                                         deptool,
-                                         deptool["id"],
-                                         False,
-                                         runtimeContext,
-                                         include_primary=False,
-                                         discovered_secondaryfiles=discovered_secondaryfiles,
-                                         cache=tool_dep_cache)
-            document_loader.idx[deptool["id"]] = deptool
-            toolmap = {}
-            for k,v in pm.items():
-                toolmap[k] = v.resolved
-            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+            todo.append(deptool)
 
     tool.visit(upload_tool_deps)
 
+    for deptool in reversed(todo):
+        discovered_secondaryfiles = {}
+        with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
+            pm = upload_dependencies(arvrunner,
+                                     "%s dependencies" % (shortname(deptool["id"])),
+                                     document_loader,
+                                     deptool,
+                                     deptool["id"],
+                                     False,
+                                     runtimeContext,
+                                     include_primary=False,
+                                     discovered_secondaryfiles=discovered_secondaryfiles,
+                                     cache=tool_dep_cache)
+        document_loader.idx[deptool["id"]] = deptool
+        toolmap = {}
+        for k,v in pm.items():
+            toolmap[k] = v.resolved
+        merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+
     return merged_map
 
 def arvados_jobs_image(arvrunner, img, runtimeContext):
@@ -778,7 +812,8 @@ class Runner(Process):
                  intermediate_output_ttl=0, merged_map=None,
                  priority=None, secret_store=None,
                  collection_cache_size=256,
-                 collection_cache_is_default=True):
+                 collection_cache_is_default=True,
+                 git_info=None):
 
         loadingContext = loadingContext.copy()
         loadingContext.metadata = updated_tool.metadata.copy()
@@ -807,6 +842,7 @@ class Runner(Process):
         self.priority = priority
         self.secret_store = secret_store
         self.enable_dev = loadingContext.enable_dev
+        self.git_info = git_info
 
         self.submit_runner_cores = 1
         self.submit_runner_ram = 1024  # default 1 GiB
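
A note on the set_secondary change above: builder.do_eval() on a secondaryFiles pattern can yield either a plain string (a sibling filename to resolve against the primary file's location) or a File object / list of File objects produced by an expression, and the new isinstance branches handle each shape separately. Below is a minimal standalone sketch of that dispatch, with a stubbed exists() in place of fsaccess.exists and simplified error handling; it is an illustration of the logic, not the actual arvados-cwl-runner code path.

# Sketch of the sfname dispatch in set_secondary (simplified).
# exists() stubs fsaccess.exists; aslist mirrors cwltool.utils.aslist.

def aslist(x):
    return x if isinstance(x, list) else [x]

def exists(loc):
    return loc in ("keep:abc+123/input.bam", "keep:abc+123/input.bam.bai")

def resolve_secondary(primary, sfname, required):
    found = []
    if isinstance(sfname, str):
        # A string is a sibling filename: splice it onto the
        # directory part of the primary file's location.
        p = primary["location"]
        sfpath = p[0 : p.rindex("/") + 1] + sfname
        if exists(sfpath):
            found.append({"class": "File", "location": sfpath})
        elif required:
            raise ValueError("Required secondary file '%s' does not exist" % sfpath)
    elif isinstance(sfname, (list, dict)):
        # An expression may return ready-made File objects; verify
        # that required ones exist, then pass them through unchanged.
        for e in aslist(sfname):
            if required and not exists(e.get("location")):
                raise ValueError("Required secondary file '%s' does not exist" % e.get("location"))
        found.extend(aslist(sfname))
    return found

primary = {"class": "File", "location": "keep:abc+123/input.bam"}
print(resolve_secondary(primary, "input.bam.bai", required=True))
# -> [{'class': 'File', 'location': 'keep:abc+123/input.bam.bai'}]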
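
The git_info parameter threaded through packed_workflow() and Runner is merged into the packed document as a plain shallow copy of top-level keys. A small sketch follows; the key names here are hypothetical, since the diff only shows that git_info is a mapping, not what the caller puts into it.

# Sketch of the git_info merge at the end of packed_workflow: caller
# metadata is shallow-copied onto the top level of the packed document.
packed = {"cwlVersion": "v1.2", "class": "Workflow"}

# Hypothetical contents; the diff does not show the actual keys.
git_info = {"gitCommit": "7a233da0", "gitOrigin": "https://git.arvados.org/arvados.git"}

if git_info:
    for g in git_info:
        packed[g] = git_info[g]

print(packed)  # the packed document now carries the git metadata keys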
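
The upload_workflow_deps rewrite changes traversal order: the visitor callback now only accumulates nodes, and the uploads run afterwards over the list in reverse, so embedded tools are handled before the workflows that reference them. A minimal sketch of the pattern, with a hypothetical visit() standing in for cwltool's tool.visit:

# Sketch of the accumulate-then-reverse traversal in upload_workflow_deps.
# visit() is a hypothetical stand-in for cwltool's tool.visit, which walks
# a process document top down and calls a callback on each node.

def visit(node, callback):
    callback(node)
    for step in node.get("steps", []):
        visit(step["run"], callback)

workflow = {
    "id": "#main",
    "steps": [{"run": {"id": "#tool_a"}},
              {"run": {"id": "#tool_b"}}],
}

todo = []
visit(workflow, todo.append)   # top-down accumulation only

for node in reversed(todo):    # process bottom up
    print("uploading dependencies for", node["id"])
# -> #tool_b, #tool_a, #main: leaf tools before the workflow that uses them

Processing leaves first presumably means each tool's entry is already present in merged_map and document_loader.idx by the time the workflow that references it is uploaded.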