</pre>
</notextile>
+h3(#arv-get). arv get
+
+@arv get@ can be used to get a textual representation of Arvados objects from the command line. The output can be limited to a subset of the object's fields. Only the object's UUID is needed to use this command.
+
+<notextile>
+<pre>
+$ <code class="userinput">arv get --help</code>
+Usage: arv [--format json|yaml] get [uuid] [fields...]
+
+Fetch the specified Arvados object, select the specified fields,
+and print a text representation.
+</pre>
+</notextile>
+
h3(#arv-edit). arv edit
@arv edit@ can be used to edit Arvados objects from the command line. Arv edit opens up the editor of your choice (set the EDITOR environment variable) with the json or yaml description of the object. Saving the file will update the Arvados object on the API server, if it passes validation.
# Ward Vandewege <ward@curoverse.com>
require 'fileutils'
+require 'shellwords'
if RUBY_VERSION < '1.9.3' then
abort <<-EOS
end
-subcommands = %w(copy create edit keep pipeline run tag ws)
+subcommands = %w(copy create edit get keep pipeline run tag ws)
+
+# Locate the named binary on the PATH (via `which`, with the name
+# shell-escaped) and replace the current process with it, passing
+# opts as its argument vector. Raises if the binary cannot be found.
+def exec_bin bin, opts
+  bin_path = `which #{bin.shellescape}`.strip
+  if bin_path.empty?
+    raise "#{bin}: command not found"
+  end
+  exec bin_path, *opts
+end
def check_subcommands client, arvados, subcommand, global_opts, remaining_opts
case subcommand
arv_create client, arvados, global_opts, remaining_opts
when 'edit'
arv_edit client, arvados, global_opts, remaining_opts
+ when 'get'
+ arv_get client, arvados, global_opts, remaining_opts
when 'copy', 'tag', 'ws', 'run'
- exec `which arv-#{subcommand}`.strip, *remaining_opts
+ exec_bin "arv-#{subcommand}", remaining_opts
when 'keep'
@sub = remaining_opts.shift
if ['get', 'put', 'ls', 'normalize'].index @sub then
# Native Arvados
- exec `which arv-#{@sub}`.strip, *remaining_opts
+ exec_bin "arv-#{@sub}", remaining_opts
elsif @sub == 'docker'
- exec `which arv-keepdocker`.strip, *remaining_opts
+ exec_bin "arv-keepdocker", remaining_opts
else
puts "Usage: arv keep [method] [--parameters]\n"
puts "Use 'arv keep [method] --help' to get more information about specific methods.\n\n"
when 'pipeline'
sub = remaining_opts.shift
if sub == 'run'
- exec `which arv-run-pipeline-instance`.strip, *remaining_opts
+ exec_bin "arv-run-pipeline-instance", remaining_opts
else
puts "Usage: arv pipeline [method] [--parameters]\n"
puts "Use 'arv pipeline [method] --help' to get more information about specific methods.\n\n"
def edit_and_commit_object initial_obj, tmp_stem, global_opts, &block
- content = case global_opts[:format]
- when 'json'
- Oj.dump(initial_obj, :indent => 1)
- when 'yaml'
- initial_obj.to_yaml
- else
- abort "Unrecognized format #{global_opts[:format]}"
- end
+ content = get_obj_content initial_obj, global_opts
tmp_file = Tempfile.new([tmp_stem, ".#{global_opts[:format]}"])
tmp_file.write(content)
Oj.load(newcontent)
when 'yaml'
YAML.load(newcontent)
+ else
+ abort "Unrecognized format #{global_opts[:format]}"
end
yield newobj
results
end
-def arv_edit client, arvados, global_opts, remaining_opts
- uuid = remaining_opts.shift
- if uuid.nil? or uuid == "-h" or uuid == "--help"
- puts head_banner
- puts "Usage: arv edit [uuid] [fields...]\n\n"
- puts "Fetch the specified Arvados object, select the specified fields, \n"
- puts "open an interactive text editor on a text representation (json or\n"
- puts "yaml, use --format) and then update the object. Will use 'nano'\n"
- puts "by default, customize with the EDITOR or VISUAL environment variable.\n"
- exit 255
- end
-
- # determine controller
-
+def lookup_uuid_rsc arvados, uuid
m = /([a-z0-9]{5})-([a-z0-9]{5})-([a-z0-9]{15})/.match uuid
if !m
if /^[a-f0-9]{32}/.match uuid
abort "Could not determine resource type #{m[2]}"
end
+ return rsc
+end
+
+def fetch_rsc_obj client, arvados, rsc, uuid, remaining_opts
+
begin
result = client.execute(:api_method => eval('arvados.' + rsc + '.get'),
:parameters => {"uuid" => uuid},
:headers => {
authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
})
- oldobj = check_response result
+ obj = check_response result
rescue => e
abort "Server error: #{e}"
end
if remaining_opts.length > 0
- oldobj.select! { |k, v| remaining_opts.include? k }
+ obj.select! { |k, v| remaining_opts.include? k }
+ end
+
+ return obj
+end
+
+# Serialize obj into a string according to global_opts[:format]
+# ('json' or 'yaml'); aborts on any unrecognized format.
+def get_obj_content obj, global_opts
+  content = case global_opts[:format]
+  when 'json'
+    Oj.dump(obj, :indent => 1)
+  when 'yaml'
+    obj.to_yaml
+  else
+    abort "Unrecognized format #{global_opts[:format]}"
+  end
+  return content
+end
+
+def arv_edit client, arvados, global_opts, remaining_opts
+ uuid = remaining_opts.shift
+ if uuid.nil? or uuid == "-h" or uuid == "--help"
+ puts head_banner
+ puts "Usage: arv edit [uuid] [fields...]\n\n"
+ puts "Fetch the specified Arvados object, select the specified fields, \n"
+ puts "open an interactive text editor on a text representation (json or\n"
+ puts "yaml, use --format) and then update the object. Will use 'nano'\n"
+ puts "by default, customize with the EDITOR or VISUAL environment variable.\n"
+ exit 255
end
+ rsc = lookup_uuid_rsc arvados, uuid
+ oldobj = fetch_rsc_obj client, arvados, rsc, uuid, remaining_opts
+
edit_and_commit_object oldobj, uuid, global_opts do |newobj|
newobj.select! {|k| newobj[k] != oldobj[k]}
if !newobj.empty?
exit 0
end
+# Implements the 'arv get' subcommand: fetch the object with the given
+# UUID, optionally restrict the output to the requested fields, and
+# print a textual representation in the format chosen by --format.
+def arv_get client, arvados, global_opts, remaining_opts
+  uuid = remaining_opts.shift
+  if uuid.nil? or uuid == "-h" or uuid == "--help"
+    puts head_banner
+    puts "Usage: arv [--format json|yaml] get [uuid] [fields...]\n\n"
+    puts "Fetch the specified Arvados object, select the specified fields,\n"
+    puts "and print a text representation.\n"
+    exit 255
+  end
+
+  rsc = lookup_uuid_rsc arvados, uuid
+  obj = fetch_rsc_obj client, arvados, rsc, uuid, remaining_opts
+  content = get_obj_content obj, global_opts
+
+  puts content
+  exit 0
+end
+
def arv_create client, arvados, global_opts, remaining_opts
types = resource_types(arvados.discovery_document)
create_opts = Trollop::options do
require 'minitest/autorun'
-require 'digest/md5'
+require 'json'
+require 'yaml'
+# Black box tests for 'arv get' command.
class TestArvGet < Minitest::Test
- def setup
- begin
- Dir.mkdir './tmp'
- rescue Errno::EEXIST
- end
- @@foo_manifest_locator ||= `echo -n foo | ./bin/arv-put --filename foo --no-progress -`.strip
- @@baz_locator ||= `echo -n baz | ./bin/arv-put --as-raw --no-progress -`.strip
- @@multilevel_manifest_locator ||= `echo ./foo/bar #{@@baz_locator} 0:3:baz | ./bin/arv-put --as-raw --no-progress -`.strip
- end
+ # UUID for an Arvados object that does not exist
+ NON_EXISTENT_OBJECT_UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+ # Name of field of Arvados object that can store any (textual) value
+ STORED_VALUE_FIELD_NAME = "name"
+ # Name of UUID field of Arvados object
+ UUID_FIELD_NAME = "uuid"
+ # Name of an invalid field of Arvados object
+ INVALID_FIELD_NAME = "invalid"
- def test_no_args
+ # Tests that a valid Arvados object can be retrieved in a supported format
+ # using: `arv get [uuid]`. Given all other `arv foo` commands return JSON
+ # when no format is specified, JSON should be expected in this case.
+ def test_get_valid_object_no_format_specified
+ stored_value = __method__.to_s
+ uuid = create_arv_object_with_value(stored_value)
out, err = capture_subprocess_io do
- assert_arv_get false
+ assert(arv_get_default(uuid))
end
- assert_equal '', out
- assert_match /^usage:/, err
+ assert_empty(err, "Error text not expected: '#{err}'")
+ arv_object = parse_json_arv_object(out)
+ assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value))
end
- def test_help
+ # Tests that a valid Arvados object can be retrieved in JSON format using:
+ # `arv get [uuid] --format json`.
+ def test_get_valid_object_json_format_specified
+ stored_value = __method__.to_s
+ uuid = create_arv_object_with_value(stored_value)
out, err = capture_subprocess_io do
- assert_arv_get '-h'
+ assert(arv_get_json(uuid))
end
- $stderr.write err
- assert_equal '', err
- assert_match /^usage:/, out
+ assert_empty(err, "Error text not expected: '#{err}'")
+ arv_object = parse_json_arv_object(out)
+ assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value))
end
- def test_file_to_dev_stdout
- test_file_to_stdout('/dev/stdout')
- end
-
- def test_file_to_stdout(specify_stdout_as='-')
+ # Tests that a valid Arvados object can be retrieved in YAML format using:
+ # `arv get [uuid] --format yaml`.
+ def test_get_valid_object_yaml_format_specified
+ stored_value = __method__.to_s
+ uuid = create_arv_object_with_value(stored_value)
out, err = capture_subprocess_io do
- assert_arv_get @@foo_manifest_locator + '/foo', specify_stdout_as
+ assert(arv_get_yaml(uuid))
end
- assert_equal '', err
- assert_equal 'foo', out
+ assert_empty(err, "Error text not expected: '#{err}'")
+ arv_object = parse_yaml_arv_object(out)
+ assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value))
end
- def test_file_to_file
- remove_tmp_foo
+ # Tests that a subset of all fields of a valid Arvados object can be retrieved
+ # using: `arv get [uuid] [fields...]`.
+ def test_get_valid_object_with_valid_fields
+ stored_value = __method__.to_s
+ uuid = create_arv_object_with_value(stored_value)
out, err = capture_subprocess_io do
- assert_arv_get @@foo_manifest_locator + '/foo', 'tmp/foo'
+ assert(arv_get_json(uuid, STORED_VALUE_FIELD_NAME, UUID_FIELD_NAME))
end
- assert_equal '', err
- assert_equal '', out
- assert_equal 'foo', IO.read('tmp/foo')
+ assert_empty(err, "Error text not expected: '#{err}'")
+ arv_object = parse_json_arv_object(out)
+ assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value))
+ assert(has_field_with_value(arv_object, UUID_FIELD_NAME, uuid))
end
- def test_file_to_file_no_overwrite_file
- File.open './tmp/foo', 'wb' do |f|
- f.write 'baz'
- end
+ # Tests that the valid field is retrieved when both a valid and invalid field
+ # are requested from a valid Arvados object, using:
+ # `arv get [uuid] [fields...]`.
+ def test_get_valid_object_with_both_valid_and_invalid_fields
+ stored_value = __method__.to_s
+ uuid = create_arv_object_with_value(stored_value)
out, err = capture_subprocess_io do
- assert_arv_get false, @@foo_manifest_locator + '/foo', 'tmp/foo'
+ assert(arv_get_json(uuid, STORED_VALUE_FIELD_NAME, INVALID_FIELD_NAME))
end
- assert_match /Local file tmp\/foo already exists/, err
- assert_equal '', out
- assert_equal 'baz', IO.read('tmp/foo')
+ assert_empty(err, "Error text not expected: '#{err}'")
+ arv_object = parse_json_arv_object(out)
+ assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value))
+ refute(has_field_with_value(arv_object, INVALID_FIELD_NAME, stored_value))
end
- def test_file_to_file_no_overwrite_file_in_dir
- File.open './tmp/foo', 'wb' do |f|
- f.write 'baz'
- end
+  # Tests that no fields are retrieved when no valid fields are requested from
+ # a valid Arvados object, using: `arv get [uuid] [fields...]`.
+ def test_get_valid_object_with_no_valid_fields
+ stored_value = __method__.to_s
+ uuid = create_arv_object_with_value(stored_value)
out, err = capture_subprocess_io do
- assert_arv_get false, @@foo_manifest_locator + '/', 'tmp/'
+ assert(arv_get_json(uuid, INVALID_FIELD_NAME))
end
- assert_match /Local file tmp\/foo already exists/, err
- assert_equal '', out
- assert_equal 'baz', IO.read('tmp/foo')
+ assert_empty(err, "Error text not expected: '#{err}'")
+ arv_object = parse_json_arv_object(out)
+ assert_equal(0, arv_object.length)
end
- def test_file_to_file_force_overwrite
- File.open './tmp/foo', 'wb' do |f|
- f.write 'baz'
- end
- assert_equal 'baz', IO.read('tmp/foo')
+ # Tests that an invalid (non-existent) Arvados object is not retrieved using:
+ # using: `arv get [non-existent-uuid]`.
+ def test_get_invalid_object
out, err = capture_subprocess_io do
- assert_arv_get '-f', @@foo_manifest_locator + '/', 'tmp/'
+ refute(arv_get_json(NON_EXISTENT_OBJECT_UUID))
end
- assert_match '', err
- assert_equal '', out
- assert_equal 'foo', IO.read('tmp/foo')
+ refute_empty(err, "Expected error feedback on request for invalid object")
+ assert_empty(out)
end
- def test_file_to_file_skip_existing
- File.open './tmp/foo', 'wb' do |f|
- f.write 'baz'
- end
- assert_equal 'baz', IO.read('tmp/foo')
+ # Tests that help text exists using: `arv get --help`.
+ def test_help_exists
out, err = capture_subprocess_io do
- assert_arv_get '--skip-existing', @@foo_manifest_locator + '/', 'tmp/'
+# assert(arv_get_default("--help"), "Expected exit code 0: #{$?}")
+ #XXX: Exit code given is 255. It probably should be 0, which seems to be
+ # standard elsewhere. However, 255 is in line with other `arv`
+ # commands (e.g. see `arv edit`) so ignoring the problem here.
+ arv_get_default("--help")
end
- assert_match '', err
- assert_equal '', out
- assert_equal 'baz', IO.read('tmp/foo')
+ assert_empty(err, "Error text not expected: '#{err}'")
+ refute_empty(out, "Help text should be given")
end
- def test_file_to_dir
- remove_tmp_foo
- out, err = capture_subprocess_io do
- assert_arv_get @@foo_manifest_locator + '/foo', 'tmp/'
- end
- assert_equal '', err
- assert_equal '', out
- assert_equal 'foo', IO.read('tmp/foo')
- end
-
- def test_dir_to_file
- out, err = capture_subprocess_io do
- assert_arv_get false, @@foo_manifest_locator + '/', 'tmp/foo'
- end
- assert_equal '', out
- assert_match /^usage:/, err
- end
-
- def test_dir_to_empty_string
- out, err = capture_subprocess_io do
- assert_arv_get false, @@foo_manifest_locator + '/', ''
- end
- assert_equal '', out
- assert_match /^usage:/, err
- end
-
- def test_nonexistent_block
- out, err = capture_subprocess_io do
- assert_arv_get false, 'e796ab2294f3e48ec709ffa8d6daf58c'
- end
- assert_equal '', out
- assert_match /Error:/, err
- end
-
- def test_nonexistent_manifest
- out, err = capture_subprocess_io do
- assert_arv_get false, 'acbd18db4cc2f85cedef654fccc4a4d8/', 'tmp/'
- end
- assert_equal '', out
- assert_match /Error:/, err
- end
-
- def test_manifest_root_to_dir
- remove_tmp_foo
- out, err = capture_subprocess_io do
- assert_arv_get '-r', @@foo_manifest_locator + '/', 'tmp/'
- end
- assert_equal '', err
- assert_equal '', out
- assert_equal 'foo', IO.read('tmp/foo')
- end
-
- def test_manifest_root_to_dir_noslash
- remove_tmp_foo
- out, err = capture_subprocess_io do
- assert_arv_get '-r', @@foo_manifest_locator + '/', 'tmp'
- end
- assert_equal '', err
- assert_equal '', out
- assert_equal 'foo', IO.read('tmp/foo')
- end
-
- def test_display_md5sum
- remove_tmp_foo
- out, err = capture_subprocess_io do
- assert_arv_get '-r', '--md5sum', @@foo_manifest_locator + '/', 'tmp/'
- end
- assert_equal "#{Digest::MD5.hexdigest('foo')} ./foo\n", err
- assert_equal '', out
- assert_equal 'foo', IO.read('tmp/foo')
+ protected
+  # Runs 'arv get <varargs>' with given arguments. Returns whether the exit
+  # status was 0 (i.e. success). Use $? to obtain more details on failure.
+ def arv_get_default(*args)
+ return system("arv", "get", *args)
end
- def test_md5sum_nowrite
- remove_tmp_foo
- out, err = capture_subprocess_io do
- assert_arv_get '-n', '--md5sum', @@foo_manifest_locator + '/', 'tmp/'
- end
- assert_equal "#{Digest::MD5.hexdigest('foo')} ./foo\n", err
- assert_equal '', out
- assert_equal false, File.exists?('tmp/foo')
+  # Runs 'arv --format json get <varargs>' with given arguments. Returns whether
+  # the exit status was 0 (i.e. success). Use $? to obtain more details on
+  # failure.
+ def arv_get_json(*args)
+ return system("arv", "--format", "json", "get", *args)
end
- def test_sha1_nowrite
- remove_tmp_foo
- out, err = capture_subprocess_io do
- assert_arv_get '-n', '-r', '--hash', 'sha1', @@foo_manifest_locator+'/', 'tmp/'
- end
- assert_equal "#{Digest::SHA1.hexdigest('foo')} ./foo\n", err
- assert_equal '', out
- assert_equal false, File.exists?('tmp/foo')
+  # Runs 'arv --format yaml get <varargs>' with given arguments. Returns whether
+  # the exit status was 0 (i.e. success). Use $? to obtain more details on
+  # failure.
+ def arv_get_yaml(*args)
+ return system("arv", "--format", "yaml", "get", *args)
end
- def test_block_to_file
- remove_tmp_foo
+ # Creates an Arvados object that stores a given value. Returns the uuid of the
+ # created object.
+ def create_arv_object_with_value(value)
out, err = capture_subprocess_io do
- assert_arv_get @@foo_manifest_locator, 'tmp/foo'
+ system("arv", "tag", "add", value, "--object", "testing")
+ assert $?.success?, "Command failure running `arv tag`: #{$?}"
end
assert_equal '', err
- assert_equal '', out
-
- digest = Digest::MD5.hexdigest('foo')
- !(IO.read('tmp/foo')).gsub!( /^(. #{digest}+3)(.*)( 0:3:foo)$/).nil?
+ assert_operator 0, :<, out.strip.length
+ out.strip
end
- def test_create_directory_tree
- `rm -rf ./tmp/arv-get-test/`
- Dir.mkdir './tmp/arv-get-test'
- out, err = capture_subprocess_io do
- assert_arv_get @@multilevel_manifest_locator + '/', 'tmp/arv-get-test/'
+ # Parses the given JSON representation of an Arvados object, returning
+ # an equivalent Ruby representation (a hash map).
+ def parse_json_arv_object(arvObjectAsJson)
+ begin
+ parsed = JSON.parse(arvObjectAsJson)
+ assert(parsed.instance_of?(Hash))
+ return parsed
+ rescue JSON::ParserError => e
+ raise "Invalid JSON representation of Arvados object.\n" \
+ "Parse error: '#{e}'\n" \
+ "JSON: '#{arvObjectAsJson}'\n"
end
- assert_equal '', err
- assert_equal '', out
- assert_equal 'baz', IO.read('tmp/arv-get-test/foo/bar/baz')
end
- def test_create_partial_directory_tree
- `rm -rf ./tmp/arv-get-test/`
- Dir.mkdir './tmp/arv-get-test'
- out, err = capture_subprocess_io do
- assert_arv_get(@@multilevel_manifest_locator + '/foo/',
- 'tmp/arv-get-test/')
+ # Parses the given JSON representation of an Arvados object, returning
+ # an equivalent Ruby representation (a hash map).
+ def parse_yaml_arv_object(arvObjectAsYaml)
+ begin
+ parsed = YAML.load(arvObjectAsYaml)
+ assert(parsed.instance_of?(Hash))
+ return parsed
+ rescue
+ raise "Invalid YAML representation of Arvados object.\n" \
+ "YAML: '#{arvObjectAsYaml}'\n"
end
- assert_equal '', err
- assert_equal '', out
- assert_equal 'baz', IO.read('tmp/arv-get-test/bar/baz')
end
- protected
- def assert_arv_get(*args)
- expect = case args.first
- when true, false
- args.shift
- else
- true
- end
- assert_equal(expect,
- system(['./bin/arv-get', 'arv-get'], *args),
- "`arv-get #{args.join ' '}` " +
- "should exit #{if expect then 0 else 'non-zero' end}")
- end
-
- def remove_tmp_foo
- begin
- File.unlink('tmp/foo')
- rescue Errno::ENOENT
+ # Checks whether the given Arvados object has the given expected value for the
+ # specified field.
+ def has_field_with_value(arvObjectAsHash, fieldName, expectedValue)
+ if !arvObjectAsHash.has_key?(fieldName)
+ return false
end
+ return (arvObjectAsHash[fieldName] == expectedValue)
end
end
--- /dev/null
+require 'minitest/autorun'
+require 'digest/md5'
+
+class TestArvKeepGet < Minitest::Test
+ def setup
+ begin
+ Dir.mkdir './tmp'
+ rescue Errno::EEXIST
+ end
+ @@foo_manifest_locator ||= `echo -n foo | ./bin/arv-put --filename foo --no-progress -`.strip
+ @@baz_locator ||= `echo -n baz | ./bin/arv-put --as-raw --no-progress -`.strip
+ @@multilevel_manifest_locator ||= `echo ./foo/bar #{@@baz_locator} 0:3:baz | ./bin/arv-put --as-raw --no-progress -`.strip
+ end
+
+ def test_no_args
+ out, err = capture_subprocess_io do
+ assert_arv_get false
+ end
+ assert_equal '', out
+ assert_match /^usage:/, err
+ end
+
+ def test_help
+ out, err = capture_subprocess_io do
+ assert_arv_get '-h'
+ end
+ $stderr.write err
+ assert_equal '', err
+ assert_match /^usage:/, out
+ end
+
+ def test_file_to_dev_stdout
+ test_file_to_stdout('/dev/stdout')
+ end
+
+ def test_file_to_stdout(specify_stdout_as='-')
+ out, err = capture_subprocess_io do
+ assert_arv_get @@foo_manifest_locator + '/foo', specify_stdout_as
+ end
+ assert_equal '', err
+ assert_equal 'foo', out
+ end
+
+ def test_file_to_file
+ remove_tmp_foo
+ out, err = capture_subprocess_io do
+ assert_arv_get @@foo_manifest_locator + '/foo', 'tmp/foo'
+ end
+ assert_equal '', err
+ assert_equal '', out
+ assert_equal 'foo', IO.read('tmp/foo')
+ end
+
+ def test_file_to_file_no_overwrite_file
+ File.open './tmp/foo', 'wb' do |f|
+ f.write 'baz'
+ end
+ out, err = capture_subprocess_io do
+ assert_arv_get false, @@foo_manifest_locator + '/foo', 'tmp/foo'
+ end
+ assert_match /Local file tmp\/foo already exists/, err
+ assert_equal '', out
+ assert_equal 'baz', IO.read('tmp/foo')
+ end
+
+ def test_file_to_file_no_overwrite_file_in_dir
+ File.open './tmp/foo', 'wb' do |f|
+ f.write 'baz'
+ end
+ out, err = capture_subprocess_io do
+ assert_arv_get false, @@foo_manifest_locator + '/', 'tmp/'
+ end
+ assert_match /Local file tmp\/foo already exists/, err
+ assert_equal '', out
+ assert_equal 'baz', IO.read('tmp/foo')
+ end
+
+ def test_file_to_file_force_overwrite
+ File.open './tmp/foo', 'wb' do |f|
+ f.write 'baz'
+ end
+ assert_equal 'baz', IO.read('tmp/foo')
+ out, err = capture_subprocess_io do
+ assert_arv_get '-f', @@foo_manifest_locator + '/', 'tmp/'
+ end
+ assert_match '', err
+ assert_equal '', out
+ assert_equal 'foo', IO.read('tmp/foo')
+ end
+
+ def test_file_to_file_skip_existing
+ File.open './tmp/foo', 'wb' do |f|
+ f.write 'baz'
+ end
+ assert_equal 'baz', IO.read('tmp/foo')
+ out, err = capture_subprocess_io do
+ assert_arv_get '--skip-existing', @@foo_manifest_locator + '/', 'tmp/'
+ end
+ assert_match '', err
+ assert_equal '', out
+ assert_equal 'baz', IO.read('tmp/foo')
+ end
+
+ def test_file_to_dir
+ remove_tmp_foo
+ out, err = capture_subprocess_io do
+ assert_arv_get @@foo_manifest_locator + '/foo', 'tmp/'
+ end
+ assert_equal '', err
+ assert_equal '', out
+ assert_equal 'foo', IO.read('tmp/foo')
+ end
+
+ def test_dir_to_file
+ out, err = capture_subprocess_io do
+ assert_arv_get false, @@foo_manifest_locator + '/', 'tmp/foo'
+ end
+ assert_equal '', out
+ assert_match /^usage:/, err
+ end
+
+ def test_dir_to_empty_string
+ out, err = capture_subprocess_io do
+ assert_arv_get false, @@foo_manifest_locator + '/', ''
+ end
+ assert_equal '', out
+ assert_match /^usage:/, err
+ end
+
+ def test_nonexistent_block
+ out, err = capture_subprocess_io do
+ assert_arv_get false, 'e796ab2294f3e48ec709ffa8d6daf58c'
+ end
+ assert_equal '', out
+ assert_match /Error:/, err
+ end
+
+ def test_nonexistent_manifest
+ out, err = capture_subprocess_io do
+ assert_arv_get false, 'acbd18db4cc2f85cedef654fccc4a4d8/', 'tmp/'
+ end
+ assert_equal '', out
+ assert_match /Error:/, err
+ end
+
+ def test_manifest_root_to_dir
+ remove_tmp_foo
+ out, err = capture_subprocess_io do
+ assert_arv_get '-r', @@foo_manifest_locator + '/', 'tmp/'
+ end
+ assert_equal '', err
+ assert_equal '', out
+ assert_equal 'foo', IO.read('tmp/foo')
+ end
+
+ def test_manifest_root_to_dir_noslash
+ remove_tmp_foo
+ out, err = capture_subprocess_io do
+ assert_arv_get '-r', @@foo_manifest_locator + '/', 'tmp'
+ end
+ assert_equal '', err
+ assert_equal '', out
+ assert_equal 'foo', IO.read('tmp/foo')
+ end
+
+ def test_display_md5sum
+ remove_tmp_foo
+ out, err = capture_subprocess_io do
+ assert_arv_get '-r', '--md5sum', @@foo_manifest_locator + '/', 'tmp/'
+ end
+ assert_equal "#{Digest::MD5.hexdigest('foo')} ./foo\n", err
+ assert_equal '', out
+ assert_equal 'foo', IO.read('tmp/foo')
+ end
+
+ def test_md5sum_nowrite
+ remove_tmp_foo
+ out, err = capture_subprocess_io do
+ assert_arv_get '-n', '--md5sum', @@foo_manifest_locator + '/', 'tmp/'
+ end
+ assert_equal "#{Digest::MD5.hexdigest('foo')} ./foo\n", err
+ assert_equal '', out
+ assert_equal false, File.exists?('tmp/foo')
+ end
+
+ def test_sha1_nowrite
+ remove_tmp_foo
+ out, err = capture_subprocess_io do
+ assert_arv_get '-n', '-r', '--hash', 'sha1', @@foo_manifest_locator+'/', 'tmp/'
+ end
+ assert_equal "#{Digest::SHA1.hexdigest('foo')} ./foo\n", err
+ assert_equal '', out
+ assert_equal false, File.exists?('tmp/foo')
+ end
+
+ def test_block_to_file
+ remove_tmp_foo
+ out, err = capture_subprocess_io do
+ assert_arv_get @@foo_manifest_locator, 'tmp/foo'
+ end
+ assert_equal '', err
+ assert_equal '', out
+
+ digest = Digest::MD5.hexdigest('foo')
+ !(IO.read('tmp/foo')).gsub!( /^(. #{digest}+3)(.*)( 0:3:foo)$/).nil?
+ end
+
+ def test_create_directory_tree
+ `rm -rf ./tmp/arv-get-test/`
+ Dir.mkdir './tmp/arv-get-test'
+ out, err = capture_subprocess_io do
+ assert_arv_get @@multilevel_manifest_locator + '/', 'tmp/arv-get-test/'
+ end
+ assert_equal '', err
+ assert_equal '', out
+ assert_equal 'baz', IO.read('tmp/arv-get-test/foo/bar/baz')
+ end
+
+ def test_create_partial_directory_tree
+ `rm -rf ./tmp/arv-get-test/`
+ Dir.mkdir './tmp/arv-get-test'
+ out, err = capture_subprocess_io do
+ assert_arv_get(@@multilevel_manifest_locator + '/foo/',
+ 'tmp/arv-get-test/')
+ end
+ assert_equal '', err
+ assert_equal '', out
+ assert_equal 'baz', IO.read('tmp/arv-get-test/bar/baz')
+ end
+
+ protected
+ def assert_arv_get(*args)
+ expect = case args.first
+ when true, false
+ args.shift
+ else
+ true
+ end
+ assert_equal(expect,
+ system(['./bin/arv-get', 'arv-get'], *args),
+ "`arv-get #{args.join ' '}` " +
+ "should exit #{if expect then 0 else 'non-zero' end}")
+ end
+
+ def remove_tmp_foo
+ begin
+ File.unlink('tmp/foo')
+ rescue Errno::ENOENT
+ end
+ end
+end
require 'minitest/autorun'
require 'digest/md5'
-class TestArvPut < Minitest::Test
+class TestArvKeepPut < Minitest::Test
def setup
begin Dir.mkdir './tmp' rescue Errno::EEXIST end
begin Dir.mkdir './tmp/empty_dir' rescue Errno::EEXIST end
"log"
"os"
"os/exec"
+ "strconv"
"strings"
)
exec.Command("python", "run_test_server.py", "stop").Run()
}
+// StartKeep starts 2 keep servers with enforcePermissions=false
func StartKeep() {
+ StartKeepWithParams(2, false)
+}
+
+// StartKeepWithParams starts the given number of keep servers,
+// optionally with -enforce-permissions enabled.
+func StartKeepWithParams(numKeepServers int, enforcePermissions bool) {
cwd, _ := os.Getwd()
defer os.Chdir(cwd)
chdirToPythonTests()
- cmd := exec.Command("python", "run_test_server.py", "start_keep")
+ cmdArgs := []string{"run_test_server.py", "start_keep", "--num-keep-servers", strconv.Itoa(numKeepServers)}
+ if enforcePermissions {
+ cmdArgs = append(cmdArgs, "--keep-enforce-permissions")
+ }
+
+ cmd := exec.Command("python", cmdArgs...)
+
stderr, err := cmd.StderrPipe()
if err != nil {
log.Fatalf("Setting up stderr pipe: %s", err)
}
func StopKeep() {
+	StopKeepWithParams(2)
+}
+
+// StopKeepWithParams stops keep servers that were started with
+// StartKeepWithParams. numKeepServers should be the same value that
+// was passed to StartKeepWithParams.
+func StopKeepWithParams(numKeepServers int) {
	cwd, _ := os.Getwd()
	defer os.Chdir(cwd)
	chdirToPythonTests()
-	exec.Command("python", "run_test_server.py", "stop_keep").Run()
+	// .Run() is required: exec.Command only constructs the command;
+	// without Run() the stop request would never be executed.
+	exec.Command("python", "run_test_server.py", "stop_keep", "--num-keep-servers", strconv.Itoa(numKeepServers)).Run()
}
"io/ioutil"
"log"
"net/http"
- "os"
"regexp"
"strconv"
"strings"
replicasPerService int
}
-// Create a new KeepClient. This will contact the API server to discover Keep
-// servers.
+// MakeKeepClient creates a new KeepClient by contacting the API server to discover Keep servers.
func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) {
- var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
- insecure := matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
+ kc := New(arv)
+ return kc, kc.DiscoverKeepServers()
+}
+// New creates a new KeepClient struct.
+// It does not discover keep servers; doing so is the caller's
+// responsibility (e.g. via DiscoverKeepServers or LoadKeepServicesFromJSON).
+func New(arv *arvadosclient.ArvadosClient) *KeepClient {
defaultReplicationLevel := 2
value, err := arv.Discovery("defaultCollectionReplication")
if err == nil {
Want_replicas: defaultReplicationLevel,
Using_proxy: false,
Client: &http.Client{Transport: &http.Transport{
- TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: arv.ApiInsecure}}},
Retries: 2,
}
- return kc, kc.DiscoverKeepServers()
+ return kc
}
// Put a block given the block hash, a reader, and the number of bytes
import (
"crypto/md5"
+ "encoding/json"
"errors"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/streamer"
}
}
+type svcList struct {
+ Items []keepService `json:"items"`
+}
+
// DiscoverKeepServers gets list of available keep services from api server
func (this *KeepClient) DiscoverKeepServers() error {
- type svcList struct {
- Items []keepService `json:"items"`
- }
- var m svcList
+ var list svcList
// Get keep services from api server
- err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &m)
+ err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list)
if err != nil {
return err
}
+ return this.loadKeepServers(list)
+}
+
+// LoadKeepServicesFromJSON gets list of available keep services from given JSON
+func (this *KeepClient) LoadKeepServicesFromJSON(services string) error {
+ var list svcList
+
+ // Load keep services from given json
+ dec := json.NewDecoder(strings.NewReader(services))
+ if err := dec.Decode(&list); err != nil {
+ return err
+ }
+
+ return this.loadKeepServers(list)
+}
+
+// loadKeepServers rebuilds the client's keep service lists (local and
+// gateway roots) from the given svcList.
+func (this *KeepClient) loadKeepServers(list svcList) error {
listed := make(map[string]bool)
localRoots := make(map[string]string)
gatewayRoots := make(map[string]string)
this.replicasPerService = 1
this.Using_proxy = false
- for _, service := range m.Items {
+ for _, service := range list.Items {
scheme := "http"
if service.SSL {
scheme = "https"
return port
-def run_keep(blob_signing_key=None, enforce_permissions=False):
- stop_keep()
+def run_keep(blob_signing_key=None, enforce_permissions=False, num_servers=2):
+ stop_keep(num_servers)
keep_args = {}
if not blob_signing_key:
host=os.environ['ARVADOS_API_HOST'],
token=os.environ['ARVADOS_API_TOKEN'],
insecure=True)
+
for d in api.keep_services().list().execute()['items']:
api.keep_services().delete(uuid=d['uuid']).execute()
for d in api.keep_disks().list().execute()['items']:
api.keep_disks().delete(uuid=d['uuid']).execute()
- for d in range(0, 2):
+ for d in range(0, num_servers):
port = _start_keep(d, keep_args)
svc = api.keep_services().create(body={'keep_service': {
'uuid': 'zzzzz-bi6l4-keepdisk{:07d}'.format(d),
if os.path.exists(os.path.join(TEST_TMPDIR, "keep.blob_signing_key")):
os.remove(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"))
-def stop_keep():
- _stop_keep(0)
- _stop_keep(1)
+def stop_keep(num_servers=2):
+ for n in range(0, num_servers):
+ _stop_keep(n)
def run_keep_proxy():
if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
parser = argparse.ArgumentParser()
parser.add_argument('action', type=str, help="one of {}".format(actions))
parser.add_argument('--auth', type=str, metavar='FIXTURE_NAME', help='Print authorization info for given api_client_authorizations fixture')
+ parser.add_argument('--num-keep-servers', metavar='int', type=int, default=2, help="Number of keep servers desired")
+ parser.add_argument('--keep-enforce-permissions', action="store_true", help="Enforce keep permissions")
+
args = parser.parse_args()
if args.action not in actions:
elif args.action == 'stop':
stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
elif args.action == 'start_keep':
- run_keep()
+ run_keep(enforce_permissions=args.keep_enforce_permissions, num_servers=args.num_keep_servers)
elif args.action == 'stop_keep':
stop_keep()
elif args.action == 'start_keep_proxy':
def _get_slurm_state(self):
return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', self._nodename])
- @ShutdownActorBase._retry((subprocess.CalledProcessError,))
+ # The following methods retry on OSError. This is intended to mitigate bug
+ # #6321 where fork() of node manager raises "OSError: [Errno 12] Cannot
+ # allocate memory" resulting in the untimely death of the shutdown actor
+ # and tends to result in node manager getting into a wedged state where it
+ # won't allocate new nodes or shut down gracefully. The underlying causes
+ # of the excessive memory usage that result in the "Cannot allocate memory"
+ # error are still being investigated.
+
+ @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
def cancel_shutdown(self):
if self._nodename:
if self._get_slurm_state() in self.SLURM_DRAIN_STATES:
pass
return super(ComputeNodeShutdownActor, self).cancel_shutdown()
+ @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
@ShutdownActorBase._stop_if_window_closed
- @ShutdownActorBase._retry((subprocess.CalledProcessError,))
def issue_slurm_drain(self):
self._set_node_state('DRAIN', 'Reason=Node Manager shutdown')
self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
self._later.await_slurm_drain()
+ @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
@ShutdownActorBase._stop_if_window_closed
- @ShutdownActorBase._retry((subprocess.CalledProcessError,))
def await_slurm_drain(self):
output = self._get_slurm_state()
if output in self.SLURM_END_STATES:
self.check_success_flag(False, 2)
self.check_slurm_got_args(proc_mock, 'sinfo', '--noheader', '-o', '%t', '-n', 'compute99')
+ def test_cancel_shutdown_retry(self, proc_mock):
+ proc_mock.side_effect = iter([OSError, 'drain\n', OSError, 'idle\n'])
+ self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
+ self.make_actor()
+ self.check_success_flag(False, 2)
+
+ def test_issue_slurm_drain_retry(self, proc_mock):
+ proc_mock.side_effect = iter([OSError, '', OSError, 'drng\n'])
+ self.check_success_after_reset(proc_mock)
def test_arvados_node_cleaned_after_shutdown(self, proc_mock):
proc_mock.return_value = 'drain\n'
--- /dev/null
+keep-exercise
--- /dev/null
+// Testing tool for Keep services.
+//
+// keep-exercise helps measure throughput and test reliability under
+// various usage patterns.
+//
+// By default, it reads and writes blocks containing 2^26 NUL
+// bytes. This generates network traffic without consuming much disk
+// space.
+//
+// For a more realistic test, enable -vary-request. Warning: this will
+// fill your storage volumes with random data if you leave it running,
+// which can cost you money or leave you with too little room for
+// useful data.
+//
+package main
+
+import (
+ "crypto/rand"
+ "encoding/binary"
+ "flag"
+ "io"
+ "io/ioutil"
+ "log"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
+
+// Command line config knobs
+var (
+ BlockSize = flag.Int("block-size", keepclient.BLOCKSIZE, "bytes per read/write op")
+ ReadThreads = flag.Int("rthreads", 1, "number of concurrent readers")
+ WriteThreads = flag.Int("wthreads", 1, "number of concurrent writers")
+ VaryRequest = flag.Bool("vary-request", false, "vary the data for each request: consumes disk space, exercises write behavior")
+ VaryThread = flag.Bool("vary-thread", false, "use -wthreads different data blocks")
+ Replicas = flag.Int("replicas", 1, "replication level for writing")
+ StatsInterval = flag.Duration("stats-interval", time.Second, "time interval between IO stats reports, or 0 to disable")
+)
+
+// main parses flags, builds a KeepClient, and starts the worker
+// goroutines: one stats aggregator, -wthreads buffer-maker/writer
+// pairs, and -rthreads readers. It then blocks forever; the workers
+// run until the process is killed.
+func main() {
+	flag.Parse()
+
+	arv, err := arvadosclient.MakeArvadosClient()
+	if err != nil {
+		log.Fatal(err)
+	}
+	kc, err := keepclient.MakeKeepClient(&arv)
+	if err != nil {
+		log.Fatal(err)
+	}
+	kc.Want_replicas = *Replicas
+	kc.Client.Timeout = 10 * time.Minute
+
+	// Writers pull data buffers from nextBuf; readers pull locators
+	// of already-written blocks from nextLocator.
+	nextBuf := make(chan []byte, *WriteThreads)
+	nextLocator := make(chan string, *ReadThreads+*WriteThreads)
+
+	go countBeans(nextLocator)
+	for i := 0; i < *WriteThreads; i++ {
+		go makeBufs(nextBuf, i)
+		go doWrites(kc, nextBuf, nextLocator)
+	}
+	for i := 0; i < *ReadThreads; i++ {
+		go doReads(kc, nextLocator)
+	}
+	// Receive on a fresh channel that nothing sends to: block forever.
+	<-make(chan struct{})
+}
+
+// Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
+var bytesInChan = make(chan uint64)
+var bytesOutChan = make(chan uint64)
+
+// Send struct{}{} to errorsChan when an error happens.
+var errorsChan = make(chan struct{})
+
+// countBeans is the single stats aggregator. It accumulates byte
+// counts and error signals received from the worker goroutines over
+// the package-level channels, and logs cumulative read/write
+// throughput at each -stats-interval tick. The nextLocator argument
+// is accepted but not read here. Never returns.
+func countBeans(nextLocator chan string) {
+	t0 := time.Now()
+	var tickChan <-chan time.Time
+	if *StatsInterval > 0 {
+		tickChan = time.NewTicker(*StatsInterval).C
+	}
+	// If -stats-interval is 0, tickChan stays nil, so its select case
+	// blocks forever and periodic reporting is effectively disabled.
+	var bytesIn uint64
+	var bytesOut uint64
+	var errors uint64
+	for {
+		select {
+		case <-tickChan:
+			elapsed := time.Since(t0)
+			log.Printf("%v elapsed: read %v bytes (%.1f MiB/s), wrote %v bytes (%.1f MiB/s), errors %d",
+				elapsed,
+				bytesIn, (float64(bytesIn) / elapsed.Seconds() / 1048576),
+				bytesOut, (float64(bytesOut) / elapsed.Seconds() / 1048576),
+				errors,
+			)
+		case i := <-bytesInChan:
+			bytesIn += i
+		case o := <-bytesOutChan:
+			bytesOut += o
+		case <-errorsChan:
+			errors++
+		}
+	}
+}
+
+// makeBufs feeds data buffers to one writer goroutine, forever. With
+// -vary-thread the buffer is tagged with threadID (varint at offset 0)
+// so each writer thread writes a distinct block; with -vary-request
+// the buffer is refilled with fresh random bytes before every send.
+//
+// NOTE(review): the same underlying slice is sent on every iteration,
+// so with -vary-request the buffer may be refilled while the writer is
+// still using the previous send — confirm PutB consumes/copies the
+// buffer before the next loop iteration.
+func makeBufs(nextBuf chan []byte, threadID int) {
+	buf := make([]byte, *BlockSize)
+	if *VaryThread {
+		binary.PutVarint(buf, int64(threadID))
+	}
+	for {
+		if *VaryRequest {
+			if _, err := io.ReadFull(rand.Reader, buf); err != nil {
+				log.Fatal(err)
+			}
+		}
+		nextBuf <- buf
+	}
+}
+
+// doWrites is one writer worker: it takes buffers from nextBuf, PutBs
+// each one to Keep, reports bytes written (or an error) to the stats
+// aggregator, and queues the resulting locator for the readers.
+func doWrites(kc *keepclient.KeepClient, nextBuf chan []byte, nextLocator chan string) {
+	for buf := range nextBuf {
+		locator, _, err := kc.PutB(buf)
+		if err != nil {
+			log.Print(err)
+			errorsChan <- struct{}{}
+			continue
+		}
+		bytesOutChan <- uint64(len(buf))
+		// Push the same locator repeatedly until the queue is nearly
+		// full (leaving *WriteThreads slots of headroom).
+		for cap(nextLocator) > len(nextLocator)+*WriteThreads {
+			// Give the readers something to do, unless
+			// they have lots queued up already.
+			nextLocator <- locator
+		}
+	}
+}
+
+// doReads is one reader worker: it takes locators from nextLocator,
+// Gets each block, discards the body, and reports bytes read (or an
+// error) to the stats aggregator. Short or failed reads are counted
+// as errors and contribute no bytes.
+func doReads(kc *keepclient.KeepClient, nextLocator chan string) {
+	for locator := range nextLocator {
+		rdr, size, url, err := kc.Get(locator)
+		if err != nil {
+			log.Print(err)
+			errorsChan <- struct{}{}
+			continue
+		}
+		n, err := io.Copy(ioutil.Discard, rdr)
+		rdr.Close()
+		if n != size || err != nil {
+			log.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
+			errorsChan <- struct{}{}
+			continue
+			// Note we don't count the bytes received in
+			// partial/corrupt responses: we are measuring
+			// throughput, not resource consumption.
+		}
+		bytesInChan <- uint64(n)
+	}
+}
--- /dev/null
+keep-rsync
--- /dev/null
+package main
+
+import (
+ "bufio"
+ "crypto/tls"
+ "errors"
+ "flag"
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "io/ioutil"
+ "log"
+ "net/http"
+ "os"
+ "regexp"
+ "strings"
+ "time"
+)
+
+// main parses command-line options, loads the src and dst API
+// configurations, builds a keepclient for each side, and copies every
+// block present in src but missing from dst. Any failure is fatal.
+func main() {
+	var srcConfigFile, dstConfigFile, srcKeepServicesJSON, dstKeepServicesJSON, prefix string
+	var replications int
+	var srcBlobSigningKey string
+
+	flag.StringVar(
+		&srcConfigFile,
+		"src",
+		"",
+		"Source configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf")
+
+	flag.StringVar(
+		&dstConfigFile,
+		"dst",
+		"",
+		"Destination configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf")
+
+	flag.StringVar(
+		&srcKeepServicesJSON,
+		"src-keep-services-json",
+		"",
+		"An optional list of available source keepservices. "+
+			"If not provided, this list is obtained from api server configured in src-config-file.")
+
+	flag.StringVar(
+		&dstKeepServicesJSON,
+		"dst-keep-services-json",
+		"",
+		"An optional list of available destination keepservices. "+
+			"If not provided, this list is obtained from api server configured in dst-config-file.")
+
+	flag.IntVar(
+		&replications,
+		"replications",
+		0,
+		"Number of replications to write to the destination. If replications not specified, "+
+			"default replication level configured on destination server will be used.")
+
+	flag.StringVar(
+		&prefix,
+		"prefix",
+		"",
+		"Index prefix")
+
+	flag.Parse()
+
+	// Only the src signing key is needed: blocks read from src must be
+	// re-signed with src's key; dst signs its own writes.
+	srcConfig, srcBlobSigningKey, err := loadConfig(srcConfigFile)
+	if err != nil {
+		log.Fatalf("Error loading src configuration from file: %s", err.Error())
+	}
+
+	dstConfig, _, err := loadConfig(dstConfigFile)
+	if err != nil {
+		log.Fatalf("Error loading dst configuration from file: %s", err.Error())
+	}
+
+	// setup src and dst keepclients
+	kcSrc, err := setupKeepClient(srcConfig, srcKeepServicesJSON, false, 0)
+	if err != nil {
+		log.Fatalf("Error configuring src keepclient: %s", err.Error())
+	}
+
+	kcDst, err := setupKeepClient(dstConfig, dstKeepServicesJSON, true, replications)
+	if err != nil {
+		log.Fatalf("Error configuring dst keepclient: %s", err.Error())
+	}
+
+	// Copy blocks not found in dst from src
+	err = performKeepRsync(kcSrc, kcDst, srcBlobSigningKey, prefix)
+	if err != nil {
+		log.Fatalf("Error while syncing data: %s", err.Error())
+	}
+}
+
+type apiConfig struct {
+ APIToken string
+ APIHost string
+ APIHostInsecure bool
+ ExternalClient bool
+}
+
+// Load src and dst config from given files.
+// loadConfig validates that a filename was given, then delegates to
+// readConfigFromFile. Returns the parsed apiConfig, the blob signing
+// key found in the file (may be empty), and any error.
+func loadConfig(configFile string) (config apiConfig, blobSigningKey string, err error) {
+	if configFile == "" {
+		return config, blobSigningKey, errors.New("config file not specified")
+	}
+
+	config, blobSigningKey, err = readConfigFromFile(configFile)
+	if err != nil {
+		return config, blobSigningKey, fmt.Errorf("Error reading config file: %v", err)
+	}
+
+	return
+}
+
+var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
+
+// Read config from file.
+// A bare name (no "/") is treated as shorthand for
+// $HOME/.config/arvados/<name>.conf. The file is parsed as KEY=VALUE
+// lines; recognized ARVADOS_* keys populate the returned apiConfig,
+// and ARVADOS_BLOB_SIGNING_KEY is returned separately. Unrecognized
+// keys are silently ignored.
+//
+// NOTE(review): a non-empty line without "=" makes kv have length 1,
+// so kv[1] below would panic with an index-out-of-range — confirm
+// config files are always well-formed or guard this.
+func readConfigFromFile(filename string) (config apiConfig, blobSigningKey string, err error) {
+	if !strings.Contains(filename, "/") {
+		filename = os.Getenv("HOME") + "/.config/arvados/" + filename + ".conf"
+	}
+
+	content, err := ioutil.ReadFile(filename)
+
+	if err != nil {
+		return config, "", err
+	}
+
+	lines := strings.Split(string(content), "\n")
+	for _, line := range lines {
+		if line == "" {
+			continue
+		}
+
+		kv := strings.SplitN(line, "=", 2)
+		key := strings.TrimSpace(kv[0])
+		value := strings.TrimSpace(kv[1])
+
+		switch key {
+		case "ARVADOS_API_TOKEN":
+			config.APIToken = value
+		case "ARVADOS_API_HOST":
+			config.APIHost = value
+		case "ARVADOS_API_HOST_INSECURE":
+			config.APIHostInsecure = matchTrue.MatchString(value)
+		case "ARVADOS_EXTERNAL_CLIENT":
+			config.ExternalClient = matchTrue.MatchString(value)
+		case "ARVADOS_BLOB_SIGNING_KEY":
+			blobSigningKey = value
+		}
+	}
+	return
+}
+
+// setup keepclient using the config provided.
+// If keepServicesJSON is non-empty, the keep service list is loaded
+// from that JSON instead of being discovered from the API server.
+// For the destination client (isDst), the desired replication level
+// is set from the replications argument, falling back to the server's
+// defaultCollectionReplication discovery value when replications is 0.
+func setupKeepClient(config apiConfig, keepServicesJSON string, isDst bool, replications int) (kc *keepclient.KeepClient, err error) {
+	arv := arvadosclient.ArvadosClient{
+		ApiToken:    config.APIToken,
+		ApiServer:   config.APIHost,
+		ApiInsecure: config.APIHostInsecure,
+		Client: &http.Client{Transport: &http.Transport{
+			TLSClientConfig: &tls.Config{InsecureSkipVerify: config.APIHostInsecure}}},
+		External: config.ExternalClient,
+	}
+
+	// if keepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
+	if keepServicesJSON == "" {
+		kc, err = keepclient.MakeKeepClient(&arv)
+		if err != nil {
+			return nil, err
+		}
+	} else {
+		kc = keepclient.New(&arv)
+		err = kc.LoadKeepServicesFromJSON(keepServicesJSON)
+		if err != nil {
+			return kc, err
+		}
+	}
+
+	if isDst {
+		// Get default replications value from destination, if it is not already provided
+		if replications == 0 {
+			value, err := arv.Discovery("defaultCollectionReplication")
+			if err == nil {
+				// Discovery values arrive as JSON numbers (float64).
+				replications = int(value.(float64))
+			} else {
+				return nil, err
+			}
+		}
+
+		kc.Want_replicas = replications
+	}
+
+	return kc, nil
+}
+
+// Get unique block locators from src and dst.
+// Copy any blocks missing in dst.
+// performKeepRsync is the top-level sync: it indexes both sides
+// (restricted to the given locator prefix), computes the set
+// difference, and copies each missing block from src to dst.
+// Returns the first error encountered, or nil on success.
+func performKeepRsync(kcSrc, kcDst *keepclient.KeepClient, blobSigningKey, prefix string) error {
+	// Get unique locators from src
+	srcIndex, err := getUniqueLocators(kcSrc, prefix)
+	if err != nil {
+		return err
+	}
+
+	// Get unique locators from dst
+	dstIndex, err := getUniqueLocators(kcDst, prefix)
+	if err != nil {
+		return err
+	}
+
+	// Get list of locators found in src, but missing in dst
+	toBeCopied := getMissingLocators(srcIndex, dstIndex)
+
+	// Copy each missing block to dst
+	log.Printf("Before keep-rsync, there are %d blocks in src and %d blocks in dst. Start copying %d blocks from src not found in dst.",
+		len(srcIndex), len(dstIndex), len(toBeCopied))
+
+	err = copyBlocksToDst(toBeCopied, kcSrc, kcDst, blobSigningKey)
+
+	return err
+}
+
+// Get list of unique locators from the specified cluster.
+// getUniqueLocators asks every local keep service for its index
+// (filtered by prefix) and returns the deduplicated set of locators
+// as map keys. Each index line's first space-separated field is the
+// locator. Returns the partial set plus the error if any GetIndex
+// call fails.
+func getUniqueLocators(kc *keepclient.KeepClient, prefix string) (map[string]bool, error) {
+	uniqueLocators := map[string]bool{}
+
+	// Get index and dedup
+	for uuid := range kc.LocalRoots() {
+		reader, err := kc.GetIndex(uuid, prefix)
+		if err != nil {
+			return uniqueLocators, err
+		}
+		scanner := bufio.NewScanner(reader)
+		for scanner.Scan() {
+			uniqueLocators[strings.Split(scanner.Text(), " ")[0]] = true
+		}
+	}
+
+	return uniqueLocators, nil
+}
+
+// Get list of locators that are in src but not in dst.
+// Plain set difference over the map keys; iteration order of the
+// result is unspecified (Go map order).
+func getMissingLocators(srcLocators, dstLocators map[string]bool) []string {
+	var missingLocators []string
+	for locator := range srcLocators {
+		if _, ok := dstLocators[locator]; !ok {
+			missingLocators = append(missingLocators, locator)
+		}
+	}
+	return missingLocators
+}
+
+// Copy blocks from src to dst; only those that are missing in dst are copied.
+// For each locator: sign it with the src blob signing key (valid for
+// one day) when a key is given, Get it from src, and stream it to dst
+// with PutHR. Progress (including an estimated time remaining after
+// the first block) is logged per block. Stops and returns on the
+// first Get or Put error.
+func copyBlocksToDst(toBeCopied []string, kcSrc, kcDst *keepclient.KeepClient, blobSigningKey string) error {
+	total := len(toBeCopied)
+
+	startedAt := time.Now()
+	for done, locator := range toBeCopied {
+		if done == 0 {
+			// No rate estimate yet for the first block.
+			log.Printf("Copying data block %d of %d (%.2f%% done): %v", done+1, total,
+				float64(done)/float64(total)*100, locator)
+		} else {
+			timePerBlock := time.Since(startedAt) / time.Duration(done)
+			log.Printf("Copying data block %d of %d (%.2f%% done, %v est. time remaining): %v", done+1, total,
+				float64(done)/float64(total)*100, timePerBlock*time.Duration(total-done), locator)
+		}
+
+		getLocator := locator
+		expiresAt := time.Now().AddDate(0, 0, 1)
+		if blobSigningKey != "" {
+			getLocator = keepclient.SignLocator(getLocator, kcSrc.Arvados.ApiToken, expiresAt, []byte(blobSigningKey))
+		}
+
+		// NOTE(review): "len" here shadows the builtin len() for the
+		// rest of this iteration.
+		reader, len, _, err := kcSrc.Get(getLocator)
+		if err != nil {
+			return fmt.Errorf("Error getting block: %v %v", locator, err)
+		}
+
+		// getLocator[:32] is the bare md5 hash portion of the locator.
+		_, _, err = kcDst.PutHR(getLocator[:32], reader, len)
+		if err != nil {
+			return fmt.Errorf("Error copying data block: %v %v", locator, err)
+		}
+	}
+
+	log.Printf("Successfully copied to destination %d blocks.", total)
+	return nil
+}
--- /dev/null
+package main
+
+import (
+ "crypto/md5"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "strings"
+ "testing"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+
+ . "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ TestingT(t)
+}
+
+// Gocheck boilerplate
+var _ = Suite(&ServerRequiredSuite{})
+var _ = Suite(&ServerNotRequiredSuite{})
+
+// Tests that require the Keep server running
+type ServerRequiredSuite struct{}
+type ServerNotRequiredSuite struct{}
+
+func (s *ServerRequiredSuite) SetUpSuite(c *C) {
+ // Start API server
+ arvadostest.StartAPI()
+}
+
+func (s *ServerRequiredSuite) TearDownSuite(c *C) {
+ arvadostest.StopAPI()
+ arvadostest.ResetEnv()
+}
+
+var kcSrc, kcDst *keepclient.KeepClient
+var srcKeepServicesJSON, dstKeepServicesJSON, blobSigningKey string
+
+func (s *ServerRequiredSuite) SetUpTest(c *C) {
+ // reset all variables between tests
+ blobSigningKey = ""
+ srcKeepServicesJSON = ""
+ dstKeepServicesJSON = ""
+ kcSrc = &keepclient.KeepClient{}
+ kcDst = &keepclient.KeepClient{}
+}
+
+func (s *ServerRequiredSuite) TearDownTest(c *C) {
+ arvadostest.StopKeepWithParams(3)
+}
+
+var testKeepServicesJSON = "{ \"kind\":\"arvados#keepServiceList\", \"etag\":\"\", \"self_link\":\"\", \"offset\":null, \"limit\":null, \"items\":[ { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012340\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012340\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25107, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false }, { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012341\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012341\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25108, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false } ], \"items_available\":2 }"
+
+// Testing keep-rsync needs two sets of keep services: src and dst.
+// The test setup hence creates 3 servers instead of the default 2,
+// and uses the first 2 as src and the 3rd as dst keep servers.
+// setupRsync builds src and dst apiConfigs from the environment,
+// starts the API server and 3 keep servers, and initializes the
+// package-level kcSrc/kcDst keepclients. The service roots are then
+// pruned so kcSrc only talks to servers ...00 and ...01 and kcDst
+// only to ...02, giving the two clients disjoint keep backends.
+// When enforcePermissions is true, a fixed blob signing key is used.
+func setupRsync(c *C, enforcePermissions bool, replications int) {
+	// srcConfig
+	var srcConfig apiConfig
+	srcConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
+	srcConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN")
+	srcConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
+
+	// dstConfig
+	var dstConfig apiConfig
+	dstConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
+	dstConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN")
+	dstConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
+
+	if enforcePermissions {
+		blobSigningKey = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
+	}
+
+	// Start Keep servers
+	arvadostest.StartAPI()
+	arvadostest.StartKeepWithParams(3, enforcePermissions)
+
+	// setup keepclients
+	var err error
+	kcSrc, err = setupKeepClient(srcConfig, srcKeepServicesJSON, false, 0)
+	c.Check(err, IsNil)
+
+	kcDst, err = setupKeepClient(dstConfig, dstKeepServicesJSON, true, replications)
+	c.Check(err, IsNil)
+
+	// Drop the 3rd server (uuid suffix "02") from src's root maps.
+	for uuid := range kcSrc.LocalRoots() {
+		if strings.HasSuffix(uuid, "02") {
+			delete(kcSrc.LocalRoots(), uuid)
+		}
+	}
+	for uuid := range kcSrc.GatewayRoots() {
+		if strings.HasSuffix(uuid, "02") {
+			delete(kcSrc.GatewayRoots(), uuid)
+		}
+	}
+	for uuid := range kcSrc.WritableLocalRoots() {
+		if strings.HasSuffix(uuid, "02") {
+			delete(kcSrc.WritableLocalRoots(), uuid)
+		}
+	}
+
+	// Drop the first two servers ("00", "01") from dst's root maps.
+	for uuid := range kcDst.LocalRoots() {
+		if strings.HasSuffix(uuid, "00") || strings.HasSuffix(uuid, "01") {
+			delete(kcDst.LocalRoots(), uuid)
+		}
+	}
+	for uuid := range kcDst.GatewayRoots() {
+		if strings.HasSuffix(uuid, "00") || strings.HasSuffix(uuid, "01") {
+			delete(kcDst.GatewayRoots(), uuid)
+		}
+	}
+	for uuid := range kcDst.WritableLocalRoots() {
+		if strings.HasSuffix(uuid, "00") || strings.HasSuffix(uuid, "01") {
+			delete(kcDst.WritableLocalRoots(), uuid)
+		}
+	}
+
+	if replications == 0 {
+		// Must have got default replications value of 2 from dst discovery document
+		c.Assert(kcDst.Want_replicas, Equals, 2)
+	} else {
+		// Since replications value is provided, it is used
+		c.Assert(kcDst.Want_replicas, Equals, replications)
+	}
+}
+
+func (s *ServerRequiredSuite) TestRsyncPutInOne_GetFromOtherShouldFail(c *C) {
+ setupRsync(c, false, 1)
+
+ // Put a block in src and verify that it is not found in dst
+ testNoCrosstalk(c, "test-data-1", kcSrc, kcDst)
+
+ // Put a block in dst and verify that it is not found in src
+ testNoCrosstalk(c, "test-data-2", kcDst, kcSrc)
+}
+
+func (s *ServerRequiredSuite) TestRsyncWithBlobSigning_PutInOne_GetFromOtherShouldFail(c *C) {
+ setupRsync(c, true, 1)
+
+ // Put a block in src and verify that it is not found in dst
+ testNoCrosstalk(c, "test-data-1", kcSrc, kcDst)
+
+ // Put a block in dst and verify that it is not found in src
+ testNoCrosstalk(c, "test-data-2", kcDst, kcSrc)
+}
+
+// Do a Put in the first and Get from the second,
+// which should raise block not found error.
+// testNoCrosstalk writes testData via kc1, then verifies that kc2
+// (which has disjoint keep roots) cannot Get it: the Get must fail
+// with exactly "Block not found".
+func testNoCrosstalk(c *C, testData string, kc1, kc2 *keepclient.KeepClient) {
+	// Put a block using kc1
+	locator, _, err := kc1.PutB([]byte(testData))
+	c.Assert(err, Equals, nil)
+
+	// Strip hints/signature from the locator, then re-sign it for kc2
+	// so a permission error cannot mask the expected "not found".
+	locator = strings.Split(locator, "+")[0]
+	_, _, _, err = kc2.Get(keepclient.SignLocator(locator, kc2.Arvados.ApiToken, time.Now().AddDate(0, 0, 1), []byte(blobSigningKey)))
+	c.Assert(err, NotNil)
+	c.Check(err.Error(), Equals, "Block not found")
+}
+
+// Test keep-rsync initialization, with srcKeepServicesJSON
+func (s *ServerRequiredSuite) TestRsyncInitializeWithKeepServicesJSON(c *C) {
+ srcKeepServicesJSON = testKeepServicesJSON
+
+ setupRsync(c, false, 1)
+
+ localRoots := kcSrc.LocalRoots()
+ c.Check(localRoots, NotNil)
+
+ foundIt := false
+ for k := range localRoots {
+ if k == "zzzzz-bi6l4-123456789012340" {
+ foundIt = true
+ }
+ }
+ c.Check(foundIt, Equals, true)
+
+ foundIt = false
+ for k := range localRoots {
+ if k == "zzzzz-bi6l4-123456789012341" {
+ foundIt = true
+ }
+ }
+ c.Check(foundIt, Equals, true)
+}
+
+// Test keep-rsync initialization with default replications count
+func (s *ServerRequiredSuite) TestInitializeRsyncDefaultReplicationsCount(c *C) {
+ setupRsync(c, false, 0)
+}
+
+// Test keep-rsync initialization with replications count argument
+func (s *ServerRequiredSuite) TestInitializeRsyncReplicationsCount(c *C) {
+ setupRsync(c, false, 3)
+}
+
+// Put some blocks in Src and some more in Dst
+// And copy missing blocks from Src to Dst
+func (s *ServerRequiredSuite) TestKeepRsync(c *C) {
+ testKeepRsync(c, false, "")
+}
+
+// Put some blocks in Src and some more in Dst with blob signing enabled.
+// And copy missing blocks from Src to Dst
+func (s *ServerRequiredSuite) TestKeepRsync_WithBlobSigning(c *C) {
+ testKeepRsync(c, true, "")
+}
+
+// Put some blocks in Src and some more in Dst
+// Use prefix while doing rsync
+// And copy missing blocks from Src to Dst
+func (s *ServerRequiredSuite) TestKeepRsync_WithPrefix(c *C) {
+ data := []byte("test-data-4")
+ hash := fmt.Sprintf("%x", md5.Sum(data))
+
+ testKeepRsync(c, false, hash[0:3])
+ c.Check(len(dstIndex) > len(dstLocators), Equals, true)
+}
+
+// Put some blocks in Src and some more in Dst
+// Use prefix not in src while doing rsync
+// And copy missing blocks from Src to Dst
+func (s *ServerRequiredSuite) TestKeepRsync_WithNoSuchPrefixInSrc(c *C) {
+ testKeepRsync(c, false, "999")
+ c.Check(len(dstIndex), Equals, len(dstLocators))
+}
+
+// Put 5 blocks in src. Put 2 of those blocks in dst
+// Hence there are 3 additional blocks in src
+// Also, put 2 extra blocks in dst; they are hence only in dst
+// Run rsync and verify that those 7 blocks are now available in dst
+// testKeepRsync runs a full sync scenario: seed src with 5 blocks and
+// dst with 2 of those plus 2 extras (setupTestData), run
+// performKeepRsync with the given prefix, then verify dst's index
+// contains everything expected for that prefix.
+func testKeepRsync(c *C, enforcePermissions bool, prefix string) {
+	setupRsync(c, enforcePermissions, 1)
+
+	// setupTestData
+	setupTestData(c, prefix)
+
+	err := performKeepRsync(kcSrc, kcDst, blobSigningKey, prefix)
+	c.Check(err, IsNil)
+
+	// Now GetIndex from dst and verify that all 5 from src and the 2 extra blocks are found
+	dstIndex, err = getUniqueLocators(kcDst, "")
+	c.Check(err, IsNil)
+
+	// Every src block matching the prefix must now be in dst.
+	for _, locator := range srcLocatorsMatchingPrefix {
+		_, ok := dstIndex[locator]
+		c.Assert(ok, Equals, true)
+	}
+
+	// Blocks that were only in dst must still be there.
+	for _, locator := range extraDstLocators {
+		_, ok := dstIndex[locator]
+		c.Assert(ok, Equals, true)
+	}
+
+	if prefix == "" {
+		// all blocks from src and the two extra blocks
+		c.Assert(len(dstIndex), Equals, len(srcLocators)+len(extraDstLocators))
+	} else {
+		// 1 matching prefix and copied over, 2 that were initially copied into dst along with src, and the 2 extra blocks
+		c.Assert(len(dstIndex), Equals, len(srcLocatorsMatchingPrefix)+len(extraDstLocators)+2)
+	}
+}
+
+// Setup test data in src and dst.
+var srcLocators, srcLocatorsMatchingPrefix, dstLocators, extraDstLocators []string
+var dstIndex map[string]bool
+
+// setupTestData (re)populates the package-level locator slices:
+// 5 blocks in src (recording which match indexPrefix), the first 2 of
+// those also in dst, plus 2 dst-only "extra" blocks. Locators are
+// stored with their "+A..." signature suffix stripped.
+func setupTestData(c *C, indexPrefix string) {
+	srcLocators = []string{}
+	srcLocatorsMatchingPrefix = []string{}
+	dstLocators = []string{}
+	extraDstLocators = []string{}
+	dstIndex = make(map[string]bool)
+
+	// Put a few blocks in src using kcSrc
+	for i := 0; i < 5; i++ {
+		hash, _, err := kcSrc.PutB([]byte(fmt.Sprintf("test-data-%d", i)))
+		c.Check(err, IsNil)
+
+		srcLocators = append(srcLocators, strings.Split(hash, "+A")[0])
+		if strings.HasPrefix(hash, indexPrefix) {
+			srcLocatorsMatchingPrefix = append(srcLocatorsMatchingPrefix, strings.Split(hash, "+A")[0])
+		}
+	}
+
+	// Put first two of those src blocks in dst using kcDst
+	for i := 0; i < 2; i++ {
+		hash, _, err := kcDst.PutB([]byte(fmt.Sprintf("test-data-%d", i)))
+		c.Check(err, IsNil)
+		dstLocators = append(dstLocators, strings.Split(hash, "+A")[0])
+	}
+
+	// Put two more blocks in dst; they are not in src at all
+	for i := 0; i < 2; i++ {
+		hash, _, err := kcDst.PutB([]byte(fmt.Sprintf("other-data-%d", i)))
+		c.Check(err, IsNil)
+		dstLocators = append(dstLocators, strings.Split(hash, "+A")[0])
+		extraDstLocators = append(extraDstLocators, strings.Split(hash, "+A")[0])
+	}
+}
+
+// Setup rsync using srcKeepServicesJSON with fake keepservers.
+// Expect error during performKeepRsync due to unreachable src keepservers.
+func (s *ServerRequiredSuite) TestErrorDuringRsync_FakeSrcKeepservers(c *C) {
+ srcKeepServicesJSON = testKeepServicesJSON
+
+ setupRsync(c, false, 1)
+
+ err := performKeepRsync(kcSrc, kcDst, "", "")
+ c.Check(strings.HasSuffix(err.Error(), "no such host"), Equals, true)
+}
+
+// Setup rsync using dstKeepServicesJSON with fake keepservers.
+// Expect error during performKeepRsync due to unreachable dst keepservers.
+func (s *ServerRequiredSuite) TestErrorDuringRsync_FakeDstKeepservers(c *C) {
+ dstKeepServicesJSON = testKeepServicesJSON
+
+ setupRsync(c, false, 1)
+
+ err := performKeepRsync(kcSrc, kcDst, "", "")
+ c.Check(strings.HasSuffix(err.Error(), "no such host"), Equals, true)
+}
+
+// Test rsync with signature error during Get from src.
+func (s *ServerRequiredSuite) TestErrorDuringRsync_ErrorGettingBlockFromSrc(c *C) {
+ setupRsync(c, true, 1)
+
+ // put some blocks in src and dst
+ setupTestData(c, "")
+
+ // Change blob signing key to a fake key, so that Get from src fails
+ blobSigningKey = "thisisfakeblobsigningkey"
+
+ err := performKeepRsync(kcSrc, kcDst, blobSigningKey, "")
+ c.Check(strings.HasSuffix(err.Error(), "Block not found"), Equals, true)
+}
+
+// Test rsync with error during Put to dst.
+func (s *ServerRequiredSuite) TestErrorDuringRsync_ErrorPuttingBlockInDst(c *C) {
+ setupRsync(c, false, 1)
+
+ // put some blocks in src and dst
+ setupTestData(c, "")
+
+ // Increase Want_replicas on dst to result in insufficient replicas error during Put
+ kcDst.Want_replicas = 2
+
+ err := performKeepRsync(kcSrc, kcDst, blobSigningKey, "")
+ c.Check(strings.HasSuffix(err.Error(), "Could not write sufficient replicas"), Equals, true)
+}
+
+// Test loadConfig func
+func (s *ServerNotRequiredSuite) TestLoadConfig(c *C) {
+ // Setup a src config file
+ srcFile := setupConfigFile(c, "src-config")
+ defer os.Remove(srcFile.Name())
+ srcConfigFile := srcFile.Name()
+
+ // Setup a dst config file
+ dstFile := setupConfigFile(c, "dst-config")
+ defer os.Remove(dstFile.Name())
+ dstConfigFile := dstFile.Name()
+
+ // load configuration from those files
+ srcConfig, srcBlobSigningKey, err := loadConfig(srcConfigFile)
+ c.Check(err, IsNil)
+
+ c.Assert(srcConfig.APIHost, Equals, "testhost")
+ c.Assert(srcConfig.APIToken, Equals, "testtoken")
+ c.Assert(srcConfig.APIHostInsecure, Equals, true)
+ c.Assert(srcConfig.ExternalClient, Equals, false)
+
+ dstConfig, _, err := loadConfig(dstConfigFile)
+ c.Check(err, IsNil)
+
+ c.Assert(dstConfig.APIHost, Equals, "testhost")
+ c.Assert(dstConfig.APIToken, Equals, "testtoken")
+ c.Assert(dstConfig.APIHostInsecure, Equals, true)
+ c.Assert(dstConfig.ExternalClient, Equals, false)
+
+ c.Assert(srcBlobSigningKey, Equals, "abcdefg")
+}
+
+// Test loadConfig func without setting up the config files
+func (s *ServerNotRequiredSuite) TestLoadConfig_MissingSrcConfig(c *C) {
+ _, _, err := loadConfig("")
+ c.Assert(err.Error(), Equals, "config file not specified")
+}
+
+// Test loadConfig func - error reading config
+func (s *ServerNotRequiredSuite) TestLoadConfig_ErrorLoadingSrcConfig(c *C) {
+ _, _, err := loadConfig("no-such-config-file")
+ c.Assert(strings.HasSuffix(err.Error(), "no such file or directory"), Equals, true)
+}
+
+// setupConfigFile writes a temporary arvados config file with fixed
+// test values (host, token, insecure flag, blob signing key) and
+// returns the open *os.File. Callers are responsible for removing it.
+func setupConfigFile(c *C, name string) *os.File {
+	// Setup a config file
+	file, err := ioutil.TempFile(os.TempDir(), name)
+	c.Check(err, IsNil)
+
+	fileContent := "ARVADOS_API_HOST=testhost\n"
+	fileContent += "ARVADOS_API_TOKEN=testtoken\n"
+	fileContent += "ARVADOS_API_HOST_INSECURE=true\n"
+	fileContent += "ARVADOS_BLOB_SIGNING_KEY=abcdefg"
+
+	_, err = file.Write([]byte(fileContent))
+	c.Check(err, IsNil)
+
+	return file
+}