projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
13804: Try to cancel pending shutdowns if nodes are needed
[arvados.git]
/
services
/
nodemanager
/
arvnodeman
/
computenode
/
dispatch
/
__init__.py
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 6c61e32b8db2c10cbef17216e8e3c80c9e7bfa4e..bb83a193fa30bae46dc7a7f6979843eb65fd42d1 100644
(file)
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -20,6 +20,7 @@
from .. import \
arvados_node_missing, RetryMixin
from ...clientactor import _notify_subscribers
from ... import config
arvados_node_missing, RetryMixin
from ...clientactor import _notify_subscribers
from ... import config
+from ... import status
from .transitions import transitions
QuotaExceeded = "QuotaExceeded"
from .transitions import transitions
QuotaExceeded = "QuotaExceeded"
@@ -113,14 +114,16 @@
class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
@ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry(config.ARVADOS_ERRORS)
def create_arvados_node(self):
@ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry(config.ARVADOS_ERRORS)
def create_arvados_node(self):
- self.arvados_node = self._arvados.nodes().create(body={}).execute()
+ self.arvados_node = self._arvados.nodes().create(
+ body={}, assign_slot=True).execute()
self._later.create_cloud_node()
@ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry(config.ARVADOS_ERRORS)
def prepare_arvados_node(self, node):
self._later.create_cloud_node()
@ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry(config.ARVADOS_ERRORS)
def prepare_arvados_node(self, node):
- self.arvados_node = self._clean_arvados_node(
- node, "Prepared by Node Manager")
+ self._clean_arvados_node(node, "Prepared by Node Manager")
+ self.arvados_node = self._arvados.nodes().update(
+ uuid=node['uuid'], body={}, assign_slot=True).execute()
self._later.create_cloud_node()
@ComputeNodeStateChangeBase._finish_on_exception
self._later.create_cloud_node()
@ComputeNodeStateChangeBase._finish_on_exception
@@ -240,12 +243,15 @@
class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
return super(ComputeNodeShutdownActor, self)._finished()
def cancel_shutdown(self, reason, **kwargs):
return super(ComputeNodeShutdownActor, self)._finished()
def cancel_shutdown(self, reason, **kwargs):
+ if not self.cancellable:
+ return False
if self.cancel_reason is not None:
# already cancelled
if self.cancel_reason is not None:
# already cancelled
-            return
+            return False
self.cancel_reason = reason
self._logger.info("Shutdown cancelled: %s.", reason)
self._finished(success_flag=False)
self.cancel_reason = reason
self._logger.info("Shutdown cancelled: %s.", reason)
self._finished(success_flag=False)
+ return True
def _cancel_on_exception(orig_func):
@functools.wraps(orig_func)
def _cancel_on_exception(orig_func):
@functools.wraps(orig_func)
@@ -270,12 +276,16 @@
class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
self.cancel_shutdown("No longer eligible for shut down because %s" % reason,
try_resume=True)
return
self.cancel_shutdown("No longer eligible for shut down because %s" % reason,
try_resume=True)
return
+ # If boot failed, count the event
+ if self._monitor.get_state().get() == 'unpaired':
+ status.tracker.counter_add('boot_failures')
self._destroy_node()
def _destroy_node(self):
self._logger.info("Starting shutdown")
arv_node = self._arvados_node()
if self._cloud.destroy_node(self.cloud_node):
self._destroy_node()
def _destroy_node(self):
self._logger.info("Starting shutdown")
arv_node = self._arvados_node()
if self._cloud.destroy_node(self.cloud_node):
+ self.cancellable = False
self._logger.info("Shutdown success")
if arv_node:
self._later.clean_arvados_node(arv_node)
self._logger.info("Shutdown success")
if arv_node:
self._later.clean_arvados_node(arv_node)
@@ -315,7 +325,8 @@
class ComputeNodeUpdateActor(config.actor_class, RetryMixin):
@RetryMixin._retry()
def sync_node(self, cloud_node, arvados_node):
@RetryMixin._retry()
def sync_node(self, cloud_node, arvados_node):
- return self._cloud.sync_node(cloud_node, arvados_node)
+ if self._cloud.node_fqdn(cloud_node) != arvados_node_fqdn(arvados_node):
+ return self._cloud.sync_node(cloud_node, arvados_node)
class ComputeNodeMonitorActor(config.actor_class):
class ComputeNodeMonitorActor(config.actor_class):
@@ -326,14 +337,13 @@
class ComputeNodeMonitorActor(config.actor_class):
for shutdown.
"""
def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
for shutdown.
"""
def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
-                 cloud_fqdn_func, timer_actor, update_actor, cloud_client,
+                 timer_actor, update_actor, cloud_client,
arvados_node=None, poll_stale_after=600, node_stale_after=3600,
boot_fail_after=1800
):
super(ComputeNodeMonitorActor, self).__init__()
self._later = self.actor_ref.tell_proxy()
self._shutdowns = shutdown_timer
arvados_node=None, poll_stale_after=600, node_stale_after=3600,
boot_fail_after=1800
):
super(ComputeNodeMonitorActor, self).__init__()
self._later = self.actor_ref.tell_proxy()
self._shutdowns = shutdown_timer
- self._cloud_node_fqdn = cloud_fqdn_func
self._timer = timer_actor
self._update = update_actor
self._cloud = cloud_client
self._timer = timer_actor
self._update = update_actor
self._cloud = cloud_client
@@ -344,6 +354,7 @@
class ComputeNodeMonitorActor(config.actor_class):
self.boot_fail_after = boot_fail_after
self.subscribers = set()
self.arvados_node = None
self.boot_fail_after = boot_fail_after
self.subscribers = set()
self.arvados_node = None
+ self.consecutive_idle = 0
self._later.update_arvados_node(arvados_node)
self.last_shutdown_opening = None
self._later.consider_shutdown()
self._later.update_arvados_node(arvados_node)
self.last_shutdown_opening = None
self._later.consider_shutdown()
@@ -407,6 +418,12 @@
class ComputeNodeMonitorActor(config.actor_class):
#if state == 'idle' and self.arvados_node['job_uuid']:
# state = 'busy'
#if state == 'idle' and self.arvados_node['job_uuid']:
# state = 'busy'
+ # Update idle node times tracker
+ if state == 'idle':
+ status.tracker.idle_in(self.arvados_node['hostname'])
+ else:
+ status.tracker.idle_out(self.arvados_node['hostname'])
+
return state
def in_state(self, *states):
return state
def in_state(self, *states):
@@ -439,8 +456,14 @@
class ComputeNodeMonitorActor(config.actor_class):
else:
boot_grace = "boot exceeded"
else:
boot_grace = "boot exceeded"
- # API server side not implemented yet.
- idle_grace = 'idle exceeded'
+ if crunch_worker_state == "idle":
+ # Must report as "idle" at least two consecutive times
+ if self.consecutive_idle < 2:
+ idle_grace = 'idle wait'
+ else:
+ idle_grace = 'idle exceeded'
+ else:
+ idle_grace = 'not idle'
node_state = (crunch_worker_state, window, boot_grace, idle_grace)
t = transitions[node_state]
node_state = (crunch_worker_state, window, boot_grace, idle_grace)
t = transitions[node_state]
@@ -486,8 +509,11 @@
class ComputeNodeMonitorActor(config.actor_class):
self._later.consider_shutdown()
def update_arvados_node(self, arvados_node):
self._later.consider_shutdown()
def update_arvados_node(self, arvados_node):
- # If the cloud node's FQDN doesn't match what's in the Arvados node
- # record, make them match.
+ """Called when the latest Arvados node record is retrieved.
+
+ Calls the updater's sync_node() method.
+
+ """
# This method is a little unusual in the way it just fires off the
# request without checking the result or retrying errors. That's
# because this update happens every time we reload the Arvados node
# This method is a little unusual in the way it just fires off the
# request without checking the result or retrying errors. That's
# because this update happens every time we reload the Arvados node
@@ -496,7 +522,9 @@
class ComputeNodeMonitorActor(config.actor_class):
# the logic to throttle those effective retries when there's trouble.
if arvados_node is not None:
self.arvados_node = arvados_node
# the logic to throttle those effective retries when there's trouble.
if arvados_node is not None:
self.arvados_node = arvados_node
- if (self._cloud_node_fqdn(self.cloud_node) !=
- arvados_node_fqdn(self.arvados_node)):
- self._update.sync_node(self.cloud_node, self.arvados_node)
+ self._update.sync_node(self.cloud_node, self.arvados_node)
+ if self.arvados_node['crunch_worker_state'] == "idle":
+ self.consecutive_idle += 1
+ else:
+ self.consecutive_idle = 0
self._later.consider_shutdown()
self._later.consider_shutdown()