Merge branch '5353-node-sizes' closes #5353
authorPeter Amstutz <peter.amstutz@curoverse.com>
Wed, 18 Nov 2015 17:09:47 +0000 (12:09 -0500)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Wed, 18 Nov 2015 17:09:47 +0000 (12:09 -0500)
54 files changed:
apps/workbench/app/controllers/collections_controller.rb
apps/workbench/app/views/application/404.html.erb
apps/workbench/app/views/application/_report_error.html.erb
apps/workbench/app/views/layouts/body.html.erb
apps/workbench/config/application.default.yml
apps/workbench/test/controllers/application_controller_test.rb
apps/workbench/test/controllers/collections_controller_test.rb
apps/workbench/test/controllers/projects_controller_test.rb
apps/workbench/test/helpers/download_helper.rb [new file with mode: 0644]
apps/workbench/test/integration/collection_upload_test.rb
apps/workbench/test/integration/download_test.rb [new file with mode: 0644]
apps/workbench/test/integration_helper.rb
apps/workbench/test/test_helper.rb
doc/install/install-api-server.html.textile.liquid
doc/install/install-arv-git-httpd.html.textile.liquid
doc/install/install-crunch-dispatch.html.textile.liquid
doc/install/install-keep-web.html.textile.liquid
doc/install/install-keepproxy.html.textile.liquid
doc/install/install-workbench-app.html.textile.liquid
sdk/cli/bin/arv-run-pipeline-instance
sdk/cli/bin/crunch-job
sdk/go/arvadosclient/arvadosclient.go
sdk/go/arvadosclient/arvadosclient_test.go
sdk/go/arvadostest/fixtures.go
sdk/go/arvadostest/run_servers.go
sdk/go/keepclient/discover.go [new file with mode: 0644]
sdk/go/keepclient/discover_test.go [new file with mode: 0644]
sdk/go/keepclient/support.go
sdk/python/arvados/keep.py
sdk/python/tests/arvados_testutil.py
sdk/python/tests/keepstub.py
sdk/python/tests/manifest_examples.py
sdk/python/tests/nginx.conf
sdk/python/tests/run_test_server.py
sdk/python/tests/test_arv_ls.py
sdk/python/tests/test_arvfile.py
sdk/python/tests/test_collections.py
sdk/python/tests/test_keep_client.py
services/api/app/controllers/database_controller.rb
services/api/test/fixtures/api_client_authorizations.yml
services/datamanager/datamanager_test.go
services/fuse/arvados_fuse/fusedir.py
services/fuse/bin/arv-mount
services/fuse/tests/test_mount.py
services/keep-web/anonymous.go
services/keep-web/doc.go
services/keep-web/handler.go
services/keep-web/handler_test.go
services/keep-web/main.go
services/keep-web/server.go
services/keepproxy/keepproxy.go
services/keepproxy/keepproxy_test.go
services/keepstore/pull_worker_integration_test.go
tools/keep-rsync/keep-rsync_test.go

index e01151ca408b567ec9908349bdfd7a51bcd15a2f..f8b359c89060cd9d2703b9303cffaf40f5eef54f 100644 (file)
@@ -1,4 +1,5 @@
 require "arvados/keep"
+require "uri"
 
 class CollectionsController < ApplicationController
   include ActionController::Live
@@ -130,11 +131,34 @@ class CollectionsController < ApplicationController
     usable_token = find_usable_token(tokens) do
       coll = Collection.find(params[:uuid])
     end
+    if usable_token.nil?
+      # Response already rendered.
+      return
+    end
+
+    # If we are configured to use a keep-web server, just redirect to
+    # the appropriate URL.
+    if Rails.configuration.keep_web_url or
+        Rails.configuration.keep_web_download_url
+      opts = {}
+      if usable_token == params[:reader_token]
+        opts[:path_token] = usable_token
+      elsif usable_token == Rails.configuration.anonymous_user_token
+        # Don't pass a token at all
+      else
+        # We pass the current user's real token only if it's necessary
+        # to read the collection.
+        opts[:query_token] = usable_token
+      end
+      opts[:disposition] = params[:disposition] if params[:disposition]
+      return redirect_to keep_web_url(params[:uuid], params[:file], opts)
+    end
+
+    # No keep-web server available. Get the file data with arv-get,
+    # and serve it through Rails.
 
     file_name = params[:file].andand.sub(/^(\.\/|\/|)/, './')
-    if usable_token.nil?
-      return  # Response already rendered.
-    elsif file_name.nil? or not coll.manifest.has_file?(file_name)
+    if file_name.nil? or not coll.manifest.has_file?(file_name)
       return render_not_found
     end
 
@@ -305,6 +329,61 @@ class CollectionsController < ApplicationController
     return nil
   end
 
+  # Build the keep-web URL for one file in a collection.
+  #
+  # uuid_or_pdh:: collection UUID or portable data hash; its first "+"
+  #               is replaced with "-" before it is substituted into
+  #               the URL template.
+  # file:: file path within the collection; it is URI-escaped and
+  #        appended after a "_/" path segment.
+  # opts:: hash that may contain :path_token (embedded in the path as
+  #        "t=TOKEN/"), :query_token (sent as the "api_token" query
+  #        parameter) and :disposition (sent as the "disposition"
+  #        query parameter). NOTE: :disposition is deleted from this
+  #        hash when the download URL template ends up being used.
+  #
+  # Returns the URL as a string. Raises ArgumentError when a query
+  # token has to be sent, the keep_web_url host does not start with
+  # the collection ID, and no keep_web_download_url is configured.
+  #
+  # NOTE(review): assumes at least one of keep_web_url and
+  # keep_web_download_url is set; otherwise "tmpl % fmt" is called on
+  # a false value. Confirm every caller checks this first.
+  def keep_web_url(uuid_or_pdh, file, opts)
+    munged_id = uuid_or_pdh.sub('+', '-')
+    fmt = {uuid_or_pdh: munged_id}
+
+    tmpl = Rails.configuration.keep_web_url
+    if Rails.configuration.keep_web_download_url and
+        (!tmpl or opts[:disposition] == 'attachment')
+      # Prefer the attachment-only-host when we want an attachment
+      # (and when there is no preview link configured)
+      tmpl = Rails.configuration.keep_web_download_url
+    else
+      # Expand the preview template so its hostname can be inspected.
+      check_uri = URI.parse(tmpl % fmt)
+      if opts[:query_token] and
+          not check_uri.host.start_with?(munged_id + "--") and
+          not check_uri.host.start_with?(munged_id + ".")
+        # We're about to pass a token in the query string, but
+        # keep-web can't accept that safely at a single-origin URL
+        # template (unless it's -attachment-only-host).
+        tmpl = Rails.configuration.keep_web_download_url
+        if not tmpl
+          raise ArgumentError, "Download precluded by site configuration"
+        end
+        logger.warn("Using download link, even though inline content " \
+                    "was requested: #{check_uri.to_s}")
+      end
+    end
+
+    if tmpl == Rails.configuration.keep_web_download_url
+      # This takes us to keep-web's -attachment-only-host so there is
+      # no need to add ?disposition=attachment.
+      opts.delete :disposition
+    end
+
+    # Path layout: <template path>/[t=<path_token>/]_/<escaped file>
+    uri = URI.parse(tmpl % fmt)
+    uri.path += '/' unless uri.path.end_with? '/'
+    if opts[:path_token]
+      uri.path += 't=' + opts[:path_token] + '/'
+    end
+    uri.path += '_/'
+    uri.path += URI.escape(file)
+
+    # Start from any query parameters already present in the template,
+    # then add the ones requested through opts.
+    query = Hash[URI.decode_www_form(uri.query || '')]
+    { query_token: 'api_token',
+      disposition: 'disposition' }.each do |opt, param|
+      if opts.include? opt
+        query[param] = opts[opt]
+      end
+    end
+    unless query.empty?
+      uri.query = URI.encode_www_form(query)
+    end
+
+    uri.to_s
+  end
+
   # Note: several controller and integration tests rely on stubbing
   # file_enumerator to return fake file content.
   def file_enumerator opts
index aa1ffda017120bb085997e130074e1bd799dace9..ea6e7033cb23c113fecfebfbb3dad8e6a7c01533 100644 (file)
 <% if !current_user %>
 
   <p>
-    (I notice you are not logged in. If you're looking for a private
-    page, you'll need to <%=link_to 'log in', arvados_api_client.arvados_login_url(return_to: strip_token_from_path(request.url))%> first.)
+    <%= link_to(arvados_api_client.arvados_login_url(return_to: strip_token_from_path(request.url)),
+                {class: "btn btn-primary report-issue-modal-window"}) do %>
+      <i class="fa fa-fw fa-sign-in"></i> Log in
+    <% end %>
+    to view private data.
   </p>
 
 <% elsif class_name %>
index 2e449b79d718ef53d7c7e507f46509867447ee2d..d9573a210bfcc6c06b0a9546eb6214fb5ddd13bf 100644 (file)
@@ -1,21 +1,31 @@
+<%
+   popup_params = {
+     popup_type: 'report',
+     current_location: request.url,
+     current_path: request.fullpath,
+     action_method: 'post',
+   }
+   if error_type == "api"
+     popup_params.merge!(
+       api_error_request_url: api_error.andand.request_url || "",
+       api_error_response: api_error.andand.api_response || "",
+     )
+   else
+     popup_params.merge!(error_message: error_message)
+   end
+%>
+
 <p>
-<br/><strong>If you suspect this is a bug, you can help us fix it by sending us a problem report:</strong><br/><br/>
-<% if error_type == 'api' %>
-  <%
-    api_request_url = api_error.andand.request_url ? api_error.request_url : ''
-    api_error_response = api_error.andand.api_response ? api_error.api_response : ''
-  %>
-  Send a problem report right here. <%= link_to report_issue_popup_path(popup_type: 'report', current_location: request.url, current_path: request.fullpath, action_method: 'post', api_error_request_url: api_request_url, api_error_response: api_error_response),
-        {class: 'btn btn-primary report-issue-modal-window', :remote => true, return_to: request.url} do %>
-        <i class="fa fa-fw fa-support"></i> Report problem
-  <% end %>
-<% else %>
-  Send a problem report right here. <%= link_to report_issue_popup_path(popup_type: 'report', current_location: request.url, current_path: request.fullpath, action_method: 'post', error_message: error_message),
-        {class: 'btn btn-primary report-issue-modal-window', :remote => true, return_to: request.url} do %>
-        <i class="fa fa-fw fa-support"></i> Report problem
-  <% end %>
+<%= link_to(report_issue_popup_path(popup_params),
+            {class: 'btn btn-primary report-issue-modal-window', :remote => true, return_to: request.url}) do %>
+  <i class="fa fa-fw fa-support"></i> Report problem
 <% end %>
-<% support_email = Rails.configuration.support_email_address%>
-<br/><br/>
-  If you prefer, send email to: <a href="mailto:<%=support_email%>?subject=Workbench problem report&amp;body=Problem while viewing page <%=request.url%>"><%=support_email%></a>
+
+or
+
+<%= mail_to(Rails.configuration.support_email_address, "email us",
+            subject: "Workbench problem report",
+            body: "Problem while viewing page #{request.url}") %>
+
+if you suspect this is a bug.
 </p>
index 22ccc2f91dd69a7c88fd66bc10e7f817de8bda1d..acb056e9c970635eeef55bee7f1638789ac88fd2 100644 (file)
               <li><%= link_to 'Browse public projects', "/projects/public" %></li>
             <% end %>
             <li class="dropdown hover-dropdown login-menu">
-              <a href="<%= arvados_api_client.arvados_login_url(return_to: root_url) %>">Log in</a>
+              <a href="<%= arvados_api_client.arvados_login_url(return_to: request.url) %>">Log in</a>
               <ul class="dropdown-menu">
                 <li>
-                  <a href="<%= arvados_api_client.arvados_login_url(return_to: root_url) %>">
+                  <a href="<%= arvados_api_client.arvados_login_url(return_to: request.url) %>">
                     <span class="fa fa-lg fa-sign-in"></span>
                     <p style="margin-left: 1.6em; margin-top: -1.35em; margin-bottom: 0em; margin-right: 0.5em;">Log in or register with<br/>any Google account</p>
                   </a>
index 00959bbb3bea30568e62ddb82c9f0d7f733fb44d..c4a92d273950bbebe15b0faf29ccbe58b3cde3b4 100644 (file)
@@ -225,3 +225,32 @@ common:
   # E.g., using a name-based proxy server to forward connections to shell hosts:
   # https://%{hostname}.webshell.uuid_prefix.arvadosapi.com/
   shell_in_a_box_url: false
+
+  # Format of preview links. If false, use keep_web_download_url
+  # instead, and disable inline preview. If both are false, use
+  # Workbench's built-in file download/preview mechanism.
+  #
+  # Examples:
+  # keep_web_url: https://%{uuid_or_pdh}.collections.uuid_prefix.arvadosapi.com
+  # keep_web_url: https://%{uuid_or_pdh}--collections.uuid_prefix.arvadosapi.com
+  #
+  # Example supporting only public data and collection-sharing links
+  # (other data will be handled as downloads via keep_web_download_url):
+  # keep_web_url: https://collections.uuid_prefix.arvadosapi.com/c=%{uuid_or_pdh}
+  keep_web_url: false
+
+  # Format of download links. If false, use keep_web_url with
+  # disposition=attachment query param.
+  #
+  # The host part of the keep_web_download_url value here must match
+  # the -attachment-only-host argument given to keep-web: if
+  # keep_web_download_url is "https://FOO.EXAMPLE/c=..." then keep-web
+  # must run with "-attachment-only-host=FOO.EXAMPLE".
+  #
+  # If keep_web_download_url is false, and keep_web_url uses a
+  # single-origin form, then Workbench will show an error page
+  # when asked to download or preview private data.
+  #
+  # Example:
+  # keep_web_download_url: https://download.uuid_prefix.arvadosapi.com/c=%{uuid_or_pdh}
+  keep_web_download_url: false
index 15d52da3c34f5f12a45091968d5b875f3909872f..ef2a989427948a693c1257e7466f09e523c43c7b 100644 (file)
@@ -334,6 +334,26 @@ class ApplicationControllerTest < ActionController::TestCase
     assert_response 404
   end
 
+  [".navbar .login-menu a",
+   ".navbar .login-menu .dropdown-menu a"
+  ].each do |css_selector|
+    test "login link at #{css_selector.inspect} includes return_to param" do
+      # Without an anonymous token, we're immediately redirected to login.
+      Rails.configuration.anonymous_user_token =
+        api_fixture("api_client_authorizations", "anonymous", "api_token")
+      @controller = ProjectsController.new
+      test_uuid = "zzzzz-j7d0g-zzzzzzzzzzzzzzz"
+      get(:show, {id: test_uuid})
+      login_link = css_select(css_selector).first
+      assert_not_nil(login_link, "failed to select login link")
+      login_href = URI.unescape(login_link.attributes["href"])
+      # The parameter needs to include the full URL to work.
+      assert_includes(login_href, "://")
+      assert_match(/[\?&]return_to=[^&]*\/projects\/#{test_uuid}(&|$)/,
+                   login_href)
+    end
+  end
+
   test "Workbench returns 4xx when API server is unreachable" do
     # We're really testing ApplicationController's render_exception.
     # Our primary concern is that it doesn't raise an error and
index 13644e00bdce28db3460aa2f722f679deb107c7e..978a5133578c9bafa2adb98e6b7b86d455adfe09 100644 (file)
@@ -10,6 +10,15 @@ class CollectionsControllerTest < ActionController::TestCase
 
   NONEXISTENT_COLLECTION = "ffffffffffffffffffffffffffffffff+0"
 
+  # Set Rails.configuration.anonymous_user_token to the "anonymous"
+  # fixture's API token when enable is truthy, or to false otherwise.
+  def config_anonymous enable
+    anon_token = false
+    if enable
+      anon_token =
+        api_fixture('api_client_authorizations')['anonymous']['api_token']
+    end
+    Rails.configuration.anonymous_user_token = anon_token
+  end
+
   def stub_file_content
     # For the duration of the current test case, stub file download
     # content with a randomized (but recognizable) string. Return the
@@ -167,8 +176,7 @@ class CollectionsControllerTest < ActionController::TestCase
   end
 
   test 'anonymous download' do
-    Rails.configuration.anonymous_user_token =
-      api_fixture('api_client_authorizations')['anonymous']['api_token']
+    config_anonymous true
     expect_content = stub_file_content
     get :show_file, {
       uuid: api_fixture('collections')['user_agreement_in_anonymously_accessible_project']['uuid'],
@@ -205,15 +213,14 @@ class CollectionsControllerTest < ActionController::TestCase
                      "using a reader token set the session's API token")
   end
 
-  [false, api_fixture('api_client_authorizations')['anonymous']['api_token']].
-    each do |anon_conf|
-    test "download a file using a reader token with insufficient scope (anon_conf=#{!!anon_conf})" do
-      Rails.configuration.anonymous_user_token = anon_conf
+  [false, true].each do |anon|
+    test "download a file using a reader token with insufficient scope, anon #{anon}" do
+      config_anonymous anon
       params = collection_params(:foo_file, 'foo')
       params[:reader_token] =
         api_fixture('api_client_authorizations')['active_noscope']['api_token']
       get(:show_file, params)
-      if anon_conf
+      if anon
         # Some files can be shown without a valid token, but not this one.
         assert_response 404
       else
@@ -463,8 +470,7 @@ class CollectionsControllerTest < ActionController::TestCase
   end
 
   test "anonymous user accesses collection in shared project" do
-    Rails.configuration.anonymous_user_token =
-      api_fixture('api_client_authorizations')['anonymous']['api_token']
+    config_anonymous true
     collection = api_fixture('collections')['public_text_file']
     get(:show, {id: collection['uuid']})
 
@@ -514,4 +520,109 @@ class CollectionsControllerTest < ActionController::TestCase
     get :show, {id: api_fixture('collections')['user_agreement']['uuid']}, session_for(:active)
     assert_not_includes @response.body, '<a href="#Upload"'
   end
+
+  # Store cfg as keep_web_url and dl_cfg as keep_web_download_url in
+  # the Rails configuration, then set an expectation that the
+  # controller's file_enumerator is never called.
+  def setup_for_keep_web cfg='https://%{uuid_or_pdh}.example', dl_cfg=false
+    conf = Rails.configuration
+    conf.keep_web_url = cfg
+    conf.keep_web_download_url = dl_cfg
+    @controller.expects(:file_enumerator).never
+  end
+
+  %w(uuid portable_data_hash).each do |id_type|
+    test "Redirect to keep_web_url via #{id_type}" do
+      setup_for_keep_web
+      tok = api_fixture('api_client_authorizations')['active']['api_token']
+      id = api_fixture('collections')['w_a_z_file'][id_type]
+      get :show_file, {uuid: id, file: "w a z"}, session_for(:active)
+      assert_response :redirect
+      assert_equal "https://#{id.sub '+', '-'}.example/_/w%20a%20z?api_token=#{tok}", @response.redirect_url
+    end
+
+    test "Redirect to keep_web_url via #{id_type} with reader token" do
+      setup_for_keep_web
+      tok = api_fixture('api_client_authorizations')['active']['api_token']
+      id = api_fixture('collections')['w_a_z_file'][id_type]
+      get :show_file, {uuid: id, file: "w a z", reader_token: tok}, session_for(:expired)
+      assert_response :redirect
+      assert_equal "https://#{id.sub '+', '-'}.example/t=#{tok}/_/w%20a%20z", @response.redirect_url
+    end
+
+    test "Redirect to keep_web_url via #{id_type} with no token" do
+      setup_for_keep_web
+      config_anonymous true
+      id = api_fixture('collections')['public_text_file'][id_type]
+      get :show_file, {uuid: id, file: "Hello World.txt"}
+      assert_response :redirect
+      assert_equal "https://#{id.sub '+', '-'}.example/_/Hello%20World.txt", @response.redirect_url
+    end
+
+    test "Redirect to keep_web_url via #{id_type} with disposition param" do
+      setup_for_keep_web
+      config_anonymous true
+      id = api_fixture('collections')['public_text_file'][id_type]
+      get :show_file, {
+        uuid: id,
+        file: "Hello World.txt",
+        disposition: 'attachment',
+      }
+      assert_response :redirect
+      assert_equal "https://#{id.sub '+', '-'}.example/_/Hello%20World.txt?disposition=attachment", @response.redirect_url
+    end
+
+    test "Redirect to keep_web_download_url via #{id_type}" do
+      setup_for_keep_web('https://collections.example/c=%{uuid_or_pdh}',
+                         'https://download.example/c=%{uuid_or_pdh}')
+      tok = api_fixture('api_client_authorizations')['active']['api_token']
+      id = api_fixture('collections')['w_a_z_file'][id_type]
+      get :show_file, {uuid: id, file: "w a z"}, session_for(:active)
+      assert_response :redirect
+      assert_equal "https://download.example/c=#{id.sub '+', '-'}/_/w%20a%20z?api_token=#{tok}", @response.redirect_url
+    end
+  end
+
+  [false, true].each do |anon|
+    test "No redirect to keep_web_url if collection not found, anon #{anon}" do
+      setup_for_keep_web
+      config_anonymous anon
+      id = api_fixture('collections')['w_a_z_file']['uuid']
+      get :show_file, {uuid: id, file: "w a z"}, session_for(:spectator)
+      assert_response 404
+    end
+
+    test "Redirect download to keep_web_download_url, anon #{anon}" do
+      config_anonymous anon
+      setup_for_keep_web('https://collections.example/c=%{uuid_or_pdh}',
+                         'https://download.example/c=%{uuid_or_pdh}')
+      tok = api_fixture('api_client_authorizations')['active']['api_token']
+      id = api_fixture('collections')['public_text_file']['uuid']
+      get :show_file, {
+        uuid: id,
+        file: 'Hello world.txt',
+        disposition: 'attachment',
+      }, session_for(:active)
+      assert_response :redirect
+      expect_url = "https://download.example/c=#{id.sub '+', '-'}/_/Hello%20world.txt"
+      if not anon
+        expect_url += "?api_token=#{tok}"
+      end
+      assert_equal expect_url, @response.redirect_url
+    end
+  end
+
+  test "Error if file is impossible to retrieve from keep_web_url" do
+    # Cannot pass a session token using a single-origin keep-web URL,
+    # cannot read this collection without a session token.
+    setup_for_keep_web 'https://collections.example/c=%{uuid_or_pdh}', false
+    id = api_fixture('collections')['w_a_z_file']['uuid']
+    get :show_file, {uuid: id, file: "w a z"}, session_for(:active)
+    assert_response 422
+  end
+
+  test "Redirect preview to keep_web_download_url when preview is disabled" do
+    setup_for_keep_web false, 'https://download.example/c=%{uuid_or_pdh}'
+    tok = api_fixture('api_client_authorizations')['active']['api_token']
+    id = api_fixture('collections')['w_a_z_file']['uuid']
+    get :show_file, {uuid: id, file: "w a z"}, session_for(:active)
+    assert_response :redirect
+    assert_equal "https://download.example/c=#{id.sub '+', '-'}/_/w%20a%20z?api_token=#{tok}", @response.redirect_url
+  end
 end
index 3416cc0e61026c9ef5d4e7caee3baf4289095ca2..8fa9fe9a817e7f2e3b6494099b85afc53c05ac57 100644 (file)
@@ -239,7 +239,7 @@ class ProjectsControllerTest < ActionController::TestCase
     Rails.configuration.anonymous_user_token = api_fixture('api_client_authorizations')['anonymous']['api_token']
     get(:show, {id: api_fixture('groups')['aproject']['uuid']})
     assert_response 404
-    assert_includes @response.inspect, 'you are not logged in'
+    assert_match(/log ?in/i, @response.body)
   end
 
   test "visit home page as anonymous when anonymous browsing is enabled and expect login" do
diff --git a/apps/workbench/test/helpers/download_helper.rb b/apps/workbench/test/helpers/download_helper.rb
new file mode 100644 (file)
index 0000000..8f387f4
--- /dev/null
@@ -0,0 +1,21 @@
+# Helpers for tests that download files into a scratch directory
+# under the Rails root.
+module DownloadHelper
+  module_function
+
+  # Pathname of the download directory: <Rails.root>/tmp/downloads.
+  def path
+    Rails.root.join 'tmp', 'downloads'
+  end
+
+  # Empty the download directory, creating it if necessary.
+  #
+  # The directory is removed only if it exists, because rm_r raises
+  # Errno::ENOENT on a missing path (e.g. on a fresh checkout). Any
+  # other removal error still propagates. mkdir_p also creates a
+  # missing parent "tmp" directory.
+  def clear
+    FileUtils.rm_r path if File.exist? path
+    FileUtils.mkdir_p path
+  end
+
+  # List the entries in the download directory, leaving out names
+  # ending in ".part" (presumably downloads still in progress).
+  def done
+    Dir[path.join '*'].reject do |f|
+      /\.part$/ =~ f
+    end
+  end
+end
index 62efee4d67e6b4e5a84e2340bcc55902b18ba30d..903df90fb419c7ba231ae3c42d679abc85af41ed 100644 (file)
@@ -7,9 +7,19 @@ class CollectionUploadTest < ActionDispatch::IntegrationTest
         io.write content
       end
     end
+    # Database reset doesn't restore KeepServices; we have to
+    # save/restore manually.
+    use_token :admin do
+      @keep_services = KeepService.all.to_a
+    end
   end
 
   teardown do
+    use_token :admin do
+      @keep_services.each do |ks|
+        KeepService.find(ks.uuid).update_attributes(ks.attributes)
+      end
+    end
     testfiles.each do |filename, _|
       File.unlink(testfile_path filename)
     end
@@ -42,7 +52,7 @@ class CollectionUploadTest < ActionDispatch::IntegrationTest
     assert_match /_text":"\. d41d8\S+ 0:0:empty.txt\\n\. d41d8\S+ 0:0:empty\\\\040\(1\).txt\\n"/, body
   end
 
-  test "Upload non-empty files, report errors" do
+  test "Upload non-empty files" do
     need_selenium "to make file uploads work"
     visit page_with_token 'active', sandbox_path
     find('.nav-tabs a', text: 'Upload').click
@@ -50,24 +60,17 @@ class CollectionUploadTest < ActionDispatch::IntegrationTest
     attach_file 'file_selector', testfile_path('foo.txt')
     assert_selector 'button:not([disabled])', text: 'Start'
     click_button 'Start'
-    if "test environment does not have a keepproxy yet, see #4534" != "fixed"
-      using_wait_time 20 do
-        assert_text :visible, 'error'
-      end
-    else
-      assert_text :visible, 'Done!'
-      visit sandbox_path+'.json'
-      assert_match /_text":"\. 0cc1\S+ 0:1:a\\n\. acbd\S+ 0:3:foo.txt\\n"/, body
-    end
+    assert_text :visible, 'Done!'
+    visit sandbox_path+'.json'
+    assert_match /_text":"\. 0cc1\S+ 0:1:a\\n\. acbd\S+ 0:3:foo.txt\\n"/, body
   end
 
   test "Report mixed-content error" do
     skip 'Test suite does not use TLS'
     need_selenium "to make file uploads work"
-    begin
-      use_token :admin
-      proxy = KeepService.find(api_fixture('keep_services')['proxy']['uuid'])
-      proxy.update_attributes service_ssl_flag: false
+    use_token :admin do
+      KeepService.where(service_type: 'proxy').first.
+        update_attributes(service_ssl_flag: false)
     end
     visit page_with_token 'active', sandbox_path
     find('.nav-tabs a', text: 'Upload').click
@@ -82,11 +85,12 @@ class CollectionUploadTest < ActionDispatch::IntegrationTest
 
   test "Report network error" do
     need_selenium "to make file uploads work"
-    begin
-      use_token :admin
-      proxy = KeepService.find(api_fixture('keep_services')['proxy']['uuid'])
-      # Even if you somehow do port>2^16, surely nx.example.net won't respond
-      proxy.update_attributes service_host: 'nx.example.net', service_port: 99999
+    use_token :admin do
+      # Even if you somehow do port>2^16, surely nx.example.net won't
+      # respond
+      KeepService.where(service_type: 'proxy').first.
+        update_attributes(service_host: 'nx.example.net',
+                          service_port: 99999)
     end
     visit page_with_token 'active', sandbox_path
     find('.nav-tabs a', text: 'Upload').click
diff --git a/apps/workbench/test/integration/download_test.rb b/apps/workbench/test/integration/download_test.rb
new file mode 100644 (file)
index 0000000..ed91ae0
--- /dev/null
@@ -0,0 +1,94 @@
+require 'integration_helper'
+require 'helpers/download_helper'
+
+class DownloadTest < ActionDispatch::IntegrationTest
+  # Return the contents of tmp/<service>.port, located through a
+  # relative path from this test file.
+  def getport service
+    port_file = File.expand_path("../../../../../tmp/#{service}.port", __FILE__)
+    File.read(port_file)
+  end
+
+  setup do
+    @kwport = getport 'keep-web-ssl'
+    @kwdport = getport 'keep-web-dl-ssl'
+    Rails.configuration.keep_web_url = "https://localhost:#{@kwport}/c=%{uuid_or_pdh}"
+    Rails.configuration.keep_web_download_url = "https://localhost:#{@kwdport}/c=%{uuid_or_pdh}"
+    CollectionsController.any_instance.expects(:file_enumerator).never
+
+    # Make sure Capybara can download files.
+    need_selenium 'for downloading', :selenium_with_download
+    DownloadHelper.clear
+
+    # Keep data isn't populated by fixtures, so we have to write any
+    # data we expect to read.
+    ['foo', 'w a z', "Hello world\n"].each do |data|
+      md5 = `echo -n #{data.shellescape} | arv-put --no-progress --raw -`
+      assert_match /^#{Digest::MD5.hexdigest(data)}/, md5
+      assert $?.success?, $?
+    end
+  end
+
+  ['uuid', 'portable_data_hash'].each do |id_type|
+    test "preview from keep-web by #{id_type} using a reader token" do
+      uuid_or_pdh = api_fixture('collections')['foo_file'][id_type]
+      token = api_fixture('api_client_authorizations')['active_all_collections']['api_token']
+      visit "/collections/download/#{uuid_or_pdh}/#{token}/"
+      within "#collection_files" do
+        click_link 'foo'
+      end
+      assert_no_selector 'a'
+      assert_text 'foo'
+    end
+
+    test "preview anonymous content from keep-web by #{id_type}" do
+      Rails.configuration.anonymous_user_token =
+        api_fixture('api_client_authorizations')['anonymous']['api_token']
+      uuid_or_pdh =
+        api_fixture('collections')['public_text_file'][id_type]
+      visit "/collections/#{uuid_or_pdh}"
+      within "#collection_files" do
+        find('[title~=View]').click
+      end
+      assert_no_selector 'a'
+      assert_text 'Hello world'
+    end
+
+    test "download anonymous content from keep-web by #{id_type}" do
+      Rails.configuration.anonymous_user_token =
+        api_fixture('api_client_authorizations')['anonymous']['api_token']
+      uuid_or_pdh =
+        api_fixture('collections')['public_text_file'][id_type]
+      visit "/collections/#{uuid_or_pdh}"
+      within "#collection_files" do
+        find('[title~=Download]').click
+      end
+      wait_for_download 'Hello world.txt', "Hello world\n"
+    end
+  end
+
+  test "download from keep-web using a session token" do
+    uuid = api_fixture('collections')['w_a_z_file']['uuid']
+    token = api_fixture('api_client_authorizations')['active']['api_token']
+    visit page_with_token('active', "/collections/#{uuid}")
+    within "#collection_files" do
+      find('[title~=Download]').click
+    end
+    wait_for_download 'w a z', 'w a z'
+  end
+
+  # Poll the download directory until the file named filename contains
+  # exactly expect_data, then assert on what was read.
+  #
+  # Gives up after 20 polls at 0.1 second intervals (about 2 seconds).
+  # In that case the assertion fails with the last content read, or
+  # nil if the file never became readable.
+  def wait_for_download filename, expect_data
+    download_path = DownloadHelper.path.join filename
+    data = nil
+    20.times do
+      sleep 0.1
+      # The file may not exist yet; treat any read failure as "no data".
+      data = File.read(download_path) rescue nil
+      # Stop as soon as the expected content shows up instead of always
+      # sleeping through every remaining poll.
+      break if data == expect_data
+    end
+    assert_equal expect_data, data
+  end
+
+  # TODO(TC): test "view pages hosted by keep-web, using session
+  # token". We might persuade selenium to send
+  # "collection-uuid.dl.example" requests to localhost by configuring
+  # our test nginx server to work as its forward proxy. Until then,
+  # we're relying on the "Redirect to keep_web_url via #{id_type}"
+  # test in CollectionsControllerTest (and keep-web's tests).
+end
index 39fdf4b260abd68be05f252158f29629aa199199..1cda1275e11724074d2fd39e1c273e7613b74bd4 100644 (file)
@@ -19,6 +19,18 @@ Capybara.register_driver :poltergeist_without_file_api do |app|
   Capybara::Poltergeist::Driver.new app, POLTERGEIST_OPTS.merge(extensions: [js])
 end
 
+Capybara.register_driver :selenium_with_download do |app|
+  profile = Selenium::WebDriver::Firefox::Profile.new
+  profile['browser.download.dir'] = DownloadHelper.path.to_s
+  profile['browser.download.downloadDir'] = DownloadHelper.path.to_s
+  profile['browser.download.defaultFolder'] = DownloadHelper.path.to_s
+  profile['browser.download.folderList'] = 2 # "save to user-defined location"
+  profile['browser.download.manager.showWhenStarting'] = false
+  profile['browser.helperApps.alwaysAsk.force'] = false
+  profile['browser.helperApps.neverAsk.saveToDisk'] = 'text/plain,application/octet-stream'
+  Capybara::Selenium::Driver.new app, profile: profile
+end
+
 module WaitForAjax
   Capybara.default_wait_time = 5
   def wait_for_ajax
@@ -73,8 +85,8 @@ module HeadlessHelper
     end
   end
 
-  def need_selenium reason=nil
-    Capybara.current_driver = :selenium
+  def need_selenium reason=nil, driver=:selenium
+    Capybara.current_driver = driver
     unless ENV['ARVADOS_TEST_HEADFUL'] or @headless
       @headless = HeadlessSingleton.get
       @headless.start
index 89d15c67d8de3708c4d74540518b9707e09aa432..41592af993261b6affd55fa6cba0f05780d475ec 100644 (file)
@@ -176,7 +176,10 @@ class ApiServerForTests
       # though it doesn't need to start up a new server).
       env_script = check_output %w(python ./run_test_server.py start --auth admin)
       check_output %w(python ./run_test_server.py start_arv-git-httpd)
+      check_output %w(python ./run_test_server.py start_keep-web)
       check_output %w(python ./run_test_server.py start_nginx)
+      # This one isn't a no-op, even under run-tests.sh.
+      check_output %w(python ./run_test_server.py start_keep)
     end
     test_env = {}
     env_script.each_line do |line|
@@ -192,9 +195,11 @@ class ApiServerForTests
 
   def stop_test_server
     Dir.chdir PYTHON_TESTS_DIR do
+      check_output %w(python ./run_test_server.py stop_keep)
       # These are no-ops if we're running within run-tests.sh
       check_output %w(python ./run_test_server.py stop_nginx)
       check_output %w(python ./run_test_server.py stop_arv-git-httpd)
+      check_output %w(python ./run_test_server.py stop_keep-web)
       check_output %w(python ./run_test_server.py stop)
     end
     @@server_is_running = false
index 695584fa247f2176db5d690adebf4b9002389f0d..69ff768ee1f17286c372719d5deb0c6171799356 100644 (file)
@@ -244,6 +244,14 @@ exec chpst -m 1073741824 -u webserver-user:webserver-group -e "$envdir" \
   passenger_enabled on;
   # If you're using RVM, uncomment the line below.
   #passenger_ruby /usr/local/rvm/wrappers/default/ruby;
+
+  # This value effectively limits the size of API objects users can
+  # create, especially collections.  If you change this, you should
+  # also ensure the following settings match it:
+  # * `client_max_body_size` in the server section below
+  # * `client_max_body_size` in the Workbench Nginx configuration (twice)
+  # * `max_request_size` in the API server's application.yml file
+  client_max_body_size 128m;
 }
 
 upstream api {
@@ -277,10 +285,7 @@ server {
 
   index  index.html index.htm index.php;
 
-  # This value effectively limits the size of API objects users can create,
-  # especially collections.  If you change this, you should also set
-  # `max_request_size` in the API server's application.yml file to the same
-  # value.
+  # Refer to the comment about this setting in the server section above.
   client_max_body_size 128m;
 
   location / {
index 1c31dc4d6ef664b424a5ee6b901c8a2350912b4e..7eec2d645a67960738e13acfce16231532403a22 100644 (file)
@@ -338,6 +338,9 @@ server {
   ssl_certificate         <span class="userinput">/YOUR/PATH/TO/cert.pem</span>;
   ssl_certificate_key     <span class="userinput">/YOUR/PATH/TO/cert.key</span>;
 
+  # The server needs to accept potentially large packfiles from push clients.
+  client_max_body_size 50m;
+
   location  / {
     proxy_pass            http://arvados-git-httpd;
   }
index 370a6e7c3d04fce578ccc1c231e82ec0e44960f6..d632f9bbd61974966499090bed0c71949633edac 100644 (file)
@@ -185,7 +185,7 @@ export RAILS_ENV=production
 export CRUNCH_JOB_DOCKER_BIN=<span class="userinput">docker.io</span>
 
 fuser -TERM -k $CRUNCH_DISPATCH_LOCKFILE || true
-cd /var/www/arvados-api/services/api
+cd /var/www/arvados-api/current
 exec $rvmexec bundle exec ./script/crunch-dispatch.rb 2>&1
 </code></pre>
 </notextile>
index 0a00ca85f1d87348103e27b949575682e70e6174..5eb4191e8cab6133944904a6a43c4028b19e39ee 100644 (file)
@@ -6,14 +6,15 @@ title: Install the keep-web server
 
 The keep-web server provides read-only HTTP access to files stored in Keep. It serves public data to unauthenticated clients, and serves private data to clients that supply Arvados API tokens. It can be installed anywhere with access to Keep services, typically behind a web proxy that provides SSL support. See the "godoc page":http://godoc.org/github.com/curoverse/arvados/services/keep-web for more detail.
 
-By convention, we use the following hostname for the keep-web service:
+By convention, we use the following hostnames for the keep-web service:
 
 <notextile>
-<pre><code>collections.<span class="userinput">uuid_prefix</span>.your.domain
+<pre><code>download.<span class="userinput">uuid_prefix</span>.your.domain
+collections.<span class="userinput">uuid_prefix</span>.your.domain
 </code></pre>
 </notextile>
 
-This hostname should resolve from anywhere on the internet.
+The above hostnames should resolve from anywhere on the internet.
 
 h2. Install keep-web
 
@@ -36,12 +37,12 @@ Verify that @keep-web@ is functional:
 <notextile>
 <pre><code>~$ <span class="userinput">keep-web -h</span>
 Usage of keep-web:
-  -address string
-        Address to listen on: "host:port", or ":port" to listen on all interfaces. (default ":80")
-  -anonymous-token value
-        API token to try when none of the tokens provided in an HTTP request succeed in reading the desired collection. If this flag is used more than once, each token will be attempted in turn until one works. (default [])
+  -allow-anonymous
+        Serve public data to anonymous clients. Try the token supplied in the ARVADOS_API_TOKEN environment variable when none of the tokens provided in an HTTP request succeed in reading the desired collection. (default false)
   -attachment-only-host string
         Accept credentials, and add "Content-Disposition: attachment" response headers, for requests at this hostname:port. Prohibiting inline display makes it possible to serve untrusted and non-public content from a single origin, i.e., without wildcard DNS or SSL.
+  -listen string
+        Address to listen on: "host:port", or ":port" to listen on all interfaces. (default ":80")
   -trust-all-content
         Serve non-public content from a single origin. Dangerous: read docs before using!
 </code></pre>
@@ -58,11 +59,16 @@ We recommend running @keep-web@ under "runit":https://packages.debian.org/search
 
 <notextile>
 <pre><code>export ARVADOS_API_HOST=<span class="userinput">uuid_prefix</span>.your.domain
-exec sudo -u nobody keep-web -address=<span class="userinput">:9002</span> -anonymous-token=<span class="userinput">hoShoomoo2bai3Ju1xahg6aeng1siquuaZ1yae2gi2Uhaeng2r</span> 2&gt;&amp;1
+export ARVADOS_API_TOKEN="<span class="userinput">hoShoomoo2bai3Ju1xahg6aeng1siquuaZ1yae2gi2Uhaeng2r</span>"
+exec sudo -u nobody keep-web \
+ -listen=<span class="userinput">:9002</span> \
+ -attachment-only-host=<span class="userinput">download.uuid_prefix.your.domain</span> \
+ -allow-anonymous \
+ 2&gt;&amp;1
 </code></pre>
 </notextile>
 
-Omit the @-anonymous-token@ arguments if you do not want to serve public data.
+Omit the @-allow-anonymous@ argument if you do not want to serve public data.
 
 Set @ARVADOS_API_HOST_INSECURE=1@ if your API server's SSL certificate is not signed by a recognized CA.
 
@@ -72,7 +78,7 @@ The keep-web service will be accessible from anywhere on the internet, so we rec
 
 This is best achieved by putting a reverse proxy with SSL support in front of keep-web, running on port 443 and passing requests to keep-web on port 9002 (or whatever port you chose in your run script).
 
-Note: A wildcard SSL certificate is required in order to proxy keep-web effectively.
+Note: A wildcard SSL certificate is required in order to support a full-featured secure keep-web service. Without it, keep-web can offer file downloads for all Keep data; however, in order to avoid cross-site scripting vulnerabilities, keep-web refuses to serve private data as web content except when it is accessed using a "secret link" share. With a wildcard SSL certificate and DNS configured appropriately, all data can be served as web content.
 
 For example, using Nginx:
 
@@ -83,7 +89,10 @@ upstream keep-web {
 
 server {
   listen                <span class="userinput">[your public IP address]</span>:443 ssl;
-  server_name           collections.<span class="userinput">uuid_prefix</span>.your.domain *.collections.<span class="userinput">uuid_prefix</span>.your.domain ~.*--collections.<span class="userinput">uuid_prefix</span>.your.domain;
+  server_name           download.<span class="userinput">uuid_prefix</span>.your.domain
+                        collections.<span class="userinput">uuid_prefix</span>.your.domain
+                        *.collections.<span class="userinput">uuid_prefix</span>.your.domain
+                        ~.*--collections.<span class="userinput">uuid_prefix</span>.your.domain;
 
   proxy_connect_timeout 90s;
   proxy_read_timeout    300s;
@@ -103,17 +112,29 @@ server {
 h3. Configure DNS
 
 Configure your DNS servers so the following names resolve to your Nginx proxy's public IP address.
-* @*--collections.uuid_prefix.your.domain@, if your DNS server allows this without interfering with other DNS names; or
-* @*.collections.uuid_prefix.your.domain@, if you have a wildcard SSL certificate valid for these names; or
-* @collections.uuid_prefix.your.domain@, if neither of the above options is feasible. In this case, only unauthenticated requests will be served, i.e., public data and collection sharing links.
+* @download.uuid_prefix.your.domain@
+* @collections.uuid_prefix.your.domain@
+* @*--collections.uuid_prefix.your.domain@, if you have a wildcard SSL certificate valid for @*.uuid_prefix.your.domain@ and your DNS server allows this without interfering with other DNS names.
+* @*.collections.uuid_prefix.your.domain@, if you have a wildcard SSL certificate valid for these names.
+
+If neither of the above wildcard options is feasible, only unauthenticated requests (public data and collection sharing links) will be served as web content at @collections.uuid_prefix.your.domain@. The @download@ name will be used to serve authenticated content, but only as file downloads.
 
 h3. Tell Workbench about the keep-web service
 
-Add *one* of the following entries to your Workbench configuration file (@/etc/arvados/workbench/application.yml@), depending on your DNS setup:
+Workbench has features like "download file from collection" and "show image" which work better if the content is served by keep-web rather than Workbench itself. We recommend using the two different hostnames ("download" and "collections" above) for file downloads and inline content respectively.
+
+Add the following entry to your Workbench configuration file (@/etc/arvados/workbench/application.yml@). This URL will be used for file downloads.
+
+<notextile>
+<pre><code>keep_web_download_url: https://download.<span class="userinput">uuid_prefix</span>.your.domain/c=%{uuid_or_pdh}
+</code></pre>
+</notextile>
+
+Additionally, add *one* of the following entries to your Workbench configuration file, depending on your DNS setup. This URL will be used to serve user content that can be displayed in the browser, like image previews and static HTML pages.
 
 <notextile>
 <pre><code>keep_web_url: https://%{uuid_or_pdh}--collections.<span class="userinput">uuid_prefix</span>.your.domain
 keep_web_url: https://%{uuid_or_pdh}.collections.<span class="userinput">uuid_prefix</span>.your.domain
-keep_web_url: https://collections.<span class="userinput">uuid_prefix</span>.your.domain
+keep_web_url: https://collections.<span class="userinput">uuid_prefix</span>.your.domain/c=%{uuid_or_pdh}
 </code></pre>
 </notextile>
index e6e2b103ae286317e2ba88910d5897adeee72068..5a5b66aaaef98c1ee2e42525df2c881655baf3ec 100644 (file)
@@ -56,7 +56,7 @@ The Keepproxy server needs a token to talk to the API server.
 On the <strong>API server</strong>, use the following command to create the token:
 
 <notextile>
-<pre><code>~/arvados/services/api/script$ <span class="userinput">RAILS_ENV=production bundle exec ./get_anonymous_user_token.rb</span>
+<pre><code>/var/www/arvados-api/current/script$ <span class="userinput">RAILS_ENV=production bundle exec ./get_anonymous_user_token.rb</span>
 hoShoomoo2bai3Ju1xahg6aeng1siquuaZ1yae2gi2Uhaeng2r
 </code></pre></notextile>
 
index 22fc1557a8fae0336fff6f0f80e89e8f8d1b1262..d05d94daa102e0b6b3a085c06e887905da775f29 100644 (file)
@@ -115,6 +115,10 @@ For best performance, we recommend you use Nginx as your Web server front-end, w
   passenger_enabled on;
   # If you're using RVM, uncomment the line below.
   #passenger_ruby /usr/local/rvm/wrappers/default/ruby;
+
+  # `client_max_body_size` should match the corresponding setting in
+  # the API server's Nginx configuration.
+  client_max_body_size 128m;
 }
 
 upstream workbench {
@@ -132,6 +136,8 @@ server {
   ssl_certificate_key <span class="userinput">/YOUR/PATH/TO/cert.key</span>;
 
   index  index.html index.htm index.php;
+  # `client_max_body_size` should match the corresponding setting in
+  # the API server's Nginx configuration.
   client_max_body_size 128m;
 
   location / {
index d9e00dc22bf7d69ab91aaf2b9efce1616e1842ec..3e72658d476020bd6bf2cf8e58b5b1afb7a6b65a 100755 (executable)
@@ -149,6 +149,10 @@ p = Trollop::Parser.new do
       "Description for the pipeline instance.",
       :short => :none,
       :type => :string)
+  opt(:project_uuid,
+      "UUID of the project for the pipeline instance.",
+      short: :none,
+      type: :string)
   stop_on [:'--']
 end
 $options = Trollop::with_standard_exception_handling p do
@@ -440,18 +444,23 @@ class WhRunPipelineInstance
         end
       end
     else
-      description = $options[:description]
-      description = ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : "")) if !description
-      @instance = PipelineInstance.
-        create(components: @components,
-               properties: {
-                 run_options: {
-                   enable_job_reuse: !@options[:no_reuse]
-                 }
-               },
-               pipeline_template_uuid: @template[:uuid],
-               description: description,
-               state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
+      description = $options[:description] ||
+                    ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : ""))
+      instance_body = {
+        components: @components,
+        properties: {
+          run_options: {
+            enable_job_reuse: !@options[:no_reuse]
+          }
+        },
+        pipeline_template_uuid: @template[:uuid],
+        description: description,
+        state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient')
+      }
+      if @options[:project_uuid]
+        instance_body[:owner_uuid] = @options[:project_uuid]
+      end
+      @instance = PipelineInstance.create(instance_body)
     end
     self
   end
index c2ea186ef5114f17a3198c1aee872103d5c0dc95..28da66d0ca74296ef8475a5d96c0cb6fb3d41764 100755 (executable)
@@ -820,6 +820,9 @@ update_progress_stats();
 THISROUND:
 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
 {
+  # Don't create new tasks if we already know the job's final result.
+  last if defined($main::success);
+
   my $id = $jobstep_todo[$todo_ptr];
   my $Jobstep = $jobstep[$id];
   if ($Jobstep->{level} != $level)
@@ -893,7 +896,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
-    $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
+    $command .= "&& exec arv-mount --by-pdh --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
     if ($docker_hash)
     {
       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
@@ -1018,7 +1021,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
         ||
         ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
   {
-    last THISROUND if $main::please_freeze || defined($main::success);
+    last THISROUND if $main::please_freeze;
     if ($main::please_info)
     {
       $main::please_info = 0;
@@ -1179,6 +1182,9 @@ sub reapchildren
 
   if (!defined $task_success) {
     # task did not indicate one way or the other --> fail
+    Log($jobstepid, sprintf(
+          "ERROR: Task process exited %d, but never updated its task record to indicate success and record its output.",
+          exit_status_s($childstatus)));
     $Jobstep->{'arvados_task'}->{success} = 0;
     $Jobstep->{'arvados_task'}->save;
     $task_success = 0;
@@ -1695,20 +1701,24 @@ sub log_writer_finish()
 
   close($log_pipe_in);
 
+  my $logger_failed = 0;
   my $read_result = log_writer_read_output(120);
   if ($read_result == -1) {
+    $logger_failed = -1;
     Log (undef, "timed out reading from 'arv-put'");
   } elsif ($read_result != 0) {
+    $logger_failed = -2;
     Log(undef, "failed to read arv-put log manifest to EOF");
   }
 
   waitpid($log_pipe_pid, 0);
   if ($?) {
+    $logger_failed ||= $?;
     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
   }
 
   close($log_pipe_out);
-  my $arv_put_output = $log_pipe_out_buf;
+  my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
       $log_pipe_out_select = undef;
 
@@ -1774,13 +1784,13 @@ sub save_meta
   my $justcheckpoint = shift; # false if this will be the last meta saved
   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
   return unless log_writer_is_active();
+  my $log_manifest = log_writer_finish();
+  return unless defined($log_manifest);
 
-  my $log_manifest = "";
   if ($Job->{log}) {
     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
-    $log_manifest .= $prev_log_coll->{manifest_text};
+    $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
   }
-  $log_manifest .= log_writer_finish();
 
   my $log_coll = api_call(
     "collections/create", ensure_unique_name => 1, collection => {
index 18e1074bf6f6c801c21593bc57586a902807f8b4..b67eaa59a6749fb7e9b1a3da5ad2617344fc799d 100644 (file)
@@ -32,6 +32,8 @@ var ErrInvalidArgument = errors.New("Invalid argument")
 // such failures by always using a new or recently active socket.
 var MaxIdleConnectionDuration = 30 * time.Second
 
+var RetryDelay = 2 * time.Second
+
 // Indicates an error that was returned by the API server.
 type APIServerError struct {
        // Address of server returning error, of the form "host:port".
@@ -65,6 +67,9 @@ type Dict map[string]interface{}
 
 // Information about how to contact the Arvados server
 type ArvadosClient struct {
+       // URL scheme used for API requests, "http" or "https" (empty means "https")
+       Scheme string
+
        // Arvados API server, form "host:port"
        ApiServer string
 
@@ -85,6 +90,9 @@ type ArvadosClient struct {
        DiscoveryDoc Dict
 
        lastClosedIdlesAt time.Time
+
+       // Maximum number of times a failed request is retried (0 = a single attempt)
+       Retries int
 }
 
 // Create a new ArvadosClient, initialized with standard Arvados environment
@@ -96,12 +104,14 @@ func MakeArvadosClient() (ac ArvadosClient, err error) {
        external := matchTrue.MatchString(os.Getenv("ARVADOS_EXTERNAL_CLIENT"))
 
        ac = ArvadosClient{
+               Scheme:      "https",
                ApiServer:   os.Getenv("ARVADOS_API_HOST"),
                ApiToken:    os.Getenv("ARVADOS_API_TOKEN"),
                ApiInsecure: insecure,
                Client: &http.Client{Transport: &http.Transport{
                        TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
-               External: external}
+               External: external,
+               Retries:  2}
 
        if ac.ApiServer == "" {
                return ac, MissingArvadosApiHost
@@ -118,10 +128,12 @@ func MakeArvadosClient() (ac ArvadosClient, err error) {
 // CallRaw is the same as Call() but returns a Reader that reads the
 // response body, instead of taking an output object.
 func (c ArvadosClient) CallRaw(method string, resourceType string, uuid string, action string, parameters Dict) (reader io.ReadCloser, err error) {
-       var req *http.Request
-
+       scheme := c.Scheme
+       if scheme == "" {
+               scheme = "https"
+       }
        u := url.URL{
-               Scheme: "https",
+               Scheme: scheme,
                Host:   c.ApiServer}
 
        if resourceType != API_DISCOVERY_RESOURCE {
@@ -151,27 +163,15 @@ func (c ArvadosClient) CallRaw(method string, resourceType string, uuid string,
                }
        }
 
-       if method == "GET" || method == "HEAD" {
-               u.RawQuery = vals.Encode()
-               if req, err = http.NewRequest(method, u.String(), nil); err != nil {
-                       return nil, err
-               }
-       } else {
-               if req, err = http.NewRequest(method, u.String(), bytes.NewBufferString(vals.Encode())); err != nil {
-                       return nil, err
-               }
-               req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+       retryable := false
+       switch method {
+       case "GET", "HEAD", "PUT", "OPTIONS", "DELETE":
+               retryable = true
        }
 
-       // Add api token header
-       req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", c.ApiToken))
-       if c.External {
-               req.Header.Add("X-External-Client", "1")
-       }
-
-       // POST and DELETE are not safe to retry automatically, so we minimize
-       // such failures by always using a new or recently active socket
-       if method == "POST" || method == "DELETE" {
+       // Non-retryable methods such as POST are not safe to retry automatically,
+       // so we minimize such failures by always using a new or recently active socket
+       if !retryable {
                if time.Since(c.lastClosedIdlesAt) > MaxIdleConnectionDuration {
                        c.lastClosedIdlesAt = time.Now()
                        c.Client.Transport.(*http.Transport).CloseIdleConnections()
@@ -179,17 +179,57 @@ func (c ArvadosClient) CallRaw(method string, resourceType string, uuid string,
        }
 
        // Make the request
+       var req *http.Request
        var resp *http.Response
-       if resp, err = c.Client.Do(req); err != nil {
-               return nil, err
-       }
 
-       if resp.StatusCode == http.StatusOK {
-               return resp.Body, nil
+       for attempt := 0; attempt <= c.Retries; attempt++ {
+               if method == "GET" || method == "HEAD" {
+                       u.RawQuery = vals.Encode()
+                       if req, err = http.NewRequest(method, u.String(), nil); err != nil {
+                               return nil, err
+                       }
+               } else {
+                       if req, err = http.NewRequest(method, u.String(), bytes.NewBufferString(vals.Encode())); err != nil {
+                               return nil, err
+                       }
+                       req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+               }
+
+               // Add api token header
+               req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", c.ApiToken))
+               if c.External {
+                       req.Header.Add("X-External-Client", "1")
+               }
+
+               resp, err = c.Client.Do(req)
+               if err != nil {
+                       if retryable {
+                               time.Sleep(RetryDelay)
+                               continue
+                       } else {
+                               return nil, err
+                       }
+               }
+
+               if resp.StatusCode == http.StatusOK {
+                       return resp.Body, nil
+               }
+
+               defer resp.Body.Close()
+
+               switch resp.StatusCode {
+               case 408, 409, 422, 423, 500, 502, 503, 504:
+                       time.Sleep(RetryDelay)
+                       continue
+               default:
+                       return nil, newAPIServerError(c.ApiServer, resp)
+               }
        }
 
-       defer resp.Body.Close()
-       return nil, newAPIServerError(c.ApiServer, resp)
+       if resp != nil {
+               return nil, newAPIServerError(c.ApiServer, resp)
+       }
+       return nil, err
 }
 
 func newAPIServerError(ServerAddress string, resp *http.Response) APIServerError {
index 75af3ca51960a1b47a8ce9406ad3200823aac5db..8e32efe4f987adccbe77b573a15ab27dbdfcc707 100644 (file)
@@ -1,8 +1,10 @@
 package arvadosclient
 
 import (
+       "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        . "gopkg.in/check.v1"
+       "net"
        "net/http"
        "os"
        "testing"
@@ -16,6 +18,7 @@ func Test(t *testing.T) {
 
 var _ = Suite(&ServerRequiredSuite{})
 var _ = Suite(&UnitSuite{})
+var _ = Suite(&MockArvadosServerSuite{})
 
 // Tests that require the Keep server running
 type ServerRequiredSuite struct{}
@@ -23,6 +26,12 @@ type ServerRequiredSuite struct{}
 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
        arvadostest.StartAPI()
        arvadostest.StartKeep(2, false)
+       RetryDelay = 0
+}
+
+func (s *ServerRequiredSuite) TearDownSuite(c *C) {
+       arvadostest.StopKeep(2)
+       arvadostest.StopAPI()
 }
 
 func (s *ServerRequiredSuite) SetUpTest(c *C) {
@@ -218,3 +227,160 @@ func (s *UnitSuite) TestPDHMatch(c *C) {
        c.Assert(PDHMatch("+12345"), Equals, false)
        c.Assert(PDHMatch(""), Equals, false)
 }
+
+// Tests that use a mock Arvados server
+type MockArvadosServerSuite struct{}
+
+func (s *MockArvadosServerSuite) SetUpSuite(c *C) {
+       RetryDelay = 0
+}
+
+func (s *MockArvadosServerSuite) SetUpTest(c *C) {
+       arvadostest.ResetEnv()
+}
+
+type APIServer struct {
+       listener net.Listener
+       url      string
+}
+
+func RunFakeArvadosServer(st http.Handler) (api APIServer, err error) {
+       api.listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: 0})
+       if err != nil {
+               return
+       }
+       api.url = api.listener.Addr().String()
+       go http.Serve(api.listener, st)
+       return
+}
+
+type APIStub struct {
+       method        string
+       retryAttempts int
+       expected      int
+       respStatus    []int
+       responseBody  []string
+}
+
+func (h *APIStub) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       if req.URL.Path == "/redirect-loop" {
+               http.Redirect(resp, req, "/redirect-loop", http.StatusFound)
+               return
+       }
+       if h.respStatus[h.retryAttempts] < 0 {
+               // Fail the client's Do() by starting a redirect loop
+               http.Redirect(resp, req, "/redirect-loop", http.StatusFound)
+       } else {
+               resp.WriteHeader(h.respStatus[h.retryAttempts])
+               resp.Write([]byte(h.responseBody[h.retryAttempts]))
+       }
+       h.retryAttempts++
+}
+
+func (s *MockArvadosServerSuite) TestWithRetries(c *C) {
+       for _, stub := range []APIStub{
+               {
+                       "get", 0, 200, []int{200, 500}, []string{`{"ok":"ok"}`, ``},
+               },
+               {
+                       "create", 0, 200, []int{200, 500}, []string{`{"ok":"ok"}`, ``},
+               },
+               {
+                       "get", 0, 500, []int{500, 500, 500, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+               },
+               {
+                       "create", 0, 500, []int{500, 500, 500, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+               },
+               {
+                       "update", 0, 500, []int{500, 500, 500, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+               },
+               {
+                       "delete", 0, 500, []int{500, 500, 500, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+               },
+               {
+                       "get", 0, 502, []int{500, 500, 502, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+               },
+               {
+                       "create", 0, 502, []int{500, 500, 502, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+               },
+               {
+                       "get", 0, 200, []int{500, 500, 200}, []string{``, ``, `{"ok":"ok"}`},
+               },
+               {
+                       "create", 0, 200, []int{500, 500, 200}, []string{``, ``, `{"ok":"ok"}`},
+               },
+               {
+                       "delete", 0, 200, []int{500, 500, 200}, []string{``, ``, `{"ok":"ok"}`},
+               },
+               {
+                       "update", 0, 200, []int{500, 500, 200}, []string{``, ``, `{"ok":"ok"}`},
+               },
+               {
+                       "get", 0, 401, []int{401, 200}, []string{``, `{"ok":"ok"}`},
+               },
+               {
+                       "create", 0, 401, []int{401, 200}, []string{``, `{"ok":"ok"}`},
+               },
+               {
+                       "get", 0, 404, []int{404, 200}, []string{``, `{"ok":"ok"}`},
+               },
+               {
+                       "get", 0, 401, []int{500, 401, 200}, []string{``, ``, `{"ok":"ok"}`},
+               },
+
+               // Response code -1 simulates an HTTP/network error
+               // (i.e., Do() returns an error; there is no HTTP
+               // response status code).
+
+               // Succeed on second retry
+               {
+                       "get", 0, 200, []int{-1, -1, 200}, []string{``, ``, `{"ok":"ok"}`},
+               },
+               // "POST" is not safe to retry: fail after one error
+               {
+                       "create", 0, -1, []int{-1, 200}, []string{``, `{"ok":"ok"}`},
+               },
+       } {
+               api, err := RunFakeArvadosServer(&stub)
+               c.Check(err, IsNil)
+
+               defer api.listener.Close()
+
+               arv := ArvadosClient{
+                       Scheme:      "http",
+                       ApiServer:   api.url,
+                       ApiToken:    "abc123",
+                       ApiInsecure: true,
+                       Client:      &http.Client{Transport: &http.Transport{}},
+                       Retries:     2}
+
+               getback := make(Dict)
+               switch stub.method {
+               case "get":
+                       err = arv.Get("collections", "zzzzz-4zz18-znfnqtbbv4spc3w", nil, &getback)
+               case "create":
+                       err = arv.Create("collections",
+                               Dict{"collection": Dict{"name": "testing"}},
+                               &getback)
+               case "update":
+                       err = arv.Update("collections", "zzzzz-4zz18-znfnqtbbv4spc3w",
+                               Dict{"collection": Dict{"name": "testing"}},
+                               &getback)
+               case "delete":
+                       err = arv.Delete("pipeline_templates", "zzzzz-4zz18-znfnqtbbv4spc3w", nil, &getback)
+               }
+
+               switch stub.expected {
+               case 200:
+                       c.Check(err, IsNil)
+                       c.Check(getback["ok"], Equals, "ok")
+               case -1:
+                       c.Check(err, NotNil)
+                       c.Check(err, ErrorMatches, `.*stopped after \d+ redirects`)
+               default:
+                       c.Check(err, NotNil)
+                       c.Check(err, ErrorMatches, fmt.Sprintf("arvados API server error: %d.*", stub.expected))
+                       c.Check(err.(APIServerError).HttpStatusCode, Equals, stub.expected)
+               }
+       }
+}
index d0270a6a71f79643bd4fdfbdd621b514449353f2..3256ec27a2572c0d9889ab1067dc43845075c540 100644 (file)
@@ -4,7 +4,9 @@ package arvadostest
 const (
        SpectatorToken        = "zw2f4gwx8hw8cjre7yp6v1zylhrhn3m5gvjq73rtpwhmknrybu"
        ActiveToken           = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
+       AdminToken            = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
        AnonymousToken        = "4kg6k6lzmp9kj4cpkcoxie964cmvjahbt4fod9zru44k4jqdmi"
+       DataManagerToken      = "320mkve8qkswstz7ff61glpk3mhgghmg67wmic7elw4z41pke1"
        FooCollection         = "zzzzz-4zz18-fy296fx3hot09f7"
        NonexistentCollection = "zzzzz-4zz18-totallynotexist"
        HelloWorldCollection  = "zzzzz-4zz18-4en62shvi99lxd4"
index 27c552a4e104094ca2ed15991e310a3b7e9cd65e..c61b68b319fbe50b5e71ea97e5751da57af687d9 100644 (file)
@@ -3,9 +3,6 @@ package arvadostest
 import (
        "bufio"
        "bytes"
-       "fmt"
-       "io"
-       "io/ioutil"
        "log"
        "os"
        "os/exec"
@@ -68,24 +65,12 @@ func StartAPI() {
        chdirToPythonTests()
 
        cmd := exec.Command("python", "run_test_server.py", "start", "--auth", "admin")
-       stderr, err := cmd.StderrPipe()
-       if err != nil {
-               log.Fatal(err)
-       }
-       go io.Copy(os.Stderr, stderr)
-       stdout, err := cmd.StdoutPipe()
+       cmd.Stdin = nil
+       cmd.Stderr = os.Stderr
+
+       authScript, err := cmd.Output()
        if err != nil {
-               log.Fatal(err)
-       }
-       if err = cmd.Start(); err != nil {
-               log.Fatal(err)
-       }
-       var authScript []byte
-       if authScript, err = ioutil.ReadAll(stdout); err != nil {
-               log.Fatal(err)
-       }
-       if err = cmd.Wait(); err != nil {
-               log.Fatal(err)
+               log.Fatalf("%+v: %s", cmd.Args, err)
        }
        ParseAuthSettings(authScript)
        ResetEnv()
@@ -96,7 +81,7 @@ func StopAPI() {
        defer os.Chdir(cwd)
        chdirToPythonTests()
 
-       exec.Command("python", "run_test_server.py", "stop").Run()
+       bgRun(exec.Command("python", "run_test_server.py", "stop"))
 }
 
 // StartKeep starts the given number of keep servers,
@@ -112,16 +97,7 @@ func StartKeep(numKeepServers int, enforcePermissions bool) {
                cmdArgs = append(cmdArgs, "--keep-enforce-permissions")
        }
 
-       cmd := exec.Command("python", cmdArgs...)
-
-       stderr, err := cmd.StderrPipe()
-       if err != nil {
-               log.Fatalf("Setting up stderr pipe: %s", err)
-       }
-       go io.Copy(os.Stderr, stderr)
-       if err := cmd.Run(); err != nil {
-               panic(fmt.Sprintf("'python run_test_server.py start_keep' returned error %s", err))
-       }
+       bgRun(exec.Command("python", cmdArgs...))
 }
 
 // StopKeep stops keep servers that were started with StartKeep.
@@ -132,5 +108,27 @@ func StopKeep(numKeepServers int) {
        defer os.Chdir(cwd)
        chdirToPythonTests()
 
-       exec.Command("python", "run_test_server.py", "stop_keep", "--num-keep-servers", strconv.Itoa(numKeepServers))
+       cmd := exec.Command("python", "run_test_server.py", "stop_keep", "--num-keep-servers", strconv.Itoa(numKeepServers))
+       cmd.Stdin = nil
+       cmd.Stderr = os.Stderr
+       cmd.Stdout = os.Stderr
+       if err := cmd.Run(); err != nil {
+               log.Fatalf("%+v: %s", cmd.Args, err)
+       }
+}
+
+// Start cmd, with stderr and stdout redirected to our own
+// stderr. Return when the process exits, but do not wait for its
+// stderr and stdout to close: any grandchild processes will continue
+// writing to our stderr.
+//
+// A failure to start the process, or an error while waiting for it,
+// terminates the test program via log.Fatalf.
+//
+// NOTE(review): the ProcessState returned by Process.Wait is
+// discarded, so a non-zero exit status of cmd is not reported here --
+// confirm that no caller relies on it.
+func bgRun(cmd *exec.Cmd) {
+       cmd.Stdin = nil
+       cmd.Stderr = os.Stderr
+       cmd.Stdout = os.Stderr
+       if err := cmd.Start(); err != nil {
+               log.Fatalf("%+v: %s", cmd.Args, err)
+       }
+       // Wait on the process itself instead of calling cmd.Wait().
+       if _, err := cmd.Process.Wait(); err != nil {
+               log.Fatalf("%+v: %s", cmd.Args, err)
+       }
 }
diff --git a/sdk/go/keepclient/discover.go b/sdk/go/keepclient/discover.go
new file mode 100644 (file)
index 0000000..650c2b5
--- /dev/null
@@ -0,0 +1,130 @@
+package keepclient
+
+import (
+       "encoding/json"
+       "fmt"
+       "log"
+       "os"
+       "os/signal"
+       "reflect"
+       "strings"
+       "syscall"
+       "time"
+)
+
+// DiscoverKeepServers gets list of available keep services from api server
+// and hands it to loadKeepServers. If the API call fails, its error is
+// returned and loadKeepServers is not called.
+func (this *KeepClient) DiscoverKeepServers() error {
+       var list svcList
+
+       // Get keep services from api server
+       err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list)
+       if err != nil {
+               return err
+       }
+
+       return this.loadKeepServers(list)
+}
+
+// LoadKeepServicesFromJSON gets list of available keep services from given JSON
+// and hands it to loadKeepServers. The services string is decoded into a
+// svcList (an object with an "items" array); a decoding error is returned
+// as-is and loadKeepServers is not called.
+func (this *KeepClient) LoadKeepServicesFromJSON(services string) error {
+       var list svcList
+
+       // Load keep services from given json
+       dec := json.NewDecoder(strings.NewReader(services))
+       if err := dec.Decode(&list); err != nil {
+               return err
+       }
+
+       return this.loadKeepServers(list)
+}
+
+// RefreshServices calls DiscoverKeepServers to refresh the keep
+// service list on SIGHUP; when the given interval has elapsed since
+// the last refresh; and (if the last refresh failed) the given
+// errInterval has elapsed.
+//
+// It loops forever and never returns. A refresh that succeeds but
+// yields no local services is also retried after errInterval.
+func (kc *KeepClient) RefreshServices(interval, errInterval time.Duration) {
+       var previousRoots = []map[string]string{}
+
+       timer := time.NewTimer(interval)
+       gotHUP := make(chan os.Signal, 1)
+       signal.Notify(gotHUP, syscall.SIGHUP)
+
+       for {
+               // Block until either trigger (signal or timer) fires.
+               select {
+               case <-gotHUP:
+               case <-timer.C:
+               }
+               // Schedule the next regular refresh; the failure paths
+               // below override this with errInterval.
+               timer.Reset(interval)
+
+               if err := kc.DiscoverKeepServers(); err != nil {
+                       // Printf (not Println): the message contains format verbs.
+                       log.Printf("Error retrieving services list: %v (retrying in %v)", err, errInterval)
+                       timer.Reset(errInterval)
+                       continue
+               }
+               newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()}
+
+               // Log only when the local or gateway roots changed.
+               if !reflect.DeepEqual(previousRoots, newRoots) {
+                       log.Printf("Updated services list: locals %v gateways %v", newRoots[0], newRoots[1])
+                       previousRoots = newRoots
+               }
+
+               if len(newRoots[0]) == 0 {
+                       log.Printf("WARNING: No local services (retrying in %v)", errInterval)
+                       timer.Reset(errInterval)
+               }
+       }
+}
+
+// loadKeepServers replaces this client's service roots and related
+// settings with the ones described by list.
+//
+// A service whose URL (scheme://host:port) was already seen earlier
+// in list is skipped. Every remaining service is recorded by UUID as
+// both a local root and a gateway root; services that are not
+// read-only are additionally recorded as writable local roots.
+// Using_proxy and replicasPerService are reset and recomputed from
+// list. The function always returns nil.
+func (this *KeepClient) loadKeepServers(list svcList) error {
+       listed := make(map[string]bool)
+       localRoots := make(map[string]string)
+       gatewayRoots := make(map[string]string)
+       writableLocalRoots := make(map[string]string)
+
+       // replicasPerService is 1 for disks; unknown or unlimited otherwise
+       this.replicasPerService = 1
+       this.Using_proxy = false
+
+       for _, service := range list.Items {
+               scheme := "http"
+               if service.SSL {
+                       scheme = "https"
+               }
+               url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
+
+               // Skip duplicates
+               if listed[url] {
+                       continue
+               }
+               listed[url] = true
+
+               localRoots[service.Uuid] = url
+               // A single proxy-type service switches the client to
+               // proxy settings (applied after the loop).
+               if service.SvcType == "proxy" {
+                       this.Using_proxy = true
+               }
+
+               if service.ReadOnly == false {
+                       writableLocalRoots[service.Uuid] = url
+                       // Any writable non-disk service makes the
+                       // per-service replica count unknown (0).
+                       if service.SvcType != "disk" {
+                               this.replicasPerService = 0
+                       }
+               }
+
+               // Gateway services are only used when specified by
+               // UUID, so there's nothing to gain by filtering them
+               // by service type. Including all accessible services
+               // (gateway and otherwise) merely accommodates more
+               // service configurations.
+               gatewayRoots[service.Uuid] = url
+       }
+
+       if this.Using_proxy {
+               this.setClientSettingsProxy()
+       } else {
+               this.setClientSettingsDisk()
+       }
+
+       this.SetServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
+       return nil
+}
diff --git a/sdk/go/keepclient/discover_test.go b/sdk/go/keepclient/discover_test.go
new file mode 100644 (file)
index 0000000..43a984e
--- /dev/null
@@ -0,0 +1,21 @@
+package keepclient
+
+import (
+       "fmt"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+)
+
+// ExampleRefreshServices builds a KeepClient, starts a background
+// RefreshServices loop (5 minute interval, 3 second retry interval)
+// and prints the client's current local roots.
+//
+// NOTE(review): RefreshServices is a method of KeepClient; the Go
+// testing convention for a method example is ExampleT_M (here
+// ExampleKeepClient_RefreshServices) -- consider renaming so the
+// example is attached to the method. Also, without an "Output:"
+// comment this example is compiled but not executed by "go test".
+func ExampleRefreshServices() {
+       arv, err := arvadosclient.MakeArvadosClient()
+       if err != nil {
+               panic(err)
+       }
+       kc, err := MakeKeepClient(&arv)
+       if err != nil {
+               panic(err)
+       }
+       // RefreshServices never returns, so it runs in its own goroutine.
+       go kc.RefreshServices(5*time.Minute, 3*time.Second)
+       fmt.Printf("LocalRoots: %#v\n", kc.LocalRoots())
+}
index 8414afab1eace8b00125ca88f4f37279b95fc558..90ac3bcb17dc12913abe6b5a130c0c44d1efb6fa 100644 (file)
@@ -2,7 +2,6 @@ package keepclient
 
 import (
        "crypto/md5"
-       "encoding/json"
        "errors"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/streamer"
@@ -87,86 +86,6 @@ type svcList struct {
        Items []keepService `json:"items"`
 }
 
-// DiscoverKeepServers gets list of available keep services from api server
-func (this *KeepClient) DiscoverKeepServers() error {
-       var list svcList
-
-       // Get keep services from api server
-       err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list)
-       if err != nil {
-               return err
-       }
-
-       return this.loadKeepServers(list)
-}
-
-// LoadKeepServicesFromJSON gets list of available keep services from given JSON
-func (this *KeepClient) LoadKeepServicesFromJSON(services string) error {
-       var list svcList
-
-       // Load keep services from given json
-       dec := json.NewDecoder(strings.NewReader(services))
-       if err := dec.Decode(&list); err != nil {
-               return err
-       }
-
-       return this.loadKeepServers(list)
-}
-
-// loadKeepServers
-func (this *KeepClient) loadKeepServers(list svcList) error {
-       listed := make(map[string]bool)
-       localRoots := make(map[string]string)
-       gatewayRoots := make(map[string]string)
-       writableLocalRoots := make(map[string]string)
-
-       // replicasPerService is 1 for disks; unknown or unlimited otherwise
-       this.replicasPerService = 1
-       this.Using_proxy = false
-
-       for _, service := range list.Items {
-               scheme := "http"
-               if service.SSL {
-                       scheme = "https"
-               }
-               url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
-
-               // Skip duplicates
-               if listed[url] {
-                       continue
-               }
-               listed[url] = true
-
-               localRoots[service.Uuid] = url
-               if service.SvcType == "proxy" {
-                       this.Using_proxy = true
-               }
-
-               if service.ReadOnly == false {
-                       writableLocalRoots[service.Uuid] = url
-                       if service.SvcType != "disk" {
-                               this.replicasPerService = 0
-                       }
-               }
-
-               // Gateway services are only used when specified by
-               // UUID, so there's nothing to gain by filtering them
-               // by service type. Including all accessible services
-               // (gateway and otherwise) merely accommodates more
-               // service configurations.
-               gatewayRoots[service.Uuid] = url
-       }
-
-       if this.Using_proxy {
-               this.setClientSettingsProxy()
-       } else {
-               this.setClientSettingsDisk()
-       }
-
-       this.SetServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
-       return nil
-}
-
 type uploadStatus struct {
        err             error
        url             string
index 8ed86fd79e96144886744dba81786ef21ef12ae8..e01fec412bc4e9cb83d47930b850b1a101e68438 100644 (file)
@@ -1,28 +1,16 @@
-import bz2
+import cStringIO
 import datetime
-import fcntl
-import functools
-import gflags
 import hashlib
-import json
 import logging
+import math
 import os
-import pprint
 import pycurl
 import Queue
 import re
 import socket
 import ssl
-import string
-import cStringIO
-import subprocess
-import sys
 import threading
-import time
 import timer
-import types
-import UserDict
-import zlib
 
 import arvados
 import arvados.config as config
@@ -225,28 +213,36 @@ class KeepBlockCache(object):
 class KeepClient(object):
 
     # Default Keep server connection timeout:  2 seconds
-    # Default Keep server read timeout:      300 seconds
+    # Default Keep server read timeout:       64 seconds
+    # Default Keep server bandwidth minimum:  32768 bytes per second
     # Default Keep proxy connection timeout:  20 seconds
-    # Default Keep proxy read timeout:       300 seconds
-    DEFAULT_TIMEOUT = (2, 300)
-    DEFAULT_PROXY_TIMEOUT = (20, 300)
+    # Default Keep proxy read timeout:        64 seconds
+    # Default Keep proxy bandwidth minimum:   32768 bytes per second
+    DEFAULT_TIMEOUT = (2, 64, 32768)
+    DEFAULT_PROXY_TIMEOUT = (20, 64, 32768)
 
     class ThreadLimiter(object):
-        """
-        Limit the number of threads running at a given time to
-        {desired successes} minus {successes reported}. When successes
-        reported == desired, wake up the remaining threads and tell
-        them to quit.
+        """Limit the number of threads writing to Keep at once.
+
+        This ensures that only a number of writer threads that could
+        potentially achieve the desired replication level run at once.
+        Once the desired replication level is achieved, queued threads
+        are instructed not to run.
 
         Should be used in a "with" block.
         """
-        def __init__(self, todo):
+        def __init__(self, want_copies, max_service_replicas):
             self._started = 0
-            self._todo = todo
+            self._want_copies = want_copies
             self._done = 0
             self._response = None
             self._start_lock = threading.Condition()
-            self._todo_lock = threading.Semaphore(todo)
+            if (not max_service_replicas) or (max_service_replicas >= want_copies):
+                max_threads = 1
+            else:
+                max_threads = math.ceil(float(want_copies) / max_service_replicas)
+            _logger.debug("Limiter max threads is %d", max_threads)
+            self._todo_lock = threading.Semaphore(max_threads)
             self._done_lock = threading.Lock()
             self._local = threading.local()
 
@@ -271,34 +267,28 @@ class KeepClient(object):
 
         def shall_i_proceed(self):
             """
-            Return true if the current thread should do stuff. Return
-            false if the current thread should just stop.
+            Return true if the current thread should write to Keep.
+            Return false otherwise.
             """
             with self._done_lock:
-                return (self._done < self._todo)
+                return (self._done < self._want_copies)
 
         def save_response(self, response_body, replicas_stored):
             """
             Records a response body (a locator, possibly signed) returned by
-            the Keep server.  It is not necessary to save more than
-            one response, since we presume that any locator returned
-            in response to a successful request is valid.
+            the Keep server, and the number of replicas it stored.
             """
             with self._done_lock:
                 self._done += replicas_stored
                 self._response = response_body
 
         def response(self):
-            """
-            Returns the body from the response to a PUT request.
-            """
+            """Return the body from the response to a PUT request."""
             with self._done_lock:
                 return self._response
 
         def done(self):
-            """
-            Return how many successes were reported.
-            """
+            """Return the total number of replicas successfully stored."""
             with self._done_lock:
                 return self._done
 
@@ -490,11 +480,17 @@ class KeepClient(object):
             if not timeouts:
                 return
             elif isinstance(timeouts, tuple):
-                conn_t, xfer_t = timeouts
+                if len(timeouts) == 2:
+                    conn_t, xfer_t = timeouts
+                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
+                else:
+                    conn_t, xfer_t, bandwidth_bps = timeouts
             else:
                 conn_t, xfer_t = (timeouts, timeouts)
+                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
-            curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
+            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
 
         def _headerfunction(self, header_line):
             header_line = header_line.decode('iso-8859-1')
@@ -598,20 +594,22 @@ class KeepClient(object):
 
         :timeout:
           The initial timeout (in seconds) for HTTP requests to Keep
-          non-proxy servers.  A tuple of two floats is interpreted as
-          (connection_timeout, read_timeout): see
-          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
-          Because timeouts are often a result of transient server load, the
-          actual connection timeout will be increased by a factor of two on
-          each retry.
-          Default: (2, 300).
+          non-proxy servers.  A tuple of three floats is interpreted as
+          (connection_timeout, read_timeout, minimum_bandwidth). A connection
+          will be aborted if the average traffic rate falls below
+          minimum_bandwidth bytes per second over an interval of read_timeout
+          seconds. Because timeouts are often a result of transient server
+          load, the actual connection timeout will be increased by a factor
+          of two on each retry.
+          Default: (2, 64, 32768).
 
         :proxy_timeout:
           The initial timeout (in seconds) for HTTP requests to
-          Keep proxies. A tuple of two floats is interpreted as
-          (connection_timeout, read_timeout). The behavior described
-          above for adjusting connection timeouts on retry also applies.
-          Default: (20, 300).
+          Keep proxies. A tuple of three floats is interpreted as
+          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
+          described above for adjusting connection timeouts on retry also
+          applies.
+          Default: (20, 64, 32768).
 
         :api_token:
           If you're not using an API client, but only talking
@@ -658,6 +656,7 @@ class KeepClient(object):
             self.put = self.local_store_put
         else:
             self.num_retries = num_retries
+            self.max_replicas_per_service = None
             if proxy:
                 if not proxy.endswith('/'):
                     proxy += '/'
@@ -665,12 +664,12 @@ class KeepClient(object):
                 self._gateway_services = {}
                 self._keep_services = [{
                     'uuid': 'proxy',
+                    'service_type': 'proxy',
                     '_service_root': proxy,
                     }]
                 self._writable_services = self._keep_services
                 self.using_proxy = True
                 self._static_services_list = True
-                self.max_replicas_per_service = 1
             else:
                 # It's important to avoid instantiating an API client
                 # unless we actually need one, for testing's sake.
@@ -683,7 +682,6 @@ class KeepClient(object):
                 self._writable_services = None
                 self.using_proxy = None
                 self._static_services_list = False
-                self.max_replicas_per_service = 1
 
     def current_timeout(self, attempt_number):
         """Return the appropriate timeout to use for this client.
@@ -698,7 +696,13 @@ class KeepClient(object):
         # TODO(twp): the timeout should be a property of a
         # KeepService, not a KeepClient. See #4488.
         t = self.proxy_timeout if self.using_proxy else self.timeout
-        return (t[0] * (1 << attempt_number), t[1])
+        if len(t) == 2:
+            return (t[0] * (1 << attempt_number), t[1])
+        else:
+            return (t[0] * (1 << attempt_number), t[1], t[2])
+    def _any_nondisk_services(self, service_list):
+        """Return True if any entry of service_list is not a 'disk' service.
+
+        Entries without a 'service_type' key are treated as 'disk'.
+        """
+        return any(ks.get('service_type', 'disk') != 'disk'
+                   for ks in service_list)
 
     def build_services_list(self, force_rebuild=False):
         if (self._static_services_list or
@@ -710,12 +714,16 @@ class KeepClient(object):
             except Exception:  # API server predates Keep services.
                 keep_services = self.api_client.keep_disks().list()
 
-            accessible = keep_services.execute().get('items')
-            if not accessible:
+            # Gateway services are only used when specified by UUID,
+            # so there's nothing to gain by filtering them by
+            # service_type.
+            self._gateway_services = {ks['uuid']: ks for ks in
+                                      keep_services.execute()['items']}
+            if not self._gateway_services:
                 raise arvados.errors.NoKeepServersError()
 
             # Precompute the base URI for each service.
-            for r in accessible:
+            for r in self._gateway_services.itervalues():
                 host = r['service_host']
                 if not host.startswith('[') and host.find(':') >= 0:
                     # IPv6 URIs must be formatted like http://[::1]:80/...
@@ -725,27 +733,19 @@ class KeepClient(object):
                     host,
                     r['service_port'])
 
-            # Gateway services are only used when specified by UUID,
-            # so there's nothing to gain by filtering them by
-            # service_type.
-            self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
             _logger.debug(str(self._gateway_services))
-
             self._keep_services = [
-                ks for ks in accessible
-                if ks.get('service_type') in ['disk', 'proxy']]
-            self._writable_services = [
-                ks for ks in accessible
-                if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
-            _logger.debug(str(self._keep_services))
-
-            self.using_proxy = any(ks.get('service_type') == 'proxy'
-                                   for ks in self._keep_services)
+                ks for ks in self._gateway_services.itervalues()
+                if not ks.get('service_type', '').startswith('gateway:')]
+            self._writable_services = [ks for ks in self._keep_services
+                                       if not ks.get('read_only')]
+
             # For disk type services, max_replicas_per_service is 1
-            # It is unknown or unlimited for non-disk typed services.
-            for ks in accessible:
-                if ('disk' != ks.get('service_type')) and (not ks.get('read_only')):
-                    self.max_replicas_per_service = None
+            # It is unknown (unlimited) for other service types.
+            if self._any_nondisk_services(self._writable_services):
+                self.max_replicas_per_service = None
+            else:
+                self.max_replicas_per_service = 1
 
     def _service_weight(self, data_hash, service_uuid):
         """Compute the weight of a Keep service endpoint for a data
@@ -782,7 +782,8 @@ class KeepClient(object):
         # in that order.
         use_services = self._keep_services
         if need_writable:
-          use_services = self._writable_services
+            use_services = self._writable_services
+        self.using_proxy = self._any_nondisk_services(use_services)
         sorted_roots.extend([
             svc['_service_root'] for svc in sorted(
                 use_services,
@@ -966,10 +967,8 @@ class KeepClient(object):
         # Tell the proxy how many copies we want it to store
         headers['X-Keep-Desired-Replication'] = str(copies)
         roots_map = {}
-        thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
-        thread_sequence = 0
         for tries_left in loop:
             try:
                 sorted_roots = self.map_new_services(
@@ -979,6 +978,8 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
+            thread_limiter = KeepClient.ThreadLimiter(
+                copies, self.max_replicas_per_service)
             threads = []
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]:
@@ -991,10 +992,9 @@ class KeepClient(object):
                     service_root=service_root,
                     thread_limiter=thread_limiter,
                     timeout=self.current_timeout(num_retries-tries_left),
-                    thread_sequence=thread_sequence)
+                    thread_sequence=len(threads))
                 t.start()
                 threads.append(t)
-                thread_sequence += 1
             for t in threads:
                 t.join()
             loop.save_result((thread_limiter.done() >= copies, len(threads)))
index ea318c6de05c9624d8517d15e8c4071f287a4ab9..b2cf43652bce288858dca4a1db1a121b040af9ec 100644 (file)
@@ -47,6 +47,8 @@ def mock_api_responses(api_client, body, codes, headers={}):
     return mock.patch.object(api_client._http, 'request', side_effect=queue_with((
         (fake_httplib2_response(code, **headers), body) for code in codes)))
 
+def str_keep_locator(s):
+    """Return the locator '<md5 hexdigest of s>+<len(s)>' for the string s."""
+    return '{}+{}'.format(hashlib.md5(s).hexdigest(), len(s))
 
 class FakeCurl:
     @classmethod
@@ -126,8 +128,7 @@ class MockStreamReader(object):
     def __init__(self, name='.', *data):
         self._name = name
         self._data = ''.join(data)
-        self._data_locators = ['{}+{}'.format(hashlib.md5(d).hexdigest(),
-                                              len(d)) for d in data]
+        self._data_locators = [str_keep_locator(d) for d in data]
         self.num_retries = 0
 
     def name(self):
index ef724ed5c52130c76ca878a60d94a150ed044c12..f074f8d6cf67ee768a4f09e2515a850cac5e4c5c 100644 (file)
@@ -22,7 +22,12 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
             'response_body': 0,
             # before returning from handler (thus setting response EOF)
             'response_close': 0,
+            # after writing over 1s worth of data at self.bandwidth
+            'mid_write': 0,
+            # after reading over 1s worth of data at self.bandwidth
+            'mid_read': 0,
         }
+        self.bandwidth = None
         super(Server, self).__init__(*args, **kwargs)
 
     def setdelays(self, **kwargs):
@@ -31,6 +36,12 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
             self.delays.get(k) # NameError if unknown key
             self.delays[k] = v
 
+    def setbandwidth(self, bandwidth):
+        """Set the maximum bandwidth, in bytes per second, for future requests.
+
+        The value is stored as a float. If setbandwidth is never called,
+        the bandwidth stays None and the server operates at the maximum
+        bandwidth possible.
+        """
+        self.bandwidth = float(bandwidth)
+
     def _sleep_at_least(self, seconds):
         """Sleep for given time, even if signals are received."""
         wake = time.time() + seconds
@@ -44,6 +55,53 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
 
 
 class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
+    def wfile_bandwidth_write(self, data_to_write):
+        if self.server.bandwidth == None and self.server.delays['mid_write'] == 0:
+            self.wfile.write(data_to_write)
+        else:
+            BYTES_PER_WRITE = int(self.server.bandwidth/4.0) or 32768
+            outage_happened = False
+            num_bytes = len(data_to_write)
+            num_sent_bytes = 0
+            target_time = time.time()
+            while num_sent_bytes < num_bytes:
+                if num_sent_bytes > self.server.bandwidth and not outage_happened:
+                    self.server._do_delay('mid_write')
+                    target_time += self.delays['mid_write']
+                    outage_happened = True
+                num_write_bytes = min(BYTES_PER_WRITE,
+                    num_bytes - num_sent_bytes)
+                self.wfile.write(data_to_write[
+                    num_sent_bytes:num_sent_bytes+num_write_bytes])
+                num_sent_bytes += num_write_bytes
+                if self.server.bandwidth is not None:
+                    target_time += num_write_bytes / self.server.bandwidth
+                    self.server._sleep_at_least(target_time - time.time())
+        return None
+
+    def rfile_bandwidth_read(self, bytes_to_read):
+        if self.server.bandwidth == None and self.server.delays['mid_read'] == 0:
+            return self.rfile.read(bytes_to_read)
+        else:
+            BYTES_PER_READ = int(self.server.bandwidth/4.0) or 32768
+            data = ''
+            outage_happened = False
+            bytes_read = 0
+            target_time = time.time()
+            while bytes_to_read > bytes_read:
+                if bytes_read > self.server.bandwidth and not outage_happened:
+                    self.server._do_delay('mid_read')
+                    target_time += self.delays['mid_read']
+                    outage_happened = True
+                next_bytes_to_read = min(BYTES_PER_READ,
+                    bytes_to_read - bytes_read)
+                data += self.rfile.read(next_bytes_to_read)
+                bytes_read += next_bytes_to_read
+                if self.server.bandwidth is not None:
+                    target_time += next_bytes_to_read / self.server.bandwidth
+                    self.server._sleep_at_least(target_time - time.time())
+        return data
+
     def handle(self, *args, **kwargs):
         self.server._do_delay('request')
         return super(Handler, self).handle(*args, **kwargs)
@@ -60,21 +118,18 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
         self.send_header('Content-type', 'application/octet-stream')
         self.end_headers()
         self.server._do_delay('response_body')
-        self.wfile.write(self.server.store[datahash])
+        self.wfile_bandwidth_write(self.server.store[datahash])
         self.server._do_delay('response_close')
 
     def do_PUT(self):
         self.server._do_delay('request_body')
-
         # The comments at https://bugs.python.org/issue1491 implies that Python
         # 2.7 BaseHTTPRequestHandler was patched to support 100 Continue, but
         # reading the actual code that ships in Debian it clearly is not, so we
         # need to send the response on the socket directly.
-
-        self.wfile.write("%s %d %s\r\n\r\n" %
+        self.wfile_bandwidth_write("%s %d %s\r\n\r\n" %
                          (self.protocol_version, 100, "Continue"))
-
-        data = self.rfile.read(int(self.headers.getheader('content-length')))
+        data = self.rfile_bandwidth_read(int(self.headers.getheader('content-length')))
         datahash = hashlib.md5(data).hexdigest()
         self.server.store[datahash] = data
         self.server._do_delay('response')
@@ -82,7 +137,7 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
         self.send_header('Content-type', 'text/plain')
         self.end_headers()
         self.server._do_delay('response_body')
-        self.wfile.write(datahash + '+' + str(len(data)))
+        self.wfile_bandwidth_write(datahash + '+' + str(len(data)))
         self.server._do_delay('response_close')
 
     def log_request(self, *args, **kwargs):
index 2d8e4759d695a4538def2aad1b6c3794daa3f073..8f0abd245ba752cd778473f0951f1b89a38cc868 100644 (file)
@@ -1,6 +1,5 @@
 import arvados
 import arvados_testutil as tutil
-import hashlib
 
 class ManifestExamples(object):
     def make_manifest(self,
@@ -9,8 +8,7 @@ class ManifestExamples(object):
                       files_per_stream=1,
                       streams=1):
         datablip = 'x' * bytes_per_block
-        data_loc = '{}+{}'.format(hashlib.md5(datablip).hexdigest(),
-                                  bytes_per_block)
+        data_loc = tutil.str_keep_locator(datablip)
         with tutil.mock_keep_responses(data_loc, 200):
             coll = arvados.CollectionWriter()
             for si in range(0, streams):
index d8a207f2cf2ce506d9406a646272e92c51c201e6..2b8b6ca1c4ad531c29bfd1c9a149da7c9bdf3599 100644 (file)
@@ -28,4 +28,30 @@ http {
       proxy_pass http://keepproxy;
     }
   }
+  upstream keep-web {
+    server localhost:{{KEEPWEBPORT}};
+  }
+  server {
+    listen *:{{KEEPWEBSSLPORT}} ssl default_server;
+    server_name ~^(?<request_host>.*)$;
+    ssl_certificate {{SSLCERT}};
+    ssl_certificate_key {{SSLKEY}};
+    location  / {
+      proxy_pass http://keep-web;
+      proxy_set_header Host $request_host:{{KEEPWEBPORT}};
+      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+    }
+  }
+  server {
+    listen *:{{KEEPWEBDLSSLPORT}} ssl default_server;
+    server_name ~.*;
+    ssl_certificate {{SSLCERT}};
+    ssl_certificate_key {{SSLKEY}};
+    location  / {
+      proxy_pass http://keep-web;
+      proxy_set_header Host download:{{KEEPWEBPORT}};
+      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+      proxy_redirect //download:{{KEEPWEBPORT}}/ https://$host:{{KEEPWEBDLSSLPORT}}/;
+    }
+  }
 }
index d325b4eb6ecb086d15effa34bc26db3e95c9ad15..adb8652edc952ab80eb67c0b8a8708da3fb954bd 100644 (file)
@@ -32,7 +32,6 @@ import arvados.config
 
 ARVADOS_DIR = os.path.realpath(os.path.join(MY_DIRNAME, '../../..'))
 SERVICES_SRC_DIR = os.path.join(ARVADOS_DIR, 'services')
-SERVER_PID_PATH = 'tmp/pids/test-server.pid'
 if 'GOPATH' in os.environ:
     gopaths = os.environ['GOPATH'].split(':')
     gobins = [os.path.join(path, 'bin') for path in gopaths]
@@ -70,28 +69,62 @@ def kill_server_pid(pidfile, wait=10, passenger_root=False):
     import signal
     import subprocess
     import time
-    try:
-        if passenger_root:
-            # First try to shut down nicely
-            restore_cwd = os.getcwd()
-            os.chdir(passenger_root)
-            subprocess.call([
-                'bundle', 'exec', 'passenger', 'stop', '--pid-file', pidfile])
-            os.chdir(restore_cwd)
-        now = time.time()
-        timeout = now + wait
-        with open(pidfile, 'r') as f:
-            server_pid = int(f.read())
-        while now <= timeout:
-            if not passenger_root or timeout - now < wait / 2:
-                # Half timeout has elapsed. Start sending SIGTERM
-                os.kill(server_pid, signal.SIGTERM)
-            # Raise OSError if process has disappeared
-            os.getpgid(server_pid)
+
+    now = time.time()
+    startTERM = now
+    deadline = now + wait
+
+    if passenger_root:
+        # First try to shut down nicely
+        restore_cwd = os.getcwd()
+        os.chdir(passenger_root)
+        subprocess.call([
+            'bundle', 'exec', 'passenger', 'stop', '--pid-file', pidfile])
+        os.chdir(restore_cwd)
+        # Use up to half of the +wait+ period waiting for "passenger
+        # stop" to work. If the process hasn't exited by then, start
+        # sending TERM signals.
+        startTERM += wait/2
+
+    server_pid = None
+    while now <= deadline and server_pid is None:
+        try:
+            with open(pidfile, 'r') as f:
+                server_pid = int(f.read())
+        except IOError:
+            # No pidfile = nothing to kill.
+            return
+        except ValueError as error:
+            # Pidfile exists, but we can't parse it. Perhaps the
+            # server has created the file but hasn't written its PID
+            # yet?
+            print("Parse error reading pidfile {}: {}".format(pidfile, error))
             time.sleep(0.1)
             now = time.time()
-    except EnvironmentError:
-        pass
+
+    while now <= deadline:
+        try:
+            exited, _ = os.waitpid(server_pid, os.WNOHANG)
+            if exited > 0:
+                return
+        except OSError:
+            # already exited, or isn't our child process
+            pass
+        try:
+            if now >= startTERM:
+                os.kill(server_pid, signal.SIGTERM)
+                print("Sent SIGTERM to {} ({})".format(server_pid, pidfile))
+        except OSError as error:
+            if error.errno == errno.ESRCH:
+                # Thrown by os.getpgid() or os.kill() if the process
+                # does not exist, i.e., our work here is done.
+                return
+            raise
+        time.sleep(0.1)
+        now = time.time()
+
+    print("Server PID {} ({}) did not exit, giving up after {}s".
+          format(server_pid, pidfile, wait))
 
 def find_available_port():
     """Return an IPv4 port number that is not in use right now.
@@ -139,6 +172,26 @@ def _wait_until_port_listens(port, timeout=10):
         format(port, timeout),
         file=sys.stderr)
 
+def _fifo2stderr(label):
+    """Create a fifo, and copy it to stderr, prepending label to each line.
+
+    Return value is the path to the new FIFO.
+
+    +label+ should contain only alphanumerics: it is also used as part
+    of the FIFO filename.
+    """
+    fifo = os.path.join(TEST_TMPDIR, label+'.fifo')
+    try:
+        os.remove(fifo)
+    except OSError as error:
+        if error.errno != errno.ENOENT:
+            raise
+    os.mkfifo(fifo, 0700)
+    subprocess.Popen(
+        ['sed', '-e', 's/^/['+label+'] /', fifo],
+        stdout=sys.stderr)
+    return fifo
+
 def run(leave_running_atexit=False):
     """Ensure an API server is running, and ARVADOS_API_* env vars have
     admin credentials for it.
@@ -159,7 +212,7 @@ def run(leave_running_atexit=False):
     # Delete cached discovery document.
     shutil.rmtree(arvados.http_cache('discovery'))
 
-    pid_file = os.path.join(SERVICES_SRC_DIR, 'api', SERVER_PID_PATH)
+    pid_file = _pidfile('api')
     pid_file_ok = find_server_pid(pid_file, 0)
 
     existing_api_host = os.environ.get('ARVADOS_TEST_API_HOST', my_api_host)
@@ -231,7 +284,7 @@ def run(leave_running_atexit=False):
     start_msg = subprocess.check_output(
         ['bundle', 'exec',
          'passenger', 'start', '-d', '-p{}'.format(port),
-         '--pid-file', os.path.join(os.getcwd(), pid_file),
+         '--pid-file', pid_file,
          '--log-file', os.path.join(os.getcwd(), 'log/test.log'),
          '--ssl',
          '--ssl-certificate', 'tmp/self-signed.pem',
@@ -293,7 +346,7 @@ def stop(force=False):
     """
     global my_api_host
     if force or my_api_host is not None:
-        kill_server_pid(os.path.join(SERVICES_SRC_DIR, 'api', SERVER_PID_PATH))
+        kill_server_pid(_pidfile('api'))
         my_api_host = None
 
 def _start_keep(n, keep_args):
@@ -307,9 +360,10 @@ def _start_keep(n, keep_args):
     for arg, val in keep_args.iteritems():
         keep_cmd.append("{}={}".format(arg, val))
 
-    logf = open(os.path.join(TEST_TMPDIR, 'keep{}.log'.format(n)), 'a+')
+    logf = open(_fifo2stderr('keep{}'.format(n)), 'w')
     kp0 = subprocess.Popen(
         keep_cmd, stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
+
     with open(_pidfile('keep{}'.format(n)), 'w') as f:
         f.write(str(kp0.pid))
 
@@ -333,7 +387,7 @@ def run_keep(blob_signing_key=None, enforce_permissions=False, num_servers=2):
         keep_args['-enforce-permissions'] = 'true'
     with open(os.path.join(TEST_TMPDIR, "keep.data-manager-token-file"), "w") as f:
         keep_args['-data-manager-token-file'] = f.name
-        f.write(os.environ['ARVADOS_API_TOKEN'])
+        f.write(auth_token('data_manager'))
     keep_args['-never-delete'] = 'false'
 
     api = arvados.api(
@@ -342,7 +396,7 @@ def run_keep(blob_signing_key=None, enforce_permissions=False, num_servers=2):
         token=os.environ['ARVADOS_API_TOKEN'],
         insecure=True)
 
-    for d in api.keep_services().list().execute()['items']:
+    for d in api.keep_services().list(filters=[['service_type','=','disk']]).execute()['items']:
         api.keep_services().delete(uuid=d['uuid']).execute()
     for d in api.keep_disks().list().execute()['items']:
         api.keep_disks().delete(uuid=d['uuid']).execute()
@@ -360,8 +414,14 @@ def run_keep(blob_signing_key=None, enforce_permissions=False, num_servers=2):
             'keep_disk': {'keep_service_uuid': svc['uuid'] }
         }).execute()
 
+    # If keepproxy is running, send SIGHUP to make it discover the new
+    # keepstore services.
+    proxypidfile = _pidfile('keepproxy')
+    if os.path.exists(proxypidfile):
+        os.kill(int(open(proxypidfile).read()), signal.SIGHUP)
+
 def _stop_keep(n):
-    kill_server_pid(_pidfile('keep{}'.format(n)), 0)
+    kill_server_pid(_pidfile('keep{}'.format(n)))
     if os.path.exists("{}/keep{}.volume".format(TEST_TMPDIR, n)):
         with open("{}/keep{}.volume".format(TEST_TMPDIR, n), 'r') as r:
             shutil.rmtree(r.read(), True)
@@ -378,20 +438,20 @@ def run_keep_proxy():
         return
     stop_keep_proxy()
 
-    admin_token = auth_token('admin')
     port = find_available_port()
     env = os.environ.copy()
-    env['ARVADOS_API_TOKEN'] = admin_token
+    env['ARVADOS_API_TOKEN'] = auth_token('anonymous')
+    logf = open(_fifo2stderr('keepproxy'), 'w')
     kp = subprocess.Popen(
         ['keepproxy',
          '-pid='+_pidfile('keepproxy'),
          '-listen=:{}'.format(port)],
-        env=env, stdin=open('/dev/null'), stdout=sys.stderr)
+        env=env, stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
 
     api = arvados.api(
         version='v1',
         host=os.environ['ARVADOS_API_HOST'],
-        token=admin_token,
+        token=auth_token('admin'),
         insecure=True)
     for d in api.keep_services().list(
             filters=[['service_type','=','proxy']]).execute()['items']:
@@ -409,7 +469,7 @@ def run_keep_proxy():
 def stop_keep_proxy():
     if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
         return
-    kill_server_pid(_pidfile('keepproxy'), wait=0)
+    kill_server_pid(_pidfile('keepproxy'))
 
 def run_arv_git_httpd():
     if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
@@ -420,11 +480,12 @@ def run_arv_git_httpd():
     gitport = find_available_port()
     env = os.environ.copy()
     env.pop('ARVADOS_API_TOKEN', None)
+    logf = open(_fifo2stderr('arv-git-httpd'), 'w')
     agh = subprocess.Popen(
         ['arv-git-httpd',
          '-repo-root='+gitdir+'/test',
          '-address=:'+str(gitport)],
-        env=env, stdin=open('/dev/null'), stdout=sys.stderr)
+        env=env, stdin=open('/dev/null'), stdout=logf, stderr=logf)
     with open(_pidfile('arv-git-httpd'), 'w') as f:
         f.write(str(agh.pid))
     _setport('arv-git-httpd', gitport)
@@ -433,19 +494,47 @@ def run_arv_git_httpd():
 def stop_arv_git_httpd():
     if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
         return
-    kill_server_pid(_pidfile('arv-git-httpd'), wait=0)
+    kill_server_pid(_pidfile('arv-git-httpd'))
+
+def run_keep_web():
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    stop_keep_web()
+
+    keepwebport = find_available_port()
+    env = os.environ.copy()
+    env['ARVADOS_API_TOKEN'] = auth_token('anonymous')
+    logf = open(_fifo2stderr('keep-web'), 'w')
+    keepweb = subprocess.Popen(
+        ['keep-web',
+         '-allow-anonymous',
+         '-attachment-only-host=download:'+str(keepwebport),
+         '-listen=:'+str(keepwebport)],
+        env=env, stdin=open('/dev/null'), stdout=logf, stderr=logf)
+    with open(_pidfile('keep-web'), 'w') as f:
+        f.write(str(keepweb.pid))
+    _setport('keep-web', keepwebport)
+    _wait_until_port_listens(keepwebport)
+
+def stop_keep_web():
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    kill_server_pid(_pidfile('keep-web'))
 
 def run_nginx():
     if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
         return
     nginxconf = {}
+    nginxconf['KEEPWEBPORT'] = _getport('keep-web')
+    nginxconf['KEEPWEBDLSSLPORT'] = find_available_port()
+    nginxconf['KEEPWEBSSLPORT'] = find_available_port()
     nginxconf['KEEPPROXYPORT'] = _getport('keepproxy')
     nginxconf['KEEPPROXYSSLPORT'] = find_available_port()
     nginxconf['GITPORT'] = _getport('arv-git-httpd')
     nginxconf['GITSSLPORT'] = find_available_port()
     nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem')
     nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key')
-    nginxconf['ACCESSLOG'] = os.path.join(TEST_TMPDIR, 'nginx_access_log.fifo')
+    nginxconf['ACCESSLOG'] = _fifo2stderr('nginx_access_log')
 
     conftemplatefile = os.path.join(MY_DIRNAME, 'nginx.conf')
     conffile = os.path.join(TEST_TMPDIR, 'nginx.conf')
@@ -458,29 +547,21 @@ def run_nginx():
     env = os.environ.copy()
     env['PATH'] = env['PATH']+':/sbin:/usr/sbin:/usr/local/sbin'
 
-    try:
-        os.remove(nginxconf['ACCESSLOG'])
-    except OSError as error:
-        if error.errno != errno.ENOENT:
-            raise
-
-    os.mkfifo(nginxconf['ACCESSLOG'], 0700)
     nginx = subprocess.Popen(
         ['nginx',
          '-g', 'error_log stderr info;',
          '-g', 'pid '+_pidfile('nginx')+';',
          '-c', conffile],
         env=env, stdin=open('/dev/null'), stdout=sys.stderr)
-    cat_access = subprocess.Popen(
-        ['cat', nginxconf['ACCESSLOG']],
-        stdout=sys.stderr)
+    _setport('keep-web-dl-ssl', nginxconf['KEEPWEBDLSSLPORT'])
+    _setport('keep-web-ssl', nginxconf['KEEPWEBSSLPORT'])
     _setport('keepproxy-ssl', nginxconf['KEEPPROXYSSLPORT'])
     _setport('arv-git-httpd-ssl', nginxconf['GITSSLPORT'])
 
 def stop_nginx():
     if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
         return
-    kill_server_pid(_pidfile('nginx'), wait=0)
+    kill_server_pid(_pidfile('nginx'))
 
 def _pidfile(program):
     return os.path.join(TEST_TMPDIR, program + '.pid')
@@ -555,6 +636,7 @@ class TestCaseWithServers(unittest.TestCase):
     MAIN_SERVER = None
     KEEP_SERVER = None
     KEEP_PROXY_SERVER = None
+    KEEP_WEB_SERVER = None
 
     @staticmethod
     def _restore_dict(src, dest):
@@ -573,7 +655,8 @@ class TestCaseWithServers(unittest.TestCase):
         for server_kwargs, start_func, stop_func in (
                 (cls.MAIN_SERVER, run, reset),
                 (cls.KEEP_SERVER, run_keep, stop_keep),
-                (cls.KEEP_PROXY_SERVER, run_keep_proxy, stop_keep_proxy)):
+                (cls.KEEP_PROXY_SERVER, run_keep_proxy, stop_keep_proxy),
+                (cls.KEEP_WEB_SERVER, run_keep_web, stop_keep_web)):
             if server_kwargs is not None:
                 start_func(**server_kwargs)
                 cls._cleanup_funcs.append(stop_func)
@@ -599,6 +682,7 @@ if __name__ == "__main__":
         'start', 'stop',
         'start_keep', 'stop_keep',
         'start_keep_proxy', 'stop_keep_proxy',
+        'start_keep-web', 'stop_keep-web',
         'start_arv-git-httpd', 'stop_arv-git-httpd',
         'start_nginx', 'stop_nginx',
     ]
@@ -629,7 +713,7 @@ if __name__ == "__main__":
     elif args.action == 'start_keep':
         run_keep(enforce_permissions=args.keep_enforce_permissions, num_servers=args.num_keep_servers)
     elif args.action == 'stop_keep':
-        stop_keep()
+        stop_keep(num_servers=args.num_keep_servers)
     elif args.action == 'start_keep_proxy':
         run_keep_proxy()
     elif args.action == 'stop_keep_proxy':
@@ -638,6 +722,10 @@ if __name__ == "__main__":
         run_arv_git_httpd()
     elif args.action == 'stop_arv-git-httpd':
         stop_arv_git_httpd()
+    elif args.action == 'start_keep-web':
+        run_keep_web()
+    elif args.action == 'stop_keep-web':
+        stop_keep_web()
     elif args.action == 'start_nginx':
         run_nginx()
     elif args.action == 'stop_nginx':
index 90bbacfe5af7146ffa4ec3924c00e1b44e0d6715..664b57fc00a57cef068e352232d55d0dfa548a58 100644 (file)
@@ -1,7 +1,6 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
-import hashlib
 import io
 import random
 
@@ -11,6 +10,8 @@ import arvados.errors as arv_error
 import arvados.commands.ls as arv_ls
 import run_test_server
 
+from arvados_testutil import str_keep_locator
+
 class ArvLsTestCase(run_test_server.TestCaseWithServers):
     FAKE_UUID = 'zzzzz-4zz18-12345abcde12345'
 
@@ -24,8 +25,7 @@ class ArvLsTestCase(run_test_server.TestCaseWithServers):
 
     def mock_api_for_manifest(self, manifest_lines, uuid=FAKE_UUID):
         manifest_text = self.newline_join(manifest_lines)
-        pdh = '{}+{}'.format(hashlib.md5(manifest_text).hexdigest(),
-                             len(manifest_text))
+        pdh = str_keep_locator(manifest_text)
         coll_info = {'uuid': uuid,
                      'portable_data_hash': pdh,
                      'manifest_text': manifest_text}
index 330dd448278259aef3a686d652c81f72cdf6405e..ea8661437c04f3c0d2dec8d4a01c0c8176a2e87c 100644 (file)
@@ -6,7 +6,6 @@ import io
 import mock
 import os
 import unittest
-import hashlib
 import time
 
 import arvados
@@ -30,7 +29,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
             self.requests.append(locator)
             return self.blocks.get(locator)
         def put(self, data, num_retries=None):
-            pdh = "%s+%i" % (hashlib.md5(data).hexdigest(), len(data))
+            pdh = tutil.str_keep_locator(data)
             self.blocks[pdh] = str(data)
             return pdh
 
@@ -453,7 +452,7 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
         n = 0
         blocks = {}
         for d in ['01234', '34567', '67890']:
-            loc = '{}+{}'.format(hashlib.md5(d).hexdigest(), len(d))
+            loc = tutil.str_keep_locator(d)
             blocks[loc] = d
             stream.append(Range(loc, n, len(d)))
             n += len(d)
index ac7dd1b9f678ab6391ad71b13201374d127aaba3..bc3c0bf1afc7e292f07f34e3e8450f32b69b8b8f 100644 (file)
@@ -4,7 +4,6 @@
 
 import arvados
 import copy
-import hashlib
 import mock
 import os
 import pprint
@@ -395,7 +394,7 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
     def test_write_directory_tree_with_zero_recursion(self):
         cwriter = arvados.CollectionWriter(self.api_client)
         content = 'd1/d2/f3d1/f2f1'
-        blockhash = hashlib.md5(content).hexdigest() + '+' + str(len(content))
+        blockhash = tutil.str_keep_locator(content)
         cwriter.write_directory_tree(
             self.build_directory_tree(['f1', 'd1/f2', 'd1/d2/f3']),
             max_manifest_depth=0)
@@ -739,7 +738,7 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
             self.assertEqual('.', writer.current_stream_name())
             self.assertEqual('out', writer.current_file_name())
             out_file.write('test data')
-            data_loc = hashlib.md5('test data').hexdigest() + '+9'
+            data_loc = tutil.str_keep_locator('test data')
         self.assertTrue(out_file.closed, "writer file not closed after context")
         self.assertRaises(ValueError, out_file.write, 'extra text')
         with self.mock_keep(data_loc, 200) as keep_mock:
@@ -751,15 +750,15 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
         writer = arvados.CollectionWriter(client)
         with writer.open('six') as out_file:
             out_file.writelines(['12', '34', '56'])
-            data_loc = hashlib.md5('123456').hexdigest() + '+6'
+            data_loc = tutil.str_keep_locator('123456')
         with self.mock_keep(data_loc, 200) as keep_mock:
             self.assertEqual(". {} 0:6:six\n".format(data_loc),
                              writer.manifest_text())
 
     def test_open_flush(self):
         client = self.api_client_mock()
-        data_loc1 = hashlib.md5('flush1').hexdigest() + '+6'
-        data_loc2 = hashlib.md5('flush2').hexdigest() + '+6'
+        data_loc1 = tutil.str_keep_locator('flush1')
+        data_loc2 = tutil.str_keep_locator('flush2')
         with self.mock_keep((data_loc1, 200), (data_loc2, 200)) as keep_mock:
             writer = arvados.CollectionWriter(client)
             with writer.open('flush_test') as out_file:
@@ -777,15 +776,15 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
             out_file.write('1st')
         with writer.open('.', '2') as out_file:
             out_file.write('2nd')
-        data_loc = hashlib.md5('1st2nd').hexdigest() + '+6'
+        data_loc = tutil.str_keep_locator('1st2nd')
         with self.mock_keep(data_loc, 200) as keep_mock:
             self.assertEqual(". {} 0:3:1 3:3:2\n".format(data_loc),
                              writer.manifest_text())
 
     def test_two_opens_two_streams(self):
         client = self.api_client_mock()
-        data_loc1 = hashlib.md5('file').hexdigest() + '+4'
-        data_loc2 = hashlib.md5('indir').hexdigest() + '+5'
+        data_loc1 = tutil.str_keep_locator('file')
+        data_loc2 = tutil.str_keep_locator('indir')
         with self.mock_keep((data_loc1, 200), (data_loc2, 200)) as keep_mock:
             writer = arvados.CollectionWriter(client)
             with writer.open('file') as out_file:
index 90468924a668b09e9116084adf20581878a382b2..52eb0e9915d813b3ce0d392463e1990fa00c04ca 100644 (file)
@@ -287,8 +287,11 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
             self.assertEqual(
-                mock.responses[0].getopt(pycurl.TIMEOUT_MS),
-                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]*1000))
+                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
 
     def test_put_timeout(self):
         api_client = self.mock_keep_services(count=1)
@@ -301,8 +304,11 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
             self.assertEqual(
-                mock.responses[0].getopt(pycurl.TIMEOUT_MS),
-                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]*1000))
+                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
 
     def test_proxy_get_timeout(self):
         api_client = self.mock_keep_services(service_type='proxy', count=1)
@@ -315,8 +321,11 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
             self.assertEqual(
-                mock.responses[0].getopt(pycurl.TIMEOUT_MS),
-                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
+                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
 
     def test_proxy_put_timeout(self):
         api_client = self.mock_keep_services(service_type='proxy', count=1)
@@ -329,8 +338,11 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
             self.assertEqual(
-                mock.responses[0].getopt(pycurl.TIMEOUT_MS),
-                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
+                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
 
     def check_no_services_error(self, verb, exc_class):
         api_client = mock.MagicMock(name='api_client')
@@ -368,7 +380,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
 
     def test_put_error_does_not_include_successful_puts(self):
         data = 'partial failure test'
-        data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+        data_loc = tutil.str_keep_locator(data)
         api_client = self.mock_keep_services(count=3)
         with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
@@ -378,7 +390,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
 
     def test_proxy_put_with_no_writable_services(self):
         data = 'test with no writable services'
-        data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+        data_loc = tutil.str_keep_locator(data)
         api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
         with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
@@ -387,6 +399,36 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
         self.assertEqual(0, len(exc_check.exception.request_errors()))
 
+    def test_oddball_service_get(self):
+        body = 'oddball service get'
+        api_client = self.mock_keep_services(service_type='fancynewblobstore')
+        with tutil.mock_keep_responses(body, 200):
+            keep_client = arvados.KeepClient(api_client=api_client)
+            actual = keep_client.get(tutil.str_keep_locator(body))
+        self.assertEqual(body, actual)
+
+    def test_oddball_service_put(self):
+        body = 'oddball service put'
+        pdh = tutil.str_keep_locator(body)
+        api_client = self.mock_keep_services(service_type='fancynewblobstore')
+        with tutil.mock_keep_responses(pdh, 200):
+            keep_client = arvados.KeepClient(api_client=api_client)
+            actual = keep_client.put(body, copies=1)
+        self.assertEqual(pdh, actual)
+
+    def test_oddball_service_writer_count(self):
+        body = 'oddball service writer count'
+        pdh = tutil.str_keep_locator(body)
+        api_client = self.mock_keep_services(service_type='fancynewblobstore',
+                                             count=4)
+        headers = {'x-keep-replicas-stored': 3}
+        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
+                                       **headers) as req_mock:
+            keep_client = arvados.KeepClient(api_client=api_client)
+            actual = keep_client.put(body, copies=2)
+        self.assertEqual(pdh, actual)
+        self.assertEqual(1, req_mock.call_count)
+
 
 @tutil.skip_sleep
 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
@@ -518,7 +560,7 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
     def check_64_zeros_error_order(self, verb, exc_class):
         data = '0' * 64
         if verb == 'get':
-            data = hashlib.md5(data).hexdigest() + '+1234'
+            data = tutil.str_keep_locator(data)
         # Arbitrary port number:
         aport = random.randint(1024,65535)
         api_client = self.mock_keep_services(service_port=aport, count=self.services)
@@ -540,7 +582,12 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
 
 
 class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
-    DATA = 'x' * 2**10
+    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
+    # 1s worth of data and then trigger bandwidth errors before running
+    # out of data.
+    DATA = 'x'*2**11
+    BANDWIDTH_LOW_LIM = 1024
+    TIMEOUT_TIME = 1.0
 
     class assertTakesBetween(unittest.TestCase):
         def __init__(self, tmin, tmax):
@@ -551,8 +598,22 @@ class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
             self.t0 = time.time()
 
         def __exit__(self, *args, **kwargs):
-            self.assertGreater(time.time() - self.t0, self.tmin)
-            self.assertLess(time.time() - self.t0, self.tmax)
+            # Round times to milliseconds, like CURL. Otherwise, we
+            # fail when CURL reaches a 1s timeout at 0.9998s.
+            delta = round(time.time() - self.t0, 3)
+            self.assertGreaterEqual(delta, self.tmin)
+            self.assertLessEqual(delta, self.tmax)
+
+    class assertTakesGreater(unittest.TestCase):
+        def __init__(self, tmin):
+            self.tmin = tmin
+
+        def __enter__(self):
+            self.t0 = time.time()
+
+        def __exit__(self, *args, **kwargs):
+            delta = round(time.time() - self.t0, 3)
+            self.assertGreaterEqual(delta, self.tmin)
 
     def setUp(self):
         sock = socket.socket()
@@ -572,7 +633,7 @@ class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
     def tearDown(self):
         self.server.shutdown()
 
-    def keepClient(self, timeouts=(0.1, 1.0)):
+    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
         return arvados.KeepClient(
             api_client=self.api_client,
             timeout=timeouts)
@@ -587,39 +648,89 @@ class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
         )
         with self.assertTakesBetween(0.1, 0.5):
             with self.assertRaises(arvados.errors.KeepWriteError):
-                self.keepClient((0.1, 1)).put(self.DATA, copies=1, num_retries=0)
+                self.keepClient().put(self.DATA, copies=1, num_retries=0)
+
+    def test_low_bandwidth_no_delays_success(self):
+        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
+        kc = self.keepClient()
+        loc = kc.put(self.DATA, copies=1, num_retries=0)
+        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
+
+    def test_too_low_bandwidth_no_delays_failure(self):
+        # Check that lessening bandwidth corresponds to failing
+        kc = self.keepClient()
+        loc = kc.put(self.DATA, copies=1, num_retries=0)
+        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepReadError) as e:
+                kc.get(loc, num_retries=0)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                kc.put(self.DATA, copies=1, num_retries=0)
+
+    def test_low_bandwidth_with_server_response_delay_failure(self):
+        kc = self.keepClient()
+        loc = kc.put(self.DATA, copies=1, num_retries=0)
+        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
+        self.server.setdelays(response=self.TIMEOUT_TIME)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepReadError) as e:
+                kc.get(loc, num_retries=0)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                kc.put(self.DATA, copies=1, num_retries=0)
+
+    def test_low_bandwidth_with_server_mid_delay_failure(self):
+        kc = self.keepClient()
+        loc = kc.put(self.DATA, copies=1, num_retries=0)
+        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
+        self.server.setdelays(mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepReadError) as e:
+                kc.get(loc, num_retries=0)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                kc.put(self.DATA, copies=1, num_retries=0)
 
     def test_timeout_slow_request(self):
-        self.server.setdelays(request=0.2)
-        self._test_200ms()
+        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
+        self.server.setdelays(request=.2)
+        self._test_connect_timeout_under_200ms(loc)
+        self.server.setdelays(request=2)
+        self._test_response_timeout_under_2s(loc)
 
     def test_timeout_slow_response(self):
-        self.server.setdelays(response=0.2)
-        self._test_200ms()
+        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
+        self.server.setdelays(response=.2)
+        self._test_connect_timeout_under_200ms(loc)
+        self.server.setdelays(response=2)
+        self._test_response_timeout_under_2s(loc)
 
     def test_timeout_slow_response_body(self):
-        self.server.setdelays(response_body=0.2)
-        self._test_200ms()
-
-    def _test_200ms(self):
-        """Connect should be t<100ms, request should be 200ms <= t < 300ms"""
+        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
+        self.server.setdelays(response_body=.2)
+        self._test_connect_timeout_under_200ms(loc)
+        self.server.setdelays(response_body=2)
+        self._test_response_timeout_under_2s(loc)
 
+    def _test_connect_timeout_under_200ms(self, loc):
         # Allow 100ms to connect, then 1s for response. Everything
         # should work, and everything should take at least 200ms to
         # return.
-        kc = self.keepClient((.1, 1))
+        kc = self.keepClient(timeouts=(.1, 1))
         with self.assertTakesBetween(.2, .3):
-            loc = kc.put(self.DATA, copies=1, num_retries=0)
+            kc.put(self.DATA, copies=1, num_retries=0)
         with self.assertTakesBetween(.2, .3):
             self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
 
-        # Allow 1s to connect, then 100ms for response. Nothing should
-        # work, and everything should take at least 100ms to return.
-        kc = self.keepClient((1, .1))
-        with self.assertTakesBetween(.1, .2):
+    def _test_response_timeout_under_2s(self, loc):
+        # Allow 10s to connect, then 1s for response. Nothing should
+        # work, and everything should take at least 1s to return.
+        kc = self.keepClient(timeouts=(10, 1))
+        with self.assertTakesBetween(1, 1.9):
             with self.assertRaises(arvados.errors.KeepReadError):
                 kc.get(loc, num_retries=0)
-        with self.assertTakesBetween(.1, .2):
+        with self.assertTakesBetween(1, 1.9):
             with self.assertRaises(arvados.errors.KeepWriteError):
                 kc.put(self.DATA, copies=1, num_retries=0)
 
index 64818da375ecc4fb0277a10be0f3f7e6b552a2e8..21c8e4710cb5e62dbe224a6034c68ea1bd40b05e 100644 (file)
@@ -29,6 +29,10 @@ class DatabaseController < ApplicationController
     fixturesets = Dir.glob(Rails.root.join('test', 'fixtures', '*.yml')).
       collect { |yml| yml.match(/([^\/]*)\.yml$/)[1] }
 
+    # Don't reset keep_services: clients need to discover our
+    # integration-testing keepstores, not test fixtures.
+    fixturesets -= %w[keep_services]
+
     table_names = '"' + ActiveRecord::Base.connection.tables.join('","') + '"'
 
     attempts_left = 20
index 9199d178f6bcdfec3c8536d8da9f7e6b22613898..85c2c791a379552caabc10f6665e1dfca10a2190 100644 (file)
@@ -18,6 +18,18 @@ admin_trustedclient:
   api_token: 1a9ffdcga2o7cw8q12dndskomgs1ygli3ns9k2o9hgzgmktc78
   expires_at: 2038-01-01 00:00:00
 
+data_manager:
+  api_client: untrusted
+  user: system_user
+  api_token: 320mkve8qkswstz7ff61glpk3mhgghmg67wmic7elw4z41pke1
+  expires_at: 2038-01-01 00:00:00
+  scopes:
+    - GET /arvados/v1/collections
+    - GET /arvados/v1/keep_services
+    - GET /arvados/v1/keep_services/accessible
+    - GET /arvados/v1/users/current
+    - POST /arvados/v1/logs
+
 miniadmin:
   api_client: untrusted
   user: miniadmin
@@ -87,7 +99,7 @@ active_all_collections:
   user: active
   api_token: activecollectionsabcdefghijklmnopqrstuvwxyz1234567
   expires_at: 2038-01-01 00:00:00
-  scopes: ["GET /arvados/v1/collections/", "GET /arvados/v1/keep_disks"]
+  scopes: ["GET /arvados/v1/collections/", "GET /arvados/v1/keep_services/accessible"]
 
 active_userlist:
   api_client: untrusted
index c2cb762d52b625b625634f24d385ddbf9ad4e7d8..685f94c88f3a35c33f6aa986e85701a5853e5d32 100644 (file)
@@ -16,11 +16,6 @@ import (
        "time"
 )
 
-const (
-       ActiveUserToken = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
-       AdminToken      = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
-)
-
 var arv arvadosclient.ArvadosClient
 var keepClient *keepclient.KeepClient
 var keepServers []string
@@ -34,6 +29,7 @@ func SetupDataManagerTest(t *testing.T) {
        arvadostest.StartKeep(2, false)
 
        arv = makeArvadosClient()
+       arv.ApiToken = arvadostest.DataManagerToken
 
        // keep client
        keepClient = &keepclient.KeepClient{
@@ -124,7 +120,18 @@ func getFirstLocatorFromCollection(t *testing.T, uuid string) string {
        return match[1] + "+" + match[2]
 }
 
+func switchToken(t string) func() {
+       orig := arv.ApiToken
+       restore := func() {
+               arv.ApiToken = orig
+       }
+       arv.ApiToken = t
+       return restore
+}
+
 func getCollection(t *testing.T, uuid string) Dict {
+       defer switchToken(arvadostest.AdminToken)()
+
        getback := make(Dict)
        err := arv.Get("collections", uuid, nil, &getback)
        if err != nil {
@@ -138,6 +145,8 @@ func getCollection(t *testing.T, uuid string) Dict {
 }
 
 func updateCollection(t *testing.T, uuid string, paramName string, paramValue string) {
+       defer switchToken(arvadostest.AdminToken)()
+
        err := arv.Update("collections", uuid, arvadosclient.Dict{
                "collection": arvadosclient.Dict{
                        paramName: paramValue,
@@ -152,6 +161,8 @@ func updateCollection(t *testing.T, uuid string, paramName string, paramValue st
 type Dict map[string]interface{}
 
 func deleteCollection(t *testing.T, uuid string) {
+       defer switchToken(arvadostest.AdminToken)()
+
        getback := make(Dict)
        err := arv.Delete("collections", uuid, nil, &getback)
        if err != nil {
@@ -175,7 +186,7 @@ func getBlockIndexesForServer(t *testing.T, i int) []string {
        path := keepServers[i] + "/index"
        client := http.Client{}
        req, err := http.NewRequest("GET", path, nil)
-       req.Header.Add("Authorization", "OAuth2 "+AdminToken)
+       req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
        req.Header.Add("Content-Type", "application/octet-stream")
        resp, err := client.Do(req)
        defer resp.Body.Close()
@@ -297,7 +308,7 @@ func backdateBlocks(t *testing.T, oldUnusedBlockLocators []string) {
 func getStatus(t *testing.T, path string) interface{} {
        client := http.Client{}
        req, err := http.NewRequest("GET", path, nil)
-       req.Header.Add("Authorization", "OAuth2 "+AdminToken)
+       req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
        req.Header.Add("Content-Type", "application/octet-stream")
        resp, err := client.Do(req)
        if err != nil {
@@ -504,7 +515,7 @@ func TestRunDatamanagerAsNonAdminUser(t *testing.T) {
        defer TearDownDataManagerTest(t)
        SetupDataManagerTest(t)
 
-       arv.ApiToken = ActiveUserToken
+       arv.ApiToken = arvadostest.ActiveToken
 
        err := singlerun(arv)
        if err == nil {
index 8ffca49601d795fc0622326e04eb219dd3dd0d8d..fdc93fb3beb37b7e4d4eda8400eb6f72ff82423a 100644 (file)
@@ -496,10 +496,11 @@ point the collection will actually be looked up on the server and the directory
 will appear if it exists.
 """.lstrip()
 
-    def __init__(self, parent_inode, inodes, api, num_retries):
+    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
         super(MagicDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
+        self.pdh_only = pdh_only
 
     def __setattr__(self, name, value):
         super(MagicDirectory, self).__setattr__(name, value)
@@ -511,13 +512,13 @@ will appear if it exists.
             # If we're the root directory, add an identical by_id subdirectory.
             if self.inode == llfuse.ROOT_INODE:
                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
-                        self.inode, self.inodes, self.api, self.num_retries))
+                        self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))
 
     def __contains__(self, k):
         if k in self._entries:
             return True
 
-        if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
+        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
             return False
 
         try:
index 7f9c916c456f49e76744f78ba06b18af2eb1ed1b..d15553456a2238fcee7d32d926517ada43d05785 100755 (executable)
@@ -39,6 +39,8 @@ with "--".
                             help="""Mount subdirectories listed by tag.""")
     mount_mode.add_argument('--by-id', action='store_true',
                             help="""Mount subdirectories listed by portable data hash or uuid.""")
+    mount_mode.add_argument('--by-pdh', action='store_true',
+                            help="""Mount subdirectories listed by portable data hash.""")
     mount_mode.add_argument('--project', type=str, help="""Mount a specific project.""")
     mount_mode.add_argument('--collection', type=str, help="""Mount only the specified collection.""")
 
@@ -103,9 +105,10 @@ with "--".
         now = time.time()
         dir_class = None
         dir_args = [llfuse.ROOT_INODE, operations.inodes, api, args.retries]
-        if args.by_id:
+        if args.by_id or args.by_pdh:
             # Set up the request handler with the 'magic directory' at the root
             dir_class = MagicDirectory
+            dir_args.append(args.by_pdh)
         elif args.by_tag:
             dir_class = TagsDirectory
         elif args.shared:
@@ -131,6 +134,7 @@ with "--".
             dir_args[0] = e.inode
 
             e._entries['by_id'] = operations.inodes.add_entry(MagicDirectory(*dir_args))
+
             e._entries['by_tag'] = operations.inodes.add_entry(TagsDirectory(*dir_args))
 
             dir_args.append(usr)
@@ -170,7 +174,8 @@ From here, the following directories are available:
         llfuse.init(operations, args.mountpoint, opts)
 
         # Subscribe to change events from API server
-        operations.listen_for_events(api)
+        if not args.by_pdh:
+            operations.listen_for_events(api)
 
         t = threading.Thread(None, lambda: llfuse.main())
         t.start()
index ff8883714512c8a8f6b6a0196a19cb128204d317..cc8693bd34c40628f193afd40c386751d86d1c78 100644 (file)
@@ -1060,3 +1060,66 @@ class FuseUnitTest(unittest.TestCase):
         self.assertEqual("_", fuse.sanitize_filename(""))
         self.assertEqual("_", fuse.sanitize_filename("."))
         self.assertEqual("__", fuse.sanitize_filename(".."))
+
+
+class FuseMagicTestPDHOnly(MountTestBase):
+    def setUp(self, api=None):
+        super(FuseMagicTestPDHOnly, self).setUp(api=api)
+
+        cw = arvados.CollectionWriter()
+
+        cw.start_new_file('thing1.txt')
+        cw.write("data 1")
+
+        self.testcollection = cw.finish()
+        self.test_manifest = cw.manifest_text()
+        created = self.api.collections().create(body={"manifest_text":self.test_manifest}).execute()
+        self.testcollectionuuid = str(created['uuid'])
+
+    def verify_pdh_only(self, pdh_only=False, skip_pdh_only=False):
+        if skip_pdh_only is True:
+            self.make_mount(fuse.MagicDirectory)    # in this case, the default by_id applies
+        else:
+            self.make_mount(fuse.MagicDirectory, pdh_only=pdh_only)
+
+        mount_ls = llfuse.listdir(self.mounttmp)
+        self.assertIn('README', mount_ls)
+        self.assertFalse(any(arvados.util.keep_locator_pattern.match(fn) or
+                             arvados.util.uuid_pattern.match(fn)
+                             for fn in mount_ls),
+                         "new FUSE MagicDirectory lists Collection")
+
+        # look up using pdh should succeed in all cases
+        self.assertDirContents(self.testcollection, ['thing1.txt'])
+        self.assertDirContents(os.path.join('by_id', self.testcollection),
+                               ['thing1.txt'])
+        mount_ls = llfuse.listdir(self.mounttmp)
+        self.assertIn('README', mount_ls)
+        self.assertIn(self.testcollection, mount_ls)
+        self.assertIn(self.testcollection,
+                      llfuse.listdir(os.path.join(self.mounttmp, 'by_id')))
+
+        files = {}
+        files[os.path.join(self.mounttmp, self.testcollection, 'thing1.txt')] = 'data 1'
+
+        for k, v in files.items():
+            with open(os.path.join(self.mounttmp, k)) as f:
+                self.assertEqual(v, f.read())
+
+        # look up using uuid should fail when pdh_only is set
+        if pdh_only is True:
+            with self.assertRaises(OSError):
+                self.assertDirContents(os.path.join('by_id', self.testcollectionuuid),
+                               ['thing1.txt'])
+        else:
+            self.assertDirContents(os.path.join('by_id', self.testcollectionuuid),
+                               ['thing1.txt'])
+
+    def test_with_pdh_only_true(self):
+        self.verify_pdh_only(pdh_only=True)
+
+    def test_with_pdh_only_false(self):
+        self.verify_pdh_only(pdh_only=False)
+
+    def test_with_default_by_id(self):
+        self.verify_pdh_only(skip_pdh_only=True)
index bfc716f2b2b865d9a12577bcca41812e7252f4e9..15a98c2f361ef62f2ddfd313da5d5402ea718c56 100644 (file)
@@ -3,22 +3,33 @@ package main
 import (
        "flag"
        "fmt"
+       "os"
+       "strconv"
 )
 
 var anonymousTokens tokenSet
 
 type tokenSet []string
 
-func (ts *tokenSet) Set(t string) error {
-       *ts = append(*ts, t)
-       return nil
+func (ts *tokenSet) Set(s string) error {
+       v, err := strconv.ParseBool(s)
+       if v && len(*ts) == 0 {
+               *ts = append(*ts, os.Getenv("ARVADOS_API_TOKEN"))
+       } else if !v {
+               *ts = (*ts)[:0]
+       }
+       return err
 }
 
 func (ts *tokenSet) String() string {
-       return fmt.Sprintf("%+v", (*ts)[:])
+       return fmt.Sprintf("%v", len(*ts) > 0)
+}
+
+func (ts *tokenSet) IsBoolFlag() bool {
+       return true
 }
 
 func init() {
-       flag.Var(&anonymousTokens, "anonymous-token",
-               "API token to try when none of the tokens provided in an HTTP request succeed in reading the desired collection. Multiple anonymous tokens can be provided by using this flag more than once; each token will be attempted in turn until one works.")
+       flag.Var(&anonymousTokens, "allow-anonymous",
+               "Serve public data to anonymous clients. Try the token supplied in the ARVADOS_API_TOKEN environment variable when none of the tokens provided in an HTTP request succeed in reading the desired collection.")
 }
index 5aa09e20a455b979954855cd4367224b36900c69..4207d7bfc7344cb72dfedbb176000bced1966077 100644 (file)
 //
 // Serve HTTP requests at port 1234 on all interfaces:
 //
-//   keep-web -address=:1234
+//   keep-web -listen=:1234
 //
 // Serve HTTP requests at port 1234 on the interface with IP address 1.2.3.4:
 //
-//   keep-web -address=1.2.3.4:1234
+//   keep-web -listen=1.2.3.4:1234
 //
 // Proxy configuration
 //
 //
 // Anonymous downloads
 //
-// Use the -anonymous-token option to specify a token to use when clients
-// try to retrieve files without providing their own Arvados API token.
+// Use the -allow-anonymous flag with an ARVADOS_API_TOKEN environment
+// variable to specify a token to use when clients try to retrieve
+// files without providing their own Arvados API token.
 //
-//   keep-web [...] -anonymous-token=zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
+//   export ARVADOS_API_TOKEN=zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
+//   keep-web [...] -allow-anonymous
 //
 // See http://doc.arvados.org/install/install-keep-web.html for examples.
 //
 // only when the designated origin matches exactly the Host header
 // provided by the client or downstream proxy.
 //
-//   keep-web -address :9999 -attachment-only-host domain.example:9999
+//   keep-web -listen :9999 -attachment-only-host domain.example:9999
 //
 // Trust All Content mode
 //
 //
 // In such cases you can enable trust-all-content mode.
 //
-//   keep-web -address :9999 -trust-all-content
+//   keep-web -listen :9999 -trust-all-content
 //
 // When using trust-all-content mode, the only effect of the
 // -attachment-only-host option is to add a "Content-Disposition:
 // attachment" header.
 //
-//   keep-web -address :9999 -attachment-only-host domain.example:9999 -trust-all-content
+//   keep-web -listen :9999 -attachment-only-host domain.example:9999 -trust-all-content
 //
 package main
index 3e38cc35cad611b5944d5c68221f43989daf4bb5..847329f33137ca259b0399349e539cce01885172 100644 (file)
@@ -9,6 +9,8 @@ import (
        "net/http"
        "net/url"
        "os"
+       "regexp"
+       "strconv"
        "strings"
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
@@ -181,7 +183,17 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                        Path:     "/",
                        HttpOnly: true,
                })
-               redir := (&url.URL{Host: r.Host, Path: r.URL.Path}).String()
+
+               // Propagate query parameters (except api_token) from
+               // the original request.
+               redirQuery := r.URL.Query()
+               redirQuery.Del("api_token")
+
+               redir := (&url.URL{
+                       Host:     r.Host,
+                       Path:     r.URL.Path,
+                       RawQuery: redirQuery.Encode(),
+               }).String()
 
                w.Header().Add("Location", redir)
                statusCode, statusText = http.StatusSeeOther, redir
@@ -292,8 +304,10 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
        }
        defer rdr.Close()
 
-       // One or both of these can be -1 if not found:
        basenamePos := strings.LastIndex(filename, "/")
+       if basenamePos < 0 {
+               basenamePos = 0
+       }
        extPos := strings.LastIndex(filename, ".")
        if extPos > basenamePos {
                // Now extPos is safely >= 0.
@@ -304,13 +318,53 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
        if rdr, ok := rdr.(keepclient.ReadCloserWithLen); ok {
                w.Header().Set("Content-Length", fmt.Sprintf("%d", rdr.Len()))
        }
-       if attachment {
-               w.Header().Set("Content-Disposition", "attachment")
-       }
 
-       w.WriteHeader(http.StatusOK)
-       _, err = io.Copy(w, rdr)
+       applyContentDispositionHdr(w, r, filename[basenamePos:], attachment)
+       rangeRdr, statusCode := applyRangeHdr(w, r, rdr)
+
+       w.WriteHeader(statusCode)
+       _, err = io.Copy(w, rangeRdr)
        if err != nil {
                statusCode, statusText = http.StatusBadGateway, err.Error()
        }
 }
+
+var rangeRe = regexp.MustCompile(`^bytes=0-([0-9]*)$`)
+
+func applyRangeHdr(w http.ResponseWriter, r *http.Request, rdr keepclient.ReadCloserWithLen) (io.Reader, int) {
+       w.Header().Set("Accept-Ranges", "bytes")
+       hdr := r.Header.Get("Range")
+       fields := rangeRe.FindStringSubmatch(hdr)
+       if fields == nil {
+               return rdr, http.StatusOK
+       }
+       rangeEnd, err := strconv.ParseInt(fields[1], 10, 64)
+       if err != nil {
+               // Empty or too big for int64 == send entire content
+               return rdr, http.StatusOK
+       }
+       if uint64(rangeEnd) >= rdr.Len() {
+               return rdr, http.StatusOK
+       }
+       w.Header().Set("Content-Length", fmt.Sprintf("%d", rangeEnd+1))
+       w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", 0, rangeEnd, rdr.Len()))
+       return &io.LimitedReader{R: rdr, N: rangeEnd + 1}, http.StatusPartialContent
+}
+
+func applyContentDispositionHdr(w http.ResponseWriter, r *http.Request, filename string, isAttachment bool) {
+       disposition := "inline"
+       if isAttachment {
+               disposition = "attachment"
+       }
+       if strings.ContainsRune(r.RequestURI, '?') {
+               // Help the UA realize that the filename is just
+               // "filename.txt", not
+               // "filename.txt?disposition=attachment".
+               //
+               // TODO(TC): Follow advice at RFC 6266 appendix D
+               disposition += "; filename=" + strconv.QuoteToASCII(filename)
+       }
+       if disposition != "inline" {
+               w.Header().Set("Content-Disposition", disposition)
+       }
+}
index 392de94ffb5af1399bada756ed4a8efc7f33506b..94a1cf7c0746a3fd6a883a9f84177f61646da257 100644 (file)
@@ -32,9 +32,11 @@ func (s *IntegrationSuite) TestVhost404(c *check.C) {
                arvadostest.NonexistentCollection + ".example.com/t=" + arvadostest.ActiveToken + "/theperthcountyconspiracy",
        } {
                resp := httptest.NewRecorder()
+               u := mustParseURL(testURL)
                req := &http.Request{
-                       Method: "GET",
-                       URL:    mustParseURL(testURL),
+                       Method:     "GET",
+                       URL:        u,
+                       RequestURI: u.RequestURI(),
                }
                (&handler{}).ServeHTTP(resp, req)
                c.Check(resp.Code, check.Equals, http.StatusNotFound)
@@ -120,10 +122,11 @@ func doVhostRequestsWithHostPath(c *check.C, authz authorizer, hostPath string)
        } {
                u := mustParseURL("http://" + hostPath)
                req := &http.Request{
-                       Method: "GET",
-                       Host:   u.Host,
-                       URL:    u,
-                       Header: http.Header{},
+                       Method:     "GET",
+                       Host:       u.Host,
+                       URL:        u,
+                       RequestURI: u.RequestURI(),
+                       Header:     http.Header{},
                }
                failCode := authz(req, tok)
                resp := doReq(req)
@@ -157,10 +160,11 @@ func doReq(req *http.Request) *httptest.ResponseRecorder {
        cookies := (&http.Response{Header: resp.Header()}).Cookies()
        u, _ := req.URL.Parse(resp.Header().Get("Location"))
        req = &http.Request{
-               Method: "GET",
-               Host:   u.Host,
-               URL:    u,
-               Header: http.Header{},
+               Method:     "GET",
+               Host:       u.Host,
+               URL:        u,
+               RequestURI: u.RequestURI(),
+               Header:     http.Header{},
        }
        for _, c := range cookies {
                req.AddCookie(c)
@@ -228,6 +232,21 @@ func (s *IntegrationSuite) TestVhostRedirectQueryTokenSingleOriginError(c *check
        )
 }
 
+// If client requests an attachment by putting ?disposition=attachment
+// in the query string, and gets redirected, the redirect target
+// should respond with an attachment.
+func (s *IntegrationSuite) TestVhostRedirectQueryTokenRequestAttachment(c *check.C) {
+       resp := s.testVhostRedirectTokenToCookie(c, "GET",
+               arvadostest.FooCollection+".example.com/foo",
+               "?disposition=attachment&api_token="+arvadostest.ActiveToken,
+               "",
+               "",
+               http.StatusOK,
+               "foo",
+       )
+       c.Check(strings.Split(resp.Header().Get("Content-Disposition"), ";")[0], check.Equals, "attachment")
+}
+
 func (s *IntegrationSuite) TestVhostRedirectQueryTokenTrustAllContent(c *check.C) {
        defer func(orig bool) {
                trustAllContent = orig
@@ -315,14 +334,57 @@ func (s *IntegrationSuite) TestAnonymousTokenError(c *check.C) {
        )
 }
 
+func (s *IntegrationSuite) TestRange(c *check.C) {
+       u, _ := url.Parse("http://example.com/c=" + arvadostest.HelloWorldCollection + "/Hello%20world.txt")
+       req := &http.Request{
+               Method:     "GET",
+               Host:       u.Host,
+               URL:        u,
+               RequestURI: u.RequestURI(),
+               Header:     http.Header{"Range": {"bytes=0-4"}},
+       }
+       resp := httptest.NewRecorder()
+       (&handler{}).ServeHTTP(resp, req)
+       c.Check(resp.Code, check.Equals, http.StatusPartialContent)
+       c.Check(resp.Body.String(), check.Equals, "Hello")
+       c.Check(resp.Header().Get("Content-Length"), check.Equals, "5")
+       c.Check(resp.Header().Get("Content-Range"), check.Equals, "bytes 0-4/12")
+
+       req.Header.Set("Range", "bytes=0-")
+       resp = httptest.NewRecorder()
+       (&handler{}).ServeHTTP(resp, req)
+       // 200 and 206 are both correct:
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       c.Check(resp.Body.String(), check.Equals, "Hello world\n")
+       c.Check(resp.Header().Get("Content-Length"), check.Equals, "12")
+
+       // Unsupported ranges are ignored
+       for _, hdr := range []string{
+               "bytes=5-5",  // non-zero start byte
+               "bytes=-5",   // last 5 bytes
+               "cubits=0-5", // unsupported unit
+               "bytes=0-340282366920938463463374607431768211456", // 2^128
+       } {
+               req.Header.Set("Range", hdr)
+               resp = httptest.NewRecorder()
+               (&handler{}).ServeHTTP(resp, req)
+               c.Check(resp.Code, check.Equals, http.StatusOK)
+               c.Check(resp.Body.String(), check.Equals, "Hello world\n")
+               c.Check(resp.Header().Get("Content-Length"), check.Equals, "12")
+               c.Check(resp.Header().Get("Content-Range"), check.Equals, "")
+               c.Check(resp.Header().Get("Accept-Ranges"), check.Equals, "bytes")
+       }
+}
+
 func (s *IntegrationSuite) testVhostRedirectTokenToCookie(c *check.C, method, hostPath, queryString, contentType, reqBody string, expectStatus int, expectRespBody string) *httptest.ResponseRecorder {
        u, _ := url.Parse(`http://` + hostPath + queryString)
        req := &http.Request{
-               Method: method,
-               Host:   u.Host,
-               URL:    u,
-               Header: http.Header{"Content-Type": {contentType}},
-               Body:   ioutil.NopCloser(strings.NewReader(reqBody)),
+               Method:     method,
+               Host:       u.Host,
+               URL:        u,
+               RequestURI: u.RequestURI(),
+               Header:     http.Header{"Content-Type": {contentType}},
+               Body:       ioutil.NopCloser(strings.NewReader(reqBody)),
        }
 
        resp := httptest.NewRecorder()
@@ -335,15 +397,16 @@ func (s *IntegrationSuite) testVhostRedirectTokenToCookie(c *check.C, method, ho
        if resp.Code != http.StatusSeeOther {
                return resp
        }
-       c.Check(resp.Body.String(), check.Matches, `.*href="//`+regexp.QuoteMeta(html.EscapeString(hostPath))+`".*`)
+       c.Check(resp.Body.String(), check.Matches, `.*href="//`+regexp.QuoteMeta(html.EscapeString(hostPath))+`(\?[^"]*)?".*`)
        cookies := (&http.Response{Header: resp.Header()}).Cookies()
 
        u, _ = u.Parse(resp.Header().Get("Location"))
        req = &http.Request{
-               Method: "GET",
-               Host:   u.Host,
-               URL:    u,
-               Header: http.Header{},
+               Method:     "GET",
+               Host:       u.Host,
+               URL:        u,
+               RequestURI: u.RequestURI(),
+               Header:     http.Header{},
        }
        for _, c := range cookies {
                req.AddCookie(c)
index 751543e8fa81a0af389a5dcfc2c744a3c66ea503..135f01b394720efba18dc8082744637bfaf3a7c1 100644 (file)
@@ -12,7 +12,9 @@ func init() {
        // different token before doing anything with the client). We
        // set this dummy value during init so it doesn't clobber the
        // one used by "run test servers".
-       os.Setenv("ARVADOS_API_TOKEN", "xxx")
+       if os.Getenv("ARVADOS_API_TOKEN") == "" {
+               os.Setenv("ARVADOS_API_TOKEN", "xxx")
+       }
 }
 
 func main() {
index 2359f23c761504cca5d8116db420b35607e4a311..100900830f5d6808563a928fdd3e501ca660d501 100644 (file)
@@ -10,7 +10,7 @@ import (
 var address string
 
 func init() {
-       flag.StringVar(&address, "address", ":80",
+       flag.StringVar(&address, "listen", ":80",
                "Address to listen on: \"host:port\", or \":port\" to listen on all interfaces.")
 }
 
index 79ed51eb0e00f57eb38d4a31251a87c2bc5e866c..7b5cd2befb8f69bd25fa62674d01590214aec5ad 100644 (file)
@@ -14,7 +14,6 @@ import (
        "net/http"
        "os"
        "os/signal"
-       "reflect"
        "regexp"
        "sync"
        "syscall"
@@ -22,8 +21,8 @@ import (
 )
 
 // Default TCP address on which to listen for requests.
-// Initialized by the -listen flag.
-const DEFAULT_ADDR = ":25107"
+// Override with -listen.
+const DefaultAddr = ":25107"
 
 var listener net.Listener
 
@@ -42,7 +41,7 @@ func main() {
        flagset.StringVar(
                &listen,
                "listen",
-               DEFAULT_ADDR,
+               DefaultAddr,
                "Interface on which to listen for requests, in the format "+
                        "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
                        "to listen on all network interfaces.")
@@ -103,15 +102,14 @@ func main() {
        }
 
        kc.Want_replicas = default_replicas
-
        kc.Client.Timeout = time.Duration(timeout) * time.Second
+       go kc.RefreshServices(5*time.Minute, 3*time.Second)
 
        listener, err = net.Listen("tcp", listen)
        if err != nil {
                log.Fatalf("Could not listen on %v", listen)
        }
-
-       go RefreshServicesList(kc)
+       log.Printf("Arvados Keep proxy started listening on %v", listener.Addr())
 
        // Shut down the server gracefully (by closing the listener)
        // if SIGTERM is received.
@@ -124,9 +122,7 @@ func main() {
        signal.Notify(term, syscall.SIGTERM)
        signal.Notify(term, syscall.SIGINT)
 
-       log.Printf("Arvados Keep proxy started listening on %v", listener.Addr())
-
-       // Start listening for requests.
+       // Start serving requests.
        http.Serve(listener, MakeRESTRouter(!no_get, !no_put, kc))
 
        log.Println("shutting down")
@@ -138,30 +134,6 @@ type ApiTokenCache struct {
        expireTime int64
 }
 
-// Refresh the keep service list every five minutes.
-func RefreshServicesList(kc *keepclient.KeepClient) {
-       var previousRoots = []map[string]string{}
-       var delay time.Duration = 0
-       for {
-               time.Sleep(delay * time.Second)
-               delay = 300
-               if err := kc.DiscoverKeepServers(); err != nil {
-                       log.Println("Error retrieving services list:", err)
-                       delay = 3
-                       continue
-               }
-               newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()}
-               if !reflect.DeepEqual(previousRoots, newRoots) {
-                       log.Printf("Updated services list: locals %v gateways %v", newRoots[0], newRoots[1])
-               }
-               if len(newRoots[0]) == 0 {
-                       log.Print("WARNING: No local services. Retrying in 3 seconds.")
-                       delay = 3
-               }
-               previousRoots = newRoots
-       }
-}
-
 // Cache the token and set an expire time.  If we already have an expire time
 // on the token, it is not updated.
 func (this *ApiTokenCache) RememberToken(token string) {
@@ -194,12 +166,8 @@ func (this *ApiTokenCache) RecallToken(token string) bool {
 }
 
 func GetRemoteAddress(req *http.Request) string {
-       if realip := req.Header.Get("X-Real-IP"); realip != "" {
-               if forwarded := req.Header.Get("X-Forwarded-For"); forwarded != realip {
-                       return fmt.Sprintf("%s (X-Forwarded-For %s)", realip, forwarded)
-               } else {
-                       return realip
-               }
+       if xff := req.Header.Get("X-Forwarded-For"); xff != "" {
+               return xff + "," + req.RemoteAddr
        }
        return req.RemoteAddr
 }
index 2c75ec1616e3b404a9547b6e2f969ce9fc1fe9f2..7c5b3628963ca44e29f3c487e6792ed1f8e33feb 100644 (file)
@@ -2,21 +2,19 @@ package main
 
 import (
        "crypto/md5"
-       "crypto/tls"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       . "gopkg.in/check.v1"
-       "io"
        "io/ioutil"
        "log"
        "net/http"
-       "net/url"
        "os"
        "strings"
        "testing"
        "time"
+
+       . "gopkg.in/check.v1"
 )
 
 // Gocheck boilerplate
@@ -36,6 +34,8 @@ var _ = Suite(&NoKeepServerSuite{})
 // Test with no keepserver to simulate errors
 type NoKeepServerSuite struct{}
 
+var TestProxyUUID = "zzzzz-bi6l4-lrixqc4fxofbmzz"
+
 // Wait (up to 1 second) for keepproxy to listen on a port. This
 // avoids a race condition where we hit a "connection refused" error
 // because we start testing the proxy too soon.
@@ -73,6 +73,10 @@ func (s *ServerRequiredSuite) TearDownSuite(c *C) {
 
 func (s *NoKeepServerSuite) SetUpSuite(c *C) {
        arvadostest.StartAPI()
+       // We need API to have some keep services listed, but the
+       // services themselves should be unresponsive.
+       arvadostest.StartKeep(2, false)
+       arvadostest.StopKeep(2)
 }
 
 func (s *NoKeepServerSuite) SetUpTest(c *C) {
@@ -83,106 +87,31 @@ func (s *NoKeepServerSuite) TearDownSuite(c *C) {
        arvadostest.StopAPI()
 }
 
-func setupProxyService() {
-
-       client := &http.Client{Transport: &http.Transport{
-               TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
-
-       var req *http.Request
-       var err error
-       if req, err = http.NewRequest("POST", fmt.Sprintf("https://%s/arvados/v1/keep_services", os.Getenv("ARVADOS_API_HOST")), nil); err != nil {
-               panic(err.Error())
-       }
-       req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", os.Getenv("ARVADOS_API_TOKEN")))
-
-       reader, writer := io.Pipe()
-
-       req.Body = reader
-
-       go func() {
-               data := url.Values{}
-               data.Set("keep_service", `{
-  "service_host": "localhost",
-  "service_port": 29950,
-  "service_ssl_flag": false,
-  "service_type": "proxy"
-}`)
-
-               writer.Write([]byte(data.Encode()))
-               writer.Close()
-       }()
-
-       var resp *http.Response
-       if resp, err = client.Do(req); err != nil {
-               panic(err.Error())
-       }
-       if resp.StatusCode != 200 {
-               panic(resp.Status)
-       }
-}
+func runProxy(c *C, args []string, bogusClientToken bool) *keepclient.KeepClient {
+       args = append([]string{"keepproxy"}, args...)
+       os.Args = append(args, "-listen=:0")
+       listener = nil
+       go main()
+       waitForListener()
 
-func runProxy(c *C, args []string, port int, bogusClientToken bool) *keepclient.KeepClient {
-       if bogusClientToken {
-               os.Setenv("ARVADOS_API_TOKEN", "bogus-token")
-       }
        arv, err := arvadosclient.MakeArvadosClient()
        c.Assert(err, Equals, nil)
-       kc := keepclient.KeepClient{
-               Arvados:       &arv,
-               Want_replicas: 2,
-               Using_proxy:   true,
-               Client:        &http.Client{},
-       }
-       locals := map[string]string{
-               "proxy": fmt.Sprintf("http://localhost:%v", port),
-       }
-       writableLocals := map[string]string{
-               "proxy": fmt.Sprintf("http://localhost:%v", port),
-       }
-       kc.SetServiceRoots(locals, writableLocals, nil)
-       c.Check(kc.Using_proxy, Equals, true)
-       c.Check(len(kc.LocalRoots()), Equals, 1)
-       for _, root := range kc.LocalRoots() {
-               c.Check(root, Equals, fmt.Sprintf("http://localhost:%v", port))
-       }
-       log.Print("keepclient created")
        if bogusClientToken {
-               arvadostest.ResetEnv()
+               arv.ApiToken = "bogus-token"
        }
-
-       {
-               os.Args = append(args, fmt.Sprintf("-listen=:%v", port))
-               listener = nil
-               go main()
+       kc := keepclient.New(&arv)
+       sr := map[string]string{
+               TestProxyUUID: "http://" + listener.Addr().String(),
        }
+       kc.SetServiceRoots(sr, sr, sr)
+       kc.Arvados.External = true
+       kc.Using_proxy = true
 
-       return &kc
+       return kc
 }
 
 func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
-       log.Print("TestPutAndGet start")
-
-       os.Args = []string{"keepproxy", "-listen=:29950"}
-       listener = nil
-       go main()
-       time.Sleep(100 * time.Millisecond)
-
-       setupProxyService()
-
-       os.Setenv("ARVADOS_EXTERNAL_CLIENT", "true")
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, Equals, nil)
-       kc, err := keepclient.MakeKeepClient(&arv)
-       c.Assert(err, Equals, nil)
-       c.Check(kc.Arvados.External, Equals, true)
-       c.Check(kc.Using_proxy, Equals, true)
-       c.Check(len(kc.LocalRoots()), Equals, 1)
-       for _, root := range kc.LocalRoots() {
-               c.Check(root, Equals, "http://localhost:29950")
-       }
-       os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
-
-       waitForListener()
+       kc := runProxy(c, nil, false)
        defer closeListener()
 
        hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
@@ -254,15 +183,10 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
                c.Check(blocklen, Equals, int64(0))
                log.Print("Finished Get zero block")
        }
-
-       log.Print("TestPutAndGet done")
 }
 
 func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
-       log.Print("TestPutAskGetForbidden start")
-
-       kc := runProxy(c, []string{"keepproxy"}, 29951, true)
-       waitForListener()
+       kc := runProxy(c, nil, true)
        defer closeListener()
 
        hash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
@@ -300,15 +224,10 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
                c.Check(blocklen, Equals, int64(0))
                log.Print("Get")
        }
-
-       log.Print("TestPutAskGetForbidden done")
 }
 
 func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
-       log.Print("TestGetDisabled start")
-
-       kc := runProxy(c, []string{"keepproxy", "-no-get"}, 29952, false)
-       waitForListener()
+       kc := runProxy(c, []string{"-no-get"}, false)
        defer closeListener()
 
        hash := fmt.Sprintf("%x", md5.Sum([]byte("baz")))
@@ -346,38 +265,26 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
                c.Check(blocklen, Equals, int64(0))
                log.Print("Get")
        }
-
-       log.Print("TestGetDisabled done")
 }
 
 func (s *ServerRequiredSuite) TestPutDisabled(c *C) {
-       log.Print("TestPutDisabled start")
-
-       kc := runProxy(c, []string{"keepproxy", "-no-put"}, 29953, false)
-       waitForListener()
+       kc := runProxy(c, []string{"-no-put"}, false)
        defer closeListener()
 
-       {
-               hash2, rep, err := kc.PutB([]byte("quux"))
-               c.Check(hash2, Equals, "")
-               c.Check(rep, Equals, 0)
-               c.Check(err, Equals, keepclient.InsufficientReplicasError)
-               log.Print("PutB")
-       }
-
-       log.Print("TestPutDisabled done")
+       hash2, rep, err := kc.PutB([]byte("quux"))
+       c.Check(hash2, Equals, "")
+       c.Check(rep, Equals, 0)
+       c.Check(err, Equals, keepclient.InsufficientReplicasError)
 }
 
 func (s *ServerRequiredSuite) TestCorsHeaders(c *C) {
-       runProxy(c, []string{"keepproxy"}, 29954, false)
-       waitForListener()
+       runProxy(c, nil, false)
        defer closeListener()
 
        {
                client := http.Client{}
                req, err := http.NewRequest("OPTIONS",
-                       fmt.Sprintf("http://localhost:29954/%x+3",
-                               md5.Sum([]byte("foo"))),
+                       fmt.Sprintf("http://%s/%x+3", listener.Addr().String(), md5.Sum([]byte("foo"))),
                        nil)
                req.Header.Add("Access-Control-Request-Method", "PUT")
                req.Header.Add("Access-Control-Request-Headers", "Authorization, X-Keep-Desired-Replicas")
@@ -392,8 +299,7 @@ func (s *ServerRequiredSuite) TestCorsHeaders(c *C) {
 
        {
                resp, err := http.Get(
-                       fmt.Sprintf("http://localhost:29954/%x+3",
-                               md5.Sum([]byte("foo"))))
+                       fmt.Sprintf("http://%s/%x+3", listener.Addr().String(), md5.Sum([]byte("foo"))))
                c.Check(err, Equals, nil)
                c.Check(resp.Header.Get("Access-Control-Allow-Headers"), Equals, "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas")
                c.Check(resp.Header.Get("Access-Control-Allow-Origin"), Equals, "*")
@@ -401,14 +307,13 @@ func (s *ServerRequiredSuite) TestCorsHeaders(c *C) {
 }
 
 func (s *ServerRequiredSuite) TestPostWithoutHash(c *C) {
-       runProxy(c, []string{"keepproxy"}, 29955, false)
-       waitForListener()
+       runProxy(c, nil, false)
        defer closeListener()
 
        {
                client := http.Client{}
                req, err := http.NewRequest("POST",
-                       "http://localhost:29955/",
+                       "http://"+listener.Addr().String()+"/",
                        strings.NewReader("qux"))
                req.Header.Add("Authorization", "OAuth2 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
                req.Header.Add("Content-Type", "application/octet-stream")
@@ -444,8 +349,7 @@ func (s *ServerRequiredSuite) TestStripHint(c *C) {
 //   With a valid but non-existing prefix (expect "\n")
 //   With an invalid prefix (expect error)
 func (s *ServerRequiredSuite) TestGetIndex(c *C) {
-       kc := runProxy(c, []string{"keepproxy"}, 28852, false)
-       waitForListener()
+       kc := runProxy(c, nil, false)
        defer closeListener()
 
        // Put "index-data" blocks
@@ -467,6 +371,8 @@ func (s *ServerRequiredSuite) TestGetIndex(c *C) {
        _, rep, err = kc.PutB([]byte("some-more-index-data"))
        c.Check(err, Equals, nil)
 
+       kc.Arvados.ApiToken = arvadostest.DataManagerToken
+
        // Invoke GetIndex
        for _, spec := range []struct {
                prefix         string
@@ -477,7 +383,7 @@ func (s *ServerRequiredSuite) TestGetIndex(c *C) {
                {hash[:3], true, false},  // with matching prefix
                {"abcdef", false, false}, // with no such prefix
        } {
-               indexReader, err := kc.GetIndex("proxy", spec.prefix)
+               indexReader, err := kc.GetIndex(TestProxyUUID, spec.prefix)
                c.Assert(err, Equals, nil)
                indexResp, err := ioutil.ReadAll(indexReader)
                c.Assert(err, Equals, nil)
@@ -500,13 +406,12 @@ func (s *ServerRequiredSuite) TestGetIndex(c *C) {
        }
 
        // GetIndex with invalid prefix
-       _, err = kc.GetIndex("proxy", "xyz")
+       _, err = kc.GetIndex(TestProxyUUID, "xyz")
        c.Assert((err != nil), Equals, true)
 }
 
 func (s *ServerRequiredSuite) TestPutAskGetInvalidToken(c *C) {
-       kc := runProxy(c, []string{"keepproxy"}, 28852, false)
-       waitForListener()
+       kc := runProxy(c, nil, false)
        defer closeListener()
 
        // Put a test block
@@ -544,7 +449,7 @@ func (s *ServerRequiredSuite) TestAskGetKeepProxyConnectionError(c *C) {
        // keepclient with no such keep server
        kc := keepclient.New(&arv)
        locals := map[string]string{
-               "proxy": "http://localhost:12345",
+               TestProxyUUID: "http://localhost:12345",
        }
        kc.SetServiceRoots(locals, nil, nil)
 
@@ -565,22 +470,24 @@ func (s *ServerRequiredSuite) TestAskGetKeepProxyConnectionError(c *C) {
 }
 
 func (s *NoKeepServerSuite) TestAskGetNoKeepServerError(c *C) {
-       kc := runProxy(c, []string{"keepproxy"}, 29999, false)
-       waitForListener()
+       kc := runProxy(c, nil, false)
        defer closeListener()
 
-       // Ask should result in temporary connection refused error
        hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
-       _, _, err := kc.Ask(hash)
-       c.Check(err, NotNil)
-       errNotFound, _ := err.(*keepclient.ErrNotFound)
-       c.Check(errNotFound.Temporary(), Equals, true)
-       c.Assert(strings.Contains(err.Error(), "HTTP 502"), Equals, true)
-
-       // Get should result in temporary connection refused error
-       _, _, _, err = kc.Get(hash)
-       c.Check(err, NotNil)
-       errNotFound, _ = err.(*keepclient.ErrNotFound)
-       c.Check(errNotFound.Temporary(), Equals, true)
-       c.Assert(strings.Contains(err.Error(), "HTTP 502"), Equals, true)
+       for _, f := range []func()error {
+               func() error {
+                       _, _, err := kc.Ask(hash)
+                       return err
+               },
+               func() error {
+                       _, _, _, err := kc.Get(hash)
+                       return err
+               },
+       } {
+               err := f()
+               c.Assert(err, NotNil)
+               errNotFound, _ := err.(*keepclient.ErrNotFound)
+               c.Check(errNotFound.Temporary(), Equals, true)
+               c.Check(err, ErrorMatches, `.*HTTP 502.*`)
+       }
 }
index 3a3069ab7745c1efc0a21fd8c706565f65c525e0..52b59ec1ef797b1e09b25529b20285f5190166df 100644 (file)
@@ -82,6 +82,8 @@ func TestPullWorkerIntegration_GetNonExistingLocator(t *testing.T) {
        }
 
        pullRequest := SetupPullWorkerIntegrationTest(t, testData, false)
+       defer arvadostest.StopAPI()
+       defer arvadostest.StopKeep(2)
 
        performPullWorkerIntegrationTest(testData, pullRequest, t)
 }
@@ -97,6 +99,8 @@ func TestPullWorkerIntegration_GetExistingLocator(t *testing.T) {
        }
 
        pullRequest := SetupPullWorkerIntegrationTest(t, testData, true)
+       defer arvadostest.StopAPI()
+       defer arvadostest.StopKeep(2)
 
        performPullWorkerIntegrationTest(testData, pullRequest, t)
 }
index e72889038850631bb8205ee1602326d307d729fc..94281fa8bcbb89f5432614adf715376ad666beab 100644 (file)
@@ -81,13 +81,13 @@ func setupRsync(c *C, enforcePermissions bool, replications int) {
        // srcConfig
        var srcConfig apiConfig
        srcConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
-       srcConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN")
+       srcConfig.APIToken = arvadostest.DataManagerToken
        srcConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
 
        // dstConfig
        var dstConfig apiConfig
        dstConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
-       dstConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN")
+       dstConfig.APIToken = arvadostest.DataManagerToken
        dstConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
 
        if enforcePermissions {
@@ -389,7 +389,7 @@ func (s *ServerNotRequiredSuite) TestLoadConfig(c *C) {
        c.Check(err, IsNil)
 
        c.Assert(srcConfig.APIHost, Equals, os.Getenv("ARVADOS_API_HOST"))
-       c.Assert(srcConfig.APIToken, Equals, os.Getenv("ARVADOS_API_TOKEN"))
+       c.Assert(srcConfig.APIToken, Equals, arvadostest.DataManagerToken)
        c.Assert(srcConfig.APIHostInsecure, Equals, matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")))
        c.Assert(srcConfig.ExternalClient, Equals, false)
 
@@ -397,7 +397,7 @@ func (s *ServerNotRequiredSuite) TestLoadConfig(c *C) {
        c.Check(err, IsNil)
 
        c.Assert(dstConfig.APIHost, Equals, os.Getenv("ARVADOS_API_HOST"))
-       c.Assert(dstConfig.APIToken, Equals, os.Getenv("ARVADOS_API_TOKEN"))
+       c.Assert(dstConfig.APIToken, Equals, arvadostest.DataManagerToken)
        c.Assert(dstConfig.APIHostInsecure, Equals, matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")))
        c.Assert(dstConfig.ExternalClient, Equals, false)
 
@@ -422,7 +422,7 @@ func setupConfigFile(c *C, name string) *os.File {
        c.Check(err, IsNil)
 
        fileContent := "ARVADOS_API_HOST=" + os.Getenv("ARVADOS_API_HOST") + "\n"
-       fileContent += "ARVADOS_API_TOKEN=" + os.Getenv("ARVADOS_API_TOKEN") + "\n"
+       fileContent += "ARVADOS_API_TOKEN=" + arvadostest.DataManagerToken + "\n"
        fileContent += "ARVADOS_API_HOST_INSECURE=" + os.Getenv("ARVADOS_API_HOST_INSECURE") + "\n"
        fileContent += "ARVADOS_EXTERNAL_CLIENT=false\n"
        fileContent += "ARVADOS_BLOB_SIGNING_KEY=abcdefg"
@@ -470,9 +470,11 @@ func (s *DoMainTestSuite) Test_doMainWithSrcAndDstConfig(c *C) {
        args := []string{"-src", srcConfig.Name(), "-dst", dstConfig.Name()}
        os.Args = append(os.Args, args...)
 
-       // Start keepservers. Since we are not doing any tweaking as in setupRsync func,
-       // kcSrc and kcDst will be the same and no actual copying to dst will happen, but that's ok.
+       // Start keepservers. Since we are not doing any tweaking as
+       // in setupRsync func, kcSrc and kcDst will be the same and no
+       // actual copying to dst will happen, but that's ok.
        arvadostest.StartKeep(2, false)
+       defer arvadostest.StopKeep(2)
 
        err := doMain()
        c.Check(err, IsNil)