3 from __future__ import absolute_import, print_function
12 from ..clientactor import _notify_subscribers
def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
    """Build the fully-qualified domain name for an Arvados node record.

    The record's 'hostname' value is used when it is truthy (a missing key,
    None, or '' all fall back to *default_hostname*).  The record's 'domain'
    value is appended after a dot.  A missing 'domain' key raises KeyError.
    """
    host = arvados_node.get('hostname')
    if not host:
        host = default_hostname
    return '{}.{}'.format(host, arvados_node['domain'])
def arvados_node_mtime(node):
    """Return a node record's 'modified_at' time as seconds since the epoch.

    node['modified_at'] must be a UTC timestamp shaped exactly like
    '2014-11-20T12:34:56Z' (no fractional seconds); anything else makes
    time.strptime raise ValueError.  The result is a float.

    calendar.timegm() treats the parsed struct_time as UTC directly.  The
    previous ``time.mktime(...) - time.timezone`` form interpreted it as
    local time using the local zone's offset *for that date*, then undid it
    with *today's* standard offset, which is wrong whenever the zone's
    standard offset has changed since the timestamp.
    """
    import calendar  # local import keeps this helper self-contained

    parsed = time.strptime(node['modified_at'], '%Y-%m-%dT%H:%M:%SZ')
    # float() keeps the historical return type (mktime returned a float).
    return float(calendar.timegm(parsed))
def timestamp_fresh(timestamp, fresh_time):
    """Tell whether *timestamp* is less than *fresh_time* seconds old.

    Both values are in seconds; the age is measured against time.time().
    A timestamp exactly *fresh_time* seconds old counts as stale.
    """
    age = time.time() - timestamp
    return age < fresh_time
# NOTE(review): this excerpt of the class is missing interior lines (the
# embedded original line numbers skip, e.g. 38 closing docstring, 49, 56-58,
# 64 list_nodes def, 73, 83), so it does not parse as-is; restore those
# lines from the original source before editing further.
26 class BaseComputeNodeDriver(object):
27 """Abstract base class for compute node drivers.
29 libcloud abstracts away many of the differences between cloud providers,
30 but managing compute nodes requires some cloud-specific features (e.g.,
31 on EC2 we use tags to identify compute nodes). Compute node drivers
32 are responsible for translating the node manager's cloud requests to a
33 specific cloud's vocabulary.
35 Subclasses must implement arvados_create_kwargs (to update node
36 creation kwargs with information about the specific Arvados node
37 record), sync_node, and node_start_time.
# Build the wrapped driver as driver_class(**auth_kwargs). Keep list_kwargs
# for list_nodes() and create_kwargs as the base kwargs for create_node().
39 def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
40 self.real = driver_class(**auth_kwargs)
41 self.list_kwargs = list_kwargs
42 self.create_kwargs = create_kwargs
# Only called for attributes not found by normal lookup.
44 def __getattr__(self, name):
45 # Proxy non-extension methods to the real driver.
46 if (not name.startswith('_') and not name.startswith('ex_')
47 and hasattr(self.real, name)):
48 return getattr(self.real, name)
# NOTE(review): object defines no __getattr__, so the line below raises an
# AttributeError about 'super' rather than about `name`. Callers such as
# hasattr() still see an AttributeError, but the message is misleading.
50 return super(BaseComputeNodeDriver, self).__getattr__(name)
# Look up one item from the real driver's `list_method` whose `key` relates
# to `term`; the filter clause and count check are on missing lines 56-58.
# The chosen item (results[0]) is cached under (list_method, term). A
# ValueError reporting the result count is raised when the check fails.
# NOTE(review): SEARCH_CACHE is not defined in this excerpt; presumably a
# class attribute set by subclasses, in which case instances share it.
52 def search_for(self, term, list_method, key=lambda item: item.id):
53 cache_key = (list_method, term)
54 if cache_key not in self.SEARCH_CACHE:
55 results = [item for item in getattr(self.real, list_method)()
59 raise ValueError("{} returned {} results for '{}'".format(
60 list_method, count, term))
61 self.SEARCH_CACHE[cache_key] = results[0]
62 return self.SEARCH_CACHE[cache_key]
# Body of list_nodes() (its def line, original 64, is missing here): list
# the real driver's nodes, passing list_kwargs.
65 return self.real.list_nodes(**self.list_kwargs)
# Abstract: return extra create_node kwargs for this Arvados node record.
67 def arvados_create_kwargs(self, arvados_node):
68 raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
# Create a cloud node. create_kwargs is copied so the stored dict is not
# mutated by the subclass-specific update.
# NOTE(review): `size` is not used on any visible line; original line 73
# is missing and presumably applies it to kwargs.
70 def create_node(self, size, arvados_node):
71 kwargs = self.create_kwargs.copy()
72 kwargs.update(self.arvados_create_kwargs(arvados_node))
74 return self.real.create_node(**kwargs)
76 def sync_node(self, cloud_node, arvados_node):
77 # When a compute node first pings the API server, the API server
78 # will automatically assign some attributes on the corresponding
79 # node record, like hostname. This method should propagate that
80 # information back to the cloud node appropriately.
81 raise NotImplementedError("BaseComputeNodeDriver.sync_node")
# Abstract: return the cloud node's start time. Takes `cls`, so presumably
# a @classmethod whose decorator (original line 83) is missing here.
84 def node_start_time(cls, node):
85 raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
# Module-level alias for the driver base class. Nothing in this excerpt
# reads it; presumably other node-manager code refers to it — verify.
88 ComputeNodeDriverClass = BaseComputeNodeDriver
# NOTE(review): this excerpt of the class is missing interior lines (e.g. 95
# closing docstring, 106-107 the retry decorator's def, 118 `try:`, 125-127,
# 129-135, 141, 144-145), so it does not parse as-is; restore them first.
90 class ComputeNodeStateChangeBase(config.actor_class):
91 """Base class for actors that change a compute node's state.
93 This base class takes care of retrying changes and notifying
94 subscribers when the change is finished.
# _later is a proxy to this actor, used to queue calls to its own methods.
# retry_wait starts at (and is reset to) min_retry_wait after success;
# max_retry_wait is the configured upper bound for the backoff.
96 def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
97 super(ComputeNodeStateChangeBase, self).__init__()
98 self._later = self.actor_ref.proxy()
99 self._timer = timer_actor
100 self._logger = logging.getLogger(logger_name)
101 self.min_retry_wait = retry_wait
102 self.max_retry_wait = max_retry_wait
103 self.retry_wait = retry_wait
104 self.subscribers = set()
# The following docstring belongs to the retry decorator that subclasses
# apply as ComputeNodeStateChangeBase._retry(errors); its def line is
# missing from this excerpt.
108 """Retry decorator for an actor method that makes remote requests.
110 Use this function to decorate an actor method, and pass in a
111 tuple of exceptions to catch. This decorator will schedule
112 retries of that method with exponential backoff if the
113 original method raises any of the given errors.
115 def decorator(orig_func):
116 @functools.wraps(orig_func)
117 def wrapper(self, *args, **kwargs):
119 orig_func(self, *args, **kwargs)
120 except errors as error:
121 self._logger.warning(
122 "Client error: %s - waiting %s seconds",
123 error, self.retry_wait)
# Reschedule after retry_wait seconds via the timer actor; the scheduled
# callable and arguments are on missing lines 125-127.
124 self._timer.schedule(self.retry_wait,
# Double the wait for the next failure; the other min() argument is on a
# missing line (presumably max_retry_wait).
128 self.retry_wait = min(self.retry_wait * 2,
# Presumably the success branch (its `else:` is missing): reset the backoff.
131 self.retry_wait = self.min_retry_wait
# Notify every subscriber with this actor's proxy, then set subscribers to
# None to mark the change finished (the enclosing def line is missing).
136 _notify_subscribers(self._later, self.subscribers)
137 self.subscribers = None
# If the change already finished (subscribers is None), call the subscriber
# immediately with this actor's proxy, catching pykka.ActorDeadError (the
# handler body is on a missing line). Otherwise remember it for later.
139 def subscribe(self, subscriber):
140 if self.subscribers is None:
142 subscriber(self._later)
143 except pykka.ActorDeadError:
146 self.subscribers.add(subscriber)
# NOTE(review): this excerpt of the class is missing interior lines (e.g.
# 156-157 docstring end, 170 `else:`, 181, 183-184, 189, 197, 199, 203), so
# it does not parse as-is; restore them from the original source first.
149 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
150 """Actor to create and set up a cloud compute node.
152 This actor prepares an Arvados node record for a new compute node
153 (either creating one or cleaning one passed in), then boots the
154 actual compute node. It notifies subscribers when the cloud node
155 is successfully created (the last step in the process for Node
# Setup runs asynchronously through the actor's own proxy. Without an
# arvados_node, create a fresh record; otherwise (the `else:` is missing)
# prepare the given one.
158 def __init__(self, timer_actor, arvados_client, cloud_client,
159 cloud_size, arvados_node=None,
160 retry_wait=1, max_retry_wait=180):
161 super(ComputeNodeSetupActor, self).__init__(
162 'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
163 self._arvados = arvados_client
164 self._cloud = cloud_client
165 self.cloud_size = cloud_size
166 self.arvados_node = None
167 self.cloud_node = None
168 if arvados_node is None:
169 self._later.create_arvados_node()
171 self._later.prepare_arvados_node(arvados_node)
# Create an empty Arvados node record, then queue cloud node creation.
# Retried with backoff on config.ARVADOS_ERRORS.
173 @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
174 def create_arvados_node(self):
175 self.arvados_node = self._arvados.nodes().create(body={}).execute()
176 self._later.create_cloud_node()
# Clear hostname, ping times and info fields on an existing record, then
# queue cloud node creation. Some update() arguments (original 181,
# 183-184, 189) are missing from this excerpt.
178 @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
179 def prepare_arvados_node(self, node):
180 self.arvados_node = self._arvados.nodes().update(
182 body={'hostname': None,
185 'first_ping_at': None,
186 'last_ping_at': None,
187 'info': {'ec2_instance_id': None,
188 'last_action': "Prepared by Node Manager"}}
190 self._later.create_cloud_node()
# Boot the cloud node with cloud_size; retried on config.CLOUD_ERRORS. The
# second create_node argument (original 197) and the step after logging
# (original 199, presumably notifying subscribers) are missing.
192 @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
193 def create_cloud_node(self):
194 self._logger.info("Creating cloud node with size %s.",
195 self.cloud_size.name)
196 self.cloud_node = self._cloud.create_node(self.cloud_size,
198 self._logger.info("Cloud node %s created.", self.cloud_node.id)
# Act when no cloud node was created; the action itself (original 203,
# presumably stopping this actor) is missing from this excerpt.
201 def stop_if_no_cloud_node(self):
202 if self.cloud_node is None:
# NOTE(review): this excerpt of the class is missing lines 208, 210 (closing
# docstring), 218 and 223, so it does not parse as-is.
206 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
207 """Actor to shut down a compute node.
209 This actor simply destroys a cloud node, retrying as needed.
# Shutdown starts immediately, queued through the actor's own proxy.
211 def __init__(self, timer_actor, cloud_client, cloud_node,
212 retry_wait=1, max_retry_wait=180):
213 super(ComputeNodeShutdownActor, self).__init__(
214 'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
215 self._cloud = cloud_client
216 self.cloud_node = cloud_node
217 self._later.shutdown_node()
# Destroy the cloud node; retried with backoff on config.CLOUD_ERRORS. The
# follow-up step (original 223, presumably notifying subscribers) is missing.
219 @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
220 def shutdown_node(self):
221 self._cloud.destroy_node(self.cloud_node)
222 self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
# NOTE(review): this excerpt of the class is missing interior lines (e.g. 240
# closing docstring, 255 `try:`, 260-262, 264-267), so it does not parse
# as-is; restore them from the original source first.
226 class ComputeNodeUpdateActor(config.actor_class):
227 """Actor to dispatch one-off cloud management requests.
229 This actor receives requests for small cloud updates, and
230 dispatches them to a real driver. ComputeNodeMonitorActors use
231 this to perform maintenance tasks on themselves. Having a
232 dedicated actor for this gives us the opportunity to control the
233 flow of requests; e.g., by backing off when errors occur.
235 This actor is most like a "traditional" Pykka actor: there's no
236 subscribing, but instead methods return real driver results. If
237 you're interested in those results, you should get them from the
238 Future that the proxy method returns. Be prepared to handle exceptions
239 from the cloud driver when you do.
# cloud_factory() builds the driver used for all requests. next_request_time
# is the earliest time the next request may run; error_streak counts
# consecutive config.CLOUD_ERRORS failures.
241 def __init__(self, cloud_factory, max_retry_wait=180):
242 super(ComputeNodeUpdateActor, self).__init__()
243 self._cloud = cloud_factory()
244 self.max_retry_wait = max_retry_wait
245 self.error_streak = 0
246 self.next_request_time = time.time()
# Decorator defined in the class body (takes the function, not self).
# Before each call it sleeps until next_request_time, which blocks this
# actor and so delays requests queued behind it. On config.CLOUD_ERRORS it
# bumps error_streak and pushes next_request_time forward by
# min(2 ** error_streak, ...) seconds. The cap and the rest of the except
# clause are on missing lines 260-262: presumably max_retry_wait and a
# re-raise, since the docstring says callers see driver exceptions.
248 def _throttle_errors(orig_func):
249 @functools.wraps(orig_func)
250 def wrapper(self, *args, **kwargs):
251 throttle_time = self.next_request_time - time.time()
252 if throttle_time > 0:
253 time.sleep(throttle_time)
254 self.next_request_time = time.time()
256 result = orig_func(self, *args, **kwargs)
257 except config.CLOUD_ERRORS:
258 self.error_streak += 1
259 self.next_request_time += min(2 ** self.error_streak,
# Presumably the success branch (its `else:` is missing): reset the streak.
263 self.error_streak = 0
# Forward to the cloud driver's sync_node and return its result. Presumably
# wrapped by _throttle_errors on a missing decorator line (original 267).
268 def sync_node(self, cloud_node, arvados_node):
269 return self._cloud.sync_node(cloud_node, arvados_node)
# NOTE(review): this excerpt of the class is missing lines 274, 279 (closing
# docstring), 286, 292 (`else:`), 298, 304 and 308, so it does not parse
# as-is; restore them from the original source first.
272 class ShutdownTimer(object):
273 """Keep track of a cloud node's shutdown windows.
275 Instantiate this class with a timestamp of when a cloud node started,
276 and a list of durations (in minutes) of when the node must not and may
277 be shut down, alternating. The class will tell you when a shutdown
278 window is open, and when the next open window will start.
# NOTE(review): an empty shutdown_windows raises IndexError on
# shutdown_windows[0]. A single-element list leaves no remaining windows, so
# (assuming line 293 is the missing `else:` branch) shutdown_windows[-1]
# raises IndexError too. Callers must pass at least two windows.
280 def __init__(self, start_time, shutdown_windows):
281 # The implementation is easiest if we have an even number of windows,
282 # because then windows always alternate between open and closed.
283 # Rig that up: calculate the first shutdown window based on what's
284 # passed in. Then, if we were given an odd number of windows, merge
285 # that first window into the last one, since they both represent
287 first_window = shutdown_windows[0]
288 shutdown_windows = list(shutdown_windows[1:])
289 self._next_opening = start_time + (60 * first_window)
290 if len(shutdown_windows) % 2:
291 shutdown_windows.append(first_window)
293 shutdown_windows[-1] += first_window
# Durations are converted from minutes to seconds and cycled forever,
# alternating open and closed lengths.
294 self.shutdown_windows = itertools.cycle([60 * n
295 for n in shutdown_windows])
296 self._open_start = self._next_opening
297 self._open_for = next(self.shutdown_windows)
# Step forward whole open+closed cycles until _next_opening is in the
# future. _open_start/_open_for then describe the latest opening reached.
299 def _advance_opening(self):
300 while self._next_opening < time.time():
301 self._open_start = self._next_opening
302 self._next_opening += self._open_for + next(self.shutdown_windows)
303 self._open_for = next(self.shutdown_windows)
# Return the epoch time of the next window opening that is not in the past.
305 def next_opening(self):
306 self._advance_opening()
307 return self._next_opening
# True while now is strictly inside the latest open window (the exact
# opening instant counts as closed).
309 def window_open(self):
310 self._advance_opening()
311 return 0 < (time.time() - self._open_start) < self._open_for
# NOTE(review): this excerpt of the class is missing interior lines (e.g.
# 319-320 docstring end, 346-347, 354 `else:`, 365, 367-368, 377, 381-383),
# so it does not parse as-is; restore them from the original source first.
314 class ComputeNodeMonitorActor(config.actor_class):
315 """Actor to manage a running compute node.
317 This actor gets updates about a compute node's cloud and Arvados records.
318 It uses this information to notify subscribers when the node is eligible
# Pairing with arvados_node (which may be None) and the first shutdown
# check are both queued through the actor's own proxy, not run inline.
321 def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
322 timer_actor, update_actor, arvados_node=None,
323 poll_stale_after=600, node_stale_after=3600):
324 super(ComputeNodeMonitorActor, self).__init__()
325 self._later = self.actor_ref.proxy()
326 self._logger = logging.getLogger('arvnodeman.computenode')
327 self._last_log = None
328 self._shutdowns = shutdown_timer
329 self._timer = timer_actor
330 self._update = update_actor
331 self.cloud_node = cloud_node
332 self.cloud_node_start_time = cloud_node_start_time
333 self.poll_stale_after = poll_stale_after
334 self.node_stale_after = node_stale_after
335 self.subscribers = set()
336 self.arvados_node = None
337 self._later.update_arvados_node(arvados_node)
338 self.last_shutdown_opening = None
339 self._later.consider_shutdown()
341 def subscribe(self, subscriber):
342 self.subscribers.add(subscriber)
# Debug-log msg, apparently skipping a message identical to the last one.
# The skip and the _last_log update are on missing lines 346-347. Only the
# format string is compared, not args.
344 def _debug(self, msg, *args):
345 if msg == self._last_log:
348 self._logger.debug(msg, *args)
# Unpaired node: eligibility follows cloud_node_start_time freshness.
# Paired node: its record must have been modified within poll_stale_after
# seconds and report info['slurm_state'] == 'idle'.
# NOTE(review): as written, an unpaired node is eligible only while its
# start time is still *fresh*; confirm this direction is intended.
350 def _shutdown_eligible(self):
351 if self.arvados_node is None:
352 return timestamp_fresh(self.cloud_node_start_time,
353 self.node_stale_after)
355 return (timestamp_fresh(arvados_node_mtime(self.arvados_node),
356 self.poll_stale_after) and
357 (self.arvados_node['info'].get('slurm_state') == 'idle'))
# Notify subscribers when the shutdown window is open and the node is
# eligible; otherwise just debug-log. The timer is scheduled to call this
# again at the next opening, at most once per distinct opening time
# (tracked in last_shutdown_opening).
359 def consider_shutdown(self):
360 next_opening = self._shutdowns.next_opening()
361 if self._shutdowns.window_open():
362 if self._shutdown_eligible():
363 self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
364 _notify_subscribers(self._later, self.subscribers)
366 self._debug("Node %s shutdown window open but node busy.",
369 self._debug("Node %s shutdown window closed. Next at %s.",
370 self.cloud_node.id, time.ctime(next_opening))
371 if self.last_shutdown_opening != next_opening:
372 self._timer.schedule(next_opening, self._later.consider_shutdown)
373 self.last_shutdown_opening = next_opening
# Offer an Arvados record for pairing. If this node is unpaired and the
# record's ip_address is one of the cloud node's private IPs, queue the
# pairing and return the cloud node's id. The already-paired and no-match
# results are on missing lines (377, 381-383).
375 def offer_arvados_pair(self, arvados_node):
376 if self.arvados_node is not None:
378 elif arvados_node['ip_address'] in self.cloud_node.private_ips:
379 self._later.update_arvados_node(arvados_node)
380 return self.cloud_node.id
# Replace the cloud record when given one, then re-check shutdown.
384 def update_cloud_node(self, cloud_node):
385 if cloud_node is not None:
386 self.cloud_node = cloud_node
387 self._later.consider_shutdown()
# Store a new Arvados record when given one. If its FQDN differs from the
# cloud node's name, ask the update actor to sync them; the returned value
# is not used here. Then re-check shutdown.
389 def update_arvados_node(self, arvados_node):
390 if arvados_node is not None:
391 self.arvados_node = arvados_node
392 new_hostname = arvados_node_fqdn(self.arvados_node)
393 if new_hostname != self.cloud_node.name:
394 self._update.sync_node(self.cloud_node, self.arvados_node)
395 self._later.consider_shutdown()