Merge branch '9570-cwl-runner-fixes' closes #9570 (again)
[arvados.git] sdk/cwl/arvados_cwl/__init__.py
#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow

import arvados
import arvados.events

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .fsaccess import CollectionFsAccess

from cwltool.process import shortname, UnsupportedRequirement
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client, work_api=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api

        if self.work_api is None:
            # TODO: autodetect which API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arvMakeTool(self, toolpath_object, **kwargs):
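        # Tool factory passed to cwltool: CommandLineTools are wrapped in
        # ArvadosCommandTool so they execute as Arvados jobs or containers;
        # everything else falls back to cwltool's default factory.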
        if toolpath_object.get("class") == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warning("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
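        # Called on the event-subscription thread (see the websocket
        # subscription in arvExecutor).  Uses self.cond to wake the main
        # loop when a tracked job or container reaches a final state.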
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    with self.cond:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, **kwargs):
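        # Executor passed to cwltool: depending on the options, either submit
        # a runner job/container that manages the workflow on Arvados, or run
        # the workflow's steps from here, tracking their progress via
        # websocket events.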
        self.debug = kwargs.get("debug")

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") or useruuid
        self.pipeline = None
        self.fs_access = CollectionFsAccess(kwargs["basedir"], api_client=self.api)

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"

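        # The two APIs have different filesystem conventions: containers get
        # fixed mount points, while the jobs API substitutes $(task.outdir)
        # and $(task.tmpdir) at runtime.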
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

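        # With --no-wait, just submit the runner and return its UUID rather
        # than waiting for the workflow to finish.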
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

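        # Subscribe to websocket events for the relevant object type so that
        # on_message sees state changes for the work we submit.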
        if self.work_api == "containers":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
        elif self.work_api == "jobs":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            for runnable in jobiter:
                if runnable:
                    runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked: no runnable jobs and not waiting on any pending jobs.")
                        break

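            # All steps have been submitted; wait for the remaining
            # processes to finish.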
            while self.processes:
                self.cond.wait(1)

            events.close()
        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        return self.final_output

def versionstring():
    """Return version strings of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default).")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse.")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs. If not provided, jobs go to the user's home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados (default).",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Control the workflow from the local host, submitting individual jobs to Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser


def main(args, stdout, stderr, api_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
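    # When creating a template without an input file, hand cwltool an empty
    # job order so it does not try to read one from the remaining arguments.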
    if arvargs.create_template and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

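    # cwltool.main expects these attributes on the parsed arguments object,
    # but our parser does not define them.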
    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object)
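

# A minimal sketch of invoking main() directly.  The installed
# arvados-cwl-runner console script normally provides this wiring; the
# guard below is illustrative rather than part of the packaged entry point.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))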