#!/usr/bin/env python
# [arvados.git] sdk/cwl/arvados_cwl/__init__.py

# Implement the cwl-runner interface for submitting and running work on
# Arvados, using either the Crunch jobs API or the Crunch containers API.

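# Example invocation (a sketch: assumes this package's console script is
# installed as "arvados-cwl-runner" and that ARVADOS_API_HOST and
# ARVADOS_API_TOKEN are set in the environment; the flags are defined in
# arg_parser() below):
#
#   arvados-cwl-runner --api=containers --project-uuid=<project_uuid> workflow.cwl inputs.yml
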
import argparse
import logging
import os
import sys
import threading
import hashlib
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow

import arvados
import arvados.config

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .fsaccess import CollectionFsAccess
from .arvworkflow import make_workflow

from cwltool.process import shortname, UnsupportedRequirement
from cwltool.pathmapper import adjustFileObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs
    API or the containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client, work_api=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api
        self.stop_polling = threading.Event()
        self.poll_api = None

        if self.work_api is None:
            # TODO: autodetect which API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arvMakeTool(self, toolpath_object, **kwargs):
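        """Return an ArvadosCommandTool for CommandLineTool objects; defer
        everything else (e.g. Workflow) to cwltool's default factory."""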
        if toolpath_object.get("class") == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
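        """Record the final output and status of the overall process, and
        mark the pipeline instance (if any) Complete or Failed."""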
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
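        """Handle a state-change event for a tracked job or container (here,
        events are synthesized by poll_states())."""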
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    with self.cond:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()

    def poll_states(self):
        """Poll the status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        while True:
            self.stop_polling.wait(15)
            if self.stop_polling.is_set():
                break
            with self.lock:
                keys = self.processes.keys()
            if not keys:
                continue

            if self.work_api == "containers":
                table = self.poll_api.containers()
            elif self.work_api == "jobs":
                table = self.poll_api.jobs()

            try:
                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.warn("Error checking states on API server: %s", e)
                continue

            for p in proc_states["items"]:
                self.on_message({
                    "object_uuid": p["uuid"],
                    "event_type": "update",
                    "properties": {
                        "new_attributes": p
                    }
                })

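    # self.uploaded caches upload results (source path -> uploaded pair) so
    # that files referenced more than once are only uploaded once.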
    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
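        """Raise UnsupportedRequirement if any file or directory in the
        object tree is marked writable; InitialWorkDir's 'writable: true'
        is not supported."""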
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

    def arvExecutor(self, tool, job_order, **kwargs):
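        """Top-level executor: depending on keyword arguments, create a
        pipeline template or workflow record and return its UUID, or submit
        and/or run the workflow and return its final output object."""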
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") or useruuid
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
            return make_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

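        # The containers API uses fixed paths inside the container, while
        # Crunch jobs use $(task.*) placeholders that are substituted at
        # runtime on the worker node.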
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create a pipeline instance to track the local run.
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            for runnable in jobiter:
                if runnable:
                    runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("compute_checksum"):
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return a version string for key packages, for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
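    """Build the argparse parser for the arvados-cwl-runner command line."""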
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or given on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs. If not provided, jobs go to the user's home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
    exgroup.add_argument("--update-workflow", type=str, help="Update an existing Arvados workflow.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser


def main(args, stdout, stderr, api_client=None):
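    """Command line entry point: parse arguments, construct an ArvCwlRunner,
    and delegate execution to cwltool.main with Arvados-specific hooks."""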
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))
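

# A minimal sketch of direct invocation; normally the console script
# installed with this package provides the entry point.  (Assumption:
# main() returns an integer exit status, as cwltool.main.main does.)
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))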