13937: Export stats as prometheus metrics. (WIP)
[arvados.git] / services / nodemanager / arvnodeman / jobqueue.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import logging
9 import re
10 import subprocess32 as subprocess
11
12 import arvados.util
13
14 from . import clientactor
15 from .config import ARVADOS_ERRORS
16
17
class ServerCalculator(object):
    """Generate cloud server wishlists from an Arvados job queue.

    Instantiate this class with a list of cloud node sizes you're willing to
    use, plus keyword overrides from the configuration.  Then you can pass
    job queues to servers_for_queue.  It will return a list of node sizes
    that would best satisfy the jobs, choosing the cheapest size that
    satisfies each job, and ignoring jobs that can't be satisfied.
    """
    class InvalidCloudSize(object):
        """
        Dummy CloudSizeWrapper-like class, to be used when a cloud node doesn't
        have a recognizable arvados_node_size tag.
        """
        def __init__(self):
            self.id = 'invalid'
            self.name = 'invalid'
            self.ram = 0
            self.disk = 0
            self.scratch = 0
            self.cores = 0
            self.bandwidth = 0
            # price is multiplied by 1000 to get the node weight
            # the maximum node weight is                  4294967280
            # so use invalid node weight 4294967 * 1000 = 4294967000
            self.price = 4294967
            self.preemptible = False
            self.extra = {}

        def meets_constraints(self, **kwargs):
            # An unrecognized size can never satisfy a job's constraints.
            return False


    class CloudSizeWrapper(object):
        """Adapt a libcloud NodeSize-like object to the fields used here.

        Copies the interesting attributes off real_size, normalizes units
        (disk GB -> scratch MB), scales RAM down by node_mem_scaling, and
        applies any per-size configuration overrides given in kwargs.

        Raises ValueError for an unrecognized override name or if the
        resulting price is None.
        """
        def __init__(self, real_size, node_mem_scaling, **kwargs):
            self.real = real_size
            for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
                         'extra']:
                setattr(self, name, getattr(self.real, name))
            self.cores = kwargs.pop('cores')
            # libcloud disk sizes are in GB, Arvados/SLURM are in MB
            # multiply by 1000 instead of 1024 to err on low side
            if self.disk is None:
                self.disk = 0
            self.scratch = self.disk * 1000
            self.ram = int(self.ram * node_mem_scaling)
            self.preemptible = False
            # items() rather than the Python-2-only iteritems(), so this
            # code also runs under Python 3; the dict is tiny either way.
            for name, override in kwargs.items():
                if name == 'instance_type': continue
                if not hasattr(self, name):
                    raise ValueError("unrecognized size field '%s'" % (name,))
                setattr(self, name, override)

            if self.price is None:
                raise ValueError("Required field 'price' is None")

        def meets_constraints(self, **kwargs):
            """Return True if this size satisfies every requested minimum.

            An attribute value of 0 on the size means "unknown/unlimited"
            and is treated as satisfying any requested value.
            """
            for name, want_value in kwargs.items():
                have_value = getattr(self, name)
                if (have_value != 0) and (have_value < want_value):
                    return False
            return True


    def __init__(self, server_list, max_nodes=None, max_price=None,
                 node_mem_scaling=0.95):
        self.cloud_sizes = [self.CloudSizeWrapper(s, node_mem_scaling, **kws)
                            for s, kws in server_list]
        # Cheapest first, so the first size that matches is the cheapest one.
        self.cloud_sizes.sort(key=lambda s: s.price)
        self.max_nodes = max_nodes or float('inf')
        self.max_price = max_price or float('inf')
        self.logger = logging.getLogger('arvnodeman.jobqueue')

        self.logger.info("Using cloud node sizes:")
        for s in self.cloud_sizes:
            self.logger.info(str(s.__dict__))

    @staticmethod
    def coerce_int(x, fallback):
        """Return int(x), or fallback when x can't be coerced to an int."""
        try:
            return int(x)
        except (TypeError, ValueError):
            return fallback

    def cloud_size_for_constraints(self, constraints):
        """Return the cheapest size satisfying constraints, or None.

        If constraints names an 'instance_type', only a size whose id or
        name matches it is eligible.
        """
        specified_size = constraints.get('instance_type')
        want_value = lambda key: self.coerce_int(constraints.get(key), 0)
        wants = {'cores': want_value('min_cores_per_node'),
                 'ram': want_value('min_ram_mb_per_node'),
                 'scratch': want_value('min_scratch_mb_per_node')}
        # EC2 node sizes are identified by id. GCE sizes are identified by name.
        for size in self.cloud_sizes:
            if (size.meets_constraints(**wants) and
                (specified_size is None or
                 size.id == specified_size or size.name == specified_size)):
                return size
        return None

    def servers_for_queue(self, queue):
        """Build a server wishlist from a job queue.

        Returns a tuple (servers, unsatisfiable_jobs): servers is a list of
        cloud sizes with one entry per wanted node; unsatisfiable_jobs maps
        each rejected job's UUID to a human-readable reason.
        """
        servers = []
        unsatisfiable_jobs = {}
        for job in queue:
            constraints = job['runtime_constraints']
            want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
            cloud_size = self.cloud_size_for_constraints(constraints)
            if cloud_size is None:
                unsatisfiable_jobs[job['uuid']] = (
                    "Constraints cannot be satisfied by any node type")
            elif (want_count > self.max_nodes):
                unsatisfiable_jobs[job['uuid']] = (
                    "Job's min_nodes constraint is greater than the configured "
                    "max_nodes (%d)" % self.max_nodes)
            elif (want_count*cloud_size.price <= self.max_price):
                servers.extend([cloud_size] * want_count)
            else:
                unsatisfiable_jobs[job['uuid']] = (
                    "Job's price (%d) is above system's max_price "
                    "limit (%d)" % (want_count*cloud_size.price, self.max_price))
        return (servers, unsatisfiable_jobs)

    def cheapest_size(self):
        """Return the cheapest configured size (the list is price-sorted)."""
        return self.cloud_sizes[0]

    def find_size(self, sizeid):
        """Return the size wrapper with the given id, or an InvalidCloudSize."""
        for s in self.cloud_sizes:
            if s.id == sizeid:
                return s
        return self.InvalidCloudSize()
146
147
class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
    """Actor to generate server wishlists from the job queue.

    This actor regularly polls Arvados' job queue, and uses the provided
    ServerCalculator to turn that into a list of requested node sizes.  That
    list is sent to subscribers on every poll.
    """

    CLIENT_ERRORS = ARVADOS_ERRORS

    def __init__(self, client, timer_actor, server_calc,
                 jobs_queue, slurm_queue, *args, **kwargs):
        super(JobQueueMonitorActor, self).__init__(
            client, timer_actor, *args, **kwargs)
        # Flags selecting which queues to poll (Arvados jobs API, SLURM).
        self.jobs_queue = jobs_queue
        self.slurm_queue = slurm_queue
        self._calculator = server_calc

    @staticmethod
    def coerce_to_mb(x):
        """Convert a SLURM size string such as '4G' to megabytes.

        Recognizes M/G/T/P suffixes (case-insensitive), scaling by binary
        multiples (2**10 per step).  A bare number is assumed to already
        be in MB and is returned as an int.
        """
        v, u = x[:-1], x[-1]
        if u in ("M", "m"):
            return int(v)
        elif u in ("G", "g"):
            return float(v) * 2**10
        elif u in ("T", "t"):
            return float(v) * 2**20
        elif u in ("P", "p"):
            return float(v) * 2**30
        else:
            return int(x)

    def _send_request(self):
        """Collect the pending-work list from SLURM and/or the jobs API.

        Returns a list of job dicts with 'uuid', 'runtime_constraints',
        and (for SLURM entries) 'priority' keys, ready to hand to the
        ServerCalculator.
        """
        queuelist = []
        if self.slurm_queue:
            # cpus, memory, tempory disk space, reason, job name, feature constraints, priority
            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j|%f|%Q"])
            for out in squeue_out.splitlines():
                try:
                    cpu, ram, disk, reason, jobname, features, priority = out.split("|", 6)
                except ValueError:
                    self._logger.warning("ignored malformed line in squeue output: %r", out)
                    continue
                # Only container jobs (uuids contain '-dz642-') are of interest.
                if '-dz642-' not in jobname:
                    continue
                # Only consider jobs pending for reasons a new node could fix.
                if not re.search(r'BadConstraints|ReqNodeNotAvail|Resources|Priority', reason):
                    continue

                for feature in features.split(','):
                    m = re.match(r'instancetype=(.*)', feature)
                    if not m:
                        continue
                    instance_type = m.group(1)
                    # Ignore cpu/ram/scratch requirements, bring up
                    # the requested node type.
                    queuelist.append({
                        "uuid": jobname,
                        "runtime_constraints": {
                            "instance_type": instance_type,
                        },
                        "priority": int(priority)
                    })
                    break
                else:
                    # No instance type specified. Choose a node type
                    # to suit cpu/ram/scratch requirements.
                    queuelist.append({
                        "uuid": jobname,
                        "runtime_constraints": {
                            "min_cores_per_node": cpu,
                            "min_ram_mb_per_node": self.coerce_to_mb(ram),
                            "min_scratch_mb_per_node": self.coerce_to_mb(disk)
                        },
                        "priority": int(priority)
                    })
            # Highest-priority work first.
            queuelist.sort(key=lambda x: x.get('priority', 1), reverse=True)

        if self.jobs_queue:
            queuelist.extend(self._client.jobs().queue().execute()['items'])

        return queuelist

    def _got_response(self, queue):
        server_list, unsatisfiable_jobs = self._calculator.servers_for_queue(queue)
        # Cancel any job/container with unsatisfiable requirements, emitting
        # a log explaining why.
        # items() rather than the Python-2-only iteritems(), so this code
        # also runs under Python 3.
        for job_uuid, reason in unsatisfiable_jobs.items():
            try:
                self._client.logs().create(body={
                    'object_uuid': job_uuid,
                    'event_type': 'stderr',
                    'properties': {'text': reason},
                }).execute()
                # Cancel the job depending on its type
                if arvados.util.container_uuid_pattern.match(job_uuid):
                    subprocess.check_call(['scancel', '--name='+job_uuid])
                elif arvados.util.job_uuid_pattern.match(job_uuid):
                    self._client.jobs().cancel(uuid=job_uuid).execute()
                else:
                    raise Exception('Unknown job type')
                self._logger.debug("Cancelled unsatisfiable job '%s'", job_uuid)
            except Exception as error:
                # Best-effort: log the failure and keep processing the rest.
                self._logger.error("Trying to cancel job '%s': %s",
                                   job_uuid,
                                   error)
        self._logger.debug("Calculated wishlist: %s",
                           ', '.join(s.id for s in server_list) or "(empty)")
        return super(JobQueueMonitorActor, self)._got_response(server_list)