from __future__ import absolute_import, print_function

import functools
import logging
import time

import libcloud.common.types as cloud_types
import pykka

from .. import \
    arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
    arvados_node_missing, RetryMixin
from ...clientactor import _notify_subscribers
from ... import config
from .transitions import transitions

class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
    """Base class for actors that change a compute node's state.

    This base class takes care of retrying changes and notifying
    subscribers when the change is finished.
    """
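    # Subscriber contract (illustrative sketch; the callback name is
    # hypothetical): a subscriber is any callable that accepts this actor's
    # proxy, e.g.
    #   actor.subscribe(daemon_proxy.node_setup_finished)
    # Subscribers are notified once, when _finished() runs; anyone who
    # subscribes after that point is called back immediately instead.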
    def __init__(self, cloud_client, arvados_client, timer_actor,
                 retry_wait, max_retry_wait):
        super(ComputeNodeStateChangeBase, self).__init__()
        RetryMixin.__init__(self, retry_wait, max_retry_wait,
                            None, cloud_client, timer_actor)
        self._later = self.actor_ref.tell_proxy()
        self._arvados = arvados_client
        self.subscribers = set()

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))

    def on_start(self):
        self._set_logger()

    def _finished(self):
        if self.subscribers is None:
            raise Exception("Actor tried to finish twice")
        _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
        self.subscribers = None
        self._logger.info("finished")

    def subscribe(self, subscriber):
        if self.subscribers is None:
            try:
                subscriber(self.actor_ref.proxy())
            except pykka.ActorDeadError:
                pass
        else:
            self.subscribers.add(subscriber)

    def _clean_arvados_node(self, arvados_node, explanation):
        return self._arvados.nodes().update(
            uuid=arvados_node['uuid'],
            body={'hostname': None,
                  'ip_address': None,
                  'slot_number': None,
                  'first_ping_at': None,
                  'last_ping_at': None,
                  'properties': {},
                  'info': {'ec2_instance_id': None,
                           'last_action': explanation}},
            ).execute()

    @staticmethod
    def _finish_on_exception(orig_func):
        @functools.wraps(orig_func)
        def finish_wrapper(self, *args, **kwargs):
            try:
                return orig_func(self, *args, **kwargs)
            except Exception as error:
                self._logger.error("Actor error %s", error)
                self._finished()
        return finish_wrapper


class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
    """Actor to create and set up a cloud compute node.

    This actor prepares an Arvados node record for a new compute node
    (either creating one or cleaning one passed in), then boots the
    actual compute node.  It notifies subscribers when the cloud node
    is successfully created (the last step in the process for Node
    Manager to handle).
    """
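    # Illustrative wiring (hypothetical sketch; the real caller is the node
    # manager daemon actor):
    #   setup = ComputeNodeSetupActor.start(
    #       timer_actor, arvados_client, cloud_client, cloud_size).proxy()
    #   setup.subscribe(daemon_proxy.node_setup_finished)
    # The subscriber fires once the cloud node exists, or once setup gives up
    # on an unhandled error.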
    def __init__(self, timer_actor, arvados_client, cloud_client,
                 cloud_size, arvados_node=None,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeSetupActor, self).__init__(
            cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self.cloud_size = cloud_size
        self.arvados_node = None
        self.cloud_node = None
        if arvados_node is None:
            self._later.create_arvados_node()
        else:
            self._later.prepare_arvados_node(arvados_node)
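
    # Decorator order on the methods below matters: RetryMixin._retry wraps
    # each method innermost, so transient Arvados errors are retried; errors
    # it does not handle propagate to _finish_on_exception, which logs them
    # and finishes the actor so subscribers still get notified.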

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def create_arvados_node(self):
        self.arvados_node = self._arvados.nodes().create(body={}).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def prepare_arvados_node(self, node):
        self.arvados_node = self._clean_arvados_node(
            node, "Prepared by Node Manager")
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry()
    def create_cloud_node(self):
        self._logger.info("Sending create_node request for node size %s.",
                          self.cloud_size.name)
        self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                  self.arvados_node)
        if not self.cloud_node.size:
            self.cloud_node.size = self.cloud_size
        self._logger.info("Cloud node %s created.", self.cloud_node.id)
        self._later.update_arvados_node_properties()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def update_arvados_node_properties(self):
        """Tell Arvados some details about the cloud node.

        Currently we only include size/price from our request, which
        we already knew before create_cloud_node(), but doing it here
        gives us an opportunity to provide more detail from
        self.cloud_node, too.
        """
        self.arvados_node['properties']['cloud_node'] = {
            # Note this 'size' is the node size we asked the cloud
            # driver to create -- not necessarily equal to the size
            # reported by the cloud driver for the node that was
            # created.
            'size': self.cloud_size.id,
            'price': self.cloud_size.price,
        }
        self.arvados_node = self._arvados.nodes().update(
            uuid=self.arvados_node['uuid'],
            body={'properties': self.arvados_node['properties']},
            ).execute()
        self._logger.info("%s updated properties.", self.arvados_node['uuid'])
        self._later.post_create()

    @RetryMixin._retry()
    def post_create(self):
        self._cloud.post_create_node(self.cloud_node)
        self._logger.info("%s post-create work done.", self.cloud_node.id)
        self._finished()

    def stop_if_no_cloud_node(self):
        if self.cloud_node is not None:
            return False
        self.stop()
        return True


class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
    """Actor to shut down a compute node.

    This actor simply destroys a cloud node, retrying as needed.
    """
    # Reasons for a shutdown to be cancelled.
    WINDOW_CLOSED = "shutdown window closed"
    DESTROY_FAILED = "destroy_node failed"

    def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
                 cancellable=True, retry_wait=1, max_retry_wait=180):
        # If a ShutdownActor is cancellable, it will ask the
        # ComputeNodeMonitorActor if it's still eligible before taking each
        # action, and stop the shutdown process if the node is no longer
        # eligible.  Normal shutdowns based on job demand should be
        # cancellable; shutdowns based on node misbehavior should not.
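        # For example (hypothetical call sites): a shutdown triggered by
        # falling job demand would pass cancellable=True, while one triggered
        # by a broken node would pass cancellable=False so the destroy runs
        # to completion regardless of what the monitor reports.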
        super(ComputeNodeShutdownActor, self).__init__(
            cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self._monitor = node_monitor.proxy()
        self.cloud_node = self._monitor.cloud_node.get()
        self.cancellable = cancellable
        self.cancel_reason = None
        self.success = None

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))

    def on_start(self):
        super(ComputeNodeShutdownActor, self).on_start()
        self._later.shutdown_node()

    def _arvados_node(self):
        return self._monitor.arvados_node.get()

    def _finished(self, success_flag=None):
        if success_flag is not None:
            self.success = success_flag
        return super(ComputeNodeShutdownActor, self)._finished()

    def cancel_shutdown(self, reason, **kwargs):
        self.cancel_reason = reason
        self._logger.info("Shutdown cancelled: %s.", reason)
        self._finished(success_flag=False)

    def _cancel_on_exception(orig_func):
        @functools.wraps(orig_func)
        def finish_wrapper(self, *args, **kwargs):
            try:
                return orig_func(self, *args, **kwargs)
            except Exception as error:
                self._logger.error("Actor error %s", error)
                self._logger.debug("", exc_info=True)
                self._later.cancel_shutdown("Unhandled exception %s" % error, try_resume=False)
        return finish_wrapper

    @_cancel_on_exception
    def shutdown_node(self):
        if self.cancellable:
            self._logger.info("Checking that node is still eligible for shutdown")
            eligible, reason = self._monitor.shutdown_eligible().get()
            if not eligible:
                self.cancel_shutdown("No longer eligible for shut down because %s" % reason,
                                     try_resume=True)
                return
        self._destroy_node()

    def _destroy_node(self):
        self._logger.info("Starting shutdown")
        arv_node = self._arvados_node()
        if self._cloud.destroy_node(self.cloud_node):
            self._logger.info("Shutdown success")
            if arv_node:
                self._later.clean_arvados_node(arv_node)
            else:
                self._finished(success_flag=True)
        else:
            self.cancel_shutdown(self.DESTROY_FAILED, try_resume=False)

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def clean_arvados_node(self, arvados_node):
        self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
        self._finished(success_flag=True)


class ComputeNodeUpdateActor(config.actor_class):
    """Actor to dispatch one-off cloud management requests.

    This actor receives requests for small cloud updates, and
    dispatches them to a real driver.  ComputeNodeMonitorActors use
    this to perform maintenance tasks on themselves.  Having a
    dedicated actor for this gives us the opportunity to control the
    flow of requests; e.g., by backing off when errors occur.
    """
    def __init__(self, cloud_factory, max_retry_wait=180):
        super(ComputeNodeUpdateActor, self).__init__()
        self._cloud = cloud_factory()
        self.max_retry_wait = max_retry_wait
        self.error_streak = 0
        self.next_request_time = time.time()

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))

    def on_start(self):
        self._set_logger()

    def _throttle_errors(orig_func):
        @functools.wraps(orig_func)
        def throttle_wrapper(self, *args, **kwargs):
            throttle_time = self.next_request_time - time.time()
            if throttle_time > 0:
                time.sleep(throttle_time)
            self.next_request_time = time.time()
            try:
                result = orig_func(self, *args, **kwargs)
            except Exception as error:
                if self._cloud.is_cloud_exception(error):
                    self.error_streak += 1
                    self.next_request_time += min(2 ** self.error_streak,
                                                  self.max_retry_wait)
                self._logger.warning(
                    "Unhandled exception: %s", error, exc_info=error)
            else:
                self.error_streak = 0
                return result
        return throttle_wrapper
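
    # With the default max_retry_wait of 180 seconds, a streak of cloud errors
    # pushes next_request_time out by 2, 4, 8, ... seconds per consecutive
    # failure, capped at 180; the first request that succeeds resets
    # error_streak to 0.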

    @_throttle_errors
    def sync_node(self, cloud_node, arvados_node):
        return self._cloud.sync_node(cloud_node, arvados_node)


class ComputeNodeMonitorActor(config.actor_class):
    """Actor to manage a running compute node.

    This actor gets updates about a compute node's cloud and Arvados records.
    It uses this information to notify subscribers when the node is eligible
    for shutdown.
    """
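    # Illustrative subscription (hypothetical caller): the daemon subscribes
    # so it can start a shutdown once this monitor reports the node eligible:
    #   monitor = ComputeNodeMonitorActor.start(...).proxy()
    #   monitor.subscribe(daemon_proxy.node_can_shutdown)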
    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
                 cloud_fqdn_func, timer_actor, update_actor, cloud_client,
                 arvados_node=None, poll_stale_after=600, node_stale_after=3600,
                 boot_fail_after=1800):
        super(ComputeNodeMonitorActor, self).__init__()
        self._later = self.actor_ref.tell_proxy()
        self._shutdowns = shutdown_timer
        self._cloud_node_fqdn = cloud_fqdn_func
        self._timer = timer_actor
        self._update = update_actor
        self._cloud = cloud_client
        self.cloud_node = cloud_node
        self.cloud_node_start_time = cloud_node_start_time
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.boot_fail_after = boot_fail_after
        self.subscribers = set()
        self.arvados_node = None
        self._later.update_arvados_node(arvados_node)
        self.last_shutdown_opening = None
        self._later.consider_shutdown()

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))

    def on_start(self):
        self._set_logger()
        self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)

    def subscribe(self, subscriber):
        self.subscribers.add(subscriber)

    def _debug(self, msg, *args):
        self._logger.debug(msg, *args)

    def get_state(self):
        """Get node state, one of ['unpaired', 'busy', 'idle', 'down']."""

        # If this node is not associated with an Arvados node, return 'unpaired'.
        if self.arvados_node is None:
            return 'unpaired'

        # This node is indicated as non-functioning by the cloud driver.
        if self._cloud.broken(self.cloud_node):
            return 'down'

        state = self.arvados_node['crunch_worker_state']

        # If state information is not available because it is missing or the
        # record is stale, treat the node as 'down'.
        if not state or not timestamp_fresh(arvados_node_mtime(self.arvados_node),
                                            self.node_stale_after):
            state = 'down'

        # There's a window between when a node pings for the first time and the
        # value of 'slurm_state' is synchronized by crunch-dispatch.  In this
        # window, the node will still report as 'down'.  Check that
        # first_ping_at is truthy and consider the node 'idle' during the
        # initial boot grace period.
        if (state == 'down' and
            self.arvados_node['first_ping_at'] and
            timestamp_fresh(self.cloud_node_start_time,
                            self.boot_fail_after) and
            not self._cloud.broken(self.cloud_node)):
            state = 'idle'

        # "missing" means last_ping_at is stale; this should be
        # considered 'down'.
        if arvados_node_missing(self.arvados_node, self.node_stale_after):
            state = 'down'

        # Turns out using 'job_uuid' this way is a bad idea.  The node record
        # is assigned the job_uuid before the job is locked (which removes it
        # from the queue), which means the job is double-counted: it is both
        # in the wishlist and keeping a node busy.  The end result is excess
        # nodes being booted.
        #if state == 'idle' and self.arvados_node['job_uuid']:
        #    state = 'busy'

        return state

    def in_state(self, *states):
        return self.get_state() in states

    def shutdown_eligible(self):
        """Determine if node is a candidate for shut down.

        Returns a tuple of (boolean, string) where the first value is whether
        the node is a candidate for shut down, and the second value is the
        reason for the decision.
        """

        # Collect the node's current states, then consult the state transition
        # table to decide whether we should shut down.  Possible states are:
        # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
        # window = ["open", "closed"]
        # boot_grace = ["boot wait", "boot exceeded"]
        # idle_grace = ["not idle", "idle wait", "idle exceeded"]
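        # For example, a paired idle node past its boot grace period while the
        # shutdown window is open is described by the tuple
        #   ('idle', 'open', 'boot exceeded', 'idle exceeded')
        # which is then looked up in the transitions table below.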

        if self.arvados_node and not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
            return (False, "node state is stale")

        crunch_worker_state = self.get_state()

        window = "open" if self._shutdowns.window_open() else "closed"

        if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
            boot_grace = "boot wait"
        else:
            boot_grace = "boot exceeded"

        # API server side not implemented yet.
        idle_grace = 'idle exceeded'

        node_state = (crunch_worker_state, window, boot_grace, idle_grace)
        t = transitions[node_state]
        if t is not None:
            # yes, shutdown eligible
            return (True, "node state is %s" % (node_state,))
        else:
            # no, return a reason
            return (False, "node state is %s" % (node_state,))

    def consider_shutdown(self):
        try:
            eligible, reason = self.shutdown_eligible()
            next_opening = self._shutdowns.next_opening()
            if eligible:
                self._debug("Suggesting shutdown because %s", reason)
                _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
            else:
                self._debug("Not eligible for shut down because %s", reason)

                if self.last_shutdown_opening != next_opening:
                    self._debug("Shutdown window closed.  Next at %s.",
                                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
                    self._timer.schedule(next_opening, self._later.consider_shutdown)
                    self.last_shutdown_opening = next_opening
        except Exception:
            self._logger.exception("Unexpected exception")

    def offer_arvados_pair(self, arvados_node):
        first_ping_s = arvados_node.get('first_ping_at')
        if (self.arvados_node is not None) or (not first_ping_s):
            return None
        elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
              (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
            self._later.update_arvados_node(arvados_node)
            return self.cloud_node.id
        else:
            return None

    def update_cloud_node(self, cloud_node):
        if cloud_node is not None:
            self.cloud_node = cloud_node
            self._later.consider_shutdown()

    def update_arvados_node(self, arvados_node):
        # If the cloud node's FQDN doesn't match what's in the Arvados node
        # record, make them match.
        # This method is a little unusual in the way it just fires off the
        # request without checking the result or retrying errors.  That's
        # because this update happens every time we reload the Arvados node
        # list: if a previous sync attempt failed, we'll see that the names
        # are out of sync and just try again.  ComputeNodeUpdateActor has
        # the logic to throttle those effective retries when there's trouble.
        if arvados_node is not None:
            self.arvados_node = arvados_node
            if (self._cloud_node_fqdn(self.cloud_node) !=
                arvados_node_fqdn(self.arvados_node)):
                self._update.sync_node(self.cloud_node, self.arvados_node)
            self._later.consider_shutdown()