Merge branch '9613-user-profile-string-keys'
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
5
6 import argparse
7 import logging
8 import os
9 import sys
10 import threading
11 import pkg_resources  # part of setuptools
12
13 from cwltool.errors import WorkflowException
14 import cwltool.main
15 import cwltool.workflow
16
17 import arvados
18 import arvados.events
19 import arvados.config
20
21 from .arvcontainer import ArvadosContainer, RunnerContainer
22 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
23 from .arvtool import ArvadosCommandTool
24 from .fsaccess import CollectionFsAccess
25
26 from cwltool.process import shortname, UnsupportedRequirement
27 from arvados.api import OrderedJsonModel
28
# Module-level logger for the CWL runner; defaults to INFO, lowered to WARN
# by the --quiet flag inside ArvCwlRunner.arvExecutor.
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
31
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """

    def __init__(self, api_client, work_api=None):
        # Arvados API client used for every server call in this runner.
        self.api = api_client
        # Maps Arvados object uuid -> in-flight job/container wrapper.
        # Mutated by arvExecutor and read by on_message; access is guarded
        # by self.lock / self.cond (they share the same underlying lock).
        self.processes = {}
        self.lock = threading.Lock()
        # Condition built on self.lock; on_message notifies it whenever a
        # tracked process reaches a terminal state so arvExecutor can wake up.
        self.cond = threading.Condition(self.lock)
        # Set by output_callback when the overall workflow finishes.
        self.final_output = None
        self.final_status = None
        # Upload cache: src -> pair, filled via add_uploaded / read via get_uploaded.
        self.uploaded = {}
        # Retry count passed to every .execute() call against the API.
        self.num_retries = 4
        # Uuid of the controlling cwl_runner_job, if any (set in arvExecutor).
        self.uuid = None
        self.work_api = work_api

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arvMakeTool(self, toolpath_object, **kwargs):
        """cwltool makeTool hook.

        Wrap CommandLineTool documents in ArvadosCommandTool so their jobs
        run on Arvados; let cwltool's default factory handle everything else
        (e.g. Workflow documents).
        """
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        """Record the workflow's final status/output and, when running under a
        pipeline instance, mark that instance Complete or Failed."""
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        """Websocket event handler: react to state changes of tracked processes.

        Runs on the event-subscription thread.  Marks a process as running on
        its first Running update, and on a terminal state ("Complete",
        "Failed", "Cancelled") calls its done() handler and notifies
        self.cond so the arvExecutor loop can re-check self.processes.
        """
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    # self.lock is the same lock underlying self.cond; this
                    # branch only needs mutual exclusion, not notification.
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def get_uploaded(self):
        """Return a copy of the upload cache (src -> pair)."""
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        """Record that src has been uploaded, mapping it to pair."""
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, **kwargs):
        """cwltool executor hook: run the workflow on Arvados.

        Depending on kwargs this either saves a pipeline template
        ("create_template"), submits a runner job/container ("submit") and
        optionally waits for it, or runs the workflow's steps directly while
        tracking their state via websocket events.  Returns the final output
        object (or a uuid for template/no-wait submissions); raises
        WorkflowException or UnsupportedRequirement on failure.
        """
        self.debug = kwargs.get("debug")

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        # Work is owned by the requested project, or the user's home project.
        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.pipeline = None
        self.fs_access = CollectionFsAccess(kwargs["basedir"], api_client=self.api)

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        # NOTE(review): duplicate of the assignment at the top of this method.
        self.debug = kwargs.get("debug")
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["fs_access"] = self.fs_access
        # NOTE(review): this reassignment is a no-op.
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"

        # Containers use fixed in-container paths; jobs use crunch task
        # substitution variables expanded at run time.
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                # A bare CommandLineTool can be submitted as a single
                # container; anything else needs a wrapper runner container.
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            # Fire-and-forget submission: return the uuid without tracking it.
            runnerjob.run()
            return runnerjob.uuid

        arvados.config.settings()["ARVADOS_DISABLE_WEBSOCKETS"] = "1"

        # Subscribe to state-change events for the object type we will create.
        if self.work_api == "containers":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
        if self.work_api == "jobs":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            for runnable in jobiter:
                if runnable:
                    runnable.run(**kwargs)
                else:
                    # jobiter yielded None: nothing runnable right now.  Wait
                    # for pending work to finish, or bail if there is none.
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break

            while self.processes:
                self.cond.wait(1)

            events.close()
        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                # NOTE(review): exc_info= is being passed an exception
                # instance rather than an exc_info tuple — confirm the
                # logging version in use accepts this.
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            # Cancel the submitted runner container by dropping its priority.
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        return self.final_output
228
229
def versionstring():
    """Print version string of key packages for provenance and debugging."""

    # Resolve each installed distribution, then pair it with the label we
    # report it under (the runner itself is reported as the invoked program).
    distributions = (
        pkg_resources.require("arvados-cwl-runner"),
        pkg_resources.require("arvados-python-client"),
        pkg_resources.require("cwltool"),
    )
    labels = (sys.argv[0], "arvados-python-client", "cwltool")

    parts = ["%s %s" % (label, dist[0].version)
             for label, dist in zip(labels, distributions)]
    return ", ".join(parts)
240
241
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command line parser for the Arvados CWL runner.

    Returns an argparse.ArgumentParser covering logging verbosity, job
    reuse, submit/local execution mode, wait behavior, the work submission
    API, and the positional workflow/input arguments.
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Bug fix: these two options previously had empty help="" strings, so
    # --help gave the user no indication of what they do.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
297
298
def main(args, stdout, stderr, api_client=None):
    """Command-line entry point: parse args, construct an ArvCwlRunner, and
    hand control to cwltool.main with our executor and tool factory."""
    parser = arg_parser()
    arvargs = parser.parse_args(args)

    # Creating a template with no input file supplied: give cwltool an empty
    # job order so it does not try to read one.
    job_order_object = None
    if arvargs.create_template and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

    arvargs.conformance_test = None
    arvargs.use_container = True

    run_kwargs = {
        "args": arvargs,
        "stdout": stdout,
        "stderr": stderr,
        "executor": runner.arvExecutor,
        "makeTool": runner.arvMakeTool,
        "versionfunc": versionstring,
        "job_order_object": job_order_object,
    }
    return cwltool.main.main(**run_kwargs)