[arvados.git] services/nodemanager/arvnodeman/jobqueue.py
#!/usr/bin/env python
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0

from __future__ import absolute_import, print_function

import logging
import subprocess

from . import clientactor
from .config import ARVADOS_ERRORS

class ServerCalculator(object):
    """Generate cloud server wishlists from an Arvados job queue.

    Instantiate this class with a list of cloud node sizes you're willing to
    use, plus keyword overrides from the configuration.  Then you can pass
    job queues to servers_for_queue.  It will return a list of node sizes
    that would best satisfy the jobs, choosing the cheapest size that
    satisfies each job, and ignoring jobs that can't be satisfied.
    """
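    # A minimal usage sketch (the size objects and job queue below are
    # hypothetical, not defined in this module): pair each libcloud size
    # with its configuration overrides, then ask for a wishlist.
    #
    #   calc = ServerCalculator([(small_size, {'cores': 2}),
    #                            (large_size, {'cores': 16})],
    #                           max_nodes=8)
    #   wishlist = calc.servers_for_queue(job_queue)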

    class CloudSizeWrapper(object):
        def __init__(self, real_size, node_mem_scaling, **kwargs):
            self.real = real_size
            for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
                         'extra']:
                setattr(self, name, getattr(self.real, name))
            self.cores = kwargs.pop('cores')
            # libcloud disk sizes are in GB, Arvados/SLURM are in MB
            # multiply by 1000 instead of 1024 to err on low side
            if self.disk is None:
                self.disk = 0
            self.scratch = self.disk * 1000
            self.ram = int(self.ram * node_mem_scaling)
            for name, override in kwargs.iteritems():
                if not hasattr(self, name):
                    raise ValueError("unrecognized size field '%s'" % (name,))
                setattr(self, name, override)

            if self.price is None:
                raise ValueError("Required field 'price' is None")

        def meets_constraints(self, **kwargs):
            for name, want_value in kwargs.iteritems():
                have_value = getattr(self, name)
                if (have_value != 0) and (have_value < want_value):
                    return False
            return True


    def __init__(self, server_list, max_nodes=None, max_price=None,
                 node_mem_scaling=0.95):
        self.cloud_sizes = [self.CloudSizeWrapper(s, node_mem_scaling, **kws)
                            for s, kws in server_list]
        self.cloud_sizes.sort(key=lambda s: s.price)
        self.max_nodes = max_nodes or float('inf')
        self.max_price = max_price or float('inf')
        self.logger = logging.getLogger('arvnodeman.jobqueue')
        self.logged_jobs = set()

        self.logger.info("Using cloud node sizes:")
        for s in self.cloud_sizes:
            self.logger.info(str(s.__dict__))

    @staticmethod
    def coerce_int(x, fallback):
        try:
            return int(x)
        except (TypeError, ValueError):
            return fallback

    def cloud_size_for_constraints(self, constraints):
        want_value = lambda key: self.coerce_int(constraints.get(key), 0)
        wants = {'cores': want_value('min_cores_per_node'),
                 'ram': want_value('min_ram_mb_per_node'),
                 'scratch': want_value('min_scratch_mb_per_node')}
        for size in self.cloud_sizes:
            if size.meets_constraints(**wants):
                return size
        return None

    def servers_for_queue(self, queue):
        servers = []
        seen_jobs = set()
        for job in queue:
            seen_jobs.add(job['uuid'])
            constraints = job['runtime_constraints']
            want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
            cloud_size = self.cloud_size_for_constraints(constraints)
            if cloud_size is None:
                if job['uuid'] not in self.logged_jobs:
                    self.logged_jobs.add(job['uuid'])
                    self.logger.debug("job %s not satisfiable", job['uuid'])
            elif (want_count <= self.max_nodes) and (want_count*cloud_size.price <= self.max_price):
                servers.extend([cloud_size.real] * want_count)
        self.logged_jobs.intersection_update(seen_jobs)
        return servers

    def cheapest_size(self):
        return self.cloud_sizes[0]

    def find_size(self, sizeid):
        for s in self.cloud_sizes:
            if s.id == sizeid:
                return s
        return None

class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
    """Actor to generate server wishlists from the job queue.

    This actor regularly polls Arvados' job queue, and uses the provided
    ServerCalculator to turn that into a list of requested node sizes.  That
    list is sent to subscribers on every poll.
    """
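    # A hedged wiring sketch (the launcher/daemon names below are assumptions,
    # not defined in this file): the node manager would start this actor and
    # subscribe a wishlist handler to it, roughly:
    #
    #   monitor = JobQueueMonitorActor.start(
    #       arvados_client, timer_actor, server_calculator,
    #       jobs_queue=True, slurm_queue=True).proxy()
    #   monitor.subscribe(node_daemon.update_server_wishlist)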

    CLIENT_ERRORS = ARVADOS_ERRORS

    def __init__(self, client, timer_actor, server_calc,
                 jobs_queue, slurm_queue, *args, **kwargs):
        super(JobQueueMonitorActor, self).__init__(
            client, timer_actor, *args, **kwargs)
        self.jobs_queue = jobs_queue
        self.slurm_queue = slurm_queue
        self._calculator = server_calc

    @staticmethod
    def coerce_to_mb(x):
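        # SLURM reports memory and disk as a number with an optional unit
        # suffix.  Examples of the conversion below (values in MB):
        # "512M" -> 512, "4G" -> 4096.0, "1T" -> 1048576.0; a bare number
        # such as "1024" is already in MB and is returned unchanged.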
        v, u = x[:-1], x[-1]
        if u in ("M", "m"):
            return int(v)
        elif u in ("G", "g"):
            return float(v) * 2**10
        elif u in ("T", "t"):
            return float(v) * 2**20
        elif u in ("P", "p"):
            return float(v) * 2**30
        else:
            return int(x)

    def _send_request(self):
        queuelist = []
        if self.slurm_queue:
            # cpus, memory, temporary disk space, reason, job name
            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
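            # Only pending jobs whose reason is "ReqNodeNotAvail" or
            # "Resources" are added to the queue below: both reasons mean the
            # job is waiting for node resources that a new cloud node could
            # provide.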
            for out in squeue_out.splitlines():
                try:
                    cpu, ram, disk, reason, jobname = out.split("|", 4)
                    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
                        queuelist.append({
                            "uuid": jobname,
                            "runtime_constraints": {
                                "min_cores_per_node": cpu,
                                "min_ram_mb_per_node": self.coerce_to_mb(ram),
                                "min_scratch_mb_per_node": self.coerce_to_mb(disk)
                            }
                        })
                except ValueError:
                    pass

        if self.jobs_queue:
            queuelist.extend(self._client.jobs().queue().execute()['items'])

        return queuelist

    def _got_response(self, queue):
        server_list = self._calculator.servers_for_queue(queue)
        self._logger.debug("Calculated wishlist: %s",
                           ', '.join(s.name for s in server_list) or "(empty)")
        return super(JobQueueMonitorActor, self)._got_response(server_list)