removeClass('active');
}
$this.toggleClass('active');
- any = ($container.
+
+ if (!$this.hasClass('use-preview-selection')) {
+ any = ($container.
find('.selectable.active').length > 0)
- $this.
+ }
+
+ if (!$container.hasClass('preview-selectable-container')) {
+ $this.
closest('.modal').
find('[data-enable-if-selection]').
prop('disabled', !any);
- if ($this.hasClass('active')) {
+ if ($this.hasClass('active')) {
var no_preview_available = '<div class="spinner-h-center spinner-v-center"><center>(No preview available)</center></div>';
if (!$this.attr('data-preview-href')) {
$(".modal-dialog-preview-pane").html(no_preview_available);
fail(function(data, status, jqxhr) {
$(".modal-dialog-preview-pane").html(no_preview_available);
});
+ }
+ } else {
+ any = ($container.
+ find('.preview-selectable.active').length > 0)
+ $(this).
+ closest('.modal').
+ find('[data-enable-if-selection]').
+ prop('disabled', !any);
}
}).on('click', '.modal button[data-action-href]', function() {
var action_data_from_params = $(this).data('action-data-from-params');
var selection_param = action_data.selection_param;
$modal.find('.modal-error').removeClass('hide').hide();
- $modal.find('.selectable.active[data-object-uuid]').each(function() {
+
+ var $preview_selections = $modal.find('.preview-selectable.active');
+ if ($preview_selections.length > 0) {
+ data.push({name: selection_param, value: $preview_selections.first().attr('href')});
+ }
+
+ if (data.length == 0) { // not using preview selection option
+ $modal.find('.selectable.active[data-object-uuid]').each(function() {
var val = $(this).attr('data-object-uuid');
data.push({name: selection_param, value: val});
- });
+ });
+ }
$.each($.extend({}, action_data, action_data_from_params),
function(key, value) {
if (value instanceof Array && key[key.length - 1] != ']') {
}
});
- $(window).on('load storage', update_count);
+ $(window).on('load', clear_selections);
+ $(window).on('storage', update_count);
$('#selection-form-content').on("click", function(e) {
e.stopPropagation();
if params[:partial]
f.json {
find_objects_for_index if !@objects
+ @objects.fetch_multiple_pages(false)
render json: {
content: render_to_string(partial: "choose_rows.html",
formats: [:html]),
end
end
- def choose
- # Find collections using default find_objects logic, then search for name
- # links, and preload any other links connected to the collections that are
- # found.
- # Name links will be obsolete when issue #3036 is merged,
- # at which point this entire custom #choose function can probably be
- # eliminated.
-
- params[:limit] ||= 40
-
- find_objects_for_index
- @collections = @objects
-
- @filters += [['link_class','=','name'],
- ['head_uuid','is_a','arvados#collection']]
-
- @objects = Link
- find_objects_for_index
-
- @name_links = @objects
-
- @objects = Collection.
- filter([['uuid','in',@name_links.collect(&:head_uuid)]])
-
- preload_links_for_objects (@collections.to_a + @objects.to_a)
- super
- end
-
def index
# API server index doesn't return manifest_text by default, but our
# callers want it unless otherwise specified.
base_search = Collection.select(@select)
if params[:search].andand.length.andand > 0
tags = Link.where(any: ['contains', params[:search]])
- @collections = (base_search.where(uuid: tags.collect(&:head_uuid)) |
+ @objects = (base_search.where(uuid: tags.collect(&:head_uuid)) |
base_search.where(any: ['contains', params[:search]])).
uniq { |c| c.uuid }
else
offset = 0
end
- @collections = base_search.limit(limit).offset(offset)
+ @objects = base_search.limit(limit).offset(offset)
end
- @links = Link.limit(1000).
- where(head_uuid: @collections.collect(&:uuid))
+ @links = Link.where(head_uuid: @objects.collect(&:uuid))
@collection_info = {}
- @collections.each do |c|
+ @objects.each do |c|
@collection_info[c.uuid] = {
tag_links: [],
wanted: false,
return super if !@object
if current_user
if Keep::Locator.parse params["uuid"]
- @same_pdh = Collection.filter([["portable_data_hash", "=", @object.portable_data_hash]]).limit(1000)
+ @same_pdh = Collection.filter([["portable_data_hash", "=", @object.portable_data_hash]])
if @same_pdh.results.size == 1
redirect_to collection_path(@same_pdh[0]["uuid"])
return
if component[:script_parameters]
component[:script_parameters].each do |param, value_info|
if value_info.is_a? Hash
- value_info_class = resource_class_for_uuid(value_info[:value])
+ value_info_partitioned = value_info[:value].partition('/') if value_info[:value].andand.class.eql?(String)
+ value_info_value = value_info_partitioned ? value_info_partitioned[0] : value_info[:value]
+ value_info_class = resource_class_for_uuid value_info_value
if value_info_class == Link
# Use the link target, not the link itself, as script
# parameter; but keep the link info around as well.
# to ensure reproducibility, the script_parameter for a
# collection should be the portable_data_hash
# keep the collection name and uuid for human-readability
- obj = Collection.find value_info[:value]
- value_info[:value] = obj.portable_data_hash
+ obj = Collection.find value_info_value
+ if value_info_partitioned
+ value_info[:value] = obj.portable_data_hash + value_info_partitioned[1] + value_info_partitioned[2]
+ value_info[:selection_name] = obj.name + value_info_partitioned[1] + value_info_partitioned[2]
+ else
+ value_info[:value] = obj.portable_data_hash
+ value_info[:selection_name] = obj.name
+ end
value_info[:selection_uuid] = obj.uuid
- value_info[:selection_name] = obj.name
end
end
end
def activity
@breadcrumb_page_name = nil
- @users = User.limit(params[:limit] || 1000).all
+ @users = User.limit(params[:limit])
@user_activity = {}
@activity = {
logins: {},
def storage
@breadcrumb_page_name = nil
- @users = User.limit(params[:limit] || 1000).all
+ @users = User.limit(params[:limit])
@user_storage = {}
total_storage = {}
@log_date = {}
@persist_state[uuid] = 'cache'
end
- Link.limit(1000).filter([['head_uuid', 'in', collection_uuids],
+ Link.filter([['head_uuid', 'in', collection_uuids],
['link_class', 'in', ['tag', 'resources']]]).
each do |link|
case link.link_class
dn += '[value]'
end
- if dataclass == Collection
+ if (dataclass == Collection) or (dataclass == File)
selection_param = object.class.to_s.underscore + dn
display_value = attrvalue
if value_info.is_a?(Hash)
end
end
if (attr == :components) and (subattr.size > 2)
- chooser_title = "Choose a dataset for #{object.component_input_title(subattr[0], subattr[2])}:"
+ chooser_title = "Choose a #{dataclass == Collection ? 'dataset' : 'file'} for #{object.component_input_title(subattr[0], subattr[2])}:"
else
- chooser_title = "Choose a dataset:"
+ chooser_title = "Choose a #{dataclass == Collection ? 'dataset' : 'file'}:"
end
modal_path = choose_collections_path \
({ title: chooser_title,
preconfigured_search_str: (preconfigured_search_str || ""),
action_data: {
merge: true,
+ use_preview_selection: dataclass == File ? true : nil,
selection_param: selection_param,
success: 'page-refresh'
}.to_json,
end
end
- def chooser_preview_url_for object
+ def chooser_preview_url_for object, use_preview_selection=false
case object.class.to_s
when 'Collection'
- polymorphic_path(object, tab_pane: 'chooser_preview')
+ polymorphic_path(object, tab_pane: 'chooser_preview', use_preview_selection: use_preview_selection)
else
nil
end
@client_mtx = Mutex.new
end
- def api(resources_kind, action, data=nil)
+ def api(resources_kind, action, data=nil, tokens={})
+
profile_checkpoint
if not @api_client
url.sub! '/arvados/v1/../../', '/'
query = {
- 'api_token' => Thread.current[:arvados_api_token] || '',
- 'reader_tokens' => (Thread.current[:reader_tokens] || []).to_json,
+ 'api_token' => tokens[:arvados_api_token] || Thread.current[:arvados_api_token] || '',
+ 'reader_tokens' => (tokens[:reader_tokens] || Thread.current[:reader_tokens] || []).to_json,
}
if !data.nil?
data.each do |k,v|
ArvadosResourceList.new(self).eager(*args)
end
- def self.all(*args)
- ArvadosResourceList.new(self).all(*args)
+ def self.all
+ ArvadosResourceList.new(self)
end
def self.permit_attribute_params raw_params
def initialize resource_class=nil
@resource_class = resource_class
+ @fetch_multiple_pages = true
+ @arvados_api_token = Thread.current[:arvados_api_token]
+ @reader_tokens = Thread.current[:reader_tokens]
end
def eager(bool=true)
end
def limit(max_results)
+ if not max_results.nil? and not max_results.is_a? Integer
+ raise ArgumentError, "argument to limit() must be an Integer or nil"
+ end
@limit = max_results
self
end
end
def where(cond)
- cond = cond.dup
- cond.keys.each do |uuid_key|
- if cond[uuid_key] and (cond[uuid_key].is_a? Array or
- cond[uuid_key].is_a? ArvadosBase)
+ @cond = cond.dup
+ @cond.keys.each do |uuid_key|
+ if @cond[uuid_key] and (@cond[uuid_key].is_a? Array or
+ @cond[uuid_key].is_a? ArvadosBase)
# Coerce cond[uuid_key] to an array of uuid strings. This
# allows caller the convenience of passing an array of real
# objects and uuids in cond[uuid_key].
- if !cond[uuid_key].is_a? Array
- cond[uuid_key] = [cond[uuid_key]]
+ if !@cond[uuid_key].is_a? Array
+ @cond[uuid_key] = [@cond[uuid_key]]
end
- cond[uuid_key] = cond[uuid_key].collect do |item|
+ @cond[uuid_key] = @cond[uuid_key].collect do |item|
if item.is_a? ArvadosBase
item.uuid
else
end
end
end
- cond.keys.select { |x| x.match /_kind$/ }.each do |kind_key|
- if cond[kind_key].is_a? Class
- cond = cond.merge({ kind_key => 'arvados#' + arvados_api_client.class_kind(cond[kind_key]) })
+ @cond.keys.select { |x| x.match /_kind$/ }.each do |kind_key|
+ if @cond[kind_key].is_a? Class
+ @cond = @cond.merge({ kind_key => 'arvados#' + arvados_api_client.class_kind(@cond[kind_key]) })
end
end
- api_params = {
- _method: 'GET',
- where: cond
- }
- api_params[:eager] = '1' if @eager
- api_params[:limit] = @limit if @limit
- api_params[:offset] = @offset if @offset
- api_params[:select] = @select if @select
- api_params[:order] = @orderby_spec if @orderby_spec
- api_params[:filters] = @filters if @filters
- res = arvados_api_client.api @resource_class, '', api_params
- @results = arvados_api_client.unpack_api_response res
+ self
+ end
+
+ def fetch_multiple_pages(f)
+ @fetch_multiple_pages = f
self
end
def results
- self.where({}) if !@results
+ if !@results
+ @results = []
+ self.each_page do |r|
+ @results.concat r
+ end
+ end
@results
end
def results=(r)
@results = r
+ @items_available = r.items_available if r.respond_to? :items_available
+ @result_limit = r.limit if r.respond_to? :limit
+ @result_offset = r.offset if r.respond_to? :offset
+ @results
end
- def all
- where({})
+ def to_ary
+ results
end
def each(&block)
- results.each do |m|
- block.call m
+ if not @results.nil?
+ @results.each &block
+ else
+ self.each_page do |items|
+ items.each do |i|
+ block.call i
+ end
+ end
end
self
end
- def collect
- results.collect do |m|
- yield m
- end
- end
-
def first
results.first
end
end
end
- def to_ary
- results
- end
-
def to_hash
- Hash[results.collect { |x| [x.uuid, x] }]
+ Hash[self.collect { |x| [x.uuid, x] }]
end
def empty?
- results.empty?
+ self.first.nil?
end
def items_available
- results.items_available if results.respond_to? :items_available
+ @items_available
end
def result_limit
- results.limit if results.respond_to? :limit
+ @result_limit
end
def result_offset
- results.offset if results.respond_to? :offset
+ @result_offset
end
- def result_links
- results.links if results.respond_to? :links
+ # Obsolete method retained during api transition.
+ def links_for item_or_uuid, link_class=false
+ []
end
- # Return links provided with API response that point to the
- # specified object, and have the specified link_class. If link_class
- # is false or omitted, return all links pointing to the specified
- # object.
- def links_for item_or_uuid, link_class=false
- return [] if !result_links
- unless @links_for_uuid
- @links_for_uuid = {}
- result_links.each do |link|
- if link.respond_to? :head_uuid
- @links_for_uuid[link.head_uuid] ||= []
- @links_for_uuid[link.head_uuid] << link
- end
+ protected
+
+ def each_page
+ api_params = {
+ _method: 'GET'
+ }
+ api_params[:where] = @cond if @cond
+ api_params[:eager] = '1' if @eager
+ api_params[:select] = @select if @select
+ api_params[:order] = @orderby_spec if @orderby_spec
+ api_params[:filters] = @filters if @filters
+
+
+ item_count = 0
+ offset = @offset || 0
+ @result_limit = nil
+ @result_offset = nil
+
+ begin
+ api_params[:offset] = offset
+ api_params[:limit] = (@limit - item_count) if @limit
+
+ res = arvados_api_client.api(@resource_class, '', api_params,
+ arvados_api_token: @arvados_api_token,
+ reader_tokens: @reader_tokens)
+ items = arvados_api_client.unpack_api_response res
+
+ break if items.nil? or not items.any?
+
+ @items_available = items.items_available if items.respond_to?(:items_available)
+ @result_limit = items.limit if (@fetch_multiple_pages == false) and items.respond_to?(:limit)
+ @result_offset = items.offset if (@fetch_multiple_pages == false) and items.respond_to?(:offset)
+
+ item_count += items.size
+ if items.respond_to?(:offset)
+ offset = items.offset + items.size
+ else
+ offset = item_count
end
- end
- if item_or_uuid.respond_to? :uuid
- uuid = item_or_uuid.uuid
- else
- uuid = item_or_uuid
- end
- (@links_for_uuid[uuid] || []).select do |link|
- link_class == false or link.link_class == link_class
- end
- end
- # Note: this arbitrarily chooses one of (possibly) multiple names.
- def name_for item_or_uuid
- links_for(item_or_uuid, 'name').first.andand.name
+ yield items
+
+ break if @limit and item_count >= @limit
+ break if items.respond_to? :items_available and offset >= items.items_available
+ end while @fetch_multiple_pages
+ self
end
end
<% if project_filters.any? %>
data-infinite-content-params-from-project-dropdown="<%= {filters: project_filters, project_uuid: project_filters.last.last}.to_json %>"
<% end %>
- data-infinite-content-href="<%= url_for partial: true %>">
+ <%
+ action_data = JSON.parse params['action_data'] if params['action_data']
+ use_preview_sel = action_data ? action_data['use_preview_selection'] : false
+ %>
+ data-infinite-content-href="<%= url_for partial: true,
+ use_preview_selection: use_preview_sel %>">
</div>
<% if preview_pane %>
<div class="col-md-6 hidden-xs hidden-sm modal-dialog-preview-pane" style="height: 100%; overflow-y: scroll">
}
<% end %>
+<% results.fetch_multiple_pages(false) %>
+
<% if results.respond_to? :result_offset and
results.respond_to? :result_limit and
results.respond_to? :items_available and
results.result_offset != nil and
results.result_limit != nil and
- results.items_available != nil
+ results.items_available != nil
%>
<div class="index-paging">
- Displaying <%= results.result_offset+1 %> –
- <%= if results.result_offset + results.result_limit > results.items_available
- results.items_available
- else
- results.result_offset + results.result_limit
+ Displaying <%= results.result_offset+1 %> –
+ <%= if results.result_offset + results.result_limit > results.items_available
+ results.items_available
+ else
+ results.result_offset + results.result_limit
end %>
out of <%= results.items_available %>
</div>
<% if not (results.result_offset == 0 and results.items_available <= results.result_limit) %>
-
+
<div class="index-paging">
<% if results.result_offset > 0 %>
<% if results.result_offset > 0 %>
<%= link_to raw("<span class='glyphicon glyphicon-fast-backward'></span>"), {:id => object, :offset => 0, :limit => results.result_limit} %>
<% else %>
- <span class='glyphicon glyphicon-fast-backward text-muted'></span>
+ <span class='glyphicon glyphicon-fast-backward text-muted'></span>
<% end %>
<% if prev_offset %>
<% end %>
<% if (results.items_available - results.result_offset) >= results.result_limit %>
- <%= link_to raw("<span class='glyphicon glyphicon-fast-forward'></span>"), {:id => @object, :offset => results.items_available - (results.items_available % results.result_limit),
+ <%= link_to raw("<span class='glyphicon glyphicon-fast-forward'></span>"), {:id => @object, :offset => results.items_available - (results.items_available % results.result_limit),
:limit => results.result_limit} %>
<% else %>
- <span class='glyphicon glyphicon-fast-forward text-muted'></span>
+ <span class='glyphicon glyphicon-fast-forward text-muted'></span>
<% end %>
</span>
-<% @collections.each do |object| %>
- <div class="row filterable selectable" data-object-uuid="<%= object.uuid %>"
- data-preview-href="<%= chooser_preview_url_for object %>"
+<% @objects.each do |object| %>
+ <div class="row filterable selectable <%= 'use-preview-selection' if params['use_preview_selection']%>" data-object-uuid="<%= object.uuid %>"
+ data-preview-href="<%= chooser_preview_url_for object, params['use_preview_selection'] %>"
style="margin-left: 1em; border-bottom-style: solid; border-bottom-width: 1px; border-bottom-color: #DDDDDD">
<i class="fa fa-fw fa-archive"></i>
<% if object.respond_to? :name %>
<% end %>
</div>
<% end %>
-
-<% @name_links.each do |name_link| %>
- <% if (object = get_object(name_link.head_uuid)) %>
- <div class="row filterable selectable" data-object-uuid="<%= name_link.uuid %>"
- data-preview-href="<%= chooser_preview_url_for object %>"
- style="margin-left: 1em; border-bottom-style: solid; border-bottom-width: 1px; border-bottom-color: #DDDDDD">
- <i class="fa fa-fw fa-archive"></i>
- <%= name_link.name %>
- <% links_for_object(object).each do |tag| %>
- <% if tag.link_class == 'tag' %>
- <span class="label label-info"><%= tag.name %></span>
- <% end %>
- <% end %>
- </div>
- <% end %>
-<% end %>
-<% @collections.each do |c| %>
+<% @objects.each do |c| %>
<tr class="collection" data-object-uuid="<%= c.uuid %>">
<td>
<%= render partial: "show_source_summary" %>
-<%= render partial: "show_files", locals: {no_checkboxes: true} %>
+<%= render partial: "show_files", locals: {no_checkboxes: true, use_preview_selection: params['use_preview_selection']} %>
}
</script>
-<div class="selection-action-container" style="padding-left: 1em">
+<%
+ preview_selectable_container = ''
+ preview_selectable = ''
+ padding_left = '1em'
+ if !params['use_preview_selection'].nil? and params['use_preview_selection'] == 'true'
+ preview_selectable_container = 'preview-selectable-container selectable-container'
+ preview_selectable = 'preview-selectable selectable'
+ padding_left = '0em'
+ end
+%>
+
+<div class="selection-action-container" style="padding-left: <%=padding_left%>">
<% if !defined? no_checkboxes or !no_checkboxes %>
<div class="row">
<div class="pull-left">
<% if file_tree.nil? or file_tree.empty? %>
<p>This collection is empty.</p>
<% else %>
- <ul id="collection_files" class="collection_files">
+ <ul id="collection_files" class="collection_files <%=preview_selectable_container%>">
<% dirstack = [file_tree.first.first] %>
<% file_tree.take(10000).each_with_index do |(dirname, filename, size), index| %>
<% file_path = CollectionsHelper::file_path([dirname, filename]) %>
<% if size.nil? # This is a subdirectory. %>
<% dirstack.push(File.join(dirname, filename)) %>
<div class="collection_files_row">
- <div class="collection_files_name"><i class="fa fa-fw fa-folder-open"></i> <%= filename %></div>
+ <div class="collection_files_name"><i class="fa fa-fw fa-folder-open"></i> <%= filename %></div>
</div>
<ul class="collection_files">
<% else %>
<% link_params = {controller: 'collections', action: 'show_file',
uuid: @object.portable_data_hash, file: file_path, size: size} %>
- <div class="collection_files_row filterable">
+ <div class="collection_files_row filterable <%=preview_selectable%>" href="<%=@object.uuid%>/<%=file_path%>">
<div class="collection_files_buttons pull-right">
<%= raw(human_readable_bytes_html(size)) %>
<% disable_search = (Rails.configuration.filename_suffixes_with_view_icon.include? file_path.split('.')[-1]) ? false : true %>
<div class="collection_files_name">
<% if !defined? no_checkboxes or !no_checkboxes %>
<%= check_box_tag 'uuids[]', "#{@object.uuid}/#{file_path}", false, {
- :class => 'persistent-selection',
+ :class => "persistent-selection",
:friendly_type => "File",
:friendly_name => "#{@object.uuid}/#{file_path}",
:href => url_for(controller: 'collections', action: 'show_file',
{title: file_path}) %>
</div>
<% else %>
- <i class="fa fa-fw fa-file"></i> <%= filename %></div>
+ <i class="fa fa-fw fa-file" href="<%=@object.uuid%>/<%=file_path%>" ></i> <%= filename %></div>
</div>
<% end %>
</li>
</div>
<p/>
-<%= render partial: "paging", locals: {results: @collections, object: @object} %>
+<%= render partial: "paging", locals: {results: @objects, object: @object} %>
<div style="padding-right: 1em">
</div>
-<%= render partial: "paging", locals: {results: @collections, object: @object} %>
+<%= render partial: "paging", locals: {results: @objects, object: @object} %>
<% content_for :footer_js do %>
$(document).on('click', 'form[data-remote] input[type=submit]', function() {
</ul>
</li>
- <li class="dropdown selection-menu">
- <a href="#" class="dropdown-toggle" data-toggle="dropdown">
- <span class="fa fa-lg fa-paperclip"></span>
- <span class="badge" id="persistent-selection-count"></span>
- </a>
- <ul class="dropdown-menu" role="menu" id="persistent-selection-list">
- <%= form_tag '/actions' do %>
- <%= hidden_field_tag 'uuid', @object.andand.uuid %>
- <div id="selection-form-content"></div>
- <% end %>
- </ul>
- </li>
-
<% if current_user.is_admin %>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="system-menu">
uuid_map = {}
if @share_links
[User, Group].each do |type|
- type.limit(10000)
+ type
.filter([['uuid','in',@share_links.collect(&:tail_uuid)]])
.each do |o|
uuid_map[o.uuid] = o
by_project: false,
preview_pane: false,
multiple: true,
- limit: 10000,
filters: choose_filters[share_class].to_json,
action_method: 'post',
action_href: share_with_project_path,
</div>
<form>
<% permitted_group_perms = {}
- Link.limit(10000).filter([
+ Link.filter([
['tail_uuid', '=', @object.uuid],
['head_uuid', 'is_a', 'arvados#group'],
['link_class', '=', 'permission'],
instance_page = current_path
- # Go over to the collections page and select something
- visit '/collections'
- within('tr', text: 'GNU_General_Public_License') do
- find('input[type=checkbox]').click
- end
- find('#persistent-selection-count').click
-
# Add this collection to the project
visit '/projects'
find("#projects-menu").click
test 'Create pipeline inside a project and run' do
visit page_with_token('active_trustedclient')
- # Go over to the collections page and select something
- visit '/collections'
- within('tr', text: 'GNU_General_Public_License') do
- find('input[type=checkbox]').click
- end
- find('#persistent-selection-count').click
-
# Add this collection to the project using collections menu from top nav
visit '/projects'
find("#projects-menu").click
wait_for_ajax
end
- create_and_run_pipeline_in_aproject true
+ create_and_run_pipeline_in_aproject true, 'Two Part Pipeline Template', false
end
# Create a pipeline instance from outside of a project
test 'Run a pipeline from dashboard' do
visit page_with_token('active_trustedclient')
- create_and_run_pipeline_in_aproject false
+ create_and_run_pipeline_in_aproject false, 'Two Part Pipeline Template', false
end
test 'view pipeline with job and see graph' do
end
[
- ['active', false, false, false],
- ['active', false, false, true],
- ['active', true, false, false],
- ['active', true, true, false],
- ['active', true, false, true],
- ['active', true, true, true],
- ['project_viewer', false, false, true],
- ['project_viewer', true, false, true],
- ['project_viewer', true, true, true],
- ].each do |user, with_options, choose_options, in_aproject|
- test "Rerun pipeline instance as #{user} using options #{with_options} #{choose_options} in #{in_aproject}" do
+ ['active', false, false, false, 'Two Part Pipeline Template', false],
+ ['active', false, false, true, 'Two Part Pipeline Template', false],
+ ['active', true, false, false, 'Two Part Pipeline Template', false],
+ ['active', true, true, false, 'Two Part Pipeline Template', false],
+ ['active', true, false, true, 'Two Part Pipeline Template', false],
+ ['active', true, true, true, 'Two Part Pipeline Template', false],
+ ['project_viewer', false, false, true, 'Two Part Pipeline Template', false],
+ ['project_viewer', true, false, true, 'Two Part Pipeline Template', false],
+ ['project_viewer', true, true, true, 'Two Part Pipeline Template', false],
+ ['active', false, false, false, 'Two Part Template with dataclass File', true],
+ ['active', false, false, true, 'Two Part Template with dataclass File', true],
+ ].each do |user, with_options, choose_options, in_aproject, template_name, choose_file|
+ test "Rerun pipeline instance as #{user} using options #{with_options} #{choose_options}
+ in #{in_aproject} with #{template_name} with file #{choose_file}" do
visit page_with_token('active')
+ # need bigger modal size when choosing a file from collection
+ Capybara.current_session.driver.browser.manage.window.resize_to(1024, 768)
+
if in_aproject
find("#projects-menu").click
find('.dropdown-menu a,button', text: 'A Project').click
end
- create_and_run_pipeline_in_aproject in_aproject
+ create_and_run_pipeline_in_aproject in_aproject, template_name, choose_file
instance_path = current_path
# Pause the pipeline
end
# Create and run a pipeline for 'Two Part Pipeline Template' in 'A Project'
- def create_and_run_pipeline_in_aproject in_aproject
+ def create_and_run_pipeline_in_aproject in_aproject, template_name, choose_file
# create a pipeline instance
find('.btn', text: 'Run a pipeline').click
within('.modal-dialog') do
- find('.selectable', text: 'Two Part Pipeline Template').click
+ find('.selectable', text: template_name).click
find('.btn', text: 'Next: choose inputs').click
end
wait_for_ajax
end
first('span', text: 'foo_tag').click
+ if choose_file
+ wait_for_ajax
+ find('.preview-selectable', text: 'foo').click
+ end
find('button', text: 'OK').click
end
wait_for_ajax
click_link 'API response'
api_response = JSON.parse(find('div#advanced_api_response pre').text)
input_params = api_response['components']['part-one']['script_parameters']['input']
- assert_equal input_params['value'], col['portable_data_hash']
- assert_equal input_params['selection_name'], col['name']
- assert_equal input_params['selection_uuid'], col['uuid']
+ assert_equal(input_params['selection_uuid'], col['uuid'], "Not found expected input param uuid")
+ if choose_file
+ assert_equal(input_params['value'], col['portable_data_hash']+'/foo', "Not found expected input file param value")
+ assert_equal(input_params['selection_name'], col['name']+'/foo', "Not found expected input file param name")
+ else
+ assert_equal(input_params['value'], col['portable_data_hash'], "Not found expected input param value")
+ assert_equal(input_params['selection_name'], col['name'], "Not found expected input param name")
+ end
# "Run" button present and enabled
page.assert_no_selector 'a.disabled,button.disabled', text: 'Run'
assert_equal [], results.links_for(api_fixture('users')['active']['uuid'])
end
- test 'links_for returns all link classes (simulated results)' do
- project_uuid = api_fixture('groups')['aproject']['uuid']
- specimen_uuid = api_fixture('specimens')['in_aproject']['uuid']
- api_response = {
- kind: 'arvados#specimenList',
- links: [{kind: 'arvados#link',
- uuid: 'zzzzz-o0j2j-asdfasdfasdfas1',
- tail_uuid: project_uuid,
- head_uuid: specimen_uuid,
- link_class: 'foo',
- name: 'Bob'},
- {kind: 'arvados#link',
- uuid: 'zzzzz-o0j2j-asdfasdfasdfas2',
- tail_uuid: project_uuid,
- head_uuid: specimen_uuid,
- link_class: nil,
- name: 'Clydesdale'}],
- items: [{kind: 'arvados#specimen',
- uuid: specimen_uuid}]
- }
- arl = ArvadosResourceList.new
- arl.results = ArvadosApiClient.new.unpack_api_response(api_response)
- assert_equal(['foo', nil],
- (arl.
- links_for(specimen_uuid).
- collect { |x| x.link_class }),
- "Expected links_for to return all link_classes")
+ test 'get all items by default' do
+ use_token :admin
+ a = 0
+ Collection.where(owner_uuid: 'zzzzz-j7d0g-0201collections').each do
+ a += 1
+ end
+ assert_equal 201, a
+ end
+
+ test 'prefetch all items' do
+ use_token :admin
+ a = 0
+ Collection.where(owner_uuid: 'zzzzz-j7d0g-0201collections').each do
+ a += 1
+ end
+ assert_equal 201, a
+ end
+
+ test 'get limited items' do
+ use_token :admin
+ a = 0
+ Collection.where(owner_uuid: 'zzzzz-j7d0g-0201collections').limit(51).each do
+ a += 1
+ end
+ assert_equal 51, a
+ end
+
+ test 'get limited items, limit % page_size != 0' do
+ skip "Requires server MAX_LIMIT < 200 which is not currently the default"
+
+ use_token :admin
+ max_page_size = Collection.
+ where(owner_uuid: 'zzzzz-j7d0g-0201collections').
+ limit(1000000000).
+ fetch_multiple_pages(false).
+ count
+ # Conditions necessary for this test to be valid:
+ assert_operator 200, :>, max_page_size
+ assert_operator 1, :<, max_page_size
+ # Verify that the server really sends max_page_size when asked for max_page_size+1
+ assert_equal max_page_size, Collection.
+ where(owner_uuid: 'zzzzz-j7d0g-0201collections').
+ limit(max_page_size+1).
+ fetch_multiple_pages(false).
+ results.
+ count
+ # Now that we know the max_page_size+1 is in the middle of page 2,
+ # make sure #each returns page 1 and only the requested part of
+ # page 2.
+ a = 0
+ saw_uuid = {}
+ Collection.where(owner_uuid: 'zzzzz-j7d0g-0201collections').limit(max_page_size+1).each do |item|
+ a += 1
+ saw_uuid[item.uuid] = true
+ end
+ assert_equal max_page_size+1, a
+ # Ensure no overlap between pages
+ assert_equal max_page_size+1, saw_uuid.size
+ end
+
+ test 'get single page of items' do
+ use_token :admin
+ a = 0
+ c = Collection.where(owner_uuid: 'zzzzz-j7d0g-0201collections').fetch_multiple_pages(false)
+ c.each do
+ a += 1
+ end
+
+ assert_operator a, :<, 201
+ assert_equal c.result_limit, a
end
end
logger.error(pprint.pformat(taskp))
sys.exit(1)
+# rcode holds the return codes produced by each subprocess
+rcode = {}
try:
subprocesses = []
close_streams = []
active = 1
pids = set([s.pid for s in subprocesses])
- rcode = {}
while len(pids) > 0:
(pid, status) = os.wait()
pids.discard(pid)
outcollection = robust_put.upload(outdir, logger)
# Success if no non-zero return codes
-success = not any([status != 0 for status in rcode.values()])
+success = len(rcode) > 0 and not any([status != 0 for status in rcode.values()])
api.job_tasks().update(uuid=arvados.current_task()['uuid'],
body={
|_. Attribute|_. Type|_. Description|_. Notes|
|script|string|The filename of the job script.|This program will be invoked by Crunch for each job task. It is given as a path to an executable file, relative to the @/crunch_scripts@ directory in the Git tree specified by the _repository_ and _script_version_ attributes.|
|script_parameters|hash|The input parameters for the job.|Conventionally, one of the parameters is called @"input"@. Typically, some parameter values are collection UUIDs. Ultimately, though, the significance of parameters is left entirely up to the script itself.|
-|repository|string|Git repository|Given as the name of a locally hosted git repository.|
+|repository|string|Git repository|Given as the name of a locally hosted Git repository.|
|script_version|string|Git commit|During a **create** transaction, this is the Git branch, tag, or hash supplied by the client. Before the job starts, Arvados updates it to the full 40-character SHA-1 hash of the commit used by the job.
-See "Script versions":#script_version below for more detail about acceptable ways to specify a commit.|
+See "Specifying Git versions":#script_version below for more detail about acceptable ways to specify a commit.|
|cancelled_by_client_uuid|string|API client ID|Is null if job has not been cancelled|
|cancelled_by_user_uuid|string|Authenticated user ID|Is null if job has not been cancelled|
|cancelled_at|datetime|When job was cancelled|Is null if job has not been cancelled|
|nondeterministic|boolean|The job is expected to produce different results if run more than once.|If true, this job will not be considered as a candidate for automatic re-use when submitting subsequent identical jobs.|
|submit_id|string|Unique ID provided by client when job was submitted|Optional. This can be used by a client to make the "jobs.create":{{site.baseurl}}/api/methods/jobs.html#create method idempotent.|
|priority|string|||
+|arvados_sdk_version|string|Git commit hash that specifies the SDK version to use from the Arvados repository|This is set by searching the Arvados repository for a match for the arvados_sdk_version runtime constraint.|
+|docker_image_locator|string|Portable data hash of the collection that contains the Docker image to use|This is set by searching readable collections for a match for the docker_image runtime constraint.|
|runtime_constraints|hash|Constraints that must be satisfied by the job/task scheduler in order to run the job.|See below.|
-h3(#script_version). Script versions
+h3(#script_version). Specifying Git versions
-The @script_version@ attribute is typically given as a branch, tag, or commit hash, but there are many more ways to specify a Git commit. The "specifying revisions" section of the "gitrevisions manual page":http://git-scm.com/docs/gitrevisions.html has a definitive list. Arvados accepts @script_version@ in any format listed there that names a single commit (not a tree, a blob, or a range of commits). However, some kinds of names can be expected to resolve differently in Arvados than they do in your local repository. For example, <code>HEAD@{1}</code> refers to the local reflog, and @origin/master@ typically refers to a remote branch: neither is likely to work as desired if given as a @script_version@.
+The script_version attribute and arvados_sdk_version runtime constraint are typically given as a branch, tag, or commit hash, but there are many more ways to specify a Git commit. The "specifying revisions" section of the "gitrevisions manual page":http://git-scm.com/docs/gitrevisions.html has a definitive list. Arvados accepts Git versions in any format listed there that names a single commit (not a tree, a blob, or a range of commits). However, some kinds of names can be expected to resolve differently in Arvados than they do in your local repository. For example, <code>HEAD@{1}</code> refers to the local reflog, and @origin/master@ typically refers to a remote branch: neither is likely to work as desired if given as a Git version.
h3. Runtime constraints
table(table table-bordered table-condensed).
|_. Key|_. Type|_. Description|_. Implemented|
-|docker_image|string|The Docker image that this Job needs to run. If specified, Crunch will create a Docker container from this image, and run the Job's script inside that. The Keep mount and work directories will be available as volumes inside this container. The image must be uploaded to Arvados using @arv keep docker@. You may specify the image in any format that Docker accepts, such as @arvados/jobs@, @debian:latest@, or the Docker image id. Alternatively, you may specify the UUID of the image Collection, returned by @arv keep docker@.|✓|
+|arvados_sdk_version|string|The Git version of the SDKs to use from the Arvados Git repository. See "Specifying Git versions":#script_version for more detail about acceptable ways to specify a commit.||
+|docker_image|string|The Docker image that this Job needs to run. If specified, Crunch will create a Docker container from this image, and run the Job's script inside that. The Keep mount and work directories will be available as volumes inside this container. The image must be uploaded to Arvados using @arv keep docker@. You may specify the image in any format that Docker accepts, such as @arvados/jobs@, @debian:latest@, or the Docker image id. Alternatively, you may specify the UUID or portable data hash of the image Collection, returned by @arv keep docker@.|✓|
|min_nodes|integer||✓|
|max_nodes|integer|||
|min_cores_per_node|integer|Require that each node assigned to this Job have the specified number of CPU cores|✓|
h3. Subcommands
See the "arv subcommands":{{site.baseurl}}/sdk/cli/subcommands.html page.
-
<notextile>
<pre>
$ <code class="userinput">arv tag --help</code>
-arvados cli client
+
+Usage:
+arv tag add tag1 [tag2 ...] --object object_uuid1 [object_uuid2...]
+arv tag remove tag1 [tag2 ...] --object object_uuid1 [object_uuid2...]
+arv tag remove --all
+
--dry-run, -n: Don't actually do anything
--verbose, -v: Print some things on stderr
--uuid, -u: Return the UUIDs of the objects in the response, one per
script_version field of component, default 'master'
</pre>
</notextile>
-
arv_create client, arvados, global_opts, remaining_opts
when 'edit'
arv_edit client, arvados, global_opts, remaining_opts
- when 'copy', 'tag', 'ws'
+ when 'copy', 'tag', 'ws', 'run'
exec `which arv-#{subcommand}`.strip, *remaining_opts
when 'keep'
@sub = remaining_opts.shift
puts "Available methods: run"
end
abort
- when 'run'
- exec `which arv-run`.strip, *remaining_opts
- when 'tag'
- exec `which arv-tag`.strip, *remaining_opts
- when 'ws'
- exec `which arv-ws`.strip, *remaining_opts
end
end
# arv tag remove tag1 [tag2 ...] --object obj_uuid1 [--object obj_uuid2 ...]
# arv tag remove tag1 [tag2 ...] --all
-def usage
- abort "Usage:\n" +
- "arv tag add tag1 [tag2 ...] --objects object_uuid1 [object_uuid2...]\n" +
- "arv tag remove tag1 [tag2 ...] --objects object_uuid1 [object_uuid2...]\n" +
+def usage_string
+ return "\nUsage:\n" +
+ "arv tag add tag1 [tag2 ...] --object object_uuid1 [object_uuid2...]\n" +
+ "arv tag remove tag1 [tag2 ...] --object object_uuid1 [object_uuid2...]\n" +
"arv tag remove --all\n"
end
+def usage
+ abort usage_string
+end
+
def api_call(method, parameters:{}, request_body:{})
request_body[:api_token] = ENV['ARVADOS_API_TOKEN']
result = $client.execute(:api_method => method,
end
global_opts = Trollop::options do
- banner "arvados cli client"
+ banner usage_string
+ banner ""
opt :dry_run, "Don't actually do anything", :short => "-n"
opt :verbose, "Print some things on stderr", :short => "-v"
opt :uuid, "Return the UUIDs of the objects in the response, one per line (default)", :short => nil
"log"
"net/http"
"regexp"
- "sort"
"strings"
"sync"
"sync/atomic"
Arvados *arvadosclient.ArvadosClient
Want_replicas int
Using_proxy bool
- service_roots *[]string
+ service_roots *map[string]string
lock sync.Mutex
Client *http.Client
}
contentLength int64, url string, err error) {
// Calculate the ordering for asking servers
- sv := this.shuffledServiceRoots(hash)
+ sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
for _, host := range sv {
var req *http.Request
func (this KeepClient) AuthorizedAsk(hash string, signature string,
timestamp string) (contentLength int64, url string, err error) {
// Calculate the ordering for asking servers
- sv := this.shuffledServiceRoots(hash)
+ sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
for _, host := range sv {
var req *http.Request
}
// Atomically read the service_roots field.
-func (this *KeepClient) ServiceRoots() []string {
- r := (*[]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
+func (this *KeepClient) ServiceRoots() map[string]string {
+ r := (*map[string]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
return *r
}
// Atomically update the service_roots field. Enables you to update
// service_roots without disrupting any GET or PUT operations that might
// already be in progress.
-func (this *KeepClient) SetServiceRoots(svc []string) {
- // Must be sorted for ShuffledServiceRoots() to produce consistent
- // results.
- roots := make([]string, len(svc))
- copy(roots, svc)
- sort.Strings(roots)
+func (this *KeepClient) SetServiceRoots(new_roots map[string]string) {
+ roots := make(map[string]string)
+ for uuid, root := range new_roots {
+ roots[uuid] = root
+ }
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
unsafe.Pointer(&roots))
}
c.Assert(err, Equals, nil)
c.Check(len(kc.ServiceRoots()), Equals, 2)
- c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:25107")
- c.Check(kc.ServiceRoots()[1], Equals, "http://localhost:25108")
-}
-
-func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
- kc := KeepClient{}
- kc.SetServiceRoots([]string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"})
-
- // "foo" acbd18db4cc2f85cedef654fccc4a4d8
- foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
- c.Check(kc.shuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
-
- // "bar" 37b51d194a7513e45b56f6524f2d51f2
- bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
- c.Check(kc.shuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
+ for _, root := range kc.ServiceRoots() {
+ c.Check(root, Matches, "http://localhost:2510[\\d]")
+ }
}
type StubPutHandler struct {
func (s *StandaloneSuite) TestPutB(c *C) {
log.Printf("TestPutB")
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := Md5String("foo")
st := StubPutHandler{
c,
hash,
"abc123",
"foo",
- make(chan string, 2)}
+ make(chan string, 5)}
arv, _ := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
kc.Want_replicas = 2
arv.ApiToken = "abc123"
- service_roots := make([]string, 5)
+ service_roots := make(map[string]string)
ks := RunSomeFakeKeepServers(st, 5, 2990)
for i := 0; i < len(ks); i += 1 {
- service_roots[i] = ks[i].url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = ks[i].url
defer ks[i].listener.Close()
}
kc.PutB([]byte("foo"))
- shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
+ shuff := NewRootSorter(
+ kc.ServiceRoots(), Md5String("foo")).GetSortedRoots()
s1 := <-st.handled
s2 := <-st.handled
hash,
"abc123",
"foo",
- make(chan string, 2)}
+ make(chan string, 5)}
arv, _ := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
kc.Want_replicas = 2
arv.ApiToken = "abc123"
- service_roots := make([]string, 5)
+ service_roots := make(map[string]string)
ks := RunSomeFakeKeepServers(st, 5, 2990)
for i := 0; i < len(ks); i += 1 {
- service_roots[i] = ks[i].url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = ks[i].url
defer ks[i].listener.Close()
}
kc.PutHR(hash, reader, 3)
- shuff := kc.shuffledServiceRoots(hash)
+ shuff := NewRootSorter(kc.ServiceRoots(), hash).GetSortedRoots()
log.Print(shuff)
s1 := <-st.handled
hash,
"abc123",
"foo",
- make(chan string, 2)}
+ make(chan string, 4)}
fh := FailHandler{
make(chan string, 1)}
kc.Want_replicas = 2
arv.ApiToken = "abc123"
- service_roots := make([]string, 5)
+ service_roots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 4, 2990)
ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
for i, k := range ks1 {
- service_roots[i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- service_roots[len(ks1)+i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
defer k.listener.Close()
}
kc.SetServiceRoots(service_roots)
- shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
+ shuff := NewRootSorter(
+ kc.ServiceRoots(), Md5String("foo")).GetSortedRoots()
phash, replicas, err := kc.PutB([]byte("foo"))
kc.Want_replicas = 2
arv.ApiToken = "abc123"
- service_roots := make([]string, 5)
+ service_roots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
for i, k := range ks1 {
- service_roots[i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- service_roots[len(ks1)+i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
defer k.listener.Close()
}
kc.SetServiceRoots(service_roots)
- shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
-
_, replicas, err := kc.PutB([]byte("foo"))
c.Check(err, Equals, InsufficientReplicasError)
c.Check(replicas, Equals, 1)
- c.Check(<-st.handled, Equals, shuff[1])
+ c.Check(<-st.handled, Matches, ".*2990")
log.Printf("TestPutWithTooManyFail done")
}
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots([]string{url})
+ kc.SetServiceRoots(map[string]string{"x":url})
r, n, url2, err := kc.Get(hash)
defer r.Close()
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots([]string{url})
+ kc.SetServiceRoots(map[string]string{"x":url})
r, n, url2, err := kc.Get(hash)
c.Check(err, Equals, BlockNotFound)
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots([]string{url})
+ kc.SetServiceRoots(map[string]string{"x":url})
r, n, _, err := kc.Get(barhash)
_, err = ioutil.ReadAll(r)
}
func (s *StandaloneSuite) TestGetWithFailures(c *C) {
-
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ content := []byte("waz")
+ hash := fmt.Sprintf("%x", md5.Sum(content))
fh := FailHandler{
- make(chan string, 1)}
+ make(chan string, 4)}
st := StubGetHandler{
c,
hash,
"abc123",
- []byte("foo")}
+ content}
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- service_roots := make([]string, 5)
+ service_roots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
for i, k := range ks1 {
- service_roots[i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- service_roots[len(ks1)+i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
defer k.listener.Close()
}
kc.SetServiceRoots(service_roots)
+ // This test works only if one of the failing services is
+ // attempted before the succeeding service. Otherwise,
+ // <-fh.handled below will just hang! (Probe order depends on
+ // the choice of block content "waz" and the UUIDs of the fake
+ // servers, so we just tried different strings until we found
+ // an example that passes this Assert.)
+ c.Assert(NewRootSorter(service_roots, hash).GetSortedRoots()[0], Matches, ".*299[1-4]")
+
r, n, url2, err := kc.Get(hash)
+
<-fh.handled
c.Check(err, Equals, nil)
c.Check(n, Equals, int64(3))
c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
- content, err2 := ioutil.ReadAll(r)
+ read_content, err2 := ioutil.ReadAll(r)
c.Check(err2, Equals, nil)
- c.Check(content, DeepEquals, []byte("foo"))
+ c.Check(read_content, DeepEquals, content)
}
func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
os.Setenv("ARVADOS_API_HOST", "localhost:3000")
os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
+ content := []byte("TestPutGetHead")
arv, err := arvadosclient.MakeArvadosClient()
kc, err := MakeKeepClient(&arv)
c.Assert(err, Equals, nil)
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x", md5.Sum(content))
{
n, _, err := kc.Ask(hash)
c.Check(n, Equals, int64(0))
}
{
- hash2, replicas, err := kc.PutB([]byte("foo"))
- c.Check(hash2, Equals, fmt.Sprintf("%s+%v", hash, 3))
+ hash2, replicas, err := kc.PutB(content)
+ c.Check(hash2, Equals, fmt.Sprintf("%s+%d", hash, len(content)))
c.Check(replicas, Equals, 2)
c.Check(err, Equals, nil)
}
{
r, n, url2, err := kc.Get(hash)
c.Check(err, Equals, nil)
- c.Check(n, Equals, int64(3))
+ c.Check(n, Equals, int64(len(content)))
c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
- content, err2 := ioutil.ReadAll(r)
+ read_content, err2 := ioutil.ReadAll(r)
c.Check(err2, Equals, nil)
- c.Check(content, DeepEquals, []byte("foo"))
+ c.Check(read_content, DeepEquals, content)
}
{
n, url2, err := kc.Ask(hash)
c.Check(err, Equals, nil)
- c.Check(n, Equals, int64(3))
+ c.Check(n, Equals, int64(len(content)))
c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
}
}
kc.Want_replicas = 2
kc.Using_proxy = true
arv.ApiToken = "abc123"
- service_roots := make([]string, 1)
+ service_roots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
for i, k := range ks1 {
- service_roots[i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
kc.Want_replicas = 3
kc.Using_proxy = true
arv.ApiToken = "abc123"
- service_roots := make([]string, 1)
+ service_roots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
for i, k := range ks1 {
- service_roots[i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
kc.SetServiceRoots(service_roots)
--- /dev/null
+package keepclient
+
+import (
+	"sort"
+)
+
+// RootSorter computes the probe order of Keep service roots for one
+// block: each root is assigned a weight derived from the block hash
+// and the service UUID (see getWeight), and roots are ordered by
+// descending weight. It implements sort.Interface over an index
+// permutation so the root/weight slices themselves stay untouched.
+type RootSorter struct {
+	root []string   // service root URLs, in map-iteration order
+	weight []string // hex md5 weight per root (parallel to root)
+	order []int     // index permutation, heaviest-first after sorting
+}
+
+// NewRootSorter builds and sorts a RootSorter for the given
+// uuid->root map and block hash.
+func NewRootSorter(serviceRoots map[string]string, hash string) (*RootSorter) {
+	rs := new(RootSorter)
+	rs.root = make([]string, len(serviceRoots))
+	rs.weight = make([]string, len(serviceRoots))
+	rs.order = make([]int, len(serviceRoots))
+	i := 0
+	for uuid, root := range serviceRoots {
+		rs.root[i] = root
+		rs.weight[i] = rs.getWeight(hash, uuid)
+		rs.order[i] = i
+		i++
+	}
+	sort.Sort(rs)
+	return rs
+}
+
+// getWeight returns the probe weight for one service: md5 of the
+// block hash concatenated with the last 15 characters of a standard
+// 27-character Arvados UUID. (uuid[12:] == last 15 chars; this
+// matches KeepClient._service_weight in the Python SDK.)
+func (rs RootSorter) getWeight(hash string, uuid string) (string) {
+	if len(uuid) == 27 {
+		return Md5String(hash + uuid[12:])
+	} else {
+		// Only useful for testing, a set of one service root, etc.
+		return Md5String(hash + uuid)
+	}
+}
+
+// GetSortedRoots returns the service roots in probe order
+// (heaviest weight first).
+func (rs RootSorter) GetSortedRoots() ([]string) {
+	sorted := make([]string, len(rs.order))
+	for i := range rs.order {
+		sorted[i] = rs.root[rs.order[i]]
+	}
+	return sorted
+}
+
+// Less is really More here: the heaviest root will be at the front of the list.
+func (rs RootSorter) Less(i, j int) bool {
+	return rs.weight[rs.order[j]] < rs.weight[rs.order[i]]
+}
+
+func (rs RootSorter) Len() int {
+	return len(rs.order)
+}
+
+// Swap permutes only the order slice, leaving root/weight in place.
+func (rs RootSorter) Swap(i, j int) {
+	sort.IntSlice(rs.order).Swap(i, j)
+}
--- /dev/null
+package keepclient
+
+import (
+	"fmt"
+	. "gopkg.in/check.v1"
+	"strconv"
+	"strings"
+)
+
+type RootSorterSuite struct{}
+var _ = Suite(&RootSorterSuite{})
+
+// FakeSvcRoot returns a unique fake service root URL for index i.
+func FakeSvcRoot(i uint64) (string) {
+	return fmt.Sprintf("https://%x.svc/", i)
+}
+
+// FakeSvcUuid returns a well-formed (27-character) fake service UUID.
+func FakeSvcUuid(i uint64) (string) {
+	return fmt.Sprintf("zzzzz-bi6l4-%015x", i)
+}
+
+// FakeServiceRoots builds a uuid->root map with n fake services.
+func FakeServiceRoots(n uint64) (map[string]string) {
+	sr := map[string]string{}
+	for i := uint64(0); i < n; i ++ {
+		sr[FakeSvcUuid(i)] = FakeSvcRoot(i)
+	}
+	return sr
+}
+
+// NOTE: gocheck only runs suite methods whose names start with
+// "Test", so these must be TestXxx or they are silently skipped.
+// Slices are compared with DeepEquals: the Equals checker uses ==,
+// which is not defined for slices.
+func (*RootSorterSuite) TestEmptyRoots(c *C) {
+	rs := NewRootSorter(map[string]string{}, Md5String("foo"))
+	c.Check(rs.GetSortedRoots(), DeepEquals, []string{})
+}
+
+func (*RootSorterSuite) TestJustOneRoot(c *C) {
+	rs := NewRootSorter(FakeServiceRoots(1), Md5String("foo"))
+	c.Check(rs.GetSortedRoots(), DeepEquals, []string{FakeSvcRoot(0)})
+}
+
+func (*RootSorterSuite) TestReferenceSet(c *C) {
+	fakeroots := FakeServiceRoots(16)
+	// These reference probe orders are explained further in
+	// ../../python/tests/test_keep_client.py:
+	expected_orders := []string{
+		"3eab2d5fc9681074",
+		"097dba52e648f1c3",
+		"c5b4e023f8a7d691",
+		"9d81c02e76a3bf54",
+	}
+	for h, expected_order := range expected_orders {
+		hash := Md5String(fmt.Sprintf("%064x", h))
+		roots := NewRootSorter(fakeroots, hash).GetSortedRoots()
+		for i, svc_id_s := range strings.Split(expected_order, "") {
+			svc_id, err := strconv.ParseUint(svc_id_s, 16, 64)
+			c.Assert(err, Equals, nil)
+			c.Check(roots[i], Equals, FakeSvcRoot(svc_id))
+		}
+	}
+}
package keepclient
import (
+ "crypto/md5"
"git.curoverse.com/arvados.git/sdk/go/streamer"
"errors"
"fmt"
"log"
"net/http"
"os"
- "strconv"
"strings"
)
type keepDisk struct {
+ Uuid string `json:"uuid"`
Hostname string `json:"service_host"`
Port int `json:"service_port"`
SSL bool `json:"service_ssl_flag"`
SvcType string `json:"service_type"`
}
+// Md5String returns the hex-encoded MD5 digest of s.
+func Md5String(s string) (string) {
+	return fmt.Sprintf("%x", md5.Sum([]byte(s)))
+}
+
func (this *KeepClient) DiscoverKeepServers() error {
if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
- this.SetServiceRoots([]string{prx})
+ sr := map[string]string{"proxy":prx}
+ this.SetServiceRoots(sr)
this.Using_proxy = true
return nil
}
}
listed := make(map[string]bool)
- service_roots := make([]string, 0, len(m.Items))
+ service_roots := make(map[string]string)
for _, element := range m.Items {
n := ""
// Skip duplicates
if !listed[url] {
listed[url] = true
- service_roots = append(service_roots, url)
+ service_roots[element.Uuid] = url
}
if element.SvcType == "proxy" {
this.Using_proxy = true
return nil
}
-func (this KeepClient) shuffledServiceRoots(hash string) (pseq []string) {
- // Build an ordering with which to query the Keep servers based on the
- // contents of the hash. "hash" is a hex-encoded number at least 8
- // digits (32 bits) long
-
- // seed used to calculate the next keep server from 'pool' to be added
- // to 'pseq'
- seed := hash
-
- // Keep servers still to be added to the ordering
- service_roots := this.ServiceRoots()
- pool := make([]string, len(service_roots))
- copy(pool, service_roots)
-
- // output probe sequence
- pseq = make([]string, 0, len(service_roots))
-
- // iterate while there are servers left to be assigned
- for len(pool) > 0 {
-
- if len(seed) < 8 {
- // ran out of digits in the seed
- if len(pseq) < (len(hash) / 4) {
- // the number of servers added to the probe
- // sequence is less than the number of 4-digit
- // slices in 'hash' so refill the seed with the
- // last 4 digits.
- seed = hash[len(hash)-4:]
- }
- seed += hash
- }
-
- // Take the next 8 digits (32 bytes) and interpret as an integer,
- // then modulus with the size of the remaining pool to get the next
- // selected server.
- probe, _ := strconv.ParseUint(seed[0:8], 16, 32)
- probe %= uint64(len(pool))
-
- // Append the selected server to the probe sequence and remove it
- // from the pool.
- pseq = append(pseq, pool[probe])
- pool = append(pool[:probe], pool[probe+1:]...)
-
- // Remove the digits just used from the seed
- seed = seed[8:]
- }
- return pseq
-}
-
type uploadStatus struct {
err error
url string
expectedLength int64) (locator string, replicas int, err error) {
// Calculate the ordering for uploading to servers
- sv := this.shuffledServiceRoots(hash)
+ sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
// The next server to try contacting
next_server := 0
import socket
import requests
-_logger = logging.getLogger('arvados.keep')
-global_client_object = None
-
import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
+_logger = logging.getLogger('arvados.keep')
+global_client_object = None
+
class KeepLocator(object):
EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
if not proxy.endswith('/'):
proxy += '/'
self.api_token = api_token
- self.service_roots = [proxy]
+ self._keep_services = [{
+ 'uuid': 'proxy',
+ '_service_root': proxy,
+ }]
self.using_proxy = True
- self.static_service_roots = True
+ self._static_services_list = True
else:
# It's important to avoid instantiating an API client
# unless we actually need one, for testing's sake.
api_client = arvados.api('v1')
self.api_client = api_client
self.api_token = api_client.api_token
- self.service_roots = None
+ self._keep_services = None
self.using_proxy = None
- self.static_service_roots = False
+ self._static_services_list = False
def current_timeout(self):
"""Return the appropriate timeout to use for this client: the proxy
# KeepService, not a KeepClient. See #4488.
return self.proxy_timeout if self.using_proxy else self.timeout
- def build_service_roots(self, force_rebuild=False):
- if (self.static_service_roots or
- (self.service_roots and not force_rebuild)):
+ def build_services_list(self, force_rebuild=False):
+ if (self._static_services_list or
+ (self._keep_services and not force_rebuild)):
return
with self.lock:
try:
except Exception: # API server predates Keep services.
keep_services = self.api_client.keep_disks().list()
- keep_services = keep_services.execute().get('items')
- if not keep_services:
+ self._keep_services = keep_services.execute().get('items')
+ if not self._keep_services:
raise arvados.errors.NoKeepServersError()
self.using_proxy = any(ks.get('service_type') == 'proxy'
- for ks in keep_services)
-
- roots = ("{}://[{}]:{:d}/".format(
- 'https' if ks['service_ssl_flag'] else 'http',
- ks['service_host'],
- ks['service_port'])
- for ks in keep_services)
- self.service_roots = sorted(set(roots))
- _logger.debug(str(self.service_roots))
-
- def shuffled_service_roots(self, hash, force_rebuild=False):
- self.build_service_roots(force_rebuild)
-
- # Build an ordering with which to query the Keep servers based on the
- # contents of the hash.
- # "hash" is a hex-encoded number at least 8 digits
- # (32 bits) long
-
- # seed used to calculate the next keep server from 'pool'
- # to be added to 'pseq'
- seed = hash
-
- # Keep servers still to be added to the ordering
- pool = self.service_roots[:]
-
- # output probe sequence
- pseq = []
-
- # iterate while there are servers left to be assigned
- while len(pool) > 0:
- if len(seed) < 8:
- # ran out of digits in the seed
- if len(pseq) < len(hash) / 4:
- # the number of servers added to the probe sequence is less
- # than the number of 4-digit slices in 'hash' so refill the
- # seed with the last 4 digits and then append the contents
- # of 'hash'.
- seed = hash[-4:] + hash
- else:
- # refill the seed with the contents of 'hash'
- seed += hash
-
- # Take the next 8 digits (32 bytes) and interpret as an integer,
- # then modulus with the size of the remaining pool to get the next
- # selected server.
- probe = int(seed[0:8], 16) % len(pool)
-
- # Append the selected server to the probe sequence and remove it
- # from the pool.
- pseq += [pool[probe]]
- pool = pool[:probe] + pool[probe+1:]
-
- # Remove the digits just used from the seed
- seed = seed[8:]
- _logger.debug(str(pseq))
- return pseq
+ for ks in self._keep_services)
+
+ # Precompute the base URI for each service.
+ for r in self._keep_services:
+ r['_service_root'] = "{}://[{}]:{:d}/".format(
+ 'https' if r['service_ssl_flag'] else 'http',
+ r['service_host'],
+ r['service_port'])
+ _logger.debug(str(self._keep_services))
+
+    def _service_weight(self, data_hash, service_uuid):
+        """Compute the weight of a Keep service endpoint for a data
+        block with a known hash.
+
+        The weight is md5(h + u) where u is the last 15 characters of
+        the service endpoint's UUID.
+        """
+        # Must stay in sync with RootSorter.getWeight in the Go SDK,
+        # which uses the same md5(hash + uuid[-15:]) construction.
+        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
+    def weighted_service_roots(self, data_hash, force_rebuild=False):
+        """Return an array of Keep service endpoints, in the order in
+        which they should be probed when reading or writing data with
+        the given hash.
+
+        :data_hash: hex md5 digest of the block being read or written.
+        :force_rebuild: when true, refetch the service list from the
+          API server before computing the order.
+        """
+        self.build_services_list(force_rebuild)
+
+        # Sort the available services by weight (heaviest first) for
+        # this data_hash, and return their service_roots (base URIs)
+        # in that order.
+        sorted_roots = [
+            svc['_service_root'] for svc in sorted(
+                self._keep_services,
+                reverse=True,
+                key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
+        _logger.debug(data_hash + ': ' + str(sorted_roots))
+        return sorted_roots
def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
# roots_map is a dictionary, mapping Keep service root strings
# new ones to roots_map. Return the current list of local
# root strings.
headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
- local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
+ local_roots = self.weighted_service_roots(md5_s, force_rebuild)
for root in local_roots:
if root not in roots_map:
roots_map[root] = self.KeepService(root, **headers)
for d in api.keep_disks().list().execute()['items']:
api.keep_disks().delete(uuid=d['uuid']).execute()
- s1 = api.keep_services().create(body={"keep_service": {"service_host": "localhost", "service_port": 25107, "service_type": "disk"} }).execute()
- s2 = api.keep_services().create(body={"keep_service": {"service_host": "localhost", "service_port": 25108, "service_type": "disk"} }).execute()
+ s1 = api.keep_services().create(body={"keep_service": {
+ "uuid": "zzzzz-bi6l4-5bo5n1iekkjyz6b",
+ "service_host": "localhost",
+ "service_port": 25107,
+ "service_type": "disk"
+ }}).execute()
+ s2 = api.keep_services().create(body={"keep_service": {
+ "uuid": "zzzzz-bi6l4-2nz60e0ksj7vr3s",
+ "service_host": "localhost",
+ "service_port": 25108,
+ "service_type": "disk"
+ }}).execute()
api.keep_disks().create(body={"keep_disk": {"keep_service_uuid": s1["uuid"] } }).execute()
api.keep_disks().create(body={"keep_disk": {"keep_service_uuid": s2["uuid"] } }).execute()
+import hashlib
import mock
import os
+import re
import socket
import unittest
import urlparse
api_client.keep_services().accessible().execute.return_value = {
'items_available': len(services),
'items': [{
- 'uuid': 'zzzzz-bi6l4-mockservice{:04x}'.format(index),
- 'owner_uuid': 'zzzzz-tpzed-mockownerabcdef',
+ 'uuid': 'zzzzz-bi6l4-{:015x}'.format(index),
+ 'owner_uuid': 'zzzzz-tpzed-000000000000000',
'service_host': host,
'service_port': port,
'service_ssl_flag': ssl,
}
return api_client
+    def mock_n_keep_disks(self, service_count):
+        # Build an API stub advertising service_count fake disk
+        # services, hosts keep0x0..keep0x{n-1}, all on port 80, no SSL.
+        return self.mock_keep_services(
+            *[("keep0x{:x}".format(index), 80, False, 'disk')
+              for index in range(service_count)])
+
def get_service_roots(self, *services):
api_client = self.mock_keep_services(*services)
keep_client = arvados.KeepClient(api_client=api_client)
- services = keep_client.shuffled_service_roots('000000')
+ services = keep_client.weighted_service_roots('000000')
return [urlparse.urlparse(url) for url in sorted(services)]
def test_ssl_flag_respected_in_roots(self):
arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
mock_request.call_args[1]['timeout'])
+ def test_probe_order_reference_set(self):
+ # expected_order[i] is the probe order for
+ # hash=md5(sprintf("%064x",i)) where there are 16 services
+ # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
+ # the first probe for the block consisting of 64 "0"
+ # characters is the service whose uuid is
+ # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
+ expected_order = [
+ list('3eab2d5fc9681074'),
+ list('097dba52e648f1c3'),
+ list('c5b4e023f8a7d691'),
+ list('9d81c02e76a3bf54'),
+ ]
+ hashes = [
+ hashlib.md5("{:064x}".format(x)).hexdigest()
+ for x in range(len(expected_order))]
+ api_client = self.mock_n_keep_disks(16)
+ keep_client = arvados.KeepClient(api_client=api_client)
+ for i, hash in enumerate(hashes):
+ roots = keep_client.weighted_service_roots(hash)
+ got_order = [
+ re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
+ for root in roots]
+ self.assertEqual(expected_order[i], got_order)
+
+ def test_probe_waste_adding_one_server(self):
+ hashes = [
+ hashlib.md5("{:064x}".format(x)).hexdigest() for x in range(100)]
+ initial_services = 12
+ api_client = self.mock_n_keep_disks(initial_services)
+ keep_client = arvados.KeepClient(api_client=api_client)
+ probes_before = [
+ keep_client.weighted_service_roots(hash) for hash in hashes]
+ for added_services in range(1, 12):
+ api_client = self.mock_n_keep_disks(initial_services+added_services)
+ keep_client = arvados.KeepClient(api_client=api_client)
+ total_penalty = 0
+ for hash_index in range(len(hashes)):
+ probe_after = keep_client.weighted_service_roots(
+ hashes[hash_index])
+ penalty = probe_after.index(probes_before[hash_index][0])
+ self.assertLessEqual(penalty, added_services)
+ total_penalty += penalty
+ # Average penalty per block should not exceed
+ # N(added)/N(orig) by more than 20%, and should get closer
+ # to the ideal as we add data points.
+ expect_penalty = (
+ added_services *
+ len(hashes) / initial_services)
+ max_penalty = (
+ expect_penalty *
+ (120 - added_services)/100)
+ min_penalty = (
+ expect_penalty * 8/10)
+ self.assertTrue(
+ min_penalty <= total_penalty <= max_penalty,
+ "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
+ initial_services,
+ added_services,
+ len(hashes),
+ total_penalty,
+ min_penalty,
+ max_penalty))
+
class KeepClientRetryTestMixin(object):
# Testing with a local Keep store won't exercise the retry behavior.
include HasUuid
include KindAndEtag
include CommonApiTemplate
- attr_protected :docker_image_locator
+ attr_protected :arvados_sdk_version, :docker_image_locator
serialize :script_parameters, Hash
serialize :runtime_constraints, Hash
serialize :tasks_summary, Hash
before_validation :set_priority
before_validation :update_state_from_old_state_attrs
validate :ensure_script_version_is_commit
+ validate :find_arvados_sdk_version
validate :find_docker_image_locator
validate :validate_status
validate :validate_state_change
t.add :nondeterministic
t.add :repository
t.add :supplied_script_version
+ t.add :arvados_sdk_version
t.add :docker_image_locator
t.add :queue_position
t.add :node_uuids
true
end
- def find_docker_image_locator
- # Find the Collection that holds the Docker image specified in the
- # runtime constraints, and store its locator in docker_image_locator.
- unless runtime_constraints.is_a? Hash
- # We're still in validation stage, so we can't assume
- # runtime_constraints isn't something horrible like an array or
- # a string. Treat those cases as "no docker image supplied";
- # other validations will fail anyway.
- self.docker_image_locator = nil
- return true
+ def resolve_runtime_constraint(key, attr_sym)
+ if ((runtime_constraints.is_a? Hash) and
+ (search = runtime_constraints[key]))
+ ok, result = yield search
+ else
+ ok, result = true, nil
end
- image_search = runtime_constraints['docker_image']
- image_tag = runtime_constraints['docker_image_tag']
- if image_search.nil?
- self.docker_image_locator = nil
- true
- elsif coll = Collection.for_latest_docker_image(image_search, image_tag)
- self.docker_image_locator = coll.portable_data_hash
- true
+ if ok
+ send("#{attr_sym}=".to_sym, result)
else
- errors.add(:docker_image_locator, "not found for #{image_search}")
- false
+ errors.add(attr_sym, result)
+ end
+ ok
+ end
+
+ def find_arvados_sdk_version
+ resolve_runtime_constraint("arvados_sdk_version",
+ :arvados_sdk_version) do |git_search|
+ commits = Commit.find_commit_range(current_user, "arvados",
+ nil, git_search, nil)
+ if commits.andand.any?
+ [true, commits.first]
+ else
+ [false, "#{git_search} does not resolve to a commit"]
+ end
+ end
+ end
+
+ def find_docker_image_locator
+ resolve_runtime_constraint("docker_image",
+ :docker_image_locator) do |image_search|
+ image_tag = runtime_constraints['docker_image_tag']
+ if coll = Collection.for_latest_docker_image(image_search, image_tag)
+ [true, coll.portable_data_hash]
+ else
+ [false, "not found for #{image_search}"]
+ end
end
end
--- /dev/null
+class AddArvadosSdkVersionToJobs < ActiveRecord::Migration
+ def up
+ change_table :jobs do |t|
+ t.column :arvados_sdk_version, :string
+ end
+ end
+
+ def down
+ change_table :jobs do |t|
+ t.remove :arvados_sdk_version
+ end
+ end
+end
docker_image_locator character varying(255),
priority integer DEFAULT 0 NOT NULL,
description text,
- state character varying(255)
+ state character varying(255),
+ arvados_sdk_version character varying(255)
);
INSERT INTO schema_migrations (version) VALUES ('20140918153705');
-INSERT INTO schema_migrations (version) VALUES ('20140924091559');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20140924091559');
+
+INSERT INTO schema_migrations (version) VALUES ('20141111133038');
\ No newline at end of file
# @where, @filters, @limit, @offset, @orders
module LoadParam
- # Default limit on number of rows to return in a single query.
+ # Default number of rows to return in a single query.
DEFAULT_LIMIT = 100
+ # Maximum number of rows to return in a single query, even if the client asks for more.
+ MAX_LIMIT = 1000
+
# Load params[:where] into @where
def load_where_param
if params[:where].nil? or params[:where] == ""
unless params[:limit].to_s.match(/^\d+$/)
raise ArgumentError.new("Invalid value for limit parameter")
end
- @limit = params[:limit].to_i
+ @limit = [params[:limit].to_i, MAX_LIMIT].min
else
@limit = DEFAULT_LIMIT
end
keep0:
uuid: zzzzz-bi6l4-6zhilxar6r8ey90
owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
- service_host: keep0.qr1hi.arvadosapi.com
+ service_host: keep0.zzzzz.arvadosapi.com
service_port: 25107
service_ssl_flag: false
service_type: disk
keep1:
uuid: zzzzz-bi6l4-rsnj3c76ndxb7o0
owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
- service_host: keep1.qr1hi.arvadosapi.com
+ service_host: keep1.zzzzz.arvadosapi.com
service_port: 25107
service_ssl_flag: false
service_type: disk
proxy:
uuid: zzzzz-bi6l4-h0a0xwut9qa6g3a
owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
- service_host: keep.qr1hi.arvadosapi.com
+ service_host: keep.zzzzz.arvadosapi.com
service_port: 25333
service_ssl_flag: true
service_type: proxy
head_uuid: zzzzz-8i9sb-cjs4pklxxjykyuj
properties: {}
+arvados_repository_readable_by_all_users:
+ uuid: zzzzz-o0j2j-allcanreadarvrp
+ owner_uuid: zzzzz-tpzed-000000000000000
+ created_at: 2014-01-24 20:42:26 -0800
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-000000000000000
+ modified_at: 2014-01-24 20:42:26 -0800
+ updated_at: 2014-01-24 20:42:26 -0800
+ tail_uuid: zzzzz-j7d0g-fffffffffffffff
+ link_class: permission
+ name: can_read
+ head_uuid: zzzzz-s0uqq-arvadosrepo0123
+ properties: {}
+
foo_repository_readable_by_spectator:
uuid: zzzzz-o0j2j-cpy7p41hpk5xxx
owner_uuid: zzzzz-tpzed-000000000000000
title: "default input"
description: "input collection"
+template_with_dataclass_file:
+ uuid: zzzzz-p5p6p-k0xoa0ofxrystgw
+ owner_uuid: zzzzz-j7d0g-v955i6s2oi1cbso
+ created_at: 2014-04-14 12:35:04 -0400
+ updated_at: 2014-04-14 12:35:04 -0400
+ modified_at: 2014-04-14 12:35:04 -0400
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ name: Two Part Template with dataclass File
+ components:
+ part-one:
+ script: foo
+ script_version: master
+ script_parameters:
+ input:
+ required: true
+ dataclass: File
+ title: "Foo/bar pair"
+ description: "Provide an input file"
+ part-two:
+ script: bar
+ script_version: master
+ script_parameters:
+ input:
+ output_of: part-one
+ integer_with_default:
+ default: 123
+ integer_with_value:
+ value: 123
+ string_with_default:
+ default: baz
+ string_with_value:
+ value: baz
+ plain_string: qux
+ array_with_default: # important to test repeating values in the array!
+ default: [1,1,2,3,5]
+ array_with_value: # important to test repeating values in the array!
+ value: [1,1,2,3,5]
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz # active user
name: crunch_dispatch_test
+arvados:
+ uuid: zzzzz-s0uqq-arvadosrepo0123
+ owner_uuid: zzzzz-tpzed-000000000000000 # root
+ name: arvados
+
foo:
uuid: zzzzz-s0uqq-382brsig8rp3666
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz # active user
#test "test_branch1" do
# complains "fatal: bad object 077ba2ad3ea24a929091a9e6ce545c93199b8e57"
a = Commit.find_commit_range(users(:active), nil, nil, 'master', nil)
- assert_equal ['f35f99b7d32bac257f5989df02b9f12ee1a9b0d6', '077ba2ad3ea24a929091a9e6ce545c93199b8e57'], a
+ assert_includes(a, 'f35f99b7d32bac257f5989df02b9f12ee1a9b0d6')
+ assert_includes(a, '077ba2ad3ea24a929091a9e6ce545c93199b8e57')
#test "test_branch2" do
a = Commit.find_commit_range(users(:active), 'foo', nil, 'b1', nil)
assert_includes ids, collections(:baz_file_in_asubproject).uuid
end
- test "user with project read permission can sort project collections ascending, ignoring case" do
- authorize_with :project_viewer
- get :contents, {
- id: groups(:asubproject).uuid,
- format: :json,
- filters: [['uuid', 'is_a', "arvados#collection"]],
- order: 'collections.name asc'
- }
- sorted_entries = json_response['items'].collect { |item| item["name"].downcase }
- previous = nil
- sorted_entries.each do |entry|
- assert_operator( previous, :<=, entry) if previous
- previous = entry
- end
- end
-
- test "user with project read permission can sort project collections descending, ignoring case" do
- authorize_with :project_viewer
- get :contents, {
- id: groups(:asubproject).uuid,
- format: :json,
- filters: [['uuid', 'is_a', "arvados#collection"]],
- order: 'collections.name desc'
- }
- sorted_entries = json_response['items'].collect { |item| item["name"].downcase }
- previous = nil
- sorted_entries.each do |entry|
- assert_operator( previous, :>=, entry) if previous
- previous = entry
+ [['asc', :<=],
+ ['desc', :>=]].each do |order, operator|
+ test "user with project read permission can sort project collections #{order}" do
+ authorize_with :project_viewer
+ get :contents, {
+ id: groups(:asubproject).uuid,
+ format: :json,
+ filters: [['uuid', 'is_a', "arvados#collection"]],
+ order: "collections.name #{order}"
+ }
+ sorted_names = json_response['items'].collect { |item| item["name"] }
+ # Here we avoid assuming too much about the database
+ # collation. Both "alice"<"Bob" and "alice">"Bob" can be
+ # correct. Hopefully it _is_ safe to assume that if "a" comes
+ # before "b" in the ascii alphabet, "aX">"bY" is never true for
+ # any strings X and Y.
+ reliably_sortable_names = sorted_names.select do |name|
+ name[0] >= 'a' and name[0] <= 'z'
+ end.uniq do |name|
+ name[0]
+ end
+ # Preserve order of sorted_names. But do not use &=. If
+ # sorted_names has out-of-order duplicates, we want to preserve
+ # them here, so we can detect them and fail the test below.
+ sorted_names.select! do |name|
+ reliably_sortable_names.include? name
+ end
+ actually_checked_anything = false
+ previous = nil
+ sorted_names.each do |entry|
+ if previous
+ assert_operator(previous, operator, entry,
+ "Entries sorted incorrectly.")
+ actually_checked_anything = true
+ end
+ previous = entry
+ end
+ assert actually_checked_anything, "Didn't even find two names to compare."
end
end
end
test "get_all_permissions does not give any access to user without permission" do
+ viewer_uuid = users(:project_viewer).uuid
+ assert_equal(authorized_keys(:project_viewer).authorized_user_uuid,
+ viewer_uuid,
+ "project_viewer must have an authorized_key for this test to work")
authorize_with :admin
get :get_all_permissions
assert_response :success
- assert_equal(authorized_keys(:project_viewer).authorized_user_uuid,
- users(:project_viewer).uuid,
- "project_viewer must have an authorized_key for this test to work")
- json_response['repositories'].each do |repo|
- assert_equal(false,
- repo['user_permissions'].has_key?(users(:project_viewer).uuid),
- "project_viewer user should not have perms for #{repo['uuid']}")
+ readable_repos = json_response["repositories"].select do |repo|
+ repo["user_permissions"].has_key?(viewer_uuid)
end
+ assert_equal(["arvados"], readable_repos.map { |r| r["name"] },
+ "project_viewer should only have permissions on public repos")
end
test "get_all_permissions gives gitolite R to user with read-only access" do
def self.included base
base.setup do
@tmpdir = Dir.mktmpdir()
- `cp test/test.git.tar #{@tmpdir} && cd #{@tmpdir} && tar xf test.git.tar`
- @orig_git_repositories_dir = Rails.configuration.git_repositories_dir
+ system("tar", "-xC", @tmpdir, "-f", "test/test.git.tar")
Rails.configuration.git_repositories_dir = "#{@tmpdir}/test"
Commit.refresh_repositories
end
base.teardown do
FileUtils.remove_entry @tmpdir, true
- Rails.configuration.git_repositories_dir = @orig_git_repositories_dir
Commit.refresh_repositories
end
end
assert_response :success
services = json_response['items']
- assert_equal 2, services.length
- assert_equal 'disk', services[0]['service_type']
- assert_equal 'disk', services[1]['service_type']
+ assert_operator 2, :<=, services.length
+ services.each do |service|
+ assert_equal 'disk', service['service_type']
+ end
+ end
+ test "request keep proxy" do
get "/arvados/v1/keep_services/accessible", {:format => :json}, auth(:active).merge({'HTTP_X_EXTERNAL_CLIENT' => '1'})
assert_response :success
services = json_response['items']
assert_equal 1, services.length
- assert_equal "zzzzz-bi6l4-h0a0xwut9qa6g3a", services[0]['uuid']
- assert_equal "keep.qr1hi.arvadosapi.com", services[0]['service_host']
- assert_equal 25333, services[0]['service_port']
- assert_equal true, services[0]['service_ssl_flag']
+ assert_equal keep_services(:proxy).uuid, services[0]['uuid']
+ assert_equal keep_services(:proxy).service_host, services[0]['service_host']
+ assert_equal keep_services(:proxy).service_port, services[0]['service_port']
+ assert_equal keep_services(:proxy).service_ssl_flag, services[0]['service_ssl_flag']
assert_equal 'proxy', services[0]['service_type']
end
end
assert_equal(image_uuid, job.docker_image_locator)
end
- test "can't create Job with Docker image locator" do
+ def check_attrs_unset(job, attrs)
+ assert_empty(attrs.each_key.map { |key| job.send(key) }.compact,
+ "job has values for #{attrs.keys}")
+ end
+
+ def check_creation_prohibited(attrs)
begin
- job = Job.new job_attrs(docker_image_locator: BAD_COLLECTION)
+ job = Job.new(job_attrs(attrs))
rescue ActiveModel::MassAssignmentSecurity::Error
# Test passes - expected attribute protection
else
- assert_nil job.docker_image_locator
+ check_attrs_unset(job, attrs)
end
end
- test "can't assign Docker image locator to Job" do
- job = Job.new job_attrs
- begin
- Job.docker_image_locator = BAD_COLLECTION
- rescue NoMethodError
- # Test passes - expected attribute protection
+ def check_modification_prohibited(attrs)
+ job = Job.new(job_attrs)
+ attrs.each_pair do |key, value|
+      assert_raises(NoMethodError) { job.send("#{key}=".to_sym, value) }
end
- assert_nil job.docker_image_locator
+ check_attrs_unset(job, attrs)
+ end
+
+ test "can't create Job with Docker image locator" do
+ check_creation_prohibited(docker_image_locator: BAD_COLLECTION)
+ end
+
+ test "can't assign Docker image locator to Job" do
+ check_modification_prohibited(docker_image_locator: BAD_COLLECTION)
end
[
assert_not_equal job1.queue_position, job2.queue_position
end
+ SDK_MASTER = "ca68b24e51992e790f29df5cc4bc54ce1da4a1c2"
+ SDK_TAGGED = "00634b2b8a492d6f121e3cf1d6587b821136a9a7"
+
+ def sdk_constraint(version)
+ {runtime_constraints: {"arvados_sdk_version" => version}}
+ end
+
+ def check_job_sdk_version(expected)
+ job = yield
+ if expected.nil?
+ refute(job.valid?, "job valid with bad Arvados SDK version")
+ else
+ assert(job.valid?, "job not valid with good Arvados SDK version")
+ assert_equal(expected, job.arvados_sdk_version)
+ end
+ end
+
+ { "master" => SDK_MASTER,
+ "commit2" => SDK_TAGGED,
+ SDK_TAGGED[0, 8] => SDK_TAGGED,
+ "__nonexistent__" => nil,
+ }.each_pair do |search, commit_hash|
+ test "creating job with SDK version '#{search}'" do
+ check_job_sdk_version(commit_hash) do
+ Job.new(job_attrs(sdk_constraint(search)))
+ end
+ end
+
+ test "updating job with SDK version '#{search}'" do
+ job = Job.create!(job_attrs)
+ assert_nil job.arvados_sdk_version
+ check_job_sdk_version(commit_hash) do
+ job.runtime_constraints = sdk_constraint(search)[:runtime_constraints]
+ job
+ end
+ end
+ end
+
+ test "clear the SDK version" do
+ job = Job.create!(job_attrs(sdk_constraint("master")))
+ assert_equal(SDK_MASTER, job.arvados_sdk_version)
+ job.runtime_constraints = {}
+ assert(job.valid?, "job invalid after clearing SDK version")
+ assert_nil(job.arvados_sdk_version)
+ end
+
+ test "can't create job with SDK version assigned directly" do
+ check_creation_prohibited(arvados_sdk_version: SDK_MASTER)
+ end
+
+ test "can't modify job to assign SDK version directly" do
+ check_modification_prohibited(arvados_sdk_version: SDK_MASTER)
+ end
end
os.Setenv("ARVADOS_KEEP_PROXY", fmt.Sprintf("http://localhost:%v", port))
os.Setenv("ARVADOS_API_TOKEN", token)
arv, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, Equals, nil)
kc, err := keepclient.MakeKeepClient(&arv)
+ c.Assert(err, Equals, nil)
c.Check(kc.Using_proxy, Equals, true)
c.Check(len(kc.ServiceRoots()), Equals, 1)
- c.Check(kc.ServiceRoots()[0], Equals, fmt.Sprintf("http://localhost:%v", port))
- c.Check(err, Equals, nil)
+ for _, root := range(kc.ServiceRoots()) {
+ c.Check(root, Equals, fmt.Sprintf("http://localhost:%v", port))
+ }
os.Setenv("ARVADOS_KEEP_PROXY", "")
log.Print("keepclient created")
return kc
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "true")
arv, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, Equals, nil)
kc, err := keepclient.MakeKeepClient(&arv)
+ c.Assert(err, Equals, nil)
c.Check(kc.Arvados.External, Equals, true)
c.Check(kc.Using_proxy, Equals, true)
c.Check(len(kc.ServiceRoots()), Equals, 1)
- c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:29950")
- c.Check(err, Equals, nil)
+ for _, root := range kc.ServiceRoots() {
+ c.Check(root, Equals, "http://localhost:29950")
+ }
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
log.Print("keepclient created")
from __future__ import absolute_import, print_function
-import functools
import itertools
-import logging
import time
-import pykka
-
-from ..clientactor import _notify_subscribers
-from .. import config
-
def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
hostname = arvados_node.get('hostname') or default_hostname
return '{}.{}'.format(hostname, arvados_node['domain'])
def timestamp_fresh(timestamp, fresh_time):
return (time.time() - timestamp) < fresh_time
-class BaseComputeNodeDriver(object):
- """Abstract base class for compute node drivers.
-
- libcloud abstracts away many of the differences between cloud providers,
- but managing compute nodes requires some cloud-specific features (e.g.,
- on EC2 we use tags to identify compute nodes). Compute node drivers
- are responsible for translating the node manager's cloud requests to a
- specific cloud's vocabulary.
-
- Subclasses must implement arvados_create_kwargs (to update node
- creation kwargs with information about the specific Arvados node
- record), sync_node, and node_start_time.
- """
- def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
- self.real = driver_class(**auth_kwargs)
- self.list_kwargs = list_kwargs
- self.create_kwargs = create_kwargs
-
- def __getattr__(self, name):
- # Proxy non-extension methods to the real driver.
- if (not name.startswith('_') and not name.startswith('ex_')
- and hasattr(self.real, name)):
- return getattr(self.real, name)
- else:
- return super(BaseComputeNodeDriver, self).__getattr__(name)
-
- def search_for(self, term, list_method, key=lambda item: item.id):
- cache_key = (list_method, term)
- if cache_key not in self.SEARCH_CACHE:
- results = [item for item in getattr(self.real, list_method)()
- if key(item) == term]
- count = len(results)
- if count != 1:
- raise ValueError("{} returned {} results for '{}'".format(
- list_method, count, term))
- self.SEARCH_CACHE[cache_key] = results[0]
- return self.SEARCH_CACHE[cache_key]
-
- def list_nodes(self):
- return self.real.list_nodes(**self.list_kwargs)
-
- def arvados_create_kwargs(self, arvados_node):
- raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
-
- def create_node(self, size, arvados_node):
- kwargs = self.create_kwargs.copy()
- kwargs.update(self.arvados_create_kwargs(arvados_node))
- kwargs['size'] = size
- return self.real.create_node(**kwargs)
-
- def sync_node(self, cloud_node, arvados_node):
- # When a compute node first pings the API server, the API server
- # will automatically assign some attributes on the corresponding
- # node record, like hostname. This method should propagate that
- # information back to the cloud node appropriately.
- raise NotImplementedError("BaseComputeNodeDriver.sync_node")
-
- @classmethod
- def node_start_time(cls, node):
- raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
-
-
-ComputeNodeDriverClass = BaseComputeNodeDriver
-
-class ComputeNodeStateChangeBase(config.actor_class):
- """Base class for actors that change a compute node's state.
-
- This base class takes care of retrying changes and notifying
- subscribers when the change is finished.
- """
- def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
- super(ComputeNodeStateChangeBase, self).__init__()
- self._later = self.actor_ref.proxy()
- self._timer = timer_actor
- self._logger = logging.getLogger(logger_name)
- self.min_retry_wait = retry_wait
- self.max_retry_wait = max_retry_wait
- self.retry_wait = retry_wait
- self.subscribers = set()
-
- @staticmethod
- def _retry(errors):
- """Retry decorator for an actor method that makes remote requests.
-
- Use this function to decorator an actor method, and pass in a
- tuple of exceptions to catch. This decorator will schedule
- retries of that method with exponential backoff if the
- original method raises any of the given errors.
- """
- def decorator(orig_func):
- @functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
- try:
- orig_func(self, *args, **kwargs)
- except errors as error:
- self._logger.warning(
- "Client error: %s - waiting %s seconds",
- error, self.retry_wait)
- self._timer.schedule(self.retry_wait,
- getattr(self._later,
- orig_func.__name__),
- *args, **kwargs)
- self.retry_wait = min(self.retry_wait * 2,
- self.max_retry_wait)
- else:
- self.retry_wait = self.min_retry_wait
- return wrapper
- return decorator
-
- def _finished(self):
- _notify_subscribers(self._later, self.subscribers)
- self.subscribers = None
-
- def subscribe(self, subscriber):
- if self.subscribers is None:
- try:
- subscriber(self._later)
- except pykka.ActorDeadError:
- pass
- else:
- self.subscribers.add(subscriber)
-
-
-class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
- """Actor to create and set up a cloud compute node.
-
- This actor prepares an Arvados node record for a new compute node
- (either creating one or cleaning one passed in), then boots the
- actual compute node. It notifies subscribers when the cloud node
- is successfully created (the last step in the process for Node
- Manager to handle).
- """
- def __init__(self, timer_actor, arvados_client, cloud_client,
- cloud_size, arvados_node=None,
- retry_wait=1, max_retry_wait=180):
- super(ComputeNodeSetupActor, self).__init__(
- 'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
- self._arvados = arvados_client
- self._cloud = cloud_client
- self.cloud_size = cloud_size
- self.arvados_node = None
- self.cloud_node = None
- if arvados_node is None:
- self._later.create_arvados_node()
- else:
- self._later.prepare_arvados_node(arvados_node)
-
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
- def create_arvados_node(self):
- self.arvados_node = self._arvados.nodes().create(body={}).execute()
- self._later.create_cloud_node()
-
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
- def prepare_arvados_node(self, node):
- self.arvados_node = self._arvados.nodes().update(
- uuid=node['uuid'],
- body={'hostname': None,
- 'ip_address': None,
- 'slot_number': None,
- 'first_ping_at': None,
- 'last_ping_at': None,
- 'info': {'ec2_instance_id': None,
- 'last_action': "Prepared by Node Manager"}}
- ).execute()
- self._later.create_cloud_node()
-
- @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
- def create_cloud_node(self):
- self._logger.info("Creating cloud node with size %s.",
- self.cloud_size.name)
- self.cloud_node = self._cloud.create_node(self.cloud_size,
- self.arvados_node)
- self._logger.info("Cloud node %s created.", self.cloud_node.id)
- self._finished()
-
- def stop_if_no_cloud_node(self):
- if self.cloud_node is None:
- self.stop()
-
-
-class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
- """Actor to shut down a compute node.
-
- This actor simply destroys a cloud node, retrying as needed.
- """
- def __init__(self, timer_actor, cloud_client, cloud_node,
- retry_wait=1, max_retry_wait=180):
- super(ComputeNodeShutdownActor, self).__init__(
- 'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
- self._cloud = cloud_client
- self.cloud_node = cloud_node
- self._later.shutdown_node()
-
- @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
- def shutdown_node(self):
- self._cloud.destroy_node(self.cloud_node)
- self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
- self._finished()
-
-
-class ComputeNodeUpdateActor(config.actor_class):
- """Actor to dispatch one-off cloud management requests.
-
- This actor receives requests for small cloud updates, and
- dispatches them to a real driver. ComputeNodeMonitorActors use
- this to perform maintenance tasks on themselves. Having a
- dedicated actor for this gives us the opportunity to control the
- flow of requests; e.g., by backing off when errors occur.
-
- This actor is most like a "traditional" Pykka actor: there's no
- subscribing, but instead methods return real driver results. If
- you're interested in those results, you should get them from the
- Future that the proxy method returns. Be prepared to handle exceptions
- from the cloud driver when you do.
- """
- def __init__(self, cloud_factory, max_retry_wait=180):
- super(ComputeNodeUpdateActor, self).__init__()
- self._cloud = cloud_factory()
- self.max_retry_wait = max_retry_wait
- self.error_streak = 0
- self.next_request_time = time.time()
-
- def _throttle_errors(orig_func):
- @functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
- throttle_time = self.next_request_time - time.time()
- if throttle_time > 0:
- time.sleep(throttle_time)
- self.next_request_time = time.time()
- try:
- result = orig_func(self, *args, **kwargs)
- except config.CLOUD_ERRORS:
- self.error_streak += 1
- self.next_request_time += min(2 ** self.error_streak,
- self.max_retry_wait)
- raise
- else:
- self.error_streak = 0
- return result
- return wrapper
-
- @_throttle_errors
- def sync_node(self, cloud_node, arvados_node):
- return self._cloud.sync_node(cloud_node, arvados_node)
-
-
class ShutdownTimer(object):
"""Keep track of a cloud node's shutdown windows.
def window_open(self):
self._advance_opening()
return 0 < (time.time() - self._open_start) < self._open_for
-
-
-class ComputeNodeMonitorActor(config.actor_class):
- """Actor to manage a running compute node.
-
- This actor gets updates about a compute node's cloud and Arvados records.
- It uses this information to notify subscribers when the node is eligible
- for shutdown.
- """
- def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
- timer_actor, update_actor, arvados_node=None,
- poll_stale_after=600, node_stale_after=3600):
- super(ComputeNodeMonitorActor, self).__init__()
- self._later = self.actor_ref.proxy()
- self._logger = logging.getLogger('arvnodeman.computenode')
- self._last_log = None
- self._shutdowns = shutdown_timer
- self._timer = timer_actor
- self._update = update_actor
- self.cloud_node = cloud_node
- self.cloud_node_start_time = cloud_node_start_time
- self.poll_stale_after = poll_stale_after
- self.node_stale_after = node_stale_after
- self.subscribers = set()
- self.arvados_node = None
- self._later.update_arvados_node(arvados_node)
- self.last_shutdown_opening = None
- self._later.consider_shutdown()
-
- def subscribe(self, subscriber):
- self.subscribers.add(subscriber)
-
- def _debug(self, msg, *args):
- if msg == self._last_log:
- return
- self._last_log = msg
- self._logger.debug(msg, *args)
-
- def in_state(self, *states):
- # Return a boolean to say whether or not our Arvados node record is in
- # one of the given states. If state information is not
- # available--because this node has no Arvados record, the record is
- # stale, or the record has no state information--return None.
- if (self.arvados_node is None) or not timestamp_fresh(
- arvados_node_mtime(self.arvados_node), self.node_stale_after):
- return None
- state = self.arvados_node['info'].get('slurm_state')
- if not state:
- return None
- result = state in states
- if state == 'idle':
- result = result and not self.arvados_node['job_uuid']
- return result
-
- def _shutdown_eligible(self):
- if self.arvados_node is None:
- # If this is a new, unpaired node, it's eligible for
- # shutdown--we figure there was an error during bootstrap.
- return timestamp_fresh(self.cloud_node_start_time,
- self.node_stale_after)
- else:
- return self.in_state('idle')
-
- def consider_shutdown(self):
- next_opening = self._shutdowns.next_opening()
- if self._shutdowns.window_open():
- if self._shutdown_eligible():
- self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
- _notify_subscribers(self._later, self.subscribers)
- else:
- self._debug("Node %s shutdown window open but node busy.",
- self.cloud_node.id)
- else:
- self._debug("Node %s shutdown window closed. Next at %s.",
- self.cloud_node.id, time.ctime(next_opening))
- if self.last_shutdown_opening != next_opening:
- self._timer.schedule(next_opening, self._later.consider_shutdown)
- self.last_shutdown_opening = next_opening
-
- def offer_arvados_pair(self, arvados_node):
- if self.arvados_node is not None:
- return None
- elif arvados_node['ip_address'] in self.cloud_node.private_ips:
- self._later.update_arvados_node(arvados_node)
- return self.cloud_node.id
- else:
- return None
-
- def update_cloud_node(self, cloud_node):
- if cloud_node is not None:
- self.cloud_node = cloud_node
- self._later.consider_shutdown()
-
- def update_arvados_node(self, arvados_node):
- if arvados_node is not None:
- self.arvados_node = arvados_node
- new_hostname = arvados_node_fqdn(self.arvados_node)
- if new_hostname != self.cloud_node.name:
- self._update.sync_node(self.cloud_node, self.arvados_node)
- self._later.consider_shutdown()
--- /dev/null
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import functools
+import logging
+import time
+
+import pykka
+
+from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
+from ...clientactor import _notify_subscribers
+from ... import config
+
+class ComputeNodeStateChangeBase(config.actor_class):
+ """Base class for actors that change a compute node's state.
+
+ This base class takes care of retrying changes and notifying
+ subscribers when the change is finished.
+ """
+ def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
+ super(ComputeNodeStateChangeBase, self).__init__()
+ self._later = self.actor_ref.proxy()
+ self._timer = timer_actor
+ self._logger = logging.getLogger(logger_name)
+ self.min_retry_wait = retry_wait
+ self.max_retry_wait = max_retry_wait
+ self.retry_wait = retry_wait
+ self.subscribers = set()
+
+ @staticmethod
+ def _retry(errors):
+ """Retry decorator for an actor method that makes remote requests.
+
+        Use this function to decorate an actor method, and pass in a
+ tuple of exceptions to catch. This decorator will schedule
+ retries of that method with exponential backoff if the
+ original method raises any of the given errors.
+ """
+ def decorator(orig_func):
+ @functools.wraps(orig_func)
+ def wrapper(self, *args, **kwargs):
+ try:
+ orig_func(self, *args, **kwargs)
+ except errors as error:
+ self._logger.warning(
+ "Client error: %s - waiting %s seconds",
+ error, self.retry_wait)
+ self._timer.schedule(self.retry_wait,
+ getattr(self._later,
+ orig_func.__name__),
+ *args, **kwargs)
+ self.retry_wait = min(self.retry_wait * 2,
+ self.max_retry_wait)
+ else:
+ self.retry_wait = self.min_retry_wait
+ return wrapper
+ return decorator
+
+ def _finished(self):
+ _notify_subscribers(self._later, self.subscribers)
+ self.subscribers = None
+
+ def subscribe(self, subscriber):
+ if self.subscribers is None:
+ try:
+ subscriber(self._later)
+ except pykka.ActorDeadError:
+ pass
+ else:
+ self.subscribers.add(subscriber)
+
+
+class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
+ """Actor to create and set up a cloud compute node.
+
+ This actor prepares an Arvados node record for a new compute node
+ (either creating one or cleaning one passed in), then boots the
+ actual compute node. It notifies subscribers when the cloud node
+ is successfully created (the last step in the process for Node
+ Manager to handle).
+ """
+ def __init__(self, timer_actor, arvados_client, cloud_client,
+ cloud_size, arvados_node=None,
+ retry_wait=1, max_retry_wait=180):
+ super(ComputeNodeSetupActor, self).__init__(
+ 'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
+ self._arvados = arvados_client
+ self._cloud = cloud_client
+ self.cloud_size = cloud_size
+ self.arvados_node = None
+ self.cloud_node = None
+ if arvados_node is None:
+ self._later.create_arvados_node()
+ else:
+ self._later.prepare_arvados_node(arvados_node)
+
+ @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+ def create_arvados_node(self):
+ self.arvados_node = self._arvados.nodes().create(body={}).execute()
+ self._later.create_cloud_node()
+
+ @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+ def prepare_arvados_node(self, node):
+ self.arvados_node = self._arvados.nodes().update(
+ uuid=node['uuid'],
+ body={'hostname': None,
+ 'ip_address': None,
+ 'slot_number': None,
+ 'first_ping_at': None,
+ 'last_ping_at': None,
+ 'info': {'ec2_instance_id': None,
+ 'last_action': "Prepared by Node Manager"}}
+ ).execute()
+ self._later.create_cloud_node()
+
+ @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+ def create_cloud_node(self):
+ self._logger.info("Creating cloud node with size %s.",
+ self.cloud_size.name)
+ self.cloud_node = self._cloud.create_node(self.cloud_size,
+ self.arvados_node)
+ self._logger.info("Cloud node %s created.", self.cloud_node.id)
+ self._finished()
+
+ def stop_if_no_cloud_node(self):
+ if self.cloud_node is None:
+ self.stop()
+
+
+class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
+ """Actor to shut down a compute node.
+
+ This actor simply destroys a cloud node, retrying as needed.
+ """
+ def __init__(self, timer_actor, cloud_client, cloud_node,
+ retry_wait=1, max_retry_wait=180):
+ super(ComputeNodeShutdownActor, self).__init__(
+ 'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
+ self._cloud = cloud_client
+ self.cloud_node = cloud_node
+ self._later.shutdown_node()
+
+ @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+ def shutdown_node(self):
+ self._cloud.destroy_node(self.cloud_node)
+ self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+ self._finished()
+
+
+class ComputeNodeUpdateActor(config.actor_class):
+ """Actor to dispatch one-off cloud management requests.
+
+ This actor receives requests for small cloud updates, and
+ dispatches them to a real driver. ComputeNodeMonitorActors use
+ this to perform maintenance tasks on themselves. Having a
+ dedicated actor for this gives us the opportunity to control the
+ flow of requests; e.g., by backing off when errors occur.
+
+ This actor is most like a "traditional" Pykka actor: there's no
+ subscribing, but instead methods return real driver results. If
+ you're interested in those results, you should get them from the
+ Future that the proxy method returns. Be prepared to handle exceptions
+ from the cloud driver when you do.
+ """
+ def __init__(self, cloud_factory, max_retry_wait=180):
+ super(ComputeNodeUpdateActor, self).__init__()
+ self._cloud = cloud_factory()
+ self.max_retry_wait = max_retry_wait
+ self.error_streak = 0
+ self.next_request_time = time.time()
+
+ def _throttle_errors(orig_func):
+ @functools.wraps(orig_func)
+ def wrapper(self, *args, **kwargs):
+ throttle_time = self.next_request_time - time.time()
+ if throttle_time > 0:
+ time.sleep(throttle_time)
+ self.next_request_time = time.time()
+ try:
+ result = orig_func(self, *args, **kwargs)
+ except config.CLOUD_ERRORS:
+ self.error_streak += 1
+ self.next_request_time += min(2 ** self.error_streak,
+ self.max_retry_wait)
+ raise
+ else:
+ self.error_streak = 0
+ return result
+ return wrapper
+
+ @_throttle_errors
+ def sync_node(self, cloud_node, arvados_node):
+ return self._cloud.sync_node(cloud_node, arvados_node)
+
+
+class ComputeNodeMonitorActor(config.actor_class):
+ """Actor to manage a running compute node.
+
+ This actor gets updates about a compute node's cloud and Arvados records.
+ It uses this information to notify subscribers when the node is eligible
+ for shutdown.
+ """
+ def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
+ timer_actor, update_actor, arvados_node=None,
+ poll_stale_after=600, node_stale_after=3600):
+ super(ComputeNodeMonitorActor, self).__init__()
+ self._later = self.actor_ref.proxy()
+ self._logger = logging.getLogger('arvnodeman.computenode')
+ self._last_log = None
+ self._shutdowns = shutdown_timer
+ self._timer = timer_actor
+ self._update = update_actor
+ self.cloud_node = cloud_node
+ self.cloud_node_start_time = cloud_node_start_time
+ self.poll_stale_after = poll_stale_after
+ self.node_stale_after = node_stale_after
+ self.subscribers = set()
+ self.arvados_node = None
+ self._later.update_arvados_node(arvados_node)
+ self.last_shutdown_opening = None
+ self._later.consider_shutdown()
+
+ def subscribe(self, subscriber):
+ self.subscribers.add(subscriber)
+
+ def _debug(self, msg, *args):
+ if msg == self._last_log:
+ return
+ self._last_log = msg
+ self._logger.debug(msg, *args)
+
+ def in_state(self, *states):
+ # Return a boolean to say whether or not our Arvados node record is in
+ # one of the given states. If state information is not
+ # available--because this node has no Arvados record, the record is
+ # stale, or the record has no state information--return None.
+ if (self.arvados_node is None) or not timestamp_fresh(
+ arvados_node_mtime(self.arvados_node), self.node_stale_after):
+ return None
+ state = self.arvados_node['info'].get('slurm_state')
+ if not state:
+ return None
+ result = state in states
+ if state == 'idle':
+ result = result and not self.arvados_node['job_uuid']
+ return result
+
+ def _shutdown_eligible(self):
+ if self.arvados_node is None:
+ # If this is a new, unpaired node, it's eligible for
+ # shutdown--we figure there was an error during bootstrap.
+ return timestamp_fresh(self.cloud_node_start_time,
+ self.node_stale_after)
+ else:
+ return self.in_state('idle')
+
+ def consider_shutdown(self):
+ next_opening = self._shutdowns.next_opening()
+ if self._shutdowns.window_open():
+ if self._shutdown_eligible():
+ self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
+ _notify_subscribers(self._later, self.subscribers)
+ else:
+ self._debug("Node %s shutdown window open but node busy.",
+ self.cloud_node.id)
+ else:
+ self._debug("Node %s shutdown window closed. Next at %s.",
+ self.cloud_node.id, time.ctime(next_opening))
+ if self.last_shutdown_opening != next_opening:
+ self._timer.schedule(next_opening, self._later.consider_shutdown)
+ self.last_shutdown_opening = next_opening
+
+ def offer_arvados_pair(self, arvados_node):
+ if self.arvados_node is not None:
+ return None
+ elif arvados_node['ip_address'] in self.cloud_node.private_ips:
+ self._later.update_arvados_node(arvados_node)
+ return self.cloud_node.id
+ else:
+ return None
+
+ def update_cloud_node(self, cloud_node):
+ if cloud_node is not None:
+ self.cloud_node = cloud_node
+ self._later.consider_shutdown()
+
+ def update_arvados_node(self, arvados_node):
+ if arvados_node is not None:
+ self.arvados_node = arvados_node
+ new_hostname = arvados_node_fqdn(self.arvados_node)
+ if new_hostname != self.cloud_node.name:
+ self._update.sync_node(self.cloud_node, self.arvados_node)
+ self._later.consider_shutdown()
--- /dev/null
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+class BaseComputeNodeDriver(object):
+ """Abstract base class for compute node drivers.
+
+ libcloud abstracts away many of the differences between cloud providers,
+ but managing compute nodes requires some cloud-specific features (e.g.,
+ on EC2 we use tags to identify compute nodes). Compute node drivers
+ are responsible for translating the node manager's cloud requests to a
+ specific cloud's vocabulary.
+
+ Subclasses must implement arvados_create_kwargs (to update node
+ creation kwargs with information about the specific Arvados node
+ record), sync_node, and node_start_time.
+ """
+ def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
+ self.real = driver_class(**auth_kwargs)
+ self.list_kwargs = list_kwargs
+ self.create_kwargs = create_kwargs
+
+ def __getattr__(self, name):
+ # Proxy non-extension methods to the real driver.
+ if (not name.startswith('_') and not name.startswith('ex_')
+ and hasattr(self.real, name)):
+ return getattr(self.real, name)
+ else:
+ return super(BaseComputeNodeDriver, self).__getattr__(name)
+
+ def search_for(self, term, list_method, key=lambda item: item.id):
+ cache_key = (list_method, term)
+ if cache_key not in self.SEARCH_CACHE:
+ results = [item for item in getattr(self.real, list_method)()
+ if key(item) == term]
+ count = len(results)
+ if count != 1:
+ raise ValueError("{} returned {} results for '{}'".format(
+ list_method, count, term))
+ self.SEARCH_CACHE[cache_key] = results[0]
+ return self.SEARCH_CACHE[cache_key]
+
+ def list_nodes(self):
+ return self.real.list_nodes(**self.list_kwargs)
+
+ def arvados_create_kwargs(self, arvados_node):
+ raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
+
+ def create_node(self, size, arvados_node):
+ kwargs = self.create_kwargs.copy()
+ kwargs.update(self.arvados_create_kwargs(arvados_node))
+ kwargs['size'] = size
+ return self.real.create_node(**kwargs)
+
+ def sync_node(self, cloud_node, arvados_node):
+ # When a compute node first pings the API server, the API server
+ # will automatically assign some attributes on the corresponding
+ # node record, like hostname. This method should propagate that
+ # information back to the cloud node appropriately.
+ raise NotImplementedError("BaseComputeNodeDriver.sync_node")
+
+ @classmethod
+ def node_start_time(cls, node):
+ raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
import libcloud.compute.providers as cloud_provider
import libcloud.compute.types as cloud_types
-from . import BaseComputeNodeDriver, arvados_node_fqdn
+from . import BaseComputeNodeDriver
+from .. import arvados_node_fqdn
class ComputeNodeDriver(BaseComputeNodeDriver):
"""Compute node driver wrapper for libcloud's dummy driver.
import libcloud.compute.types as cloud_types
from libcloud.compute.drivers import ec2 as cloud_ec2
-from . import BaseComputeNodeDriver, arvados_node_fqdn
+from . import BaseComputeNodeDriver
+from .. import arvados_node_fqdn
### Monkeypatch libcloud to support AWS' new SecurityGroup API.
# These classes can be removed when libcloud support specifying
http=http)
def new_cloud_client(self):
- module = importlib.import_module('arvnodeman.computenode.' +
+ module = importlib.import_module('arvnodeman.computenode.driver.' +
self.get('Cloud', 'provider'))
auth_kwargs = self.get_section('Cloud Credentials')
if 'timeout' in auth_kwargs:
import pykka
from . import computenode as cnode
+from .computenode import dispatch
from .config import actor_class
class _ComputeNodeRecord(object):
arvados_factory, cloud_factory,
shutdown_windows, min_nodes, max_nodes,
poll_stale_after=600, node_stale_after=7200,
- node_setup_class=cnode.ComputeNodeSetupActor,
- node_shutdown_class=cnode.ComputeNodeShutdownActor,
- node_actor_class=cnode.ComputeNodeMonitorActor):
+ node_setup_class=dispatch.ComputeNodeSetupActor,
+ node_shutdown_class=dispatch.ComputeNodeShutdownActor,
+ node_actor_class=dispatch.ComputeNodeMonitorActor):
super(NodeManagerDaemonActor, self).__init__()
self._node_setup = node_setup_class
self._node_shutdown = node_shutdown_class
import pykka
from . import config as nmconfig
-from .computenode import \
- ComputeNodeSetupActor, ComputeNodeShutdownActor, ComputeNodeUpdateActor, \
- ShutdownTimer
+from .computenode.dispatch import ComputeNodeUpdateActor
from .daemon import NodeManagerDaemonActor
from .jobqueue import JobQueueMonitorActor, ServerCalculator
from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
import unittest
import arvados.errors as arverror
-import httplib2
import mock
-import pykka
import arvnodeman.computenode as cnode
from . import testutil
-class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
- def make_mocks(self, arvados_effect=None, cloud_effect=None):
- if arvados_effect is None:
- arvados_effect = [testutil.arvados_node_mock()]
- self.arvados_effect = arvados_effect
- self.timer = testutil.MockTimer()
- self.api_client = mock.MagicMock(name='api_client')
- self.api_client.nodes().create().execute.side_effect = arvados_effect
- self.api_client.nodes().update().execute.side_effect = arvados_effect
- self.cloud_client = mock.MagicMock(name='cloud_client')
- self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1)
-
- def make_actor(self, arv_node=None):
- if not hasattr(self, 'timer'):
- self.make_mocks(arvados_effect=[arv_node])
- self.setup_actor = cnode.ComputeNodeSetupActor.start(
- self.timer, self.api_client, self.cloud_client,
- testutil.MockSize(1), arv_node).proxy()
-
- def test_creation_without_arvados_node(self):
- self.make_actor()
- self.assertEqual(self.arvados_effect[-1],
- self.setup_actor.arvados_node.get(self.TIMEOUT))
- self.assertTrue(self.api_client.nodes().create().execute.called)
- self.assertEqual(self.cloud_client.create_node(),
- self.setup_actor.cloud_node.get(self.TIMEOUT))
-
- def test_creation_with_arvados_node(self):
- self.make_actor(testutil.arvados_node_mock())
- self.assertEqual(self.arvados_effect[-1],
- self.setup_actor.arvados_node.get(self.TIMEOUT))
- self.assertTrue(self.api_client.nodes().update().execute.called)
- self.assertEqual(self.cloud_client.create_node(),
- self.setup_actor.cloud_node.get(self.TIMEOUT))
-
- def test_failed_calls_retried(self):
- self.make_mocks([
- arverror.ApiError(httplib2.Response({'status': '500'}), ""),
- testutil.arvados_node_mock(),
- ])
- self.make_actor()
- self.wait_for_assignment(self.setup_actor, 'cloud_node')
-
- def test_stop_when_no_cloud_node(self):
- self.make_mocks(
- arverror.ApiError(httplib2.Response({'status': '500'}), ""))
- self.make_actor()
- self.setup_actor.stop_if_no_cloud_node()
- self.assertTrue(
- self.setup_actor.actor_ref.actor_stopped.wait(self.TIMEOUT))
-
- def test_no_stop_when_cloud_node(self):
- self.make_actor()
- self.wait_for_assignment(self.setup_actor, 'cloud_node')
- self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT)
- self.assertTrue(self.stop_proxy(self.setup_actor),
- "actor was stopped by stop_if_no_cloud_node")
-
- def test_subscribe(self):
- self.make_mocks(
- arverror.ApiError(httplib2.Response({'status': '500'}), ""))
- self.make_actor()
- subscriber = mock.Mock(name='subscriber_mock')
- self.setup_actor.subscribe(subscriber)
- self.api_client.nodes().create().execute.side_effect = [
- testutil.arvados_node_mock()]
- self.wait_for_assignment(self.setup_actor, 'cloud_node')
- self.assertEqual(self.setup_actor.actor_ref.actor_urn,
- subscriber.call_args[0][0].actor_ref.actor_urn)
-
- def test_late_subscribe(self):
- self.make_actor()
- subscriber = mock.Mock(name='subscriber_mock')
- self.wait_for_assignment(self.setup_actor, 'cloud_node')
- self.setup_actor.subscribe(subscriber).get(self.TIMEOUT)
- self.stop_proxy(self.setup_actor)
- self.assertEqual(self.setup_actor.actor_ref.actor_urn,
- subscriber.call_args[0][0].actor_ref.actor_urn)
-
-
-class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
- unittest.TestCase):
- def make_mocks(self, cloud_node=None):
- self.timer = testutil.MockTimer()
- self.cloud_client = mock.MagicMock(name='cloud_client')
- if cloud_node is None:
- cloud_node = testutil.cloud_node_mock()
- self.cloud_node = cloud_node
-
- def make_actor(self, arv_node=None):
- if not hasattr(self, 'timer'):
- self.make_mocks()
- self.shutdown_actor = cnode.ComputeNodeShutdownActor.start(
- self.timer, self.cloud_client, self.cloud_node).proxy()
-
- def test_easy_shutdown(self):
- self.make_actor()
- self.shutdown_actor.cloud_node.get(self.TIMEOUT)
- self.stop_proxy(self.shutdown_actor)
- self.assertTrue(self.cloud_client.destroy_node.called)
-
- def test_late_subscribe(self):
- self.make_actor()
- subscriber = mock.Mock(name='subscriber_mock')
- self.shutdown_actor.subscribe(subscriber).get(self.TIMEOUT)
- self.stop_proxy(self.shutdown_actor)
- self.assertEqual(self.shutdown_actor.actor_ref.actor_urn,
- subscriber.call_args[0][0].actor_ref.actor_urn)
-
-
-class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
- unittest.TestCase):
- def make_actor(self):
- self.driver = mock.MagicMock(name='driver_mock')
- self.updater = cnode.ComputeNodeUpdateActor.start(self.driver).proxy()
-
- def test_node_sync(self):
- self.make_actor()
- cloud_node = testutil.cloud_node_mock()
- arv_node = testutil.arvados_node_mock()
- self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
- self.driver().sync_node.assert_called_with(cloud_node, arv_node)
-
-
@mock.patch('time.time', return_value=1)
class ShutdownTimerTestCase(unittest.TestCase):
def test_two_length_window(self, time_mock):
time_mock.return_value += 200
self.assertEqual(961, timer.next_opening())
self.assertFalse(timer.window_open())
-
-
-class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
- unittest.TestCase):
- class MockShutdownTimer(object):
- def _set_state(self, is_open, next_opening):
- self.window_open = lambda: is_open
- self.next_opening = lambda: next_opening
-
-
- def make_mocks(self, node_num):
- self.shutdowns = self.MockShutdownTimer()
- self.shutdowns._set_state(False, 300)
- self.timer = mock.MagicMock(name='timer_mock')
- self.updates = mock.MagicMock(name='update_mock')
- self.cloud_mock = testutil.cloud_node_mock(node_num)
- self.subscriber = mock.Mock(name='subscriber_mock')
-
- def make_actor(self, node_num=1, arv_node=None, start_time=None):
- if not hasattr(self, 'cloud_mock'):
- self.make_mocks(node_num)
- if start_time is None:
- start_time = time.time()
- self.node_actor = cnode.ComputeNodeMonitorActor.start(
- self.cloud_mock, start_time, self.shutdowns, self.timer,
- self.updates, arv_node).proxy()
- self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
-
- def node_state(self, *states):
- return self.node_actor.in_state(*states).get(self.TIMEOUT)
-
- def test_in_state_when_unpaired(self):
- self.make_actor()
- self.assertIsNone(self.node_state('idle', 'alloc'))
-
- def test_in_state_when_pairing_stale(self):
- self.make_actor(arv_node=testutil.arvados_node_mock(
- job_uuid=None, age=90000))
- self.assertIsNone(self.node_state('idle', 'alloc'))
-
- def test_in_state_when_no_state_available(self):
- self.make_actor(arv_node=testutil.arvados_node_mock(info={}))
- self.assertIsNone(self.node_state('idle', 'alloc'))
-
- def test_in_idle_state(self):
- self.make_actor(2, arv_node=testutil.arvados_node_mock(job_uuid=None))
- self.assertTrue(self.node_state('idle'))
- self.assertFalse(self.node_state('alloc'))
- self.assertTrue(self.node_state('idle', 'alloc'))
-
- def test_in_alloc_state(self):
- self.make_actor(3, arv_node=testutil.arvados_node_mock(job_uuid=True))
- self.assertFalse(self.node_state('idle'))
- self.assertTrue(self.node_state('alloc'))
- self.assertTrue(self.node_state('idle', 'alloc'))
-
- def test_init_shutdown_scheduling(self):
- self.make_actor()
- self.assertTrue(self.timer.schedule.called)
- self.assertEqual(300, self.timer.schedule.call_args[0][0])
-
- def test_shutdown_subscription(self):
- self.make_actor()
- self.shutdowns._set_state(True, 600)
- self.node_actor.consider_shutdown().get(self.TIMEOUT)
- self.assertTrue(self.subscriber.called)
- self.assertEqual(self.node_actor.actor_ref.actor_urn,
- self.subscriber.call_args[0][0].actor_ref.actor_urn)
-
- def test_shutdown_without_arvados_node(self):
- self.make_actor()
- self.shutdowns._set_state(True, 600)
- self.node_actor.consider_shutdown().get(self.TIMEOUT)
- self.assertTrue(self.subscriber.called)
-
- def test_no_shutdown_without_arvados_node_and_old_cloud_node(self):
- self.make_actor(start_time=0)
- self.shutdowns._set_state(True, 600)
- self.node_actor.consider_shutdown().get(self.TIMEOUT)
- self.assertFalse(self.subscriber.called)
-
- def check_shutdown_rescheduled(self, window_open, next_window,
- schedule_time=None):
- self.shutdowns._set_state(window_open, next_window)
- self.timer.schedule.reset_mock()
- self.node_actor.consider_shutdown().get(self.TIMEOUT)
- self.stop_proxy(self.node_actor)
- self.assertTrue(self.timer.schedule.called)
- if schedule_time is not None:
- self.assertEqual(schedule_time, self.timer.schedule.call_args[0][0])
- self.assertFalse(self.subscriber.called)
-
- def test_shutdown_window_close_scheduling(self):
- self.make_actor()
- self.check_shutdown_rescheduled(False, 600, 600)
-
- def test_no_shutdown_when_node_running_job(self):
- self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
- self.check_shutdown_rescheduled(True, 600)
-
- def test_no_shutdown_when_node_state_unknown(self):
- self.make_actor(5, testutil.arvados_node_mock(5, info={}))
- self.check_shutdown_rescheduled(True, 600)
-
- def test_no_shutdown_when_node_state_stale(self):
- self.make_actor(6, testutil.arvados_node_mock(6, age=90000))
- self.check_shutdown_rescheduled(True, 600)
-
- def test_arvados_node_match(self):
- self.make_actor(2)
- arv_node = testutil.arvados_node_mock(
- 2, hostname='compute-two.zzzzz.arvadosapi.com')
- pair_id = self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT)
- self.assertEqual(self.cloud_mock.id, pair_id)
- self.stop_proxy(self.node_actor)
- self.updates.sync_node.assert_called_with(self.cloud_mock, arv_node)
-
- def test_arvados_node_mismatch(self):
- self.make_actor(3)
- arv_node = testutil.arvados_node_mock(1)
- self.assertIsNone(
- self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT))
-
- def test_update_cloud_node(self):
- self.make_actor(1)
- self.make_mocks(2)
- self.cloud_mock.id = '1'
- self.node_actor.update_cloud_node(self.cloud_mock)
- current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
- self.assertEqual([testutil.ip_address_mock(2)],
- current_cloud.private_ips)
-
- def test_missing_cloud_node_update(self):
- self.make_actor(1)
- self.node_actor.update_cloud_node(None)
- current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
- self.assertEqual([testutil.ip_address_mock(1)],
- current_cloud.private_ips)
-
- def test_update_arvados_node(self):
- self.make_actor(3)
- job_uuid = 'zzzzz-jjjjj-updatejobnode00'
- new_arvados = testutil.arvados_node_mock(3, job_uuid)
- self.node_actor.update_arvados_node(new_arvados)
- current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
- self.assertEqual(job_uuid, current_arvados['job_uuid'])
-
- def test_missing_arvados_node_update(self):
- self.make_actor(4, testutil.arvados_node_mock(4))
- self.node_actor.update_arvados_node(None)
- current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
- self.assertEqual(testutil.ip_address_mock(4),
- current_arvados['ip_address'])
--- /dev/null
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import time
+import unittest
+
+import arvados.errors as arverror
+import httplib2
+import mock
+import pykka
+
+import arvnodeman.computenode.dispatch as dispatch
+from . import testutil
+
+class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
+ def make_mocks(self, arvados_effect=None, cloud_effect=None):
+ if arvados_effect is None:
+ arvados_effect = [testutil.arvados_node_mock()]
+ self.arvados_effect = arvados_effect
+ self.timer = testutil.MockTimer()
+ self.api_client = mock.MagicMock(name='api_client')
+ self.api_client.nodes().create().execute.side_effect = arvados_effect
+ self.api_client.nodes().update().execute.side_effect = arvados_effect
+ self.cloud_client = mock.MagicMock(name='cloud_client')
+ self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1)
+
+ def make_actor(self, arv_node=None):
+ if not hasattr(self, 'timer'):
+ self.make_mocks(arvados_effect=[arv_node])
+ self.setup_actor = dispatch.ComputeNodeSetupActor.start(
+ self.timer, self.api_client, self.cloud_client,
+ testutil.MockSize(1), arv_node).proxy()
+
+ def test_creation_without_arvados_node(self):
+ self.make_actor()
+ self.assertEqual(self.arvados_effect[-1],
+ self.setup_actor.arvados_node.get(self.TIMEOUT))
+ self.assertTrue(self.api_client.nodes().create().execute.called)
+ self.assertEqual(self.cloud_client.create_node(),
+ self.setup_actor.cloud_node.get(self.TIMEOUT))
+
+ def test_creation_with_arvados_node(self):
+ self.make_actor(testutil.arvados_node_mock())
+ self.assertEqual(self.arvados_effect[-1],
+ self.setup_actor.arvados_node.get(self.TIMEOUT))
+ self.assertTrue(self.api_client.nodes().update().execute.called)
+ self.assertEqual(self.cloud_client.create_node(),
+ self.setup_actor.cloud_node.get(self.TIMEOUT))
+
+ def test_failed_calls_retried(self):
+ self.make_mocks([
+ arverror.ApiError(httplib2.Response({'status': '500'}), ""),
+ testutil.arvados_node_mock(),
+ ])
+ self.make_actor()
+ self.wait_for_assignment(self.setup_actor, 'cloud_node')
+
+ def test_stop_when_no_cloud_node(self):
+ self.make_mocks(
+ arverror.ApiError(httplib2.Response({'status': '500'}), ""))
+ self.make_actor()
+ self.setup_actor.stop_if_no_cloud_node()
+ self.assertTrue(
+ self.setup_actor.actor_ref.actor_stopped.wait(self.TIMEOUT))
+
+ def test_no_stop_when_cloud_node(self):
+ self.make_actor()
+ self.wait_for_assignment(self.setup_actor, 'cloud_node')
+ self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT)
+ self.assertTrue(self.stop_proxy(self.setup_actor),
+ "actor was stopped by stop_if_no_cloud_node")
+
+    def test_subscribe(self):
+        """A subscriber registered before success is notified on completion."""
+        self.make_mocks(
+            arverror.ApiError(httplib2.Response({'status': '500'}), ""))
+        self.make_actor()
+        subscriber = mock.Mock(name='subscriber_mock')
+        self.setup_actor.subscribe(subscriber)
+        # Let the retried create call succeed now that we are subscribed.
+        self.api_client.nodes().create().execute.side_effect = [
+            testutil.arvados_node_mock()]
+        self.wait_for_assignment(self.setup_actor, 'cloud_node')
+        # The subscriber receives the setup actor itself as its argument.
+        self.assertEqual(self.setup_actor.actor_ref.actor_urn,
+                         subscriber.call_args[0][0].actor_ref.actor_urn)
+
+    def test_late_subscribe(self):
+        """A subscriber added after completion is still notified."""
+        self.make_actor()
+        subscriber = mock.Mock(name='subscriber_mock')
+        self.wait_for_assignment(self.setup_actor, 'cloud_node')
+        self.setup_actor.subscribe(subscriber).get(self.TIMEOUT)
+        self.stop_proxy(self.setup_actor)
+        self.assertEqual(self.setup_actor.actor_ref.actor_urn,
+                         subscriber.call_args[0][0].actor_ref.actor_urn)
+
+
+class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
+                                       unittest.TestCase):
+    """Tests for ComputeNodeShutdownActor: destroys a cloud node and
+    notifies subscribers when the shutdown is done."""
+
+    def make_mocks(self, cloud_node=None):
+        # Default to a generic cloud node mock unless the test supplies one.
+        self.timer = testutil.MockTimer()
+        self.cloud_client = mock.MagicMock(name='cloud_client')
+        if cloud_node is None:
+            cloud_node = testutil.cloud_node_mock()
+        self.cloud_node = cloud_node
+
+    def make_actor(self, arv_node=None):
+        # Start the shutdown actor under test, mocking first if needed.
+        if not hasattr(self, 'timer'):
+            self.make_mocks()
+        self.shutdown_actor = dispatch.ComputeNodeShutdownActor.start(
+            self.timer, self.cloud_client, self.cloud_node).proxy()
+
+    def test_easy_shutdown(self):
+        """Starting the actor should drive a destroy_node call."""
+        self.make_actor()
+        self.shutdown_actor.cloud_node.get(self.TIMEOUT)
+        self.stop_proxy(self.shutdown_actor)
+        self.assertTrue(self.cloud_client.destroy_node.called)
+
+    def test_late_subscribe(self):
+        """Subscribing after shutdown completes still notifies the subscriber."""
+        self.make_actor()
+        subscriber = mock.Mock(name='subscriber_mock')
+        self.shutdown_actor.subscribe(subscriber).get(self.TIMEOUT)
+        self.stop_proxy(self.shutdown_actor)
+        self.assertEqual(self.shutdown_actor.actor_ref.actor_urn,
+                         subscriber.call_args[0][0].actor_ref.actor_urn)
+
+
+class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
+                                     unittest.TestCase):
+    """Tests for ComputeNodeUpdateActor: forwards sync requests to the
+    cloud driver."""
+
+    def make_actor(self):
+        self.driver = mock.MagicMock(name='driver_mock')
+        self.updater = dispatch.ComputeNodeUpdateActor.start(self.driver).proxy()
+
+    def test_node_sync(self):
+        """sync_node is delegated to the driver instance (note: the actor
+        apparently calls the driver factory first, hence self.driver())."""
+        self.make_actor()
+        cloud_node = testutil.cloud_node_mock()
+        arv_node = testutil.arvados_node_mock()
+        self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
+        self.driver().sync_node.assert_called_with(cloud_node, arv_node)
+
+
+class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
+                                      unittest.TestCase):
+    """Tests for ComputeNodeMonitorActor: tracks a cloud/Arvados node pair,
+    decides when shutdown is appropriate, and keeps both records synced."""
+
+    class MockShutdownTimer(object):
+        # Stand-in for the shutdown-window timer: window_open()/next_opening()
+        # simply return the values set via _set_state().
+        def _set_state(self, is_open, next_opening):
+            self.window_open = lambda: is_open
+            self.next_opening = lambda: next_opening
+
+
+    def make_mocks(self, node_num):
+        # Default state: shutdown window closed, next opening in 300s.
+        self.shutdowns = self.MockShutdownTimer()
+        self.shutdowns._set_state(False, 300)
+        self.timer = mock.MagicMock(name='timer_mock')
+        self.updates = mock.MagicMock(name='update_mock')
+        self.cloud_mock = testutil.cloud_node_mock(node_num)
+        self.subscriber = mock.Mock(name='subscriber_mock')
+
+    def make_actor(self, node_num=1, arv_node=None, start_time=None):
+        # Start the monitor actor under test and attach the shared subscriber.
+        if not hasattr(self, 'cloud_mock'):
+            self.make_mocks(node_num)
+        if start_time is None:
+            start_time = time.time()
+        self.node_actor = dispatch.ComputeNodeMonitorActor.start(
+            self.cloud_mock, start_time, self.shutdowns, self.timer,
+            self.updates, arv_node).proxy()
+        self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
+
+    def node_state(self, *states):
+        # Ask the actor whether its node is in any of the given states.
+        return self.node_actor.in_state(*states).get(self.TIMEOUT)
+
+    def test_in_state_when_unpaired(self):
+        """State is unknown (None) before an Arvados node is paired."""
+        self.make_actor()
+        self.assertIsNone(self.node_state('idle', 'alloc'))
+
+    def test_in_state_when_pairing_stale(self):
+        """State is unknown when the Arvados record is too old (age=90000)."""
+        self.make_actor(arv_node=testutil.arvados_node_mock(
+            job_uuid=None, age=90000))
+        self.assertIsNone(self.node_state('idle', 'alloc'))
+
+    def test_in_state_when_no_state_available(self):
+        """State is unknown when the Arvados record carries no info."""
+        self.make_actor(arv_node=testutil.arvados_node_mock(info={}))
+        self.assertIsNone(self.node_state('idle', 'alloc'))
+
+    def test_in_idle_state(self):
+        """No job UUID means idle, not alloc."""
+        self.make_actor(2, arv_node=testutil.arvados_node_mock(job_uuid=None))
+        self.assertTrue(self.node_state('idle'))
+        self.assertFalse(self.node_state('alloc'))
+        self.assertTrue(self.node_state('idle', 'alloc'))
+
+    def test_in_alloc_state(self):
+        """A job UUID means alloc, not idle."""
+        self.make_actor(3, arv_node=testutil.arvados_node_mock(job_uuid=True))
+        self.assertFalse(self.node_state('idle'))
+        self.assertTrue(self.node_state('alloc'))
+        self.assertTrue(self.node_state('idle', 'alloc'))
+
+    def test_init_shutdown_scheduling(self):
+        """On start, a shutdown check is scheduled for the next opening (300s)."""
+        self.make_actor()
+        self.assertTrue(self.timer.schedule.called)
+        self.assertEqual(300, self.timer.schedule.call_args[0][0])
+
+    def test_shutdown_subscription(self):
+        """When the window is open, subscribers get the actor as argument."""
+        self.make_actor()
+        self.shutdowns._set_state(True, 600)
+        self.node_actor.consider_shutdown().get(self.TIMEOUT)
+        self.assertTrue(self.subscriber.called)
+        self.assertEqual(self.node_actor.actor_ref.actor_urn,
+                         self.subscriber.call_args[0][0].actor_ref.actor_urn)
+
+    def test_shutdown_without_arvados_node(self):
+        """A recently started node may shut down even without an Arvados pair."""
+        self.make_actor()
+        self.shutdowns._set_state(True, 600)
+        self.node_actor.consider_shutdown().get(self.TIMEOUT)
+        self.assertTrue(self.subscriber.called)
+
+    def test_no_shutdown_without_arvados_node_and_old_cloud_node(self):
+        """start_time=0 makes the cloud node old; unpaired old nodes stay up."""
+        self.make_actor(start_time=0)
+        self.shutdowns._set_state(True, 600)
+        self.node_actor.consider_shutdown().get(self.TIMEOUT)
+        self.assertFalse(self.subscriber.called)
+
+    def check_shutdown_rescheduled(self, window_open, next_window,
+                                   schedule_time=None):
+        # Assert that consider_shutdown() does NOT notify subscribers but
+        # does reschedule another check (optionally at schedule_time).
+        self.shutdowns._set_state(window_open, next_window)
+        self.timer.schedule.reset_mock()
+        self.node_actor.consider_shutdown().get(self.TIMEOUT)
+        self.stop_proxy(self.node_actor)
+        self.assertTrue(self.timer.schedule.called)
+        if schedule_time is not None:
+            self.assertEqual(schedule_time, self.timer.schedule.call_args[0][0])
+        self.assertFalse(self.subscriber.called)
+
+    def test_shutdown_window_close_scheduling(self):
+        self.make_actor()
+        self.check_shutdown_rescheduled(False, 600, 600)
+
+    def test_no_shutdown_when_node_running_job(self):
+        self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
+        self.check_shutdown_rescheduled(True, 600)
+
+    def test_no_shutdown_when_node_state_unknown(self):
+        self.make_actor(5, testutil.arvados_node_mock(5, info={}))
+        self.check_shutdown_rescheduled(True, 600)
+
+    def test_no_shutdown_when_node_state_stale(self):
+        self.make_actor(6, testutil.arvados_node_mock(6, age=90000))
+        self.check_shutdown_rescheduled(True, 600)
+
+    def test_arvados_node_match(self):
+        """A matching hostname pairs the records and syncs them once."""
+        self.make_actor(2)
+        arv_node = testutil.arvados_node_mock(
+            2, hostname='compute-two.zzzzz.arvadosapi.com')
+        pair_id = self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT)
+        self.assertEqual(self.cloud_mock.id, pair_id)
+        self.stop_proxy(self.node_actor)
+        self.updates.sync_node.assert_called_with(self.cloud_mock, arv_node)
+
+    def test_arvados_node_mismatch(self):
+        """A non-matching Arvados node is rejected (pairing returns None)."""
+        self.make_actor(3)
+        arv_node = testutil.arvados_node_mock(1)
+        self.assertIsNone(
+            self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT))
+
+    def test_update_cloud_node(self):
+        """update_cloud_node replaces the tracked cloud record."""
+        self.make_actor(1)
+        # Build a node-2 mock but give it id '1' — presumably so it is
+        # accepted as an update for the tracked node; TODO confirm against
+        # update_cloud_node's matching logic.
+        self.make_mocks(2)
+        self.cloud_mock.id = '1'
+        self.node_actor.update_cloud_node(self.cloud_mock)
+        current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
+        self.assertEqual([testutil.ip_address_mock(2)],
+                         current_cloud.private_ips)
+
+    def test_missing_cloud_node_update(self):
+        """A None update leaves the existing cloud record untouched."""
+        self.make_actor(1)
+        self.node_actor.update_cloud_node(None)
+        current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
+        self.assertEqual([testutil.ip_address_mock(1)],
+                         current_cloud.private_ips)
+
+    def test_update_arvados_node(self):
+        """update_arvados_node replaces the tracked Arvados record."""
+        self.make_actor(3)
+        job_uuid = 'zzzzz-jjjjj-updatejobnode00'
+        new_arvados = testutil.arvados_node_mock(3, job_uuid)
+        self.node_actor.update_arvados_node(new_arvados)
+        current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
+        self.assertEqual(job_uuid, current_arvados['job_uuid'])
+
+    def test_missing_arvados_node_update(self):
+        """A None update leaves the existing Arvados record untouched."""
+        self.make_actor(4, testutil.arvados_node_mock(4))
+        self.node_actor.update_arvados_node(None)
+        current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
+        self.assertEqual(testutil.ip_address_mock(4),
+                         current_arvados['ip_address'])
import mock
-import arvnodeman.computenode.ec2 as ec2
+import arvnodeman.computenode.driver.ec2 as ec2
from . import testutil
class EC2ComputeNodeDriverTestCase(unittest.TestCase):
import mock
import pykka
-import arvnodeman.computenode as nmcnode
import arvnodeman.daemon as nmdaemon
+from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
from . import testutil
class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
def monitor_list(self):
- return pykka.ActorRegistry.get_by_class(nmcnode.ComputeNodeMonitorActor)
+ return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor)
def alive_monitor_count(self):
return sum(1 for actor in self.monitor_list() if actor.is_alive())