X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3d6d097f42128bde90b7bc184057a84e99ea3e0a..8ab6b482342b95ad35775867bcdb8fd691b78fb7:/services/nodemanager/arvnodeman/jobqueue.py

diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py
index 0360bfc542..7ca9c95537 100644
--- a/services/nodemanager/arvnodeman/jobqueue.py
+++ b/services/nodemanager/arvnodeman/jobqueue.py
@@ -7,7 +7,7 @@ from __future__ import absolute_import, print_function
 
 import logging
 import re
-import subprocess
+import subprocess32 as subprocess
 
 import arvados.util
 
@@ -24,6 +24,29 @@ class ServerCalculator(object):
     that would best satisfy the jobs, choosing the cheapest size that
     satisfies each job, and ignoring jobs that can't be satisfied.
     """
+    class InvalidCloudSize(object):
+        """
+        Dummy CloudSizeWrapper-like class, to be used when a cloud node doesn't
+        have a recognizable arvados_node_size tag.
+        """
+        def __init__(self):
+            self.id = 'invalid'
+            self.name = 'invalid'
+            self.ram = 0
+            self.disk = 0
+            self.scratch = 0
+            self.cores = 0
+            self.bandwidth = 0
+            # price is multiplied by 1000 to get the node weight
+            # the maximum node weight is 4294967280
+            # so use invalid node weight 4294967 * 1000 = 4294967000
+            self.price = 4294967
+            self.preemptible = False
+            self.extra = {}
+
+        def meets_constraints(self, **kwargs):
+            return False
+
 
     class CloudSizeWrapper(object):
         def __init__(self, real_size, node_mem_scaling, **kwargs):
@@ -38,7 +61,9 @@ class ServerCalculator(object):
                 self.disk = 0
             self.scratch = self.disk * 1000
             self.ram = int(self.ram * node_mem_scaling)
+            self.preemptible = False
             for name, override in kwargs.iteritems():
+                if name == 'instance_type': continue
                 if not hasattr(self, name):
                     raise ValueError("unrecognized size field '%s'" % (name,))
                 setattr(self, name, override)
@@ -80,10 +105,12 @@ class ServerCalculator(object):
         wants = {'cores': want_value('min_cores_per_node'),
                  'ram': want_value('min_ram_mb_per_node'),
                  'scratch': want_value('min_scratch_mb_per_node')}
+        # EC2 node sizes are identified by id. GCE sizes are identified by name.
         for size in self.cloud_sizes:
             if (size.meets_constraints(**wants) and
-                (specified_size is None or size.id == specified_size)):
-                return size
+                (specified_size is None or
+                    size.id == specified_size or size.name == specified_size)):
+                return size
         return None
 
     def servers_for_queue(self, queue):
@@ -101,7 +128,7 @@ class ServerCalculator(object):
                     "Job's min_nodes constraint is greater than the configured "
                     "max_nodes (%d)" % self.max_nodes)
             elif (want_count*cloud_size.price <= self.max_price):
-                servers.extend([cloud_size.real] * want_count)
+                servers.extend([cloud_size] * want_count)
             else:
                 unsatisfiable_jobs[job['uuid']] = (
                     "Job's price (%d) is above system's max_price "
@@ -115,7 +142,7 @@ class ServerCalculator(object):
         for s in self.cloud_sizes:
             if s.id == sizeid:
                 return s
-        return None
+        return self.InvalidCloudSize()
 
 
 class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
@@ -153,17 +180,17 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
     def _send_request(self):
         queuelist = []
         if self.slurm_queue:
-            # cpus, memory, tempory disk space, reason, job name
-            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j|%f"])
+            # cpus, memory, temporary disk space, reason, job name, feature constraints, priority
+            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j|%f|%Q"])
             for out in squeue_out.splitlines():
                 try:
-                    cpu, ram, disk, reason, jobname, features = out.split("|", 5)
+                    cpu, ram, disk, reason, jobname, features, priority = out.split("|", 6)
                 except ValueError:
                     self._logger.warning("ignored malformed line in squeue output: %r", out)
                     continue
                 if '-dz642-' not in jobname:
                     continue
-                if not re.search(r'ReqNodeNotAvail|Resources|Priority', reason):
+                if not re.search(r'BadConstraints|ReqNodeNotAvail|Resources|Priority', reason):
                     continue
 
                 for feature in features.split(','):
@@ -177,7 +204,8 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
                             "uuid": jobname,
                             "runtime_constraints": {
                                 "instance_type": instance_type,
-                            }
+                            },
+                            "priority": int(priority)
                         })
                         break
                 else:
@@ -189,8 +217,10 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
                             "min_cores_per_node": cpu,
                             "min_ram_mb_per_node": self.coerce_to_mb(ram),
                             "min_scratch_mb_per_node": self.coerce_to_mb(disk)
-                        }
+                        },
+                        "priority": int(priority)
                     })
+            queuelist.sort(key=lambda x: x.get('priority', 1), reverse=True)
 
         if self.jobs_queue:
             queuelist.extend(self._client.jobs().queue().execute()['items'])
@@ -221,5 +251,5 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
                                  job_uuid, error)
 
         self._logger.debug("Calculated wishlist: %s",
-                           ', '.join(s.name for s in server_list) or "(empty)")
+                           ', '.join(s.id for s in server_list) or "(empty)")
         return super(JobQueueMonitorActor, self)._got_response(server_list)
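
Notes on the changes above.

InvalidCloudSize is a null-object stand-in: find_size() now returns it instead of None when a node carries an unrecognizable arvados_node_size tag, so callers can keep treating the result as a size object. Its price is chosen so the derived node weight lands just under the 4294967280 ceiling named in the comment. A minimal sketch of that arithmetic, assuming weight = price * 1000 exactly as the comment states; compute_weight() is a hypothetical helper, not part of the nodemanager code:

    # Minimal sketch, assuming node weight = price * 1000 per the diff's
    # comment; compute_weight() is hypothetical, not nodemanager API.
    MAX_NODE_WEIGHT = 4294967280

    def compute_weight(size):
        return int(size.price * 1000)

    class InvalidCloudSize(object):
        price = 4294967  # 4294967 * 1000 = 4294967000 <= MAX_NODE_WEIGHT

    assert compute_weight(InvalidCloudSize()) == 4294967000
    assert compute_weight(InvalidCloudSize()) <= MAX_NODE_WEIGHT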
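The widened comparison in find_size() lets a job's requested instance type match either size.id (how EC2 identifies sizes) or size.name (how GCE does). A self-contained sketch of the same predicate; FakeSize and the size values are invented stand-ins for CloudSizeWrapper:

    # Sketch of the widened match; FakeSize stands in for CloudSizeWrapper
    # with only the two fields the comparison reads. Values are invented.
    class FakeSize(object):
        def __init__(self, id, name):
            self.id, self.name = id, name

    sizes = [FakeSize('m4.large', 'm4.large'),    # EC2-style: id is the type name
             FakeSize('3839', 'n1-standard-1')]   # GCE-style: numeric id, named size

    def find(specified_size):
        for size in sizes:
            if (specified_size is None or
                    size.id == specified_size or size.name == specified_size):
                return size

    assert find('n1-standard-1').id == '3839'    # GCE matches by name
    assert find('m4.large').name == 'm4.large'   # EC2 matches by id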
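In _send_request(), the squeue format string gains a %Q (priority) column, so the maxsplit rises from 5 to 6 and each line yields exactly seven fields. A standalone sketch of that parsing against one invented line of squeue output; every field value here, including the feature string, is fabricated for illustration:

    # Sketch of the new parsing; the sample line is invented, with the
    # seven pipe-separated fields squeue emits for %c|%m|%d|%r|%j|%f|%Q.
    sample = "2|1024M|10000M|Resources|zzzzz-dz642-0123456789abcde|instancetype=m4.large|5000"

    cpu, ram, disk, reason, jobname, features, priority = sample.split("|", 6)
    assert '-dz642-' in jobname      # only container jobs pass the filter
    assert int(priority) == 5000     # %Q arrives as a decimal string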
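The new queuelist.sort(...) orders the SLURM-derived entries so the highest-priority pending job is considered first when building the wishlist; an entry that lacks a priority key defaults to 1 and sinks toward the end. A small sketch with stand-in job dicts:

    # Sketch of the sort added at the end of the SLURM branch; the dicts
    # are minimal stand-ins for the entries built in _send_request().
    queuelist = [
        {"uuid": "job-a", "priority": 100},
        {"uuid": "job-b"},                    # no priority -> defaults to 1
        {"uuid": "job-c", "priority": 5000},
    ]
    queuelist.sort(key=lambda x: x.get('priority', 1), reverse=True)
    assert [j["uuid"] for j in queuelist] == ["job-c", "job-a", "job-b"]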