#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow

import arvados
import arvados.events
import arvados.config

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .fsaccess import CollectionFsAccess

from cwltool.process import shortname, UnsupportedRequirement
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """
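    # Typical flow (a sketch): main() below constructs an ArvCwlRunner and
    # hands arvExecutor/arvMakeTool to cwltool.main.main(), which drives the
    # actual workflow execution.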

    def __init__(self, api_client, work_api=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

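    # Tool factory handed to cwltool: CommandLineTools get the Arvados-aware
    # implementation; everything else (e.g. Workflows) falls back to cwltool.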
    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

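    # Called by cwltool when the top-level process finishes: record the final
    # status/output and flip the pipeline instance to Complete or Failed.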
    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warning("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

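    # Websocket event handler: tracks state changes of the jobs/containers
    # this runner submitted and wakes the executor thread waiting on self.cond.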
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    with self.cond:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()

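    # Cache of files already uploaded during this run, mapping local source
    # paths to their uploaded locations so each file is only uploaded once.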
    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

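    # Main executor entry point, invoked by cwltool.main: set up the run,
    # optionally submit a runner job/container, then schedule runnable steps
    # and block on self.cond until every submitted process has finished.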
    def arvExecutor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARNING)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARNING)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") or useruuid
        self.pipeline = None
        self.fs_access = CollectionFsAccess(kwargs["basedir"], api_client=self.api)

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

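        # Output and temp paths: fixed locations inside the container for the
        # containers API, Crunch task substitution variables for the jobs API.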
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

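        # With --submit, the workflow itself runs on Arvados: a lone
        # CommandLineTool becomes a container directly; anything else goes
        # through a RunnerContainer/RunnerJob wrapper.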
        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

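        # Turn off the SDK's implicit websocket handling; state changes are
        # consumed through the explicit event subscription set up just below.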
        arvados.config.settings()["ARVADOS_DISABLE_WEBSOCKETS"] = "1"

        if self.work_api == "containers":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
        elif self.work_api == "jobs":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            for runnable in jobiter:
                if runnable:
                    runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break

            while self.processes:
                self.cond.wait(1)

            events.close()
        except UnsupportedRequirement:
            raise
        except:  # deliberately bare: also catches KeyboardInterrupt
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        return self.final_output


def versionstring():
    """Return a version string for key packages, for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (the default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs; if not provided, jobs go to the user's home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser


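# Example invocation (hypothetical workflow and input paths; assumes the
# console script is installed as arvados-cwl-runner):
#   arvados-cwl-runner --api=containers --no-wait workflow.cwl job.yml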
def main(args, stdout, stderr, api_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if arvargs.create_template and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object)