X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/988c59f51aa579ce8bf0eab1cc729e05a5ee5631..90d1b5a78f311bf9b85f992e59e783871c64ffeb:/sdk/cwl/arvados_cwl/crunch_script.py?ds=sidebyside diff --git a/sdk/cwl/arvados_cwl/crunch_script.py b/sdk/cwl/arvados_cwl/crunch_script.py index 5024e95f77..7512d5bef2 100644 --- a/sdk/cwl/arvados_cwl/crunch_script.py +++ b/sdk/cwl/arvados_cwl/crunch_script.py @@ -27,6 +27,7 @@ from cwltool.process import shortname from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, normalizeFilesDirs from cwltool.load_tool import load_tool from cwltool.errors import WorkflowException +from arvados_cwl.context import ArvRuntimeContext from .fsaccess import CollectionFetcher, CollectionFsAccess @@ -97,25 +98,27 @@ def run(): debug = job_order_object["arv:debug"] del job_order_object["arv:debug"] - runner = arvados_cwl.ArvCwlRunner(api_client=arvados.safeapi.ThreadSafeApiCache( + arvargs = argparse.Namespace() + arvargs.work_api = "jobs" + arvargs.output_name = output_name + arvargs.output_tags = output_tags + arvargs.thread_count = 1 + + runner = arvados_cwl.ArvCwlExecutor(api_client=arvados.safeapi.ThreadSafeApiCache( api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4}), - output_name=output_name, output_tags=output_tags) + arvargs=arvargs) make_fs_access = functools.partial(CollectionFsAccess, collection_cache=runner.collection_cache) - t = load_tool(toolpath, runner.arv_make_tool, - fetcher_constructor=functools.partial(CollectionFetcher, - api_client=runner.api, - fs_access=make_fs_access(""), - num_retries=runner.num_retries)) + t = load_tool(toolpath, runner.loadingContext) if debug: logger.setLevel(logging.DEBUG) logging.getLogger('arvados').setLevel(logging.DEBUG) logging.getLogger("cwltool").setLevel(logging.DEBUG) - args = argparse.Namespace() + args = ArvRuntimeContext(vars(arvargs)) args.project_uuid = arvados.current_job()["owner_uuid"] args.enable_reuse = enable_reuse args.on_error = on_error @@ -134,7 +137,7 @@ def run(): args.disable_js_validation = False args.tmp_outdir_prefix = "tmp" - runner.arv_executor(t, job_order_object, **vars(args)) + runner.arv_executor(t, job_order_object, args, logger=logger) except Exception as e: if isinstance(e, WorkflowException): logging.info("Workflow error %s", e)