3 from __future__ import absolute_import, print_function
11 from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
12 from ...clientactor import _notify_subscribers
13 from ... import config
class ComputeNodeStateChangeBase(config.actor_class):
    """Base class for actors that change a compute node's state.

    This base class takes care of retrying changes and notifying
    subscribers when the change is finished.
    """
    def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
        """Set up retry bookkeeping and an empty subscriber set.

        logger_name -- name handed to logging.getLogger().
        timer_actor -- object whose schedule() method is used to queue
          retries.
        retry_wait -- initial wait, in seconds, before a retry; the
          wait is reset to this value after a successful call.
        max_retry_wait -- upper bound for the doubled retry wait.
        """
        super(ComputeNodeStateChangeBase, self).__init__()
        self._later = self.actor_ref.proxy()
        self._timer = timer_actor
        self._logger = logging.getLogger(logger_name)
        self.min_retry_wait = retry_wait
        self.max_retry_wait = max_retry_wait
        self.retry_wait = retry_wait
        self.subscribers = set()

    # NOTE(review): the decorator header was not visible in the reviewed
    # text.  The name and single argument are taken from call sites such
    # as ``@ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)``.
    @staticmethod
    def _retry(errors):
        """Retry decorator for an actor method that makes remote requests.

        Use this function to decorate an actor method, and pass in a
        tuple of exceptions to catch.  This decorator will schedule
        retries of that method with exponential backoff if the
        original method raises any of the given errors.
        """
        def decorator(orig_func):
            @functools.wraps(orig_func)
            def wrapper(self, *args, **kwargs):
                try:
                    orig_func(self, *args, **kwargs)
                except errors as error:
                    # NOTE(review): the log level (warning) was not
                    # visible in the reviewed text -- confirm.
                    self._logger.warning(
                        "Client error: %s - waiting %s seconds",
                        error, self.retry_wait)
                    # The timer is handed an absolute timestamp
                    # elsewhere in this file (consider_shutdown passes
                    # the epoch time of the next window), so turn the
                    # relative wait into an absolute delivery time.
                    # Passing the bare wait would make the retry fire
                    # immediately.
                    # NOTE(review): the callback arguments below were
                    # not visible in the reviewed text; they re-invoke
                    # the same method through the actor proxy -- confirm.
                    self._timer.schedule(time.time() + self.retry_wait,
                                         getattr(self._later,
                                                 orig_func.__name__),
                                         *args, **kwargs)
                    # Exponential backoff, capped at max_retry_wait.
                    self.retry_wait = min(self.retry_wait * 2,
                                          self.max_retry_wait)
                else:
                    # Success: the next failure starts from the minimum.
                    self.retry_wait = self.min_retry_wait
            return wrapper
        return decorator

    # NOTE(review): this method's header was not visible in the reviewed
    # text; the name ``_finished`` must match what subclasses call.
    def _finished(self):
        """Notify all subscribers, then mark this change as complete."""
        _notify_subscribers(self._later, self.subscribers)
        # None (rather than an empty set) tells subscribe() that the
        # change has already finished.
        self.subscribers = None

    def subscribe(self, subscriber):
        """Register a callable to be told when the change finishes.

        If the change has already finished, the subscriber is called
        right away with this actor's proxy instead of being stored.
        """
        if self.subscribers is None:
            try:
                subscriber(self._later)
            except pykka.ActorDeadError:
                # NOTE(review): handler body was not visible in the
                # reviewed text; presumably a dead subscriber is simply
                # ignored -- confirm.
                pass
        else:
            self.subscribers.add(subscriber)
class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
    """Actor to create and set up a cloud compute node.

    This actor prepares an Arvados node record for a new compute node
    (either creating one or cleaning one passed in), then boots the
    actual compute node.  It notifies subscribers when the cloud node
    is successfully created, which is the last step of this actor's
    process.
    """
    def __init__(self, timer_actor, arvados_client, cloud_client,
                 cloud_size, arvados_node=None,
                 retry_wait=1, max_retry_wait=180):
        """Store the clients and queue the first setup step.

        timer_actor, retry_wait, max_retry_wait -- passed to the base
          class, which uses them to schedule retries.
        arvados_client -- client used to create or update the Arvados
          node record.
        cloud_client -- driver used to create the cloud node.
        cloud_size -- size passed to the cloud driver; its ``name``
          attribute is logged.
        arvados_node -- optional existing Arvados node record to clean
          and reuse.  When None, a new record is created instead.
        """
        super(ComputeNodeSetupActor, self).__init__(
            'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
        self._arvados = arvados_client
        self._cloud = cloud_client
        self.cloud_size = cloud_size
        self.arvados_node = None
        self.cloud_node = None
        # The first step is sent through the actor's own proxy rather
        # than called directly.
        if arvados_node is None:
            self._later.create_arvados_node()
        else:
            self._later.prepare_arvados_node(arvados_node)

    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
    def create_arvados_node(self):
        """Create a fresh Arvados node record, then move on to the cloud."""
        self.arvados_node = self._arvados.nodes().create(body={}).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
    def prepare_arvados_node(self, node):
        """Blank out an existing Arvados node record for reuse.

        node -- the Arvados node record to clean.
        """
        # NOTE(review): the ``uuid=`` argument, the 'ip_address' and
        # 'slot_number' keys, and the trailing ``.execute()`` were not
        # visible in the reviewed text; confirm them against the real
        # file before merging.
        self.arvados_node = self._arvados.nodes().update(
            uuid=node['uuid'],
            body={'hostname': None,
                  'ip_address': None,
                  'slot_number': None,
                  'first_ping_at': None,
                  'last_ping_at': None,
                  'info': {'ec2_instance_id': None,
                           'last_action': "Prepared by Node Manager"}}
            ).execute()
        self._later.create_cloud_node()

    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
    def create_cloud_node(self):
        """Ask the cloud driver for a new node and report completion."""
        self._logger.info("Creating cloud node with size %s.",
                          self.cloud_size.name)
        # NOTE(review): the second argument to create_node() was not
        # visible in the reviewed text; presumably the prepared Arvados
        # record -- confirm.
        self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                  self.arvados_node)
        self._logger.info("Cloud node %s created.", self.cloud_node.id)
        # NOTE(review): completion call was not visible in the reviewed
        # text; the class docstring says subscribers are notified once
        # the cloud node is created.
        self._finished()

    def stop_if_no_cloud_node(self):
        """Stop this actor if no cloud node has been created yet."""
        if self.cloud_node is None:
            # NOTE(review): body was not visible in the reviewed text;
            # stop() is inferred from the method name -- confirm.
            self.stop()
class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
    """Actor to shut down a compute node.

    This actor simply destroys a cloud node, retrying as needed.
    """
    def __init__(self, timer_actor, cloud_client, cloud_node,
                 retry_wait=1, max_retry_wait=180):
        """Store the driver and node, then queue the shutdown request.

        timer_actor, retry_wait, max_retry_wait -- passed to the base
          class, which uses them to schedule retries.
        cloud_client -- driver whose destroy_node() is called.
        cloud_node -- the cloud node to destroy; its ``id`` is logged.
        """
        super(ComputeNodeShutdownActor, self).__init__(
            'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
        self._cloud = cloud_client
        self.cloud_node = cloud_node
        self._later.shutdown_node()

    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
    def shutdown_node(self):
        """Destroy the cloud node; retried on config.CLOUD_ERRORS."""
        self._cloud.destroy_node(self.cloud_node)
        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
        # NOTE(review): completion call was not visible in the reviewed
        # text; the base class is documented as notifying subscribers
        # when the change is finished -- confirm.
        self._finished()
class ComputeNodeUpdateActor(config.actor_class):
    """Actor to dispatch one-off cloud management requests.

    This actor receives requests for small cloud updates, and
    dispatches them to a real driver.  ComputeNodeMonitorActors use
    this to perform maintenance tasks on themselves.  Having a
    dedicated actor for this gives us the opportunity to control the
    flow of requests; e.g., by backing off when errors occur.

    This actor is most like a "traditional" Pykka actor: there's no
    subscribing, but instead methods return real driver results.  If
    you're interested in those results, you should get them from the
    Future that the proxy method returns.  Be prepared to handle exceptions
    from the cloud driver when you do.
    """
    def __init__(self, cloud_factory, max_retry_wait=180):
        """Build the cloud driver and reset the error-throttling state.

        cloud_factory -- zero-argument callable returning the driver.
        max_retry_wait -- cap, in seconds, on the delay added after a
          failed request.
        """
        super(ComputeNodeUpdateActor, self).__init__()
        self._cloud = cloud_factory()
        self.max_retry_wait = max_retry_wait
        self.error_streak = 0
        self.next_request_time = time.time()

    def _throttle_errors(orig_func):
        """Decorator that delays requests after consecutive cloud errors.

        Before calling the wrapped method it sleeps until
        self.next_request_time.  Each config.CLOUD_ERRORS failure
        lengthens the error streak and pushes the next request time
        back by min(2 ** streak, max_retry_wait) seconds.  A success
        resets the streak.
        """
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            throttle_time = self.next_request_time - time.time()
            if throttle_time > 0:
                time.sleep(throttle_time)
            self.next_request_time = time.time()
            try:
                result = orig_func(self, *args, **kwargs)
            except config.CLOUD_ERRORS:
                self.error_streak += 1
                self.next_request_time += min(2 ** self.error_streak,
                                              self.max_retry_wait)
                # NOTE(review): the re-raise was not visible in the
                # reviewed text; the class docstring tells callers to
                # expect cloud driver exceptions -- confirm.
                raise
            else:
                self.error_streak = 0
                return result
        return wrapper

    # NOTE(review): the decorator line was not visible in the reviewed
    # text; _throttle_errors has no other use in the visible code.
    @_throttle_errors
    def sync_node(self, cloud_node, arvados_node):
        """Forward a sync request to the cloud driver; return its result."""
        return self._cloud.sync_node(cloud_node, arvados_node)
class ComputeNodeMonitorActor(config.actor_class):
    """Actor to manage a running compute node.

    This actor gets updates about a compute node's cloud and Arvados records.
    It uses this information to notify subscribers when the node is eligible
    for shutdown.
    """
    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
                 timer_actor, update_actor, arvados_node=None,
                 poll_stale_after=600, node_stale_after=3600):
        """Record the node's state and queue the first shutdown check.

        cloud_node -- the cloud record for this node.
        cloud_node_start_time -- timestamp checked with
          timestamp_fresh() for nodes with no Arvados record.
        shutdown_timer -- object providing window_open() and
          next_opening().
        timer_actor -- object whose schedule() queues the next check.
        update_actor -- object whose sync_node() is called when the
          cloud node's name differs from the Arvados hostname.
        arvados_node -- optional Arvados record already paired with
          this cloud node.
        poll_stale_after -- stored on the instance; not otherwise used
          in the visible code.
        node_stale_after -- freshness limit passed to
          timestamp_fresh().
        """
        super(ComputeNodeMonitorActor, self).__init__()
        self._later = self.actor_ref.proxy()
        self._logger = logging.getLogger('arvnodeman.computenode')
        self._last_log = None
        self._shutdowns = shutdown_timer
        self._timer = timer_actor
        self._update = update_actor
        self.cloud_node = cloud_node
        self.cloud_node_start_time = cloud_node_start_time
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.subscribers = set()
        self.arvados_node = None
        self._later.update_arvados_node(arvados_node)
        self.last_shutdown_opening = None
        self._later.consider_shutdown()

    def subscribe(self, subscriber):
        """Register a callable to notify when shutdown is suggested."""
        self.subscribers.add(subscriber)

    def _debug(self, msg, *args):
        """Log a debug message unless it repeats the previous one.

        Only the format string is compared, not its arguments.
        """
        if msg == self._last_log:
            return
        # NOTE(review): this assignment was not visible in the reviewed
        # text; it is needed for the comparison above to ever match.
        self._last_log = msg
        self._logger.debug(msg, *args)

    def in_state(self, *states):
        # Return a boolean to say whether or not our Arvados node record is in
        # one of the given states.  If state information is not
        # available--because this node has no Arvados record, the record is
        # stale, or the record has no state information--return None.
        if (self.arvados_node is None) or not timestamp_fresh(
              arvados_node_mtime(self.arvados_node), self.node_stale_after):
            return None
        state = self.arvados_node['info'].get('slurm_state')
        if not state:
            return None
        result = state in states
        # NOTE(review): the guard below was not visible in the reviewed
        # text; presumably only an 'idle' node needs the extra check
        # that no job is assigned -- confirm.
        if state == 'idle':
            result = result and not self.arvados_node['job_uuid']
        return result

    def shutdown_eligible(self):
        """Report whether this node may be shut down right now."""
        if not self._shutdowns.window_open():
            return False
        elif self.arvados_node is None:
            # If this is a new, unpaired node, it's eligible for
            # shutdown--we figure there was an error during bootstrap.
            return timestamp_fresh(self.cloud_node_start_time,
                                   self.node_stale_after)
        else:
            return self.in_state('idle')

    def consider_shutdown(self):
        """Suggest shutdown to subscribers, or schedule the next check."""
        next_opening = self._shutdowns.next_opening()
        if self.shutdown_eligible():
            self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
            _notify_subscribers(self._later, self.subscribers)
        elif self._shutdowns.window_open():
            self._debug("Node %s shutdown window open but node busy.",
                        self.cloud_node.id)
        elif self.last_shutdown_opening != next_opening:
            self._debug("Node %s shutdown window closed. Next at %s.",
                        self.cloud_node.id, time.ctime(next_opening))
            # Remember the opening we scheduled for so repeated calls
            # do not queue duplicate checks for the same window.
            self._timer.schedule(next_opening, self._later.consider_shutdown)
            self.last_shutdown_opening = next_opening

    def offer_arvados_pair(self, arvados_node):
        """Pair with an Arvados record if its IP address matches ours.

        Returns the cloud node's id when the offer is accepted, and
        None when this node is already paired or the address is not
        one of the cloud node's private IPs.
        """
        if self.arvados_node is not None:
            return None
        elif arvados_node['ip_address'] in self.cloud_node.private_ips:
            self._later.update_arvados_node(arvados_node)
            return self.cloud_node.id
        else:
            return None

    def update_cloud_node(self, cloud_node):
        """Store a newer cloud record (if given) and recheck shutdown."""
        if cloud_node is not None:
            self.cloud_node = cloud_node
            self._later.consider_shutdown()

    def update_arvados_node(self, arvados_node):
        """Store a newer Arvados record (if given) and recheck shutdown.

        When the record's FQDN differs from the cloud node's name, the
        update actor is asked to sync the cloud node.
        """
        if arvados_node is not None:
            self.arvados_node = arvados_node
            new_hostname = arvados_node_fqdn(self.arvados_node)
            if new_hostname != self.cloud_node.name:
                self._update.sync_node(self.cloud_node, self.arvados_node)
            self._later.consider_shutdown()