require "arvados/keep"
+require "uri"
class CollectionsController < ApplicationController
include ActionController::Live
usable_token = find_usable_token(tokens) do
coll = Collection.find(params[:uuid])
end
+ if usable_token.nil?
+ # Response already rendered.
+ return
+ end
+
+ # If we are configured to use a keep-web server, just redirect to
+ # the appropriate URL.
+ if Rails.configuration.keep_web_url or
+ Rails.configuration.keep_web_download_url
+ opts = {}
+ if usable_token == params[:reader_token]
+ opts[:path_token] = usable_token
+ elsif usable_token == Rails.configuration.anonymous_user_token
+ # Don't pass a token at all
+ else
+ # We pass the current user's real token only if it's necessary
+ # to read the collection.
+ opts[:query_token] = usable_token
+ end
+ opts[:disposition] = params[:disposition] if params[:disposition]
+ return redirect_to keep_web_url(params[:uuid], params[:file], opts)
+ end
+
+ # No keep-web server available. Get the file data with arv-get,
+ # and serve it through Rails.
file_name = params[:file].andand.sub(/^(\.\/|\/|)/, './')
- if usable_token.nil?
- return # Response already rendered.
- elsif file_name.nil? or not coll.manifest.has_file?(file_name)
+ if file_name.nil? or not coll.manifest.has_file?(file_name)
return render_not_found
end
return nil
end
+ # Build a keep-web URL for one file in a collection.
+ #
+ # uuid_or_pdh - collection UUID or portable data hash; a "+" in a PDH
+ #               is replaced with "-" so the id can appear in a hostname.
+ # file        - path of the file within the collection.
+ # opts        - :path_token    reader token embedded in the URL path
+ #                              (collection-sharing links)
+ #               :query_token   token passed as ?api_token=...
+ #               :disposition   e.g. 'attachment' to force download
+ #
+ # Chooses keep_web_url or keep_web_download_url based on the requested
+ # disposition and whether a query token can be passed safely, then
+ # appends path/query components. Raises ArgumentError when a query
+ # token is required but only a single-origin preview URL is configured
+ # (keep-web cannot accept a token safely at a single origin).
+ def keep_web_url(uuid_or_pdh, file, opts)
+ munged_id = uuid_or_pdh.sub('+', '-')
+ fmt = {uuid_or_pdh: munged_id}
+
+ tmpl = Rails.configuration.keep_web_url
+ if Rails.configuration.keep_web_download_url and
+ (!tmpl or opts[:disposition] == 'attachment')
+ # Prefer the attachment-only-host when we want an attachment
+ # (and when there is no preview link configured)
+ tmpl = Rails.configuration.keep_web_download_url
+ else
+ check_uri = URI.parse(tmpl % fmt)
+ if opts[:query_token] and
+ not check_uri.host.start_with?(munged_id + "--") and
+ not check_uri.host.start_with?(munged_id + ".")
+ # We're about to pass a token in the query string, but
+ # keep-web can't accept that safely at a single-origin URL
+ # template (unless it's -attachment-only-host).
+ tmpl = Rails.configuration.keep_web_download_url
+ if not tmpl
+ raise ArgumentError, "Download precluded by site configuration"
+ end
+ logger.warn("Using download link, even though inline content " \
+ "was requested: #{check_uri.to_s}")
+ end
+ end
+
+ if tmpl == Rails.configuration.keep_web_download_url
+ # This takes us to keep-web's -attachment-only-host so there is
+ # no need to add ?disposition=attachment.
+ opts.delete :disposition
+ end
+
+ # Compose the final URL: collection root, optional path token,
+ # "_/" separator, then the (escaped) file path.
+ uri = URI.parse(tmpl % fmt)
+ uri.path += '/' unless uri.path.end_with? '/'
+ if opts[:path_token]
+ uri.path += 't=' + opts[:path_token] + '/'
+ end
+ uri.path += '_/'
+ uri.path += URI.escape(file)
+
+ # Merge api_token/disposition into any query string already present
+ # in the configured URL template.
+ query = Hash[URI.decode_www_form(uri.query || '')]
+ { query_token: 'api_token',
+ disposition: 'disposition' }.each do |opt, param|
+ if opts.include? opt
+ query[param] = opts[opt]
+ end
+ end
+ unless query.empty?
+ uri.query = URI.encode_www_form(query)
+ end
+
+ uri.to_s
+ end
+
# Note: several controller and integration tests rely on stubbing
# file_enumerator to return fake file content.
def file_enumerator opts
<% if !current_user %>
<p>
- (I notice you are not logged in. If you're looking for a private
- page, you'll need to <%=link_to 'log in', arvados_api_client.arvados_login_url(return_to: strip_token_from_path(request.url))%> first.)
+ <%= link_to(arvados_api_client.arvados_login_url(return_to: strip_token_from_path(request.url)),
+ {class: "btn btn-primary report-issue-modal-window"}) do %>
+ <i class="fa fa-fw fa-sign-in"></i> Log in
+ <% end %>
+ to view private data.
</p>
<% elsif class_name %>
+<%
+ popup_params = {
+ popup_type: 'report',
+ current_location: request.url,
+ current_path: request.fullpath,
+ action_method: 'post',
+ }
+ if error_type == "api"
+ popup_params.merge!(
+ api_error_request_url: api_error.andand.request_url || "",
+ api_error_response: api_error.andand.api_response || "",
+ )
+ else
+ popup_params.merge!(error_message: error_message)
+ end
+%>
+
<p>
-<br/><strong>If you suspect this is a bug, you can help us fix it by sending us a problem report:</strong><br/><br/>
-<% if error_type == 'api' %>
- <%
- api_request_url = api_error.andand.request_url ? api_error.request_url : ''
- api_error_response = api_error.andand.api_response ? api_error.api_response : ''
- %>
- Send a problem report right here. <%= link_to report_issue_popup_path(popup_type: 'report', current_location: request.url, current_path: request.fullpath, action_method: 'post', api_error_request_url: api_request_url, api_error_response: api_error_response),
- {class: 'btn btn-primary report-issue-modal-window', :remote => true, return_to: request.url} do %>
- <i class="fa fa-fw fa-support"></i> Report problem
- <% end %>
-<% else %>
- Send a problem report right here. <%= link_to report_issue_popup_path(popup_type: 'report', current_location: request.url, current_path: request.fullpath, action_method: 'post', error_message: error_message),
- {class: 'btn btn-primary report-issue-modal-window', :remote => true, return_to: request.url} do %>
- <i class="fa fa-fw fa-support"></i> Report problem
- <% end %>
+<%= link_to(report_issue_popup_path(popup_params),
+ {class: 'btn btn-primary report-issue-modal-window', :remote => true, return_to: request.url}) do %>
+ <i class="fa fa-fw fa-support"></i> Report problem
<% end %>
-<% support_email = Rails.configuration.support_email_address%>
-<br/><br/>
- If you prefer, send email to: <a href="mailto:<%=support_email%>?subject=Workbench problem report&body=Problem while viewing page <%=request.url%>"><%=support_email%></a>
+
+or
+
+<%= mail_to(Rails.configuration.support_email_address, "email us",
+ subject: "Workbench problem report",
+ body: "Problem while viewing page #{request.url}") %>
+
+if you suspect this is a bug.
</p>
<li><%= link_to 'Browse public projects', "/projects/public" %></li>
<% end %>
<li class="dropdown hover-dropdown login-menu">
- <a href="<%= arvados_api_client.arvados_login_url(return_to: root_url) %>">Log in</a>
+ <a href="<%= arvados_api_client.arvados_login_url(return_to: request.url) %>">Log in</a>
<ul class="dropdown-menu">
<li>
- <a href="<%= arvados_api_client.arvados_login_url(return_to: root_url) %>">
+ <a href="<%= arvados_api_client.arvados_login_url(return_to: request.url) %>">
<span class="fa fa-lg fa-sign-in"></span>
<p style="margin-left: 1.6em; margin-top: -1.35em; margin-bottom: 0em; margin-right: 0.5em;">Log in or register with<br/>any Google account</p>
</a>
# E.g., using a name-based proxy server to forward connections to shell hosts:
# https://%{hostname}.webshell.uuid_prefix.arvadosapi.com/
shell_in_a_box_url: false
+
+ # Format of preview links. If false, use keep_web_download_url
+ # instead, and disable inline preview. If both are false, use
+ # Workbench's built-in file download/preview mechanism.
+ #
+ # Examples:
+ # keep_web_url: https://%{uuid_or_pdh}.collections.uuid_prefix.arvadosapi.com
+ # keep_web_url: https://%{uuid_or_pdh}--collections.uuid_prefix.arvadosapi.com
+ #
+ # Example supporting only public data and collection-sharing links
+ # (other data will be handled as downloads via keep_web_download_url):
+ # keep_web_url: https://collections.uuid_prefix.arvadosapi.com/c=%{uuid_or_pdh}
+ keep_web_url: false
+
+ # Format of download links. If false, use keep_web_url with
+ # disposition=attachment query param.
+ #
+ # The host part of the keep_web_download_url value here must match
+ # the -attachment-only-host argument given to keep-web: if
+ # keep_web_download_url is "https://FOO.EXAMPLE/c=..." then keep-web
+ # must run with "-attachment-only-host=FOO.EXAMPLE".
+ #
+ # If keep_web_download_url is false, and keep_web_url uses a
+ # single-origin form, then Workbench will show an error page
+ # when asked to download or preview private data.
+ #
+ # Example:
+ # keep_web_download_url: https://download.uuid_prefix.arvadosapi.com/c=%{uuid_or_pdh}
+ keep_web_download_url: false
assert_response 404
end
+ [".navbar .login-menu a",
+ ".navbar .login-menu .dropdown-menu a"
+ ].each do |css_selector|
+ test "login link at #{css_selector.inspect} includes return_to param" do
+ # Without an anonymous token, we're immediately redirected to login.
+ Rails.configuration.anonymous_user_token =
+ api_fixture("api_client_authorizations", "anonymous", "api_token")
+ @controller = ProjectsController.new
+ test_uuid = "zzzzz-j7d0g-zzzzzzzzzzzzzzz"
+ get(:show, {id: test_uuid})
+ login_link = css_select(css_selector).first
+ assert_not_nil(login_link, "failed to select login link")
+ login_href = URI.unescape(login_link.attributes["href"])
+ # The return_to parameter must be an absolute URL (scheme and host
+ # included) for the post-login redirect to work.
+ assert_includes(login_href, "://")
+ assert_match(/[\?&]return_to=[^&]*\/projects\/#{test_uuid}(&|$)/,
+ login_href)
+ end
+ end
+
test "Workbench returns 4xx when API server is unreachable" do
# We're really testing ApplicationController's render_exception.
# Our primary concern is that it doesn't raise an error and
NONEXISTENT_COLLECTION = "ffffffffffffffffffffffffffffffff+0"
+ def config_anonymous enable
+ Rails.configuration.anonymous_user_token =
+ if enable
+ api_fixture('api_client_authorizations')['anonymous']['api_token']
+ else
+ false
+ end
+ end
+
def stub_file_content
# For the duration of the current test case, stub file download
# content with a randomized (but recognizable) string. Return the
end
test 'anonymous download' do
- Rails.configuration.anonymous_user_token =
- api_fixture('api_client_authorizations')['anonymous']['api_token']
+ config_anonymous true
expect_content = stub_file_content
get :show_file, {
uuid: api_fixture('collections')['user_agreement_in_anonymously_accessible_project']['uuid'],
"using a reader token set the session's API token")
end
- [false, api_fixture('api_client_authorizations')['anonymous']['api_token']].
- each do |anon_conf|
- test "download a file using a reader token with insufficient scope (anon_conf=#{!!anon_conf})" do
- Rails.configuration.anonymous_user_token = anon_conf
+ [false, true].each do |anon|
+ test "download a file using a reader token with insufficient scope, anon #{anon}" do
+ config_anonymous anon
params = collection_params(:foo_file, 'foo')
params[:reader_token] =
api_fixture('api_client_authorizations')['active_noscope']['api_token']
get(:show_file, params)
- if anon_conf
+ if anon
# Some files can be shown without a valid token, but not this one.
assert_response 404
else
end
test "anonymous user accesses collection in shared project" do
- Rails.configuration.anonymous_user_token =
- api_fixture('api_client_authorizations')['anonymous']['api_token']
+ config_anonymous true
collection = api_fixture('collections')['public_text_file']
get(:show, {id: collection['uuid']})
get :show, {id: api_fixture('collections')['user_agreement']['uuid']}, session_for(:active)
assert_not_includes @response.body, '<a href="#Upload"'
end
+
+ # Point Workbench at a (fake) keep-web service for this test, and
+ # assert that Workbench itself never serves file content: any
+ # download/preview must be satisfied by a redirect, so
+ # file_enumerator must never be called.
+ def setup_for_keep_web cfg='https://%{uuid_or_pdh}.example', dl_cfg=false
+ Rails.configuration.keep_web_url = cfg
+ Rails.configuration.keep_web_download_url = dl_cfg
+ @controller.expects(:file_enumerator).never
+ end
+
+ %w(uuid portable_data_hash).each do |id_type|
+ test "Redirect to keep_web_url via #{id_type}" do
+ setup_for_keep_web
+ tok = api_fixture('api_client_authorizations')['active']['api_token']
+ id = api_fixture('collections')['w_a_z_file'][id_type]
+ get :show_file, {uuid: id, file: "w a z"}, session_for(:active)
+ assert_response :redirect
+ assert_equal "https://#{id.sub '+', '-'}.example/_/w%20a%20z?api_token=#{tok}", @response.redirect_url
+ end
+
+ test "Redirect to keep_web_url via #{id_type} with reader token" do
+ setup_for_keep_web
+ tok = api_fixture('api_client_authorizations')['active']['api_token']
+ id = api_fixture('collections')['w_a_z_file'][id_type]
+ get :show_file, {uuid: id, file: "w a z", reader_token: tok}, session_for(:expired)
+ assert_response :redirect
+ assert_equal "https://#{id.sub '+', '-'}.example/t=#{tok}/_/w%20a%20z", @response.redirect_url
+ end
+
+ test "Redirect to keep_web_url via #{id_type} with no token" do
+ setup_for_keep_web
+ config_anonymous true
+ id = api_fixture('collections')['public_text_file'][id_type]
+ get :show_file, {uuid: id, file: "Hello World.txt"}
+ assert_response :redirect
+ assert_equal "https://#{id.sub '+', '-'}.example/_/Hello%20World.txt", @response.redirect_url
+ end
+
+ test "Redirect to keep_web_url via #{id_type} with disposition param" do
+ setup_for_keep_web
+ config_anonymous true
+ id = api_fixture('collections')['public_text_file'][id_type]
+ get :show_file, {
+ uuid: id,
+ file: "Hello World.txt",
+ disposition: 'attachment',
+ }
+ assert_response :redirect
+ assert_equal "https://#{id.sub '+', '-'}.example/_/Hello%20World.txt?disposition=attachment", @response.redirect_url
+ end
+
+ test "Redirect to keep_web_download_url via #{id_type}" do
+ setup_for_keep_web('https://collections.example/c=%{uuid_or_pdh}',
+ 'https://download.example/c=%{uuid_or_pdh}')
+ tok = api_fixture('api_client_authorizations')['active']['api_token']
+ id = api_fixture('collections')['w_a_z_file'][id_type]
+ get :show_file, {uuid: id, file: "w a z"}, session_for(:active)
+ assert_response :redirect
+ assert_equal "https://download.example/c=#{id.sub '+', '-'}/_/w%20a%20z?api_token=#{tok}", @response.redirect_url
+ end
+ end
+
+ [false, true].each do |anon|
+ test "No redirect to keep_web_url if collection not found, anon #{anon}" do
+ setup_for_keep_web
+ config_anonymous anon
+ id = api_fixture('collections')['w_a_z_file']['uuid']
+ get :show_file, {uuid: id, file: "w a z"}, session_for(:spectator)
+ assert_response 404
+ end
+
+ test "Redirect download to keep_web_download_url, anon #{anon}" do
+ config_anonymous anon
+ setup_for_keep_web('https://collections.example/c=%{uuid_or_pdh}',
+ 'https://download.example/c=%{uuid_or_pdh}')
+ tok = api_fixture('api_client_authorizations')['active']['api_token']
+ id = api_fixture('collections')['public_text_file']['uuid']
+ get :show_file, {
+ uuid: id,
+ file: 'Hello world.txt',
+ disposition: 'attachment',
+ }, session_for(:active)
+ assert_response :redirect
+ expect_url = "https://download.example/c=#{id.sub '+', '-'}/_/Hello%20world.txt"
+ if not anon
+ expect_url += "?api_token=#{tok}"
+ end
+ assert_equal expect_url, @response.redirect_url
+ end
+ end
+
+ test "Error if file is impossible to retrieve from keep_web_url" do
+ # Cannot pass a session token using a single-origin keep-web URL,
+ # cannot read this collection without a session token.
+ setup_for_keep_web 'https://collections.example/c=%{uuid_or_pdh}', false
+ id = api_fixture('collections')['w_a_z_file']['uuid']
+ get :show_file, {uuid: id, file: "w a z"}, session_for(:active)
+ assert_response 422
+ end
+
+ test "Redirect preview to keep_web_download_url when preview is disabled" do
+ setup_for_keep_web false, 'https://download.example/c=%{uuid_or_pdh}'
+ tok = api_fixture('api_client_authorizations')['active']['api_token']
+ id = api_fixture('collections')['w_a_z_file']['uuid']
+ get :show_file, {uuid: id, file: "w a z"}, session_for(:active)
+ assert_response :redirect
+ assert_equal "https://download.example/c=#{id.sub '+', '-'}/_/w%20a%20z?api_token=#{tok}", @response.redirect_url
+ end
end
Rails.configuration.anonymous_user_token = api_fixture('api_client_authorizations')['anonymous']['api_token']
get(:show, {id: api_fixture('groups')['aproject']['uuid']})
assert_response 404
- assert_includes @response.inspect, 'you are not logged in'
+ assert_match(/log ?in/i, @response.body)
end
test "visit home page as anonymous when anonymous browsing is enabled and expect login" do
--- /dev/null
+module DownloadHelper
+ module_function
+
+ def path
+ Rails.root.join 'tmp', 'downloads'
+ end
+
+ def clear
+ FileUtils.rm_r path
+ begin
+ Dir.mkdir path
+ rescue Errno::EEXIST
+ end
+ end
+
+ def done
+ Dir[path.join '*'].reject do |f|
+ /\.part$/ =~ f
+ end
+ end
+end
io.write content
end
end
+ # Database reset doesn't restore KeepServices; we have to
+ # save/restore manually.
+ use_token :admin do
+ @keep_services = KeepService.all.to_a
+ end
end
teardown do
+ use_token :admin do
+ @keep_services.each do |ks|
+ KeepService.find(ks.uuid).update_attributes(ks.attributes)
+ end
+ end
testfiles.each do |filename, _|
File.unlink(testfile_path filename)
end
assert_match /_text":"\. d41d8\S+ 0:0:empty.txt\\n\. d41d8\S+ 0:0:empty\\\\040\(1\).txt\\n"/, body
end
- test "Upload non-empty files, report errors" do
+ test "Upload non-empty files" do
need_selenium "to make file uploads work"
visit page_with_token 'active', sandbox_path
find('.nav-tabs a', text: 'Upload').click
attach_file 'file_selector', testfile_path('foo.txt')
assert_selector 'button:not([disabled])', text: 'Start'
click_button 'Start'
- if "test environment does not have a keepproxy yet, see #4534" != "fixed"
- using_wait_time 20 do
- assert_text :visible, 'error'
- end
- else
- assert_text :visible, 'Done!'
- visit sandbox_path+'.json'
- assert_match /_text":"\. 0cc1\S+ 0:1:a\\n\. acbd\S+ 0:3:foo.txt\\n"/, body
- end
+ assert_text :visible, 'Done!'
+ visit sandbox_path+'.json'
+ assert_match /_text":"\. 0cc1\S+ 0:1:a\\n\. acbd\S+ 0:3:foo.txt\\n"/, body
end
test "Report mixed-content error" do
skip 'Test suite does not use TLS'
need_selenium "to make file uploads work"
- begin
- use_token :admin
- proxy = KeepService.find(api_fixture('keep_services')['proxy']['uuid'])
- proxy.update_attributes service_ssl_flag: false
+ use_token :admin do
+ KeepService.where(service_type: 'proxy').first.
+ update_attributes(service_ssl_flag: false)
end
visit page_with_token 'active', sandbox_path
find('.nav-tabs a', text: 'Upload').click
test "Report network error" do
need_selenium "to make file uploads work"
- begin
- use_token :admin
- proxy = KeepService.find(api_fixture('keep_services')['proxy']['uuid'])
- # Even if you somehow do port>2^16, surely nx.example.net won't respond
- proxy.update_attributes service_host: 'nx.example.net', service_port: 99999
+ use_token :admin do
+ # Even if you somehow do port>2^16, surely nx.example.net won't
+ # respond
+ KeepService.where(service_type: 'proxy').first.
+ update_attributes(service_host: 'nx.example.net',
+ service_port: 99999)
end
visit page_with_token 'active', sandbox_path
find('.nav-tabs a', text: 'Upload').click
--- /dev/null
+require 'integration_helper'
+require 'helpers/download_helper'
+
+class DownloadTest < ActionDispatch::IntegrationTest
+ def getport service
+ File.read(File.expand_path("../../../../../tmp/#{service}.port", __FILE__))
+ end
+
+ setup do
+ @kwport = getport 'keep-web-ssl'
+ @kwdport = getport 'keep-web-dl-ssl'
+ Rails.configuration.keep_web_url = "https://localhost:#{@kwport}/c=%{uuid_or_pdh}"
+ Rails.configuration.keep_web_download_url = "https://localhost:#{@kwdport}/c=%{uuid_or_pdh}"
+ CollectionsController.any_instance.expects(:file_enumerator).never
+
+ # Make sure Capybara can download files.
+ need_selenium 'for downloading', :selenium_with_download
+ DownloadHelper.clear
+
+ # Keep data isn't populated by fixtures, so we have to write any
+ # data we expect to read.
+ ['foo', 'w a z', "Hello world\n"].each do |data|
+ md5 = `echo -n #{data.shellescape} | arv-put --no-progress --raw -`
+ assert_match /^#{Digest::MD5.hexdigest(data)}/, md5
+ assert $?.success?, $?
+ end
+ end
+
+ ['uuid', 'portable_data_hash'].each do |id_type|
+ test "preview from keep-web by #{id_type} using a reader token" do
+ uuid_or_pdh = api_fixture('collections')['foo_file'][id_type]
+ token = api_fixture('api_client_authorizations')['active_all_collections']['api_token']
+ visit "/collections/download/#{uuid_or_pdh}/#{token}/"
+ within "#collection_files" do
+ click_link 'foo'
+ end
+ assert_no_selector 'a'
+ assert_text 'foo'
+ end
+
+ test "preview anonymous content from keep-web by #{id_type}" do
+ Rails.configuration.anonymous_user_token =
+ api_fixture('api_client_authorizations')['anonymous']['api_token']
+ uuid_or_pdh =
+ api_fixture('collections')['public_text_file'][id_type]
+ visit "/collections/#{uuid_or_pdh}"
+ within "#collection_files" do
+ find('[title~=View]').click
+ end
+ assert_no_selector 'a'
+ assert_text 'Hello world'
+ end
+
+ test "download anonymous content from keep-web by #{id_type}" do
+ Rails.configuration.anonymous_user_token =
+ api_fixture('api_client_authorizations')['anonymous']['api_token']
+ uuid_or_pdh =
+ api_fixture('collections')['public_text_file'][id_type]
+ visit "/collections/#{uuid_or_pdh}"
+ within "#collection_files" do
+ find('[title~=Download]').click
+ end
+ wait_for_download 'Hello world.txt', "Hello world\n"
+ end
+ end
+
+ test "download from keep-web using a session token" do
+ uuid = api_fixture('collections')['w_a_z_file']['uuid']
+ token = api_fixture('api_client_authorizations')['active']['api_token']
+ visit page_with_token('active', "/collections/#{uuid}")
+ within "#collection_files" do
+ find('[title~=Download]').click
+ end
+ wait_for_download 'w a z', 'w a z'
+ end
+
+ def wait_for_download filename, expect_data
+ data = nil
+ tries = 0
+ while tries < 20
+ sleep 0.1
+ tries += 1
+ data = File.read(DownloadHelper.path.join filename) rescue nil
+ end
+ assert_equal expect_data, data
+ end
+
+ # TODO(TC): test "view pages hosted by keep-web, using session
+ # token". We might persuade selenium to send
+ # "collection-uuid.dl.example" requests to localhost by configuring
+ # our test nginx server to work as its forward proxy. Until then,
+ # we're relying on the "Redirect to keep_web_url via #{id_type}"
+ # test in CollectionsControllerTest (and keep-web's tests).
+end
Capybara::Poltergeist::Driver.new app, POLTERGEIST_OPTS.merge(extensions: [js])
end
+# Selenium driver whose Firefox profile saves downloads straight to
+# DownloadHelper.path without any save-file prompt, so integration
+# tests can read the downloaded files back off disk.
+Capybara.register_driver :selenium_with_download do |app|
+ profile = Selenium::WebDriver::Firefox::Profile.new
+ profile['browser.download.dir'] = DownloadHelper.path.to_s
+ profile['browser.download.downloadDir'] = DownloadHelper.path.to_s
+ profile['browser.download.defaultFolder'] = DownloadHelper.path.to_s
+ profile['browser.download.folderList'] = 2 # "save to user-defined location"
+ profile['browser.download.manager.showWhenStarting'] = false
+ profile['browser.helperApps.alwaysAsk.force'] = false
+ # Auto-save these MIME types instead of showing a dialog.
+ profile['browser.helperApps.neverAsk.saveToDisk'] = 'text/plain,application/octet-stream'
+ Capybara::Selenium::Driver.new app, profile: profile
+end
+
module WaitForAjax
Capybara.default_wait_time = 5
def wait_for_ajax
end
end
- def need_selenium reason=nil
- Capybara.current_driver = :selenium
+ def need_selenium reason=nil, driver=:selenium
+ Capybara.current_driver = driver
unless ENV['ARVADOS_TEST_HEADFUL'] or @headless
@headless = HeadlessSingleton.get
@headless.start
# though it doesn't need to start up a new server).
env_script = check_output %w(python ./run_test_server.py start --auth admin)
check_output %w(python ./run_test_server.py start_arv-git-httpd)
+ check_output %w(python ./run_test_server.py start_keep-web)
check_output %w(python ./run_test_server.py start_nginx)
+ # This one isn't a no-op, even under run-tests.sh.
+ check_output %w(python ./run_test_server.py start_keep)
end
test_env = {}
env_script.each_line do |line|
def stop_test_server
Dir.chdir PYTHON_TESTS_DIR do
+ check_output %w(python ./run_test_server.py stop_keep)
# These are no-ops if we're running within run-tests.sh
check_output %w(python ./run_test_server.py stop_nginx)
check_output %w(python ./run_test_server.py stop_arv-git-httpd)
+ check_output %w(python ./run_test_server.py stop_keep-web)
check_output %w(python ./run_test_server.py stop)
end
@@server_is_running = false
passenger_enabled on;
# If you're using RVM, uncomment the line below.
#passenger_ruby /usr/local/rvm/wrappers/default/ruby;
+
+ # This value effectively limits the size of API objects users can
+ # create, especially collections. If you change this, you should
+ # also ensure the following settings match it:
+ # * `client_max_body_size` in the server section below
+ # * `client_max_body_size` in the Workbench Nginx configuration (twice)
+ # * `max_request_size` in the API server's application.yml file
+ client_max_body_size 128m;
}
upstream api {
index index.html index.htm index.php;
- # This value effectively limits the size of API objects users can create,
- # especially collections. If you change this, you should also set
- # `max_request_size` in the API server's application.yml file to the same
- # value.
+ # Refer to the comment about this setting in the server section above.
client_max_body_size 128m;
location / {
ssl_certificate <span class="userinput">/YOUR/PATH/TO/cert.pem</span>;
ssl_certificate_key <span class="userinput">/YOUR/PATH/TO/cert.key</span>;
+ # The server needs to accept potentially large refpacks from push clients.
+ client_max_body_size 50m;
+
location / {
proxy_pass http://arvados-git-httpd;
}
export CRUNCH_JOB_DOCKER_BIN=<span class="userinput">docker.io</span>
fuser -TERM -k $CRUNCH_DISPATCH_LOCKFILE || true
-cd /var/www/arvados-api/services/api
+cd /var/www/arvados-api/current
exec $rvmexec bundle exec ./script/crunch-dispatch.rb 2>&1
</code></pre>
</notextile>
The keep-web server provides read-only HTTP access to files stored in Keep. It serves public data to unauthenticated clients, and serves private data to clients that supply Arvados API tokens. It can be installed anywhere with access to Keep services, typically behind a web proxy that provides SSL support. See the "godoc page":http://godoc.org/github.com/curoverse/arvados/services/keep-web for more detail.
-By convention, we use the following hostname for the keep-web service:
+By convention, we use the following hostnames for the keep-web service:
<notextile>
-<pre><code>collections.<span class="userinput">uuid_prefix</span>.your.domain
+<pre><code>download.<span class="userinput">uuid_prefix</span>.your.domain
+collections.<span class="userinput">uuid_prefix</span>.your.domain
</code></pre>
</notextile>
-This hostname should resolve from anywhere on the internet.
+The above hostnames should resolve from anywhere on the internet.
h2. Install keep-web
<notextile>
<pre><code>~$ <span class="userinput">keep-web -h</span>
Usage of keep-web:
- -address string
- Address to listen on: "host:port", or ":port" to listen on all interfaces. (default ":80")
- -anonymous-token value
- API token to try when none of the tokens provided in an HTTP request succeed in reading the desired collection. If this flag is used more than once, each token will be attempted in turn until one works. (default [])
+ -allow-anonymous
+ Serve public data to anonymous clients. Try the token supplied in the ARVADOS_API_TOKEN environment variable when none of the tokens provided in an HTTP request succeed in reading the desired collection. (default false)
-attachment-only-host string
Accept credentials, and add "Content-Disposition: attachment" response headers, for requests at this hostname:port. Prohibiting inline display makes it possible to serve untrusted and non-public content from a single origin, i.e., without wildcard DNS or SSL.
+ -listen string
+ Address to listen on: "host:port", or ":port" to listen on all interfaces. (default ":80")
-trust-all-content
Serve non-public content from a single origin. Dangerous: read docs before using!
</code></pre>
<notextile>
<pre><code>export ARVADOS_API_HOST=<span class="userinput">uuid_prefix</span>.your.domain
-exec sudo -u nobody keep-web -address=<span class="userinput">:9002</span> -anonymous-token=<span class="userinput">hoShoomoo2bai3Ju1xahg6aeng1siquuaZ1yae2gi2Uhaeng2r</span> 2>&1
+export ARVADOS_API_TOKEN="<span class="userinput">hoShoomoo2bai3Ju1xahg6aeng1siquuaZ1yae2gi2Uhaeng2r</span>"
+exec sudo -u nobody keep-web \
+ -listen=<span class="userinput">:9002</span> \
+ -attachment-only-host=<span class="userinput">download.uuid_prefix.your.domain</span> \
+ -allow-anonymous \
+ 2>&1
</code></pre>
</notextile>
-Omit the @-anonymous-token@ arguments if you do not want to serve public data.
+Omit the @-allow-anonymous@ argument if you do not want to serve public data.
Set @ARVADOS_API_HOST_INSECURE=1@ if your API server's SSL certificate is not signed by a recognized CA.
This is best achieved by putting a reverse proxy with SSL support in front of keep-web, running on port 443 and passing requests to keep-web on port 9002 (or whatever port you chose in your run script).
-Note: A wildcard SSL certificate is required in order to proxy keep-web effectively.
+Note: A wildcard SSL certificate is required in order to support a full-featured secure keep-web service. Without it, keep-web can offer file downloads for all Keep data; however, in order to avoid cross-site scripting vulnerabilities, keep-web refuses to serve private data as web content except when it is accessed using a "secret link" share. With a wildcard SSL certificate and DNS configured appropriately, all data can be served as web content.
For example, using Nginx:
server {
listen <span class="userinput">[your public IP address]</span>:443 ssl;
- server_name collections.<span class="userinput">uuid_prefix</span>.your.domain *.collections.<span class="userinput">uuid_prefix</span>.your.domain ~.*--collections.<span class="userinput">uuid_prefix</span>.your.domain;
+ server_name download.<span class="userinput">uuid_prefix</span>.your.domain
+ collections.<span class="userinput">uuid_prefix</span>.your.domain
+ *.collections.<span class="userinput">uuid_prefix</span>.your.domain
+ ~.*--collections.<span class="userinput">uuid_prefix</span>.your.domain;
proxy_connect_timeout 90s;
proxy_read_timeout 300s;
h3. Configure DNS
Configure your DNS servers so the following names resolve to your Nginx proxy's public IP address.
-* @*--collections.uuid_prefix.your.domain@, if your DNS server allows this without interfering with other DNS names; or
-* @*.collections.uuid_prefix.your.domain@, if you have a wildcard SSL certificate valid for these names; or
-* @collections.uuid_prefix.your.domain@, if neither of the above options is feasible. In this case, only unauthenticated requests will be served, i.e., public data and collection sharing links.
+* @download.uuid_prefix.your.domain@
+* @collections.uuid_prefix.your.domain@
+* @*--collections.uuid_prefix.your.domain@, if you have a wildcard SSL certificate valid for @*.uuid_prefix.your.domain@ and your DNS server allows this without interfering with other DNS names.
+* @*.collections.uuid_prefix.your.domain@, if you have a wildcard SSL certificate valid for these names.
+
+If neither of the above wildcard options is feasible, only unauthenticated requests (public data and collection sharing links) will be served as web content at @collections.uuid_prefix.your.domain@. The @download@ name will be used to serve authenticated content, but only as file downloads.
h3. Tell Workbench about the keep-web service
-Add *one* of the following entries to your Workbench configuration file (@/etc/arvados/workbench/application.yml@), depending on your DNS setup:
+Workbench has features like "download file from collection" and "show image" which work better if the content is served by keep-web rather than Workbench itself. We recommend using the two different hostnames ("download" and "collections" above) for file downloads and inline content respectively.
+
+Add the following entry to your Workbench configuration file (@/etc/arvados/workbench/application.yml@). This URL will be used for file downloads.
+
+<notextile>
+<pre><code>keep_web_download_url: https://download.<span class="userinput">uuid_prefix</span>.your.domain/c=%{uuid_or_pdh}
+</code></pre>
+</notextile>
+
+Additionally, add *one* of the following entries to your Workbench configuration file, depending on your DNS setup. This URL will be used to serve user content that can be displayed in the browser, like image previews and static HTML pages.
<notextile>
<pre><code>keep_web_url: https://%{uuid_or_pdh}--collections.<span class="userinput">uuid_prefix</span>.your.domain
keep_web_url: https://%{uuid_or_pdh}.collections.<span class="userinput">uuid_prefix</span>.your.domain
-keep_web_url: https://collections.<span class="userinput">uuid_prefix</span>.your.domain
+keep_web_url: https://collections.<span class="userinput">uuid_prefix</span>.your.domain/c=%{uuid_or_pdh}
</code></pre>
</notextile>
On the <strong>API server</strong>, use the following command to create the token:
<notextile>
-<pre><code>~/arvados/services/api/script$ <span class="userinput">RAILS_ENV=production bundle exec ./get_anonymous_user_token.rb</span>
+<pre><code>/var/www/arvados-api/current/script$ <span class="userinput">RAILS_ENV=production bundle exec ./get_anonymous_user_token.rb</span>
hoShoomoo2bai3Ju1xahg6aeng1siquuaZ1yae2gi2Uhaeng2r
</code></pre></notextile>
passenger_enabled on;
# If you're using RVM, uncomment the line below.
#passenger_ruby /usr/local/rvm/wrappers/default/ruby;
+
+ # `client_max_body_size` should match the corresponding setting in
+ # the API server's Nginx configuration.
+ client_max_body_size 128m;
}
upstream workbench {
ssl_certificate_key <span class="userinput">/YOUR/PATH/TO/cert.key</span>;
index index.html index.htm index.php;
+ # `client_max_body_size` should match the corresponding setting in
+ # the API server's Nginx configuration.
client_max_body_size 128m;
location / {
"Description for the pipeline instance.",
:short => :none,
:type => :string)
+ opt(:project_uuid,
+ "UUID of the project for the pipeline instance.",
+ short: :none,
+ type: :string)
stop_on [:'--']
end
$options = Trollop::with_standard_exception_handling p do
end
end
else
- description = $options[:description]
- description = ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : "")) if !description
- @instance = PipelineInstance.
- create(components: @components,
- properties: {
- run_options: {
- enable_job_reuse: !@options[:no_reuse]
- }
- },
- pipeline_template_uuid: @template[:uuid],
- description: description,
- state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
+ description = $options[:description] ||
+ ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : ""))
+ instance_body = {
+ components: @components,
+ properties: {
+ run_options: {
+ enable_job_reuse: !@options[:no_reuse]
+ }
+ },
+ pipeline_template_uuid: @template[:uuid],
+ description: description,
+ state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient')
+ }
+ if @options[:project_uuid]
+ instance_body[:owner_uuid] = @options[:project_uuid]
+ end
+ @instance = PipelineInstance.create(instance_body)
end
self
end
THISROUND:
for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
{
+ # Don't create new tasks if we already know the job's final result.
+ last if defined($main::success);
+
my $id = $jobstep_todo[$todo_ptr];
my $Jobstep = $jobstep[$id];
if ($Jobstep->{level} != $level)
.q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
- $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
+ $command .= "&& exec arv-mount --by-pdh --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
if ($docker_hash)
{
my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
||
($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
{
- last THISROUND if $main::please_freeze || defined($main::success);
+ last THISROUND if $main::please_freeze;
if ($main::please_info)
{
$main::please_info = 0;
if (!defined $task_success) {
# task did not indicate one way or the other --> fail
+ Log($jobstepid, sprintf(
+ "ERROR: Task process exited %d, but never updated its task record to indicate success and record its output.",
+ exit_status_s($childstatus)));
$Jobstep->{'arvados_task'}->{success} = 0;
$Jobstep->{'arvados_task'}->save;
$task_success = 0;
close($log_pipe_in);
+ my $logger_failed = 0;
my $read_result = log_writer_read_output(120);
if ($read_result == -1) {
+ $logger_failed = -1;
Log (undef, "timed out reading from 'arv-put'");
} elsif ($read_result != 0) {
+ $logger_failed = -2;
Log(undef, "failed to read arv-put log manifest to EOF");
}
waitpid($log_pipe_pid, 0);
if ($?) {
+ $logger_failed ||= $?;
Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
}
close($log_pipe_out);
- my $arv_put_output = $log_pipe_out_buf;
+ my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
$log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
$log_pipe_out_select = undef;
my $justcheckpoint = shift; # false if this will be the last meta saved
return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
return unless log_writer_is_active();
+ my $log_manifest = log_writer_finish();
+ return unless defined($log_manifest);
- my $log_manifest = "";
if ($Job->{log}) {
my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
- $log_manifest .= $prev_log_coll->{manifest_text};
+ $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
}
- $log_manifest .= log_writer_finish();
my $log_coll = api_call(
"collections/create", ensure_unique_name => 1, collection => {
// such failures by always using a new or recently active socket.
var MaxIdleConnectionDuration = 30 * time.Second
+var RetryDelay = 2 * time.Second
+
// Indicates an error that was returned by the API server.
type APIServerError struct {
// Address of server returning error, of the form "host:port".
// Information about how to contact the Arvados server
type ArvadosClient struct {
+	// URL scheme used to reach the API server: "https" (default) or "http"
+ Scheme string
+
// Arvados API server, form "host:port"
ApiServer string
DiscoveryDoc Dict
lastClosedIdlesAt time.Time
+
+ // Number of retries
+ Retries int
}
// Create a new ArvadosClient, initialized with standard Arvados environment
external := matchTrue.MatchString(os.Getenv("ARVADOS_EXTERNAL_CLIENT"))
ac = ArvadosClient{
+ Scheme: "https",
ApiServer: os.Getenv("ARVADOS_API_HOST"),
ApiToken: os.Getenv("ARVADOS_API_TOKEN"),
ApiInsecure: insecure,
Client: &http.Client{Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
- External: external}
+ External: external,
+ Retries: 2}
if ac.ApiServer == "" {
return ac, MissingArvadosApiHost
// CallRaw is the same as Call() but returns a Reader that reads the
// response body, instead of taking an output object.
func (c ArvadosClient) CallRaw(method string, resourceType string, uuid string, action string, parameters Dict) (reader io.ReadCloser, err error) {
- var req *http.Request
-
+ scheme := c.Scheme
+ if scheme == "" {
+ scheme = "https"
+ }
u := url.URL{
- Scheme: "https",
+ Scheme: scheme,
Host: c.ApiServer}
if resourceType != API_DISCOVERY_RESOURCE {
}
}
- if method == "GET" || method == "HEAD" {
- u.RawQuery = vals.Encode()
- if req, err = http.NewRequest(method, u.String(), nil); err != nil {
- return nil, err
- }
- } else {
- if req, err = http.NewRequest(method, u.String(), bytes.NewBufferString(vals.Encode())); err != nil {
- return nil, err
- }
- req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+ retryable := false
+ switch method {
+ case "GET", "HEAD", "PUT", "OPTIONS", "DELETE":
+ retryable = true
}
- // Add api token header
- req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", c.ApiToken))
- if c.External {
- req.Header.Add("X-External-Client", "1")
- }
-
- // POST and DELETE are not safe to retry automatically, so we minimize
- // such failures by always using a new or recently active socket
- if method == "POST" || method == "DELETE" {
+ // Non-retryable methods such as POST are not safe to retry automatically,
+ // so we minimize such failures by always using a new or recently active socket
+ if !retryable {
if time.Since(c.lastClosedIdlesAt) > MaxIdleConnectionDuration {
c.lastClosedIdlesAt = time.Now()
c.Client.Transport.(*http.Transport).CloseIdleConnections()
}
// Make the request
+ var req *http.Request
var resp *http.Response
- if resp, err = c.Client.Do(req); err != nil {
- return nil, err
- }
- if resp.StatusCode == http.StatusOK {
- return resp.Body, nil
+ for attempt := 0; attempt <= c.Retries; attempt++ {
+ if method == "GET" || method == "HEAD" {
+ u.RawQuery = vals.Encode()
+ if req, err = http.NewRequest(method, u.String(), nil); err != nil {
+ return nil, err
+ }
+ } else {
+ if req, err = http.NewRequest(method, u.String(), bytes.NewBufferString(vals.Encode())); err != nil {
+ return nil, err
+ }
+ req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+ }
+
+ // Add api token header
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", c.ApiToken))
+ if c.External {
+ req.Header.Add("X-External-Client", "1")
+ }
+
+ resp, err = c.Client.Do(req)
+ if err != nil {
+ if retryable {
+ time.Sleep(RetryDelay)
+ continue
+ } else {
+ return nil, err
+ }
+ }
+
+ if resp.StatusCode == http.StatusOK {
+ return resp.Body, nil
+ }
+
+ defer resp.Body.Close()
+
+ switch resp.StatusCode {
+ case 408, 409, 422, 423, 500, 502, 503, 504:
+ time.Sleep(RetryDelay)
+ continue
+ default:
+ return nil, newAPIServerError(c.ApiServer, resp)
+ }
}
- defer resp.Body.Close()
- return nil, newAPIServerError(c.ApiServer, resp)
+ if resp != nil {
+ return nil, newAPIServerError(c.ApiServer, resp)
+ }
+ return nil, err
}
func newAPIServerError(ServerAddress string, resp *http.Response) APIServerError {
package arvadosclient
import (
+ "fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
. "gopkg.in/check.v1"
+ "net"
"net/http"
"os"
"testing"
var _ = Suite(&ServerRequiredSuite{})
var _ = Suite(&UnitSuite{})
+var _ = Suite(&MockArvadosServerSuite{})
// Tests that require the Keep server running
type ServerRequiredSuite struct{}
func (s *ServerRequiredSuite) SetUpSuite(c *C) {
arvadostest.StartAPI()
arvadostest.StartKeep(2, false)
+ RetryDelay = 0
+}
+
+func (s *ServerRequiredSuite) TearDownSuite(c *C) {
+ arvadostest.StopKeep(2)
+ arvadostest.StopAPI()
}
func (s *ServerRequiredSuite) SetUpTest(c *C) {
c.Assert(PDHMatch("+12345"), Equals, false)
c.Assert(PDHMatch(""), Equals, false)
}
+
+// Tests that use mock arvados server
+type MockArvadosServerSuite struct{}
+
+func (s *MockArvadosServerSuite) SetUpSuite(c *C) {
+ RetryDelay = 0
+}
+
+func (s *MockArvadosServerSuite) SetUpTest(c *C) {
+ arvadostest.ResetEnv()
+}
+
+type APIServer struct {
+ listener net.Listener
+ url string
+}
+
+func RunFakeArvadosServer(st http.Handler) (api APIServer, err error) {
+ api.listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: 0})
+ if err != nil {
+ return
+ }
+ api.url = api.listener.Addr().String()
+ go http.Serve(api.listener, st)
+ return
+}
+
+type APIStub struct {
+ method string
+ retryAttempts int
+ expected int
+ respStatus []int
+ responseBody []string
+}
+
+func (h *APIStub) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ if req.URL.Path == "/redirect-loop" {
+ http.Redirect(resp, req, "/redirect-loop", http.StatusFound)
+ return
+ }
+ if h.respStatus[h.retryAttempts] < 0 {
+ // Fail the client's Do() by starting a redirect loop
+ http.Redirect(resp, req, "/redirect-loop", http.StatusFound)
+ } else {
+ resp.WriteHeader(h.respStatus[h.retryAttempts])
+ resp.Write([]byte(h.responseBody[h.retryAttempts]))
+ }
+ h.retryAttempts++
+}
+
+func (s *MockArvadosServerSuite) TestWithRetries(c *C) {
+ for _, stub := range []APIStub{
+ {
+ "get", 0, 200, []int{200, 500}, []string{`{"ok":"ok"}`, ``},
+ },
+ {
+ "create", 0, 200, []int{200, 500}, []string{`{"ok":"ok"}`, ``},
+ },
+ {
+ "get", 0, 500, []int{500, 500, 500, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+ },
+ {
+ "create", 0, 500, []int{500, 500, 500, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+ },
+ {
+ "update", 0, 500, []int{500, 500, 500, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+ },
+ {
+ "delete", 0, 500, []int{500, 500, 500, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+ },
+ {
+ "get", 0, 502, []int{500, 500, 502, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+ },
+ {
+ "create", 0, 502, []int{500, 500, 502, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+ },
+ {
+ "get", 0, 200, []int{500, 500, 200}, []string{``, ``, `{"ok":"ok"}`},
+ },
+ {
+ "create", 0, 200, []int{500, 500, 200}, []string{``, ``, `{"ok":"ok"}`},
+ },
+ {
+ "delete", 0, 200, []int{500, 500, 200}, []string{``, ``, `{"ok":"ok"}`},
+ },
+ {
+ "update", 0, 200, []int{500, 500, 200}, []string{``, ``, `{"ok":"ok"}`},
+ },
+ {
+ "get", 0, 401, []int{401, 200}, []string{``, `{"ok":"ok"}`},
+ },
+ {
+ "create", 0, 401, []int{401, 200}, []string{``, `{"ok":"ok"}`},
+ },
+ {
+ "get", 0, 404, []int{404, 200}, []string{``, `{"ok":"ok"}`},
+ },
+ {
+ "get", 0, 401, []int{500, 401, 200}, []string{``, ``, `{"ok":"ok"}`},
+ },
+
+ // Response code -1 simulates an HTTP/network error
+ // (i.e., Do() returns an error; there is no HTTP
+ // response status code).
+
+ // Succeed on second retry
+ {
+ "get", 0, 200, []int{-1, -1, 200}, []string{``, ``, `{"ok":"ok"}`},
+ },
+ // "POST" is not safe to retry: fail after one error
+ {
+ "create", 0, -1, []int{-1, 200}, []string{``, `{"ok":"ok"}`},
+ },
+ } {
+ api, err := RunFakeArvadosServer(&stub)
+ c.Check(err, IsNil)
+
+ defer api.listener.Close()
+
+ arv := ArvadosClient{
+ Scheme: "http",
+ ApiServer: api.url,
+ ApiToken: "abc123",
+ ApiInsecure: true,
+ Client: &http.Client{Transport: &http.Transport{}},
+ Retries: 2}
+
+ getback := make(Dict)
+ switch stub.method {
+ case "get":
+ err = arv.Get("collections", "zzzzz-4zz18-znfnqtbbv4spc3w", nil, &getback)
+ case "create":
+ err = arv.Create("collections",
+ Dict{"collection": Dict{"name": "testing"}},
+ &getback)
+ case "update":
+ err = arv.Update("collections", "zzzzz-4zz18-znfnqtbbv4spc3w",
+ Dict{"collection": Dict{"name": "testing"}},
+ &getback)
+ case "delete":
+ err = arv.Delete("pipeline_templates", "zzzzz-4zz18-znfnqtbbv4spc3w", nil, &getback)
+ }
+
+ switch stub.expected {
+ case 200:
+ c.Check(err, IsNil)
+ c.Check(getback["ok"], Equals, "ok")
+ case -1:
+ c.Check(err, NotNil)
+ c.Check(err, ErrorMatches, `.*stopped after \d+ redirects`)
+ default:
+ c.Check(err, NotNil)
+ c.Check(err, ErrorMatches, fmt.Sprintf("arvados API server error: %d.*", stub.expected))
+ c.Check(err.(APIServerError).HttpStatusCode, Equals, stub.expected)
+ }
+ }
+}
const (
SpectatorToken = "zw2f4gwx8hw8cjre7yp6v1zylhrhn3m5gvjq73rtpwhmknrybu"
ActiveToken = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
+ AdminToken = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
AnonymousToken = "4kg6k6lzmp9kj4cpkcoxie964cmvjahbt4fod9zru44k4jqdmi"
+ DataManagerToken = "320mkve8qkswstz7ff61glpk3mhgghmg67wmic7elw4z41pke1"
FooCollection = "zzzzz-4zz18-fy296fx3hot09f7"
NonexistentCollection = "zzzzz-4zz18-totallynotexist"
HelloWorldCollection = "zzzzz-4zz18-4en62shvi99lxd4"
import (
"bufio"
"bytes"
- "fmt"
- "io"
- "io/ioutil"
"log"
"os"
"os/exec"
chdirToPythonTests()
cmd := exec.Command("python", "run_test_server.py", "start", "--auth", "admin")
- stderr, err := cmd.StderrPipe()
- if err != nil {
- log.Fatal(err)
- }
- go io.Copy(os.Stderr, stderr)
- stdout, err := cmd.StdoutPipe()
+ cmd.Stdin = nil
+ cmd.Stderr = os.Stderr
+
+ authScript, err := cmd.Output()
if err != nil {
- log.Fatal(err)
- }
- if err = cmd.Start(); err != nil {
- log.Fatal(err)
- }
- var authScript []byte
- if authScript, err = ioutil.ReadAll(stdout); err != nil {
- log.Fatal(err)
- }
- if err = cmd.Wait(); err != nil {
- log.Fatal(err)
+ log.Fatalf("%+v: %s", cmd.Args, err)
}
ParseAuthSettings(authScript)
ResetEnv()
defer os.Chdir(cwd)
chdirToPythonTests()
- exec.Command("python", "run_test_server.py", "stop").Run()
+ bgRun(exec.Command("python", "run_test_server.py", "stop"))
}
// StartKeep starts the given number of keep servers,
cmdArgs = append(cmdArgs, "--keep-enforce-permissions")
}
- cmd := exec.Command("python", cmdArgs...)
-
- stderr, err := cmd.StderrPipe()
- if err != nil {
- log.Fatalf("Setting up stderr pipe: %s", err)
- }
- go io.Copy(os.Stderr, stderr)
- if err := cmd.Run(); err != nil {
- panic(fmt.Sprintf("'python run_test_server.py start_keep' returned error %s", err))
- }
+ bgRun(exec.Command("python", cmdArgs...))
}
// StopKeep stops keep servers that were started with StartKeep.
defer os.Chdir(cwd)
chdirToPythonTests()
- exec.Command("python", "run_test_server.py", "stop_keep", "--num-keep-servers", strconv.Itoa(numKeepServers))
+ cmd := exec.Command("python", "run_test_server.py", "stop_keep", "--num-keep-servers", strconv.Itoa(numKeepServers))
+ cmd.Stdin = nil
+ cmd.Stderr = os.Stderr
+ cmd.Stdout = os.Stderr
+ if err := cmd.Run(); err != nil {
+ log.Fatalf("%+v: %s", cmd.Args, err)
+ }
+}
+
+// Start cmd, with stderr and stdout redirected to our own
+// stderr. Return when the process exits, but do not wait for its
+// stderr and stdout to close: any grandchild processes will continue
+// writing to our stderr.
+func bgRun(cmd *exec.Cmd) {
+ cmd.Stdin = nil
+ cmd.Stderr = os.Stderr
+ cmd.Stdout = os.Stderr
+ if err := cmd.Start(); err != nil {
+ log.Fatalf("%+v: %s", cmd.Args, err)
+ }
+ if _, err := cmd.Process.Wait(); err != nil {
+ log.Fatalf("%+v: %s", cmd.Args, err)
+ }
}
--- /dev/null
+package keepclient
+
+import (
+ "encoding/json"
+ "fmt"
+ "log"
+ "os"
+ "os/signal"
+ "reflect"
+ "strings"
+ "syscall"
+ "time"
+)
+
+// DiscoverKeepServers gets list of available keep services from api server
+func (this *KeepClient) DiscoverKeepServers() error {
+ var list svcList
+
+ // Get keep services from api server
+ err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list)
+ if err != nil {
+ return err
+ }
+
+ return this.loadKeepServers(list)
+}
+
+// LoadKeepServicesFromJSON gets list of available keep services from given JSON
+func (this *KeepClient) LoadKeepServicesFromJSON(services string) error {
+ var list svcList
+
+ // Load keep services from given json
+ dec := json.NewDecoder(strings.NewReader(services))
+ if err := dec.Decode(&list); err != nil {
+ return err
+ }
+
+ return this.loadKeepServers(list)
+}
+
+// RefreshServices calls DiscoverKeepServers to refresh the keep
+// service list on SIGHUP; when the given interval has elapsed since
+// the last refresh; and (if the last refresh failed) the given
+// errInterval has elapsed.
+func (kc *KeepClient) RefreshServices(interval, errInterval time.Duration) {
+ var previousRoots = []map[string]string{}
+
+ timer := time.NewTimer(interval)
+ gotHUP := make(chan os.Signal, 1)
+ signal.Notify(gotHUP, syscall.SIGHUP)
+
+ for {
+ select {
+ case <-gotHUP:
+ case <-timer.C:
+ }
+ timer.Reset(interval)
+
+ if err := kc.DiscoverKeepServers(); err != nil {
+			log.Printf("Error retrieving services list: %v (retrying in %v)", err, errInterval)
+ timer.Reset(errInterval)
+ continue
+ }
+ newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()}
+
+ if !reflect.DeepEqual(previousRoots, newRoots) {
+ log.Printf("Updated services list: locals %v gateways %v", newRoots[0], newRoots[1])
+ previousRoots = newRoots
+ }
+
+ if len(newRoots[0]) == 0 {
+ log.Printf("WARNING: No local services (retrying in %v)", errInterval)
+ timer.Reset(errInterval)
+ }
+ }
+}
+
+// loadKeepServers updates the client's local, writable, and gateway
+// service maps from the given service list.
+func (this *KeepClient) loadKeepServers(list svcList) error {
+ listed := make(map[string]bool)
+ localRoots := make(map[string]string)
+ gatewayRoots := make(map[string]string)
+ writableLocalRoots := make(map[string]string)
+
+ // replicasPerService is 1 for disks; unknown or unlimited otherwise
+ this.replicasPerService = 1
+ this.Using_proxy = false
+
+ for _, service := range list.Items {
+ scheme := "http"
+ if service.SSL {
+ scheme = "https"
+ }
+ url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
+
+ // Skip duplicates
+ if listed[url] {
+ continue
+ }
+ listed[url] = true
+
+ localRoots[service.Uuid] = url
+ if service.SvcType == "proxy" {
+ this.Using_proxy = true
+ }
+
+ if service.ReadOnly == false {
+ writableLocalRoots[service.Uuid] = url
+ if service.SvcType != "disk" {
+ this.replicasPerService = 0
+ }
+ }
+
+ // Gateway services are only used when specified by
+ // UUID, so there's nothing to gain by filtering them
+ // by service type. Including all accessible services
+ // (gateway and otherwise) merely accommodates more
+ // service configurations.
+ gatewayRoots[service.Uuid] = url
+ }
+
+ if this.Using_proxy {
+ this.setClientSettingsProxy()
+ } else {
+ this.setClientSettingsDisk()
+ }
+
+ this.SetServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
+ return nil
+}
--- /dev/null
+package keepclient
+
+import (
+ "fmt"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+)
+
+func ExampleRefreshServices() {
+ arv, err := arvadosclient.MakeArvadosClient()
+ if err != nil {
+ panic(err)
+ }
+ kc, err := MakeKeepClient(&arv)
+ if err != nil {
+ panic(err)
+ }
+ go kc.RefreshServices(5*time.Minute, 3*time.Second)
+ fmt.Printf("LocalRoots: %#v\n", kc.LocalRoots())
+}
import (
"crypto/md5"
- "encoding/json"
"errors"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/streamer"
Items []keepService `json:"items"`
}
-// DiscoverKeepServers gets list of available keep services from api server
-func (this *KeepClient) DiscoverKeepServers() error {
- var list svcList
-
- // Get keep services from api server
- err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list)
- if err != nil {
- return err
- }
-
- return this.loadKeepServers(list)
-}
-
-// LoadKeepServicesFromJSON gets list of available keep services from given JSON
-func (this *KeepClient) LoadKeepServicesFromJSON(services string) error {
- var list svcList
-
- // Load keep services from given json
- dec := json.NewDecoder(strings.NewReader(services))
- if err := dec.Decode(&list); err != nil {
- return err
- }
-
- return this.loadKeepServers(list)
-}
-
-// loadKeepServers
-func (this *KeepClient) loadKeepServers(list svcList) error {
- listed := make(map[string]bool)
- localRoots := make(map[string]string)
- gatewayRoots := make(map[string]string)
- writableLocalRoots := make(map[string]string)
-
- // replicasPerService is 1 for disks; unknown or unlimited otherwise
- this.replicasPerService = 1
- this.Using_proxy = false
-
- for _, service := range list.Items {
- scheme := "http"
- if service.SSL {
- scheme = "https"
- }
- url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
-
- // Skip duplicates
- if listed[url] {
- continue
- }
- listed[url] = true
-
- localRoots[service.Uuid] = url
- if service.SvcType == "proxy" {
- this.Using_proxy = true
- }
-
- if service.ReadOnly == false {
- writableLocalRoots[service.Uuid] = url
- if service.SvcType != "disk" {
- this.replicasPerService = 0
- }
- }
-
- // Gateway services are only used when specified by
- // UUID, so there's nothing to gain by filtering them
- // by service type. Including all accessible services
- // (gateway and otherwise) merely accommodates more
- // service configurations.
- gatewayRoots[service.Uuid] = url
- }
-
- if this.Using_proxy {
- this.setClientSettingsProxy()
- } else {
- this.setClientSettingsDisk()
- }
-
- this.SetServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
- return nil
-}
-
type uploadStatus struct {
err error
url string
-import bz2
+import cStringIO
import datetime
-import fcntl
-import functools
-import gflags
import hashlib
-import json
import logging
+import math
import os
-import pprint
import pycurl
import Queue
import re
import socket
import ssl
-import string
-import cStringIO
-import subprocess
-import sys
import threading
-import time
import timer
-import types
-import UserDict
-import zlib
import arvados
import arvados.config as config
class KeepClient(object):
# Default Keep server connection timeout: 2 seconds
- # Default Keep server read timeout: 300 seconds
+ # Default Keep server read timeout: 64 seconds
+ # Default Keep server bandwidth minimum: 32768 bytes per second
# Default Keep proxy connection timeout: 20 seconds
- # Default Keep proxy read timeout: 300 seconds
- DEFAULT_TIMEOUT = (2, 300)
- DEFAULT_PROXY_TIMEOUT = (20, 300)
+ # Default Keep proxy read timeout: 64 seconds
+ # Default Keep proxy bandwidth minimum: 32768 bytes per second
+ DEFAULT_TIMEOUT = (2, 64, 32768)
+ DEFAULT_PROXY_TIMEOUT = (20, 64, 32768)
class ThreadLimiter(object):
- """
- Limit the number of threads running at a given time to
- {desired successes} minus {successes reported}. When successes
- reported == desired, wake up the remaining threads and tell
- them to quit.
+ """Limit the number of threads writing to Keep at once.
+
+ This ensures that only a number of writer threads that could
+ potentially achieve the desired replication level run at once.
+ Once the desired replication level is achieved, queued threads
+ are instructed not to run.
Should be used in a "with" block.
"""
- def __init__(self, todo):
+ def __init__(self, want_copies, max_service_replicas):
self._started = 0
- self._todo = todo
+ self._want_copies = want_copies
self._done = 0
self._response = None
self._start_lock = threading.Condition()
- self._todo_lock = threading.Semaphore(todo)
+ if (not max_service_replicas) or (max_service_replicas >= want_copies):
+ max_threads = 1
+ else:
+ max_threads = math.ceil(float(want_copies) / max_service_replicas)
+ _logger.debug("Limiter max threads is %d", max_threads)
+ self._todo_lock = threading.Semaphore(max_threads)
self._done_lock = threading.Lock()
self._local = threading.local()
def shall_i_proceed(self):
"""
- Return true if the current thread should do stuff. Return
- false if the current thread should just stop.
+ Return true if the current thread should write to Keep.
+ Return false otherwise.
"""
with self._done_lock:
- return (self._done < self._todo)
+ return (self._done < self._want_copies)
def save_response(self, response_body, replicas_stored):
"""
Records a response body (a locator, possibly signed) returned by
- the Keep server. It is not necessary to save more than
- one response, since we presume that any locator returned
- in response to a successful request is valid.
+ the Keep server, and the number of replicas it stored.
"""
with self._done_lock:
self._done += replicas_stored
self._response = response_body
def response(self):
- """
- Returns the body from the response to a PUT request.
- """
+ """Return the body from the response to a PUT request."""
with self._done_lock:
return self._response
def done(self):
- """
- Return how many successes were reported.
- """
+ """Return the total number of replicas successfully stored."""
with self._done_lock:
return self._done
if not timeouts:
return
elif isinstance(timeouts, tuple):
- conn_t, xfer_t = timeouts
+ if len(timeouts) == 2:
+ conn_t, xfer_t = timeouts
+ bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
+ else:
+ conn_t, xfer_t, bandwidth_bps = timeouts
else:
conn_t, xfer_t = (timeouts, timeouts)
+ bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
- curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
+ curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+ curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
def _headerfunction(self, header_line):
header_line = header_line.decode('iso-8859-1')
:timeout:
The initial timeout (in seconds) for HTTP requests to Keep
- non-proxy servers. A tuple of two floats is interpreted as
- (connection_timeout, read_timeout): see
- http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
- Because timeouts are often a result of transient server load, the
- actual connection timeout will be increased by a factor of two on
- each retry.
- Default: (2, 300).
+ non-proxy servers. A tuple of three floats is interpreted as
+ (connection_timeout, read_timeout, minimum_bandwidth). A connection
+ will be aborted if the average traffic rate falls below
+ minimum_bandwidth bytes per second over an interval of read_timeout
+ seconds. Because timeouts are often a result of transient server
+ load, the actual connection timeout will be increased by a factor
+ of two on each retry.
+ Default: (2, 64, 32768).
:proxy_timeout:
The initial timeout (in seconds) for HTTP requests to
- Keep proxies. A tuple of two floats is interpreted as
- (connection_timeout, read_timeout). The behavior described
- above for adjusting connection timeouts on retry also applies.
- Default: (20, 300).
+ Keep proxies. A tuple of three floats is interpreted as
+ (connection_timeout, read_timeout, minimum_bandwidth). The behavior
+ described above for adjusting connection timeouts on retry also
+ applies.
+ Default: (20, 64, 32768).
:api_token:
If you're not using an API client, but only talking
self.put = self.local_store_put
else:
self.num_retries = num_retries
+ self.max_replicas_per_service = None
if proxy:
if not proxy.endswith('/'):
proxy += '/'
self._gateway_services = {}
self._keep_services = [{
'uuid': 'proxy',
+ 'service_type': 'proxy',
'_service_root': proxy,
}]
self._writable_services = self._keep_services
self.using_proxy = True
self._static_services_list = True
- self.max_replicas_per_service = 1
else:
# It's important to avoid instantiating an API client
# unless we actually need one, for testing's sake.
self._writable_services = None
self.using_proxy = None
self._static_services_list = False
- self.max_replicas_per_service = 1
def current_timeout(self, attempt_number):
"""Return the appropriate timeout to use for this client.
# TODO(twp): the timeout should be a property of a
# KeepService, not a KeepClient. See #4488.
t = self.proxy_timeout if self.using_proxy else self.timeout
- return (t[0] * (1 << attempt_number), t[1])
+ if len(t) == 2:
+ return (t[0] * (1 << attempt_number), t[1])
+ else:
+ return (t[0] * (1 << attempt_number), t[1], t[2])
+ def _any_nondisk_services(self, service_list):
+ return any(ks.get('service_type', 'disk') != 'disk'
+ for ks in service_list)
def build_services_list(self, force_rebuild=False):
if (self._static_services_list or
except Exception: # API server predates Keep services.
keep_services = self.api_client.keep_disks().list()
- accessible = keep_services.execute().get('items')
- if not accessible:
+ # Gateway services are only used when specified by UUID,
+ # so there's nothing to gain by filtering them by
+ # service_type.
+ self._gateway_services = {ks['uuid']: ks for ks in
+ keep_services.execute()['items']}
+ if not self._gateway_services:
raise arvados.errors.NoKeepServersError()
# Precompute the base URI for each service.
- for r in accessible:
+ for r in self._gateway_services.itervalues():
host = r['service_host']
if not host.startswith('[') and host.find(':') >= 0:
# IPv6 URIs must be formatted like http://[::1]:80/...
host,
r['service_port'])
- # Gateway services are only used when specified by UUID,
- # so there's nothing to gain by filtering them by
- # service_type.
- self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
_logger.debug(str(self._gateway_services))
-
self._keep_services = [
- ks for ks in accessible
- if ks.get('service_type') in ['disk', 'proxy']]
- self._writable_services = [
- ks for ks in accessible
- if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
- _logger.debug(str(self._keep_services))
-
- self.using_proxy = any(ks.get('service_type') == 'proxy'
- for ks in self._keep_services)
+ ks for ks in self._gateway_services.itervalues()
+ if not ks.get('service_type', '').startswith('gateway:')]
+ self._writable_services = [ks for ks in self._keep_services
+ if not ks.get('read_only')]
+
# For disk type services, max_replicas_per_service is 1
- # It is unknown or unlimited for non-disk typed services.
- for ks in accessible:
- if ('disk' != ks.get('service_type')) and (not ks.get('read_only')):
- self.max_replicas_per_service = None
+ # It is unknown (unlimited) for other service types.
+ if self._any_nondisk_services(self._writable_services):
+ self.max_replicas_per_service = None
+ else:
+ self.max_replicas_per_service = 1
def _service_weight(self, data_hash, service_uuid):
"""Compute the weight of a Keep service endpoint for a data
# in that order.
use_services = self._keep_services
if need_writable:
- use_services = self._writable_services
+ use_services = self._writable_services
+ self.using_proxy = self._any_nondisk_services(use_services)
sorted_roots.extend([
svc['_service_root'] for svc in sorted(
use_services,
# Tell the proxy how many copies we want it to store
headers['X-Keep-Desired-Replication'] = str(copies)
roots_map = {}
- thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
- thread_sequence = 0
for tries_left in loop:
try:
sorted_roots = self.map_new_services(
loop.save_result(error)
continue
+ thread_limiter = KeepClient.ThreadLimiter(
+ copies, self.max_replicas_per_service)
threads = []
for service_root, ks in [(root, roots_map[root])
for root in sorted_roots]:
service_root=service_root,
thread_limiter=thread_limiter,
timeout=self.current_timeout(num_retries-tries_left),
- thread_sequence=thread_sequence)
+ thread_sequence=len(threads))
t.start()
threads.append(t)
- thread_sequence += 1
for t in threads:
t.join()
loop.save_result((thread_limiter.done() >= copies, len(threads)))
return mock.patch.object(api_client._http, 'request', side_effect=queue_with((
(fake_httplib2_response(code, **headers), body) for code in codes)))
+def str_keep_locator(s):
+ return '{}+{}'.format(hashlib.md5(s).hexdigest(), len(s))
class FakeCurl:
@classmethod
def __init__(self, name='.', *data):
self._name = name
self._data = ''.join(data)
- self._data_locators = ['{}+{}'.format(hashlib.md5(d).hexdigest(),
- len(d)) for d in data]
+ self._data_locators = [str_keep_locator(d) for d in data]
self.num_retries = 0
def name(self):
'response_body': 0,
# before returning from handler (thus setting response EOF)
'response_close': 0,
+ # after writing over 1s worth of data at self.bandwidth
+ 'mid_write': 0,
+ # after reading over 1s worth of data at self.bandwidth
+ 'mid_read': 0,
}
+ self.bandwidth = None
super(Server, self).__init__(*args, **kwargs)
def setdelays(self, **kwargs):
self.delays.get(k) # NameError if unknown key
self.delays[k] = v
+ def setbandwidth(self, bandwidth):
+        """For future requests, set the maximum bandwidth (number of bytes per
+        second) to operate at. If setbandwidth is never called, operate at the
+        maximum possible bandwidth."""
+ self.bandwidth = float(bandwidth)
+
def _sleep_at_least(self, seconds):
"""Sleep for given time, even if signals are received."""
wake = time.time() + seconds
class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
+ def wfile_bandwidth_write(self, data_to_write):
+ if self.server.bandwidth == None and self.server.delays['mid_write'] == 0:
+ self.wfile.write(data_to_write)
+ else:
+ BYTES_PER_WRITE = int(self.server.bandwidth/4.0) or 32768
+ outage_happened = False
+ num_bytes = len(data_to_write)
+ num_sent_bytes = 0
+ target_time = time.time()
+ while num_sent_bytes < num_bytes:
+ if num_sent_bytes > self.server.bandwidth and not outage_happened:
+ self.server._do_delay('mid_write')
+ target_time += self.delays['mid_write']
+ outage_happened = True
+ num_write_bytes = min(BYTES_PER_WRITE,
+ num_bytes - num_sent_bytes)
+ self.wfile.write(data_to_write[
+ num_sent_bytes:num_sent_bytes+num_write_bytes])
+ num_sent_bytes += num_write_bytes
+ if self.server.bandwidth is not None:
+ target_time += num_write_bytes / self.server.bandwidth
+ self.server._sleep_at_least(target_time - time.time())
+ return None
+
+ def rfile_bandwidth_read(self, bytes_to_read):
+ if self.server.bandwidth == None and self.server.delays['mid_read'] == 0:
+ return self.rfile.read(bytes_to_read)
+ else:
+ BYTES_PER_READ = int(self.server.bandwidth/4.0) or 32768
+ data = ''
+ outage_happened = False
+ bytes_read = 0
+ target_time = time.time()
+ while bytes_to_read > bytes_read:
+ if bytes_read > self.server.bandwidth and not outage_happened:
+ self.server._do_delay('mid_read')
+ target_time += self.delays['mid_read']
+ outage_happened = True
+ next_bytes_to_read = min(BYTES_PER_READ,
+ bytes_to_read - bytes_read)
+ data += self.rfile.read(next_bytes_to_read)
+ bytes_read += next_bytes_to_read
+ if self.server.bandwidth is not None:
+ target_time += next_bytes_to_read / self.server.bandwidth
+ self.server._sleep_at_least(target_time - time.time())
+ return data
+
def handle(self, *args, **kwargs):
self.server._do_delay('request')
return super(Handler, self).handle(*args, **kwargs)
self.send_header('Content-type', 'application/octet-stream')
self.end_headers()
self.server._do_delay('response_body')
- self.wfile.write(self.server.store[datahash])
+ self.wfile_bandwidth_write(self.server.store[datahash])
self.server._do_delay('response_close')
def do_PUT(self):
self.server._do_delay('request_body')
-
# The comments at https://bugs.python.org/issue1491 implies that Python
# 2.7 BaseHTTPRequestHandler was patched to support 100 Continue, but
# reading the actual code that ships in Debian it clearly is not, so we
# need to send the response on the socket directly.
-
- self.wfile.write("%s %d %s\r\n\r\n" %
+ self.wfile_bandwidth_write("%s %d %s\r\n\r\n" %
(self.protocol_version, 100, "Continue"))
-
- data = self.rfile.read(int(self.headers.getheader('content-length')))
+ data = self.rfile_bandwidth_read(int(self.headers.getheader('content-length')))
datahash = hashlib.md5(data).hexdigest()
self.server.store[datahash] = data
self.server._do_delay('response')
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.server._do_delay('response_body')
- self.wfile.write(datahash + '+' + str(len(data)))
+ self.wfile_bandwidth_write(datahash + '+' + str(len(data)))
self.server._do_delay('response_close')
def log_request(self, *args, **kwargs):
import arvados
import arvados_testutil as tutil
-import hashlib
class ManifestExamples(object):
def make_manifest(self,
files_per_stream=1,
streams=1):
datablip = 'x' * bytes_per_block
- data_loc = '{}+{}'.format(hashlib.md5(datablip).hexdigest(),
- bytes_per_block)
+ data_loc = tutil.str_keep_locator(datablip)
with tutil.mock_keep_responses(data_loc, 200):
coll = arvados.CollectionWriter()
for si in range(0, streams):
proxy_pass http://keepproxy;
}
}
+ upstream keep-web {
+ server localhost:{{KEEPWEBPORT}};
+ }
+ server {
+ listen *:{{KEEPWEBSSLPORT}} ssl default_server;
+ server_name ~^(?<request_host>.*)$;
+ ssl_certificate {{SSLCERT}};
+ ssl_certificate_key {{SSLKEY}};
+ location / {
+ proxy_pass http://keep-web;
+ proxy_set_header Host $request_host:{{KEEPWEBPORT}};
+ proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+ }
+ }
+ server {
+ listen *:{{KEEPWEBDLSSLPORT}} ssl default_server;
+ server_name ~.*;
+ ssl_certificate {{SSLCERT}};
+ ssl_certificate_key {{SSLKEY}};
+ location / {
+ proxy_pass http://keep-web;
+ proxy_set_header Host download:{{KEEPWEBPORT}};
+ proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+ proxy_redirect //download:{{KEEPWEBPORT}}/ https://$host:{{KEEPWEBDLSSLPORT}}/;
+ }
+ }
}
ARVADOS_DIR = os.path.realpath(os.path.join(MY_DIRNAME, '../../..'))
SERVICES_SRC_DIR = os.path.join(ARVADOS_DIR, 'services')
-SERVER_PID_PATH = 'tmp/pids/test-server.pid'
if 'GOPATH' in os.environ:
gopaths = os.environ['GOPATH'].split(':')
gobins = [os.path.join(path, 'bin') for path in gopaths]
import signal
import subprocess
import time
- try:
- if passenger_root:
- # First try to shut down nicely
- restore_cwd = os.getcwd()
- os.chdir(passenger_root)
- subprocess.call([
- 'bundle', 'exec', 'passenger', 'stop', '--pid-file', pidfile])
- os.chdir(restore_cwd)
- now = time.time()
- timeout = now + wait
- with open(pidfile, 'r') as f:
- server_pid = int(f.read())
- while now <= timeout:
- if not passenger_root or timeout - now < wait / 2:
- # Half timeout has elapsed. Start sending SIGTERM
- os.kill(server_pid, signal.SIGTERM)
- # Raise OSError if process has disappeared
- os.getpgid(server_pid)
+
+ now = time.time()
+ startTERM = now
+ deadline = now + wait
+
+ if passenger_root:
+ # First try to shut down nicely
+ restore_cwd = os.getcwd()
+ os.chdir(passenger_root)
+ subprocess.call([
+ 'bundle', 'exec', 'passenger', 'stop', '--pid-file', pidfile])
+ os.chdir(restore_cwd)
+ # Use up to half of the +wait+ period waiting for "passenger
+ # stop" to work. If the process hasn't exited by then, start
+ # sending TERM signals.
+ startTERM += wait/2
+
+ server_pid = None
+ while now <= deadline and server_pid is None:
+ try:
+ with open(pidfile, 'r') as f:
+ server_pid = int(f.read())
+ except IOError:
+ # No pidfile = nothing to kill.
+ return
+ except ValueError as error:
+ # Pidfile exists, but we can't parse it. Perhaps the
+ # server has created the file but hasn't written its PID
+ # yet?
+ print("Parse error reading pidfile {}: {}".format(pidfile, error))
time.sleep(0.1)
now = time.time()
- except EnvironmentError:
- pass
+
+ while now <= deadline:
+ try:
+ exited, _ = os.waitpid(server_pid, os.WNOHANG)
+ if exited > 0:
+ return
+ except OSError:
+ # already exited, or isn't our child process
+ pass
+ try:
+ if now >= startTERM:
+ os.kill(server_pid, signal.SIGTERM)
+ print("Sent SIGTERM to {} ({})".format(server_pid, pidfile))
+ except OSError as error:
+ if error.errno == errno.ESRCH:
+                # Thrown by os.kill() if the process does not
+                # exist, i.e., our work here is done.
+ return
+ raise
+ time.sleep(0.1)
+ now = time.time()
+
+ print("Server PID {} ({}) did not exit, giving up after {}s".
+ format(server_pid, pidfile, wait))
def find_available_port():
"""Return an IPv4 port number that is not in use right now.
format(port, timeout),
file=sys.stderr)
+def _fifo2stderr(label):
+ """Create a fifo, and copy it to stderr, prepending label to each line.
+
+ Return value is the path to the new FIFO.
+
+ +label+ should contain only alphanumerics: it is also used as part
+ of the FIFO filename.
+ """
+ fifo = os.path.join(TEST_TMPDIR, label+'.fifo')
+ try:
+ os.remove(fifo)
+ except OSError as error:
+ if error.errno != errno.ENOENT:
+ raise
+ os.mkfifo(fifo, 0700)
+ subprocess.Popen(
+ ['sed', '-e', 's/^/['+label+'] /', fifo],
+ stdout=sys.stderr)
+ return fifo
+
def run(leave_running_atexit=False):
"""Ensure an API server is running, and ARVADOS_API_* env vars have
admin credentials for it.
# Delete cached discovery document.
shutil.rmtree(arvados.http_cache('discovery'))
- pid_file = os.path.join(SERVICES_SRC_DIR, 'api', SERVER_PID_PATH)
+ pid_file = _pidfile('api')
pid_file_ok = find_server_pid(pid_file, 0)
existing_api_host = os.environ.get('ARVADOS_TEST_API_HOST', my_api_host)
start_msg = subprocess.check_output(
['bundle', 'exec',
'passenger', 'start', '-d', '-p{}'.format(port),
- '--pid-file', os.path.join(os.getcwd(), pid_file),
+ '--pid-file', pid_file,
'--log-file', os.path.join(os.getcwd(), 'log/test.log'),
'--ssl',
'--ssl-certificate', 'tmp/self-signed.pem',
"""
global my_api_host
if force or my_api_host is not None:
- kill_server_pid(os.path.join(SERVICES_SRC_DIR, 'api', SERVER_PID_PATH))
+ kill_server_pid(_pidfile('api'))
my_api_host = None
def _start_keep(n, keep_args):
for arg, val in keep_args.iteritems():
keep_cmd.append("{}={}".format(arg, val))
- logf = open(os.path.join(TEST_TMPDIR, 'keep{}.log'.format(n)), 'a+')
+ logf = open(_fifo2stderr('keep{}'.format(n)), 'w')
kp0 = subprocess.Popen(
keep_cmd, stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
+
with open(_pidfile('keep{}'.format(n)), 'w') as f:
f.write(str(kp0.pid))
keep_args['-enforce-permissions'] = 'true'
with open(os.path.join(TEST_TMPDIR, "keep.data-manager-token-file"), "w") as f:
keep_args['-data-manager-token-file'] = f.name
- f.write(os.environ['ARVADOS_API_TOKEN'])
+ f.write(auth_token('data_manager'))
keep_args['-never-delete'] = 'false'
api = arvados.api(
token=os.environ['ARVADOS_API_TOKEN'],
insecure=True)
- for d in api.keep_services().list().execute()['items']:
+ for d in api.keep_services().list(filters=[['service_type','=','disk']]).execute()['items']:
api.keep_services().delete(uuid=d['uuid']).execute()
for d in api.keep_disks().list().execute()['items']:
api.keep_disks().delete(uuid=d['uuid']).execute()
'keep_disk': {'keep_service_uuid': svc['uuid'] }
}).execute()
+ # If keepproxy is running, send SIGHUP to make it discover the new
+ # keepstore services.
+ proxypidfile = _pidfile('keepproxy')
+ if os.path.exists(proxypidfile):
+ os.kill(int(open(proxypidfile).read()), signal.SIGHUP)
+
def _stop_keep(n):
- kill_server_pid(_pidfile('keep{}'.format(n)), 0)
+ kill_server_pid(_pidfile('keep{}'.format(n)))
if os.path.exists("{}/keep{}.volume".format(TEST_TMPDIR, n)):
with open("{}/keep{}.volume".format(TEST_TMPDIR, n), 'r') as r:
shutil.rmtree(r.read(), True)
return
stop_keep_proxy()
- admin_token = auth_token('admin')
port = find_available_port()
env = os.environ.copy()
- env['ARVADOS_API_TOKEN'] = admin_token
+ env['ARVADOS_API_TOKEN'] = auth_token('anonymous')
+ logf = open(_fifo2stderr('keepproxy'), 'w')
kp = subprocess.Popen(
['keepproxy',
'-pid='+_pidfile('keepproxy'),
'-listen=:{}'.format(port)],
- env=env, stdin=open('/dev/null'), stdout=sys.stderr)
+ env=env, stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
api = arvados.api(
version='v1',
host=os.environ['ARVADOS_API_HOST'],
- token=admin_token,
+ token=auth_token('admin'),
insecure=True)
for d in api.keep_services().list(
filters=[['service_type','=','proxy']]).execute()['items']:
def stop_keep_proxy():
if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
return
- kill_server_pid(_pidfile('keepproxy'), wait=0)
+ kill_server_pid(_pidfile('keepproxy'))
def run_arv_git_httpd():
if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
gitport = find_available_port()
env = os.environ.copy()
env.pop('ARVADOS_API_TOKEN', None)
+ logf = open(_fifo2stderr('arv-git-httpd'), 'w')
agh = subprocess.Popen(
['arv-git-httpd',
'-repo-root='+gitdir+'/test',
'-address=:'+str(gitport)],
- env=env, stdin=open('/dev/null'), stdout=sys.stderr)
+ env=env, stdin=open('/dev/null'), stdout=logf, stderr=logf)
with open(_pidfile('arv-git-httpd'), 'w') as f:
f.write(str(agh.pid))
_setport('arv-git-httpd', gitport)
def stop_arv_git_httpd():
if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
return
- kill_server_pid(_pidfile('arv-git-httpd'), wait=0)
+ kill_server_pid(_pidfile('arv-git-httpd'))
+
+def run_keep_web():
+ if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+ return
+ stop_keep_web()
+
+ keepwebport = find_available_port()
+ env = os.environ.copy()
+ env['ARVADOS_API_TOKEN'] = auth_token('anonymous')
+ logf = open(_fifo2stderr('keep-web'), 'w')
+ keepweb = subprocess.Popen(
+ ['keep-web',
+ '-allow-anonymous',
+ '-attachment-only-host=download:'+str(keepwebport),
+ '-listen=:'+str(keepwebport)],
+ env=env, stdin=open('/dev/null'), stdout=logf, stderr=logf)
+ with open(_pidfile('keep-web'), 'w') as f:
+ f.write(str(keepweb.pid))
+ _setport('keep-web', keepwebport)
+ _wait_until_port_listens(keepwebport)
+
+def stop_keep_web():
+ if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+ return
+ kill_server_pid(_pidfile('keep-web'))
def run_nginx():
if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
return
nginxconf = {}
+ nginxconf['KEEPWEBPORT'] = _getport('keep-web')
+ nginxconf['KEEPWEBDLSSLPORT'] = find_available_port()
+ nginxconf['KEEPWEBSSLPORT'] = find_available_port()
nginxconf['KEEPPROXYPORT'] = _getport('keepproxy')
nginxconf['KEEPPROXYSSLPORT'] = find_available_port()
nginxconf['GITPORT'] = _getport('arv-git-httpd')
nginxconf['GITSSLPORT'] = find_available_port()
nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem')
nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key')
- nginxconf['ACCESSLOG'] = os.path.join(TEST_TMPDIR, 'nginx_access_log.fifo')
+ nginxconf['ACCESSLOG'] = _fifo2stderr('nginx_access_log')
conftemplatefile = os.path.join(MY_DIRNAME, 'nginx.conf')
conffile = os.path.join(TEST_TMPDIR, 'nginx.conf')
env = os.environ.copy()
env['PATH'] = env['PATH']+':/sbin:/usr/sbin:/usr/local/sbin'
- try:
- os.remove(nginxconf['ACCESSLOG'])
- except OSError as error:
- if error.errno != errno.ENOENT:
- raise
-
- os.mkfifo(nginxconf['ACCESSLOG'], 0700)
nginx = subprocess.Popen(
['nginx',
'-g', 'error_log stderr info;',
'-g', 'pid '+_pidfile('nginx')+';',
'-c', conffile],
env=env, stdin=open('/dev/null'), stdout=sys.stderr)
- cat_access = subprocess.Popen(
- ['cat', nginxconf['ACCESSLOG']],
- stdout=sys.stderr)
+ _setport('keep-web-dl-ssl', nginxconf['KEEPWEBDLSSLPORT'])
+ _setport('keep-web-ssl', nginxconf['KEEPWEBSSLPORT'])
_setport('keepproxy-ssl', nginxconf['KEEPPROXYSSLPORT'])
_setport('arv-git-httpd-ssl', nginxconf['GITSSLPORT'])
def stop_nginx():
if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
return
- kill_server_pid(_pidfile('nginx'), wait=0)
+ kill_server_pid(_pidfile('nginx'))
def _pidfile(program):
return os.path.join(TEST_TMPDIR, program + '.pid')
MAIN_SERVER = None
KEEP_SERVER = None
KEEP_PROXY_SERVER = None
+ KEEP_WEB_SERVER = None
@staticmethod
def _restore_dict(src, dest):
for server_kwargs, start_func, stop_func in (
(cls.MAIN_SERVER, run, reset),
(cls.KEEP_SERVER, run_keep, stop_keep),
- (cls.KEEP_PROXY_SERVER, run_keep_proxy, stop_keep_proxy)):
+ (cls.KEEP_PROXY_SERVER, run_keep_proxy, stop_keep_proxy),
+ (cls.KEEP_WEB_SERVER, run_keep_web, stop_keep_web)):
if server_kwargs is not None:
start_func(**server_kwargs)
cls._cleanup_funcs.append(stop_func)
'start', 'stop',
'start_keep', 'stop_keep',
'start_keep_proxy', 'stop_keep_proxy',
+ 'start_keep-web', 'stop_keep-web',
'start_arv-git-httpd', 'stop_arv-git-httpd',
'start_nginx', 'stop_nginx',
]
elif args.action == 'start_keep':
run_keep(enforce_permissions=args.keep_enforce_permissions, num_servers=args.num_keep_servers)
elif args.action == 'stop_keep':
- stop_keep()
+ stop_keep(num_servers=args.num_keep_servers)
elif args.action == 'start_keep_proxy':
run_keep_proxy()
elif args.action == 'stop_keep_proxy':
run_arv_git_httpd()
elif args.action == 'stop_arv-git-httpd':
stop_arv_git_httpd()
+ elif args.action == 'start_keep-web':
+ run_keep_web()
+ elif args.action == 'stop_keep-web':
+ stop_keep_web()
elif args.action == 'start_nginx':
run_nginx()
elif args.action == 'stop_nginx':
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-import hashlib
import io
import random
import arvados.commands.ls as arv_ls
import run_test_server
+from arvados_testutil import str_keep_locator
+
class ArvLsTestCase(run_test_server.TestCaseWithServers):
FAKE_UUID = 'zzzzz-4zz18-12345abcde12345'
def mock_api_for_manifest(self, manifest_lines, uuid=FAKE_UUID):
manifest_text = self.newline_join(manifest_lines)
- pdh = '{}+{}'.format(hashlib.md5(manifest_text).hexdigest(),
- len(manifest_text))
+ pdh = str_keep_locator(manifest_text)
coll_info = {'uuid': uuid,
'portable_data_hash': pdh,
'manifest_text': manifest_text}
import mock
import os
import unittest
-import hashlib
import time
import arvados
self.requests.append(locator)
return self.blocks.get(locator)
def put(self, data, num_retries=None):
- pdh = "%s+%i" % (hashlib.md5(data).hexdigest(), len(data))
+ pdh = tutil.str_keep_locator(data)
self.blocks[pdh] = str(data)
return pdh
n = 0
blocks = {}
for d in ['01234', '34567', '67890']:
- loc = '{}+{}'.format(hashlib.md5(d).hexdigest(), len(d))
+ loc = tutil.str_keep_locator(d)
blocks[loc] = d
stream.append(Range(loc, n, len(d)))
n += len(d)
import arvados
import copy
-import hashlib
import mock
import os
import pprint
def test_write_directory_tree_with_zero_recursion(self):
cwriter = arvados.CollectionWriter(self.api_client)
content = 'd1/d2/f3d1/f2f1'
- blockhash = hashlib.md5(content).hexdigest() + '+' + str(len(content))
+ blockhash = tutil.str_keep_locator(content)
cwriter.write_directory_tree(
self.build_directory_tree(['f1', 'd1/f2', 'd1/d2/f3']),
max_manifest_depth=0)
self.assertEqual('.', writer.current_stream_name())
self.assertEqual('out', writer.current_file_name())
out_file.write('test data')
- data_loc = hashlib.md5('test data').hexdigest() + '+9'
+ data_loc = tutil.str_keep_locator('test data')
self.assertTrue(out_file.closed, "writer file not closed after context")
self.assertRaises(ValueError, out_file.write, 'extra text')
with self.mock_keep(data_loc, 200) as keep_mock:
writer = arvados.CollectionWriter(client)
with writer.open('six') as out_file:
out_file.writelines(['12', '34', '56'])
- data_loc = hashlib.md5('123456').hexdigest() + '+6'
+ data_loc = tutil.str_keep_locator('123456')
with self.mock_keep(data_loc, 200) as keep_mock:
self.assertEqual(". {} 0:6:six\n".format(data_loc),
writer.manifest_text())
def test_open_flush(self):
client = self.api_client_mock()
- data_loc1 = hashlib.md5('flush1').hexdigest() + '+6'
- data_loc2 = hashlib.md5('flush2').hexdigest() + '+6'
+ data_loc1 = tutil.str_keep_locator('flush1')
+ data_loc2 = tutil.str_keep_locator('flush2')
with self.mock_keep((data_loc1, 200), (data_loc2, 200)) as keep_mock:
writer = arvados.CollectionWriter(client)
with writer.open('flush_test') as out_file:
out_file.write('1st')
with writer.open('.', '2') as out_file:
out_file.write('2nd')
- data_loc = hashlib.md5('1st2nd').hexdigest() + '+6'
+ data_loc = tutil.str_keep_locator('1st2nd')
with self.mock_keep(data_loc, 200) as keep_mock:
self.assertEqual(". {} 0:3:1 3:3:2\n".format(data_loc),
writer.manifest_text())
def test_two_opens_two_streams(self):
client = self.api_client_mock()
- data_loc1 = hashlib.md5('file').hexdigest() + '+4'
- data_loc2 = hashlib.md5('indir').hexdigest() + '+5'
+ data_loc1 = tutil.str_keep_locator('file')
+ data_loc2 = tutil.str_keep_locator('indir')
with self.mock_keep((data_loc1, 200), (data_loc2, 200)) as keep_mock:
writer = arvados.CollectionWriter(client)
with writer.open('file') as out_file:
mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
self.assertEqual(
- mock.responses[0].getopt(pycurl.TIMEOUT_MS),
- int(arvados.KeepClient.DEFAULT_TIMEOUT[1]*1000))
+ mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+ int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+ int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
def test_put_timeout(self):
api_client = self.mock_keep_services(count=1)
mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
self.assertEqual(
- mock.responses[0].getopt(pycurl.TIMEOUT_MS),
- int(arvados.KeepClient.DEFAULT_TIMEOUT[1]*1000))
+ mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+ int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+ int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
def test_proxy_get_timeout(self):
api_client = self.mock_keep_services(service_type='proxy', count=1)
mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
self.assertEqual(
- mock.responses[0].getopt(pycurl.TIMEOUT_MS),
- int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
+ mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+ int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+ int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
def test_proxy_put_timeout(self):
api_client = self.mock_keep_services(service_type='proxy', count=1)
mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
self.assertEqual(
- mock.responses[0].getopt(pycurl.TIMEOUT_MS),
- int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
+ mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+ int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+ int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
def check_no_services_error(self, verb, exc_class):
api_client = mock.MagicMock(name='api_client')
def test_put_error_does_not_include_successful_puts(self):
data = 'partial failure test'
- data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+ data_loc = tutil.str_keep_locator(data)
api_client = self.mock_keep_services(count=3)
with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
def test_proxy_put_with_no_writable_services(self):
data = 'test with no writable services'
- data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+ data_loc = tutil.str_keep_locator(data)
api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
self.assertEqual(0, len(exc_check.exception.request_errors()))
+ def test_oddball_service_get(self):
+ body = 'oddball service get'
+ api_client = self.mock_keep_services(service_type='fancynewblobstore')
+ with tutil.mock_keep_responses(body, 200):
+ keep_client = arvados.KeepClient(api_client=api_client)
+ actual = keep_client.get(tutil.str_keep_locator(body))
+ self.assertEqual(body, actual)
+
+ def test_oddball_service_put(self):
+ body = 'oddball service put'
+ pdh = tutil.str_keep_locator(body)
+ api_client = self.mock_keep_services(service_type='fancynewblobstore')
+ with tutil.mock_keep_responses(pdh, 200):
+ keep_client = arvados.KeepClient(api_client=api_client)
+ actual = keep_client.put(body, copies=1)
+ self.assertEqual(pdh, actual)
+
+ def test_oddball_service_writer_count(self):
+ body = 'oddball service writer count'
+ pdh = tutil.str_keep_locator(body)
+ api_client = self.mock_keep_services(service_type='fancynewblobstore',
+ count=4)
+ headers = {'x-keep-replicas-stored': 3}
+ with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
+ **headers) as req_mock:
+ keep_client = arvados.KeepClient(api_client=api_client)
+ actual = keep_client.put(body, copies=2)
+ self.assertEqual(pdh, actual)
+ self.assertEqual(1, req_mock.call_count)
+
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
def check_64_zeros_error_order(self, verb, exc_class):
data = '0' * 64
if verb == 'get':
- data = hashlib.md5(data).hexdigest() + '+1234'
+ data = tutil.str_keep_locator(data)
# Arbitrary port number:
aport = random.randint(1024,65535)
api_client = self.mock_keep_services(service_port=aport, count=self.services)
class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
- DATA = 'x' * 2**10
+ # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
+ # 1s worth of data and then trigger bandwidth errors before running
+ # out of data.
+ DATA = 'x'*2**11
+ BANDWIDTH_LOW_LIM = 1024
+ TIMEOUT_TIME = 1.0
class assertTakesBetween(unittest.TestCase):
def __init__(self, tmin, tmax):
self.t0 = time.time()
def __exit__(self, *args, **kwargs):
- self.assertGreater(time.time() - self.t0, self.tmin)
- self.assertLess(time.time() - self.t0, self.tmax)
+ # Round times to milliseconds, like CURL. Otherwise, we
+ # fail when CURL reaches a 1s timeout at 0.9998s.
+ delta = round(time.time() - self.t0, 3)
+ self.assertGreaterEqual(delta, self.tmin)
+ self.assertLessEqual(delta, self.tmax)
+
+ class assertTakesGreater(unittest.TestCase):
+ def __init__(self, tmin):
+ self.tmin = tmin
+
+ def __enter__(self):
+ self.t0 = time.time()
+
+ def __exit__(self, *args, **kwargs):
+ delta = round(time.time() - self.t0, 3)
+ self.assertGreaterEqual(delta, self.tmin)
def setUp(self):
sock = socket.socket()
def tearDown(self):
self.server.shutdown()
- def keepClient(self, timeouts=(0.1, 1.0)):
+ def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
return arvados.KeepClient(
api_client=self.api_client,
timeout=timeouts)
)
with self.assertTakesBetween(0.1, 0.5):
with self.assertRaises(arvados.errors.KeepWriteError):
- self.keepClient((0.1, 1)).put(self.DATA, copies=1, num_retries=0)
+ self.keepClient().put(self.DATA, copies=1, num_retries=0)
+
+ def test_low_bandwidth_no_delays_success(self):
+ self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
+ kc = self.keepClient()
+ loc = kc.put(self.DATA, copies=1, num_retries=0)
+ self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
+
+ def test_too_low_bandwidth_no_delays_failure(self):
+ # Check that lessening bandwidth corresponds to failing
+ kc = self.keepClient()
+ loc = kc.put(self.DATA, copies=1, num_retries=0)
+ self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
+ with self.assertTakesGreater(self.TIMEOUT_TIME):
+ with self.assertRaises(arvados.errors.KeepReadError) as e:
+ kc.get(loc, num_retries=0)
+ with self.assertTakesGreater(self.TIMEOUT_TIME):
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ kc.put(self.DATA, copies=1, num_retries=0)
+
+ def test_low_bandwidth_with_server_response_delay_failure(self):
+ kc = self.keepClient()
+ loc = kc.put(self.DATA, copies=1, num_retries=0)
+ self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
+ self.server.setdelays(response=self.TIMEOUT_TIME)
+ with self.assertTakesGreater(self.TIMEOUT_TIME):
+ with self.assertRaises(arvados.errors.KeepReadError) as e:
+ kc.get(loc, num_retries=0)
+ with self.assertTakesGreater(self.TIMEOUT_TIME):
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ kc.put(self.DATA, copies=1, num_retries=0)
+
+ def test_low_bandwidth_with_server_mid_delay_failure(self):
+ kc = self.keepClient()
+ loc = kc.put(self.DATA, copies=1, num_retries=0)
+ self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
+ self.server.setdelays(mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME)
+ with self.assertTakesGreater(self.TIMEOUT_TIME):
+ with self.assertRaises(arvados.errors.KeepReadError) as e:
+ kc.get(loc, num_retries=0)
+ with self.assertTakesGreater(self.TIMEOUT_TIME):
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ kc.put(self.DATA, copies=1, num_retries=0)
def test_timeout_slow_request(self):
- self.server.setdelays(request=0.2)
- self._test_200ms()
+ loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
+ self.server.setdelays(request=.2)
+ self._test_connect_timeout_under_200ms(loc)
+ self.server.setdelays(request=2)
+ self._test_response_timeout_under_2s(loc)
def test_timeout_slow_response(self):
- self.server.setdelays(response=0.2)
- self._test_200ms()
+ loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
+ self.server.setdelays(response=.2)
+ self._test_connect_timeout_under_200ms(loc)
+ self.server.setdelays(response=2)
+ self._test_response_timeout_under_2s(loc)
def test_timeout_slow_response_body(self):
- self.server.setdelays(response_body=0.2)
- self._test_200ms()
-
- def _test_200ms(self):
- """Connect should be t<100ms, request should be 200ms <= t < 300ms"""
+ loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
+ self.server.setdelays(response_body=.2)
+ self._test_connect_timeout_under_200ms(loc)
+ self.server.setdelays(response_body=2)
+ self._test_response_timeout_under_2s(loc)
+ def _test_connect_timeout_under_200ms(self, loc):
# Allow 100ms to connect, then 1s for response. Everything
# should work, and everything should take at least 200ms to
# return.
- kc = self.keepClient((.1, 1))
+ kc = self.keepClient(timeouts=(.1, 1))
with self.assertTakesBetween(.2, .3):
- loc = kc.put(self.DATA, copies=1, num_retries=0)
+ kc.put(self.DATA, copies=1, num_retries=0)
with self.assertTakesBetween(.2, .3):
self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
- # Allow 1s to connect, then 100ms for response. Nothing should
- # work, and everything should take at least 100ms to return.
- kc = self.keepClient((1, .1))
- with self.assertTakesBetween(.1, .2):
+ def _test_response_timeout_under_2s(self, loc):
+ # Allow 10s to connect, then 1s for response. Nothing should
+ # work, and everything should take at least 1s to return.
+ kc = self.keepClient(timeouts=(10, 1))
+ with self.assertTakesBetween(1, 1.9):
with self.assertRaises(arvados.errors.KeepReadError):
kc.get(loc, num_retries=0)
- with self.assertTakesBetween(.1, .2):
+ with self.assertTakesBetween(1, 1.9):
with self.assertRaises(arvados.errors.KeepWriteError):
kc.put(self.DATA, copies=1, num_retries=0)
fixturesets = Dir.glob(Rails.root.join('test', 'fixtures', '*.yml')).
collect { |yml| yml.match(/([^\/]*)\.yml$/)[1] }
+ # Don't reset keep_services: clients need to discover our
+ # integration-testing keepstores, not test fixtures.
+ fixturesets -= %w[keep_services]
+
table_names = '"' + ActiveRecord::Base.connection.tables.join('","') + '"'
attempts_left = 20
api_token: 1a9ffdcga2o7cw8q12dndskomgs1ygli3ns9k2o9hgzgmktc78
expires_at: 2038-01-01 00:00:00
+data_manager:
+ api_client: untrusted
+ user: system_user
+ api_token: 320mkve8qkswstz7ff61glpk3mhgghmg67wmic7elw4z41pke1
+ expires_at: 2038-01-01 00:00:00
+ scopes:
+ - GET /arvados/v1/collections
+ - GET /arvados/v1/keep_services
+ - GET /arvados/v1/keep_services/accessible
+ - GET /arvados/v1/users/current
+ - POST /arvados/v1/logs
+
miniadmin:
api_client: untrusted
user: miniadmin
user: active
api_token: activecollectionsabcdefghijklmnopqrstuvwxyz1234567
expires_at: 2038-01-01 00:00:00
- scopes: ["GET /arvados/v1/collections/", "GET /arvados/v1/keep_disks"]
+ scopes: ["GET /arvados/v1/collections/", "GET /arvados/v1/keep_services/accessible"]
active_userlist:
api_client: untrusted
"time"
)
-const (
- ActiveUserToken = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
- AdminToken = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
-)
-
var arv arvadosclient.ArvadosClient
var keepClient *keepclient.KeepClient
var keepServers []string
arvadostest.StartKeep(2, false)
arv = makeArvadosClient()
+ arv.ApiToken = arvadostest.DataManagerToken
// keep client
keepClient = &keepclient.KeepClient{
return match[1] + "+" + match[2]
}
+// switchToken replaces the global arv client's API token with t and
+// returns a func that restores the previous token. Typical usage:
+//   defer switchToken(arvadostest.AdminToken)()
+func switchToken(t string) func() {
+	orig := arv.ApiToken
+	restore := func() {
+		arv.ApiToken = orig
+	}
+	arv.ApiToken = t
+	return restore
+}
+
func getCollection(t *testing.T, uuid string) Dict {
+ defer switchToken(arvadostest.AdminToken)()
+
getback := make(Dict)
err := arv.Get("collections", uuid, nil, &getback)
if err != nil {
}
func updateCollection(t *testing.T, uuid string, paramName string, paramValue string) {
+ defer switchToken(arvadostest.AdminToken)()
+
err := arv.Update("collections", uuid, arvadosclient.Dict{
"collection": arvadosclient.Dict{
paramName: paramValue,
type Dict map[string]interface{}
func deleteCollection(t *testing.T, uuid string) {
+ defer switchToken(arvadostest.AdminToken)()
+
getback := make(Dict)
err := arv.Delete("collections", uuid, nil, &getback)
if err != nil {
path := keepServers[i] + "/index"
client := http.Client{}
req, err := http.NewRequest("GET", path, nil)
- req.Header.Add("Authorization", "OAuth2 "+AdminToken)
+ req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
req.Header.Add("Content-Type", "application/octet-stream")
resp, err := client.Do(req)
defer resp.Body.Close()
func getStatus(t *testing.T, path string) interface{} {
client := http.Client{}
req, err := http.NewRequest("GET", path, nil)
- req.Header.Add("Authorization", "OAuth2 "+AdminToken)
+ req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
req.Header.Add("Content-Type", "application/octet-stream")
resp, err := client.Do(req)
if err != nil {
defer TearDownDataManagerTest(t)
SetupDataManagerTest(t)
- arv.ApiToken = ActiveUserToken
+ arv.ApiToken = arvadostest.ActiveToken
err := singlerun(arv)
if err == nil {
will appear if it exists.
""".lstrip()
- def __init__(self, parent_inode, inodes, api, num_retries):
+ def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
super(MagicDirectory, self).__init__(parent_inode, inodes)
self.api = api
self.num_retries = num_retries
+ self.pdh_only = pdh_only
def __setattr__(self, name, value):
super(MagicDirectory, self).__setattr__(name, value)
# If we're the root directory, add an identical by_id subdirectory.
if self.inode == llfuse.ROOT_INODE:
self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
- self.inode, self.inodes, self.api, self.num_retries))
+ self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))
def __contains__(self, k):
if k in self._entries:
return True
- if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
+ if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
return False
try:
help="""Mount subdirectories listed by tag.""")
mount_mode.add_argument('--by-id', action='store_true',
help="""Mount subdirectories listed by portable data hash or uuid.""")
+ mount_mode.add_argument('--by-pdh', action='store_true',
+ help="""Mount subdirectories listed by portable data hash.""")
mount_mode.add_argument('--project', type=str, help="""Mount a specific project.""")
mount_mode.add_argument('--collection', type=str, help="""Mount only the specified collection.""")
now = time.time()
dir_class = None
dir_args = [llfuse.ROOT_INODE, operations.inodes, api, args.retries]
- if args.by_id:
+ if args.by_id or args.by_pdh:
# Set up the request handler with the 'magic directory' at the root
dir_class = MagicDirectory
+ dir_args.append(args.by_pdh)
elif args.by_tag:
dir_class = TagsDirectory
elif args.shared:
dir_args[0] = e.inode
e._entries['by_id'] = operations.inodes.add_entry(MagicDirectory(*dir_args))
+
e._entries['by_tag'] = operations.inodes.add_entry(TagsDirectory(*dir_args))
dir_args.append(usr)
llfuse.init(operations, args.mountpoint, opts)
# Subscribe to change events from API server
- operations.listen_for_events(api)
+ if not args.by_pdh:
+ operations.listen_for_events(api)
t = threading.Thread(None, lambda: llfuse.main())
t.start()
self.assertEqual("_", fuse.sanitize_filename(""))
self.assertEqual("_", fuse.sanitize_filename("."))
self.assertEqual("__", fuse.sanitize_filename(".."))
+
+
+class FuseMagicTestPDHOnly(MountTestBase):
+    """Exercise MagicDirectory's pdh_only flag.
+
+    Creates one collection, then checks that lookups by portable data
+    hash succeed regardless of the flag, while lookups by collection
+    UUID fail exactly when the mount was created with pdh_only=True.
+    """
+
+    def setUp(self, api=None):
+        super(FuseMagicTestPDHOnly, self).setUp(api=api)
+
+        cw = arvados.CollectionWriter()
+
+        cw.start_new_file('thing1.txt')
+        cw.write("data 1")
+
+        # finish() result is used below as the collection's portable
+        # data hash; the create() call registers the same manifest so
+        # the collection also has a UUID to look up.
+        self.testcollection = cw.finish()
+        self.test_manifest = cw.manifest_text()
+        created = self.api.collections().create(body={"manifest_text":self.test_manifest}).execute()
+        self.testcollectionuuid = str(created['uuid'])
+
+    def verify_pdh_only(self, pdh_only=False, skip_pdh_only=False):
+        # skip_pdh_only=True mounts without passing pdh_only at all,
+        # exercising MagicDirectory's default behavior.
+        if skip_pdh_only is True:
+            self.make_mount(fuse.MagicDirectory) # in this case, the default by_id applies
+        else:
+            self.make_mount(fuse.MagicDirectory, pdh_only=pdh_only)
+
+        # A fresh MagicDirectory must not list any collections yet.
+        mount_ls = llfuse.listdir(self.mounttmp)
+        self.assertIn('README', mount_ls)
+        self.assertFalse(any(arvados.util.keep_locator_pattern.match(fn) or
+                             arvados.util.uuid_pattern.match(fn)
+                             for fn in mount_ls),
+                         "new FUSE MagicDirectory lists Collection")
+
+        # look up using pdh should succeed in all cases
+        self.assertDirContents(self.testcollection, ['thing1.txt'])
+        self.assertDirContents(os.path.join('by_id', self.testcollection),
+                               ['thing1.txt'])
+        mount_ls = llfuse.listdir(self.mounttmp)
+        self.assertIn('README', mount_ls)
+        self.assertIn(self.testcollection, mount_ls)
+        self.assertIn(self.testcollection,
+                      llfuse.listdir(os.path.join(self.mounttmp, 'by_id')))
+
+        files = {}
+        files[os.path.join(self.mounttmp, self.testcollection, 'thing1.txt')] = 'data 1'
+
+        for k, v in files.items():
+            with open(os.path.join(self.mounttmp, k)) as f:
+                self.assertEqual(v, f.read())
+
+        # look up using uuid should fail when pdh_only is set
+        if pdh_only is True:
+            with self.assertRaises(OSError):
+                self.assertDirContents(os.path.join('by_id', self.testcollectionuuid),
+                                       ['thing1.txt'])
+        else:
+            self.assertDirContents(os.path.join('by_id', self.testcollectionuuid),
+                                   ['thing1.txt'])
+
+    def test_with_pdh_only_true(self):
+        self.verify_pdh_only(pdh_only=True)
+
+    def test_with_pdh_only_false(self):
+        self.verify_pdh_only(pdh_only=False)
+
+    def test_with_default_by_id(self):
+        self.verify_pdh_only(skip_pdh_only=True)
import (
"flag"
"fmt"
+ "os"
+ "strconv"
)
var anonymousTokens tokenSet
type tokenSet []string
-func (ts *tokenSet) Set(t string) error {
- *ts = append(*ts, t)
- return nil
+// Set accepts a boolean flag value: "true" enables anonymous access
+// by adding the ARVADOS_API_TOKEN environment token (at most once),
+// "false" clears the set. An unparsable value is rejected without
+// modifying the current token set.
+func (ts *tokenSet) Set(s string) error {
+	v, err := strconv.ParseBool(s)
+	if err != nil {
+		// Previously a parse failure fell through to the !v
+		// branch and cleared the set before returning the
+		// error; reject bad input without side effects instead.
+		return err
+	}
+	if v && len(*ts) == 0 {
+		*ts = append(*ts, os.Getenv("ARVADOS_API_TOKEN"))
+	} else if !v {
+		*ts = (*ts)[:0]
+	}
+	return nil
}
func (ts *tokenSet) String() string {
- return fmt.Sprintf("%+v", (*ts)[:])
+ return fmt.Sprintf("%v", len(*ts) > 0)
+}
+
+func (ts *tokenSet) IsBoolFlag() bool {
+ return true
}
func init() {
- flag.Var(&anonymousTokens, "anonymous-token",
- "API token to try when none of the tokens provided in an HTTP request succeed in reading the desired collection. Multiple anonymous tokens can be provided by using this flag more than once; each token will be attempted in turn until one works.")
+ flag.Var(&anonymousTokens, "allow-anonymous",
+ "Serve public data to anonymous clients. Try the token supplied in the ARVADOS_API_TOKEN environment variable when none of the tokens provided in an HTTP request succeed in reading the desired collection.")
}
//
// Serve HTTP requests at port 1234 on all interfaces:
//
-// keep-web -address=:1234
+// keep-web -listen=:1234
//
// Serve HTTP requests at port 1234 on the interface with IP address 1.2.3.4:
//
-// keep-web -address=1.2.3.4:1234
+// keep-web -listen=1.2.3.4:1234
//
// Proxy configuration
//
//
// Anonymous downloads
//
-// Use the -anonymous-token option to specify a token to use when clients
-// try to retrieve files without providing their own Arvados API token.
+// Use the -allow-anonymous flag with an ARVADOS_API_TOKEN environment
+// variable to specify a token to use when clients try to retrieve
+// files without providing their own Arvados API token.
//
-// keep-web [...] -anonymous-token=zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
+// export ARVADOS_API_TOKEN=zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
+// keep-web [...] -allow-anonymous
//
// See http://doc.arvados.org/install/install-keep-web.html for examples.
//
// only when the designated origin matches exactly the Host header
// provided by the client or downstream proxy.
//
-// keep-web -address :9999 -attachment-only-host domain.example:9999
+// keep-web -listen :9999 -attachment-only-host domain.example:9999
//
// Trust All Content mode
//
//
// In such cases you can enable trust-all-content mode.
//
-// keep-web -address :9999 -trust-all-content
+// keep-web -listen :9999 -trust-all-content
//
// When using trust-all-content mode, the only effect of the
// -attachment-only-host option is to add a "Content-Disposition:
// attachment" header.
//
-// keep-web -address :9999 -attachment-only-host domain.example:9999 -trust-all-content
+// keep-web -listen :9999 -attachment-only-host domain.example:9999 -trust-all-content
//
package main
"net/http"
"net/url"
"os"
+ "regexp"
+ "strconv"
"strings"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
Path: "/",
HttpOnly: true,
})
- redir := (&url.URL{Host: r.Host, Path: r.URL.Path}).String()
+
+ // Propagate query parameters (except api_token) from
+ // the original request.
+ redirQuery := r.URL.Query()
+ redirQuery.Del("api_token")
+
+ redir := (&url.URL{
+ Host: r.Host,
+ Path: r.URL.Path,
+ RawQuery: redirQuery.Encode(),
+ }).String()
w.Header().Add("Location", redir)
statusCode, statusText = http.StatusSeeOther, redir
}
defer rdr.Close()
- // One or both of these can be -1 if not found:
basenamePos := strings.LastIndex(filename, "/")
+ if basenamePos < 0 {
+ basenamePos = 0
+ }
extPos := strings.LastIndex(filename, ".")
if extPos > basenamePos {
// Now extPos is safely >= 0.
if rdr, ok := rdr.(keepclient.ReadCloserWithLen); ok {
w.Header().Set("Content-Length", fmt.Sprintf("%d", rdr.Len()))
}
- if attachment {
- w.Header().Set("Content-Disposition", "attachment")
- }
- w.WriteHeader(http.StatusOK)
- _, err = io.Copy(w, rdr)
+ applyContentDispositionHdr(w, r, filename[basenamePos:], attachment)
+ rangeRdr, statusCode := applyRangeHdr(w, r, rdr)
+
+ w.WriteHeader(statusCode)
+ _, err = io.Copy(w, rangeRdr)
if err != nil {
statusCode, statusText = http.StatusBadGateway, err.Error()
}
}
+
+// rangeRe matches the only Range header form we support: a single
+// byte range starting at 0, e.g. "bytes=0-4" or "bytes=0-".
+var rangeRe = regexp.MustCompile(`^bytes=0-([0-9]*)$`)
+
+// applyRangeHdr inspects the request's Range header. For a supported
+// prefix range "bytes=0-N" with N+1 < content length, it sets
+// Content-Length and Content-Range and returns a reader limited to
+// the first N+1 bytes along with status 206. Any other Range value is
+// ignored: the original reader and status 200 are returned, so the
+// whole file is served.
+func applyRangeHdr(w http.ResponseWriter, r *http.Request, rdr keepclient.ReadCloserWithLen) (io.Reader, int) {
+	w.Header().Set("Accept-Ranges", "bytes")
+	hdr := r.Header.Get("Range")
+	fields := rangeRe.FindStringSubmatch(hdr)
+	if fields == nil {
+		return rdr, http.StatusOK
+	}
+	rangeEnd, err := strconv.ParseInt(fields[1], 10, 64)
+	if err != nil {
+		// Empty end position ("bytes=0-") or a value too big
+		// for int64: send the entire content.
+		return rdr, http.StatusOK
+	}
+	if uint64(rangeEnd) >= rdr.Len() {
+		// Requested range covers the whole file anyway.
+		return rdr, http.StatusOK
+	}
+	w.Header().Set("Content-Length", fmt.Sprintf("%d", rangeEnd+1))
+	w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", 0, rangeEnd, rdr.Len()))
+	return &io.LimitedReader{R: rdr, N: rangeEnd + 1}, http.StatusPartialContent
+}
+
+// applyContentDispositionHdr sets a Content-Disposition header when
+// one is needed: "attachment" when isAttachment is true, plus an
+// explicit ASCII-quoted filename parameter whenever the request URI
+// contains a query string. A plain "inline" disposition with no
+// filename is the HTTP default, so no header is written in that case.
+//
+// NOTE(review): the caller passes filename[basenamePos:], which keeps
+// a leading "/" when the path contains one — confirm this is the
+// intended filename form.
+func applyContentDispositionHdr(w http.ResponseWriter, r *http.Request, filename string, isAttachment bool) {
+	disposition := "inline"
+	if isAttachment {
+		disposition = "attachment"
+	}
+	if strings.ContainsRune(r.RequestURI, '?') {
+		// Help the UA realize that the filename is just
+		// "filename.txt", not
+		// "filename.txt?disposition=attachment".
+		//
+		// TODO(TC): Follow advice at RFC 6266 appendix D
+		disposition += "; filename=" + strconv.QuoteToASCII(filename)
+	}
+	if disposition != "inline" {
+		w.Header().Set("Content-Disposition", disposition)
+	}
+}
arvadostest.NonexistentCollection + ".example.com/t=" + arvadostest.ActiveToken + "/theperthcountyconspiracy",
} {
resp := httptest.NewRecorder()
+ u := mustParseURL(testURL)
req := &http.Request{
- Method: "GET",
- URL: mustParseURL(testURL),
+ Method: "GET",
+ URL: u,
+ RequestURI: u.RequestURI(),
}
(&handler{}).ServeHTTP(resp, req)
c.Check(resp.Code, check.Equals, http.StatusNotFound)
} {
u := mustParseURL("http://" + hostPath)
req := &http.Request{
- Method: "GET",
- Host: u.Host,
- URL: u,
- Header: http.Header{},
+ Method: "GET",
+ Host: u.Host,
+ URL: u,
+ RequestURI: u.RequestURI(),
+ Header: http.Header{},
}
failCode := authz(req, tok)
resp := doReq(req)
cookies := (&http.Response{Header: resp.Header()}).Cookies()
u, _ := req.URL.Parse(resp.Header().Get("Location"))
req = &http.Request{
- Method: "GET",
- Host: u.Host,
- URL: u,
- Header: http.Header{},
+ Method: "GET",
+ Host: u.Host,
+ URL: u,
+ RequestURI: u.RequestURI(),
+ Header: http.Header{},
}
for _, c := range cookies {
req.AddCookie(c)
)
}
+// If client requests an attachment by putting ?disposition=attachment
+// in the query string, and gets redirected, the redirect target
+// should respond with an attachment.
+func (s *IntegrationSuite) TestVhostRedirectQueryTokenRequestAttachment(c *check.C) {
+ resp := s.testVhostRedirectTokenToCookie(c, "GET",
+ arvadostest.FooCollection+".example.com/foo",
+ "?disposition=attachment&api_token="+arvadostest.ActiveToken,
+ "",
+ "",
+ http.StatusOK,
+ "foo",
+ )
+ c.Check(strings.Split(resp.Header().Get("Content-Disposition"), ";")[0], check.Equals, "attachment")
+}
+
func (s *IntegrationSuite) TestVhostRedirectQueryTokenTrustAllContent(c *check.C) {
defer func(orig bool) {
trustAllContent = orig
)
}
+// TestRange: a "bytes=0-N" prefix range yields 206 with matching
+// Content-Length and Content-Range; any other Range form is ignored
+// and the whole file is served with status 200.
+func (s *IntegrationSuite) TestRange(c *check.C) {
+	u, _ := url.Parse("http://example.com/c=" + arvadostest.HelloWorldCollection + "/Hello%20world.txt")
+	req := &http.Request{
+		Method:     "GET",
+		Host:       u.Host,
+		URL:        u,
+		RequestURI: u.RequestURI(),
+		Header:     http.Header{"Range": {"bytes=0-4"}},
+	}
+	resp := httptest.NewRecorder()
+	(&handler{}).ServeHTTP(resp, req)
+	c.Check(resp.Code, check.Equals, http.StatusPartialContent)
+	c.Check(resp.Body.String(), check.Equals, "Hello")
+	c.Check(resp.Header().Get("Content-Length"), check.Equals, "5")
+	c.Check(resp.Header().Get("Content-Range"), check.Equals, "bytes 0-4/12")
+
+	req.Header.Set("Range", "bytes=0-")
+	resp = httptest.NewRecorder()
+	(&handler{}).ServeHTTP(resp, req)
+	// Either 200 or 206 would be acceptable per HTTP, but the
+	// current handler treats "bytes=0-" as unsupported and sends
+	// 200, so that is what we assert.
+	c.Check(resp.Code, check.Equals, http.StatusOK)
+	c.Check(resp.Body.String(), check.Equals, "Hello world\n")
+	c.Check(resp.Header().Get("Content-Length"), check.Equals, "12")
+
+	// Unsupported ranges are ignored
+	for _, hdr := range []string{
+		"bytes=5-5", // non-zero start byte
+		"bytes=-5", // last 5 bytes
+		"cubits=0-5", // unsupported unit
+		"bytes=0-340282366920938463463374607431768211456", // 2^128
+	} {
+		req.Header.Set("Range", hdr)
+		resp = httptest.NewRecorder()
+		(&handler{}).ServeHTTP(resp, req)
+		c.Check(resp.Code, check.Equals, http.StatusOK)
+		c.Check(resp.Body.String(), check.Equals, "Hello world\n")
+		c.Check(resp.Header().Get("Content-Length"), check.Equals, "12")
+		c.Check(resp.Header().Get("Content-Range"), check.Equals, "")
+		c.Check(resp.Header().Get("Accept-Ranges"), check.Equals, "bytes")
+	}
+}
+
func (s *IntegrationSuite) testVhostRedirectTokenToCookie(c *check.C, method, hostPath, queryString, contentType, reqBody string, expectStatus int, expectRespBody string) *httptest.ResponseRecorder {
u, _ := url.Parse(`http://` + hostPath + queryString)
req := &http.Request{
- Method: method,
- Host: u.Host,
- URL: u,
- Header: http.Header{"Content-Type": {contentType}},
- Body: ioutil.NopCloser(strings.NewReader(reqBody)),
+ Method: method,
+ Host: u.Host,
+ URL: u,
+ RequestURI: u.RequestURI(),
+ Header: http.Header{"Content-Type": {contentType}},
+ Body: ioutil.NopCloser(strings.NewReader(reqBody)),
}
resp := httptest.NewRecorder()
if resp.Code != http.StatusSeeOther {
return resp
}
- c.Check(resp.Body.String(), check.Matches, `.*href="//`+regexp.QuoteMeta(html.EscapeString(hostPath))+`".*`)
+ c.Check(resp.Body.String(), check.Matches, `.*href="//`+regexp.QuoteMeta(html.EscapeString(hostPath))+`(\?[^"]*)?".*`)
cookies := (&http.Response{Header: resp.Header()}).Cookies()
u, _ = u.Parse(resp.Header().Get("Location"))
req = &http.Request{
- Method: "GET",
- Host: u.Host,
- URL: u,
- Header: http.Header{},
+ Method: "GET",
+ Host: u.Host,
+ URL: u,
+ RequestURI: u.RequestURI(),
+ Header: http.Header{},
}
for _, c := range cookies {
req.AddCookie(c)
// different token before doing anything with the client). We
// set this dummy value during init so it doesn't clobber the
// one used by "run test servers".
- os.Setenv("ARVADOS_API_TOKEN", "xxx")
+ if os.Getenv("ARVADOS_API_TOKEN") == "" {
+ os.Setenv("ARVADOS_API_TOKEN", "xxx")
+ }
}
func main() {
var address string
func init() {
- flag.StringVar(&address, "address", ":80",
+ flag.StringVar(&address, "listen", ":80",
"Address to listen on: \"host:port\", or \":port\" to listen on all interfaces.")
}
"net/http"
"os"
"os/signal"
- "reflect"
"regexp"
"sync"
"syscall"
)
// Default TCP address on which to listen for requests.
-// Initialized by the -listen flag.
-const DEFAULT_ADDR = ":25107"
+// Override with -listen.
+const DefaultAddr = ":25107"
var listener net.Listener
flagset.StringVar(
&listen,
"listen",
- DEFAULT_ADDR,
+ DefaultAddr,
"Interface on which to listen for requests, in the format "+
"ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
"to listen on all network interfaces.")
}
kc.Want_replicas = default_replicas
-
kc.Client.Timeout = time.Duration(timeout) * time.Second
+ go kc.RefreshServices(5*time.Minute, 3*time.Second)
listener, err = net.Listen("tcp", listen)
if err != nil {
log.Fatalf("Could not listen on %v", listen)
}
-
- go RefreshServicesList(kc)
+ log.Printf("Arvados Keep proxy started listening on %v", listener.Addr())
// Shut down the server gracefully (by closing the listener)
// if SIGTERM is received.
signal.Notify(term, syscall.SIGTERM)
signal.Notify(term, syscall.SIGINT)
- log.Printf("Arvados Keep proxy started listening on %v", listener.Addr())
-
- // Start listening for requests.
+ // Start serving requests.
http.Serve(listener, MakeRESTRouter(!no_get, !no_put, kc))
log.Println("shutting down")
expireTime int64
}
-// Refresh the keep service list every five minutes.
-func RefreshServicesList(kc *keepclient.KeepClient) {
- var previousRoots = []map[string]string{}
- var delay time.Duration = 0
- for {
- time.Sleep(delay * time.Second)
- delay = 300
- if err := kc.DiscoverKeepServers(); err != nil {
- log.Println("Error retrieving services list:", err)
- delay = 3
- continue
- }
- newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()}
- if !reflect.DeepEqual(previousRoots, newRoots) {
- log.Printf("Updated services list: locals %v gateways %v", newRoots[0], newRoots[1])
- }
- if len(newRoots[0]) == 0 {
- log.Print("WARNING: No local services. Retrying in 3 seconds.")
- delay = 3
- }
- previousRoots = newRoots
- }
-}
-
// Cache the token and set an expire time. If we already have an expire time
// on the token, it is not updated.
func (this *ApiTokenCache) RememberToken(token string) {
}
func GetRemoteAddress(req *http.Request) string {
- if realip := req.Header.Get("X-Real-IP"); realip != "" {
- if forwarded := req.Header.Get("X-Forwarded-For"); forwarded != realip {
- return fmt.Sprintf("%s (X-Forwarded-For %s)", realip, forwarded)
- } else {
- return realip
- }
+ if xff := req.Header.Get("X-Forwarded-For"); xff != "" {
+ return xff + "," + req.RemoteAddr
}
return req.RemoteAddr
}
import (
"crypto/md5"
- "crypto/tls"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
- . "gopkg.in/check.v1"
- "io"
"io/ioutil"
"log"
"net/http"
- "net/url"
"os"
"strings"
"testing"
"time"
+
+ . "gopkg.in/check.v1"
)
// Gocheck boilerplate
// Test with no keepserver to simulate errors
type NoKeepServerSuite struct{}
+var TestProxyUUID = "zzzzz-bi6l4-lrixqc4fxofbmzz"
+
// Wait (up to 1 second) for keepproxy to listen on a port. This
// avoids a race condition where we hit a "connection refused" error
// because we start testing the proxy too soon.
func (s *NoKeepServerSuite) SetUpSuite(c *C) {
arvadostest.StartAPI()
+ // We need API to have some keep services listed, but the
+ // services themselves should be unresponsive.
+ arvadostest.StartKeep(2, false)
+ arvadostest.StopKeep(2)
}
func (s *NoKeepServerSuite) SetUpTest(c *C) {
arvadostest.StopAPI()
}
-func setupProxyService() {
-
- client := &http.Client{Transport: &http.Transport{
- TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
-
- var req *http.Request
- var err error
- if req, err = http.NewRequest("POST", fmt.Sprintf("https://%s/arvados/v1/keep_services", os.Getenv("ARVADOS_API_HOST")), nil); err != nil {
- panic(err.Error())
- }
- req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", os.Getenv("ARVADOS_API_TOKEN")))
-
- reader, writer := io.Pipe()
-
- req.Body = reader
-
- go func() {
- data := url.Values{}
- data.Set("keep_service", `{
- "service_host": "localhost",
- "service_port": 29950,
- "service_ssl_flag": false,
- "service_type": "proxy"
-}`)
-
- writer.Write([]byte(data.Encode()))
- writer.Close()
- }()
-
- var resp *http.Response
- if resp, err = client.Do(req); err != nil {
- panic(err.Error())
- }
- if resp.StatusCode != 200 {
- panic(resp.Status)
- }
-}
+func runProxy(c *C, args []string, bogusClientToken bool) *keepclient.KeepClient {
+ args = append([]string{"keepproxy"}, args...)
+ os.Args = append(args, "-listen=:0")
+ listener = nil
+ go main()
+ waitForListener()
-func runProxy(c *C, args []string, port int, bogusClientToken bool) *keepclient.KeepClient {
- if bogusClientToken {
- os.Setenv("ARVADOS_API_TOKEN", "bogus-token")
- }
arv, err := arvadosclient.MakeArvadosClient()
c.Assert(err, Equals, nil)
- kc := keepclient.KeepClient{
- Arvados: &arv,
- Want_replicas: 2,
- Using_proxy: true,
- Client: &http.Client{},
- }
- locals := map[string]string{
- "proxy": fmt.Sprintf("http://localhost:%v", port),
- }
- writableLocals := map[string]string{
- "proxy": fmt.Sprintf("http://localhost:%v", port),
- }
- kc.SetServiceRoots(locals, writableLocals, nil)
- c.Check(kc.Using_proxy, Equals, true)
- c.Check(len(kc.LocalRoots()), Equals, 1)
- for _, root := range kc.LocalRoots() {
- c.Check(root, Equals, fmt.Sprintf("http://localhost:%v", port))
- }
- log.Print("keepclient created")
if bogusClientToken {
- arvadostest.ResetEnv()
+ arv.ApiToken = "bogus-token"
}
-
- {
- os.Args = append(args, fmt.Sprintf("-listen=:%v", port))
- listener = nil
- go main()
+ kc := keepclient.New(&arv)
+ sr := map[string]string{
+ TestProxyUUID: "http://" + listener.Addr().String(),
}
+ kc.SetServiceRoots(sr, sr, sr)
+ kc.Arvados.External = true
+ kc.Using_proxy = true
- return &kc
+ return kc
}
func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
- log.Print("TestPutAndGet start")
-
- os.Args = []string{"keepproxy", "-listen=:29950"}
- listener = nil
- go main()
- time.Sleep(100 * time.Millisecond)
-
- setupProxyService()
-
- os.Setenv("ARVADOS_EXTERNAL_CLIENT", "true")
- arv, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, Equals, nil)
- kc, err := keepclient.MakeKeepClient(&arv)
- c.Assert(err, Equals, nil)
- c.Check(kc.Arvados.External, Equals, true)
- c.Check(kc.Using_proxy, Equals, true)
- c.Check(len(kc.LocalRoots()), Equals, 1)
- for _, root := range kc.LocalRoots() {
- c.Check(root, Equals, "http://localhost:29950")
- }
- os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
-
- waitForListener()
+ kc := runProxy(c, nil, false)
defer closeListener()
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
c.Check(blocklen, Equals, int64(0))
log.Print("Finished Get zero block")
}
-
- log.Print("TestPutAndGet done")
}
func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
- log.Print("TestPutAskGetForbidden start")
-
- kc := runProxy(c, []string{"keepproxy"}, 29951, true)
- waitForListener()
+ kc := runProxy(c, nil, true)
defer closeListener()
hash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
c.Check(blocklen, Equals, int64(0))
log.Print("Get")
}
-
- log.Print("TestPutAskGetForbidden done")
}
func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
- log.Print("TestGetDisabled start")
-
- kc := runProxy(c, []string{"keepproxy", "-no-get"}, 29952, false)
- waitForListener()
+ kc := runProxy(c, []string{"-no-get"}, false)
defer closeListener()
hash := fmt.Sprintf("%x", md5.Sum([]byte("baz")))
c.Check(blocklen, Equals, int64(0))
log.Print("Get")
}
-
- log.Print("TestGetDisabled done")
}
func (s *ServerRequiredSuite) TestPutDisabled(c *C) {
- log.Print("TestPutDisabled start")
-
- kc := runProxy(c, []string{"keepproxy", "-no-put"}, 29953, false)
- waitForListener()
+ kc := runProxy(c, []string{"-no-put"}, false)
defer closeListener()
- {
- hash2, rep, err := kc.PutB([]byte("quux"))
- c.Check(hash2, Equals, "")
- c.Check(rep, Equals, 0)
- c.Check(err, Equals, keepclient.InsufficientReplicasError)
- log.Print("PutB")
- }
-
- log.Print("TestPutDisabled done")
+ hash2, rep, err := kc.PutB([]byte("quux"))
+ c.Check(hash2, Equals, "")
+ c.Check(rep, Equals, 0)
+ c.Check(err, Equals, keepclient.InsufficientReplicasError)
}
func (s *ServerRequiredSuite) TestCorsHeaders(c *C) {
- runProxy(c, []string{"keepproxy"}, 29954, false)
- waitForListener()
+ runProxy(c, nil, false)
defer closeListener()
{
client := http.Client{}
req, err := http.NewRequest("OPTIONS",
- fmt.Sprintf("http://localhost:29954/%x+3",
- md5.Sum([]byte("foo"))),
+ fmt.Sprintf("http://%s/%x+3", listener.Addr().String(), md5.Sum([]byte("foo"))),
nil)
req.Header.Add("Access-Control-Request-Method", "PUT")
req.Header.Add("Access-Control-Request-Headers", "Authorization, X-Keep-Desired-Replicas")
{
resp, err := http.Get(
- fmt.Sprintf("http://localhost:29954/%x+3",
- md5.Sum([]byte("foo"))))
+ fmt.Sprintf("http://%s/%x+3", listener.Addr().String(), md5.Sum([]byte("foo"))))
c.Check(err, Equals, nil)
c.Check(resp.Header.Get("Access-Control-Allow-Headers"), Equals, "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas")
c.Check(resp.Header.Get("Access-Control-Allow-Origin"), Equals, "*")
}
func (s *ServerRequiredSuite) TestPostWithoutHash(c *C) {
- runProxy(c, []string{"keepproxy"}, 29955, false)
- waitForListener()
+ runProxy(c, nil, false)
defer closeListener()
{
client := http.Client{}
req, err := http.NewRequest("POST",
- "http://localhost:29955/",
+ "http://"+listener.Addr().String()+"/",
strings.NewReader("qux"))
req.Header.Add("Authorization", "OAuth2 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
req.Header.Add("Content-Type", "application/octet-stream")
// With a valid but non-existing prefix (expect "\n")
// With an invalid prefix (expect error)
func (s *ServerRequiredSuite) TestGetIndex(c *C) {
- kc := runProxy(c, []string{"keepproxy"}, 28852, false)
- waitForListener()
+ kc := runProxy(c, nil, false)
defer closeListener()
// Put "index-data" blocks
_, rep, err = kc.PutB([]byte("some-more-index-data"))
c.Check(err, Equals, nil)
+ kc.Arvados.ApiToken = arvadostest.DataManagerToken
+
// Invoke GetIndex
for _, spec := range []struct {
prefix string
{hash[:3], true, false}, // with matching prefix
{"abcdef", false, false}, // with no such prefix
} {
- indexReader, err := kc.GetIndex("proxy", spec.prefix)
+ indexReader, err := kc.GetIndex(TestProxyUUID, spec.prefix)
c.Assert(err, Equals, nil)
indexResp, err := ioutil.ReadAll(indexReader)
c.Assert(err, Equals, nil)
}
// GetIndex with invalid prefix
- _, err = kc.GetIndex("proxy", "xyz")
+ _, err = kc.GetIndex(TestProxyUUID, "xyz")
c.Assert((err != nil), Equals, true)
}
func (s *ServerRequiredSuite) TestPutAskGetInvalidToken(c *C) {
- kc := runProxy(c, []string{"keepproxy"}, 28852, false)
- waitForListener()
+ kc := runProxy(c, nil, false)
defer closeListener()
// Put a test block
// keepclient with no such keep server
kc := keepclient.New(&arv)
locals := map[string]string{
- "proxy": "http://localhost:12345",
+ TestProxyUUID: "http://localhost:12345",
}
kc.SetServiceRoots(locals, nil, nil)
}
func (s *NoKeepServerSuite) TestAskGetNoKeepServerError(c *C) {
- kc := runProxy(c, []string{"keepproxy"}, 29999, false)
- waitForListener()
+ kc := runProxy(c, nil, false)
defer closeListener()
- // Ask should result in temporary connection refused error
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
- _, _, err := kc.Ask(hash)
- c.Check(err, NotNil)
- errNotFound, _ := err.(*keepclient.ErrNotFound)
- c.Check(errNotFound.Temporary(), Equals, true)
- c.Assert(strings.Contains(err.Error(), "HTTP 502"), Equals, true)
-
- // Get should result in temporary connection refused error
- _, _, _, err = kc.Get(hash)
- c.Check(err, NotNil)
- errNotFound, _ = err.(*keepclient.ErrNotFound)
- c.Check(errNotFound.Temporary(), Equals, true)
- c.Assert(strings.Contains(err.Error(), "HTTP 502"), Equals, true)
+ for _, f := range []func()error {
+ func() error {
+ _, _, err := kc.Ask(hash)
+ return err
+ },
+ func() error {
+ _, _, _, err := kc.Get(hash)
+ return err
+ },
+ } {
+ err := f()
+ c.Assert(err, NotNil)
+ errNotFound, _ := err.(*keepclient.ErrNotFound)
+ c.Check(errNotFound.Temporary(), Equals, true)
+ c.Check(err, ErrorMatches, `.*HTTP 502.*`)
+ }
}
}
pullRequest := SetupPullWorkerIntegrationTest(t, testData, false)
+ defer arvadostest.StopAPI()
+ defer arvadostest.StopKeep(2)
performPullWorkerIntegrationTest(testData, pullRequest, t)
}
}
pullRequest := SetupPullWorkerIntegrationTest(t, testData, true)
+ defer arvadostest.StopAPI()
+ defer arvadostest.StopKeep(2)
performPullWorkerIntegrationTest(testData, pullRequest, t)
}
// srcConfig
var srcConfig apiConfig
srcConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
- srcConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN")
+ srcConfig.APIToken = arvadostest.DataManagerToken
srcConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
// dstConfig
var dstConfig apiConfig
dstConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
- dstConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN")
+ dstConfig.APIToken = arvadostest.DataManagerToken
dstConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
if enforcePermissions {
c.Check(err, IsNil)
c.Assert(srcConfig.APIHost, Equals, os.Getenv("ARVADOS_API_HOST"))
- c.Assert(srcConfig.APIToken, Equals, os.Getenv("ARVADOS_API_TOKEN"))
+ c.Assert(srcConfig.APIToken, Equals, arvadostest.DataManagerToken)
c.Assert(srcConfig.APIHostInsecure, Equals, matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")))
c.Assert(srcConfig.ExternalClient, Equals, false)
c.Check(err, IsNil)
c.Assert(dstConfig.APIHost, Equals, os.Getenv("ARVADOS_API_HOST"))
- c.Assert(dstConfig.APIToken, Equals, os.Getenv("ARVADOS_API_TOKEN"))
+ c.Assert(dstConfig.APIToken, Equals, arvadostest.DataManagerToken)
c.Assert(dstConfig.APIHostInsecure, Equals, matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")))
c.Assert(dstConfig.ExternalClient, Equals, false)
c.Check(err, IsNil)
fileContent := "ARVADOS_API_HOST=" + os.Getenv("ARVADOS_API_HOST") + "\n"
- fileContent += "ARVADOS_API_TOKEN=" + os.Getenv("ARVADOS_API_TOKEN") + "\n"
+ fileContent += "ARVADOS_API_TOKEN=" + arvadostest.DataManagerToken + "\n"
fileContent += "ARVADOS_API_HOST_INSECURE=" + os.Getenv("ARVADOS_API_HOST_INSECURE") + "\n"
fileContent += "ARVADOS_EXTERNAL_CLIENT=false\n"
fileContent += "ARVADOS_BLOB_SIGNING_KEY=abcdefg"
args := []string{"-src", srcConfig.Name(), "-dst", dstConfig.Name()}
os.Args = append(os.Args, args...)
- // Start keepservers. Since we are not doing any tweaking as in setupRsync func,
- // kcSrc and kcDst will be the same and no actual copying to dst will happen, but that's ok.
+ // Start keepservers. Since we are not doing any tweaking as
+ // in setupRsync func, kcSrc and kcDst will be the same and no
+ // actual copying to dst will happen, but that's ok.
arvadostest.StartKeep(2, false)
+ defer arvadostest.StopKeep(2)
err := doMain()
c.Check(err, IsNil)