Merge branch '7015-update-user-guide'
author Bryan Cosca <bcosc@curoverse.com>
Wed, 21 Oct 2015 15:36:25 +0000 (11:36 -0400)
committer Bryan Cosca <bcosc@curoverse.com>
Wed, 21 Oct 2015 15:36:25 +0000 (11:36 -0400)
closes #7015

36 files changed:
apps/workbench/Gemfile.lock
apps/workbench/app/views/getting_started/_getting_started_popup.html.erb
doc/install/install-manual-prerequisites.html.textile.liquid
doc/install/install-sso.html.textile.liquid
doc/sdk/cli/subcommands.html.textile.liquid
sdk/cli/bin/arv
sdk/cli/test/test_arv-get.rb
sdk/cli/test/test_arv-keep-get.rb [new file with mode: 0644]
sdk/cli/test/test_arv-keep-put.rb [moved from sdk/cli/test/test_arv-put.rb with 99% similarity]
sdk/go/arvadosclient/arvadosclient_test.go
sdk/go/arvadostest/run_servers.go
sdk/go/keepclient/keepclient.go
sdk/go/keepclient/keepclient_test.go
sdk/go/keepclient/support.go
sdk/go/streamer/streamer_test.go
sdk/go/streamer/transfer.go
sdk/python/arvados/keep.py
sdk/python/tests/nginx.conf
sdk/python/tests/run_test_server.py
sdk/python/tests/test_keep_client.py
services/api/Gemfile
services/api/Gemfile.lock
services/datamanager/datamanager_test.go
services/keepproxy/keepproxy_test.go
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/pull_worker_integration_test.go
services/keepstore/volume_generic_test.go
services/keepstore/volume_unix.go
services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
services/nodemanager/tests/test_computenode_dispatch_slurm.py
tools/keep-exercise/.gitignore [new file with mode: 0644]
tools/keep-exercise/keep-exercise.go [new file with mode: 0644]
tools/keep-rsync/.gitignore [new file with mode: 0644]
tools/keep-rsync/keep-rsync.go [new file with mode: 0644]
tools/keep-rsync/keep-rsync_test.go [new file with mode: 0644]

index 20b8d6164ccca273e11756928a21c1a17851f07a..8b2118ce087d220c32edf4acd35e7a934404a946 100644 (file)
@@ -74,7 +74,7 @@ GEM
       rack (>= 1.0.0)
       rack-test (>= 0.5.4)
       xpath (~> 2.0)
-    childprocess (0.5.5)
+    childprocess (0.5.6)
       ffi (~> 1.0, >= 1.0.11)
     cliver (0.3.2)
     coffee-rails (4.1.0)
@@ -98,7 +98,7 @@ GEM
     fast_stack (0.1.0)
       rake
       rake-compiler
-    ffi (1.9.6)
+    ffi (1.9.10)
     flamegraph (0.1.0)
       fast_stack
     google-api-client (0.6.4)
@@ -139,7 +139,7 @@ GEM
       metaclass (~> 0.0.1)
     morrisjs-rails (0.5.1)
       railties (> 3.1, < 5)
-    multi_json (1.11.1)
+    multi_json (1.11.2)
     multipart-post (1.2.0)
     net-scp (1.2.1)
       net-ssh (>= 2.6.5)
@@ -192,7 +192,7 @@ GEM
     ref (1.0.5)
     ruby-debug-passenger (0.2.0)
     ruby-prof (0.15.2)
-    rubyzip (1.1.6)
+    rubyzip (1.1.7)
     rvm-capistrano (1.5.5)
       capistrano (~> 2.15.4)
     sass (3.4.9)
@@ -202,7 +202,7 @@ GEM
       sprockets (>= 2.8, < 4.0)
       sprockets-rails (>= 2.0, < 4.0)
       tilt (~> 1.1)
-    selenium-webdriver (2.44.0)
+    selenium-webdriver (2.48.1)
       childprocess (~> 0.5)
       multi_json (~> 1.0)
       rubyzip (~> 1.0)
@@ -239,7 +239,7 @@ GEM
       execjs (>= 0.3.0)
       json (>= 1.8.0)
     uuidtools (2.1.5)
-    websocket (1.2.1)
+    websocket (1.2.2)
     websocket-driver (0.5.1)
       websocket-extensions (>= 0.1.0)
     websocket-extensions (0.1.1)
@@ -294,3 +294,6 @@ DEPENDENCIES
   therubyracer
   uglifier (>= 1.0.3)
   wiselinks
+
+BUNDLED WITH
+   1.10.6
index 0db0567ec98b22495b6e063479a60ba753b5bc83..3020a1249bca287a41103b6f5924c134b2090adc 100644 (file)
@@ -154,7 +154,7 @@ div.figure p {
             </li><li>
               <strong>Use existing pipelines</strong>: Use best-practices pipelines on your own data with the click of a button.
             </li><li>
-              <strong>Open-source</strong>: Arvados is completely open-source. Check out our <a href="http://arvados.org">developer site</a>.
+              <strong>Open source</strong>: Arvados is completely open source. Check out our <a href="http://dev.arvados.org">developer site</a>.
             </li>
           </ol>
           <p style="margin-top: 1em;">
index 52a51a191aafb90188d1906583b419f0135ca49b..a26370d21bf5f4bb877a7ece60957ff9b1eff6d1 100644 (file)
@@ -42,7 +42,7 @@ baseurl=http://rpm.arvados.org/CentOS/$releasever/os/$basearch/
 
 h3. Debian and Ubuntu
 
-Packages are available for Debian 7 ("wheezy"), Ubuntu 12.04 ("precise"), and Ubuntu 14.04 ("trusty").
+Packages are available for Debian 7 ("wheezy"), Debian 8 ("jessie"), Ubuntu 12.04 ("precise"), and Ubuntu 14.04 ("trusty").
 
 First, register the Curoverse signing key in apt's database:
 
@@ -53,6 +53,7 @@ Configure apt to retrieve packages from the Arvados package repository. This com
 table(table table-bordered table-condensed).
 |OS version|Command|
 |Debian 7 ("wheezy")|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ wheezy main" &#x7c; sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
+|Debian 8 ("jessie")|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ jessie main" &#x7c; sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
 |Ubuntu 12.04 ("precise")|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ precise main" &#x7c; sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
 |Ubuntu 14.04 ("trusty")|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ trusty main" &#x7c; sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
 
index 56c7a4b337eb5eb0c2b6c8fbd2aa47675e353b7c..ca620f478a8d215066fde5a9ffa157032174f4db 100644 (file)
@@ -164,7 +164,8 @@ Use @rails console@ to create a @Client@ record that will be used by the Arvados
 <notextile>
 <pre><code>~$ <span class="userinput">ruby -e 'puts rand(2**400).to_s(36)'</span>
 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
-~$ <span class="userinput">RAILS_ENV=production bundle exec rails console</span>
+~$ <span class="userinput">cd /var/www/arvados-sso/current</span>
+/var/www/arvados-sso/current$ <span class="userinput">RAILS_ENV=production bundle exec rails console</span>
 :001 &gt; <span class="userinput">c = Client.new</span>
 :002 &gt; <span class="userinput">c.name = "joshid"</span>
 :003 &gt; <span class="userinput">c.app_id = "arvados-server"</span>
index aa7af94d5f9d4ac8527e7d3e238e15d19f177905..ca494fe17a570e0868af2157fcc92dda6b480cc2 100644 (file)
@@ -21,6 +21,20 @@ Options:
 </pre>
 </notextile>
 
+h3(#arv-get). arv get
+
+@arv get@ can be used to get a textual representation of Arvados objects from the command line. The output can be limited to a subset of the object's fields. This command can be used with only the knowledge of an object's UUID.
+
+<notextile>
+<pre>
+$ <code class="userinput">arv get --help</code>
+Usage: arv [--format json|yaml] get [uuid] [fields...]
+
+Fetch the specified Arvados object, select the specified fields,
+and print a text representation.
+</pre>
+</notextile>
+
 h3(#arv-edit). arv edit
 
 @arv edit@ can be used to edit Arvados objects from the command line. Arv edit opens up the editor of your choice (set the EDITOR environment variable) with the json or yaml description of the object. Saving the file will update the Arvados object on the API server, if it passes validation.
index 2bd7f4ef465d7a3427425716c11f9e3574d9f226..185a5b0673f1dc1c5afc5ed6530386d8255daaf4 100755 (executable)
@@ -5,6 +5,7 @@
 # Ward Vandewege <ward@curoverse.com>
 
 require 'fileutils'
+require 'shellwords'
 
 if RUBY_VERSION < '1.9.3' then
   abort <<-EOS
@@ -85,7 +86,15 @@ def init_config
 end
 
 
-subcommands = %w(copy create edit keep pipeline run tag ws)
+subcommands = %w(copy create edit get keep pipeline run tag ws)
+
+def exec_bin bin, opts
+  bin_path = `which #{bin.shellescape}`.strip
+  if bin_path.empty?
+    raise "#{bin}: command not found"
+  end
+  exec bin_path, *opts
+end
 
 def check_subcommands client, arvados, subcommand, global_opts, remaining_opts
   case subcommand
@@ -93,15 +102,17 @@ def check_subcommands client, arvados, subcommand, global_opts, remaining_opts
     arv_create client, arvados, global_opts, remaining_opts
   when 'edit'
     arv_edit client, arvados, global_opts, remaining_opts
+  when 'get'
+    arv_get client, arvados, global_opts, remaining_opts
   when 'copy', 'tag', 'ws', 'run'
-    exec `which arv-#{subcommand}`.strip, *remaining_opts
+    exec_bin "arv-#{subcommand}", remaining_opts
   when 'keep'
     @sub = remaining_opts.shift
     if ['get', 'put', 'ls', 'normalize'].index @sub then
       # Native Arvados
-      exec `which arv-#{@sub}`.strip, *remaining_opts
+      exec_bin "arv-#{@sub}", remaining_opts
     elsif @sub == 'docker'
-      exec `which arv-keepdocker`.strip, *remaining_opts
+      exec_bin "arv-keepdocker", remaining_opts
     else
       puts "Usage: arv keep [method] [--parameters]\n"
       puts "Use 'arv keep [method] --help' to get more information about specific methods.\n\n"
@@ -111,7 +122,7 @@ def check_subcommands client, arvados, subcommand, global_opts, remaining_opts
   when 'pipeline'
     sub = remaining_opts.shift
     if sub == 'run'
-      exec `which arv-run-pipeline-instance`.strip, *remaining_opts
+      exec_bin "arv-run-pipeline-instance", remaining_opts
     else
       puts "Usage: arv pipeline [method] [--parameters]\n"
       puts "Use 'arv pipeline [method] --help' to get more information about specific methods.\n\n"
@@ -147,14 +158,7 @@ end
 
 def edit_and_commit_object initial_obj, tmp_stem, global_opts, &block
 
-  content = case global_opts[:format]
-            when 'json'
-              Oj.dump(initial_obj, :indent => 1)
-            when 'yaml'
-              initial_obj.to_yaml
-            else
-              abort "Unrecognized format #{global_opts[:format]}"
-            end
+  content = get_obj_content initial_obj, global_opts
 
   tmp_file = Tempfile.new([tmp_stem, ".#{global_opts[:format]}"])
   tmp_file.write(content)
@@ -179,6 +183,8 @@ def edit_and_commit_object initial_obj, tmp_stem, global_opts, &block
                    Oj.load(newcontent)
                  when 'yaml'
                    YAML.load(newcontent)
+                 else
+                   abort "Unrecognized format #{global_opts[:format]}"
                  end
 
         yield newobj
@@ -243,20 +249,7 @@ def check_response result
   results
 end
 
-def arv_edit client, arvados, global_opts, remaining_opts
-  uuid = remaining_opts.shift
-  if uuid.nil? or uuid == "-h" or uuid == "--help"
-    puts head_banner
-    puts "Usage: arv edit [uuid] [fields...]\n\n"
-    puts "Fetch the specified Arvados object, select the specified fields, \n"
-    puts "open an interactive text editor on a text representation (json or\n"
-    puts "yaml, use --format) and then update the object.  Will use 'nano'\n"
-    puts "by default, customize with the EDITOR or VISUAL environment variable.\n"
-    exit 255
-  end
-
-  # determine controller
-
+def lookup_uuid_rsc arvados, uuid
   m = /([a-z0-9]{5})-([a-z0-9]{5})-([a-z0-9]{15})/.match uuid
   if !m
     if /^[a-f0-9]{32}/.match uuid
@@ -279,6 +272,11 @@ def arv_edit client, arvados, global_opts, remaining_opts
     abort "Could not determine resource type #{m[2]}"
   end
 
+  return rsc
+end
+
+def fetch_rsc_obj client, arvados, rsc, uuid, remaining_opts
+
   begin
     result = client.execute(:api_method => eval('arvados.' + rsc + '.get'),
                             :parameters => {"uuid" => uuid},
@@ -286,15 +284,45 @@ def arv_edit client, arvados, global_opts, remaining_opts
                             :headers => {
                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
                             })
-    oldobj = check_response result
+    obj = check_response result
   rescue => e
     abort "Server error: #{e}"
   end
 
   if remaining_opts.length > 0
-    oldobj.select! { |k, v| remaining_opts.include? k }
+    obj.select! { |k, v| remaining_opts.include? k }
+  end
+
+  return obj
+end
+
+def get_obj_content obj, global_opts
+  content = case global_opts[:format]
+            when 'json'
+              Oj.dump(obj, :indent => 1)
+            when 'yaml'
+              obj.to_yaml
+            else
+              abort "Unrecognized format #{global_opts[:format]}"
+            end
+  return content
+end
+
+def arv_edit client, arvados, global_opts, remaining_opts
+  uuid = remaining_opts.shift
+  if uuid.nil? or uuid == "-h" or uuid == "--help"
+    puts head_banner
+    puts "Usage: arv edit [uuid] [fields...]\n\n"
+    puts "Fetch the specified Arvados object, select the specified fields, \n"
+    puts "open an interactive text editor on a text representation (json or\n"
+    puts "yaml, use --format) and then update the object.  Will use 'nano'\n"
+    puts "by default, customize with the EDITOR or VISUAL environment variable.\n"
+    exit 255
   end
 
+  rsc = lookup_uuid_rsc arvados, uuid
+  oldobj = fetch_rsc_obj client, arvados, rsc, uuid, remaining_opts
+
   edit_and_commit_object oldobj, uuid, global_opts do |newobj|
     newobj.select! {|k| newobj[k] != oldobj[k]}
     if !newobj.empty?
@@ -315,6 +343,24 @@ def arv_edit client, arvados, global_opts, remaining_opts
   exit 0
 end
 
+def arv_get client, arvados, global_opts, remaining_opts
+  uuid = remaining_opts.shift
+  if uuid.nil? or uuid == "-h" or uuid == "--help"
+    puts head_banner
+    puts "Usage: arv [--format json|yaml] get [uuid] [fields...]\n\n"
+    puts "Fetch the specified Arvados object, select the specified fields,\n"
+    puts "and print a text representation.\n"
+    exit 255
+  end
+
+  rsc = lookup_uuid_rsc arvados, uuid
+  obj = fetch_rsc_obj client, arvados, rsc, uuid, remaining_opts
+  content = get_obj_content obj, global_opts
+
+  puts content
+  exit 0
+end
+
 def arv_create client, arvados, global_opts, remaining_opts
   types = resource_types(arvados.discovery_document)
   create_opts = Trollop::options do
index 5e58014cbfa10d3b9b67a8b7cddca8b8676f646c..2e2bba562f1e7dedc09fd3f4491da424cb7a5850 100644 (file)
 require 'minitest/autorun'
-require 'digest/md5'
+require 'json'
+require 'yaml'
 
+# Black box tests for 'arv get' command.
 class TestArvGet < Minitest::Test
-  def setup
-    begin
-      Dir.mkdir './tmp'
-    rescue Errno::EEXIST
-    end
-    @@foo_manifest_locator ||= `echo -n foo | ./bin/arv-put --filename foo --no-progress -`.strip
-    @@baz_locator ||= `echo -n baz | ./bin/arv-put --as-raw --no-progress -`.strip
-    @@multilevel_manifest_locator ||= `echo ./foo/bar #{@@baz_locator} 0:3:baz | ./bin/arv-put --as-raw --no-progress -`.strip
-  end
+  # UUID for an Arvados object that does not exist
+  NON_EXISTENT_OBJECT_UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+  # Name of field of Arvados object that can store any (textual) value
+  STORED_VALUE_FIELD_NAME = "name"
+  # Name of UUID field of Arvados object
+  UUID_FIELD_NAME = "uuid"
+  # Name of an invalid field of Arvados object
+  INVALID_FIELD_NAME = "invalid"
 
-  def test_no_args
+  # Tests that a valid Arvados object can be retrieved in a supported format
+  # using: `arv get [uuid]`. Given all other `arv foo` commands return JSON
+  # when no format is specified, JSON should be expected in this case.
+  def test_get_valid_object_no_format_specified
+    stored_value = __method__.to_s
+    uuid = create_arv_object_with_value(stored_value)
     out, err = capture_subprocess_io do
-      assert_arv_get false
+      assert(arv_get_default(uuid))
     end
-    assert_equal '', out
-    assert_match /^usage:/, err
+    assert_empty(err, "Error text not expected: '#{err}'")
+    arv_object = parse_json_arv_object(out)
+    assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value))
   end
 
-  def test_help
+  # Tests that a valid Arvados object can be retrieved in JSON format using:
+  # `arv --format json get [uuid]`.
+  def test_get_valid_object_json_format_specified
+    stored_value = __method__.to_s
+    uuid = create_arv_object_with_value(stored_value)
     out, err = capture_subprocess_io do
-      assert_arv_get '-h'
+      assert(arv_get_json(uuid))
     end
-    $stderr.write err
-    assert_equal '', err
-    assert_match /^usage:/, out
+    assert_empty(err, "Error text not expected: '#{err}'")
+    arv_object = parse_json_arv_object(out)
+    assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value))
   end
 
-  def test_file_to_dev_stdout
-    test_file_to_stdout('/dev/stdout')
-  end
-
-  def test_file_to_stdout(specify_stdout_as='-')
+  # Tests that a valid Arvados object can be retrieved in YAML format using:
+  # `arv --format yaml get [uuid]`.
+  def test_get_valid_object_yaml_format_specified
+    stored_value = __method__.to_s
+    uuid = create_arv_object_with_value(stored_value)
     out, err = capture_subprocess_io do
-      assert_arv_get @@foo_manifest_locator + '/foo', specify_stdout_as
+      assert(arv_get_yaml(uuid))
     end
-    assert_equal '', err
-    assert_equal 'foo', out
+    assert_empty(err, "Error text not expected: '#{err}'")
+    arv_object = parse_yaml_arv_object(out)
+    assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value))
   end
 
-  def test_file_to_file
-    remove_tmp_foo
+  # Tests that a subset of all fields of a valid Arvados object can be retrieved
+  # using: `arv get [uuid] [fields...]`.
+  def test_get_valid_object_with_valid_fields
+    stored_value = __method__.to_s
+    uuid = create_arv_object_with_value(stored_value)
     out, err = capture_subprocess_io do
-      assert_arv_get @@foo_manifest_locator + '/foo', 'tmp/foo'
+      assert(arv_get_json(uuid, STORED_VALUE_FIELD_NAME, UUID_FIELD_NAME))
     end
-    assert_equal '', err
-    assert_equal '', out
-    assert_equal 'foo', IO.read('tmp/foo')
+    assert_empty(err, "Error text not expected: '#{err}'")
+    arv_object = parse_json_arv_object(out)
+    assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value))
+    assert(has_field_with_value(arv_object, UUID_FIELD_NAME, uuid))
   end
 
-  def test_file_to_file_no_overwrite_file
-    File.open './tmp/foo', 'wb' do |f|
-      f.write 'baz'
-    end
+  # Tests that the valid field is retrieved when both a valid and invalid field
+  # are requested from a valid Arvados object, using:
+  # `arv get [uuid] [fields...]`.
+  def test_get_valid_object_with_both_valid_and_invalid_fields
+    stored_value = __method__.to_s
+    uuid = create_arv_object_with_value(stored_value)
     out, err = capture_subprocess_io do
-      assert_arv_get false, @@foo_manifest_locator + '/foo', 'tmp/foo'
+      assert(arv_get_json(uuid, STORED_VALUE_FIELD_NAME, INVALID_FIELD_NAME))
     end
-    assert_match /Local file tmp\/foo already exists/, err
-    assert_equal '', out
-    assert_equal 'baz', IO.read('tmp/foo')
+    assert_empty(err, "Error text not expected: '#{err}'")
+    arv_object = parse_json_arv_object(out)
+    assert(has_field_with_value(arv_object, STORED_VALUE_FIELD_NAME, stored_value))
+    refute(has_field_with_value(arv_object, INVALID_FIELD_NAME, stored_value))
   end
 
-  def test_file_to_file_no_overwrite_file_in_dir
-    File.open './tmp/foo', 'wb' do |f|
-      f.write 'baz'
-    end
+  # Tests that no fields are retrieved when no valid fields are requested from
+  # a valid Arvados object, using: `arv get [uuid] [fields...]`.
+  def test_get_valid_object_with_no_valid_fields
+    stored_value = __method__.to_s
+    uuid = create_arv_object_with_value(stored_value)
     out, err = capture_subprocess_io do
-      assert_arv_get false, @@foo_manifest_locator + '/', 'tmp/'
+      assert(arv_get_json(uuid, INVALID_FIELD_NAME))
     end
-    assert_match /Local file tmp\/foo already exists/, err
-    assert_equal '', out
-    assert_equal 'baz', IO.read('tmp/foo')
+    assert_empty(err, "Error text not expected: '#{err}'")
+    arv_object = parse_json_arv_object(out)
+    assert_equal(0, arv_object.length)
   end
 
-  def test_file_to_file_force_overwrite
-    File.open './tmp/foo', 'wb' do |f|
-      f.write 'baz'
-    end
-    assert_equal 'baz', IO.read('tmp/foo')
+  # Tests that an invalid (non-existent) Arvados object is not retrieved
+  # using: `arv get [non-existent-uuid]`.
+  def test_get_invalid_object
     out, err = capture_subprocess_io do
-      assert_arv_get '-f', @@foo_manifest_locator + '/', 'tmp/'
+      refute(arv_get_json(NON_EXISTENT_OBJECT_UUID))
     end
-    assert_match '', err
-    assert_equal '', out
-    assert_equal 'foo', IO.read('tmp/foo')
+    refute_empty(err, "Expected error feedback on request for invalid object")
+    assert_empty(out)
   end
 
-  def test_file_to_file_skip_existing
-    File.open './tmp/foo', 'wb' do |f|
-      f.write 'baz'
-    end
-    assert_equal 'baz', IO.read('tmp/foo')
+  # Tests that help text exists using: `arv get --help`.
+  def test_help_exists
     out, err = capture_subprocess_io do
-      assert_arv_get '--skip-existing', @@foo_manifest_locator + '/', 'tmp/'
+#      assert(arv_get_default("--help"), "Expected exit code 0: #{$?}")
+       #XXX: Exit code given is 255. It probably should be 0, which seems to be
+       #     standard elsewhere. However, 255 is in line with other `arv`
+       #     commands (e.g. see `arv edit`) so ignoring the problem here.
+       arv_get_default("--help")
     end
-    assert_match '', err
-    assert_equal '', out
-    assert_equal 'baz', IO.read('tmp/foo')
+    assert_empty(err, "Error text not expected: '#{err}'")
+    refute_empty(out, "Help text should be given")
   end
 
-  def test_file_to_dir
-    remove_tmp_foo
-    out, err = capture_subprocess_io do
-      assert_arv_get @@foo_manifest_locator + '/foo', 'tmp/'
-    end
-    assert_equal '', err
-    assert_equal '', out
-    assert_equal 'foo', IO.read('tmp/foo')
-  end
-
-  def test_dir_to_file
-    out, err = capture_subprocess_io do
-      assert_arv_get false, @@foo_manifest_locator + '/', 'tmp/foo'
-    end
-    assert_equal '', out
-    assert_match /^usage:/, err
-  end
-
-  def test_dir_to_empty_string
-    out, err = capture_subprocess_io do
-      assert_arv_get false, @@foo_manifest_locator + '/', ''
-    end
-    assert_equal '', out
-    assert_match /^usage:/, err
-  end
-
-  def test_nonexistent_block
-    out, err = capture_subprocess_io do
-      assert_arv_get false, 'e796ab2294f3e48ec709ffa8d6daf58c'
-    end
-    assert_equal '', out
-    assert_match /Error:/, err
-  end
-
-  def test_nonexistent_manifest
-    out, err = capture_subprocess_io do
-      assert_arv_get false, 'acbd18db4cc2f85cedef654fccc4a4d8/', 'tmp/'
-    end
-    assert_equal '', out
-    assert_match /Error:/, err
-  end
-
-  def test_manifest_root_to_dir
-    remove_tmp_foo
-    out, err = capture_subprocess_io do
-      assert_arv_get '-r', @@foo_manifest_locator + '/', 'tmp/'
-    end
-    assert_equal '', err
-    assert_equal '', out
-    assert_equal 'foo', IO.read('tmp/foo')
-  end
-
-  def test_manifest_root_to_dir_noslash
-    remove_tmp_foo
-    out, err = capture_subprocess_io do
-      assert_arv_get '-r', @@foo_manifest_locator + '/', 'tmp'
-    end
-    assert_equal '', err
-    assert_equal '', out
-    assert_equal 'foo', IO.read('tmp/foo')
-  end
-
-  def test_display_md5sum
-    remove_tmp_foo
-    out, err = capture_subprocess_io do
-      assert_arv_get '-r', '--md5sum', @@foo_manifest_locator + '/', 'tmp/'
-    end
-    assert_equal "#{Digest::MD5.hexdigest('foo')}  ./foo\n", err
-    assert_equal '', out
-    assert_equal 'foo', IO.read('tmp/foo')
+  protected
+  # Runs 'arv get <varargs>' with given arguments. Returns whether the exit
+  # status was 0 (i.e. success). Use $? to obtain more details on failure.
+  def arv_get_default(*args)
+    return system("arv", "get", *args)
   end
 
-  def test_md5sum_nowrite
-    remove_tmp_foo
-    out, err = capture_subprocess_io do
-      assert_arv_get '-n', '--md5sum', @@foo_manifest_locator + '/', 'tmp/'
-    end
-    assert_equal "#{Digest::MD5.hexdigest('foo')}  ./foo\n", err
-    assert_equal '', out
-    assert_equal false, File.exists?('tmp/foo')
+  # Runs 'arv --format json get <varargs>' with given arguments. Returns whether
+  # the exit status was 0 (i.e. success). Use $? to obtain more details on
+  # failure.
+  def arv_get_json(*args)
+    return system("arv", "--format", "json", "get", *args)
   end
 
-  def test_sha1_nowrite
-    remove_tmp_foo
-    out, err = capture_subprocess_io do
-      assert_arv_get '-n', '-r', '--hash', 'sha1', @@foo_manifest_locator+'/', 'tmp/'
-    end
-    assert_equal "#{Digest::SHA1.hexdigest('foo')}  ./foo\n", err
-    assert_equal '', out
-    assert_equal false, File.exists?('tmp/foo')
+  # Runs 'arv --format yaml get <varargs>' with given arguments. Returns whether
+  # the exit status was 0 (i.e. success). Use $? to obtain more details on
+  # failure.
+  def arv_get_yaml(*args)
+    return system("arv", "--format", "yaml", "get", *args)
   end
 
-  def test_block_to_file
-    remove_tmp_foo
+  # Creates an Arvados object that stores a given value. Returns the uuid of the
+  # created object.
+  def create_arv_object_with_value(value)
     out, err = capture_subprocess_io do
-      assert_arv_get @@foo_manifest_locator, 'tmp/foo'
+      system("arv", "tag", "add", value, "--object", "testing")
+      assert $?.success?, "Command failure running `arv tag`: #{$?}"
     end
     assert_equal '', err
-    assert_equal '', out
-
-    digest = Digest::MD5.hexdigest('foo')
-    !(IO.read('tmp/foo')).gsub!( /^(. #{digest}+3)(.*)( 0:3:foo)$/).nil?
+    assert_operator 0, :<, out.strip.length
+    out.strip
   end
 
-  def test_create_directory_tree
-    `rm -rf ./tmp/arv-get-test/`
-    Dir.mkdir './tmp/arv-get-test'
-    out, err = capture_subprocess_io do
-      assert_arv_get @@multilevel_manifest_locator + '/', 'tmp/arv-get-test/'
+  # Parses the given JSON representation of an Arvados object, returning
+  # an equivalent Ruby representation (a hash map).
+  def parse_json_arv_object(arvObjectAsJson)
+    begin
+      parsed = JSON.parse(arvObjectAsJson)
+      assert(parsed.instance_of?(Hash))
+      return parsed
+    rescue JSON::ParserError => e
+      raise "Invalid JSON representation of Arvados object.\n" \
+            "Parse error: '#{e}'\n" \
+            "JSON: '#{arvObjectAsJson}'\n"
     end
-    assert_equal '', err
-    assert_equal '', out
-    assert_equal 'baz', IO.read('tmp/arv-get-test/foo/bar/baz')
   end
 
-  def test_create_partial_directory_tree
-    `rm -rf ./tmp/arv-get-test/`
-    Dir.mkdir './tmp/arv-get-test'
-    out, err = capture_subprocess_io do
-      assert_arv_get(@@multilevel_manifest_locator + '/foo/',
-                     'tmp/arv-get-test/')
+  # Parses the given YAML representation of an Arvados object, returning
+  # an equivalent Ruby representation (a hash map).
+  def parse_yaml_arv_object(arvObjectAsYaml)
+    begin
+      parsed = YAML.load(arvObjectAsYaml)
+      assert(parsed.instance_of?(Hash))
+      return parsed
+    rescue
+      raise "Invalid YAML representation of Arvados object.\n" \
+            "YAML: '#{arvObjectAsYaml}'\n"
     end
-    assert_equal '', err
-    assert_equal '', out
-    assert_equal 'baz', IO.read('tmp/arv-get-test/bar/baz')
   end
 
-  protected
-  def assert_arv_get(*args)
-    expect = case args.first
-             when true, false
-               args.shift
-             else
-               true
-             end
-    assert_equal(expect,
-                 system(['./bin/arv-get', 'arv-get'], *args),
-                 "`arv-get #{args.join ' '}` " +
-                 "should exit #{if expect then 0 else 'non-zero' end}")
-  end
-
-  def remove_tmp_foo
-    begin
-      File.unlink('tmp/foo')
-    rescue Errno::ENOENT
+  # Checks whether the given Arvados object has the given expected value for the
+  # specified field.
+  def has_field_with_value(arvObjectAsHash, fieldName, expectedValue)
+    if !arvObjectAsHash.has_key?(fieldName)
+      return false
     end
+    return (arvObjectAsHash[fieldName] == expectedValue)
   end
 end
diff --git a/sdk/cli/test/test_arv-keep-get.rb b/sdk/cli/test/test_arv-keep-get.rb
new file mode 100644 (file)
index 0000000..0e578b8
--- /dev/null
@@ -0,0 +1,251 @@
+require 'minitest/autorun'
+require 'digest/md5'
+
+class TestArvKeepGet < Minitest::Test
+  def setup
+    begin
+      Dir.mkdir './tmp'
+    rescue Errno::EEXIST
+    end
+    @@foo_manifest_locator ||= `echo -n foo | ./bin/arv-put --filename foo --no-progress -`.strip
+    @@baz_locator ||= `echo -n baz | ./bin/arv-put --as-raw --no-progress -`.strip
+    @@multilevel_manifest_locator ||= `echo ./foo/bar #{@@baz_locator} 0:3:baz | ./bin/arv-put --as-raw --no-progress -`.strip
+  end
+
+  def test_no_args
+    out, err = capture_subprocess_io do
+      assert_arv_get false
+    end
+    assert_equal '', out
+    assert_match /^usage:/, err
+  end
+
+  def test_help
+    out, err = capture_subprocess_io do
+      assert_arv_get '-h'
+    end
+    $stderr.write err
+    assert_equal '', err
+    assert_match /^usage:/, out
+  end
+
+  def test_file_to_dev_stdout
+    test_file_to_stdout('/dev/stdout')
+  end
+
+  def test_file_to_stdout(specify_stdout_as='-')
+    out, err = capture_subprocess_io do
+      assert_arv_get @@foo_manifest_locator + '/foo', specify_stdout_as
+    end
+    assert_equal '', err
+    assert_equal 'foo', out
+  end
+
+  def test_file_to_file
+    remove_tmp_foo
+    out, err = capture_subprocess_io do
+      assert_arv_get @@foo_manifest_locator + '/foo', 'tmp/foo'
+    end
+    assert_equal '', err
+    assert_equal '', out
+    assert_equal 'foo', IO.read('tmp/foo')
+  end
+
+  def test_file_to_file_no_overwrite_file
+    File.open './tmp/foo', 'wb' do |f|
+      f.write 'baz'
+    end
+    out, err = capture_subprocess_io do
+      assert_arv_get false, @@foo_manifest_locator + '/foo', 'tmp/foo'
+    end
+    assert_match /Local file tmp\/foo already exists/, err
+    assert_equal '', out
+    assert_equal 'baz', IO.read('tmp/foo')
+  end
+
+  def test_file_to_file_no_overwrite_file_in_dir
+    File.open './tmp/foo', 'wb' do |f|
+      f.write 'baz'
+    end
+    out, err = capture_subprocess_io do
+      assert_arv_get false, @@foo_manifest_locator + '/', 'tmp/'
+    end
+    assert_match /Local file tmp\/foo already exists/, err
+    assert_equal '', out
+    assert_equal 'baz', IO.read('tmp/foo')
+  end
+
+  def test_file_to_file_force_overwrite
+    File.open './tmp/foo', 'wb' do |f|
+      f.write 'baz'
+    end
+    assert_equal 'baz', IO.read('tmp/foo')
+    out, err = capture_subprocess_io do
+      assert_arv_get '-f', @@foo_manifest_locator + '/', 'tmp/'
+    end
+    assert_match '', err
+    assert_equal '', out
+    assert_equal 'foo', IO.read('tmp/foo')
+  end
+
+  def test_file_to_file_skip_existing
+    File.open './tmp/foo', 'wb' do |f|
+      f.write 'baz'
+    end
+    assert_equal 'baz', IO.read('tmp/foo')
+    out, err = capture_subprocess_io do
+      assert_arv_get '--skip-existing', @@foo_manifest_locator + '/', 'tmp/'
+    end
+    assert_match '', err
+    assert_equal '', out
+    assert_equal 'baz', IO.read('tmp/foo')
+  end
+
+  def test_file_to_dir
+    remove_tmp_foo
+    out, err = capture_subprocess_io do
+      assert_arv_get @@foo_manifest_locator + '/foo', 'tmp/'
+    end
+    assert_equal '', err
+    assert_equal '', out
+    assert_equal 'foo', IO.read('tmp/foo')
+  end
+
+  def test_dir_to_file
+    out, err = capture_subprocess_io do
+      assert_arv_get false, @@foo_manifest_locator + '/', 'tmp/foo'
+    end
+    assert_equal '', out
+    assert_match /^usage:/, err
+  end
+
+  def test_dir_to_empty_string
+    out, err = capture_subprocess_io do
+      assert_arv_get false, @@foo_manifest_locator + '/', ''
+    end
+    assert_equal '', out
+    assert_match /^usage:/, err
+  end
+
+  def test_nonexistent_block
+    out, err = capture_subprocess_io do
+      assert_arv_get false, 'e796ab2294f3e48ec709ffa8d6daf58c'
+    end
+    assert_equal '', out
+    assert_match /Error:/, err
+  end
+
+  def test_nonexistent_manifest
+    out, err = capture_subprocess_io do
+      assert_arv_get false, 'acbd18db4cc2f85cedef654fccc4a4d8/', 'tmp/'
+    end
+    assert_equal '', out
+    assert_match /Error:/, err
+  end
+
+  def test_manifest_root_to_dir
+    remove_tmp_foo
+    out, err = capture_subprocess_io do
+      assert_arv_get '-r', @@foo_manifest_locator + '/', 'tmp/'
+    end
+    assert_equal '', err
+    assert_equal '', out
+    assert_equal 'foo', IO.read('tmp/foo')
+  end
+
+  def test_manifest_root_to_dir_noslash
+    remove_tmp_foo
+    out, err = capture_subprocess_io do
+      assert_arv_get '-r', @@foo_manifest_locator + '/', 'tmp'
+    end
+    assert_equal '', err
+    assert_equal '', out
+    assert_equal 'foo', IO.read('tmp/foo')
+  end
+
+  def test_display_md5sum
+    remove_tmp_foo
+    out, err = capture_subprocess_io do
+      assert_arv_get '-r', '--md5sum', @@foo_manifest_locator + '/', 'tmp/'
+    end
+    assert_equal "#{Digest::MD5.hexdigest('foo')}  ./foo\n", err
+    assert_equal '', out
+    assert_equal 'foo', IO.read('tmp/foo')
+  end
+
+  def test_md5sum_nowrite
+    remove_tmp_foo
+    out, err = capture_subprocess_io do
+      assert_arv_get '-n', '--md5sum', @@foo_manifest_locator + '/', 'tmp/'
+    end
+    assert_equal "#{Digest::MD5.hexdigest('foo')}  ./foo\n", err
+    assert_equal '', out
+    assert_equal false, File.exists?('tmp/foo')
+  end
+
+  def test_sha1_nowrite
+    remove_tmp_foo
+    out, err = capture_subprocess_io do
+      assert_arv_get '-n', '-r', '--hash', 'sha1', @@foo_manifest_locator+'/', 'tmp/'
+    end
+    assert_equal "#{Digest::SHA1.hexdigest('foo')}  ./foo\n", err
+    assert_equal '', out
+    assert_equal false, File.exists?('tmp/foo')
+  end
+
+  def test_block_to_file
+    remove_tmp_foo
+    out, err = capture_subprocess_io do
+      assert_arv_get @@foo_manifest_locator, 'tmp/foo'
+    end
+    assert_equal '', err
+    assert_equal '', out
+
+    digest = Digest::MD5.hexdigest('foo')
+    !(IO.read('tmp/foo')).gsub!( /^(. #{digest}+3)(.*)( 0:3:foo)$/).nil?
+  end
+
+  def test_create_directory_tree
+    `rm -rf ./tmp/arv-get-test/`
+    Dir.mkdir './tmp/arv-get-test'
+    out, err = capture_subprocess_io do
+      assert_arv_get @@multilevel_manifest_locator + '/', 'tmp/arv-get-test/'
+    end
+    assert_equal '', err
+    assert_equal '', out
+    assert_equal 'baz', IO.read('tmp/arv-get-test/foo/bar/baz')
+  end
+
+  def test_create_partial_directory_tree
+    `rm -rf ./tmp/arv-get-test/`
+    Dir.mkdir './tmp/arv-get-test'
+    out, err = capture_subprocess_io do
+      assert_arv_get(@@multilevel_manifest_locator + '/foo/',
+                     'tmp/arv-get-test/')
+    end
+    assert_equal '', err
+    assert_equal '', out
+    assert_equal 'baz', IO.read('tmp/arv-get-test/bar/baz')
+  end
+
+  protected
+  def assert_arv_get(*args)
+    expect = case args.first
+             when true, false
+               args.shift
+             else
+               true
+             end
+    assert_equal(expect,
+                 system(['./bin/arv-get', 'arv-get'], *args),
+                 "`arv-get #{args.join ' '}` " +
+                 "should exit #{if expect then 0 else 'non-zero' end}")
+  end
+
+  def remove_tmp_foo
+    begin
+      File.unlink('tmp/foo')
+    rescue Errno::ENOENT
+    end
+  end
+end
similarity index 99%
rename from sdk/cli/test/test_arv-put.rb
rename to sdk/cli/test/test_arv-keep-put.rb
index 2f20e18440a2ff61dde6b748d3b327587530b142..fefbc2729875e70cb890f69d56fe1d7f1c614b8d 100644 (file)
@@ -1,7 +1,7 @@
 require 'minitest/autorun'
 require 'digest/md5'
 
-class TestArvPut < Minitest::Test
+class TestArvKeepPut < Minitest::Test
   def setup
     begin Dir.mkdir './tmp' rescue Errno::EEXIST end
     begin Dir.mkdir './tmp/empty_dir' rescue Errno::EEXIST end
index d35f6dacb72b632e18718b503d0a2f4ff55e7a17..2c508dcb4a1100cf4609fc3ff3387ac089c6ab26 100644 (file)
@@ -21,7 +21,7 @@ type ServerRequiredSuite struct{}
 
 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
        arvadostest.StartAPI()
-       arvadostest.StartKeep()
+       arvadostest.StartKeep(2, false)
 }
 
 func (s *ServerRequiredSuite) SetUpTest(c *C) {
index cad16917dba286504f6693cac3a3fbd4d05a741e..27c552a4e104094ca2ed15991e310a3b7e9cd65e 100644 (file)
@@ -9,6 +9,7 @@ import (
        "log"
        "os"
        "os/exec"
+       "strconv"
        "strings"
 )
 
@@ -98,12 +99,21 @@ func StopAPI() {
        exec.Command("python", "run_test_server.py", "stop").Run()
 }
 
-func StartKeep() {
+// StartKeep starts the given number of keep servers,
+// optionally with -enforce-permissions enabled.
+// Use numKeepServers = 2 and enforcePermissions = false under all normal circumstances.
+func StartKeep(numKeepServers int, enforcePermissions bool) {
        cwd, _ := os.Getwd()
        defer os.Chdir(cwd)
        chdirToPythonTests()
 
-       cmd := exec.Command("python", "run_test_server.py", "start_keep")
+       cmdArgs := []string{"run_test_server.py", "start_keep", "--num-keep-servers", strconv.Itoa(numKeepServers)}
+       if enforcePermissions {
+               cmdArgs = append(cmdArgs, "--keep-enforce-permissions")
+       }
+
+       cmd := exec.Command("python", cmdArgs...)
+
        stderr, err := cmd.StderrPipe()
        if err != nil {
                log.Fatalf("Setting up stderr pipe: %s", err)
@@ -114,10 +124,13 @@ func StartKeep() {
        }
 }
 
-func StopKeep() {
+// StopKeep stops keep servers that were started with StartKeep.
+// numkeepServers should be the same value that was passed to StartKeep,
+// which is 2 under all normal circumstances.
+func StopKeep(numKeepServers int) {
        cwd, _ := os.Getwd()
        defer os.Chdir(cwd)
        chdirToPythonTests()
 
-       exec.Command("python", "run_test_server.py", "stop_keep").Run()
+       exec.Command("python", "run_test_server.py", "stop_keep", "--num-keep-servers", strconv.Itoa(numKeepServers))
 }
index 8b7cf419ead2f56c7c124799eadd0479dc69eac6..67c304deaf3ae54b2668cb8c2f2856e909da8c5a 100644 (file)
@@ -13,7 +13,6 @@ import (
        "io/ioutil"
        "log"
        "net/http"
-       "os"
        "regexp"
        "strconv"
        "strings"
@@ -49,24 +48,39 @@ type KeepClient struct {
        gatewayRoots       *map[string]string
        lock               sync.RWMutex
        Client             *http.Client
+       Retries            int
 
        // set to 1 if all writable services are of disk type, otherwise 0
        replicasPerService int
 }
 
-// Create a new KeepClient.  This will contact the API server to discover Keep
-// servers.
+// MakeKeepClient creates a new KeepClient by contacting the API server to discover Keep servers.
 func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) {
-       var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
-       insecure := matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
+       kc := New(arv)
+       return kc, kc.DiscoverKeepServers()
+}
+
+// New creates a new KeepClient struct.
+// It does not discover keep servers; doing so is the caller's responsibility.
+func New(arv *arvadosclient.ArvadosClient) *KeepClient {
+       defaultReplicationLevel := 2
+       value, err := arv.Discovery("defaultCollectionReplication")
+       if err == nil {
+               v, ok := value.(float64)
+               if ok && v > 0 {
+                       defaultReplicationLevel = int(v)
+               }
+       }
+
        kc := &KeepClient{
                Arvados:       arv,
-               Want_replicas: 2,
+               Want_replicas: defaultReplicationLevel,
                Using_proxy:   false,
                Client: &http.Client{Transport: &http.Transport{
-                       TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
+                       TLSClientConfig: &tls.Config{InsecureSkipVerify: arv.ApiInsecure}}},
+               Retries: 2,
        }
-       return kc, kc.DiscoverKeepServers()
+       return kc
 }
 
 // Put a block given the block hash, a reader, and the number of bytes
@@ -127,6 +141,69 @@ func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error
        }
 }
 
+// getOrHead sends an HTTP request with the given method (callers in this
+// file use "GET" and "HEAD") for locator to each server returned by
+// getSortedRoots, in order, until one answers 200 OK.
+//
+// Servers that fail with a transport error, or with HTTP 408, 429 or any
+// status >= 500, are collected and tried again, for up to kc.Retries
+// additional rounds. Servers that answer with any other non-200 status
+// are not retried.
+//
+// On success it returns the reported content length and the URL that
+// served the request. For "GET" it also returns the response body wrapped
+// in a HashCheckingReader set up with an MD5 hash and the first 32
+// characters of locator; for any other method the body is closed and a
+// nil reader is returned. If no server succeeds, the collected errors are
+// logged and BlockNotFound is returned.
+func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, int64, string, error) {
+       var errs []string
+
+       serversToTry := kc.getSortedRoots(locator)
+
+       // One initial round plus kc.Retries retry rounds. Stop early when
+       // the previous round left no server worth retrying.
+       for triesRemaining := 1 + kc.Retries; triesRemaining > 0 && len(serversToTry) > 0; triesRemaining-- {
+               var retryList []string
+
+               for _, host := range serversToTry {
+                       url := host + "/" + locator
+
+                       req, err := http.NewRequest(method, url, nil)
+                       if err != nil {
+                               // Not retried: the request could not even be built.
+                               errs = append(errs, fmt.Sprintf("%s: %v", url, err))
+                               continue
+                       }
+                       req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
+
+                       resp, err := kc.Client.Do(req)
+                       if err != nil {
+                               // Probably a network error, may be transient,
+                               // can try again.
+                               errs = append(errs, fmt.Sprintf("%s: %v", url, err))
+                               retryList = append(retryList, host)
+                               continue
+                       }
+
+                       if resp.StatusCode != http.StatusOK {
+                               // Keep at most 4096 bytes of the error body for the log.
+                               respbody, _ := ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: 4096})
+                               resp.Body.Close()
+                               errs = append(errs, fmt.Sprintf("%s: HTTP %d %q",
+                                       url, resp.StatusCode, bytes.TrimSpace(respbody)))
+
+                               if resp.StatusCode == 408 ||
+                                       resp.StatusCode == 429 ||
+                                       resp.StatusCode >= 500 {
+                                       // Timeout, too many requests, or other
+                                       // server side failure, transient
+                                       // error, can try again.
+                                       retryList = append(retryList, host)
+                               }
+                               continue
+                       }
+
+                       // Success.
+                       if method == "GET" {
+                               return HashCheckingReader{
+                                       Reader: resp.Body,
+                                       Hash:   md5.New(),
+                                       Check:  locator[0:32],
+                               }, resp.ContentLength, url, nil
+                       }
+                       resp.Body.Close()
+                       return nil, resp.ContentLength, url, nil
+               }
+               serversToTry = retryList
+       }
+       log.Printf("DEBUG: %s %s failed: %v", method, locator, errs)
+
+       return nil, 0, "", BlockNotFound
+}
+
 // Get() retrieves a block, given a locator. Returns a reader, the
 // expected data length, the URL the block is being fetched from, and
 // an error.
@@ -135,34 +212,7 @@ func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error
 // reader returned by this method will return a BadChecksum error
 // instead of EOF.
 func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) {
-       var errs []string
-       for _, host := range kc.getSortedRoots(locator) {
-               url := host + "/" + locator
-               req, err := http.NewRequest("GET", url, nil)
-               if err != nil {
-                       errs = append(errs, fmt.Sprintf("%s: %v", url, err))
-                       continue
-               }
-               req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
-               resp, err := kc.Client.Do(req)
-               if err != nil {
-                       errs = append(errs, fmt.Sprintf("%s: %v", url, err))
-                       continue
-               } else if resp.StatusCode != http.StatusOK {
-                       respbody, _ := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
-                       resp.Body.Close()
-                       errs = append(errs, fmt.Sprintf("%s: HTTP %d %q",
-                               url, resp.StatusCode, bytes.TrimSpace(respbody)))
-                       continue
-               }
-               return HashCheckingReader{
-                       Reader: resp.Body,
-                       Hash:   md5.New(),
-                       Check:  locator[0:32],
-               }, resp.ContentLength, url, nil
-       }
-       log.Printf("DEBUG: GET %s failed: %v", locator, errs)
-       return nil, 0, "", BlockNotFound
+       return kc.getOrHead("GET", locator)
 }
 
 // Ask() verifies that a block with the given hash is available and
@@ -173,18 +223,8 @@ func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error)
 // Returns the data size (content length) reported by the Keep service
 // and the URI reporting the data size.
 func (kc *KeepClient) Ask(locator string) (int64, string, error) {
-       for _, host := range kc.getSortedRoots(locator) {
-               url := host + "/" + locator
-               req, err := http.NewRequest("HEAD", url, nil)
-               if err != nil {
-                       continue
-               }
-               req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
-               if resp, err := kc.Client.Do(req); err == nil && resp.StatusCode == http.StatusOK {
-                       return resp.ContentLength, url, nil
-               }
-       }
-       return 0, "", BlockNotFound
+       _, size, url, err := kc.getOrHead("HEAD", locator)
+       return size, url, err
 }
 
 // GetIndex retrieves a list of blocks stored on the given server whose hashes
index ee60d2822f2f95b439101b974d9a11001831b346..aaba0695f0d0751c370fbd42741c3f6e77b9fbce 100644 (file)
@@ -45,14 +45,14 @@ func (s *ServerRequiredSuite) SetUpSuite(c *C) {
                return
        }
        arvadostest.StartAPI()
-       arvadostest.StartKeep()
+       arvadostest.StartKeep(2, false)
 }
 
 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
        if *no_server {
                return
        }
-       arvadostest.StopKeep()
+       arvadostest.StopKeep(2)
        arvadostest.StopAPI()
 }
 
@@ -69,6 +69,22 @@ func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
        }
 }
 
+func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
+       arv, err := arvadosclient.MakeArvadosClient()
+       c.Assert(err, Equals, nil)
+
+       kc, err := MakeKeepClient(&arv)
+       c.Assert(kc.Want_replicas, Equals, 2)
+
+       arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
+       kc, err = MakeKeepClient(&arv)
+       c.Assert(kc.Want_replicas, Equals, 3)
+
+       arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
+       kc, err = MakeKeepClient(&arv)
+       c.Assert(kc.Want_replicas, Equals, 1)
+}
+
 type StubPutHandler struct {
        c              *C
        expectPath     string
@@ -184,6 +200,31 @@ func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        fh.handled <- fmt.Sprintf("http://%s", req.Host)
 }
 
+// FailThenSucceedHandler is a stub HTTP handler that answers its first
+// request with HTTP 500 and passes every later request to successhandler.
+type FailThenSucceedHandler struct {
+       handled        chan string    // receives "http://<host>" for the failed request
+       count          int            // number of requests answered with 500 so far
+       successhandler StubGetHandler // serves every request after the first
+}
+
+// ServeHTTP fails the first request it sees and delegates all others.
+func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       if fh.count != 0 {
+               fh.successhandler.ServeHTTP(resp, req)
+               return
+       }
+       resp.WriteHeader(http.StatusInternalServerError)
+       fh.count++
+       fh.handled <- fmt.Sprintf("http://%s", req.Host)
+}
+
+// Error404Handler is a stub HTTP handler that answers every request with
+// HTTP 404 and reports the request's host on the handled channel.
+type Error404Handler struct {
+       handled chan string // receives "http://<host>" once per request
+}
+
+// ServeHTTP writes the 404 status first, then records the request.
+func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       resp.WriteHeader(http.StatusNotFound)
+       servedHost := fmt.Sprintf("http://%s", req.Host)
+       fh.handled <- servedHost
+}
+
 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
        log.Printf("TestFailedUploadToStubKeepServer")
 
@@ -464,7 +505,7 @@ func (s *StandaloneSuite) TestGet(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
-       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
 
        r, n, url2, err := kc.Get(hash)
        defer r.Close()
@@ -479,6 +520,26 @@ func (s *StandaloneSuite) TestGet(c *C) {
        log.Printf("TestGet done")
 }
 
+func (s *StandaloneSuite) TestGet404(c *C) {
+       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+       st := Error404Handler{make(chan string, 1)}
+
+       ks := RunFakeKeepServer(st)
+       defer ks.listener.Close()
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       kc, _ := MakeKeepClient(&arv)
+       arv.ApiToken = "abc123"
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+
+       r, n, url2, err := kc.Get(hash)
+       c.Check(err, Equals, BlockNotFound)
+       c.Check(n, Equals, int64(0))
+       c.Check(url2, Equals, "")
+       c.Check(r, Equals, nil)
+}
+
 func (s *StandaloneSuite) TestGetFail(c *C) {
        hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
 
@@ -490,7 +551,52 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
-       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+
+       r, n, url2, err := kc.Get(hash)
+       c.Check(err, Equals, BlockNotFound)
+       c.Check(n, Equals, int64(0))
+       c.Check(url2, Equals, "")
+       c.Check(r, Equals, nil)
+}
+
+func (s *StandaloneSuite) TestGetFailRetry(c *C) {
+       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+       st := &FailThenSucceedHandler{make(chan string, 1), 0,
+               StubGetHandler{
+                       c,
+                       hash,
+                       "abc123",
+                       http.StatusOK,
+                       []byte("foo")}}
+
+       ks := RunFakeKeepServer(st)
+       defer ks.listener.Close()
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       kc, _ := MakeKeepClient(&arv)
+       arv.ApiToken = "abc123"
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+
+       r, n, url2, err := kc.Get(hash)
+       defer r.Close()
+       c.Check(err, Equals, nil)
+       c.Check(n, Equals, int64(3))
+       c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
+
+       content, err2 := ioutil.ReadAll(r)
+       c.Check(err2, Equals, nil)
+       c.Check(content, DeepEquals, []byte("foo"))
+}
+
+func (s *StandaloneSuite) TestGetNetError(c *C) {
+       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       kc, _ := MakeKeepClient(&arv)
+       arv.ApiToken = "abc123"
+       kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
 
        r, n, url2, err := kc.Get(hash)
        c.Check(err, Equals, BlockNotFound)
@@ -525,7 +631,7 @@ func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(
                map[string]string{"x": ks0.url},
-               map[string]string{"x": ks0.url},
+               nil,
                map[string]string{uuid: ks.url})
 
        r, n, uri, err := kc.Get(hash + "+K@" + uuid)
@@ -572,11 +678,7 @@ func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
                        "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
                        "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
                        uuid: ks.url},
-               map[string]string{
-                       "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
-                       "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
-                       "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
-                       uuid: ks.url},
+               nil,
                map[string]string{
                        "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
                        "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
@@ -619,7 +721,7 @@ func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(
                map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
-               map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
+               nil,
                map[string]string{uuid: ksGateway.url})
 
        r, n, uri, err := kc.Get(hash + "+K@" + uuid)
@@ -654,7 +756,7 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
-       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
 
        r, n, _, err := kc.Get(barhash)
        _, err = ioutil.ReadAll(r)
@@ -675,7 +777,7 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
        content := []byte("waz")
        hash := fmt.Sprintf("%x", md5.Sum(content))
 
-       fh := FailHandler{
+       fh := Error404Handler{
                make(chan string, 4)}
 
        st := StubGetHandler{
@@ -981,7 +1083,7 @@ func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
-       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
 
        r, err := kc.GetIndex("x", "")
        c.Check(err, Equals, nil)
@@ -1007,7 +1109,7 @@ func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
-       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
 
        r, err := kc.GetIndex("x", hash[0:3])
        c.Check(err, Equals, nil)
@@ -1033,7 +1135,7 @@ func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
-       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
 
        _, err = kc.GetIndex("x", hash[0:3])
        c.Check(err, Equals, ErrIncompleteIndex)
@@ -1055,7 +1157,7 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
-       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
 
        _, err = kc.GetIndex("y", hash[0:3])
        c.Check(err, Equals, ErrNoSuchKeepServer)
@@ -1075,7 +1177,7 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
-       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
 
        r, err := kc.GetIndex("x", "abcd")
        c.Check(err, Equals, nil)
index 51e3e082b683e01ca153d352e54b8e53db9a9893..0791d3cf856ee7d5d1268338eafa883fe9bcbb18 100644 (file)
@@ -2,6 +2,7 @@ package keepclient
 
 import (
        "crypto/md5"
+       "encoding/json"
        "errors"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/streamer"
@@ -76,19 +77,38 @@ func (this *KeepClient) setClientSettingsDisk() {
        }
 }
 
+type svcList struct {
+       Items []keepService `json:"items"`
+}
+
 // DiscoverKeepServers gets list of available keep services from api server
 func (this *KeepClient) DiscoverKeepServers() error {
-       type svcList struct {
-               Items []keepService `json:"items"`
-       }
-       var m svcList
+       var list svcList
 
        // Get keep services from api server
-       err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &m)
+       err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list)
        if err != nil {
                return err
        }
 
+       return this.loadKeepServers(list)
+}
+
+// LoadKeepServicesFromJSON gets list of available keep services from given JSON
+func (this *KeepClient) LoadKeepServicesFromJSON(services string) error {
+       var list svcList
+
+       // Load keep services from given json
+       dec := json.NewDecoder(strings.NewReader(services))
+       if err := dec.Decode(&list); err != nil {
+               return err
+       }
+
+       return this.loadKeepServers(list)
+}
+
+// loadKeepServers sets up this client from the given list of keep services.
+func (this *KeepClient) loadKeepServers(list svcList) error {
        listed := make(map[string]bool)
        localRoots := make(map[string]string)
        gatewayRoots := make(map[string]string)
@@ -98,7 +118,7 @@ func (this *KeepClient) DiscoverKeepServers() error {
        this.replicasPerService = 1
        this.Using_proxy = false
 
-       for _, service := range m.Items {
+       for _, service := range list.Items {
                scheme := "http"
                if service.SSL {
                        scheme = "https"
index 853d7d30354daddb86e602c4b580141fc18e1bdf..80aeb268975d8acbc7f7d1c2e771c17e7adfa719 100644 (file)
@@ -251,6 +251,7 @@ func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
 
        n, err := sr.Read(out)
        c.Check(n, Equals, 100)
+       c.Check(err, IsNil)
 
        n, err = sr.Read(out)
        c.Check(n, Equals, 0)
index a4a194f69bcc8fbdbf43650caa881325bde5dc24..3f5f9344c521f3021e2f6fd35f025e4b9c7fb95e 100644 (file)
@@ -249,9 +249,7 @@ func (this *AsyncStream) transfer(source_reader io.Reader) {
                                        }
                                }
                        } else {
-                               if reader_status == io.EOF {
-                                       // no more reads expected, so this is ok
-                               } else {
+                               if reader_status == nil {
                                        // slices channel closed without signaling EOF
                                        reader_status = io.ErrUnexpectedEOF
                                }
index 0754744241a97a0011fb71ace5792a7be0c27148..8ed86fd79e96144886744dba81786ef21ef12ae8 100644 (file)
@@ -241,19 +241,34 @@ class KeepClient(object):
         Should be used in a "with" block.
         """
         def __init__(self, todo):
+            self._started = 0
             self._todo = todo
             self._done = 0
             self._response = None
+            self._start_lock = threading.Condition()
             self._todo_lock = threading.Semaphore(todo)
             self._done_lock = threading.Lock()
+            self._local = threading.local()
 
         def __enter__(self):
+            self._start_lock.acquire()
+            if getattr(self._local, 'sequence', None) is not None:
+                # If the calling thread has used set_sequence(N), then
+                # we wait here until N other threads have started.
+                while self._started < self._local.sequence:
+                    self._start_lock.wait()
             self._todo_lock.acquire()
+            self._started += 1
+            self._start_lock.notifyAll()
+            self._start_lock.release()
             return self
 
         def __exit__(self, type, value, traceback):
             self._todo_lock.release()
 
+        # Record a sequence number for the calling thread only (it is
+        # stored in thread-local storage). When that thread later enters
+        # this limiter, __enter__ waits until at least `sequence` threads
+        # have started before letting it proceed.
+        def set_sequence(self, sequence):
+            self._local.sequence = sequence
+
         def shall_i_proceed(self):
             """
             Return true if the current thread should do stuff. Return
@@ -517,7 +532,11 @@ class KeepClient(object):
             return self._success
 
         def run(self):
-            with self.args['thread_limiter'] as limiter:
+            limiter = self.args['thread_limiter']
+            sequence = self.args['thread_sequence']
+            if sequence is not None:
+                limiter.set_sequence(sequence)
+            with limiter:
                 if not limiter.shall_i_proceed():
                     # My turn arrived, but the job has been done without
                     # me.
@@ -950,9 +969,10 @@ class KeepClient(object):
         thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
+        thread_sequence = 0
         for tries_left in loop:
             try:
-                local_roots = self.map_new_services(
+                sorted_roots = self.map_new_services(
                     roots_map, locator,
                     force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
             except Exception as error:
@@ -960,7 +980,8 @@ class KeepClient(object):
                 continue
 
             threads = []
-            for service_root, ks in roots_map.iteritems():
+            for service_root, ks in [(root, roots_map[root])
+                                     for root in sorted_roots]:
                 if ks.finished():
                     continue
                 t = KeepClient.KeepWriterThread(
@@ -969,9 +990,11 @@ class KeepClient(object):
                     data_hash=data_hash,
                     service_root=service_root,
                     thread_limiter=thread_limiter,
-                    timeout=self.current_timeout(num_retries-tries_left))
+                    timeout=self.current_timeout(num_retries-tries_left),
+                    thread_sequence=thread_sequence)
                 t.start()
                 threads.append(t)
+                thread_sequence += 1
             for t in threads:
                 t.join()
             loop.save_result((thread_limiter.done() >= copies, len(threads)))
@@ -984,7 +1007,7 @@ class KeepClient(object):
                     data_hash, loop.last_result()))
         else:
             service_errors = ((key, roots_map[key].last_result()['error'])
-                              for key in local_roots
+                              for key in sorted_roots
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
                 "failed to write {} (wanted {} copies but wrote {})".format(
index 61966054a08f1cb15c4e000ca47e302b5dd7a248..d8a207f2cf2ce506d9406a646272e92c51c201e6 100644 (file)
@@ -3,7 +3,7 @@ error_log stderr info;          # Yes, must be specified here _and_ cmdline
 events {
 }
 http {
-  access_log /dev/stderr combined;
+  access_log {{ACCESSLOG}} combined;
   upstream arv-git-http {
     server localhost:{{GITPORT}};
   }
index 5d0c42ad2109e2d605f5ab45fea5bd64fc26b1e8..d325b4eb6ecb086d15effa34bc26db3e95c9ad15 100644 (file)
@@ -3,6 +3,7 @@
 from __future__ import print_function
 import argparse
 import atexit
+import errno
 import httplib2
 import os
 import pipes
@@ -54,9 +55,7 @@ def find_server_pid(PID_PATH, wait=10):
             with open(PID_PATH, 'r') as f:
                 server_pid = int(f.read())
             good_pid = (os.kill(server_pid, 0) is None)
-        except IOError:
-            good_pid = False
-        except OSError:
+        except EnvironmentError:
             good_pid = False
         now = time.time()
 
@@ -91,9 +90,7 @@ def kill_server_pid(pidfile, wait=10, passenger_root=False):
             os.getpgid(server_pid)
             time.sleep(0.1)
             now = time.time()
-    except IOError:
-        pass
-    except OSError:
+    except EnvironmentError:
         pass
 
 def find_available_port():
@@ -323,8 +320,8 @@ def _start_keep(n, keep_args):
 
     return port
 
-def run_keep(blob_signing_key=None, enforce_permissions=False):
-    stop_keep()
+def run_keep(blob_signing_key=None, enforce_permissions=False, num_servers=2):
+    stop_keep(num_servers)
 
     keep_args = {}
     if not blob_signing_key:
@@ -344,12 +341,13 @@ def run_keep(blob_signing_key=None, enforce_permissions=False):
         host=os.environ['ARVADOS_API_HOST'],
         token=os.environ['ARVADOS_API_TOKEN'],
         insecure=True)
+
     for d in api.keep_services().list().execute()['items']:
         api.keep_services().delete(uuid=d['uuid']).execute()
     for d in api.keep_disks().list().execute()['items']:
         api.keep_disks().delete(uuid=d['uuid']).execute()
 
-    for d in range(0, 2):
+    for d in range(0, num_servers):
         port = _start_keep(d, keep_args)
         svc = api.keep_services().create(body={'keep_service': {
             'uuid': 'zzzzz-bi6l4-keepdisk{:07d}'.format(d),
@@ -371,9 +369,9 @@ def _stop_keep(n):
     if os.path.exists(os.path.join(TEST_TMPDIR, "keep.blob_signing_key")):
         os.remove(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"))
 
-def stop_keep():
-    _stop_keep(0)
-    _stop_keep(1)
+def stop_keep(num_servers=2):
+    for n in range(0, num_servers):
+        _stop_keep(n)
 
 def run_keep_proxy():
     if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
@@ -447,6 +445,7 @@ def run_nginx():
     nginxconf['GITSSLPORT'] = find_available_port()
     nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem')
     nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key')
+    nginxconf['ACCESSLOG'] = os.path.join(TEST_TMPDIR, 'nginx_access_log.fifo')
 
     conftemplatefile = os.path.join(MY_DIRNAME, 'nginx.conf')
     conffile = os.path.join(TEST_TMPDIR, 'nginx.conf')
@@ -458,12 +457,23 @@ def run_nginx():
 
     env = os.environ.copy()
     env['PATH'] = env['PATH']+':/sbin:/usr/sbin:/usr/local/sbin'
+
+    try:
+        os.remove(nginxconf['ACCESSLOG'])
+    except OSError as error:
+        if error.errno != errno.ENOENT:
+            raise
+
+    os.mkfifo(nginxconf['ACCESSLOG'], 0700)
     nginx = subprocess.Popen(
         ['nginx',
          '-g', 'error_log stderr info;',
          '-g', 'pid '+_pidfile('nginx')+';',
          '-c', conffile],
         env=env, stdin=open('/dev/null'), stdout=sys.stderr)
+    cat_access = subprocess.Popen(
+        ['cat', nginxconf['ACCESSLOG']],
+        stdout=sys.stderr)
     _setport('keepproxy-ssl', nginxconf['KEEPPROXYSSLPORT'])
     _setport('arv-git-httpd-ssl', nginxconf['GITSSLPORT'])
 
@@ -595,6 +605,9 @@ if __name__ == "__main__":
     parser = argparse.ArgumentParser()
     parser.add_argument('action', type=str, help="one of {}".format(actions))
     parser.add_argument('--auth', type=str, metavar='FIXTURE_NAME', help='Print authorization info for given api_client_authorizations fixture')
+    parser.add_argument('--num-keep-servers', metavar='int', type=int, default=2, help="Number of keep servers desired")
+    parser.add_argument('--keep-enforce-permissions', action="store_true", help="Enforce keep permissions")
+
     args = parser.parse_args()
 
     if args.action not in actions:
@@ -614,7 +627,7 @@ if __name__ == "__main__":
     elif args.action == 'stop':
         stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
     elif args.action == 'start_keep':
-        run_keep()
+        run_keep(enforce_permissions=args.keep_enforce_permissions, num_servers=args.num_keep_servers)
     elif args.action == 'stop_keep':
         stop_keep()
     elif args.action == 'start_keep_proxy':
index c44379bac79465417e9a7d128d1aa47f13d6a6fa..90468924a668b09e9116084adf20581878a382b2 100644 (file)
@@ -332,39 +332,158 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 mock.responses[0].getopt(pycurl.TIMEOUT_MS),
                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
 
-    def test_probe_order_reference_set(self):
+    def check_no_services_error(self, verb, exc_class):
+        api_client = mock.MagicMock(name='api_client')
+        api_client.keep_services().accessible().execute.side_effect = (
+            arvados.errors.ApiError)
+        keep_client = arvados.KeepClient(api_client=api_client)
+        with self.assertRaises(exc_class) as err_check:
+            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
+        self.assertEqual(0, len(err_check.exception.request_errors()))
+
+    def test_get_error_with_no_services(self):
+        self.check_no_services_error('get', arvados.errors.KeepReadError)
+
+    def test_put_error_with_no_services(self):
+        self.check_no_services_error('put', arvados.errors.KeepWriteError)
+
+    def check_errors_from_last_retry(self, verb, exc_class):
+        api_client = self.mock_keep_services(count=2)
+        req_mock = tutil.mock_keep_responses(
+            "retry error reporting test", 500, 500, 403, 403)
+        with req_mock, tutil.skip_sleep, \
+                self.assertRaises(exc_class) as err_check:
+            keep_client = arvados.KeepClient(api_client=api_client)
+            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
+                                       num_retries=3)
+        self.assertEqual([403, 403], [
+                getattr(error, 'status_code', None)
+                for error in err_check.exception.request_errors().itervalues()])
+
+    def test_get_error_reflects_last_retry(self):
+        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)
+
+    def test_put_error_reflects_last_retry(self):
+        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)
+
+    def test_put_error_does_not_include_successful_puts(self):
+        data = 'partial failure test'
+        data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+        api_client = self.mock_keep_services(count=3)
+        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
+                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
+            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client.put(data)
+        self.assertEqual(2, len(exc_check.exception.request_errors()))
+
+    def test_proxy_put_with_no_writable_services(self):
+        data = 'test with no writable services'
+        data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
+        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
+                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
+          keep_client = arvados.KeepClient(api_client=api_client)
+          keep_client.put(data)
+        self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
+        self.assertEqual(0, len(exc_check.exception.request_errors()))
+
+
+@tutil.skip_sleep
+class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
+
+    def setUp(self):
         # expected_order[i] is the probe order for
         # hash=md5(sprintf("%064x",i)) where there are 16 services
         # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
         # the first probe for the block consisting of 64 "0"
         # characters is the service whose uuid is
         # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
-        expected_order = [
+        self.services = 16
+        self.expected_order = [
             list('3eab2d5fc9681074'),
             list('097dba52e648f1c3'),
             list('c5b4e023f8a7d691'),
             list('9d81c02e76a3bf54'),
             ]
-        hashes = [
-            hashlib.md5("{:064x}".format(x)).hexdigest()
-            for x in range(len(expected_order))]
-        api_client = self.mock_keep_services(count=16)
-        keep_client = arvados.KeepClient(api_client=api_client)
-        for i, hash in enumerate(hashes):
-            roots = keep_client.weighted_service_roots(arvados.KeepLocator(hash))
+        self.blocks = [
+            "{:064x}".format(x)
+            for x in range(len(self.expected_order))]
+        self.hashes = [
+            hashlib.md5(self.blocks[x]).hexdigest()
+            for x in range(len(self.expected_order))]
+        self.api_client = self.mock_keep_services(count=self.services)
+        self.keep_client = arvados.KeepClient(api_client=self.api_client)
+
+    def test_weighted_service_roots_against_reference_set(self):
+        # Confirm weighted_service_roots() returns the correct order
+        for i, hash in enumerate(self.hashes):
+            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
             got_order = [
                 re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                 for root in roots]
-            self.assertEqual(expected_order[i], got_order)
+            self.assertEqual(self.expected_order[i], got_order)
+
+    def test_get_probe_order_against_reference_set(self):
+        self._test_probe_order_against_reference_set(
+            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))
+
+    def test_put_probe_order_against_reference_set(self):
+        # copies=1 prevents the test from being sensitive to races
+        # between writer threads.
+        self._test_probe_order_against_reference_set(
+            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))
+
+    def _test_probe_order_against_reference_set(self, op):
+        for i in range(len(self.blocks)):
+            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
+                 self.assertRaises(arvados.errors.KeepRequestError):
+                op(i)
+            got_order = [
+                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL)).group(1)
+                for resp in mock.responses]
+            self.assertEqual(self.expected_order[i]*2, got_order)
+
+    def test_put_probe_order_multiple_copies(self):
+        for copies in range(2, 4):
+            for i in range(len(self.blocks)):
+                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
+                     self.assertRaises(arvados.errors.KeepWriteError):
+                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
+                got_order = [
+                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL)).group(1)
+                    for resp in mock.responses]
+                # With T threads racing to make requests, the position
+                # of a given server in the sequence of HTTP requests
+                # (got_order) cannot be more than T-1 positions
+                # earlier than that server's position in the reference
+                # probe sequence (expected_order).
+                #
+                # Loop invariant: we have accounted for +pos+ expected
+                # probes, either by seeing them in +got_order+ or by
+                # putting them in +pending+ in the hope of seeing them
+                # later. As long as +len(pending)<T+, we haven't
+                # started a request too early.
+                pending = []
+                for pos, expected in enumerate(self.expected_order[i]*3):
+                    got = got_order[pos-len(pending)]
+                    while got in pending:
+                        del pending[pending.index(got)]
+                        got = got_order[pos-len(pending)]
+                    if got != expected:
+                        pending.append(expected)
+                        self.assertLess(
+                            len(pending), copies,
+                            "pending={}, with copies={}, got {}, expected {}".format(
+                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))
 
     def test_probe_waste_adding_one_server(self):
         hashes = [
             hashlib.md5("{:064x}".format(x)).hexdigest() for x in range(100)]
         initial_services = 12
-        api_client = self.mock_keep_services(count=initial_services)
-        keep_client = arvados.KeepClient(api_client=api_client)
+        self.api_client = self.mock_keep_services(count=initial_services)
+        self.keep_client = arvados.KeepClient(api_client=self.api_client)
         probes_before = [
-            keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
+            self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
         for added_services in range(1, 12):
             api_client = self.mock_keep_services(count=initial_services+added_services)
             keep_client = arvados.KeepClient(api_client=api_client)
@@ -402,7 +521,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
             data = hashlib.md5(data).hexdigest() + '+1234'
         # Arbitrary port number:
         aport = random.randint(1024,65535)
-        api_client = self.mock_keep_services(service_port=aport, count=16)
+        api_client = self.mock_keep_services(service_port=aport, count=self.services)
         keep_client = arvados.KeepClient(api_client=api_client)
         with mock.patch('pycurl.Curl') as curl_mock, \
              self.assertRaises(exc_class) as err_check:
@@ -419,60 +538,6 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
     def test_put_error_shows_probe_order(self):
         self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
 
-    def check_no_services_error(self, verb, exc_class):
-        api_client = mock.MagicMock(name='api_client')
-        api_client.keep_services().accessible().execute.side_effect = (
-            arvados.errors.ApiError)
-        keep_client = arvados.KeepClient(api_client=api_client)
-        with self.assertRaises(exc_class) as err_check:
-            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
-        self.assertEqual(0, len(err_check.exception.request_errors()))
-
-    def test_get_error_with_no_services(self):
-        self.check_no_services_error('get', arvados.errors.KeepReadError)
-
-    def test_put_error_with_no_services(self):
-        self.check_no_services_error('put', arvados.errors.KeepWriteError)
-
-    def check_errors_from_last_retry(self, verb, exc_class):
-        api_client = self.mock_keep_services(count=2)
-        req_mock = tutil.mock_keep_responses(
-            "retry error reporting test", 500, 500, 403, 403)
-        with req_mock, tutil.skip_sleep, \
-                self.assertRaises(exc_class) as err_check:
-            keep_client = arvados.KeepClient(api_client=api_client)
-            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
-                                       num_retries=3)
-        self.assertEqual([403, 403], [
-                getattr(error, 'status_code', None)
-                for error in err_check.exception.request_errors().itervalues()])
-
-    def test_get_error_reflects_last_retry(self):
-        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)
-
-    def test_put_error_reflects_last_retry(self):
-        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)
-
-    def test_put_error_does_not_include_successful_puts(self):
-        data = 'partial failure test'
-        data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
-        api_client = self.mock_keep_services(count=3)
-        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
-                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
-            keep_client = arvados.KeepClient(api_client=api_client)
-            keep_client.put(data)
-        self.assertEqual(2, len(exc_check.exception.request_errors()))
-
-    def test_proxy_put_with_no_writable_services(self):
-        data = 'test with no writable services'
-        data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
-        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
-        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
-                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
-          keep_client = arvados.KeepClient(api_client=api_client)
-          keep_client.put(data)
-        self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
-        self.assertEqual(0, len(exc_check.exception.request_errors()))
 
 class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
     DATA = 'x' * 2**10
index 8f1b687a9cad9a01e593b08f1e34cee2d4db8e53..1193e915363e80e7ccc70a0ce16c731dc3ebcaeb 100644 (file)
@@ -74,7 +74,7 @@ gem 'faye-websocket'
 gem 'themes_for_rails'
 
 gem 'arvados', '>= 0.1.20150615153458'
-gem 'arvados-cli', '>= 0.1.20150128223752'
+gem 'arvados-cli', '>=  0.1.20150605170031'
 
 # pg_power lets us use partial indexes in schema.rb in Rails 3
 gem 'pg_power'
index d671182a57c749b65bd9f7c93d2db215bc762f16..be4d4606ab21599e6e1c744d9f1ad40dd25887db 100644 (file)
@@ -41,10 +41,10 @@ GEM
       google-api-client (~> 0.6.3, >= 0.6.3)
       json (~> 1.7, >= 1.7.7)
       jwt (>= 0.1.5, < 1.0.0)
-    arvados-cli (0.1.20150205181653)
+    arvados-cli (0.1.20150930141818)
       activesupport (~> 3.2, >= 3.2.13)
       andand (~> 1.3, >= 1.3.3)
-      arvados (~> 0.1, >= 0.1.20150615153458)
+      arvados (~> 0.1, >= 0.1.20150128223554)
       curb (~> 0.8)
       google-api-client (~> 0.6.3, >= 0.6.3)
       json (~> 1.7, >= 1.7.7)
@@ -69,7 +69,7 @@ GEM
       coffee-script-source
       execjs
     coffee-script-source (1.7.0)
-    curb (0.8.6)
+    curb (0.8.8)
     daemon_controller (1.2.0)
     database_cleaner (1.2.0)
     erubis (2.7.0)
@@ -228,7 +228,7 @@ DEPENDENCIES
   acts_as_api
   andand
   arvados (>= 0.1.20150615153458)
-  arvados-cli (>= 0.1.20150128223752)
+  arvados-cli (>= 0.1.20150605170031)
   coffee-rails (~> 3.2.0)
   database_cleaner
   factory_girl_rails
index 3d9bb3da90a2c828207318908f1ee083d0d0d5f3..c2cb762d52b625b625634f24d385ddbf9ad4e7d8 100644 (file)
@@ -31,7 +31,7 @@ func SetupDataManagerTest(t *testing.T) {
        // start api and keep servers
        arvadostest.ResetEnv()
        arvadostest.StartAPI()
-       arvadostest.StartKeep()
+       arvadostest.StartKeep(2, false)
 
        arv = makeArvadosClient()
 
@@ -54,7 +54,7 @@ func SetupDataManagerTest(t *testing.T) {
 }
 
 func TearDownDataManagerTest(t *testing.T) {
-       arvadostest.StopKeep()
+       arvadostest.StopKeep(2)
        arvadostest.StopAPI()
 }
 
index 6fe8fe7ac3d0023c4d012be022c9ab3d50431d62..7643e4b0fa2225492caae1dd0aff3428505bd86d 100644 (file)
@@ -53,7 +53,7 @@ func closeListener() {
 
 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
        arvadostest.StartAPI()
-       arvadostest.StartKeep()
+       arvadostest.StartKeep(2, false)
 }
 
 func (s *ServerRequiredSuite) SetUpTest(c *C) {
@@ -61,7 +61,7 @@ func (s *ServerRequiredSuite) SetUpTest(c *C) {
 }
 
 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
-       arvadostest.StopKeep()
+       arvadostest.StopKeep(2)
        arvadostest.StopAPI()
 }
 
index 657c3151caade8aeca6cbedcd96ffef70d3d7ac4..e9fda2ab76dd41c6cdde36cb139e3f3f030c273b 100644 (file)
@@ -9,6 +9,7 @@ import (
        "io/ioutil"
        "log"
        "os"
+       "regexp"
        "strings"
        "time"
 
@@ -19,8 +20,8 @@ var (
        azureStorageAccountName    string
        azureStorageAccountKeyFile string
        azureStorageReplication    int
-       azureWriteRaceInterval     time.Duration = 15 * time.Second
-       azureWriteRacePollTime     time.Duration = time.Second
+       azureWriteRaceInterval     = 15 * time.Second
+       azureWriteRacePollTime     = time.Second
 )
 
 func readKeyFromFile(file string) (string, error) {
@@ -96,6 +97,9 @@ type AzureBlobVolume struct {
        replication   int
 }
 
+// NewAzureBlobVolume returns a new AzureBlobVolume using the given
+// client and container name. The replication argument specifies the
+// replication level to report when writing data.
 func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
        return &AzureBlobVolume{
                azClient:      client,
@@ -118,6 +122,12 @@ func (v *AzureBlobVolume) Check() error {
        return nil
 }
 
+// Get reads a Keep block that has been stored as a block blob in the
+// container.
+//
+// If the block is younger than azureWriteRaceInterval and is
+// unexpectedly empty, assume a PutBlob operation is in progress, and
+// wait for it to finish writing.
 func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
        var deadline time.Time
        haveDeadline := false
@@ -169,6 +179,7 @@ func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
        }
 }
 
+// Compare the given data with existing stored data.
 func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
        rdr, err := v.bsClient.GetBlob(v.containerName, loc)
        if err != nil {
@@ -178,6 +189,7 @@ func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
        return compareReaderWithBuf(rdr, expect, loc[:32])
 }
 
+// Put stores a Keep block as a block blob in the container.
 func (v *AzureBlobVolume) Put(loc string, block []byte) error {
        if v.readonly {
                return MethodDisabledError
@@ -185,6 +197,7 @@ func (v *AzureBlobVolume) Put(loc string, block []byte) error {
        return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block))
 }
 
+// Touch updates the last-modified property of a block blob.
 func (v *AzureBlobVolume) Touch(loc string) error {
        if v.readonly {
                return MethodDisabledError
@@ -194,6 +207,7 @@ func (v *AzureBlobVolume) Touch(loc string) error {
        })
 }
 
+// Mtime returns the last-modified property of a block blob.
 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
        props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
        if err != nil {
@@ -202,6 +216,8 @@ func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
        return time.Parse(time.RFC1123, props.LastModified)
 }
 
+// IndexTo writes a list of Keep blocks that are stored in the
+// container.
 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
        params := storage.ListBlobsParameters{
                Prefix: prefix,
@@ -216,6 +232,9 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
                        if err != nil {
                                return err
                        }
+                       if !v.isKeepBlock(b.Name) {
+                               continue
+                       }
                        if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) {
                                // A new zero-length blob is probably
                                // just a new non-empty blob that
@@ -233,6 +252,7 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
        }
 }
 
+// Delete a Keep block.
 func (v *AzureBlobVolume) Delete(loc string) error {
        if v.readonly {
                return MethodDisabledError
@@ -256,6 +276,7 @@ func (v *AzureBlobVolume) Delete(loc string) error {
        })
 }
 
+// Status returns a VolumeStatus struct with placeholder data.
 func (v *AzureBlobVolume) Status() *VolumeStatus {
        return &VolumeStatus{
                DeviceNum: 1,
@@ -264,14 +285,19 @@ func (v *AzureBlobVolume) Status() *VolumeStatus {
        }
 }
 
+// String returns a volume label, including the container name.
 func (v *AzureBlobVolume) String() string {
        return fmt.Sprintf("azure-storage-container:%+q", v.containerName)
 }
 
+// Writable returns true, unless the -readonly flag was on when the
+// volume was added.
 func (v *AzureBlobVolume) Writable() bool {
        return !v.readonly
 }
 
+// Replication returns the replication level of the container, as
+// specified by the -azure-storage-replication argument.
 func (v *AzureBlobVolume) Replication() int {
        return v.replication
 }
@@ -289,3 +315,8 @@ func (v *AzureBlobVolume) translateError(err error) error {
                return err
        }
 }
+
+var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
+func (v *AzureBlobVolume) isKeepBlock(s string) bool {
+       return keepBlockRegexp.MatchString(s)
+}
index a4c6e62a7d5f6f02c938ae27872cb96bb6a8e3e6..a240c23e1622b525f62a6957a23c025651f94190 100644 (file)
@@ -60,11 +60,11 @@ func newAzStubHandler() *azStubHandler {
 }
 
 func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
-       if blob, ok := h.blobs[container+"|"+hash]; !ok {
+       blob, ok := h.blobs[container+"|"+hash]
+       if !ok {
                return
-       } else {
-               blob.Mtime = t
        }
+       blob.Mtime = t
 }
 
 func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
@@ -427,7 +427,7 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
        azureWriteRaceInterval = 2 * time.Second
        azureWriteRacePollTime = 5 * time.Millisecond
 
-       v.PutRaw(TestHash, []byte{})
+       v.PutRaw(TestHash, nil)
 
        buf := new(bytes.Buffer)
        v.IndexTo("", buf)
index e0bad0045af34d8067591fa723e9229be093790c..3a3069ab7745c1efc0a21fd8c706565f65c525e0 100644 (file)
@@ -27,7 +27,7 @@ func SetupPullWorkerIntegrationTest(t *testing.T, testData PullWorkIntegrationTe
 
        // start api and keep servers
        arvadostest.StartAPI()
-       arvadostest.StartKeep()
+       arvadostest.StartKeep(2, false)
 
        // make arvadosclient
        arv, err := arvadosclient.MakeArvadosClient()
index 7ef079f1f7ac5b588c87e31918739db77a22ab82..61088f10fa2d4ef30e969a77e107b824073684e6 100644 (file)
@@ -350,6 +350,13 @@ func testIndexTo(t *testing.T, factory TestableVolumeFactory) {
        v.PutRaw(TestHash2, TestBlock2)
        v.PutRaw(TestHash3, TestBlock3)
 
+       // Blocks whose names aren't Keep hashes should be omitted from
+       // index
+       v.PutRaw("fffffffffnotreallyahashfffffffff", nil)
+       v.PutRaw("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", nil)
+       v.PutRaw("f0000000000000000000000000000000f", nil)
+       v.PutRaw("f00", nil)
+
        buf := new(bytes.Buffer)
        v.IndexTo("", buf)
        indexRows := strings.Split(string(buf.Bytes()), "\n")
index e745eb269100c5b5b0ba5ff7e1df6c62ab5cb089..910cc25d613cb7690f944b418aebf5c205c7aced 100644 (file)
@@ -292,6 +292,7 @@ func (v *UnixVolume) Status() *VolumeStatus {
 }
 
 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
+var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
 // IndexTo writes (to the given Writer) a list of blocks found on this
 // volume which begin with the specified prefix. If the prefix is an
@@ -348,6 +349,9 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        if !strings.HasPrefix(name, prefix) {
                                continue
                        }
+                       if !blockFileRe.MatchString(name) {
+                               continue
+                       }
                        _, err = fmt.Fprint(w,
                                name,
                                "+", fileInfo[0].Size(),
index dfb26bc303d2b310f1186a1fc72baf26438608b1..ec5014e9f9cf1e8848353cf3c755e22875227850 100644 (file)
@@ -34,7 +34,15 @@ class ComputeNodeShutdownActor(ShutdownActorBase):
     def _get_slurm_state(self):
         return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', self._nodename])
 
-    @ShutdownActorBase._retry((subprocess.CalledProcessError,))
+    # The following methods retry on OSError.  This is intended to mitigate bug
+    # #6321, where fork() of node manager raises "OSError: [Errno 12] Cannot
+    # allocate memory".  That error kills the shutdown actor prematurely, which
+    # tends to leave node manager in a wedged state where it won't allocate new
+    # nodes or shut down gracefully.  The underlying causes of the excessive
+    # memory usage that result in the "Cannot allocate memory" error are still
+    # being investigated.
+
+    @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
     def cancel_shutdown(self):
         if self._nodename:
             if self._get_slurm_state() in self.SLURM_DRAIN_STATES:
@@ -46,15 +54,15 @@ class ComputeNodeShutdownActor(ShutdownActorBase):
                 pass
         return super(ComputeNodeShutdownActor, self).cancel_shutdown()
 
+    @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
     @ShutdownActorBase._stop_if_window_closed
-    @ShutdownActorBase._retry((subprocess.CalledProcessError,))
     def issue_slurm_drain(self):
         self._set_node_state('DRAIN', 'Reason=Node Manager shutdown')
         self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
         self._later.await_slurm_drain()
 
+    @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
     @ShutdownActorBase._stop_if_window_closed
-    @ShutdownActorBase._retry((subprocess.CalledProcessError,))
     def await_slurm_drain(self):
         output = self._get_slurm_state()
         if output in self.SLURM_END_STATES:
index 2ddf7676c8f8ddc4ca6f84630f38d35ca86abe6a..8648783bac5889f11a328af3b277bd1f21da5665 100644 (file)
@@ -73,6 +73,15 @@ class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
         self.check_success_flag(False, 2)
         self.check_slurm_got_args(proc_mock, 'sinfo', '--noheader', '-o', '%t', '-n', 'compute99')
 
+    def test_cancel_shutdown_retry(self, proc_mock):
+        proc_mock.side_effect = iter([OSError, 'drain\n', OSError, 'idle\n'])
+        self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
+        self.make_actor()
+        self.check_success_flag(False, 2)
+
+    def test_issue_slurm_drain_retry(self, proc_mock):
+        proc_mock.side_effect = iter([OSError, '', OSError, 'drng\n'])
+        self.check_success_after_reset(proc_mock)
 
     def test_arvados_node_cleaned_after_shutdown(self, proc_mock):
         proc_mock.return_value = 'drain\n'
diff --git a/tools/keep-exercise/.gitignore b/tools/keep-exercise/.gitignore
new file mode 100644 (file)
index 0000000..6a1d10c
--- /dev/null
@@ -0,0 +1 @@
+keep-exercise
diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go
new file mode 100644 (file)
index 0000000..a94c01e
--- /dev/null
@@ -0,0 +1,157 @@
+// Testing tool for Keep services.
+//
+// keep-exercise helps measure throughput and test reliability under
+// various usage patterns.
+//
+// By default, it reads and writes blocks containing 2^26 NUL
+// bytes. This generates network traffic without consuming much disk
+// space.
+//
+// For a more realistic test, enable -vary-request. Warning: this will
+// fill your storage volumes with random data if you leave it running,
+// which can cost you money or leave you with too little room for
+// useful data.
+//
+package main
+
+import (
+       "crypto/rand"
+       "encoding/binary"
+       "flag"
+       "io"
+       "io/ioutil"
+       "log"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
+
+// Command line config knobs
+var (
+       BlockSize     = flag.Int("block-size", keepclient.BLOCKSIZE, "bytes per read/write op")
+       ReadThreads   = flag.Int("rthreads", 1, "number of concurrent readers")
+       WriteThreads  = flag.Int("wthreads", 1, "number of concurrent writers")
+       VaryRequest   = flag.Bool("vary-request", false, "vary the data for each request: consumes disk space, exercises write behavior")
+       VaryThread    = flag.Bool("vary-thread", false, "use -wthreads different data blocks")
+       Replicas      = flag.Int("replicas", 1, "replication level for writing")
+       StatsInterval = flag.Duration("stats-interval", time.Second, "time interval between IO stats reports, or 0 to disable")
+)
+
+func main() {
+       flag.Parse()
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       if err != nil {
+               log.Fatal(err)
+       }
+       kc, err := keepclient.MakeKeepClient(&arv)
+       if err != nil {
+               log.Fatal(err)
+       }
+       kc.Want_replicas = *Replicas
+       kc.Client.Timeout = 10 * time.Minute
+
+       nextBuf := make(chan []byte, *WriteThreads)
+       nextLocator := make(chan string, *ReadThreads+*WriteThreads)
+
+       go countBeans(nextLocator)
+       for i := 0; i < *WriteThreads; i++ {
+               go makeBufs(nextBuf, i)
+               go doWrites(kc, nextBuf, nextLocator)
+       }
+       for i := 0; i < *ReadThreads; i++ {
+               go doReads(kc, nextLocator)
+       }
+       <-make(chan struct{})
+}
+
+// Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
+var bytesInChan = make(chan uint64)
+var bytesOutChan = make(chan uint64)
+
+// Send struct{}{} to errorsChan when an error happens.
+var errorsChan = make(chan struct{})
+
+func countBeans(nextLocator chan string) {
+       t0 := time.Now()
+       var tickChan <-chan time.Time
+       if *StatsInterval > 0 {
+               tickChan = time.NewTicker(*StatsInterval).C
+       }
+       var bytesIn uint64
+       var bytesOut uint64
+       var errors uint64
+       for {
+               select {
+               case <-tickChan:
+                       elapsed := time.Since(t0)
+                       log.Printf("%v elapsed: read %v bytes (%.1f MiB/s), wrote %v bytes (%.1f MiB/s), errors %d",
+                               elapsed,
+                               bytesIn, (float64(bytesIn) / elapsed.Seconds() / 1048576),
+                               bytesOut, (float64(bytesOut) / elapsed.Seconds() / 1048576),
+                               errors,
+                       )
+               case i := <-bytesInChan:
+                       bytesIn += i
+               case o := <-bytesOutChan:
+                       bytesOut += o
+               case <-errorsChan:
+                       errors++
+               }
+       }
+}
+
+func makeBufs(nextBuf chan []byte, threadID int) {
+       buf := make([]byte, *BlockSize)
+       if *VaryThread {
+               binary.PutVarint(buf, int64(threadID))
+       }
+       for {
+               if *VaryRequest {
+                       if _, err := io.ReadFull(rand.Reader, buf); err != nil {
+                               log.Fatal(err)
+                       }
+               }
+               nextBuf <- buf
+       }
+}
+
+func doWrites(kc *keepclient.KeepClient, nextBuf chan []byte, nextLocator chan string) {
+       for buf := range nextBuf {
+               locator, _, err := kc.PutB(buf)
+               if err != nil {
+                       log.Print(err)
+                       errorsChan <- struct{}{}
+                       continue
+               }
+               bytesOutChan <- uint64(len(buf))
+               for cap(nextLocator) > len(nextLocator)+*WriteThreads {
+                       // Give the readers something to do, unless
+                       // they have lots queued up already.
+                       nextLocator <- locator
+               }
+       }
+}
+
+func doReads(kc *keepclient.KeepClient, nextLocator chan string) {
+       for locator := range nextLocator {
+               rdr, size, url, err := kc.Get(locator)
+               if err != nil {
+                       log.Print(err)
+                       errorsChan <- struct{}{}
+                       continue
+               }
+               n, err := io.Copy(ioutil.Discard, rdr)
+               rdr.Close()
+               if n != size || err != nil {
+                       log.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
+                       errorsChan <- struct{}{}
+                       continue
+                       // Note we don't count the bytes received in
+                       // partial/corrupt responses: we are measuring
+                       // throughput, not resource consumption.
+               }
+               bytesInChan <- uint64(n)
+       }
+}
diff --git a/tools/keep-rsync/.gitignore b/tools/keep-rsync/.gitignore
new file mode 100644 (file)
index 0000000..5ee7f3b
--- /dev/null
@@ -0,0 +1 @@
+keep-rsync
diff --git a/tools/keep-rsync/keep-rsync.go b/tools/keep-rsync/keep-rsync.go
new file mode 100644 (file)
index 0000000..7cd795e
--- /dev/null
@@ -0,0 +1,290 @@
+package main
+
+import (
+       "bufio"
+       "crypto/tls"
+       "errors"
+       "flag"
+       "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "io/ioutil"
+       "log"
+       "net/http"
+       "os"
+       "regexp"
+       "strings"
+       "time"
+)
+
+// main runs keep-rsync and, if doMain reports an error, logs it and exits
+// with a non-zero status.
+func main() {
+       if err := doMain(); err != nil {
+               log.Fatalf("%v", err)
+       }
+}
+
+// doMain parses the command line from os.Args, loads the src and dst
+// configurations, builds one keepclient for each side and copies the
+// blocks that are missing in dst. It returns an error describing the first
+// step that failed, or nil on success.
+func doMain() error {
+       var (
+               srcConfigFile, dstConfigFile             string
+               srcKeepServicesJSON, dstKeepServicesJSON string
+               prefix                                   string
+               replications                             int
+       )
+
+       flags := flag.NewFlagSet("keep-rsync", flag.ExitOnError)
+
+       flags.StringVar(&srcConfigFile, "src", "",
+               "Source configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf")
+
+       flags.StringVar(&dstConfigFile, "dst", "",
+               "Destination configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf")
+
+       flags.StringVar(&srcKeepServicesJSON, "src-keep-services-json", "",
+               "An optional list of available source keepservices. "+
+                       "If not provided, this list is obtained from api server configured in src-config-file.")
+
+       flags.StringVar(&dstKeepServicesJSON, "dst-keep-services-json", "",
+               "An optional list of available destination keepservices. "+
+                       "If not provided, this list is obtained from api server configured in dst-config-file.")
+
+       flags.IntVar(&replications, "replications", 0,
+               "Number of replications to write to the destination. If replications not specified, "+
+                       "default replication level configured on destination server will be used.")
+
+       flags.StringVar(&prefix, "prefix", "",
+               "Index prefix")
+
+       // The first element of os.Args is the command name, not a flag.
+       flags.Parse(os.Args[1:])
+
+       srcConfig, srcBlobSigningKey, err := loadConfig(srcConfigFile)
+       if err != nil {
+               return fmt.Errorf("Error loading src configuration from file: %s", err)
+       }
+
+       // The dst blob signing key is not needed: blocks are only read from src.
+       dstConfig, _, err := loadConfig(dstConfigFile)
+       if err != nil {
+               return fmt.Errorf("Error loading dst configuration from file: %s", err)
+       }
+
+       // One keepclient per side; only dst takes the replication level.
+       kcSrc, err := setupKeepClient(srcConfig, srcKeepServicesJSON, false, 0)
+       if err != nil {
+               return fmt.Errorf("Error configuring src keepclient: %s", err)
+       }
+
+       kcDst, err := setupKeepClient(dstConfig, dstKeepServicesJSON, true, replications)
+       if err != nil {
+               return fmt.Errorf("Error configuring dst keepclient: %s", err)
+       }
+
+       // Copy blocks not found in dst from src
+       if err := performKeepRsync(kcSrc, kcDst, srcBlobSigningKey, prefix); err != nil {
+               return fmt.Errorf("Error while syncing data: %s", err)
+       }
+
+       return nil
+}
+
+// apiConfig holds the API connection settings read from a config file
+// for one side (src or dst) of the sync.
+type apiConfig struct {
+       // Value of ARVADOS_API_TOKEN.
+       APIToken        string
+       // Value of ARVADOS_API_HOST.
+       APIHost         string
+       // True when ARVADOS_API_HOST_INSECURE matches 1/yes/true.
+       APIHostInsecure bool
+       // True when ARVADOS_EXTERNAL_CLIENT matches 1/yes/true.
+       ExternalClient  bool
+}
+
+// loadConfig reads the API settings and blob signing key from configFile.
+// An empty configFile yields a "config file not specified" error; a failure
+// from readConfigFromFile is returned wrapped with a short prefix.
+func loadConfig(configFile string) (config apiConfig, blobSigningKey string, err error) {
+       if len(configFile) == 0 {
+               err = errors.New("config file not specified")
+               return config, blobSigningKey, err
+       }
+
+       if config, blobSigningKey, err = readConfigFromFile(configFile); err != nil {
+               err = fmt.Errorf("Error reading config file: %v", err)
+       }
+
+       return config, blobSigningKey, err
+}
+
+// matchTrue reports whether a config value means "true": exactly one of
+// 1, yes or true, compared case-insensitively.
+var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
+
+// readConfigFromFile parses KEY=VALUE lines from filename into an apiConfig
+// and the optional blob signing key.
+//
+// A filename without any "/" is treated as shorthand for
+// $HOME/.config/arvados/<filename>.conf. Keys and values are trimmed of
+// surrounding whitespace. Unrecognized keys are ignored, and lines that do
+// not contain "=" (blank lines, whitespace-only lines, stray text) are
+// skipped. The error from reading the file is returned unchanged.
+func readConfigFromFile(filename string) (config apiConfig, blobSigningKey string, err error) {
+       if !strings.Contains(filename, "/") {
+               filename = os.Getenv("HOME") + "/.config/arvados/" + filename + ".conf"
+       }
+
+       content, err := ioutil.ReadFile(filename)
+       if err != nil {
+               return config, "", err
+       }
+
+       for _, line := range strings.Split(string(content), "\n") {
+               kv := strings.SplitN(line, "=", 2)
+               if len(kv) != 2 {
+                       // No "=" on this line: indexing kv[1] would panic,
+                       // so skip it instead.
+                       continue
+               }
+               key := strings.TrimSpace(kv[0])
+               value := strings.TrimSpace(kv[1])
+
+               switch key {
+               case "ARVADOS_API_TOKEN":
+                       config.APIToken = value
+               case "ARVADOS_API_HOST":
+                       config.APIHost = value
+               case "ARVADOS_API_HOST_INSECURE":
+                       config.APIHostInsecure = matchTrue.MatchString(value)
+               case "ARVADOS_EXTERNAL_CLIENT":
+                       config.ExternalClient = matchTrue.MatchString(value)
+               case "ARVADOS_BLOB_SIGNING_KEY":
+                       blobSigningKey = value
+               }
+       }
+       return config, blobSigningKey, nil
+}
+
+// setupKeepClient builds a keepclient for the API server described by
+// config.
+//
+// When keepServicesJSON is empty the keep services are discovered through
+// keepclient.MakeKeepClient; otherwise they are loaded from the given JSON.
+// For the destination client (isDst true), replications sets the desired
+// replica count; a value of 0 means "use defaultCollectionReplication from
+// the destination's discovery document". An error is returned if discovery
+// or JSON loading fails, or if the discovered default is not a number.
+func setupKeepClient(config apiConfig, keepServicesJSON string, isDst bool, replications int) (kc *keepclient.KeepClient, err error) {
+       arv := arvadosclient.ArvadosClient{
+               ApiToken:    config.APIToken,
+               ApiServer:   config.APIHost,
+               ApiInsecure: config.APIHostInsecure,
+               Client: &http.Client{Transport: &http.Transport{
+                       TLSClientConfig: &tls.Config{InsecureSkipVerify: config.APIHostInsecure}}},
+               External: config.ExternalClient,
+       }
+
+       // if keepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
+       if keepServicesJSON == "" {
+               kc, err = keepclient.MakeKeepClient(&arv)
+               if err != nil {
+                       return nil, err
+               }
+       } else {
+               kc = keepclient.New(&arv)
+               err = kc.LoadKeepServicesFromJSON(keepServicesJSON)
+               if err != nil {
+                       return kc, err
+               }
+       }
+
+       if isDst {
+               // Get default replications value from destination, if it is not already provided
+               if replications == 0 {
+                       value, err := arv.Discovery("defaultCollectionReplication")
+                       if err != nil {
+                               return nil, err
+                       }
+                       // Check the type instead of asserting blindly: a
+                       // missing or non-numeric value would otherwise panic.
+                       defaultReplication, ok := value.(float64)
+                       if !ok {
+                               return nil, fmt.Errorf("unexpected type %T for defaultCollectionReplication in discovery document", value)
+                       }
+                       replications = int(defaultReplication)
+               }
+
+               kc.Want_replicas = replications
+       }
+
+       return kc, nil
+}
+
+// performKeepRsync lists the unique block locators matching prefix on both
+// sides, works out which src blocks are absent from dst, and copies those
+// blocks to dst. It returns the first error from indexing or copying.
+func performKeepRsync(kcSrc, kcDst *keepclient.KeepClient, blobSigningKey, prefix string) error {
+       srcIndex, err := getUniqueLocators(kcSrc, prefix)
+       if err != nil {
+               return err
+       }
+
+       dstIndex, err := getUniqueLocators(kcDst, prefix)
+       if err != nil {
+               return err
+       }
+
+       // Locators present in src but absent from dst.
+       toBeCopied := getMissingLocators(srcIndex, dstIndex)
+
+       log.Printf("Before keep-rsync, there are %d blocks in src and %d blocks in dst. Start copying %d blocks from src not found in dst.",
+               len(srcIndex), len(dstIndex), len(toBeCopied))
+
+       return copyBlocksToDst(toBeCopied, kcSrc, kcDst, blobSigningKey)
+}
+
+// getUniqueLocators returns the set of locators matching prefix across all
+// of kc's local roots. A locator is the first space-separated field of each
+// index line. The locators collected so far are returned together with the
+// error if fetching or reading any index fails.
+func getUniqueLocators(kc *keepclient.KeepClient, prefix string) (map[string]bool, error) {
+       uniqueLocators := map[string]bool{}
+
+       // Get index and dedup
+       for uuid := range kc.LocalRoots() {
+               reader, err := kc.GetIndex(uuid, prefix)
+               if err != nil {
+                       return uniqueLocators, err
+               }
+               scanner := bufio.NewScanner(reader)
+               for scanner.Scan() {
+                       uniqueLocators[strings.SplitN(scanner.Text(), " ", 2)[0]] = true
+               }
+               // Scan returns false on read errors as well as on EOF; an
+               // unchecked error would leave a silently truncated index.
+               if err := scanner.Err(); err != nil {
+                       return uniqueLocators, err
+               }
+       }
+
+       return uniqueLocators, nil
+}
+
+// getMissingLocators returns the keys of srcLocators that are not keys of
+// dstLocators. Only key presence matters, not the stored value. The order
+// of the result is unspecified, and it is nil when nothing is missing.
+func getMissingLocators(srcLocators, dstLocators map[string]bool) []string {
+       var missing []string
+       for candidate := range srcLocators {
+               if _, present := dstLocators[candidate]; present {
+                       continue
+               }
+               missing = append(missing, candidate)
+       }
+       return missing
+}
+
+// copyBlocksToDst reads each locator in toBeCopied from kcSrc and writes it
+// to kcDst, logging progress before every block. When blobSigningKey is
+// non-empty the locator is signed with the src API token and a one-day
+// expiry before the Get. The first 32 characters of the locator are passed
+// to PutHR as the hash. Copying stops at the first Get or Put error.
+//
+// NOTE(review): the reader returned by kcSrc.Get is not closed here;
+// confirm whether PutHR takes care of that.
+func copyBlocksToDst(toBeCopied []string, kcSrc, kcDst *keepclient.KeepClient, blobSigningKey string) error {
+       total := len(toBeCopied)
+       startedAt := time.Now()
+
+       for done, locator := range toBeCopied {
+               percentDone := float64(done) / float64(total) * 100
+               if done > 0 {
+                       // Estimate the remaining time from the average so far.
+                       timePerBlock := time.Since(startedAt) / time.Duration(done)
+                       remaining := timePerBlock * time.Duration(total-done)
+                       log.Printf("Copying data block %d of %d (%.2f%% done, %v est. time remaining): %v", done+1, total,
+                               percentDone, remaining, locator)
+               } else {
+                       log.Printf("Copying data block %d of %d (%.2f%% done): %v", done+1, total,
+                               percentDone, locator)
+               }
+
+               getLocator := locator
+               if blobSigningKey != "" {
+                       expiresAt := time.Now().AddDate(0, 0, 1)
+                       getLocator = keepclient.SignLocator(getLocator, kcSrc.Arvados.ApiToken, expiresAt, []byte(blobSigningKey))
+               }
+
+               reader, blockSize, _, err := kcSrc.Get(getLocator)
+               if err != nil {
+                       return fmt.Errorf("Error getting block: %v %v", locator, err)
+               }
+
+               if _, _, err = kcDst.PutHR(getLocator[:32], reader, blockSize); err != nil {
+                       return fmt.Errorf("Error copying data block: %v %v", locator, err)
+               }
+       }
+
+       log.Printf("Successfully copied to destination %d blocks.", total)
+       return nil
+}
diff --git a/tools/keep-rsync/keep-rsync_test.go b/tools/keep-rsync/keep-rsync_test.go
new file mode 100644 (file)
index 0000000..6fbb535
--- /dev/null
@@ -0,0 +1,476 @@
+package main
+
+import (
+       "crypto/md5"
+       "fmt"
+       "io/ioutil"
+       "os"
+       "strings"
+       "testing"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+
+       . "gopkg.in/check.v1"
+)
+
+// Test hooks the gocheck suites registered in this file into "go test".
+func Test(t *testing.T) {
+       TestingT(t)
+}
+
+// Register the three suites with gocheck.
+var _ = Suite(&ServerRequiredSuite{})
+var _ = Suite(&ServerNotRequiredSuite{})
+var _ = Suite(&DoMainTestSuite{})
+
+// ServerRequiredSuite starts the API server in SetUpSuite and stops keep
+// servers after each test. ServerNotRequiredSuite has no fixtures in this
+// file. DoMainTestSuite saves and restores os.Args around each test.
+type ServerRequiredSuite struct{}
+type ServerNotRequiredSuite struct{}
+type DoMainTestSuite struct{}
+
+// SetUpSuite starts the API server once for the whole suite.
+func (s *ServerRequiredSuite) SetUpSuite(c *C) {
+       // Start API server
+       arvadostest.StartAPI()
+}
+
+// TearDownSuite stops the API server and resets the test environment.
+func (s *ServerRequiredSuite) TearDownSuite(c *C) {
+       arvadostest.StopAPI()
+       arvadostest.ResetEnv()
+}
+
+// initialArgs keeps the os.Args seen at suite start, so that
+// TearDownTest can restore it.
+var initialArgs []string
+
+// SetUpSuite records the original command line before any test changes it.
+func (s *DoMainTestSuite) SetUpSuite(c *C) {
+       initialArgs = os.Args
+}
+
+// Shared state for the ServerRequiredSuite tests: the src and dst
+// keepclients, the optional keep services JSON for each side, and the
+// blob signing key in use. SetUpTest resets all of them.
+var kcSrc, kcDst *keepclient.KeepClient
+var srcKeepServicesJSON, dstKeepServicesJSON, blobSigningKey string
+
+// SetUpTest clears the shared state so that no test sees values left
+// behind by a previous one.
+func (s *ServerRequiredSuite) SetUpTest(c *C) {
+       // reset all variables between tests
+       blobSigningKey = ""
+       srcKeepServicesJSON = ""
+       dstKeepServicesJSON = ""
+       kcSrc = &keepclient.KeepClient{}
+       kcDst = &keepclient.KeepClient{}
+}
+
+// TearDownTest stops the three keep servers that setupRsync starts.
+func (s *ServerRequiredSuite) TearDownTest(c *C) {
+       arvadostest.StopKeep(3)
+}
+
+// SetUpTest replaces os.Args with just the command name; each test then
+// appends the flags it wants doMain to parse.
+func (s *DoMainTestSuite) SetUpTest(c *C) {
+       args := []string{"keep-rsync"}
+       os.Args = args
+}
+
+// TearDownTest restores the command line saved in SetUpSuite.
+func (s *DoMainTestSuite) TearDownTest(c *C) {
+       os.Args = initialArgs
+}
+
+var testKeepServicesJSON = "{ \"kind\":\"arvados#keepServiceList\", \"etag\":\"\", \"self_link\":\"\", \"offset\":null, \"limit\":null, \"items\":[ { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012340\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012340\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25107, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false }, { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012341\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012341\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25108, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false } ], \"items_available\":2 }"
+
+// Testing keep-rsync needs two sets of keep services: src and dst.
+// The test setup hence creates 3 servers instead of the default 2,
+// and uses the first 2 as src and the 3rd as dst keep servers.
+//
+// setupRsync builds both configs from the ARVADOS_* environment variables,
+// starts the keep servers (with permission enforcement if requested), and
+// creates kcSrc and kcDst. Roots whose uuid ends in "02" are removed from
+// kcSrc, and roots ending in "00" or "01" are removed from kcDst. Finally
+// it verifies that kcDst.Want_replicas is the requested replications
+// value, or 2 when replications is 0.
+func setupRsync(c *C, enforcePermissions bool, replications int) {
+       // srcConfig
+       var srcConfig apiConfig
+       srcConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
+       srcConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN")
+       srcConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
+
+       // dstConfig
+       var dstConfig apiConfig
+       dstConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
+       dstConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN")
+       dstConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
+
+       if enforcePermissions {
+               blobSigningKey = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
+       }
+
+       // Start Keep servers
+       arvadostest.StartKeep(3, enforcePermissions)
+
+       // setup keepclients
+       // Assert rather than Check: the clients are dereferenced right
+       // below, so the test must stop here if setup failed.
+       var err error
+       kcSrc, err = setupKeepClient(srcConfig, srcKeepServicesJSON, false, 0)
+       c.Assert(err, IsNil)
+
+       kcDst, err = setupKeepClient(dstConfig, dstKeepServicesJSON, true, replications)
+       c.Assert(err, IsNil)
+
+       // Drop the third server from src.
+       for uuid := range kcSrc.LocalRoots() {
+               if strings.HasSuffix(uuid, "02") {
+                       delete(kcSrc.LocalRoots(), uuid)
+               }
+       }
+       for uuid := range kcSrc.GatewayRoots() {
+               if strings.HasSuffix(uuid, "02") {
+                       delete(kcSrc.GatewayRoots(), uuid)
+               }
+       }
+       for uuid := range kcSrc.WritableLocalRoots() {
+               if strings.HasSuffix(uuid, "02") {
+                       delete(kcSrc.WritableLocalRoots(), uuid)
+               }
+       }
+
+       // Drop the first two servers from dst.
+       for uuid := range kcDst.LocalRoots() {
+               if strings.HasSuffix(uuid, "00") || strings.HasSuffix(uuid, "01") {
+                       delete(kcDst.LocalRoots(), uuid)
+               }
+       }
+       for uuid := range kcDst.GatewayRoots() {
+               if strings.HasSuffix(uuid, "00") || strings.HasSuffix(uuid, "01") {
+                       delete(kcDst.GatewayRoots(), uuid)
+               }
+       }
+       for uuid := range kcDst.WritableLocalRoots() {
+               if strings.HasSuffix(uuid, "00") || strings.HasSuffix(uuid, "01") {
+                       delete(kcDst.WritableLocalRoots(), uuid)
+               }
+       }
+
+       if replications == 0 {
+               // Must have got default replications value of 2 from dst discovery document
+               c.Assert(kcDst.Want_replicas, Equals, 2)
+       } else {
+               // Since replications value is provided, it is used
+               c.Assert(kcDst.Want_replicas, Equals, replications)
+       }
+}
+
+// Without permission enforcement, a block written through one keepclient
+// must not be readable through the other, in either direction.
+func (s *ServerRequiredSuite) TestRsyncPutInOne_GetFromOtherShouldFail(c *C) {
+       setupRsync(c, false, 1)
+
+       // Put a block in src and verify that it is not found in dst
+       testNoCrosstalk(c, "test-data-1", kcSrc, kcDst)
+
+       // Put a block in dst and verify that it is not found in src
+       testNoCrosstalk(c, "test-data-2", kcDst, kcSrc)
+}
+
+// Same crosstalk check as above, with permission enforcement (blob
+// signing) enabled on the keep servers.
+func (s *ServerRequiredSuite) TestRsyncWithBlobSigning_PutInOne_GetFromOtherShouldFail(c *C) {
+       setupRsync(c, true, 1)
+
+       // Put a block in src and verify that it is not found in dst
+       testNoCrosstalk(c, "test-data-1", kcSrc, kcDst)
+
+       // Put a block in dst and verify that it is not found in src
+       testNoCrosstalk(c, "test-data-2", kcDst, kcSrc)
+}
+
+// Do a Put in the first and Get from the second,
+// which should raise block not found error.
+//
+// The locator returned by the Put is cut at its first "+" and re-signed
+// with kc2's token and the current blobSigningKey before the Get.
+func testNoCrosstalk(c *C, testData string, kc1, kc2 *keepclient.KeepClient) {
+       // Put a block using kc1
+       locator, _, err := kc1.PutB([]byte(testData))
+       c.Assert(err, Equals, nil)
+
+       locator = strings.Split(locator, "+")[0]
+       _, _, _, err = kc2.Get(keepclient.SignLocator(locator, kc2.Arvados.ApiToken, time.Now().AddDate(0, 0, 1), []byte(blobSigningKey)))
+       c.Assert(err, NotNil)
+       c.Check(err.Error(), Equals, "Block not found")
+}
+
+// Initializing with srcKeepServicesJSON must give kcSrc local roots for
+// both keep service uuids listed in testKeepServicesJSON.
+func (s *ServerRequiredSuite) TestRsyncInitializeWithKeepServicesJSON(c *C) {
+       srcKeepServicesJSON = testKeepServicesJSON
+
+       setupRsync(c, false, 1)
+
+       localRoots := kcSrc.LocalRoots()
+       c.Check(localRoots, NotNil)
+
+       // Look each expected uuid up directly instead of scanning all keys.
+       for _, uuid := range []string{"zzzzz-bi6l4-123456789012340", "zzzzz-bi6l4-123456789012341"} {
+               _, foundIt := localRoots[uuid]
+               c.Check(foundIt, Equals, true)
+       }
+}
+
+// Test keep-rsync initialization with default replications count.
+// Passing 0 makes setupRsync assert that Want_replicas ends up as 2.
+func (s *ServerRequiredSuite) TestInitializeRsyncDefaultReplicationsCount(c *C) {
+       setupRsync(c, false, 0)
+}
+
+// Test keep-rsync initialization with replications count argument.
+// setupRsync asserts that Want_replicas equals the value passed in (3).
+func (s *ServerRequiredSuite) TestInitializeRsyncReplicationsCount(c *C) {
+       setupRsync(c, false, 3)
+}
+
+// Put some blocks in Src and some more in Dst
+// And copy missing blocks from Src to Dst
+// (no permission enforcement, no index prefix).
+func (s *ServerRequiredSuite) TestKeepRsync(c *C) {
+       testKeepRsync(c, false, "")
+}
+
+// Put some blocks in Src and some more in Dst with blob signing enabled.
+// And copy missing blocks from Src to Dst
+// (permission enforcement on, no index prefix).
+func (s *ServerRequiredSuite) TestKeepRsync_WithBlobSigning(c *C) {
+       testKeepRsync(c, true, "")
+}
+
+// Put some blocks in Src and some more in Dst
+// Use prefix while doing rsync
+// And copy missing blocks from Src to Dst
+//
+// The prefix is the first three hex digits of the md5 of "test-data-4",
+// one of the blocks setupTestData writes to src only. After the sync the
+// dst index must hold more locators than were put into dst directly.
+func (s *ServerRequiredSuite) TestKeepRsync_WithPrefix(c *C) {
+       data := []byte("test-data-4")
+       hash := fmt.Sprintf("%x", md5.Sum(data))
+
+       testKeepRsync(c, false, hash[0:3])
+       c.Check(len(dstIndex) > len(dstLocators), Equals, true)
+}
+
+// Put some blocks in Src and some more in Dst
+// Use prefix not in src while doing rsync
+// And copy missing blocks from Src to Dst
+//
+// The test expects nothing to be copied: the dst index must have exactly
+// as many locators as were put into dst directly.
+func (s *ServerRequiredSuite) TestKeepRsync_WithNoSuchPrefixInSrc(c *C) {
+       testKeepRsync(c, false, "999")
+       c.Check(len(dstIndex), Equals, len(dstLocators))
+}
+
+// Put 5 blocks in src. Put 2 of those blocks in dst
+// Hence there are 3 additional blocks in src
+// Also, put 2 extra blocks in dst; they are hence only in dst
+// Run rsync and verify that those 7 blocks are now available in dst
+//
+// With a non-empty prefix only the src blocks matching it are expected to
+// be copied. The resulting dst index is stored in the package-level
+// dstIndex so that callers can make further checks.
+func testKeepRsync(c *C, enforcePermissions bool, prefix string) {
+       setupRsync(c, enforcePermissions, 1)
+
+       // setupTestData
+       setupTestData(c, prefix)
+
+       err := performKeepRsync(kcSrc, kcDst, blobSigningKey, prefix)
+       c.Check(err, IsNil)
+
+       // Now GetIndex from dst and verify that all 5 from src and the 2 extra blocks are found
+       // (the index is fetched with an empty prefix, i.e. all of dst).
+       dstIndex, err = getUniqueLocators(kcDst, "")
+       c.Check(err, IsNil)
+
+       for _, locator := range srcLocatorsMatchingPrefix {
+               _, ok := dstIndex[locator]
+               c.Assert(ok, Equals, true)
+       }
+
+       for _, locator := range extraDstLocators {
+               _, ok := dstIndex[locator]
+               c.Assert(ok, Equals, true)
+       }
+
+       if prefix == "" {
+               // all blocks from src and the two extra blocks
+               c.Assert(len(dstIndex), Equals, len(srcLocators)+len(extraDstLocators))
+       } else {
+               // 1 matching prefix and copied over, 2 that were initially copied into dst along with src, and the 2 extra blocks
+               c.Assert(len(dstIndex), Equals, len(srcLocatorsMatchingPrefix)+len(extraDstLocators)+2)
+       }
+}
+
+// Locators recorded by setupTestData (each cut at its "+A" hint), and the
+// dst index captured by testKeepRsync after a sync.
+var srcLocators, srcLocatorsMatchingPrefix, dstLocators, extraDstLocators []string
+var dstIndex map[string]bool
+
+// setupTestData resets the bookkeeping above, then writes five
+// "test-data-N" blocks to src, the first two of those to dst as well, and
+// two "other-data-N" blocks to dst only. Src locators whose hash starts
+// with indexPrefix are also recorded in srcLocatorsMatchingPrefix.
+func setupTestData(c *C, indexPrefix string) {
+       srcLocators = []string{}
+       srcLocatorsMatchingPrefix = []string{}
+       dstLocators = []string{}
+       extraDstLocators = []string{}
+       dstIndex = make(map[string]bool)
+
+       // Five blocks in src.
+       for n := 0; n < 5; n++ {
+               hash, _, err := kcSrc.PutB([]byte(fmt.Sprintf("test-data-%d", n)))
+               c.Check(err, IsNil)
+
+               locator := strings.Split(hash, "+A")[0]
+               srcLocators = append(srcLocators, locator)
+               if strings.HasPrefix(hash, indexPrefix) {
+                       srcLocatorsMatchingPrefix = append(srcLocatorsMatchingPrefix, locator)
+               }
+       }
+
+       // The first two src blocks are also present in dst.
+       for n := 0; n < 2; n++ {
+               hash, _, err := kcDst.PutB([]byte(fmt.Sprintf("test-data-%d", n)))
+               c.Check(err, IsNil)
+               dstLocators = append(dstLocators, strings.Split(hash, "+A")[0])
+       }
+
+       // Two blocks that exist only in dst.
+       for n := 0; n < 2; n++ {
+               hash, _, err := kcDst.PutB([]byte(fmt.Sprintf("other-data-%d", n)))
+               c.Check(err, IsNil)
+
+               locator := strings.Split(hash, "+A")[0]
+               dstLocators = append(dstLocators, locator)
+               extraDstLocators = append(extraDstLocators, locator)
+       }
+}
+
+// Setup rsync using srcKeepServicesJSON with fake keepservers.
+// Expect error during performKeepRsync due to unreachable src keepservers.
+func (s *ServerRequiredSuite) TestErrorDuringRsync_FakeSrcKeepservers(c *C) {
+       srcKeepServicesJSON = testKeepServicesJSON
+
+       setupRsync(c, false, 1)
+
+       err := performKeepRsync(kcSrc, kcDst, "", "")
+       // Stop here if no error came back, rather than calling Error() on nil.
+       c.Assert(err, NotNil)
+       c.Check(strings.HasSuffix(err.Error(), "no such host"), Equals, true)
+}
+
+// Setup rsync using dstKeepServicesJSON with fake keepservers.
+// Expect error during performKeepRsync due to unreachable dst keepservers.
+func (s *ServerRequiredSuite) TestErrorDuringRsync_FakeDstKeepservers(c *C) {
+       dstKeepServicesJSON = testKeepServicesJSON
+
+       setupRsync(c, false, 1)
+
+       err := performKeepRsync(kcSrc, kcDst, "", "")
+       // Stop here if no error came back, rather than calling Error() on nil.
+       c.Assert(err, NotNil)
+       c.Check(strings.HasSuffix(err.Error(), "no such host"), Equals, true)
+}
+
+// Test rsync with signature error during Get from src.
+func (s *ServerRequiredSuite) TestErrorDuringRsync_ErrorGettingBlockFromSrc(c *C) {
+       setupRsync(c, true, 1)
+
+       // put some blocks in src and dst
+       setupTestData(c, "")
+
+       // Change blob signing key to a fake key, so that Get from src fails
+       blobSigningKey = "thisisfakeblobsigningkey"
+
+       err := performKeepRsync(kcSrc, kcDst, blobSigningKey, "")
+       // Stop here if no error came back, rather than calling Error() on nil.
+       c.Assert(err, NotNil)
+       c.Check(strings.HasSuffix(err.Error(), "Block not found"), Equals, true)
+}
+
+// Test rsync with error during Put to dst.
+func (s *ServerRequiredSuite) TestErrorDuringRsync_ErrorPuttingBlockInDst(c *C) {
+       setupRsync(c, false, 1)
+
+       // put some blocks in src and dst
+       setupTestData(c, "")
+
+       // Increase Want_replicas on dst to result in insufficient replicas error during Put
+       kcDst.Want_replicas = 2
+
+       err := performKeepRsync(kcSrc, kcDst, blobSigningKey, "")
+       // Stop here if no error came back, rather than calling Error() on nil.
+       c.Assert(err, NotNil)
+       c.Check(strings.HasSuffix(err.Error(), "Could not write sufficient replicas"), Equals, true)
+}
+
+// Test loadConfig func: write a src and a dst config file from the
+// ARVADOS_* environment variables, load both, and check that every field
+// (and the src blob signing key) comes back as written.
+func (s *ServerNotRequiredSuite) TestLoadConfig(c *C) {
+       // Setup a src config file
+       srcFile := setupConfigFile(c, "src-config")
+       defer os.Remove(srcFile.Name())
+       srcConfigFile := srcFile.Name()
+
+       // Setup a dst config file
+       dstFile := setupConfigFile(c, "dst-config")
+       defer os.Remove(dstFile.Name())
+       dstConfigFile := dstFile.Name()
+
+       // load configuration from those files
+       srcConfig, srcBlobSigningKey, err := loadConfig(srcConfigFile)
+       c.Check(err, IsNil)
+
+       c.Assert(srcConfig.APIHost, Equals, os.Getenv("ARVADOS_API_HOST"))
+       c.Assert(srcConfig.APIToken, Equals, os.Getenv("ARVADOS_API_TOKEN"))
+       c.Assert(srcConfig.APIHostInsecure, Equals, matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")))
+       c.Assert(srcConfig.ExternalClient, Equals, false)
+
+       dstConfig, _, err := loadConfig(dstConfigFile)
+       c.Check(err, IsNil)
+
+       c.Assert(dstConfig.APIHost, Equals, os.Getenv("ARVADOS_API_HOST"))
+       c.Assert(dstConfig.APIToken, Equals, os.Getenv("ARVADOS_API_TOKEN"))
+       c.Assert(dstConfig.APIHostInsecure, Equals, matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")))
+       c.Assert(dstConfig.ExternalClient, Equals, false)
+
+       // setupConfigFile always writes "abcdefg" as the signing key.
+       c.Assert(srcBlobSigningKey, Equals, "abcdefg")
+}
+
+// Test loadConfig func without setting up the config files:
+// an empty filename must be rejected.
+func (s *ServerNotRequiredSuite) TestLoadConfig_MissingSrcConfig(c *C) {
+       _, _, err := loadConfig("")
+       // Stop here if no error came back, rather than calling Error() on nil.
+       c.Assert(err, NotNil)
+       c.Assert(err.Error(), Equals, "config file not specified")
+}
+
+// Test loadConfig func - error reading config:
+// a name that resolves to a nonexistent file must fail.
+func (s *ServerNotRequiredSuite) TestLoadConfig_ErrorLoadingSrcConfig(c *C) {
+       _, _, err := loadConfig("no-such-config-file")
+       // Stop here if no error came back, rather than calling Error() on nil.
+       c.Assert(err, NotNil)
+       c.Assert(strings.HasSuffix(err.Error(), "no such file or directory"), Equals, true)
+}
+
+// setupConfigFile creates a temporary config file whose name starts with
+// name, fills it with the ARVADOS_* values from the environment plus a
+// fixed external-client flag and blob signing key, and returns it.
+//
+// The returned file is already closed, because callers only use its
+// Name(). Removing the file is the caller's responsibility.
+func setupConfigFile(c *C, name string) *os.File {
+       // Setup a config file
+       file, err := ioutil.TempFile(os.TempDir(), name)
+       // Assert rather than Check: callers dereference the returned file.
+       c.Assert(err, IsNil)
+       // Close the handle so each call does not leak a file descriptor.
+       defer file.Close()
+
+       fileContent := "ARVADOS_API_HOST=" + os.Getenv("ARVADOS_API_HOST") + "\n"
+       fileContent += "ARVADOS_API_TOKEN=" + os.Getenv("ARVADOS_API_TOKEN") + "\n"
+       fileContent += "ARVADOS_API_HOST_INSECURE=" + os.Getenv("ARVADOS_API_HOST_INSECURE") + "\n"
+       fileContent += "ARVADOS_EXTERNAL_CLIENT=false\n"
+       fileContent += "ARVADOS_BLOB_SIGNING_KEY=abcdefg"
+
+       _, err = file.Write([]byte(fileContent))
+       c.Check(err, IsNil)
+
+       return file
+}
+
+// With no flags at all, doMain must fail on the missing src config.
+func (s *DoMainTestSuite) Test_doMain_NoSrcConfig(c *C) {
+       err := doMain()
+       c.Check(err, NotNil)
+       c.Assert(err.Error(), Equals, "Error loading src configuration from file: config file not specified")
+}
+
+// With a valid -src but no -dst, doMain must fail on the missing dst
+// config.
+func (s *DoMainTestSuite) Test_doMain_SrcButNoDstConfig(c *C) {
+       srcConfig := setupConfigFile(c, "src")
+       // Do not leave the temporary config file behind.
+       defer os.Remove(srcConfig.Name())
+       args := []string{"-replications", "3", "-src", srcConfig.Name()}
+       os.Args = append(os.Args, args...)
+       err := doMain()
+       c.Check(err, NotNil)
+       c.Assert(err.Error(), Equals, "Error loading dst configuration from file: config file not specified")
+}
+
+// A -src value that does not resolve to a readable file must make doMain
+// fail while reading the src config.
+func (s *DoMainTestSuite) Test_doMain_BadSrcConfig(c *C) {
+       args := []string{"-src", "abcd"}
+       os.Args = append(os.Args, args...)
+       err := doMain()
+       c.Check(err, NotNil)
+       c.Assert(strings.HasPrefix(err.Error(), "Error loading src configuration from file: Error reading config file"), Equals, true)
+}
+
+// Passing only -replications still leaves the src config unspecified, so
+// doMain must fail with the same error as with no flags.
+func (s *DoMainTestSuite) Test_doMain_WithReplicationsButNoSrcConfig(c *C) {
+       args := []string{"-replications", "3"}
+       os.Args = append(os.Args, args...)
+       err := doMain()
+       c.Check(err, NotNil)
+       c.Assert(err.Error(), Equals, "Error loading src configuration from file: config file not specified")
+}
+
+// With both -src and -dst pointing at valid config files and keep servers
+// running, doMain must succeed.
+func (s *DoMainTestSuite) Test_doMainWithSrcAndDstConfig(c *C) {
+       srcConfig := setupConfigFile(c, "src")
+       defer os.Remove(srcConfig.Name())
+       dstConfig := setupConfigFile(c, "dst")
+       defer os.Remove(dstConfig.Name())
+       args := []string{"-src", srcConfig.Name(), "-dst", dstConfig.Name()}
+       os.Args = append(os.Args, args...)
+
+       // Start keepservers. Since we are not doing any tweaking as in setupRsync func,
+       // kcSrc and kcDst will be the same and no actual copying to dst will happen, but that's ok.
+       arvadostest.StartKeep(2, false)
+       // Stop the servers again so they do not outlive this test.
+       defer arvadostest.StopKeep(2)
+
+       err := doMain()
+       c.Check(err, IsNil)
+}