sdk/cwl/arvados_cwl/__init__.py
#!/usr/bin/env python

# Implement the cwl-runner interface for submitting and running work on
# Arvados, using either the Crunch jobs API or the Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow

import arvados
import arvados.config

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .fsaccess import CollectionFsAccess
from .arvworkflow import make_workflow
from .perf import Perf

from cwltool.process import shortname, UnsupportedRequirement
from cwltool.pathmapper import adjustFileObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs
    or containers API), wait for it to complete, and report output.

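    A minimal sketch of the typical wiring, mirroring main() below
    (argument values are illustrative):

        runner = ArvCwlRunner(arvados.api('v1'), work_api="jobs")
        cwltool.main.main(args=arvargs,
                          executor=runner.arvExecutor,
                          makeTool=runner.arvMakeTool)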
    """

    def __init__(self, api_client, work_api=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api
        self.stop_polling = threading.Event()
        self.poll_api = None

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

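    # makeTool hook for cwltool: CommandLineTools run on Arvados (as jobs
    # or containers, per work_api); anything else, such as Workflow or
    # ExpressionTool, falls back to cwltool's default implementation.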
    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            kwargs["work_api"] = self.work_api
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warning("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

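    # Handle a state-change event for a tracked job or container.
    # poll_states() below feeds records through here as synthetic "update"
    # events of the form:
    #
    #     {"object_uuid": "...", "event_type": "update",
    #      "properties": {"new_attributes": {"state": "Running", ...}}}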
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    with self.cond:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(logger, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

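        # Wake every 15 seconds (or immediately when stop_polling is set),
        # fetch current state for all tracked UUIDs in a single list() call,
        # and route each record through on_message() as an "update" event.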
        while True:
            self.stop_polling.wait(15)
            if self.stop_polling.is_set():
                break
            with self.lock:
                keys = self.processes.keys()
            if not keys:
                continue

            if self.work_api == "containers":
                table = self.poll_api.containers()
            elif self.work_api == "jobs":
                table = self.poll_api.jobs()

            try:
                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.warning("Error checking states on API server: %s", e)
                continue

            for p in proc_states["items"]:
                self.on_message({
                    "object_uuid": p["uuid"],
                    "event_type": "update",
                    "properties": {
                        "new_attributes": p
                    }
                })

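    # Upload cache: maps a local source path to the record the uploader
    # stored for it, so the same file is not re-uploaded within one run.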
    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

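    # Walk a tool document and reject InitialWorkDir entries marked
    # "writable: true", which this runner does not support.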
    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

    def arvExecutor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        if self.debug:
            logger.setLevel(logging.DEBUG)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
            return make_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

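        # The two APIs expose different filesystem layouts to the tool:
        # containers use fixed paths inside the container, while the jobs
        # API uses Crunch task variables substituted at runtime.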
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

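        # With --no-wait, submit the runner job and return its UUID without
        # waiting for the workflow to complete.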
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            for runnable in jobiter:
                if runnable:
                    with Perf(logger, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("compute_checksum"):
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return version strings of key packages, for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or given on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory; defaults to the current directory.")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (the default).")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse.")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs; if not provided, jobs go to the user's home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados (default).",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Control workflow from the local host (individual jobs are still submitted to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update the existing Arvados workflow with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting the workflow runner job, wait for completion (default).",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit the workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser


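# Typical invocations via the package's console script (workflow and input
# file names are illustrative):
#
#   arvados-cwl-runner --api=containers workflow.cwl job.yml
#   arvados-cwl-runner --no-wait --project-uuid=UUID workflow.cwl job.yml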
def main(args, stdout, stderr, api_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

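    # cwltool.main expects these attributes on the parsed args, but this
    # runner's arg_parser() does not define them, so set them here.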
    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))