3 from __future__ import absolute_import, print_function
9 import libcloud.common.types as cloud_types
12 from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
13 from ...clientactor import _notify_subscribers
14 from ... import config
class ComputeNodeStateChangeBase(config.actor_class):
    """Base class for actors that change a compute node's state.

    This base class takes care of retrying changes and notifying
    subscribers when the change is finished.
    """
    # NOTE(review): the reviewed copy of this class was missing several
    # lines (the _retry/_finished method headers, the try:/else: scaffolding,
    # the logging call and the retry callback arguments).  They are restored
    # below from the surrounding code; verify against version control.

    def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
        """Set up retry bookkeeping and an empty subscriber set.

        Arguments:
        * logger_name: name passed to logging.getLogger().
        * timer_actor: object whose schedule() method queues retries.
        * retry_wait: initial retry delay in seconds; the delay is reset
          to this value after a call succeeds.
        * max_retry_wait: upper bound, in seconds, for the retry delay.
        """
        super(ComputeNodeStateChangeBase, self).__init__()
        self._later = self.actor_ref.proxy()
        self._timer = timer_actor
        self._logger = logging.getLogger(logger_name)
        self.min_retry_wait = retry_wait
        self.max_retry_wait = max_retry_wait
        self.retry_wait = retry_wait
        self.subscribers = set()

    @staticmethod
    def _retry(errors):
        """Retry decorator for an actor method that makes remote requests.

        Use this function to decorate an actor method, and pass in a
        tuple of exceptions to catch.  This decorator will schedule
        retries of that method with exponential backoff if the
        original method raises any of the given errors.

        The wrapped method's return value is discarded.
        """
        def decorator(orig_func):
            @functools.wraps(orig_func)
            def wrapper(self, *args, **kwargs):
                try:
                    orig_func(self, *args, **kwargs)
                except errors as error:
                    self._logger.warning(
                        "Client error: %s - waiting %s seconds",
                        error, self.retry_wait)
                    # schedule() is handed an absolute delivery time by
                    # the other caller in this module (the shutdown window
                    # opening), so convert the relative wait into a
                    # timestamp.  Passing the bare wait would retry
                    # immediately and defeat the backoff.
                    # NOTE(review): confirm against the timer actor.
                    self._timer.schedule(time.time() + self.retry_wait,
                                         getattr(self._later,
                                                 orig_func.__name__),
                                         *args, **kwargs)
                    # Double the wait for the next failure, up to the cap.
                    self.retry_wait = min(self.retry_wait * 2,
                                          self.max_retry_wait)
                else:
                    # Success: the next failure starts from the minimum.
                    self.retry_wait = self.min_retry_wait
            return wrapper
        return decorator

    def _finished(self):
        """Notify all subscribers, then mark this change as complete.

        Setting subscribers to None is the signal subscribe() uses to
        recognize that notifications have already been sent.
        """
        _notify_subscribers(self._later, self.subscribers)
        self.subscribers = None

    def subscribe(self, subscriber):
        """Register a callback to be invoked when the change is finished.

        If the change has already finished, the subscriber is called
        right away with this actor's proxy.
        """
        if self.subscribers is None:
            try:
                subscriber(self._later)
            except pykka.ActorDeadError:
                # The subscriber's actor is gone; nothing left to notify.
                pass
        else:
            self.subscribers.add(subscriber)
class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
    """Actor to create and set up a cloud compute node.

    This actor prepares an Arvados node record for a new compute node
    (either creating one or cleaning one passed in), then boots the
    actual compute node.  It notifies subscribers when the cloud node
    is successfully created (the last step in the process for Node
    Manager to handle).
    """
    # NOTE(review): the reviewed copy of this class was missing lines (the
    # docstring terminator, an else: branch, parts of the update() and
    # create_node() calls, and the bodies after two statements).  The
    # restored pieces are marked below; verify against version control.

    def __init__(self, timer_actor, arvados_client, cloud_client,
                 cloud_size, arvados_node=None,
                 retry_wait=1, max_retry_wait=180):
        """Store the clients and queue the first setup step.

        If arvados_node is None a new Arvados node record is created;
        otherwise the given record is reset for reuse.
        """
        super(ComputeNodeSetupActor, self).__init__(
            'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
        self._arvados = arvados_client
        self._cloud = cloud_client
        self.cloud_size = cloud_size
        self.arvados_node = None
        self.cloud_node = None
        if arvados_node is None:
            self._later.create_arvados_node()
        else:
            self._later.prepare_arvados_node(arvados_node)

    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
    def create_arvados_node(self):
        """Create an empty Arvados node record, then queue the cloud boot."""
        self.arvados_node = self._arvados.nodes().create(body={}).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
    def prepare_arvados_node(self, node):
        """Clear an existing Arvados node record, then queue the cloud boot."""
        # NOTE(review): the uuid argument, the 'ip_address' and
        # 'slot_number' keys, and the trailing .execute() were absent from
        # the reviewed copy and are restored here; confirm.
        self.arvados_node = self._arvados.nodes().update(
            uuid=node['uuid'],
            body={'hostname': None,
                  'ip_address': None,
                  'slot_number': None,
                  'first_ping_at': None,
                  'last_ping_at': None,
                  'info': {'ec2_instance_id': None,
                           'last_action': "Prepared by Node Manager"}}
            ).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
    def create_cloud_node(self):
        """Ask the cloud driver for a node, then notify subscribers."""
        self._logger.info("Creating cloud node with size %s.",
                          self.cloud_size.name)
        # NOTE(review): second create_node() argument restored; confirm.
        self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                  self.arvados_node)
        self._logger.info("Cloud node %s created.", self.cloud_node.id)
        # NOTE(review): notification call restored to match the class
        # docstring ("notifies subscribers when the cloud node is
        # successfully created"); confirm.
        self._finished()

    def stop_if_no_cloud_node(self):
        """Stop this actor if it has not created a cloud node yet."""
        if self.cloud_node is None:
            # NOTE(review): body restored from the method name; confirm.
            self.stop()
class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
    """Actor to shut down a compute node.

    This actor simply destroys a cloud node, retrying as needed.
    """
    # NOTE(review): the reviewed copy of this class was missing lines (the
    # method header before the first shutdown_node() call, the body of
    # cancel_shutdown(), and the else:/return scaffolding).  The restored
    # pieces are marked below; verify against version control.

    def __init__(self, timer_actor, cloud_client, node_monitor,
                 retry_wait=1, max_retry_wait=180):
        """Record the cloud client and the node to destroy.

        node_monitor is an actor reference; its proxy is queried
        (blocking) for the cloud node this actor will shut down.
        """
        super(ComputeNodeShutdownActor, self).__init__(
            'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
        self._cloud = cloud_client
        self._monitor = node_monitor.proxy()
        self.cloud_node = self._monitor.cloud_node.get()
        # NOTE(review): outcome flag restored; None means undecided.
        self.success = None

    def on_start(self):
        """Queue the first shutdown attempt once the actor is running."""
        # NOTE(review): method header restored; confirm the hook name.
        self._later.shutdown_node()

    def cancel_shutdown(self):
        """Give up on the shutdown and notify subscribers."""
        # NOTE(review): body restored; confirm.
        self.success = False
        self._finished()

    def _stop_if_window_closed(orig_func):
        """Decorator: skip the call if the node is no longer eligible.

        Asks the node monitor (blocking) whether the node may still be
        shut down.  If not, queues cancel_shutdown() and returns None
        without calling the wrapped method.
        """
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if not self._monitor.shutdown_eligible().get():
                self._logger.info(
                    "Cloud node %s shutdown cancelled - no longer eligible.",
                    self.cloud_node.id)
                self._later.cancel_shutdown()
                return None
            else:
                return orig_func(self, *args, **kwargs)
        return wrapper

    @_stop_if_window_closed
    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
    def shutdown_node(self):
        """Destroy the cloud node, retrying if the driver reports failure."""
        if self._cloud.destroy_node(self.cloud_node):
            self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
            # NOTE(review): success bookkeeping restored; confirm.
            self.success = True
            self._finished()
        else:
            # Force a retry: raising lets the _retry decorator reschedule
            # this method (assuming LibcloudError is in CLOUD_ERRORS).
            raise cloud_types.LibcloudError("destroy_node failed")

    # Make the decorator available to subclasses.
    _stop_if_window_closed = staticmethod(_stop_if_window_closed)
class ComputeNodeUpdateActor(config.actor_class):
    """Actor to dispatch one-off cloud management requests.

    This actor receives requests for small cloud updates, and
    dispatches them to a real driver.  ComputeNodeMonitorActors use
    this to perform maintenance tasks on themselves.  Having a
    dedicated actor for this gives us the opportunity to control the
    flow of requests; e.g., by backing off when errors occur.

    This actor is most like a "traditional" Pykka actor: there's no
    subscribing, but instead methods return real driver results.  If
    you're interested in those results, you should get them from the
    Future that the proxy method returns.  Be prepared to handle exceptions
    from the cloud driver when you do.
    """
    # NOTE(review): the reviewed copy of this class was missing lines (the
    # docstring terminator, the try:, the re-raise, the else:/return
    # scaffolding and the decorator on sync_node).  They are restored
    # below; verify against version control.

    def __init__(self, cloud_factory, max_retry_wait=180):
        """Build the cloud driver and reset the error throttle.

        Arguments:
        * cloud_factory: callable, invoked with no arguments, that
          returns the cloud driver to use.
        * max_retry_wait: largest delay, in seconds, added to the next
          request time after an error.
        """
        super(ComputeNodeUpdateActor, self).__init__()
        self._cloud = cloud_factory()
        self.max_retry_wait = max_retry_wait
        self.error_streak = 0
        self.next_request_time = time.time()

    def _throttle_errors(orig_func):
        """Decorator: delay requests after consecutive cloud errors.

        Before calling the wrapped method, sleeps until
        next_request_time.  Each consecutive CLOUD_ERRORS failure pushes
        the next request time out by 2 ** error_streak seconds, capped
        at max_retry_wait, and the exception is re-raised to the caller.
        A success resets the streak.
        """
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            throttle_time = self.next_request_time - time.time()
            if throttle_time > 0:
                time.sleep(throttle_time)
            self.next_request_time = time.time()
            try:
                result = orig_func(self, *args, **kwargs)
            except config.CLOUD_ERRORS:
                self.error_streak += 1
                self.next_request_time += min(2 ** self.error_streak,
                                              self.max_retry_wait)
                raise
            else:
                self.error_streak = 0
                return result
        return wrapper

    @_throttle_errors
    def sync_node(self, cloud_node, arvados_node):
        """Pass a sync request to the cloud driver and return its result."""
        return self._cloud.sync_node(cloud_node, arvados_node)
class ComputeNodeMonitorActor(config.actor_class):
    """Actor to manage a running compute node.

    This actor gets updates about a compute node's cloud and Arvados records.
    It uses this information to notify subscribers when the node is eligible
    for shutdown.
    """
    # NOTE(review): the reviewed copy of this class was missing lines (the
    # docstring terminator, several return statements and else: branches,
    # and one call argument).  They are restored below; verify against
    # version control.

    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
                 timer_actor, update_actor, arvados_node=None,
                 poll_stale_after=600, node_stale_after=3600):
        """Record the node's state and queue the first shutdown check.

        Arguments:
        * cloud_node: the cloud driver's record for this node.
        * cloud_node_start_time: timestamp of when the cloud node started.
        * shutdown_timer: object providing window_open() and
          next_opening() for this node's shutdown windows.
        * timer_actor: object whose schedule() method queues callbacks.
        * update_actor: object whose sync_node() method pushes changes
          to the cloud.
        * arvados_node: the paired Arvados node record, if known.
        * poll_stale_after: stored on the actor; not used in this class.
        * node_stale_after: age limit passed to timestamp_fresh() when
          judging the Arvados record and the cloud node start time.
        """
        super(ComputeNodeMonitorActor, self).__init__()
        self._later = self.actor_ref.proxy()
        self._logger = logging.getLogger('arvnodeman.computenode')
        self._last_log = None
        self._shutdowns = shutdown_timer
        self._timer = timer_actor
        self._update = update_actor
        self.cloud_node = cloud_node
        self.cloud_node_start_time = cloud_node_start_time
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.subscribers = set()
        self.arvados_node = None
        self._later.update_arvados_node(arvados_node)
        self.last_shutdown_opening = None
        self._later.consider_shutdown()

    def subscribe(self, subscriber):
        """Register a callback for shutdown suggestions."""
        self.subscribers.add(subscriber)

    def _debug(self, msg, *args):
        """Log a debug message unless it repeats the previous one.

        Messages are compared by format string only, not by arguments.
        """
        if msg == self._last_log:
            return
        self._last_log = msg
        self._logger.debug(msg, *args)

    def in_state(self, *states):
        """Report whether the Arvados node record is in one of `states`.

        Returns a boolean.  If state information is not
        available--because this node has no Arvados record, the record is
        stale, or the record has no state information--returns None.
        An 'idle' node that has a job_uuid set is never reported as
        being in a requested state.
        """
        if (self.arvados_node is None) or not timestamp_fresh(
              arvados_node_mtime(self.arvados_node), self.node_stale_after):
            return None
        state = self.arvados_node['info'].get('slurm_state')
        if not state:
            return None
        result = state in states
        if state == 'idle':
            result = result and not self.arvados_node['job_uuid']
        return result

    def shutdown_eligible(self):
        """Report whether this node may be shut down right now."""
        if not self._shutdowns.window_open():
            return False
        elif self.arvados_node is None:
            # If this is a new, unpaired node, it's eligible for
            # shutdown--we figure there was an error during bootstrap.
            # NOTE(review): as written, an unpaired node is eligible only
            # while its start time is still "fresh"; confirm this is the
            # intended direction of the test.
            return timestamp_fresh(self.cloud_node_start_time,
                                   self.node_stale_after)
        else:
            return self.in_state('idle')

    def consider_shutdown(self):
        """Suggest a shutdown to subscribers, or schedule the next check."""
        next_opening = self._shutdowns.next_opening()
        if self.shutdown_eligible():
            self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
            _notify_subscribers(self._later, self.subscribers)
        elif self._shutdowns.window_open():
            self._debug("Node %s shutdown window open but node busy.",
                        self.cloud_node.id)
        elif self.last_shutdown_opening != next_opening:
            self._debug("Node %s shutdown window closed.  Next at %s.",
                        self.cloud_node.id, time.ctime(next_opening))
            self._timer.schedule(next_opening, self._later.consider_shutdown)
            # Remember the opening so the same check isn't queued twice.
            self.last_shutdown_opening = next_opening

    def offer_arvados_pair(self, arvados_node):
        """Pair with the given Arvados node if its IP address matches.

        Returns this cloud node's id when the pairing is accepted, and
        None when this node is already paired or the address differs.
        """
        if self.arvados_node is not None:
            return None
        elif arvados_node['ip_address'] in self.cloud_node.private_ips:
            self._later.update_arvados_node(arvados_node)
            return self.cloud_node.id
        else:
            return None

    def update_cloud_node(self, cloud_node):
        """Store a newer cloud record and re-evaluate shutdown."""
        if cloud_node is not None:
            self.cloud_node = cloud_node
            self._later.consider_shutdown()

    def update_arvados_node(self, arvados_node):
        """Store a newer Arvados record and re-evaluate shutdown.

        If the record's FQDN differs from the cloud node's name, ask the
        update actor to sync the cloud node with the Arvados record.
        """
        if arvados_node is not None:
            self.arvados_node = arvados_node
            new_hostname = arvados_node_fqdn(self.arvados_node)
            if new_hostname != self.cloud_node.name:
                self._update.sync_node(self.cloud_node, self.arvados_node)
            self._later.consider_shutdown()