Merge branch '13078-badconstraints'
[arvados.git] / services / nodemanager / arvnodeman / jobqueue.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import logging
9 import re
10 import subprocess
11
12 import arvados.util
13
14 from . import clientactor
15 from .config import ARVADOS_ERRORS
16
17
class ServerCalculator(object):
    """Generate cloud server wishlists from an Arvados job queue.

    Instantiate this class with a list of cloud node sizes you're willing to
    use, plus keyword overrides from the configuration.  Then you can pass
    job queues to servers_for_queue.  It will return a list of node sizes
    that would best satisfy the jobs, choosing the cheapest size that
    satisfies each job, and ignoring jobs that can't be satisfied.
    """

    class CloudSizeWrapper(object):
        """Adapt a cloud provider size object for wishlist calculations.

        Copies the interesting attributes off the real (libcloud) size,
        normalizes units, and applies configuration overrides.
        """

        def __init__(self, real_size, node_mem_scaling, **kwargs):
            self.real = real_size
            for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
                         'extra']:
                setattr(self, name, getattr(self.real, name))
            self.cores = kwargs.pop('cores')
            # libcloud disk sizes are in GB, Arvados/SLURM are in MB
            # multiply by 1000 instead of 1024 to err on low side
            if self.disk is None:
                self.disk = 0
            self.scratch = self.disk * 1000
            # Reserve a fraction of advertised RAM for OS/node overhead.
            self.ram = int(self.ram * node_mem_scaling)
            # items() (not Python 2's iteritems()) works on both py2 and py3.
            for name, override in kwargs.items():
                if not hasattr(self, name):
                    raise ValueError("unrecognized size field '%s'" % (name,))
                setattr(self, name, override)

            if self.price is None:
                raise ValueError("Required field 'price' is None")

        def meets_constraints(self, **kwargs):
            """Return True if this size satisfies every requested minimum.

            An attribute value of 0 means "unknown/unlimited" here and is
            treated as satisfying any requested minimum.
            """
            for name, want_value in kwargs.items():
                have_value = getattr(self, name)
                if (have_value != 0) and (have_value < want_value):
                    return False
            return True


    def __init__(self, server_list, max_nodes=None, max_price=None,
                 node_mem_scaling=0.95):
        self.cloud_sizes = [self.CloudSizeWrapper(s, node_mem_scaling, **kws)
                            for s, kws in server_list]
        # Sorted cheapest-first, so linear scans below pick the least
        # expensive size that satisfies a job.
        self.cloud_sizes.sort(key=lambda s: s.price)
        self.max_nodes = max_nodes or float('inf')
        self.max_price = max_price or float('inf')
        self.logger = logging.getLogger('arvnodeman.jobqueue')

        self.logger.info("Using cloud node sizes:")
        for s in self.cloud_sizes:
            self.logger.info(str(s.__dict__))

    @staticmethod
    def coerce_int(x, fallback):
        """Return int(x), or fallback when x cannot be coerced."""
        try:
            return int(x)
        except (TypeError, ValueError):
            return fallback

    def cloud_size_for_constraints(self, constraints):
        """Return the cheapest size meeting the constraints, or None.

        If constraints include 'instance_type', only that exact size may
        be returned.
        """
        specified_size = constraints.get('instance_type')
        want_value = lambda key: self.coerce_int(constraints.get(key), 0)
        wants = {'cores': want_value('min_cores_per_node'),
                 'ram': want_value('min_ram_mb_per_node'),
                 'scratch': want_value('min_scratch_mb_per_node')}
        for size in self.cloud_sizes:
            if (size.meets_constraints(**wants) and
                (specified_size is None or size.id == specified_size)):
                    return size
        return None

    def servers_for_queue(self, queue):
        """Compute the node wishlist for a job queue.

        Returns a tuple (servers, unsatisfiable_jobs): servers is a list
        of real (unwrapped) size objects, one entry per wanted node;
        unsatisfiable_jobs maps job UUIDs to a human-readable reason the
        job cannot be scheduled.
        """
        servers = []
        unsatisfiable_jobs = {}
        for job in queue:
            constraints = job['runtime_constraints']
            want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
            cloud_size = self.cloud_size_for_constraints(constraints)
            if cloud_size is None:
                unsatisfiable_jobs[job['uuid']] = (
                    "Constraints cannot be satisfied by any node type")
            elif (want_count > self.max_nodes):
                unsatisfiable_jobs[job['uuid']] = (
                    "Job's min_nodes constraint is greater than the configured "
                    "max_nodes (%d)" % self.max_nodes)
            elif (want_count*cloud_size.price <= self.max_price):
                servers.extend([cloud_size.real] * want_count)
            else:
                # Prices are fractional dollars per hour, so format with
                # %s: the old %d truncated (e.g. 0.5 was reported as 0).
                unsatisfiable_jobs[job['uuid']] = (
                    "Job's price (%s) is above system's max_price "
                    "limit (%s)" % (want_count*cloud_size.price, self.max_price))
        return (servers, unsatisfiable_jobs)

    def cheapest_size(self):
        """Return the cheapest configured size (list is price-sorted)."""
        return self.cloud_sizes[0]

    def find_size(self, sizeid):
        """Return the wrapped size with the given id, or None."""
        for s in self.cloud_sizes:
            if s.id == sizeid:
                return s
        return None
119
120
class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
    """Actor to generate server wishlists from the job queue.

    This actor regularly polls Arvados' job queue, and uses the provided
    ServerCalculator to turn that into a list of requested node sizes.  That
    list is sent to subscribers on every poll.
    """

    CLIENT_ERRORS = ARVADOS_ERRORS

    def __init__(self, client, timer_actor, server_calc,
                 jobs_queue, slurm_queue, *args, **kwargs):
        super(JobQueueMonitorActor, self).__init__(
            client, timer_actor, *args, **kwargs)
        # Flags selecting which queues to poll for pending work.
        self.jobs_queue = jobs_queue
        self.slurm_queue = slurm_queue
        self._calculator = server_calc

    @staticmethod
    def coerce_to_mb(x):
        """Convert a SLURM size string (e.g. "4G", "1024M") to MB.

        A bare number with no unit suffix is taken to be MB already.
        """
        v, u = x[:-1], x[-1]
        if u in ("M", "m"):
            return int(v)
        elif u in ("G", "g"):
            return float(v) * 2**10
        elif u in ("T", "t"):
            return float(v) * 2**20
        elif u in ("P", "p"):
            return float(v) * 2**30
        else:
            return int(x)

    def _send_request(self):
        """Build the combined pending-job list from SLURM and/or Arvados.

        Returns a list of dicts shaped like Arvados queued jobs, each with
        'uuid' and 'runtime_constraints' keys, for servers_for_queue.
        """
        queuelist = []
        if self.slurm_queue:
            # cpus, memory, temporary disk space, reason, job name, feature constraints
            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j|%f"])
            for out in squeue_out.splitlines():
                try:
                    cpu, ram, disk, reason, jobname, features = out.split("|", 5)
                except ValueError:
                    self._logger.warning("ignored malformed line in squeue output: %r", out)
                    continue
                # Only consider jobs whose name embeds a container UUID
                # infix, and only those pending for a resource-related
                # (satisfiable-by-new-nodes) reason.
                if '-dz642-' not in jobname:
                    continue
                if not re.search(r'BadConstraints|ReqNodeNotAvail|Resources|Priority', reason):
                    continue

                for feature in features.split(','):
                    m = re.match(r'instancetype=(.*)', feature)
                    if not m:
                        continue
                    instance_type = m.group(1)
                    # Ignore cpu/ram/scratch requirements, bring up
                    # the requested node type.
                    queuelist.append({
                        "uuid": jobname,
                        "runtime_constraints": {
                            "instance_type": instance_type,
                        }
                    })
                    break
                else:
                    # No instance type specified. Choose a node type
                    # to suit cpu/ram/scratch requirements.
                    queuelist.append({
                        "uuid": jobname,
                        "runtime_constraints": {
                            "min_cores_per_node": cpu,
                            "min_ram_mb_per_node": self.coerce_to_mb(ram),
                            "min_scratch_mb_per_node": self.coerce_to_mb(disk)
                        }
                    })

        if self.jobs_queue:
            queuelist.extend(self._client.jobs().queue().execute()['items'])

        return queuelist

    def _got_response(self, queue):
        server_list, unsatisfiable_jobs = self._calculator.servers_for_queue(queue)
        # Cancel any job/container with unsatisfiable requirements, emitting
        # a log explaining why.
        # items() (not Python 2's iteritems()) works on both py2 and py3.
        for job_uuid, reason in unsatisfiable_jobs.items():
            try:
                self._client.logs().create(body={
                    'object_uuid': job_uuid,
                    'event_type': 'stderr',
                    'properties': {'text': reason},
                }).execute()
                # Cancel the job depending on its type
                if arvados.util.container_uuid_pattern.match(job_uuid):
                    subprocess.check_call(['scancel', '--name='+job_uuid])
                elif arvados.util.job_uuid_pattern.match(job_uuid):
                    self._client.jobs().cancel(uuid=job_uuid).execute()
                else:
                    raise Exception('Unknown job type')
                self._logger.debug("Cancelled unsatisfiable job '%s'", job_uuid)
            except Exception as error:
                # Best-effort: log and continue with remaining jobs.
                self._logger.error("Trying to cancel job '%s': %s",
                                   job_uuid,
                                   error)
        self._logger.debug("Calculated wishlist: %s",
                           ', '.join(s.name for s in server_list) or "(empty)")
        return super(JobQueueMonitorActor, self)._got_response(server_list)