X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b8749c42d9ae650f4e2dbeeda2a8cdae9266ff2a..e2267bd99209651c61425f335230e515421b2ef4:/sdk/cwl/arvados_cwl/runner.py

diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 934aeb4018..4861039198 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -17,7 +17,30 @@ import json
 import copy
 from collections import namedtuple
 from io import StringIO
-from typing import Mapping, Sequence
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Mapping,
+    MutableMapping,
+    Sequence,
+    MutableSequence,
+    Optional,
+    Set,
+    Sized,
+    Tuple,
+    Type,
+    Union,
+    cast,
+)
+from cwltool.utils import (
+    CWLObjectType,
+    CWLOutputAtomType,
+    CWLOutputType,
+)
 
 if os.name == "posix" and sys.version_info[0] < 3:
     import subprocess32 as subprocess
@@ -30,13 +53,14 @@ from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                              shortname, Process, fill_in_defaults)
-from cwltool.load_tool import fetch_document
+from cwltool.load_tool import fetch_document, jobloaderctx
 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.builder import substitute
 from cwltool.pack import pack
 from cwltool.update import INTERNAL_VERSION
 from cwltool.builder import Builder
 import schema_salad.validate as validate
+import schema_salad.ref_resolver
 
 import arvados.collection
 import arvados.util
@@ -230,23 +254,33 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
             if sfname is None:
                 continue
 
-            p_location = primary["location"]
-            if "/" in p_location:
-                sfpath = (
-                    p_location[0 : p_location.rindex("/") + 1]
-                    + sfname
-                )
+            if isinstance(sfname, str):
+                p_location = primary["location"]
+                if "/" in p_location:
+                    sfpath = (
+                        p_location[0 : p_location.rindex("/") + 1]
+                        + sfname
+                    )
 
             required = builder.do_eval(required, context=primary)
 
-            if fsaccess.exists(sfpath):
-                if pattern is not None:
-                    found.append({"location": sfpath, "class": "File"})
-                else:
-                    found.append(sf)
-            elif required:
-                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
-                    "Required secondary file '%s' does not exist" % sfpath)
+            if isinstance(sfname, list) or isinstance(sfname, dict):
+                each = aslist(sfname)
+                for e in each:
+                    if required and not fsaccess.exists(e.get("location")):
+                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                            "Required secondary file '%s' does not exist" % e.get("location"))
+                found.extend(each)
+
+            if isinstance(sfname, str):
+                if fsaccess.exists(sfpath):
+                    if pattern is not None:
+                        found.append({"location": sfpath, "class": "File"})
+                    else:
+                        found.append(sf)
+                elif required:
+                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                        "Required secondary file '%s' does not exist" % sfpath)
 
         primary["secondaryFiles"] = cmap(found)
         if discovered is not None:
@@ -305,25 +339,37 @@
 
     scanobj = workflowobj
     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
-        # Need raw file content (before preprocessing) to ensure
-        # that external references in $include and $mixin are captured.
-        scanobj = loadref("", workflowobj["id"])
+        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+        if cache is not None and defrg not in cache:
+            # if we haven't seen this file before, want raw file
+            # content (before preprocessing) to ensure that external
+            # references like $include haven't already been inlined.
+            scanobj = loadref("", workflowobj["id"])
 
     metadata = scanobj
 
-    sc_result = scandeps(uri, scanobj,
-                         loadref_fields,
-                         set(("$include", "location")),
-                         loadref, urljoin=document_loader.fetcher.urljoin,
-                         nestdirs=False)
+    with Perf(metrics, "scandeps include, location"):
+        sc_result = scandeps(uri, scanobj,
+                             loadref_fields,
+                             set(("$include", "location")),
+                             loadref, urljoin=document_loader.fetcher.urljoin,
+                             nestdirs=False)
+
+    with Perf(metrics, "scandeps $schemas"):
+        optional_deps = scandeps(uri, scanobj,
+                                 loadref_fields,
+                                 set(("$schemas",)),
+                                 loadref, urljoin=document_loader.fetcher.urljoin,
+                                 nestdirs=False)
 
-    optional_deps = scandeps(uri, scanobj,
-                         loadref_fields,
-                         set(("$schemas",)),
-                         loadref, urljoin=document_loader.fetcher.urljoin,
-                         nestdirs=False)
+    if sc_result is None:
+        sc_result = []
 
-    sc_result.extend(optional_deps)
+    if optional_deps is None:
+        optional_deps = []
+
+    if optional_deps:
+        sc_result.extend(optional_deps)
 
     sc = []
     uuids = {}
@@ -351,35 +397,45 @@
             sc.append(obj)
         collect_uuids(obj)
 
-    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
-    visit_class(sc_result, ("File", "Directory"), collect_uploads)
+    with Perf(metrics, "collect uuids"):
+        visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+
+    with Perf(metrics, "collect uploads"):
+        visit_class(sc_result, ("File", "Directory"), collect_uploads)
 
     # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
     uuid_map = {}
     fetch_uuids = list(uuids.keys())
-    while fetch_uuids:
-        # For a large number of fetch_uuids, API server may limit
-        # response size, so keep fetching until the API server has nothing
-        # more to give us.
-        lookups = arvrunner.api.collections().list(
-            filters=[["uuid", "in", fetch_uuids]],
-            count="none",
-            select=["uuid", "portable_data_hash"]).execute(
-                num_retries=arvrunner.num_retries)
+    with Perf(metrics, "fetch_uuids"):
+        while fetch_uuids:
+            # For a large number of fetch_uuids, API server may limit
+            # response size, so keep fetching until the API server has nothing
+            # more to give us.
+            lookups = arvrunner.api.collections().list(
+                filters=[["uuid", "in", fetch_uuids]],
+                count="none",
+                select=["uuid", "portable_data_hash"]).execute(
+                    num_retries=arvrunner.num_retries)
 
-        if not lookups["items"]:
-            break
+            if not lookups["items"]:
+                break
 
-        for l in lookups["items"]:
-            uuid_map[l["uuid"]] = l["portable_data_hash"]
+            for l in lookups["items"]:
+                uuid_map[l["uuid"]] = l["portable_data_hash"]
 
-        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
+            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
 
     normalizeFilesDirs(sc)
 
-    if include_primary and "id" in workflowobj:
-        sc.append({"class": "File", "location": workflowobj["id"]})
+    if "id" in workflowobj:
+        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+        if include_primary:
+            # make sure it's included
+            sc.append({"class": "File", "location": defrg})
+        else:
+            # make sure it's excluded
+            sc = [d for d in sc if d.get("location") != defrg]
 
     def visit_default(obj):
         def defaults_are_optional(f):
@@ -420,12 +476,13 @@
         else:
             del discovered[d]
 
-    mapper = ArvPathMapper(arvrunner, sc, "",
-                           "keep:%s",
-                           "keep:%s/%s",
-                           name=name,
-                           single_collection=True,
-                           optional_deps=optional_deps)
+    with Perf(metrics, "mapper"):
+        mapper = ArvPathMapper(arvrunner, sc, "",
+                               "keep:%s",
+                               "keep:%s/%s",
+                               name=name,
+                               single_collection=True,
+                               optional_deps=optional_deps)
 
     keeprefs = set()
     def addkeepref(k):
@@ -469,8 +526,9 @@
             p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
             p[collectionUUID] = uuid
 
-    visit_class(workflowobj, ("File", "Directory"), setloc)
-    visit_class(discovered, ("File", "Directory"), setloc)
+    with Perf(metrics, "setloc"):
+        visit_class(workflowobj, ("File", "Directory"), setloc)
+        visit_class(discovered, ("File", "Directory"), setloc)
 
     if discovered_secondaryfiles is not None:
         for d in discovered:
@@ -547,7 +605,7 @@ def upload_docker(arvrunner, tool, runtimeContext):
             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
 
 
-def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
     """Create a packed workflow.
 
     A "packed" workflow is one where all the components have been combined into a single document."""
@@ -587,6 +645,11 @@ def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
             for l in v:
                 visit(l, cur_id)
     visit(packed, None)
+
+    if git_info:
+        for g in git_info:
+            packed[g] = git_info[g]
+
     return packed
 
 
@@ -632,9 +695,12 @@ def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
                              tool.tool["inputs"],
                              job_order)
 
+    _jobloaderctx = jobloaderctx.copy()
+    jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
+
     jobmapper = upload_dependencies(arvrunner,
                                     name,
-                                    tool.doc_loader,
+                                    jobloader,
                                     job_order,
                                     job_order.get("id", "#"),
                                     False,
@@ -662,28 +728,37 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext):
 
     merged_map = {}
     tool_dep_cache = {}
+
+    todo = []
+
+    # Standard traversal is top down, we want to go bottom up, so use
+    # the visitor to accumulate a list of nodes to visit, then
+    # visit them in reverse order.
     def upload_tool_deps(deptool):
         if "id" in deptool:
-            discovered_secondaryfiles = {}
-            with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
-                pm = upload_dependencies(arvrunner,
-                                         "%s dependencies" % (shortname(deptool["id"])),
-                                         document_loader,
-                                         deptool,
-                                         deptool["id"],
-                                         False,
-                                         runtimeContext,
-                                         include_primary=False,
-                                         discovered_secondaryfiles=discovered_secondaryfiles,
-                                         cache=tool_dep_cache)
-            document_loader.idx[deptool["id"]] = deptool
-            toolmap = {}
-            for k,v in pm.items():
-                toolmap[k] = v.resolved
-            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+            todo.append(deptool)
 
     tool.visit(upload_tool_deps)
 
+    for deptool in reversed(todo):
+        discovered_secondaryfiles = {}
+        with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
+            pm = upload_dependencies(arvrunner,
+                                     "%s dependencies" % (shortname(deptool["id"])),
+                                     document_loader,
+                                     deptool,
+                                     deptool["id"],
+                                     False,
+                                     runtimeContext,
+                                     include_primary=False,
+                                     discovered_secondaryfiles=discovered_secondaryfiles,
+                                     cache=tool_dep_cache)
+        document_loader.idx[deptool["id"]] = deptool
+        toolmap = {}
+        for k,v in pm.items():
+            toolmap[k] = v.resolved
+        merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+
     return merged_map
 
 def arvados_jobs_image(arvrunner, img, runtimeContext):
@@ -737,7 +812,8 @@ class Runner(Process):
                  intermediate_output_ttl=0, merged_map=None,
                  priority=None, secret_store=None,
                  collection_cache_size=256,
-                 collection_cache_is_default=True):
+                 collection_cache_is_default=True,
+                 git_info=None):
 
         loadingContext = loadingContext.copy()
         loadingContext.metadata = updated_tool.metadata.copy()
@@ -766,6 +842,7 @@ class Runner(Process):
         self.priority = priority
         self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev
+        self.git_info = git_info
 
         self.submit_runner_cores = 1
         self.submit_runner_ram = 1024  # default 1 GiB
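A few of the patterns this commit introduces, sketched below as standalone Python for orientation (illustrative code, not taken from the Arvados source). First, the upload path is now instrumented with "with Perf(metrics, ...)" blocks around the scandeps, uuid lookup, mapper, and setloc phases. A minimal timing context manager in that spirit, assuming only the standard logging module (the real arvados_cwl Perf and metrics helpers may differ):

import logging
import time

metrics = logging.getLogger("arvados.cwl-runner.metrics")

class Perf(object):
    """Log wall-clock time spent inside a named block (illustrative sketch)."""
    def __init__(self, logger, name):
        self.logger = logger
        self.name = name

    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.logger.debug("%s took %.3f seconds", self.name, time.time() - self.start)
        return False  # never suppress exceptions raised inside the block

# usage, mirroring the diff:
#   with Perf(metrics, "scandeps include, location"):
#       sc_result = scandeps(...)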
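Second, upload_workflow_deps now collects tool documents with the visitor first and then processes them in reverse, so embedded tools are handled before the workflows that reference them. The traversal reduced to its core, with a hypothetical handle() callback standing in for the dependency upload:

def process_bottom_up(tool, handle):
    # Standard traversal is top down; accumulate the nodes first, then
    # visit them in reverse order so leaves come before the documents
    # that embed them.
    todo = []

    def collect(node):
        if "id" in node:
            todo.append(node)

    tool.visit(collect)   # cwltool's Process.visit walks embedded tools
    for node in reversed(todo):
        handle(node)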
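Third, the uuid-to-portable-data-hash resolution queries in rounds because a single list request may return only part of a large filter set. The same loop shape against a hypothetical lookup_batch() function (the diff itself calls arvrunner.api.collections().list() with a ["uuid", "in", fetch_uuids] filter):

def resolve_uuids(lookup_batch, uuids):
    """Map each uuid to its portable_data_hash, fetching in rounds.

    lookup_batch(uuids) is assumed to return a possibly truncated list of
    {"uuid": ..., "portable_data_hash": ...} dicts.
    """
    uuid_map = {}
    fetch_uuids = list(uuids)
    while fetch_uuids:
        items = lookup_batch(fetch_uuids)
        if not items:
            break
        for item in items:
            uuid_map[item["uuid"]] = item["portable_data_hash"]
        # only re-request the uuids that are still unresolved
        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
    return uuid_map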
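Finally, in set_secondary a string pattern result is resolved against the primary file's location by keeping everything up to the last '/' and appending the secondary file name. The path arithmetic in isolation, with made-up example values and no fsaccess or builder objects:

def secondary_path(primary_location, sfname):
    # e.g. primary "keep:abc+123/dir/sample.bam" with evaluated secondary
    # name "sample.bam.bai" -> "keep:abc+123/dir/sample.bam.bai"
    if "/" in primary_location:
        return primary_location[:primary_location.rindex("/") + 1] + sfname
    return sfname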