Merge branch '9615-cwl-writable-unsupported' refs #9615
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

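# Example command-line usage, assuming the package is installed as the
# `arvados-cwl-runner` console script (the workflow and input filenames are
# illustrative):
#
#   arvados-cwl-runner --api=containers workflow.cwl inputs.yml
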
import argparse
import logging
import os
import sys
import threading
import hashlib
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow

import arvados
import arvados.events
import arvados.config

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .fsaccess import CollectionFsAccess

from cwltool.process import shortname, UnsupportedRequirement
from cwltool.pathmapper import adjustFileObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs
    or containers API), wait for it to complete, and report output.
    """

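    # A minimal usage sketch (hedged; credentials are assumed to come from
    # the standard Arvados environment variables, and `tool`/`job_order`
    # from cwltool's loader -- see main() below for the real wiring):
    #
    #   api = arvados.api('v1', model=OrderedJsonModel())
    #   runner = ArvCwlRunner(api, work_api="jobs")
    #   output = runner.arvExecutor(tool, job_order, basedir=".")
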
    def __init__(self, api_client, work_api=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arvMakeTool(self, toolpath_object, **kwargs):
        if toolpath_object.get("class") == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warning("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    with self.cond:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        # Wake arvExecutor, which waits on this condition
                        # while submitted processes are still pending.
                        self.cond.notify()

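    # For reference, a hedged sketch of the websocket event shape consumed by
    # on_message() above (field values are illustrative, not real UUIDs):
    #
    #   {"object_uuid": "zzzzz-8i9sb-0123456789abcde",
    #    "event_type": "update",
    #    "properties": {"new_attributes": {"state": "Complete", ...}}}
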
    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        elif isinstance(obj, list):
            for v in obj:
                self.check_writable(v)
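
    # Hedged illustration: an InitialWorkDirRequirement listing entry such as
    #   {"class": "File", "location": "keep:...", "writable": True}
    # anywhere in the tool document would trip the check above.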

    def arvExecutor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") or useruuid
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            # Containers run with fixed paths inside the container.
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            # Jobs use Crunch task substitution variables, expanded at runtime.
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

        arvados.config.settings()["ARVADOS_DISABLE_WEBSOCKETS"] = "1"

        if self.work_api == "containers":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
        elif self.work_api == "jobs":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            for runnable in jobiter:
                if runnable:
                    runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break

            while self.processes:
                self.cond.wait(1)

            events.close()
        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info() if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("compute_checksum"):
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)

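# Hedged example of the string versionstring() produces (version numbers are
# illustrative):
#
#   /usr/bin/arvados-cwl-runner 1.0.0, arvados-python-client 0.1.0, cwltool 1.0.0
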

def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input. Defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default).")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse.")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs. If not provided, jobs go to the user's home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

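# Hedged usage sketch of arg_parser() (filenames are illustrative):
#
#   arvargs = arg_parser().parse_args(["--api=containers", "wf.cwl", "inputs.yml"])
#   assert arvargs.work_api == "containers"
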

def main(args, stdout, stderr, api_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if arvargs.create_template and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))
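

# A minimal entry-point sketch, assuming this module is executed directly;
# the installed `arvados-cwl-runner` console script is the usual entry point.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))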