From 001e1ecfddb4f1d0cc26cdb415395ff47cde4914 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 27 Oct 2015 16:26:10 -0400 Subject: [PATCH] 7593: References to files in keep must have keep: URI scheme. Improve error handling. Support configuring which git repo has crunchrunner. --- sdk/cwl/arvados_cwl/__init__.py | 41 +++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index d1e53faeb9..0fdf75340f 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -73,7 +73,7 @@ class CollectionFsAccess(cwltool.process.StdFsAccess): def glob(self, pattern): collection, rest = self.get_collection(pattern) patternsegments = rest.split("/") - return self._match(collection, patternsegments, collection.manifest_locator()) + return self._match(collection, patternsegments, "keep:" + collection.manifest_locator()) def open(self, fn, mode): collection, rest = self.get_collection(fn) @@ -128,20 +128,25 @@ class ArvadosJob(object): if docker_req and kwargs.get("use_container") is not False: runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image) - response = self.arvrunner.api.jobs().create(body={ - "script": "crunchrunner", - "repository": "peteramstutz/cr", - "script_version": "master", - "script_parameters": {"tasks": [script_parameters]}, - "runtime_constraints": runtime_constraints - }, find_or_create=kwargs.get("enable_reuse", True)).execute() + try: + response = self.arvrunner.api.jobs().create(body={ + "script": "crunchrunner", + "repository": kwargs["repository"], + "script_version": "master", + "script_parameters": {"tasks": [script_parameters]}, + "runtime_constraints": runtime_constraints + }, find_or_create=kwargs.get("enable_reuse", True)).execute() + + self.arvrunner.jobs[response["uuid"]] = self - self.arvrunner.jobs[response["uuid"]] = self + logger.info("Job %s is %s", response["uuid"], response["state"]) - logger.info("Job %s is %s", response["uuid"], response["state"]) + if response["state"] in ("Complete", "Failed", "Cancelled"): + self.done(response) + except Exception as e: + logger.error("Got error %s" % str(e)) + self.output_callback({}, "permanentFail") - if response["state"] in ("Complete", "Failed", "Cancelled"): - self.done(response) def done(self, record): try: @@ -161,19 +166,18 @@ class ArvadosJob(object): finally: del self.arvrunner.jobs[record["uuid"]] + class ArvPathMapper(cwltool.pathmapper.PathMapper): def __init__(self, arvrunner, referenced_files, basedir, **kwargs): self._pathmap = copy.copy(arvrunner.uploaded) uploadfiles = [] - pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+/.+') + pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+') for src in referenced_files: if isinstance(src, basestring) and pdh_path.match(src): - self._pathmap[src] = (src, "$(task.keep)/%s" % src) - if src in self._pathmap: - pass - else: + self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:]) + if src not in self._pathmap: ab = cwltool.pathmapper.abspath(src, basedir) st = arvados.commands.run.statfile("", ab) if kwargs.get("conformance_test"): @@ -255,6 +259,7 @@ class ArvCwlRunner(object): kwargs["fs_access"] = self.fs_access kwargs["enable_reuse"] = args.enable_reuse + kwargs["repository"] = args.repository if kwargs.get("conformance_test"): return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs) @@ -306,4 +311,6 @@ def main(args, stdout, stderr, api_client=None): default=False, dest="enable_reuse", help="") + parser.add_argument('--repository', type=str, default="crunchrunner", help="Repository containing the 'crunchrunner' program.") + return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser) -- 2.30.2