3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
12 from functools import partial
13 import pkg_resources # part of setuptools
15 from cwltool.errors import WorkflowException
17 import cwltool.workflow
22 from .arvcontainer import ArvadosContainer, RunnerContainer
23 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
24 from .arvtool import ArvadosCommandTool
25 from .fsaccess import CollectionFsAccess
26 from .arvworkflow import make_workflow
28 from cwltool.process import shortname, UnsupportedRequirement
29 from cwltool.pathmapper import adjustFileObjs
30 from cwltool.draft2tool import compute_checksums
31 from arvados.api import OrderedJsonModel
# Module-level logger for the CWL runner; default to INFO so job/container
# progress messages are visible without --debug.
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.
    """
    # NOTE(review): this chunk appears to be an elided/sampled copy of the
    # file.  Several structural lines (try:/else:/with, closing parentheses,
    # some assignments) are not visible; comments below flag the gaps rather
    # than guess at the missing code.

    def __init__(self, api_client, work_api=None):
        # NOTE(review): attributes such as self.api, self.processes,
        # self.uploaded, self.num_retries and self.pipeline are referenced by
        # other methods but their initialisation is not visible here —
        # confirm against the full file.
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)  # signalled as tracked work changes state
        self.final_output = None   # set by output_callback when the workflow completes
        self.final_status = None   # cwltool process status ("success" or a failure state)
        self.work_api = work_api   # "jobs" or "containers"
        self.stop_polling = threading.Event()  # tells the poll_states thread to exit

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arvMakeTool(self, toolpath_object, **kwargs):
        # Tool factory handed to cwltool: route CommandLineTools to the
        # Arvados-aware implementation, everything else to cwltool's default.
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
        return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        # Final-output callback handed to cwltool; records the overall result
        # and mirrors it into the pipeline instance state on the API server.
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
        # NOTE(review): an `else:` (and possibly `if self.pipeline:` guards)
        # appears to be elided above — as rendered, the "Failed" update below
        # would always run.  Confirm against the full file.
        logger.warn("Overall process status is %s", processStatus)
        self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                             body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        # Handle a state-change event (from websockets or the poll thread)
        # for a job/container we are tracking in self.processes.
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    j = self.processes[uuid]
                    logger.info("Job %s (%s) is Running", j.name, uuid)
                    # NOTE(review): a `j.running = True` style assignment is
                    # presumably elided here (the guard above reads .running)
                    # — confirm against the full file.
                    j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    j = self.processes[uuid]
                    logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                    j.done(event["properties"]["new_attributes"])

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
        # NOTE(review): the enclosing polling loop (presumably `while True:`)
        # and the `break` under the stop check are not visible in this chunk.
        self.stop_polling.wait(15)  # wake early when stop_polling is set
        if self.stop_polling.is_set():
        keys = self.processes.keys()
        # Pick the API table matching the configured work submission API.
        if self.work_api == "containers":
            table = self.poll_api.containers()
        elif self.work_api == "jobs":
            table = self.poll_api.jobs()
        # NOTE(review): the `try:` matching the `except` below is elided.
        proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
        except Exception as e:
            # Best-effort: log and keep polling rather than killing the thread.
            logger.warn("Error checking states on API server: %s", e)
        # Re-dispatch each polled record through the same path as a websocket
        # event.  NOTE(review): the surrounding call (presumably
        # self.on_message({...})) is partially elided here.
        for p in proc_states["items"]:
            "object_uuid": p["uuid"],
            "event_type": "update",

    def get_uploaded(self):
        # Return a shallow copy so callers cannot mutate the upload cache.
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        # Record that local path `src` has already been uploaded as `pair`.
        self.uploaded[src] = pair

    def check_writable(self, obj):
        # Recursively scan a CWL object and reject writable InitialWorkDir
        # entries, which this runner does not support.
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():  # Python 2 dict iteration
                self.check_writable(v)
        if isinstance(obj, list):
            # NOTE(review): the `for v in obj:` line appears elided here.
            self.check_writable(v)

    def arvExecutor(self, tool, job_order, **kwargs):
        # Main entry point handed to cwltool.main as `executor`: runs (or
        # submits) the workflow and returns its final output object.
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        # New work goes to the user's home project unless --project-uuid given.
        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid

        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            # cwltool.main will write our return value to stdout.
            # NOTE(review): the save/return of `tmpl` appears elided here.

        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
            return make_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))

        self.debug = kwargs.get("debug")  # duplicate of the assignment above
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        # Options forwarded to cwltool for every job it constructs.
        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        # Working-directory conventions differ between the two APIs:
        # containers use fixed paths, jobs use crunch task substitutions.
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("submit"):
            # Submit a runner job/container that re-runs this workflow on Arvados.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                    # NOTE(review): the close of the tool.job(...) call and the
                    # intervening else: branches are elided here.
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                # NOTE(review): the `body={` opener for this dict is elided.
                "owner_uuid": self.project_uuid,
                "name": shortname(tool.tool["id"]),
                "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            # Fire-and-forget: hand back the submitted runner's UUID.
            # NOTE(review): a runnerjob.run() call appears elided here.
            return runnerjob.uuid

        # Background thread that polls job/container states and feeds
        # on_message, mirroring websocket events.
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        jobiter = iter((runnerjob,))
        # NOTE(review): an `if runnerjob:`/`else:` split appears elided here.
        if "cwl_runner_job" in kwargs:
            self.uuid = kwargs.get("cwl_runner_job").get('uuid')
        jobiter = tool.job(job_order,
                           self.output_callback,
        # NOTE(review): the close of tool.job(...) and the surrounding
        # try:/lock acquisition are elided.

        # Will continue to hold the lock for the duration of this code
        # except when in cond.wait(), at which point on_message can update
        # job state and process output callbacks.
        for runnable in jobiter:
            runnable.run(**kwargs)
            # NOTE(review): the `if runnable:`/`else:` structure around the
            # deadlock check is elided here.
            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
        # Wait for all tracked processes to finish (cond.wait body elided).
        while self.processes:
        except UnsupportedRequirement:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                # Cancel the submitted runner container by dropping its priority.
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        # NOTE(review): the `finally:` for this cleanup appears elided.
        self.stop_polling.set()
        self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("compute_checksum"):
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output
def versionstring():
    """Print version string of key packages for provenance and debugging."""
    # NOTE(review): the `def` header for this function was not visible in
    # this chunk; reconstructed from the body, which is used as cwltool's
    # `versionfunc` in main() below.
    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    # e.g. "arvados-cwl-runner 1.0, arvados-python-client 0.1, cwltool 1.0"
    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command-line argument parser for arvados-cwl-runner.

    Returns an argparse.ArgumentParser covering workflow selection,
    submit/local execution, job reuse, and the jobs/containers API choice.
    """
    # NOTE(review): in the reviewed chunk several add_argument() calls were
    # truncated (missing kwargs/closing parens) and the `return parser` was
    # missing even though the type comment promises an ArgumentParser;
    # reconstructed to a complete, self-consistent parser.
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float, default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    # Logging verbosity: exactly one of verbose/quiet/debug.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Job/container reuse: both flags share dest so either one wins.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job reuse")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    # Execution mode: submit to Arvados (default) vs. run locally, or just
    # create/update a template or workflow object.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
    exgroup.add_argument("--update-workflow", type=str, help="Update Arvados workflow.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
def main(args, stdout, stderr, api_client=None):
    # Command-line entry point: parse arguments, build an ArvCwlRunner, and
    # delegate the rest of the run to cwltool.main.
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    # Template/workflow creation does not need an input object; supply an
    # empty one so cwltool does not try to load it.
    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    # NOTE(review): the `try:` matching the `except` below, and the except
    # body, are not visible in this chunk.
    if api_client is None:
        api_client=arvados.api('v1', model=OrderedJsonModel())
    runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:

    # Attributes cwltool.main expects to find on the parsed-args object.
    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             # NOTE(review): stdout=/stderr= keyword
                             # arguments appear elided here.
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))