Merge branch '10032-cwl-spinup' refs #10032
sdk/cwl/arvados_cwl/__init__.py
#!/usr/bin/env python

# Implement the cwl-runner interface for submitting and running work on
# Arvados, using either the Crunch jobs API or the Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import schema_salad

import arvados
import arvados.config

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
from .perf import Perf
from cwltool.pack import pack

from cwltool.process import shortname, UnsupportedRequirement
from cwltool.pathmapper import adjustFileObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """
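    # Illustrative sketch of how this class is driven (in practice main()
    # below wires it into cwltool.main.main; the call here is simplified):
    #
    #   api = arvados.api('v1', model=OrderedJsonModel())
    #   runner = ArvCwlRunner(api, work_api="jobs")
    #   output = runner.arv_executor(tool, job_order, basedir=".")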

    def __init__(self, api_client, work_api=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None

        if self.work_api is None:
            # TODO: autodetect which API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

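    # React to a state change on a watched job or container.  In this file
    # events are synthesized by poll_states() below, mirroring the shape of
    # Arvados "update" log events.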
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        while True:
            self.stop_polling.wait(15)
            if self.stop_polling.is_set():
                break
            with self.lock:
                keys = self.processes.keys()
            if not keys:
                continue

            if self.work_api == "containers":
                table = self.poll_api.containers()
            elif self.work_api == "jobs":
                table = self.poll_api.jobs()

            try:
                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.warn("Error checking states on API server: %s", e)
                continue

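            # Feed each returned record back through on_message() in the
            # same event shape it expects, as if it were an "update"
            # notification from the API server.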
            for p in proc_states["items"]:
                self.on_message({
                    "object_uuid": p["uuid"],
                    "event_type": "update",
                    "properties": {
                        "new_attributes": p
                    }
                })

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

    def arv_executor(self, tool, job_order, **kwargs):
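        """Run the given tool or workflow.

        Depending on kwargs, either hand the whole run off to a runner
        job/container on Arvados (--submit), or iterate over runnable steps
        locally, submitting each as a job or container and waiting for the
        results.
        """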
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
            return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

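        # The containers API mounts fixed paths inside the container, while
        # the jobs API uses $(task.*) placeholders that crunch substitutes
        # at runtime.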
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

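        # A pipeline instance gives a local (non-submitted) jobs-API run a
        # record on the API server; output_callback() above marks it
        # Complete or Failed when the run ends.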
        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

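        # With --no-wait, kick off the runner and return its UUID without
        # tracking progress.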
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

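        # The polling thread gets its own API client; client objects are not
        # guaranteed to be thread-safe, so it must not share the main
        # thread's.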
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()
                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked: no runnable jobs and no pending jobs to wait on.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:  # Intentionally bare so KeyboardInterrupt is handled too.
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                # Cancel the runner container request by dropping its priority.
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("compute_checksum"):
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return a version string for key packages, for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or given on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable reuse of past jobs or containers (default).")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable reuse of past jobs or containers.")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs.  If not provided, jobs go to the user's home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run the workflow runner on the local host (jobs are still submitted to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting the workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit the workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

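# Load the Arvados extension schema (arv-cwl-schema.yml) and register its
# names alongside the core CWL v1.0 names, so Arvados-specific hints
# validate instead of being rejected as unknown extensions.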
def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    _, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))

def main(args, stdout, stderr, api_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

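    # cwltool.main.main expects these attributes on the parsed args; our
    # parser does not define the corresponding options, so set them here.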
    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))