from __future__ import absolute_import, print_function

import functools
import logging
import time

import libcloud.common.types as cloud_types
import pykka

from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
from ...clientactor import _notify_subscribers
from ... import config
class ComputeNodeStateChangeBase(config.actor_class):
    """Base class for actors that change a compute node's state.

    This base class takes care of retrying changes and notifying
    subscribers when the change is finished.
    """
    def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
        super(ComputeNodeStateChangeBase, self).__init__()
        # Proxy to ourselves, so work can be scheduled back through the
        # actor's own mailbox instead of running on the caller's thread.
        self._later = self.actor_ref.proxy()
        self._timer = timer_actor
        self._logger = logging.getLogger(logger_name)
        self.min_retry_wait = retry_wait
        self.max_retry_wait = max_retry_wait
        self.retry_wait = retry_wait
        # Callables notified when the state change finishes; set to None
        # once _finished() has run (see subscribe()).
        self.subscribers = set()

    @staticmethod
    def _retry(errors):
        """Retry decorator for an actor method that makes remote requests.

        Use this function to decorate an actor method, and pass in a
        tuple of exceptions to catch.  This decorator will schedule
        retries of that method with exponential backoff if the
        original method raises any of the given errors.
        """
        def decorator(orig_func):
            @functools.wraps(orig_func)
            def wrapper(self, *args, **kwargs):
                start_time = time.time()
                try:
                    orig_func(self, *args, **kwargs)
                except errors as error:
                    self._logger.warning(
                        "Client error: %s - waiting %s seconds",
                        error, self.retry_wait)
                    # Reschedule the same method on our proxy so the retry
                    # runs inside the actor, then back off exponentially.
                    self._timer.schedule(start_time + self.retry_wait,
                                         getattr(self._later,
                                                 orig_func.__name__),
                                         *args, **kwargs)
                    self.retry_wait = min(self.retry_wait * 2,
                                          self.max_retry_wait)
                else:
                    # Success resets the backoff to its minimum.
                    self.retry_wait = self.min_retry_wait
            return wrapper
        return decorator

    def _finished(self):
        # Notify everyone the change is done, and refuse further
        # subscriptions by replacing the set with None.
        _notify_subscribers(self._later, self.subscribers)
        self.subscribers = None

    def subscribe(self, subscriber):
        if self.subscribers is None:
            # The state change already finished: notify immediately,
            # tolerating subscribers whose actor has already stopped.
            try:
                subscriber(self._later)
            except pykka.ActorDeadError:
                pass
        else:
            self.subscribers.add(subscriber)
class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
    """Actor to create and set up a cloud compute node.

    This actor prepares an Arvados node record for a new compute node
    (either creating one or cleaning one passed in), then boots the
    actual compute node.  It notifies subscribers when the cloud node
    is successfully created (the last step in the process for Node
    Manager).
    """
    def __init__(self, timer_actor, arvados_client, cloud_client,
                 cloud_size, arvados_node=None,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeSetupActor, self).__init__(
            'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
        self._arvados = arvados_client
        self._cloud = cloud_client
        self.cloud_size = cloud_size
        self.arvados_node = None
        self.cloud_node = None
        if arvados_node is None:
            self._later.create_arvados_node()
        else:
            self._later.prepare_arvados_node(arvados_node)

    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
    def create_arvados_node(self):
        """Create a fresh Arvados node record, then boot the cloud node."""
        self.arvados_node = self._arvados.nodes().create(body={}).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
    def prepare_arvados_node(self, node):
        """Scrub a preexisting Arvados node record for reuse.

        Clears identifying fields so the record can be paired with the
        new cloud node, then boots the cloud node.
        """
        # NOTE(review): the update body was partially reconstructed from a
        # truncated source -- verify the cleared fields against upstream.
        self.arvados_node = self._arvados.nodes().update(
            uuid=node['uuid'],
            body={'hostname': None,
                  'ip_address': None,
                  'slot_number': None,
                  'first_ping_at': None,
                  'last_ping_at': None,
                  'info': {'ec2_instance_id': None,
                           'last_action': "Prepared by Node Manager"}}
            ).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
    def create_cloud_node(self):
        """Boot the actual cloud node; notify subscribers on success."""
        self._logger.info("Creating cloud node with size %s.",
                          self.cloud_size.name)
        self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                  self.arvados_node)
        self._logger.info("Cloud node %s created.", self.cloud_node.id)
        self._finished()

    def stop_if_no_cloud_node(self):
        # Used to abort setup cleanly when the cloud node was never created.
        if self.cloud_node is None:
            self.stop()
class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
    """Actor to shut down a compute node.

    This actor simply destroys a cloud node, retrying as needed.
    """
    def __init__(self, timer_actor, cloud_client, node_monitor,
                 cancellable=True, retry_wait=1, max_retry_wait=180):
        # If a ShutdownActor is cancellable, it will ask the
        # ComputeNodeMonitorActor if it's still eligible before taking each
        # action, and stop the shutdown process if the node is no longer
        # eligible.  Normal shutdowns based on job demand should be
        # cancellable; shutdowns based on node misbehavior should not.
        super(ComputeNodeShutdownActor, self).__init__(
            'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
        self._cloud = cloud_client
        self._monitor = node_monitor.proxy()
        self.cloud_node = self._monitor.cloud_node.get()
        self.cancellable = cancellable
        # True/False once the shutdown has succeeded/been cancelled.
        self.success = None

    def on_start(self):
        # Kick off the shutdown once the actor loop is running.
        self._later.shutdown_node()

    def cancel_shutdown(self):
        self.success = False
        self._finished()

    def _stop_if_window_closed(orig_func):
        # Decorator: skip the wrapped action (and cancel the shutdown) if
        # this shutdown is cancellable and the node is no longer eligible.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if (self.cancellable and
                  (not self._monitor.shutdown_eligible().get())):
                self._logger.info(
                    "Cloud node %s shutdown cancelled - no longer eligible.",
                    self.cloud_node.id)
                self._later.cancel_shutdown()
            else:
                return orig_func(self, *args, **kwargs)
        return wrapper

    @_stop_if_window_closed
    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
    def shutdown_node(self):
        if self._cloud.destroy_node(self.cloud_node):
            self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
            self.success = True
            self._finished()
        else:
            # Force a retry through the _retry decorator.
            raise cloud_types.LibcloudError("destroy_node failed")

    # Make the decorator available to subclasses.
    _stop_if_window_closed = staticmethod(_stop_if_window_closed)
class ComputeNodeUpdateActor(config.actor_class):
    """Actor to dispatch one-off cloud management requests.

    This actor receives requests for small cloud updates, and
    dispatches them to a real driver.  ComputeNodeMonitorActors use
    this to perform maintenance tasks on themselves.  Having a
    dedicated actor for this gives us the opportunity to control the
    flow of requests; e.g., by backing off when errors occur.

    This actor is most like a "traditional" Pykka actor: there's no
    subscribing, but instead methods return real driver results.  If
    you're interested in those results, you should get them from the
    Future that the proxy method returns.  Be prepared to handle exceptions
    from the cloud driver when you do.
    """
    def __init__(self, cloud_factory, max_retry_wait=180):
        super(ComputeNodeUpdateActor, self).__init__()
        self._cloud = cloud_factory()
        self.max_retry_wait = max_retry_wait
        # Count of consecutive cloud errors; drives the backoff below.
        self.error_streak = 0
        # Earliest time the next cloud request may be dispatched.
        self.next_request_time = time.time()

    def _throttle_errors(orig_func):
        # Decorator: delay the wrapped request until next_request_time,
        # and push that time out exponentially on repeated cloud errors.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            throttle_time = self.next_request_time - time.time()
            if throttle_time > 0:
                time.sleep(throttle_time)
            self.next_request_time = time.time()
            try:
                result = orig_func(self, *args, **kwargs)
            except config.CLOUD_ERRORS:
                self.error_streak += 1
                self.next_request_time += min(2 ** self.error_streak,
                                              self.max_retry_wait)
                raise
            else:
                self.error_streak = 0
                return result
        return wrapper

    @_throttle_errors
    def sync_node(self, cloud_node, arvados_node):
        """Push the Arvados node's metadata to the cloud node via the driver."""
        return self._cloud.sync_node(cloud_node, arvados_node)
class ComputeNodeMonitorActor(config.actor_class):
    """Actor to manage a running compute node.

    This actor gets updates about a compute node's cloud and Arvados records.
    It uses this information to notify subscribers when the node is eligible
    for shutdown.
    """
    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
                 timer_actor, update_actor, arvados_node=None,
                 poll_stale_after=600, node_stale_after=3600):
        super(ComputeNodeMonitorActor, self).__init__()
        self._later = self.actor_ref.proxy()
        self._logger = logging.getLogger('arvnodeman.computenode')
        # Last debug message emitted, used to suppress duplicates.
        self._last_log = None
        self._shutdowns = shutdown_timer
        self._timer = timer_actor
        self._update = update_actor
        self.cloud_node = cloud_node
        self.cloud_node_start_time = cloud_node_start_time
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.subscribers = set()
        self.arvados_node = None
        self._later.update_arvados_node(arvados_node)
        self.last_shutdown_opening = None
        self._later.consider_shutdown()

    def subscribe(self, subscriber):
        self.subscribers.add(subscriber)

    def _debug(self, msg, *args):
        # Suppress consecutive duplicates of the same debug message.
        if msg == self._last_log:
            return
        self._last_log = msg
        self._logger.debug(msg, *args)

    def in_state(self, *states):
        # Return a boolean to say whether or not our Arvados node record is in
        # one of the given states.  If state information is not
        # available--because this node has no Arvados record, the record is
        # stale, or the record has no state information--return None.
        if (self.arvados_node is None) or not timestamp_fresh(
              arvados_node_mtime(self.arvados_node), self.node_stale_after):
            return None
        state = self.arvados_node['info'].get('slurm_state')
        if not state:
            return None
        result = state in states
        if state == 'idle':
            # An idle node that still has a job assigned doesn't count.
            result = result and not self.arvados_node['job_uuid']
        return result

    def shutdown_eligible(self):
        if not self._shutdowns.window_open():
            return False
        elif self.arvados_node is None:
            # If this is a new, unpaired node, it's eligible for
            # shutdown--we figure there was an error during bootstrap.
            return timestamp_fresh(self.cloud_node_start_time,
                                   self.node_stale_after)
        else:
            return self.in_state('idle')

    def consider_shutdown(self):
        next_opening = self._shutdowns.next_opening()
        if self.shutdown_eligible():
            self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
            _notify_subscribers(self._later, self.subscribers)
        elif self._shutdowns.window_open():
            self._debug("Node %s shutdown window open but node busy.",
                        self.cloud_node.id)
        elif self.last_shutdown_opening != next_opening:
            # Window closed: wake ourselves up when the next one opens.
            self._debug("Node %s shutdown window closed. Next at %s.",
                        self.cloud_node.id, time.ctime(next_opening))
            self._timer.schedule(next_opening, self._later.consider_shutdown)
            self.last_shutdown_opening = next_opening

    def offer_arvados_pair(self, arvados_node):
        """Pair with this Arvados record if its IP matches; return our cloud ID
        on success, else None."""
        if self.arvados_node is not None:
            return None
        elif arvados_node['ip_address'] in self.cloud_node.private_ips:
            self._later.update_arvados_node(arvados_node)
            return self.cloud_node.id
        else:
            return None

    def update_cloud_node(self, cloud_node):
        if cloud_node is not None:
            self.cloud_node = cloud_node
            self._later.consider_shutdown()

    def update_arvados_node(self, arvados_node):
        if arvados_node is not None:
            self.arvados_node = arvados_node
            new_hostname = arvados_node_fqdn(self.arvados_node)
            # Keep the cloud node's name in sync with the Arvados FQDN.
            if new_hostname != self.cloud_node.name:
                self._update.sync_node(self.cloud_node, self.arvados_node)
            self._later.consider_shutdown()