X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b5d352d6099b60db5dcdd9183dcab3e8e17d729e..f9e94997cb5c2166d8b71874f263544cfc2fe5ba:/services/nodemanager/arvnodeman/jobqueue.py diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py index a35bd92863..e91764474f 100644 --- a/services/nodemanager/arvnodeman/jobqueue.py +++ b/services/nodemanager/arvnodeman/jobqueue.py @@ -1,13 +1,20 @@ #!/usr/bin/env python +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: AGPL-3.0 from __future__ import absolute_import, print_function import logging +import re import subprocess +import arvados.util + from . import clientactor from .config import ARVADOS_ERRORS + class ServerCalculator(object): """Generate cloud server wishlists from an Arvados job queue. @@ -17,6 +24,26 @@ class ServerCalculator(object): that would best satisfy the jobs, choosing the cheapest size that satisfies each job, and ignoring jobs that can't be satisfied. """ + class InvalidCloudSize(object): + """ + Dummy CloudSizeWrapper-like class, to be used when a cloud node doesn't + have a recognizable arvados_node_size tag. + """ + def __init__(self): + self.id = 'invalid' + self.name = 'invalid' + self.ram = 0 + self.disk = 0 + self.scratch = 0 + self.cores = 0 + self.bandwidth = 0 + self.price = 9999999 + self.preemptible = False + self.extra = {} + + def meets_constraints(self, **kwargs): + return False + class CloudSizeWrapper(object): def __init__(self, real_size, node_mem_scaling, **kwargs): @@ -27,9 +54,13 @@ class ServerCalculator(object): self.cores = kwargs.pop('cores') # libcloud disk sizes are in GB, Arvados/SLURM are in MB # multiply by 1000 instead of 1024 to err on low side + if self.disk is None: + self.disk = 0 self.scratch = self.disk * 1000 self.ram = int(self.ram * node_mem_scaling) + self.preemptible = False for name, override in kwargs.iteritems(): + if name == 'instance_type': continue if not hasattr(self, name): raise ValueError("unrecognized size field '%s'" % (name,)) setattr(self, name, override) @@ -53,7 +84,6 @@ class ServerCalculator(object): self.max_nodes = max_nodes or float('inf') self.max_price = max_price or float('inf') self.logger = logging.getLogger('arvnodeman.jobqueue') - self.logged_jobs = set() self.logger.info("Using cloud node sizes:") for s in self.cloud_sizes: @@ -67,31 +97,40 @@ class ServerCalculator(object): return fallback def cloud_size_for_constraints(self, constraints): + specified_size = constraints.get('instance_type') want_value = lambda key: self.coerce_int(constraints.get(key), 0) wants = {'cores': want_value('min_cores_per_node'), 'ram': want_value('min_ram_mb_per_node'), 'scratch': want_value('min_scratch_mb_per_node')} + # EC2 node sizes are identified by id. GCE sizes are identified by name. for size in self.cloud_sizes: - if size.meets_constraints(**wants): - return size + if (size.meets_constraints(**wants) and + (specified_size is None or + size.id == specified_size or size.name == specified_size)): + return size return None def servers_for_queue(self, queue): servers = [] - seen_jobs = set() + unsatisfiable_jobs = {} for job in queue: - seen_jobs.add(job['uuid']) constraints = job['runtime_constraints'] want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1)) cloud_size = self.cloud_size_for_constraints(constraints) if cloud_size is None: - if job['uuid'] not in self.logged_jobs: - self.logged_jobs.add(job['uuid']) - self.logger.debug("job %s not satisfiable", job['uuid']) - elif (want_count <= self.max_nodes) and (want_count*cloud_size.price <= self.max_price): - servers.extend([cloud_size.real] * want_count) - self.logged_jobs.intersection_update(seen_jobs) - return servers + unsatisfiable_jobs[job['uuid']] = ( + "Constraints cannot be satisfied by any node type") + elif (want_count > self.max_nodes): + unsatisfiable_jobs[job['uuid']] = ( + "Job's min_nodes constraint is greater than the configured " + "max_nodes (%d)" % self.max_nodes) + elif (want_count*cloud_size.price <= self.max_price): + servers.extend([cloud_size] * want_count) + else: + unsatisfiable_jobs[job['uuid']] = ( + "Job's price (%d) is above system's max_price " + "limit (%d)" % (want_count*cloud_size.price, self.max_price)) + return (servers, unsatisfiable_jobs) def cheapest_size(self): return self.cloud_sizes[0] @@ -100,7 +139,8 @@ class ServerCalculator(object): for s in self.cloud_sizes: if s.id == sizeid: return s - return None + return self.InvalidCloudSize() + class JobQueueMonitorActor(clientactor.RemotePollLoopActor): """Actor to generate server wishlists from the job queue. @@ -112,9 +152,12 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor): CLIENT_ERRORS = ARVADOS_ERRORS - def __init__(self, client, timer_actor, server_calc, *args, **kwargs): + def __init__(self, client, timer_actor, server_calc, + jobs_queue, slurm_queue, *args, **kwargs): super(JobQueueMonitorActor, self).__init__( client, timer_actor, *args, **kwargs) + self.jobs_queue = jobs_queue + self.slurm_queue = slurm_queue self._calculator = server_calc @staticmethod @@ -132,27 +175,78 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor): return int(x) def _send_request(self): - # cpus, memory, tempory disk space, reason, job name - squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c %m %d %r %j"]) queuelist = [] - for out in squeue_out.splitlines(): - cpu, ram, disk, reason, jobname = out.split(" ", 4) - if reason in ("Resources", "ReqNodeNotAvail"): - queuelist.append({ - "uuid": jobname, - "runtime_constraints": { - "min_cores_per_node": cpu, - "min_ram_mb_per_node": self.coerce_to_mb(ram), - "min_scratch_mb_per_node": self.coerce_to_mb(disk) - } - }) - - queuelist.extend(self._client.jobs().queue().execute()['items']) + if self.slurm_queue: + # cpus, memory, tempory disk space, reason, job name, feature constraints, priority + squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j|%f|%Q"]) + for out in squeue_out.splitlines(): + try: + cpu, ram, disk, reason, jobname, features, priority = out.split("|", 6) + except ValueError: + self._logger.warning("ignored malformed line in squeue output: %r", out) + continue + if '-dz642-' not in jobname: + continue + if not re.search(r'BadConstraints|ReqNodeNotAvail|Resources|Priority', reason): + continue + + for feature in features.split(','): + m = re.match(r'instancetype=(.*)', feature) + if not m: + continue + instance_type = m.group(1) + # Ignore cpu/ram/scratch requirements, bring up + # the requested node type. + queuelist.append({ + "uuid": jobname, + "runtime_constraints": { + "instance_type": instance_type, + }, + "priority": int(priority) + }) + break + else: + # No instance type specified. Choose a node type + # to suit cpu/ram/scratch requirements. + queuelist.append({ + "uuid": jobname, + "runtime_constraints": { + "min_cores_per_node": cpu, + "min_ram_mb_per_node": self.coerce_to_mb(ram), + "min_scratch_mb_per_node": self.coerce_to_mb(disk) + }, + "priority": int(priority) + }) + queuelist.sort(key=lambda x: x.get('priority', 1), reverse=True) + + if self.jobs_queue: + queuelist.extend(self._client.jobs().queue().execute()['items']) return queuelist def _got_response(self, queue): - server_list = self._calculator.servers_for_queue(queue) + server_list, unsatisfiable_jobs = self._calculator.servers_for_queue(queue) + # Cancel any job/container with unsatisfiable requirements, emitting + # a log explaining why. + for job_uuid, reason in unsatisfiable_jobs.iteritems(): + try: + self._client.logs().create(body={ + 'object_uuid': job_uuid, + 'event_type': 'stderr', + 'properties': {'text': reason}, + }).execute() + # Cancel the job depending on its type + if arvados.util.container_uuid_pattern.match(job_uuid): + subprocess.check_call(['scancel', '--name='+job_uuid]) + elif arvados.util.job_uuid_pattern.match(job_uuid): + self._client.jobs().cancel(uuid=job_uuid).execute() + else: + raise Exception('Unknown job type') + self._logger.debug("Cancelled unsatisfiable job '%s'", job_uuid) + except Exception as error: + self._logger.error("Trying to cancel job '%s': %s", + job_uuid, + error) self._logger.debug("Calculated wishlist: %s", - ', '.join(s.name for s in server_list) or "(empty)") + ', '.join(s.id for s in server_list) or "(empty)") return super(JobQueueMonitorActor, self)._got_response(server_list)