#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow

import arvados
import arvados.config

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .fsaccess import CollectionFsAccess

from cwltool.process import shortname, UnsupportedRequirement
from cwltool.pathmapper import adjustFileObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """

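    # Minimal usage sketch (hypothetical; in practice main() below hands
    # arvExecutor and arvMakeTool to cwltool.main.main rather than calling
    # them directly):
    #
    #   api = arvados.api('v1', model=OrderedJsonModel())
    #   runner = ArvCwlRunner(api, work_api="jobs")
    #   output = runner.arvExecutor(tool, job_order, basedir=".", submit=False)
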
    def __init__(self, api_client, work_api=None):
        self.api = api_client
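        # Maps the UUID of each submitted job or container to its runner
        # object; consulted by on_message() and the polling thread, with
        # access coordinated through self.lock / self.cond below.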
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api
        self.stop_polling = threading.Event()
        self.poll_api = None

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

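        # Wait up to 15 seconds between polls (waking early if stop_polling is
        # set), snapshot the current process UUIDs under the lock, fetch their
        # records in a single list() call, and feed each record through
        # on_message() as a synthetic "update" event.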
        while True:
            self.stop_polling.wait(15)
            if self.stop_polling.is_set():
                break
            with self.lock:
                keys = self.processes.keys()
            if not keys:
                continue

            if self.work_api == "containers":
                table = self.poll_api.containers()
            elif self.work_api == "jobs":
                table = self.poll_api.jobs()

            try:
                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.warn("Error checking states on API server: %s", e)
                continue

            for p in proc_states["items"]:
                self.on_message({
                    "object_uuid": p["uuid"],
                    "event_type": "update",
                    "properties": {
                        "new_attributes": p
                    }
                })

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

    def arvExecutor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        self.debug = kwargs.get("debug")
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

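        # Work directory conventions differ between the two APIs: the
        # containers API uses fixed paths inside the container, while the
        # jobs API relies on Crunch to substitute $(task.outdir) and
        # $(task.tmpdir) at runtime.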
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

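        # With --submit, the workflow itself is wrapped in a runner job or
        # container that executes on the cluster; a bare CommandLineTool under
        # the containers API is submitted as a container request directly.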
        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

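        # With --no-wait, start the runner and return its UUID without
        # monitoring progress.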
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

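        # Main scheduling loop: jobiter yields a runnable job when one is
        # ready, or None while it is blocked waiting on pending processes.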
        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            for runnable in jobiter:
                if runnable:
                    runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("compute_checksum"):
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return version strings of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory; defaults to the current directory.")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error; default 20 seconds.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable reuse of existing jobs or containers (default).")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable reuse of existing jobs or containers.")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs. If not provided, jobs go to the user's home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser


def main(args, stdout, stderr, api_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
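    # When creating a pipeline template with no input file given, stub in an
    # empty job order so cwltool does not try to read one from the remaining
    # command line arguments.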
    if arvargs.create_template and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

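    # Attributes expected by cwltool.main that this parser does not define as
    # command line options, so set them explicitly here.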
    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))