# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
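#
# A typical invocation (assuming this package installs an "arvados-cwl-runner"
# console script that calls main() below) looks something like:
#
#   arvados-cwl-runner --api=containers --project-uuid=zzzzz-j7d0g-xxxxxxxxxxxxxxx workflow.cwl job.yml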
import argparse
import copy
import json
import logging
import os
import sys
import threading
from functools import partial
import pkg_resources  # part of setuptools

import arvados
import arvados.collection

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
import schema_salad.schema

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
from .perf import Perf
from .pathmapper import InitialWorkDirPathMapper

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.
    """

    def __init__(self, api_client, work_api=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.work_api = work_api
        self.stop_polling = threading.Event()
        self.final_output_collection = None

        if self.work_api is None:
            # todo: autodetect API to use.
            self.work_api = "jobs"

        if self.work_api not in ("containers", "jobs"):
            raise Exception("Unsupported API '%s'" % self.work_api)

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
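
    # Invoked by cwltool when the top-level process finishes: records the
    # final status and output object and, when a pipeline instance was
    # created for a local run, marks it Complete or Failed.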
    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
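
    # Handle a state-change event for one of the submitted jobs or containers
    # (events are synthesized by poll_states below): log the transition and,
    # on a final state, let the process record its result via done().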
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    j = self.processes[uuid]
                    logger.info("Job %s (%s) is Running", j.name, uuid)
                    j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    j = self.processes[uuid]
                    logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                    with Perf(metrics, "done %s" % j.name):
                        j.done(event["properties"]["new_attributes"])

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
        while True:
            self.stop_polling.wait(15)
            if self.stop_polling.is_set():
                break

            keys = self.processes.keys()

            if self.work_api == "containers":
                table = self.poll_api.containers()
            elif self.work_api == "jobs":
                table = self.poll_api.jobs()

            try:
                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.warn("Error checking states on API server: %s", e)
                continue
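
            # Feed each retrieved record through on_message() as a synthetic
            # "update" event, mirroring the shape of the events handled above.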
            for p in proc_states["items"]:
                self.on_message({
                    "object_uuid": p["uuid"],
                    "event_type": "update",
                    "properties": {
                        "new_attributes": p
                    }
                })

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair
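
    # Recursively reject InitialWorkDirRequirement entries marked writable,
    # which this runner does not support.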
    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)
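
    # Copy the output files of a completed workflow from their source
    # collections into a single new collection, then rewrite the output
    # object so its locations point into that collection.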
    def make_output_collection(self, name, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = InitialWorkDirPathMapper(files, "", "",
                                                  separateDirs=False)

        final = arvados.collection.Collection()

        srccollections = {}
        for k,v in generatemapper.items():
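            # Locations look like "keep:<collection>/<path>"; strip the
            # "keep:" prefix to get the source collection identifier.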
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                srccollections[srccollection] = arvados.collection.CollectionReader(srccollection)
            reader = srccollections[srccollection]
            try:
                final.copy("/".join(sp[1:]), v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)
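
        # Rewrite the output object so each location refers to its new path
        # in the output collection.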
        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, indent=4)

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s (%s)", final.portable_data_hash(), final.manifest_locator())

        self.final_output_collection = final
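
    # Main entry point, passed to cwltool.main.main() as the executor: handles
    # --create-template/--create-workflow, submits a runner job or container
    # when --submit is in effect, or runs the workflow locally while polling
    # Arvados for job/container state.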
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
            return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"
        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run()
            return runnerjob.uuid
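
        # Start the background thread that polls job/container states and
        # feeds them to on_message().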
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            for runnable in jobiter:
                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("compute_checksum"):
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if kwargs.get("submit"):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            self.make_output_collection("Output of %s" % (shortname(tool.tool["id"])),
                                        self.final_output)

        return self.final_output
354 """Print version string of key packages for provenance and debugging."""
356 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
357 arvpkg = pkg_resources.require("arvados-python-client")
358 cwlpkg = pkg_resources.require("cwltool")
360 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
361 "arvados-python-client", arvpkg[0].version,
362 "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        type=float,
                        default=20,
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.")
377 parser.add_argument("--version", action="store_true", help="Print version and exit")
379 exgroup = parser.add_mutually_exclusive_group()
380 exgroup.add_argument("--verbose", action="store_true", help="Default logging")
381 exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
382 exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
384 parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
386 parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
388 exgroup = parser.add_mutually_exclusive_group()
389 exgroup.add_argument("--enable-reuse", action="store_true",
390 default=True, dest="enable_reuse",
392 exgroup.add_argument("--disable-reuse", action="store_false",
393 default=True, dest="enable_reuse",
396 parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
397 parser.add_argument("--ignore-docker-for-reuse", action="store_true",
398 help="Ignore Docker image version when deciding whether to reuse past jobs.",

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update existing Arvados workflow with uuid.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")
420 parser.add_argument("--compute-checksum", action="store_true", default=False,
421 help="Compute checksum of contents while collecting outputs",
422 dest="compute_checksum")
424 parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
425 parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")


def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()

    _, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))


def main(args, stdout, stderr, api_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
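
    # Options passed through to cwltool.main.main(): conformance test mode is
    # disabled, and tools always run inside containers.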
    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))