X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/419db47f0e97123cb3ff491d189b5607468101da..8d8847e070f588b5d85ac2d7123fd929b4d417cd:/sdk/cwl/arvados_cwl/runner.py

diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 3ce08f6cc7..c1a98e7456 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -7,7 +7,7 @@ import urlparse
 from functools import partial
 import logging
 import json
-import subprocess
+import subprocess32 as subprocess
 from collections import namedtuple
 
 from StringIO import StringIO
@@ -16,7 +16,7 @@ from schema_salad.sourceline import SourceLine, cmap
 
 from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
+from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
 from cwltool.load_tool import fetch_document
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.utils import aslist
@@ -26,7 +26,7 @@ from cwltool.pack import pack
 import arvados.collection
 import ruamel.yaml as yaml
 
-from .arvdocker import arv_docker_get_image
+import arvados_cwl.arvdocker
 from .pathmapper import ArvPathMapper, trim_listing
 from ._version import __version__
 from . import done
@@ -122,11 +122,21 @@ def upload_dependencies(arvrunner, name, document_loader,
     # that external references in $include and $mixin are captured.
     scanobj = loadref("", workflowobj["id"])
 
-    sc = scandeps(uri, scanobj,
+    sc_result = scandeps(uri, scanobj,
                   loadref_fields,
                   set(("$include", "$schemas", "location")),
                   loadref, urljoin=document_loader.fetcher.urljoin)
 
+    sc = []
+    def only_real(obj):
+        # Only interested in local files that need to be uploaded;
+        # don't include file literals, Keep references, etc.
+        sp = obj.get("location", "").split(":")
+        if len(sp) > 1 and sp[0] in ("file", "http", "https"):
+            sc.append(obj)
+
+    visit_class(sc_result, ("File", "Directory"), only_real)
+
     normalizeFilesDirs(sc)
 
     if include_primary and "id" in workflowobj:
@@ -161,8 +171,13 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
 
-    for d in discovered:
-        sc.extend(discovered[d])
+    for d in list(discovered.keys()):
+        # Only interested in discovered secondaryFiles which are local
+        # files that need to be uploaded.
+        if d.startswith("file:"):
+            sc.extend(discovered[d])
+        else:
+            del discovered[d]
 
     mapper = ArvPathMapper(arvrunner, sc, "",
                            "keep:%s",
@@ -194,15 +209,15 @@ def upload_docker(arvrunner, tool):
     """Uploads Docker images used in CommandLineTool objects."""
 
     if isinstance(tool, CommandLineTool):
-        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
+        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
         if docker_req:
             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                 # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
-            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
         else:
-            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool)
@@ -229,6 +244,8 @@ def packed_workflow(arvrunner, tool, merged_map):
                 v["location"] = merged_map[cur_id].resolved[v["location"]]
             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
+            if v.get("class") == "DockerRequirement":
+                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
@@ -309,10 +326,10 @@ def arvados_jobs_image(arvrunner, img):
     """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
 
     try:
-        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
+        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
-    return img
+
 
 def upload_workflow_collection(arvrunner, name, packed):
     collection = arvados.collection.Collection(api_client=arvrunner.api,
@@ -346,8 +363,8 @@ class Runner(object):
     def __init__(self, runner, tool, job_order, enable_reuse,
                  output_name, output_tags, submit_runner_ram=0,
                  name=None, on_error=None, submit_runner_image=None,
-                 intermediate_output_ttl=0, merged_map=None, priority=None,
-                 secret_store=None):
+                 intermediate_output_ttl=0, merged_map=None,
+                 priority=None, secret_store=None, collection_cache_size=None):
         self.arvrunner = runner
         self.tool = tool
         self.job_order = job_order
@@ -355,7 +372,7 @@ class Runner(object):
         if enable_reuse:
             # If reuse is permitted by command line arguments but
             # disabled by the workflow itself, disable it.
-            reuse_req, _ = get_feature(self.tool, "http://arvados.org/cwl#ReuseRequirement")
+            reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
             if reuse_req:
                 enable_reuse = reuse_req["enableReuse"]
         self.enable_reuse = enable_reuse
@@ -370,13 +387,32 @@ class Runner(object):
         self.priority = priority
         self.secret_store = secret_store
 
+        self.submit_runner_cores = 1
+        self.submit_runner_ram = 1024  # default 1 GiB
+        self.collection_cache_size = 256
+
+        runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
+        if runner_resource_req:
+            if runner_resource_req.get("coresMin"):
+                self.submit_runner_cores = runner_resource_req["coresMin"]
+            if runner_resource_req.get("ramMin"):
+                self.submit_runner_ram = runner_resource_req["ramMin"]
+            if runner_resource_req.get("keep_cache"):
+                self.collection_cache_size = runner_resource_req["keep_cache"]
+
         if submit_runner_ram:
+            # Command line / initializer overrides default and/or spec from workflow
             self.submit_runner_ram = submit_runner_ram
-        else:
-            self.submit_runner_ram = 3000
+
+        if collection_cache_size:
+            # Command line / initializer overrides default and/or spec from workflow
+            self.collection_cache_size = collection_cache_size
 
         if self.submit_runner_ram <= 0:
-            raise Exception("Value of --submit-runner-ram must be greater than zero")
+            raise Exception("Value of submit-runner-ram must be greater than zero")
+
+        if self.submit_runner_cores <= 0:
+            raise Exception("Value of submit-runner-cores must be greater than zero")
 
         self.merged_map = merged_map or {}
 
@@ -407,7 +443,7 @@ class Runner(object):
                 api_client=self.arvrunner.api,
                 keep_client=self.arvrunner.keep_client,
                 num_retries=self.arvrunner.num_retries)
-            done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)
+            done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
 
             self.final_output = record["output"]
             outc = arvados.collection.CollectionReader(self.final_output,
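
Note on the @@ -370,13 +387,32 @@ hunk: Runner.__init__ now reads runner container
resources (cores, RAM, Keep cache size) from a
"http://arvados.org/cwl#WorkflowRunnerResources" requirement on the workflow, with
command-line values taking precedence. A minimal sketch of what such a hint could
look like in a workflow document, assuming the conventional "arv" namespace prefix
for http://arvados.org/cwl# and that ramMin/keep_cache are given in MiB to match
the 1024/256 defaults above (check the Arvados CWL extensions documentation for
the exact spelling):

    cwlVersion: v1.0
    class: Workflow
    $namespaces:
      arv: "http://arvados.org/cwl#"
    hints:
      arv:WorkflowRunnerResources:
        coresMin: 2       # read into submit_runner_cores (default 1)
        ramMin: 2048      # read into submit_runner_ram, MiB (default 1024)
        keep_cache: 512   # read into collection_cache_size, MiB (default 256)
    inputs: []
    outputs: []
    steps: []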