success/fail.
"""Execute a CWL tool or workflow, submit crunch jobs, wait for them to
complete, and report output."""
- def __init__(self, api_client, crunch2):
+ def __init__(self, api_client, crunch2=False):
self.api = api_client
self.jobs = {}
self.lock = threading.Lock()
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Complete"}).execute(num_retries=self.num_retries)
-
else:
logger.warn("Overall job status is %s", processStatus)
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
+ self.final_status = processStatus
self.final_output = out
def on_message(self, event):
finally:
self.cond.release()
+ if self.final_status == "UnsupportedRequirement":
+ raise UnsupportedRequirement("Check log for details.")
+
if self.final_output is None:
raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
def done(self, record):
try:
if record["state"] == "Complete":
- processStatus = "success"
+ rcode = record["exit_code"]
+ if self.successCodes and rcode in self.successCodes:
+ processStatus = "success"
+ elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
+ processStatus = "temporaryFail"
+ elif self.permanentFailCodes and rcode in self.permanentFailCodes:
+ processStatus = "permanentFail"
+ elif rcode == 0:
+ processStatus = "success"
+ else:
+ processStatus = "permanentFail"
else:
processStatus = "permanentFail"
workflowname = os.path.basename(self.tool.tool["id"])
workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
- workflowcollection = workflowcollection[:workflowcollection.index('/')]
+ workflowcollection = workflowcollection[5:workflowcollection.index('/')]
jobpath = "/var/lib/cwl/job/cwl.input.json"
container_image = arv_docker_get_image(self.arvrunner.api,
}
def run(self, *args, **kwargs):
+ kwargs["keepprefix"] = "keep:"
job_spec = self.arvados_job_spec(*args, **kwargs)
job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import get_feature, scandeps, adjustFiles
+from cwltool.process import get_feature, scandeps, adjustFiles, UnsupportedRequirement
from cwltool.load_tool import fetch_document
import arvados.collection
adjustFiles(sc, partial(visitFiles, workflowfiles))
adjustFiles(self.job_order, partial(visitFiles, jobfiles))
+ keepprefix = kwargs.get("keepprefix", "")
workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
- "%s",
- "%s/%s",
+ keepprefix+"%s",
+ keepprefix+"%s/%s",
name=self.name,
**kwargs)
jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
- "%s",
- "%s/%s",
+ keepprefix+"%s",
+ keepprefix+"%s/%s",
name=os.path.basename(self.job_order.get("id", "#")),
**kwargs)
def done(self, record):
if record["state"] == "Complete":
- processStatus = "success"
+ if record.get("exit_code") is not None:
+ if record["exit_code"] == 33:
+ processStatus = "UnsupportedRequirement"
+ elif record["exit_code"] == 0:
+ processStatus = "success"
+ else:
+ processStatus = "permanentFail"
+ else:
+ processStatus = "success"
else:
processStatus = "permanentFail"
def keepify(path):
if not path.startswith("keep:"):
return "keep:%s/%s" % (record["output"], path)
+ else:
+ return path
adjustFiles(outputs, keepify)
except Exception as e:
logger.error("While getting final output object: %s", e)