8784: Fix test for latest firefox.
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8 import re
9
10 import libcloud.common.types as cloud_types
11 from libcloud.common.exceptions import BaseHTTPError
12
13 import pykka
14
15 from .. import \
16     arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
17     arvados_node_missing, RetryMixin
18 from ...clientactor import _notify_subscribers
19 from ... import config
20 from .transitions import transitions
21
22 QuotaExceeded = "QuotaExceeded"
23
24 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
25     """Base class for actors that change a compute node's state.
26
27     This base class takes care of retrying changes and notifying
28     subscribers when the change is finished.
29     """
30     def __init__(self, cloud_client, arvados_client, timer_actor,
31                  retry_wait, max_retry_wait):
32         super(ComputeNodeStateChangeBase, self).__init__()
33         RetryMixin.__init__(self, retry_wait, max_retry_wait,
34                             None, cloud_client, timer_actor)
35         self._later = self.actor_ref.tell_proxy()
36         self._arvados = arvados_client
37         self.subscribers = set()
38
39     def _set_logger(self):
40         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
41
42     def on_start(self):
43         self._set_logger()
44
45     def _finished(self):
46         if self.subscribers is None:
47             raise Exception("Actor tried to finish twice")
48         _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
49         self.subscribers = None
50         self._logger.info("finished")
51
52     def subscribe(self, subscriber):
53         if self.subscribers is None:
54             try:
55                 subscriber(self.actor_ref.proxy())
56             except pykka.ActorDeadError:
57                 pass
58         else:
59             self.subscribers.add(subscriber)
60
61     def _clean_arvados_node(self, arvados_node, explanation):
62         return self._arvados.nodes().update(
63             uuid=arvados_node['uuid'],
64             body={'hostname': None,
65                   'ip_address': None,
66                   'slot_number': None,
67                   'first_ping_at': None,
68                   'last_ping_at': None,
69                   'properties': {},
70                   'info': {'ec2_instance_id': None,
71                            'last_action': explanation}},
72             ).execute()
73
74     @staticmethod
75     def _finish_on_exception(orig_func):
76         @functools.wraps(orig_func)
77         def finish_wrapper(self, *args, **kwargs):
78             try:
79                 return orig_func(self, *args, **kwargs)
80             except Exception as error:
81                 self._logger.error("Actor error %s", error)
82                 self._finished()
83         return finish_wrapper
84
85
86 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
87     """Actor to create and set up a cloud compute node.
88
89     This actor prepares an Arvados node record for a new compute node
90     (either creating one or cleaning one passed in), then boots the
91     actual compute node.  It notifies subscribers when the cloud node
92     is successfully created (the last step in the process for Node
93     Manager to handle).
94     """
95     def __init__(self, timer_actor, arvados_client, cloud_client,
96                  cloud_size, arvados_node=None,
97                  retry_wait=1, max_retry_wait=180):
98         super(ComputeNodeSetupActor, self).__init__(
99             cloud_client, arvados_client, timer_actor,
100             retry_wait, max_retry_wait)
101         self.cloud_size = cloud_size
102         self.arvados_node = None
103         self.cloud_node = None
104         self.error = None
105         if arvados_node is None:
106             self._later.create_arvados_node()
107         else:
108             self._later.prepare_arvados_node(arvados_node)
109
110     @ComputeNodeStateChangeBase._finish_on_exception
111     @RetryMixin._retry(config.ARVADOS_ERRORS)
112     def create_arvados_node(self):
113         self.arvados_node = self._arvados.nodes().create(body={}).execute()
114         self._later.create_cloud_node()
115
116     @ComputeNodeStateChangeBase._finish_on_exception
117     @RetryMixin._retry(config.ARVADOS_ERRORS)
118     def prepare_arvados_node(self, node):
119         self.arvados_node = self._clean_arvados_node(
120             node, "Prepared by Node Manager")
121         self._later.create_cloud_node()
122
123     @ComputeNodeStateChangeBase._finish_on_exception
124     @RetryMixin._retry()
125     def create_cloud_node(self):
126         self._logger.info("Sending create_node request for node size %s.",
127                           self.cloud_size.name)
128         try:
129             self.cloud_node = self._cloud.create_node(self.cloud_size,
130                                                       self.arvados_node)
131         except BaseHTTPError as e:
132             if e.code == 429 or "RequestLimitExceeded" in e.message:
133                 # Don't consider API rate limits to be quota errors.
134                 # re-raise so the Retry logic applies.
135                 raise
136
137             # The set of possible error codes / messages isn't documented for
138             # all clouds, so use a keyword heuristic to determine if the
139             # failure is likely due to a quota.
140             if re.search(r'(exceed|quota|limit)', e.message, re.I):
141                 self.error = QuotaExceeded
142                 self._logger.warning("Quota exceeded: %s", e)
143                 self._finished()
144                 return
145             else:
146                 # Something else happened, re-raise so the Retry logic applies.
147                 raise
148         except Exception as e:
149             raise
150
151         # The information included in the node size object we get from libcloud
152         # is inconsistent between cloud drivers.  Replace libcloud NodeSize
153         # object with compatible CloudSizeWrapper object which merges the size
154         # info reported from the cloud with size information from the
155         # configuration file.
156         self.cloud_node.size = self.cloud_size
157
158         self._logger.info("Cloud node %s created.", self.cloud_node.id)
159         self._later.update_arvados_node_properties()
160
161     @ComputeNodeStateChangeBase._finish_on_exception
162     @RetryMixin._retry(config.ARVADOS_ERRORS)
163     def update_arvados_node_properties(self):
164         """Tell Arvados some details about the cloud node.
165
166         Currently we only include size/price from our request, which
167         we already knew before create_cloud_node(), but doing it here
168         gives us an opportunity to provide more detail from
169         self.cloud_node, too.
170         """
171         self.arvados_node['properties']['cloud_node'] = {
172             # Note this 'size' is the node size we asked the cloud
173             # driver to create -- not necessarily equal to the size
174             # reported by the cloud driver for the node that was
175             # created.
176             'size': self.cloud_size.id,
177             'price': self.cloud_size.price,
178         }
179         self.arvados_node = self._arvados.nodes().update(
180             uuid=self.arvados_node['uuid'],
181             body={'properties': self.arvados_node['properties']},
182         ).execute()
183         self._logger.info("%s updated properties.", self.arvados_node['uuid'])
184         self._later.post_create()
185
186     @RetryMixin._retry()
187     def post_create(self):
188         self._cloud.post_create_node(self.cloud_node)
189         self._logger.info("%s post-create work done.", self.cloud_node.id)
190         self._finished()
191
192     def stop_if_no_cloud_node(self):
193         if self.cloud_node is not None:
194             return False
195         self.stop()
196         return True
197
198
199 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
200     """Actor to shut down a compute node.
201
202     This actor simply destroys a cloud node, retrying as needed.
203     """
204     # Reasons for a shutdown to be cancelled.
205     WINDOW_CLOSED = "shutdown window closed"
206     DESTROY_FAILED = "destroy_node failed"
207
208     def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
209                  cancellable=True, retry_wait=1, max_retry_wait=180):
210         # If a ShutdownActor is cancellable, it will ask the
211         # ComputeNodeMonitorActor if it's still eligible before taking each
212         # action, and stop the shutdown process if the node is no longer
213         # eligible.  Normal shutdowns based on job demand should be
214         # cancellable; shutdowns based on node misbehavior should not.
215         super(ComputeNodeShutdownActor, self).__init__(
216             cloud_client, arvados_client, timer_actor,
217             retry_wait, max_retry_wait)
218         self._monitor = node_monitor.proxy()
219         self.cloud_node = self._monitor.cloud_node.get()
220         self.cancellable = cancellable
221         self.cancel_reason = None
222         self.success = None
223
224     def _set_logger(self):
225         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
226
227     def on_start(self):
228         super(ComputeNodeShutdownActor, self).on_start()
229         self._later.shutdown_node()
230
231     def _arvados_node(self):
232         return self._monitor.arvados_node.get()
233
234     def _finished(self, success_flag=None):
235         if success_flag is not None:
236             self.success = success_flag
237         return super(ComputeNodeShutdownActor, self)._finished()
238
239     def cancel_shutdown(self, reason, **kwargs):
240         self.cancel_reason = reason
241         self._logger.info("Shutdown cancelled: %s.", reason)
242         self._finished(success_flag=False)
243
244     def _cancel_on_exception(orig_func):
245         @functools.wraps(orig_func)
246         def finish_wrapper(self, *args, **kwargs):
247             try:
248                 return orig_func(self, *args, **kwargs)
249             except Exception as error:
250                 self._logger.error("Actor error %s", error)
251                 self._logger.debug("", exc_info=True)
252                 self._later.cancel_shutdown("Unhandled exception %s" % error, try_resume=False)
253         return finish_wrapper
254
255     @_cancel_on_exception
256     def shutdown_node(self):
257         if self.cancellable:
258             self._logger.info("Checking that node is still eligible for shutdown")
259             eligible, reason = self._monitor.shutdown_eligible().get()
260             if not eligible:
261                 self.cancel_shutdown("No longer eligible for shut down because %s" % reason,
262                                      try_resume=True)
263                 return
264         self._destroy_node()
265
266     def _destroy_node(self):
267         self._logger.info("Starting shutdown")
268         arv_node = self._arvados_node()
269         if self._cloud.destroy_node(self.cloud_node):
270             self._logger.info("Shutdown success")
271             if arv_node:
272                 self._later.clean_arvados_node(arv_node)
273             else:
274                 self._finished(success_flag=True)
275         else:
276             self.cancel_shutdown(self.DESTROY_FAILED, try_resume=False)
277
278     @ComputeNodeStateChangeBase._finish_on_exception
279     @RetryMixin._retry(config.ARVADOS_ERRORS)
280     def clean_arvados_node(self, arvados_node):
281         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
282         self._finished(success_flag=True)
283
284
285 class ComputeNodeUpdateActor(config.actor_class, RetryMixin):
286     """Actor to dispatch one-off cloud management requests.
287
288     This actor receives requests for small cloud updates, and
289     dispatches them to a real driver.  ComputeNodeMonitorActors use
290     this to perform maintenance tasks on themselves.  Having a
291     dedicated actor for this gives us the opportunity to control the
292     flow of requests; e.g., by backing off when errors occur.
293     """
294     def __init__(self, cloud_factory, timer_actor, max_retry_wait=180):
295         super(ComputeNodeUpdateActor, self).__init__()
296         RetryMixin.__init__(self, 1, max_retry_wait,
297                             None, cloud_factory(), timer_actor)
298         self._cloud = cloud_factory()
299         self._later = self.actor_ref.tell_proxy()
300
301     def _set_logger(self):
302         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
303
304     def on_start(self):
305         self._set_logger()
306
307     @RetryMixin._retry()
308     def sync_node(self, cloud_node, arvados_node):
309         return self._cloud.sync_node(cloud_node, arvados_node)
310
311
312 class ComputeNodeMonitorActor(config.actor_class):
313     """Actor to manage a running compute node.
314
315     This actor gets updates about a compute node's cloud and Arvados records.
316     It uses this information to notify subscribers when the node is eligible
317     for shutdown.
318     """
319     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
320                  cloud_fqdn_func, timer_actor, update_actor, cloud_client,
321                  arvados_node=None, poll_stale_after=600, node_stale_after=3600,
322                  boot_fail_after=1800
323     ):
324         super(ComputeNodeMonitorActor, self).__init__()
325         self._later = self.actor_ref.tell_proxy()
326         self._shutdowns = shutdown_timer
327         self._cloud_node_fqdn = cloud_fqdn_func
328         self._timer = timer_actor
329         self._update = update_actor
330         self._cloud = cloud_client
331         self.cloud_node = cloud_node
332         self.cloud_node_start_time = cloud_node_start_time
333         self.poll_stale_after = poll_stale_after
334         self.node_stale_after = node_stale_after
335         self.boot_fail_after = boot_fail_after
336         self.subscribers = set()
337         self.arvados_node = None
338         self._later.update_arvados_node(arvados_node)
339         self.last_shutdown_opening = None
340         self._later.consider_shutdown()
341
342     def _set_logger(self):
343         self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
344
345     def on_start(self):
346         self._set_logger()
347         self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)
348
349     def subscribe(self, subscriber):
350         self.subscribers.add(subscriber)
351
352     def _debug(self, msg, *args):
353         self._logger.debug(msg, *args)
354
355     def get_state(self):
356         """Get node state, one of ['unpaired', 'busy', 'idle', 'down']."""
357
358         # If this node is not associated with an Arvados node, return 'unpaired'.
359         if self.arvados_node is None:
360             return 'unpaired'
361
362         state = self.arvados_node['crunch_worker_state']
363
364         # If state information is not available because it is missing or the
365         # record is stale, return 'down'.
366         if not state or not timestamp_fresh(arvados_node_mtime(self.arvados_node),
367                                             self.node_stale_after):
368             state = 'down'
369
370         # There's a window between when a node pings for the first time and the
371         # value of 'slurm_state' is synchronized by crunch-dispatch.  In this
372         # window, the node will still report as 'down'.  Check that
373         # first_ping_at is truthy and consider the node 'idle' during the
374         # initial boot grace period.
375         if (state == 'down' and
376             self.arvados_node['first_ping_at'] and
377             timestamp_fresh(self.cloud_node_start_time,
378                             self.boot_fail_after) and
379             not self._cloud.broken(self.cloud_node)):
380             state = 'idle'
381
382         # "missing" means last_ping_at is stale, this should be
383         # considered "down"
384         if arvados_node_missing(self.arvados_node, self.node_stale_after):
385             state = 'down'
386
387         # Turns out using 'job_uuid' this way is a bad idea.  The node record
388         # is assigned the job_uuid before the job is locked (which removes it
389         # from the queue) which means the job will be double-counted as both in
390         # the wishlist and but also keeping a node busy.  This end result is
391         # excess nodes being booted.
392         #if state == 'idle' and self.arvados_node['job_uuid']:
393         #    state = 'busy'
394
395         return state
396
397     def in_state(self, *states):
398         return self.get_state() in states
399
400     def shutdown_eligible(self):
401         """Determine if node is candidate for shut down.
402
403         Returns a tuple of (boolean, string) where the first value is whether
404         the node is candidate for shut down, and the second value is the
405         reason for the decision.
406         """
407
408         # Collect states and then consult state transition table whether we
409         # should shut down.  Possible states are:
410         # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
411         # window = ["open", "closed"]
412         # boot_grace = ["boot wait", "boot exceeded"]
413         # idle_grace = ["not idle", "idle wait", "idle exceeded"]
414
415         if self.arvados_node and not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
416             return (False, "node state is stale")
417
418         crunch_worker_state = self.get_state()
419
420         window = "open" if self._shutdowns.window_open() else "closed"
421
422         if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
423             boot_grace = "boot wait"
424         else:
425             boot_grace = "boot exceeded"
426
427         # API server side not implemented yet.
428         idle_grace = 'idle exceeded'
429
430         node_state = (crunch_worker_state, window, boot_grace, idle_grace)
431         t = transitions[node_state]
432         if t is not None:
433             # yes, shutdown eligible
434             return (True, "node state is %s" % (node_state,))
435         else:
436             # no, return a reason
437             return (False, "node state is %s" % (node_state,))
438
439     def consider_shutdown(self):
440         try:
441             eligible, reason = self.shutdown_eligible()
442             next_opening = self._shutdowns.next_opening()
443             if eligible:
444                 self._debug("Suggesting shutdown because %s", reason)
445                 _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
446             else:
447                 self._debug("Not eligible for shut down because %s", reason)
448
449                 if self.last_shutdown_opening != next_opening:
450                     self._debug("Shutdown window closed.  Next at %s.",
451                                 time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
452                     self._timer.schedule(next_opening, self._later.consider_shutdown)
453                     self.last_shutdown_opening = next_opening
454         except Exception:
455             self._logger.exception("Unexpected exception")
456
457     def offer_arvados_pair(self, arvados_node):
458         first_ping_s = arvados_node.get('first_ping_at')
459         if (self.arvados_node is not None) or (not first_ping_s):
460             return None
461         elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
462               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
463             self._later.update_arvados_node(arvados_node)
464             return self.cloud_node.id
465         else:
466             return None
467
468     def update_cloud_node(self, cloud_node):
469         if cloud_node is not None:
470             self.cloud_node = cloud_node
471             self._later.consider_shutdown()
472
473     def update_arvados_node(self, arvados_node):
474         # If the cloud node's FQDN doesn't match what's in the Arvados node
475         # record, make them match.
476         # This method is a little unusual in the way it just fires off the
477         # request without checking the result or retrying errors.  That's
478         # because this update happens every time we reload the Arvados node
479         # list: if a previous sync attempt failed, we'll see that the names
480         # are out of sync and just try again.  ComputeNodeUpdateActor has
481         # the logic to throttle those effective retries when there's trouble.
482         if arvados_node is not None:
483             self.arvados_node = arvados_node
484             if (self._cloud_node_fqdn(self.cloud_node) !=
485                   arvados_node_fqdn(self.arvados_node)):
486                 self._update.sync_node(self.cloud_node, self.arvados_node)
487             self._later.consider_shutdown()