from __future__ import absolute_import, print_function

import functools
import logging
import time

import libcloud.common.types as cloud_types
import pykka

from .. import \
    arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
    arvados_node_missing, RetryMixin
from ...clientactor import _notify_subscribers
from ... import config
class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
    """Base class for actors that change a compute node's state.

    This base class takes care of retrying changes and notifying
    subscribers when the change is finished.
    """
    def __init__(self, cloud_client, arvados_client, timer_actor,
                 retry_wait, max_retry_wait):
        super(ComputeNodeStateChangeBase, self).__init__()
        RetryMixin.__init__(self, retry_wait, max_retry_wait,
                            None, cloud_client, timer_actor)
        # tell_proxy() lets our own methods send fire-and-forget messages
        # back to this actor without blocking on a future.
        self._later = self.actor_ref.tell_proxy()
        self._arvados = arvados_client
        # Callables to invoke when this state change finishes; becomes
        # None once _finished() has run (see _finished/subscribe).
        self.subscribers = set()

    def _set_logger(self):
        # actor_urn[33:] is the unique tail of this actor's URN, which
        # distinguishes log streams from concurrent actors of this class.
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))

    def on_start(self):
        self._set_logger()

    def _finished(self):
        """Notify all subscribers that this state change is done.

        Raises if called twice: subscribers is replaced with None as a
        tombstone after the first notification.
        """
        if self.subscribers is None:
            raise Exception("Actor tried to finish twice")
        _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
        self.subscribers = None
        self._logger.info("finished")

    def subscribe(self, subscriber):
        """Register a callable to run when the state change finishes.

        If the change has already finished, invoke the subscriber
        immediately, ignoring it if its actor has died.
        """
        if self.subscribers is None:
            try:
                subscriber(self._later)
            except pykka.ActorDeadError:
                pass
        else:
            self.subscribers.add(subscriber)

    def _clean_arvados_node(self, arvados_node, explanation):
        """Blank out an Arvados node record for reuse and return the
        updated record.

        NOTE(review): a few field lines in this request body were elided
        in the reviewed listing and were reconstructed -- confirm the
        exact field set against project history.
        """
        return self._arvados.nodes().update(
            uuid=arvados_node['uuid'],
            body={'hostname': None,
                  'ip_address': None,
                  'slot_number': None,
                  'first_ping_at': None,
                  'last_ping_at': None,
                  'job_uuid': None,
                  'info': {'ec2_instance_id': None,
                           'last_action': explanation}},
            ).execute()

    @staticmethod
    def _finish_on_exception(orig_func):
        """Decorator: treat any unhandled exception in the wrapped method
        as the end of this state change -- log it and notify subscribers.
        """
        @functools.wraps(orig_func)
        def finish_wrapper(self, *args, **kwargs):
            try:
                return orig_func(self, *args, **kwargs)
            except Exception as error:
                self._logger.error("Actor error %s", error)
                self._finished()
        return finish_wrapper
class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
    """Actor to create and set up a cloud compute node.

    This actor prepares an Arvados node record for a new compute node
    (either creating one or cleaning one passed in), then boots the
    actual compute node.  It notifies subscribers when the cloud node
    is successfully created (the last step in the process for Node
    Manager to handle).
    """
    def __init__(self, timer_actor, arvados_client, cloud_client,
                 cloud_size, arvados_node=None,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeSetupActor, self).__init__(
            cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self.cloud_size = cloud_size
        self.arvados_node = None
        self.cloud_node = None
        # Kick off the first step asynchronously; each step schedules
        # the next via self._later.
        if arvados_node is None:
            self._later.create_arvados_node()
        else:
            self._later.prepare_arvados_node(arvados_node)

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def create_arvados_node(self):
        # No existing record was supplied: make a fresh one.
        self.arvados_node = self._arvados.nodes().create(body={}).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def prepare_arvados_node(self, node):
        # Reuse the supplied record after blanking its old state.
        self.arvados_node = self._clean_arvados_node(
            node, "Prepared by Node Manager")
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry()
    def create_cloud_node(self):
        self._logger.info("Sending create_node request for node size %s.",
                          self.cloud_size.name)
        self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                  self.arvados_node)
        # Some cloud drivers return nodes without a size attribute;
        # fall back to the size we requested.
        if not self.cloud_node.size:
            self.cloud_node.size = self.cloud_size
        self._logger.info("Cloud node %s created.", self.cloud_node.id)
        self._later.update_arvados_node_properties()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def update_arvados_node_properties(self):
        """Tell Arvados some details about the cloud node.

        Currently we only include size/price from our request, which
        we already knew before create_cloud_node(), but doing it here
        gives us an opportunity to provide more detail from
        self.cloud_node, too.
        """
        self.arvados_node['properties']['cloud_node'] = {
            # Note this 'size' is the node size we asked the cloud
            # driver to create -- not necessarily equal to the size
            # reported by the cloud driver for the node that was
            # created.
            'size': self.cloud_size.id,
            'price': self.cloud_size.price,
        }
        self.arvados_node = self._arvados.nodes().update(
            uuid=self.arvados_node['uuid'],
            body={'properties': self.arvados_node['properties']},
        ).execute()
        self._logger.info("%s updated properties.", self.arvados_node['uuid'])
        self._later.post_create()

    @RetryMixin._retry()
    def post_create(self):
        self._cloud.post_create_node(self.cloud_node)
        self._logger.info("%s post-create work done.", self.cloud_node.id)

    def stop_if_no_cloud_node(self):
        """Stop this actor unless a cloud node has already been created.

        Returns True if the actor was stopped, False if setup has
        progressed too far to abandon.
        """
        if self.cloud_node is not None:
            return False
        self.stop()
        return True
class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
    """Actor to shut down a compute node.

    This actor simply destroys a cloud node, retrying as needed.
    """
    # Reasons for a shutdown to be cancelled.
    WINDOW_CLOSED = "shutdown window closed"
    NODE_BROKEN = "cloud failed to shut down broken node"

    def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
                 cancellable=True, retry_wait=1, max_retry_wait=180):
        # If a ShutdownActor is cancellable, it will ask the
        # ComputeNodeMonitorActor if it's still eligible before taking each
        # action, and stop the shutdown process if the node is no longer
        # eligible.  Normal shutdowns based on job demand should be
        # cancellable; shutdowns based on node misbehavior should not.
        super(ComputeNodeShutdownActor, self).__init__(
            cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self._monitor = node_monitor.proxy()
        self.cloud_node = self._monitor.cloud_node.get()
        self.cancellable = cancellable
        self.cancel_reason = None
        # Set by _finished(): True on successful shutdown, False when
        # the shutdown was cancelled, None while still in progress.
        self.success = None

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))

    def on_start(self):
        super(ComputeNodeShutdownActor, self).on_start()
        self._later.shutdown_node()

    def _arvados_node(self):
        return self._monitor.arvados_node.get()

    def _finished(self, success_flag=None):
        if success_flag is not None:
            self.success = success_flag
        return super(ComputeNodeShutdownActor, self)._finished()

    def cancel_shutdown(self, reason):
        self.cancel_reason = reason
        self._logger.info("Shutdown cancelled: %s.", reason)
        self._finished(success_flag=False)

    def _stop_if_window_closed(orig_func):
        """Decorator: skip the wrapped step and cancel the shutdown when
        this shutdown is cancellable and the monitor says the node is no
        longer eligible."""
        @functools.wraps(orig_func)
        def stop_wrapper(self, *args, **kwargs):
            if (self.cancellable and
                  (self._monitor.shutdown_eligible().get() is not True)):
                self._later.cancel_shutdown(self.WINDOW_CLOSED)
                return None
            else:
                return orig_func(self, *args, **kwargs)
        return stop_wrapper

    @ComputeNodeStateChangeBase._finish_on_exception
    @_stop_if_window_closed
    @RetryMixin._retry()
    def shutdown_node(self):
        self._logger.info("Starting shutdown")
        if not self._cloud.destroy_node(self.cloud_node):
            if self._cloud.broken(self.cloud_node):
                # The cloud can't destroy a broken node; give up rather
                # than retrying forever.
                self._later.cancel_shutdown(self.NODE_BROKEN)
                return
            else:
                # Force a retry.
                raise cloud_types.LibcloudError("destroy_node failed")
        self._logger.info("Shutdown success")
        arv_node = self._arvados_node()
        if arv_node is None:
            # Unpaired node: nothing to clean up in Arvados.
            self._finished(success_flag=True)
        else:
            self._later.clean_arvados_node(arv_node)

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def clean_arvados_node(self, arvados_node):
        self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
        self._finished(success_flag=True)

    # Make the decorator available to subclasses.
    _stop_if_window_closed = staticmethod(_stop_if_window_closed)
class ComputeNodeUpdateActor(config.actor_class):
    """Actor to dispatch one-off cloud management requests.

    This actor receives requests for small cloud updates, and
    dispatches them to a real driver.  ComputeNodeMonitorActors use
    this to perform maintenance tasks on themselves.  Having a
    dedicated actor for this gives us the opportunity to control the
    flow of requests; e.g., by backing off when errors occur.

    This actor is most like a "traditional" Pykka actor: there's no
    subscribing, but instead methods return real driver results.  If
    you're interested in those results, you should get them from the
    Future that the proxy method returns.  Be prepared to handle exceptions
    from the cloud driver when you do.
    """
    def __init__(self, cloud_factory, max_retry_wait=180):
        super(ComputeNodeUpdateActor, self).__init__()
        self._cloud = cloud_factory()
        self.max_retry_wait = max_retry_wait
        # Consecutive-error count; drives the exponential backoff below.
        self.error_streak = 0
        self.next_request_time = time.time()

    def _throttle_errors(orig_func):
        """Decorator: rate-limit the wrapped request with exponential
        backoff after consecutive errors, then re-raise the error so the
        caller's Future sees it."""
        @functools.wraps(orig_func)
        def throttle_wrapper(self, *args, **kwargs):
            throttle_time = self.next_request_time - time.time()
            if throttle_time > 0:
                time.sleep(throttle_time)
            self.next_request_time = time.time()
            try:
                result = orig_func(self, *args, **kwargs)
            except Exception as error:
                self.error_streak += 1
                self.next_request_time += min(2 ** self.error_streak,
                                              self.max_retry_wait)
                raise
            else:
                self.error_streak = 0
                return result
        return throttle_wrapper

    @_throttle_errors
    def sync_node(self, cloud_node, arvados_node):
        return self._cloud.sync_node(cloud_node, arvados_node)
class ComputeNodeMonitorActor(config.actor_class):
    """Actor to manage a running compute node.

    This actor gets updates about a compute node's cloud and Arvados records.
    It uses this information to notify subscribers when the node is eligible
    for shutdown, and to keep the two records synchronized.
    """
    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
                 cloud_fqdn_func, timer_actor, update_actor, cloud_client,
                 arvados_node=None, poll_stale_after=600, node_stale_after=3600,
                 boot_fail_after=1800):
        super(ComputeNodeMonitorActor, self).__init__()
        self._later = self.actor_ref.tell_proxy()
        # Last debug message emitted; used to suppress repeats.
        self._last_log = None
        self._shutdowns = shutdown_timer
        self._cloud_node_fqdn = cloud_fqdn_func
        self._timer = timer_actor
        self._update = update_actor
        self._cloud = cloud_client
        self.cloud_node = cloud_node
        self.cloud_node_start_time = cloud_node_start_time
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.boot_fail_after = boot_fail_after
        self.subscribers = set()
        self.arvados_node = None
        # Pairing and the first shutdown check happen asynchronously.
        self._later.update_arvados_node(arvados_node)
        self.last_shutdown_opening = None
        self._later.consider_shutdown()

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))

    def on_start(self):
        self._set_logger()
        # Re-check shutdown eligibility once the boot-failure deadline
        # passes, even if no other event arrives.
        self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)

    def subscribe(self, subscriber):
        self.subscribers.add(subscriber)

    def _debug(self, msg, *args):
        # Suppress immediate repeats of the same message.
        if msg == self._last_log:
            return
        self._last_log = msg
        self._logger.debug(msg, *args)

    def in_state(self, *states):
        # Return a boolean to say whether or not our Arvados node record is in
        # one of the given states.  If state information is not
        # available--because this node has no Arvados record, the record is
        # stale, or the record has no state information--return None.
        if (self.arvados_node is None) or not timestamp_fresh(
              arvados_node_mtime(self.arvados_node), self.node_stale_after):
            return None
        state = self.arvados_node['crunch_worker_state']
        if not state:
            return None
        result = state in states
        if state == 'idle':
            # An idle node that still has a job assigned is not really idle.
            result = result and not self.arvados_node['job_uuid']
        return result

    def shutdown_eligible(self):
        """Return True if eligible for shutdown, or a string explaining why the node
        is not eligible for shutdown."""
        if not self._shutdowns.window_open():
            return "shutdown window is not open."
        if self.arvados_node is None:
            # Node is unpaired.
            # If it hasn't pinged Arvados after boot_fail seconds, shut it down
            if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
                return "node is still booting, will be considered a failed boot at %s" % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.cloud_node_start_time + self.boot_fail_after))
            else:
                return True
        missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
        if missing and self._cloud.broken(self.cloud_node):
            # Node is paired, but Arvados says it is missing and the cloud says the node
            # is in an error state, so shut it down.
            return True
        if missing is None and self._cloud.broken(self.cloud_node):
            self._logger.info(
                "Cloud node considered 'broken' but paired node %s last_ping_at is None, " +
                "cannot check node_stale_after (node may be shut down and we just haven't gotten the message yet).",
                self.arvados_node['uuid'])
        if self.in_state('idle'):
            return True
        else:
            return "node is not idle."

    def consider_shutdown(self):
        try:
            next_opening = self._shutdowns.next_opening()
            eligible = self.shutdown_eligible()
            if eligible is True:
                self._debug("Suggesting shutdown.")
                _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
            elif self._shutdowns.window_open():
                self._debug("Cannot shut down because %s", eligible)
            elif self.last_shutdown_opening != next_opening:
                self._debug("Shutdown window closed.  Next at %s.",
                            time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
                self._timer.schedule(next_opening, self._later.consider_shutdown)
                self.last_shutdown_opening = next_opening
        except Exception:
            self._logger.exception("Unexpected exception")

    def offer_arvados_pair(self, arvados_node):
        """Pair with the given Arvados record if it matches this cloud node.

        Returns the cloud node ID on a successful pairing, None otherwise.
        """
        first_ping_s = arvados_node.get('first_ping_at')
        if (self.arvados_node is not None) or (not first_ping_s):
            return None
        elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
              (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
            self._later.update_arvados_node(arvados_node)
            return self.cloud_node.id
        else:
            return None

    def update_cloud_node(self, cloud_node):
        if cloud_node is not None:
            self.cloud_node = cloud_node
            self._later.consider_shutdown()

    def update_arvados_node(self, arvados_node):
        # If the cloud node's FQDN doesn't match what's in the Arvados node
        # record, make them match.
        # This method is a little unusual in the way it just fires off the
        # request without checking the result or retrying errors.  That's
        # because this update happens every time we reload the Arvados node
        # list: if a previous sync attempt failed, we'll see that the names
        # are out of sync and just try again.  ComputeNodeUpdateActor has
        # the logic to throttle those effective retries when there's trouble.
        if arvados_node is not None:
            self.arvados_node = arvados_node
            if (self._cloud_node_fqdn(self.cloud_node) !=
                  arvados_node_fqdn(self.arvados_node)):
                self._update.sync_node(self.cloud_node, self.arvados_node)
            self._later.consider_shutdown()