9570: Support Directory and file literal features. Support
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
5
6 import argparse
7 import logging
8 import os
9 import sys
10 import threading
11 import pkg_resources  # part of setuptools
12
13 from cwltool.errors import WorkflowException
14 import cwltool.main
15 import cwltool.workflow
16
17 import arvados
18 import arvados.events
19
20 from .arvcontainer import ArvadosContainer, RunnerContainer
21 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
22 from .arvtool import ArvadosCommandTool
23 from .fsaccess import CollectionFsAccess
24
25 from cwltool.process import shortname, UnsupportedRequirement
26 from arvados.api import OrderedJsonModel
27
28 logger = logging.getLogger('arvados.cwl-runner')
29 logger.setLevel(logging.INFO)
30
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """

    def __init__(self, api_client, work_api=None):
        # Arvados API client used for all requests to the API server.
        self.api = api_client
        # Map of Arvados object uuid -> in-flight job/container wrapper.
        self.processes = {}
        self.lock = threading.Lock()
        # Condition built on self.lock: on_message notifies it when a
        # process reaches a terminal state; arvExecutor waits on it.
        self.cond = threading.Condition(self.lock)
        # Set by output_callback when the overall workflow finishes.
        self.final_output = None
        self.final_status = None
        # Cache of already-uploaded files (see get_uploaded/add_uploaded).
        self.uploaded = {}
        # Retry count passed to every API call via execute(num_retries=...).
        self.num_retries = 4
        self.uuid = None
        # Work submission API: "jobs" or "containers".
        self.work_api = work_api

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arvMakeTool(self, toolpath_object, **kwargs):
        """makeTool hook for cwltool: wrap CommandLineTools so they run on
        Arvados; defer every other process type to cwltool's default factory.
        """
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        """Record the final workflow output and status; when running under a
        pipeline instance, mark it Complete or Failed accordingly.
        """
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        """Event-stream callback: propagate job/container state changes to the
        corresponding wrapper object in self.processes.
        """
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        # NOTE(review): presumably done() records outputs and
                        # drops the entry from self.processes -- confirm in
                        # arvjob/arvcontainer.
                        j.done(event["properties"]["new_attributes"])
                        # Wake arvExecutor, which is waiting on self.cond.
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def get_uploaded(self):
        """Return a copy of the uploaded-files cache."""
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        """Record that local path `src` was uploaded as `pair`."""
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, **kwargs):
        """cwltool executor hook: run `tool` with `job_order` and return the
        final output object.

        Depending on kwargs this either saves a pipeline template
        ("create_template"), submits a runner job/container ("submit"), or
        iterates the workflow's runnable steps directly, waiting on
        self.cond until every submitted process completes.
        """
        self.debug = kwargs.get("debug")

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        # Work is owned by --project-uuid if given, else the current user.
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.pipeline = None

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        self.debug = kwargs.get("debug")
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.fs_access = CollectionFsAccess(kwargs["basedir"])

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")

        # Output/temp paths inside the work environment differ between the
        # containers API (fixed paths) and the jobs API (task substitutions).
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    # A single tool can be submitted directly as a container
                    # request, without a wrapper runner.
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            # --no-wait: submit and return the uuid without monitoring.
            runnerjob.run()
            return runnerjob.uuid

        # Subscribe to state-change events for the relevant object type so
        # on_message gets called as work progresses.
        if self.work_api == "containers":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
        if self.work_api == "jobs":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               docker_outdir="$(task.outdir)",
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            for runnable in jobiter:
                if runnable:
                    runnable.run(**kwargs)
                else:
                    # jobiter yielded None: nothing is runnable yet.  Wait for
                    # a pending process to make progress; if there are none,
                    # the workflow cannot advance.
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break

            while self.processes:
                self.cond.wait(1)

            events.close()
        except UnsupportedRequirement:
            raise
        except:
            # Intentionally bare so KeyboardInterrupt is also handled here.
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                # Cancel the submitted container request by zeroing priority.
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        return self.final_output
218
219
def versionstring():
    """Print version string of key packages for provenance and debugging."""

    runner_dist = pkg_resources.require("arvados-cwl-runner")
    client_dist = pkg_resources.require("arvados-python-client")
    cwltool_dist = pkg_resources.require("cwltool")

    fields = (sys.argv[0], runner_dist[0].version,
              "arvados-python-client", client_dist[0].version,
              "cwltool", cwltool_dist[0].version)
    return "%s %s, %s %s, %s %s" % fields
230
231
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the argparse parser for the arvados-cwl-runner command line."""
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument(
        "--basedir", type=str,
        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument(
        "--outdir", type=str, default=os.path.abspath('.'),
        help="Output directory, default current directory")

    parser.add_argument(
        "--eval-timeout", type=float, default=20,
        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.")
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    # Logging verbosity: at most one of these may be given.
    verbosity = parser.add_mutually_exclusive_group()
    verbosity.add_argument("--verbose", action="store_true", help="Default logging")
    verbosity.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    verbosity.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Job reuse on/off toggle sharing a single destination.
    reuse = parser.add_mutually_exclusive_group()
    reuse.add_argument("--enable-reuse", action="store_true",
                       default=True, dest="enable_reuse",
                       help="")
    reuse.add_argument("--disable-reuse", action="store_false",
                       default=True, dest="enable_reuse",
                       help="")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        default=False,
                        help="Ignore Docker image version when deciding whether to reuse past jobs.")

    # Submit to Arvados (default) vs. run locally; --create-template is
    # mutually exclusive with both.
    submission = parser.add_mutually_exclusive_group()
    submission.add_argument("--submit", action="store_true",
                            default=True, dest="submit",
                            help="Submit workflow to run on Arvados.")
    submission.add_argument("--local", action="store_false",
                            default=True, dest="submit",
                            help="Run workflow on local host (submits jobs to Arvados).")
    submission.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    # Whether to monitor the submitted runner job until completion.
    waiting = parser.add_mutually_exclusive_group()
    waiting.add_argument("--wait", action="store_true",
                         default=True, dest="wait",
                         help="After submitting workflow runner job, wait for completion.")
    waiting.add_argument("--no-wait", action="store_false",
                         default=True, dest="wait",
                         help="Submit workflow runner job and exit.")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    # REMAINDER: everything after the workflow path is passed through as the
    # input object arguments.
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
287
288
def main(args, stdout, stderr, api_client=None):
    """Command-line entry point: parse arguments, construct an ArvCwlRunner,
    and delegate execution to cwltool's main driver."""
    arvargs = arg_parser().parse_args(args)

    # When creating a template without an explicit input object, hand
    # cwltool an empty job order so it does not try to load one.
    if arvargs.create_template and not arvargs.job_order:
        job_order_object = ({}, "")
    else:
        job_order_object = None

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object)