from __future__ import absolute_import, print_function

import functools
import logging
import re
import time

import libcloud.common.types as cloud_types
from libcloud.common.exceptions import BaseHTTPError
import pykka

from .. import \
    arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
    arvados_node_missing, RetryMixin
from ...clientactor import _notify_subscribers
from ... import config
from .transitions import transitions

QuotaExceeded = "QuotaExceeded"
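# The QuotaExceeded sentinel is stored in ComputeNodeSetupActor.error when a
# create_node request fails in a way that looks like a cloud quota problem,
# so subscribers can distinguish quota exhaustion from other failures.

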
class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
    """Base class for actors that change a compute node's state.

    This base class takes care of retrying changes and notifying
    subscribers when the change is finished.
    """
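    # Subscribers registered via subscribe() are each called once, with a
    # proxy to this actor, when the state change completes (see _finished()).
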
    def __init__(self, cloud_client, arvados_client, timer_actor,
                 retry_wait, max_retry_wait):
        super(ComputeNodeStateChangeBase, self).__init__()
        RetryMixin.__init__(self, retry_wait, max_retry_wait,
                            None, cloud_client, timer_actor)
        self._later = self.actor_ref.tell_proxy()
        self._arvados = arvados_client
        self.subscribers = set()

    def _set_logger(self):
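        # self.actor_urn looks like "urn:uuid:<36-character UUID>"; the [33:]
        # slice keeps just the UUID's last segment for a compact log name.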
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))

    def on_start(self):
        self._set_logger()

    def _finished(self):
        if self.subscribers is None:
            raise Exception("Actor tried to finish twice")
        _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
        self.subscribers = None
        self._logger.info("finished")

    def subscribe(self, subscriber):
        if self.subscribers is None:
            # Already finished: notify the new subscriber immediately.
            try:
                subscriber(self.actor_ref.proxy())
            except pykka.ActorDeadError:
                pass
        else:
            self.subscribers.add(subscriber)

    def _clean_arvados_node(self, arvados_node, explanation):
        return self._arvados.nodes().update(
            uuid=arvados_node['uuid'],
            body={'hostname': None,
                  'first_ping_at': None,
                  'info': {'ec2_instance_id': None,
                           'last_action': explanation}},
            ).execute()

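    # Decorator for actor methods: if the wrapped method raises, log the
    # error and mark the state change finished so subscribers still get
    # notified instead of waiting forever.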
    @staticmethod
    def _finish_on_exception(orig_func):
        @functools.wraps(orig_func)
        def finish_wrapper(self, *args, **kwargs):
            try:
                return orig_func(self, *args, **kwargs)
            except Exception as error:
                self._logger.error("Actor error %s", error)
                self._finished()
        return finish_wrapper


class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
    """Actor to create and set up a cloud compute node.

    This actor prepares an Arvados node record for a new compute node
    (either creating one or cleaning one passed in), then boots the
    actual compute node.  It notifies subscribers when the cloud node
    is successfully created (the last step in the process for Node
    Manager to handle).
    """
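    # Setup proceeds as a chain of actor messages sent via self._later:
    # create_arvados_node (or prepare_arvados_node) -> create_cloud_node
    # -> update_arvados_node_properties -> post_create -> _finished().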
    def __init__(self, timer_actor, arvados_client, cloud_client,
                 cloud_size, arvados_node=None,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeSetupActor, self).__init__(
            cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self.cloud_size = cloud_size
        self.arvados_node = None
        self.cloud_node = None
        self.error = None
        if arvados_node is None:
            self._later.create_arvados_node()
        else:
            self._later.prepare_arvados_node(arvados_node)

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def create_arvados_node(self):
        self.arvados_node = self._arvados.nodes().create(body={}).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def prepare_arvados_node(self, node):
        self.arvados_node = self._clean_arvados_node(
            node, "Prepared by Node Manager")
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry()
    def create_cloud_node(self):
        self._logger.info("Sending create_node request for node size %s.",
                          self.cloud_size.name)
        try:
            self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                      self.arvados_node)
        except BaseHTTPError as e:
            if e.code == 429 or "RequestLimitExceeded" in e.message:
                # Don't consider API rate limits to be quota errors.
                # Re-raise so the Retry logic applies.
                raise

            # The set of possible error codes / messages isn't documented for
            # all clouds, so use a keyword heuristic to determine if the
            # failure is likely due to a quota.
            if re.search(r'(exceed|quota|limit)', e.message, re.I):
                self.error = QuotaExceeded
                self._logger.warning("Quota exceeded: %s", e)
                self._finished()
                return

            # Something else happened; re-raise so the Retry logic applies.
            raise
        except Exception as e:
            raise

        # The information included in the node size object we get from libcloud
        # is inconsistent between cloud drivers.  Replace the libcloud NodeSize
        # object with a compatible CloudSizeWrapper object, which merges the
        # size info reported by the cloud with size information from the
        # configuration file.
        self.cloud_node.size = self.cloud_size

        self._logger.info("Cloud node %s created.", self.cloud_node.id)
        self._later.update_arvados_node_properties()

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def update_arvados_node_properties(self):
        """Tell Arvados some details about the cloud node.

        Currently we only include size/price from our request, which
        we already knew before create_cloud_node(), but doing it here
        gives us an opportunity to provide more detail from
        self.cloud_node, too.
        """
        self.arvados_node['properties']['cloud_node'] = {
            # Note this 'size' is the node size we asked the cloud
            # driver to create -- not necessarily equal to the size
            # reported by the cloud driver for the node that was
            # created.
            'size': self.cloud_size.id,
            'price': self.cloud_size.price,
        }
        self.arvados_node = self._arvados.nodes().update(
            uuid=self.arvados_node['uuid'],
            body={'properties': self.arvados_node['properties']},
        ).execute()
        self._logger.info("%s updated properties.", self.arvados_node['uuid'])
        self._later.post_create()

    @RetryMixin._retry()
    def post_create(self):
        self._cloud.post_create_node(self.cloud_node)
        self._logger.info("%s post-create work done.", self.cloud_node.id)
        self._finished()

    def stop_if_no_cloud_node(self):
        if self.cloud_node is not None:
            return False
        self.stop()
        return True


class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
    """Actor to shut down a compute node.

    This actor simply destroys a cloud node, retrying as needed.
    """
    # Reasons for a shutdown to be cancelled.
    WINDOW_CLOSED = "shutdown window closed"
    DESTROY_FAILED = "destroy_node failed"

    def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
                 cancellable=True, retry_wait=1, max_retry_wait=180):
        # If a ShutdownActor is cancellable, it will ask the
        # ComputeNodeMonitorActor if it's still eligible before taking each
        # action, and stop the shutdown process if the node is no longer
        # eligible.  Normal shutdowns based on job demand should be
        # cancellable; shutdowns based on node misbehavior should not.
        super(ComputeNodeShutdownActor, self).__init__(
            cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self._monitor = node_monitor.proxy()
        self.cloud_node = self._monitor.cloud_node.get()
        self.cancellable = cancellable
        self.cancel_reason = None
        self.success = None

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))

    def on_start(self):
        super(ComputeNodeShutdownActor, self).on_start()
        self._later.shutdown_node()

    def _arvados_node(self):
        return self._monitor.arvados_node.get()

    def _finished(self, success_flag=None):
        if success_flag is not None:
            self.success = success_flag
        return super(ComputeNodeShutdownActor, self)._finished()

    def cancel_shutdown(self, reason, **kwargs):
        self.cancel_reason = reason
        self._logger.info("Shutdown cancelled: %s.", reason)
        self._finished(success_flag=False)

    def _cancel_on_exception(orig_func):
        @functools.wraps(orig_func)
        def finish_wrapper(self, *args, **kwargs):
            try:
                return orig_func(self, *args, **kwargs)
            except Exception as error:
                self._logger.error("Actor error %s", error)
                self._logger.debug("", exc_info=True)
                self._later.cancel_shutdown("Unhandled exception %s" % error, try_resume=False)
        return finish_wrapper

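    # Note: unlike _finish_on_exception, the wrapper above cancels the
    # shutdown (with try_resume=False) rather than marking it finished.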
    @_cancel_on_exception
    def shutdown_node(self):
        if self.cancellable:
            self._logger.info("Checking that node is still eligible for shutdown")
            eligible, reason = self._monitor.shutdown_eligible().get()
            if not eligible:
                self.cancel_shutdown("No longer eligible for shut down because %s" % reason,
                                     try_resume=True)
                return
        self._destroy_node()

    def _destroy_node(self):
        self._logger.info("Starting shutdown")
        arv_node = self._arvados_node()
        if self._cloud.destroy_node(self.cloud_node):
            self._logger.info("Shutdown success")
            if arv_node:
                self._later.clean_arvados_node(arv_node)
            else:
                self._finished(success_flag=True)
        else:
            self.cancel_shutdown(self.DESTROY_FAILED, try_resume=False)

    @ComputeNodeStateChangeBase._finish_on_exception
    @RetryMixin._retry(config.ARVADOS_ERRORS)
    def clean_arvados_node(self, arvados_node):
        self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
        self._finished(success_flag=True)


class ComputeNodeUpdateActor(config.actor_class, RetryMixin):
    """Actor to dispatch one-off cloud management requests.

    This actor receives requests for small cloud updates, and
    dispatches them to a real driver.  ComputeNodeMonitorActors use
    this to perform maintenance tasks on themselves.  Having a
    dedicated actor for this gives us the opportunity to control the
    flow of requests; e.g., by backing off when errors occur.
    """
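    # sync_node() below runs under RetryMixin._retry(), so repeated cloud
    # errors back off (up to max_retry_wait seconds) rather than retrying
    # in a tight loop.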
    def __init__(self, cloud_factory, timer_actor, max_retry_wait=180):
        super(ComputeNodeUpdateActor, self).__init__()
        RetryMixin.__init__(self, 1, max_retry_wait,
                            None, cloud_factory(), timer_actor)
        self._cloud = cloud_factory()
        self._later = self.actor_ref.tell_proxy()

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))

    def on_start(self):
        self._set_logger()

    @RetryMixin._retry()
    def sync_node(self, cloud_node, arvados_node):
        return self._cloud.sync_node(cloud_node, arvados_node)


class ComputeNodeMonitorActor(config.actor_class):
    """Actor to manage a running compute node.

    This actor gets updates about a compute node's cloud and Arvados records.
    It uses this information to notify subscribers when the node is eligible
    for shutdown.
    """
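    # Any cloud or Arvados record update re-triggers consider_shutdown(), so
    # shutdown eligibility is re-evaluated whenever fresh state arrives.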
    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
                 cloud_fqdn_func, timer_actor, update_actor, cloud_client,
                 arvados_node=None, poll_stale_after=600, node_stale_after=3600,
                 boot_fail_after=1800):
        super(ComputeNodeMonitorActor, self).__init__()
        self._later = self.actor_ref.tell_proxy()
        self._shutdowns = shutdown_timer
        self._cloud_node_fqdn = cloud_fqdn_func
        self._timer = timer_actor
        self._update = update_actor
        self._cloud = cloud_client
        self.cloud_node = cloud_node
        self.cloud_node_start_time = cloud_node_start_time
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.boot_fail_after = boot_fail_after
        self.subscribers = set()
        self.arvados_node = None
        self._later.update_arvados_node(arvados_node)
        self.last_shutdown_opening = None
        self._later.consider_shutdown()

    def _set_logger(self):
        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))

    def on_start(self):
        self._set_logger()
        self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)

    def subscribe(self, subscriber):
        self.subscribers.add(subscriber)

    def _debug(self, msg, *args):
        self._logger.debug(msg, *args)

    def get_state(self):
        """Get node state, one of ['unpaired', 'busy', 'idle', 'down']."""

        # If this node is not associated with an Arvados node, return 'unpaired'.
        if self.arvados_node is None:
            return 'unpaired'

        state = self.arvados_node['crunch_worker_state']

        # If state information is not available because it is missing or the
        # record is stale, return 'down'.
        if not state or not timestamp_fresh(arvados_node_mtime(self.arvados_node),
                                            self.node_stale_after):
            state = 'down'

        # There's a window between when a node pings for the first time and the
        # value of 'slurm_state' is synchronized by crunch-dispatch.  In this
        # window, the node will still report as 'down'.  Check that
        # first_ping_at is truthy and consider the node 'idle' during the
        # initial boot grace period.
        if (state == 'down' and
            self.arvados_node['first_ping_at'] and
            timestamp_fresh(self.cloud_node_start_time,
                            self.boot_fail_after) and
            not self._cloud.broken(self.cloud_node)):
            state = 'idle'

        # "missing" means last_ping_at is stale; that should be
        # considered 'down' as well.
        if arvados_node_missing(self.arvados_node, self.node_stale_after):
            state = 'down'

        # Turns out using 'job_uuid' this way is a bad idea.  The node record
        # is assigned the job_uuid before the job is locked (which removes it
        # from the queue), so the job would be double-counted: once in the
        # wishlist and once as keeping a node busy.  The end result is excess
        # nodes being booted.
        #if state == 'idle' and self.arvados_node['job_uuid']:
        #    state = 'busy'

        return state

    def in_state(self, *states):
        return self.get_state() in states

    def shutdown_eligible(self):
        """Determine if the node is a candidate for shut down.

        Returns a tuple of (boolean, string) where the first value is whether
        the node is a candidate for shut down, and the second value is the
        reason for the decision.
        """

        # Collect states, then consult the state transition table to decide
        # whether we should shut down.  Possible states are:
        # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
        # window = ["open", "closed"]
        # boot_grace = ["boot wait", "boot exceeded"]
        # idle_grace = ["not idle", "idle wait", "idle exceeded"]
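        # For example, ('idle', 'open', 'boot exceeded', 'idle exceeded') is
        # the typical shutdown-eligible combination, assuming the transitions
        # table marks that tuple as eligible.
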
        if self.arvados_node and not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
            return (False, "node state is stale")

        crunch_worker_state = self.get_state()

        window = "open" if self._shutdowns.window_open() else "closed"

        if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
            boot_grace = "boot wait"
        else:
            boot_grace = "boot exceeded"

        # Idle grace tracking isn't implemented on the API server side yet.
        idle_grace = 'idle exceeded'

        node_state = (crunch_worker_state, window, boot_grace, idle_grace)
        t = transitions[node_state]
        if t is not None:
            # Yes: shutdown eligible.
            return (True, "node state is %s" % (node_state,))
        else:
            # No: return the reason.
            return (False, "node state is %s" % (node_state,))

    def consider_shutdown(self):
        try:
            eligible, reason = self.shutdown_eligible()
            next_opening = self._shutdowns.next_opening()
            if eligible:
                self._debug("Suggesting shutdown because %s", reason)
                _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
            else:
                self._debug("Not eligible for shut down because %s", reason)

                if self.last_shutdown_opening != next_opening:
                    self._debug("Shutdown window closed.  Next at %s.",
                                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
                    self._timer.schedule(next_opening, self._later.consider_shutdown)
                    self.last_shutdown_opening = next_opening
        except Exception:
            self._logger.exception("Unexpected exception")

    def offer_arvados_pair(self, arvados_node):
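        # Pair with this Arvados record only if its ec2_instance_id matches
        # this cloud node and its first ping arrived after the node booted.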
        first_ping_s = arvados_node.get('first_ping_at')
        if (self.arvados_node is not None) or (not first_ping_s):
            return None
        elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
              (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
            self._later.update_arvados_node(arvados_node)
            return self.cloud_node.id
        else:
            return None

    def update_cloud_node(self, cloud_node):
        if cloud_node is not None:
            self.cloud_node = cloud_node
            self._later.consider_shutdown()

    def update_arvados_node(self, arvados_node):
        # If the cloud node's FQDN doesn't match what's in the Arvados node
        # record, make them match.
        # This method is a little unusual in the way it just fires off the
        # request without checking the result or retrying errors.  That's
        # because this update happens every time we reload the Arvados node
        # list: if a previous sync attempt failed, we'll see that the names
        # are out of sync and just try again.  ComputeNodeUpdateActor has
        # the logic to throttle those effective retries when there's trouble.
        if arvados_node is not None:
            self.arvados_node = arvados_node
            if (self._cloud_node_fqdn(self.cloud_node) !=
                    arvados_node_fqdn(self.arvados_node)):
                self._update.sync_node(self.cloud_node, self.arvados_node)
            self._later.consider_shutdown()