3 from __future__ import absolute_import, print_function
5 from . import clientactor
6 from .config import ARVADOS_ERRORS
8 class ServerCalculator(object):
9 """Generate cloud server wishlists from an Arvados job queue.
11 Instantiate this class with a list of cloud node sizes you're willing to
12 use, plus keyword overrides from the configuration. Then you can pass
13 job queues to servers_for_queue. It will return a list of node sizes
14 that would best satisfy the jobs, choosing the cheapest size that
15 satisfies each job, and ignoring jobs that can't be satisfied.
18 class CloudSizeWrapper(object):
19 def __init__(self, real_size, **kwargs):
21 for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
23 setattr(self, name, getattr(self.real, name))
24 self.cores = kwargs.pop('cores')
25 self.scratch = self.disk
26 for name, override in kwargs.iteritems():
27 if not hasattr(self, name):
28 raise ValueError("unrecognized size field '%s'" % (name,))
29 setattr(self, name, override)
31 def meets_constraints(self, **kwargs):
32 for name, want_value in kwargs.iteritems():
33 have_value = getattr(self, name)
34 if (have_value != 0) and (have_value < want_value):
39 def __init__(self, server_list, max_nodes=None):
40 self.cloud_sizes = [self.CloudSizeWrapper(s, **kws)
41 for s, kws in server_list]
42 self.cloud_sizes.sort(key=lambda s: s.price)
43 self.max_nodes = max_nodes or float("inf")
46 def coerce_int(x, fallback):
49 except (TypeError, ValueError):
52 def cloud_size_for_constraints(self, constraints):
53 want_value = lambda key: self.coerce_int(constraints.get(key), 0)
54 wants = {'cores': want_value('min_cores_per_node'),
55 'ram': want_value('min_ram_mb_per_node'),
56 'scratch': want_value('min_scratch_mb_per_node')}
57 for size in self.cloud_sizes:
58 if size.meets_constraints(**wants):
62 def servers_for_queue(self, queue):
65 constraints = job['runtime_constraints']
66 want_count = self.coerce_int(constraints.get('min_nodes'), 1)
67 cloud_size = self.cloud_size_for_constraints(constraints)
68 if (want_count < self.max_nodes) and (cloud_size is not None):
69 servers.extend([cloud_size.real] * max(1, want_count))
73 class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
74 """Actor to generate server wishlists from the job queue.
76 This actor regularly polls Arvados' job queue, and uses the provided
77 ServerCalculator to turn that into a list of requested node sizes. That
78 list is sent to subscribers on every poll.
81 CLIENT_ERRORS = ARVADOS_ERRORS
82 LOGGER_NAME = 'arvnodeman.jobqueue'
84 def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
85 super(JobQueueMonitorActor, self).__init__(
86 client, timer_actor, *args, **kwargs)
87 self._calculator = server_calc
89 def _send_request(self):
90 return self._client.jobs().queue().execute()['items']
92 def _got_response(self, queue):
93 server_list = self._calculator.servers_for_queue(queue)
94 self._logger.debug("Sending server wishlist: %s",
95 ', '.join(s.name for s in server_list))
96 return super(JobQueueMonitorActor, self)._got_response(server_list)