# Implements the cwl-runner interface for submitting and running work on
# Arvados, using either the Crunch jobs API or the Crunch containers API.

import argparse
import logging
import os
import sys
import threading
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow

import schema_salad

import arvados

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
from .perf import Perf
from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement
from cwltool.pathmapper import adjustFileObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs
    API or the containers API), wait for it to complete, and report output.

    """
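
    # Typical wiring (a sketch; main() below normally hands these methods to
    # cwltool.main.main() rather than calling them directly):
    #
    #   api = arvados.api('v1', model=OrderedJsonModel())
    #   runner = ArvCwlRunner(api, work_api="jobs")
    #   # cwltool then calls runner.arv_make_tool() to construct each tool
    #   # and runner.arv_executor(tool, job_order, **kwargs) to execute it.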

    def __init__(self, api_client, work_api=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.work_api = work_api
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None

        if self.work_api is None:
            # TODO: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)
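
    # cwltool calls this factory for each CWL process object it loads.
    # CommandLineTools and Workflows get Arvados-aware implementations;
    # anything else (e.g. ExpressionTool) falls back to the cwltool default.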
    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
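
    # Handles state-change events for submitted jobs/containers (synthesized
    # by poll_states below), updating per-process bookkeeping and waking the
    # waiting arv_executor thread through self.cond when a process finishes.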
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid,
                                    event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
        while True:
            self.stop_polling.wait(15)
            if self.stop_polling.is_set():
                break
            with self.lock:
                keys = self.processes.keys()
            if not keys:
                continue
            if self.work_api == "containers":
                table = self.poll_api.containers()
            elif self.work_api == "jobs":
                table = self.poll_api.jobs()
            try:
                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.warn("Error checking states on API server: %s", e)
                continue
            for p in proc_states["items"]:
                self.on_message({
                    "object_uuid": p["uuid"],
                    "event_type": "update",
                    "properties": {
                        "new_attributes": p
                    }
                })

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair
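
    # This runner stages inputs read-only (writable InitialWorkDir entries
    # are not supported), so walk the tool document up front and fail early
    # with UnsupportedRequirement rather than at runtime.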
    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
            return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)
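
        # jobiter now yields either the single runner job (submit mode) or
        # each ready-to-run workflow step (local mode); drain it while
        # holding self.cond so on_message can signal completions.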
        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.
            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()
                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)
        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("compute_checksum"):
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output
302 """Print version string of key packages for provenance and debugging."""
304 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
305 arvpkg = pkg_resources.require("arvados-python-client")
306 cwlpkg = pkg_resources.require("cwltool")
308 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
309 "arvados-python-client", arvpkg[0].version,
310 "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID",
                        help="Project that will own the workflow jobs. If not provided, jobs go to the user's home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser


def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()

    _, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
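
# add_arv_hints() must run before any CWL document is validated so that
# hints in the http://arvados.org/cwl# namespace pass schema checking;
# main() below calls it before loading the workflow.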


def main(args, stdout, stderr, api_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))
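

if __name__ == "__main__":
    # Convenience entry point for ad-hoc runs; the installed
    # arvados-cwl-runner console script is the usual way to invoke this
    # (a packaging assumption, not shown in this module).
    sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))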