11235: Log a message when a job is interrupted by node failure.
[arvados.git] / services / nodemanager / arvnodeman / jobqueue.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import logging
6 import subprocess
7
8 from . import clientactor
9 from .config import ARVADOS_ERRORS
10
class ServerCalculator(object):
    """Generate cloud server wishlists from an Arvados job queue.

    Instantiate this class with a list of cloud node sizes you're willing to
    use, plus keyword overrides from the configuration.  Then you can pass
    job queues to servers_for_queue.  It will return a list of node sizes
    that would best satisfy the jobs, choosing the cheapest size that
    satisfies each job, and ignoring jobs that can't be satisfied.
    """

    class CloudSizeWrapper(object):
        """A cloud node size with configuration overrides applied.

        The wrapper copies id, name, ram, disk, bandwidth, price and extra
        from real_size, and keeps the unwrapped object available as `real`.
        """

        def __init__(self, real_size, node_mem_scaling, **kwargs):
            """Wrap real_size, scaling its RAM and applying overrides.

            `cores` is a required keyword.  Every other keyword must name
            an attribute the wrapper already has, and replaces its value.

            Raises KeyError if `cores` is missing, and ValueError if a
            keyword names an unknown attribute or the final price is None.
            """
            self.real = real_size
            for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
                         'extra']:
                setattr(self, name, getattr(self.real, name))
            self.cores = kwargs.pop('cores')
            # NOTE(review): scratch is later compared against
            # min_scratch_mb_per_node; confirm real_size.disk is expressed
            # in the same unit (MB).
            self.scratch = self.disk
            self.ram = int(self.ram * node_mem_scaling)
            # Overrides are applied last, so an overridden 'ram' is not
            # scaled and an overridden 'disk' does not change 'scratch'.
            # items() rather than iteritems() so this runs on Python 2 and 3.
            for name, override in kwargs.items():
                if not hasattr(self, name):
                    raise ValueError("unrecognized size field '%s'" % (name,))
                setattr(self, name, override)

            if self.price is None:
                raise ValueError("Required field 'price' is None")

        def meets_constraints(self, **kwargs):
            """Return True if this size satisfies every named minimum.

            Each keyword names an attribute of this wrapper and gives the
            wanted minimum.  An attribute whose value is 0 is never treated
            as insufficient.
            """
            for name, want_value in kwargs.items():
                have_value = getattr(self, name)
                if (have_value != 0) and (have_value < want_value):
                    return False
            return True

    def __init__(self, server_list, max_nodes=None, max_price=None,
                 node_mem_scaling=0.95):
        """Build the price-sorted list of usable cloud sizes.

        server_list is an iterable of (size, overrides) pairs; each pair is
        wrapped in a CloudSizeWrapper.  A false max_nodes or max_price
        (None, 0) means "no limit".
        """
        self.cloud_sizes = [self.CloudSizeWrapper(s, node_mem_scaling, **kws)
                            for s, kws in server_list]
        self.cloud_sizes.sort(key=lambda s: s.price)
        self.max_nodes = max_nodes or float('inf')
        self.max_price = max_price or float('inf')
        self.logger = logging.getLogger('arvnodeman.jobqueue')
        # UUIDs of unsatisfiable jobs already reported, so that each one is
        # logged only once while it remains in the queue.
        self.logged_jobs = set()

    @staticmethod
    def coerce_int(x, fallback):
        """Return int(x), or fallback if x cannot be converted."""
        try:
            return int(x)
        except (TypeError, ValueError):
            return fallback

    def cloud_size_for_constraints(self, constraints):
        """Return the cheapest size meeting constraints, or None.

        Reads min_cores_per_node, min_ram_mb_per_node and
        min_scratch_mb_per_node from the constraints mapping; a missing or
        non-integer value counts as 0.
        """
        want_value = lambda key: self.coerce_int(constraints.get(key), 0)
        wants = {'cores': want_value('min_cores_per_node'),
                 'ram': want_value('min_ram_mb_per_node'),
                 'scratch': want_value('min_scratch_mb_per_node')}
        # cloud_sizes is sorted by price, so the first match is the cheapest.
        for size in self.cloud_sizes:
            if size.meets_constraints(**wants):
                return size
        return None

    def servers_for_queue(self, queue):
        """Return the list of real cloud sizes wanted for the queued jobs.

        Each job contributes min_nodes copies (at least one) of the cheapest
        size that satisfies it.  Jobs that no size satisfies are logged once
        and skipped.  Jobs whose node count exceeds max_nodes, or whose
        total price exceeds max_price, are skipped silently; both limits
        are checked per job, not against the whole wishlist.
        """
        servers = []
        seen_jobs = set()
        for job in queue:
            seen_jobs.add(job['uuid'])
            constraints = job['runtime_constraints']
            want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
            cloud_size = self.cloud_size_for_constraints(constraints)
            if cloud_size is None:
                if job['uuid'] not in self.logged_jobs:
                    self.logged_jobs.add(job['uuid'])
                    self.logger.debug("job %s not satisfiable", job['uuid'])
            elif (want_count <= self.max_nodes and
                  want_count * cloud_size.price <= self.max_price):
                servers.extend([cloud_size.real] * want_count)
        # Forget jobs that have left the queue so the set cannot grow
        # without bound.
        self.logged_jobs.intersection_update(seen_jobs)
        return servers

    def cheapest_size(self):
        """Return the lowest-priced wrapped size.

        Raises IndexError if no sizes were configured.
        """
        return self.cloud_sizes[0]

    def find_size(self, sizeid):
        """Return the wrapped size whose id equals sizeid, or None."""
        for s in self.cloud_sizes:
            if s.id == sizeid:
                return s
        return None
98
class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
    """Actor to generate server wishlists from the job queue.

    On each poll this actor gathers pending jobs from SLURM's squeue and
    from Arvados' job queue, then uses the provided ServerCalculator to turn
    them into a list of requested node sizes.  That list is handed to the
    parent class's _got_response, which (per the original documentation)
    sends it to subscribers on every poll.
    """

    CLIENT_ERRORS = ARVADOS_ERRORS

    def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
        """Set up the poll loop and remember the ServerCalculator to use."""
        super(JobQueueMonitorActor, self).__init__(
            client, timer_actor, *args, **kwargs)
        self._calculator = server_calc

    def _send_request(self):
        """Return the combined list of pending SLURM jobs and Arvados jobs.

        SLURM jobs are converted to dicts shaped like Arvados queue items
        ('uuid' plus 'runtime_constraints').  Only jobs whose pending reason
        is "Resources" or "ReqNodeNotAvail" are included.
        """
        # Output columns, in order:
        # cpus, memory, temporary disk space, reason, job name
        squeue_cmd = ["squeue", "--state=PENDING", "--noheader",
                      "--format=%c %m %d %r %j"]
        pending = []
        for line in subprocess.check_output(squeue_cmd).splitlines():
            # The job name comes last so that it may itself contain spaces.
            cpu, ram, disk, reason, jobname = line.split(" ", 4)
            if reason not in ("Resources", "ReqNodeNotAvail"):
                continue
            requirements = {
                "min_cores_per_node": cpu,
                "min_ram_mb_per_node": ram,
                "min_scratch_mb_per_node": disk
            }
            pending.append({
                "uuid": jobname,
                "runtime_constraints": requirements
            })

        arvados_jobs = self._client.jobs().queue().execute()['items']
        pending.extend(arvados_jobs)
        return pending

    def _got_response(self, queue):
        """Compute the wishlist for queue, log it, and pass it upward."""
        wishlist = self._calculator.servers_for_queue(queue)
        size_names = ', '.join(s.name for s in wishlist) or "(empty)"
        self._logger.debug("Calculated wishlist: %s", size_names)
        return super(JobQueueMonitorActor, self)._got_response(wishlist)