from __future__ import absolute_import, print_function

import functools
import logging
import time

import pykka

import libcloud.common.types as cloud_types

from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
from ...clientactor import _notify_subscribers
from ... import config


class ComputeNodeStateChangeBase(config.actor_class):
    """Base class for actors that change a compute node's state.

    This base class takes care of retrying changes and notifying
    subscribers when the change is finished.
    """
    def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
        super(ComputeNodeStateChangeBase, self).__init__()
        self._later = self.actor_ref.proxy()
        self._timer = timer_actor
        self._logger = logging.getLogger(logger_name)
        self.min_retry_wait = retry_wait
        self.max_retry_wait = max_retry_wait
        self.retry_wait = retry_wait
        self.subscribers = set()

    @staticmethod
    def _retry(errors):
        """Retry decorator for an actor method that makes remote requests.

        Use this function to decorate an actor method, and pass in a
        tuple of exceptions to catch. This decorator will schedule
        retries of that method with exponential backoff if the
        original method raises any of the given errors.
        """
        def decorator(orig_func):
            @functools.wraps(orig_func)
            def wrapper(self, *args, **kwargs):
                start_time = time.time()
                try:
                    orig_func(self, *args, **kwargs)
                except errors as error:
                    self._logger.warning(
                        "Client error: %s - waiting %s seconds",
                        error, self.retry_wait)
                    self._timer.schedule(start_time + self.retry_wait,
                                         getattr(self._later,
                                                 orig_func.__name__),
                                         *args, **kwargs)
                    self.retry_wait = min(self.retry_wait * 2,
                                          self.max_retry_wait)
                else:
                    self.retry_wait = self.min_retry_wait
            return wrapper
        return decorator
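
    # Example: a subclass wraps a remote call so that any of the given
    # errors reschedules the method with exponential backoff (this is
    # how ComputeNodeSetupActor uses it below):
    #
    #     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
    #     def create_arvados_node(self):
    #         ...  # raising one of ARVADOS_ERRORS here triggers a retry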

    def _finished(self):
        _notify_subscribers(self._later, self.subscribers)
        self.subscribers = None

    def subscribe(self, subscriber):
        if self.subscribers is None:
            try:
                subscriber(self._later)
            except pykka.ActorDeadError:
                pass
        else:
            self.subscribers.add(subscriber)
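
# Note on the subscription protocol above: subscribe() normally queues the
# callback, but once _finished() has run (self.subscribers is None) a late
# subscriber is invoked immediately with the actor's proxy. Sketch, where
# actor_proxy stands for any proxy to one of these actors:
#
#     actor_proxy.subscribe(callback)  # fires at once if already finished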


class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
    """Actor to create and set up a cloud compute node.

    This actor prepares an Arvados node record for a new compute node
    (either creating one or cleaning one passed in), then boots the
    actual compute node. It notifies subscribers when the cloud node
    is successfully created (the last step in the process for Node
    Manager to handle).
    """
    def __init__(self, timer_actor, arvados_client, cloud_client,
                 cloud_size, arvados_node=None,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeSetupActor, self).__init__(
            'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
        self._arvados = arvados_client
        self._cloud = cloud_client
        self.cloud_size = cloud_size
        self.arvados_node = None
        self.cloud_node = None
        if arvados_node is None:
            self._later.create_arvados_node()
        else:
            self._later.prepare_arvados_node(arvados_node)

    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
    def create_arvados_node(self):
        self.arvados_node = self._arvados.nodes().create(body={}).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
    def prepare_arvados_node(self, node):
        self.arvados_node = self._arvados.nodes().update(
            uuid=node['uuid'],
            body={'hostname': None,
                  'ip_address': None,
                  'slot_number': None,
                  'first_ping_at': None,
                  'last_ping_at': None,
                  'info': {'ec2_instance_id': None,
                           'last_action': "Prepared by Node Manager"}}
            ).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
    def create_cloud_node(self):
        self._logger.info("Creating cloud node with size %s.",
                          self.cloud_size.name)
        self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                  self.arvados_node)
        self._logger.info("Cloud node %s created.", self.cloud_node.id)
        self._finished()

    def stop_if_no_cloud_node(self):
        if self.cloud_node is None:
            self.stop()
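
# A minimal caller sketch (timer_actor and the client objects are assumed
# to be supplied by the surrounding node manager daemon):
#
#     setup = ComputeNodeSetupActor.start(
#         timer_actor, arvados_client, cloud_client, cloud_size).proxy()
#     setup.subscribe(lambda proxy: logging.info(
#         "cloud node %s is up", proxy.cloud_node.get().id))
#     # Later, if the daemon gives up waiting, it can reap the actor;
#     # this only stops it while no cloud node has been created yet:
#     setup.stop_if_no_cloud_node()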


class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
    """Actor to shut down a compute node.

    This actor simply destroys a cloud node, retrying as needed.
    """
    def __init__(self, timer_actor, cloud_client, node_monitor,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeShutdownActor, self).__init__(
            'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
        self._cloud = cloud_client
        self._monitor = node_monitor.proxy()
        self.cloud_node = self._monitor.cloud_node.get()
        self.success = None

    def on_start(self):
        self._later.shutdown_node()

    def cancel_shutdown(self):
        self.success = False
        self._finished()

    def _stop_if_window_closed(orig_func):
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if not self._monitor.shutdown_eligible().get():
                self._logger.info(
                    "Cloud node %s shutdown cancelled - no longer eligible.",
                    self.cloud_node.id)
                self._later.cancel_shutdown()
                return None
            else:
                return orig_func(self, *args, **kwargs)
        return wrapper

    @_stop_if_window_closed
    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
    def shutdown_node(self):
        if self._cloud.destroy_node(self.cloud_node):
            self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
            self.success = True
            self._finished()
        else:
            # Force a retry.
            raise cloud_types.LibcloudError("destroy_node failed")

    # Make the decorator available to subclasses.
    _stop_if_window_closed = staticmethod(_stop_if_window_closed)
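
# Shutdown sketch: the daemon hands this actor a monitor's actor ref and
# watches for completion (names outside this module are assumptions):
#
#     shutdown = ComputeNodeShutdownActor.start(
#         timer_actor, cloud_client, monitor_ref).proxy()
#     shutdown.subscribe(lambda proxy: logging.info(
#         "shutdown finished, success=%s", proxy.success.get()))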


class ComputeNodeUpdateActor(config.actor_class):
    """Actor to dispatch one-off cloud management requests.

    This actor receives requests for small cloud updates, and
    dispatches them to a real driver. ComputeNodeMonitorActors use
    this to perform maintenance tasks on themselves. Having a
    dedicated actor for this gives us the opportunity to control the
    flow of requests; e.g., by backing off when errors occur.

    This actor is most like a "traditional" Pykka actor: there's no
    subscribing, but instead methods return real driver results. If
    you're interested in those results, you should get them from the
    Future that the proxy method returns. Be prepared to handle exceptions
    from the cloud driver when you do.
    """
    def __init__(self, cloud_factory, max_retry_wait=180):
        super(ComputeNodeUpdateActor, self).__init__()
        self._cloud = cloud_factory()
        self.max_retry_wait = max_retry_wait
        self.error_streak = 0
        self.next_request_time = time.time()

    def _throttle_errors(orig_func):
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            throttle_time = self.next_request_time - time.time()
            if throttle_time > 0:
                time.sleep(throttle_time)
            self.next_request_time = time.time()
            try:
                result = orig_func(self, *args, **kwargs)
            except config.CLOUD_ERRORS:
                self.error_streak += 1
                self.next_request_time += min(2 ** self.error_streak,
                                              self.max_retry_wait)
                raise
            else:
                self.error_streak = 0
                return result
        return wrapper

    @_throttle_errors
    def sync_node(self, cloud_node, arvados_node):
        return self._cloud.sync_node(cloud_node, arvados_node)
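
# As the docstring notes, proxy methods return a Pykka future; a caller
# sketch (update_proxy is an assumed proxy to this actor):
#
#     future = update_proxy.sync_node(cloud_node, arvados_node)
#     try:
#         future.get()  # blocks; re-raises any cloud driver exception
#     except config.CLOUD_ERRORS as error:
#         logging.warning("sync_node failed: %s", error)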


class ComputeNodeMonitorActor(config.actor_class):
    """Actor to manage a running compute node.

    This actor gets updates about a compute node's cloud and Arvados records.
    It uses this information to notify subscribers when the node is eligible
    for shutdown, and to suggest cloud/Arvados record pairings.
    """
    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
                 timer_actor, update_actor, arvados_node=None,
                 poll_stale_after=600, node_stale_after=3600):
        super(ComputeNodeMonitorActor, self).__init__()
        self._later = self.actor_ref.proxy()
        self._logger = logging.getLogger('arvnodeman.computenode')
        self._last_log = None
        self._shutdowns = shutdown_timer
        self._timer = timer_actor
        self._update = update_actor
        self.cloud_node = cloud_node
        self.cloud_node_start_time = cloud_node_start_time
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.subscribers = set()
        self.arvados_node = None
        self._later.update_arvados_node(arvados_node)
        self.last_shutdown_opening = None
        self._later.consider_shutdown()

    def subscribe(self, subscriber):
        self.subscribers.add(subscriber)

    def _debug(self, msg, *args):
        if msg == self._last_log:
            return
        self._last_log = msg
        self._logger.debug(msg, *args)

    def in_state(self, *states):
        # Return a boolean to say whether or not our Arvados node record is in
        # one of the given states. If state information is not
        # available--because this node has no Arvados record, the record is
        # stale, or the record has no state information--return None.
        if (self.arvados_node is None) or not timestamp_fresh(
              arvados_node_mtime(self.arvados_node), self.node_stale_after):
            return None
        state = self.arvados_node['info'].get('slurm_state')
        if not state:
            return None
        result = state in states
        if state == 'idle':
            result = result and not self.arvados_node['job_uuid']
        return result
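
    # Example of the tri-state result: in_state('idle') returns True for
    # an idle node with no job assigned, False for a busy node, and None
    # when the record is missing or stale, so callers should test the
    # truthy case explicitly, e.g. `if self.in_state('idle'):`, rather
    # than treating None and False alike.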

    def shutdown_eligible(self):
        if not self._shutdowns.window_open():
            return False
        elif self.arvados_node is None:
            # If this is a new, unpaired node, it's eligible for
            # shutdown--we figure there was an error during bootstrap.
            return timestamp_fresh(self.cloud_node_start_time,
                                   self.node_stale_after)
        else:
            return self.in_state('idle')

    def consider_shutdown(self):
        next_opening = self._shutdowns.next_opening()
        if self.shutdown_eligible():
            self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
            _notify_subscribers(self._later, self.subscribers)
        elif self._shutdowns.window_open():
            self._debug("Node %s shutdown window open but node busy.",
                        self.cloud_node.id)
        elif self.last_shutdown_opening != next_opening:
            self._debug("Node %s shutdown window closed. Next at %s.",
                        self.cloud_node.id, time.ctime(next_opening))
            self._timer.schedule(next_opening, self._later.consider_shutdown)
            self.last_shutdown_opening = next_opening

    def offer_arvados_pair(self, arvados_node):
        if self.arvados_node is not None:
            return None
        elif arvados_node['ip_address'] in self.cloud_node.private_ips:
            self._later.update_arvados_node(arvados_node)
            return self.cloud_node.id
        else:
            return None

    def update_cloud_node(self, cloud_node):
        if cloud_node is not None:
            self.cloud_node = cloud_node
            self._later.consider_shutdown()

    def update_arvados_node(self, arvados_node):
        if arvados_node is not None:
            self.arvados_node = arvados_node
            new_hostname = arvados_node_fqdn(self.arvados_node)
            if new_hostname != self.cloud_node.name:
                self._update.sync_node(self.cloud_node, self.arvados_node)
            self._later.consider_shutdown()