source 'https://rubygems.org'
gem 'rails', '~> 4.1.0'
-gem 'arvados', '>= 0.1.20150413172135'
-
-gem 'sqlite3'
+gem 'arvados', '>= 0.1.20150511150219'
+gem 'activerecord-nulldb-adapter'
gem 'multi_json'
gem 'oj'
gem 'sass'
activemodel (= 4.1.9)
activesupport (= 4.1.9)
arel (~> 5.0.0)
+ activerecord-nulldb-adapter (0.3.1)
+ activerecord (>= 2.0.0)
activesupport (4.1.9)
i18n (~> 0.6, >= 0.6.9)
json (~> 1.7, >= 1.7.7)
andand (1.3.3)
angularjs-rails (1.3.8)
arel (5.0.1.20140414130214)
- arvados (0.1.20150413172135)
+ arvados (0.1.20150511150219)
activesupport (>= 3.2.13)
andand (~> 1.3, >= 1.3.3)
google-api-client (~> 0.6.3, >= 0.6.3)
actionpack (>= 3.0)
activesupport (>= 3.0)
sprockets (>= 2.8, < 4.0)
- sqlite3 (1.3.10)
sshkey (1.6.1)
therubyracer (0.12.1)
libv8 (~> 3.16.14.0)
DEPENDENCIES
RedCloth
+ activerecord-nulldb-adapter
andand
angularjs-rails
- arvados (>= 0.1.20150413172135)
+ arvados (>= 0.1.20150511150219)
bootstrap-sass (~> 3.1.0)
bootstrap-tab-history-rails
bootstrap-x-editable-rails
selenium-webdriver
simplecov (~> 0.7)
simplecov-rcov
- sqlite3
sshkey
themes_for_rails!
therubyracer
+// infinite_scroll.js loads and displays more of a tab's content
+// automatically when the user scrolls to the bottom of the page and there
+// is more data to show.
+//
+// Usage:
+//
+// 1. Adding infinite scrolling to a tab pane using the "show" method
+//
+// The steps below describe adding scrolling to the project#show action.
+//
+// a. In the "app/views/projects/" folder add a file for your tab
+// (ex: _show_jobs_and_pipelines.html.erb)
+// In this file, add a div or tbody with data-infinite-scroller.
+// Note: This page uses _show_tab_contents.html.erb so that
+// several tabs can reuse this implementation.
+// Also add the filters to be used for loading the tab content.
+//
+// b. Add a file named "_show_contents_rows.html.erb" that loads
+// the data (by invoking get_objects_and_names from the controller).
+//
+// c. In "app/controllers/projects_controller.rb", update the show method
+// to add a block for "params[:partial]" that loads the
+// show_contents_rows partial.
+// Optionally, add a "tab_counts" method that loads the total number
+// of objects to be displayed for this tab.
+//
+// 2. Adding infinite scrolling to the "Recent" tab of an "index" page
+// The steps below describe adding scrolling to the pipeline_instances index page.
+//
+// a. In the "app/views/pipeline_instances/_show_recent.html.erb" file,
+// add a div or tbody with data-infinite-scroller.
+//
+// b. Add the partial "_show_recent_rows.html.erb" that displays the
+// page contents on scroll using @objects.
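+//
+// Example markup (an illustrative sketch only: attributes other than
+// data-infinite-scroller are hypothetical here, and
+// _show_tab_contents.html.erb is the reference implementation):
+//
+//   <tbody data-infinite-scroller="..." data-infinite-content-href="...">
+//     <%# rows rendered by _show_contents_rows.html.erb %>
+//   </tbody>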
+
function maybe_load_more_content(event) {
var scroller = this;
var $container = $(event.data.container);
on('click', dispatch_selection_action);
$(this).trigger('selections-updated');
});
+
+function select_all_items() {
+ $(".arv-selectable-items :checkbox").filter(":visible").prop("checked", true).trigger("change");
+}
+
+function unselect_all_items() {
+ $(".arv-selectable-items :checkbox").filter(":visible").prop("checked", false).trigger("change");
+}
.hover-dropdown:hover .dropdown-menu {
display: block;
}
+
+.arv-description-as-subtitle .editable-inline,
+.arv-description-as-subtitle .editable-inline .form-group,
+.arv-description-as-subtitle .editable-inline .form-group .editable-input,
+.arv-description-as-subtitle .editable-inline .form-group .editable-input textarea
+{
+  width: 98% !important;
+}
if current_user
if Keep::Locator.parse params["uuid"]
- @same_pdh = Collection.filter([["portable_data_hash", "=", @object.portable_data_hash]])
+ @same_pdh = Collection.filter([["portable_data_hash", "=", @object.portable_data_hash]]).limit(20)
if @same_pdh.results.size == 1
redirect_to collection_path(@same_pdh[0]["uuid"])
return
owners = @same_pdh.map(&:owner_uuid).to_a.uniq
preload_objects_for_dataclass Group, owners
preload_objects_for_dataclass User, owners
+ uuids = @same_pdh.map(&:uuid).to_a.uniq
+ preload_links_for_objects uuids
render 'hash_matches'
return
else
'show' == ctrl.action_name
}
- include JobsHelper
-
def generate_provenance(jobs)
return if params['tab_pane'] != "Provenance"
value = tv[:value]
elsif tv[:default]
value = tv[:default]
+ else
+ value = ''
end
- if value
+ if value.present?
split = value.split '/'
if CollectionsHelper.match(split[0])
input_pdhs << split[0]
{
:name => 'Subprojects',
:filters => [%w(uuid is_a arvados#group)]
- } if current_user
+ }
pane_list <<
{
:name => 'Other_objects',
item.update_attributes owner_uuid: current_user.uuid
@removed_uuids << item.uuid
rescue ArvadosApiClient::ApiErrorResponseException => e
- if e.message.include? 'collection_owner_uuid_name_unique'
+ if e.message.include? '_owner_uuid_name_unique'
rename_to = item.name + ' removed from ' +
(@object.name ? @object.name : @object.uuid) +
' at ' + Time.now.to_s
panes.delete('Attributes') if !current_user.is_admin
panes
end
+
+ def show_tree
+ @commit = params[:commit]
+ @path = params[:path] || ''
+ @subtree = @object.ls_subtree @commit, @path.chomp('/')
+ end
+
+ def show_blob
+ @commit = params[:commit]
+ @path = params[:path]
+ @blobdata = @object.cat_file @commit, @path
+ end
+
+ def show_commit
+ @commit = params[:commit]
+ end
end
if opts[:no_link] or (resource_class == User && !current_user)
raw(link_name)
else
- (link_to raw(link_name), { controller: resource_class.to_s.tableize, action: 'show', id: ((opts[:name_link].andand.uuid) || link_uuid) }, style_opts) + raw(tags)
+ controller_class = resource_class.to_s.tableize
+ if controller_class.eql?('groups') and object.andand.group_class.eql?('project')
+ controller_class = 'projects'
+ end
+ (link_to raw(link_name), { controller: controller_class, action: 'show', id: ((opts[:name_link].andand.uuid) || link_uuid) }, style_opts) + raw(tags)
end
else
# just return attrvalue if it is not recognizable as an Arvados object or uuid.
end
def link_to_arvados_object_if_readable(attrvalue, link_text_if_not_readable, opts={})
- resource_class = resource_class_for_uuid(attrvalue.split('/')[0]) if attrvalue
+ resource_class = resource_class_for_uuid(attrvalue.split('/')[0]) if attrvalue.is_a?(String)
if !resource_class
return link_to_if_arvados_object attrvalue, opts
end
return_value
end
- def render_editable_attribute(object, attr, attrvalue=nil, htmloptions={})
+  # Render an editable attribute, displaying attrvalue (or, if attrvalue is
+  # nil, the object's current value of attr).
+  # The htmloptions are added to the editable element's list of attributes.
+  # The nonhtml_options are used only to customize the display of the element.
+ def render_editable_attribute(object, attr, attrvalue=nil, htmloptions={}, nonhtml_options={})
attrvalue = object.send(attr) if attrvalue.nil?
if not object.attribute_editable?(attr)
if attrvalue && attrvalue.length > 0
"id" => span_id,
:class => "editable #{is_textile?( object, attr ) ? 'editable-textile' : ''}"
}.merge(htmloptions).merge(ajax_options)
+
edit_tiptitle = 'edit'
edit_tiptitle = 'Warning: do not use hyphens in the repository name as they will be stripped' if (object.class.to_s == 'Repository' and attr == 'name')
- edit_button = raw('<a href="#" class="btn btn-xs btn-default btn-nodecorate" data-toggle="x-editable tooltip" data-toggle-selector="#' + span_id + '" data-placement="top" title="' + (htmloptions[:tiptitle] || edit_tiptitle) + '"><i class="fa fa-fw fa-pencil"></i></a>')
- if htmloptions[:btnplacement] == :left
+
+ edit_button = raw('<a href="#" class="btn btn-xs btn-' + (nonhtml_options[:btnclass] || 'default') + ' btn-nodecorate" data-toggle="x-editable tooltip" data-toggle-selector="#' + span_id + '" data-placement="top" title="' + (nonhtml_options[:tiptitle] || edit_tiptitle) + '"><i class="fa fa-fw fa-pencil"></i>' + (nonhtml_options[:btntext] || '') + '</a>')
+
+ if nonhtml_options[:btnplacement] == :left
edit_button + ' ' + span_tag
+ elsif nonhtml_options[:btnplacement] == :top
+ edit_button + raw('<br/>') + span_tag
else
span_tag + ' ' + edit_button
end
success: 'page-refresh'
}.to_json,
})
- is_readable_input = attrvalue.present? and object_readable attrvalue, Collection
+
return content_tag('div', :class => 'input-group') do
html = text_field_tag(dn, display_value,
:class =>
- "form-control #{'required' if required} #{'unreadable-input' if !attrvalue.andand.empty? and !is_readable_input}")
+ "form-control #{'required' if required} #{'unreadable-input' if attrvalue.present? and !object_readable(attrvalue, Collection)}")
html + content_tag('span', :class => 'input-group-btn') do
link_to('Choose',
modal_path,
+++ /dev/null
-module JobsHelper
- def stderr_log_history(job_uuids, limit=2000)
- results = []
-
- log_history = Log.where(event_type: 'stderr',
- object_uuid: job_uuids).limit(limit).order('id DESC')
- if !log_history.results.empty?
- reversed_results = log_history.results.reverse
- reversed_results.each do |entry|
- if entry.andand.properties
- properties = entry.properties
- text = properties[:text]
- if text
- results = results.concat text.split("\n")
- end
- end
- end
- end
- return results
- end
-
-end
+require "arvados/keep"
+
class PipelineInstance < ArvadosBase
attr_accessor :pipeline_template
def textile_attributes
[ 'description' ]
end
+
+ def job_uuids
+ components_map { |cspec| cspec[:job][:uuid] rescue nil }
+ end
+
+ def job_log_ids
+ components_map { |cspec| cspec[:job][:log] rescue nil }
+ end
+
+ def stderr_log_object_uuids
+ result = job_uuids.values.compact
+ result << uuid
+ end
+
+ def stderr_log_query(limit=nil)
+ query = Log.
+ where(event_type: "stderr",
+ object_uuid: stderr_log_object_uuids).
+ order("id DESC")
+ unless limit.nil?
+ query = query.limit(limit)
+ end
+ query
+ end
+
+ def stderr_log_lines(limit=2000)
+ stderr_log_query(limit).results.reverse.
+ flat_map { |log| log.properties[:text].split("\n") rescue [] }
+ end
+
+ def has_readable_logs?
+ log_pdhs, log_uuids = job_log_ids.values.compact.partition do |loc_s|
+ Keep::Locator.parse(loc_s)
+ end
+ if log_pdhs.any? and
+ Collection.where(portable_data_hash: log_pdhs).limit(1).results.any?
+ true
+ elsif log_uuids.any? and
+ Collection.where(uuid: log_uuids).limit(1).results.any?
+ true
+ else
+ stderr_log_query(1).results.any?
+ end
+ end
+
+ private
+
+ def components_map
+ Hash[components.map { |cname, cspec| [cname, yield(cspec)] }]
+ end
end
[]
end
end
+
+ def show commit_sha1
+ refresh
+ run_git 'show', commit_sha1
+ end
+
+ def cat_file commit_sha1, path
+ refresh
+ run_git 'cat-file', 'blob', commit_sha1 + ':' + path
+ end
+
+ def ls_tree_lr commit_sha1
+ refresh
+ run_git 'ls-tree', '-l', '-r', commit_sha1
+ end
+
+ # subtree returns a list of files under the given path at the
+ # specified commit. Results are returned as an array of file nodes,
+ # where each file node is an array [file mode, blob sha1, file size
+ # in bytes, path relative to the given directory]. If the path is
+ # not found, [] is returned.
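+  #
+  # For example (illustrative values), a 625-byte COPYING file at the top
+  # of the tree would be returned as:
+  #   [0100644, "acbd7523ed49f01217874965aa3180cccec89d61", 625, "COPYING"]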
+ def ls_subtree commit, path
+ path = path.chomp '/'
+ subtree = []
+ ls_tree_lr(commit).each_line do |line|
+ mode, type, sha1, size, filepath = line.split
+ next if type != 'blob'
+ if filepath[0,path.length] == path and
+ (path == '' or filepath[path.length] == '/')
+ subtree << [mode.to_i(8), sha1, size.to_i,
+ filepath[path.length,filepath.length]]
+ end
+ end
+ subtree
+ end
+
+ # http_fetch_url returns the first http:// or https:// url (if any)
+ # in the api response's clone_urls attribute.
+ def http_fetch_url
+ clone_urls.andand.select { |u| /^http/ =~ u }.first
+ end
+
+ protected
+
+ # refresh fetches the latest repository content into the local
+ # cache. It is a no-op if it has already been run on this object:
+ # this (pretty much) avoids doing more than one remote git operation
+ # per Workbench request.
+ def refresh
+ run_git 'fetch', http_fetch_url, '+*:*' unless @fresh
+ @fresh = true
+ end
+
+ # run_git sets up the ARVADOS_API_TOKEN environment variable,
+ # creates a local git directory for this repository if necessary,
+ # executes "git --git-dir localgitdir {args to run_git}", and
+ # returns the output. It raises GitCommandError if git exits
+ # non-zero.
+ def run_git *gitcmd
+ if not @workdir
+ workdir = File.expand_path uuid+'.git', Rails.configuration.repository_cache
+ if not File.exists? workdir
+ FileUtils.mkdir_p Rails.configuration.repository_cache
+ [['git', 'init', '--bare', workdir],
+ ].each do |cmd|
+ system *cmd
+ raise GitCommandError.new($?.to_s) unless $?.exitstatus == 0
+ end
+ end
+ @workdir = workdir
+ end
+ [['git', '--git-dir', @workdir, 'config', '--local',
+ "credential.#{http_fetch_url}.username", 'none'],
+ ['git', '--git-dir', @workdir, 'config', '--local',
+ "credential.#{http_fetch_url}.helper",
+ '!cred(){ cat >/dev/null; if [ "$1" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred'],
+ ['git', '--git-dir', @workdir, 'config', '--local',
+ 'http.sslVerify',
+ Rails.configuration.arvados_insecure_https ? 'false' : 'true'],
+ ].each do |cmd|
+ system *cmd
+ raise GitCommandError.new($?.to_s) unless $?.exitstatus == 0
+ end
+ env = {}.
+ merge(ENV).
+ merge('ARVADOS_API_TOKEN' => Thread.current[:arvados_api_token])
+ cmd = ['git', '--git-dir', @workdir] + gitcmd
+ io = IO.popen(env, cmd, err: [:child, :out])
+ output = io.read
+ io.close
+ # "If [io] is opened by IO.popen, close sets $?." --ruby 2.2.1 docs
+ unless $?.exitstatus == 0
+ raise GitCommandError.new("`git #{gitcmd.join ' '}` #{$?}: #{output}")
+ end
+ output
+ end
+
+ class GitCommandError < StandardError
+ end
end
</div>
<div class="modal-body">
+ <% if params[:message].present? %>
+ <p> <%= params[:message] %> </p>
+ <% end %>
+
<% project_filters, chooser_filters = (params[:filters] || []).partition do |attr, op, val|
attr == "owner_uuid" and op == "="
end %>
link_disabled = "disabled"
end
end
- elsif controller.model_class.to_s == 'PipelineInstance'
- log_uuids = [@object.uuid] + pipeline_jobs(@object).collect{|x|x[:job].andand[:uuid]}.compact
- if stderr_log_history(log_uuids, 1).empty?
- data_toggle = "disabled"
- tab_tooltip = "Log data is not available"
- link_disabled = "disabled"
- end
+ elsif (controller.model_class.to_s == 'PipelineInstance' and
+ !@object.has_readable_logs?)
+ data_toggle = "disabled"
+ tab_tooltip = "Log data is not available"
+ link_disabled = "disabled"
end
end
%>
<%= link_to(send("choose_#{share_class}_path",
title: "Share with #{share_class}",
+ message: "Only #{share_class} you are allowed to access are shown. Please contact your administrator if you need to be added to a specific group.",
by_project: false,
preview_pane: false,
multiple: true,
-<script>
-function select_all_files() {
- $("#collection_files :checkbox").filter(":visible").prop("checked", true).trigger("change");
-}
-
-function unselect_all_files() {
- $("#collection_files :checkbox").filter(":visible").prop("checked", false).trigger("change");
-}
-</script>
-
<%
preview_selectable_container = ''
preview_selectable = ''
</ul>
</div>
<div class="btn-group btn-group-sm">
- <button id="select-all" type="button" class="btn btn-default" onClick="select_all_files()">Select all</button>
- <button id="unselect-all" type="button" class="btn btn-default" onClick="unselect_all_files()">Unselect all</button>
+ <button id="select-all" type="button" class="btn btn-default" onClick="select_all_items()">Select all</button>
+ <button id="unselect-all" type="button" class="btn btn-default" onClick="unselect_all_items()">Unselect all</button>
</div>
</div>
<div class="pull-right">
<% if file_tree.nil? or file_tree.empty? %>
<p>This collection is empty.</p>
<% else %>
- <ul id="collection_files" class="collection_files <%=preview_selectable_container%>">
+ <ul id="collection_files" class="collection_files arv-selectable-items <%=preview_selectable_container%>">
<% dirstack = [file_tree.first.first] %>
<% file_tree.take(10000).each_with_index do |(dirname, filename, size), index| %>
<% file_path = CollectionsHelper::file_path([dirname, filename]) %>
+<%
+ message = "The following collections have this content:"
+ if @same_pdh.items_available > @same_pdh.results.size
+ message += ' (' + (@same_pdh.items_available - @same_pdh.results.size).to_s + ' more results are not shown)'
+ end
+%>
<div class="row">
<div class="col-md-10 col-md-offset-1">
<div class="panel panel-info">
<h3 class="panel-title"><%= params["uuid"] %></h3>
</div>
<div class="panel-body">
- <p><i>The following collections have this content:</i></p>
+ <p><i><%= message %></i></p>
<% @same_pdh.sort { |a,b| b.created_at <=> a.created_at }.each do |c| %>
<div class="row">
<div class="col-md-8">
@media (max-width: 979px) { body { padding-top: 0; } }
@media (max-width: 767px) {
- .breadcrumbs {
- display: none;
- }
+ .breadcrumbs {
+ padding-top: 0;
+ }
}
</style>
<link href="//netdna.bootstrapcdn.com/font-awesome/4.1.0/css/font-awesome.css" rel="stylesheet">
<li class="dropdown notification-menu">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="notifications-menu">
<span class="badge badge-alert notification-count"><%= user_notifications.length if user_notifications.any? %></span>
- <%= current_user.email %> <span class="caret"></span>
+ <span class="fa fa-lg fa-user"></span>
+ <span class="caret"></span>
</a>
<ul class="dropdown-menu" role="menu">
+ <li role="presentation" class="dropdown-header">
+ <%= current_user.email %>
+ </li>
<% if current_user.is_active %>
<li role="menuitem"><a href="/projects/<%=current_user.uuid%>" role="menuitem"><i class="fa fa-home fa-fw"></i> Home project </a></li>
<li role="menuitem"><a href="/manage_account" role="menuitem"><i class="fa fa-key fa-fw"></i> Manage account</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="system-menu">
<span class="fa fa-lg fa-gear"></span>
+ <span class="caret"></span>
</a>
<ul class="dropdown-menu" role="menu">
<li role="presentation" class="dropdown-header">
<li class="dropdown help-menu">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="arv-help">
<span class="fa fa-lg fa-question-circle"></span>
+ <span class="caret"></span>
</a>
<ul class="dropdown-menu">
+ <li role="presentation" class="dropdown-header">
+ Help
+ </li>
<li>
<%= link_to raw('<i class="fa fa-fw fa-info"></i> Getting Started ...'), "#",
{'data-toggle' => "modal", 'data-target' => '#getting-started-modal-window'} %>
<div class="row">
<div class="col-md-6">
<table>
+ <% # link to repo tree/file only if the repo is readable
+ # and the commit is a sha1...
+ repo =
+ (/^[0-9a-f]{40}$/ =~ current_component[:script_version] and
+ Repository.where(name: current_component[:repository]).first)
+
+ # ...and the api server provides an http:// or https:// url
+ repo = nil unless repo.andand.http_fetch_url
+ %>
<% [:script, :repository, :script_version, :supplied_script_version, :nondeterministic].each do |k| %>
<tr>
<td style="padding-right: 1em">
<td>
<% if current_component[k].nil? %>
(none)
+ <% elsif repo and k == :repository %>
+ <%= link_to current_component[k], show_repository_tree_path(id: repo.uuid, commit: current_component[:script_version], path: '/') %>
+ <% elsif repo and k == :script %>
+ <%= link_to current_component[k], show_repository_blob_path(id: repo.uuid, commit: current_component[:script_version], path: 'crunch_scripts/'+current_component[:script]) %>
+ <% elsif repo and k == :script_version %>
+ <%= link_to current_component[k], show_repository_commit_path(id: repo.uuid, commit: current_component[:script_version]) %>
<% else %>
<%= current_component[k] %>
<% end %>
-<% log_uuids = [@object.uuid] + pipeline_jobs(@object).collect{|x|x[:job].andand[:uuid]}.compact %>
-<% log_history = stderr_log_history(log_uuids) %>
-<div id="event_log_div"
- class="arv-log-event-listener arv-log-event-handler-append-logs arv-log-event-subscribe-to-pipeline-job-uuids arv-job-log-window"
- data-object-uuids="<%= log_uuids.join(' ') %>"
- ><%= log_history.join("\n") %></div>
-
-<%# Applying a long throttle suppresses the auto-refresh of this
- partial that would normally be triggered by arv-log-event. %>
-<div class="arv-log-refresh-control"
- data-load-throttle="86486400000" <%# 1001 nights %>
- ></div>
+<% log_ids = @object.job_log_ids
+ still_logging, done_logging = log_ids.keys.partition { |k| log_ids[k].nil? }
+%>
+
+<% unless done_logging.empty? %>
+ <table class="topalign table table-condensed table-fixedlayout">
+ <colgroup>
+ <col width="40%" />
+ <col width="60%" />
+ </colgroup>
+ <thead>
+ <tr>
+ <th>finished component</th>
+ <th>job log</th>
+ </tr>
+ </thead>
+ <tbody>
+ <% done_logging.each do |cname| %>
+ <tr>
+ <td><%= cname %></td>
+ <td><%= link_to("Log for #{cname}",
+ {controller: "collections", action: "show", id: log_ids[cname]})
+ %></td>
+ </tr>
+ <% end %>
+ </tbody>
+ </table>
+<% end %>
+
+<% unless still_logging.empty? %>
+ <h4>Logs in progress</h4>
+
+ <div id="event_log_div"
+ class="arv-log-event-listener arv-log-event-handler-append-logs arv-log-event-subscribe-to-pipeline-job-uuids arv-job-log-window"
+ data-object-uuids="<%= @object.stderr_log_object_uuids.join(' ') %>"
+ ><%= @object.stderr_log_lines.join("\n") %></div>
+
+ <%# Applying a long throttle suppresses the auto-refresh of this
+ partial that would normally be triggered by arv-log-event. %>
+ <div class="arv-log-refresh-control"
+ data-load-throttle="86486400000" <%# 1001 nights %>
+ ></div>
+<% end %>
+
<div class="col-sm-4">
<%= render :partial => "show_object_button", :locals => {object: object, size: 'xs'} %>
<% if object.respond_to?(:name) %>
- <%= render_editable_attribute object, 'name', nil, {tiptitle: 'rename'} %>
+ <%= render_editable_attribute object, 'name', nil, {}, {tiptitle: 'rename'} %>
<% else %>
<%= object.class_for_display %> <%= object.uuid %>
<% end %>
<td>
<% if object.respond_to?(:name) %>
- <%= render_editable_attribute (name_link || object), 'name', nil, {tiptitle: 'rename'} %>
+ <%= render_editable_attribute (name_link || object), 'name', nil, {}, {tiptitle: 'rename'} %>
<% end %>
</td>
<% if @object.respond_to? :description %>
<div class="arv-description-as-subtitle">
- <%= render_editable_attribute @object, 'description', nil, { 'data-emptytext' => "(No description provided)", 'data-toggle' => 'manual' } %>
+ <%= render_editable_attribute @object, 'description', nil, { 'data-emptytext' => "(No description provided)", 'data-toggle' => 'manual', 'data-mode' => 'inline', 'data-rows' => 10 }, { btntext: 'Edit', btnclass: 'primary', btnplacement: :top } %>
</div>
<% end %>
<% end %>
</ul>
</div>
+ <div class="btn-group btn-group-sm">
+ <button id="select-all" type="button" class="btn btn-default" onClick="select_all_items()">Select all</button>
+ <button id="unselect-all" type="button" class="btn btn-default" onClick="unselect_all_items()">Unselect all</button>
+ </div>
</div>
<div class="col-sm-4 pull-right">
<input type="text" class="form-control filterable-control" placeholder="Search project contents" data-filterable-target="table.arv-index.arv-project-<%= tab_pane %> tbody"/>
</div>
</div>
- <table class="table table-condensed arv-index arv-project-<%= tab_pane %>">
+ <table class="table table-condensed arv-index arv-selectable-items arv-project-<%= tab_pane %>">
<colgroup>
<col width="0*" style="max-width: fit-content;" />
<col width="0*" style="max-width: fit-content;" />
--- /dev/null
+<div class="pull-right">
+ <span class="deemphasize">Browsing <%= @object.name %> repository at commit</span>
+ <%= link_to(@commit, show_repository_commit_path(id: @object.uuid, commit: @commit), title: 'show commit message') %>
+</div>
+<p>
+ <%= link_to(@object.name, show_repository_tree_path(id: @object.uuid, commit: @commit, path: ''), title: 'show root directory of source tree') %>
+ <% parents = ''
+ (@path || '').split('/').each do |pathpart|
+ parents = parents + pathpart + '/'
+ %>
+ / <%= link_to pathpart, show_repository_tree_path(id: @object.uuid, commit: @commit, path: parents) %>
+ <% end %>
+</p>
--- /dev/null
+<%= render partial: 'repository_breadcrumbs' %>
+
+<% if not @blobdata.valid_encoding? %>
+ <div class="alert alert-warning">
+ <p>
+ This file has an invalid text encoding, so it can't be shown
+ here. (This probably just means it's a binary file, not a text
+ file.)
+ </p>
+ </div>
+<% else %>
+ <pre><%= @blobdata %></pre>
+<% end %>
--- /dev/null
+<%= render partial: 'repository_breadcrumbs' %>
+
+<pre><%= @object.show @commit %></pre>
--- /dev/null
+<%= render partial: 'repository_breadcrumbs' %>
+
+<table class="table table-condensed table-hover">
+ <thead>
+ <tr>
+ <th>File</th>
+ <th class="data-size">Size</th>
+ </tr>
+ </thead>
+ <tbody>
+ <% @subtree.each do |mode, sha1, size, subpath| %>
+ <tr>
+ <td>
+ <span style="opacity: 0.6">
+ <% pathparts = subpath.sub(/^\//, '').split('/')
+ basename = pathparts.pop
+ parents = @path
+ pathparts.each do |pathpart| %>
+ <% parents = parents + '/' + pathpart %>
+ <%= link_to pathpart, url_for(path: parents) %>
+ /
+ <% end %>
+ </span>
+ <%= link_to basename, url_for(action: :show_blob, path: parents + '/' + basename) %>
+ </td>
+ <td class="data-size">
+ <%= human_readable_bytes_html(size) %>
+ </td>
+ </tr>
+ <% end %>
+ <% if @subtree.empty? %>
+ <tr>
+ <td>
+ No files found.
+ </td>
+ </tr>
+ <% end %>
+ </tbody>
+ <tfoot></tfoot>
+</table>
</div>
<div class="modal-footer">
- <button class="btn btn-default" onClick="reset_form()" data-dismiss="modal" aria-hidden="true">Cancel</button>
+ <button type="button" class="btn btn-default" onClick="reset_form()" data-dismiss="modal" aria-hidden="true">Cancel</button>
<button type="submit" class="btn btn-primary" autofocus>Submit</button>
</div>
default_openid_prefix: https://www.google.com/accounts/o8/id
send_user_setup_notification_email: true
- # Set user_profile_form_fields to enable and configure the user profile page.
- # Default is set to false. A commented setting with full description is provided below.
+ # Scratch directory used by the remote repository browsing
+ # feature. If it doesn't exist, it (and any missing parents) will be
+ # created using mkdir_p.
+ repository_cache: <%= File.expand_path 'tmp/git', Rails.root %>
+
+ # Set user_profile_form_fields to enable and configure the user
+ # profile page. Default is set to false. A commented example with
+ # full description is provided below.
user_profile_form_fields: false
# Below is a sample setting of user_profile_form_fields config parameter.
# in the directory where your API server is running.
anonymous_user_token: false
- # Include Accept-Encoding header when making API requests
+ # Enable response payload compression in Arvados API requests.
include_accept_encoding_header_in_api_requests: true
-# SQLite version 3.x
-# gem install sqlite3
-#
-# Ensure the SQLite 3 gem is defined in your Gemfile
-# gem 'sqlite3'
+# Note: The database configuration is not actually used.
development:
- adapter: sqlite3
- database: db/development.sqlite3
- pool: 5
- timeout: 5000
-
-# Warning: The database defined as "test" will be erased and
-# re-generated from your development database when you run "rake".
-# Do not set this db to the same as development or production.
+ adapter: nulldb
test:
- adapter: sqlite3
- database: db/test.sqlite3
- pool: 5
- timeout: 5000
-
+ adapter: nulldb
production:
- adapter: sqlite3
- database: db/production.sqlite3
- pool: 5
- timeout: 5000
-
-# Note: The "diagnostics" database configuration is not actually used.
+ adapter: nulldb
diagnostics:
- adapter: sqlite3
- database: db/diagnostics.sqlite3
- pool: 5
- timeout: 5000
-
-# Note: The "performance" database configuration is not actually used.
+ adapter: nulldb
performance:
- adapter: sqlite3
- database: db/diagnostics.sqlite3
- pool: 5
- timeout: 5000
+ adapter: nulldb
resources :repositories do
post 'share_with', on: :member
end
+ # {format: false} prevents rails from treating "foo.png" as foo?format=png
+ get '/repositories/:id/tree/:commit' => 'repositories#show_tree'
+ get '/repositories/:id/tree/:commit/*path' => 'repositories#show_tree', as: :show_repository_tree, format: false
+ get '/repositories/:id/blob/:commit/*path' => 'repositories#show_blob', as: :show_repository_blob, format: false
+ get '/repositories/:id/commit/:commit' => 'repositories#show_commit', as: :show_repository_commit
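+  # For example (illustrative path), GET /repositories/<uuid>/tree/<commit>/apps/workbench
+  # dispatches to repositories#show_tree with params[:path] == "apps/workbench".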
match '/logout' => 'sessions#destroy', via: [:get, :post]
get '/logged_out' => 'sessions#index'
resources :users do
assert_equal api_fixture('users', 'subproject_admin')['uuid'], new_specimen.owner_uuid
end
+  # An object which does not offer an expired_at field but has an xx_owner_uuid_name_unique constraint
+  # will be renamed when it is removed and another object with the same name exists in the user's home project.
+ [
+ ['groups', 'subproject_in_asubproject_with_same_name_as_one_in_active_user_home'],
+ ['pipeline_templates', 'template_in_asubproject_with_same_name_as_one_in_active_user_home'],
+ ].each do |dm, fixture|
+ test "removing #{dm} from a subproject results in renaming it when there is another such object with same name in home project" do
+ object = api_fixture(dm, fixture)
+ delete(:remove_item,
+ { id: api_fixture('groups', 'asubproject')['uuid'],
+ item_uuid: object['uuid'],
+ format: 'js' },
+ session_for(:active))
+ assert_response :success
+ assert_match(/\b#{object['uuid']}\b/, @response.body,
+ "removed object not named in response")
+ use_token :active
+ if dm.eql?('groups')
+ found = Group.find(object['uuid'])
+ else
+ found = PipelineTemplate.find(object['uuid'])
+ end
+ assert_equal api_fixture('users', 'active')['uuid'], found.owner_uuid
+ assert_equal true, found.name.include?(object['name'] + ' removed from ')
+ end
+ end
+
test 'projects#show tab infinite scroll partial obeys limit' do
get_contents_rows(limit: 1, filters: [['uuid','is_a',['arvados#job']]])
assert_response :success
require 'test_helper'
+require 'helpers/repository_stub_helper'
require 'helpers/share_object_helper'
class RepositoriesControllerTest < ActionController::TestCase
+ include RepositoryStubHelper
include ShareObjectHelper
[
end
end
end
+
+ ### Browse repository content
+
+ [:active, :spectator].each do |user|
+ test "show tree to #{user}" do
+ reset_api_fixtures_after_test false
+ sha1, _, _ = stub_repo_content
+ get :show_tree, {
+ id: api_fixture('repositories')['foo']['uuid'],
+ commit: sha1,
+ }, session_for(user)
+ assert_response :success
+ assert_select 'tr td a', 'COPYING'
+ assert_select 'tr td', '625 bytes'
+ assert_select 'tr td a', 'apps'
+ assert_select 'tr td a', 'workbench'
+ assert_select 'tr td a', 'Gemfile'
+ assert_select 'tr td', '33.7 KiB'
+ end
+
+ test "show commit to #{user}" do
+ reset_api_fixtures_after_test false
+ sha1, commit, _ = stub_repo_content
+ get :show_commit, {
+ id: api_fixture('repositories')['foo']['uuid'],
+ commit: sha1,
+ }, session_for(user)
+ assert_response :success
+ assert_select 'pre', h(commit)
+ end
+
+ test "show blob to #{user}" do
+ reset_api_fixtures_after_test false
+ sha1, _, filedata = stub_repo_content filename: 'COPYING'
+ get :show_blob, {
+ id: api_fixture('repositories')['foo']['uuid'],
+ commit: sha1,
+ path: 'COPYING',
+ }, session_for(user)
+ assert_response :success
+ assert_select 'pre', h(filedata)
+ end
+ end
+
+ ['', '/'].each do |path|
+ test "show tree with path '#{path}'" do
+ reset_api_fixtures_after_test false
+ sha1, _, _ = stub_repo_content filename: 'COPYING'
+ get :show_tree, {
+ id: api_fixture('repositories')['foo']['uuid'],
+ commit: sha1,
+ path: path,
+ }, session_for(:active)
+ assert_response :success
+ assert_select 'tr td', 'COPYING'
+ end
+ end
end
--- /dev/null
+module RepositoryStubHelper
+ # Supply some fake git content.
+ def stub_repo_content opts={}
+ fakesha1 = opts[:sha1] || 'abcdefabcdefabcdefabcdefabcdefabcdefabcd'
+ fakefilename = opts[:filename] || 'COPYING'
+ fakefilesrc = File.expand_path('../../../../../'+fakefilename, __FILE__)
+ fakefile = File.read fakefilesrc
+ fakecommit = <<-EOS
+ commit abcdefabcdefabcdefabcdefabcdefabcdefabcd
+ Author: Fake R <fake@example.com>
+ Date: Wed Apr 1 11:59:59 2015 -0400
+
+ It's a fake commit.
+
+ EOS
+ Repository.any_instance.stubs(:ls_tree_lr).with(fakesha1).returns <<-EOS
+ 100644 blob eec475862e6ec2a87554e0fca90697e87f441bf5 226 .gitignore
+ 100644 blob acbd7523ed49f01217874965aa3180cccec89d61 625 COPYING
+ 100644 blob d645695673349e3947e8e5ae42332d0ac3164cd7 11358 LICENSE-2.0.txt
+ 100644 blob c7a36c355b4a2b94dfab45c9748330022a788c91 622 README
+ 100644 blob dba13ed2ddf783ee8118c6a581dbf75305f816a3 34520 agpl-3.0.txt
+ 100644 blob 9bef02bbfda670595750fd99a4461005ce5b8f12 695 apps/workbench/.gitignore
+ 100644 blob b51f674d90f68bfb50d9304068f915e42b04aea4 2249 apps/workbench/Gemfile
+ 100755 blob cdd5ebaff27781f93ab85e484410c0ce9e97770f 1012 crunch_scripts/hash
+ EOS
+ Repository.any_instance.
+ stubs(:cat_file).with(fakesha1, fakefilename).returns fakefile
+ Repository.any_instance.
+ stubs(:show).with(fakesha1).returns fakecommit
+ return fakesha1, fakecommit, fakefile
+ end
+end
# Otherwise, the not-included assertions might falsely pass because
# the modal hasn't loaded yet.
find(".selectable", text: name).click
+ assert_text "Only #{share_type} you are allowed to access are shown"
assert(has_no_selector?(".modal-dialog-preview-pane"),
"preview pane available in sharing dialog")
if share_type == 'users' and obj and obj['email']
end
within('.navbar-fixed-top') do
assert_selector 'a', text: Rails.configuration.site_name.downcase
- assert_selector 'a', text: "#{user['email']}"
- find('a', text: "#{user['email']}").click
+ assert(page.has_link?("notifications-menu"), 'no user menu')
+ page.find("#notifications-menu").click
within('.dropdown-menu') do
assert_selector 'a', text: 'Log out'
end
assert_selector 'a', text: 'Data collections'
assert_selector 'a', text: 'Jobs and pipelines'
assert_selector 'a', text: 'Pipeline templates'
+ assert_selector 'a', text: 'Subprojects'
assert_selector 'a', text: 'Advanced'
- assert_no_selector 'a', text: 'Subprojects'
assert_no_selector 'a', text: 'Other objects'
assert_no_selector 'button', text: 'Add data'
assert_no_selector 'a', text: 'Run this pipeline'
end
+ test "anonymous user accesses subprojects tab in shared project" do
+ visit PUBLIC_PROJECT + '#Subprojects'
+
+ assert_text 'Subproject in anonymous accessible project'
+
+ within first('tr[data-kind="arvados#group"]') do
+ click_link 'Show'
+ end
+
+ # in subproject
+ assert_text 'Description for subproject in anonymous accessible project'
+ end
+
[
['pipeline_in_publicly_accessible_project', true],
['pipeline_in_publicly_accessible_project_but_other_objects_elsewhere', false],
if pipeline_page
object = api_fixture('pipeline_instances')[fixture]
page = "/pipeline_instances/#{object['uuid']}"
+ expect_log_text = "Log for foo"
else # job
object = api_fixture('jobs')[fixture]
page = "/jobs/#{object['uuid']}"
+ expect_log_text = "stderr crunchstat"
end
if user
click_link 'foo' if pipeline_page
if objects_readable
+ assert_selector 'a[href="#Log"]', text: 'Log'
+ assert_no_selector 'a[data-toggle="disabled"]', text: 'Log'
+ assert_no_text 'Output data not available'
if pipeline_page
assert_text 'This pipeline was created from'
assert_selector 'a', text: object['components']['foo']['job']['uuid']
+ # We'd like to test the Log tab on job pages too, but we can't right
+ # now because Poltergeist 1.x doesn't support JavaScript's
+ # Function.prototype.bind, which is used by job_log_graph.js.
+ click_link "Log"
+ assert_text expect_log_text
end
- assert_no_text 'Output data not available'
- assert_selector 'a[href="#Log"]', text: 'Log'
- assert_no_selector 'a[data-toggle="disabled"]', text: 'Log'
else
+ assert_selector 'a[data-toggle="disabled"]', text: 'Log'
+ assert_text 'Output data not available'
+ assert_text object['job']
if pipeline_page
assert_no_text 'This pipeline was created from' # template is not readable
assert_no_selector 'a', text: object['components']['foo']['job']['uuid']
end
+ click_link "Log"
assert_text 'Output data not available'
- assert_text object['job']
- assert_selector 'a[data-toggle="disabled"]', text: 'Log'
- end
-
- click_link 'Log'
- if objects_readable
- assert_no_text 'foo' # should be in Log tab
- assert_text 'stderr crunchstat' if pipeline_page
- else
- assert_text 'foo' # Log tab disabled and hence still in first tab
- assert_no_text 'stderr crunchstat' # log line shouldn't be seen
+ assert_no_text expect_log_text
end
end
end
else
# my account menu
assert_selector 'a', text: Rails.configuration.site_name.downcase
- assert page.has_link?("#{user['email']}"), 'Not found link - email'
- find('a', text: "#{user['email']}").click
+ assert(page.has_link?("notifications-menu"), 'no user menu')
+ page.find("#notifications-menu").click
within('.dropdown-menu') do
if user['is_active']
assert page.has_no_link?('Not active'), 'Found link - Not active'
require 'integration_helper'
+require_relative 'integration_test_utils'
class CollectionsTest < ActionDispatch::IntegrationTest
setup do
need_javascript
end
- # check_checkboxes_state asserts that the page holds at least one
- # checkbox matching 'selector', and that all matching checkboxes
- # are in state 'checkbox_status' (i.e. checked if true, unchecked otherwise)
- def assert_checkboxes_state(selector, checkbox_status, msg=nil)
- assert page.has_selector?(selector)
- page.all(selector).each do |checkbox|
- assert(checkbox.checked? == checkbox_status, msg)
- end
- end
-
test "Can copy a collection to a project" do
collection_uuid = api_fixture('collections')['foo_file']['uuid']
collection_name = api_fixture('collections')['foo_file']['name']
end
test "Collection portable data hash with multiple matches" do
- pdh = api_fixture('collections')['baz_file']['portable_data_hash']
+ pdh = api_fixture('collections')['foo_file']['portable_data_hash']
visit page_with_token('admin', "/collections/#{pdh}")
matches = api_fixture('collections').select {|k,v| v["portable_data_hash"] == pdh}
matches.each do |k,v|
assert page.has_link?(v["name"]), "Page /collections/#{pdh} should contain link '#{v['name']}'"
end
- assert page.has_no_text?("Activity")
- assert page.has_no_text?("Sharing and permissions")
+ assert_text 'The following collections have this content:'
+ assert_no_text 'more results are not shown'
+ assert_no_text 'Activity'
+ assert_no_text 'Sharing and permissions'
+ end
+
+ test "Collection portable data hash with multiple matches with more than one page of results" do
+ pdh = api_fixture('collections')['baz_file']['portable_data_hash']
+ visit page_with_token('admin', "/collections/#{pdh}")
+
+ assert_selector 'a', text: 'Collection_1'
+
+ assert_text 'The following collections have this content:'
+ assert_text 'more results are not shown'
+ assert_no_text 'Activity'
+ assert_no_text 'Sharing and permissions'
end
test "Filtering collection files by regexp" do
test "error page renders user navigation" do
visit(page_with_token("active", "/collections/#{BAD_UUID}"))
- assert(page.has_text?(api_fixture("users")["active"]["email"]),
+ assert(page.has_link?("notifications-menu"),
"User information missing from error page")
assert(page.has_no_text?(/log ?in/i),
"Logged in user prompted to log in on error page")
test "no user navigation with expired token" do
visit(page_with_token("expired", "/collections/#{BAD_UUID}"))
- assert(page.has_no_text?(api_fixture("users")["active"]["email"]),
+ assert(page.has_no_link?("notifications-menu"),
"Page visited with expired token included user information")
assert(page.has_selector?("a", text: /log ?in/i),
"Login prompt missing on expired token error page")
--- /dev/null
+# This file is used to define methods reusable by two or more integration tests
+#
+
+# assert_checkboxes_state asserts that the page holds at least one
+# checkbox matching 'selector', and that all matching checkboxes
+# are in state 'checkbox_status' (i.e. checked if true, unchecked otherwise)
+def assert_checkboxes_state(selector, checkbox_status, msg=nil)
+ assert page.has_selector?(selector)
+ page.all(selector).each do |checkbox|
+ assert(checkbox.checked? == checkbox_status, msg)
+ end
+end
# that the error message says something appropriate for that
# situation.
if expect_options && use_latest
- assert_text "Script version #{job['supplied_script_version']} does not resolve to a commit"
+ assert_text "077ba2ad3ea24a929091a9e6ce545c93199b8e57"
else
assert_text "Script version #{job['script_version']} does not resolve to a commit"
end
visit page_with_token 'active', '/pipeline_instances/' + pi['uuid']
assert_text 'Queued for '
end
+
+ test "job logs linked for running pipeline" do
+ pi = api_fixture("pipeline_instances", "running_pipeline_with_complete_job")
+ visit(page_with_token("active", "/pipeline_instances/#{pi['uuid']}"))
+ click_on "Log"
+ within "#Log" do
+ assert_text "Log for previous"
+ log_link = find("a", text: "Log for previous")
+ assert_includes(log_link[:href],
+ pi["components"]["previous"]["job"]["log"])
+ assert_selector "#event_log_div"
+ end
+ end
+
+ test "job logs linked for complete pipeline" do
+ pi = api_fixture("pipeline_instances", "complete_pipeline_with_two_jobs")
+ visit(page_with_token("active", "/pipeline_instances/#{pi['uuid']}"))
+ click_on "Log"
+ within "#Log" do
+ assert_text "Log for previous"
+ pi["components"].each do |cname, cspec|
+ log_link = find("a", text: "Log for #{cname}")
+ assert_includes(log_link[:href], cspec["job"]["log"])
+ end
+ assert_no_selector "#event_log_div"
+ end
+ end
+
+ test "job logs linked for failed pipeline" do
+ pi = api_fixture("pipeline_instances", "failed_pipeline_with_two_jobs")
+ visit(page_with_token("active", "/pipeline_instances/#{pi['uuid']}"))
+ click_on "Log"
+ within "#Log" do
+ assert_text "Log for previous"
+ pi["components"].each do |cname, cspec|
+ log_link = find("a", text: "Log for #{cname}")
+ assert_includes(log_link[:href], cspec["job"]["log"])
+ end
+ assert_no_selector "#event_log_div"
+ end
+ end
end
require 'integration_helper'
require 'helpers/share_object_helper'
+require_relative 'integration_test_utils'
class ProjectsTest < ActionDispatch::IntegrationTest
include ShareObjectHelper
find("#page-wrapper .nav-tabs :first-child a").click
assert_text("Collection modified at")
end
+
+ # "Select all" and "Unselect all" options
+ test "select all and unselect all actions" do
+ need_selenium 'to check and uncheck checkboxes'
+
+ visit page_with_token 'active', '/projects/' + api_fixture('groups')['aproject']['uuid']
+
+ # Go to "Data collections" tab and click on "Select all"
+ click_link 'Data collections'
+ wait_for_ajax
+
+ # Initially, all selection options for this tab should be disabled
+ click_button 'Selection'
+ within('.selection-action-container') do
+ assert_selector 'li.disabled', text: 'Create new collection with selected collections'
+ assert_selector 'li.disabled', text: 'Copy selected'
+ end
+
+ # Select all
+ click_button 'Select all'
+
+ assert_checkboxes_state('input[type=checkbox]', true, '"select all" should check all checkboxes')
+
+ # Now the selection options should be enabled
+ click_button 'Selection'
+ within('.selection-action-container') do
+ assert_selector 'li', text: 'Create new collection with selected collections'
+ assert_no_selector 'li.disabled', text: 'Copy selected'
+ end
+
+ # Go to Jobs and pipelines tab and assert none selected
+ click_link 'Jobs and pipelines'
+ wait_for_ajax
+
+ # Since this is the first visit to this tab, all selection options should be disabled
+ click_button 'Selection'
+ within('.selection-action-container') do
+ assert_selector 'li.disabled', text: 'Create new collection with selected collections'
+ assert_selector 'li.disabled', text: 'Copy selected'
+ end
+
+    assert_checkboxes_state('input[type=checkbox]', false, 'no checkboxes should be checked on first visit to this tab')
+
+ # Select all
+ click_button 'Select all'
+ assert_checkboxes_state('input[type=checkbox]', true, '"select all" should check all checkboxes')
+
+ # Applicable selection options should be enabled
+ click_button 'Selection'
+ within('.selection-action-container') do
+ assert_selector 'li.disabled', text: 'Create new collection with selected collections'
+ assert_selector 'li', text: 'Copy selected'
+ assert_no_selector 'li.disabled', text: 'Copy selected'
+ end
+
+ # Unselect all
+ click_button 'Unselect all'
+    assert_checkboxes_state('input[type=checkbox]', false, '"unselect all" should clear all checkboxes')
+
+ # All selection options should be disabled again
+ click_button 'Selection'
+ within('.selection-action-container') do
+ assert_selector 'li.disabled', text: 'Create new collection with selected collections'
+ assert_selector 'li.disabled', text: 'Copy selected'
+ end
+
+ # Go back to Data collections tab and verify all are still selected
+ click_link 'Data collections'
+ wait_for_ajax
+
+ # Selection options should be enabled based on the fact that all collections are still selected in this tab
+ click_button 'Selection'
+ within('.selection-action-container') do
+ assert_selector 'li', text: 'Create new collection with selected collections'
+ assert_no_selector 'li.disabled', text: 'Copy selected'
+ end
+
+ assert_checkboxes_state('input[type=checkbox]', true, '"select all" should check all checkboxes')
+
+ # Unselect all
+ find('button#unselect-all').click
+ assert_checkboxes_state('input[type=checkbox]', false, '"unselect all" should clear all checkboxes')
+
+ # Now all selection options should be disabled because none of the collections are checked
+ click_button 'Selection'
+ within('.selection-action-container') do
+ assert_selector 'li.disabled', text: 'Copy selected'
+ end
+
+ # Verify checking just one checkbox still works as expected
+ within('tr', text: api_fixture('collections')['collection_to_move_around_in_aproject']['name']) do
+ find('input[type=checkbox]').click
+ end
+
+ click_button 'Selection'
+ within('.selection-action-container') do
+ assert_selector 'li', text: 'Create new collection with selected collections'
+ assert_no_selector 'li.disabled', text: 'Copy selected'
+ end
+ end
end
--- /dev/null
+require 'integration_helper'
+require 'helpers/repository_stub_helper'
+require 'helpers/share_object_helper'
+
+class RepositoriesTest < ActionDispatch::IntegrationTest
+ include RepositoryStubHelper
+ include ShareObjectHelper
+
+ reset_api_fixtures :after_each_test, false
+
+ setup do
+ need_javascript
+ end
+
+ test "browse repository from jobs#show" do
+ sha1 = api_fixture('jobs')['running']['script_version']
+ _, fakecommit, fakefile =
+ stub_repo_content sha1: sha1, filename: 'crunch_scripts/hash'
+ show_object_using 'active', 'jobs', 'running', sha1
+ click_on api_fixture('jobs')['running']['script']
+ assert_text fakefile
+ click_on 'crunch_scripts'
+ assert_selector 'td a', text: 'hash'
+ click_on 'foo'
+ assert_selector 'td a', text: 'crunch_scripts'
+ click_on sha1
+ assert_text fakecommit
+
+ show_object_using 'active', 'jobs', 'running', sha1
+ click_on 'active/foo'
+ assert_selector 'td a', text: 'crunch_scripts'
+
+ show_object_using 'active', 'jobs', 'running', sha1
+ click_on sha1
+ assert_text fakecommit
+ end
+
+ test "browse using arv-git-http" do
+ repo = api_fixture('repositories')['foo']
+ portfile =
+ File.expand_path('../../../../../tmp/arv-git-httpd-ssl.port', __FILE__)
+ gitsslport = File.read(portfile)
+ Repository.any_instance.
+ stubs(:http_fetch_url).
+ returns "https://localhost:#{gitsslport}/#{repo['name']}.git"
+ commit_sha1 = '1de84a854e2b440dc53bf42f8548afa4c17da332'
+ visit page_with_token('active', "/repositories/#{repo['uuid']}/commit/#{commit_sha1}")
+ assert_text "Date: Tue Mar 18 15:55:28 2014 -0400"
+ visit page_with_token('active', "/repositories/#{repo['uuid']}/tree/#{commit_sha1}")
+ assert_selector "tbody td a", "foo"
+ assert_text "12 bytes"
+ end
+end
def verify_manage_account user
if user['is_active']
within('.navbar-fixed-top') do
- find('a', text: "#{user['email']}").click
+ page.find("#notifications-menu").click
within('.dropdown-menu') do
find('a', text: 'Manage account').click
end
add_and_verify_ssh_key
else # inactive user
within('.navbar-fixed-top') do
- find('a', text: "#{user['email']}").click
+ page.find("#notifications-menu").click
within('.dropdown-menu') do
assert page.has_no_link?('Manage profile'), 'Found link - Manage profile'
end
# Revisit the page and verify the request sent message along with
# the request button.
within('.navbar-fixed-top') do
- find('a', text: 'spectator').click
+ page.find("#notifications-menu").click
within('.dropdown-menu') do
find('a', text: 'Manage account').click
end
assert page.has_link?('Log in'), 'Not found link - Log in'
else
# my account menu
- assert page.has_link?("#{user['email']}"), 'Not found link - email'
- find('a', text: "#{user['email']}").click
+ assert(page.has_link?("notifications-menu"), 'no user menu')
+ page.find("#notifications-menu").click
within('.dropdown-menu') do
if user['is_active']
assert page.has_no_link?('Not active'), 'Found link - Not active'
@main_process_pid = $$
@@server_is_running = false
- def check_call *args
+ def check_output *args
output = nil
Bundler.with_clean_env do
output = IO.popen *args do |io|
def run_test_server
env_script = nil
Dir.chdir PYTHON_TESTS_DIR do
- env_script = check_call %w(python ./run_test_server.py start --auth admin)
+ # These are no-ops if we're running within run-tests.sh (except
+ # that we do get a useful env_script back from "start", even
+ # though it doesn't need to start up a new server).
+ env_script = check_output %w(python ./run_test_server.py start --auth admin)
+ check_output %w(python ./run_test_server.py start_arv-git-httpd)
+ check_output %w(python ./run_test_server.py start_nginx)
end
test_env = {}
env_script.each_line do |line|
def stop_test_server
Dir.chdir PYTHON_TESTS_DIR do
- # This is a no-op if we're running within run-tests.sh
- check_call %w(python ./run_test_server.py stop)
+ # These are no-ops if we're running within run-tests.sh
+ check_output %w(python ./run_test_server.py stop_nginx)
+ check_output %w(python ./run_test_server.py stop_arv-git-httpd)
+ check_output %w(python ./run_test_server.py stop)
end
@@server_is_running = false
end
def run_rake_task task_name, arg_string
Dir.chdir ARV_API_SERVER_DIR do
- check_call ['bundle', 'exec', 'rake', "#{task_name}[#{arg_string}]"]
+ check_output ['bundle', 'exec', 'rake', "#{task_name}[#{arg_string}]"]
end
end
end
end
def after_teardown
- if self.class.want_reset_api_fixtures[:after_each_test]
+ if self.class.want_reset_api_fixtures[:after_each_test] and
+ @want_reset_api_fixtures != false
self.class.reset_api_fixtures_now
end
super
end
+ def reset_api_fixtures_after_test t=true
+ @want_reset_api_fixtures = t
+ end
+
protected
def self.reset_api_fixtures_now
# Never try to reset fixtures when we're just using test
require 'test_helper'
class PipelineInstanceTest < ActiveSupport::TestCase
- def attribute_editable_for?(token_name, pi_name, attr_name, ever=nil)
+ def find_pi_with(token_name, pi_name)
use_token token_name
- find_fixture(PipelineInstance, pi_name).attribute_editable?(attr_name, ever)
+ find_fixture(PipelineInstance, pi_name)
+ end
+
+ def attribute_editable_for?(token_name, pi_name, attr_name, ever=nil)
+ find_pi_with(token_name, pi_name).attribute_editable?(attr_name, ever)
end
test "admin can edit name" do
"components"),
"components not editable on new pipeline")
end
+
+ test "job_logs for partially complete pipeline" do
+ log_uuid = api_fixture("collections", "real_log_collection", "uuid")
+ pi = find_pi_with(:active, "running_pipeline_with_complete_job")
+ assert_equal({previous: log_uuid, running: nil}, pi.job_log_ids)
+ end
+
+ test "job_logs for complete pipeline" do
+ log_uuid = api_fixture("collections", "real_log_collection", "uuid")
+ pi = find_pi_with(:active, "complete_pipeline_with_two_jobs")
+ assert_equal({ancient: log_uuid, previous: log_uuid}, pi.job_log_ids)
+ end
+
+ test "job_logs for malformed pipeline" do
+ pi = find_pi_with(:active, "components_is_jobspec")
+ assert_empty(pi.job_log_ids.select { |_, log| not log.nil? })
+ end
+
+ def check_stderr_logs(token_name, pi_name, log_name)
+ pi = find_pi_with(token_name, pi_name)
+ actual_logs = pi.stderr_log_lines
+ expected_text = api_fixture("logs", log_name, "properties", "text")
+ expected_text.each_line do |log_line|
+ assert_includes(actual_logs, log_line.chomp)
+ end
+ end
+
+ test "stderr_logs for running pipeline" do
+ check_stderr_logs(:active,
+ "pipeline_in_publicly_accessible_project",
+ "log_line_for_pipeline_in_publicly_accessible_project")
+ end
+
+ test "stderr_logs for job in complete pipeline" do
+ check_stderr_logs(:active,
+ "failed_pipeline_with_two_jobs",
+ "crunchstat_for_previous_job")
+ end
+
+ test "has_readable_logs? for unrun pipeline" do
+ pi = find_pi_with(:active, "new_pipeline")
+ refute(pi.has_readable_logs?)
+ end
+
+ test "has_readable_logs? for running pipeline" do
+ pi = find_pi_with(:active, "running_pipeline_with_complete_job")
+ assert(pi.has_readable_logs?)
+ end
+
+ test "has_readable_logs? for complete pipeline" do
+ pi = find_pi_with(:active, "pipeline_in_publicly_accessible_project_but_other_objects_elsewhere")
+ assert(pi.has_readable_logs?)
+ end
+
+ test "has_readable_logs? for complete pipeline when jobs unreadable" do
+ pi = find_pi_with(:anonymous, "pipeline_in_publicly_accessible_project_but_other_objects_elsewhere")
+ refute(pi.has_readable_logs?)
+ end
end
import arvados
import os
-import robust_put
import stat
+import arvados.commands.run
+import logging
# Implements "Virtual Working Directory"
# Provides a way of emulating a shared writable directory in Keep based
for f in files:
os.symlink(os.path.join(root, f), os.path.join(target_dir, rel, f))
-# Delete all symlinks and check in any remaining normal files.
-# If merge == True, merge the manifest with source_collection and return a
-# CollectionReader for the combined collection.
-def checkin(source_collection, target_dir, merge=True):
- # delete symlinks, commit directory, merge manifests and return combined
- # collection.
+def checkin(target_dir):
+ """Write files in `target_dir` to Keep.
+
+ Regular files or symlinks to files outside the keep mount are written to
+ Keep as normal files (Keep does not support symlinks).
+
+    Symlinks to files in the keep mount will result in files in the new
+    collection which reference existing Keep blocks; no data copying is
+    necessary.
+
+    Returns a tuple (collection, last_error): a new Collection object with
+    data flushed but the collection record not saved to the API, and the
+    last IOError or OSError encountered while writing (None if there were
+    no errors).
+
+ """
+
+ outputcollection = arvados.collection.Collection(num_retries=5)
+
+ if target_dir[-1:] != '/':
+ target_dir += '/'
+
+ collections = {}
+
+ logger = logging.getLogger("arvados")
+
+ last_error = None
for root, dirs, files in os.walk(target_dir):
for f in files:
- s = os.lstat(os.path.join(root, f))
- if stat.S_ISLNK(s.st_mode):
- os.unlink(os.path.join(root, f))
-
- uuid = robust_put.upload(target_dir)
- if merge:
- cr1 = arvados.CollectionReader(source_collection)
- cr2 = arvados.CollectionReader(uuid)
- combined = arvados.CollectionReader(cr1.manifest_text() + cr2.manifest_text())
- return combined
- else:
- return arvados.CollectionReader(uuid)
+ try:
+ s = os.lstat(os.path.join(root, f))
+
+ writeIt = False
+
+ if stat.S_ISREG(s.st_mode):
+ writeIt = True
+ elif stat.S_ISLNK(s.st_mode):
+ # 1. check if it is a link into a collection
+ real = os.path.split(os.path.realpath(os.path.join(root, f)))
+ (pdh, branch) = arvados.commands.run.is_in_collection(real[0], real[1])
+ if pdh is not None:
+ # 2. load collection
+ if pdh not in collections:
+ collections[pdh] = arvados.collection.CollectionReader(pdh,
+ api_client=outputcollection._my_api(),
+ keep_client=outputcollection._my_keep(),
+ num_retries=5)
+ # 3. copy arvfile to new collection
+ outputcollection.copy(branch, os.path.join(root[len(target_dir):], f), source_collection=collections[pdh])
+ else:
+ writeIt = True
+
+ if writeIt:
+ reldir = root[len(target_dir):]
+ with outputcollection.open(os.path.join(reldir, f), "wb") as writer:
+ with open(os.path.join(root, f), "rb") as reader:
+ dat = reader.read(64*1024)
+ while dat:
+ writer.write(dat)
+ dat = reader.read(64*1024)
+ except (IOError, OSError) as e:
+ logger.error(e)
+ last_error = e
+
+ return (outputcollection, last_error)
jobp = json.loads(args.script_parameters)
os.environ['JOB_UUID'] = 'zzzzz-8i9sb-1234567890abcde'
os.environ['TASK_UUID'] = 'zzzzz-ot0gb-1234567890abcde'
- os.environ['CRUNCH_SRC'] = '/tmp/crunche-src'
+ os.environ['CRUNCH_SRC'] = '/tmp/crunch-src'
if 'TASK_KEEPMOUNT' not in os.environ:
os.environ['TASK_KEEPMOUNT'] = '/keep'
-links = []
-
def sub_tmpdir(v):
return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGQUIT, signal.SIG_DFL)
-for l in links:
- os.unlink(l)
-
logger.info("the following output files will be saved to keep:")
-subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
+subprocess.call(["find", "-L", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr, cwd=outdir)
logger.info("start writing output to keep")
-if "task.vwd" in taskp:
- if "task.foreach" in jobp:
- # This is a subtask, so don't merge with the original collection, that will happen at the end
- outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
- else:
- # Just a single task, so do merge with the original collection
- outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
-else:
- outcollection = robust_put.upload(outdir, logger)
+if "task.vwd" in taskp and "task.foreach" in jobp:
+ for root, dirs, files in os.walk(outdir):
+ for f in files:
+ s = os.lstat(os.path.join(root, f))
+ if stat.S_ISLNK(s.st_mode):
+ os.unlink(os.path.join(root, f))
+
+(outcollection, checkin_error) = vwd.checkin(outdir)
# Success if we ran any subprocess, and they all exited 0.
-success = rcode and all(status == 0 for status in rcode.itervalues())
+success = rcode and all(status == 0 for status in rcode.itervalues()) and not checkin_error
api.job_tasks().update(uuid=arvados.current_task()['uuid'],
body={
- 'output': outcollection,
+ 'output': outcollection.manifest_text(),
'success': success,
'progress':1.0
}).execute()
table(table table-bordered table-condensed).
|_. Attribute|_. Type|_. Description|_. Example|
|name|string|The name of the repository on disk. Repository names must begin with a letter and contain only alphanumerics. Unless the repository is owned by the system user, the name must begin with the owner's username, then be separated from the base repository name with @/@. You may not create a repository that is owned by a user without a username.|@username/project1@|
-|fetch_url|string|The git remote's fetch URL for the repository. Read-only.||
-|push_url|string|The git remote's push URL for the repository. Read-only.||
+|clone_urls|array|URLs from which the repository can be cloned. Read-only.|@["git@git.zzzzz.arvadosapi.com:foo/bar.git",
+ "https://git.zzzzz.arvadosapi.com/foo/bar.git"]@|
+|fetch_url|string|URL suggested as a fetch-url in git config. Deprecated. Read-only.||
+|push_url|string|URL suggested as a push-url in git config. Deprecated. Read-only.||
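
A hedged sketch of reading these attributes with the Python SDK (the repository name below is illustrative; assumes @ARVADOS_API_HOST@ and @ARVADOS_API_TOKEN@ are set in the environment):

<notextile>
<pre><code>import arvados

# Look up a repository by name (hypothetical name) and print its read-only
# clone_urls attribute.
api = arvados.api('v1')
found = api.repositories().list(
    filters=[['name', '=', 'username/project1']]).execute()
for repo in found['items']:
    print("{}: {}".format(repo['name'], repo['clone_urls']))
</code></pre>
</notextile>
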
<div class="container-fluid">
<div class="row">
- <div class="col-sm-5">
+ <div class="col-sm-6">
+    <p><strong>What is Arvados?</strong></p>
<p><a href="https://arvados.org/">Arvados</a> enables you to quickly begin using cloud computing resources in your data science work. It allows you to track your methods and datasets, share them securely, and easily re-run analyses.
</p>
- <p><strong>News</strong>: Read our <a href="https://arvados.org/projects/arvados/blogs">blog updates</a> or look through our <a href="https://arvados.org/projects/arvados/activity">recent developer activity</a>.
+    <p><strong>News</strong></p>
+ <p>Read our <a href="https://arvados.org/projects/arvados/blogs">blog updates</a> or look through our <a href="https://arvados.org/projects/arvados/activity">recent developer activity</a>.
</p>
- <p><strong>Questions?</strong> Email <a href="http://lists.arvados.org/mailman/listinfo/arvados">the mailing list</a>, or chat with us on IRC: <a href="irc://irc.oftc.net:6667/#arvados">#arvados</a> @ OFTC (you can <a href="https://webchat.oftc.net/?channels=arvados">join in your browser</a>).
+ <p><strong>Questions?</strong></p>
+ <p>Email <a href="http://lists.arvados.org/mailman/listinfo/arvados">the mailing list</a>, or chat with us on IRC: <a href="irc://irc.oftc.net:6667/#arvados">#arvados</a> @ OFTC (you can <a href="https://webchat.oftc.net/?channels=arvados">join in your browser</a>).
</p>
- <p><strong>Want to contribute?</strong> Check out our <a href="https://arvados.org/projects/arvados">developer site</a>. We're open source, check out our code on <a href="https://github.com/curoverse/arvados">github</a>.
+ <p><strong>Want to contribute?</strong></p>
+    <p>Check out our <a href="https://arvados.org/projects/arvados">developer site</a>. We're open source; our code is on <a href="https://github.com/curoverse/arvados">GitHub</a>.
</p>
- <p><strong>License</strong>: Arvados is under the copyleft <a href="{{ site.baseurl }}/user/copying/agpl-3.0.html">GNU AGPL v3</a>, with our SDKs under <a href="{{ site.baseurl }}/user/copying/LICENSE-2.0.html">Apache License 2.0</a> (so that you can incorporate proprietary toolchains into your pipelines).
+ <p><strong>License</strong></p>
+ <p>Arvados is under the copyleft <a href="{{ site.baseurl }}/user/copying/agpl-3.0.html">GNU AGPL v3</a>, with our SDKs under <a href="{{ site.baseurl }}/user/copying/LICENSE-2.0.html">Apache License 2.0</a> (so that you can incorporate proprietary toolchains into your pipelines).
</p>
</div>
- <div class="col-sm-7" style="border-left: solid; border-width: 1px">
+ <div class="col-sm-6" style="border-left: solid; border-width: 1px">
<p><strong>Quickstart</strong>
<p>
Try any pipeline from the <a href="https://arvados.org/projects/arvados/wiki/Public_Pipelines_and_Datasets">list of public pipelines</a>. For instance, the <a href="http://curover.se/pathomap">Pathomap Pipeline</a> links to these <a href="https://arvados.org/projects/arvados/wiki/pathomap_tutorial/">step-by-step instructions</a> for trying Arvados out right in your browser using Curoverse's <a href="http://lp.curoverse.com/beta-signup/">public Arvados instance</a>.
<p>
Want to port your pipeline to Arvados? Check out the step-by-step <a href="https://arvados.org/projects/arvados/wiki/Port_a_Pipeline">Port-a-Pipeline</a> guide on the Arvados wiki.
</p>
- <p><strong>Below you can also find more in-depth guides for using Arvados.
+ <p><strong>More in-depth guides
</strong></p>
<!--<p>-->
<!--<a href="{{ site.baseurl }}/start/index.html">Getting Started</a> — Start here if you're new to Arvados.-->
<pre><code>~$ <span class="userinput">prefix=`arv --format=uuid user current | cut -d- -f1`</span>
~$ <span class="userinput">echo "Site prefix is '$prefix'"</span>
~$ <span class="userinput">all_users_group_uuid="$prefix-j7d0g-fffffffffffffff"</span>
-~$ <span class="userinput">repo_uuid=`arv --format=uuid repository create --repository '{"name":"arvados"}'`</span>
+~$ <span class="userinput">repo_uuid=`arv --format=uuid repository create --repository '{"owner_uuid":"$prefix-tpzed-000000000000000", "name":"arvados"}'`</span>
~$ <span class="userinput">echo "Arvados repository uuid is '$repo_uuid'"</span>
~$ <span class="userinput">read -rd $'\000' newlink <<EOF; arv link create --link "$newlink"</span>
<span class="userinput">{
Generate a new secret token for signing cookies:
<notextile>
-<pre><code>~/arvados/services/api$ <span class="userinput">rake secret</span>
+<pre><code>~/arvados/services/api$ <span class="userinput">ruby -e 'puts rand(2**400).to_s(36)'</span>
zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
</code></pre></notextile>
<notextile>
<pre><code>~$ <span class="userinput">keepstore -h</span>
-2014/10/29 14:23:38 Keep started: pid 6848
-Usage of keepstore:
+2015/05/08 13:41:16 keepstore starting, pid 2565
+Usage of ./keepstore:
+ -blob-signature-ttl=1209600: Lifetime of blob permission signatures. See services/api/config/application.default.yml.
+ -blob-signing-key-file="": File containing the secret key for generating and verifying blob permission signatures.
-data-manager-token-file="": File with the API token used by the Data Manager. All DELETE requests or GET /index requests must carry this token.
-enforce-permissions=false: Enforce permission signatures on requests.
- -listen=":25107": Interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.
+ -listen=":25107": Listening address, in the form "host:port". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.
+ -max-buffers=128: Maximum RAM to use for data buffers, given in multiples of block size (64 MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.
-never-delete=false: If set, nothing will be deleted. HTTP 405 will be returned for valid DELETE requests.
- -permission-key-file="": File containing the secret key for generating and verifying permission signatures.
- -permission-ttl=1209600: Expiration time (in seconds) for newly generated permission signatures.
- -pid="": Path to write pid file
- -serialize=false: If set, all read and write operations on local Keep volumes will be serialized.
- -volumes="": Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.
+ -permission-key-file="": Synonym for -blob-signing-key-file.
+ -permission-ttl=0: Synonym for -blob-signature-ttl.
+ -pid="": Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.
+ -readonly=false: Do not write, delete, or touch anything on the following volumes.
+ -serialize=false: Serialize read and write operations on the following volumes.
+ -volume=[]: Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named "keep" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.
+ -volumes=[]: Deprecated synonym for -volume.
</code></pre>
</notextile>
-If you want access control on your Keepstore server(s), you should provide a permission key. The @-permission-key-file@ argument should contain the path to a file that contains a single line with a long random alphanumeric string. It should be the same as the @blob_signing_key@ that can be set in the "API server":install-api-server.html config/application.yml file.
+If you want access control on your Keepstore server(s), you must specify the @-enforce-permissions@ flag and provide a signing key. The @-blob-signing-key-file@ argument should be a file containing a long random alphanumeric string with no internal line breaks (it is also possible to use a socket or FIFO: keepstore reads it only once, at startup). This key must be the same as the @blob_signing_key@ configured in the "API server":install-api-server.html config/application.yml file.
+
+The @-max-buffers@ argument can be used to restrict keepstore's memory use. By default, keepstore will allocate no more than 128 blocks (8 GiB) worth of data buffers at a time. Normally this should be set as high as possible without risking swapping.
Prepare one or more volumes for Keepstore to use. Simply create a /keep directory on all the partitions you would like Keepstore to use, and then start Keepstore. For example:
<notextile>
-<pre><code>~$ <span class="userinput">keepstore</span>
-2014/10/29 11:41:37 Keep started: pid 20736
-2014/10/29 11:41:37 adding Keep volume: /tmp/tmp.vwSCtUCyeH/keep
-2014/10/29 11:41:37 adding Keep volume: /tmp/tmp.Lsn4w8N3Xv/keep
-2014/10/29 11:41:37 Running without a PermissionSecret. Block locators returned by this server will not be signed, and will be rejected by a server that enforces permissions.
-2014/10/29 11:41:37 To fix this, run Keep with --permission-key-file=<path> to define the location of a file containing the permission key.
+<pre><code>~$ <span class="userinput">keepstore -blob-signing-key-file=./blob-signing-key</span>
+2015/05/08 13:44:26 keepstore starting, pid 2765
+2015/05/08 13:44:26 Using volume [UnixVolume /mnt/keep] (writable=true)
+2015/05/08 13:44:26 listening at :25107
</code></pre>
</notextile>
<notextile>
<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">cp -i config/initializers/secret_token.rb.example config/initializers/secret_token.rb</span>
-~/sso-devise-omniauth-provider$ <span class="userinput">rake secret</span>
+~/sso-devise-omniauth-provider$ <span class="userinput">ruby -e 'puts rand(2**400).to_s(36)'</span>
zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
</code></pre>
</notextile>
-Edit @config/initializers/secret_token.rb@ to set @config.secret_token@ to the string produced by @rake secret@.
+Edit @config/initializers/secret_token.rb@ to set @config.secret_token@ to the string produced by @rand@ above.
h3. Configure upstream authentication provider
Use @rails console@ to create a @Client@ record that will be used by the Arvados API server. The values of @app_id@ and @app_secret@ correspond to the @APP_ID@ and @APP_SECRET@ that must be set in "Setting up Omniauth in the API server.":install-api-server.html#omniauth
<notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">rake secret</span>
+<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">ruby -e 'puts rand(2**400).to_s(36)'</span>
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
~/sso-devise-omniauth-provider$ <span class="userinput">RAILS_ENV=production bundle exec rails console</span>
:001 > <span class="userinput">c = Client.new</span>
<pre><code>~$ <span class="userinput">sudo apt-get install \
bison build-essential gettext libcurl3 libcurl3-gnutls \
libcurl4-openssl-dev libpcre3-dev libpq-dev libreadline-dev \
- libssl-dev libxslt1.1 git wget zlib1g-dev graphviz libsqlite3-dev
+ libssl-dev libxslt1.1 git wget zlib1g-dev graphviz
</span></code></pre></notextile>
Also make sure you have "Ruby and bundler":install-manual-prerequisites-ruby.html installed.
This application needs a secret token. Generate a new secret:
<notextile>
-<pre><code>~/arvados/apps/workbench$ <span class="userinput">rake secret</span>
+<pre><code>~/arvados/apps/workbench$ <span class="userinput">ruby -e 'puts rand(2**400).to_s(36)'</span>
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
</code></pre>
</notextile>
|$(dir ...) | Takes a reference to an Arvados collection or directory within an Arvados collection and evaluates to a directory path on the local file system where that directory can be accessed by your command. The path may include a file name, in which case it will evaluate to the parent directory of the file. Uses Python's os.path.dirname(), so "/foo/bar" will evaluate to "/foo" but "/foo/bar/" will evaluate to "/foo/bar". Will raise an error if the directory is not accessible. |
|$(basename ...) | Strip leading directory and trailing file extension from the path provided. For example, $(basename /foo/bar.baz.txt) will evaluate to "bar.baz".|
|$(glob ...) | Take a Unix shell path pattern (supports @*@ @?@ and @[]@) and search the local filesystem, returning the first match found. Use together with $(dir ...) to get a local filesystem path for Arvados collections. For example: $(glob $(dir $(mycollection)/*.bam)) will find the first .bam file in the collection specified by the user parameter "mycollection". If there is more than one match, which one is returned is undefined. Will raise an error if no matches are found.|
+|$(task.tmpdir)|Designated temporary directory. This directory will be discarded when the job completes.|
+|$(task.outdir)|Designated output directory. The contents of this directory will be saved to Keep when the job completes. A symlink to a file in the keep mount will reference existing Keep blocks in your job output collection, with no data copying or duplication.|
+|$(job.srcdir)|Path to the git working directory ($CRUNCH_SRC).|
+|$(node.cores)|Number of CPU cores on the node.|
+|$(job.uuid)|Current job uuid ($JOB_UUID)|
+|$(task.uuid)|Current task uuid ($TASK_UUID)|
h3. Escape sequences
Background: because Keep collections are read-only, this does not play well with certain tools that expect to be able to write their outputs alongside their inputs (such as tools that generate indexes closely associated with the original file). The solution in run-command is the "virtual working directory".
-@task.vwd@ specifies a Keep collection with the starting contents of the directory. @run-command@ will then populate @task.outdir@ with directories and symlinks to mirror the contents of the @task.vwd@ collection. Your command will then be able to both access its input files and write its output files in @task.outdir@. When the command completes, the output collection will merge the output of your command with the contents of the starting collection. Note that files in the starting collection remain read-only and cannot be altered or deleted.
+@task.vwd@ specifies a Keep collection with the starting contents of the output directory. @run-command@ will populate @task.outdir@ with directories and symlinks to mirror the contents of the @task.vwd@ collection. Your command will then be able to both access its input files and write its output files from within @task.outdir@. When the command completes, run-command saves the contents of the output directory to Keep; the resulting collection includes the output of your command as well as symlinks to files in the starting collection. Note that files from the starting collection remain read-only and cannot be altered, but may be deleted or renamed.
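
A hedged sketch of @script_parameters@ for a run-command component that uses @task.vwd@, written here as a Python dict (the collection hash, repository, and command are illustrative only):

<notextile>
<pre><code>import json

# Hypothetical component: populate $(task.outdir) with symlinks to the files
# in the "input" collection, then let the tool write its index files next to
# the symlinked input.
component = {
    "script": "run-command",
    "script_version": "master",
    "repository": "arvados",
    "script_parameters": {
        "task.vwd": "$(input)",
        "input": "c1bad4b39ca5a924e481008009d94e32+210",
        "command": ["bwa", "index", "$(task.outdir)/reference.fasta"]
    }
}
print(json.dumps(component, indent=2))
</code></pre>
</notextile>
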
h3. task.foreach
production:
host: api.dev.arvados
- git_host: api.dev.arvados
+
+ git_repo_ssh_base: "git@api.dev.arvados:"
+
+ # Docker setup doesn't include arv-git-httpd yet.
+ git_repo_https_base: false
# At minimum, you need a nice long randomly generated secret_token here.
# Use a long string of alphanumeric characters (at least 36).
# Install generated config files
ADD generated/secret_token.rb /usr/src/sso-provider/config/initializers/secret_token.rb
ADD generated/seeds.rb /usr/src/sso-provider/db/seeds.rb
+ADD generated/application.yml /usr/src/sso-provider/config/application.yml
ADD generated/apache2_vhost /etc/apache2/sites-available/sso-provider
ADD generated/apache2_vhost /etc/apache2/sites-available/sso-provider
--- /dev/null
+#
+# Consult application.default.yml for the full list of configuration
+# settings.
+#
+# The order of precedence is:
+# 1. config/environments/{RAILS_ENV}.rb (deprecated)
+# 2. Section in application.yml corresponding to RAILS_ENV (e.g., development)
+# 3. Section in application.yml called "common"
+# 4. Section in application.default.yml corresponding to RAILS_ENV
+# 5. Section in application.default.yml called "common"
+
+production:
+ allow_account_registration: true
+
+ secret_token: @@SSO_SECRET@@
+ uuid_prefix: 'zzzzz'
+
+development:
+ # No development settings
+
+test:
+ # No test settings
+
+common:
+ # No common settings
+
var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
-type ArvadosApiError struct {
- error
- HttpStatusCode int
- HttpStatus string
+// Indicates an error that was returned by the API server.
+type APIServerError struct {
+ // Address of server returning error, of the form "host:port".
+ ServerAddress string
+
+ // Components of server response.
+ HttpStatusCode int
+ HttpStatusMessage string
+
+ // Additional error details from response body.
+ ErrorDetails []string
}
-func (e ArvadosApiError) Error() string { return e.error.Error() }
+func (e APIServerError) Error() string {
+ if len(e.ErrorDetails) > 0 {
+ return fmt.Sprintf("arvados API server error: %s (%d: %s) returned by %s",
+ strings.Join(e.ErrorDetails, "; "),
+ e.HttpStatusCode,
+ e.HttpStatusMessage,
+ e.ServerAddress)
+ } else {
+ return fmt.Sprintf("arvados API server error: %d: %s returned by %s",
+ e.HttpStatusCode,
+ e.HttpStatusMessage,
+ e.ServerAddress)
+ }
+}
// Helper type so we don't have to write out 'map[string]interface{}' every time.
type Dict map[string]interface{}
External bool
}
-// Create a new KeepClient, initialized with standard Arvados environment
+// Create a new ArvadosClient, initialized with standard Arvados environment
// variables ARVADOS_API_HOST, ARVADOS_API_TOKEN, and (optionally)
// ARVADOS_API_HOST_INSECURE.
-func MakeArvadosClient() (kc ArvadosClient, err error) {
+func MakeArvadosClient() (ac ArvadosClient, err error) {
var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
insecure := matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
external := matchTrue.MatchString(os.Getenv("ARVADOS_EXTERNAL_CLIENT"))
- kc = ArvadosClient{
+ ac = ArvadosClient{
ApiServer: os.Getenv("ARVADOS_API_HOST"),
ApiToken: os.Getenv("ARVADOS_API_TOKEN"),
ApiInsecure: insecure,
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
External: external}
- if kc.ApiServer == "" {
- return kc, MissingArvadosApiHost
+ if ac.ApiServer == "" {
+ return ac, MissingArvadosApiHost
}
- if kc.ApiToken == "" {
- return kc, MissingArvadosApiToken
+ if ac.ApiToken == "" {
+ return ac, MissingArvadosApiToken
}
- return kc, err
+ return ac, err
}
// Low-level access to a resource.
}
defer resp.Body.Close()
- errorText := fmt.Sprintf("API response: %s", resp.Status)
+ return nil, newAPIServerError(this.ApiServer, resp)
+}
+
+func newAPIServerError(ServerAddress string, resp *http.Response) APIServerError {
+
+ ase := APIServerError{
+ ServerAddress: ServerAddress,
+ HttpStatusCode: resp.StatusCode,
+ HttpStatusMessage: resp.Status}
// If the response body has {"errors":["reason1","reason2"]}
// then return those reasons.
var errInfo = Dict{}
if err := json.NewDecoder(resp.Body).Decode(&errInfo); err == nil {
if errorList, ok := errInfo["errors"]; ok {
- var errorStrings []string
if errArray, ok := errorList.([]interface{}); ok {
for _, errItem := range errArray {
// We expect an array of strings here.
// Non-strings will be passed along
// JSON-encoded.
if s, ok := errItem.(string); ok {
- errorStrings = append(errorStrings, s)
+ ase.ErrorDetails = append(ase.ErrorDetails, s)
} else if j, err := json.Marshal(errItem); err == nil {
- errorStrings = append(errorStrings, string(j))
+ ase.ErrorDetails = append(ase.ErrorDetails, string(j))
}
}
- errorText = strings.Join(errorStrings, "; ")
}
}
}
- return nil, ArvadosApiError{errors.New(errorText), resp.StatusCode, resp.Status}
+ return ase
}
// Access to a resource.
package arvadosclient
import (
- . "gopkg.in/check.v1"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ . "gopkg.in/check.v1"
"net/http"
"os"
"testing"
err := arv.Create("logs",
Dict{"log": Dict{"bogus_attr": "foo"}},
&getback)
+ c.Assert(err, ErrorMatches, "arvados API server error: .*")
c.Assert(err, ErrorMatches, ".*unknown attribute: bogus_attr.*")
- c.Assert(err, FitsTypeOf, ArvadosApiError{})
- c.Assert(err.(ArvadosApiError).HttpStatusCode, Equals, 422)
+ c.Assert(err, FitsTypeOf, APIServerError{})
+ c.Assert(err.(APIServerError).HttpStatusCode, Equals, 422)
}
{
err := arv.Create("bogus",
Dict{"bogus": Dict{}},
&getback)
- c.Assert(err, ErrorMatches, "Path not found")
- c.Assert(err, FitsTypeOf, ArvadosApiError{})
- c.Assert(err.(ArvadosApiError).HttpStatusCode, Equals, 404)
+ c.Assert(err, ErrorMatches, "arvados API server error: .*")
+ c.Assert(err, ErrorMatches, ".*Path not found.*")
+ c.Assert(err, FitsTypeOf, APIServerError{})
+ c.Assert(err.(APIServerError).HttpStatusCode, Equals, 404)
}
}
raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
def readfrom(self, offset, size, num_retries, exact=False):
- """Read upto `size` bytes from the file starting at `offset`.
+ """Read up to `size` bytes from the file starting at `offset`.
:exact:
If False (default), return less data than requested if the read
rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
return ''.join(data)
else:
- data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
+ data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
self._filepos += len(data)
return data
for link_class in ("docker_image_repo+tag", "docker_image_hash"):
docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
- for d in docker_links:
- body={
- 'head_uuid': dst_collection['uuid'],
- 'link_class': link_class,
- 'name': d['name'],
- }
+ for src_link in docker_links:
+ body = {key: src_link[key]
+ for key in ['link_class', 'name', 'properties']}
+ body['head_uuid'] = dst_collection['uuid']
body['owner_uuid'] = args.project_uuid
lk = dst.links().create(body=body).execute(num_retries=args.retries)
# errors.py - Arvados-specific exceptions.
import json
-import requests
from apiclient import errors as apiclient_errors
from collections import OrderedDict
self.message = message
def _format_error(self, key, error):
- if isinstance(error, requests.Response):
+ if isinstance(error, HttpError):
err_fmt = "{} {} responded with {e.status_code} {e.reason}"
else:
err_fmt = "{} {} raised {e.__class__.__name__} ({e})"
return self._request_errors
+class HttpError(Exception):
+ def __init__(self, status_code, reason):
+ self.status_code = status_code
+ self.reason = reason
+
+
class ArgumentError(Exception):
pass
class SyntaxError(Exception):
+import bz2
+import datetime
+import fcntl
+import functools
import gflags
+import hashlib
+import json
import logging
import os
import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
+import pycurl
+import Queue
import re
-import hashlib
+import socket
+import ssl
import string
-import bz2
-import zlib
-import fcntl
-import time
+import cStringIO
+import subprocess
+import sys
import threading
+import time
import timer
-import datetime
-import ssl
-import socket
-import requests
+import types
+import UserDict
+import zlib
import arvados
import arvados.config as config
import arvados.retry as retry
import arvados.util
-try:
- # Workaround for urllib3 bug.
- # The 'requests' library enables urllib3's SNI support by default, which uses pyopenssl.
- # However, urllib3 prior to version 1.10 has a major bug in this feature
- # (OpenSSL WantWriteError, https://github.com/shazow/urllib3/issues/412)
- # Unfortunately Debian 8 is stabilizing on urllib3 1.9.1 which means the
- # following workaround is necessary to be able to use
- # the arvados python sdk with the distribution-provided packages.
- import urllib3
- from pkg_resources import parse_version
- if parse_version(urllib3.__version__) < parse_version('1.10'):
- from urllib3.contrib import pyopenssl
- pyopenssl.extract_from_urllib3()
-except ImportError:
- pass
-
_logger = logging.getLogger('arvados.keep')
global_client_object = None
+
class KeepLocator(object):
EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
class KeepService(object):
- # Make requests to a single Keep service, and track results.
- HTTP_ERRORS = (requests.exceptions.RequestException,
- socket.error, ssl.SSLError)
+ """Make requests to a single Keep service, and track results.
+
+ A KeepService is intended to last long enough to perform one
+ transaction (GET or PUT) against one Keep service. This can
+ involve calling either get() or put() multiple times in order
+ to retry after transient failures. However, calling both get()
+ and put() on a single instance -- or using the same instance
+ to access two different Keep services -- will not produce
+ sensible behavior.
+ """
+
+ HTTP_ERRORS = (
+ socket.error,
+ ssl.SSLError,
+ arvados.errors.HttpError,
+ )
- def __init__(self, root, session, **headers):
+ def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
self.root = root
- self.last_result = None
- self.success_flag = None
- self.session = session
+ self._user_agent_pool = user_agent_pool
+ self._result = {'error': None}
+ self._usable = True
+ self._session = None
self.get_headers = {'Accept': 'application/octet-stream'}
self.get_headers.update(headers)
self.put_headers = headers
def usable(self):
- return self.success_flag is not False
+ """Is it worth attempting a request?"""
+ return self._usable
def finished(self):
- return self.success_flag is not None
+ """Did the request succeed or encounter permanent failure?"""
+ return self._result['error'] == False or not self._usable
- def last_status(self):
+ def last_result(self):
+ return self._result
+
+ def _get_user_agent(self):
try:
- return self.last_result.status_code
- except AttributeError:
- return None
+ return self._user_agent_pool.get(False)
+ except Queue.Empty:
+ return pycurl.Curl()
+
+ def _put_user_agent(self, ua):
+ try:
+ ua.reset()
+ self._user_agent_pool.put(ua, False)
+ except:
+ ua.close()
+
+ @staticmethod
+ def _socket_open(family, socktype, protocol, address=None):
+ """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
+ s = socket.socket(family, socktype, protocol)
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+ s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
+ s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
+ return s
def get(self, locator, timeout=None):
# locator is a KeepLocator object.
url = self.root + str(locator)
_logger.debug("Request: GET %s", url)
+ curl = self._get_user_agent()
try:
with timer.Timer() as t:
- result = self.session.get(url.encode('utf-8'),
- headers=self.get_headers,
- timeout=timeout)
+ self._headers = {}
+ response_body = cStringIO.StringIO()
+ curl.setopt(pycurl.NOSIGNAL, 1)
+ curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+ curl.setopt(pycurl.URL, url.encode('utf-8'))
+ curl.setopt(pycurl.HTTPHEADER, [
+ '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
+ curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
+ curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+ self._setcurltimeouts(curl, timeout)
+ try:
+ curl.perform()
+ except Exception as e:
+ raise arvados.errors.HttpError(0, str(e))
+ self._result = {
+ 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+ 'body': response_body.getvalue(),
+ 'headers': self._headers,
+ 'error': False,
+ }
+ ok = retry.check_http_response_success(self._result['status_code'])
+ if not ok:
+ self._result['error'] = arvados.errors.HttpError(
+ self._result['status_code'],
+ self._headers.get('x-status-line', 'Error'))
except self.HTTP_ERRORS as e:
- _logger.debug("Request fail: GET %s => %s: %s",
- url, type(e), str(e))
- self.last_result = e
+ self._result = {
+ 'error': e,
+ }
+ ok = False
+ self._usable = ok != False
+ if self._result.get('status_code', None):
+ # The client worked well enough to get an HTTP status
+ # code, so presumably any problems are just on the
+ # server side and it's OK to reuse the client.
+ self._put_user_agent(curl)
else:
- self.last_result = result
- self.success_flag = retry.check_http_response_success(result)
- content = result.content
- _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
- self.last_status(), len(content), t.msecs,
- (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
- if self.success_flag:
- resp_md5 = hashlib.md5(content).hexdigest()
- if resp_md5 == locator.md5sum:
- return content
- _logger.warning("Checksum fail: md5(%s) = %s",
- url, resp_md5)
- return None
+ # Don't return this client to the pool, in case it's
+ # broken.
+ curl.close()
+ if not ok:
+ _logger.debug("Request fail: GET %s => %s: %s",
+ url, type(self._result['error']), str(self._result['error']))
+ return None
+ _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
+ self._result['status_code'],
+ len(self._result['body']),
+ t.msecs,
+ (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+ resp_md5 = hashlib.md5(self._result['body']).hexdigest()
+ if resp_md5 != locator.md5sum:
+ _logger.warning("Checksum fail: md5(%s) = %s",
+ url, resp_md5)
+ self._result['error'] = arvados.errors.HttpError(
+ 0, 'Checksum fail')
+ return None
+ return self._result['body']
def put(self, hash_s, body, timeout=None):
url = self.root + hash_s
_logger.debug("Request: PUT %s", url)
+ curl = self._get_user_agent()
try:
- result = self.session.put(url.encode('utf-8'),
- data=body,
- headers=self.put_headers,
- timeout=timeout)
+ self._headers = {}
+ body_reader = cStringIO.StringIO(body)
+ response_body = cStringIO.StringIO()
+ curl.setopt(pycurl.NOSIGNAL, 1)
+ curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+ curl.setopt(pycurl.URL, url.encode('utf-8'))
+ # Using UPLOAD tells cURL to wait for a "go ahead" from the
+            # Keep server (in the form of an HTTP/1.1 "100 Continue"
+ # response) instead of sending the request body immediately.
+ # This allows the server to reject the request if the request
+ # is invalid or the server is read-only, without waiting for
+ # the client to send the entire block.
+ curl.setopt(pycurl.UPLOAD, True)
+ curl.setopt(pycurl.INFILESIZE, len(body))
+ curl.setopt(pycurl.READFUNCTION, body_reader.read)
+ curl.setopt(pycurl.HTTPHEADER, [
+ '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+ curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
+ curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+ self._setcurltimeouts(curl, timeout)
+ try:
+ curl.perform()
+ except Exception as e:
+ raise arvados.errors.HttpError(0, str(e))
+ self._result = {
+ 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+ 'body': response_body.getvalue(),
+ 'headers': self._headers,
+ 'error': False,
+ }
+ ok = retry.check_http_response_success(self._result['status_code'])
+ if not ok:
+ self._result['error'] = arvados.errors.HttpError(
+ self._result['status_code'],
+ self._headers.get('x-status-line', 'Error'))
except self.HTTP_ERRORS as e:
+ self._result = {
+ 'error': e,
+ }
+ ok = False
+ self._usable = ok != False # still usable if ok is True or None
+ if self._result.get('status_code', None):
+ # Client is functional. See comment in get().
+ self._put_user_agent(curl)
+ else:
+ curl.close()
+ if not ok:
_logger.debug("Request fail: PUT %s => %s: %s",
- url, type(e), str(e))
- self.last_result = e
+ url, type(self._result['error']), str(self._result['error']))
+ return False
+ return True
+
+ def _setcurltimeouts(self, curl, timeouts):
+ if not timeouts:
+ return
+ elif isinstance(timeouts, tuple):
+ conn_t, xfer_t = timeouts
+ else:
+ conn_t, xfer_t = (timeouts, timeouts)
+ curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
+ curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
+
+ def _headerfunction(self, header_line):
+ header_line = header_line.decode('iso-8859-1')
+ if ':' in header_line:
+ name, value = header_line.split(':', 1)
+ name = name.strip().lower()
+ value = value.strip()
+ elif self._headers:
+ name = self._lastheadername
+ value = self._headers[name] + ' ' + header_line.strip()
+ elif header_line.startswith('HTTP/'):
+ name = 'x-status-line'
+ value = header_line
else:
- self.last_result = result
- self.success_flag = retry.check_http_response_success(result)
- return self.success_flag
+ _logger.error("Unexpected header line: %s", header_line)
+ return
+ self._lastheadername = name
+ self._headers[name] = value
+ # Returning None implies all bytes were written
class KeepWriterThread(threading.Thread):
self.args['data_hash'],
self.args['data'],
timeout=self.args.get('timeout', None)))
- status = self.service.last_status()
+ result = self.service.last_result()
if self._success:
- result = self.service.last_result
_logger.debug("KeepWriterThread %s succeeded %s+%i %s",
str(threading.current_thread()),
self.args['data_hash'],
# we're talking to a proxy or other backend that
# stores to multiple copies for us.
try:
- replicas_stored = int(result.headers['x-keep-replicas-stored'])
+ replicas_stored = int(result['headers']['x-keep-replicas-stored'])
except (KeyError, ValueError):
replicas_stored = 1
- limiter.save_response(result.content.strip(), replicas_stored)
- elif status is not None:
+ limiter.save_response(result['body'].strip(), replicas_stored)
+ elif result.get('status_code', None):
_logger.debug("Request fail: PUT %s => %s %s",
- self.args['data_hash'], status,
- self.service.last_result.content)
+ self.args['data_hash'],
+ result['status_code'],
+ result['body'])
def __init__(self, api_client=None, proxy=None,
The default number of times to retry failed requests.
This will be used as the default num_retries value when get() and
put() are called. Default 0.
-
- :session:
- The requests.Session object to use for get() and put() requests.
- Will create one if not specified.
"""
self.lock = threading.Lock()
if proxy is None:
self.block_cache = block_cache if block_cache else KeepBlockCache()
self.timeout = timeout
self.proxy_timeout = proxy_timeout
+ self._user_agent_pool = Queue.LifoQueue()
if local_store:
self.local_store = local_store
self.put = self.local_store_put
else:
self.num_retries = num_retries
- self.session = session if session is not None else requests.Session()
if proxy:
if not proxy.endswith('/'):
proxy += '/'
# Precompute the base URI for each service.
for r in accessible:
- r['_service_root'] = "{}://[{}]:{:d}/".format(
+ host = r['service_host']
+ if not host.startswith('[') and host.find(':') >= 0:
+ # IPv6 URIs must be formatted like http://[::1]:80/...
+ host = '[' + host + ']'
+ r['_service_root'] = "{}://{}:{:d}/".format(
'https' if r['service_ssl_flag'] else 'http',
- r['service_host'],
+ host,
r['service_port'])
# Gateway services are only used when specified by UUID,
local_roots = self.weighted_service_roots(locator, force_rebuild)
for root in local_roots:
if root not in roots_map:
- roots_map[root] = self.KeepService(root, self.session, **headers)
+ roots_map[root] = self.KeepService(
+ root, self._user_agent_pool, **headers)
return local_roots
@staticmethod
def get_from_cache(self, loc):
"""Fetch a block only if is in the cache, otherwise return None."""
slot = self.block_cache.get(loc)
- if slot and slot.ready.is_set():
+ if slot is not None and slot.ready.is_set():
return slot.get()
else:
return None
self._gateway_services.get(hint[2:])
)])
# Map root URLs to their KeepService objects.
- roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
+ roots_map = {
+ root: self.KeepService(root, self._user_agent_pool)
+ for root in hint_roots
+ }
# See #3147 for a discussion of the loop implementation. Highlights:
# * Refresh the list of Keep services after each failure, in case
# Q: Including 403 is necessary for the Keep tests to continue
# passing, but maybe they should expect KeepReadError instead?
not_founds = sum(1 for key in sorted_roots
- if roots_map[key].last_status() in {403, 404, 410})
- service_errors = ((key, roots_map[key].last_result)
+ if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
+ service_errors = ((key, roots_map[key].last_result()['error'])
for key in sorted_roots)
if not roots_map:
            raise arvados.errors.KeepWriteError(
"failed to write {}: no Keep services available ({})".format(
data_hash, loop.last_result()))
else:
- service_errors = ((key, roots_map[key].last_result)
+ service_errors = ((key, roots_map[key].last_result()['error'])
for key in local_roots
- if not roots_map[key].success_flag)
+ if roots_map[key].last_result()['error'])
raise arvados.errors.KeepWriteError(
"failed to write {} (wanted {} copies but wrote {})".format(
data_hash, copies, thread_limiter.done()), service_errors, label="service")
import functools
import inspect
+import pycurl
import time
from collections import deque
"queried loop results before any were recorded")
-def check_http_response_success(result):
- """Convert a 'requests' response to a loop control flag.
+def check_http_response_success(status_code):
+ """Convert an HTTP status code to a loop control flag.
- Pass this method a requests.Response object. It returns True if
- the response indicates success, None if it indicates temporary
+ Pass this method a numeric HTTP status code. It returns True if
+ the code indicates success, None if it indicates temporary
failure, and False otherwise. You can use this as the
success_check for a RetryLoop.
3xx status codes. They don't indicate success, and you can't
retry those requests verbatim.
"""
- try:
- status = result.status_code
- except Exception:
- return None
- if status in _HTTP_SUCCESSES:
+ if status_code in _HTTP_SUCCESSES:
return True
- elif status in _HTTP_CAN_RETRY:
+ elif status_code in _HTTP_CAN_RETRY:
return None
- elif 100 <= status < 600:
+ elif 100 <= status_code < 600:
return False
else:
return None # Get well soon, server.
license='Apache 2.0',
packages=find_packages(),
scripts=[
- 'bin/arv-copy',
- 'bin/arv-get',
- 'bin/arv-keepdocker',
- 'bin/arv-ls',
- 'bin/arv-normalize',
- 'bin/arv-put',
- 'bin/arv-run',
- 'bin/arv-ws'
- ],
+ 'bin/arv-copy',
+ 'bin/arv-get',
+ 'bin/arv-keepdocker',
+ 'bin/arv-ls',
+ 'bin/arv-normalize',
+ 'bin/arv-put',
+ 'bin/arv-run',
+ 'bin/arv-ws'
+ ],
install_requires=[
- 'python-gflags',
- 'google-api-python-client',
- 'httplib2',
- 'requests>=2.4',
- 'urllib3',
- 'ws4py'
- ],
+ 'google-api-python-client',
+ 'httplib2',
+ 'pycurl>=7.19',
+ 'python-gflags',
+ 'requests>=2.4',
+ 'urllib3',
+ 'ws4py'
+ ],
test_suite='tests',
tests_require=['mock>=1.0', 'PyYAML'],
zip_safe=False,
import io
import mock
import os
+import pycurl
import Queue
-import requests
import shutil
import tempfile
import unittest
return mock.patch('httplib2.Http.request', side_effect=queue_with((
(fake_httplib2_response(code, **headers), body) for code in codes)))
-# fake_requests_response, mock_get_responses and mock_put_responses
-# mock calls to requests.get() and requests.put()
-def fake_requests_response(code, body, **headers):
- r = requests.Response()
- r.status_code = code
- r.reason = httplib.responses.get(code, "Unknown Response")
- r.headers = headers
- r.raw = io.BytesIO(body)
- return r
-
-# The following methods patch requests.Session(), where return_value is a mock
-# Session object. The put/get attributes are set on mock Session, and the
-# desired put/get behavior is set on the put/get mocks.
-
-def mock_put_responses(body, *codes, **headers):
- m = mock.MagicMock()
+
+class FakeCurl:
+ @classmethod
+ def make(cls, code, body='', headers={}):
+ return mock.Mock(spec=cls, wraps=cls(code, body, headers))
+
+ def __init__(self, code=200, body='', headers={}):
+ self._opt = {}
+ self._got_url = None
+ self._writer = None
+ self._headerfunction = None
+ self._resp_code = code
+ self._resp_body = body
+ self._resp_headers = headers
+
+ def getopt(self, opt):
+ return self._opt.get(str(opt), None)
+
+ def setopt(self, opt, val):
+ self._opt[str(opt)] = val
+ if opt == pycurl.WRITEFUNCTION:
+ self._writer = val
+ elif opt == pycurl.HEADERFUNCTION:
+ self._headerfunction = val
+
+ def perform(self):
+ if not isinstance(self._resp_code, int):
+ raise self._resp_code
+ if self.getopt(pycurl.URL) is None:
+ raise ValueError
+ if self._writer is None:
+ raise ValueError
+ if self._headerfunction:
+ self._headerfunction("HTTP/1.1 {} Status".format(self._resp_code))
+ for k, v in self._resp_headers.iteritems():
+ self._headerfunction(k + ': ' + str(v))
+ self._writer(self._resp_body)
+
+ def close(self):
+ pass
+
+ def reset(self):
+ """Prevent fake UAs from going back into the user agent pool."""
+ raise Exception
+
+ def getinfo(self, opt):
+ if opt == pycurl.RESPONSE_CODE:
+ return self._resp_code
+ raise Exception
+
+def mock_keep_responses(body, *codes, **headers):
+ """Patch pycurl to return fake responses and raise exceptions.
+
+ body can be a string to return as the response body; an exception
+ to raise when perform() is called; or an iterable that returns a
+ sequence of such values.
+ """
+ cm = mock.MagicMock()
if isinstance(body, tuple):
codes = list(codes)
codes.insert(0, body)
- m.return_value.put.side_effect = queue_with((fake_requests_response(code, b, **headers) for b, code in codes))
+ responses = [
+ FakeCurl.make(code=code, body=b, headers=headers)
+ for b, code in codes
+ ]
else:
- m.return_value.put.side_effect = queue_with((fake_requests_response(code, body, **headers) for code in codes))
- return mock.patch('requests.Session', m)
-
-def mock_get_responses(body, *codes, **headers):
- m = mock.MagicMock()
- m.return_value.get.side_effect = queue_with((fake_requests_response(code, body, **headers) for code in codes))
- return mock.patch('requests.Session', m)
-
-def mock_get(side_effect):
- m = mock.MagicMock()
- m.return_value.get.side_effect = side_effect
- return mock.patch('requests.Session', m)
-
-def mock_put(side_effect):
- m = mock.MagicMock()
- m.return_value.put.side_effect = side_effect
- return mock.patch('requests.Session', m)
+ responses = [
+ FakeCurl.make(code=code, body=body, headers=headers)
+ for code in codes
+ ]
+ cm.side_effect = queue_with(responses)
+ cm.responses = responses
+ return mock.patch('pycurl.Curl', cm)
+
class MockStreamReader(object):
def __init__(self, name='.', *data):
--- /dev/null
+import BaseHTTPServer
+import hashlib
+import os
+import re
+import SocketServer
+import time
+
+class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
+
+ allow_reuse_address = 1
+
+ def __init__(self, *args, **kwargs):
+ self.store = {}
+ self.delays = {
+ # before reading request headers
+ 'request': 0,
+ # before reading request body
+ 'request_body': 0,
+ # before setting response status and headers
+ 'response': 0,
+ # before sending response body
+ 'response_body': 0,
+ # before returning from handler (thus setting response EOF)
+ 'response_close': 0,
+ }
+ super(Server, self).__init__(*args, **kwargs)
+
+ def setdelays(self, **kwargs):
+ """In future requests, induce delays at the given checkpoints."""
+ for (k, v) in kwargs.iteritems():
+            self.delays[k]  # raise KeyError if unknown delay checkpoint
+ self.delays[k] = v
+
+ def _sleep_at_least(self, seconds):
+ """Sleep for given time, even if signals are received."""
+ wake = time.time() + seconds
+ todo = seconds
+ while todo > 0:
+ time.sleep(todo)
+ todo = wake - time.time()
+
+ def _do_delay(self, k):
+ self._sleep_at_least(self.delays[k])
+
+
+class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
+ def handle(self, *args, **kwargs):
+ self.server._do_delay('request')
+ return super(Handler, self).handle(*args, **kwargs)
+
+ def do_GET(self):
+ self.server._do_delay('response')
+ r = re.search(r'[0-9a-f]{32}', self.path)
+ if not r:
+ return self.send_response(422)
+ datahash = r.group(0)
+ if datahash not in self.server.store:
+ return self.send_response(404)
+ self.send_response(200)
+ self.send_header('Content-type', 'application/octet-stream')
+ self.end_headers()
+ self.server._do_delay('response_body')
+ self.wfile.write(self.server.store[datahash])
+ self.server._do_delay('response_close')
+
+ def do_PUT(self):
+ self.server._do_delay('request_body')
+
+        # The comments at https://bugs.python.org/issue1491 imply that Python
+        # 2.7's BaseHTTPRequestHandler was patched to support 100 Continue, but
+        # the code that actually ships in Debian clearly was not, so we need to
+        # send the response on the socket directly.
+
+ self.wfile.write("%s %d %s\r\n\r\n" %
+ (self.protocol_version, 100, "Continue"))
+
+ data = self.rfile.read(int(self.headers.getheader('content-length')))
+ datahash = hashlib.md5(data).hexdigest()
+ self.server.store[datahash] = data
+ self.server._do_delay('response')
+ self.send_response(200)
+ self.send_header('Content-type', 'text/plain')
+ self.end_headers()
+ self.server._do_delay('response_body')
+ self.wfile.write(datahash + '+' + str(len(data)))
+ self.server._do_delay('response_close')
+
+ def log_request(self, *args, **kwargs):
+ if os.environ.get('ARVADOS_DEBUG', None):
+ super(Handler, self).log_request(*args, **kwargs)
+
+ def finish(self, *args, **kwargs):
+ """Ignore exceptions, notably "Broken pipe" when client times out."""
+ try:
+ return super(Handler, self).finish(*args, **kwargs)
+ except:
+ pass
+
+ def handle_one_request(self, *args, **kwargs):
+ """Ignore exceptions, notably "Broken pipe" when client times out."""
+ try:
+ return super(Handler, self).handle_one_request(*args, **kwargs)
+ except:
+ pass
--- /dev/null
+daemon off;
+error_log stderr info; # Yes, must be specified here _and_ cmdline
+events {
+}
+http {
+ access_log /dev/stderr combined;
+ upstream arv-git-http {
+ server localhost:{{GITPORT}};
+ }
+ server {
+ listen *:{{GITSSLPORT}} ssl default_server;
+ server_name _;
+ ssl_certificate {{SSLCERT}};
+ ssl_certificate_key {{SSLKEY}};
+ location / {
+ proxy_pass http://arv-git-http;
+ }
+ }
+ upstream keepproxy {
+ server localhost:{{KEEPPROXYPORT}};
+ }
+ server {
+ listen *:{{KEEPPROXYSSLPORT}} ssl default_server;
+ server_name _;
+ ssl_certificate {{SSLCERT}};
+ ssl_certificate_key {{SSLKEY}};
+ location / {
+ proxy_pass http://keepproxy;
+ }
+ }
+}
#!/usr/bin/env python
+from __future__ import print_function
import argparse
import atexit
import httplib2
os.mkdir(TEST_TMPDIR)
my_api_host = None
+_cached_config = {}
def find_server_pid(PID_PATH, wait=10):
now = time.time()
'-subj', '/CN=0.0.0.0'],
stdout=sys.stderr)
+ # Install the git repository fixtures.
+ gitdir = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'git')
+ gittarball = os.path.join(SERVICES_SRC_DIR, 'api', 'test', 'test.git.tar')
+ if not os.path.isdir(gitdir):
+ os.makedirs(gitdir)
+ subprocess.check_output(['tar', '-xC', gitdir, '-f', gittarball])
+
port = find_available_port()
env = os.environ.copy()
env['RAILS_ENV'] = 'test'
keep_cmd = ["keepstore",
"-volume={}".format(keep0),
"-listen=:{}".format(port),
- "-pid={}".format("{}/keep{}.pid".format(TEST_TMPDIR, n))]
+ "-pid="+_pidfile('keep{}'.format(n))]
for arg, val in keep_args.iteritems():
keep_cmd.append("{}={}".format(arg, val))
- kp0 = subprocess.Popen(keep_cmd)
- with open("{}/keep{}.pid".format(TEST_TMPDIR, n), 'w') as f:
+ kp0 = subprocess.Popen(
+ keep_cmd, stdin=open('/dev/null'), stdout=sys.stderr)
+ with open(_pidfile('keep{}'.format(n)), 'w') as f:
f.write(str(kp0.pid))
with open("{}/keep{}.volume".format(TEST_TMPDIR, n), 'w') as f:
}).execute()
def _stop_keep(n):
- kill_server_pid("{}/keep{}.pid".format(TEST_TMPDIR, n), 0)
+ kill_server_pid(_pidfile('keep{}'.format(n)), 0)
if os.path.exists("{}/keep{}.volume".format(TEST_TMPDIR, n)):
with open("{}/keep{}.volume".format(TEST_TMPDIR, n), 'r') as r:
shutil.rmtree(r.read(), True)
_stop_keep(1)
def run_keep_proxy():
+ if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+ return
stop_keep_proxy()
admin_token = auth_token('admin')
env['ARVADOS_API_TOKEN'] = admin_token
kp = subprocess.Popen(
['keepproxy',
- '-pid={}/keepproxy.pid'.format(TEST_TMPDIR),
+ '-pid='+_pidfile('keepproxy'),
'-listen=:{}'.format(port)],
- env=env)
+ env=env, stdin=open('/dev/null'), stdout=sys.stderr)
api = arvados.api(
version='v1',
'service_ssl_flag': False,
}}).execute()
os.environ["ARVADOS_KEEP_PROXY"] = "http://localhost:{}".format(port)
+ _setport('keepproxy', port)
def stop_keep_proxy():
- kill_server_pid(os.path.join(TEST_TMPDIR, "keepproxy.pid"), 0)
+ if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+ return
+ kill_server_pid(_pidfile('keepproxy'), wait=0)
+
+def run_arv_git_httpd():
+ if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+ return
+ stop_arv_git_httpd()
+
+ gitdir = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'git')
+ gitport = find_available_port()
+ env = os.environ.copy()
+ env.pop('ARVADOS_API_TOKEN', None)
+ agh = subprocess.Popen(
+ ['arv-git-httpd',
+ '-repo-root='+gitdir+'/test',
+ '-address=:'+str(gitport)],
+ env=env, stdin=open('/dev/null'), stdout=sys.stderr)
+ with open(_pidfile('arv-git-httpd'), 'w') as f:
+ f.write(str(agh.pid))
+ _setport('arv-git-httpd', gitport)
+
+def stop_arv_git_httpd():
+ if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+ return
+ kill_server_pid(_pidfile('arv-git-httpd'), wait=0)
+
+def run_nginx():
+ if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+ return
+ nginxconf = {}
+ nginxconf['KEEPPROXYPORT'] = _getport('keepproxy')
+ nginxconf['KEEPPROXYSSLPORT'] = find_available_port()
+ nginxconf['GITPORT'] = _getport('arv-git-httpd')
+ nginxconf['GITSSLPORT'] = find_available_port()
+ nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem')
+ nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key')
+
+ conftemplatefile = os.path.join(MY_DIRNAME, 'nginx.conf')
+ conffile = os.path.join(TEST_TMPDIR, 'nginx.conf')
+ with open(conffile, 'w') as f:
+ f.write(re.sub(
+ r'{{([A-Z]+)}}',
+ lambda match: str(nginxconf.get(match.group(1))),
+ open(conftemplatefile).read()))
+
+ env = os.environ.copy()
+ env['PATH'] = env['PATH']+':/sbin:/usr/sbin:/usr/local/sbin'
+ nginx = subprocess.Popen(
+ ['nginx',
+ '-g', 'error_log stderr info;',
+ '-g', 'pid '+_pidfile('nginx')+';',
+ '-c', conffile],
+ env=env, stdin=open('/dev/null'), stdout=sys.stderr)
+ _setport('keepproxy-ssl', nginxconf['KEEPPROXYSSLPORT'])
+ _setport('arv-git-httpd-ssl', nginxconf['GITSSLPORT'])
+
+def stop_nginx():
+ if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+ return
+ kill_server_pid(_pidfile('nginx'), wait=0)
+
+def _pidfile(program):
+ return os.path.join(TEST_TMPDIR, program + '.pid')
+
+def _portfile(program):
+ return os.path.join(TEST_TMPDIR, program + '.port')
+
+def _setport(program, port):
+ with open(_portfile(program), 'w') as f:
+ f.write(str(port))
+
+# Returns 9 if program is not up.
+def _getport(program):
+ try:
+ return int(open(_portfile(program)).read())
+ except IOError:
+ return 9
+
+def _apiconfig(key):
+    global _cached_config
+    if _cached_config:
+        return _cached_config[key]
+    def _load(f):
+        with open(os.path.join(SERVICES_SRC_DIR, 'api', 'config', f)) as conf:
+            return yaml.load(conf)
+    cdefault = _load('application.default.yml')
+    csite = _load('application.yml')
+    _cached_config = {}
+    for section in [cdefault.get('common',{}), cdefault.get('test',{}),
+                    csite.get('common',{}), csite.get('test',{})]:
+        _cached_config.update(section)
+    return _cached_config[key]
def fixture(fix):
'''load a fixture yaml file'''
if __name__ == "__main__":
- actions = ['start', 'stop',
- 'start_keep', 'stop_keep',
- 'start_keep_proxy', 'stop_keep_proxy']
+ actions = [
+ 'start', 'stop',
+ 'start_keep', 'stop_keep',
+ 'start_keep_proxy', 'stop_keep_proxy',
+ 'start_arv-git-httpd', 'stop_arv-git-httpd',
+ 'start_nginx', 'stop_nginx',
+ ]
parser = argparse.ArgumentParser()
parser.add_argument('action', type=str, help="one of {}".format(actions))
parser.add_argument('--auth', type=str, metavar='FIXTURE_NAME', help='Print authorization info for given api_client_authorizations fixture')
args = parser.parse_args()
+ if args.action not in actions:
+ print("Unrecognized action '{}'. Actions are: {}.".format(args.action, actions), file=sys.stderr)
+ sys.exit(1)
if args.action == 'start':
stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
run(leave_running_atexit=True)
run_keep_proxy()
elif args.action == 'stop_keep_proxy':
stop_keep_proxy()
+ elif args.action == 'start_arv-git-httpd':
+ run_arv_git_httpd()
+ elif args.action == 'stop_arv-git-httpd':
+ stop_arv_git_httpd()
+ elif args.action == 'start_nginx':
+ run_nginx()
+ elif args.action == 'stop_nginx':
+ stop_nginx()
else:
- print("Unrecognized action '{}'. Actions are: {}.".format(args.action, actions))
+ raise Exception("action recognized but not implemented!?")
af = ArvadosFile(ArvadosFileReaderTestCase.MockParent(blocks, nocache), "count.txt", stream=stream, segments=[Range(1, 0, 3), Range(6, 3, 3), Range(11, 6, 3)])
return ArvadosFileReader(af)
- def test_read_returns_first_block(self):
- # read() calls will be aligned on block boundaries - see #3663.
+ def test_read_block_crossing_behavior(self):
+ # read() needs to return all the data requested if possible, even if it
+ # crosses uncached blocks: https://arvados.org/issues/5856
sfile = self.make_count_reader(nocache=True)
- self.assertEqual('123', sfile.read(10))
+ self.assertEqual('12345678', sfile.read(8))
def test_successive_reads(self):
+ # Override StreamFileReaderTestCase.test_successive_reads
sfile = self.make_count_reader(nocache=True)
- for expect in ['123', '456', '789', '']:
- self.assertEqual(expect, sfile.read(10))
+ self.assertEqual('1234', sfile.read(4))
+ self.assertEqual('5678', sfile.read(4))
+ self.assertEqual('9', sfile.read(4))
+ self.assertEqual('', sfile.read(4))
def test_tell_after_block_read(self):
+ # Override StreamFileReaderTestCase.test_tell_after_block_read
sfile = self.make_count_reader(nocache=True)
- sfile.read(5)
- self.assertEqual(3, sfile.tell())
+ self.assertEqual('12345678', sfile.read(8))
+ self.assertEqual(8, sfile.tell())
def test_prefetch(self):
keep = ArvadosFileWriterTestCase.MockKeep({"2e9ec317e197819358fbc43afca7d837+8": "01234567", "e8dc4081b13434b45189a720b77b6818+8": "abcdefgh"})
def test_locator_init(self):
client = self.api_client_mock(200)
# Ensure Keep will not return anything if asked.
- with tutil.mock_get_responses(None, 404):
+ with tutil.mock_keep_responses(None, 404):
reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
api_client=client)
self.assertEqual(self.DEFAULT_MANIFEST, reader.manifest_text())
# been written to Keep.
client = self.api_client_mock(200)
self.mock_get_collection(client, 404, None)
- with tutil.mock_get_responses(self.DEFAULT_MANIFEST, 200):
+ with tutil.mock_keep_responses(self.DEFAULT_MANIFEST, 200):
reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
api_client=client)
self.assertEqual(self.DEFAULT_MANIFEST, reader.manifest_text())
def test_uuid_init_no_fallback_to_keep(self):
# Do not look up a collection UUID in Keep.
client = self.api_client_mock(404)
- with tutil.mock_get_responses(self.DEFAULT_MANIFEST, 200):
+ with tutil.mock_keep_responses(self.DEFAULT_MANIFEST, 200):
with self.assertRaises(arvados.errors.ApiError):
reader = arvados.CollectionReader(self.DEFAULT_UUID,
api_client=client)
# To verify that CollectionReader tries Keep first here, we
# mock API server to return the wrong data.
client = self.api_client_mock(200)
- with tutil.mock_get_responses(self.ALT_MANIFEST, 200):
+ with tutil.mock_keep_responses(self.ALT_MANIFEST, 200):
self.assertEqual(
self.ALT_MANIFEST,
arvados.CollectionReader(
client = self.api_client_mock(200)
reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client,
num_retries=3)
- with tutil.mock_get_responses('foo', 500, 500, 200):
+ with tutil.mock_keep_responses('foo', 500, 500, 200):
self.assertEqual('foo',
''.join(f.read(9) for f in reader.all_files()))
def test_api_response_with_collection_from_keep(self):
client = self.api_client_mock()
self.mock_get_collection(client, 404, 'foo')
- with tutil.mock_get_responses(self.DEFAULT_MANIFEST, 200):
+ with tutil.mock_keep_responses(self.DEFAULT_MANIFEST, 200):
reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
api_client=client)
api_response = reader.api_response()
class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
def mock_keep(self, body, *codes, **headers):
headers.setdefault('x-keep-replicas-stored', 2)
- return tutil.mock_put_responses(body, *codes, **headers)
+ return tutil.mock_keep_responses(body, *codes, **headers)
def foo_writer(self, **kwargs):
kwargs.setdefault('api_client', self.api_client_mock())
def test_write_insufficient_replicas_via_proxy(self):
writer = self.foo_writer(replication=3)
- with self.mock_keep(None, 200, headers={'x-keep-replicas-stored': 2}):
+ with self.mock_keep(None, 200, **{'x-keep-replicas-stored': 2}):
with self.assertRaises(arvados.errors.KeepWriteError):
writer.manifest_text()
def test_write_three_replicas(self):
client = mock.MagicMock(name='api_client')
with self.mock_keep(
- None, 500, 500, 500, 200, 200, 200,
+ "", 500, 500, 500, 200, 200, 200,
**{'x-keep-replicas-stored': 1}) as keepmock:
self.mock_keep_services(client, status=200, service_type='disk', count=6)
writer = self.foo_writer(api_client=client, replication=3)
writer.manifest_text()
- # keepmock is the mock session constructor; keepmock.return_value
- # is the mock session object, and keepmock.return_value.put is the
- # actual mock method of interest.
- self.assertEqual(6, keepmock.return_value.put.call_count)
+ self.assertEqual(6, keepmock.call_count)
def test_write_whole_collection_through_retries(self):
writer = self.foo_writer(num_retries=2)
REQUEST_ERRORS = [
('http://keep1.zzzzz.example.org/', IOError("test IOError")),
('http://keep3.zzzzz.example.org/', MemoryError("test MemoryError")),
- ('http://keep5.zzzzz.example.org/', tutil.fake_requests_response(
- 500, "test 500")),
+ ('http://keep5.zzzzz.example.org/',
+ arv_error.HttpError(500, "Internal Server Error")),
('http://keep7.zzzzz.example.org/', IOError("second test IOError")),
]
import hashlib
import mock
import os
+import pycurl
import random
import re
import socket
+import threading
+import time
import unittest
import urlparse
import arvados
import arvados.retry
import arvados_testutil as tutil
+import keepstub
import run_test_server
class KeepTestCase(run_test_server.TestCaseWithServers):
self.assertEqual('100::1', service.hostname)
self.assertEqual(10, service.port)
- # test_get_timeout and test_put_timeout test that
- # KeepClient.get and KeepClient.put use the appropriate timeouts
- # when connected directly to a Keep server (i.e. non-proxy timeout)
+    # The test_*_timeout tests verify that KeepClient instructs pycurl to
+    # use the appropriate connection and read timeouts. They don't check
+    # whether pycurl actually exhibits the expected timeout behavior --
+    # those checks live in the KeepClientTimeout test class.
def test_get_timeout(self):
api_client = self.mock_keep_services(count=1)
- force_timeout = [socket.timeout("timed out")]
- with tutil.mock_get(force_timeout) as mock_session:
+ force_timeout = socket.timeout("timed out")
+ with tutil.mock_keep_responses(force_timeout, 0) as mock:
keep_client = arvados.KeepClient(api_client=api_client)
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.get('ffffffffffffffffffffffffffffffff')
- self.assertTrue(mock_session.return_value.get.called)
self.assertEqual(
- arvados.KeepClient.DEFAULT_TIMEOUT,
- mock_session.return_value.get.call_args[1]['timeout'])
+ mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
+ int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.TIMEOUT_MS),
+ int(arvados.KeepClient.DEFAULT_TIMEOUT[1]*1000))
def test_put_timeout(self):
api_client = self.mock_keep_services(count=1)
- force_timeout = [socket.timeout("timed out")]
- with tutil.mock_put(force_timeout) as mock_session:
+ force_timeout = socket.timeout("timed out")
+ with tutil.mock_keep_responses(force_timeout, 0) as mock:
keep_client = arvados.KeepClient(api_client=api_client)
with self.assertRaises(arvados.errors.KeepWriteError):
keep_client.put('foo')
- self.assertTrue(mock_session.return_value.put.called)
self.assertEqual(
- arvados.KeepClient.DEFAULT_TIMEOUT,
- mock_session.return_value.put.call_args[1]['timeout'])
+ mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
+ int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.TIMEOUT_MS),
+ int(arvados.KeepClient.DEFAULT_TIMEOUT[1]*1000))
def test_proxy_get_timeout(self):
- # Force a timeout, verifying that the requests.get or
- # requests.put method was called with the proxy_timeout
- # setting rather than the default timeout.
api_client = self.mock_keep_services(service_type='proxy', count=1)
- force_timeout = [socket.timeout("timed out")]
- with tutil.mock_get(force_timeout) as mock_session:
+ force_timeout = socket.timeout("timed out")
+ with tutil.mock_keep_responses(force_timeout, 0) as mock:
keep_client = arvados.KeepClient(api_client=api_client)
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.get('ffffffffffffffffffffffffffffffff')
- self.assertTrue(mock_session.return_value.get.called)
self.assertEqual(
- arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
- mock_session.return_value.get.call_args[1]['timeout'])
+ mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
+ int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.TIMEOUT_MS),
+ int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
def test_proxy_put_timeout(self):
- # Force a timeout, verifying that the requests.get or
- # requests.put method was called with the proxy_timeout
- # setting rather than the default timeout.
api_client = self.mock_keep_services(service_type='proxy', count=1)
- force_timeout = [socket.timeout("timed out")]
- with tutil.mock_put(force_timeout) as mock_session:
+ force_timeout = socket.timeout("timed out")
+ with tutil.mock_keep_responses(force_timeout, 0) as mock:
keep_client = arvados.KeepClient(api_client=api_client)
with self.assertRaises(arvados.errors.KeepWriteError):
keep_client.put('foo')
- self.assertTrue(mock_session.return_value.put.called)
self.assertEqual(
- arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
- mock_session.return_value.put.call_args[1]['timeout'])
+ mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
+ int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.TIMEOUT_MS),
+ int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
def test_probe_order_reference_set(self):
# expected_order[i] is the probe order for
aport = random.randint(1024,65535)
api_client = self.mock_keep_services(service_port=aport, count=16)
keep_client = arvados.KeepClient(api_client=api_client)
- with mock.patch('requests.' + verb,
- side_effect=socket.timeout) as req_mock, \
- self.assertRaises(exc_class) as err_check:
+ with mock.patch('pycurl.Curl') as curl_mock, \
+ self.assertRaises(exc_class) as err_check:
+ curl_mock.return_value.side_effect = socket.timeout
getattr(keep_client, verb)(data)
urls = [urlparse.urlparse(url)
for url in err_check.exception.request_errors()]
def check_errors_from_last_retry(self, verb, exc_class):
api_client = self.mock_keep_services(count=2)
- req_mock = getattr(tutil, 'mock_{}_responses'.format(verb))(
+ req_mock = tutil.mock_keep_responses(
"retry error reporting test", 500, 500, 403, 403)
with req_mock, tutil.skip_sleep, \
self.assertRaises(exc_class) as err_check:
data = 'partial failure test'
data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
api_client = self.mock_keep_services(count=3)
- with tutil.mock_put_responses(data_loc, 200, 500, 500) as req_mock, \
+ with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
keep_client = arvados.KeepClient(api_client=api_client)
keep_client.put(data)
self.assertEqual(2, len(exc_check.exception.request_errors()))
+class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
+ DATA = 'x' * 2**10
+
+ class assertTakesBetween(unittest.TestCase):
+ def __init__(self, tmin, tmax):
+ self.tmin = tmin
+ self.tmax = tmax
+
+ def __enter__(self):
+ self.t0 = time.time()
+
+ def __exit__(self, *args, **kwargs):
+ self.assertGreater(time.time() - self.t0, self.tmin)
+ self.assertLess(time.time() - self.t0, self.tmax)
+
+ def setUp(self):
+ sock = socket.socket()
+ sock.bind(('0.0.0.0', 0))
+ self.port = sock.getsockname()[1]
+ sock.close()
+ self.server = keepstub.Server(('0.0.0.0', self.port), keepstub.Handler)
+ self.thread = threading.Thread(target=self.server.serve_forever)
+ self.thread.daemon = True # Exit thread if main proc exits
+ self.thread.start()
+ self.api_client = self.mock_keep_services(
+ count=1,
+ service_host='localhost',
+ service_port=self.port,
+ )
+
+ def tearDown(self):
+ self.server.shutdown()
+
+ def keepClient(self, timeouts=(0.1, 1.0)):
+ return arvados.KeepClient(
+ api_client=self.api_client,
+ timeout=timeouts)
+
+ def test_timeout_slow_connect(self):
+ # Can't simulate TCP delays with our own socket. Leave our
+ # stub server running uselessly, and try to connect to an
+ # unroutable IP address instead.
+ self.api_client = self.mock_keep_services(
+ count=1,
+ service_host='240.0.0.0',
+ )
+ with self.assertTakesBetween(0.1, 0.5):
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ self.keepClient((0.1, 1)).put(self.DATA, copies=1, num_retries=0)
+
+ def test_timeout_slow_request(self):
+ self.server.setdelays(request=0.2)
+ self._test_200ms()
+
+ def test_timeout_slow_response(self):
+ self.server.setdelays(response=0.2)
+ self._test_200ms()
+
+ def test_timeout_slow_response_body(self):
+ self.server.setdelays(response_body=0.2)
+ self._test_200ms()
+
+ def _test_200ms(self):
+ """Connect should be t<100ms, request should be 200ms <= t < 300ms"""
+
+ # Allow 100ms to connect, then 1s for response. Everything
+ # should work, and everything should take at least 200ms to
+ # return.
+ kc = self.keepClient((.1, 1))
+ with self.assertTakesBetween(.2, .3):
+ loc = kc.put(self.DATA, copies=1, num_retries=0)
+ with self.assertTakesBetween(.2, .3):
+ self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
+
+ # Allow 1s to connect, then 100ms for response. Nothing should
+ # work, and everything should take at least 100ms to return.
+ kc = self.keepClient((1, .1))
+ with self.assertTakesBetween(.1, .2):
+ with self.assertRaises(arvados.errors.KeepReadError):
+ kc.get(loc, num_retries=0)
+ with self.assertTakesBetween(.1, .2):
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ kc.put(self.DATA, copies=1, num_retries=0)
+
+
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
def mock_disks_and_gateways(self, disks=3, gateways=1):
self.gateways = [{
'service_type': 'gateway:test',
} for i in range(gateways)]
self.gateway_roots = [
- "https://[{service_host}]:{service_port}/".format(**gw)
+ "https://{service_host}:{service_port}/".format(**gw)
for gw in self.gateways]
self.api_client = self.mock_keep_services(
count=disks, additional_services=self.gateways)
self.keepClient = arvados.KeepClient(api_client=self.api_client)
- @mock.patch('requests.Session')
- def test_get_with_gateway_hint_first(self, MockSession):
- MockSession.return_value.get.return_value = tutil.fake_requests_response(
+ @mock.patch('pycurl.Curl')
+ def test_get_with_gateway_hint_first(self, MockCurl):
+ MockCurl.return_value = tutil.FakeCurl.make(
code=200, body='foo', headers={'Content-Length': 3})
self.mock_disks_and_gateways()
locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
self.assertEqual('foo', self.keepClient.get(locator))
- self.assertEqual((self.gateway_roots[0]+locator,),
- MockSession.return_value.get.call_args_list[0][0])
+ self.assertEqual(self.gateway_roots[0]+locator,
+ MockCurl.return_value.getopt(pycurl.URL))
- @mock.patch('requests.Session')
- def test_get_with_gateway_hints_in_order(self, MockSession):
+ @mock.patch('pycurl.Curl')
+ def test_get_with_gateway_hints_in_order(self, MockCurl):
gateways = 4
disks = 3
- MockSession.return_value.get.return_value = tutil.fake_requests_response(
- code=404, body='')
+ mocks = [
+ tutil.FakeCurl.make(code=404, body='')
+ for _ in range(gateways+disks)
+ ]
+ MockCurl.side_effect = tutil.queue_with(mocks)
self.mock_disks_and_gateways(gateways=gateways, disks=disks)
locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
['K@'+gw['uuid'] for gw in self.gateways])
self.keepClient.get(locator)
# Gateways are tried first, in the order given.
for i, root in enumerate(self.gateway_roots):
- self.assertEqual((root+locator,),
- MockSession.return_value.get.call_args_list[i][0])
+ self.assertEqual(root+locator,
+ mocks[i].getopt(pycurl.URL))
# Disk services are tried next.
for i in range(gateways, gateways+disks):
self.assertRegexpMatches(
- MockSession.return_value.get.call_args_list[i][0][0],
+ mocks[i].getopt(pycurl.URL),
r'keep0x')
- @mock.patch('requests.Session')
- def test_get_with_remote_proxy_hint(self, MockSession):
- MockSession.return_value.get.return_value = tutil.fake_requests_response(
+ @mock.patch('pycurl.Curl')
+ def test_get_with_remote_proxy_hint(self, MockCurl):
+ MockCurl.return_value = tutil.FakeCurl.make(
code=200, body='foo', headers={'Content-Length': 3})
self.mock_disks_and_gateways()
locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
self.assertEqual('foo', self.keepClient.get(locator))
- self.assertEqual(('https://keep.xyzzy.arvadosapi.com/'+locator,),
- MockSession.return_value.get.call_args_list[0][0])
+ self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
+ MockCurl.return_value.getopt(pycurl.URL))
class KeepClientRetryTestMixin(object):
DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
DEFAULT_EXCEPTION = arvados.errors.KeepReadError
HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
- TEST_PATCHER = staticmethod(tutil.mock_get_responses)
+ TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
*args, **kwargs):
return self.new_client().get(locator, *args, **kwargs)
def test_specific_exception_when_not_found(self):
- with tutil.mock_get_responses(self.DEFAULT_EXPECT, 404, 200):
+ with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
self.check_exception(arvados.errors.NotFoundError, num_retries=3)
def test_general_exception_with_mixed_errors(self):
# This test rigs up 50/50 disagreement between two servers, and
# checks that it does not become a NotFoundError.
client = self.new_client()
- with tutil.mock_get_responses(self.DEFAULT_EXPECT, 404, 500):
+ with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
client.get(self.HINTED_LOCATOR)
self.assertNotIsInstance(
"mixed errors raised NotFoundError")
def test_hint_server_can_succeed_without_retries(self):
- with tutil.mock_get_responses(self.DEFAULT_EXPECT, 404, 200, 500):
+ with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
self.check_success(locator=self.HINTED_LOCATOR)
def test_try_next_server_after_timeout(self):
- with tutil.mock_get([
- socket.timeout("timed out"),
- tutil.fake_requests_response(200, self.DEFAULT_EXPECT)]):
+ with tutil.mock_keep_responses(
+ (socket.timeout("timed out"), 200),
+ (self.DEFAULT_EXPECT, 200)):
self.check_success(locator=self.HINTED_LOCATOR)
def test_retry_data_with_wrong_checksum(self):
- with tutil.mock_get((tutil.fake_requests_response(200, s) for s in ['baddata', self.TEST_DATA])):
+ with tutil.mock_keep_responses(
+ ('baddata', 200),
+ (self.DEFAULT_EXPECT, 200)):
self.check_success(locator=self.HINTED_LOCATOR)
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
- TEST_PATCHER = staticmethod(tutil.mock_put_responses)
+ TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
copies=1, *args, **kwargs):
return self.new_client().put(data, copies, *args, **kwargs)
def test_do_not_send_multiple_copies_to_same_server(self):
- with tutil.mock_put_responses(self.DEFAULT_EXPECT, 200):
+ with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
self.check_exception(copies=2, num_retries=3)
import arvados.retry as arv_retry
import mock
-from arvados_testutil import fake_requests_response
-
class RetryLoopTestMixin(object):
@staticmethod
def loop_success(result):
class CheckHTTPResponseSuccessTestCase(unittest.TestCase):
def results_map(self, *codes):
for code in codes:
- response = fake_requests_response(code, None)
- yield code, arv_retry.check_http_response_success(response)
+ yield code, arv_retry.check_http_response_success(code)
def check(assert_name):
def check_method(self, expected, *codes):
return StreamFileReader(stream, [Range(1, 0, 3), Range(6, 3, 3), Range(11, 6, 3)],
'count.txt')
- def test_read_returns_first_block(self):
+ def test_read_block_crossing_behavior(self):
# read() calls will be aligned on block boundaries - see #3663.
sfile = self.make_count_reader()
self.assertEqual('123', sfile.read(10))
@tutil.skip_sleep
def test_success_without_retries(self):
- with tutil.mock_get_responses('bar', 200):
+ with tutil.mock_keep_responses('bar', 200):
reader = self.reader_for('bar_file')
self.assertEqual('bar', self.read_for_test(reader, 3))
@tutil.skip_sleep
def test_read_no_default_retry(self):
- with tutil.mock_get_responses('', 500):
+ with tutil.mock_keep_responses('', 500):
reader = self.reader_for('user_agreement')
with self.assertRaises(arvados.errors.KeepReadError):
self.read_for_test(reader, 10)
@tutil.skip_sleep
def test_read_with_instance_retries(self):
- with tutil.mock_get_responses('foo', 500, 200):
+ with tutil.mock_keep_responses('foo', 500, 200):
reader = self.reader_for('foo_file', num_retries=3)
self.assertEqual('foo', self.read_for_test(reader, 3))
@tutil.skip_sleep
def test_read_with_method_retries(self):
- with tutil.mock_get_responses('foo', 500, 200):
+ with tutil.mock_keep_responses('foo', 500, 200):
reader = self.reader_for('foo_file')
self.assertEqual('foo',
self.read_for_test(reader, 3, num_retries=3))
@tutil.skip_sleep
def test_read_instance_retries_exhausted(self):
- with tutil.mock_get_responses('bar', 500, 500, 500, 500, 200):
+ with tutil.mock_keep_responses('bar', 500, 500, 500, 500, 200):
reader = self.reader_for('bar_file', num_retries=3)
with self.assertRaises(arvados.errors.KeepReadError):
self.read_for_test(reader, 3)
@tutil.skip_sleep
def test_read_method_retries_exhausted(self):
- with tutil.mock_get_responses('bar', 500, 500, 500, 500, 200):
+ with tutil.mock_keep_responses('bar', 500, 500, 500, 500, 200):
reader = self.reader_for('bar_file')
with self.assertRaises(arvados.errors.KeepReadError):
self.read_for_test(reader, 3, num_retries=3)
@tutil.skip_sleep
def test_method_retries_take_precedence(self):
- with tutil.mock_get_responses('', 500, 500, 500, 200):
+ with tutil.mock_keep_responses('', 500, 500, 500, 200):
reader = self.reader_for('user_agreement', num_retries=10)
with self.assertRaises(arvados.errors.KeepReadError):
self.read_for_test(reader, 10, num_retries=1)
stream_name = unescape token
elsif in_file_tokens or not Locator.valid? token
in_file_tokens = true
- yield [stream_name] + split_file_token(token)
+
+ file_tokens = split_file_token(token)
+ stream_name_adjuster = ''
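+      # For example, a file token for "dir1/file1" in stream "." is
+      # yielded with stream name "./dir1" and file name "file1": any
+      # directory part embedded in a file name is moved into the
+      # stream name.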
+ if file_tokens[2].include?('/') # '/' in filename
+ parts = file_tokens[2].rpartition('/')
+ stream_name_adjuster = parts[1] + parts[0] # /dir_parts
+ file_tokens[2] = parts[2]
+ end
+
+ yield [stream_name + stream_name_adjuster] + file_tokens
end
end
end
NONNORMALIZED_MANIFEST =
["./dir2 #{random_block} 0:0:z 0:0:y 0:0:x",
"./dir1 #{random_block} 0:0:p 0:0:o 0:0:n\n"].join("\n")
+ MANIFEST_WITH_DIRS_IN_FILENAMES =
+ [". #{random_block(10)} 0:3:file1 3:3:dir1/file1 6:3:dir1/dir2/file1\n"].join("")
+ MULTILEVEL_MANIFEST_WITH_DIRS_IN_FILENAMES =
+ [". #{random_block(10)} 0:3:file1 3:3:dir1/file1 6:4:dir1/dir2/file1\n",
+ "./dir1 #{random_block(10)} 0:3:file1 3:7:dir2/file1\n"].join("")
### Non-tree manifests
# These manifests follow the spec, but they express a structure that can't
assert !file_name.empty?, "empty file_name in #{name} fixture"
end
end
+
+ def test_collection_with_dirs_in_filenames
+ manifest = Keep::Manifest.new(MANIFEST_WITH_DIRS_IN_FILENAMES)
+
+ seen = Hash.new { |this, key| this[key] = [] }
+
+ manifest.files.each do |stream, basename, size|
+ refute(seen[stream].include?(basename), "each_file repeated #{stream}/#{basename}")
+ assert_equal(3, size, "wrong size for #{stream}/#{basename}")
+ seen[stream] << basename
+ end
+
+ assert_equal(%w(. ./dir1 ./dir1/dir2), seen.keys)
+
+ seen.each_pair do |stream, basenames|
+ assert_equal(%w(file1), basenames.sort, "wrong file list for #{stream}")
+ end
+ end
+
+ def test_multilevel_collection_with_dirs_in_filenames
+ manifest = Keep::Manifest.new(MULTILEVEL_MANIFEST_WITH_DIRS_IN_FILENAMES)
+
+ seen = Hash.new { |this, key| this[key] = [] }
+ expected_sizes = {'.' => 3, './dir1' => 6, './dir1/dir2' => 11}
+
+ manifest.files.each do |stream, basename, size|
+ refute(seen[stream].include?(basename), "each_file repeated #{stream}/#{basename}")
+ assert_equal(expected_sizes[stream], size, "wrong size for #{stream}/#{basename}")
+ seen[stream] << basename
+ end
+
+ assert_equal(%w(. ./dir1 ./dir1/dir2), seen.keys)
+
+ seen.each_pair do |stream, basenames|
+ assert_equal(%w(file1), basenames.sort, "wrong file list for #{stream}")
+ end
+ end
end
end
Server::Application.load_tasks
+
+namespace :test do
+ task(:run).clear
+ # Copied from the definition in Rails 3.2.
+ # This may need to be updated if we upgrade Rails.
+ task :run do
+ errors = %w(test:units test:functionals test:integration test:tasks).collect do |task|
+ begin
+ Rake::Task[task].invoke
+ nil
+ rescue => e
+ { :task => task, :exception => e }
+ end
+ end.compact
+
+ if errors.any?
+ puts errors.map { |e| "Errors running #{e[:task]}! #{e[:exception].inspect}" }.join("\n")
+ abort
+ end
+ end
+end
}.merge opts)
end
+ def self.limit_index_columns_read
+    # This method returns a list of column names. If an index request
+    # reads any of these columns from the database, find_objects_for_index
+    # only fetches objects until it has read max_index_database_read bytes
+    # of data from those columns.
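+    # For example, Arvados::V1::CollectionsController returns
+    # ["manifest_text"], so collection index responses are capped once
+    # max_index_database_read bytes of manifest_text have been read.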
+ []
+ end
+
def find_objects_for_index
@objects ||= model_class.readable_by(*@read_users)
apply_where_limit_order_params
+ limit_database_read if (action_name == "index")
end
def apply_filters model_class=nil
# Map attribute names in @select to real column names, resolve
# those to fully-qualified SQL column names, and pass the
# resulting string to the select method.
- api_column_map = model_class.attributes_required_columns
- columns_list = @select.
- flat_map { |attr| api_column_map[attr] }.
- uniq.
+ columns_list = model_class.columns_for_attributes(@select).
map { |s| "#{ar_table_name}.#{ActiveRecord::Base.connection.quote_column_name s}" }
@objects = @objects.select(columns_list.join(", "))
end
@objects = @objects.uniq(@distinct) if not @distinct.nil?
end
+ def limit_database_read
+ limit_columns = self.class.limit_index_columns_read
+ limit_columns &= model_class.columns_for_attributes(@select) if @select
+ return if limit_columns.empty?
+ model_class.transaction do
+ limit_query = @objects.
+ select("(%s) as read_length" %
+ limit_columns.map { |s| "length(#{s})" }.join(" + "))
+ new_limit = 0
+ read_total = 0
+ limit_query.find_each do |record|
+ new_limit += 1
+ read_total += record.read_length.to_i
+ break if ((read_total >= Rails.configuration.max_index_database_read) or
+ (new_limit >= @limit))
+ end
+ @limit = new_limit
+ @objects = @objects.limit(@limit)
+ # Force @objects to run its query inside this transaction.
+ @objects.each { |_| break }
+ end
+ end
+
def resource_attrs
return @attrs if @attrs
@attrs = params[resource_name]
require "arvados/keep"
class Arvados::V1::CollectionsController < ApplicationController
+ def self.limit_index_columns_read
+ ["manifest_text"]
+ end
+
def create
if resource_attrs[:uuid] and (loc = Keep::Locator.parse(resource_attrs[:uuid]))
resource_attrs[:portable_data_hash] = loc.to_s
@filters =
[["repository", "=", resource_attrs[:repository]],
["script", "=", resource_attrs[:script]],
- ["script_version", "in git",
- params[:minimum_script_version] || resource_attrs[:script_version]],
["script_version", "not in git", params[:exclude_script_versions]],
].reject { |filter| filter.last.nil? or filter.last.empty? }
+ if !params[:minimum_script_version].blank?
+ @filters << ["script_version", "in git",
+ params[:minimum_script_version]]
+ else
+ add_default_git_filter("script_version", resource_attrs[:repository],
+ resource_attrs[:script_version])
+ end
if image_search = resource_attrs[:runtime_constraints].andand["docker_image"]
if image_tag = resource_attrs[:runtime_constraints]["docker_image_tag"]
image_search += ":#{image_tag}"
end
- @filters.append(["docker_image_locator", "in docker", image_search])
+ image_locator = Collection.
+ for_latest_docker_image(image_search).andand.portable_data_hash
else
- @filters.append(["docker_image_locator", "=", nil])
+ image_locator = nil
end
+ @filters << ["docker_image_locator", "=", image_locator]
if sdk_version = resource_attrs[:runtime_constraints].andand["arvados_sdk_version"]
- @filters.append(["arvados_sdk_version", "in git", sdk_version])
+ add_default_git_filter("arvados_sdk_version", "arvados", sdk_version)
end
begin
load_job_specific_filters
protected
+ def add_default_git_filter(attr_name, repo_name, refspec)
+ # Add a filter to @filters for `attr_name` = the latest commit available
+ # in `repo_name` at `refspec`. No filter is added if refspec can't be
+ # resolved.
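+  # For example, if `refspec` resolves to commit <sha1> in `repo_name`,
+  # this appends [attr_name, "=", "<sha1>"] to @filters.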
+ commits = Commit.find_commit_range(repo_name, nil, refspec, nil)
+ if commit_hash = commits.first
+ @filters << [attr_name, "=", commit_hash]
+ end
+ end
+
def load_job_specific_filters
# Convert Job-specific @filters entries into general SQL filters.
script_info = {"repository" => nil, "script" => nil}
description: "The API to interact with Arvados.",
documentationLink: "http://doc.arvados.org/api/index.html",
defaultCollectionReplication: Rails.configuration.default_collection_replication,
- gitHttpBase: Rails.configuration.git_http_base,
protocol: "rest",
baseUrl: root_url + "arvados/v1/",
basePath: "/arvados/v1/",
servicePath: "arvados/v1/",
batchPath: "batch",
defaultTrashLifetime: Rails.application.config.default_trash_lifetime,
+ blobSignatureTtl: Rails.application.config.blob_signature_ttl,
maxRequestSize: Rails.application.config.max_request_size,
parameters: {
alt: {
api_column_map
end
+ def self.columns_for_attributes(select_attributes)
+ # Given an array of attribute names to select, return an array of column
+ # names that must be fetched from the database to satisfy the request.
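+    # For example, Repository maps "fetch_url" to its "name" column, so
+    # Repository.columns_for_attributes(["fetch_url"]) includes "name".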
+ api_column_map = attributes_required_columns
+ select_attributes.flat_map { |attr| api_column_map[attr] }.uniq
+ end
+
def self.default_orders
["#{table_name}.modified_at desc", "#{table_name}.uuid"]
end
signing_opts = {
key: Rails.configuration.blob_signing_key,
api_token: api_token,
- ttl: Rails.configuration.blob_signing_ttl,
+ ttl: Rails.configuration.blob_signature_ttl,
}
self.manifest_text.lines.each do |entry|
entry.split[1..-1].each do |tok|
signing_opts = {
key: Rails.configuration.blob_signing_key,
api_token: token,
- ttl: Rails.configuration.blob_signing_ttl,
+ ttl: Rails.configuration.blob_signature_ttl,
}
m = manifest.dup
munge_manifest_locators!(m) do |loc|
before_validation :set_priority
before_validation :update_state_from_old_state_attrs
validate :ensure_script_version_is_commit
- validate :find_arvados_sdk_version
validate :find_docker_image_locator
+ validate :find_arvados_sdk_version
validate :validate_status
validate :validate_state_change
validate :ensure_no_collection_uuids_in_script_params
end
def find_docker_image_locator
+    if runtime_constraints.is_a?(Hash) and
+        runtime_constraints['docker_image'].nil? and
+        Rails.configuration.default_docker_image_for_jobs
+      runtime_constraints['docker_image'] =
+        Rails.configuration.default_docker_image_for_jobs
+    end
resolve_runtime_constraint("docker_image",
:docker_image_locator) do |image_search|
image_tag = runtime_constraints['docker_image_tag']
belongs_to(:job, foreign_key: :job_uuid, primary_key: :uuid)
attr_accessor :job_readable
- MAX_SLOTS = 64
-
+ @@max_compute_nodes = Rails.configuration.max_compute_nodes
@@dns_server_conf_dir = Rails.configuration.dns_server_conf_dir
@@dns_server_conf_template = Rails.configuration.dns_server_conf_template
@@dns_server_reload_command = Rails.configuration.dns_server_reload_command
rescue ActiveRecord::RecordNotUnique
try_slot += 1
end
- raise "No available node slots" if try_slot == MAX_SLOTS
+ raise "No available node slots" if try_slot == @@max_compute_nodes
end while true
self.hostname = self.class.hostname_for_slot(self.slot_number)
end
# At startup, make sure all DNS entries exist. Otherwise, slurmctld
# will refuse to start.
if @@dns_server_conf_dir and @@dns_server_conf_template
- (0..MAX_SLOTS-1).each do |slot_number|
+ (0..@@max_compute_nodes-1).each do |slot_number|
hostname = hostname_for_slot(slot_number)
hostfile = File.join @@dns_server_conf_dir, "#{hostname}.conf"
if !File.exists? hostfile
t.add :name
t.add :fetch_url
t.add :push_url
+ t.add :clone_urls
end
def self.attributes_required_columns
- super.merge({"push_url" => ["name"], "fetch_url" => ["name"]})
+ super.merge("clone_urls" => ["name"],
+ "fetch_url" => ["name"],
+ "push_url" => ["name"])
end
+ # Deprecated. Use clone_urls instead.
def push_url
- if Rails.configuration.git_host
- "git@%s:%s.git" % [Rails.configuration.git_host, name]
- else
- "git@git.%s.arvadosapi.com:%s.git" % [Rails.configuration.uuid_prefix, name]
- end
+ ssh_clone_url
end
+ # Deprecated. Use clone_urls instead.
def fetch_url
- push_url
+ ssh_clone_url
+ end
+
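+  # Example (a sketch using the default configuration): a repository
+  # named "active/foo" on a cluster with uuid prefix "zzzzz" reports
+  # clone_urls of ["git@git.zzzzz.arvadosapi.com:active/foo.git",
+  # "https://git.zzzzz.arvadosapi.com/active/foo.git"].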
+ def clone_urls
+ [ssh_clone_url, https_clone_url].compact
end
def server_path
false
end
end
+
+ def ssh_clone_url
+ _clone_url :git_repo_ssh_base, 'git@git.%s.arvadosapi.com:'
+ end
+
+ def https_clone_url
+ _clone_url :git_repo_https_base, 'https://git.%s.arvadosapi.com/'
+ end
+
+ def _clone_url config_var, default_base_fmt
+ configured_base = Rails.configuration.send config_var
+ return nil if configured_base == false
+ prefix = new_record? ? Rails.configuration.uuid_prefix : uuid[0,5]
+ if prefix == Rails.configuration.uuid_prefix and configured_base != true
+ base = configured_base
+ else
+ base = default_base_fmt % prefix
+ end
+ '%s%s.git' % [base, name]
+ end
end
# logic for deciding on a hostname.
host: false
- # If not false, this is the hostname that will be used to generate fetch_url
- # and push_url for git repositories. By default, this will be
- # git.(uuid_prefix).arvadosapi.com
- git_host: false
+ # Base part of SSH git clone url given with repository resources. If
+ # true, the default "git@git.(uuid_prefix).arvadosapi.com:" is
+ # used. If false, SSH clone URLs are not advertised. Include a
+ # trailing ":" or "/" if needed: it will not be added automatically.
+ git_repo_ssh_base: true
+
+ # Base part of HTTPS git clone urls given with repository
+ # resources. This is expected to be an arv-git-httpd service which
+ # accepts API tokens as HTTP-auth passwords. If true, the default
+ # "https://git.(uuid_prefix).arvadosapi.com/" is used. If false,
+ # HTTPS clone URLs are not advertised. Include a trailing ":" or "/"
+ # if needed: it will not be added automatically.
+ git_repo_https_base: true
# If this is not false, HTML requests at the API server's root URL
# are redirected to this location, and it is provided in the text of
# {git_repositories_dir}/arvados/.git
git_repositories_dir: /var/lib/arvados/git
- # If an arv-git-httpd service is running, advertise it in the
- # discovery document by adding its public URI base here. Example:
- # https://git.xxxxx.arvadosapi.com
- git_http_base: false
-
# This is a (bare) repository that stores commits used in jobs. When a job
# runs, the source commits are first fetched into this repository, then this
# repository is used to deploy to compute nodes. This should NOT be a
# a site secret. It should be at least 50 characters.
blob_signing_key: ~
- # Amount of time (in seconds) for which a blob permission signature
- # remains valid. Default: 2 weeks (1209600 seconds)
- blob_signing_ttl: 1209600
+ # Lifetime (in seconds) of blob permission signatures generated by
+ # the API server. This determines how long a client can take (after
+ # retrieving a collection record) to retrieve the collection data
+ # from Keep. If the client needs more time than that (assuming the
+ # collection still has the same content and the relevant user/token
+ # still has permission) the client can retrieve the collection again
+ # to get fresh signatures.
+ #
+ # Datamanager considers an unreferenced block older than this to be
+ # eligible for garbage collection. Therefore, it should never be
+ # smaller than the corresponding value used by any local keepstore
+ # service (see keepstore -blob-signature-ttl flag). This rule
+ # prevents datamanager from trying to garbage-collect recently
+ # written blocks while clients are still holding valid signatures.
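+  # For example, if every local keepstore runs with
+  # -blob-signature-ttl=1209600, the default value below (also 1209600)
+  # satisfies that rule.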
+ #
+ # The default is 2 weeks.
+ blob_signature_ttl: 1209600
# Allow clients to create collections by providing a manifest with
# unsigned data blob locators. IMPORTANT: This effectively disables
# should be at least 50 characters.
secret_token: ~
- # email address to which mail should be sent when the user creates profile for the first time
+ # Email address to notify whenever a user creates a profile for the
+ # first time
user_profile_notification_address: false
default_openid_prefix: https://www.google.com/accounts/o8/id
# Note you must separately configure the upstream web server or proxy to
# actually enforce the desired maximum request size on the server side.
max_request_size: 134217728
+
+ # Stop collecting records for an index request after we read this much
+ # data (in bytes) from large database columns.
+ # Currently only `GET /collections` respects this parameter, when the
+ # user requests an index that includes manifest_text. Once the API
+ # server collects records with a total manifest_text size at or above
+ # this amount, it returns those results immediately.
+ # Note this is a threshold, not a limit. Record collection stops
+ # *after* reading this much data.
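+  # For example, if each matching collection has a 1 MiB manifest_text,
+  # an index request that selects manifest_text returns roughly 128
+  # records per response with the default value below.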
+ max_index_database_read: 134217728
+
+ # When you run the db:delete_old_job_logs task, it will find jobs that
+ # have been finished for at least this many seconds, and delete their
+ # stderr logs from the logs table.
+ clean_job_log_rows_after: <%= 30.days %>
+
+  # The maximum number of compute nodes that can be in use simultaneously.
+  # If this limit is reduced, any existing nodes with slot number >= the new
+  # limit will not be counted against it. In other words, the new limit
+  # won't be strictly enforced until those higher-numbered nodes go down.
+ max_compute_nodes: 64
+
+  # Docker image to use for a job when its runtime_constraints do not
+  # specify one. If false, no default image is applied.
+ default_docker_image_for_jobs: false
--- /dev/null
+class NoFilenamesInCollectionSearchIndex < ActiveRecord::Migration
+ def up
+ remove_index :collections, :name => 'collections_search_index'
+ add_index :collections, ["owner_uuid", "modified_by_client_uuid", "modified_by_user_uuid", "portable_data_hash", "uuid", "name"], name: 'collections_search_index'
+ end
+
+ def down
+ remove_index :collections, :name => 'collections_search_index'
+ add_index :collections, ["owner_uuid", "modified_by_client_uuid", "modified_by_user_uuid", "portable_data_hash", "uuid", "name", "file_names"], name: 'collections_search_index'
+ end
+end
docker_image_locator character varying(255),
priority integer DEFAULT 0 NOT NULL,
description character varying(524288),
- state character varying(255),
- arvados_sdk_version character varying(255)
+ arvados_sdk_version character varying(255),
+ state character varying(255)
);
-- Name: collections_search_index; Type: INDEX; Schema: public; Owner: -; Tablespace:
--
-CREATE INDEX collections_search_index ON collections USING btree (owner_uuid, modified_by_client_uuid, modified_by_user_uuid, portable_data_hash, uuid, name, file_names);
+CREATE INDEX collections_search_index ON collections USING btree (owner_uuid, modified_by_client_uuid, modified_by_user_uuid, portable_data_hash, uuid, name);
--
INSERT INTO schema_migrations (version) VALUES ('20150317132720');
-INSERT INTO schema_migrations (version) VALUES ('20150324152204');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20150324152204');
+
+INSERT INTO schema_migrations (version) VALUES ('20150423145759');
\ No newline at end of file
--- /dev/null
+# This task finds jobs that have been finished for at least as long as
+# the duration specified in the `clean_job_log_rows_after`
+# configuration setting, and deletes their stderr logs from the logs table.
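+#
+# Example invocation (standard Rake usage):
+#   bundle exec rake db:delete_old_job_logs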
+
+namespace :db do
+ desc "Remove old job stderr entries from the logs table"
+ task delete_old_job_logs: :environment do
+ Log.select("logs.id").
+ joins("JOIN jobs ON object_uuid = jobs.uuid").
+ where("event_type = :etype AND jobs.log IS NOT NULL AND jobs.finished_at < :age",
+ etype: "stderr",
+ age: Rails.configuration.clean_job_log_rows_after.ago).
+ find_in_batches do |old_log_ids|
+ Log.where(id: old_log_ids.map(&:id)).delete_all
+ end
+ end
+end
--- /dev/null
+namespace :test do
+ new_task = Rake::TestTask.new(tasks: "test:prepare") do |t|
+ t.libs << "test"
+ t.pattern = "test/tasks/**/*_test.rb"
+ end
+end
group_class: project
description: An anonymously accessible project
+subproject_in_anonymous_accessible_project:
+ uuid: zzzzz-j7d0g-mhtfesvgmkolpyf
+ owner_uuid: zzzzz-j7d0g-zhxawtyetzwc5f0
+ created_at: 2014-04-21 15:37:48 -0400
+ name: Subproject in anonymous accessible project
+ description: Description for subproject in anonymous accessible project
+ group_class: project
+
active_user_has_can_manage:
uuid: zzzzz-j7d0g-ptt1ou6a9lxrv07
owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
description: ~
updated_at: 2014-11-05 22:31:24.258093171 Z
group_class: project
+
+# Used to test renaming when a subproject is removed from "asubproject"
+# while another object with the same name exists in the home project.
+subproject_in_active_user_home_project_to_test_unique_key_violation:
+ uuid: zzzzz-j7d0g-subprojsamenam1
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: 2013-04-21 15:37:48 -0400
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ modified_at: 2013-04-21 15:37:48 -0400
+ updated_at: 2013-04-21 15:37:48 -0400
+ name: Subproject to test owner uuid and name unique key violation upon removal
+ description: Subproject in active user home project to test owner uuid and name unique key violation upon removal
+ group_class: project
+
+subproject_in_asubproject_with_same_name_as_one_in_active_user_home:
+ uuid: zzzzz-j7d0g-subprojsamenam2
+ owner_uuid: zzzzz-j7d0g-axqo7eu9pwvna1x
+ created_at: 2013-04-21 15:37:48 -0400
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ modified_at: 2013-04-21 15:37:48 -0400
+ updated_at: 2013-04-21 15:37:48 -0400
+ name: Subproject to test owner uuid and name unique key violation upon removal
+ description: "Removing this will result in name conflict with 'A project' in Home project and hence get renamed."
+ group_class: project
created_at: <%= 3.minute.ago.to_s(:db) %>
started_at: <%= 3.minute.ago.to_s(:db) %>
finished_at: ~
+ script: hash
+ repository: active/foo
script_version: 1de84a854e2b440dc53bf42f8548afa4c17da332
running: true
success: ~
created_at: <%= 4.minute.ago.to_s(:db) %>
started_at: <%= 3.minute.ago.to_s(:db) %>
finished_at: ~
+ script: hash
+ repository: active/foo
script_version: 1de84a854e2b440dc53bf42f8548afa4c17da332
running: true
success: ~
created_at: <%= 5.minute.ago.to_s(:db) %>
started_at: <%= 3.minute.ago.to_s(:db) %>
finished_at: <%= 2.minute.ago.to_s(:db) %>
+ script: hash
+ repository: active/foo
running: false
success: true
output: d41d8cd98f00b204e9800998ecf8427e+0
previous_job_run:
uuid: zzzzz-8i9sb-cjs4pklxxjykqqq
created_at: <%= 14.minute.ago.to_s(:db) %>
+ finished_at: <%= 13.minutes.ago.to_s(:db) %>
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
repository: active/foo
script: hash
input: fa7aeb5140e2848d39b416daeef4ffc5+45
an_integer: "1"
success: true
+ log: d41d8cd98f00b204e9800998ecf8427e+0
+ output: ea10d51bcf88862dbcc36eb292017dfd+45
+ state: Complete
+
+previous_ancient_job_run:
+ uuid: zzzzz-8i9sb-ahd7cie8jah9qui
+ created_at: <%= 366.days.ago.to_s(:db) %>
+ finished_at: <%= 365.days.ago.to_s(:db) %>
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ repository: active/foo
+ script: hash
+ script_version: 4fe459abe02d9b365932b8f5dc419439ab4e2577
+ script_parameters:
+ input: fa7aeb5140e2848d39b416daeef4ffc5+45
+ an_integer: "2"
+ success: true
+ log: d41d8cd98f00b204e9800998ecf8427e+0
output: ea10d51bcf88862dbcc36eb292017dfd+45
state: Complete
input: fa7aeb5140e2848d39b416daeef4ffc5+45
an_integer: "1"
runtime_constraints:
- docker_image: arvados/test
+ docker_image: arvados/apitestfixture
success: true
output: ea10d51bcf88862dbcc36eb292017dfd+45
docker_image_locator: fa3c1a9cb6783f85f2ecda037e07b8c3+167
state: Complete
+previous_ancient_docker_image_job_run:
+ uuid: zzzzz-8i9sb-t3b460aolxxuldl
+ created_at: <%= 144.minute.ago.to_s(:db) %>
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ repository: active/foo
+ script: hash
+ script_version: 4fe459abe02d9b365932b8f5dc419439ab4e2577
+ script_parameters:
+ input: fa7aeb5140e2848d39b416daeef4ffc5+45
+ an_integer: "2"
+ runtime_constraints:
+ docker_image: arvados/apitestfixture
+ success: true
+ output: ea10d51bcf88862dbcc36eb292017dfd+45
+ docker_image_locator: b519d9cb706a29fc7ea24dbea2f05851+93
+ state: Complete
+
previous_job_run_with_arvados_sdk_version:
uuid: zzzzz-8i9sb-eoo0321or2dw2jg
created_at: <%= 14.minute.ago.to_s(:db) %>
an_integer: "1"
runtime_constraints:
arvados_sdk_version: commit2
+ docker_image: arvados/apitestfixture
arvados_sdk_version: 00634b2b8a492d6f121e3cf1d6587b821136a9a7
+ docker_image_locator: fa3c1a9cb6783f85f2ecda037e07b8c3+167
success: true
output: ea10d51bcf88862dbcc36eb292017dfd+45
state: Complete
output: ~
state: Complete
+previous_job_run_superseded_by_hash_branch:
+ # This supplied_script_version is a branch name with later commits.
+ uuid: zzzzz-8i9sb-aeviezu5dahph3e
+ created_at: <%= 15.minute.ago.to_s(:db) %>
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ repository: active/shabranchnames
+ script: testscript
+ script_version: 7387838c69a21827834586cc42b467ff6c63293b
+ supplied_script_version: 738783
+ script_parameters: {}
+ success: true
+ output: d41d8cd98f00b204e9800998ecf8427e+0
+ state: Complete
+
nondeterminisic_job_run:
uuid: zzzzz-8i9sb-cjs4pklxxjykyyy
created_at: <%= 14.minute.ago.to_s(:db) %>
properties:
image_timestamp: "2010-06-10T14:30:00.184019565Z"
+ancient_docker_image_collection_tag:
+ uuid: zzzzz-o0j2j-dockercolltagzz
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: 2014-06-12 14:30:00.184389725 Z
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-000000000000000
+ modified_at: 2014-06-12 14:30:00.184019565 Z
+ updated_at: 2014-06-12 14:30:00.183829316 Z
+ link_class: docker_image_repo+tag
+ name: arvados/apitestfixture:latest
+ tail_uuid: ~
+ head_uuid: zzzzz-4zz18-t68oksiu9m80s4y
+ properties:
+ image_timestamp: "2010-06-10T14:30:00.184019565Z"
+
+docker_image_tag_like_hash:
+ uuid: zzzzz-o0j2j-dockerhashtagaa
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: 2014-06-11 14:30:00.184389725 Z
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-000000000000000
+ modified_at: 2014-06-11 14:30:00.184019565 Z
+ updated_at: 2014-06-11 14:30:00.183829316 Z
+ link_class: docker_image_repo+tag
+ name: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa:latest
+ tail_uuid: ~
+ head_uuid: zzzzz-4zz18-1v45jub259sjjgb
+ properties:
+ image_timestamp: "2014-06-10T14:30:00.184019565Z"
+
job_reader_can_read_previous_job_run:
# Permission link giving job_reader permission
# to read previous_job_run
updated_at: 2014-11-07 23:33:42.347455000 Z
modified_at: 2014-11-07 23:33:42.347455000 Z
object_owner_uuid: zzzzz-j7d0g-v955i6s2oi1cbso
+
+crunchstat_for_previous_job:
+ id: 10
+ uuid: zzzzz-57u5n-eir3aesha3kaene
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ modified_by_client_uuid: zzzzz-ozdt8-obw7foaks3qjyej
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ object_uuid: zzzzz-8i9sb-cjs4pklxxjykqqq
+ event_at: 2014-11-07 23:33:42.347455000 Z
+ event_type: stderr
+ summary: ~
+ properties:
+ text: '2014-11-07_23:33:41 zzzzz-8i9sb-cjs4pklxxjykqqq 11592 1 stderr crunchstat:
+ cpu 1935.4300 user 59.4100 sys 8 cpus -- interval 10.0002 seconds 12.9900 user
+ 0.9900 sys'
+ created_at: 2014-11-07 23:33:42.351913000 Z
+ updated_at: 2014-11-07 23:33:42.347455000 Z
+ modified_at: 2014-11-07 23:33:42.347455000 Z
+ object_owner_uuid: zzzzz-j7d0g-xurymjxw79nv3jz
+
+crunchstat_for_ancient_job:
+ id: 11
+ uuid: zzzzz-57u5n-ixioph7ieb5ung8
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ modified_by_client_uuid: zzzzz-ozdt8-obw7foaks3qjyej
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ object_uuid: zzzzz-8i9sb-ahd7cie8jah9qui
+ event_at: 2013-11-07 23:33:42.347455000 Z
+ event_type: stderr
+ summary: ~
+ properties:
+ text: '2013-11-07_23:33:41 zzzzz-8i9sb-ahd7cie8jah9qui 29610 1 stderr crunchstat:
+ cpu 1935.4300 user 59.4100 sys 8 cpus -- interval 10.0002 seconds 12.9900 user
+ 0.9900 sys'
+ created_at: 2013-11-07 23:33:42.351913000 Z
+ updated_at: 2013-11-07 23:33:42.347455000 Z
+ modified_at: 2013-11-07 23:33:42.347455000 Z
+ object_owner_uuid: zzzzz-j7d0g-xurymjxw79nv3jz
pipeline_in_running_state:
name: running_with_job
- state: Ready
uuid: zzzzz-d1hrv-runningpipeline
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
created_at: <%= 3.1.minute.ago.to_s(:db) %>
uuid: zzzzz-8i9sb-pshmckwoma9plh7
script_version: master
+running_pipeline_with_complete_job:
+ uuid: zzzzz-d1hrv-partdonepipelin
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ state: RunningOnServer
+ components:
+ previous:
+ job:
+ uuid: zzzzz-8i9sb-cjs4pklxxjykqqq
+ log: zzzzz-4zz18-op4e2lbej01tcvu
+ running:
+ job:
+ uuid: zzzzz-8i9sb-pshmckwoma9plh7
+
+complete_pipeline_with_two_jobs:
+ uuid: zzzzz-d1hrv-twodonepipeline
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ state: Complete
+ components:
+ ancient:
+ job:
+ uuid: zzzzz-8i9sb-ahd7cie8jah9qui
+ log: zzzzz-4zz18-op4e2lbej01tcvu
+ previous:
+ job:
+ uuid: zzzzz-8i9sb-cjs4pklxxjykqqq
+ log: zzzzz-4zz18-op4e2lbej01tcvu
+
+failed_pipeline_with_two_jobs:
+ uuid: zzzzz-d1hrv-twofailpipeline
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ state: Failed
+ components:
+ ancient:
+ job:
+ uuid: zzzzz-8i9sb-ahd7cie8jah9qui
+ log: zzzzz-4zz18-op4e2lbej01tcvu
+ previous:
+ job:
+ uuid: zzzzz-8i9sb-cjs4pklxxjykqqq
+ log: zzzzz-4zz18-op4e2lbej01tcvu
+
# Test Helper trims the rest of the file
# Do not add your fixtures below this line as the rest of this file will be trimmed by test_helper
dataclass: Collection
title: "default input"
description: "input collection"
+
+# Used to test renaming when a template is removed from the "aproject"
+# subproject while another template with the same name exists in the home
+# project.
+template_in_active_user_home_project_to_test_unique_key_violation:
+ uuid: zzzzz-p5p6p-templatsamenam1
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: 2013-04-14 12:35:04 -0400
+ updated_at: 2013-04-14 12:35:04 -0400
+ modified_at: 2013-04-14 12:35:04 -0400
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ name: Template to test owner uuid and name unique key violation upon removal
+ components:
+ script: foo
+ script_version: master
+ script_parameters:
+ input:
+ required: true
+ dataclass: Collection
+ title: "Foo/bar pair"
+ description: "Provide a collection containing at least two files."
+
+template_in_asubproject_with_same_name_as_one_in_active_user_home:
+ uuid: zzzzz-p5p6p-templatsamenam2
+ owner_uuid: zzzzz-j7d0g-axqo7eu9pwvna1x
+ created_at: 2013-04-14 12:35:04 -0400
+ updated_at: 2013-04-14 12:35:04 -0400
+ modified_at: 2013-04-14 12:35:04 -0400
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ name: Template to test owner uuid and name unique key violation upon removal
+ components:
+ script: foo
+ script_version: master
+ script_parameters:
+ input:
+ required: true
+ dataclass: Collection
+ title: "Foo/bar pair"
+ description: "Provide a collection containing at least two files."
name: admin/foo4
created_at: 2015-01-01T00:00:00.123456Z
modified_at: 2015-01-01T00:00:00.123456Z
+
+has_branch_with_commit_hash_name:
+ uuid: zzzzz-s0uqq-382brsig8rp3668
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz # active user
+ name: active/shabranchnames
+ created_at: 2015-01-01T00:00:00.123456Z
+ modified_at: 2015-01-01T00:00:00.123456Z
assert_equal 99999, resp['offset']
end
+ def request_capped_index(params={})
+ authorize_with :user1_with_load
+ coll1 = collections(:collection_1_of_201)
+ Rails.configuration.max_index_database_read =
+ yield(coll1.manifest_text.size)
+ get :index, {
+ select: %w(uuid manifest_text),
+ filters: [["owner_uuid", "=", coll1.owner_uuid]],
+ limit: 300,
+ }.merge(params)
+ end
+
+ test "index with manifest_text limited by max_index_database_read" do
+ request_capped_index() { |size| (size * 3) + 1 }
+ assert_response :success
+ assert_equal(4, json_response["items"].size)
+ assert_equal(4, json_response["limit"])
+ assert_equal(201, json_response["items_available"])
+ end
+
+ test "max_index_database_read does not interfere with limit" do
+ request_capped_index(limit: 5) { |size| size * 20 }
+ assert_response :success
+ assert_equal(5, json_response["items"].size)
+ assert_equal(5, json_response["limit"])
+ assert_equal(201, json_response["items_available"])
+ end
+
+ test "max_index_database_read does not interfere with order" do
+ request_capped_index(order: "name DESC") { |size| (size * 15) + 1 }
+ assert_response :success
+ assert_equal(16, json_response["items"].size)
+ assert_empty(json_response["items"].reject do |coll|
+ coll["name"] !~ /^Collection_9/
+ end)
+ assert_equal(16, json_response["limit"])
+ assert_equal(201, json_response["items_available"])
+ end
+
test "admin can create collection with unsigned manifest" do
authorize_with :admin
test_collection = {
new_job['script_version'])
end
+ test "cannot reuse job when hash-like branch includes newer commit" do
+ check_new_job_created_from({job: {script_version: "738783"}},
+ :previous_job_run_superseded_by_hash_branch)
+ end
+
BASE_FILTERS = {
'repository' => ['=', 'active/foo'],
'script' => ['=', 'hash'],
assert_not_equal(jobs(:previous_docker_job_run).uuid, new_job.uuid)
end
+ test "don't reuse job using older Docker image of same name" do
+ jobspec = {runtime_constraints: {
+ docker_image: "arvados/apitestfixture",
+ }}
+ check_new_job_created_from({job: jobspec},
+ :previous_ancient_docker_image_job_run)
+ end
+
+ test "reuse job with Docker image that has hash name" do
+ jobspec = {runtime_constraints: {
+ docker_image: "a" * 64,
+ }}
+ check_job_reused_from(jobspec, :previous_docker_job_run)
+ end
+
["repository", "script"].each do |skip_key|
test "missing #{skip_key} filter raises an error" do
filters = filters_from_hash(BASE_FILTERS.reject { |k| k == skip_key })
jobs(:previous_docker_job_run).uuid)
end
- def create_foo_hash_job_params(params)
+ JOB_SUBMIT_KEYS = [:script, :script_parameters, :script_version, :repository]
+ DEFAULT_START_JOB = :previous_job_run
+
+ def create_job_params(params, start_from=DEFAULT_START_JOB)
if not params.has_key?(:find_or_create)
params[:find_or_create] = true
end
job_attrs = params.delete(:job) || {}
- params[:job] = {
- script: "hash",
- script_version: "4fe459abe02d9b365932b8f5dc419439ab4e2577",
- repository: "active/foo",
- script_parameters: {
- input: 'fa7aeb5140e2848d39b416daeef4ffc5+45',
- an_integer: '1',
- },
- }.merge(job_attrs)
+ start_job = jobs(start_from)
+ params[:job] = Hash[JOB_SUBMIT_KEYS.map do |key|
+ [key, start_job.send(key)]
+ end]
+ params[:job][:runtime_constraints] =
+ job_attrs.delete(:runtime_constraints) || {}
+ { arvados_sdk_version: :arvados_sdk_version,
+ docker_image_locator: :docker_image }.each do |method, constraint_key|
+ if constraint_value = start_job.send(method)
+ params[:job][:runtime_constraints][constraint_key] ||= constraint_value
+ end
+ end
+ params[:job].merge!(job_attrs)
params
end
- def check_new_job_created_from(params)
- start_time = Time.now
- post(:create, create_foo_hash_job_params(params))
+ def create_job_from(params, start_from)
+ post(:create, create_job_params(params, start_from))
assert_response :success
new_job = assigns(:object)
assert_not_nil new_job
+ new_job
+ end
+
+ def check_new_job_created_from(params, start_from=DEFAULT_START_JOB)
+ start_time = Time.now
+ new_job = create_job_from(params, start_from)
assert_operator(start_time, :<=, new_job.created_at)
new_job
end
- def check_errors_from(params)
- post(:create, create_foo_hash_job_params(params))
+ def check_job_reused_from(params, start_from)
+ new_job = create_job_from(params, start_from)
+ assert_equal(jobs(start_from).uuid, new_job.uuid)
+ end
+
+ def check_errors_from(params, start_from=DEFAULT_START_JOB)
+ post(:create, create_job_params(params, start_from))
assert_includes(405..499, @response.code.to_i)
errors = json_response.fetch("errors", [])
assert(errors.any?, "no errors assigned from #{params}")
"bad refspec not mentioned in error message")
end
- test "can't reuse job with older Arvados SDK version" do
- params = {
- script_version: "31ce37fe365b3dc204300a3e4c396ad333ed0556",
- runtime_constraints: {
- "arvados_sdk_version" => "master",
- "docker_image" => links(:docker_image_collection_tag).name,
- },
- }
- check_new_job_created_from(job: params)
+ test "don't reuse job with older Arvados SDK version specified by branch" do
+ jobspec = {runtime_constraints: {
+ arvados_sdk_version: "master",
+ }}
+ check_new_job_created_from({job: jobspec},
+ :previous_job_run_with_arvados_sdk_version)
+ end
+
+ test "don't reuse job with older Arvados SDK version specified by commit" do
+ jobspec = {runtime_constraints: {
+ arvados_sdk_version: "ca68b24e51992e790f29df5cc4bc54ce1da4a1c2",
+ }}
+ check_new_job_created_from({job: jobspec},
+ :previous_job_run_with_arvados_sdk_version)
+ end
+
+ test "don't reuse job with newer Arvados SDK version specified by commit" do
+ jobspec = {runtime_constraints: {
+ arvados_sdk_version: "436637c87a1d2bdbf4b624008304064b6cf0e30c",
+ }}
+ check_new_job_created_from({job: jobspec},
+ :previous_job_run_with_arvados_sdk_version)
end
test "reuse job from arvados_sdk_version git filters" do
+ prev_job = jobs(:previous_job_run_with_arvados_sdk_version)
filters_hash = BASE_FILTERS.
- merge("arvados_sdk_version" => ["in git", "commit2"])
+ merge("arvados_sdk_version" => ["in git", "commit2"],
+ "docker_image_locator" => ["=", prev_job.docker_image_locator])
filters_hash.delete("script_version")
- params = create_foo_hash_job_params(filters:
- filters_from_hash(filters_hash))
+ params = create_job_params(filters: filters_from_hash(filters_hash))
post(:create, params)
assert_response :success
- assert_equal(jobs(:previous_job_run_with_arvados_sdk_version).uuid,
- assigns(:object).uuid)
+ assert_equal(prev_job.uuid, assigns(:object).uuid)
end
test "create new job because of arvados_sdk_version 'not in git' filters" do
end
[
- {config: "example.com", host: "example.com"},
- {config: false, host: "git.zzzzz.arvadosapi.com"}
- ].each do |set_git_host|
- test "setting git_host to #{set_git_host[:host]} changes fetch/push_url to #{set_git_host[:config]}" do
- Rails.configuration.git_host = set_git_host[:config]
+ {cfg: :git_repo_ssh_base, cfgval: "git@example.com:", match: %r"^git@example.com:/"},
+ {cfg: :git_repo_ssh_base, cfgval: true, match: %r"^git@git.zzzzz.arvadosapi.com:/"},
+ {cfg: :git_repo_ssh_base, cfgval: false, refute: /^git@/ },
+ {cfg: :git_repo_https_base, cfgval: "https://example.com/", match: %r"https://example.com/"},
+ {cfg: :git_repo_https_base, cfgval: true, match: %r"^https://git.zzzzz.arvadosapi.com/"},
+ {cfg: :git_repo_https_base, cfgval: false, refute: /^http/ },
+ ].each do |expect|
+ test "set #{expect[:cfg]} to #{expect[:cfgval]}" do
+ Rails.configuration.send expect[:cfg].to_s+"=", expect[:cfgval]
authorize_with :active
- get(:index)
+ get :index
assert_response :success
- assert_includes(json_response["items"].map { |r| r["fetch_url"] },
- "git@#{set_git_host[:host]}:active/foo.git")
- assert_includes(json_response["items"].map { |r| r["push_url"] },
- "git@#{set_git_host[:host]}:active/foo.git")
+ json_response['items'].each do |r|
+ if expect[:refute]
+ r['clone_urls'].each do |u|
+ refute_match expect[:refute], u
+ end
+ else
+          assert(r['clone_urls'].any? { |u| expect[:match].match u },
+                 "no clone_urls matched #{expect[:match]}")
+ end
+ end
end
end
- test "can select push_url in index" do
+ test "select push_url in index" do
authorize_with :active
get(:index, {select: ["uuid", "push_url"]})
assert_response :success
assert_includes(json_response["items"].map { |r| r["push_url"] },
"git@git.zzzzz.arvadosapi.com:active/foo.git")
end
+
+ test "select clone_urls in index" do
+ authorize_with :active
+ get(:index, {select: ["uuid", "clone_urls"]})
+ assert_response :success
+ assert_includes(json_response["items"].map { |r| r["clone_urls"] }.flatten,
+ "git@git.zzzzz.arvadosapi.com:active/foo.git")
+ end
end
module GitTestHelper
def self.included base
base.setup do
- @tmpdir = Dir.mktmpdir()
- system("tar", "-xC", @tmpdir, "-f", "test/test.git.tar")
+ # Extract the test repository data into the default test
+ # environment's Rails.configuration.git_repositories_dir. (We
+ # don't use that config setting here, though: it doesn't seem
+ # worth the risk of stepping on a real git repo root.)
+ @tmpdir = Rails.root.join 'tmp', 'git'
+ FileUtils.mkdir_p @tmpdir
+ system("tar", "-xC", @tmpdir.to_s, "-f", "test/test.git.tar")
Rails.configuration.git_repositories_dir = "#{@tmpdir}/test"
+
intdir = Rails.configuration.git_internal_dir
if not File.exist? intdir
FileUtils.mkdir_p intdir
--- /dev/null
+require 'test_helper'
+require 'rake'
+
+Rake.application.rake_require "tasks/delete_old_job_logs"
+Rake::Task.define_task(:environment)
+
+class DeleteOldJobLogsTaskTest < ActiveSupport::TestCase
+ TASK_NAME = "db:delete_old_job_logs"
+
+ def log_uuids(*fixture_names)
+ fixture_names.map { |name| logs(name).uuid }
+ end
+
+ def run_with_expiry(clean_after)
+ Rails.configuration.clean_job_log_rows_after = clean_after
+ Rake::Task[TASK_NAME].reenable
+ Rake.application.invoke_task TASK_NAME
+ end
+
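+  # All stderr log entries whose object is a job (UUID type code "8i9sb").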
+ def job_stderr_logs
+ Log.where("object_uuid LIKE :pattern AND event_type = :etype",
+ pattern: "_____-8i9sb-_______________",
+ etype: "stderr")
+ end
+
+ def check_existence(test_method, fixture_uuids)
+ uuids_now = job_stderr_logs.map(&:uuid)
+ fixture_uuids.each do |expect_uuid|
+ send(test_method, uuids_now, expect_uuid)
+ end
+ end
+
+ test "delete all logs" do
+ uuids_to_keep = log_uuids(:crunchstat_for_running_job)
+ uuids_to_clean = log_uuids(:crunchstat_for_previous_job,
+ :crunchstat_for_ancient_job)
+ run_with_expiry(1)
+ check_existence(:assert_includes, uuids_to_keep)
+ check_existence(:refute_includes, uuids_to_clean)
+ end
+
+ test "delete only old logs" do
+ uuids_to_keep = log_uuids(:crunchstat_for_running_job,
+ :crunchstat_for_previous_job)
+ uuids_to_clean = log_uuids(:crunchstat_for_ancient_job)
+ run_with_expiry(360.days)
+ check_existence(:assert_includes, uuids_to_keep)
+ check_existence(:refute_includes, uuids_to_clean)
+ end
+end
search_index_columns = table_class.searchable_columns('ilike')
# Disappointing, but text columns aren't indexed yet.
search_index_columns -= table_class.columns.select { |c|
- c.type == :text or c.name == 'description'
+ c.type == :text or c.name == 'description' or c.name == 'file_names'
}.collect(&:name)
indexes = ActiveRecord::Base.connection.indexes(table)
assert c.valid?
end
end
+
+ test "find_all_for_docker_image resolves names that look like hashes" do
+ coll_list = Collection.
+ find_all_for_docker_image('a' * 64, nil, [users(:active)])
+ coll_uuids = coll_list.map(&:uuid)
+ assert_includes(coll_uuids, collections(:docker_image).uuid)
+ end
end
assert $?.success?
end
+ # In active/shabranchnames, "7387838c69a21827834586cc42b467ff6c63293b" is
+ # both a commit hash, and the name of a branch that begins from that same
+ # commit.
+ COMMIT_BRANCH_NAME = "7387838c69a21827834586cc42b467ff6c63293b"
+ # A commit that appears in the branch after 7387838c.
+ COMMIT_BRANCH_COMMIT_2 = "abec49829bf1758413509b7ffcab32a771b71e81"
+ # "738783" is another branch that starts from the above commit.
+ SHORT_COMMIT_BRANCH_NAME = COMMIT_BRANCH_NAME[0, 6]
+ # A commit that appears in branch 738783 after 7387838c.
+ SHORT_BRANCH_COMMIT_2 = "77e1a93093663705a63bb4d505698047e109dedd"
+
+ test "find_commit_range min_version prefers commits over branch names" do
+ assert_equal([COMMIT_BRANCH_NAME],
+ Commit.find_commit_range("active/shabranchnames",
+ COMMIT_BRANCH_NAME, nil, nil))
+ end
+
+ test "find_commit_range max_version prefers commits over branch names" do
+ assert_equal([COMMIT_BRANCH_NAME],
+ Commit.find_commit_range("active/shabranchnames",
+ nil, COMMIT_BRANCH_NAME, nil))
+ end
+
+ test "find_commit_range min_version with short branch name" do
+ assert_equal([SHORT_BRANCH_COMMIT_2],
+ Commit.find_commit_range("active/shabranchnames",
+ SHORT_COMMIT_BRANCH_NAME, nil, nil))
+ end
+
+ test "find_commit_range max_version with short branch name" do
+ assert_equal([SHORT_BRANCH_COMMIT_2],
+ Commit.find_commit_range("active/shabranchnames",
+ nil, SHORT_COMMIT_BRANCH_NAME, nil))
+ end
+
+ test "find_commit_range min_version with disambiguated branch name" do
+ assert_equal([COMMIT_BRANCH_COMMIT_2],
+ Commit.find_commit_range("active/shabranchnames",
+ "heads/#{COMMIT_BRANCH_NAME}",
+ nil, nil))
+ end
+
+ test "find_commit_range max_version with disambiguated branch name" do
+ assert_equal([COMMIT_BRANCH_COMMIT_2],
+ Commit.find_commit_range("active/shabranchnames", nil,
+ "heads/#{COMMIT_BRANCH_NAME}", nil))
+ end
+
+ test "find_commit_range min_version with unambiguous short name" do
+ assert_equal([COMMIT_BRANCH_NAME],
+ Commit.find_commit_range("active/shabranchnames",
+ COMMIT_BRANCH_NAME[0..-2], nil, nil))
+ end
+
+ test "find_commit_range max_version with unambiguous short name" do
+ assert_equal([COMMIT_BRANCH_NAME],
+ Commit.find_commit_range("active/shabranchnames", nil,
+ COMMIT_BRANCH_NAME[0..-2], nil))
+ end
+
test "find_commit_range laundry list" do
authorize_with :active
assert(job.invalid?, "Job with bad Docker tag valid")
end
+ [
+ false,
+ true
+ ].each do |use_config|
+ test "Job with no Docker image uses default docker image when configuration is set #{use_config}" do
+ default_docker_image = collections(:docker_image)[:portable_data_hash]
+ Rails.configuration.default_docker_image_for_jobs = default_docker_image if use_config
+
+ job = Job.new job_attrs
+ assert job.valid?, job.errors.full_messages.to_s
+
+ if use_config
+ refute_nil job.docker_image_locator
+ assert_equal default_docker_image, job.docker_image_locator
+ else
+ assert_nil job.docker_image_locator
+ end
+ end
+ end
+
+ test "create a job with a disambiguated script_version branch name" do
+ job = Job.
+ new(script: "testscript",
+ script_version: "heads/7387838c69a21827834586cc42b467ff6c63293b",
+ repository: "active/shabranchnames",
+ script_parameters: {})
+ assert(job.save)
+ assert_equal("abec49829bf1758413509b7ffcab32a771b71e81", job.script_version)
+ end
+
test "locate a Docker image with a partial hash" do
image_hash = links(:docker_image_collection_hash).name[0..24]
job = Job.new job_attrs(runtime_constraints:
test "fetch_url" do
repo = new_repo(:active, name: "active/fetchtest")
+ repo.save
assert_equal(default_git_url("fetchtest", "active"), repo.fetch_url)
end
set_user_from_auth :admin
repo = Repository.new(owner_uuid: users(:system_user).uuid,
name: "fetchtest")
+ repo.save
assert_equal(default_git_url("fetchtest"), repo.fetch_url)
end
test "push_url" do
repo = new_repo(:active, name: "active/pushtest")
+ repo.save
assert_equal(default_git_url("pushtest", "active"), repo.push_url)
end
set_user_from_auth :admin
repo = Repository.new(owner_uuid: users(:system_user).uuid,
name: "pushtest")
+ repo.save
assert_equal(default_git_url("pushtest"), repo.push_url)
end
w.WriteHeader(statusCode)
w.Write([]byte(statusText))
}
- log.Println(quoteStrings(r.RemoteAddr, username, password, wroteStatus, statusText, repoName, r.URL.Path)...)
+ log.Println(quoteStrings(r.RemoteAddr, username, password, wroteStatus, statusText, repoName, r.Method, r.URL.Path)...)
}()
// HTTP request username is logged, but unused. Password is an
username, password, ok := BasicAuth(r)
if !ok || username == "" || password == "" {
statusCode, statusText = http.StatusUnauthorized, "no credentials provided"
- w.Header().Add("WWW-Authenticate", "basic")
+ w.Header().Add("WWW-Authenticate", "Basic realm=\"git\"")
return
}
arv.ApiToken = password
reposFound := arvadosclient.Dict{}
if err := arv.List("repositories", arvadosclient.Dict{
- "filters": [][]string{[]string{"name", "=", repoName}},
+ "filters": [][]string{{"name", "=", repoName}},
}, &reposFound); err != nil {
statusCode, statusText = http.StatusInternalServerError, err.Error()
return
func TestBasicAuth(t *testing.T) {
tests := []basicAuthTestCase{
- basicAuthTestCase{"Basic Zm9vOmJhcg==", "foo", "bar", true},
- basicAuthTestCase{"Bogus Zm9vOmJhcg==", "", "", false},
- basicAuthTestCase{"Zm9vOmJhcg==", "", "", false},
- basicAuthTestCase{"Basic", "", "", false},
- basicAuthTestCase{"", "", "", false},
+ {"Basic Zm9vOmJhcg==", "foo", "bar", true},
+ {"Bogus Zm9vOmJhcg==", "", "", false},
+ {"Zm9vOmJhcg==", "", "", false},
+ {"Basic", "", "", false},
+ {"", "", "", false},
}
for _, test := range tests {
if u, p, ok := BasicAuth(&http.Request{Header: map[string][]string{
- "Authorization": []string{test.hdr},
+ "Authorization": {test.hdr},
}}); u != test.user || p != test.pass || ok != test.ok {
t.Error("got:", u, p, ok, "expected:", test.user, test.pass, test.ok, "from:", test.hdr)
}
var _ = check.Suite(&IntegrationSuite{})
+const (
+ spectatorToken = "zw2f4gwx8hw8cjre7yp6v1zylhrhn3m5gvjq73rtpwhmknrybu"
+ activeToken = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
+ anonymousToken = "4kg6k6lzmp9kj4cpkcoxie964cmvjahbt4fod9zru44k4jqdmi"
+)
+
// IntegrationSuite tests need an API server and an arv-git-httpd server
type IntegrationSuite struct {
tmpRepoRoot string
func (s *IntegrationSuite) TestPathVariants(c *check.C) {
s.makeArvadosRepo(c)
- // Spectator token
- os.Setenv("ARVADOS_API_TOKEN", "zw2f4gwx8hw8cjre7yp6v1zylhrhn3m5gvjq73rtpwhmknrybu")
for _, repo := range []string{"active/foo.git", "active/foo/.git", "arvados.git", "arvados/.git"} {
- err := s.runGit(c, "fetch", repo)
+ err := s.runGit(c, spectatorToken, "fetch", repo)
c.Assert(err, check.Equals, nil)
}
}
func (s *IntegrationSuite) TestReadonly(c *check.C) {
- // Spectator token
- os.Setenv("ARVADOS_API_TOKEN", "zw2f4gwx8hw8cjre7yp6v1zylhrhn3m5gvjq73rtpwhmknrybu")
- err := s.runGit(c, "fetch", "active/foo.git")
+ err := s.runGit(c, spectatorToken, "fetch", "active/foo.git")
c.Assert(err, check.Equals, nil)
- err = s.runGit(c, "push", "active/foo.git", "master:newbranchfail")
+ err = s.runGit(c, spectatorToken, "push", "active/foo.git", "master:newbranchfail")
c.Assert(err, check.ErrorMatches, `.*HTTP code = 403.*`)
_, err = os.Stat(s.tmpRepoRoot + "/zzzzz-s0uqq-382brsig8rp3666/.git/refs/heads/newbranchfail")
c.Assert(err, check.FitsTypeOf, &os.PathError{})
}
func (s *IntegrationSuite) TestReadwrite(c *check.C) {
- // Active user token
- os.Setenv("ARVADOS_API_TOKEN", "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi")
- err := s.runGit(c, "fetch", "active/foo.git")
+ err := s.runGit(c, activeToken, "fetch", "active/foo.git")
c.Assert(err, check.Equals, nil)
- err = s.runGit(c, "push", "active/foo.git", "master:newbranch")
+ err = s.runGit(c, activeToken, "push", "active/foo.git", "master:newbranch")
c.Assert(err, check.Equals, nil)
_, err = os.Stat(s.tmpRepoRoot + "/zzzzz-s0uqq-382brsig8rp3666/.git/refs/heads/newbranch")
c.Assert(err, check.Equals, nil)
}
func (s *IntegrationSuite) TestNonexistent(c *check.C) {
- // Spectator token
- os.Setenv("ARVADOS_API_TOKEN", "zw2f4gwx8hw8cjre7yp6v1zylhrhn3m5gvjq73rtpwhmknrybu")
- err := s.runGit(c, "fetch", "thisrepodoesnotexist.git")
+ err := s.runGit(c, spectatorToken, "fetch", "thisrepodoesnotexist.git")
c.Assert(err, check.ErrorMatches, `.* not found.*`)
}
func (s *IntegrationSuite) TestMissingGitdirReadableRepository(c *check.C) {
- // Active user token
- os.Setenv("ARVADOS_API_TOKEN", "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi")
- err := s.runGit(c, "fetch", "active/foo2.git")
+ err := s.runGit(c, activeToken, "fetch", "active/foo2.git")
c.Assert(err, check.ErrorMatches, `.* not found.*`)
}
func (s *IntegrationSuite) TestNoPermission(c *check.C) {
- // Anonymous token
- os.Setenv("ARVADOS_API_TOKEN", "4kg6k6lzmp9kj4cpkcoxie964cmvjahbt4fod9zru44k4jqdmi")
for _, repo := range []string{"active/foo.git", "active/foo/.git"} {
- err := s.runGit(c, "fetch", repo)
+ err := s.runGit(c, anonymousToken, "fetch", repo)
c.Assert(err, check.ErrorMatches, `.* not found.*`)
}
}
_, err = exec.Command("sh", "-c", "cd "+s.tmpWorkdir+" && echo work >work && git add work && git -c user.name=Foo -c user.email=Foo commit -am 'workdir: test'").CombinedOutput()
c.Assert(err, check.Equals, nil)
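+	// Configure a git credential helper in the test work tree that answers
+	// "get" requests with password=$ARVADOS_API_TOKEN, so each runGit call
+	// authenticates with the token passed in its environment.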
+ _, err = exec.Command("git", "config",
+ "--file", s.tmpWorkdir+"/.git/config",
+ "credential.http://"+s.testServer.Addr+"/.helper",
+ "!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred").Output()
+ c.Assert(err, check.Equals, nil)
+ _, err = exec.Command("git", "config",
+ "--file", s.tmpWorkdir+"/.git/config",
+ "credential.http://"+s.testServer.Addr+"/.username",
+ "none").Output()
+ c.Assert(err, check.Equals, nil)
+
theConfig = &config{
Addr: ":",
GitCommand: "/usr/bin/git",
// Clear ARVADOS_API_TOKEN after starting up the server, to
// make sure arv-git-httpd doesn't use it.
- os.Setenv("ARVADOS_API_TOKEN", "")
-
- _, err = exec.Command("git", "config",
- "--file", s.tmpWorkdir+"/.git/config",
- "credential.http://"+s.testServer.Addr+"/.helper",
- "!cred(){ echo password=$ARVADOS_API_TOKEN; };cred").Output()
- c.Assert(err, check.Equals, nil)
- _, err = exec.Command("git", "config",
- "--file", s.tmpWorkdir+"/.git/config",
- "credential.http://"+s.testServer.Addr+"/.username",
- "none").Output()
- c.Assert(err, check.Equals, nil)
+ os.Setenv("ARVADOS_API_TOKEN", "unused-token-placates-client-library")
}
func (s *IntegrationSuite) TearDownTest(c *check.C) {
}
}
-func (s *IntegrationSuite) runGit(c *check.C, gitCmd, repo string, args ...string) error {
+func (s *IntegrationSuite) runGit(c *check.C, token, gitCmd, repo string, args ...string) error {
cwd, err := os.Getwd()
c.Assert(err, check.Equals, nil)
defer os.Chdir(cwd)
gitCmd, "http://" + s.testServer.Addr + "/" + repo,
}, args...)
cmd := exec.Command("git", gitargs...)
+ cmd.Env = append(os.Environ(), "ARVADOS_API_TOKEN="+token)
w, err := cmd.StdinPipe()
c.Assert(err, check.Equals, nil)
w.Close()
// Make a bare arvados repo at {tmpRepoRoot}/arvados.git
func (s *IntegrationSuite) makeArvadosRepo(c *check.C) {
- _, err := exec.Command("git", "init", "--bare", s.tmpRepoRoot+"/zzzzz-s0uqq-arvadosrepo0123.git").Output()
+ msg, err := exec.Command("git", "init", "--bare", s.tmpRepoRoot+"/zzzzz-s0uqq-arvadosrepo0123.git").CombinedOutput()
+ c.Log(string(msg))
c.Assert(err, check.Equals, nil)
- _, err = exec.Command("git", "--git-dir", s.tmpRepoRoot+"/zzzzz-s0uqq-arvadosrepo0123.git", "fetch", "../../.git", "master:master").Output()
+ msg, err = exec.Command("git", "--git-dir", s.tmpRepoRoot+"/zzzzz-s0uqq-arvadosrepo0123.git", "fetch", "../../.git", "HEAD:master").CombinedOutput()
+ c.Log(string(msg))
c.Assert(err, check.Equals, nil)
}
--- /dev/null
+crunchstat
// cgroup root for the given statgroup. (This will avoid falling back
// to host-level stats during container setup and teardown.)
func OpenStatFile(cgroup Cgroup, statgroup string, stat string) (*os.File, error) {
- var paths = []string{
- fmt.Sprintf("%s/%s/%s/%s/%s", cgroup.root, statgroup, cgroup.parent, cgroup.cid, stat),
- fmt.Sprintf("%s/%s/%s/%s", cgroup.root, cgroup.parent, cgroup.cid, stat),
- fmt.Sprintf("%s/%s/%s", cgroup.root, statgroup, stat),
- fmt.Sprintf("%s/%s", cgroup.root, stat),
+ var paths []string
+ if cgroup.cid != "" {
+ // Collect container's stats
+ paths = []string{
+ fmt.Sprintf("%s/%s/%s/%s/%s", cgroup.root, statgroup, cgroup.parent, cgroup.cid, stat),
+ fmt.Sprintf("%s/%s/%s/%s", cgroup.root, cgroup.parent, cgroup.cid, stat),
+ }
+ } else {
+ // Collect this host's stats
+ paths = []string{
+ fmt.Sprintf("%s/%s/%s", cgroup.root, statgroup, stat),
+ fmt.Sprintf("%s/%s", cgroup.root, stat),
+ }
}
var path string
var file *os.File
// whether we happen to collect stats [a] before any
// processes have been created in the container and
// [b] after all contained processes have exited.
- reportedStatFile[stat] = path
if path == "" {
- statLog.Printf("error finding stats file: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
+ statLog.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
+ } else if ok {
+ statLog.Printf("notice: stats moved from %s to %s\n", reportedStatFile[stat], path)
} else {
- statLog.Printf("error reading stats from %s\n", path)
+ statLog.Printf("notice: reading stats from %s\n", path)
}
+ reportedStatFile[stat] = path
}
return file, err
}
"""
- def __init__(self, uid, gid,
- encoding="utf-8",
- inode_cache=InodeCache(cap=256*1024*1024),
- num_retries=4):
+ def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4):
super(Operations, self).__init__()
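+        # Build the default InodeCache here, rather than as a default
+        # argument value, so each Operations instance gets its own cache.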
+ if not inode_cache:
+ inode_cache = InodeCache(cap=256*1024*1024)
self.inodes = Inodes(inode_cache)
self.uid = uid
self.gid = gid
return (self.collection_locator is not None)
def objsize(self):
+ # This is an empirically-derived heuristic to estimate the memory used
+ # to store this collection's metadata. Calculating the memory
+ # footprint directly would be more accurate, but also more complicated.
return self._manifest_size * 128
class MagicDirectory(Directory):
import unittest
class InodeTests(unittest.TestCase):
- def test_inodes(self):
+ def test_inodes_basic(self):
cache = arvados_fuse.InodeCache(1000, 4)
inodes = arvados_fuse.Inodes(cache)
# Check that ent1 gets added to inodes
ent1 = mock.MagicMock()
- ent1.return_value.in_use = False
+ ent1.in_use.return_value = False
ent1.persisted.return_value = True
ent1.clear.return_value = True
ent1.objsize.return_value = 500
self.assertIs(inodes[ent1.inode], ent1)
self.assertEqual(500, cache.total())
+ def test_inodes_not_persisted(self):
+ cache = arvados_fuse.InodeCache(1000, 4)
+ inodes = arvados_fuse.Inodes(cache)
+
+ ent1 = mock.MagicMock()
+ ent1.in_use.return_value = False
+ ent1.persisted.return_value = True
+ ent1.clear.return_value = True
+ ent1.objsize.return_value = 500
+ inodes.add_entry(ent1)
+
# ent2 is not persisted, so it doesn't
# affect the cache total
ent2 = mock.MagicMock()
- ent2.return_value.in_use = False
+ ent2.in_use.return_value = False
ent2.persisted.return_value = False
ent2.objsize.return_value = 600
inodes.add_entry(ent2)
self.assertEqual(500, cache.total())
+ def test_inode_cleared(self):
+ cache = arvados_fuse.InodeCache(1000, 4)
+ inodes = arvados_fuse.Inodes(cache)
+
+ # Check that ent1 gets added to inodes
+ ent1 = mock.MagicMock()
+ ent1.in_use.return_value = False
+ ent1.persisted.return_value = True
+ ent1.clear.return_value = True
+ ent1.objsize.return_value = 500
+ inodes.add_entry(ent1)
+
# ent3 is persisted, adding it should cause ent1 to get cleared
ent3 = mock.MagicMock()
- ent3.return_value.in_use = False
+ ent3.in_use.return_value = False
ent3.persisted.return_value = True
ent3.objsize.return_value = 600
ent3.clear.return_value = True
self.assertTrue(ent3.clear.called)
self.assertEqual(500, cache.total())
+ def test_clear_false(self):
+ cache = arvados_fuse.InodeCache(1000, 4)
+ inodes = arvados_fuse.Inodes(cache)
+
+ ent1 = mock.MagicMock()
+ ent1.in_use.return_value = False
+ ent1.persisted.return_value = True
+ ent1.clear.return_value = True
+ ent1.objsize.return_value = 500
+ inodes.add_entry(ent1)
+
+ ent3 = mock.MagicMock()
+ ent3.in_use.return_value = False
+ ent3.persisted.return_value = True
+ ent3.objsize.return_value = 600
+ ent3.clear.return_value = True
+ inodes.add_entry(ent3)
+
+ cache.min_entries = 1
+
# ent1, ent3 clear return false, can't be cleared
ent1.clear.return_value = False
ent3.clear.return_value = False
ent3.clear.return_value = True
ent1.clear.called = False
ent3.clear.called = False
- self.assertFalse(ent1.clear.called)
- self.assertFalse(ent3.clear.called)
cache.touch(ent3)
self.assertTrue(ent1.clear.called)
self.assertTrue(ent3.clear.called)
self.assertEqual(500, cache.total())
+ def test_delete(self):
+ cache = arvados_fuse.InodeCache(1000, 4)
+ inodes = arvados_fuse.Inodes(cache)
+
+ ent1 = mock.MagicMock()
+ ent1.in_use.return_value = False
+ ent1.persisted.return_value = True
+ ent1.clear.return_value = True
+ ent1.objsize.return_value = 500
+ inodes.add_entry(ent1)
+
+ ent3 = mock.MagicMock()
+ ent3.in_use.return_value = False
+ ent3.persisted.return_value = True
+ ent3.objsize.return_value = 600
+ ent3.clear.return_value = True
+
# Delete ent1
+ self.assertEqual(500, cache.total())
ent1.clear.return_value = True
ent1.ref_count = 0
inodes.del_entry(ent1)
"os"
"os/signal"
"reflect"
+ "regexp"
"sync"
"syscall"
"time"
var ContentLengthMismatch = errors.New("Actual length != expected content length")
var MethodNotSupported = errors.New("Method not supported")
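+// removeHint matches a "+K@xxxxx" hint (a 5-character cluster ID) followed
+// by another hint or the end of the locator; substituting "$1" strips the
+// hint while leaving longer "+K@" hints (with a full UUID) intact.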
+var removeHint, _ = regexp.Compile("\\+K@[a-z0-9]{5}(\\+|$)")
+
func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
SetCorsHeaders(resp)
var reader io.ReadCloser
+ locator = removeHint.ReplaceAllString(locator, "$1")
+
switch req.Method {
case "HEAD":
expectLength, proxiedURI, err = kc.Ask(locator)
fmt.Sprintf("%x+%d", md5.Sum([]byte("qux")), 3))
}
}
+
+func (s *ServerRequiredSuite) TestStripHint(c *C) {
+ c.Check(removeHint.ReplaceAllString("http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73+K@zzzzz", "$1"),
+ Equals,
+ "http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73")
+ c.Check(removeHint.ReplaceAllString("http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+K@zzzzz+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73", "$1"),
+ Equals,
+ "http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73")
+ c.Check(removeHint.ReplaceAllString("http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73+K@zzzzz-zzzzz-zzzzzzzzzzzzzzz", "$1"),
+ Equals,
+ "http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73+K@zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Check(removeHint.ReplaceAllString("http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+K@zzzzz-zzzzz-zzzzzzzzzzzzzzz+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73", "$1"),
+ Equals,
+ "http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+K@zzzzz-zzzzz-zzzzzzzzzzzzzzz+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73")
+
+}
--- /dev/null
+package main
+
+import (
+ "log"
+ "sync"
+ "time"
+)
+
+type bufferPool struct {
+ // limiter has a "true" placeholder for each in-use buffer.
+ limiter chan bool
+ // Pool has unused buffers.
+ sync.Pool
+}
+
+func newBufferPool(count int, bufSize int) *bufferPool {
+ p := bufferPool{}
+ p.New = func() interface{} {
+ return make([]byte, bufSize)
+ }
+ p.limiter = make(chan bool, count)
+ return &p
+}
+
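+// Get returns a buffer with the given length (at most the pool's bufSize).
+// If the pool's limit has been reached, Get logs a message and blocks until
+// another caller returns a buffer via Put.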
+func (p *bufferPool) Get(size int) []byte {
+ select {
+ case p.limiter <- true:
+ default:
+ t0 := time.Now()
+ log.Printf("reached max buffers (%d), waiting", cap(p.limiter))
+ p.limiter <- true
+ log.Printf("waited %v for a buffer", time.Since(t0))
+ }
+ buf := p.Pool.Get().([]byte)
+ if cap(buf) < size {
+ log.Fatalf("bufferPool Get(size=%d) but max=%d", size, cap(buf))
+ }
+ return buf[:size]
+}
+
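+// Put returns a buffer to the pool and releases its slot in the limiter.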
+func (p *bufferPool) Put(buf []byte) {
+ p.Pool.Put(buf)
+ <-p.limiter
+}
--- /dev/null
+package main
+
+import (
+ . "gopkg.in/check.v1"
+ "testing"
+ "time"
+)
+
+// Gocheck boilerplate
+func TestBufferPool(t *testing.T) {
+ TestingT(t)
+}
+var _ = Suite(&BufferPoolSuite{})
+type BufferPoolSuite struct{}
+
+// Initialize a default-sized buffer pool for the benefit of test
+// suites that don't run main().
+func init() {
+ bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+}
+
+func (s *BufferPoolSuite) TestBufferPoolBufSize(c *C) {
+ bufs := newBufferPool(2, 10)
+ b1 := bufs.Get(1)
+ bufs.Get(2)
+ bufs.Put(b1)
+ b3 := bufs.Get(3)
+ c.Check(len(b3), Equals, 3)
+}
+
+func (s *BufferPoolSuite) TestBufferPoolUnderLimit(c *C) {
+ bufs := newBufferPool(3, 10)
+ b1 := bufs.Get(10)
+ bufs.Get(10)
+ testBufferPoolRace(c, bufs, b1, "Get")
+}
+
+func (s *BufferPoolSuite) TestBufferPoolAtLimit(c *C) {
+ bufs := newBufferPool(2, 10)
+ b1 := bufs.Get(10)
+ bufs.Get(10)
+ testBufferPoolRace(c, bufs, b1, "Put")
+}
+
+func testBufferPoolRace(c *C, bufs *bufferPool, unused []byte, expectWin string) {
+ race := make(chan string)
+ go func() {
+ bufs.Get(10)
+ time.Sleep(time.Millisecond)
+ race <- "Get"
+ }()
+ go func() {
+ time.Sleep(10*time.Millisecond)
+ bufs.Put(unused)
+ race <- "Put"
+ }()
+ c.Check(<-race, Equals, expectWin)
+ c.Check(<-race, Not(Equals), expectWin)
+ close(race)
+}
+
+func (s *BufferPoolSuite) TestBufferPoolReuse(c *C) {
+ bufs := newBufferPool(2, 10)
+ bufs.Get(10)
+ last := bufs.Get(10)
+ // The buffer pool is allowed to throw away unused buffers
+	// (e.g., during sync.Pool's garbage collection hook, in the
+	// current implementation). However, if unused buffers are
+ // getting thrown away and reallocated more than {arbitrary
+ // frequency threshold} during a busy loop, it's not acting
+ // much like a buffer pool.
+ allocs := 1000
+ reuses := 0
+ for i := 0; i < allocs; i++ {
+ bufs.Put(last)
+ next := bufs.Get(10)
+ copy(last, []byte("last"))
+ copy(next, []byte("next"))
+ if last[0] == 'n' {
+ reuses++
+ }
+ last = next
+ }
+ c.Check(reuses > allocs * 95/100, Equals, true)
+}
// Turn on permission settings so we can generate signed locators.
enforce_permissions = true
PermissionSecret = []byte(known_key)
- permission_ttl = time.Duration(300) * time.Second
+ blob_signature_ttl = 300 * time.Second
var (
unsigned_locator = "/" + TEST_HASH
- valid_timestamp = time.Now().Add(permission_ttl)
+ valid_timestamp = time.Now().Add(blob_signature_ttl)
expired_timestamp = time.Now().Add(-time.Hour)
signed_locator = "/" + SignLocator(TEST_HASH, known_token, valid_timestamp)
expired_locator = "/" + SignLocator(TEST_HASH, known_token, expired_timestamp)
// With a server key.
PermissionSecret = []byte(known_key)
- permission_ttl = time.Duration(300) * time.Second
+ blob_signature_ttl = 300 * time.Second
// When a permission key is available, the locator returned
// from an authenticated PUT request will be signed.
IssueRequest(
&RequestTester{
method: "PUT",
- uri: "/"+TEST_HASH,
+ uri: "/" + TEST_HASH,
request_body: TEST_BLOCK,
})
IssueRequest(
&RequestTester{
method: "DELETE",
- uri: "/"+TEST_HASH,
+ uri: "/" + TEST_HASH,
request_body: TEST_BLOCK,
api_token: data_manager_token,
})
vols := KeepVM.AllWritable()
vols[0].Put(TEST_HASH, TEST_BLOCK)
- // Explicitly set the permission_ttl to 0 for these
+ // Explicitly set the blob_signature_ttl to 0 for these
// tests, to ensure the MockVolume deletes the blocks
// even though they have just been created.
- permission_ttl = time.Duration(0)
+ blob_signature_ttl = time.Duration(0)
var user_token = "NOT DATA MANAGER TOKEN"
data_manager_token = "DATA MANAGER TOKEN"
t.Error("superuser_existing_block_req: block not deleted")
}
- // A DELETE request on a block newer than permission_ttl should return
- // success but leave the block on the volume.
+ // A DELETE request on a block newer than blob_signature_ttl
+ // should return success but leave the block on the volume.
vols[0].Put(TEST_HASH, TEST_BLOCK)
- permission_ttl = time.Duration(1) * time.Hour
+ blob_signature_ttl = time.Hour
response = IssueRequest(superuser_existing_block_req)
ExpectStatusCode(t,
"Invalid pull request from the data manager",
RequestTester{"/pull", data_manager_token, "PUT", bad_json},
http.StatusBadRequest,
- "Bad Request\n",
+ "",
},
}
"Invalid trash list from the data manager",
RequestTester{"/trash", data_manager_token, "PUT", bad_json},
http.StatusBadRequest,
- "Bad Request\n",
+ "",
},
}
testname string,
expected_body string,
response *httptest.ResponseRecorder) {
- if response.Body.String() != expected_body {
+ if expected_body != "" && response.Body.String() != expected_body {
t.Errorf("%s: expected response body '%s', got %+v",
testname, expected_body, response)
}
rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
-
- // For IndexHandler we support:
- // /index - returns all locators
- // /index/{prefix} - returns all locators that begin with {prefix}
- // {prefix} is a string of hexadecimal digits between 0 and 32 digits.
- // If {prefix} is the empty string, return an index of all locators
- // (so /index and /index/ behave identically)
- // A client may supply a full 32-digit locator string, in which
- // case the server will return an index with either zero or one
- // entries. This usage allows a client to check whether a block is
- // present, and its size and upload time, without retrieving the
- // entire block.
- //
+ // List all blocks stored here. Privileged client only.
rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
- rest.HandleFunc(
- `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+ // List blocks stored here whose hash has the given prefix.
+ // Privileged client only.
+ rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+
+ // List volumes: path, device number, bytes used/avail.
rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
- // The PullHandler and TrashHandler process "PUT /pull" and "PUT
- // /trash" requests from Data Manager. These requests instruct
- // Keep to replicate or delete blocks; see
- // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
- // for more details.
- //
- // Each handler parses the JSON list of block management requests
- // in the message body, and replaces any existing pull queue or
- // trash queue with their contentes.
- //
+ // Replace the current pull queue.
rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+
+ // Replace the current trash queue.
rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
// Any request which does not match any of these routes gets
}
block, err := GetBlock(hash, false)
-
- // Garbage collect after each GET. Fixes #2865.
- // TODO(twp): review Keep memory usage and see if there's
- // a better way to do this than blindly garbage collecting
- // after every block.
- defer runtime.GC()
-
if err != nil {
// This type assertion is safe because the only errors
// GetBlock can return are DiskHashError or NotFoundError.
http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
return
}
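+	// The block was obtained from the buffer pool (see Volume.Get); return
+	// it to the pool once the response has been written.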
+ defer bufs.Put(block)
- resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
-
- _, err = resp.Write(block)
-
- return
+ resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
+ resp.Header().Set("Content-Type", "application/octet-stream")
+ resp.Write(block)
}
func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
hash := mux.Vars(req)["hash"]
- // Read the block data to be stored.
- // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
- //
+ // Detect as many error conditions as possible before reading
+ // the body: avoid transmitting data that will not end up
+ // being written anyway.
+
+ if req.ContentLength == -1 {
+ http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
+ return
+ }
+
if req.ContentLength > BLOCKSIZE {
http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
return
}
- buf := make([]byte, req.ContentLength)
- nread, err := io.ReadFull(req.Body, buf)
+ if len(KeepVM.AllWritable()) == 0 {
+ http.Error(resp, FullError.Error(), FullError.HTTPCode)
+ return
+ }
+
+ buf := bufs.Get(int(req.ContentLength))
+ _, err := io.ReadFull(req.Body, buf)
if err != nil {
http.Error(resp, err.Error(), 500)
- } else if int64(nread) < req.ContentLength {
- http.Error(resp, "request truncated", 500)
- } else {
- if err := PutBlock(buf, hash); err == nil {
- // Success; add a size hint, sign the locator if
- // possible, and return it to the client.
- return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
- api_token := GetApiToken(req)
- if PermissionSecret != nil && api_token != "" {
- expiry := time.Now().Add(permission_ttl)
- return_hash = SignLocator(return_hash, api_token, expiry)
- }
- resp.Write([]byte(return_hash + "\n"))
- } else {
- ke := err.(*KeepError)
- http.Error(resp, ke.Error(), ke.HTTPCode)
- }
+ bufs.Put(buf)
+ return
}
- return
+
+ err = PutBlock(buf, hash)
+ bufs.Put(buf)
+
+ if err != nil {
+ ke := err.(*KeepError)
+ http.Error(resp, ke.Error(), ke.HTTPCode)
+ return
+ }
+
+ // Success; add a size hint, sign the locator if possible, and
+ // return it to the client.
+ return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
+ api_token := GetApiToken(req)
+ if PermissionSecret != nil && api_token != "" {
+ expiry := time.Now().Add(blob_signature_ttl)
+ return_hash = SignLocator(return_hash, api_token, expiry)
+ }
+ resp.Write([]byte(return_hash + "\n"))
}
// IndexHandler
prefix := mux.Vars(req)["prefix"]
- var index string
for _, vol := range KeepVM.AllReadable() {
- index = index + vol.Index(prefix)
+ if err := vol.IndexTo(prefix, resp); err != nil {
+ // The only errors returned by IndexTo are
+ // write errors returned by resp.Write(),
+ // which probably means the client has
+ // disconnected and this error will never be
+ // reported to the client -- but it will
+ // appear in our own error log.
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ return
+ }
}
- resp.Write([]byte(index))
}
// StatusHandler
var pr []PullRequest
r := json.NewDecoder(req.Body)
if err := r.Decode(&pr); err != nil {
- http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+ http.Error(resp, err.Error(), BadRequestError.HTTPCode)
return
}
var trash []TrashRequest
r := json.NewDecoder(req.Body)
if err := r.Decode(&trash); err != nil {
- http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+ http.Error(resp, err.Error(), BadRequestError.HTTPCode)
return
}
// enforce_permissions controls whether permission signatures
// should be enforced (affecting GET and DELETE requests).
-// Initialized by the --enforce-permissions flag.
+// Initialized by the -enforce-permissions flag.
var enforce_permissions bool
-// permission_ttl is the time duration for which new permission
+// blob_signature_ttl is the time duration for which new permission
// signatures (returned by PUT requests) will be valid.
-// Initialized by the --permission-ttl flag.
-var permission_ttl time.Duration
+// Initialized by the -permission-ttl flag.
+var blob_signature_ttl time.Duration
// data_manager_token represents the API token used by the
// Data Manager, and is required on certain privileged operations.
-// Initialized by the --data-manager-token-file flag.
+// Initialized by the -data-manager-token-file flag.
var data_manager_token string
// never_delete can be used to prevent the DELETE handler from
// actually deleting anything.
var never_delete = false
+var maxBuffers = 128
+var bufs *bufferPool
+
// ==========
// Error types.
//
NotFoundError = &KeepError{404, "Not Found"}
GenericError = &KeepError{500, "Fail"}
FullError = &KeepError{503, "Full"}
- TooLongError = &KeepError{504, "Timeout"}
+ SizeRequiredError = &KeepError{411, "Missing Content-Length"}
+ TooLongError = &KeepError{413, "Block is too large"}
MethodDisabledError = &KeepError{405, "Method disabled"}
)
flagSerializeIO bool
flagReadonly bool
)
+
type volumeSet []Volume
func (vs *volumeSet) Set(value string) error {
if _, err := os.Stat(value); err != nil {
return err
}
- *vs = append(*vs, MakeUnixVolume(value, flagSerializeIO, flagReadonly))
+ *vs = append(*vs, &UnixVolume{
+ root: value,
+ serialize: flagSerializeIO,
+ readonly: flagReadonly,
+ })
return nil
}
// permission arguments).
func main() {
- log.Println("Keep started: pid", os.Getpid())
+ log.Println("keepstore starting, pid", os.Getpid())
+ defer log.Println("keepstore exiting, pid", os.Getpid())
var (
data_manager_token_file string
listen string
- permission_key_file string
+ blob_signing_key_file string
permission_ttl_sec int
volumes volumeSet
pidfile string
"If set, nothing will be deleted. HTTP 405 will be returned "+
"for valid DELETE requests.")
flag.StringVar(
- &permission_key_file,
+ &blob_signing_key_file,
"permission-key-file",
"",
+ "Synonym for -blob-signing-key-file.")
+ flag.StringVar(
+ &blob_signing_key_file,
+ "blob-signing-key-file",
+ "",
"File containing the secret key for generating and verifying "+
- "permission signatures.")
+ "blob permission signatures.")
flag.IntVar(
&permission_ttl_sec,
"permission-ttl",
- 1209600,
- "Expiration time (in seconds) for newly generated permission "+
- "signatures.")
+ 0,
+ "Synonym for -blob-signature-ttl.")
+ flag.IntVar(
+ &permission_ttl_sec,
+ "blob-signature-ttl",
+ int(time.Duration(2*7*24*time.Hour).Seconds()),
+ "Lifetime of blob permission signatures. "+
+ "See services/api/config/application.default.yml.")
flag.BoolVar(
&flagSerializeIO,
"serialize",
&pidfile,
"pid",
"",
- "Path to write pid file")
+ "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
+ flag.IntVar(
+ &maxBuffers,
+ "max-buffers",
+ maxBuffers,
+ fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BLOCKSIZE>>20))
flag.Parse()
+	if maxBuffers <= 0 {
+		log.Fatal("-max-buffers must be greater than zero.")
+	}
+ bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+
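+	// Write the pid file and hold an exclusive flock on it for the life of
+	// the process, so other tools can tell whether keepstore is still running.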
+ if pidfile != "" {
+ f, err := os.OpenFile(pidfile, os.O_RDWR | os.O_CREATE, 0777)
+ if err != nil {
+ log.Fatalf("open pidfile (%s): %s", pidfile, err)
+ }
+ err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX | syscall.LOCK_NB)
+ if err != nil {
+ log.Fatalf("flock pidfile (%s): %s", pidfile, err)
+ }
+ err = f.Truncate(0)
+ if err != nil {
+ log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
+ }
+ _, err = fmt.Fprint(f, os.Getpid())
+ if err != nil {
+ log.Fatalf("write pidfile (%s): %s", pidfile, err)
+ }
+ err = f.Sync()
+ if err != nil {
+ log.Fatalf("sync pidfile (%s): %s", pidfile, err)
+ }
+ defer f.Close()
+ defer os.Remove(pidfile)
+ }
+
if len(volumes) == 0 {
if volumes.Discover() == 0 {
log.Fatal("No volumes found.")
log.Fatalf("reading data manager token: %s\n", err)
}
}
- if permission_key_file != "" {
- if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
+ if blob_signing_key_file != "" {
+ if buf, err := ioutil.ReadFile(blob_signing_key_file); err == nil {
PermissionSecret = bytes.TrimSpace(buf)
} else {
log.Fatalf("reading permission key: %s\n", err)
}
}
- // Initialize permission TTL
- permission_ttl = time.Duration(permission_ttl_sec) * time.Second
+ blob_signature_ttl = time.Duration(permission_ttl_sec) * time.Second
- // If --enforce-permissions is true, we must have a permission key
- // to continue.
if PermissionSecret == nil {
if enforce_permissions {
- log.Fatal("--enforce-permissions requires a permission key")
+ log.Fatal("-enforce-permissions requires a permission key")
} else {
log.Println("Running without a PermissionSecret. Block locators " +
"returned by this server will not be signed, and will be rejected " +
"by a server that enforces permissions.")
- log.Println("To fix this, run Keep with --permission-key-file=<path> " +
- "to define the location of a file containing the permission key.")
+ log.Println("To fix this, use the -blob-signing-key-file flag " +
+ "to specify the file containing the permission key.")
}
}
listener.Close()
}(term)
signal.Notify(term, syscall.SIGTERM)
+ signal.Notify(term, syscall.SIGINT)
- if pidfile != "" {
- f, err := os.Create(pidfile)
- if err == nil {
- fmt.Fprint(f, os.Getpid())
- f.Close()
- } else {
- log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
- }
- }
-
- // Start listening for requests.
+ log.Println("listening at", listen)
srv := &http.Server{Addr: listen}
srv.Serve(listener)
-
- log.Println("shutting down")
-
- if pidfile != "" {
- os.Remove(pidfile)
- }
}
t.Errorf("Discover returned %s, expected %s\n",
resultVols[i].(*UnixVolume).root, tmpdir)
}
- if expectReadonly := i % 2 == 1; expectReadonly != resultVols[i].(*UnixVolume).readonly {
+ if expectReadonly := i%2 == 1; expectReadonly != resultVols[i].(*UnixVolume).readonly {
t.Errorf("Discover added %s with readonly=%v, should be %v",
tmpdir, !expectReadonly, expectReadonly)
}
vols[0].Put(TEST_HASH+".meta", []byte("metadata"))
vols[1].Put(TEST_HASH_2+".meta", []byte("metadata"))
- index := vols[0].Index("") + vols[1].Index("")
- index_rows := strings.Split(index, "\n")
+ buf := new(bytes.Buffer)
+ vols[0].IndexTo("", buf)
+ vols[1].IndexTo("", buf)
+ index_rows := strings.Split(string(buf.Bytes()), "\n")
sort.Strings(index_rows)
sorted_index := strings.Join(index_rows, "\n")
expected := `^\n` + TEST_HASH + `\+\d+ \d+\n` +
match, err := regexp.MatchString(expected, sorted_index)
if err == nil {
if !match {
- t.Errorf("IndexLocators returned:\n%s", index)
+ t.Errorf("IndexLocators returned:\n%s", string(buf.Bytes()))
}
} else {
t.Errorf("regexp.MatchString: %s", err)
"log"
"net/http"
"strings"
+ "time"
)
type LoggingResponseWriter struct {
}
func (loggingRouter *LoggingRESTRouter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- loggingWriter := LoggingResponseWriter{200, 0, resp, ""}
+ t0 := time.Now()
+ loggingWriter := LoggingResponseWriter{http.StatusOK, 0, resp, ""}
loggingRouter.router.ServeHTTP(&loggingWriter, req)
- statusText := "OK"
+ statusText := http.StatusText(loggingWriter.Status)
if loggingWriter.Status >= 400 {
statusText = strings.Replace(loggingWriter.ResponseBody, "\n", "", -1)
}
- log.Printf("[%s] %s %s %d %d \"%s\"", req.RemoteAddr, req.Method, req.URL.Path[1:], loggingWriter.Status, loggingWriter.Length, statusText)
+ log.Printf("[%s] %s %s %.6fs %d %d \"%s\"", req.RemoteAddr, req.Method, req.URL.Path[1:], time.Since(t0).Seconds(), loggingWriter.Status, loggingWriter.Length, statusText)
}
PermissionSecret = []byte(known_key)
defer func() { PermissionSecret = nil }()
- if !VerifySignature(known_locator + "+K@xyzzy" + known_sig_hint, known_token) {
+ if !VerifySignature(known_locator+"+K@xyzzy"+known_sig_hint, known_token) {
t.Fatal("Verify cannot handle hint before permission signature")
}
- if !VerifySignature(known_locator + known_sig_hint + "+Zfoo", known_token) {
+ if !VerifySignature(known_locator+known_sig_hint+"+Zfoo", known_token) {
t.Fatal("Verify cannot handle hint after permission signature")
}
- if !VerifySignature(known_locator + "+K@xyzzy" + known_sig_hint + "+Zfoo", known_token) {
+ if !VerifySignature(known_locator+"+K@xyzzy"+known_sig_hint+"+Zfoo", known_token) {
t.Fatal("Verify cannot handle hints around permission signature")
}
}
PermissionSecret = []byte(known_key)
defer func() { PermissionSecret = nil }()
- if !VerifySignature(known_hash + "+999999" + known_sig_hint, known_token) {
+ if !VerifySignature(known_hash+"+999999"+known_sig_hint, known_token) {
t.Fatal("Verify cannot handle incorrect size hint")
}
- if !VerifySignature(known_hash + known_sig_hint, known_token) {
+ if !VerifySignature(known_hash+known_sig_hint, known_token) {
t.Fatal("Verify cannot handle missing size hint")
}
}
package main
import (
+ "bytes"
+ "errors"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "io"
"net/http"
"os"
"strings"
func performPullWorkerIntegrationTest(testData PullWorkIntegrationTestData, pullRequest PullRequest, t *testing.T) {
// Override PutContent to mock PutBlock functionality
+ defer func(orig func([]byte, string)(error)) { PutContent = orig }(PutContent)
PutContent = func(content []byte, locator string) (err error) {
if string(content) != testData.Content {
t.Errorf("PutContent invoked with unexpected data. Expected: %s; Found: %s", testData.Content, content)
return
}
+ // Override GetContent to mock keepclient Get functionality
+ defer func(orig func(string, *keepclient.KeepClient)(io.ReadCloser, int64, string, error)) { GetContent = orig }(GetContent)
+ GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
+ reader io.ReadCloser, contentLength int64, url string, err error) {
+ if testData.GetError != "" {
+ return nil, 0, "", errors.New(testData.GetError)
+ }
+ rdr := &ClosingBuffer{bytes.NewBufferString(testData.Content)}
+ return rdr, int64(len(testData.Content)), "", nil
+ }
+
keepClient.Arvados.ApiToken = GenerateRandomApiToken()
err := PullItemAndProcess(pullRequest, keepClient.Arvados.ApiToken, keepClient)
if len(testData.GetError) > 0 {
if (err == nil) || (!strings.Contains(err.Error(), testData.GetError)) {
- t.Errorf("Got error %v", err)
+ t.Errorf("Got error %v, expected %v", err, testData.GetError)
}
} else {
if err != nil {
- t.Errorf("Got error %v", err)
+ t.Errorf("Got error %v, expected nil", err)
}
}
}
var first_pull_list = []byte(`[
{
- "locator":"locator1",
+ "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
"servers":[
"server_1",
"server_2"
]
- },
- {
- "locator":"locator2",
+ },{
+ "locator":"37b51d194a7513e45b56f6524f2d51f2+3",
"servers":[
"server_3"
]
var second_pull_list = []byte(`[
{
- "locator":"locator3",
+ "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
"servers":[
"server_1",
- "server_2"
+ "server_2"
]
}
]`)
testPullLists[testData.name] = testData.response_body
// Override GetContent to mock keepclient Get functionality
+ defer func(orig func(string, *keepclient.KeepClient)(io.ReadCloser, int64, string, error)) { GetContent = orig }(GetContent)
GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
reader io.ReadCloser, contentLength int64, url string, err error) {
}
// Override PutContent to mock PutBlock functionality
+ defer func(orig func([]byte, string)(error)) { PutContent = orig }(PutContent)
PutContent = func(content []byte, locator string) (err error) {
if testData.put_error {
err = errors.New("Error putting data")
}
response := IssueRequest(&testData.req)
- c.Assert(testData.response_code, Equals, response.Code)
- c.Assert(testData.response_body, Equals, response.Body.String())
+ c.Assert(response.Code, Equals, testData.response_code)
+ c.Assert(response.Body.String(), Equals, testData.response_body)
expectWorkerChannelEmpty(c, pullq.NextItem)
*/
func RunTrashWorker(trashq *WorkQueue) {
- nextItem := trashq.NextItem
- for item := range nextItem {
+ for item := range trashq.NextItem {
trashRequest := item.(TrashRequest)
- err := TrashItem(trashRequest)
- if err != nil {
- log.Printf("Trash request error for %s: %s", trashRequest, err)
- }
+ TrashItem(trashRequest)
}
}
-/*
- Delete the block indicated by the Locator in TrashRequest.
-*/
-func TrashItem(trashRequest TrashRequest) (err error) {
- // Verify if the block is to be deleted based on its Mtime
+// TrashItem deletes the indicated block from every writable volume.
+func TrashItem(trashRequest TrashRequest) {
+ reqMtime := time.Unix(trashRequest.BlockMtime, 0)
+ if time.Since(reqMtime) < blob_signature_ttl {
+ log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blob_signature_ttl is %v! Skipping.",
+ time.Since(reqMtime),
+ trashRequest.Locator,
+ trashRequest.BlockMtime,
+ reqMtime,
+ blob_signature_ttl)
+ return
+ }
for _, volume := range KeepVM.AllWritable() {
mtime, err := volume.Mtime(trashRequest.Locator)
if err != nil || trashRequest.BlockMtime != mtime.Unix() {
continue
}
- currentTime := time.Now().Unix()
- if time.Duration(currentTime-trashRequest.BlockMtime)*time.Second >= permission_ttl {
- err = volume.Delete(trashRequest.Locator)
+ err = volume.Delete(trashRequest.Locator)
+ if err != nil {
+ log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
+ continue
}
+ log.Printf("%v Delete(%v) OK", volume, trashRequest.Locator)
}
- return
}
Block2 []byte
BlockMtime2 int64
- CreateData bool
- CreateInVolume1 bool
- UseDelayToCreate bool
+ CreateData bool
+ CreateInVolume1 bool
UseTrashLifeTime bool
+ DifferentMtimes bool
DeleteLocator string
Locator2: TEST_HASH,
Block2: TEST_BLOCK,
- CreateData: true,
- UseDelayToCreate: true,
+ CreateData: true,
+ DifferentMtimes: true,
DeleteLocator: TEST_HASH,
/* Perform the test */
func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
- actual_permission_ttl := permission_ttl
-
// Create Keep Volumes
KeepVM = MakeTestVolumeManager(2)
defer KeepVM.Close()
vols[0].Put(testData.Locator1, testData.Block1)
vols[0].Put(testData.Locator1+".meta", []byte("metadata"))
- // One of the tests deletes a locator with different Mtimes in two different volumes
- if testData.UseDelayToCreate {
- time.Sleep(1 * time.Second)
- }
-
if testData.CreateInVolume1 {
vols[0].Put(testData.Locator2, testData.Block2)
vols[0].Put(testData.Locator2+".meta", []byte("metadata"))
}
}
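+	// A block mtime comfortably older than blob_signature_ttl, so TrashItem
+	// will not refuse to delete the block.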
+ oldBlockTime := time.Now().Add(-blob_signature_ttl - time.Minute)
+
// Create TrashRequest for the test
trashRequest := TrashRequest{
Locator: testData.DeleteLocator,
- BlockMtime: time.Now().Unix(),
+ BlockMtime: oldBlockTime.Unix(),
}
- // delay by permission_ttl to allow deletes to work
- time.Sleep(1 * time.Second)
-
// Run trash worker and put the trashRequest on trashq
trashList := list.New()
trashList.PushBack(trashRequest)
trashq = NewWorkQueue()
+ defer trashq.Close()
- // Trash worker would not delete block if its Mtime is within trash life time.
- // Hence, we will have to bypass it to allow the deletion to succeed.
if !testData.UseTrashLifeTime {
- permission_ttl = time.Duration(1) * time.Second
+ // Trash worker would not delete block if its Mtime is
+ // within trash life time. Back-date the block to
+ // allow the deletion to succeed.
+ for _, v := range vols {
+ v.(*MockVolume).Timestamps[testData.DeleteLocator] = oldBlockTime
+ if testData.DifferentMtimes {
+ oldBlockTime = oldBlockTime.Add(time.Second)
+ }
+ }
}
go RunTrashWorker(trashq)
}
}
- // One test used the same locator in two different volumes but with different Mtime values
- // Hence let's verify that only one volume has it and the other is deleted
- if (testData.ExpectLocator1) &&
- (testData.Locator1 == testData.Locator2) {
+ // The DifferentMtimes test puts the same locator in two
+ // different volumes, but only one copy has an Mtime matching
+ // the trash request.
+ if testData.DifferentMtimes {
locatorFoundIn := 0
for _, volume := range KeepVM.AllReadable() {
if _, err := volume.Get(testData.Locator1); err == nil {
}
}
if locatorFoundIn != 1 {
- t.Errorf("Expected locator to be found in only one volume after deleting. But found: %s", locatorFoundIn)
+ t.Errorf("Found %d copies of %s, expected 1", locatorFoundIn, testData.Locator1)
}
}
-
- // Done
- permission_ttl = actual_permission_ttl
- trashq.Close()
}
package main
import (
+ "io"
"sync/atomic"
"time"
)
type Volume interface {
+ // Get a block. IFF the returned error is nil, the caller must
+ // put the returned slice back into the buffer pool when it's
+ // finished with it.
Get(loc string) ([]byte, error)
Put(loc string, block []byte) error
Touch(loc string) error
Mtime(loc string) (time.Time, error)
- Index(prefix string) string
+ IndexTo(prefix string, writer io.Writer) error
Delete(loc string) error
Status() *VolumeStatus
String() string
return nil
}
i := atomic.AddUint32(&vm.counter, 1)
- return vm.writables[i % uint32(len(vm.writables))]
+ return vm.writables[i%uint32(len(vm.writables))]
}
func (vm *RRVolumeManager) Close() {
import (
"errors"
"fmt"
+ "io"
"os"
"strings"
"sync"
Store map[string][]byte
Timestamps map[string]time.Time
// Bad volumes return an error for every operation.
- Bad bool
+ Bad bool
// Touchable volumes' Touch() method succeeds for a locator
// that has been Put().
- Touchable bool
+ Touchable bool
// Readonly volumes return an error for Put, Delete, and
// Touch.
- Readonly bool
- called map[string]int
- mutex sync.Mutex
+ Readonly bool
+ called map[string]int
+ mutex sync.Mutex
}
// CreateMockVolume returns a non-Bad, non-Readonly, Touchable mock
if v.Bad {
return nil, errors.New("Bad volume")
} else if block, ok := v.Store[loc]; ok {
- return block, nil
+ buf := bufs.Get(len(block))
+ copy(buf, block)
+ return buf, nil
}
return nil, os.ErrNotExist
}
return mtime, err
}
-func (v *MockVolume) Index(prefix string) string {
- v.gotCall("Index")
- var result string
+func (v *MockVolume) IndexTo(prefix string, w io.Writer) error {
+ v.gotCall("IndexTo")
for loc, block := range v.Store {
- if IsValidLocator(loc) && strings.HasPrefix(loc, prefix) {
- result = result + fmt.Sprintf("%s+%d %d\n",
- loc, len(block), 123456789)
+ if !IsValidLocator(loc) || !strings.HasPrefix(loc, prefix) {
+ continue
+ }
+ _, err := fmt.Fprintf(w, "%s+%d %d\n",
+ loc, len(block), 123456789)
+ if err != nil {
+ return err
}
}
- return result
+ return nil
}
func (v *MockVolume) Delete(loc string) error {
return MethodDisabledError
}
if _, ok := v.Store[loc]; ok {
- if time.Since(v.Timestamps[loc]) < permission_ttl {
+ if time.Since(v.Timestamps[loc]) < blob_signature_ttl {
return nil
}
delete(v.Store, loc)
import (
"fmt"
+ "io"
"io/ioutil"
"log"
"os"
"path/filepath"
"strconv"
"strings"
+ "sync"
"syscall"
"time"
)
-// IORequests are encapsulated Get or Put requests. They are used to
-// implement serialized I/O (i.e. only one read/write operation per
-// volume). When running in serialized mode, the Keep front end sends
-// IORequests on a channel to an IORunner, which handles them one at a
-// time and returns an IOResponse.
-//
-type IOMethod int
-
-const (
- KeepGet IOMethod = iota
- KeepPut
-)
-
-type IORequest struct {
- method IOMethod
- loc string
- data []byte
- reply chan *IOResponse
-}
-
-type IOResponse struct {
- data []byte
- err error
-}
-
-// A UnixVolume has the following properties:
-//
-// root
-// the path to the volume's root directory
-// queue
-// A channel of IORequests. If non-nil, all I/O requests for
-// this volume should be queued on this channel; the result
-// will be delivered on the IOResponse channel supplied in the
-// request.
-//
+// A UnixVolume stores and retrieves blocks in a local directory.
type UnixVolume struct {
- root string // path to this volume
- queue chan *IORequest
- readonly bool
-}
-
-func (v *UnixVolume) IOHandler() {
- for req := range v.queue {
- var result IOResponse
- switch req.method {
- case KeepGet:
- result.data, result.err = v.Read(req.loc)
- case KeepPut:
- result.err = v.Write(req.loc, req.data)
- }
- req.reply <- &result
- }
-}
-
-func MakeUnixVolume(root string, serialize bool, readonly bool) *UnixVolume {
- v := &UnixVolume{
- root: root,
- queue: nil,
- readonly: readonly,
- }
- if serialize {
- v.queue =make(chan *IORequest)
- go v.IOHandler()
- }
- return v
-}
-
-func (v *UnixVolume) Get(loc string) ([]byte, error) {
- if v.queue == nil {
- return v.Read(loc)
- }
- reply := make(chan *IOResponse)
- v.queue <- &IORequest{KeepGet, loc, nil, reply}
- response := <-reply
- return response.data, response.err
-}
-
-func (v *UnixVolume) Put(loc string, block []byte) error {
- if v.readonly {
- return MethodDisabledError
- }
- if v.queue == nil {
- return v.Write(loc, block)
- }
- reply := make(chan *IOResponse)
- v.queue <- &IORequest{KeepPut, loc, block, reply}
- response := <-reply
- return response.err
+ root string // path to the volume's root directory
+ serialize bool
+ readonly bool
+ mutex sync.Mutex
}
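The per-volume IORequest channel and IOHandler goroutine are gone; when serialize is set, Get, Put, Touch, and Delete below simply take a sync.Mutex around the I/O. A hypothetical helper that captures the same locking discipline (shown only for illustration, not present in this change):

// lockIfSerialized runs op, holding the volume mutex for its duration
// when the volume was configured with serialize=true.
func (v *UnixVolume) lockIfSerialized(op func() error) error {
	if v.serialize {
		v.mutex.Lock()
		defer v.mutex.Unlock()
	}
	return op()
}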
func (v *UnixVolume) Touch(loc string) error {
return err
}
defer f.Close()
+ if v.serialize {
+ v.mutex.Lock()
+ defer v.mutex.Unlock()
+ }
if e := lockfile(f); e != nil {
return e
}
}
}
-// Read retrieves a block identified by the locator string "loc", and
+// Get retrieves a block identified by the locator string "loc", and
// returns its contents as a byte slice.
//
-// If the block could not be opened or read, Read returns a nil slice
-// and the os.Error that was generated.
-//
-// If the block is present but its content hash does not match loc,
-// Read returns the block and a CorruptError. It is the caller's
-// responsibility to decide what (if anything) to do with the
-// corrupted data block.
-//
-func (v *UnixVolume) Read(loc string) ([]byte, error) {
- buf, err := ioutil.ReadFile(v.blockPath(loc))
- return buf, err
+// If the block cannot be found, opened, or read, Get returns a nil
+// slice and the non-nil error reported by Stat, Open, or ReadFull.
+func (v *UnixVolume) Get(loc string) ([]byte, error) {
+ path := v.blockPath(loc)
+ stat, err := os.Stat(path)
+ if err != nil {
+ return nil, err
+ }
+ if stat.Size() < 0 {
+ return nil, os.ErrInvalid
+ } else if stat.Size() == 0 {
+ return bufs.Get(0), nil
+ } else if stat.Size() > BLOCKSIZE {
+ return nil, TooLongError
+ }
+ f, err := os.Open(path)
+ if err != nil {
+ return nil, err
+ }
+ defer f.Close()
+ buf := bufs.Get(int(stat.Size()))
+ if v.serialize {
+ v.mutex.Lock()
+ defer v.mutex.Unlock()
+ }
+ _, err = io.ReadFull(f, buf)
+ if err != nil {
+ bufs.Put(buf)
+ return nil, err
+ }
+ return buf, nil
}
-// Write stores a block of data identified by the locator string
+// Put stores a block of data identified by the locator string
// "loc". It returns nil on success. If the volume is full, it
// returns a FullError. If the write fails due to some other error,
// that error is returned.
-//
-func (v *UnixVolume) Write(loc string, block []byte) error {
+func (v *UnixVolume) Put(loc string, block []byte) error {
+ if v.readonly {
+ return MethodDisabledError
+ }
if v.IsFull() {
return FullError
}
}
bpath := v.blockPath(loc)
+ if v.serialize {
+ v.mutex.Lock()
+ defer v.mutex.Unlock()
+ }
if _, err := tmpfile.Write(block); err != nil {
log.Printf("%s: writing to %s: %s\n", v, bpath, err)
+ tmpfile.Close()
+ os.Remove(tmpfile.Name())
return err
}
if err := tmpfile.Close(); err != nil {
return &VolumeStatus{v.root, devnum, free, used}
}
-// Index returns a list of blocks found on this volume which begin with
-// the specified prefix. If the prefix is an empty string, Index returns
-// a complete list of blocks.
+// IndexTo writes (to the given Writer) a list of blocks found on this
+// volume which begin with the specified prefix. If the prefix is an
+// empty string, IndexTo writes a complete list of blocks.
//
-// The return value is a multiline string (separated by
-// newlines). Each line is in the format
+// Each block is given in the format
//
-// locator+size modification-time
+// locator+size modification-time {newline}
//
// e.g.:
//
// e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
//
-func (v *UnixVolume) Index(prefix string) (output string) {
- filepath.Walk(v.root,
+func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
+ return filepath.Walk(v.root,
func(path string, info os.FileInfo, err error) error {
- // This WalkFunc inspects each path in the volume
- // and prints an index line for all files that begin
- // with prefix.
if err != nil {
- log.Printf("IndexHandler: %s: walking to %s: %s",
+ log.Printf("%s: IndexTo Walk error at %s: %s",
v, path, err)
return nil
}
- locator := filepath.Base(path)
- // Skip directories that do not match prefix.
- // We know there is nothing interesting inside.
+ basename := filepath.Base(path)
if info.IsDir() &&
- !strings.HasPrefix(locator, prefix) &&
- !strings.HasPrefix(prefix, locator) {
+ !strings.HasPrefix(basename, prefix) &&
+ !strings.HasPrefix(prefix, basename) {
+ // Skip directories that do not match
+ // prefix. We know there is nothing
+ // interesting inside.
return filepath.SkipDir
}
- // Skip any file that is not apparently a locator, e.g. .meta files
- if !IsValidLocator(locator) {
+ if info.IsDir() ||
+ !IsValidLocator(basename) ||
+ !strings.HasPrefix(basename, prefix) {
return nil
}
- // Print filenames beginning with prefix
- if !info.IsDir() && strings.HasPrefix(locator, prefix) {
- output = output + fmt.Sprintf(
- "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
- }
- return nil
+ _, err = fmt.Fprintf(w, "%s+%d %d\n",
+ basename, info.Size(), info.ModTime().Unix())
+ return err
})
-
- return
}
func (v *UnixVolume) Delete(loc string) error {
if v.readonly {
return MethodDisabledError
}
+ if v.serialize {
+ v.mutex.Lock()
+ defer v.mutex.Unlock()
+ }
p := v.blockPath(loc)
f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
if err != nil {
}
defer unlockfile(f)
- // If the block has been PUT more recently than -permission_ttl,
- // return success without removing the block. This guards against
- // a race condition where a block is old enough that Data Manager
- // has added it to the trash list, but the user submitted a PUT
- // for the block since then.
+ // If the block has been PUT in the last blob_signature_ttl
+ // seconds, return success without removing the block. This
+ // protects data from garbage collection until it is no longer
+ // possible for clients to retrieve the unreferenced blocks
+ // anyway (because the permission signatures have expired).
if fi, err := os.Stat(p); err != nil {
return err
} else {
- if time.Since(fi.ModTime()) < permission_ttl {
+ if time.Since(fi.ModTime()) < blob_signature_ttl {
return nil
}
}
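Both MockVolume.Delete above and UnixVolume.Delete here apply the same "too new to trash" rule: a block written within the last blob_signature_ttl is left in place, since clients holding a valid permission signature may still fetch it. Stated on its own as an illustrative helper (not part of this change):

// eligibleForTrash reports whether a block with the given mtime is old
// enough that any permission signatures for it have expired.
func eligibleForTrash(mtime time.Time) bool {
	return time.Since(mtime) >= blob_signature_ttl
}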
if err != nil {
t.Fatal(err)
}
- return MakeUnixVolume(d, serialize, readonly)
+ return &UnixVolume{
+ root: d,
+ serialize: serialize,
+ readonly: readonly,
+ }
}
func _teardown(v *UnixVolume) {
- if v.queue != nil {
- close(v.queue)
- }
os.RemoveAll(v.root)
}
-// store writes a Keep block directly into a UnixVolume, for testing
-// UnixVolume methods.
-//
+// _store writes a Keep block directly into a UnixVolume, bypassing
+// the overhead and safeguards of Put(). Useful for storing bogus data
+// and isolating unit tests from Put() behavior.
func _store(t *testing.T, vol *UnixVolume, filename string, block []byte) {
blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3])
if err := os.MkdirAll(blockdir, 0755); err != nil {
[self.cloud_nodes, self.booted, self.booting])
def _nodes_busy(self):
- return sum(1 for idle in
- pykka.get_all(rec.actor.in_state('idle') for rec in
+ return sum(1 for busy in
+ pykka.get_all(rec.actor.in_state('busy') for rec in
self.cloud_nodes.nodes.itervalues())
- if idle is False)
+ if busy)
def _nodes_wanted(self):
up_count = self._nodes_up()
break
else:
return None
- if record.arvados_node is None:
+ if not record.actor.in_state('idle', 'busy').get():
self._begin_node_shutdown(record.actor, cancellable=False)
def node_finished_shutdown(self, shutdown_actor):
self.last_setup.arvados_node.get.return_value = arv_node
return self.last_setup
+ def test_no_new_node_when_booted_node_not_usable(self):
+ cloud_node = testutil.cloud_node_mock(4)
+ arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
+ setup = self.start_node_boot(cloud_node, arv_node)
+ self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.assertEqual(1, self.alive_monitor_count())
+ self.daemon.update_cloud_nodes([cloud_node])
+ self.daemon.update_arvados_nodes([arv_node])
+ self.daemon.update_server_wishlist(
+ [testutil.MockSize(1)]).get(self.TIMEOUT)
+ self.stop_proxy(self.daemon)
+ self.assertEqual(1, self.node_setup.start.call_count)
+
def test_no_duplication_when_booting_node_listed_fast(self):
# Test that we don't start two ComputeNodeMonitorActors when
# we learn about a booting node through a listing before we
self.stop_proxy(self.daemon)
self.assertShutdownCancellable(False)
+ def test_booted_node_shut_down_when_never_working(self):
+ cloud_node = testutil.cloud_node_mock(4)
+ arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
+ setup = self.start_node_boot(cloud_node, arv_node)
+ self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.assertEqual(1, self.alive_monitor_count())
+ self.daemon.update_cloud_nodes([cloud_node])
+ self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
+ self.timer.deliver()
+ self.stop_proxy(self.daemon)
+ self.assertShutdownCancellable(False)
+
def test_node_that_pairs_not_considered_failed_boot(self):
cloud_node = testutil.cloud_node_mock(3)
arv_node = testutil.arvados_node_mock(3)
self.stop_proxy(self.daemon)
self.assertFalse(self.node_shutdown.start.called)
+ def test_node_that_pairs_busy_not_considered_failed_boot(self):
+ cloud_node = testutil.cloud_node_mock(5)
+ arv_node = testutil.arvados_node_mock(5, job_uuid=True)
+ setup = self.start_node_boot(cloud_node, arv_node)
+ self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.assertEqual(1, self.alive_monitor_count())
+ self.daemon.update_cloud_nodes([cloud_node])
+ self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
+ self.timer.deliver()
+ self.stop_proxy(self.daemon)
+ self.assertFalse(self.node_shutdown.start.called)
+
def test_booting_nodes_shut_down(self):
self.make_daemon(want_sizes=[testutil.MockSize(1)])
self.daemon.update_server_wishlist([]).get(self.TIMEOUT)