from __future__ import absolute_import, print_function

import functools
import logging
import time

import libcloud.common.types as cloud_types
import pykka

from .. import \
    arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
    arvados_node_missing, RetryMixin
from ...clientactor import _notify_subscribers
from ... import config
from .transitions import transitions
19 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
20 """Base class for actors that change a compute node's state.
22 This base class takes care of retrying changes and notifying
23 subscribers when the change is finished.
25 def __init__(self, cloud_client, arvados_client, timer_actor,
26 retry_wait, max_retry_wait):
27 super(ComputeNodeStateChangeBase, self).__init__()
28 RetryMixin.__init__(self, retry_wait, max_retry_wait,
29 None, cloud_client, timer_actor)
30 self._later = self.actor_ref.tell_proxy()
31 self._arvados = arvados_client
32 self.subscribers = set()
34 def _set_logger(self):
35 self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
41 if self.subscribers is None:
42 raise Exception("Actor tried to finish twice")
43 _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
44 self.subscribers = None
45 self._logger.info("finished")
47 def subscribe(self, subscriber):
48 if self.subscribers is None:
50 subscriber(self.actor_ref.proxy())
51 except pykka.ActorDeadError:
54 self.subscribers.add(subscriber)
56 def _clean_arvados_node(self, arvados_node, explanation):
57 return self._arvados.nodes().update(
58 uuid=arvados_node['uuid'],
59 body={'hostname': None,
62 'first_ping_at': None,
65 'info': {'ec2_instance_id': None,
66 'last_action': explanation}},
70 def _finish_on_exception(orig_func):
71 @functools.wraps(orig_func)
72 def finish_wrapper(self, *args, **kwargs):
74 return orig_func(self, *args, **kwargs)
75 except Exception as error:
76 self._logger.error("Actor error %s", error)
81 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
82 """Actor to create and set up a cloud compute node.
84 This actor prepares an Arvados node record for a new compute node
85 (either creating one or cleaning one passed in), then boots the
86 actual compute node. It notifies subscribers when the cloud node
87 is successfully created (the last step in the process for Node
90 def __init__(self, timer_actor, arvados_client, cloud_client,
91 cloud_size, arvados_node=None,
92 retry_wait=1, max_retry_wait=180):
93 super(ComputeNodeSetupActor, self).__init__(
94 cloud_client, arvados_client, timer_actor,
95 retry_wait, max_retry_wait)
96 self.cloud_size = cloud_size
97 self.arvados_node = None
98 self.cloud_node = None
99 if arvados_node is None:
100 self._later.create_arvados_node()
102 self._later.prepare_arvados_node(arvados_node)
104 @ComputeNodeStateChangeBase._finish_on_exception
105 @RetryMixin._retry(config.ARVADOS_ERRORS)
106 def create_arvados_node(self):
107 self.arvados_node = self._arvados.nodes().create(body={}).execute()
108 self._later.create_cloud_node()
110 @ComputeNodeStateChangeBase._finish_on_exception
111 @RetryMixin._retry(config.ARVADOS_ERRORS)
112 def prepare_arvados_node(self, node):
113 self.arvados_node = self._clean_arvados_node(
114 node, "Prepared by Node Manager")
115 self._later.create_cloud_node()
117 @ComputeNodeStateChangeBase._finish_on_exception
119 def create_cloud_node(self):
120 self._logger.info("Sending create_node request for node size %s.",
121 self.cloud_size.name)
122 self.cloud_node = self._cloud.create_node(self.cloud_size,
124 if not self.cloud_node.size:
125 self.cloud_node.size = self.cloud_size
126 self._logger.info("Cloud node %s created.", self.cloud_node.id)
127 self._later.update_arvados_node_properties()
129 @ComputeNodeStateChangeBase._finish_on_exception
130 @RetryMixin._retry(config.ARVADOS_ERRORS)
131 def update_arvados_node_properties(self):
132 """Tell Arvados some details about the cloud node.
134 Currently we only include size/price from our request, which
135 we already knew before create_cloud_node(), but doing it here
136 gives us an opportunity to provide more detail from
137 self.cloud_node, too.
139 self.arvados_node['properties']['cloud_node'] = {
140 # Note this 'size' is the node size we asked the cloud
141 # driver to create -- not necessarily equal to the size
142 # reported by the cloud driver for the node that was
144 'size': self.cloud_size.id,
145 'price': self.cloud_size.price,
147 self.arvados_node = self._arvados.nodes().update(
148 uuid=self.arvados_node['uuid'],
149 body={'properties': self.arvados_node['properties']},
151 self._logger.info("%s updated properties.", self.arvados_node['uuid'])
152 self._later.post_create()
155 def post_create(self):
156 self._cloud.post_create_node(self.cloud_node)
157 self._logger.info("%s post-create work done.", self.cloud_node.id)
160 def stop_if_no_cloud_node(self):
161 if self.cloud_node is not None:
167 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
168 """Actor to shut down a compute node.
170 This actor simply destroys a cloud node, retrying as needed.
172 # Reasons for a shutdown to be cancelled.
173 WINDOW_CLOSED = "shutdown window closed"
174 NODE_BROKEN = "cloud failed to shut down broken node"
176 def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
177 cancellable=True, retry_wait=1, max_retry_wait=180):
178 # If a ShutdownActor is cancellable, it will ask the
179 # ComputeNodeMonitorActor if it's still eligible before taking each
180 # action, and stop the shutdown process if the node is no longer
181 # eligible. Normal shutdowns based on job demand should be
182 # cancellable; shutdowns based on node misbehavior should not.
183 super(ComputeNodeShutdownActor, self).__init__(
184 cloud_client, arvados_client, timer_actor,
185 retry_wait, max_retry_wait)
186 self._monitor = node_monitor.proxy()
187 self.cloud_node = self._monitor.cloud_node.get()
188 self.cancellable = cancellable
189 self.cancel_reason = None
192 def _set_logger(self):
193 self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
196 super(ComputeNodeShutdownActor, self).on_start()
197 self._later.shutdown_node()
199 def _arvados_node(self):
200 return self._monitor.arvados_node.get()
202 def _finished(self, success_flag=None):
203 if success_flag is not None:
204 self.success = success_flag
205 return super(ComputeNodeShutdownActor, self)._finished()
207 def cancel_shutdown(self, reason):
208 self.cancel_reason = reason
209 self._logger.info("Shutdown cancelled: %s.", reason)
210 self._finished(success_flag=False)
212 def _cancel_on_exception(orig_func):
213 @functools.wraps(orig_func)
214 def finish_wrapper(self, *args, **kwargs):
216 return orig_func(self, *args, **kwargs)
217 except Exception as error:
218 self._logger.error("Actor error %s", error)
219 self._later.cancel_shutdown("Unhandled exception %s" % error)
220 return finish_wrapper
222 @_cancel_on_exception
224 def shutdown_node(self):
225 self._logger.info("Starting shutdown")
226 arv_node = self._arvados_node()
227 if not self._cloud.destroy_node(self.cloud_node):
228 if self._cloud.broken(self.cloud_node):
229 self._later.cancel_shutdown(self.NODE_BROKEN)
233 raise cloud_types.LibcloudError("destroy_node failed")
234 self._logger.info("Shutdown success")
236 self._finished(success_flag=True)
238 self._later.clean_arvados_node(arv_node)
240 @ComputeNodeStateChangeBase._finish_on_exception
241 @RetryMixin._retry(config.ARVADOS_ERRORS)
242 def clean_arvados_node(self, arvados_node):
243 self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
244 self._finished(success_flag=True)
247 class ComputeNodeUpdateActor(config.actor_class):
248 """Actor to dispatch one-off cloud management requests.
250 This actor receives requests for small cloud updates, and
251 dispatches them to a real driver. ComputeNodeMonitorActors use
252 this to perform maintenance tasks on themselves. Having a
253 dedicated actor for this gives us the opportunity to control the
254 flow of requests; e.g., by backing off when errors occur.
256 def __init__(self, cloud_factory, max_retry_wait=180):
257 super(ComputeNodeUpdateActor, self).__init__()
258 self._cloud = cloud_factory()
259 self.max_retry_wait = max_retry_wait
260 self.error_streak = 0
261 self.next_request_time = time.time()
263 def _set_logger(self):
264 self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
269 def _throttle_errors(orig_func):
270 @functools.wraps(orig_func)
271 def throttle_wrapper(self, *args, **kwargs):
272 throttle_time = self.next_request_time - time.time()
273 if throttle_time > 0:
274 time.sleep(throttle_time)
275 self.next_request_time = time.time()
277 result = orig_func(self, *args, **kwargs)
278 except Exception as error:
279 if self._cloud.is_cloud_exception(error):
280 self.error_streak += 1
281 self.next_request_time += min(2 ** self.error_streak,
284 "Unhandled exception: %s", error, exc_info=error)
286 self.error_streak = 0
288 return throttle_wrapper
291 def sync_node(self, cloud_node, arvados_node):
292 return self._cloud.sync_node(cloud_node, arvados_node)
295 class ComputeNodeMonitorActor(config.actor_class):
296 """Actor to manage a running compute node.
298 This actor gets updates about a compute node's cloud and Arvados records.
299 It uses this information to notify subscribers when the node is eligible
302 def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
303 cloud_fqdn_func, timer_actor, update_actor, cloud_client,
304 arvados_node=None, poll_stale_after=600, node_stale_after=3600,
307 super(ComputeNodeMonitorActor, self).__init__()
308 self._later = self.actor_ref.tell_proxy()
309 self._shutdowns = shutdown_timer
310 self._cloud_node_fqdn = cloud_fqdn_func
311 self._timer = timer_actor
312 self._update = update_actor
313 self._cloud = cloud_client
314 self.cloud_node = cloud_node
315 self.cloud_node_start_time = cloud_node_start_time
316 self.poll_stale_after = poll_stale_after
317 self.node_stale_after = node_stale_after
318 self.boot_fail_after = boot_fail_after
319 self.subscribers = set()
320 self.arvados_node = None
321 self._later.update_arvados_node(arvados_node)
322 self.last_shutdown_opening = None
323 self._later.consider_shutdown()
325 def _set_logger(self):
326 self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
330 self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)
332 def subscribe(self, subscriber):
333 self.subscribers.add(subscriber)
335 def _debug(self, msg, *args):
336 self._logger.debug(msg, *args)
339 """Get node state, one of ['unpaired', 'busy', 'idle', 'down']."""
341 # If this node is not associated with an Arvados node, return 'unpaired'.
342 if self.arvados_node is None:
345 state = self.arvados_node['crunch_worker_state']
347 # If state information is not available because it is missing or the
348 # record is stale, return 'down'.
349 if not state or not timestamp_fresh(arvados_node_mtime(self.arvados_node),
350 self.node_stale_after):
353 # There's a window between when a node pings for the first time and the
354 # value of 'slurm_state' is synchronized by crunch-dispatch. In this
355 # window, the node will still report as 'down'. Check that
356 # first_ping_at is truthy and consider the node 'idle' during the
357 # initial boot grace period.
358 if (state == 'down' and
359 self.arvados_node['first_ping_at'] and
360 timestamp_fresh(self.cloud_node_start_time,
361 self.boot_fail_after) and
362 not self._cloud.broken(self.cloud_node)):
365 # "missing" means last_ping_at is stale, this should be
367 if arvados_node_missing(self.arvados_node, self.node_stale_after):
370 # Turns out using 'job_uuid' this way is a bad idea. The node record
371 # is assigned the job_uuid before the job is locked (which removes it
372 # from the queue) which means the job will be double-counted as both in
373 # the wishlist and but also keeping a node busy. This end result is
374 # excess nodes being booted.
375 #if state == 'idle' and self.arvados_node['job_uuid']:
380 def in_state(self, *states):
381 return self.get_state() in states
383 def shutdown_eligible(self):
384 """Determine if node is candidate for shut down.
386 Returns a tuple of (boolean, string) where the first value is whether
387 the node is candidate for shut down, and the second value is the
388 reason for the decision.
391 # Collect states and then consult state transition table whether we
392 # should shut down. Possible states are:
393 # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
394 # window = ["open", "closed"]
395 # boot_grace = ["boot wait", "boot exceeded"]
396 # idle_grace = ["not idle", "idle wait", "idle exceeded"]
398 if self.arvados_node and not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
399 return (False, "node state is stale")
401 crunch_worker_state = self.get_state()
403 window = "open" if self._shutdowns.window_open() else "closed"
405 if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
406 boot_grace = "boot wait"
408 boot_grace = "boot exceeded"
410 # API server side not implemented yet.
411 idle_grace = 'idle exceeded'
413 node_state = (crunch_worker_state, window, boot_grace, idle_grace)
414 t = transitions[node_state]
416 # yes, shutdown eligible
417 return (True, "node state is %s" % (node_state,))
419 # no, return a reason
420 return (False, "node state is %s" % (node_state,))
422 def consider_shutdown(self):
424 eligible, reason = self.shutdown_eligible()
425 next_opening = self._shutdowns.next_opening()
427 self._debug("Suggesting shutdown because %s", reason)
428 _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
430 self._debug("Not eligible for shut down because %s", reason)
432 if self.last_shutdown_opening != next_opening:
433 self._debug("Shutdown window closed. Next at %s.",
434 time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
435 self._timer.schedule(next_opening, self._later.consider_shutdown)
436 self.last_shutdown_opening = next_opening
438 self._logger.exception("Unexpected exception")
440 def offer_arvados_pair(self, arvados_node):
441 first_ping_s = arvados_node.get('first_ping_at')
442 if (self.arvados_node is not None) or (not first_ping_s):
444 elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
445 (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
446 self._later.update_arvados_node(arvados_node)
447 return self.cloud_node.id
451 def update_cloud_node(self, cloud_node):
452 if cloud_node is not None:
453 self.cloud_node = cloud_node
454 self._later.consider_shutdown()
456 def update_arvados_node(self, arvados_node):
457 # If the cloud node's FQDN doesn't match what's in the Arvados node
458 # record, make them match.
459 # This method is a little unusual in the way it just fires off the
460 # request without checking the result or retrying errors. That's
461 # because this update happens every time we reload the Arvados node
462 # list: if a previous sync attempt failed, we'll see that the names
463 # are out of sync and just try again. ComputeNodeUpdateActor has
464 # the logic to throttle those effective retries when there's trouble.
465 if arvados_node is not None:
466 self.arvados_node = arvados_node
467 if (self._cloud_node_fqdn(self.cloud_node) !=
468 arvados_node_fqdn(self.arvados_node)):
469 self._update.sync_node(self.cloud_node, self.arvados_node)
470 self._later.consider_shutdown()