1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 # Crunch script integration for running arvados-cwl-runner (importing
6 # arvados_cwl module) inside a crunch job.
8 # This gets the job record, transforms the script parameters into a valid CWL
9 # input object, then executes the CWL runner to run the underlying workflow or
10 tool. When the workflow completes, it records the output object in the output
11 collection for this runner job.
13 from past.builtins import basestring
14 from future.utils import viewitems
18 import arvados.collection
28 from arvados.api import OrderedJsonModel
29 from cwltool.process import shortname
30 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, normalizeFilesDirs
31 from cwltool.load_tool import load_tool
32 from cwltool.errors import WorkflowException
33 from arvados_cwl.context import ArvRuntimeContext
35 from .fsaccess import CollectionFetcher, CollectionFsAccess
37 logger = logging.getLogger('arvados.cwl-runner')  # logger for this script, also passed to arv_executor; NOTE(review): the except handler logs via root `logging.info`/`logging.exception`, not this logger -- confirm intended
40 # Timestamps are added by crunch-job, so don't print redundant timestamps.
41 arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
43 # Print package versions
44 logger.info(arvados_cwl.versionstring())
46 api = arvados.api("v1")
48 arvados_cwl.add_arv_hints()
52 job_order_object = arvados.current_job()['script_parameters']
53 toolpath = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object.pop("cwl:tool"))
55 pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')
65 v["location"] = keeppath(v["location"])
67 for k,v in viewitems(job_order_object):
68 if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
69 job_order_object[k] = {
71 "location": "keep:%s" % v
74 adjustFileObjs(job_order_object, keeppathObj)
75 adjustDirObjs(job_order_object, keeppathObj)
76 normalizeFilesDirs(job_order_object)
84 if "arv:output_name" in job_order_object:
85 output_name = job_order_object["arv:output_name"]
86 del job_order_object["arv:output_name"]
88 if "arv:output_tags" in job_order_object:
89 output_tags = job_order_object["arv:output_tags"]
90 del job_order_object["arv:output_tags"]
92 if "arv:enable_reuse" in job_order_object:
93 enable_reuse = job_order_object["arv:enable_reuse"]
94 del job_order_object["arv:enable_reuse"]
96 if "arv:on_error" in job_order_object:
97 on_error = job_order_object["arv:on_error"]
98 del job_order_object["arv:on_error"]
100 if "arv:debug" in job_order_object:
101 debug = job_order_object["arv:debug"]
102 del job_order_object["arv:debug"]
104 arvargs = argparse.Namespace()
105 arvargs.work_api = "jobs"
106 arvargs.output_name = output_name
107 arvargs.output_tags = output_tags
108 arvargs.thread_count = 1
109 arvargs.collection_cache_size = None
111 runner = arvados_cwl.ArvCwlExecutor(api_client=arvados.safeapi.ThreadSafeApiCache(
112 api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4}),
115 make_fs_access = functools.partial(CollectionFsAccess,
116 collection_cache=runner.collection_cache)
118 t = load_tool(toolpath, runner.loadingContext)
121 logger.setLevel(logging.DEBUG)
122 logging.getLogger('arvados').setLevel(logging.DEBUG)
123 logging.getLogger("cwltool").setLevel(logging.DEBUG)
125 args = ArvRuntimeContext(vars(arvargs))
126 args.project_uuid = arvados.current_job()["owner_uuid"]
127 args.enable_reuse = enable_reuse
128 args.on_error = on_error
132 args.ignore_docker_for_reuse = False
133 args.basedir = os.getcwd()
135 args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
136 args.make_fs_access = make_fs_access
137 args.trash_intermediate = False
138 args.intermediate_output_ttl = 0
139 args.priority = arvados_cwl.DEFAULT_PRIORITY
140 args.do_validate = True
141 args.disable_js_validation = False
142 args.tmp_outdir_prefix = "tmp"
144 runner.arv_executor(t, job_order_object, args, logger=logger)
145 except Exception as e:
146 if isinstance(e, WorkflowException):
147 logging.info("Workflow error %s", e)
149 logging.exception("Unhandled exception")
150 if runner and runner.final_output_collection:
151 outputCollection = runner.final_output_collection.portable_data_hash()
153 outputCollection = None
154 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
156 'output': outputCollection,