X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/fdc9a9308c646d23ec50073833f141ceebf78613..e2bf56f0a0fa1f6b4fb7b4efc4db5178b074b8ce:/services/nodemanager/arvnodeman/jobqueue.py

diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py
index 87cf738311..4d2d3e0c0a 100644
--- a/services/nodemanager/arvnodeman/jobqueue.py
+++ b/services/nodemanager/arvnodeman/jobqueue.py
@@ -1,12 +1,19 @@
 #!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
 
 from __future__ import absolute_import, print_function
 
 import logging
+import subprocess
+
+import arvados.util
 
 from . import clientactor
 from .config import ARVADOS_ERRORS
 
+
 class ServerCalculator(object):
     """Generate cloud server wishlists from an Arvados job queue.
 
@@ -18,13 +25,18 @@ class ServerCalculator(object):
     """
 
     class CloudSizeWrapper(object):
-        def __init__(self, real_size, **kwargs):
+        def __init__(self, real_size, node_mem_scaling, **kwargs):
             self.real = real_size
             for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
                          'extra']:
                 setattr(self, name, getattr(self.real, name))
             self.cores = kwargs.pop('cores')
-            self.scratch = self.disk
+            # libcloud disk sizes are in GB, Arvados/SLURM are in MB
+            # multiply by 1000 instead of 1024 to err on low side
+            if self.disk is None:
+                self.disk = 0
+            self.scratch = self.disk * 1000
+            self.ram = int(self.ram * node_mem_scaling)
             for name, override in kwargs.iteritems():
                 if not hasattr(self, name):
                     raise ValueError("unrecognized size field '%s'" % (name,))
@@ -41,14 +53,18 @@ class ServerCalculator(object):
             return True
 
 
-    def __init__(self, server_list, max_nodes=None, max_price=None):
-        self.cloud_sizes = [self.CloudSizeWrapper(s, **kws)
+    def __init__(self, server_list, max_nodes=None, max_price=None,
+                 node_mem_scaling=0.95):
+        self.cloud_sizes = [self.CloudSizeWrapper(s, node_mem_scaling, **kws)
                             for s, kws in server_list]
         self.cloud_sizes.sort(key=lambda s: s.price)
         self.max_nodes = max_nodes or float('inf')
        self.max_price = max_price or float('inf')
         self.logger = logging.getLogger('arvnodeman.jobqueue')
-        self.logged_jobs = set()
+
+        self.logger.info("Using cloud node sizes:")
+        for s in self.cloud_sizes:
+            self.logger.info(str(s.__dict__))
 
     @staticmethod
     def coerce_int(x, fallback):
@@ -69,20 +85,26 @@ class ServerCalculator(object):
 
     def servers_for_queue(self, queue):
         servers = []
-        seen_jobs = set()
+        unsatisfiable_jobs = {}
         for job in queue:
-            seen_jobs.add(job['uuid'])
             constraints = job['runtime_constraints']
             want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
             cloud_size = self.cloud_size_for_constraints(constraints)
             if cloud_size is None:
-                if job['uuid'] not in self.logged_jobs:
-                    self.logged_jobs.add(job['uuid'])
-                    self.logger.debug("job %s not satisfiable", job['uuid'])
-            elif (want_count <= self.max_nodes) and (want_count*cloud_size.price <= self.max_price):
+                unsatisfiable_jobs[job['uuid']] = (
+                    'Requirements for a single node exceed the available '
+                    'cloud node size')
+            elif (want_count > self.max_nodes):
+                unsatisfiable_jobs[job['uuid']] = (
+                    "Job's min_nodes constraint is greater than the configured "
+                    "max_nodes (%d)" % self.max_nodes)
+            elif (want_count*cloud_size.price <= self.max_price):
                 servers.extend([cloud_size.real] * want_count)
-        self.logged_jobs.intersection_update(seen_jobs)
-        return servers
+            else:
+                unsatisfiable_jobs[job['uuid']] = (
+                    "Job's price (%d) is above system's max_price "
+                    "limit (%d)" % (want_count*cloud_size.price, self.max_price))
+        return (servers, unsatisfiable_jobs)
 
     def cheapest_size(self):
         return self.cloud_sizes[0]
@@ -93,6 +115,7 @@ class ServerCalculator(object):
                 return s
         return None
 
+
 class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
     """Actor to generate server wishlists from the job queue.
 
@@ -103,16 +126,76 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
 
     CLIENT_ERRORS = ARVADOS_ERRORS
 
-    def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
+    def __init__(self, client, timer_actor, server_calc,
+                 jobs_queue, slurm_queue, *args, **kwargs):
         super(JobQueueMonitorActor, self).__init__(
             client, timer_actor, *args, **kwargs)
+        self.jobs_queue = jobs_queue
+        self.slurm_queue = slurm_queue
         self._calculator = server_calc
 
+    @staticmethod
+    def coerce_to_mb(x):
+        v, u = x[:-1], x[-1]
+        if u in ("M", "m"):
+            return int(v)
+        elif u in ("G", "g"):
+            return float(v) * 2**10
+        elif u in ("T", "t"):
+            return float(v) * 2**20
+        elif u in ("P", "p"):
+            return float(v) * 2**30
+        else:
+            return int(x)
+
     def _send_request(self):
-        return self._client.jobs().queue().execute()['items']
+        queuelist = []
+        if self.slurm_queue:
+            # cpus, memory, tempory disk space, reason, job name
+            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
+            for out in squeue_out.splitlines():
+                try:
+                    cpu, ram, disk, reason, jobname = out.split("|", 4)
+                    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason) or ("Priority" in reason):
+                        queuelist.append({
+                            "uuid": jobname,
+                            "runtime_constraints": {
+                                "min_cores_per_node": cpu,
+                                "min_ram_mb_per_node": self.coerce_to_mb(ram),
+                                "min_scratch_mb_per_node": self.coerce_to_mb(disk)
+                            }
+                        })
+                except ValueError:
+                    pass
+
+        if self.jobs_queue:
+            queuelist.extend(self._client.jobs().queue().execute()['items'])
+
+        return queuelist
 
     def _got_response(self, queue):
-        server_list = self._calculator.servers_for_queue(queue)
+        server_list, unsatisfiable_jobs = self._calculator.servers_for_queue(queue)
+        # Cancel any job/container with unsatisfiable requirements, emitting
+        # a log explaining why.
+        for job_uuid, reason in unsatisfiable_jobs.iteritems():
+            try:
+                self._client.logs().create(body={
+                    'object_uuid': job_uuid,
+                    'event_type': 'stderr',
+                    'properties': {'text': reason},
+                }).execute()
+                # Cancel the job depending on its type
+                if arvados.util.container_uuid_pattern.match(job_uuid):
+                    subprocess.check_call(['scancel', '--name='+job_uuid])
+                elif arvados.util.job_uuid_pattern.match(job_uuid):
+                    self._client.jobs().cancel(uuid=job_uuid).execute()
+                else:
+                    raise Exception('Unknown job type')
+                self._logger.debug("Cancelled unsatisfiable job '%s'", job_uuid)
+            except Exception as error:
+                self._logger.error("Trying to cancel job '%s': %s",
+                                   job_uuid,
+                                   error)
         self._logger.debug("Calculated wishlist: %s",
                            ', '.join(s.name for s in server_list) or "(empty)")
         return super(JobQueueMonitorActor, self)._got_response(server_list)
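
Illustrative note (not part of the commit above): a minimal, standalone sketch of how the new JobQueueMonitorActor._send_request() turns one pending squeue row in the "%c|%m|%d|%r|%j" format into a wishlist entry, including the suffix handling that coerce_to_mb() performs. The sample row and job name below are hypothetical.

    def coerce_to_mb(x):
        # SLURM reports sizes like "7168M" or "7G"; bare numbers are taken as MB.
        v, u = x[:-1], x[-1]
        if u in ("M", "m"):
            return int(v)
        elif u in ("G", "g"):
            return float(v) * 2**10
        elif u in ("T", "t"):
            return float(v) * 2**20
        elif u in ("P", "p"):
            return float(v) * 2**30
        else:
            return int(x)

    # One PENDING row from squeue: cpus|memory|tmp disk|reason|job name
    sample = "2|7168M|10G|Resources|zzzzz-dz642-0123456789abcde"
    cpu, ram, disk, reason, jobname = sample.split("|", 4)
    entry = {
        "uuid": jobname,
        "runtime_constraints": {
            "min_cores_per_node": cpu,                      # "2"
            "min_ram_mb_per_node": coerce_to_mb(ram),       # 7168
            "min_scratch_mb_per_node": coerce_to_mb(disk),  # 10240.0
        },
    }
    print(entry)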