Merge branch 'master' into 3699-arv-copy
[arvados.git] / services / nodemanager / arvnodeman / jobqueue.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 from . import clientactor
6 from .config import ARVADOS_ERRORS
7
class ServerCalculator(object):
    """Generate cloud server wishlists from an Arvados job queue.

    Instantiate this class with a list of cloud node sizes you're willing to
    use, plus keyword overrides from the configuration.  Then you can pass
    job queues to servers_for_queue.  It will return a list of node sizes
    that would best satisfy the jobs, choosing the cheapest size that
    satisfies each job, and ignoring jobs that can't be satisfied.
    """

    class CloudSizeWrapper(object):
        """Wrap a cloud node size with the fields used for job matching.

        Copies 'id', 'name', 'ram', 'disk', 'bandwidth', 'price' and 'extra'
        from the real size object, adds 'cores' (required keyword) and
        'scratch' (initialized from 'disk'), then applies any remaining
        keyword arguments as overrides of those fields.  The original size
        object stays available as the 'real' attribute.
        """

        def __init__(self, real_size, **kwargs):
            """Build the wrapper.

            Raises KeyError if no 'cores' keyword is given, and ValueError
            if an override names a field the wrapper does not have.
            """
            self.real = real_size
            for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
                         'extra']:
                setattr(self, name, getattr(self.real, name))
            self.cores = kwargs.pop('cores')
            self.scratch = self.disk
            # items() instead of iteritems(): identical results here, and it
            # exists on both Python 2 and Python 3.
            for name, override in kwargs.items():
                if not hasattr(self, name):
                    raise ValueError("unrecognized size field '%s'" % (name,))
                setattr(self, name, override)

        def meets_constraints(self, **kwargs):
            """Return True if this size satisfies every named minimum.

            Each keyword names a wrapper attribute and the minimum value
            wanted for it.  An attribute whose value is 0 never fails the
            check.
            NOTE(review): presumably 0 means "unspecified" in the cloud
            driver's size data -- confirm.
            """
            for name, want_value in kwargs.items():
                have_value = getattr(self, name)
                if (have_value != 0) and (have_value < want_value):
                    return False
            return True

    def __init__(self, server_list, max_nodes=None):
        """Set up the calculator.

        server_list is an iterable of (size, overrides_dict) pairs; each
        overrides dict must include 'cores'.  max_nodes is the largest node
        count a single job may ask for; a falsy value means no limit.
        """
        self.cloud_sizes = [self.CloudSizeWrapper(s, **kws)
                            for s, kws in server_list]
        # Cheapest first, so the first size that fits a job is the cheapest.
        self.cloud_sizes.sort(key=lambda s: s.price)
        self.max_nodes = max_nodes or float("inf")

    @staticmethod
    def coerce_int(x, fallback):
        """Return int(x), or fallback if x can't be converted to an int."""
        try:
            return int(x)
        except (TypeError, ValueError):
            return fallback

    def cloud_size_for_constraints(self, constraints):
        """Return the cheapest wrapped size meeting constraints, or None.

        constraints is a mapping that may contain 'min_cores_per_node',
        'min_ram_mb_per_node' and 'min_scratch_mb_per_node'.  Missing or
        non-integer values are treated as 0 (no requirement).
        """
        want_value = lambda key: self.coerce_int(constraints.get(key), 0)
        wants = {'cores': want_value('min_cores_per_node'),
                 'ram': want_value('min_ram_mb_per_node'),
                 'scratch': want_value('min_scratch_mb_per_node')}
        for size in self.cloud_sizes:
            if size.meets_constraints(**wants):
                return size
        return None

    def servers_for_queue(self, queue):
        """Return the list of real cloud sizes wanted for the queued jobs.

        Each job is a mapping with a 'runtime_constraints' mapping.  A job
        contributes max(1, min_nodes) copies of the cheapest size that
        satisfies it.  Jobs that no size satisfies, or that want more than
        max_nodes nodes, contribute nothing.
        """
        servers = []
        for job in queue:
            constraints = job['runtime_constraints']
            want_count = self.coerce_int(constraints.get('min_nodes'), 1)
            cloud_size = self.cloud_size_for_constraints(constraints)
            # <= rather than <: a job asking for exactly max_nodes nodes is
            # still within the limit and must not be dropped.
            if (want_count <= self.max_nodes) and (cloud_size is not None):
                servers.extend([cloud_size.real] * max(1, want_count))
        return servers
71
72
class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
    """Poll the Arvados job queue and publish a server wishlist.

    On every poll this actor fetches the queued jobs through the API client
    and asks the ServerCalculator supplied at construction to convert them
    into a list of requested node sizes.  That list is sent to subscribers
    on every poll.
    """

    CLIENT_ERRORS = ARVADOS_ERRORS
    LOGGER_NAME = 'arvnodeman.jobqueue'

    def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
        """Keep server_calc for later; pass everything else to the parent."""
        parent = super(JobQueueMonitorActor, self)
        parent.__init__(client, timer_actor, *args, **kwargs)
        self._calculator = server_calc

    def _send_request(self):
        """Fetch the job queue from the API and return its 'items' entry."""
        response = self._client.jobs().queue().execute()
        return response['items']

    def _got_response(self, queue):
        """Convert the queue to a wishlist, log it, and hand it upstream."""
        wishlist = self._calculator.servers_for_queue(queue)
        size_names = ', '.join(size.name for size in wishlist)
        self._logger.debug("Sending server wishlist: %s", size_names)
        return super(JobQueueMonitorActor, self)._got_response(wishlist)