#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow

import arvados
import arvados.events
import arvados.config

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .fsaccess import CollectionFsAccess

from cwltool.process import shortname, UnsupportedRequirement
from cwltool.pathmapper import adjustFileObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow: submit work (using either the jobs
    or containers API), wait for it to complete, and report output.
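
    A minimal construction sketch (assumes Arvados API credentials are
    already configured in the environment):

        runner = ArvCwlRunner(arvados.api('v1'), work_api="jobs")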
    """

    def __init__(self, api_client, work_api=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api

        if self.work_api is None:
            # TODO: autodetect which API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arvMakeTool(self, toolpath_object, **kwargs):
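        """makeTool hook for cwltool: wrap CommandLineTools so they run on
        Arvados; delegate everything else to the cwltool default."""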
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warning("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
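        """Event-bus callback: track state changes of submitted processes.

        Expects an event dict carrying "object_uuid", "event_type", and the
        new object state under properties/new_attributes, as delivered by
        arvados.events.subscribe().
        """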
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    with self.cond:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
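        """Raise UnsupportedRequirement if any entry in the object tree
        requests a writable InitialWorkDir, which is not supported here."""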
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

    def arvExecutor(self, tool, job_order, **kwargs):
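        """Run a tool or workflow to completion and return its final output.

        Depending on the arguments, this may submit a runner job or
        container, create a pipeline instance for a local run, subscribe to
        job or container events, and drive the job iterator until all
        submitted processes finish.
        """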
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") or useruuid
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

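        # The two work APIs expose working directories differently: the
        # containers API uses fixed paths mounted inside the container,
        # while the jobs API uses $(task.*) placeholders that Crunch
        # substitutes at run time.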
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

        arvados.config.settings()["ARVADOS_DISABLE_WEBSOCKETS"] = "1"

        if self.work_api == "containers":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
        elif self.work_api == "jobs":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            for runnable in jobiter:
                if runnable:
                    runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break

            while self.processes:
                self.cond.wait(1)

            events.close()
        except UnsupportedRequirement:
            raise
        except:  # deliberately broad, so KeyboardInterrupt is handled below
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("compute_checksum"):
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return a version string for key packages, for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or given on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, defaults to the current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable reuse of past jobs or containers (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable reuse of past jobs or containers")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs; if not provided, work goes to the user's home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run the workflow control loop on the local host (individual jobs are still submitted to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting the workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit the workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
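
# Illustrative use of arg_parser() (the file names below are hypothetical):
#
#   args = arg_parser().parse_args(["--api=containers", "workflow.cwl", "inputs.yml"])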


def main(args, stdout, stderr, api_client=None):
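    """Command-line entry point: parse arguments, build an ArvCwlRunner,
    and delegate execution to cwltool.main.main()."""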
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if arvargs.create_template and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))
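

# Illustrative entry point, a sketch only: the installed console script
# presumably calls main() directly, so this guard matters only when the
# module is executed as a script.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))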