3 import arvados.errors as arverror
5 from . import clientactor
7 class ServerCalculator(object):
8 class SizeWrapper(object):
9 def __init__(self, real_size, **kwargs):
11 for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
13 setattr(self, name, getattr(self.real, name))
14 self.cores = kwargs.pop('cores')
15 self.scratch = self.disk
16 for name, override in kwargs.iteritems():
17 if not hasattr(self, name):
18 raise ValueError("unrecognized size field '%s'" % (name,))
19 setattr(self, name, override)
21 def meets_constraints(self, **kwargs):
22 for name, want_value in kwargs.iteritems():
23 have_value = getattr(self, name)
24 if (have_value != 0) and (have_value < want_value):
29 def __init__(self, server_list, max_nodes=None):
30 self.sizes = [self.SizeWrapper(s, **kws) for s, kws in server_list]
31 self.sizes.sort(key=lambda s: s.price)
32 self.max_nodes = max_nodes or float("inf")
35 def coerce_int(x, fallback):
38 except (TypeError, ValueError):
41 def size_for_constraints(self, constraints):
42 want_value = lambda key: self.coerce_int(constraints.get(key), 0)
43 wants = {'cores': want_value('min_cores_per_node'),
44 'ram': want_value('min_ram_mb_per_node'),
45 'scratch': want_value('min_scratch_mb_per_node')}
46 for size in self.sizes:
47 if size.meets_constraints(**wants):
51 def servers_for_queue(self, queue):
54 constraints = job['runtime_constraints']
55 want_count = self.coerce_int(constraints.get('min_nodes'), 1)
56 size = self.size_for_constraints(constraints)
57 if (want_count < self.max_nodes) and (size is not None):
58 servers.extend([size.real] * max(1, want_count))
62 class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
63 CLIENT_ERRORS = (arverror.ApiError,)
64 LOGGER_NAME = 'arvnodeman.jobqueue'
66 def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
67 super(JobQueueMonitorActor, self).__init__(
68 client, timer_actor, *args, **kwargs)
69 self._calculator = server_calc
71 def _send_request(self):
72 return self._client.jobs().queue().execute()['items']
74 def _got_response(self, queue):
75 server_list = self._calculator.servers_for_queue(queue)
76 self._logger.debug("Sending server wishlist: %s",
77 ', '.join(s.name for s in server_list))
78 return super(JobQueueMonitorActor, self)._got_response(server_list)