Merge branch '7667-node-manager-logging' refs #7667
[arvados.git] / services / nodemanager / arvnodeman / jobqueue.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import logging
6
7 from . import clientactor
8 from .config import ARVADOS_ERRORS
9
class ServerCalculator(object):
    """Generate cloud server wishlists from an Arvados job queue.

    Instantiate this class with a list of cloud node sizes you're willing to
    use, plus keyword overrides from the configuration.  Then you can pass
    job queues to servers_for_queue.  It will return a list of node sizes
    that would best satisfy the jobs, choosing the cheapest size that
    satisfies each job, and ignoring jobs that can't be satisfied.  Each
    ignored job is logged once (at debug level) with the reason it was
    skipped.
    """

    class CloudSizeWrapper(object):
        """Wrap a real cloud node size, applying configuration overrides.

        Copies id, name, ram, disk, bandwidth, price and extra from the real
        size; sets `cores` from the required 'cores' keyword and `scratch`
        from `disk`; then applies any remaining keywords as overrides of
        those attributes.  The wrapped object stays available as `real`.

        Raises KeyError if 'cores' is not given, ValueError if an override
        names an unknown field, and ValueError if the final `price` is None.
        """

        def __init__(self, real_size, **kwargs):
            self.real = real_size
            for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
                         'extra']:
                setattr(self, name, getattr(self.real, name))
            self.cores = kwargs.pop('cores')
            self.scratch = self.disk
            # .items() (not .iteritems()) so this also works on Python 3.
            for name, override in kwargs.items():
                if not hasattr(self, name):
                    raise ValueError("unrecognized size field '%s'" % (name,))
                setattr(self, name, override)

            if self.price is None:
                raise ValueError("Required field 'price' is None")

        def meets_constraints(self, **kwargs):
            """Return True if this size meets every requested minimum.

            Each keyword names a size attribute and the minimum value wanted.
            An attribute whose value is 0 satisfies any requirement
            (presumably meaning "unknown/unlimited" -- confirm with config
            docs).
            """
            for name, want_value in kwargs.items():
                have_value = getattr(self, name)
                if (have_value != 0) and (have_value < want_value):
                    return False
            return True


    def __init__(self, server_list, max_nodes=None, max_price=None):
        """Build the calculator.

        server_list: iterable of (real_size, overrides) pairs; each overrides
          dict is passed to CloudSizeWrapper and must contain 'cores'.
        max_nodes: most nodes a single job may request; a false value
          (None or 0) means no limit.
        max_price: most a single job's nodes may cost in total; a false value
          (None or 0) means no limit.
        """
        self.cloud_sizes = [self.CloudSizeWrapper(s, **kws)
                            for s, kws in server_list]
        # Sorted cheapest-first so the first matching size is the cheapest.
        self.cloud_sizes.sort(key=lambda s: s.price)
        self.max_nodes = max_nodes or float('inf')
        self.max_price = max_price or float('inf')
        self.logger = logging.getLogger('arvnodeman.jobqueue')
        # UUIDs of queued jobs already reported as unsatisfiable, so each
        # is logged only once while it stays in the queue.
        self.logged_jobs = set()

    @staticmethod
    def coerce_int(x, fallback):
        """Return int(x), or `fallback` if x is None or not convertible."""
        try:
            return int(x)
        except (TypeError, ValueError):
            return fallback

    def cloud_size_for_constraints(self, constraints):
        """Return the cheapest size meeting a job's constraints, or None.

        Reads min_cores_per_node, min_ram_mb_per_node and
        min_scratch_mb_per_node from `constraints`; missing or non-integer
        values count as 0 (no requirement).
        """
        want_value = lambda key: self.coerce_int(constraints.get(key), 0)
        wants = {'cores': want_value('min_cores_per_node'),
                 'ram': want_value('min_ram_mb_per_node'),
                 'scratch': want_value('min_scratch_mb_per_node')}
        for size in self.cloud_sizes:
            if size.meets_constraints(**wants):
                return size
        return None

    def servers_for_queue(self, queue):
        """Return the list of real cloud sizes wanted to run `queue`.

        For each job, picks the cheapest size meeting its runtime constraints
        and adds it min_nodes times (at least once).  A job is skipped when
        no size fits it, when it wants more than max_nodes nodes, or when
        its total price exceeds max_price; the reason is logged once per job
        until the job leaves the queue.
        """
        servers = []
        seen_jobs = set()
        for job in queue:
            job_uuid = job['uuid']
            seen_jobs.add(job_uuid)
            constraints = job['runtime_constraints']
            want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
            cloud_size = self.cloud_size_for_constraints(constraints)
            if cloud_size is None:
                self._log_unsatisfiable(
                    job_uuid, "no cloud node size meets its requirements")
            elif want_count > self.max_nodes:
                self._log_unsatisfiable(
                    job_uuid, "wants %s nodes, max_nodes is %s" %
                    (want_count, self.max_nodes))
            elif want_count * cloud_size.price <= self.max_price:
                servers.extend([cloud_size.real] * want_count)
            else:
                self._log_unsatisfiable(
                    job_uuid, "total price %s exceeds max_price %s" %
                    (want_count * cloud_size.price, self.max_price))
        # Forget jobs that have left the queue so they'd be logged again if
        # they ever reappear.
        self.logged_jobs.intersection_update(seen_jobs)
        return servers

    def _log_unsatisfiable(self, job_uuid, reason):
        """Log at debug level that a job can't be satisfied, once per job."""
        if job_uuid not in self.logged_jobs:
            self.logged_jobs.add(job_uuid)
            self.logger.debug("job %s not satisfiable: %s", job_uuid, reason)

    def cheapest_size(self):
        """Return the cheapest wrapped size (IndexError if none configured)."""
        return self.cloud_sizes[0]

    def find_size(self, sizeid):
        """Return the wrapped size whose id equals `sizeid`, or None."""
        for s in self.cloud_sizes:
            if s.id == sizeid:
                return s
        return None
94         return None
95
class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
    """Turn the Arvados job queue into node-size wishlists.

    Each poll fetches the job queue from the Arvados API and passes it to
    the ServerCalculator supplied at construction.  The resulting list of
    requested node sizes is what gets sent on to subscribers every poll.
    """

    CLIENT_ERRORS = ARVADOS_ERRORS

    def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
        super(JobQueueMonitorActor, self).__init__(
            client, timer_actor, *args, **kwargs)
        self._calculator = server_calc

    def _send_request(self):
        # Fetch the current job queue; only its 'items' list is used.
        queue_response = self._client.jobs().queue().execute()
        return queue_response['items']

    def _got_response(self, queue):
        # Convert the raw queue into the sizes we want, log them, and hand
        # the wishlist (not the raw queue) to the base class.
        wishlist = self._calculator.servers_for_queue(queue)
        size_names = ', '.join([size.name for size in wishlist])
        self._logger.debug("Calculated wishlist: %s", size_names or "(empty)")
        return super(JobQueueMonitorActor, self)._got_response(wishlist)