3 from __future__ import absolute_import, print_function
9 import libcloud.common.types as cloud_types
13 arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
14 arvados_node_missing, RetryMixin
15 from ...clientactor import _notify_subscribers
16 from ... import config
class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
    """Base class for actors that change a compute node's state.
    This base class takes care of retrying changes and notifying
    subscribers when the change is finished.

    def __init__(self, cloud_client, arvados_client, timer_actor,
                 retry_wait, max_retry_wait):
        # Initialize the Pykka actor base first, then the retry machinery.
        # RetryMixin receives no logger here (None) -- presumably the
        # logger is attached later via _set_logger; TODO confirm against
        # RetryMixin's definition.
        super(ComputeNodeStateChangeBase, self).__init__()
        RetryMixin.__init__(self, retry_wait, max_retry_wait,
                            None, cloud_client, timer_actor)
        # Proxy to ourselves, so methods can schedule follow-up work
        # through the actor mailbox instead of calling directly.
        self._later = self.actor_ref.proxy()
        self._arvados = arvados_client
        # Callables to notify when the state change completes; cleared to
        # None once finished (see the _notify_subscribers call below).
        self.subscribers = set()

    def _set_logger(self):
        # Logger named after the concrete class plus a suffix of the
        # actor URN, so log lines identify the specific actor instance.
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))

        # (NOTE(review): the enclosing `def` line for this completion
        # method is elided from this view.)  Notify every subscriber
        # once, then drop the set so later subscribe() calls are
        # serviced immediately instead of queued.
        _notify_subscribers(self._later, self.subscribers)
        self.subscribers = None
        self._logger.info("finished")

    def subscribe(self, subscriber):
        # If the state change already finished (subscribers was cleared),
        # notify the new subscriber right away.
        if self.subscribers is None:
            subscriber(self._later)
        # Best-effort: the subscriber may itself be a dead actor.
        # (NOTE(review): the `try:` opener and part of the else-branch
        # are elided from this view.)
        except pykka.ActorDeadError:
            self.subscribers.add(subscriber)

    def _clean_arvados_node(self, arvados_node, explanation):
        # Reset the Arvados node record so it can be reused, recording
        # why in info.last_action.  (Some body fields and the trailing
        # .execute() call are elided from this view.)
        return self._arvados.nodes().update(
            uuid=arvados_node['uuid'],
            body={'hostname': None,
                  'first_ping_at': None,
                  'info': {'ec2_instance_id': None,
                           'last_action': explanation}},

    def _finish_on_exception(orig_func):
        # Decorator for actor methods: run orig_func and, on any error,
        # log it.  Presumably the elided lines also finish the actor or
        # hand off to retry logic -- TODO confirm.
        @functools.wraps(orig_func)
        def finish_wrapper(self, *args, **kwargs):
            # (NOTE(review): the `try:` opener is elided from this view.)
            return orig_func(self, *args, **kwargs)
        except Exception as error:
            self._logger.error("Actor error %s", error)
class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
    """Actor to create and set up a cloud compute node.

    This actor prepares an Arvados node record for a new compute node
    (either creating one or cleaning one passed in), then boots the
    actual compute node. It notifies subscribers when the cloud node
    is successfully created (the last step in the process for Node

    def __init__(self, timer_actor, arvados_client, cloud_client,
                 cloud_size, arvados_node=None,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeSetupActor, self).__init__(
            cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self.cloud_size = cloud_size
        self.arvados_node = None
        self.cloud_node = None
        # Kick off the first step asynchronously through the self-proxy:
        # make a fresh Arvados record, or recycle the one we were given.
        if arvados_node is None:
            self._later.create_arvados_node()
            # (NOTE(review): the `else:` introducing this branch is
            # elided from this view.)
            self._later.prepare_arvados_node(arvados_node)

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def create_arvados_node(self):
        # Create a brand-new, empty Arvados node record, then move on to
        # booting the cloud VM.
        self.arvados_node = self._arvados.nodes().create(body={}).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def prepare_arvados_node(self, node):
        # Recycle an existing Arvados record: scrub its identity fields
        # before pairing it with the new cloud VM.
        self.arvados_node = self._clean_arvados_node(
            node, "Prepared by Node Manager")
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._finish_on_exception
    def create_cloud_node(self):
        # Ask the cloud driver to boot the actual VM.  (NOTE(review): a
        # retry decorator line and the rest of the create_node() argument
        # list are elided from this view.)
        self._logger.info("Sending create_node request for node size %s.",
                          self.cloud_size.name)
        self.cloud_node = self._cloud.create_node(self.cloud_size,
        # Some libcloud drivers return a node without a size; fall back
        # to the size we requested so later bookkeeping has one.
        if not self.cloud_node.size:
            self.cloud_node.size = self.cloud_size
        self._logger.info("Cloud node %s created.", self.cloud_node.id)
        self._later.update_arvados_node_properties()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def update_arvados_node_properties(self):
        """Tell Arvados some details about the cloud node.

        Currently we only include size/price from our request, which
        we already knew before create_cloud_node(), but doing it here
        gives us an opportunity to provide more detail from
        self.cloud_node, too.

        self.arvados_node['properties']['cloud_node'] = {
            # Note this 'size' is the node size we asked the cloud
            # driver to create -- not necessarily equal to the size
            # reported by the cloud driver for the node that was
            'size': self.cloud_size.id,
            'price': self.cloud_size.price,
        # Persist the updated properties back to the Arvados record.
        # (NOTE(review): the trailing .execute() call is elided from
        # this view.)
        self.arvados_node = self._arvados.nodes().update(
            uuid=self.arvados_node['uuid'],
            body={'properties': self.arvados_node['properties']},
        self._logger.info("%s updated properties.", self.arvados_node['uuid'])
        self._later.post_create()

    def post_create(self):
        # Driver-specific finishing touches after the VM exists.
        # (NOTE(review): a retry decorator line above this def appears to
        # be elided from this view -- confirm against the full source.)
        self._cloud.post_create_node(self.cloud_node)
        self._logger.info("%s post-create work done.", self.cloud_node.id)

    def stop_if_no_cloud_node(self):
        # Abort setup only while the cloud VM has not been created yet;
        # the remainder of this method is elided from this view.
        if self.cloud_node is not None:
class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
    """Actor to shut down a compute node.

    This actor simply destroys a cloud node, retrying as needed.

    # Reasons for a shutdown to be cancelled.
    WINDOW_CLOSED = "shutdown window closed"
    NODE_BROKEN = "cloud failed to shut down broken node"

    def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
                 cancellable=True, retry_wait=1, max_retry_wait=180):
        # If a ShutdownActor is cancellable, it will ask the
        # ComputeNodeMonitorActor if it's still eligible before taking each
        # action, and stop the shutdown process if the node is no longer
        # eligible. Normal shutdowns based on job demand should be
        # cancellable; shutdowns based on node misbehavior should not.
        super(ComputeNodeShutdownActor, self).__init__(
            cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self._monitor = node_monitor.proxy()
        self.cloud_node = self._monitor.cloud_node.get()
        self.cancellable = cancellable
        self.cancel_reason = None
        # (NOTE(review): the tail of __init__ -- e.g. a success flag
        # initializer -- is elided from this view.)

    def _set_logger(self):
        # Per-instance logger tagged with the actor URN suffix and the
        # cloud node's name.
        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))

        # (NOTE(review): the enclosing `def on_start` line is elided from
        # this view.)  Begin the shutdown as soon as the actor starts.
        super(ComputeNodeShutdownActor, self).on_start()
        self._later.shutdown_node()

    def _arvados_node(self):
        # Latest Arvados record as seen by our monitor (may be None).
        return self._monitor.arvados_node.get()

    def _finished(self, success_flag=None):
        # Record the outcome (if given) before the base class notifies
        # subscribers.
        if success_flag is not None:
            self.success = success_flag
        return super(ComputeNodeShutdownActor, self)._finished()

    def cancel_shutdown(self, reason):
        # Stop the shutdown process and report failure to subscribers.
        self.cancel_reason = reason
        self._logger.info("Shutdown cancelled: %s.", reason)
        self._finished(success_flag=False)

    def _stop_if_window_closed(orig_func):
        # Decorator: before each shutdown step, re-check eligibility with
        # the monitor and cancel instead of proceeding if the shutdown
        # window has closed.  (NOTE(review): the early-return after
        # cancelling is elided from this view.)
        @functools.wraps(orig_func)
        def stop_wrapper(self, *args, **kwargs):
            if (self.cancellable and
                  (self._monitor.shutdown_eligible().get() is not True)):
                self._later.cancel_shutdown(self.WINDOW_CLOSED)
            return orig_func(self, *args, **kwargs)

    @ComputeNodeStateChangeBase._finish_on_exception
    @_stop_if_window_closed
    def shutdown_node(self):
        # Destroy the cloud VM.  A falsy destroy_node() return means the
        # cloud refused: treat a broken node as a cancelled shutdown;
        # otherwise raise so the retry machinery tries again.
        # (NOTE(review): a retry decorator line and an `else:` before the
        # raise are elided from this view.)
        self._logger.info("Starting shutdown")
        if not self._cloud.destroy_node(self.cloud_node):
            if self._cloud.broken(self.cloud_node):
                self._later.cancel_shutdown(self.NODE_BROKEN)
            raise cloud_types.LibcloudError("destroy_node failed")
        self._logger.info("Shutdown success")
        arv_node = self._arvados_node()
        # With no paired Arvados record we are done; otherwise clean the
        # record before declaring success.  (NOTE(review): the if/else
        # lines around these two calls are elided from this view.)
        self._finished(success_flag=True)
        self._later.clean_arvados_node(arv_node)

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def clean_arvados_node(self, arvados_node):
        # Scrub the Arvados node record so it can be reused later, then
        # report success.
        self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
        self._finished(success_flag=True)

    # Make the decorator available to subclasses.
    _stop_if_window_closed = staticmethod(_stop_if_window_closed)
class ComputeNodeUpdateActor(config.actor_class):
    """Actor to dispatch one-off cloud management requests.

    This actor receives requests for small cloud updates, and
    dispatches them to a real driver. ComputeNodeMonitorActors use
    this to perform maintenance tasks on themselves. Having a
    dedicated actor for this gives us the opportunity to control the
    flow of requests; e.g., by backing off when errors occur.

    This actor is most like a "traditional" Pykka actor: there's no
    subscribing, but instead methods return real driver results. If
    you're interested in those results, you should get them from the
    Future that the proxy method returns. Be prepared to handle exceptions
    from the cloud driver when you do.

    def __init__(self, cloud_factory, max_retry_wait=180):
        super(ComputeNodeUpdateActor, self).__init__()
        self._cloud = cloud_factory()
        self.max_retry_wait = max_retry_wait
        # Consecutive-error counter; drives the exponential backoff in
        # _throttle_errors below.
        self.error_streak = 0
        # Earliest wall-clock time the next driver request may be sent.
        self.next_request_time = time.time()

    def _throttle_errors(orig_func):
        # Decorator: sleep until next_request_time before each request;
        # on failure, push next_request_time out by 2**error_streak
        # seconds (presumably capped at max_retry_wait in an elided
        # continuation line -- TODO confirm) and on success reset the
        # streak.
        @functools.wraps(orig_func)
        def throttle_wrapper(self, *args, **kwargs):
            throttle_time = self.next_request_time - time.time()
            if throttle_time > 0:
                time.sleep(throttle_time)
            self.next_request_time = time.time()
            # (NOTE(review): the `try:` opener for the call below is
            # elided from this view.)
            result = orig_func(self, *args, **kwargs)
        except Exception as error:
            self.error_streak += 1
            self.next_request_time += min(2 ** self.error_streak,
            # (NOTE(review): this reset belongs to the success path; the
            # `else:` introducing it is elided from this view.)
            self.error_streak = 0
        return throttle_wrapper

    def sync_node(self, cloud_node, arvados_node):
        # Push Arvados-side metadata (e.g. hostname) onto the cloud node.
        # (NOTE(review): a @_throttle_errors decorator line above this
        # def appears to be elided from this view -- confirm against the
        # full source.)
        return self._cloud.sync_node(cloud_node, arvados_node)
class ComputeNodeMonitorActor(config.actor_class):
    """Actor to manage a running compute node.

    This actor gets updates about a compute node's cloud and Arvados records.
    It uses this information to notify subscribers when the node is eligible

    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
                 cloud_fqdn_func, timer_actor, update_actor, cloud_client,
                 arvados_node=None, poll_stale_after=600, node_stale_after=3600,
        # (NOTE(review): the tail of this signature -- presumably a
        # boot_fail_after parameter, given the assignment below -- is
        # elided from this view.)
        super(ComputeNodeMonitorActor, self).__init__()
        self._later = self.actor_ref.proxy()
        # Last debug message emitted; used by _debug to squash repeats.
        self._last_log = None
        self._shutdowns = shutdown_timer
        self._cloud_node_fqdn = cloud_fqdn_func
        self._timer = timer_actor
        self._update = update_actor
        self._cloud = cloud_client
        self.cloud_node = cloud_node
        self.cloud_node_start_time = cloud_node_start_time
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.boot_fail_after = boot_fail_after
        self.subscribers = set()
        self.arvados_node = None
        # Pairing with the Arvados record goes through the actor mailbox
        # so it serializes with other updates.
        self._later.update_arvados_node(arvados_node)
        self.last_shutdown_opening = None
        self._later.consider_shutdown()

    def _set_logger(self):
        # Per-instance logger tagged with the actor URN suffix and the
        # cloud node's name.
        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))

        # (NOTE(review): the enclosing `def on_start` line is elided from
        # this view.)  Re-evaluate shutdown once the boot-failure
        # deadline passes.
        self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)

    def subscribe(self, subscriber):
        # Register a callable to be notified when shutdown is suggested.
        self.subscribers.add(subscriber)

    def _debug(self, msg, *args):
        # Emit a debug message, suppressing immediate duplicates of the
        # previous one.  (NOTE(review): the early return and the
        # _last_log update are elided from this view.)
        if msg == self._last_log:
        self._logger.debug(msg, *args)

    def in_state(self, *states):
        # Return a boolean to say whether or not our Arvados node record is in
        # one of the given states. If state information is not
        # available--because this node has no Arvados record, the record is
        # stale, or the record has no state information--return None.
        if (self.arvados_node is None) or not timestamp_fresh(
              arvados_node_mtime(self.arvados_node), self.node_stale_after):
        # (NOTE(review): the None-returns and the KeyError guard around
        # this lookup are elided from this view.)
        state = self.arvados_node['crunch_worker_state']
        result = state in states
        # A node with a job assigned should not count as available;
        # the conditional introducing this refinement is elided.
        result = result and not self.arvados_node['job_uuid']

    def shutdown_eligible(self):
        """Return True if eligible for shutdown, or a string explaining why the node
        is not eligible for shutdown."""
        if not self._shutdowns.window_open():
            return "shutdown window is not open."
        if self.arvados_node is None:
            # If it hasn't pinged Arvados after boot_fail seconds, shut it down
            if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
                return "node is still booting, will be considered a failed boot at %s" % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.cloud_node_start_time + self.boot_fail_after))
        # (NOTE(review): the True-returns for the branches above and
        # below are elided from this view.)
        missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
        if missing and self._cloud.broken(self.cloud_node):
            # Node is paired, but Arvados says it is missing and the cloud says the node
            # is in an error state, so shut it down.
        if missing is None and self._cloud.broken(self.cloud_node):
            # (NOTE(review): the self._logger.info( opener for this
            # message is elided from this view.)
                "Cloud node considered 'broken' but paired node %s last_ping_at is None, " +
                "cannot check node_stale_after (node may be shut down and we just haven't gotten the message yet).",
                self.arvados_node['uuid'])
        if self.in_state('idle'):
            # (NOTE(review): the True-return and the else introducing the
            # line below are elided from this view.)
            return "node is not idle."

    def consider_shutdown(self):
        # Decide whether to suggest shutdown now, explain why not, or
        # schedule a re-check at the next shutdown window.
        # (NOTE(review): the `try:` opener is elided from this view.)
        next_opening = self._shutdowns.next_opening()
        eligible = self.shutdown_eligible()
        # (NOTE(review): the `if eligible is True:` line is elided.)
        self._debug("Suggesting shutdown.")
        _notify_subscribers(self._later, self.subscribers)
        elif self._shutdowns.window_open():
            self._debug("Cannot shut down because %s", eligible)
        elif self.last_shutdown_opening != next_opening:
            self._debug("Shutdown window closed. Next at %s.",
                        time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
            self._timer.schedule(next_opening, self._later.consider_shutdown)
            self.last_shutdown_opening = next_opening
        # (NOTE(review): the `except Exception:` opener is elided --
        # never let an unexpected error kill the monitor actor.)
        self._logger.exception("Unexpected exception")

    def offer_arvados_pair(self, arvados_node):
        # Decide whether this unpaired Arvados record belongs to our
        # cloud node: the ping IP must be one of ours and the first ping
        # must postdate our boot.  Returns the cloud node id on a match.
        # (NOTE(review): the `return None` branches are elided from this
        # view.)
        first_ping_s = arvados_node.get('first_ping_at')
        if (self.arvados_node is not None) or (not first_ping_s):
        elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
              (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
            self._later.update_arvados_node(arvados_node)
            return self.cloud_node.id

    def update_cloud_node(self, cloud_node):
        # Replace our cached cloud record and re-evaluate shutdown.
        if cloud_node is not None:
            self.cloud_node = cloud_node
            self._later.consider_shutdown()

    def update_arvados_node(self, arvados_node):
        # If the cloud node's FQDN doesn't match what's in the Arvados node
        # record, make them match.
        # This method is a little unusual in the way it just fires off the
        # request without checking the result or retrying errors. That's
        # because this update happens every time we reload the Arvados node
        # list: if a previous sync attempt failed, we'll see that the names
        # are out of sync and just try again. ComputeNodeUpdateActor has
        # the logic to throttle those effective retries when there's trouble.
        if arvados_node is not None:
            self.arvados_node = arvados_node
            if (self._cloud_node_fqdn(self.cloud_node) !=
                  arvados_node_fqdn(self.arvados_node)):
                self._update.sync_node(self.cloud_node, self.arvados_node)
            self._later.consider_shutdown()