client, timer_actor, *args, **kwargs)
self._calculator = server_calc
+ @staticmethod
+ def coerce_to_mb(x):
+ v, u = x[:-1], x[-1]
+ if u in ("M", "m"):
+ return int(v)
+ elif u in ("G", "g"):
+ return float(v) * 2**10
+ elif u in ("T", "t"):
+ return float(v) * 2**20
+ elif u in ("P", "p"):
+ return float(v) * 2**30
+ else:
+ return int(x)
+
def _send_request(self):
# cpus, memory, tempory disk space, reason, job name
squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c %m %d %r %j"])
"uuid": jobname,
"runtime_constraints": {
"min_cores_per_node": cpu,
- "min_ram_mb_per_node": ram,
- "min_scratch_mb_per_node": disk
+ "min_ram_mb_per_node": self.coerce_to_mb(ram),
+ "min_scratch_mb_per_node": self.coerce_to_mb(disk)
}
})
@mock.patch("subprocess.check_output")
def test_squeue_server_list(self, mock_squeue):
- mock_squeue.return_value = """1 0 0 Resources zzzzz-zzzzz-zzzzzzzzzzzzzzy
-2 0 0 Resources zzzzz-zzzzz-zzzzzzzzzzzzzzz
+ mock_squeue.return_value = """1 1024 0 Resources zzzzz-zzzzz-zzzzzzzzzzzzzzy
+2 1024 0 Resources zzzzz-zzzzz-zzzzzzzzzzzzzzz
"""
super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
- [(testutil.MockSize(n), {'cores': n, 'ram': n, 'scratch': n}) for n in range(1, 3)]))
+ [(testutil.MockSize(n), {'cores': n, 'ram': n*1024, 'scratch': n}) for n in range(1, 3)]))
self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
self.stop_proxy(self.monitor)
self.subscriber.assert_called_with([testutil.MockSize(1),
testutil.MockSize(2)])
+ @mock.patch("subprocess.check_output")
+ def test_squeue_server_list_suffix(self, mock_squeue):
+ mock_squeue.return_value = """1 1024M 0 Resources zzzzz-zzzzz-zzzzzzzzzzzzzzy
+1 2G 0 Resources zzzzz-zzzzz-zzzzzzzzzzzzzzz
+"""
+
+ super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
+ [(testutil.MockSize(n), {'cores': n, 'ram': n*1024, 'scratch': n}) for n in range(1, 3)]))
+ self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+ self.stop_proxy(self.monitor)
+ self.subscriber.assert_called_with([testutil.MockSize(1),
+ testutil.MockSize(2)])
+
+ def test_coerce_to_mb(self):
+ self.assertEqual(1, jobqueue.JobQueueMonitorActor.coerce_to_mb("1"))
+ self.assertEqual(512, jobqueue.JobQueueMonitorActor.coerce_to_mb("512"))
+ self.assertEqual(512, jobqueue.JobQueueMonitorActor.coerce_to_mb("512M"))
+ self.assertEqual(1024, jobqueue.JobQueueMonitorActor.coerce_to_mb("1024M"))
+ self.assertEqual(1024, jobqueue.JobQueueMonitorActor.coerce_to_mb("1G"))
+ self.assertEqual(1536, jobqueue.JobQueueMonitorActor.coerce_to_mb("1.5G"))
+ self.assertEqual(2048, jobqueue.JobQueueMonitorActor.coerce_to_mb("2G"))
+ self.assertEqual(1025, jobqueue.JobQueueMonitorActor.coerce_to_mb("1025M"))
+ self.assertEqual(1048576, jobqueue.JobQueueMonitorActor.coerce_to_mb("1T"))
+ self.assertEqual(1572864, jobqueue.JobQueueMonitorActor.coerce_to_mb("1.5T"))
+ self.assertEqual(1073741824, jobqueue.JobQueueMonitorActor.coerce_to_mb("1P"))
+ self.assertEqual(1610612736, jobqueue.JobQueueMonitorActor.coerce_to_mb("1.5P"))
+ self.assertEqual(0, jobqueue.JobQueueMonitorActor.coerce_to_mb("0"))
+ self.assertEqual(0, jobqueue.JobQueueMonitorActor.coerce_to_mb("0M"))
+ self.assertEqual(0, jobqueue.JobQueueMonitorActor.coerce_to_mb("0G"))
+
if __name__ == '__main__':
unittest.main()