7475: Merge branch 'master' into 7475-nodemgr-unsatisfiable-job-comms
[arvados.git] / services / nodemanager / arvnodeman / jobqueue.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import logging
9 import subprocess
10
11 import arvados.util
12
13 from . import clientactor
14 from .config import ARVADOS_ERRORS
15
16
17 class ServerCalculator(object):
18     """Generate cloud server wishlists from an Arvados job queue.
19
20     Instantiate this class with a list of cloud node sizes you're willing to
21     use, plus keyword overrides from the configuration.  Then you can pass
22     job queues to servers_for_queue.  It will return a list of node sizes
23     that would best satisfy the jobs, choosing the cheapest size that
24     satisfies each job, and ignoring jobs that can't be satisfied.
25     """
26
27     class CloudSizeWrapper(object):
28         def __init__(self, real_size, node_mem_scaling, **kwargs):
29             self.real = real_size
30             for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
31                          'extra']:
32                 setattr(self, name, getattr(self.real, name))
33             self.cores = kwargs.pop('cores')
34             # libcloud disk sizes are in GB, Arvados/SLURM are in MB
35             # multiply by 1000 instead of 1024 to err on low side
36             if self.disk is None:
37                 self.disk = 0
38             self.scratch = self.disk * 1000
39             self.ram = int(self.ram * node_mem_scaling)
40             for name, override in kwargs.iteritems():
41                 if not hasattr(self, name):
42                     raise ValueError("unrecognized size field '%s'" % (name,))
43                 setattr(self, name, override)
44
45             if self.price is None:
46                 raise ValueError("Required field 'price' is None")
47
48         def meets_constraints(self, **kwargs):
49             for name, want_value in kwargs.iteritems():
50                 have_value = getattr(self, name)
51                 if (have_value != 0) and (have_value < want_value):
52                     return False
53             return True
54
55
56     def __init__(self, server_list, max_nodes=None, max_price=None,
57                  node_mem_scaling=0.95):
58         self.cloud_sizes = [self.CloudSizeWrapper(s, node_mem_scaling, **kws)
59                             for s, kws in server_list]
60         self.cloud_sizes.sort(key=lambda s: s.price)
61         self.max_nodes = max_nodes or float('inf')
62         self.max_price = max_price or float('inf')
63         self.logger = logging.getLogger('arvnodeman.jobqueue')
64
65         self.logger.info("Using cloud node sizes:")
66         for s in self.cloud_sizes:
67             self.logger.info(str(s.__dict__))
68
69     @staticmethod
70     def coerce_int(x, fallback):
71         try:
72             return int(x)
73         except (TypeError, ValueError):
74             return fallback
75
76     def cloud_size_for_constraints(self, constraints):
77         want_value = lambda key: self.coerce_int(constraints.get(key), 0)
78         wants = {'cores': want_value('min_cores_per_node'),
79                  'ram': want_value('min_ram_mb_per_node'),
80                  'scratch': want_value('min_scratch_mb_per_node')}
81         for size in self.cloud_sizes:
82             if size.meets_constraints(**wants):
83                 return size
84         return None
85
86     def servers_for_queue(self, queue):
87         servers = []
88         unsatisfiable_jobs = {}
89         for job in queue:
90             constraints = job['runtime_constraints']
91             want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
92             cloud_size = self.cloud_size_for_constraints(constraints)
93             if cloud_size is None:
94                 unsatisfiable_jobs[job['uuid']] = (
95                     'Requirements for a single node exceed the available '
96                     'cloud node size')
97             elif (want_count > self.max_nodes):
98                 unsatisfiable_jobs[job['uuid']] = (
99                     "Job's min_nodes constraint is greater than the configured "
100                     "max_nodes (%d)" % self.max_nodes)
101             elif (want_count*cloud_size.price <= self.max_price):
102                 servers.extend([cloud_size.real] * want_count)
103             else:
104                 unsatisfiable_jobs[job['uuid']] = (
105                     "Job's price (%d) is above system's max_price "
106                     "limit (%d)" % (want_count*cloud_size.price, self.max_price))
107         return (servers, unsatisfiable_jobs)
108
109     def cheapest_size(self):
110         return self.cloud_sizes[0]
111
112     def find_size(self, sizeid):
113         for s in self.cloud_sizes:
114             if s.id == sizeid:
115                 return s
116         return None
117
118
119 class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
120     """Actor to generate server wishlists from the job queue.
121
122     This actor regularly polls Arvados' job queue, and uses the provided
123     ServerCalculator to turn that into a list of requested node sizes.  That
124     list is sent to subscribers on every poll.
125     """
126
127     CLIENT_ERRORS = ARVADOS_ERRORS
128
129     def __init__(self, client, timer_actor, server_calc,
130                  jobs_queue, slurm_queue, *args, **kwargs):
131         super(JobQueueMonitorActor, self).__init__(
132             client, timer_actor, *args, **kwargs)
133         self.jobs_queue = jobs_queue
134         self.slurm_queue = slurm_queue
135         self._calculator = server_calc
136
137     @staticmethod
138     def coerce_to_mb(x):
139         v, u = x[:-1], x[-1]
140         if u in ("M", "m"):
141             return int(v)
142         elif u in ("G", "g"):
143             return float(v) * 2**10
144         elif u in ("T", "t"):
145             return float(v) * 2**20
146         elif u in ("P", "p"):
147             return float(v) * 2**30
148         else:
149             return int(x)
150
151     def _send_request(self):
152         queuelist = []
153         if self.slurm_queue:
154             # cpus, memory, tempory disk space, reason, job name
155             squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
156             for out in squeue_out.splitlines():
157                 try:
158                     cpu, ram, disk, reason, jobname = out.split("|", 4)
159                     if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
160                         queuelist.append({
161                             "uuid": jobname,
162                             "runtime_constraints": {
163                                 "min_cores_per_node": cpu,
164                                 "min_ram_mb_per_node": self.coerce_to_mb(ram),
165                                 "min_scratch_mb_per_node": self.coerce_to_mb(disk)
166                             }
167                         })
168                 except ValueError:
169                     pass
170
171         if self.jobs_queue:
172             queuelist.extend(self._client.jobs().queue().execute()['items'])
173
174         return queuelist
175
176     def _got_response(self, queue):
177         server_list, unsatisfiable_jobs = self._calculator.servers_for_queue(queue)
178         # Cancel any job/container with unsatisfiable requirements, emitting
179         # a log explaining why.
180         for job_uuid, reason in unsatisfiable_jobs.iteritems():
181             self._logger.debug("Cancelling unsatisfiable job '%s'", job_uuid)
182             try:
183                 self._client.logs().create(body={
184                     'object_uuid': job_uuid,
185                     'event_type': 'stderr',
186                     'properties': {'text': reason},
187                 }).execute()
188                 # Cancel the job depending on it type
189                 if arvados.util.container_uuid_pattern.match(job_uuid):
190                     subprocess.check_call(['scancel', '--name='+job_uuid])
191                 elif arvados.util.job_uuid_pattern.match(job_uuid):
192                     self._client.jobs().cancel(uuid=job_uuid).execute()
193                 else:
194                     raise Exception('Unknown job type')
195             except Exception as error:
196                 self._logger.error("Trying to cancel job '%s': %s",
197                                    job_uuid,
198                                    error)
199         self._logger.debug("Calculated wishlist: %s",
200                            ', '.join(s.name for s in server_list) or "(empty)")
201         return super(JobQueueMonitorActor, self)._got_response(server_list)