X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5f8e65c488277b3f259f791d1cd17361bd67009d..d3a82af33386c99c0fd3d6471df7e6696560089c:/sdk/cwl/arvados_cwl/crunch_script.py diff --git a/sdk/cwl/arvados_cwl/crunch_script.py b/sdk/cwl/arvados_cwl/crunch_script.py index 71c65bca88..8073620be1 100644 --- a/sdk/cwl/arvados_cwl/crunch_script.py +++ b/sdk/cwl/arvados_cwl/crunch_script.py @@ -19,10 +19,12 @@ import re import functools from arvados.api import OrderedJsonModel -from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing, normalizeFilesDirs +from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, normalizeFilesDirs from cwltool.load_tool import load_tool from cwltool.errors import WorkflowException +from .fsaccess import CollectionFetcher + logger = logging.getLogger('arvados.cwl-runner') def run(): @@ -62,11 +64,11 @@ def run(): adjustFileObjs(job_order_object, keeppathObj) adjustDirObjs(job_order_object, keeppathObj) normalizeFilesDirs(job_order_object) - adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api))) output_name = None output_tags = None enable_reuse = True + on_error = "continue" if "arv:output_name" in job_order_object: output_name = job_order_object["arv:output_name"] del job_order_object["arv:output_name"] @@ -86,7 +88,10 @@ def run(): runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()), output_name=output_name, output_tags=output_tags) - t = load_tool(toolpath, runner.arv_make_tool) + t = load_tool(toolpath, runner.arv_make_tool, + fetcher_constructor=functools.partial(CollectionFetcher, + api_client=api, + keep_client=arvados.keep.KeepClient(api_client=api, num_retries=4))) args = argparse.Namespace() args.project_uuid = arvados.current_job()["owner_uuid"] @@ -99,7 +104,7 @@ def run(): args.basedir = os.getcwd() args.name = None args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]} - outputObj = runner.arv_executor(t, job_order_object, **vars(args)) + runner.arv_executor(t, job_order_object, **vars(args)) except Exception as e: if isinstance(e, WorkflowException): logging.info("Workflow error %s", e)