X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/466e236c4174de64bdefeb1766160a11217c8c1c..81693ee04f18e68558c9206705ef589cca2460b0:/sdk/cwl/arvados_cwl/runner.py diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index 41166c5122..e515ac2ce5 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -2,21 +2,29 @@ # # SPDX-License-Identifier: Apache-2.0 +from future import standard_library +standard_library.install_aliases() +from future.utils import viewvalues, viewitems + import os -import urlparse +import sys +import urllib.parse from functools import partial import logging import json -import subprocess32 as subprocess from collections import namedtuple +from io import StringIO -from StringIO import StringIO +if os.name == "posix" and sys.version_info[0] < 3: + import subprocess32 as subprocess +else: + import subprocess from schema_salad.sourceline import SourceLine, cmap from cwltool.command_line_tool import CommandLineTool import cwltool.workflow -from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname +from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process from cwltool.load_tool import fetch_document from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class from cwltool.utils import aslist @@ -26,7 +34,7 @@ from cwltool.pack import pack import arvados.collection import ruamel.yaml as yaml -from .arvdocker import arv_docker_get_image +import arvados_cwl.arvdocker from .pathmapper import ArvPathMapper, trim_listing from ._version import __version__ from . import done @@ -61,7 +69,7 @@ def find_defaults(d, op): if "default" in d: op(d) else: - for i in d.itervalues(): + for i in viewvalues(d): find_defaults(i, op) def setSecondary(t, fileobj, discovered): @@ -98,7 +106,7 @@ def upload_dependencies(arvrunner, name, document_loader, loaded = set() def loadref(b, u): joined = document_loader.fetcher.urljoin(b, u) - defrg, _ = urlparse.urldefrag(joined) + defrg, _ = urllib.parse.urldefrag(joined) if defrg not in loaded: loaded.add(defrg) # Use fetch_text to get raw file (before preprocessing). @@ -171,7 +179,7 @@ def upload_dependencies(arvrunner, name, document_loader, visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files) - for d in list(discovered.keys()): + for d in list(discovered): # Only interested in discovered secondaryFiles which are local # files that need to be uploaded. if d.startswith("file:"): @@ -215,9 +223,9 @@ def upload_docker(arvrunner, tool): # TODO: can be supported by containers API, but not jobs API. raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError( "Option 'dockerOutputDirectory' of DockerRequirement not supported.") - arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid) + arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid) else: - arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid) + arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid) elif isinstance(tool, cwltool.workflow.Workflow): for s in tool.steps: upload_docker(arvrunner, s.embedded_tool) @@ -232,7 +240,7 @@ def packed_workflow(arvrunner, tool, merged_map): packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"], tool.metadata, rewrite_out=rewrites) - rewrite_to_orig = {v: k for k,v in rewrites.items()} + rewrite_to_orig = {v: k for k,v in viewitems(rewrites)} def visit(v, cur_id): if isinstance(v, dict): @@ -244,6 +252,8 @@ def packed_workflow(arvrunner, tool, merged_map): v["location"] = merged_map[cur_id].resolved[v["location"]] if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles: v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]] + if v.get("class") == "DockerRequirement": + v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid) for l in v: visit(v[l], cur_id) if isinstance(v, list): @@ -324,10 +334,10 @@ def arvados_jobs_image(arvrunner, img): """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it.""" try: - arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid) + return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid) except Exception as e: raise Exception("Docker image %s is not available\n%s" % (img, e) ) - return img + def upload_workflow_collection(arvrunner, name, packed): collection = arvados.collection.Collection(api_client=arvrunner.api, @@ -354,23 +364,28 @@ def upload_workflow_collection(arvrunner, name, packed): return collection.portable_data_hash() -class Runner(object): +class Runner(Process): """Base class for runner processes, which submit an instance of arvados-cwl-runner and wait for the final result.""" - def __init__(self, runner, tool, job_order, enable_reuse, + def __init__(self, runner, tool, loadingContext, enable_reuse, output_name, output_tags, submit_runner_ram=0, name=None, on_error=None, submit_runner_image=None, intermediate_output_ttl=0, merged_map=None, - priority=None, secret_store=None): + priority=None, secret_store=None, + collection_cache_size=256, + collection_cache_is_default=True): + + super(Runner, self).__init__(tool.tool, loadingContext) + self.arvrunner = runner - self.tool = tool - self.job_order = job_order + self.embedded_tool = tool + self.job_order = None self.running = False if enable_reuse: # If reuse is permitted by command line arguments but # disabled by the workflow itself, disable it. - reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement") + reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement") if reuse_req: enable_reuse = reuse_req["enableReuse"] self.enable_reuse = enable_reuse @@ -387,13 +402,16 @@ class Runner(object): self.submit_runner_cores = 1 self.submit_runner_ram = 1024 # defaut 1 GiB + self.collection_cache_size = collection_cache_size - runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources") + runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources") if runner_resource_req: if runner_resource_req.get("coresMin"): self.submit_runner_cores = runner_resource_req["coresMin"] if runner_resource_req.get("ramMin"): self.submit_runner_ram = runner_resource_req["ramMin"] + if runner_resource_req.get("keep_cache") and collection_cache_is_default: + self.collection_cache_size = runner_resource_req["keep_cache"] if submit_runner_ram: # Command line / initializer overrides default and/or spec from workflow @@ -407,6 +425,15 @@ class Runner(object): self.merged_map = merged_map or {} + def job(self, + job_order, # type: Mapping[Text, Text] + output_callbacks, # type: Callable[[Any, Any], Any] + runtimeContext # type: RuntimeContext + ): # type: (...) -> Generator[Any, None, None] + self.job_order = job_order + self._init_job(job_order, runtimeContext) + yield self + def update_pipeline_component(self, record): pass @@ -442,17 +469,17 @@ class Runner(object): keep_client=self.arvrunner.keep_client, num_retries=self.arvrunner.num_retries) if "cwl.output.json" in outc: - with outc.open("cwl.output.json") as f: + with outc.open("cwl.output.json", "rb") as f: if f.size() > 0: - outputs = json.load(f) + outputs = json.loads(f.read().decode()) def keepify(fileobj): path = fileobj["location"] if not path.startswith("keep:"): fileobj["location"] = "keep:%s/%s" % (record["output"], path) adjustFileObjs(outputs, keepify) adjustDirObjs(outputs, keepify) - except Exception as e: - logger.exception("[%s] While getting final output object: %s", self.name, e) + except Exception: + logger.exception("[%s] While getting final output object", self.name) self.arvrunner.output_callback({}, "permanentFail") else: self.arvrunner.output_callback(outputs, processStatus)