source 'https://rubygems.org'
gem 'rails', '~> 4.1.0'
-gem 'arvados', '>= 0.1.20150313191637'
+gem 'arvados', '>= 0.1.20150413172135'
gem 'sqlite3'
andand (1.3.3)
angularjs-rails (1.3.8)
arel (5.0.1.20140414130214)
- arvados (0.1.20150313191637)
+ arvados (0.1.20150413172135)
activesupport (>= 3.2.13)
andand (~> 1.3, >= 1.3.3)
google-api-client (~> 0.6.3, >= 0.6.3)
RedCloth
andand
angularjs-rails
- arvados (>= 0.1.20150313191637)
+ arvados (>= 0.1.20150413172135)
bootstrap-sass (~> 3.1.0)
bootstrap-tab-history-rails
bootstrap-x-editable-rails
on('ready ajax:complete', function() {
// This makes the dialog close on Esc key, obviously.
$('.modal').attr('tabindex', '-1')
+ }).
+ on('ready', function() {
+ // Need this to trigger input validation/synchronization callbacks because some browsers
+ // auto-fill form fields (e.g., when navigating "back" to a page where some text
+ // had been entered in a search box) without triggering a change or input event.
+ $('input').trigger('input');
});
HeaderRowFixer = function(selector) {
--- /dev/null
+// Usage:
+//
+// 1. Add some buttons to your modal, one with class="pager-next" and
+// one with class="pager-prev".
+//
+// 2. Put multiple .modal-body sections in your modal.
+//
+// 3. Add a "pager-count" div where page count is shown.
+//    For example: "1 of 10" when showing the first page of 10 pages.
+
+$(document).on('click', '.modal .pager-next', function() {
+ var $modal = $(this).parents('.modal');
+ $modal.data('page', ($modal.data('page') || 0) + 1).trigger('pager:render');
+ return false;
+}).on('click', '.modal .pager-prev', function() {
+ var $modal = $(this).parents('.modal');
+ $modal.data('page', ($modal.data('page') || 1) - 1).trigger('pager:render');
+ return false;
+}).on('ready ajax:success', function() {
+ $('.modal').trigger('pager:render');
+}).on('pager:render', '.modal', function() {
+ var $modal = $(this);
+ var page = $modal.data('page') || 0;
+ var $panes = $('.modal-body', $modal);
+ if (page >= $panes.length) {
+ // Somehow moved past end
+ page = $panes.length - 1;
+ $modal.data('page', page);
+ } else if (page < 0) {
+ page = 0;
+ }
+
+ var $pager_count = $('.pager-count', $modal);
+ $pager_count.text((page+1) + " of " + $panes.length);
+
+ var selected = $panes.hide().eq(page).show();
+ enableButton($('.pager-prev', $modal), page > 0);
+ enableButton($('.pager-next', $modal), page < $panes.length - 1);
+ function enableButton(btn, ok) {
+ btn.prop('disabled', !ok).
+ toggleClass('btn-primary', ok).
+ toggleClass('btn-default', !ok);
+ }
+});
function run_pipeline_button_state() {
var a = $('a.editable.required.editable-empty,input.form-control.required[value=""]');
- if (a.length > 0) {
+ if ((a.length > 0) || ($('.unreadable-inputs-present').length)) {
$(".run-pipeline-button").addClass("disabled");
}
else {
$tag.parent().prev().css("background-color", "");
}
});
+ $('input.required').each(function() {
+ var $tag = $(this);
+ if ($tag.hasClass("unreadable-input")) {
+ $tag.parent().parent().css("background-color", "#ffdddd");
+ $tag.parent().parent().prev().css("background-color", "#ffdddd");
+ }
+ else {
+ $tag.parent().parent().css("background-color", "");
+ $tag.parent().parent().prev().css("background-color", "");
+ }
+ });
run_pipeline_button_state();
});
ul.nav.nav-tabs {
font-size: 90%
}
+
+.hover-dropdown:hover .dropdown-menu {
+ display: block;
+}
+require "arvados/collection"
+
class ActionsController < ApplicationController
skip_filter :require_thread_api_token, only: [:report_issue_popup, :report_issue]
end
end
- def arv_normalize mt, *opts
- r = ""
- env = Hash[ENV].
- merge({'ARVADOS_API_HOST' =>
- arvados_api_client.arvados_v1_base.
- sub(/\/arvados\/v1/, '').
- sub(/^https?:\/\//, ''),
- 'ARVADOS_API_TOKEN' => 'x',
- 'ARVADOS_API_HOST_INSECURE' =>
- Rails.configuration.arvados_insecure_https ? 'true' : 'false'
- })
- IO.popen([env, 'arv-normalize'] + opts, 'w+b') do |io|
- io.write mt
- io.close_write
- while buf = io.read(2**16)
- r += buf
- end
+ expose_action :combine_selected_files_into_collection do
+ link_uuids, coll_ids = params["selection"].partition do |sel_s|
+ ArvadosBase::resource_class_for_uuid(sel_s) == Link
end
- r
- end
- expose_action :combine_selected_files_into_collection do
- uuids = []
- pdhs = []
- files = []
- params["selection"].each do |s|
- a = ArvadosBase::resource_class_for_uuid s
- if a == Link
- begin
- if (m = CollectionsHelper.match(Link.find(s).head_uuid))
- pdhs.append(m[1] + m[2])
- files.append(m)
- end
- rescue
+ unless link_uuids.empty?
+ Link.select([:head_uuid]).where(uuid: link_uuids).each do |link|
+ if ArvadosBase::resource_class_for_uuid(link.head_uuid) == Collection
+ coll_ids << link.head_uuid
end
- elsif (m = CollectionsHelper.match(s))
- pdhs.append(m[1] + m[2])
- files.append(m)
- elsif (m = CollectionsHelper.match_uuid_with_optional_filepath(s))
- uuids.append(m[1])
- files.append(m)
end
end
- pdhs = pdhs.uniq
- uuids = uuids.uniq
- chash = {}
-
- Collection.select([:uuid, :manifest_text]).where(uuid: uuids).each do |c|
- chash[c.uuid] = c
+ uuids = []
+ pdhs = []
+ source_paths = Hash.new { |hash, key| hash[key] = [] }
+ coll_ids.each do |coll_id|
+ if m = CollectionsHelper.match(coll_id)
+ key = m[1] + m[2]
+ pdhs << key
+ source_paths[key] << m[4]
+ elsif m = CollectionsHelper.match_uuid_with_optional_filepath(coll_id)
+ key = m[1]
+ uuids << key
+ source_paths[key] << m[4]
+ end
end
- Collection.select([:portable_data_hash, :manifest_text]).where(portable_data_hash: pdhs).each do |c|
- chash[c.portable_data_hash] = c
+ unless pdhs.empty?
+ Collection.where(portable_data_hash: pdhs.uniq).
+ select([:uuid, :portable_data_hash]).each do |coll|
+ unless source_paths[coll.portable_data_hash].empty?
+ uuids << coll.uuid
+ source_paths[coll.uuid] = source_paths.delete(coll.portable_data_hash)
+ end
+ end
end
- combined = ""
- files_in_dirs = {}
- files.each do |m|
- mt = chash[m[1]+m[2]].andand.manifest_text
- if not m[4].nil? and m[4].size > 1
- manifest_files = files_in_dirs['.']
- if !manifest_files
- manifest_files = []
- files_in_dirs['.'] = manifest_files
- end
- manifest_file = m[4].split('/')[-1]
- uniq_file = derive_unique_filename(manifest_file, manifest_files)
- normalized = arv_normalize mt, '--extract', ".#{m[4]}"
- normalized = normalized.gsub(/(\d+:\d+:)(#{Regexp.quote manifest_file})/) {|s| "#{$1}#{uniq_file}" }
- combined += normalized
- manifest_files << uniq_file
+ new_coll = Arv::Collection.new
+ Collection.where(uuid: uuids.uniq).
+ select([:uuid, :manifest_text]).each do |coll|
+ src_coll = Arv::Collection.new(coll.manifest_text)
+ src_pathlist = source_paths[coll.uuid]
+ if src_pathlist.any?(&:blank?)
+ src_pathlist = src_coll.each_file_path
+ destdir = nil
else
- mt = arv_normalize mt
- manifest_streams = mt.split "\n"
- adjusted_streams = []
- manifest_streams.each do |stream|
- manifest_parts = stream.split
- adjusted_parts = []
- manifest_files = files_in_dirs[manifest_parts[0]]
- if !manifest_files
- manifest_files = []
- files_in_dirs[manifest_parts[0]] = manifest_files
- end
-
- manifest_parts.each do |part|
- part_match = /(\d+:\d+:)(\S+)/.match(part)
- if part_match
- uniq_file = derive_unique_filename(part_match[2], manifest_files)
- adjusted_parts << "#{part_match[1]}#{uniq_file}"
- manifest_files << uniq_file
- else
- adjusted_parts << part
- end
- end
- adjusted_streams << adjusted_parts.join(' ')
+ destdir = "."
+ end
+ src_pathlist.each do |src_path|
+ src_path = src_path.sub(/^(\.\/|\/|)/, "./")
+ src_stream, _, basename = src_path.rpartition("/")
+ dst_stream = destdir || src_stream
+ # Generate a unique name by adding (1), (2), etc. to it.
+ # If the filename has a dot that's not at the beginning, insert the
+ # number just before that. Otherwise, append the number to the name.
+ if match = basename.match(/[^\.]\./)
+ suffix_start = match.begin(0) + 1
+ else
+ suffix_start = basename.size
end
- adjusted_streams.each do |stream|
- combined += (stream + "\n")
+ suffix_size = 0
+ dst_path = nil
+ loop.each_with_index do |_, try_count|
+ dst_path = "#{dst_stream}/#{basename}"
+ break unless new_coll.exist?(dst_path)
+ uniq_suffix = "(#{try_count + 1})"
+ basename[suffix_start, suffix_size] = uniq_suffix
+ suffix_size = uniq_suffix.size
end
+ new_coll.cp_r(src_path, dst_path, src_coll)
end
end
- normalized = arv_normalize combined
- newc = Collection.new({:manifest_text => normalized})
- newc.name = newc.name || "Collection created at #{Time.now.localtime}"
+ coll_attrs = {
+ manifest_text: new_coll.manifest_text,
+ name: "Collection created at #{Time.now.localtime}",
+ }
+ flash = {}
# set owner_uuid to current project, provided it is writable
- current_project_writable = false
- action_data = JSON.parse(params['action_data']) if params['action_data']
- if action_data && action_data['current_project_uuid']
- current_project = Group.find(action_data['current_project_uuid']) rescue nil
- if (current_project && current_project.writable_by.andand.include?(current_user.uuid))
- newc.owner_uuid = action_data['current_project_uuid']
- current_project_writable = true
- end
+ action_data = Oj.load(params['action_data'] || "{}")
+ if action_data['current_project_uuid'] and
+ current_project = Group.find?(action_data['current_project_uuid']) and
+ current_project.writable_by.andand.include?(current_user.uuid)
+ coll_attrs[:owner_uuid] = current_project.uuid
+ flash[:message] =
+ "Created new collection in the project #{current_project.name}."
+ else
+ flash[:message] = "Created new collection in your Home project."
end
- newc.save!
-
- chash.each do |k,v|
- l = Link.new({
- tail_uuid: k,
- head_uuid: newc.uuid,
- link_class: "provenance",
- name: "provided"
- })
- l.save!
+ newc = Collection.create!(coll_attrs)
+ source_paths.each_key do |src_uuid|
+ unless Link.create({
+ tail_uuid: src_uuid,
+ head_uuid: newc.uuid,
+ link_class: "provenance",
+ name: "provided",
+ })
+ flash[:error] = "
+An error occurred when saving provenance information for this collection.
+You can try recreating the collection to get a copy with full provenance data."
+ break
+ end
end
-
- msg = current_project_writable ?
- "Created new collection in the project #{current_project.name}." :
- "Created new collection in your Home project."
-
- redirect_to newc, flash: {'message' => msg}
+ redirect_to(newc, flash: flash)
end
def report_issue_popup
%w(Compare Graph)
end
+ helper_method :unreadable_inputs_present?
+ def unreadable_inputs_present?
+ unless @unreadable_inputs_present.nil?
+ return @unreadable_inputs_present
+ end
+
+ input_uuids = []
+ input_pdhs = []
+ @object.components.each do |k, component|
+ next if !component
+ component[:script_parameters].andand.each do |p, tv|
+ if (tv.is_a? Hash) and ((tv[:dataclass] == "Collection") || (tv[:dataclass] == "File"))
+ if tv[:value]
+ value = tv[:value]
+ elsif tv[:default]
+ value = tv[:default]
+ end
+ if value
+ split = value.split '/'
+ if CollectionsHelper.match(split[0])
+ input_pdhs << split[0]
+ else
+ input_uuids << split[0]
+ end
+ end
+ end
+ end
+ end
+
+ input_pdhs = input_pdhs.uniq
+ input_uuids = input_uuids.uniq
+
+ preload_collections_for_objects input_uuids if input_uuids.any?
+ preload_for_pdhs input_pdhs if input_pdhs.any?
+
+ @unreadable_inputs_present = false
+ input_uuids.each do |uuid|
+ if !collections_for_object(uuid).any?
+ @unreadable_inputs_present = true
+ break
+ end
+ end
+ if !@unreadable_inputs_present
+ input_pdhs.each do |pdh|
+ if !collection_for_pdh(pdh).any?
+ @unreadable_inputs_present = true
+ break
+ end
+ end
+ end
+
+ @unreadable_inputs_present
+ end
+
protected
def for_comparison v
if v.is_a? Hash or v.is_a? Array
1.month.ago.beginning_of_month,
Time.now.beginning_of_month]]
@spans.each do |span, threshold_start, threshold_end|
- @activity[:logins][span] = Log.
+ @activity[:logins][span] = Log.select(%w(uuid modified_by_user_uuid)).
filter([[:event_type, '=', 'login'],
[:object_kind, '=', 'arvados#user'],
[:created_at, '>=', threshold_start],
[:created_at, '<', threshold_end]])
- @activity[:jobs][span] = Job.
+ @activity[:jobs][span] = Job.select(%w(uuid modified_by_user_uuid)).
filter([[:created_at, '>=', threshold_start],
[:created_at, '<', threshold_end]])
- @activity[:pipeline_instances][span] = PipelineInstance.
+ @activity[:pipeline_instances][span] = PipelineInstance.select(%w(uuid modified_by_user_uuid)).
filter([[:created_at, '>=', threshold_start],
[:created_at, '<', threshold_end]])
@activity.each do |type, act|
end
def link_to_arvados_object_if_readable(attrvalue, link_text_if_not_readable, opts={})
- resource_class = resource_class_for_uuid(attrvalue)
+ resource_class = resource_class_for_uuid(attrvalue.split('/')[0]) if attrvalue
if !resource_class
return link_to_if_arvados_object attrvalue, opts
end
+ readable = object_readable attrvalue, resource_class
+ if readable
+ link_to_if_arvados_object attrvalue, opts
+ elsif opts[:required] and current_user # no need to show this for anonymous user
+ raw('<div><input type="text" style="border:none;width:100%;background:#ffdddd" disabled=true class="required unreadable-input" value="') + link_text_if_not_readable + raw('" ></input></div>')
+ else
+ link_text_if_not_readable
+ end
+ end
+
+ # This method takes advantage of preloaded collections and objects.
+ # Hence you can improve performance by first preloading objects
+ # related to the page context before using this method.
+ def object_readable attrvalue, resource_class=nil
+ # if it is a collection filename, check readable for the locator
+ attrvalue = attrvalue.split('/')[0] if attrvalue
+
+ resource_class = resource_class_for_uuid(attrvalue) if resource_class.nil?
+ return if resource_class.nil?
+
+ return_value = nil
if resource_class.to_s == 'Collection'
if CollectionsHelper.match(attrvalue)
- readable = collection_for_pdh(attrvalue).any?
+ found = collection_for_pdh(attrvalue)
+ return_value = found.first if found.any?
else
- readable = collections_for_object(attrvalue).any?
+ found = collections_for_object(attrvalue)
+ return_value = found.first if found.any?
end
else
- readable = object_for_dataclass(resource_class, attrvalue)
- end
-
- if readable
- link_to_if_arvados_object attrvalue, opts
- else
- link_text_if_not_readable
+ return_value = object_for_dataclass(resource_class, attrvalue)
end
+ return_value
end
def render_editable_attribute(object, attr, attrvalue=nil, htmloptions={})
end
if not object.andand.attribute_editable?(attr)
- return link_to_arvados_object_if_readable(attrvalue, attrvalue, friendly_name: true)
+ return link_to_arvados_object_if_readable(attrvalue, attrvalue, {friendly_name: true, required: required})
end
if dataclass
success: 'page-refresh'
}.to_json,
})
+ is_readable_input = attrvalue.present? and object_readable attrvalue, Collection
return content_tag('div', :class => 'input-group') do
html = text_field_tag(dn, display_value,
:class =>
- "form-control #{'required' if required}")
+ "form-control #{'required' if required} #{'unreadable-input' if !attrvalue.andand.empty? and !is_readable_input}")
html + content_tag('span', :class => 'input-group-btn') do
link_to('Choose',
modal_path,
raw(s)
end
+ def render_unreadable_inputs_present
+ if current_user and controller.class.name.eql?('PipelineInstancesController') and unreadable_inputs_present?
+ raw('<div class="alert alert-danger unreadable-inputs-present">' +
+ '<p>One or more inputs provided are not readable by you. ' +
+ 'Please correct these before you can run the pipeline.</p></div>')
+ end
+ end
end
<% end %>
<% end %>
-<%
- # Display any flash messages in an alert. If there is any entry with "error" key, alert-danger is used.
- flash_msg = ''
- flash_msg_is_error = false
- flash.each do |msg|
- flash_msg_is_error ||= (msg[0]=='error')
- flash_msg += ('<p class="contain-align-left">' + msg[1] + '</p>')
- end
- if flash_msg != ''
-%>
-<div class="flash-message alert <%= flash_msg_is_error ? 'alert-danger' : 'alert-warning' %>"><%=flash_msg.html_safe%></div>
+<% unless flash["error"].blank? %>
+<div class="flash-message alert alert-danger" role="alert">
+ <p class="contain-align-left"><%= flash["error"] %></p>
+</div>
+<% flash.delete("error") %>
+<% end %>
+
+<% unless flash.empty? %>
+<div class="flash-message alert alert-warning">
+ <% flash.each do |_, msg| %>
+ <p class="contain-align-left"><%= msg %></p>
+ <% end %>
+</div>
<% end %>
--- /dev/null
+<style>
+div.figure {
+}
+.style_image1 {
+ border: 10px solid #ddd;
+ display: block;
+ margin-left: auto;
+ margin-right: auto;
+}
+.style_image2 {
+ border: 10px solid #ddd;
+ display: block;
+ margin-left: 1em;
+}
+div.figure p {
+ text-align: center;
+ font-style: italic;
+ text-indent: 0;
+ border-top:-0.3em;
+}
+</style>
+
+<div id="getting-started-modal-window" class="modal">
+ <div class="modal-dialog modal-with-loading-spinner" style="width: 50em">
+ <div class="modal-content">
+ <div class="modal-header" style="text-align: center">
+ <button type="button" class="close" data-dismiss="modal" aria-hidden="true">x</button>
+ <div>
+ <div class="col-sm-8"><h4 class="modal-title" style="text-align: right">Getting Started with Arvados</h4></div> <%#Todo: center instead of right%>
+ <div class="spinner spinner-32px spinner-h-center col-sm-1" hidden="true"></div>
+ </div>
+ <br/>
+ </div>
+
+ <%#First Page%>
+ <div class="modal-body" style="height: 40em; overflow-y: scroll">
+ <div style="margin-top: -0.5em; margin-left: 0.5em;">
+ <p><div style="font-size: 150%;">Welcome!</div></p>
+ <p>
+ What you're looking at right now is <b>Workbench</b>, the graphical interface to the Arvados system.
+ </p><p>
+ <div class="figure">
+ <p> <%= image_tag "pipeline-running.gif", :class => "style_image1" %></p> <%#Todo: shorter gif%>
+ <p>Running the Pathomap pipeline in Arvados.</p>
+ </div>
+ </p><p>
+ Click the <span class="btn btn-sm btn-primary">Next ></span> button below for a speed tour of Arvados.
+ </p><p style="margin-top:2em;">
+ <em><strong>Note:</strong> You can always come back to this Getting Started guide by clicking the <span class="fa fa-lg fa-question-circle"></span> in the upper-right corner.</em>
+ </p>
+ </div>
+ </div>
+
+ <%#Page Two%>
+ <div class="modal-body" style="height: 40em; overflow-y: scroll">
+ <div style="margin-top: -0.5em; margin-left: 0.5em;">
+ <p><div style="font-size: 150%;">Take It for a Spin</div></p>
+ <p>
+ Run your first pipeline in 3 quick steps:
+ </p>
+ <div style="display: block; margin: 0em 2em; padding-top: 1em; padding-bottom: 1em; border: thin dashed silver;">
+ <p style="margin-left: 1em;">
+ <em>First, <a href="/users/welcome">log-in or register</a> with any Google account if you haven't already.</em>
+ </p><p>
+ <ol><li> Go to the <span class="btn btn-sm btn-default"><i class="fa fa-lg fa-fw fa-dashboard"></i> Dashboard</span> > <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a pipeline...</span>
+ <p style="margin-top:1em;">
+ <%= image_tag "mouse-move.gif", :class => "style_image2" %>
+ </p>
+ </li>
+ <li> <span class="btn btn-sm btn-default"><i class="fa fa-fw fa-gear"></i>Mason Lab -- Ancestry Mapper (public)</span> > <span class="btn btn-sm btn-primary">Next: choose inputs <i class="fa fa-fw fa-arrow-circle-right"></i></span></li><br>
+ <li> <span class="btn btn-sm btn-primary">Run <i class="fa fa-fw fa-play"></i></span></li>
+ </ol>
+ </p></div>
+ <p style="margin-top:1em;">
+ <i class="fa fa-flag fa-flip-horizontal" style="color: green"></i> <i class="fa fa-child"></i>
+ <strong>Voila!</strong> <i class="fa fa-child"></i> <i class="fa fa-flag" style="color: green"></i>
+ Your pipeline is now spooling up and getting ready to run!
+ </p><p>
+ Go ahead, try it for yourself right now. <span class="glyphicon glyphicon-thumbs-up"></span>
+ </p><p>
+ Or click <span class="btn btn-sm btn-primary">Next ></span> below to keep reading!
+ </p>
+ </div>
+ </div>
+
+ <%#Page Three%>
+ <div class="modal-body" style="height: 40em; overflow-y: scroll">
+ <div style="margin-top: -0.5em; margin-left: 0.5em;">
+ <p><div style="font-size: 150%;">Three Useful Terms</div></p>
+ <ol>
+ <li>
+ <strong>Pipeline</strong> — A re-usable series of analysis steps.
+ <ul>
+ <li>
+ Also known as a “workflow” in other systems
+ </li><li>
+ A list of well-documented public pipelines can be found in the upper right corner by clicking the <span class="fa fa-lg fa-question-circle"></span> > <a href="<%= Rails.configuration.arvados_public_data_doc_url %>">Public Pipelines and Datasets</a>
+ </li><li>
+ Pro-tip: A Pipeline contains Jobs which contain Tasks
+ </li><li>
+ Pipelines can only be shared within a project
+ </li>
+ </ul>
+ </li>
+
+ <li>
+ <strong>Collection </strong>— Like a folder, but better.
+ <ul>
+ <li>
+ Upload data right in your browser
+ </li><li>
+ Better than a folder?
+ <ul><li>
+ Collections contain the content-address of the data instead of the data itself
+ </li><li>
+ Sets of data can be flexibly defined and re-defined without duplicating data
+ </li>
+ </ul></li><li>
+ Collections can be shared using the "Sharing and Permissions" > "Share" button
+ </li>
+ </ul>
+ </li>
+
+ <li>
+          <strong>Projects </strong>— Contain pipeline templates, pipeline instances (individual runs of a pipeline), and collections.
+ <ul><li>
+ The most useful one is your default "Home" project, under Projects > Home
+ </li><li>
+ Projects can be shared using the "sharing" tab
+ </li>
+ </ul>
+ </li>
+ </ol>
+
+ </div>
+ </div>
+
+ <%#Page Four%>
+ <div class="modal-body" style="height: 40em; overflow-y: scroll">
+ <div style="margin-top: -0.5em; margin-left: 0.5em;">
+ <p><div style="font-size: 150%;">Six Reasons Arvados is Awesome</div></p>
+ <p>
+ This guide, and in fact all of Workbench, is just a teaser for the full power of Arvados:
+ </p>
+ <ol>
+ <li>
+ <strong>Reproducible analyses</strong>: Enough said.
+ </li><li>
+ <strong>Data provenance</strong>: Every file in Arvados can tell you where it came from.
+ </li><li>
+ <strong>Serious scaling</strong>: Need 500 GB of space? 200 compute hours? Arvados scales and parallelizes your work for you intelligently.
+ </li><li>
+ <strong>Share pipelines or data</strong>: Easily publish your work to the world, just like <a href="http://www.pathomap.org/2015/04/08/run-the-pathomap-human-ancestry-pipeline-on-arvados/">the Pathomap team did</a>.
+ </li><li>
+ <strong>Use existing pipelines</strong>: Use best-practices pipelines on your own data with the click of a button.
+ </li><li>
+ <strong>Open-source</strong>: Arvados is completely open-source. Check out our <a href="http://arvados.org">developer site</a>.
+ </li>
+ </ol>
+ <p style="margin-top: 1em;">
+ Want to use the command-line, or hungry to learn more? Check out the User Guide at <a href="http://doc.arvados.org/">doc.arvados.org</a>.
+ </p><p>
+        Still have questions? Head over to <a href="http://doc.arvados.org/">doc.arvados.org</a> to find mailing-list and contact info for the Arvados community.
+ </p><p>
+ That's all, folks! Click the "x" up top to leave this guide.
+ </p>
+ </div>
+ </div>
+
+ <div class="modal-footer">
+ <div style="text-align:center">
+ <button class="btn btn-default pager-prev"><i class="fa fa-fw fa-chevron-left"></i><span style="font-weight: bold;"> Prev</span></button>
+ <button class="btn btn-default pager-next"><span style="font-weight: bold;">Next </span><i class="fa fa-fw fa-chevron-right"></i></button>
+ <div class="pager-count pull-right"><span style="margin:5px"></span></div>
+ </div>
+ </div>
+ </div>
+ </div>
+</div>
</li>
<% end %>
<% else %>
- <li><a href="<%= arvados_api_client.arvados_login_url(return_to: root_url) %>">Log in</a></li>
+ <li class="dropdown hover-dropdown login-menu">
+ <a href="<%= arvados_api_client.arvados_login_url(return_to: root_url) %>">Log in</a>
+ <ul class="dropdown-menu">
+ <li>
+ <a href="<%= arvados_api_client.arvados_login_url(return_to: root_url) %>">
+ <span class="fa fa-lg fa-sign-in"></span>
+ <p style="margin-left: 1.6em; margin-top: -1.35em; margin-bottom: 0em; margin-right: 0.5em;">Log in or register with<br/>any Google account</p>
+ </a>
+ </li>
+ </ul>
+ </li>
<% end %>
<li class="dropdown help-menu">
<span class="fa fa-lg fa-question-circle"></span>
</a>
<ul class="dropdown-menu">
+ <li>
+ <%= link_to raw('<i class="fa fa-fw fa-info"></i> Getting Started ...'), "#",
+ {'data-toggle' => "modal", 'data-target' => '#getting-started-modal-window'} %>
+ </li>
+ <% if Rails.configuration.arvados_public_data_doc_url %>
+ <li><%= link_to raw('<i class="fa fa-book fa-fw"></i> Public Pipelines and Data sets'), "#{Rails.configuration.arvados_public_data_doc_url}", target: "_blank" %></li>
+ <% end %>
<li><%= link_to raw('<i class="fa fa-book fa-fw"></i> Tutorials and User guide'), "#{Rails.configuration.arvados_docsite}/user", target: "_blank" %></li>
<li><%= link_to raw('<i class="fa fa-book fa-fw"></i> API Reference'), "#{Rails.configuration.arvados_docsite}/api", target: "_blank" %></li>
<li><%= link_to raw('<i class="fa fa-book fa-fw"></i> SDK Reference'), "#{Rails.configuration.arvados_docsite}/sdk", target: "_blank" %></li>
<% end %>
<%= render partial: 'browser_unsupported' %><%# requires JS support below %>
+ <%= render partial: 'getting_started/getting_started_popup' %>
<div id="page-wrapper">
<%= yield %>
<div class="modal-container"></div>
<div id="report-issue-modal-window"></div>
<script src="/browser_unsupported.js"></script>
+
+<% if current_user and !current_user.prefs[:getting_started_shown] and
+ !request.url.include?("/profile") and
+ !request.url.include?("/user_agreements") and
+ !request.url.include?("/inactive")%>
+ <script>
+ $("#getting-started-modal-window").modal('show');
+ </script>
+ <%
+ prefs = current_user.prefs
+ prefs[:getting_started_shown] = Time.now
+ current_user.update_attributes prefs: prefs.to_json
+ %>
+<% end %>
<% else %>
<%# state is either New or Ready %>
+ <%= render_unreadable_inputs_present %>
+
<p><i>Here are all of the pipeline's components (jobs that will need to run in order to complete the pipeline). If you know what you're doing (or you're experimenting) you can modify these parameters before starting the pipeline. Usually, you only need to edit the settings presented on the "Inputs" tab above.</i></p>
<%= render_pipeline_components("editable", :json, editable: true) %>
-<%
- input_uuids = []
- input_pdhs = []
- @object.components.each do |k, component|
- next if !component
- component[:script_parameters].andand.each do |p, tv|
- if tv.is_a? Hash and !tv[:value].nil? and (tv[:dataclass] == "Collection")
- if CollectionsHelper.match(tv[:value])
- input_pdhs << tv[:value]
- else
- input_uuids << tv[:value]
- end
- end
- end
- end
-
- preload_collections_for_objects input_uuids if input_uuids.any?
- preload_for_pdhs input_pdhs if input_pdhs.any?
-%>
-
<table class="table pipeline-components-table" style="margin-top: -.1em">
<colgroup>
<col style="width: 20%" />
<% if n_inputs == 0 %>
<p>This pipeline does not need any further inputs specified. You can start it by clicking the "Run" button whenever you're ready. (It's not too late to change existing settings, though.)</p>
<% else %>
+ <%= render_unreadable_inputs_present %>
+
<p><i>Provide <%= n_inputs > 1 ? 'values' : 'a value' %> for the following <%= n_inputs > 1 ? 'parameters' : 'parameter' %>, then click the "Run" button to start the pipeline.</i></p>
<% if @object.editable? %>
<%= content_for :pi_input_form %>
</td>
<td>
- <%= render_editable_attribute (name_link || object), 'name', nil, {tiptitle: 'rename'} %>
+ <% if object.respond_to?(:name) %>
+ <%= render_editable_attribute (name_link || object), 'name', nil, {tiptitle: 'rename'} %>
+ <% end %>
</td>
<td class="arv-description-in-table">
<div class="panel-body">
<% if !missing_required_profile? && params[:offer_return_to] %>
<div class="alert alert-success">
- <p>Thank you for filling in your profile. <%= link_to 'Back to work!', params[:offer_return_to], class: 'btn btn-sm btn-primary' %></p>
+ <% if current_user.prefs[:getting_started_shown] %>
+ <p>Thank you for filling in your profile. <%= link_to 'Back to work!', params[:offer_return_to], class: 'btn btn-sm btn-primary' %></p>
+ <% else %>
+ <p>Thank you for filling in your profile. <%= link_to 'Get started', params[:offer_return_to], class: 'btn btn-sm btn-primary' %></p>
+ <% end %>
</div>
<% else %>
<div class="alert alert-info">
<% end %>
<% end %>
+ <%# If the user has other prefs, we need to preserve them %>
+ <% current_user.prefs.each do |key, value| %>
+ <% if key != :profile %>
+ <input type="hidden" name="user[prefs][:<%=key%>]" value="<%=value.to_json%>">
+ <% end %>
+ <% end %>
+
<% if show_save_button %>
<div class="form-group">
<div class="col-sm-offset-3 col-sm-8">
arvados_insecure_https: true
activation_contact_link: mailto:info@arvados.org
arvados_docsite: http://doc.arvados.org
+ arvados_public_data_doc_url: http://arvados.org/projects/arvados/wiki/Public_Pipelines_and_Datasets
arvados_theme: default
show_user_agreement_inline: false
secret_token: ~
assert_response 302 # collection created and redirected to new collection page
- assert response.headers['Location'].include? '/collections/'
+ assert_includes(response.headers['Location'], '/collections/')
new_collection_uuid = response.headers['Location'].split('/')[-1]
use_token :active
collection = Collection.select([:uuid, :manifest_text]).where(uuid: new_collection_uuid).first
manifest_text = collection['manifest_text']
- assert manifest_text.include?('foo'), 'Not found foo in new collection manifest text'
- assert manifest_text.include?('bar'), 'Not found bar in new collection manifest text'
- assert manifest_text.include?('baz'), 'Not found baz in new collection manifest text'
- assert manifest_text.include?('0:0:file1 0:0:file2 0:0:file3'),
- 'Not found 0:0:file1 0:0:file2 0:0:file3 in new collection manifest text'
- assert manifest_text.include?('dir1/subdir'), 'Not found dir1/subdir in new collection manifest text'
- assert manifest_text.include?('dir2'), 'Not found dir2 in new collection manifest text'
+ assert_includes(manifest_text, "foo")
+ assert_includes(manifest_text, "bar")
+ assert_includes(manifest_text, "baz")
+ assert_includes(manifest_text, "0:0:file1 0:0:file2 0:0:file3")
+ assert_includes(manifest_text, "dir1/subdir")
+ assert_includes(manifest_text, "dir2")
end
test "combine files with repeated names into new collection" do
assert_response 302 # collection created and redirected to new collection page
- assert response.headers['Location'].include? '/collections/'
+ assert_includes(response.headers['Location'], '/collections/')
new_collection_uuid = response.headers['Location'].split('/')[-1]
use_token :active
collection = Collection.select([:uuid, :manifest_text]).where(uuid: new_collection_uuid).first
manifest_text = collection['manifest_text']
- assert manifest_text.include?('foo'), 'Not found foo in new collection manifest text'
- assert manifest_text.include?('foo(1)'), 'Not found foo(1) in new collection manifest text'
- assert manifest_text.include?('foo(2)'), 'Not found foo(2) in new collection manifest text'
- assert manifest_text.include?('bar'), 'Not found bar in new collection manifest text'
- assert manifest_text.include?('baz'), 'Not found baz in new collection manifest text'
- assert manifest_text.include?('0:0:file1 0:0:file2 0:0:file3'),
- 'Not found 0:0:file1 0:0:file2 0:0:file3 in new collection manifest text'
- assert manifest_text.include?('dir1/subdir'), 'Not found dir1/subdir in new collection manifest text'
- assert manifest_text.include?('dir2'), 'Not found dir2 in new collection manifest text'
+ assert_includes(manifest_text, "foo(1)")
+ assert_includes(manifest_text, "foo(2)")
+ assert_includes(manifest_text, "bar")
+ assert_includes(manifest_text, "baz")
+ assert_includes(manifest_text, "0:0:file1 0:0:file2 0:0:file3")
+ assert_includes(manifest_text, "dir1/subdir")
+ assert_includes(manifest_text, "dir2")
end
test "combine collections with repeated filenames in almost similar directories and expect files with proper suffixes" do
collection = Collection.select([:uuid, :manifest_text]).where(uuid: new_collection_uuid).first
manifest_text = collection['manifest_text']
- assert manifest_text.include?('foo'), 'Not found foo in new collection manifest text'
- assert manifest_text.include?('foo(1)'), 'Not found foo(1) in new collection manifest text'
+ assert_includes(manifest_text, 'foo')
+ assert_includes(manifest_text, 'foo(1)')
streams = manifest_text.split "\n"
streams.each do |stream|
if stream.start_with? './dir1'
# dir1 stream
- assert stream.include?(':alice(1)'), "Not found: alice(1) in dir1 in manifest text #{manifest_text}"
- assert stream.include?(':alice.txt'), "Not found: alice.txt in dir1 in manifest text #{manifest_text}"
- assert stream.include?(':alice(1).txt'), "Not found: alice(1).txt in dir1 in manifest text #{manifest_text}"
- assert stream.include?(':bob.txt'), "Not found: bob.txt in dir1 in manifest text #{manifest_text}"
- assert stream.include?(':carol.txt'), "Not found: carol.txt in dir1 in manifest text #{manifest_text}"
+ assert_includes(stream, ':alice(1)')
+ assert_includes(stream, ':alice.txt')
+ assert_includes(stream, ':alice(1).txt')
+ assert_includes(stream, ':bob.txt')
+ assert_includes(stream, ':carol.txt')
elsif stream.start_with? './dir2'
# dir2 stream
- assert stream.include?(':alice.txt'), "Not found: alice.txt in dir2 in manifest text #{manifest_text}"
- assert stream.include?(':alice(1).txt'), "Not found: alice(1).txt in dir2 in manifest text #{manifest_text}"
+ assert_includes(stream, ':alice.txt')
+ assert_includes(stream, ':alice(1).txt')
elsif stream.start_with? '. '
# . stream
- assert stream.include?(':foo'), "Not found: foo in . in manifest text #{manifest_text}"
- assert stream.include?(':foo(1)'), "Not found: foo(1) in . in manifest text #{manifest_text}"
+ assert_includes(stream, ':foo')
+ assert_includes(stream, ':foo(1)')
end
end
end
assert_response 302 # collection created and redirected to new collection page
- assert response.headers['Location'].include? '/collections/'
+ assert_includes(response.headers['Location'], '/collections/')
new_collection_uuid = response.headers['Location'].split('/')[-1]
use_token :active
assert_equal 2, streams.length
streams.each do |stream|
if stream.start_with? './dir1'
- assert stream.include?('foo'), 'Not found: foo in dir1'
+ assert_includes(stream, 'foo')
elsif stream.start_with? '. '
- assert stream.include?('foo'), 'Not found: foo in .'
+ assert_includes(stream, 'foo')
end
end
- assert !manifest_text.include?('foo(1)'), 'Found foo(1) in new collection manifest text'
+ refute_includes(manifest_text, 'foo(1)')
end
test "combine foo files from two different collection streams and expect proper filename suffixes" do
assert_response 302 # collection created and redirected to new collection page
- assert response.headers['Location'].include? '/collections/'
+ assert_includes(response.headers['Location'], '/collections/')
new_collection_uuid = response.headers['Location'].split('/')[-1]
use_token :active
streams = manifest_text.split "\n"
assert_equal 1, streams.length, "Incorrect number of streams in #{manifest_text}"
- assert manifest_text.include?('foo'), "Not found foo in new collection manifest text #{manifest_text}"
- assert manifest_text.include?('foo(1)'), "Not found foo(1) in new collection manifest text #{manifest_text}"
+ assert_includes(manifest_text, 'foo')
+ assert_includes(manifest_text, 'foo(1)')
end
end
require 'test_helper'
class CollectionsHelperTest < ActionView::TestCase
+ reset_api_fixtures :after_each_test, false
+
[
["filename.csv", true],
["filename.fa", true],
[
['new_pipeline_in_publicly_accessible_project', true],
+ ['new_pipeline_in_publicly_accessible_project', true, 'spectator'],
['new_pipeline_in_publicly_accessible_project_but_other_objects_elsewhere', false],
['new_pipeline_in_publicly_accessible_project_but_other_objects_elsewhere', false, 'spectator'],
['new_pipeline_in_publicly_accessible_project_but_other_objects_elsewhere', true, 'admin'],
+ ['new_pipeline_in_publicly_accessible_project_with_dataclass_file_and_other_objects_elsewhere', false],
+ ['new_pipeline_in_publicly_accessible_project_with_dataclass_file_and_other_objects_elsewhere', false, 'spectator'],
+ ['new_pipeline_in_publicly_accessible_project_with_dataclass_file_and_other_objects_elsewhere', true, 'admin'],
].each do |fixture, objects_readable, user=nil|
test "access #{fixture} in public project with objects readable=#{objects_readable} with user #{user}" do
object = api_fixture('pipeline_instances')[fixture]
if user == 'admin'
assert_text 'input'
assert_selector 'a', text: 'Choose'
+ assert_selector 'a', text: 'Run'
+ assert_no_selector 'a.disabled', text: 'Run'
else
assert_selector 'a', text: object['components']['foo']['script_parameters']['input']['value']
+ user ? (assert_selector 'a', text: 'Run') : (assert_no_selector 'a', text: 'Run')
end
else
assert_no_text 'This pipeline was created from' # template is not readable
- assert_text object['components']['foo']['script_parameters']['input']['value']
- assert_no_selector 'a', text: object['components']['foo']['script_parameters']['input']['value']
+ input = object['components']['foo']['script_parameters']['input']['value']
+ assert_no_selector 'a', text: input
+ if user
+ input = input.gsub('/', '\\/')
+ assert_text "One or more inputs provided are not readable"
+ assert_selector "input[type=text][value=#{input}]"
+ assert_selector 'a.disabled', text: 'Run'
+ else
+ assert_no_text "One or more inputs provided are not readable"
+ assert_text input
+ assert_no_selector 'a', text: 'Run'
+ end
end
end
end
within('.navbar-fixed-top') do
page.find("#arv-help").click
within('.dropdown-menu') do
+ assert_selector 'a', text:'Getting Started ...'
+ assert_selector 'a', text:'Public Pipelines and Data sets'
assert page.has_link?('Tutorials and User guide'), 'No link - Tutorials and User guide'
assert page.has_link?('API Reference'), 'No link - API Reference'
assert page.has_link?('SDK Reference'), 'No link - SDK Reference'
['active', api_fixture('users')['active'], true, true],
['admin', api_fixture('users')['admin'], true, true],
['active_no_prefs', api_fixture('users')['active_no_prefs'], true, false],
- ['active_no_prefs_profile', api_fixture('users')['active_no_prefs_profile'], true, false],
+ ['active_no_prefs_profile_no_getting_started_shown',
+ api_fixture('users')['active_no_prefs_profile_no_getting_started_shown'], true, false],
].each do |token, user, invited, has_profile|
test "visit home page for user #{token}" do
verify_system_menu user
end
end
+
+ test "test getting started help menu item" do
+ visit page_with_token('active')
+ within '.navbar-fixed-top' do
+ find('.help-menu > a').click
+ find('.help-menu .dropdown-menu a', text: 'Getting Started ...').click
+ end
+
+ within '.modal-content' do
+ assert_text 'Getting Started'
+ assert_selector 'button:not([disabled])', text: 'Next'
+ assert_no_selector 'button:not([disabled])', text: 'Prev'
+
+ # Use Next button to enable Prev button
+ click_button 'Next'
+ assert_selector 'button:not([disabled])', text: 'Prev' # Prev button is now enabled
+ click_button 'Prev'
+ assert_no_selector 'button:not([disabled])', text: 'Prev' # Prev button is again disabled
+
+ # Click Next until last page is reached and verify that it is disabled
+ (0..20).each do |i| # currently we only have 4 pages, and don't expect to have more than 20 in future
+ click_button 'Next'
+ begin
+ find('button:not([disabled])', text: 'Next')
+ rescue => e
+ break
+ end
+ end
+ assert_no_selector 'button:not([disabled])', text: 'Next' # Next button is disabled
+ assert_selector 'button:not([disabled])', text: 'Prev' # Prev button is enabled
+ click_button 'Prev'
+ assert_selector 'button:not([disabled])', text: 'Next' # Next button is now enabled
+
+ first('button', text: 'x').click
+ end
+ assert_text 'Active pipelines' # seeing dashboard now
+ end
+
+ test "test arvados_public_data_doc_url config unset" do
+ Rails.configuration.arvados_public_data_doc_url = false
+
+ visit page_with_token('active')
+ within '.navbar-fixed-top' do
+ find('.help-menu > a').click
+
+ assert_no_selector 'a', text:'Public Pipelines and Data sets'
+
+ assert_selector 'a', text:'Getting Started ...'
+ assert page.has_link?('Tutorials and User guide'), 'No link - Tutorials and User guide'
+ assert page.has_link?('API Reference'), 'No link - API Reference'
+ assert page.has_link?('SDK Reference'), 'No link - SDK Reference'
+ assert page.has_link?('Show version / debugging info ...'), 'No link - Show version / debugging info'
+ assert page.has_link?('Report a problem ...'), 'No link - Report a problem'
+ end
+ end
end
end
test "API error page has Report problem button" do
+ # point to a bad api server url to generate fiddlesticks error
original_arvados_v1_base = Rails.configuration.arvados_v1_base
+ Rails.configuration.arvados_v1_base = "https://[::1]:1/"
- begin
- # point to a bad api server url to generate fiddlesticks error
- Rails.configuration.arvados_v1_base = "https://[100::f]:1/"
+ visit page_with_token("active")
- visit page_with_token("active")
+ assert_text 'fiddlesticks'
- assert_text 'fiddlesticks'
+ # reset api server base config to let the popup rendering to work
+ Rails.configuration.arvados_v1_base = original_arvados_v1_base
- # reset api server base config to let the popup rendering to work
- Rails.configuration.arvados_v1_base = original_arvados_v1_base
+ click_link 'Report problem'
- click_link 'Report problem'
+ within '.modal-content' do
+ assert_text 'Report a problem'
+ assert_no_text 'Version / debugging info'
+ assert_text 'Describe the problem'
+ assert_text 'Send problem report'
+ # "Send" button should be disabled until text is entered
+ assert_no_selector 'a,button:not([disabled])', text: 'Send problem report'
+ assert_selector 'a,button', text: 'Cancel'
- within '.modal-content' do
- assert_text 'Report a problem'
- assert_no_text 'Version / debugging info'
- assert_text 'Describe the problem'
- assert_text 'Send problem report'
- # "Send" button should be disabled until text is entered
- assert_no_selector 'a,button:not([disabled])', text: 'Send problem report'
- assert_selector 'a,button', text: 'Cancel'
+ report = mock
+ report.expects(:deliver).returns true
+ IssueReporter.expects(:send_report).returns report
- report = mock
- report.expects(:deliver).returns true
- IssueReporter.expects(:send_report).returns report
+ # enter a report text and click on report
+ find_field('report_issue_text').set 'my test report text'
+ click_button 'Send problem report'
- # enter a report text and click on report
- find_field('report_issue_text').set 'my test report text'
- click_button 'Send problem report'
-
- # ajax success updated button texts and added footer message
- assert_no_selector 'a,button', text: 'Send problem report'
- assert_no_selector 'a,button', text: 'Cancel'
- assert_text 'Report sent'
- assert_text 'Thanks for reporting this issue'
- click_button 'Close'
- end
-
- # out of the popup now and should be back in the error page
- assert_text 'fiddlesticks'
- ensure
- Rails.configuration.arvados_v1_base = original_arvados_v1_base
+ # ajax success updated button texts and added footer message
+ assert_no_selector 'a,button', text: 'Send problem report'
+ assert_no_selector 'a,button', text: 'Cancel'
+ assert_text 'Report sent'
+ assert_text 'Thanks for reporting this issue'
+ click_button 'Close'
end
+
+ # out of the popup now and should be back in the error page
+ assert_text 'fiddlesticks'
end
end
# Re-running jobs doesn't currently work because the test API
# server has no git repository to check against. For now, check
- # that the correct script version is mentioned in the
- # Fiddlesticks error message.
+ # that the error message says something appropriate for that
+ # situation.
if expect_options && use_latest
assert_text "Script version #{job['supplied_script_version']} does not resolve to a commit"
else
visit page_with_token 'active', '/projects/' + api_fixture('groups')['aproject']['uuid']
# Point to a bad api server url to generate error
- Rails.configuration.arvados_v1_base = "https://[100::f]:1/"
+ Rails.configuration.arvados_v1_base = "https://[::1]:1/"
click_link 'Other objects'
within '#Other_objects' do
# Error
['active', api_fixture('users')['active']],
['admin', api_fixture('users')['admin']],
['active_no_prefs', api_fixture('users')['active_no_prefs']],
- ['active_no_prefs_profile', api_fixture('users')['active_no_prefs_profile']],
+ ['active_no_prefs_profile_no_getting_started_shown',
+ api_fixture('users')['active_no_prefs_profile_no_getting_started_shown']],
].each do |token, user|
test "check version info and report issue for user #{token}" do
assert page.has_no_text?('Save profile'), 'Found text - Save profile'
end
elsif invited
- assert page.has_text?('Please check the box below to indicate that you have read and accepted the user agreement'), 'Not found text - Please check the box below . . .'
+ assert page.has_text?('Please check the box below to indicate that you have read and accepted the user agreement'),
+ 'Not found text - Please check the box below . . .'
assert page.has_no_text?('Save profile'), 'Found text - Save profile'
else
assert page.has_text?('Your account is inactive'), 'Not found text - Your account is inactive'
assert page.has_no_text?('Save profile'), 'Found text - Save profile'
end
+ # If the user has not already seen getting_started modal, it will be shown on first visit.
+ if user and user['is_active'] and !user['prefs']['getting_started_shown']
+ within '.modal-content' do
+ assert_text 'Getting Started'
+ assert_selector 'button', text: 'Next'
+ assert_selector 'button', text: 'Prev'
+ first('button', text: 'x').click
+ end
+ end
+
within('.navbar-fixed-top') do
if !user
assert page.has_link?('Log in'), 'Not found link - Log in'
click_button "Save profile"
# profile saved and in profile page now with success
assert page.has_text?('Thank you for filling in your profile'), 'No text - Thank you for filling'
- click_link 'Back to work!'
+ if user['prefs']['getting_started_shown']
+ click_link 'Back to work!'
+ else
+ click_link 'Get started'
+ end
# profile saved and in home page now
assert page.has_text?('Active pipelines'), 'No text - Active pipelines'
['active', api_fixture('users')['active'], true, true],
['admin', api_fixture('users')['admin'], true, true],
['active_no_prefs', api_fixture('users')['active_no_prefs'], true, false],
- ['active_no_prefs_profile', api_fixture('users')['active_no_prefs_profile'], true, false],
+ ['active_no_prefs_profile_no_getting_started_shown',
+ api_fixture('users')['active_no_prefs_profile_no_getting_started_shown'], true, false],
+ ['active_no_prefs_profile_with_getting_started_shown',
+ api_fixture('users')['active_no_prefs_profile_with_getting_started_shown'], true, false],
].each do |token, user, invited, has_profile|
test "visit home page when profile is configured for user #{token}" do
end
end
- setup do
+ teardown do
Thread.current[:arvados_api_token] = nil
Thread.current[:user] = nil
Thread.current[:reader_tokens] = nil
exclude: ["Rakefile", "tmp", "vendor"]
navbar:
- start:
- - Getting Started:
- - start/index.html.textile.liquid
- - Quickstart:
- - start/getting_started/publicproject.html.textile.liquid
- - start/getting_started/firstpipeline.html.textile.liquid
- - Common Use Cases:
- - start/getting_started/sharedata.html.textile.liquid
- - Next Steps:
- - start/getting_started/nextsteps.html.textile.liquid
+ #start:
+ #- Getting Started:
+ #- start/index.html.textile.liquid
+ #- Quickstart:
+ #- start/getting_started/publicproject.html.textile.liquid
+ #- start/getting_started/firstpipeline.html.textile.liquid
+ #- Common Use Cases:
+ #- start/getting_started/sharedata.html.textile.liquid
+ #- Next Steps:
+ #- start/getting_started/nextsteps.html.textile.liquid
userguide:
- Welcome:
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
- <a class="navbar-brand" href="{{ site.baseurl }}/">Arvados</a>
+ <a class="navbar-brand" href="{{ site.baseurl }}/">Arvados Docs</a>
</div>
<div class="collapse navbar-collapse" id="bs-navbar-collapse">
<ul class="nav navbar-nav">
- <li {% if page.navsection == 'start' %} class="active" {% endif %}><a href="{{ site.baseurl }}/start/index.html">Getting Started</a></li>
+ <!--<li {% if page.navsection == 'start' %} class="active" {% endif %}><a href="{{ site.baseurl }}/start/index.html">Getting Started</a></li>-->
<li {% if page.navsection == 'userguide' %} class="active" {% endif %}><a href="{{ site.baseurl }}/user/index.html">User Guide</a></li>
<li {% if page.navsection == 'sdk' %} class="active" {% endif %}><a href="{{ site.baseurl }}/sdk/index.html">SDKs</a></li>
<li {% if page.navsection == 'api' %} class="active" {% endif %}><a href="{{ site.baseurl }}/api/index.html">API</a></li>
<li {% if page.navsection == 'installguide' %} class="active" {% endif %}><a href="{{ site.baseurl }}/install/index.html">Install</a></li>
- <li><a href="https://arvados.org/projects/arvados/" style="padding-left: 2em">Developer Site »</a></li>
+ <li><a href="https://arvados.org/projects/arvados/" style="padding-left: 2em">arvados.org »</a></li>
</ul>
<div class="pull-right" style="padding-top: 6px">
<html>
<head>
<meta charset="utf-8">
- <title>{% unless page.title == "Arvados" %} Arvados | Documentation | {% endunless %}{{ page.title }}</title>
+ <title>{% unless page.title == "Arvados | Documentation" %} Arvados | Documentation | {% endunless %}{{ page.title }}</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="">
<meta name="author" content="">
|minimum_script_version |string |Git branch, tag, or commit hash specifying the minimum acceptable script version (earliest ancestor) to consider when deciding whether to re-use a past job.[1]|query|@"c3e86c9"@|
|exclude_script_versions|array of strings|Git commit branches, tags, or hashes to exclude when deciding whether to re-use a past job.|query|@["8f03c71","8f03c71"]@
@["badtag1","badtag2"]@|
-|filters|array|Conditions to find Jobs to reuse.|query||
+|filters|array of arrays|Conditions to find Jobs to reuse.|query||
|find_or_create |boolean |Before creating, look for an existing job that has identical script, script_version, and script_parameters to those in the present job, has nondeterministic=false, and did not fail (it could be queued, running, or completed). If such a job exists, respond with the existing job instead of submitting a new one.|query|@false@|
When a job is submitted to the queue using the **create** method, the @script_version@ attribute is updated to a full 40-character Git commit hash based on the current content of the specified repository. If @script_version@ cannot be resolved, the job submission is rejected.
notextile. <div class="spaced-out">
# If @find_or_create@ is false or omitted, create a new job and skip the rest of these steps.
-# If @filters@ are specified, find jobs that match those filters. Filters *must* be specified to limit the @repository@ and @script@ attributes. An error is returned if they are missing.
-# If @filters@ are not specified, find jobs with the same @repository@ and @script@, with a @script_version@ between @minimum_script_version@ and @script_version@ (excluding @excluded_script_versions@), and a @docker_image_locator@ with the latest Collection that matches the submitted job's @docker_image@ constraint. If the submitted job includes an @arvados_sdk_version@ constraint, jobs must have an @arvados_sdk_version@ between that refspec and HEAD to be found.
+# If @filters@ are specified, find jobs that match those filters. If any filters are given, there must be at least one filter on the @repository@ attribute and one on the @script@ attribute: otherwise an error is returned.
+# If @filters@ are not specified, find jobs with the same @repository@ and @script@, with a @script_version@ between @minimum_script_version@ and @script_version@ inclusive (excluding @excluded_script_versions@), and a @docker_image_locator@ with the latest Collection that matches the submitted job's @docker_image@ constraint. If the submitted job includes an @arvados_sdk_version@ constraint, jobs must have an @arvados_sdk_version@ between that refspec and HEAD to be found. *This form is deprecated: use filters instead.*
# If the found jobs include a completed job, and all found completed jobs have consistent output, return one of them. Which specific job is returned is undefined.
# If the found jobs only include incomplete jobs, return one of them. Which specific job is returned is undefined.
# If no job has been returned so far, create and return a new job.
|_. Attribute|_. Type|_. Description|_. Notes|
|script|string|The filename of the job script.|This program will be invoked by Crunch for each job task. It is given as a path to an executable file, relative to the @/crunch_scripts@ directory in the Git tree specified by the _repository_ and _script_version_ attributes.|
|script_parameters|hash|The input parameters for the job.|Conventionally, one of the parameters is called @"input"@. Typically, some parameter values are collection UUIDs. Ultimately, though, the significance of parameters is left entirely up to the script itself.|
-|repository|string|Git repository|Given as the name of a locally hosted Git repository.|
+|repository|string|Git repository name or URL.|Source of the repository where the given script_version is to be found. This can be given as the name of a locally hosted repository, or as a publicly accessible URL starting with @git://@, @http://@, or @https://@.
+Examples:
+@yourusername/yourrepo@
+@https://github.com/curoverse/arvados.git@|
|script_version|string|Git commit|During a **create** transaction, this is the Git branch, tag, or hash supplied by the client. Before the job starts, Arvados updates it to the full 40-character SHA-1 hash of the commit used by the job.
See "Specifying Git versions":#script_version below for more detail about acceptable ways to specify a commit.|
|cancelled_by_client_uuid|string|API client ID|Is null if job has not been cancelled|
layout: default
no_nav_left: true
navsection: top
-title: Arvados
+title: Arvados | Documentation
...
<div class="jumbotron">
<div class="col-sm-5">
<p><a href="https://arvados.org/">Arvados</a> enables you to quickly begin using cloud computing resources in your data science work. It allows you to track your methods and datasets, share them securely, and easily re-run analyses.
</p>
- <p><strong>Quickstart</strong>: Check out our <a href="{{ site.baseurl }}/start/index.html">key features</a>, complete with screenshots, and then follow our tutorial to <a href="{{ site.baseurl }}/start/getting_started/firstpipeline.html">run your first pipeline</a> using our <a href="http://lp.curoverse.com/beta-signup/">public beta</a>.
- </p>
<p><strong>News</strong>: Read our <a href="https://arvados.org/projects/arvados/blogs">blog updates</a> or look through our <a href="https://arvados.org/projects/arvados/activity">recent developer activity</a>.
</p>
<p><strong>Questions?</strong> Email <a href="http://lists.arvados.org/mailman/listinfo/arvados">the mailing list</a>, or chat with us on IRC: <a href="irc://irc.oftc.net:6667/#arvados">#arvados</a> @ OFTC (you can <a href="https://webchat.oftc.net/?channels=arvados">join in your browser</a>).
</div>
<div class="col-sm-7" style="border-left: solid; border-width: 1px">
- <p>Below you can also find more in-depth guides for using Arvados.
+ <p><strong>Quickstart</strong>
+ <p>
+ Try any pipeline from the <a href="https://arvados.org/projects/arvados/wiki/Public_Pipelines_and_Datasets">list of public pipelines</a>. For instance, the <a href="http://curover.se/pathomap">Pathomap Pipeline</a> links to these <a href="https://arvados.org/projects/arvados/wiki/pathomap_tutorial/">step-by-step instructions</a> for trying Arvados out right in your browser using Curoverse's <a href="http://lp.curoverse.com/beta-signup/">public Arvados instance</a>.
</p>
- <br>
+ <!--<p>-->
+ <!--<ol>-->
+ <!--<li>-->
+ <!--Go to <a href="https://{{ site.arvados_workbench_host }}/" target="_blank">https://{{ site.arvados_workbench_host }}/</a>-->
+ <!--</li><li>-->
+ <!--Register with any Google account-->
+ <!--</li><li>-->
+ <!--Follow the Getting Started guide-->
+ <!--<br>-->
+ <!--<em>Tip: Don't see the guide? You can find it by clicking (in the upper-right corner) <span class="fa fa-lg fa-question-circle"></span> > Getting Started)</em>-->
+ <!--</li>-->
+ <!--</ol>-->
+ <!--</p>-->
+ <p><strong>
+ Pipeline Developer Quickstart
+ </strong></p>
<p>
- <a href="{{ site.baseurl }}/start/index.html">Getting Started</a> — Start here if you're new to Arvados.
+ Want to port your pipeline to Arvados? Check out the step-by-step <a href="https://arvados.org/projects/arvados/wiki/Port_a_Pipeline">Port-a-Pipeline</a> guide on the Arvados wiki.
</p>
+ <p><strong>Below you can also find more in-depth guides for using Arvados.
+ </strong></p>
+ <!--<p>-->
+ <!--<a href="{{ site.baseurl }}/start/index.html">Getting Started</a> — Start here if you're new to Arvados.-->
+ <!--</p>-->
<p>
<a href="{{ site.baseurl }}/user/index.html">User Guide</a> — How to manage data and do analysis with Arvados.
</p>
title: Welcome to Arvados!
...
-_If you are new to Arvados, please read the "Getting Started":{{site.baseurl}}/start/index.html guide for a quick introduction to working with Arvados._
+_If you are new to Arvados, please try the Quickstart on <a href="http://doc.arvados.org">the documentation homepage</a> instead of this detailed User Guide._
This guide provides a reference for using Arvados to solve big data bioinformatics problems, including:
* Storing and querying metadata about genome sequence files, such as human subjects and their phenotypic traits using the "Arvados Metadata Database.":{{site.baseurl}}/user/topics/tutorial-trait-search.html
* Accessing, organizing, and sharing data, pipelines and results using the "Arvados Workbench":{{site.baseurl}}/user/getting_started/workbench.html web application.
-This User Guide goes into more depth than the "Getting Started guide":{{site.baseurl}}/start/index.html, covers how to develop your own pipelines in addition to using pre-existing pipelines, covers the Arvados commandline tools in addition to the Workbench graphical interface to Arvados, and can be referenced in any order.
-
The examples in this guide use the Arvados instance located at <a href="https://{{ site.arvados_workbench_host }}/" target="_blank">https://{{ site.arvados_workbench_host }}</a>. If you are using a different Arvados instance replace @{{ site.arvados_workbench_host }}@ with your private instance in all of the examples in this guide.
Curoverse maintains a public Arvados instance located at <a href="https://workbench.qr1hi.arvadosapi.com/" target="_blank">https://workbench.qr1hi.arvadosapi.com/</a>. You must have an account in order to use this service. If you would like to request an account, please send an email to "arvados@curoverse.com":mailto:arvados@curoverse.com.
permission_args=""
fi
-exec keepstore $permission_args -listen=":25107" -volumes="/keep-data"
+exec keepstore $permission_args -listen=":25107" -volume="/keep-data"
# will be filled in later, if [ -z "$skipDetection" ]
lsbDist=''
-target="/tmp/docker-rootfs-debootstrap-$suite-$$-$RANDOM"
+target="${TMPDIR:-/tmp}/docker-rootfs-debootstrap-$suite-$$-$RANDOM"
cd "$(dirname "$(readlink -f "$BASH_SOURCE")")"
returnTo="$(pwd -P)"
a2enmod rewrite && \
a2enmod ssl && \
cd /usr/src/sso-provider && \
+ cp config/environments/production.rb.example config/environments/production.rb && \
RAILS_ENV=production /usr/local/rvm/bin/rvm-exec default bundle exec rake db:setup && \
/usr/local/rvm/bin/rvm-exec default bundle exec rake assets:precompile && \
- chown www-data:www-data tmp_omniauth log config.ru -R && \
+ chown www-data:www-data log config.ru -R && \
chown www-data:www-data db db/production.sqlite3 && \
/bin/mkdir /var/run/apache2
@jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
or $a <=> $b } @jobstep_todo;
my $level = $jobstep[$jobstep_todo[0]]->{level};
-Log (undef, "start level $level");
+my $initial_tasks_this_level = 0;
+foreach my $id (@jobstep_todo) {
+ $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
+}
+# If the number of tasks scheduled at this level #T is smaller than the number
+# of slots available #S, only use the first #T slots, or the first slot on
+# each node, whichever number is greater.
+#
+# When we dispatch tasks later, we'll allocate whole-node resources like RAM
+# based on these numbers. Using fewer slots makes more resources available
+# to each individual task, which should normally be a better strategy when
+# there are fewer of them running with less parallelism.
+#
+# Note that this calculation is not redone if the initial tasks at
+# this level queue more tasks at the same level. This may harm
+# overall task throughput for that level.
+my @freeslot;
+if ($initial_tasks_this_level < @node) {
+ @freeslot = (0..$#node);
+} elsif ($initial_tasks_this_level < @slot) {
+ @freeslot = (0..$initial_tasks_this_level - 1);
+} else {
+ @freeslot = (0..$#slot);
+}
+my $round_num_freeslots = scalar(@freeslot);
+my %round_max_slots = ();
+for (my $ii = $#freeslot; $ii >= 0; $ii--) {
+ my $this_slot = $slot[$freeslot[$ii]];
+ my $node_name = $this_slot->{node}->{name};
+ $round_max_slots{$node_name} ||= $this_slot->{cpu};
+ last if (scalar(keys(%round_max_slots)) >= @node);
+}
+
+Log(undef, "start level $level with $round_num_freeslots slots");
my %proc;
-my @freeslot = (0..$#slot);
my @holdslot;
my %reader;
my $progress_is_dirty = 1;
update_progress_stats();
-
THISROUND:
-my $tasks_this_level = 0;
-foreach my $id (@jobstep_todo) {
- $tasks_this_level++ if ($jobstep[$id]->{level} == $level);
-}
for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
{
my $id = $jobstep_todo[$todo_ptr];
$ENV{"HOME"} = $ENV{"TASK_WORK"};
$ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
$ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
- $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
+ $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
$ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
$ENV{"GZIP"} = "-n";
- my $max_node_concurrent_tasks = $ENV{CRUNCH_NODE_SLOTS};
- if ($tasks_this_level < $max_node_concurrent_tasks) {
- $max_node_concurrent_tasks = $tasks_this_level;
- }
-
my @srunargs = (
"srun",
"--nodelist=".$childnode->{name},
# $command. No tool is expected to read these values directly.
.q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
.q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
- ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($max_node_concurrent_tasks * 100) )) "
+ ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
$command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
if ($docker_hash)
while (!@freeslot
||
- (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
+ ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
{
last THISROUND if $main::please_freeze || defined($main::success);
if ($main::please_info)
"net/http"
"os"
"regexp"
+ "strconv"
"strings"
"sync"
- "sync/atomic"
- "time"
- "unsafe"
)
// A Keep "block" is 64MB.
var BlockNotFound = errors.New("Block not found")
var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
-var OversizeBlockError = errors.New("Block too big")
+var OversizeBlockError = errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")")
var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
+var InvalidLocatorError = errors.New("Invalid locator")
const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
Arvados *arvadosclient.ArvadosClient
Want_replicas int
Using_proxy bool
- service_roots *map[string]string
- lock sync.Mutex
+ localRoots *map[string]string
+ gatewayRoots *map[string]string
+ lock sync.RWMutex
Client *http.Client
}
// Create a new KeepClient. This will contact the API server to discover Keep
// servers.
-func MakeKeepClient(arv *arvadosclient.ArvadosClient) (kc KeepClient, err error) {
+func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) {
var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
insecure := matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
- kc = KeepClient{
+ kc := &KeepClient{
Arvados: arv,
Want_replicas: 2,
Using_proxy: false,
Client: &http.Client{Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
}
- _, err = (&kc).DiscoverKeepServers()
-
- return kc, err
+ return kc, kc.DiscoverKeepServers()
}
-// Put a block given the block hash, a reader with the block data, and the
-// expected length of that data. The desired number of replicas is given in
-// KeepClient.Want_replicas. Returns the number of replicas that were written
-// and if there was an error. Note this will return InsufficientReplias
-// whenever 0 <= replicas < this.Wants_replicas.
-func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (locator string, replicas int, err error) {
-
+// Put a block given the block hash, a reader, and the number of bytes
+// to read from the reader (which must be between 0 and BLOCKSIZE).
+//
+// Returns the locator for the written block, the number of replicas
+// written, and an error.
+//
+// Returns an InsufficientReplicas error if 0 <= replicas <
+// kc.Want_replicas.
+func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, int, error) {
// Buffer for reads from 'r'
var bufsize int
- if expectedLength > 0 {
- if expectedLength > BLOCKSIZE {
+ if dataBytes > 0 {
+ if dataBytes > BLOCKSIZE {
return "", 0, OversizeBlockError
}
- bufsize = int(expectedLength)
+ bufsize = int(dataBytes)
} else {
bufsize = BLOCKSIZE
}
t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
defer t.Close()
- return this.putReplicas(hash, t, expectedLength)
+ return kc.putReplicas(hash, t, dataBytes)
}
-// Put a block given the block hash and a byte buffer. The desired number of
-// replicas is given in KeepClient.Want_replicas. Returns the number of
-// replicas that were written and if there was an error. Note this will return
-// InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
-func (this KeepClient) PutHB(hash string, buf []byte) (locator string, replicas int, err error) {
+// PutHB writes a block to Keep. The hash of the bytes is given in
+// hash, and the data is given in buf.
+//
+// The data length passed to putReplicas is len(buf).
+//
+// Return values are the same as for PutHR.
+func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) {
t := streamer.AsyncStreamFromSlice(buf)
defer t.Close()
-
- return this.putReplicas(hash, t, int64(len(buf)))
+ return kc.putReplicas(hash, t, int64(len(buf)))
}
-// Put a block given a buffer. The hash will be computed. The desired number
-// of replicas is given in KeepClient.Want_replicas. Returns the number of
-// replicas that were written and if there was an error. Note this will return
-// InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
-func (this KeepClient) PutB(buffer []byte) (locator string, replicas int, err error) {
+// PutB writes a block to Keep. It computes the hash itself: the
+// hex-encoded MD5 digest of buffer.
+//
+// Return values are the same as for PutHR.
+func (kc *KeepClient) PutB(buffer []byte) (string, int, error) {
hash := fmt.Sprintf("%x", md5.Sum(buffer))
- return this.PutHB(hash, buffer)
+ return kc.PutHB(hash, buffer)
}
-// Put a block, given a Reader. This will read the entire reader into a buffer
-// to compute the hash. The desired number of replicas is given in
-// KeepClient.Want_replicas. Returns the number of replicas that were written
-// and if there was an error. Note this will return InsufficientReplias
-// whenever 0 <= replicas < this.Wants_replicas. Also nhote that if the block
-// hash and data size are available, PutHR() is more efficient.
-func (this KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) {
+// PutR writes a block to Keep. It first reads all data from r into a buffer
+// in order to compute the hash.
+//
+// Note: the entire stream is buffered in memory (ioutil.ReadAll)
+// before hashing.
+//
+// Return values are the same as for PutHR.
+//
+// If the block hash and data size are known, PutHR is more efficient.
+func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) {
if buffer, err := ioutil.ReadAll(r); err != nil {
return "", 0, err
} else {
- return this.PutB(buffer)
+ return kc.PutB(buffer)
}
}
-// Get a block given a hash. Return a reader, the expected data length, the
-// URL the block was fetched from, and if there was an error. If the block
-// checksum does not match, the final Read() on the reader returned by this
-// method will return a BadChecksum error instead of EOF.
-func (this KeepClient) Get(hash string) (reader io.ReadCloser,
- contentLength int64, url string, err error) {
- return this.AuthorizedGet(hash, "", "")
-}
-
-// Get a block given a hash, with additional authorization provided by
-// signature and timestamp. Return a reader, the expected data length, the URL
-// the block was fetched from, and if there was an error. If the block
-// checksum does not match, the final Read() on the reader returned by this
-// method will return a BadChecksum error instead of EOF.
-func (this KeepClient) AuthorizedGet(hash string,
- signature string,
- timestamp string) (reader io.ReadCloser,
- contentLength int64, url string, err error) {
-
- // Take the hash of locator and timestamp in order to identify this
- // specific transaction in log statements.
- requestId := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
-
- // Calculate the ordering for asking servers
- sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
-
- for _, host := range sv {
- var req *http.Request
- var err error
- var url string
- if signature != "" {
- url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
- signature, timestamp)
- } else {
- url = fmt.Sprintf("%s/%s", host, hash)
- }
- if req, err = http.NewRequest("GET", url, nil); err != nil {
+// Get() retrieves a block, given a locator. Returns a reader, the
+// expected data length, the URL the block is being fetched from, and
+// an error.
+//
+// If the block checksum does not match, the final Read() on the
+// reader returned by this method will return a BadChecksum error
+// instead of EOF.
+func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) {
+ var errs []string
+ for _, host := range kc.getSortedRoots(locator) {
+ url := host + "/" + locator
+ req, err := http.NewRequest("GET", url, nil)
+ if err != nil {
continue
}
-
- req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
-
- log.Printf("[%v] Begin download %s", requestId, url)
-
- var resp *http.Response
- if resp, err = this.Client.Do(req); err != nil || resp.StatusCode != http.StatusOK {
- statusCode := -1
- var respbody []byte
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
+ resp, err := kc.Client.Do(req)
+ if err != nil || resp.StatusCode != http.StatusOK {
if resp != nil {
- statusCode = resp.StatusCode
+ var respbody []byte
if resp.Body != nil {
respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+ // Close the body so this connection can be reused;
+ // otherwise each failed attempt leaks an open
+ // response body.
+ resp.Body.Close()
}
+ errs = append(errs, fmt.Sprintf("%s: %d %s",
+ url, resp.StatusCode, strings.TrimSpace(string(respbody))))
+ } else {
+ errs = append(errs, fmt.Sprintf("%s: %v", url, err))
+ }
- response := strings.TrimSpace(string(respbody))
- log.Printf("[%v] Download %v status code: %v error: \"%v\" response: \"%v\"",
- requestId, url, statusCode, err, response)
continue
}
-
- if resp.StatusCode == http.StatusOK {
- log.Printf("[%v] Download %v status code: %v", requestId, url, resp.StatusCode)
- return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil
- }
+ // Success: hand the caller a reader that verifies the content
+ // hash (the first 32 hex digits of the locator) as it is read.
+ return HashCheckingReader{
+ Reader: resp.Body,
+ Hash: md5.New(),
+ Check: locator[0:32],
+ }, resp.ContentLength, url, nil
}
-
+ log.Printf("DEBUG: GET %s failed: %v", locator, errs)
return nil, 0, "", BlockNotFound
}
-// Determine if a block with the given hash is available and readable, but does
-// not return the block contents.
-func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) {
- return this.AuthorizedAsk(hash, "", "")
-}
-
-// Determine if a block with the given hash is available and readable with the
-// given signature and timestamp, but does not return the block contents.
-func (this KeepClient) AuthorizedAsk(hash string, signature string,
- timestamp string) (contentLength int64, url string, err error) {
- // Calculate the ordering for asking servers
- sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
-
- for _, host := range sv {
- var req *http.Request
- var err error
- if signature != "" {
- url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
- signature, timestamp)
- } else {
- url = fmt.Sprintf("%s/%s", host, hash)
- }
-
- if req, err = http.NewRequest("HEAD", url, nil); err != nil {
- continue
- }
-
- req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
-
- var resp *http.Response
- if resp, err = this.Client.Do(req); err != nil {
+// Ask() verifies that a block with the given hash is available and
+// readable, according to at least one Keep service. Unlike Get, it
+// does not retrieve the data or verify that the data content matches
+// the hash specified by the locator.
+//
+// Returns the data size (content length) reported by the Keep service
+// and the URI reporting the data size.
+func (kc *KeepClient) Ask(locator string) (int64, string, error) {
+ for _, host := range kc.getSortedRoots(locator) {
+ url := host + "/" + locator
+ req, err := http.NewRequest("HEAD", url, nil)
+ if err != nil {
continue
}
-
- if resp.StatusCode == http.StatusOK {
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
+ resp, err := kc.Client.Do(req)
+ if err != nil {
+ continue
+ }
+ // A HEAD response carries no payload, but Body must still be
+ // closed so the underlying connection can be reused.
+ resp.Body.Close()
+ if resp.StatusCode == http.StatusOK {
return resp.ContentLength, url, nil
}
}
-
return 0, "", BlockNotFound
-
}
-// Atomically read the service_roots field.
-func (this *KeepClient) ServiceRoots() map[string]string {
- r := (*map[string]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
- return *r
+// LocalRoots() returns the map of local (i.e., disk and proxy) Keep
+// services: uuid -> baseURI.
+//
+// The returned value is the client's internal map, not a copy;
+// callers must treat it as read-only.
+func (kc *KeepClient) LocalRoots() map[string]string {
+ kc.lock.RLock()
+ defer kc.lock.RUnlock()
+ return *kc.localRoots
}
-// Atomically update the service_roots field. Enables you to update
-// service_roots without disrupting any GET or PUT operations that might
-// already be in progress.
-func (this *KeepClient) SetServiceRoots(new_roots map[string]string) {
- roots := make(map[string]string)
- for uuid, root := range new_roots {
- roots[uuid] = root
- }
- atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
- unsafe.Pointer(&roots))
+// GatewayRoots() returns the map of Keep remote gateway services:
+// uuid -> baseURI.
+//
+// The returned value is the client's internal map, not a copy;
+// callers must treat it as read-only.
+func (kc *KeepClient) GatewayRoots() map[string]string {
+ kc.lock.RLock()
+ defer kc.lock.RUnlock()
+ return *kc.gatewayRoots
}
-type Locator struct {
- Hash string
- Size int
- Signature string
- Timestamp string
+// SetServiceRoots updates the localRoots and gatewayRoots maps,
+// without risk of disrupting operations that are already in progress.
+//
+// The KeepClient makes its own copy of the supplied maps, so the
+// caller can reuse/modify them after SetServiceRoots returns, but
+// they should not be modified by any other goroutine while
+// SetServiceRoots is running.
+//
+// Passing nil for either map is equivalent to passing an empty map.
+func (kc *KeepClient) SetServiceRoots(newLocals, newGateways map[string]string) {
+ locals := make(map[string]string)
+ for uuid, root := range newLocals {
+ locals[uuid] = root
+ }
+ gateways := make(map[string]string)
+ for uuid, root := range newGateways {
+ gateways[uuid] = root
+ }
+ // Copies are built before taking the lock, so the write lock is
+ // held only for the two pointer swaps.
+ kc.lock.Lock()
+ defer kc.lock.Unlock()
+ kc.localRoots = &locals
+ kc.gatewayRoots = &gateways
}
-func MakeLocator2(hash string, hints string) (locator Locator) {
- locator.Hash = hash
- if hints != "" {
- signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
- for _, hint := range strings.Split(hints, "+") {
- if hint != "" {
- if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
- fmt.Sscanf(hint, "%d", &locator.Size)
- } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
- locator.Signature = m[1]
- locator.Timestamp = m[2]
- } else if match, _ := regexp.MatchString("^[:upper:]", hint); match {
- // Any unknown hint that starts with an uppercase letter is
- // presumed to be valid and ignored, to permit forward compatibility.
- } else {
- // Unknown format; not a valid locator.
- return Locator{"", 0, "", ""}
- }
+// getSortedRoots returns a list of base URIs of Keep services, in the
+// order they should be attempted in order to retrieve content for the
+// given locator.
+//
+// Service hints ("+K@...") are tried first, in the order they appear
+// in the locator; the rendezvous-sorted local roots come last.
+func (kc *KeepClient) getSortedRoots(locator string) []string {
+ var found []string
+ for _, hint := range strings.Split(locator, "+") {
+ if len(hint) < 7 || hint[0:2] != "K@" {
+ // Not a service hint.
+ continue
+ }
+ if len(hint) == 7 {
+ // +K@abcde means fetch from proxy at
+ // keep.abcde.arvadosapi.com
+ found = append(found, "https://keep."+hint[2:]+".arvadosapi.com")
+ } else if len(hint) == 29 {
+ // +K@abcde-abcde-abcdeabcdeabcde means fetch
+ // from gateway with given uuid
+ if gwURI, ok := kc.GatewayRoots()[hint[2:]]; ok {
+ found = append(found, gwURI)
}
+ // else this hint is no use to us; carry on.
}
}
- return locator
+ // After trying all usable service hints, fall back to local roots.
+ // NOTE(review): locator[0:32] assumes the locator begins with a full
+ // 32-hex-digit hash; a shorter string would panic here — confirm
+ // callers validate locators before calling Get/Ask.
+ found = append(found, NewRootSorter(kc.LocalRoots(), locator[0:32]).GetSortedRoots()...)
+ return found
+}
+
+// Locator is a parsed Keep locator: a content hash plus optional
+// "+"-separated hints.
+type Locator struct {
+ Hash string // 32 lowercase hex digits (see locatorMatcher)
+ Size int // -1 if data size is not known
+ Hints []string // Including the size hint, if any
}
-func MakeLocator(path string) Locator {
- pathpattern, err := regexp.Compile("^([0-9a-f]{32})([+].*)?$")
- if err != nil {
- log.Print("Don't like regexp", err)
+// String reassembles the locator string: the hash followed by each
+// hint, joined with "+". Round-trips with MakeLocator, preserving
+// unrecognized hints.
+func (loc *Locator) String() string {
+ s := loc.Hash
+ if len(loc.Hints) > 0 {
+ s = s + "+" + strings.Join(loc.Hints, "+")
+ }
+ return s
+}
+
+var locatorMatcher = regexp.MustCompile("^([0-9a-f]{32})([+](.*))?$")
- sm := pathpattern.FindStringSubmatch(path)
+// MakeLocator parses path into a Locator. It returns
+// InvalidLocatorError unless path begins with 32 lowercase hex digits
+// (optionally followed by "+"-separated hints).
+func MakeLocator(path string) (*Locator, error) {
+ sm := locatorMatcher.FindStringSubmatch(path)
if sm == nil {
- log.Print("Failed match ", path)
- return Locator{"", 0, "", ""}
+ return nil, InvalidLocatorError
}
-
- return MakeLocator2(sm[1], sm[2])
+ loc := Locator{Hash: sm[1], Size: -1}
+ if sm[2] != "" {
+ loc.Hints = strings.Split(sm[3], "+")
+ } else {
+ loc.Hints = []string{}
+ }
+ // If the first hint is numeric, it is the size hint.
+ if len(loc.Hints) > 0 {
+ if size, err := strconv.Atoi(loc.Hints[0]); err == nil {
+ loc.Size = size
+ }
+ }
+ return &loc, nil
}
kc, err := MakeKeepClient(&arv)
c.Assert(err, Equals, nil)
- c.Check(len(kc.ServiceRoots()), Equals, 2)
- for _, root := range kc.ServiceRoots() {
+ c.Check(len(kc.LocalRoots()), Equals, 2)
+ for _, root := range kc.LocalRoots() {
c.Check(root, Matches, "http://localhost:\\d+")
}
}
handled chan string
}
-func (this StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
- this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
+// ServeHTTP checks the request path, token and body against the
+// stub's expectations, replies 200, and reports the server address on
+// the handled channel.
+func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
+ sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken))
body, err := ioutil.ReadAll(req.Body)
- this.c.Check(err, Equals, nil)
- this.c.Check(body, DeepEquals, []byte(this.expectBody))
+ sph.c.Check(err, Equals, nil)
+ sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
resp.WriteHeader(200)
- this.handled <- fmt.Sprintf("http://%s", req.Host)
+ sph.handled <- fmt.Sprintf("http://%s", req.Host)
}
func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
return
}
-func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
+func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
ks := RunFakeKeepServer(st)
make(chan string)}
UploadToStubHelper(c, st,
- func(kc KeepClient, url string, reader io.ReadCloser,
+ func(kc *KeepClient, url string, reader io.ReadCloser,
writer io.WriteCloser, upload_status chan uploadStatus) {
go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), "TestUploadToStubKeepServer")
make(chan string)}
UploadToStubHelper(c, st,
- func(kc KeepClient, url string, reader io.ReadCloser,
+ func(kc *KeepClient, url string, reader io.ReadCloser,
writer io.WriteCloser, upload_status chan uploadStatus) {
tr := streamer.AsyncStreamFromReader(512, reader)
handled chan string
}
-func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+// ServeHTTP unconditionally replies 500 and reports the server
+// address on the handled channel.
+func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
resp.WriteHeader(500)
- this.handled <- fmt.Sprintf("http://%s", req.Host)
+ fh.handled <- fmt.Sprintf("http://%s", req.Host)
}
func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
hash := "acbd18db4cc2f85cedef654fccc4a4d8"
UploadToStubHelper(c, st,
- func(kc KeepClient, url string, reader io.ReadCloser,
+ func(kc *KeepClient, url string, reader io.ReadCloser,
writer io.WriteCloser, upload_status chan uploadStatus) {
go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, "TestFailedUploadToStubKeepServer")
kc.Want_replicas = 2
arv.ApiToken = "abc123"
- service_roots := make(map[string]string)
+ localRoots := make(map[string]string)
ks := RunSomeFakeKeepServers(st, 5)
for i, k := range ks {
- service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
- kc.SetServiceRoots(service_roots)
+ kc.SetServiceRoots(localRoots, nil)
kc.PutB([]byte("foo"))
shuff := NewRootSorter(
- kc.ServiceRoots(), Md5String("foo")).GetSortedRoots()
+ kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
s1 := <-st.handled
s2 := <-st.handled
kc.Want_replicas = 2
arv.ApiToken = "abc123"
- service_roots := make(map[string]string)
+ localRoots := make(map[string]string)
ks := RunSomeFakeKeepServers(st, 5)
for i, k := range ks {
- service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
- kc.SetServiceRoots(service_roots)
+ kc.SetServiceRoots(localRoots, nil)
reader, writer := io.Pipe()
kc.PutHR(hash, reader, 3)
- shuff := NewRootSorter(kc.ServiceRoots(), hash).GetSortedRoots()
+ shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
log.Print(shuff)
s1 := <-st.handled
kc.Want_replicas = 2
arv.ApiToken = "abc123"
- service_roots := make(map[string]string)
+ localRoots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 4)
ks2 := RunSomeFakeKeepServers(fh, 1)
for i, k := range ks1 {
- service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
defer k.listener.Close()
}
- kc.SetServiceRoots(service_roots)
+ kc.SetServiceRoots(localRoots, nil)
shuff := NewRootSorter(
- kc.ServiceRoots(), Md5String("foo")).GetSortedRoots()
+ kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
phash, replicas, err := kc.PutB([]byte("foo"))
kc.Want_replicas = 2
arv.ApiToken = "abc123"
- service_roots := make(map[string]string)
+ localRoots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 1)
ks2 := RunSomeFakeKeepServers(fh, 4)
for i, k := range ks1 {
- service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
defer k.listener.Close()
}
- kc.SetServiceRoots(service_roots)
+ kc.SetServiceRoots(localRoots, nil)
_, replicas, err := kc.PutB([]byte("foo"))
c *C
expectPath string
expectApiToken string
- returnBody []byte
+ httpStatus int
+ body []byte
}
-func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
- this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
- resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(this.returnBody)))
- resp.Write(this.returnBody)
+// ServeHTTP checks the request path and token, then replies with the
+// configured status and body.
+func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
+ sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectApiToken))
+ // Headers must be set before WriteHeader: net/http ignores changes
+ // to the header map made after the status line is written.
+ resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
+ resp.WriteHeader(sgh.httpStatus)
+ resp.Write(sgh.body)
}
func (s *StandaloneSuite) TestGet(c *C) {
c,
hash,
"abc123",
+ http.StatusOK,
[]byte("foo")}
ks := RunFakeKeepServer(st)
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots(map[string]string{"x": ks.url})
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil)
r, n, url2, err := kc.Get(hash)
defer r.Close()
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots(map[string]string{"x": ks.url})
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil)
r, n, url2, err := kc.Get(hash)
c.Check(err, Equals, BlockNotFound)
c.Check(r, Equals, nil)
}
+func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
+	uuid := "zzzzz-bi6l4-123451234512345"
+	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+	// This one shouldn't be used:
+	ks0 := RunFakeKeepServer(StubGetHandler{
+		c,
+		"error if used",
+		"abc123",
+		http.StatusOK,
+		[]byte("foo")})
+	defer ks0.listener.Close()
+	// This one should be used:
+	ks := RunFakeKeepServer(StubGetHandler{
+		c,
+		hash + "+K@" + uuid,
+		"abc123",
+		http.StatusOK,
+		[]byte("foo")})
+	defer ks.listener.Close()
+
+	arv, err := arvadosclient.MakeArvadosClient()
+	kc, _ := MakeKeepClient(&arv)
+	arv.ApiToken = "abc123"
+	kc.SetServiceRoots(
+		map[string]string{"x": ks0.url},
+		map[string]string{uuid: ks.url})
+
+	r, n, uri, err := kc.Get(hash + "+K@" + uuid)
+	// Assert on err before deferring r.Close(): if Get failed, r is
+	// nil and Close() would panic.
+	c.Assert(err, Equals, nil)
+	defer r.Close()
+	c.Check(n, Equals, int64(3))
+	c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
+
+	content, err := ioutil.ReadAll(r)
+	c.Check(err, Equals, nil)
+	c.Check(content, DeepEquals, []byte("foo"))
+}
+
+// Use a service hint to fetch from a local disk service, overriding
+// rendezvous probe order.
+func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
+	uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
+	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+	// This one shouldn't be used, although it appears first in
+	// rendezvous probe order:
+	ks0 := RunFakeKeepServer(StubGetHandler{
+		c,
+		"error if used",
+		"abc123",
+		http.StatusOK,
+		[]byte("foo")})
+	defer ks0.listener.Close()
+	// This one should be used:
+	ks := RunFakeKeepServer(StubGetHandler{
+		c,
+		hash + "+K@" + uuid,
+		"abc123",
+		http.StatusOK,
+		[]byte("foo")})
+	defer ks.listener.Close()
+
+	arv, err := arvadosclient.MakeArvadosClient()
+	kc, _ := MakeKeepClient(&arv)
+	arv.ApiToken = "abc123"
+	kc.SetServiceRoots(
+		map[string]string{
+			"zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
+			"zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
+			"zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
+			uuid: ks.url},
+		map[string]string{
+			"zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
+			"zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
+			"zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
+			uuid: ks.url},
+	)
+
+	r, n, uri, err := kc.Get(hash + "+K@" + uuid)
+	// Assert on err before deferring r.Close(): if Get failed, r is
+	// nil and Close() would panic.
+	c.Assert(err, Equals, nil)
+	defer r.Close()
+	c.Check(n, Equals, int64(3))
+	c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
+
+	content, err := ioutil.ReadAll(r)
+	c.Check(err, Equals, nil)
+	c.Check(content, DeepEquals, []byte("foo"))
+}
+
+func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
+	uuid := "zzzzz-bi6l4-123451234512345"
+	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+	ksLocal := RunFakeKeepServer(StubGetHandler{
+		c,
+		hash + "+K@" + uuid,
+		"abc123",
+		http.StatusOK,
+		[]byte("foo")})
+	defer ksLocal.listener.Close()
+	ksGateway := RunFakeKeepServer(StubGetHandler{
+		c,
+		hash + "+K@" + uuid,
+		"abc123",
+		http.StatusInternalServerError,
+		[]byte("Error")})
+	defer ksGateway.listener.Close()
+
+	arv, err := arvadosclient.MakeArvadosClient()
+	kc, _ := MakeKeepClient(&arv)
+	arv.ApiToken = "abc123"
+	kc.SetServiceRoots(
+		map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
+		map[string]string{uuid: ksGateway.url})
+
+	// The gateway replies 500, so Get is expected to fall back to the
+	// local service root.
+	r, n, uri, err := kc.Get(hash + "+K@" + uuid)
+	c.Assert(err, Equals, nil)
+	defer r.Close()
+	c.Check(n, Equals, int64(3))
+	c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid))
+
+	content, err := ioutil.ReadAll(r)
+	c.Check(err, Equals, nil)
+	c.Check(content, DeepEquals, []byte("foo"))
+}
+
type BarHandler struct {
handled chan string
}
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots(map[string]string{"x": ks.url})
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil)
r, n, _, err := kc.Get(barhash)
_, err = ioutil.ReadAll(r)
c,
hash,
"abc123",
+ http.StatusOK,
content}
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- service_roots := make(map[string]string)
+ localRoots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 1)
ks2 := RunSomeFakeKeepServers(fh, 4)
for i, k := range ks1 {
- service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
defer k.listener.Close()
}
- kc.SetServiceRoots(service_roots)
+ kc.SetServiceRoots(localRoots, nil)
// This test works only if one of the failing services is
// attempted before the succeeding service. Otherwise,
// the choice of block content "waz" and the UUIDs of the fake
// servers, so we just tried different strings until we found
// an example that passes this Assert.)
- c.Assert(NewRootSorter(service_roots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
+ c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
r, n, url2, err := kc.Get(hash)
kc.Want_replicas = 2
kc.Using_proxy = true
arv.ApiToken = "abc123"
- service_roots := make(map[string]string)
+ localRoots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 1)
for i, k := range ks1 {
- service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
- kc.SetServiceRoots(service_roots)
+ kc.SetServiceRoots(localRoots, nil)
_, replicas, err := kc.PutB([]byte("foo"))
<-st.handled
kc.Want_replicas = 3
kc.Using_proxy = true
arv.ApiToken = "abc123"
- service_roots := make(map[string]string)
+ localRoots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 1)
for i, k := range ks1 {
- service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
- kc.SetServiceRoots(service_roots)
+ kc.SetServiceRoots(localRoots, nil)
_, replicas, err := kc.PutB([]byte("foo"))
<-st.handled
}
func (s *StandaloneSuite) TestMakeLocator(c *C) {
- l := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
+ // Locator with both a size hint and a permission signature hint.
+ l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
+ c.Check(err, Equals, nil)
+ c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
+ c.Check(l.Size, Equals, 3)
+ c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
+}
+
+func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
+ // A bare hash parses with Size == -1 and an empty hint list.
+ l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
+ c.Check(err, Equals, nil)
+ c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
+ c.Check(l.Size, Equals, -1)
+ c.Check(l.Hints, DeepEquals, []string{})
+}
+func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
+ // A non-numeric first hint must not be treated as a size hint.
+ l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
+ c.Check(err, Equals, nil)
+ c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
+ c.Check(l.Size, Equals, -1)
+ c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
+}
+
+func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
+ str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
+ l, err := MakeLocator(str)
+ c.Check(err, Equals, nil)
c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
c.Check(l.Size, Equals, 3)
- c.Check(l.Signature, Equals, "abcde")
- c.Check(l.Timestamp, Equals, "12345678")
+ c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
+ // String() must round-trip, keeping hints it does not understand.
+ c.Check(l.String(), Equals, str)
+}
+
+func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
+ // 31 hex digits: one short of a valid hash.
+ _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
+ c.Check(err, Equals, InvalidLocatorError)
}
)
+// RootSorter computes a rendezvous ordering of service roots for a
+// given hash. root, weight and order are parallel slices: order holds
+// indices into root (see GetSortedRoots).
type RootSorter struct {
- root []string
- weight []string
- order []int
+ root []string
+ weight []string
+ order []int
}
-func NewRootSorter(serviceRoots map[string]string, hash string) (*RootSorter) {
+func NewRootSorter(serviceRoots map[string]string, hash string) *RootSorter {
rs := new(RootSorter)
rs.root = make([]string, len(serviceRoots))
rs.weight = make([]string, len(serviceRoots))
return rs
}
-func (rs RootSorter) getWeight(hash string, uuid string) (string) {
+func (rs RootSorter) getWeight(hash string, uuid string) string {
if len(uuid) == 27 {
return Md5String(hash + uuid[12:])
} else {
}
}
-func (rs RootSorter) GetSortedRoots() ([]string) {
+func (rs RootSorter) GetSortedRoots() []string {
sorted := make([]string, len(rs.order))
for i := range rs.order {
sorted[i] = rs.root[rs.order[i]]
)
type RootSorterSuite struct{}
+
var _ = Suite(&RootSorterSuite{})
-func FakeSvcRoot(i uint64) (string) {
+// FakeSvcRoot returns a fake service base URI derived from i.
+func FakeSvcRoot(i uint64) string {
	return fmt.Sprintf("https://%x.svc/", i)
}
-func FakeSvcUuid(i uint64) (string) {
+// FakeSvcUuid returns a fake Keep service uuid derived from i.
+func FakeSvcUuid(i uint64) string {
	return fmt.Sprintf("zzzzz-bi6l4-%015x", i)
}
-func FakeServiceRoots(n uint64) (map[string]string) {
+// FakeServiceRoots returns a uuid -> baseURI map with n fake entries.
+func FakeServiceRoots(n uint64) map[string]string {
	sr := map[string]string{}
-	for i := uint64(0); i < n; i ++ {
+	for i := uint64(0); i < n; i++ {
		sr[FakeSvcUuid(i)] = FakeSvcRoot(i)
	}
	return sr
}
}
+// DiscoverKeepServers updates the client's service roots (via
+// SetServiceRoots) from the API server's list of Keep services.
+// "disk" and "proxy" services become local roots; every listed
+// service also becomes a gateway root. Sets Using_proxy and applies
+// proxy client settings when a proxy service is found.
-func (this *KeepClient) DiscoverKeepServers() (map[string]string, error) {
+func (this *KeepClient) DiscoverKeepServers() error {
type svcList struct {
Items []keepDisk `json:"items"`
}
+ // NOTE(review): the primary service listing is elided from this
+ // hunk; on its error, the legacy "keep_disks" list is the fallback.
if err != nil {
if err := this.Arvados.List("keep_disks", nil, &m); err != nil {
- return nil, err
+ return err
}
}
listed := make(map[string]bool)
- service_roots := make(map[string]string)
+ localRoots := make(map[string]string)
+ gatewayRoots := make(map[string]string)
- for _, element := range m.Items {
- n := ""
-
- if element.SSL {
- n = "s"
+ for _, service := range m.Items {
+ scheme := "http"
+ if service.SSL {
+ scheme = "https"
}
-
- // Construct server URL
- url := fmt.Sprintf("http%s://%s:%d", n, element.Hostname, element.Port)
+ url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
// Skip duplicates
- if !listed[url] {
- listed[url] = true
- service_roots[element.Uuid] = url
+ if listed[url] {
+ continue
}
- if element.SvcType == "proxy" {
+ listed[url] = true
+
+ // Only disk and proxy services become local roots.
+ switch service.SvcType {
+ case "disk":
+ localRoots[service.Uuid] = url
+ case "proxy":
+ localRoots[service.Uuid] = url
this.Using_proxy = true
}
+ // Gateway services are only used when specified by
+ // UUID, so there's nothing to gain by filtering them
+ // by service type. Including all accessible services
+ // (gateway and otherwise) merely accommodates more
+ // service configurations.
+ gatewayRoots[service.Uuid] = url
}
if this.Using_proxy {
this.setClientSettingsStore()
}
- this.SetServiceRoots(service_roots)
-
- return service_roots, nil
+ this.SetServiceRoots(localRoots, gatewayRoots)
+ return nil
}
type uploadStatus struct {
requestId := fmt.Sprintf("%x", md5.Sum([]byte(locator+time.Now().String())))[0:8]
// Calculate the ordering for uploading to servers
- sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
+ sv := NewRootSorter(this.LocalRoots(), hash).GetSortedRoots()
// The next server to try contacting
next_server := 0
if self._put_queue is not None:
self._put_queue.task_done()
+ if block.state() != _BufferBlock.WRITABLE:
+ return
+
if wait:
block.set_state(_BufferBlock.PENDING)
loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
for k,v in items:
if v.state() == _BufferBlock.WRITABLE:
- self.commit_bufferblock(v, False)
+ v.owner.flush(False)
with self.lock:
if self._put_queue is not None:
break
return ''.join(data)
- def _repack_writes(self):
+ def _repack_writes(self, num_retries):
"""Test if the buffer block has more data than actual segments.
This happens when a buffered write over-writes a file range written in
if write_total < self._current_bblock.size():
# There is more data in the buffer block than is actually accounted for by segments, so
# re-pack into a new buffer by copying over to a new buffer block.
+ contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
for t in bufferblock_segs:
- new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
+ new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
t.segment_offset = new_bb.size() - t.range_size
self._current_bblock = new_bb
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
- self._repack_writes()
+ self._repack_writes(num_retries)
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
self.parent._my_block_manager().commit_bufferblock(self._current_bblock, False)
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
return len(data)
@synchronized
- def flush(self, wait=True):
+ def flush(self, wait=True, num_retries=0):
if self.modified():
if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
- self._repack_writes()
+ self._repack_writes(num_retries)
self.parent._my_block_manager().commit_bufferblock(self._current_bblock, wait)
self.parent.notify(MOD, self.parent, self.name, (self, self))
raise IOError(errno.EROFS, "Source collection must be writable.")
target_dir.add(source_obj, target_name, overwrite, True)
- @synchronized
+ def portable_manifest_text(self, stream_name="."):
+ """Get the manifest text for this collection, sub collections and files.
+
+ This method does not flush outstanding blocks to Keep. It will return
+ a normalized manifest with access tokens stripped.
+
+ :stream_name:
+ Name to use for this stream (directory)
+
+ """
+ return self._get_manifest_text(stream_name, True, True)
+
def manifest_text(self, stream_name=".", strip=False, normalize=False):
"""Get the manifest text for this collection, sub collections and files.
+ This method will flush outstanding blocks to Keep. By default, it will
+ not normalize an unmodified manifest or strip access tokens.
+
+ :stream_name:
+ Name to use for this stream (directory)
+
+ :strip:
+ If True, remove signing tokens from block locators if present.
+ If False (default), block locators are left unchanged.
+
+ :normalize:
+ If True, always export the manifest text in normalized form
+ even if the Collection is not modified. If False (default) and the collection
+ is not modified, return the original manifest text even if it is not
+ in normalized form.
+
+ """
+
+ self._my_block_manager().commit_all()
+ return self._get_manifest_text(stream_name, strip, normalize)
+
+ @synchronized
+ def _get_manifest_text(self, stream_name, strip, normalize):
+ """Get the manifest text for this collection, sub collections and files.
+
:stream_name:
- Name of the stream (directory)
+ Name to use for this stream (directory)
:strip:
If True, remove signing tokens from block locators if present.
if stream:
buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
- buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip))
+ buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
return "".join(buf)
else:
if strip:
def portable_data_hash(self):
"""Get the portable data hash for this collection's manifest."""
- stripped = self.manifest_text(strip=True)
+ stripped = self.portable_manifest_text()
return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
@synchronized
"""Save collection to an existing collection record.
Commit pending buffer blocks to Keep, merge with remote record (if
- merge=True, the default), write the manifest to Keep, and update the
- collection record.
+ merge=True, the default), and update the collection record. Returns
+ the current manifest text.
Will raise AssertionError if not associated with a collection record on
the API server. If you want to save a manifest to Keep only, see
if self.modified():
if not self._has_collection_uuid():
raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_new() for new collections.")
+
self._my_block_manager().commit_all()
+
if merge:
self.update()
- self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
text = self.manifest_text(strip=False)
self._api_response = self._my_api().collections().update(
self._manifest_text = self._api_response["manifest_text"]
self.set_unmodified()
+ return self._manifest_text
+
@must_be_writable
@synchronized
@retry_method
- def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
+ def save_new(self, name=None,
+ create_collection_record=True,
+ owner_uuid=None,
+ ensure_unique_name=False,
+ num_retries=None):
"""Save collection to a new collection record.
- Commit pending buffer blocks to Keep, write the manifest to Keep, and
- create a new collection record (if create_collection_record True).
- After creating a new collection record, this Collection object will be
- associated with the new record used by `save()`.
+ Commit pending buffer blocks to Keep and, when create_collection_record
+ is True (default), create a new collection record. After creating a
+ new collection record, this Collection object will be associated with
+ the new record used by `save()`. Returns the current manifest text.
:name:
The collection name.
:create_collection_record:
- If True, create a collection record. If False, only save the manifest to keep.
+ If True, create a collection record on the API server.
+ If False, only commit blocks to Keep and return the manifest text.
:owner_uuid:
the user, or project uuid that will own this collection.
"""
self._my_block_manager().commit_all()
- self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
text = self.manifest_text(strip=False)
if create_collection_record:
self._manifest_locator = self._api_response["uuid"]
- self._manifest_text = text
- self.set_unmodified()
+ self._manifest_text = text
+ self.set_unmodified()
+
+ return text
@synchronized
def _import_manifest(self, manifest_text):
self.size = None
for hint in pieces:
if self.HINT_RE.match(hint) is None:
- raise ValueError("unrecognized hint data {}".format(hint))
+ raise ValueError("invalid hint format: {}".format(hint))
elif hint.startswith('A'):
self.parse_permission_hint(hint)
else:
if not proxy.endswith('/'):
proxy += '/'
self.api_token = api_token
+ self._gateway_services = {}
self._keep_services = [{
'uuid': 'proxy',
'_service_root': proxy,
api_client = arvados.api('v1')
self.api_client = api_client
self.api_token = api_client.api_token
+ self._gateway_services = {}
self._keep_services = None
self.using_proxy = None
self._static_services_list = False
except Exception: # API server predates Keep services.
keep_services = self.api_client.keep_disks().list()
- self._keep_services = keep_services.execute().get('items')
- if not self._keep_services:
+ accessible = keep_services.execute().get('items')
+ if not accessible:
raise arvados.errors.NoKeepServersError()
- self.using_proxy = any(ks.get('service_type') == 'proxy'
- for ks in self._keep_services)
-
# Precompute the base URI for each service.
- for r in self._keep_services:
+ for r in accessible:
r['_service_root'] = "{}://[{}]:{:d}/".format(
'https' if r['service_ssl_flag'] else 'http',
r['service_host'],
r['service_port'])
+
+ # Gateway services are only used when specified by UUID,
+ # so there's nothing to gain by filtering them by
+ # service_type.
+ self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
+ _logger.debug(str(self._gateway_services))
+
+ self._keep_services = [
+ ks for ks in accessible
+ if ks.get('service_type') in ['disk', 'proxy']]
_logger.debug(str(self._keep_services))
+ self.using_proxy = any(ks.get('service_type') == 'proxy'
+ for ks in self._keep_services)
+
def _service_weight(self, data_hash, service_uuid):
"""Compute the weight of a Keep service endpoint for a data
block with a known hash.
"""
return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
- def weighted_service_roots(self, data_hash, force_rebuild=False):
+ def weighted_service_roots(self, locator, force_rebuild=False):
"""Return an array of Keep service endpoints, in the order in
which they should be probed when reading or writing data with
- the given hash.
+ the given hash+hints.
"""
self.build_services_list(force_rebuild)
- # Sort the available services by weight (heaviest first) for
- # this data_hash, and return their service_roots (base URIs)
+ sorted_roots = []
+
+ # Use the services indicated by the given +K@... remote
+ # service hints, if any are present and can be resolved to a
+ # URI.
+ for hint in locator.hints:
+ if hint.startswith('K@'):
+ if len(hint) == 7:
+ sorted_roots.append(
+ "https://keep.{}.arvadosapi.com/".format(hint[2:]))
+ elif len(hint) == 29:
+ svc = self._gateway_services.get(hint[2:])
+ if svc:
+ sorted_roots.append(svc['_service_root'])
+
+ # Sort the available local services by weight (heaviest first)
+ # for this locator, and return their service_roots (base URIs)
# in that order.
- sorted_roots = [
+ sorted_roots.extend([
svc['_service_root'] for svc in sorted(
self._keep_services,
reverse=True,
- key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
- _logger.debug(data_hash + ': ' + str(sorted_roots))
+ key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
+ _logger.debug("{}: {}".format(locator, sorted_roots))
return sorted_roots
- def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
+ def map_new_services(self, roots_map, locator, force_rebuild, **headers):
# roots_map is a dictionary, mapping Keep service root strings
# to KeepService objects. Poll for Keep services, and add any
# new ones to roots_map. Return the current list of local
# root strings.
headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
- local_roots = self.weighted_service_roots(md5_s, force_rebuild)
+ local_roots = self.weighted_service_roots(locator, force_rebuild)
for root in local_roots:
if root not in roots_map:
roots_map[root] = self.KeepService(root, self.session, **headers)
if ',' in loc_s:
return ''.join(self.get(x) for x in loc_s.split(','))
locator = KeepLocator(loc_s)
- expect_hash = locator.md5sum
- slot, first = self.block_cache.reserve_cache(expect_hash)
+ slot, first = self.block_cache.reserve_cache(locator.md5sum)
if not first:
v = slot.get()
return v
+ # If the locator has hints specifying a prefix (indicating a
+ # remote keepproxy) or the UUID of a local gateway service,
+ # read data from the indicated service(s) instead of the usual
+ # list of local disk services.
+ hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+ for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+ hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+ for hint in locator.hints if (
+ hint.startswith('K@') and
+ len(hint) == 29 and
+ self._gateway_services.get(hint[2:])
+ )])
+ # Map root URLs to their KeepService objects.
+ roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
+
# See #3147 for a discussion of the loop implementation. Highlights:
# * Refresh the list of Keep services after each failure, in case
# it's being updated.
# * Retry until we succeed, we're out of retries, or every available
# service has returned permanent failure.
- hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
- for hint in locator.hints if hint.startswith('K@')]
- # Map root URLs their KeepService objects.
- roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
+ sorted_roots = []
+ roots_map = {}
blob = None
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
for tries_left in loop:
try:
- local_roots = self.map_new_services(
- roots_map, expect_hash,
+ sorted_roots = self.map_new_services(
+ roots_map, locator,
force_rebuild=(tries_left < num_retries))
except Exception as error:
loop.save_result(error)
# Query KeepService objects that haven't returned
# permanent failure, in our specified shuffle order.
services_to_try = [roots_map[root]
- for root in (local_roots + hint_roots)
+ for root in sorted_roots
if roots_map[root].usable()]
for keep_service in services_to_try:
blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
if loop.success():
return blob
- try:
- all_roots = local_roots + hint_roots
- except NameError:
- # We never successfully fetched local_roots.
- all_roots = hint_roots
# Q: Including 403 is necessary for the Keep tests to continue
# passing, but maybe they should expect KeepReadError instead?
- not_founds = sum(1 for key in all_roots
+ not_founds = sum(1 for key in sorted_roots
if roots_map[key].last_status() in {403, 404, 410})
service_errors = ((key, roots_map[key].last_result)
- for key in all_roots)
+ for key in sorted_roots)
if not roots_map:
raise arvados.errors.KeepReadError(
"failed to read {}: no Keep services available ({})".format(
loc_s, loop.last_result()))
- elif not_founds == len(all_roots):
+ elif not_founds == len(sorted_roots):
raise arvados.errors.NotFoundError(
"{} not found".format(loc_s), service_errors)
else:
data_hash = hashlib.md5(data).hexdigest()
if copies < 1:
return data_hash
+ locator = KeepLocator(data_hash + '+' + str(len(data)))
headers = {}
if self.using_proxy:
for tries_left in loop:
try:
local_roots = self.map_new_services(
- roots_map, data_hash,
+ roots_map, locator,
force_rebuild=(tries_left < num_retries), **headers)
except Exception as error:
loop.save_result(error)
service_type='disk',
service_host=None,
service_port=None,
- service_ssl_flag=False):
+ service_ssl_flag=False,
+ additional_services=[]):
if api_mock is None:
api_mock = self.api_client_mock()
body = {
'service_port': service_port or 65535-i,
'service_ssl_flag': service_ssl_flag,
'service_type': service_type,
- } for i in range(0, count)]
+ } for i in range(0, count)] + additional_services
}
self._mock_api_call(api_mock.keep_services().accessible, status, body)
return api_mock
keep0 = tempfile.mkdtemp()
port = find_available_port()
keep_cmd = ["keepstore",
- "-volumes={}".format(keep0),
+ "-volume={}".format(keep0),
"-listen=:{}".format(port),
"-pid={}".format("{}/keep{}.pid".format(TEST_TMPDIR, n))]
writer.write("world")
self.assertEqual(writer.read(20), "0123456789helloworld")
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 fc5e038d38a57032085441e7fe7010b0+10 0:20:count.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 fc5e038d38a57032085441e7fe7010b0+10 0:20:count.txt\n", c.portable_manifest_text())
def test_write_at_beginning(self):
keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
writer.write("foo")
self.assertEqual(writer.size(), 10)
self.assertEqual("foo3456789", writer.readfrom(0, 13))
- self.assertEqual(". acbd18db4cc2f85cedef654fccc4a4d8+3 781e5e245d69b566979b86e28d23f2c7+10 0:3:count.txt 6:7:count.txt\n", c.manifest_text())
+ self.assertEqual(". acbd18db4cc2f85cedef654fccc4a4d8+3 781e5e245d69b566979b86e28d23f2c7+10 0:3:count.txt 6:7:count.txt\n", c.portable_manifest_text())
def test_write_empty(self):
keep = ArvadosFileWriterTestCase.MockKeep({})
with Collection(keep_client=keep) as c:
writer = c.open("count.txt", "w")
self.assertEqual(writer.size(), 0)
- self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:count.txt\n", c.manifest_text())
+ self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:count.txt\n", c.portable_manifest_text())
+
+ def test_save_manifest_text(self):
+ keep = ArvadosFileWriterTestCase.MockKeep({})
+ with Collection(keep_client=keep) as c:
+ writer = c.open("count.txt", "w")
+ writer.write("0123456789")
+ self.assertEqual('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n', c.portable_manifest_text())
+ self.assertNotIn('781e5e245d69b566979b86e28d23f2c7+10', keep.blocks)
+
+ self.assertEqual('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n', c.save_new(create_collection_record=False))
+ self.assertIn('781e5e245d69b566979b86e28d23f2c7+10', keep.blocks)
+
+ def test_get_manifest_text_commits(self):
+ keep = ArvadosFileWriterTestCase.MockKeep({})
+ with Collection(keep_client=keep) as c:
+ writer = c.open("count.txt", "w")
+ writer.write("0123456789")
+ self.assertEqual('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n', c.portable_manifest_text())
+ self.assertNotIn('781e5e245d69b566979b86e28d23f2c7+10', keep.blocks)
+ self.assertEqual('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n', c.manifest_text())
+ self.assertIn('781e5e245d69b566979b86e28d23f2c7+10', keep.blocks)
+
def test_write_in_middle(self):
keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
writer.write("foo")
self.assertEqual(writer.size(), 10)
self.assertEqual("012foo6789", writer.readfrom(0, 13))
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:count.txt 10:3:count.txt 6:4:count.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:count.txt 10:3:count.txt 6:4:count.txt\n", c.portable_manifest_text())
def test_write_at_end(self):
keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
writer.write("foo")
self.assertEqual(writer.size(), 10)
self.assertEqual("0123456foo", writer.readfrom(0, 13))
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:7:count.txt 10:3:count.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:7:count.txt 10:3:count.txt\n", c.portable_manifest_text())
def test_write_across_segment_boundary(self):
keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
writer.write("foobar")
self.assertEqual(writer.size(), 20)
self.assertEqual("0123456foobar34", writer.readfrom(0, 15))
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 3858f62230ac3c915f300c664312c63f+6 0:7:count.txt 10:6:count.txt 3:7:count.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 3858f62230ac3c915f300c664312c63f+6 0:7:count.txt 10:6:count.txt 3:7:count.txt\n", c.portable_manifest_text())
def test_write_across_several_segments(self):
keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
writer.write("abcdefg")
self.assertEqual(writer.size(), 12)
self.assertEqual("01abcdefg123", writer.readfrom(0, 15))
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 7ac66c0f148de9519b8bd264312c4d64+7 0:2:count.txt 10:7:count.txt 1:3:count.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 7ac66c0f148de9519b8bd264312c4d64+7 0:2:count.txt 10:7:count.txt 1:3:count.txt\n", c.portable_manifest_text())
def test_write_large(self):
keep = ArvadosFileWriterTestCase.MockKeep({})
self.assertEqual(writer.size(), 10)
self.assertEqual("0123456789", writer.readfrom(0, 20))
- self.assertEqual(". 7a08b07e84641703e5f2c836aa59a170+100 90:10:count.txt\n", c.manifest_text())
+ self.assertEqual(". 7a08b07e84641703e5f2c836aa59a170+100 90:10:count.txt\n", c.portable_manifest_text())
writer.flush()
self.assertEqual(writer.size(), 10)
self.assertEqual("0123456789", writer.readfrom(0, 20))
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n", c.portable_manifest_text())
def test_rewrite_append_existing_file(self):
keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
self.assertEqual(writer.size(), 20)
self.assertEqual("0123456789abcdefghij", writer.readfrom(0, 20))
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 ae5f43bab79cf0be33f025fa97ae7398+100 0:10:count.txt 100:10:count.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 ae5f43bab79cf0be33f025fa97ae7398+100 0:10:count.txt 100:10:count.txt\n", c.portable_manifest_text())
writer.arvadosfile.flush()
self.assertEqual(writer.size(), 20)
self.assertEqual("0123456789abcdefghij", writer.readfrom(0, 20))
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:20:count.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:20:count.txt\n", c.portable_manifest_text())
def test_rewrite_over_existing_file(self):
keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
self.assertEqual(writer.size(), 15)
self.assertEqual("01234abcdefghij", writer.readfrom(0, 20))
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 ae5f43bab79cf0be33f025fa97ae7398+100 0:5:count.txt 100:10:count.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 ae5f43bab79cf0be33f025fa97ae7398+100 0:5:count.txt 100:10:count.txt\n", c.portable_manifest_text())
writer.arvadosfile.flush()
self.assertEqual(writer.size(), 15)
self.assertEqual("01234abcdefghij", writer.readfrom(0, 20))
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:5:count.txt 10:10:count.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:5:count.txt 10:10:count.txt\n", c.portable_manifest_text())
def test_write_large_rewrite(self):
keep = ArvadosFileWriterTestCase.MockKeep({})
mockkeep = mock.MagicMock()
blockmanager = arvados.arvfile._BlockManager(mockkeep)
bufferblock = blockmanager.alloc_bufferblock()
+ bufferblock.owner = mock.MagicMock()
+ bufferblock.owner.flush.side_effect = lambda x: blockmanager.commit_bufferblock(bufferblock, False)
bufferblock.append("foo")
blockmanager.commit_all()
+ self.assertTrue(bufferblock.owner.flush.called)
self.assertTrue(mockkeep.put.called)
self.assertEqual(bufferblock.state(), arvados.arvfile._BufferBlock.COMMITTED)
self.assertIsNone(bufferblock.buffer_view)
mockkeep.put.side_effect = arvados.errors.KeepWriteError("fail")
blockmanager = arvados.arvfile._BlockManager(mockkeep)
bufferblock = blockmanager.alloc_bufferblock()
+ bufferblock.owner = mock.MagicMock()
+ bufferblock.owner.flush.side_effect = lambda x: blockmanager.commit_bufferblock(bufferblock, False)
bufferblock.append("foo")
with self.assertRaises(arvados.errors.KeepWriteError) as err:
blockmanager.commit_all()
+ self.assertTrue(bufferblock.owner.flush.called)
self.assertEqual(str(err.exception), "Error writing some blocks: block acbd18db4cc2f85cedef654fccc4a4d8+3 raised KeepWriteError (fail)")
self.assertEqual(bufferblock.state(), arvados.arvfile._BufferBlock.PENDING)
def test_remove(self):
c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n')
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n", c.portable_manifest_text())
self.assertIn("count1.txt", c)
c.remove("count1.txt")
self.assertNotIn("count1.txt", c)
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c.portable_manifest_text())
with self.assertRaises(arvados.errors.ArgumentError):
c.remove("")
def test_remove_in_subdir(self):
c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
c.remove("foo/count2.txt")
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", c.portable_manifest_text())
def test_remove_empty_subdir(self):
c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
c.remove("foo/count2.txt")
c.remove("foo")
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", c.portable_manifest_text())
def test_remove_nonempty_subdir(self):
c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
with self.assertRaises(IOError):
c.remove("foo")
c.remove("foo", recursive=True)
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", c.portable_manifest_text())
def test_copy_to_file_in_dir(self):
c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
c.copy("count1.txt", "foo/count2.txt")
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c.portable_manifest_text())
def test_copy_file(self):
c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
c.copy("count1.txt", "count2.txt")
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n", c.portable_manifest_text())
def test_copy_to_existing_dir(self):
c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
c.copy("count1.txt", "foo")
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n", c.portable_manifest_text())
def test_copy_to_new_dir(self):
c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
c.copy("count1.txt", "foo/")
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", c.portable_manifest_text())
def test_rename_file(self):
c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
def test_clone(self):
c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
cl = c.clone()
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", cl.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", cl.portable_manifest_text())
def test_diff_del_add(self):
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
d = c1.diff(c2)
self.assertEqual(d, [('del', './count1.txt', c1["count1.txt"]),
('add', './count2.txt', c2["count2.txt"])])
- self.assertNotEqual(c1.manifest_text(), c2.manifest_text())
+ self.assertNotEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
c1.apply(d)
- self.assertEqual(c1.manifest_text(), c2.manifest_text())
+ self.assertEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
def test_diff_same(self):
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
d = c1.diff(c2)
self.assertEqual(d, [])
- self.assertEqual(c1.manifest_text(), c2.manifest_text())
+ self.assertEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
c1.apply(d)
- self.assertEqual(c1.manifest_text(), c2.manifest_text())
+ self.assertEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
def test_diff_mod(self):
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
d = c1.diff(c2)
self.assertEqual(d, [('mod', './count1.txt', c1["count1.txt"], c2["count1.txt"])])
- self.assertNotEqual(c1.manifest_text(), c2.manifest_text())
+ self.assertNotEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
c1.apply(d)
- self.assertEqual(c1.manifest_text(), c2.manifest_text())
+ self.assertEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
def test_diff_add(self):
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
d = c1.diff(c2)
self.assertEqual(d, [('add', './count2.txt', c2["count2.txt"])])
- self.assertNotEqual(c1.manifest_text(), c2.manifest_text())
+ self.assertNotEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
c1.apply(d)
- self.assertEqual(c1.manifest_text(), c2.manifest_text())
+ self.assertEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
def test_diff_add_in_subcollection(self):
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
d = c1.diff(c2)
self.assertEqual(d, [('add', './foo', c2["foo"])])
- self.assertNotEqual(c1.manifest_text(), c2.manifest_text())
+ self.assertNotEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
c1.apply(d)
- self.assertEqual(c1.manifest_text(), c2.manifest_text())
+ self.assertEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
def test_diff_del_add_in_subcollection(self):
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 5348b82a029fd9e971a811ce1f71360b+43 0:10:count2.txt\n')
self.assertEqual(d, [('del', './foo/count2.txt', c1.find("foo/count2.txt")),
('add', './foo/count3.txt', c2.find("foo/count3.txt"))])
- self.assertNotEqual(c1.manifest_text(), c2.manifest_text())
+ self.assertNotEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
c1.apply(d)
- self.assertEqual(c1.manifest_text(), c2.manifest_text())
+ self.assertEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
def test_diff_mod_in_subcollection(self):
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 5348b82a029fd9e971a811ce1f71360b+43 0:10:count2.txt\n')
d = c1.diff(c2)
self.assertEqual(d, [('mod', './foo', c1["foo"], c2["foo"])])
- self.assertNotEqual(c1.manifest_text(), c2.manifest_text())
+ self.assertNotEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
c1.apply(d)
- self.assertEqual(c1.manifest_text(), c2.manifest_text())
+ self.assertEqual(c1.portable_manifest_text(), c2.portable_manifest_text())
def test_conflict_keep_local_change(self):
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
# c1 changed, so it should not be deleted.
c1.apply(d)
- self.assertEqual(c1.manifest_text(), ". 95ebc3c7b3b9f1d2c40fec14415d3cb8+5 5348b82a029fd9e971a811ce1f71360b+43 0:5:count1.txt 5:10:count2.txt\n")
+ self.assertEqual(c1.portable_manifest_text(), ". 95ebc3c7b3b9f1d2c40fec14415d3cb8+5 5348b82a029fd9e971a811ce1f71360b+43 0:5:count1.txt 5:10:count2.txt\n")
def test_conflict_mod(self):
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt')
# c1 changed, so c2 mod will go to a conflict file
c1.apply(d)
- self.assertRegexpMatches(c1.manifest_text(), r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1\.txt 5:10:count1\.txt~conflict-\d\d\d\d-\d\d-\d\d-\d\d:\d\d:\d\d~$")
+ self.assertRegexpMatches(c1.portable_manifest_text(), r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1\.txt 5:10:count1\.txt~conflict-\d\d\d\d-\d\d-\d\d-\d\d:\d\d:\d\d~$")
def test_conflict_add(self):
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
# c1 added count1.txt, so c2 add will go to a conflict file
c1.apply(d)
- self.assertRegexpMatches(c1.manifest_text(), r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1\.txt 5:10:count1\.txt~conflict-\d\d\d\d-\d\d-\d\d-\d\d:\d\d:\d\d~$")
+ self.assertRegexpMatches(c1.portable_manifest_text(), r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1\.txt 5:10:count1\.txt~conflict-\d\d\d\d-\d\d-\d\d-\d\d:\d\d:\d\d~$")
def test_conflict_del(self):
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt')
# c1 deleted, so c2 mod will go to a conflict file
c1.apply(d)
- self.assertRegexpMatches(c1.manifest_text(), r"\. 5348b82a029fd9e971a811ce1f71360b\+43 0:10:count1\.txt~conflict-\d\d\d\d-\d\d-\d\d-\d\d:\d\d:\d\d~$")
+ self.assertRegexpMatches(c1.portable_manifest_text(), r"\. 5348b82a029fd9e971a811ce1f71360b\+43 0:10:count1\.txt~conflict-\d\d\d\d-\d\d-\d\d-\d\d:\d\d:\d\d~$")
def test_notify(self):
c1 = Collection()
with c.open("count.txt", "w") as f:
f.write("0123456789")
- self.assertEqual(c.manifest_text(), ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n")
+ self.assertEqual(c.portable_manifest_text(), ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n")
return c
c.save()
self.assertRegexpMatches(c.manifest_text(), r"^\. 781e5e245d69b566979b86e28d23f2c7\+10\+A[a-f0-9]{40}@[a-f0-9]{8} 0:10:count\.txt$",)
-
def test_create_and_save_new(self):
c = self.create_count_txt()
c.save_new()
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
def get_service_roots(self, api_client):
keep_client = arvados.KeepClient(api_client=api_client)
- services = keep_client.weighted_service_roots('000000')
+ services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
return [urlparse.urlparse(url) for url in sorted(services)]
def test_ssl_flag_respected_in_roots(self):
api_client = self.mock_keep_services(count=16)
keep_client = arvados.KeepClient(api_client=api_client)
for i, hash in enumerate(hashes):
- roots = keep_client.weighted_service_roots(hash)
+ roots = keep_client.weighted_service_roots(arvados.KeepLocator(hash))
got_order = [
re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
for root in roots]
api_client = self.mock_keep_services(count=initial_services)
keep_client = arvados.KeepClient(api_client=api_client)
probes_before = [
- keep_client.weighted_service_roots(hash) for hash in hashes]
+ keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
for added_services in range(1, 12):
api_client = self.mock_keep_services(count=initial_services+added_services)
keep_client = arvados.KeepClient(api_client=api_client)
total_penalty = 0
for hash_index in range(len(hashes)):
probe_after = keep_client.weighted_service_roots(
- hashes[hash_index])
+ arvados.KeepLocator(hashes[hash_index]))
penalty = probe_after.index(probes_before[hash_index][0])
self.assertLessEqual(penalty, added_services)
total_penalty += penalty
self.assertEqual(2, len(exc_check.exception.request_errors()))
+class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
+ def mock_disks_and_gateways(self, disks=3, gateways=1):
+ self.gateways = [{
+ 'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
+ 'owner_uuid': 'zzzzz-tpzed-000000000000000',
+ 'service_host': 'gatewayhost{}'.format(i),
+ 'service_port': 12345,
+ 'service_ssl_flag': True,
+ 'service_type': 'gateway:test',
+ } for i in range(gateways)]
+ self.gateway_roots = [
+ "https://[{service_host}]:{service_port}/".format(**gw)
+ for gw in self.gateways]
+ self.api_client = self.mock_keep_services(
+ count=disks, additional_services=self.gateways)
+ self.keepClient = arvados.KeepClient(api_client=self.api_client)
+
+ @mock.patch('requests.Session')
+ def test_get_with_gateway_hint_first(self, MockSession):
+ MockSession.return_value.get.return_value = tutil.fake_requests_response(
+ code=200, body='foo', headers={'Content-Length': 3})
+ self.mock_disks_and_gateways()
+ locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
+ self.assertEqual('foo', self.keepClient.get(locator))
+ self.assertEqual((self.gateway_roots[0]+locator,),
+ MockSession.return_value.get.call_args_list[0][0])
+
+ @mock.patch('requests.Session')
+ def test_get_with_gateway_hints_in_order(self, MockSession):
+ gateways = 4
+ disks = 3
+ MockSession.return_value.get.return_value = tutil.fake_requests_response(
+ code=404, body='')
+ self.mock_disks_and_gateways(gateways=gateways, disks=disks)
+ locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
+ ['K@'+gw['uuid'] for gw in self.gateways])
+ with self.assertRaises(arvados.errors.NotFoundError):
+ self.keepClient.get(locator)
+ # Gateways are tried first, in the order given.
+ for i, root in enumerate(self.gateway_roots):
+ self.assertEqual((root+locator,),
+ MockSession.return_value.get.call_args_list[i][0])
+ # Disk services are tried next.
+ for i in range(gateways, gateways+disks):
+ self.assertRegexpMatches(
+ MockSession.return_value.get.call_args_list[i][0][0],
+ r'keep0x')
+
+ @mock.patch('requests.Session')
+ def test_get_with_remote_proxy_hint(self, MockSession):
+ MockSession.return_value.get.return_value = tutil.fake_requests_response(
+ code=200, body='foo', headers={'Content-Length': 3})
+ self.mock_disks_and_gateways()
+ locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
+ self.assertEqual('foo', self.keepClient.get(locator))
+ self.assertEqual(('https://keep.xyzzy.arvadosapi.com/'+locator,),
+ MockSession.return_value.get.call_args_list[0][0])
+
+
class KeepClientRetryTestMixin(object):
# Testing with a local Keep store won't exercise the retry behavior.
# Instead, our strategy is:
loc_list = LocatorList.new(locators)
file_specs.map { |s| manifest.split_file_token(s) }.
each do |file_start, file_len, file_path|
- @root.file_at(normalize_path(stream_root, file_path)).
- add_segment(loc_list.segment(file_start, file_len))
+ begin
+ @root.file_at(normalize_path(stream_root, file_path)).
+ add_segment(loc_list.segment(file_start, file_len))
+ rescue Errno::ENOTDIR, Errno::EISDIR => error
+ raise ArgumentError.new("%p is both a stream and file" %
+ error.to_s.partition(" - ").last)
+ end
end
end
end
copy(:merge, source.chomp("/"), target, source_collection, opts)
end
+ def each_file_path(&block)
+ @root.each_file_path(&block)
+ end
+
+ def exist?(path)
+ begin
+ substream, item = find(path)
+ not (substream.leaf? or substream[item].nil?)
+ rescue Errno::ENOENT, Errno::ENOTDIR
+ false
+ end
+ end
+
def rename(source, target)
copy(:add_copy, source, target) { rm_r(source) }
end
# is found and can be copied.
source_collection = self if source_collection.nil?
src_stream, src_tail = source_collection.find(source)
- dst_stream, dst_tail = find(target)
+ dst_stream_path, _, dst_tail = normalize_path(target).rpartition("/")
+ if dst_stream_path.empty?
+ dst_stream, dst_tail = @root.find(dst_tail)
+ dst_tail ||= src_tail
+ else
+ dst_stream = @root.stream_at(dst_stream_path)
+ dst_tail = src_tail if dst_tail.empty?
+ end
if (source_collection.equal?(self) and
(src_stream.path == dst_stream.path) and (src_tail == dst_tail))
return self
end
src_item = src_stream[src_tail]
- dst_tail ||= src_tail
check_method = "check_can_#{copy_method}".to_sym
target_name = nil
if opts.fetch(:descend_target, true)
end
end
+ def each_file_path
+ return to_enum(__method__) unless block_given?
+ items.each_value do |item|
+ if item.file?
+ yield item.path
+ else
+ item.each_file_path { |path| yield path }
+ end
+ end
+ end
+
def find(find_path)
# Given a POSIX-style path, return the CollectionStream that
# contains the object at that path, and the name of the object
def stream_at(find_path)
key, rest = find_path.split("/", 2)
- next_stream = get_or_new(key, CollectionStream)
+ next_stream = get_or_new(key, CollectionStream, Errno::ENOTDIR)
if rest.nil?
next_stream
else
def file_at(find_path)
stream_path, _, file_name = find_path.rpartition("/")
if stream_path.empty?
- get_or_new(file_name, CollectionFile)
+ get_or_new(file_name, CollectionFile, Errno::EISDIR)
else
stream_at(stream_path).file_at(file_name)
end
items[key] = item
end
- def get_or_new(key, klass)
+ def get_or_new(key, klass, err_class)
# Return the collection item at `key` and ensure that it's a `klass`.
# If `key` does not exist, create a new `klass` there.
- # If the value for `key` is not a `klass`, raise an ArgumentError.
+ # If the value for `key` is not a `klass`, raise an `err_class`.
item = items[key]
if item.nil?
self[key] = klass.new("#{path}/#{key}")
elsif not item.is_a?(klass)
- raise ArgumentError.
- new("in stream %p, %p is a %s, not a %s" %
- [path, key, items[key].class.human_name, klass.human_name])
+ raise err_class.new(item.path)
else
item
end
assert_equal(expected.join(""), coll.manifest_text)
end
- def test_copy_stream_over_file_raises_ENOTDIR
+ def test_copy_stream_over_file_raises_ENOTDIR(source="./s1", target="./f2")
coll = Arv::Collection.new(TWO_BY_TWO_MANIFEST_S)
assert_raises(Errno::ENOTDIR) do
- coll.cp_r("./s1", "./f2")
+ coll.cp_r(source, target)
end
end
+ def test_copy_file_under_file_raises_ENOTDIR
+ test_copy_stream_over_file_raises_ENOTDIR("./f1", "./f2/newfile")
+ end
+
def test_copy_stream_over_nonempty_stream_merges_and_overwrites
blocks = random_blocks(3, 9)
manifest_a =
assert_equal(expect_lines.join(""), coll.manifest_text)
end
+ def test_copy_file_into_new_stream_with_implicit_filename
+ coll = Arv::Collection.new(SIMPLEST_MANIFEST)
+ coll.cp_r("./simple.txt", "./new/")
+ assert_equal(SIMPLEST_MANIFEST + SIMPLEST_MANIFEST.sub(". ", "./new "),
+ coll.manifest_text)
+ end
+
+ def test_copy_file_into_new_stream_with_explicit_filename
+ coll = Arv::Collection.new(SIMPLEST_MANIFEST)
+ coll.cp_r("./simple.txt", "./new/newfile.txt")
+ new_line = SIMPLEST_MANIFEST.sub(". ", "./new ").sub(":simple", ":newfile")
+ assert_equal(SIMPLEST_MANIFEST + new_line, coll.manifest_text)
+ end
+
def test_copy_stream_contents_into_root
coll = Arv::Collection.new(TWO_BY_TWO_MANIFEST_S)
coll.cp_r("./s1/", ".")
test_copy_empty_source_path_raises_ArgumentError(".", "")
end
+ ### .each_file_path
+
+ def test_each_file_path
+ coll = Arv::Collection.new(TWO_BY_TWO_MANIFEST_S)
+ if block_given?
+ result = yield(coll)
+ else
+ result = []
+ coll.each_file_path { |path| result << path }
+ end
+ assert_equal(["./f1", "./f2", "./s1/f1", "./s1/f3"], result.sort)
+ end
+
+ def test_each_file_path_without_block
+ test_each_file_path { |coll| coll.each_file_path.to_a }
+ end
+
+ def test_each_file_path_empty_collection
+ assert_empty(Arv::Collection.new.each_file_path.to_a)
+ end
+
+ def test_each_file_path_after_collection_emptied
+ coll = Arv::Collection.new(SIMPLEST_MANIFEST)
+ coll.rm("simple.txt")
+ assert_empty(coll.each_file_path.to_a)
+ end
+
+ def test_each_file_path_deduplicates_manifest_listings
+ coll = Arv::Collection.new(MULTIBLOCK_FILE_MANIFEST)
+ assert_equal(["./repfile", "./s1/repfile", "./s1/uniqfile",
+ "./uniqfile", "./uniqfile2"],
+ coll.each_file_path.to_a.sort)
+ end
+
+ ### .exist?
+
+ def test_exist(test_method=:assert, path="f2")
+ coll = Arv::Collection.new(TWO_BY_TWO_MANIFEST_S)
+ send(test_method, coll.exist?(path))
+ end
+
+ def test_file_not_exist
+ test_exist(:refute, "f3")
+ end
+
+ def test_stream_exist
+ test_exist(:assert, "s1")
+ end
+
+ def test_file_inside_stream_exist
+ test_exist(:assert, "s1/f1")
+ end
+
+ def test_path_inside_stream_not_exist
+ test_exist(:refute, "s1/f2")
+ end
+
+ def test_path_under_file_not_exist
+ test_exist(:refute, "f2/nonexistent")
+ end
+
+ def test_deep_substreams_not_exist
+ test_exist(:refute, "a/b/c/d/e/f/g")
+ end
+
### .rename
def test_simple_file_rename
# still mandatory.
gem 'simplecov', '~> 0.7.1', require: false
gem 'simplecov-rcov', require: false
+ gem 'mocha', require: false
end
# This might not be needed in :test and :development, but we load it
mail (2.5.4)
mime-types (~> 1.16)
treetop (~> 1.4.8)
+ metaclass (0.0.4)
mime-types (1.25.1)
+ mocha (1.1.0)
+ metaclass (~> 0.0.1)
multi_json (1.10.1)
multipart-post (1.2.0)
net-scp (1.2.0)
faye-websocket
google-api-client (~> 0.6.3)
jquery-rails
+ mocha
multi_json
oj
omniauth (= 1.1.1)
else
raise ArgumentError.new("unknown attribute for git filter: #{attr}")
end
- version_range = Commit.find_commit_range(current_user,
- filter["repository"],
- filter["min_version"],
- filter["max_version"],
- filter["exclude_versions"])
- if version_range.nil?
+ revisions = Commit.find_commit_range(filter["repository"],
+ filter["min_version"],
+ filter["max_version"],
+ filter["exclude_versions"])
+ if revisions.empty?
raise ArgumentError.
new("error searching #{filter['repository']} from " +
"'#{filter['min_version']}' to '#{filter['max_version']}', " +
"excluding #{filter['exclude_versions']}")
end
- @filters.append([attr, "in", version_range])
+ @filters.append([attr, "in", revisions])
end
end
class Commit < ActiveRecord::Base
- require 'shellwords'
+ extend CurrentApiClient
+
+ class GitError < StandardError
+ def http_status
+ 422
+ end
+ end
def self.git_check_ref_format(e)
if !e or e.empty? or e[0] == '-' or e[0] == '$'
end
end
- def self.find_commit_range(current_user, repository, minimum, maximum, exclude)
+ # Return an array of commits (each a 40-char sha1) satisfying the
+ # given criteria.
+ #
+ # Return [] if the revisions given in minimum/maximum are invalid or
+ # don't exist in the given repository.
+ #
+ # Raise ArgumentError if the given repository is invalid, does not
+ # exist, or cannot be read for any reason. (Any transient error that
+ # prevents commit ranges from resolving must raise rather than
+ # returning an empty array.)
+ #
+ # repository can be the name of a locally hosted repository or a git
+ # URL (see git-fetch(1)). Currently http, https, and git schemes are
+ # supported.
+ def self.find_commit_range repository, minimum, maximum, exclude
if minimum and minimum.empty?
minimum = nil
end
if minimum and !git_check_ref_format(minimum)
logger.warn "find_commit_range called with invalid minimum revision: '#{minimum}'"
- return nil
+ return []
end
if maximum and !git_check_ref_format(maximum)
logger.warn "find_commit_range called with invalid maximum revision: '#{maximum}'"
- return nil
+ return []
end
if !maximum
maximum = "HEAD"
end
- # Get list of actual repository directories under management
- on_disk_repos = repositories
-
- # Get list of repository objects readable by user
- readable = Repository.readable_by(current_user)
-
- # filter repository objects on requested repository name
- if repository
- readable = readable.where(name: repository)
- end
+ gitdir, is_remote = git_dir_for repository
+ fetch_remote_repository gitdir, repository if is_remote
+ ENV['GIT_DIR'] = gitdir
commits = []
- readable.each do |r|
- if on_disk_repos[r.name]
- ENV['GIT_DIR'] = on_disk_repos[r.name][:git_dir]
- # We've filtered for invalid characters, so we can pass the contents of
- # minimum and maximum safely on the command line
+ # Get the commit hash for the upper bound
+ max_hash = nil
+ IO.foreach("|git rev-list --max-count=1 #{maximum.shellescape} --") do |line|
+ max_hash = line.strip
+ end
- # Get the commit hash for the upper bound
- max_hash = nil
- IO.foreach("|git rev-list --max-count=1 #{maximum.shellescape} --") do |line|
- max_hash = line.strip
- end
+ # If not found or string is invalid, nothing else to do
+ return [] if !max_hash or !git_check_ref_format(max_hash)
- # If not found or string is invalid, nothing else to do
- next if !max_hash or !git_check_ref_format(max_hash)
-
- resolved_exclude = nil
- if exclude
- resolved_exclude = []
- exclude.each do |e|
- if git_check_ref_format(e)
- IO.foreach("|git rev-list --max-count=1 #{e.shellescape} --") do |line|
- resolved_exclude.push(line.strip)
- end
- else
- logger.warn "find_commit_range called with invalid exclude invalid characters: '#{exclude}'"
- return nil
- end
+ resolved_exclude = nil
+ if exclude
+ resolved_exclude = []
+ exclude.each do |e|
+ if git_check_ref_format(e)
+ IO.foreach("|git rev-list --max-count=1 #{e.shellescape} --") do |line|
+ resolved_exclude.push(line.strip)
end
+ else
+ logger.warn "find_commit_range called with invalid exclude invalid characters: '#{exclude}'"
+ return []
end
+ end
+ end
- if minimum
- # Get the commit hash for the lower bound
- min_hash = nil
- IO.foreach("|git rev-list --max-count=1 #{minimum.shellescape} --") do |line|
- min_hash = line.strip
- end
-
- # If not found or string is invalid, nothing else to do
- next if !min_hash or !git_check_ref_format(min_hash)
+ if minimum
+ # Get the commit hash for the lower bound
+ min_hash = nil
+ IO.foreach("|git rev-list --max-count=1 #{minimum.shellescape} --") do |line|
+ min_hash = line.strip
+ end
- # Now find all commits between them
- IO.foreach("|git rev-list #{min_hash.shellescape}..#{max_hash.shellescape} --") do |line|
- hash = line.strip
- commits.push(hash) if !resolved_exclude or !resolved_exclude.include? hash
- end
+ # If not found or string is invalid, nothing else to do
+ return [] if !min_hash or !git_check_ref_format(min_hash)
- commits.push(min_hash) if !resolved_exclude or !resolved_exclude.include? min_hash
- else
- commits.push(max_hash) if !resolved_exclude or !resolved_exclude.include? max_hash
- end
- else
- logger.warn "Repository #{r.name} exists in table but not found on disk"
+ # Now find all commits between them
+ IO.foreach("|git rev-list #{min_hash.shellescape}..#{max_hash.shellescape} --") do |line|
+ hash = line.strip
+ commits.push(hash) if !resolved_exclude or !resolved_exclude.include? hash
end
- end
- if !commits or commits.empty?
- nil
+ commits.push(min_hash) if !resolved_exclude or !resolved_exclude.include? min_hash
else
- commits
+ commits.push(max_hash) if !resolved_exclude or !resolved_exclude.include? max_hash
end
+
+ commits
end
- # Import all commits from configured git directory into the commits
- # database.
-
- def self.import_all
- repositories.each do |repo_name, repo|
- stat = { true => 0, false => 0 }
- ENV['GIT_DIR'] = repo[:git_dir]
- IO.foreach("|git rev-list --format=oneline --all") do |line|
- sha1, message = line.strip.split " ", 2
- imported = false
- Commit.find_or_create_by_repository_name_and_sha1_and_message(repo_name, sha1, message[0..254]) do
- imported = true
- end
- stat[!!imported] += 1
- if (stat[true] + stat[false]) % 100 == 0
- if $stdout.tty? or ARGV[0] == '-v'
- puts "#{$0} #{$$}: repo #{repo_name} add #{stat[true]} skip #{stat[false]}"
- end
- end
- end
- if $stdout.tty? or ARGV[0] == '-v'
- puts "#{$0} #{$$}: repo #{repo_name} add #{stat[true]} skip #{stat[false]}"
- end
+ # Given a repository (url, or name of hosted repo) and commit sha1,
+ # copy the commit into the internal git repo and tag it with the
+ # given tag (typically a job UUID).
+ #
+ # The repo can be a remote url, but in this case sha1 must already
+ # be present in our local cache for that repo: e.g., sha1 was just
+ # returned by find_commit_range.
+ def self.tag_in_internal_repository repo_name, sha1, tag
+ unless git_check_ref_format tag
+ raise ArgumentError.new "invalid tag #{tag}"
+ end
+ unless /^[0-9a-f]{40}$/ =~ sha1
+ raise ArgumentError.new "invalid sha1 #{sha1}"
+ end
+ src_gitdir, _ = git_dir_for repo_name
+ unless src_gitdir
+ raise ArgumentError.new "no local repository for #{repo_name}"
end
+ dst_gitdir = Rails.configuration.git_internal_dir
+ must_pipe("echo #{sha1.shellescape}",
+ "git --git-dir #{src_gitdir.shellescape} pack-objects -q --revs --stdout",
+ "git --git-dir #{dst_gitdir.shellescape} unpack-objects -q")
+ must_git(dst_gitdir,
+ "tag --force #{tag.shellescape} #{sha1.shellescape}")
end
- def self.refresh_repositories
- @repositories = nil
+ protected
+
+ def self.remote_url? repo_name
+ /^(https?|git):\/\// =~ repo_name
end
- protected
+ # Return [local_git_dir, is_remote]. If is_remote, caller must use
+ # fetch_remote_repository to ensure content is up-to-date.
+ #
+ # Raises an exception if the latest content could not be fetched for
+ # any reason.
+ def self.git_dir_for repo_name
+ if remote_url? repo_name
+ return [cache_dir_for(repo_name), true]
+ end
+ repos = Repository.readable_by(current_user).where(name: repo_name)
+ if repos.count == 0
+ raise ArgumentError.new "Repository not found: '#{repo_name}'"
+ elsif repos.count > 1
+ logger.error "Multiple repositories with name=='#{repo_name}'!"
+ raise ArgumentError.new "Name conflict"
+ else
+ return [repos.first.server_path, false]
+ end
+ end
+
+ def self.cache_dir_for git_url
+ File.join(cache_dir_base, Digest::SHA1.hexdigest(git_url) + ".git").to_s
+ end
- def self.repositories
- return @repositories if @repositories
+ def self.cache_dir_base
+ Rails.root.join 'tmp', 'git'
+ end
- @repositories = {}
- Repository.find_each do |repo|
- if git_dir = repo.server_path
- @repositories[repo.name] = {git_dir: git_dir}
- end
- end
+ def self.fetch_remote_repository gitdir, git_url
+ # Caller decides which protocols are worth using. This is just a
+ # safety check to ensure we never use urls like "--flag" or wander
+ # into git's hardlink features by using bare "/path/foo" instead
+ # of "file:///path/foo".
+ unless /^[a-z]+:\/\// =~ git_url
+ raise ArgumentError.new "invalid git url #{git_url}"
+ end
+ begin
+ must_git gitdir, "branch"
+ rescue GitError => e
+ raise unless /Not a git repository/ =~ e.to_s
+ # OK, this just means we need to create a blank cache repository
+ # before fetching.
+ FileUtils.mkdir_p gitdir
+ must_git gitdir, "init"
+ end
+ must_git(gitdir,
+ "fetch --no-progress --tags --prune --force --update-head-ok #{git_url.shellescape} 'refs/heads/*:refs/heads/*'")
+ end
- @repositories
- end
+ def self.must_git gitdir, *cmds
+ # Clear token in case a git helper tries to use it as a password.
+ orig_token = ENV['ARVADOS_API_TOKEN']
+ ENV['ARVADOS_API_TOKEN'] = ''
+ begin
+ git = "git --git-dir #{gitdir.shellescape}"
+ cmds.each do |cmd|
+ must_pipe git+" "+cmd
+ end
+ ensure
+ ENV['ARVADOS_API_TOKEN'] = orig_token
+ end
+ end
+
+ def self.must_pipe *cmds
+ cmd = cmds.join(" 2>&1 |") + " 2>&1"
+ out = IO.read("| </dev/null #{cmd}")
+ if not $?.success?
+ raise GitError.new "#{cmd}: #{$?}: #{out}"
+ end
+ end
end
validate :validate_status
validate :validate_state_change
validate :ensure_no_collection_uuids_in_script_params
+ before_save :tag_version_in_internal_repository
before_save :update_timestamps_when_state_changes
has_many :commit_ancestors, :foreign_key => :descendant, :primary_key => :script_version
end
def ensure_script_version_is_commit
- if self.state == Running
+ if state == Running
# Apparently client has already decided to go for it. This is
# needed to run a local job using a local working directory
# instead of a commit-ish.
return true
end
- if new_record? or script_version_changed?
- sha1 = Commit.find_commit_range(current_user, self.repository, nil, self.script_version, nil)[0] rescue nil
- if sha1
- self.supplied_script_version = self.script_version if self.supplied_script_version.nil? or self.supplied_script_version.empty?
- self.script_version = sha1
- else
- self.errors.add :script_version, "#{self.script_version} does not resolve to a commit"
+ if new_record? or repository_changed? or script_version_changed?
+ sha1 = Commit.find_commit_range(repository,
+ nil, script_version, nil).first
+ if not sha1
+ errors.add :script_version, "#{script_version} does not resolve to a commit"
return false
end
+ if supplied_script_version.nil? or supplied_script_version.empty?
+ self.supplied_script_version = script_version
+ end
+ self.script_version = sha1
+ end
+ true
+ end
+
+ def tag_version_in_internal_repository
+ if state == Running
+ # No point now. See ensure_script_version_is_commit.
+ true
+ elsif errors.any?
+ # Won't be saved, and script_version might not even be valid.
+ true
+ elsif new_record? or repository_changed? or script_version_changed?
+ uuid_was = uuid
+ begin
+ assign_uuid
+ Commit.tag_in_internal_repository repository, script_version, uuid
+ rescue
+ uuid = uuid_was
+ raise
+ end
end
end
def find_arvados_sdk_version
resolve_runtime_constraint("arvados_sdk_version",
:arvados_sdk_version) do |git_search|
- commits = Commit.find_commit_range(current_user, "arvados",
+ commits = Commit.find_commit_range("arvados",
nil, git_search, nil)
- if commits.nil? or commits.empty?
+ if commits.empty?
[false, "#{git_search} does not resolve to a commit"]
elsif not runtime_constraints["docker_image"]
[false, "cannot be specified without a Docker image constraint"]
def setup_repo_vm_links(repo_name, vm_uuid, openid_prefix)
oid_login_perm = create_oid_login_perm openid_prefix
repo_perm = create_user_repo_link repo_name
- vm_login_perm = create_vm_login_permission_link vm_uuid, repo_name
+ vm_login_perm = create_vm_login_permission_link vm_uuid, username
group_perm = create_user_group_link
return [oid_login_perm, repo_perm, vm_login_perm, group_perm, self].compact
blob_signing_key: zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc
user_profile_notification_address: arvados@example.com
workbench_address: https://localhost:3001/
+ git_repositories_dir: <%= Rails.root.join 'tmp', 'git', 'test' %>
+ git_internal_dir: <%= Rails.root.join 'tmp', 'internal.git' %>
common:
# The prefix used for all database identifiers to identify the record as
+begin
+ # If secret_token.rb exists here, we need to load it first.
+ require_relative 'secret_token.rb'
+rescue LoadError
+ # Normally secret_token.rb is missing and the secret token is
+ # configured by application.yml (i.e., here!) instead.
+end
+
$application_config = {}
%w(application.default application).each do |cfgfile|
if File.exists? path
yaml = ERB.new(IO.read path).result(binding)
confs = YAML.load(yaml)
+ # Ignore empty YAML file:
+ next if confs == false
$application_config.merge!(confs['common'] || {})
$application_config.merge!(confs[::Rails.env.to_s] || {})
end
# See http://aaronvb.com/articles/37-rails-caching-and-undefined-class-module
+# Config must be done before we load model class files; otherwise they
+# won't be able to use Rails.configuration.* to initialize their
+# classes.
+require_relative 'load_config.rb'
+
if Rails.env == 'development'
Dir.foreach("#{Rails.root}/app/models") do |model_file|
require_dependency model_file if model_file.match /\.rb$/
- end
+ end
end
@authorizations[job.uuid]
end
- def get_commit(src_repo, commit_hash)
- # @fetched_commits[V]==true if we know commit V exists in the
- # arvados_internal git repository.
- if !@fetched_commits[commit_hash]
- # check if the commit needs to be fetched or not
- commit_rev = stdout_s(git_cmd("rev-list", "-n1", commit_hash),
- err: "/dev/null")
- unless $? == 0 and commit_rev == commit_hash
- # commit does not exist in internal repository, so import the source repository using git fetch-pack
- cmd = git_cmd("fetch-pack", "--no-progress", "--all", src_repo)
- $stderr.puts "dispatch: #{cmd}"
- $stderr.puts(stdout_s(cmd))
- unless $? == 0
- fail_job job, "git fetch-pack failed"
- return nil
- end
- end
- @fetched_commits[commit_hash] = true
+ def internal_repo_has_commit? sha1
+ if (not @fetched_commits[sha1] and
+ sha1 == stdout_s(git_cmd("rev-list", "-n1", sha1), err: "/dev/null") and
+ $? == 0)
+ @fetched_commits[sha1] = true
end
- @fetched_commits[commit_hash]
+ return @fetched_commits[sha1]
+ end
+
+ def get_commit src_repo, sha1
+ return true if internal_repo_has_commit? sha1
+
+ # commit does not exist in internal repository, so import the
+ # source repository using git fetch-pack
+ cmd = git_cmd("fetch-pack", "--no-progress", "--all", src_repo)
+ $stderr.puts "dispatch: #{cmd}"
+ $stderr.puts(stdout_s(cmd))
+ @fetched_commits[sha1] = ($? == 0)
end
def tag_commit(commit_hash, tag_name)
"GEM_PATH=#{ENV['GEM_PATH']}")
end
- repo = Repository.where(name: job.repository).first
- if repo.nil? or repo.server_path.nil?
- fail_job "Repository #{job.repository} not found under #{@repo_root}"
- next
+ next unless get_authorization job
+
+ ready = internal_repo_has_commit? job.script_version
+
+ if not ready
+ # Import the commit from the specified repository into the
+ # internal repository. This should have been done already when
+ # the job was created/updated; this code is obsolete except to
+ # avoid deployment races. Failing the job would be a
+ # reasonable thing to do at this point.
+ repo = Repository.where(name: job.repository).first
+ if repo.nil? or repo.server_path.nil?
+ fail_job job, "Repository #{job.repository} not found under #{@repo_root}"
+ next
+ end
+ # "ready" is necessarily false on this branch, so "ready &&= ..."
+ # would short-circuit and skip the import. Assign directly.
+ ready = get_commit repo.server_path, job.script_version
+ ready &&= tag_commit job.script_version, job.uuid
+ end
- ready = (get_authorization(job) and
- get_commit(repo.server_path, job.script_version) and
- tag_commit(job.script_version, job.uuid))
- if ready and job.arvados_sdk_version
- ready = (get_commit(@arvados_repo_path, job.arvados_sdk_version) and
- tag_commit(job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"))
+ # This should be unnecessary, because API server does it during
+ # job create/update, but it's still not a bad idea to verify the
+ # tag is correct before starting the job:
+ ready &&= tag_commit job.script_version, job.uuid
+
+ # The arvados_sdk_version doesn't support use of arbitrary
+ # remote URLs, so the requested version isn't necessarily copied
+ # into the internal repository yet.
+ if job.arvados_sdk_version
+ ready &&= get_commit @arvados_repo_path, job.arvados_sdk_version
+ ready &&= tag_commit job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"
+ end
+
+ if not ready
+ fail_job job, "commit not present in internal repository"
+ next
end
- next unless ready
cmd_args += [@crunch_job_bin,
'--job-api-token', @authorizations[job.uuid].api_token,
api_token: 3kg612cdc0f3415c2428b9758f33bdfb07bc3561b00e86qdmi
expires_at: 2038-01-01 00:00:00
-active_no_prefs_profile:
+active_no_prefs_profile_no_getting_started_shown:
api_client: untrusted
- user: active_no_prefs_profile
+ user: active_no_prefs_profile_no_getting_started_shown
api_token: 3kg612cdc0f3415c242856758f33bdfb07bc3561b00e86qdmi
expires_at: 2038-01-01 00:00:00
+active_no_prefs_profile_with_getting_started_shown:
+ api_client: untrusted
+ user: active_no_prefs_profile_with_getting_started_shown
+ api_token: 3kg612cdc0f3415c245786758f33bdfb07babcd1b00e86qdmi
+ expires_at: 2038-01-01 00:00:00
+
user_foo_in_sharing_group:
api_client: untrusted
user: user_foo_in_sharing_group
name: Pipeline in publicly accessible project
pipeline_template_uuid: zzzzz-p5p6p-tmpltpublicproj
state: Complete
- created_at: 2014-09-15 12:00:00
+ created_at: <%= 1.minute.ago.to_s(:db) %>
components:
foo:
script: foo
dataclass: Collection
value: zzzzz-4zz18-bv31uwvy3neko21
+new_pipeline_in_publicly_accessible_project_with_dataclass_file_and_other_objects_elsewhere:
+ uuid: zzzzz-d1hrv-newsharenotfile
+ owner_uuid: zzzzz-j7d0g-zhxawtyetzwc5f0
+ name: Pipeline in public project in New state with file type data class with objects elsewhere
+ pipeline_template_uuid: zzzzz-p5p6p-aox0k0ofxrystgw
+ state: New
+ created_at: 2014-09-15 12:00:00
+ components:
+ foo:
+ script: foo
+ script_version: master
+ script_parameters:
+ input:
+ required: true
+ dataclass: File
+ value: zzzzz-4zz18-bv31uwvy3neko21/bar
+
pipeline_in_running_state:
name: running_with_job
state: Ready
profile:
organization: example.com
role: IT
+ getting_started_shown: 2015-03-26 12:34:56.789000000 Z
miniadmin:
owner_uuid: zzzzz-tpzed-000000000000000
profile:
organization: example.com
role: IT
+ getting_started_shown: 2015-03-26 12:34:56.789000000 Z
rominiadmin:
owner_uuid: zzzzz-tpzed-000000000000000
profile:
organization: example.com
role: IT
+ getting_started_shown: 2015-03-26 12:34:56.789000000 Z
active:
owner_uuid: zzzzz-tpzed-000000000000000
profile:
organization: example.com
role: Computational biologist
+ getting_started_shown: 2015-03-26 12:34:56.789000000 Z
project_viewer:
owner_uuid: zzzzz-tpzed-000000000000000
profile:
organization: example.com
role: Computational biologist
+ getting_started_shown: 2015-03-26 12:34:56.789000000 Z
future_project_user:
# Workbench tests give this user permission on aproject.
profile:
organization: example.com
role: Computational biologist
+ getting_started_shown: 2015-03-26 12:34:56.789000000 Z
subproject_admin:
owner_uuid: zzzzz-tpzed-000000000000000
profile:
organization: example.com
role: Computational biologist
+ getting_started_shown: 2015-03-26 12:34:56.789000000 Z
spectator:
owner_uuid: zzzzz-tpzed-000000000000000
profile:
organization: example.com
role: Computational biologist
+ getting_started_shown: 2015-03-26 12:34:56.789000000 Z
inactive_uninvited:
owner_uuid: zzzzz-tpzed-000000000000000
profile:
organization: example.com
role: Computational biologist
+ getting_started_shown: 2015-03-26 12:34:56.789000000 Z
anonymous:
owner_uuid: zzzzz-tpzed-000000000000000
profile:
organization: example.com
role: Computational biologist
+ getting_started_shown: 2015-03-26 12:34:56.789000000 Z
active_no_prefs:
owner_uuid: zzzzz-tpzed-000000000000000
is_admin: false
prefs: {}
-active_no_prefs_profile:
+active_no_prefs_profile_no_getting_started_shown:
owner_uuid: zzzzz-tpzed-000000000000000
uuid: zzzzz-tpzed-a46c98d1td4aoj4
email: active_no_prefs_profile@arvados.local
prefs:
test: abc
+active_no_prefs_profile_with_getting_started_shown:
+ owner_uuid: zzzzz-tpzed-000000000000000
+ uuid: zzzzz-tpzed-getstartnoprofl
+ email: active_no_prefs_profile@arvados.local
+ first_name: HasPrefs
+ last_name: NoProfileWithGettingStartedShown
+ identity_url: https://active_no_prefs_profile_seen_gs.openid.local
+ is_active: true
+ is_admin: false
+ prefs:
+ test: abc
+ getting_started_shown: 2015-03-26 12:34:56.789000000 Z
+
# Fixtures to test granting and removing permissions.
user_foo_in_sharing_group:
profile:
organization: example.com
role: IT
+ getting_started_shown: 2015-03-26 12:34:56.789000000 Z
fuse:
owner_uuid: zzzzz-tpzed-000000000000000
profile:
organization: example.com
role: IT
+ getting_started_shown: 2015-03-26 12:34:56.789000000 Z
require 'test_helper'
-require 'helpers/git_test_helper'
-
-# NOTE: calling Commit.find_commit_range(user, nil, nil, 'rev') will produce
-# an error message "fatal: bad object 'rev'" on stderr if 'rev' does not exist
-# in a given repository. Many of these tests report such errors; their presence
-# does not represent a fatal condition.
-#
-# TODO(twp): consider better error handling of these messages, or
-# decide to abandon it.
class Arvados::V1::CommitsControllerTest < ActionController::TestCase
- fixtures :repositories, :users
-
- # See git_setup.rb for the commit log for test.git.tar
- include GitTestHelper
-
- test "test_find_commit_range" do
- authorize_with :active
-
- # single
- a = Commit.find_commit_range(users(:active), nil, nil, '31ce37fe365b3dc204300a3e4c396ad333ed0556', nil)
- assert_equal ['31ce37fe365b3dc204300a3e4c396ad333ed0556'], a
-
- #test "test_branch1" do
- # complains "fatal: bad object 077ba2ad3ea24a929091a9e6ce545c93199b8e57"
- a = Commit.find_commit_range(users(:active), nil, nil, 'master', nil)
- assert_includes(a, 'f35f99b7d32bac257f5989df02b9f12ee1a9b0d6')
- assert_includes(a, '077ba2ad3ea24a929091a9e6ce545c93199b8e57')
-
- #test "test_branch2" do
- a = Commit.find_commit_range(users(:active), 'active/foo', nil, 'b1', nil)
- assert_equal ['1de84a854e2b440dc53bf42f8548afa4c17da332'], a
-
- #test "test_branch3" do
- a = Commit.find_commit_range(users(:active), 'active/foo', nil, 'HEAD', nil)
- assert_equal ['1de84a854e2b440dc53bf42f8548afa4c17da332'], a
-
- #test "test_single_revision_repo" do
- a = Commit.find_commit_range(users(:active), "active/foo", nil, '31ce37fe365b3dc204300a3e4c396ad333ed0556', nil)
- assert_equal ['31ce37fe365b3dc204300a3e4c396ad333ed0556'], a
- a = Commit.find_commit_range(users(:active), "arvados", nil, '31ce37fe365b3dc204300a3e4c396ad333ed0556', nil)
- assert_equal nil, a
-
- #test "test_multi_revision" do
- # complains "fatal: bad object 077ba2ad3ea24a929091a9e6ce545c93199b8e57"
- a = Commit.find_commit_range(users(:active), nil, '31ce37fe365b3dc204300a3e4c396ad333ed0556', '077ba2ad3ea24a929091a9e6ce545c93199b8e57', nil)
- assert_equal ['077ba2ad3ea24a929091a9e6ce545c93199b8e57', '4fe459abe02d9b365932b8f5dc419439ab4e2577', '31ce37fe365b3dc204300a3e4c396ad333ed0556'], a
-
- #test "test_tag" do
- # complains "fatal: ambiguous argument 'tag1': unknown revision or path
- # not in the working tree."
- a = Commit.find_commit_range(users(:active), nil, 'tag1', 'master', nil)
- assert_equal ['077ba2ad3ea24a929091a9e6ce545c93199b8e57', '4fe459abe02d9b365932b8f5dc419439ab4e2577'], a
-
- #test "test_multi_revision_exclude" do
- a = Commit.find_commit_range(users(:active), nil, '31ce37fe365b3dc204300a3e4c396ad333ed0556', '077ba2ad3ea24a929091a9e6ce545c93199b8e57', ['4fe459abe02d9b365932b8f5dc419439ab4e2577'])
- assert_equal ['077ba2ad3ea24a929091a9e6ce545c93199b8e57', '31ce37fe365b3dc204300a3e4c396ad333ed0556'], a
-
- #test "test_multi_revision_tagged_exclude" do
- # complains "fatal: bad object 077ba2ad3ea24a929091a9e6ce545c93199b8e57"
- a = Commit.find_commit_range(users(:active), nil, '31ce37fe365b3dc204300a3e4c396ad333ed0556', '077ba2ad3ea24a929091a9e6ce545c93199b8e57', ['tag1'])
- assert_equal ['077ba2ad3ea24a929091a9e6ce545c93199b8e57', '31ce37fe365b3dc204300a3e4c396ad333ed0556'], a
-
- Dir.mktmpdir do |touchdir|
- # invalid input to maximum
- a = Commit.find_commit_range(users(:active), nil, nil, "31ce37fe365b3dc204300a3e4c396ad333ed0556 ; touch #{touchdir}/uh_oh", nil)
- assert !File.exists?("#{touchdir}/uh_oh"), "#{touchdir}/uh_oh should not exist, 'maximum' parameter of find_commit_range is exploitable"
- assert_equal nil, a
-
- # invalid input to maximum
- a = Commit.find_commit_range(users(:active), nil, nil, "$(uname>#{touchdir}/uh_oh)", nil)
- assert !File.exists?("#{touchdir}/uh_oh"), "#{touchdir}/uh_oh should not exist, 'maximum' parameter of find_commit_range is exploitable"
- assert_equal nil, a
-
- # invalid input to minimum
- a = Commit.find_commit_range(users(:active), nil, "31ce37fe365b3dc204300a3e4c396ad333ed0556 ; touch #{touchdir}/uh_oh", "31ce37fe365b3dc204300a3e4c396ad333ed0556", nil)
- assert !File.exists?("#{touchdir}/uh_oh"), "#{touchdir}/uh_oh should not exist, 'minimum' parameter of find_commit_range is exploitable"
- assert_equal nil, a
-
- # invalid input to minimum
- a = Commit.find_commit_range(users(:active), nil, "$(uname>#{touchdir}/uh_oh)", "31ce37fe365b3dc204300a3e4c396ad333ed0556", nil)
- assert !File.exists?("#{touchdir}/uh_oh"), "#{touchdir}/uh_oh should not exist, 'minimum' parameter of find_commit_range is exploitable"
- assert_equal nil, a
-
- # invalid input to 'excludes'
- # complains "fatal: bad object 077ba2ad3ea24a929091a9e6ce545c93199b8e57"
- a = Commit.find_commit_range(users(:active), nil, "31ce37fe365b3dc204300a3e4c396ad333ed0556", "077ba2ad3ea24a929091a9e6ce545c93199b8e57", ["4fe459abe02d9b365932b8f5dc419439ab4e2577 ; touch #{touchdir}/uh_oh"])
- assert !File.exists?("#{touchdir}/uh_oh"), "#{touchdir}/uh_oh should not exist, 'excludes' parameter of find_commit_range is exploitable"
- assert_equal nil, a
-
- # invalid input to 'excludes'
- # complains "fatal: bad object 077ba2ad3ea24a929091a9e6ce545c93199b8e57"
- a = Commit.find_commit_range(users(:active), nil, "31ce37fe365b3dc204300a3e4c396ad333ed0556", "077ba2ad3ea24a929091a9e6ce545c93199b8e57", ["$(uname>#{touchdir}/uh_oh)"])
- assert !File.exists?("#{touchdir}/uh_oh"), "#{touchdir}/uh_oh should not exist, 'excludes' parameter of find_commit_range is exploitable"
- assert_equal nil, a
-
- end
-
- end
-
end
post :lock, {id: jobs(:running).uuid}
assert_response 403 # forbidden
end
+
+ test 'reject invalid commit in remote repository' do
+ authorize_with :active
+ url = "http://localhost:1/fake/fake.git"
+ fetch_remote_from_local_repo url, :foo
+ post :create, job: {
+ script: "hash",
+ script_version: "abc123",
+ repository: url,
+ script_parameters: {}
+ }
+ assert_response 422
+ end
+
+ test 'tag remote commit in internal repository' do
+ authorize_with :active
+ url = "http://localhost:1/fake/fake.git"
+ fetch_remote_from_local_repo url, :foo
+ post :create, job: {
+ script: "hash",
+ script_version: "master",
+ repository: url,
+ script_parameters: {}
+ }
+ assert_response :success
+ assert_equal('077ba2ad3ea24a929091a9e6ce545c93199b8e57',
+ internal_tag(json_response['uuid']))
+ end
+
+ test 'tag local commit in internal repository' do
+ authorize_with :active
+ post :create, job: {
+ script: "hash",
+ script_version: "master",
+ repository: "active/foo",
+ script_parameters: {}
+ }
+ assert_response :success
+ assert_equal('077ba2ad3ea24a929091a9e6ce545c93199b8e57',
+ internal_tag(json_response['uuid']))
+ end
end
authorize_with :admin
user = {}
- user[:prefs] = users(:active_no_prefs_profile).prefs
+ user[:prefs] = users(:active_no_prefs_profile_no_getting_started_shown).prefs
user[:prefs][:profile] = {:profile => {'organization' => 'example.com'}}
put :update, {
- id: users(:active_no_prefs_profile).uuid,
+ id: users(:active_no_prefs_profile_no_getting_started_shown).uuid,
user: user
}
assert_response :success
found_email = false
ActionMailer::Base.deliveries.andand.each do |email|
- if email.subject == "Profile created by #{users(:active_no_prefs_profile).email}"
+ if email.subject == "Profile created by #{users(:active_no_prefs_profile_no_getting_started_shown).email}"
found_email = true
break
end
@tmpdir = Dir.mktmpdir()
system("tar", "-xC", @tmpdir, "-f", "test/test.git.tar")
Rails.configuration.git_repositories_dir = "#{@tmpdir}/test"
- Commit.refresh_repositories
+ intdir = Rails.configuration.git_internal_dir
+ if not File.exist? intdir
+ FileUtils.mkdir_p intdir
+ IO.read("|git --git-dir #{intdir.to_s.shellescape} init")
+ assert $?.success?
+ end
end
base.teardown do
FileUtils.remove_entry @tmpdir, true
- Commit.refresh_repositories
+ FileUtils.remove_entry Commit.cache_dir_base, true
+ end
+ end
+
+ def internal_tag tag
+ IO.read "|git --git-dir #{Rails.configuration.git_internal_dir.shellescape} log --format=format:%H -n1 #{tag.shellescape}"
+ end
+
+ # Intercept fetch_remote_repository and fetch from a specified url
+ # or local fixture instead of the remote url requested. fakeurl can
+ # be a url (probably starting with file:///) or the name of a
+ # fixture (as a symbol)
+ def fetch_remote_from_local_repo url, fakeurl
+ if fakeurl.is_a? Symbol
+ fakeurl = 'file://' + repositories(fakeurl).server_path
+ end
+ Commit.expects(:fetch_remote_repository).once.with do |gitdir, giturl|
+ if giturl == url
+ Commit.unstub(:fetch_remote_repository)
+ Commit.fetch_remote_repository gitdir, fakeurl
+ true
+ end
end
end
end
require File.expand_path('../../config/environment', __FILE__)
require 'rails/test_help'
+require 'mocha/mini_test'
module ArvadosTestSupport
def json_response
require 'test_helper'
+require 'helpers/git_test_helper'
+
+# NOTE: calling Commit.find_commit_range(repo, nil, 'rev', nil)
+# produces an error message "fatal: bad object 'rev'" on stderr if
+# 'rev' does not exist in a given repository. Many of these tests
+# report such errors; their presence does not represent a fatal
+# condition.
class CommitTest < ActiveSupport::TestCase
- # test "the truth" do
- # assert true
- # end
+ # See git_setup.rb for the commit log for test.git.tar
+ include GitTestHelper
+
+ setup do
+ authorize_with :active
+ end
+
+ test 'find_commit_range does not bypass permissions' do
+ authorize_with :inactive
+ assert_raises ArgumentError do
+ c = Commit.find_commit_range 'foo', nil, 'master', []
+ end
+ end
+
+ [
+ 'https://github.com/curoverse/arvados.git',
+ 'http://github.com/curoverse/arvados.git',
+ 'git://github.com/curoverse/arvados.git',
+ ].each do |url|
+ test "find_commit_range uses fetch_remote_repository to get #{url}" do
+ fake_gitdir = repositories(:foo).server_path
+ Commit.expects(:cache_dir_for).once.with(url).returns fake_gitdir
+ Commit.expects(:fetch_remote_repository).once.with(fake_gitdir, url).returns true
+ c = Commit.find_commit_range url, nil, 'master', []
+ refute_empty c
+ end
+ end
+
+ [
+ 'bogus/repo',
+ '/bogus/repo',
+ '/not/allowed/.git',
+ 'file:///not/allowed.git',
+ 'git.curoverse.com/arvados.git',
+ 'github.com/curoverse/arvados.git',
+ ].each do |url|
+ test "find_commit_range skips fetch_remote_repository for #{url}" do
+ Commit.expects(:fetch_remote_repository).never
+ assert_raises ArgumentError do
+ Commit.find_commit_range url, nil, 'master', []
+ end
+ end
+ end
+
+ test 'fetch_remote_repository does not leak commits across repositories' do
+ url = "http://localhost:1/fake/fake.git"
+ fetch_remote_from_local_repo url, :foo
+ c = Commit.find_commit_range url, nil, 'master', []
+ assert_equal ['077ba2ad3ea24a929091a9e6ce545c93199b8e57'], c
+
+ url = "http://localhost:2/fake/fake.git"
+ fetch_remote_from_local_repo url, 'file://' + File.expand_path('../../.git', Rails.root)
+ c = Commit.find_commit_range url, nil, '077ba2ad3ea24a929091a9e6ce545c93199b8e57', []
+ assert_equal [], c
+ end
+
+ test 'tag_in_internal_repository creates and updates tags in internal.git' do
+ authorize_with :active
+ gitint = "git --git-dir #{Rails.configuration.git_internal_dir}"
+ IO.read("|#{gitint} tag -d testtag 2>/dev/null") # "no such tag", fine
+ assert_match /^fatal: /, IO.read("|#{gitint} show testtag 2>&1")
+ refute $?.success?
+ Commit.tag_in_internal_repository 'active/foo', '31ce37fe365b3dc204300a3e4c396ad333ed0556', 'testtag'
+ assert_match /^commit 31ce37f/, IO.read("|#{gitint} show testtag")
+ assert $?.success?
+ end
+
+ test "find_commit_range laundry list" do
+ authorize_with :active
+
+ # single
+ a = Commit.find_commit_range('active/foo', nil, '31ce37fe365b3dc204300a3e4c396ad333ed0556', nil)
+ assert_equal ['31ce37fe365b3dc204300a3e4c396ad333ed0556'], a
+
+ #test "test_branch1" do
+ a = Commit.find_commit_range('active/foo', nil, 'master', nil)
+ assert_includes(a, '077ba2ad3ea24a929091a9e6ce545c93199b8e57')
+
+ #test "test_branch2" do
+ a = Commit.find_commit_range('active/foo', nil, 'b1', nil)
+ assert_equal ['1de84a854e2b440dc53bf42f8548afa4c17da332'], a
+
+ #test "test_branch3" do
+ a = Commit.find_commit_range('active/foo', nil, 'HEAD', nil)
+ assert_equal ['1de84a854e2b440dc53bf42f8548afa4c17da332'], a
+
+ #test "test_single_revision_repo" do
+ a = Commit.find_commit_range('active/foo', nil, '31ce37fe365b3dc204300a3e4c396ad333ed0556', nil)
+ assert_equal ['31ce37fe365b3dc204300a3e4c396ad333ed0556'], a
+ a = Commit.find_commit_range('arvados', nil, '31ce37fe365b3dc204300a3e4c396ad333ed0556', nil)
+ assert_equal [], a
+
+ #test "test_multi_revision" do
+ # complains "fatal: bad object 077ba2ad3ea24a929091a9e6ce545c93199b8e57"
+ a = Commit.find_commit_range('active/foo', '31ce37fe365b3dc204300a3e4c396ad333ed0556', '077ba2ad3ea24a929091a9e6ce545c93199b8e57', nil)
+ assert_equal ['077ba2ad3ea24a929091a9e6ce545c93199b8e57', '4fe459abe02d9b365932b8f5dc419439ab4e2577', '31ce37fe365b3dc204300a3e4c396ad333ed0556'], a
+
+ #test "test_tag" do
+ # complains "fatal: ambiguous argument 'tag1': unknown revision or path
+ # not in the working tree."
+ a = Commit.find_commit_range('active/foo', 'tag1', 'master', nil)
+ assert_equal ['077ba2ad3ea24a929091a9e6ce545c93199b8e57', '4fe459abe02d9b365932b8f5dc419439ab4e2577'], a
+
+ #test "test_multi_revision_exclude" do
+ a = Commit.find_commit_range('active/foo', '31ce37fe365b3dc204300a3e4c396ad333ed0556', '077ba2ad3ea24a929091a9e6ce545c93199b8e57', ['4fe459abe02d9b365932b8f5dc419439ab4e2577'])
+ assert_equal ['077ba2ad3ea24a929091a9e6ce545c93199b8e57', '31ce37fe365b3dc204300a3e4c396ad333ed0556'], a
+
+ #test "test_multi_revision_tagged_exclude" do
+ # complains "fatal: bad object 077ba2ad3ea24a929091a9e6ce545c93199b8e57"
+ a = Commit.find_commit_range('active/foo', '31ce37fe365b3dc204300a3e4c396ad333ed0556', '077ba2ad3ea24a929091a9e6ce545c93199b8e57', ['tag1'])
+ assert_equal ['077ba2ad3ea24a929091a9e6ce545c93199b8e57', '31ce37fe365b3dc204300a3e4c396ad333ed0556'], a
+
+ Dir.mktmpdir do |touchdir|
+ # invalid input to maximum
+ a = Commit.find_commit_range('active/foo', nil, "31ce37fe365b3dc204300a3e4c396ad333ed0556 ; touch #{touchdir}/uh_oh", nil)
+ assert !File.exists?("#{touchdir}/uh_oh"), "#{touchdir}/uh_oh should not exist, 'maximum' parameter of find_commit_range is exploitable"
+ assert_equal [], a
+
+ # invalid input to maximum
+ a = Commit.find_commit_range('active/foo', nil, "$(uname>#{touchdir}/uh_oh)", nil)
+ assert !File.exists?("#{touchdir}/uh_oh"), "#{touchdir}/uh_oh should not exist, 'maximum' parameter of find_commit_range is exploitable"
+ assert_equal [], a
+
+ # invalid input to minimum
+ a = Commit.find_commit_range('active/foo', "31ce37fe365b3dc204300a3e4c396ad333ed0556 ; touch #{touchdir}/uh_oh", "31ce37fe365b3dc204300a3e4c396ad333ed0556", nil)
+ assert !File.exists?("#{touchdir}/uh_oh"), "#{touchdir}/uh_oh should not exist, 'minimum' parameter of find_commit_range is exploitable"
+ assert_equal [], a
+
+ # invalid input to minimum
+ a = Commit.find_commit_range('active/foo', "$(uname>#{touchdir}/uh_oh)", "31ce37fe365b3dc204300a3e4c396ad333ed0556", nil)
+ assert !File.exists?("#{touchdir}/uh_oh"), "#{touchdir}/uh_oh should not exist, 'minimum' parameter of find_commit_range is exploitable"
+ assert_equal [], a
+
+ # invalid input to 'excludes'
+ # complains "fatal: bad object 077ba2ad3ea24a929091a9e6ce545c93199b8e57"
+ a = Commit.find_commit_range('active/foo', "31ce37fe365b3dc204300a3e4c396ad333ed0556", "077ba2ad3ea24a929091a9e6ce545c93199b8e57", ["4fe459abe02d9b365932b8f5dc419439ab4e2577 ; touch #{touchdir}/uh_oh"])
+ assert !File.exists?("#{touchdir}/uh_oh"), "#{touchdir}/uh_oh should not exist, 'excludes' parameter of find_commit_range is exploitable"
+ assert_equal [], a
+
+ # invalid input to 'excludes'
+ # complains "fatal: bad object 077ba2ad3ea24a929091a9e6ce545c93199b8e57"
+ a = Commit.find_commit_range('active/foo', "31ce37fe365b3dc204300a3e4c396ad333ed0556", "077ba2ad3ea24a929091a9e6ce545c93199b8e57", ["$(uname>#{touchdir}/uh_oh)"])
+ assert !File.exists?("#{touchdir}/uh_oh"), "#{touchdir}/uh_oh should not exist, 'excludes' parameter of find_commit_range is exploitable"
+ assert_equal [], a
+ end
+ end
end
job = Job.create!(job_attrs(good_params))
assert job.valid?
end
+
+ test 'update job uuid tag in internal.git when version changes' do
+ authorize_with :active
+ j = jobs :queued
+ j.update_attributes repository: 'active/foo', script_version: 'b1'
+ assert_equal('1de84a854e2b440dc53bf42f8548afa4c17da332',
+ internal_tag(j.uuid))
+ j.update_attributes repository: 'active/foo', script_version: 'master'
+ assert_equal('077ba2ad3ea24a929091a9e6ce545c93199b8e57',
+ internal_tag(j.uuid))
+ end
end
vm_perm = find_obj_in_resp response, 'Link', 'arvados#virtualMachine'
verify_link vm_perm, 'permission', 'can_login', resp_user[:uuid], vm.uuid
+ assert_equal("foo", vm_perm.properties["username"])
end
test "setup new user with junk in database" do
vm_perm = find_obj_in_resp response, 'Link', 'arvados#virtualMachine'
verify_link vm_perm, 'permission', 'can_login', resp_user[:uuid], vm.uuid
+ assert_equal("foo", vm_perm.properties["username"])
end
test "setup new user in multiple steps" do
vm_perm = find_obj_in_resp response, 'Link', 'arvados#virtualMachine'
verify_link vm_perm, 'permission', 'can_login', resp_user[:uuid], vm.uuid
+ assert_equal("foo", vm_perm.properties["username"])
end
def find_obj_in_resp (response_items, object_type, head_kind=nil)
package main
import (
+ "errors"
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"net/http"
"os"
"os/signal"
+ "reflect"
"sync"
"syscall"
"time"
log.Fatalf("Could not listen on %v", listen)
}
- go RefreshServicesList(&kc)
+ go RefreshServicesList(kc)
// Shut down the server gracefully (by closing the listener)
// if SIGTERM is received.
signal.Notify(term, syscall.SIGTERM)
signal.Notify(term, syscall.SIGINT)
- log.Printf("Arvados Keep proxy started listening on %v with server list %v", listener.Addr(), kc.ServiceRoots())
+ log.Printf("Arvados Keep proxy started listening on %v", listener.Addr())
// Start listening for requests.
- http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
+ http.Serve(listener, MakeRESTRouter(!no_get, !no_put, kc))
log.Println("shutting down")
}
// Refresh the keep service list every five minutes.
func RefreshServicesList(kc *keepclient.KeepClient) {
- var sleeptime time.Duration
+ var previousRoots = []map[string]string{}
+ var delay time.Duration = 0
for {
- oldservices := kc.ServiceRoots()
- newservices, err := kc.DiscoverKeepServers()
- if err == nil && len(newservices) > 0 {
- s1 := fmt.Sprint(oldservices)
- s2 := fmt.Sprint(newservices)
- if s1 != s2 {
- log.Printf("Updated server list to %v", s2)
- }
- sleeptime = 300 * time.Second
- } else {
- // There was an error, or the list is empty, so wait 3 seconds and try again.
- if err != nil {
- log.Printf("Error retrieving server list: %v", err)
- } else {
- log.Printf("Retrieved an empty server list")
- }
- sleeptime = 3 * time.Second
+ time.Sleep(delay * time.Second)
+ delay = 300
+ if err := kc.DiscoverKeepServers(); err != nil {
+ log.Println("Error retrieving services list:", err)
+ delay = 3
+ continue
+ }
+ newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()}
+ if !reflect.DeepEqual(previousRoots, newRoots) {
+ log.Printf("Updated services list: locals %v gateways %v", newRoots[0], newRoots[1])
+ }
+ if len(newRoots[0]) == 0 {
+ log.Print("WARNING: No local services. Retrying in 3 seconds.")
+ delay = 3
}
- time.Sleep(sleeptime)
+ previousRoots = newRoots
}
}
rest := mux.NewRouter()
if enable_get {
- rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`,
+ rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`,
GetBlockHandler{kc, t}).Methods("GET", "HEAD")
- rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
+ rest.Handle(`/{locator:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
}
if enable_put {
- rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`, PutBlockHandler{kc, t}).Methods("PUT")
- rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
+ rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`, PutBlockHandler{kc, t}).Methods("PUT")
+ rest.Handle(`/{locator:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
rest.Handle(`/`, PutBlockHandler{kc, t}).Methods("POST")
rest.Handle(`/{any}`, OptionsHandler{}).Methods("OPTIONS")
rest.Handle(`/`, OptionsHandler{}).Methods("OPTIONS")
SetCorsHeaders(resp)
}
+var BadAuthorizationHeader = errors.New("Missing or invalid Authorization header")
+var ContentLengthMismatch = errors.New("Actual length != expected content length")
+var MethodNotSupported = errors.New("Method not supported")
+
func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
SetCorsHeaders(resp)
- kc := *this.KeepClient
-
- hash := mux.Vars(req)["hash"]
- hints := mux.Vars(req)["hints"]
-
- locator := keepclient.MakeLocator2(hash, hints)
+ locator := mux.Vars(req)["locator"]
+ var err error
+ var status int
+ var expectLength, responseLength int64
+ var proxiedURI = "-"
+
+ defer func() {
+ log.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, responseLength, proxiedURI, err)
+ if status != http.StatusOK {
+ http.Error(resp, err.Error(), status)
+ }
+ }()
- log.Printf("%s: %s %s begin", GetRemoteAddress(req), req.Method, hash)
+ kc := *this.KeepClient
var pass bool
var tok string
if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
- http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
+ status, err = http.StatusForbidden, BadAuthorizationHeader
return
}
kc.Arvados = &arvclient
var reader io.ReadCloser
- var err error
- var blocklen int64
- if req.Method == "GET" {
- reader, blocklen, _, err = kc.AuthorizedGet(hash, locator.Signature, locator.Timestamp)
+ switch req.Method {
+ case "HEAD":
+ expectLength, proxiedURI, err = kc.Ask(locator)
+ case "GET":
+ reader, expectLength, proxiedURI, err = kc.Get(locator)
if reader != nil {
defer reader.Close()
}
- } else if req.Method == "HEAD" {
- blocklen, _, err = kc.AuthorizedAsk(hash, locator.Signature, locator.Timestamp)
+ default:
+ status, err = http.StatusNotImplemented, MethodNotSupported
+ return
}
- if blocklen == -1 {
- log.Printf("%s: %s %s Keep server did not return Content-Length",
- GetRemoteAddress(req), req.Method, hash)
+ if expectLength == -1 {
+ log.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided")
}
- var status = 0
switch err {
case nil:
status = http.StatusOK
- resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
- if reader != nil {
- n, err2 := io.Copy(resp, reader)
- if blocklen > -1 && n != blocklen {
- log.Printf("%s: %s %s %v %v mismatched copy size expected Content-Length: %v",
- GetRemoteAddress(req), req.Method, hash, status, n, blocklen)
- } else if err2 == nil {
- log.Printf("%s: %s %s %v %v",
- GetRemoteAddress(req), req.Method, hash, status, n)
- } else {
- log.Printf("%s: %s %s %v %v copy error: %v",
- GetRemoteAddress(req), req.Method, hash, status, n, err2.Error())
+ resp.Header().Set("Content-Length", fmt.Sprint(expectLength))
+ switch req.Method {
+ case "HEAD":
+ responseLength = 0
+ case "GET":
+ responseLength, err = io.Copy(resp, reader)
+ if err == nil && expectLength > -1 && responseLength != expectLength {
+ err = ContentLengthMismatch
}
- } else {
- log.Printf("%s: %s %s %v 0", GetRemoteAddress(req), req.Method, hash, status)
}
case keepclient.BlockNotFound:
status = http.StatusNotFound
- http.Error(resp, "Not Found", http.StatusNotFound)
default:
status = http.StatusBadGateway
- http.Error(resp, err.Error(), http.StatusBadGateway)
- }
-
- if err != nil {
- log.Printf("%s: %s %s %v error: %v",
- GetRemoteAddress(req), req.Method, hash, status, err.Error())
}
}
+// Sentinel errors reported to the client by the deferred handler in
+// PutBlockHandler.ServeHTTP when request validation fails.
+var LengthRequiredError = errors.New(http.StatusText(http.StatusLengthRequired))
+var LengthMismatchError = errors.New("Locator size hint does not match Content-Length header")
+
func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
SetCorsHeaders(resp)
kc := *this.KeepClient
+ // Request state shared with the deferred logger/error-responder:
+ // every early return below must set both err and status first.
+ var err error
+ var expectLength int64 = -1
+ var status = http.StatusInternalServerError
+ var wroteReplicas int
+ var locatorOut string = "-"
+
+ // Single exit path: log the outcome of every request, and send the
+ // error to the client whenever the final status is not 200.
+ // NOTE(review): if this runs with status != OK while err is still
+ // nil, err.Error() panics -- see the request-body read path below.
+ defer func() {
+ log.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, kc.Want_replicas, wroteReplicas, locatorOut, err)
+ if status != http.StatusOK {
+ http.Error(resp, err.Error(), status)
+ }
+ }()
- hash := mux.Vars(req)["hash"]
- hints := mux.Vars(req)["hints"]
-
- locator := keepclient.MakeLocator2(hash, hints)
+ locatorIn := mux.Vars(req)["locator"]
- var contentLength int64 = -1
if req.Header.Get("Content-Length") != "" {
- _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &contentLength)
+ _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &expectLength)
if err != nil {
- resp.Header().Set("Content-Length", fmt.Sprintf("%d", contentLength))
+ resp.Header().Set("Content-Length", fmt.Sprintf("%d", expectLength))
+ // NOTE(review): an unparsable Content-Length only echoes this
+ // response header; the request is then rejected by the
+ // expectLength < 0 check below -- confirm that is intended.
}
}
+ // A Content-Length header is mandatory for PUT.
- log.Printf("%s: %s %s Content-Length %v", GetRemoteAddress(req), req.Method, hash, contentLength)
-
- if contentLength < 0 {
- http.Error(resp, "Must include Content-Length header", http.StatusLengthRequired)
+ if expectLength < 0 {
+ err = LengthRequiredError
+ status = http.StatusLengthRequired
return
}
+ // If the client supplied a locator, it must parse, and any size
+ // hint it carries must agree with the Content-Length header.
- if locator.Size > 0 && int64(locator.Size) != contentLength {
- http.Error(resp, "Locator size hint does not match Content-Length header", http.StatusBadRequest)
- return
+ if locatorIn != "" {
+ var loc *keepclient.Locator
+ if loc, err = keepclient.MakeLocator(locatorIn); err != nil {
+ status = http.StatusBadRequest
+ return
+ } else if loc.Size > 0 && int64(loc.Size) != expectLength {
+ err = LengthMismatchError
+ status = http.StatusBadRequest
+ return
+ }
}
var pass bool
var tok string
if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
- http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
+ err = BadAuthorizationHeader
+ status = http.StatusForbidden
return
}
}
// Now try to put the block through
- var replicas int
- var put_err error
- if hash == "" {
- if bytes, err := ioutil.ReadAll(req.Body); err != nil {
- msg := fmt.Sprintf("Error reading request body: %s", err)
- log.Printf(msg)
- http.Error(resp, msg, http.StatusInternalServerError)
- return
- } else {
- hash, replicas, put_err = kc.PutB(bytes)
- }
+ if locatorIn == "" {
+ // No locator was given, so buffer the whole body: PutB
+ // computes the block's locator from its content.
+ //
+ // Assign to the outer err (no ":=") -- a scoped err declared
+ // in an "if" header would shadow the outer err, leaving it
+ // nil: the status switch below would then take the nil case
+ // even when the read or PutB failed, and the deferred error
+ // handler would call Error() on a nil error and panic.
+ var body []byte
+ body, err = ioutil.ReadAll(req.Body)
+ if err != nil {
+ err = errors.New(fmt.Sprintf("Error reading request body: %s", err))
+ status = http.StatusInternalServerError
+ return
+ }
+ locatorOut, wroteReplicas, err = kc.PutB(body)
} else {
- hash, replicas, put_err = kc.PutHR(hash, req.Body, contentLength)
+ locatorOut, wroteReplicas, err = kc.PutHR(locatorIn, req.Body, expectLength)
}
// Tell the client how many successful PUTs we accomplished
- resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas))
+ resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", wroteReplicas))
+ // Map the put result onto an HTTP status; the deferred handler in
+ // ServeHTTP performs the actual logging and error response.
- switch put_err {
+ switch err {
case nil:
- // Default will return http.StatusOK
- log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas)
- n, err2 := io.WriteString(resp, hash)
- if err2 != nil {
- log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error())
- }
+ status = http.StatusOK
+ _, err = io.WriteString(resp, locatorOut)
+ // NOTE(review): if writing the locator fails, err becomes
+ // non-nil but status stays 200, so the deferred handler only
+ // logs the failure -- confirm that is the intended behavior.
case keepclient.OversizeBlockError:
// Too much data
- http.Error(resp, fmt.Sprintf("Exceeded maximum blocksize %d", keepclient.BLOCKSIZE), http.StatusRequestEntityTooLarge)
+ status = http.StatusRequestEntityTooLarge
case keepclient.InsufficientReplicasError:
- if replicas > 0 {
+ if wroteReplicas > 0 {
// At least one write is considered success. The
// client can decide if getting less than the number of
// replications it asked for is a fatal error.
- // Default will return http.StatusOK
- n, err2 := io.WriteString(resp, hash)
- if err2 != nil {
- log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error())
- }
+ status = http.StatusOK
+ _, err = io.WriteString(resp, locatorOut)
} else {
- http.Error(resp, put_err.Error(), http.StatusServiceUnavailable)
+ status = http.StatusServiceUnavailable
}
default:
- http.Error(resp, put_err.Error(), http.StatusBadGateway)
- }
-
- if put_err != nil {
- log.Printf("%s: %s %s stored %v replicas (desired %v) got error %v", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas, put_err.Error())
+ status = http.StatusBadGateway
}
-
}
}
kc.SetServiceRoots(map[string]string{
"proxy": fmt.Sprintf("http://localhost:%v", port),
- })
+ }, nil)
c.Check(kc.Using_proxy, Equals, true)
- c.Check(len(kc.ServiceRoots()), Equals, 1)
- for _, root := range kc.ServiceRoots() {
+ c.Check(len(kc.LocalRoots()), Equals, 1)
+ for _, root := range kc.LocalRoots() {
c.Check(root, Equals, fmt.Sprintf("http://localhost:%v", port))
}
log.Print("keepclient created")
c.Assert(err, Equals, nil)
c.Check(kc.Arvados.External, Equals, true)
c.Check(kc.Using_proxy, Equals, true)
- c.Check(len(kc.ServiceRoots()), Equals, 1)
- for _, root := range kc.ServiceRoots() {
+ c.Check(len(kc.LocalRoots()), Equals, 1)
+ for _, root := range kc.LocalRoots() {
c.Check(root, Equals, "http://localhost:29950")
}
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
// Prepare two test Keep volumes. Our block is stored on the second volume.
KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Quit()
+ defer KeepVM.Close()
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllWritable()
if err := vols[0].Put(TEST_HASH, TEST_BLOCK); err != nil {
t.Error(err)
}
// Prepare two test Keep volumes.
KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Quit()
+ defer KeepVM.Close()
// --------------
// No server key.
TEST_HASH_PUT_RESPONSE, response)
}
+// TestPutAndDeleteSkipReadonlyVolumes verifies that PUT and DELETE
+// requests never write to, touch, or delete from a read-only volume.
+func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
+ defer teardown()
+ data_manager_token = "fake-data-manager-token"
+ vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
+ // vols[0] is read-only: all mutating calls must land on vols[1].
+ vols[0].Readonly = true
+ KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
+ defer KeepVM.Close()
+ IssueRequest(
+ &RequestTester{
+ method: "PUT",
+ uri: "/"+TEST_HASH,
+ request_body: TEST_BLOCK,
+ })
+ IssueRequest(
+ &RequestTester{
+ method: "DELETE",
+ uri: "/"+TEST_HASH,
+ request_body: TEST_BLOCK,
+ api_token: data_manager_token,
+ })
+ // Expected per-volume call counts after one PUT and one DELETE.
+ type expect struct {
+ volnum int
+ method string
+ callcount int
+ }
+ for _, e := range []expect{
+ {0, "Get", 0},
+ {0, "Touch", 0},
+ {0, "Put", 0},
+ {0, "Delete", 0},
+ {1, "Get", 1},
+ {1, "Put", 1},
+ {1, "Delete", 1},
+ } {
+ if calls := vols[e.volnum].CallCount(e.method); calls != e.callcount {
+ t.Errorf("Got %d %s() on vol %d, expect %d", calls, e.method, e.volnum, e.callcount)
+ }
+ }
+}
+
// Test /index requests:
// - unauthenticated /index request
// - unauthenticated /index/prefix request
// Include multiple blocks on different volumes, and
// some metadata files (which should be omitted from index listings)
KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Quit()
+ defer KeepVM.Close()
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllWritable()
vols[0].Put(TEST_HASH, TEST_BLOCK)
vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
vols[0].Put(TEST_HASH+".meta", []byte("metadata"))
// Include multiple blocks on different volumes, and
// some metadata files (which should be omitted from index listings)
KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Quit()
+ defer KeepVM.Close()
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllWritable()
vols[0].Put(TEST_HASH, TEST_BLOCK)
// Explicitly set the permission_ttl to 0 for these
// StatusHandler (GET /status.json)
import (
- "bufio"
"bytes"
"container/list"
"crypto/md5"
http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
}
-// FindKeepVolumes scans all mounted volumes on the system for Keep
-// volumes, and returns a list of matching paths.
-//
-// A device is assumed to be a Keep volume if it is a normal or tmpfs
-// volume and has a "/keep" directory directly underneath the mount
-// point.
-//
-func FindKeepVolumes() []string {
- vols := make([]string, 0)
-
- if f, err := os.Open(PROC_MOUNTS); err != nil {
- log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
- } else {
- scanner := bufio.NewScanner(f)
- for scanner.Scan() {
- args := strings.Fields(scanner.Text())
- dev, mount := args[0], args[1]
- if mount != "/" &&
- (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
- keep := mount + "/keep"
- if st, err := os.Stat(keep); err == nil && st.IsDir() {
- vols = append(vols, keep)
- }
- }
- }
- if err := scanner.Err(); err != nil {
- log.Fatal(err)
- }
- }
- return vols
-}
-
func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
hash := mux.Vars(req)["hash"]
prefix := mux.Vars(req)["prefix"]
var index string
- for _, vol := range KeepVM.Volumes() {
+ for _, vol := range KeepVM.AllReadable() {
index = index + vol.Index(prefix)
}
resp.Write([]byte(index))
func GetNodeStatus() *NodeStatus {
st := new(NodeStatus)
- st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
- for i, vol := range KeepVM.Volumes() {
+ st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
+ for i, vol := range KeepVM.AllReadable() {
st.Volumes[i] = vol.Status()
}
return st
return
}
- // Delete copies of this block from all available volumes. Report
- // how many blocks were successfully and unsuccessfully
- // deleted.
+ // Delete copies of this block from all available volumes.
+ // Report how many blocks were successfully deleted, and how
+ // many were found on writable volumes but not deleted.
var result struct {
Deleted int `json:"copies_deleted"`
Failed int `json:"copies_failed"`
}
- for _, vol := range KeepVM.Volumes() {
+ for _, vol := range KeepVM.AllWritable() {
if err := vol.Delete(hash); err == nil {
result.Deleted++
} else if os.IsNotExist(err) {
// Attempt to read the requested hash from a keep volume.
error_to_caller := NotFoundError
- for _, vol := range KeepVM.Volumes() {
- if buf, err := vol.Get(hash); err != nil {
- // IsNotExist is an expected error and may be ignored.
- // (If all volumes report IsNotExist, we return a NotFoundError)
- // All other errors should be logged but we continue trying to
- // read.
- switch {
- case os.IsNotExist(err):
- continue
- default:
+ var vols []Volume
+ if update_timestamp {
+ // Pointless to find the block on an unwritable volume
+ // because Touch() will fail -- this is as good as
+ // "not found" for purposes of callers who need to
+ // update_timestamp.
+ vols = KeepVM.AllWritable()
+ } else {
+ vols = KeepVM.AllReadable()
+ }
+
+ for _, vol := range vols {
+ buf, err := vol.Get(hash)
+ if err != nil {
+ // IsNotExist is an expected error and may be
+ // ignored. All other errors are logged. In
+ // any case we continue trying to read other
+ // volumes. If all volumes report IsNotExist,
+ // we return a NotFoundError.
+ if !os.IsNotExist(err) {
log.Printf("GetBlock: reading %s: %s\n", hash, err)
}
- } else {
- // Double check the file checksum.
- //
- filehash := fmt.Sprintf("%x", md5.Sum(buf))
- if filehash != hash {
- // TODO(twp): this condition probably represents a bad disk and
- // should raise major alarm bells for an administrator: e.g.
- // they should be sent directly to an event manager at high
- // priority or logged as urgent problems.
- //
- log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
- vol, hash, filehash)
- error_to_caller = DiskHashError
- } else {
- // Success!
- if error_to_caller != NotFoundError {
- log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
- vol, hash)
- }
- // Update the timestamp if the caller requested.
- // If we could not update the timestamp, continue looking on
- // other volumes.
- if update_timestamp {
- if vol.Touch(hash) != nil {
- continue
- }
- }
- return buf, nil
+ continue
+ }
+ // Check the file checksum.
+ //
+ filehash := fmt.Sprintf("%x", md5.Sum(buf))
+ if filehash != hash {
+ // TODO: Try harder to tell a sysadmin about
+ // this.
+ log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
+ vol, hash, filehash)
+ error_to_caller = DiskHashError
+ continue
+ }
+ if error_to_caller == DiskHashError {
+ log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
+ vol, hash)
+ }
+ if update_timestamp {
+ if err := vol.Touch(hash); err != nil {
+ error_to_caller = GenericError
+ log.Printf("%s: Touch %s failed: %s",
+ vol, hash, error_to_caller)
+ continue
}
}
- }
-
- if error_to_caller != NotFoundError {
- log.Printf("%s: checksum mismatch, no good copy found\n", hash)
+ return buf, nil
}
return nil, error_to_caller
}
// Choose a Keep volume to write to.
// If this volume fails, try all of the volumes in order.
- vol := KeepVM.Choose()
- if err := vol.Put(hash, block); err == nil {
- return nil // success!
- } else {
- allFull := true
- for _, vol := range KeepVM.Volumes() {
- err := vol.Put(hash, block)
- if err == nil {
- return nil // success!
- }
- if err != FullError {
- // The volume is not full but the write did not succeed.
- // Report the error and continue trying.
- allFull = false
- log.Printf("%s: Write(%s): %s\n", vol, hash, err)
- }
+ if vol := KeepVM.NextWritable(); vol != nil {
+ if err := vol.Put(hash, block); err == nil {
+ return nil // success!
}
+ }
- if allFull {
- log.Printf("all Keep volumes full")
- return FullError
- } else {
- log.Printf("all Keep volumes failed")
- return GenericError
+ writables := KeepVM.AllWritable()
+ if len(writables) == 0 {
+ log.Print("No writable volumes.")
+ return FullError
+ }
+
+ allFull := true
+ for _, vol := range writables {
+ err := vol.Put(hash, block)
+ if err == nil {
+ return nil // success!
}
+ if err != FullError {
+ // The volume is not full but the
+ // write did not succeed. Report the
+ // error and continue trying.
+ allFull = false
+ log.Printf("%s: Write(%s): %s\n", vol, hash, err)
+ }
+ }
+
+ if allFull {
+ log.Print("All volumes are full.")
+ return FullError
+ } else {
+ // Already logged the non-full errors.
+ return GenericError
}
}
package main
import (
+ "bufio"
"bytes"
+ "errors"
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
var pullq *WorkQueue
var trashq *WorkQueue
+var (
+ flagSerializeIO bool
+ flagReadonly bool
+)
+// volumeSet accumulates the Volumes named by -volume/-volumes
+// command line arguments; it implements the flag.Value interface.
+type volumeSet []Volume
+
+// Set adds a UnixVolume rooted at the given directory, which must be
+// an absolute path that exists. The current values of the -serialize
+// and -readonly flags are baked into the new volume, so the order of
+// flags on the command line matters. A comma-separated list is
+// accepted for backward compatibility, but deprecated.
+func (vs *volumeSet) Set(value string) error {
+ if dirs := strings.Split(value, ","); len(dirs) > 1 {
+ log.Print("DEPRECATED: using comma-separated volume list.")
+ for _, dir := range dirs {
+ if err := vs.Set(dir); err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+ if len(value) == 0 || value[0] != '/' {
+ return errors.New("Invalid volume: must begin with '/'.")
+ }
+ if _, err := os.Stat(value); err != nil {
+ return err
+ }
+ *vs = append(*vs, MakeUnixVolume(value, flagSerializeIO, flagReadonly))
+ return nil
+}
+
+// String renders the volume list for flag usage/error messages.
+func (vs *volumeSet) String() string {
+ s := "["
+ for i, v := range *vs {
+ if i > 0 {
+ s = s + " "
+ }
+ s = s + v.String()
+ }
+ return s + "]"
+}
+
+// Discover adds a volume for every directory named "keep" that is
+// located at the top level of a device- or tmpfs-backed mount point
+// other than "/". It returns the number of volumes added.
+func (vs *volumeSet) Discover() int {
+ added := 0
+ f, err := os.Open(PROC_MOUNTS)
+ if err != nil {
+ log.Fatalf("opening %s: %s", PROC_MOUNTS, err)
+ }
+ // Release the mounts file handle when scanning finishes.
+ defer f.Close()
+ scanner := bufio.NewScanner(f)
+ for scanner.Scan() {
+ args := strings.Fields(scanner.Text())
+ if err := scanner.Err(); err != nil {
+ log.Fatalf("reading %s: %s", PROC_MOUNTS, err)
+ }
+ // Skip blank or malformed mount lines instead of panicking
+ // on the args[0], args[1], and args[3] accesses below.
+ if len(args) < 4 {
+ continue
+ }
+ dev, mount := args[0], args[1]
+ if mount == "/" {
+ continue
+ }
+ if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
+ continue
+ }
+ keepdir := mount + "/keep"
+ if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
+ continue
+ }
+ // Set the -readonly flag (but only for this volume)
+ // if the filesystem is mounted readonly.
+ flagReadonlyWas := flagReadonly
+ for _, fsopt := range strings.Split(args[3], ",") {
+ if fsopt == "ro" {
+ flagReadonly = true
+ break
+ }
+ if fsopt == "rw" {
+ break
+ }
+ }
+ vs.Set(keepdir)
+ flagReadonly = flagReadonlyWas
+ added++
+ }
+ return added
+}
+
// TODO(twp): continue moving as much code as possible out of main
// so it can be effectively tested. Esp. handling and postprocessing
// of command line flags (identifying Keep volumes and initializing
func main() {
log.Println("Keep started: pid", os.Getpid())
- // Parse command-line flags:
- //
- // -listen=ipaddr:port
- // Interface on which to listen for requests. Use :port without
- // an ipaddr to listen on all network interfaces.
- // Examples:
- // -listen=127.0.0.1:4949
- // -listen=10.0.1.24:8000
- // -listen=:25107 (to listen to port 25107 on all interfaces)
- //
- // -volumes
- // A comma-separated list of directories to use as Keep volumes.
- // Example:
- // -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
- //
- // If -volumes is empty or is not present, Keep will select volumes
- // by looking at currently mounted filesystems for /keep top-level
- // directories.
-
var (
data_manager_token_file string
listen string
permission_key_file string
permission_ttl_sec int
- serialize_io bool
- volumearg string
+ volumes volumeSet
pidfile string
)
flag.StringVar(
&listen,
"listen",
DEFAULT_ADDR,
- "Interface on which to listen for requests, in the format "+
- "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
- "to listen on all network interfaces.")
+ "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
flag.BoolVar(
&never_delete,
"never-delete",
"Expiration time (in seconds) for newly generated permission "+
"signatures.")
flag.BoolVar(
- &serialize_io,
+ &flagSerializeIO,
"serialize",
false,
- "If set, all read and write operations on local Keep volumes will "+
- "be serialized.")
- flag.StringVar(
- &volumearg,
+ "Serialize read and write operations on the following volumes.")
+ flag.BoolVar(
+ &flagReadonly,
+ "readonly",
+ false,
+ "Do not write, delete, or touch anything on the following volumes.")
+ flag.Var(
+ &volumes,
"volumes",
- "",
- "Comma-separated list of directories to use for Keep volumes, "+
- "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
- "supplied, Keep will scan mounted filesystems for volumes "+
- "with a /keep top-level directory.")
-
+ "Deprecated synonym for -volume.")
+ flag.Var(
+ &volumes,
+ "volume",
+ "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
flag.StringVar(
&pidfile,
"pid",
flag.Parse()
- // Look for local keep volumes.
- var keepvols []string
- if volumearg == "" {
- // TODO(twp): decide whether this is desirable default behavior.
- // In production we may want to require the admin to specify
- // Keep volumes explicitly.
- keepvols = FindKeepVolumes()
- } else {
- keepvols = strings.Split(volumearg, ",")
- }
-
- // Check that the specified volumes actually exist.
- var goodvols []Volume = nil
- for _, v := range keepvols {
- if _, err := os.Stat(v); err == nil {
- log.Println("adding Keep volume:", v)
- newvol := MakeUnixVolume(v, serialize_io)
- goodvols = append(goodvols, &newvol)
- } else {
- log.Printf("bad Keep volume: %s\n", err)
+ if len(volumes) == 0 {
+ if volumes.Discover() == 0 {
+ log.Fatal("No volumes found.")
}
}
- if len(goodvols) == 0 {
- log.Fatal("could not find any keep volumes")
+ for _, v := range volumes {
+ log.Printf("Using volume %v (writable=%v)", v, v.Writable())
}
// Initialize data manager token and permission key.
}
// Start a round-robin VolumeManager with the volumes we have found.
- KeepVM = MakeRRVolumeManager(goodvols)
+ KeepVM = MakeRRVolumeManager(volumes)
// Tell the built-in HTTP server to direct all requests to the REST router.
loggingRouter := MakeLoggingRESTRouter()
}
// Initialize Pull queue and worker
- keepClient := keepclient.KeepClient{
+ keepClient := &keepclient.KeepClient{
Arvados: nil,
Want_replicas: 1,
Using_proxy: true,
// Prepare two test Keep volumes. Our block is stored on the second volume.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
+ defer KeepVM.Close()
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllReadable()
if err := vols[1].Put(TEST_HASH, TEST_BLOCK); err != nil {
t.Error(err)
}
// Create two empty test Keep volumes.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
+ defer KeepVM.Close()
// Check that GetBlock returns failure.
result, err := GetBlock(TEST_HASH, false)
// Create two test Keep volumes and store a corrupt block in one.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
+ defer KeepVM.Close()
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllReadable()
vols[0].Put(TEST_HASH, BAD_BLOCK)
// Check that GetBlock returns failure.
// Create two test Keep volumes.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
+ defer KeepVM.Close()
// Check that PutBlock stores the data as expected.
if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
t.Fatalf("PutBlock: %v", err)
}
- vols := KeepVM.Volumes()
- result, err := vols[0].Get(TEST_HASH)
+ vols := KeepVM.AllReadable()
+ result, err := vols[1].Get(TEST_HASH)
if err != nil {
t.Fatalf("Volume #0 Get returned error: %v", err)
}
// Create two test Keep volumes, but cripple one of them.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
+ defer KeepVM.Close()
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllWritable()
vols[0].(*MockVolume).Bad = true
// Check that PutBlock stores the data as expected.
// Create two test Keep volumes.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
+ defer KeepVM.Close()
// Check that PutBlock returns the expected error when the hash does
// not match the block.
// Create two test Keep volumes.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
+ defer KeepVM.Close()
// Store a corrupted block under TEST_HASH.
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllWritable()
vols[0].Put(TEST_HASH, BAD_BLOCK)
if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
t.Errorf("PutBlock: %v", err)
// Prepare two test Keep volumes.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
+ defer KeepVM.Close()
// Store one block, then attempt to store the other. Confirm that
// PutBlock reported a CollisionError.
// Prepare two test Keep volumes.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
- vols := KeepVM.Volumes()
+ defer KeepVM.Close()
+ vols := KeepVM.AllWritable()
// Store a block and then make the underlying volume bad,
// so a subsequent attempt to update the file timestamp
}
}
-// ========================================
-// FindKeepVolumes tests.
-// ========================================
-
-// TestFindKeepVolumes
-// Confirms that FindKeepVolumes finds tmpfs volumes with "/keep"
-// directories at the top level.
-//
-func TestFindKeepVolumes(t *testing.T) {
- var tempVols [2]string
+// TestDiscoverTmpfs confirms Discover finds tmpfs-backed "keep"
+// directories and honors per-mount ro/rw options.
+func TestDiscoverTmpfs(t *testing.T) {
+ var tempVols [4]string
var err error
- defer func() {
- for _, path := range tempVols {
- os.RemoveAll(path)
- }
- }()
-
- // Create two directories suitable for using as keep volumes.
+ // Create some directories suitable for using as keep volumes.
for i := range tempVols {
if tempVols[i], err = ioutil.TempDir("", "findvol"); err != nil {
t.Fatal(err)
}
+ defer os.RemoveAll(tempVols[i])
tempVols[i] = tempVols[i] + "/keep"
if err = os.Mkdir(tempVols[i], 0755); err != nil {
t.Fatal(err)
}
// Set up a bogus PROC_MOUNTS file.
- if f, err := ioutil.TempFile("", "keeptest"); err == nil {
- for _, vol := range tempVols {
- fmt.Fprintf(f, "tmpfs %s tmpfs opts\n", path.Dir(vol))
+ f, err := ioutil.TempFile("", "keeptest")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.Remove(f.Name())
+ for i, vol := range tempVols {
+ // Add readonly mount points at odd indexes.
+ var opts string
+ switch i % 2 {
+ case 0:
+ opts = "rw,nosuid,nodev,noexec"
+ case 1:
+ opts = "nosuid,nodev,noexec,ro"
}
- f.Close()
- PROC_MOUNTS = f.Name()
-
- // Check that FindKeepVolumes finds the temp volumes.
- resultVols := FindKeepVolumes()
- if len(tempVols) != len(resultVols) {
- t.Fatalf("set up %d volumes, FindKeepVolumes found %d\n",
- len(tempVols), len(resultVols))
+ fmt.Fprintf(f, "tmpfs %s tmpfs %s 0 0\n", path.Dir(vol), opts)
+ }
+ f.Close()
+ PROC_MOUNTS = f.Name()
+
+ // Each mount written above should yield exactly one volume,
+ // reported in file order.
+ var resultVols volumeSet
+ added := resultVols.Discover()
+
+ if added != len(resultVols) {
+ t.Errorf("Discover returned %d, but added %d volumes",
+ added, len(resultVols))
+ }
+ if added != len(tempVols) {
+ t.Errorf("Discover returned %d but we set up %d volumes",
+ added, len(tempVols))
+ }
+ for i, tmpdir := range tempVols {
+ if tmpdir != resultVols[i].(*UnixVolume).root {
+ t.Errorf("Discover returned %s, expected %s\n",
+ resultVols[i].(*UnixVolume).root, tmpdir)
}
- for i := range tempVols {
- if tempVols[i] != resultVols[i] {
- t.Errorf("FindKeepVolumes returned %s, expected %s\n",
- resultVols[i], tempVols[i])
- }
+ if expectReadonly := i % 2 == 1; expectReadonly != resultVols[i].(*UnixVolume).readonly {
+ t.Errorf("Discover added %s with readonly=%v, should be %v",
+ tmpdir, !expectReadonly, expectReadonly)
}
-
- os.Remove(f.Name())
}
}
-// TestFindKeepVolumesFail
-// When no Keep volumes are present, FindKeepVolumes returns an empty slice.
-//
+// TestDiscoverNone confirms Discover adds no volumes when no mounted
+// filesystem has a top-level "keep" directory.
+func TestDiscoverNone(t *testing.T) {
defer teardown()
// Set up a bogus PROC_MOUNTS file with no Keep vols.
- if f, err := ioutil.TempFile("", "keeptest"); err == nil {
- fmt.Fprintln(f, "rootfs / rootfs opts 0 0")
- fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0")
- fmt.Fprintln(f, "proc /proc proc opts 0 0")
- fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
- fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
- f.Close()
- PROC_MOUNTS = f.Name()
-
- // Check that FindKeepVolumes returns an empty array.
- resultVols := FindKeepVolumes()
- if len(resultVols) != 0 {
- t.Fatalf("FindKeepVolumes returned %v", resultVols)
- }
-
- os.Remove(PROC_MOUNTS)
+ f, err := ioutil.TempFile("", "keeptest")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.Remove(f.Name())
+ fmt.Fprintln(f, "rootfs / rootfs opts 0 0")
+ fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0")
+ fmt.Fprintln(f, "proc /proc proc opts 0 0")
+ fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
+ fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
+ f.Close()
+ PROC_MOUNTS = f.Name()
+
+ // Discover should report zero additions and leave the set empty.
+ var resultVols volumeSet
+ added := resultVols.Discover()
+ if added != 0 || len(resultVols) != 0 {
+ t.Fatalf("got %d, %v; expected 0, []", added, resultVols)
}
}
// Include multiple blocks on different volumes, and
// some metadata files.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
+ defer KeepVM.Close()
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllReadable()
vols[0].Put(TEST_HASH, TEST_BLOCK)
vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
vols[0].Put(TEST_HASH_3, TEST_BLOCK_3)
// Set up test Keep volumes with some blocks.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
+ defer KeepVM.Close()
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllReadable()
vols[0].Put(TEST_HASH, TEST_BLOCK)
vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
// Helper functions for unit tests.
// ========================================
-// MakeTestVolumeManager
-// Creates and returns a RRVolumeManager with the specified number
-// of MockVolumes.
-//
+// MakeTestVolumeManager returns a RRVolumeManager with the specified
+// number of MockVolumes.
func MakeTestVolumeManager(num_volumes int) VolumeManager {
vols := make([]Volume, num_volumes)
for i := range vols {
return MakeRRVolumeManager(vols)
}
-// teardown
-// Cleanup to perform after each test.
-//
+// teardown cleans up after each test.
func teardown() {
data_manager_token = ""
enforce_permissions = false
Skip the rest of the servers if no errors
Repeat
*/
-func RunPullWorker(pullq *WorkQueue, keepClient keepclient.KeepClient) {
+func RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient) {
nextItem := pullq.NextItem
for item := range nextItem {
pullRequest := item.(PullRequest)
Using this token & signature, retrieve the given block.
Write to storage
*/
-func PullItemAndProcess(pullRequest PullRequest, token string, keepClient keepclient.KeepClient) (err error) {
+func PullItemAndProcess(pullRequest PullRequest, token string, keepClient *keepclient.KeepClient) (err error) {
keepClient.Arvados.ApiToken = token
service_roots := make(map[string]string)
for _, addr := range pullRequest.Servers {
service_roots[addr] = addr
}
- keepClient.SetServiceRoots(service_roots)
+ keepClient.SetServiceRoots(service_roots, nil)
// Generate signature with a random token
expires_at := time.Now().Add(60 * time.Second)
}
// Fetch the content for the given locator using keepclient.
-var GetContent = func(signedLocator string, keepClient keepclient.KeepClient) (
+var GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
reader io.ReadCloser, contentLength int64, url string, err error) {
reader, blocklen, url, err := keepClient.Get(signedLocator)
return reader, blocklen, url, err
"testing"
)
-var keepClient keepclient.KeepClient
+var keepClient *keepclient.KeepClient
type PullWorkIntegrationTestData struct {
Name string
}
// keep client
- keepClient = keepclient.KeepClient{
+ keepClient = &keepclient.KeepClient{
Arvados: &arv,
Want_replicas: 1,
Using_proxy: true,
// discover keep services
var servers []string
- service_roots, err := keepClient.DiscoverKeepServers()
- if err != nil {
+ if err := keepClient.DiscoverKeepServers(); err != nil {
t.Error("Error discovering keep services")
}
- for _, host := range service_roots {
+ for _, host := range keepClient.LocalRoots() {
servers = append(servers, host)
}
// Put content if the test needs it
if wantData {
- keepClient.SetServiceRoots(service_roots)
locator, _, err := keepClient.PutB([]byte(testData.Content))
if err != nil {
t.Errorf("Error putting test data in setup for %s %s %v", testData.Content, locator, err)
testPullLists[testData.name] = testData.response_body
// Override GetContent to mock keepclient Get functionality
- GetContent = func(signedLocator string, keepClient keepclient.KeepClient) (
+ GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
reader io.ReadCloser, contentLength int64, url string, err error) {
processedPullLists[testData.name] = testData.response_body
*/
+// TrashItem deletes the requested block from every writable volume,
+// but only if the on-disk Mtime still matches the request and the
+// permission TTL has already expired.
func TrashItem(trashRequest TrashRequest) (err error) {
// Verify if the block is to be deleted based on its Mtime
- for _, volume := range KeepVM.Volumes() {
+ for _, volume := range KeepVM.AllWritable() {
mtime, err := volume.Mtime(trashRequest.Locator)
+ // NOTE(review): the ":=" above declares a loop-local err that
+ // shadows the named return value, so Delete failures below are
+ // never propagated to the caller (true before and after this
+ // change) -- confirm whether that is intended.
- if err == nil {
- if trashRequest.BlockMtime == mtime.Unix() {
- currentTime := time.Now().Unix()
- if time.Duration(currentTime-trashRequest.BlockMtime)*time.Second >= permission_ttl {
- err = volume.Delete(trashRequest.Locator)
- }
- }
+ if err != nil || trashRequest.BlockMtime != mtime.Unix() {
+ continue
+ }
+ currentTime := time.Now().Unix()
+ if time.Duration(currentTime-trashRequest.BlockMtime)*time.Second >= permission_ttl {
+ err = volume.Delete(trashRequest.Locator)
+ }
}
}
return
// Create Keep Volumes
KeepVM = MakeTestVolumeManager(2)
+ defer KeepVM.Close()
// Put test content
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllWritable()
if testData.CreateData {
vols[0].Put(testData.Locator1, testData.Block1)
vols[0].Put(testData.Locator1+".meta", []byte("metadata"))
if (testData.ExpectLocator1) &&
(testData.Locator1 == testData.Locator2) {
locatorFoundIn := 0
- for _, volume := range KeepVM.Volumes() {
+ for _, volume := range KeepVM.AllReadable() {
if _, err := volume.Get(testData.Locator1); err == nil {
locatorFoundIn = locatorFoundIn + 1
}
// Done
permission_ttl = actual_permission_ttl
trashq.Close()
- KeepVM.Quit()
}
package main
import (
- "errors"
- "fmt"
- "os"
- "strings"
+ "sync/atomic"
"time"
)
Delete(loc string) error
Status() *VolumeStatus
String() string
+ Writable() bool
}
-// MockVolumes are Volumes used to test the Keep front end.
-//
-// If the Bad field is true, this volume should return an error
-// on all writes and puts.
-//
-// The Touchable field signifies whether the Touch method will
-// succeed. Defaults to true. Note that Bad and Touchable are
-// independent: a MockVolume may be set up so that Put fails but Touch
-// works or vice versa.
-//
-// TODO(twp): rename Bad to something more descriptive, e.g. Writable,
-// and make sure that the tests that rely on it are testing the right
-// thing. We may need to simulate Writable, Touchable and Corrupt
-// volumes in different ways.
-//
-type MockVolume struct {
- Store map[string][]byte
- Timestamps map[string]time.Time
- Bad bool
- Touchable bool
-}
-
-func CreateMockVolume() *MockVolume {
- return &MockVolume{
- Store: make(map[string][]byte),
- Timestamps: make(map[string]time.Time),
- Bad: false,
- Touchable: true,
- }
-}
-
-func (v *MockVolume) Get(loc string) ([]byte, error) {
- if v.Bad {
- return nil, errors.New("Bad volume")
- } else if block, ok := v.Store[loc]; ok {
- return block, nil
- }
- return nil, os.ErrNotExist
+// A VolumeManager tells callers which volumes can read, which volumes
+// can write, and on which volume the next write should be attempted.
+type VolumeManager interface {
+ // AllReadable returns all volumes.
+ AllReadable() []Volume
+ // AllWritable returns all volumes that aren't known to be in
+ // a read-only state. (There is no guarantee that a write to
+ // one will succeed, though.)
+ AllWritable() []Volume
+ // NextWritable returns the volume where the next new block
+ // should be written. A VolumeManager can select a volume in
+ // order to distribute activity across spindles, fill up disks
+ // with more free space, etc.
+ NextWritable() Volume
+ // Close shuts down the volume manager cleanly.
+ Close()
}
-func (v *MockVolume) Put(loc string, block []byte) error {
- if v.Bad {
- return errors.New("Bad volume")
- }
- v.Store[loc] = block
- return v.Touch(loc)
+type RRVolumeManager struct {
+ readables []Volume
+ writables []Volume
+ counter uint32
}
-func (v *MockVolume) Touch(loc string) error {
- if v.Touchable {
- v.Timestamps[loc] = time.Now()
- return nil
+func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager {
+ vm := &RRVolumeManager{}
+ for _, v := range volumes {
+ vm.readables = append(vm.readables, v)
+ if v.Writable() {
+ vm.writables = append(vm.writables, v)
+ }
}
- return errors.New("Touch failed")
+ return vm
}
-func (v *MockVolume) Mtime(loc string) (time.Time, error) {
- var mtime time.Time
- var err error
- if v.Bad {
- err = errors.New("Bad volume")
- } else if t, ok := v.Timestamps[loc]; ok {
- mtime = t
- } else {
- err = os.ErrNotExist
- }
- return mtime, err
+func (vm *RRVolumeManager) AllReadable() []Volume {
+ return vm.readables
}
-func (v *MockVolume) Index(prefix string) string {
- var result string
- for loc, block := range v.Store {
- if IsValidLocator(loc) && strings.HasPrefix(loc, prefix) {
- result = result + fmt.Sprintf("%s+%d %d\n",
- loc, len(block), 123456789)
- }
- }
- return result
+func (vm *RRVolumeManager) AllWritable() []Volume {
+ return vm.writables
}
-func (v *MockVolume) Delete(loc string) error {
- if _, ok := v.Store[loc]; ok {
- if time.Since(v.Timestamps[loc]) < permission_ttl {
- return nil
- }
- delete(v.Store, loc)
+func (vm *RRVolumeManager) NextWritable() Volume {
+ if len(vm.writables) == 0 {
return nil
}
- return os.ErrNotExist
-}
-
-func (v *MockVolume) Status() *VolumeStatus {
- var used uint64
- for _, block := range v.Store {
- used = used + uint64(len(block))
- }
- return &VolumeStatus{"/bogo", 123, 1000000 - used, used}
-}
-
-func (v *MockVolume) String() string {
- return "[MockVolume]"
-}
-
-// A VolumeManager manages a collection of volumes.
-//
-// - Volumes is a slice of available Volumes.
-// - Choose() returns a Volume suitable for writing to.
-// - Quit() instructs the VolumeManager to shut down gracefully.
-//
-type VolumeManager interface {
- Volumes() []Volume
- Choose() Volume
- Quit()
-}
-
-type RRVolumeManager struct {
- volumes []Volume
- nextwrite chan Volume
- quit chan int
-}
-
-func MakeRRVolumeManager(vols []Volume) *RRVolumeManager {
- // Create a new VolumeManager struct with the specified volumes,
- // and with new Nextwrite and Quit channels.
- // The Quit channel is buffered with a capacity of 1 so that
- // another routine may write to it without blocking.
- vm := &RRVolumeManager{vols, make(chan Volume), make(chan int, 1)}
-
- // This goroutine implements round-robin volume selection.
- // It sends each available Volume in turn to the Nextwrite
- // channel, until receiving a notification on the Quit channel
- // that it should terminate.
- go func() {
- var i int = 0
- for {
- select {
- case <-vm.quit:
- return
- case vm.nextwrite <- vm.volumes[i]:
- i = (i + 1) % len(vm.volumes)
- }
- }
- }()
-
- return vm
-}
-
-func (vm *RRVolumeManager) Volumes() []Volume {
- return vm.volumes
-}
-
-func (vm *RRVolumeManager) Choose() Volume {
- return <-vm.nextwrite
+ i := atomic.AddUint32(&vm.counter, 1)
+ return vm.writables[i % uint32(len(vm.writables))]
}
-func (vm *RRVolumeManager) Quit() {
- vm.quit <- 1
+func (vm *RRVolumeManager) Close() {
}
--- /dev/null
+package main
+
+import (
+ "errors"
+ "fmt"
+ "os"
+ "strings"
+ "sync"
+ "time"
+)
+
+// MockVolumes are test doubles for Volumes, used to test handlers.
+type MockVolume struct {
+ Store map[string][]byte
+ Timestamps map[string]time.Time
+ // Bad volumes return an error for every operation.
+ Bad bool
+ // Touchable volumes' Touch() method succeeds for a locator
+ // that has been Put().
+ Touchable bool
+ // Readonly volumes return an error for Put, Delete, and
+ // Touch.
+ Readonly bool
+ called map[string]int
+ mutex sync.Mutex
+}
+
+// CreateMockVolume returns a non-Bad, non-Readonly, Touchable mock
+// volume.
+func CreateMockVolume() *MockVolume {
+ return &MockVolume{
+ Store: make(map[string][]byte),
+ Timestamps: make(map[string]time.Time),
+ Bad: false,
+ Touchable: true,
+ Readonly: false,
+ called: map[string]int{},
+ }
+}
+
+// CallCount returns how many times the named method has been called.
+func (v *MockVolume) CallCount(method string) int {
+ v.mutex.Lock()
+ defer v.mutex.Unlock()
+ // A lookup of a method that has never been called yields the map's
+ // zero value, 0, which is exactly the right answer -- no existence
+ // check needed.
+ return v.called[method]
+}
+
+// gotCall records one invocation of the named method for CallCount.
+func (v *MockVolume) gotCall(method string) {
+ v.mutex.Lock()
+ defer v.mutex.Unlock()
+ // Incrementing a missing key starts from the zero value, so no
+ // first-call special case is needed.
+ v.called[method]++
+}
+
+func (v *MockVolume) Get(loc string) ([]byte, error) {
+ v.gotCall("Get")
+ if v.Bad {
+ return nil, errors.New("Bad volume")
+ } else if block, ok := v.Store[loc]; ok {
+ return block, nil
+ }
+ return nil, os.ErrNotExist
+}
+
+func (v *MockVolume) Put(loc string, block []byte) error {
+ v.gotCall("Put")
+ if v.Bad {
+ return errors.New("Bad volume")
+ }
+ if v.Readonly {
+ return MethodDisabledError
+ }
+ v.Store[loc] = block
+ return v.Touch(loc)
+}
+
+func (v *MockVolume) Touch(loc string) error {
+ v.gotCall("Touch")
+ if v.Readonly {
+ return MethodDisabledError
+ }
+ if v.Touchable {
+ v.Timestamps[loc] = time.Now()
+ return nil
+ }
+ return errors.New("Touch failed")
+}
+
+func (v *MockVolume) Mtime(loc string) (time.Time, error) {
+ v.gotCall("Mtime")
+ var mtime time.Time
+ var err error
+ if v.Bad {
+ err = errors.New("Bad volume")
+ } else if t, ok := v.Timestamps[loc]; ok {
+ mtime = t
+ } else {
+ err = os.ErrNotExist
+ }
+ return mtime, err
+}
+
+func (v *MockVolume) Index(prefix string) string {
+ v.gotCall("Index")
+ var result string
+ for loc, block := range v.Store {
+ if IsValidLocator(loc) && strings.HasPrefix(loc, prefix) {
+ result = result + fmt.Sprintf("%s+%d %d\n",
+ loc, len(block), 123456789)
+ }
+ }
+ return result
+}
+
+func (v *MockVolume) Delete(loc string) error {
+ v.gotCall("Delete")
+ if v.Readonly {
+ return MethodDisabledError
+ }
+ if _, ok := v.Store[loc]; ok {
+ // A block written within permission_ttl is considered too new
+ // to delete: report success without actually removing it.
+ if time.Since(v.Timestamps[loc]) < permission_ttl {
+ return nil
+ }
+ delete(v.Store, loc)
+ return nil
+ }
+ return os.ErrNotExist
+}
+
+func (v *MockVolume) Status() *VolumeStatus {
+ var used uint64
+ for _, block := range v.Store {
+ used = used + uint64(len(block))
+ }
+ return &VolumeStatus{"/bogo", 123, 1000000 - used, used}
+}
+
+func (v *MockVolume) String() string {
+ return "[MockVolume]"
+}
+
+func (v *MockVolume) Writable() bool {
+ return !v.Readonly
+}
// request.
//
type UnixVolume struct {
- root string // path to this volume
- queue chan *IORequest
+ root string // path to this volume
+ queue chan *IORequest
+ readonly bool
}
func (v *UnixVolume) IOHandler() {
}
}
-func MakeUnixVolume(root string, serialize bool) (v UnixVolume) {
+// MakeUnixVolume returns a new UnixVolume rooted at root. If serialize
+// is true, I/O requests are funneled through a single goroutine; if
+// readonly is true, all mutating methods return MethodDisabledError.
+func MakeUnixVolume(root string, serialize bool, readonly bool) *UnixVolume {
+ v := &UnixVolume{
+ root: root,
+ queue: nil,
+ readonly: readonly,
+ }
if serialize {
- v = UnixVolume{root, make(chan *IORequest)}
+ v.queue = make(chan *IORequest)
go v.IOHandler()
- } else {
- v = UnixVolume{root, nil}
}
- return
+ return v
}
func (v *UnixVolume) Get(loc string) ([]byte, error) {
}
func (v *UnixVolume) Put(loc string, block []byte) error {
+ if v.readonly {
+ return MethodDisabledError
+ }
if v.queue == nil {
return v.Write(loc, block)
}
}
func (v *UnixVolume) Touch(loc string) error {
+ if v.readonly {
+ return MethodDisabledError
+ }
p := v.blockPath(loc)
f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
if err != nil {
// Delete() will read the correct up-to-date timestamp and choose not to
// delete the file.
+ if v.readonly {
+ return MethodDisabledError
+ }
p := v.blockPath(loc)
f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
if err != nil {
return fmt.Sprintf("[UnixVolume %s]", v.root)
}
+func (v *UnixVolume) Writable() bool {
+ return !v.readonly
+}
+
// lockfile and unlockfile use flock(2) to manage kernel file locks.
func lockfile(f *os.File) error {
return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
"time"
)
-func TempUnixVolume(t *testing.T, serialize bool) UnixVolume {
+func TempUnixVolume(t *testing.T, serialize bool, readonly bool) *UnixVolume {
d, err := ioutil.TempDir("", "volume_test")
if err != nil {
t.Fatal(err)
}
- return MakeUnixVolume(d, serialize)
+ return MakeUnixVolume(d, serialize, readonly)
}
-func _teardown(v UnixVolume) {
+func _teardown(v *UnixVolume) {
if v.queue != nil {
close(v.queue)
}
// store writes a Keep block directly into a UnixVolume, for testing
// UnixVolume methods.
//
-func _store(t *testing.T, vol UnixVolume, filename string, block []byte) {
+func _store(t *testing.T, vol *UnixVolume, filename string, block []byte) {
blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3])
if err := os.MkdirAll(blockdir, 0755); err != nil {
t.Fatal(err)
}
func TestGet(t *testing.T) {
- v := TempUnixVolume(t, false)
+ v := TempUnixVolume(t, false, false)
defer _teardown(v)
_store(t, v, TEST_HASH, TEST_BLOCK)
}
func TestGetNotFound(t *testing.T) {
- v := TempUnixVolume(t, false)
+ v := TempUnixVolume(t, false, false)
defer _teardown(v)
_store(t, v, TEST_HASH, TEST_BLOCK)
}
func TestPut(t *testing.T) {
- v := TempUnixVolume(t, false)
+ v := TempUnixVolume(t, false, false)
defer _teardown(v)
err := v.Put(TEST_HASH, TEST_BLOCK)
}
func TestPutBadVolume(t *testing.T) {
- v := TempUnixVolume(t, false)
+ v := TempUnixVolume(t, false, false)
defer _teardown(v)
os.Chmod(v.root, 000)
}
}
+func TestUnixVolumeReadonly(t *testing.T) {
+ v := TempUnixVolume(t, false, false)
+ defer _teardown(v)
+
+ // First write something before marking readonly
+ err := v.Put(TEST_HASH, TEST_BLOCK)
+ if err != nil {
+ t.Errorf("got err %v, expected nil", err)
+ }
+
+ v.readonly = true
+
+ // Get should still work on a readonly volume.
+ _, err = v.Get(TEST_HASH)
+ if err != nil {
+ t.Errorf("got err %v, expected nil", err)
+ }
+
+ // All mutating methods should be disabled.
+ err = v.Put(TEST_HASH, TEST_BLOCK)
+ if err != MethodDisabledError {
+ t.Errorf("got err %v, expected MethodDisabledError", err)
+ }
+
+ err = v.Touch(TEST_HASH)
+ if err != MethodDisabledError {
+ t.Errorf("got err %v, expected MethodDisabledError", err)
+ }
+
+ err = v.Delete(TEST_HASH)
+ if err != MethodDisabledError {
+ t.Errorf("got err %v, expected MethodDisabledError", err)
+ }
+}
+
// TestPutTouch
// Test that when applying PUT to a block that already exists,
// the block's modification time is updated.
func TestPutTouch(t *testing.T) {
- v := TempUnixVolume(t, false)
+ v := TempUnixVolume(t, false, false)
defer _teardown(v)
if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
//
func TestGetSerialized(t *testing.T) {
// Create a volume with I/O serialization enabled.
- v := TempUnixVolume(t, true)
+ v := TempUnixVolume(t, true, false)
defer _teardown(v)
_store(t, v, TEST_HASH, TEST_BLOCK)
func TestPutSerialized(t *testing.T) {
// Create a volume with I/O serialization enabled.
- v := TempUnixVolume(t, true)
+ v := TempUnixVolume(t, true, false)
defer _teardown(v)
sem := make(chan int)
}
func TestIsFull(t *testing.T) {
- v := TempUnixVolume(t, false)
+ v := TempUnixVolume(t, false, false)
defer _teardown(v)
full_path := v.root + "/full"
else:
self._later.prepare_arvados_node(arvados_node)
- @ComputeNodeStateChangeBase._retry()
+ @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
def create_arvados_node(self):
self.arvados_node = self._arvados.nodes().create(body={}).execute()
self._later.create_cloud_node()
- @ComputeNodeStateChangeBase._retry()
+ @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
def prepare_arvados_node(self, node):
self.arvados_node = self._arvados.nodes().update(
uuid=node['uuid'],
self._finished()
def stop_if_no_cloud_node(self):
+ """Stop this actor if its cloud node has not been created yet.
+
+ Returns True if the actor is stopping, False otherwise, so
+ callers can tell whether the shutdown took effect.
+ """
- if self.cloud_node is None:
- self.stop()
+ if self.cloud_node is not None:
+ return False
+ self.stop()
+ return True
class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
if (nodes_excess < 1) or not self.booting:
return None
for key, node in self.booting.iteritems():
- node.stop_if_no_cloud_node().get()
- if not node.actor_ref.is_alive():
+ if node.stop_if_no_cloud_node().get():
del self.booting[key]
if nodes_excess > 1:
self._later.stop_booting_node()
def shutdown(self):
self._logger.info("Shutting down after signal.")
self.poll_stale_after = -1 # Inhibit starting/stopping nodes
- for bootnode in self.booting.itervalues():
- bootnode.stop_if_no_cloud_node()
+ setup_stops = {key: node.stop_if_no_cloud_node()
+ for key, node in self.booting.iteritems()}
+ self.booting = {key: self.booting[key]
+ for key in setup_stops if not setup_stops[key].get()}
self._later.await_shutdown()
def await_shutdown(self):
- if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
+ if self.booting:
self._timer.schedule(time.time() + 1, self._later.await_shutdown)
else:
self.stop()
self.make_mocks(
arverror.ApiError(httplib2.Response({'status': '500'}), ""))
self.make_actor()
- self.setup_actor.stop_if_no_cloud_node()
+ self.assertTrue(
+ self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT))
self.assertTrue(
self.setup_actor.actor_ref.actor_stopped.wait(self.TIMEOUT))
def test_no_stop_when_cloud_node(self):
self.make_actor()
self.wait_for_assignment(self.setup_actor, 'cloud_node')
- self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT)
+ self.assertFalse(
+ self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT))
self.assertTrue(self.stop_proxy(self.setup_actor),
"actor was stopped by stop_if_no_cloud_node")
self.stop_proxy(self.daemon)
self.assertTrue(self.last_setup.stop_if_no_cloud_node.called)
+ def test_all_booting_nodes_tried_to_shut_down(self):
+ size = testutil.MockSize(2)
+ self.make_daemon(want_sizes=[size])
+ self.daemon.max_nodes.get(self.TIMEOUT)
+ setup1 = self.last_setup
+ setup1.stop_if_no_cloud_node().get.return_value = False
+ setup1.stop_if_no_cloud_node.reset_mock()
+ self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
+ self.daemon.max_nodes.get(self.TIMEOUT)
+ self.assertIsNot(setup1, self.last_setup)
+ self.last_setup.stop_if_no_cloud_node().get.return_value = True
+ self.last_setup.stop_if_no_cloud_node.reset_mock()
+ self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
+ self.daemon.max_nodes.get(self.TIMEOUT)
+ self.stop_proxy(self.daemon)
+ self.assertEqual(1, self.last_setup.stop_if_no_cloud_node.call_count)
+ self.assertTrue(setup1.stop_if_no_cloud_node.called)
+
def test_shutdown_declined_at_wishlist_capacity(self):
cloud_node = testutil.cloud_node_mock(1)
size = testutil.MockSize(1)
def test_clean_shutdown_waits_for_node_setup_finish(self):
new_node = self.start_node_boot()
+ new_node.stop_if_no_cloud_node().get.return_value = False
+ new_node.stop_if_no_cloud_node.reset_mock()
self.daemon.shutdown().get(self.TIMEOUT)
self.assertTrue(new_node.stop_if_no_cloud_node.called)
self.daemon.node_up(new_node).get(self.TIMEOUT)
self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
def test_wishlist_ignored_after_shutdown(self):
- size = testutil.MockSize(2)
- self.make_daemon(want_sizes=[size])
+ new_node = self.start_node_boot()
+ new_node.stop_if_no_cloud_node().get.return_value = False
+ new_node.stop_if_no_cloud_node.reset_mock()
self.daemon.shutdown().get(self.TIMEOUT)
+ size = testutil.MockSize(2)
self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
self.timer.deliver()
self.stop_proxy(self.daemon)