4380: Node Manager monitors respond to shutdown_eligible message.
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
12 from ...clientactor import _notify_subscribers
13 from ... import config
14
15 class ComputeNodeStateChangeBase(config.actor_class):
16     """Base class for actors that change a compute node's state.
17
18     This base class takes care of retrying changes and notifying
19     subscribers when the change is finished.
20     """
21     def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
22         super(ComputeNodeStateChangeBase, self).__init__()
23         self._later = self.actor_ref.proxy()
24         self._timer = timer_actor
25         self._logger = logging.getLogger(logger_name)
26         self.min_retry_wait = retry_wait
27         self.max_retry_wait = max_retry_wait
28         self.retry_wait = retry_wait
29         self.subscribers = set()
30
31     @staticmethod
32     def _retry(errors):
33         """Retry decorator for an actor method that makes remote requests.
34
35         Use this function to decorator an actor method, and pass in a
36         tuple of exceptions to catch.  This decorator will schedule
37         retries of that method with exponential backoff if the
38         original method raises any of the given errors.
39         """
40         def decorator(orig_func):
41             @functools.wraps(orig_func)
42             def wrapper(self, *args, **kwargs):
43                 try:
44                     orig_func(self, *args, **kwargs)
45                 except errors as error:
46                     self._logger.warning(
47                         "Client error: %s - waiting %s seconds",
48                         error, self.retry_wait)
49                     self._timer.schedule(self.retry_wait,
50                                          getattr(self._later,
51                                                  orig_func.__name__),
52                                          *args, **kwargs)
53                     self.retry_wait = min(self.retry_wait * 2,
54                                           self.max_retry_wait)
55                 else:
56                     self.retry_wait = self.min_retry_wait
57             return wrapper
58         return decorator
59
60     def _finished(self):
61         _notify_subscribers(self._later, self.subscribers)
62         self.subscribers = None
63
64     def subscribe(self, subscriber):
65         if self.subscribers is None:
66             try:
67                 subscriber(self._later)
68             except pykka.ActorDeadError:
69                 pass
70         else:
71             self.subscribers.add(subscriber)
72
73
74 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
75     """Actor to create and set up a cloud compute node.
76
77     This actor prepares an Arvados node record for a new compute node
78     (either creating one or cleaning one passed in), then boots the
79     actual compute node.  It notifies subscribers when the cloud node
80     is successfully created (the last step in the process for Node
81     Manager to handle).
82     """
83     def __init__(self, timer_actor, arvados_client, cloud_client,
84                  cloud_size, arvados_node=None,
85                  retry_wait=1, max_retry_wait=180):
86         super(ComputeNodeSetupActor, self).__init__(
87             'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
88         self._arvados = arvados_client
89         self._cloud = cloud_client
90         self.cloud_size = cloud_size
91         self.arvados_node = None
92         self.cloud_node = None
93         if arvados_node is None:
94             self._later.create_arvados_node()
95         else:
96             self._later.prepare_arvados_node(arvados_node)
97
98     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
99     def create_arvados_node(self):
100         self.arvados_node = self._arvados.nodes().create(body={}).execute()
101         self._later.create_cloud_node()
102
103     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
104     def prepare_arvados_node(self, node):
105         self.arvados_node = self._arvados.nodes().update(
106             uuid=node['uuid'],
107             body={'hostname': None,
108                   'ip_address': None,
109                   'slot_number': None,
110                   'first_ping_at': None,
111                   'last_ping_at': None,
112                   'info': {'ec2_instance_id': None,
113                            'last_action': "Prepared by Node Manager"}}
114             ).execute()
115         self._later.create_cloud_node()
116
117     @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
118     def create_cloud_node(self):
119         self._logger.info("Creating cloud node with size %s.",
120                           self.cloud_size.name)
121         self.cloud_node = self._cloud.create_node(self.cloud_size,
122                                                   self.arvados_node)
123         self._logger.info("Cloud node %s created.", self.cloud_node.id)
124         self._finished()
125
126     def stop_if_no_cloud_node(self):
127         if self.cloud_node is None:
128             self.stop()
129
130
131 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
132     """Actor to shut down a compute node.
133
134     This actor simply destroys a cloud node, retrying as needed.
135     """
136     def __init__(self, timer_actor, cloud_client, cloud_node,
137                  retry_wait=1, max_retry_wait=180):
138         super(ComputeNodeShutdownActor, self).__init__(
139             'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
140         self._cloud = cloud_client
141         self.cloud_node = cloud_node
142         self._later.shutdown_node()
143
144     @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
145     def shutdown_node(self):
146         self._cloud.destroy_node(self.cloud_node)
147         self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
148         self._finished()
149
150
151 class ComputeNodeUpdateActor(config.actor_class):
152     """Actor to dispatch one-off cloud management requests.
153
154     This actor receives requests for small cloud updates, and
155     dispatches them to a real driver.  ComputeNodeMonitorActors use
156     this to perform maintenance tasks on themselves.  Having a
157     dedicated actor for this gives us the opportunity to control the
158     flow of requests; e.g., by backing off when errors occur.
159
160     This actor is most like a "traditional" Pykka actor: there's no
161     subscribing, but instead methods return real driver results.  If
162     you're interested in those results, you should get them from the
163     Future that the proxy method returns.  Be prepared to handle exceptions
164     from the cloud driver when you do.
165     """
166     def __init__(self, cloud_factory, max_retry_wait=180):
167         super(ComputeNodeUpdateActor, self).__init__()
168         self._cloud = cloud_factory()
169         self.max_retry_wait = max_retry_wait
170         self.error_streak = 0
171         self.next_request_time = time.time()
172
173     def _throttle_errors(orig_func):
174         @functools.wraps(orig_func)
175         def wrapper(self, *args, **kwargs):
176             throttle_time = self.next_request_time - time.time()
177             if throttle_time > 0:
178                 time.sleep(throttle_time)
179             self.next_request_time = time.time()
180             try:
181                 result = orig_func(self, *args, **kwargs)
182             except config.CLOUD_ERRORS:
183                 self.error_streak += 1
184                 self.next_request_time += min(2 ** self.error_streak,
185                                               self.max_retry_wait)
186                 raise
187             else:
188                 self.error_streak = 0
189                 return result
190         return wrapper
191
192     @_throttle_errors
193     def sync_node(self, cloud_node, arvados_node):
194         return self._cloud.sync_node(cloud_node, arvados_node)
195
196
197 class ComputeNodeMonitorActor(config.actor_class):
198     """Actor to manage a running compute node.
199
200     This actor gets updates about a compute node's cloud and Arvados records.
201     It uses this information to notify subscribers when the node is eligible
202     for shutdown.
203     """
204     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
205                  timer_actor, update_actor, arvados_node=None,
206                  poll_stale_after=600, node_stale_after=3600):
207         super(ComputeNodeMonitorActor, self).__init__()
208         self._later = self.actor_ref.proxy()
209         self._logger = logging.getLogger('arvnodeman.computenode')
210         self._last_log = None
211         self._shutdowns = shutdown_timer
212         self._timer = timer_actor
213         self._update = update_actor
214         self.cloud_node = cloud_node
215         self.cloud_node_start_time = cloud_node_start_time
216         self.poll_stale_after = poll_stale_after
217         self.node_stale_after = node_stale_after
218         self.subscribers = set()
219         self.arvados_node = None
220         self._later.update_arvados_node(arvados_node)
221         self.last_shutdown_opening = None
222         self._later.consider_shutdown()
223
224     def subscribe(self, subscriber):
225         self.subscribers.add(subscriber)
226
227     def _debug(self, msg, *args):
228         if msg == self._last_log:
229             return
230         self._last_log = msg
231         self._logger.debug(msg, *args)
232
233     def in_state(self, *states):
234         # Return a boolean to say whether or not our Arvados node record is in
235         # one of the given states.  If state information is not
236         # available--because this node has no Arvados record, the record is
237         # stale, or the record has no state information--return None.
238         if (self.arvados_node is None) or not timestamp_fresh(
239               arvados_node_mtime(self.arvados_node), self.node_stale_after):
240             return None
241         state = self.arvados_node['info'].get('slurm_state')
242         if not state:
243             return None
244         result = state in states
245         if state == 'idle':
246             result = result and not self.arvados_node['job_uuid']
247         return result
248
249     def shutdown_eligible(self):
250         if not self._shutdowns.window_open():
251             return False
252         elif self.arvados_node is None:
253             # If this is a new, unpaired node, it's eligible for
254             # shutdown--we figure there was an error during bootstrap.
255             return timestamp_fresh(self.cloud_node_start_time,
256                                    self.node_stale_after)
257         else:
258             return self.in_state('idle')
259
260     def consider_shutdown(self):
261         next_opening = self._shutdowns.next_opening()
262         if self.shutdown_eligible():
263             self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
264             _notify_subscribers(self._later, self.subscribers)
265         elif self._shutdowns.window_open():
266             self._debug("Node %s shutdown window open but node busy.",
267                         self.cloud_node.id)
268         elif self.last_shutdown_opening != next_opening:
269             self._debug("Node %s shutdown window closed.  Next at %s.",
270                         self.cloud_node.id, time.ctime(next_opening))
271             self._timer.schedule(next_opening, self._later.consider_shutdown)
272             self.last_shutdown_opening = next_opening
273
274     def offer_arvados_pair(self, arvados_node):
275         if self.arvados_node is not None:
276             return None
277         elif arvados_node['ip_address'] in self.cloud_node.private_ips:
278             self._later.update_arvados_node(arvados_node)
279             return self.cloud_node.id
280         else:
281             return None
282
283     def update_cloud_node(self, cloud_node):
284         if cloud_node is not None:
285             self.cloud_node = cloud_node
286             self._later.consider_shutdown()
287
288     def update_arvados_node(self, arvados_node):
289         if arvados_node is not None:
290             self.arvados_node = arvados_node
291             new_hostname = arvados_node_fqdn(self.arvados_node)
292             if new_hostname != self.cloud_node.name:
293                 self._update.sync_node(self.cloud_node, self.arvados_node)
294             self._later.consider_shutdown()