top:1.5em;
}
+.dropdown-menu {
+ max-height: 30em;
+ overflow-y: auto;
+}
+
.row-fill-height, .row-fill-height>div[class*='col-'] {
display: flex;
}
end
def show
- @pipelines = [@object]
-
- if params[:compare]
- PipelineInstance.where(uuid: params[:compare]).each do |p|
- @pipelines << p
- end
+ # the #show action can also be called by #compare, which does its own work to set up @pipelines
+ unless defined? @pipelines
+ @pipelines = [@object]
end
provenance, pips = graph(@pipelines)
end
if params['tab_pane'] == "Graph"
- provenance, pips = graph(@objects)
-
@pipelines = @objects
-
- if provenance
- @prov_svg = ProvenanceHelper::create_provenance_graph provenance, "provenance_svg", {
- :request => request,
- :all_script_parameters => true,
- :combine_jobs => :script_and_version,
- :script_version_nodes => true,
- :pips => pips }
- end
end
@object = @objects.first
- <li>
- <%= link_to projects_path(options: {ensure_unique_name: true}), method: :post, class: 'btn btn-xs btn-default pull-right' do %>
- <i class="fa fa-plus"></i> Add a new project
- <% end %>
- </li>
- <li>
- <%= project_link_to.call({object: current_user, depth: 0}) do %>
- <span style="padding-left: 0"></span>Home
- <% end %>
- </li>
- <% my_project_tree.each do |pnode| %>
- <% next if pnode[:object].class != Group %>
- <li>
- <%= project_link_to.call pnode do %>
- <span style="padding-left: <%= pnode[:depth] %>em"></span><%= pnode[:object].name %>
- <% end %>
- </li>
- <% end %>
- <li class="divider" />
- <li role="presentation" class="dropdown-header">
- Projects shared with me
- </li>
- <% shared_project_tree.each do |pnode| %>
- <% next if pnode[:object].class != Group %>
- <li>
- <%= project_link_to.call pnode do %>
- <span style="padding-left: <%= pnode[:depth]-1 %>em"></span><i class="fa fa-fw fa-share-alt" style="color:#aaa"></i> <%= pnode[:object].name %>
- <% end %>
- </li>
- <% end %>
+<li>
+ <%= project_link_to.call({object: current_user, depth: 0}) do %>
+ <span style="padding-left: 0">Home</span>
+ <% end %>
+</li>
+<% my_project_tree.each do |pnode| %>
+ <% next if pnode[:object].class != Group %>
+ <li>
+ <%= project_link_to.call pnode do %>
+ <span style="padding-left: <%= pnode[:depth] %>em"></span><%= pnode[:object].name %>
+ <% end %>
+ </li>
+<% end %>
+<li class="divider" />
+<li role="presentation" class="dropdown-header">
+ Projects shared with me
+</li>
+<% shared_project_tree.each do |pnode| %>
+ <% next if pnode[:object].class != Group %>
+ <li>
+ <%= project_link_to.call pnode do %>
+ <span style="padding-left: <%= pnode[:depth]-1 %>em"></span><i class="fa fa-fw fa-share-alt" style="color:#aaa"></i> <%= pnode[:object].name %>
+ <% end %>
+ </li>
+<% end %>
<%= current_user.email %> <span class="caret"></span>
</a>
<ul class="dropdown-menu" role="menu">
- <li role="presentation" class="dropdown-header">
- My account
- </li>
<% if current_user.is_active %>
<li role="menuitem"><a href="/manage_account" role="menuitem"><i class="fa fa-key fa-fw"></i> Manage account</a></li>
<% if Rails.configuration.user_profile_form_fields %>
<li role="menuitem"><a href="/users/<%=current_user.uuid%>/profile" role="menuitem"><i class="fa fa-key fa-fw"></i> Manage profile</a></li>
<% end %>
- <li role="presentation" class="divider"></li>
<% end %>
<li role="menuitem"><a href="<%= logout_path %>" role="menuitem"><i class="fa fa-sign-out fa-fw"></i> Log out</a></li>
<% if current_user.is_active and
Projects
<span class="caret"></span>
</a>
- <ul class="dropdown-menu" role="menu">
+ <ul class="dropdown-menu" style="min-width: 20em" role="menu">
+ <li>
+ <%= link_to projects_path(options: {ensure_unique_name: true}), method: :post, class: 'btn btn-xs btn-default pull-right' do %>
+ <i class="fa fa-plus"></i> Add a new project
+ <% end %>
+ </li>
<%= render partial: "projects_tree_menu", locals: {
:project_link_to => Proc.new do |pnode, &block|
link_to(project_path(pnode[:object].uuid),
require 'test_helper'
class SearchControllerTest < ActionController::TestCase
- # test "the truth" do
- # assert true
- # end
+ # These tests don't do state-changing API calls. Save some time by
+ # skipping the database reset.
+ reset_api_fixtures :after_each_test, false
+ reset_api_fixtures :after_suite, true
+
+ include Rails.application.routes.url_helpers
+
+ test 'Get search dialog' do
+ xhr :get, :choose, {
+ format: :js,
+ title: 'Search',
+ action_name: 'Show',
+ action_href: url_for(host: 'localhost', controller: :actions, action: :show),
+ action_data: {}.to_json,
+ }, session_for(:active)
+ assert_response :success
+ end
+
+ test 'Get search results for all projects' do
+ xhr :get, :choose, {
+ format: :json,
+ partial: true,
+ }, session_for(:active)
+ assert_response :success
+ assert_not_empty(json_response['content'],
+ 'search results for all projects should not be empty')
+ end
+
+ test 'Get search results for empty project' do
+ xhr :get, :choose, {
+ format: :json,
+ partial: true,
+ project_uuid: api_fixture('groups')['empty_project']['uuid'],
+ }, session_for(:active)
+ assert_response :success
+ assert_empty(json_response['content'],
+ 'search results for empty project should be empty')
+ end
end
else:
outcollection = robust_put.upload(outdir, logger)
-# Success if no non-zero return codes
-success = any(rcode) and not any([status != 0 for status in rcode.values()])
+# Success if we ran any subprocess, and they all exited 0.
+success = rcode and all(status == 0 for status in rcode.itervalues())
api.job_tasks().update(uuid=arvados.current_task()['uuid'],
body={
'progress':1.0
}).execute()
-sys.exit(rcode)
+sys.exit(0 if success else 1)
end
def test_file_to_file_no_overwrite_file
+ skip "Waiting until #4534 is implemented"
File.open './tmp/foo', 'wb' do |f|
f.write 'baz'
end
end
def test_file_to_file_no_overwrite_file_in_dir
+ skip "Waiting until #4534 is implemented"
File.open './tmp/foo', 'wb' do |f|
f.write 'baz'
end
+*.pyc
/build/
/dist/
-/*.egg
-/*.egg-info
+*.egg
+*.egg-info
/tests/tmp
+.eggs
description: Users who can share objects with each other
group_class: role
+empty_project:
+ uuid: zzzzz-j7d0g-9otoxmrksam74q6
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: 2014-12-16 15:56:27.967534940 Z
+ modified_by_client_uuid: ~
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ modified_at: 2014-12-16 15:56:27.967358199 Z
+ name: Empty project
+ description: ~
+ updated_at: 2014-12-16 15:56:27.967242142 Z
+ group_class: project
+
project_with_10_collections:
uuid: zzzzz-j7d0g-0010collections
owner_uuid: zzzzz-tpzed-user1withloadab
+++ /dev/null
-*.pyc
-*.egg
-*.egg-info
-build/
-dist/
--- /dev/null
+../../sdk/python/.gitignore
\ No newline at end of file
response to subscribers. It takes care of error handling, and retrying
requests with exponential backoff.
- To use this actor, define CLIENT_ERRORS and the _send_request method.
- If you also define an _item_key method, this class will support
- subscribing to a specific item by key in responses.
+ To use this actor, define the _send_request method. If you also
+ define an _item_key method, this class will support subscribing to
+ a specific item by key in responses.
"""
- CLIENT_ERRORS = ()
-
def __init__(self, client, timer_actor, poll_wait=60, max_poll_wait=180):
super(RemotePollLoopActor, self).__init__()
self._client = client
return "{} got error: {} - waiting {} seconds".format(
self.log_prefix, error, self.poll_wait)
+ def is_common_error(self, exception):
+ return False
+
def poll(self, scheduled_start=None):
self._logger.debug("%s sending poll", self.log_prefix)
start_time = time.time()
response = self._send_request()
except Exception as error:
errmsg = self._got_error(error)
- if isinstance(error, self.CLIENT_ERRORS):
+ if self.is_common_error(error):
self._logger.warning(errmsg)
else:
self._logger.exception(errmsg)
This base class takes care of retrying changes and notifying
subscribers when the change is finished.
"""
- def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
+ def __init__(self, logger_name, cloud_client, timer_actor,
+ retry_wait, max_retry_wait):
super(ComputeNodeStateChangeBase, self).__init__()
self._later = self.actor_ref.proxy()
- self._timer = timer_actor
self._logger = logging.getLogger(logger_name)
+ self._cloud = cloud_client
+ self._timer = timer_actor
self.min_retry_wait = retry_wait
self.max_retry_wait = max_retry_wait
self.retry_wait = retry_wait
self.subscribers = set()
@staticmethod
- def _retry(errors):
+ def _retry(errors=()):
"""Retry decorator for an actor method that makes remote requests.
Use this function to decorator an actor method, and pass in a
tuple of exceptions to catch. This decorator will schedule
retries of that method with exponential backoff if the
- original method raises any of the given errors.
+ original method raises a known cloud driver error, or any of the
+ given exception types.
"""
def decorator(orig_func):
@functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
+ def retry_wrapper(self, *args, **kwargs):
start_time = time.time()
try:
orig_func(self, *args, **kwargs)
- except errors as error:
+ except Exception as error:
+ if not (isinstance(error, errors) or
+ self._cloud.is_cloud_exception(error)):
+ raise
self._logger.warning(
"Client error: %s - waiting %s seconds",
error, self.retry_wait)
self.max_retry_wait)
else:
self.retry_wait = self.min_retry_wait
- return wrapper
+ return retry_wrapper
return decorator
def _finished(self):
cloud_size, arvados_node=None,
retry_wait=1, max_retry_wait=180):
super(ComputeNodeSetupActor, self).__init__(
- 'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
+ 'arvnodeman.nodeup', cloud_client, timer_actor,
+ retry_wait, max_retry_wait)
self._arvados = arvados_client
- self._cloud = cloud_client
self.cloud_size = cloud_size
self.arvados_node = None
self.cloud_node = None
else:
self._later.prepare_arvados_node(arvados_node)
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+ @ComputeNodeStateChangeBase._retry()
def create_arvados_node(self):
self.arvados_node = self._arvados.nodes().create(body={}).execute()
self._later.create_cloud_node()
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+ @ComputeNodeStateChangeBase._retry()
def prepare_arvados_node(self, node):
self.arvados_node = self._arvados.nodes().update(
uuid=node['uuid'],
).execute()
self._later.create_cloud_node()
- @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+ @ComputeNodeStateChangeBase._retry()
def create_cloud_node(self):
self._logger.info("Creating cloud node with size %s.",
self.cloud_size.name)
self.cloud_node = self._cloud.create_node(self.cloud_size,
self.arvados_node)
self._logger.info("Cloud node %s created.", self.cloud_node.id)
+ self._later.post_create()
+
+ @ComputeNodeStateChangeBase._retry()
+ def post_create(self):
+ self._cloud.post_create_node(self.cloud_node)
+ self._logger.info("%s post-create work done.", self.cloud_node.id)
self._finished()
def stop_if_no_cloud_node(self):
# eligible. Normal shutdowns based on job demand should be
# cancellable; shutdowns based on node misbehavior should not.
super(ComputeNodeShutdownActor, self).__init__(
- 'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
- self._cloud = cloud_client
+ 'arvnodeman.nodedown', cloud_client, timer_actor,
+ retry_wait, max_retry_wait)
self._monitor = node_monitor.proxy()
self.cloud_node = self._monitor.cloud_node.get()
self.cancellable = cancellable
def _stop_if_window_closed(orig_func):
@functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
+ def stop_wrapper(self, *args, **kwargs):
if (self.cancellable and
(not self._monitor.shutdown_eligible().get())):
self._logger.info(
return None
else:
return orig_func(self, *args, **kwargs)
- return wrapper
+ return stop_wrapper
@_stop_if_window_closed
- @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+ @ComputeNodeStateChangeBase._retry()
def shutdown_node(self):
if self._cloud.destroy_node(self.cloud_node):
self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
def _throttle_errors(orig_func):
@functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
+ def throttle_wrapper(self, *args, **kwargs):
throttle_time = self.next_request_time - time.time()
if throttle_time > 0:
time.sleep(throttle_time)
self.next_request_time = time.time()
try:
result = orig_func(self, *args, **kwargs)
- except config.CLOUD_ERRORS:
+ except Exception as error:
self.error_streak += 1
self.next_request_time += min(2 ** self.error_streak,
self.max_retry_wait)
else:
self.error_streak = 0
return result
- return wrapper
+ return throttle_wrapper
@_throttle_errors
def sync_node(self, cloud_node, arvados_node):
from __future__ import absolute_import, print_function
+import libcloud.common.types as cloud_types
+
+from ...config import NETWORK_ERRORS
+
class BaseComputeNodeDriver(object):
"""Abstract base class for compute node drivers.
creation kwargs with information about the specific Arvados node
record), sync_node, and node_start_time.
"""
+ CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError,)
+
def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
self.real = driver_class(**auth_kwargs)
self.list_kwargs = list_kwargs
kwargs['size'] = size
return self.real.create_node(**kwargs)
+ def post_create_node(self, cloud_node):
+ # ComputeNodeSetupActor calls this method after the cloud node is
+ # created. Any setup tasks that need to happen afterward (e.g.,
+ # tagging) should be done in this method.
+ pass
+
def sync_node(self, cloud_node, arvados_node):
# When a compute node first pings the API server, the API server
# will automatically assign some attributes on the corresponding
@classmethod
def node_start_time(cls, node):
raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
+
+ @classmethod
+ def is_cloud_exception(cls, exception):
+ # libcloud compute drivers typically raise bare Exceptions to
+ # represent API errors. Return True for any exception that is
+ # exactly an Exception, or a better-known higher-level exception.
+ return (isinstance(exception, cls.CLOUD_ERRORS) or
+ getattr(exception, '__class__', None) is Exception)
return 'auth', key
def arvados_create_kwargs(self, arvados_node):
- result = {'ex_metadata': self.tags.copy(),
- 'name': arvados_node_fqdn(arvados_node)}
+ result = {'name': arvados_node_fqdn(arvados_node)}
ping_secret = arvados_node['info'].get('ping_secret')
if ping_secret is not None:
ping_url = ('https://{}/arvados/v1/nodes/{}/ping?ping_secret={}'.
result['ex_userdata'] = ping_url
return result
+ def post_create_node(self, cloud_node):
+ self.real.ex_create_tags(cloud_node, self.tags)
+
def sync_node(self, cloud_node, arvados_node):
- metadata = self.arvados_create_kwargs(arvados_node)
- tags = metadata['ex_metadata']
- tags['Name'] = metadata['name']
- self.real.ex_create_tags(cloud_node, tags)
+ self.real.ex_create_tags(cloud_node,
+ {'Name': arvados_node_fqdn(arvados_node)})
@classmethod
def node_start_time(cls, node):
import arvados
import httplib2
-import libcloud.common.types as cloud_types
import pykka
from apiclient import errors as apierror
# it's low-level, but unlikely to catch code bugs.
NETWORK_ERRORS = (IOError, ssl.SSLError)
ARVADOS_ERRORS = NETWORK_ERRORS + (apierror.Error,)
-CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError,)
actor_class = pykka.ThreadingActor
def __init__(self, server_wishlist_actor, arvados_nodes_actor,
cloud_nodes_actor, cloud_update_actor, timer_actor,
arvados_factory, cloud_factory,
- shutdown_windows, min_nodes, max_nodes,
+ shutdown_windows, min_size, min_nodes, max_nodes,
poll_stale_after=600,
boot_fail_after=1800,
node_stale_after=7200,
self._logger = logging.getLogger('arvnodeman.daemon')
self._later = self.actor_ref.proxy()
self.shutdown_windows = shutdown_windows
+ self.min_cloud_size = min_size
self.min_nodes = min_nodes
self.max_nodes = max_nodes
self.poll_stale_after = poll_stale_after
break
for key, record in self.cloud_nodes.orphans.iteritems():
record.actor.stop()
+ record.cloud_node = None
self.shutdowns.pop(key, None)
def update_arvados_nodes(self, nodelist):
def _nodes_wanted(self):
up_count = self._nodes_up()
+ under_min = self.min_nodes - up_count
over_max = up_count - self.max_nodes
if over_max >= 0:
return -over_max
+ elif under_min > 0:
+ return under_min
else:
up_count -= len(self.shutdowns) + self._nodes_busy()
return len(self.last_wishlist) - up_count
if nodes_wanted < 1:
return None
arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
- cloud_size = self.last_wishlist[nodes_wanted - 1]
+ try:
+ cloud_size = self.last_wishlist[self._nodes_up()]
+ except IndexError:
+ cloud_size = self.min_cloud_size
self._logger.info("Want %s more nodes. Booting a %s node.",
nodes_wanted, cloud_size.name)
new_setup = self._node_setup.start(
return True
- def __init__(self, server_list, min_nodes=0, max_nodes=None):
+ def __init__(self, server_list, max_nodes=None):
self.cloud_sizes = [self.CloudSizeWrapper(s, **kws)
for s, kws in server_list]
self.cloud_sizes.sort(key=lambda s: s.price)
- self.min_nodes = min_nodes
self.max_nodes = max_nodes or float('inf')
self.logger = logging.getLogger('arvnodeman.jobqueue')
self.logged_jobs = set()
elif (want_count <= self.max_nodes):
servers.extend([cloud_size.real] * max(1, want_count))
self.logged_jobs.intersection_update(seen_jobs)
-
- # Make sure the server queue has at least enough entries to
- # satisfy min_nodes.
- node_shortfall = self.min_nodes - len(servers)
- if node_shortfall > 0:
- basic_node = self.cloud_size_for_constraints({})
- servers.extend([basic_node.real] * node_shortfall)
return servers
+ def cheapest_size(self):
+ return self.cloud_sizes[0]
+
class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
"""Actor to generate server wishlists from the job queue.
sublogger = logging.getLogger(logger_name)
sublogger.setLevel(sublevel)
-def launch_pollers(config):
- cloud_client = config.new_cloud_client()
- arvados_client = config.new_arvados_client()
- cloud_size_list = config.node_sizes(cloud_client.list_sizes())
+def build_server_calculator(config):
+ cloud_size_list = config.node_sizes(config.new_cloud_client().list_sizes())
if not cloud_size_list:
abort("No valid node sizes configured")
+ return ServerCalculator(cloud_size_list,
+ config.getint('Daemon', 'max_nodes'))
- server_calculator = ServerCalculator(
- cloud_size_list,
- config.getint('Daemon', 'min_nodes'),
- config.getint('Daemon', 'max_nodes'))
+def launch_pollers(config, server_calculator):
poll_time = config.getint('Daemon', 'poll_time')
max_poll_time = config.getint('Daemon', 'max_poll_time')
timer = TimedCallBackActor.start(poll_time / 10.0).proxy()
cloud_node_poller = CloudNodeListMonitorActor.start(
- cloud_client, timer, poll_time, max_poll_time).proxy()
+ config.new_cloud_client(), timer, poll_time, max_poll_time).proxy()
arvados_node_poller = ArvadosNodeListMonitorActor.start(
- arvados_client, timer, poll_time, max_poll_time).proxy()
+ config.new_arvados_client(), timer, poll_time, max_poll_time).proxy()
job_queue_poller = JobQueueMonitorActor.start(
config.new_arvados_client(), timer, server_calculator,
poll_time, max_poll_time).proxy()
setup_logging(config.get('Logging', 'file'), **config.log_levels())
node_setup, node_shutdown, node_update, node_monitor = \
config.dispatch_classes()
+ server_calculator = build_server_calculator(config)
timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
- launch_pollers(config)
+ launch_pollers(config, server_calculator)
cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
node_daemon = NodeManagerDaemonActor.start(
job_queue_poller, arvados_node_poller, cloud_node_poller,
cloud_node_updater, timer,
config.new_arvados_client, config.new_cloud_client,
config.shutdown_windows(),
+ server_calculator.cheapest_size(),
config.getint('Daemon', 'min_nodes'),
config.getint('Daemon', 'max_nodes'),
config.getint('Daemon', 'poll_stale_after'),
This actor regularly polls the list of Arvados node records, and
sends it to subscribers.
"""
-
- CLIENT_ERRORS = config.ARVADOS_ERRORS
LOGGER_NAME = 'arvnodeman.arvados_nodes'
+ def is_common_error(self, exception):
+ return isinstance(exception, config.ARVADOS_ERRORS)
+
def _item_key(self, node):
return node['uuid']
This actor regularly polls the cloud to get a list of running compute
nodes, and sends it to subscribers.
"""
-
- CLIENT_ERRORS = config.CLOUD_ERRORS
LOGGER_NAME = 'arvnodeman.cloud_nodes'
+ def is_common_error(self, exception):
+ return self._client.is_cloud_exception(exception)
+
def _item_key(self, node):
return node.id
from . import testutil
class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
- def make_mocks(self, arvados_effect=None, cloud_effect=None):
+ def make_mocks(self, arvados_effect=None):
if arvados_effect is None:
arvados_effect = [testutil.arvados_node_mock()]
self.arvados_effect = arvados_effect
self.assertEqual(self.cloud_client.create_node(),
self.setup_actor.cloud_node.get(self.TIMEOUT))
- def test_failed_calls_retried(self):
+ def test_failed_arvados_calls_retried(self):
self.make_mocks([
arverror.ApiError(httplib2.Response({'status': '500'}), ""),
testutil.arvados_node_mock(),
])
self.make_actor()
+ self.wait_for_assignment(self.setup_actor, 'arvados_node')
+
+ def test_failed_cloud_calls_retried(self):
+ self.make_mocks()
+ self.cloud_client.create_node.side_effect = [
+ Exception("test cloud creation error"),
+ self.cloud_client.create_node.return_value,
+ ]
+ self.make_actor()
self.wait_for_assignment(self.setup_actor, 'cloud_node')
+ def test_failed_post_create_retried(self):
+ self.make_mocks()
+ self.cloud_client.post_create_node.side_effect = [
+ Exception("test cloud post-create error"), None]
+ self.make_actor()
+ done = self.FUTURE_CLASS()
+ self.setup_actor.subscribe(done.set)
+ done.get(self.TIMEOUT)
+ self.assertEqual(2, self.cloud_client.post_create_node.call_count)
+
def test_stop_when_no_cloud_node(self):
self.make_mocks(
arverror.ApiError(httplib2.Response({'status': '500'}), ""))
from __future__ import absolute_import, print_function
+import ssl
import time
import unittest
+import libcloud.common.types as cloud_types
import mock
import arvnodeman.computenode.driver.ec2 as ec2
create_method.call_args[1].get('ex_userdata',
'arg missing'))
- def test_tags_created_from_arvados_node(self):
+ def test_hostname_from_arvados_node(self):
arv_node = testutil.arvados_node_mock(8)
- cloud_node = testutil.cloud_node_mock(8)
- driver = self.new_driver(list_kwargs={'tag:list': 'test'})
- self.assertEqual({'ex_metadata': {'list': 'test'},
- 'name': 'compute8.zzzzz.arvadosapi.com'},
- driver.arvados_create_kwargs(arv_node))
+ driver = self.new_driver()
+ self.assertEqual('compute8.zzzzz.arvadosapi.com',
+ driver.arvados_create_kwargs(arv_node)['name'])
- def test_tags_set_default_hostname_from_new_arvados_node(self):
+ def test_default_hostname_from_new_arvados_node(self):
arv_node = testutil.arvados_node_mock(hostname=None)
driver = self.new_driver()
- actual = driver.arvados_create_kwargs(arv_node)
self.assertEqual('dynamic.compute.zzzzz.arvadosapi.com',
- actual['name'])
+ driver.arvados_create_kwargs(arv_node)['name'])
+
+ def check_node_tagged(self, cloud_node, expected_tags):
+ tag_mock = self.driver_mock().ex_create_tags
+ self.assertTrue(tag_mock.called)
+ self.assertIs(cloud_node, tag_mock.call_args[0][0])
+ self.assertEqual(expected_tags, tag_mock.call_args[0][1])
+
+ def test_post_create_node_tags_from_list_kwargs(self):
+ expect_tags = {'key1': 'test value 1', 'key2': 'test value 2'}
+ list_kwargs = {('tag_' + key): value
+ for key, value in expect_tags.iteritems()}
+ list_kwargs['instance-state-name'] = 'running'
+ cloud_node = testutil.cloud_node_mock()
+ driver = self.new_driver(list_kwargs=list_kwargs)
+ driver.post_create_node(cloud_node)
+ self.check_node_tagged(cloud_node, expect_tags)
def test_sync_node(self):
arv_node = testutil.arvados_node_mock(1)
cloud_node = testutil.cloud_node_mock(2)
driver = self.new_driver()
driver.sync_node(cloud_node, arv_node)
- tag_mock = self.driver_mock().ex_create_tags
- self.assertTrue(tag_mock.called)
- self.assertEqual('compute1.zzzzz.arvadosapi.com',
- tag_mock.call_args[0][1].get('Name', 'no name'))
+ self.check_node_tagged(cloud_node,
+ {'Name': 'compute1.zzzzz.arvadosapi.com'})
def test_node_create_time(self):
refsecs = int(time.time())
node.extra = {'launch_time': time.strftime('%Y-%m-%dT%H:%M:%S.000Z',
reftuple)}
self.assertEqual(refsecs, ec2.ComputeNodeDriver.node_start_time(node))
+
+ def test_cloud_exceptions(self):
+ for error in [Exception("test exception"),
+ IOError("test exception"),
+ ssl.SSLError("test exception"),
+ cloud_types.LibcloudError("test exception")]:
+ self.assertTrue(ec2.ComputeNodeDriver.is_cloud_exception(error),
+ "{} not flagged as cloud exception".format(error))
+
+ def test_noncloud_exceptions(self):
+ self.assertFalse(
+ ec2.ComputeNodeDriver.is_cloud_exception(ValueError("test error")),
+ "ValueError flagged as cloud exception")
class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
unittest.TestCase):
+ def new_setup_proxy(self):
+ # Make sure that every time the daemon starts a setup actor,
+ # it gets a new mock object back.
+ self.last_setup = mock.MagicMock(name='setup_proxy_mock')
+ return self.last_setup
+
def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
- min_nodes=0, max_nodes=8):
+ min_size=testutil.MockSize(1), min_nodes=0, max_nodes=8):
for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
self.arv_factory = mock.MagicMock(name='arvados_mock')
self.cloud_updates = mock.MagicMock(name='updates_mock')
self.timer = testutil.MockTimer(deliver_immediately=False)
self.node_setup = mock.MagicMock(name='setup_mock')
+ self.node_setup.start().proxy.side_effect = self.new_setup_proxy
+ self.node_setup.reset_mock()
self.node_shutdown = mock.MagicMock(name='shutdown_mock')
self.daemon = nmdaemon.NodeManagerDaemonActor.start(
self.server_wishlist_poller, self.arvados_nodes_poller,
self.cloud_nodes_poller, self.cloud_updates, self.timer,
self.arv_factory, self.cloud_factory,
- [54, 5, 1], min_nodes, max_nodes, 600, 1800, 3600,
+ [54, 5, 1], min_size, min_nodes, max_nodes, 600, 1800, 3600,
self.node_setup, self.node_shutdown).proxy()
if cloud_nodes is not None:
self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
self.assertTrue(self.node_setup.start.called)
+ def check_monitors_arvados_nodes(self, *arv_nodes):
+ pairings = [monitor.proxy().arvados_node
+ for monitor in self.monitor_list() if monitor.is_alive()]
+ self.assertItemsEqual(arv_nodes, pykka.get_all(pairings, self.TIMEOUT))
+
def test_node_pairing(self):
cloud_node = testutil.cloud_node_mock(1)
arv_node = testutil.arvados_node_mock(1)
self.make_daemon([cloud_node], [arv_node])
self.stop_proxy(self.daemon)
- self.assertEqual(1, self.alive_monitor_count())
- self.assertIs(
- self.monitor_list()[0].proxy().arvados_node.get(self.TIMEOUT),
- arv_node)
+ self.check_monitors_arvados_nodes(arv_node)
def test_node_pairing_after_arvados_update(self):
cloud_node = testutil.cloud_node_mock(2)
arv_node = testutil.arvados_node_mock(2)
self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
- self.assertEqual(1, self.alive_monitor_count())
- self.assertIs(
- self.monitor_list()[0].proxy().arvados_node.get(self.TIMEOUT),
- arv_node)
+ self.check_monitors_arvados_nodes(arv_node)
+
+ def test_arvados_node_un_and_re_paired(self):
+ arv_node = testutil.arvados_node_mock(3)
+ self.make_daemon([testutil.cloud_node_mock(3)], [arv_node])
+ self.check_monitors_arvados_nodes(arv_node)
+ self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
+ self.assertEqual(0, self.alive_monitor_count())
+ self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
+ self.stop_proxy(self.daemon)
+ self.check_monitors_arvados_nodes(arv_node)
def test_old_arvados_node_not_double_assigned(self):
arv_node = testutil.arvados_node_mock(3, age=9000)
size = testutil.MockSize(3)
self.make_daemon(arvados_nodes=[arv_node])
- setup_ref = self.node_setup.start().proxy().actor_ref
- setup_ref.actor_urn = 0
- self.node_setup.start.reset_mock()
self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
- self.daemon.max_nodes.get(self.TIMEOUT)
- setup_ref.actor_urn += 1
self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
used_nodes = [call[1].get('arvados_node')
self.stop_proxy(self.daemon)
self.assertTrue(self.node_setup.start.called)
+ def test_boot_new_node_below_min_nodes(self):
+ min_size = testutil.MockSize(1)
+ wish_size = testutil.MockSize(3)
+ self.make_daemon([], [], None, min_size=min_size, min_nodes=2)
+ self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
+ self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
+ self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
+ self.stop_proxy(self.daemon)
+ self.assertEqual([wish_size, min_size],
+ [call[1].get('cloud_size')
+ for call in self.node_setup.start.call_args_list])
+
+ def test_no_new_node_when_ge_min_nodes_busy(self):
+ cloud_nodes = [testutil.cloud_node_mock(n) for n in range(1, 4)]
+ arv_nodes = [testutil.arvados_node_mock(n, job_uuid=True)
+ for n in range(1, 4)]
+ self.make_daemon(cloud_nodes, arv_nodes, [], min_nodes=2)
+ self.stop_proxy(self.daemon)
+ self.assertEqual(0, self.node_setup.start.call_count)
+
def test_no_new_node_when_max_nodes_busy(self):
self.make_daemon([testutil.cloud_node_mock(3)],
[testutil.arvados_node_mock(3, job_uuid=True)],
self.stop_proxy(self.daemon)
self.assertFalse(self.node_setup.start.called)
- def mock_setup_actor(self, cloud_node, arv_node):
- setup = self.node_setup.start().proxy()
- self.node_setup.reset_mock()
- setup.actor_urn = cloud_node.id
- setup.cloud_node.get.return_value = cloud_node
- setup.arvados_node.get.return_value = arv_node
- return setup
-
def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
if cloud_node is None:
cloud_node = testutil.cloud_node_mock(id_num)
self.make_daemon(want_sizes=[testutil.MockSize(id_num)])
self.daemon.max_nodes.get(self.TIMEOUT)
self.assertEqual(1, self.node_setup.start.call_count)
- return self.mock_setup_actor(cloud_node, arv_node)
+ self.last_setup.cloud_node.get.return_value = cloud_node
+ self.last_setup.arvados_node.get.return_value = arv_node
+ return self.last_setup
def test_no_duplication_when_booting_node_listed_fast(self):
# Test that we don't start two ComputeNodeMonitorActors when
self.daemon.update_server_wishlist(
[testutil.MockSize(1)]).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
- self.assertFalse(self.node_setup.start.called,
- "daemon did not count booted node toward wishlist")
+ self.assertEqual(1, self.node_setup.start.call_count)
def test_booted_node_can_shutdown(self):
setup = self.start_node_boot()
self.make_daemon(want_sizes=[testutil.MockSize(1)])
self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
- self.assertTrue(
- self.node_setup.start().proxy().stop_if_no_cloud_node.called)
+ self.assertTrue(self.last_setup.stop_if_no_cloud_node.called)
def test_shutdown_declined_at_wishlist_capacity(self):
cloud_node = testutil.cloud_node_mock(1)
{'min_scratch_mb_per_node': 200})
self.assertEqual(6, len(servlist))
- def test_server_calc_min_nodes_0_jobs(self):
- servcalc = self.make_calculator([1], min_nodes=3, max_nodes=9)
- servlist = self.calculate(servcalc, {})
- self.assertEqual(3, len(servlist))
-
- def test_server_calc_min_nodes_1_job(self):
- servcalc = self.make_calculator([1], min_nodes=3, max_nodes=9)
- servlist = self.calculate(servcalc, {'min_nodes': 1})
- self.assertEqual(3, len(servlist))
-
- def test_server_calc_more_jobs_than_min_nodes(self):
- servcalc = self.make_calculator([1], min_nodes=2, max_nodes=9)
- servlist = self.calculate(servcalc,
- {'min_nodes': 1},
- {'min_nodes': 1},
- {'min_nodes': 1})
- self.assertEqual(3, len(servlist))
-
def test_job_requesting_max_nodes_accepted(self):
servcalc = self.make_calculator([1], max_nodes=4)
servlist = self.calculate(servcalc, {'min_nodes': 4})
self.assertEqual(4, len(servlist))
+ def test_cheapest_size(self):
+ servcalc = self.make_calculator([2, 4, 1, 3])
+ self.assertEqual(testutil.MockSize(1), servcalc.cheapest_size())
+
class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
unittest.TestCase):