#
# SPDX-License-Identifier: Apache-2.0
-from future import standard_library
-standard_library.install_aliases()
-from future.utils import viewvalues, viewitems
-from past.builtins import basestring
-
import os
import sys
import re
CWLOutputType,
)
-if os.name == "posix" and sys.version_info[0] < 3:
- import subprocess32 as subprocess
-else:
- import subprocess
+import subprocess
from schema_salad.sourceline import SourceLine, cmap
from . context import ArvRuntimeContext
from .perf import Perf
+basestring = (bytes, str)
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
if "default" in d:
op(d)
else:
- for i in viewvalues(d):
+ for i in d.values():
find_defaults(i, op)
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
rewrite_out=rewrites,
loader=tool.doc_loader)
- rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
+ rewrite_to_orig = {v: k for k,v in rewrites.items()}
def visit(v, cur_id):
if isinstance(v, dict):
v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
if v.get("class") == "DockerRequirement":
v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
- runtimeContext.project_uuid,
- runtimeContext.force_docker_pull,
- runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker,
- runtimeContext.copy_deps)
+ runtimeContext)
for l in v:
visit(v[l], cur_id)
if isinstance(v, list):
with Perf(metrics, "setloc"):
visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
def apply_merged_map(merged_map, workflowobj):
    """Rewrite file references in workflowobj in place.

    merged_map maps a process object "id" to a FileUpdates record whose
    .resolved dict translates original "location" values to their
    uploaded locations, and whose .secondaryFiles dict supplies the
    secondaryFiles listing keyed by the rewritten location.
    """

    def visit(node, cur_id):
        if isinstance(node, dict):
            # Entering a process object establishes the id used for
            # merged_map lookups below.
            if node.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool") and "id" in node:
                cur_id = node["id"]
            # Normalize legacy "path" into "location".
            if "path" in node and "location" not in node:
                node["location"] = node.pop("path")
            if "location" in node and cur_id in merged_map:
                updates = merged_map[cur_id]
                loc = node["location"]
                if loc in updates.resolved:
                    loc = updates.resolved[loc]
                    node["location"] = loc
                # Lookup uses the (possibly rewritten) location.
                if loc in updates.secondaryFiles:
                    node["secondaryFiles"] = updates.secondaryFiles[loc]
            for key in node:
                visit(node[key], cur_id)
        if isinstance(node, list):
            for item in node:
                visit(item, cur_id)

    visit(workflowobj, None)
+
def update_from_merged_map(tool, merged_map):
    """Apply the collected location rewrites to every embedded process
    object of the workflow (see apply_merged_map)."""
    tool.visit(lambda obj: apply_merged_map(merged_map, obj))
+
def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
"""Upload local files referenced in the input object and return updated input
object with 'location' updated to the proper keep references.
update_from_mapper(job_order, jobmapper)
- #print(json.dumps(job_order, indent=2))
-
- return job_order
+ return job_order, jobmapper
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
def upload_workflow_deps(arvrunner, tool, runtimeContext):
# Ensure that Docker images needed by this workflow are available
- # commented out for testing only, uncomment me
with Perf(metrics, "upload_docker"):
upload_docker(arvrunner, tool, runtimeContext)
toolmap = {}
for k,v in pm.items():
toolmap[k] = v.resolved
+
merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
return merged_map
"""Base class for runner processes, which submit an instance of
arvados-cwl-runner and wait for the final result."""
- def __init__(self, runner, updated_tool,
+ def __init__(self, runner,
tool, loadingContext, enable_reuse,
output_name, output_tags, submit_runner_ram=0,
name=None, on_error=None, submit_runner_image=None,
priority=None, secret_store=None,
collection_cache_size=256,
collection_cache_is_default=True,
- git_info=None):
+ git_info=None,
+ reuse_runner=False):
self.loadingContext = loadingContext.copy()
- self.loadingContext.metadata = updated_tool.metadata.copy()
- super(Runner, self).__init__(updated_tool.tool, loadingContext)
+ super(Runner, self).__init__(tool.tool, loadingContext)
self.arvrunner = runner
self.embedded_tool = tool
self.enable_dev = self.loadingContext.enable_dev
self.git_info = git_info
self.fast_parser = self.loadingContext.fast_parser
+ self.reuse_runner = reuse_runner
self.submit_runner_cores = 1
self.submit_runner_ram = 1024 # default 1 GiB
api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries)
- done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
+ done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40,
+ include_crunchrun=(record.get("exit_code") is None or record.get("exit_code") > 127))
self.final_output = record["output"]
outc = arvados.collection.CollectionReader(self.final_output,
self.arvrunner.output_callback({}, "permanentFail")
else:
self.arvrunner.output_callback(outputs, processStatus)
+
+
def print_keep_deps_visitor(api, runtimeContext, references, doc_loader, tool):
    """Collect Keep and Docker image references used by one tool.

    Each discovered identifier is added to the shared `references` set:
    Keep URIs found in "location" fields of File/Directory dependencies,
    the acrContainerImage of a WorkflowRunnerResources hint, and the
    value returned by arv_docker_get_image for each DockerRequirement.
    """

    def gather(obj):
        # Keep URI in a "location" field, if any.
        match = arvados.util.keepuri_pattern.match(obj.get("location", ""))
        if match:
            references.add(match[1])

        cls = obj.get("class")
        if cls == "http://arvados.org/cwl#WorkflowRunnerResources" and "acrContainerImage" in obj:
            references.add(obj["acrContainerImage"])
        if cls == "DockerRequirement":
            # NOTE(review): third argument is False here, vs True on the
            # upload path — presumably "resolve only, don't upload"; confirm.
            references.add(arvados_cwl.arvdocker.arv_docker_get_image(api, obj, False, runtimeContext))

    deps = scandeps(tool["id"], tool,
                    set(),
                    {"location", "id"},
                    None,
                    urljoin=doc_loader.fetcher.urljoin,
                    nestdirs=False)

    visit_class(deps, ("File", "Directory"), gather)
    visit_class(tool, ("DockerRequirement", "http://arvados.org/cwl#WorkflowRunnerResources"), gather)
+
+
def print_keep_deps(arvRunner, runtimeContext, merged_map, tool):
    """Write a sorted JSON array of every Keep reference the workflow
    depends on to arvRunner.stdout, followed by a newline."""
    references = set()

    # Scan the tool graph itself for Keep/Docker references.
    tool.visit(partial(print_keep_deps_visitor, arvRunner.api, runtimeContext, references, tool.doc_loader))

    # Also pick up Keep URIs among the already-resolved dependency map.
    for updates in merged_map.values():
        for resolved_loc in updates.resolved.values():
            match = arvados.util.keepuri_pattern.match(resolved_loc)
            if match:
                references.add(match[1])

    json.dump(sorted(references), arvRunner.stdout)
    print(file=arvRunner.stdout)