Fixup error spec in JobQueueMonitorActor.
[arvados.git] / services / nodemanager / arvnodeman / jobqueue.py
1 #!/usr/bin/env python
2
3 from . import clientactor
4 from .config import ARVADOS_ERRORS
5
class ServerCalculator(object):
    """Map job runtime constraints onto available node sizes.

    Holds the configured sizes sorted cheapest first and, for each job in a
    queue, picks the cheapest size that satisfies the job's per-node
    constraints.
    """

    class SizeWrapper(object):
        """Wrap a size object, adding and overriding attributes.

        Copies id, name, ram, disk, bandwidth, price and extra from the
        wrapped object, adds ``cores`` (a required keyword argument) and
        ``scratch`` (initialized from ``disk``), then applies any remaining
        keyword arguments as overrides of attributes that already exist.
        The wrapped object remains available as ``real``.
        """

        def __init__(self, real_size, **kwargs):
            self.real = real_size
            for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
                         'extra']:
                setattr(self, name, getattr(self.real, name))
            # 'cores' is mandatory; a missing key raises KeyError.
            self.cores = kwargs.pop('cores')
            self.scratch = self.disk
            # items() rather than iteritems() so this runs on Python 2 and 3.
            for name, override in kwargs.items():
                # Only names that are already attributes may be overridden.
                if not hasattr(self, name):
                    raise ValueError("unrecognized size field '%s'" % (name,))
                setattr(self, name, override)

        def meets_constraints(self, **kwargs):
            """Return True if this size satisfies every minimum in kwargs.

            Each keyword names an attribute of this wrapper and gives the
            minimum value wanted.  An attribute whose value is 0 is not
            checked, i.e. it is treated as unconstrained.
            """
            for name, want_value in kwargs.items():
                have_value = getattr(self, name)
                if (have_value != 0) and (have_value < want_value):
                    return False
            return True


    def __init__(self, server_list, max_nodes=None):
        """Build the calculator.

        Arguments:
        * server_list: iterable of (size, kwargs) pairs.  Each pair becomes
          SizeWrapper(size, **kwargs), so kwargs must include 'cores'.
        * max_nodes: largest node count a single job may request.  A falsy
          value (None or 0) means no limit.
        """
        self.sizes = [self.SizeWrapper(s, **kws) for s, kws in server_list]
        # Cheapest first, so size_for_constraints returns the cheapest fit.
        self.sizes.sort(key=lambda s: s.price)
        self.max_nodes = max_nodes or float("inf")

    @staticmethod
    def coerce_int(x, fallback):
        """Return int(x), or fallback if x is None or not int-convertible."""
        try:
            return int(x)
        except (TypeError, ValueError):
            return fallback

    def size_for_constraints(self, constraints):
        """Return the cheapest SizeWrapper meeting constraints, or None.

        Reads 'min_cores_per_node', 'min_ram_mb_per_node' and
        'min_scratch_mb_per_node' from the constraints dict.  Missing or
        non-integer values count as 0 (no minimum).
        """
        want_value = lambda key: self.coerce_int(constraints.get(key), 0)
        wants = {'cores': want_value('min_cores_per_node'),
                 'ram': want_value('min_ram_mb_per_node'),
                 'scratch': want_value('min_scratch_mb_per_node')}
        for size in self.sizes:
            if size.meets_constraints(**wants):
                return size
        return None

    def servers_for_queue(self, queue):
        """Return the node sizes wanted to run every job in queue.

        Each job is a dict whose 'runtime_constraints' dict may carry
        'min_nodes' (default 1; values below 1 are raised to 1) and the
        per-node minimums read by size_for_constraints.  For each job, the
        cheapest matching size's wrapped object (SizeWrapper.real) is
        appended once per wanted node.  Jobs that want more than max_nodes
        nodes, or that no size can satisfy, contribute nothing.
        """
        servers = []
        for job in queue:
            constraints = job['runtime_constraints']
            # Normalize before the limit check so the count compared against
            # max_nodes is the same count that is actually requested.
            want_count = max(
                1, self.coerce_int(constraints.get('min_nodes'), 1))
            size = self.size_for_constraints(constraints)
            # '<=': a job asking for exactly max_nodes nodes fits the limit.
            if (want_count <= self.max_nodes) and (size is not None):
                servers.extend([size.real] * want_count)
        return servers
59
60
class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
    """Poll loop actor that turns the job queue into a server wishlist.

    Each poll requests the job queue from the client.  The calculator maps
    the queue to a list of node sizes, and that list is handed to the base
    class's _got_response.  What the base class then does with it (e.g.
    notifying subscribers) is defined in clientactor, not here.
    """
    LOGGER_NAME = 'arvnodeman.jobqueue'
    # Error types the base poll loop is told about; presumably these are
    # treated as recoverable request failures there -- confirm in clientactor.
    CLIENT_ERRORS = ARVADOS_ERRORS

    def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
        """Store server_calc (used to size the queue) after base init."""
        super(JobQueueMonitorActor, self).__init__(
            client, timer_actor, *args, **kwargs)
        self._calculator = server_calc

    def _send_request(self):
        """Fetch the job queue and return its 'items' list."""
        response = self._client.jobs().queue().execute()
        return response['items']

    def _got_response(self, queue):
        """Convert the queue into a wishlist and pass it to the base class."""
        wishlist = self._calculator.servers_for_queue(queue)
        size_names = ', '.join([size.name for size in wishlist])
        self._logger.debug("Sending server wishlist: %s", size_names)
        return super(JobQueueMonitorActor, self)._got_response(wishlist)