X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b8de9b3e62e82b806576b237be5f317bf378169f..8de338d1178abb71addc344382657d3826d7f0bb:/sdk/cwl/arvados_cwl/runner.py diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index d3e0a0e107..69e4f5bd7b 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -3,7 +3,6 @@ import urlparse from functools import partial import logging import json -import re import subprocess from StringIO import StringIO @@ -24,26 +23,22 @@ import arvados.collection import ruamel.yaml as yaml from .arvdocker import arv_docker_get_image -from .pathmapper import ArvPathMapper +from .pathmapper import ArvPathMapper, trim_listing from ._version import __version__ from . import done logger = logging.getLogger('arvados.cwl-runner') -cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*") +def trim_anonymous_location(obj): + """Remove 'location' field from File and Directory literals. -def trim_listing(obj): - """Remove 'listing' field from Directory objects that are keep references. + To make internal handling easier, literals are assigned a random id for + 'location'. However, when writing the record back out, this can break + reproducibility. Since it is valid for literals not have a 'location' + field, remove it. - When Directory objects represent Keep references, it redundant and - potentially very expensive to pass fully enumerated Directory objects - between instances of cwl-runner (e.g. a submitting a job, or using the - RunInSingleContainer feature), so delete the 'listing' field when it is - safe to do so. """ - if obj.get("location", "").startswith("keep:") and "listing" in obj: - del obj["listing"] if obj.get("location", "").startswith("_:"): del obj["location"] @@ -123,7 +118,8 @@ def upload_dependencies(arvrunner, name, document_loader, def upload_docker(arvrunner, tool): - """Visitor which uploads Docker images referenced in CommandLineTool objects.""" + """Uploads Docker images used in CommandLineTool objects.""" + if isinstance(tool, CommandLineTool): (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement") if docker_req: @@ -132,6 +128,9 @@ def upload_docker(arvrunner, tool): raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError( "Option 'dockerOutputDirectory' of DockerRequirement not supported.") arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid) + elif isinstance(tool, cwltool.workflow.Workflow): + for s in tool.steps: + upload_docker(arvrunner, s.embedded_tool) def packed_workflow(arvrunner, tool): """Create a packed workflow. @@ -189,7 +188,8 @@ def upload_job_order(arvrunner, name, tool, job_order): def upload_workflow_deps(arvrunner, tool): # Ensure that Docker images needed by this workflow are available - tool.visit(partial(upload_docker, arvrunner)) + + upload_docker(arvrunner, tool) document_loader = tool.doc_loader @@ -215,6 +215,31 @@ def arvados_jobs_image(arvrunner, img): raise Exception("Docker image %s is not available\n%s" % (img, e) ) return img +def upload_workflow_collection(arvrunner, name, packed): + collection = arvados.collection.Collection(api_client=arvrunner.api, + keep_client=arvrunner.keep_client, + num_retries=arvrunner.num_retries) + with collection.open("workflow.cwl", "w") as f: + f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': '))) + + filters = [["portable_data_hash", "=", collection.portable_data_hash()], + ["name", "like", name+"%"]] + if arvrunner.project_uuid: + filters.append(["owner_uuid", "=", arvrunner.project_uuid]) + exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries) + + if exists["items"]: + logger.info("Using collection %s", exists["items"][0]["uuid"]) + else: + collection.save_new(name=name, + owner_uuid=arvrunner.project_uuid, + ensure_unique_name=True, + num_retries=arvrunner.num_retries) + logger.info("Uploaded to %s", collection.manifest_locator()) + + return collection.portable_data_hash() + + class Runner(object): """Base class for runner processes, which submit an instance of arvados-cwl-runner and wait for the final result.""" @@ -238,7 +263,7 @@ class Runner(object): if submit_runner_ram: self.submit_runner_ram = submit_runner_ram else: - self.submit_runner_ram = 1024 + self.submit_runner_ram = 3000 if self.submit_runner_ram <= 0: raise Exception("Value of --submit-runner-ram must be greater than zero")