3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
11 import pkg_resources # part of setuptools
13 from cwltool.errors import WorkflowException
15 import cwltool.workflow
21 from .arvcontainer import ArvadosContainer, RunnerContainer
22 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
23 from .arvtool import ArvadosCommandTool
24 from .fsaccess import CollectionFsAccess
26 from cwltool.process import shortname, UnsupportedRequirement
27 from arvados.api import OrderedJsonModel
# Module-level logger for the cwl-runner. Default to INFO so job/container
# state transitions reported by ArvCwlRunner.on_message are visible.
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    NOTE(review): this chunk of the file has interleaved source lines
    elided by extraction; elisions are marked with "(elided)" comments.
    """

    def __init__(self, api_client, work_api=None):
        # (elided) initializations such as self.api, self.processes,
        # self.uploaded and self.num_retries are missing from this chunk;
        # later methods read them -- confirm against the full file.
        self.lock = threading.Lock()
        # Condition shared with on_message(): the executor waits on it while
        # submitted processes run and completion events arrive.
        self.cond = threading.Condition(self.lock)
        self.final_output = None   # set by output_callback at workflow end
        self.final_status = None   # processStatus string from cwltool
        # Which Arvados submission API to use: "jobs" or "containers".
        self.work_api = work_api

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arvMakeTool(self, toolpath_object, **kwargs):
        # Tool factory handed to cwltool: wrap CommandLineTools so they run
        # on Arvados; anything else falls back to cwltool's default factory.
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
        # (elided) an "else:" header line is missing from this chunk
        return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        # Invoked by cwltool when the overall workflow finishes: record the
        # final status/output and update the pipeline instance state.
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            # (elided) a guard such as "if self.pipeline:" appears to be
            # missing here -- self.pipeline is only created for local runs;
            # confirm against the full file.
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                body={"state": "Complete"}).execute(num_retries=self.num_retries)
        # (elided) "else:" branch header missing from this chunk
            # NOTE(review): logger.warn is deprecated in favor of logger.warning
            logger.warn("Overall process status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        # Websocket event handler: tracks state changes of processes this
        # runner submitted (keyed by uuid in self.processes).
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    # (elided) lock acquisition ("with self.lock:") missing
                    j = self.processes[uuid]
                    logger.info("Job %s (%s) is Running", j.name, uuid)
                    # presumably j.running is set True near here -- elided
                    j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    # (elided) lock acquisition missing from this chunk
                    j = self.processes[uuid]
                    logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                    j.done(event["properties"]["new_attributes"])
                    # (elided) likely "del self.processes[uuid]" and
                    # "self.cond.notify()" follow -- confirm against full file

    def get_uploaded(self):
        # Return a snapshot copy so callers cannot mutate the shared map.
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        # Record that local path `src` was uploaded as `pair`.
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, **kwargs):
        # Main executor handed to cwltool.main: set up submission options,
        # optionally submit a runner job/container, then iterate jobs until
        # the workflow completes and return its final output.
        self.debug = kwargs.get("debug")

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        # Work is owned by --project-uuid if given, else the user's home.
        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.fs_access = CollectionFsAccess(kwargs["basedir"], api_client=self.api)

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            # (elided) presumably tmpl.save() and a "return tmpl.uuid" here
            # cwltool.main will write our return value to stdout.

        self.debug = kwargs.get("debug")  # NOTE(review): duplicate of the assignment above
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        # Options forwarded to each tool's job() factory.
        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"

        # Containers use fixed in-container paths; jobs use crunch runtime
        # substitution variables.
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        # (elided) "runnerjob = None" presumably initialized here
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         # (elided) remaining args/".next()" missing
                    # (elided) "else:" branch header missing from this chunk
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            # (elided) "else:" branch header missing from this chunk
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                # (elided) "body={" line missing from this chunk
                "owner_uuid": self.project_uuid,
                "name": shortname(tool.tool["id"]),
                # (elided) "components": {} line presumably missing here
                "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            # (elided) presumably runnerjob.run() before returning the uuid
            return runnerjob.uuid

        # Use polling instead of websockets when they are unreliable.
        arvados.config.settings()["ARVADOS_DISABLE_WEBSOCKETS"] = "1"

        # Subscribe to state-change events for the object type we submit.
        if self.work_api == "containers":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
        if self.work_api == "jobs":
            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        # (elided) an "if runnerjob:" guard presumably precedes this
        jobiter = iter((runnerjob,))
        # (elided) "else:" branch presumably missing here
        if "cwl_runner_job" in kwargs:
            self.uuid = kwargs.get("cwl_runner_job").get('uuid')
        jobiter = tool.job(job_order,
                           self.output_callback,
                           # (elided) remaining args of tool.job() missing

        # (elided) "try:" / "self.cond.acquire()" presumably missing here
        # Will continue to hold the lock for the duration of this code
        # except when in cond.wait(), at which point on_message can update
        # job state and process output callbacks.

        for runnable in jobiter:
            # (elided) "if runnable:" guard presumably missing here
            runnable.run(**kwargs)
            # (elided) else/deadlock-detection branches missing from chunk
            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")

        # Block until every submitted process has reported completion.
        while self.processes:
            # (elided) "self.cond.wait(1)" presumably missing here

        except UnsupportedRequirement:
            # (elided) "raise" presumably missing here
        # (elided) "except:" header presumably missing here
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            # (elided) "else:" branch header missing from this chunk
                logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            # (elided) "if self.pipeline:" guard presumably missing here
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                body={"state": "Failed"}).execute(num_retries=self.num_retries)
            # Cancel the submitted runner container on failure/interrupt.
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                    body={"priority": "0"}).execute(num_retries=self.num_retries)
        # (elided) "finally:" with cond.release()/events.close() presumably here

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        return self.final_output
    # NOTE(review): the enclosing "def versionstring():" header line is not
    # visible in this chunk; the lines below are its body.
    """Print version string of key packages for provenance and debugging."""

    # Resolve installed distributions via setuptools' pkg_resources.
    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    # e.g. "arvados-cwl-runner 1.0, arvados-python-client 0.1, cwltool 1.0"
    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command-line parser for the Arvados CWL executor.

    NOTE(review): some help= keyword lines and the trailing "return parser"
    are elided from this chunk; elisions are marked below.
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        # (elided) type=/default= lines missing from this chunk
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    # Verbosity options are mutually exclusive.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Job reuse on/off; both write the same dest, reuse enabled by default.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         # (elided) help= line missing from this chunk
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         # (elided) help= line missing from this chunk

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        # (elided) default= line missing from this chunk

    # Submission mode: submit (default), run locally, or just make a template.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    # Whether to wait for a submitted runner job to finish.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
    # (elided) "return parser" presumably follows -- missing from this chunk
def main(args, stdout, stderr, api_client=None):
    """Command-line entry point.

    Parses arguments, constructs an ArvCwlRunner, and delegates to
    cwltool.main with Arvados-specific executor/tool factories.
    NOTE(review): several interleaved lines (a "try:" header, the except
    body, and stdout/stderr kwargs) are elided from this chunk.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if arvargs.create_template and not arvargs.job_order:
        # Creating a pipeline template does not require an input object.
        job_order_object = ({}, "")

    # (elided) a "try:" presumably wraps the next lines (an except follows)
    if api_client is None:
        api_client=arvados.api('v1', model=OrderedJsonModel())
    runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        # (elided) error-reporting/return body missing from this chunk

    # Options cwltool.main expects but which do not apply here.
    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             # (elided) stdout=/stderr= kwargs missing from this chunk
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object)