Merge branch '11684-unsigned-locator-fix'
[arvados.git] / services / nodemanager / arvnodeman / jobqueue.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import logging
6 import subprocess
7
8 from . import clientactor
9 from .config import ARVADOS_ERRORS
10
11 class ServerCalculator(object):
12     """Generate cloud server wishlists from an Arvados job queue.
13
14     Instantiate this class with a list of cloud node sizes you're willing to
15     use, plus keyword overrides from the configuration.  Then you can pass
16     job queues to servers_for_queue.  It will return a list of node sizes
17     that would best satisfy the jobs, choosing the cheapest size that
18     satisfies each job, and ignoring jobs that can't be satisfied.
19     """
20
21     class CloudSizeWrapper(object):
22         def __init__(self, real_size, node_mem_scaling, **kwargs):
23             self.real = real_size
24             for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
25                          'extra']:
26                 setattr(self, name, getattr(self.real, name))
27             self.cores = kwargs.pop('cores')
28             # libcloud disk sizes are in GB, Arvados/SLURM are in MB
29             # multiply by 1000 instead of 1024 to err on low side
30             self.scratch = self.disk * 1000
31             self.ram = int(self.ram * node_mem_scaling)
32             for name, override in kwargs.iteritems():
33                 if not hasattr(self, name):
34                     raise ValueError("unrecognized size field '%s'" % (name,))
35                 setattr(self, name, override)
36
37             if self.price is None:
38                 raise ValueError("Required field 'price' is None")
39
40         def meets_constraints(self, **kwargs):
41             for name, want_value in kwargs.iteritems():
42                 have_value = getattr(self, name)
43                 if (have_value != 0) and (have_value < want_value):
44                     return False
45             return True
46
47
48     def __init__(self, server_list, max_nodes=None, max_price=None,
49                  node_mem_scaling=0.95):
50         self.cloud_sizes = [self.CloudSizeWrapper(s, node_mem_scaling, **kws)
51                             for s, kws in server_list]
52         self.cloud_sizes.sort(key=lambda s: s.price)
53         self.max_nodes = max_nodes or float('inf')
54         self.max_price = max_price or float('inf')
55         self.logger = logging.getLogger('arvnodeman.jobqueue')
56         self.logged_jobs = set()
57
58         self.logger.info("Using cloud node sizes:")
59         for s in self.cloud_sizes:
60             self.logger.info(str(s.__dict__))
61
62     @staticmethod
63     def coerce_int(x, fallback):
64         try:
65             return int(x)
66         except (TypeError, ValueError):
67             return fallback
68
69     def cloud_size_for_constraints(self, constraints):
70         want_value = lambda key: self.coerce_int(constraints.get(key), 0)
71         wants = {'cores': want_value('min_cores_per_node'),
72                  'ram': want_value('min_ram_mb_per_node'),
73                  'scratch': want_value('min_scratch_mb_per_node')}
74         for size in self.cloud_sizes:
75             if size.meets_constraints(**wants):
76                 return size
77         return None
78
79     def servers_for_queue(self, queue):
80         servers = []
81         seen_jobs = set()
82         for job in queue:
83             seen_jobs.add(job['uuid'])
84             constraints = job['runtime_constraints']
85             want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
86             cloud_size = self.cloud_size_for_constraints(constraints)
87             if cloud_size is None:
88                 if job['uuid'] not in self.logged_jobs:
89                     self.logged_jobs.add(job['uuid'])
90                     self.logger.debug("job %s not satisfiable", job['uuid'])
91             elif (want_count <= self.max_nodes) and (want_count*cloud_size.price <= self.max_price):
92                 servers.extend([cloud_size.real] * want_count)
93         self.logged_jobs.intersection_update(seen_jobs)
94         return servers
95
96     def cheapest_size(self):
97         return self.cloud_sizes[0]
98
99     def find_size(self, sizeid):
100         for s in self.cloud_sizes:
101             if s.id == sizeid:
102                 return s
103         return None
104
105 class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
106     """Actor to generate server wishlists from the job queue.
107
108     This actor regularly polls Arvados' job queue, and uses the provided
109     ServerCalculator to turn that into a list of requested node sizes.  That
110     list is sent to subscribers on every poll.
111     """
112
113     CLIENT_ERRORS = ARVADOS_ERRORS
114
115     def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
116         super(JobQueueMonitorActor, self).__init__(
117             client, timer_actor, *args, **kwargs)
118         self._calculator = server_calc
119
120     @staticmethod
121     def coerce_to_mb(x):
122         v, u = x[:-1], x[-1]
123         if u in ("M", "m"):
124             return int(v)
125         elif u in ("G", "g"):
126             return float(v) * 2**10
127         elif u in ("T", "t"):
128             return float(v) * 2**20
129         elif u in ("P", "p"):
130             return float(v) * 2**30
131         else:
132             return int(x)
133
134     def _send_request(self):
135         # cpus, memory, tempory disk space, reason, job name
136         squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
137         queuelist = []
138         for out in squeue_out.splitlines():
139             cpu, ram, disk, reason, jobname = out.split("|", 4)
140             if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
141                 queuelist.append({
142                     "uuid": jobname,
143                     "runtime_constraints": {
144                         "min_cores_per_node": cpu,
145                         "min_ram_mb_per_node": self.coerce_to_mb(ram),
146                         "min_scratch_mb_per_node": self.coerce_to_mb(disk)
147                     }
148                 })
149
150         queuelist.extend(self._client.jobs().queue().execute()['items'])
151
152         return queuelist
153
154     def _got_response(self, queue):
155         server_list = self._calculator.servers_for_queue(queue)
156         self._logger.debug("Calculated wishlist: %s",
157                            ', '.join(s.name for s in server_list) or "(empty)")
158         return super(JobQueueMonitorActor, self)._got_response(server_list)