X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a5687a390262abebfc16cf21e62052ac0019512d..84b30d024ee757e32af7e05d00bf4324513c388c:/services/nodemanager/arvnodeman/timedcallback.py diff --git a/services/nodemanager/arvnodeman/timedcallback.py b/services/nodemanager/arvnodeman/timedcallback.py index a1df8ec17b..e7e3f25fe3 100644 --- a/services/nodemanager/arvnodeman/timedcallback.py +++ b/services/nodemanager/arvnodeman/timedcallback.py @@ -1,4 +1,7 @@ #!/usr/bin/env python +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: AGPL-3.0 from __future__ import absolute_import, print_function @@ -16,21 +19,26 @@ class TimedCallBackActor(actor_class): message at a later time. This actor runs the necessary event loop for delivery. """ - def __init__(self, max_sleep=1): + def __init__(self, max_sleep=1, timefunc=None): super(TimedCallBackActor, self).__init__() - self._proxy = self.actor_ref.proxy() + self._proxy = self.actor_ref.tell_proxy() self.messages = [] self.max_sleep = max_sleep + if timefunc is None: + self._timefunc = time.time + else: + self._timefunc = timefunc def schedule(self, delivery_time, receiver, *args, **kwargs): + if not self.messages: + self._proxy.deliver() heapq.heappush(self.messages, (delivery_time, receiver, args, kwargs)) - self._proxy.deliver() def deliver(self): if not self.messages: - return None - til_next = self.messages[0][0] - time.time() - if til_next < 0: + return + til_next = self.messages[0][0] - self._timefunc() + if til_next <= 0: t, receiver, args, kwargs = heapq.heappop(self.messages) try: receiver(*args, **kwargs)