895e03d5ba5ddf6b8c2762e92120e86ccdd904e6
[arvados.git] / services / nodemanager / arvnodeman / jobqueue.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import logging
9 import subprocess
10
11 from . import clientactor
12 from .config import ARVADOS_ERRORS
13
14
class ServerCalculator(object):
    """Generate cloud server wishlists from an Arvados job queue.

    Instantiate this class with a list of cloud node sizes you're willing to
    use, plus keyword overrides from the configuration.  Then you can pass
    job queues to servers_for_queue.  It will return a list of node sizes
    that would best satisfy the jobs, choosing the cheapest size that
    satisfies each job, and ignoring jobs that can't be satisfied.
    """

    class CloudSizeWrapper(object):
        """Wrap a cloud driver size object, normalizing units and applying
        configuration overrides.

        Copies the driver size's standard fields, converts disk GB to
        scratch MB, scales RAM by node_mem_scaling, then applies any
        remaining keyword overrides to existing fields.
        """
        def __init__(self, real_size, node_mem_scaling, **kwargs):
            self.real = real_size
            for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
                         'extra']:
                setattr(self, name, getattr(self.real, name))
            # 'cores' has no driver-side equivalent, so it must come from
            # configuration (KeyError here means it was omitted).
            self.cores = kwargs.pop('cores')
            # libcloud disk sizes are in GB, Arvados/SLURM are in MB
            # multiply by 1000 instead of 1024 to err on low side
            if self.disk is None:
                self.disk = 0
            self.scratch = self.disk * 1000
            self.ram = int(self.ram * node_mem_scaling)
            for name, override in kwargs.items():
                if not hasattr(self, name):
                    raise ValueError("unrecognized size field '%s'" % (name,))
                setattr(self, name, override)

            if self.price is None:
                raise ValueError("Required field 'price' is None")

        def meets_constraints(self, **kwargs):
            """Return True if this size satisfies every named minimum.

            A zero value on the size never disqualifies it (treated as
            "not limited" for that field).
            """
            for name, want_value in kwargs.items():
                have_value = getattr(self, name)
                if (have_value != 0) and (have_value < want_value):
                    return False
            return True


    def __init__(self, server_list, max_nodes=None, max_price=None,
                 node_mem_scaling=0.95):
        self.cloud_sizes = [self.CloudSizeWrapper(s, node_mem_scaling, **kws)
                            for s, kws in server_list]
        # Cheapest-first so the first size meeting constraints is the winner.
        self.cloud_sizes.sort(key=lambda s: s.price)
        self.max_nodes = max_nodes or float('inf')
        self.max_price = max_price or float('inf')
        self.logger = logging.getLogger('arvnodeman.jobqueue')

        self.logger.info("Using cloud node sizes:")
        for s in self.cloud_sizes:
            self.logger.info(str(s.__dict__))

    @staticmethod
    def coerce_int(x, fallback):
        """Return int(x), or fallback if x is None or unparseable."""
        try:
            return int(x)
        except (TypeError, ValueError):
            return fallback

    def cloud_size_for_constraints(self, constraints):
        """Return the cheapest size meeting the job's runtime constraints,
        or None if no configured size is big enough."""
        want_value = lambda key: self.coerce_int(constraints.get(key), 0)
        wants = {'cores': want_value('min_cores_per_node'),
                 'ram': want_value('min_ram_mb_per_node'),
                 'scratch': want_value('min_scratch_mb_per_node')}
        for size in self.cloud_sizes:
            if size.meets_constraints(**wants):
                return size
        return None

    def servers_for_queue(self, queue):
        """Compute the server wishlist for a job queue.

        Returns a tuple (servers, unsatisfiable_jobs): servers is a list of
        driver size objects (one per wanted node, repeated per job's
        min_nodes); unsatisfiable_jobs maps job UUID to a human-readable
        reason the job can never be scheduled.
        """
        servers = []
        unsatisfiable_jobs = {}
        for job in queue:
            constraints = job['runtime_constraints']
            want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
            cloud_size = self.cloud_size_for_constraints(constraints)
            if cloud_size is None:
                unsatisfiable_jobs[job['uuid']] = (
                    'Requirements for a single node exceed the available '
                    'cloud node size')
            elif (want_count > self.max_nodes):
                unsatisfiable_jobs[job['uuid']] = (
                    "Job's min_nodes constraint is greater than the configured "
                    "max_nodes (%d)" % self.max_nodes)
            elif (want_count*cloud_size.price <= self.max_price):
                servers.extend([cloud_size.real] * want_count)
            else:
                # Prices are floats; %s avoids the silent truncation that
                # %d produced (e.g. 0.5 rendered as 0).
                unsatisfiable_jobs[job['uuid']] = (
                    "Job's price (%s) is above system's max_price "
                    "limit (%s)" % (want_count*cloud_size.price, self.max_price))
        return (servers, unsatisfiable_jobs)

    def cheapest_size(self):
        """Return the cheapest configured size (list is sorted by price)."""
        return self.cloud_sizes[0]

    def find_size(self, sizeid):
        """Return the wrapped size with the given id, or None."""
        for s in self.cloud_sizes:
            if s.id == sizeid:
                return s
        return None
115
116
class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
    """Actor to generate server wishlists from the job queue.

    This actor regularly polls Arvados' job queue, and uses the provided
    ServerCalculator to turn that into a list of requested node sizes.  That
    list is sent to subscribers on every poll.
    """

    CLIENT_ERRORS = ARVADOS_ERRORS

    def __init__(self, client, timer_actor, server_calc,
                 jobs_queue, slurm_queue, *args, **kwargs):
        super(JobQueueMonitorActor, self).__init__(
            client, timer_actor, *args, **kwargs)
        # Flags selecting which queue sources to poll in _send_request.
        self.jobs_queue = jobs_queue
        self.slurm_queue = slurm_queue
        self._calculator = server_calc

    @staticmethod
    def coerce_to_mb(x):
        """Convert a SLURM size string (e.g. '512M', '4G') to megabytes.

        Suffixes use binary multipliers (G = 2**10 MB, etc.) and yield
        floats; an 'M' suffix or a bare number yields an int.
        """
        v, u = x[:-1], x[-1]
        if u in ("M", "m"):
            return int(v)
        elif u in ("G", "g"):
            return float(v) * 2**10
        elif u in ("T", "t"):
            return float(v) * 2**20
        elif u in ("P", "p"):
            return float(v) * 2**30
        else:
            # No recognized unit suffix: assume the value is already MB.
            return int(x)

    def _send_request(self):
        """Build the combined pending-job list from SLURM and/or Arvados."""
        queuelist = []
        if self.slurm_queue:
            # cpus, memory, temporary disk space, reason, job name
            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
            for out in squeue_out.splitlines():
                try:
                    cpu, ram, disk, reason, jobname = out.split("|", 4)
                    # Only jobs waiting on node resources are actionable;
                    # other pending reasons (deps, holds) are skipped.
                    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
                        queuelist.append({
                            "uuid": jobname,
                            "runtime_constraints": {
                                "min_cores_per_node": cpu,
                                "min_ram_mb_per_node": self.coerce_to_mb(ram),
                                "min_scratch_mb_per_node": self.coerce_to_mb(disk)
                            }
                        })
                except ValueError:
                    # Malformed squeue line: ignore it rather than abort
                    # the whole poll.
                    pass

        if self.jobs_queue:
            queuelist.extend(self._client.jobs().queue().execute()['items'])

        return queuelist

    def _got_response(self, queue):
        """Turn the polled queue into a wishlist and forward it.

        Jobs the calculator marks unsatisfiable are cancelled (best-effort)
        with a stderr log record explaining why.
        """
        server_list, unsatisfiable_jobs = self._calculator.servers_for_queue(queue)
        # Cancel any job with unsatisfiable requirements, emitting a log
        # explaining why.
        for job_uuid, reason in unsatisfiable_jobs.items():
            self._logger.debug("Cancelling unsatisfiable job '%s'", job_uuid)
            try:
                self._client.logs().create(body={
                    'object_uuid': job_uuid,
                    'event_type': 'stderr',
                    'properties': {'text': reason},
                }).execute()
                self._client.jobs().cancel(uuid=job_uuid).execute()
            except Exception as error:
                # Best-effort: a failed cancel must not break the poll loop.
                self._logger.error("Trying to cancel job '%s': %s",
                                   job_uuid,
                                   error)
        self._logger.debug("Calculated wishlist: %s",
                           ', '.join(s.name for s in server_list) or "(empty)")
        return super(JobQueueMonitorActor, self)._got_response(server_list)