from __future__ import absolute_import, print_function

import functools
import logging
import time

import libcloud.common.types as cloud_types
import pykka

from .. import \
    arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
    arvados_node_missing, RetryMixin
from ...clientactor import _notify_subscribers
from ... import config
from .transitions import transitions


class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
    """Base class for actors that change a compute node's state.

    This base class takes care of retrying changes and notifying
    subscribers when the change is finished.
    """
    def __init__(self, cloud_client, arvados_client, timer_actor,
                 retry_wait, max_retry_wait):
        super(ComputeNodeStateChangeBase, self).__init__()
        RetryMixin.__init__(self, retry_wait, max_retry_wait,
                            None, cloud_client, timer_actor)
        self._later = self.actor_ref.tell_proxy()
        self._arvados = arvados_client
        self.subscribers = set()
    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))

    def on_start(self):
        self._set_logger()

    def _finished(self):
        if self.subscribers is None:
            raise Exception("Actor tried to finish twice")
        _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
        self.subscribers = None
        self._logger.info("finished")
    def subscribe(self, subscriber):
        if self.subscribers is None:
            try:
                subscriber(self.actor_ref.proxy())
            except pykka.ActorDeadError:
                pass
        else:
            self.subscribers.add(subscriber)
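
    # Usage sketch (hypothetical caller code, not part of this module):
    # subscribers are callables that receive this actor's proxy once the
    # state change finishes; a subscriber added after the change has
    # already finished is invoked immediately.
    #
    #   setup = ComputeNodeSetupActor.start(
    #       timer_actor, arvados_client, cloud_client,
    #       cloud_size, arvados_node).tell_proxy()
    #   setup.subscribe(daemon_later.node_setup_finished)  # hypothetical subscriber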
    def _clean_arvados_node(self, arvados_node, explanation):
        return self._arvados.nodes().update(
            uuid=arvados_node['uuid'],
            body={'hostname': None,
                  'ip_address': None,
                  'slot_number': None,
                  'first_ping_at': None,
                  'last_ping_at': None,
                  'properties': {},
                  'info': {'ec2_instance_id': None,
                           'last_action': explanation}},
            ).execute()
    @staticmethod
    def _finish_on_exception(orig_func):
        @functools.wraps(orig_func)
        def finish_wrapper(self, *args, **kwargs):
            try:
                return orig_func(self, *args, **kwargs)
            except Exception as error:
                self._logger.error("Actor error %s", error)
                self._finished()
        return finish_wrapper
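
    # _finish_on_exception turns an unhandled exception in a state-change
    # step into a normal "finished" notification, so subscribers are
    # unblocked instead of waiting on an actor that will never complete.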


class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
    """Actor to create and set up a cloud compute node.

    This actor prepares an Arvados node record for a new compute node
    (either creating one or cleaning one passed in), then boots the
    actual compute node. It notifies subscribers when the cloud node
    is successfully created (the last step in the process for Node
    Manager to handle).
    """
    def __init__(self, timer_actor, arvados_client, cloud_client,
                 cloud_size, arvados_node,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeSetupActor, self).__init__(
            cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self.cloud_size = cloud_size
        self.arvados_node = None
        self.cloud_node = None
        self._later.prepare_arvados_node(arvados_node)
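
    # The methods below form a message chain, each step scheduling the next
    # one on this actor via self._later:
    # prepare_arvados_node -> create_cloud_node ->
    # update_arvados_node_properties -> post_create.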
    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def prepare_arvados_node(self, node):
        self.arvados_node = self._clean_arvados_node(
            node, "Prepared by Node Manager")
        self._later.create_cloud_node()
    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry()
    def create_cloud_node(self):
        self._logger.info("Sending create_node request for node size %s.",
                          self.cloud_size.name)
        self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                  self.arvados_node)
        if not self.cloud_node.size:
            self.cloud_node.size = self.cloud_size
        self._logger.info("Cloud node %s created.", self.cloud_node.id)
        self._later.update_arvados_node_properties()
    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def update_arvados_node_properties(self):
        """Tell Arvados some details about the cloud node.

        Currently we only include size/price from our request, which
        we already knew before create_cloud_node(), but doing it here
        gives us an opportunity to provide more detail from
        self.cloud_node, too.
        """
        self.arvados_node['properties']['cloud_node'] = {
            # Note this 'size' is the node size we asked the cloud
            # driver to create -- not necessarily equal to the size
            # reported by the cloud driver for the node that was
            # created.
            'size': self.cloud_size.id,
            'price': self.cloud_size.price,
        }
        self.arvados_node = self._arvados.nodes().update(
            uuid=self.arvados_node['uuid'],
            body={'properties': self.arvados_node['properties']},
            ).execute()
        self._logger.info("%s updated properties.", self.arvados_node['uuid'])
        self._later.post_create()
    @RetryMixin._retry()
    def post_create(self):
        self._cloud.post_create_node(self.cloud_node)
        self._logger.info("%s post-create work done.", self.cloud_node.id)
    def stop_if_no_cloud_node(self):
        if self.cloud_node is not None:
            return False
        self.stop()
        return True
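
    # Callers can use stop_if_no_cloud_node() to abandon a setup attempt
    # that has not yet created a cloud node; once the cloud node exists,
    # the setup is left to run to completion.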


class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
    """Actor to shut down a compute node.

    This actor simply destroys a cloud node, retrying as needed.
    """
    # Reasons for a shutdown to be cancelled.
    WINDOW_CLOSED = "shutdown window closed"
    NODE_BROKEN = "cloud failed to shut down broken node"

    def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
                 cancellable=True, retry_wait=1, max_retry_wait=180):
        # If a ShutdownActor is cancellable, it will ask the
        # ComputeNodeMonitorActor if it's still eligible before taking each
        # action, and stop the shutdown process if the node is no longer
        # eligible. Normal shutdowns based on job demand should be
        # cancellable; shutdowns based on node misbehavior should not.
        super(ComputeNodeShutdownActor, self).__init__(
            cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self._monitor = node_monitor.proxy()
        self.cloud_node = self._monitor.cloud_node.get()
        self.cancellable = cancellable
        self.cancel_reason = None
        self.success = None
    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))

    def on_start(self):
        super(ComputeNodeShutdownActor, self).on_start()
        self._later.shutdown_node()

    def _arvados_node(self):
        return self._monitor.arvados_node.get()
    def _finished(self, success_flag=None):
        if success_flag is not None:
            self.success = success_flag
        return super(ComputeNodeShutdownActor, self)._finished()

    def cancel_shutdown(self, reason):
        self.cancel_reason = reason
        self._logger.info("Shutdown cancelled: %s.", reason)
        self._finished(success_flag=False)
    def _cancel_on_exception(orig_func):
        @functools.wraps(orig_func)
        def finish_wrapper(self, *args, **kwargs):
            try:
                return orig_func(self, *args, **kwargs)
            except Exception as error:
                self._logger.error("Actor error %s", error)
                self._later.cancel_shutdown("Unhandled exception %s" % error)
        return finish_wrapper
    @_cancel_on_exception
    @RetryMixin._retry()
    def shutdown_node(self):
        self._logger.info("Starting shutdown")
        if not self._cloud.destroy_node(self.cloud_node):
            if self._cloud.broken(self.cloud_node):
                self._later.cancel_shutdown(self.NODE_BROKEN)
                return
            else:
                # Force a retry.
                raise cloud_types.LibcloudError("destroy_node failed")
        self._logger.info("Shutdown success")
        arv_node = self._arvados_node()
        if arv_node is None:
            self._finished(success_flag=True)
        else:
            self._later.clean_arvados_node(arv_node)
    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def clean_arvados_node(self, arvados_node):
        self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
        self._finished(success_flag=True)
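
    # Overall flow: shutdown_node destroys the cloud node (retrying through
    # the cloud driver as needed), clean_arvados_node then resets the
    # Arvados record, and _finished notifies subscribers of success or
    # cancellation.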


class ComputeNodeUpdateActor(config.actor_class):
    """Actor to dispatch one-off cloud management requests.

    This actor receives requests for small cloud updates, and
    dispatches them to a real driver. ComputeNodeMonitorActors use
    this to perform maintenance tasks on themselves. Having a
    dedicated actor for this gives us the opportunity to control the
    flow of requests; e.g., by backing off when errors occur.
    """
    def __init__(self, cloud_factory, max_retry_wait=180):
        super(ComputeNodeUpdateActor, self).__init__()
        self._cloud = cloud_factory()
        self.max_retry_wait = max_retry_wait
        self.error_streak = 0
        self.next_request_time = time.time()
    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))

    def on_start(self):
        self._set_logger()

    def _throttle_errors(orig_func):
        @functools.wraps(orig_func)
        def throttle_wrapper(self, *args, **kwargs):
            throttle_time = self.next_request_time - time.time()
            if throttle_time > 0:
                time.sleep(throttle_time)
            self.next_request_time = time.time()
            try:
                result = orig_func(self, *args, **kwargs)
            except Exception as error:
                if self._cloud.is_cloud_exception(error):
                    self.error_streak += 1
                    self.next_request_time += min(2 ** self.error_streak,
                                                  self.max_retry_wait)
                self._logger.warn(
                    "Unhandled exception: %s", error, exc_info=error)
            else:
                self.error_streak = 0
                return result
        return throttle_wrapper
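
    # For illustration: with the default max_retry_wait=180, consecutive
    # cloud errors delay the next request by min(2 ** error_streak, 180)
    # seconds -- 2, 4, 8, ..., 128, 180, 180, ... -- and the first success
    # resets the streak so later requests run without delay.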
    @_throttle_errors
    def sync_node(self, cloud_node, arvados_node):
        return self._cloud.sync_node(cloud_node, arvados_node)


class ComputeNodeMonitorActor(config.actor_class):
    """Actor to manage a running compute node.

    This actor gets updates about a compute node's cloud and Arvados records.
    It uses this information to notify subscribers when the node is eligible
    for shutdown.
    """
    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
                 cloud_fqdn_func, timer_actor, update_actor, cloud_client,
                 arvados_node=None, poll_stale_after=600, node_stale_after=3600,
                 boot_fail_after=1800):
        super(ComputeNodeMonitorActor, self).__init__()
        self._later = self.actor_ref.tell_proxy()
        self._last_log = None
        self._shutdowns = shutdown_timer
        self._cloud_node_fqdn = cloud_fqdn_func
        self._timer = timer_actor
        self._update = update_actor
        self._cloud = cloud_client
        self.cloud_node = cloud_node
        self.cloud_node_start_time = cloud_node_start_time
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.boot_fail_after = boot_fail_after
        self.subscribers = set()
        self.arvados_node = None
        self._later.update_arvados_node(arvados_node)
        self.last_shutdown_opening = None
        self._later.consider_shutdown()
    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))

    def on_start(self):
        self._set_logger()
        self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)

    def subscribe(self, subscriber):
        self.subscribers.add(subscriber)
    def _debug(self, msg, *args):
        if msg == self._last_log:
            return
        self._last_log = msg
        self._logger.debug(msg, *args)
    def in_state(self, *states):
        # Return a boolean to say whether or not our Arvados node record is in
        # one of the given states. If state information is not
        # available--because this node has no Arvados record, the record is
        # stale, or the record has no state information--return None.
        if (self.arvados_node is None) or not timestamp_fresh(
              arvados_node_mtime(self.arvados_node), self.node_stale_after):
            return None
        state = self.arvados_node['crunch_worker_state']
        if not state:
            return None

        # There's a window between when a node pings for the first time and the
        # value of 'slurm_state' is synchronized by crunch-dispatch. In this
        # window, the node will still report as 'down'. Check that
        # first_ping_at is truthy and consider the node 'idle' during the
        # initial boot grace period.
        if (state == 'down' and
            self.arvados_node['first_ping_at'] and
            timestamp_fresh(self.cloud_node_start_time,
                            self.boot_fail_after)):
            state = 'idle'

        # "missing" means last_ping_at is stale; this should be
        # considered "down".
        if arvados_node_missing(self.arvados_node, self.node_stale_after):
            state = 'down'

        result = state in states
        if state == 'idle':
            result = result and not self.arvados_node['job_uuid']

        return result
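
    # Example (hypothetical values): given a fresh Arvados record with
    # crunch_worker_state 'idle' and no job_uuid, in_state('idle') is True
    # and in_state('busy') is False; with no Arvados record at all,
    # in_state(...) is None rather than False.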
    def shutdown_eligible(self):
        """Determine if the node is a candidate for shutdown.

        Returns a tuple of (boolean, string) where the first value is whether
        the node is a candidate for shutdown, and the second value is the
        reason for the decision.
        """

        # Collect states, then consult the state transition table to decide
        # whether we should shut down. Possible states are:
        # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
        # window = ["open", "closed"]
        # boot_grace = ["boot wait", "boot exceeded"]
        # idle_grace = ["not idle", "idle wait", "idle exceeded"]

        if self.arvados_node is None:
            crunch_worker_state = 'unpaired'
        elif not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
            return (False, "node state is stale")
        elif self.in_state('down'):
            crunch_worker_state = 'down'
        elif self.in_state('idle'):
            crunch_worker_state = 'idle'
        elif self.in_state('busy'):
            crunch_worker_state = 'busy'
        else:
            return (False, "node is paired but crunch_worker_state is '%s'" % self.arvados_node['crunch_worker_state'])

        window = "open" if self._shutdowns.window_open() else "closed"

        if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
            boot_grace = "boot wait"
        else:
            boot_grace = "boot exceeded"

        # Idle grace tracking is not implemented on the API server side yet.
        idle_grace = 'idle exceeded'

        node_state = (crunch_worker_state, window, boot_grace, idle_grace)
        t = transitions[node_state]
        if t is not None:
            # yes, shutdown eligible
            return (True, "node state is %s" % (node_state,))
        else:
            # no, return a reason
            return (False, "node state is %s" % (node_state,))
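
    # For illustration, assuming the transitions table in .transitions maps
    # ('idle', 'open', 'boot exceeded', 'idle exceeded') to a shutdown
    # action and maps busy states to None: the former returns
    # (True, "node state is ..."), and any tuple mapped to None returns
    # (False, "node state is ...").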
    def consider_shutdown(self):
        try:
            eligible, reason = self.shutdown_eligible()
            next_opening = self._shutdowns.next_opening()
            if eligible:
                self._debug("Suggesting shutdown because %s", reason)
                _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
            else:
                self._debug("Not eligible for shut down because %s", reason)

                if self.last_shutdown_opening != next_opening:
                    self._debug("Shutdown window closed. Next at %s.",
                                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
                    self._timer.schedule(next_opening, self._later.consider_shutdown)
                    self.last_shutdown_opening = next_opening
        except Exception:
            self._logger.exception("Unexpected exception")
    def offer_arvados_pair(self, arvados_node):
        first_ping_s = arvados_node.get('first_ping_at')
        if (self.arvados_node is not None) or (not first_ping_s):
            return None
        elif ((arvados_node['info'].get('ec2_instance_id') ==
               self._cloud.node_id(self.cloud_node)) and
              (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
            self._later.update_arvados_node(arvados_node)
            return self.cloud_node.id
        else:
            return None
    def update_cloud_node(self, cloud_node):
        if cloud_node is not None:
            self.cloud_node = cloud_node
            self._later.consider_shutdown()
    def update_arvados_node(self, arvados_node):
        # If the cloud node's FQDN doesn't match what's in the Arvados node
        # record, make them match.
        # This method is a little unusual in the way it just fires off the
        # request without checking the result or retrying errors. That's
        # because this update happens every time we reload the Arvados node
        # list: if a previous sync attempt failed, we'll see that the names
        # are out of sync and just try again. ComputeNodeUpdateActor has
        # the logic to throttle those effective retries when there's trouble.
        if arvados_node is not None:
            self.arvados_node = arvados_node
            if (self._cloud_node_fqdn(self.cloud_node) !=
                  arvados_node_fqdn(self.arvados_node)):
                self._update.sync_node(self.cloud_node, self.arvados_node)
            self._later.consider_shutdown()
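
# A minimal wiring sketch (hypothetical; the real daemon supplies its own
# clients, timer, and shutdown-window scheduler):
#
#   monitor = ComputeNodeMonitorActor.start(
#       cloud_node, time.time(), shutdown_timer,
#       cloud_fqdn_func, timer_actor, update_actor, cloud_client,
#   ).tell_proxy()
#   monitor.subscribe(daemon_later.node_can_shutdown)  # hypothetical subscriber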