[arvados.git] / services / nodemanager / arvnodeman / jobqueue.py
#!/usr/bin/env python
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0

from __future__ import absolute_import, print_function

import logging
import re
import subprocess

import arvados.util

from . import clientactor
from .config import ARVADOS_ERRORS


class ServerCalculator(object):
    """Generate cloud server wishlists from an Arvados job queue.

    Instantiate this class with a list of cloud node sizes you're willing to
    use, plus keyword overrides from the configuration.  Then you can pass
    job queues to servers_for_queue.  It will return a list of node sizes
    that would best satisfy the jobs, choosing the cheapest size that
    satisfies each job, and ignoring jobs that can't be satisfied.
    """

    class CloudSizeWrapper(object):
        def __init__(self, real_size, node_mem_scaling, **kwargs):
            self.real = real_size
            for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
                         'extra']:
                setattr(self, name, getattr(self.real, name))
            self.cores = kwargs.pop('cores')
            # libcloud disk sizes are in GB, Arvados/SLURM are in MB
            # multiply by 1000 instead of 1024 to err on low side
            if self.disk is None:
                self.disk = 0
            self.scratch = self.disk * 1000
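            # Apply the configured node_mem_scaling factor (0.95 by default)
            # as a safety margin: the RAM actually visible to the node's
            # OS/SLURM is somewhat less than the size advertised by the
            # cloud provider.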
            self.ram = int(self.ram * node_mem_scaling)
            for name, override in kwargs.iteritems():
                if not hasattr(self, name):
                    raise ValueError("unrecognized size field '%s'" % (name,))
                setattr(self, name, override)

            if self.price is None:
                raise ValueError("Required field 'price' is None")

        def meets_constraints(self, **kwargs):
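            # A zero-valued attribute (e.g. scratch when the size reports no
            # disk) is treated as "no information" and never disqualifies the
            # size; otherwise the size must meet or exceed each requested
            # minimum.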
            for name, want_value in kwargs.iteritems():
                have_value = getattr(self, name)
                if (have_value != 0) and (have_value < want_value):
                    return False
            return True


    def __init__(self, server_list, max_nodes=None, max_price=None,
                 node_mem_scaling=0.95):
        self.cloud_sizes = [self.CloudSizeWrapper(s, node_mem_scaling, **kws)
                            for s, kws in server_list]
        self.cloud_sizes.sort(key=lambda s: s.price)
        self.max_nodes = max_nodes or float('inf')
        self.max_price = max_price or float('inf')
        self.logger = logging.getLogger('arvnodeman.jobqueue')

        self.logger.info("Using cloud node sizes:")
        for s in self.cloud_sizes:
            self.logger.info(str(s.__dict__))

    @staticmethod
    def coerce_int(x, fallback):
        try:
            return int(x)
        except (TypeError, ValueError):
            return fallback

    def cloud_size_for_constraints(self, constraints):
        specified_size = constraints.get('instance_type')
        want_value = lambda key: self.coerce_int(constraints.get(key), 0)
        wants = {'cores': want_value('min_cores_per_node'),
                 'ram': want_value('min_ram_mb_per_node'),
                 'scratch': want_value('min_scratch_mb_per_node')}
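        # cloud_sizes is sorted by ascending price, so the first size that
        # satisfies the constraints is also the cheapest one.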
        for size in self.cloud_sizes:
            if (size.meets_constraints(**wants) and
                (specified_size is None or size.id == specified_size)):
                    return size
        return None

    def servers_for_queue(self, queue):
        servers = []
        unsatisfiable_jobs = {}
        for job in queue:
            constraints = job['runtime_constraints']
            want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
            cloud_size = self.cloud_size_for_constraints(constraints)
            if cloud_size is None:
                unsatisfiable_jobs[job['uuid']] = (
                    "Constraints cannot be satisfied by any node type")
            elif (want_count > self.max_nodes):
                unsatisfiable_jobs[job['uuid']] = (
                    "Job's min_nodes constraint is greater than the configured "
                    "max_nodes (%d)" % self.max_nodes)
            elif (want_count*cloud_size.price <= self.max_price):
                servers.extend([cloud_size.real] * want_count)
            else:
                unsatisfiable_jobs[job['uuid']] = (
                    "Job's price (%s) is above system's max_price "
                    "limit (%s)" % (want_count*cloud_size.price, self.max_price))
        return (servers, unsatisfiable_jobs)

    def cheapest_size(self):
        return self.cloud_sizes[0]

    def find_size(self, sizeid):
        for s in self.cloud_sizes:
            if s.id == sizeid:
                return s
        return None


class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
    """Actor to generate server wishlists from the job queue.

    This actor regularly polls Arvados' job queue, and uses the provided
    ServerCalculator to turn that into a list of requested node sizes.  That
    list is sent to subscribers on every poll.
    """

    CLIENT_ERRORS = ARVADOS_ERRORS

    def __init__(self, client, timer_actor, server_calc,
                 jobs_queue, slurm_queue, *args, **kwargs):
        super(JobQueueMonitorActor, self).__init__(
            client, timer_actor, *args, **kwargs)
        self.jobs_queue = jobs_queue
        self.slurm_queue = slurm_queue
        self._calculator = server_calc

    @staticmethod
    def coerce_to_mb(x):
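        # Convert a SLURM size string to MB, e.g. "16000M" -> 16000 and
        # "4G" -> 4096.0; a bare number is assumed to already be in MB.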
        v, u = x[:-1], x[-1]
        if u in ("M", "m"):
            return int(v)
        elif u in ("G", "g"):
            return float(v) * 2**10
        elif u in ("T", "t"):
            return float(v) * 2**20
        elif u in ("P", "p"):
            return float(v) * 2**30
        else:
            return int(x)

    def _send_request(self):
        queuelist = []
        if self.slurm_queue:
            # cpus, memory, temporary disk space, reason, job name, feature constraints, priority
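            # An illustrative (hypothetical) pending entry in this format:
            # 2|7168M|0|Resources|zzzzz-dz642-abcdefghijklmno|instancetype=m4.large|1000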
            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j|%f|%Q"])
            for out in squeue_out.splitlines():
                try:
                    cpu, ram, disk, reason, jobname, features, priority = out.split("|", 6)
                except ValueError:
                    self._logger.warning("ignored malformed line in squeue output: %r", out)
                    continue
                if '-dz642-' not in jobname:
                    continue
                if not re.search(r'BadConstraints|ReqNodeNotAvail|Resources|Priority', reason):
                    continue

                for feature in features.split(','):
                    m = re.match(r'instancetype=(.*)', feature)
                    if not m:
                        continue
                    instance_type = m.group(1)
                    # Ignore cpu/ram/scratch requirements, bring up
                    # the requested node type.
                    queuelist.append({
                        "uuid": jobname,
                        "runtime_constraints": {
                            "instance_type": instance_type,
                        },
                        "priority": int(priority)
                    })
                    break
                else:
                    # No instance type specified. Choose a node type
                    # to suit cpu/ram/scratch requirements.
                    queuelist.append({
                        "uuid": jobname,
                        "runtime_constraints": {
                            "min_cores_per_node": cpu,
                            "min_ram_mb_per_node": self.coerce_to_mb(ram),
                            "min_scratch_mb_per_node": self.coerce_to_mb(disk)
                        },
                        "priority": int(priority)
                    })
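            # Order the wishlist by descending SLURM priority so the
            # highest-priority pending jobs are considered first.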
            queuelist.sort(key=lambda x: x.get('priority', 1), reverse=True)

        if self.jobs_queue:
            queuelist.extend(self._client.jobs().queue().execute()['items'])

        return queuelist

    def _got_response(self, queue):
        server_list, unsatisfiable_jobs = self._calculator.servers_for_queue(queue)
        # Cancel any job/container with unsatisfiable requirements, emitting
        # a log explaining why.
        for job_uuid, reason in unsatisfiable_jobs.iteritems():
            try:
                self._client.logs().create(body={
                    'object_uuid': job_uuid,
                    'event_type': 'stderr',
                    'properties': {'text': reason},
                }).execute()
                # Cancel the job depending on its type
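                # (containers run as SLURM jobs named after their UUID, so
                # they are cancelled with scancel; crunch jobs are cancelled
                # through the Arvados API).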
                if arvados.util.container_uuid_pattern.match(job_uuid):
                    subprocess.check_call(['scancel', '--name='+job_uuid])
                elif arvados.util.job_uuid_pattern.match(job_uuid):
                    self._client.jobs().cancel(uuid=job_uuid).execute()
                else:
                    raise Exception('Unknown job type')
                self._logger.debug("Cancelled unsatisfiable job '%s'", job_uuid)
            except Exception as error:
                self._logger.error("Trying to cancel job '%s': %s",
                                   job_uuid,
                                   error)
        self._logger.debug("Calculated wishlist: %s",
                           ', '.join(s.name for s in server_list) or "(empty)")
        return super(JobQueueMonitorActor, self)._got_response(server_list)