        Runs in a separate thread.
        """
-        while True:
-            self.stop_polling.wait(15)
-            if self.stop_polling.is_set():
-                break
-            with self.lock:
-                keys = self.processes.keys()
-            if not keys:
-                continue
+        try:
+            while True:
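+                # Event.wait() returns as soon as stop_polling is set, so the
+                # loop polls every 15 seconds but can still shut down promptly.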
+                self.stop_polling.wait(15)
+                if self.stop_polling.is_set():
+                    break
+                with self.lock:
+                    keys = self.processes.keys()
+                if not keys:
+                    continue
-            if self.work_api == "containers":
-                table = self.poll_api.containers()
-            elif self.work_api == "jobs":
-                table = self.poll_api.jobs()
+                if self.work_api == "containers":
+                    table = self.poll_api.containers()
+                elif self.work_api == "jobs":
+                    table = self.poll_api.jobs()
-            try:
-                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
-            except Exception as e:
-                logger.warn("Error checking states on API server: %s", e)
-                continue
-
-            for p in proc_states["items"]:
-                self.on_message({
-                    "object_uuid": p["uuid"],
-                    "event_type": "update",
-                    "properties": {
-                        "new_attributes": p
-                    }
-                })
+                try:
+                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
+                except Exception as e:
+                    logger.warn("Error checking states on API server: %s", e)
+                    continue
+
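+                # Feed each result through on_message(), the same handler
+                # used for websocket events, as a synthetic "update" event.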
+                for p in proc_states["items"]:
+                    self.on_message({
+                        "object_uuid": p["uuid"],
+                        "event_type": "update",
+                        "properties": {
+                            "new_attributes": p
+                        }
+                    })
+        except:
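+            # Bare except: catch everything, since any uncaught error here
+            # would end this thread and leave the main thread blocked on
+            # self.cond waiting for process updates that will never arrive.
+            # Log the error, then clear the process table and wake the main
+            # thread so it can shut down.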
+            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
+            self.cond.acquire()
+            self.processes.clear()
+            self.cond.notify()
+            self.cond.release()
+        finally:
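+            # Always leave stop_polling set on exit so other threads can tell
+            # that polling has stopped, whether it ended normally or crashed.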
+            self.stop_polling.set()
    def get_uploaded(self):
        return self.uploaded.copy()
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])
-        update_uuid = kwargs.get("update_workflow")
-        if update_uuid or kwargs.get("create_workflow"):
+        existing_uuid = kwargs.get("update_workflow")
+        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
-                                      uuid=update_uuid)
+                                      uuid=existing_uuid)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return tmpl.uuid
            else:
                return upload_workflow(self, tool, job_order,
                                       self.project_uuid,
-                                       uuid=update_uuid)
+                                       uuid=existing_uuid)
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()
+
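+                # The poll thread sets stop_polling when it exits (see
+                # poll_states above); stop submitting new work rather than
+                # wait on processes whose state will never be updated.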
+                if self.stop_polling.is_set():
+                    break
+
                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
-                logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
-    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow or pipeline template (depending on selected API, see --api).")
+    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
    exgroup = parser.add_mutually_exclusive_group()
    arvargs.conformance_test = None
    arvargs.use_container = True
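+    # Relax cwltool's path name checks so input file names may contain
+    # spaces and hash characters.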
+    arvargs.relax_path_checks = True
    return cwltool.main.main(args=arvargs,
                             stdout=stdout,