3 from __future__ import absolute_import, print_function
9 import libcloud.common.types as cloud_types
12 from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
13 from ...clientactor import _notify_subscribers
14 from ... import config
16 class ComputeNodeStateChangeBase(config.actor_class):
17 """Base class for actors that change a compute node's state.
19 This base class takes care of retrying changes and notifying
20 subscribers when the change is finished.
# Constructor arguments:
#   logger_name  - name passed to logging.getLogger() for this actor.
#   cloud_client - cloud client; must provide is_cloud_exception().
#   timer_actor  - must provide schedule(when, callable, ...); used for retries.
#   retry_wait   - initial and minimum retry delay, in seconds (it is added
#                  to time.time() when scheduling a retry).
#   max_retry_wait - stored as the backoff ceiling; the min() call that
#                  applies it is only partly visible in this view.
22 def __init__(self, logger_name, cloud_client, timer_actor,
23 retry_wait, max_retry_wait):
24 super(ComputeNodeStateChangeBase, self).__init__()
# Proxy to this actor itself. Calls made through it are sent to this actor
# as messages rather than run inline (pykka proxy semantics; pykka is the
# actor library, per the pykka.ActorDeadError handling in subscribe()).
# The same proxy is what subscribers receive when the change finishes.
25 self._later = self.actor_ref.proxy()
26 self._logger = logging.getLogger(logger_name)
27 self._cloud = cloud_client
28 self._timer = timer_actor
# Backoff state: retry_wait is the current delay and starts at
# min_retry_wait; _retry doubles it on failure and resets it afterwards.
29 self.min_retry_wait = retry_wait
30 self.max_retry_wait = max_retry_wait
31 self.retry_wait = retry_wait
# Callbacks waiting for the change to finish. Set to None once finished.
32 self.subscribers = set()
# Decorator factory for actor methods that make remote requests. Subclasses
# use it as @ComputeNodeStateChangeBase._retry(...).
# Several lines of this function are not visible in this view (the
# try/else/raise statements, the scheduled callable, the backoff cap), so
# the control flow below is inferred from the visible lines:
#   - on a retryable error: log, schedule another attempt via the timer
#     actor, and double self.retry_wait;
#   - otherwise (presumably on success): reset self.retry_wait to
#     self.min_retry_wait.
35 def _retry(errors=()):
36 """Retry decorator for an actor method that makes remote requests.
38 Use this function to decorate an actor method, and pass in a
39 tuple of exceptions to catch. This decorator will schedule
40 retries of that method with exponential backoff if the
41 original method raises a known cloud driver error, or any of the
42 given exception types.
44 def decorator(orig_func):
45 @functools.wraps(orig_func)
46 def retry_wrapper(self, *args, **kwargs):
# Record the start time so a retry is scheduled relative to when this
# attempt began, not when it failed.
47 start_time = time.time()
49 orig_func(self, *args, **kwargs)
50 except Exception as error:
# Retry only errors listed in `errors` or recognized by the cloud client.
# Other errors are presumably re-raised; that statement is not visible here.
51 if not (isinstance(error, errors) or
52 self._cloud.is_cloud_exception(error)):
55 "Client error: %s - waiting %s seconds",
56 error, self.retry_wait)
# Schedule another attempt at start_time + retry_wait; the scheduled callable
# is not visible here. Then double retry_wait (exponential backoff), capped
# by an argument that is not visible (presumably self.max_retry_wait).
57 self._timer.schedule(start_time + self.retry_wait,
61 self.retry_wait = min(self.retry_wait * 2,
64 self.retry_wait = self.min_retry_wait
# The `def` line of the method containing the next two statements is not
# visible here. It passes this actor's proxy and the subscriber set to
# _notify_subscribers(). It then sets subscribers to None, marking the
# change as finished, so later subscribe() calls take the immediate path.
69 _notify_subscribers(self._later, self.subscribers)
70 self.subscribers = None
# Register a callback that receives this actor's proxy when the change
# finishes. If the change has already finished (subscribers is None), the
# callback is called right away. A pykka.ActorDeadError raised by that call
# is caught; the handler body is not visible here.
72 def subscribe(self, subscriber):
73 if self.subscribers is None:
75 subscriber(self._later)
76 except pykka.ActorDeadError:
79 self.subscribers.add(subscriber)
82 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
83 """Actor to create and set up a cloud compute node.
85 This actor prepares an Arvados node record for a new compute node
86 (either creating one or cleaning one passed in), then boots the
87 actual compute node. It notifies subscribers when the cloud node
88 is successfully created (the last step in the process for Node
# Workflow. Each step is queued through self._later (this actor's proxy)
# rather than called directly:
#   create_arvados_node() or prepare_arvados_node(node)
#     -> create_cloud_node() -> post_create()
# Every step is wrapped with _retry(), so failures that the cloud client
# recognizes are retried with backoff.
91 def __init__(self, timer_actor, arvados_client, cloud_client,
92 cloud_size, arvados_node=None,
93 retry_wait=1, max_retry_wait=180):
94 super(ComputeNodeSetupActor, self).__init__(
95 'arvnodeman.nodeup', cloud_client, timer_actor,
96 retry_wait, max_retry_wait)
97 self._arvados = arvados_client
98 self.cloud_size = cloud_size
99 self.arvados_node = None
100 self.cloud_node = None
# Queue the first step: create a new Arvados node record, or reset the
# record we were given so it can be reused.
101 if arvados_node is None:
102 self._later.create_arvados_node()
104 self._later.prepare_arvados_node(arvados_node)
# Create an empty Arvados node record, then queue cloud node creation.
# NOTE(review): _retry() is called with no extra exception types, so an
# Arvados API error here is retried only if the cloud client's
# is_cloud_exception() recognizes it. Confirm this is intended.
106 @ComputeNodeStateChangeBase._retry()
107 def create_arvados_node(self):
108 self.arvados_node = self._arvados.nodes().create(body={}).execute()
109 self._later.create_cloud_node()
# Reset an existing Arvados node record for reuse, then queue cloud node
# creation. The visible fields in the update clear hostname, first_ping_at
# and last_ping_at, and replace info with a fresh dict recording
# last_action. Other arguments and fields of this update are not visible.
# The same _retry() NOTE as on create_arvados_node applies.
111 @ComputeNodeStateChangeBase._retry()
112 def prepare_arvados_node(self, node):
113 self.arvados_node = self._arvados.nodes().update(
115 body={'hostname': None,
118 'first_ping_at': None,
119 'last_ping_at': None,
120 'info': {'ec2_instance_id': None,
121 'last_action': "Prepared by Node Manager"}}
123 self._later.create_cloud_node()
# Boot the cloud node with the requested size, then queue post_create().
# The remaining argument(s) to create_node() are not visible here.
125 @ComputeNodeStateChangeBase._retry()
126 def create_cloud_node(self):
127 self._logger.info("Creating cloud node with size %s.",
128 self.cloud_size.name)
129 self.cloud_node = self._cloud.create_node(self.cloud_size,
131 self._logger.info("Cloud node %s created.", self.cloud_node.id)
132 self._later.post_create()
# Run the cloud client's post-create hook on the new node. Any statements
# after the log line are not visible here.
134 @ComputeNodeStateChangeBase._retry()
135 def post_create(self):
136 self._cloud.post_create_node(self.cloud_node)
137 self._logger.info("%s post-create work done.", self.cloud_node.id)
# Checks whether a cloud node has been created yet. The action taken when
# none exists is not visible here; judging by the method name, it
# presumably stops this actor.
140 def stop_if_no_cloud_node(self):
141 if self.cloud_node is None:
145 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
146 """Actor to shut down a compute node.
148 This actor simply destroys a cloud node, retrying as needed.
# node_monitor must provide .proxy(). Its proxy is asked for cloud_node and
# shutdown_eligible(), presumably on a ComputeNodeMonitorActor reference.
150 def __init__(self, timer_actor, cloud_client, node_monitor,
151 cancellable=True, retry_wait=1, max_retry_wait=180):
152 # If a ShutdownActor is cancellable, it will ask the
153 # ComputeNodeMonitorActor if it's still eligible before taking each
154 # action, and stop the shutdown process if the node is no longer
155 # eligible. Normal shutdowns based on job demand should be
156 # cancellable; shutdowns based on node misbehavior should not.
157 super(ComputeNodeShutdownActor, self).__init__(
158 'arvnodeman.nodedown', cloud_client, timer_actor,
159 retry_wait, max_retry_wait)
160 self._monitor = node_monitor.proxy()
# .get() blocks this constructor until the monitor actor returns its
# current cloud_node.
161 self.cloud_node = self._monitor.cloud_node.get()
162 self.cancellable = cancellable
# Queue the first shutdown attempt through this actor's proxy. The lines
# just above this statement are not visible here.
166 self._later.shutdown_node()
# Body not visible here. Judging by the name, it abandons the shutdown.
# _stop_if_window_closed queues it when the node is no longer eligible.
168 def cancel_shutdown(self):
# Decorator for shutdown steps. For a cancellable shutdown, it asks the
# monitor whether the node is still shutdown-eligible, blocking on .get()
# for the answer. If not, it emits a "shutdown cancelled" message and queues
# cancel_shutdown(). Presumably the wrapped method is then not called; the
# intervening lines are not visible. Otherwise it returns the wrapped
# method's result.
172 def _stop_if_window_closed(orig_func):
173 @functools.wraps(orig_func)
174 def stop_wrapper(self, *args, **kwargs):
175 if (self.cancellable and
176 (not self._monitor.shutdown_eligible().get())):
178 "Cloud node %s shutdown cancelled - no longer eligible.",
180 self._later.cancel_shutdown()
183 return orig_func(self, *args, **kwargs)
# _stop_if_window_closed is the outer decorator, so the eligibility check
# runs before the retrying wrapper on every call to shutdown_node.
# When destroy_node() does not report success, a LibcloudError is raised.
# The else branch is not visible here. Presumably the error is raised so
# that _retry treats it as a cloud error and schedules another attempt.
186 @_stop_if_window_closed
187 @ComputeNodeStateChangeBase._retry()
188 def shutdown_node(self):
189 if self._cloud.destroy_node(self.cloud_node):
190 self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
195 raise cloud_types.LibcloudError("destroy_node failed")
197 # Make the decorator available to subclasses.
# This is done only after its use above: inside the class body the plain
# function is needed as a decorator, and staticmethod objects are not
# directly callable before Python 3.10.
198 _stop_if_window_closed = staticmethod(_stop_if_window_closed)
201 class ComputeNodeUpdateActor(config.actor_class):
202 """Actor to dispatch one-off cloud management requests.
204 This actor receives requests for small cloud updates, and
205 dispatches them to a real driver. ComputeNodeMonitorActors use
206 this to perform maintenance tasks on themselves. Having a
207 dedicated actor for this gives us the opportunity to control the
208 flow of requests; e.g., by backing off when errors occur.
210 This actor is most like a "traditional" Pykka actor: there's no
211 subscribing, but instead methods return real driver results. If
212 you're interested in those results, you should get them from the
213 Future that the proxy method returns. Be prepared to handle exceptions
214 from the cloud driver when you do.
# Builds its own cloud client by calling cloud_factory().
# error_streak counts consecutive failed requests.
# next_request_time is the earliest time.time() at which the next request
# may be sent.
216 def __init__(self, cloud_factory, max_retry_wait=180):
217 super(ComputeNodeUpdateActor, self).__init__()
218 self._cloud = cloud_factory()
219 self.max_retry_wait = max_retry_wait
220 self.error_streak = 0
221 self.next_request_time = time.time()
# Decorator that throttles requests after errors. It first waits until
# next_request_time; time.sleep blocks the actor's thread, so no other
# request is handled meanwhile. It then calls the wrapped method.
# Each failure increments error_streak and pushes next_request_time out by
# 2 ** error_streak seconds. That delay is capped by an argument that is
# not visible here (presumably max_retry_wait).
# error_streak is reset to 0, presumably on success. Per the class
# docstring, the driver's exception is presumably re-raised to the caller;
# that statement is not visible here.
223 def _throttle_errors(orig_func):
224 @functools.wraps(orig_func)
225 def throttle_wrapper(self, *args, **kwargs):
226 throttle_time = self.next_request_time - time.time()
227 if throttle_time > 0:
228 time.sleep(throttle_time)
# Base any backoff on when this request actually starts.
229 self.next_request_time = time.time()
231 result = orig_func(self, *args, **kwargs)
232 except Exception as error:
233 self.error_streak += 1
234 self.next_request_time += min(2 ** self.error_streak,
238 self.error_streak = 0
240 return throttle_wrapper
# Ask the cloud client to sync cloud_node with arvados_node and return the
# driver's result. The line above this def is not visible here, so it is
# unclear whether this method is wrapped by _throttle_errors.
243 def sync_node(self, cloud_node, arvados_node):
244 return self._cloud.sync_node(cloud_node, arvados_node)
247 class ComputeNodeMonitorActor(config.actor_class):
248 """Actor to manage a running compute node.
250 This actor gets updates about a compute node's cloud and Arvados records.
251 It uses this information to notify subscribers when the node is eligible
# Constructor arguments:
#   shutdown_timer   - must provide window_open() and next_opening();
#                      next_opening() returns a time.time()-style timestamp
#                      (it is passed to time.ctime()).
#   update_actor     - receives sync_node() requests.
#   node_stale_after - age limit passed to timestamp_fresh() (presumably
#                      seconds).
#   poll_stale_after - stored, but not used in the code visible here.
254 def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
255 timer_actor, update_actor, arvados_node=None,
256 poll_stale_after=600, node_stale_after=3600):
257 super(ComputeNodeMonitorActor, self).__init__()
258 self._later = self.actor_ref.proxy()
259 self._logger = logging.getLogger('arvnodeman.computenode')
260 self._last_log = None
261 self._shutdowns = shutdown_timer
262 self._timer = timer_actor
263 self._update = update_actor
264 self.cloud_node = cloud_node
265 self.cloud_node_start_time = cloud_node_start_time
266 self.poll_stale_after = poll_stale_after
267 self.node_stale_after = node_stale_after
268 self.subscribers = set()
269 self.arvados_node = None
# These calls go through this actor's proxy, so (with pykka semantics)
# they run as messages after __init__ finishes. That is why setting
# last_shutdown_opening after queueing update_arvados_node is safe.
270 self._later.update_arvados_node(arvados_node)
271 self.last_shutdown_opening = None
272 self._later.consider_shutdown()
# Register a callback. consider_shutdown() passes it to
# _notify_subscribers() when this node becomes shutdown-eligible.
274 def subscribe(self, subscriber):
275 self.subscribers.add(subscriber)
# Log msg at debug level, presumably skipping it when it repeats the
# previous message. The early return and the update of _last_log are not
# visible here.
# NOTE(review): only the format string is compared, not args, so a repeated
# "shutdown window closed. Next at %s." with a different time would also be
# suppressed. Confirm this is intended.
277 def _debug(self, msg, *args):
278 if msg == self._last_log:
281 self._logger.debug(msg, *args)
283 def in_state(self, *states):
284 # Return a boolean to say whether or not our Arvados node record is in
285 # one of the given states. If state information is not
286 # available--because this node has no Arvados record, the record is
287 # stale, or the record has no state information--return None.
288 if (self.arvados_node is None) or not timestamp_fresh(
289 arvados_node_mtime(self.arvados_node), self.node_stale_after):
# The state is read from the record's info['slurm_state'].
291 state = self.arvados_node['info'].get('slurm_state')
294 result = state in states
# Under a condition not visible here, a node whose record has a job_uuid
# is not counted as being in the requested state.
296 result = result and not self.arvados_node['job_uuid']
# Decide whether this node may be shut down now:
#   - outside a shutdown window: the branch body is not visible
#     (presumably returns False);
#   - unpaired node (no Arvados record): eligible only while
#     cloud_node_start_time is still fresh per node_stale_after;
#   - paired node: in_state('idle'), which may be None when state
#     information is unavailable.
# NOTE(review): an unpaired node older than node_stale_after is never
# eligible under this rule. Confirm this is intended.
299 def shutdown_eligible(self):
300 if not self._shutdowns.window_open():
302 elif self.arvados_node is None:
303 # If this is a new, unpaired node, it's eligible for
304 # shutdown--we figure there was an error during bootstrap.
305 return timestamp_fresh(self.cloud_node_start_time,
306 self.node_stale_after)
308 return self.in_state('idle')
# Re-evaluate shutdown:
#   - if eligible, notify subscribers;
#   - else, if the window is open (node busy), only log;
#   - else, if the next window opening differs from the last one handled,
#     schedule another check at that opening via the timer actor, and record
#     it in last_shutdown_opening so the same opening is not scheduled twice.
310 def consider_shutdown(self):
311 next_opening = self._shutdowns.next_opening()
312 if self.shutdown_eligible():
313 self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
314 _notify_subscribers(self._later, self.subscribers)
315 elif self._shutdowns.window_open():
316 self._debug("Node %s shutdown window open but node busy.",
318 elif self.last_shutdown_opening != next_opening:
319 self._debug("Node %s shutdown window closed. Next at %s.",
320 self.cloud_node.id, time.ctime(next_opening))
321 self._timer.schedule(next_opening, self._later.consider_shutdown)
322 self.last_shutdown_opening = next_opening
# Offer an Arvados node record for pairing. If this monitor is still
# unpaired and the record's ip_address is one of the cloud node's
# private_ips, queue update_arvados_node() with it and return the cloud
# node's id. The other branches' results are not visible here (presumably
# None).
324 def offer_arvados_pair(self, arvados_node):
325 if self.arvados_node is not None:
327 elif arvados_node['ip_address'] in self.cloud_node.private_ips:
328 self._later.update_arvados_node(arvados_node)
329 return self.cloud_node.id
# Store a new cloud node record (None is ignored) and queue a shutdown
# re-check.
333 def update_cloud_node(self, cloud_node):
334 if cloud_node is not None:
335 self.cloud_node = cloud_node
336 self._later.consider_shutdown()
# Store a new Arvados node record (None is ignored). If the record's FQDN
# differs from the cloud node's name, ask the update actor to sync them.
# The sync_node() result is not used (presumably a proxy future that is not
# waited on), so a failed sync is simply attempted again on any later call
# while the names still differ. Then queue a shutdown re-check.
338 def update_arvados_node(self, arvados_node):
339 if arvados_node is not None:
340 self.arvados_node = arvados_node
341 new_hostname = arvados_node_fqdn(self.arvados_node)
342 if new_hostname != self.cloud_node.name:
343 self._update.sync_node(self.cloud_node, self.arvados_node)
344 self._later.consider_shutdown()