3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
11 import pkg_resources # part of setuptools
13 from cwltool.errors import WorkflowException
15 import cwltool.workflow
21 from .arvcontainer import ArvadosContainer, RunnerContainer
22 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
23 from .arvtool import ArvadosCommandTool
24 from .fsaccess import CollectionFsAccess
26 from cwltool.process import shortname, UnsupportedRequirement
27 from arvados.api import OrderedJsonModel
# Module-level logger for the whole CWL runner.  Default to INFO so job/container
# state transitions are reported without requiring --debug; --quiet raises this
# to WARN at runtime (see arvExecutor).
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.
    """

    def __init__(self, api_client, work_api=None):
        """Bind the runner to an Arvados API client.

        :param api_client: Arvados API client object used for all server calls.
        :param work_api: "jobs" or "containers"; None selects "jobs".
        :raises Exception: if work_api names an unsupported API.
        """
        self.api = api_client
        self.processes = {}         # uuid -> in-flight job/container wrapper (see on_message)
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None    # set by output_callback on completion
        self.final_status = None    # "success" or failure status from output_callback
        self.uploaded = {}          # source path -> uploaded location (see add_uploaded)
        self.num_retries = 4        # retry count applied to every API .execute()
        self.uuid = None            # uuid of our own runner job, when running as one
        self.work_api = work_api

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)
57 def arvMakeTool(self, toolpath_object, **kwargs):
58 if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
59 return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
61 return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
63 def output_callback(self, out, processStatus):
64 if processStatus == "success":
65 logger.info("Overall process status is %s", processStatus)
67 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
68 body={"state": "Complete"}).execute(num_retries=self.num_retries)
70 logger.warn("Overall process status is %s", processStatus)
72 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
73 body={"state": "Failed"}).execute(num_retries=self.num_retries)
74 self.final_status = processStatus
75 self.final_output = out
    def on_message(self, event):
        """Websocket event handler: react to state changes of tracked work.

        Only "update" events for uuids present in self.processes are acted on.
        A transition to Running (when we haven't seen it running yet) refreshes
        the pipeline component; a terminal state (Complete/Failed/Cancelled)
        triggers the wrapper's done() handling.

        NOTE(review): the original appears to synchronize this handler with
        self.lock/self.cond and to remove finished entries from self.processes;
        those lines are elided in this copy — confirm against upstream.
        """
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    j = self.processes[uuid]
                    logger.info("Job %s (%s) is Running", j.name, uuid)
                    # First Running notification: reflect it in the pipeline component.
                    j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    j = self.processes[uuid]
                    logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                    # Terminal state: let the wrapper collect outputs / report status.
                    j.done(event["properties"]["new_attributes"])
98 def get_uploaded(self):
99 return self.uploaded.copy()
101 def add_uploaded(self, src, pair):
102 self.uploaded[src] = pair
    def arvExecutor(self, tool, job_order, **kwargs):
        """Top-level executor handed to cwltool.main: submit or locally run the
        tool/workflow, wait for completion, and return the final output object.

        NOTE(review): several structural lines of this method (try/except
        framing, else branches, loop bodies, call continuations) are elided in
        this copy; comments below describe only the statements that are visible.
        """
        self.debug = kwargs.get("debug")

        if kwargs.get("quiet"):
            # --quiet: suppress INFO chatter from this runner and arv-run.
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        # Destination project defaults to the current user's home project.
        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.fs_access = CollectionFsAccess(kwargs["basedir"], api_client=self.api)

        if kwargs.get("create_template"):
            # --create-template: register a pipeline template instead of running.
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            # cwltool.main will write our return value to stdout.
            # NOTE(review): template save/return statements are elided in this copy.

        self.debug = kwargs.get("debug")  # NOTE(review): duplicate of the assignment above.
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        # Execution settings propagated to every job through kwargs.
        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            # Containers API uses fixed in-container paths.
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            # Jobs API uses crunch task substitution variables, expanded at run time.
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("submit"):
            # Wrap the workflow in a runner job/container that re-runs this
            # program on the cluster.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                    # NOTE(review): call continuation and else branches elided in this copy.
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
                    runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                    # NOTE(review): the opening of the body dict is elided in this copy.
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            # --no-wait: return the submitted runner's uuid immediately.
            return runnerjob.uuid

        arvados.config.settings()["ARVADOS_DISABLE_WEBSOCKETS"] = "1"

        # Subscribe to state-change events so on_message is notified.
        if self.work_api == "containers":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
        if self.work_api == "jobs":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        # When submitting, the only "job" to iterate is the runner itself.
        jobiter = iter((runnerjob,))

        if "cwl_runner_job" in kwargs:
            # We ARE the runner job: remember our own uuid and iterate the
            # workflow's real jobs instead.
            self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
        # NOTE(review): call continuation and the enclosing try: are elided in this copy.

        # Will continue to hold the lock for the duration of this code
        # except when in cond.wait(), at which point on_message can update
        # job state and process output callbacks.
        for runnable in jobiter:
            runnable.run(**kwargs)
        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
        while self.processes:
        # NOTE(review): cond.wait() loop body and except framing elided in this copy.
        except UnsupportedRequirement:
        if sys.exc_info()[0] is KeyboardInterrupt:
            logger.error("Interrupted, marking pipeline as failed")
        logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
        self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                             body={"state": "Failed"}).execute(num_retries=self.num_retries)
        if runnerjob and runnerjob.uuid and self.work_api == "containers":
            # Cancel the submitted runner container by dropping its priority.
            self.api.container_requests().update(uuid=runnerjob.uuid,
                                                 body={"priority": "0"}).execute(num_retries=self.num_retries)

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        return self.final_output
232 """Print version string of key packages for provenance and debugging."""
234 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
235 arvpkg = pkg_resources.require("arvados-python-client")
236 cwlpkg = pkg_resources.require("cwltool")
238 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
239 "arvados-python-client", arvpkg[0].version,
240 "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build and return the command-line parser for arvados-cwl-runner.

    Restored ``return parser`` (callers do ``parser = arg_parser()``) and
    closed the add_argument calls left unterminated in this copy.
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)  # numeric type/default restored per the help text above
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # --enable-reuse / --disable-reuse share one dest; default is to reuse.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job reuse")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    # --submit / --local share dest "submit"; submitting is the default.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    # --wait / --no-wait share dest "wait"; waiting is the default.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
def main(args, stdout, stderr, api_client=None):
    """Command-line entry point: parse args, build the runner, delegate to cwltool.main.

    :param args: argv-style list of command-line arguments.
    :param stdout: stream for normal output (forwarded to cwltool.main).
    :param stderr: stream for diagnostics (forwarded to cwltool.main).
    :param api_client: optional pre-built Arvados API client (tests inject one);
        when None, a client is constructed with an order-preserving JSON model.
    :returns: cwltool.main's exit code, or 1 if runner setup fails.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if arvargs.create_template and not arvargs.job_order:
        # Template creation doesn't need an input object; pass an empty one.
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        # Report setup failure (bad credentials, unsupported API, ...) and
        # exit nonzero rather than raising out of the CLI.
        logger.error(e)
        return 1

    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object)