7667: Combine polling logs into fewer lines for less noise. Adjust message
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import libcloud.common.types as cloud_types
10 import pykka
11
12 from .. import \
13     arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
14     arvados_node_missing, RetryMixin
15 from ...clientactor import _notify_subscribers
16 from ... import config
17
18 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
19     """Base class for actors that change a compute node's state.
20
21     This base class takes care of retrying changes and notifying
22     subscribers when the change is finished.
23     """
24     def __init__(self, cloud_client, arvados_client, timer_actor,
25                  retry_wait, max_retry_wait):
26         super(ComputeNodeStateChangeBase, self).__init__()
27         RetryMixin.__init__(self, retry_wait, max_retry_wait,
28                             None, cloud_client, timer_actor)
29         self._later = self.actor_ref.proxy()
30         self._arvados = arvados_client
31         self.subscribers = set()
32
33     def _set_logger(self):
34         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
35
36     def on_start(self):
37         self._set_logger()
38
39     def _finished(self):
40         _notify_subscribers(self._later, self.subscribers)
41         self.subscribers = None
42         self._logger.info("finished")
43
44     def subscribe(self, subscriber):
45         if self.subscribers is None:
46             try:
47                 subscriber(self._later)
48             except pykka.ActorDeadError:
49                 pass
50         else:
51             self.subscribers.add(subscriber)
52
53     def _clean_arvados_node(self, arvados_node, explanation):
54         return self._arvados.nodes().update(
55             uuid=arvados_node['uuid'],
56             body={'hostname': None,
57                   'ip_address': None,
58                   'slot_number': None,
59                   'first_ping_at': None,
60                   'last_ping_at': None,
61                   'properties': {},
62                   'info': {'ec2_instance_id': None,
63                            'last_action': explanation}},
64             ).execute()
65
66     @staticmethod
67     def _finish_on_exception(orig_func):
68         @functools.wraps(orig_func)
69         def finish_wrapper(self, *args, **kwargs):
70             try:
71                 return orig_func(self, *args, **kwargs)
72             except Exception as error:
73                 self._logger.error("Actor error %s", error)
74                 self._finished()
75         return finish_wrapper
76
77
78 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
79     """Actor to create and set up a cloud compute node.
80
81     This actor prepares an Arvados node record for a new compute node
82     (either creating one or cleaning one passed in), then boots the
83     actual compute node.  It notifies subscribers when the cloud node
84     is successfully created (the last step in the process for Node
85     Manager to handle).
86     """
87     def __init__(self, timer_actor, arvados_client, cloud_client,
88                  cloud_size, arvados_node=None,
89                  retry_wait=1, max_retry_wait=180):
90         super(ComputeNodeSetupActor, self).__init__(
91             cloud_client, arvados_client, timer_actor,
92             retry_wait, max_retry_wait)
93         self.cloud_size = cloud_size
94         self.arvados_node = None
95         self.cloud_node = None
96         if arvados_node is None:
97             self._later.create_arvados_node()
98         else:
99             self._later.prepare_arvados_node(arvados_node)
100
101     @ComputeNodeStateChangeBase._finish_on_exception
102     @RetryMixin._retry(config.ARVADOS_ERRORS)
103     def create_arvados_node(self):
104         self.arvados_node = self._arvados.nodes().create(body={}).execute()
105         self._later.create_cloud_node()
106
107     @ComputeNodeStateChangeBase._finish_on_exception
108     @RetryMixin._retry(config.ARVADOS_ERRORS)
109     def prepare_arvados_node(self, node):
110         self.arvados_node = self._clean_arvados_node(
111             node, "Prepared by Node Manager")
112         self._later.create_cloud_node()
113
114     @ComputeNodeStateChangeBase._finish_on_exception
115     @RetryMixin._retry()
116     def create_cloud_node(self):
117         self._logger.info("Sending create_node request for node size %s.",
118                           self.cloud_size.name)
119         self.cloud_node = self._cloud.create_node(self.cloud_size,
120                                                   self.arvados_node)
121         if not self.cloud_node.size:
122              self.cloud_node.size = self.cloud_size
123         self._logger.info("Cloud node %s created.", self.cloud_node.id)
124         self._later.update_arvados_node_properties()
125
126     @ComputeNodeStateChangeBase._finish_on_exception
127     @RetryMixin._retry(config.ARVADOS_ERRORS)
128     def update_arvados_node_properties(self):
129         """Tell Arvados some details about the cloud node.
130
131         Currently we only include size/price from our request, which
132         we already knew before create_cloud_node(), but doing it here
133         gives us an opportunity to provide more detail from
134         self.cloud_node, too.
135         """
136         self.arvados_node['properties']['cloud_node'] = {
137             # Note this 'size' is the node size we asked the cloud
138             # driver to create -- not necessarily equal to the size
139             # reported by the cloud driver for the node that was
140             # created.
141             'size': self.cloud_size.id,
142             'price': self.cloud_size.price,
143         }
144         self.arvados_node = self._arvados.nodes().update(
145             uuid=self.arvados_node['uuid'],
146             body={'properties': self.arvados_node['properties']},
147         ).execute()
148         self._logger.info("%s updated properties.", self.arvados_node['uuid'])
149         self._later.post_create()
150
151     @RetryMixin._retry()
152     def post_create(self):
153         self._cloud.post_create_node(self.cloud_node)
154         self._logger.info("%s post-create work done.", self.cloud_node.id)
155         self._finished()
156
157     def stop_if_no_cloud_node(self):
158         if self.cloud_node is not None:
159             return False
160         self.stop()
161         return True
162
163
164 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
165     """Actor to shut down a compute node.
166
167     This actor simply destroys a cloud node, retrying as needed.
168     """
169     # Reasons for a shutdown to be cancelled.
170     WINDOW_CLOSED = "shutdown window closed"
171     NODE_BROKEN = "cloud failed to shut down broken node"
172
173     def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
174                  cancellable=True, retry_wait=1, max_retry_wait=180):
175         # If a ShutdownActor is cancellable, it will ask the
176         # ComputeNodeMonitorActor if it's still eligible before taking each
177         # action, and stop the shutdown process if the node is no longer
178         # eligible.  Normal shutdowns based on job demand should be
179         # cancellable; shutdowns based on node misbehavior should not.
180         super(ComputeNodeShutdownActor, self).__init__(
181             cloud_client, arvados_client, timer_actor,
182             retry_wait, max_retry_wait)
183         self._monitor = node_monitor.proxy()
184         self.cloud_node = self._monitor.cloud_node.get()
185         self.cancellable = cancellable
186         self.cancel_reason = None
187         self.success = None
188
189     def _set_logger(self):
190         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
191
192     def on_start(self):
193         super(ComputeNodeShutdownActor, self).on_start()
194         self._later.shutdown_node()
195
196     def _arvados_node(self):
197         return self._monitor.arvados_node.get()
198
199     def _finished(self, success_flag=None):
200         if success_flag is not None:
201             self.success = success_flag
202         return super(ComputeNodeShutdownActor, self)._finished()
203
204     def cancel_shutdown(self, reason):
205         self.cancel_reason = reason
206         self._logger.info("Shutdown cancelled: %s.",
207                           self.cloud_node.id, reason)
208         self._finished(success_flag=False)
209
210     def _stop_if_window_closed(orig_func):
211         @functools.wraps(orig_func)
212         def stop_wrapper(self, *args, **kwargs):
213             if (self.cancellable and
214                   (self._monitor.shutdown_eligible().get() is not True)):
215                 self._later.cancel_shutdown(self.WINDOW_CLOSED)
216                 return None
217             else:
218                 return orig_func(self, *args, **kwargs)
219         return stop_wrapper
220
221     @ComputeNodeStateChangeBase._finish_on_exception
222     @_stop_if_window_closed
223     @RetryMixin._retry()
224     def shutdown_node(self):
225         self._logger.info("Starting shutdown")
226         if not self._cloud.destroy_node(self.cloud_node):
227             if self._cloud.broken(self.cloud_node):
228                 self._later.cancel_shutdown(self.NODE_BROKEN)
229             else:
230                 # Force a retry.
231                 raise cloud_types.LibcloudError("destroy_node failed")
232         self._logger.info("Shutdown success")
233         arv_node = self._arvados_node()
234         if arv_node is None:
235             self._finished(success_flag=True)
236         else:
237             self._later.clean_arvados_node(arv_node)
238
239     @ComputeNodeStateChangeBase._finish_on_exception
240     @RetryMixin._retry(config.ARVADOS_ERRORS)
241     def clean_arvados_node(self, arvados_node):
242         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
243         self._finished(success_flag=True)
244
245     # Make the decorator available to subclasses.
246     _stop_if_window_closed = staticmethod(_stop_if_window_closed)
247
248
249 class ComputeNodeUpdateActor(config.actor_class):
250     """Actor to dispatch one-off cloud management requests.
251
252     This actor receives requests for small cloud updates, and
253     dispatches them to a real driver.  ComputeNodeMonitorActors use
254     this to perform maintenance tasks on themselves.  Having a
255     dedicated actor for this gives us the opportunity to control the
256     flow of requests; e.g., by backing off when errors occur.
257
258     This actor is most like a "traditional" Pykka actor: there's no
259     subscribing, but instead methods return real driver results.  If
260     you're interested in those results, you should get them from the
261     Future that the proxy method returns.  Be prepared to handle exceptions
262     from the cloud driver when you do.
263     """
264     def __init__(self, cloud_factory, max_retry_wait=180):
265         super(ComputeNodeUpdateActor, self).__init__()
266         self._cloud = cloud_factory()
267         self.max_retry_wait = max_retry_wait
268         self.error_streak = 0
269         self.next_request_time = time.time()
270
271     def _throttle_errors(orig_func):
272         @functools.wraps(orig_func)
273         def throttle_wrapper(self, *args, **kwargs):
274             throttle_time = self.next_request_time - time.time()
275             if throttle_time > 0:
276                 time.sleep(throttle_time)
277             self.next_request_time = time.time()
278             try:
279                 result = orig_func(self, *args, **kwargs)
280             except Exception as error:
281                 self.error_streak += 1
282                 self.next_request_time += min(2 ** self.error_streak,
283                                               self.max_retry_wait)
284                 raise
285             else:
286                 self.error_streak = 0
287                 return result
288         return throttle_wrapper
289
290     @_throttle_errors
291     def sync_node(self, cloud_node, arvados_node):
292         return self._cloud.sync_node(cloud_node, arvados_node)
293
294
295 class ComputeNodeMonitorActor(config.actor_class):
296     """Actor to manage a running compute node.
297
298     This actor gets updates about a compute node's cloud and Arvados records.
299     It uses this information to notify subscribers when the node is eligible
300     for shutdown.
301     """
302     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
303                  cloud_fqdn_func, timer_actor, update_actor, cloud_client,
304                  arvados_node=None, poll_stale_after=600, node_stale_after=3600,
305                  boot_fail_after=1800
306     ):
307         super(ComputeNodeMonitorActor, self).__init__()
308         self._later = self.actor_ref.proxy()
309         self._last_log = None
310         self._shutdowns = shutdown_timer
311         self._cloud_node_fqdn = cloud_fqdn_func
312         self._timer = timer_actor
313         self._update = update_actor
314         self._cloud = cloud_client
315         self.cloud_node = cloud_node
316         self.cloud_node_start_time = cloud_node_start_time
317         self.poll_stale_after = poll_stale_after
318         self.node_stale_after = node_stale_after
319         self.boot_fail_after = boot_fail_after
320         self.subscribers = set()
321         self.arvados_node = None
322         self._later.update_arvados_node(arvados_node)
323         self.last_shutdown_opening = None
324         self._later.consider_shutdown()
325
326     def _set_logger(self):
327         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
328
329     def on_start(self):
330         self._set_logger()
331         self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)
332
333     def subscribe(self, subscriber):
334         self.subscribers.add(subscriber)
335
336     def _debug(self, msg, *args):
337         if msg == self._last_log:
338             return
339         self._last_log = msg
340         self._logger.debug(msg, *args)
341
342     def in_state(self, *states):
343         # Return a boolean to say whether or not our Arvados node record is in
344         # one of the given states.  If state information is not
345         # available--because this node has no Arvados record, the record is
346         # stale, or the record has no state information--return None.
347         if (self.arvados_node is None) or not timestamp_fresh(
348               arvados_node_mtime(self.arvados_node), self.node_stale_after):
349             return None
350         state = self.arvados_node['crunch_worker_state']
351         if not state:
352             return None
353         result = state in states
354         if state == 'idle':
355             result = result and not self.arvados_node['job_uuid']
356         return result
357
358     def shutdown_eligible(self):
359         """Return True if eligible for shutdown, or a string explaining why the node
360         is not eligible for shutdown."""
361
362         if not self._shutdowns.window_open():
363             return "shutdown window is not open."
364         if self.arvados_node is None:
365             # Node is unpaired.
366             # If it hasn't pinged Arvados after boot_fail seconds, shut it down
367             if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
368                 return "node is still booting, will be considered a failed boot at %s" % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.cloud_node_start_time + self.boot_fail_after))
369             else:
370                 return True
371         missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
372         if missing and self._cloud.broken(self.cloud_node):
373             # Node is paired, but Arvados says it is missing and the cloud says the node
374             # is in an error state, so shut it down.
375             return True
376         if missing is None and self._cloud.broken(self.cloud_node):
377             self._logger.info(
378                 "Cloud node considered 'broken' but paired node %s last_ping_at is None, " +
379                 "cannot check node_stale_after (node may be shut down and we just haven't gotten the message yet).",
380                 self.arvados_node['uuid'])
381         if self.in_state('idle'):
382             return True
383         else:
384             return "node is not idle."
385
386     def consider_shutdown(self):
387         try:
388             next_opening = self._shutdowns.next_opening()
389             eligible = self.shutdown_eligible()
390             if eligible is True:
391                 self._debug("Suggesting shutdown.")
392                 _notify_subscribers(self._later, self.subscribers)
393             elif self._shutdowns.window_open():
394                 self._debug("Cannot shut down because %s", eligible)
395             elif self.last_shutdown_opening != next_opening:
396                 self._debug("Shutdown window closed.  Next at %s.",
397                             time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
398                 self._timer.schedule(next_opening, self._later.consider_shutdown)
399                 self.last_shutdown_opening = next_opening
400         except Exception:
401             self._logger.exception("Unexpected exception")
402
403     def offer_arvados_pair(self, arvados_node):
404         first_ping_s = arvados_node.get('first_ping_at')
405         if (self.arvados_node is not None) or (not first_ping_s):
406             return None
407         elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
408               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
409             self._later.update_arvados_node(arvados_node)
410             return self.cloud_node.id
411         else:
412             return None
413
414     def update_cloud_node(self, cloud_node):
415         if cloud_node is not None:
416             self.cloud_node = cloud_node
417             self._later.consider_shutdown()
418
419     def update_arvados_node(self, arvados_node):
420         # If the cloud node's FQDN doesn't match what's in the Arvados node
421         # record, make them match.
422         # This method is a little unusual in the way it just fires off the
423         # request without checking the result or retrying errors.  That's
424         # because this update happens every time we reload the Arvados node
425         # list: if a previous sync attempt failed, we'll see that the names
426         # are out of sync and just try again.  ComputeNodeUpdateActor has
427         # the logic to throttle those effective retries when there's trouble.
428         if arvados_node is not None:
429             self.arvados_node = arvados_node
430             if (self._cloud_node_fqdn(self.cloud_node) !=
431                   arvados_node_fqdn(self.arvados_node)):
432                 self._update.sync_node(self.cloud_node, self.arvados_node)
433             self._later.consider_shutdown()