X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0e5198142fdba0ce2af6eb2852d45dff46ffb2e2..e336e28f4ce9c13aad98d059d2befb505bff365b:/sdk/cwl/arvados_cwl/crunch_script.py diff --git a/sdk/cwl/arvados_cwl/crunch_script.py b/sdk/cwl/arvados_cwl/crunch_script.py index e5c6d67ac9..849b177aeb 100644 --- a/sdk/cwl/arvados_cwl/crunch_script.py +++ b/sdk/cwl/arvados_cwl/crunch_script.py @@ -21,6 +21,7 @@ import functools from arvados.api import OrderedJsonModel from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing, normalizeFilesDirs from cwltool.load_tool import load_tool +from cwltool.errors import WorkflowException logger = logging.getLogger('arvados.cwl-runner') @@ -32,6 +33,7 @@ def run(): arvados_cwl.add_arv_hints() + runner = None try: job_order_object = arvados.current_job()['script_parameters'] @@ -61,18 +63,28 @@ def run(): adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api))) output_name = None + output_tags = None + enable_reuse = True if "arv:output_name" in job_order_object: output_name = job_order_object["arv:output_name"] del job_order_object["arv:output_name"] + if "arv:output_tags" in job_order_object: + output_tags = job_order_object["arv:output_tags"] + del job_order_object["arv:output_tags"] + + if "arv:enable_reuse" in job_order_object: + enable_reuse = job_order_object["arv:enable_reuse"] + del job_order_object["arv:enable_reuse"] + runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()), - output_name=output_name) + output_name=output_name, output_tags=output_tags) t = load_tool(job_order_object, runner.arv_make_tool) args = argparse.Namespace() args.project_uuid = arvados.current_job()["owner_uuid"] - args.enable_reuse = True + args.enable_reuse = enable_reuse args.submit = False args.debug = True args.quiet = False @@ -80,23 +92,18 @@ def run(): args.basedir = os.getcwd() args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]} outputObj = runner.arv_executor(t, job_order_object, **vars(args)) - - if runner.final_output_collection: + except Exception as e: + if isinstance(e, WorkflowException): + logging.info("Workflow error %s", e) + else: + logging.exception("Unhandled exception") + if runner and runner.final_output_collection: outputCollection = runner.final_output_collection.portable_data_hash() else: outputCollection = None - api.job_tasks().update(uuid=arvados.current_task()['uuid'], body={ 'output': outputCollection, - 'success': True, - 'progress':1.0 - }).execute() - except Exception as e: - logging.exception("Unhandled exception") - api.job_tasks().update(uuid=arvados.current_task()['uuid'], - body={ - 'output': None, 'success': False, 'progress':1.0 }).execute()