from __future__ import absolute_import, print_function

import functools
import logging
import time

import libcloud.common.types as cloud_types
import pykka

from .. import \
    arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
    arvados_node_missing, RetryMixin
from ...clientactor import _notify_subscribers
from ... import config
from .transitions import transitions


class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
    """Base class for actors that change a compute node's state.

    This base class takes care of retrying changes and notifying
    subscribers when the change is finished.
    """
    def __init__(self, cloud_client, arvados_client, timer_actor,
                 retry_wait, max_retry_wait):
        super(ComputeNodeStateChangeBase, self).__init__()
        RetryMixin.__init__(self, retry_wait, max_retry_wait,
                            None, cloud_client, timer_actor)
        self._later = self.actor_ref.tell_proxy()
        self._arvados = arvados_client
        self.subscribers = set()

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))

    def on_start(self):
        self._set_logger()

    def _finished(self):
        if self.subscribers is None:
            raise Exception("Actor tried to finish twice")
        _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
        self.subscribers = None
        self._logger.info("finished")

    def subscribe(self, subscriber):
        if self.subscribers is None:
            # Already finished: notify the new subscriber immediately
            # instead of adding it to the set.
            try:
                subscriber(self.actor_ref.proxy())
            except pykka.ActorDeadError:
                pass
        else:
            self.subscribers.add(subscriber)

    def _clean_arvados_node(self, arvados_node, explanation):
        return self._arvados.nodes().update(
            uuid=arvados_node['uuid'],
            body={'hostname': None,
                  'ip_address': None,
                  'slot_number': None,
                  'first_ping_at': None,
                  'last_ping_at': None,
                  'properties': {},
                  'info': {'ec2_instance_id': None,
                           'last_action': explanation}},
            ).execute()
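
    # Clearing these fields makes the record safe to pair with a new compute
    # node; 'last_action' records why the record was reset.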

    @staticmethod
    def _finish_on_exception(orig_func):
        @functools.wraps(orig_func)
        def finish_wrapper(self, *args, **kwargs):
            try:
                return orig_func(self, *args, **kwargs)
            except Exception as error:
                self._logger.error("Actor error %s", error)
                self._finished()
        return finish_wrapper
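
    # Methods wrapped with _finish_on_exception still call _finished() on
    # failure, so subscribers hear about the state change even when it dies
    # with an unexpected error.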


class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
    """Actor to create and set up a cloud compute node.

    This actor prepares an Arvados node record for a new compute node
    (either creating one or cleaning one passed in), then boots the
    actual compute node.  It notifies subscribers when the cloud node
    is successfully created (the last step in the process for Node
    Manager to handle).
    """
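    # The setup sequence is a chain of asynchronous messages sent through
    # self._later: create_arvados_node (or prepare_arvados_node), then
    # create_cloud_node, update_arvados_node_properties, and finally
    # post_create, which calls _finished() to notify subscribers.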
    def __init__(self, timer_actor, arvados_client, cloud_client,
                 cloud_size, arvados_node=None,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeSetupActor, self).__init__(
            cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self.cloud_size = cloud_size
        self.arvados_node = None
        self.cloud_node = None
        if arvados_node is None:
            self._later.create_arvados_node()
        else:
            self._later.prepare_arvados_node(arvados_node)

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def create_arvados_node(self):
        self.arvados_node = self._arvados.nodes().create(body={}).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def prepare_arvados_node(self, node):
        self.arvados_node = self._clean_arvados_node(
            node, "Prepared by Node Manager")
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry()
    def create_cloud_node(self):
        self._logger.info("Sending create_node request for node size %s.",
                          self.cloud_size.name)
        self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                  self.arvados_node)
        if not self.cloud_node.size:
            self.cloud_node.size = self.cloud_size
        self._logger.info("Cloud node %s created.", self.cloud_node.id)
        self._later.update_arvados_node_properties()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def update_arvados_node_properties(self):
        """Tell Arvados some details about the cloud node.

        Currently we only include size/price from our request, which
        we already knew before create_cloud_node(), but doing it here
        gives us an opportunity to provide more detail from
        self.cloud_node, too.
        """
        self.arvados_node['properties']['cloud_node'] = {
            # Note this 'size' is the node size we asked the cloud
            # driver to create -- not necessarily equal to the size
            # reported by the cloud driver for the node that was
            # created.
            'size': self.cloud_size.id,
            'price': self.cloud_size.price,
        }
        self.arvados_node = self._arvados.nodes().update(
            uuid=self.arvados_node['uuid'],
            body={'properties': self.arvados_node['properties']},
        ).execute()
        self._logger.info("%s updated properties.", self.arvados_node['uuid'])
        self._later.post_create()

    @RetryMixin._retry()
    def post_create(self):
        self._cloud.post_create_node(self.cloud_node)
        self._logger.info("%s post-create work done.", self.cloud_node.id)
        self._finished()

    def stop_if_no_cloud_node(self):
        if self.cloud_node is not None:
            return False
        self.stop()
        return True


class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
    """Actor to shut down a compute node.

    This actor simply destroys a cloud node, retrying as needed.
    """
    # Reasons for a shutdown to be cancelled.
    WINDOW_CLOSED = "shutdown window closed"
    NODE_BROKEN = "cloud failed to shut down broken node"

    def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
                 cancellable=True, retry_wait=1, max_retry_wait=180):
        # If a ShutdownActor is cancellable, it will ask the
        # ComputeNodeMonitorActor if it's still eligible before taking each
        # action, and stop the shutdown process if the node is no longer
        # eligible.  Normal shutdowns based on job demand should be
        # cancellable; shutdowns based on node misbehavior should not.
        super(ComputeNodeShutdownActor, self).__init__(
            cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self._monitor = node_monitor.proxy()
        self.cloud_node = self._monitor.cloud_node.get()
        self.cancellable = cancellable
        self.cancel_reason = None
        self.success = None

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))

    def on_start(self):
        super(ComputeNodeShutdownActor, self).on_start()
        self._later.shutdown_node()

    def _arvados_node(self):
        return self._monitor.arvados_node.get()

    def _finished(self, success_flag=None):
        if success_flag is not None:
            self.success = success_flag
        return super(ComputeNodeShutdownActor, self)._finished()

    def cancel_shutdown(self, reason):
        self.cancel_reason = reason
        self._logger.info("Shutdown cancelled: %s.", reason)
        self._finished(success_flag=False)

    def _cancel_on_exception(orig_func):
        @functools.wraps(orig_func)
        def finish_wrapper(self, *args, **kwargs):
            try:
                return orig_func(self, *args, **kwargs)
            except Exception as error:
                self._logger.error("Actor error %s", error)
                self._later.cancel_shutdown("Unhandled exception %s" % error)
        return finish_wrapper
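
    # Like _finish_on_exception above, but an unexpected error during shutdown
    # is reported as a cancellation, so the failure is recorded in
    # cancel_reason and success ends up False.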

    @_cancel_on_exception
    @RetryMixin._retry()
    def shutdown_node(self):
        self._logger.info("Starting shutdown")
        arv_node = self._arvados_node()
        if not self._cloud.destroy_node(self.cloud_node):
            if self._cloud.broken(self.cloud_node):
                self._later.cancel_shutdown(self.NODE_BROKEN)
                return
            else:
                # Force a retry.
                raise cloud_types.LibcloudError("destroy_node failed")
        self._logger.info("Shutdown success")
        if arv_node is None:
            self._finished(success_flag=True)
        else:
            self._later.clean_arvados_node(arv_node)

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def clean_arvados_node(self, arvados_node):
        self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
        self._finished(success_flag=True)


class ComputeNodeUpdateActor(config.actor_class):
    """Actor to dispatch one-off cloud management requests.

    This actor receives requests for small cloud updates, and
    dispatches them to a real driver.  ComputeNodeMonitorActors use
    this to perform maintenance tasks on themselves.  Having a
    dedicated actor for this gives us the opportunity to control the
    flow of requests; e.g., by backing off when errors occur.
    """
    def __init__(self, cloud_factory, max_retry_wait=180):
        super(ComputeNodeUpdateActor, self).__init__()
        self._cloud = cloud_factory()
        self.max_retry_wait = max_retry_wait
        self.error_streak = 0
        self.next_request_time = time.time()

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))

    def on_start(self):
        self._set_logger()

    def _throttle_errors(orig_func):
        @functools.wraps(orig_func)
        def throttle_wrapper(self, *args, **kwargs):
            throttle_time = self.next_request_time - time.time()
            if throttle_time > 0:
                time.sleep(throttle_time)
            self.next_request_time = time.time()
            try:
                result = orig_func(self, *args, **kwargs)
            except Exception as error:
                if self._cloud.is_cloud_exception(error):
                    self.error_streak += 1
                    self.next_request_time += min(2 ** self.error_streak,
                                                  self.max_retry_wait)
                self._logger.warn(
                    "Unhandled exception: %s", error, exc_info=error)
            else:
                self.error_streak = 0
                return result
        return throttle_wrapper
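
    # Backoff sketch: after N consecutive cloud errors, the next request is
    # delayed by an extra min(2 ** N, max_retry_wait) seconds -- 2, 4, 8, ...
    # seconds, capped at 180 with the default max_retry_wait.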

    @_throttle_errors
    def sync_node(self, cloud_node, arvados_node):
        return self._cloud.sync_node(cloud_node, arvados_node)


class ComputeNodeMonitorActor(config.actor_class):
    """Actor to manage a running compute node.

    This actor gets updates about a compute node's cloud and Arvados records.
    It uses this information to notify subscribers when the node is eligible
    for shutdown.
    """
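    # Typical wiring (a sketch, not shown in this file): the node manager
    # daemon creates one monitor per cloud node, pairs it with an Arvados node
    # record via offer_arvados_pair, and subscribes a callback that starts a
    # ComputeNodeShutdownActor once this actor announces shutdown eligibility.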
    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
                 cloud_fqdn_func, timer_actor, update_actor, cloud_client,
                 arvados_node=None, poll_stale_after=600, node_stale_after=3600,
                 boot_fail_after=1800):
        super(ComputeNodeMonitorActor, self).__init__()
        self._later = self.actor_ref.tell_proxy()
        self._last_log = None
        self._shutdowns = shutdown_timer
        self._cloud_node_fqdn = cloud_fqdn_func
        self._timer = timer_actor
        self._update = update_actor
        self._cloud = cloud_client
        self.cloud_node = cloud_node
        self.cloud_node_start_time = cloud_node_start_time
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.boot_fail_after = boot_fail_after
        self.subscribers = set()
        self.arvados_node = None
        self._later.update_arvados_node(arvados_node)
        self.last_shutdown_opening = None
        self._later.consider_shutdown()

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))

    def on_start(self):
        self._set_logger()
        self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)

    def subscribe(self, subscriber):
        self.subscribers.add(subscriber)

    def _debug(self, msg, *args):
        if msg == self._last_log:
            return
        self._last_log = msg
        self._logger.debug(msg, *args)

    def get_state(self):
        """Get node state, one of ['unpaired', 'busy', 'idle', 'down']."""

        # If this node is not associated with an Arvados node, return 'unpaired'.
        if self.arvados_node is None:
            return 'unpaired'

        state = self.arvados_node['crunch_worker_state']

        # If state information is not available because it is missing or the
        # record is stale, return 'down'.
        if not state or not timestamp_fresh(arvados_node_mtime(self.arvados_node),
                                            self.node_stale_after):
            state = 'down'

        # There's a window between when a node pings for the first time and
        # when the value of 'slurm_state' is synchronized by crunch-dispatch.
        # In this window, the node will still report as 'down'.  Check that
        # first_ping_at is truthy and consider the node 'idle' during the
        # initial boot grace period.
        if (state == 'down' and
            self.arvados_node['first_ping_at'] and
            timestamp_fresh(self.cloud_node_start_time,
                            self.boot_fail_after) and
            not self._cloud.broken(self.cloud_node)):
            state = 'idle'

        # "missing" means last_ping_at is stale; that should also be
        # considered 'down'.
        if arvados_node_missing(self.arvados_node, self.node_stale_after):
            state = 'down'

        # Turns out using 'job_uuid' this way is a bad idea.  The node record
        # is assigned the job_uuid before the job is locked (which removes it
        # from the queue), so the job is double-counted: it appears in the
        # wishlist while also keeping a node busy.  The end result is excess
        # nodes being booted.
        #if state == 'idle' and self.arvados_node['job_uuid']:
        #    state = 'busy'

        return state

    def in_state(self, *states):
        return self.get_state() in states

    def shutdown_eligible(self):
        """Determine if the node is a candidate for shut down.

        Returns a tuple of (boolean, string) where the first value is whether
        the node is a candidate for shut down, and the second value is the
        reason for the decision.
        """

        # Collect the node's states, then consult the state transition table
        # to decide whether we should shut down.  Possible states are:
        # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
        # window = ["open", "closed"]
        # boot_grace = ["boot wait", "boot exceeded"]
        # idle_grace = ["not idle", "idle wait", "idle exceeded"]
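        # For example, the tuple ('idle', 'open', 'boot exceeded',
        # 'idle exceeded') is looked up in the transitions table below; a
        # non-None entry means the node may be shut down, while None means
        # it must stay up for now.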

        if self.arvados_node and not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
            return (False, "node state is stale")

        crunch_worker_state = self.get_state()

        window = "open" if self._shutdowns.window_open() else "closed"

        if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
            boot_grace = "boot wait"
        else:
            boot_grace = "boot exceeded"

        # API server side not implemented yet.
        idle_grace = 'idle exceeded'

        node_state = (crunch_worker_state, window, boot_grace, idle_grace)
        t = transitions[node_state]
        if t is not None:
            # yes, shutdown eligible
            return (True, "node state is %s" % (node_state,))
        else:
            # no, return a reason
            return (False, "node state is %s" % (node_state,))

    def consider_shutdown(self):
        try:
            eligible, reason = self.shutdown_eligible()
            next_opening = self._shutdowns.next_opening()
            if eligible:
                self._debug("Suggesting shutdown because %s", reason)
                _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
            else:
                self._debug("Not eligible for shut down because %s", reason)

                if self.last_shutdown_opening != next_opening:
                    self._debug("Shutdown window closed.  Next at %s.",
                                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
                    self._timer.schedule(next_opening, self._later.consider_shutdown)
                    self.last_shutdown_opening = next_opening
        except Exception:
            self._logger.exception("Unexpected exception")

    def offer_arvados_pair(self, arvados_node):
        first_ping_s = arvados_node.get('first_ping_at')
        if (self.arvados_node is not None) or (not first_ping_s):
            return None
        elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
              (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
            self._later.update_arvados_node(arvados_node)
            return self.cloud_node.id
        else:
            return None

    def update_cloud_node(self, cloud_node):
        if cloud_node is not None:
            self.cloud_node = cloud_node
            self._later.consider_shutdown()

    def update_arvados_node(self, arvados_node):
        # If the cloud node's FQDN doesn't match what's in the Arvados node
        # record, make them match.
        # This method is a little unusual in the way it just fires off the
        # request without checking the result or retrying errors.  That's
        # because this update happens every time we reload the Arvados node
        # list: if a previous sync attempt failed, we'll see that the names
        # are out of sync and just try again.  ComputeNodeUpdateActor has
        # the logic to throttle those effective retries when there's trouble.
        if arvados_node is not None:
            self.arvados_node = arvados_node
            if (self._cloud_node_fqdn(self.cloud_node) !=
                    arvados_node_fqdn(self.arvados_node)):
                self._update.sync_node(self.cloud_node, self.arvados_node)
            self._later.consider_shutdown()