Merge branch '7478-s-preemptable-preemptible'
[arvados.git] / services / nodemanager / arvnodeman / jobqueue.py
index a35bd92863e533ca2abb7f4f18ca12f992e835a1..e91764474fbd10edb28463368588329125e0a3db 100644 (file)
@@ -1,13 +1,20 @@
 #!/usr/bin/env python
 #!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
 
 from __future__ import absolute_import, print_function
 
 import logging
 
 from __future__ import absolute_import, print_function
 
 import logging
+import re
 import subprocess
 
 import subprocess
 
+import arvados.util
+
 from . import clientactor
 from .config import ARVADOS_ERRORS
 
 from . import clientactor
 from .config import ARVADOS_ERRORS
 
+
 class ServerCalculator(object):
     """Generate cloud server wishlists from an Arvados job queue.
 
 class ServerCalculator(object):
     """Generate cloud server wishlists from an Arvados job queue.
 
@@ -17,6 +24,26 @@ class ServerCalculator(object):
     that would best satisfy the jobs, choosing the cheapest size that
     satisfies each job, and ignoring jobs that can't be satisfied.
     """
     that would best satisfy the jobs, choosing the cheapest size that
     satisfies each job, and ignoring jobs that can't be satisfied.
     """
+    class InvalidCloudSize(object):
+        """
+        Dummy CloudSizeWrapper-like class, to be used when a cloud node doesn't
+        have a recognizable arvados_node_size tag.
+        """
+        def __init__(self):
+            self.id = 'invalid'
+            self.name = 'invalid'
+            self.ram = 0
+            self.disk = 0
+            self.scratch = 0
+            self.cores = 0
+            self.bandwidth = 0
+            self.price = 9999999
+            self.preemptible = False
+            self.extra = {}
+
+        def meets_constraints(self, **kwargs):
+            return False
+
 
     class CloudSizeWrapper(object):
         def __init__(self, real_size, node_mem_scaling, **kwargs):
 
     class CloudSizeWrapper(object):
         def __init__(self, real_size, node_mem_scaling, **kwargs):
@@ -27,9 +54,13 @@ class ServerCalculator(object):
             self.cores = kwargs.pop('cores')
             # libcloud disk sizes are in GB, Arvados/SLURM are in MB
             # multiply by 1000 instead of 1024 to err on low side
             self.cores = kwargs.pop('cores')
             # libcloud disk sizes are in GB, Arvados/SLURM are in MB
             # multiply by 1000 instead of 1024 to err on low side
+            if self.disk is None:
+                self.disk = 0
             self.scratch = self.disk * 1000
             self.ram = int(self.ram * node_mem_scaling)
             self.scratch = self.disk * 1000
             self.ram = int(self.ram * node_mem_scaling)
+            self.preemptible = False
             for name, override in kwargs.iteritems():
             for name, override in kwargs.iteritems():
+                if name == 'instance_type': continue
                 if not hasattr(self, name):
                     raise ValueError("unrecognized size field '%s'" % (name,))
                 setattr(self, name, override)
                 if not hasattr(self, name):
                     raise ValueError("unrecognized size field '%s'" % (name,))
                 setattr(self, name, override)
@@ -53,7 +84,6 @@ class ServerCalculator(object):
         self.max_nodes = max_nodes or float('inf')
         self.max_price = max_price or float('inf')
         self.logger = logging.getLogger('arvnodeman.jobqueue')
         self.max_nodes = max_nodes or float('inf')
         self.max_price = max_price or float('inf')
         self.logger = logging.getLogger('arvnodeman.jobqueue')
-        self.logged_jobs = set()
 
         self.logger.info("Using cloud node sizes:")
         for s in self.cloud_sizes:
 
         self.logger.info("Using cloud node sizes:")
         for s in self.cloud_sizes:
@@ -67,31 +97,40 @@ class ServerCalculator(object):
             return fallback
 
     def cloud_size_for_constraints(self, constraints):
             return fallback
 
     def cloud_size_for_constraints(self, constraints):
+        specified_size = constraints.get('instance_type')
         want_value = lambda key: self.coerce_int(constraints.get(key), 0)
         wants = {'cores': want_value('min_cores_per_node'),
                  'ram': want_value('min_ram_mb_per_node'),
                  'scratch': want_value('min_scratch_mb_per_node')}
         want_value = lambda key: self.coerce_int(constraints.get(key), 0)
         wants = {'cores': want_value('min_cores_per_node'),
                  'ram': want_value('min_ram_mb_per_node'),
                  'scratch': want_value('min_scratch_mb_per_node')}
+        # EC2 node sizes are identified by id. GCE sizes are identified by name.
         for size in self.cloud_sizes:
         for size in self.cloud_sizes:
-            if size.meets_constraints(**wants):
-                return size
+            if (size.meets_constraints(**wants) and
+                (specified_size is None or
+                    size.id == specified_size or size.name == specified_size)):
+                        return size
         return None
 
     def servers_for_queue(self, queue):
         servers = []
         return None
 
     def servers_for_queue(self, queue):
         servers = []
-        seen_jobs = set()
+        unsatisfiable_jobs = {}
         for job in queue:
         for job in queue:
-            seen_jobs.add(job['uuid'])
             constraints = job['runtime_constraints']
             want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
             cloud_size = self.cloud_size_for_constraints(constraints)
             if cloud_size is None:
             constraints = job['runtime_constraints']
             want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
             cloud_size = self.cloud_size_for_constraints(constraints)
             if cloud_size is None:
-                if job['uuid'] not in self.logged_jobs:
-                    self.logged_jobs.add(job['uuid'])
-                    self.logger.debug("job %s not satisfiable", job['uuid'])
-            elif (want_count <= self.max_nodes) and (want_count*cloud_size.price <= self.max_price):
-                servers.extend([cloud_size.real] * want_count)
-        self.logged_jobs.intersection_update(seen_jobs)
-        return servers
+                unsatisfiable_jobs[job['uuid']] = (
+                    "Constraints cannot be satisfied by any node type")
+            elif (want_count > self.max_nodes):
+                unsatisfiable_jobs[job['uuid']] = (
+                    "Job's min_nodes constraint is greater than the configured "
+                    "max_nodes (%d)" % self.max_nodes)
+            elif (want_count*cloud_size.price <= self.max_price):
+                servers.extend([cloud_size] * want_count)
+            else:
+                unsatisfiable_jobs[job['uuid']] = (
+                    "Job's price (%d) is above system's max_price "
+                    "limit (%d)" % (want_count*cloud_size.price, self.max_price))
+        return (servers, unsatisfiable_jobs)
 
     def cheapest_size(self):
         return self.cloud_sizes[0]
 
     def cheapest_size(self):
         return self.cloud_sizes[0]
@@ -100,7 +139,8 @@ class ServerCalculator(object):
         for s in self.cloud_sizes:
             if s.id == sizeid:
                 return s
         for s in self.cloud_sizes:
             if s.id == sizeid:
                 return s
-        return None
+        return self.InvalidCloudSize()
+
 
 class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
     """Actor to generate server wishlists from the job queue.
 
 class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
     """Actor to generate server wishlists from the job queue.
@@ -112,9 +152,12 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
 
     CLIENT_ERRORS = ARVADOS_ERRORS
 
 
     CLIENT_ERRORS = ARVADOS_ERRORS
 
-    def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
+    def __init__(self, client, timer_actor, server_calc,
+                 jobs_queue, slurm_queue, *args, **kwargs):
         super(JobQueueMonitorActor, self).__init__(
             client, timer_actor, *args, **kwargs)
         super(JobQueueMonitorActor, self).__init__(
             client, timer_actor, *args, **kwargs)
+        self.jobs_queue = jobs_queue
+        self.slurm_queue = slurm_queue
         self._calculator = server_calc
 
     @staticmethod
         self._calculator = server_calc
 
     @staticmethod
@@ -132,27 +175,78 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
             return int(x)
 
     def _send_request(self):
             return int(x)
 
     def _send_request(self):
-        # cpus, memory, tempory disk space, reason, job name
-        squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c %m %d %r %j"])
         queuelist = []
         queuelist = []
-        for out in squeue_out.splitlines():
-            cpu, ram, disk, reason, jobname = out.split(" ", 4)
-            if reason in ("Resources", "ReqNodeNotAvail"):
-                queuelist.append({
-                    "uuid": jobname,
-                    "runtime_constraints": {
-                        "min_cores_per_node": cpu,
-                        "min_ram_mb_per_node": self.coerce_to_mb(ram),
-                        "min_scratch_mb_per_node": self.coerce_to_mb(disk)
-                    }
-                })
-
-        queuelist.extend(self._client.jobs().queue().execute()['items'])
+        if self.slurm_queue:
+            # cpus, memory, temporary disk space, reason, job name, feature constraints, priority
+            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j|%f|%Q"])
+            for out in squeue_out.splitlines():
+                try:
+                    cpu, ram, disk, reason, jobname, features, priority = out.split("|", 6)
+                except ValueError:
+                    self._logger.warning("ignored malformed line in squeue output: %r", out)
+                    continue
+                if '-dz642-' not in jobname:
+                    continue
+                if not re.search(r'BadConstraints|ReqNodeNotAvail|Resources|Priority', reason):
+                    continue
+
+                for feature in features.split(','):
+                    m = re.match(r'instancetype=(.*)', feature)
+                    if not m:
+                        continue
+                    instance_type = m.group(1)
+                    # Ignore cpu/ram/scratch requirements, bring up
+                    # the requested node type.
+                    queuelist.append({
+                        "uuid": jobname,
+                        "runtime_constraints": {
+                            "instance_type": instance_type,
+                        },
+                        "priority": int(priority)
+                    })
+                    break
+                else:
+                    # No instance type specified. Choose a node type
+                    # to suit cpu/ram/scratch requirements.
+                    queuelist.append({
+                        "uuid": jobname,
+                        "runtime_constraints": {
+                            "min_cores_per_node": cpu,
+                            "min_ram_mb_per_node": self.coerce_to_mb(ram),
+                            "min_scratch_mb_per_node": self.coerce_to_mb(disk)
+                        },
+                        "priority": int(priority)
+                    })
+            queuelist.sort(key=lambda x: x.get('priority', 1), reverse=True)
+
+        if self.jobs_queue:
+            queuelist.extend(self._client.jobs().queue().execute()['items'])
 
         return queuelist
 
     def _got_response(self, queue):
 
         return queuelist
 
     def _got_response(self, queue):
-        server_list = self._calculator.servers_for_queue(queue)
+        server_list, unsatisfiable_jobs = self._calculator.servers_for_queue(queue)
+        # Cancel any job/container with unsatisfiable requirements, emitting
+        # a log explaining why.
+        for job_uuid, reason in unsatisfiable_jobs.iteritems():
+            try:
+                self._client.logs().create(body={
+                    'object_uuid': job_uuid,
+                    'event_type': 'stderr',
+                    'properties': {'text': reason},
+                }).execute()
+                # Cancel the job depending on its type
+                if arvados.util.container_uuid_pattern.match(job_uuid):
+                    subprocess.check_call(['scancel', '--name='+job_uuid])
+                elif arvados.util.job_uuid_pattern.match(job_uuid):
+                    self._client.jobs().cancel(uuid=job_uuid).execute()
+                else:
+                    raise Exception('Unknown job type')
+                self._logger.debug("Cancelled unsatisfiable job '%s'", job_uuid)
+            except Exception as error:
+                self._logger.error("Trying to cancel job '%s': %s",
+                                   job_uuid,
+                                   error)
         self._logger.debug("Calculated wishlist: %s",
         self._logger.debug("Calculated wishlist: %s",
-                           ', '.join(s.name for s in server_list) or "(empty)")
+                           ', '.join(s.id for s in server_list) or "(empty)")
         return super(JobQueueMonitorActor, self)._got_response(server_list)
         return super(JobQueueMonitorActor, self)._got_response(server_list)