#!/usr/bin/env python

# Implement the cwl-runner interface for submitting and running work on
# Arvados, using either the Crunch jobs API or the Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import schema_salad

import arvados
import arvados.config

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
from .perf import Perf
from cwltool.pack import pack

from cwltool.process import shortname, UnsupportedRequirement
from cwltool.pathmapper import adjustFileObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs
    or containers API), wait for it to complete, and report output.
    """

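    # In normal use this class is not instantiated directly: main() below
    # wires an instance into cwltool.main as both the executor
    # (arv_executor) and the tool factory (arv_make_tool).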
    def __init__(self, api_client, work_api=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api
        self.stop_polling = threading.Event()
        self.poll_api = None

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        if toolpath_object.get("class") == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif toolpath_object.get("class") == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

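    # Called by cwltool when the top-level process finishes: record the final
    # output and status, and mirror the result onto the pipeline instance, if
    # one was created for a local run.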
    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warning("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

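    # Handle a state-change event for a tracked job or container, as
    # synthesized by poll_states() below.  A transition to "Running" updates
    # the pipeline component; terminal states ("Complete", "Failed",
    # "Cancelled") finalize the process and wake arv_executor() via the
    # condition variable.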
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    with self.cond:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(logger, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

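        # Wake every 15 seconds (or immediately when stop_polling is set) and
        # batch-query the state of all tracked processes in one API call.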
        while True:
            self.stop_polling.wait(15)
            if self.stop_polling.is_set():
                break
            with self.lock:
                keys = self.processes.keys()
            if not keys:
                continue

            if self.work_api == "containers":
                table = self.poll_api.containers()
            elif self.work_api == "jobs":
                table = self.poll_api.jobs()

            try:
                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.warning("Error checking states on API server: %s", e)
                continue

            for p in proc_states["items"]:
                self.on_message({
                    "object_uuid": p["uuid"],
                    "event_type": "update",
                    "properties": {
                        "new_attributes": p
                    }
                })

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") or useruuid
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
            return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

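        # Choose output/temp directory conventions for the selected API: the
        # containers API uses fixed paths inside the container, while the
        # jobs API uses Crunch task substitution variables expanded at
        # runtime.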
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

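        # In submit mode, wrap the workflow in a runner process (a container
        # request or a job) that runs on Arvados instead of being
        # orchestrated from the local host.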
        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

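        # Start the background thread that polls process states and feeds
        # them to on_message().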
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            for runnable in jobiter:
                if runnable:
                    with Perf(logger, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("compute_checksum"):
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return version strings of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs; if not provided, jobs go to the user's home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
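
# Typical invocations accepted by the parser above (illustrative sketch; the
# "arvados-cwl-runner" command name is the assumed console-script entry
# point):
#
#   arvados-cwl-runner --api=containers workflow.cwl job.yml
#   arvados-cwl-runner --submit --no-wait workflow.cwl job.yml
#   arvados-cwl-runner --create-template workflow.cwl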
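# Register the Arvados extension schema with cwltool's v1.0 schema so that
# Arvados-specific hints (such as RunInSingleContainer) are recognized during
# validation.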
def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["https://w3id.org/cwl/arv-cwl-schema.yml"] = res.read()
    res.close()
    _, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("https://w3id.org/cwl/arv-cwl-schema.yml", cache=cache)
    for n in extnames.names:
        cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))

def main(args, stdout, stderr, api_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

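    # Conformance-test mode is not supported, and tools always run inside
    # containers on Arvados, so pin these settings before handing control to
    # cwltool.main.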
    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))
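
# Illustrative sketch of how main() is expected to be invoked; the installed
# console script is assumed to do the equivalent of:
#
#   if __name__ == "__main__":
#       sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))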