from __future__ import absolute_import, print_function

import functools
import logging
import time

import libcloud.common.types as cloud_types
import pykka

from .. import \
    arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, arvados_node_missing
from ...clientactor import _notify_subscribers
from ... import config
class ComputeNodeStateChangeBase(config.actor_class):
    """Base class for actors that change a compute node's state.

    This base class takes care of retrying changes and notifying
    subscribers when the change is finished.
    """
    def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
                 retry_wait, max_retry_wait):
        super(ComputeNodeStateChangeBase, self).__init__()
        # Proxy to this actor, so methods can schedule follow-up work
        # asynchronously through the actor's own mailbox.
        self._later = self.actor_ref.proxy()
        self._logger = logging.getLogger(logger_name)
        self._cloud = cloud_client
        self._arvados = arvados_client
        self._timer = timer_actor
        self.min_retry_wait = retry_wait
        self.max_retry_wait = max_retry_wait
        # Current backoff delay: doubles (up to max_retry_wait) on each
        # retried error, resets to min_retry_wait after a success.
        self.retry_wait = retry_wait
        self.subscribers = set()

    @staticmethod
    def _retry(errors=()):
        """Retry decorator for an actor method that makes remote requests.

        Use this function to decorate an actor method, and pass in a
        tuple of exceptions to catch.  This decorator will schedule
        retries of that method with exponential backoff if the
        original method raises a known cloud driver error, or any of the
        given exception types.
        """
        def decorator(orig_func):
            @functools.wraps(orig_func)
            def retry_wrapper(self, *args, **kwargs):
                start_time = time.time()
                try:
                    orig_func(self, *args, **kwargs)
                except Exception as error:
                    # Anything other than a known cloud error or one of the
                    # caller's expected exception types is a real bug: let
                    # it propagate instead of retrying.
                    if not (isinstance(error, errors) or
                            self._cloud.is_cloud_exception(error)):
                        raise
                    self._logger.warning(
                        "Client error: %s - waiting %s seconds",
                        error, self.retry_wait)
                    # Re-invoke the same method, with the same arguments,
                    # through our proxy once the backoff delay has elapsed.
                    self._timer.schedule(start_time + self.retry_wait,
                                         getattr(self._later,
                                                 orig_func.__name__),
                                         *args, **kwargs)
                    self.retry_wait = min(self.retry_wait * 2,
                                          self.max_retry_wait)
                else:
                    self.retry_wait = self.min_retry_wait
            return retry_wrapper
        return decorator

    def _finished(self):
        # Notify everyone that the state change completed, then set
        # subscribers to None so subscribe() knows we already finished.
        _notify_subscribers(self._later, self.subscribers)
        self.subscribers = None

    def subscribe(self, subscriber):
        if self.subscribers is None:
            # The state change already finished: notify the late
            # subscriber immediately, tolerating dead actors.
            try:
                subscriber(self._later)
            except pykka.ActorDeadError:
                pass
        else:
            self.subscribers.add(subscriber)

    def _clean_arvados_node(self, arvados_node, explanation):
        """Reset an Arvados node record for reuse; return the updated record.

        `explanation` is stored in the record's info hash as the last
        action taken, for operator visibility.
        """
        # NOTE(review): the exact set of cleared fields was partially
        # truncated in the reviewed source -- confirm against upstream.
        return self._arvados.nodes().update(
            uuid=arvados_node['uuid'],
            body={'hostname': None,
                  'ip_address': None,
                  'slot_number': None,
                  'first_ping_at': None,
                  'last_ping_at': None,
                  'info': {'ec2_instance_id': None,
                           'last_action': explanation}},
            ).execute()
class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
    """Actor to create and set up a cloud compute node.

    This actor prepares an Arvados node record for a new compute node
    (either creating one or cleaning one passed in), then boots the
    actual compute node.  It notifies subscribers when the cloud node
    is successfully created (the last step in the process for Node
    Manager).
    """
    def __init__(self, timer_actor, arvados_client, cloud_client,
                 cloud_size, arvados_node=None,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeSetupActor, self).__init__(
            'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self.cloud_size = cloud_size
        self.arvados_node = None
        self.cloud_node = None
        # Kick off the first step asynchronously: make a fresh Arvados
        # record, or recycle the one the caller handed us.
        if arvados_node is None:
            self._later.create_arvados_node()
        else:
            self._later.prepare_arvados_node(arvados_node)

    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
    def create_arvados_node(self):
        self.arvados_node = self._arvados.nodes().create(body={}).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
    def prepare_arvados_node(self, node):
        self.arvados_node = self._clean_arvados_node(
            node, "Prepared by Node Manager")
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._retry()
    def create_cloud_node(self):
        self._logger.info("Creating cloud node with size %s.",
                          self.cloud_size.name)
        self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                  self.arvados_node)
        self._logger.info("Cloud node %s created.", self.cloud_node.id)
        self._later.post_create()

    @ComputeNodeStateChangeBase._retry()
    def post_create(self):
        self._cloud.post_create_node(self.cloud_node)
        self._logger.info("%s post-create work done.", self.cloud_node.id)
        self._finished()

    def stop_if_no_cloud_node(self):
        # NOTE(review): body restored from truncated source -- stop this
        # actor only when no cloud node was created yet; confirm upstream.
        if self.cloud_node is not None:
            return False
        self.stop()
        return True
class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
    """Actor to shut down a compute node.

    This actor simply destroys a cloud node, retrying as needed.
    """
    # Reasons for a shutdown to be cancelled.
    WINDOW_CLOSED = "shutdown window closed"
    NODE_BROKEN = "cloud failed to shut down broken node"

    def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
                 cancellable=True, retry_wait=1, max_retry_wait=180):
        # If a ShutdownActor is cancellable, it will ask the
        # ComputeNodeMonitorActor if it's still eligible before taking each
        # action, and stop the shutdown process if the node is no longer
        # eligible.  Normal shutdowns based on job demand should be
        # cancellable; shutdowns based on node misbehavior should not.
        super(ComputeNodeShutdownActor, self).__init__(
            'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self._monitor = node_monitor.proxy()
        self.cloud_node = self._monitor.cloud_node.get()
        self.cancellable = cancellable
        self.cancel_reason = None
        self.success = None

    def on_start(self):
        # Begin the shutdown as soon as the actor is running.
        self._later.shutdown_node()

    def _arvados_node(self):
        return self._monitor.arvados_node.get()

    def _finished(self, success_flag=None):
        if success_flag is not None:
            self.success = success_flag
        return super(ComputeNodeShutdownActor, self)._finished()

    def cancel_shutdown(self, reason):
        self.cancel_reason = reason
        self._logger.info("Cloud node %s shutdown cancelled: %s.",
                          self.cloud_node.id, reason)
        self._finished(success_flag=False)

    def _stop_if_window_closed(orig_func):
        # Decorator: abort the decorated step (and the whole shutdown) if
        # the monitor says this node is no longer eligible for shutdown.
        @functools.wraps(orig_func)
        def stop_wrapper(self, *args, **kwargs):
            if (self.cancellable and
                  (not self._monitor.shutdown_eligible().get())):
                self._later.cancel_shutdown(self.WINDOW_CLOSED)
                return None
            else:
                return orig_func(self, *args, **kwargs)
        return stop_wrapper

    @_stop_if_window_closed
    @ComputeNodeStateChangeBase._retry()
    def shutdown_node(self):
        if not self._cloud.destroy_node(self.cloud_node):
            if self._cloud.broken(self.cloud_node):
                self._later.cancel_shutdown(self.NODE_BROKEN)
            else:
                # Force a retry.
                raise cloud_types.LibcloudError("destroy_node failed")
        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
        arv_node = self._arvados_node()
        if arv_node is None:
            self._finished(success_flag=True)
        else:
            self._later.clean_arvados_node(arv_node)

    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
    def clean_arvados_node(self, arvados_node):
        self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
        self._finished(success_flag=True)

    # Make the decorator available to subclasses.
    _stop_if_window_closed = staticmethod(_stop_if_window_closed)
class ComputeNodeUpdateActor(config.actor_class):
    """Actor to dispatch one-off cloud management requests.

    This actor receives requests for small cloud updates, and
    dispatches them to a real driver.  ComputeNodeMonitorActors use
    this to perform maintenance tasks on themselves.  Having a
    dedicated actor for this gives us the opportunity to control the
    flow of requests; e.g., by backing off when errors occur.

    This actor is most like a "traditional" Pykka actor: there's no
    subscribing, but instead methods return real driver results.  If
    you're interested in those results, you should get them from the
    Future that the proxy method returns.  Be prepared to handle exceptions
    from the cloud driver when you do.
    """
    def __init__(self, cloud_factory, max_retry_wait=180):
        super(ComputeNodeUpdateActor, self).__init__()
        self._cloud = cloud_factory()
        self.max_retry_wait = max_retry_wait
        # Consecutive error count; drives the exponential backoff below.
        self.error_streak = 0
        # Earliest time the next driver request may be dispatched.
        self.next_request_time = time.time()

    def _throttle_errors(orig_func):
        # Decorator: sleep through any pending backoff before dispatching,
        # and push the next allowed request time out exponentially (capped
        # at max_retry_wait) whenever the wrapped call raises.
        @functools.wraps(orig_func)
        def throttle_wrapper(self, *args, **kwargs):
            throttle_time = self.next_request_time - time.time()
            if throttle_time > 0:
                time.sleep(throttle_time)
            self.next_request_time = time.time()
            try:
                result = orig_func(self, *args, **kwargs)
            except Exception as error:
                self.error_streak += 1
                self.next_request_time += min(2 ** self.error_streak,
                                              self.max_retry_wait)
                # The caller sees the driver's exception via the Future.
                raise
            else:
                self.error_streak = 0
                return result
        return throttle_wrapper

    @_throttle_errors
    def sync_node(self, cloud_node, arvados_node):
        return self._cloud.sync_node(cloud_node, arvados_node)
class ComputeNodeMonitorActor(config.actor_class):
    """Actor to manage a running compute node.

    This actor gets updates about a compute node's cloud and Arvados records.
    It uses this information to notify subscribers when the node is eligible
    for shutdown, and to suggest cloud/Arvados record pairings.
    """
    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
                 cloud_fqdn_func, timer_actor, update_actor, cloud_client,
                 arvados_node=None, poll_stale_after=600, node_stale_after=3600,
                 boot_fail_after=1800):
        super(ComputeNodeMonitorActor, self).__init__()
        self._later = self.actor_ref.proxy()
        self._logger = logging.getLogger('arvnodeman.computenode')
        # Last debug message emitted; used to suppress duplicates.
        self._last_log = None
        self._shutdowns = shutdown_timer
        self._cloud_node_fqdn = cloud_fqdn_func
        self._timer = timer_actor
        self._update = update_actor
        self._cloud = cloud_client
        self.cloud_node = cloud_node
        self.cloud_node_start_time = cloud_node_start_time
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.boot_fail_after = boot_fail_after
        self.subscribers = set()
        self.arvados_node = None
        self._later.update_arvados_node(arvados_node)
        self.last_shutdown_opening = None
        self._later.consider_shutdown()

    def subscribe(self, subscriber):
        self.subscribers.add(subscriber)

    def _debug(self, msg, *args):
        # Suppress consecutive duplicate messages to keep the log readable.
        if msg == self._last_log:
            return
        self._last_log = msg
        self._logger.debug(msg, *args)

    def in_state(self, *states):
        # Return a boolean to say whether or not our Arvados node record is in
        # one of the given states.  If state information is not
        # available--because this node has no Arvados record, the record is
        # stale, or the record has no state information--return None.
        if (self.arvados_node is None) or not timestamp_fresh(
              arvados_node_mtime(self.arvados_node), self.node_stale_after):
            return None
        state = self.arvados_node['crunch_worker_state']
        if not state:
            return None
        result = state in states
        # An "idle" node that still has a job assigned does not count.
        if state == 'idle':
            result = result and not self.arvados_node['job_uuid']
        return result

    def shutdown_eligible(self):
        if not self._shutdowns.window_open():
            return False
        if self.arvados_node is None:
            # If it hasn't pinged Arvados after boot_fail seconds, shut it down
            return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after)
        missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
        if missing and self._cloud.broken(self.cloud_node):
            # Node is paired, but Arvados says it is missing and the cloud says the node
            # is in an error state, so shut it down.
            return True
        if missing is None and self._cloud.broken(self.cloud_node):
            self._logger.warning(
                "cloud reports broken node, but paired node %s never pinged "
                "(bug?) -- skipped check for node_stale_after",
                self.arvados_node['uuid'])
        return self.in_state('idle')

    def consider_shutdown(self):
        next_opening = self._shutdowns.next_opening()
        if self.shutdown_eligible():
            self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
            _notify_subscribers(self._later, self.subscribers)
        elif self._shutdowns.window_open():
            self._debug("Node %s shutdown window open but node busy.",
                        self.cloud_node.id)
        elif self.last_shutdown_opening != next_opening:
            self._debug("Node %s shutdown window closed.  Next at %s.",
                        self.cloud_node.id, time.ctime(next_opening))
            # Re-check eligibility when the next shutdown window opens.
            self._timer.schedule(next_opening, self._later.consider_shutdown)
            self.last_shutdown_opening = next_opening

    def offer_arvados_pair(self, arvados_node):
        """Return our cloud node id if `arvados_node` should pair with it.

        A pairing is suggested only when we are still unpaired, the record
        has pinged, its IP matches this cloud node, and the first ping
        happened after the cloud node booted.  Otherwise return None.
        """
        first_ping_s = arvados_node.get('first_ping_at')
        if (self.arvados_node is not None) or (not first_ping_s):
            return None
        elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
              (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
            self._later.update_arvados_node(arvados_node)
            return self.cloud_node.id
        else:
            return None

    def update_cloud_node(self, cloud_node):
        if cloud_node is not None:
            self.cloud_node = cloud_node
            self._later.consider_shutdown()

    def update_arvados_node(self, arvados_node):
        # If the cloud node's FQDN doesn't match what's in the Arvados node
        # record, make them match.
        # This method is a little unusual in the way it just fires off the
        # request without checking the result or retrying errors.  That's
        # because this update happens every time we reload the Arvados node
        # list: if a previous sync attempt failed, we'll see that the names
        # are out of sync and just try again.  ComputeNodeUpdateActor has
        # the logic to throttle those effective retries when there's trouble.
        if arvados_node is not None:
            self.arvados_node = arvados_node
            if (self._cloud_node_fqdn(self.cloud_node) !=
                  arvados_node_fqdn(self.arvados_node)):
                self._update.sync_node(self.cloud_node, self.arvados_node)
            self._later.consider_shutdown()