3 from __future__ import absolute_import, print_function
9 import libcloud.common.types as cloud_types
13 arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
14 arvados_node_missing, RetryMixin
15 from ...clientactor import _notify_subscribers
16 from ... import config
18 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
19 """Base class for actors that change a compute node's state.
21 This base class takes care of retrying changes and notifying
22 subscribers when the change is finished.
24 def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
25 retry_wait, max_retry_wait):
26 super(ComputeNodeStateChangeBase, self).__init__()
27 RetryMixin.__init__(self,
30 logging.getLogger(logger_name),
33 self._later = self.actor_ref.proxy()
34 self._arvados = arvados_client
35 self.subscribers = set()
38 _notify_subscribers(self._later, self.subscribers)
39 self.subscribers = None
41 def subscribe(self, subscriber):
42 if self.subscribers is None:
44 subscriber(self._later)
45 except pykka.ActorDeadError:
48 self.subscribers.add(subscriber)
50 def _clean_arvados_node(self, arvados_node, explanation):
51 return self._arvados.nodes().update(
52 uuid=arvados_node['uuid'],
53 body={'hostname': None,
56 'first_ping_at': None,
59 'info': {'ec2_instance_id': None,
60 'last_action': explanation}},
64 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
65 """Actor to create and set up a cloud compute node.
67 This actor prepares an Arvados node record for a new compute node
68 (either creating one or cleaning one passed in), then boots the
69 actual compute node. It notifies subscribers when the cloud node
70 is successfully created (the last step in the process for Node
73 def __init__(self, timer_actor, arvados_client, cloud_client,
74 cloud_size, arvados_node=None,
75 retry_wait=1, max_retry_wait=180):
76 super(ComputeNodeSetupActor, self).__init__(
77 'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
78 retry_wait, max_retry_wait)
79 self.cloud_size = cloud_size
80 self.arvados_node = None
81 self.cloud_node = None
82 if arvados_node is None:
83 self._later.create_arvados_node()
85 self._later.prepare_arvados_node(arvados_node)
@RetryMixin._retry(config.ARVADOS_ERRORS)
def create_arvados_node(self):
    """Create a brand-new, empty Arvados node record for this setup.

    The record returned by the API becomes self.arvados_node, and
    create_cloud_node() is then queued on this actor's own proxy
    (self._later).  config.ARVADOS_ERRORS is handed to
    RetryMixin._retry, presumably as the error types that trigger a retry.
    """
    create_request = self._arvados.nodes().create(body={})
    self.arvados_node = create_request.execute()
    self._later.create_cloud_node()
@RetryMixin._retry(config.ARVADOS_ERRORS)
def prepare_arvados_node(self, node):
    """Reset an existing Arvados node record so this setup can reuse it.

    :param node: the Arvados node record to reuse; it is cleared through
        _clean_arvados_node() with the explanation
        "Prepared by Node Manager".

    The record returned by that call becomes self.arvados_node, and
    create_cloud_node() is queued next on this actor's proxy.
    """
    explanation = "Prepared by Node Manager"
    cleaned_record = self._clean_arvados_node(node, explanation)
    self.arvados_node = cleaned_record
    self._later.create_cloud_node()
99 def create_cloud_node(self):
100 self._logger.info("Creating cloud node with size %s.",
101 self.cloud_size.name)
102 self.cloud_node = self._cloud.create_node(self.cloud_size,
104 if not self.cloud_node.size:
105 self.cloud_node.size = self.cloud_size
106 self._logger.info("Cloud node %s created.", self.cloud_node.id)
107 self._later.update_arvados_node_properties()
109 @RetryMixin._retry(config.ARVADOS_ERRORS)
110 def update_arvados_node_properties(self):
111 """Tell Arvados some details about the cloud node.
113 Currently we only include size/price from our request, which
114 we already knew before create_cloud_node(), but doing it here
115 gives us an opportunity to provide more detail from
116 self.cloud_node, too.
118 self.arvados_node['properties']['cloud_node'] = {
119 # Note this 'size' is the node size we asked the cloud
120 # driver to create -- not necessarily equal to the size
121 # reported by the cloud driver for the node that was
123 'size': self.cloud_size.id,
124 'price': self.cloud_size.price,
126 self.arvados_node = self._arvados.nodes().update(
127 uuid=self.arvados_node['uuid'],
128 body={'properties': self.arvados_node['properties']},
130 self._logger.info("%s updated properties.", self.arvados_node['uuid'])
131 self._later.post_create()
134 def post_create(self):
135 self._cloud.post_create_node(self.cloud_node)
136 self._logger.info("%s post-create work done.", self.cloud_node.id)
139 def stop_if_no_cloud_node(self):
140 if self.cloud_node is not None:
146 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
147 """Actor to shut down a compute node.
149 This actor simply destroys a cloud node, retrying as needed.
151 # Reasons for a shutdown to be cancelled.
152 WINDOW_CLOSED = "shutdown window closed"
153 NODE_BROKEN = "cloud failed to shut down broken node"
155 def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
156 cancellable=True, retry_wait=1, max_retry_wait=180):
157 # If a ShutdownActor is cancellable, it will ask the
158 # ComputeNodeMonitorActor if it's still eligible before taking each
159 # action, and stop the shutdown process if the node is no longer
160 # eligible. Normal shutdowns based on job demand should be
161 # cancellable; shutdowns based on node misbehavior should not.
162 super(ComputeNodeShutdownActor, self).__init__(
163 'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
164 retry_wait, max_retry_wait)
165 self._monitor = node_monitor.proxy()
166 self.cloud_node = self._monitor.cloud_node.get()
167 self.cancellable = cancellable
168 self.cancel_reason = None
172 self._later.shutdown_node()
def _arvados_node(self):
    """Return the monitor actor's current Arvados node record.

    self._monitor is a proxy to the node monitor actor, so reading the
    attribute yields a future; .get() waits for and returns its value.
    """
    record_future = self._monitor.arvados_node
    return record_future.get()
def _finished(self, success_flag=None):
    """Finish the shutdown, optionally recording its outcome first.

    :param success_flag: when not None it is stored as self.success
        before finishing (callers in this class pass True on a completed
        shutdown and False on cancellation); None leaves self.success
        untouched.
    :returns: whatever the base class's _finished() returns.
    """
    outcome_given = success_flag is not None
    if outcome_given:
        self.success = success_flag
    parent_view = super(ComputeNodeShutdownActor, self)
    return parent_view._finished()
def cancel_shutdown(self, reason):
    """Abandon this shutdown attempt.

    :param reason: why the shutdown stopped (this class passes its
        WINDOW_CLOSED or NODE_BROKEN constants); stored on
        self.cancel_reason and included in the log message.

    The actor is then finished with success_flag=False.
    """
    self.cancel_reason = reason
    node_id = self.cloud_node.id
    self._logger.info("Cloud node %s shutdown cancelled: %s.", node_id, reason)
    self._finished(success_flag=False)
188 def _stop_if_window_closed(orig_func):
189 @functools.wraps(orig_func)
190 def stop_wrapper(self, *args, **kwargs):
191 if (self.cancellable and
192 (not self._monitor.shutdown_eligible().get())):
193 self._later.cancel_shutdown(self.WINDOW_CLOSED)
196 return orig_func(self, *args, **kwargs)
199 @_stop_if_window_closed
201 def shutdown_node(self):
202 if not self._cloud.destroy_node(self.cloud_node):
203 if self._cloud.broken(self.cloud_node):
204 self._later.cancel_shutdown(self.NODE_BROKEN)
207 raise cloud_types.LibcloudError("destroy_node failed")
208 self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
209 arv_node = self._arvados_node()
211 self._finished(success_flag=True)
213 self._later.clean_arvados_node(arv_node)
@RetryMixin._retry(config.ARVADOS_ERRORS)
def clean_arvados_node(self, arvados_node):
    """Clear the Arvados record of a node that was shut down.

    :param arvados_node: the paired Arvados node record; it is reset
        through _clean_arvados_node() with the explanation
        "Shut down by Node Manager".

    On success the actor finishes with success_flag=True.
    """
    explanation = "Shut down by Node Manager"
    self._clean_arvados_node(arvados_node, explanation)
    self._finished(success_flag=True)
220 # Make the decorator available to subclasses.
221 _stop_if_window_closed = staticmethod(_stop_if_window_closed)
224 class ComputeNodeUpdateActor(config.actor_class):
225 """Actor to dispatch one-off cloud management requests.
227 This actor receives requests for small cloud updates, and
228 dispatches them to a real driver. ComputeNodeMonitorActors use
229 this to perform maintenance tasks on themselves. Having a
230 dedicated actor for this gives us the opportunity to control the
231 flow of requests; e.g., by backing off when errors occur.
233 This actor is most like a "traditional" Pykka actor: there's no
234 subscribing, but instead methods return real driver results. If
235 you're interested in those results, you should get them from the
236 Future that the proxy method returns. Be prepared to handle exceptions
237 from the cloud driver when you do.
def __init__(self, cloud_factory, max_retry_wait=180):
    """Initialise the actor, its cloud client and its throttling state.

    :param cloud_factory: zero-argument callable whose result becomes
        self._cloud, the client that requests are dispatched to.
    :param max_retry_wait: stored as self.max_retry_wait; presumably the
        cap on the error back-off computed in _throttle_errors (TODO
        confirm against that decorator).
    """
    super(ComputeNodeUpdateActor, self).__init__()
    # Throttling bookkeeping: no errors yet, so no back-off is pending.
    self.max_retry_wait = max_retry_wait
    self.error_streak = 0
    self._cloud = cloud_factory()
    # The first request may be sent as soon as the actor is ready.
    self.next_request_time = time.time()
246 def _throttle_errors(orig_func):
247 @functools.wraps(orig_func)
248 def throttle_wrapper(self, *args, **kwargs):
249 throttle_time = self.next_request_time - time.time()
250 if throttle_time > 0:
251 time.sleep(throttle_time)
252 self.next_request_time = time.time()
254 result = orig_func(self, *args, **kwargs)
255 except Exception as error:
256 self.error_streak += 1
257 self.next_request_time += min(2 ** self.error_streak,
261 self.error_streak = 0
263 return throttle_wrapper
def sync_node(self, cloud_node, arvados_node):
    """Ask the cloud driver to reconcile cloud_node with arvados_node.

    :returns: the driver's sync_node() result, unchanged.  This method
        catches nothing, so driver exceptions reach the caller.
    """
    driver = self._cloud
    return driver.sync_node(cloud_node, arvados_node)
270 class ComputeNodeMonitorActor(config.actor_class):
271 """Actor to manage a running compute node.
273 This actor gets updates about a compute node's cloud and Arvados records.
274 It uses this information to notify subscribers when the node is eligible
277 def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
278 cloud_fqdn_func, timer_actor, update_actor, cloud_client,
279 arvados_node=None, poll_stale_after=600, node_stale_after=3600,
282 super(ComputeNodeMonitorActor, self).__init__()
283 self._later = self.actor_ref.proxy()
284 self._logger = logging.getLogger('arvnodeman.computenode')
285 self._last_log = None
286 self._shutdowns = shutdown_timer
287 self._cloud_node_fqdn = cloud_fqdn_func
288 self._timer = timer_actor
289 self._update = update_actor
290 self._cloud = cloud_client
291 self.cloud_node = cloud_node
292 self.cloud_node_start_time = cloud_node_start_time
293 self.poll_stale_after = poll_stale_after
294 self.node_stale_after = node_stale_after
295 self.boot_fail_after = boot_fail_after
296 self.subscribers = set()
297 self.arvados_node = None
298 self._later.update_arvados_node(arvados_node)
299 self.last_shutdown_opening = None
300 self._later.consider_shutdown()
def subscribe(self, subscriber):
    """Register subscriber in the set notified when this node suggests shutdown.

    The container is a set, so registering the same subscriber twice
    has no further effect.
    """
    registered = self.subscribers
    registered.add(subscriber)
305 def _debug(self, msg, *args):
306 if msg == self._last_log:
309 self._logger.debug(msg, *args)
311 def in_state(self, *states):
312 # Return a boolean to say whether or not our Arvados node record is in
313 # one of the given states. If state information is not
314 # available--because this node has no Arvados record, the record is
315 # stale, or the record has no state information--return None.
316 if (self.arvados_node is None) or not timestamp_fresh(
317 arvados_node_mtime(self.arvados_node), self.node_stale_after):
319 state = self.arvados_node['crunch_worker_state']
322 result = state in states
324 result = result and not self.arvados_node['job_uuid']
327 def shutdown_eligible(self):
328 if not self._shutdowns.window_open():
330 if self.arvados_node is None:
332 # If it hasn't pinged Arvados after boot_fail seconds, shut it down
333 return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after)
334 missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
335 if missing and self._cloud.broken(self.cloud_node):
336 # Node is paired, but Arvados says it is missing and the cloud says the node
337 # is in an error state, so shut it down.
339 if missing is None and self._cloud.broken(self.cloud_node):
340 self._logger.warning(
341 "cloud reports broken node, but paired node %s never pinged "
342 "(bug?) -- skipped check for node_stale_after",
343 self.arvados_node['uuid'])
344 return self.in_state('idle')
346 def consider_shutdown(self):
347 next_opening = self._shutdowns.next_opening()
348 if self.shutdown_eligible():
349 self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
350 _notify_subscribers(self._later, self.subscribers)
351 elif self._shutdowns.window_open():
352 self._debug("Node %s shutdown window open but node busy.",
354 elif self.last_shutdown_opening != next_opening:
355 self._debug("Node %s shutdown window closed. Next at %s.",
356 self.cloud_node.id, time.ctime(next_opening))
357 self._timer.schedule(next_opening, self._later.consider_shutdown)
358 self.last_shutdown_opening = next_opening
360 def offer_arvados_pair(self, arvados_node):
361 first_ping_s = arvados_node.get('first_ping_at')
362 if (self.arvados_node is not None) or (not first_ping_s):
364 elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
365 (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
366 self._later.update_arvados_node(arvados_node)
367 return self.cloud_node.id
def update_cloud_node(self, cloud_node):
    """Adopt a refreshed cloud node record and re-check shutdown.

    A None record is ignored entirely. Otherwise the record replaces
    self.cloud_node and consider_shutdown() is queued on this actor.
    """
    if cloud_node is None:
        return
    self.cloud_node = cloud_node
    self._later.consider_shutdown()
def update_arvados_node(self, arvados_node):
    """Record a freshly reported Arvados node record and react to it.

    A None record is ignored.  Otherwise the record is stored, and if
    the cloud node's FQDN differs from the one derived from the Arvados
    record, a sync request is handed to the update actor.  That request
    is fire-and-forget on purpose: this runs every time the Arvados
    node list is reloaded, so a failed sync is naturally retried the
    next time the names are seen to disagree.  ComputeNodeUpdateActor
    throttles those repeated attempts when errors occur.  Finally,
    consider_shutdown() is queued on this actor.
    """
    if arvados_node is None:
        return
    self.arvados_node = arvados_node
    cloud_side_fqdn = self._cloud_node_fqdn(self.cloud_node)
    if cloud_side_fqdn != arvados_node_fqdn(self.arvados_node):
        self._update.sync_node(self.cloud_node, self.arvados_node)
    self._later.consider_shutdown()