3 from __future__ import absolute_import, print_function
9 import libcloud.common.types as cloud_types
13 arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, arvados_node_missing
14 from ...clientactor import _notify_subscribers
15 from ... import config
class ComputeNodeStateChangeBase(config.actor_class):
    """Base class for actors that change a compute node's state.

    This base class takes care of retrying changes and notifying
    subscribers when the change is finished.
    """
    def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
                 retry_wait, max_retry_wait):
        super(ComputeNodeStateChangeBase, self).__init__()
        # Proxy to ourselves, so work can be scheduled as actor messages
        # instead of direct (blocking) method calls.
        self._later = self.actor_ref.proxy()
        self._logger = logging.getLogger(logger_name)
        self._cloud = cloud_client
        self._arvados = arvados_client
        self._timer = timer_actor
        self.min_retry_wait = retry_wait
        self.max_retry_wait = max_retry_wait
        self.retry_wait = retry_wait
        self.subscribers = set()

    @staticmethod
    def _retry(errors=()):
        """Retry decorator for an actor method that makes remote requests.

        Use this function to decorate an actor method, and pass in a
        tuple of exceptions to catch.  This decorator will schedule
        retries of that method with exponential backoff if the
        original method raises a known cloud driver error, or any of the
        given exception types.
        """
        def decorator(orig_func):
            @functools.wraps(orig_func)
            def retry_wrapper(self, *args, **kwargs):
                start_time = time.time()
                try:
                    orig_func(self, *args, **kwargs)
                except Exception as error:
                    # Only retry recognized transient errors; anything
                    # else propagates to the actor runtime.
                    if not (isinstance(error, errors) or
                            self._cloud.is_cloud_exception(error)):
                        raise
                    self._logger.warning(
                        "Client error: %s - waiting %s seconds",
                        error, self.retry_wait)
                    # Re-deliver the same call through our proxy after the
                    # backoff interval.
                    self._timer.schedule(start_time + self.retry_wait,
                                         getattr(self._later,
                                                 orig_func.__name__),
                                         *args, **kwargs)
                    self.retry_wait = min(self.retry_wait * 2,
                                          self.max_retry_wait)
                else:
                    # Success resets the backoff window.
                    self.retry_wait = self.min_retry_wait
            return retry_wrapper
        return decorator

    def _finished(self):
        # Notify all subscribers, then mark this actor as done: any
        # subscriber added afterward is called back immediately.
        _notify_subscribers(self._later, self.subscribers)
        self.subscribers = None

    def subscribe(self, subscriber):
        if self.subscribers is None:
            # State change already finished; deliver the callback now.
            try:
                subscriber(self._later)
            except pykka.ActorDeadError:
                pass
        else:
            self.subscribers.add(subscriber)

    def _clean_arvados_node(self, arvados_node, explanation):
        """Reset an Arvados node record for reuse; returns the updated record."""
        return self._arvados.nodes().update(
            uuid=arvados_node['uuid'],
            body={'hostname': None,
                  'ip_address': None,
                  'slot_number': None,
                  'first_ping_at': None,
                  'last_ping_at': None,
                  'info': {'ec2_instance_id': None,
                           'last_action': explanation}},
            ).execute()
class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
    """Actor to create and set up a cloud compute node.

    This actor prepares an Arvados node record for a new compute node
    (either creating one or cleaning one passed in), then boots the
    actual compute node.  It notifies subscribers when the cloud node
    is successfully created (the last step in the process for Node
    Manager to handle).
    """
    def __init__(self, timer_actor, arvados_client, cloud_client,
                 cloud_size, arvados_node=None,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeSetupActor, self).__init__(
            'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self.cloud_size = cloud_size
        self.arvados_node = None
        self.cloud_node = None
        # Kick off the setup pipeline asynchronously via our proxy.
        if arvados_node is None:
            self._later.create_arvados_node()
        else:
            self._later.prepare_arvados_node(arvados_node)

    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
    def create_arvados_node(self):
        self.arvados_node = self._arvados.nodes().create(body={}).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
    def prepare_arvados_node(self, node):
        self.arvados_node = self._clean_arvados_node(
            node, "Prepared by Node Manager")
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._retry()
    def create_cloud_node(self):
        self._logger.info("Creating cloud node with size %s.",
                          self.cloud_size.name)
        self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                  self.arvados_node)
        self._logger.info("Cloud node %s created.", self.cloud_node.id)
        self._later.post_create()

    @ComputeNodeStateChangeBase._retry()
    def post_create(self):
        self._cloud.post_create_node(self.cloud_node)
        self._logger.info("%s post-create work done.", self.cloud_node.id)
        self._finished()

    def stop_if_no_cloud_node(self):
        # Returns True if the actor was stopped (no cloud node exists yet),
        # False if a cloud node was already created and we must stay alive.
        if self.cloud_node is not None:
            return False
        self.stop()
        return True
class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
    """Actor to shut down a compute node.

    This actor simply destroys a cloud node, retrying as needed.
    """
    def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
                 cancellable=True, retry_wait=1, max_retry_wait=180):
        # If a ShutdownActor is cancellable, it will ask the
        # ComputeNodeMonitorActor if it's still eligible before taking each
        # action, and stop the shutdown process if the node is no longer
        # eligible.  Normal shutdowns based on job demand should be
        # cancellable; shutdowns based on node misbehavior should not.
        super(ComputeNodeShutdownActor, self).__init__(
            'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self._monitor = node_monitor.proxy()
        self.cloud_node = self._monitor.cloud_node.get()
        self.cancellable = cancellable
        # None until the shutdown finishes; then True/False per outcome.
        self.success = None

    def on_start(self):
        self._later.shutdown_node()

    def _arvados_node(self):
        return self._monitor.arvados_node.get()

    def _finished(self, success_flag=None):
        if success_flag is not None:
            self.success = success_flag
        return super(ComputeNodeShutdownActor, self)._finished()

    def cancel_shutdown(self):
        self._finished(success_flag=False)

    def _stop_if_window_closed(orig_func):
        # Decorator: skip the wrapped action (and cancel the shutdown) if
        # this shutdown is cancellable and the node is no longer eligible.
        @functools.wraps(orig_func)
        def stop_wrapper(self, *args, **kwargs):
            if (self.cancellable and
                  (not self._monitor.shutdown_eligible().get())):
                self._logger.info(
                    "Cloud node %s shutdown cancelled - no longer eligible.",
                    self.cloud_node.id)
                self._later.cancel_shutdown()
                return None
            else:
                return orig_func(self, *args, **kwargs)
        return stop_wrapper

    @_stop_if_window_closed
    @ComputeNodeStateChangeBase._retry()
    def shutdown_node(self):
        if not self._cloud.destroy_node(self.cloud_node):
            # Force a retry via the _retry decorator.
            raise cloud_types.LibcloudError("destroy_node failed")
        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
        arv_node = self._arvados_node()
        if arv_node is None:
            self._finished(success_flag=True)
        else:
            self._later.clean_arvados_node(arv_node)

    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
    def clean_arvados_node(self, arvados_node):
        self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
        self._finished(success_flag=True)

    # Make the decorator available to subclasses.
    _stop_if_window_closed = staticmethod(_stop_if_window_closed)
class ComputeNodeUpdateActor(config.actor_class):
    """Actor to dispatch one-off cloud management requests.

    This actor receives requests for small cloud updates, and
    dispatches them to a real driver.  ComputeNodeMonitorActors use
    this to perform maintenance tasks on themselves.  Having a
    dedicated actor for this gives us the opportunity to control the
    flow of requests; e.g., by backing off when errors occur.

    This actor is most like a "traditional" Pykka actor: there's no
    subscribing, but instead methods return real driver results.  If
    you're interested in those results, you should get them from the
    Future that the proxy method returns.  Be prepared to handle exceptions
    from the cloud driver when you do.
    """
    def __init__(self, cloud_factory, max_retry_wait=180):
        super(ComputeNodeUpdateActor, self).__init__()
        self._cloud = cloud_factory()
        self.max_retry_wait = max_retry_wait
        self.error_streak = 0
        self.next_request_time = time.time()

    def _throttle_errors(orig_func):
        # Decorator: delay each request while errors accumulate, backing
        # off exponentially (capped at max_retry_wait) per error streak.
        @functools.wraps(orig_func)
        def throttle_wrapper(self, *args, **kwargs):
            throttle_time = self.next_request_time - time.time()
            if throttle_time > 0:
                time.sleep(throttle_time)
            self.next_request_time = time.time()
            try:
                result = orig_func(self, *args, **kwargs)
            except Exception as error:
                self.error_streak += 1
                self.next_request_time += min(2 ** self.error_streak,
                                              self.max_retry_wait)
                raise
            else:
                self.error_streak = 0
                return result
        return throttle_wrapper

    @_throttle_errors
    def sync_node(self, cloud_node, arvados_node):
        return self._cloud.sync_node(cloud_node, arvados_node)
class ComputeNodeMonitorActor(config.actor_class):
    """Actor to manage a running compute node.

    This actor gets updates about a compute node's cloud and Arvados records.
    It uses this information to notify subscribers when the node is eligible
    for shutdown, and to sync the Arvados record with the cloud node.
    """
    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
                 cloud_fqdn_func, timer_actor, update_actor, cloud_client,
                 arvados_node=None, poll_stale_after=600, node_stale_after=3600,
                 boot_fail_after=1800):
        super(ComputeNodeMonitorActor, self).__init__()
        self._later = self.actor_ref.proxy()
        self._logger = logging.getLogger('arvnodeman.computenode')
        # Remember the last message logged, to suppress duplicates.
        self._last_log = None
        self._shutdowns = shutdown_timer
        self._cloud_node_fqdn = cloud_fqdn_func
        self._timer = timer_actor
        self._update = update_actor
        self._cloud = cloud_client
        self.cloud_node = cloud_node
        self.cloud_node_start_time = cloud_node_start_time
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.boot_fail_after = boot_fail_after
        self.subscribers = set()
        self.arvados_node = None
        self._later.update_arvados_node(arvados_node)
        self.last_shutdown_opening = None
        self._later.consider_shutdown()

    def subscribe(self, subscriber):
        self.subscribers.add(subscriber)

    def _debug(self, msg, *args):
        # Suppress consecutive duplicate debug messages.
        if msg == self._last_log:
            return
        self._last_log = msg
        self._logger.debug(msg, *args)

    def in_state(self, *states):
        # Return a boolean to say whether or not our Arvados node record is in
        # one of the given states.  If state information is not
        # available--because this node has no Arvados record, the record is
        # stale, or the record has no state information--return None.
        if (self.arvados_node is None) or not timestamp_fresh(
              arvados_node_mtime(self.arvados_node), self.node_stale_after):
            return None
        state = self.arvados_node['crunch_worker_state']
        if not state:
            return None
        result = state in states
        if state == 'idle':
            # An idle node that still has a job assigned isn't really idle.
            result = result and not self.arvados_node['job_uuid']
        return result

    def shutdown_eligible(self):
        if not self._shutdowns.window_open():
            return False
        elif self.arvados_node is None:
            # Node is unpaired.
            # If it hasn't pinged Arvados after boot_fail seconds, shut it down
            return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after)
        elif arvados_node_missing(self.arvados_node, self.node_stale_after) and self._cloud.broken(self.cloud_node):
            # Node is paired, but Arvados says it is missing and the cloud says the node
            # is in an error state, so shut it down.
            return True
        else:
            return self.in_state('idle')

    def consider_shutdown(self):
        next_opening = self._shutdowns.next_opening()
        if self.shutdown_eligible():
            self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
            _notify_subscribers(self._later, self.subscribers)
        elif self._shutdowns.window_open():
            self._debug("Node %s shutdown window open but node busy.",
                        self.cloud_node.id)
        elif self.last_shutdown_opening != next_opening:
            self._debug("Node %s shutdown window closed.  Next at %s.",
                        self.cloud_node.id, time.ctime(next_opening))
            # Re-evaluate when the next shutdown window opens.
            self._timer.schedule(next_opening, self._later.consider_shutdown)
            self.last_shutdown_opening = next_opening

    def offer_arvados_pair(self, arvados_node):
        # Pair with this Arvados record if it belongs to us; return our
        # cloud node id on success, None otherwise.
        first_ping_s = arvados_node.get('first_ping_at')
        if (self.arvados_node is not None) or (not first_ping_s):
            return None
        elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
              (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
            self._later.update_arvados_node(arvados_node)
            return self.cloud_node.id
        else:
            return None

    def update_cloud_node(self, cloud_node):
        if cloud_node is not None:
            self.cloud_node = cloud_node
            self._later.consider_shutdown()

    def update_arvados_node(self, arvados_node):
        # If the cloud node's FQDN doesn't match what's in the Arvados node
        # record, make them match.
        # This method is a little unusual in the way it just fires off the
        # request without checking the result or retrying errors.  That's
        # because this update happens every time we reload the Arvados node
        # list: if a previous sync attempt failed, we'll see that the names
        # are out of sync and just try again.  ComputeNodeUpdateActor has
        # the logic to throttle those effective retries when there's trouble.
        if arvados_node is not None:
            self.arvados_node = arvados_node
            if (self._cloud_node_fqdn(self.cloud_node) !=
                  arvados_node_fqdn(self.arvados_node)):
                self._update.sync_node(self.cloud_node, self.arvados_node)
            self._later.consider_shutdown()