11345: Simplify and consolidate retry for API throttling errors.
[arvados.git] / services / nodemanager / arvnodeman / jobqueue.py
#!/usr/bin/env python

from __future__ import absolute_import, print_function

import logging
import subprocess

from . import clientactor
from .config import ARVADOS_ERRORS

class ServerCalculator(object):
    """Generate cloud server wishlists from an Arvados job queue.

    Instantiate this class with a list of cloud node sizes you're willing to
    use, plus keyword overrides from the configuration.  Then you can pass
    job queues to servers_for_queue.  It will return a list of node sizes
    that would best satisfy the jobs, choosing the cheapest size that
    satisfies each job, and ignoring jobs that can't be satisfied.
    """

    class CloudSizeWrapper(object):
        def __init__(self, real_size, node_mem_scaling, **kwargs):
            self.real = real_size
            for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
                         'extra']:
                setattr(self, name, getattr(self.real, name))
            self.cores = kwargs.pop('cores')
            # libcloud disk sizes are in GB, Arvados/SLURM are in MB
            # multiply by 1000 instead of 1024 to err on low side
            self.scratch = self.disk * 1000
            self.ram = int(self.ram * node_mem_scaling)
            for name, override in kwargs.iteritems():
                if not hasattr(self, name):
                    raise ValueError("unrecognized size field '%s'" % (name,))
                setattr(self, name, override)

            if self.price is None:
                raise ValueError("Required field 'price' is None")

        def meets_constraints(self, **kwargs):
            for name, want_value in kwargs.iteritems():
                have_value = getattr(self, name)
                if (have_value != 0) and (have_value < want_value):
                    return False
            return True

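    # Illustrative note (not part of the original source): meets_constraints
    # skips any field the size reports as 0, treating it as "unknown/no limit",
    # so such a field satisfies any request.  For a hypothetical wrapped size
    # with cores=2 and ram=3648:
    #   size.meets_constraints(cores=2, ram=3500)  -> True   (2 >= 2, 3648 >= 3500)
    #   size.meets_constraints(cores=4)            -> False  (2 < 4)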

    def __init__(self, server_list, max_nodes=None, max_price=None,
                 node_mem_scaling=0.95):
        self.cloud_sizes = [self.CloudSizeWrapper(s, node_mem_scaling, **kws)
                            for s, kws in server_list]
        self.cloud_sizes.sort(key=lambda s: s.price)
        self.max_nodes = max_nodes or float('inf')
        self.max_price = max_price or float('inf')
        self.logger = logging.getLogger('arvnodeman.jobqueue')
        self.logged_jobs = set()

        self.logger.info("Using cloud node sizes:")
        for s in self.cloud_sizes:
            self.logger.info(str(s.__dict__))

    @staticmethod
    def coerce_int(x, fallback):
        try:
            return int(x)
        except (TypeError, ValueError):
            return fallback

    def cloud_size_for_constraints(self, constraints):
        want_value = lambda key: self.coerce_int(constraints.get(key), 0)
        wants = {'cores': want_value('min_cores_per_node'),
                 'ram': want_value('min_ram_mb_per_node'),
                 'scratch': want_value('min_scratch_mb_per_node')}
        for size in self.cloud_sizes:
            if size.meets_constraints(**wants):
                return size
        return None

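    # Illustrative sketch (hypothetical constraint values, not in the original
    # source): because self.cloud_sizes is sorted by price, this returns the
    # cheapest size that satisfies the request, or None if none qualifies, e.g.
    #   calc.cloud_size_for_constraints({'min_cores_per_node': 4,
    #                                    'min_ram_mb_per_node': 7000})
    # yields the cheapest wrapped size with cores >= 4 and ram >= 7000 MB.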
    def servers_for_queue(self, queue):
        servers = []
        seen_jobs = set()
        for job in queue:
            seen_jobs.add(job['uuid'])
            constraints = job['runtime_constraints']
            want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
            cloud_size = self.cloud_size_for_constraints(constraints)
            if cloud_size is None:
                if job['uuid'] not in self.logged_jobs:
                    self.logged_jobs.add(job['uuid'])
                    self.logger.debug("job %s not satisfiable", job['uuid'])
            elif (want_count <= self.max_nodes) and (want_count*cloud_size.price <= self.max_price):
                servers.extend([cloud_size.real] * want_count)
        self.logged_jobs.intersection_update(seen_jobs)
        return servers

    def cheapest_size(self):
        return self.cloud_sizes[0]

    def find_size(self, sizeid):
        for s in self.cloud_sizes:
            if s.id == sizeid:
                return s
        return None

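# Illustrative usage sketch (not part of the original module): how a
# ServerCalculator might be driven directly.  small_size is a hypothetical
# libcloud NodeSize-like object with the fields listed in CloudSizeWrapper,
# and the job dict below uses made-up values; the example assumes the wrapped
# size satisfies the job's constraints.
#
#   calc = ServerCalculator([(small_size, {'cores': 2})], max_nodes=8)
#   queue = [{'uuid': 'zzzzz-8i9sb-example00000000',
#             'runtime_constraints': {'min_nodes': 2,
#                                     'min_ram_mb_per_node': 3500}}]
#   calc.servers_for_queue(queue)
#   # -> [small_size, small_size], i.e. the underlying size objects
#   #    (CloudSizeWrapper.real), one entry per requested node.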
class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
    """Actor to generate server wishlists from the job queue.

    This actor regularly polls Arvados' job queue, and uses the provided
    ServerCalculator to turn that into a list of requested node sizes.  That
    list is sent to subscribers on every poll.
    """

    CLIENT_ERRORS = ARVADOS_ERRORS

    def __init__(self, client, timer_actor, server_calc,
                 jobs_queue, slurm_queue, *args, **kwargs):
        super(JobQueueMonitorActor, self).__init__(
            client, timer_actor, *args, **kwargs)
        self.jobs_queue = jobs_queue
        self.slurm_queue = slurm_queue
        self._calculator = server_calc

    @staticmethod
    def coerce_to_mb(x):
        v, u = x[:-1], x[-1]
        if u in ("M", "m"):
            return int(v)
        elif u in ("G", "g"):
            return float(v) * 2**10
        elif u in ("T", "t"):
            return float(v) * 2**20
        elif u in ("P", "p"):
            return float(v) * 2**30
        else:
            return int(x)

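    # Illustrative examples (not in the original source) of coerce_to_mb,
    # which interprets SLURM-style memory/disk strings as megabytes:
    #   coerce_to_mb("512M") -> 512
    #   coerce_to_mb("4G")   -> 4096.0     (4 * 2**10)
    #   coerce_to_mb("2T")   -> 2097152.0  (2 * 2**20)
    #   coerce_to_mb("16")   -> 16         (no unit suffix: already MB)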
    def _send_request(self):
        queuelist = []
        if self.slurm_queue:
            # cpus, memory, temporary disk space, reason, job name
            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
            for out in squeue_out.splitlines():
                try:
                    cpu, ram, disk, reason, jobname = out.split("|", 4)
                    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
                        queuelist.append({
                            "uuid": jobname,
                            "runtime_constraints": {
                                "min_cores_per_node": cpu,
                                "min_ram_mb_per_node": self.coerce_to_mb(ram),
                                "min_scratch_mb_per_node": self.coerce_to_mb(disk)
                            }
                        })
                except ValueError:
                    pass

        if self.jobs_queue:
            queuelist.extend(self._client.jobs().queue().execute()['items'])

        return queuelist

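    # Illustrative sketch (hypothetical values, not in the original source):
    # with --format=%c|%m|%d|%r|%j, a pending SLURM job might be reported as
    #
    #   2|3500M|0|Resources|zzzzz-8i9sb-example00000000
    #
    # which _send_request would translate into the queue entry
    #
    #   {"uuid": "zzzzz-8i9sb-example00000000",
    #    "runtime_constraints": {"min_cores_per_node": "2",
    #                            "min_ram_mb_per_node": 3500,
    #                            "min_scratch_mb_per_node": 0}}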
    def _got_response(self, queue):
        server_list = self._calculator.servers_for_queue(queue)
        self._logger.debug("Calculated wishlist: %s",
                           ', '.join(s.name for s in server_list) or "(empty)")
        return super(JobQueueMonitorActor, self)._got_response(server_list)