# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
from functools import partial

import pkg_resources  # part of setuptools

import arvados
from arvados.api import OrderedJsonModel

import cwltool.main
import cwltool.workflow
from cwltool.errors import WorkflowException
from cwltool.process import shortname, UnsupportedRequirement
from cwltool.pathmapper import adjustFileObjs
from cwltool.draft2tool import compute_checksums

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .fsaccess import CollectionFsAccess

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.
    """
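
    # Typical wiring (a sketch; main() below does this for real): create a
    # runner bound to an Arvados API client, then hand its methods to cwltool:
    #   runner = ArvCwlRunner(arvados.api('v1'), work_api="containers")
    #   cwltool.main.main(args=..., executor=runner.arvExecutor,
    #                     makeTool=runner.arvMakeTool)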

    def __init__(self, api_client, work_api=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api
        self.stop_polling = threading.Event()
        self.poll_api = None

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()
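
    # For reference, on_message() expects events shaped like the dict that
    # poll_states() constructs below; values here are illustrative only:
    #   {"object_uuid": "zzzzz-8i9sb-xxxxxxxxxxxxxxx",
    #    "event_type": "update",
    #    "properties": {"new_attributes": {"state": "Complete", ...}}}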

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        while True:
            self.stop_polling.wait(15)
            if self.stop_polling.is_set():
                break
            with self.lock:
                keys = self.processes.keys()
            if not keys:
                continue

            if self.work_api == "containers":
                table = self.poll_api.containers()
            elif self.work_api == "jobs":
                table = self.poll_api.jobs()

            try:
                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.warn("Error checking states on API server: %s", e)
                continue

            for p in proc_states["items"]:
                self.on_message({
                    "object_uuid": p["uuid"],
                    "event_type": "update",
                    "properties": {"new_attributes": p}
                })

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

    def arvExecutor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            for runnable in jobiter:
                if runnable:
                    runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("compute_checksum"):
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output
287 """Print version string of key packages for provenance and debugging."""
289 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
290 arvpkg = pkg_resources.require("arvados-python-client")
291 cwlpkg = pkg_resources.require("cwltool")
293 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
294 "arvados-python-client", arvpkg[0].version,
295 "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or given on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable reuse of past jobs or containers with the same inputs (default).")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Always run jobs or containers, even if an equivalent one has already run.")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs. If not provided, work goes to the home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
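
# Example invocation (illustrative; the project UUID is a placeholder):
#   arvados-cwl-runner --api=containers --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx \
#       my-workflow.cwl my-inputs.yml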


def main(args, stdout, stderr, api_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if arvargs.create_template and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))
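

if __name__ == "__main__":
    # Direct invocation for convenience and testing (a sketch; the installed
    # arvados-cwl-runner console script normally calls main() this way).
    sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))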