rack (>= 1.0.0)
rack-test (>= 0.5.4)
xpath (~> 2.0)
- childprocess (0.5.5)
+ childprocess (0.5.6)
ffi (~> 1.0, >= 1.0.11)
cliver (0.3.2)
coffee-rails (4.1.0)
fast_stack (0.1.0)
rake
rake-compiler
- ffi (1.9.6)
+ ffi (1.9.10)
flamegraph (0.1.0)
fast_stack
google-api-client (0.6.4)
metaclass (~> 0.0.1)
morrisjs-rails (0.5.1)
railties (> 3.1, < 5)
- multi_json (1.11.1)
+ multi_json (1.11.2)
multipart-post (1.2.0)
net-scp (1.2.1)
net-ssh (>= 2.6.5)
ref (1.0.5)
ruby-debug-passenger (0.2.0)
ruby-prof (0.15.2)
- rubyzip (1.1.6)
+ rubyzip (1.1.7)
rvm-capistrano (1.5.5)
capistrano (~> 2.15.4)
sass (3.4.9)
sprockets (>= 2.8, < 4.0)
sprockets-rails (>= 2.0, < 4.0)
tilt (~> 1.1)
- selenium-webdriver (2.44.0)
+ selenium-webdriver (2.48.1)
childprocess (~> 0.5)
multi_json (~> 1.0)
rubyzip (~> 1.0)
execjs (>= 0.3.0)
json (>= 1.8.0)
uuidtools (2.1.5)
- websocket (1.2.1)
+ websocket (1.2.2)
websocket-driver (0.5.1)
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.1)
therubyracer
uglifier (>= 1.0.3)
wiselinks
+
+BUNDLED WITH
+ 1.10.6
</li><li>
<strong>Use existing pipelines</strong>: Use best-practices pipelines on your own data with the click of a button.
</li><li>
- <strong>Open-source</strong>: Arvados is completely open-source. Check out our <a href="http://arvados.org">developer site</a>.
+ <strong>Open source</strong>: Arvados is completely open source. Check out our <a href="http://dev.arvados.org">developer site</a>.
</li>
</ol>
<p style="margin-top: 1em;">
h3. Debian and Ubuntu
-Packages are available for Debian 7 ("wheezy"), Ubuntu 12.04 ("precise"), and Ubuntu 14.04 ("trusty").
+Packages are available for Debian 7 ("wheezy"), Debian 8 ("jessie"), Ubuntu 12.04 ("precise"), and Ubuntu 14.04 ("trusty").
First, register the Curoverse signing key in apt's database:
table(table table-bordered table-condensed).
|OS version|Command|
|Debian 7 ("wheezy")|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ wheezy main" | sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
+|Debian 8 ("jessie")|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ jessie main" | sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
|Ubuntu 12.04 ("precise")|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ precise main" | sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
|Ubuntu 14.04 ("trusty")|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ trusty main" | sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
<notextile>
<pre><code>~$ <span class="userinput">ruby -e 'puts rand(2**400).to_s(36)'</span>
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
-~$ <span class="userinput">RAILS_ENV=production bundle exec rails console</span>
+~$ <span class="userinput">cd /var/www/arvados-sso/current</span>
+/var/www/arvados-sso/current$ <span class="userinput">RAILS_ENV=production bundle exec rails console</span>
:001 > <span class="userinput">c = Client.new</span>
:002 > <span class="userinput">c.name = "joshid"</span>
:003 > <span class="userinput">c.app_id = "arvados-server"</span>
</pre>
</notextile>
+h3(#arv-get). arv get
+
+@arv get@ can be used to get a textual representation of Arvados objects from the command line. The output can be limited to a subset of the object's fields. This command can be used with only the knowledge of an object's UUID.
+
+<notextile>
+<pre>
+$ <code class="userinput">arv get --help</code>
+Usage: arv [--format json|yaml] get [uuid] [fields...]
+
+Fetch the specified Arvados object, select the specified fields,
+and print a text representation.
+</pre>
+</notextile>
+
h3(#arv-edit). arv edit
@arv edit@ can be used to edit Arvados objects from the command line. Arv edit opens up the editor of your choice (set the EDITOR environment variable) with the json or yaml description of the object. Saving the file will update the Arvados object on the API server, if it passes validation.
# Ward Vandewege <ward@curoverse.com>
require 'fileutils'
+require 'shellwords'
if RUBY_VERSION < '1.9.3' then
abort <<-EOS
end
-subcommands = %w(copy create edit keep pipeline run tag ws)
+subcommands = %w(copy create edit get keep pipeline run tag ws)
+
+# Locates the named executable on PATH via `which` and replaces the
+# current process with it (exec never returns on success).
+# bin  - executable name; shell-escaped before interpolation into `which`
+# opts - array of arguments forwarded verbatim to the executable
+# Raises RuntimeError ("<bin>: command not found") if lookup fails.
+def exec_bin bin, opts
+ bin_path = `which #{bin.shellescape}`.strip
+ if bin_path.empty?
+ raise "#{bin}: command not found"
+ end
+ exec bin_path, *opts
+end
def check_subcommands client, arvados, subcommand, global_opts, remaining_opts
case subcommand
arv_create client, arvados, global_opts, remaining_opts
when 'edit'
arv_edit client, arvados, global_opts, remaining_opts
+ when 'get'
+ arv_get client, arvados, global_opts, remaining_opts
when 'copy', 'tag', 'ws', 'run'
- exec `which arv-#{subcommand}`.strip, *remaining_opts
+ exec_bin "arv-#{subcommand}", remaining_opts
when 'keep'
@sub = remaining_opts.shift
if ['get', 'put', 'ls', 'normalize'].index @sub then
# Native Arvados
- exec `which arv-#{@sub}`.strip, *remaining_opts
+ exec_bin "arv-#{@sub}", remaining_opts
elsif @sub == 'docker'
- exec `which arv-keepdocker`.strip, *remaining_opts
+ exec_bin "arv-keepdocker", remaining_opts
else
puts "Usage: arv keep [method] [--parameters]\n"
puts "Use 'arv keep [method] --help' to get more information about specific methods.\n\n"
when 'pipeline'
sub = remaining_opts.shift
if sub == 'run'
- exec `which arv-run-pipeline-instance`.strip, *remaining_opts
+ exec_bin "arv-run-pipeline-instance", remaining_opts
else
puts "Usage: arv pipeline [method] [--parameters]\n"
puts "Use 'arv pipeline [method] --help' to get more information about specific methods.\n\n"
def edit_and_commit_object initial_obj, tmp_stem, global_opts, &block
- content = case global_opts[:format]
- when 'json'
- Oj.dump(initial_obj, :indent => 1)
- when 'yaml'
- initial_obj.to_yaml
- else
- abort "Unrecognized format #{global_opts[:format]}"
- end
+ content = get_obj_content initial_obj, global_opts
tmp_file = Tempfile.new([tmp_stem, ".#{global_opts[:format]}"])
tmp_file.write(content)
Oj.load(newcontent)
when 'yaml'
YAML.load(newcontent)
+ else
+ abort "Unrecognized format #{global_opts[:format]}"
end
yield newobj
results
end
-def arv_edit client, arvados, global_opts, remaining_opts
- uuid = remaining_opts.shift
- if uuid.nil? or uuid == "-h" or uuid == "--help"
- puts head_banner
- puts "Usage: arv edit [uuid] [fields...]\n\n"
- puts "Fetch the specified Arvados object, select the specified fields, \n"
- puts "open an interactive text editor on a text representation (json or\n"
- puts "yaml, use --format) and then update the object. Will use 'nano'\n"
- puts "by default, customize with the EDITOR or VISUAL environment variable.\n"
- exit 255
- end
-
- # determine controller
-
+def lookup_uuid_rsc arvados, uuid
m = /([a-z0-9]{5})-([a-z0-9]{5})-([a-z0-9]{15})/.match uuid
if !m
if /^[a-f0-9]{32}/.match uuid
abort "Could not determine resource type #{m[2]}"
end
+ return rsc
+end
+
+def fetch_rsc_obj client, arvados, rsc, uuid, remaining_opts
+
begin
result = client.execute(:api_method => eval('arvados.' + rsc + '.get'),
:parameters => {"uuid" => uuid},
:headers => {
authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
})
- oldobj = check_response result
+ obj = check_response result
rescue => e
abort "Server error: #{e}"
end
if remaining_opts.length > 0
- oldobj.select! { |k, v| remaining_opts.include? k }
+ obj.select! { |k, v| remaining_opts.include? k }
+ end
+
+ return obj
+end
+
+# Serializes an Arvados object (a Hash) into text.
+# obj         - the object to serialize
+# global_opts - global CLI options; global_opts[:format] selects the
+#               output format, either 'json' or 'yaml'
+# Returns the serialized string. Aborts the process on an
+# unrecognized format value.
+def get_obj_content obj, global_opts
+ content = case global_opts[:format]
+ when 'json'
+ Oj.dump(obj, :indent => 1)
+ when 'yaml'
+ obj.to_yaml
+ else
+ abort "Unrecognized format #{global_opts[:format]}"
+ end
+ return content
+end
+
+def arv_edit client, arvados, global_opts, remaining_opts
+ uuid = remaining_opts.shift
+ if uuid.nil? or uuid == "-h" or uuid == "--help"
+ puts head_banner
+ puts "Usage: arv edit [uuid] [fields...]\n\n"
+ puts "Fetch the specified Arvados object, select the specified fields, \n"
+ puts "open an interactive text editor on a text representation (json or\n"
+ puts "yaml, use --format) and then update the object. Will use 'nano'\n"
+ puts "by default, customize with the EDITOR or VISUAL environment variable.\n"
+ exit 255
end
+ rsc = lookup_uuid_rsc arvados, uuid
+ oldobj = fetch_rsc_obj client, arvados, rsc, uuid, remaining_opts
+
edit_and_commit_object oldobj, uuid, global_opts do |newobj|
newobj.select! {|k| newobj[k] != oldobj[k]}
if !newobj.empty?
exit 0
end
+# Implements `arv [--format json|yaml] get [uuid] [fields...]`.
+# Fetches the Arvados object identified by the first remaining argument,
+# optionally restricted to the listed fields, and prints a textual
+# representation in the format selected by --format.
+# Prints usage and exits 255 when no UUID (or -h/--help) is given;
+# exits 0 on success.
+def arv_get client, arvados, global_opts, remaining_opts
+ uuid = remaining_opts.shift
+ if uuid.nil? or uuid == "-h" or uuid == "--help"
+ puts head_banner
+ puts "Usage: arv [--format json|yaml] get [uuid] [fields...]\n\n"
+ puts "Fetch the specified Arvados object, select the specified fields,\n"
+ puts "and print a text representation.\n"
+ exit 255
+ end
+
+ rsc = lookup_uuid_rsc arvados, uuid
+ obj = fetch_rsc_obj client, arvados, rsc, uuid, remaining_opts
+ content = get_obj_content obj, global_opts
+
+ puts content
+ exit 0
+end
+
def arv_create client, arvados, global_opts, remaining_opts
types = resource_types(arvados.discovery_document)
create_opts = Trollop::options do
require 'minitest/autorun'
-require 'digest/md5'
+require 'json'
+require 'yaml'
+# Black box tests for 'arv get' command.
class TestArvGet < Minitest::Test
- def setup
- begin
- Dir.mkdir './tmp'
- rescue Errno::EEXIST
- end
- @@foo_manifest_locator ||= `echo -n foo | ./bin/arv-put --filename foo --no-progress -`.strip
- @@baz_locator ||= `echo -n baz | ./bin/arv-put --as-raw --no-progress -`.strip
- @@multilevel_manifest_locator ||= `echo ./foo/bar #{@@baz_locator} 0:3:baz | ./bin/arv-put --as-raw --no-progress -`.strip
- end
+ # UUID for an Arvados object that does not exist
+ NON_EXISTENT_OBJECT_UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+ # Name of field of Arvados object that can store any (textual) value
+ STORED_VALUE_FIELD_NAME = "name"
+ # Name of UUID field of Arvados object
+ UUID_FIELD_NAME = "uuid"
+ # Name of an invalid field of Arvados object
+ INVALID_FIELD_NAME = "invalid"
- def test_no_args
+ # Tests that a valid Arvados object can be retrieved in a supported format
+ # using: `arv get [uuid]`. Given all other `arv foo` commands return JSON
+ # when no format is specified, JSON should be expected in this case.
+ def test_get_valid_object_no_format_specified
+ stored_value = __method__.to_s
+ uuid = create_arv_object_with_value(stored_value)
out, err = capture_subprocess_io do
- assert_arv_get false
+ assert(arv_get_default(uuid))
end
- assert_equal '', out
- assert_match /^usage:/, err
+ assert_empty(err, "Error text not expected: '#{err}'")
+ arv_object = parse_json_arv_object(out)
+ assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value))
end
- def test_help
+ # Tests that a valid Arvados object can be retrieved in JSON format using:
+ # `arv get [uuid] --format json`.
+ def test_get_valid_object_json_format_specified
+ stored_value = __method__.to_s
+ uuid = create_arv_object_with_value(stored_value)
out, err = capture_subprocess_io do
- assert_arv_get '-h'
+ assert(arv_get_json(uuid))
end
- $stderr.write err
- assert_equal '', err
- assert_match /^usage:/, out
+ assert_empty(err, "Error text not expected: '#{err}'")
+ arv_object = parse_json_arv_object(out)
+ assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value))
end
- def test_file_to_dev_stdout
- test_file_to_stdout('/dev/stdout')
- end
-
- def test_file_to_stdout(specify_stdout_as='-')
+ # Tests that a valid Arvados object can be retrieved in YAML format using:
+ # `arv get [uuid] --format yaml`.
+ def test_get_valid_object_yaml_format_specified
+ stored_value = __method__.to_s
+ uuid = create_arv_object_with_value(stored_value)
out, err = capture_subprocess_io do
- assert_arv_get @@foo_manifest_locator + '/foo', specify_stdout_as
+ assert(arv_get_yaml(uuid))
end
- assert_equal '', err
- assert_equal 'foo', out
+ assert_empty(err, "Error text not expected: '#{err}'")
+ arv_object = parse_yaml_arv_object(out)
+ assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value))
end
- def test_file_to_file
- remove_tmp_foo
+ # Tests that a subset of all fields of a valid Arvados object can be retrieved
+ # using: `arv get [uuid] [fields...]`.
+ def test_get_valid_object_with_valid_fields
+ stored_value = __method__.to_s
+ uuid = create_arv_object_with_value(stored_value)
out, err = capture_subprocess_io do
- assert_arv_get @@foo_manifest_locator + '/foo', 'tmp/foo'
+ assert(arv_get_json(uuid, STORED_VALUE_FIELD_NAME, UUID_FIELD_NAME))
end
- assert_equal '', err
- assert_equal '', out
- assert_equal 'foo', IO.read('tmp/foo')
+ assert_empty(err, "Error text not expected: '#{err}'")
+ arv_object = parse_json_arv_object(out)
+ assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value))
+ assert(has_field_with_value(arv_object, UUID_FIELD_NAME, uuid))
end
- def test_file_to_file_no_overwrite_file
- File.open './tmp/foo', 'wb' do |f|
- f.write 'baz'
- end
+ # Tests that the valid field is retrieved when both a valid and invalid field
+ # are requested from a valid Arvados object, using:
+ # `arv get [uuid] [fields...]`.
+ def test_get_valid_object_with_both_valid_and_invalid_fields
+ stored_value = __method__.to_s
+ uuid = create_arv_object_with_value(stored_value)
out, err = capture_subprocess_io do
- assert_arv_get false, @@foo_manifest_locator + '/foo', 'tmp/foo'
+ assert(arv_get_json(uuid, STORED_VALUE_FIELD_NAME, INVALID_FIELD_NAME))
end
- assert_match /Local file tmp\/foo already exists/, err
- assert_equal '', out
- assert_equal 'baz', IO.read('tmp/foo')
+ assert_empty(err, "Error text not expected: '#{err}'")
+ arv_object = parse_json_arv_object(out)
+ assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value))
+ refute(has_field_with_value(arv_object, INVALID_FIELD_NAME, stored_value))
end
- def test_file_to_file_no_overwrite_file_in_dir
- File.open './tmp/foo', 'wb' do |f|
- f.write 'baz'
- end
+ # Tests that no fields are retrieved when no valid fields are requested from
+ # a valid Arvados object, using: `arv get [uuid] [fields...]`.
+ def test_get_valid_object_with_no_valid_fields
+ stored_value = __method__.to_s
+ uuid = create_arv_object_with_value(stored_value)
out, err = capture_subprocess_io do
- assert_arv_get false, @@foo_manifest_locator + '/', 'tmp/'
+ assert(arv_get_json(uuid, INVALID_FIELD_NAME))
end
- assert_match /Local file tmp\/foo already exists/, err
- assert_equal '', out
- assert_equal 'baz', IO.read('tmp/foo')
+ assert_empty(err, "Error text not expected: '#{err}'")
+ arv_object = parse_json_arv_object(out)
+ assert_equal(0, arv_object.length)
end
- def test_file_to_file_force_overwrite
- File.open './tmp/foo', 'wb' do |f|
- f.write 'baz'
- end
- assert_equal 'baz', IO.read('tmp/foo')
+ # Tests that an invalid (non-existent) Arvados object is not retrieved using:
+ # using: `arv get [non-existent-uuid]`.
+ def test_get_invalid_object
out, err = capture_subprocess_io do
- assert_arv_get '-f', @@foo_manifest_locator + '/', 'tmp/'
+ refute(arv_get_json(NON_EXISTENT_OBJECT_UUID))
end
- assert_match '', err
- assert_equal '', out
- assert_equal 'foo', IO.read('tmp/foo')
+ refute_empty(err, "Expected error feedback on request for invalid object")
+ assert_empty(out)
end
- def test_file_to_file_skip_existing
- File.open './tmp/foo', 'wb' do |f|
- f.write 'baz'
- end
- assert_equal 'baz', IO.read('tmp/foo')
+ # Tests that help text exists using: `arv get --help`.
+ def test_help_exists
out, err = capture_subprocess_io do
- assert_arv_get '--skip-existing', @@foo_manifest_locator + '/', 'tmp/'
+# assert(arv_get_default("--help"), "Expected exit code 0: #{$?}")
+ #XXX: Exit code given is 255. It probably should be 0, which seems to be
+ # standard elsewhere. However, 255 is in line with other `arv`
+ # commands (e.g. see `arv edit`) so ignoring the problem here.
+ arv_get_default("--help")
end
- assert_match '', err
- assert_equal '', out
- assert_equal 'baz', IO.read('tmp/foo')
+ assert_empty(err, "Error text not expected: '#{err}'")
+ refute_empty(out, "Help text should be given")
end
- def test_file_to_dir
- remove_tmp_foo
- out, err = capture_subprocess_io do
- assert_arv_get @@foo_manifest_locator + '/foo', 'tmp/'
- end
- assert_equal '', err
- assert_equal '', out
- assert_equal 'foo', IO.read('tmp/foo')
- end
-
- def test_dir_to_file
- out, err = capture_subprocess_io do
- assert_arv_get false, @@foo_manifest_locator + '/', 'tmp/foo'
- end
- assert_equal '', out
- assert_match /^usage:/, err
- end
-
- def test_dir_to_empty_string
- out, err = capture_subprocess_io do
- assert_arv_get false, @@foo_manifest_locator + '/', ''
- end
- assert_equal '', out
- assert_match /^usage:/, err
- end
-
- def test_nonexistent_block
- out, err = capture_subprocess_io do
- assert_arv_get false, 'e796ab2294f3e48ec709ffa8d6daf58c'
- end
- assert_equal '', out
- assert_match /Error:/, err
- end
-
- def test_nonexistent_manifest
- out, err = capture_subprocess_io do
- assert_arv_get false, 'acbd18db4cc2f85cedef654fccc4a4d8/', 'tmp/'
- end
- assert_equal '', out
- assert_match /Error:/, err
- end
-
- def test_manifest_root_to_dir
- remove_tmp_foo
- out, err = capture_subprocess_io do
- assert_arv_get '-r', @@foo_manifest_locator + '/', 'tmp/'
- end
- assert_equal '', err
- assert_equal '', out
- assert_equal 'foo', IO.read('tmp/foo')
- end
-
- def test_manifest_root_to_dir_noslash
- remove_tmp_foo
- out, err = capture_subprocess_io do
- assert_arv_get '-r', @@foo_manifest_locator + '/', 'tmp'
- end
- assert_equal '', err
- assert_equal '', out
- assert_equal 'foo', IO.read('tmp/foo')
- end
-
- def test_display_md5sum
- remove_tmp_foo
- out, err = capture_subprocess_io do
- assert_arv_get '-r', '--md5sum', @@foo_manifest_locator + '/', 'tmp/'
- end
- assert_equal "#{Digest::MD5.hexdigest('foo')} ./foo\n", err
- assert_equal '', out
- assert_equal 'foo', IO.read('tmp/foo')
+ protected
+ # Runs 'arv get <varargs>' with given arguments. Returns whether the exit
+ # status was 0 (i.e. success). Use $? to obtain more details on failure.
+ def arv_get_default(*args)
+ return system("arv", "get", *args)
end
- def test_md5sum_nowrite
- remove_tmp_foo
- out, err = capture_subprocess_io do
- assert_arv_get '-n', '--md5sum', @@foo_manifest_locator + '/', 'tmp/'
- end
- assert_equal "#{Digest::MD5.hexdigest('foo')} ./foo\n", err
- assert_equal '', out
- assert_equal false, File.exists?('tmp/foo')
+ # Runs 'arv --format json get <varargs>' with given arguments. Returns whether
+ # the exit status was 0 (i.e. success). Use $? to obtain more details on
+ # failure.
+ def arv_get_json(*args)
+ return system("arv", "--format", "json", "get", *args)
end
- def test_sha1_nowrite
- remove_tmp_foo
- out, err = capture_subprocess_io do
- assert_arv_get '-n', '-r', '--hash', 'sha1', @@foo_manifest_locator+'/', 'tmp/'
- end
- assert_equal "#{Digest::SHA1.hexdigest('foo')} ./foo\n", err
- assert_equal '', out
- assert_equal false, File.exists?('tmp/foo')
+ # Runs 'arv --format yaml get <varargs>' with given arguments. Returns whether
+ # the exit status was 0 (i.e. success). Use $? to obtain more details on
+ # failure.
+ def arv_get_yaml(*args)
+ return system("arv", "--format", "yaml", "get", *args)
end
- def test_block_to_file
- remove_tmp_foo
+ # Creates an Arvados object that stores a given value. Returns the uuid of the
+ # created object.
+ def create_arv_object_with_value(value)
out, err = capture_subprocess_io do
- assert_arv_get @@foo_manifest_locator, 'tmp/foo'
+ system("arv", "tag", "add", value, "--object", "testing")
+ assert $?.success?, "Command failure running `arv tag`: #{$?}"
end
assert_equal '', err
- assert_equal '', out
-
- digest = Digest::MD5.hexdigest('foo')
- !(IO.read('tmp/foo')).gsub!( /^(. #{digest}+3)(.*)( 0:3:foo)$/).nil?
+ assert_operator 0, :<, out.strip.length
+ out.strip
end
- def test_create_directory_tree
- `rm -rf ./tmp/arv-get-test/`
- Dir.mkdir './tmp/arv-get-test'
- out, err = capture_subprocess_io do
- assert_arv_get @@multilevel_manifest_locator + '/', 'tmp/arv-get-test/'
+ # Parses the given JSON representation of an Arvados object, returning
+ # an equivalent Ruby representation (a hash map).
+ def parse_json_arv_object(arvObjectAsJson)
+ begin
+ parsed = JSON.parse(arvObjectAsJson)
+ assert(parsed.instance_of?(Hash))
+ return parsed
+ rescue JSON::ParserError => e
+ raise "Invalid JSON representation of Arvados object.\n" \
+ "Parse error: '#{e}'\n" \
+ "JSON: '#{arvObjectAsJson}'\n"
end
- assert_equal '', err
- assert_equal '', out
- assert_equal 'baz', IO.read('tmp/arv-get-test/foo/bar/baz')
end
- def test_create_partial_directory_tree
- `rm -rf ./tmp/arv-get-test/`
- Dir.mkdir './tmp/arv-get-test'
- out, err = capture_subprocess_io do
- assert_arv_get(@@multilevel_manifest_locator + '/foo/',
- 'tmp/arv-get-test/')
+ # Parses the given JSON representation of an Arvados object, returning
+ # an equivalent Ruby representation (a hash map).
+ def parse_yaml_arv_object(arvObjectAsYaml)
+ begin
+ parsed = YAML.load(arvObjectAsYaml)
+ assert(parsed.instance_of?(Hash))
+ return parsed
+ rescue
+ raise "Invalid YAML representation of Arvados object.\n" \
+ "YAML: '#{arvObjectAsYaml}'\n"
end
- assert_equal '', err
- assert_equal '', out
- assert_equal 'baz', IO.read('tmp/arv-get-test/bar/baz')
end
- protected
- def assert_arv_get(*args)
- expect = case args.first
- when true, false
- args.shift
- else
- true
- end
- assert_equal(expect,
- system(['./bin/arv-get', 'arv-get'], *args),
- "`arv-get #{args.join ' '}` " +
- "should exit #{if expect then 0 else 'non-zero' end}")
- end
-
- def remove_tmp_foo
- begin
- File.unlink('tmp/foo')
- rescue Errno::ENOENT
+ # Checks whether the given Arvados object has the given expected value for the
+ # specified field.
+ def has_field_with_value(arvObjectAsHash, fieldName, expectedValue)
+ if !arvObjectAsHash.has_key?(fieldName)
+ return false
end
+ return (arvObjectAsHash[fieldName] == expectedValue)
end
end
--- /dev/null
+require 'minitest/autorun'
+require 'digest/md5'
+
+class TestArvKeepGet < Minitest::Test
+ def setup
+ begin
+ Dir.mkdir './tmp'
+ rescue Errno::EEXIST
+ end
+ @@foo_manifest_locator ||= `echo -n foo | ./bin/arv-put --filename foo --no-progress -`.strip
+ @@baz_locator ||= `echo -n baz | ./bin/arv-put --as-raw --no-progress -`.strip
+ @@multilevel_manifest_locator ||= `echo ./foo/bar #{@@baz_locator} 0:3:baz | ./bin/arv-put --as-raw --no-progress -`.strip
+ end
+
+ def test_no_args
+ out, err = capture_subprocess_io do
+ assert_arv_get false
+ end
+ assert_equal '', out
+ assert_match /^usage:/, err
+ end
+
+ def test_help
+ out, err = capture_subprocess_io do
+ assert_arv_get '-h'
+ end
+ $stderr.write err
+ assert_equal '', err
+ assert_match /^usage:/, out
+ end
+
+ def test_file_to_dev_stdout
+ test_file_to_stdout('/dev/stdout')
+ end
+
+ def test_file_to_stdout(specify_stdout_as='-')
+ out, err = capture_subprocess_io do
+ assert_arv_get @@foo_manifest_locator + '/foo', specify_stdout_as
+ end
+ assert_equal '', err
+ assert_equal 'foo', out
+ end
+
+ def test_file_to_file
+ remove_tmp_foo
+ out, err = capture_subprocess_io do
+ assert_arv_get @@foo_manifest_locator + '/foo', 'tmp/foo'
+ end
+ assert_equal '', err
+ assert_equal '', out
+ assert_equal 'foo', IO.read('tmp/foo')
+ end
+
+ def test_file_to_file_no_overwrite_file
+ File.open './tmp/foo', 'wb' do |f|
+ f.write 'baz'
+ end
+ out, err = capture_subprocess_io do
+ assert_arv_get false, @@foo_manifest_locator + '/foo', 'tmp/foo'
+ end
+ assert_match /Local file tmp\/foo already exists/, err
+ assert_equal '', out
+ assert_equal 'baz', IO.read('tmp/foo')
+ end
+
+ def test_file_to_file_no_overwrite_file_in_dir
+ File.open './tmp/foo', 'wb' do |f|
+ f.write 'baz'
+ end
+ out, err = capture_subprocess_io do
+ assert_arv_get false, @@foo_manifest_locator + '/', 'tmp/'
+ end
+ assert_match /Local file tmp\/foo already exists/, err
+ assert_equal '', out
+ assert_equal 'baz', IO.read('tmp/foo')
+ end
+
+ def test_file_to_file_force_overwrite
+ File.open './tmp/foo', 'wb' do |f|
+ f.write 'baz'
+ end
+ assert_equal 'baz', IO.read('tmp/foo')
+ out, err = capture_subprocess_io do
+ assert_arv_get '-f', @@foo_manifest_locator + '/', 'tmp/'
+ end
+ assert_match '', err
+ assert_equal '', out
+ assert_equal 'foo', IO.read('tmp/foo')
+ end
+
+ def test_file_to_file_skip_existing
+ File.open './tmp/foo', 'wb' do |f|
+ f.write 'baz'
+ end
+ assert_equal 'baz', IO.read('tmp/foo')
+ out, err = capture_subprocess_io do
+ assert_arv_get '--skip-existing', @@foo_manifest_locator + '/', 'tmp/'
+ end
+ assert_match '', err
+ assert_equal '', out
+ assert_equal 'baz', IO.read('tmp/foo')
+ end
+
+ def test_file_to_dir
+ remove_tmp_foo
+ out, err = capture_subprocess_io do
+ assert_arv_get @@foo_manifest_locator + '/foo', 'tmp/'
+ end
+ assert_equal '', err
+ assert_equal '', out
+ assert_equal 'foo', IO.read('tmp/foo')
+ end
+
+ def test_dir_to_file
+ out, err = capture_subprocess_io do
+ assert_arv_get false, @@foo_manifest_locator + '/', 'tmp/foo'
+ end
+ assert_equal '', out
+ assert_match /^usage:/, err
+ end
+
+ def test_dir_to_empty_string
+ out, err = capture_subprocess_io do
+ assert_arv_get false, @@foo_manifest_locator + '/', ''
+ end
+ assert_equal '', out
+ assert_match /^usage:/, err
+ end
+
+ def test_nonexistent_block
+ out, err = capture_subprocess_io do
+ assert_arv_get false, 'e796ab2294f3e48ec709ffa8d6daf58c'
+ end
+ assert_equal '', out
+ assert_match /Error:/, err
+ end
+
+ def test_nonexistent_manifest
+ out, err = capture_subprocess_io do
+ assert_arv_get false, 'acbd18db4cc2f85cedef654fccc4a4d8/', 'tmp/'
+ end
+ assert_equal '', out
+ assert_match /Error:/, err
+ end
+
+ def test_manifest_root_to_dir
+ remove_tmp_foo
+ out, err = capture_subprocess_io do
+ assert_arv_get '-r', @@foo_manifest_locator + '/', 'tmp/'
+ end
+ assert_equal '', err
+ assert_equal '', out
+ assert_equal 'foo', IO.read('tmp/foo')
+ end
+
+ def test_manifest_root_to_dir_noslash
+ remove_tmp_foo
+ out, err = capture_subprocess_io do
+ assert_arv_get '-r', @@foo_manifest_locator + '/', 'tmp'
+ end
+ assert_equal '', err
+ assert_equal '', out
+ assert_equal 'foo', IO.read('tmp/foo')
+ end
+
+ def test_display_md5sum
+ remove_tmp_foo
+ out, err = capture_subprocess_io do
+ assert_arv_get '-r', '--md5sum', @@foo_manifest_locator + '/', 'tmp/'
+ end
+ assert_equal "#{Digest::MD5.hexdigest('foo')} ./foo\n", err
+ assert_equal '', out
+ assert_equal 'foo', IO.read('tmp/foo')
+ end
+
+ def test_md5sum_nowrite
+ remove_tmp_foo
+ out, err = capture_subprocess_io do
+ assert_arv_get '-n', '--md5sum', @@foo_manifest_locator + '/', 'tmp/'
+ end
+ assert_equal "#{Digest::MD5.hexdigest('foo')} ./foo\n", err
+ assert_equal '', out
+ assert_equal false, File.exists?('tmp/foo')
+ end
+
+ def test_sha1_nowrite
+ remove_tmp_foo
+ out, err = capture_subprocess_io do
+ assert_arv_get '-n', '-r', '--hash', 'sha1', @@foo_manifest_locator+'/', 'tmp/'
+ end
+ assert_equal "#{Digest::SHA1.hexdigest('foo')} ./foo\n", err
+ assert_equal '', out
+ assert_equal false, File.exists?('tmp/foo')
+ end
+
+ def test_block_to_file
+ remove_tmp_foo
+ out, err = capture_subprocess_io do
+ assert_arv_get @@foo_manifest_locator, 'tmp/foo'
+ end
+ assert_equal '', err
+ assert_equal '', out
+
+ digest = Digest::MD5.hexdigest('foo')
+ !(IO.read('tmp/foo')).gsub!( /^(. #{digest}+3)(.*)( 0:3:foo)$/).nil?
+ end
+
+ def test_create_directory_tree
+ `rm -rf ./tmp/arv-get-test/`
+ Dir.mkdir './tmp/arv-get-test'
+ out, err = capture_subprocess_io do
+ assert_arv_get @@multilevel_manifest_locator + '/', 'tmp/arv-get-test/'
+ end
+ assert_equal '', err
+ assert_equal '', out
+ assert_equal 'baz', IO.read('tmp/arv-get-test/foo/bar/baz')
+ end
+
+ def test_create_partial_directory_tree
+ `rm -rf ./tmp/arv-get-test/`
+ Dir.mkdir './tmp/arv-get-test'
+ out, err = capture_subprocess_io do
+ assert_arv_get(@@multilevel_manifest_locator + '/foo/',
+ 'tmp/arv-get-test/')
+ end
+ assert_equal '', err
+ assert_equal '', out
+ assert_equal 'baz', IO.read('tmp/arv-get-test/bar/baz')
+ end
+
+ protected
+ def assert_arv_get(*args)
+ expect = case args.first
+ when true, false
+ args.shift
+ else
+ true
+ end
+ assert_equal(expect,
+ system(['./bin/arv-get', 'arv-get'], *args),
+ "`arv-get #{args.join ' '}` " +
+ "should exit #{if expect then 0 else 'non-zero' end}")
+ end
+
+ def remove_tmp_foo
+ begin
+ File.unlink('tmp/foo')
+ rescue Errno::ENOENT
+ end
+ end
+end
require 'minitest/autorun'
require 'digest/md5'
-class TestArvPut < Minitest::Test
+class TestArvKeepPut < Minitest::Test
def setup
begin Dir.mkdir './tmp' rescue Errno::EEXIST end
begin Dir.mkdir './tmp/empty_dir' rescue Errno::EEXIST end
func (s *ServerRequiredSuite) SetUpSuite(c *C) {
arvadostest.StartAPI()
- arvadostest.StartKeep()
+ arvadostest.StartKeep(2, false)
}
func (s *ServerRequiredSuite) SetUpTest(c *C) {
"log"
"os"
"os/exec"
+ "strconv"
"strings"
)
exec.Command("python", "run_test_server.py", "stop").Run()
}
-func StartKeep() {
+// StartKeep starts the given number of keep servers,
+// optionally with -enforce-permissions enabled.
+// Use numKeepServers = 2 and enforcePermissions = false under all normal circumstances.
+func StartKeep(numKeepServers int, enforcePermissions bool) {
cwd, _ := os.Getwd()
defer os.Chdir(cwd)
chdirToPythonTests()
- cmd := exec.Command("python", "run_test_server.py", "start_keep")
+ cmdArgs := []string{"run_test_server.py", "start_keep", "--num-keep-servers", strconv.Itoa(numKeepServers)}
+ if enforcePermissions {
+ cmdArgs = append(cmdArgs, "--keep-enforce-permissions")
+ }
+
+ cmd := exec.Command("python", cmdArgs...)
+
stderr, err := cmd.StderrPipe()
if err != nil {
log.Fatalf("Setting up stderr pipe: %s", err)
}
}
-func StopKeep() {
+// StopKeep stops keep servers that were started with StartKeep.
+// numKeepServers should be the same value that was passed to StartKeep,
+// which is 2 under all normal circumstances.
+func StopKeep(numKeepServers int) {
cwd, _ := os.Getwd()
defer os.Chdir(cwd)
chdirToPythonTests()
-	exec.Command("python", "run_test_server.py", "stop_keep").Run()
+	// Note: .Run() is required — exec.Command only constructs the *Cmd;
+	// without Run() the stop_keep command is never actually executed.
+	exec.Command("python", "run_test_server.py", "stop_keep", "--num-keep-servers", strconv.Itoa(numKeepServers)).Run()
}
"io/ioutil"
"log"
"net/http"
- "os"
"regexp"
"strconv"
"strings"
gatewayRoots *map[string]string
lock sync.RWMutex
Client *http.Client
+ Retries int
// set to 1 if all writable services are of disk type, otherwise 0
replicasPerService int
}
-// Create a new KeepClient. This will contact the API server to discover Keep
-// servers.
+// MakeKeepClient creates a new KeepClient by contacting the API server to discover Keep servers.
func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) {
- var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
- insecure := matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
+ kc := New(arv)
+ return kc, kc.DiscoverKeepServers()
+}
+
+// New func creates a new KeepClient struct.
+// This func does not discover keep servers. It is the caller's responsibility.
+func New(arv *arvadosclient.ArvadosClient) *KeepClient {
+ defaultReplicationLevel := 2
+ value, err := arv.Discovery("defaultCollectionReplication")
+ if err == nil {
+ v, ok := value.(float64)
+ if ok && v > 0 {
+ defaultReplicationLevel = int(v)
+ }
+ }
+
kc := &KeepClient{
Arvados: arv,
- Want_replicas: 2,
+ Want_replicas: defaultReplicationLevel,
Using_proxy: false,
Client: &http.Client{Transport: &http.Transport{
- TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: arv.ApiInsecure}}},
+ Retries: 2,
}
- return kc, kc.DiscoverKeepServers()
+ return kc
}
// Put a block given the block hash, a reader, and the number of bytes
}
}
+func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, int64, string, error) {
+ var errs []string
+
+ tries_remaining := 1 + kc.Retries
+ serversToTry := kc.getSortedRoots(locator)
+ var retryList []string
+
+ for tries_remaining > 0 {
+ tries_remaining -= 1
+ retryList = nil
+
+ for _, host := range serversToTry {
+ url := host + "/" + locator
+
+ req, err := http.NewRequest(method, url, nil)
+ if err != nil {
+ errs = append(errs, fmt.Sprintf("%s: %v", url, err))
+ continue
+ }
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
+ resp, err := kc.Client.Do(req)
+ if err != nil {
+ // Probably a network error, may be transient,
+ // can try again.
+ errs = append(errs, fmt.Sprintf("%s: %v", url, err))
+ retryList = append(retryList, host)
+ } else if resp.StatusCode != http.StatusOK {
+ var respbody []byte
+ respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+ resp.Body.Close()
+ errs = append(errs, fmt.Sprintf("%s: HTTP %d %q",
+ url, resp.StatusCode, bytes.TrimSpace(respbody)))
+
+ if resp.StatusCode == 408 ||
+ resp.StatusCode == 429 ||
+ resp.StatusCode >= 500 {
+ // Timeout, too many requests, or other
+ // server side failure, transient
+ // error, can try again.
+ retryList = append(retryList, host)
+ }
+ } else {
+ // Success.
+ if method == "GET" {
+ return HashCheckingReader{
+ Reader: resp.Body,
+ Hash: md5.New(),
+ Check: locator[0:32],
+ }, resp.ContentLength, url, nil
+ } else {
+ resp.Body.Close()
+ return nil, resp.ContentLength, url, nil
+ }
+ }
+
+ }
+ serversToTry = retryList
+ }
+ log.Printf("DEBUG: %s %s failed: %v", method, locator, errs)
+
+ return nil, 0, "", BlockNotFound
+}
+
// Get() retrieves a block, given a locator. Returns a reader, the
// expected data length, the URL the block is being fetched from, and
// an error.
// reader returned by this method will return a BadChecksum error
// instead of EOF.
func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) {
- var errs []string
- for _, host := range kc.getSortedRoots(locator) {
- url := host + "/" + locator
- req, err := http.NewRequest("GET", url, nil)
- if err != nil {
- errs = append(errs, fmt.Sprintf("%s: %v", url, err))
- continue
- }
- req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
- resp, err := kc.Client.Do(req)
- if err != nil {
- errs = append(errs, fmt.Sprintf("%s: %v", url, err))
- continue
- } else if resp.StatusCode != http.StatusOK {
- respbody, _ := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
- resp.Body.Close()
- errs = append(errs, fmt.Sprintf("%s: HTTP %d %q",
- url, resp.StatusCode, bytes.TrimSpace(respbody)))
- continue
- }
- return HashCheckingReader{
- Reader: resp.Body,
- Hash: md5.New(),
- Check: locator[0:32],
- }, resp.ContentLength, url, nil
- }
- log.Printf("DEBUG: GET %s failed: %v", locator, errs)
- return nil, 0, "", BlockNotFound
+ return kc.getOrHead("GET", locator)
}
// Ask() verifies that a block with the given hash is available and
// Returns the data size (content length) reported by the Keep service
// and the URI reporting the data size.
func (kc *KeepClient) Ask(locator string) (int64, string, error) {
- for _, host := range kc.getSortedRoots(locator) {
- url := host + "/" + locator
- req, err := http.NewRequest("HEAD", url, nil)
- if err != nil {
- continue
- }
- req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
- if resp, err := kc.Client.Do(req); err == nil && resp.StatusCode == http.StatusOK {
- return resp.ContentLength, url, nil
- }
- }
- return 0, "", BlockNotFound
+ _, size, url, err := kc.getOrHead("HEAD", locator)
+ return size, url, err
}
// GetIndex retrieves a list of blocks stored on the given server whose hashes
return
}
arvadostest.StartAPI()
- arvadostest.StartKeep()
+ arvadostest.StartKeep(2, false)
}
func (s *ServerRequiredSuite) TearDownSuite(c *C) {
if *no_server {
return
}
- arvadostest.StopKeep()
+ arvadostest.StopKeep(2)
arvadostest.StopAPI()
}
}
}
+func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
+ arv, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, Equals, nil)
+
+ kc, err := MakeKeepClient(&arv)
+ c.Assert(kc.Want_replicas, Equals, 2)
+
+ arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
+ kc, err = MakeKeepClient(&arv)
+ c.Assert(kc.Want_replicas, Equals, 3)
+
+ arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
+ kc, err = MakeKeepClient(&arv)
+ c.Assert(kc.Want_replicas, Equals, 1)
+}
+
type StubPutHandler struct {
c *C
expectPath string
fh.handled <- fmt.Sprintf("http://%s", req.Host)
}
+type FailThenSucceedHandler struct {
+ handled chan string
+ count int
+ successhandler StubGetHandler
+}
+
+func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ if fh.count == 0 {
+ resp.WriteHeader(500)
+ fh.count += 1
+ fh.handled <- fmt.Sprintf("http://%s", req.Host)
+ } else {
+ fh.successhandler.ServeHTTP(resp, req)
+ }
+}
+
+type Error404Handler struct {
+ handled chan string
+}
+
+func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ resp.WriteHeader(404)
+ fh.handled <- fmt.Sprintf("http://%s", req.Host)
+}
+
func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
log.Printf("TestFailedUploadToStubKeepServer")
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
r, n, url2, err := kc.Get(hash)
defer r.Close()
log.Printf("TestGet done")
}
+func (s *StandaloneSuite) TestGet404(c *C) {
+ hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+ st := Error404Handler{make(chan string, 1)}
+
+ ks := RunFakeKeepServer(st)
+ defer ks.listener.Close()
+
+ arv, err := arvadosclient.MakeArvadosClient()
+ kc, _ := MakeKeepClient(&arv)
+ arv.ApiToken = "abc123"
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+
+ r, n, url2, err := kc.Get(hash)
+ c.Check(err, Equals, BlockNotFound)
+ c.Check(n, Equals, int64(0))
+ c.Check(url2, Equals, "")
+ c.Check(r, Equals, nil)
+}
+
func (s *StandaloneSuite) TestGetFail(c *C) {
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+
+ r, n, url2, err := kc.Get(hash)
+ c.Check(err, Equals, BlockNotFound)
+ c.Check(n, Equals, int64(0))
+ c.Check(url2, Equals, "")
+ c.Check(r, Equals, nil)
+}
+
+func (s *StandaloneSuite) TestGetFailRetry(c *C) {
+ hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+ st := &FailThenSucceedHandler{make(chan string, 1), 0,
+ StubGetHandler{
+ c,
+ hash,
+ "abc123",
+ http.StatusOK,
+ []byte("foo")}}
+
+ ks := RunFakeKeepServer(st)
+ defer ks.listener.Close()
+
+ arv, err := arvadosclient.MakeArvadosClient()
+ kc, _ := MakeKeepClient(&arv)
+ arv.ApiToken = "abc123"
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+
+ r, n, url2, err := kc.Get(hash)
+ defer r.Close()
+ c.Check(err, Equals, nil)
+ c.Check(n, Equals, int64(3))
+ c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
+
+ content, err2 := ioutil.ReadAll(r)
+ c.Check(err2, Equals, nil)
+ c.Check(content, DeepEquals, []byte("foo"))
+}
+
+func (s *StandaloneSuite) TestGetNetError(c *C) {
+ hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+ arv, err := arvadosclient.MakeArvadosClient()
+ kc, _ := MakeKeepClient(&arv)
+ arv.ApiToken = "abc123"
+ kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
r, n, url2, err := kc.Get(hash)
c.Check(err, Equals, BlockNotFound)
arv.ApiToken = "abc123"
kc.SetServiceRoots(
map[string]string{"x": ks0.url},
- map[string]string{"x": ks0.url},
+ nil,
map[string]string{uuid: ks.url})
r, n, uri, err := kc.Get(hash + "+K@" + uuid)
"zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
"zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
uuid: ks.url},
- map[string]string{
- "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
- "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
- "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
- uuid: ks.url},
+ nil,
map[string]string{
"zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
"zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
arv.ApiToken = "abc123"
kc.SetServiceRoots(
map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
- map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
+ nil,
map[string]string{uuid: ksGateway.url})
r, n, uri, err := kc.Get(hash + "+K@" + uuid)
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
r, n, _, err := kc.Get(barhash)
_, err = ioutil.ReadAll(r)
content := []byte("waz")
hash := fmt.Sprintf("%x", md5.Sum(content))
- fh := FailHandler{
+ fh := Error404Handler{
make(chan string, 4)}
st := StubGetHandler{
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
r, err := kc.GetIndex("x", "")
c.Check(err, Equals, nil)
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
r, err := kc.GetIndex("x", hash[0:3])
c.Check(err, Equals, nil)
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
_, err = kc.GetIndex("x", hash[0:3])
c.Check(err, Equals, ErrIncompleteIndex)
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
_, err = kc.GetIndex("y", hash[0:3])
c.Check(err, Equals, ErrNoSuchKeepServer)
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
r, err := kc.GetIndex("x", "abcd")
c.Check(err, Equals, nil)
import (
"crypto/md5"
+ "encoding/json"
"errors"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/streamer"
}
}
+type svcList struct {
+ Items []keepService `json:"items"`
+}
+
// DiscoverKeepServers gets list of available keep services from api server
func (this *KeepClient) DiscoverKeepServers() error {
- type svcList struct {
- Items []keepService `json:"items"`
- }
- var m svcList
+ var list svcList
// Get keep services from api server
- err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &m)
+ err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list)
if err != nil {
return err
}
+ return this.loadKeepServers(list)
+}
+
+// LoadKeepServicesFromJSON gets list of available keep services from given JSON
+func (this *KeepClient) LoadKeepServicesFromJSON(services string) error {
+ var list svcList
+
+ // Load keep services from given json
+ dec := json.NewDecoder(strings.NewReader(services))
+ if err := dec.Decode(&list); err != nil {
+ return err
+ }
+
+ return this.loadKeepServers(list)
+}
+
+// loadKeepServers populates the client's local and gateway service roots from the given svcList.
+func (this *KeepClient) loadKeepServers(list svcList) error {
listed := make(map[string]bool)
localRoots := make(map[string]string)
gatewayRoots := make(map[string]string)
this.replicasPerService = 1
this.Using_proxy = false
- for _, service := range m.Items {
+ for _, service := range list.Items {
scheme := "http"
if service.SSL {
scheme = "https"
n, err := sr.Read(out)
c.Check(n, Equals, 100)
+ c.Check(err, IsNil)
n, err = sr.Read(out)
c.Check(n, Equals, 0)
}
}
} else {
- if reader_status == io.EOF {
- // no more reads expected, so this is ok
- } else {
+ if reader_status == nil {
// slices channel closed without signaling EOF
reader_status = io.ErrUnexpectedEOF
}
Should be used in a "with" block.
"""
def __init__(self, todo):
+ self._started = 0
self._todo = todo
self._done = 0
self._response = None
+ self._start_lock = threading.Condition()
self._todo_lock = threading.Semaphore(todo)
self._done_lock = threading.Lock()
+ self._local = threading.local()
def __enter__(self):
+ self._start_lock.acquire()
+ if getattr(self._local, 'sequence', None) is not None:
+ # If the calling thread has used set_sequence(N), then
+ # we wait here until N other threads have started.
+ while self._started < self._local.sequence:
+ self._start_lock.wait()
self._todo_lock.acquire()
+ self._started += 1
+ self._start_lock.notifyAll()
+ self._start_lock.release()
return self
def __exit__(self, type, value, traceback):
self._todo_lock.release()
+ def set_sequence(self, sequence):
+ self._local.sequence = sequence
+
def shall_i_proceed(self):
"""
Return true if the current thread should do stuff. Return
return self._success
def run(self):
- with self.args['thread_limiter'] as limiter:
+ limiter = self.args['thread_limiter']
+ sequence = self.args['thread_sequence']
+ if sequence is not None:
+ limiter.set_sequence(sequence)
+ with limiter:
if not limiter.shall_i_proceed():
# My turn arrived, but the job has been done without
# me.
thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
+ thread_sequence = 0
for tries_left in loop:
try:
- local_roots = self.map_new_services(
+ sorted_roots = self.map_new_services(
roots_map, locator,
force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
except Exception as error:
continue
threads = []
- for service_root, ks in roots_map.iteritems():
+ for service_root, ks in [(root, roots_map[root])
+ for root in sorted_roots]:
if ks.finished():
continue
t = KeepClient.KeepWriterThread(
data_hash=data_hash,
service_root=service_root,
thread_limiter=thread_limiter,
- timeout=self.current_timeout(num_retries-tries_left))
+ timeout=self.current_timeout(num_retries-tries_left),
+ thread_sequence=thread_sequence)
t.start()
threads.append(t)
+ thread_sequence += 1
for t in threads:
t.join()
loop.save_result((thread_limiter.done() >= copies, len(threads)))
data_hash, loop.last_result()))
else:
service_errors = ((key, roots_map[key].last_result()['error'])
- for key in local_roots
+ for key in sorted_roots
if roots_map[key].last_result()['error'])
raise arvados.errors.KeepWriteError(
"failed to write {} (wanted {} copies but wrote {})".format(
events {
}
http {
- access_log /dev/stderr combined;
+ access_log {{ACCESSLOG}} combined;
upstream arv-git-http {
server localhost:{{GITPORT}};
}
from __future__ import print_function
import argparse
import atexit
+import errno
import httplib2
import os
import pipes
with open(PID_PATH, 'r') as f:
server_pid = int(f.read())
good_pid = (os.kill(server_pid, 0) is None)
- except IOError:
- good_pid = False
- except OSError:
+ except EnvironmentError:
good_pid = False
now = time.time()
os.getpgid(server_pid)
time.sleep(0.1)
now = time.time()
- except IOError:
- pass
- except OSError:
+ except EnvironmentError:
pass
def find_available_port():
return port
-def run_keep(blob_signing_key=None, enforce_permissions=False):
- stop_keep()
+def run_keep(blob_signing_key=None, enforce_permissions=False, num_servers=2):
+ stop_keep(num_servers)
keep_args = {}
if not blob_signing_key:
host=os.environ['ARVADOS_API_HOST'],
token=os.environ['ARVADOS_API_TOKEN'],
insecure=True)
+
for d in api.keep_services().list().execute()['items']:
api.keep_services().delete(uuid=d['uuid']).execute()
for d in api.keep_disks().list().execute()['items']:
api.keep_disks().delete(uuid=d['uuid']).execute()
- for d in range(0, 2):
+ for d in range(0, num_servers):
port = _start_keep(d, keep_args)
svc = api.keep_services().create(body={'keep_service': {
'uuid': 'zzzzz-bi6l4-keepdisk{:07d}'.format(d),
if os.path.exists(os.path.join(TEST_TMPDIR, "keep.blob_signing_key")):
os.remove(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"))
-def stop_keep():
- _stop_keep(0)
- _stop_keep(1)
+def stop_keep(num_servers=2):
+ for n in range(0, num_servers):
+ _stop_keep(n)
def run_keep_proxy():
if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
nginxconf['GITSSLPORT'] = find_available_port()
nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem')
nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key')
+ nginxconf['ACCESSLOG'] = os.path.join(TEST_TMPDIR, 'nginx_access_log.fifo')
conftemplatefile = os.path.join(MY_DIRNAME, 'nginx.conf')
conffile = os.path.join(TEST_TMPDIR, 'nginx.conf')
env = os.environ.copy()
env['PATH'] = env['PATH']+':/sbin:/usr/sbin:/usr/local/sbin'
+
+ try:
+ os.remove(nginxconf['ACCESSLOG'])
+ except OSError as error:
+ if error.errno != errno.ENOENT:
+ raise
+
+ os.mkfifo(nginxconf['ACCESSLOG'], 0700)
nginx = subprocess.Popen(
['nginx',
'-g', 'error_log stderr info;',
'-g', 'pid '+_pidfile('nginx')+';',
'-c', conffile],
env=env, stdin=open('/dev/null'), stdout=sys.stderr)
+ cat_access = subprocess.Popen(
+ ['cat', nginxconf['ACCESSLOG']],
+ stdout=sys.stderr)
_setport('keepproxy-ssl', nginxconf['KEEPPROXYSSLPORT'])
_setport('arv-git-httpd-ssl', nginxconf['GITSSLPORT'])
parser = argparse.ArgumentParser()
parser.add_argument('action', type=str, help="one of {}".format(actions))
parser.add_argument('--auth', type=str, metavar='FIXTURE_NAME', help='Print authorization info for given api_client_authorizations fixture')
+ parser.add_argument('--num-keep-servers', metavar='int', type=int, default=2, help="Number of keep servers desired")
+ parser.add_argument('--keep-enforce-permissions', action="store_true", help="Enforce keep permissions")
+
args = parser.parse_args()
if args.action not in actions:
elif args.action == 'stop':
stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
elif args.action == 'start_keep':
- run_keep()
+ run_keep(enforce_permissions=args.keep_enforce_permissions, num_servers=args.num_keep_servers)
elif args.action == 'stop_keep':
stop_keep()
elif args.action == 'start_keep_proxy':
mock.responses[0].getopt(pycurl.TIMEOUT_MS),
int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
- def test_probe_order_reference_set(self):
+ def check_no_services_error(self, verb, exc_class):
+ api_client = mock.MagicMock(name='api_client')
+ api_client.keep_services().accessible().execute.side_effect = (
+ arvados.errors.ApiError)
+ keep_client = arvados.KeepClient(api_client=api_client)
+ with self.assertRaises(exc_class) as err_check:
+ getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
+ self.assertEqual(0, len(err_check.exception.request_errors()))
+
+ def test_get_error_with_no_services(self):
+ self.check_no_services_error('get', arvados.errors.KeepReadError)
+
+ def test_put_error_with_no_services(self):
+ self.check_no_services_error('put', arvados.errors.KeepWriteError)
+
+ def check_errors_from_last_retry(self, verb, exc_class):
+ api_client = self.mock_keep_services(count=2)
+ req_mock = tutil.mock_keep_responses(
+ "retry error reporting test", 500, 500, 403, 403)
+ with req_mock, tutil.skip_sleep, \
+ self.assertRaises(exc_class) as err_check:
+ keep_client = arvados.KeepClient(api_client=api_client)
+ getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
+ num_retries=3)
+ self.assertEqual([403, 403], [
+ getattr(error, 'status_code', None)
+ for error in err_check.exception.request_errors().itervalues()])
+
+ def test_get_error_reflects_last_retry(self):
+ self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)
+
+ def test_put_error_reflects_last_retry(self):
+ self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)
+
+ def test_put_error_does_not_include_successful_puts(self):
+ data = 'partial failure test'
+ data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+ api_client = self.mock_keep_services(count=3)
+ with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
+ self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
+ keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client.put(data)
+ self.assertEqual(2, len(exc_check.exception.request_errors()))
+
+ def test_proxy_put_with_no_writable_services(self):
+ data = 'test with no writable services'
+ data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+ api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
+ with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
+ self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
+ keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client.put(data)
+ self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
+ self.assertEqual(0, len(exc_check.exception.request_errors()))
+
+
+@tutil.skip_sleep
+class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
+
+ def setUp(self):
# expected_order[i] is the probe order for
# hash=md5(sprintf("%064x",i)) where there are 16 services
# with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
# the first probe for the block consisting of 64 "0"
# characters is the service whose uuid is
# "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
- expected_order = [
+ self.services = 16
+ self.expected_order = [
list('3eab2d5fc9681074'),
list('097dba52e648f1c3'),
list('c5b4e023f8a7d691'),
list('9d81c02e76a3bf54'),
]
- hashes = [
- hashlib.md5("{:064x}".format(x)).hexdigest()
- for x in range(len(expected_order))]
- api_client = self.mock_keep_services(count=16)
- keep_client = arvados.KeepClient(api_client=api_client)
- for i, hash in enumerate(hashes):
- roots = keep_client.weighted_service_roots(arvados.KeepLocator(hash))
+ self.blocks = [
+ "{:064x}".format(x)
+ for x in range(len(self.expected_order))]
+ self.hashes = [
+ hashlib.md5(self.blocks[x]).hexdigest()
+ for x in range(len(self.expected_order))]
+ self.api_client = self.mock_keep_services(count=self.services)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client)
+
+ def test_weighted_service_roots_against_reference_set(self):
+ # Confirm weighted_service_roots() returns the correct order
+ for i, hash in enumerate(self.hashes):
+ roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
got_order = [
re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
for root in roots]
- self.assertEqual(expected_order[i], got_order)
+ self.assertEqual(self.expected_order[i], got_order)
+
+ def test_get_probe_order_against_reference_set(self):
+ self._test_probe_order_against_reference_set(
+ lambda i: self.keep_client.get(self.hashes[i], num_retries=1))
+
+ def test_put_probe_order_against_reference_set(self):
+ # copies=1 prevents the test from being sensitive to races
+ # between writer threads.
+ self._test_probe_order_against_reference_set(
+ lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))
+
+ def _test_probe_order_against_reference_set(self, op):
+ for i in range(len(self.blocks)):
+ with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
+ self.assertRaises(arvados.errors.KeepRequestError):
+ op(i)
+ got_order = [
+ re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL)).group(1)
+ for resp in mock.responses]
+ self.assertEqual(self.expected_order[i]*2, got_order)
+
+ def test_put_probe_order_multiple_copies(self):
+ for copies in range(2, 4):
+ for i in range(len(self.blocks)):
+ with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
+ self.assertRaises(arvados.errors.KeepWriteError):
+ self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
+ got_order = [
+ re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL)).group(1)
+ for resp in mock.responses]
+ # With T threads racing to make requests, the position
+ # of a given server in the sequence of HTTP requests
+ # (got_order) cannot be more than T-1 positions
+ # earlier than that server's position in the reference
+ # probe sequence (expected_order).
+ #
+ # Loop invariant: we have accounted for +pos+ expected
+ # probes, either by seeing them in +got_order+ or by
+ # putting them in +pending+ in the hope of seeing them
+ # later. As long as +len(pending)<T+, we haven't
+ # started a request too early.
+ pending = []
+ for pos, expected in enumerate(self.expected_order[i]*3):
+ got = got_order[pos-len(pending)]
+ while got in pending:
+ del pending[pending.index(got)]
+ got = got_order[pos-len(pending)]
+ if got != expected:
+ pending.append(expected)
+ self.assertLess(
+ len(pending), copies,
+ "pending={}, with copies={}, got {}, expected {}".format(
+ pending, copies, repr(got_order), repr(self.expected_order[i]*3)))
def test_probe_waste_adding_one_server(self):
hashes = [
hashlib.md5("{:064x}".format(x)).hexdigest() for x in range(100)]
initial_services = 12
- api_client = self.mock_keep_services(count=initial_services)
- keep_client = arvados.KeepClient(api_client=api_client)
+ self.api_client = self.mock_keep_services(count=initial_services)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client)
probes_before = [
- keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
+ self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
for added_services in range(1, 12):
api_client = self.mock_keep_services(count=initial_services+added_services)
keep_client = arvados.KeepClient(api_client=api_client)
data = hashlib.md5(data).hexdigest() + '+1234'
# Arbitrary port number:
aport = random.randint(1024,65535)
- api_client = self.mock_keep_services(service_port=aport, count=16)
+ api_client = self.mock_keep_services(service_port=aport, count=self.services)
keep_client = arvados.KeepClient(api_client=api_client)
with mock.patch('pycurl.Curl') as curl_mock, \
self.assertRaises(exc_class) as err_check:
def test_put_error_shows_probe_order(self):
self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
- def check_no_services_error(self, verb, exc_class):
- api_client = mock.MagicMock(name='api_client')
- api_client.keep_services().accessible().execute.side_effect = (
- arvados.errors.ApiError)
- keep_client = arvados.KeepClient(api_client=api_client)
- with self.assertRaises(exc_class) as err_check:
- getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
- self.assertEqual(0, len(err_check.exception.request_errors()))
-
- def test_get_error_with_no_services(self):
- self.check_no_services_error('get', arvados.errors.KeepReadError)
-
- def test_put_error_with_no_services(self):
- self.check_no_services_error('put', arvados.errors.KeepWriteError)
-
- def check_errors_from_last_retry(self, verb, exc_class):
- api_client = self.mock_keep_services(count=2)
- req_mock = tutil.mock_keep_responses(
- "retry error reporting test", 500, 500, 403, 403)
- with req_mock, tutil.skip_sleep, \
- self.assertRaises(exc_class) as err_check:
- keep_client = arvados.KeepClient(api_client=api_client)
- getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
- num_retries=3)
- self.assertEqual([403, 403], [
- getattr(error, 'status_code', None)
- for error in err_check.exception.request_errors().itervalues()])
-
- def test_get_error_reflects_last_retry(self):
- self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)
-
- def test_put_error_reflects_last_retry(self):
- self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)
-
- def test_put_error_does_not_include_successful_puts(self):
- data = 'partial failure test'
- data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
- api_client = self.mock_keep_services(count=3)
- with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
- self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
- keep_client = arvados.KeepClient(api_client=api_client)
- keep_client.put(data)
- self.assertEqual(2, len(exc_check.exception.request_errors()))
-
- def test_proxy_put_with_no_writable_services(self):
- data = 'test with no writable services'
- data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
- api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
- with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
- self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
- keep_client = arvados.KeepClient(api_client=api_client)
- keep_client.put(data)
- self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
- self.assertEqual(0, len(exc_check.exception.request_errors()))
class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
DATA = 'x' * 2**10
gem 'themes_for_rails'
gem 'arvados', '>= 0.1.20150615153458'
-gem 'arvados-cli', '>= 0.1.20150128223752'
+gem 'arvados-cli', '>= 0.1.20150605170031'
# pg_power lets us use partial indexes in schema.rb in Rails 3
gem 'pg_power'
google-api-client (~> 0.6.3, >= 0.6.3)
json (~> 1.7, >= 1.7.7)
jwt (>= 0.1.5, < 1.0.0)
- arvados-cli (0.1.20150205181653)
+ arvados-cli (0.1.20150930141818)
activesupport (~> 3.2, >= 3.2.13)
andand (~> 1.3, >= 1.3.3)
- arvados (~> 0.1, >= 0.1.20150615153458)
+ arvados (~> 0.1, >= 0.1.20150128223554)
curb (~> 0.8)
google-api-client (~> 0.6.3, >= 0.6.3)
json (~> 1.7, >= 1.7.7)
coffee-script-source
execjs
coffee-script-source (1.7.0)
- curb (0.8.6)
+ curb (0.8.8)
daemon_controller (1.2.0)
database_cleaner (1.2.0)
erubis (2.7.0)
acts_as_api
andand
arvados (>= 0.1.20150615153458)
- arvados-cli (>= 0.1.20150128223752)
+ arvados-cli (>= 0.1.20150605170031)
coffee-rails (~> 3.2.0)
database_cleaner
factory_girl_rails
// start api and keep servers
arvadostest.ResetEnv()
arvadostest.StartAPI()
- arvadostest.StartKeep()
+ arvadostest.StartKeep(2, false)
arv = makeArvadosClient()
}
func TearDownDataManagerTest(t *testing.T) {
- arvadostest.StopKeep()
+ arvadostest.StopKeep(2)
arvadostest.StopAPI()
}
func (s *ServerRequiredSuite) SetUpSuite(c *C) {
arvadostest.StartAPI()
- arvadostest.StartKeep()
+ arvadostest.StartKeep(2, false)
}
func (s *ServerRequiredSuite) SetUpTest(c *C) {
}
func (s *ServerRequiredSuite) TearDownSuite(c *C) {
- arvadostest.StopKeep()
+ arvadostest.StopKeep(2)
arvadostest.StopAPI()
}
"io/ioutil"
"log"
"os"
+ "regexp"
"strings"
"time"
azureStorageAccountName string
azureStorageAccountKeyFile string
azureStorageReplication int
- azureWriteRaceInterval time.Duration = 15 * time.Second
- azureWriteRacePollTime time.Duration = time.Second
+ azureWriteRaceInterval = 15 * time.Second
+ azureWriteRacePollTime = time.Second
)
func readKeyFromFile(file string) (string, error) {
replication int
}
+// NewAzureBlobVolume returns a new AzureBlobVolume using the given
+// client and container name. The replication argument specifies the
+// replication level to report when writing data.
func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
return &AzureBlobVolume{
azClient: client,
return nil
}
+// Get reads a Keep block that has been stored as a block blob in the
+// container.
+//
+// If the block is younger than azureWriteRaceInterval and is
+// unexpectedly empty, assume a PutBlob operation is in progress, and
+// wait for it to finish writing.
func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
var deadline time.Time
haveDeadline := false
}
}
+// Compare the given data with existing stored data.
func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
rdr, err := v.bsClient.GetBlob(v.containerName, loc)
if err != nil {
return compareReaderWithBuf(rdr, expect, loc[:32])
}
+// Put stores a Keep block as a block blob in the container.
func (v *AzureBlobVolume) Put(loc string, block []byte) error {
if v.readonly {
return MethodDisabledError
return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block))
}
+// Touch updates the last-modified property of a block blob.
func (v *AzureBlobVolume) Touch(loc string) error {
if v.readonly {
return MethodDisabledError
})
}
+// Mtime returns the last-modified property of a block blob.
func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
if err != nil {
return time.Parse(time.RFC1123, props.LastModified)
}
+// IndexTo writes a list of Keep blocks that are stored in the
+// container.
func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
params := storage.ListBlobsParameters{
Prefix: prefix,
if err != nil {
return err
}
+ if !v.isKeepBlock(b.Name) {
+ continue
+ }
if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) {
// A new zero-length blob is probably
// just a new non-empty blob that
}
}
+// Delete a Keep block.
func (v *AzureBlobVolume) Delete(loc string) error {
if v.readonly {
return MethodDisabledError
})
}
+// Status returns a VolumeStatus struct with placeholder data.
func (v *AzureBlobVolume) Status() *VolumeStatus {
return &VolumeStatus{
DeviceNum: 1,
}
}
+// String returns a volume label, including the container name.
func (v *AzureBlobVolume) String() string {
return fmt.Sprintf("azure-storage-container:%+q", v.containerName)
}
+// Writable returns true, unless the -readonly flag was on when the
+// volume was added.
func (v *AzureBlobVolume) Writable() bool {
return !v.readonly
}
+// Replication returns the replication level of the container, as
+// specified by the -azure-storage-replication argument.
func (v *AzureBlobVolume) Replication() int {
return v.replication
}
return err
}
}
+
+var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
+func (v *AzureBlobVolume) isKeepBlock(s string) bool {
+ return keepBlockRegexp.MatchString(s)
+}
}
func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
- if blob, ok := h.blobs[container+"|"+hash]; !ok {
+ blob, ok := h.blobs[container+"|"+hash]
+ if !ok {
return
- } else {
- blob.Mtime = t
}
+ blob.Mtime = t
}
func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
azureWriteRaceInterval = 2 * time.Second
azureWriteRacePollTime = 5 * time.Millisecond
- v.PutRaw(TestHash, []byte{})
+ v.PutRaw(TestHash, nil)
buf := new(bytes.Buffer)
v.IndexTo("", buf)
// start api and keep servers
arvadostest.StartAPI()
- arvadostest.StartKeep()
+ arvadostest.StartKeep(2, false)
// make arvadosclient
arv, err := arvadosclient.MakeArvadosClient()
v.PutRaw(TestHash2, TestBlock2)
v.PutRaw(TestHash3, TestBlock3)
+ // Blocks whose names aren't Keep hashes should be omitted from
+ // index
+ v.PutRaw("fffffffffnotreallyahashfffffffff", nil)
+ v.PutRaw("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", nil)
+ v.PutRaw("f0000000000000000000000000000000f", nil)
+ v.PutRaw("f00", nil)
+
buf := new(bytes.Buffer)
v.IndexTo("", buf)
indexRows := strings.Split(string(buf.Bytes()), "\n")
}
var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
+var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
// IndexTo writes (to the given Writer) a list of blocks found on this
// volume which begin with the specified prefix. If the prefix is an
if !strings.HasPrefix(name, prefix) {
continue
}
+ if !blockFileRe.MatchString(name) {
+ continue
+ }
_, err = fmt.Fprint(w,
name,
"+", fileInfo[0].Size(),
def _get_slurm_state(self):
return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', self._nodename])
- @ShutdownActorBase._retry((subprocess.CalledProcessError,))
+ # The following methods retry on OSError. This is intended to mitigate bug
+ # #6321 where fork() of node manager raises "OSError: [Errno 12] Cannot
+ # allocate memory" resulting in the untimely death of the shutdown actor
+ # and tends to result in node manager getting into a wedged state where it
+ # won't allocate new nodes or shut down gracefully. The underlying causes
+ # of the excessive memory usage that result in the "Cannot allocate memory"
+ # error are still being investigated.
+
+ @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
def cancel_shutdown(self):
if self._nodename:
if self._get_slurm_state() in self.SLURM_DRAIN_STATES:
pass
return super(ComputeNodeShutdownActor, self).cancel_shutdown()
+ @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
@ShutdownActorBase._stop_if_window_closed
- @ShutdownActorBase._retry((subprocess.CalledProcessError,))
def issue_slurm_drain(self):
self._set_node_state('DRAIN', 'Reason=Node Manager shutdown')
self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
self._later.await_slurm_drain()
+ @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
@ShutdownActorBase._stop_if_window_closed
- @ShutdownActorBase._retry((subprocess.CalledProcessError,))
def await_slurm_drain(self):
output = self._get_slurm_state()
if output in self.SLURM_END_STATES:
self.check_success_flag(False, 2)
self.check_slurm_got_args(proc_mock, 'sinfo', '--noheader', '-o', '%t', '-n', 'compute99')
+ def test_cancel_shutdown_retry(self, proc_mock):
+ proc_mock.side_effect = iter([OSError, 'drain\n', OSError, 'idle\n'])
+ self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
+ self.make_actor()
+ self.check_success_flag(False, 2)
+
+ def test_issue_slurm_drain_retry(self, proc_mock):
+ proc_mock.side_effect = iter([OSError, '', OSError, 'drng\n'])
+ self.check_success_after_reset(proc_mock)
def test_arvados_node_cleaned_after_shutdown(self, proc_mock):
proc_mock.return_value = 'drain\n'
--- /dev/null
+keep-exercise
--- /dev/null
+// Testing tool for Keep services.
+//
+// keep-exercise helps measure throughput and test reliability under
+// various usage patterns.
+//
+// By default, it reads and writes blocks containing 2^26 NUL
+// bytes. This generates network traffic without consuming much disk
+// space.
+//
+// For a more realistic test, enable -vary-request. Warning: this will
+// fill your storage volumes with random data if you leave it running,
+// which can cost you money or leave you with too little room for
+// useful data.
+//
+package main
+
+import (
+ "crypto/rand"
+ "encoding/binary"
+ "flag"
+ "io"
+ "io/ioutil"
+ "log"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
+
+// Command line config knobs
+var (
+ BlockSize = flag.Int("block-size", keepclient.BLOCKSIZE, "bytes per read/write op")
+ ReadThreads = flag.Int("rthreads", 1, "number of concurrent readers")
+ WriteThreads = flag.Int("wthreads", 1, "number of concurrent writers")
+ VaryRequest = flag.Bool("vary-request", false, "vary the data for each request: consumes disk space, exercises write behavior")
+ VaryThread = flag.Bool("vary-thread", false, "use -wthreads different data blocks")
+ Replicas = flag.Int("replicas", 1, "replication level for writing")
+ StatsInterval = flag.Duration("stats-interval", time.Second, "time interval between IO stats reports, or 0 to disable")
+)
+
+func main() {
+ flag.Parse()
+
+ arv, err := arvadosclient.MakeArvadosClient()
+ if err != nil {
+ log.Fatal(err)
+ }
+ kc, err := keepclient.MakeKeepClient(&arv)
+ if err != nil {
+ log.Fatal(err)
+ }
+ kc.Want_replicas = *Replicas
+ kc.Client.Timeout = 10 * time.Minute
+
+ nextBuf := make(chan []byte, *WriteThreads)
+ nextLocator := make(chan string, *ReadThreads+*WriteThreads)
+
+ go countBeans(nextLocator)
+ for i := 0; i < *WriteThreads; i++ {
+ go makeBufs(nextBuf, i)
+ go doWrites(kc, nextBuf, nextLocator)
+ }
+ for i := 0; i < *ReadThreads; i++ {
+ go doReads(kc, nextLocator)
+ }
+ <-make(chan struct{})
+}
+
+// Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
+var bytesInChan = make(chan uint64)
+var bytesOutChan = make(chan uint64)
+
+// Send struct{}{} to errorsChan when an error happens.
+var errorsChan = make(chan struct{})
+
+func countBeans(nextLocator chan string) {
+ t0 := time.Now()
+ var tickChan <-chan time.Time
+ if *StatsInterval > 0 {
+ tickChan = time.NewTicker(*StatsInterval).C
+ }
+ var bytesIn uint64
+ var bytesOut uint64
+ var errors uint64
+ for {
+ select {
+ case <-tickChan:
+ elapsed := time.Since(t0)
+ log.Printf("%v elapsed: read %v bytes (%.1f MiB/s), wrote %v bytes (%.1f MiB/s), errors %d",
+ elapsed,
+ bytesIn, (float64(bytesIn) / elapsed.Seconds() / 1048576),
+ bytesOut, (float64(bytesOut) / elapsed.Seconds() / 1048576),
+ errors,
+ )
+ case i := <-bytesInChan:
+ bytesIn += i
+ case o := <-bytesOutChan:
+ bytesOut += o
+ case <-errorsChan:
+ errors++
+ }
+ }
+}
+
+func makeBufs(nextBuf chan []byte, threadID int) {
+ buf := make([]byte, *BlockSize)
+ if *VaryThread {
+ binary.PutVarint(buf, int64(threadID))
+ }
+ for {
+ if *VaryRequest {
+ if _, err := io.ReadFull(rand.Reader, buf); err != nil {
+ log.Fatal(err)
+ }
+ }
+ nextBuf <- buf
+ }
+}
+
+func doWrites(kc *keepclient.KeepClient, nextBuf chan []byte, nextLocator chan string) {
+ for buf := range nextBuf {
+ locator, _, err := kc.PutB(buf)
+ if err != nil {
+ log.Print(err)
+ errorsChan <- struct{}{}
+ continue
+ }
+ bytesOutChan <- uint64(len(buf))
+ for cap(nextLocator) > len(nextLocator)+*WriteThreads {
+ // Give the readers something to do, unless
+ // they have lots queued up already.
+ nextLocator <- locator
+ }
+ }
+}
+
+func doReads(kc *keepclient.KeepClient, nextLocator chan string) {
+ for locator := range nextLocator {
+ rdr, size, url, err := kc.Get(locator)
+ if err != nil {
+ log.Print(err)
+ errorsChan <- struct{}{}
+ continue
+ }
+ n, err := io.Copy(ioutil.Discard, rdr)
+ rdr.Close()
+ if n != size || err != nil {
+ log.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
+ errorsChan <- struct{}{}
+ continue
+ // Note we don't count the bytes received in
+ // partial/corrupt responses: we are measuring
+ // throughput, not resource consumption.
+ }
+ bytesInChan <- uint64(n)
+ }
+}
--- /dev/null
+keep-rsync
--- /dev/null
+package main
+
+import (
+ "bufio"
+ "crypto/tls"
+ "errors"
+ "flag"
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "io/ioutil"
+ "log"
+ "net/http"
+ "os"
+ "regexp"
+ "strings"
+ "time"
+)
+
+func main() {
+ err := doMain()
+ if err != nil {
+ log.Fatalf("%v", err)
+ }
+}
+
+func doMain() error {
+ flags := flag.NewFlagSet("keep-rsync", flag.ExitOnError)
+
+ srcConfigFile := flags.String(
+ "src",
+ "",
+ "Source configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf")
+
+ dstConfigFile := flags.String(
+ "dst",
+ "",
+ "Destination configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf")
+
+ srcKeepServicesJSON := flags.String(
+ "src-keep-services-json",
+ "",
+ "An optional list of available source keepservices. "+
+ "If not provided, this list is obtained from api server configured in src-config-file.")
+
+ dstKeepServicesJSON := flags.String(
+ "dst-keep-services-json",
+ "",
+ "An optional list of available destination keepservices. "+
+ "If not provided, this list is obtained from api server configured in dst-config-file.")
+
+ replications := flags.Int(
+ "replications",
+ 0,
+ "Number of replications to write to the destination. If replications not specified, "+
+ "default replication level configured on destination server will be used.")
+
+ prefix := flags.String(
+ "prefix",
+ "",
+ "Index prefix")
+
+ // Parse args; omit the first arg which is the command name
+ flags.Parse(os.Args[1:])
+
+ srcConfig, srcBlobSigningKey, err := loadConfig(*srcConfigFile)
+ if err != nil {
+ return fmt.Errorf("Error loading src configuration from file: %s", err.Error())
+ }
+
+ dstConfig, _, err := loadConfig(*dstConfigFile)
+ if err != nil {
+ return fmt.Errorf("Error loading dst configuration from file: %s", err.Error())
+ }
+
+ // setup src and dst keepclients
+ kcSrc, err := setupKeepClient(srcConfig, *srcKeepServicesJSON, false, 0)
+ if err != nil {
+ return fmt.Errorf("Error configuring src keepclient: %s", err.Error())
+ }
+
+ kcDst, err := setupKeepClient(dstConfig, *dstKeepServicesJSON, true, *replications)
+ if err != nil {
+ return fmt.Errorf("Error configuring dst keepclient: %s", err.Error())
+ }
+
+ // Copy blocks not found in dst from src
+ err = performKeepRsync(kcSrc, kcDst, srcBlobSigningKey, *prefix)
+ if err != nil {
+ return fmt.Errorf("Error while syncing data: %s", err.Error())
+ }
+
+ return nil
+}
+
+type apiConfig struct {
+ APIToken string
+ APIHost string
+ APIHostInsecure bool
+ ExternalClient bool
+}
+
+// Load src and dst config from given files
+func loadConfig(configFile string) (config apiConfig, blobSigningKey string, err error) {
+ if configFile == "" {
+ return config, blobSigningKey, errors.New("config file not specified")
+ }
+
+ config, blobSigningKey, err = readConfigFromFile(configFile)
+ if err != nil {
+ return config, blobSigningKey, fmt.Errorf("Error reading config file: %v", err)
+ }
+
+ return
+}
+
+var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
+
+// Read config from file
+func readConfigFromFile(filename string) (config apiConfig, blobSigningKey string, err error) {
+ if !strings.Contains(filename, "/") {
+ filename = os.Getenv("HOME") + "/.config/arvados/" + filename + ".conf"
+ }
+
+ content, err := ioutil.ReadFile(filename)
+
+ if err != nil {
+ return config, "", err
+ }
+
+ lines := strings.Split(string(content), "\n")
+ for _, line := range lines {
+ if line == "" {
+ continue
+ }
+
+ kv := strings.SplitN(line, "=", 2)
+ key := strings.TrimSpace(kv[0])
+ value := strings.TrimSpace(kv[1])
+
+ switch key {
+ case "ARVADOS_API_TOKEN":
+ config.APIToken = value
+ case "ARVADOS_API_HOST":
+ config.APIHost = value
+ case "ARVADOS_API_HOST_INSECURE":
+ config.APIHostInsecure = matchTrue.MatchString(value)
+ case "ARVADOS_EXTERNAL_CLIENT":
+ config.ExternalClient = matchTrue.MatchString(value)
+ case "ARVADOS_BLOB_SIGNING_KEY":
+ blobSigningKey = value
+ }
+ }
+ return
+}
+
+// setup keepclient using the config provided
+func setupKeepClient(config apiConfig, keepServicesJSON string, isDst bool, replications int) (kc *keepclient.KeepClient, err error) {
+ arv := arvadosclient.ArvadosClient{
+ ApiToken: config.APIToken,
+ ApiServer: config.APIHost,
+ ApiInsecure: config.APIHostInsecure,
+ Client: &http.Client{Transport: &http.Transport{
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: config.APIHostInsecure}}},
+ External: config.ExternalClient,
+ }
+
+ // if keepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
+ if keepServicesJSON == "" {
+ kc, err = keepclient.MakeKeepClient(&arv)
+ if err != nil {
+ return nil, err
+ }
+ } else {
+ kc = keepclient.New(&arv)
+ err = kc.LoadKeepServicesFromJSON(keepServicesJSON)
+ if err != nil {
+ return kc, err
+ }
+ }
+
+ if isDst {
+ // Get default replications value from destination, if it is not already provided
+ if replications == 0 {
+ value, err := arv.Discovery("defaultCollectionReplication")
+ if err == nil {
+ replications = int(value.(float64))
+ } else {
+ return nil, err
+ }
+ }
+
+ kc.Want_replicas = replications
+ }
+
+ return kc, nil
+}
+
+// Get unique block locators from src and dst
+// Copy any blocks missing in dst
+func performKeepRsync(kcSrc, kcDst *keepclient.KeepClient, blobSigningKey, prefix string) error {
+ // Get unique locators from src
+ srcIndex, err := getUniqueLocators(kcSrc, prefix)
+ if err != nil {
+ return err
+ }
+
+ // Get unique locators from dst
+ dstIndex, err := getUniqueLocators(kcDst, prefix)
+ if err != nil {
+ return err
+ }
+
+ // Get list of locators found in src, but missing in dst
+ toBeCopied := getMissingLocators(srcIndex, dstIndex)
+
+ // Copy each missing block to dst
+ log.Printf("Before keep-rsync, there are %d blocks in src and %d blocks in dst. Start copying %d blocks from src not found in dst.",
+ len(srcIndex), len(dstIndex), len(toBeCopied))
+
+ err = copyBlocksToDst(toBeCopied, kcSrc, kcDst, blobSigningKey)
+
+ return err
+}
+
+// Get list of unique locators from the specified cluster
+func getUniqueLocators(kc *keepclient.KeepClient, prefix string) (map[string]bool, error) {
+ uniqueLocators := map[string]bool{}
+
+ // Get index and dedup
+ for uuid := range kc.LocalRoots() {
+ reader, err := kc.GetIndex(uuid, prefix)
+ if err != nil {
+ return uniqueLocators, err
+ }
+ scanner := bufio.NewScanner(reader)
+ for scanner.Scan() {
+ uniqueLocators[strings.Split(scanner.Text(), " ")[0]] = true
+ }
+ }
+
+ return uniqueLocators, nil
+}
+
+// Get list of locators that are in src but not in dst
+func getMissingLocators(srcLocators, dstLocators map[string]bool) []string {
+ var missingLocators []string
+ for locator := range srcLocators {
+ if _, ok := dstLocators[locator]; !ok {
+ missingLocators = append(missingLocators, locator)
+ }
+ }
+ return missingLocators
+}
+
+// Copy blocks from src to dst; only those that are missing in dst are copied
+func copyBlocksToDst(toBeCopied []string, kcSrc, kcDst *keepclient.KeepClient, blobSigningKey string) error {
+ total := len(toBeCopied)
+
+ startedAt := time.Now()
+ for done, locator := range toBeCopied {
+ if done == 0 {
+ log.Printf("Copying data block %d of %d (%.2f%% done): %v", done+1, total,
+ float64(done)/float64(total)*100, locator)
+ } else {
+ timePerBlock := time.Since(startedAt) / time.Duration(done)
+ log.Printf("Copying data block %d of %d (%.2f%% done, %v est. time remaining): %v", done+1, total,
+ float64(done)/float64(total)*100, timePerBlock*time.Duration(total-done), locator)
+ }
+
+ getLocator := locator
+ expiresAt := time.Now().AddDate(0, 0, 1)
+ if blobSigningKey != "" {
+ getLocator = keepclient.SignLocator(getLocator, kcSrc.Arvados.ApiToken, expiresAt, []byte(blobSigningKey))
+ }
+
+ reader, len, _, err := kcSrc.Get(getLocator)
+ if err != nil {
+ return fmt.Errorf("Error getting block: %v %v", locator, err)
+ }
+
+ _, _, err = kcDst.PutHR(getLocator[:32], reader, len)
+ if err != nil {
+ return fmt.Errorf("Error copying data block: %v %v", locator, err)
+ }
+ }
+
+ log.Printf("Successfully copied to destination %d blocks.", total)
+ return nil
+}
--- /dev/null
+package main
+
+import (
+ "crypto/md5"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "strings"
+ "testing"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+
+ . "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ TestingT(t)
+}
+
+// Gocheck boilerplate
+var _ = Suite(&ServerRequiredSuite{})
+var _ = Suite(&ServerNotRequiredSuite{})
+var _ = Suite(&DoMainTestSuite{})
+
+// Tests that require the Keep server running
+type ServerRequiredSuite struct{}
+type ServerNotRequiredSuite struct{}
+type DoMainTestSuite struct{}
+
+func (s *ServerRequiredSuite) SetUpSuite(c *C) {
+ // Start API server
+ arvadostest.StartAPI()
+}
+
+func (s *ServerRequiredSuite) TearDownSuite(c *C) {
+ arvadostest.StopAPI()
+ arvadostest.ResetEnv()
+}
+
+var initialArgs []string
+
+func (s *DoMainTestSuite) SetUpSuite(c *C) {
+ initialArgs = os.Args
+}
+
+var kcSrc, kcDst *keepclient.KeepClient
+var srcKeepServicesJSON, dstKeepServicesJSON, blobSigningKey string
+
+func (s *ServerRequiredSuite) SetUpTest(c *C) {
+ // reset all variables between tests
+ blobSigningKey = ""
+ srcKeepServicesJSON = ""
+ dstKeepServicesJSON = ""
+ kcSrc = &keepclient.KeepClient{}
+ kcDst = &keepclient.KeepClient{}
+}
+
+func (s *ServerRequiredSuite) TearDownTest(c *C) {
+ arvadostest.StopKeep(3)
+}
+
+func (s *DoMainTestSuite) SetUpTest(c *C) {
+ args := []string{"keep-rsync"}
+ os.Args = args
+}
+
+func (s *DoMainTestSuite) TearDownTest(c *C) {
+ os.Args = initialArgs
+}
+
+var testKeepServicesJSON = "{ \"kind\":\"arvados#keepServiceList\", \"etag\":\"\", \"self_link\":\"\", \"offset\":null, \"limit\":null, \"items\":[ { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012340\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012340\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25107, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false }, { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012341\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012341\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25108, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false } ], \"items_available\":2 }"
+
+// Testing keep-rsync needs two sets of keep services: src and dst.
+// The test setup hence creates 3 servers instead of the default 2,
+// and uses the first 2 as src and the 3rd as dst keep servers.
+func setupRsync(c *C, enforcePermissions bool, replications int) {
+ // srcConfig
+ var srcConfig apiConfig
+ srcConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
+ srcConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN")
+ srcConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
+
+ // dstConfig
+ var dstConfig apiConfig
+ dstConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
+ dstConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN")
+ dstConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
+
+ if enforcePermissions {
+ blobSigningKey = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
+ }
+
+ // Start Keep servers
+ arvadostest.StartKeep(3, enforcePermissions)
+
+ // setup keepclients
+ var err error
+ kcSrc, err = setupKeepClient(srcConfig, srcKeepServicesJSON, false, 0)
+ c.Check(err, IsNil)
+
+ kcDst, err = setupKeepClient(dstConfig, dstKeepServicesJSON, true, replications)
+ c.Check(err, IsNil)
+
+ for uuid := range kcSrc.LocalRoots() {
+ if strings.HasSuffix(uuid, "02") {
+ delete(kcSrc.LocalRoots(), uuid)
+ }
+ }
+ for uuid := range kcSrc.GatewayRoots() {
+ if strings.HasSuffix(uuid, "02") {
+ delete(kcSrc.GatewayRoots(), uuid)
+ }
+ }
+ for uuid := range kcSrc.WritableLocalRoots() {
+ if strings.HasSuffix(uuid, "02") {
+ delete(kcSrc.WritableLocalRoots(), uuid)
+ }
+ }
+
+ for uuid := range kcDst.LocalRoots() {
+ if strings.HasSuffix(uuid, "00") || strings.HasSuffix(uuid, "01") {
+ delete(kcDst.LocalRoots(), uuid)
+ }
+ }
+ for uuid := range kcDst.GatewayRoots() {
+ if strings.HasSuffix(uuid, "00") || strings.HasSuffix(uuid, "01") {
+ delete(kcDst.GatewayRoots(), uuid)
+ }
+ }
+ for uuid := range kcDst.WritableLocalRoots() {
+ if strings.HasSuffix(uuid, "00") || strings.HasSuffix(uuid, "01") {
+ delete(kcDst.WritableLocalRoots(), uuid)
+ }
+ }
+
+ if replications == 0 {
+ // Must have got default replications value of 2 from dst discovery document
+ c.Assert(kcDst.Want_replicas, Equals, 2)
+ } else {
+ // Since replications value is provided, it is used
+ c.Assert(kcDst.Want_replicas, Equals, replications)
+ }
+}
+
+func (s *ServerRequiredSuite) TestRsyncPutInOne_GetFromOtherShouldFail(c *C) {
+ setupRsync(c, false, 1)
+
+ // Put a block in src and verify that it is not found in dst
+ testNoCrosstalk(c, "test-data-1", kcSrc, kcDst)
+
+ // Put a block in dst and verify that it is not found in src
+ testNoCrosstalk(c, "test-data-2", kcDst, kcSrc)
+}
+
+func (s *ServerRequiredSuite) TestRsyncWithBlobSigning_PutInOne_GetFromOtherShouldFail(c *C) {
+ setupRsync(c, true, 1)
+
+ // Put a block in src and verify that it is not found in dst
+ testNoCrosstalk(c, "test-data-1", kcSrc, kcDst)
+
+ // Put a block in dst and verify that it is not found in src
+ testNoCrosstalk(c, "test-data-2", kcDst, kcSrc)
+}
+
+// Do a Put in the first and Get from the second,
+// which should raise block not found error.
+func testNoCrosstalk(c *C, testData string, kc1, kc2 *keepclient.KeepClient) {
+ // Put a block using kc1
+ locator, _, err := kc1.PutB([]byte(testData))
+ c.Assert(err, Equals, nil)
+
+ locator = strings.Split(locator, "+")[0]
+ _, _, _, err = kc2.Get(keepclient.SignLocator(locator, kc2.Arvados.ApiToken, time.Now().AddDate(0, 0, 1), []byte(blobSigningKey)))
+ c.Assert(err, NotNil)
+ c.Check(err.Error(), Equals, "Block not found")
+}
+
+// Test keep-rsync initialization, with srcKeepServicesJSON
+func (s *ServerRequiredSuite) TestRsyncInitializeWithKeepServicesJSON(c *C) {
+ srcKeepServicesJSON = testKeepServicesJSON
+
+ setupRsync(c, false, 1)
+
+ localRoots := kcSrc.LocalRoots()
+ c.Check(localRoots, NotNil)
+
+ foundIt := false
+ for k := range localRoots {
+ if k == "zzzzz-bi6l4-123456789012340" {
+ foundIt = true
+ }
+ }
+ c.Check(foundIt, Equals, true)
+
+ foundIt = false
+ for k := range localRoots {
+ if k == "zzzzz-bi6l4-123456789012341" {
+ foundIt = true
+ }
+ }
+ c.Check(foundIt, Equals, true)
+}
+
+// Test keep-rsync initialization with default replications count
+func (s *ServerRequiredSuite) TestInitializeRsyncDefaultReplicationsCount(c *C) {
+ setupRsync(c, false, 0)
+}
+
+// Test keep-rsync initialization with replications count argument
+func (s *ServerRequiredSuite) TestInitializeRsyncReplicationsCount(c *C) {
+ setupRsync(c, false, 3)
+}
+
+// Put some blocks in Src and some more in Dst
+// And copy missing blocks from Src to Dst
+func (s *ServerRequiredSuite) TestKeepRsync(c *C) {
+ testKeepRsync(c, false, "")
+}
+
+// Put some blocks in Src and some more in Dst with blob signing enabled.
+// And copy missing blocks from Src to Dst
+func (s *ServerRequiredSuite) TestKeepRsync_WithBlobSigning(c *C) {
+ testKeepRsync(c, true, "")
+}
+
+// Put some blocks in Src and some more in Dst
+// Use prefix while doing rsync
+// And copy missing blocks from Src to Dst
+func (s *ServerRequiredSuite) TestKeepRsync_WithPrefix(c *C) {
+ data := []byte("test-data-4")
+ hash := fmt.Sprintf("%x", md5.Sum(data))
+
+ testKeepRsync(c, false, hash[0:3])
+ c.Check(len(dstIndex) > len(dstLocators), Equals, true)
+}
+
+// Put some blocks in Src and some more in Dst
+// Use prefix not in src while doing rsync
+// And copy missing blocks from Src to Dst
+func (s *ServerRequiredSuite) TestKeepRsync_WithNoSuchPrefixInSrc(c *C) {
+ testKeepRsync(c, false, "999")
+ c.Check(len(dstIndex), Equals, len(dstLocators))
+}
+
+// Put 5 blocks in src. Put 2 of those blocks in dst
+// Hence there are 3 additional blocks in src
+// Also, put 2 extra blocks in dst; they are hence only in dst
+// Run rsync and verify that those 7 blocks are now available in dst
+func testKeepRsync(c *C, enforcePermissions bool, prefix string) {
+ setupRsync(c, enforcePermissions, 1)
+
+ // setupTestData
+ setupTestData(c, prefix)
+
+ err := performKeepRsync(kcSrc, kcDst, blobSigningKey, prefix)
+ c.Check(err, IsNil)
+
+ // Now GetIndex from dst and verify that all 5 from src and the 2 extra blocks are found
+ dstIndex, err = getUniqueLocators(kcDst, "")
+ c.Check(err, IsNil)
+
+ for _, locator := range srcLocatorsMatchingPrefix {
+ _, ok := dstIndex[locator]
+ c.Assert(ok, Equals, true)
+ }
+
+ for _, locator := range extraDstLocators {
+ _, ok := dstIndex[locator]
+ c.Assert(ok, Equals, true)
+ }
+
+ if prefix == "" {
+ // all blocks from src and the two extra blocks
+ c.Assert(len(dstIndex), Equals, len(srcLocators)+len(extraDstLocators))
+ } else {
+ // 1 matching prefix and copied over, 2 that were initially copied into dst along with src, and the 2 extra blocks
+ c.Assert(len(dstIndex), Equals, len(srcLocatorsMatchingPrefix)+len(extraDstLocators)+2)
+ }
+}
+
+// Setup test data in src and dst.
+var srcLocators, srcLocatorsMatchingPrefix, dstLocators, extraDstLocators []string
+var dstIndex map[string]bool
+
+func setupTestData(c *C, indexPrefix string) {
+ srcLocators = []string{}
+ srcLocatorsMatchingPrefix = []string{}
+ dstLocators = []string{}
+ extraDstLocators = []string{}
+ dstIndex = make(map[string]bool)
+
+ // Put a few blocks in src using kcSrc
+ for i := 0; i < 5; i++ {
+ hash, _, err := kcSrc.PutB([]byte(fmt.Sprintf("test-data-%d", i)))
+ c.Check(err, IsNil)
+
+ srcLocators = append(srcLocators, strings.Split(hash, "+A")[0])
+ if strings.HasPrefix(hash, indexPrefix) {
+ srcLocatorsMatchingPrefix = append(srcLocatorsMatchingPrefix, strings.Split(hash, "+A")[0])
+ }
+ }
+
+ // Put first two of those src blocks in dst using kcDst
+ for i := 0; i < 2; i++ {
+ hash, _, err := kcDst.PutB([]byte(fmt.Sprintf("test-data-%d", i)))
+ c.Check(err, IsNil)
+ dstLocators = append(dstLocators, strings.Split(hash, "+A")[0])
+ }
+
+ // Put two more blocks in dst; they are not in src at all
+ for i := 0; i < 2; i++ {
+ hash, _, err := kcDst.PutB([]byte(fmt.Sprintf("other-data-%d", i)))
+ c.Check(err, IsNil)
+ dstLocators = append(dstLocators, strings.Split(hash, "+A")[0])
+ extraDstLocators = append(extraDstLocators, strings.Split(hash, "+A")[0])
+ }
+}
+
+// Setup rsync using srcKeepServicesJSON with fake keepservers.
+// Expect error during performKeepRsync due to unreachable src keepservers.
+func (s *ServerRequiredSuite) TestErrorDuringRsync_FakeSrcKeepservers(c *C) {
+ srcKeepServicesJSON = testKeepServicesJSON
+
+ setupRsync(c, false, 1)
+
+ err := performKeepRsync(kcSrc, kcDst, "", "")
+ c.Check(strings.HasSuffix(err.Error(), "no such host"), Equals, true)
+}
+
+// Setup rsync using dstKeepServicesJSON with fake keepservers.
+// Expect error during performKeepRsync due to unreachable dst keepservers.
+func (s *ServerRequiredSuite) TestErrorDuringRsync_FakeDstKeepservers(c *C) {
+ dstKeepServicesJSON = testKeepServicesJSON
+
+ setupRsync(c, false, 1)
+
+ err := performKeepRsync(kcSrc, kcDst, "", "")
+ c.Check(strings.HasSuffix(err.Error(), "no such host"), Equals, true)
+}
+
+// Test rsync with signature error during Get from src.
+func (s *ServerRequiredSuite) TestErrorDuringRsync_ErrorGettingBlockFromSrc(c *C) {
+ setupRsync(c, true, 1)
+
+ // put some blocks in src and dst
+ setupTestData(c, "")
+
+ // Change blob signing key to a fake key, so that Get from src fails
+ blobSigningKey = "thisisfakeblobsigningkey"
+
+ err := performKeepRsync(kcSrc, kcDst, blobSigningKey, "")
+ c.Check(strings.HasSuffix(err.Error(), "Block not found"), Equals, true)
+}
+
+// Test rsync with error during Put to dst.
+func (s *ServerRequiredSuite) TestErrorDuringRsync_ErrorPuttingBlockInDst(c *C) {
+ setupRsync(c, false, 1)
+
+ // put some blocks in src and dst
+ setupTestData(c, "")
+
+ // Increase Want_replicas on dst to result in insufficient replicas error during Put
+ kcDst.Want_replicas = 2
+
+ err := performKeepRsync(kcSrc, kcDst, blobSigningKey, "")
+ c.Check(strings.HasSuffix(err.Error(), "Could not write sufficient replicas"), Equals, true)
+}
+
+// Test loadConfig func: writes src and dst config files mirroring the
+// current ARVADOS_* environment variables, loads them back, and verifies
+// each field round-trips — including the blob signing key ("abcdefg",
+// the fixed value written by setupConfigFile).
+func (s *ServerNotRequiredSuite) TestLoadConfig(c *C) {
+	// Setup a src config file
+	srcFile := setupConfigFile(c, "src-config")
+	defer os.Remove(srcFile.Name())
+	srcConfigFile := srcFile.Name()
+
+	// Setup a dst config file
+	dstFile := setupConfigFile(c, "dst-config")
+	defer os.Remove(dstFile.Name())
+	dstConfigFile := dstFile.Name()
+
+	// load configuration from those files
+	srcConfig, srcBlobSigningKey, err := loadConfig(srcConfigFile)
+	c.Check(err, IsNil)
+
+	// Every loaded field must match the environment the file was built from.
+	c.Assert(srcConfig.APIHost, Equals, os.Getenv("ARVADOS_API_HOST"))
+	c.Assert(srcConfig.APIToken, Equals, os.Getenv("ARVADOS_API_TOKEN"))
+	c.Assert(srcConfig.APIHostInsecure, Equals, matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")))
+	c.Assert(srcConfig.ExternalClient, Equals, false)
+
+	dstConfig, _, err := loadConfig(dstConfigFile)
+	c.Check(err, IsNil)
+
+	c.Assert(dstConfig.APIHost, Equals, os.Getenv("ARVADOS_API_HOST"))
+	c.Assert(dstConfig.APIToken, Equals, os.Getenv("ARVADOS_API_TOKEN"))
+	c.Assert(dstConfig.APIHostInsecure, Equals, matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")))
+	c.Assert(dstConfig.ExternalClient, Equals, false)
+
+	c.Assert(srcBlobSigningKey, Equals, "abcdefg")
+}
+
+// Test loadConfig func without setting up the config files: an empty
+// path must produce the exact "config file not specified" error.
+func (s *ServerNotRequiredSuite) TestLoadConfig_MissingSrcConfig(c *C) {
+	_, _, err := loadConfig("")
+	c.Assert(err.Error(), Equals, "config file not specified")
+}
+
+// Test loadConfig func - error reading config: a nonexistent path must
+// surface the OS error, ending with "no such file or directory".
+func (s *ServerNotRequiredSuite) TestLoadConfig_ErrorLoadingSrcConfig(c *C) {
+	_, _, err := loadConfig("no-such-config-file")
+	c.Assert(strings.HasSuffix(err.Error(), "no such file or directory"), Equals, true)
+}
+
+// setupConfigFile writes a temp settings file (prefix "name") mirroring
+// the current ARVADOS_* environment plus a fixed blob signing key
+// "abcdefg", and returns the open file. Callers remove the file
+// themselves. NOTE(review): the handle is never closed, and TempFile
+// errors are reported with Check (not Assert), so a failure there would
+// continue with a nil file — confirm this is acceptable for tests.
+func setupConfigFile(c *C, name string) *os.File {
+	// Setup a config file
+	file, err := ioutil.TempFile(os.TempDir(), name)
+	c.Check(err, IsNil)
+
+	fileContent := "ARVADOS_API_HOST=" + os.Getenv("ARVADOS_API_HOST") + "\n"
+	fileContent += "ARVADOS_API_TOKEN=" + os.Getenv("ARVADOS_API_TOKEN") + "\n"
+	fileContent += "ARVADOS_API_HOST_INSECURE=" + os.Getenv("ARVADOS_API_HOST_INSECURE") + "\n"
+	fileContent += "ARVADOS_EXTERNAL_CLIENT=false\n"
+	fileContent += "ARVADOS_BLOB_SIGNING_KEY=abcdefg"
+
+	_, err = file.Write([]byte(fileContent))
+	c.Check(err, IsNil)
+
+	return file
+}
+
+// doMain with no -src flag at all must fail while loading the src
+// configuration with "config file not specified".
+func (s *DoMainTestSuite) Test_doMain_NoSrcConfig(c *C) {
+	err := doMain()
+	c.Check(err, NotNil)
+	c.Assert(err.Error(), Equals, "Error loading src configuration from file: config file not specified")
+}
+
+// doMain with a valid -src but no -dst must fail while loading the dst
+// configuration. NOTE(review): os.Args is appended to without a visible
+// reset here — presumably DoMainTestSuite's setup/teardown restores it;
+// confirm.
+func (s *DoMainTestSuite) Test_doMain_SrcButNoDstConfig(c *C) {
+	srcConfig := setupConfigFile(c, "src")
+	args := []string{"-replications", "3", "-src", srcConfig.Name()}
+	os.Args = append(os.Args, args...)
+	err := doMain()
+	c.Check(err, NotNil)
+	c.Assert(err.Error(), Equals, "Error loading dst configuration from file: config file not specified")
+}
+
+// doMain with -src pointing at a nonexistent file ("abcd") must fail
+// with a config-read error prefix.
+func (s *DoMainTestSuite) Test_doMain_BadSrcConfig(c *C) {
+	args := []string{"-src", "abcd"}
+	os.Args = append(os.Args, args...)
+	err := doMain()
+	c.Check(err, NotNil)
+	c.Assert(strings.HasPrefix(err.Error(), "Error loading src configuration from file: Error reading config file"), Equals, true)
+}
+
+// doMain with only -replications set (no -src) must still fail on the
+// missing src configuration, same as the no-flag case.
+func (s *DoMainTestSuite) Test_doMain_WithReplicationsButNoSrcConfig(c *C) {
+	args := []string{"-replications", "3"}
+	os.Args = append(os.Args, args...)
+	err := doMain()
+	c.Check(err, NotNil)
+	c.Assert(err.Error(), Equals, "Error loading src configuration from file: config file not specified")
+}
+
+// Happy path: doMain with both -src and -dst config files pointing at
+// the same test cluster must succeed end to end.
+func (s *DoMainTestSuite) Test_doMainWithSrcAndDstConfig(c *C) {
+	srcConfig := setupConfigFile(c, "src")
+	dstConfig := setupConfigFile(c, "dst")
+	args := []string{"-src", srcConfig.Name(), "-dst", dstConfig.Name()}
+	os.Args = append(os.Args, args...)
+
+	// Start keepservers. Since we are not doing any tweaking as in setupRsync func,
+	// kcSrc and kcDst will be the same and no actual copying to dst will happen, but that's ok.
+	arvadostest.StartKeep(2, false)
+
+	err := doMain()
+	c.Check(err, IsNil)
+}