Don't pass work_api twice in ArvadosCommandTool constructor. Bump arvados-python...
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
#!/usr/bin/env python

# Implement the cwl-runner interface for submitting and running work on
# Arvados, using either the Crunch jobs API or the Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow

import arvados
import arvados.config

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .fsaccess import CollectionFsAccess

from cwltool.process import shortname, UnsupportedRequirement
from cwltool.pathmapper import adjustFileObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

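    # A minimal usage sketch (assumes an Arvados client configured via
    # ARVADOS_API_HOST/ARVADOS_API_TOKEN; `tool` and `job_order` come from
    # cwltool document loading, which is not shown here):
    #
    #   api = arvados.api('v1', model=OrderedJsonModel())
    #   runner = ArvCwlRunner(api, work_api="jobs")
    #   output = runner.arvExecutor(tool, job_order, basedir=".")
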
    def __init__(self, api_client, work_api=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api
        self.stop_polling = threading.Event()
        self.poll_api = None

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            # work_api is forwarded once, through kwargs, to the
            # ArvadosCommandTool constructor.
            kwargs["work_api"] = self.work_api
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
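
    # For a CommandLineTool, toolpath_object is the parsed CWL document; a
    # hypothetical minimal example of what reaches the CommandLineTool branch
    # above:
    #   {"class": "CommandLineTool", "baseCommand": "echo",
    #    "inputs": [], "outputs": []}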

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()
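
    # A sample "update" event as consumed by on_message (field values are
    # hypothetical; "zzzzz-8i9sb-..." follows the Arvados job uuid format):
    #   {"object_uuid": "zzzzz-8i9sb-xxxxxxxxxxxxxxx",
    #    "event_type": "update",
    #    "properties": {"new_attributes": {"state": "Complete"}}}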

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

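        # Every 15 seconds the relevant API table is listed by uuid, and each
        # row is fed back through on_message as a synthetic "update" event.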
        while True:
            self.stop_polling.wait(15)
            if self.stop_polling.is_set():
                break
            with self.lock:
                keys = self.processes.keys()
            if not keys:
                continue

            if self.work_api == "containers":
                table = self.poll_api.containers()
            elif self.work_api == "jobs":
                table = self.poll_api.jobs()

            try:
                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.warn("Error checking states on API server: %s", e)
                continue

            for p in proc_states["items"]:
                self.on_message({
                    "object_uuid": p["uuid"],
                    "event_type": "update",
                    "properties": {
                        "new_attributes": p
                    }
                })

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)
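
    # check_writable rejects any InitialWorkDir listing that asks for a
    # writable copy; e.g. this (hypothetical) fragment would raise
    # UnsupportedRequirement:
    #   {"class": "File", "location": "keep:...", "writable": True}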

    def arvExecutor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"
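            # $(task.outdir) and $(task.tmpdir) are Crunch job runtime
            # substitutions expanded on the compute node, while the containers
            # API uses the fixed in-container paths set just above.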

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            for runnable in jobiter:
                if runnable:
                    runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("compute_checksum"):
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return version strings of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)
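
# Example return value from versionstring() (the leading token is
# sys.argv[0]; version numbers are hypothetical):
#   "arvados-cwl-runner 1.0.20160601000000, arvados-python-client
#    0.1.20160601000000, cwltool 1.0.20160601000000"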


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or to the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs; if not provided, jobs will go to the user's home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
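
# Example invocations using the options defined above (workflow, input, and
# uuid values are hypothetical):
#   arvados-cwl-runner --api=containers workflow.cwl job.yml
#   arvados-cwl-runner --create-template --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx workflow.cwl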


def main(args, stdout, stderr, api_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if arvargs.create_template and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))
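

# A minimal sketch of a direct invocation; the installed console script is
# generated by packaging metadata (not shown in this file), so this __main__
# guard is illustrative rather than part of the packaged entry point.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))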