services/api/config/arvados-clients.yml
*#*
.DS_Store
+.vscode
end
def show_file_links
- if Rails.configuration.keep_web_url || Rails.configuration.keep_web_download_url
- # show_file will redirect to keep-web's directory listing
- return show_file
- end
- Thread.current[:reader_tokens] = [params[:reader_token]]
- return if false.equal?(find_object_by_uuid)
- render layout: false
+ return show_file
end
def show_file
- # We pipe from arv-get to send the file to the user. Before we start it,
- # we ask the API server if the file actually exists. This serves two
- # purposes: it lets us return a useful status code for common errors, and
- # helps us figure out which token to provide to arv-get.
# The order of searched tokens is important: because the anonymous user
# token is passed along with every API request, we have to check it first.
# Otherwise, it's impossible to know whether any other request succeeded
return
end
- # If we are configured to use a keep-web server, just redirect to
- # the appropriate URL.
- if Rails.configuration.keep_web_url or
- Rails.configuration.keep_web_download_url
- opts = {}
- if usable_token == params[:reader_token]
- opts[:path_token] = usable_token
- elsif usable_token == Rails.configuration.anonymous_user_token
- # Don't pass a token at all
- else
- # We pass the current user's real token only if it's necessary
- # to read the collection.
- opts[:query_token] = usable_token
- end
- opts[:disposition] = params[:disposition] if params[:disposition]
- return redirect_to keep_web_url(params[:uuid], params[:file], opts)
- end
-
- # No keep-web server available. Get the file data with arv-get,
- # and serve it through Rails.
-
- file_name = params[:file].andand.sub(/^(\.\/|\/|)/, './')
- if file_name.nil? or not coll.manifest.has_file?(file_name)
- return render_not_found
- end
-
- opts = params.merge(arvados_api_token: usable_token)
-
- # Handle Range requests. Currently we support only 'bytes=0-....'
- if request.headers.include? 'HTTP_RANGE'
- if m = /^bytes=0-(\d+)/.match(request.headers['HTTP_RANGE'])
- opts[:maxbytes] = m[1]
- size = params[:size] || '*'
- self.response.status = 206
- self.response.headers['Content-Range'] = "bytes 0-#{m[1]}/#{size}"
- end
- end
-
- ext = File.extname(params[:file])
- self.response.headers['Content-Type'] =
- Rack::Mime::MIME_TYPES[ext] || 'application/octet-stream'
- if params[:size]
- size = params[:size].to_i
- if opts[:maxbytes]
- size = [size, opts[:maxbytes].to_i].min
- end
- self.response.headers['Content-Length'] = size.to_s
- end
- self.response.headers['Content-Disposition'] = params[:disposition] if params[:disposition]
- begin
- file_enumerator(opts).each do |bytes|
- response.stream.write bytes
- end
- ensure
- response.stream.close
+ opts = {}
+ if usable_token == params[:reader_token]
+ opts[:path_token] = usable_token
+ elsif usable_token == Rails.configuration.anonymous_user_token
+ # Don't pass a token at all
+ else
+ # We pass the current user's real token only if it's necessary
+ # to read the collection.
+ opts[:query_token] = usable_token
end
+ opts[:disposition] = params[:disposition] if params[:disposition]
+ return redirect_to keep_web_url(params[:uuid], params[:file], opts)
end
def sharing_scopes
def download_link
token = @search_sharing.first.api_token
- if Rails.configuration.keep_web_url || Rails.configuration.keep_web_download_url
- keep_web_url(@object.uuid, nil, {path_token: token})
- else
- collections_url + "/download/#{@object.uuid}/#{token}/"
- end
+ keep_web_url(@object.uuid, nil, {path_token: token})
end
def share
uri.to_s
end
-
- # Note: several controller and integration tests rely on stubbing
- # file_enumerator to return fake file content.
- def file_enumerator opts
- FileStreamer.new opts
- end
-
- class FileStreamer
- include ArvadosApiClientHelper
- def initialize(opts={})
- @opts = opts
- end
- def each
- return unless @opts[:uuid] && @opts[:file]
-
- env = Hash[ENV].dup
-
- require 'uri'
- u = URI.parse(arvados_api_client.arvados_v1_base)
- env['ARVADOS_API_HOST'] = "#{u.host}:#{u.port}"
- env['ARVADOS_API_TOKEN'] = @opts[:arvados_api_token]
- env['ARVADOS_API_HOST_INSECURE'] = "true" if Rails.configuration.arvados_insecure_https
-
- bytesleft = @opts[:maxbytes].andand.to_i || 2**16
- io = IO.popen([env, 'arv-get', "#{@opts[:uuid]}/#{@opts[:file]}"], 'rb')
- while bytesleft > 0 && (buf = io.read([bytesleft, 2**16].min)) != nil
- # shrink the bytesleft count, if we were given a maximum byte
- # count to read
- if @opts.include? :maxbytes
- bytesleft = bytesleft - buf.length
- end
- yield buf
- end
- io.close
- # "If ios is opened by IO.popen, close sets $?."
- # http://www.ruby-doc.org/core-2.1.3/IO.html#method-i-close
- Rails.logger.warn("#{@opts[:uuid]}/#{@opts[:file]}: #{$?}") if $? != 0
- end
- end
end
$("#log-viewer-download-pane").show();
var headers = {};
if (log_size > log_maxbytes) {
- headers['Range'] = 'bytes=0-' + log_maxbytes;
+ headers['Range'] = 'bytes=0-' + (log_maxbytes - 1);
}
var ajax_opts = { dataType: 'text', headers: headers };
load_log();
profiling_enabled: true
secret_token: <%= rand(2**256).to_s(36) %>
secret_key_base: <%= rand(2**256).to_s(36) %>
+  # This setting allows Workbench to start when running tests; set it to a
+  # correct value when testing relevant features.
+ keep_web_url: http://example.com/c=%{uuid_or_pdh}
# When you run the Workbench's integration tests, it starts the API
# server as a dependency. These settings should match the API
shell_in_a_box_url: false
# Format of preview links. If false, use keep_web_download_url
- # instead, and disable inline preview. If both are false, use
- # Workbench's built-in file download/preview mechanism.
+ # instead, and disable inline preview.
+  # If both are false, Workbench won't start: configuring at least one is mandatory.
#
# Examples:
# keep_web_url: https://%{uuid_or_pdh}.collections.uuid_prefix.arvadosapi.com
arvados_v1_base: https://arvados.local:3030/arvados/v1
arvados_insecure_https: true
+ # You need to configure at least one of these:
+ keep_web_url: false
+ keep_web_download_url: false
+
production:
# At minimum, you need a nice long randomly generated secret_token here.
secret_token: ~
arvados_login_base: https://arvados.local:3030/login
arvados_v1_base: https://arvados.local:3030/arvados/v1
arvados_insecure_https: false
+
+ # You need to configure at least one of these:
+ keep_web_url: false
+ keep_web_download_url: false
cfg.send "#{k}=", v
end
end
- if !nils.empty?
+ if !nils.empty? and not ::Rails.groups.include?('assets')
raise <<EOS
Refusing to start in #{::Rails.env.to_s} mode with missing configuration.
The following configuration settings must be specified in
config/application.yml:
* #{nils.join "\n* "}
+EOS
+ end
+ # Refuse to start if keep-web isn't configured
+ if not (config.keep_web_url or config.keep_web_download_url) and not ::Rails.groups.include?('assets')
+ raise <<EOS
+Refusing to start in #{::Rails.env.to_s} mode with missing configuration.
+
+Keep-web service must be configured in config/application.yml:
+* keep_web_url
+* keep_web_download_url
+
EOS
end
end
end
end
- def stub_file_content
- # For the duration of the current test case, stub file download
- # content with a randomized (but recognizable) string. Return the
- # string, the test case can use it in assertions.
- txt = 'the quick brown fox ' + rand(2**32).to_s
- @controller.stubs(:file_enumerator).returns([txt])
- txt
- end
-
def collection_params(collection_name, file_name=nil)
uuid = api_fixture('collections')[collection_name.to_s]['uuid']
params = {uuid: uuid, id: uuid}
end
test "download a file with spaces in filename" do
+ setup_for_keep_web
collection = api_fixture('collections')['w_a_z_file']
- fakepipe = IO.popen(['echo', '-n', 'w a z'], 'rb')
- IO.expects(:popen).with { |cmd, mode|
- cmd.include? "#{collection['uuid']}/w a z"
- }.returns(fakepipe)
get :show_file, {
uuid: collection['uuid'],
file: 'w a z'
}, session_for(:active)
- assert_response :success
- assert_equal 'w a z', response.body
+ assert_response :redirect
+ assert_match /w%20a%20z/, response.redirect_url
end
test "viewing a collection fetches related projects" do
params[:reader_token] = api_fixture("api_client_authorizations",
"active_all_collections", "api_token")
get(:show_file_links, params)
- assert_response :success
- assert_equal([['.', 'foo', 3]], assigns(:object).files)
+ assert_response :redirect
assert_no_session
end
test "fetching collection file with reader token" do
- expected = stub_file_content
+ setup_for_keep_web
params = collection_params(:foo_file, "foo")
params[:reader_token] = api_fixture("api_client_authorizations",
"active_all_collections", "api_token")
get(:show_file, params)
- assert_response :success
- assert_equal(expected, @response.body,
- "failed to fetch a Collection file with a reader token")
+ assert_response :redirect
+ assert_match /foo/, response.redirect_url
assert_no_session
end
end
test "getting a file from Keep" do
+ setup_for_keep_web
params = collection_params(:foo_file, 'foo')
sess = session_for(:active)
- expect_content = stub_file_content
get(:show_file, params, sess)
- assert_response :success
- assert_equal(expect_content, @response.body,
- "failed to get a correct file from Keep")
+ assert_response :redirect
+ assert_match /foo/, response.redirect_url
end
test 'anonymous download' do
+ setup_for_keep_web
config_anonymous true
- expect_content = stub_file_content
get :show_file, {
uuid: api_fixture('collections')['user_agreement_in_anonymously_accessible_project']['uuid'],
file: 'GNU_General_Public_License,_version_3.pdf',
}
- assert_response :success
- assert_equal expect_content, response.body
+ assert_response :redirect
+ assert_match /GNU_General_Public_License/, response.redirect_url
end
test "can't get a file from Keep without permission" do
assert_response 404
end
- test "trying to get a nonexistent file from Keep returns a 404" do
- params = collection_params(:foo_file, 'gone')
- sess = session_for(:admin)
- get(:show_file, params, sess)
- assert_response 404
- end
-
test "getting a file from Keep with a good reader token" do
+ setup_for_keep_web
params = collection_params(:foo_file, 'foo')
read_token = api_fixture('api_client_authorizations')['active']['api_token']
params[:reader_token] = read_token
- expect_content = stub_file_content
get(:show_file, params)
- assert_response :success
- assert_equal(expect_content, @response.body,
- "failed to get a correct file from Keep using a reader token")
+ assert_response :redirect
+ assert_match /foo/, response.redirect_url
assert_not_equal(read_token, session[:arvados_api_token],
"using a reader token set the session's API token")
end
end
test "can get a file with an unpermissioned auth but in-scope reader token" do
+ setup_for_keep_web
params = collection_params(:foo_file, 'foo')
sess = session_for(:expired)
read_token = api_fixture('api_client_authorizations')['active']['api_token']
params[:reader_token] = read_token
- expect_content = stub_file_content
get(:show_file, params, sess)
- assert_response :success
- assert_equal(expect_content, @response.body,
- "failed to get a correct file from Keep using a reader token")
+ assert_response :redirect
assert_not_equal(read_token, session[:arvados_api_token],
"using a reader token set the session's API token")
end
test "inactive user can retrieve user agreement" do
+ setup_for_keep_web
ua_collection = api_fixture('collections')['user_agreement']
# Here we don't test whether the agreement can be retrieved from
- # Keep. We only test that show_file decides to send file content,
- # so we use the file content stub.
- stub_file_content
+    # Keep. We only test that show_file decides to redirect to keep-web.
get :show_file, {
uuid: ua_collection['uuid'],
file: ua_collection['manifest_text'].match(/ \d+:\d+:(\S+)/)[1]
assert_nil(assigns(:unsigned_user_agreements),
"Did not skip check_user_agreements filter " +
"when showing the user agreement.")
- assert_response :success
+ assert_response :redirect
end
test "requesting nonexistent Collection returns 404" do
:active, 404)
end
- test "use a reasonable read buffer even if client requests a huge range" do
- fakefiledata = mock
- IO.expects(:popen).returns(fakefiledata)
- fakefiledata.expects(:read).twice.with() do |length|
- # Fail the test if read() is called with length>1MiB:
- length < 2**20
- ## Force the ActionController::Live thread to lose the race to
- ## verify that @response.body.length actually waits for the
- ## response (see below):
- # sleep 3
- end.returns("foo\n", nil)
- fakefiledata.expects(:close)
- foo_file = api_fixture('collections')['foo_file']
- @request.headers['Range'] = 'bytes=0-4294967296/*'
- get :show_file, {
- uuid: foo_file['uuid'],
- file: foo_file['manifest_text'].match(/ \d+:\d+:(\S+)/)[1]
- }, session_for(:active)
- # Wait for the whole response to arrive before deciding whether
- # mocks' expectations were met. Otherwise, Mocha will fail the
- # test depending on how slowly the ActionController::Live thread
- # runs.
- @response.body.length
- end
-
test "show file in a subdirectory of a collection" do
+ setup_for_keep_web
params = collection_params(:collection_with_files_in_subdir, 'subdir2/subdir3/subdir4/file1_in_subdir4.txt')
- expect_content = stub_file_content
get(:show_file, params, session_for(:user1_with_load))
- assert_response :success
- assert_equal(expect_content, @response.body, "failed to get a correct file from Keep")
+ assert_response :redirect
+ assert_match /subdir2\/subdir3\/subdir4\/file1_in_subdir4\.txt/, response.redirect_url
end
test 'provenance graph' do
def setup_for_keep_web cfg='https://%{uuid_or_pdh}.example', dl_cfg=false
Rails.configuration.keep_web_url = cfg
Rails.configuration.keep_web_download_url = dl_cfg
- @controller.expects(:file_enumerator).never
end
%w(uuid portable_data_hash).each do |id_type|
require 'integration_helper'
class AnonymousAccessTest < ActionDispatch::IntegrationTest
+ include KeepWebConfig
+
# These tests don't do state-changing API calls. Save some time by
# skipping the database reset.
reset_api_fixtures :after_each_test, false
end
test 'view file' do
+ use_keep_web_config
+
magic = rand(2**512).to_s 36
- CollectionsController.any_instance.stubs(:file_enumerator).returns([magic])
- collection = api_fixture('collections')['public_text_file']
- visit '/collections/' + collection['uuid']
+ owner = api_fixture('groups')['anonymously_accessible_project']['uuid']
+ col = upload_data_and_get_collection(magic, 'admin', "Hello\\040world.txt", owner)
+ visit '/collections/' + col.uuid
find('tr,li', text: 'Hello world.txt').
find('a[title~=View]').click
assert_text magic
require_relative 'integration_test_utils'
class CollectionsTest < ActionDispatch::IntegrationTest
+ include KeepWebConfig
+
setup do
need_javascript
end
test "creating and uncreating a sharing link" do
coll_uuid = api_fixture("collections", "collection_owned_by_active", "uuid")
download_link_re =
- Regexp.new(Regexp.escape("/collections/download/#{coll_uuid}/"))
+ Regexp.new(Regexp.escape("/c=#{coll_uuid}/"))
visit page_with_token("active_trustedclient", "/collections/#{coll_uuid}")
within "#sharing-button" do
check_sharing(:on, download_link_re)
end
test "can download an entire collection with a reader token" do
- Capybara.current_driver = :rack_test
- CollectionsController.any_instance.
- stubs(:file_enumerator).returns(["foo\n", "file\n"])
- uuid = api_fixture('collections')['foo_file']['uuid']
+ use_keep_web_config
+
+ token = api_fixture('api_client_authorizations')['active']['api_token']
+ data = "foo\nfile\n"
+ datablock = `echo -n #{data.shellescape} | ARVADOS_API_TOKEN=#{token.shellescape} arv-put --no-progress --raw -`.strip
+ assert $?.success?, $?
+
+ col = nil
+ use_token 'active' do
+ mtxt = ". #{datablock} 0:#{data.length}:foo\n"
+ col = Collection.create(manifest_text: mtxt)
+ end
+
+ uuid = col.uuid
token = api_fixture('api_client_authorizations')['active_all_collections']['api_token']
url_head = "/collections/download/#{uuid}/#{token}/"
visit url_head
end
assert_equal(['foo'], hrefs.compact.sort,
"download page did provide strictly file links")
- within "#collection_files" do
- click_link "foo"
- assert_equal("foo\nfile\n", page.html)
- end
+ click_link "foo"
+ assert_text "foo\nfile\n"
end
test "combine selected collections into new collection" do
assert_selector 'a[href="/"]', text: 'Go to dashboard'
end
- test "view job log" do
- job = api_fixture('jobs')['job_with_real_log']
-
- IO.expects(:popen).returns(fakepipe_with_log_data)
-
- visit page_with_token("active", "/jobs/#{job['uuid']}")
- assert page.has_text? job['script_version']
-
- find(:xpath, "//a[@href='#Log']").click
- wait_for_ajax
- assert page.has_text? 'Started at'
- assert page.has_text? 'Finished at'
- assert page.has_text? 'log message 1'
- assert page.has_text? 'log message 2'
- assert page.has_text? 'log message 3'
- assert page.has_no_text? 'Showing only 100 bytes of this log'
- end
-
test 'view partial job log' do
+ need_selenium 'to be able to see the CORS response headers (PhantomJS 1.9.8 does not)'
+ use_keep_web_config
+
# This config will be restored during teardown by ../test_helper.rb:
Rails.configuration.log_viewer_max_bytes = 100
- IO.expects(:popen).returns(fakepipe_with_log_data)
- job = api_fixture('jobs')['job_with_real_log']
-
- visit page_with_token("active", "/jobs/#{job['uuid']}")
- assert page.has_text? job['script_version']
-
- find(:xpath, "//a[@href='#Log']").click
+ logdata = fakepipe_with_log_data.read
+ job_uuid = api_fixture('jobs')['running']['uuid']
+ logcollection = upload_data_and_get_collection(logdata, 'active', "#{job_uuid}.log.txt")
+ job = nil
+ use_token 'active' do
+ job = Job.find job_uuid
+ job.update_attributes log: logcollection.portable_data_hash
+ end
+ visit page_with_token 'active', '/jobs/'+job.uuid
+ find('a[href="#Log"]').click
wait_for_ajax
- assert page.has_text? 'Showing only 100 bytes of this log'
+ assert_text 'Showing only 100 bytes of this log'
end
test 'view log via keep-web redirect' do
@kwdport = getport 'keep-web-dl-ssl'
Rails.configuration.keep_web_url = "https://localhost:#{@kwport}/c=%{uuid_or_pdh}"
Rails.configuration.keep_web_download_url = "https://localhost:#{@kwdport}/c=%{uuid_or_pdh}"
- CollectionsController.any_instance.expects(:file_enumerator).never
end
end
end
end
end
+
+def upload_data_and_get_collection(data, user, filename, owner_uuid=nil)
+ token = api_fixture('api_client_authorizations')[user]['api_token']
+ datablock = `echo -n #{data.shellescape} | ARVADOS_API_TOKEN=#{token.shellescape} arv-put --no-progress --raw -`.strip
+ assert $?.success?, $?
+ col = nil
+ use_token user do
+    mtxt = ". #{datablock} 0:#{data.length}:#{filename}\n"
+ if owner_uuid
+ col = Collection.create(manifest_text: mtxt, owner_uuid: owner_uuid)
+ else
+ col = Collection.create(manifest_text: mtxt)
+ end
+ end
+ return col
+end
debian8,debian9,ubuntu1204,centos7|six|1.10.0|2|python3|all
debian8,debian9,ubuntu1204,ubuntu1404,centos7|requests|2.12.4|2|python3|all
debian8,debian9,ubuntu1204,ubuntu1404,ubuntu1604,centos7|websocket-client|0.37.0|2|python3|all
-ubuntu1204|requests|2.12.4|2|python|all
+ubuntu1204,ubuntu1404|requests|2.4.3|2|python|all
ubuntu1204,centos7|contextlib2|0.5.4|2|python|all
ubuntu1204,centos7|isodate|0.5.4|2|python|all
centos7|daemon|2.1.1|2|python|all
# clean up the docker build environment
cd "$WORKSPACE"
+title "Starting arvbox build localdemo"
+
tools/arvbox/bin/arvbox build localdemo
ECODE=$?
EXITCODE=$(($EXITCODE + $ECODE))
fi
+title "Starting arvbox build dev"
+
tools/arvbox/bin/arvbox build dev
ECODE=$?
\cp config/application.yml.example config/application.yml -f
\cp config/environments/production.rb.example config/environments/production.rb -f
sed -i 's/secret_token: ~/secret_token: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/' config/application.yml
+ sed -i 's/keep_web_url: false/keep_web_url: exampledotcom/' config/application.yml
RAILS_ENV=production RAILS_GROUPS=assets bundle exec rake npm:install >/dev/null
RAILS_ENV=production RAILS_GROUPS=assets bundle exec rake assets:precompile >/dev/null
services/keep-balance
services/login-sync
services/nodemanager
-services/nodemanager-integration
+services/nodemanager_integration
services/crunch-run
services/crunch-dispatch-local
services/crunch-dispatch-slurm
apps/workbench_units | apps/workbench_functionals | apps/workbench_integration)
suite=apps/workbench
;;
+ services/nodemanager | services/nodemanager_integration)
+ suite=services/nodemanager_suite
+ ;;
*)
suite="${1}"
;;
}
do_test services/login-sync login-sync
-test_nodemanager-integration() {
+test_nodemanager_integration() {
cd "$WORKSPACE/services/nodemanager" \
- && tests/integration_test.py ${testargs[services/nodemanager-integration]}
+ && tests/integration_test.py ${testargs[services/nodemanager_integration]}
}
-do_test services/nodemanager-integration nodemanager-integration
+do_test services/nodemanager_integration nodemanager_integration
for p in "${pythonstuff[@]}"
do
delete $Jobstep->{tempfail};
$Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
- $Jobstep->{'arvados_task'}->save;
+ retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
splice @jobstep_todo, $todo_ptr, 1;
--$todo_ptr;
"ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
exit_status_s($childstatus)));
$Jobstep->{'arvados_task'}->{success} = 0;
- $Jobstep->{'arvados_task'}->save;
+ retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
$task_success = 0;
}
$Jobstep->{exitcode} = $childstatus;
$Jobstep->{finishtime} = time;
$Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
- $Jobstep->{'arvados_task'}->save;
+ retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
process_stderr_final ($jobstepidx);
Log ($jobstepidx, sprintf("task output (%d bytes): %s",
length($Jobstep->{'arvados_task'}->{output}),
$st->{node}->{fail_count}++;
}
}
- elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b)/i) {
+ elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b|cannot communicate with node .* aborting job)/i) {
$jobstep[$jobstepidx]->{tempfail} = 1;
if (defined($job_slot_index)) {
$slot[$job_slot_index]->{node}->{fail_count}++;
# that can be retried, the second function will be called with
# the current try count (0-based), next try time, and error message.
my $operation = shift;
- my $retry_callback = shift;
+ my $op_text = shift;
my $retries = retry_count();
+ my $retry_callback = sub {
+ my ($try_count, $next_try_at, $errmsg) = @_;
+ $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
+ $errmsg =~ s/\s/ /g;
+ $errmsg =~ s/\s+$//;
+ my $retry_msg;
+ if ($next_try_at < time) {
+ $retry_msg = "Retrying.";
+ } else {
+ my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
+ $retry_msg = "Retrying at $next_try_fmt.";
+ }
+ Log(undef, "$op_text failed: $errmsg. $retry_msg");
+ };
foreach my $try_count (0..$retries) {
my $next_try = time + (2 ** $try_count);
my $result = eval { $operation->(@_); };
# This function will call that method, retrying as needed until
# the current retry_count is exhausted, with a log on the first failure.
my $method_name = shift;
- my $log_api_retry = sub {
- my ($try_count, $next_try_at, $errmsg) = @_;
- $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
- $errmsg =~ s/\s/ /g;
- $errmsg =~ s/\s+$//;
- my $retry_msg;
- if ($next_try_at < time) {
- $retry_msg = "Retrying.";
- } else {
- my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
- $retry_msg = "Retrying at $next_try_fmt.";
- }
- Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
- };
my $method = $arv;
foreach my $key (split(/\//, $method_name)) {
$method = $method->{$key};
}
- return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
+ return retry_op(sub { $method->execute(@_); }, "API method $method_name", @_);
}
sub exit_status_s {
if self.arvrunner.trash_intermediate:
command.append("--trash-intermediate")
+ if self.arvrunner.project_uuid:
+ command.append("--project-uuid="+self.arvrunner.project_uuid)
+
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command
self.arvrunner = arvrunner
self.work_api = kwargs["work_api"]
- def makeJobRunner(self, use_container=True):
+ def makeJobRunner(self, **kwargs):
if self.work_api == "containers":
return ArvadosContainer(self.arvrunner)
elif self.work_api == "jobs":
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20170727112954',
+ 'cwltool==1.0.20170811195303',
'schema-salad==2.6.20170712194300',
'typing==3.5.3.0',
'ruamel.yaml==0.13.7',
stubs.expect_container_request_uuid + '\n')
+ @stubs
+ def test_submit_container_project(self, stubs):
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+ capture_stdout = cStringIO.StringIO()
+ try:
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug", "--project-uuid="+project_uuid,
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 0)
+ except:
+ logging.exception("")
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["owner_uuid"] = project_uuid
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ '--enable-reuse', '--on-error=continue', '--project-uuid='+project_uuid,
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+ stubs.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher(expect_container))
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
+
@stubs
def test_submit_job_runner_image(self, stubs):
capture_stdout = cStringIO.StringIO()
"$graph": [
{
"class": "Workflow",
+ "cwlVersion": "v1.0",
"hints": [],
"id": "#main",
"inputs": [
Path string `json:"path"`
Content interface{} `json:"content"`
ExcludeFromOutput bool `json:"exclude_from_output"`
- Capacity int64 `json:capacity`
+ Capacity int64 `json:"capacity"`
}
// RuntimeConstraints specify a container's compute resources (RAM,
}
func (e TransactionError) Error() (s string) {
- s = fmt.Sprintf("request failed: %s", e.URL)
+ s = fmt.Sprintf("request failed: %s", e.URL.String())
if e.Status != "" {
s = s + ": " + e.Status
}
Routes Routes
// If non-nil, Log is called after handling each request. The
- // error argument is nil if the request was succesfully
+ // error argument is nil if the request was successfully
// authenticated and served, even if the health check itself
// failed.
Log func(*http.Request, error)
$self->{'req'} = new HTTP::Request (%req);
$self->{'req'}->header('Authorization' => ('OAuth2 ' . $self->{'authToken'})) if $self->{'authToken'};
$self->{'req'}->header('Accept' => 'application/json');
+
+ # allow_nonref lets us encode JSON::true and JSON::false, see #12078
+ my $json = JSON->new->allow_nonref;
my ($p, $v);
while (($p, $v) = each %{$self->{'queryParams'}}) {
- $content{$p} = (ref($v) eq "") ? $v : JSON::encode_json($v);
+ $content{$p} = (ref($v) eq "") ? $v : $json->encode($v);
}
my $content;
while (($p, $v) = each %content) {
elif file_in_local_collection.permission_expired():
# Permission token expired, re-upload file. This will change whenever
# we have a API for refreshing tokens.
+ self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
should_upload = True
self._local_collection.remove(filename)
elif cached_file_data['size'] == file_in_local_collection.size():
from builtins import str
from builtins import range
import apiclient
+import datetime
+import hashlib
+import json
import mock
import os
import pwd
+import random
import re
import shutil
import subprocess
import sys
import tempfile
+import threading
import time
import unittest
-import yaml
-import threading
-import hashlib
-import random
import uuid
+import yaml
import arvados
import arvados.commands.put as arv_put
cls.ENVIRON = os.environ.copy()
cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)
+ def datetime_to_hex(self, dt):
+ return hex(int(time.mktime(dt.timetuple())))[2:]
+
def setUp(self):
super(ArvPutIntegrationTest, self).setUp()
arv_put.api_client = None
self.assertEqual(1, len(collection_list))
return collection_list[0]
+ def test_expired_token_invalidates_cache(self):
+ self.authorize_with('active')
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
+ f.write('foo')
+ # Upload a directory and get the cache file name
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (out, err) = p.communicate()
+ self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+ self.assertEqual(p.returncode, 0)
+ cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+ err.decode()).groups()[0]
+ self.assertTrue(os.path.isfile(cache_filepath))
+ # Load the cache file contents and modify the manifest to simulate
+ # an expired access token
+ with open(cache_filepath, 'r') as c:
+ cache = json.load(c)
+ self.assertRegex(cache['manifest'], r'\+A\S+\@')
+ a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
+ cache['manifest'] = re.sub(
+ r'\@.*? ',
+ "@{} ".format(self.datetime_to_hex(a_month_ago)),
+ cache['manifest'])
+ with open(cache_filepath, 'w') as c:
+ c.write(json.dumps(cache))
+ # Re-run the upload and expect to get an invalid cache message
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (out, err) = p.communicate()
+ self.assertRegex(
+ err.decode(),
+ r'WARNING: Uploaded file .* access token expired, will re-upload it from scratch')
+ self.assertEqual(p.returncode, 0)
+ # Confirm that the resulting cache is different from the last run.
+ with open(cache_filepath, 'r') as c2:
+ new_cache = json.load(c2)
+ self.assertNotEqual(cache['manifest'], new_cache['manifest'])
+
def test_put_collection_with_later_update(self):
tmpdir = self.make_tmpdir()
with open(os.path.join(tmpdir, 'file1'), 'w') as f:
end
def index
- if @select.nil? || @select.include?("id")
- @objects = @objects.uniq(&:id)
- end
if params[:eager] and params[:eager] != '0' and params[:eager] != 0 and params[:eager] != ''
@objects.each(&:eager_load_associations)
end
// SSL certificates. See
// http://www.w3.org/TR/cors/#user-credentials).
w.Header().Set("Access-Control-Allow-Origin", "*")
+ w.Header().Set("Access-Control-Expose-Headers", "Content-Range")
}
arv := h.clientPool.Get()
&s3UnsafeDelete,
"s3-unsafe-delete",
false,
- "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
+ "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
}
// S3Volume implements Volume using an S3 bucket.
def __init__(self, *args, **kwargs):
super(pykka.ThreadingActor, self).__init__(*args, **kwargs)
self.actor_ref = TellableActorRef(self)
+ self._killfunc = kwargs.get("killfunc", os.kill)
def on_failure(self, exception_type, exception_value, tb):
lg = getattr(self, "_logger", logging)
if (exception_type in (threading.ThreadError, MemoryError) or
exception_type is OSError and exception_value.errno == errno.ENOMEM):
lg.critical("Unhandled exception is a fatal error, killing Node Manager")
- os.kill(os.getpid(), signal.SIGKILL)
+ self._killfunc(os.getpid(), signal.SIGKILL)
def ping(self):
return True
+ def get_thread(self):
+ return threading.current_thread()
class WatchdogActor(pykka.ThreadingActor):
def __init__(self, timeout, *args, **kwargs):
self.actors = [a.proxy() for a in args]
self.actor_ref = TellableActorRef(self)
self._later = self.actor_ref.tell_proxy()
+ self._killfunc = kwargs.get("killfunc", os.kill)
def kill_self(self, e, act):
lg = getattr(self, "_logger", logging)
lg.critical("Watchdog exception", exc_info=e)
lg.critical("Actor %s watchdog ping time out, killing Node Manager", act)
- os.kill(os.getpid(), signal.SIGKILL)
+ self._killfunc(os.getpid(), signal.SIGKILL)
def on_start(self):
self._later.run()
return super(ComputeNodeShutdownActor, self)._finished()
def cancel_shutdown(self, reason, **kwargs):
+ if self.cancel_reason is not None:
+ # already cancelled
+ return
self.cancel_reason = reason
self._logger.info("Shutdown cancelled: %s.", reason)
self._finished(success_flag=False)
@_cancel_on_exception
def shutdown_node(self):
+ if self.cancel_reason is not None:
+ # already cancelled
+ return
if self.cancellable:
self._logger.info("Checking that node is still eligible for shutdown")
eligible, reason = self._monitor.shutdown_eligible().get()
if output in ("drng\n", "alloc\n", "drng*\n", "alloc*\n"):
self._timer.schedule(time.time() + 10,
self._later.await_slurm_drain)
- elif output in ("idle\n"):
+ elif output in ("idle\n",):
# Not in "drng" but idle, don't shut down
self.cancel_shutdown("slurm state is %s" % output.strip(), try_resume=False)
else:
message at a later time. This actor runs the necessary event loop for
delivery.
"""
- def __init__(self, max_sleep=1):
+ def __init__(self, max_sleep=1, timefunc=None):
super(TimedCallBackActor, self).__init__()
self._proxy = self.actor_ref.tell_proxy()
self.messages = []
self.max_sleep = max_sleep
+ if timefunc is None:
+ self._timefunc = time.time
+ else:
+ self._timefunc = timefunc
def schedule(self, delivery_time, receiver, *args, **kwargs):
if not self.messages:
def deliver(self):
if not self.messages:
return
- til_next = self.messages[0][0] - time.time()
+ til_next = self.messages[0][0] - self._timefunc()
if til_next <= 0:
t, receiver, args, kwargs = heapq.heappop(self.messages)
try:
],
install_requires=[
'apache-libcloud>=0.20',
- 'arvados-python-client>=0.1.20150206225333',
+ 'arvados-python-client>=0.1.20170731145219',
'future',
'pykka',
'python-daemon',
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+#
+#
+# Usage: arvados-cwl-runner stress_test.cwl
+#
+# Submits a workflow that scatters into 100 short echo/sleep steps,
+# creating sustained load on the node manager and the job scheduler.
+
+class: Workflow
+cwlVersion: v1.0
+requirements:
+ ScatterFeatureRequirement: {}
+ InlineJavascriptRequirement: {}
+inputs: []
+outputs: []
+steps:
+ step1:
+ in: []
+ out: [out]
+ run:
+ class: ExpressionTool
+ inputs: []
+ outputs:
+ out: int[]
+ expression: |
+ ${
+ var r = [];
+ for (var i = 1; i <= 100; i++) {
+ r.push(i);
+ }
+ return {out: r};
+ }
+ step2:
+ in:
+ num: step1/out
+ out: []
+ scatter: num
+ run:
+ class: CommandLineTool
+ requirements:
+ ShellCommandRequirement: {}
+ inputs:
+ num: int
+ outputs: []
+ arguments: [echo, "starting",
+ {shellQuote: false, valueFrom: "&&"},
+ sleep, $((101-inputs.num)*2),
+ {shellQuote: false, valueFrom: "&&"},
+ echo, "the number of the day is", $(inputs.num)]
]
self.make_actor()
self.wait_for_assignment(self.setup_actor, 'cloud_node')
+ self.setup_actor.ping().get(self.TIMEOUT)
self.assertEqual(1, self.cloud_client.post_create_node.call_count)
def test_instance_exceeded_not_retried(self):
self.api_client.nodes().create().execute.side_effect = retry_resp
self.api_client.nodes().update().execute.side_effect = retry_resp
self.wait_for_assignment(self.setup_actor, 'cloud_node')
+ self.setup_actor.ping().get(self.TIMEOUT)
self.assertEqual(self.setup_actor.actor_ref.actor_urn,
subscriber.call_args[0][0].actor_ref.actor_urn)
self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="busy"))
self.cloud_client.destroy_node.return_value = True
self.make_actor(cancellable=True)
- self.check_success_flag(False)
+ self.check_success_flag(False, 2)
self.assertFalse(self.cloud_client.destroy_node.called)
def test_uncancellable_shutdown(self, *mocks):
self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="busy"))
self.cloud_client.destroy_node.return_value = True
self.make_actor(cancellable=False)
- self.check_success_flag(True, 2)
+ self.check_success_flag(True, 4)
self.assertTrue(self.cloud_client.destroy_node.called)
def test_arvados_node_cleaned_after_shutdown(self, *mocks):
+ if len(mocks) == 1:
+ mocks[0].return_value = "drain\n"
cloud_node = testutil.cloud_node_mock(62)
arv_node = testutil.arvados_node_mock(62)
self.make_mocks(cloud_node, arv_node)
self.assertTrue(update_mock().execute.called)
def test_arvados_node_not_cleaned_after_shutdown_cancelled(self, *mocks):
+ if len(mocks) == 1:
+ mocks[0].return_value = "idle\n"
cloud_node = testutil.cloud_node_mock(61)
arv_node = testutil.arvados_node_mock(61)
self.make_mocks(cloud_node, arv_node, shutdown_open=False)
self.cloud_client.destroy_node.return_value = False
self.make_actor(cancellable=True)
self.shutdown_actor.cancel_shutdown("test")
+ self.shutdown_actor.ping().get(self.TIMEOUT)
self.check_success_flag(False, 2)
self.assertFalse(self.arvados_client.nodes().update.called)
def test_in_state_when_no_state_available(self):
self.make_actor(arv_node=testutil.arvados_node_mock(
crunch_worker_state=None))
- print(self.node_actor.get_state().get())
self.assertTrue(self.node_state('idle'))
def test_in_state_when_no_state_available_old(self):
self.make_actor(arv_node=testutil.arvados_node_mock(
crunch_worker_state=None, age=90000))
- print(self.node_actor.get_state().get())
self.assertTrue(self.node_state('down'))
def test_in_idle_state(self):
self.timer = testutil.MockTimer(False)
self.make_actor()
self.check_success_flag(None, 0)
+ # At this point, 1st try should have happened.
+
self.timer.deliver()
self.check_success_flag(None, 0)
- self.timer.deliver()
+ # At this point, 2nd try should have happened.
+
# Order is critical here: if the mock gets called when no return value
# or side effect is set, we may invoke a real subprocess.
proc_mock.return_value = end_state
proc_mock.side_effect = None
+
+ # 3rd try
+ self.timer.deliver()
+
self.check_success_flag(True, 3)
self.check_slurm_got_args(proc_mock, 'NodeName=compute63')
self.check_success_flag(True)
self.assertFalse(proc_mock.called)
- def test_node_undrained_when_shutdown_cancelled(self, proc_mock):
+ def test_node_resumed_when_shutdown_cancelled(self, proc_mock):
try:
proc_mock.side_effect = iter(['', 'drng\n', 'drng\n', ''])
self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
self.timer = testutil.MockTimer(False)
self.make_actor()
self.busywait(lambda: proc_mock.call_args is not None)
- self.shutdown_actor.cancel_shutdown("test").get(self.TIMEOUT)
+ self.shutdown_actor.cancel_shutdown("test")
self.check_success_flag(False, 2)
- self.assertEqual(proc_mock.call_args_list,
- [mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=DRAIN', 'Reason=Node Manager shutdown']),
- mock.call(['sinfo', '--noheader', '-o', '%t', '-n', 'compute99']),
- mock.call(['sinfo', '--noheader', '-o', '%t', '-n', 'compute99']),
- mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=RESUME'])])
+ self.assertEqual(proc_mock.call_args_list[0], mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=DRAIN', 'Reason=Node Manager shutdown']))
+ self.assertEqual(proc_mock.call_args_list[-1], mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=RESUME']))
+
finally:
self.shutdown_actor.actor_ref.stop()
proc_mock.side_effect = iter([OSError, 'drain\n', OSError, 'idle\n', 'idle\n'])
self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
self.make_actor()
- self.check_success_flag(False, 2)
+ self.check_success_flag(False, 5)
def test_issue_slurm_drain_retry(self, proc_mock):
- proc_mock.side_effect = iter([OSError, '', OSError, 'drng\n'])
+ proc_mock.side_effect = iter([OSError, OSError, 'drng\n', 'drain\n'])
self.check_success_after_reset(proc_mock, timer=False)
def test_arvados_node_cleaned_after_shutdown(self, proc_mock):
class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
unittest.TestCase):
+
+ def busywait(self, f):
+ n = 0
+ while not f() and n < 200:
+ time.sleep(.1)
+ self.daemon.ping().get(self.TIMEOUT)
+ n += 1
+ self.assertTrue(f())
+
def mock_node_start(self, **kwargs):
# Make sure that every time the daemon starts a setup actor,
# it gets a new mock object back.
self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
def monitor_list(self):
- return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor)
+ return [c.actor.actor_ref for c in self.daemon.cloud_nodes.get(self.TIMEOUT).nodes.values() if c.actor]
- def monitored_arvados_nodes(self):
+ def monitored_arvados_nodes(self, include_unpaired=True):
pairings = []
for future in [actor.proxy().arvados_node
for actor in self.monitor_list()]:
try:
- pairings.append(future.get(self.TIMEOUT))
+ g = future.get(self.TIMEOUT)
+ if g or include_unpaired:
+ pairings.append(g)
except pykka.ActorDeadError:
pass
return pairings
def alive_monitor_count(self):
return len(self.monitored_arvados_nodes())
+ def paired_monitor_count(self):
+ return len(self.monitored_arvados_nodes(False))
+
def assertShutdownCancellable(self, expected=True):
self.assertTrue(self.node_shutdown.start.called)
self.assertIs(expected,
def test_easy_node_creation(self):
size = testutil.MockSize(1)
self.make_daemon(want_sizes=[size])
- self.stop_proxy(self.daemon)
- self.assertTrue(self.node_setup.start.called)
+ self.busywait(lambda: self.node_setup.start.called)
def check_monitors_arvados_nodes(self, *arv_nodes):
+ self.busywait(lambda: len(arv_nodes) == len(self.monitored_arvados_nodes()))
self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
def test_node_pairing(self):
cloud_node = testutil.cloud_node_mock(1)
arv_node = testutil.arvados_node_mock(1)
self.make_daemon([cloud_node], [arv_node])
- self.stop_proxy(self.daemon)
self.check_monitors_arvados_nodes(arv_node)
def test_node_pairing_after_arvados_update(self):
[testutil.arvados_node_mock(1, ip_address=None)])
arv_node = testutil.arvados_node_mock(2)
self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
self.check_monitors_arvados_nodes(arv_node)
def test_arvados_node_un_and_re_paired(self):
self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
self.check_monitors_arvados_nodes(arv_node)
self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
- self.assertEqual(0, self.alive_monitor_count())
+ self.busywait(lambda: 0 == self.alive_monitor_count())
self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
- self.stop_proxy(self.daemon)
self.check_monitors_arvados_nodes(arv_node)
def test_old_arvados_node_not_double_assigned(self):
def test_node_count_satisfied(self):
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1)],
want_sizes=[testutil.MockSize(1)])
- self.stop_proxy(self.daemon)
- self.assertFalse(self.node_setup.start.called)
+ self.busywait(lambda: not self.node_setup.start.called)
def test_dont_count_missing_as_busy(self):
size = testutil.MockSize(1)
2,
last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size, size])
- self.stop_proxy(self.daemon)
- self.assertTrue(self.node_setup.start.called)
+ self.busywait(lambda: 2 == self.alive_monitor_count())
+ self.busywait(lambda: self.node_setup.start.called)
def test_missing_counts_towards_max(self):
size = testutil.MockSize(1)
testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size, size],
max_nodes=2)
- self.stop_proxy(self.daemon)
- self.assertFalse(self.node_setup.start.called)
+ self.busywait(lambda: not self.node_setup.start.called)
def test_excess_counts_missing(self):
size = testutil.MockSize(1)
arvados_nodes=[testutil.arvados_node_mock(1),
testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size])
- self.assertEqual(2, self.alive_monitor_count())
+ self.busywait(lambda: 2 == self.paired_monitor_count())
for mon_ref in self.monitor_list():
self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
self.assertEqual(1, self.node_shutdown.start.call_count)
arvados_nodes=[testutil.arvados_node_mock(1),
testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size])
-
+ self.busywait(lambda: 2 == self.paired_monitor_count())
get_cloud_node = mock.MagicMock(name="get_cloud_node")
get_cloud_node.get.return_value = cloud_nodes[1]
mock_node_monitor = mock.MagicMock()
self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
- self.assertEqual(2, self.alive_monitor_count())
+ self.busywait(lambda: 2 == self.alive_monitor_count())
for mon_ref in self.monitor_list():
self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
- self.assertEqual(1, self.node_shutdown.start.call_count)
+ self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
def test_booting_nodes_counted(self):
cloud_node = testutil.cloud_node_mock(1)
self.daemon.max_nodes.get(self.TIMEOUT)
self.assertTrue(self.node_setup.start.called)
self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertEqual(1, self.node_setup.start.call_count)
+ self.busywait(lambda: 1 == self.node_setup.start.call_count)
def test_boot_new_node_when_all_nodes_busy(self):
size = testutil.MockSize(2)
arv_node = testutil.arvados_node_mock(2, job_uuid=True)
self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
[size], avail_sizes=[(size, {"cores":1})])
+ self.busywait(lambda: 1 == self.paired_monitor_count())
self.busywait(lambda: self.node_setup.start.called)
- self.stop_proxy(self.daemon)
- self.assertTrue(self.node_setup.start.called)
def test_boot_new_node_below_min_nodes(self):
min_size = testutil.MockSize(1)
now = time.time()
self.monitor_list()[0].tell_proxy().consider_shutdown()
self.busywait(lambda: self.node_shutdown.start.called)
- self.stop_proxy(self.daemon)
self.assertShutdownCancellable(False)
def test_booted_node_shut_down_when_never_paired(self):
self.daemon.update_cloud_nodes([cloud_node])
self.monitor_list()[0].tell_proxy().consider_shutdown()
self.busywait(lambda: self.node_shutdown.start.called)
- self.stop_proxy(self.daemon)
self.assertShutdownCancellable(False)
def test_booted_node_shut_down_when_never_working(self):
self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
self.daemon.update_cloud_nodes([cloud_node])
self.busywait(lambda: self.node_shutdown.start.called)
- self.stop_proxy(self.daemon)
self.assertShutdownCancellable(False)
def test_node_that_pairs_not_considered_failed_boot(self):
def test_booting_nodes_shut_down(self):
self.make_daemon(want_sizes=[testutil.MockSize(1)])
self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertTrue(self.last_setup.stop_if_no_cloud_node.called)
+ self.busywait(lambda: self.last_setup.stop_if_no_cloud_node.called)
def test_all_booting_nodes_tried_to_shut_down(self):
size = testutil.MockSize(2)
arv_node = testutil.arvados_node_mock(1)
size = testutil.MockSize(1)
self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
- self.assertEqual(1, self.alive_monitor_count())
+ self.busywait(lambda: 1 == self.paired_monitor_count())
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
cloud_node = testutil.cloud_node_mock(1)
arv_node = testutil.arvados_node_mock(1)
self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
- self.assertEqual(1, self.alive_monitor_count())
+ self.busywait(lambda: 1 == self.paired_monitor_count())
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
def test_shutdown_accepted_below_capacity(self):
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
- self.assertEqual(1, self.alive_monitor_count())
+ self.busywait(lambda: 1 == self.alive_monitor_count())
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertTrue(self.node_shutdown.start.called)
+ self.busywait(lambda: self.node_shutdown.start.called)
def test_shutdown_declined_when_idle_and_job_queued(self):
size = testutil.MockSize(1)
arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
testutil.arvados_node_mock(4, job_uuid=None)]
self.make_daemon(cloud_nodes, arv_nodes, [size])
- self.assertEqual(2, self.alive_monitor_count())
+ self.busywait(lambda: 2 == self.paired_monitor_count())
for mon_ref in self.monitor_list():
monitor = mon_ref.proxy()
if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.last_shutdown.success.get.return_value = False
self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
- self.assertEqual(1, self.alive_monitor_count())
+ self.busywait(lambda: 1 == self.paired_monitor_count())
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.last_shutdown.success.get.return_value = True
self.last_shutdown.stop.side_effect = lambda: monitor.stop()
self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
- self.assertEqual(0, self.alive_monitor_count())
+ self.busywait(lambda: 0 == self.paired_monitor_count())
def test_nodes_shutting_down_replaced_below_max_nodes(self):
size = testutil.MockSize(6)
self.assertTrue(self.node_shutdown.start.called)
self.daemon.update_server_wishlist(
[testutil.MockSize(6)]).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertTrue(self.node_setup.start.called)
+ self.busywait(lambda: self.node_setup.start.called)
def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
cloud_node = testutil.cloud_node_mock(7)
self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
max_nodes=1)
- self.assertEqual(1, self.alive_monitor_count())
+ self.busywait(lambda: 1 == self.paired_monitor_count())
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.assertTrue(self.node_shutdown.start.called)
self.daemon.update_server_wishlist(
[testutil.MockSize(7)]).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertFalse(self.node_setup.start.called)
+ self.busywait(lambda: not self.node_setup.start.called)
def test_nodes_shutting_down_count_against_excess(self):
size = testutil.MockSize(8)
arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
self.make_daemon(cloud_nodes, arv_nodes, [size],
avail_sizes=[(size, {"cores":1})])
- self.assertEqual(2, self.alive_monitor_count())
+ self.busywait(lambda: 2 == self.paired_monitor_count())
for mon_ref in self.monitor_list():
self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
self.assertEqual(1, self.node_shutdown.start.call_count)
size = testutil.MockSize(2)
self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
self.timer.deliver()
- self.stop_proxy(self.daemon)
- self.assertEqual(1, self.node_setup.start.call_count)
+ self.busywait(lambda: 1 == self.node_setup.start.call_count)
def test_shutdown_actor_stopped_when_cloud_node_delisted(self):
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertEqual(
- 1, self.last_shutdown.stop.call_count)
+ self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
# the ActorDeadError.
self.last_shutdown.stop.side_effect = pykka.ActorDeadError
self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertEqual(1, self.last_shutdown.stop.call_count)
+ self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
def test_node_create_two_sizes(self):
small = testutil.MockSize(1)
testutil.arvados_node_mock(3)],
want_sizes=[small, small, big],
avail_sizes=avail_sizes)
-
+ self.busywait(lambda: 3 == self.paired_monitor_count())
self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
self.assertEqual(0, self.node_shutdown.start.call_count)
booting = self.daemon.booting.get()
cloud_nodes = self.daemon.cloud_nodes.get()
- self.stop_proxy(self.daemon)
+ self.busywait(lambda: 1 == self.node_setup.start.call_count)
+ self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
- self.assertEqual(1, self.node_setup.start.call_count)
- self.assertEqual(1, self.node_shutdown.start.call_count)
+ self.stop_proxy(self.daemon)
# booting a new big node
sizecounts = {a[0].id: 0 for a in avail_sizes}
import arvnodeman.baseactor
class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
- def __init__(self, e):
- super(BogusActor, self).__init__()
+ def __init__(self, e, killfunc=None):
+ super(BogusActor, self).__init__(killfunc=killfunc)
self.exp = e
def doStuff(self):
def ping(self):
# Called by WatchdogActorTest, this delay is longer than the test timeout
# of 1 second, which should cause the watchdog ping to fail.
- time.sleep(4)
+ time.sleep(2)
return True
class ActorUnhandledExceptionTest(testutil.ActorTestMixin, unittest.TestCase):
def test_fatal_error(self):
for e in (MemoryError(), threading.ThreadError(), OSError(errno.ENOMEM, "")):
- with mock.patch('os.kill') as kill_mock:
- act = BogusActor.start(e).tell_proxy()
- act.doStuff()
- act.actor_ref.stop(block=True)
- self.assertTrue(kill_mock.called)
-
- @mock.patch('os.kill')
- def test_nonfatal_error(self, kill_mock):
- act = BogusActor.start(OSError(errno.ENOENT, "")).tell_proxy()
+ kill_mock = mock.Mock('os.kill')
+ bgact = BogusActor.start(e, killfunc=kill_mock)
+ act_thread = bgact.proxy().get_thread().get()
+ act = bgact.tell_proxy()
+ act.doStuff()
+ act.actor_ref.stop(block=True)
+ act_thread.join()
+ self.assertTrue(kill_mock.called)
+
+ def test_nonfatal_error(self):
+ kill_mock = mock.Mock('os.kill')
+ act = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock).tell_proxy()
act.doStuff()
act.actor_ref.stop(block=True)
self.assertFalse(kill_mock.called)
class WatchdogActorTest(testutil.ActorTestMixin, unittest.TestCase):
- @mock.patch('os.kill')
- def test_time_timout(self, kill_mock):
+
+ def test_time_timout(self):
+ kill_mock = mock.Mock('os.kill')
act = BogusActor.start(OSError(errno.ENOENT, ""))
- watch = arvnodeman.baseactor.WatchdogActor.start(1, act)
+ watch = arvnodeman.baseactor.WatchdogActor.start(1, act, killfunc=kill_mock)
+ time.sleep(1)
watch.stop(block=True)
act.stop(block=True)
self.assertTrue(kill_mock.called)
@mock.patch("subprocess.check_call")
@mock.patch("subprocess.check_output")
def test_unsatisfiable_jobs(self, mock_squeue, mock_scancel):
- #mock_scancel.return_value = ""
job_uuid = 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'
container_uuid = 'yyyyy-dz642-yyyyyyyyyyyyyyy'
mock_squeue.return_value = "1|1024|0|Resources|" + container_uuid + "\n"
self.build_monitor([{'items': [{'uuid': job_uuid}]}],
self.MockCalculatorUnsatisfiableJobs(), True, True)
self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+ self.monitor.ping().get(self.TIMEOUT)
self.stop_proxy(self.monitor)
self.client.jobs().cancel.assert_called_with(uuid=job_uuid)
mock_scancel.assert_called_with(['scancel', '--name='+container_uuid])
def test_delayed_turnaround(self):
receiver = mock.Mock()
- with mock.patch('time.time', return_value=0) as mock_now:
- deliverer = timedcallback.TimedCallBackActor.start().proxy()
- deliverer.schedule(1, receiver, 'delayed')
- deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
- self.assertFalse(receiver.called)
- mock_now.return_value = 2
- deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
- self.stop_proxy(deliverer)
+ mock_now = mock.Mock()
+ mock_now.return_value = 0
+ deliverer = timedcallback.TimedCallBackActor.start(timefunc=mock_now).proxy()
+ deliverer.schedule(1, receiver, 'delayed')
+ deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+ self.assertFalse(receiver.called)
+ mock_now.return_value = 2
+ deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+ self.stop_proxy(deliverer)
receiver.assert_called_with('delayed')
def test_out_of_order_scheduling(self):
receiver = mock.Mock()
- with mock.patch('time.time', return_value=1.5) as mock_now:
- deliverer = timedcallback.TimedCallBackActor.start().proxy()
- deliverer.schedule(2, receiver, 'second')
- deliverer.schedule(1, receiver, 'first')
- deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
- receiver.assert_called_with('first')
- mock_now.return_value = 2.5
- deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
- self.stop_proxy(deliverer)
+ mock_now = mock.Mock()
+ mock_now.return_value = 1.5
+ deliverer = timedcallback.TimedCallBackActor.start(timefunc=mock_now).proxy()
+ deliverer.schedule(2, receiver, 'second')
+ deliverer.schedule(1, receiver, 'first')
+ deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+ receiver.assert_called_with('first')
+ mock_now.return_value = 2.5
+ deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+ self.stop_proxy(deliverer)
receiver.assert_called_with('second')
def test_dead_actors_ignored(self):
if __name__ == '__main__':
unittest.main()
-
pykka.ActorRegistry.stop_all()
def stop_proxy(self, proxy):
- return proxy.actor_ref.stop(timeout=self.TIMEOUT)
+ th = proxy.get_thread().get()
+ t = proxy.actor_ref.stop(timeout=self.TIMEOUT)
+ th.join()
+ return t
def wait_for_assignment(self, proxy, attr_name, unassigned=None,
timeout=TIMEOUT):
if result is not unassigned:
return result
- def busywait(self, f):
+ def busywait(self, f, finalize=None):
n = 0
- while not f() and n < 10:
+ while not f() and n < 20:
time.sleep(.1)
n += 1
+ if finalize is not None:
+ finalize()
self.assertTrue(f())
}
func (ps *pgEventSource) DBHealth() error {
- ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+ defer cancel()
var i int
return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
}
// Receive websocket frames from the client and pass them to
// sess.Receive().
go func() {
+ defer cancel()
buf := make([]byte, 2<<20)
for {
select {
err = errFrameTooBig
}
if err != nil {
- if err != io.EOF {
+ if err != io.EOF && ctx.Err() == nil {
log.WithError(err).Info("read error")
}
- cancel()
return
}
err = sess.Receive(buf)
if err != nil {
log.WithError(err).Error("sess.Receive() failed")
- cancel()
return
}
}
// sess.EventMessage() as needed, and send them to the client
// as websocket frames.
go func() {
+ defer cancel()
for {
var ok bool
var data interface{}
buf, err = sess.EventMessage(e)
if err != nil {
log.WithError(err).Error("EventMessage failed")
- cancel()
- break
+ return
} else if len(buf) == 0 {
log.Debug("skip")
continue
t0 := time.Now()
_, err = ws.Write(buf)
if err != nil {
- log.WithError(err).Error("write failed")
- cancel()
- break
+ if ctx.Err() == nil {
+ log.WithError(err).Error("write failed")
+ }
+ return
}
log.Debug("sent")
// is done/cancelled or the incoming event stream ends. Shut
// down the handler if the outgoing queue fills up.
go func() {
+ defer cancel()
ticker := time.NewTicker(h.PingTimeout)
defer ticker.Stop()
default:
}
}
- continue
case e, ok := <-incoming.Channel():
if !ok {
- cancel()
return
}
if !sess.Filter(e) {
case queue <- e:
default:
log.WithError(errQueueFull).Error("terminate")
- cancel()
return
}
}
// client will probably reconnect and do the
// same thing all over again.
time.Sleep(100 * time.Millisecond)
+ if sess.ws.Request().Context().Err() != nil {
+ // Session terminated while we were sleeping
+ return
+ }
}
now := time.Now()
e := &event{
GOPATH=$PWD go get github.com/curoverse/runsvinit && \
install bin/runsvinit /usr/local/bin
+ENV PJSVERSION=1.9.7
+
RUN set -e && \
- PJS=phantomjs-1.9.7-linux-x86_64 && \
- curl -L -o/tmp/$PJS.tar.bz2 http://cache.arvados.org/$PJS.tar.bz2 && \
- tar -C /usr/local -xjf /tmp/$PJS.tar.bz2 && \
- ln -s ../$PJS/bin/phantomjs /usr/local/bin/
+ curl -L -f http://cache.arvados.org/phantomjs-${PJSVERSION}-linux-x86_64.tar.bz2 | tar -C /usr/local -xjf - && \
+ ln -s ../phantomjs-${PJSVERSION}-linux-x86_64/bin/phantomjs /usr/local/bin
RUN pip install -U setuptools
+ENV NODEVERSION v6.11.2
+
+# Install Node.js from the official prebuilt binary tarball and symlink node/npm into /usr/local/bin
+RUN curl -L -f https://nodejs.org/dist/${NODEVERSION}/node-${NODEVERSION}-linux-x64.tar.xz | tar -C /usr/local -xJf - && \
+ ln -s ../node-${NODEVERSION}-linux-x64/bin/node ../node-${NODEVERSION}-linux-x64/bin/npm /usr/local/bin
+
ARG arvados_version
RUN echo arvados_version is git commit $arvados_version