Manager to handle).
"""
def __init__(self, timer_actor, arvados_client, cloud_client,
- cloud_size, arvados_node,
+ cloud_size, arvados_node=None,
retry_wait=1, max_retry_wait=180):
super(ComputeNodeSetupActor, self).__init__(
cloud_client, arvados_client, timer_actor,
self.cloud_size = cloud_size
self.arvados_node = None
self.cloud_node = None
- self._later.prepare_arvados_node(arvados_node)
+ if arvados_node is None:
+ self._later.create_arvados_node()
+ else:
+ self._later.prepare_arvados_node(arvados_node)
+
@ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry(config.ARVADOS_ERRORS)
def create_arvados_node(self):
    # Called when no existing Arvados node record was supplied to __init__:
    # create a fresh record via the Arvados API, then schedule the cloud
    # node boot. Retried on config.ARVADOS_ERRORS; other exceptions finish
    # this setup actor (per the decorators above).
    self.arvados_node = self._arvados.nodes().create(body={}).execute()
    self._later.create_cloud_node()
@ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry(config.ARVADOS_ERRORS)
if (state == 'down' and
self.arvados_node['first_ping_at'] and
timestamp_fresh(self.cloud_node_start_time,
- self.boot_fail_after)):
+ self.boot_fail_after) and
+ not self._cloud.broken(self.cloud_node)):
state = 'idle'
# "missing" means last_ping_at is stale, this should be
result = state in states
if state == 'idle':
result = result and not self.arvados_node['job_uuid']
+
return result
def shutdown_eligible(self):
return sum(1 for down in
pykka.get_all(rec.actor.in_state('down') for rec in
self.cloud_nodes.nodes.itervalues()
- if size is None or rec.cloud_node.size.id == size.id)
+ if ((size is None or rec.cloud_node.size.id == size.id) and
+ rec.cloud_node.id not in self.shutdowns))
if down)
def _nodes_up(self, size):
- up = (self._nodes_booting(size) + self._nodes_unpaired(size) + self._nodes_paired(size)) - self._nodes_down(size)
+ up = (self._nodes_booting(size) + self._nodes_unpaired(size) + self._nodes_paired(size)) - (self._nodes_down(size) + self._size_shutdowns(size))
return up
def _total_price(self):
return sum(1 for c in self.last_wishlist if c.id == size.id)
def _size_shutdowns(self, size):
- sh = 0
- for c in self.shutdowns.iterkeys():
- try:
- if self.sizes_booting_shutdown[c].id == size.id:
- sh += 1
- except pykka.ActorDeadError:
- pass
- return sh
+ return sum(1
+ for c in self.shutdowns.iterkeys()
+ if size is None or self.sizes_booting_shutdown[c].id == size.id)
def _nodes_wanted(self, size):
- total_up_count = self._nodes_up(None)
+ total_up_count = self._nodes_up(None) + self._nodes_down(None)
under_min = self.min_nodes - total_up_count
over_max = total_up_count - self.max_nodes
total_price = self._total_price()
nodes_wanted = self._nodes_wanted(cloud_size)
if nodes_wanted < 1:
return None
+ arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
self._logger.info("Want %i more %s nodes. Booting a node.",
nodes_wanted, cloud_size.name)
-
- arvados_client=self._new_arvados()
- arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
- if not arvados_node:
- arvados_node = arvados_client.nodes().create(body={}).execute()
- self._register_arvados_node(arvados_node["uuid"], arvados_node)
-
new_setup = self._node_setup.start(
timer_actor=self._timer,
- arvados_client=arvados_client,
+ arvados_client=self._new_arvados(),
arvados_node=arvados_node,
cloud_client=self._new_cloud(),
cloud_size=cloud_size).proxy()
- self.booting[arvados_node['uuid']] = new_setup
- self.sizes_booting_shutdown[arvados_node['uuid']] = cloud_size
+ self.booting[new_setup.actor_ref.actor_urn] = new_setup
+ self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
if arvados_node is not None:
self.arvados_nodes[arvados_node['uuid']].assignment_time = (
if cloud_node is not None:
# Node creation succeeded. Update cloud node list.
self._register_cloud_node(cloud_node)
- arvuuid = arvados_node["uuid"]
- if arvuuid in self.booting:
- del self.booting[arvuuid]
- del self.sizes_booting_shutdown[arvuuid]
+ del self.booting[setup_proxy.actor_ref.actor_urn]
+ del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]
@_check_poll_freshness
def stop_booting_node(self, size):
@_check_poll_freshness
def node_can_shutdown(self, node_actor):
    """Begin shutting down node_actor's node if any condition warrants it.

    Checked in order: the node's size is in excess of current demand
    (cancellable shutdown), the node is unpaired with an Arvados record
    (non-cancellable), or the node monitor reports the 'down' state
    (non-cancellable).

    Fix: removed stray print("excess") / print("unpaired") debug
    statements left in the actor code.
    """
    if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
        # More nodes of this size than needed; the shutdown may be
        # cancelled if demand comes back.
        self._begin_node_shutdown(node_actor, cancellable=True)
    elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
        # Node is unpaired, which means it probably exceeded its booting
        # grace period without a ping, so shut it down so we can boot a new
        # node in its place.
        self._begin_node_shutdown(node_actor, cancellable=False)
    elif node_actor.in_state('down').get():
        # Node is down and unlikely to come back.
        # (in_state returns a pykka future; .get() resolves it.)
        self._begin_node_shutdown(node_actor, cancellable=False)
def test_shutdown_without_arvados_node(self):
    """A node that never paired with Arvados is shutdown-eligible once past its boot window."""
    self.make_actor(start_time=0)
    self.shutdowns._set_state(True, 600)
    # assertEquals is a deprecated alias; use assertEqual.
    self.assertEqual((True, "node state is ('unpaired', 'open', 'boot exceeded', 'idle exceeded')"),
                     self.node_actor.shutdown_eligible().get(self.TIMEOUT))
def test_shutdown_missing(self):
    """A node whose last ping is stale reports state 'down' and is shutdown-eligible."""
    arv_node = testutil.arvados_node_mock(10, job_uuid=None,
                                          last_ping_at='1970-01-01T01:02:03.04050607Z')
    self.make_actor(10, arv_node)
    self.shutdowns._set_state(True, 600)
    # assertEquals is a deprecated alias; use assertEqual.
    self.assertEqual((True, "node state is ('down', 'open', 'boot wait', 'idle exceeded')"),
                     self.node_actor.shutdown_eligible().get(self.TIMEOUT))
def test_shutdown_running_broken(self):
arv_node = testutil.arvados_node_mock(12, job_uuid=None,
self.make_actor(12, arv_node)
self.shutdowns._set_state(True, 600)
self.cloud_client.broken.return_value = True
- self.assertEquals(self.node_actor.shutdown_eligible().get(self.TIMEOUT), (True, "node state is ('down', 'open', 'boot wait', 'idle exceeded')"))
+ self.assertEquals((True, "node state is ('down', 'open', 'boot wait', 'idle exceeded')"),
+ self.node_actor.shutdown_eligible().get(self.TIMEOUT))
def test_shutdown_missing_broken(self):
arv_node = testutil.arvados_node_mock(11, job_uuid=None,
def test_no_shutdown_when_window_closed(self):
    """An idle node is not shutdown-eligible while the shutdown window is closed."""
    self.make_actor(3, testutil.arvados_node_mock(3, job_uuid=None))
    # assertEquals is a deprecated alias; use assertEqual.
    self.assertEqual((False, "node state is ('idle', 'closed', 'boot wait', 'idle exceeded')"),
                     self.node_actor.shutdown_eligible().get(self.TIMEOUT))
def test_no_shutdown_when_node_running_job(self):
    """A node with a job_uuid assigned reports 'busy' and is not shutdown-eligible."""
    self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
    self.shutdowns._set_state(True, 600)
    # assertEquals is a deprecated alias; use assertEqual.
    self.assertEqual((False, "node state is ('busy', 'open', 'boot wait', 'idle exceeded')"),
                     self.node_actor.shutdown_eligible().get(self.TIMEOUT))
def test_no_shutdown_when_node_state_unknown(self):
    """A paired node with crunch_worker_state=None is not shutdown-eligible."""
    self.make_actor(5, testutil.arvados_node_mock(
        5, crunch_worker_state=None))
    self.shutdowns._set_state(True, 600)
    # assertEquals is a deprecated alias; use assertEqual.
    self.assertEqual((False, "node is paired but crunch_worker_state is 'None'"),
                     self.node_actor.shutdown_eligible().get(self.TIMEOUT))
def test_no_shutdown_when_node_state_stale(self):
    """A node whose Arvados record is too old (age=90000) is not shutdown-eligible."""
    self.make_actor(6, testutil.arvados_node_mock(6, age=90000))
    self.shutdowns._set_state(True, 600)
    # assertEquals is a deprecated alias; use assertEqual.
    self.assertEqual((False, "node state is stale"),
                     self.node_actor.shutdown_eligible().get(self.TIMEOUT))
def test_arvados_node_match(self):
self.make_actor(2)
setup = self.start_node_boot(cloud_node, arv_node)
self.daemon.node_up(setup).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
- self.daemon.update_cloud_nodes([cloud_node])
self.daemon.update_arvados_nodes([arv_node])
+ self.daemon.update_cloud_nodes([cloud_node])
+ self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-1801
self.daemon.update_server_wishlist(
[testutil.MockSize(1)]).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
- def test_node_counted_after_boot_with_slow_listing(self):
- # Test that, after we boot a compute node, we assume it exists
- # even it doesn't appear in the listing (e.g., because of delays
- # propagating tags).
- setup = self.start_node_boot()
- self.daemon.node_up(setup).get(self.TIMEOUT)
- self.assertEqual(1, self.alive_monitor_count())
- self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
- self.assertEqual(1, self.alive_monitor_count())
-
def test_booted_unlisted_node_counted(self):
setup = self.start_node_boot(id_num=1)
self.daemon.node_up(setup)
def test_shutdown_declined_at_wishlist_capacity(self):
    # Daemon sees one cloud node paired with an Arvados record, while the
    # wishlist still wants one node of the same size.
    cloud_node = testutil.cloud_node_mock(1)
    arv_node = testutil.arvados_node_mock(1)
    size = testutil.MockSize(1)
    self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
    self.assertEqual(1, self.alive_monitor_count())
    monitor = self.monitor_list()[0].proxy()
    # Offer the node for shutdown; per the test name the daemon should
    # decline because the wishlist still needs a node of this size.
    self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
def test_shutdown_declined_below_min_nodes(self):
    # Daemon configured with min_nodes=1 and exactly one (paired) node.
    cloud_node = testutil.cloud_node_mock(1)
    arv_node = testutil.arvados_node_mock(1)
    self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
    self.assertEqual(1, self.alive_monitor_count())
    monitor = self.monitor_list()[0].proxy()
    # Offer the only node for shutdown; per the test name the daemon
    # should decline so the pool stays at min_nodes.
    self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)