3 from __future__ import absolute_import, print_function
9 import libcloud.common.types as cloud_types
13 arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh
14 from ...clientactor import _notify_subscribers
15 from ... import config
class ComputeNodeStateChangeBase(config.actor_class):
    """Base class for actors that change a compute node's state.

    This base class takes care of retrying changes and notifying
    subscribers when the change is finished.
    """
    def __init__(self, logger_name, cloud_client, timer_actor,
                 retry_wait, max_retry_wait):
        """Arguments:
        * logger_name: name passed to logging.getLogger for this actor.
        * cloud_client: cloud driver wrapper used to make remote requests.
        * timer_actor: actor used to schedule retries in the future.
        * retry_wait: initial seconds to wait between retries.
        * max_retry_wait: cap on the exponential backoff, in seconds.
        """
        super(ComputeNodeStateChangeBase, self).__init__()
        # Proxy to ourselves, so retries and notifications go through the
        # actor's mailbox instead of calling methods directly.
        self._later = self.actor_ref.proxy()
        self._logger = logging.getLogger(logger_name)
        self._cloud = cloud_client
        self._timer = timer_actor
        self.min_retry_wait = retry_wait
        self.max_retry_wait = max_retry_wait
        self.retry_wait = retry_wait
        # Set of callables to notify when the change finishes; becomes None
        # after _finished() runs, signaling the change is already done.
        self.subscribers = set()

    @staticmethod
    def _retry(errors=()):
        """Retry decorator for an actor method that makes remote requests.

        Use this function to decorator an actor method, and pass in a
        tuple of exceptions to catch.  This decorator will schedule
        retries of that method with exponential backoff if the
        original method raises a known cloud driver error, or any of the
        given exception types.
        """
        def decorator(orig_func):
            @functools.wraps(orig_func)
            def retry_wrapper(self, *args, **kwargs):
                start_time = time.time()
                try:
                    orig_func(self, *args, **kwargs)
                except Exception as error:
                    # Only retry known-transient errors: the caller-supplied
                    # types, or whatever the cloud driver recognizes as its
                    # own.  Anything else propagates.
                    if not (isinstance(error, errors) or
                            self._cloud.is_cloud_exception(error)):
                        raise
                    self._logger.warning(
                        "Client error: %s - waiting %s seconds",
                        error, self.retry_wait)
                    # Reschedule this same method through our proxy after
                    # the backoff interval, then double the wait (capped).
                    self._timer.schedule(start_time + self.retry_wait,
                                         getattr(self._later,
                                                 orig_func.__name__),
                                         *args, **kwargs)
                    self.retry_wait = min(self.retry_wait * 2,
                                          self.max_retry_wait)
                else:
                    # Success resets the backoff for the next operation.
                    self.retry_wait = self.min_retry_wait
            return retry_wrapper
        return decorator

    def _finished(self):
        # Notify everyone who subscribed before completion, then mark the
        # change done by nulling out the subscriber set.
        _notify_subscribers(self._later, self.subscribers)
        self.subscribers = None

    def subscribe(self, subscriber):
        """Register a callable to be invoked with our proxy when the state
        change finishes.  If it already finished, notify immediately."""
        if self.subscribers is None:
            try:
                subscriber(self._later)
            except pykka.ActorDeadError:
                # Subscriber actor already stopped; nothing to notify.
                pass
        else:
            self.subscribers.add(subscriber)
class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
    """Actor to create and set up a cloud compute node.

    This actor prepares an Arvados node record for a new compute node
    (either creating one or cleaning one passed in), then boots the
    actual compute node.  It notifies subscribers when the cloud node
    is successfully created (the last step in the process for Node
    Manager).
    """
    def __init__(self, timer_actor, arvados_client, cloud_client,
                 cloud_size, arvados_node=None,
                 retry_wait=1, max_retry_wait=180):
        """Arguments:
        * arvados_client: client used to create/clean the Arvados node record.
        * cloud_size: cloud driver size object for the node to boot.
        * arvados_node: existing Arvados node record to reuse, or None to
          create a fresh one.
        """
        super(ComputeNodeSetupActor, self).__init__(
            'arvnodeman.nodeup', cloud_client, timer_actor,
            retry_wait, max_retry_wait)
        self._arvados = arvados_client
        self.cloud_size = cloud_size
        self.arvados_node = None
        self.cloud_node = None
        # Kick off the first step through our proxy so it runs in the
        # actor's own thread after __init__ returns.
        if arvados_node is None:
            self._later.create_arvados_node()
        else:
            self._later.prepare_arvados_node(arvados_node)

    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
    def create_arvados_node(self):
        """Create a brand-new Arvados node record, then boot the cloud node."""
        self.arvados_node = self._arvados.nodes().create(body={}).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
    def prepare_arvados_node(self, node):
        """Scrub a preexisting Arvados node record for reuse, then boot."""
        self.arvados_node = self._arvados.nodes().update(
            uuid=node['uuid'],
            body={'hostname': None,
                  'ip_address': None,
                  'slot_number': None,
                  'first_ping_at': None,
                  'last_ping_at': None,
                  'info': {'ec2_instance_id': None,
                           'last_action': "Prepared by Node Manager"}}
            ).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._retry()
    def create_cloud_node(self):
        """Ask the cloud driver to boot a node of the configured size."""
        self._logger.info("Creating cloud node with size %s.",
                          self.cloud_size.name)
        self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                  self.arvados_node)
        self._logger.info("Cloud node %s created.", self.cloud_node.id)
        self._later.post_create()

    @ComputeNodeStateChangeBase._retry()
    def post_create(self):
        """Run driver post-creation work, then notify subscribers."""
        self._cloud.post_create_node(self.cloud_node)
        self._logger.info("%s post-create work done.", self.cloud_node.id)
        self._finished()

    def stop_if_no_cloud_node(self):
        # Abort setup only if we haven't booted a cloud node yet; returns
        # whether the actor was stopped.
        if self.cloud_node is not None:
            return False
        self.stop()
        return True
class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
    """Actor to shut down a compute node.

    This actor simply destroys a cloud node, retrying as needed.
    """
    def __init__(self, timer_actor, cloud_client, node_monitor,
                 cancellable=True, retry_wait=1, max_retry_wait=180):
        # If a ShutdownActor is cancellable, it will ask the
        # ComputeNodeMonitorActor if it's still eligible before taking each
        # action, and stop the shutdown process if the node is no longer
        # eligible.  Normal shutdowns based on job demand should be
        # cancellable; shutdowns based on node misbehavior should not.
        super(ComputeNodeShutdownActor, self).__init__(
            'arvnodeman.nodedown', cloud_client, timer_actor,
            retry_wait, max_retry_wait)
        self._monitor = node_monitor.proxy()
        self.cloud_node = self._monitor.cloud_node.get()
        self.cancellable = cancellable
        # None while in progress; True on successful destroy, False if
        # the shutdown was cancelled.
        self.success = None

    def on_start(self):
        # Begin the shutdown as soon as the actor starts.
        self._later.shutdown_node()

    def cancel_shutdown(self):
        self.success = False
        self._finished()

    def _stop_if_window_closed(orig_func):
        """Decorator: skip the wrapped action and cancel the shutdown if a
        cancellable node is no longer eligible for shutdown."""
        @functools.wraps(orig_func)
        def stop_wrapper(self, *args, **kwargs):
            if (self.cancellable and
                  (not self._monitor.shutdown_eligible().get())):
                self._logger.info(
                    "Cloud node %s shutdown cancelled - no longer eligible.",
                    self.cloud_node.id)
                self._later.cancel_shutdown()
                return None
            else:
                return orig_func(self, *args, **kwargs)
        return stop_wrapper

    @_stop_if_window_closed
    @ComputeNodeStateChangeBase._retry()
    def shutdown_node(self):
        if self._cloud.destroy_node(self.cloud_node):
            self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
            self.success = True
            self._finished()
        else:
            # Force a retry through the _retry decorator by raising a
            # cloud driver error.
            raise cloud_types.LibcloudError("destroy_node failed")

    # Make the decorator available to subclasses.
    _stop_if_window_closed = staticmethod(_stop_if_window_closed)
class ComputeNodeUpdateActor(config.actor_class):
    """Actor to dispatch one-off cloud management requests.

    This actor receives requests for small cloud updates, and
    dispatches them to a real driver.  ComputeNodeMonitorActors use
    this to perform maintenance tasks on themselves.  Having a
    dedicated actor for this gives us the opportunity to control the
    flow of requests; e.g., by backing off when errors occur.

    This actor is most like a "traditional" Pykka actor: there's no
    subscribing, but instead methods return real driver results.  If
    you're interested in those results, you should get them from the
    Future that the proxy method returns.  Be prepared to handle exceptions
    from the cloud driver when you do.
    """
    def __init__(self, cloud_factory, max_retry_wait=180):
        """Arguments:
        * cloud_factory: zero-argument callable returning a cloud driver
          wrapper (called here so the driver lives in this actor's thread).
        * max_retry_wait: cap on the error backoff delay, in seconds.
        """
        super(ComputeNodeUpdateActor, self).__init__()
        self._cloud = cloud_factory()
        self.max_retry_wait = max_retry_wait
        # Consecutive error count drives exponential backoff below.
        self.error_streak = 0
        self.next_request_time = time.time()

    def _throttle_errors(orig_func):
        """Decorator: delay requests after errors, with exponential backoff
        based on the current error streak."""
        @functools.wraps(orig_func)
        def throttle_wrapper(self, *args, **kwargs):
            throttle_time = self.next_request_time - time.time()
            if throttle_time > 0:
                time.sleep(throttle_time)
            self.next_request_time = time.time()
            try:
                result = orig_func(self, *args, **kwargs)
            except Exception as error:
                # Push the next allowed request further out, then let the
                # caller see the failure via the returned Future.
                self.error_streak += 1
                self.next_request_time += min(2 ** self.error_streak,
                                              self.max_retry_wait)
                raise
            else:
                self.error_streak = 0
                return result
        return throttle_wrapper

    @_throttle_errors
    def sync_node(self, cloud_node, arvados_node):
        """Make the cloud node's metadata match the Arvados node record."""
        return self._cloud.sync_node(cloud_node, arvados_node)
class ComputeNodeMonitorActor(config.actor_class):
    """Actor to manage a running compute node.

    This actor gets updates about a compute node's cloud and Arvados records.
    It uses this information to notify subscribers when the node is eligible
    for shutdown, and to update the Arvados node record when needed.
    """
    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
                 cloud_fqdn_func, timer_actor, update_actor, arvados_node=None,
                 poll_stale_after=600, node_stale_after=3600):
        """Arguments:
        * cloud_node: the cloud driver's record for this node.
        * cloud_node_start_time: timestamp when the node was booted.
        * shutdown_timer: object answering window_open()/next_opening() for
          shutdown eligibility windows.
        * cloud_fqdn_func: callable mapping a cloud node to its FQDN.
        * update_actor: ComputeNodeUpdateActor proxy for sync requests.
        * arvados_node: paired Arvados node record, if already known.
        * poll_stale_after/node_stale_after: staleness thresholds in seconds.
        """
        super(ComputeNodeMonitorActor, self).__init__()
        self._later = self.actor_ref.proxy()
        self._logger = logging.getLogger('arvnodeman.computenode')
        # Last debug message emitted, used to suppress repeats.
        self._last_log = None
        self._shutdowns = shutdown_timer
        self._cloud_node_fqdn = cloud_fqdn_func
        self._timer = timer_actor
        self._update = update_actor
        self.cloud_node = cloud_node
        self.cloud_node_start_time = cloud_node_start_time
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.subscribers = set()
        self.arvados_node = None
        self._later.update_arvados_node(arvados_node)
        self.last_shutdown_opening = None
        self._later.consider_shutdown()

    def subscribe(self, subscriber):
        self.subscribers.add(subscriber)

    def _debug(self, msg, *args):
        # Suppress consecutive duplicate messages to keep logs readable.
        if msg == self._last_log:
            return
        self._last_log = msg
        self._logger.debug(msg, *args)

    def in_state(self, *states):
        # Return a boolean to say whether or not our Arvados node record is in
        # one of the given states.  If state information is not
        # available--because this node has no Arvados record, the record is
        # stale, or the record has no state information--return None.
        if (self.arvados_node is None) or not timestamp_fresh(
              arvados_node_mtime(self.arvados_node), self.node_stale_after):
            return None
        state = self.arvados_node['crunch_worker_state']
        if not state:
            return None
        result = state in states
        # An 'idle' record with a job assigned is not really idle.
        if state == 'idle':
            result = result and not self.arvados_node['job_uuid']
        return result

    def shutdown_eligible(self):
        if not self._shutdowns.window_open():
            return False
        elif self.arvados_node is None:
            # If this is a new, unpaired node, it's eligible for
            # shutdown--we figure there was an error during bootstrap.
            return timestamp_fresh(self.cloud_node_start_time,
                                   self.node_stale_after)
        else:
            return self.in_state('idle')

    def consider_shutdown(self):
        next_opening = self._shutdowns.next_opening()
        if self.shutdown_eligible():
            self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
            _notify_subscribers(self._later, self.subscribers)
        elif self._shutdowns.window_open():
            self._debug("Node %s shutdown window open but node busy.",
                        self.cloud_node.id)
        elif self.last_shutdown_opening != next_opening:
            self._debug("Node %s shutdown window closed. Next at %s.",
                        self.cloud_node.id, time.ctime(next_opening))
            # Re-check eligibility when the next shutdown window opens.
            self._timer.schedule(next_opening, self._later.consider_shutdown)
            self.last_shutdown_opening = next_opening

    def offer_arvados_pair(self, arvados_node):
        """Pair with this Arvados record if it matches us; return our cloud
        node id on success, None otherwise."""
        first_ping_s = arvados_node.get('first_ping_at')
        if (self.arvados_node is not None) or (not first_ping_s):
            # Already paired, or the record has never pinged.
            return None
        elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
              (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
            self._later.update_arvados_node(arvados_node)
            return self.cloud_node.id
        else:
            return None

    def update_cloud_node(self, cloud_node):
        if cloud_node is not None:
            self.cloud_node = cloud_node
            self._later.consider_shutdown()

    def update_arvados_node(self, arvados_node):
        # If the cloud node's FQDN doesn't match what's in the Arvados node
        # record, make them match.
        # This method is a little unusual in the way it just fires off the
        # request without checking the result or retrying errors.  That's
        # because this update happens every time we reload the Arvados node
        # list: if a previous sync attempt failed, we'll see that the names
        # are out of sync and just try again.  ComputeNodeUpdateActor has
        # the logic to throttle those effective retries when there's trouble.
        if arvados_node is not None:
            self.arvados_node = arvados_node
            if (self._cloud_node_fqdn(self.cloud_node) !=
                  arvados_node_fqdn(self.arvados_node)):
                self._update.sync_node(self.cloud_node, self.arvados_node)
            self._later.consider_shutdown()