Add 'build/' from commit '555b039609a3c8700c27767c255fdfe00eb42063'
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import libcloud.common.types as cloud_types
10 import pykka
11
12 from .. import \
13     arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
14     arvados_node_missing, RetryMixin
15 from ...clientactor import _notify_subscribers
16 from ... import config
17
18 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
19     """Base class for actors that change a compute node's state.
20
21     This base class takes care of retrying changes and notifying
22     subscribers when the change is finished.
23     """
24     def __init__(self, cloud_client, arvados_client, timer_actor,
25                  retry_wait, max_retry_wait):
26         super(ComputeNodeStateChangeBase, self).__init__()
27         RetryMixin.__init__(self, retry_wait, max_retry_wait,
28                             None, cloud_client, timer_actor)
29         self._later = self.actor_ref.tell_proxy()
30         self._arvados = arvados_client
31         self.subscribers = set()
32
33     def _set_logger(self):
34         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
35
36     def on_start(self):
37         self._set_logger()
38
39     def _finished(self):
40         if self.subscribers is None:
41             raise Exception("Actor tried to finish twice")
42         _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
43         self.subscribers = None
44         self._logger.info("finished")
45
46     def subscribe(self, subscriber):
47         if self.subscribers is None:
48             try:
49                 subscriber(self._later)
50             except pykka.ActorDeadError:
51                 pass
52         else:
53             self.subscribers.add(subscriber)
54
55     def _clean_arvados_node(self, arvados_node, explanation):
56         return self._arvados.nodes().update(
57             uuid=arvados_node['uuid'],
58             body={'hostname': None,
59                   'ip_address': None,
60                   'slot_number': None,
61                   'first_ping_at': None,
62                   'last_ping_at': None,
63                   'properties': {},
64                   'info': {'ec2_instance_id': None,
65                            'last_action': explanation}},
66             ).execute()
67
68     @staticmethod
69     def _finish_on_exception(orig_func):
70         @functools.wraps(orig_func)
71         def finish_wrapper(self, *args, **kwargs):
72             try:
73                 return orig_func(self, *args, **kwargs)
74             except Exception as error:
75                 self._logger.error("Actor error %s", error)
76                 self._finished()
77         return finish_wrapper
78
79
80 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
81     """Actor to create and set up a cloud compute node.
82
83     This actor prepares an Arvados node record for a new compute node
84     (either creating one or cleaning one passed in), then boots the
85     actual compute node.  It notifies subscribers when the cloud node
86     is successfully created (the last step in the process for Node
87     Manager to handle).
88     """
89     def __init__(self, timer_actor, arvados_client, cloud_client,
90                  cloud_size, arvados_node=None,
91                  retry_wait=1, max_retry_wait=180):
92         super(ComputeNodeSetupActor, self).__init__(
93             cloud_client, arvados_client, timer_actor,
94             retry_wait, max_retry_wait)
95         self.cloud_size = cloud_size
96         self.arvados_node = None
97         self.cloud_node = None
98         if arvados_node is None:
99             self._later.create_arvados_node()
100         else:
101             self._later.prepare_arvados_node(arvados_node)
102
103     @ComputeNodeStateChangeBase._finish_on_exception
104     @RetryMixin._retry(config.ARVADOS_ERRORS)
105     def create_arvados_node(self):
106         self.arvados_node = self._arvados.nodes().create(body={}).execute()
107         self._later.create_cloud_node()
108
109     @ComputeNodeStateChangeBase._finish_on_exception
110     @RetryMixin._retry(config.ARVADOS_ERRORS)
111     def prepare_arvados_node(self, node):
112         self.arvados_node = self._clean_arvados_node(
113             node, "Prepared by Node Manager")
114         self._later.create_cloud_node()
115
116     @ComputeNodeStateChangeBase._finish_on_exception
117     @RetryMixin._retry()
118     def create_cloud_node(self):
119         self._logger.info("Sending create_node request for node size %s.",
120                           self.cloud_size.name)
121         self.cloud_node = self._cloud.create_node(self.cloud_size,
122                                                   self.arvados_node)
123         if not self.cloud_node.size:
124              self.cloud_node.size = self.cloud_size
125         self._logger.info("Cloud node %s created.", self.cloud_node.id)
126         self._later.update_arvados_node_properties()
127
128     @ComputeNodeStateChangeBase._finish_on_exception
129     @RetryMixin._retry(config.ARVADOS_ERRORS)
130     def update_arvados_node_properties(self):
131         """Tell Arvados some details about the cloud node.
132
133         Currently we only include size/price from our request, which
134         we already knew before create_cloud_node(), but doing it here
135         gives us an opportunity to provide more detail from
136         self.cloud_node, too.
137         """
138         self.arvados_node['properties']['cloud_node'] = {
139             # Note this 'size' is the node size we asked the cloud
140             # driver to create -- not necessarily equal to the size
141             # reported by the cloud driver for the node that was
142             # created.
143             'size': self.cloud_size.id,
144             'price': self.cloud_size.price,
145         }
146         self.arvados_node = self._arvados.nodes().update(
147             uuid=self.arvados_node['uuid'],
148             body={'properties': self.arvados_node['properties']},
149         ).execute()
150         self._logger.info("%s updated properties.", self.arvados_node['uuid'])
151         self._later.post_create()
152
153     @RetryMixin._retry()
154     def post_create(self):
155         self._cloud.post_create_node(self.cloud_node)
156         self._logger.info("%s post-create work done.", self.cloud_node.id)
157         self._finished()
158
159     def stop_if_no_cloud_node(self):
160         if self.cloud_node is not None:
161             return False
162         self.stop()
163         return True
164
165
166 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
167     """Actor to shut down a compute node.
168
169     This actor simply destroys a cloud node, retrying as needed.
170     """
171     # Reasons for a shutdown to be cancelled.
172     WINDOW_CLOSED = "shutdown window closed"
173     NODE_BROKEN = "cloud failed to shut down broken node"
174
175     def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
176                  cancellable=True, retry_wait=1, max_retry_wait=180):
177         # If a ShutdownActor is cancellable, it will ask the
178         # ComputeNodeMonitorActor if it's still eligible before taking each
179         # action, and stop the shutdown process if the node is no longer
180         # eligible.  Normal shutdowns based on job demand should be
181         # cancellable; shutdowns based on node misbehavior should not.
182         super(ComputeNodeShutdownActor, self).__init__(
183             cloud_client, arvados_client, timer_actor,
184             retry_wait, max_retry_wait)
185         self._monitor = node_monitor.proxy()
186         self.cloud_node = self._monitor.cloud_node.get()
187         self.cancellable = cancellable
188         self.cancel_reason = None
189         self.success = None
190
191     def _set_logger(self):
192         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
193
194     def on_start(self):
195         super(ComputeNodeShutdownActor, self).on_start()
196         self._later.shutdown_node()
197
198     def _arvados_node(self):
199         return self._monitor.arvados_node.get()
200
201     def _finished(self, success_flag=None):
202         if success_flag is not None:
203             self.success = success_flag
204         return super(ComputeNodeShutdownActor, self)._finished()
205
206     def cancel_shutdown(self, reason):
207         self.cancel_reason = reason
208         self._logger.info("Shutdown cancelled: %s.", reason)
209         self._finished(success_flag=False)
210
211     def _stop_if_window_closed(orig_func):
212         @functools.wraps(orig_func)
213         def stop_wrapper(self, *args, **kwargs):
214             if (self.cancellable and
215                   (self._monitor.shutdown_eligible().get() is not True)):
216                 self._later.cancel_shutdown(self.WINDOW_CLOSED)
217                 return None
218             else:
219                 return orig_func(self, *args, **kwargs)
220         return stop_wrapper
221
222     @ComputeNodeStateChangeBase._finish_on_exception
223     @_stop_if_window_closed
224     @RetryMixin._retry()
225     def shutdown_node(self):
226         self._logger.info("Starting shutdown")
227         if not self._cloud.destroy_node(self.cloud_node):
228             if self._cloud.broken(self.cloud_node):
229                 self._later.cancel_shutdown(self.NODE_BROKEN)
230                 return
231             else:
232                 # Force a retry.
233                 raise cloud_types.LibcloudError("destroy_node failed")
234         self._logger.info("Shutdown success")
235         arv_node = self._arvados_node()
236         if arv_node is None:
237             self._finished(success_flag=True)
238         else:
239             self._later.clean_arvados_node(arv_node)
240
241     @ComputeNodeStateChangeBase._finish_on_exception
242     @RetryMixin._retry(config.ARVADOS_ERRORS)
243     def clean_arvados_node(self, arvados_node):
244         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
245         self._finished(success_flag=True)
246
247     # Make the decorator available to subclasses.
248     _stop_if_window_closed = staticmethod(_stop_if_window_closed)
249
250
251 class ComputeNodeUpdateActor(config.actor_class):
252     """Actor to dispatch one-off cloud management requests.
253
254     This actor receives requests for small cloud updates, and
255     dispatches them to a real driver.  ComputeNodeMonitorActors use
256     this to perform maintenance tasks on themselves.  Having a
257     dedicated actor for this gives us the opportunity to control the
258     flow of requests; e.g., by backing off when errors occur.
259
260     This actor is most like a "traditional" Pykka actor: there's no
261     subscribing, but instead methods return real driver results.  If
262     you're interested in those results, you should get them from the
263     Future that the proxy method returns.  Be prepared to handle exceptions
264     from the cloud driver when you do.
265     """
266     def __init__(self, cloud_factory, max_retry_wait=180):
267         super(ComputeNodeUpdateActor, self).__init__()
268         self._cloud = cloud_factory()
269         self.max_retry_wait = max_retry_wait
270         self.error_streak = 0
271         self.next_request_time = time.time()
272
273     def _throttle_errors(orig_func):
274         @functools.wraps(orig_func)
275         def throttle_wrapper(self, *args, **kwargs):
276             throttle_time = self.next_request_time - time.time()
277             if throttle_time > 0:
278                 time.sleep(throttle_time)
279             self.next_request_time = time.time()
280             try:
281                 result = orig_func(self, *args, **kwargs)
282             except Exception as error:
283                 self.error_streak += 1
284                 self.next_request_time += min(2 ** self.error_streak,
285                                               self.max_retry_wait)
286                 raise
287             else:
288                 self.error_streak = 0
289                 return result
290         return throttle_wrapper
291
292     @_throttle_errors
293     def sync_node(self, cloud_node, arvados_node):
294         return self._cloud.sync_node(cloud_node, arvados_node)
295
296
297 class ComputeNodeMonitorActor(config.actor_class):
298     """Actor to manage a running compute node.
299
300     This actor gets updates about a compute node's cloud and Arvados records.
301     It uses this information to notify subscribers when the node is eligible
302     for shutdown.
303     """
304     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
305                  cloud_fqdn_func, timer_actor, update_actor, cloud_client,
306                  arvados_node=None, poll_stale_after=600, node_stale_after=3600,
307                  boot_fail_after=1800
308     ):
309         super(ComputeNodeMonitorActor, self).__init__()
310         self._later = self.actor_ref.tell_proxy()
311         self._last_log = None
312         self._shutdowns = shutdown_timer
313         self._cloud_node_fqdn = cloud_fqdn_func
314         self._timer = timer_actor
315         self._update = update_actor
316         self._cloud = cloud_client
317         self.cloud_node = cloud_node
318         self.cloud_node_start_time = cloud_node_start_time
319         self.poll_stale_after = poll_stale_after
320         self.node_stale_after = node_stale_after
321         self.boot_fail_after = boot_fail_after
322         self.subscribers = set()
323         self.arvados_node = None
324         self._later.update_arvados_node(arvados_node)
325         self.last_shutdown_opening = None
326         self._later.consider_shutdown()
327
328     def _set_logger(self):
329         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
330
331     def on_start(self):
332         self._set_logger()
333         self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)
334
335     def subscribe(self, subscriber):
336         self.subscribers.add(subscriber)
337
338     def _debug(self, msg, *args):
339         if msg == self._last_log:
340             return
341         self._last_log = msg
342         self._logger.debug(msg, *args)
343
344     def in_state(self, *states):
345         # Return a boolean to say whether or not our Arvados node record is in
346         # one of the given states.  If state information is not
347         # available--because this node has no Arvados record, the record is
348         # stale, or the record has no state information--return None.
349         if (self.arvados_node is None) or not timestamp_fresh(
350               arvados_node_mtime(self.arvados_node), self.node_stale_after):
351             return None
352         state = self.arvados_node['crunch_worker_state']
353         if not state:
354             return None
355         result = state in states
356         if state == 'idle':
357             result = result and not self.arvados_node['job_uuid']
358         return result
359
360     def shutdown_eligible(self):
361         """Return True if eligible for shutdown, or a string explaining why the node
362         is not eligible for shutdown."""
363
364         if not self._shutdowns.window_open():
365             return "shutdown window is not open."
366         if self.arvados_node is None:
367             # Node is unpaired.
368             # If it hasn't pinged Arvados after boot_fail seconds, shut it down
369             if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
370                 return "node is still booting, will be considered a failed boot at %s" % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.cloud_node_start_time + self.boot_fail_after))
371             else:
372                 return True
373         missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
374         if missing and self._cloud.broken(self.cloud_node):
375             # Node is paired, but Arvados says it is missing and the cloud says the node
376             # is in an error state, so shut it down.
377             return True
378         if missing is None and self._cloud.broken(self.cloud_node):
379             self._logger.info(
380                 "Cloud node considered 'broken' but paired node %s last_ping_at is None, " +
381                 "cannot check node_stale_after (node may be shut down and we just haven't gotten the message yet).",
382                 self.arvados_node['uuid'])
383         if self.in_state('idle'):
384             return True
385         else:
386             return "node is not idle."
387
388     def consider_shutdown(self):
389         try:
390             next_opening = self._shutdowns.next_opening()
391             eligible = self.shutdown_eligible()
392             if eligible is True:
393                 self._debug("Suggesting shutdown.")
394                 _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
395             elif self._shutdowns.window_open():
396                 self._debug("Cannot shut down because %s", eligible)
397             elif self.last_shutdown_opening != next_opening:
398                 self._debug("Shutdown window closed.  Next at %s.",
399                             time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
400                 self._timer.schedule(next_opening, self._later.consider_shutdown)
401                 self.last_shutdown_opening = next_opening
402         except Exception:
403             self._logger.exception("Unexpected exception")
404
405     def offer_arvados_pair(self, arvados_node):
406         first_ping_s = arvados_node.get('first_ping_at')
407         if (self.arvados_node is not None) or (not first_ping_s):
408             return None
409         elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
410               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
411             self._later.update_arvados_node(arvados_node)
412             return self.cloud_node.id
413         else:
414             return None
415
416     def update_cloud_node(self, cloud_node):
417         if cloud_node is not None:
418             self.cloud_node = cloud_node
419             self._later.consider_shutdown()
420
421     def update_arvados_node(self, arvados_node):
422         # If the cloud node's FQDN doesn't match what's in the Arvados node
423         # record, make them match.
424         # This method is a little unusual in the way it just fires off the
425         # request without checking the result or retrying errors.  That's
426         # because this update happens every time we reload the Arvados node
427         # list: if a previous sync attempt failed, we'll see that the names
428         # are out of sync and just try again.  ComputeNodeUpdateActor has
429         # the logic to throttle those effective retries when there's trouble.
430         if arvados_node is not None:
431             self.arvados_node = arvados_node
432             if (self._cloud_node_fqdn(self.cloud_node) !=
433                   arvados_node_fqdn(self.arvados_node)):
434                 self._update.sync_node(self.cloud_node, self.arvados_node)
435             self._later.consider_shutdown()