X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0066dc77abc461090fe98bcee7c6e324a5ca43a1..5624fec61db977d386ce03ca333241c74ca251b5:/sdk/cwl/arvados_cwl/crunch_script.py diff --git a/sdk/cwl/arvados_cwl/crunch_script.py b/sdk/cwl/arvados_cwl/crunch_script.py index 9b0680bc83..f33619391d 100644 --- a/sdk/cwl/arvados_cwl/crunch_script.py +++ b/sdk/cwl/arvados_cwl/crunch_script.py @@ -23,9 +23,14 @@ from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing from cwltool.load_tool import load_tool from cwltool.errors import WorkflowException +from .fsaccess import CollectionFetcher + logger = logging.getLogger('arvados.cwl-runner') def run(): + # Timestamps are added by crunch-job, so don't print redundant timestamps. + arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s')) + # Print package versions logger.info(arvados_cwl.versionstring()) @@ -36,6 +41,7 @@ def run(): runner = None try: job_order_object = arvados.current_job()['script_parameters'] + toolpath = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object.pop("cwl:tool")) pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$') @@ -48,8 +54,6 @@ def run(): def keeppathObj(v): v["location"] = keeppath(v["location"]) - job_order_object["cwl:tool"] = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object["cwl:tool"]) - for k,v in job_order_object.items(): if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v): job_order_object[k] = { @@ -63,25 +67,45 @@ def run(): adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api))) output_name = None + output_tags = None + enable_reuse = True + on_error = "continue" if "arv:output_name" in job_order_object: output_name = job_order_object["arv:output_name"] del job_order_object["arv:output_name"] + if "arv:output_tags" in job_order_object: + output_tags = job_order_object["arv:output_tags"] + del job_order_object["arv:output_tags"] + + if "arv:enable_reuse" in job_order_object: + enable_reuse = job_order_object["arv:enable_reuse"] + del job_order_object["arv:enable_reuse"] + + if "arv:on_error" in job_order_object: + on_error = job_order_object["arv:on_error"] + del job_order_object["arv:on_error"] + runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()), - output_name=output_name) + output_name=output_name, output_tags=output_tags) - t = load_tool(job_order_object, runner.arv_make_tool) + t = load_tool(toolpath, runner.arv_make_tool, + fetcher_constructor=functools.partial(CollectionFetcher, + api_client=api, + keep_client=arvados.keep.KeepClient(api_client=api, num_retries=4))) args = argparse.Namespace() args.project_uuid = arvados.current_job()["owner_uuid"] - args.enable_reuse = True + args.enable_reuse = enable_reuse + args.on_error = on_error args.submit = False - args.debug = True + args.debug = False args.quiet = False args.ignore_docker_for_reuse = False args.basedir = os.getcwd() + args.name = None args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]} - outputObj = runner.arv_executor(t, job_order_object, **vars(args)) + runner.arv_executor(t, job_order_object, **vars(args)) except Exception as e: if isinstance(e, WorkflowException): logging.info("Workflow error %s", e)