13868: Fallback behavior when arvados_node_size tag is missing
[arvados.git] services/nodemanager/arvnodeman/jobqueue.py
index 895e03d5ba5ddf6b8c2762e92120e86ccdd904e6..7ca9c9553721f0fa1291273bfeff5f5f9f7d0e78 100644
--- a/services/nodemanager/arvnodeman/jobqueue.py
+++ b/services/nodemanager/arvnodeman/jobqueue.py
@@ -6,7 +6,10 @@
 from __future__ import absolute_import, print_function
 
 import logging
-import subprocess
+import re
+import subprocess32 as subprocess
+
+import arvados.util
 
 from . import clientactor
 from .config import ARVADOS_ERRORS
@@ -21,6 +24,29 @@ class ServerCalculator(object):
     that would best satisfy the jobs, choosing the cheapest size that
     satisfies each job, and ignoring jobs that can't be satisfied.
     """
+    class InvalidCloudSize(object):
+        """
+        Dummy CloudSizeWrapper-like class, to be used when a cloud node doesn't
+        have a recognizable arvados_node_size tag.
+        """
+        def __init__(self):
+            self.id = 'invalid'
+            self.name = 'invalid'
+            self.ram = 0
+            self.disk = 0
+            self.scratch = 0
+            self.cores = 0
+            self.bandwidth = 0
+            # price is multiplied by 1000 to get the node weight
+            # the maximum node weight is                  4294967280
+            # so use invalid node weight 4294967 * 1000 = 4294967000
+            self.price = 4294967
+            self.preemptible = False
+            self.extra = {}
+
+        def meets_constraints(self, **kwargs):
+            return False
+
 
     class CloudSizeWrapper(object):
         def __init__(self, real_size, node_mem_scaling, **kwargs):
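
A quick sanity check of the sentinel price chosen in the InvalidCloudSize hunk above: the node weight derived from it is price * 1000, and it has to stay under the maximum weight quoted in the comment. Plain Python, illustration only, not part of the patch:

    invalid_price = 4294967
    assert invalid_price * 1000 == 4294967000   # weight assigned to the invalid size
    assert invalid_price * 1000 < 4294967280    # stays below the stated maximum node weight
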
@@ -35,7 +61,9 @@ class ServerCalculator(object):
                 self.disk = 0
             self.scratch = self.disk * 1000
             self.ram = int(self.ram * node_mem_scaling)
+            self.preemptible = False
             for name, override in kwargs.iteritems():
+                if name == 'instance_type': continue
                 if not hasattr(self, name):
                     raise ValueError("unrecognized size field '%s'" % (name,))
                 setattr(self, name, override)
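
The override loop above now tolerates an instance_type key arriving through **kwargs instead of rejecting it as an unknown size field. A toy stand-in for that behavior (simplified sketch, not the real CloudSizeWrapper; the config values are invented):

    class _Wrapper(object):
        # Minimal imitation of CloudSizeWrapper's override handling.
        def __init__(self, **kwargs):
            self.price = 0.0
            self.preemptible = False
            for name, override in kwargs.items():
                if name == 'instance_type':
                    continue  # config hint only, not a size attribute
                if not hasattr(self, name):
                    raise ValueError("unrecognized size field '%s'" % (name,))
                setattr(self, name, override)

    w = _Wrapper(price=0.146, preemptible=True, instance_type='m4.large')
    assert w.price == 0.146 and w.preemptible is True
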
@@ -72,13 +100,17 @@ class ServerCalculator(object):
             return fallback
 
     def cloud_size_for_constraints(self, constraints):
+        specified_size = constraints.get('instance_type')
         want_value = lambda key: self.coerce_int(constraints.get(key), 0)
         wants = {'cores': want_value('min_cores_per_node'),
                  'ram': want_value('min_ram_mb_per_node'),
                  'scratch': want_value('min_scratch_mb_per_node')}
+        # EC2 node sizes are identified by id. GCE sizes are identified by name.
         for size in self.cloud_sizes:
-            if size.meets_constraints(**wants):
-                return size
+            if (size.meets_constraints(**wants) and
+                (specified_size is None or
+                 size.id == specified_size or size.name == specified_size)):
+                return size
         return None
 
     def servers_for_queue(self, queue):
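
Two shapes of constraint dict can now reach cloud_size_for_constraints: one naming an instance type outright (matched against size.id on EC2 or size.name on GCE, per the comment above) and the traditional minimums. Hypothetical examples with made-up values:

    # Explicit type: only a size whose id or name equals 'm4.large' can be
    # returned, and it must still pass the (here zero) cores/ram/scratch checks.
    explicit = {'instance_type': 'm4.large'}

    # No instance_type: the cheapest size meeting all three minimums is
    # returned, or None if nothing qualifies.
    generic = {'min_cores_per_node': 2,
               'min_ram_mb_per_node': 7782,
               'min_scratch_mb_per_node': 32000}
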
@@ -90,14 +122,13 @@ class ServerCalculator(object):
             cloud_size = self.cloud_size_for_constraints(constraints)
             if cloud_size is None:
                 unsatisfiable_jobs[job['uuid']] = (
-                    'Requirements for a single node exceed the available '
-                    'cloud node size')
+                    "Constraints cannot be satisfied by any node type")
             elif (want_count > self.max_nodes):
                 unsatisfiable_jobs[job['uuid']] = (
                     "Job's min_nodes constraint is greater than the configured "
                     "max_nodes (%d)" % self.max_nodes)
             elif (want_count*cloud_size.price <= self.max_price):
-                servers.extend([cloud_size.real] * want_count)
+                servers.extend([cloud_size] * want_count)
             else:
                 unsatisfiable_jobs[job['uuid']] = (
                     "Job's price (%d) is above system's max_price "
@@ -111,7 +142,7 @@ class ServerCalculator(object):
         for s in self.cloud_sizes:
             if s.id == sizeid:
                 return s
-        return None
+        return self.InvalidCloudSize()
 
 
 class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
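
With this fallback, the size lookup no longer returns None for an unrecognized size id; callers get the InvalidCloudSize sentinel, which satisfies no constraints and carries the deliberately huge price. An assumed usage sketch (the method name find_size and the calculator instance are inferred from context, not shown in this diff):

    size = calculator.find_size('no-such-size-id')
    assert size.id == size.name == 'invalid'
    assert not size.meets_constraints(cores=1, ram=1024, scratch=0)
    assert size.price * 1000 == 4294967000   # the 'invalid' node weight from the earlier comment
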
@@ -149,22 +180,47 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
     def _send_request(self):
         queuelist = []
         if self.slurm_queue:
-            # cpus, memory, tempory disk space, reason, job name
-            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
+            # cpus, memory, temporary disk space, reason, job name, feature constraints, priority
+            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j|%f|%Q"])
             for out in squeue_out.splitlines():
                 try:
-                    cpu, ram, disk, reason, jobname = out.split("|", 4)
-                    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
-                        queuelist.append({
-                            "uuid": jobname,
-                            "runtime_constraints": {
-                                "min_cores_per_node": cpu,
-                                "min_ram_mb_per_node": self.coerce_to_mb(ram),
-                                "min_scratch_mb_per_node": self.coerce_to_mb(disk)
-                            }
-                        })
+                    cpu, ram, disk, reason, jobname, features, priority = out.split("|", 6)
                 except ValueError:
-                    pass
+                    self._logger.warning("ignored malformed line in squeue output: %r", out)
+                    continue
+                if '-dz642-' not in jobname:
+                    continue
+                if not re.search(r'BadConstraints|ReqNodeNotAvail|Resources|Priority', reason):
+                    continue
+
+                for feature in features.split(','):
+                    m = re.match(r'instancetype=(.*)', feature)
+                    if not m:
+                        continue
+                    instance_type = m.group(1)
+                    # Ignore cpu/ram/scratch requirements, bring up
+                    # the requested node type.
+                    queuelist.append({
+                        "uuid": jobname,
+                        "runtime_constraints": {
+                            "instance_type": instance_type,
+                        },
+                        "priority": int(priority)
+                    })
+                    break
+                else:
+                    # No instance type specified. Choose a node type
+                    # to suit cpu/ram/scratch requirements.
+                    queuelist.append({
+                        "uuid": jobname,
+                        "runtime_constraints": {
+                            "min_cores_per_node": cpu,
+                            "min_ram_mb_per_node": self.coerce_to_mb(ram),
+                            "min_scratch_mb_per_node": self.coerce_to_mb(disk)
+                        },
+                        "priority": int(priority)
+                    })
+            queuelist.sort(key=lambda x: x.get('priority', 1), reverse=True)
 
         if self.jobs_queue:
             queuelist.extend(self._client.jobs().queue().execute()['items'])
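
The new squeue format string adds feature constraints (%f) and job priority (%Q) to the pipe-separated fields. A fabricated output line and how the parsing above treats it (sample data invented for illustration):

    import re

    # cpus|memory|tmp disk|reason|job name|features|priority
    out = "1|1954M|0|Resources|zzzzz-dz642-abcdefghijklmno|instancetype=m4.large|1000"
    cpu, ram, disk, reason, jobname, features, priority = out.split("|", 6)

    assert '-dz642-' in jobname    # only container jobs are considered
    assert re.search(r'BadConstraints|ReqNodeNotAvail|Resources|Priority', reason)
    m = re.match(r'instancetype=(.*)', features.split(',')[0])
    entry = {"uuid": jobname,
             "runtime_constraints": {"instance_type": m.group(1)},
             "priority": int(priority)}   # queue is later sorted highest priority first
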
@@ -173,21 +229,27 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
 
     def _got_response(self, queue):
         server_list, unsatisfiable_jobs = self._calculator.servers_for_queue(queue)
-        # Cancel any job with unsatisfiable requirements, emitting a log
-        # explaining why.
+        # Cancel any job/container with unsatisfiable requirements, emitting
+        # a log explaining why.
         for job_uuid, reason in unsatisfiable_jobs.iteritems():
-            self._logger.debug("Cancelling unsatisfiable job '%s'", job_uuid)
             try:
                 self._client.logs().create(body={
                     'object_uuid': job_uuid,
                     'event_type': 'stderr',
                     'properties': {'text': reason},
                 }).execute()
-                self._client.jobs().cancel(uuid=job_uuid).execute()
+                # Cancel the job depending on its type
+                if arvados.util.container_uuid_pattern.match(job_uuid):
+                    subprocess.check_call(['scancel', '--name='+job_uuid])
+                elif arvados.util.job_uuid_pattern.match(job_uuid):
+                    self._client.jobs().cancel(uuid=job_uuid).execute()
+                else:
+                    raise Exception('Unknown job type')
+                self._logger.debug("Cancelled unsatisfiable job '%s'", job_uuid)
             except Exception as error:
                 self._logger.error("Trying to cancel job '%s': %s",
                                    job_uuid,
                                    error)
         self._logger.debug("Calculated wishlist: %s",
-                           ', '.join(s.name for s in server_list) or "(empty)")
+                           ', '.join(s.id for s in server_list) or "(empty)")
         return super(JobQueueMonitorActor, self)._got_response(server_list)
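
Cancellation now branches on the uuid type: slurm-dispatched containers are cancelled with scancel (their slurm job name is the container uuid), while legacy jobs still go through the API. A small check of the two patterns, assuming the usual Arvados uuid infixes (dz642 for containers, 8i9sb for jobs):

    import arvados.util

    container = 'zzzzz-dz642-abcdefghijklmno'   # handled via: scancel --name=<uuid>
    job = 'zzzzz-8i9sb-abcdefghijklmno'         # handled via: jobs().cancel(uuid=<uuid>)
    assert arvados.util.container_uuid_pattern.match(container)
    assert arvados.util.job_uuid_pattern.match(job)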