closes #9552
sdk/cwl/arvados_cwl/__init__.py
#!/usr/bin/env python

# Implement the cwl-runner interface for submitting and running work on
# Arvados, using either the Crunch jobs API or the Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow

import arvados
import arvados.events
import arvados.config

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .fsaccess import CollectionFsAccess

from cwltool.process import shortname, UnsupportedRequirement
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs
    or containers API), wait for it to complete, and report output.

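    A minimal usage sketch (arguments follow cwltool's executor interface;
    the values shown are illustrative):

        runner = ArvCwlRunner(arvados.api('v1'), work_api="jobs")
        output = runner.arvExecutor(tool, job_order, basedir=".")
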
    """

    def __init__(self, api_client, work_api=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
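        # self.cond guards self.processes: arvExecutor waits on it while
        # the websocket callback (on_message) updates job state and
        # notifies.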
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api

        if self.work_api is None:
            # TODO: autodetect which API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arvMakeTool(self, toolpath_object, **kwargs):
        if toolpath_object.get("class") == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warning("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
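        """Handle a websocket event for a submitted job or container.

        Expected event shape (a sketch inferred from the fields used
        below):

            {"object_uuid": "...", "event_type": "update",
             "properties": {"new_attributes": {"state": "Running", ...}}}
        """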
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    with self.cond:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") or useruuid
        self.pipeline = None
        self.fs_access = CollectionFsAccess(kwargs["basedir"], api_client=self.api)

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

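        # Fixed execution settings passed through to cwltool; these
        # override its defaults for running under Arvados.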
        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = False

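        # Output and temp paths differ by API: the containers API uses fixed
        # paths inside the container, while the jobs API uses Crunch task
        # substitution variables that are expanded at run time.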
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

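        # In submit mode, wrap the workflow in a runner process that itself
        # executes on Arvados: a container request under the containers API,
        # or a job under the jobs API.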
        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create a pipeline instance to represent this local run.
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

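        # This is assumed to make arvados.events.subscribe below fall back
        # to polling instead of opening a websocket connection.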
        arvados.config.settings()["ARVADOS_DISABLE_WEBSOCKETS"] = "1"

        if self.work_api == "containers":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
        elif self.work_api == "jobs":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Hold the lock for the duration of this block, except while in
            # cond.wait(), at which point on_message can update job state
            # and process output callbacks.

            for runnable in jobiter:
                if runnable:
                    runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break

            while self.processes:
                self.cond.wait(1)

            events.close()
        except UnsupportedRequirement:
            raise
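        # The bare "except" below is deliberate: it also catches
        # KeyboardInterrupt, so the pipeline instance (and runner container,
        # if any) can be marked as failed before control falls through to
        # the final status checks.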
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        return self.final_output


def versionstring():
    """Return a version string for key packages, for provenance and debugging."""
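    # Example result (program path and versions are illustrative):
    #   /usr/bin/arvados-cwl-runner 1.0.0, arvados-python-client 0.1.0, cwltool 1.0.0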

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs. If not provided, jobs go to the user's home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser


def main(args, stdout, stderr, api_client=None):
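    """Parse arguments and hand execution off to cwltool.main.

    Typical invocation via the console script (illustrative):

        arvados-cwl-runner --api=containers workflow.cwl job-input.yml
    """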
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if arvargs.create_template and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object)
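

# Minimal sketch of a direct entry point; the installed cwl-runner console
# script is assumed to invoke main() the same way.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))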