11807: Migrate old records in jobs table from YAML to JSON.
diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py
index 0340918e736549403d99469bad52f9f9c4a93154..2237420ffb39d5da6ec408c26f7d8f12ed6a42e3 100644
--- a/services/nodemanager/arvnodeman/jobqueue.py
+++ b/services/nodemanager/arvnodeman/jobqueue.py
@@ -27,6 +27,8 @@ class ServerCalculator(object):
             self.cores = kwargs.pop('cores')
             # libcloud disk sizes are in GB, Arvados/SLURM are in MB
             # multiply by 1000 instead of 1024 to err on low side
+            if self.disk is None:
+                self.disk = 0
             self.scratch = self.disk * 1000
             self.ram = int(self.ram * node_mem_scaling)
             for name, override in kwargs.iteritems():
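Note on the first hunk: a cloud size that reports no disk leaves self.disk as None, and the GB-to-MB multiply on the next line would then raise a TypeError; the new guard treats a missing disk size as 0. A minimal standalone sketch of the guarded conversion (the function name and the 0.95 scaling value are illustrative, not the actual size-wrapper code):

    # Sketch of the GB -> MB scratch conversion with the None guard applied.
    def scratch_and_ram(disk_gb, ram_mb, node_mem_scaling=0.95):
        if disk_gb is None:
            # A size with no reported disk previously blew up on the multiply below.
            disk_gb = 0
        # multiply by 1000 instead of 1024 to err on the low side
        scratch_mb = disk_gb * 1000
        ram = int(ram_mb * node_mem_scaling)
        return scratch_mb, ram

    print(scratch_and_ram(None, 3750))  # -> (0, 3562)
    print(scratch_and_ram(50, 3750))    # -> (50000, 3562)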
@@ -112,9 +114,12 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
 
     CLIENT_ERRORS = ARVADOS_ERRORS
 
-    def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
+    def __init__(self, client, timer_actor, server_calc,
+                 jobs_queue, slurm_queue, *args, **kwargs):
         super(JobQueueMonitorActor, self).__init__(
             client, timer_actor, *args, **kwargs)
+        self.jobs_queue = jobs_queue
+        self.slurm_queue = slurm_queue
         self._calculator = server_calc
 
     @staticmethod
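The second hunk adds two constructor flags that select where the monitor looks for pending work: jobs_queue (the Arvados jobs API) and slurm_queue (squeue). A hypothetical wiring sketch, assuming the pykka start()/proxy() pattern used elsewhere in node manager; arv_client, timer, and calc stand in for the real API client, timer actor, and ServerCalculator, and further polling arguments may pass through *args/**kwargs:

    # Hypothetical construction; the real launcher wiring lives elsewhere in arvnodeman.
    job_queue_poller = JobQueueMonitorActor.start(
        arv_client,        # Arvados API client
        timer,             # timer actor used to schedule polls
        calc,              # ServerCalculator built from the configured cloud sizes
        jobs_queue=True,   # include the Arvados jobs queue in each poll
        slurm_queue=True,  # also include pending SLURM jobs from squeue
    ).proxy()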
@@ -132,22 +137,27 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
             return int(x)
 
     def _send_request(self):
-        # cpus, memory, tempory disk space, reason, job name
-        squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
         queuelist = []
-        for out in squeue_out.splitlines():
-            cpu, ram, disk, reason, jobname = out.split("|", 4)
-            if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
-                queuelist.append({
-                    "uuid": jobname,
-                    "runtime_constraints": {
-                        "min_cores_per_node": cpu,
-                        "min_ram_mb_per_node": self.coerce_to_mb(ram),
-                        "min_scratch_mb_per_node": self.coerce_to_mb(disk)
-                    }
-                })
-
-        queuelist.extend(self._client.jobs().queue().execute()['items'])
+        if self.slurm_queue:
+            # cpus, memory, temporary disk space, reason, job name
+            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
+            for out in squeue_out.splitlines():
+                try:
+                    cpu, ram, disk, reason, jobname = out.split("|", 4)
+                    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
+                        queuelist.append({
+                            "uuid": jobname,
+                            "runtime_constraints": {
+                                "min_cores_per_node": cpu,
+                                "min_ram_mb_per_node": self.coerce_to_mb(ram),
+                                "min_scratch_mb_per_node": self.coerce_to_mb(disk)
+                            }
+                        })
+                except ValueError:
+                    pass
+
+        if self.jobs_queue:
+            queuelist.extend(self._client.jobs().queue().execute()['items'])
 
         return queuelist
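For reference, the reworked loop turns each pending squeue line (format %c|%m|%d|%r|%j: cpus, memory, temporary disk, reason, job name) into an entry shaped like an Arvados job record, and the new try/except skips any line that does not split into all five fields. A standalone sketch of that transformation for one line; the job UUID is made up, and the to_mb helper is a simplified stand-in for the real coerce_to_mb:

    # Illustrative parse of one pending squeue line into a queue entry.
    def to_mb(x):
        # Simplified stand-in for coerce_to_mb: plain numbers are MB, a "G" suffix means GB.
        return int(float(x[:-1]) * 1000) if x.endswith(("G", "g")) else int(x)

    line = "2|4G|10000|ReqNodeNotAvail, UnavailableNodes:|zzzzz-8i9sb-0123456789abcde"
    cpu, ram, disk, reason, jobname = line.split("|", 4)
    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
        print({
            "uuid": jobname,
            "runtime_constraints": {
                "min_cores_per_node": cpu,
                "min_ram_mb_per_node": to_mb(ram),       # 4000
                "min_scratch_mb_per_node": to_mb(disk),  # 10000
            },
        })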