3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
12 from functools import partial
13 import pkg_resources # part of setuptools
15 from cwltool.errors import WorkflowException
17 import cwltool.workflow
23 from .arvcontainer import ArvadosContainer, RunnerContainer
24 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
25 from .arvtool import ArvadosCommandTool
26 from .fsaccess import CollectionFsAccess
28 from cwltool.process import shortname, UnsupportedRequirement
29 from cwltool.pathmapper import adjustFileObjs
30 from cwltool.draft2tool import compute_checksums
31 from arvados.api import OrderedJsonModel
# Module-level logger for arvados-cwl-runner; defaults to INFO
# (arvExecutor lowers it to WARN when --quiet is given).
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.
    """

    def __init__(self, api_client, work_api=None):
        # Arvados API client used for every service call below.
        self.api = api_client
        # Maps submitted job/container uuid -> runner object; read and
        # updated by on_message() as state-change events arrive.
        self.processes = {}
        self.lock = threading.Lock()
        # arvExecutor blocks in cond.wait(); on_message() notifies it.
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        # Cache of already-uploaded files (see get_uploaded/add_uploaded).
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)
61 def arvMakeTool(self, toolpath_object, **kwargs):
62 if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
63 return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
65 return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
67 def output_callback(self, out, processStatus):
68 if processStatus == "success":
69 logger.info("Overall process status is %s", processStatus)
71 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
72 body={"state": "Complete"}).execute(num_retries=self.num_retries)
74 logger.warn("Overall process status is %s", processStatus)
76 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
77 body={"state": "Failed"}).execute(num_retries=self.num_retries)
78 self.final_status = processStatus
79 self.final_output = out
81 def on_message(self, event):
82 if "object_uuid" in event:
83 if event["object_uuid"] in self.processes and event["event_type"] == "update":
84 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
85 uuid = event["object_uuid"]
87 j = self.processes[uuid]
88 logger.info("Job %s (%s) is Running", j.name, uuid)
90 j.update_pipeline_component(event["properties"]["new_attributes"])
91 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
92 uuid = event["object_uuid"]
95 j = self.processes[uuid]
96 logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
97 j.done(event["properties"]["new_attributes"])
102 def get_uploaded(self):
103 return self.uploaded.copy()
105 def add_uploaded(self, src, pair):
106 self.uploaded[src] = pair
108 def check_writable(self, obj):
109 if isinstance(obj, dict):
110 if obj.get("writable"):
111 raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
112 for v in obj.itervalues():
113 self.check_writable(v)
114 if isinstance(obj, list):
116 self.check_writable(v)
    def arvExecutor(self, tool, job_order, **kwargs):
        """Executor entry point handed to cwltool.main.

        Configures per-API defaults, optionally submits a runner
        job/container, iterates the job stream until completion, and
        returns the final output object.

        NOTE(review): this copy of the method appears to have lines elided
        (missing else:/try: scaffolding and parts of several calls are
        marked below) -- compare against the upstream file before editing.
        """
        self.debug = kwargs.get("debug")

        # Reject InitialWorkDir 'writable: true' anywhere in the tool early.
        tool.visit(self.check_writable)

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        # Submitted work is owned by --project-uuid if given, else the user.
        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid

        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            # cwltool.main will write our return value to stdout.
            # NOTE(review): saving the template and returning its uuid
            # appear to be elided here.

        self.debug = kwargs.get("debug")
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        # Defaults propagated into every job cwltool constructs.
        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        # Containers API uses fixed in-container paths; jobs API uses
        # crunch task substitution variables expanded at run time.
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                    # NOTE(review): the remainder of this tool.job(...) call
                    # and the else: branches for the two fallbacks below
                    # appear to be elided here.
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                    # NOTE(review): the opening body={ line appears elided.
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            # NOTE(review): a runnerjob.run() call appears to be elided here.
            return runnerjob.uuid

        # presumably tells the events library to poll rather than use
        # websockets -- TODO confirm against arvados.events
        arvados.config.settings()["ARVADOS_DISABLE_WEBSOCKETS"] = "1"

        # Subscribe on_message() to state changes of the relevant type.
        if self.work_api == "containers":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
        if self.work_api == "jobs":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

            # NOTE(review): the `if runnerjob:` / `else:` structure around
            # the next lines appears elided: a submitted runner is the only
            # "job" to iterate, otherwise run the tool's own job stream.
            jobiter = iter((runnerjob,))

            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,

            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            for runnable in jobiter:
                    runnable.run(**kwargs)

                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")

            # Wait for any still-pending processes to reach a final state
            # (on_message notifies the condition as they finish).
            while self.processes:

        except UnsupportedRequirement:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                # Best effort: cancel the submitted runner container.
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("compute_checksum"):
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output
253 """Print version string of key packages for provenance and debugging."""
255 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
256 arvpkg = pkg_resources.require("arvados-python-client")
257 cwlpkg = pkg_resources.require("cwltool")
259 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
260 "arvados-python-client", arvpkg[0].version,
261 "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the arvados-cwl-runner command line parser."""
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # --enable-reuse / --disable-reuse share one dest; default is to reuse.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    # REMAINDER: everything after the workflow path is the input object.
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
def main(args, stdout, stderr, api_client=None):
    """Command line entry point: parse args, build the runner, and hand
    control to cwltool.main with our executor and tool factory."""
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if arvargs.create_template and not arvargs.job_order:
        # Template creation needs no input object; give cwltool an empty one.
        job_order_object = ({}, "")

    try:
        if api_client is None:
            # OrderedJsonModel preserves key order in API responses.
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))