X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0c529ed05805507b4d2c903b9587e9b61cec5ee6..0f644e242ef37c911ad3dc25aca8135c339de349:/services/nodemanager/arvnodeman/jobqueue.py diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py index abe8852380..ca914e1096 100644 --- a/services/nodemanager/arvnodeman/jobqueue.py +++ b/services/nodemanager/arvnodeman/jobqueue.py @@ -1,4 +1,7 @@ #!/usr/bin/env python +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: AGPL-3.0 from __future__ import absolute_import, print_function @@ -19,13 +22,18 @@ class ServerCalculator(object): """ class CloudSizeWrapper(object): - def __init__(self, real_size, **kwargs): + def __init__(self, real_size, node_mem_scaling, **kwargs): self.real = real_size for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price', 'extra']: setattr(self, name, getattr(self.real, name)) self.cores = kwargs.pop('cores') - self.scratch = self.disk + # libcloud disk sizes are in GB, Arvados/SLURM are in MB + # multiply by 1000 instead of 1024 to err on low side + if self.disk is None: + self.disk = 0 + self.scratch = self.disk * 1000 + self.ram = int(self.ram * node_mem_scaling) for name, override in kwargs.iteritems(): if not hasattr(self, name): raise ValueError("unrecognized size field '%s'" % (name,)) @@ -42,8 +50,9 @@ class ServerCalculator(object): return True - def __init__(self, server_list, max_nodes=None, max_price=None): - self.cloud_sizes = [self.CloudSizeWrapper(s, **kws) + def __init__(self, server_list, max_nodes=None, max_price=None, + node_mem_scaling=0.95): + self.cloud_sizes = [self.CloudSizeWrapper(s, node_mem_scaling, **kws) for s, kws in server_list] self.cloud_sizes.sort(key=lambda s: s.price) self.max_nodes = max_nodes or float('inf') @@ -51,6 +60,10 @@ class ServerCalculator(object): self.logger = logging.getLogger('arvnodeman.jobqueue') self.logged_jobs = set() + self.logger.info("Using cloud node sizes:") + for s in self.cloud_sizes: + self.logger.info(str(s.__dict__)) + @staticmethod def coerce_int(x, fallback): try: @@ -104,28 +117,50 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor): CLIENT_ERRORS = ARVADOS_ERRORS - def __init__(self, client, timer_actor, server_calc, *args, **kwargs): + def __init__(self, client, timer_actor, server_calc, + jobs_queue, slurm_queue, *args, **kwargs): super(JobQueueMonitorActor, self).__init__( client, timer_actor, *args, **kwargs) + self.jobs_queue = jobs_queue + self.slurm_queue = slurm_queue self._calculator = server_calc + @staticmethod + def coerce_to_mb(x): + v, u = x[:-1], x[-1] + if u in ("M", "m"): + return int(v) + elif u in ("G", "g"): + return float(v) * 2**10 + elif u in ("T", "t"): + return float(v) * 2**20 + elif u in ("P", "p"): + return float(v) * 2**30 + else: + return int(x) + def _send_request(self): - # cpus, memory, tempory disk space, reason, job name - squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c %m %d %r %j"]) queuelist = [] - for out in squeue_out.splitlines(): - cpu, ram, disk, reason, jobname = out.split(" ", 4) - if reason in ("Resources", "ReqNodeNotAvail"): - queuelist.append({ - "uuid": jobname, - "runtime_constraints": { - "min_cores_per_node": cpu, - "min_ram_mb_per_node": ram, - "min_scratch_mb_per_node": disk - } - }) - - queuelist.extend(self._client.jobs().queue().execute()['items']) + if self.slurm_queue: + # cpus, memory, tempory disk space, reason, job name + squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"]) + for out in squeue_out.splitlines(): + try: + cpu, ram, disk, reason, jobname = out.split("|", 4) + if ("ReqNodeNotAvail" in reason) or ("Resources" in reason): + queuelist.append({ + "uuid": jobname, + "runtime_constraints": { + "min_cores_per_node": cpu, + "min_ram_mb_per_node": self.coerce_to_mb(ram), + "min_scratch_mb_per_node": self.coerce_to_mb(disk) + } + }) + except ValueError: + pass + + if self.jobs_queue: + queuelist.extend(self._client.jobs().queue().execute()['items']) return queuelist