X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/7865723dc5eee129e7ac269f3495274a13ff70ae..7a233da0326bee6f4f6448528707c0cf8925d2ea:/sdk/cwl/arvados_cwl/runner.py

diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index f232178c5d..4861039198 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -17,7 +17,30 @@ import json
 import copy
 from collections import namedtuple
 from io import StringIO
-from typing import Mapping, Sequence
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Mapping,
+    MutableMapping,
+    Sequence,
+    MutableSequence,
+    Optional,
+    Set,
+    Sized,
+    Tuple,
+    Type,
+    Union,
+    cast,
+)
+from cwltool.utils import (
+    CWLObjectType,
+    CWLOutputAtomType,
+    CWLOutputType,
+)
 
 if os.name == "posix" and sys.version_info[0] < 3:
     import subprocess32 as subprocess
@@ -30,13 +53,14 @@ from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                              shortname, Process, fill_in_defaults)
-from cwltool.load_tool import fetch_document
+from cwltool.load_tool import fetch_document, jobloaderctx
 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.builder import substitute
 from cwltool.pack import pack
 from cwltool.update import INTERNAL_VERSION
 from cwltool.builder import Builder
 import schema_salad.validate as validate
+import schema_salad.ref_resolver
 
 import arvados.collection
 import arvados.util
@@ -49,8 +73,10 @@ from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, col
 from ._version import __version__
 from . import done
 from . context import ArvRuntimeContext
+from .perf import Perf
 
 logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
 def trim_anonymous_location(obj):
     """Remove 'location' field from File and Directory literals.
@@ -228,28 +254,38 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
             if sfname is None:
                 continue
 
-            p_location = primary["location"]
-            if "/" in p_location:
-                sfpath = (
-                    p_location[0 : p_location.rindex("/") + 1]
-                    + sfname
-                )
+            if isinstance(sfname, str):
+                p_location = primary["location"]
+                if "/" in p_location:
+                    sfpath = (
+                        p_location[0 : p_location.rindex("/") + 1]
+                        + sfname
+                    )
 
             required = builder.do_eval(required, context=primary)
 
-            if fsaccess.exists(sfpath):
-                if pattern is not None:
-                    found.append({"location": sfpath, "class": "File"})
-                else:
-                    found.append(sf)
-            elif required:
-                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
-                    "Required secondary file '%s' does not exist" % sfpath)
+            if isinstance(sfname, list) or isinstance(sfname, dict):
+                each = aslist(sfname)
+                for e in each:
+                    if required and not fsaccess.exists(e.get("location")):
+                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                            "Required secondary file '%s' does not exist" % e.get("location"))
+                found.extend(each)
+
+            if isinstance(sfname, str):
+                if fsaccess.exists(sfpath):
+                    if pattern is not None:
+                        found.append({"location": sfpath, "class": "File"})
+                    else:
+                        found.append(sf)
+                elif required:
+                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                        "Required secondary file '%s' does not exist" % sfpath)
 
         primary["secondaryFiles"] = cmap(found)
         if discovered is not None:
             discovered[primary["location"]] = primary["secondaryFiles"]
-    elif inputschema["type"] not in primitive_types_set:
+    elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
 
 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
@@ -260,7 +296,8 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No
 
 def upload_dependencies(arvrunner, name, document_loader,
                         workflowobj, uri, loadref_run, runtimeContext,
-                        include_primary=True, discovered_secondaryfiles=None):
+                        include_primary=True, discovered_secondaryfiles=None,
+                        cache=None):
     """Upload the dependencies of the workflowobj document to Keep.
 
     Returns a pathmapper object mapping local paths to keep references.  Also
@@ -279,6 +316,8 @@ def upload_dependencies(arvrunner, name, document_loader,
         defrg, _ = urllib.parse.urldefrag(joined)
         if defrg not in loaded:
             loaded.add(defrg)
+            if cache is not None and defrg in cache:
+                return cache[defrg]
             # Use fetch_text to get raw file (before preprocessing).
             text = document_loader.fetch_text(defrg)
             if isinstance(text, bytes):
@@ -286,7 +325,10 @@ def upload_dependencies(arvrunner, name, document_loader,
             else:
                 textIO = StringIO(text)
             yamlloader = YAML(typ='safe', pure=True)
-            return yamlloader.load(textIO)
+            result = yamlloader.load(textIO)
+            if cache is not None:
+                cache[defrg] = result
+            return result
         else:
             return {}
 
@@ -297,25 +339,37 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     scanobj = workflowobj
     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
-        # Need raw file content (before preprocessing) to ensure
-        # that external references in $include and $mixin are captured.
-        scanobj = loadref("", workflowobj["id"])
+        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+        if cache is not None and defrg not in cache:
+            # if we haven't seen this file before, want raw file
+            # content (before preprocessing) to ensure that external
+            # references like $include haven't already been inlined.
+            scanobj = loadref("", workflowobj["id"])
 
     metadata = scanobj
 
-    sc_result = scandeps(uri, scanobj,
-                         loadref_fields,
-                         set(("$include", "location")),
-                         loadref, urljoin=document_loader.fetcher.urljoin,
-                         nestdirs=False)
+    with Perf(metrics, "scandeps include, location"):
+        sc_result = scandeps(uri, scanobj,
+                             loadref_fields,
+                             set(("$include", "location")),
+                             loadref, urljoin=document_loader.fetcher.urljoin,
+                             nestdirs=False)
 
-    optional_deps = scandeps(uri, scanobj,
-                             loadref_fields,
-                             set(("$schemas",)),
-                             loadref, urljoin=document_loader.fetcher.urljoin,
-                             nestdirs=False)
+    with Perf(metrics, "scandeps $schemas"):
+        optional_deps = scandeps(uri, scanobj,
+                                 loadref_fields,
+                                 set(("$schemas",)),
+                                 loadref, urljoin=document_loader.fetcher.urljoin,
+                                 nestdirs=False)
 
-    sc_result.extend(optional_deps)
+    if sc_result is None:
+        sc_result = []
+
+    if optional_deps is None:
+        optional_deps = []
+
+    if optional_deps:
+        sc_result.extend(optional_deps)
 
     sc = []
     uuids = {}
@@ -343,35 +397,45 @@ def upload_dependencies(arvrunner, name, document_loader,
             sc.append(obj)
             collect_uuids(obj)
 
-    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
-    visit_class(sc_result, ("File", "Directory"), collect_uploads)
+    with Perf(metrics, "collect uuids"):
+        visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+
+    with Perf(metrics, "collect uploads"):
+        visit_class(sc_result, ("File", "Directory"), collect_uploads)
 
     # Resolve any collection uuids we found to portable data hashes
     # and assign them to uuid_map
     uuid_map = {}
     fetch_uuids = list(uuids.keys())
-    while fetch_uuids:
-        # For a large number of fetch_uuids, API server may limit
-        # response size, so keep fetching from API server has nothing
-        # more to give us.
-        lookups = arvrunner.api.collections().list(
-            filters=[["uuid", "in", fetch_uuids]],
-            count="none",
-            select=["uuid", "portable_data_hash"]).execute(
-                num_retries=arvrunner.num_retries)
+    with Perf(metrics, "fetch_uuids"):
+        while fetch_uuids:
+            # For a large number of fetch_uuids, API server may limit
+            # response size, so keep fetching from API server has nothing
+            # more to give us.
+            lookups = arvrunner.api.collections().list(
+                filters=[["uuid", "in", fetch_uuids]],
+                count="none",
+                select=["uuid", "portable_data_hash"]).execute(
+                    num_retries=arvrunner.num_retries)
 
-        if not lookups["items"]:
-            break
+            if not lookups["items"]:
+                break
 
-        for l in lookups["items"]:
-            uuid_map[l["uuid"]] = l["portable_data_hash"]
+            for l in lookups["items"]:
+                uuid_map[l["uuid"]] = l["portable_data_hash"]
 
-        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
+            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
 
     normalizeFilesDirs(sc)
 
-    if include_primary and "id" in workflowobj:
-        sc.append({"class": "File", "location": workflowobj["id"]})
+    if "id" in workflowobj:
+        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+        if include_primary:
+            # make sure it's included
+            sc.append({"class": "File", "location": defrg})
+        else:
+            # make sure it's excluded
+            sc = [d for d in sc if d.get("location") != defrg]
 
     def visit_default(obj):
         def defaults_are_optional(f):
@@ -412,12 +476,13 @@ def upload_dependencies(arvrunner, name, document_loader,
             else:
                 del discovered[d]
 
-    mapper = ArvPathMapper(arvrunner, sc, "",
-                           "keep:%s",
-                           "keep:%s/%s",
-                           name=name,
-                           single_collection=True,
-                           optional_deps=optional_deps)
+    with Perf(metrics, "mapper"):
+        mapper = ArvPathMapper(arvrunner, sc, "",
+                               "keep:%s",
+                               "keep:%s/%s",
+                               name=name,
+                               single_collection=True,
+                               optional_deps=optional_deps)
 
     keeprefs = set()
     def addkeepref(k):
@@ -461,8 +526,9 @@ def upload_dependencies(arvrunner, name, document_loader,
         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
         p[collectionUUID] = uuid
 
-    visit_class(workflowobj, ("File", "Directory"), setloc)
-    visit_class(discovered, ("File", "Directory"), setloc)
+    with Perf(metrics, "setloc"):
+        visit_class(workflowobj, ("File", "Directory"), setloc)
+        visit_class(discovered, ("File", "Directory"), setloc)
 
     if discovered_secondaryfiles is not None:
         for d in discovered:
@@ -539,7 +605,7 @@ def upload_docker(arvrunner, tool, runtimeContext):
             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
 
 
-def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
     """Create a packed workflow.
A "packed" workflow is one where all the components have been combined into a single document.""" @@ -579,6 +645,11 @@ def packed_workflow(arvrunner, tool, merged_map, runtimeContext): for l in v: visit(l, cur_id) visit(packed, None) + + if git_info: + for g in git_info: + packed[g] = git_info[g] + return packed @@ -624,9 +695,12 @@ def upload_job_order(arvrunner, name, tool, job_order, runtimeContext): tool.tool["inputs"], job_order) + _jobloaderctx = jobloaderctx.copy() + jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor) + jobmapper = upload_dependencies(arvrunner, name, - tool.doc_loader, + jobloader, job_order, job_order.get("id", "#"), False, @@ -647,15 +721,28 @@ FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"]) def upload_workflow_deps(arvrunner, tool, runtimeContext): # Ensure that Docker images needed by this workflow are available - upload_docker(arvrunner, tool, runtimeContext) + with Perf(metrics, "upload_docker"): + upload_docker(arvrunner, tool, runtimeContext) document_loader = tool.doc_loader merged_map = {} + tool_dep_cache = {} + todo = [] + + # Standard traversal is top down, we want to go bottom up, so use + # the visitor to accumalate a list of nodes to visit, then + # visit them in reverse order. def upload_tool_deps(deptool): if "id" in deptool: - discovered_secondaryfiles = {} + todo.append(deptool) + + tool.visit(upload_tool_deps) + + for deptool in reversed(todo): + discovered_secondaryfiles = {} + with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])): pm = upload_dependencies(arvrunner, "%s dependencies" % (shortname(deptool["id"])), document_loader, @@ -664,14 +751,13 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext): False, runtimeContext, include_primary=False, - discovered_secondaryfiles=discovered_secondaryfiles) - document_loader.idx[deptool["id"]] = deptool - toolmap = {} - for k,v in pm.items(): - toolmap[k] = v.resolved - merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles) - - tool.visit(upload_tool_deps) + discovered_secondaryfiles=discovered_secondaryfiles, + cache=tool_dep_cache) + document_loader.idx[deptool["id"]] = deptool + toolmap = {} + for k,v in pm.items(): + toolmap[k] = v.resolved + merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles) return merged_map @@ -726,7 +812,8 @@ class Runner(Process): intermediate_output_ttl=0, merged_map=None, priority=None, secret_store=None, collection_cache_size=256, - collection_cache_is_default=True): + collection_cache_is_default=True, + git_info=None): loadingContext = loadingContext.copy() loadingContext.metadata = updated_tool.metadata.copy() @@ -755,6 +842,7 @@ class Runner(Process): self.priority = priority self.secret_store = secret_store self.enable_dev = loadingContext.enable_dev + self.git_info = git_info self.submit_runner_cores = 1 self.submit_runner_ram = 1024 # defaut 1 GiB