from __future__ import absolute_import, print_function

import functools
import logging
import time

import libcloud.common.types as cloud_types
import pykka

from .. import \
    arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
    arvados_node_missing, RetryMixin
from ...clientactor import _notify_subscribers
from ... import config
from .transitions import transitions

class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
    """Base class for actors that change a compute node's state.

    This base class takes care of retrying changes and notifying
    subscribers when the change is finished.
    """
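    # Hedged sketch of the subscription contract (the names below are assumed
    # for illustration, not defined in this module): a subscriber is any
    # callable taking this actor's proxy, invoked via _notify_subscribers()
    # once the state change completes:
    #
    #   actor = SomeStateChangeActor.start(...).proxy()
    #   actor.subscribe(lambda proxy: print("state change finished"))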

    def __init__(self, cloud_client, arvados_client, timer_actor,
                 retry_wait, max_retry_wait):
        super(ComputeNodeStateChangeBase, self).__init__()
        RetryMixin.__init__(self, retry_wait, max_retry_wait,
                            None, cloud_client, timer_actor)
        self._later = self.actor_ref.tell_proxy()
        self._arvados = arvados_client
        self.subscribers = set()

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))

    def on_start(self):
        self._set_logger()

    def _finished(self):
        if self.subscribers is None:
            raise Exception("Actor tried to finish twice")
        _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
        self.subscribers = None
        self._logger.info("finished")

    def subscribe(self, subscriber):
        if self.subscribers is None:
            try:
                subscriber(self.actor_ref.proxy())
            except pykka.ActorDeadError:
                pass
        else:
            self.subscribers.add(subscriber)

    def _clean_arvados_node(self, arvados_node, explanation):
        return self._arvados.nodes().update(
            uuid=arvados_node['uuid'],
            body={'hostname': None,
                  'ip_address': None,
                  'slot_number': None,
                  'first_ping_at': None,
                  'last_ping_at': None,
                  'properties': {},
                  'info': {'ec2_instance_id': None,
                           'last_action': explanation}},
            ).execute()

    @staticmethod
    def _finish_on_exception(orig_func):
        @functools.wraps(orig_func)
        def finish_wrapper(self, *args, **kwargs):
            try:
                return orig_func(self, *args, **kwargs)
            except Exception as error:
                self._logger.error("Actor error %s", error)
                self._finished()
        return finish_wrapper


class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
    """Actor to create and set up a cloud compute node.

    This actor prepares an Arvados node record for a new compute node
    (either creating one or cleaning one passed in), then boots the
    actual compute node.  It notifies subscribers when the cloud node
    is successfully created (the last step in the process for Node
    Manager to handle).
    """
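    # Hedged usage sketch (assumed caller and argument names, not part of this
    # module): passing an existing Arvados node record reuses it instead of
    # creating a new one:
    #
    #   ComputeNodeSetupActor.start(timer, arv, cloud, size)              # create
    #   ComputeNodeSetupActor.start(timer, arv, cloud, size, stale_node)  # reuse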

    def __init__(self, timer_actor, arvados_client, cloud_client,
                 cloud_size, arvados_node=None,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeSetupActor, self).__init__(
            cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self.cloud_size = cloud_size
        self.arvados_node = None
        self.cloud_node = None
        if arvados_node is None:
            self._later.create_arvados_node()
        else:
            self._later.prepare_arvados_node(arvados_node)

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def create_arvados_node(self):
        self.arvados_node = self._arvados.nodes().create(body={}).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def prepare_arvados_node(self, node):
        self.arvados_node = self._clean_arvados_node(
            node, "Prepared by Node Manager")
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry()
    def create_cloud_node(self):
        self._logger.info("Sending create_node request for node size %s.",
                          self.cloud_size.name)
        self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                  self.arvados_node)

        # The information included in the node size object we get from libcloud
        # is inconsistent between cloud providers.  Replace the libcloud NodeSize
        # object with a compatible CloudSizeWrapper object, which merges the size
        # info reported by the cloud with size information from the
        # configuration file.
        self.cloud_node.size = self.cloud_size
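
        # Hedged sketch of the idea (CloudSizeWrapper itself belongs to the
        # cloud driver/config layer, not this module): wrap the provider's
        # NodeSize and let configuration values override its attributes, e.g.:
        #
        #   class CloudSizeWrapper(object):
        #       def __init__(self, real_size, **config_kwargs):
        #           self.real = real_size
        #           self.__dict__.update(real_size.__dict__)
        #           self.__dict__.update(config_kwargs)  # config wins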

        self._logger.info("Cloud node %s created.", self.cloud_node.id)
        self._later.update_arvados_node_properties()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def update_arvados_node_properties(self):
        """Tell Arvados some details about the cloud node.

        Currently we only include size/price from our request, which
        we already knew before create_cloud_node(), but doing it here
        gives us an opportunity to provide more detail from
        self.cloud_node, too.
        """
        self.arvados_node['properties']['cloud_node'] = {
            # Note this 'size' is the node size we asked the cloud
            # driver to create -- not necessarily equal to the size
            # reported by the cloud driver for the node that was
            # created.
            'size': self.cloud_size.id,
            'price': self.cloud_size.price,
        }
        self.arvados_node = self._arvados.nodes().update(
            uuid=self.arvados_node['uuid'],
            body={'properties': self.arvados_node['properties']},
        ).execute()
        self._logger.info("%s updated properties.", self.arvados_node['uuid'])
        self._later.post_create()

    @RetryMixin._retry()
    def post_create(self):
        self._cloud.post_create_node(self.cloud_node)
        self._logger.info("%s post-create work done.", self.cloud_node.id)
        self._finished()

    def stop_if_no_cloud_node(self):
        if self.cloud_node is not None:
            return False
        self.stop()
        return True


class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
    """Actor to shut down a compute node.

    This actor simply destroys a cloud node, retrying as needed.
    """
    # Reasons for a shutdown to be cancelled.
    WINDOW_CLOSED = "shutdown window closed"
    DESTROY_FAILED = "destroy_node failed"

    def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
                 cancellable=True, retry_wait=1, max_retry_wait=180):
        # If a ShutdownActor is cancellable, it will ask the
        # ComputeNodeMonitorActor if it's still eligible before taking each
        # action, and stop the shutdown process if the node is no longer
        # eligible.  Normal shutdowns based on job demand should be
        # cancellable; shutdowns based on node misbehavior should not.
        super(ComputeNodeShutdownActor, self).__init__(
            cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self._monitor = node_monitor.proxy()
        self.cloud_node = self._monitor.cloud_node.get()
        self.cancellable = cancellable
        self.cancel_reason = None
        self.success = None
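
    # Hedged usage sketch (assumed caller and names, not part of this module):
    #
    #   # demand-driven shutdown; may be cancelled if the node is needed again
    #   ComputeNodeShutdownActor.start(timer, cloud, arv, monitor_ref)
    #   # misbehaving-node shutdown; runs to completion
    #   ComputeNodeShutdownActor.start(timer, cloud, arv, monitor_ref,
    #                                  cancellable=False)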

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))

    def on_start(self):
        super(ComputeNodeShutdownActor, self).on_start()
        self._later.shutdown_node()

    def _arvados_node(self):
        return self._monitor.arvados_node.get()

    def _finished(self, success_flag=None):
        if success_flag is not None:
            self.success = success_flag
        return super(ComputeNodeShutdownActor, self)._finished()

    def cancel_shutdown(self, reason, **kwargs):
        self.cancel_reason = reason
        self._logger.info("Shutdown cancelled: %s.", reason)
        self._finished(success_flag=False)

    def _cancel_on_exception(orig_func):
        @functools.wraps(orig_func)
        def finish_wrapper(self, *args, **kwargs):
            try:
                return orig_func(self, *args, **kwargs)
            except Exception as error:
                self._logger.error("Actor error %s", error)
                self._logger.debug("", exc_info=True)
                self._later.cancel_shutdown("Unhandled exception %s" % error, try_resume=False)
        return finish_wrapper

    @_cancel_on_exception
    def shutdown_node(self):
        if self.cancellable:
            self._logger.info("Checking that node is still eligible for shutdown")
            eligible, reason = self._monitor.shutdown_eligible().get()
            if not eligible:
                self.cancel_shutdown("No longer eligible for shut down because %s" % reason,
                                     try_resume=True)
                return
        self._destroy_node()

    def _destroy_node(self):
        self._logger.info("Starting shutdown")
        arv_node = self._arvados_node()
        if self._cloud.destroy_node(self.cloud_node):
            self._logger.info("Shutdown success")
            if arv_node:
                self._later.clean_arvados_node(arv_node)
            else:
                self._finished(success_flag=True)
        else:
            self.cancel_shutdown(self.DESTROY_FAILED, try_resume=False)

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def clean_arvados_node(self, arvados_node):
        self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
        self._finished(success_flag=True)


class ComputeNodeUpdateActor(config.actor_class):
    """Actor to dispatch one-off cloud management requests.

    This actor receives requests for small cloud updates, and
    dispatches them to a real driver.  ComputeNodeMonitorActors use
    this to perform maintenance tasks on themselves.  Having a
    dedicated actor for this gives us the opportunity to control the
    flow of requests; e.g., by backing off when errors occur.
    """
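    # Backoff sketch: after each consecutive cloud error, _throttle_errors
    # below pushes the next allowed request time forward by roughly
    #
    #   delay = min(2 ** error_streak, max_retry_wait)
    #
    # so with the default max_retry_wait=180 the delays grow
    # 2, 4, 8, ..., 128, 180, 180, ... seconds until a request succeeds.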

    def __init__(self, cloud_factory, max_retry_wait=180):
        super(ComputeNodeUpdateActor, self).__init__()
        self._cloud = cloud_factory()
        self.max_retry_wait = max_retry_wait
        self.error_streak = 0
        self.next_request_time = time.time()

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))

    def on_start(self):
        self._set_logger()

    def _throttle_errors(orig_func):
        @functools.wraps(orig_func)
        def throttle_wrapper(self, *args, **kwargs):
            throttle_time = self.next_request_time - time.time()
            if throttle_time > 0:
                time.sleep(throttle_time)
            self.next_request_time = time.time()
            try:
                result = orig_func(self, *args, **kwargs)
            except Exception as error:
                if self._cloud.is_cloud_exception(error):
                    self.error_streak += 1
                    self.next_request_time += min(2 ** self.error_streak,
                                                  self.max_retry_wait)
                self._logger.warn(
                    "Unhandled exception: %s", error, exc_info=error)
            else:
                self.error_streak = 0
                return result
        return throttle_wrapper

    @_throttle_errors
    def sync_node(self, cloud_node, arvados_node):
        return self._cloud.sync_node(cloud_node, arvados_node)


class ComputeNodeMonitorActor(config.actor_class):
    """Actor to manage a running compute node.

    This actor gets updates about a compute node's cloud and Arvados records.
    It uses this information to notify subscribers when the node is eligible
    for shutdown.
    """
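    # Hedged usage sketch (assumed caller and names, not part of this module):
    # the node daemon would typically pair each cloud node with a monitor and
    # listen for shutdown eligibility, e.g.:
    #
    #   monitor = ComputeNodeMonitorActor.start(
    #       cloud_node, start_time, shutdown_timer, fqdn_func,
    #       timer_actor, update_actor, cloud_client).proxy()
    #   monitor.subscribe(daemon_proxy.node_can_shutdown)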

    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
                 cloud_fqdn_func, timer_actor, update_actor, cloud_client,
                 arvados_node=None, poll_stale_after=600, node_stale_after=3600,
                 boot_fail_after=1800):
        super(ComputeNodeMonitorActor, self).__init__()
        self._later = self.actor_ref.tell_proxy()
        self._shutdowns = shutdown_timer
        self._cloud_node_fqdn = cloud_fqdn_func
        self._timer = timer_actor
        self._update = update_actor
        self._cloud = cloud_client
        self.cloud_node = cloud_node
        self.cloud_node_start_time = cloud_node_start_time
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.boot_fail_after = boot_fail_after
        self.subscribers = set()
        self.arvados_node = None
        self._later.update_arvados_node(arvados_node)
        self.last_shutdown_opening = None
        self._later.consider_shutdown()

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))

    def on_start(self):
        self._set_logger()
        self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)

    def subscribe(self, subscriber):
        self.subscribers.add(subscriber)

    def _debug(self, msg, *args):
        self._logger.debug(msg, *args)
351 """Get node state, one of ['unpaired', 'busy', 'idle', 'down']."""
353 # If this node is not associated with an Arvados node, return 'unpaired'.
354 if self.arvados_node is None:
357 state = self.arvados_node['crunch_worker_state']
359 # If state information is not available because it is missing or the
360 # record is stale, return 'down'.
361 if not state or not timestamp_fresh(arvados_node_mtime(self.arvados_node),
362 self.node_stale_after):
365 # There's a window between when a node pings for the first time and the
366 # value of 'slurm_state' is synchronized by crunch-dispatch. In this
367 # window, the node will still report as 'down'. Check that
368 # first_ping_at is truthy and consider the node 'idle' during the
369 # initial boot grace period.
370 if (state == 'down' and
371 self.arvados_node['first_ping_at'] and
372 timestamp_fresh(self.cloud_node_start_time,
373 self.boot_fail_after) and
374 not self._cloud.broken(self.cloud_node)):
377 # "missing" means last_ping_at is stale, this should be
379 if arvados_node_missing(self.arvados_node, self.node_stale_after):
382 # Turns out using 'job_uuid' this way is a bad idea. The node record
383 # is assigned the job_uuid before the job is locked (which removes it
384 # from the queue) which means the job will be double-counted as both in
385 # the wishlist and but also keeping a node busy. This end result is
386 # excess nodes being booted.
387 #if state == 'idle' and self.arvados_node['job_uuid']:

    def in_state(self, *states):
        return self.get_state() in states

    def shutdown_eligible(self):
        """Determine if node is a candidate for shut down.

        Returns a tuple of (boolean, string) where the first value is whether
        the node is a candidate for shut down, and the second value is the
        reason for the decision.
        """

        # Collect the current states and then consult the state transition
        # table to decide whether we should shut down.  Possible states are:
        # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
        # window = ["open", "closed"]
        # boot_grace = ["boot wait", "boot exceeded"]
        # idle_grace = ["not idle", "idle wait", "idle exceeded"]
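        # For example, a state tuple like
        #   ('idle', 'open', 'boot exceeded', 'idle exceeded')
        # would be expected to map to a shutdown-eligible entry in the
        # transitions table, while any tuple with window 'closed' would not.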

        if self.arvados_node and not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
            return (False, "node state is stale")

        crunch_worker_state = self.get_state()

        window = "open" if self._shutdowns.window_open() else "closed"

        if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
            boot_grace = "boot wait"
        else:
            boot_grace = "boot exceeded"

        # API server side not implemented yet.
        idle_grace = 'idle exceeded'

        node_state = (crunch_worker_state, window, boot_grace, idle_grace)
        t = transitions[node_state]
        if t is not None:
            # yes, shutdown eligible
            return (True, "node state is %s" % (node_state,))
        else:
            # no, return a reason
            return (False, "node state is %s" % (node_state,))

    def consider_shutdown(self):
        try:
            eligible, reason = self.shutdown_eligible()
            next_opening = self._shutdowns.next_opening()
            if eligible:
                self._debug("Suggesting shutdown because %s", reason)
                _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
            else:
                self._debug("Not eligible for shut down because %s", reason)
                if self.last_shutdown_opening != next_opening:
                    self._debug("Shutdown window closed.  Next at %s.",
                                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
                    self._timer.schedule(next_opening, self._later.consider_shutdown)
                    self.last_shutdown_opening = next_opening
        except Exception:
            self._logger.exception("Unexpected exception")

    def offer_arvados_pair(self, arvados_node):
        first_ping_s = arvados_node.get('first_ping_at')
        if (self.arvados_node is not None) or (not first_ping_s):
            return None
        elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
              (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
            self._later.update_arvados_node(arvados_node)
            return self.cloud_node.id
        else:
            return None

    def update_cloud_node(self, cloud_node):
        if cloud_node is not None:
            self.cloud_node = cloud_node
            self._later.consider_shutdown()

    def update_arvados_node(self, arvados_node):
        # If the cloud node's FQDN doesn't match what's in the Arvados node
        # record, make them match.
        # This method is a little unusual in the way it just fires off the
        # request without checking the result or retrying errors.  That's
        # because this update happens every time we reload the Arvados node
        # list: if a previous sync attempt failed, we'll see that the names
        # are out of sync and just try again.  ComputeNodeUpdateActor has
        # the logic to throttle those effective retries when there's trouble.
        if arvados_node is not None:
            self.arvados_node = arvados_node
            if (self._cloud_node_fqdn(self.cloud_node) !=
                  arvados_node_fqdn(self.arvados_node)):
                self._update.sync_node(self.cloud_node, self.arvados_node)
            self._later.consider_shutdown()