- else:
- self.retry_wait = self.min_retry_wait
- return wrapper
- return decorator
-
- def _finished(self):
- _notify_subscribers(self._later, self.subscribers)
- self.subscribers = None
-
- def subscribe(self, subscriber):
- if self.subscribers is None:
- try:
- subscriber(self._later)
- except pykka.ActorDeadError:
- pass
- else:
- self.subscribers.add(subscriber)
-
-
-class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
- """Actor to create and set up a cloud compute node.
-
- This actor prepares an Arvados node record for a new compute node
- (either creating one or cleaning one passed in), then boots the
- actual compute node. It notifies subscribers when the cloud node
- is successfully created (the last step in the process for Node
- Manager to handle).
- """
- def __init__(self, timer_actor, arvados_client, cloud_client,
- cloud_size, arvados_node=None,
- retry_wait=1, max_retry_wait=180):
- super(ComputeNodeSetupActor, self).__init__(
- 'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
- self._arvados = arvados_client
- self._cloud = cloud_client
- self.cloud_size = cloud_size
- self.arvados_node = None
- self.cloud_node = None
- if arvados_node is None:
- self._later.create_arvados_node()
- else:
- self._later.prepare_arvados_node(arvados_node)
-
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
- def create_arvados_node(self):
- self.arvados_node = self._arvados.nodes().create(body={}).execute()
- self._later.create_cloud_node()
-
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
- def prepare_arvados_node(self, node):
- self.arvados_node = self._arvados.nodes().update(
- uuid=node['uuid'],
- body={'hostname': None,
- 'ip_address': None,
- 'slot_number': None,
- 'first_ping_at': None,
- 'last_ping_at': None,
- 'info': {'ec2_instance_id': None,
- 'last_action': "Prepared by Node Manager"}}
- ).execute()
- self._later.create_cloud_node()
-
- @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
- def create_cloud_node(self):
- self._logger.info("Creating cloud node with size %s.",
- self.cloud_size.name)
- self.cloud_node = self._cloud.create_node(self.cloud_size,
- self.arvados_node)
- self._logger.info("Cloud node %s created.", self.cloud_node.id)
- self._finished()
-
- def stop_if_no_cloud_node(self):
- if self.cloud_node is None:
- self.stop()
-
-
-class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
- """Actor to shut down a compute node.
-
- This actor simply destroys a cloud node, retrying as needed.
- """
- def __init__(self, timer_actor, cloud_client, cloud_node,
- retry_wait=1, max_retry_wait=180):
- super(ComputeNodeShutdownActor, self).__init__(
- 'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
- self._cloud = cloud_client
- self.cloud_node = cloud_node
- self._later.shutdown_node()
-
- @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
- def shutdown_node(self):
- self._cloud.destroy_node(self.cloud_node)
- self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
- self._finished()
-
-
-class ComputeNodeUpdateActor(config.actor_class):
- """Actor to dispatch one-off cloud management requests.
-
- This actor receives requests for small cloud updates, and
- dispatches them to a real driver. ComputeNodeMonitorActors use
- this to perform maintenance tasks on themselves. Having a
- dedicated actor for this gives us the opportunity to control the
- flow of requests; e.g., by backing off when errors occur.
-
- This actor is most like a "traditional" Pykka actor: there's no
- subscribing, but instead methods return real driver results. If
- you're interested in those results, you should get them from the
- Future that the proxy method returns. Be prepared to handle exceptions
- from the cloud driver when you do.
- """
- def __init__(self, cloud_factory, max_retry_wait=180):
- super(ComputeNodeUpdateActor, self).__init__()
- self._cloud = cloud_factory()
- self.max_retry_wait = max_retry_wait
- self.error_streak = 0
- self.next_request_time = time.time()
-
- def _throttle_errors(orig_func):
- @functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
- throttle_time = self.next_request_time - time.time()
- if throttle_time > 0:
- time.sleep(throttle_time)
- self.next_request_time = time.time()
- try:
- result = orig_func(self, *args, **kwargs)
- except config.CLOUD_ERRORS:
- self.error_streak += 1
- self.next_request_time += min(2 ** self.error_streak,
- self.max_retry_wait)
- raise
- else:
- self.error_streak = 0
- return result
- return wrapper
-
- @_throttle_errors
- def sync_node(self, cloud_node, arvados_node):
- return self._cloud.sync_node(cloud_node, arvados_node)