from __future__ import absolute_import, print_function

import functools
import logging
import time

import libcloud.common.types as cloud_types
import pykka

from .. import \
    arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, arvados_node_missing
from ...clientactor import _notify_subscribers
from ... import config
class ComputeNodeStateChangeBase(config.actor_class):
    """Base class for actors that change a compute node's state.

    This base class takes care of retrying changes and notifying
    subscribers when the change is finished.
    """
    def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
                 retry_wait, max_retry_wait):
        super(ComputeNodeStateChangeBase, self).__init__()
        # Proxy to ourselves: calls through self._later go through the
        # actor mailbox instead of running synchronously.
        self._later = self.actor_ref.proxy()
        self._logger = logging.getLogger(logger_name)
        self._cloud = cloud_client
        self._arvados = arvados_client
        self._timer = timer_actor
        # retry_wait grows (see _retry) up to max_retry_wait and is reset
        # back to min_retry_wait after a success.
        self.min_retry_wait = retry_wait
        self.max_retry_wait = max_retry_wait
        self.retry_wait = retry_wait
        # Callables notified when the state change finishes; becomes None
        # once the change is done (see subscribe).
        self.subscribers = set()
37 def _retry(errors=()):
38 """Retry decorator for an actor method that makes remote requests.
40 Use this function to decorator an actor method, and pass in a
41 tuple of exceptions to catch. This decorator will schedule
42 retries of that method with exponential backoff if the
43 original method raises a known cloud driver error, or any of the
44 given exception types.
46 def decorator(orig_func):
47 @functools.wraps(orig_func)
48 def retry_wrapper(self, *args, **kwargs):
49 start_time = time.time()
51 orig_func(self, *args, **kwargs)
52 except Exception as error:
53 if not (isinstance(error, errors) or
54 self._cloud.is_cloud_exception(error)):
57 "Client error: %s - waiting %s seconds",
58 error, self.retry_wait)
59 self._timer.schedule(start_time + self.retry_wait,
63 self.retry_wait = min(self.retry_wait * 2,
66 self.retry_wait = self.min_retry_wait
71 _notify_subscribers(self._later, self.subscribers)
72 self.subscribers = None
74 def subscribe(self, subscriber):
75 if self.subscribers is None:
77 subscriber(self._later)
78 except pykka.ActorDeadError:
81 self.subscribers.add(subscriber)
83 def _clean_arvados_node(self, arvados_node, explanation):
84 return self._arvados.nodes().update(
85 uuid=arvados_node['uuid'],
86 body={'hostname': None,
89 'first_ping_at': None,
92 'info': {'ec2_instance_id': None,
93 'last_action': explanation}},
class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
    """Actor to create and set up a cloud compute node.

    This actor prepares an Arvados node record for a new compute node
    (either creating one or cleaning one passed in), then boots the
    actual compute node.  It notifies subscribers when the cloud node
    is successfully created (the last step in the process for Node
    Manager's purposes).
    """
    def __init__(self, timer_actor, arvados_client, cloud_client,
                 cloud_size, arvados_node=None,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeSetupActor, self).__init__(
            'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
            retry_wait, max_retry_wait)
        self.cloud_size = cloud_size
        self.arvados_node = None
        self.cloud_node = None
        # Kick off the first setup step asynchronously: reuse the given
        # Arvados record when one was passed in, else create a fresh one.
        if arvados_node is None:
            self._later.create_arvados_node()
        else:
            self._later.prepare_arvados_node(arvados_node)
120 @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
121 def create_arvados_node(self):
122 self.arvados_node = self._arvados.nodes().create(body={}).execute()
123 self._later.create_cloud_node()
125 @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
126 def prepare_arvados_node(self, node):
127 self.arvados_node = self._clean_arvados_node(
128 node, "Prepared by Node Manager")
129 self._later.create_cloud_node()
131 @ComputeNodeStateChangeBase._retry()
132 def create_cloud_node(self):
133 self._logger.info("Creating cloud node with size %s.",
134 self.cloud_size.name)
135 self.cloud_node = self._cloud.create_node(self.cloud_size,
137 self._logger.info("Cloud node %s created.", self.cloud_node.id)
138 self._later.update_arvados_node_properties()
140 @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
141 def update_arvados_node_properties(self):
142 """Tell Arvados some details about the cloud node.
144 Currently we only include size/price from our request, which
145 we already knew before create_cloud_node(), but doing it here
146 gives us an opportunity to provide more detail from
147 self.cloud_node, too.
149 self.arvados_node['properties']['cloud_node'] = {
150 # Note this 'size' is the node size we asked the cloud
151 # driver to create -- not necessarily equal to the size
152 # reported by the cloud driver for the node that was
154 'size': self.cloud_size.id,
155 'price': self.cloud_size.price,
157 self.arvados_node = self._arvados.nodes().update(
158 uuid=self.arvados_node['uuid'],
159 body={'properties': self.arvados_node['properties']},
161 self._logger.info("%s updated properties.", self.arvados_node['uuid'])
162 self._later.post_create()
164 @ComputeNodeStateChangeBase._retry()
165 def post_create(self):
166 self._cloud.post_create_node(self.cloud_node)
167 self._logger.info("%s post-create work done.", self.cloud_node.id)
170 def stop_if_no_cloud_node(self):
171 if self.cloud_node is not None:
class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
    """Actor to shut down a compute node.

    This actor simply destroys a cloud node, retrying as needed.
    """
    # Reasons for a shutdown to be cancelled.
    WINDOW_CLOSED = "shutdown window closed"
    NODE_BROKEN = "cloud failed to shut down broken node"
186 def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
187 cancellable=True, retry_wait=1, max_retry_wait=180):
188 # If a ShutdownActor is cancellable, it will ask the
189 # ComputeNodeMonitorActor if it's still eligible before taking each
190 # action, and stop the shutdown process if the node is no longer
191 # eligible. Normal shutdowns based on job demand should be
192 # cancellable; shutdowns based on node misbehavior should not.
193 super(ComputeNodeShutdownActor, self).__init__(
194 'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
195 retry_wait, max_retry_wait)
196 self._monitor = node_monitor.proxy()
197 self.cloud_node = self._monitor.cloud_node.get()
198 self.cancellable = cancellable
199 self.cancel_reason = None
203 self._later.shutdown_node()
205 def _arvados_node(self):
206 return self._monitor.arvados_node.get()
    def _finished(self, success_flag=None):
        """Finish the shutdown, optionally recording whether it succeeded.

        A success_flag of None leaves self.success unchanged, so a
        previously recorded outcome is preserved.
        """
        if success_flag is not None:
            self.success = success_flag
        return super(ComputeNodeShutdownActor, self)._finished()
213 def cancel_shutdown(self, reason):
214 self.cancel_reason = reason
215 self._logger.info("Cloud node %s shutdown cancelled: %s.",
216 self.cloud_node.id, reason)
217 self._finished(success_flag=False)
219 def _stop_if_window_closed(orig_func):
220 @functools.wraps(orig_func)
221 def stop_wrapper(self, *args, **kwargs):
222 if (self.cancellable and
223 (not self._monitor.shutdown_eligible().get())):
224 self._later.cancel_shutdown(self.WINDOW_CLOSED)
227 return orig_func(self, *args, **kwargs)
230 @_stop_if_window_closed
231 @ComputeNodeStateChangeBase._retry()
232 def shutdown_node(self):
233 if not self._cloud.destroy_node(self.cloud_node):
234 if self._cloud.broken(self.cloud_node):
235 self._later.cancel_shutdown(self.NODE_BROKEN)
238 raise cloud_types.LibcloudError("destroy_node failed")
239 self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
240 arv_node = self._arvados_node()
242 self._finished(success_flag=True)
244 self._later.clean_arvados_node(arv_node)
    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
    def clean_arvados_node(self, arvados_node):
        """Scrub the paired Arvados node record, then report success."""
        self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
        self._finished(success_flag=True)
    # Make the decorator available to subclasses.
    # (Wrapped as a staticmethod only now, after the plain function has
    # already been applied to the methods above.)
    _stop_if_window_closed = staticmethod(_stop_if_window_closed)
class ComputeNodeUpdateActor(config.actor_class):
    """Actor to dispatch one-off cloud management requests.

    This actor receives requests for small cloud updates, and
    dispatches them to a real driver.  ComputeNodeMonitorActors use
    this to perform maintenance tasks on themselves.  Having a
    dedicated actor for this gives us the opportunity to control the
    flow of requests; e.g., by backing off when errors occur.

    This actor is most like a "traditional" Pykka actor: there's no
    subscribing, but instead methods return real driver results.  If
    you're interested in those results, you should get them from the
    Future that the proxy method returns.  Be prepared to handle exceptions
    from the cloud driver when you do.
    """
270 def __init__(self, cloud_factory, max_retry_wait=180):
271 super(ComputeNodeUpdateActor, self).__init__()
272 self._cloud = cloud_factory()
273 self.max_retry_wait = max_retry_wait
274 self.error_streak = 0
275 self.next_request_time = time.time()
277 def _throttle_errors(orig_func):
278 @functools.wraps(orig_func)
279 def throttle_wrapper(self, *args, **kwargs):
280 throttle_time = self.next_request_time - time.time()
281 if throttle_time > 0:
282 time.sleep(throttle_time)
283 self.next_request_time = time.time()
285 result = orig_func(self, *args, **kwargs)
286 except Exception as error:
287 self.error_streak += 1
288 self.next_request_time += min(2 ** self.error_streak,
292 self.error_streak = 0
294 return throttle_wrapper
297 def sync_node(self, cloud_node, arvados_node):
298 return self._cloud.sync_node(cloud_node, arvados_node)
class ComputeNodeMonitorActor(config.actor_class):
    """Actor to manage a running compute node.

    This actor gets updates about a compute node's cloud and Arvados records.
    It uses this information to notify subscribers when the node is eligible
    for shutdown.
    """
308 def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
309 cloud_fqdn_func, timer_actor, update_actor, cloud_client,
310 arvados_node=None, poll_stale_after=600, node_stale_after=3600,
313 super(ComputeNodeMonitorActor, self).__init__()
314 self._later = self.actor_ref.proxy()
315 self._logger = logging.getLogger('arvnodeman.computenode')
316 self._last_log = None
317 self._shutdowns = shutdown_timer
318 self._cloud_node_fqdn = cloud_fqdn_func
319 self._timer = timer_actor
320 self._update = update_actor
321 self._cloud = cloud_client
322 self.cloud_node = cloud_node
323 self.cloud_node_start_time = cloud_node_start_time
324 self.poll_stale_after = poll_stale_after
325 self.node_stale_after = node_stale_after
326 self.boot_fail_after = boot_fail_after
327 self.subscribers = set()
328 self.arvados_node = None
329 self._later.update_arvados_node(arvados_node)
330 self.last_shutdown_opening = None
331 self._later.consider_shutdown()
    def subscribe(self, subscriber):
        """Register subscriber to be notified when this node may shut down."""
        self.subscribers.add(subscriber)
336 def _debug(self, msg, *args):
337 if msg == self._last_log:
340 self._logger.debug(msg, *args)
342 def in_state(self, *states):
343 # Return a boolean to say whether or not our Arvados node record is in
344 # one of the given states. If state information is not
345 # available--because this node has no Arvados record, the record is
346 # stale, or the record has no state information--return None.
347 if (self.arvados_node is None) or not timestamp_fresh(
348 arvados_node_mtime(self.arvados_node), self.node_stale_after):
350 state = self.arvados_node['crunch_worker_state']
353 result = state in states
355 result = result and not self.arvados_node['job_uuid']
358 def shutdown_eligible(self):
359 if not self._shutdowns.window_open():
361 if self.arvados_node is None:
363 # If it hasn't pinged Arvados after boot_fail seconds, shut it down
364 return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after)
365 missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
366 if missing and self._cloud.broken(self.cloud_node):
367 # Node is paired, but Arvados says it is missing and the cloud says the node
368 # is in an error state, so shut it down.
370 if missing is None and self._cloud.broken(self.cloud_node):
371 self._logger.warning(
372 "cloud reports broken node, but paired node %s never pinged "
373 "(bug?) -- skipped check for node_stale_after",
374 self.arvados_node['uuid'])
375 return self.in_state('idle')
377 def consider_shutdown(self):
378 next_opening = self._shutdowns.next_opening()
379 if self.shutdown_eligible():
380 self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
381 _notify_subscribers(self._later, self.subscribers)
382 elif self._shutdowns.window_open():
383 self._debug("Node %s shutdown window open but node busy.",
385 elif self.last_shutdown_opening != next_opening:
386 self._debug("Node %s shutdown window closed. Next at %s.",
387 self.cloud_node.id, time.ctime(next_opening))
388 self._timer.schedule(next_opening, self._later.consider_shutdown)
389 self.last_shutdown_opening = next_opening
391 def offer_arvados_pair(self, arvados_node):
392 first_ping_s = arvados_node.get('first_ping_at')
393 if (self.arvados_node is not None) or (not first_ping_s):
395 elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
396 (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
397 self._later.update_arvados_node(arvados_node)
398 return self.cloud_node.id
402 def update_cloud_node(self, cloud_node):
403 if cloud_node is not None:
404 self.cloud_node = cloud_node
405 self._later.consider_shutdown()
407 def update_arvados_node(self, arvados_node):
408 # If the cloud node's FQDN doesn't match what's in the Arvados node
409 # record, make them match.
410 # This method is a little unusual in the way it just fires off the
411 # request without checking the result or retrying errors. That's
412 # because this update happens every time we reload the Arvados node
413 # list: if a previous sync attempt failed, we'll see that the names
414 # are out of sync and just try again. ComputeNodeUpdateActor has
415 # the logic to throttle those effective retries when there's trouble.
416 if arvados_node is not None:
417 self.arvados_node = arvados_node
418 if (self._cloud_node_fqdn(self.cloud_node) !=
419 arvados_node_fqdn(self.arvados_node)):
420 self._update.sync_node(self.cloud_node, self.arvados_node)
421 self._later.consider_shutdown()