3 from __future__ import absolute_import, print_function
9 import libcloud.common.types as cloud_types
13 arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh
14 from ...clientactor import _notify_subscribers
15 from ... import config
# Base Pykka actor for a one-shot compute-node state change (boot or
# shutdown).  It owns the exponential-backoff retry machinery for remote
# cloud/Arvados calls and notifies subscribers exactly once when the
# change finishes.
# NOTE(review): this is a numbered listing; gaps in the line numbers are
# lines elided from this view (e.g. the try/raise statements inside the
# retry wrapper), so some comments below are necessarily hedged.
17 class ComputeNodeStateChangeBase(config.actor_class):
18 """Base class for actors that change a compute node's state.
20 This base class takes care of retrying changes and notifying
21 subscribers when the change is finished.
# Args: logger_name -> name of the logger child actors log to;
# cloud_client -> driver wrapper used for the actual cloud calls (must
# provide is_cloud_exception); timer_actor -> scheduler used to re-run
# a failed method after a backoff delay; retry_wait / max_retry_wait ->
# backoff bounds, presumably in seconds (matches the "waiting %s
# seconds" log message below).
23 def __init__(self, logger_name, cloud_client, timer_actor,
24 retry_wait, max_retry_wait):
25 super(ComputeNodeStateChangeBase, self).__init__()
# Proxy to our own actor so retries are delivered through the mailbox.
26 self._later = self.actor_ref.proxy()
27 self._logger = logging.getLogger(logger_name)
28 self._cloud = cloud_client
29 self._timer = timer_actor
30 self.min_retry_wait = retry_wait
31 self.max_retry_wait = max_retry_wait
# Current backoff: doubled on each retryable error, reset on success.
32 self.retry_wait = retry_wait
# Callables to notify when the change completes; set to None once
# notified, so late subscribers can be called immediately (see
# subscribe() below).
33 self.subscribers = set()
# Decorator factory — defined without `self` because it is applied at
# class-definition time (see @ComputeNodeStateChangeBase._retry usage
# in subclasses).  `errors` is a tuple of extra retryable exceptions.
36 def _retry(errors=()):
37 """Retry decorator for an actor method that makes remote requests.
39 Use this function to decorate an actor method, and pass in a
40 tuple of exceptions to catch. This decorator will schedule
41 retries of that method with exponential backoff if the
42 original method raises a known cloud driver error, or any of the
43 given exception types.
45 def decorator(orig_func):
46 @functools.wraps(orig_func)
47 def retry_wrapper(self, *args, **kwargs):
# Backoff is measured from when the attempt started, not when it failed.
48 start_time = time.time()
# NOTE(review): the `try:` opening this block (listing line 49) is elided.
50 orig_func(self, *args, **kwargs)
51 except Exception as error:
# Anything that is neither a caller-listed retryable error nor a
# cloud-driver error is not retried — presumably re-raised on the
# elided lines 54-55; verify against the full source.
52 if not (isinstance(error, errors) or
53 self._cloud.is_cloud_exception(error)):
56 "Client error: %s - waiting %s seconds",
57 error, self.retry_wait)
# Re-deliver the same method through the timer after the backoff.
58 self._timer.schedule(start_time + self.retry_wait,
# Exponential backoff, capped (the max_retry_wait cap is on the
# elided continuation line 63).
62 self.retry_wait = min(self.retry_wait * 2,
# Success path: reset the backoff to its minimum.
65 self.retry_wait = self.min_retry_wait
# Notify every subscriber exactly once, then mark the change done by
# clearing the set to None.
70 _notify_subscribers(self._later, self.subscribers)
71 self.subscribers = None
# Register a callable to run when the change finishes.  If the change
# already finished (subscribers is None), call it immediately,
# tolerating a dead subscriber actor.
73 def subscribe(self, subscriber):
74 if self.subscribers is None:
76 subscriber(self._later)
77 except pykka.ActorDeadError:
80 self.subscribers.add(subscriber)
# Actor that drives the node-bootstrap pipeline:
#   create_arvados_node / prepare_arvados_node -> create_cloud_node
#   -> post_create.
# Each stage hands off to the next via self._later so every step runs as
# its own actor message and can be retried independently.
# NOTE(review): numbered listing — gaps in the line numbers are lines
# elided from this view.
83 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
84 """Actor to create and set up a cloud compute node.
86 This actor prepares an Arvados node record for a new compute node
87 (either creating one or cleaning one passed in), then boots the
88 actual compute node. It notifies subscribers when the cloud node
89 is successfully created (the last step in the process for Node
# Args: arvados_node -> existing Arvados node record to recycle, or
# None to create a fresh one; cloud_size -> driver size object (must
# have a .name, used in the log message below).
92 def __init__(self, timer_actor, arvados_client, cloud_client,
93 cloud_size, arvados_node=None,
94 retry_wait=1, max_retry_wait=180):
95 super(ComputeNodeSetupActor, self).__init__(
96 'arvnodeman.nodeup', cloud_client, timer_actor,
97 retry_wait, max_retry_wait)
98 self._arvados = arvados_client
99 self.cloud_size = cloud_size
100 self.arvados_node = None
101 self.cloud_node = None
# Kick off the pipeline asynchronously: either mint a new Arvados
# record or scrub the one we were given.
102 if arvados_node is None:
103 self._later.create_arvados_node()
105 self._later.prepare_arvados_node(arvados_node)
# Retried on Arvados API errors (config.ARVADOS_ERRORS).
107 @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
108 def create_arvados_node(self):
109 self.arvados_node = self._arvados.nodes().create(body={}).execute()
110 self._later.create_cloud_node()
# Reset a recycled Arvados record to a clean slate before pairing it
# with a new cloud node.  Also retried on Arvados API errors.
112 @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
113 def prepare_arvados_node(self, node):
114 self.arvados_node = self._arvados.nodes().update(
# NOTE(review): the update() uuid argument (listing line 115) and parts
# of the body dict (117-118, 123) are elided here.
116 body={'hostname': None,
119 'first_ping_at': None,
120 'last_ping_at': None,
121 'info': {'ec2_instance_id': None,
122 'last_action': "Prepared by Node Manager"}}
124 self._later.create_cloud_node()
# Bare _retry(): retries only on cloud-driver errors.
126 @ComputeNodeStateChangeBase._retry()
127 def create_cloud_node(self):
128 self._logger.info("Creating cloud node with size %s.",
129 self.cloud_size.name)
# NOTE(review): the remaining create_node kwargs (listing line 131,
# presumably the Arvados record to pair with) are elided.
130 self.cloud_node = self._cloud.create_node(self.cloud_size,
132 self._logger.info("Cloud node %s created.", self.cloud_node.id)
133 self._later.post_create()
135 @ComputeNodeStateChangeBase._retry()
136 def post_create(self):
137 self._cloud.post_create_node(self.cloud_node)
138 self._logger.info("%s post-create work done.", self.cloud_node.id)
# Abort hook: only meaningful while the cloud node does not exist yet.
# The actual stop call (listing lines 143-145) is elided from this view.
141 def stop_if_no_cloud_node(self):
142 if self.cloud_node is None:
# Actor that destroys one cloud node, with retry, and can optionally be
# cancelled mid-flight if the node becomes busy again.
# NOTE(review): numbered listing — gaps in the line numbers are lines
# elided from this view.
146 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
147 """Actor to shut down a compute node.
149 This actor simply destroys a cloud node, retrying as needed.
# Args: node_monitor -> ComputeNodeMonitorActor ref for the node being
# shut down (proxied below, and consulted for shutdown eligibility
# when cancellable); cancellable -> see the comment block below.
151 def __init__(self, timer_actor, cloud_client, node_monitor,
152 cancellable=True, retry_wait=1, max_retry_wait=180):
153 # If a ShutdownActor is cancellable, it will ask the
154 # ComputeNodeMonitorActor if it's still eligible before taking each
155 # action, and stop the shutdown process if the node is no longer
156 # eligible. Normal shutdowns based on job demand should be
157 # cancellable; shutdowns based on node misbehavior should not.
158 super(ComputeNodeShutdownActor, self).__init__(
159 'arvnodeman.nodedown', cloud_client, timer_actor,
160 retry_wait, max_retry_wait)
161 self._monitor = node_monitor.proxy()
# Blocking .get(): resolve the node's identity once, up front.
162 self.cloud_node = self._monitor.cloud_node.get()
163 self.cancellable = cancellable
# Start the destroy asynchronously through our own mailbox.
# NOTE(review): listing lines 164-166 (presumably an on_start hook or
# similar wrapper around this call) are elided.
167 self._later.shutdown_node()
# Body elided (listing lines 170-172); invoked by _stop_if_window_closed
# below when a cancellable shutdown loses eligibility.
169 def cancel_shutdown(self):
# Decorator: before each shutdown step, re-check eligibility with the
# monitor and divert to cancel_shutdown if the window has closed.
# Plain function here; re-exposed as a staticmethod at the bottom of
# the class so subclasses can use it as a decorator too.
173 def _stop_if_window_closed(orig_func):
174 @functools.wraps(orig_func)
175 def stop_wrapper(self, *args, **kwargs):
176 if (self.cancellable and
177 (not self._monitor.shutdown_eligible().get())):
179 "Cloud node %s shutdown cancelled - no longer eligible.",
181 self._later.cancel_shutdown()
# Window still open (or shutdown not cancellable): proceed.
184 return orig_func(self, *args, **kwargs)
# Order matters: the window check runs before the retry wrapper.
187 @_stop_if_window_closed
188 @ComputeNodeStateChangeBase._retry()
189 def shutdown_node(self):
190 if self._cloud.destroy_node(self.cloud_node):
191 self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
# destroy_node returned falsy: raise so the _retry decorator above
# schedules another attempt.
196 raise cloud_types.LibcloudError("destroy_node failed")
198 # Make the decorator available to subclasses.
199 _stop_if_window_closed = staticmethod(_stop_if_window_closed)
# Request/response actor (no subscriptions) that funnels small one-off
# cloud operations through a single point so error backoff can throttle
# the whole stream of requests.
# NOTE(review): numbered listing — gaps in the line numbers are lines
# elided from this view.
202 class ComputeNodeUpdateActor(config.actor_class):
203 """Actor to dispatch one-off cloud management requests.
205 This actor receives requests for small cloud updates, and
206 dispatches them to a real driver. ComputeNodeMonitorActors use
207 this to perform maintenance tasks on themselves. Having a
208 dedicated actor for this gives us the opportunity to control the
209 flow of requests; e.g., by backing off when errors occur.
211 This actor is most like a "traditional" Pykka actor: there's no
212 subscribing, but instead methods return real driver results. If
213 you're interested in those results, you should get them from the
214 Future that the proxy method returns. Be prepared to handle exceptions
215 from the cloud driver when you do.
# Args: cloud_factory -> zero-arg callable returning a cloud driver
# wrapper (called here so the driver lives in this actor's thread).
217 def __init__(self, cloud_factory, max_retry_wait=180):
218 super(ComputeNodeUpdateActor, self).__init__()
219 self._cloud = cloud_factory()
220 self.max_retry_wait = max_retry_wait
# Consecutive-error count; drives the exponential backoff below.
221 self.error_streak = 0
# Earliest wall-clock time the next request may be dispatched.
222 self.next_request_time = time.time()
# Decorator: delay each call until next_request_time, and push that
# time further out on every consecutive error.
224 def _throttle_errors(orig_func):
225 @functools.wraps(orig_func)
226 def throttle_wrapper(self, *args, **kwargs):
227 throttle_time = self.next_request_time - time.time()
228 if throttle_time > 0:
# Blocking sleep is acceptable here: it serializes and paces all
# requests flowing through this single actor.
229 time.sleep(throttle_time)
230 self.next_request_time = time.time()
# NOTE(review): the `try:` opening this block (listing line 231) is
# elided, as are the re-raise and return (lines 236-238, 240).
232 result = orig_func(self, *args, **kwargs)
233 except Exception as error:
234 self.error_streak += 1
# Exponential backoff: 2**streak, presumably capped by
# max_retry_wait on the elided continuation line 236 — verify.
235 self.next_request_time += min(2 ** self.error_streak,
# Success: clear the streak so the next request runs immediately.
239 self.error_streak = 0
241 return throttle_wrapper
# Push the Arvados record's state to the cloud node (e.g. hostname).
# NOTE(review): listing line 243 is elided — in context this method is
# presumably wrapped with @_throttle_errors; verify against the full
# source.
244 def sync_node(self, cloud_node, arvados_node):
245 return self._cloud.sync_node(cloud_node, arvados_node)
# Long-lived actor paired 1:1 with a running cloud node.  It absorbs
# cloud/Arvados record updates, pairs the cloud node with its Arvados
# record, and tells subscribers when the node looks shutdown-eligible.
# NOTE(review): numbered listing — gaps in the line numbers are lines
# elided from this view.
248 class ComputeNodeMonitorActor(config.actor_class):
249 """Actor to manage a running compute node.
251 This actor gets updates about a compute node's cloud and Arvados records.
252 It uses this information to notify subscribers when the node is eligible
# Args: shutdown_timer -> object exposing window_open()/next_opening()
# (when shutdowns are allowed); cloud_fqdn_func -> maps a cloud node to
# its FQDN, compared against the Arvados record in update_arvados_node;
# update_actor -> ComputeNodeUpdateActor used for fire-and-forget
# sync_node calls; node_stale_after -> seconds before an Arvados record
# (or an unpaired node) is considered stale.
255 def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
256 cloud_fqdn_func, timer_actor, update_actor, arvados_node=None,
257 poll_stale_after=600, node_stale_after=3600):
258 super(ComputeNodeMonitorActor, self).__init__()
259 self._later = self.actor_ref.proxy()
260 self._logger = logging.getLogger('arvnodeman.computenode')
# Last message logged by _debug, used to squelch duplicates.
261 self._last_log = None
262 self._shutdowns = shutdown_timer
263 self._cloud_node_fqdn = cloud_fqdn_func
264 self._timer = timer_actor
265 self._update = update_actor
266 self.cloud_node = cloud_node
267 self.cloud_node_start_time = cloud_node_start_time
268 self.poll_stale_after = poll_stale_after
269 self.node_stale_after = node_stale_after
270 self.subscribers = set()
# Route the initial record through update_arvados_node so the FQDN
# sync logic runs on it too.
271 self.arvados_node = None
272 self._later.update_arvados_node(arvados_node)
273 self.last_shutdown_opening = None
274 self._later.consider_shutdown()
# Unlike ComputeNodeStateChangeBase.subscribe, this never fires
# immediately: a monitor's subscribers are notified repeatedly from
# consider_shutdown.
276 def subscribe(self, subscriber):
277 self.subscribers.add(subscriber)
# Debug-log msg, suppressing consecutive duplicates (the early return
# on the elided listing lines 281-282).
279 def _debug(self, msg, *args):
280 if msg == self._last_log:
283 self._logger.debug(msg, *args)
285 def in_state(self, *states):
286 # Return a boolean to say whether or not our Arvados node record is in
287 # one of the given states. If state information is not
288 # available--because this node has no Arvados record, the record is
289 # stale, or the record has no state information--return None.
290 if (self.arvados_node is None) or not timestamp_fresh(
291 arvados_node_mtime(self.arvados_node), self.node_stale_after):
293 state = self.arvados_node['crunch_worker_state']
296 result = state in states
# A node with a job assigned is never "in state" for shutdown purposes.
298 result = result and not self.arvados_node['job_uuid']
# True when this node can be shut down right now.  The closed-window
# early return (listing line 303) is elided.
301 def shutdown_eligible(self):
302 if not self._shutdowns.window_open():
304 elif self.arvados_node is None:
305 # If this is a new, unpaired node, it's eligible for
306 # shutdown--we figure there was an error during bootstrap.
307 return timestamp_fresh(self.cloud_node_start_time,
308 self.node_stale_after)
310 return self.in_state('idle')
# Re-evaluate shutdown eligibility; called after every record update
# and re-scheduled for the next shutdown window opening.
312 def consider_shutdown(self):
313 next_opening = self._shutdowns.next_opening()
314 if self.shutdown_eligible():
315 self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
316 _notify_subscribers(self._later, self.subscribers)
317 elif self._shutdowns.window_open():
318 self._debug("Node %s shutdown window open but node busy.",
320 elif self.last_shutdown_opening != next_opening:
321 self._debug("Node %s shutdown window closed. Next at %s.",
322 self.cloud_node.id, time.ctime(next_opening))
# Wake ourselves up when the next window opens; remember the opening
# so we schedule each one only once.
323 self._timer.schedule(next_opening, self._later.consider_shutdown)
324 self.last_shutdown_opening = next_opening
# Try to pair an unclaimed Arvados record with this cloud node.
# Returns the cloud node id on a successful pairing (the None returns
# on the elided listing lines 329 and 334-336 cover the other paths).
326 def offer_arvados_pair(self, arvados_node):
327 first_ping_s = arvados_node.get('first_ping_at')
328 if (self.arvados_node is not None) or (not first_ping_s):
# Match on IP address, and require the first ping to postdate this
# cloud node's boot so we don't pair with a stale record.
330 elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
331 (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
332 self._later.update_arvados_node(arvados_node)
333 return self.cloud_node.id
# Replace our cloud record (ignoring None) and re-check shutdown.
337 def update_cloud_node(self, cloud_node):
338 if cloud_node is not None:
339 self.cloud_node = cloud_node
340 self._later.consider_shutdown()
342 def update_arvados_node(self, arvados_node):
343 # If the cloud node's FQDN doesn't match what's in the Arvados node
344 # record, make them match.
345 # This method is a little unusual in the way it just fires off the
346 # request without checking the result or retrying errors. That's
347 # because this update happens every time we reload the Arvados node
348 # list: if a previous sync attempt failed, we'll see that the names
349 # are out of sync and just try again. ComputeNodeUpdateActor has
350 # the logic to throttle those effective retries when there's trouble.
351 if arvados_node is not None:
352 self.arvados_node = arvados_node
353 if (self._cloud_node_fqdn(self.cloud_node) !=
354 arvados_node_fqdn(self.arvados_node)):
355 self._update.sync_node(self.cloud_node, self.arvados_node)
356 self._later.consider_shutdown()