Merge branch '13301-cwl-resource-scatter' closes #13301
[arvados.git] / services / nodemanager / arvnodeman / timedcallback.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import heapq
9 import time
10
11 import pykka
12
13 from .config import actor_class
14
class TimedCallBackActor(actor_class):
    """Send messages to other actors on a schedule.

    Other actors can call the schedule() method to schedule delivery of a
    message at a later time.  This actor runs the necessary event loop for
    delivery.
    """
    def __init__(self, max_sleep=1, timefunc=None):
        """Create an actor with an empty schedule.

        Arguments:

        * max_sleep: Upper bound on how long a single deliver() call will
          sleep (passed to time.sleep()) while waiting for the next message
          to come due.
        * timefunc: Zero-argument callable returning the current time, on
          the same scale as the delivery_time values given to schedule().
          Defaults to time.time.
        """
        super(TimedCallBackActor, self).__init__()
        self._proxy = self.actor_ref.tell_proxy()
        # Heap of (delivery_time, sequence, receiver, args, kwargs) tuples,
        # ordered so that the earliest delivery is always messages[0].
        self.messages = []
        # Monotonic counter used as a heap tie-breaker; see schedule().
        self._sequence = 0
        self.max_sleep = max_sleep
        if timefunc is None:
            self._timefunc = time.time
        else:
            self._timefunc = timefunc

    def schedule(self, delivery_time, receiver, *args, **kwargs):
        """Arrange for receiver(*args, **kwargs) to be called later.

        Arguments:

        * delivery_time: The time, as measured by this actor's timefunc, at
          or after which the call should be made.
        * receiver: The callable to invoke.
        * args, kwargs: Arguments passed through to receiver.
        """
        if not self.messages:
            # Nothing was pending, so no delivery loop is in flight: start
            # one through the tell proxy.
            self._proxy.deliver()
        # The sequence number guarantees that two entries never compare
        # equal on their first two fields.  Without it, entries sharing a
        # delivery_time would be ordered by comparing receivers, args and
        # kwargs, which is arbitrary at best and raises TypeError for
        # unorderable objects.  It also keeps same-time deliveries in the
        # order they were scheduled.
        entry = (delivery_time, self._sequence, receiver, args, kwargs)
        self._sequence += 1
        heapq.heappush(self.messages, entry)

    def deliver(self):
        """Run one step of the delivery loop.

        If the earliest scheduled message is due, remove it from the
        schedule and call its receiver; a pykka.ActorDeadError raised by
        that call is ignored.  Otherwise, sleep until it is due or for
        max_sleep, whichever is shorter.  While messages remain, the actor
        then asks itself to run deliver() again.
        """
        if not self.messages:
            return
        til_next = self.messages[0][0] - self._timefunc()
        if til_next <= 0:
            _, _, receiver, args, kwargs = heapq.heappop(self.messages)
            try:
                receiver(*args, **kwargs)
            except pykka.ActorDeadError:
                # The receiver's actor is gone; drop the message.
                pass
        else:
            # Cap the sleep at max_sleep so this actor regularly gets back
            # to handling its other incoming messages (presumably including
            # schedule() calls with earlier delivery times).
            time.sleep(min(til_next, self.max_sleep))
        # Only keep the loop going while there is something left to
        # deliver.  Re-queueing unconditionally would leave a stale
        # deliver() request pending after the schedule drains; a schedule()
        # call handled before it would start a second loop alongside it.
        if self.messages:
            self._proxy.deliver()