Arvados-DCO-1.1-Signed-off-by: Radhika Chippada <radhika@curoverse.com>
diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py
index f6e9249ebb812b3e33610479d28dd27f28c000fe..ca914e1096def7d28a9be41e90ffcbac2d01d203 100644
@@ -1,4 +1,7 @@
 #!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
 
 from __future__ import absolute_import, print_function
 
@@ -25,7 +28,11 @@ class ServerCalculator(object):
                          'extra']:
                 setattr(self, name, getattr(self.real, name))
             self.cores = kwargs.pop('cores')
-            self.scratch = self.disk
+            # libcloud disk sizes are in GB, Arvados/SLURM are in MB
+            # multiply by 1000 instead of 1024 to err on low side
+            if self.disk is None:
+                self.disk = 0
+            self.scratch = self.disk * 1000
             self.ram = int(self.ram * node_mem_scaling)
             for name, override in kwargs.iteritems():
                 if not hasattr(self, name):
@@ -53,6 +60,10 @@ class ServerCalculator(object):
         self.logger = logging.getLogger('arvnodeman.jobqueue')
         self.logged_jobs = set()
 
+        self.logger.info("Using cloud node sizes:")
+        for s in self.cloud_sizes:
+            self.logger.info(str(s.__dict__))
+
     @staticmethod
     def coerce_int(x, fallback):
         try:
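For context, the diff truncates coerce_int after its try: line. Based on its name and signature, a plausible body looks like the sketch below; this is an assumption from the visible context, not code shown in this hunk.

    @staticmethod
    def coerce_int(x, fallback):
        # Best-effort integer conversion; return fallback when x is unusable.
        try:
            return int(x)
        except (TypeError, ValueError):
            return fallback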
@@ -106,28 +117,53 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
 
     CLIENT_ERRORS = ARVADOS_ERRORS
 
-    def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
+    def __init__(self, client, timer_actor, server_calc,
+                 jobs_queue, slurm_queue, *args, **kwargs):
         super(JobQueueMonitorActor, self).__init__(
             client, timer_actor, *args, **kwargs)
+        self.jobs_queue = jobs_queue
+        self.slurm_queue = slurm_queue
         self._calculator = server_calc
 
+    @staticmethod
+    def coerce_to_mb(x):
+        """Convert a squeue size string such as '2048M', '4G' or '1T' to MB."""
+        v, u = x[:-1], x[-1]
+        if u in ("M", "m"):
+            return int(v)
+        elif u in ("G", "g"):
+            return float(v) * 2**10
+        elif u in ("T", "t"):
+            return float(v) * 2**20
+        elif u in ("P", "p"):
+            return float(v) * 2**30
+        else:
+            # No recognized unit suffix: assume the value is already in MB.
+            return int(x)
+
     def _send_request(self):
-        # cpus, memory, tempory disk space, reason, job name
-        squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c %m %d %r %j"])
         queuelist = []
-        for out in squeue_out.splitlines():
-            cpu, ram, disk, reason, jobname = out.split(" ", 4)
-            if reason in ("Resources", "ReqNodeNotAvail"):
-                queuelist.append({
-                    "uuid": jobname,
-                    "runtime_constraints": {
-                        "min_cores_per_node": cpu,
-                        "min_ram_mb_per_node": ram,
-                        "min_scratch_mb_per_node": disk
-                    }
-                })
-
-        queuelist.extend(self._client.jobs().queue().execute()['items'])
+        if self.slurm_queue:
+            # cpus, memory, temporary disk space, reason, job name
+            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
+            for out in squeue_out.splitlines():
+                try:
+                    cpu, ram, disk, reason, jobname = out.split("|", 4)
+                    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
+                        queuelist.append({
+                            "uuid": jobname,
+                            "runtime_constraints": {
+                                "min_cores_per_node": cpu,
+                                "min_ram_mb_per_node": self.coerce_to_mb(ram),
+                                "min_scratch_mb_per_node": self.coerce_to_mb(disk)
+                            }
+                        })
+                except ValueError:
+                    # Malformed squeue line; skip it rather than abort the poll.
+                    pass
+
+        if self.jobs_queue:
+            queuelist.extend(self._client.jobs().queue().execute()['items'])
 
         return queuelist
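To see the new pipe-delimited parsing end to end, the following hypothetical snippet feeds one made-up squeue line through the same split and unit coercion as _send_request above; the field values and job UUID are invented for illustration, and it assumes JobQueueMonitorActor is importable.

    # Hypothetical demo of the parsing in _send_request; values are made up.
    line = "4|7168M|100G|Resources|zzzzz-8i9sb-0123456789abcde"
    cpu, ram, disk, reason, jobname = line.split("|", 4)
    print(JobQueueMonitorActor.coerce_to_mb(ram))   # 7168
    print(JobQueueMonitorActor.coerce_to_mb(disk))  # 102400.0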