Merge branch 'master' into 13822-nm-delayed-daemon
[arvados.git] / services / nodemanager / arvnodeman / jobqueue.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import logging
9 import re
10 import subprocess32 as subprocess
11
12 import arvados.util
13
14 from . import clientactor
15 from .config import ARVADOS_ERRORS
16
17
class ServerCalculator(object):
    """Generate cloud server wishlists from an Arvados job queue.

    Instantiate this class with a list of cloud node sizes you're willing to
    use, plus keyword overrides from the configuration.  Then you can pass
    job queues to servers_for_queue.  It will return a list of node sizes
    that would best satisfy the jobs, choosing the cheapest size that
    satisfies each job, and ignoring jobs that can't be satisfied.
    """
    class InvalidCloudSize(object):
        """
        Dummy CloudSizeWrapper-like class, to be used when a cloud node doesn't
        have a recognizable arvados_node_size tag.
        """
        def __init__(self):
            self.id = 'invalid'
            self.name = 'invalid'
            self.ram = 0
            self.disk = 0
            self.scratch = 0
            self.cores = 0
            self.bandwidth = 0
            self.price = 9999999
            self.preemptible = False
            self.extra = {}

        def meets_constraints(self, **kwargs):
            """Return False for every set of constraints."""
            return False


    class CloudSizeWrapper(object):
        """Copy of a cloud driver size object with configuration overrides.

        The wrapped object is kept in ``self.real``.  Its ``id``, ``name``,
        ``ram``, ``disk``, ``bandwidth``, ``price`` and ``extra`` attributes
        are copied onto the wrapper, ``ram`` is scaled by
        ``node_mem_scaling``, and ``scratch`` is derived from ``disk``.
        """
        def __init__(self, real_size, node_mem_scaling, **kwargs):
            """Wrap ``real_size``.

            ``kwargs`` must contain ``cores``.  Any other keyword overrides
            the attribute of the same name, except ``instance_type``, which
            is skipped.

            Raises KeyError if ``cores`` is missing, and ValueError if an
            override names an attribute the wrapper doesn't have or if the
            resulting price is None.
            """
            self.real = real_size
            for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
                         'extra']:
                setattr(self, name, getattr(self.real, name))
            self.cores = kwargs.pop('cores')
            # libcloud disk sizes are in GB, Arvados/SLURM are in MB
            # multiply by 1000 instead of 1024 to err on low side
            if self.disk is None:
                self.disk = 0
            self.scratch = self.disk * 1000
            self.ram = int(self.ram * node_mem_scaling)
            # Set before applying overrides so the configuration is allowed
            # to override 'preemptible' too.
            self.preemptible = False
            # items() (not iteritems()) works on both Python 2 and Python 3.
            for name, override in kwargs.items():
                if name == 'instance_type': continue
                if not hasattr(self, name):
                    raise ValueError("unrecognized size field '%s'" % (name,))
                setattr(self, name, override)

            if self.price is None:
                raise ValueError("Required field 'price' is None")

        def meets_constraints(self, **kwargs):
            """Return True if this size satisfies every minimum in kwargs.

            Each keyword names an attribute of this wrapper and gives the
            minimum wanted value.  An attribute whose value is 0 never
            disqualifies the size.
            """
            for name, want_value in kwargs.items():
                have_value = getattr(self, name)
                if (have_value != 0) and (have_value < want_value):
                    return False
            return True


    def __init__(self, server_list, max_nodes=None, max_price=None,
                 node_mem_scaling=0.95):
        """Build the calculator.

        ``server_list`` is an iterable of ``(size, overrides)`` pairs; each
        pair is turned into a CloudSizeWrapper.  The wrapped sizes are kept
        sorted by ascending price.  A falsy ``max_nodes`` or ``max_price``
        means "no limit".
        """
        self.cloud_sizes = [self.CloudSizeWrapper(s, node_mem_scaling, **kws)
                            for s, kws in server_list]
        self.cloud_sizes.sort(key=lambda s: s.price)
        self.max_nodes = max_nodes or float('inf')
        self.max_price = max_price or float('inf')
        self.logger = logging.getLogger('arvnodeman.jobqueue')

        self.logger.info("Using cloud node sizes:")
        for s in self.cloud_sizes:
            self.logger.info(str(s.__dict__))

    @staticmethod
    def coerce_int(x, fallback):
        """Return ``int(x)``, or ``fallback`` if x can't be converted."""
        try:
            return int(x)
        except (TypeError, ValueError):
            return fallback

    def cloud_size_for_constraints(self, constraints):
        """Return the cheapest size satisfying ``constraints``, or None.

        ``constraints`` is a runtime-constraints dict.  Missing or
        non-integer minimums count as 0.  If ``instance_type`` is given,
        only a size whose id or name equals it can be returned.
        """
        specified_size = constraints.get('instance_type')
        want_value = lambda key: self.coerce_int(constraints.get(key), 0)
        wants = {'cores': want_value('min_cores_per_node'),
                 'ram': want_value('min_ram_mb_per_node'),
                 'scratch': want_value('min_scratch_mb_per_node')}
        # EC2 node sizes are identified by id. GCE sizes are identified by name.
        # cloud_sizes is sorted by price, so the first match is the cheapest.
        for size in self.cloud_sizes:
            if (size.meets_constraints(**wants) and
                (specified_size is None or
                    size.id == specified_size or size.name == specified_size)):
                        return size
        return None

    def servers_for_queue(self, queue):
        """Turn a job queue into a wishlist of node sizes.

        Returns ``(servers, unsatisfiable_jobs)``: ``servers`` is a list
        with one size entry per wanted node, and ``unsatisfiable_jobs`` maps
        the uuid of each job that can't be served to a human-readable
        reason.
        """
        servers = []
        unsatisfiable_jobs = {}
        for job in queue:
            constraints = job['runtime_constraints']
            want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
            cloud_size = self.cloud_size_for_constraints(constraints)
            if cloud_size is None:
                unsatisfiable_jobs[job['uuid']] = (
                    "Constraints cannot be satisfied by any node type")
            elif (want_count > self.max_nodes):
                unsatisfiable_jobs[job['uuid']] = (
                    "Job's min_nodes constraint is greater than the configured "
                    "max_nodes (%d)" % self.max_nodes)
            elif (want_count*cloud_size.price <= self.max_price):
                servers.extend([cloud_size] * want_count)
            else:
                # %g rather than %d: prices may be fractional, and %d would
                # truncate them (e.g. report 0.9 as 0).
                unsatisfiable_jobs[job['uuid']] = (
                    "Job's price (%g) is above system's max_price "
                    "limit (%g)" % (want_count*cloud_size.price, self.max_price))
        return (servers, unsatisfiable_jobs)

    def cheapest_size(self):
        """Return the lowest-priced configured size."""
        return self.cloud_sizes[0]

    def find_size(self, sizeid):
        """Return the configured size whose id is ``sizeid``.

        Returns a new InvalidCloudSize if no configured size has that id.
        """
        for s in self.cloud_sizes:
            if s.id == sizeid:
                return s
        return self.InvalidCloudSize()
143
144
class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
    """Actor to generate server wishlists from the job queue.

    This actor regularly polls Arvados' job queue, and uses the provided
    ServerCalculator to turn that into a list of requested node sizes.  That
    list is sent to subscribers on every poll.
    """

    CLIENT_ERRORS = ARVADOS_ERRORS

    def __init__(self, client, timer_actor, server_calc,
                 jobs_queue, slurm_queue, *args, **kwargs):
        """Set up the actor.

        ``jobs_queue`` and ``slurm_queue`` are flags selecting which sources
        _send_request reads: the Arvados jobs API and/or the output of
        ``squeue``.  ``server_calc`` is the calculator used by _got_response.
        Remaining arguments are passed to the parent class.
        """
        super(JobQueueMonitorActor, self).__init__(
            client, timer_actor, *args, **kwargs)
        self.jobs_queue = jobs_queue
        self.slurm_queue = slurm_queue
        self._calculator = server_calc

    @staticmethod
    def coerce_to_mb(x):
        """Convert a size string with an optional unit suffix to megabytes.

        A trailing M, G, T or P (either case) selects the unit; a value
        without one of those suffixes is parsed as an integer number of
        megabytes.  M and suffix-less values return an int, the larger
        units return a float.  Raises ValueError if the numeric part can't
        be parsed, and IndexError if x is empty.
        """
        v, u = x[:-1], x[-1]
        if u in ("M", "m"):
            return int(v)
        elif u in ("G", "g"):
            return float(v) * 2**10
        elif u in ("T", "t"):
            return float(v) * 2**20
        elif u in ("P", "p"):
            return float(v) * 2**30
        else:
            return int(x)

    def _squeue_job(self, cpu, ram, disk, jobname, features, priority):
        """Build one queue entry from the fields of a pending squeue line.

        Raises ValueError (or IndexError for an empty size field) when the
        priority, memory or disk field can't be parsed.
        """
        priority = int(priority)
        for feature in features.split(','):
            m = re.match(r'instancetype=(.*)', feature)
            if m:
                # Ignore cpu/ram/scratch requirements, bring up
                # the requested node type.
                return {
                    "uuid": jobname,
                    "runtime_constraints": {
                        "instance_type": m.group(1),
                    },
                    "priority": priority
                }
        # No instance type specified. Choose a node type
        # to suit cpu/ram/scratch requirements.
        return {
            "uuid": jobname,
            "runtime_constraints": {
                "min_cores_per_node": cpu,
                "min_ram_mb_per_node": self.coerce_to_mb(ram),
                "min_scratch_mb_per_node": self.coerce_to_mb(disk)
            },
            "priority": priority
        }

    def _send_request(self):
        """Collect the current queue and return it as a list of job dicts.

        When ``slurm_queue`` is set, pending squeue entries are included
        first, ordered by descending priority.  When ``jobs_queue`` is set,
        the items returned by the Arvados jobs queue API are appended.
        """
        queuelist = []
        if self.slurm_queue:
            # cpus, memory, temporary disk space, reason, job name, feature constraints, priority
            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j|%f|%Q"])
            for out in squeue_out.splitlines():
                try:
                    cpu, ram, disk, reason, jobname, features, priority = out.split("|", 6)
                except ValueError:
                    self._logger.warning("ignored malformed line in squeue output: %r", out)
                    continue
                if '-dz642-' not in jobname:
                    continue
                if not re.search(r'BadConstraints|ReqNodeNotAvail|Resources|Priority', reason):
                    continue

                try:
                    queuelist.append(self._squeue_job(
                        cpu, ram, disk, jobname, features, priority))
                except (ValueError, IndexError):
                    # One unparsable numeric field must not abort the whole
                    # poll; skip the line like any other malformed one.
                    self._logger.warning("ignored malformed line in squeue output: %r", out)
                    continue
            queuelist.sort(key=lambda x: x.get('priority', 1), reverse=True)

        if self.jobs_queue:
            queuelist.extend(self._client.jobs().queue().execute()['items'])

        return queuelist

    def _got_response(self, queue):
        """Compute the wishlist for ``queue`` and pass it to the parent class.

        Jobs the calculator reports as unsatisfiable get a log entry with
        the reason and are then cancelled.  Failures while doing so are
        logged and do not stop processing of the remaining jobs.
        """
        server_list, unsatisfiable_jobs = self._calculator.servers_for_queue(queue)
        # Cancel any job/container with unsatisfiable requirements, emitting
        # a log explaining why.
        # items() (not iteritems()) works on both Python 2 and Python 3.
        for job_uuid, reason in unsatisfiable_jobs.items():
            try:
                self._client.logs().create(body={
                    'object_uuid': job_uuid,
                    'event_type': 'stderr',
                    'properties': {'text': reason},
                }).execute()
                # Cancel the job depending on its type
                if arvados.util.container_uuid_pattern.match(job_uuid):
                    subprocess.check_call(['scancel', '--name='+job_uuid])
                elif arvados.util.job_uuid_pattern.match(job_uuid):
                    self._client.jobs().cancel(uuid=job_uuid).execute()
                else:
                    raise Exception('Unknown job type')
                self._logger.debug("Cancelled unsatisfiable job '%s'", job_uuid)
            except Exception as error:
                self._logger.error("Trying to cancel job '%s': %s",
                                   job_uuid,
                                   error)
        self._logger.debug("Calculated wishlist: %s",
                           ', '.join(s.id for s in server_list) or "(empty)")
        return super(JobQueueMonitorActor, self)._got_response(server_list)