3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
12 from functools import partial
13 import pkg_resources # part of setuptools
15 from cwltool.errors import WorkflowException
17 import cwltool.workflow
22 from .arvcontainer import ArvadosContainer, RunnerContainer
23 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
24 from .arvtool import ArvadosCommandTool
25 from .fsaccess import CollectionFsAccess
26 from .arvworkflow import make_workflow
27 from .perf import Perf
29 from cwltool.process import shortname, UnsupportedRequirement
30 from cwltool.pathmapper import adjustFileObjs
31 from cwltool.draft2tool import compute_checksums
32 from arvados.api import OrderedJsonModel
# Module-level logger for the whole runner; default INFO.  The level is
# adjusted at run time by the --quiet/--debug flags (see arvExecutor).
34 logger = logging.getLogger('arvados.cwl-runner')
35 logger.setLevel(logging.INFO)
# NOTE(review): this excerpt carries stale source line numbers and gaps in
# that numbering mark elided lines; comments below only describe what is
# visible here.
37 class ArvCwlRunner(object):
38 """Execute a CWL tool or workflow, submit work (using either jobs or
39 containers API), wait for them to complete, and report output.
# __init__: store the API client and chosen work API, and set up the
# synchronization primitives used by the polling/event machinery.
# Elided lines (44-45, 50-52, 55-56, 60) presumably initialize self.api,
# self.processes, self.uploaded and self.num_retries used elsewhere in this
# class -- TODO confirm against the full source.
43 def __init__(self, api_client, work_api=None):
# The Condition is built on the same Lock so arvExecutor can hold the lock
# while cond.wait() lets on_message update job state (see comment at 245-247).
46 self.lock = threading.Lock()
47 self.cond = threading.Condition(self.lock)
# Final workflow result and status, filled in by output_callback.
48 self.final_output = None
49 self.final_status = None
53 self.work_api = work_api
# Event used to tell the poll_states thread to shut down.
54 self.stop_polling = threading.Event()
# Default to the jobs API when the caller did not choose one.
57 if self.work_api is None:
58 # todo: autodetect API to use.
59 self.work_api = "jobs"
61 if self.work_api not in ("containers", "jobs"):
62 raise Exception("Unsupported API '%s'" % self.work_api)
# Tool factory handed to cwltool (see main(): makeTool=runner.arvMakeTool).
# CommandLineTools are wrapped in ArvadosCommandTool so they run on Arvados;
# anything else falls through to cwltool's default factory.  The elided line
# 68 is presumably the `else:` introducing the fallback branch.
64 def arvMakeTool(self, toolpath_object, **kwargs):
65 if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
66 kwargs["work_api"] = self.work_api
67 return ArvadosCommandTool(self, toolpath_object, **kwargs)
69 return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
# Final-output callback passed to tool.job()/RunnerJob: records the overall
# result and, when a pipeline instance exists, marks it Complete or Failed.
# Elided lines 74/77/79 presumably guard the pipeline update with
# `if self.pipeline:` and introduce the failure `else:` branch -- TODO confirm.
71 def output_callback(self, out, processStatus):
72 if processStatus == "success":
73 logger.info("Overall process status is %s", processStatus)
75 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
76 body={"state": "Complete"}).execute(num_retries=self.num_retries)
# Non-success path: warn and mark the pipeline instance Failed.
78 logger.warn("Overall process status is %s", processStatus)
80 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
81 body={"state": "Failed"}).execute(num_retries=self.num_retries)
# Stash the outcome for arvExecutor to inspect after the run loop exits.
82 self.final_status = processStatus
83 self.final_output = out
# Event handler for job/container state-change events (fed both by the
# server event stream and by poll_states' synthesized events).  Only
# "update" events for uuids tracked in self.processes are acted on.
# Elided lines (90, 93, 97-98, 103-106) presumably set j.running, and
# acquire/notify self.cond so arvExecutor's wait loop wakes up -- TODO
# confirm against the full source.
85 def on_message(self, event):
86 if "object_uuid" in event:
87 if event["object_uuid"] in self.processes and event["event_type"] == "update":
# First transition into Running: log it once (guarded by j.running is False).
88 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
89 uuid = event["object_uuid"]
91 j = self.processes[uuid]
92 logger.info("Job %s (%s) is Running", j.name, uuid)
94 j.update_pipeline_component(event["properties"]["new_attributes"])
# Terminal states: log and run the job's completion handling.
95 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
96 uuid = event["object_uuid"]
99 j = self.processes[uuid]
100 logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
101 with Perf(logger, "done %s" % j.name):
102 j.done(event["properties"]["new_attributes"])
# Background fallback for the event stream: every 15 seconds list the
# tracked uuids via the API and synthesize "update" events for on_message.
# Elided lines include the surrounding `while True:` loop header (111-113),
# the `break` after stop_polling is set, the `try:` around the list call,
# the `continue` after a failed poll, and the close of the synthesized
# event dict -- TODO confirm against the full source.
107 def poll_states(self):
108 """Poll status of jobs or containers listed in the processes dict.
110 Runs in a separate thread.
# stop_polling doubles as the poll interval timer and the shutdown signal.
114 self.stop_polling.wait(15)
115 if self.stop_polling.is_set():
118 keys = self.processes.keys()
# Choose the API table matching the configured work API.
122 if self.work_api == "containers":
123 table = self.poll_api.containers()
124 elif self.work_api == "jobs":
125 table = self.poll_api.jobs()
128 proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
# A poll failure is non-fatal; just log and try again next cycle.
129 except Exception as e:
130 logger.warn("Error checking states on API server: %s", e)
# Feed each polled record through the same path as streamed events.
133 for p in proc_states["items"]:
135 "object_uuid": p["uuid"],
136 "event_type": "update",
142 def get_uploaded(self):
143 return self.uploaded.copy()
145 def add_uploaded(self, src, pair):
146 self.uploaded[src] = pair
# Visitor (used via tool.visit in arvExecutor) that rejects any
# InitialWorkDir entry marked writable, recursing through dicts and lists.
# Uses Python 2's dict.itervalues(), consistent with this file's vintage.
# The elided line 155 is presumably `for v in obj:` heading the list branch.
148 def check_writable(self, obj):
149 if isinstance(obj, dict):
150 if obj.get("writable"):
151 raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
152 for v in obj.itervalues():
153 self.check_writable(v)
154 if isinstance(obj, list):
156 self.check_writable(v)
# Main executor handed to cwltool (see main(): executor=runner.arvExecutor).
# Validates the tool, configures logging and kwargs, optionally submits a
# runner job/template/workflow, then drives the job iterator to completion
# and returns the final output.  Many structural lines are elided in this
# excerpt (the `try:`s, `else:` branches, `finally:`, and the
# cond.acquire/wait/release calls implied by the comment at 245-247) --
# comments below are limited to the visible statements.
158 def arvExecutor(self, tool, job_order, **kwargs):
159 self.debug = kwargs.get("debug")
# Reject unsupported writable InitialWorkDir entries up front.
161 tool.visit(self.check_writable)
163 if kwargs.get("quiet"):
164 logger.setLevel(logging.WARN)
165 logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
# (Elided 166-167 presumably guard this with `if self.debug:` -- TODO confirm.)
168 logger.setLevel(logging.DEBUG)
# Work is owned by --project-uuid when given, else the current user's home.
170 useruuid = self.api.users().current().execute()["uuid"]
171 self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
173 make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
174 self.fs_access = make_fs_access(kwargs["basedir"])
# --create-template: build a pipeline template instead of running.
176 if kwargs.get("create_template"):
177 tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
179 # cwltool.main will write our return value to stdout.
# --create-workflow / --update-workflow: register the workflow and return.
182 if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
183 return make_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
185 self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
# Fixed execution settings passed down to every job via **kwargs.
187 kwargs["make_fs_access"] = make_fs_access
188 kwargs["enable_reuse"] = kwargs.get("enable_reuse")
189 kwargs["use_container"] = True
190 kwargs["tmpdir_prefix"] = "tmp"
191 kwargs["on_error"] = "continue"
192 kwargs["compute_checksum"] = kwargs.get("compute_checksum")
# Output/tmp paths differ between the containers and jobs APIs: fixed
# paths for containers, $(task.*) substitution variables for crunch jobs.
194 if self.work_api == "containers":
195 kwargs["outdir"] = "/var/spool/cwl"
196 kwargs["docker_outdir"] = "/var/spool/cwl"
197 kwargs["tmpdir"] = "/tmp"
198 kwargs["docker_tmpdir"] = "/tmp"
199 elif self.work_api == "jobs":
200 kwargs["outdir"] = "$(task.outdir)"
201 kwargs["docker_outdir"] = "$(task.outdir)"
202 kwargs["tmpdir"] = "$(task.tmpdir)"
# --submit: wrap the whole workflow in a runner job/container.
205 if kwargs.get("submit"):
206 if self.work_api == "containers":
207 if tool.tool["class"] == "CommandLineTool":
208 runnerjob = tool.job(job_order,
209 self.output_callback,
212 runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
214 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
# Local (non-submit) jobs-API runs get a pipeline instance for tracking.
216 if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
217 # Create pipeline for local run
218 self.pipeline = self.api.pipeline_instances().create(
220 "owner_uuid": self.project_uuid,
221 "name": shortname(tool.tool["id"]),
223 "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
224 logger.info("Pipeline instance %s", self.pipeline["uuid"])
# --no-wait: fire-and-forget; return the submitted runner's uuid.
226 if runnerjob and not kwargs.get("wait"):
228 return runnerjob.uuid
# Separate API client for the poll thread (googleapiclient objects are
# not shared across threads here).
230 self.poll_api = arvados.api('v1')
231 self.polling_thread = threading.Thread(target=self.poll_states)
232 self.polling_thread.start()
235 jobiter = iter((runnerjob,))
# Running *as* the submitted runner job: record our uuid and iterate the
# real workflow jobs instead of the single runnerjob.
237 if "cwl_runner_job" in kwargs:
238 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
239 jobiter = tool.job(job_order,
240 self.output_callback,
245 # Will continue to hold the lock for the duration of this code
246 # except when in cond.wait(), at which point on_message can update
247 # job state and process output callbacks.
249 for runnable in jobiter:
251 with Perf(logger, "run"):
252 runnable.run(**kwargs)
# jobiter yielded None with nothing in flight: nothing can ever complete.
257 logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
# Drain remaining in-flight processes (elided cond.wait() inside -- TODO confirm).
260 while self.processes:
263 except UnsupportedRequirement:
266 if sys.exc_info()[0] is KeyboardInterrupt:
267 logger.error("Interrupted, marking pipeline as failed")
269 logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
271 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
272 body={"state": "Failed"}).execute(num_retries=self.num_retries)
# Cancel a submitted container request by dropping its priority to 0.
273 if runnerjob and runnerjob.uuid and self.work_api == "containers":
274 self.api.container_requests().update(uuid=runnerjob.uuid,
275 body={"priority": "0"}).execute(num_retries=self.num_retries)
# Cleanup (presumably in an elided `finally:`): stop the poll thread.
278 self.stop_polling.set()
279 self.polling_thread.join()
# Translate recorded final status into exceptions cwltool.main understands.
281 if self.final_status == "UnsupportedRequirement":
282 raise UnsupportedRequirement("Check log for details.")
284 if self.final_status != "success":
285 raise WorkflowException("Workflow failed.")
287 if self.final_output is None:
288 raise WorkflowException("Workflow did not return a result.")
290 if kwargs.get("compute_checksum"):
291 adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
293 return self.final_output
# NOTE(review): the `def versionstring():` line (296) is elided from this
# excerpt; what follows is the function body.  pkg_resources.require()
# returns a list of Distribution objects; [0] is the requested package.
297 """Return version string of key packages for provenance and debugging."""
299 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
300 arvpkg = pkg_resources.require("arvados-python-client")
301 cwlpkg = pkg_resources.require("cwltool")
303 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
304 "arvados-python-client", arvpkg[0].version,
305 "cwltool", cwlpkg[0].version)
# Build the command-line parser.  Several lines are elided in this excerpt,
# including some help strings, the --eval-timeout type/default, the
# --enable-reuse/--disable-reuse help text, and the trailing
# `return parser` (around line 367) -- TODO confirm against the full source.
308 def arg_parser(): # type: () -> argparse.ArgumentParser
309 parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
311 parser.add_argument("--basedir", type=str,
312 help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
313 parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
314 help="Output directory, default current directory")
316 parser.add_argument("--eval-timeout",
317 help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
320 parser.add_argument("--version", action="store_true", help="Print version and exit")
# Logging verbosity: exactly one of --verbose/--quiet/--debug.
322 exgroup = parser.add_mutually_exclusive_group()
323 exgroup.add_argument("--verbose", action="store_true", help="Default logging")
324 exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
325 exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
327 parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
# Job reuse toggle; both flags share dest="enable_reuse", default True.
329 exgroup = parser.add_mutually_exclusive_group()
330 exgroup.add_argument("--enable-reuse", action="store_true",
331 default=True, dest="enable_reuse",
333 exgroup.add_argument("--disable-reuse", action="store_false",
334 default=True, dest="enable_reuse",
337 parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
338 parser.add_argument("--ignore-docker-for-reuse", action="store_true",
339 help="Ignore Docker image version when deciding whether to reuse past jobs.",
# Run mode: submit (default) / local / create-template / create-workflow /
# update-workflow are mutually exclusive.
342 exgroup = parser.add_mutually_exclusive_group()
343 exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
344 default=True, dest="submit")
345 exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
346 default=True, dest="submit")
347 exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
348 exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
349 exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update existing Arvados workflow with uuid.")
# --wait/--no-wait share dest="wait", default True.
351 exgroup = parser.add_mutually_exclusive_group()
352 exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
353 default=True, dest="wait")
354 exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
355 default=True, dest="wait")
357 parser.add_argument("--api", type=str,
358 default=None, dest="work_api",
359 help="Select work submission API, one of 'jobs' or 'containers'.")
361 parser.add_argument("--compute-checksum", action="store_true", default=False,
362 help="Compute checksum of contents while collecting outputs",
363 dest="compute_checksum")
# Positionals: the workflow document and everything after it as job inputs.
365 parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
366 parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
# Entry point: parse arguments, construct the runner, then delegate the rest
# (document loading, job-order handling, output printing) to cwltool.main.
# Elided lines include the `try:` around runner construction (379-ish), the
# `except` body (384-386), and trailing keyword arguments to
# cwltool.main.main (391-392, 398+) -- TODO confirm against the full source.
371 def main(args, stdout, stderr, api_client=None):
372 parser = arg_parser()
374 job_order_object = None
375 arvargs = parser.parse_args(args)
# Template/workflow creation does not need inputs; pass an empty job order.
376 if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
377 job_order_object = ({}, "")
# OrderedJsonModel preserves key order in API responses.
380 if api_client is None:
381 api_client=arvados.api('v1', model=OrderedJsonModel())
382 runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
383 except Exception as e:
# Settings cwltool.main expects on the argument namespace.
387 arvargs.conformance_test = None
388 arvargs.use_container = True
# Hand off to cwltool with our executor, tool factory and version hooks.
390 return cwltool.main.main(args=arvargs,
393 executor=runner.arvExecutor,
394 makeTool=runner.arvMakeTool,
395 versionfunc=versionstring,
396 job_order_object=job_order_object,
397 make_fs_access=partial(CollectionFsAccess, api_client=api_client))