X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/97fc7b68a8777794e1beb2c93a1c5c7880f1f0df..a5f492d001f158a01abfd79b65c5bf41708da7f9:/sdk/cwl/arvados_cwl/runner.py diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index e5a81cdc73..a307cf7f66 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -17,7 +17,30 @@ import json import copy from collections import namedtuple from io import StringIO -from typing import Mapping, Sequence +from typing import ( + Any, + Callable, + Dict, + Iterable, + Iterator, + List, + Mapping, + MutableMapping, + Sequence, + MutableSequence, + Optional, + Set, + Sized, + Tuple, + Type, + Union, + cast, +) +from cwltool.utils import ( + CWLObjectType, + CWLOutputAtomType, + CWLOutputType, +) if os.name == "posix" and sys.version_info[0] < 3: import subprocess32 as subprocess @@ -30,15 +53,17 @@ from cwltool.command_line_tool import CommandLineTool import cwltool.workflow from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process, fill_in_defaults) -from cwltool.load_tool import fetch_document +from cwltool.load_tool import fetch_document, jobloaderctx from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class from cwltool.builder import substitute from cwltool.pack import pack from cwltool.update import INTERNAL_VERSION from cwltool.builder import Builder import schema_salad.validate as validate +import schema_salad.ref_resolver import arvados.collection +import arvados.util from .util import collectionUUID from ruamel.yaml import YAML from ruamel.yaml.comments import CommentedMap, CommentedSeq @@ -48,8 +73,10 @@ from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, col from ._version import __version__ from . import done from . context import ArvRuntimeContext +from .perf import Perf logger = logging.getLogger('arvados.cwl-runner') +metrics = logging.getLogger('arvados.cwl-runner.metrics') def trim_anonymous_location(obj): """Remove 'location' field from File and Directory literals. @@ -128,6 +155,9 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered) return + if inputschema == "File": + inputschema = {"type": "File"} + if isinstance(inputschema, basestring): sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements)) if sd: @@ -163,10 +193,13 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered) elif (inputschema["type"] == "File" and - secondaryspec and isinstance(primary, Mapping) and - primary.get("class") == "File" and - "secondaryFiles" not in primary): + primary.get("class") == "File"): + + if "secondaryFiles" in primary or not secondaryspec: + # Nothing to do. + return + # # Found a file, check for secondaryFiles # @@ -174,9 +207,9 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov primary["secondaryFiles"] = secondaryspec for i, sf in enumerate(aslist(secondaryspec)): if builder.cwlVersion == "v1.0": - pattern = builder.do_eval(sf, context=primary) + pattern = sf else: - pattern = builder.do_eval(sf["pattern"], context=primary) + pattern = sf["pattern"] if pattern is None: continue if isinstance(pattern, list): @@ -213,23 +246,46 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov "Expression must return list, object, string or null") if pattern is not None: - sfpath = substitute(primary["location"], pattern) + if "${" in pattern or "$(" in pattern: + sfname = builder.do_eval(pattern, context=primary) + else: + sfname = substitute(primary["basename"], pattern) + + if sfname is None: + continue + + if isinstance(sfname, str): + p_location = primary["location"] + if "/" in p_location: + sfpath = ( + p_location[0 : p_location.rindex("/") + 1] + + sfname + ) required = builder.do_eval(required, context=primary) - if fsaccess.exists(sfpath): - if pattern is not None: - found.append({"location": sfpath, "class": "File"}) - else: - found.append(sf) - elif required: - raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError( - "Required secondary file '%s' does not exist" % sfpath) + if isinstance(sfname, list) or isinstance(sfname, dict): + each = aslist(sfname) + for e in each: + if required and not fsaccess.exists(e.get("location")): + raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError( + "Required secondary file '%s' does not exist" % e.get("location")) + found.extend(each) + + if isinstance(sfname, str): + if fsaccess.exists(sfpath): + if pattern is not None: + found.append({"location": sfpath, "class": "File"}) + else: + found.append(sf) + elif required: + raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError( + "Required secondary file '%s' does not exist" % sfpath) primary["secondaryFiles"] = cmap(found) if discovered is not None: discovered[primary["location"]] = primary["secondaryFiles"] - elif inputschema["type"] not in primitive_types_set: + elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"): set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered) def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None): @@ -239,8 +295,9 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No set_secondary(fsaccess, builder, inputschema, None, primary, discovered) def upload_dependencies(arvrunner, name, document_loader, - workflowobj, uri, loadref_run, - include_primary=True, discovered_secondaryfiles=None): + workflowobj, uri, loadref_run, runtimeContext, + include_primary=True, discovered_secondaryfiles=None, + cache=None): """Upload the dependencies of the workflowobj document to Keep. Returns a pathmapper object mapping local paths to keep references. Also @@ -259,6 +316,8 @@ def upload_dependencies(arvrunner, name, document_loader, defrg, _ = urllib.parse.urldefrag(joined) if defrg not in loaded: loaded.add(defrg) + if cache is not None and defrg in cache: + return cache[defrg] # Use fetch_text to get raw file (before preprocessing). text = document_loader.fetch_text(defrg) if isinstance(text, bytes): @@ -266,7 +325,10 @@ def upload_dependencies(arvrunner, name, document_loader, else: textIO = StringIO(text) yamlloader = YAML(typ='safe', pure=True) - return yamlloader.load(textIO) + result = yamlloader.load(textIO) + if cache is not None: + cache[defrg] = result + return result else: return {} @@ -277,25 +339,37 @@ def upload_dependencies(arvrunner, name, document_loader, scanobj = workflowobj if "id" in workflowobj and not workflowobj["id"].startswith("_:"): - # Need raw file content (before preprocessing) to ensure - # that external references in $include and $mixin are captured. - scanobj = loadref("", workflowobj["id"]) + defrg, _ = urllib.parse.urldefrag(workflowobj["id"]) + if cache is not None and defrg not in cache: + # if we haven't seen this file before, want raw file + # content (before preprocessing) to ensure that external + # references like $include haven't already been inlined. + scanobj = loadref("", workflowobj["id"]) metadata = scanobj - sc_result = scandeps(uri, scanobj, - loadref_fields, - set(("$include", "location")), - loadref, urljoin=document_loader.fetcher.urljoin, - nestdirs=False) + with Perf(metrics, "scandeps include, location"): + sc_result = scandeps(uri, scanobj, + loadref_fields, + set(("$include", "location")), + loadref, urljoin=document_loader.fetcher.urljoin, + nestdirs=False) - optional_deps = scandeps(uri, scanobj, - loadref_fields, - set(("$schemas",)), - loadref, urljoin=document_loader.fetcher.urljoin, - nestdirs=False) + with Perf(metrics, "scandeps $schemas"): + optional_deps = scandeps(uri, scanobj, + loadref_fields, + set(("$schemas",)), + loadref, urljoin=document_loader.fetcher.urljoin, + nestdirs=False) - sc_result.extend(optional_deps) + if sc_result is None: + sc_result = [] + + if optional_deps is None: + optional_deps = [] + + if optional_deps: + sc_result.extend(optional_deps) sc = [] uuids = {} @@ -323,41 +397,52 @@ def upload_dependencies(arvrunner, name, document_loader, sc.append(obj) collect_uuids(obj) - visit_class(workflowobj, ("File", "Directory"), collect_uuids) - visit_class(sc_result, ("File", "Directory"), collect_uploads) + with Perf(metrics, "collect uuids"): + visit_class(workflowobj, ("File", "Directory"), collect_uuids) + + with Perf(metrics, "collect uploads"): + visit_class(sc_result, ("File", "Directory"), collect_uploads) # Resolve any collection uuids we found to portable data hashes # and assign them to uuid_map uuid_map = {} fetch_uuids = list(uuids.keys()) - while fetch_uuids: - # For a large number of fetch_uuids, API server may limit - # response size, so keep fetching from API server has nothing - # more to give us. - lookups = arvrunner.api.collections().list( - filters=[["uuid", "in", fetch_uuids]], - count="none", - select=["uuid", "portable_data_hash"]).execute( - num_retries=arvrunner.num_retries) + with Perf(metrics, "fetch_uuids"): + while fetch_uuids: + # For a large number of fetch_uuids, API server may limit + # response size, so keep fetching from API server has nothing + # more to give us. + lookups = arvrunner.api.collections().list( + filters=[["uuid", "in", fetch_uuids]], + count="none", + select=["uuid", "portable_data_hash"]).execute( + num_retries=arvrunner.num_retries) - if not lookups["items"]: - break + if not lookups["items"]: + break - for l in lookups["items"]: - uuid_map[l["uuid"]] = l["portable_data_hash"] + for l in lookups["items"]: + uuid_map[l["uuid"]] = l["portable_data_hash"] - fetch_uuids = [u for u in fetch_uuids if u not in uuid_map] + fetch_uuids = [u for u in fetch_uuids if u not in uuid_map] normalizeFilesDirs(sc) - if include_primary and "id" in workflowobj: - sc.append({"class": "File", "location": workflowobj["id"]}) + if "id" in workflowobj: + defrg, _ = urllib.parse.urldefrag(workflowobj["id"]) + if include_primary: + # make sure it's included + sc.append({"class": "File", "location": defrg}) + else: + # make sure it's excluded + sc = [d for d in sc if d.get("location") != defrg] def visit_default(obj): def defaults_are_optional(f): if "location" not in f and "path" in f: f["location"] = f["path"] del f["path"] + normalizeFilesDirs(f) optional_deps.append(f) visit_class(obj["default"], ("File", "Directory"), defaults_are_optional) @@ -380,7 +465,10 @@ def upload_dependencies(arvrunner, name, document_loader, builder_job_order, discovered) - copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False) + _jobloaderctx = jobloaderctx.copy() + loader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=document_loader.fetcher_constructor) + + copied, _ = loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False) visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files) for d in list(discovered): @@ -391,17 +479,24 @@ def upload_dependencies(arvrunner, name, document_loader, else: del discovered[d] - mapper = ArvPathMapper(arvrunner, sc, "", - "keep:%s", - "keep:%s/%s", - name=name, - single_collection=True, - optional_deps=optional_deps) + with Perf(metrics, "mapper"): + mapper = ArvPathMapper(arvrunner, sc, "", + "keep:%s", + "keep:%s/%s", + name=name, + single_collection=True, + optional_deps=optional_deps) + + keeprefs = set() + def addkeepref(k): + if k.startswith("keep:"): + keeprefs.add(collection_pdh_pattern.match(k).group(1)) def setloc(p): loc = p.get("location") if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")): p["location"] = mapper.mapper(p["location"]).resolved + addkeepref(p["location"]) return if not loc: @@ -423,7 +518,10 @@ def upload_dependencies(arvrunner, name, document_loader, gp = collection_uuid_pattern.match(loc) if not gp: + # Not a uuid pattern (must be a pdh pattern) + addkeepref(p["location"]) return + uuid = gp.groups()[0] if uuid not in uuid_map: raise SourceLine(p, "location", validate.ValidationException).makeError( @@ -431,13 +529,46 @@ def upload_dependencies(arvrunner, name, document_loader, p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "") p[collectionUUID] = uuid - visit_class(workflowobj, ("File", "Directory"), setloc) - visit_class(discovered, ("File", "Directory"), setloc) + with Perf(metrics, "setloc"): + visit_class(workflowobj, ("File", "Directory"), setloc) + visit_class(discovered, ("File", "Directory"), setloc) if discovered_secondaryfiles is not None: for d in discovered: discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d] + if runtimeContext.copy_deps: + # Find referenced collections and copy them into the + # destination project, for easy sharing. + already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list, + filters=[["portable_data_hash", "in", list(keeprefs)], + ["owner_uuid", "=", runtimeContext.project_uuid]], + select=["uuid", "portable_data_hash", "created_at"])) + + keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present) + for kr in keeprefs: + col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]], + order="created_at desc", + select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"], + limit=1).execute() + if len(col["items"]) == 0: + logger.warning("Cannot find collection with portable data hash %s", kr) + continue + col = col["items"][0] + try: + arvrunner.api.collections().create(body={"collection": { + "owner_uuid": runtimeContext.project_uuid, + "name": col["name"], + "description": col["description"], + "properties": col["properties"], + "portable_data_hash": col["portable_data_hash"], + "manifest_text": col["manifest_text"], + "storage_classes_desired": col["storage_classes_desired"], + "trash_at": col["trash_at"] + }}, ensure_unique_name=True).execute() + except Exception as e: + logger.warning("Unable copy collection to destination: %s", e) + if "$schemas" in workflowobj: sch = CommentedSeq() for s in workflowobj["$schemas"]: @@ -448,32 +579,36 @@ def upload_dependencies(arvrunner, name, document_loader, return mapper -def upload_docker(arvrunner, tool): +def upload_docker(arvrunner, tool, runtimeContext): """Uploads Docker images used in CommandLineTool objects.""" if isinstance(tool, CommandLineTool): (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement") if docker_req: if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers": - # TODO: can be supported by containers API, but not jobs API. raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError( "Option 'dockerOutputDirectory' of DockerRequirement not supported.") - arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid, - arvrunner.runtimeContext.force_docker_pull, - arvrunner.runtimeContext.tmp_outdir_prefix, - arvrunner.runtimeContext.match_local_docker) + + arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, + runtimeContext.project_uuid, + runtimeContext.force_docker_pull, + runtimeContext.tmp_outdir_prefix, + runtimeContext.match_local_docker, + runtimeContext.copy_deps) else: arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__}, - True, arvrunner.project_uuid, - arvrunner.runtimeContext.force_docker_pull, - arvrunner.runtimeContext.tmp_outdir_prefix, - arvrunner.runtimeContext.match_local_docker) + True, + runtimeContext.project_uuid, + runtimeContext.force_docker_pull, + runtimeContext.tmp_outdir_prefix, + runtimeContext.match_local_docker, + runtimeContext.copy_deps) elif isinstance(tool, cwltool.workflow.Workflow): for s in tool.steps: - upload_docker(arvrunner, s.embedded_tool) + upload_docker(arvrunner, s.embedded_tool, runtimeContext) -def packed_workflow(arvrunner, tool, merged_map): +def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info): """Create a packed workflow. A "packed" workflow is one where all the components have been combined into a single document.""" @@ -502,16 +637,22 @@ def packed_workflow(arvrunner, tool, merged_map): v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]] if v.get("class") == "DockerRequirement": v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, - arvrunner.project_uuid, - arvrunner.runtimeContext.force_docker_pull, - arvrunner.runtimeContext.tmp_outdir_prefix, - arvrunner.runtimeContext.match_local_docker) + runtimeContext.project_uuid, + runtimeContext.force_docker_pull, + runtimeContext.tmp_outdir_prefix, + runtimeContext.match_local_docker, + runtimeContext.copy_deps) for l in v: visit(v[l], cur_id) if isinstance(v, list): for l in v: visit(l, cur_id) visit(packed, None) + + if git_info: + for g in git_info: + packed[g] = git_info[g] + return packed @@ -526,7 +667,7 @@ def tag_git_version(packed): packed["http://schema.org/version"] = githash -def upload_job_order(arvrunner, name, tool, job_order): +def upload_job_order(arvrunner, name, tool, job_order, runtimeContext): """Upload local files referenced in the input object and return updated input object with 'location' updated to the proper keep references. """ @@ -562,7 +703,8 @@ def upload_job_order(arvrunner, name, tool, job_order): tool.doc_loader, job_order, job_order.get("id", "#"), - False) + False, + runtimeContext) if "id" in job_order: del job_order["id"] @@ -576,26 +718,30 @@ def upload_job_order(arvrunner, name, tool, job_order): FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"]) -def upload_workflow_deps(arvrunner, tool): +def upload_workflow_deps(arvrunner, tool, runtimeContext): # Ensure that Docker images needed by this workflow are available - upload_docker(arvrunner, tool) + with Perf(metrics, "upload_docker"): + upload_docker(arvrunner, tool, runtimeContext) document_loader = tool.doc_loader merged_map = {} - + tool_dep_cache = {} def upload_tool_deps(deptool): if "id" in deptool: discovered_secondaryfiles = {} - pm = upload_dependencies(arvrunner, - "%s dependencies" % (shortname(deptool["id"])), - document_loader, - deptool, - deptool["id"], - False, - include_primary=False, - discovered_secondaryfiles=discovered_secondaryfiles) + with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])): + pm = upload_dependencies(arvrunner, + "%s dependencies" % (shortname(deptool["id"])), + document_loader, + deptool, + deptool["id"], + False, + runtimeContext, + include_primary=False, + discovered_secondaryfiles=discovered_secondaryfiles, + cache=tool_dep_cache) document_loader.idx[deptool["id"]] = deptool toolmap = {} for k,v in pm.items(): @@ -606,19 +752,22 @@ def upload_workflow_deps(arvrunner, tool): return merged_map -def arvados_jobs_image(arvrunner, img): +def arvados_jobs_image(arvrunner, img, runtimeContext): """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it.""" try: - return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid, - arvrunner.runtimeContext.force_docker_pull, - arvrunner.runtimeContext.tmp_outdir_prefix, - arvrunner.runtimeContext.match_local_docker) + return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, + True, + runtimeContext.project_uuid, + runtimeContext.force_docker_pull, + runtimeContext.tmp_outdir_prefix, + runtimeContext.match_local_docker, + runtimeContext.copy_deps) except Exception as e: raise Exception("Docker image %s is not available\n%s" % (img, e) ) -def upload_workflow_collection(arvrunner, name, packed): +def upload_workflow_collection(arvrunner, name, packed, runtimeContext): collection = arvados.collection.Collection(api_client=arvrunner.api, keep_client=arvrunner.keep_client, num_retries=arvrunner.num_retries) @@ -627,15 +776,15 @@ def upload_workflow_collection(arvrunner, name, packed): filters = [["portable_data_hash", "=", collection.portable_data_hash()], ["name", "like", name+"%"]] - if arvrunner.project_uuid: - filters.append(["owner_uuid", "=", arvrunner.project_uuid]) + if runtimeContext.project_uuid: + filters.append(["owner_uuid", "=", runtimeContext.project_uuid]) exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries) if exists["items"]: logger.info("Using collection %s", exists["items"][0]["uuid"]) else: collection.save_new(name=name, - owner_uuid=arvrunner.project_uuid, + owner_uuid=runtimeContext.project_uuid, ensure_unique_name=True, num_retries=arvrunner.num_retries) logger.info("Uploaded to %s", collection.manifest_locator()) @@ -654,7 +803,8 @@ class Runner(Process): intermediate_output_ttl=0, merged_map=None, priority=None, secret_store=None, collection_cache_size=256, - collection_cache_is_default=True): + collection_cache_is_default=True, + git_info=None): loadingContext = loadingContext.copy() loadingContext.metadata = updated_tool.metadata.copy() @@ -683,6 +833,7 @@ class Runner(Process): self.priority = priority self.secret_store = secret_store self.enable_dev = loadingContext.enable_dev + self.git_info = git_info self.submit_runner_cores = 1 self.submit_runner_ram = 1024 # defaut 1 GiB