X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2001423a6eb7937a689414f3fa62be5b124812c1..b9b4502bcddeccd794614bf6979d643f9f350877:/sdk/cwl/arvados_cwl/__init__.py diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 2a82d56fb6..46436b54dc 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -32,7 +32,7 @@ from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies from .arvtool import ArvadosCommandTool from .arvworkflow import ArvadosWorkflow, upload_workflow -from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver +from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache from .perf import Perf from .pathmapper import NoFollowPathMapper from ._version import __version__ @@ -80,6 +80,8 @@ class ArvCwlRunner(object): else: self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries) + self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries) + self.work_api = None expected_api = ["jobs", "containers"] for api in expected_api: @@ -102,7 +104,8 @@ class ArvCwlRunner(object): kwargs["work_api"] = self.work_api kwargs["fetcher_constructor"] = partial(CollectionFetcher, api_client=self.api, - keep_client=self.keep_client) + fs_access=CollectionFsAccess("", collection_cache=self.collection_cache), + num_retries=self.num_retries) if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool": return ArvadosCommandTool(self, toolpath_object, **kwargs) elif "class" in toolpath_object and toolpath_object["class"] == "Workflow": @@ -231,7 +234,6 @@ class ArvCwlRunner(object): keep_client=self.keep_client, num_retries=self.num_retries) - srccollections = {} for k,v in generatemapper.items(): if k.startswith("_:"): if v.type == "Directory": @@ -245,20 +247,13 @@ class ArvCwlRunner(object): raise Exception("Output source is not in keep or a literal") sp = k.split("/") srccollection = sp[0][5:] - if srccollection not in srccollections: - try: - srccollections[srccollection] = arvados.collection.CollectionReader( - srccollection, - api_client=self.api, - keep_client=self.keep_client, - num_retries=self.num_retries) - except arvados.errors.ArgumentError as e: - logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e) - raise - reader = srccollections[srccollection] try: + reader = self.collection_cache.get(srccollection) srcpath = "/".join(sp[1:]) if len(sp) > 1 else "." final.copy(srcpath, v.target, source_collection=reader, overwrite=False) + except arvados.errors.ArgumentError as e: + logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e) + raise except IOError as e: logger.warn("While preparing output collection: %s", e) @@ -331,8 +326,7 @@ class ArvCwlRunner(object): self.project_uuid = kwargs.get("project_uuid") self.pipeline = None make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, - api_client=self.api, - keep_client=self.keep_client) + collection_cache=self.collection_cache) self.fs_access = make_fs_access(kwargs["basedir"]) if not kwargs.get("name"): @@ -545,7 +539,12 @@ def arg_parser(): # type: () -> argparse.ArgumentParser help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.", type=float, default=20) - parser.add_argument("--version", action="store_true", help="Print version and exit") + + exgroup = parser.add_mutually_exclusive_group() + exgroup.add_argument("--print-dot", action="store_true", + help="Print workflow visualization in graphviz format and exit") + exgroup.add_argument("--version", action="store_true", help="Print version and exit") + exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.") exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--verbose", action="store_true", help="Default logging") @@ -617,6 +616,10 @@ def arg_parser(): # type: () -> argparse.ArgumentParser help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. " "Default is 'continue'.", default="continue", choices=("stop", "continue")) + parser.add_argument("--enable-dev", action="store_true", + help="Enable loading and running development versions " + "of CWL spec.", default=False) + parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute") parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.") @@ -637,7 +640,6 @@ def add_arv_hints(): "http://commonwl.org/cwltool#LoadListingRequirement" ]) - def main(args, stdout, stderr, api_client=None, keep_client=None): parser = arg_parser() @@ -703,6 +705,9 @@ def main(args, stdout, stderr, api_client=None, keep_client=None): arvargs.relax_path_checks = True arvargs.validate = None + make_fs_access = partial(CollectionFsAccess, + collection_cache=runner.collection_cache) + return cwltool.main.main(args=arvargs, stdout=stdout, stderr=stderr, @@ -710,12 +715,10 @@ def main(args, stdout, stderr, api_client=None, keep_client=None): makeTool=runner.arv_make_tool, versionfunc=versionstring, job_order_object=job_order_object, - make_fs_access=partial(CollectionFsAccess, - api_client=api_client, - keep_client=keep_client), + make_fs_access=make_fs_access, fetcher_constructor=partial(CollectionFetcher, api_client=api_client, - keep_client=keep_client, + fs_access=make_fs_access(""), num_retries=runner.num_retries), resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries), logger_handler=arvados.log_handler,