X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e9153135c39388bf403ea94896f935ce80309b01..cdad40f38151c5d678f13787853f361566fbcd43:/sdk/cwl/arvados_cwl/__init__.py diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index e2e9270c10..4198c34482 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -14,6 +14,7 @@ import fnmatch import logging import re import os + from cwltool.process import get_feature logger = logging.getLogger('arvados.cwl-runner') @@ -40,6 +41,7 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image): return dockerRequirement["dockerImageId"] + class CollectionFsAccess(cwltool.process.StdFsAccess): def __init__(self, basedir): self.collections = {} @@ -47,17 +49,29 @@ class CollectionFsAccess(cwltool.process.StdFsAccess): def get_collection(self, path): p = path.split("/") - if arvados.util.keep_locator_pattern.match(p[0]): - if p[0] not in self.collections: - self.collections[p[0]] = arvados.collection.CollectionReader(p[0]) - return (self.collections[p[0]], "/".join(p[1:])) + if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]): + pdh = p[0][5:] + if pdh not in self.collections: + self.collections[pdh] = arvados.collection.CollectionReader(pdh) + return (self.collections[pdh], "/".join(p[1:])) else: return (None, path) def _match(self, collection, patternsegments, parent): + if not patternsegments: + return [] + + if not isinstance(collection, arvados.collection.RichCollectionBase): + return [] + ret = [] + # iterate over the files and subcollections in 'collection' for filename in collection: - if fnmatch.fnmatch(filename, patternsegments[0]): + if patternsegments[0] == '.': + # Pattern contains something like "./foo" so just shift + # past the "./" + ret.extend(self._match(collection, patternsegments[1:], parent)) + elif fnmatch.fnmatch(filename, patternsegments[0]): cur = os.path.join(parent, filename) if len(patternsegments) == 1: ret.append(cur) @@ -68,7 +82,7 @@ class CollectionFsAccess(cwltool.process.StdFsAccess): def glob(self, pattern): collection, rest = self.get_collection(pattern) patternsegments = rest.split("/") - return self._match(collection, patternsegments, collection.manifest_locator()) + return self._match(collection, patternsegments, "keep:" + collection.manifest_locator()) def open(self, fn, mode): collection, rest = self.get_collection(fn) @@ -100,7 +114,7 @@ class ArvadosJob(object): script_parameters["task.vwd"] = {} for t in self.generatefiles: if isinstance(self.generatefiles[t], dict): - src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"][13:]) + src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:")) vwd.copy(rest, t, source_collection=src) else: with vwd.open(t, "w") as f: @@ -123,20 +137,25 @@ class ArvadosJob(object): if docker_req and kwargs.get("use_container") is not False: runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image) - response = self.arvrunner.api.jobs().create(body={ - "script": "crunchrunner", - "repository": "peteramstutz/cr", - "script_version": "master", - "script_parameters": {"tasks": [script_parameters]}, - "runtime_constraints": runtime_constraints - }, find_or_create=kwargs.get("enable_reuse", True)).execute() + try: + response = self.arvrunner.api.jobs().create(body={ + "script": "crunchrunner", + "repository": kwargs["repository"], + "script_version": "master", + "script_parameters": {"tasks": [script_parameters]}, + "runtime_constraints": runtime_constraints + }, find_or_create=kwargs.get("enable_reuse", True)).execute() + + self.arvrunner.jobs[response["uuid"]] = self - self.arvrunner.jobs[response["uuid"]] = self + logger.info("Job %s is %s", response["uuid"], response["state"]) - logger.info("Job %s is %s", response["uuid"], response["state"]) + if response["state"] in ("Complete", "Failed", "Cancelled"): + self.done(response) + except Exception as e: + logger.error("Got error %s" % str(e)) + self.output_callback({}, "permanentFail") - if response["state"] in ("Complete", "Failed", "Cancelled"): - self.done(response) def done(self, record): try: @@ -147,26 +166,27 @@ class ArvadosJob(object): try: outputs = {} - outputs = self.collect_outputs(record["output"]) + outputs = self.collect_outputs("keep:" + record["output"]) except Exception as e: - logger.warn(str(e)) + logger.exception("Got exception while collecting job outputs:") processStatus = "permanentFail" self.output_callback(outputs, processStatus) finally: del self.arvrunner.jobs[record["uuid"]] + class ArvPathMapper(cwltool.pathmapper.PathMapper): def __init__(self, arvrunner, referenced_files, basedir, **kwargs): - self._pathmap = {} + self._pathmap = arvrunner.get_uploaded() uploadfiles = [] - pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+/.+') + pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+') for src in referenced_files: if isinstance(src, basestring) and pdh_path.match(src): - self._pathmap[src] = (src, "$(task.keep)/%s" % src) - else: + self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:]) + if src not in self._pathmap: ab = cwltool.pathmapper.abspath(src, basedir) st = arvados.commands.run.statfile("", ab) if kwargs.get("conformance_test"): @@ -186,6 +206,7 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper): fnPattern="$(task.keep)/%s/%s") for src, ab, st in uploadfiles: + arvrunner.add_uploaded(src, (ab, st.fn)) self._pathmap[src] = (ab, st.fn) @@ -209,6 +230,7 @@ class ArvCwlRunner(object): self.lock = threading.Lock() self.cond = threading.Condition(self.lock) self.final_output = None + self.uploaded = {} def arvMakeTool(self, toolpath_object, **kwargs): if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool": @@ -239,6 +261,12 @@ class ArvCwlRunner(object): finally: self.cond.release() + def get_uploaded(self): + return self.uploaded.copy() + + def add_uploaded(self, src, pair): + self.uploaded[src] = pair + def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs): events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message) @@ -246,6 +274,7 @@ class ArvCwlRunner(object): kwargs["fs_access"] = self.fs_access kwargs["enable_reuse"] = args.enable_reuse + kwargs["repository"] = args.repository if kwargs.get("conformance_test"): return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs) @@ -297,4 +326,6 @@ def main(args, stdout, stderr, api_client=None): default=False, dest="enable_reuse", help="") + parser.add_argument('--repository', type=str, default="peter/crunchrunner", help="Repository containing the 'crunchrunner' program.") + return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)