X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a5804b1e5b04f8f4d0fb1ca67cf7fe8f15d61ec1..adc0f36eeab40f4b8e0603247392b3c804d7272a:/sdk/cwl/arvados_cwl/runner.py diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index ada64ae69a..ddd1d2bacf 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -40,7 +40,7 @@ import schema_salad.validate as validate import arvados.collection from .util import collectionUUID -import ruamel.yaml as yaml +from ruamel.yaml import YAML from ruamel.yaml.comments import CommentedMap, CommentedSeq import arvados_cwl.arvdocker @@ -184,7 +184,10 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov elif isinstance(pattern, dict): specs.append(pattern) elif isinstance(pattern, str): - specs.append({"pattern": pattern, "required": sf.get("required")}) + if builder.cwlVersion == "v1.0": + specs.append({"pattern": pattern, "required": True}) + else: + specs.append({"pattern": pattern, "required": sf.get("required")}) else: raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError( "Expression must return list, object, string or null") @@ -194,6 +197,9 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov if isinstance(sf, dict): if sf.get("class") == "File": pattern = None + if sf.get("location") is None: + raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError( + "File object is missing 'location': %s" % sf) sfpath = sf["location"] required = True else: @@ -259,7 +265,8 @@ def upload_dependencies(arvrunner, name, document_loader, textIO = StringIO(text.decode('utf-8')) else: textIO = StringIO(text) - return yaml.safe_load(textIO) + yamlloader = YAML(typ='safe', pure=True) + return yamlloader.load(textIO) else: return {} @@ -277,9 +284,18 @@ def upload_dependencies(arvrunner, name, document_loader, metadata = scanobj sc_result = scandeps(uri, scanobj, - loadref_fields, - set(("$include", "$schemas", "location")), - loadref, urljoin=document_loader.fetcher.urljoin) + loadref_fields, + set(("$include", "location")), + loadref, urljoin=document_loader.fetcher.urljoin, + nestdirs=False) + + optional_deps = scandeps(uri, scanobj, + loadref_fields, + set(("$schemas",)), + loadref, urljoin=document_loader.fetcher.urljoin, + nestdirs=False) + + sc_result.extend(optional_deps) sc = [] uuids = {} @@ -337,24 +353,25 @@ def upload_dependencies(arvrunner, name, document_loader, if include_primary and "id" in workflowobj: sc.append({"class": "File", "location": workflowobj["id"]}) - if "$schemas" in workflowobj: - for s in workflowobj["$schemas"]: - sc.append({"class": "File", "location": s}) + #if "$schemas" in workflowobj: + # for s in workflowobj["$schemas"]: + # sc.append({"class": "File", "location": s}) def visit_default(obj): - remove = [False] + #remove = [False] def ensure_default_location(f): if "location" not in f and "path" in f: f["location"] = f["path"] del f["path"] - if "location" in f and not arvrunner.fs_access.exists(f["location"]): - # Doesn't exist, remove from list of dependencies to upload - sc[:] = [x for x in sc if x["location"] != f["location"]] - # Delete "default" from workflowobj - remove[0] = True + optional_deps.append(f) + #if "location" in f and not arvrunner.fs_access.exists(f["location"]): + # # Doesn't exist, remove from list of dependencies to upload + # sc[:] = [x for x in sc if x["location"] != f["location"]] + # # Delete "default" from workflowobj + # remove[0] = True visit_class(obj["default"], ("File", "Directory"), ensure_default_location) - if remove[0]: - del obj["default"] + #if remove[0]: + # del obj["default"] find_defaults(workflowobj, visit_default) @@ -390,7 +407,8 @@ def upload_dependencies(arvrunner, name, document_loader, "keep:%s", "keep:%s/%s", name=name, - single_collection=True) + single_collection=True, + optional_deps=optional_deps) def setloc(p): loc = p.get("location") @@ -454,12 +472,14 @@ def upload_docker(arvrunner, tool): "Option 'dockerOutputDirectory' of DockerRequirement not supported.") arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid, arvrunner.runtimeContext.force_docker_pull, - arvrunner.runtimeContext.tmp_outdir_prefix) + arvrunner.runtimeContext.tmp_outdir_prefix, + arvrunner.runtimeContext.match_local_docker) else: arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__}, True, arvrunner.project_uuid, arvrunner.runtimeContext.force_docker_pull, - arvrunner.runtimeContext.tmp_outdir_prefix) + arvrunner.runtimeContext.tmp_outdir_prefix, + arvrunner.runtimeContext.match_local_docker) elif isinstance(tool, cwltool.workflow.Workflow): for s in tool.steps: upload_docker(arvrunner, s.embedded_tool) @@ -496,7 +516,8 @@ def packed_workflow(arvrunner, tool, merged_map): v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid, arvrunner.runtimeContext.force_docker_pull, - arvrunner.runtimeContext.tmp_outdir_prefix) + arvrunner.runtimeContext.tmp_outdir_prefix, + arvrunner.runtimeContext.match_local_docker) for l in v: visit(v[l], cur_id) if isinstance(v, list): @@ -603,7 +624,8 @@ def arvados_jobs_image(arvrunner, img): try: return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid, arvrunner.runtimeContext.force_docker_pull, - arvrunner.runtimeContext.tmp_outdir_prefix) + arvrunner.runtimeContext.tmp_outdir_prefix, + arvrunner.runtimeContext.match_local_docker) except Exception as e: raise Exception("Docker image %s is not available\n%s" % (img, e) )