Merge branch '6321-slurm-oserror' closes #6321
authorPeter Amstutz <peter.amstutz@curoverse.com>
Fri, 16 Oct 2015 15:42:01 +0000 (11:42 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Fri, 16 Oct 2015 15:42:01 +0000 (11:42 -0400)
24 files changed:
doc/install/install-manual-prerequisites.html.textile.liquid
doc/sdk/cli/subcommands.html.textile.liquid
sdk/cli/bin/arv
sdk/cli/test/test_arv-get.rb
sdk/cli/test/test_arv-keep-get.rb [new file with mode: 0644]
sdk/cli/test/test_arv-keep-put.rb [moved from sdk/cli/test/test_arv-put.rb with 99% similarity]
sdk/go/arvadostest/run_servers.go
sdk/go/keepclient/keepclient.go
sdk/go/keepclient/keepclient_test.go
sdk/go/keepclient/support.go
sdk/go/streamer/streamer_test.go
sdk/go/streamer/transfer.go
sdk/python/tests/run_test_server.py
services/api/Gemfile
services/api/Gemfile.lock
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/volume_generic_test.go
services/keepstore/volume_unix.go
tools/keep-exercise/.gitignore [new file with mode: 0644]
tools/keep-exercise/keep-exercise.go [new file with mode: 0644]
tools/keep-rsync/.gitignore [new file with mode: 0644]
tools/keep-rsync/keep-rsync.go [new file with mode: 0644]
tools/keep-rsync/keep-rsync_test.go [new file with mode: 0644]

index 52a51a191aafb90188d1906583b419f0135ca49b..a26370d21bf5f4bb877a7ece60957ff9b1eff6d1 100644 (file)
@@ -42,7 +42,7 @@ baseurl=http://rpm.arvados.org/CentOS/$releasever/os/$basearch/
 
 h3. Debian and Ubuntu
 
-Packages are available for Debian 7 ("wheezy"), Ubuntu 12.04 ("precise"), and Ubuntu 14.04 ("trusty").
+Packages are available for Debian 7 ("wheezy"), Debian 8 ("jessie"), Ubuntu 12.04 ("precise"), and Ubuntu 14.04 ("trusty").
 
 First, register the Curoverse signing key in apt's database:
 
@@ -53,6 +53,7 @@ Configure apt to retrieve packages from the Arvados package repository. This com
 table(table table-bordered table-condensed).
 |OS version|Command|
 |Debian 7 ("wheezy")|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ wheezy main" &#x7c; sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
+|Debian 8 ("jessie")|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ jessie main" &#x7c; sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
 |Ubuntu 12.04 ("precise")|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ precise main" &#x7c; sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
 |Ubuntu 14.04 ("trusty")|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ trusty main" &#x7c; sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
 
index aa7af94d5f9d4ac8527e7d3e238e15d19f177905..ca494fe17a570e0868af2157fcc92dda6b480cc2 100644 (file)
@@ -21,6 +21,20 @@ Options:
 </pre>
 </notextile>
 
+h3(#arv-get). arv get
+
+@arv get@ can be used to get a textual representation of Arvados objects from the command line. The output can be limited to a subset of the object's fields. This command can be used with only the knowledge of an object's UUID.
+
+<notextile>
+<pre>
+$ <code class="userinput">arv get --help</code>
+Usage: arv [--format json|yaml] get [uuid] [fields...]
+
+Fetch the specified Arvados object, select the specified fields,
+and print a text representation.
+</pre>
+</notextile>
+
 h3(#arv-edit). arv edit
 
 @arv edit@ can be used to edit Arvados objects from the command line. Arv edit opens up the editor of your choice (set the EDITOR environment variable) with the json or yaml description of the object. Saving the file will update the Arvados object on the API server, if it passes validation.
index 2bd7f4ef465d7a3427425716c11f9e3574d9f226..185a5b0673f1dc1c5afc5ed6530386d8255daaf4 100755 (executable)
@@ -5,6 +5,7 @@
 # Ward Vandewege <ward@curoverse.com>
 
 require 'fileutils'
+require 'shellwords'
 
 if RUBY_VERSION < '1.9.3' then
   abort <<-EOS
@@ -85,7 +86,15 @@ def init_config
 end
 
 
-subcommands = %w(copy create edit keep pipeline run tag ws)
+subcommands = %w(copy create edit get keep pipeline run tag ws)
+
+def exec_bin bin, opts
+  bin_path = `which #{bin.shellescape}`.strip
+  if bin_path.empty?
+    raise "#{bin}: command not found"
+  end
+  exec bin_path, *opts
+end
 
 def check_subcommands client, arvados, subcommand, global_opts, remaining_opts
   case subcommand
@@ -93,15 +102,17 @@ def check_subcommands client, arvados, subcommand, global_opts, remaining_opts
     arv_create client, arvados, global_opts, remaining_opts
   when 'edit'
     arv_edit client, arvados, global_opts, remaining_opts
+  when 'get'
+    arv_get client, arvados, global_opts, remaining_opts
   when 'copy', 'tag', 'ws', 'run'
-    exec `which arv-#{subcommand}`.strip, *remaining_opts
+    exec_bin "arv-#{subcommand}", remaining_opts
   when 'keep'
     @sub = remaining_opts.shift
     if ['get', 'put', 'ls', 'normalize'].index @sub then
       # Native Arvados
-      exec `which arv-#{@sub}`.strip, *remaining_opts
+      exec_bin "arv-#{@sub}", remaining_opts
     elsif @sub == 'docker'
-      exec `which arv-keepdocker`.strip, *remaining_opts
+      exec_bin "arv-keepdocker", remaining_opts
     else
       puts "Usage: arv keep [method] [--parameters]\n"
       puts "Use 'arv keep [method] --help' to get more information about specific methods.\n\n"
@@ -111,7 +122,7 @@ def check_subcommands client, arvados, subcommand, global_opts, remaining_opts
   when 'pipeline'
     sub = remaining_opts.shift
     if sub == 'run'
-      exec `which arv-run-pipeline-instance`.strip, *remaining_opts
+      exec_bin "arv-run-pipeline-instance", remaining_opts
     else
       puts "Usage: arv pipeline [method] [--parameters]\n"
       puts "Use 'arv pipeline [method] --help' to get more information about specific methods.\n\n"
@@ -147,14 +158,7 @@ end
 
 def edit_and_commit_object initial_obj, tmp_stem, global_opts, &block
 
-  content = case global_opts[:format]
-            when 'json'
-              Oj.dump(initial_obj, :indent => 1)
-            when 'yaml'
-              initial_obj.to_yaml
-            else
-              abort "Unrecognized format #{global_opts[:format]}"
-            end
+  content = get_obj_content initial_obj, global_opts
 
   tmp_file = Tempfile.new([tmp_stem, ".#{global_opts[:format]}"])
   tmp_file.write(content)
@@ -179,6 +183,8 @@ def edit_and_commit_object initial_obj, tmp_stem, global_opts, &block
                    Oj.load(newcontent)
                  when 'yaml'
                    YAML.load(newcontent)
+                 else
+                   abort "Unrecognized format #{global_opts[:format]}"
                  end
 
         yield newobj
@@ -243,20 +249,7 @@ def check_response result
   results
 end
 
-def arv_edit client, arvados, global_opts, remaining_opts
-  uuid = remaining_opts.shift
-  if uuid.nil? or uuid == "-h" or uuid == "--help"
-    puts head_banner
-    puts "Usage: arv edit [uuid] [fields...]\n\n"
-    puts "Fetch the specified Arvados object, select the specified fields, \n"
-    puts "open an interactive text editor on a text representation (json or\n"
-    puts "yaml, use --format) and then update the object.  Will use 'nano'\n"
-    puts "by default, customize with the EDITOR or VISUAL environment variable.\n"
-    exit 255
-  end
-
-  # determine controller
-
+def lookup_uuid_rsc arvados, uuid
   m = /([a-z0-9]{5})-([a-z0-9]{5})-([a-z0-9]{15})/.match uuid
   if !m
     if /^[a-f0-9]{32}/.match uuid
@@ -279,6 +272,11 @@ def arv_edit client, arvados, global_opts, remaining_opts
     abort "Could not determine resource type #{m[2]}"
   end
 
+  return rsc
+end
+
+def fetch_rsc_obj client, arvados, rsc, uuid, remaining_opts
+
   begin
     result = client.execute(:api_method => eval('arvados.' + rsc + '.get'),
                             :parameters => {"uuid" => uuid},
@@ -286,15 +284,45 @@ def arv_edit client, arvados, global_opts, remaining_opts
                             :headers => {
                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
                             })
-    oldobj = check_response result
+    obj = check_response result
   rescue => e
     abort "Server error: #{e}"
   end
 
   if remaining_opts.length > 0
-    oldobj.select! { |k, v| remaining_opts.include? k }
+    obj.select! { |k, v| remaining_opts.include? k }
+  end
+
+  return obj
+end
+
+def get_obj_content obj, global_opts
+  content = case global_opts[:format]
+            when 'json'
+              Oj.dump(obj, :indent => 1)
+            when 'yaml'
+              obj.to_yaml
+            else
+              abort "Unrecognized format #{global_opts[:format]}"
+            end
+  return content
+end
+
+def arv_edit client, arvados, global_opts, remaining_opts
+  uuid = remaining_opts.shift
+  if uuid.nil? or uuid == "-h" or uuid == "--help"
+    puts head_banner
+    puts "Usage: arv edit [uuid] [fields...]\n\n"
+    puts "Fetch the specified Arvados object, select the specified fields, \n"
+    puts "open an interactive text editor on a text representation (json or\n"
+    puts "yaml, use --format) and then update the object.  Will use 'nano'\n"
+    puts "by default, customize with the EDITOR or VISUAL environment variable.\n"
+    exit 255
   end
 
+  rsc = lookup_uuid_rsc arvados, uuid
+  oldobj = fetch_rsc_obj client, arvados, rsc, uuid, remaining_opts
+
   edit_and_commit_object oldobj, uuid, global_opts do |newobj|
     newobj.select! {|k| newobj[k] != oldobj[k]}
     if !newobj.empty?
@@ -315,6 +343,24 @@ def arv_edit client, arvados, global_opts, remaining_opts
   exit 0
 end
 
+def arv_get client, arvados, global_opts, remaining_opts
+  uuid = remaining_opts.shift
+  if uuid.nil? or uuid == "-h" or uuid == "--help"
+    puts head_banner
+    puts "Usage: arv [--format json|yaml] get [uuid] [fields...]\n\n"
+    puts "Fetch the specified Arvados object, select the specified fields,\n"
+    puts "and print a text representation.\n"
+    exit 255
+  end
+
+  rsc = lookup_uuid_rsc arvados, uuid
+  obj = fetch_rsc_obj client, arvados, rsc, uuid, remaining_opts
+  content = get_obj_content obj, global_opts
+
+  puts content
+  exit 0
+end
+
 def arv_create client, arvados, global_opts, remaining_opts
   types = resource_types(arvados.discovery_document)
   create_opts = Trollop::options do
index 5e58014cbfa10d3b9b67a8b7cddca8b8676f646c..2e2bba562f1e7dedc09fd3f4491da424cb7a5850 100644 (file)
 require 'minitest/autorun'
-require 'digest/md5'
+require 'json'
+require 'yaml'
 
+# Black box tests for 'arv get' command.
 class TestArvGet < Minitest::Test
-  def setup
-    begin
-      Dir.mkdir './tmp'
-    rescue Errno::EEXIST
-    end
-    @@foo_manifest_locator ||= `echo -n foo | ./bin/arv-put --filename foo --no-progress -`.strip
-    @@baz_locator ||= `echo -n baz | ./bin/arv-put --as-raw --no-progress -`.strip
-    @@multilevel_manifest_locator ||= `echo ./foo/bar #{@@baz_locator} 0:3:baz | ./bin/arv-put --as-raw --no-progress -`.strip
-  end
+  # UUID for an Arvados object that does not exist
+  NON_EXISTENT_OBJECT_UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+  # Name of field of Arvados object that can store any (textual) value
+  STORED_VALUE_FIELD_NAME = "name"
+  # Name of UUID field of Arvados object
+  UUID_FIELD_NAME = "uuid"
+  # Name of an invalid field of Arvados object
+  INVALID_FIELD_NAME = "invalid"
 
-  def test_no_args
+  # Tests that a valid Arvados object can be retrieved in a supported format
+  # using: `arv get [uuid]`. Given all other `arv foo` commands return JSON
+  # when no format is specified, JSON should be expected in this case.
+  def test_get_valid_object_no_format_specified
+    stored_value = __method__.to_s
+    uuid = create_arv_object_with_value(stored_value)
     out, err = capture_subprocess_io do
-      assert_arv_get false
+      assert(arv_get_default(uuid))
     end
-    assert_equal '', out
-    assert_match /^usage:/, err
+    assert_empty(err, "Error text not expected: '#{err}'")
+    arv_object = parse_json_arv_object(out)
+    assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value))
   end
 
-  def test_help
+  # Tests that a valid Arvados object can be retrieved in JSON format using:
+  # `arv get [uuid] --format json`.
+  def test_get_valid_object_json_format_specified
+    stored_value = __method__.to_s
+    uuid = create_arv_object_with_value(stored_value)
     out, err = capture_subprocess_io do
-      assert_arv_get '-h'
+      assert(arv_get_json(uuid))
     end
-    $stderr.write err
-    assert_equal '', err
-    assert_match /^usage:/, out
+    assert_empty(err, "Error text not expected: '#{err}'")
+    arv_object = parse_json_arv_object(out)
+    assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value))
   end
 
-  def test_file_to_dev_stdout
-    test_file_to_stdout('/dev/stdout')
-  end
-
-  def test_file_to_stdout(specify_stdout_as='-')
+  # Tests that a valid Arvados object can be retrieved in YAML format using:
+  # `arv get [uuid] --format yaml`.
+  def test_get_valid_object_yaml_format_specified
+    stored_value = __method__.to_s
+    uuid = create_arv_object_with_value(stored_value)
     out, err = capture_subprocess_io do
-      assert_arv_get @@foo_manifest_locator + '/foo', specify_stdout_as
+      assert(arv_get_yaml(uuid))
     end
-    assert_equal '', err
-    assert_equal 'foo', out
+    assert_empty(err, "Error text not expected: '#{err}'")
+    arv_object = parse_yaml_arv_object(out)
+    assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value))
   end
 
-  def test_file_to_file
-    remove_tmp_foo
+  # Tests that a subset of all fields of a valid Arvados object can be retrieved
+  # using: `arv get [uuid] [fields...]`.
+  def test_get_valid_object_with_valid_fields
+    stored_value = __method__.to_s
+    uuid = create_arv_object_with_value(stored_value)
     out, err = capture_subprocess_io do
-      assert_arv_get @@foo_manifest_locator + '/foo', 'tmp/foo'
+      assert(arv_get_json(uuid, STORED_VALUE_FIELD_NAME, UUID_FIELD_NAME))
     end
-    assert_equal '', err
-    assert_equal '', out
-    assert_equal 'foo', IO.read('tmp/foo')
+    assert_empty(err, "Error text not expected: '#{err}'")
+    arv_object = parse_json_arv_object(out)
+    assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value))
+    assert(has_field_with_value(arv_object, UUID_FIELD_NAME, uuid))
   end
 
-  def test_file_to_file_no_overwrite_file
-    File.open './tmp/foo', 'wb' do |f|
-      f.write 'baz'
-    end
+  # Tests that the valid field is retrieved when both a valid and invalid field
+  # are requested from a valid Arvados object, using:
+  # `arv get [uuid] [fields...]`.
+  def test_get_valid_object_with_both_valid_and_invalid_fields
+    stored_value = __method__.to_s
+    uuid = create_arv_object_with_value(stored_value)
     out, err = capture_subprocess_io do
-      assert_arv_get false, @@foo_manifest_locator + '/foo', 'tmp/foo'
+      assert(arv_get_json(uuid, STORED_VALUE_FIELD_NAME, INVALID_FIELD_NAME))
     end
-    assert_match /Local file tmp\/foo already exists/, err
-    assert_equal '', out
-    assert_equal 'baz', IO.read('tmp/foo')
+    assert_empty(err, "Error text not expected: '#{err}'")
+    arv_object = parse_json_arv_object(out)
+    assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value))
+    refute(has_field_with_value(arv_object, INVALID_FIELD_NAME, stored_value))
   end
 
-  def test_file_to_file_no_overwrite_file_in_dir
-    File.open './tmp/foo', 'wb' do |f|
-      f.write 'baz'
-    end
+  # Tests that no fields are retreived when no valid fields are requested from
+  # a valid Arvados object, using: `arv get [uuid] [fields...]`.
+  def test_get_valid_object_with_no_valid_fields
+    stored_value = __method__.to_s
+    uuid = create_arv_object_with_value(stored_value)
     out, err = capture_subprocess_io do
-      assert_arv_get false, @@foo_manifest_locator + '/', 'tmp/'
+      assert(arv_get_json(uuid, INVALID_FIELD_NAME))
     end
-    assert_match /Local file tmp\/foo already exists/, err
-    assert_equal '', out
-    assert_equal 'baz', IO.read('tmp/foo')
+    assert_empty(err, "Error text not expected: '#{err}'")
+    arv_object = parse_json_arv_object(out)
+    assert_equal(0, arv_object.length)
   end
 
-  def test_file_to_file_force_overwrite
-    File.open './tmp/foo', 'wb' do |f|
-      f.write 'baz'
-    end
-    assert_equal 'baz', IO.read('tmp/foo')
+  # Tests that an invalid (non-existent) Arvados object is not retrieved using:
+  # using: `arv get [non-existent-uuid]`.
+  def test_get_invalid_object
     out, err = capture_subprocess_io do
-      assert_arv_get '-f', @@foo_manifest_locator + '/', 'tmp/'
+      refute(arv_get_json(NON_EXISTENT_OBJECT_UUID))
     end
-    assert_match '', err
-    assert_equal '', out
-    assert_equal 'foo', IO.read('tmp/foo')
+    refute_empty(err, "Expected error feedback on request for invalid object")
+    assert_empty(out)
   end
 
-  def test_file_to_file_skip_existing
-    File.open './tmp/foo', 'wb' do |f|
-      f.write 'baz'
-    end
-    assert_equal 'baz', IO.read('tmp/foo')
+  # Tests that help text exists using: `arv get --help`.
+  def test_help_exists
     out, err = capture_subprocess_io do
-      assert_arv_get '--skip-existing', @@foo_manifest_locator + '/', 'tmp/'
+#      assert(arv_get_default("--help"), "Expected exit code 0: #{$?}")
+       #XXX: Exit code given is 255. It probably should be 0, which seems to be
+       #     standard elsewhere. However, 255 is in line with other `arv`
+       #     commands (e.g. see `arv edit`) so ignoring the problem here.
+       arv_get_default("--help")
     end
-    assert_match '', err
-    assert_equal '', out
-    assert_equal 'baz', IO.read('tmp/foo')
+    assert_empty(err, "Error text not expected: '#{err}'")
+    refute_empty(out, "Help text should be given")
   end
 
-  def test_file_to_dir
-    remove_tmp_foo
-    out, err = capture_subprocess_io do
-      assert_arv_get @@foo_manifest_locator + '/foo', 'tmp/'
-    end
-    assert_equal '', err
-    assert_equal '', out
-    assert_equal 'foo', IO.read('tmp/foo')
-  end
-
-  def test_dir_to_file
-    out, err = capture_subprocess_io do
-      assert_arv_get false, @@foo_manifest_locator + '/', 'tmp/foo'
-    end
-    assert_equal '', out
-    assert_match /^usage:/, err
-  end
-
-  def test_dir_to_empty_string
-    out, err = capture_subprocess_io do
-      assert_arv_get false, @@foo_manifest_locator + '/', ''
-    end
-    assert_equal '', out
-    assert_match /^usage:/, err
-  end
-
-  def test_nonexistent_block
-    out, err = capture_subprocess_io do
-      assert_arv_get false, 'e796ab2294f3e48ec709ffa8d6daf58c'
-    end
-    assert_equal '', out
-    assert_match /Error:/, err
-  end
-
-  def test_nonexistent_manifest
-    out, err = capture_subprocess_io do
-      assert_arv_get false, 'acbd18db4cc2f85cedef654fccc4a4d8/', 'tmp/'
-    end
-    assert_equal '', out
-    assert_match /Error:/, err
-  end
-
-  def test_manifest_root_to_dir
-    remove_tmp_foo
-    out, err = capture_subprocess_io do
-      assert_arv_get '-r', @@foo_manifest_locator + '/', 'tmp/'
-    end
-    assert_equal '', err
-    assert_equal '', out
-    assert_equal 'foo', IO.read('tmp/foo')
-  end
-
-  def test_manifest_root_to_dir_noslash
-    remove_tmp_foo
-    out, err = capture_subprocess_io do
-      assert_arv_get '-r', @@foo_manifest_locator + '/', 'tmp'
-    end
-    assert_equal '', err
-    assert_equal '', out
-    assert_equal 'foo', IO.read('tmp/foo')
-  end
-
-  def test_display_md5sum
-    remove_tmp_foo
-    out, err = capture_subprocess_io do
-      assert_arv_get '-r', '--md5sum', @@foo_manifest_locator + '/', 'tmp/'
-    end
-    assert_equal "#{Digest::MD5.hexdigest('foo')}  ./foo\n", err
-    assert_equal '', out
-    assert_equal 'foo', IO.read('tmp/foo')
+  protected
+  # Runs 'arv get <varargs>' with given arguments. Returns whether the exit
+  # status was 0 (i.e. success). Use $? to attain more details on failure.
+  def arv_get_default(*args)
+    return system("arv", "get", *args)
   end
 
-  def test_md5sum_nowrite
-    remove_tmp_foo
-    out, err = capture_subprocess_io do
-      assert_arv_get '-n', '--md5sum', @@foo_manifest_locator + '/', 'tmp/'
-    end
-    assert_equal "#{Digest::MD5.hexdigest('foo')}  ./foo\n", err
-    assert_equal '', out
-    assert_equal false, File.exists?('tmp/foo')
+  # Runs 'arv --format json get <varargs>' with given arguments. Returns whether
+  # the exit status was 0 (i.e. success). Use $? to attain more details on
+  # failure.
+  def arv_get_json(*args)
+    return system("arv", "--format", "json", "get", *args)
   end
 
-  def test_sha1_nowrite
-    remove_tmp_foo
-    out, err = capture_subprocess_io do
-      assert_arv_get '-n', '-r', '--hash', 'sha1', @@foo_manifest_locator+'/', 'tmp/'
-    end
-    assert_equal "#{Digest::SHA1.hexdigest('foo')}  ./foo\n", err
-    assert_equal '', out
-    assert_equal false, File.exists?('tmp/foo')
+  # Runs 'arv --format yaml get <varargs>' with given arguments. Returns whether
+  # the exit status was 0 (i.e. success). Use $? to attain more details on
+  # failure.
+  def arv_get_yaml(*args)
+    return system("arv", "--format", "yaml", "get", *args)
   end
 
-  def test_block_to_file
-    remove_tmp_foo
+  # Creates an Arvados object that stores a given value. Returns the uuid of the
+  # created object.
+  def create_arv_object_with_value(value)
     out, err = capture_subprocess_io do
-      assert_arv_get @@foo_manifest_locator, 'tmp/foo'
+      system("arv", "tag", "add", value, "--object", "testing")
+      assert $?.success?, "Command failure running `arv tag`: #{$?}"
     end
     assert_equal '', err
-    assert_equal '', out
-
-    digest = Digest::MD5.hexdigest('foo')
-    !(IO.read('tmp/foo')).gsub!( /^(. #{digest}+3)(.*)( 0:3:foo)$/).nil?
+    assert_operator 0, :<, out.strip.length
+    out.strip
   end
 
-  def test_create_directory_tree
-    `rm -rf ./tmp/arv-get-test/`
-    Dir.mkdir './tmp/arv-get-test'
-    out, err = capture_subprocess_io do
-      assert_arv_get @@multilevel_manifest_locator + '/', 'tmp/arv-get-test/'
+  # Parses the given JSON representation of an Arvados object, returning
+  # an equivalent Ruby representation (a hash map).
+  def parse_json_arv_object(arvObjectAsJson)
+    begin
+      parsed = JSON.parse(arvObjectAsJson)
+      assert(parsed.instance_of?(Hash))
+      return parsed
+    rescue JSON::ParserError => e
+      raise "Invalid JSON representation of Arvados object.\n" \
+            "Parse error: '#{e}'\n" \
+            "JSON: '#{arvObjectAsJson}'\n"
     end
-    assert_equal '', err
-    assert_equal '', out
-    assert_equal 'baz', IO.read('tmp/arv-get-test/foo/bar/baz')
   end
 
-  def test_create_partial_directory_tree
-    `rm -rf ./tmp/arv-get-test/`
-    Dir.mkdir './tmp/arv-get-test'
-    out, err = capture_subprocess_io do
-      assert_arv_get(@@multilevel_manifest_locator + '/foo/',
-                     'tmp/arv-get-test/')
+  # Parses the given JSON representation of an Arvados object, returning
+  # an equivalent Ruby representation (a hash map).
+  def parse_yaml_arv_object(arvObjectAsYaml)
+    begin
+      parsed = YAML.load(arvObjectAsYaml)
+      assert(parsed.instance_of?(Hash))
+      return parsed
+    rescue
+      raise "Invalid YAML representation of Arvados object.\n" \
+            "YAML: '#{arvObjectAsYaml}'\n"
     end
-    assert_equal '', err
-    assert_equal '', out
-    assert_equal 'baz', IO.read('tmp/arv-get-test/bar/baz')
   end
 
-  protected
-  def assert_arv_get(*args)
-    expect = case args.first
-             when true, false
-               args.shift
-             else
-               true
-             end
-    assert_equal(expect,
-                 system(['./bin/arv-get', 'arv-get'], *args),
-                 "`arv-get #{args.join ' '}` " +
-                 "should exit #{if expect then 0 else 'non-zero' end}")
-  end
-
-  def remove_tmp_foo
-    begin
-      File.unlink('tmp/foo')
-    rescue Errno::ENOENT
+  # Checks whether the given Arvados object has the given expected value for the
+  # specified field.
+  def has_field_with_value(arvObjectAsHash, fieldName, expectedValue)
+    if !arvObjectAsHash.has_key?(fieldName)
+      return false
     end
+    return (arvObjectAsHash[fieldName] == expectedValue)
   end
 end
diff --git a/sdk/cli/test/test_arv-keep-get.rb b/sdk/cli/test/test_arv-keep-get.rb
new file mode 100644 (file)
index 0000000..0e578b8
--- /dev/null
@@ -0,0 +1,251 @@
+require 'minitest/autorun'
+require 'digest/md5'
+
+class TestArvKeepGet < Minitest::Test
+  def setup
+    begin
+      Dir.mkdir './tmp'
+    rescue Errno::EEXIST
+    end
+    @@foo_manifest_locator ||= `echo -n foo | ./bin/arv-put --filename foo --no-progress -`.strip
+    @@baz_locator ||= `echo -n baz | ./bin/arv-put --as-raw --no-progress -`.strip
+    @@multilevel_manifest_locator ||= `echo ./foo/bar #{@@baz_locator} 0:3:baz | ./bin/arv-put --as-raw --no-progress -`.strip
+  end
+
+  def test_no_args
+    out, err = capture_subprocess_io do
+      assert_arv_get false
+    end
+    assert_equal '', out
+    assert_match /^usage:/, err
+  end
+
+  def test_help
+    out, err = capture_subprocess_io do
+      assert_arv_get '-h'
+    end
+    $stderr.write err
+    assert_equal '', err
+    assert_match /^usage:/, out
+  end
+
+  def test_file_to_dev_stdout
+    test_file_to_stdout('/dev/stdout')
+  end
+
+  def test_file_to_stdout(specify_stdout_as='-')
+    out, err = capture_subprocess_io do
+      assert_arv_get @@foo_manifest_locator + '/foo', specify_stdout_as
+    end
+    assert_equal '', err
+    assert_equal 'foo', out
+  end
+
+  def test_file_to_file
+    remove_tmp_foo
+    out, err = capture_subprocess_io do
+      assert_arv_get @@foo_manifest_locator + '/foo', 'tmp/foo'
+    end
+    assert_equal '', err
+    assert_equal '', out
+    assert_equal 'foo', IO.read('tmp/foo')
+  end
+
+  def test_file_to_file_no_overwrite_file
+    File.open './tmp/foo', 'wb' do |f|
+      f.write 'baz'
+    end
+    out, err = capture_subprocess_io do
+      assert_arv_get false, @@foo_manifest_locator + '/foo', 'tmp/foo'
+    end
+    assert_match /Local file tmp\/foo already exists/, err
+    assert_equal '', out
+    assert_equal 'baz', IO.read('tmp/foo')
+  end
+
+  def test_file_to_file_no_overwrite_file_in_dir
+    File.open './tmp/foo', 'wb' do |f|
+      f.write 'baz'
+    end
+    out, err = capture_subprocess_io do
+      assert_arv_get false, @@foo_manifest_locator + '/', 'tmp/'
+    end
+    assert_match /Local file tmp\/foo already exists/, err
+    assert_equal '', out
+    assert_equal 'baz', IO.read('tmp/foo')
+  end
+
+  def test_file_to_file_force_overwrite
+    File.open './tmp/foo', 'wb' do |f|
+      f.write 'baz'
+    end
+    assert_equal 'baz', IO.read('tmp/foo')
+    out, err = capture_subprocess_io do
+      assert_arv_get '-f', @@foo_manifest_locator + '/', 'tmp/'
+    end
+    assert_match '', err
+    assert_equal '', out
+    assert_equal 'foo', IO.read('tmp/foo')
+  end
+
+  def test_file_to_file_skip_existing
+    File.open './tmp/foo', 'wb' do |f|
+      f.write 'baz'
+    end
+    assert_equal 'baz', IO.read('tmp/foo')
+    out, err = capture_subprocess_io do
+      assert_arv_get '--skip-existing', @@foo_manifest_locator + '/', 'tmp/'
+    end
+    assert_match '', err
+    assert_equal '', out
+    assert_equal 'baz', IO.read('tmp/foo')
+  end
+
+  def test_file_to_dir
+    remove_tmp_foo
+    out, err = capture_subprocess_io do
+      assert_arv_get @@foo_manifest_locator + '/foo', 'tmp/'
+    end
+    assert_equal '', err
+    assert_equal '', out
+    assert_equal 'foo', IO.read('tmp/foo')
+  end
+
+  def test_dir_to_file
+    out, err = capture_subprocess_io do
+      assert_arv_get false, @@foo_manifest_locator + '/', 'tmp/foo'
+    end
+    assert_equal '', out
+    assert_match /^usage:/, err
+  end
+
+  def test_dir_to_empty_string
+    out, err = capture_subprocess_io do
+      assert_arv_get false, @@foo_manifest_locator + '/', ''
+    end
+    assert_equal '', out
+    assert_match /^usage:/, err
+  end
+
+  def test_nonexistent_block
+    out, err = capture_subprocess_io do
+      assert_arv_get false, 'e796ab2294f3e48ec709ffa8d6daf58c'
+    end
+    assert_equal '', out
+    assert_match /Error:/, err
+  end
+
+  def test_nonexistent_manifest
+    out, err = capture_subprocess_io do
+      assert_arv_get false, 'acbd18db4cc2f85cedef654fccc4a4d8/', 'tmp/'
+    end
+    assert_equal '', out
+    assert_match /Error:/, err
+  end
+
+  def test_manifest_root_to_dir
+    remove_tmp_foo
+    out, err = capture_subprocess_io do
+      assert_arv_get '-r', @@foo_manifest_locator + '/', 'tmp/'
+    end
+    assert_equal '', err
+    assert_equal '', out
+    assert_equal 'foo', IO.read('tmp/foo')
+  end
+
+  def test_manifest_root_to_dir_noslash
+    remove_tmp_foo
+    out, err = capture_subprocess_io do
+      assert_arv_get '-r', @@foo_manifest_locator + '/', 'tmp'
+    end
+    assert_equal '', err
+    assert_equal '', out
+    assert_equal 'foo', IO.read('tmp/foo')
+  end
+
+  def test_display_md5sum
+    remove_tmp_foo
+    out, err = capture_subprocess_io do
+      assert_arv_get '-r', '--md5sum', @@foo_manifest_locator + '/', 'tmp/'
+    end
+    assert_equal "#{Digest::MD5.hexdigest('foo')}  ./foo\n", err
+    assert_equal '', out
+    assert_equal 'foo', IO.read('tmp/foo')
+  end
+
+  def test_md5sum_nowrite
+    remove_tmp_foo
+    out, err = capture_subprocess_io do
+      assert_arv_get '-n', '--md5sum', @@foo_manifest_locator + '/', 'tmp/'
+    end
+    assert_equal "#{Digest::MD5.hexdigest('foo')}  ./foo\n", err
+    assert_equal '', out
+    assert_equal false, File.exists?('tmp/foo')
+  end
+
+  def test_sha1_nowrite
+    remove_tmp_foo
+    out, err = capture_subprocess_io do
+      assert_arv_get '-n', '-r', '--hash', 'sha1', @@foo_manifest_locator+'/', 'tmp/'
+    end
+    assert_equal "#{Digest::SHA1.hexdigest('foo')}  ./foo\n", err
+    assert_equal '', out
+    assert_equal false, File.exists?('tmp/foo')
+  end
+
+  def test_block_to_file
+    remove_tmp_foo
+    out, err = capture_subprocess_io do
+      assert_arv_get @@foo_manifest_locator, 'tmp/foo'
+    end
+    assert_equal '', err
+    assert_equal '', out
+
+    digest = Digest::MD5.hexdigest('foo')
+    !(IO.read('tmp/foo')).gsub!( /^(. #{digest}+3)(.*)( 0:3:foo)$/).nil?
+  end
+
+  def test_create_directory_tree
+    `rm -rf ./tmp/arv-get-test/`
+    Dir.mkdir './tmp/arv-get-test'
+    out, err = capture_subprocess_io do
+      assert_arv_get @@multilevel_manifest_locator + '/', 'tmp/arv-get-test/'
+    end
+    assert_equal '', err
+    assert_equal '', out
+    assert_equal 'baz', IO.read('tmp/arv-get-test/foo/bar/baz')
+  end
+
+  def test_create_partial_directory_tree
+    `rm -rf ./tmp/arv-get-test/`
+    Dir.mkdir './tmp/arv-get-test'
+    out, err = capture_subprocess_io do
+      assert_arv_get(@@multilevel_manifest_locator + '/foo/',
+                     'tmp/arv-get-test/')
+    end
+    assert_equal '', err
+    assert_equal '', out
+    assert_equal 'baz', IO.read('tmp/arv-get-test/bar/baz')
+  end
+
+  protected
+  def assert_arv_get(*args)
+    expect = case args.first
+             when true, false
+               args.shift
+             else
+               true
+             end
+    assert_equal(expect,
+                 system(['./bin/arv-get', 'arv-get'], *args),
+                 "`arv-get #{args.join ' '}` " +
+                 "should exit #{if expect then 0 else 'non-zero' end}")
+  end
+
+  def remove_tmp_foo
+    begin
+      File.unlink('tmp/foo')
+    rescue Errno::ENOENT
+    end
+  end
+end
similarity index 99%
rename from sdk/cli/test/test_arv-put.rb
rename to sdk/cli/test/test_arv-keep-put.rb
index 2f20e18440a2ff61dde6b748d3b327587530b142..fefbc2729875e70cb890f69d56fe1d7f1c614b8d 100644 (file)
@@ -1,7 +1,7 @@
 require 'minitest/autorun'
 require 'digest/md5'
 
-class TestArvPut < Minitest::Test
+class TestArvKeepPut < Minitest::Test
   def setup
     begin Dir.mkdir './tmp' rescue Errno::EEXIST end
     begin Dir.mkdir './tmp/empty_dir' rescue Errno::EEXIST end
index cad16917dba286504f6693cac3a3fbd4d05a741e..e922104aeb75096b4a6f36b6648e74af0abe821d 100644 (file)
@@ -9,6 +9,7 @@ import (
        "log"
        "os"
        "os/exec"
+       "strconv"
        "strings"
 )
 
@@ -98,12 +99,25 @@ func StopAPI() {
        exec.Command("python", "run_test_server.py", "stop").Run()
 }
 
+// StartKeep starts 2 keep servers with enforcePermissions=false
 func StartKeep() {
+       StartKeepWithParams(2, false)
+}
+
+// StartKeepWithParams starts the given number of keep servers,
+// optionally with -enforce-permissions enabled.
+func StartKeepWithParams(numKeepServers int, enforcePermissions bool) {
        cwd, _ := os.Getwd()
        defer os.Chdir(cwd)
        chdirToPythonTests()
 
-       cmd := exec.Command("python", "run_test_server.py", "start_keep")
+       cmdArgs := []string{"run_test_server.py", "start_keep", "--num-keep-servers", strconv.Itoa(numKeepServers)}
+       if enforcePermissions {
+               cmdArgs = append(cmdArgs, "--keep-enforce-permissions")
+       }
+
+       cmd := exec.Command("python", cmdArgs...)
+
        stderr, err := cmd.StderrPipe()
        if err != nil {
                log.Fatalf("Setting up stderr pipe: %s", err)
@@ -115,9 +129,16 @@ func StartKeep() {
 }
 
 func StopKeep() {
+       StopKeepWithParams(2)
+}
+
+// StopKeepServers stops keep servers that were started with
+// StartKeep. numkeepServers should be the same value that was passed
+// to StartKeep.
+func StopKeepWithParams(numKeepServers int) {
        cwd, _ := os.Getwd()
        defer os.Chdir(cwd)
        chdirToPythonTests()
 
-       exec.Command("python", "run_test_server.py", "stop_keep").Run()
+       exec.Command("python", "run_test_server.py", "stop_keep", "--num-keep-servers", strconv.Itoa(numKeepServers))
 }
index 8b7cf419ead2f56c7c124799eadd0479dc69eac6..67c304deaf3ae54b2668cb8c2f2856e909da8c5a 100644 (file)
@@ -13,7 +13,6 @@ import (
        "io/ioutil"
        "log"
        "net/http"
-       "os"
        "regexp"
        "strconv"
        "strings"
@@ -49,24 +48,39 @@ type KeepClient struct {
        gatewayRoots       *map[string]string
        lock               sync.RWMutex
        Client             *http.Client
+       Retries            int
 
        // set to 1 if all writable services are of disk type, otherwise 0
        replicasPerService int
 }
 
-// Create a new KeepClient.  This will contact the API server to discover Keep
-// servers.
+// MakeKeepClient creates a new KeepClient by contacting the API server to discover Keep servers.
 func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) {
-       var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
-       insecure := matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
+       kc := New(arv)
+       return kc, kc.DiscoverKeepServers()
+}
+
+// New func creates a new KeepClient struct.
+// This func does not discover keep servers. It is the caller's responsibility.
+func New(arv *arvadosclient.ArvadosClient) *KeepClient {
+       defaultReplicationLevel := 2
+       value, err := arv.Discovery("defaultCollectionReplication")
+       if err == nil {
+               v, ok := value.(float64)
+               if ok && v > 0 {
+                       defaultReplicationLevel = int(v)
+               }
+       }
+
        kc := &KeepClient{
                Arvados:       arv,
-               Want_replicas: 2,
+               Want_replicas: defaultReplicationLevel,
                Using_proxy:   false,
                Client: &http.Client{Transport: &http.Transport{
-                       TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
+                       TLSClientConfig: &tls.Config{InsecureSkipVerify: arv.ApiInsecure}}},
+               Retries: 2,
        }
-       return kc, kc.DiscoverKeepServers()
+       return kc
 }
 
 // Put a block given the block hash, a reader, and the number of bytes
@@ -127,6 +141,69 @@ func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error
        }
 }
 
+func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, int64, string, error) {
+       var errs []string
+
+       tries_remaining := 1 + kc.Retries
+       serversToTry := kc.getSortedRoots(locator)
+       var retryList []string
+
+       for tries_remaining > 0 {
+               tries_remaining -= 1
+               retryList = nil
+
+               for _, host := range serversToTry {
+                       url := host + "/" + locator
+
+                       req, err := http.NewRequest(method, url, nil)
+                       if err != nil {
+                               errs = append(errs, fmt.Sprintf("%s: %v", url, err))
+                               continue
+                       }
+                       req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
+                       resp, err := kc.Client.Do(req)
+                       if err != nil {
+                               // Probably a network error, may be transient,
+                               // can try again.
+                               errs = append(errs, fmt.Sprintf("%s: %v", url, err))
+                               retryList = append(retryList, host)
+                       } else if resp.StatusCode != http.StatusOK {
+                               var respbody []byte
+                               respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+                               resp.Body.Close()
+                               errs = append(errs, fmt.Sprintf("%s: HTTP %d %q",
+                                       url, resp.StatusCode, bytes.TrimSpace(respbody)))
+
+                               if resp.StatusCode == 408 ||
+                                       resp.StatusCode == 429 ||
+                                       resp.StatusCode >= 500 {
+                                       // Timeout, too many requests, or other
+                                       // server side failure, transient
+                                       // error, can try again.
+                                       retryList = append(retryList, host)
+                               }
+                       } else {
+                               // Success.
+                               if method == "GET" {
+                                       return HashCheckingReader{
+                                               Reader: resp.Body,
+                                               Hash:   md5.New(),
+                                               Check:  locator[0:32],
+                                       }, resp.ContentLength, url, nil
+                               } else {
+                                       resp.Body.Close()
+                                       return nil, resp.ContentLength, url, nil
+                               }
+                       }
+
+               }
+               serversToTry = retryList
+       }
+       log.Printf("DEBUG: %s %s failed: %v", method, locator, errs)
+
+       return nil, 0, "", BlockNotFound
+}
+
 // Get() retrieves a block, given a locator. Returns a reader, the
 // expected data length, the URL the block is being fetched from, and
 // an error.
@@ -135,34 +212,7 @@ func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error
 // reader returned by this method will return a BadChecksum error
 // instead of EOF.
 func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) {
-       var errs []string
-       for _, host := range kc.getSortedRoots(locator) {
-               url := host + "/" + locator
-               req, err := http.NewRequest("GET", url, nil)
-               if err != nil {
-                       errs = append(errs, fmt.Sprintf("%s: %v", url, err))
-                       continue
-               }
-               req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
-               resp, err := kc.Client.Do(req)
-               if err != nil {
-                       errs = append(errs, fmt.Sprintf("%s: %v", url, err))
-                       continue
-               } else if resp.StatusCode != http.StatusOK {
-                       respbody, _ := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
-                       resp.Body.Close()
-                       errs = append(errs, fmt.Sprintf("%s: HTTP %d %q",
-                               url, resp.StatusCode, bytes.TrimSpace(respbody)))
-                       continue
-               }
-               return HashCheckingReader{
-                       Reader: resp.Body,
-                       Hash:   md5.New(),
-                       Check:  locator[0:32],
-               }, resp.ContentLength, url, nil
-       }
-       log.Printf("DEBUG: GET %s failed: %v", locator, errs)
-       return nil, 0, "", BlockNotFound
+       return kc.getOrHead("GET", locator)
 }
 
 // Ask() verifies that a block with the given hash is available and
@@ -173,18 +223,8 @@ func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error)
 // Returns the data size (content length) reported by the Keep service
 // and the URI reporting the data size.
 func (kc *KeepClient) Ask(locator string) (int64, string, error) {
-       for _, host := range kc.getSortedRoots(locator) {
-               url := host + "/" + locator
-               req, err := http.NewRequest("HEAD", url, nil)
-               if err != nil {
-                       continue
-               }
-               req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
-               if resp, err := kc.Client.Do(req); err == nil && resp.StatusCode == http.StatusOK {
-                       return resp.ContentLength, url, nil
-               }
-       }
-       return 0, "", BlockNotFound
+       _, size, url, err := kc.getOrHead("HEAD", locator)
+       return size, url, err
 }
 
 // GetIndex retrieves a list of blocks stored on the given server whose hashes
index ee60d2822f2f95b439101b974d9a11001831b346..b5bc5ced4b23716a381f2af7d53f075a17b6771e 100644 (file)
@@ -69,6 +69,22 @@ func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
        }
 }
 
+func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
+       arv, err := arvadosclient.MakeArvadosClient()
+       c.Assert(err, Equals, nil)
+
+       kc, err := MakeKeepClient(&arv)
+       c.Assert(kc.Want_replicas, Equals, 2)
+
+       arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
+       kc, err = MakeKeepClient(&arv)
+       c.Assert(kc.Want_replicas, Equals, 3)
+
+       arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
+       kc, err = MakeKeepClient(&arv)
+       c.Assert(kc.Want_replicas, Equals, 1)
+}
+
 type StubPutHandler struct {
        c              *C
        expectPath     string
@@ -184,6 +200,31 @@ func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        fh.handled <- fmt.Sprintf("http://%s", req.Host)
 }
 
+type FailThenSucceedHandler struct {
+       handled        chan string
+       count          int
+       successhandler StubGetHandler
+}
+
+func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       if fh.count == 0 {
+               resp.WriteHeader(500)
+               fh.count += 1
+               fh.handled <- fmt.Sprintf("http://%s", req.Host)
+       } else {
+               fh.successhandler.ServeHTTP(resp, req)
+       }
+}
+
+type Error404Handler struct {
+       handled chan string
+}
+
+func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       resp.WriteHeader(404)
+       fh.handled <- fmt.Sprintf("http://%s", req.Host)
+}
+
 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
        log.Printf("TestFailedUploadToStubKeepServer")
 
@@ -464,7 +505,7 @@ func (s *StandaloneSuite) TestGet(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
-       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
 
        r, n, url2, err := kc.Get(hash)
        defer r.Close()
@@ -479,6 +520,26 @@ func (s *StandaloneSuite) TestGet(c *C) {
        log.Printf("TestGet done")
 }
 
+func (s *StandaloneSuite) TestGet404(c *C) {
+       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+       st := Error404Handler{make(chan string, 1)}
+
+       ks := RunFakeKeepServer(st)
+       defer ks.listener.Close()
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       kc, _ := MakeKeepClient(&arv)
+       arv.ApiToken = "abc123"
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+
+       r, n, url2, err := kc.Get(hash)
+       c.Check(err, Equals, BlockNotFound)
+       c.Check(n, Equals, int64(0))
+       c.Check(url2, Equals, "")
+       c.Check(r, Equals, nil)
+}
+
 func (s *StandaloneSuite) TestGetFail(c *C) {
        hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
 
@@ -490,7 +551,52 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
-       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+
+       r, n, url2, err := kc.Get(hash)
+       c.Check(err, Equals, BlockNotFound)
+       c.Check(n, Equals, int64(0))
+       c.Check(url2, Equals, "")
+       c.Check(r, Equals, nil)
+}
+
+func (s *StandaloneSuite) TestGetFailRetry(c *C) {
+       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+       st := &FailThenSucceedHandler{make(chan string, 1), 0,
+               StubGetHandler{
+                       c,
+                       hash,
+                       "abc123",
+                       http.StatusOK,
+                       []byte("foo")}}
+
+       ks := RunFakeKeepServer(st)
+       defer ks.listener.Close()
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       kc, _ := MakeKeepClient(&arv)
+       arv.ApiToken = "abc123"
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+
+       r, n, url2, err := kc.Get(hash)
+       defer r.Close()
+       c.Check(err, Equals, nil)
+       c.Check(n, Equals, int64(3))
+       c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
+
+       content, err2 := ioutil.ReadAll(r)
+       c.Check(err2, Equals, nil)
+       c.Check(content, DeepEquals, []byte("foo"))
+}
+
+func (s *StandaloneSuite) TestGetNetError(c *C) {
+       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       kc, _ := MakeKeepClient(&arv)
+       arv.ApiToken = "abc123"
+       kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
 
        r, n, url2, err := kc.Get(hash)
        c.Check(err, Equals, BlockNotFound)
@@ -525,7 +631,7 @@ func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(
                map[string]string{"x": ks0.url},
-               map[string]string{"x": ks0.url},
+               nil,
                map[string]string{uuid: ks.url})
 
        r, n, uri, err := kc.Get(hash + "+K@" + uuid)
@@ -572,11 +678,7 @@ func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
                        "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
                        "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
                        uuid: ks.url},
-               map[string]string{
-                       "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
-                       "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
-                       "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
-                       uuid: ks.url},
+               nil,
                map[string]string{
                        "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
                        "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
@@ -619,7 +721,7 @@ func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(
                map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
-               map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
+               nil,
                map[string]string{uuid: ksGateway.url})
 
        r, n, uri, err := kc.Get(hash + "+K@" + uuid)
@@ -654,7 +756,7 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
-       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
 
        r, n, _, err := kc.Get(barhash)
        _, err = ioutil.ReadAll(r)
@@ -675,7 +777,7 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
        content := []byte("waz")
        hash := fmt.Sprintf("%x", md5.Sum(content))
 
-       fh := FailHandler{
+       fh := Error404Handler{
                make(chan string, 4)}
 
        st := StubGetHandler{
@@ -981,7 +1083,7 @@ func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
-       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
 
        r, err := kc.GetIndex("x", "")
        c.Check(err, Equals, nil)
@@ -1007,7 +1109,7 @@ func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
-       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
 
        r, err := kc.GetIndex("x", hash[0:3])
        c.Check(err, Equals, nil)
@@ -1033,7 +1135,7 @@ func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
-       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
 
        _, err = kc.GetIndex("x", hash[0:3])
        c.Check(err, Equals, ErrIncompleteIndex)
@@ -1055,7 +1157,7 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
-       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
 
        _, err = kc.GetIndex("y", hash[0:3])
        c.Check(err, Equals, ErrNoSuchKeepServer)
@@ -1075,7 +1177,7 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
-       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
 
        r, err := kc.GetIndex("x", "abcd")
        c.Check(err, Equals, nil)
index 51e3e082b683e01ca153d352e54b8e53db9a9893..0791d3cf856ee7d5d1268338eafa883fe9bcbb18 100644 (file)
@@ -2,6 +2,7 @@ package keepclient
 
 import (
        "crypto/md5"
+       "encoding/json"
        "errors"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/streamer"
@@ -76,19 +77,38 @@ func (this *KeepClient) setClientSettingsDisk() {
        }
 }
 
+type svcList struct {
+       Items []keepService `json:"items"`
+}
+
 // DiscoverKeepServers gets list of available keep services from api server
 func (this *KeepClient) DiscoverKeepServers() error {
-       type svcList struct {
-               Items []keepService `json:"items"`
-       }
-       var m svcList
+       var list svcList
 
        // Get keep services from api server
-       err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &m)
+       err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list)
        if err != nil {
                return err
        }
 
+       return this.loadKeepServers(list)
+}
+
+// LoadKeepServicesFromJSON gets list of available keep services from given JSON
+func (this *KeepClient) LoadKeepServicesFromJSON(services string) error {
+       var list svcList
+
+       // Load keep services from given json
+       dec := json.NewDecoder(strings.NewReader(services))
+       if err := dec.Decode(&list); err != nil {
+               return err
+       }
+
+       return this.loadKeepServers(list)
+}
+
+// loadKeepServers
+func (this *KeepClient) loadKeepServers(list svcList) error {
        listed := make(map[string]bool)
        localRoots := make(map[string]string)
        gatewayRoots := make(map[string]string)
@@ -98,7 +118,7 @@ func (this *KeepClient) DiscoverKeepServers() error {
        this.replicasPerService = 1
        this.Using_proxy = false
 
-       for _, service := range m.Items {
+       for _, service := range list.Items {
                scheme := "http"
                if service.SSL {
                        scheme = "https"
index 853d7d30354daddb86e602c4b580141fc18e1bdf..80aeb268975d8acbc7f7d1c2e771c17e7adfa719 100644 (file)
@@ -251,6 +251,7 @@ func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
 
        n, err := sr.Read(out)
        c.Check(n, Equals, 100)
+       c.Check(err, IsNil)
 
        n, err = sr.Read(out)
        c.Check(n, Equals, 0)
index a4a194f69bcc8fbdbf43650caa881325bde5dc24..3f5f9344c521f3021e2f6fd35f025e4b9c7fb95e 100644 (file)
@@ -249,9 +249,7 @@ func (this *AsyncStream) transfer(source_reader io.Reader) {
                                        }
                                }
                        } else {
-                               if reader_status == io.EOF {
-                                       // no more reads expected, so this is ok
-                               } else {
+                               if reader_status == nil {
                                        // slices channel closed without signaling EOF
                                        reader_status = io.ErrUnexpectedEOF
                                }
index 5d0c42ad2109e2d605f5ab45fea5bd64fc26b1e8..d90d2ad1a7b61dd567bc6931c95a06b8d8463226 100644 (file)
@@ -323,8 +323,8 @@ def _start_keep(n, keep_args):
 
     return port
 
-def run_keep(blob_signing_key=None, enforce_permissions=False):
-    stop_keep()
+def run_keep(blob_signing_key=None, enforce_permissions=False, num_servers=2):
+    stop_keep(num_servers)
 
     keep_args = {}
     if not blob_signing_key:
@@ -344,12 +344,13 @@ def run_keep(blob_signing_key=None, enforce_permissions=False):
         host=os.environ['ARVADOS_API_HOST'],
         token=os.environ['ARVADOS_API_TOKEN'],
         insecure=True)
+
     for d in api.keep_services().list().execute()['items']:
         api.keep_services().delete(uuid=d['uuid']).execute()
     for d in api.keep_disks().list().execute()['items']:
         api.keep_disks().delete(uuid=d['uuid']).execute()
 
-    for d in range(0, 2):
+    for d in range(0, num_servers):
         port = _start_keep(d, keep_args)
         svc = api.keep_services().create(body={'keep_service': {
             'uuid': 'zzzzz-bi6l4-keepdisk{:07d}'.format(d),
@@ -371,9 +372,9 @@ def _stop_keep(n):
     if os.path.exists(os.path.join(TEST_TMPDIR, "keep.blob_signing_key")):
         os.remove(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"))
 
-def stop_keep():
-    _stop_keep(0)
-    _stop_keep(1)
+def stop_keep(num_servers=2):
+    for n in range(0, num_servers):
+        _stop_keep(n)
 
 def run_keep_proxy():
     if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
@@ -595,6 +596,9 @@ if __name__ == "__main__":
     parser = argparse.ArgumentParser()
     parser.add_argument('action', type=str, help="one of {}".format(actions))
     parser.add_argument('--auth', type=str, metavar='FIXTURE_NAME', help='Print authorization info for given api_client_authorizations fixture')
+    parser.add_argument('--num-keep-servers', metavar='int', type=int, default=2, help="Number of keep servers desired")
+    parser.add_argument('--keep-enforce-permissions', action="store_true", help="Enforce keep permissions")
+
     args = parser.parse_args()
 
     if args.action not in actions:
@@ -614,7 +618,7 @@ if __name__ == "__main__":
     elif args.action == 'stop':
         stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
     elif args.action == 'start_keep':
-        run_keep()
+        run_keep(enforce_permissions=args.keep_enforce_permissions, num_servers=args.num_keep_servers)
     elif args.action == 'stop_keep':
         stop_keep()
     elif args.action == 'start_keep_proxy':
index 8f1b687a9cad9a01e593b08f1e34cee2d4db8e53..1193e915363e80e7ccc70a0ce16c731dc3ebcaeb 100644 (file)
@@ -74,7 +74,7 @@ gem 'faye-websocket'
 gem 'themes_for_rails'
 
 gem 'arvados', '>= 0.1.20150615153458'
-gem 'arvados-cli', '>= 0.1.20150128223752'
+gem 'arvados-cli', '>=  0.1.20150605170031'
 
 # pg_power lets us use partial indexes in schema.rb in Rails 3
 gem 'pg_power'
index d671182a57c749b65bd9f7c93d2db215bc762f16..be4d4606ab21599e6e1c744d9f1ad40dd25887db 100644 (file)
@@ -41,10 +41,10 @@ GEM
       google-api-client (~> 0.6.3, >= 0.6.3)
       json (~> 1.7, >= 1.7.7)
       jwt (>= 0.1.5, < 1.0.0)
-    arvados-cli (0.1.20150205181653)
+    arvados-cli (0.1.20150930141818)
       activesupport (~> 3.2, >= 3.2.13)
       andand (~> 1.3, >= 1.3.3)
-      arvados (~> 0.1, >= 0.1.20150615153458)
+      arvados (~> 0.1, >= 0.1.20150128223554)
       curb (~> 0.8)
       google-api-client (~> 0.6.3, >= 0.6.3)
       json (~> 1.7, >= 1.7.7)
@@ -69,7 +69,7 @@ GEM
       coffee-script-source
       execjs
     coffee-script-source (1.7.0)
-    curb (0.8.6)
+    curb (0.8.8)
     daemon_controller (1.2.0)
     database_cleaner (1.2.0)
     erubis (2.7.0)
@@ -228,7 +228,7 @@ DEPENDENCIES
   acts_as_api
   andand
   arvados (>= 0.1.20150615153458)
-  arvados-cli (>= 0.1.20150128223752)
+  arvados-cli (>= 0.1.20150605170031)
   coffee-rails (~> 3.2.0)
   database_cleaner
   factory_girl_rails
index 657c3151caade8aeca6cbedcd96ffef70d3d7ac4..e9fda2ab76dd41c6cdde36cb139e3f3f030c273b 100644 (file)
@@ -9,6 +9,7 @@ import (
        "io/ioutil"
        "log"
        "os"
+       "regexp"
        "strings"
        "time"
 
@@ -19,8 +20,8 @@ var (
        azureStorageAccountName    string
        azureStorageAccountKeyFile string
        azureStorageReplication    int
-       azureWriteRaceInterval     time.Duration = 15 * time.Second
-       azureWriteRacePollTime     time.Duration = time.Second
+       azureWriteRaceInterval     = 15 * time.Second
+       azureWriteRacePollTime     = time.Second
 )
 
 func readKeyFromFile(file string) (string, error) {
@@ -96,6 +97,9 @@ type AzureBlobVolume struct {
        replication   int
 }
 
+// NewAzureBlobVolume returns a new AzureBlobVolume using the given
+// client and container name. The replication argument specifies the
+// replication level to report when writing data.
 func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
        return &AzureBlobVolume{
                azClient:      client,
@@ -118,6 +122,12 @@ func (v *AzureBlobVolume) Check() error {
        return nil
 }
 
+// Get reads a Keep block that has been stored as a block blob in the
+// container.
+//
+// If the block is younger than azureWriteRaceInterval and is
+// unexpectedly empty, assume a PutBlob operation is in progress, and
+// wait for it to finish writing.
 func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
        var deadline time.Time
        haveDeadline := false
@@ -169,6 +179,7 @@ func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
        }
 }
 
+// Compare the given data with existing stored data.
 func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
        rdr, err := v.bsClient.GetBlob(v.containerName, loc)
        if err != nil {
@@ -178,6 +189,7 @@ func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
        return compareReaderWithBuf(rdr, expect, loc[:32])
 }
 
+// Put sotres a Keep block as a block blob in the container.
 func (v *AzureBlobVolume) Put(loc string, block []byte) error {
        if v.readonly {
                return MethodDisabledError
@@ -185,6 +197,7 @@ func (v *AzureBlobVolume) Put(loc string, block []byte) error {
        return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block))
 }
 
+// Touch updates the last-modified property of a block blob.
 func (v *AzureBlobVolume) Touch(loc string) error {
        if v.readonly {
                return MethodDisabledError
@@ -194,6 +207,7 @@ func (v *AzureBlobVolume) Touch(loc string) error {
        })
 }
 
+// Mtime returns the last-modified property of a block blob.
 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
        props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
        if err != nil {
@@ -202,6 +216,8 @@ func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
        return time.Parse(time.RFC1123, props.LastModified)
 }
 
+// IndexTo writes a list of Keep blocks that are stored in the
+// container.
 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
        params := storage.ListBlobsParameters{
                Prefix: prefix,
@@ -216,6 +232,9 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
                        if err != nil {
                                return err
                        }
+                       if !v.isKeepBlock(b.Name) {
+                               continue
+                       }
                        if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) {
                                // A new zero-length blob is probably
                                // just a new non-empty blob that
@@ -233,6 +252,7 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
        }
 }
 
+// Delete a Keep block.
 func (v *AzureBlobVolume) Delete(loc string) error {
        if v.readonly {
                return MethodDisabledError
@@ -256,6 +276,7 @@ func (v *AzureBlobVolume) Delete(loc string) error {
        })
 }
 
+// Status returns a VolumeStatus struct with placeholder data.
 func (v *AzureBlobVolume) Status() *VolumeStatus {
        return &VolumeStatus{
                DeviceNum: 1,
@@ -264,14 +285,19 @@ func (v *AzureBlobVolume) Status() *VolumeStatus {
        }
 }
 
+// String returns a volume label, including the container name.
 func (v *AzureBlobVolume) String() string {
        return fmt.Sprintf("azure-storage-container:%+q", v.containerName)
 }
 
+// Writable returns true, unless the -readonly flag was on when the
+// volume was added.
 func (v *AzureBlobVolume) Writable() bool {
        return !v.readonly
 }
 
+// Replication returns the replication level of the container, as
+// specified by the -azure-storage-replication argument.
 func (v *AzureBlobVolume) Replication() int {
        return v.replication
 }
@@ -289,3 +315,8 @@ func (v *AzureBlobVolume) translateError(err error) error {
                return err
        }
 }
+
+var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
+func (v *AzureBlobVolume) isKeepBlock(s string) bool {
+       return keepBlockRegexp.MatchString(s)
+}
index a4c6e62a7d5f6f02c938ae27872cb96bb6a8e3e6..a240c23e1622b525f62a6957a23c025651f94190 100644 (file)
@@ -60,11 +60,11 @@ func newAzStubHandler() *azStubHandler {
 }
 
 func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
-       if blob, ok := h.blobs[container+"|"+hash]; !ok {
+       blob, ok := h.blobs[container+"|"+hash]
+       if !ok {
                return
-       } else {
-               blob.Mtime = t
        }
+       blob.Mtime = t
 }
 
 func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
@@ -427,7 +427,7 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
        azureWriteRaceInterval = 2 * time.Second
        azureWriteRacePollTime = 5 * time.Millisecond
 
-       v.PutRaw(TestHash, []byte{})
+       v.PutRaw(TestHash, nil)
 
        buf := new(bytes.Buffer)
        v.IndexTo("", buf)
index 7ef079f1f7ac5b588c87e31918739db77a22ab82..61088f10fa2d4ef30e969a77e107b824073684e6 100644 (file)
@@ -350,6 +350,13 @@ func testIndexTo(t *testing.T, factory TestableVolumeFactory) {
        v.PutRaw(TestHash2, TestBlock2)
        v.PutRaw(TestHash3, TestBlock3)
 
+       // Blocks whose names aren't Keep hashes should be omitted from
+       // index
+       v.PutRaw("fffffffffnotreallyahashfffffffff", nil)
+       v.PutRaw("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", nil)
+       v.PutRaw("f0000000000000000000000000000000f", nil)
+       v.PutRaw("f00", nil)
+
        buf := new(bytes.Buffer)
        v.IndexTo("", buf)
        indexRows := strings.Split(string(buf.Bytes()), "\n")
index e745eb269100c5b5b0ba5ff7e1df6c62ab5cb089..910cc25d613cb7690f944b418aebf5c205c7aced 100644 (file)
@@ -292,6 +292,7 @@ func (v *UnixVolume) Status() *VolumeStatus {
 }
 
 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
+var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
 // IndexTo writes (to the given Writer) a list of blocks found on this
 // volume which begin with the specified prefix. If the prefix is an
@@ -348,6 +349,9 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        if !strings.HasPrefix(name, prefix) {
                                continue
                        }
+                       if !blockFileRe.MatchString(name) {
+                               continue
+                       }
                        _, err = fmt.Fprint(w,
                                name,
                                "+", fileInfo[0].Size(),
diff --git a/tools/keep-exercise/.gitignore b/tools/keep-exercise/.gitignore
new file mode 100644 (file)
index 0000000..6a1d10c
--- /dev/null
@@ -0,0 +1 @@
+keep-exercise
diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go
new file mode 100644 (file)
index 0000000..a94c01e
--- /dev/null
@@ -0,0 +1,157 @@
+// Testing tool for Keep services.
+//
+// keepexercise helps measure throughput and test reliability under
+// various usage patterns.
+//
+// By default, it reads and writes blocks containing 2^26 NUL
+// bytes. This generates network traffic without consuming much disk
+// space.
+//
+// For a more realistic test, enable -vary-request. Warning: this will
+// fill your storage volumes with random data if you leave it running,
+// which can cost you money or leave you with too little room for
+// useful data.
+//
+package main
+
+import (
+       "crypto/rand"
+       "encoding/binary"
+       "flag"
+       "io"
+       "io/ioutil"
+       "log"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
+
+// Command line config knobs
+var (
+       BlockSize     = flag.Int("block-size", keepclient.BLOCKSIZE, "bytes per read/write op")
+       ReadThreads   = flag.Int("rthreads", 1, "number of concurrent readers")
+       WriteThreads  = flag.Int("wthreads", 1, "number of concurrent writers")
+       VaryRequest   = flag.Bool("vary-request", false, "vary the data for each request: consumes disk space, exercises write behavior")
+       VaryThread    = flag.Bool("vary-thread", false, "use -wthreads different data blocks")
+       Replicas      = flag.Int("replicas", 1, "replication level for writing")
+       StatsInterval = flag.Duration("stats-interval", time.Second, "time interval between IO stats reports, or 0 to disable")
+)
+
+func main() {
+       flag.Parse()
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       if err != nil {
+               log.Fatal(err)
+       }
+       kc, err := keepclient.MakeKeepClient(&arv)
+       if err != nil {
+               log.Fatal(err)
+       }
+       kc.Want_replicas = *Replicas
+       kc.Client.Timeout = 10 * time.Minute
+
+       nextBuf := make(chan []byte, *WriteThreads)
+       nextLocator := make(chan string, *ReadThreads+*WriteThreads)
+
+       go countBeans(nextLocator)
+       for i := 0; i < *WriteThreads; i++ {
+               go makeBufs(nextBuf, i)
+               go doWrites(kc, nextBuf, nextLocator)
+       }
+       for i := 0; i < *ReadThreads; i++ {
+               go doReads(kc, nextLocator)
+       }
+       <-make(chan struct{})
+}
+
+// Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
+var bytesInChan = make(chan uint64)
+var bytesOutChan = make(chan uint64)
+
+// Send struct{}{} to errorsChan when an error happens.
+var errorsChan = make(chan struct{})
+
+func countBeans(nextLocator chan string) {
+       t0 := time.Now()
+       var tickChan <-chan time.Time
+       if *StatsInterval > 0 {
+               tickChan = time.NewTicker(*StatsInterval).C
+       }
+       var bytesIn uint64
+       var bytesOut uint64
+       var errors uint64
+       for {
+               select {
+               case <-tickChan:
+                       elapsed := time.Since(t0)
+                       log.Printf("%v elapsed: read %v bytes (%.1f MiB/s), wrote %v bytes (%.1f MiB/s), errors %d",
+                               elapsed,
+                               bytesIn, (float64(bytesIn) / elapsed.Seconds() / 1048576),
+                               bytesOut, (float64(bytesOut) / elapsed.Seconds() / 1048576),
+                               errors,
+                       )
+               case i := <-bytesInChan:
+                       bytesIn += i
+               case o := <-bytesOutChan:
+                       bytesOut += o
+               case <-errorsChan:
+                       errors++
+               }
+       }
+}
+
+func makeBufs(nextBuf chan []byte, threadID int) {
+       buf := make([]byte, *BlockSize)
+       if *VaryThread {
+               binary.PutVarint(buf, int64(threadID))
+       }
+       for {
+               if *VaryRequest {
+                       if _, err := io.ReadFull(rand.Reader, buf); err != nil {
+                               log.Fatal(err)
+                       }
+               }
+               nextBuf <- buf
+       }
+}
+
+func doWrites(kc *keepclient.KeepClient, nextBuf chan []byte, nextLocator chan string) {
+       for buf := range nextBuf {
+               locator, _, err := kc.PutB(buf)
+               if err != nil {
+                       log.Print(err)
+                       errorsChan <- struct{}{}
+                       continue
+               }
+               bytesOutChan <- uint64(len(buf))
+               for cap(nextLocator) > len(nextLocator)+*WriteThreads {
+                       // Give the readers something to do, unless
+                       // they have lots queued up already.
+                       nextLocator <- locator
+               }
+       }
+}
+
+func doReads(kc *keepclient.KeepClient, nextLocator chan string) {
+       for locator := range nextLocator {
+               rdr, size, url, err := kc.Get(locator)
+               if err != nil {
+                       log.Print(err)
+                       errorsChan <- struct{}{}
+                       continue
+               }
+               n, err := io.Copy(ioutil.Discard, rdr)
+               rdr.Close()
+               if n != size || err != nil {
+                       log.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
+                       errorsChan <- struct{}{}
+                       continue
+                       // Note we don't count the bytes received in
+                       // partial/corrupt responses: we are measuring
+                       // throughput, not resource consumption.
+               }
+               bytesInChan <- uint64(n)
+       }
+}
diff --git a/tools/keep-rsync/.gitignore b/tools/keep-rsync/.gitignore
new file mode 100644 (file)
index 0000000..5ee7f3b
--- /dev/null
@@ -0,0 +1 @@
+keep-rsync
diff --git a/tools/keep-rsync/keep-rsync.go b/tools/keep-rsync/keep-rsync.go
new file mode 100644 (file)
index 0000000..705025b
--- /dev/null
@@ -0,0 +1,288 @@
+package main
+
+import (
+       "bufio"
+       "crypto/tls"
+       "errors"
+       "flag"
+       "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "io/ioutil"
+       "log"
+       "net/http"
+       "os"
+       "regexp"
+       "strings"
+       "time"
+)
+
+func main() {
+       var srcConfigFile, dstConfigFile, srcKeepServicesJSON, dstKeepServicesJSON, prefix string
+       var replications int
+       var srcBlobSigningKey string
+
+       flag.StringVar(
+               &srcConfigFile,
+               "src",
+               "",
+               "Source configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf")
+
+       flag.StringVar(
+               &dstConfigFile,
+               "dst",
+               "",
+               "Destination configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf")
+
+       flag.StringVar(
+               &srcKeepServicesJSON,
+               "src-keep-services-json",
+               "",
+               "An optional list of available source keepservices. "+
+                       "If not provided, this list is obtained from api server configured in src-config-file.")
+
+       flag.StringVar(
+               &dstKeepServicesJSON,
+               "dst-keep-services-json",
+               "",
+               "An optional list of available destination keepservices. "+
+                       "If not provided, this list is obtained from api server configured in dst-config-file.")
+
+       flag.IntVar(
+               &replications,
+               "replications",
+               0,
+               "Number of replications to write to the destination. If replications not specified, "+
+                       "default replication level configured on destination server will be used.")
+
+       flag.StringVar(
+               &prefix,
+               "prefix",
+               "",
+               "Index prefix")
+
+       flag.Parse()
+
+       srcConfig, srcBlobSigningKey, err := loadConfig(srcConfigFile)
+       if err != nil {
+               log.Fatalf("Error loading src configuration from file: %s", err.Error())
+       }
+
+       dstConfig, _, err := loadConfig(dstConfigFile)
+       if err != nil {
+               log.Fatalf("Error loading dst configuration from file: %s", err.Error())
+       }
+
+       // setup src and dst keepclients
+       kcSrc, err := setupKeepClient(srcConfig, srcKeepServicesJSON, false, 0)
+       if err != nil {
+               log.Fatalf("Error configuring src keepclient: %s", err.Error())
+       }
+
+       kcDst, err := setupKeepClient(dstConfig, dstKeepServicesJSON, true, replications)
+       if err != nil {
+               log.Fatalf("Error configuring dst keepclient: %s", err.Error())
+       }
+
+       // Copy blocks not found in dst from src
+       err = performKeepRsync(kcSrc, kcDst, srcBlobSigningKey, prefix)
+       if err != nil {
+               log.Fatalf("Error while syncing data: %s", err.Error())
+       }
+}
+
+type apiConfig struct {
+       APIToken        string
+       APIHost         string
+       APIHostInsecure bool
+       ExternalClient  bool
+}
+
+// Load src and dst config from given files
+func loadConfig(configFile string) (config apiConfig, blobSigningKey string, err error) {
+       if configFile == "" {
+               return config, blobSigningKey, errors.New("config file not specified")
+       }
+
+       config, blobSigningKey, err = readConfigFromFile(configFile)
+       if err != nil {
+               return config, blobSigningKey, fmt.Errorf("Error reading config file: %v", err)
+       }
+
+       return
+}
+
+var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
+
+// Read config from file
+func readConfigFromFile(filename string) (config apiConfig, blobSigningKey string, err error) {
+       if !strings.Contains(filename, "/") {
+               filename = os.Getenv("HOME") + "/.config/arvados/" + filename + ".conf"
+       }
+
+       content, err := ioutil.ReadFile(filename)
+
+       if err != nil {
+               return config, "", err
+       }
+
+       lines := strings.Split(string(content), "\n")
+       for _, line := range lines {
+               if line == "" {
+                       continue
+               }
+
+               kv := strings.SplitN(line, "=", 2)
+               key := strings.TrimSpace(kv[0])
+               value := strings.TrimSpace(kv[1])
+
+               switch key {
+               case "ARVADOS_API_TOKEN":
+                       config.APIToken = value
+               case "ARVADOS_API_HOST":
+                       config.APIHost = value
+               case "ARVADOS_API_HOST_INSECURE":
+                       config.APIHostInsecure = matchTrue.MatchString(value)
+               case "ARVADOS_EXTERNAL_CLIENT":
+                       config.ExternalClient = matchTrue.MatchString(value)
+               case "ARVADOS_BLOB_SIGNING_KEY":
+                       blobSigningKey = value
+               }
+       }
+       return
+}
+
+// setup keepclient using the config provided
+func setupKeepClient(config apiConfig, keepServicesJSON string, isDst bool, replications int) (kc *keepclient.KeepClient, err error) {
+       arv := arvadosclient.ArvadosClient{
+               ApiToken:    config.APIToken,
+               ApiServer:   config.APIHost,
+               ApiInsecure: config.APIHostInsecure,
+               Client: &http.Client{Transport: &http.Transport{
+                       TLSClientConfig: &tls.Config{InsecureSkipVerify: config.APIHostInsecure}}},
+               External: config.ExternalClient,
+       }
+
+       // if keepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
+       if keepServicesJSON == "" {
+               kc, err = keepclient.MakeKeepClient(&arv)
+               if err != nil {
+                       return nil, err
+               }
+       } else {
+               kc = keepclient.New(&arv)
+               err = kc.LoadKeepServicesFromJSON(keepServicesJSON)
+               if err != nil {
+                       return kc, err
+               }
+       }
+
+       if isDst {
+               // Get default replications value from destination, if it is not already provided
+               if replications == 0 {
+                       value, err := arv.Discovery("defaultCollectionReplication")
+                       if err == nil {
+                               replications = int(value.(float64))
+                       } else {
+                               return nil, err
+                       }
+               }
+
+               kc.Want_replicas = replications
+       }
+
+       return kc, nil
+}
+
+// Get unique block locators from src and dst
+// Copy any blocks missing in dst
+func performKeepRsync(kcSrc, kcDst *keepclient.KeepClient, blobSigningKey, prefix string) error {
+       // Get unique locators from src
+       srcIndex, err := getUniqueLocators(kcSrc, prefix)
+       if err != nil {
+               return err
+       }
+
+       // Get unique locators from dst
+       dstIndex, err := getUniqueLocators(kcDst, prefix)
+       if err != nil {
+               return err
+       }
+
+       // Get list of locators found in src, but missing in dst
+       toBeCopied := getMissingLocators(srcIndex, dstIndex)
+
+       // Copy each missing block to dst
+       log.Printf("Before keep-rsync, there are %d blocks in src and %d blocks in dst. Start copying %d blocks from src not found in dst.",
+               len(srcIndex), len(dstIndex), len(toBeCopied))
+
+       err = copyBlocksToDst(toBeCopied, kcSrc, kcDst, blobSigningKey)
+
+       return err
+}
+
+// Get list of unique locators from the specified cluster
+func getUniqueLocators(kc *keepclient.KeepClient, prefix string) (map[string]bool, error) {
+       uniqueLocators := map[string]bool{}
+
+       // Get index and dedup
+       for uuid := range kc.LocalRoots() {
+               reader, err := kc.GetIndex(uuid, prefix)
+               if err != nil {
+                       return uniqueLocators, err
+               }
+               scanner := bufio.NewScanner(reader)
+               for scanner.Scan() {
+                       uniqueLocators[strings.Split(scanner.Text(), " ")[0]] = true
+               }
+       }
+
+       return uniqueLocators, nil
+}
+
+// Get list of locators that are in src but not in dst
+func getMissingLocators(srcLocators, dstLocators map[string]bool) []string {
+       var missingLocators []string
+       for locator := range srcLocators {
+               if _, ok := dstLocators[locator]; !ok {
+                       missingLocators = append(missingLocators, locator)
+               }
+       }
+       return missingLocators
+}
+
+// Copy blocks from src to dst; only those that are missing in dst are copied
+func copyBlocksToDst(toBeCopied []string, kcSrc, kcDst *keepclient.KeepClient, blobSigningKey string) error {
+       total := len(toBeCopied)
+
+       startedAt := time.Now()
+       for done, locator := range toBeCopied {
+               if done == 0 {
+                       log.Printf("Copying data block %d of %d (%.2f%% done): %v", done+1, total,
+                               float64(done)/float64(total)*100, locator)
+               } else {
+                       timePerBlock := time.Since(startedAt) / time.Duration(done)
+                       log.Printf("Copying data block %d of %d (%.2f%% done, %v est. time remaining): %v", done+1, total,
+                               float64(done)/float64(total)*100, timePerBlock*time.Duration(total-done), locator)
+               }
+
+               getLocator := locator
+               expiresAt := time.Now().AddDate(0, 0, 1)
+               if blobSigningKey != "" {
+                       getLocator = keepclient.SignLocator(getLocator, kcSrc.Arvados.ApiToken, expiresAt, []byte(blobSigningKey))
+               }
+
+               reader, len, _, err := kcSrc.Get(getLocator)
+               if err != nil {
+                       return fmt.Errorf("Error getting block: %v %v", locator, err)
+               }
+
+               _, _, err = kcDst.PutHR(getLocator[:32], reader, len)
+               if err != nil {
+                       return fmt.Errorf("Error copying data block: %v %v", locator, err)
+               }
+       }
+
+       log.Printf("Successfully copied to destination %d blocks.", total)
+       return nil
+}
diff --git a/tools/keep-rsync/keep-rsync_test.go b/tools/keep-rsync/keep-rsync_test.go
new file mode 100644 (file)
index 0000000..299df5a
--- /dev/null
@@ -0,0 +1,414 @@
+package main
+
+import (
+       "crypto/md5"
+       "fmt"
+       "io/ioutil"
+       "os"
+       "strings"
+       "testing"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+
+       . "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       TestingT(t)
+}
+
+// Gocheck boilerplate
+var _ = Suite(&ServerRequiredSuite{})
+var _ = Suite(&ServerNotRequiredSuite{})
+
+// Tests that require the Keep server running
+type ServerRequiredSuite struct{}
+type ServerNotRequiredSuite struct{}
+
+func (s *ServerRequiredSuite) SetUpSuite(c *C) {
+       // Start API server
+       arvadostest.StartAPI()
+}
+
+func (s *ServerRequiredSuite) TearDownSuite(c *C) {
+       arvadostest.StopAPI()
+       arvadostest.ResetEnv()
+}
+
+var kcSrc, kcDst *keepclient.KeepClient
+var srcKeepServicesJSON, dstKeepServicesJSON, blobSigningKey string
+
+func (s *ServerRequiredSuite) SetUpTest(c *C) {
+       // reset all variables between tests
+       blobSigningKey = ""
+       srcKeepServicesJSON = ""
+       dstKeepServicesJSON = ""
+       kcSrc = &keepclient.KeepClient{}
+       kcDst = &keepclient.KeepClient{}
+}
+
+func (s *ServerRequiredSuite) TearDownTest(c *C) {
+       arvadostest.StopKeepWithParams(3)
+}
+
+var testKeepServicesJSON = "{ \"kind\":\"arvados#keepServiceList\", \"etag\":\"\", \"self_link\":\"\", \"offset\":null, \"limit\":null, \"items\":[ { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012340\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012340\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25107, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false }, { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012341\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012341\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25108, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false } ], \"items_available\":2 }"
+
+// Testing keep-rsync needs two sets of keep services: src and dst.
+// The test setup hence creates 3 servers instead of the default 2,
+// and uses the first 2 as src and the 3rd as dst keep servers.
+func setupRsync(c *C, enforcePermissions bool, replications int) {
+       // srcConfig
+       var srcConfig apiConfig
+       srcConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
+       srcConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN")
+       srcConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
+
+       // dstConfig
+       var dstConfig apiConfig
+       dstConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
+       dstConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN")
+       dstConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
+
+       if enforcePermissions {
+               blobSigningKey = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
+       }
+
+       // Start Keep servers
+       arvadostest.StartAPI()
+       arvadostest.StartKeepWithParams(3, enforcePermissions)
+
+       // setup keepclients
+       var err error
+       kcSrc, err = setupKeepClient(srcConfig, srcKeepServicesJSON, false, 0)
+       c.Check(err, IsNil)
+
+       kcDst, err = setupKeepClient(dstConfig, dstKeepServicesJSON, true, replications)
+       c.Check(err, IsNil)
+
+       for uuid := range kcSrc.LocalRoots() {
+               if strings.HasSuffix(uuid, "02") {
+                       delete(kcSrc.LocalRoots(), uuid)
+               }
+       }
+       for uuid := range kcSrc.GatewayRoots() {
+               if strings.HasSuffix(uuid, "02") {
+                       delete(kcSrc.GatewayRoots(), uuid)
+               }
+       }
+       for uuid := range kcSrc.WritableLocalRoots() {
+               if strings.HasSuffix(uuid, "02") {
+                       delete(kcSrc.WritableLocalRoots(), uuid)
+               }
+       }
+
+       for uuid := range kcDst.LocalRoots() {
+               if strings.HasSuffix(uuid, "00") || strings.HasSuffix(uuid, "01") {
+                       delete(kcDst.LocalRoots(), uuid)
+               }
+       }
+       for uuid := range kcDst.GatewayRoots() {
+               if strings.HasSuffix(uuid, "00") || strings.HasSuffix(uuid, "01") {
+                       delete(kcDst.GatewayRoots(), uuid)
+               }
+       }
+       for uuid := range kcDst.WritableLocalRoots() {
+               if strings.HasSuffix(uuid, "00") || strings.HasSuffix(uuid, "01") {
+                       delete(kcDst.WritableLocalRoots(), uuid)
+               }
+       }
+
+       if replications == 0 {
+               // Must have got default replications value of 2 from dst discovery document
+               c.Assert(kcDst.Want_replicas, Equals, 2)
+       } else {
+               // Since replications value is provided, it is used
+               c.Assert(kcDst.Want_replicas, Equals, replications)
+       }
+}
+
+func (s *ServerRequiredSuite) TestRsyncPutInOne_GetFromOtherShouldFail(c *C) {
+       setupRsync(c, false, 1)
+
+       // Put a block in src and verify that it is not found in dst
+       testNoCrosstalk(c, "test-data-1", kcSrc, kcDst)
+
+       // Put a block in dst and verify that it is not found in src
+       testNoCrosstalk(c, "test-data-2", kcDst, kcSrc)
+}
+
+func (s *ServerRequiredSuite) TestRsyncWithBlobSigning_PutInOne_GetFromOtherShouldFail(c *C) {
+       setupRsync(c, true, 1)
+
+       // Put a block in src and verify that it is not found in dst
+       testNoCrosstalk(c, "test-data-1", kcSrc, kcDst)
+
+       // Put a block in dst and verify that it is not found in src
+       testNoCrosstalk(c, "test-data-2", kcDst, kcSrc)
+}
+
+// Do a Put in the first and Get from the second,
+// which should raise block not found error.
+func testNoCrosstalk(c *C, testData string, kc1, kc2 *keepclient.KeepClient) {
+       // Put a block using kc1
+       locator, _, err := kc1.PutB([]byte(testData))
+       c.Assert(err, Equals, nil)
+
+       locator = strings.Split(locator, "+")[0]
+       _, _, _, err = kc2.Get(keepclient.SignLocator(locator, kc2.Arvados.ApiToken, time.Now().AddDate(0, 0, 1), []byte(blobSigningKey)))
+       c.Assert(err, NotNil)
+       c.Check(err.Error(), Equals, "Block not found")
+}
+
+// Test keep-rsync initialization, with srcKeepServicesJSON
+func (s *ServerRequiredSuite) TestRsyncInitializeWithKeepServicesJSON(c *C) {
+       srcKeepServicesJSON = testKeepServicesJSON
+
+       setupRsync(c, false, 1)
+
+       localRoots := kcSrc.LocalRoots()
+       c.Check(localRoots, NotNil)
+
+       foundIt := false
+       for k := range localRoots {
+               if k == "zzzzz-bi6l4-123456789012340" {
+                       foundIt = true
+               }
+       }
+       c.Check(foundIt, Equals, true)
+
+       foundIt = false
+       for k := range localRoots {
+               if k == "zzzzz-bi6l4-123456789012341" {
+                       foundIt = true
+               }
+       }
+       c.Check(foundIt, Equals, true)
+}
+
+// Test keep-rsync initialization with default replications count
+func (s *ServerRequiredSuite) TestInitializeRsyncDefaultReplicationsCount(c *C) {
+       setupRsync(c, false, 0)
+}
+
+// Test keep-rsync initialization with replications count argument
+func (s *ServerRequiredSuite) TestInitializeRsyncReplicationsCount(c *C) {
+       setupRsync(c, false, 3)
+}
+
+// Put some blocks in Src and some more in Dst
+// And copy missing blocks from Src to Dst
+func (s *ServerRequiredSuite) TestKeepRsync(c *C) {
+       testKeepRsync(c, false, "")
+}
+
+// Put some blocks in Src and some more in Dst with blob signing enabled.
+// And copy missing blocks from Src to Dst
+func (s *ServerRequiredSuite) TestKeepRsync_WithBlobSigning(c *C) {
+       testKeepRsync(c, true, "")
+}
+
+// Put some blocks in Src and some more in Dst
+// Use prefix while doing rsync
+// And copy missing blocks from Src to Dst
+func (s *ServerRequiredSuite) TestKeepRsync_WithPrefix(c *C) {
+       data := []byte("test-data-4")
+       hash := fmt.Sprintf("%x", md5.Sum(data))
+
+       testKeepRsync(c, false, hash[0:3])
+       c.Check(len(dstIndex) > len(dstLocators), Equals, true)
+}
+
+// Put some blocks in Src and some more in Dst
+// Use prefix not in src while doing rsync
+// And copy missing blocks from Src to Dst
+func (s *ServerRequiredSuite) TestKeepRsync_WithNoSuchPrefixInSrc(c *C) {
+       testKeepRsync(c, false, "999")
+       c.Check(len(dstIndex), Equals, len(dstLocators))
+}
+
+// Put 5 blocks in src. Put 2 of those blocks in dst
+// Hence there are 3 additional blocks in src
+// Also, put 2 extra blocks in dst; they are hence only in dst
+// Run rsync and verify that those 7 blocks are now available in dst
+func testKeepRsync(c *C, enforcePermissions bool, prefix string) {
+       setupRsync(c, enforcePermissions, 1)
+
+       // setupTestData
+       setupTestData(c, prefix)
+
+       err := performKeepRsync(kcSrc, kcDst, blobSigningKey, prefix)
+       c.Check(err, IsNil)
+
+       // Now GetIndex from dst and verify that all 5 from src and the 2 extra blocks are found
+       dstIndex, err = getUniqueLocators(kcDst, "")
+       c.Check(err, IsNil)
+
+       for _, locator := range srcLocatorsMatchingPrefix {
+               _, ok := dstIndex[locator]
+               c.Assert(ok, Equals, true)
+       }
+
+       for _, locator := range extraDstLocators {
+               _, ok := dstIndex[locator]
+               c.Assert(ok, Equals, true)
+       }
+
+       if prefix == "" {
+               // all blocks from src and the two extra blocks
+               c.Assert(len(dstIndex), Equals, len(srcLocators)+len(extraDstLocators))
+       } else {
+               // 1 matching prefix and copied over, 2 that were initially copied into dst along with src, and the 2 extra blocks
+               c.Assert(len(dstIndex), Equals, len(srcLocatorsMatchingPrefix)+len(extraDstLocators)+2)
+       }
+}
+
+// Setup test data in src and dst.
+var srcLocators, srcLocatorsMatchingPrefix, dstLocators, extraDstLocators []string
+var dstIndex map[string]bool
+
+func setupTestData(c *C, indexPrefix string) {
+       srcLocators = []string{}
+       srcLocatorsMatchingPrefix = []string{}
+       dstLocators = []string{}
+       extraDstLocators = []string{}
+       dstIndex = make(map[string]bool)
+
+       // Put a few blocks in src using kcSrc
+       for i := 0; i < 5; i++ {
+               hash, _, err := kcSrc.PutB([]byte(fmt.Sprintf("test-data-%d", i)))
+               c.Check(err, IsNil)
+
+               srcLocators = append(srcLocators, strings.Split(hash, "+A")[0])
+               if strings.HasPrefix(hash, indexPrefix) {
+                       srcLocatorsMatchingPrefix = append(srcLocatorsMatchingPrefix, strings.Split(hash, "+A")[0])
+               }
+       }
+
+       // Put first two of those src blocks in dst using kcDst
+       for i := 0; i < 2; i++ {
+               hash, _, err := kcDst.PutB([]byte(fmt.Sprintf("test-data-%d", i)))
+               c.Check(err, IsNil)
+               dstLocators = append(dstLocators, strings.Split(hash, "+A")[0])
+       }
+
+       // Put two more blocks in dst; they are not in src at all
+       for i := 0; i < 2; i++ {
+               hash, _, err := kcDst.PutB([]byte(fmt.Sprintf("other-data-%d", i)))
+               c.Check(err, IsNil)
+               dstLocators = append(dstLocators, strings.Split(hash, "+A")[0])
+               extraDstLocators = append(extraDstLocators, strings.Split(hash, "+A")[0])
+       }
+}
+
+// Setup rsync using srcKeepServicesJSON with fake keepservers.
+// Expect error during performKeepRsync due to unreachable src keepservers.
+func (s *ServerRequiredSuite) TestErrorDuringRsync_FakeSrcKeepservers(c *C) {
+       srcKeepServicesJSON = testKeepServicesJSON
+
+       setupRsync(c, false, 1)
+
+       err := performKeepRsync(kcSrc, kcDst, "", "")
+       c.Check(strings.HasSuffix(err.Error(), "no such host"), Equals, true)
+}
+
+// Setup rsync using dstKeepServicesJSON with fake keepservers.
+// Expect error during performKeepRsync due to unreachable dst keepservers.
+func (s *ServerRequiredSuite) TestErrorDuringRsync_FakeDstKeepservers(c *C) {
+       dstKeepServicesJSON = testKeepServicesJSON
+
+       setupRsync(c, false, 1)
+
+       err := performKeepRsync(kcSrc, kcDst, "", "")
+       c.Check(strings.HasSuffix(err.Error(), "no such host"), Equals, true)
+}
+
+// Test rsync with signature error during Get from src.
+func (s *ServerRequiredSuite) TestErrorDuringRsync_ErrorGettingBlockFromSrc(c *C) {
+       setupRsync(c, true, 1)
+
+       // put some blocks in src and dst
+       setupTestData(c, "")
+
+       // Change blob signing key to a fake key, so that Get from src fails
+       blobSigningKey = "thisisfakeblobsigningkey"
+
+       err := performKeepRsync(kcSrc, kcDst, blobSigningKey, "")
+       c.Check(strings.HasSuffix(err.Error(), "Block not found"), Equals, true)
+}
+
+// Test rsync with error during Put to src.
+func (s *ServerRequiredSuite) TestErrorDuringRsync_ErrorPuttingBlockInDst(c *C) {
+       setupRsync(c, false, 1)
+
+       // put some blocks in src and dst
+       setupTestData(c, "")
+
+       // Increase Want_replicas on dst to result in insufficient replicas error during Put
+       kcDst.Want_replicas = 2
+
+       err := performKeepRsync(kcSrc, kcDst, blobSigningKey, "")
+       c.Check(strings.HasSuffix(err.Error(), "Could not write sufficient replicas"), Equals, true)
+}
+
+// Test loadConfig func
+func (s *ServerNotRequiredSuite) TestLoadConfig(c *C) {
+       // Setup a src config file
+       srcFile := setupConfigFile(c, "src-config")
+       defer os.Remove(srcFile.Name())
+       srcConfigFile := srcFile.Name()
+
+       // Setup a dst config file
+       dstFile := setupConfigFile(c, "dst-config")
+       defer os.Remove(dstFile.Name())
+       dstConfigFile := dstFile.Name()
+
+       // load configuration from those files
+       srcConfig, srcBlobSigningKey, err := loadConfig(srcConfigFile)
+       c.Check(err, IsNil)
+
+       c.Assert(srcConfig.APIHost, Equals, "testhost")
+       c.Assert(srcConfig.APIToken, Equals, "testtoken")
+       c.Assert(srcConfig.APIHostInsecure, Equals, true)
+       c.Assert(srcConfig.ExternalClient, Equals, false)
+
+       dstConfig, _, err := loadConfig(dstConfigFile)
+       c.Check(err, IsNil)
+
+       c.Assert(dstConfig.APIHost, Equals, "testhost")
+       c.Assert(dstConfig.APIToken, Equals, "testtoken")
+       c.Assert(dstConfig.APIHostInsecure, Equals, true)
+       c.Assert(dstConfig.ExternalClient, Equals, false)
+
+       c.Assert(srcBlobSigningKey, Equals, "abcdefg")
+}
+
+// Test loadConfig func without setting up the config files
+func (s *ServerNotRequiredSuite) TestLoadConfig_MissingSrcConfig(c *C) {
+       _, _, err := loadConfig("")
+       c.Assert(err.Error(), Equals, "config file not specified")
+}
+
+// Test loadConfig func - error reading config
+func (s *ServerNotRequiredSuite) TestLoadConfig_ErrorLoadingSrcConfig(c *C) {
+       _, _, err := loadConfig("no-such-config-file")
+       c.Assert(strings.HasSuffix(err.Error(), "no such file or directory"), Equals, true)
+}
+
+func setupConfigFile(c *C, name string) *os.File {
+       // Setup a config file
+       file, err := ioutil.TempFile(os.TempDir(), name)
+       c.Check(err, IsNil)
+
+       fileContent := "ARVADOS_API_HOST=testhost\n"
+       fileContent += "ARVADOS_API_TOKEN=testtoken\n"
+       fileContent += "ARVADOS_API_HOST_INSECURE=true\n"
+       fileContent += "ARVADOS_BLOB_SIGNING_KEY=abcdefg"
+
+       _, err = file.Write([]byte(fileContent))
+       c.Check(err, IsNil)
+
+       return file
+}