From: Peter Amstutz Date: Wed, 15 Jun 2016 15:23:47 +0000 (-0400) Subject: 8442: Tweak internal handling of keep: paths, examine exit codes to determine X-Git-Tag: 1.1.0~869^2~10 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/032239408242c641b08df74f6a91984cbab610cd 8442: Tweak internal handling of keep: paths, examine exit codes to determine success/fail. --- diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index ba26816c21..050b7b9a7e 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -31,7 +31,7 @@ class ArvCwlRunner(object): """Execute a CWL tool or workflow, submit crunch jobs, wait for them to complete, and report output.""" - def __init__(self, api_client, crunch2): + def __init__(self, api_client, crunch2=False): self.api = api_client self.jobs = {} self.lock = threading.Lock() @@ -54,12 +54,12 @@ class ArvCwlRunner(object): if self.pipeline: self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], body={"state": "Complete"}).execute(num_retries=self.num_retries) - else: logger.warn("Overall job status is %s", processStatus) if self.pipeline: self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], body={"state": "Failed"}).execute(num_retries=self.num_retries) + self.final_status = processStatus self.final_output = out def on_message(self, event): @@ -191,6 +191,9 @@ class ArvCwlRunner(object): finally: self.cond.release() + if self.final_status == "UnsupportedRequirement": + raise UnsupportedRequirement("Check log for details.") + if self.final_output is None: raise cwltool.workflow.WorkflowException("Workflow did not return a result.") diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index 414ce63667..8b5ac5ac78 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -111,7 +111,17 @@ class ArvadosContainer(object): def done(self, record): try: if record["state"] == "Complete": - processStatus = "success" + rcode = record["exit_code"] + if self.successCodes and rcode in self.successCodes: + processStatus = "success" + elif self.temporaryFailCodes and rcode in self.temporaryFailCodes: + processStatus = "temporaryFail" + elif self.permanentFailCodes and rcode in self.permanentFailCodes: + processStatus = "permanentFail" + elif rcode == 0: + processStatus = "success" + else: + processStatus = "permanentFail" else: processStatus = "permanentFail" @@ -152,7 +162,7 @@ class RunnerContainer(Runner): workflowname = os.path.basename(self.tool.tool["id"]) workflowpath = "/var/lib/cwl/workflow/%s" % workflowname workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1] - workflowcollection = workflowcollection[:workflowcollection.index('/')] + workflowcollection = workflowcollection[5:workflowcollection.index('/')] jobpath = "/var/lib/cwl/job/cwl.input.json" container_image = arv_docker_get_image(self.arvrunner.api, @@ -195,6 +205,7 @@ class RunnerContainer(Runner): } def run(self, *args, **kwargs): + kwargs["keepprefix"] = "keep:" job_spec = self.arvados_job_spec(*args, **kwargs) job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid) diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index f7543482c7..002c0ca0d0 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -6,7 +6,7 @@ import json from cwltool.draft2tool import CommandLineTool import cwltool.workflow -from cwltool.process import get_feature, scandeps, adjustFiles +from cwltool.process import get_feature, scandeps, adjustFiles, UnsupportedRequirement from cwltool.load_tool import fetch_document import arvados.collection @@ -61,15 +61,16 @@ class Runner(object): adjustFiles(sc, partial(visitFiles, workflowfiles)) adjustFiles(self.job_order, partial(visitFiles, jobfiles)) + keepprefix = kwargs.get("keepprefix", "") workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "", - "%s", - "%s/%s", + keepprefix+"%s", + keepprefix+"%s/%s", name=self.name, **kwargs) jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "", - "%s", - "%s/%s", + keepprefix+"%s", + keepprefix+"%s/%s", name=os.path.basename(self.job_order.get("id", "#")), **kwargs) @@ -83,7 +84,15 @@ class Runner(object): def done(self, record): if record["state"] == "Complete": - processStatus = "success" + if record.get("exit_code") is not None: + if record["exit_code"] == 33: + processStatus = "UnsupportedRequirement" + elif record["exit_code"] == 0: + processStatus = "success" + else: + processStatus = "permanentFail" + else: + processStatus = "success" else: processStatus = "permanentFail" @@ -96,6 +105,8 @@ class Runner(object): def keepify(path): if not path.startswith("keep:"): return "keep:%s/%s" % (record["output"], path) + else: + return path adjustFiles(outputs, keepify) except Exception as e: logger.error("While getting final output object: %s", e)