3 from __future__ import absolute_import, print_function
9 import libcloud.common.types as cloud_types
13 arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
14 arvados_node_missing, RetryMixin
15 from ...clientactor import _notify_subscribers
16 from ... import config
# NOTE(review): this listing is non-contiguous — the embedded original line
# numbers jump (20, 23, 32, 35-39, 43, 46, 49-52, 57-66, 70, 74-77 are
# missing), so several statements (try:/else:/pass lines, a _finished() def
# line, .execute() continuations) are absent below. Do not treat this text
# as runnable; recover the gaps from version control before editing.
18 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
19 """Base class for actors that change a compute node's state.
21 This base class takes care of retrying changes and notifying
22 subscribers when the change is finished.
# Constructor: stores the Arvados client and wires RetryMixin with the
# cloud client and timer actor for retry scheduling.
24 def __init__(self, cloud_client, arvados_client, timer_actor,
25 retry_wait, max_retry_wait):
26 super(ComputeNodeStateChangeBase, self).__init__()
27 RetryMixin.__init__(self, retry_wait, max_retry_wait,
28 None, cloud_client, timer_actor)
# Proxy to our own actor ref, used to schedule follow-up messages to
# ourselves asynchronously instead of calling methods directly.
29 self._later = self.actor_ref.proxy()
30 self._arvados = arvados_client
31 self.subscribers = set()
# Logger name combines the class name with the tail of the actor URN so
# per-actor log lines are distinguishable.
33 def _set_logger(self):
34 self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
# NOTE(review): the next three lines belong to a _finished() method whose
# def line falls in the listing gap (orig. 36-39) — subclasses call
# _finished() (see ComputeNodeShutdownActor). It notifies subscribers once,
# then sets self.subscribers = None to mark this actor as done.
40 _notify_subscribers(self._later, self.subscribers)
41 self.subscribers = None
42 self._logger.info("finished")
# Subscribe to completion. If we already finished (subscribers is None),
# call the subscriber immediately; otherwise remember it for _finished().
44 def subscribe(self, subscriber):
45 if self.subscribers is None:
# NOTE(review): a try: line (orig. 46) and the except body / else: branch
# (orig. 49-50) are missing from this listing.
47 subscriber(self._later)
48 except pykka.ActorDeadError:
51 self.subscribers.add(subscriber)
# Reset an existing Arvados node record so it can be reused for a new
# compute node; 'explanation' is recorded in the node's info hash.
# NOTE(review): body dict lines (orig. 57-58, 60-61) and the closing
# .execute() continuation (orig. 64+) are missing here.
53 def _clean_arvados_node(self, arvados_node, explanation):
54 return self._arvados.nodes().update(
55 uuid=arvados_node['uuid'],
56 body={'hostname': None,
59 'first_ping_at': None,
62 'info': {'ec2_instance_id': None,
63 'last_action': explanation}},
# Decorator for actor methods: logs any exception and (per the _finished
# call in the missing lines, orig. 74-77 — TODO confirm) marks the actor
# finished so subscribers are still notified on failure.
67 def _finish_on_exception(orig_func):
68 @functools.wraps(orig_func)
69 def finish_wrapper(self, *args, **kwargs):
# NOTE(review): try: line (orig. 70) missing from this listing.
71 return orig_func(self, *args, **kwargs)
72 except Exception as error:
73 self._logger.error("Actor error %s", error)
# NOTE(review): non-contiguous listing — embedded line numbers skip 80,
# 85-86, 98, 100, 106, 113, 115, 120, 125, 130, 135, 140, 143, 147,
# 150-151, 155-156, 159-163. else: branches, decorator lines, call
# continuations and .execute() calls are missing below.
78 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
79 """Actor to create and set up a cloud compute node.
81 This actor prepares an Arvados node record for a new compute node
82 (either creating one or cleaning one passed in), then boots the
83 actual compute node. It notifies subscribers when the cloud node
84 is successfully created (the last step in the process for Node
# Constructor kicks off the pipeline: either create a fresh Arvados node
# record or clean the one passed in; each step schedules the next via
# self._later so the work runs as asynchronous actor messages.
87 def __init__(self, timer_actor, arvados_client, cloud_client,
88 cloud_size, arvados_node=None,
89 retry_wait=1, max_retry_wait=180):
90 super(ComputeNodeSetupActor, self).__init__(
91 cloud_client, arvados_client, timer_actor,
92 retry_wait, max_retry_wait)
93 self.cloud_size = cloud_size
94 self.arvados_node = None
95 self.cloud_node = None
96 if arvados_node is None:
97 self._later.create_arvados_node()
# NOTE(review): the else: line (orig. 98) is missing from this listing.
99 self._later.prepare_arvados_node(arvados_node)
# Step 1a: make a brand-new Arvados node record, retrying on API errors.
101 @ComputeNodeStateChangeBase._finish_on_exception
102 @RetryMixin._retry(config.ARVADOS_ERRORS)
103 def create_arvados_node(self):
104 self.arvados_node = self._arvados.nodes().create(body={}).execute()
105 self._later.create_cloud_node()
# Step 1b: alternative first step — scrub the supplied Arvados node record.
107 @ComputeNodeStateChangeBase._finish_on_exception
108 @RetryMixin._retry(config.ARVADOS_ERRORS)
109 def prepare_arvados_node(self, node):
110 self.arvados_node = self._clean_arvados_node(
111 node, "Prepared by Node Manager")
112 self._later.create_cloud_node()
# Step 2: ask the cloud driver to boot the node.
# NOTE(review): a @RetryMixin._retry decorator line (orig. 115) and the
# create_node(...) argument continuation (orig. 120, presumably passing
# self.arvados_node) are missing — confirm against version control.
114 @ComputeNodeStateChangeBase._finish_on_exception
116 def create_cloud_node(self):
117 self._logger.info("Sending create_node request for node size %s.",
118 self.cloud_size.name)
119 self.cloud_node = self._cloud.create_node(self.cloud_size,
# Some drivers return a node without a size; backfill from our request.
121 if not self.cloud_node.size:
122 self.cloud_node.size = self.cloud_size
123 self._logger.info("Cloud node %s created.", self.cloud_node.id)
124 self._later.update_arvados_node_properties()
# Step 3: record size/price of the created node back into Arvados.
126 @ComputeNodeStateChangeBase._finish_on_exception
127 @RetryMixin._retry(config.ARVADOS_ERRORS)
128 def update_arvados_node_properties(self):
129 """Tell Arvados some details about the cloud node.
131 Currently we only include size/price from our request, which
132 we already knew before create_cloud_node(), but doing it here
133 gives us an opportunity to provide more detail from
134 self.cloud_node, too.
136 self.arvados_node['properties']['cloud_node'] = {
137 # Note this 'size' is the node size we asked the cloud
138 # driver to create -- not necessarily equal to the size
139 # reported by the cloud driver for the node that was
141 'size': self.cloud_size.id,
142 'price': self.cloud_size.price,
# NOTE(review): the closing .execute() continuation (orig. 147) is missing.
144 self.arvados_node = self._arvados.nodes().update(
145 uuid=self.arvados_node['uuid'],
146 body={'properties': self.arvados_node['properties']},
148 self._logger.info("%s updated properties.", self.arvados_node['uuid'])
149 self._later.post_create()
# Step 4: driver-specific post-boot work; decorator lines (orig. 150-151)
# are missing from this listing.
152 def post_create(self):
153 self._cloud.post_create_node(self.cloud_node)
154 self._logger.info("%s post-create work done.", self.cloud_node.id)
# Stop this actor only if no cloud node was created yet; the negative
# branch (orig. 159+) is missing from this listing.
157 def stop_if_no_cloud_node(self):
158 if self.cloud_node is not None:
# NOTE(review): non-contiguous listing — embedded line numbers skip 166,
# 168, 172, 187-188, 191-192, 195, 198, 203, 209, 216-217, 219-220, 223,
# 229-230, 234, 236, 238, 244. A def on_start line, try/else lines, and
# wrapper return lines are missing below.
164 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
165 """Actor to shut down a compute node.
167 This actor simply destroys a cloud node, retrying as needed.
169 # Reasons for a shutdown to be cancelled.
170 WINDOW_CLOSED = "shutdown window closed"
171 NODE_BROKEN = "cloud failed to shut down broken node"
173 def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
174 cancellable=True, retry_wait=1, max_retry_wait=180):
175 # If a ShutdownActor is cancellable, it will ask the
176 # ComputeNodeMonitorActor if it's still eligible before taking each
177 # action, and stop the shutdown process if the node is no longer
178 # eligible. Normal shutdowns based on job demand should be
179 # cancellable; shutdowns based on node misbehavior should not.
180 super(ComputeNodeShutdownActor, self).__init__(
181 cloud_client, arvados_client, timer_actor,
182 retry_wait, max_retry_wait)
183 self._monitor = node_monitor.proxy()
184 self.cloud_node = self._monitor.cloud_node.get()
185 self.cancellable = cancellable
186 self.cancel_reason = None
# Include the cloud node name in the logger so shutdown logs for
# different nodes are distinguishable.
189 def _set_logger(self):
190 self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
# NOTE(review): the def on_start(self): line (orig. 192) is missing from
# this listing — the shutdown is kicked off as soon as the actor starts.
193 super(ComputeNodeShutdownActor, self).on_start()
194 self._later.shutdown_node()
196 def _arvados_node(self):
197 return self._monitor.arvados_node.get()
# Record success/failure, then run the base-class finish (notify
# subscribers exactly once).
199 def _finished(self, success_flag=None):
200 if success_flag is not None:
201 self.success = success_flag
202 return super(ComputeNodeShutdownActor, self)._finished()
# NOTE(review): defect — the format string has one %s placeholder but two
# arguments (self.cloud_node.id and reason) are passed; logging will emit
# a formatting error. Likely intended:
#   self._logger.info("Shutdown cancelled: %s.", reason)
# Cannot be fixed here without altering code; flagging for the author.
204 def cancel_shutdown(self, reason):
205 self.cancel_reason = reason
206 self._logger.info("Shutdown cancelled: %s.",
207 self.cloud_node.id, reason)
208 self._finished(success_flag=False)
# Decorator: before each shutdown step, re-check eligibility with the
# monitor actor and abort if the shutdown window has closed.
# NOTE(review): the early-return after cancel (orig. 216-217) and the
# return stop_wrapper line (orig. 219-220) are missing from this listing.
210 def _stop_if_window_closed(orig_func):
211 @functools.wraps(orig_func)
212 def stop_wrapper(self, *args, **kwargs):
213 if (self.cancellable and
214 (self._monitor.shutdown_eligible().get() is not True)):
215 self._later.cancel_shutdown(self.WINDOW_CLOSED)
218 return orig_func(self, *args, **kwargs)
# The actual destroy call. A @RetryMixin._retry decorator line (orig. 223)
# appears to be missing between these decorators and the def.
221 @ComputeNodeStateChangeBase._finish_on_exception
222 @_stop_if_window_closed
224 def shutdown_node(self):
225 self._logger.info("Starting shutdown")
226 if not self._cloud.destroy_node(self.cloud_node):
227 if self._cloud.broken(self.cloud_node):
228 self._later.cancel_shutdown(self.NODE_BROKEN)
# Raising here hands the failure to the retry machinery (orig. 229-230,
# the else/raise framing, are partly missing from this listing).
231 raise cloud_types.LibcloudError("destroy_node failed")
232 self._logger.info("Shutdown success")
233 arv_node = self._arvados_node()
# NOTE(review): the if/else framing (orig. 234, 236) is missing — one
# branch finishes immediately, the other cleans the Arvados record first.
235 self._finished(success_flag=True)
237 self._later.clean_arvados_node(arv_node)
# After destroying the node, scrub its Arvados record, retrying on
# Arvados API errors.
239 @ComputeNodeStateChangeBase._finish_on_exception
240 @RetryMixin._retry(config.ARVADOS_ERRORS)
241 def clean_arvados_node(self, arvados_node):
242 self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
243 self._finished(success_flag=True)
245 # Make the decorator available to subclasses.
246 _stop_if_window_closed = staticmethod(_stop_if_window_closed)
# NOTE(review): non-contiguous listing — embedded line numbers skip 251,
# 257, 263, 270, 278, 283-285, 287, 289-290, 293-294. try/else lines, the
# backoff cap continuation, and sync_node's decorator are missing below.
249 class ComputeNodeUpdateActor(config.actor_class):
250 """Actor to dispatch one-off cloud management requests.
252 This actor receives requests for small cloud updates, and
253 dispatches them to a real driver. ComputeNodeMonitorActors use
254 this to perform maintenance tasks on themselves. Having a
255 dedicated actor for this gives us the opportunity to control the
256 flow of requests; e.g., by backing off when errors occur.
258 This actor is most like a "traditional" Pykka actor: there's no
259 subscribing, but instead methods return real driver results. If
260 you're interested in those results, you should get them from the
261 Future that the proxy method returns. Be prepared to handle exceptions
262 from the cloud driver when you do.
264 def __init__(self, cloud_factory, max_retry_wait=180):
265 super(ComputeNodeUpdateActor, self).__init__()
266 self._cloud = cloud_factory()
267 self.max_retry_wait = max_retry_wait
# error_streak counts consecutive failures; next_request_time is the
# earliest wall-clock time the next request may be dispatched.
268 self.error_streak = 0
269 self.next_request_time = time.time()
# Decorator: sleep until the throttle window opens, run the request, and
# on failure push next_request_time out exponentially (2**error_streak,
# capped by max_retry_wait per the missing continuation at orig. 283).
# NOTE(review): the try: line (orig. 278), raise/else lines (orig.
# 284-287) and return throttle_wrapper placement are partly missing.
271 def _throttle_errors(orig_func):
272 @functools.wraps(orig_func)
273 def throttle_wrapper(self, *args, **kwargs):
274 throttle_time = self.next_request_time - time.time()
275 if throttle_time > 0:
276 time.sleep(throttle_time)
277 self.next_request_time = time.time()
279 result = orig_func(self, *args, **kwargs)
280 except Exception as error:
281 self.error_streak += 1
282 self.next_request_time += min(2 ** self.error_streak,
# Success path: reset the failure counter.
286 self.error_streak = 0
288 return throttle_wrapper
# Push the cloud node's metadata in sync with its Arvados record.
# NOTE(review): a @_throttle_errors decorator line (orig. 290) appears to
# be missing above this method — confirm against version control.
291 def sync_node(self, cloud_node, arvados_node):
292 return self._cloud.sync_node(cloud_node, arvados_node)
# NOTE(review): non-contiguous listing — embedded line numbers skip 297,
# 300-301, 305-306, 325, 328-332, 335, 338-341, 349-357, 361, 365,
# 369-370, 375-377, 382-390, 400-402, 406, 411-413, 418. def on_start,
# try:/else:/return lines and several branches are missing below.
295 class ComputeNodeMonitorActor(config.actor_class):
296 """Actor to manage a running compute node.
298 This actor gets updates about a compute node's cloud and Arvados records.
299 It uses this information to notify subscribers when the node is eligible
302 def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
303 cloud_fqdn_func, timer_actor, update_actor, cloud_client,
304 arvados_node=None, poll_stale_after=600, node_stale_after=3600,
# NOTE(review): at least one parameter (boot_fail_after, read below) falls
# in the missing lines (orig. 305-306).
307 super(ComputeNodeMonitorActor, self).__init__()
308 self._later = self.actor_ref.proxy()
# _last_log deduplicates consecutive identical debug messages (see _debug).
309 self._last_log = None
310 self._shutdowns = shutdown_timer
311 self._cloud_node_fqdn = cloud_fqdn_func
312 self._timer = timer_actor
313 self._update = update_actor
314 self._cloud = cloud_client
315 self.cloud_node = cloud_node
316 self.cloud_node_start_time = cloud_node_start_time
317 self.poll_stale_after = poll_stale_after
318 self.node_stale_after = node_stale_after
319 self.boot_fail_after = boot_fail_after
320 self.subscribers = set()
# Pairing with an Arvados record happens asynchronously via
# update_arvados_node; start unpaired.
321 self.arvados_node = None
322 self._later.update_arvados_node(arvados_node)
323 self.last_shutdown_opening = None
324 self._later.consider_shutdown()
326 def _set_logger(self):
327 self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
# NOTE(review): the def on_start(self): line (orig. ~328-330) is missing —
# this schedules a boot-failure re-check once boot_fail_after has elapsed.
331 self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)
333 def subscribe(self, subscriber):
334 self.subscribers.add(subscriber)
# Debug log that suppresses exact repeats of the previous message.
# NOTE(review): the early return and _last_log update (orig. 338-339,
# 341) are missing from this listing.
336 def _debug(self, msg, *args):
337 if msg == self._last_log:
340 self._logger.debug(msg, *args)
342 def in_state(self, *states):
343 # Return a boolean to say whether or not our Arvados node record is in
344 # one of the given states. If state information is not
345 # available--because this node has no Arvados record, the record is
346 # stale, or the record has no state information--return None.
347 if (self.arvados_node is None) or not timestamp_fresh(
348 arvados_node_mtime(self.arvados_node), self.node_stale_after):
# NOTE(review): the return None here (orig. 349) and the KeyError guard /
# return lines (orig. 351-352, 354, 356-357) are missing. The 'idle with
# a job_uuid does not count as idle' refinement survives at orig. 355.
350 state = self.arvados_node['crunch_worker_state']
353 result = state in states
355 result = result and not self.arvados_node['job_uuid']
358 def shutdown_eligible(self):
359 """Return True if eligible for shutdown, or a string explaining why the node
360 is not eligible for shutdown."""
362 if not self._shutdowns.window_open():
363 return "shutdown window is not open."
364 if self.arvados_node is None:
366 # If it hasn't pinged Arvados after boot_fail seconds, shut it down
367 if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
368 return "node is still booting, will be considered a failed boot at %s" % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.cloud_node_start_time + self.boot_fail_after))
# Paired node: decide based on Arvados ping freshness and cloud health.
# NOTE(review): the return-True branches (orig. 369-370, 375, 382-383)
# are missing from this listing.
371 missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
372 if missing and self._cloud.broken(self.cloud_node):
373 # Node is paired, but Arvados says it is missing and the cloud says the node
374 # is in an error state, so shut it down.
376 if missing is None and self._cloud.broken(self.cloud_node):
# Logged (presumably via self._logger — the call line orig. 377 is
# missing) rather than acted on: we can't tell stale from shut-down.
378 "Cloud node considered 'broken' but paired node %s last_ping_at is None, " +
379 "cannot check node_stale_after (node may be shut down and we just haven't gotten the message yet).",
380 self.arvados_node['uuid'])
381 if self.in_state('idle'):
384 return "node is not idle."
# Decide whether to suggest shutdown now, or schedule a re-check at the
# next shutdown-window opening.
# NOTE(review): a try: line (orig. ~387), the eligible-is-True test (orig.
# 390) and the except framing before orig. 401 are missing.
386 def consider_shutdown(self):
388 next_opening = self._shutdowns.next_opening()
389 eligible = self.shutdown_eligible()
391 self._debug("Suggesting shutdown.")
392 _notify_subscribers(self._later, self.subscribers)
393 elif self._shutdowns.window_open():
394 self._debug("Cannot shut down because %s", eligible)
395 elif self.last_shutdown_opening != next_opening:
396 self._debug("Shutdown window closed. Next at %s.",
397 time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
398 self._timer.schedule(next_opening, self._later.consider_shutdown)
399 self.last_shutdown_opening = next_opening
401 self._logger.exception("Unexpected exception")
# Accept an Arvados record as this cloud node's pair when its IP matches
# one of our private IPs and its first ping came after we booted.
# Returns the cloud node id on success (None branches, orig. 406 and
# 411-413, are missing from this listing).
403 def offer_arvados_pair(self, arvados_node):
404 first_ping_s = arvados_node.get('first_ping_at')
405 if (self.arvados_node is not None) or (not first_ping_s):
407 elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
408 (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
409 self._later.update_arvados_node(arvados_node)
410 return self.cloud_node.id
414 def update_cloud_node(self, cloud_node):
415 if cloud_node is not None:
416 self.cloud_node = cloud_node
417 self._later.consider_shutdown()
419 def update_arvados_node(self, arvados_node):
420 # If the cloud node's FQDN doesn't match what's in the Arvados node
421 # record, make them match.
422 # This method is a little unusual in the way it just fires off the
423 # request without checking the result or retrying errors. That's
424 # because this update happens every time we reload the Arvados node
425 # list: if a previous sync attempt failed, we'll see that the names
426 # are out of sync and just try again. ComputeNodeUpdateActor has
427 # the logic to throttle those effective retries when there's trouble.
428 if arvados_node is not None:
429 self.arvados_node = arvados_node
430 if (self._cloud_node_fqdn(self.cloud_node) !=
431 arvados_node_fqdn(self.arvados_node)):
432 self._update.sync_node(self.cloud_node, self.arvados_node)
433 self._later.consider_shutdown()