3 from __future__ import absolute_import, print_function
9 import libcloud.common.types as cloud_types
13 arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, arvados_node_missing
14 from ...clientactor import _notify_subscribers
15 from ... import config
17 class ComputeNodeStateChangeBase(config.actor_class):
18 """Base class for actors that change a compute node's state.
20 This base class takes care of retrying changes and notifying
21 subscribers when the change is finished.
def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
             retry_wait, max_retry_wait):
    """Initialize the state shared by every state-change actor.

    Arguments:
    * logger_name: name given to logging.getLogger() for this actor.
    * cloud_client: client for the cloud provider.
    * arvados_client: Arvados API client.
    * timer_actor: actor used to schedule retries of failed requests.
    * retry_wait: first (and smallest) delay, in seconds, before a retry.
    * max_retry_wait: upper bound on the retry delay as it grows.
    """
    super(ComputeNodeStateChangeBase, self).__init__()
    # Proxy to this actor itself, used to schedule follow-up calls.
    self._later = self.actor_ref.proxy()
    self._logger = logging.getLogger(logger_name)
    self._cloud, self._arvados, self._timer = (
        cloud_client, arvados_client, timer_actor)
    # The current delay starts at the floor; max_retry_wait caps it.
    self.min_retry_wait = self.retry_wait = retry_wait
    self.max_retry_wait = max_retry_wait
    # Callables to notify once the state change is finished.
    self.subscribers = set()
37 def _retry(errors=()):
38 """Retry decorator for an actor method that makes remote requests.
40 Use this function to decorator an actor method, and pass in a
41 tuple of exceptions to catch. This decorator will schedule
42 retries of that method with exponential backoff if the
43 original method raises a known cloud driver error, or any of the
44 given exception types.
46 def decorator(orig_func):
47 @functools.wraps(orig_func)
48 def retry_wrapper(self, *args, **kwargs):
49 start_time = time.time()
51 orig_func(self, *args, **kwargs)
52 except Exception as error:
53 if not (isinstance(error, errors) or
54 self._cloud.is_cloud_exception(error)):
57 "Client error: %s - waiting %s seconds",
58 error, self.retry_wait)
59 self._timer.schedule(start_time + self.retry_wait,
63 self.retry_wait = min(self.retry_wait * 2,
66 self.retry_wait = self.min_retry_wait
71 _notify_subscribers(self._later, self.subscribers)
72 self.subscribers = None
74 def subscribe(self, subscriber):
75 if self.subscribers is None:
77 subscriber(self._later)
78 except pykka.ActorDeadError:
81 self.subscribers.add(subscriber)
83 def _clean_arvados_node(self, arvados_node, explanation):
84 return self._arvados.nodes().update(
85 uuid=arvados_node['uuid'],
86 body={'hostname': None,
89 'first_ping_at': None,
92 'info': {'ec2_instance_id': None,
93 'last_action': explanation}},
97 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
98 """Actor to create and set up a cloud compute node.
100 This actor prepares an Arvados node record for a new compute node
101 (either creating one or cleaning one passed in), then boots the
102 actual compute node. It notifies subscribers when the cloud node
103 is successfully created (the last step in the process for Node
106 def __init__(self, timer_actor, arvados_client, cloud_client,
107 cloud_size, arvados_node=None,
108 retry_wait=1, max_retry_wait=180):
109 super(ComputeNodeSetupActor, self).__init__(
110 'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
111 retry_wait, max_retry_wait)
112 self.cloud_size = cloud_size
113 self.arvados_node = None
114 self.cloud_node = None
115 if arvados_node is None:
116 self._later.create_arvados_node()
118 self._later.prepare_arvados_node(arvados_node)
@ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
def create_arvados_node(self):
    """Create a brand-new Arvados node record, then start the cloud node.

    Errors of the types in config.ARVADOS_ERRORS are handed to the
    _retry decorator, which schedules this call again.
    """
    node_api = self._arvados.nodes()
    self.arvados_node = node_api.create(body={}).execute()
    # Next setup step, dispatched through this actor's own proxy.
    self._later.create_cloud_node()
@ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
def prepare_arvados_node(self, node):
    """Reuse an existing Arvados node record, then start the cloud node.

    The record is reset with _clean_arvados_node(), which stores the
    given explanation as the record's last action.
    """
    explanation = "Prepared by Node Manager"
    cleaned_record = self._clean_arvados_node(node, explanation)
    self.arvados_node = cleaned_record
    # Next setup step, dispatched through this actor's own proxy.
    self._later.create_cloud_node()
131 @ComputeNodeStateChangeBase._retry()
132 def create_cloud_node(self):
133 self._logger.info("Creating cloud node with size %s.",
134 self.cloud_size.name)
135 self.cloud_node = self._cloud.create_node(self.cloud_size,
137 if not self.cloud_node.size:
138 self.cloud_node.size = self.cloud_size
139 self._logger.info("Cloud node %s created.", self.cloud_node.id)
140 self._later.update_arvados_node_properties()
142 @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
143 def update_arvados_node_properties(self):
144 """Tell Arvados some details about the cloud node.
146 Currently we only include size/price from our request, which
147 we already knew before create_cloud_node(), but doing it here
148 gives us an opportunity to provide more detail from
149 self.cloud_node, too.
151 self.arvados_node['properties']['cloud_node'] = {
152 # Note this 'size' is the node size we asked the cloud
153 # driver to create -- not necessarily equal to the size
154 # reported by the cloud driver for the node that was
156 'size': self.cloud_size.id,
157 'price': self.cloud_size.price,
159 self.arvados_node = self._arvados.nodes().update(
160 uuid=self.arvados_node['uuid'],
161 body={'properties': self.arvados_node['properties']},
163 self._logger.info("%s updated properties.", self.arvados_node['uuid'])
164 self._later.post_create()
166 @ComputeNodeStateChangeBase._retry()
167 def post_create(self):
168 self._cloud.post_create_node(self.cloud_node)
169 self._logger.info("%s post-create work done.", self.cloud_node.id)
172 def stop_if_no_cloud_node(self):
173 if self.cloud_node is not None:
179 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
180 """Actor to shut down a compute node.
182 This actor simply destroys a cloud node, retrying as needed.
184 # Reasons for a shutdown to be cancelled.
185 WINDOW_CLOSED = "shutdown window closed"
186 NODE_BROKEN = "cloud failed to shut down broken node"
188 def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
189 cancellable=True, retry_wait=1, max_retry_wait=180):
190 # If a ShutdownActor is cancellable, it will ask the
191 # ComputeNodeMonitorActor if it's still eligible before taking each
192 # action, and stop the shutdown process if the node is no longer
193 # eligible. Normal shutdowns based on job demand should be
194 # cancellable; shutdowns based on node misbehavior should not.
195 super(ComputeNodeShutdownActor, self).__init__(
196 'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
197 retry_wait, max_retry_wait)
198 self._monitor = node_monitor.proxy()
199 self.cloud_node = self._monitor.cloud_node.get()
200 self.cancellable = cancellable
201 self.cancel_reason = None
205 self._later.shutdown_node()
207 def _arvados_node(self):
208 return self._monitor.arvados_node.get()
210 def _finished(self, success_flag=None):
211 if success_flag is not None:
212 self.success = success_flag
213 return super(ComputeNodeShutdownActor, self)._finished()
def cancel_shutdown(self, reason):
    """Abandon this shutdown and report it to subscribers as unsuccessful.

    reason is a short human-readable explanation (for example, one of the
    class's WINDOW_CLOSED / NODE_BROKEN strings); it is kept in
    self.cancel_reason and logged.
    """
    self.cancel_reason = reason
    self._logger.info("Cloud node %s shutdown cancelled: %s.",
                      self.cloud_node.id, self.cancel_reason)
    self._finished(success_flag=False)
221 def _stop_if_window_closed(orig_func):
222 @functools.wraps(orig_func)
223 def stop_wrapper(self, *args, **kwargs):
224 if (self.cancellable and
225 (not self._monitor.shutdown_eligible().get())):
226 self._later.cancel_shutdown(self.WINDOW_CLOSED)
229 return orig_func(self, *args, **kwargs)
232 @_stop_if_window_closed
233 @ComputeNodeStateChangeBase._retry()
234 def shutdown_node(self):
235 if not self._cloud.destroy_node(self.cloud_node):
236 if self._cloud.broken(self.cloud_node):
237 self._later.cancel_shutdown(self.NODE_BROKEN)
240 raise cloud_types.LibcloudError("destroy_node failed")
241 self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
242 arv_node = self._arvados_node()
244 self._finished(success_flag=True)
246 self._later.clean_arvados_node(arv_node)
@ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
def clean_arvados_node(self, arvados_node):
    """Reset the Arvados record of a node that was shut down.

    Once the record has been cleaned, subscribers are told the shutdown
    succeeded. Errors of the types in config.ARVADOS_ERRORS are handed
    to the _retry decorator.
    """
    last_action = "Shut down by Node Manager"
    self._clean_arvados_node(arvados_node, last_action)
    self._finished(success_flag=True)
253 # Make the decorator available to subclasses.
254 _stop_if_window_closed = staticmethod(_stop_if_window_closed)
257 class ComputeNodeUpdateActor(config.actor_class):
258 """Actor to dispatch one-off cloud management requests.
260 This actor receives requests for small cloud updates, and
261 dispatches them to a real driver. ComputeNodeMonitorActors use
262 this to perform maintenance tasks on themselves. Having a
263 dedicated actor for this gives us the opportunity to control the
264 flow of requests; e.g., by backing off when errors occur.
266 This actor is most like a "traditional" Pykka actor: there's no
267 subscribing, but instead methods return real driver results. If
268 you're interested in those results, you should get them from the
269 Future that the proxy method returns. Be prepared to handle exceptions
270 from the cloud driver when you do.
def __init__(self, cloud_factory, max_retry_wait=180):
    """Build the cloud client and reset the error-throttling state.

    Arguments:
    * cloud_factory: zero-argument callable returning the cloud client.
    * max_retry_wait: stored for the error throttling in _throttle_errors;
      presumably the cap, in seconds, on the back-off (TODO confirm).
    """
    super(ComputeNodeUpdateActor, self).__init__()
    self._cloud, self.max_retry_wait = cloud_factory(), max_retry_wait
    # No errors seen yet, so the first request may go out right away.
    self.error_streak, self.next_request_time = 0, time.time()
279 def _throttle_errors(orig_func):
280 @functools.wraps(orig_func)
281 def throttle_wrapper(self, *args, **kwargs):
282 throttle_time = self.next_request_time - time.time()
283 if throttle_time > 0:
284 time.sleep(throttle_time)
285 self.next_request_time = time.time()
287 result = orig_func(self, *args, **kwargs)
288 except Exception as error:
289 self.error_streak += 1
290 self.next_request_time += min(2 ** self.error_streak,
294 self.error_streak = 0
296 return throttle_wrapper
def sync_node(self, cloud_node, arvados_node):
    """Ask the cloud client to sync a cloud node with its Arvados record.

    Returns whatever the cloud client's sync_node() returns.
    """
    driver = self._cloud
    return driver.sync_node(cloud_node, arvados_node)
303 class ComputeNodeMonitorActor(config.actor_class):
304 """Actor to manage a running compute node.
306 This actor gets updates about a compute node's cloud and Arvados records.
307 It uses this information to notify subscribers when the node is eligible
310 def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
311 cloud_fqdn_func, timer_actor, update_actor, cloud_client,
312 arvados_node=None, poll_stale_after=600, node_stale_after=3600,
315 super(ComputeNodeMonitorActor, self).__init__()
316 self._later = self.actor_ref.proxy()
317 self._logger = logging.getLogger('arvnodeman.computenode')
318 self._last_log = None
319 self._shutdowns = shutdown_timer
320 self._cloud_node_fqdn = cloud_fqdn_func
321 self._timer = timer_actor
322 self._update = update_actor
323 self._cloud = cloud_client
324 self.cloud_node = cloud_node
325 self.cloud_node_start_time = cloud_node_start_time
326 self.poll_stale_after = poll_stale_after
327 self.node_stale_after = node_stale_after
328 self.boot_fail_after = boot_fail_after
329 self.subscribers = set()
330 self.arvados_node = None
331 self._later.update_arvados_node(arvados_node)
332 self.last_shutdown_opening = None
333 self._later.consider_shutdown()
def subscribe(self, subscriber):
    """Register subscriber to be notified when this node suggests shutdown."""
    registry = self.subscribers
    registry.add(subscriber)
338 def _debug(self, msg, *args):
339 if msg == self._last_log:
342 self._logger.debug(msg, *args)
344 def in_state(self, *states):
345 # Return a boolean to say whether or not our Arvados node record is in
346 # one of the given states. If state information is not
347 # available--because this node has no Arvados record, the record is
348 # stale, or the record has no state information--return None.
349 if (self.arvados_node is None) or not timestamp_fresh(
350 arvados_node_mtime(self.arvados_node), self.node_stale_after):
352 state = self.arvados_node['crunch_worker_state']
355 result = state in states
357 result = result and not self.arvados_node['job_uuid']
360 def shutdown_eligible(self):
361 if not self._shutdowns.window_open():
363 if self.arvados_node is None:
365 # If it hasn't pinged Arvados after boot_fail seconds, shut it down
366 return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after)
367 missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
368 if missing and self._cloud.broken(self.cloud_node):
369 # Node is paired, but Arvados says it is missing and the cloud says the node
370 # is in an error state, so shut it down.
372 if missing is None and self._cloud.broken(self.cloud_node):
373 self._logger.warning(
374 "cloud reports broken node, but paired node %s never pinged "
375 "(bug?) -- skipped check for node_stale_after",
376 self.arvados_node['uuid'])
377 return self.in_state('idle')
379 def consider_shutdown(self):
380 next_opening = self._shutdowns.next_opening()
381 if self.shutdown_eligible():
382 self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
383 _notify_subscribers(self._later, self.subscribers)
384 elif self._shutdowns.window_open():
385 self._debug("Node %s shutdown window open but node busy.",
387 elif self.last_shutdown_opening != next_opening:
388 self._debug("Node %s shutdown window closed. Next at %s.",
389 self.cloud_node.id, time.ctime(next_opening))
390 self._timer.schedule(next_opening, self._later.consider_shutdown)
391 self.last_shutdown_opening = next_opening
393 def offer_arvados_pair(self, arvados_node):
394 first_ping_s = arvados_node.get('first_ping_at')
395 if (self.arvados_node is not None) or (not first_ping_s):
397 elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
398 (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
399 self._later.update_arvados_node(arvados_node)
400 return self.cloud_node.id
404 def update_cloud_node(self, cloud_node):
405 if cloud_node is not None:
406 self.cloud_node = cloud_node
407 self._later.consider_shutdown()
409 def update_arvados_node(self, arvados_node):
410 # If the cloud node's FQDN doesn't match what's in the Arvados node
411 # record, make them match.
412 # This method is a little unusual in the way it just fires off the
413 # request without checking the result or retrying errors. That's
414 # because this update happens every time we reload the Arvados node
415 # list: if a previous sync attempt failed, we'll see that the names
416 # are out of sync and just try again. ComputeNodeUpdateActor has
417 # the logic to throttle those effective retries when there's trouble.
418 if arvados_node is not None:
419 self.arvados_node = arvados_node
420 if (self._cloud_node_fqdn(self.cloud_node) !=
421 arvados_node_fqdn(self.arvados_node)):
422 self._update.sync_node(self.cloud_node, self.arvados_node)
423 self._later.consider_shutdown()