X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/33c10053b22fd5065516eb7df4c58b55a70d490c..917c3ca20c36318578c4dfab7de076d97a2f87fe:/sdk/cwl/arvados_cwl/arvjob.py

diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 1287fbb6ea..11efc0c1c3 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -2,6 +2,10 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
+from past.builtins import basestring
+from builtins import object
+from future.utils import viewitems
+
 import logging
 import re
 import copy
@@ -18,7 +22,7 @@
 from cwltool.job import JobBase
 from schema_salad.sourceline import SourceLine
 
-from arvados_cwl.util import get_current_container, get_intermediate_collection_info
+import arvados_cwl.util
 import ruamel.yaml as yaml
 
 import arvados.collection
@@ -30,7 +34,8 @@
 from .pathmapper import VwdPathMapper, trim_listing
 from .perf import Perf
 from . import done
 from ._version import __version__
+from .util import get_intermediate_collection_info
 
 logger = logging.getLogger('arvados.cwl-runner')
 metrics = logging.getLogger('arvados.cwl-runner.metrics')
@@ -66,7 +71,7 @@
                                                 keep_client=self.arvrunner.keep_client,
                                                 num_retries=self.arvrunner.num_retries)
             script_parameters["task.vwd"] = {}
-            generatemapper = VwdPathMapper([self.generatefiles], "", "",
+            generatemapper = VwdPathMapper(self.generatefiles["listing"], "", "",
                                            separateDirs=False)
 
             with Perf(metrics, "createfiles %s" % self.name):
@@ -77,9 +82,7 @@
 
             if vwd:
                 with Perf(metrics, "generatefiles.save_new %s" % self.name):
-                    if not runtimeContext.current_container:
-                        runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
-                    info = get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
+                    info = get_intermediate_collection_info(self.name, None, runtimeContext.intermediate_output_ttl)
                     vwd.save_new(name=info["name"],
                                  owner_uuid=self.arvrunner.project_uuid,
                                  ensure_unique_name=True,
@@ -196,7 +199,7 @@
                                 e)
             else:
                 logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
-        except Exception as e:
+        except Exception:
             logger.exception("%s error" % (self.arvrunner.label(self)))
             self.output_callback({}, "permanentFail")
 
@@ -221,8 +224,8 @@
                             body={
                                 "components": components
                             }).execute(num_retries=self.arvrunner.num_retries)
-                except Exception as e:
-                    logger.info("Error adding to components: %s", e)
+                except Exception:
+                    logger.exception("Error adding to components")
 
     def done(self, record):
         try:
@@ -233,8 +236,11 @@
         try:
             if record["state"] == "Complete":
                 processStatus = "success"
+                # we don't have the real exit code so fake it.
+                record["exit_code"] = 0
             else:
                 processStatus = "permanentFail"
+                record["exit_code"] = 1
 
             outputs = {}
             try:
@@ -244,7 +250,7 @@
                                                            api_client=self.arvrunner.api,
                                                            keep_client=self.arvrunner.keep_client,
                                                            num_retries=self.arvrunner.num_retries)
-                log = logc.open(logc.keys()[0])
+                log = logc.open(list(logc.keys())[0])
                 dirs = {
                     "tmpdir": "/tmpdir",
                     "outdir": "/outdir",
@@ -269,10 +275,12 @@
 
                 outputs = done.done(self, record, dirs["tmpdir"],
                                     dirs["outdir"], dirs["keep"])
             except WorkflowException as e:
+                # Only include a stack trace if in debug mode.
+                # This is most likely a user workflow error and a stack trace may obfuscate more useful output.
logger.error("%s unable to collect output from %s:\n%s", self.arvrunner.label(self), record["output"], e, exc_info=(e if self.arvrunner.debug else False)) processStatus = "permanentFail" - except Exception as e: + except Exception: logger.exception("Got unknown exception while collecting output for job %s:", self.name) processStatus = "permanentFail" @@ -297,10 +305,10 @@ class RunnerJob(Runner): a pipeline template or pipeline instance. """ - if self.tool.tool["id"].startswith("keep:"): - self.job_order["cwl:tool"] = self.tool.tool["id"][5:] + if self.embedded_tool.tool["id"].startswith("keep:"): + self.job_order["cwl:tool"] = self.embedded_tool.tool["id"][5:] else: - packed = packed_workflow(self.arvrunner, self.tool, self.merged_map) + packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map) wf_pdh = upload_workflow_collection(self.arvrunner, self.name, packed) self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh @@ -344,7 +352,7 @@ class RunnerJob(Runner): find_or_create=self.enable_reuse ).execute(num_retries=self.arvrunner.num_retries) - for k,v in job_spec["script_parameters"].items(): + for k,v in viewitems(job_spec["script_parameters"]): if v is False or v is None or isinstance(v, dict): job_spec["script_parameters"][k] = {"value": v} @@ -387,19 +395,21 @@ class RunnerTemplate(object): } def __init__(self, runner, tool, job_order, enable_reuse, uuid, - submit_runner_ram=0, name=None, merged_map=None): + submit_runner_ram=0, name=None, merged_map=None, + loadingContext=None): self.runner = runner - self.tool = tool + self.embedded_tool = tool self.job = RunnerJob( runner=runner, tool=tool, - job_order=job_order, enable_reuse=enable_reuse, output_name=None, output_tags=None, submit_runner_ram=submit_runner_ram, name=name, - merged_map=merged_map) + merged_map=merged_map, + loadingContext=loadingContext) + self.job.job_order = job_order self.uuid = uuid def pipeline_component_spec(self): @@ -421,7 +431,7 @@ class RunnerTemplate(object): job_params = spec['script_parameters'] spec['script_parameters'] = {} - for param in self.tool.tool['inputs']: + for param in self.embedded_tool.tool['inputs']: param = copy.deepcopy(param) # Data type and "required" flag...