Adds checking for request body options that are both valid JSON and readable files
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
14
class _ComputeNodeRecord(object):
    """Everything the daemon knows about one compute node.

    Fields may be None while the node is only partly known:

    * actor -- proxy for the node's monitor actor (see
      NodeManagerDaemonActor._new_node);
    * cloud_node -- the cloud provider's node object;
    * arvados_node -- the Arvados node record (a dict with a 'uuid' key);
    * assignment_time -- when an Arvados node record was last handed to a
      new setup actor; -inf means "never".
    """
    def __init__(self, actor=None, cloud_node=None, arvados_node=None,
                 assignment_time=float('-inf')):
        # Assign all four fields together so they read as one record.
        (self.actor, self.cloud_node,
         self.arvados_node, self.assignment_time) = (
            actor, cloud_node, arvados_node, assignment_time)
22
23
class _BaseNodeTracker(object):
    """Index of _ComputeNodeRecords keyed by one of their attributes.

    Subclasses must define:

    * RECORD_ATTR -- name of the record attribute this tracker indexes;
    * PAIR_ATTR -- name of the record attribute that is None while the
      record has not been paired yet;
    * item_key -- a static function mapping an item (the value stored in
      the RECORD_ATTR attribute) to its dictionary key.

    ``key in tracker``, ``tracker[key]``, ``len(tracker)`` and
    ``tracker.get(key[, default])`` all delegate to the ``nodes`` dict.
    """
    def __init__(self):
        self.nodes = {}          # key -> record currently tracked
        self.orphans = {}        # records dropped by the last update_from()
        self._blacklist = set()  # keys update_from() always ignores

    # Explicit delegation to self.nodes.  This replaces generating these
    # methods by writing into the class body's locals() (which the Python
    # docs say should not be modified) and no longer leaves the generator
    # helper and its loop variable behind as class attributes.
    def __contains__(self, key):
        return key in self.nodes

    def __getitem__(self, key):
        return self.nodes[key]

    def __len__(self):
        return len(self.nodes)

    def get(self, key, default=None):
        return self.nodes.get(key, default)

    def record_key(self, record):
        """Return this tracker's key for a _ComputeNodeRecord."""
        return self.item_key(getattr(record, self.RECORD_ATTR))

    def add(self, record):
        """Track `record`, replacing any record already under its key."""
        self.nodes[self.record_key(record)] = record

    def blacklist(self, key):
        """Make every later update_from() pass ignore items with `key`."""
        self._blacklist.add(key)

    def update_record(self, key, item):
        """Store a fresh `item` on the record tracked under `key`."""
        setattr(self.nodes[key], self.RECORD_ATTR, item)

    def update_from(self, response):
        """Reconcile the tracked records against a fresh listing of items.

        This is a generator.  For each item in `response`:

        * blacklisted keys are skipped;
        * already-tracked keys have their record updated in place;
        * any other item is yielded as ``(key, item)`` for the caller to
          register.

        Only after the generator is exhausted are records whose keys did not
        appear (including tracked keys that are now blacklisted) removed
        from ``nodes`` and stored in ``orphans``.
        """
        unseen = set(self.nodes)
        for item in response:
            key = self.item_key(item)
            if key in self._blacklist:
                continue
            elif key in unseen:
                unseen.remove(key)
                self.update_record(key, item)
            else:
                yield key, item
        self.orphans = {key: self.nodes.pop(key) for key in unseen}

    def unpaired(self):
        """Lazily yield tracked records whose PAIR_ATTR is still None."""
        return (record for record in self.nodes.itervalues()
                if getattr(record, self.PAIR_ATTR) is None)
69
70
class _CloudNodeTracker(_BaseNodeTracker):
    """Tracks _ComputeNodeRecords by the ``id`` of their cloud node.

    A record counts as unpaired while its ``arvados_node`` is None.
    """
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        return cloud_node.id
75
76
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Tracks _ComputeNodeRecords by the 'uuid' of their Arvados node.

    A record counts as unpaired while its ``cloud_node`` is None.
    """
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'

    @staticmethod
    def item_key(arvados_node):
        return arvados_node['uuid']

    def find_stale_node(self, stale_time):
        """Return an Arvados node record that is safe to reuse, or None.

        A record qualifies when cnode.timestamp_fresh() reports that both
        its modification time (cnode.arvados_node_mtime) and its last
        assignment_time are not fresh within `stale_time`.  The first
        qualifying record found is returned.
        """
        for rec in self.nodes.itervalues():
            arv_node = rec.arvados_node
            # Recently modified: the node may still be in use.
            if cnode.timestamp_fresh(cnode.arvados_node_mtime(arv_node),
                                     stale_time):
                continue
            # Recently handed to a setup actor: don't hand it out again.
            if cnode.timestamp_fresh(rec.assignment_time, stale_time):
                continue
            return arv_node
        return None
91
92
class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately.  It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
    def __init__(self, server_wishlist_actor, arvados_nodes_actor,
                 cloud_nodes_actor, cloud_update_actor, timer_actor,
                 arvados_factory, cloud_factory,
                 shutdown_windows, server_calculator,
                 min_nodes, max_nodes,
                 poll_stale_after=600,
                 boot_fail_after=1800,
                 node_stale_after=7200,
                 node_setup_class=dispatch.ComputeNodeSetupActor,
                 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                 node_actor_class=dispatch.ComputeNodeMonitorActor,
                 max_total_price=0):
        super(NodeManagerDaemonActor, self).__init__()
        self._node_setup = node_setup_class
        self._node_shutdown = node_shutdown_class
        self._node_actor = node_actor_class
        self._cloud_updater = cloud_update_actor
        self._timer = timer_actor
        self._new_arvados = arvados_factory
        self._new_cloud = cloud_factory
        self._cloud_driver = self._new_cloud()
        # Proxy to this actor, used to queue follow-up work as new messages
        # instead of running it inline.
        self._later = self.actor_ref.proxy()
        self.shutdown_windows = shutdown_windows
        self.server_calculator = server_calculator
        self.min_cloud_size = self.server_calculator.cheapest_size()
        self.min_nodes = min_nodes
        self.max_nodes = max_nodes
        self.max_total_price = max_total_price
        self.poll_stale_after = poll_stale_after
        self.boot_fail_after = boot_fail_after
        self.node_stale_after = node_stale_after
        # Latest wishlist from the server wishlist poller.  Methods guarded by
        # _check_poll_freshness only run after a real wishlist has arrived;
        # the empty default just means the attribute always exists.
        self.last_wishlist = []
        self.last_polls = {}
        # Subscribe to each poller and keep it as self._<name>_actor.  Each
        # poll starts out stale, so the freshness-guarded methods do nothing
        # until every poller has reported at least once.
        poll_actors = [('server_wishlist', server_wishlist_actor),
                       ('arvados_nodes', arvados_nodes_actor),
                       ('cloud_nodes', cloud_nodes_actor)]
        for poll_name, poll_actor in poll_actors:
            poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
            setattr(self, '_{}_actor'.format(poll_name), poll_actor)
            self.last_polls[poll_name] = -self.poll_stale_after
        self.cloud_nodes = _CloudNodeTracker()
        self.arvados_nodes = _ArvadosNodeTracker()
        self.booting = {}       # Actor IDs to ComputeNodeSetupActors
        self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
        self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
        self.sizes_booting_shutdown = {} # Actor IDs or Cloud node IDs to node size

    def on_start(self):
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
        self._logger.debug("Daemon started")

    def _update_poll_time(self, poll_key):
        """Record that the poll named `poll_key` just delivered data."""
        self.last_polls[poll_key] = time.time()

    def _pair_nodes(self, node_record, arvados_node):
        """Pair a cloud node record with an Arvados node.

        Subscribes the record's monitor actor to updates for that Arvados
        node, stores the node on the record, and indexes the record in
        self.arvados_nodes.
        """
        self._logger.info("Cloud node %s is now paired with Arvados node %s",
                          node_record.cloud_node.name, arvados_node['uuid'])
        self._arvados_nodes_actor.subscribe_to(
            arvados_node['uuid'], node_record.actor.update_arvados_node)
        node_record.arvados_node = arvados_node
        self.arvados_nodes.add(node_record)

    def _new_node(self, cloud_node):
        """Start a monitor actor for `cloud_node` and return its record.

        The returned _ComputeNodeRecord is not added to any tracker; callers
        decide where it belongs.
        """
        start_time = self._cloud_driver.node_start_time(cloud_node)
        shutdown_timer = cnode.ShutdownTimer(start_time,
                                             self.shutdown_windows)
        actor = self._node_actor.start(
            cloud_node=cloud_node,
            cloud_node_start_time=start_time,
            shutdown_timer=shutdown_timer,
            cloud_fqdn_func=self._cloud_driver.node_fqdn,
            update_actor=self._cloud_updater,
            timer_actor=self._timer,
            arvados_node=None,
            poll_stale_after=self.poll_stale_after,
            node_stale_after=self.node_stale_after,
            cloud_client=self._cloud_driver,
            boot_fail_after=self.boot_fail_after).proxy()
        actor.subscribe(self._later.node_can_shutdown)
        self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                             actor.update_cloud_node)
        record = _ComputeNodeRecord(actor, cloud_node)
        return record

    def update_cloud_nodes(self, nodelist):
        """Handle a fresh cloud node listing from the cloud nodes poller.

        Each newly listed node gets a record.  If node_up already created
        one in self.booted, that record is reused; otherwise a new monitor
        actor is started.  The record is then offered to unpaired Arvados
        nodes.  Nodes that vanished from the listing have any shutdown
        actor, and their monitor actor, stopped.
        """
        self._update_poll_time('cloud_nodes')
        for key, node in self.cloud_nodes.update_from(nodelist):
            self._logger.info("Registering new cloud node %s", key)
            if key in self.booted:
                record = self.booted.pop(key)
            else:
                record = self._new_node(node)
            self.cloud_nodes.add(record)
            for arv_rec in self.arvados_nodes.unpaired():
                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                    self._pair_nodes(record, arv_rec.arvados_node)
                    break
        for key, record in self.cloud_nodes.orphans.iteritems():
            if key in self.shutdowns:
                try:
                    self.shutdowns[key].stop().get()
                except pykka.ActorDeadError:
                    pass
                del self.shutdowns[key]
                del self.sizes_booting_shutdown[key]
            record.actor.stop()
            record.cloud_node = None

    def update_arvados_nodes(self, nodelist):
        """Handle a fresh Arvados node listing from the Arvados poller.

        New Arvados nodes get a record.  Then every unpaired Arvados node is
        offered to the unpaired cloud nodes until one accepts it.
        """
        self._update_poll_time('arvados_nodes')
        for key, node in self.arvados_nodes.update_from(nodelist):
            self._logger.info("Registering new Arvados node %s", key)
            record = _ComputeNodeRecord(arvados_node=node)
            self.arvados_nodes.add(record)
        for arv_rec in self.arvados_nodes.unpaired():
            arv_node = arv_rec.arvados_node
            for cloud_rec in self.cloud_nodes.unpaired():
                if cloud_rec.actor.offer_arvados_pair(arv_node).get():
                    self._pair_nodes(cloud_rec, arv_node)
                    break

    def _nodes_booting(self, size):
        """Count nodes of `size` (any size if None) not yet in cloud listings.

        Includes both running setup actors and nodes that finished setup
        (self.booted) but have not appeared in a cloud node listing yet.
        """
        s = sum(1
                for c in self.booting.iterkeys()
                if size is None or self.sizes_booting_shutdown[c].id == size.id)
        s += sum(1
                 for c in self.booted.itervalues()
                 if size is None or c.cloud_node.size.id == size.id)
        return s

    def _nodes_unpaired(self, size):
        """Count listed cloud nodes of `size` not yet paired with Arvados."""
        return sum(1
                   for c in self.cloud_nodes.unpaired()
                   if size is None or c.cloud_node.size.id == size.id)

    def _nodes_booted(self, size):
        """Count cloud nodes of `size` (any size if None) in cloud listings.

        Note: despite the name, this counts self.cloud_nodes, not
        self.booted.
        """
        return sum(1
                  for c in self.cloud_nodes.nodes.itervalues()
                  if size is None or c.cloud_node.size.id == size.id)

    def _nodes_up(self, size):
        """Count all nodes of `size` that are booting or listed."""
        return self._nodes_booting(size) + self._nodes_booted(size)

    def _total_price(self):
        """Sum the prices of booting, booted, and listed cloud nodes."""
        cost = 0
        cost += sum(self.server_calculator.find_size(self.sizes_booting_shutdown[c].id).price
                  for c in self.booting.iterkeys())
        cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
                    for i in (self.booted, self.cloud_nodes.nodes)
                    for c in i.itervalues())
        return cost

    def _nodes_busy(self, size):
        """Count listed nodes of `size` whose monitor is in state 'busy'."""
        return sum(1 for busy in
                   pykka.get_all(rec.actor.in_state('busy') for rec in
                                 self.cloud_nodes.nodes.itervalues()
                                 if rec.cloud_node.size.id == size.id)
                   if busy)

    def _nodes_missing(self, size):
        """Count listed nodes of `size` whose Arvados node looks missing.

        Nodes already being shut down are excluded.  "Missing" is decided by
        cnode.arvados_node_missing() with self.node_stale_after.
        """
        return sum(1 for arv_node in
                   pykka.get_all(rec.actor.arvados_node for rec in
                                 self.cloud_nodes.nodes.itervalues()
                                 if rec.cloud_node.size.id == size.id and rec.actor.cloud_node.get().id not in self.shutdowns)
                   if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))

    def _size_wishlist(self, size):
        """Count entries of `size` in the latest server wishlist."""
        return sum(1 for c in self.last_wishlist if c.id == size.id)

    def _size_shutdowns(self, size):
        """Count in-progress shutdowns of nodes of `size`.

        This is a plain dict lookup.  The previous try/except on
        pykka.ActorDeadError around it could never trigger.
        """
        return sum(1 for c in self.shutdowns.iterkeys()
                   if self.sizes_booting_shutdown[c].id == size.id)

    def _nodes_wanted(self, size):
        """Return how many more nodes of `size` should be booted.

        A positive result means boot that many.  Zero or a negative result
        means no more are wanted; a negative result is how over-supply is
        reported, for example when above max_nodes or when the price cap is
        already exceeded.
        """
        total_up_count = self._nodes_up(None)
        under_min = self.min_nodes - total_up_count
        over_max = total_up_count - self.max_nodes
        total_price = self._total_price()

        if over_max >= 0:
            return -over_max
        elif under_min > 0 and size.id == self.min_cloud_size.id:
            return under_min

        booting_count = self._nodes_booting(size) + self._nodes_unpaired(size)
        shutdown_count = self._size_shutdowns(size)
        busy_count = self._nodes_busy(size)
        up_count = self._nodes_up(size) - (shutdown_count + busy_count + self._nodes_missing(size))

        self._logger.info("%s: wishlist %i, up %i (booting %i, idle %i, busy %i), shutting down %i", size.name,
                          self._size_wishlist(size),
                          up_count + busy_count,
                          booting_count,
                          up_count - booting_count,
                          busy_count,
                          shutdown_count)

        wanted = self._size_wishlist(size) - up_count
        if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
            can_boot = int((self.max_total_price - total_price) / size.price)
            # can_boot is negative when total_price already exceeds the cap;
            # log that case too, not only the exactly-zero case.
            if can_boot <= 0:
                self._logger.info("Not booting %s (price %s) because it would exceed max_total_price of %s (current total_price is %s)",
                                  size.name, size.price, self.max_total_price, total_price)
            return can_boot
        else:
            return wanted

    def _nodes_excess(self, size):
        """Return how many nodes of `size` are surplus to current demand.

        Counts up nodes not already shutting down, minus the min_nodes
        reserve (cheapest size only), busy nodes, and wishlist entries.
        """
        up_count = self._nodes_up(size) - self._size_shutdowns(size)
        if size.id == self.min_cloud_size.id:
            up_count -= self.min_nodes
        return up_count - self._nodes_busy(size) - self._size_wishlist(size)

    def update_server_wishlist(self, wishlist):
        """Handle a new wishlist: queue node starts or booting-node stops.

        Sizes are visited in reverse server_calculator.cloud_sizes order.
        An error for one size is logged and does not block the others.
        """
        self._update_poll_time('server_wishlist')
        self.last_wishlist = wishlist
        for size in reversed(self.server_calculator.cloud_sizes):
            try:
                nodes_wanted = self._nodes_wanted(size)
                if nodes_wanted > 0:
                    self._later.start_node(size)
                elif (nodes_wanted < 0) and self.booting:
                    self._later.stop_booting_node(size)
            except Exception:
                self._logger.exception("while calculating nodes wanted for size %s", size)

    def _check_poll_freshness(orig_func):
        """Decorator to inhibit a method when poll information is stale.

        This decorator checks the timestamps of all the poll information the
        daemon has received.  The decorated method is only called if none
        of the timestamps are considered stale.
        """
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            now = time.time()
            if all(now - t < self.poll_stale_after
                   for t in self.last_polls.itervalues()):
                return orig_func(self, *args, **kwargs)
            else:
                return None
        return wrapper

    @_check_poll_freshness
    def start_node(self, cloud_size):
        """Boot one node of `cloud_size` if more are still wanted.

        A stale Arvados node record, if one is found, is handed to the setup
        actor and its assignment_time is stamped.  If more than one node is
        wanted, another start_node call is queued.
        """
        nodes_wanted = self._nodes_wanted(cloud_size)
        if nodes_wanted < 1:
            return None
        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
        self._logger.info("Want %i more %s nodes.  Booting a node.",
                          nodes_wanted, cloud_size.name)
        new_setup = self._node_setup.start(
            timer_actor=self._timer,
            arvados_client=self._new_arvados(),
            arvados_node=arvados_node,
            cloud_client=self._new_cloud(),
            cloud_size=cloud_size).proxy()
        self.booting[new_setup.actor_ref.actor_urn] = new_setup
        self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size

        if arvados_node is not None:
            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
                time.time())
        new_setup.subscribe(self._later.node_up)
        if nodes_wanted > 1:
            self._later.start_node(cloud_size)

    def _get_actor_attrs(self, actor, *attr_names):
        """Fetch several proxy attributes of `actor` and wait for them all."""
        return pykka.get_all([getattr(actor, name) for name in attr_names])

    def node_up(self, setup_proxy):
        """Handle a setup actor finishing (subscribed in start_node).

        Forgets and stops the setup actor.  If it produced a cloud node not
        yet listed by the poller, a monitor record is kept in self.booted.
        Either way, shutdown_unpaired_node is scheduled for boot_fail_after
        seconds from now.
        """
        cloud_node = setup_proxy.cloud_node.get()
        del self.booting[setup_proxy.actor_ref.actor_urn]
        del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]

        setup_proxy.stop()
        if cloud_node is not None:
            record = self.cloud_nodes.get(cloud_node.id)
            if record is None:
                record = self._new_node(cloud_node)
                self.booted[cloud_node.id] = record
            self._timer.schedule(time.time() + self.boot_fail_after,
                                 self._later.shutdown_unpaired_node, cloud_node.id)

    @_check_poll_freshness
    def stop_booting_node(self, size):
        """Cancel one booting setup of `size` if there are excess nodes.

        If more than one node is in excess, another call is queued.
        """
        nodes_excess = self._nodes_excess(size)
        if (nodes_excess < 1) or not self.booting:
            return None
        # Iterate over a snapshot because we delete from self.booting below.
        for key, node in self.booting.items():
            # stop_if_no_cloud_node() presumably stops the setup only if it
            # has not created a cloud node yet, and reports whether it did.
            if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
                del self.booting[key]
                del self.sizes_booting_shutdown[key]

                if nodes_excess > 1:
                    self._later.stop_booting_node(size)
                break

    def _begin_node_shutdown(self, node_actor, cancellable):
        """Start a shutdown actor for `node_actor`'s cloud node.

        Does nothing if a shutdown for that node is already in progress.
        """
        cloud_node_obj = node_actor.cloud_node.get()
        cloud_node_id = cloud_node_obj.id
        if cloud_node_id in self.shutdowns:
            return None
        shutdown = self._node_shutdown.start(
            timer_actor=self._timer, cloud_client=self._new_cloud(),
            arvados_client=self._new_arvados(),
            node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
        self.shutdowns[cloud_node_id] = shutdown
        self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
        shutdown.subscribe(self._later.node_finished_shutdown)

    @_check_poll_freshness
    def node_can_shutdown(self, node_actor):
        """Handle a monitor reporting its node could be shut down.

        Subscribed in _new_node.  Begins a cancellable shutdown only when
        that node's size has excess nodes.
        """
        if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
            self._begin_node_shutdown(node_actor, cancellable=True)

    def shutdown_unpaired_node(self, cloud_node_id):
        """Shut down a node that never became ready (scheduled by node_up).

        If the node's monitor is in neither the 'idle' nor the 'busy' state,
        a non-cancellable shutdown begins.
        """
        for record_dict in [self.cloud_nodes, self.booted]:
            if cloud_node_id in record_dict:
                record = record_dict[cloud_node_id]
                break
        else:
            return None
        if not record.actor.in_state('idle', 'busy').get():
            self._begin_node_shutdown(record.actor, cancellable=False)

    def node_finished_shutdown(self, shutdown_actor):
        """Clean up after a shutdown actor finishes (see _begin_node_shutdown).

        On failure, the shutdown is forgotten, and the node is blacklisted
        if the cancel reason was NODE_BROKEN.  On success for a node still
        in self.booted, its monitor is stopped.  Successful shutdowns of
        listed nodes are cleaned up by update_cloud_nodes once the node
        leaves the cloud listing.
        """
        cloud_node, success, cancel_reason = self._get_actor_attrs(
            shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
        shutdown_actor.stop()
        cloud_node_id = cloud_node.id
        if not success:
            if cancel_reason == self._node_shutdown.NODE_BROKEN:
                self.cloud_nodes.blacklist(cloud_node_id)
            del self.shutdowns[cloud_node_id]
            del self.sizes_booting_shutdown[cloud_node_id]
        elif cloud_node_id in self.booted:
            self.booted.pop(cloud_node_id).actor.stop()
            del self.shutdowns[cloud_node_id]
            del self.sizes_booting_shutdown[cloud_node_id]

    def shutdown(self):
        """Begin stopping the daemon.

        Inhibits the poll-freshness-guarded methods and stops every setup
        actor that agrees to stop.  await_shutdown then waits for the
        remaining setups before stopping this actor.
        """
        self._logger.info("Shutting down after signal.")
        self.poll_stale_after = -1  # Inhibit starting/stopping nodes
        setup_stops = {key: node.stop_if_no_cloud_node()
                       for key, node in self.booting.iteritems()}
        self.booting = {key: self.booting[key]
                        for key in setup_stops if not setup_stops[key].get()}
        # Keep sizes_booting_shutdown consistent with self.booting by
        # dropping the size entries of the setups that were just stopped.
        for key in setup_stops:
            if key not in self.booting:
                self.sizes_booting_shutdown.pop(key, None)
        self._later.await_shutdown()

    def await_shutdown(self):
        """Stop this actor once no setups remain; otherwise check in 1s."""
        if self.booting:
            self._timer.schedule(time.time() + 1, self._later.await_shutdown)
        else:
            self.stop()