8442: Adding --submit support with --crunch2. General refactoring into more/smaller...
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
#!/usr/bin/env python

# Implement the cwl-runner interface for submitting and running jobs on Arvados.

import argparse
import logging
import os
import sys
import threading
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow

import arvados
import arvados.events

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
# CollectionFsAccess is referenced in arvExecutor below; it is assumed to live
# in this package's fsaccess module.
from .fsaccess import CollectionFsAccess

from cwltool.process import shortname, UnsupportedRequirement
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
    complete, and report output."""

    def __init__(self, api_client, crunch2):
        self.api = api_client
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.crunch2 = crunch2

    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, crunch2=self.crunch2, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warning("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_output = out

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    with self.cond:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()

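    # Illustrative shape of a websocket event as consumed by on_message()
    # above. The key names are taken from the accesses in that method; the
    # values shown here are hypothetical:
    #
    #   {
    #       "object_uuid": "zzzzz-8i9sb-0123456789abcde",
    #       "event_type": "update",
    #       "properties": {
    #           "new_attributes": {"state": "Running", ...}
    #       }
    #   }
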
    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.pipeline = None

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        if kwargs.get("submit"):
            if self.crunch2:
                runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.crunch2:
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if kwargs.get("submit") and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

        if self.crunch2:
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
        else:
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.fs_access = CollectionFsAccess(kwargs["basedir"])

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")

        if self.crunch2:
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
        else:
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

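        # The two branches above differ because "$(task.outdir)" and
        # "$(task.tmpdir)" are crunch v1 task-parameter substitutions resolved
        # at runtime on the compute node, while crunch v2 starts the container
        # with fixed in-container paths.
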
        if kwargs.get("submit"):
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               docker_outdir="$(task.outdir)",
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            for runnable in jobiter:
                if runnable:
                    runnable.run(**kwargs)
                else:
                    if self.jobs:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked: no runnable jobs and not waiting on any pending jobs.")
                        break

            while self.jobs:
                self.cond.wait(1)

            events.close()
        except UnsupportedRequirement:
            raise
        except:
            # Deliberately broad: also catches KeyboardInterrupt, so the
            # pipeline instance can be marked Failed before exiting.
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        return self.final_output


def versionstring():
    """Return version strings of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)

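# Example return value of versionstring() (the first field is sys.argv[0];
# the version numbers shown are hypothetical):
#
#   arvados-cwl-runner 1.0.20160518, arvados-python-client 0.1.20160518, cwltool 1.0.20160518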

def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--conformance-test", action="store_true")
    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input. Defaults to the directory of the input object file, or to the current directory if inputs are piped or given on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory; defaults to the current directory.")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error; default 20 seconds.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job reuse (default).")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job reuse (always run new jobs).")

238     parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.")
239     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
240                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
241                         default=False)
242
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--crunch1", action="store_false",
                        default=False, dest="crunch2",
                        help="Use Crunch v1 Jobs API")

    exgroup.add_argument("--crunch2", action="store_true",
                        default=False, dest="crunch2",
                        help="Use Crunch v2 Containers API")

    parser.add_argument("workflow", type=str, nargs="?", default=None)
    parser.add_argument("job_order", nargs=argparse.REMAINDER)

    return parser

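# Example invocations accepted by the parser above (workflow.cwl and job.yml
# are hypothetical input files):
#
#   arvados-cwl-runner --submit --crunch2 workflow.cwl job.yml
#   arvados-cwl-runner --local --project-uuid zzzzz-j7d0g-0123456789abcde workflow.cwl job.yml
#   arvados-cwl-runner --create-template workflow.cwl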

def main(args, stdout, stderr, api_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if arvargs.create_template and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, crunch2=arvargs.crunch2)
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object)
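

# Sketch of a console-script entry point for this module (the package's real
# setup.py wiring may differ). main() returns cwltool.main.main()'s integer
# exit code, so it can be passed straight to sys.exit():
#
#   if __name__ == "__main__":
#       sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))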