3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
12 from functools import partial
13 import pkg_resources # part of setuptools
15 from cwltool.errors import WorkflowException
17 import cwltool.workflow
22 from .arvcontainer import ArvadosContainer, RunnerContainer
23 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
24 from .arvtool import ArvadosCommandTool
25 from .fsaccess import CollectionFsAccess
27 from cwltool.process import shortname, UnsupportedRequirement
28 from cwltool.pathmapper import adjustFileObjs
29 from cwltool.draft2tool import compute_checksums
30 from arvados.api import OrderedJsonModel
# Module-level logger for this runner; INFO by default so per-job progress
# messages (see on_message) are visible.  arvExecutor raises the level to
# WARN when --quiet is given.
32 logger = logging.getLogger('arvados.cwl-runner')
33 logger.setLevel(logging.INFO)
# Orchestrates a CWL run on Arvados: builds tools via arvMakeTool, submits
# work through either the "jobs" or "containers" API, polls for state
# changes, and collects the final output (see arvExecutor).
# NOTE(review): lines are elided from this chunk — the docstring's closing
# quotes and several constructor assignments (presumably self.api,
# self.processes, self.uploaded, self.num_retries, which other methods
# read) are not visible here; confirm against the full file.
35 class ArvCwlRunner(object):
36 """Execute a CWL tool or workflow, submit work (using either jobs or
37 containers API), wait for them to complete, and report output.
41 def __init__(self, api_client, work_api=None):
# cond guards the shared job-state structures; on_message signals it when
# a tracked process changes state (see arvExecutor's wait loop).
44 self.lock = threading.Lock()
45 self.cond = threading.Condition(self.lock)
# Filled in by output_callback when the top-level workflow finishes.
46 self.final_output = None
47 self.final_status = None
51 self.work_api = work_api
# Event used to tell the poll_states background thread to exit.
52 self.stop_polling = threading.Event()
55 if self.work_api is None:
56 # todo: autodetect API to use.
57 self.work_api = "jobs"
# Fail fast on an unrecognized --api value.
59 if self.work_api not in ("containers", "jobs"):
60 raise Exception("Unsupported API '%s'" % self.work_api)
def arvMakeTool(self, toolpath_object, **kwargs):
    """Tool factory passed to cwltool.

    CommandLineTool documents are wrapped in ArvadosCommandTool (tagged
    with the configured work API); anything else (e.g. a Workflow) is
    deferred to cwltool's stock factory.
    """
    if toolpath_object.get("class") != "CommandLineTool":
        return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
    kwargs["work_api"] = self.work_api
    return ArvadosCommandTool(self, toolpath_object, **kwargs)
69 def output_callback(self, out, processStatus):
# Final-output callback handed to cwltool: records the overall status and
# output object, and reflects success/failure into the pipeline instance.
# NOTE(review): elided lines here — presumably 'if self.pipeline:' guards
# around the two update calls and the 'else:' separating the success and
# failure paths; confirm in the full file.
70 if processStatus == "success":
71 logger.info("Overall process status is %s", processStatus)
73 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
74 body={"state": "Complete"}).execute(num_retries=self.num_retries)
# Failure path (under an elided 'else:'): log and mark pipeline Failed.
76 logger.warn("Overall process status is %s", processStatus)
78 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
79 body={"state": "Failed"}).execute(num_retries=self.num_retries)
# Read by arvExecutor after its wait loop finishes.
80 self.final_status = processStatus
81 self.final_output = out
83 def on_message(self, event):
# Handle a state-change event (from the websocket or synthesized by
# poll_states) for a process tracked in self.processes.
# NOTE(review): lines are elided here — presumably the cond locking
# around these updates, setting j.running, and removing finished entries
# from self.processes; confirm in the full file.
84 if "object_uuid" in event:
85 if event["object_uuid"] in self.processes and event["event_type"] == "update":
# First transition into Running: log it and sync the pipeline component.
86 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
87 uuid = event["object_uuid"]
89 j = self.processes[uuid]
90 logger.info("Job %s (%s) is Running", j.name, uuid)
92 j.update_pipeline_component(event["properties"]["new_attributes"])
# Terminal states: let the job record its outcome.
93 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
94 uuid = event["object_uuid"]
97 j = self.processes[uuid]
98 logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
99 j.done(event["properties"]["new_attributes"])
104 def poll_states(self):
105 """Poll status of jobs or containers listed in the processes dict.
107 Runs in a separate thread.
# NOTE(review): the enclosing 'while' loop, the 'try:' matching the
# 'except' below, and the hand-off of the synthesized events to
# on_message are on elided lines.
# Sleep up to 15s between polls, waking early when stop_polling is set.
111 self.stop_polling.wait(15)
112 if self.stop_polling.is_set():
115 keys = self.processes.keys()
# Choose the API table matching the configured work API.
119 if self.work_api == "containers":
120 table = self.poll_api.containers()
121 elif self.work_api == "jobs":
122 table = self.poll_api.jobs()
125 proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
126 except Exception as e:
# Best effort: log and keep polling rather than kill the thread.
127 logger.warn("Error checking states on API server: %s", e)
# Re-shape each polled record as an "update" event (the same shape the
# websocket delivers), presumably dispatched to on_message (elided).
130 for p in proc_states["items"]:
132 "object_uuid": p["uuid"],
133 "event_type": "update",
139 def get_uploaded(self):
140 return self.uploaded.copy()
142 def add_uploaded(self, src, pair):
143 self.uploaded[src] = pair
145 def check_writable(self, obj):
146 if isinstance(obj, dict):
147 if obj.get("writable"):
148 raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
149 for v in obj.itervalues():
150 self.check_writable(v)
151 if isinstance(obj, list):
153 self.check_writable(v)
155 def arvExecutor(self, tool, job_order, **kwargs):
# Top-level executor handed to cwltool.main: runs or submits the workflow
# and returns its final output object.
# NOTE(review): many lines of this method are elided from this chunk
# (else-branches, try/except/finally scaffolding, several statements);
# comments below describe only what is visible here.
156 self.debug = kwargs.get("debug")
# Reject unsupported 'writable: true' InitialWorkDir entries up front.
158 tool.visit(self.check_writable)
160 if kwargs.get("quiet"):
161 logger.setLevel(logging.WARN)
162 logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
# Work is owned by --project-uuid when given, else the current user.
164 useruuid = self.api.users().current().execute()["uuid"]
165 self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
# Collection-backed filesystem access for staging/collecting files.
167 make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
168 self.fs_access = make_fs_access(kwargs["basedir"])
# --create-template: build a pipeline template instead of executing.
170 if kwargs.get("create_template"):
171 tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
173 # cwltool.main will write our return value to stdout.
176 self.debug = kwargs.get("debug")
177 self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
# Normalize the kwargs forwarded to each job's run().
179 kwargs["make_fs_access"] = make_fs_access
180 kwargs["enable_reuse"] = kwargs.get("enable_reuse")
181 kwargs["use_container"] = True
182 kwargs["tmpdir_prefix"] = "tmp"
183 kwargs["on_error"] = "continue"
184 kwargs["compute_checksum"] = kwargs.get("compute_checksum")
# API-specific output/tmp locations: fixed in-container paths for the
# containers API, $(task.*) substitution variables for the jobs API.
186 if self.work_api == "containers":
187 kwargs["outdir"] = "/var/spool/cwl"
188 kwargs["docker_outdir"] = "/var/spool/cwl"
189 kwargs["tmpdir"] = "/tmp"
190 kwargs["docker_tmpdir"] = "/tmp"
191 elif self.work_api == "jobs":
192 kwargs["outdir"] = "$(task.outdir)"
193 kwargs["docker_outdir"] = "$(task.outdir)"
194 kwargs["tmpdir"] = "$(task.tmpdir)"
# --submit: wrap the whole workflow in a runner job/container that
# executes it on the cluster instead of from this client.
197 if kwargs.get("submit"):
198 if self.work_api == "containers":
199 if tool.tool["class"] == "CommandLineTool":
200 runnerjob = tool.job(job_order,
201 self.output_callback,
204 runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
206 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
# Local (non-submit) jobs-API run: create a pipeline instance to group
# and surface the jobs; output_callback later marks it Complete/Failed.
208 if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
209 # Create pipeline for local run
210 self.pipeline = self.api.pipeline_instances().create(
212 "owner_uuid": self.project_uuid,
213 "name": shortname(tool.tool["id"]),
215 "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
216 logger.info("Pipeline instance %s", self.pipeline["uuid"])
# --no-wait: hand back the submitted runner's UUID immediately.
218 if runnerjob and not kwargs.get("wait"):
220 return runnerjob.uuid
# Background thread that polls job/container states (see poll_states).
222 self.poll_api = arvados.api('v1')
223 self.polling_thread = threading.Thread(target=self.poll_states)
224 self.polling_thread.start()
# When submitting, the only "job" to run is the runner itself; otherwise
# iterate the tool's own jobs (also used on cwl_runner_job re-entry).
227 jobiter = iter((runnerjob,))
229 if "cwl_runner_job" in kwargs:
230 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
231 jobiter = tool.job(job_order,
232 self.output_callback,
237 # Will continue to hold the lock for the duration of this code
238 # except when in cond.wait(), at which point on_message can update
239 # job state and process output callbacks.
241 for runnable in jobiter:
243 runnable.run(**kwargs)
# Nothing runnable and nothing pending: the workflow cannot progress.
248 logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
# Drain: wait (on cond, presumably — elided) until all tracked
# processes have finished.
251 while self.processes:
# NOTE(review): the 'try:' matching these handlers is on an elided line.
254 except UnsupportedRequirement:
257 if sys.exc_info()[0] is KeyboardInterrupt:
258 logger.error("Interrupted, marking pipeline as failed")
260 logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
262 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
263 body={"state": "Failed"}).execute(num_retries=self.num_retries)
# On failure, cancel a submitted runner container by zeroing priority.
264 if runnerjob and runnerjob.uuid and self.work_api == "containers":
265 self.api.container_requests().update(uuid=runnerjob.uuid,
266 body={"priority": "0"}).execute(num_retries=self.num_retries)
# Shut down the polling thread (presumably in an elided 'finally:').
269 self.stop_polling.set()
270 self.polling_thread.join()
# Map the recorded final status onto the caller-visible contract.
272 if self.final_status == "UnsupportedRequirement":
273 raise UnsupportedRequirement("Check log for details.")
275 if self.final_status != "success":
276 raise WorkflowException("Workflow failed.")
278 if self.final_output is None:
279 raise WorkflowException("Workflow did not return a result.")
# --compute-checksum: annotate output File objects with checksums.
281 if kwargs.get("compute_checksum"):
282 adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
284 return self.final_output
# NOTE(review): the enclosing 'def versionstring():' header is on a line
# elided from this chunk; the lines below are its body.
288 """Return version string of key packages for provenance and debugging."""
290 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
291 arvpkg = pkg_resources.require("arvados-python-client")
292 cwlpkg = pkg_resources.require("cwltool")
# sys.argv[0] stands in for this program's name in the version line.
294 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
295 "arvados-python-client", arvpkg[0].version,
296 "cwltool", cwlpkg[0].version)
299 def arg_parser(): # type: () -> argparse.ArgumentParser
# Build the command-line parser.  Defaults encode the common flow:
# --submit --wait --enable-reuse, output to the current directory.
# NOTE(review): some option lines and the trailing 'return parser'
# statement are elided from this chunk.
300 parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
302 parser.add_argument("--basedir", type=str,
303 help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
304 parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
305 help="Output directory, default current directory")
307 parser.add_argument("--eval-timeout",
308 help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
311 parser.add_argument("--version", action="store_true", help="Print version and exit")
# Verbosity options are mutually exclusive.
313 exgroup = parser.add_mutually_exclusive_group()
314 exgroup.add_argument("--verbose", action="store_true", help="Default logging")
315 exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
316 exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
318 parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
# Job reuse on/off toggles share one destination; reuse is the default.
320 exgroup = parser.add_mutually_exclusive_group()
321 exgroup.add_argument("--enable-reuse", action="store_true",
322 default=True, dest="enable_reuse",
324 exgroup.add_argument("--disable-reuse", action="store_false",
325 default=True, dest="enable_reuse",
328 parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.")
329 parser.add_argument("--ignore-docker-for-reuse", action="store_true",
330 help="Ignore Docker image version when deciding whether to reuse past jobs.",
# Execution mode: submit to Arvados (default), run locally, or just
# create a pipeline template.
333 exgroup = parser.add_mutually_exclusive_group()
334 exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
335 default=True, dest="submit")
336 exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
337 default=True, dest="submit")
338 exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
# Whether to wait for a submitted runner to finish; waiting is default.
340 exgroup = parser.add_mutually_exclusive_group()
341 exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
342 default=True, dest="wait")
343 exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
344 default=True, dest="wait")
# Defaults to None so ArvCwlRunner.__init__ can pick the API.
346 parser.add_argument("--api", type=str,
347 default=None, dest="work_api",
348 help="Select work submission API, one of 'jobs' or 'containers'.")
350 parser.add_argument("--compute-checksum", action="store_true", default=False,
351 help="Compute checksum of contents while collecting outputs",
352 dest="compute_checksum")
# Positional arguments: the workflow document and its input object(s).
354 parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
355 parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
360 def main(args, stdout, stderr, api_client=None):
# Command-line entry point: parse arguments, build an ArvCwlRunner bound
# to an Arvados API client, then delegate execution to cwltool.main with
# our executor and tool factory plugged in.
# NOTE(review): the 'try:' matching the 'except' below, its error
# handling, and several cwltool.main keyword arguments are on elided
# lines; confirm in the full file.
361 parser = arg_parser()
363 job_order_object = None
364 arvargs = parser.parse_args(args)
# --create-template with no input object: pass an empty job order so the
# template can be built without inputs.
365 if arvargs.create_template and not arvargs.job_order:
366 job_order_object = ({}, "")
369 if api_client is None:
# OrderedJsonModel — presumably preserves JSON field order in API
# responses; verify against the arvados client docs.
370 api_client=arvados.api('v1', model=OrderedJsonModel())
371 runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
372 except Exception as e:
# Flags cwltool.main expects to find on the parsed-args object.
376 arvargs.conformance_test = None
377 arvargs.use_container = True
379 return cwltool.main.main(args=arvargs,
382 executor=runner.arvExecutor,
383 makeTool=runner.arvMakeTool,
384 versionfunc=versionstring,
385 job_order_object=job_order_object,
386 make_fs_access=partial(CollectionFsAccess, api_client=api_client))