X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0eb72b526bf8bbb011551ecf019f604e17a534f1..e2bf56f0a0fa1f6b4fb7b4efc4db5178b074b8ce:/services/nodemanager/arvnodeman/jobqueue.py diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py index ca914e1096..4d2d3e0c0a 100644 --- a/services/nodemanager/arvnodeman/jobqueue.py +++ b/services/nodemanager/arvnodeman/jobqueue.py @@ -8,9 +8,12 @@ from __future__ import absolute_import, print_function import logging import subprocess +import arvados.util + from . import clientactor from .config import ARVADOS_ERRORS + class ServerCalculator(object): """Generate cloud server wishlists from an Arvados job queue. @@ -58,7 +61,6 @@ class ServerCalculator(object): self.max_nodes = max_nodes or float('inf') self.max_price = max_price or float('inf') self.logger = logging.getLogger('arvnodeman.jobqueue') - self.logged_jobs = set() self.logger.info("Using cloud node sizes:") for s in self.cloud_sizes: @@ -83,20 +85,26 @@ class ServerCalculator(object): def servers_for_queue(self, queue): servers = [] - seen_jobs = set() + unsatisfiable_jobs = {} for job in queue: - seen_jobs.add(job['uuid']) constraints = job['runtime_constraints'] want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1)) cloud_size = self.cloud_size_for_constraints(constraints) if cloud_size is None: - if job['uuid'] not in self.logged_jobs: - self.logged_jobs.add(job['uuid']) - self.logger.debug("job %s not satisfiable", job['uuid']) - elif (want_count <= self.max_nodes) and (want_count*cloud_size.price <= self.max_price): + unsatisfiable_jobs[job['uuid']] = ( + 'Requirements for a single node exceed the available ' + 'cloud node size') + elif (want_count > self.max_nodes): + unsatisfiable_jobs[job['uuid']] = ( + "Job's min_nodes constraint is greater than the configured " + "max_nodes (%d)" % self.max_nodes) + elif (want_count*cloud_size.price <= self.max_price): servers.extend([cloud_size.real] * want_count) - self.logged_jobs.intersection_update(seen_jobs) - return servers + else: + unsatisfiable_jobs[job['uuid']] = ( + "Job's price (%d) is above system's max_price " + "limit (%d)" % (want_count*cloud_size.price, self.max_price)) + return (servers, unsatisfiable_jobs) def cheapest_size(self): return self.cloud_sizes[0] @@ -107,6 +115,7 @@ class ServerCalculator(object): return s return None + class JobQueueMonitorActor(clientactor.RemotePollLoopActor): """Actor to generate server wishlists from the job queue. @@ -147,7 +156,7 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor): for out in squeue_out.splitlines(): try: cpu, ram, disk, reason, jobname = out.split("|", 4) - if ("ReqNodeNotAvail" in reason) or ("Resources" in reason): + if ("ReqNodeNotAvail" in reason) or ("Resources" in reason) or ("Priority" in reason): queuelist.append({ "uuid": jobname, "runtime_constraints": { @@ -165,7 +174,28 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor): return queuelist def _got_response(self, queue): - server_list = self._calculator.servers_for_queue(queue) + server_list, unsatisfiable_jobs = self._calculator.servers_for_queue(queue) + # Cancel any job/container with unsatisfiable requirements, emitting + # a log explaining why. + for job_uuid, reason in unsatisfiable_jobs.iteritems(): + try: + self._client.logs().create(body={ + 'object_uuid': job_uuid, + 'event_type': 'stderr', + 'properties': {'text': reason}, + }).execute() + # Cancel the job depending on its type + if arvados.util.container_uuid_pattern.match(job_uuid): + subprocess.check_call(['scancel', '--name='+job_uuid]) + elif arvados.util.job_uuid_pattern.match(job_uuid): + self._client.jobs().cancel(uuid=job_uuid).execute() + else: + raise Exception('Unknown job type') + self._logger.debug("Cancelled unsatisfiable job '%s'", job_uuid) + except Exception as error: + self._logger.error("Trying to cancel job '%s': %s", + job_uuid, + error) self._logger.debug("Calculated wishlist: %s", ', '.join(s.name for s in server_list) or "(empty)") return super(JobQueueMonitorActor, self)._got_response(server_list)