From: Bryan Cosca Date: Wed, 21 Oct 2015 15:36:25 +0000 (-0400) Subject: Merge branch '7015-update-user-guide' X-Git-Tag: 1.1.0~1298 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/f0ea4324260fb4dc6df693d9548285bb64b3b69f?hp=78068d0f47e28b8caae3a16fb5a5a5c4037a287b Merge branch '7015-update-user-guide' closes #7015 --- diff --git a/apps/workbench/Gemfile.lock b/apps/workbench/Gemfile.lock index 20b8d6164c..8b2118ce08 100644 --- a/apps/workbench/Gemfile.lock +++ b/apps/workbench/Gemfile.lock @@ -74,7 +74,7 @@ GEM rack (>= 1.0.0) rack-test (>= 0.5.4) xpath (~> 2.0) - childprocess (0.5.5) + childprocess (0.5.6) ffi (~> 1.0, >= 1.0.11) cliver (0.3.2) coffee-rails (4.1.0) @@ -98,7 +98,7 @@ GEM fast_stack (0.1.0) rake rake-compiler - ffi (1.9.6) + ffi (1.9.10) flamegraph (0.1.0) fast_stack google-api-client (0.6.4) @@ -139,7 +139,7 @@ GEM metaclass (~> 0.0.1) morrisjs-rails (0.5.1) railties (> 3.1, < 5) - multi_json (1.11.1) + multi_json (1.11.2) multipart-post (1.2.0) net-scp (1.2.1) net-ssh (>= 2.6.5) @@ -192,7 +192,7 @@ GEM ref (1.0.5) ruby-debug-passenger (0.2.0) ruby-prof (0.15.2) - rubyzip (1.1.6) + rubyzip (1.1.7) rvm-capistrano (1.5.5) capistrano (~> 2.15.4) sass (3.4.9) @@ -202,7 +202,7 @@ GEM sprockets (>= 2.8, < 4.0) sprockets-rails (>= 2.0, < 4.0) tilt (~> 1.1) - selenium-webdriver (2.44.0) + selenium-webdriver (2.48.1) childprocess (~> 0.5) multi_json (~> 1.0) rubyzip (~> 1.0) @@ -239,7 +239,7 @@ GEM execjs (>= 0.3.0) json (>= 1.8.0) uuidtools (2.1.5) - websocket (1.2.1) + websocket (1.2.2) websocket-driver (0.5.1) websocket-extensions (>= 0.1.0) websocket-extensions (0.1.1) @@ -294,3 +294,6 @@ DEPENDENCIES therubyracer uglifier (>= 1.0.3) wiselinks + +BUNDLED WITH + 1.10.6 diff --git a/apps/workbench/app/views/getting_started/_getting_started_popup.html.erb b/apps/workbench/app/views/getting_started/_getting_started_popup.html.erb index 0db0567ec9..3020a1249b 100644 --- 
a/apps/workbench/app/views/getting_started/_getting_started_popup.html.erb +++ b/apps/workbench/app/views/getting_started/_getting_started_popup.html.erb @@ -154,7 +154,7 @@ div.figure p {
  • Use existing pipelines: Use best-practices pipelines on your own data with the click of a button.
  • - Open-source: Arvados is completely open-source. Check out our developer site. + Open source: Arvados is completely open source. Check out our developer site.
  • diff --git a/doc/install/install-manual-prerequisites.html.textile.liquid b/doc/install/install-manual-prerequisites.html.textile.liquid index 52a51a191a..a26370d21b 100644 --- a/doc/install/install-manual-prerequisites.html.textile.liquid +++ b/doc/install/install-manual-prerequisites.html.textile.liquid @@ -42,7 +42,7 @@ baseurl=http://rpm.arvados.org/CentOS/$releasever/os/$basearch/ h3. Debian and Ubuntu -Packages are available for Debian 7 ("wheezy"), Ubuntu 12.04 ("precise"), and Ubuntu 14.04 ("trusty"). +Packages are available for Debian 7 ("wheezy"), Debian 8 ("jessie"), Ubuntu 12.04 ("precise"), and Ubuntu 14.04 ("trusty"). First, register the Curoverse signing key in apt's database: @@ -53,6 +53,7 @@ Configure apt to retrieve packages from the Arvados package repository. This com table(table table-bordered table-condensed). |OS version|Command| |Debian 7 ("wheezy")|echo "deb http://apt.arvados.org/ wheezy main" | sudo tee /etc/apt/sources.list.d/arvados.list| +|Debian 8 ("jessie")|echo "deb http://apt.arvados.org/ jessie main" | sudo tee /etc/apt/sources.list.d/arvados.list| |Ubuntu 12.04 ("precise")|echo "deb http://apt.arvados.org/ precise main" | sudo tee /etc/apt/sources.list.d/arvados.list| |Ubuntu 14.04 ("trusty")|echo "deb http://apt.arvados.org/ trusty main" | sudo tee /etc/apt/sources.list.d/arvados.list| diff --git a/doc/install/install-sso.html.textile.liquid b/doc/install/install-sso.html.textile.liquid index 56c7a4b337..ca620f478a 100644 --- a/doc/install/install-sso.html.textile.liquid +++ b/doc/install/install-sso.html.textile.liquid @@ -164,7 +164,8 @@ Use @rails console@ to create a @Client@ record that will be used by the Arvados

    ~$ ruby -e 'puts rand(2**400).to_s(36)'
     xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
    -~$ RAILS_ENV=production bundle exec rails console
    +~$ cd /var/www/arvados-sso/current
    +/var/www/arvados-sso/current$ RAILS_ENV=production bundle exec rails console
     :001 > c = Client.new
     :002 > c.name = "joshid"
     :003 > c.app_id = "arvados-server"
    diff --git a/doc/sdk/cli/subcommands.html.textile.liquid b/doc/sdk/cli/subcommands.html.textile.liquid
    index aa7af94d5f..ca494fe17a 100644
    --- a/doc/sdk/cli/subcommands.html.textile.liquid
    +++ b/doc/sdk/cli/subcommands.html.textile.liquid
    @@ -21,6 +21,20 @@ Options:
     
    +h3(#arv-get). arv get + +@arv get@ can be used to get a textual representation of Arvados objects from the command line. The output can be limited to a subset of the object's fields. This command can be used with only the knowledge of an object's UUID. + + +
    +$ arv get --help
    +Usage: arv [--format json|yaml] get [uuid] [fields...]
    +
    +Fetch the specified Arvados object, select the specified fields,
    +and print a text representation.
    +
    +
    + h3(#arv-edit). arv edit @arv edit@ can be used to edit Arvados objects from the command line. Arv edit opens up the editor of your choice (set the EDITOR environment variable) with the json or yaml description of the object. Saving the file will update the Arvados object on the API server, if it passes validation. diff --git a/sdk/cli/bin/arv b/sdk/cli/bin/arv index 2bd7f4ef46..185a5b0673 100755 --- a/sdk/cli/bin/arv +++ b/sdk/cli/bin/arv @@ -5,6 +5,7 @@ # Ward Vandewege require 'fileutils' +require 'shellwords' if RUBY_VERSION < '1.9.3' then abort <<-EOS @@ -85,7 +86,15 @@ def init_config end -subcommands = %w(copy create edit keep pipeline run tag ws) +subcommands = %w(copy create edit get keep pipeline run tag ws) + +def exec_bin bin, opts + bin_path = `which #{bin.shellescape}`.strip + if bin_path.empty? + raise "#{bin}: command not found" + end + exec bin_path, *opts +end def check_subcommands client, arvados, subcommand, global_opts, remaining_opts case subcommand @@ -93,15 +102,17 @@ def check_subcommands client, arvados, subcommand, global_opts, remaining_opts arv_create client, arvados, global_opts, remaining_opts when 'edit' arv_edit client, arvados, global_opts, remaining_opts + when 'get' + arv_get client, arvados, global_opts, remaining_opts when 'copy', 'tag', 'ws', 'run' - exec `which arv-#{subcommand}`.strip, *remaining_opts + exec_bin "arv-#{subcommand}", remaining_opts when 'keep' @sub = remaining_opts.shift if ['get', 'put', 'ls', 'normalize'].index @sub then # Native Arvados - exec `which arv-#{@sub}`.strip, *remaining_opts + exec_bin "arv-#{@sub}", remaining_opts elsif @sub == 'docker' - exec `which arv-keepdocker`.strip, *remaining_opts + exec_bin "arv-keepdocker", remaining_opts else puts "Usage: arv keep [method] [--parameters]\n" puts "Use 'arv keep [method] --help' to get more information about specific methods.\n\n" @@ -111,7 +122,7 @@ def check_subcommands client, arvados, subcommand, global_opts, remaining_opts when 'pipeline' sub 
= remaining_opts.shift if sub == 'run' - exec `which arv-run-pipeline-instance`.strip, *remaining_opts + exec_bin "arv-run-pipeline-instance", remaining_opts else puts "Usage: arv pipeline [method] [--parameters]\n" puts "Use 'arv pipeline [method] --help' to get more information about specific methods.\n\n" @@ -147,14 +158,7 @@ end def edit_and_commit_object initial_obj, tmp_stem, global_opts, &block - content = case global_opts[:format] - when 'json' - Oj.dump(initial_obj, :indent => 1) - when 'yaml' - initial_obj.to_yaml - else - abort "Unrecognized format #{global_opts[:format]}" - end + content = get_obj_content initial_obj, global_opts tmp_file = Tempfile.new([tmp_stem, ".#{global_opts[:format]}"]) tmp_file.write(content) @@ -179,6 +183,8 @@ def edit_and_commit_object initial_obj, tmp_stem, global_opts, &block Oj.load(newcontent) when 'yaml' YAML.load(newcontent) + else + abort "Unrecognized format #{global_opts[:format]}" end yield newobj @@ -243,20 +249,7 @@ def check_response result results end -def arv_edit client, arvados, global_opts, remaining_opts - uuid = remaining_opts.shift - if uuid.nil? or uuid == "-h" or uuid == "--help" - puts head_banner - puts "Usage: arv edit [uuid] [fields...]\n\n" - puts "Fetch the specified Arvados object, select the specified fields, \n" - puts "open an interactive text editor on a text representation (json or\n" - puts "yaml, use --format) and then update the object. Will use 'nano'\n" - puts "by default, customize with the EDITOR or VISUAL environment variable.\n" - exit 255 - end - - # determine controller - +def lookup_uuid_rsc arvados, uuid m = /([a-z0-9]{5})-([a-z0-9]{5})-([a-z0-9]{15})/.match uuid if !m if /^[a-f0-9]{32}/.match uuid @@ -279,6 +272,11 @@ def arv_edit client, arvados, global_opts, remaining_opts abort "Could not determine resource type #{m[2]}" end + return rsc +end + +def fetch_rsc_obj client, arvados, rsc, uuid, remaining_opts + begin result = client.execute(:api_method => eval('arvados.' 
+ rsc + '.get'), :parameters => {"uuid" => uuid}, @@ -286,15 +284,45 @@ def arv_edit client, arvados, global_opts, remaining_opts :headers => { authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN'] }) - oldobj = check_response result + obj = check_response result rescue => e abort "Server error: #{e}" end if remaining_opts.length > 0 - oldobj.select! { |k, v| remaining_opts.include? k } + obj.select! { |k, v| remaining_opts.include? k } + end + + return obj +end + +def get_obj_content obj, global_opts + content = case global_opts[:format] + when 'json' + Oj.dump(obj, :indent => 1) + when 'yaml' + obj.to_yaml + else + abort "Unrecognized format #{global_opts[:format]}" + end + return content +end + +def arv_edit client, arvados, global_opts, remaining_opts + uuid = remaining_opts.shift + if uuid.nil? or uuid == "-h" or uuid == "--help" + puts head_banner + puts "Usage: arv edit [uuid] [fields...]\n\n" + puts "Fetch the specified Arvados object, select the specified fields, \n" + puts "open an interactive text editor on a text representation (json or\n" + puts "yaml, use --format) and then update the object. Will use 'nano'\n" + puts "by default, customize with the EDITOR or VISUAL environment variable.\n" + exit 255 end + rsc = lookup_uuid_rsc arvados, uuid + oldobj = fetch_rsc_obj client, arvados, rsc, uuid, remaining_opts + edit_and_commit_object oldobj, uuid, global_opts do |newobj| newobj.select! {|k| newobj[k] != oldobj[k]} if !newobj.empty? @@ -315,6 +343,24 @@ def arv_edit client, arvados, global_opts, remaining_opts exit 0 end +def arv_get client, arvados, global_opts, remaining_opts + uuid = remaining_opts.shift + if uuid.nil? 
or uuid == "-h" or uuid == "--help" + puts head_banner + puts "Usage: arv [--format json|yaml] get [uuid] [fields...]\n\n" + puts "Fetch the specified Arvados object, select the specified fields,\n" + puts "and print a text representation.\n" + exit 255 + end + + rsc = lookup_uuid_rsc arvados, uuid + obj = fetch_rsc_obj client, arvados, rsc, uuid, remaining_opts + content = get_obj_content obj, global_opts + + puts content + exit 0 +end + def arv_create client, arvados, global_opts, remaining_opts types = resource_types(arvados.discovery_document) create_opts = Trollop::options do diff --git a/sdk/cli/test/test_arv-get.rb b/sdk/cli/test/test_arv-get.rb index 5e58014cbf..2e2bba562f 100644 --- a/sdk/cli/test/test_arv-get.rb +++ b/sdk/cli/test/test_arv-get.rb @@ -1,251 +1,189 @@ require 'minitest/autorun' -require 'digest/md5' +require 'json' +require 'yaml' +# Black box tests for 'arv get' command. class TestArvGet < Minitest::Test - def setup - begin - Dir.mkdir './tmp' - rescue Errno::EEXIST - end - @@foo_manifest_locator ||= `echo -n foo | ./bin/arv-put --filename foo --no-progress -`.strip - @@baz_locator ||= `echo -n baz | ./bin/arv-put --as-raw --no-progress -`.strip - @@multilevel_manifest_locator ||= `echo ./foo/bar #{@@baz_locator} 0:3:baz | ./bin/arv-put --as-raw --no-progress -`.strip - end + # UUID for an Arvados object that does not exist + NON_EXISTENT_OBJECT_UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz" + # Name of field of Arvados object that can store any (textual) value + STORED_VALUE_FIELD_NAME = "name" + # Name of UUID field of Arvados object + UUID_FIELD_NAME = "uuid" + # Name of an invalid field of Arvados object + INVALID_FIELD_NAME = "invalid" - def test_no_args + # Tests that a valid Arvados object can be retrieved in a supported format + # using: `arv get [uuid]`. Given all other `arv foo` commands return JSON + # when no format is specified, JSON should be expected in this case. 
+ def test_get_valid_object_no_format_specified + stored_value = __method__.to_s + uuid = create_arv_object_with_value(stored_value) out, err = capture_subprocess_io do - assert_arv_get false + assert(arv_get_default(uuid)) end - assert_equal '', out - assert_match /^usage:/, err + assert_empty(err, "Error text not expected: '#{err}'") + arv_object = parse_json_arv_object(out) + assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value)) end - def test_help + # Tests that a valid Arvados object can be retrieved in JSON format using: + # `arv get [uuid] --format json`. + def test_get_valid_object_json_format_specified + stored_value = __method__.to_s + uuid = create_arv_object_with_value(stored_value) out, err = capture_subprocess_io do - assert_arv_get '-h' + assert(arv_get_json(uuid)) end - $stderr.write err - assert_equal '', err - assert_match /^usage:/, out + assert_empty(err, "Error text not expected: '#{err}'") + arv_object = parse_json_arv_object(out) + assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value)) end - def test_file_to_dev_stdout - test_file_to_stdout('/dev/stdout') - end - - def test_file_to_stdout(specify_stdout_as='-') + # Tests that a valid Arvados object can be retrieved in YAML format using: + # `arv get [uuid] --format yaml`. + def test_get_valid_object_yaml_format_specified + stored_value = __method__.to_s + uuid = create_arv_object_with_value(stored_value) out, err = capture_subprocess_io do - assert_arv_get @@foo_manifest_locator + '/foo', specify_stdout_as + assert(arv_get_yaml(uuid)) end - assert_equal '', err - assert_equal 'foo', out + assert_empty(err, "Error text not expected: '#{err}'") + arv_object = parse_yaml_arv_object(out) + assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value)) end - def test_file_to_file - remove_tmp_foo + # Tests that a subset of all fields of a valid Arvados object can be retrieved + # using: `arv get [uuid] [fields...]`. 
+ def test_get_valid_object_with_valid_fields + stored_value = __method__.to_s + uuid = create_arv_object_with_value(stored_value) out, err = capture_subprocess_io do - assert_arv_get @@foo_manifest_locator + '/foo', 'tmp/foo' + assert(arv_get_json(uuid, STORED_VALUE_FIELD_NAME, UUID_FIELD_NAME)) end - assert_equal '', err - assert_equal '', out - assert_equal 'foo', IO.read('tmp/foo') + assert_empty(err, "Error text not expected: '#{err}'") + arv_object = parse_json_arv_object(out) + assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value)) + assert(has_field_with_value(arv_object, UUID_FIELD_NAME, uuid)) end - def test_file_to_file_no_overwrite_file - File.open './tmp/foo', 'wb' do |f| - f.write 'baz' - end + # Tests that the valid field is retrieved when both a valid and invalid field + # are requested from a valid Arvados object, using: + # `arv get [uuid] [fields...]`. + def test_get_valid_object_with_both_valid_and_invalid_fields + stored_value = __method__.to_s + uuid = create_arv_object_with_value(stored_value) out, err = capture_subprocess_io do - assert_arv_get false, @@foo_manifest_locator + '/foo', 'tmp/foo' + assert(arv_get_json(uuid, STORED_VALUE_FIELD_NAME, INVALID_FIELD_NAME)) end - assert_match /Local file tmp\/foo already exists/, err - assert_equal '', out - assert_equal 'baz', IO.read('tmp/foo') + assert_empty(err, "Error text not expected: '#{err}'") + arv_object = parse_json_arv_object(out) + assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value)) + refute(has_field_with_value(arv_object, INVALID_FIELD_NAME, stored_value)) end - def test_file_to_file_no_overwrite_file_in_dir - File.open './tmp/foo', 'wb' do |f| - f.write 'baz' - end + # Tests that no fields are retrieved when no valid fields are requested from + # a valid Arvados object, using: `arv get [uuid] [fields...]`. 
+ def test_get_valid_object_with_no_valid_fields + stored_value = __method__.to_s + uuid = create_arv_object_with_value(stored_value) out, err = capture_subprocess_io do - assert_arv_get false, @@foo_manifest_locator + '/', 'tmp/' + assert(arv_get_json(uuid, INVALID_FIELD_NAME)) end - assert_match /Local file tmp\/foo already exists/, err - assert_equal '', out - assert_equal 'baz', IO.read('tmp/foo') + assert_empty(err, "Error text not expected: '#{err}'") + arv_object = parse_json_arv_object(out) + assert_equal(0, arv_object.length) end - def test_file_to_file_force_overwrite - File.open './tmp/foo', 'wb' do |f| - f.write 'baz' - end - assert_equal 'baz', IO.read('tmp/foo') + # Tests that an invalid (non-existent) Arvados object is not retrieved + # using: `arv get [non-existent-uuid]`. + def test_get_invalid_object out, err = capture_subprocess_io do - assert_arv_get '-f', @@foo_manifest_locator + '/', 'tmp/' + refute(arv_get_json(NON_EXISTENT_OBJECT_UUID)) end - assert_match '', err - assert_equal '', out - assert_equal 'foo', IO.read('tmp/foo') + refute_empty(err, "Expected error feedback on request for invalid object") + assert_empty(out) end - def test_file_to_file_skip_existing - File.open './tmp/foo', 'wb' do |f| - f.write 'baz' - end - assert_equal 'baz', IO.read('tmp/foo') + # Tests that help text exists using: `arv get --help`. + def test_help_exists out, err = capture_subprocess_io do - assert_arv_get '--skip-existing', @@foo_manifest_locator + '/', 'tmp/' +# assert(arv_get_default("--help"), "Expected exit code 0: #{$?}") + #XXX: Exit code given is 255. It probably should be 0, which seems to be + # standard elsewhere. However, 255 is in line with other `arv` + # commands (e.g. see `arv edit`) so ignoring the problem here. 
+ arv_get_default("--help") end - assert_match '', err - assert_equal '', out - assert_equal 'baz', IO.read('tmp/foo') + assert_empty(err, "Error text not expected: '#{err}'") + refute_empty(out, "Help text should be given") end - def test_file_to_dir - remove_tmp_foo - out, err = capture_subprocess_io do - assert_arv_get @@foo_manifest_locator + '/foo', 'tmp/' - end - assert_equal '', err - assert_equal '', out - assert_equal 'foo', IO.read('tmp/foo') - end - - def test_dir_to_file - out, err = capture_subprocess_io do - assert_arv_get false, @@foo_manifest_locator + '/', 'tmp/foo' - end - assert_equal '', out - assert_match /^usage:/, err - end - - def test_dir_to_empty_string - out, err = capture_subprocess_io do - assert_arv_get false, @@foo_manifest_locator + '/', '' - end - assert_equal '', out - assert_match /^usage:/, err - end - - def test_nonexistent_block - out, err = capture_subprocess_io do - assert_arv_get false, 'e796ab2294f3e48ec709ffa8d6daf58c' - end - assert_equal '', out - assert_match /Error:/, err - end - - def test_nonexistent_manifest - out, err = capture_subprocess_io do - assert_arv_get false, 'acbd18db4cc2f85cedef654fccc4a4d8/', 'tmp/' - end - assert_equal '', out - assert_match /Error:/, err - end - - def test_manifest_root_to_dir - remove_tmp_foo - out, err = capture_subprocess_io do - assert_arv_get '-r', @@foo_manifest_locator + '/', 'tmp/' - end - assert_equal '', err - assert_equal '', out - assert_equal 'foo', IO.read('tmp/foo') - end - - def test_manifest_root_to_dir_noslash - remove_tmp_foo - out, err = capture_subprocess_io do - assert_arv_get '-r', @@foo_manifest_locator + '/', 'tmp' - end - assert_equal '', err - assert_equal '', out - assert_equal 'foo', IO.read('tmp/foo') - end - - def test_display_md5sum - remove_tmp_foo - out, err = capture_subprocess_io do - assert_arv_get '-r', '--md5sum', @@foo_manifest_locator + '/', 'tmp/' - end - assert_equal "#{Digest::MD5.hexdigest('foo')} ./foo\n", err - assert_equal '', out - 
assert_equal 'foo', IO.read('tmp/foo') + protected + # Runs 'arv get ' with given arguments. Returns whether the exit + # status was 0 (i.e. success). Use $? to attain more details on failure. + def arv_get_default(*args) + return system("arv", "get", *args) end - def test_md5sum_nowrite - remove_tmp_foo - out, err = capture_subprocess_io do - assert_arv_get '-n', '--md5sum', @@foo_manifest_locator + '/', 'tmp/' - end - assert_equal "#{Digest::MD5.hexdigest('foo')} ./foo\n", err - assert_equal '', out - assert_equal false, File.exists?('tmp/foo') + # Runs 'arv --format json get ' with given arguments. Returns whether + # the exit status was 0 (i.e. success). Use $? to attain more details on + # failure. + def arv_get_json(*args) + return system("arv", "--format", "json", "get", *args) end - def test_sha1_nowrite - remove_tmp_foo - out, err = capture_subprocess_io do - assert_arv_get '-n', '-r', '--hash', 'sha1', @@foo_manifest_locator+'/', 'tmp/' - end - assert_equal "#{Digest::SHA1.hexdigest('foo')} ./foo\n", err - assert_equal '', out - assert_equal false, File.exists?('tmp/foo') + # Runs 'arv --format yaml get ' with given arguments. Returns whether + # the exit status was 0 (i.e. success). Use $? to attain more details on + # failure. + def arv_get_yaml(*args) + return system("arv", "--format", "yaml", "get", *args) end - def test_block_to_file - remove_tmp_foo + # Creates an Arvados object that stores a given value. Returns the uuid of the + # created object. + def create_arv_object_with_value(value) out, err = capture_subprocess_io do - assert_arv_get @@foo_manifest_locator, 'tmp/foo' + system("arv", "tag", "add", value, "--object", "testing") + assert $?.success?, "Command failure running `arv tag`: #{$?}" end assert_equal '', err - assert_equal '', out - - digest = Digest::MD5.hexdigest('foo') - !(IO.read('tmp/foo')).gsub!( /^(. #{digest}+3)(.*)( 0:3:foo)$/).nil? 
+ assert_operator 0, :<, out.strip.length + out.strip end - def test_create_directory_tree - `rm -rf ./tmp/arv-get-test/` - Dir.mkdir './tmp/arv-get-test' - out, err = capture_subprocess_io do - assert_arv_get @@multilevel_manifest_locator + '/', 'tmp/arv-get-test/' + # Parses the given JSON representation of an Arvados object, returning + # an equivalent Ruby representation (a hash map). + def parse_json_arv_object(arvObjectAsJson) + begin + parsed = JSON.parse(arvObjectAsJson) + assert(parsed.instance_of?(Hash)) + return parsed + rescue JSON::ParserError => e + raise "Invalid JSON representation of Arvados object.\n" \ + "Parse error: '#{e}'\n" \ + "JSON: '#{arvObjectAsJson}'\n" end - assert_equal '', err - assert_equal '', out - assert_equal 'baz', IO.read('tmp/arv-get-test/foo/bar/baz') end - def test_create_partial_directory_tree - `rm -rf ./tmp/arv-get-test/` - Dir.mkdir './tmp/arv-get-test' - out, err = capture_subprocess_io do - assert_arv_get(@@multilevel_manifest_locator + '/foo/', - 'tmp/arv-get-test/') + # Parses the given YAML representation of an Arvados object, returning + # an equivalent Ruby representation (a hash map). + def parse_yaml_arv_object(arvObjectAsYaml) + begin + parsed = YAML.load(arvObjectAsYaml) + assert(parsed.instance_of?(Hash)) + return parsed + rescue + raise "Invalid YAML representation of Arvados object.\n" \ + "YAML: '#{arvObjectAsYaml}'\n" end - assert_equal '', err - assert_equal '', out - assert_equal 'baz', IO.read('tmp/arv-get-test/bar/baz') end - protected - def assert_arv_get(*args) - expect = case args.first - when true, false - args.shift - else - true - end - assert_equal(expect, - system(['./bin/arv-get', 'arv-get'], *args), - "`arv-get #{args.join ' '}` " + - "should exit #{if expect then 0 else 'non-zero' end}") - end - - def remove_tmp_foo - begin - File.unlink('tmp/foo') - rescue Errno::ENOENT + # Checks whether the given Arvados object has the given expected value for the + # specified field. 
+ def has_field_with_value(arvObjectAsHash, fieldName, expectedValue) + if !arvObjectAsHash.has_key?(fieldName) + return false end + return (arvObjectAsHash[fieldName] == expectedValue) end end diff --git a/sdk/cli/test/test_arv-keep-get.rb b/sdk/cli/test/test_arv-keep-get.rb new file mode 100644 index 0000000000..0e578b8fb4 --- /dev/null +++ b/sdk/cli/test/test_arv-keep-get.rb @@ -0,0 +1,251 @@ +require 'minitest/autorun' +require 'digest/md5' + +class TestArvKeepGet < Minitest::Test + def setup + begin + Dir.mkdir './tmp' + rescue Errno::EEXIST + end + @@foo_manifest_locator ||= `echo -n foo | ./bin/arv-put --filename foo --no-progress -`.strip + @@baz_locator ||= `echo -n baz | ./bin/arv-put --as-raw --no-progress -`.strip + @@multilevel_manifest_locator ||= `echo ./foo/bar #{@@baz_locator} 0:3:baz | ./bin/arv-put --as-raw --no-progress -`.strip + end + + def test_no_args + out, err = capture_subprocess_io do + assert_arv_get false + end + assert_equal '', out + assert_match /^usage:/, err + end + + def test_help + out, err = capture_subprocess_io do + assert_arv_get '-h' + end + $stderr.write err + assert_equal '', err + assert_match /^usage:/, out + end + + def test_file_to_dev_stdout + test_file_to_stdout('/dev/stdout') + end + + def test_file_to_stdout(specify_stdout_as='-') + out, err = capture_subprocess_io do + assert_arv_get @@foo_manifest_locator + '/foo', specify_stdout_as + end + assert_equal '', err + assert_equal 'foo', out + end + + def test_file_to_file + remove_tmp_foo + out, err = capture_subprocess_io do + assert_arv_get @@foo_manifest_locator + '/foo', 'tmp/foo' + end + assert_equal '', err + assert_equal '', out + assert_equal 'foo', IO.read('tmp/foo') + end + + def test_file_to_file_no_overwrite_file + File.open './tmp/foo', 'wb' do |f| + f.write 'baz' + end + out, err = capture_subprocess_io do + assert_arv_get false, @@foo_manifest_locator + '/foo', 'tmp/foo' + end + assert_match /Local file tmp\/foo already exists/, err + assert_equal '', 
out + assert_equal 'baz', IO.read('tmp/foo') + end + + def test_file_to_file_no_overwrite_file_in_dir + File.open './tmp/foo', 'wb' do |f| + f.write 'baz' + end + out, err = capture_subprocess_io do + assert_arv_get false, @@foo_manifest_locator + '/', 'tmp/' + end + assert_match /Local file tmp\/foo already exists/, err + assert_equal '', out + assert_equal 'baz', IO.read('tmp/foo') + end + + def test_file_to_file_force_overwrite + File.open './tmp/foo', 'wb' do |f| + f.write 'baz' + end + assert_equal 'baz', IO.read('tmp/foo') + out, err = capture_subprocess_io do + assert_arv_get '-f', @@foo_manifest_locator + '/', 'tmp/' + end + assert_match '', err + assert_equal '', out + assert_equal 'foo', IO.read('tmp/foo') + end + + def test_file_to_file_skip_existing + File.open './tmp/foo', 'wb' do |f| + f.write 'baz' + end + assert_equal 'baz', IO.read('tmp/foo') + out, err = capture_subprocess_io do + assert_arv_get '--skip-existing', @@foo_manifest_locator + '/', 'tmp/' + end + assert_match '', err + assert_equal '', out + assert_equal 'baz', IO.read('tmp/foo') + end + + def test_file_to_dir + remove_tmp_foo + out, err = capture_subprocess_io do + assert_arv_get @@foo_manifest_locator + '/foo', 'tmp/' + end + assert_equal '', err + assert_equal '', out + assert_equal 'foo', IO.read('tmp/foo') + end + + def test_dir_to_file + out, err = capture_subprocess_io do + assert_arv_get false, @@foo_manifest_locator + '/', 'tmp/foo' + end + assert_equal '', out + assert_match /^usage:/, err + end + + def test_dir_to_empty_string + out, err = capture_subprocess_io do + assert_arv_get false, @@foo_manifest_locator + '/', '' + end + assert_equal '', out + assert_match /^usage:/, err + end + + def test_nonexistent_block + out, err = capture_subprocess_io do + assert_arv_get false, 'e796ab2294f3e48ec709ffa8d6daf58c' + end + assert_equal '', out + assert_match /Error:/, err + end + + def test_nonexistent_manifest + out, err = capture_subprocess_io do + assert_arv_get false, 
'acbd18db4cc2f85cedef654fccc4a4d8/', 'tmp/' + end + assert_equal '', out + assert_match /Error:/, err + end + + def test_manifest_root_to_dir + remove_tmp_foo + out, err = capture_subprocess_io do + assert_arv_get '-r', @@foo_manifest_locator + '/', 'tmp/' + end + assert_equal '', err + assert_equal '', out + assert_equal 'foo', IO.read('tmp/foo') + end + + def test_manifest_root_to_dir_noslash + remove_tmp_foo + out, err = capture_subprocess_io do + assert_arv_get '-r', @@foo_manifest_locator + '/', 'tmp' + end + assert_equal '', err + assert_equal '', out + assert_equal 'foo', IO.read('tmp/foo') + end + + def test_display_md5sum + remove_tmp_foo + out, err = capture_subprocess_io do + assert_arv_get '-r', '--md5sum', @@foo_manifest_locator + '/', 'tmp/' + end + assert_equal "#{Digest::MD5.hexdigest('foo')} ./foo\n", err + assert_equal '', out + assert_equal 'foo', IO.read('tmp/foo') + end + + def test_md5sum_nowrite + remove_tmp_foo + out, err = capture_subprocess_io do + assert_arv_get '-n', '--md5sum', @@foo_manifest_locator + '/', 'tmp/' + end + assert_equal "#{Digest::MD5.hexdigest('foo')} ./foo\n", err + assert_equal '', out + assert_equal false, File.exists?('tmp/foo') + end + + def test_sha1_nowrite + remove_tmp_foo + out, err = capture_subprocess_io do + assert_arv_get '-n', '-r', '--hash', 'sha1', @@foo_manifest_locator+'/', 'tmp/' + end + assert_equal "#{Digest::SHA1.hexdigest('foo')} ./foo\n", err + assert_equal '', out + assert_equal false, File.exists?('tmp/foo') + end + + def test_block_to_file + remove_tmp_foo + out, err = capture_subprocess_io do + assert_arv_get @@foo_manifest_locator, 'tmp/foo' + end + assert_equal '', err + assert_equal '', out + + digest = Digest::MD5.hexdigest('foo') + !(IO.read('tmp/foo')).gsub!( /^(. #{digest}+3)(.*)( 0:3:foo)$/).nil? 
+ end + + def test_create_directory_tree + `rm -rf ./tmp/arv-get-test/` + Dir.mkdir './tmp/arv-get-test' + out, err = capture_subprocess_io do + assert_arv_get @@multilevel_manifest_locator + '/', 'tmp/arv-get-test/' + end + assert_equal '', err + assert_equal '', out + assert_equal 'baz', IO.read('tmp/arv-get-test/foo/bar/baz') + end + + def test_create_partial_directory_tree + `rm -rf ./tmp/arv-get-test/` + Dir.mkdir './tmp/arv-get-test' + out, err = capture_subprocess_io do + assert_arv_get(@@multilevel_manifest_locator + '/foo/', + 'tmp/arv-get-test/') + end + assert_equal '', err + assert_equal '', out + assert_equal 'baz', IO.read('tmp/arv-get-test/bar/baz') + end + + protected + def assert_arv_get(*args) + expect = case args.first + when true, false + args.shift + else + true + end + assert_equal(expect, + system(['./bin/arv-get', 'arv-get'], *args), + "`arv-get #{args.join ' '}` " + + "should exit #{if expect then 0 else 'non-zero' end}") + end + + def remove_tmp_foo + begin + File.unlink('tmp/foo') + rescue Errno::ENOENT + end + end +end diff --git a/sdk/cli/test/test_arv-put.rb b/sdk/cli/test/test_arv-keep-put.rb similarity index 99% rename from sdk/cli/test/test_arv-put.rb rename to sdk/cli/test/test_arv-keep-put.rb index 2f20e18440..fefbc27298 100644 --- a/sdk/cli/test/test_arv-put.rb +++ b/sdk/cli/test/test_arv-keep-put.rb @@ -1,7 +1,7 @@ require 'minitest/autorun' require 'digest/md5' -class TestArvPut < Minitest::Test +class TestArvKeepPut < Minitest::Test def setup begin Dir.mkdir './tmp' rescue Errno::EEXIST end begin Dir.mkdir './tmp/empty_dir' rescue Errno::EEXIST end diff --git a/sdk/go/arvadosclient/arvadosclient_test.go b/sdk/go/arvadosclient/arvadosclient_test.go index d35f6dacb7..2c508dcb4a 100644 --- a/sdk/go/arvadosclient/arvadosclient_test.go +++ b/sdk/go/arvadosclient/arvadosclient_test.go @@ -21,7 +21,7 @@ type ServerRequiredSuite struct{} func (s *ServerRequiredSuite) SetUpSuite(c *C) { arvadostest.StartAPI() - arvadostest.StartKeep() 
+ arvadostest.StartKeep(2, false) } func (s *ServerRequiredSuite) SetUpTest(c *C) { diff --git a/sdk/go/arvadostest/run_servers.go b/sdk/go/arvadostest/run_servers.go index cad16917db..27c552a4e1 100644 --- a/sdk/go/arvadostest/run_servers.go +++ b/sdk/go/arvadostest/run_servers.go @@ -9,6 +9,7 @@ import ( "log" "os" "os/exec" + "strconv" "strings" ) @@ -98,12 +99,21 @@ func StopAPI() { exec.Command("python", "run_test_server.py", "stop").Run() } -func StartKeep() { +// StartKeep starts the given number of keep servers, +// optionally with -enforce-permissions enabled. +// Use numKeepServers = 2 and enforcePermissions = false under all normal circumstances. +func StartKeep(numKeepServers int, enforcePermissions bool) { cwd, _ := os.Getwd() defer os.Chdir(cwd) chdirToPythonTests() - cmd := exec.Command("python", "run_test_server.py", "start_keep") + cmdArgs := []string{"run_test_server.py", "start_keep", "--num-keep-servers", strconv.Itoa(numKeepServers)} + if enforcePermissions { + cmdArgs = append(cmdArgs, "--keep-enforce-permissions") + } + + cmd := exec.Command("python", cmdArgs...) + stderr, err := cmd.StderrPipe() if err != nil { log.Fatalf("Setting up stderr pipe: %s", err) @@ -114,10 +124,13 @@ func StartKeep() { } } -func StopKeep() { +// StopKeep stops keep servers that were started with StartKeep. +// numkeepServers should be the same value that was passed to StartKeep, +// which is 2 under all normal circumstances. 
+func StopKeep(numKeepServers int) { cwd, _ := os.Getwd() defer os.Chdir(cwd) chdirToPythonTests() - exec.Command("python", "run_test_server.py", "stop_keep").Run() + exec.Command("python", "run_test_server.py", "stop_keep", "--num-keep-servers", strconv.Itoa(numKeepServers)) } diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go index 8b7cf419ea..67c304deaf 100644 --- a/sdk/go/keepclient/keepclient.go +++ b/sdk/go/keepclient/keepclient.go @@ -13,7 +13,6 @@ import ( "io/ioutil" "log" "net/http" - "os" "regexp" "strconv" "strings" @@ -49,24 +48,39 @@ type KeepClient struct { gatewayRoots *map[string]string lock sync.RWMutex Client *http.Client + Retries int // set to 1 if all writable services are of disk type, otherwise 0 replicasPerService int } -// Create a new KeepClient. This will contact the API server to discover Keep -// servers. +// MakeKeepClient creates a new KeepClient by contacting the API server to discover Keep servers. func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) { - var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$") - insecure := matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")) + kc := New(arv) + return kc, kc.DiscoverKeepServers() +} + +// New func creates a new KeepClient struct. +// This func does not discover keep servers. It is the caller's responsibility. 
+func New(arv *arvadosclient.ArvadosClient) *KeepClient { + defaultReplicationLevel := 2 + value, err := arv.Discovery("defaultCollectionReplication") + if err == nil { + v, ok := value.(float64) + if ok && v > 0 { + defaultReplicationLevel = int(v) + } + } + kc := &KeepClient{ Arvados: arv, - Want_replicas: 2, + Want_replicas: defaultReplicationLevel, Using_proxy: false, Client: &http.Client{Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}}, + TLSClientConfig: &tls.Config{InsecureSkipVerify: arv.ApiInsecure}}}, + Retries: 2, } - return kc, kc.DiscoverKeepServers() + return kc } // Put a block given the block hash, a reader, and the number of bytes @@ -127,6 +141,69 @@ func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error } } +func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, int64, string, error) { + var errs []string + + tries_remaining := 1 + kc.Retries + serversToTry := kc.getSortedRoots(locator) + var retryList []string + + for tries_remaining > 0 { + tries_remaining -= 1 + retryList = nil + + for _, host := range serversToTry { + url := host + "/" + locator + + req, err := http.NewRequest(method, url, nil) + if err != nil { + errs = append(errs, fmt.Sprintf("%s: %v", url, err)) + continue + } + req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken)) + resp, err := kc.Client.Do(req) + if err != nil { + // Probably a network error, may be transient, + // can try again. 
+ errs = append(errs, fmt.Sprintf("%s: %v", url, err)) + retryList = append(retryList, host) + } else if resp.StatusCode != http.StatusOK { + var respbody []byte + respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096}) + resp.Body.Close() + errs = append(errs, fmt.Sprintf("%s: HTTP %d %q", + url, resp.StatusCode, bytes.TrimSpace(respbody))) + + if resp.StatusCode == 408 || + resp.StatusCode == 429 || + resp.StatusCode >= 500 { + // Timeout, too many requests, or other + // server side failure, transient + // error, can try again. + retryList = append(retryList, host) + } + } else { + // Success. + if method == "GET" { + return HashCheckingReader{ + Reader: resp.Body, + Hash: md5.New(), + Check: locator[0:32], + }, resp.ContentLength, url, nil + } else { + resp.Body.Close() + return nil, resp.ContentLength, url, nil + } + } + + } + serversToTry = retryList + } + log.Printf("DEBUG: %s %s failed: %v", method, locator, errs) + + return nil, 0, "", BlockNotFound +} + // Get() retrieves a block, given a locator. Returns a reader, the // expected data length, the URL the block is being fetched from, and // an error. @@ -135,34 +212,7 @@ func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error // reader returned by this method will return a BadChecksum error // instead of EOF. 
func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) { - var errs []string - for _, host := range kc.getSortedRoots(locator) { - url := host + "/" + locator - req, err := http.NewRequest("GET", url, nil) - if err != nil { - errs = append(errs, fmt.Sprintf("%s: %v", url, err)) - continue - } - req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken)) - resp, err := kc.Client.Do(req) - if err != nil { - errs = append(errs, fmt.Sprintf("%s: %v", url, err)) - continue - } else if resp.StatusCode != http.StatusOK { - respbody, _ := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096}) - resp.Body.Close() - errs = append(errs, fmt.Sprintf("%s: HTTP %d %q", - url, resp.StatusCode, bytes.TrimSpace(respbody))) - continue - } - return HashCheckingReader{ - Reader: resp.Body, - Hash: md5.New(), - Check: locator[0:32], - }, resp.ContentLength, url, nil - } - log.Printf("DEBUG: GET %s failed: %v", locator, errs) - return nil, 0, "", BlockNotFound + return kc.getOrHead("GET", locator) } // Ask() verifies that a block with the given hash is available and @@ -173,18 +223,8 @@ func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) // Returns the data size (content length) reported by the Keep service // and the URI reporting the data size. 
func (kc *KeepClient) Ask(locator string) (int64, string, error) { - for _, host := range kc.getSortedRoots(locator) { - url := host + "/" + locator - req, err := http.NewRequest("HEAD", url, nil) - if err != nil { - continue - } - req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken)) - if resp, err := kc.Client.Do(req); err == nil && resp.StatusCode == http.StatusOK { - return resp.ContentLength, url, nil - } - } - return 0, "", BlockNotFound + _, size, url, err := kc.getOrHead("HEAD", locator) + return size, url, err } // GetIndex retrieves a list of blocks stored on the given server whose hashes diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go index ee60d2822f..aaba0695f0 100644 --- a/sdk/go/keepclient/keepclient_test.go +++ b/sdk/go/keepclient/keepclient_test.go @@ -45,14 +45,14 @@ func (s *ServerRequiredSuite) SetUpSuite(c *C) { return } arvadostest.StartAPI() - arvadostest.StartKeep() + arvadostest.StartKeep(2, false) } func (s *ServerRequiredSuite) TearDownSuite(c *C) { if *no_server { return } - arvadostest.StopKeep() + arvadostest.StopKeep(2) arvadostest.StopAPI() } @@ -69,6 +69,22 @@ func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) { } } +func (s *ServerRequiredSuite) TestDefaultReplications(c *C) { + arv, err := arvadosclient.MakeArvadosClient() + c.Assert(err, Equals, nil) + + kc, err := MakeKeepClient(&arv) + c.Assert(kc.Want_replicas, Equals, 2) + + arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0 + kc, err = MakeKeepClient(&arv) + c.Assert(kc.Want_replicas, Equals, 3) + + arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0 + kc, err = MakeKeepClient(&arv) + c.Assert(kc.Want_replicas, Equals, 1) +} + type StubPutHandler struct { c *C expectPath string @@ -184,6 +200,31 @@ func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { fh.handled <- fmt.Sprintf("http://%s", req.Host) } +type FailThenSucceedHandler struct { + handled chan string + count int + 
successhandler StubGetHandler +} + +func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + if fh.count == 0 { + resp.WriteHeader(500) + fh.count += 1 + fh.handled <- fmt.Sprintf("http://%s", req.Host) + } else { + fh.successhandler.ServeHTTP(resp, req) + } +} + +type Error404Handler struct { + handled chan string +} + +func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + resp.WriteHeader(404) + fh.handled <- fmt.Sprintf("http://%s", req.Host) +} + func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) { log.Printf("TestFailedUploadToStubKeepServer") @@ -464,7 +505,7 @@ func (s *StandaloneSuite) TestGet(c *C) { arv, err := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(&arv) arv.ApiToken = "abc123" - kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil) + kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) r, n, url2, err := kc.Get(hash) defer r.Close() @@ -479,6 +520,26 @@ func (s *StandaloneSuite) TestGet(c *C) { log.Printf("TestGet done") } +func (s *StandaloneSuite) TestGet404(c *C) { + hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + + st := Error404Handler{make(chan string, 1)} + + ks := RunFakeKeepServer(st) + defer ks.listener.Close() + + arv, err := arvadosclient.MakeArvadosClient() + kc, _ := MakeKeepClient(&arv) + arv.ApiToken = "abc123" + kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) + + r, n, url2, err := kc.Get(hash) + c.Check(err, Equals, BlockNotFound) + c.Check(n, Equals, int64(0)) + c.Check(url2, Equals, "") + c.Check(r, Equals, nil) +} + func (s *StandaloneSuite) TestGetFail(c *C) { hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) @@ -490,7 +551,52 @@ func (s *StandaloneSuite) TestGetFail(c *C) { arv, err := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(&arv) arv.ApiToken = "abc123" - kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil) + 
kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) + + r, n, url2, err := kc.Get(hash) + c.Check(err, Equals, BlockNotFound) + c.Check(n, Equals, int64(0)) + c.Check(url2, Equals, "") + c.Check(r, Equals, nil) +} + +func (s *StandaloneSuite) TestGetFailRetry(c *C) { + hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + + st := &FailThenSucceedHandler{make(chan string, 1), 0, + StubGetHandler{ + c, + hash, + "abc123", + http.StatusOK, + []byte("foo")}} + + ks := RunFakeKeepServer(st) + defer ks.listener.Close() + + arv, err := arvadosclient.MakeArvadosClient() + kc, _ := MakeKeepClient(&arv) + arv.ApiToken = "abc123" + kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) + + r, n, url2, err := kc.Get(hash) + defer r.Close() + c.Check(err, Equals, nil) + c.Check(n, Equals, int64(3)) + c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash)) + + content, err2 := ioutil.ReadAll(r) + c.Check(err2, Equals, nil) + c.Check(content, DeepEquals, []byte("foo")) +} + +func (s *StandaloneSuite) TestGetNetError(c *C) { + hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + + arv, err := arvadosclient.MakeArvadosClient() + kc, _ := MakeKeepClient(&arv) + arv.ApiToken = "abc123" + kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil) r, n, url2, err := kc.Get(hash) c.Check(err, Equals, BlockNotFound) @@ -525,7 +631,7 @@ func (s *StandaloneSuite) TestGetWithServiceHint(c *C) { arv.ApiToken = "abc123" kc.SetServiceRoots( map[string]string{"x": ks0.url}, - map[string]string{"x": ks0.url}, + nil, map[string]string{uuid: ks.url}) r, n, uri, err := kc.Get(hash + "+K@" + uuid) @@ -572,11 +678,7 @@ func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) { "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url, "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url, uuid: ks.url}, - map[string]string{ - "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url, - "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url, - "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url, - uuid: ks.url}, + nil, map[string]string{ 
"zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url, "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url, @@ -619,7 +721,7 @@ func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) { arv.ApiToken = "abc123" kc.SetServiceRoots( map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url}, - map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url}, + nil, map[string]string{uuid: ksGateway.url}) r, n, uri, err := kc.Get(hash + "+K@" + uuid) @@ -654,7 +756,7 @@ func (s *StandaloneSuite) TestChecksum(c *C) { arv, err := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(&arv) arv.ApiToken = "abc123" - kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil) + kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) r, n, _, err := kc.Get(barhash) _, err = ioutil.ReadAll(r) @@ -675,7 +777,7 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) { content := []byte("waz") hash := fmt.Sprintf("%x", md5.Sum(content)) - fh := FailHandler{ + fh := Error404Handler{ make(chan string, 4)} st := StubGetHandler{ @@ -981,7 +1083,7 @@ func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) { arv, err := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(&arv) arv.ApiToken = "abc123" - kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil) + kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) r, err := kc.GetIndex("x", "") c.Check(err, Equals, nil) @@ -1007,7 +1109,7 @@ func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) { arv, err := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(&arv) arv.ApiToken = "abc123" - kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil) + kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) r, err := kc.GetIndex("x", hash[0:3]) c.Check(err, Equals, nil) @@ -1033,7 +1135,7 @@ func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) { arv, err := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(&arv) 
arv.ApiToken = "abc123" - kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil) + kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) _, err = kc.GetIndex("x", hash[0:3]) c.Check(err, Equals, ErrIncompleteIndex) @@ -1055,7 +1157,7 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) { arv, err := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(&arv) arv.ApiToken = "abc123" - kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil) + kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) _, err = kc.GetIndex("y", hash[0:3]) c.Check(err, Equals, ErrNoSuchKeepServer) @@ -1075,7 +1177,7 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) { arv, err := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(&arv) arv.ApiToken = "abc123" - kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil) + kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) r, err := kc.GetIndex("x", "abcd") c.Check(err, Equals, nil) diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go index 51e3e082b6..0791d3cf85 100644 --- a/sdk/go/keepclient/support.go +++ b/sdk/go/keepclient/support.go @@ -2,6 +2,7 @@ package keepclient import ( "crypto/md5" + "encoding/json" "errors" "fmt" "git.curoverse.com/arvados.git/sdk/go/streamer" @@ -76,19 +77,38 @@ func (this *KeepClient) setClientSettingsDisk() { } } +type svcList struct { + Items []keepService `json:"items"` +} + // DiscoverKeepServers gets list of available keep services from api server func (this *KeepClient) DiscoverKeepServers() error { - type svcList struct { - Items []keepService `json:"items"` - } - var m svcList + var list svcList // Get keep services from api server - err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &m) + err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list) if err != nil { return err } + return 
this.loadKeepServers(list) +} + +// LoadKeepServicesFromJSON gets list of available keep services from given JSON +func (this *KeepClient) LoadKeepServicesFromJSON(services string) error { + var list svcList + + // Load keep services from given json + dec := json.NewDecoder(strings.NewReader(services)) + if err := dec.Decode(&list); err != nil { + return err + } + + return this.loadKeepServers(list) +} + +// loadKeepServers +func (this *KeepClient) loadKeepServers(list svcList) error { listed := make(map[string]bool) localRoots := make(map[string]string) gatewayRoots := make(map[string]string) @@ -98,7 +118,7 @@ func (this *KeepClient) DiscoverKeepServers() error { this.replicasPerService = 1 this.Using_proxy = false - for _, service := range m.Items { + for _, service := range list.Items { scheme := "http" if service.SSL { scheme = "https" diff --git a/sdk/go/streamer/streamer_test.go b/sdk/go/streamer/streamer_test.go index 853d7d3035..80aeb26897 100644 --- a/sdk/go/streamer/streamer_test.go +++ b/sdk/go/streamer/streamer_test.go @@ -251,6 +251,7 @@ func (s *StandaloneSuite) TestTransferShortBuffer(c *C) { n, err := sr.Read(out) c.Check(n, Equals, 100) + c.Check(err, IsNil) n, err = sr.Read(out) c.Check(n, Equals, 0) diff --git a/sdk/go/streamer/transfer.go b/sdk/go/streamer/transfer.go index a4a194f69b..3f5f9344c5 100644 --- a/sdk/go/streamer/transfer.go +++ b/sdk/go/streamer/transfer.go @@ -249,9 +249,7 @@ func (this *AsyncStream) transfer(source_reader io.Reader) { } } } else { - if reader_status == io.EOF { - // no more reads expected, so this is ok - } else { + if reader_status == nil { // slices channel closed without signaling EOF reader_status = io.ErrUnexpectedEOF } diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index 0754744241..8ed86fd79e 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -241,19 +241,34 @@ class KeepClient(object): Should be used in a "with" block. 
""" def __init__(self, todo): + self._started = 0 self._todo = todo self._done = 0 self._response = None + self._start_lock = threading.Condition() self._todo_lock = threading.Semaphore(todo) self._done_lock = threading.Lock() + self._local = threading.local() def __enter__(self): + self._start_lock.acquire() + if getattr(self._local, 'sequence', None) is not None: + # If the calling thread has used set_sequence(N), then + # we wait here until N other threads have started. + while self._started < self._local.sequence: + self._start_lock.wait() self._todo_lock.acquire() + self._started += 1 + self._start_lock.notifyAll() + self._start_lock.release() return self def __exit__(self, type, value, traceback): self._todo_lock.release() + def set_sequence(self, sequence): + self._local.sequence = sequence + def shall_i_proceed(self): """ Return true if the current thread should do stuff. Return @@ -517,7 +532,11 @@ class KeepClient(object): return self._success def run(self): - with self.args['thread_limiter'] as limiter: + limiter = self.args['thread_limiter'] + sequence = self.args['thread_sequence'] + if sequence is not None: + limiter.set_sequence(sequence) + with limiter: if not limiter.shall_i_proceed(): # My turn arrived, but the job has been done without # me. 
@@ -950,9 +969,10 @@ class KeepClient(object): thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies) loop = retry.RetryLoop(num_retries, self._check_loop_result, backoff_start=2) + thread_sequence = 0 for tries_left in loop: try: - local_roots = self.map_new_services( + sorted_roots = self.map_new_services( roots_map, locator, force_rebuild=(tries_left < num_retries), need_writable=True, **headers) except Exception as error: @@ -960,7 +980,8 @@ class KeepClient(object): continue threads = [] - for service_root, ks in roots_map.iteritems(): + for service_root, ks in [(root, roots_map[root]) + for root in sorted_roots]: if ks.finished(): continue t = KeepClient.KeepWriterThread( @@ -969,9 +990,11 @@ class KeepClient(object): data_hash=data_hash, service_root=service_root, thread_limiter=thread_limiter, - timeout=self.current_timeout(num_retries-tries_left)) + timeout=self.current_timeout(num_retries-tries_left), + thread_sequence=thread_sequence) t.start() threads.append(t) + thread_sequence += 1 for t in threads: t.join() loop.save_result((thread_limiter.done() >= copies, len(threads))) @@ -984,7 +1007,7 @@ class KeepClient(object): data_hash, loop.last_result())) else: service_errors = ((key, roots_map[key].last_result()['error']) - for key in local_roots + for key in sorted_roots if roots_map[key].last_result()['error']) raise arvados.errors.KeepWriteError( "failed to write {} (wanted {} copies but wrote {})".format( diff --git a/sdk/python/tests/nginx.conf b/sdk/python/tests/nginx.conf index 61966054a0..d8a207f2cf 100644 --- a/sdk/python/tests/nginx.conf +++ b/sdk/python/tests/nginx.conf @@ -3,7 +3,7 @@ error_log stderr info; # Yes, must be specified here _and_ cmdline events { } http { - access_log /dev/stderr combined; + access_log {{ACCESSLOG}} combined; upstream arv-git-http { server localhost:{{GITPORT}}; } diff --git a/sdk/python/tests/run_test_server.py b/sdk/python/tests/run_test_server.py index 
5d0c42ad21..d325b4eb6e 100644 --- a/sdk/python/tests/run_test_server.py +++ b/sdk/python/tests/run_test_server.py @@ -3,6 +3,7 @@ from __future__ import print_function import argparse import atexit +import errno import httplib2 import os import pipes @@ -54,9 +55,7 @@ def find_server_pid(PID_PATH, wait=10): with open(PID_PATH, 'r') as f: server_pid = int(f.read()) good_pid = (os.kill(server_pid, 0) is None) - except IOError: - good_pid = False - except OSError: + except EnvironmentError: good_pid = False now = time.time() @@ -91,9 +90,7 @@ def kill_server_pid(pidfile, wait=10, passenger_root=False): os.getpgid(server_pid) time.sleep(0.1) now = time.time() - except IOError: - pass - except OSError: + except EnvironmentError: pass def find_available_port(): @@ -323,8 +320,8 @@ def _start_keep(n, keep_args): return port -def run_keep(blob_signing_key=None, enforce_permissions=False): - stop_keep() +def run_keep(blob_signing_key=None, enforce_permissions=False, num_servers=2): + stop_keep(num_servers) keep_args = {} if not blob_signing_key: @@ -344,12 +341,13 @@ def run_keep(blob_signing_key=None, enforce_permissions=False): host=os.environ['ARVADOS_API_HOST'], token=os.environ['ARVADOS_API_TOKEN'], insecure=True) + for d in api.keep_services().list().execute()['items']: api.keep_services().delete(uuid=d['uuid']).execute() for d in api.keep_disks().list().execute()['items']: api.keep_disks().delete(uuid=d['uuid']).execute() - for d in range(0, 2): + for d in range(0, num_servers): port = _start_keep(d, keep_args) svc = api.keep_services().create(body={'keep_service': { 'uuid': 'zzzzz-bi6l4-keepdisk{:07d}'.format(d), @@ -371,9 +369,9 @@ def _stop_keep(n): if os.path.exists(os.path.join(TEST_TMPDIR, "keep.blob_signing_key")): os.remove(os.path.join(TEST_TMPDIR, "keep.blob_signing_key")) -def stop_keep(): - _stop_keep(0) - _stop_keep(1) +def stop_keep(num_servers=2): + for n in range(0, num_servers): + _stop_keep(n) def run_keep_proxy(): if 'ARVADOS_TEST_PROXY_SERVICES' 
in os.environ: @@ -447,6 +445,7 @@ def run_nginx(): nginxconf['GITSSLPORT'] = find_available_port() nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem') nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key') + nginxconf['ACCESSLOG'] = os.path.join(TEST_TMPDIR, 'nginx_access_log.fifo') conftemplatefile = os.path.join(MY_DIRNAME, 'nginx.conf') conffile = os.path.join(TEST_TMPDIR, 'nginx.conf') @@ -458,12 +457,23 @@ def run_nginx(): env = os.environ.copy() env['PATH'] = env['PATH']+':/sbin:/usr/sbin:/usr/local/sbin' + + try: + os.remove(nginxconf['ACCESSLOG']) + except OSError as error: + if error.errno != errno.ENOENT: + raise + + os.mkfifo(nginxconf['ACCESSLOG'], 0700) nginx = subprocess.Popen( ['nginx', '-g', 'error_log stderr info;', '-g', 'pid '+_pidfile('nginx')+';', '-c', conffile], env=env, stdin=open('/dev/null'), stdout=sys.stderr) + cat_access = subprocess.Popen( + ['cat', nginxconf['ACCESSLOG']], + stdout=sys.stderr) _setport('keepproxy-ssl', nginxconf['KEEPPROXYSSLPORT']) _setport('arv-git-httpd-ssl', nginxconf['GITSSLPORT']) @@ -595,6 +605,9 @@ if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('action', type=str, help="one of {}".format(actions)) parser.add_argument('--auth', type=str, metavar='FIXTURE_NAME', help='Print authorization info for given api_client_authorizations fixture') + parser.add_argument('--num-keep-servers', metavar='int', type=int, default=2, help="Number of keep servers desired") + parser.add_argument('--keep-enforce-permissions', action="store_true", help="Enforce keep permissions") + args = parser.parse_args() if args.action not in actions: @@ -614,7 +627,7 @@ if __name__ == "__main__": elif args.action == 'stop': stop(force=('ARVADOS_TEST_API_HOST' not in os.environ)) elif args.action == 'start_keep': - run_keep() + run_keep(enforce_permissions=args.keep_enforce_permissions, num_servers=args.num_keep_servers) elif args.action == 
'stop_keep': stop_keep() elif args.action == 'start_keep_proxy': diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py index c44379bac7..90468924a6 100644 --- a/sdk/python/tests/test_keep_client.py +++ b/sdk/python/tests/test_keep_client.py @@ -332,39 +332,158 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock): mock.responses[0].getopt(pycurl.TIMEOUT_MS), int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000)) - def test_probe_order_reference_set(self): + def check_no_services_error(self, verb, exc_class): + api_client = mock.MagicMock(name='api_client') + api_client.keep_services().accessible().execute.side_effect = ( + arvados.errors.ApiError) + keep_client = arvados.KeepClient(api_client=api_client) + with self.assertRaises(exc_class) as err_check: + getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0') + self.assertEqual(0, len(err_check.exception.request_errors())) + + def test_get_error_with_no_services(self): + self.check_no_services_error('get', arvados.errors.KeepReadError) + + def test_put_error_with_no_services(self): + self.check_no_services_error('put', arvados.errors.KeepWriteError) + + def check_errors_from_last_retry(self, verb, exc_class): + api_client = self.mock_keep_services(count=2) + req_mock = tutil.mock_keep_responses( + "retry error reporting test", 500, 500, 403, 403) + with req_mock, tutil.skip_sleep, \ + self.assertRaises(exc_class) as err_check: + keep_client = arvados.KeepClient(api_client=api_client) + getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0', + num_retries=3) + self.assertEqual([403, 403], [ + getattr(error, 'status_code', None) + for error in err_check.exception.request_errors().itervalues()]) + + def test_get_error_reflects_last_retry(self): + self.check_errors_from_last_retry('get', arvados.errors.KeepReadError) + + def test_put_error_reflects_last_retry(self): + self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError) + + 
def test_put_error_does_not_include_successful_puts(self): + data = 'partial failure test' + data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data)) + api_client = self.mock_keep_services(count=3) + with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \ + self.assertRaises(arvados.errors.KeepWriteError) as exc_check: + keep_client = arvados.KeepClient(api_client=api_client) + keep_client.put(data) + self.assertEqual(2, len(exc_check.exception.request_errors())) + + def test_proxy_put_with_no_writable_services(self): + data = 'test with no writable services' + data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data)) + api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1) + with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \ + self.assertRaises(arvados.errors.KeepWriteError) as exc_check: + keep_client = arvados.KeepClient(api_client=api_client) + keep_client.put(data) + self.assertEqual(True, ("no Keep services available" in str(exc_check.exception))) + self.assertEqual(0, len(exc_check.exception.request_errors())) + + +@tutil.skip_sleep +class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock): + + def setUp(self): # expected_order[i] is the probe order for # hash=md5(sprintf("%064x",i)) where there are 16 services # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g., # the first probe for the block consisting of 64 "0" # characters is the service whose uuid is # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'. 
- expected_order = [ + self.services = 16 + self.expected_order = [ list('3eab2d5fc9681074'), list('097dba52e648f1c3'), list('c5b4e023f8a7d691'), list('9d81c02e76a3bf54'), ] - hashes = [ - hashlib.md5("{:064x}".format(x)).hexdigest() - for x in range(len(expected_order))] - api_client = self.mock_keep_services(count=16) - keep_client = arvados.KeepClient(api_client=api_client) - for i, hash in enumerate(hashes): - roots = keep_client.weighted_service_roots(arvados.KeepLocator(hash)) + self.blocks = [ + "{:064x}".format(x) + for x in range(len(self.expected_order))] + self.hashes = [ + hashlib.md5(self.blocks[x]).hexdigest() + for x in range(len(self.expected_order))] + self.api_client = self.mock_keep_services(count=self.services) + self.keep_client = arvados.KeepClient(api_client=self.api_client) + + def test_weighted_service_roots_against_reference_set(self): + # Confirm weighted_service_roots() returns the correct order + for i, hash in enumerate(self.hashes): + roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) got_order = [ re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1) for root in roots] - self.assertEqual(expected_order[i], got_order) + self.assertEqual(self.expected_order[i], got_order) + + def test_get_probe_order_against_reference_set(self): + self._test_probe_order_against_reference_set( + lambda i: self.keep_client.get(self.hashes[i], num_retries=1)) + + def test_put_probe_order_against_reference_set(self): + # copies=1 prevents the test from being sensitive to races + # between writer threads. 
+ self._test_probe_order_against_reference_set( + lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1)) + + def _test_probe_order_against_reference_set(self, op): + for i in range(len(self.blocks)): + with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \ + self.assertRaises(arvados.errors.KeepRequestError): + op(i) + got_order = [ + re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL)).group(1) + for resp in mock.responses] + self.assertEqual(self.expected_order[i]*2, got_order) + + def test_put_probe_order_multiple_copies(self): + for copies in range(2, 4): + for i in range(len(self.blocks)): + with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \ + self.assertRaises(arvados.errors.KeepWriteError): + self.keep_client.put(self.blocks[i], num_retries=2, copies=copies) + got_order = [ + re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL)).group(1) + for resp in mock.responses] + # With T threads racing to make requests, the position + # of a given server in the sequence of HTTP requests + # (got_order) cannot be more than T-1 positions + # earlier than that server's position in the reference + # probe sequence (expected_order). + # + # Loop invariant: we have accounted for +pos+ expected + # probes, either by seeing them in +got_order+ or by + # putting them in +pending+ in the hope of seeing them + # later. 
As long as +len(pending)= 0.1.20150615153458' -gem 'arvados-cli', '>= 0.1.20150128223752' +gem 'arvados-cli', '>= 0.1.20150605170031' # pg_power lets us use partial indexes in schema.rb in Rails 3 gem 'pg_power' diff --git a/services/api/Gemfile.lock b/services/api/Gemfile.lock index d671182a57..be4d4606ab 100644 --- a/services/api/Gemfile.lock +++ b/services/api/Gemfile.lock @@ -41,10 +41,10 @@ GEM google-api-client (~> 0.6.3, >= 0.6.3) json (~> 1.7, >= 1.7.7) jwt (>= 0.1.5, < 1.0.0) - arvados-cli (0.1.20150205181653) + arvados-cli (0.1.20150930141818) activesupport (~> 3.2, >= 3.2.13) andand (~> 1.3, >= 1.3.3) - arvados (~> 0.1, >= 0.1.20150615153458) + arvados (~> 0.1, >= 0.1.20150128223554) curb (~> 0.8) google-api-client (~> 0.6.3, >= 0.6.3) json (~> 1.7, >= 1.7.7) @@ -69,7 +69,7 @@ GEM coffee-script-source execjs coffee-script-source (1.7.0) - curb (0.8.6) + curb (0.8.8) daemon_controller (1.2.0) database_cleaner (1.2.0) erubis (2.7.0) @@ -228,7 +228,7 @@ DEPENDENCIES acts_as_api andand arvados (>= 0.1.20150615153458) - arvados-cli (>= 0.1.20150128223752) + arvados-cli (>= 0.1.20150605170031) coffee-rails (~> 3.2.0) database_cleaner factory_girl_rails diff --git a/services/datamanager/datamanager_test.go b/services/datamanager/datamanager_test.go index 3d9bb3da90..c2cb762d52 100644 --- a/services/datamanager/datamanager_test.go +++ b/services/datamanager/datamanager_test.go @@ -31,7 +31,7 @@ func SetupDataManagerTest(t *testing.T) { // start api and keep servers arvadostest.ResetEnv() arvadostest.StartAPI() - arvadostest.StartKeep() + arvadostest.StartKeep(2, false) arv = makeArvadosClient() @@ -54,7 +54,7 @@ func SetupDataManagerTest(t *testing.T) { } func TearDownDataManagerTest(t *testing.T) { - arvadostest.StopKeep() + arvadostest.StopKeep(2) arvadostest.StopAPI() } diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go index 6fe8fe7ac3..7643e4b0fa 100644 --- a/services/keepproxy/keepproxy_test.go +++ 
b/services/keepproxy/keepproxy_test.go @@ -53,7 +53,7 @@ func closeListener() { func (s *ServerRequiredSuite) SetUpSuite(c *C) { arvadostest.StartAPI() - arvadostest.StartKeep() + arvadostest.StartKeep(2, false) } func (s *ServerRequiredSuite) SetUpTest(c *C) { @@ -61,7 +61,7 @@ func (s *ServerRequiredSuite) SetUpTest(c *C) { } func (s *ServerRequiredSuite) TearDownSuite(c *C) { - arvadostest.StopKeep() + arvadostest.StopKeep(2) arvadostest.StopAPI() } diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go index 657c3151ca..e9fda2ab76 100644 --- a/services/keepstore/azure_blob_volume.go +++ b/services/keepstore/azure_blob_volume.go @@ -9,6 +9,7 @@ import ( "io/ioutil" "log" "os" + "regexp" "strings" "time" @@ -19,8 +20,8 @@ var ( azureStorageAccountName string azureStorageAccountKeyFile string azureStorageReplication int - azureWriteRaceInterval time.Duration = 15 * time.Second - azureWriteRacePollTime time.Duration = time.Second + azureWriteRaceInterval = 15 * time.Second + azureWriteRacePollTime = time.Second ) func readKeyFromFile(file string) (string, error) { @@ -96,6 +97,9 @@ type AzureBlobVolume struct { replication int } +// NewAzureBlobVolume returns a new AzureBlobVolume using the given +// client and container name. The replication argument specifies the +// replication level to report when writing data. func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume { return &AzureBlobVolume{ azClient: client, @@ -118,6 +122,12 @@ func (v *AzureBlobVolume) Check() error { return nil } +// Get reads a Keep block that has been stored as a block blob in the +// container. +// +// If the block is younger than azureWriteRaceInterval and is +// unexpectedly empty, assume a PutBlob operation is in progress, and +// wait for it to finish writing. 
func (v *AzureBlobVolume) Get(loc string) ([]byte, error) { var deadline time.Time haveDeadline := false @@ -169,6 +179,7 @@ func (v *AzureBlobVolume) get(loc string) ([]byte, error) { } } +// Compare the given data with existing stored data. func (v *AzureBlobVolume) Compare(loc string, expect []byte) error { rdr, err := v.bsClient.GetBlob(v.containerName, loc) if err != nil { @@ -178,6 +189,7 @@ func (v *AzureBlobVolume) Compare(loc string, expect []byte) error { return compareReaderWithBuf(rdr, expect, loc[:32]) } +// Put stores a Keep block as a block blob in the container. func (v *AzureBlobVolume) Put(loc string, block []byte) error { if v.readonly { return MethodDisabledError @@ -185,6 +197,7 @@ func (v *AzureBlobVolume) Put(loc string, block []byte) error { return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block)) } +// Touch updates the last-modified property of a block blob. func (v *AzureBlobVolume) Touch(loc string) error { if v.readonly { return MethodDisabledError @@ -194,6 +207,7 @@ func (v *AzureBlobVolume) Touch(loc string) error { }) } +// Mtime returns the last-modified property of a block blob. func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) { props, err := v.bsClient.GetBlobProperties(v.containerName, loc) if err != nil { @@ -202,6 +216,8 @@ func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) { return time.Parse(time.RFC1123, props.LastModified) } +// IndexTo writes a list of Keep blocks that are stored in the +// container. 
func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error { params := storage.ListBlobsParameters{ Prefix: prefix, @@ -216,6 +232,9 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error { if err != nil { return err } + if !v.isKeepBlock(b.Name) { + continue + } if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) { // A new zero-length blob is probably // just a new non-empty blob that @@ -233,6 +252,7 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error { } } +// Delete a Keep block. func (v *AzureBlobVolume) Delete(loc string) error { if v.readonly { return MethodDisabledError @@ -256,6 +276,7 @@ func (v *AzureBlobVolume) Delete(loc string) error { }) } +// Status returns a VolumeStatus struct with placeholder data. func (v *AzureBlobVolume) Status() *VolumeStatus { return &VolumeStatus{ DeviceNum: 1, @@ -264,14 +285,19 @@ func (v *AzureBlobVolume) Status() *VolumeStatus { } } +// String returns a volume label, including the container name. func (v *AzureBlobVolume) String() string { return fmt.Sprintf("azure-storage-container:%+q", v.containerName) } +// Writable returns true, unless the -readonly flag was on when the +// volume was added. func (v *AzureBlobVolume) Writable() bool { return !v.readonly } +// Replication returns the replication level of the container, as +// specified by the -azure-storage-replication argument. 
func (v *AzureBlobVolume) Replication() int { return v.replication } @@ -289,3 +315,8 @@ func (v *AzureBlobVolume) translateError(err error) error { return err } } + +var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`) +func (v *AzureBlobVolume) isKeepBlock(s string) bool { + return keepBlockRegexp.MatchString(s) +} diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go index a4c6e62a7d..a240c23e16 100644 --- a/services/keepstore/azure_blob_volume_test.go +++ b/services/keepstore/azure_blob_volume_test.go @@ -60,11 +60,11 @@ func newAzStubHandler() *azStubHandler { } func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) { - if blob, ok := h.blobs[container+"|"+hash]; !ok { + blob, ok := h.blobs[container+"|"+hash] + if !ok { return - } else { - blob.Mtime = t } + blob.Mtime = t } func (h *azStubHandler) PutRaw(container, hash string, data []byte) { @@ -427,7 +427,7 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) { azureWriteRaceInterval = 2 * time.Second azureWriteRacePollTime = 5 * time.Millisecond - v.PutRaw(TestHash, []byte{}) + v.PutRaw(TestHash, nil) buf := new(bytes.Buffer) v.IndexTo("", buf) diff --git a/services/keepstore/pull_worker_integration_test.go b/services/keepstore/pull_worker_integration_test.go index e0bad0045a..3a3069ab77 100644 --- a/services/keepstore/pull_worker_integration_test.go +++ b/services/keepstore/pull_worker_integration_test.go @@ -27,7 +27,7 @@ func SetupPullWorkerIntegrationTest(t *testing.T, testData PullWorkIntegrationTe // start api and keep servers arvadostest.StartAPI() - arvadostest.StartKeep() + arvadostest.StartKeep(2, false) // make arvadosclient arv, err := arvadosclient.MakeArvadosClient() diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go index 7ef079f1f7..61088f10fa 100644 --- a/services/keepstore/volume_generic_test.go +++ b/services/keepstore/volume_generic_test.go @@ -350,6 
+350,13 @@ func testIndexTo(t *testing.T, factory TestableVolumeFactory) { v.PutRaw(TestHash2, TestBlock2) v.PutRaw(TestHash3, TestBlock3) + // Blocks whose names aren't Keep hashes should be omitted from + // index + v.PutRaw("fffffffffnotreallyahashfffffffff", nil) + v.PutRaw("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", nil) + v.PutRaw("f0000000000000000000000000000000f", nil) + v.PutRaw("f00", nil) + buf := new(bytes.Buffer) v.IndexTo("", buf) indexRows := strings.Split(string(buf.Bytes()), "\n") diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go index e745eb2691..910cc25d61 100644 --- a/services/keepstore/volume_unix.go +++ b/services/keepstore/volume_unix.go @@ -292,6 +292,7 @@ func (v *UnixVolume) Status() *VolumeStatus { } var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`) +var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`) // IndexTo writes (to the given Writer) a list of blocks found on this // volume which begin with the specified prefix. If the prefix is an @@ -348,6 +349,9 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { if !strings.HasPrefix(name, prefix) { continue } + if !blockFileRe.MatchString(name) { + continue + } _, err = fmt.Fprint(w, name, "+", fileInfo[0].Size(), diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py index dfb26bc303..ec5014e9f9 100644 --- a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py +++ b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py @@ -34,7 +34,15 @@ class ComputeNodeShutdownActor(ShutdownActorBase): def _get_slurm_state(self): return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', self._nodename]) - @ShutdownActorBase._retry((subprocess.CalledProcessError,)) + # The following methods retry on OSError. 
This is intended to mitigate bug + # #6321 where fork() of node manager raises "OSError: [Errno 12] Cannot + # allocate memory" resulting in the untimely death of the shutdown actor + # and tends to result in node manager getting into a wedged state where it + # won't allocate new nodes or shut down gracefully. The underlying causes + # of the excessive memory usage that result in the "Cannot allocate memory" + # error are still being investigated. + + @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError)) def cancel_shutdown(self): if self._nodename: if self._get_slurm_state() in self.SLURM_DRAIN_STATES: @@ -46,15 +54,15 @@ class ComputeNodeShutdownActor(ShutdownActorBase): pass return super(ComputeNodeShutdownActor, self).cancel_shutdown() + @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError)) @ShutdownActorBase._stop_if_window_closed - @ShutdownActorBase._retry((subprocess.CalledProcessError,)) def issue_slurm_drain(self): self._set_node_state('DRAIN', 'Reason=Node Manager shutdown') self._logger.info("Waiting for SLURM node %s to drain", self._nodename) self._later.await_slurm_drain() + @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError)) @ShutdownActorBase._stop_if_window_closed - @ShutdownActorBase._retry((subprocess.CalledProcessError,)) def await_slurm_drain(self): output = self._get_slurm_state() if output in self.SLURM_END_STATES: diff --git a/services/nodemanager/tests/test_computenode_dispatch_slurm.py b/services/nodemanager/tests/test_computenode_dispatch_slurm.py index 2ddf7676c8..8648783bac 100644 --- a/services/nodemanager/tests/test_computenode_dispatch_slurm.py +++ b/services/nodemanager/tests/test_computenode_dispatch_slurm.py @@ -73,6 +73,15 @@ class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin, self.check_success_flag(False, 2) self.check_slurm_got_args(proc_mock, 'sinfo', '--noheader', '-o', '%t', '-n', 'compute99') + def test_cancel_shutdown_retry(self, proc_mock): + 
proc_mock.side_effect = iter([OSError, 'drain\n', OSError, 'idle\n']) + self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True)) + self.make_actor() + self.check_success_flag(False, 2) + + def test_issue_slurm_drain_retry(self, proc_mock): + proc_mock.side_effect = iter([OSError, '', OSError, 'drng\n']) + self.check_success_after_reset(proc_mock) def test_arvados_node_cleaned_after_shutdown(self, proc_mock): proc_mock.return_value = 'drain\n' diff --git a/tools/keep-exercise/.gitignore b/tools/keep-exercise/.gitignore new file mode 100644 index 0000000000..6a1d10c494 --- /dev/null +++ b/tools/keep-exercise/.gitignore @@ -0,0 +1 @@ +keep-exercise diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go new file mode 100644 index 0000000000..a94c01e55b --- /dev/null +++ b/tools/keep-exercise/keep-exercise.go @@ -0,0 +1,157 @@ +// Testing tool for Keep services. +// +// keepexercise helps measure throughput and test reliability under +// various usage patterns. +// +// By default, it reads and writes blocks containing 2^26 NUL +// bytes. This generates network traffic without consuming much disk +// space. +// +// For a more realistic test, enable -vary-request. Warning: this will +// fill your storage volumes with random data if you leave it running, +// which can cost you money or leave you with too little room for +// useful data. 
+// +package main + +import ( + "crypto/rand" + "encoding/binary" + "flag" + "io" + "io/ioutil" + "log" + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/keepclient" +) + +// Command line config knobs +var ( + BlockSize = flag.Int("block-size", keepclient.BLOCKSIZE, "bytes per read/write op") + ReadThreads = flag.Int("rthreads", 1, "number of concurrent readers") + WriteThreads = flag.Int("wthreads", 1, "number of concurrent writers") + VaryRequest = flag.Bool("vary-request", false, "vary the data for each request: consumes disk space, exercises write behavior") + VaryThread = flag.Bool("vary-thread", false, "use -wthreads different data blocks") + Replicas = flag.Int("replicas", 1, "replication level for writing") + StatsInterval = flag.Duration("stats-interval", time.Second, "time interval between IO stats reports, or 0 to disable") +) + +func main() { + flag.Parse() + + arv, err := arvadosclient.MakeArvadosClient() + if err != nil { + log.Fatal(err) + } + kc, err := keepclient.MakeKeepClient(&arv) + if err != nil { + log.Fatal(err) + } + kc.Want_replicas = *Replicas + kc.Client.Timeout = 10 * time.Minute + + nextBuf := make(chan []byte, *WriteThreads) + nextLocator := make(chan string, *ReadThreads+*WriteThreads) + + go countBeans(nextLocator) + for i := 0; i < *WriteThreads; i++ { + go makeBufs(nextBuf, i) + go doWrites(kc, nextBuf, nextLocator) + } + for i := 0; i < *ReadThreads; i++ { + go doReads(kc, nextLocator) + } + <-make(chan struct{}) +} + +// Send 1234 to bytesInChan when we receive 1234 bytes from keepstore. +var bytesInChan = make(chan uint64) +var bytesOutChan = make(chan uint64) + +// Send struct{}{} to errorsChan when an error happens. 
+var errorsChan = make(chan struct{}) + +func countBeans(nextLocator chan string) { + t0 := time.Now() + var tickChan <-chan time.Time + if *StatsInterval > 0 { + tickChan = time.NewTicker(*StatsInterval).C + } + var bytesIn uint64 + var bytesOut uint64 + var errors uint64 + for { + select { + case <-tickChan: + elapsed := time.Since(t0) + log.Printf("%v elapsed: read %v bytes (%.1f MiB/s), wrote %v bytes (%.1f MiB/s), errors %d", + elapsed, + bytesIn, (float64(bytesIn) / elapsed.Seconds() / 1048576), + bytesOut, (float64(bytesOut) / elapsed.Seconds() / 1048576), + errors, + ) + case i := <-bytesInChan: + bytesIn += i + case o := <-bytesOutChan: + bytesOut += o + case <-errorsChan: + errors++ + } + } +} + +func makeBufs(nextBuf chan []byte, threadID int) { + buf := make([]byte, *BlockSize) + if *VaryThread { + binary.PutVarint(buf, int64(threadID)) + } + for { + if *VaryRequest { + if _, err := io.ReadFull(rand.Reader, buf); err != nil { + log.Fatal(err) + } + } + nextBuf <- buf + } +} + +func doWrites(kc *keepclient.KeepClient, nextBuf chan []byte, nextLocator chan string) { + for buf := range nextBuf { + locator, _, err := kc.PutB(buf) + if err != nil { + log.Print(err) + errorsChan <- struct{}{} + continue + } + bytesOutChan <- uint64(len(buf)) + for cap(nextLocator) > len(nextLocator)+*WriteThreads { + // Give the readers something to do, unless + // they have lots queued up already. 
+ nextLocator <- locator + } + } +} + +func doReads(kc *keepclient.KeepClient, nextLocator chan string) { + for locator := range nextLocator { + rdr, size, url, err := kc.Get(locator) + if err != nil { + log.Print(err) + errorsChan <- struct{}{} + continue + } + n, err := io.Copy(ioutil.Discard, rdr) + rdr.Close() + if n != size || err != nil { + log.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err) + errorsChan <- struct{}{} + continue + // Note we don't count the bytes received in + // partial/corrupt responses: we are measuring + // throughput, not resource consumption. + } + bytesInChan <- uint64(n) + } +} diff --git a/tools/keep-rsync/.gitignore b/tools/keep-rsync/.gitignore new file mode 100644 index 0000000000..5ee7f3b61a --- /dev/null +++ b/tools/keep-rsync/.gitignore @@ -0,0 +1 @@ +keep-rsync diff --git a/tools/keep-rsync/keep-rsync.go b/tools/keep-rsync/keep-rsync.go new file mode 100644 index 0000000000..7cd795ecf3 --- /dev/null +++ b/tools/keep-rsync/keep-rsync.go @@ -0,0 +1,290 @@ +package main + +import ( + "bufio" + "crypto/tls" + "errors" + "flag" + "fmt" + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/keepclient" + "io/ioutil" + "log" + "net/http" + "os" + "regexp" + "strings" + "time" +) + +func main() { + err := doMain() + if err != nil { + log.Fatalf("%v", err) + } +} + +func doMain() error { + flags := flag.NewFlagSet("keep-rsync", flag.ExitOnError) + + srcConfigFile := flags.String( + "src", + "", + "Source configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf") + + dstConfigFile := flags.String( + "dst", + "", + "Destination configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf") + + srcKeepServicesJSON := flags.String( + "src-keep-services-json", + "", + "An optional list of available source keepservices. 
"+ + "If not provided, this list is obtained from api server configured in src-config-file.") + + dstKeepServicesJSON := flags.String( + "dst-keep-services-json", + "", + "An optional list of available destination keepservices. "+ + "If not provided, this list is obtained from api server configured in dst-config-file.") + + replications := flags.Int( + "replications", + 0, + "Number of replications to write to the destination. If replications not specified, "+ + "default replication level configured on destination server will be used.") + + prefix := flags.String( + "prefix", + "", + "Index prefix") + + // Parse args; omit the first arg which is the command name + flags.Parse(os.Args[1:]) + + srcConfig, srcBlobSigningKey, err := loadConfig(*srcConfigFile) + if err != nil { + return fmt.Errorf("Error loading src configuration from file: %s", err.Error()) + } + + dstConfig, _, err := loadConfig(*dstConfigFile) + if err != nil { + return fmt.Errorf("Error loading dst configuration from file: %s", err.Error()) + } + + // setup src and dst keepclients + kcSrc, err := setupKeepClient(srcConfig, *srcKeepServicesJSON, false, 0) + if err != nil { + return fmt.Errorf("Error configuring src keepclient: %s", err.Error()) + } + + kcDst, err := setupKeepClient(dstConfig, *dstKeepServicesJSON, true, *replications) + if err != nil { + return fmt.Errorf("Error configuring dst keepclient: %s", err.Error()) + } + + // Copy blocks not found in dst from src + err = performKeepRsync(kcSrc, kcDst, srcBlobSigningKey, *prefix) + if err != nil { + return fmt.Errorf("Error while syncing data: %s", err.Error()) + } + + return nil +} + +type apiConfig struct { + APIToken string + APIHost string + APIHostInsecure bool + ExternalClient bool +} + +// Load src and dst config from given files +func loadConfig(configFile string) (config apiConfig, blobSigningKey string, err error) { + if configFile == "" { + return config, blobSigningKey, errors.New("config file not specified") + } + + config, 
blobSigningKey, err = readConfigFromFile(configFile) + if err != nil { + return config, blobSigningKey, fmt.Errorf("Error reading config file: %v", err) + } + + return +} + +var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$") + +// Read config from file +func readConfigFromFile(filename string) (config apiConfig, blobSigningKey string, err error) { + if !strings.Contains(filename, "/") { + filename = os.Getenv("HOME") + "/.config/arvados/" + filename + ".conf" + } + + content, err := ioutil.ReadFile(filename) + + if err != nil { + return config, "", err + } + + lines := strings.Split(string(content), "\n") + for _, line := range lines { + if line == "" { + continue + } + + kv := strings.SplitN(line, "=", 2) + key := strings.TrimSpace(kv[0]) + value := strings.TrimSpace(kv[1]) + + switch key { + case "ARVADOS_API_TOKEN": + config.APIToken = value + case "ARVADOS_API_HOST": + config.APIHost = value + case "ARVADOS_API_HOST_INSECURE": + config.APIHostInsecure = matchTrue.MatchString(value) + case "ARVADOS_EXTERNAL_CLIENT": + config.ExternalClient = matchTrue.MatchString(value) + case "ARVADOS_BLOB_SIGNING_KEY": + blobSigningKey = value + } + } + return +} + +// setup keepclient using the config provided +func setupKeepClient(config apiConfig, keepServicesJSON string, isDst bool, replications int) (kc *keepclient.KeepClient, err error) { + arv := arvadosclient.ArvadosClient{ + ApiToken: config.APIToken, + ApiServer: config.APIHost, + ApiInsecure: config.APIHostInsecure, + Client: &http.Client{Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: config.APIHostInsecure}}}, + External: config.ExternalClient, + } + + // if keepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers + if keepServicesJSON == "" { + kc, err = keepclient.MakeKeepClient(&arv) + if err != nil { + return nil, err + } + } else { + kc = keepclient.New(&arv) + err = kc.LoadKeepServicesFromJSON(keepServicesJSON) + if err != nil { + return kc, err 
+ } + } + + if isDst { + // Get default replications value from destination, if it is not already provided + if replications == 0 { + value, err := arv.Discovery("defaultCollectionReplication") + if err == nil { + replications = int(value.(float64)) + } else { + return nil, err + } + } + + kc.Want_replicas = replications + } + + return kc, nil +} + +// Get unique block locators from src and dst +// Copy any blocks missing in dst +func performKeepRsync(kcSrc, kcDst *keepclient.KeepClient, blobSigningKey, prefix string) error { + // Get unique locators from src + srcIndex, err := getUniqueLocators(kcSrc, prefix) + if err != nil { + return err + } + + // Get unique locators from dst + dstIndex, err := getUniqueLocators(kcDst, prefix) + if err != nil { + return err + } + + // Get list of locators found in src, but missing in dst + toBeCopied := getMissingLocators(srcIndex, dstIndex) + + // Copy each missing block to dst + log.Printf("Before keep-rsync, there are %d blocks in src and %d blocks in dst. 
Start copying %d blocks from src not found in dst.", + len(srcIndex), len(dstIndex), len(toBeCopied)) + + err = copyBlocksToDst(toBeCopied, kcSrc, kcDst, blobSigningKey) + + return err +} + +// Get list of unique locators from the specified cluster +func getUniqueLocators(kc *keepclient.KeepClient, prefix string) (map[string]bool, error) { + uniqueLocators := map[string]bool{} + + // Get index and dedup + for uuid := range kc.LocalRoots() { + reader, err := kc.GetIndex(uuid, prefix) + if err != nil { + return uniqueLocators, err + } + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + uniqueLocators[strings.Split(scanner.Text(), " ")[0]] = true + } + } + + return uniqueLocators, nil +} + +// Get list of locators that are in src but not in dst +func getMissingLocators(srcLocators, dstLocators map[string]bool) []string { + var missingLocators []string + for locator := range srcLocators { + if _, ok := dstLocators[locator]; !ok { + missingLocators = append(missingLocators, locator) + } + } + return missingLocators +} + +// Copy blocks from src to dst; only those that are missing in dst are copied +func copyBlocksToDst(toBeCopied []string, kcSrc, kcDst *keepclient.KeepClient, blobSigningKey string) error { + total := len(toBeCopied) + + startedAt := time.Now() + for done, locator := range toBeCopied { + if done == 0 { + log.Printf("Copying data block %d of %d (%.2f%% done): %v", done+1, total, + float64(done)/float64(total)*100, locator) + } else { + timePerBlock := time.Since(startedAt) / time.Duration(done) + log.Printf("Copying data block %d of %d (%.2f%% done, %v est. 
time remaining): %v", done+1, total, + float64(done)/float64(total)*100, timePerBlock*time.Duration(total-done), locator) + } + + getLocator := locator + expiresAt := time.Now().AddDate(0, 0, 1) + if blobSigningKey != "" { + getLocator = keepclient.SignLocator(getLocator, kcSrc.Arvados.ApiToken, expiresAt, []byte(blobSigningKey)) + } + + reader, len, _, err := kcSrc.Get(getLocator) + if err != nil { + return fmt.Errorf("Error getting block: %v %v", locator, err) + } + + _, _, err = kcDst.PutHR(getLocator[:32], reader, len) + if err != nil { + return fmt.Errorf("Error copying data block: %v %v", locator, err) + } + } + + log.Printf("Successfully copied to destination %d blocks.", total) + return nil +} diff --git a/tools/keep-rsync/keep-rsync_test.go b/tools/keep-rsync/keep-rsync_test.go new file mode 100644 index 0000000000..6fbb535a03 --- /dev/null +++ b/tools/keep-rsync/keep-rsync_test.go @@ -0,0 +1,476 @@ +package main + +import ( + "crypto/md5" + "fmt" + "io/ioutil" + "os" + "strings" + "testing" + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvadostest" + "git.curoverse.com/arvados.git/sdk/go/keepclient" + + . 
"gopkg.in/check.v1" +) + +// Gocheck boilerplate +func Test(t *testing.T) { + TestingT(t) +} + +// Gocheck boilerplate +var _ = Suite(&ServerRequiredSuite{}) +var _ = Suite(&ServerNotRequiredSuite{}) +var _ = Suite(&DoMainTestSuite{}) + +// Tests that require the Keep server running +type ServerRequiredSuite struct{} +type ServerNotRequiredSuite struct{} +type DoMainTestSuite struct{} + +func (s *ServerRequiredSuite) SetUpSuite(c *C) { + // Start API server + arvadostest.StartAPI() +} + +func (s *ServerRequiredSuite) TearDownSuite(c *C) { + arvadostest.StopAPI() + arvadostest.ResetEnv() +} + +var initialArgs []string + +func (s *DoMainTestSuite) SetUpSuite(c *C) { + initialArgs = os.Args +} + +var kcSrc, kcDst *keepclient.KeepClient +var srcKeepServicesJSON, dstKeepServicesJSON, blobSigningKey string + +func (s *ServerRequiredSuite) SetUpTest(c *C) { + // reset all variables between tests + blobSigningKey = "" + srcKeepServicesJSON = "" + dstKeepServicesJSON = "" + kcSrc = &keepclient.KeepClient{} + kcDst = &keepclient.KeepClient{} +} + +func (s *ServerRequiredSuite) TearDownTest(c *C) { + arvadostest.StopKeep(3) +} + +func (s *DoMainTestSuite) SetUpTest(c *C) { + args := []string{"keep-rsync"} + os.Args = args +} + +func (s *DoMainTestSuite) TearDownTest(c *C) { + os.Args = initialArgs +} + +var testKeepServicesJSON = "{ \"kind\":\"arvados#keepServiceList\", \"etag\":\"\", \"self_link\":\"\", \"offset\":null, \"limit\":null, \"items\":[ { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012340\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012340\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25107, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false }, { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012341\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", 
\"uuid\":\"zzzzz-bi6l4-123456789012341\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25108, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false } ], \"items_available\":2 }" + +// Testing keep-rsync needs two sets of keep services: src and dst. +// The test setup hence creates 3 servers instead of the default 2, +// and uses the first 2 as src and the 3rd as dst keep servers. +func setupRsync(c *C, enforcePermissions bool, replications int) { + // srcConfig + var srcConfig apiConfig + srcConfig.APIHost = os.Getenv("ARVADOS_API_HOST") + srcConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN") + srcConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")) + + // dstConfig + var dstConfig apiConfig + dstConfig.APIHost = os.Getenv("ARVADOS_API_HOST") + dstConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN") + dstConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")) + + if enforcePermissions { + blobSigningKey = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc" + } + + // Start Keep servers + arvadostest.StartKeep(3, enforcePermissions) + + // setup keepclients + var err error + kcSrc, err = setupKeepClient(srcConfig, srcKeepServicesJSON, false, 0) + c.Check(err, IsNil) + + kcDst, err = setupKeepClient(dstConfig, dstKeepServicesJSON, true, replications) + c.Check(err, IsNil) + + for uuid := range kcSrc.LocalRoots() { + if strings.HasSuffix(uuid, "02") { + delete(kcSrc.LocalRoots(), uuid) + } + } + for uuid := range kcSrc.GatewayRoots() { + if strings.HasSuffix(uuid, "02") { + delete(kcSrc.GatewayRoots(), uuid) + } + } + for uuid := range kcSrc.WritableLocalRoots() { + if strings.HasSuffix(uuid, "02") { + delete(kcSrc.WritableLocalRoots(), uuid) + } + } + + for uuid := range kcDst.LocalRoots() { + if strings.HasSuffix(uuid, "00") || strings.HasSuffix(uuid, "01") { + 
delete(kcDst.LocalRoots(), uuid) + } + } + for uuid := range kcDst.GatewayRoots() { + if strings.HasSuffix(uuid, "00") || strings.HasSuffix(uuid, "01") { + delete(kcDst.GatewayRoots(), uuid) + } + } + for uuid := range kcDst.WritableLocalRoots() { + if strings.HasSuffix(uuid, "00") || strings.HasSuffix(uuid, "01") { + delete(kcDst.WritableLocalRoots(), uuid) + } + } + + if replications == 0 { + // Must have got default replications value of 2 from dst discovery document + c.Assert(kcDst.Want_replicas, Equals, 2) + } else { + // Since replications value is provided, it is used + c.Assert(kcDst.Want_replicas, Equals, replications) + } +} + +func (s *ServerRequiredSuite) TestRsyncPutInOne_GetFromOtherShouldFail(c *C) { + setupRsync(c, false, 1) + + // Put a block in src and verify that it is not found in dst + testNoCrosstalk(c, "test-data-1", kcSrc, kcDst) + + // Put a block in dst and verify that it is not found in src + testNoCrosstalk(c, "test-data-2", kcDst, kcSrc) +} + +func (s *ServerRequiredSuite) TestRsyncWithBlobSigning_PutInOne_GetFromOtherShouldFail(c *C) { + setupRsync(c, true, 1) + + // Put a block in src and verify that it is not found in dst + testNoCrosstalk(c, "test-data-1", kcSrc, kcDst) + + // Put a block in dst and verify that it is not found in src + testNoCrosstalk(c, "test-data-2", kcDst, kcSrc) +} + +// Do a Put in the first and Get from the second, +// which should raise block not found error. 
+func testNoCrosstalk(c *C, testData string, kc1, kc2 *keepclient.KeepClient) { + // Put a block using kc1 + locator, _, err := kc1.PutB([]byte(testData)) + c.Assert(err, Equals, nil) + + locator = strings.Split(locator, "+")[0] + _, _, _, err = kc2.Get(keepclient.SignLocator(locator, kc2.Arvados.ApiToken, time.Now().AddDate(0, 0, 1), []byte(blobSigningKey))) + c.Assert(err, NotNil) + c.Check(err.Error(), Equals, "Block not found") +} + +// Test keep-rsync initialization, with srcKeepServicesJSON +func (s *ServerRequiredSuite) TestRsyncInitializeWithKeepServicesJSON(c *C) { + srcKeepServicesJSON = testKeepServicesJSON + + setupRsync(c, false, 1) + + localRoots := kcSrc.LocalRoots() + c.Check(localRoots, NotNil) + + foundIt := false + for k := range localRoots { + if k == "zzzzz-bi6l4-123456789012340" { + foundIt = true + } + } + c.Check(foundIt, Equals, true) + + foundIt = false + for k := range localRoots { + if k == "zzzzz-bi6l4-123456789012341" { + foundIt = true + } + } + c.Check(foundIt, Equals, true) +} + +// Test keep-rsync initialization with default replications count +func (s *ServerRequiredSuite) TestInitializeRsyncDefaultReplicationsCount(c *C) { + setupRsync(c, false, 0) +} + +// Test keep-rsync initialization with replications count argument +func (s *ServerRequiredSuite) TestInitializeRsyncReplicationsCount(c *C) { + setupRsync(c, false, 3) +} + +// Put some blocks in Src and some more in Dst +// And copy missing blocks from Src to Dst +func (s *ServerRequiredSuite) TestKeepRsync(c *C) { + testKeepRsync(c, false, "") +} + +// Put some blocks in Src and some more in Dst with blob signing enabled. 
+// And copy missing blocks from Src to Dst +func (s *ServerRequiredSuite) TestKeepRsync_WithBlobSigning(c *C) { + testKeepRsync(c, true, "") +} + +// Put some blocks in Src and some more in Dst +// Use prefix while doing rsync +// And copy missing blocks from Src to Dst +func (s *ServerRequiredSuite) TestKeepRsync_WithPrefix(c *C) { + data := []byte("test-data-4") + hash := fmt.Sprintf("%x", md5.Sum(data)) + + testKeepRsync(c, false, hash[0:3]) + c.Check(len(dstIndex) > len(dstLocators), Equals, true) +} + +// Put some blocks in Src and some more in Dst +// Use prefix not in src while doing rsync +// And copy missing blocks from Src to Dst +func (s *ServerRequiredSuite) TestKeepRsync_WithNoSuchPrefixInSrc(c *C) { + testKeepRsync(c, false, "999") + c.Check(len(dstIndex), Equals, len(dstLocators)) +} + +// Put 5 blocks in src. Put 2 of those blocks in dst +// Hence there are 3 additional blocks in src +// Also, put 2 extra blocks in dst; they are hence only in dst +// Run rsync and verify that those 7 blocks are now available in dst +func testKeepRsync(c *C, enforcePermissions bool, prefix string) { + setupRsync(c, enforcePermissions, 1) + + // setupTestData + setupTestData(c, prefix) + + err := performKeepRsync(kcSrc, kcDst, blobSigningKey, prefix) + c.Check(err, IsNil) + + // Now GetIndex from dst and verify that all 5 from src and the 2 extra blocks are found + dstIndex, err = getUniqueLocators(kcDst, "") + c.Check(err, IsNil) + + for _, locator := range srcLocatorsMatchingPrefix { + _, ok := dstIndex[locator] + c.Assert(ok, Equals, true) + } + + for _, locator := range extraDstLocators { + _, ok := dstIndex[locator] + c.Assert(ok, Equals, true) + } + + if prefix == "" { + // all blocks from src and the two extra blocks + c.Assert(len(dstIndex), Equals, len(srcLocators)+len(extraDstLocators)) + } else { + // 1 matching prefix and copied over, 2 that were initially copied into dst along with src, and the 2 extra blocks + c.Assert(len(dstIndex), Equals, 
len(srcLocatorsMatchingPrefix)+len(extraDstLocators)+2) + } +} + +// Setup test data in src and dst. +var srcLocators, srcLocatorsMatchingPrefix, dstLocators, extraDstLocators []string +var dstIndex map[string]bool + +func setupTestData(c *C, indexPrefix string) { + srcLocators = []string{} + srcLocatorsMatchingPrefix = []string{} + dstLocators = []string{} + extraDstLocators = []string{} + dstIndex = make(map[string]bool) + + // Put a few blocks in src using kcSrc + for i := 0; i < 5; i++ { + hash, _, err := kcSrc.PutB([]byte(fmt.Sprintf("test-data-%d", i))) + c.Check(err, IsNil) + + srcLocators = append(srcLocators, strings.Split(hash, "+A")[0]) + if strings.HasPrefix(hash, indexPrefix) { + srcLocatorsMatchingPrefix = append(srcLocatorsMatchingPrefix, strings.Split(hash, "+A")[0]) + } + } + + // Put first two of those src blocks in dst using kcDst + for i := 0; i < 2; i++ { + hash, _, err := kcDst.PutB([]byte(fmt.Sprintf("test-data-%d", i))) + c.Check(err, IsNil) + dstLocators = append(dstLocators, strings.Split(hash, "+A")[0]) + } + + // Put two more blocks in dst; they are not in src at all + for i := 0; i < 2; i++ { + hash, _, err := kcDst.PutB([]byte(fmt.Sprintf("other-data-%d", i))) + c.Check(err, IsNil) + dstLocators = append(dstLocators, strings.Split(hash, "+A")[0]) + extraDstLocators = append(extraDstLocators, strings.Split(hash, "+A")[0]) + } +} + +// Setup rsync using srcKeepServicesJSON with fake keepservers. +// Expect error during performKeepRsync due to unreachable src keepservers. +func (s *ServerRequiredSuite) TestErrorDuringRsync_FakeSrcKeepservers(c *C) { + srcKeepServicesJSON = testKeepServicesJSON + + setupRsync(c, false, 1) + + err := performKeepRsync(kcSrc, kcDst, "", "") + c.Check(strings.HasSuffix(err.Error(), "no such host"), Equals, true) +} + +// Setup rsync using dstKeepServicesJSON with fake keepservers. +// Expect error during performKeepRsync due to unreachable dst keepservers. 
+func (s *ServerRequiredSuite) TestErrorDuringRsync_FakeDstKeepservers(c *C) { + dstKeepServicesJSON = testKeepServicesJSON + + setupRsync(c, false, 1) + + err := performKeepRsync(kcSrc, kcDst, "", "") + c.Check(strings.HasSuffix(err.Error(), "no such host"), Equals, true) +} + +// Test rsync with signature error during Get from src. +func (s *ServerRequiredSuite) TestErrorDuringRsync_ErrorGettingBlockFromSrc(c *C) { + setupRsync(c, true, 1) + + // put some blocks in src and dst + setupTestData(c, "") + + // Change blob signing key to a fake key, so that Get from src fails + blobSigningKey = "thisisfakeblobsigningkey" + + err := performKeepRsync(kcSrc, kcDst, blobSigningKey, "") + c.Check(strings.HasSuffix(err.Error(), "Block not found"), Equals, true) +} + +// Test rsync with error during Put to src. +func (s *ServerRequiredSuite) TestErrorDuringRsync_ErrorPuttingBlockInDst(c *C) { + setupRsync(c, false, 1) + + // put some blocks in src and dst + setupTestData(c, "") + + // Increase Want_replicas on dst to result in insufficient replicas error during Put + kcDst.Want_replicas = 2 + + err := performKeepRsync(kcSrc, kcDst, blobSigningKey, "") + c.Check(strings.HasSuffix(err.Error(), "Could not write sufficient replicas"), Equals, true) +} + +// Test loadConfig func +func (s *ServerNotRequiredSuite) TestLoadConfig(c *C) { + // Setup a src config file + srcFile := setupConfigFile(c, "src-config") + defer os.Remove(srcFile.Name()) + srcConfigFile := srcFile.Name() + + // Setup a dst config file + dstFile := setupConfigFile(c, "dst-config") + defer os.Remove(dstFile.Name()) + dstConfigFile := dstFile.Name() + + // load configuration from those files + srcConfig, srcBlobSigningKey, err := loadConfig(srcConfigFile) + c.Check(err, IsNil) + + c.Assert(srcConfig.APIHost, Equals, os.Getenv("ARVADOS_API_HOST")) + c.Assert(srcConfig.APIToken, Equals, os.Getenv("ARVADOS_API_TOKEN")) + c.Assert(srcConfig.APIHostInsecure, Equals, 
matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))) + c.Assert(srcConfig.ExternalClient, Equals, false) + + dstConfig, _, err := loadConfig(dstConfigFile) + c.Check(err, IsNil) + + c.Assert(dstConfig.APIHost, Equals, os.Getenv("ARVADOS_API_HOST")) + c.Assert(dstConfig.APIToken, Equals, os.Getenv("ARVADOS_API_TOKEN")) + c.Assert(dstConfig.APIHostInsecure, Equals, matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))) + c.Assert(dstConfig.ExternalClient, Equals, false) + + c.Assert(srcBlobSigningKey, Equals, "abcdefg") +} + +// Test loadConfig func without setting up the config files +func (s *ServerNotRequiredSuite) TestLoadConfig_MissingSrcConfig(c *C) { + _, _, err := loadConfig("") + c.Assert(err.Error(), Equals, "config file not specified") +} + +// Test loadConfig func - error reading config +func (s *ServerNotRequiredSuite) TestLoadConfig_ErrorLoadingSrcConfig(c *C) { + _, _, err := loadConfig("no-such-config-file") + c.Assert(strings.HasSuffix(err.Error(), "no such file or directory"), Equals, true) +} + +func setupConfigFile(c *C, name string) *os.File { + // Setup a config file + file, err := ioutil.TempFile(os.TempDir(), name) + c.Check(err, IsNil) + + fileContent := "ARVADOS_API_HOST=" + os.Getenv("ARVADOS_API_HOST") + "\n" + fileContent += "ARVADOS_API_TOKEN=" + os.Getenv("ARVADOS_API_TOKEN") + "\n" + fileContent += "ARVADOS_API_HOST_INSECURE=" + os.Getenv("ARVADOS_API_HOST_INSECURE") + "\n" + fileContent += "ARVADOS_EXTERNAL_CLIENT=false\n" + fileContent += "ARVADOS_BLOB_SIGNING_KEY=abcdefg" + + _, err = file.Write([]byte(fileContent)) + c.Check(err, IsNil) + + return file +} + +func (s *DoMainTestSuite) Test_doMain_NoSrcConfig(c *C) { + err := doMain() + c.Check(err, NotNil) + c.Assert(err.Error(), Equals, "Error loading src configuration from file: config file not specified") +} + +func (s *DoMainTestSuite) Test_doMain_SrcButNoDstConfig(c *C) { + srcConfig := setupConfigFile(c, "src") + args := []string{"-replications", "3", "-src", 
srcConfig.Name()} + os.Args = append(os.Args, args...) + err := doMain() + c.Check(err, NotNil) + c.Assert(err.Error(), Equals, "Error loading dst configuration from file: config file not specified") +} + +func (s *DoMainTestSuite) Test_doMain_BadSrcConfig(c *C) { + args := []string{"-src", "abcd"} + os.Args = append(os.Args, args...) + err := doMain() + c.Check(err, NotNil) + c.Assert(strings.HasPrefix(err.Error(), "Error loading src configuration from file: Error reading config file"), Equals, true) +} + +func (s *DoMainTestSuite) Test_doMain_WithReplicationsButNoSrcConfig(c *C) { + args := []string{"-replications", "3"} + os.Args = append(os.Args, args...) + err := doMain() + c.Check(err, NotNil) + c.Assert(err.Error(), Equals, "Error loading src configuration from file: config file not specified") +} + +func (s *DoMainTestSuite) Test_doMainWithSrcAndDstConfig(c *C) { + srcConfig := setupConfigFile(c, "src") + dstConfig := setupConfigFile(c, "dst") + args := []string{"-src", srcConfig.Name(), "-dst", dstConfig.Name()} + os.Args = append(os.Args, args...) + + // Start keepservers. Since we are not doing any tweaking as in setupRsync func, + // kcSrc and kcDst will be the same and no actual copying to dst will happen, but that's ok. + arvadostest.StartKeep(2, false) + + err := doMain() + c.Check(err, IsNil) +}