Merge branch '4754-performance-TC' closes #4754
authorTom Clegg <tom@curoverse.com>
Tue, 23 Dec 2014 20:51:49 +0000 (15:51 -0500)
committerTom Clegg <tom@curoverse.com>
Tue, 23 Dec 2014 20:51:49 +0000 (15:51 -0500)
23 files changed:
apps/workbench/app/assets/stylesheets/application.css.scss
apps/workbench/app/controllers/pipeline_instances_controller.rb
apps/workbench/app/views/application/_projects_tree_menu.html.erb
apps/workbench/app/views/layouts/body.html.erb
apps/workbench/test/controllers/search_controller_test.rb
crunch_scripts/run-command
sdk/cli/test/test_arv-get.rb
sdk/python/.gitignore
services/api/test/fixtures/groups.yml
services/nodemanager/.gitignore [changed from file to symlink]
services/nodemanager/arvnodeman/clientactor.py
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
services/nodemanager/arvnodeman/computenode/driver/__init__.py
services/nodemanager/arvnodeman/computenode/driver/ec2.py
services/nodemanager/arvnodeman/config.py
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/arvnodeman/jobqueue.py
services/nodemanager/arvnodeman/launcher.py
services/nodemanager/arvnodeman/nodelist.py
services/nodemanager/tests/test_computenode_dispatch.py
services/nodemanager/tests/test_computenode_driver_ec2.py
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/test_jobqueue.py

index 51d4702485e04e583d59aac3f2fc78b38d0c343f..9bc93e32bd49574ff998e60980c6417cb98d4770 100644 (file)
@@ -204,6 +204,11 @@ table.table-fixed-header-row tbody {
     top:1.5em;
 }
 
+.dropdown-menu {
+    max-height: 30em;
+    overflow-y: auto;
+}
+
 .row-fill-height, .row-fill-height>div[class*='col-'] {
     display: flex;
 }
index eb9bac0fa99d9cff4ab5b45d5008e868253c2d55..25f5ee421c58dc860806b9a0f6b726e2e8816406 100644 (file)
@@ -174,12 +174,9 @@ class PipelineInstancesController < ApplicationController
   end
 
   def show
-    @pipelines = [@object]
-
-    if params[:compare]
-      PipelineInstance.where(uuid: params[:compare]).each do |p|
-        @pipelines << p
-      end
+    # the #show action can also be called by #compare, which does its own work to set up @pipelines
+    unless defined? @pipelines
+      @pipelines = [@object]
     end
 
     provenance, pips = graph(@pipelines)
@@ -259,18 +256,7 @@ class PipelineInstancesController < ApplicationController
     end
 
     if params['tab_pane'] == "Graph"
-      provenance, pips = graph(@objects)
-
       @pipelines = @objects
-
-      if provenance
-        @prov_svg = ProvenanceHelper::create_provenance_graph provenance, "provenance_svg", {
-          :request => request,
-          :all_script_parameters => true,
-          :combine_jobs => :script_and_version,
-          :script_version_nodes => true,
-          :pips => pips }
-      end
     end
 
     @object = @objects.first
index 4a49184aaf409805dcd3df2dd41e3ab632fefb33..4de3c2330ed55407c6a01053906a3ddc37dc91b6 100644 (file)
@@ -1,30 +1,25 @@
-              <li>
-                <%= link_to projects_path(options: {ensure_unique_name: true}), method: :post, class: 'btn btn-xs btn-default pull-right' do %>
-                  <i class="fa fa-plus"></i> Add a new project
-                <% end %>
-              </li>
-              <li>
-                <%= project_link_to.call({object: current_user, depth: 0}) do %>
-                  <span style="padding-left: 0"></span>Home
-                <% end %>
-              </li>
-              <% my_project_tree.each do |pnode| %>
-                <% next if pnode[:object].class != Group %>
-                <li>
-                  <%= project_link_to.call pnode do %>
-                    <span style="padding-left: <%= pnode[:depth] %>em"></span><%= pnode[:object].name %>
-                  <% end %>
-                </li>
-              <% end %>
-              <li class="divider" />
-              <li role="presentation" class="dropdown-header">
-                Projects shared with me
-              </li>
-              <% shared_project_tree.each do |pnode| %>
-                <% next if pnode[:object].class != Group %>
-                <li>
-                  <%= project_link_to.call pnode do %>
-                    <span style="padding-left: <%= pnode[:depth]-1 %>em"></span><i class="fa fa-fw fa-share-alt" style="color:#aaa"></i> <%= pnode[:object].name %>
-                  <% end %>
-                </li>
-              <% end %>
+<li>
+  <%= project_link_to.call({object: current_user, depth: 0}) do %>
+    <span style="padding-left: 0">Home</span>
+  <% end %>
+</li>
+<% my_project_tree.each do |pnode| %>
+  <% next if pnode[:object].class != Group %>
+  <li>
+    <%= project_link_to.call pnode do %>
+      <span style="padding-left: <%= pnode[:depth] %>em"></span><%= pnode[:object].name %>
+    <% end %>
+  </li>
+<% end %>
+<li class="divider" />
+<li role="presentation" class="dropdown-header">
+  Projects shared with me
+</li>
+<% shared_project_tree.each do |pnode| %>
+  <% next if pnode[:object].class != Group %>
+  <li>
+    <%= project_link_to.call pnode do %>
+      <span style="padding-left: <%= pnode[:depth]-1 %>em"></span><i class="fa fa-fw fa-share-alt" style="color:#aaa"></i> <%= pnode[:object].name %>
+    <% end %>
+  </li>
+<% end %>
index 07b536b8e366fcef537ad7f3f869c90462787566..824e370c582d6ad2ea80f12ac8b342bb099a2464 100644 (file)
                 <%= current_user.email %> <span class="caret"></span>
               </a>
               <ul class="dropdown-menu" role="menu">
-                <li role="presentation" class="dropdown-header">
-                  My account
-                </li>
                 <% if current_user.is_active %>
                 <li role="menuitem"><a href="/manage_account" role="menuitem"><i class="fa fa-key fa-fw"></i> Manage account</a></li>
                 <% if Rails.configuration.user_profile_form_fields %>
                   <li role="menuitem"><a href="/users/<%=current_user.uuid%>/profile" role="menuitem"><i class="fa fa-key fa-fw"></i> Manage profile</a></li>
                 <% end %>
-                <li role="presentation" class="divider"></li>
                 <% end %>
                 <li role="menuitem"><a href="<%= logout_path %>" role="menuitem"><i class="fa fa-sign-out fa-fw"></i> Log out</a></li>
                 <% if current_user.is_active and
               Projects
               <span class="caret"></span>
             </a>
-            <ul class="dropdown-menu" role="menu">
+            <ul class="dropdown-menu" style="min-width: 20em" role="menu">
+              <li>
+                <%= link_to projects_path(options: {ensure_unique_name: true}), method: :post, class: 'btn btn-xs btn-default pull-right' do %>
+                  <i class="fa fa-plus"></i> Add a new project
+                <% end %>
+              </li>
               <%= render partial: "projects_tree_menu", locals: {
                   :project_link_to => Proc.new do |pnode, &block|
                     link_to(project_path(pnode[:object].uuid),
index bfbf22d8b6958559f02137ab65ea425827104acc..a09d966a184c219cd5fad8a5dc69abea06d1a86c 100644 (file)
@@ -1,7 +1,42 @@
 require 'test_helper'
 
 class SearchControllerTest < ActionController::TestCase
-  # test "the truth" do
-  #   assert true
-  # end
+  # These tests don't do state-changing API calls. Save some time by
+  # skipping the database reset.
+  reset_api_fixtures :after_each_test, false
+  reset_api_fixtures :after_suite, true
+
+  include Rails.application.routes.url_helpers
+
+  test 'Get search dialog' do
+    xhr :get, :choose, {
+      format: :js,
+      title: 'Search',
+      action_name: 'Show',
+      action_href: url_for(host: 'localhost', controller: :actions, action: :show),
+      action_data: {}.to_json,
+    }, session_for(:active)
+    assert_response :success
+  end
+
+  test 'Get search results for all projects' do
+    xhr :get, :choose, {
+      format: :json,
+      partial: true,
+    }, session_for(:active)
+    assert_response :success
+    assert_not_empty(json_response['content'],
+                     'search results for all projects should not be empty')
+  end
+
+  test 'Get search results for empty project' do
+    xhr :get, :choose, {
+      format: :json,
+      partial: true,
+      project_uuid: api_fixture('groups')['empty_project']['uuid'],
+    }, session_for(:active)
+    assert_response :success
+    assert_empty(json_response['content'],
+                 'search results for empty project should be empty')
+  end
 end
index 13ae918895d6192553107ae00273e247c6eb4c34..c07debd787eecfc6696c5d614d69c013f8028dd7 100755 (executable)
@@ -434,8 +434,8 @@ if "task.vwd" in taskp:
 else:
     outcollection = robust_put.upload(outdir, logger)
 
-# Success if no non-zero return codes
-success = any(rcode) and not any([status != 0 for status in rcode.values()])
+# Success if we ran any subprocess, and they all exited 0.
+success = rcode and all(status == 0 for status in rcode.itervalues())
 
 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                                      body={
@@ -444,4 +444,4 @@ api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                                          'progress':1.0
                                      }).execute()
 
-sys.exit(rcode)
+sys.exit(0 if success else 1)
index b7afa8b790a9c810751f1d7143da47c3f7feef6d..67dd399a2456fe4a7c2a2a2cf4d86401d409e6d6 100644 (file)
@@ -58,6 +58,7 @@ class TestArvGet < Minitest::Test
   end
 
   def test_file_to_file_no_overwrite_file
+    skip "Waiting until #4534 is implemented"
     File.open './tmp/foo', 'wb' do |f|
       f.write 'baz'
     end
@@ -70,6 +71,7 @@ class TestArvGet < Minitest::Test
   end
 
   def test_file_to_file_no_overwrite_file_in_dir
+    skip "Waiting until #4534 is implemented"
     File.open './tmp/foo', 'wb' do |f|
       f.write 'baz'
     end
index 105d068d4b60377ef81bcd0f4af620da766bdb68..ab21552d2e16fec0c536ed9de107a630bbab0956 100644 (file)
@@ -1,5 +1,7 @@
+*.pyc
 /build/
 /dist/
-/*.egg
-/*.egg-info
+*.egg
+*.egg-info
 /tests/tmp
+.eggs
index c98cdfa170f166d47970f6e0c372f35ba4f3ea27..86815c04633f2ebac6d73cace66fb7f5e8306ee0 100644 (file)
@@ -139,6 +139,18 @@ group_for_sharing_tests:
   description: Users who can share objects with each other
   group_class: role
 
+empty_project:
+  uuid: zzzzz-j7d0g-9otoxmrksam74q6
+  owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  created_at: 2014-12-16 15:56:27.967534940 Z
+  modified_by_client_uuid: ~
+  modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  modified_at: 2014-12-16 15:56:27.967358199 Z
+  name: Empty project
+  description: ~
+  updated_at: 2014-12-16 15:56:27.967242142 Z
+  group_class: project
+
 project_with_10_collections:
   uuid: zzzzz-j7d0g-0010collections
   owner_uuid: zzzzz-tpzed-user1withloadab
deleted file mode 100644 (file)
index d6bc7fefd17b560b91e44b1fb806838ecd913cb0..0000000000000000000000000000000000000000
+++ /dev/null
@@ -1,5 +0,0 @@
-*.pyc
-*.egg
-*.egg-info
-build/
-dist/
new file mode 120000 (symlink)
index 0000000000000000000000000000000000000000..ed3b3622f2486f6bd6ff8c3f486ef80fbb045db0
--- /dev/null
@@ -0,0 +1 @@
+../../sdk/python/.gitignore
\ No newline at end of file
index 46a103eb02985a7ba9e24af464e96e0cf6dd5a7b..6319f4bbfc5cece52832842cfda05d9ae9253005 100644 (file)
@@ -30,12 +30,10 @@ class RemotePollLoopActor(actor_class):
     response to subscribers.  It takes care of error handling, and retrying
     requests with exponential backoff.
 
-    To use this actor, define CLIENT_ERRORS and the _send_request method.
-    If you also define an _item_key method, this class will support
-    subscribing to a specific item by key in responses.
+    To use this actor, define the _send_request method.  If you also
+    define an _item_key method, this class will support subscribing to
+    a specific item by key in responses.
     """
-    CLIENT_ERRORS = ()
-
     def __init__(self, client, timer_actor, poll_wait=60, max_poll_wait=180):
         super(RemotePollLoopActor, self).__init__()
         self._client = client
@@ -87,6 +85,9 @@ class RemotePollLoopActor(actor_class):
         return "{} got error: {} - waiting {} seconds".format(
             self.log_prefix, error, self.poll_wait)
 
+    def is_common_error(self, exception):
+        return False
+
     def poll(self, scheduled_start=None):
         self._logger.debug("%s sending poll", self.log_prefix)
         start_time = time.time()
@@ -96,7 +97,7 @@ class RemotePollLoopActor(actor_class):
             response = self._send_request()
         except Exception as error:
             errmsg = self._got_error(error)
-            if isinstance(error, self.CLIENT_ERRORS):
+            if self.is_common_error(error):
                 self._logger.warning(errmsg)
             else:
                 self._logger.exception(errmsg)
index c79d8f9588fbea18e276fd1e8f30474b3ac7677e..48e8dcf45888de232ba9004c290023e6bf5999b7 100644 (file)
@@ -19,32 +19,38 @@ class ComputeNodeStateChangeBase(config.actor_class):
     This base class takes care of retrying changes and notifying
     subscribers when the change is finished.
     """
-    def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
+    def __init__(self, logger_name, cloud_client, timer_actor,
+                 retry_wait, max_retry_wait):
         super(ComputeNodeStateChangeBase, self).__init__()
         self._later = self.actor_ref.proxy()
-        self._timer = timer_actor
         self._logger = logging.getLogger(logger_name)
+        self._cloud = cloud_client
+        self._timer = timer_actor
         self.min_retry_wait = retry_wait
         self.max_retry_wait = max_retry_wait
         self.retry_wait = retry_wait
         self.subscribers = set()
 
     @staticmethod
-    def _retry(errors):
+    def _retry(errors=()):
         """Retry decorator for an actor method that makes remote requests.
 
        Use this function to decorate an actor method, and pass in a
         tuple of exceptions to catch.  This decorator will schedule
         retries of that method with exponential backoff if the
-        original method raises any of the given errors.
+        original method raises a known cloud driver error, or any of the
+        given exception types.
         """
         def decorator(orig_func):
             @functools.wraps(orig_func)
-            def wrapper(self, *args, **kwargs):
+            def retry_wrapper(self, *args, **kwargs):
                 start_time = time.time()
                 try:
                     orig_func(self, *args, **kwargs)
-                except errors as error:
+                except Exception as error:
+                    if not (isinstance(error, errors) or
+                            self._cloud.is_cloud_exception(error)):
+                        raise
                     self._logger.warning(
                         "Client error: %s - waiting %s seconds",
                         error, self.retry_wait)
@@ -56,7 +62,7 @@ class ComputeNodeStateChangeBase(config.actor_class):
                                           self.max_retry_wait)
                 else:
                     self.retry_wait = self.min_retry_wait
-            return wrapper
+            return retry_wrapper
         return decorator
 
     def _finished(self):
@@ -86,9 +92,9 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
                  cloud_size, arvados_node=None,
                  retry_wait=1, max_retry_wait=180):
         super(ComputeNodeSetupActor, self).__init__(
-            'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
+            'arvnodeman.nodeup', cloud_client, timer_actor,
+            retry_wait, max_retry_wait)
         self._arvados = arvados_client
-        self._cloud = cloud_client
         self.cloud_size = cloud_size
         self.arvados_node = None
         self.cloud_node = None
@@ -97,12 +103,12 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         else:
             self._later.prepare_arvados_node(arvados_node)
 
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def create_arvados_node(self):
         self.arvados_node = self._arvados.nodes().create(body={}).execute()
         self._later.create_cloud_node()
 
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def prepare_arvados_node(self, node):
         self.arvados_node = self._arvados.nodes().update(
             uuid=node['uuid'],
@@ -116,13 +122,19 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
             ).execute()
         self._later.create_cloud_node()
 
-    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def create_cloud_node(self):
         self._logger.info("Creating cloud node with size %s.",
                           self.cloud_size.name)
         self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                   self.arvados_node)
         self._logger.info("Cloud node %s created.", self.cloud_node.id)
+        self._later.post_create()
+
+    @ComputeNodeStateChangeBase._retry()
+    def post_create(self):
+        self._cloud.post_create_node(self.cloud_node)
+        self._logger.info("%s post-create work done.", self.cloud_node.id)
         self._finished()
 
     def stop_if_no_cloud_node(self):
@@ -143,8 +155,8 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         # eligible.  Normal shutdowns based on job demand should be
         # cancellable; shutdowns based on node misbehavior should not.
         super(ComputeNodeShutdownActor, self).__init__(
-            'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
-        self._cloud = cloud_client
+            'arvnodeman.nodedown', cloud_client, timer_actor,
+            retry_wait, max_retry_wait)
         self._monitor = node_monitor.proxy()
         self.cloud_node = self._monitor.cloud_node.get()
         self.cancellable = cancellable
@@ -159,7 +171,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
 
     def _stop_if_window_closed(orig_func):
         @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
+        def stop_wrapper(self, *args, **kwargs):
             if (self.cancellable and
                   (not self._monitor.shutdown_eligible().get())):
                 self._logger.info(
@@ -169,10 +181,10 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
                 return None
             else:
                 return orig_func(self, *args, **kwargs)
-        return wrapper
+        return stop_wrapper
 
     @_stop_if_window_closed
-    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def shutdown_node(self):
         if self._cloud.destroy_node(self.cloud_node):
             self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
@@ -210,14 +222,14 @@ class ComputeNodeUpdateActor(config.actor_class):
 
     def _throttle_errors(orig_func):
         @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
+        def throttle_wrapper(self, *args, **kwargs):
             throttle_time = self.next_request_time - time.time()
             if throttle_time > 0:
                 time.sleep(throttle_time)
             self.next_request_time = time.time()
             try:
                 result = orig_func(self, *args, **kwargs)
-            except config.CLOUD_ERRORS:
+            except Exception as error:
                 self.error_streak += 1
                 self.next_request_time += min(2 ** self.error_streak,
                                               self.max_retry_wait)
@@ -225,7 +237,7 @@ class ComputeNodeUpdateActor(config.actor_class):
             else:
                 self.error_streak = 0
                 return result
-        return wrapper
+        return throttle_wrapper
 
     @_throttle_errors
     def sync_node(self, cloud_node, arvados_node):
index a20cfde37146a46f05438817aef14ac307565491..3a0c2063b426ccd759ad4dd8323847d51f72bce4 100644 (file)
@@ -2,6 +2,10 @@
 
 from __future__ import absolute_import, print_function
 
+import libcloud.common.types as cloud_types
+
+from ...config import NETWORK_ERRORS
+
 class BaseComputeNodeDriver(object):
     """Abstract base class for compute node drivers.
 
@@ -15,6 +19,8 @@ class BaseComputeNodeDriver(object):
     creation kwargs with information about the specific Arvados node
     record), sync_node, and node_start_time.
     """
+    CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError,)
+
     def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
         self.real = driver_class(**auth_kwargs)
         self.list_kwargs = list_kwargs
@@ -52,6 +58,12 @@ class BaseComputeNodeDriver(object):
         kwargs['size'] = size
         return self.real.create_node(**kwargs)
 
+    def post_create_node(self, cloud_node):
+        # ComputeNodeSetupActor calls this method after the cloud node is
+        # created.  Any setup tasks that need to happen afterward (e.g.,
+        # tagging) should be done in this method.
+        pass
+
     def sync_node(self, cloud_node, arvados_node):
         # When a compute node first pings the API server, the API server
         # will automatically assign some attributes on the corresponding
@@ -62,3 +74,11 @@ class BaseComputeNodeDriver(object):
     @classmethod
     def node_start_time(cls, node):
         raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
+
+    @classmethod
+    def is_cloud_exception(cls, exception):
+        # libcloud compute drivers typically raise bare Exceptions to
+        # represent API errors.  Return True for any exception that is
+        # exactly an Exception, or a better-known higher-level exception.
+        return (isinstance(exception, cls.CLOUD_ERRORS) or
+                getattr(exception, '__class__', None) is Exception)
index c0992f7b9635e2c47edd8e67cf5b887ba5692718..255a948a6c3aa0ee2c17ae1581685d282c341813 100644 (file)
@@ -79,8 +79,7 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
         return 'auth', key
 
     def arvados_create_kwargs(self, arvados_node):
-        result = {'ex_metadata': self.tags.copy(),
-                  'name': arvados_node_fqdn(arvados_node)}
+        result = {'name': arvados_node_fqdn(arvados_node)}
         ping_secret = arvados_node['info'].get('ping_secret')
         if ping_secret is not None:
             ping_url = ('https://{}/arvados/v1/nodes/{}/ping?ping_secret={}'.
@@ -89,11 +88,12 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
             result['ex_userdata'] = ping_url
         return result
 
+    def post_create_node(self, cloud_node):
+        self.real.ex_create_tags(cloud_node, self.tags)
+
     def sync_node(self, cloud_node, arvados_node):
-        metadata = self.arvados_create_kwargs(arvados_node)
-        tags = metadata['ex_metadata']
-        tags['Name'] = metadata['name']
-        self.real.ex_create_tags(cloud_node, tags)
+        self.real.ex_create_tags(cloud_node,
+                                 {'Name': arvados_node_fqdn(arvados_node)})
 
     @classmethod
     def node_start_time(cls, node):
index f018015717c150b0065212248f35bbfbf3d1d4a7..b7ec1fc80d9a0211867b7d06e5dd8ffb272f1ff4 100644 (file)
@@ -10,7 +10,6 @@ import sys
 
 import arvados
 import httplib2
-import libcloud.common.types as cloud_types
 import pykka
 from apiclient import errors as apierror
 
@@ -19,7 +18,6 @@ from apiclient import errors as apierror
 # it's low-level, but unlikely to catch code bugs.
 NETWORK_ERRORS = (IOError, ssl.SSLError)
 ARVADOS_ERRORS = NETWORK_ERRORS + (apierror.Error,)
-CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError,)
 
 actor_class = pykka.ThreadingActor
 
index d03d1450bbe67bfdb0184293010e987b14e2d6fc..0e480786da369d3f844e8d0111835c389c5d03b4 100644 (file)
@@ -97,7 +97,7 @@ class NodeManagerDaemonActor(actor_class):
     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
                  cloud_nodes_actor, cloud_update_actor, timer_actor,
                  arvados_factory, cloud_factory,
-                 shutdown_windows, min_nodes, max_nodes,
+                 shutdown_windows, min_size, min_nodes, max_nodes,
                  poll_stale_after=600,
                  boot_fail_after=1800,
                  node_stale_after=7200,
@@ -116,6 +116,7 @@ class NodeManagerDaemonActor(actor_class):
         self._logger = logging.getLogger('arvnodeman.daemon')
         self._later = self.actor_ref.proxy()
         self.shutdown_windows = shutdown_windows
+        self.min_cloud_size = min_size
         self.min_nodes = min_nodes
         self.max_nodes = max_nodes
         self.poll_stale_after = poll_stale_after
@@ -179,6 +180,7 @@ class NodeManagerDaemonActor(actor_class):
                     break
         for key, record in self.cloud_nodes.orphans.iteritems():
             record.actor.stop()
+            record.cloud_node = None
             self.shutdowns.pop(key, None)
 
     def update_arvados_nodes(self, nodelist):
@@ -206,9 +208,12 @@ class NodeManagerDaemonActor(actor_class):
 
     def _nodes_wanted(self):
         up_count = self._nodes_up()
+        under_min = self.min_nodes - up_count
         over_max = up_count - self.max_nodes
         if over_max >= 0:
             return -over_max
+        elif under_min > 0:
+            return under_min
         else:
             up_count -= len(self.shutdowns) + self._nodes_busy()
             return len(self.last_wishlist) - up_count
@@ -253,7 +258,10 @@ class NodeManagerDaemonActor(actor_class):
         if nodes_wanted < 1:
             return None
         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
-        cloud_size = self.last_wishlist[nodes_wanted - 1]
+        try:
+            cloud_size = self.last_wishlist[self._nodes_up()]
+        except IndexError:
+            cloud_size = self.min_cloud_size
         self._logger.info("Want %s more nodes.  Booting a %s node.",
                           nodes_wanted, cloud_size.name)
         new_setup = self._node_setup.start(
index 239934f52911782730840d3d1aa5042866345451..06f66b71c244f5662fb0c9871f7c92b0603f1617 100644 (file)
@@ -38,11 +38,10 @@ class ServerCalculator(object):
             return True
 
 
-    def __init__(self, server_list, min_nodes=0, max_nodes=None):
+    def __init__(self, server_list, max_nodes=None):
         self.cloud_sizes = [self.CloudSizeWrapper(s, **kws)
                             for s, kws in server_list]
         self.cloud_sizes.sort(key=lambda s: s.price)
-        self.min_nodes = min_nodes
         self.max_nodes = max_nodes or float('inf')
         self.logger = logging.getLogger('arvnodeman.jobqueue')
         self.logged_jobs = set()
@@ -79,15 +78,11 @@ class ServerCalculator(object):
             elif (want_count <= self.max_nodes):
                 servers.extend([cloud_size.real] * max(1, want_count))
         self.logged_jobs.intersection_update(seen_jobs)
-
-        # Make sure the server queue has at least enough entries to
-        # satisfy min_nodes.
-        node_shortfall = self.min_nodes - len(servers)
-        if node_shortfall > 0:
-            basic_node = self.cloud_size_for_constraints({})
-            servers.extend([basic_node.real] * node_shortfall)
         return servers
 
+    def cheapest_size(self):
+        return self.cloud_sizes[0]
+
 
 class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
     """Actor to generate server wishlists from the job queue.
index 5fa404fcbbdf4df95d435ab2a73afbc2abe7bc97..880158234da668ca212604252a070b8db30cddbd 100644 (file)
@@ -57,25 +57,22 @@ def setup_logging(path, level, **sublevels):
         sublogger = logging.getLogger(logger_name)
         sublogger.setLevel(sublevel)
 
-def launch_pollers(config):
-    cloud_client = config.new_cloud_client()
-    arvados_client = config.new_arvados_client()
-    cloud_size_list = config.node_sizes(cloud_client.list_sizes())
+def build_server_calculator(config):
+    cloud_size_list = config.node_sizes(config.new_cloud_client().list_sizes())
     if not cloud_size_list:
         abort("No valid node sizes configured")
+    return ServerCalculator(cloud_size_list,
+                            config.getint('Daemon', 'max_nodes'))
 
-    server_calculator = ServerCalculator(
-        cloud_size_list,
-        config.getint('Daemon', 'min_nodes'),
-        config.getint('Daemon', 'max_nodes'))
+def launch_pollers(config, server_calculator):
     poll_time = config.getint('Daemon', 'poll_time')
     max_poll_time = config.getint('Daemon', 'max_poll_time')
 
     timer = TimedCallBackActor.start(poll_time / 10.0).proxy()
     cloud_node_poller = CloudNodeListMonitorActor.start(
-        cloud_client, timer, poll_time, max_poll_time).proxy()
+        config.new_cloud_client(), timer, poll_time, max_poll_time).proxy()
     arvados_node_poller = ArvadosNodeListMonitorActor.start(
-        arvados_client, timer, poll_time, max_poll_time).proxy()
+        config.new_arvados_client(), timer, poll_time, max_poll_time).proxy()
     job_queue_poller = JobQueueMonitorActor.start(
         config.new_arvados_client(), timer, server_calculator,
         poll_time, max_poll_time).proxy()
@@ -108,14 +105,16 @@ def main(args=None):
     setup_logging(config.get('Logging', 'file'), **config.log_levels())
     node_setup, node_shutdown, node_update, node_monitor = \
         config.dispatch_classes()
+    server_calculator = build_server_calculator(config)
     timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
-        launch_pollers(config)
+        launch_pollers(config, server_calculator)
     cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
     node_daemon = NodeManagerDaemonActor.start(
         job_queue_poller, arvados_node_poller, cloud_node_poller,
         cloud_node_updater, timer,
         config.new_arvados_client, config.new_cloud_client,
         config.shutdown_windows(),
+        server_calculator.cheapest_size(),
         config.getint('Daemon', 'min_nodes'),
         config.getint('Daemon', 'max_nodes'),
         config.getint('Daemon', 'poll_stale_after'),
index 7ddfb7ca33e8b97f8132117c66789529415d8b90..83dd93f077bfb504a8f41b6f0addd93c717ac7da 100644 (file)
@@ -11,10 +11,11 @@ class ArvadosNodeListMonitorActor(clientactor.RemotePollLoopActor):
     This actor regularly polls the list of Arvados node records, and
     sends it to subscribers.
     """
-
-    CLIENT_ERRORS = config.ARVADOS_ERRORS
     LOGGER_NAME = 'arvnodeman.arvados_nodes'
 
+    def is_common_error(self, exception):
+        return isinstance(exception, config.ARVADOS_ERRORS)
+
     def _item_key(self, node):
         return node['uuid']
 
@@ -28,10 +29,11 @@ class CloudNodeListMonitorActor(clientactor.RemotePollLoopActor):
     This actor regularly polls the cloud to get a list of running compute
     nodes, and sends it to subscribers.
     """
-
-    CLIENT_ERRORS = config.CLOUD_ERRORS
     LOGGER_NAME = 'arvnodeman.cloud_nodes'
 
+    def is_common_error(self, exception):
+        return self._client.is_cloud_exception(exception)
+
     def _item_key(self, node):
         return node.id
 
index c86dcfdca21d314f20b5e7586cd2988389fc32ae..a1dfde30e1c90eeaeeb04e2542bd53866d4eff5f 100644 (file)
@@ -14,7 +14,7 @@ import arvnodeman.computenode.dispatch as dispatch
 from . import testutil
 
 class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
-    def make_mocks(self, arvados_effect=None, cloud_effect=None):
+    def make_mocks(self, arvados_effect=None):
         if arvados_effect is None:
             arvados_effect = [testutil.arvados_node_mock()]
         self.arvados_effect = arvados_effect
@@ -48,14 +48,33 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
         self.assertEqual(self.cloud_client.create_node(),
                          self.setup_actor.cloud_node.get(self.TIMEOUT))
 
-    def test_failed_calls_retried(self):
+    def test_failed_arvados_calls_retried(self):
         self.make_mocks([
                 arverror.ApiError(httplib2.Response({'status': '500'}), ""),
                 testutil.arvados_node_mock(),
                 ])
         self.make_actor()
+        self.wait_for_assignment(self.setup_actor, 'arvados_node')
+
+    def test_failed_cloud_calls_retried(self):
+        self.make_mocks()
+        self.cloud_client.create_node.side_effect = [
+            Exception("test cloud creation error"),
+            self.cloud_client.create_node.return_value,
+            ]
+        self.make_actor()
         self.wait_for_assignment(self.setup_actor, 'cloud_node')
 
+    def test_failed_post_create_retried(self):
+        self.make_mocks()
+        self.cloud_client.post_create_node.side_effect = [
+            Exception("test cloud post-create error"), None]
+        self.make_actor()
+        done = self.FUTURE_CLASS()
+        self.setup_actor.subscribe(done.set)
+        done.get(self.TIMEOUT)
+        self.assertEqual(2, self.cloud_client.post_create_node.call_count)
+
     def test_stop_when_no_cloud_node(self):
         self.make_mocks(
             arverror.ApiError(httplib2.Response({'status': '500'}), ""))
index fde103e10e606f68ca5e0b3ba262f0a350e6df64..fae63a5663d82035b43d82288de82ade2788f99b 100644 (file)
@@ -2,9 +2,11 @@
 
 from __future__ import absolute_import, print_function
 
+import ssl
 import time
 import unittest
 
+import libcloud.common.types as cloud_types
 import mock
 
 import arvnodeman.computenode.driver.ec2 as ec2
@@ -55,30 +57,41 @@ class EC2ComputeNodeDriverTestCase(unittest.TestCase):
                       create_method.call_args[1].get('ex_userdata',
                                                      'arg missing'))
 
-    def test_tags_created_from_arvados_node(self):
+    def test_hostname_from_arvados_node(self):
         arv_node = testutil.arvados_node_mock(8)
-        cloud_node = testutil.cloud_node_mock(8)
-        driver = self.new_driver(list_kwargs={'tag:list': 'test'})
-        self.assertEqual({'ex_metadata': {'list': 'test'},
-                          'name': 'compute8.zzzzz.arvadosapi.com'},
-                         driver.arvados_create_kwargs(arv_node))
+        driver = self.new_driver()
+        self.assertEqual('compute8.zzzzz.arvadosapi.com',
+                         driver.arvados_create_kwargs(arv_node)['name'])
 
-    def test_tags_set_default_hostname_from_new_arvados_node(self):
+    def test_default_hostname_from_new_arvados_node(self):
         arv_node = testutil.arvados_node_mock(hostname=None)
         driver = self.new_driver()
-        actual = driver.arvados_create_kwargs(arv_node)
         self.assertEqual('dynamic.compute.zzzzz.arvadosapi.com',
-                         actual['name'])
+                         driver.arvados_create_kwargs(arv_node)['name'])
+
+    def check_node_tagged(self, cloud_node, expected_tags):
+        tag_mock = self.driver_mock().ex_create_tags
+        self.assertTrue(tag_mock.called)
+        self.assertIs(cloud_node, tag_mock.call_args[0][0])
+        self.assertEqual(expected_tags, tag_mock.call_args[0][1])
+
+    def test_post_create_node_tags_from_list_kwargs(self):
+        expect_tags = {'key1': 'test value 1', 'key2': 'test value 2'}
+        list_kwargs = {('tag_' + key): value
+                       for key, value in expect_tags.iteritems()}
+        list_kwargs['instance-state-name'] = 'running'
+        cloud_node = testutil.cloud_node_mock()
+        driver = self.new_driver(list_kwargs=list_kwargs)
+        driver.post_create_node(cloud_node)
+        self.check_node_tagged(cloud_node, expect_tags)
 
     def test_sync_node(self):
         arv_node = testutil.arvados_node_mock(1)
         cloud_node = testutil.cloud_node_mock(2)
         driver = self.new_driver()
         driver.sync_node(cloud_node, arv_node)
-        tag_mock = self.driver_mock().ex_create_tags
-        self.assertTrue(tag_mock.called)
-        self.assertEqual('compute1.zzzzz.arvadosapi.com',
-                         tag_mock.call_args[0][1].get('Name', 'no name'))
+        self.check_node_tagged(cloud_node,
+                               {'Name': 'compute1.zzzzz.arvadosapi.com'})
 
     def test_node_create_time(self):
         refsecs = int(time.time())
@@ -87,3 +100,16 @@ class EC2ComputeNodeDriverTestCase(unittest.TestCase):
         node.extra = {'launch_time': time.strftime('%Y-%m-%dT%H:%M:%S.000Z',
                                                    reftuple)}
         self.assertEqual(refsecs, ec2.ComputeNodeDriver.node_start_time(node))
+
+    def test_cloud_exceptions(self):
+        for error in [Exception("test exception"),
+                      IOError("test exception"),
+                      ssl.SSLError("test exception"),
+                      cloud_types.LibcloudError("test exception")]:
+            self.assertTrue(ec2.ComputeNodeDriver.is_cloud_exception(error),
+                            "{} not flagged as cloud exception".format(error))
+
+    def test_noncloud_exceptions(self):
+        self.assertFalse(
+            ec2.ComputeNodeDriver.is_cloud_exception(ValueError("test error")),
+            "ValueError flagged as cloud exception")
index 0f146a162a9ea6d585265249527ad7a99a774f82..96fcde9524b910b56b45a2aea01bd3bdf06066cf 100644 (file)
@@ -14,8 +14,14 @@ from . import testutil
 
 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                      unittest.TestCase):
+    def new_setup_proxy(self):
+        # Make sure that every time the daemon starts a setup actor,
+        # it gets a new mock object back.
+        self.last_setup = mock.MagicMock(name='setup_proxy_mock')
+        return self.last_setup
+
     def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
-                    min_nodes=0, max_nodes=8):
+                    min_size=testutil.MockSize(1), min_nodes=0, max_nodes=8):
         for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
             setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
         self.arv_factory = mock.MagicMock(name='arvados_mock')
@@ -24,12 +30,14 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.cloud_updates = mock.MagicMock(name='updates_mock')
         self.timer = testutil.MockTimer(deliver_immediately=False)
         self.node_setup = mock.MagicMock(name='setup_mock')
+        self.node_setup.start().proxy.side_effect = self.new_setup_proxy
+        self.node_setup.reset_mock()
         self.node_shutdown = mock.MagicMock(name='shutdown_mock')
         self.daemon = nmdaemon.NodeManagerDaemonActor.start(
             self.server_wishlist_poller, self.arvados_nodes_poller,
             self.cloud_nodes_poller, self.cloud_updates, self.timer,
             self.arv_factory, self.cloud_factory,
-            [54, 5, 1], min_nodes, max_nodes, 600, 1800, 3600,
+            [54, 5, 1], min_size, min_nodes, max_nodes, 600, 1800, 3600,
             self.node_setup, self.node_shutdown).proxy()
         if cloud_nodes is not None:
             self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
@@ -56,15 +64,17 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.stop_proxy(self.daemon)
         self.assertTrue(self.node_setup.start.called)
 
+    def check_monitors_arvados_nodes(self, *arv_nodes):
+        pairings = [monitor.proxy().arvados_node
+                    for monitor in self.monitor_list() if monitor.is_alive()]
+        self.assertItemsEqual(arv_nodes, pykka.get_all(pairings, self.TIMEOUT))
+
     def test_node_pairing(self):
         cloud_node = testutil.cloud_node_mock(1)
         arv_node = testutil.arvados_node_mock(1)
         self.make_daemon([cloud_node], [arv_node])
         self.stop_proxy(self.daemon)
-        self.assertEqual(1, self.alive_monitor_count())
-        self.assertIs(
-            self.monitor_list()[0].proxy().arvados_node.get(self.TIMEOUT),
-            arv_node)
+        self.check_monitors_arvados_nodes(arv_node)
 
     def test_node_pairing_after_arvados_update(self):
         cloud_node = testutil.cloud_node_mock(2)
@@ -73,21 +83,23 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_node = testutil.arvados_node_mock(2)
         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
-        self.assertEqual(1, self.alive_monitor_count())
-        self.assertIs(
-            self.monitor_list()[0].proxy().arvados_node.get(self.TIMEOUT),
-            arv_node)
+        self.check_monitors_arvados_nodes(arv_node)
+
+    def test_arvados_node_un_and_re_paired(self):
+        arv_node = testutil.arvados_node_mock(3)
+        self.make_daemon([testutil.cloud_node_mock(3)], [arv_node])
+        self.check_monitors_arvados_nodes(arv_node)
+        self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
+        self.assertEqual(0, self.alive_monitor_count())
+        self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
+        self.stop_proxy(self.daemon)
+        self.check_monitors_arvados_nodes(arv_node)
 
     def test_old_arvados_node_not_double_assigned(self):
         arv_node = testutil.arvados_node_mock(3, age=9000)
         size = testutil.MockSize(3)
         self.make_daemon(arvados_nodes=[arv_node])
-        setup_ref = self.node_setup.start().proxy().actor_ref
-        setup_ref.actor_urn = 0
-        self.node_setup.start.reset_mock()
         self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
-        self.daemon.max_nodes.get(self.TIMEOUT)
-        setup_ref.actor_urn += 1
         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
         used_nodes = [call[1].get('arvados_node')
@@ -120,6 +132,26 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.stop_proxy(self.daemon)
         self.assertTrue(self.node_setup.start.called)
 
+    def test_boot_new_node_below_min_nodes(self):
+        min_size = testutil.MockSize(1)
+        wish_size = testutil.MockSize(3)
+        self.make_daemon([], [], None, min_size=min_size, min_nodes=2)
+        self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
+        self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
+        self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
+        self.stop_proxy(self.daemon)
+        self.assertEqual([wish_size, min_size],
+                         [call[1].get('cloud_size')
+                          for call in self.node_setup.start.call_args_list])
+
+    def test_no_new_node_when_ge_min_nodes_busy(self):
+        cloud_nodes = [testutil.cloud_node_mock(n) for n in range(1, 4)]
+        arv_nodes = [testutil.arvados_node_mock(n, job_uuid=True)
+                     for n in range(1, 4)]
+        self.make_daemon(cloud_nodes, arv_nodes, [], min_nodes=2)
+        self.stop_proxy(self.daemon)
+        self.assertEqual(0, self.node_setup.start.call_count)
+
     def test_no_new_node_when_max_nodes_busy(self):
         self.make_daemon([testutil.cloud_node_mock(3)],
                          [testutil.arvados_node_mock(3, job_uuid=True)],
@@ -128,14 +160,6 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.stop_proxy(self.daemon)
         self.assertFalse(self.node_setup.start.called)
 
-    def mock_setup_actor(self, cloud_node, arv_node):
-        setup = self.node_setup.start().proxy()
-        self.node_setup.reset_mock()
-        setup.actor_urn = cloud_node.id
-        setup.cloud_node.get.return_value = cloud_node
-        setup.arvados_node.get.return_value = arv_node
-        return setup
-
     def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
         if cloud_node is None:
             cloud_node = testutil.cloud_node_mock(id_num)
@@ -144,7 +168,9 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.make_daemon(want_sizes=[testutil.MockSize(id_num)])
         self.daemon.max_nodes.get(self.TIMEOUT)
         self.assertEqual(1, self.node_setup.start.call_count)
-        return self.mock_setup_actor(cloud_node, arv_node)
+        self.last_setup.cloud_node.get.return_value = cloud_node
+        self.last_setup.arvados_node.get.return_value = arv_node
+        return self.last_setup
 
     def test_no_duplication_when_booting_node_listed_fast(self):
         # Test that we don't start two ComputeNodeMonitorActors when
@@ -179,8 +205,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.update_server_wishlist(
             [testutil.MockSize(1)]).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
-        self.assertFalse(self.node_setup.start.called,
-                         "daemon did not count booted node toward wishlist")
+        self.assertEqual(1, self.node_setup.start.call_count)
 
     def test_booted_node_can_shutdown(self):
         setup = self.start_node_boot()
@@ -250,8 +275,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.make_daemon(want_sizes=[testutil.MockSize(1)])
         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
-        self.assertTrue(
-            self.node_setup.start().proxy().stop_if_no_cloud_node.called)
+        self.assertTrue(self.last_setup.stop_if_no_cloud_node.called)
 
     def test_shutdown_declined_at_wishlist_capacity(self):
         cloud_node = testutil.cloud_node_mock(1)
index ae5bf1e90872371671a784a44c8e40f62954487e..4c97aed8b109a576843105bad7cf7f62b14426ce 100644 (file)
@@ -48,29 +48,15 @@ class ServerCalculatorTestCase(unittest.TestCase):
                                   {'min_scratch_mb_per_node': 200})
         self.assertEqual(6, len(servlist))
 
-    def test_server_calc_min_nodes_0_jobs(self):
-        servcalc = self.make_calculator([1], min_nodes=3, max_nodes=9)
-        servlist = self.calculate(servcalc, {})
-        self.assertEqual(3, len(servlist))
-
-    def test_server_calc_min_nodes_1_job(self):
-        servcalc = self.make_calculator([1], min_nodes=3, max_nodes=9)
-        servlist = self.calculate(servcalc, {'min_nodes': 1})
-        self.assertEqual(3, len(servlist))
-
-    def test_server_calc_more_jobs_than_min_nodes(self):
-        servcalc = self.make_calculator([1], min_nodes=2, max_nodes=9)
-        servlist = self.calculate(servcalc,
-                                  {'min_nodes': 1},
-                                  {'min_nodes': 1},
-                                  {'min_nodes': 1})
-        self.assertEqual(3, len(servlist))
-
     def test_job_requesting_max_nodes_accepted(self):
         servcalc = self.make_calculator([1], max_nodes=4)
         servlist = self.calculate(servcalc, {'min_nodes': 4})
         self.assertEqual(4, len(servlist))
 
+    def test_cheapest_size(self):
+        servcalc = self.make_calculator([2, 4, 1, 3])
+        self.assertEqual(testutil.MockSize(1), servcalc.cheapest_size())
+
 
 class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
                                    unittest.TestCase):