X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a0dc773156537f6f08454450f6062d5132fbe8a5..HEAD:/sdk/cwl/arvados_cwl/runner.py diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index 4432813f6a..c80a4b3f2f 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -2,11 +2,6 @@ # # SPDX-License-Identifier: Apache-2.0 -from future import standard_library -standard_library.install_aliases() -from future.utils import viewvalues, viewitems -from past.builtins import basestring - import os import sys import re @@ -36,16 +31,8 @@ from typing import ( Union, cast, ) -from cwltool.utils import ( - CWLObjectType, - CWLOutputAtomType, - CWLOutputType, -) -if os.name == "posix" and sys.version_info[0] < 3: - import subprocess32 as subprocess -else: - import subprocess +import subprocess from schema_salad.sourceline import SourceLine, cmap @@ -75,6 +62,7 @@ from . import done from . context import ArvRuntimeContext from .perf import Perf +basestring = (bytes, str) logger = logging.getLogger('arvados.cwl-runner') metrics = logging.getLogger('arvados.cwl-runner.metrics') @@ -106,7 +94,7 @@ def find_defaults(d, op): if "default" in d: op(d) else: - for i in viewvalues(d): + for i in d.values(): find_defaults(i, op) def make_builder(joborder, hints, requirements, runtimeContext, metadata): @@ -570,7 +558,7 @@ def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info): rewrite_out=rewrites, loader=tool.doc_loader) - rewrite_to_orig = {v: k for k,v in viewitems(rewrites)} + rewrite_to_orig = {v: k for k,v in rewrites.items()} def visit(v, cur_id): if isinstance(v, dict): @@ -828,7 +816,8 @@ class Runner(Process): priority=None, secret_store=None, collection_cache_size=256, collection_cache_is_default=True, - git_info=None): + git_info=None, + reuse_runner=False): self.loadingContext = loadingContext.copy() @@ -861,6 +850,7 @@ class Runner(Process): self.enable_dev = self.loadingContext.enable_dev self.git_info = git_info self.fast_parser = self.loadingContext.fast_parser + self.reuse_runner = reuse_runner self.submit_runner_cores = 1 self.submit_runner_ram = 1024 # defaut 1 GiB @@ -934,7 +924,7 @@ class Runner(Process): if "cwl.output.json" in outc: with outc.open("cwl.output.json", "rb") as f: if f.size() > 0: - outputs = json.loads(f.read().decode()) + outputs = json.loads(str(f.read(), 'utf-8')) def keepify(fileobj): path = fileobj["location"] if not path.startswith("keep:"): @@ -946,3 +936,42 @@ class Runner(Process): self.arvrunner.output_callback({}, "permanentFail") else: self.arvrunner.output_callback(outputs, processStatus) + + +def print_keep_deps_visitor(api, runtimeContext, references, doc_loader, tool): + def collect_locators(obj): + loc = obj.get("location", "") + + g = arvados.util.keepuri_pattern.match(loc) + if g: + references.add(g[1]) + + if obj.get("class") == "http://arvados.org/cwl#WorkflowRunnerResources" and "acrContainerImage" in obj: + references.add(obj["acrContainerImage"]) + + if obj.get("class") == "DockerRequirement": + references.add(arvados_cwl.arvdocker.arv_docker_get_image(api, obj, False, runtimeContext)) + + sc_result = scandeps(tool["id"], tool, + set(), + set(("location", "id")), + None, urljoin=doc_loader.fetcher.urljoin, + nestdirs=False) + + visit_class(sc_result, ("File", "Directory"), collect_locators) + visit_class(tool, ("DockerRequirement", "http://arvados.org/cwl#WorkflowRunnerResources"), collect_locators) + + +def print_keep_deps(arvRunner, runtimeContext, merged_map, tool): + references = set() + + tool.visit(partial(print_keep_deps_visitor, arvRunner.api, runtimeContext, references, tool.doc_loader)) + + for mm in merged_map: + for k, v in merged_map[mm].resolved.items(): + g = arvados.util.keepuri_pattern.match(v) + if g: + references.add(g[1]) + + json.dump(sorted(references), arvRunner.stdout) + print(file=arvRunner.stdout)