def test_uuid_is_subscription_key(self, sinfo_mock):
sinfo_mock.return_value = ""
node = testutil.arvados_node_mock()
def test_uuid_is_subscription_key(self, sinfo_mock):
sinfo_mock.return_value = ""
node = testutil.arvados_node_mock()
self.subscriber.assert_called_with(node)
self.assertEqual("down", node["crunch_worker_state"])
self.subscriber.assert_called_with(node)
self.assertEqual("down", node["crunch_worker_state"])
def test_update_from_sinfo(self, sinfo_mock):
sinfo_mock.return_value = """compute1|idle|instancetype=a1.test
compute2|alloc|(null)
def test_update_from_sinfo(self, sinfo_mock):
sinfo_mock.return_value = """compute1|idle|instancetype=a1.test
compute2|alloc|(null)
def build_monitor(self, side_effect, *args, **kwargs):
def build_monitor(self, side_effect, *args, **kwargs):