3 from __future__ import absolute_import, print_function
12 from ..clientactor import _notify_subscribers
def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
    """Build the fully-qualified domain name for an Arvados node record.

    The short name is the record's 'hostname' entry; when that entry is
    absent or falsy (None, ''), `default_hostname` is used instead.  The
    short name is joined to the record's 'domain' entry with a dot.
    A missing 'domain' key raises KeyError.
    """
    short_name = arvados_node.get('hostname')
    if not short_name:
        short_name = default_hostname
    return '{}.{}'.format(short_name, arvados_node['domain'])
def arvados_node_mtime(node):
    """Return a node record's 'modified_at' time as a POSIX timestamp.

    `node['modified_at']` is parsed as a UTC time string of the form
    'YYYY-MM-DDTHH:MM:SSZ', optionally with fractional seconds before the
    trailing 'Z' (e.g. '2014-11-17T20:45:13.25Z').

    Returns the number of seconds since the epoch as a float.
    Raises KeyError if 'modified_at' is missing, and ValueError if the
    string does not match the format above.
    """
    import calendar
    import re

    timestr = node['modified_at']
    # time.strptime's %S cannot parse sub-second digits, so split them off
    # and add them back after conversion.
    subsec_match = re.search(r'(\.\d+)Z$', timestr)
    if subsec_match is None:
        subsecs = 0.0
    else:
        subsecs = float(subsec_match.group(1))
        timestr = timestr[:subsec_match.start()] + 'Z'
    # calendar.timegm interprets the struct_time as UTC directly.  The old
    # mktime(...) - time.timezone approach went through the host's local
    # time rules and was off whenever that zone's UTC offset on the given
    # date differed from its current standard offset.
    parsed = time.strptime(timestr, '%Y-%m-%dT%H:%M:%SZ')
    return calendar.timegm(parsed) + subsecs
def timestamp_fresh(timestamp, fresh_time):
    """Report whether `timestamp` is strictly less than `fresh_time` seconds old.

    Age is measured against the current time.time() value; a timestamp in
    the future has a negative age and therefore counts as fresh.
    """
    elapsed = time.time() - timestamp
    return elapsed < fresh_time
27 """Retry decorator for an actor method that makes remote requests.
29 Use this function to decorator an actor method, and pass in a tuple of
30 exceptions to catch. This decorator will schedule retries of that method
31 with exponential backoff if the original method raises any of the given
34 def decorator(orig_func):
35 @functools.wraps(orig_func)
36 def wrapper(self, *args, **kwargs):
38 orig_func(self, *args, **kwargs)
39 except errors as error:
41 "Client error: %s - waiting %s seconds",
42 error, self.retry_wait)
43 self._timer.schedule(self.retry_wait,
44 getattr(self._later, orig_func.__name__),
46 self.retry_wait = min(self.retry_wait * 2,
49 self.retry_wait = self.min_retry_wait
53 class BaseComputeNodeDriver(object):
54 """Abstract base class for compute node drivers.
56 libcloud abstracts away many of the differences between cloud providers,
57 but managing compute nodes requires some cloud-specific features (e.g.,
58 on EC2 we use tags to identify compute nodes). Compute node drivers
59 are responsible for translating the node manager's cloud requests to a
60 specific cloud's vocabulary.
62 Subclasses must implement arvados_create_kwargs (to update node
63 creation kwargs with information about the specific Arvados node
64 record), sync_node, and node_start_time.
66 def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
67 self.real = driver_class(**auth_kwargs)
68 self.list_kwargs = list_kwargs
69 self.create_kwargs = create_kwargs
71 def __getattr__(self, name):
72 # Proxy non-extension methods to the real driver.
73 if (not name.startswith('_') and not name.startswith('ex_')
74 and hasattr(self.real, name)):
75 return getattr(self.real, name)
77 return super(BaseComputeNodeDriver, self).__getattr__(name)
79 def search_for(self, term, list_method, key=lambda item: item.id):
80 cache_key = (list_method, term)
81 if cache_key not in self.SEARCH_CACHE:
82 results = [item for item in getattr(self.real, list_method)()
86 raise ValueError("{} returned {} results for '{}'".format(
87 list_method, count, term))
88 self.SEARCH_CACHE[cache_key] = results[0]
89 return self.SEARCH_CACHE[cache_key]
92 return self.real.list_nodes(**self.list_kwargs)
94 def arvados_create_kwargs(self, arvados_node):
95 raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
97 def create_node(self, size, arvados_node):
98 kwargs = self.create_kwargs.copy()
99 kwargs.update(self.arvados_create_kwargs(arvados_node))
100 kwargs['size'] = size
101 return self.real.create_node(**kwargs)
103 def sync_node(self, cloud_node, arvados_node):
104 # When a compute node first pings the API server, the API server
105 # will automatically assign some attributes on the corresponding
106 # node record, like hostname. This method should propagate that
107 # information back to the cloud node appropriately.
108 raise NotImplementedError("BaseComputeNodeDriver.sync_node")
111 def node_start_time(cls, node):
112 raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
# Module-level name bound to this module's compute node driver class; here
# it is simply the abstract BaseComputeNodeDriver.
# NOTE(review): presumably looked up by name from code outside this view
# (e.g. to pick a cloud-specific driver) — confirm against callers.
ComputeNodeDriverClass = BaseComputeNodeDriver
117 class ComputeNodeSetupActor(config.actor_class):
118 """Actor to create and set up a cloud compute node.
120 This actor prepares an Arvados node record for a new compute node
121 (either creating one or cleaning one passed in), then boots the
122 actual compute node. It notifies subscribers when the cloud node
123 is successfully created (the last step in the process for Node
126 def __init__(self, timer_actor, arvados_client, cloud_client,
127 cloud_size, arvados_node=None,
128 retry_wait=1, max_retry_wait=180):
129 super(ComputeNodeSetupActor, self).__init__()
130 self._timer = timer_actor
131 self._arvados = arvados_client
132 self._cloud = cloud_client
133 self._later = self.actor_ref.proxy()
134 self._logger = logging.getLogger('arvnodeman.nodeup')
135 self.cloud_size = cloud_size
136 self.subscribers = set()
137 self.min_retry_wait = retry_wait
138 self.max_retry_wait = max_retry_wait
139 self.retry_wait = retry_wait
140 self.arvados_node = None
141 self.cloud_node = None
142 if arvados_node is None:
143 self._later.create_arvados_node()
145 self._later.prepare_arvados_node(arvados_node)
147 @_retry(config.ARVADOS_ERRORS)
148 def create_arvados_node(self):
149 self.arvados_node = self._arvados.nodes().create(body={}).execute()
150 self._later.create_cloud_node()
152 @_retry(config.ARVADOS_ERRORS)
153 def prepare_arvados_node(self, node):
154 self.arvados_node = self._arvados.nodes().update(
156 body={'hostname': None,
159 'first_ping_at': None,
160 'last_ping_at': None,
161 'info': {'ec2_instance_id': None,
162 'last_action': "Prepared by Node Manager"}}
164 self._later.create_cloud_node()
166 @_retry(config.CLOUD_ERRORS)
167 def create_cloud_node(self):
168 self._logger.info("Creating cloud node with size %s.",
169 self.cloud_size.name)
170 self.cloud_node = self._cloud.create_node(self.cloud_size,
172 self._logger.info("Cloud node %s created.", self.cloud_node.id)
173 _notify_subscribers(self._later, self.subscribers)
174 self.subscribers = None
176 def stop_if_no_cloud_node(self):
177 if self.cloud_node is None:
180 def subscribe(self, subscriber):
181 if self.subscribers is None:
183 subscriber(self._later)
184 except pykka.ActorDeadError:
187 self.subscribers.add(subscriber)
190 class ComputeNodeShutdownActor(config.actor_class):
191 """Actor to shut down a compute node.
193 This actor simply destroys a cloud node, retrying as needed.
195 def __init__(self, timer_actor, cloud_client, cloud_node,
196 retry_wait=1, max_retry_wait=180):
197 super(ComputeNodeShutdownActor, self).__init__()
198 self._timer = timer_actor
199 self._cloud = cloud_client
200 self._later = self.actor_ref.proxy()
201 self._logger = logging.getLogger('arvnodeman.nodedown')
202 self.cloud_node = cloud_node
203 self.min_retry_wait = retry_wait
204 self.max_retry_wait = max_retry_wait
205 self.retry_wait = retry_wait
206 self._later.shutdown_node()
208 @_retry(config.CLOUD_ERRORS)
209 def shutdown_node(self):
210 self._cloud.destroy_node(self.cloud_node)
211 self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
214 class ComputeNodeUpdateActor(config.actor_class):
215 """Actor to dispatch one-off cloud management requests.
217 This actor receives requests for small cloud updates, and
218 dispatches them to a real driver. ComputeNodeMonitorActors use
219 this to perform maintenance tasks on themselves. Having a
220 dedicated actor for this gives us the opportunity to control the
221 flow of requests; e.g., by backing off when errors occur.
223 This actor is most like a "traditional" Pykka actor: there's no
224 subscribing, but instead methods return real driver results. If
225 you're interested in those results, you should get them from the
226 Future that the proxy method returns. Be prepared to handle exceptions
227 from the cloud driver when you do.
229 def __init__(self, cloud_factory, max_retry_wait=180):
230 super(ComputeNodeUpdateActor, self).__init__()
231 self._cloud = cloud_factory()
232 self.max_retry_wait = max_retry_wait
233 self.error_streak = 0
234 self.next_request_time = time.time()
236 def _throttle_errors(orig_func):
237 @functools.wraps(orig_func)
238 def wrapper(self, *args, **kwargs):
239 throttle_time = self.next_request_time - time.time()
240 if throttle_time > 0:
241 time.sleep(throttle_time)
242 self.next_request_time = time.time()
244 result = orig_func(self, *args, **kwargs)
245 except config.CLOUD_ERRORS:
246 self.error_streak += 1
247 self.next_request_time += min(2 ** self.error_streak,
251 self.error_streak = 0
256 def sync_node(self, cloud_node, arvados_node):
257 return self._cloud.sync_node(cloud_node, arvados_node)
260 class ShutdownTimer(object):
261 """Keep track of a cloud node's shutdown windows.
263 Instantiate this class with a timestamp of when a cloud node started,
264 and a list of durations (in minutes) of when the node must not and may
265 be shut down, alternating. The class will tell you when a shutdown
266 window is open, and when the next open window will start.
268 def __init__(self, start_time, shutdown_windows):
269 # The implementation is easiest if we have an even number of windows,
270 # because then windows always alternate between open and closed.
271 # Rig that up: calculate the first shutdown window based on what's
272 # passed in. Then, if we were given an odd number of windows, merge
273 # that first window into the last one, since they both# represent
275 first_window = shutdown_windows[0]
276 shutdown_windows = list(shutdown_windows[1:])
277 self._next_opening = start_time + (60 * first_window)
278 if len(shutdown_windows) % 2:
279 shutdown_windows.append(first_window)
281 shutdown_windows[-1] += first_window
282 self.shutdown_windows = itertools.cycle([60 * n
283 for n in shutdown_windows])
284 self._open_start = self._next_opening
285 self._open_for = next(self.shutdown_windows)
287 def _advance_opening(self):
288 while self._next_opening < time.time():
289 self._open_start = self._next_opening
290 self._next_opening += self._open_for + next(self.shutdown_windows)
291 self._open_for = next(self.shutdown_windows)
293 def next_opening(self):
294 self._advance_opening()
295 return self._next_opening
297 def window_open(self):
298 self._advance_opening()
299 return 0 < (time.time() - self._open_start) < self._open_for
302 class ComputeNodeMonitorActor(config.actor_class):
303 """Actor to manage a running compute node.
305 This actor gets updates about a compute node's cloud and Arvados records.
306 It uses this information to notify subscribers when the node is eligible
309 def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
310 timer_actor, update_actor, arvados_node=None,
311 poll_stale_after=600, node_stale_after=3600):
312 super(ComputeNodeMonitorActor, self).__init__()
313 self._later = self.actor_ref.proxy()
314 self._logger = logging.getLogger('arvnodeman.computenode')
315 self._last_log = None
316 self._shutdowns = shutdown_timer
317 self._timer = timer_actor
318 self._update = update_actor
319 self.cloud_node = cloud_node
320 self.cloud_node_start_time = cloud_node_start_time
321 self.poll_stale_after = poll_stale_after
322 self.node_stale_after = node_stale_after
323 self.subscribers = set()
324 self.arvados_node = None
325 self._later.update_arvados_node(arvados_node)
326 self.last_shutdown_opening = None
327 self._later.consider_shutdown()
329 def subscribe(self, subscriber):
330 self.subscribers.add(subscriber)
332 def _debug(self, msg, *args):
333 if msg == self._last_log:
336 self._logger.debug(msg, *args)
338 def _shutdown_eligible(self):
339 if self.arvados_node is None:
340 return timestamp_fresh(self.cloud_node_start_time,
341 self.node_stale_after)
343 return (timestamp_fresh(arvados_node_mtime(self.arvados_node),
344 self.poll_stale_after) and
345 (self.arvados_node['info'].get('slurm_state') == 'idle'))
347 def consider_shutdown(self):
348 next_opening = self._shutdowns.next_opening()
349 if self._shutdowns.window_open():
350 if self._shutdown_eligible():
351 self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
352 _notify_subscribers(self._later, self.subscribers)
354 self._debug("Node %s shutdown window open but node busy.",
357 self._debug("Node %s shutdown window closed. Next at %s.",
358 self.cloud_node.id, time.ctime(next_opening))
359 if self.last_shutdown_opening != next_opening:
360 self._timer.schedule(next_opening, self._later.consider_shutdown)
361 self.last_shutdown_opening = next_opening
363 def offer_arvados_pair(self, arvados_node):
364 if self.arvados_node is not None:
366 elif arvados_node['ip_address'] in self.cloud_node.private_ips:
367 self._later.update_arvados_node(arvados_node)
368 return self.cloud_node.id
372 def update_cloud_node(self, cloud_node):
373 if cloud_node is not None:
374 self.cloud_node = cloud_node
375 self._later.consider_shutdown()
377 def update_arvados_node(self, arvados_node):
378 if arvados_node is not None:
379 self.arvados_node = arvados_node
380 new_hostname = arvados_node_fqdn(self.arvados_node)
381 if new_hostname != self.cloud_node.name:
382 self._update.sync_node(self.cloud_node, self.arvados_node)
383 self._later.consider_shutdown()