8442: Debugging container --submit with crunch2
arvados.git / sdk/cwl/arvados_cwl/__init__.py
#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running jobs on Arvados.

import argparse
import logging
import os
import sys
import threading
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow

import arvados
import arvados.events

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .fsaccess import CollectionFsAccess

from cwltool.process import shortname, UnsupportedRequirement
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
    complete, and report output."""

    def __init__(self, api_client, crunch2):
        self.api = api_client
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        # True: use the Crunch v2 Containers API; False: use the Crunch v1 Jobs API.
        self.crunch2 = crunch2

    def arvMakeTool(self, toolpath_object, **kwargs):
        if toolpath_object.get("class") == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, crunch2=self.crunch2, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_output = out

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

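    # Illustrative shape of a websocket event handled by on_message(); only
    # the fields the handler actually reads are shown here, and the real
    # payload carries more (this sketch is an assumption for documentation):
    #
    #   {
    #       "object_uuid": "<uuid of the job or container>",
    #       "event_type": "update",
    #       "properties": {"new_attributes": {"state": "Running", ...}}
    #   }
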
    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") or useruuid
        self.pipeline = None

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        # Initialize so the exception handler below can safely test runnerjob
        # even when we are not in submit mode.
        runnerjob = None
        if kwargs.get("submit"):
            if self.crunch2:
                runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.crunch2:
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if kwargs.get("submit") and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

        if self.crunch2:
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
        else:
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.fs_access = CollectionFsAccess(kwargs["basedir"])

        kwargs["fs_access"] = self.fs_access
        # Make sure the key is present even if the caller did not supply it.
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")

        if self.crunch2:
            # Crunch v2 containers use fixed paths for output and scratch space.
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
        else:
            # Crunch v1 jobs substitute these placeholders at task runtime.
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("submit"):
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               docker_outdir="$(task.outdir)",
                               **kwargs)

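        # Process the job iterator: in submit mode it yields a single runner
        # job/container that re-executes this workflow on the cluster; in
        # local mode it yields each workflow step as its own Arvados job.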
        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            for runnable in jobiter:
                if runnable:
                    runnable.run(**kwargs)
                else:
                    if self.jobs:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked: no runnable jobs and not waiting on any pending jobs.")
                        break

            while self.jobs:
                self.cond.wait(1)

            events.close()
        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and self.crunch2:
                # Cancel the runner container by dropping its priority to zero.
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        return self.final_output


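# Minimal usage sketch for ArvCwlRunner (hypothetical; in practice cwltool's
# main() drives these calls through the executor/makeTool hooks wired up in
# main() below):
#
#   runner = ArvCwlRunner(arvados.api('v1'), crunch2=True)
#   tool = runner.arvMakeTool(toolpath_object)  # toolpath_object from cwltool loading
#   output = runner.arvExecutor(tool, job_order, basedir=".", submit=True, wait=True)

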
def versionstring():
    """Return version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


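# Example of the string versionstring() produces (program path and version
# numbers are illustrative, not real releases):
#
#   /usr/local/bin/arvados-cwl-runner 1.0.0, arvados-python-client 0.1.0, cwltool 1.0.0

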
def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--conformance-test", action="store_true")
    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job reuse")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs. If not provided, jobs go to the user's home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run the workflow runner on the local host (individual steps are still submitted to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--crunch1", action="store_false",
                        default=False, dest="crunch2",
                        help="Use Crunch v1 Jobs API")
    exgroup.add_argument("--crunch2", action="store_true",
                        default=False, dest="crunch2",
                        help="Use Crunch v2 Containers API")

    parser.add_argument("workflow", type=str, nargs="?", default=None)
    parser.add_argument("job_order", nargs=argparse.REMAINDER)

    return parser


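# Example invocations (workflow and input file names are hypothetical):
#
#   arvados-cwl-runner --crunch2 --submit workflow.cwl inputs.yml
#   arvados-cwl-runner --crunch1 --local workflow.cwl inputs.yml
#   arvados-cwl-runner --create-template workflow.cwl

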
def main(args, stdout, stderr, api_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if arvargs.create_template and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, crunch2=arvargs.crunch2)
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object)
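

# Convenience entry point for running this module directly. This is a sketch
# and an assumption: the installed console script may invoke main() with
# different plumbing.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))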