X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/535856c28a12bb07dc986b980b0f4ccfdfd25640..8d8847e070f588b5d85ac2d7123fd929b4d417cd:/sdk/cwl/arvados_cwl/runner.py?ds=inline diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index 9c422a42f7..c1a98e7456 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -7,7 +7,7 @@ import urlparse from functools import partial import logging import json -import subprocess +import subprocess32 as subprocess from collections import namedtuple from StringIO import StringIO @@ -26,7 +26,7 @@ from cwltool.pack import pack import arvados.collection import ruamel.yaml as yaml -from .arvdocker import arv_docker_get_image +import arvados_cwl.arvdocker from .pathmapper import ArvPathMapper, trim_listing from ._version import __version__ from . import done @@ -129,7 +129,10 @@ def upload_dependencies(arvrunner, name, document_loader, sc = [] def only_real(obj): - if obj.get("location", "").startswith("file:"): + # Only interested in local files than need to be uploaded, + # don't include file literals, keep references, etc. + sp = obj.get("location", "").split(":") + if len(sp) > 1 and sp[0] in ("file", "http", "https"): sc.append(obj) visit_class(sc_result, ("File", "Directory"), only_real) @@ -168,8 +171,13 @@ def upload_dependencies(arvrunner, name, document_loader, visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files) - for d in discovered: - sc.extend(discovered[d]) + for d in list(discovered.keys()): + # Only interested in discovered secondaryFiles which are local + # files that need to be uploaded. + if d.startswith("file:"): + sc.extend(discovered[d]) + else: + del discovered[d] mapper = ArvPathMapper(arvrunner, sc, "", "keep:%s", @@ -207,9 +215,9 @@ def upload_docker(arvrunner, tool): # TODO: can be supported by containers API, but not jobs API. raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError( "Option 'dockerOutputDirectory' of DockerRequirement not supported.") - arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid) + arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid) else: - arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid) + arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid) elif isinstance(tool, cwltool.workflow.Workflow): for s in tool.steps: upload_docker(arvrunner, s.embedded_tool) @@ -236,6 +244,8 @@ def packed_workflow(arvrunner, tool, merged_map): v["location"] = merged_map[cur_id].resolved[v["location"]] if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles: v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]] + if v.get("class") == "DockerRequirement": + v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid) for l in v: visit(v[l], cur_id) if isinstance(v, list): @@ -316,10 +326,10 @@ def arvados_jobs_image(arvrunner, img): """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it.""" try: - arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid) + return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid) except Exception as e: raise Exception("Docker image %s is not available\n%s" % (img, e) ) - return img + def upload_workflow_collection(arvrunner, name, packed): collection = arvados.collection.Collection(api_client=arvrunner.api, @@ -354,7 +364,7 @@ class Runner(object): output_name, output_tags, submit_runner_ram=0, name=None, on_error=None, submit_runner_image=None, intermediate_output_ttl=0, merged_map=None, - priority=None, secret_store=None): + priority=None, secret_store=None, collection_cache_size=None): self.arvrunner = runner self.tool = tool self.job_order = job_order @@ -379,6 +389,7 @@ class Runner(object): self.submit_runner_cores = 1 self.submit_runner_ram = 1024 # defaut 1 GiB + self.collection_cache_size = 256 runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources") if runner_resource_req: @@ -386,11 +397,17 @@ class Runner(object): self.submit_runner_cores = runner_resource_req["coresMin"] if runner_resource_req.get("ramMin"): self.submit_runner_ram = runner_resource_req["ramMin"] + if runner_resource_req.get("keep_cache"): + self.collection_cache_size = runner_resource_req["keep_cache"] if submit_runner_ram: # Command line / initializer overrides default and/or spec from workflow self.submit_runner_ram = submit_runner_ram + if collection_cache_size: + # Command line / initializer overrides default and/or spec from workflow + self.collection_cache_size = collection_cache_size + if self.submit_runner_ram <= 0: raise Exception("Value of submit-runner-ram must be greater than zero")