import re
from cStringIO import StringIO
+from schema_salad.sourceline import SourceLine
+
import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
from ._version import __version__
+from . import done
logger = logging.getLogger('arvados.cwl-runner')
if docker_req:
if docker_req.get("dockerOutputDirectory"):
# TODO: 'dockerOutputDirectory' can be supported by the containers API, but not by the jobs API.
- raise UnsupportedRequirement("Option 'dockerOutputDirectory' of DockerRequirement not supported.")
+ raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
+ "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
class Runner(object):
# Constructor for Runner: records the runner handle, the tool, the job order
# and the output name/tags on the instance.
# NOTE(review): this excerpt is in patch form with indentation stripped; a
# leading "-" marks a line being removed and a leading "+" a line being added.
# The change shown here appends an optional on_error keyword (default None)
# after name and stores it on the instance as self.on_error.
# NOTE(review): enable_reuse is accepted, but no assignment for it is visible
# in this excerpt -- presumably it is stored in elided lines; confirm.
def __init__(self, runner, tool, job_order, enable_reuse,
output_name, output_tags, submit_runner_ram=0,
- name=None):
+ name=None, on_error=None):
self.arvrunner = runner
self.tool = tool
self.job_order = job_order
self.output_name = output_name
self.output_tags = output_tags
self.name = name
+ self.on_error = on_error
# Only a truthy submit_runner_ram is stored by the lines visible here; what
# happens for the default of 0 is not shown in this excerpt -- TODO confirm
# against the full file.
if submit_runner_ram:
self.submit_runner_ram = submit_runner_ram
return workflowmapper
def done(self, record):
- if record["state"] == "Complete":
- if record.get("exit_code") is not None:
- if record["exit_code"] == 33:
- processStatus = "UnsupportedRequirement"
- elif record["exit_code"] == 0:
- processStatus = "success"
+ try:
+ if record["state"] == "Complete":
+ if record.get("exit_code") is not None:
+ if record["exit_code"] == 33:
+ processStatus = "UnsupportedRequirement"
+ elif record["exit_code"] == 0:
+ processStatus = "success"
+ else:
+ processStatus = "permanentFail"
else:
- processStatus = "permanentFail"
+ processStatus = "success"
else:
- processStatus = "success"
- else:
- processStatus = "permanentFail"
+ processStatus = "permanentFail"
- outputs = {}
- try:
- try:
- self.final_output = record["output"]
- outc = arvados.collection.CollectionReader(self.final_output,
+ outputs = {}
+
+ if processStatus == "permanentFail":
+ logc = arvados.collection.CollectionReader(record["log"],
api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries)
- if "cwl.output.json" in outc:
- with outc.open("cwl.output.json") as f:
- if f.size() > 0:
- outputs = json.load(f)
- def keepify(fileobj):
- path = fileobj["location"]
- if not path.startswith("keep:"):
- fileobj["location"] = "keep:%s/%s" % (record["output"], path)
- adjustFileObjs(outputs, keepify)
- adjustDirObjs(outputs, keepify)
- except Exception as e:
- logger.exception("While getting final output object: %s", e)
+ done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)
+
+ self.final_output = record["output"]
+ outc = arvados.collection.CollectionReader(self.final_output,
+ api_client=self.arvrunner.api,
+ keep_client=self.arvrunner.keep_client,
+ num_retries=self.arvrunner.num_retries)
+ if "cwl.output.json" in outc:
+ with outc.open("cwl.output.json") as f:
+ if f.size() > 0:
+ outputs = json.load(f)
+ def keepify(fileobj):
+ path = fileobj["location"]
+ if not path.startswith("keep:"):
+ fileobj["location"] = "keep:%s/%s" % (record["output"], path)
+ adjustFileObjs(outputs, keepify)
+ adjustDirObjs(outputs, keepify)
+ except Exception as e:
+ logger.exception("[%s] While getting final output object: %s", self.name, e)
+ self.arvrunner.output_callback({}, "permanentFail")
+ else:
self.arvrunner.output_callback(outputs, processStatus)
finally:
if record["uuid"] in self.arvrunner.processes: