Merge branch 'master' into 4024-pipeline-instances-scroll
author radhika <radhika@curoverse.com>
Fri, 14 Nov 2014 19:07:17 +0000 (14:07 -0500)
committer radhika <radhika@curoverse.com>
Fri, 14 Nov 2014 19:07:17 +0000 (14:07 -0500)
64 files changed:
apps/workbench/app/assets/javascripts/select_modal.js
apps/workbench/app/assets/javascripts/selection.js.erb
apps/workbench/app/controllers/application_controller.rb
apps/workbench/app/controllers/collections_controller.rb
apps/workbench/app/controllers/pipeline_instances_controller.rb
apps/workbench/app/controllers/users_controller.rb
apps/workbench/app/helpers/application_helper.rb
apps/workbench/app/models/arvados_api_client.rb
apps/workbench/app/models/arvados_base.rb
apps/workbench/app/models/arvados_resource_list.rb
apps/workbench/app/views/application/_choose.html.erb
apps/workbench/app/views/application/_paging.html.erb
apps/workbench/app/views/collections/_choose_rows.html.erb
apps/workbench/app/views/collections/_index_tbody.html.erb
apps/workbench/app/views/collections/_show_chooser_preview.html.erb
apps/workbench/app/views/collections/_show_files.html.erb
apps/workbench/app/views/collections/_show_recent.html.erb
apps/workbench/app/views/layouts/body.html.erb
apps/workbench/app/views/projects/_show_sharing.html.erb
apps/workbench/app/views/users/_show_admin.html.erb
apps/workbench/test/integration/pipeline_instances_test.rb
apps/workbench/test/unit/arvados_resource_list_test.rb
crunch_scripts/run-command
doc/api/schema/Job.html.textile.liquid
doc/sdk/cli/index.html.textile.liquid
doc/sdk/cli/subcommands.html.textile.liquid
sdk/cli/bin/arv
sdk/cli/bin/arv-tag
sdk/go/keepclient/keepclient.go
sdk/go/keepclient/keepclient_test.go
sdk/go/keepclient/root_sorter.go [new file with mode: 0644]
sdk/go/keepclient/root_sorter_test.go [new file with mode: 0644]
sdk/go/keepclient/support.go
sdk/python/arvados/keep.py
sdk/python/tests/run_test_server.py
sdk/python/tests/test_keep_client.py
services/api/app/models/job.rb
services/api/db/migrate/20141111133038_add_arvados_sdk_version_to_jobs.rb [new file with mode: 0644]
services/api/db/structure.sql
services/api/lib/load_param.rb
services/api/test/fixtures/keep_services.yml
services/api/test/fixtures/links.yml
services/api/test/fixtures/pipeline_templates.yml
services/api/test/fixtures/repositories.yml
services/api/test/functional/arvados/v1/commits_controller_test.rb
services/api/test/functional/arvados/v1/groups_controller_test.rb
services/api/test/functional/arvados/v1/repositories_controller_test.rb
services/api/test/helpers/git_test_helper.rb
services/api/test/integration/keep_proxy_test.rb
services/api/test/test.git.tar
services/api/test/unit/job_test.rb
services/keepproxy/keepproxy_test.go
services/nodemanager/arvnodeman/computenode/__init__.py
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py [new file with mode: 0644]
services/nodemanager/arvnodeman/computenode/driver/__init__.py [new file with mode: 0644]
services/nodemanager/arvnodeman/computenode/driver/dummy.py [moved from services/nodemanager/arvnodeman/computenode/dummy.py with 96% similarity]
services/nodemanager/arvnodeman/computenode/driver/ec2.py [moved from services/nodemanager/arvnodeman/computenode/ec2.py with 98% similarity]
services/nodemanager/arvnodeman/config.py
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/arvnodeman/launcher.py
services/nodemanager/tests/test_computenode.py
services/nodemanager/tests/test_computenode_dispatch.py [new file with mode: 0644]
services/nodemanager/tests/test_computenode_driver_ec2.py [moved from services/nodemanager/tests/test_computenode_ec2.py with 98% similarity]
services/nodemanager/tests/test_daemon.py

index ba106d111046309b83453210301ec2dfde736676..bd68bc8a2facf92e0769b3ccec9dc9cd384e8aba 100644 (file)
@@ -8,14 +8,19 @@ $(document).on('click', '.selectable', function() {
             removeClass('active');
     }
     $this.toggleClass('active');
-    any = ($container.
+
+    if (!$this.hasClass('use-preview-selection')) {
+      any = ($container.
            find('.selectable.active').length > 0)
-    $this.
+    }
+
+    if (!$container.hasClass('preview-selectable-container')) {
+      $this.
         closest('.modal').
         find('[data-enable-if-selection]').
         prop('disabled', !any);
 
-    if ($this.hasClass('active')) {
+      if ($this.hasClass('active')) {
         var no_preview_available = '<div class="spinner-h-center spinner-v-center"><center>(No preview available)</center></div>';
         if (!$this.attr('data-preview-href')) {
             $(".modal-dialog-preview-pane").html(no_preview_available);
@@ -30,6 +35,14 @@ $(document).on('click', '.selectable', function() {
             fail(function(data, status, jqxhr) {
                 $(".modal-dialog-preview-pane").html(no_preview_available);
             });
+      }
+    } else {
+      any = ($container.
+           find('.preview-selectable.active').length > 0)
+      $(this).
+          closest('.modal').
+          find('[data-enable-if-selection]').
+          prop('disabled', !any);
     }
 
 }).on('click', '.modal button[data-action-href]', function() {
@@ -40,10 +53,18 @@ $(document).on('click', '.selectable', function() {
     var action_data_from_params = $(this).data('action-data-from-params');
     var selection_param = action_data.selection_param;
     $modal.find('.modal-error').removeClass('hide').hide();
-    $modal.find('.selectable.active[data-object-uuid]').each(function() {
+
+    var $preview_selections = $modal.find('.preview-selectable.active');
+    if ($preview_selections.length > 0) {
+      data.push({name: selection_param, value: $preview_selections.first().attr('href')});
+    }
+
+    if (data.length == 0) {   // not using preview selection option
+      $modal.find('.selectable.active[data-object-uuid]').each(function() {
         var val = $(this).attr('data-object-uuid');
         data.push({name: selection_param, value: val});
-    });
+      });
+    }
     $.each($.extend({}, action_data, action_data_from_params),
            function(key, value) {
                if (value instanceof Array && key[-1] != ']') {
index ca4dce3d9bf373f2e93949cbfb27f818d78318ae..40724bb566b4f8903687edfc9e2e69bf331d4ff4 100644 (file)
@@ -108,7 +108,8 @@ jQuery(function($){
             }
         });
 
-    $(window).on('load storage', update_count);
+    $(window).on('load', clear_selections);
+    $(window).on('storage', update_count);
 
     $('#selection-form-content').on("click", function(e) {
         e.stopPropagation();
index 71a7c766c7670307f0ff4b749ce2737bb6a1603d..88ea4b0a6141b484a7f4a60404f17fa54b1cf979 100644 (file)
@@ -268,6 +268,7 @@ class ApplicationController < ActionController::Base
       if params[:partial]
         f.json {
           find_objects_for_index if !@objects
+          @objects.fetch_multiple_pages(false)
           render json: {
             content: render_to_string(partial: "choose_rows.html",
                                       formats: [:html]),
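
The fetch_multiple_pages(false) call above is what keeps each chooser response down to a single API page; the modal is then expected to pull later pages through its infinite-scroll attributes (see the _choose.html.erb change below). A minimal sketch of the intended semantics, using the ArvadosResourceList API introduced on this branch (the query itself is illustrative):

    # One server page only; paging metadata such as result_limit and
    # items_available is kept so the view can request the next page.
    objects = Collection.where({}).fetch_multiple_pages(false)
    objects.each { |c| puts c.uuid }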
index 5ddf93c2d35292555d538105c771e0ae1579b551..39f637e2274d5ac12037363c8051fd17b63d5d9e 100644 (file)
@@ -47,34 +47,6 @@ class CollectionsController < ApplicationController
     end
   end
 
-  def choose
-    # Find collections using default find_objects logic, then search for name
-    # links, and preload any other links connected to the collections that are
-    # found.
-    # Name links will be obsolete when issue #3036 is merged,
-    # at which point this entire custom #choose function can probably be
-    # eliminated.
-
-    params[:limit] ||= 40
-
-    find_objects_for_index
-    @collections = @objects
-
-    @filters += [['link_class','=','name'],
-                 ['head_uuid','is_a','arvados#collection']]
-
-    @objects = Link
-    find_objects_for_index
-
-    @name_links = @objects
-
-    @objects = Collection.
-      filter([['uuid','in',@name_links.collect(&:head_uuid)]])
-
-    preload_links_for_objects (@collections.to_a + @objects.to_a)
-    super
-  end
-
   def index
     # API server index doesn't return manifest_text by default, but our
     # callers want it unless otherwise specified.
@@ -82,7 +54,7 @@ class CollectionsController < ApplicationController
     base_search = Collection.select(@select)
     if params[:search].andand.length.andand > 0
       tags = Link.where(any: ['contains', params[:search]])
-      @collections = (base_search.where(uuid: tags.collect(&:head_uuid)) |
+      @objects = (base_search.where(uuid: tags.collect(&:head_uuid)) |
                       base_search.where(any: ['contains', params[:search]])).
         uniq { |c| c.uuid }
     else
@@ -98,12 +70,11 @@ class CollectionsController < ApplicationController
         offset = 0
       end
 
-      @collections = base_search.limit(limit).offset(offset)
+      @objects = base_search.limit(limit).offset(offset)
     end
-    @links = Link.limit(1000).
-      where(head_uuid: @collections.collect(&:uuid))
+    @links = Link.where(head_uuid: @objects.collect(&:uuid))
     @collection_info = {}
-    @collections.each do |c|
+    @objects.each do |c|
       @collection_info[c.uuid] = {
         tag_links: [],
         wanted: false,
@@ -209,7 +180,7 @@ class CollectionsController < ApplicationController
     return super if !@object
     if current_user
       if Keep::Locator.parse params["uuid"]
-        @same_pdh = Collection.filter([["portable_data_hash", "=", @object.portable_data_hash]]).limit(1000)
+        @same_pdh = Collection.filter([["portable_data_hash", "=", @object.portable_data_hash]])
         if @same_pdh.results.size == 1
           redirect_to collection_path(@same_pdh[0]["uuid"])
           return
index f5522ce360efea502a73f69de83a961f03eb6326..82a9b348bc1943f393855390e11441224778e034 100644 (file)
@@ -64,7 +64,9 @@ class PipelineInstancesController < ApplicationController
         if component[:script_parameters]
           component[:script_parameters].each do |param, value_info|
             if value_info.is_a? Hash
-              value_info_class = resource_class_for_uuid(value_info[:value])
+              value_info_partitioned = value_info[:value].partition('/') if value_info[:value].is_a? String
+              value_info_value = value_info_partitioned ? value_info_partitioned[0] : value_info[:value]
+              value_info_class = resource_class_for_uuid value_info_value
               if value_info_class == Link
                 # Use the link target, not the link itself, as script
                 # parameter; but keep the link info around as well.
@@ -81,10 +83,15 @@ class PipelineInstancesController < ApplicationController
                 # to ensure reproducibility, the script_parameter for a
                 # collection should be the portable_data_hash
                 # keep the collection name and uuid for human-readability
-                obj = Collection.find value_info[:value]
-                value_info[:value] = obj.portable_data_hash
+                obj = Collection.find value_info_value
+                if value_info_partitioned
+                  value_info[:value] = obj.portable_data_hash + value_info_partitioned[1] + value_info_partitioned[2]
+                  value_info[:selection_name] = obj.name + value_info_partitioned[1] + value_info_partitioned[2]
+                else
+                  value_info[:value] = obj.portable_data_hash
+                  value_info[:selection_name] = obj.name
+                end
                 value_info[:selection_uuid] = obj.uuid
-                value_info[:selection_name] = obj.name
               end
             end
           end
index 86e982368c04425a98ea3d9537bfa10507af3b9b..43a88955f042301b673fe6c4e067039a9ee142ff 100644 (file)
@@ -35,7 +35,7 @@ class UsersController < ApplicationController
 
   def activity
     @breadcrumb_page_name = nil
-    @users = User.limit(params[:limit] || 1000).all
+    @users = User.limit(params[:limit])
     @user_activity = {}
     @activity = {
       logins: {},
@@ -88,7 +88,7 @@ class UsersController < ApplicationController
 
   def storage
     @breadcrumb_page_name = nil
-    @users = User.limit(params[:limit] || 1000).all
+    @users = User.limit(params[:limit])
     @user_storage = {}
     total_storage = {}
     @log_date = {}
@@ -159,7 +159,7 @@ class UsersController < ApplicationController
       @persist_state[uuid] = 'cache'
     end
 
-    Link.limit(1000).filter([['head_uuid', 'in', collection_uuids],
+    Link.filter([['head_uuid', 'in', collection_uuids],
                              ['link_class', 'in', ['tag', 'resources']]]).
       each do |link|
       case link.link_class
index cfe7f19187a064bb19759950778fd72a890854ae..c8c4ccc1a96a89facdbc580ab7f9c40eb7f80505 100644 (file)
@@ -261,7 +261,7 @@ module ApplicationHelper
       dn += '[value]'
     end
 
-    if dataclass == Collection
+    if (dataclass == Collection) or (dataclass == File)
       selection_param = object.class.to_s.underscore + dn
       display_value = attrvalue
       if value_info.is_a?(Hash)
@@ -274,9 +274,9 @@ module ApplicationHelper
         end
       end
       if (attr == :components) and (subattr.size > 2)
-        chooser_title = "Choose a dataset for #{object.component_input_title(subattr[0], subattr[2])}:"
+        chooser_title = "Choose a #{dataclass == Collection ? 'dataset' : 'file'} for #{object.component_input_title(subattr[0], subattr[2])}:"
       else
-        chooser_title = "Choose a dataset:"
+        chooser_title = "Choose a #{dataclass == Collection ? 'dataset' : 'file'}:"
       end
       modal_path = choose_collections_path \
       ({ title: chooser_title,
@@ -287,6 +287,7 @@ module ApplicationHelper
          preconfigured_search_str: (preconfigured_search_str || ""),
          action_data: {
            merge: true,
+           use_preview_selection: dataclass == File ? true : nil,
            selection_param: selection_param,
            success: 'page-refresh'
          }.to_json,
@@ -448,10 +449,10 @@ module ApplicationHelper
     end
   end
 
-  def chooser_preview_url_for object
+  def chooser_preview_url_for object, use_preview_selection=false
     case object.class.to_s
     when 'Collection'
-      polymorphic_path(object, tab_pane: 'chooser_preview')
+      polymorphic_path(object, tab_pane: 'chooser_preview', use_preview_selection: use_preview_selection)
     else
       nil
     end
index 7076799f8369e64b55e4d3b33d3c404787a08ad5..0a99d662cbeaeca246379cc967c9cba6e9ec3840 100644 (file)
@@ -78,7 +78,8 @@ class ArvadosApiClient
     @client_mtx = Mutex.new
   end
 
-  def api(resources_kind, action, data=nil)
+  def api(resources_kind, action, data=nil, tokens={})
+
     profile_checkpoint
 
     if not @api_client
@@ -100,8 +101,8 @@ class ArvadosApiClient
     url.sub! '/arvados/v1/../../', '/'
 
     query = {
-      'api_token' => Thread.current[:arvados_api_token] || '',
-      'reader_tokens' => (Thread.current[:reader_tokens] || []).to_json,
+      'api_token' => tokens[:arvados_api_token] || Thread.current[:arvados_api_token] || '',
+      'reader_tokens' => (tokens[:reader_tokens] || Thread.current[:reader_tokens] || []).to_json,
     }
     if !data.nil?
       data.each do |k,v|
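
The new optional tokens argument lets a caller pin a request to specific credentials instead of relying on Thread.current; ArvadosResourceList (below) captures the thread-local tokens when it is constructed and passes them back through this parameter on every page fetch. A hedged sketch of the two call styles (token values are placeholders):

    # Default: fall back to Thread.current[:arvados_api_token].
    arvados_api_client.api(Collection, '', {_method: 'GET'})

    # Explicit: use tokens captured earlier, e.g. when the request may
    # run on a different thread than the one that authenticated.
    arvados_api_client.api(Collection, '', {_method: 'GET'},
                           arvados_api_token: 'placeholder-token',
                           reader_tokens: [])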
index f5be0e1edcba20ddfb1f80f4ece1912eac0d5dfd..bc5a9a37ddc351754feaef68c8d01916f6ca052c 100644 (file)
@@ -139,8 +139,8 @@ class ArvadosBase < ActiveRecord::Base
     ArvadosResourceList.new(self).eager(*args)
   end
 
-  def self.all(*args)
-    ArvadosResourceList.new(self).all(*args)
+  def self.all
+    ArvadosResourceList.new(self)
   end
 
   def self.permit_attribute_params raw_params
index 1a3c6b7e3c8e55036306743843f31839489c4da3..3000aa8ac264a1e18523e810a594689a8ec607f5 100644 (file)
@@ -4,6 +4,9 @@ class ArvadosResourceList
 
   def initialize resource_class=nil
     @resource_class = resource_class
+    @fetch_multiple_pages = true
+    @arvados_api_token = Thread.current[:arvados_api_token]
+    @reader_tokens = Thread.current[:reader_tokens]
   end
 
   def eager(bool=true)
@@ -12,6 +15,9 @@ class ArvadosResourceList
   end
 
   def limit(max_results)
+    if not max_results.nil? and not max_results.is_a? Integer
+      raise ArgumentError, "argument to limit() must be an Integer or nil"
+    end
     @limit = max_results
     self
   end
@@ -44,17 +50,17 @@ class ArvadosResourceList
   end
 
   def where(cond)
-    cond = cond.dup
-    cond.keys.each do |uuid_key|
-      if cond[uuid_key] and (cond[uuid_key].is_a? Array or
-                             cond[uuid_key].is_a? ArvadosBase)
+    @cond = cond.dup
+    @cond.keys.each do |uuid_key|
+      if @cond[uuid_key] and (@cond[uuid_key].is_a? Array or
+                             @cond[uuid_key].is_a? ArvadosBase)
         # Coerce cond[uuid_key] to an array of uuid strings.  This
         # allows caller the convenience of passing an array of real
         # objects and uuids in cond[uuid_key].
-        if !cond[uuid_key].is_a? Array
-          cond[uuid_key] = [cond[uuid_key]]
+        if !@cond[uuid_key].is_a? Array
+          @cond[uuid_key] = [@cond[uuid_key]]
         end
-        cond[uuid_key] = cond[uuid_key].collect do |item|
+        @cond[uuid_key] = @cond[uuid_key].collect do |item|
           if item.is_a? ArvadosBase
             item.uuid
           else
@@ -63,52 +69,54 @@ class ArvadosResourceList
         end
       end
     end
-    cond.keys.select { |x| x.match /_kind$/ }.each do |kind_key|
-      if cond[kind_key].is_a? Class
-        cond = cond.merge({ kind_key => 'arvados#' + arvados_api_client.class_kind(cond[kind_key]) })
+    @cond.keys.select { |x| x.match /_kind$/ }.each do |kind_key|
+      if @cond[kind_key].is_a? Class
+        @cond = @cond.merge({ kind_key => 'arvados#' + arvados_api_client.class_kind(@cond[kind_key]) })
       end
     end
-    api_params = {
-      _method: 'GET',
-      where: cond
-    }
-    api_params[:eager] = '1' if @eager
-    api_params[:limit] = @limit if @limit
-    api_params[:offset] = @offset if @offset
-    api_params[:select] = @select if @select
-    api_params[:order] = @orderby_spec if @orderby_spec
-    api_params[:filters] = @filters if @filters
-    res = arvados_api_client.api @resource_class, '', api_params
-    @results = arvados_api_client.unpack_api_response res
+    self
+  end
+
+  def fetch_multiple_pages(f)
+    @fetch_multiple_pages = f
     self
   end
 
   def results
-    self.where({}) if !@results
+    if !@results
+      @results = []
+      self.each_page do |r|
+        @results.concat r
+      end
+    end
     @results
   end
 
   def results=(r)
     @results = r
+    @items_available = r.items_available if r.respond_to? :items_available
+    @result_limit = r.limit if r.respond_to? :limit
+    @result_offset = r.offset if r.respond_to? :offset
+    @results
   end
 
-  def all
-    where({})
+  def to_ary
+    results
   end
 
   def each(&block)
-    results.each do |m|
-      block.call m
+    if not @results.nil?
+      @results.each &block
+    else
+      self.each_page do |items|
+        items.each do |i|
+          block.call i
+        end
+      end
     end
     self
   end
 
-  def collect
-    results.collect do |m|
-      yield m
-    end
-  end
-
   def first
     results.first
   end
@@ -129,62 +137,77 @@ class ArvadosResourceList
     end
   end
 
-  def to_ary
-    results
-  end
-
   def to_hash
-    Hash[results.collect { |x| [x.uuid, x] }]
+    Hash[self.collect { |x| [x.uuid, x] }]
   end
 
   def empty?
-    results.empty?
+    self.first.nil?
   end
 
   def items_available
-    results.items_available if results.respond_to? :items_available
+    @items_available
   end
 
   def result_limit
-    results.limit if results.respond_to? :limit
+    @result_limit
   end
 
   def result_offset
-    results.offset if results.respond_to? :offset
+    @result_offset
   end
 
-  def result_links
-    results.links if results.respond_to? :links
+  # Obsolete method retained during api transition.
+  def links_for item_or_uuid, link_class=false
+    []
   end
 
-  # Return links provided with API response that point to the
-  # specified object, and have the specified link_class. If link_class
-  # is false or omitted, return all links pointing to the specified
-  # object.
-  def links_for item_or_uuid, link_class=false
-    return [] if !result_links
-    unless @links_for_uuid
-      @links_for_uuid = {}
-      result_links.each do |link|
-        if link.respond_to? :head_uuid
-          @links_for_uuid[link.head_uuid] ||= []
-          @links_for_uuid[link.head_uuid] << link
-        end
+  protected
+
+  def each_page
+    api_params = {
+      _method: 'GET'
+    }
+    api_params[:where] = @cond if @cond
+    api_params[:eager] = '1' if @eager
+    api_params[:select] = @select if @select
+    api_params[:order] = @orderby_spec if @orderby_spec
+    api_params[:filters] = @filters if @filters
+
+
+    item_count = 0
+    offset = @offset || 0
+    @result_limit = nil
+    @result_offset = nil
+
+    begin
+      api_params[:offset] = offset
+      api_params[:limit] = (@limit - item_count) if @limit
+
+      res = arvados_api_client.api(@resource_class, '', api_params,
+                                   arvados_api_token: @arvados_api_token,
+                                   reader_tokens: @reader_tokens)
+      items = arvados_api_client.unpack_api_response res
+
+      break if items.nil? or not items.any?
+
+      @items_available = items.items_available if items.respond_to?(:items_available)
+      @result_limit = items.limit if (@fetch_multiple_pages == false) and items.respond_to?(:limit)
+      @result_offset = items.offset if (@fetch_multiple_pages == false) and items.respond_to?(:offset)
+
+      item_count += items.size
+      if items.respond_to?(:offset)
+        offset = items.offset + items.size
+      else
+        offset = item_count
       end
-    end
-    if item_or_uuid.respond_to? :uuid
-      uuid = item_or_uuid.uuid
-    else
-      uuid = item_or_uuid
-    end
-    (@links_for_uuid[uuid] || []).select do |link|
-      link_class == false or link.link_class == link_class
-    end
-  end
 
-  # Note: this arbitrarily chooses one of (possibly) multiple names.
-  def name_for item_or_uuid
-    links_for(item_or_uuid, 'name').first.andand.name
+      yield items
+
+      break if @limit and item_count >= @limit
+      break if items.respond_to? :items_available and offset >= items.items_available
+    end while @fetch_multiple_pages
+    self
   end
 
 end
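
All fetching now funnels through each_page, which issues successive GET requests, advancing offset by the number of items received, until the requested limit or the server's items_available is reached; with fetch_multiple_pages(false) it stops after the first response. A hedged sketch of the resulting call patterns (the owner UUID is the fixture used by the tests below; everything else is illustrative):

    # Lazy iteration: each() streams results page by page without
    # materializing the whole list first.
    Collection.where(owner_uuid: 'zzzzz-j7d0g-0201collections').limit(51).each do |c|
      puts c.uuid
    end

    # Materialized: results() concatenates every page into one array.
    all = Collection.where(owner_uuid: 'zzzzz-j7d0g-0201collections').results

    # Single page, with the server's paging metadata preserved:
    page = Collection.where(owner_uuid: 'zzzzz-j7d0g-0201collections').fetch_multiple_pages(false)
    page.result_limit     # limit the server actually applied
    page.items_available  # total matches reported by the server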
index fc161397d2b845353c1c8dbca3f8cd9aae6a936b..d36f5f95d2ccff0aede8bfc878ae475a60a86f30 100644 (file)
                <% if project_filters.any? %>
                  data-infinite-content-params-from-project-dropdown="<%= {filters: project_filters, project_uuid: project_filters.last.last}.to_json %>"
                <% end %>
-               data-infinite-content-href="<%= url_for partial: true %>">
+               <%
+                  action_data = JSON.parse params['action_data'] if params['action_data']
+                  use_preview_sel = action_data ? action_data['use_preview_selection'] : false
+                %>
+               data-infinite-content-href="<%= url_for partial: true,
+                                                       use_preview_selection: use_preview_sel %>">
           </div>
           <% if preview_pane %>
             <div class="col-md-6 hidden-xs hidden-sm modal-dialog-preview-pane" style="height: 100%; overflow-y: scroll">
index df9d08d778c997fb641600eecb70c87fddb59ed1..9c64c2b30418e530dbeb7aaca85cf303bc17cb06 100644 (file)
@@ -11,25 +11,27 @@ min-width: 1.2em;
 }
 <% end %>
 
+<% results.fetch_multiple_pages(false) %>
+
 <% if results.respond_to? :result_offset and
        results.respond_to? :result_limit and
        results.respond_to? :items_available and
        results.result_offset != nil and
        results.result_limit != nil and
-       results.items_available != nil 
+       results.items_available != nil
 %>
 <div class="index-paging">
-  Displaying <%= results.result_offset+1 %> &ndash; 
-  <%= if results.result_offset + results.result_limit > results.items_available 
-        results.items_available 
-      else 
-        results.result_offset + results.result_limit 
+  Displaying <%= results.result_offset+1 %> &ndash;
+  <%= if results.result_offset + results.result_limit > results.items_available
+        results.items_available
+      else
+        results.result_offset + results.result_limit
       end %>
  out of <%= results.items_available %>
 </div>
 
 <% if not (results.result_offset == 0 and results.items_available <= results.result_limit) %>
-  
+
 <div class="index-paging">
 
 <% if results.result_offset > 0 %>
@@ -54,7 +56,7 @@ min-width: 1.2em;
 <% if results.result_offset > 0 %>
   <%= link_to raw("<span class='glyphicon glyphicon-fast-backward'></span>"), {:id => object, :offset => 0, :limit => results.result_limit}  %>
 <% else %>
-  <span class='glyphicon glyphicon-fast-backward text-muted'></span>  
+  <span class='glyphicon glyphicon-fast-backward text-muted'></span>
 <% end %>
 
 <% if prev_offset %>
@@ -111,10 +113,10 @@ min-width: 1.2em;
 <% end %>
 
 <% if (results.items_available - results.result_offset) >= results.result_limit %>
-  <%= link_to raw("<span class='glyphicon glyphicon-fast-forward'></span>"), {:id => @object, :offset => results.items_available - (results.items_available % results.result_limit), 
+  <%= link_to raw("<span class='glyphicon glyphicon-fast-forward'></span>"), {:id => @object, :offset => results.items_available - (results.items_available % results.result_limit),
         :limit => results.result_limit}  %>
 <% else %>
-  <span class='glyphicon glyphicon-fast-forward text-muted'></span>  
+  <span class='glyphicon glyphicon-fast-forward text-muted'></span>
 <% end %>
 
 </span>
index da0f9759c1beae0c6f3c71786bdc438a17315fdb..17274ddb94f31150992e98ecc0114a86e31f6631 100644 (file)
@@ -1,6 +1,6 @@
-<% @collections.each do |object| %>
-    <div class="row filterable selectable" data-object-uuid="<%= object.uuid %>"
-         data-preview-href="<%= chooser_preview_url_for object %>"
+<% @objects.each do |object| %>
+    <div class="row filterable selectable <%= 'use-preview-selection' if params['use_preview_selection']%>" data-object-uuid="<%= object.uuid %>"
+         data-preview-href="<%= chooser_preview_url_for object, params['use_preview_selection'] %>"
          style="margin-left: 1em; border-bottom-style: solid; border-bottom-width: 1px; border-bottom-color: #DDDDDD">
       <i class="fa fa-fw fa-archive"></i>
       <% if object.respond_to? :name %>
       <% end %>
     </div>
 <% end %>
-
-<% @name_links.each do |name_link| %>
-  <% if (object = get_object(name_link.head_uuid)) %>
-    <div class="row filterable selectable" data-object-uuid="<%= name_link.uuid %>"
-         data-preview-href="<%= chooser_preview_url_for object %>"
-         style="margin-left: 1em; border-bottom-style: solid; border-bottom-width: 1px; border-bottom-color: #DDDDDD">
-      <i class="fa fa-fw fa-archive"></i>
-      <%= name_link.name %>
-      <% links_for_object(object).each do |tag| %>
-        <% if tag.link_class == 'tag' %>
-          <span class="label label-info"><%= tag.name %></span>
-        <% end %>
-      <% end %>
-    </div>
-  <% end %>
-<% end %>
index 3d5c1c7a5899213fafb6bec3ab2909b18ac77e5f..25d8796b41889ac913953d75dfeb22a710d0f70f 100644 (file)
@@ -1,4 +1,4 @@
-<% @collections.each do |c| %>
+<% @objects.each do |c| %>
 
 <tr class="collection" data-object-uuid="<%= c.uuid %>">
   <td>
index 4a0a8377cd9f20e658bca395b207a346356cc63b..cb91f8576f511626684be3015bf15f43a9dc75e2 100644 (file)
@@ -1,2 +1,2 @@
 <%= render partial: "show_source_summary" %>
-<%= render partial: "show_files", locals: {no_checkboxes: true} %>
+<%= render partial: "show_files", locals: {no_checkboxes: true, use_preview_selection: params['use_preview_selection']} %>
index 5b6dac81ed72846e510d1515051ca7e3e287dc74..76d8731a9353e4ae55ae68734d176258e8f2b9ed 100644 (file)
@@ -19,7 +19,18 @@ function unselect_all_files() {
 }
 </script>
 
-<div class="selection-action-container" style="padding-left: 1em">
+<%
+  preview_selectable_container = ''
+  preview_selectable = ''
+  padding_left = '1em'
+  if !params['use_preview_selection'].nil? and params['use_preview_selection'] == 'true'
+    preview_selectable_container = 'preview-selectable-container selectable-container'
+    preview_selectable = 'preview-selectable selectable'
+    padding_left = '0em'
+  end
+%>
+
+<div class="selection-action-container" style="padding-left: <%=padding_left%>">
   <% if !defined? no_checkboxes or !no_checkboxes %>
   <div class="row">
     <div class="pull-left">
@@ -53,7 +64,7 @@ function unselect_all_files() {
 <% if file_tree.nil? or file_tree.empty? %>
   <p>This collection is empty.</p>
 <% else %>
-  <ul id="collection_files" class="collection_files">
+  <ul id="collection_files" class="collection_files <%=preview_selectable_container%>">
   <% dirstack = [file_tree.first.first] %>
   <% file_tree.take(10000).each_with_index do |(dirname, filename, size), index| %>
     <% file_path = CollectionsHelper::file_path([dirname, filename]) %>
@@ -64,13 +75,13 @@ function unselect_all_files() {
     <% if size.nil?  # This is a subdirectory. %>
       <% dirstack.push(File.join(dirname, filename)) %>
       <div class="collection_files_row">
-       <div class="collection_files_name"><i class="fa fa-fw fa-folder-open"></i> <%= filename %></div>
+       <div class="collection_files_name><i class="fa fa-fw fa-folder-open"></i> <%= filename %></div>
       </div>
       <ul class="collection_files">
     <% else %>
       <% link_params = {controller: 'collections', action: 'show_file',
                         uuid: @object.portable_data_hash, file: file_path, size: size} %>
-       <div class="collection_files_row filterable">
+       <div class="collection_files_row filterable <%=preview_selectable%>" href="<%=@object.uuid%>/<%=file_path%>">
         <div class="collection_files_buttons pull-right">
           <%= raw(human_readable_bytes_html(size)) %>
           <% disable_search = (Rails.configuration.filename_suffixes_with_view_icon.include? file_path.split('.')[-1]) ? false : true %>
@@ -85,7 +96,7 @@ function unselect_all_files() {
         <div class="collection_files_name">
           <% if !defined? no_checkboxes or !no_checkboxes %>
           <%= check_box_tag 'uuids[]', "#{@object.uuid}/#{file_path}", false, {
-                :class => 'persistent-selection',
+                :class => "persistent-selection",
                 :friendly_type => "File",
                 :friendly_name => "#{@object.uuid}/#{file_path}",
                 :href => url_for(controller: 'collections', action: 'show_file',
@@ -103,7 +114,7 @@ function unselect_all_files() {
                       {title: file_path}) %>
         </div>
       <% else %>
-          <i class="fa fa-fw fa-file"></i> <%= filename %></div>
+          <i class="fa fa-fw fa-file" href="<%=@object.uuid%>/<%=file_path%>" ></i> <%= filename %></div>
        </div>
       <% end %>
       </li>
index c958b290e8600c8b789b49ad21d06186873bd078..6ebb3b2a28e67632b134959e612be7fa8c319637 100644 (file)
@@ -17,7 +17,7 @@
   </div>
   <p/>
 
-<%= render partial: "paging", locals: {results: @collections, object: @object} %>
+<%= render partial: "paging", locals: {results: @objects, object: @object} %>
 
 <div style="padding-right: 1em">
 
@@ -49,7 +49,7 @@
 
 </div>
 
-<%= render partial: "paging", locals: {results: @collections, object: @object} %>
+<%= render partial: "paging", locals: {results: @objects, object: @object} %>
 
 <% content_for :footer_js do %>
 $(document).on('click', 'form[data-remote] input[type=submit]', function() {
index 3fc0c92fcd8b1ef8cc400c979cf9d97f997d394b..9e966e148a9de3e54254259a7d1e911a35aaaaf7 100644 (file)
               </ul>
             </li>
 
-            <li class="dropdown selection-menu">
-              <a href="#" class="dropdown-toggle" data-toggle="dropdown">
-                <span class="fa fa-lg fa-paperclip"></span>
-                <span class="badge" id="persistent-selection-count"></span>
-              </a>
-              <ul class="dropdown-menu" role="menu" id="persistent-selection-list">
-                <%= form_tag '/actions' do %>
-                  <%= hidden_field_tag 'uuid', @object.andand.uuid %>
-                  <div id="selection-form-content"></div>
-                <% end %>
-              </ul>
-            </li>
-
             <% if current_user.is_admin %>
               <li class="dropdown">
                 <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="system-menu">
index 95a7ee100dacf5f495d2ac4c12d86ac795ad99fd..cc862c425c168e5ab61e4a12827bed2df517adbc 100644 (file)
@@ -2,7 +2,7 @@
    uuid_map = {}
    if @share_links
      [User, Group].each do |type|
-       type.limit(10000)
+       type
          .filter([['uuid','in',@share_links.collect(&:tail_uuid)]])
          .each do |o|
          uuid_map[o.uuid] = o
@@ -40,7 +40,6 @@
       by_project: false,
       preview_pane: false,
       multiple: true,
-      limit: 10000,
       filters: choose_filters[share_class].to_json,
       action_method: 'post',
       action_href: share_with_project_path,
index 4c76ede5151c1fc67387aa752233962a57d25efa..262dfa06876cf72cccab68317cd92d8779c340a7 100644 (file)
@@ -47,7 +47,7 @@
         </div>
         <form>
           <% permitted_group_perms = {}
-             Link.limit(10000).filter([
+             Link.filter([
              ['tail_uuid', '=', @object.uuid],
              ['head_uuid', 'is_a', 'arvados#group'],
              ['link_class', '=', 'permission'],
index cdaa3f1f3806c4be51755d8042e97e4651d86704..4aea40858f2a439098b3f9d05eac984d85571d62 100644 (file)
@@ -29,13 +29,6 @@ class PipelineInstancesTest < ActionDispatch::IntegrationTest
 
     instance_page = current_path
 
-    # Go over to the collections page and select something
-    visit '/collections'
-    within('tr', text: 'GNU_General_Public_License') do
-      find('input[type=checkbox]').click
-    end
-    find('#persistent-selection-count').click
-
     # Add this collection to the project
     visit '/projects'
     find("#projects-menu").click
@@ -106,13 +99,6 @@ class PipelineInstancesTest < ActionDispatch::IntegrationTest
   test 'Create pipeline inside a project and run' do
     visit page_with_token('active_trustedclient')
 
-    # Go over to the collections page and select something
-    visit '/collections'
-    within('tr', text: 'GNU_General_Public_License') do
-      find('input[type=checkbox]').click
-    end
-    find('#persistent-selection-count').click
-
     # Add this collection to the project using collections menu from top nav
     visit '/projects'
     find("#projects-menu").click
@@ -127,13 +113,13 @@ class PipelineInstancesTest < ActionDispatch::IntegrationTest
       wait_for_ajax
     end
 
-    create_and_run_pipeline_in_aproject true
+    create_and_run_pipeline_in_aproject true, 'Two Part Pipeline Template', false
   end
 
   # Create a pipeline instance from outside of a project
   test 'Run a pipeline from dashboard' do
     visit page_with_token('active_trustedclient')
-    create_and_run_pipeline_in_aproject false
+    create_and_run_pipeline_in_aproject false, 'Two Part Pipeline Template', false
   end
 
   test 'view pipeline with job and see graph' do
@@ -215,25 +201,31 @@ class PipelineInstancesTest < ActionDispatch::IntegrationTest
   end
 
   [
-    ['active', false, false, false],
-    ['active', false, false, true],
-    ['active', true, false, false],
-    ['active', true, true, false],
-    ['active', true, false, true],
-    ['active', true, true, true],
-    ['project_viewer', false, false, true],
-    ['project_viewer', true, false, true],
-    ['project_viewer', true, true, true],
-  ].each do |user, with_options, choose_options, in_aproject|
-    test "Rerun pipeline instance as #{user} using options #{with_options} #{choose_options} in #{in_aproject}" do
+    ['active', false, false, false, 'Two Part Pipeline Template', false],
+    ['active', false, false, true, 'Two Part Pipeline Template', false],
+    ['active', true, false, false, 'Two Part Pipeline Template', false],
+    ['active', true, true, false, 'Two Part Pipeline Template', false],
+    ['active', true, false, true, 'Two Part Pipeline Template', false],
+    ['active', true, true, true, 'Two Part Pipeline Template', false],
+    ['project_viewer', false, false, true, 'Two Part Pipeline Template', false],
+    ['project_viewer', true, false, true, 'Two Part Pipeline Template', false],
+    ['project_viewer', true, true, true, 'Two Part Pipeline Template', false],
+    ['active', false, false, false, 'Two Part Template with dataclass File', true],
+    ['active', false, false, true, 'Two Part Template with dataclass File', true],
+  ].each do |user, with_options, choose_options, in_aproject, template_name, choose_file|
+    test "Rerun pipeline instance as #{user} using options #{with_options} #{choose_options}
+          in #{in_aproject} with #{template_name} with file #{choose_file}" do
       visit page_with_token('active')
 
+      # need bigger modal size when choosing a file from collection
+      Capybara.current_session.driver.browser.manage.window.resize_to(1024, 768)
+
       if in_aproject
         find("#projects-menu").click
         find('.dropdown-menu a,button', text: 'A Project').click
       end
 
-      create_and_run_pipeline_in_aproject in_aproject
+      create_and_run_pipeline_in_aproject in_aproject, template_name, choose_file
       instance_path = current_path
 
       # Pause the pipeline
@@ -283,11 +275,11 @@ class PipelineInstancesTest < ActionDispatch::IntegrationTest
   end
 
   # Create and run a pipeline for 'Two Part Pipeline Template' in 'A Project'
-  def create_and_run_pipeline_in_aproject in_aproject
+  def create_and_run_pipeline_in_aproject in_aproject, template_name, choose_file
     # create a pipeline instance
     find('.btn', text: 'Run a pipeline').click
     within('.modal-dialog') do
-      find('.selectable', text: 'Two Part Pipeline Template').click
+      find('.selectable', text: template_name).click
       find('.btn', text: 'Next: choose inputs').click
     end
 
@@ -309,6 +301,10 @@ class PipelineInstancesTest < ActionDispatch::IntegrationTest
         wait_for_ajax
       end
       first('span', text: 'foo_tag').click
+      if choose_file
+        wait_for_ajax
+        find('.preview-selectable', text: 'foo').click
+      end
       find('button', text: 'OK').click
     end
     wait_for_ajax
@@ -322,9 +318,14 @@ class PipelineInstancesTest < ActionDispatch::IntegrationTest
     click_link 'API response'
     api_response = JSON.parse(find('div#advanced_api_response pre').text)
     input_params = api_response['components']['part-one']['script_parameters']['input']
-    assert_equal input_params['value'], col['portable_data_hash']
-    assert_equal input_params['selection_name'], col['name']
-    assert_equal input_params['selection_uuid'], col['uuid']
+    assert_equal(input_params['selection_uuid'], col['uuid'], "did not find expected input param uuid")
+    if choose_file
+      assert_equal(input_params['value'], col['portable_data_hash']+'/foo', "did not find expected input file param value")
+      assert_equal(input_params['selection_name'], col['name']+'/foo', "did not find expected input file param name")
+    else
+      assert_equal(input_params['value'], col['portable_data_hash'], "did not find expected input param value")
+      assert_equal(input_params['selection_name'], col['name'], "did not find expected input param name")
+    end
 
     # "Run" button present and enabled
     page.assert_no_selector 'a.disabled,button.disabled', text: 'Run'
index 619c346ccceb646b8d8528a6fd773a88d3594e1c..6494bc524f3218b8d07d1e4dca38e03e56224949 100644 (file)
@@ -8,33 +8,76 @@ class ResourceListTest < ActiveSupport::TestCase
     assert_equal [], results.links_for(api_fixture('users')['active']['uuid'])
   end
 
-  test 'links_for returns all link classes (simulated results)' do
-    project_uuid = api_fixture('groups')['aproject']['uuid']
-    specimen_uuid = api_fixture('specimens')['in_aproject']['uuid']
-    api_response = {
-      kind: 'arvados#specimenList',
-      links: [{kind: 'arvados#link',
-                uuid: 'zzzzz-o0j2j-asdfasdfasdfas1',
-                tail_uuid: project_uuid,
-                head_uuid: specimen_uuid,
-                link_class: 'foo',
-                name: 'Bob'},
-              {kind: 'arvados#link',
-                uuid: 'zzzzz-o0j2j-asdfasdfasdfas2',
-                tail_uuid: project_uuid,
-                head_uuid: specimen_uuid,
-                link_class: nil,
-                name: 'Clydesdale'}],
-      items: [{kind: 'arvados#specimen',
-                uuid: specimen_uuid}]
-    }
-    arl = ArvadosResourceList.new
-    arl.results = ArvadosApiClient.new.unpack_api_response(api_response)
-    assert_equal(['foo', nil],
-                 (arl.
-                  links_for(specimen_uuid).
-                  collect { |x| x.link_class }),
-                 "Expected links_for to return all link_classes")
+  test 'get all items by default' do
+    use_token :admin
+    a = 0
+    Collection.where(owner_uuid: 'zzzzz-j7d0g-0201collections').each do
+      a += 1
+    end
+    assert_equal 201, a
+  end
+
+  test 'prefetch all items' do
+    use_token :admin
+    a = 0
+    Collection.where(owner_uuid: 'zzzzz-j7d0g-0201collections').results.each do
+      a += 1
+    end
+    assert_equal 201, a
+  end
+
+  test 'get limited items' do
+    use_token :admin
+    a = 0
+    Collection.where(owner_uuid: 'zzzzz-j7d0g-0201collections').limit(51).each do
+      a += 1
+    end
+    assert_equal 51, a
+  end
+
+  test 'get limited items, limit % page_size != 0' do
+    skip "Requires server MAX_LIMIT < 200 which is not currently the default"
+
+    use_token :admin
+    max_page_size = Collection.
+      where(owner_uuid: 'zzzzz-j7d0g-0201collections').
+      limit(1000000000).
+      fetch_multiple_pages(false).
+      count
+    # Conditions necessary for this test to be valid:
+    assert_operator 200, :>, max_page_size
+    assert_operator 1, :<, max_page_size
+    # Verify that the server really sends max_page_size when asked for max_page_size+1
+    assert_equal max_page_size, Collection.
+      where(owner_uuid: 'zzzzz-j7d0g-0201collections').
+      limit(max_page_size+1).
+      fetch_multiple_pages(false).
+      results.
+      count
+    # Now that we know the max_page_size+1 is in the middle of page 2,
+    # make sure #each returns page 1 and only the requested part of
+    # page 2.
+    a = 0
+    saw_uuid = {}
+    Collection.where(owner_uuid: 'zzzzz-j7d0g-0201collections').limit(max_page_size+1).each do |item|
+      a += 1
+      saw_uuid[item.uuid] = true
+    end
+    assert_equal max_page_size+1, a
+    # Ensure no overlap between pages
+    assert_equal max_page_size+1, saw_uuid.size
+  end
+
+  test 'get single page of items' do
+    use_token :admin
+    a = 0
+    c = Collection.where(owner_uuid: 'zzzzz-j7d0g-0201collections').fetch_multiple_pages(false)
+    c.each do
+      a += 1
+    end
+
+    assert_operator a, :<, 201
+    assert_equal c.result_limit, a
   end
 
 end
index 3dafc970d34a192a81425aa808196cd8ca61e043..13ae918895d6192553107ae00273e247c6eb4c34 100755 (executable)
@@ -348,6 +348,8 @@ except Exception as e:
     logger.error(pprint.pformat(taskp))
     sys.exit(1)
 
+# rcode holds the return codes produced by each subprocess
+rcode = {}
 try:
     subprocesses = []
     close_streams = []
@@ -389,7 +391,6 @@ try:
 
     active = 1
     pids = set([s.pid for s in subprocesses])
-    rcode = {}
     while len(pids) > 0:
         (pid, status) = os.wait()
         pids.discard(pid)
@@ -434,7 +435,7 @@ else:
     outcollection = robust_put.upload(outdir, logger)
 
-# Success if no non-zero return codes
-success = not any([status != 0 for status in rcode.values()])
+# Success if at least one subprocess ran and no return codes were non-zero
+success = any(rcode) and not any([status != 0 for status in rcode.values()])
 
 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                                      body={
index f94f0932ece43b714bb39474b05ee620119f69f5..e2363ac818dccd3b781ca64f6d1e2adee7156ae4 100644 (file)
@@ -23,9 +23,9 @@ table(table table-bordered table-condensed).
 |_. Attribute|_. Type|_. Description|_. Notes|
 |script|string|The filename of the job script.|This program will be invoked by Crunch for each job task. It is given as a path to an executable file, relative to the @/crunch_scripts@ directory in the Git tree specified by the _repository_ and _script_version_ attributes.|
 |script_parameters|hash|The input parameters for the job.|Conventionally, one of the parameters is called @"input"@. Typically, some parameter values are collection UUIDs. Ultimately, though, the significance of parameters is left entirely up to the script itself.|
-|repository|string|Git repository|Given as the name of a locally hosted git repository.|
+|repository|string|Git repository|Given as the name of a locally hosted Git repository.|
 |script_version|string|Git commit|During a **create** transaction, this is the Git branch, tag, or hash supplied by the client. Before the job starts, Arvados updates it to the full 40-character SHA-1 hash of the commit used by the job.
-See "Script versions":#script_version below for more detail about acceptable ways to specify a commit.|
+See "Specifying Git versions":#script_version below for more detail about acceptable ways to specify a commit.|
 |cancelled_by_client_uuid|string|API client ID|Is null if job has not been cancelled|
 |cancelled_by_user_uuid|string|Authenticated user ID|Is null if job has not been cancelled|
 |cancelled_at|datetime|When job was cancelled|Is null if job has not been cancelled|
@@ -41,17 +41,20 @@ See "Script versions":#script_version below for more detail about acceptable way
 |nondeterministic|boolean|The job is expected to produce different results if run more than once.|If true, this job will not be considered as a candidate for automatic re-use when submitting subsequent identical jobs.|
 |submit_id|string|Unique ID provided by client when job was submitted|Optional. This can be used by a client to make the "jobs.create":{{site.baseurl}}/api/methods/jobs.html#create method idempotent.|
 |priority|string|||
+|arvados_sdk_version|string|Git commit hash that specifies the SDK version to use from the Arvados repository|This is set by searching the Arvados repository for a match for the arvados_sdk_version runtime constraint.|
+|docker_image_locator|string|Portable data hash of the collection that contains the Docker image to use|This is set by searching readable collections for a match for the docker_image runtime constraint.|
 |runtime_constraints|hash|Constraints that must be satisfied by the job/task scheduler in order to run the job.|See below.|
 
-h3(#script_version). Script versions
+h3(#script_version). Specifying Git versions
 
-The @script_version@ attribute is typically given as a branch, tag, or commit hash, but there are many more ways to specify a Git commit. The "specifying revisions" section of the "gitrevisions manual page":http://git-scm.com/docs/gitrevisions.html has a definitive list. Arvados accepts @script_version@ in any format listed there that names a single commit (not a tree, a blob, or a range of commits). However, some kinds of names can be expected to resolve differently in Arvados than they do in your local repository. For example, <code>HEAD@{1}</code> refers to the local reflog, and @origin/master@ typically refers to a remote branch: neither is likely to work as desired if given as a @script_version@.
+The @script_version@ attribute and @arvados_sdk_version@ runtime constraint are typically given as a branch, tag, or commit hash, but there are many more ways to specify a Git commit. The "specifying revisions" section of the "gitrevisions manual page":http://git-scm.com/docs/gitrevisions.html has a definitive list. Arvados accepts Git versions in any format listed there that names a single commit (not a tree, a blob, or a range of commits). However, some kinds of names can be expected to resolve differently in Arvados than they do in your local repository. For example, <code>HEAD@{1}</code> refers to the local reflog, and @origin/master@ typically refers to a remote branch: neither is likely to work as desired if given as a Git version.
 
 h3. Runtime constraints
 
 table(table table-bordered table-condensed).
 |_. Key|_. Type|_. Description|_. Implemented|
-|docker_image|string|The Docker image that this Job needs to run.  If specified, Crunch will create a Docker container from this image, and run the Job's script inside that.  The Keep mount and work directories will be available as volumes inside this container.  The image must be uploaded to Arvados using @arv keep docker@.  You may specify the image in any format that Docker accepts, such as @arvados/jobs@, @debian:latest@, or the Docker image id.  Alternatively, you may specify the UUID of the image Collection, returned by @arv keep docker@.|&#10003;|
+|arvados_sdk_version|string|The Git version of the SDKs to use from the Arvados Git repository.  See "Specifying Git versions":#script_version for more detail about acceptable ways to specify a commit.||
+|docker_image|string|The Docker image that this Job needs to run.  If specified, Crunch will create a Docker container from this image, and run the Job's script inside that.  The Keep mount and work directories will be available as volumes inside this container.  The image must be uploaded to Arvados using @arv keep docker@.  You may specify the image in any format that Docker accepts, such as @arvados/jobs@, @debian:latest@, or the Docker image id.  Alternatively, you may specify the UUID or portable data hash of the image Collection, returned by @arv keep docker@.|&#10003;|
 |min_nodes|integer||&#10003;|
 |max_nodes|integer|||
 |min_cores_per_node|integer|Require that each node assigned to this Job have the specified number of CPU cores|&#10003;|
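
To show the two new constraints in context, a hypothetical runtime_constraints hash for a job that pins both its SDK version and its Docker image (the values are placeholders, not from this commit):

    runtime_constraints = {
      # Resolved to a full 40-character commit hash before the job runs:
      "arvados_sdk_version" => "master",
      # Resolved to the image collection's portable data hash:
      "docker_image" => "arvados/jobs",
      "min_nodes" => 1,
      "min_cores_per_node" => 2,
    }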
index 6b0af21eb251768d223c1128a31b42bc3f13a749..08880b1b3c6e0829833e94de37f4fe39289cc8cd 100644 (file)
@@ -15,4 +15,3 @@ See the "command line interface":{{site.baseurl}}/user/reference/sdk-cli.html pa
 h3. Subcommands
 
 See the "arv subcommands":{{site.baseurl}}/sdk/cli/subcommands.html page.
-
index 07be3cf84de1576b129c6efcd9b2b80a7cbc8af9..11b79e3774ae3872a37e14a90e98ec8e5f5c65fc 100644 (file)
@@ -97,7 +97,12 @@ h3(#arv-tag). arv tag
 <notextile>
 <pre>
 $ <code class="userinput">arv tag --help</code>
-arvados cli client
+
+Usage:
+arv tag add tag1 [tag2 ...] --object object_uuid1 [object_uuid2...]
+arv tag remove tag1 [tag2 ...] --object object_uuid1 [object_uuid2...]
+arv tag remove --all
+
   --dry-run, -n:   Don't actually do anything
   --verbose, -v:   Print some things on stderr
      --uuid, -u:   Return the UUIDs of the objects in the response, one per
@@ -245,4 +250,3 @@ optional arguments:
                         script_version field of component, default 'master'
 </pre>
 </notextile>
-
index 77affe909384cbefd6e5ea5dc0cae02cf252f5e3..533ea39eb7b27c2d96f391e1bc47c2cb8d599fc9 100755 (executable)
@@ -122,7 +122,7 @@ def check_subcommands client, arvados, subcommand, global_opts, remaining_opts
     arv_create client, arvados, global_opts, remaining_opts
   when 'edit'
     arv_edit client, arvados, global_opts, remaining_opts
-  when 'copy', 'tag', 'ws'
+  when 'copy', 'tag', 'ws', 'run'
     exec `which arv-#{subcommand}`.strip, *remaining_opts
   when 'keep'
     @sub = remaining_opts.shift
@@ -150,12 +150,6 @@ def check_subcommands client, arvados, subcommand, global_opts, remaining_opts
       puts "Available methods: run"
     end
     abort
-  when 'run'
-    exec `which arv-run`.strip, *remaining_opts
-  when 'tag'
-    exec `which arv-tag`.strip, *remaining_opts
-  when 'ws'
-    exec `which arv-ws`.strip, *remaining_opts
   end
 end
 
index 4ee6770f778587f8780eb49a230484496d496f8d..e400dab7dabdc656d6e552ed762194d0bc0b20fb 100755 (executable)
@@ -5,13 +5,17 @@
 #   arv tag remove tag1 [tag2 ...] --object obj_uuid1 [--object obj_uuid2 ...]
 #   arv tag remove tag1 [tag2 ...] --all
 
-def usage
-  abort "Usage:\n" +
-    "arv tag add tag1 [tag2 ...] --objects object_uuid1 [object_uuid2...]\n" +
-    "arv tag remove tag1 [tag2 ...] --objects object_uuid1 [object_uuid2...]\n" +
+def usage_string
+  return "\nUsage:\n" +
+    "arv tag add tag1 [tag2 ...] --object object_uuid1 [object_uuid2...]\n" +
+    "arv tag remove tag1 [tag2 ...] --object object_uuid1 [object_uuid2...]\n" +
     "arv tag remove --all\n"
 end
 
+def usage
+  abort usage_string
+end
+
 def api_call(method, parameters:{}, request_body:{})
   request_body[:api_token] = ENV['ARVADOS_API_TOKEN']
   result = $client.execute(:api_method => method,
@@ -147,7 +151,8 @@ class Google::APIClient
 end
 
 global_opts = Trollop::options do
-  banner "arvados cli client"
+  banner usage_string
+  banner ""
   opt :dry_run, "Don't actually do anything", :short => "-n"
   opt :verbose, "Print some things on stderr", :short => "-v"
   opt :uuid, "Return the UUIDs of the objects in the response, one per line (default)", :short => nil
index e1c25c9e1d82ee5f2f41bbb72182faf5a73b638d..326c2a06ae1a5620af8a388ffb9392357dc7d15e 100644 (file)
@@ -12,7 +12,6 @@ import (
        "log"
        "net/http"
        "regexp"
-       "sort"
        "strings"
        "sync"
        "sync/atomic"
@@ -36,7 +35,7 @@ type KeepClient struct {
        Arvados       *arvadosclient.ArvadosClient
        Want_replicas int
        Using_proxy   bool
-       service_roots *[]string
+       service_roots *map[string]string
        lock          sync.Mutex
        Client        *http.Client
 }
@@ -133,7 +132,7 @@ func (this KeepClient) AuthorizedGet(hash string,
        contentLength int64, url string, err error) {
 
        // Calculate the ordering for asking servers
-       sv := this.shuffledServiceRoots(hash)
+       sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
 
        for _, host := range sv {
                var req *http.Request
@@ -175,7 +174,7 @@ func (this KeepClient) Ask(hash string) (contentLength int64, url string, err er
 func (this KeepClient) AuthorizedAsk(hash string, signature string,
        timestamp string) (contentLength int64, url string, err error) {
        // Calculate the ordering for asking servers
-       sv := this.shuffledServiceRoots(hash)
+       sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
 
        for _, host := range sv {
                var req *http.Request
@@ -208,20 +207,19 @@ func (this KeepClient) AuthorizedAsk(hash string, signature string,
 }
 
 // Atomically read the service_roots field.
-func (this *KeepClient) ServiceRoots() []string {
-       r := (*[]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
+func (this *KeepClient) ServiceRoots() map[string]string {
+       r := (*map[string]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
        return *r
 }
 
 // Atomically update the service_roots field.  Enables you to update
 // service_roots without disrupting any GET or PUT operations that might
 // already be in progress.
-func (this *KeepClient) SetServiceRoots(svc []string) {
-       // Must be sorted for ShuffledServiceRoots() to produce consistent
-       // results.
-       roots := make([]string, len(svc))
-       copy(roots, svc)
-       sort.Strings(roots)
+func (this *KeepClient) SetServiceRoots(new_roots map[string]string) {
+       roots := make(map[string]string)
+       for uuid, root := range new_roots {
+               roots[uuid] = root
+       }
        atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
                unsafe.Pointer(&roots))
 }
index 04be03baa415e3f294b921f7d59d1f65c8bb9ccb..c1088ef8c8540624d5ffd5b74d350f8500d55604 100644 (file)
@@ -87,21 +87,9 @@ func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
 
        c.Assert(err, Equals, nil)
        c.Check(len(kc.ServiceRoots()), Equals, 2)
-       c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:25107")
-       c.Check(kc.ServiceRoots()[1], Equals, "http://localhost:25108")
-}
-
-func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
-       kc := KeepClient{}
-       kc.SetServiceRoots([]string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"})
-
-       // "foo" acbd18db4cc2f85cedef654fccc4a4d8
-       foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
-       c.Check(kc.shuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
-
-       // "bar" 37b51d194a7513e45b56f6524f2d51f2
-       bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
-       c.Check(kc.shuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
+       for _, root := range kc.ServiceRoots() {
+               c.Check(root, Matches, "http://localhost:2510[\\d]")
+       }
 }
 
 type StubPutHandler struct {
@@ -266,26 +254,26 @@ func RunSomeFakeKeepServers(st http.Handler, n int, port int) (ks []KeepServer)
 func (s *StandaloneSuite) TestPutB(c *C) {
        log.Printf("TestPutB")
 
-       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+       hash := Md5String("foo")
 
        st := StubPutHandler{
                c,
                hash,
                "abc123",
                "foo",
-               make(chan string, 2)}
+               make(chan string, 5)}
 
        arv, _ := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
 
        kc.Want_replicas = 2
        arv.ApiToken = "abc123"
-       service_roots := make([]string, 5)
+       service_roots := make(map[string]string)
 
        ks := RunSomeFakeKeepServers(st, 5, 2990)
 
        for i := 0; i < len(ks); i += 1 {
-               service_roots[i] = ks[i].url
+               service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = ks[i].url
                defer ks[i].listener.Close()
        }
 
@@ -293,7 +281,8 @@ func (s *StandaloneSuite) TestPutB(c *C) {
 
        kc.PutB([]byte("foo"))
 
-       shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
+       shuff := NewRootSorter(
+               kc.ServiceRoots(), Md5String("foo")).GetSortedRoots()
 
        s1 := <-st.handled
        s2 := <-st.handled
@@ -315,19 +304,19 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
                hash,
                "abc123",
                "foo",
-               make(chan string, 2)}
+               make(chan string, 5)}
 
        arv, _ := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
 
        kc.Want_replicas = 2
        arv.ApiToken = "abc123"
-       service_roots := make([]string, 5)
+       service_roots := make(map[string]string)
 
        ks := RunSomeFakeKeepServers(st, 5, 2990)
 
        for i := 0; i < len(ks); i += 1 {
-               service_roots[i] = ks[i].url
+               service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = ks[i].url
                defer ks[i].listener.Close()
        }
 
@@ -342,7 +331,7 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
 
        kc.PutHR(hash, reader, 3)
 
-       shuff := kc.shuffledServiceRoots(hash)
+       shuff := NewRootSorter(kc.ServiceRoots(), hash).GetSortedRoots()
        log.Print(shuff)
 
        s1 := <-st.handled
@@ -366,7 +355,7 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
                hash,
                "abc123",
                "foo",
-               make(chan string, 2)}
+               make(chan string, 4)}
 
        fh := FailHandler{
                make(chan string, 1)}
@@ -376,23 +365,24 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
 
        kc.Want_replicas = 2
        arv.ApiToken = "abc123"
-       service_roots := make([]string, 5)
+       service_roots := make(map[string]string)
 
        ks1 := RunSomeFakeKeepServers(st, 4, 2990)
        ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
 
        for i, k := range ks1 {
-               service_roots[i] = k.url
+               service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
                defer k.listener.Close()
        }
        for i, k := range ks2 {
-               service_roots[len(ks1)+i] = k.url
+               service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
                defer k.listener.Close()
        }
 
        kc.SetServiceRoots(service_roots)
 
-       shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
+       shuff := NewRootSorter(
+               kc.ServiceRoots(), Md5String("foo")).GetSortedRoots()
 
        phash, replicas, err := kc.PutB([]byte("foo"))
 
@@ -425,29 +415,27 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
 
        kc.Want_replicas = 2
        arv.ApiToken = "abc123"
-       service_roots := make([]string, 5)
+       service_roots := make(map[string]string)
 
        ks1 := RunSomeFakeKeepServers(st, 1, 2990)
        ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
 
        for i, k := range ks1 {
-               service_roots[i] = k.url
+               service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
                defer k.listener.Close()
        }
        for i, k := range ks2 {
-               service_roots[len(ks1)+i] = k.url
+               service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
                defer k.listener.Close()
        }
 
        kc.SetServiceRoots(service_roots)
 
-       shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
-
        _, replicas, err := kc.PutB([]byte("foo"))
 
        c.Check(err, Equals, InsufficientReplicasError)
        c.Check(replicas, Equals, 1)
-       c.Check(<-st.handled, Equals, shuff[1])
+       c.Check(<-st.handled, Matches, ".*2990")
 
        log.Printf("TestPutWithTooManyFail done")
 }
@@ -483,7 +471,7 @@ func (s *StandaloneSuite) TestGet(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
-       kc.SetServiceRoots([]string{url})
+       kc.SetServiceRoots(map[string]string{"x": url})
 
        r, n, url2, err := kc.Get(hash)
        defer r.Close()
@@ -509,7 +497,7 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
-       kc.SetServiceRoots([]string{url})
+       kc.SetServiceRoots(map[string]string{"x": url})
 
        r, n, url2, err := kc.Get(hash)
        c.Check(err, Equals, BlockNotFound)
@@ -539,7 +527,7 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
-       kc.SetServiceRoots([]string{url})
+       kc.SetServiceRoots(map[string]string{"x": url})
 
        r, n, _, err := kc.Get(barhash)
        _, err = ioutil.ReadAll(r)
@@ -557,58 +545,68 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
 }
 
 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
-
-       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+       content := []byte("waz")
+       hash := fmt.Sprintf("%x", md5.Sum(content))
 
        fh := FailHandler{
-               make(chan string, 1)}
+               make(chan string, 4)}
 
        st := StubGetHandler{
                c,
                hash,
                "abc123",
-               []byte("foo")}
+               content}
 
        arv, err := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
-       service_roots := make([]string, 5)
+       service_roots := make(map[string]string)
 
        ks1 := RunSomeFakeKeepServers(st, 1, 2990)
        ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
 
        for i, k := range ks1 {
-               service_roots[i] = k.url
+               service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
                defer k.listener.Close()
        }
        for i, k := range ks2 {
-               service_roots[len(ks1)+i] = k.url
+               service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
                defer k.listener.Close()
        }
 
        kc.SetServiceRoots(service_roots)
 
+       // This test works only if one of the failing services is
+       // attempted before the succeeding service. Otherwise,
+       // <-fh.handled below will just hang! (Probe order depends on
+       // the choice of block content "waz" and the UUIDs of the fake
+       // servers, so we just tried different strings until we found
+       // an example that passes this Assert.)
+       c.Assert(NewRootSorter(service_roots, hash).GetSortedRoots()[0], Matches, ".*299[1-4]")
+
        r, n, url2, err := kc.Get(hash)
+
        <-fh.handled
        c.Check(err, Equals, nil)
        c.Check(n, Equals, int64(3))
        c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
 
-       content, err2 := ioutil.ReadAll(r)
+       read_content, err2 := ioutil.ReadAll(r)
        c.Check(err2, Equals, nil)
-       c.Check(content, DeepEquals, []byte("foo"))
+       c.Check(read_content, DeepEquals, content)
 }
 
 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
        os.Setenv("ARVADOS_API_HOST", "localhost:3000")
        os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
        os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
+       content := []byte("TestPutGetHead")
 
        arv, err := arvadosclient.MakeArvadosClient()
        kc, err := MakeKeepClient(&arv)
        c.Assert(err, Equals, nil)
 
-       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+       hash := fmt.Sprintf("%x", md5.Sum(content))
 
        {
                n, _, err := kc.Ask(hash)
@@ -616,25 +614,25 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
                c.Check(n, Equals, int64(0))
        }
        {
-               hash2, replicas, err := kc.PutB([]byte("foo"))
-               c.Check(hash2, Equals, fmt.Sprintf("%s+%v", hash, 3))
+               hash2, replicas, err := kc.PutB(content)
+               c.Check(hash2, Equals, fmt.Sprintf("%s+%d", hash, len(content)))
                c.Check(replicas, Equals, 2)
                c.Check(err, Equals, nil)
        }
        {
                r, n, url2, err := kc.Get(hash)
                c.Check(err, Equals, nil)
-               c.Check(n, Equals, int64(3))
+               c.Check(n, Equals, int64(len(content)))
                c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
 
-               content, err2 := ioutil.ReadAll(r)
+               read_content, err2 := ioutil.ReadAll(r)
                c.Check(err2, Equals, nil)
-               c.Check(content, DeepEquals, []byte("foo"))
+               c.Check(read_content, DeepEquals, content)
        }
        {
                n, url2, err := kc.Ask(hash)
                c.Check(err, Equals, nil)
-               c.Check(n, Equals, int64(3))
+               c.Check(n, Equals, int64(len(content)))
                c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
        }
 }
@@ -659,12 +657,12 @@ func (s *StandaloneSuite) TestPutProxy(c *C) {
        kc.Want_replicas = 2
        kc.Using_proxy = true
        arv.ApiToken = "abc123"
-       service_roots := make([]string, 1)
+       service_roots := make(map[string]string)
 
        ks1 := RunSomeFakeKeepServers(st, 1, 2990)
 
        for i, k := range ks1 {
-               service_roots[i] = k.url
+               service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
                defer k.listener.Close()
        }
 
@@ -690,12 +688,12 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
        kc.Want_replicas = 3
        kc.Using_proxy = true
        arv.ApiToken = "abc123"
-       service_roots := make([]string, 1)
+       service_roots := make(map[string]string)
 
        ks1 := RunSomeFakeKeepServers(st, 1, 2990)
 
        for i, k := range ks1 {
-               service_roots[i] = k.url
+               service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
                defer k.listener.Close()
        }
        kc.SetServiceRoots(service_roots)
diff --git a/sdk/go/keepclient/root_sorter.go b/sdk/go/keepclient/root_sorter.go
new file mode 100644 (file)
index 0000000..c2780bc
--- /dev/null
@@ -0,0 +1,57 @@
+package keepclient
+
+import (
+       "sort"
+)
+
+type RootSorter struct {
+       root   []string
+       weight []string
+       order  []int
+}
+
+func NewRootSorter(serviceRoots map[string]string, hash string) (*RootSorter) {
+       rs := new(RootSorter)
+       rs.root = make([]string, len(serviceRoots))
+       rs.weight = make([]string, len(serviceRoots))
+       rs.order = make([]int, len(serviceRoots))
+       i := 0
+       for uuid, root := range serviceRoots {
+               rs.root[i] = root
+               rs.weight[i] = rs.getWeight(hash, uuid)
+               rs.order[i] = i
+               i++
+       }
+       sort.Sort(rs)
+       return rs
+}
+
+func (rs RootSorter) getWeight(hash string, uuid string) (string) {
+       if len(uuid) == 27 {
+               return Md5String(hash + uuid[12:])
+       } else {
+               // Non-standard UUID (e.g., a single proxy root or a test stub): use the whole string.
+               return Md5String(hash + uuid)
+       }
+}
+
+func (rs RootSorter) GetSortedRoots() ([]string) {
+       sorted := make([]string, len(rs.order))
+       for i := range rs.order {
+               sorted[i] = rs.root[rs.order[i]]
+       }
+       return sorted
+}
+
+// Less is really More here: the heaviest root will be at the front of the list.
+func (rs RootSorter) Less(i, j int) bool {
+       return rs.weight[rs.order[j]] < rs.weight[rs.order[i]]
+}
+
+func (rs RootSorter) Len() int {
+       return len(rs.order)
+}
+
+func (rs RootSorter) Swap(i, j int) {
+       sort.IntSlice(rs.order).Swap(i, j)
+}
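
RootSorter is rendezvous (highest-random-weight) hashing: each root's weight for a block is md5(hash + the last 15 characters of the service UUID), and roots are probed heaviest first, giving every block a stable probe order that all clients agree on without coordination. A compact sketch of the same ordering rule, assuming standard 27-character UUIDs (sort.Slice is used here for brevity; getWeight above is the authoritative rule):

package main

import (
    "crypto/md5"
    "fmt"
    "sort"
)

func md5String(s string) string {
    return fmt.Sprintf("%x", md5.Sum([]byte(s)))
}

// probeOrder returns service roots sorted by descending
// md5(blockHash + uuid[12:]), i.e. rendezvous hashing.
func probeOrder(serviceRoots map[string]string, blockHash string) []string {
    type weighted struct{ weight, root string }
    ws := make([]weighted, 0, len(serviceRoots))
    for uuid, root := range serviceRoots {
        ws = append(ws, weighted{md5String(blockHash + uuid[12:]), root})
    }
    sort.Slice(ws, func(i, j int) bool { return ws[i].weight > ws[j].weight })
    order := make([]string, 0, len(ws))
    for _, w := range ws {
        order = append(order, w.root)
    }
    return order
}

func main() {
    roots := map[string]string{
        "zzzzz-bi6l4-000000000000000": "https://0.svc/",
        "zzzzz-bi6l4-000000000000001": "https://1.svc/",
    }
    fmt.Println(probeOrder(roots, md5String("foo")))
}

The payoff over the removed shuffledServiceRoots: when a service is added or removed, only blocks whose heaviest root changes get a new first probe, roughly 1/N of them, instead of a wholesale reshuffle.
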
diff --git a/sdk/go/keepclient/root_sorter_test.go b/sdk/go/keepclient/root_sorter_test.go
new file mode 100644 (file)
index 0000000..455715d
--- /dev/null
@@ -0,0 +1,58 @@
+package keepclient
+
+import (
+       "fmt"
+       . "gopkg.in/check.v1"
+       "strconv"
+       "strings"
+)
+
+type RootSorterSuite struct{}
+var _ = Suite(&RootSorterSuite{})
+
+func FakeSvcRoot(i uint64) (string) {
+       return fmt.Sprintf("https://%x.svc/", i)
+}
+
+func FakeSvcUuid(i uint64) (string) {
+       return fmt.Sprintf("zzzzz-bi6l4-%015x", i)
+}
+
+func FakeServiceRoots(n uint64) (map[string]string) {
+       sr := map[string]string{}
+       for i := uint64(0); i < n; i++ {
+               sr[FakeSvcUuid(i)] = FakeSvcRoot(i)
+       }
+       return sr
+}
+
+func (*RootSorterSuite) TestEmptyRoots(c *C) {
+       rs := NewRootSorter(map[string]string{}, Md5String("foo"))
+       c.Check(rs.GetSortedRoots(), DeepEquals, []string{})
+}
+
+func (*RootSorterSuite) TestJustOneRoot(c *C) {
+       rs := NewRootSorter(FakeServiceRoots(1), Md5String("foo"))
+       c.Check(rs.GetSortedRoots(), DeepEquals, []string{FakeSvcRoot(0)})
+}
+
+func (*RootSorterSuite) TestReferenceSet(c *C) {
+       fakeroots := FakeServiceRoots(16)
+       // These reference probe orders are explained further in
+       // ../../python/tests/test_keep_client.py:
+       expected_orders := []string{
+               "3eab2d5fc9681074",
+               "097dba52e648f1c3",
+               "c5b4e023f8a7d691",
+               "9d81c02e76a3bf54",
+       }
+       for h, expected_order := range expected_orders {
+               hash := Md5String(fmt.Sprintf("%064x", h))
+               roots := NewRootSorter(fakeroots, hash).GetSortedRoots()
+               for i, svc_id_s := range strings.Split(expected_order, "") {
+                       svc_id, err := strconv.ParseUint(svc_id_s, 16, 64)
+                       c.Assert(err, Equals, nil)
+                       c.Check(roots[i], Equals, FakeSvcRoot(svc_id))
+               }
+       }
+}
index ce15ce91adbab7926c0eca76ece43b52ab051bb2..e12214450c12527f9e447d77025df0d542cb208d 100644 (file)
@@ -2,6 +2,7 @@
 package keepclient
 
 import (
+       "crypto/md5"
        "git.curoverse.com/arvados.git/sdk/go/streamer"
        "errors"
        "fmt"
@@ -10,20 +11,25 @@ import (
        "log"
        "net/http"
        "os"
-       "strconv"
        "strings"
 )
 
 type keepDisk struct {
+       Uuid     string `json:"uuid"`
        Hostname string `json:"service_host"`
        Port     int    `json:"service_port"`
        SSL      bool   `json:"service_ssl_flag"`
        SvcType  string `json:"service_type"`
 }
 
+func Md5String(s string) (string) {
+       return fmt.Sprintf("%x", md5.Sum([]byte(s)))
+}
+
 func (this *KeepClient) DiscoverKeepServers() error {
        if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
-               this.SetServiceRoots([]string{prx})
+               sr := map[string]string{"proxy": prx}
+               this.SetServiceRoots(sr)
                this.Using_proxy = true
                return nil
        }
@@ -42,7 +48,7 @@ func (this *KeepClient) DiscoverKeepServers() error {
        }
 
        listed := make(map[string]bool)
-       service_roots := make([]string, 0, len(m.Items))
+       service_roots := make(map[string]string)
 
        for _, element := range m.Items {
                n := ""
@@ -57,7 +63,7 @@ func (this *KeepClient) DiscoverKeepServers() error {
                // Skip duplicates
                if !listed[url] {
                        listed[url] = true
-                       service_roots = append(service_roots, url)
+                       service_roots[element.Uuid] = url
                }
                if element.SvcType == "proxy" {
                        this.Using_proxy = true
@@ -69,55 +75,6 @@ func (this *KeepClient) DiscoverKeepServers() error {
        return nil
 }
 
-func (this KeepClient) shuffledServiceRoots(hash string) (pseq []string) {
-       // Build an ordering with which to query the Keep servers based on the
-       // contents of the hash.  "hash" is a hex-encoded number at least 8
-       // digits (32 bits) long
-
-       // seed used to calculate the next keep server from 'pool' to be added
-       // to 'pseq'
-       seed := hash
-
-       // Keep servers still to be added to the ordering
-       service_roots := this.ServiceRoots()
-       pool := make([]string, len(service_roots))
-       copy(pool, service_roots)
-
-       // output probe sequence
-       pseq = make([]string, 0, len(service_roots))
-
-       // iterate while there are servers left to be assigned
-       for len(pool) > 0 {
-
-               if len(seed) < 8 {
-                       // ran out of digits in the seed
-                       if len(pseq) < (len(hash) / 4) {
-                               // the number of servers added to the probe
-                               // sequence is less than the number of 4-digit
-                               // slices in 'hash' so refill the seed with the
-                               // last 4 digits.
-                               seed = hash[len(hash)-4:]
-                       }
-                       seed += hash
-               }
-
-               // Take the next 8 digits (32 bytes) and interpret as an integer,
-               // then modulus with the size of the remaining pool to get the next
-               // selected server.
-               probe, _ := strconv.ParseUint(seed[0:8], 16, 32)
-               probe %= uint64(len(pool))
-
-               // Append the selected server to the probe sequence and remove it
-               // from the pool.
-               pseq = append(pseq, pool[probe])
-               pool = append(pool[:probe], pool[probe+1:]...)
-
-               // Remove the digits just used from the seed
-               seed = seed[8:]
-       }
-       return pseq
-}
-
 type uploadStatus struct {
        err             error
        url             string
@@ -189,7 +146,7 @@ func (this KeepClient) putReplicas(
        expectedLength int64) (locator string, replicas int, err error) {
 
        // Calculate the ordering for uploading to servers
-       sv := this.shuffledServiceRoots(hash)
+       sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
 
        // The next server to try contacting
        next_server := 0
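
A hedged usage sketch of the revised keepclient surface, exercising the map-keyed SetServiceRoots together with the sorter (the UUIDs and localhost URLs are fake, mirroring the test fixtures; error handling omitted):

package main

import (
    "log"

    "git.curoverse.com/arvados.git/sdk/go/keepclient"
)

func main() {
    kc := &keepclient.KeepClient{}
    kc.SetServiceRoots(map[string]string{
        "zzzzz-bi6l4-fakefakefake000": "http://localhost:2990",
        "zzzzz-bi6l4-fakefakefake001": "http://localhost:2991",
    })
    hash := keepclient.Md5String("foo")
    // Probe order is deterministic for a given hash and root set.
    for _, root := range keepclient.NewRootSorter(kc.ServiceRoots(), hash).GetSortedRoots() {
        log.Printf("would probe %s/%s", root, hash)
    }
}
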
index 1fd8fa5d2c640b1291cd99c643bf8d90ac5bd287..f4c85969c3904cbba5ecb2e6dfd0b47218555a88 100644 (file)
@@ -21,15 +21,15 @@ import ssl
 import socket
 import requests
 
-_logger = logging.getLogger('arvados.keep')
-global_client_object = None
-
 import arvados
 import arvados.config as config
 import arvados.errors
 import arvados.retry as retry
 import arvados.util
 
+_logger = logging.getLogger('arvados.keep')
+global_client_object = None
+
 class KeepLocator(object):
     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
@@ -466,9 +466,12 @@ class KeepClient(object):
                 if not proxy.endswith('/'):
                     proxy += '/'
                 self.api_token = api_token
-                self.service_roots = [proxy]
+                self._keep_services = [{
+                    'uuid': 'proxy',
+                    '_service_root': proxy,
+                    }]
                 self.using_proxy = True
-                self.static_service_roots = True
+                self._static_services_list = True
             else:
                 # It's important to avoid instantiating an API client
                 # unless we actually need one, for testing's sake.
@@ -476,9 +479,9 @@ class KeepClient(object):
                     api_client = arvados.api('v1')
                 self.api_client = api_client
                 self.api_token = api_client.api_token
-                self.service_roots = None
+                self._keep_services = None
                 self.using_proxy = None
-                self.static_service_roots = False
+                self._static_services_list = False
 
     def current_timeout(self):
         """Return the appropriate timeout to use for this client: the proxy
@@ -489,9 +492,9 @@ class KeepClient(object):
         # KeepService, not a KeepClient. See #4488.
         return self.proxy_timeout if self.using_proxy else self.timeout
 
-    def build_service_roots(self, force_rebuild=False):
-        if (self.static_service_roots or
-              (self.service_roots and not force_rebuild)):
+    def build_services_list(self, force_rebuild=False):
+        if (self._static_services_list or
+              (self._keep_services and not force_rebuild)):
             return
         with self.lock:
             try:
@@ -499,68 +502,47 @@ class KeepClient(object):
             except Exception:  # API server predates Keep services.
                 keep_services = self.api_client.keep_disks().list()
 
-            keep_services = keep_services.execute().get('items')
-            if not keep_services:
+            self._keep_services = keep_services.execute().get('items')
+            if not self._keep_services:
                 raise arvados.errors.NoKeepServersError()
 
             self.using_proxy = any(ks.get('service_type') == 'proxy'
-                                   for ks in keep_services)
-
-            roots = ("{}://[{}]:{:d}/".format(
-                        'https' if ks['service_ssl_flag'] else 'http',
-                         ks['service_host'],
-                         ks['service_port'])
-                     for ks in keep_services)
-            self.service_roots = sorted(set(roots))
-            _logger.debug(str(self.service_roots))
-
-    def shuffled_service_roots(self, hash, force_rebuild=False):
-        self.build_service_roots(force_rebuild)
-
-        # Build an ordering with which to query the Keep servers based on the
-        # contents of the hash.
-        # "hash" is a hex-encoded number at least 8 digits
-        # (32 bits) long
-
-        # seed used to calculate the next keep server from 'pool'
-        # to be added to 'pseq'
-        seed = hash
-
-        # Keep servers still to be added to the ordering
-        pool = self.service_roots[:]
-
-        # output probe sequence
-        pseq = []
-
-        # iterate while there are servers left to be assigned
-        while len(pool) > 0:
-            if len(seed) < 8:
-                # ran out of digits in the seed
-                if len(pseq) < len(hash) / 4:
-                    # the number of servers added to the probe sequence is less
-                    # than the number of 4-digit slices in 'hash' so refill the
-                    # seed with the last 4 digits and then append the contents
-                    # of 'hash'.
-                    seed = hash[-4:] + hash
-                else:
-                    # refill the seed with the contents of 'hash'
-                    seed += hash
-
-            # Take the next 8 digits (32 bytes) and interpret as an integer,
-            # then modulus with the size of the remaining pool to get the next
-            # selected server.
-            probe = int(seed[0:8], 16) % len(pool)
-
-            # Append the selected server to the probe sequence and remove it
-            # from the pool.
-            pseq += [pool[probe]]
-            pool = pool[:probe] + pool[probe+1:]
-
-            # Remove the digits just used from the seed
-            seed = seed[8:]
-        _logger.debug(str(pseq))
-        return pseq
+                                   for ks in self._keep_services)
+
+            # Precompute the base URI for each service.
+            for r in self._keep_services:
+                r['_service_root'] = "{}://[{}]:{:d}/".format(
+                    'https' if r['service_ssl_flag'] else 'http',
+                    r['service_host'],
+                    r['service_port'])
+            _logger.debug(str(self._keep_services))
+
+    def _service_weight(self, data_hash, service_uuid):
+        """Compute the weight of a Keep service endpoint for a data
+        block with a known hash.
+
+        The weight is md5(h + u) where u is the last 15 characters of
+        the service endpoint's UUID.
+        """
+        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
 
+    def weighted_service_roots(self, data_hash, force_rebuild=False):
+        """Return an array of Keep service endpoints, in the order in
+        which they should be probed when reading or writing data with
+        the given hash.
+        """
+        self.build_services_list(force_rebuild)
+
+        # Sort the available services by weight (heaviest first) for
+        # this data_hash, and return their service_roots (base URIs)
+        # in that order.
+        sorted_roots = [
+            svc['_service_root'] for svc in sorted(
+                self._keep_services,
+                reverse=True,
+                key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
+        _logger.debug(data_hash + ': ' + str(sorted_roots))
+        return sorted_roots
 
     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
         # roots_map is a dictionary, mapping Keep service root strings
@@ -568,7 +550,7 @@ class KeepClient(object):
         # new ones to roots_map.  Return the current list of local
         # root strings.
         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
-        local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
+        local_roots = self.weighted_service_roots(md5_s, force_rebuild)
         for root in local_roots:
             if root not in roots_map:
                 roots_map[root] = self.KeepService(root, **headers)
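
The Python and Go clients must compute identical probe orders, which is why the reference vectors ("3eab2d5fc9681074" and friends) are shared between test_keep_client.py below and root_sorter_test.go above. A standalone sketch that recomputes the first vector under the tests' assumptions (16 services with UUID suffixes 0..f, block content of 64 zero characters):

package main

import (
    "crypto/md5"
    "fmt"
    "sort"
)

func md5String(s string) string {
    return fmt.Sprintf("%x", md5.Sum([]byte(s)))
}

func main() {
    blockHash := md5String(fmt.Sprintf("%064x", 0))
    ids := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
    weight := func(id int) string {
        uuid := fmt.Sprintf("zzzzz-bi6l4-%015x", id)
        return md5String(blockHash + uuid[12:]) // last 15 characters
    }
    sort.Slice(ids, func(i, j int) bool { return weight(ids[i]) > weight(ids[j]) })
    order := ""
    for _, id := range ids {
        order += fmt.Sprintf("%x", id)
    }
    fmt.Println(order) // the tests expect 3eab2d5fc9681074
}
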
index 16244025994a001cba4d988c9ffe221948235098..739c75499509fabb24312bdd6c87a9cb5d8f48e8 100644 (file)
@@ -178,8 +178,18 @@ def run_keep(blob_signing_key=None, enforce_permissions=False):
     for d in api.keep_disks().list().execute()['items']:
         api.keep_disks().delete(uuid=d['uuid']).execute()
 
-    s1 = api.keep_services().create(body={"keep_service": {"service_host": "localhost",  "service_port": 25107, "service_type": "disk"} }).execute()
-    s2 = api.keep_services().create(body={"keep_service": {"service_host": "localhost",  "service_port": 25108, "service_type": "disk"} }).execute()
+    s1 = api.keep_services().create(body={"keep_service": {
+                "uuid": "zzzzz-bi6l4-5bo5n1iekkjyz6b",
+                "service_host": "localhost",
+                "service_port": 25107,
+                "service_type": "disk"
+                }}).execute()
+    s2 = api.keep_services().create(body={"keep_service": {
+                "uuid": "zzzzz-bi6l4-2nz60e0ksj7vr3s",
+                "service_host": "localhost",
+                "service_port": 25108,
+                "service_type": "disk"
+                }}).execute()
     api.keep_disks().create(body={"keep_disk": {"keep_service_uuid": s1["uuid"] } }).execute()
     api.keep_disks().create(body={"keep_disk": {"keep_service_uuid": s2["uuid"] } }).execute()
 
index c724efc1875329f5a7a78fef10af9dfca58a2674..982e4b44a8a1d457667eb6088d92c4f032a6fb82 100644 (file)
@@ -1,5 +1,7 @@
+import hashlib
 import mock
 import os
+import re
 import socket
 import unittest
 import urlparse
@@ -235,8 +237,8 @@ class KeepClientServiceTestCase(unittest.TestCase):
         api_client.keep_services().accessible().execute.return_value = {
             'items_available': len(services),
             'items': [{
-                    'uuid': 'zzzzz-bi6l4-mockservice{:04x}'.format(index),
-                    'owner_uuid': 'zzzzz-tpzed-mockownerabcdef',
+                    'uuid': 'zzzzz-bi6l4-{:015x}'.format(index),
+                    'owner_uuid': 'zzzzz-tpzed-000000000000000',
                     'service_host': host,
                     'service_port': port,
                     'service_ssl_flag': ssl,
@@ -246,10 +248,15 @@ class KeepClientServiceTestCase(unittest.TestCase):
             }
         return api_client
 
+    def mock_n_keep_disks(self, service_count):
+        return self.mock_keep_services(
+            *[("keep0x{:x}".format(index), 80, False, 'disk')
+              for index in range(service_count)])
+
     def get_service_roots(self, *services):
         api_client = self.mock_keep_services(*services)
         keep_client = arvados.KeepClient(api_client=api_client)
-        services = keep_client.shuffled_service_roots('000000')
+        services = keep_client.weighted_service_roots('000000')
         return [urlparse.urlparse(url) for url in sorted(services)]
 
     def test_ssl_flag_respected_in_roots(self):
@@ -323,6 +330,70 @@ class KeepClientServiceTestCase(unittest.TestCase):
                 arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
                 mock_request.call_args[1]['timeout'])
 
+    def test_probe_order_reference_set(self):
+        # expected_order[i] is the probe order for
+        # hash=md5(sprintf("%064x",i)) where there are 16 services
+        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
+        # the first probe for the block consisting of 64 "0"
+        # characters is the service whose uuid is
+        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
+        expected_order = [
+            list('3eab2d5fc9681074'),
+            list('097dba52e648f1c3'),
+            list('c5b4e023f8a7d691'),
+            list('9d81c02e76a3bf54'),
+            ]
+        hashes = [
+            hashlib.md5("{:064x}".format(x)).hexdigest()
+            for x in range(len(expected_order))]
+        api_client = self.mock_n_keep_disks(16)
+        keep_client = arvados.KeepClient(api_client=api_client)
+        for i, hash in enumerate(hashes):
+            roots = keep_client.weighted_service_roots(hash)
+            got_order = [
+                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
+                for root in roots]
+            self.assertEqual(expected_order[i], got_order)
+
+    def test_probe_waste_adding_one_server(self):
+        hashes = [
+            hashlib.md5("{:064x}".format(x)).hexdigest() for x in range(100)]
+        initial_services = 12
+        api_client = self.mock_n_keep_disks(initial_services)
+        keep_client = arvados.KeepClient(api_client=api_client)
+        probes_before = [
+            keep_client.weighted_service_roots(hash) for hash in hashes]
+        for added_services in range(1, 12):
+            api_client = self.mock_n_keep_disks(initial_services+added_services)
+            keep_client = arvados.KeepClient(api_client=api_client)
+            total_penalty = 0
+            for hash_index in range(len(hashes)):
+                probe_after = keep_client.weighted_service_roots(
+                    hashes[hash_index])
+                penalty = probe_after.index(probes_before[hash_index][0])
+                self.assertLessEqual(penalty, added_services)
+                total_penalty += penalty
+            # Average penalty per block should not exceed
+            # N(added)/N(orig) by more than 20%, and the tolerance
+            # tightens as more services are added.
+            expect_penalty = (
+                added_services *
+                len(hashes) / initial_services)
+            max_penalty = (
+                expect_penalty *
+                (120 - added_services)/100)
+            min_penalty = (
+                expect_penalty * 8/10)
+            self.assertTrue(
+                min_penalty <= total_penalty <= max_penalty,
+                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
+                    initial_services,
+                    added_services,
+                    len(hashes),
+                    total_penalty,
+                    min_penalty,
+                    max_penalty))
+
 
 class KeepClientRetryTestMixin(object):
     # Testing with a local Keep store won't exercise the retry behavior.
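
test_probe_waste_adding_one_server above pins down the property that makes weighted probing worth the migration: growing the cluster by one service should move only about 1/N of first probes, where the old shuffle could move nearly all of them. A standalone sketch of the same measurement (fake UUIDs as in the tests):

package main

import (
    "crypto/md5"
    "fmt"
)

func md5String(s string) string {
    return fmt.Sprintf("%x", md5.Sum([]byte(s)))
}

// firstChoice returns the id of the heaviest of n services for blockHash.
func firstChoice(n int, blockHash string) int {
    best, bestWeight := -1, ""
    for i := 0; i < n; i++ {
        uuid := fmt.Sprintf("zzzzz-bi6l4-%015x", i)
        if w := md5String(blockHash + uuid[12:]); w > bestWeight {
            best, bestWeight = i, w
        }
    }
    return best
}

func main() {
    moved := 0
    for x := 0; x < 100; x++ {
        blockHash := md5String(fmt.Sprintf("%064x", x))
        if firstChoice(12, blockHash) != firstChoice(13, blockHash) {
            moved++
        }
    }
    // Expect roughly 100/13, i.e. about 8, first probes to move when a
    // 13th service joins a 12-service cluster.
    fmt.Println("first probes moved:", moved)
}
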
index 864e60832f4e8312307fd628ba333ba3e64a2dec..6e01de9213b1d8329451602ff3ea8fdc996e5679 100644 (file)
@@ -2,7 +2,7 @@ class Job < ArvadosModel
   include HasUuid
   include KindAndEtag
   include CommonApiTemplate
-  attr_protected :docker_image_locator
+  attr_protected :arvados_sdk_version, :docker_image_locator
   serialize :script_parameters, Hash
   serialize :runtime_constraints, Hash
   serialize :tasks_summary, Hash
@@ -11,6 +11,7 @@ class Job < ArvadosModel
   before_validation :set_priority
   before_validation :update_state_from_old_state_attrs
   validate :ensure_script_version_is_commit
+  validate :find_arvados_sdk_version
   validate :find_docker_image_locator
   validate :validate_status
   validate :validate_state_change
@@ -45,6 +46,7 @@ class Job < ArvadosModel
     t.add :nondeterministic
     t.add :repository
     t.add :supplied_script_version
+    t.add :arvados_sdk_version
     t.add :docker_image_locator
     t.add :queue_position
     t.add :node_uuids
@@ -149,28 +151,43 @@ class Job < ArvadosModel
     true
   end
 
-  def find_docker_image_locator
-    # Find the Collection that holds the Docker image specified in the
-    # runtime constraints, and store its locator in docker_image_locator.
-    unless runtime_constraints.is_a? Hash
-      # We're still in validation stage, so we can't assume
-      # runtime_constraints isn't something horrible like an array or
-      # a string. Treat those cases as "no docker image supplied";
-      # other validations will fail anyway.
-      self.docker_image_locator = nil
-      return true
+  def resolve_runtime_constraint(key, attr_sym)
+    if ((runtime_constraints.is_a? Hash) and
+        (search = runtime_constraints[key]))
+      ok, result = yield search
+    else
+      ok, result = true, nil
     end
-    image_search = runtime_constraints['docker_image']
-    image_tag = runtime_constraints['docker_image_tag']
-    if image_search.nil?
-      self.docker_image_locator = nil
-      true
-    elsif coll = Collection.for_latest_docker_image(image_search, image_tag)
-      self.docker_image_locator = coll.portable_data_hash
-      true
+    if ok
+      send("#{attr_sym}=".to_sym, result)
     else
-      errors.add(:docker_image_locator, "not found for #{image_search}")
-      false
+      errors.add(attr_sym, result)
+    end
+    ok
+  end
+
+  def find_arvados_sdk_version
+    resolve_runtime_constraint("arvados_sdk_version",
+                               :arvados_sdk_version) do |git_search|
+      commits = Commit.find_commit_range(current_user, "arvados",
+                                         nil, git_search, nil)
+      if commits.andand.any?
+        [true, commits.first]
+      else
+        [false, "#{git_search} does not resolve to a commit"]
+      end
+    end
+  end
+
+  def find_docker_image_locator
+    resolve_runtime_constraint("docker_image",
+                               :docker_image_locator) do |image_search|
+      image_tag = runtime_constraints['docker_image_tag']
+      if coll = Collection.for_latest_docker_image(image_search, image_tag)
+        [true, coll.portable_data_hash]
+      else
+        [false, "not found for #{image_search}"]
+      end
     end
   end
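
resolve_runtime_constraint factors out the shared shape of the two validators: look up a constraint, hand it to a block that yields either a resolved value or an error message, then assign the attribute or record the error. A rough Go analogue of that shape (hypothetical names; the API server itself is Ruby, and the commit hash below is just the SDK_MASTER fixture value from the tests):

package main

import (
    "errors"
    "fmt"
)

// resolveConstraint mirrors the Ruby helper: a missing key resolves to
// the zero value; a present key resolves to the callback's value or error.
func resolveConstraint(constraints map[string]string, key string,
    resolve func(string) (string, error)) (string, error) {
    if search, ok := constraints[key]; ok {
        return resolve(search)
    }
    return "", nil
}

func main() {
    constraints := map[string]string{"arvados_sdk_version": "master"}
    version, err := resolveConstraint(constraints, "arvados_sdk_version",
        func(search string) (string, error) {
            if search == "master" { // stand-in for a real commit lookup
                return "ca68b24e51992e790f29df5cc4bc54ce1da4a1c2", nil
            }
            return "", errors.New(search + " does not resolve to a commit")
        })
    fmt.Println(version, err)
}
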
 
diff --git a/services/api/db/migrate/20141111133038_add_arvados_sdk_version_to_jobs.rb b/services/api/db/migrate/20141111133038_add_arvados_sdk_version_to_jobs.rb
new file mode 100644 (file)
index 0000000..214919d
--- /dev/null
@@ -0,0 +1,13 @@
+class AddArvadosSdkVersionToJobs < ActiveRecord::Migration
+  def up
+    change_table :jobs do |t|
+      t.column :arvados_sdk_version, :string
+    end
+  end
+
+  def down
+    change_table :jobs do |t|
+      t.remove :arvados_sdk_version
+    end
+  end
+end
index 48bc57ab97e2e1da8b9eb52e5ca01b987ad2fe56..2eca2caf16abcfec8cde761b639f931e3dc8cb42 100644 (file)
@@ -432,7 +432,8 @@ CREATE TABLE jobs (
     docker_image_locator character varying(255),
     priority integer DEFAULT 0 NOT NULL,
     description text,
-    state character varying(255)
+    state character varying(255),
+    arvados_sdk_version character varying(255)
 );
 
 
@@ -2029,4 +2030,6 @@ INSERT INTO schema_migrations (version) VALUES ('20140918153541');
 
 INSERT INTO schema_migrations (version) VALUES ('20140918153705');
 
-INSERT INTO schema_migrations (version) VALUES ('20140924091559');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20140924091559');
+
+INSERT INTO schema_migrations (version) VALUES ('20141111133038');
\ No newline at end of file
index 8d5a9d21ee20d39bb9673d5e4671dfbdc97e3a2e..3f1a3b223a851f46c171a58d5b182a6790df12f1 100644 (file)
@@ -6,9 +6,12 @@
 #   @where, @filters, @limit, @offset, @orders
 module LoadParam
 
-  # Default limit on number of rows to return in a single query.
+  # Default number of rows to return in a single query.
   DEFAULT_LIMIT = 100
 
+  # Maximum number of rows to return in a single query, even if the client asks for more.
+  MAX_LIMIT = 1000
+
   # Load params[:where] into @where
   def load_where_param
     if params[:where].nil? or params[:where] == ""
@@ -55,7 +58,7 @@ module LoadParam
       unless params[:limit].to_s.match(/^\d+$/)
         raise ArgumentError.new("Invalid value for limit parameter")
       end
-      @limit = params[:limit].to_i
+      @limit = [params[:limit].to_i, MAX_LIMIT].min
     else
       @limit = DEFAULT_LIMIT
     end
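
MAX_LIMIT turns the limit parameter into the usual defensive paging clamp: validate, parse, then take the smaller of the client's request and the server-side cap. A tiny sketch of the rule (constants copied from this file; the function name is illustrative):

package main

import "fmt"

const (
    defaultLimit = 100  // rows returned when the client sends no limit
    maxLimit     = 1000 // hard cap, applied even if the client asks for more
)

func effectiveLimit(requested int, haveParam bool) int {
    if !haveParam {
        return defaultLimit
    }
    if requested > maxLimit {
        return maxLimit
    }
    return requested
}

func main() {
    fmt.Println(effectiveLimit(0, false), effectiveLimit(50, true), effectiveLimit(99999, true))
}
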
index 84ac316d0e593ffbca008fcd57aa4677f5471652..f668cbcd29ec42f39b23233390dfd2ecbd818fa8 100644 (file)
@@ -1,7 +1,7 @@
 keep0:
   uuid: zzzzz-bi6l4-6zhilxar6r8ey90
   owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
-  service_host: keep0.qr1hi.arvadosapi.com
+  service_host: keep0.zzzzz.arvadosapi.com
   service_port: 25107
   service_ssl_flag: false
   service_type: disk
@@ -9,7 +9,7 @@ keep0:
 keep1:
   uuid: zzzzz-bi6l4-rsnj3c76ndxb7o0
   owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
-  service_host: keep1.qr1hi.arvadosapi.com
+  service_host: keep1.zzzzz.arvadosapi.com
   service_port: 25107
   service_ssl_flag: false
   service_type: disk
@@ -17,7 +17,7 @@ keep1:
 proxy:
   uuid: zzzzz-bi6l4-h0a0xwut9qa6g3a
   owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
-  service_host: keep.qr1hi.arvadosapi.com
+  service_host: keep.zzzzz.arvadosapi.com
   service_port: 25333
   service_ssl_flag: true
   service_type: proxy
index 0d9b06a7250051678f3142174c3aa02404c61c03..4d576a25c14ab73f96989564ac0948811071e9b0 100644 (file)
@@ -250,6 +250,20 @@ runningbarbaz_job_readable_by_spectator:
   head_uuid: zzzzz-8i9sb-cjs4pklxxjykyuj
   properties: {}
 
+arvados_repository_readable_by_all_users:
+  uuid: zzzzz-o0j2j-allcanreadarvrp
+  owner_uuid: zzzzz-tpzed-000000000000000
+  created_at: 2014-01-24 20:42:26 -0800
+  modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+  modified_by_user_uuid: zzzzz-tpzed-000000000000000
+  modified_at: 2014-01-24 20:42:26 -0800
+  updated_at: 2014-01-24 20:42:26 -0800
+  tail_uuid: zzzzz-j7d0g-fffffffffffffff
+  link_class: permission
+  name: can_read
+  head_uuid: zzzzz-s0uqq-arvadosrepo0123
+  properties: {}
+
 foo_repository_readable_by_spectator:
   uuid: zzzzz-o0j2j-cpy7p41hpk5xxx
   owner_uuid: zzzzz-tpzed-000000000000000
index 11557e96d1b27c0a6092beb850a50efd9d44d404..8579cf66a2dfd453206d807639e7052cb328f952 100644 (file)
@@ -127,3 +127,41 @@ pipeline_template_in_fuse_project:
           title: "default input"
           description: "input collection"
 
+template_with_dataclass_file:
+  uuid: zzzzz-p5p6p-k0xoa0ofxrystgw
+  owner_uuid: zzzzz-j7d0g-v955i6s2oi1cbso
+  created_at: 2014-04-14 12:35:04 -0400
+  updated_at: 2014-04-14 12:35:04 -0400
+  modified_at: 2014-04-14 12:35:04 -0400
+  modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+  modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  name: Two Part Template with dataclass File
+  components:
+    part-one:
+      script: foo
+      script_version: master
+      script_parameters:
+        input:
+          required: true
+          dataclass: File
+          title: "Foo/bar pair"
+          description: "Provide an input file"
+    part-two:
+      script: bar
+      script_version: master
+      script_parameters:
+        input:
+          output_of: part-one
+        integer_with_default:
+          default: 123
+        integer_with_value:
+          value: 123
+        string_with_default:
+          default: baz
+        string_with_value:
+          value: baz
+        plain_string: qux
+        array_with_default: # important to test repeating values in the array!
+          default: [1,1,2,3,5]
+        array_with_value: # important to test repeating values in the array!
+          value: [1,1,2,3,5]
index e173bf5463216e009ae60c896ec9570c2af763b6..5775f8ac80759be42b0ade7240d1d029a52bd4f5 100644 (file)
@@ -3,6 +3,11 @@ crunch_dispatch_test:
   owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz # active user
   name: crunch_dispatch_test
 
+arvados:
+  uuid: zzzzz-s0uqq-arvadosrepo0123
+  owner_uuid: zzzzz-tpzed-000000000000000 # root
+  name: arvados
+
 foo:
   uuid: zzzzz-s0uqq-382brsig8rp3666
   owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz # active user
index f7f99d1bcfc6d6c614160bc7b94b19af807e7397..ceaebffb2305b2255b556c7c28519606f384910a 100644 (file)
@@ -25,7 +25,8 @@ class Arvados::V1::CommitsControllerTest < ActionController::TestCase
   #test "test_branch1" do
     # complains "fatal: bad object 077ba2ad3ea24a929091a9e6ce545c93199b8e57"
     a = Commit.find_commit_range(users(:active), nil, nil, 'master', nil)
-    assert_equal ['f35f99b7d32bac257f5989df02b9f12ee1a9b0d6', '077ba2ad3ea24a929091a9e6ce545c93199b8e57'], a
+    assert_includes(a, 'f35f99b7d32bac257f5989df02b9f12ee1a9b0d6')
+    assert_includes(a, '077ba2ad3ea24a929091a9e6ce545c93199b8e57')
 
   #test "test_branch2" do
     a = Commit.find_commit_range(users(:active), 'foo', nil, 'b1', nil)
index 654adade5da3626a470dad9f771e0ce635109419..c974076c6fc15610e2e062e3b89dc926375f7355 100644 (file)
@@ -131,35 +131,44 @@ class Arvados::V1::GroupsControllerTest < ActionController::TestCase
     assert_includes ids, collections(:baz_file_in_asubproject).uuid
   end
 
-  test "user with project read permission can sort project collections ascending, ignoring case" do
-    authorize_with :project_viewer
-    get :contents, {
-      id: groups(:asubproject).uuid,
-      format: :json,
-      filters: [['uuid', 'is_a', "arvados#collection"]],
-      order: 'collections.name asc'
-    }
-    sorted_entries = json_response['items'].collect { |item| item["name"].downcase }
-    previous = nil
-    sorted_entries.each do |entry|
-      assert_operator( previous, :<=, entry) if previous
-      previous = entry
-    end
-  end
-
-  test "user with project read permission can sort project collections descending, ignoring case" do
-    authorize_with :project_viewer
-    get :contents, {
-      id: groups(:asubproject).uuid,
-      format: :json,
-      filters: [['uuid', 'is_a', "arvados#collection"]],
-      order: 'collections.name desc'
-    }
-    sorted_entries = json_response['items'].collect { |item| item["name"].downcase }
-    previous = nil
-    sorted_entries.each do |entry|
-      assert_operator( previous, :>=, entry) if previous
-      previous = entry
+  [['asc', :<=],
+   ['desc', :>=]].each do |order, operator|
+    test "user with project read permission can sort project collections #{order}" do
+      authorize_with :project_viewer
+      get :contents, {
+        id: groups(:asubproject).uuid,
+        format: :json,
+        filters: [['uuid', 'is_a', "arvados#collection"]],
+        order: "collections.name #{order}"
+      }
+      sorted_names = json_response['items'].collect { |item| item["name"] }
+      # Here we avoid assuming too much about the database
+      # collation. Both "alice"<"Bob" and "alice">"Bob" can be
+      # correct. Hopefully it _is_ safe to assume that if "a" comes
+      # before "b" in the ascii alphabet, "aX">"bY" is never true for
+      # any strings X and Y.
+      reliably_sortable_names = sorted_names.select do |name|
+        name[0] >= 'a' and name[0] <= 'z'
+      end.uniq do |name|
+        name[0]
+      end
+      # Preserve order of sorted_names. But do not use &=. If
+      # sorted_names has out-of-order duplicates, we want to preserve
+      # them here, so we can detect them and fail the test below.
+      sorted_names.select! do |name|
+        reliably_sortable_names.include? name
+      end
+      actually_checked_anything = false
+      previous = nil
+      sorted_names.each do |entry|
+        if previous
+          assert_operator(previous, operator, entry,
+                          "Entries sorted incorrectly.")
+          actually_checked_anything = true
+        end
+        previous = entry
+      end
+      assert actually_checked_anything, "Didn't even find two names to compare."
     end
   end
 
index 0793d12e4e5103c0298f6b96122a9558d489677d..5304bcafc531bc1138c8c17071fd2793d71aff1c 100644 (file)
@@ -43,17 +43,18 @@ class Arvados::V1::RepositoriesControllerTest < ActionController::TestCase
   end
 
   test "get_all_permissions does not give any access to user without permission" do
+    viewer_uuid = users(:project_viewer).uuid
+    assert_equal(authorized_keys(:project_viewer).authorized_user_uuid,
+                 viewer_uuid,
+                 "project_viewer must have an authorized_key for this test to work")
     authorize_with :admin
     get :get_all_permissions
     assert_response :success
-    assert_equal(authorized_keys(:project_viewer).authorized_user_uuid,
-                 users(:project_viewer).uuid,
-                 "project_viewer must have an authorized_key for this test to work")
-    json_response['repositories'].each do |repo|
-      assert_equal(false,
-                   repo['user_permissions'].has_key?(users(:project_viewer).uuid),
-                   "project_viewer user should not have perms for #{repo['uuid']}")
+    readable_repos = json_response["repositories"].select do |repo|
+      repo["user_permissions"].has_key?(viewer_uuid)
     end
+    assert_equal(["arvados"], readable_repos.map { |r| r["name"] },
+                 "project_viewer should only have permissions on public repos")
   end
 
   test "get_all_permissions gives gitolite R to user with read-only access" do
index 39e506f54585da3953a336442e1d18365fa90f2a..67e99c18dcfb25340153ace9e4d29674af17bb16 100644 (file)
@@ -15,15 +15,13 @@ module GitTestHelper
   def self.included base
     base.setup do
       @tmpdir = Dir.mktmpdir()
-      `cp test/test.git.tar #{@tmpdir} && cd #{@tmpdir} && tar xf test.git.tar`
-      @orig_git_repositories_dir = Rails.configuration.git_repositories_dir
+      system("tar", "-xC", @tmpdir, "-f", "test/test.git.tar")
       Rails.configuration.git_repositories_dir = "#{@tmpdir}/test"
       Commit.refresh_repositories
     end
 
     base.teardown do
       FileUtils.remove_entry @tmpdir, true
-      Rails.configuration.git_repositories_dir = @orig_git_repositories_dir
       Commit.refresh_repositories
     end
   end
index d4155c2c4a6e39b88aa22b5100ec458634c70bfe..aacda517f24b8e5433f599f9e86306b3c7aaad46 100644 (file)
@@ -6,20 +6,23 @@ class KeepProxyTest < ActionDispatch::IntegrationTest
     assert_response :success
     services = json_response['items']
 
-    assert_equal 2, services.length
-    assert_equal 'disk', services[0]['service_type']
-    assert_equal 'disk', services[1]['service_type']
+    assert_operator 2, :<=, services.length
+    services.each do |service|
+      assert_equal 'disk', service['service_type']
+    end
+  end
 
+  test "request keep proxy" do
     get "/arvados/v1/keep_services/accessible", {:format => :json}, auth(:active).merge({'HTTP_X_EXTERNAL_CLIENT' => '1'})
     assert_response :success
     services = json_response['items']
 
     assert_equal 1, services.length
 
-    assert_equal "zzzzz-bi6l4-h0a0xwut9qa6g3a", services[0]['uuid']
-    assert_equal "keep.qr1hi.arvadosapi.com", services[0]['service_host']
-    assert_equal 25333, services[0]['service_port']
-    assert_equal true, services[0]['service_ssl_flag']
+    assert_equal keep_services(:proxy).uuid, services[0]['uuid']
+    assert_equal keep_services(:proxy).service_host, services[0]['service_host']
+    assert_equal keep_services(:proxy).service_port, services[0]['service_port']
+    assert_equal keep_services(:proxy).service_ssl_flag, services[0]['service_ssl_flag']
     assert_equal 'proxy', services[0]['service_type']
   end
 end
index 6845160c47215c4d021bc57a6cf0182a2c74a114..ae466016a3ab658bf133bcb3d5f0ce9358900a05 100644 (file)
Binary files a/services/api/test/test.git.tar and b/services/api/test/test.git.tar differ
index cf1e26c17fe37bd9a1ef7c65a8e54b0d01966af8..32f9e3e9c9296d80cbe2c21c47fe0a1dde2fb4bf 100644 (file)
@@ -110,24 +110,35 @@ class JobTest < ActiveSupport::TestCase
     assert_equal(image_uuid, job.docker_image_locator)
   end
 
-  test "can't create Job with Docker image locator" do
+  def check_attrs_unset(job, attrs)
+    assert_empty(attrs.each_key.map { |key| job.send(key) }.compact,
+                 "job has values for #{attrs.keys}")
+  end
+
+  def check_creation_prohibited(attrs)
     begin
-      job = Job.new job_attrs(docker_image_locator: BAD_COLLECTION)
+      job = Job.new(job_attrs(attrs))
     rescue ActiveModel::MassAssignmentSecurity::Error
       # Test passes - expected attribute protection
     else
-      assert_nil job.docker_image_locator
+      check_attrs_unset(job, attrs)
     end
   end
 
-  test "can't assign Docker image locator to Job" do
-    job = Job.new job_attrs
-    begin
-      Job.docker_image_locator = BAD_COLLECTION
-    rescue NoMethodError
-      # Test passes - expected attribute protection
+  def check_modification_prohibited(attrs)
+    job = Job.new(job_attrs)
+    attrs.each_pair do |key, value|
+      assert_raises(NoMethodError) { job.send("#{key}=".to_sym, value) }
     end
-    assert_nil job.docker_image_locator
+    check_attrs_unset(job, attrs)
+  end
+
+  test "can't create Job with Docker image locator" do
+    check_creation_prohibited(docker_image_locator: BAD_COLLECTION)
+  end
+
+  test "can't assign Docker image locator to Job" do
+    check_modification_prohibited(docker_image_locator: BAD_COLLECTION)
   end
 
   [
@@ -281,4 +292,57 @@ class JobTest < ActiveSupport::TestCase
     assert_not_equal job1.queue_position, job2.queue_position
   end
 
+  SDK_MASTER = "ca68b24e51992e790f29df5cc4bc54ce1da4a1c2"
+  SDK_TAGGED = "00634b2b8a492d6f121e3cf1d6587b821136a9a7"
+
+  def sdk_constraint(version)
+    {runtime_constraints: {"arvados_sdk_version" => version}}
+  end
+
+  def check_job_sdk_version(expected)
+    job = yield
+    if expected.nil?
+      refute(job.valid?, "job valid with bad Arvados SDK version")
+    else
+      assert(job.valid?, "job not valid with good Arvados SDK version")
+      assert_equal(expected, job.arvados_sdk_version)
+    end
+  end
+
+  { "master" => SDK_MASTER,
+    "commit2" => SDK_TAGGED,
+    SDK_TAGGED[0, 8] => SDK_TAGGED,
+    "__nonexistent__" => nil,
+  }.each_pair do |search, commit_hash|
+    test "creating job with SDK version '#{search}'" do
+      check_job_sdk_version(commit_hash) do
+        Job.new(job_attrs(sdk_constraint(search)))
+      end
+    end
+
+    test "updating job with SDK version '#{search}'" do
+      job = Job.create!(job_attrs)
+      assert_nil job.arvados_sdk_version
+      check_job_sdk_version(commit_hash) do
+        job.runtime_constraints = sdk_constraint(search)[:runtime_constraints]
+        job
+      end
+    end
+  end
+
+  test "clear the SDK version" do
+    job = Job.create!(job_attrs(sdk_constraint("master")))
+    assert_equal(SDK_MASTER, job.arvados_sdk_version)
+    job.runtime_constraints = {}
+    assert(job.valid?, "job invalid after clearing SDK version")
+    assert_nil(job.arvados_sdk_version)
+  end
+
+  test "can't create job with SDK version assigned directly" do
+    check_creation_prohibited(arvados_sdk_version: SDK_MASTER)
+  end
+
+  test "can't modify job to assign SDK version directly" do
+    check_modification_prohibited(arvados_sdk_version: SDK_MASTER)
+  end
 end
index f6d163c1f19fe7449adb764f7d98f728438b3fa8..88ac8a6a1dc1ec3c4e6cbb055ed40faaa9c63cc7 100644 (file)
@@ -143,11 +143,14 @@ func runProxy(c *C, args []string, token string, port int) keepclient.KeepClient
        os.Setenv("ARVADOS_KEEP_PROXY", fmt.Sprintf("http://localhost:%v", port))
        os.Setenv("ARVADOS_API_TOKEN", token)
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Assert(err, Equals, nil)
        kc, err := keepclient.MakeKeepClient(&arv)
+       c.Assert(err, Equals, nil)
        c.Check(kc.Using_proxy, Equals, true)
        c.Check(len(kc.ServiceRoots()), Equals, 1)
-       c.Check(kc.ServiceRoots()[0], Equals, fmt.Sprintf("http://localhost:%v", port))
-       c.Check(err, Equals, nil)
+       for _, root := range kc.ServiceRoots() {
+               c.Check(root, Equals, fmt.Sprintf("http://localhost:%v", port))
+       }
        os.Setenv("ARVADOS_KEEP_PROXY", "")
        log.Print("keepclient created")
        return kc
@@ -165,12 +168,15 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
 
        os.Setenv("ARVADOS_EXTERNAL_CLIENT", "true")
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Assert(err, Equals, nil)
        kc, err := keepclient.MakeKeepClient(&arv)
+       c.Assert(err, Equals, nil)
        c.Check(kc.Arvados.External, Equals, true)
        c.Check(kc.Using_proxy, Equals, true)
        c.Check(len(kc.ServiceRoots()), Equals, 1)
-       c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:29950")
-       c.Check(err, Equals, nil)
+       for _, root := range kc.ServiceRoots() {
+               c.Check(root, Equals, "http://localhost:29950")
+       }
        os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
        log.Print("keepclient created")
 
index 63effe9d37bd1150380c5723f02ca15db7028c0b..4955992faa4d7cc2da43e2d39b940615a1c63710 100644 (file)
@@ -2,16 +2,9 @@
 
 from __future__ import absolute_import, print_function
 
-import functools
 import itertools
-import logging
 import time
 
-import pykka
-
-from ..clientactor import _notify_subscribers
-from .. import config
-
 def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
     hostname = arvados_node.get('hostname') or default_hostname
     return '{}.{}'.format(hostname, arvados_node['domain'])
@@ -23,252 +16,6 @@ def arvados_node_mtime(node):
 def timestamp_fresh(timestamp, fresh_time):
     return (time.time() - timestamp) < fresh_time
 
-class BaseComputeNodeDriver(object):
-    """Abstract base class for compute node drivers.
-
-    libcloud abstracts away many of the differences between cloud providers,
-    but managing compute nodes requires some cloud-specific features (e.g.,
-    on EC2 we use tags to identify compute nodes).  Compute node drivers
-    are responsible for translating the node manager's cloud requests to a
-    specific cloud's vocabulary.
-
-    Subclasses must implement arvados_create_kwargs (to update node
-    creation kwargs with information about the specific Arvados node
-    record), sync_node, and node_start_time.
-    """
-    def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
-        self.real = driver_class(**auth_kwargs)
-        self.list_kwargs = list_kwargs
-        self.create_kwargs = create_kwargs
-
-    def __getattr__(self, name):
-        # Proxy non-extension methods to the real driver.
-        if (not name.startswith('_') and not name.startswith('ex_')
-              and hasattr(self.real, name)):
-            return getattr(self.real, name)
-        else:
-            return super(BaseComputeNodeDriver, self).__getattr__(name)
-
-    def search_for(self, term, list_method, key=lambda item: item.id):
-        cache_key = (list_method, term)
-        if cache_key not in self.SEARCH_CACHE:
-            results = [item for item in getattr(self.real, list_method)()
-                       if key(item) == term]
-            count = len(results)
-            if count != 1:
-                raise ValueError("{} returned {} results for '{}'".format(
-                        list_method, count, term))
-            self.SEARCH_CACHE[cache_key] = results[0]
-        return self.SEARCH_CACHE[cache_key]
-
-    def list_nodes(self):
-        return self.real.list_nodes(**self.list_kwargs)
-
-    def arvados_create_kwargs(self, arvados_node):
-        raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
-
-    def create_node(self, size, arvados_node):
-        kwargs = self.create_kwargs.copy()
-        kwargs.update(self.arvados_create_kwargs(arvados_node))
-        kwargs['size'] = size
-        return self.real.create_node(**kwargs)
-
-    def sync_node(self, cloud_node, arvados_node):
-        # When a compute node first pings the API server, the API server
-        # will automatically assign some attributes on the corresponding
-        # node record, like hostname.  This method should propagate that
-        # information back to the cloud node appropriately.
-        raise NotImplementedError("BaseComputeNodeDriver.sync_node")
-
-    @classmethod
-    def node_start_time(cls, node):
-        raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
-
-
-ComputeNodeDriverClass = BaseComputeNodeDriver
-
-class ComputeNodeStateChangeBase(config.actor_class):
-    """Base class for actors that change a compute node's state.
-
-    This base class takes care of retrying changes and notifying
-    subscribers when the change is finished.
-    """
-    def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
-        super(ComputeNodeStateChangeBase, self).__init__()
-        self._later = self.actor_ref.proxy()
-        self._timer = timer_actor
-        self._logger = logging.getLogger(logger_name)
-        self.min_retry_wait = retry_wait
-        self.max_retry_wait = max_retry_wait
-        self.retry_wait = retry_wait
-        self.subscribers = set()
-
-    @staticmethod
-    def _retry(errors):
-        """Retry decorator for an actor method that makes remote requests.
-
-        Use this function to decorate an actor method, and pass in a
-        tuple of exceptions to catch.  This decorator will schedule
-        retries of that method with exponential backoff if the
-        original method raises any of the given errors.
-        """
-        def decorator(orig_func):
-            @functools.wraps(orig_func)
-            def wrapper(self, *args, **kwargs):
-                try:
-                    orig_func(self, *args, **kwargs)
-                except errors as error:
-                    self._logger.warning(
-                        "Client error: %s - waiting %s seconds",
-                        error, self.retry_wait)
-                    self._timer.schedule(self.retry_wait,
-                                         getattr(self._later,
-                                                 orig_func.__name__),
-                                         *args, **kwargs)
-                    self.retry_wait = min(self.retry_wait * 2,
-                                          self.max_retry_wait)
-                else:
-                    self.retry_wait = self.min_retry_wait
-            return wrapper
-        return decorator
-
-    def _finished(self):
-        _notify_subscribers(self._later, self.subscribers)
-        self.subscribers = None
-
-    def subscribe(self, subscriber):
-        if self.subscribers is None:
-            try:
-                subscriber(self._later)
-            except pykka.ActorDeadError:
-                pass
-        else:
-            self.subscribers.add(subscriber)
-
-
-class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
-    """Actor to create and set up a cloud compute node.
-
-    This actor prepares an Arvados node record for a new compute node
-    (either creating one or cleaning one passed in), then boots the
-    actual compute node.  It notifies subscribers when the cloud node
-    is successfully created (the last step in the process for Node
-    Manager to handle).
-    """
-    def __init__(self, timer_actor, arvados_client, cloud_client,
-                 cloud_size, arvados_node=None,
-                 retry_wait=1, max_retry_wait=180):
-        super(ComputeNodeSetupActor, self).__init__(
-            'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
-        self._arvados = arvados_client
-        self._cloud = cloud_client
-        self.cloud_size = cloud_size
-        self.arvados_node = None
-        self.cloud_node = None
-        if arvados_node is None:
-            self._later.create_arvados_node()
-        else:
-            self._later.prepare_arvados_node(arvados_node)
-
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
-    def create_arvados_node(self):
-        self.arvados_node = self._arvados.nodes().create(body={}).execute()
-        self._later.create_cloud_node()
-
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
-    def prepare_arvados_node(self, node):
-        self.arvados_node = self._arvados.nodes().update(
-            uuid=node['uuid'],
-            body={'hostname': None,
-                  'ip_address': None,
-                  'slot_number': None,
-                  'first_ping_at': None,
-                  'last_ping_at': None,
-                  'info': {'ec2_instance_id': None,
-                           'last_action': "Prepared by Node Manager"}}
-            ).execute()
-        self._later.create_cloud_node()
-
-    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
-    def create_cloud_node(self):
-        self._logger.info("Creating cloud node with size %s.",
-                          self.cloud_size.name)
-        self.cloud_node = self._cloud.create_node(self.cloud_size,
-                                                  self.arvados_node)
-        self._logger.info("Cloud node %s created.", self.cloud_node.id)
-        self._finished()
-
-    def stop_if_no_cloud_node(self):
-        if self.cloud_node is None:
-            self.stop()
-
-
-class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
-    """Actor to shut down a compute node.
-
-    This actor simply destroys a cloud node, retrying as needed.
-    """
-    def __init__(self, timer_actor, cloud_client, cloud_node,
-                 retry_wait=1, max_retry_wait=180):
-        super(ComputeNodeShutdownActor, self).__init__(
-            'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
-        self._cloud = cloud_client
-        self.cloud_node = cloud_node
-        self._later.shutdown_node()
-
-    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
-    def shutdown_node(self):
-        self._cloud.destroy_node(self.cloud_node)
-        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
-        self._finished()
-
-
-class ComputeNodeUpdateActor(config.actor_class):
-    """Actor to dispatch one-off cloud management requests.
-
-    This actor receives requests for small cloud updates, and
-    dispatches them to a real driver.  ComputeNodeMonitorActors use
-    this to perform maintenance tasks on themselves.  Having a
-    dedicated actor for this gives us the opportunity to control the
-    flow of requests; e.g., by backing off when errors occur.
-
-    This actor is most like a "traditional" Pykka actor: there's no
-    subscribing, but instead methods return real driver results.  If
-    you're interested in those results, you should get them from the
-    Future that the proxy method returns.  Be prepared to handle exceptions
-    from the cloud driver when you do.
-    """
-    def __init__(self, cloud_factory, max_retry_wait=180):
-        super(ComputeNodeUpdateActor, self).__init__()
-        self._cloud = cloud_factory()
-        self.max_retry_wait = max_retry_wait
-        self.error_streak = 0
-        self.next_request_time = time.time()
-
-    def _throttle_errors(orig_func):
-        @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
-            throttle_time = self.next_request_time - time.time()
-            if throttle_time > 0:
-                time.sleep(throttle_time)
-            self.next_request_time = time.time()
-            try:
-                result = orig_func(self, *args, **kwargs)
-            except config.CLOUD_ERRORS:
-                self.error_streak += 1
-                self.next_request_time += min(2 ** self.error_streak,
-                                              self.max_retry_wait)
-                raise
-            else:
-                self.error_streak = 0
-                return result
-        return wrapper
-
-    @_throttle_errors
-    def sync_node(self, cloud_node, arvados_node):
-        return self._cloud.sync_node(cloud_node, arvados_node)
-
-
 class ShutdownTimer(object):
     """Keep track of a cloud node's shutdown windows.
 
@@ -309,103 +56,3 @@ class ShutdownTimer(object):
     def window_open(self):
         self._advance_opening()
         return 0 < (time.time() - self._open_start) < self._open_for
-
-
-class ComputeNodeMonitorActor(config.actor_class):
-    """Actor to manage a running compute node.
-
-    This actor gets updates about a compute node's cloud and Arvados records.
-    It uses this information to notify subscribers when the node is eligible
-    for shutdown.
-    """
-    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
-                 timer_actor, update_actor, arvados_node=None,
-                 poll_stale_after=600, node_stale_after=3600):
-        super(ComputeNodeMonitorActor, self).__init__()
-        self._later = self.actor_ref.proxy()
-        self._logger = logging.getLogger('arvnodeman.computenode')
-        self._last_log = None
-        self._shutdowns = shutdown_timer
-        self._timer = timer_actor
-        self._update = update_actor
-        self.cloud_node = cloud_node
-        self.cloud_node_start_time = cloud_node_start_time
-        self.poll_stale_after = poll_stale_after
-        self.node_stale_after = node_stale_after
-        self.subscribers = set()
-        self.arvados_node = None
-        self._later.update_arvados_node(arvados_node)
-        self.last_shutdown_opening = None
-        self._later.consider_shutdown()
-
-    def subscribe(self, subscriber):
-        self.subscribers.add(subscriber)
-
-    def _debug(self, msg, *args):
-        if msg == self._last_log:
-            return
-        self._last_log = msg
-        self._logger.debug(msg, *args)
-
-    def in_state(self, *states):
-        # Return a boolean to say whether or not our Arvados node record is in
-        # one of the given states.  If state information is not
-        # available--because this node has no Arvados record, the record is
-        # stale, or the record has no state information--return None.
-        if (self.arvados_node is None) or not timestamp_fresh(
-              arvados_node_mtime(self.arvados_node), self.node_stale_after):
-            return None
-        state = self.arvados_node['info'].get('slurm_state')
-        if not state:
-            return None
-        result = state in states
-        if state == 'idle':
-            result = result and not self.arvados_node['job_uuid']
-        return result
-
-    def _shutdown_eligible(self):
-        if self.arvados_node is None:
-            # If this is a new, unpaired node, it's eligible for
-            # shutdown--we figure there was an error during bootstrap.
-            return timestamp_fresh(self.cloud_node_start_time,
-                                   self.node_stale_after)
-        else:
-            return self.in_state('idle')
-
-    def consider_shutdown(self):
-        next_opening = self._shutdowns.next_opening()
-        if self._shutdowns.window_open():
-            if self._shutdown_eligible():
-                self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
-                _notify_subscribers(self._later, self.subscribers)
-            else:
-                self._debug("Node %s shutdown window open but node busy.",
-                            self.cloud_node.id)
-        else:
-            self._debug("Node %s shutdown window closed.  Next at %s.",
-                        self.cloud_node.id, time.ctime(next_opening))
-        if self.last_shutdown_opening != next_opening:
-            self._timer.schedule(next_opening, self._later.consider_shutdown)
-            self.last_shutdown_opening = next_opening
-
-    def offer_arvados_pair(self, arvados_node):
-        if self.arvados_node is not None:
-            return None
-        elif arvados_node['ip_address'] in self.cloud_node.private_ips:
-            self._later.update_arvados_node(arvados_node)
-            return self.cloud_node.id
-        else:
-            return None
-
-    def update_cloud_node(self, cloud_node):
-        if cloud_node is not None:
-            self.cloud_node = cloud_node
-            self._later.consider_shutdown()
-
-    def update_arvados_node(self, arvados_node):
-        if arvados_node is not None:
-            self.arvados_node = arvados_node
-            new_hostname = arvados_node_fqdn(self.arvados_node)
-            if new_hostname != self.cloud_node.name:
-                self._update.sync_node(self.cloud_node, self.arvados_node)
-            self._later.consider_shutdown()
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
new file mode 100644 (file)
index 0000000..d613ef1
--- /dev/null
@@ -0,0 +1,294 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import functools
+import logging
+import time
+
+import pykka
+
+from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
+from ...clientactor import _notify_subscribers
+from ... import config
+
+class ComputeNodeStateChangeBase(config.actor_class):
+    """Base class for actors that change a compute node's state.
+
+    This base class takes care of retrying changes and notifying
+    subscribers when the change is finished.
+    """
+    def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
+        super(ComputeNodeStateChangeBase, self).__init__()
+        self._later = self.actor_ref.proxy()
+        self._timer = timer_actor
+        self._logger = logging.getLogger(logger_name)
+        self.min_retry_wait = retry_wait
+        self.max_retry_wait = max_retry_wait
+        self.retry_wait = retry_wait
+        self.subscribers = set()
+
+    @staticmethod
+    def _retry(errors):
+        """Retry decorator for an actor method that makes remote requests.
+
+        Use this function to decorate an actor method, and pass in a
+        tuple of exceptions to catch.  This decorator will schedule
+        retries of that method with exponential backoff if the
+        original method raises any of the given errors.
+        """
+        def decorator(orig_func):
+            @functools.wraps(orig_func)
+            def wrapper(self, *args, **kwargs):
+                try:
+                    orig_func(self, *args, **kwargs)
+                except errors as error:
+                    self._logger.warning(
+                        "Client error: %s - waiting %s seconds",
+                        error, self.retry_wait)
+                    self._timer.schedule(self.retry_wait,
+                                         getattr(self._later,
+                                                 orig_func.__name__),
+                                         *args, **kwargs)
+                    self.retry_wait = min(self.retry_wait * 2,
+                                          self.max_retry_wait)
+                else:
+                    self.retry_wait = self.min_retry_wait
+            return wrapper
+        return decorator
+
+    def _finished(self):
+        _notify_subscribers(self._later, self.subscribers)
+        self.subscribers = None
+
+    def subscribe(self, subscriber):
+        if self.subscribers is None:
+            try:
+                subscriber(self._later)
+            except pykka.ActorDeadError:
+                pass
+        else:
+            self.subscribers.add(subscriber)
+
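+# Editor's illustration (not part of the original commit): with the defaults
+# retry_wait=1 and max_retry_wait=180, a decorated method that keeps failing
+# is rescheduled after 1, 2, 4, ..., 128, 180, 180, ... seconds:
+#
+#     wait = 1
+#     while still_failing:
+#         schedule_retry_in(wait)
+#         wait = min(wait * 2, 180)
+#
+# and a single success resets the wait to min_retry_wait.
+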
+
+class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
+    """Actor to create and set up a cloud compute node.
+
+    This actor prepares an Arvados node record for a new compute node
+    (either creating one or cleaning one passed in), then boots the
+    actual compute node.  It notifies subscribers when the cloud node
+    is successfully created (the last step in the process for Node
+    Manager to handle).
+    """
+    def __init__(self, timer_actor, arvados_client, cloud_client,
+                 cloud_size, arvados_node=None,
+                 retry_wait=1, max_retry_wait=180):
+        super(ComputeNodeSetupActor, self).__init__(
+            'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
+        self._arvados = arvados_client
+        self._cloud = cloud_client
+        self.cloud_size = cloud_size
+        self.arvados_node = None
+        self.cloud_node = None
+        if arvados_node is None:
+            self._later.create_arvados_node()
+        else:
+            self._later.prepare_arvados_node(arvados_node)
+
+    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    def create_arvados_node(self):
+        self.arvados_node = self._arvados.nodes().create(body={}).execute()
+        self._later.create_cloud_node()
+
+    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    def prepare_arvados_node(self, node):
+        self.arvados_node = self._arvados.nodes().update(
+            uuid=node['uuid'],
+            body={'hostname': None,
+                  'ip_address': None,
+                  'slot_number': None,
+                  'first_ping_at': None,
+                  'last_ping_at': None,
+                  'info': {'ec2_instance_id': None,
+                           'last_action': "Prepared by Node Manager"}}
+            ).execute()
+        self._later.create_cloud_node()
+
+    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+    def create_cloud_node(self):
+        self._logger.info("Creating cloud node with size %s.",
+                          self.cloud_size.name)
+        self.cloud_node = self._cloud.create_node(self.cloud_size,
+                                                  self.arvados_node)
+        self._logger.info("Cloud node %s created.", self.cloud_node.id)
+        self._finished()
+
+    def stop_if_no_cloud_node(self):
+        if self.cloud_node is None:
+            self.stop()
+
+
+class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
+    """Actor to shut down a compute node.
+
+    This actor simply destroys a cloud node, retrying as needed.
+    """
+    def __init__(self, timer_actor, cloud_client, cloud_node,
+                 retry_wait=1, max_retry_wait=180):
+        super(ComputeNodeShutdownActor, self).__init__(
+            'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
+        self._cloud = cloud_client
+        self.cloud_node = cloud_node
+        self._later.shutdown_node()
+
+    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+    def shutdown_node(self):
+        self._cloud.destroy_node(self.cloud_node)
+        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+        self._finished()
+
+
+class ComputeNodeUpdateActor(config.actor_class):
+    """Actor to dispatch one-off cloud management requests.
+
+    This actor receives requests for small cloud updates, and
+    dispatches them to a real driver.  ComputeNodeMonitorActors use
+    this to perform maintenance tasks on themselves.  Having a
+    dedicated actor for this gives us the opportunity to control the
+    flow of requests; e.g., by backing off when errors occur.
+
+    This actor is most like a "traditional" Pykka actor: there's no
+    subscribing, but instead methods return real driver results.  If
+    you're interested in those results, you should get them from the
+    Future that the proxy method returns.  Be prepared to handle exceptions
+    from the cloud driver when you do.
+    """
+    def __init__(self, cloud_factory, max_retry_wait=180):
+        super(ComputeNodeUpdateActor, self).__init__()
+        self._cloud = cloud_factory()
+        self.max_retry_wait = max_retry_wait
+        self.error_streak = 0
+        self.next_request_time = time.time()
+
+    def _throttle_errors(orig_func):
+        @functools.wraps(orig_func)
+        def wrapper(self, *args, **kwargs):
+            throttle_time = self.next_request_time - time.time()
+            if throttle_time > 0:
+                time.sleep(throttle_time)
+            self.next_request_time = time.time()
+            try:
+                result = orig_func(self, *args, **kwargs)
+            except config.CLOUD_ERRORS:
+                self.error_streak += 1
+                self.next_request_time += min(2 ** self.error_streak,
+                                              self.max_retry_wait)
+                raise
+            else:
+                self.error_streak = 0
+                return result
+        return wrapper
+
+    @_throttle_errors
+    def sync_node(self, cloud_node, arvados_node):
+        return self._cloud.sync_node(cloud_node, arvados_node)
+
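+# Editor's illustration (not part of the original commit): the delays that
+# _throttle_errors imposes after consecutive cloud failures, per the
+# arithmetic above:
+#
+#     [min(2 ** streak, 180) for streak in range(1, 9)]
+#     # => [2, 4, 8, 16, 32, 64, 128, 180]
+#
+# One successful call resets error_streak, so requests flow immediately again.
+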
+
+class ComputeNodeMonitorActor(config.actor_class):
+    """Actor to manage a running compute node.
+
+    This actor gets updates about a compute node's cloud and Arvados records.
+    It uses this information to notify subscribers when the node is eligible
+    for shutdown.
+    """
+    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
+                 timer_actor, update_actor, arvados_node=None,
+                 poll_stale_after=600, node_stale_after=3600):
+        super(ComputeNodeMonitorActor, self).__init__()
+        self._later = self.actor_ref.proxy()
+        self._logger = logging.getLogger('arvnodeman.computenode')
+        self._last_log = None
+        self._shutdowns = shutdown_timer
+        self._timer = timer_actor
+        self._update = update_actor
+        self.cloud_node = cloud_node
+        self.cloud_node_start_time = cloud_node_start_time
+        self.poll_stale_after = poll_stale_after
+        self.node_stale_after = node_stale_after
+        self.subscribers = set()
+        self.arvados_node = None
+        self._later.update_arvados_node(arvados_node)
+        self.last_shutdown_opening = None
+        self._later.consider_shutdown()
+
+    def subscribe(self, subscriber):
+        self.subscribers.add(subscriber)
+
+    def _debug(self, msg, *args):
+        if msg == self._last_log:
+            return
+        self._last_log = msg
+        self._logger.debug(msg, *args)
+
+    def in_state(self, *states):
+        # Return a boolean to say whether or not our Arvados node record is in
+        # one of the given states.  If state information is not
+        # available--because this node has no Arvados record, the record is
+        # stale, or the record has no state information--return None.
+        if (self.arvados_node is None) or not timestamp_fresh(
+              arvados_node_mtime(self.arvados_node), self.node_stale_after):
+            return None
+        state = self.arvados_node['info'].get('slurm_state')
+        if not state:
+            return None
+        result = state in states
+        if state == 'idle':
+            result = result and not self.arvados_node['job_uuid']
+        return result
+
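+    # Editor's note (illustration, not part of the original commit): in_state
+    # above is deliberately tri-state.  With a fresh record, hypothetical
+    # inputs behave like this:
+    #
+    #     info={'slurm_state': 'idle'}, job_uuid=None  -> in_state('idle') is True
+    #     info={'slurm_state': 'idle'}, job_uuid set   -> in_state('idle') is False
+    #     info={}                                      -> in_state('idle') is None
+    #
+    # Callers must distinguish False (known busy) from None (state unknown).
+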
+    def _shutdown_eligible(self):
+        if self.arvados_node is None:
+            # If this is a new, unpaired node, it's eligible for
+            # shutdown--we figure there was an error during bootstrap.
+            return timestamp_fresh(self.cloud_node_start_time,
+                                   self.node_stale_after)
+        else:
+            return self.in_state('idle')
+
+    def consider_shutdown(self):
+        next_opening = self._shutdowns.next_opening()
+        if self._shutdowns.window_open():
+            if self._shutdown_eligible():
+                self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
+                _notify_subscribers(self._later, self.subscribers)
+            else:
+                self._debug("Node %s shutdown window open but node busy.",
+                            self.cloud_node.id)
+        else:
+            self._debug("Node %s shutdown window closed.  Next at %s.",
+                        self.cloud_node.id, time.ctime(next_opening))
+        if self.last_shutdown_opening != next_opening:
+            self._timer.schedule(next_opening, self._later.consider_shutdown)
+            self.last_shutdown_opening = next_opening
+
+    def offer_arvados_pair(self, arvados_node):
+        if self.arvados_node is not None:
+            return None
+        elif arvados_node['ip_address'] in self.cloud_node.private_ips:
+            self._later.update_arvados_node(arvados_node)
+            return self.cloud_node.id
+        else:
+            return None
+
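+    # Editor's note (illustration, not part of the original commit):
+    # offer_arvados_pair above accepts a pairing only for a monitor that is
+    # not yet paired and whose cloud node owns the record's address -- e.g. a
+    # record with ip_address '10.0.0.2' pairs only if '10.0.0.2' appears in
+    # self.cloud_node.private_ips.  Every other case returns None so the
+    # daemon can offer the record to another monitor.
+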
+    def update_cloud_node(self, cloud_node):
+        if cloud_node is not None:
+            self.cloud_node = cloud_node
+            self._later.consider_shutdown()
+
+    def update_arvados_node(self, arvados_node):
+        if arvados_node is not None:
+            self.arvados_node = arvados_node
+            new_hostname = arvados_node_fqdn(self.arvados_node)
+            if new_hostname != self.cloud_node.name:
+                self._update.sync_node(self.cloud_node, self.arvados_node)
+            self._later.consider_shutdown()
diff --git a/services/nodemanager/arvnodeman/computenode/driver/__init__.py b/services/nodemanager/arvnodeman/computenode/driver/__init__.py
new file mode 100644 (file)
index 0000000..a20cfde
--- /dev/null
@@ -0,0 +1,64 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+class BaseComputeNodeDriver(object):
+    """Abstract base class for compute node drivers.
+
+    libcloud abstracts away many of the differences between cloud providers,
+    but managing compute nodes requires some cloud-specific features (e.g.,
+    on EC2 we use tags to identify compute nodes).  Compute node drivers
+    are responsible for translating the node manager's cloud requests to a
+    specific cloud's vocabulary.
+
+    Subclasses must implement arvados_create_kwargs (to update node
+    creation kwargs with information about the specific Arvados node
+    record), sync_node, and node_start_time.
+    """
+    def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
+        self.real = driver_class(**auth_kwargs)
+        self.list_kwargs = list_kwargs
+        self.create_kwargs = create_kwargs
+
+    def __getattr__(self, name):
+        # Proxy non-extension methods to the real driver.
+        if (not name.startswith('_') and not name.startswith('ex_')
+              and hasattr(self.real, name)):
+            return getattr(self.real, name)
+        else:
+            return super(BaseComputeNodeDriver, self).__getattr__(name)
+
+    def search_for(self, term, list_method, key=lambda item: item.id):
+        cache_key = (list_method, term)
+        if cache_key not in self.SEARCH_CACHE:
+            results = [item for item in getattr(self.real, list_method)()
+                       if key(item) == term]
+            count = len(results)
+            if count != 1:
+                raise ValueError("{} returned {} results for '{}'".format(
+                        list_method, count, term))
+            self.SEARCH_CACHE[cache_key] = results[0]
+        return self.SEARCH_CACHE[cache_key]
+
+    def list_nodes(self):
+        return self.real.list_nodes(**self.list_kwargs)
+
+    def arvados_create_kwargs(self, arvados_node):
+        raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
+
+    def create_node(self, size, arvados_node):
+        kwargs = self.create_kwargs.copy()
+        kwargs.update(self.arvados_create_kwargs(arvados_node))
+        kwargs['size'] = size
+        return self.real.create_node(**kwargs)
+
+    def sync_node(self, cloud_node, arvados_node):
+        # When a compute node first pings the API server, the API server
+        # will automatically assign some attributes on the corresponding
+        # node record, like hostname.  This method should propagate that
+        # information back to the cloud node appropriately.
+        raise NotImplementedError("BaseComputeNodeDriver.sync_node")
+
+    @classmethod
+    def node_start_time(cls, node):
+        raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
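+
+# Editor's note (illustration, not part of the original commit): search_for
+# above reads self.SEARCH_CACHE, which this base class never defines.
+# Subclasses are evidently expected to supply it as a class-level dict, e.g.:
+#
+#     class ComputeNodeDriver(BaseComputeNodeDriver):
+#         SEARCH_CACHE = {}
+#
+# so one-result lookups are memoized per (list_method, term) pair.  Note also
+# that the super().__getattr__ fallback in __getattr__ relies on object having
+# no such attribute, so unknown names still raise AttributeError, albeit with
+# a misleading message.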
similarity index 96%
rename from services/nodemanager/arvnodeman/computenode/dummy.py
rename to services/nodemanager/arvnodeman/computenode/driver/dummy.py
index 6c39feaf614d9feaf84bc59a003f5ada657250a6..3a286bba017d3b9fd8999fe7185a90d197917df8 100644 (file)
@@ -7,7 +7,8 @@ import time
 import libcloud.compute.providers as cloud_provider
 import libcloud.compute.types as cloud_types
 
-from . import BaseComputeNodeDriver, arvados_node_fqdn
+from . import BaseComputeNodeDriver
+from .. import arvados_node_fqdn
 
 class ComputeNodeDriver(BaseComputeNodeDriver):
     """Compute node driver wrapper for libcloud's dummy driver.
similarity index 98%
rename from services/nodemanager/arvnodeman/computenode/ec2.py
rename to services/nodemanager/arvnodeman/computenode/driver/ec2.py
index 359bed4d1cb76fc557647e18395f9497134b66a3..c0992f7b9635e2c47edd8e67cf5b887ba5692718 100644 (file)
@@ -9,7 +9,8 @@ import libcloud.compute.providers as cloud_provider
 import libcloud.compute.types as cloud_types
 from libcloud.compute.drivers import ec2 as cloud_ec2
 
-from . import BaseComputeNodeDriver, arvados_node_fqdn
+from . import BaseComputeNodeDriver
+from .. import arvados_node_fqdn
 
 ### Monkeypatch libcloud to support AWS' new SecurityGroup API.
 # These classes can be removed when libcloud supports specifying
index 754b9319f0797d864ab0c9eb50107054485e6eed..24fd828cf5b2a20b9bc91ffd906d464a155eabc6 100644 (file)
@@ -85,7 +85,7 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
                            http=http)
 
     def new_cloud_client(self):
-        module = importlib.import_module('arvnodeman.computenode.' +
+        module = importlib.import_module('arvnodeman.computenode.driver.' +
                                          self.get('Cloud', 'provider'))
         auth_kwargs = self.get_section('Cloud Credentials')
         if 'timeout' in auth_kwargs:
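
With this change the provider named in the Cloud section resolves against the new driver package. A minimal sketch of the lookup, with a hypothetical provider value (assumes arvnodeman is importable):

    import importlib

    provider = 'ec2'  # hypothetical result of self.get('Cloud', 'provider')
    module = importlib.import_module('arvnodeman.computenode.driver.' + provider)
    # resolves to arvnodeman.computenode.driver.ec2, the renamed module below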
index eaf10be324de16811e655e2b8427bed85ad6709d..7ff736fb86dcd075e0c9c3777479a746e1767ea7 100644 (file)
@@ -9,6 +9,7 @@ import time
 import pykka
 
 from . import computenode as cnode
+from .computenode import dispatch
 from .config import actor_class
 
 class _ComputeNodeRecord(object):
@@ -96,9 +97,9 @@ class NodeManagerDaemonActor(actor_class):
                  arvados_factory, cloud_factory,
                  shutdown_windows, min_nodes, max_nodes,
                  poll_stale_after=600, node_stale_after=7200,
-                 node_setup_class=cnode.ComputeNodeSetupActor,
-                 node_shutdown_class=cnode.ComputeNodeShutdownActor,
-                 node_actor_class=cnode.ComputeNodeMonitorActor):
+                 node_setup_class=dispatch.ComputeNodeSetupActor,
+                 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
+                 node_actor_class=dispatch.ComputeNodeMonitorActor):
         super(NodeManagerDaemonActor, self).__init__()
         self._node_setup = node_setup_class
         self._node_shutdown = node_shutdown_class
index f4ad7163fa2ac0ef445d5feb8317f412e07401b4..d2f4afee061e26fd19085575f7b5bef33c977efd 100644 (file)
@@ -12,9 +12,7 @@ import daemon
 import pykka
 
 from . import config as nmconfig
-from .computenode import \
-    ComputeNodeSetupActor, ComputeNodeShutdownActor, ComputeNodeUpdateActor, \
-    ShutdownTimer
+from .computenode.dispatch import ComputeNodeUpdateActor
 from .daemon import NodeManagerDaemonActor
 from .jobqueue import JobQueueMonitorActor, ServerCalculator
 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
index 5ced5f99bbc6a768f9817b7ec364dfc550354541..e22cccc5b0193b1949297fdae1fdd9fa24a4836b 100644 (file)
@@ -6,137 +6,11 @@ import time
 import unittest
 
 import arvados.errors as arverror
-import httplib2
 import mock
-import pykka
 
 import arvnodeman.computenode as cnode
 from . import testutil
 
-class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
-    def make_mocks(self, arvados_effect=None, cloud_effect=None):
-        if arvados_effect is None:
-            arvados_effect = [testutil.arvados_node_mock()]
-        self.arvados_effect = arvados_effect
-        self.timer = testutil.MockTimer()
-        self.api_client = mock.MagicMock(name='api_client')
-        self.api_client.nodes().create().execute.side_effect = arvados_effect
-        self.api_client.nodes().update().execute.side_effect = arvados_effect
-        self.cloud_client = mock.MagicMock(name='cloud_client')
-        self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1)
-
-    def make_actor(self, arv_node=None):
-        if not hasattr(self, 'timer'):
-            self.make_mocks(arvados_effect=[arv_node])
-        self.setup_actor = cnode.ComputeNodeSetupActor.start(
-            self.timer, self.api_client, self.cloud_client,
-            testutil.MockSize(1), arv_node).proxy()
-
-    def test_creation_without_arvados_node(self):
-        self.make_actor()
-        self.assertEqual(self.arvados_effect[-1],
-                         self.setup_actor.arvados_node.get(self.TIMEOUT))
-        self.assertTrue(self.api_client.nodes().create().execute.called)
-        self.assertEqual(self.cloud_client.create_node(),
-                         self.setup_actor.cloud_node.get(self.TIMEOUT))
-
-    def test_creation_with_arvados_node(self):
-        self.make_actor(testutil.arvados_node_mock())
-        self.assertEqual(self.arvados_effect[-1],
-                         self.setup_actor.arvados_node.get(self.TIMEOUT))
-        self.assertTrue(self.api_client.nodes().update().execute.called)
-        self.assertEqual(self.cloud_client.create_node(),
-                         self.setup_actor.cloud_node.get(self.TIMEOUT))
-
-    def test_failed_calls_retried(self):
-        self.make_mocks([
-                arverror.ApiError(httplib2.Response({'status': '500'}), ""),
-                testutil.arvados_node_mock(),
-                ])
-        self.make_actor()
-        self.wait_for_assignment(self.setup_actor, 'cloud_node')
-
-    def test_stop_when_no_cloud_node(self):
-        self.make_mocks(
-            arverror.ApiError(httplib2.Response({'status': '500'}), ""))
-        self.make_actor()
-        self.setup_actor.stop_if_no_cloud_node()
-        self.assertTrue(
-            self.setup_actor.actor_ref.actor_stopped.wait(self.TIMEOUT))
-
-    def test_no_stop_when_cloud_node(self):
-        self.make_actor()
-        self.wait_for_assignment(self.setup_actor, 'cloud_node')
-        self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT)
-        self.assertTrue(self.stop_proxy(self.setup_actor),
-                        "actor was stopped by stop_if_no_cloud_node")
-
-    def test_subscribe(self):
-        self.make_mocks(
-            arverror.ApiError(httplib2.Response({'status': '500'}), ""))
-        self.make_actor()
-        subscriber = mock.Mock(name='subscriber_mock')
-        self.setup_actor.subscribe(subscriber)
-        self.api_client.nodes().create().execute.side_effect = [
-            testutil.arvados_node_mock()]
-        self.wait_for_assignment(self.setup_actor, 'cloud_node')
-        self.assertEqual(self.setup_actor.actor_ref.actor_urn,
-                         subscriber.call_args[0][0].actor_ref.actor_urn)
-
-    def test_late_subscribe(self):
-        self.make_actor()
-        subscriber = mock.Mock(name='subscriber_mock')
-        self.wait_for_assignment(self.setup_actor, 'cloud_node')
-        self.setup_actor.subscribe(subscriber).get(self.TIMEOUT)
-        self.stop_proxy(self.setup_actor)
-        self.assertEqual(self.setup_actor.actor_ref.actor_urn,
-                         subscriber.call_args[0][0].actor_ref.actor_urn)
-
-
-class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
-                                       unittest.TestCase):
-    def make_mocks(self, cloud_node=None):
-        self.timer = testutil.MockTimer()
-        self.cloud_client = mock.MagicMock(name='cloud_client')
-        if cloud_node is None:
-            cloud_node = testutil.cloud_node_mock()
-        self.cloud_node = cloud_node
-
-    def make_actor(self, arv_node=None):
-        if not hasattr(self, 'timer'):
-            self.make_mocks()
-        self.shutdown_actor = cnode.ComputeNodeShutdownActor.start(
-            self.timer, self.cloud_client, self.cloud_node).proxy()
-
-    def test_easy_shutdown(self):
-        self.make_actor()
-        self.shutdown_actor.cloud_node.get(self.TIMEOUT)
-        self.stop_proxy(self.shutdown_actor)
-        self.assertTrue(self.cloud_client.destroy_node.called)
-
-    def test_late_subscribe(self):
-        self.make_actor()
-        subscriber = mock.Mock(name='subscriber_mock')
-        self.shutdown_actor.subscribe(subscriber).get(self.TIMEOUT)
-        self.stop_proxy(self.shutdown_actor)
-        self.assertEqual(self.shutdown_actor.actor_ref.actor_urn,
-                         subscriber.call_args[0][0].actor_ref.actor_urn)
-
-
-class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
-                                     unittest.TestCase):
-    def make_actor(self):
-        self.driver = mock.MagicMock(name='driver_mock')
-        self.updater = cnode.ComputeNodeUpdateActor.start(self.driver).proxy()
-
-    def test_node_sync(self):
-        self.make_actor()
-        cloud_node = testutil.cloud_node_mock()
-        arv_node = testutil.arvados_node_mock()
-        self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
-        self.driver().sync_node.assert_called_with(cloud_node, arv_node)
-
-
 @mock.patch('time.time', return_value=1)
 class ShutdownTimerTestCase(unittest.TestCase):
     def test_two_length_window(self, time_mock):
@@ -160,156 +34,3 @@ class ShutdownTimerTestCase(unittest.TestCase):
         time_mock.return_value += 200
         self.assertEqual(961, timer.next_opening())
         self.assertFalse(timer.window_open())
-
-
-class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
-                                      unittest.TestCase):
-    class MockShutdownTimer(object):
-        def _set_state(self, is_open, next_opening):
-            self.window_open = lambda: is_open
-            self.next_opening = lambda: next_opening
-
-
-    def make_mocks(self, node_num):
-        self.shutdowns = self.MockShutdownTimer()
-        self.shutdowns._set_state(False, 300)
-        self.timer = mock.MagicMock(name='timer_mock')
-        self.updates = mock.MagicMock(name='update_mock')
-        self.cloud_mock = testutil.cloud_node_mock(node_num)
-        self.subscriber = mock.Mock(name='subscriber_mock')
-
-    def make_actor(self, node_num=1, arv_node=None, start_time=None):
-        if not hasattr(self, 'cloud_mock'):
-            self.make_mocks(node_num)
-        if start_time is None:
-            start_time = time.time()
-        self.node_actor = cnode.ComputeNodeMonitorActor.start(
-            self.cloud_mock, start_time, self.shutdowns, self.timer,
-            self.updates, arv_node).proxy()
-        self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
-
-    def node_state(self, *states):
-        return self.node_actor.in_state(*states).get(self.TIMEOUT)
-
-    def test_in_state_when_unpaired(self):
-        self.make_actor()
-        self.assertIsNone(self.node_state('idle', 'alloc'))
-
-    def test_in_state_when_pairing_stale(self):
-        self.make_actor(arv_node=testutil.arvados_node_mock(
-                job_uuid=None, age=90000))
-        self.assertIsNone(self.node_state('idle', 'alloc'))
-
-    def test_in_state_when_no_state_available(self):
-        self.make_actor(arv_node=testutil.arvados_node_mock(info={}))
-        self.assertIsNone(self.node_state('idle', 'alloc'))
-
-    def test_in_idle_state(self):
-        self.make_actor(2, arv_node=testutil.arvados_node_mock(job_uuid=None))
-        self.assertTrue(self.node_state('idle'))
-        self.assertFalse(self.node_state('alloc'))
-        self.assertTrue(self.node_state('idle', 'alloc'))
-
-    def test_in_alloc_state(self):
-        self.make_actor(3, arv_node=testutil.arvados_node_mock(job_uuid=True))
-        self.assertFalse(self.node_state('idle'))
-        self.assertTrue(self.node_state('alloc'))
-        self.assertTrue(self.node_state('idle', 'alloc'))
-
-    def test_init_shutdown_scheduling(self):
-        self.make_actor()
-        self.assertTrue(self.timer.schedule.called)
-        self.assertEqual(300, self.timer.schedule.call_args[0][0])
-
-    def test_shutdown_subscription(self):
-        self.make_actor()
-        self.shutdowns._set_state(True, 600)
-        self.node_actor.consider_shutdown().get(self.TIMEOUT)
-        self.assertTrue(self.subscriber.called)
-        self.assertEqual(self.node_actor.actor_ref.actor_urn,
-                         self.subscriber.call_args[0][0].actor_ref.actor_urn)
-
-    def test_shutdown_without_arvados_node(self):
-        self.make_actor()
-        self.shutdowns._set_state(True, 600)
-        self.node_actor.consider_shutdown().get(self.TIMEOUT)
-        self.assertTrue(self.subscriber.called)
-
-    def test_no_shutdown_without_arvados_node_and_old_cloud_node(self):
-        self.make_actor(start_time=0)
-        self.shutdowns._set_state(True, 600)
-        self.node_actor.consider_shutdown().get(self.TIMEOUT)
-        self.assertFalse(self.subscriber.called)
-
-    def check_shutdown_rescheduled(self, window_open, next_window,
-                                   schedule_time=None):
-        self.shutdowns._set_state(window_open, next_window)
-        self.timer.schedule.reset_mock()
-        self.node_actor.consider_shutdown().get(self.TIMEOUT)
-        self.stop_proxy(self.node_actor)
-        self.assertTrue(self.timer.schedule.called)
-        if schedule_time is not None:
-            self.assertEqual(schedule_time, self.timer.schedule.call_args[0][0])
-        self.assertFalse(self.subscriber.called)
-
-    def test_shutdown_window_close_scheduling(self):
-        self.make_actor()
-        self.check_shutdown_rescheduled(False, 600, 600)
-
-    def test_no_shutdown_when_node_running_job(self):
-        self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
-        self.check_shutdown_rescheduled(True, 600)
-
-    def test_no_shutdown_when_node_state_unknown(self):
-        self.make_actor(5, testutil.arvados_node_mock(5, info={}))
-        self.check_shutdown_rescheduled(True, 600)
-
-    def test_no_shutdown_when_node_state_stale(self):
-        self.make_actor(6, testutil.arvados_node_mock(6, age=90000))
-        self.check_shutdown_rescheduled(True, 600)
-
-    def test_arvados_node_match(self):
-        self.make_actor(2)
-        arv_node = testutil.arvados_node_mock(
-            2, hostname='compute-two.zzzzz.arvadosapi.com')
-        pair_id = self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT)
-        self.assertEqual(self.cloud_mock.id, pair_id)
-        self.stop_proxy(self.node_actor)
-        self.updates.sync_node.assert_called_with(self.cloud_mock, arv_node)
-
-    def test_arvados_node_mismatch(self):
-        self.make_actor(3)
-        arv_node = testutil.arvados_node_mock(1)
-        self.assertIsNone(
-            self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT))
-
-    def test_update_cloud_node(self):
-        self.make_actor(1)
-        self.make_mocks(2)
-        self.cloud_mock.id = '1'
-        self.node_actor.update_cloud_node(self.cloud_mock)
-        current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
-        self.assertEqual([testutil.ip_address_mock(2)],
-                         current_cloud.private_ips)
-
-    def test_missing_cloud_node_update(self):
-        self.make_actor(1)
-        self.node_actor.update_cloud_node(None)
-        current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
-        self.assertEqual([testutil.ip_address_mock(1)],
-                         current_cloud.private_ips)
-
-    def test_update_arvados_node(self):
-        self.make_actor(3)
-        job_uuid = 'zzzzz-jjjjj-updatejobnode00'
-        new_arvados = testutil.arvados_node_mock(3, job_uuid)
-        self.node_actor.update_arvados_node(new_arvados)
-        current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
-        self.assertEqual(job_uuid, current_arvados['job_uuid'])
-
-    def test_missing_arvados_node_update(self):
-        self.make_actor(4, testutil.arvados_node_mock(4))
-        self.node_actor.update_arvados_node(None)
-        current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
-        self.assertEqual(testutil.ip_address_mock(4),
-                         current_arvados['ip_address'])
diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py
new file mode 100644 (file)
index 0000000..ece186b
--- /dev/null
@@ -0,0 +1,290 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import time
+import unittest
+
+import arvados.errors as arverror
+import httplib2
+import mock
+import pykka
+
+import arvnodeman.computenode.dispatch as dispatch
+from . import testutil
+
+class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
+    def make_mocks(self, arvados_effect=None, cloud_effect=None):
+        if arvados_effect is None:
+            arvados_effect = [testutil.arvados_node_mock()]
+        self.arvados_effect = arvados_effect
+        self.timer = testutil.MockTimer()
+        self.api_client = mock.MagicMock(name='api_client')
+        self.api_client.nodes().create().execute.side_effect = arvados_effect
+        self.api_client.nodes().update().execute.side_effect = arvados_effect
+        self.cloud_client = mock.MagicMock(name='cloud_client')
+        self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1)
+
+    def make_actor(self, arv_node=None):
+        if not hasattr(self, 'timer'):
+            self.make_mocks(arvados_effect=[arv_node])
+        self.setup_actor = dispatch.ComputeNodeSetupActor.start(
+            self.timer, self.api_client, self.cloud_client,
+            testutil.MockSize(1), arv_node).proxy()
+
+    def test_creation_without_arvados_node(self):
+        self.make_actor()
+        self.assertEqual(self.arvados_effect[-1],
+                         self.setup_actor.arvados_node.get(self.TIMEOUT))
+        self.assertTrue(self.api_client.nodes().create().execute.called)
+        self.assertEqual(self.cloud_client.create_node(),
+                         self.setup_actor.cloud_node.get(self.TIMEOUT))
+
+    def test_creation_with_arvados_node(self):
+        self.make_actor(testutil.arvados_node_mock())
+        self.assertEqual(self.arvados_effect[-1],
+                         self.setup_actor.arvados_node.get(self.TIMEOUT))
+        self.assertTrue(self.api_client.nodes().update().execute.called)
+        self.assertEqual(self.cloud_client.create_node(),
+                         self.setup_actor.cloud_node.get(self.TIMEOUT))
+
+    def test_failed_calls_retried(self):
+        self.make_mocks([
+                arverror.ApiError(httplib2.Response({'status': '500'}), ""),
+                testutil.arvados_node_mock(),
+                ])
+        self.make_actor()
+        self.wait_for_assignment(self.setup_actor, 'cloud_node')
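+        # Editor's note (illustration): the side_effect list above raises a
+        # 500 ApiError on the first nodes().create().execute() call and
+        # returns a node mock on the second, so this wait completes only if
+        # the _retry decorator reschedules create_arvados_node after the
+        # first failure.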
+
+    def test_stop_when_no_cloud_node(self):
+        self.make_mocks(
+            arverror.ApiError(httplib2.Response({'status': '500'}), ""))
+        self.make_actor()
+        self.setup_actor.stop_if_no_cloud_node()
+        self.assertTrue(
+            self.setup_actor.actor_ref.actor_stopped.wait(self.TIMEOUT))
+
+    def test_no_stop_when_cloud_node(self):
+        self.make_actor()
+        self.wait_for_assignment(self.setup_actor, 'cloud_node')
+        self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT)
+        self.assertTrue(self.stop_proxy(self.setup_actor),
+                        "actor was stopped by stop_if_no_cloud_node")
+
+    def test_subscribe(self):
+        self.make_mocks(
+            arverror.ApiError(httplib2.Response({'status': '500'}), ""))
+        self.make_actor()
+        subscriber = mock.Mock(name='subscriber_mock')
+        self.setup_actor.subscribe(subscriber)
+        self.api_client.nodes().create().execute.side_effect = [
+            testutil.arvados_node_mock()]
+        self.wait_for_assignment(self.setup_actor, 'cloud_node')
+        self.assertEqual(self.setup_actor.actor_ref.actor_urn,
+                         subscriber.call_args[0][0].actor_ref.actor_urn)
+
+    def test_late_subscribe(self):
+        self.make_actor()
+        subscriber = mock.Mock(name='subscriber_mock')
+        self.wait_for_assignment(self.setup_actor, 'cloud_node')
+        self.setup_actor.subscribe(subscriber).get(self.TIMEOUT)
+        self.stop_proxy(self.setup_actor)
+        self.assertEqual(self.setup_actor.actor_ref.actor_urn,
+                         subscriber.call_args[0][0].actor_ref.actor_urn)
+
+
+class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
+                                       unittest.TestCase):
+    def make_mocks(self, cloud_node=None):
+        self.timer = testutil.MockTimer()
+        self.cloud_client = mock.MagicMock(name='cloud_client')
+        if cloud_node is None:
+            cloud_node = testutil.cloud_node_mock()
+        self.cloud_node = cloud_node
+
+    def make_actor(self, arv_node=None):
+        if not hasattr(self, 'timer'):
+            self.make_mocks()
+        self.shutdown_actor = dispatch.ComputeNodeShutdownActor.start(
+            self.timer, self.cloud_client, self.cloud_node).proxy()
+
+    def test_easy_shutdown(self):
+        self.make_actor()
+        self.shutdown_actor.cloud_node.get(self.TIMEOUT)
+        self.stop_proxy(self.shutdown_actor)
+        self.assertTrue(self.cloud_client.destroy_node.called)
+
+    def test_late_subscribe(self):
+        self.make_actor()
+        subscriber = mock.Mock(name='subscriber_mock')
+        self.shutdown_actor.subscribe(subscriber).get(self.TIMEOUT)
+        self.stop_proxy(self.shutdown_actor)
+        self.assertEqual(self.shutdown_actor.actor_ref.actor_urn,
+                         subscriber.call_args[0][0].actor_ref.actor_urn)
+
+
+class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
+                                     unittest.TestCase):
+    def make_actor(self):
+        self.driver = mock.MagicMock(name='driver_mock')
+        self.updater = dispatch.ComputeNodeUpdateActor.start(self.driver).proxy()
+
+    def test_node_sync(self):
+        self.make_actor()
+        cloud_node = testutil.cloud_node_mock()
+        arv_node = testutil.arvados_node_mock()
+        self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
+        self.driver().sync_node.assert_called_with(cloud_node, arv_node)
+
+
+class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
+                                      unittest.TestCase):
+    class MockShutdownTimer(object):
+        def _set_state(self, is_open, next_opening):
+            self.window_open = lambda: is_open
+            self.next_opening = lambda: next_opening
+
+
+    def make_mocks(self, node_num):
+        self.shutdowns = self.MockShutdownTimer()
+        self.shutdowns._set_state(False, 300)
+        self.timer = mock.MagicMock(name='timer_mock')
+        self.updates = mock.MagicMock(name='update_mock')
+        self.cloud_mock = testutil.cloud_node_mock(node_num)
+        self.subscriber = mock.Mock(name='subscriber_mock')
+
+    def make_actor(self, node_num=1, arv_node=None, start_time=None):
+        if not hasattr(self, 'cloud_mock'):
+            self.make_mocks(node_num)
+        if start_time is None:
+            start_time = time.time()
+        self.node_actor = dispatch.ComputeNodeMonitorActor.start(
+            self.cloud_mock, start_time, self.shutdowns, self.timer,
+            self.updates, arv_node).proxy()
+        self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
+
+    def node_state(self, *states):
+        return self.node_actor.in_state(*states).get(self.TIMEOUT)
+
+    def test_in_state_when_unpaired(self):
+        self.make_actor()
+        self.assertIsNone(self.node_state('idle', 'alloc'))
+
+    def test_in_state_when_pairing_stale(self):
+        self.make_actor(arv_node=testutil.arvados_node_mock(
+                job_uuid=None, age=90000))
+        self.assertIsNone(self.node_state('idle', 'alloc'))
+
+    def test_in_state_when_no_state_available(self):
+        self.make_actor(arv_node=testutil.arvados_node_mock(info={}))
+        self.assertIsNone(self.node_state('idle', 'alloc'))
+
+    def test_in_idle_state(self):
+        self.make_actor(2, arv_node=testutil.arvados_node_mock(job_uuid=None))
+        self.assertTrue(self.node_state('idle'))
+        self.assertFalse(self.node_state('alloc'))
+        self.assertTrue(self.node_state('idle', 'alloc'))
+
+    def test_in_alloc_state(self):
+        self.make_actor(3, arv_node=testutil.arvados_node_mock(job_uuid=True))
+        self.assertFalse(self.node_state('idle'))
+        self.assertTrue(self.node_state('alloc'))
+        self.assertTrue(self.node_state('idle', 'alloc'))
+
+    def test_init_shutdown_scheduling(self):
+        self.make_actor()
+        self.assertTrue(self.timer.schedule.called)
+        self.assertEqual(300, self.timer.schedule.call_args[0][0])
+
+    def test_shutdown_subscription(self):
+        self.make_actor()
+        self.shutdowns._set_state(True, 600)
+        self.node_actor.consider_shutdown().get(self.TIMEOUT)
+        self.assertTrue(self.subscriber.called)
+        self.assertEqual(self.node_actor.actor_ref.actor_urn,
+                         self.subscriber.call_args[0][0].actor_ref.actor_urn)
+
+    def test_shutdown_without_arvados_node(self):
+        self.make_actor()
+        self.shutdowns._set_state(True, 600)
+        self.node_actor.consider_shutdown().get(self.TIMEOUT)
+        self.assertTrue(self.subscriber.called)
+
+    def test_no_shutdown_without_arvados_node_and_old_cloud_node(self):
+        self.make_actor(start_time=0)
+        self.shutdowns._set_state(True, 600)
+        self.node_actor.consider_shutdown().get(self.TIMEOUT)
+        self.assertFalse(self.subscriber.called)
+
+    def check_shutdown_rescheduled(self, window_open, next_window,
+                                   schedule_time=None):
+        self.shutdowns._set_state(window_open, next_window)
+        self.timer.schedule.reset_mock()
+        self.node_actor.consider_shutdown().get(self.TIMEOUT)
+        self.stop_proxy(self.node_actor)
+        self.assertTrue(self.timer.schedule.called)
+        if schedule_time is not None:
+            self.assertEqual(schedule_time, self.timer.schedule.call_args[0][0])
+        self.assertFalse(self.subscriber.called)
+
+    def test_shutdown_window_close_scheduling(self):
+        self.make_actor()
+        self.check_shutdown_rescheduled(False, 600, 600)
+
+    def test_no_shutdown_when_node_running_job(self):
+        self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
+        self.check_shutdown_rescheduled(True, 600)
+
+    def test_no_shutdown_when_node_state_unknown(self):
+        self.make_actor(5, testutil.arvados_node_mock(5, info={}))
+        self.check_shutdown_rescheduled(True, 600)
+
+    def test_no_shutdown_when_node_state_stale(self):
+        self.make_actor(6, testutil.arvados_node_mock(6, age=90000))
+        self.check_shutdown_rescheduled(True, 600)
+
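+    # offer_arvados_pair() should return the cloud node's id for a
+    # matching Arvados record and sync the pair; a mismatched record
+    # is refused with None.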
+    def test_arvados_node_match(self):
+        self.make_actor(2)
+        arv_node = testutil.arvados_node_mock(
+            2, hostname='compute-two.zzzzz.arvadosapi.com')
+        pair_id = self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT)
+        self.assertEqual(self.cloud_mock.id, pair_id)
+        self.stop_proxy(self.node_actor)
+        self.updates.sync_node.assert_called_with(self.cloud_mock, arv_node)
+
+    def test_arvados_node_mismatch(self):
+        self.make_actor(3)
+        arv_node = testutil.arvados_node_mock(1)
+        self.assertIsNone(
+            self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT))
+
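+    # make_mocks(2) builds a second cloud node mock carrying node 2's IP;
+    # reusing id '1' makes it look like an updated copy of the first node.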
+    def test_update_cloud_node(self):
+        self.make_actor(1)
+        self.make_mocks(2)
+        self.cloud_mock.id = '1'
+        self.node_actor.update_cloud_node(self.cloud_mock)
+        current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
+        self.assertEqual([testutil.ip_address_mock(2)],
+                         current_cloud.private_ips)
+
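+    # A None update should be ignored: the actor keeps its last known
+    # record, for both the cloud node and the Arvados node.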
+    def test_missing_cloud_node_update(self):
+        self.make_actor(1)
+        self.node_actor.update_cloud_node(None)
+        current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
+        self.assertEqual([testutil.ip_address_mock(1)],
+                         current_cloud.private_ips)
+
+    def test_update_arvados_node(self):
+        self.make_actor(3)
+        job_uuid = 'zzzzz-jjjjj-updatejobnode00'
+        new_arvados = testutil.arvados_node_mock(3, job_uuid)
+        self.node_actor.update_arvados_node(new_arvados)
+        current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
+        self.assertEqual(job_uuid, current_arvados['job_uuid'])
+
+    def test_missing_arvados_node_update(self):
+        self.make_actor(4, testutil.arvados_node_mock(4))
+        self.node_actor.update_arvados_node(None)
+        current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
+        self.assertEqual(testutil.ip_address_mock(4),
+                         current_arvados['ip_address'])
similarity index 98%
rename from services/nodemanager/tests/test_computenode_ec2.py
rename to services/nodemanager/tests/test_computenode_driver_ec2.py
index d1c9e43fe47b9968bd13d28c1244f8b7e2efb879..fde103e10e606f68ca5e0b3ba262f0a350e6df64 100644 (file)
@@ -7,7 +7,7 @@ import unittest
 
 import mock
 
-import arvnodeman.computenode.ec2 as ec2
+import arvnodeman.computenode.driver.ec2 as ec2
 from . import testutil
 
 class EC2ComputeNodeDriverTestCase(unittest.TestCase):
index 4bffd092a164c75824453f08e00f3ea755315fcb..394fb88b80cf6ff10efc04fe79839c2eb42954ea 100644 (file)
@@ -8,8 +8,8 @@ import unittest
 import mock
 import pykka
 
-import arvnodeman.computenode as nmcnode
 import arvnodeman.daemon as nmdaemon
+from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
 from . import testutil
 
 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
@@ -39,7 +39,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
             self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
 
     def monitor_list(self):
-        return pykka.ActorRegistry.get_by_class(nmcnode.ComputeNodeMonitorActor)
+        return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor)
 
     def alive_monitor_count(self):
         return sum(1 for actor in self.monitor_list() if actor.is_alive())