3 from . import clientactor
4 from .config import ARVADOS_ERRORS
6 class ServerCalculator(object):
7 class SizeWrapper(object):
8 def __init__(self, real_size, **kwargs):
10 for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
12 setattr(self, name, getattr(self.real, name))
13 self.cores = kwargs.pop('cores')
14 self.scratch = self.disk
15 for name, override in kwargs.iteritems():
16 if not hasattr(self, name):
17 raise ValueError("unrecognized size field '%s'" % (name,))
18 setattr(self, name, override)
20 def meets_constraints(self, **kwargs):
21 for name, want_value in kwargs.iteritems():
22 have_value = getattr(self, name)
23 if (have_value != 0) and (have_value < want_value):
28 def __init__(self, server_list, max_nodes=None):
29 self.sizes = [self.SizeWrapper(s, **kws) for s, kws in server_list]
30 self.sizes.sort(key=lambda s: s.price)
31 self.max_nodes = max_nodes or float("inf")
34 def coerce_int(x, fallback):
37 except (TypeError, ValueError):
40 def size_for_constraints(self, constraints):
41 want_value = lambda key: self.coerce_int(constraints.get(key), 0)
42 wants = {'cores': want_value('min_cores_per_node'),
43 'ram': want_value('min_ram_mb_per_node'),
44 'scratch': want_value('min_scratch_mb_per_node')}
45 for size in self.sizes:
46 if size.meets_constraints(**wants):
50 def servers_for_queue(self, queue):
53 constraints = job['runtime_constraints']
54 want_count = self.coerce_int(constraints.get('min_nodes'), 1)
55 size = self.size_for_constraints(constraints)
56 if (want_count < self.max_nodes) and (size is not None):
57 servers.extend([size.real] * max(1, want_count))
61 class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
62 CLIENT_ERRORS = ARVADOS_ERRORS
63 LOGGER_NAME = 'arvnodeman.jobqueue'
65 def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
66 super(JobQueueMonitorActor, self).__init__(
67 client, timer_actor, *args, **kwargs)
68 self._calculator = server_calc
70 def _send_request(self):
71 return self._client.jobs().queue().execute()['items']
73 def _got_response(self, queue):
74 server_list = self._calculator.servers_for_queue(queue)
75 self._logger.debug("Sending server wishlist: %s",
76 ', '.join(s.name for s in server_list))
77 return super(JobQueueMonitorActor, self)._got_response(server_list)