from __future__ import absolute_import, print_function
+import threading
import time
import mock
if job_uuid is True:
job_uuid = 'zzzzz-jjjjj-jobjobjobjobjob'
slurm_state = 'idle' if (job_uuid is None) else 'alloc'
- node = {'uuid': 'zzzzz-yyyyy-12345abcde67890',
+ node = {'uuid': 'zzzzz-yyyyy-{:015x}'.format(node_num),
'created_at': '2014-01-01T01:02:03Z',
'modified_at': time.strftime('%Y-%m-%dT%H:%M:%SZ',
time.gmtime(time.time() - age)),
def ip_address_mock(last_octet):
return '10.20.30.{}'.format(last_octet)
+class MockShutdownTimer(object):
+ def _set_state(self, is_open, next_opening):
+ self.window_open = lambda: is_open
+ self.next_opening = lambda: next_opening
+
+
class MockSize(object):
def __init__(self, factor):
self.id = 'z{}.test'.format(factor)
class MockTimer(object):
+ def __init__(self, deliver_immediately=True):
+ self.deliver_immediately = deliver_immediately
+ self.messages = []
+ self.lock = threading.Lock()
+
+ def deliver(self):
+ with self.lock:
+ to_deliver = self.messages
+ self.messages = []
+ for callback, args, kwargs in to_deliver:
+ callback(*args, **kwargs)
+
def schedule(self, want_time, callback, *args, **kwargs):
- return callback(*args, **kwargs)
+ with self.lock:
+ self.messages.append((callback, args, kwargs))
+ if self.deliver_immediately:
+ self.deliver()
class ActorTestMixin(object):