X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/44e035c4f9873171ae48025515102b513898a7c3..aa03bc3e2a1122e02bb305bda7fadb96c68706b1:/sdk/cwl/arvados_cwl/runner.py diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index 8540b6d245..644713bce2 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -31,8 +31,7 @@ import cwltool.workflow from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process, fill_in_defaults) from cwltool.load_tool import fetch_document -from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class -from cwltool.utils import aslist +from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class from cwltool.builder import substitute from cwltool.pack import pack from cwltool.update import INTERNAL_VERSION @@ -40,8 +39,9 @@ from cwltool.builder import Builder import schema_salad.validate as validate import arvados.collection +import arvados.util from .util import collectionUUID -import ruamel.yaml as yaml +from ruamel.yaml import YAML from ruamel.yaml.comments import CommentedMap, CommentedSeq import arvados_cwl.arvdocker @@ -106,7 +106,8 @@ def make_builder(joborder, hints, requirements, runtimeContext, metadata): outdir="", # type: Text tmpdir="", # type: Text stagedir="", # type: Text - cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") + cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"), + container_engine="docker" ) def search_schemadef(name, reqs): @@ -128,6 +129,9 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered) return + if inputschema == "File": + inputschema = {"type": "File"} + if isinstance(inputschema, basestring): sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements)) if sd: @@ -163,17 +167,23 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered) elif (inputschema["type"] == "File" and - secondaryspec and isinstance(primary, Mapping) and - primary.get("class") == "File" and - "secondaryFiles" not in primary): + primary.get("class") == "File"): + + if "secondaryFiles" in primary or not secondaryspec: + # Nothing to do. + return + # # Found a file, check for secondaryFiles # specs = [] primary["secondaryFiles"] = secondaryspec for i, sf in enumerate(aslist(secondaryspec)): - pattern = builder.do_eval(sf["pattern"], context=primary) + if builder.cwlVersion == "v1.0": + pattern = sf + else: + pattern = sf["pattern"] if pattern is None: continue if isinstance(pattern, list): @@ -181,7 +191,10 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov elif isinstance(pattern, dict): specs.append(pattern) elif isinstance(pattern, str): - specs.append({"pattern": pattern}) + if builder.cwlVersion == "v1.0": + specs.append({"pattern": pattern, "required": True}) + else: + specs.append({"pattern": pattern, "required": sf.get("required")}) else: raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError( "Expression must return list, object, string or null") @@ -190,7 +203,12 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov for i, sf in enumerate(specs): if isinstance(sf, dict): if sf.get("class") == "File": - pattern = sf["basename"] + pattern = None + if sf.get("location") is None: + raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError( + "File object is missing 'location': %s" % sf) + sfpath = sf["location"] + required = True else: pattern = sf["pattern"] required = sf.get("required") @@ -201,11 +219,29 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError( "Expression must return list, object, string or null") - sfpath = substitute(primary["location"], pattern) + if pattern is not None: + if "${" in pattern or "$(" in pattern: + sfname = builder.do_eval(pattern, context=primary) + else: + sfname = substitute(primary["basename"], pattern) + + if sfname is None: + continue + + p_location = primary["location"] + if "/" in p_location: + sfpath = ( + p_location[0 : p_location.rindex("/") + 1] + + sfname + ) + required = builder.do_eval(required, context=primary) if fsaccess.exists(sfpath): - found.append({"location": sfpath, "class": "File"}) + if pattern is not None: + found.append({"location": sfpath, "class": "File"}) + else: + found.append(sf) elif required: raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError( "Required secondary file '%s' does not exist" % sfpath) @@ -213,7 +249,7 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov primary["secondaryFiles"] = cmap(found) if discovered is not None: discovered[primary["location"]] = primary["secondaryFiles"] - elif inputschema["type"] not in primitive_types_set: + elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"): set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered) def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None): @@ -223,7 +259,7 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No set_secondary(fsaccess, builder, inputschema, None, primary, discovered) def upload_dependencies(arvrunner, name, document_loader, - workflowobj, uri, loadref_run, + workflowobj, uri, loadref_run, runtimeContext, include_primary=True, discovered_secondaryfiles=None): """Upload the dependencies of the workflowobj document to Keep. @@ -249,7 +285,8 @@ def upload_dependencies(arvrunner, name, document_loader, textIO = StringIO(text.decode('utf-8')) else: textIO = StringIO(text) - return yaml.safe_load(textIO) + yamlloader = YAML(typ='safe', pure=True) + return yamlloader.load(textIO) else: return {} @@ -264,10 +301,21 @@ def upload_dependencies(arvrunner, name, document_loader, # that external references in $include and $mixin are captured. scanobj = loadref("", workflowobj["id"]) + metadata = scanobj + sc_result = scandeps(uri, scanobj, - loadref_fields, - set(("$include", "$schemas", "location")), - loadref, urljoin=document_loader.fetcher.urljoin) + loadref_fields, + set(("$include", "location")), + loadref, urljoin=document_loader.fetcher.urljoin, + nestdirs=False) + + optional_deps = scandeps(uri, scanobj, + loadref_fields, + set(("$schemas",)), + loadref, urljoin=document_loader.fetcher.urljoin, + nestdirs=False) + + sc_result.extend(optional_deps) sc = [] uuids = {} @@ -325,24 +373,14 @@ def upload_dependencies(arvrunner, name, document_loader, if include_primary and "id" in workflowobj: sc.append({"class": "File", "location": workflowobj["id"]}) - if "$schemas" in workflowobj: - for s in workflowobj["$schemas"]: - sc.append({"class": "File", "location": s}) - def visit_default(obj): - remove = [False] - def ensure_default_location(f): + def defaults_are_optional(f): if "location" not in f and "path" in f: f["location"] = f["path"] del f["path"] - if "location" in f and not arvrunner.fs_access.exists(f["location"]): - # Doesn't exist, remove from list of dependencies to upload - sc[:] = [x for x in sc if x["location"] != f["location"]] - # Delete "default" from workflowobj - remove[0] = True - visit_class(obj["default"], ("File", "Directory"), ensure_default_location) - if remove[0]: - del obj["default"] + normalizeFilesDirs(f) + optional_deps.append(f) + visit_class(obj["default"], ("File", "Directory"), defaults_are_optional) find_defaults(workflowobj, visit_default) @@ -356,7 +394,7 @@ def upload_dependencies(arvrunner, name, document_loader, obj.get("hints", []), obj.get("requirements", []), ArvRuntimeContext(), - {}) + metadata) discover_secondary_files(arvrunner.fs_access, builder, obj["inputs"], @@ -378,12 +416,19 @@ def upload_dependencies(arvrunner, name, document_loader, "keep:%s", "keep:%s/%s", name=name, - single_collection=True) + single_collection=True, + optional_deps=optional_deps) + + keeprefs = set() + def addkeepref(k): + if k.startswith("keep:"): + keeprefs.add(collection_pdh_pattern.match(k).group(1)) def setloc(p): loc = p.get("location") if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")): p["location"] = mapper.mapper(p["location"]).resolved + addkeepref(p["location"]) return if not loc: @@ -405,7 +450,10 @@ def upload_dependencies(arvrunner, name, document_loader, gp = collection_uuid_pattern.match(loc) if not gp: + # Not a uuid pattern (must be a pdh pattern) + addkeepref(p["location"]) return + uuid = gp.groups()[0] if uuid not in uuid_map: raise SourceLine(p, "location", validate.ValidationException).makeError( @@ -420,34 +468,78 @@ def upload_dependencies(arvrunner, name, document_loader, for d in discovered: discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d] + if runtimeContext.copy_deps: + # Find referenced collections and copy them into the + # destination project, for easy sharing. + already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list, + filters=[["portable_data_hash", "in", list(keeprefs)], + ["owner_uuid", "=", runtimeContext.project_uuid]], + select=["uuid", "portable_data_hash", "created_at"])) + + keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present) + for kr in keeprefs: + col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]], + order="created_at desc", + select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"], + limit=1).execute() + if len(col["items"]) == 0: + logger.warning("Cannot find collection with portable data hash %s", kr) + continue + col = col["items"][0] + try: + arvrunner.api.collections().create(body={"collection": { + "owner_uuid": runtimeContext.project_uuid, + "name": col["name"], + "description": col["description"], + "properties": col["properties"], + "portable_data_hash": col["portable_data_hash"], + "manifest_text": col["manifest_text"], + "storage_classes_desired": col["storage_classes_desired"], + "trash_at": col["trash_at"] + }}, ensure_unique_name=True).execute() + except Exception as e: + logger.warning("Unable copy collection to destination: %s", e) + if "$schemas" in workflowobj: sch = CommentedSeq() for s in workflowobj["$schemas"]: - sch.append(mapper.mapper(s).resolved) + if s in mapper: + sch.append(mapper.mapper(s).resolved) workflowobj["$schemas"] = sch return mapper -def upload_docker(arvrunner, tool): +def upload_docker(arvrunner, tool, runtimeContext): """Uploads Docker images used in CommandLineTool objects.""" if isinstance(tool, CommandLineTool): (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement") if docker_req: if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers": - # TODO: can be supported by containers API, but not jobs API. raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError( "Option 'dockerOutputDirectory' of DockerRequirement not supported.") - arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid) + + arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, + runtimeContext.project_uuid, + runtimeContext.force_docker_pull, + runtimeContext.tmp_outdir_prefix, + runtimeContext.match_local_docker, + runtimeContext.copy_deps) else: - arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid) + arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__}, + True, + runtimeContext.project_uuid, + runtimeContext.force_docker_pull, + runtimeContext.tmp_outdir_prefix, + runtimeContext.match_local_docker, + runtimeContext.copy_deps) elif isinstance(tool, cwltool.workflow.Workflow): for s in tool.steps: - upload_docker(arvrunner, s.embedded_tool) + upload_docker(arvrunner, s.embedded_tool, runtimeContext) -def packed_workflow(arvrunner, tool, merged_map): +def packed_workflow(arvrunner, tool, merged_map, runtimeContext): """Create a packed workflow. A "packed" workflow is one where all the components have been combined into a single document.""" @@ -461,7 +553,7 @@ def packed_workflow(arvrunner, tool, merged_map): def visit(v, cur_id): if isinstance(v, dict): - if v.get("class") in ("CommandLineTool", "Workflow"): + if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"): if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v: raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1") if "id" in v: @@ -469,12 +561,18 @@ def packed_workflow(arvrunner, tool, merged_map): if "path" in v and "location" not in v: v["location"] = v["path"] del v["path"] - if "location" in v and not v["location"].startswith("keep:"): - v["location"] = merged_map[cur_id].resolved[v["location"]] - if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles: - v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]] + if "location" in v and cur_id in merged_map: + if v["location"] in merged_map[cur_id].resolved: + v["location"] = merged_map[cur_id].resolved[v["location"]] + if v["location"] in merged_map[cur_id].secondaryFiles: + v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]] if v.get("class") == "DockerRequirement": - v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid) + v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, + runtimeContext.project_uuid, + runtimeContext.force_docker_pull, + runtimeContext.tmp_outdir_prefix, + runtimeContext.match_local_docker, + runtimeContext.copy_deps) for l in v: visit(v[l], cur_id) if isinstance(v, list): @@ -495,7 +593,7 @@ def tag_git_version(packed): packed["http://schema.org/version"] = githash -def upload_job_order(arvrunner, name, tool, job_order): +def upload_job_order(arvrunner, name, tool, job_order, runtimeContext): """Upload local files referenced in the input object and return updated input object with 'location' updated to the proper keep references. """ @@ -531,7 +629,8 @@ def upload_job_order(arvrunner, name, tool, job_order): tool.doc_loader, job_order, job_order.get("id", "#"), - False) + False, + runtimeContext) if "id" in job_order: del job_order["id"] @@ -545,10 +644,10 @@ def upload_job_order(arvrunner, name, tool, job_order): FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"]) -def upload_workflow_deps(arvrunner, tool): +def upload_workflow_deps(arvrunner, tool, runtimeContext): # Ensure that Docker images needed by this workflow are available - upload_docker(arvrunner, tool) + upload_docker(arvrunner, tool, runtimeContext) document_loader = tool.doc_loader @@ -563,6 +662,7 @@ def upload_workflow_deps(arvrunner, tool): deptool, deptool["id"], False, + runtimeContext, include_primary=False, discovered_secondaryfiles=discovered_secondaryfiles) document_loader.idx[deptool["id"]] = deptool @@ -575,16 +675,22 @@ def upload_workflow_deps(arvrunner, tool): return merged_map -def arvados_jobs_image(arvrunner, img): +def arvados_jobs_image(arvrunner, img, runtimeContext): """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it.""" try: - return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid) + return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, + True, + runtimeContext.project_uuid, + runtimeContext.force_docker_pull, + runtimeContext.tmp_outdir_prefix, + runtimeContext.match_local_docker, + runtimeContext.copy_deps) except Exception as e: raise Exception("Docker image %s is not available\n%s" % (img, e) ) -def upload_workflow_collection(arvrunner, name, packed): +def upload_workflow_collection(arvrunner, name, packed, runtimeContext): collection = arvados.collection.Collection(api_client=arvrunner.api, keep_client=arvrunner.keep_client, num_retries=arvrunner.num_retries) @@ -593,15 +699,15 @@ def upload_workflow_collection(arvrunner, name, packed): filters = [["portable_data_hash", "=", collection.portable_data_hash()], ["name", "like", name+"%"]] - if arvrunner.project_uuid: - filters.append(["owner_uuid", "=", arvrunner.project_uuid]) + if runtimeContext.project_uuid: + filters.append(["owner_uuid", "=", runtimeContext.project_uuid]) exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries) if exists["items"]: logger.info("Using collection %s", exists["items"][0]["uuid"]) else: collection.save_new(name=name, - owner_uuid=arvrunner.project_uuid, + owner_uuid=runtimeContext.project_uuid, ensure_unique_name=True, num_retries=arvrunner.num_retries) logger.info("Uploaded to %s", collection.manifest_locator())