7475: Cancel jobs that cannot be satisfied instead of endlessly retrying to run them.
[arvados.git] / services / nodemanager / arvnodeman / jobqueue.py
#!/usr/bin/env python
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0

from __future__ import absolute_import, print_function

import logging
import subprocess

from . import clientactor
from .config import ARVADOS_ERRORS


class ServerCalculator(object):
    """Generate cloud server wishlists from an Arvados job queue.

    Instantiate this class with a list of cloud node sizes you're willing to
    use, plus keyword overrides from the configuration.  Then you can pass
    job queues to servers_for_queue.  It will return a list of node sizes
    that would best satisfy the jobs, choosing the cheapest size that
    satisfies each job, along with a dict of jobs that cannot be satisfied,
    keyed by job UUID and mapped to the reason why.
    """
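
    # A minimal usage sketch (illustrative, not part of this module): "size"
    # stands in for a libcloud NodeSize and "job_queue" for a list of Arvados
    # job records.
    #
    #   calc = ServerCalculator([(size, {'cores': 2})], max_nodes=8, max_price=1.0)
    #   wishlist, unsatisfiable = calc.servers_for_queue(job_queue)
    #
    # "wishlist" is the list of real NodeSize objects to request; "unsatisfiable"
    # maps job UUIDs to the reason they cannot be scheduled.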

    class CloudSizeWrapper(object):
        def __init__(self, real_size, node_mem_scaling, **kwargs):
            self.real = real_size
            for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
                         'extra']:
                setattr(self, name, getattr(self.real, name))
            self.cores = kwargs.pop('cores')
            # libcloud disk sizes are in GB, Arvados/SLURM are in MB
            # multiply by 1000 instead of 1024 to err on low side
            if self.disk is None:
                self.disk = 0
            self.scratch = self.disk * 1000
            self.ram = int(self.ram * node_mem_scaling)
            for name, override in kwargs.iteritems():
                if not hasattr(self, name):
                    raise ValueError("unrecognized size field '%s'" % (name,))
                setattr(self, name, override)

            if self.price is None:
                raise ValueError("Required field 'price' is None")

        def meets_constraints(self, **kwargs):
            for name, want_value in kwargs.iteritems():
                have_value = getattr(self, name)
                # A value of 0 (e.g. scratch when the size reports no disk)
                # means "unknown", so it never disqualifies the size.
                if (have_value != 0) and (have_value < want_value):
                    return False
            return True


    def __init__(self, server_list, max_nodes=None, max_price=None,
                 node_mem_scaling=0.95):
        self.cloud_sizes = [self.CloudSizeWrapper(s, node_mem_scaling, **kws)
                            for s, kws in server_list]
        self.cloud_sizes.sort(key=lambda s: s.price)
        self.max_nodes = max_nodes or float('inf')
        self.max_price = max_price or float('inf')
        self.logger = logging.getLogger('arvnodeman.jobqueue')

        self.logger.info("Using cloud node sizes:")
        for s in self.cloud_sizes:
            self.logger.info(str(s.__dict__))

    @staticmethod
    def coerce_int(x, fallback):
        try:
            return int(x)
        except (TypeError, ValueError):
            return fallback

    def cloud_size_for_constraints(self, constraints):
        want_value = lambda key: self.coerce_int(constraints.get(key), 0)
        wants = {'cores': want_value('min_cores_per_node'),
                 'ram': want_value('min_ram_mb_per_node'),
                 'scratch': want_value('min_scratch_mb_per_node')}
        # self.cloud_sizes is sorted by price, so the first match is the
        # cheapest size that satisfies the constraints.
        for size in self.cloud_sizes:
            if size.meets_constraints(**wants):
                return size
        return None

    def servers_for_queue(self, queue):
        servers = []
        unsatisfiable_jobs = {}
        for job in queue:
            constraints = job['runtime_constraints']
            want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
            cloud_size = self.cloud_size_for_constraints(constraints)
            if cloud_size is None:
                unsatisfiable_jobs[job['uuid']] = 'Requirements for a single node exceed the available cloud node size'
            elif (want_count > self.max_nodes):
                unsatisfiable_jobs[job['uuid']] = "Job's min_nodes constraint is greater than the configured max_nodes (%d)" % self.max_nodes
            elif (want_count*cloud_size.price <= self.max_price):
                servers.extend([cloud_size.real] * want_count)
            else:
                unsatisfiable_jobs[job['uuid']] = "Job's price (%d) is above system's max_price limit (%d)" % (want_count*cloud_size.price, self.max_price)
        return (servers, unsatisfiable_jobs)

    def cheapest_size(self):
        return self.cloud_sizes[0]

    def find_size(self, sizeid):
        for s in self.cloud_sizes:
            if s.id == sizeid:
                return s
        return None


class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
    """Actor to generate server wishlists from the job queue.

    This actor regularly polls Arvados' job queue, and uses the provided
    ServerCalculator to turn that into a list of requested node sizes.  That
    list is sent to subscribers on every poll.  Jobs whose requirements cannot
    be satisfied by any configured node size are cancelled, with the reason
    written to the job's log.
    """
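
    # Rough wiring sketch (illustrative only; the real setup lives in the node
    # manager launcher, and "api_client", "timer", "calc", and "daemon" are
    # stand-in names):
    #
    #   poller = JobQueueMonitorActor.start(api_client, timer, calc,
    #                                       jobs_queue=True, slurm_queue=True).proxy()
    #   poller.subscribe(daemon.update_server_wishlist)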

    CLIENT_ERRORS = ARVADOS_ERRORS

    def __init__(self, client, timer_actor, server_calc,
                 jobs_queue, slurm_queue, *args, **kwargs):
        super(JobQueueMonitorActor, self).__init__(
            client, timer_actor, *args, **kwargs)
        self.jobs_queue = jobs_queue
        self.slurm_queue = slurm_queue
        self._calculator = server_calc

    @staticmethod
    def coerce_to_mb(x):
        """Convert a SLURM size string with an optional M/G/T/P suffix to MB."""
        v, u = x[:-1], x[-1]
        if u in ("M", "m"):
            return int(v)
        elif u in ("G", "g"):
            return float(v) * 2**10
        elif u in ("T", "t"):
            return float(v) * 2**20
        elif u in ("P", "p"):
            return float(v) * 2**30
        else:
            return int(x)
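
    # Worked examples for coerce_to_mb (illustrative values):
    #   "2048M" -> 2048, "4G" -> 4096.0, "1T" -> 1048576.0,
    #   "512" -> 512 (no recognized suffix, taken as already in MB).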

    def _send_request(self):
        queuelist = []
        if self.slurm_queue:
            # cpus, memory, temporary disk space, reason, job name
            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
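            # An example line in this format (values and job name are illustrative):
            #   2|4G|10000|Resources|zzzzz-8i9sb-xxxxxxxxxxxxxxx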
            for out in squeue_out.splitlines():
                try:
                    cpu, ram, disk, reason, jobname = out.split("|", 4)
                    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
                        queuelist.append({
                            "uuid": jobname,
                            "runtime_constraints": {
                                "min_cores_per_node": cpu,
                                "min_ram_mb_per_node": self.coerce_to_mb(ram),
                                "min_scratch_mb_per_node": self.coerce_to_mb(disk)
                            }
                        })
                except ValueError:
                    pass

        if self.jobs_queue:
            queuelist.extend(self._client.jobs().queue().execute()['items'])

        return queuelist

    def _got_response(self, queue):
        server_list, unsatisfiable_jobs = self._calculator.servers_for_queue(queue)
        # Cancel any job with unsatisfiable requirements, emitting a log
        # explaining why.
        for job_uuid, reason in unsatisfiable_jobs.iteritems():
            self._client.logs().create(body={
                'object_uuid': job_uuid,
                'event_type': 'stderr',
                'properties': {'text': reason},
            }).execute()
            self._client.jobs().cancel(uuid=job_uuid).execute()
            self._logger.debug("Unsatisfiable job '%s' cancelled", job_uuid)
        self._logger.debug("Calculated wishlist: %s",
                           ', '.join(s.name for s in server_list) or "(empty)")
        return super(JobQueueMonitorActor, self)._got_response(server_list)