#!/usr/bin/env python

# Implement the cwl-runner interface for submitting and running work on
# Arvados, using either the Crunch jobs API or the Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow

import arvados
import arvados.events
import arvados.config

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .fsaccess import CollectionFsAccess

from cwltool.process import shortname, UnsupportedRequirement
from cwltool.pathmapper import adjustFileObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client, work_api=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

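    # makeTool hook passed to cwltool: route CommandLineTools to the
    # Arvados-specific tool class; Workflows and everything else fall
    # through to cwltool's default factory.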
    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warning("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

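    # Called in the websocket event client's thread for each matching event.
    # Updates process state and, via self.cond, wakes up arvExecutor (which
    # waits on the same condition) when a process reaches a final state.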
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    with self.cond:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()

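    # self.uploaded caches source-to-destination pairs for files that have
    # already been uploaded, so repeated references are uploaded only once
    # (presumably consulted by the upload machinery in arvjob/arvcontainer).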
    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

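        # The two APIs use different working-directory conventions:
        # containers get fixed in-container paths, while crunch jobs use
        # $(task.outdir) / $(task.tmpdir) placeholders that are substituted
        # at runtime.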
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

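        # With --submit, the workflow runner itself is submitted to Arvados
        # as a container or job ("runnerjob") rather than orchestrating the
        # run from this process.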
        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

        arvados.config.settings()["ARVADOS_DISABLE_WEBSOCKETS"] = "1"

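        # Subscribe to websocket events for the relevant object type;
        # on_message (above) is invoked as job/container state changes arrive.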
        if self.work_api == "containers":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
        elif self.work_api == "jobs":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            for runnable in jobiter:
                if runnable:
                    runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked: no runnable jobs and no pending jobs to wait on.")
                        break

            while self.processes:
                self.cond.wait(1)

            events.close()
        except UnsupportedRequirement:
            raise
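        # Deliberately bare so that KeyboardInterrupt is caught too: an
        # interrupted run should still mark the pipeline as failed and cancel
        # the runner container request below.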
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("compute_checksum"):
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return version strings of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or to the current directory if inputs are piped or given on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job reuse (default): matching jobs from earlier runs may be reused instead of being re-run.")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job reuse; always run jobs anew.")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs; if not provided, jobs go to the user's home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run the workflow runner on the local host (individual jobs are still submitted to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting the workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit the workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser


def main(args, stdout, stderr, api_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if arvargs.create_template and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

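    # Attributes cwltool.main expects to find on the parsed-args object but
    # which this parser does not define.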
    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))
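

# A minimal usage sketch (hypothetical filenames and project UUID): submit a
# workflow via the containers API and wait for it to finish.
#
#   arvados-cwl-runner --api=containers \
#       --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx \
#       my-workflow.cwl my-inputs.yml
#
# main() can also be invoked directly, e.g. from a test harness:
#
#   import sys
#   from arvados_cwl import main
#   exit_code = main(sys.argv[1:], sys.stdout, sys.stderr)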