9daf5562d76dbc93182ad33a532b062c20918b27
[arvados.git] / services / nodemanager / arvnodeman / jobqueue.py
1 #!/usr/bin/env python
2
3 import arvados.errors as arverror
4
5 from . import clientactor
6
class ServerCalculator(object):
    """Translate a queue of jobs into a list of cloud node sizes to boot.

    The known sizes are kept sorted by price, so the first size that
    satisfies a job's constraints is the cheapest one that does.
    """

    class SizeWrapper(object):
        """Wrap a cloud node size and add `cores` and `scratch` fields.

        The standard size fields (id, name, ram, disk, bandwidth, price,
        extra) are copied from `real_size`.  `cores` must be supplied as a
        keyword argument.  `scratch` defaults to the size's `disk`.  Any
        other keyword argument overrides an existing field of the same name.

        Raises:
        * KeyError if `cores` is not given.
        * ValueError if a keyword names a field the wrapper does not have.
        """

        def __init__(self, real_size, **kwargs):
            self.real = real_size
            for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
                         'extra']:
                setattr(self, name, getattr(self.real, name))
            self.cores = kwargs.pop('cores')
            self.scratch = self.disk
            # .items() rather than .iteritems(): same behavior on Python 2,
            # and it also works on Python 3.
            for name, override in kwargs.items():
                if not hasattr(self, name):
                    raise ValueError("unrecognized size field '%s'" % (name,))
                setattr(self, name, override)

        def meets_constraints(self, **kwargs):
            """Return True if every named field is at least the wanted value.

            A field whose value is 0 never fails a constraint; presumably 0
            means "unknown/unreported" for that size -- TODO confirm.
            """
            for name, want_value in kwargs.items():
                have_value = getattr(self, name)
                if (have_value != 0) and (have_value < want_value):
                    return False
            return True

    def __init__(self, server_list, max_nodes=None):
        """
        Arguments:
        * server_list: iterable of (size, kwargs) pairs.  Each pair is turned
          into a SizeWrapper(size, **kwargs).
        * max_nodes: the largest node count a single job may request.  None
          (or any falsy value, including 0) means no limit.
        """
        self.sizes = [self.SizeWrapper(s, **kws) for s, kws in server_list]
        self.sizes.sort(key=lambda s: s.price)
        self.max_nodes = max_nodes or float("inf")

    @staticmethod
    def coerce_int(x, fallback):
        """Return int(x), or `fallback` if x is None or not int-convertible."""
        try:
            return int(x)
        except (TypeError, ValueError):
            return fallback

    def size_for_constraints(self, constraints):
        """Return the cheapest SizeWrapper meeting `constraints`, or None.

        Reads `min_cores_per_node`, `min_ram_mb_per_node` and
        `min_scratch_mb_per_node` from the `constraints` mapping.  A missing
        or non-integer value counts as 0, which every size satisfies.
        """
        want_value = lambda key: self.coerce_int(constraints.get(key), 0)
        wants = {'cores': want_value('min_cores_per_node'),
                 'ram': want_value('min_ram_mb_per_node'),
                 'scratch': want_value('min_scratch_mb_per_node')}
        for size in self.sizes:
            if size.meets_constraints(**wants):
                return size
        return None

    def servers_for_queue(self, queue):
        """Return the node sizes needed to run every job in `queue`.

        Each job is a mapping with a 'runtime_constraints' mapping.  A job
        contributes `min_nodes` copies (at least one) of the cheapest size
        that meets its constraints.  A job is skipped when no size can
        satisfy it, or when it wants more than `max_nodes` nodes.  The
        returned items are the underlying (unwrapped) size objects.
        """
        servers = []
        for job in queue:
            constraints = job['runtime_constraints']
            # Clamp before the limit check, so the check sees the count we
            # would actually request.
            want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
            # Asking for exactly max_nodes nodes is allowed; only asking for
            # more is not.
            if want_count > self.max_nodes:
                continue
            size = self.size_for_constraints(constraints)
            if size is not None:
                servers.extend([size.real] * want_count)
        return servers
60
61
class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
    """Poll-loop actor that turns the Arvados job queue into a server wishlist.

    Each poll fetches the queued jobs.  The `server_calc` object (expected to
    provide `servers_for_queue(queue)`) converts them into a list of node
    sizes, and that list is handed to the base class's `_got_response`.
    """
    # Presumably the exception types the base poll loop treats as
    # client/API failures -- confirm against RemotePollLoopActor.
    CLIENT_ERRORS = (arverror.ApiError,)
    LOGGER_NAME = 'arvnodeman.jobqueue'

    def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
        parent = super(JobQueueMonitorActor, self)
        parent.__init__(client, timer_actor, *args, **kwargs)
        # Used on every response to compute the wishlist.
        self._calculator = server_calc

    def _send_request(self):
        """Run the jobs().queue() API call and return its 'items' list."""
        queue_call = self._client.jobs().queue()
        return queue_call.execute()['items']

    def _got_response(self, queue):
        """Compute the wishlist for `queue`, log it, and pass it upward."""
        wishlist = self._calculator.servers_for_queue(queue)
        wishlist_names = ', '.join(size.name for size in wishlist)
        self._logger.debug("Sending server wishlist: %s", wishlist_names)
        return super(JobQueueMonitorActor, self)._got_response(wishlist)