11870: minor update
[arvados.git] / services / nodemanager / arvnodeman / jobqueue.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import logging
6 import subprocess
7
8 from . import clientactor
9 from .config import ARVADOS_ERRORS
10
class ServerCalculator(object):
    """Generate cloud server wishlists from an Arvados job queue.

    Instantiate this class with a list of cloud node sizes you're willing to
    use, plus keyword overrides from the configuration.  Then you can pass
    job queues to servers_for_queue.  It will return a list of node sizes
    that would best satisfy the jobs, choosing the cheapest size that
    satisfies each job, and ignoring jobs that can't be satisfied.
    """

    class CloudSizeWrapper(object):
        """A cloud node size plus configuration overrides.

        Copies the descriptive fields of `real_size` onto this object,
        derives `scratch` and a scaled `ram`, and then applies any keyword
        overrides on top.  The untouched original is kept in `self.real`.
        """

        def __init__(self, real_size, node_mem_scaling, **kwargs):
            """Wrap `real_size`.

            Arguments:
            * real_size: object providing the attributes id, name, ram,
              disk, bandwidth, price and extra.
            * node_mem_scaling: factor applied to `ram` (result truncated
              to int) before overrides are applied.
            * kwargs: must contain 'cores'; every other key must name an
              attribute that already exists on the wrapper and replaces
              its value.

            Raises KeyError if 'cores' is missing, and ValueError for an
            unknown override name or if the final price is None.
            """
            self.real = real_size
            for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
                         'extra']:
                setattr(self, name, getattr(self.real, name))
            self.cores = kwargs.pop('cores')
            # libcloud disk sizes are in GB, Arvados/SLURM are in MB
            # multiply by 1000 instead of 1024 to err on low side
            if self.disk is None:
                self.disk = 0
            self.scratch = self.disk * 1000
            self.ram = int(self.ram * node_mem_scaling)
            # items() rather than iteritems(): identical results here, and
            # it exists on both Python 2 and Python 3.
            for name, override in kwargs.items():
                if not hasattr(self, name):
                    raise ValueError("unrecognized size field '%s'" % (name,))
                setattr(self, name, override)

            if self.price is None:
                raise ValueError("Required field 'price' is None")

        def meets_constraints(self, **kwargs):
            """Return True if this size satisfies every requested minimum.

            Each keyword names an attribute of this wrapper and gives the
            wanted minimum.  An attribute whose value is 0 is never treated
            as too small.
            """
            for name, want_value in kwargs.items():
                have_value = getattr(self, name)
                if (have_value != 0) and (have_value < want_value):
                    return False
            return True


    def __init__(self, server_list, max_nodes=None, max_price=None,
                 node_mem_scaling=0.95):
        """Build the list of usable sizes, sorted by ascending price.

        Arguments:
        * server_list: iterable of (size, overrides_dict) pairs; each pair
          is turned into a CloudSizeWrapper.
        * max_nodes: largest node count a single job may request; a falsy
          value means no limit.
        * max_price: largest total price (node count * size price) a single
          job may request; a falsy value means no limit.
        * node_mem_scaling: passed through to CloudSizeWrapper.
        """
        self.cloud_sizes = [self.CloudSizeWrapper(s, node_mem_scaling, **kws)
                            for s, kws in server_list]
        self.cloud_sizes.sort(key=lambda s: s.price)
        self.max_nodes = max_nodes or float('inf')
        self.max_price = max_price or float('inf')
        self.logger = logging.getLogger('arvnodeman.jobqueue')
        # UUIDs of unsatisfiable jobs already reported, so each one is
        # logged only once while it stays in the queue.
        self.logged_jobs = set()

        self.logger.info("Using cloud node sizes:")
        for s in self.cloud_sizes:
            self.logger.info(str(s.__dict__))

    @staticmethod
    def coerce_int(x, fallback):
        """Return int(x), or `fallback` if x cannot be converted."""
        try:
            return int(x)
        except (TypeError, ValueError):
            return fallback

    def cloud_size_for_constraints(self, constraints):
        """Return the cheapest wrapped size meeting `constraints`, or None.

        `constraints` is a mapping that may contain min_cores_per_node,
        min_ram_mb_per_node and min_scratch_mb_per_node; missing or
        non-integer values count as 0.
        """
        want_value = lambda key: self.coerce_int(constraints.get(key), 0)
        wants = {'cores': want_value('min_cores_per_node'),
                 'ram': want_value('min_ram_mb_per_node'),
                 'scratch': want_value('min_scratch_mb_per_node')}
        # cloud_sizes is sorted by price, so the first match is the cheapest.
        for size in self.cloud_sizes:
            if size.meets_constraints(**wants):
                return size
        return None

    def servers_for_queue(self, queue):
        """Return the list of real cloud sizes wanted for `queue`.

        Each job is a mapping with 'uuid' and 'runtime_constraints'.  A job
        contributes `min_nodes` (at least 1) copies of the cheapest size
        that satisfies it.  Jobs that no size satisfies are logged once at
        debug level; jobs that exceed max_nodes or max_price are skipped.
        """
        servers = []
        seen_jobs = set()
        for job in queue:
            seen_jobs.add(job['uuid'])
            constraints = job['runtime_constraints']
            want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
            cloud_size = self.cloud_size_for_constraints(constraints)
            if cloud_size is None:
                if job['uuid'] not in self.logged_jobs:
                    self.logged_jobs.add(job['uuid'])
                    self.logger.debug("job %s not satisfiable", job['uuid'])
            elif ((want_count <= self.max_nodes) and
                  (want_count * cloud_size.price <= self.max_price)):
                servers.extend([cloud_size.real] * want_count)
        # Forget jobs that have left the queue so the set cannot grow
        # without bound.
        self.logged_jobs.intersection_update(seen_jobs)
        return servers

    def cheapest_size(self):
        """Return the lowest-priced wrapped size."""
        return self.cloud_sizes[0]

    def find_size(self, sizeid):
        """Return the wrapped size whose id equals `sizeid`, or None."""
        for s in self.cloud_sizes:
            if s.id == sizeid:
                return s
        return None
106
class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
    """Actor to generate server wishlists from the job queue.

    This actor regularly polls Arvados' job queue, and uses the provided
    ServerCalculator to turn that into a list of requested node sizes.  That
    list is sent to subscribers on every poll.
    """

    CLIENT_ERRORS = ARVADOS_ERRORS

    def __init__(self, client, timer_actor, server_calc,
                 jobs_queue, slurm_queue, *args, **kwargs):
        """Set up the monitor.

        Arguments:
        * client, timer_actor, *args, **kwargs: passed to the parent class.
        * server_calc: calculator whose servers_for_queue() turns a queue
          into a wishlist.
        * jobs_queue: if true, poll the Arvados jobs queue.
        * slurm_queue: if true, poll pending jobs with `squeue`.
        """
        super(JobQueueMonitorActor, self).__init__(
            client, timer_actor, *args, **kwargs)
        self.jobs_queue = jobs_queue
        self.slurm_queue = slurm_queue
        self._calculator = server_calc

    @staticmethod
    def coerce_to_mb(x):
        """Convert a size string with an optional unit suffix to megabytes.

        A trailing M, G, T or P (either case) selects the unit; a string
        with no recognized suffix is parsed as a plain integer.  Raises
        ValueError when the string cannot be parsed, including when it is
        empty.
        """
        # Slice (x[-1:]) instead of index (x[-1]) so an empty string falls
        # through to int(x) and raises ValueError, which the caller handles,
        # rather than IndexError, which it does not.
        v, u = x[:-1], x[-1:]
        if u in ("M", "m"):
            return int(v)
        elif u in ("G", "g"):
            return float(v) * 2**10
        elif u in ("T", "t"):
            return float(v) * 2**20
        elif u in ("P", "p"):
            return float(v) * 2**30
        else:
            return int(x)

    def _send_request(self):
        """Collect queued jobs from squeue and/or Arvados.

        Returns a list of dicts, each with 'uuid' and 'runtime_constraints'
        for squeue entries, followed by the items returned by the Arvados
        jobs queue when that source is enabled.
        """
        queuelist = []
        if self.slurm_queue:
            # cpus, memory, temporary disk space, reason, job name
            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
            for out in squeue_out.splitlines():
                try:
                    # maxsplit=4 keeps any '|' inside the job name intact.
                    cpu, ram, disk, reason, jobname = out.split("|", 4)
                    # Only these pending reasons are turned into requests.
                    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
                        queuelist.append({
                            "uuid": jobname,
                            "runtime_constraints": {
                                "min_cores_per_node": cpu,
                                "min_ram_mb_per_node": self.coerce_to_mb(ram),
                                "min_scratch_mb_per_node": self.coerce_to_mb(disk)
                            }
                        })
                except ValueError:
                    # Best effort: skip a line with too few fields or an
                    # unparseable size, but leave a trace for debugging.
                    logging.getLogger('arvnodeman.jobqueue').debug(
                        "ignoring unparseable squeue line %r", out)

        if self.jobs_queue:
            queuelist.extend(self._client.jobs().queue().execute()['items'])

        return queuelist

    def _got_response(self, queue):
        """Turn `queue` into a wishlist, log it, and pass it to the parent."""
        server_list = self._calculator.servers_for_queue(queue)
        self._logger.debug("Calculated wishlist: %s",
                           ', '.join(s.name for s in server_list) or "(empty)")
        return super(JobQueueMonitorActor, self)._got_response(server_list)