Merge branch '10200-cwl-crunch-script' closes #10200
author Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 17 Oct 2016 19:05:53 +0000 (15:05 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 17 Oct 2016 19:05:53 +0000 (15:05 -0400)
67 files changed:
apps/workbench/app/controllers/application_controller.rb
apps/workbench/app/helpers/application_helper.rb
apps/workbench/app/models/proxy_work_unit.rb
apps/workbench/app/models/work_unit.rb
apps/workbench/app/views/work_units/_component_detail.html.erb
apps/workbench/app/views/work_units/_show_child.html.erb
apps/workbench/app/views/work_units/_show_component.html.erb
apps/workbench/app/views/work_units/_show_log_link.html.erb [new file with mode: 0644]
apps/workbench/test/controllers/work_units_controller_test.rb
apps/workbench/test/diagnostics/pipeline_test.rb
apps/workbench/test/diagnostics_test_helper.rb
apps/workbench/test/unit/work_unit_test.rb
doc/install/install-arv-git-httpd.html.textile.liquid
sdk/cli/test/test_arv-keep-get.rb
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/tests/hw.py [new file with mode: 0644]
sdk/cwl/tests/test_pathmapper.py [new file with mode: 0644]
sdk/cwl/tests/test_submit.py
sdk/go/arvados/duration.go
sdk/go/streamer/streamer.go
sdk/go/streamer/streamer_test.go
sdk/python/arvados/arvfile.py
sdk/python/tests/run_test_server.py
sdk/python/tests/test_collections.py
services/api/app/models/container.rb
services/api/app/models/container_request.rb
services/api/test/unit/container_request_test.rb
services/arv-git-httpd/arvados-git-httpd.service [moved from services/arv-git-httpd/arv-git-httpd.service with 75% similarity]
services/arv-git-httpd/doc.go [deleted file]
services/arv-git-httpd/git_handler.go
services/arv-git-httpd/git_handler_test.go
services/arv-git-httpd/gitolite_test.go
services/arv-git-httpd/integration_test.go
services/arv-git-httpd/main.go
services/arv-git-httpd/usage.go
services/crunch-dispatch-slurm/squeue.go
services/fuse/arvados_fuse/command.py
services/fuse/tests/test_command_args.py
services/fuse/tests/test_mount.py
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/bufferpool_test.go
services/keepstore/config.go [new file with mode: 0644]
services/keepstore/deprecated.go [new file with mode: 0644]
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/keepstore.service [new file with mode: 0644]
services/keepstore/keepstore_test.go
services/keepstore/perms.go
services/keepstore/perms_test.go
services/keepstore/pull_worker_test.go
services/keepstore/s3_volume.go
services/keepstore/s3_volume_test.go
services/keepstore/trash_worker.go
services/keepstore/trash_worker_test.go
services/keepstore/usage.go [new file with mode: 0644]
services/keepstore/volume.go
services/keepstore/volume_generic_test.go
services/keepstore/volume_test.go
services/keepstore/volume_unix.go
services/keepstore/volume_unix_test.go
services/login-sync/bin/arvados-login-sync
tools/arvbox/lib/arvbox/docker/service/arv-git-httpd/run-service

index 9da1d78003624db7c6f7ac13f285897631b81893..f68250ba15bbf77d9e25821632f3d37a0154e25c 100644 (file)
@@ -526,16 +526,17 @@ class ApplicationController < ActionController::Base
     begin
       if not model_class
         @object = nil
+      elsif params[:uuid].nil? or params[:uuid].empty?
+        @object = nil
       elsif not params[:uuid].is_a?(String)
         @object = model_class.where(uuid: params[:uuid]).first
-      elsif params[:uuid].empty?
-        @object = nil
       elsif (model_class != Link and
              resource_class_for_uuid(params[:uuid]) == Link)
         @name_link = Link.find(params[:uuid])
         @object = model_class.find(@name_link.head_uuid)
       else
         @object = model_class.find(params[:uuid])
+        load_preloaded_objects [@object]
       end
     rescue ArvadosApiClient::NotFoundException, ArvadosApiClient::NotLoggedInException, RuntimeError => error
       if error.is_a?(RuntimeError) and (error.message !~ /^argument to find\(/)
@@ -1180,15 +1181,15 @@ class ApplicationController < ActionController::Base
 
   # helper method to get object of a given dataclass and uuid
   helper_method :object_for_dataclass
-  def object_for_dataclass dataclass, uuid
+  def object_for_dataclass dataclass, uuid, by_attr=nil
     raise ArgumentError, 'No input argument dataclass' unless (dataclass && uuid)
-    preload_objects_for_dataclass(dataclass, [uuid])
+    preload_objects_for_dataclass(dataclass, [uuid], by_attr)
     @objects_for[uuid]
   end
 
   # helper method to preload objects for given dataclass and uuids
   helper_method :preload_objects_for_dataclass
-  def preload_objects_for_dataclass dataclass, uuids
+  def preload_objects_for_dataclass dataclass, uuids, by_attr=nil
     @objects_for ||= {}
 
     raise ArgumentError, 'Argument is not a data class' unless dataclass.is_a? Class
@@ -1205,12 +1206,29 @@ class ApplicationController < ActionController::Base
     uuids.each do |x|
       @objects_for[x] = nil
     end
-    dataclass.where(uuid: uuids).each do |obj|
-      @objects_for[obj.uuid] = obj
+    if by_attr and ![:uuid, :name].include?(by_attr)
+      raise ArgumentError, "Preloading only using lookups by uuid or name are supported: #{by_attr}"
+    elsif by_attr and by_attr == :name
+      dataclass.where(name: uuids).each do |obj|
+        @objects_for[obj.name] = obj
+      end
+    else
+      dataclass.where(uuid: uuids).each do |obj|
+        @objects_for[obj.uuid] = obj
+      end
     end
     @objects_for
   end
 
+  # helper method to load objects that are already preloaded
+  helper_method :load_preloaded_objects
+  def load_preloaded_objects objs
+    @objects_for ||= {}
+    objs.each do |obj|
+      @objects_for[obj.uuid] = obj
+    end
+  end
+
   def wiselinks_layout
     'body'
   end
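
The new by_attr parameter lets Workbench preload and look up cached objects by name as well as by uuid. A minimal usage sketch, with made-up repository names, not code from this commit:

    # One API round trip caches several Repository objects, keyed by name:
    preload_objects_for_dataclass(Repository, ["arvados", "tutorial"], :name)

    # Later lookups are served from the @objects_for cache:
    repo = object_for_dataclass(Repository, "arvados", :name)

When by_attr is :name, entries are keyed by obj.name in the same @objects_for hash that uuid lookups use.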
index b5df9f38a8c3c5da421476d47621ee96f3ca612e..21879a57a7d90aa77ba0f90e6a325f9ae9d5f00c 100644 (file)
@@ -126,7 +126,11 @@ module ApplicationHelper
           else
             begin
               if resource_class.name == 'Collection'
-                link_name = collections_for_object(link_uuid).andand.first.andand.friendly_link_name
+                if CollectionsHelper.match(link_uuid)
+                  link_name = collection_for_pdh(link_uuid).andand.first.andand.portable_data_hash
+                else
+                  link_name = collections_for_object(link_uuid).andand.first.andand.friendly_link_name
+                end
               else
                 link_name = object_for_dataclass(resource_class, link_uuid).andand.friendly_link_name
               end
index 11ec0ee196326d6a5c7d06cf0f0455a11fc9b167..44905be061d9808d58b34950d9fa0d36edac1bb2 100644 (file)
@@ -173,31 +173,6 @@ class ProxyWorkUnit < WorkUnit
     @unreadable_children
   end
 
-  def readable?
-    resource_class = ArvadosBase::resource_class_for_uuid(uuid)
-    resource_class.where(uuid: [uuid]).first rescue nil
-  end
-
-  def link_to_log
-    if state_label.in? ["Complete", "Failed", "Cancelled"]
-      lc = log_collection
-      if lc
-        logCollection = Collection.find? lc
-        if logCollection
-          ApplicationController.helpers.link_to("Log", "#{uri}#Log")
-        else
-          "Log unavailable"
-        end
-      end
-    elsif state_label == "Running"
-      if readable?
-        ApplicationController.helpers.link_to("Log", "#{uri}#Log")
-      else
-        "Log unavailable"
-      end
-    end
-  end
-
   def walltime
     if state_label != "Queued"
       if started_at
index 924e067815718fc0fddf52a836c99cd2d6ffd94b..0c384bb209d905f0e8d3efa164c79383b0088059 100644 (file)
@@ -115,10 +115,6 @@ class WorkUnit
     # returns true if this work unit can be canceled
   end
 
-  def readable?
-    # is the proxied object readable by current user?
-  end
-
   def uri
     # returns the uri for this work unit
   end
@@ -132,10 +128,6 @@ class WorkUnit
   end
 
   # view helper methods
-  def link_to_log
-    # display a link to log if present
-  end
-
   def walltime
     # return walltime for a running or completed work unit
   end
index e15cc443a93ca9062b4edb8bc99554ca42e48c2b..7d588bace69d05651ca675a4069fa4ecafcd27be 100644 (file)
           </div>
           <div class="col-md-6">
             <table>
-              <% # link to repo tree/file only if the repo is readable
-                 # and the commit is a sha1...
-                 repo =
-                 (/^[0-9a-f]{40}$/ =~ current_obj.script_version and
-                 Repository.where(name: current_obj.repository).first)
-
-                 # ...and the api server provides an http:// or https:// url
+              <% # link to repo tree/file only if the repo is readable and the commit is a sha1
+                 repo = (/^[0-9a-f]{40}$/ =~ current_obj.script_version and
+                         current_obj.repository and
+                         object_for_dataclass(Repository, current_obj.repository, :name))
                  repo = nil unless repo.andand.http_fetch_url
                  %>
               <% [:script, :repository, :script_version, :supplied_script_version, :nondeterministic,
index acf19fd6b4cedc06553c2f9b4cd25d087bcebe65..2693334bc21404b0631e313d08a73311cd369f91 100644 (file)
@@ -17,7 +17,7 @@
           <div class="col-md-8"></div>
         <% else %>
           <div class="col-md-1">
-            <%= current_obj.link_to_log %>
+            <%= render partial: 'work_units/show_log_link', locals: {wu: current_obj} %>
           </div>
 
           <% walltime = current_obj.walltime %>
index 89233cfe003b3987a4df02e594e1ebd24155ee83..4feb292209689f069de1e4c6911257730b5f31c9 100644 (file)
   uuids = wu.children.collect {|c| c.uuid}.compact
   if uuids.any?
     resource_class = resource_class_for_uuid(uuids.first, friendly_name: true)
-    preload_objects_for_dataclass resource_class, uuids
+
+    start = 0; inc = 200
+    while start < uuids.length
+      preload_objects_for_dataclass resource_class, uuids[start, inc]
+      start += inc
+    end
   end
 
+  collections = wu.outputs.flatten.uniq
+  collections << wu.log_collection if wu.log_collection
+  collections << wu.docker_image if wu.docker_image
-  collections = wu.children.collect {|j| j.outputs}.compact
+  collections.concat wu.children.collect {|j| j.outputs}.compact
   collections = collections.flatten.uniq
   collections.concat wu.children.collect {|j| j.docker_image}.uniq.compact
+  collections.concat wu.children.collect {|j| j.log_collection}.uniq.compact
   collections_pdhs = collections.select {|x| !(m = CollectionsHelper.match(x)).nil?}.uniq.compact
   collections_uuids = collections - collections_pdhs
-  preload_collections_for_objects collections_uuids if collections_uuids.any?
-  preload_for_pdhs collections_pdhs if collections_pdhs.any?
+
+  if collections_uuids.any?
+    start = 0; inc = 200
+    while start < collections_uuids.length
+      preload_collections_for_objects collections_uuids[start, inc]
+      start += inc
+    end
+  end
+
+  if collections_pdhs.any?
+    start = 0; inc = 200
+    while start < collections_pdhs.length
+      preload_for_pdhs collections_pdhs[start, inc]
+      start += inc
+    end
+  end
+
+  repos = wu.children.collect {|c| c.repository}.uniq.compact
+  preload_objects_for_dataclass(Repository, repos, :name) if repos.any?
 %>
 
 <% if wu.has_unreadable_children %>
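
The three start/inc loops above batch the preload calls into groups of 200 uuids so no single API request carries an unbounded filter. An equivalent, more compact Ruby formulation (an illustrative rewrite, not the committed code):

    # Preload in batches of 200 to keep each request's uuid filter small.
    uuids.each_slice(200) do |batch|
      preload_objects_for_dataclass resource_class, batch
    end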
diff --git a/apps/workbench/app/views/work_units/_show_log_link.html.erb b/apps/workbench/app/views/work_units/_show_log_link.html.erb
new file mode 100644 (file)
index 0000000..a54ab32
--- /dev/null
@@ -0,0 +1,14 @@
+<% if wu.state_label.in? ["Complete", "Failed", "Cancelled"] %>
+  <% lc = wu.log_collection %>
+  <% if lc and object_readable(lc, Collection) and object_readable(wu.uuid) %>
+    <%= link_to("Log", "#{wu.uri}#Log") %>
+  <% else %>
+    Log unavailable
+  <% end %>
+<% elsif wu.state_label == "Running" %>
+  <% if object_readable(wu.uuid) %>
+    <%= link_to("Log", "#{wu.uri}#Log") %>
+  <% else %>
+    Log unavailable
+  <% end %>
+<% end %>
index 12e0271260edf3f815c6be70b0e995438d32761e..ee18861c92099ea91ec62bf551ff7ad17317e6f0 100644 (file)
@@ -65,4 +65,27 @@ class WorkUnitsControllerTest < ActionController::TestCase
                           }]
     get :index, encoded_params, session_for(:active)
   end
+
+  [
+    [Job, 'active', 'running_job_with_components', '/jobs/zzzzz-8i9sb-jyq01m7in1jlofj#Log'],
+    [PipelineInstance, 'active', 'pipeline_in_running_state', '/jobs/zzzzz-8i9sb-pshmckwoma9plh7#Log'],
+    [PipelineInstance, nil, 'pipeline_in_publicly_accessible_project_but_other_objects_elsewhere', 'Log unavailable'],
+  ].each do |type, token, fixture, log_link|
+    test "link_to_log for #{fixture} for #{token}" do
+      use_token 'admin'
+      obj = find_fixture(type, fixture)
+
+      @controller = if type == Job then JobsController.new else PipelineInstancesController.new end
+
+      if token
+        get :show, {id: obj['uuid']}, session_for(token)
+      else
+        Rails.configuration.anonymous_user_token =
+          api_fixture("api_client_authorizations", "anonymous", "api_token")
+        get :show, {id: obj['uuid']}
+      end
+
+      assert_includes @response.body, log_link
+    end
+  end
 end
index f9e324ca41f84377cf9b4cb6bc284e58b3abebf2..d038222cf0cf58278818bd087e288a4e1c11b52c 100644 (file)
@@ -46,7 +46,7 @@ class PipelineTest < DiagnosticsTest
       page.assert_selector 'a,button', text: 'Pause'
 
       # Wait for pipeline run to complete
-      wait_until_page_has 'Complete', pipeline_config['max_wait_seconds']
+      wait_until_page_has 'completed', pipeline_config['max_wait_seconds']
     end
   end
 
index c7433bb247450464fb42fa780e309ce09fdb27b7..3587721edae7bc6e96778efceaaedfadddbeacd3 100644 (file)
@@ -24,9 +24,12 @@ class DiagnosticsTest < ActionDispatch::IntegrationTest
   # Looks for text_to_look_for on the page for up to max_time seconds
   def wait_until_page_has text_to_look_for, max_time=30
     max_time = 30 if (!max_time || (max_time.to_s != max_time.to_i.to_s))
+    text_found = false
     Timeout.timeout(max_time) do
-      loop until page.has_text?(text_to_look_for)
+      until text_found do
+        visit_page_with_token 'active', current_path
+        text_found = has_text?(text_to_look_for)
+      end
     end
   end
-
 end
index e59d30d19a9ed0b3d9087150d76fc5abdb18a273..0ff38140e1a6f976ffc5b0da27e73b43415b4d54 100644 (file)
@@ -83,32 +83,6 @@ class WorkUnitTest < ActiveSupport::TestCase
     end
   end
 
-  [
-    [Job, 'active', 'running_job_with_components', true],
-    [Job, 'active', 'queued', false],
-    [Job, nil, 'completed_job_in_publicly_accessible_project', true],
-    [Job, 'active', 'completed_job_in_publicly_accessible_project', true],
-    [PipelineInstance, 'active', 'pipeline_in_running_state', true],  # no log, but while running the log link points to pi Log tab
-    [PipelineInstance, nil, 'pipeline_in_publicly_accessible_project_but_other_objects_elsewhere', false],
-    [PipelineInstance, 'active', 'pipeline_in_publicly_accessible_project_but_other_objects_elsewhere', false], #no log for completed pi
-    [Job, nil, 'job_in_publicly_accessible_project_but_other_objects_elsewhere', false, "Log unavailable"],
-  ].each do |type, token, fixture, has_log, log_link|
-    test "link_to_log for #{fixture} for #{token}" do
-      use_token token if token
-      obj = find_fixture(type, fixture)
-      wu = obj.work_unit
-
-      link = "#{wu.uri}#Log" if has_log
-      link_to_log = wu.link_to_log
-
-      if has_log
-        assert_includes link_to_log, link
-      else
-        assert_equal log_link, link_to_log
-      end
-    end
-  end
-
   test 'can_cancel?' do
     use_token 'active' do
       assert find_fixture(Job, 'running').work_unit.can_cancel?
index 5e373c38b855bb3b2f27410d94b0a6835da07c18..b28674de03cc8364d657ba957a9700e119da6732 100644 (file)
@@ -232,6 +232,7 @@ On Red Hat-based systems:
 
 <notextile>
 <pre><code>~$ <span class="userinput">sudo yum install git arvados-git-httpd</span>
+~$ <span class="userinput">sudo systemctl enable arvados-git-httpd</span>
 </code></pre>
 </notextile>
 
@@ -239,10 +240,9 @@ Verify that @arvados-git-httpd@ and @git-http-backend@ can be run:
 
 <notextile>
 <pre><code>~$ <span class="userinput">arvados-git-httpd -h</span>
-Usage of arvados-git-httpd:
-  -address="0.0.0.0:80": Address to listen on, "host:port".
-  -git-command="/usr/bin/git": Path to git executable. Each authenticated request will execute this program with a single argument, "http-backend".
-  -repo-root="/path/to/cwd": Path to git repositories.
+[...]
+Usage: arvados-git-httpd [-config path/to/arvados/git-httpd.yml]
+[...]
 ~$ <span class="userinput">git http-backend</span>
 Status: 500 Internal Server Error
 Expires: Fri, 01 Jan 1980 00:00:00 GMT
@@ -255,43 +255,29 @@ fatal: No REQUEST_METHOD from server
 
 h3. Enable arvados-git-httpd
 
-Install runit to supervise the arvados-git-httpd daemon.  {% include 'install_runit' %}
+{% include 'notebox_begin' %}
+
+The arvados-git-httpd package includes configuration files for systemd.  If you're using a different init system, you'll need to configure a service to start and stop an @arvados-git-httpd@ process as desired.
+
+{% include 'notebox_end' %}
 
-Configure runit to run arvados-git-httpd, making sure to update the API host to match your site:
+Create the configuration file @/etc/arvados/git-httpd/git-httpd.yml@. Run @arvados-git-httpd -h@ to learn more about configuration entries.
 
 <notextile>
-<pre><code>~$ <span class="userinput">sudo mkdir -p /etc/sv</span>
-~$ <span class="userinput">cd /etc/sv</span>
-/etc/sv$ <span class="userinput">sudo mkdir arvados-git-httpd; cd arvados-git-httpd</span>
-/etc/sv/arvados-git-httpd$ <span class="userinput">sudo mkdir log</span>
-/etc/sv/arvados-git-httpd$ <span class="userinput">sudo sh -c 'cat &gt;log/run' &lt;&lt;'EOF'
-#!/bin/sh
-mkdir -p main
-chown git:git main
-exec chpst -u git:git svlogd -tt main
-EOF</span>
-/etc/sv/arvados-git-httpd$ <span class="userinput">sudo sh -c 'cat &gt;run' &lt;&lt;'EOF'
-#!/bin/sh
-export ARVADOS_API_HOST=<b>uuid_prefix.your.domain</b>
-export GITOLITE_HTTP_HOME=/var/lib/arvados/git
-export GL_BYPASS_ACCESS_CHECKS=1
-export PATH="$PATH:/var/lib/arvados/git/bin"
-exec chpst -u git:git arvados-git-httpd -address=:9001 -git-command=/var/lib/arvados/git/gitolite/src/gitolite-shell -repo-root=<b>/var/lib/arvados/git</b>/repositories 2&gt;&1
-EOF</span>
-/etc/sv/arvados-git-httpd$ <span class="userinput">sudo chmod +x run log/run</span>
-/etc/sv/arvados-git-httpd$ <span class="userinput">sudo ln -s "$(pwd)" /etc/service/</span>
+<pre><code>Client:
+  APIHost: <b>uuid_prefix.your.domain</b>
+  Insecure: false
+GitCommand: /var/lib/arvados/git/gitolite/src/gitolite-shell
+GitoliteHome: /var/lib/arvados/git
+Listen: :9001
+RepoRoot: /var/lib/arvados/git/repositories
 </code></pre>
 </notextile>
 
-If you are using a different daemon supervisor, or if you want to test the daemon in a terminal window, an equivalent shell command to run arvados-git-httpd is:
+Restart the systemd service to ensure the new configuration is used.
 
 <notextile>
-<pre><code>sudo -u git \
-  ARVADOS_API_HOST=<span class="userinput">uuid_prefix.your.domain</span> \
-  GITOLITE_HTTP_HOME=/var/lib/arvados/git \
-  GL_BYPASS_ACCESS_CHECKS=1 \
-  PATH="$PATH:/var/lib/arvados/git/bin" \
-  arvados-git-httpd -address=:9001 -git-command=/var/lib/arvados/git/gitolite/src/gitolite-shell -repo-root=/var/lib/arvados/git/repositories 2&gt;&1
+<pre><code>~$ <span class="userinput">sudo systemctl restart arvados-git-httpd</span>
 </code></pre>
 </notextile>
 
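If the service does not come up after restarting, checking its status is a reasonable first step (assuming systemd, as the note box above does):

<notextile>
<pre><code>~$ <span class="userinput">sudo systemctl status arvados-git-httpd</span>
</code></pre>
</notextile>
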
index 0e578b8fb42d4aa2fd4cf46d55e6b113cda958a5..d0224aedb01e354408f5a710eb88dff344314a90 100644 (file)
@@ -180,7 +180,7 @@ class TestArvKeepGet < Minitest::Test
     end
     assert_equal "#{Digest::MD5.hexdigest('foo')}  ./foo\n", err
     assert_equal '', out
-    assert_equal false, File.exists?('tmp/foo')
+    assert_equal false, File.exist?('tmp/foo')
   end
 
   def test_sha1_nowrite
@@ -190,7 +190,7 @@ class TestArvKeepGet < Minitest::Test
     end
     assert_equal "#{Digest::SHA1.hexdigest('foo')}  ./foo\n", err
     assert_equal '', out
-    assert_equal false, File.exists?('tmp/foo')
+    assert_equal false, File.exist?('tmp/foo')
   end
 
   def test_block_to_file
index c90f8902684304b400cc7ece97068a8e6b094000..7ebb13f1bb48af456ce50f2c7d8629e03c975cb0 100644 (file)
@@ -24,7 +24,7 @@ import arvados.config
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner
+from. runner import Runner, upload_instance
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess
@@ -64,6 +64,8 @@ class ArvCwlRunner(object):
         self.pipeline = None
         self.final_output_collection = None
         self.output_name = output_name
+        self.project_uuid = None
+
         if keep_client is not None:
             self.keep_client = keep_client
         else:
@@ -266,6 +268,8 @@ class ArvCwlRunner(object):
             kwargs["docker_outdir"] = "$(task.outdir)"
             kwargs["tmpdir"] = "$(task.tmpdir)"
 
+        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
+
         runnerjob = None
         if kwargs.get("submit"):
             if self.work_api == "containers":
@@ -289,7 +293,7 @@ class ArvCwlRunner(object):
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
         if runnerjob and not kwargs.get("wait"):
-            runnerjob.run()
+            runnerjob.run(wait=kwargs.get("wait"))
             return runnerjob.uuid
 
         self.poll_api = arvados.api('v1')
index 8b1a9346830b349e145fcb0fe5ec3f3cf1acf823..8269eeebdbd417ba908c8e6ca56e8535fd1ffcd9 100644 (file)
@@ -2,6 +2,7 @@ import logging
 import re
 import copy
 import json
+import time
 
 from cwltool.process import get_feature, shortname
 from cwltool.errors import WorkflowException
@@ -233,6 +234,12 @@ class RunnerJob(Runner):
 
         workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
 
+        # Need to filter this out; cwltool adds it when parameters are
+        # provided on the command line, and arv-run-pipeline-instance
+        # doesn't like it.
+        if "job_order" in self.job_order:
+            del self.job_order["job_order"]
+
         self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"]).target[5:]
         if self.output_name:
             self.job_order["arv:output_name"] = self.output_name
@@ -248,28 +255,38 @@ class RunnerJob(Runner):
 
     def run(self, *args, **kwargs):
         job_spec = self.arvados_job_spec(*args, **kwargs)
-        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
-
-        response = self.arvrunner.api.jobs().create(
-            body=job_spec,
-            find_or_create=self.enable_reuse
-        ).execute(num_retries=self.arvrunner.num_retries)
 
-        self.uuid = response["uuid"]
+        for k,v in job_spec["script_parameters"].items():
+            if isinstance(v, dict):
+                job_spec["script_parameters"][k] = {"value": v}
+
+        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
+            body={
+                "owner_uuid": self.arvrunner.project_uuid,
+                "name": shortname(self.tool.tool["id"]),
+                "components": {"cwl-runner": job_spec },
+                "state": "RunningOnServer"}).execute(num_retries=self.arvrunner.num_retries)
+        logger.info("Created pipeline %s", self.arvrunner.pipeline["uuid"])
+
+        if kwargs.get("wait") is False:
+            self.uuid = self.arvrunner.pipeline["uuid"]
+            return
+
+        job = None
+        while not job:
+            time.sleep(2)
+            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().get(
+                uuid=self.arvrunner.pipeline["uuid"]).execute(
+                    num_retries=self.arvrunner.num_retries)
+            job = self.arvrunner.pipeline["components"]["cwl-runner"].get("job")
+            if not job and self.arvrunner.pipeline["state"] != "RunningOnServer":
+                raise WorkflowException("Submitted pipeline is %s" % (self.arvrunner.pipeline["state"]))
+
+        self.uuid = job["uuid"]
         self.arvrunner.processes[self.uuid] = self
 
-        logger.info("Submitted job %s", response["uuid"])
-
-        if kwargs.get("submit"):
-            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
-                body={
-                    "owner_uuid": self.arvrunner.project_uuid,
-                    "name": shortname(self.tool.tool["id"]),
-                    "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
-                    "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
-
-        if response["state"] in ("Complete", "Failed", "Cancelled"):
-            self.done(response)
+        if job["state"] in ("Complete", "Failed", "Cancelled"):
+            self.done(job)
 
 
 class RunnerTemplate(object):
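
RunnerJob.run now submits by creating a pipeline instance already in the RunningOnServer state, then, unless waiting was disabled, polls until the API server attaches a job to the cwl-runner component. That wait, reduced to a standalone sketch where api and pipeline_uuid are assumed inputs:

    import time

    def wait_for_component_job(api, pipeline_uuid, num_retries=2):
        # Poll until the server-side pipeline hands the component a job.
        while True:
            time.sleep(2)
            pi = api.pipeline_instances().get(uuid=pipeline_uuid).execute(
                num_retries=num_retries)
            job = pi["components"]["cwl-runner"].get("job")
            if job:
                return job
            if pi["state"] != "RunningOnServer":
                # The pipeline stopped without ever creating the job.
                raise RuntimeError("Submitted pipeline is %s" % pi["state"])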
index 228d43304af9e4345b1800043b175cb0ec13f594..73c81ceb0fcdb033203c1b7e5425b3875ea121d6 100644 (file)
@@ -37,11 +37,11 @@ class ArvPathMapper(PathMapper):
                 # Local FS ref, may need to be uploaded or may be on keep
                 # mount.
                 ab = abspath(src, self.input_basedir)
-                st = arvados.commands.run.statfile("", ab, fnPattern=self.file_pattern)
+                st = arvados.commands.run.statfile("", ab, fnPattern="keep:%s/%s")
                 if isinstance(st, arvados.commands.run.UploadFile):
                     uploadfiles.add((src, ab, st))
                 elif isinstance(st, arvados.commands.run.ArvFile):
-                    self._pathmap[src] = MapperEnt(ab, st.fn, "File")
+                    self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % st.fn[5:], "File")
                 elif src.startswith("_:"):
                     if "contents" in srcobj:
                         pass
@@ -78,9 +78,11 @@ class ArvPathMapper(PathMapper):
 
     def setup(self, referenced_files, basedir):
         # type: (List[Any], unicode) -> None
-        self._pathmap = self.arvrunner.get_uploaded()
         uploadfiles = set()
 
+        for k,v in self.arvrunner.get_uploaded().iteritems():
+            self._pathmap[k] = MapperEnt(v.resolved, self.collection_pattern % v.resolved[5:], "File")
+
         for srcobj in referenced_files:
             self.visit(srcobj, uploadfiles)
 
@@ -89,12 +91,12 @@ class ArvPathMapper(PathMapper):
                                              self.arvrunner.api,
                                              dry_run=False,
                                              num_retries=self.arvrunner.num_retries,
-                                             fnPattern=self.file_pattern,
+                                             fnPattern="keep:%s/%s",
                                              name=self.name,
                                              project=self.arvrunner.project_uuid)
 
         for src, ab, st in uploadfiles:
-            self._pathmap[src] = MapperEnt("keep:" + st.keepref, st.fn, "File")
+            self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % st.fn[5:], "File")
             self.arvrunner.add_uploaded(src, self._pathmap[src])
 
         for srcobj in referenced_files:
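
After this change every _pathmap entry has one shape: resolved holds the keep: locator, and target is that locator substituted into collection_pattern, the path where the collection appears inside the container. A small illustration, using a hypothetical mount pattern and the placeholder hash from the tests:

    from cwltool.pathmapper import MapperEnt

    collection_pattern = "/keep/%s"  # hypothetical in-container mount pattern
    fn = "keep:99999999999999999999999999999991+99/hw.py"

    # fn[5:] strips the "keep:" prefix before substitution, exactly as the
    # setup() and visit() code above does with st.fn[5:].
    ent = MapperEnt(fn, collection_pattern % fn[5:], "File")
    # resolved keeps the locator; target is the path under /keep.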
index e5b4e006e8cce7cac780436fc06c6dcd79882730..054d3530cfe174b9a5da3025d248097ca1062d7a 100644 (file)
@@ -112,6 +112,30 @@ def upload_docker(arvrunner, tool):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool)
 
+def upload_instance(arvrunner, name, tool, job_order):
+        upload_docker(arvrunner, tool)
+
+        workflowmapper = upload_dependencies(arvrunner,
+                                             name,
+                                             tool.doc_loader,
+                                             tool.tool,
+                                             tool.tool["id"],
+                                             True)
+
+        jobmapper = upload_dependencies(arvrunner,
+                                        os.path.basename(job_order.get("id", "#")),
+                                        tool.doc_loader,
+                                        job_order,
+                                        job_order.get("id", "#"),
+                                        False)
+
+        adjustDirObjs(job_order, trim_listing)
+
+        if "id" in job_order:
+            del job_order["id"]
+
+        return workflowmapper
+
 
 class Runner(object):
     def __init__(self, runner, tool, job_order, enable_reuse, output_name):
@@ -128,31 +152,8 @@ class Runner(object):
         pass
 
     def arvados_job_spec(self, *args, **kwargs):
-        upload_docker(self.arvrunner, self.tool)
-
         self.name = os.path.basename(self.tool.tool["id"])
-
-        workflowmapper = upload_dependencies(self.arvrunner,
-                                             self.name,
-                                             self.tool.doc_loader,
-                                             self.tool.tool,
-                                             self.tool.tool["id"],
-                                             True)
-
-        jobmapper = upload_dependencies(self.arvrunner,
-                                        os.path.basename(self.job_order.get("id", "#")),
-                                        self.tool.doc_loader,
-                                        self.job_order,
-                                        self.job_order.get("id", "#"),
-                                        False)
-
-        adjustDirObjs(self.job_order, trim_listing)
-
-        if "id" in self.job_order:
-            del self.job_order["id"]
-
-        return workflowmapper
-
+        return upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
 
     def done(self, record):
         if record["state"] == "Complete":
diff --git a/sdk/cwl/tests/hw.py b/sdk/cwl/tests/hw.py
new file mode 100644 (file)
index 0000000..62c813a
--- /dev/null
@@ -0,0 +1 @@
+print "Hello world"
diff --git a/sdk/cwl/tests/test_pathmapper.py b/sdk/cwl/tests/test_pathmapper.py
new file mode 100644 (file)
index 0000000..7e13066
--- /dev/null
@@ -0,0 +1,91 @@
+import functools
+import mock
+import sys
+import unittest
+import json
+import logging
+import os
+
+import arvados
+import arvados.keep
+import arvados.collection
+import arvados_cwl
+
+from cwltool.pathmapper import MapperEnt
+
+from arvados_cwl.pathmapper import ArvPathMapper
+
+def upload_mock(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)", name=None):
+    pdh = "99999999999999999999999999999991+99"
+    for c in files:
+        c.fn = fnPattern % (pdh, os.path.basename(c.fn))
+
+class TestPathmap(unittest.TestCase):
+    def test_keepref(self):
+        """Test direct keep references."""
+
+        arvrunner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+
+        p = ArvPathMapper(arvrunner, [{
+            "class": "File",
+            "location": "keep:99999999999999999999999999999991+99/hw.py"
+        }], "", "/test/%s", "/test/%s/%s")
+
+        self.assertEqual({'keep:99999999999999999999999999999991+99/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+                         p._pathmap)
+
+    @mock.patch("arvados.commands.run.uploadfiles")
+    def test_upload(self, upl):
+        """Test pathmapper uploading files."""
+
+        arvrunner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+
+        upl.side_effect = upload_mock
+
+        p = ArvPathMapper(arvrunner, [{
+            "class": "File",
+            "location": "tests/hw.py"
+        }], "", "/test/%s", "/test/%s/%s")
+
+        self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+                         p._pathmap)
+
+    @mock.patch("arvados.commands.run.uploadfiles")
+    def test_prev_uploaded(self, upl):
+        """Test pathmapper handling previously uploaded files."""
+
+        arvrunner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+        arvrunner.add_uploaded('tests/hw.py', MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='', type='File'))
+
+        upl.side_effect = upload_mock
+
+        p = ArvPathMapper(arvrunner, [{
+            "class": "File",
+            "location": "tests/hw.py"
+        }], "", "/test/%s", "/test/%s/%s")
+
+        self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+                         p._pathmap)
+
+    @mock.patch("arvados.commands.run.uploadfiles")
+    @mock.patch("arvados.commands.run.statfile")
+    def test_statfile(self, statfile, upl):
+        """Test pathmapper handling ArvFile references."""
+        arvrunner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+
+        # An ArvFile object returned from arvados.commands.run.statfile means the file is located on a
+        # keep mount, so we can construct a direct reference directly without upload.
+        def statfile_mock(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)"):
+            st = arvados.commands.run.ArvFile("", fnPattern % ("99999999999999999999999999999991+99", "hw.py"))
+            return st
+
+        upl.side_effect = upload_mock
+        statfile.side_effect = statfile_mock
+
+        p = ArvPathMapper(arvrunner, [{
+            "class": "File",
+            "location": "tests/hw.py"
+        }], "", "/test/%s", "/test/%s/%s")
+
+        self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+                         p._pathmap)
index 6674efb8c4f75c2391160a702588ea2354e377a3..d3bdf8fedc30d897b323cd23fce5e74a017f3da9 100644 (file)
@@ -60,7 +60,13 @@ def stubs(func):
             "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz5",
             "portable_data_hash": "99999999999999999999999999999995+99",
             "manifest_text": ""
-        }        )
+        },
+        {
+            "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz6",
+            "portable_data_hash": "99999999999999999999999999999996+99",
+            "manifest_text": ""
+        }
+        )
         stubs.api.collections().get().execute.return_value = {
             "portable_data_hash": "99999999999999999999999999999993+99", "manifest_text": "./tool 00000000000000000000000000000000+0 0:0:submit_tool.cwl 0:0:blub.txt"}
 
@@ -112,6 +118,38 @@ def stubs(func):
             'script_version': 'master',
             'script': 'cwl-runner'
         }
+        stubs.pipeline_component = stubs.expect_job_spec.copy()
+        stubs.expect_pipeline_instance = {
+            'name': 'submit_wf.cwl',
+            'state': 'RunningOnServer',
+            "components": {
+                "cwl-runner": {
+                    'runtime_constraints': {'docker_image': 'arvados/jobs'},
+                    'script_parameters': {
+                        'y': {"value": {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}},
+                        'x': {"value": {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999994+99/blorp.txt'}},
+                        'z': {"value": {'basename': 'anonymous', 'class': 'Directory',
+                              'listing': [
+                                  {'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
+                              ]}},
+                        'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+                    },
+                    'repository': 'arvados',
+                    'script_version': 'master',
+                    'script': 'cwl-runner'
+                }
+            }
+        }
+        stubs.pipeline_create = copy.deepcopy(stubs.expect_pipeline_instance)
+        stubs.expect_pipeline_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
+        stubs.pipeline_create["uuid"] = stubs.expect_pipeline_uuid
+        stubs.pipeline_with_job = copy.deepcopy(stubs.pipeline_create)
+        stubs.pipeline_with_job["components"]["cwl-runner"]["job"] = {
+            "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
+            "state": "Queued"
+        }
+        stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
+        stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
 
         stubs.expect_container_spec = {
             'priority': 1,
@@ -157,11 +195,12 @@ def stubs(func):
 
 
 class TestSubmit(unittest.TestCase):
+    @mock.patch("time.sleep")
     @stubs
-    def test_submit(self, stubs):
+    def test_submit(self, stubs, tm):
         capture_stdout = cStringIO.StringIO()
         exited = arvados_cwl.main(
-            ["--submit", "--no-wait",
+            ["--submit", "--no-wait", "--debug",
              "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
             capture_stdout, sys.stderr, api_client=stubs.api)
         self.assertEqual(exited, 0)
@@ -192,16 +231,16 @@ class TestSubmit(unittest.TestCase):
             }, ensure_unique_name=True),
             mock.call().execute()])
 
-        expect_job = copy.deepcopy(stubs.expect_job_spec)
-        expect_job["owner_uuid"] = stubs.fake_user_uuid
-        stubs.api.jobs().create.assert_called_with(
-            body=expect_job,
-            find_or_create=True)
+        expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+        expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
+        stubs.api.pipeline_instances().create.assert_called_with(
+            body=expect_pipeline)
         self.assertEqual(capture_stdout.getvalue(),
-                         stubs.expect_job_uuid + '\n')
+                         stubs.expect_pipeline_uuid + '\n')
 
+    @mock.patch("time.sleep")
     @stubs
-    def test_submit_with_project_uuid(self, stubs):
+    def test_submit_with_project_uuid(self, stubs, tm):
         project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
 
         exited = arvados_cwl.main(
@@ -211,11 +250,10 @@ class TestSubmit(unittest.TestCase):
             sys.stdout, sys.stderr, api_client=stubs.api)
         self.assertEqual(exited, 0)
 
-        expect_body = copy.deepcopy(stubs.expect_job_spec)
-        expect_body["owner_uuid"] = project_uuid
-        stubs.api.jobs().create.assert_called_with(
-            body=expect_body,
-            find_or_create=True)
+        expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+        expect_pipeline["owner_uuid"] = project_uuid
+        stubs.api.pipeline_instances().create.assert_called_with(
+            body=expect_pipeline)
 
     @stubs
     def test_submit_container(self, stubs):
index 7b87aee6ab7b14c875aa044e3b88e11ed22bd567..a64eaacf8347eb2d6937c84e2c1b0598994ce937 100644 (file)
@@ -28,6 +28,11 @@ func (d Duration) String() string {
        return time.Duration(d).String()
 }
 
+// Duration returns a time.Duration
+func (d Duration) Duration() time.Duration {
+       return time.Duration(d)
+}
+
 // Value implements flag.Value
 func (d *Duration) Set(s string) error {
        dur, err := time.ParseDuration(s)
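
The new Duration() accessor saves callers from writing time.Duration(d) conversions by hand. A minimal usage sketch; the import path is assumed from the SDK layout:

    package main

    import (
        "fmt"
        "time"

        "git.curoverse.com/arvados.git/sdk/go/arvados"
    )

    func main() {
        var d arvados.Duration
        d.Set("1m30s") // Set implements flag.Value
        if d.Duration() > time.Minute {
            fmt.Println("timeout is", d.String())
        }
    }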
index 2217dd3352eae69255b74b4faa5a74425efca0ee..a46ca4cc55aa5c3faa8ccdbe5b73339aacfa50ef 100644 (file)
@@ -36,15 +36,19 @@ Alternately, if you already have a filled buffer and just want to read out from
 package streamer
 
 import (
+       "errors"
        "io"
 )
 
+var ErrAlreadyClosed = errors.New("cannot close a stream twice")
+
 type AsyncStream struct {
        buffer            []byte
        requests          chan sliceRequest
        add_reader        chan bool
        subtract_reader   chan bool
        wait_zero_readers chan bool
+       closed            bool
 }
 
 // Reads from the buffer managed by the Transfer()
@@ -55,7 +59,13 @@ type StreamReader struct {
 }
 
 func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
-       t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
+       t := &AsyncStream{
+               buffer:            make([]byte, buffersize),
+               requests:          make(chan sliceRequest),
+               add_reader:        make(chan bool),
+               subtract_reader:   make(chan bool),
+               wait_zero_readers: make(chan bool),
+       }
 
        go t.transfer(source)
        go t.readersMonitor()
@@ -64,7 +74,13 @@ func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
 }
 
 func AsyncStreamFromSlice(buf []byte) *AsyncStream {
-       t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
+       t := &AsyncStream{
+               buffer:            buf,
+               requests:          make(chan sliceRequest),
+               add_reader:        make(chan bool),
+               subtract_reader:   make(chan bool),
+               wait_zero_readers: make(chan bool),
+       }
 
        go t.transfer(nil)
        go t.readersMonitor()
@@ -115,16 +131,24 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
 
 // Close the responses channel
 func (this *StreamReader) Close() error {
+       if this.stream == nil {
+               return ErrAlreadyClosed
+       }
        this.stream.subtract_reader <- true
        close(this.responses)
        this.stream = nil
        return nil
 }
 
-func (this *AsyncStream) Close() {
+func (this *AsyncStream) Close() error {
+       if this.closed {
+               return ErrAlreadyClosed
+       }
+       this.closed = true
        this.wait_zero_readers <- true
        close(this.requests)
        close(this.add_reader)
        close(this.subtract_reader)
        close(this.wait_zero_readers)
+       return nil
 }
index 80aeb268975d8acbc7f7d1c2e771c17e7adfa719..f5333c37c175be1774eab5318824bb018f47f31f 100644 (file)
@@ -365,3 +365,13 @@ func (s *StandaloneSuite) TestManyReaders(c *C) {
        writer.Write([]byte("baz"))
        writer.Close()
 }
+
+func (s *StandaloneSuite) TestMultipleCloseNoPanic(c *C) {
+       buffer := make([]byte, 100)
+       tr := AsyncStreamFromSlice(buffer)
+       sr := tr.MakeStreamReader()
+       c.Check(sr.Close(), IsNil)
+       c.Check(sr.Close(), Equals, ErrAlreadyClosed)
+       c.Check(tr.Close(), IsNil)
+       c.Check(tr.Close(), Equals, ErrAlreadyClosed)
+}
index f2f7df2dce2121b0c0c0e4b562f7c7963f1fc0ab..c394dab810715c2659b6f72f8f5f1e173d711ead 100644 (file)
@@ -10,6 +10,8 @@ import copy
 import errno
 import re
 import logging
+import collections
+import uuid
 
 from .errors import KeepWriteError, AssertionError, ArgumentError
 from .keep import KeepLocator
@@ -405,7 +407,7 @@ class _BlockManager(object):
     def __init__(self, keep, copies=None):
         """keep: KeepClient object to use"""
         self._keep = keep
-        self._bufferblocks = {}
+        self._bufferblocks = collections.OrderedDict()
         self._put_queue = None
         self._put_threads = None
         self._prefetch_queue = None
@@ -430,8 +432,11 @@ class _BlockManager(object):
           ArvadosFile that owns this block
 
         """
+        return self._alloc_bufferblock(blockid, starting_capacity, owner)
+
+    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
         if blockid is None:
-            blockid = "bufferblock%i" % len(self._bufferblocks)
+            blockid = "%s" % uuid.uuid4()
         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
         self._bufferblocks[bufferblock.blockid] = bufferblock
         return bufferblock
@@ -549,6 +554,33 @@ class _BlockManager(object):
     def __exit__(self, exc_type, exc_value, traceback):
         self.stop_threads()
 
+    @synchronized
+    def repack_small_blocks(self, force=False, sync=False):
+        """Packs small blocks together before uploading"""
+        # Search for blocks that are ready to be packed together before
+        # being committed to Keep.
+        # A WRITABLE block always has an owner.
+        # A WRITABLE block with its owner.closed() implies that its
+        # size is <= KEEP_BLOCK_SIZE/2.
+        small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+        if len(small_blocks) <= 1:
+            # Not enough small blocks for repacking
+            return
+
+        # Check if there are enough small blocks for filling up one in full
+        pending_write_size = sum([b.size() for b in small_blocks])
+        if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
+            new_bb = self._alloc_bufferblock()
+            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+                bb = small_blocks.pop(0)
+                arvfile = bb.owner
+                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
+                arvfile.set_segments([Range(new_bb.blockid,
+                                            0,
+                                            bb.size(),
+                                            new_bb.write_pointer - bb.size())])
+                self._delete_bufferblock(bb.blockid)
+            self.commit_bufferblock(new_bb, sync=sync)
+
     def commit_bufferblock(self, block, sync):
         """Initiate a background upload of a bufferblock.
 
@@ -562,7 +594,6 @@ class _BlockManager(object):
           which case it will wait on an upload queue slot.
 
         """
-
         try:
             # Mark the block as PENDING so to disallow any more appends.
             block.set_state(_BufferBlock.PENDING)
@@ -599,6 +630,9 @@ class _BlockManager(object):
 
     @synchronized
     def delete_bufferblock(self, locator):
+        self._delete_bufferblock(locator)
+
+    def _delete_bufferblock(self, locator):
         bb = self._bufferblocks[locator]
         bb.clear()
         del self._bufferblocks[locator]
@@ -629,11 +663,13 @@ class _BlockManager(object):
         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
 
         """
+        self.repack_small_blocks(force=True, sync=True)
+
         with self.lock:
             items = self._bufferblocks.items()
 
         for k,v in items:
-            if v.state() != _BufferBlock.COMMITTED:
+            if v.state() != _BufferBlock.COMMITTED and v.owner:
                 v.owner.flush(sync=False)
 
         with self.lock:
@@ -700,6 +736,7 @@ class ArvadosFile(object):
         """
         self.parent = parent
         self.name = name
+        self._writers = set()
         self._committed = False
         self._segments = []
         self.lock = parent.root_collection().lock
@@ -776,9 +813,13 @@ class ArvadosFile(object):
     def __ne__(self, other):
         return not self.__eq__(other)
 
+    @synchronized
+    def set_segments(self, segs):
+        self._segments = segs
+
     @synchronized
     def set_committed(self):
-        """Set committed flag to False"""
+        """Set committed flag to True"""
         self._committed = True
 
     @synchronized
@@ -786,6 +827,34 @@ class ArvadosFile(object):
         """Get whether this is committed or not."""
         return self._committed
 
+    @synchronized
+    def add_writer(self, writer):
+        """Add an ArvadosFileWriter reference to the list of writers"""
+        if isinstance(writer, ArvadosFileWriter):
+            self._writers.add(writer)
+
+    @synchronized
+    def remove_writer(self, writer, flush):
+        """
+        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
+        and do some block maintenance tasks.
+        """
+        self._writers.remove(writer)
+
+        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
+            # A flush was requested, or the file is too big to be repacked
+            self.flush()
+        elif self.closed():
+            # All writers closed and size is adequate for repacking
+            self.parent._my_block_manager().repack_small_blocks()
+
+    def closed(self):
+        """
+        Get whether this is closed or not. When the writers list is empty, the
+        file is considered closed.
+        """
+        return len(self._writers) == 0
+
     @must_be_writable
     @synchronized
     def truncate(self, size):
@@ -1067,6 +1136,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     def __init__(self, arvadosfile, mode, num_retries=None):
         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
         self.mode = mode
+        self.arvadosfile.add_writer(self)
 
     @_FileLikeObjectBase._before_close
     @retry_method
@@ -1096,7 +1166,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     def flush(self):
         self.arvadosfile.flush()
 
-    def close(self):
+    def close(self, flush=True):
         if not self.closed:
-            self.flush()
+            self.arvadosfile.remove_writer(self, flush)
             super(ArvadosFileWriter, self).close()
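
Taken together: every ArvadosFileWriter registers with its ArvadosFile, and close(flush=False) defers the block commit so repack_small_blocks() can combine several small pending buffers into a single Keep block. The caller-visible pattern, mirroring the new collection test below:

    from arvados.collection import Collection

    c = Collection()
    for name, data in [("count.txt", "0123456789"), ("foo.txt", "foo")]:
        f = c.open(name, "w")
        f.write(data)
        # flush=False leaves the small buffer pending; once all writers are
        # closed, the block manager may pack these files into one block.
        f.close(flush=False)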
index e72f67dce49049f37c9b2e68794eb62cc780297c..642b7ccbad51846a9f1c25acc86f6b0505897c62 100644 (file)
@@ -394,8 +394,7 @@ def run_keep(blob_signing_key=None, enforce_permissions=False, num_servers=2):
     with open(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"), "w") as f:
         keep_args['-blob-signing-key-file'] = f.name
         f.write(blob_signing_key)
-    if enforce_permissions:
-        keep_args['-enforce-permissions'] = 'true'
+    keep_args['-enforce-permissions'] = str(enforce_permissions).lower()
     with open(os.path.join(TEST_TMPDIR, "keep.data-manager-token-file"), "w") as f:
         keep_args['-data-manager-token-file'] = f.name
         f.write(auth_token('data_manager'))
index cf8f23e375f66835aca09c1b2085113da1cf7c67..fc30a242eba1bfc665a05747de66f999869ef8a4 100644 (file)
@@ -1089,6 +1089,7 @@ class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
             # One file committed
             with c.open("foo.txt", "w") as foo:
                 foo.write("foo")
+                foo.flush() # Force block commit
             f.write("0123456789")
             # Other file not committed. Block not written to keep yet.
             self.assertEqual(
@@ -1097,7 +1098,8 @@ class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
                                      normalize=False,
                                      only_committed=True),
                 '. acbd18db4cc2f85cedef654fccc4a4d8+3 0:0:count.txt 0:3:foo.txt\n')
-        # And now with the file closed...
+            # And now with the file closed...
+            f.flush() # Force block commit
         self.assertEqual(
             c._get_manifest_text(".",
                                  strip=False,
@@ -1105,6 +1107,23 @@ class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
                                  only_committed=True),
             ". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:10:count.txt 10:3:foo.txt\n")
 
+    def test_only_small_blocks_are_packed_together(self):
+        c = Collection()
+        # Write a couple of small files.
+        f = c.open("count.txt", "w")
+        f.write("0123456789")
+        f.close(flush=False)
+        foo = c.open("foo.txt", "w")
+        foo.write("foo")
+        foo.close(flush=False)
+        # Then, write a big file, it shouldn't be packed with the ones above
+        big = c.open("bigfile.txt", "w")
+        big.write("x" * 1024 * 1024 * 33) # 33 MB > KEEP_BLOCK_SIZE/2
+        big.close(flush=False)
+        self.assertEqual(
+            c.manifest_text("."),
+            '. 2d303c138c118af809f39319e5d507e9+34603008 a8430a058b8fbf408e1931b794dbd6fb+13 0:34603008:bigfile.txt 34603008:10:count.txt 34603018:3:foo.txt\n')
+
 
 class CollectionCreateUpdateTest(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {}
index 43c5b30a1f603fd94b828b3449f71c01b5ba716a..3a16e30e9ec545840b3be592138a0b2aacf34694 100644 (file)
@@ -166,6 +166,10 @@ class Container < ArvadosModel
             uuids: uuid_list)
   end
 
+  def final?
+    [Complete, Cancelled].include?(self.state)
+  end
+
   protected
 
   def fill_field_defaults
@@ -305,7 +309,7 @@ class Container < ArvadosModel
   def handle_completed
     # This container is finished so finalize any associated container requests
     # that are associated with this container.
-    if self.state_changed? and [Complete, Cancelled].include? self.state
+    if self.state_changed? and self.final?
       act_as_system_user do
 
         if self.state == Cancelled
@@ -337,7 +341,7 @@ class Container < ArvadosModel
         # Notify container requests associated with this container
         ContainerRequest.where(container_uuid: uuid,
                                state: ContainerRequest::Committed).each do |cr|
-          cr.container_completed!
+          cr.finalize!
         end
 
         # Try to cancel any outstanding container requests made by this container.
index a588c86451a88c2205472ba2dcc19eaff3dd15d3..696b873bde383ade05993a7d7c33800b70dffe04 100644 (file)
@@ -19,6 +19,7 @@ class ContainerRequest < ArvadosModel
   validate :validate_change
   validate :validate_runtime_constraints
   after_save :update_priority
+  after_save :finalize_if_needed
   before_create :set_requesting_container_uuid
 
   api_accessible :user, extend: :common do |t|
@@ -65,10 +66,19 @@ class ContainerRequest < ArvadosModel
     %w(modified_by_client_uuid container_uuid requesting_container_uuid)
   end
 
+  def finalize_if_needed
+    if state == Committed && Container.find_by_uuid(container_uuid).final?
+      reload
+      act_as_system_user do
+        finalize!
+      end
+    end
+  end
+
   # Finalize the container request after the container has
   # finished/cancelled.
-  def container_completed!
-    update_attributes!(state: ContainerRequest::Final)
+  def finalize!
+    update_attributes!(state: Final)
     c = Container.find_by_uuid(container_uuid)
     ['output', 'log'].each do |out_type|
       pdh = c.send(out_type)
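
The new after_save hook closes a container-reuse gap: a request committed against a container that already reached a final state would otherwise stay Committed forever. The flow, sketched with the suite's create_minimal_req! test helper (not runnable outside the Rails test environment):

    cr = create_minimal_req!(priority: 1, state: ContainerRequest::Committed)
    c = Container.find_by_uuid(cr.container_uuid)
    c.final?          # true once the container is Complete or Cancelled
    cr.reload.state   # ContainerRequest::Final, set by finalize_if_needed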
index 3b175742370b93bf983754d06e459115a7180d84..1c5c7ae5cea5a55da5c2af9500e42321cd5811f8 100644 (file)
@@ -481,4 +481,31 @@ class ContainerRequestTest < ActiveSupport::TestCase
     assert_equal prev_container_uuid, cr.container_uuid
   end
 
+  test "Finalize committed request when reusing a finished container" do
+    set_user_from_auth :active
+    cr = create_minimal_req!(priority: 1, state: ContainerRequest::Committed)
+    cr.reload
+    assert_equal ContainerRequest::Committed, cr.state
+    act_as_system_user do
+      c = Container.find_by_uuid(cr.container_uuid)
+      c.update_attributes!(state: Container::Locked)
+      c.update_attributes!(state: Container::Running)
+      c.update_attributes!(state: Container::Complete,
+                           exit_code: 0,
+                           output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45',
+                           log: 'fa7aeb5140e2848d39b416daeef4ffc5+45')
+    end
+    cr.reload
+    assert_equal ContainerRequest::Final, cr.state
+
+    cr2 = create_minimal_req!(priority: 1, state: ContainerRequest::Committed)
+    assert_equal cr.container_uuid, cr2.container_uuid
+    assert_equal ContainerRequest::Final, cr2.state
+
+    cr3 = create_minimal_req!(priority: 1, state: ContainerRequest::Uncommitted)
+    assert_equal ContainerRequest::Uncommitted, cr3.state
+    cr3.update_attributes!(state: ContainerRequest::Committed)
+    assert_equal cr.container_uuid, cr3.container_uuid
+    assert_equal ContainerRequest::Final, cr3.state
+  end
 end
similarity index 75%
rename from services/arv-git-httpd/arv-git-httpd.service
rename to services/arv-git-httpd/arvados-git-httpd.service
index f71c2ffbb5482ee6b93914ad4c7d824a555325cc..c41a5f3465d61403959a366565a89ec671af236e 100644 (file)
@@ -2,7 +2,7 @@
 Description=Arvados git server
 Documentation=https://doc.arvados.org/
 After=network.target
-AssertPathExists=/etc/arvados/arvados-git-httpd/arvados-git-httpd.yml
+AssertPathExists=/etc/arvados/git-httpd/git-httpd.yml
 
 [Service]
 Type=notify
diff --git a/services/arv-git-httpd/doc.go b/services/arv-git-httpd/doc.go
deleted file mode 100644 (file)
index ff4599d..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
-arv-git-httpd provides authenticated access to Arvados-hosted git repositories.
-
-See http://doc.arvados.org/install/install-arv-git-httpd.html.
-
-Example:
-
-       arv-git-httpd -address=:8000 -repo-root=/var/lib/arvados/git
-
-Options:
-
-       -address [host]:[port]
-
-Listen at the given host and port.
-
-Host can be a domain name, an IP address, or empty (listen on all
-addresses).
-
-Port can be a name, a port number, or 0 (choose an available port).
-
-       -repo-root path
-
-Directory containing git repositories. When a client requests either
-"foo/bar.git" or "foo/bar/.git", git-http-backend will be invoked on
-"path/foo/bar.git" or (if that doesn't exist) "path/foo/bar/.git".
-
-       -git-command path
-
-Location of the CGI program to execute for each authorized request
-(normally this is gitolite-shell if repositories are controlled by
-gitolite, otherwise git). It is invoked with a single argument,
-'http-backend'.  Default is /usr/bin/git.
-
-*/
-package main
index f0b98fab72382dfa02c2b12a144e2a6b9f5190c4..2caf1294e060c7af7f1323a10a75e837a417f177 100644 (file)
@@ -5,6 +5,7 @@ import (
        "net"
        "net/http"
        "net/http/cgi"
+       "os"
 )
 
 // gitHandler is an http.Handler that invokes git-http-backend (or
@@ -16,21 +17,29 @@ type gitHandler struct {
 }
 
 func newGitHandler() http.Handler {
+       const glBypass = "GL_BYPASS_ACCESS_CHECKS"
+       const glHome = "GITOLITE_HTTP_HOME"
+       var env []string
+       path := os.Getenv("PATH")
+       if theConfig.GitoliteHome != "" {
+               env = append(env,
+                       glHome+"="+theConfig.GitoliteHome,
+                       glBypass+"=1")
+               path = path + ":" + theConfig.GitoliteHome + "/bin"
+       } else if home, bypass := os.Getenv(glHome), os.Getenv(glBypass); home != "" || bypass != "" {
+               env = append(env, glHome+"="+home, glBypass+"="+bypass)
+               log.Printf("DEPRECATED: Passing through %s and %s environment variables. Use GitoliteHome configuration instead.", glHome, glBypass)
+       }
+       env = append(env,
+               "GIT_PROJECT_ROOT="+theConfig.RepoRoot,
+               "GIT_HTTP_EXPORT_ALL=",
+               "SERVER_ADDR="+theConfig.Listen,
+               "PATH="+path)
        return &gitHandler{
                Handler: cgi.Handler{
                        Path: theConfig.GitCommand,
                        Dir:  theConfig.RepoRoot,
-                       Env: []string{
-                               "GIT_PROJECT_ROOT=" + theConfig.RepoRoot,
-                               "GIT_HTTP_EXPORT_ALL=",
-                               "SERVER_ADDR=" + theConfig.Listen,
-                       },
-                       InheritEnv: []string{
-                               "PATH",
-                               // Needed if GitCommand is gitolite-shell:
-                               "GITOLITE_HTTP_HOME",
-                               "GL_BYPASS_ACCESS_CHECKS",
-                       },
+                       Env:  env,
                        Args: []string{"http-backend"},
                },
        }
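
The rewritten newGitHandler above builds the CGI child environment explicitly from configuration, instead of using InheritEnv to pass through whatever GITOLITE_HTTP_HOME and GL_BYPASS_ACCESS_CHECKS happen to be set in the server's own environment. For readers unfamiliar with net/http/cgi, here is a minimal standalone sketch of the same pattern; the paths and port are illustrative only, not part of this commit:

    package main

    import (
            "log"
            "net/http"
            "net/http/cgi"
    )

    func main() {
            h := &cgi.Handler{
                    Path: "/usr/bin/git", // CGI program to run (or gitolite-shell)
                    Dir:  "/var/lib/arvados/git/repositories",
                    // The child process sees exactly this environment
                    // (plus the per-request CGI meta-variables), rather
                    // than inheriting the server's environment.
                    Env: []string{
                            "GIT_PROJECT_ROOT=/var/lib/arvados/git/repositories",
                            "GIT_HTTP_EXPORT_ALL=",
                    },
                    Args: []string{"http-backend"},
            }
            log.Fatal(http.ListenAndServe(":8000", h))
    }
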
index d87162dca3aa6f80ac16411c4a138e6286fc40e2..6b08eeecdc303a246cba91873fd5c44a4ddddb4a 100644 (file)
@@ -4,7 +4,6 @@ import (
        "net/http"
        "net/http/httptest"
        "net/url"
-       "os"
        "regexp"
 
        check "gopkg.in/check.v1"
@@ -15,6 +14,10 @@ var _ = check.Suite(&GitHandlerSuite{})
 type GitHandlerSuite struct{}
 
 func (s *GitHandlerSuite) TestEnvVars(c *check.C) {
+       theConfig = defaultConfig()
+       theConfig.RepoRoot = "/"
+       theConfig.GitoliteHome = "/test/ghh"
+
        u, err := url.Parse("git.zzzzz.arvadosapi.com/test")
        c.Check(err, check.Equals, nil)
        resp := httptest.NewRecorder()
@@ -26,15 +29,14 @@ func (s *GitHandlerSuite) TestEnvVars(c *check.C) {
        h := newGitHandler()
        h.(*gitHandler).Path = "/bin/sh"
        h.(*gitHandler).Args = []string{"-c", "printf 'Content-Type: text/plain\r\n\r\n'; env"}
-       os.Setenv("GITOLITE_HTTP_HOME", "/test/ghh")
-       os.Setenv("GL_BYPASS_ACCESS_CHECKS", "yesplease")
 
        h.ServeHTTP(resp, req)
 
        c.Check(resp.Code, check.Equals, http.StatusOK)
        body := resp.Body.String()
+       c.Check(body, check.Matches, `(?ms).*^PATH=.*:/test/ghh/bin$.*`)
        c.Check(body, check.Matches, `(?ms).*^GITOLITE_HTTP_HOME=/test/ghh$.*`)
-       c.Check(body, check.Matches, `(?ms).*^GL_BYPASS_ACCESS_CHECKS=yesplease$.*`)
+       c.Check(body, check.Matches, `(?ms).*^GL_BYPASS_ACCESS_CHECKS=1$.*`)
        c.Check(body, check.Matches, `(?ms).*^REMOTE_HOST=::1$.*`)
        c.Check(body, check.Matches, `(?ms).*^REMOTE_PORT=12345$.*`)
        c.Check(body, check.Matches, `(?ms).*^SERVER_ADDR=`+regexp.QuoteMeta(theConfig.Listen)+`$.*`)
index 74c2b8cf4d91a8ac3da2835b12e90a35e6dd0380..38ff2309c1d2a3a57def17ac588276dab204e265 100644 (file)
@@ -48,9 +48,10 @@ func (s *GitoliteSuite) SetUpTest(c *check.C) {
                        APIHost:  arvadostest.APIHost(),
                        Insecure: true,
                },
-               Listen:     ":0",
-               GitCommand: "/usr/share/gitolite3/gitolite-shell",
-               RepoRoot:   s.tmpRepoRoot,
+               Listen:       ":0",
+               GitCommand:   "/usr/share/gitolite3/gitolite-shell",
+               GitoliteHome: s.gitoliteHome,
+               RepoRoot:     s.tmpRepoRoot,
        }
        s.IntegrationSuite.SetUpTest(c)
 
@@ -58,9 +59,6 @@ func (s *GitoliteSuite) SetUpTest(c *check.C) {
        // (*IntegrationTest)SetUpTest() -- see 2.2.4 at
        // http://gitolite.com/gitolite/gitolite.html
        runGitolite("gitolite", "setup")
-
-       os.Setenv("GITOLITE_HTTP_HOME", s.gitoliteHome)
-       os.Setenv("GL_BYPASS_ACCESS_CHECKS", "1")
 }
 
 func (s *GitoliteSuite) TearDownTest(c *check.C) {
index 5e55eca754838d97d2aaa8888482c686306a42cf..1d252599cdf3078b9924318980d91e031feb687d 100644 (file)
@@ -112,6 +112,8 @@ func (s *IntegrationSuite) TearDownTest(c *check.C) {
        s.tmpWorkdir = ""
 
        s.Config = nil
+
+       theConfig = defaultConfig()
 }
 
 func (s *IntegrationSuite) RunGit(c *check.C, token, gitCmd, repo string, args ...string) error {
index dd281366b29ac886365056ad8c2c4e2250a0d739..3bd7b3a8aa93c5871ff18773188c1336033d2599 100644 (file)
@@ -14,27 +14,24 @@ import (
 
 // Server configuration
 type Config struct {
-       Client     arvados.Client
-       Listen     string
-       GitCommand string
-       RepoRoot   string
+       Client       arvados.Client
+       Listen       string
+       GitCommand   string
+       RepoRoot     string
+       GitoliteHome string
 }
 
 var theConfig = defaultConfig()
 
 func defaultConfig() *Config {
-       cwd, err := os.Getwd()
-       if err != nil {
-               log.Fatalln("Getwd():", err)
-       }
        return &Config{
                Listen:     ":80",
                GitCommand: "/usr/bin/git",
-               RepoRoot:   cwd,
+               RepoRoot:   "/var/lib/arvados/git/repositories",
        }
 }
 
-func init() {
+func main() {
        const defaultCfgPath = "/etc/arvados/git-httpd/git-httpd.yml"
        const deprecated = " (DEPRECATED -- use config file instead)"
        flag.StringVar(&theConfig.Listen, "address", theConfig.Listen,
@@ -43,6 +40,8 @@ func init() {
                "Path to git or gitolite-shell executable. Each authenticated request will execute this program with a single argument, \"http-backend\"."+deprecated)
        flag.StringVar(&theConfig.RepoRoot, "repo-root", theConfig.RepoRoot,
                "Path to git repositories."+deprecated)
+       flag.StringVar(&theConfig.GitoliteHome, "gitolite-home", theConfig.GitoliteHome,
+               "Value for GITOLITE_HTTP_HOME environment variable. If not empty, GL_BYPASS_ACCESS_CHECKS=1 will also be set."+deprecated)
 
        cfgPath := flag.String("config", defaultCfgPath, "Configuration file `path`.")
        flag.Usage = usage
@@ -63,9 +62,7 @@ func init() {
                        log.Print("Current configuration:\n", string(j))
                }
        }
-}
 
-func main() {
        srv := &server{}
        if err := srv.Start(); err != nil {
                log.Fatal(err)
index 666edc01aa54f8a3ae0545809403a69be7850f20..1fb25b92f2f17286c576203f609e20ec82f3b300 100644 (file)
@@ -1,33 +1,40 @@
+// arvados-git-httpd provides authenticated access to Arvados-hosted
+// git repositories.
+//
+// See http://doc.arvados.org/install/install-arv-git-httpd.html.
 package main
 
 import (
-       "encoding/json"
        "flag"
        "fmt"
        "os"
+
+       "github.com/ghodss/yaml"
 )
 
 func usage() {
        c := defaultConfig()
        c.Client.APIHost = "zzzzz.arvadosapi.com:443"
-       exampleConfigFile, err := json.MarshalIndent(c, "    ", "  ")
+       exampleConfigFile, err := yaml.Marshal(c)
        if err != nil {
                panic(err)
        }
        fmt.Fprintf(os.Stderr, `
 
-arv-git-httpd provides authenticated access to Arvados-hosted git repositories.
+arvados-git-httpd provides authenticated access to Arvados-hosted git
+repositories.
 
 See http://doc.arvados.org/install/install-arv-git-httpd.html.
 
-Usage: arv-git-httpd [-config path/to/arv-git-httpd.yml]
+Usage: arvados-git-httpd [-config path/to/arvados/git-httpd.yml]
 
 Options:
 `)
        flag.PrintDefaults()
        fmt.Fprintf(os.Stderr, `
 Example config file:
-    %s
+
+%s
 
 Client.APIHost:
 
@@ -42,21 +49,29 @@ Client.Insecure:
     True if your Arvados API endpoint uses an unverifiable SSL/TLS
     certificate.
 
-Listen:
-
-    Local port to listen on. Can be "address:port" or ":port", where
-    "address" is a host IP address or name and "port" is a port number
-    or name.
-
 GitCommand:
 
     Path to git or gitolite-shell executable. Each authenticated
     request will execute this program with the single argument
     "http-backend".
 
+GitoliteHome:
+
+    Path to Gitolite's home directory. If a non-empty path is given,
+    the CGI environment will be set up to support the use of
+    gitolite-shell as a GitCommand: for example, if GitoliteHome is
+    "/gh", then the CGI environment will have GITOLITE_HTTP_HOME=/gh,
+    PATH=$PATH:/gh/bin, and GL_BYPASS_ACCESS_CHECKS=1.
+
+Listen:
+
+    Local port to listen on. Can be "address:port" or ":port", where
+    "address" is a host IP address or name and "port" is a port number
+    or name.
+
 RepoRoot:
 
-    Path to git repositories. Defaults to current working directory.
+    Path to git repositories.
 
 `, exampleConfigFile)
 }
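
With the reordered option docs above, the YAML example printed by usage() comes out roughly as follows. Only keys that appear in this diff are shown; the exact fields rendered under Client depend on the arvados.Client struct and may differ:

    Client:
      APIHost: zzzzz.arvadosapi.com:443
      Insecure: false
    GitCommand: /usr/bin/git
    GitoliteHome: ""
    Listen: :80
    RepoRoot: /var/lib/arvados/git/repositories
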
index fafa3c36073c2408c6265af8afef01bffbe2e44d..61decde61c4bd61d0a92e96bde20ff0c82780f57 100644 (file)
@@ -45,7 +45,11 @@ func (squeue *Squeue) RunSqueue() {
                log.Printf("Error creating stdout pipe for squeue: %v", err)
                return
        }
-       cmd.Start()
+       err = cmd.Start()
+       if err != nil {
+               log.Printf("Error running squeue: %v", err)
+               return
+       }
        scanner := bufio.NewScanner(sq)
        for scanner.Scan() {
                newSqueueContents = append(newSqueueContents, scanner.Text())
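
The squeue hunk above checks the error from cmd.Start(), which was previously discarded. In isolation, the full pipe-and-scan pattern with every error path checked looks like this; this is a generic sketch, not the dispatcher's actual code, and the "squeue" command name is illustrative:

    package main

    import (
            "bufio"
            "log"
            "os/exec"
    )

    func main() {
            cmd := exec.Command("squeue")
            out, err := cmd.StdoutPipe()
            if err != nil {
                    log.Fatalf("Error creating stdout pipe: %v", err)
            }
            if err := cmd.Start(); err != nil {
                    log.Fatalf("Error starting command: %v", err)
            }
            scanner := bufio.NewScanner(out)
            for scanner.Scan() {
                    log.Println(scanner.Text())
            }
            if err := scanner.Err(); err != nil {
                    log.Printf("Error reading output: %v", err)
            }
            if err := cmd.Wait(); err != nil {
                    log.Printf("Command exited with error: %v", err)
            }
    }
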
index d15f01792a8e8dd4b433611ce20a752c1138c877..3f89732bea25dcd1ca546fbef126227e9e0a9256 100644 (file)
@@ -77,6 +77,8 @@ class ArgumentParser(argparse.ArgumentParser):
         self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 256MiB)", default=256*1024*1024)
         self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128MiB)", default=128*1024*1024)
 
+        self.add_argument('--disable-event-listening', action='store_true', help="Don't subscribe to events on the API server", dest="disable_event_listening", default=False)
+
         self.add_argument('--read-only', action='store_false', help="Mount will be read only (default)", dest="enable_write", default=False)
         self.add_argument('--read-write', action='store_true', help="Mount will be read-write", dest="enable_write", default=False)
 
@@ -111,7 +113,7 @@ class Mount(object):
 
     def __enter__(self):
         llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
-        if self.listen_for_events:
+        if self.listen_for_events and not self.args.disable_event_listening:
             self.operations.listen_for_events()
         self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
         self.llfuse_thread.daemon = True
@@ -330,7 +332,7 @@ From here, the following directories are available:
                 self.daemon_ctx.open()
 
             # Subscribe to change events from API server
-            if self.listen_for_events:
+            if self.listen_for_events and not self.args.disable_event_listening:
                 self.operations.listen_for_events()
 
             self._llfuse_main()
index bb80d0a2fc94dc4c77c0f46f59414a8d00627235..e8488d7ff967179423f3732c8e6e56b05194ed58 100644 (file)
@@ -170,6 +170,20 @@ class MountArgsTest(unittest.TestCase):
                          run_test_server.fixture('users')['active']['uuid'])
         self.assertEqual(True, self.mnt.listen_for_events)
 
+    @noexit
+    @mock.patch('arvados.events.subscribe')
+    def test_disable_event_listening(self, mock_subscribe):
+        args = arvados_fuse.command.ArgumentParser().parse_args([
+            '--disable-event-listening',
+            '--by-id',
+            '--foreground', self.mntdir])
+        self.mnt = arvados_fuse.command.Mount(args)
+        self.assertEqual(True, self.mnt.listen_for_events)
+        self.assertEqual(True, self.mnt.args.disable_event_listening)
+        with self.mnt:
+            pass
+        self.assertEqual(0, mock_subscribe.call_count)
+
     @noexit
     @mock.patch('arvados.events.subscribe')
     def test_custom(self, mock_subscribe):
index 8b6d01969a7819f52975c511f1d7a954aaaa324d..8e4510355d80d996a0f02a730945df20e56611e6 100644 (file)
@@ -1163,6 +1163,7 @@ class TokenExpiryTest(MountTestBase):
     def setUp(self):
         super(TokenExpiryTest, self).setUp(local_store=False)
 
+    @unittest.skip("bug #10008")
     @mock.patch('arvados.keep.KeepClient.get')
     def runTest(self, mocked_get):
         self.api._rootDesc = {"blobSignatureTtl": 2}
index 48cb02647cfd098cdc67796ba992ac5cba327bde..d2163f6b490376768383b260444d6be90a9ca1ed 100644 (file)
@@ -40,41 +40,29 @@ func readKeyFromFile(file string) (string, error) {
 }
 
 type azureVolumeAdder struct {
-       *volumeSet
+       *Config
 }
 
-func (s *azureVolumeAdder) Set(containerName string) error {
-       if trashLifetime != 0 {
-               return ErrNotImplemented
-       }
+// String implements flag.Value
+func (s *azureVolumeAdder) String() string {
+       return "-"
+}
 
-       if containerName == "" {
-               return errors.New("no container name given")
-       }
-       if azureStorageAccountName == "" || azureStorageAccountKeyFile == "" {
-               return errors.New("-azure-storage-account-name and -azure-storage-account-key-file arguments must given before -azure-storage-container-volume")
-       }
-       accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
-       if err != nil {
-               return err
-       }
-       azClient, err := storage.NewBasicClient(azureStorageAccountName, accountKey)
-       if err != nil {
-               return errors.New("creating Azure storage client: " + err.Error())
-       }
-       if flagSerializeIO {
-               log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
-       }
-       v := NewAzureBlobVolume(azClient, containerName, flagReadonly, azureStorageReplication)
-       if err := v.Check(); err != nil {
-               return err
-       }
-       *s.volumeSet = append(*s.volumeSet, v)
+func (s *azureVolumeAdder) Set(containerName string) error {
+       s.Config.Volumes = append(s.Config.Volumes, &AzureBlobVolume{
+               ContainerName:         containerName,
+               StorageAccountName:    azureStorageAccountName,
+               StorageAccountKeyFile: azureStorageAccountKeyFile,
+               AzureReplication:      azureStorageReplication,
+               ReadOnly:              deprecated.flagReadonly,
+       })
        return nil
 }
 
 func init() {
-       flag.Var(&azureVolumeAdder{&volumes},
+       VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &AzureBlobVolume{} })
+
+       flag.Var(&azureVolumeAdder{theConfig},
                "azure-storage-container-volume",
                "Use the given container as a storage volume. Can be given multiple times.")
        flag.StringVar(
@@ -86,7 +74,7 @@ func init() {
                &azureStorageAccountKeyFile,
                "azure-storage-account-key-file",
                "",
-               "File containing the account key used for subsequent --azure-storage-container-volume arguments.")
+               "`File` containing the account key used for subsequent --azure-storage-container-volume arguments.")
        flag.IntVar(
                &azureStorageReplication,
                "azure-storage-replication",
@@ -102,41 +90,64 @@ func init() {
 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
 // container.
 type AzureBlobVolume struct {
-       azClient      storage.Client
-       bsClient      storage.BlobStorageClient
-       containerName string
-       readonly      bool
-       replication   int
+       StorageAccountName    string
+       StorageAccountKeyFile string
+       ContainerName         string
+       AzureReplication      int
+       ReadOnly              bool
+
+       azClient storage.Client
+       bsClient storage.BlobStorageClient
 }
 
-// NewAzureBlobVolume returns a new AzureBlobVolume using the given
-// client and container name. The replication argument specifies the
-// replication level to report when writing data.
-func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
-       return &AzureBlobVolume{
-               azClient:      client,
-               bsClient:      client.GetBlobService(),
-               containerName: containerName,
-               readonly:      readonly,
-               replication:   replication,
+// Examples implements VolumeWithExamples.
+func (*AzureBlobVolume) Examples() []Volume {
+       return []Volume{
+               &AzureBlobVolume{
+                       StorageAccountName:    "example-account-name",
+                       StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
+                       ContainerName:         "example-container-name",
+                       AzureReplication:      3,
+               },
        }
 }
 
-// Check returns nil if the volume is usable.
-func (v *AzureBlobVolume) Check() error {
-       ok, err := v.bsClient.ContainerExists(v.containerName)
+// Type implements Volume.
+func (v *AzureBlobVolume) Type() string {
+       return "Azure"
+}
+
+// Start implements Volume.
+func (v *AzureBlobVolume) Start() error {
+       if v.ContainerName == "" {
+               return errors.New("no container name given")
+       }
+       if v.StorageAccountName == "" || v.StorageAccountKeyFile == "" {
+               return errors.New("StorageAccountName and StorageAccountKeyFile must be given")
+       }
+       accountKey, err := readKeyFromFile(v.StorageAccountKeyFile)
+       if err != nil {
+               return err
+       }
+       v.azClient, err = storage.NewBasicClient(v.StorageAccountName, accountKey)
+       if err != nil {
+               return fmt.Errorf("creating Azure storage client: %s", err)
+       }
+       v.bsClient = v.azClient.GetBlobService()
+
+       ok, err := v.bsClient.ContainerExists(v.ContainerName)
        if err != nil {
                return err
        }
        if !ok {
-               return errors.New("container does not exist")
+               return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
        }
        return nil
 }
 
 // Return true if expires_at metadata attribute is found on the block
 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
-       metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+       metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
        if err != nil {
                return false, metadata, v.translateError(err)
        }
@@ -197,7 +208,7 @@ func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
        if azureMaxGetBytes < BlockSize {
                // Unfortunately the handler doesn't tell us how long the blob
                // is expected to be, so we have to ask Azure.
-               props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+               props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
                if err != nil {
                        return 0, v.translateError(err)
                }
@@ -228,9 +239,9 @@ func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
                        var rdr io.ReadCloser
                        var err error
                        if startPos == 0 && endPos == expectSize {
-                               rdr, err = v.bsClient.GetBlob(v.containerName, loc)
+                               rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
                        } else {
-                               rdr, err = v.bsClient.GetBlobRange(v.containerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
+                               rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
                        }
                        if err != nil {
                                errors[p] = err
@@ -268,7 +279,7 @@ func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
        if trashed {
                return os.ErrNotExist
        }
-       rdr, err := v.bsClient.GetBlob(v.containerName, loc)
+       rdr, err := v.bsClient.GetBlob(v.ContainerName, loc)
        if err != nil {
                return v.translateError(err)
        }
@@ -278,15 +289,15 @@ func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
 
 // Put stores a Keep block as a block blob in the container.
 func (v *AzureBlobVolume) Put(loc string, block []byte) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
-       return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block), nil)
+       return v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), bytes.NewReader(block), nil)
 }
 
 // Touch updates the last-modified property of a block blob.
 func (v *AzureBlobVolume) Touch(loc string) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
        trashed, metadata, err := v.checkTrashed(loc)
@@ -298,7 +309,7 @@ func (v *AzureBlobVolume) Touch(loc string) error {
        }
 
        metadata["touch"] = fmt.Sprintf("%d", time.Now())
-       return v.bsClient.SetBlobMetadata(v.containerName, loc, metadata, nil)
+       return v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
 }
 
 // Mtime returns the last-modified property of a block blob.
@@ -311,7 +322,7 @@ func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
                return time.Time{}, os.ErrNotExist
        }
 
-       props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+       props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
        if err != nil {
                return time.Time{}, err
        }
@@ -326,7 +337,7 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
                Include: "metadata",
        }
        for {
-               resp, err := v.bsClient.ListBlobs(v.containerName, params)
+               resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
                if err != nil {
                        return err
                }
@@ -361,7 +372,7 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
 
 // Trash a Keep block.
 func (v *AzureBlobVolume) Trash(loc string) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
 
@@ -370,26 +381,26 @@ func (v *AzureBlobVolume) Trash(loc string) error {
        // we get the Etag before checking Mtime, and use If-Match to
        // ensure we don't delete data if Put() or Touch() happens
        // between our calls to Mtime() and DeleteBlob().
-       props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+       props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
        if err != nil {
                return err
        }
        if t, err := v.Mtime(loc); err != nil {
                return err
-       } else if time.Since(t) < blobSignatureTTL {
+       } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
                return nil
        }
 
-       // If trashLifetime == 0, just delete it
-       if trashLifetime == 0 {
-               return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
+       // If TrashLifetime == 0, just delete it
+       if theConfig.TrashLifetime == 0 {
+               return v.bsClient.DeleteBlob(v.ContainerName, loc, map[string]string{
                        "If-Match": props.Etag,
                })
        }
 
        // Otherwise, mark as trash
-       return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
-               "expires_at": fmt.Sprintf("%d", time.Now().Add(trashLifetime).Unix()),
+       return v.bsClient.SetBlobMetadata(v.ContainerName, loc, map[string]string{
+               "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
        }, map[string]string{
                "If-Match": props.Etag,
        })
@@ -399,7 +410,7 @@ func (v *AzureBlobVolume) Trash(loc string) error {
 // Delete the expires_at metadata attribute
 func (v *AzureBlobVolume) Untrash(loc string) error {
        // if expires_at does not exist, return NotFoundError
-       metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+       metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
        if err != nil {
                return v.translateError(err)
        }
@@ -409,7 +420,7 @@ func (v *AzureBlobVolume) Untrash(loc string) error {
 
        // reset expires_at metadata attribute
        metadata["expires_at"] = ""
-       err = v.bsClient.SetBlobMetadata(v.containerName, loc, metadata, nil)
+       err = v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
        return v.translateError(err)
 }
 
@@ -424,19 +435,19 @@ func (v *AzureBlobVolume) Status() *VolumeStatus {
 
 // String returns a volume label, including the container name.
 func (v *AzureBlobVolume) String() string {
-       return fmt.Sprintf("azure-storage-container:%+q", v.containerName)
+       return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
 }
 
 // Writable returns true, unless the -readonly flag was on when the
 // volume was added.
 func (v *AzureBlobVolume) Writable() bool {
-       return !v.readonly
+       return !v.ReadOnly
 }
 
 // Replication returns the replication level of the container, as
 // specified by the -azure-storage-replication argument.
 func (v *AzureBlobVolume) Replication() int {
-       return v.replication
+       return v.AzureReplication
 }
 
 // If possible, translate an Azure SDK error to a recognizable error
@@ -459,7 +470,7 @@ func (v *AzureBlobVolume) isKeepBlock(s string) bool {
        return keepBlockRegexp.MatchString(s)
 }
 
-// EmptyTrash looks for trashed blocks that exceeded trashLifetime
+// EmptyTrash looks for trashed blocks that exceeded TrashLifetime
 // and deletes them from the volume.
 func (v *AzureBlobVolume) EmptyTrash() {
        var bytesDeleted, bytesInTrash int64
@@ -467,7 +478,7 @@ func (v *AzureBlobVolume) EmptyTrash() {
        params := storage.ListBlobsParameters{Include: "metadata"}
 
        for {
-               resp, err := v.bsClient.ListBlobs(v.containerName, params)
+               resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
                if err != nil {
                        log.Printf("EmptyTrash: ListBlobs: %v", err)
                        break
@@ -491,7 +502,7 @@ func (v *AzureBlobVolume) EmptyTrash() {
                                continue
                        }
 
-                       err = v.bsClient.DeleteBlob(v.containerName, b.Name, map[string]string{
+                       err = v.bsClient.DeleteBlob(v.ContainerName, b.Name, map[string]string{
                                "If-Match": b.Properties.Etag,
                        })
                        if err != nil {
index 5d556b3e8c40eb242addf53f4996c49eb396138f..c8c898fe2da3957e3efdf069c5370e146ac5d693 100644 (file)
@@ -365,7 +365,13 @@ func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableA
                }
        }
 
-       v := NewAzureBlobVolume(azClient, container, readonly, replication)
+       v := &AzureBlobVolume{
+               ContainerName:    container,
+               ReadOnly:         readonly,
+               AzureReplication: replication,
+               azClient:         azClient,
+               bsClient:         azClient.GetBlobService(),
+       }
 
        return &TestableAzureBlobVolume{
                AzureBlobVolume: v,
@@ -570,11 +576,11 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
 }
 
 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
-       v.azHandler.PutRaw(v.containerName, locator, data)
+       v.azHandler.PutRaw(v.ContainerName, locator, data)
 }
 
 func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
-       v.azHandler.TouchWithDate(v.containerName, locator, lastPut)
+       v.azHandler.TouchWithDate(v.ContainerName, locator, lastPut)
 }
 
 func (v *TestableAzureBlobVolume) Teardown() {
index 7b51b643cea54ba0b5643a2bf84c4ccfed9bcd5f..bce82377b5d0c2a0d56cb8fa5011c7944ad6a5ea 100644 (file)
@@ -12,12 +12,12 @@ type BufferPoolSuite struct{}
 // Initialize a default-sized buffer pool for the benefit of test
 // suites that don't run main().
 func init() {
-       bufs = newBufferPool(maxBuffers, BlockSize)
+       bufs = newBufferPool(theConfig.MaxBuffers, BlockSize)
 }
 
 // Restore sane default after bufferpool's own tests
 func (s *BufferPoolSuite) TearDownTest(c *C) {
-       bufs = newBufferPool(maxBuffers, BlockSize)
+       bufs = newBufferPool(theConfig.MaxBuffers, BlockSize)
 }
 
 func (s *BufferPoolSuite) TestBufferPoolBufSize(c *C) {
diff --git a/services/keepstore/config.go b/services/keepstore/config.go
new file mode 100644 (file)
index 0000000..9c318d1
--- /dev/null
@@ -0,0 +1,183 @@
+package main
+
+import (
+       "bytes"
+       "encoding/json"
+       "fmt"
+       "io/ioutil"
+       "log"
+       "strings"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// Config holds keepstore's configurable parameters: the address to
+// listen on, signing and trash policy, and the volumes to use.
+type Config struct {
+       Listen string
+
+       PIDFile string
+
+       MaxBuffers  int
+       MaxRequests int
+
+       BlobSignatureTTL    arvados.Duration
+       BlobSigningKeyFile  string
+       RequireSignatures   bool
+       SystemAuthTokenFile string
+       EnableDelete        bool
+       TrashLifetime       arvados.Duration
+       TrashCheckInterval  arvados.Duration
+
+       Volumes VolumeList
+
+       blobSigningKey  []byte
+       systemAuthToken string
+}
+
+var theConfig = DefaultConfig()
+
+// DefaultConfig returns the default configuration.
+func DefaultConfig() *Config {
+       return &Config{
+               Listen:             ":25107",
+               MaxBuffers:         128,
+               RequireSignatures:  true,
+               BlobSignatureTTL:   arvados.Duration(14 * 24 * time.Hour),
+               TrashLifetime:      arvados.Duration(14 * 24 * time.Hour),
+               TrashCheckInterval: arvados.Duration(24 * time.Hour),
+               Volumes:            []Volume{},
+       }
+}
+
+// Start should be called exactly once: after setting all public
+// fields, and before using the config.
+func (cfg *Config) Start() error {
+       if cfg.MaxBuffers < 1 {
+               return fmt.Errorf("MaxBuffers must be greater than zero")
+       }
+       bufs = newBufferPool(cfg.MaxBuffers, BlockSize)
+
+       if cfg.MaxRequests < 1 {
+               cfg.MaxRequests = cfg.MaxBuffers * 2
+               log.Printf("MaxRequests <1 or not specified; defaulting to MaxBuffers * 2 == %d", cfg.MaxRequests)
+       }
+
+       if cfg.BlobSigningKeyFile != "" {
+               buf, err := ioutil.ReadFile(cfg.BlobSigningKeyFile)
+               if err != nil {
+                       return fmt.Errorf("reading blob signing key file: %s", err)
+               }
+               cfg.blobSigningKey = bytes.TrimSpace(buf)
+               if len(cfg.blobSigningKey) == 0 {
+                       return fmt.Errorf("blob signing key file %q is empty", cfg.BlobSigningKeyFile)
+               }
+       } else if cfg.RequireSignatures {
+               return fmt.Errorf("cannot enable RequireSignatures (-enforce-permissions) without a blob signing key")
+       } else {
+               log.Println("Running without a blob signing key. Block locators " +
+                       "returned by this server will not be signed, and will be rejected " +
+                       "by a server that enforces permissions.")
+               log.Println("To fix this, use the BlobSigningKeyFile config entry.")
+       }
+
+       if fn := cfg.SystemAuthTokenFile; fn != "" {
+               buf, err := ioutil.ReadFile(fn)
+               if err != nil {
+                       return fmt.Errorf("cannot read system auth token file %q: %s", fn, err)
+               }
+               cfg.systemAuthToken = strings.TrimSpace(string(buf))
+       }
+
+       if cfg.EnableDelete {
+               log.Print("Trash/delete features are enabled. WARNING: this has not " +
+                       "been extensively tested. You should disable this unless you can afford to lose data.")
+       }
+
+       if len(cfg.Volumes) == 0 {
+               if (&unixVolumeAdder{cfg}).Discover() == 0 {
+                       return fmt.Errorf("no volumes found")
+               }
+       }
+       for _, v := range cfg.Volumes {
+               if err := v.Start(); err != nil {
+                       return fmt.Errorf("volume %s: %s", v, err)
+               }
+               log.Printf("Using volume %v (writable=%v)", v, v.Writable())
+       }
+       return nil
+}
+
+// VolumeTypes is built up by init() funcs in the source files that
+// define the volume types.
+var VolumeTypes = []func() VolumeWithExamples{}
+
+// VolumeList is a heterogeneous list of configured volumes,
+// unmarshalled from a JSON array of objects with "Type" fields.
+type VolumeList []Volume
+
+// UnmarshalJSON, given an array of objects, deserializes each object
+// as the volume type indicated by the object's Type field.
+func (vols *VolumeList) UnmarshalJSON(data []byte) error {
+       typeMap := map[string]func() VolumeWithExamples{}
+       for _, factory := range VolumeTypes {
+               t := factory().Type()
+               if _, ok := typeMap[t]; ok {
+                       log.Fatalf("volume type %+q is claimed by multiple VolumeTypes", t)
+               }
+               typeMap[t] = factory
+       }
+
+       var mapList []map[string]interface{}
+       err := json.Unmarshal(data, &mapList)
+       if err != nil {
+               return err
+       }
+       for _, mapIn := range mapList {
+               typeIn, ok := mapIn["Type"].(string)
+               if !ok {
+                       return fmt.Errorf("invalid volume type %+v", mapIn["Type"])
+               }
+               factory, ok := typeMap[typeIn]
+               if !ok {
+                       return fmt.Errorf("unsupported volume type %+q", typeIn)
+               }
+               data, err := json.Marshal(mapIn)
+               if err != nil {
+                       return err
+               }
+               vol := factory()
+               err = json.Unmarshal(data, vol)
+               if err != nil {
+                       return err
+               }
+               *vols = append(*vols, vol)
+       }
+       return nil
+}
+
+// MarshalJSON adds a "Type" field to each volume corresponding to its
+// Type().
+func (vl *VolumeList) MarshalJSON() ([]byte, error) {
+       data := []byte{'['}
+       for _, vs := range *vl {
+               j, err := json.Marshal(vs)
+               if err != nil {
+                       return nil, err
+               }
+               if len(data) > 1 {
+                       data = append(data, byte(','))
+               }
+               t, err := json.Marshal(vs.Type())
+               if err != nil {
+                       panic(err)
+               }
+               data = append(data, j[0])
+               data = append(data, []byte(`"Type":`)...)
+               data = append(data, t...)
+               data = append(data, byte(','))
+               data = append(data, j[1:]...)
+       }
+       return append(data, byte(']')), nil
+}
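
VolumeList.UnmarshalJSON above dispatches on each object's Type field: it unmarshals into generic maps first, picks the matching factory, re-marshals, and unmarshals into the concrete volume type. A self-contained sketch of the same two-pass trick, with a toy volume type standing in for keepstore's real ones (all names here are illustrative):

    package main

    import (
            "encoding/json"
            "fmt"
            "log"
    )

    type volume interface {
            Type() string
    }

    type azureVolume struct {
            ContainerName string
    }

    func (*azureVolume) Type() string { return "Azure" }

    var _ volume = (*azureVolume)(nil) // compile-time interface check

    func main() {
            raw := []byte(`[{"Type":"Azure","ContainerName":"example"}]`)

            // Pass 1: unmarshal into generic maps to read the Type field.
            var maps []map[string]interface{}
            if err := json.Unmarshal(raw, &maps); err != nil {
                    log.Fatal(err)
            }
            for _, m := range maps {
                    if m["Type"] != "Azure" {
                            log.Fatalf("unsupported volume type %+v", m["Type"])
                    }
                    // Pass 2: re-marshal, then unmarshal into the concrete type.
                    buf, err := json.Marshal(m)
                    if err != nil {
                            log.Fatal(err)
                    }
                    v := &azureVolume{}
                    if err := json.Unmarshal(buf, v); err != nil {
                            log.Fatal(err)
                    }
                    fmt.Printf("loaded %s volume %q\n", v.Type(), v.ContainerName)
            }
    }
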
diff --git a/services/keepstore/deprecated.go b/services/keepstore/deprecated.go
new file mode 100644 (file)
index 0000000..7caa6b5
--- /dev/null
@@ -0,0 +1,45 @@
+package main
+
+import (
+       "flag"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// deprecatedOptions holds values of command line flags that have
+// been superseded by config file entries.
+type deprecatedOptions struct {
+       flagSerializeIO     bool
+       flagReadonly        bool
+       neverDelete         bool
+       signatureTTLSeconds int
+}
+
+var deprecated = deprecatedOptions{
+       neverDelete:         !theConfig.EnableDelete,
+       signatureTTLSeconds: int(theConfig.BlobSignatureTTL.Duration() / time.Second),
+}
+
+func (depr *deprecatedOptions) beforeFlagParse(cfg *Config) {
+       flag.StringVar(&cfg.Listen, "listen", cfg.Listen, "see Listen configuration")
+       flag.IntVar(&cfg.MaxBuffers, "max-buffers", cfg.MaxBuffers, "see MaxBuffers configuration")
+       flag.IntVar(&cfg.MaxRequests, "max-requests", cfg.MaxRequests, "see MaxRequests configuration")
+       flag.BoolVar(&depr.neverDelete, "never-delete", depr.neverDelete, "see EnableDelete configuration")
+       flag.BoolVar(&cfg.RequireSignatures, "enforce-permissions", cfg.RequireSignatures, "see RequireSignatures configuration")
+       flag.StringVar(&cfg.BlobSigningKeyFile, "permission-key-file", cfg.BlobSigningKeyFile, "see BlobSigningKey`File` configuration")
+       flag.StringVar(&cfg.BlobSigningKeyFile, "blob-signing-key-file", cfg.BlobSigningKeyFile, "see BlobSigningKey`File` configuration")
+       flag.StringVar(&cfg.SystemAuthTokenFile, "data-manager-token-file", cfg.SystemAuthTokenFile, "see SystemAuthToken`File` configuration")
+       flag.IntVar(&depr.signatureTTLSeconds, "permission-ttl", depr.signatureTTLSeconds, "signature TTL in seconds; see BlobSignatureTTL configuration")
+       flag.IntVar(&depr.signatureTTLSeconds, "blob-signature-ttl", depr.signatureTTLSeconds, "signature TTL in seconds; see BlobSignatureTTL configuration")
+       flag.Var(&cfg.TrashLifetime, "trash-lifetime", "see TrashLifetime configuration")
+       flag.BoolVar(&depr.flagSerializeIO, "serialize", depr.flagSerializeIO, "serialize read and write operations on the following volumes.")
+       flag.BoolVar(&depr.flagReadonly, "readonly", depr.flagReadonly, "do not write, delete, or touch anything on the following volumes.")
+       flag.StringVar(&cfg.PIDFile, "pid", cfg.PIDFile, "see `PIDFile` configuration")
+       flag.Var(&cfg.TrashCheckInterval, "trash-check-interval", "see TrashCheckInterval configuration")
+}
+
+func (depr *deprecatedOptions) afterFlagParse(cfg *Config) {
+       cfg.BlobSignatureTTL = arvados.Duration(depr.signatureTTLSeconds) * arvados.Duration(time.Second)
+       cfg.EnableDelete = !depr.neverDelete
+}
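
Each deprecated flag now simply writes through to the new config struct, so old command lines keep working. For example (values illustrative, and assuming arvados.Duration serializes as a Go-style duration string), the invocation

    keepstore -listen :25107 -max-buffers 128 -never-delete=false -blob-signature-ttl 1209600

is equivalent to a config file containing

    Listen: :25107
    MaxBuffers: 128
    EnableDelete: true
    BlobSignatureTTL: 336h
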
index 7c17424ba568227790469e4e32867f33fea8ff4e..dc9bcb117f0508e748a97ff3cb2a736aa5c00178 100644 (file)
@@ -20,6 +20,8 @@ import (
        "strings"
        "testing"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
 // A RequestTester represents the parameters for an HTTP request to
@@ -52,13 +54,13 @@ func TestGetHandler(t *testing.T) {
 
        // Create locators for testing.
        // Turn on permission settings so we can generate signed locators.
-       enforcePermissions = true
-       PermissionSecret = []byte(knownKey)
-       blobSignatureTTL = 300 * time.Second
+       theConfig.RequireSignatures = true
+       theConfig.blobSigningKey = []byte(knownKey)
+       theConfig.BlobSignatureTTL.Set("5m")
 
        var (
                unsignedLocator  = "/" + TestHash
-               validTimestamp   = time.Now().Add(blobSignatureTTL)
+               validTimestamp   = time.Now().Add(theConfig.BlobSignatureTTL.Duration())
                expiredTimestamp = time.Now().Add(-time.Hour)
                signedLocator    = "/" + SignLocator(TestHash, knownToken, validTimestamp)
                expiredLocator   = "/" + SignLocator(TestHash, knownToken, expiredTimestamp)
@@ -66,7 +68,7 @@ func TestGetHandler(t *testing.T) {
 
        // -----------------
        // Test unauthenticated request with permissions off.
-       enforcePermissions = false
+       theConfig.RequireSignatures = false
 
        // Unauthenticated request, unsigned locator
        // => OK
@@ -90,7 +92,7 @@ func TestGetHandler(t *testing.T) {
 
        // ----------------
        // Permissions: on.
-       enforcePermissions = true
+       theConfig.RequireSignatures = true
 
        // Authenticated request, signed locator
        // => OK
@@ -175,8 +177,8 @@ func TestPutHandler(t *testing.T) {
        // ------------------
        // With a server key.
 
-       PermissionSecret = []byte(knownKey)
-       blobSignatureTTL = 300 * time.Second
+       theConfig.blobSigningKey = []byte(knownKey)
+       theConfig.BlobSignatureTTL.Set("5m")
 
        // When a permission key is available, the locator returned
        // from an authenticated PUT request will be signed.
@@ -220,7 +222,7 @@ func TestPutHandler(t *testing.T) {
 
 func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
        defer teardown()
-       dataManagerToken = "fake-data-manager-token"
+       theConfig.systemAuthToken = "fake-data-manager-token"
        vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
        vols[0].Readonly = true
        KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
@@ -232,15 +234,15 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
                        requestBody: TestBlock,
                })
        defer func(orig bool) {
-               neverDelete = orig
-       }(neverDelete)
-       neverDelete = false
+               theConfig.EnableDelete = orig
+       }(theConfig.EnableDelete)
+       theConfig.EnableDelete = true
        IssueRequest(
                &RequestTester{
                        method:      "DELETE",
                        uri:         "/" + TestHash,
                        requestBody: TestBlock,
-                       apiToken:    dataManagerToken,
+                       apiToken:    theConfig.systemAuthToken,
                })
        type expect struct {
                volnum    int
@@ -274,7 +276,7 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
 //   - authenticated   /index/prefix request | superuser
 //
 // The only /index requests that should succeed are those issued by the
-// superuser. They should pass regardless of the value of enforcePermissions.
+// superuser. They should pass regardless of the value of RequireSignatures.
 //
 func TestIndexHandler(t *testing.T) {
        defer teardown()
@@ -291,7 +293,7 @@ func TestIndexHandler(t *testing.T) {
        vols[0].Put(TestHash+".meta", []byte("metadata"))
        vols[1].Put(TestHash2+".meta", []byte("metadata"))
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
        unauthenticatedReq := &RequestTester{
                method: "GET",
@@ -305,7 +307,7 @@ func TestIndexHandler(t *testing.T) {
        superuserReq := &RequestTester{
                method:   "GET",
                uri:      "/index",
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
        unauthPrefixReq := &RequestTester{
                method: "GET",
@@ -319,32 +321,32 @@ func TestIndexHandler(t *testing.T) {
        superuserPrefixReq := &RequestTester{
                method:   "GET",
                uri:      "/index/" + TestHash[0:3],
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
        superuserNoSuchPrefixReq := &RequestTester{
                method:   "GET",
                uri:      "/index/abcd",
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
        superuserInvalidPrefixReq := &RequestTester{
                method:   "GET",
                uri:      "/index/xyz",
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
 
        // -------------------------------------------------------------
        // Only the superuser should be allowed to issue /index requests.
 
        // ---------------------------
-       // enforcePermissions enabled
+       // RequireSignatures enabled
        // This setting should not affect tests passing.
-       enforcePermissions = true
+       theConfig.RequireSignatures = true
 
        // unauthenticated /index request
        // => UnauthorizedError
        response := IssueRequest(unauthenticatedReq)
        ExpectStatusCode(t,
-               "enforcePermissions on, unauthenticated request",
+               "RequireSignatures on, unauthenticated request",
                UnauthorizedError.HTTPCode,
                response)
 
@@ -381,9 +383,9 @@ func TestIndexHandler(t *testing.T) {
                response)
 
        // ----------------------------
-       // enforcePermissions disabled
+       // RequireSignatures disabled
        // Valid Request should still pass.
-       enforcePermissions = false
+       theConfig.RequireSignatures = false
 
        // superuser /index request
        // => OK
@@ -477,15 +479,15 @@ func TestDeleteHandler(t *testing.T) {
        vols := KeepVM.AllWritable()
        vols[0].Put(TestHash, TestBlock)
 
-       // Explicitly set the blobSignatureTTL to 0 for these
+       // Explicitly set the BlobSignatureTTL to 0 for these
        // tests, to ensure the MockVolume deletes the blocks
        // even though they have just been created.
-       blobSignatureTTL = time.Duration(0)
+       theConfig.BlobSignatureTTL = arvados.Duration(0)
 
        var userToken = "NOT DATA MANAGER TOKEN"
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
-       neverDelete = false
+       theConfig.EnableDelete = true
 
        unauthReq := &RequestTester{
                method: "DELETE",
@@ -501,13 +503,13 @@ func TestDeleteHandler(t *testing.T) {
        superuserExistingBlockReq := &RequestTester{
                method:   "DELETE",
                uri:      "/" + TestHash,
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
 
        superuserNonexistentBlockReq := &RequestTester{
                method:   "DELETE",
                uri:      "/" + TestHash2,
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
 
        // Unauthenticated request returns PermissionError.
@@ -538,14 +540,14 @@ func TestDeleteHandler(t *testing.T) {
                http.StatusNotFound,
                response)
 
-       // Authenticated admin request for existing block while neverDelete is set.
-       neverDelete = true
+       // Authenticated admin request for existing block while EnableDelete is false.
+       theConfig.EnableDelete = false
        response = IssueRequest(superuserExistingBlockReq)
        ExpectStatusCode(t,
                "authenticated request, existing block, method disabled",
                MethodDisabledError.HTTPCode,
                response)
-       neverDelete = false
+       theConfig.EnableDelete = true
 
        // Authenticated admin request for existing block.
        response = IssueRequest(superuserExistingBlockReq)
@@ -568,10 +570,10 @@ func TestDeleteHandler(t *testing.T) {
                t.Error("superuserExistingBlockReq: block not deleted")
        }
 
-       // A DELETE request on a block newer than blobSignatureTTL
+       // A DELETE request on a block newer than BlobSignatureTTL
        // should return success but leave the block on the volume.
        vols[0].Put(TestHash, TestBlock)
-       blobSignatureTTL = time.Hour
+       theConfig.BlobSignatureTTL = arvados.Duration(time.Hour)
 
        response = IssueRequest(superuserExistingBlockReq)
        ExpectStatusCode(t,
@@ -623,7 +625,7 @@ func TestPullHandler(t *testing.T) {
        defer teardown()
 
        var userToken = "USER TOKEN"
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
        pullq = NewWorkQueue()
 
@@ -668,13 +670,13 @@ func TestPullHandler(t *testing.T) {
                },
                {
                        "Valid pull request from the data manager",
-                       RequestTester{"/pull", dataManagerToken, "PUT", goodJSON},
+                       RequestTester{"/pull", theConfig.systemAuthToken, "PUT", goodJSON},
                        http.StatusOK,
                        "Received 3 pull requests\n",
                },
                {
                        "Invalid pull request from the data manager",
-                       RequestTester{"/pull", dataManagerToken, "PUT", badJSON},
+                       RequestTester{"/pull", theConfig.systemAuthToken, "PUT", badJSON},
                        http.StatusBadRequest,
                        "",
                },
@@ -729,7 +731,7 @@ func TestTrashHandler(t *testing.T) {
        defer teardown()
 
        var userToken = "USER TOKEN"
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
        trashq = NewWorkQueue()
 
@@ -772,13 +774,13 @@ func TestTrashHandler(t *testing.T) {
                },
                {
                        "Valid trash list from the data manager",
-                       RequestTester{"/trash", dataManagerToken, "PUT", goodJSON},
+                       RequestTester{"/trash", theConfig.systemAuthToken, "PUT", goodJSON},
                        http.StatusOK,
                        "Received 3 trash requests\n",
                },
                {
                        "Invalid trash list from the data manager",
-                       RequestTester{"/trash", dataManagerToken, "PUT", badJSON},
+                       RequestTester{"/trash", theConfig.systemAuthToken, "PUT", badJSON},
                        http.StatusBadRequest,
                        "",
                },
@@ -873,7 +875,7 @@ func TestPutNeedsOnlyOneBuffer(t *testing.T) {
        select {
        case <-ok:
        case <-time.After(time.Second):
-               t.Fatal("PUT deadlocks with maxBuffers==1")
+               t.Fatal("PUT deadlocks with MaxBuffers==1")
        }
 }
 
@@ -888,7 +890,7 @@ func TestPutHandlerNoBufferleak(t *testing.T) {
 
        ok := make(chan bool)
        go func() {
-               for i := 0; i < maxBuffers+1; i++ {
+               for i := 0; i < theConfig.MaxBuffers+1; i++ {
                        // Unauthenticated request, no server key
                        // => OK (unsigned response)
                        unsignedLocator := "/" + TestHash
@@ -925,9 +927,9 @@ func (r *notifyingResponseRecorder) CloseNotify() <-chan bool {
 
 func TestGetHandlerClientDisconnect(t *testing.T) {
        defer func(was bool) {
-               enforcePermissions = was
-       }(enforcePermissions)
-       enforcePermissions = false
+               theConfig.RequireSignatures = was
+       }(theConfig.RequireSignatures)
+       theConfig.RequireSignatures = false
 
        defer func(orig *bufferPool) {
                bufs = orig
@@ -975,7 +977,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
 
 // Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
 // leak.
-func TestGetHandlerNoBufferleak(t *testing.T) {
+func TestGetHandlerNoBufferLeak(t *testing.T) {
        defer teardown()
 
        // Prepare two test Keep volumes. Our block is stored on the second volume.
@@ -989,7 +991,7 @@ func TestGetHandlerNoBufferleak(t *testing.T) {
 
        ok := make(chan bool)
        go func() {
-               for i := 0; i < maxBuffers+1; i++ {
+               for i := 0; i < theConfig.MaxBuffers+1; i++ {
                        // Unauthenticated request, unsigned locator
                        // => OK
                        unsignedLocator := "/" + TestHash
@@ -1040,7 +1042,7 @@ func TestUntrashHandler(t *testing.T) {
        vols := KeepVM.AllWritable()
        vols[0].Put(TestHash, TestBlock)
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
        // unauthenticatedReq => UnauthorizedError
        unauthenticatedReq := &RequestTester{
@@ -1070,7 +1072,7 @@ func TestUntrashHandler(t *testing.T) {
        datamanagerWithBadHashReq := &RequestTester{
                method:   "PUT",
                uri:      "/untrash/thisisnotalocator",
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
        response = IssueRequest(datamanagerWithBadHashReq)
        ExpectStatusCode(t,
@@ -1082,7 +1084,7 @@ func TestUntrashHandler(t *testing.T) {
        datamanagerWrongMethodReq := &RequestTester{
                method:   "GET",
                uri:      "/untrash/" + TestHash,
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
        response = IssueRequest(datamanagerWrongMethodReq)
        ExpectStatusCode(t,
@@ -1094,7 +1096,7 @@ func TestUntrashHandler(t *testing.T) {
        datamanagerReq := &RequestTester{
                method:   "PUT",
                uri:      "/untrash/" + TestHash,
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
        response = IssueRequest(datamanagerReq)
        ExpectStatusCode(t,
@@ -1119,13 +1121,13 @@ func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) {
        KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
        defer KeepVM.Close()
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
        // datamanagerReq => StatusOK
        datamanagerReq := &RequestTester{
                method:   "PUT",
                uri:      "/untrash/" + TestHash,
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
        response := IssueRequest(datamanagerReq)
        ExpectStatusCode(t,
index a6798a9f72bb6355ba8b5f6d9cb7d58f3ffe69e9..54b8b485e1dc99d491bd94d4b5b888b60b990b13 100644 (file)
@@ -71,7 +71,7 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
 
 // GetBlockHandler is a HandleFunc to address Get block requests.
 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
-       if enforcePermissions {
+       if theConfig.RequireSignatures {
                locator := req.URL.Path[1:] // strip leading slash
                if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
                        http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
@@ -185,8 +185,8 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
        // return it to the client.
        returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
        apiToken := GetAPIToken(req)
-       if PermissionSecret != nil && apiToken != "" {
-               expiry := time.Now().Add(blobSignatureTTL)
+       if theConfig.blobSigningKey != nil && apiToken != "" {
+               expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
                returnHash = SignLocator(returnHash, apiToken, expiry)
        }
        resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
@@ -196,7 +196,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
        // Reject unauthorized requests.
-       if !IsDataManagerToken(GetAPIToken(req)) {
+       if !IsSystemAuth(GetAPIToken(req)) {
                http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
                return
        }
@@ -334,7 +334,7 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       if neverDelete {
+       if !theConfig.EnableDelete {
                http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
                return
        }
@@ -419,7 +419,7 @@ type PullRequest struct {
 // PullHandler processes "PUT /pull" requests for the data manager.
 func PullHandler(resp http.ResponseWriter, req *http.Request) {
        // Reject unauthorized requests.
-       if !IsDataManagerToken(GetAPIToken(req)) {
+       if !IsSystemAuth(GetAPIToken(req)) {
                http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
                return
        }
@@ -455,7 +455,7 @@ type TrashRequest struct {
 // TrashHandler processes /trash requests.
 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
        // Reject unauthorized requests.
-       if !IsDataManagerToken(GetAPIToken(req)) {
+       if !IsSystemAuth(GetAPIToken(req)) {
                http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
                return
        }
@@ -485,7 +485,7 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
        // Reject unauthorized requests.
-       if !IsDataManagerToken(GetAPIToken(req)) {
+       if !IsSystemAuth(GetAPIToken(req)) {
                http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
                return
        }
@@ -746,7 +746,7 @@ func CanDelete(apiToken string) bool {
        }
        // Blocks may be deleted only when Keep has been configured with a
        // data manager.
-       if IsDataManagerToken(apiToken) {
+       if IsSystemAuth(apiToken) {
                return true
        }
        // TODO(twp): look up apiToken with the API server
@@ -755,8 +755,8 @@ func CanDelete(apiToken string) bool {
        return false
 }
 
-// IsDataManagerToken returns true if apiToken represents the data
-// manager's token.
-func IsDataManagerToken(apiToken string) bool {
-       return dataManagerToken != "" && apiToken == dataManagerToken
+// IsSystemAuth returns true if the given token is allowed to perform
+// system level actions like deleting data.
+func IsSystemAuth(token string) bool {
+       return token != "" && token == theConfig.systemAuthToken
 }
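
IsSystemAuth compares the presented token to the configured system token with plain string equality, which is what this commit ships and is reasonable when both values come from local configuration. Purely as a hedged aside, a timing-safe variant using the standard library's crypto/subtle would look like this (isSystemAuthCT is a hypothetical name, not part of the commit):

    // isSystemAuthCT is a sketch of a constant-time version of IsSystemAuth.
    // subtle.ConstantTimeCompare returns 1 only when the two byte slices
    // have equal length and contents, without short-circuiting.
    func isSystemAuthCT(token, secret string) bool {
            return token != "" &&
                    subtle.ConstantTimeCompare([]byte(token), []byte(secret)) == 1
    }
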
index 48b83de4b8aa2a40e62be953e236162d396ddae0..3fb86bc0f147182087cb845ebb4b250891507a27 100644 (file)
@@ -1,32 +1,23 @@
 package main
 
 import (
-       "bytes"
        "flag"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "io/ioutil"
        "log"
        "net"
        "net/http"
        "os"
        "os/signal"
-       "strings"
        "syscall"
        "time"
-)
 
-// ======================
-// Configuration settings
-//
-// TODO(twp): make all of these configurable via command line flags
-// and/or configuration file settings.
-
-// Default TCP address on which to listen for requests.
-// Initialized by the --listen flag.
-const DefaultAddr = ":25107"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/config"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "github.com/coreos/go-systemd/daemon"
+       "github.com/ghodss/yaml"
+)
 
 // A Keep "block" is 64MB.
 const BlockSize = 64 * 1024 * 1024
@@ -38,36 +29,6 @@ const MinFreeKilobytes = BlockSize / 1024
 // ProcMounts /proc/mounts
 var ProcMounts = "/proc/mounts"
 
-// enforcePermissions controls whether permission signatures
-// should be enforced (affecting GET and DELETE requests).
-// Initialized by the -enforce-permissions flag.
-var enforcePermissions bool
-
-// blobSignatureTTL is the time duration for which new permission
-// signatures (returned by PUT requests) will be valid.
-// Initialized by the -permission-ttl flag.
-var blobSignatureTTL time.Duration
-
-// dataManagerToken represents the API token used by the
-// Data Manager, and is required on certain privileged operations.
-// Initialized by the -data-manager-token-file flag.
-var dataManagerToken string
-
-// neverDelete can be used to prevent the DELETE handler from
-// actually deleting anything.
-var neverDelete = true
-
-// trashLifetime is the time duration after a block is trashed
-// during which it can be recovered using an /untrash request
-// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
-var trashLifetime time.Duration
-
-// trashCheckInterval is the time duration at which the emptyTrash goroutine
-// will check and delete expired trashed blocks. Default is one day.
-// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
-var trashCheckInterval time.Duration
-
-var maxBuffers = 128
 var bufs *bufferPool
 
 // KeepError types.
@@ -121,132 +82,50 @@ var KeepVM VolumeManager
 var pullq *WorkQueue
 var trashq *WorkQueue
 
-type volumeSet []Volume
-
-var (
-       flagSerializeIO bool
-       flagReadonly    bool
-       volumes         volumeSet
-)
-
-func (vs *volumeSet) String() string {
-       return fmt.Sprintf("%+v", (*vs)[:])
-}
-
-// TODO(twp): continue moving as much code as possible out of main
-// so it can be effectively tested. Esp. handling and postprocessing
-// of command line flags (identifying Keep volumes and initializing
-// permission arguments).
-
 func main() {
-       log.Println("keepstore starting, pid", os.Getpid())
-       defer log.Println("keepstore exiting, pid", os.Getpid())
+       deprecated.beforeFlagParse(theConfig)
 
-       var (
-               dataManagerTokenFile string
-               listen               string
-               blobSigningKeyFile   string
-               permissionTTLSec     int
-               pidfile              string
-               maxRequests          int
-       )
-       flag.StringVar(
-               &dataManagerTokenFile,
-               "data-manager-token-file",
-               "",
-               "File with the API token used by the Data Manager. All DELETE "+
-                       "requests or GET /index requests must carry this token.")
-       flag.BoolVar(
-               &enforcePermissions,
-               "enforce-permissions",
-               false,
-               "Enforce permission signatures on requests.")
-       flag.StringVar(
-               &listen,
-               "listen",
-               DefaultAddr,
-               "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
-       flag.IntVar(
-               &maxRequests,
-               "max-requests",
-               0,
-               "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)")
-       flag.BoolVar(
-               &neverDelete,
-               "never-delete",
-               true,
-               "If true, nothing will be deleted. "+
-                       "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+
-                       "You should leave this option alone unless you can afford to lose data.")
-       flag.StringVar(
-               &blobSigningKeyFile,
-               "permission-key-file",
-               "",
-               "Synonym for -blob-signing-key-file.")
-       flag.StringVar(
-               &blobSigningKeyFile,
-               "blob-signing-key-file",
-               "",
-               "File containing the secret key for generating and verifying "+
-                       "blob permission signatures.")
-       flag.IntVar(
-               &permissionTTLSec,
-               "permission-ttl",
-               0,
-               "Synonym for -blob-signature-ttl.")
-       flag.IntVar(
-               &permissionTTLSec,
-               "blob-signature-ttl",
-               2*7*24*3600,
-               "Lifetime of blob permission signatures in seconds. Modifying the ttl will invalidate all existing signatures. "+
-                       "See services/api/config/application.default.yml.")
-       flag.BoolVar(
-               &flagSerializeIO,
-               "serialize",
-               false,
-               "Serialize read and write operations on the following volumes.")
-       flag.BoolVar(
-               &flagReadonly,
-               "readonly",
-               false,
-               "Do not write, delete, or touch anything on the following volumes.")
-       flag.StringVar(
-               &pidfile,
-               "pid",
-               "",
-               "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
-       flag.IntVar(
-               &maxBuffers,
-               "max-buffers",
-               maxBuffers,
-               fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
-       flag.DurationVar(
-               &trashLifetime,
-               "trash-lifetime",
-               0,
-               "Time duration after a block is trashed during which it can be recovered using an /untrash request")
-       flag.DurationVar(
-               &trashCheckInterval,
-               "trash-check-interval",
-               24*time.Hour,
-               "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
+       dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
 
+       defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
+       var configPath string
+       flag.StringVar(
+               &configPath,
+               "config",
+               defaultConfigPath,
+               "YAML or JSON configuration file `path`")
+       flag.Usage = usage
        flag.Parse()
 
-       if maxBuffers < 0 {
-               log.Fatal("-max-buffers must be greater than zero.")
+       deprecated.afterFlagParse(theConfig)
+
+       err := config.LoadFile(theConfig, configPath)
+       if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
+               log.Fatal(err)
+       }
+
+       if *dumpConfig {
+               y, err := yaml.Marshal(theConfig)
+               if err != nil {
+                       log.Fatal(err)
+               }
+               os.Stdout.Write(y)
+               os.Exit(0)
        }
-       bufs = newBufferPool(maxBuffers, BlockSize)
 
-       if pidfile != "" {
+       err = theConfig.Start()
+       if err != nil {
+               log.Fatal(err)
+       }
+
+       if pidfile := theConfig.PIDFile; pidfile != "" {
                f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
                if err != nil {
                        log.Fatalf("open pidfile (%s): %s", pidfile, err)
                }
+               defer f.Close()
                err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
                if err != nil {
                        log.Fatalf("flock pidfile (%s): %s", pidfile, err)
                }
+               defer os.Remove(pidfile)
                err = f.Truncate(0)
                if err != nil {
                        log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
@@ -259,74 +138,22 @@ func main() {
                if err != nil {
                        log.Fatalf("sync pidfile (%s): %s", pidfile, err)
                }
-               defer f.Close()
-               defer os.Remove(pidfile)
-       }
-
-       if len(volumes) == 0 {
-               if (&unixVolumeAdder{&volumes}).Discover() == 0 {
-                       log.Fatal("No volumes found.")
-               }
-       }
-
-       for _, v := range volumes {
-               log.Printf("Using volume %v (writable=%v)", v, v.Writable())
        }
 
-       // Initialize data manager token and permission key.
-       // If these tokens are specified but cannot be read,
-       // raise a fatal error.
-       if dataManagerTokenFile != "" {
-               if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
-                       dataManagerToken = strings.TrimSpace(string(buf))
-               } else {
-                       log.Fatalf("reading data manager token: %s\n", err)
-               }
-       }
-
-       if neverDelete != true {
-               log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " +
-                       "been extensively tested. You should leave this option alone unless you can afford to lose data.")
-       }
-
-       if blobSigningKeyFile != "" {
-               if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
-                       PermissionSecret = bytes.TrimSpace(buf)
-               } else {
-                       log.Fatalf("reading permission key: %s\n", err)
-               }
-       }
-
-       blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
-
-       if PermissionSecret == nil {
-               if enforcePermissions {
-                       log.Fatal("-enforce-permissions requires a permission key")
-               } else {
-                       log.Println("Running without a PermissionSecret. Block locators " +
-                               "returned by this server will not be signed, and will be rejected " +
-                               "by a server that enforces permissions.")
-                       log.Println("To fix this, use the -blob-signing-key-file flag " +
-                               "to specify the file containing the permission key.")
-               }
-       }
-
-       if maxRequests <= 0 {
-               maxRequests = maxBuffers * 2
-               log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests)
-       }
+       log.Println("keepstore starting, pid", os.Getpid())
+       defer log.Println("keepstore exiting, pid", os.Getpid())
 
        // Start a round-robin VolumeManager with the volumes we have found.
-       KeepVM = MakeRRVolumeManager(volumes)
+       KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
-       // Middleware stack: logger, maxRequests limiter, method handlers
+       // Middleware stack: logger, MaxRequests limiter, method handlers
        http.Handle("/", &LoggingRESTRouter{
-               httpserver.NewRequestLimiter(maxRequests,
+               httpserver.NewRequestLimiter(theConfig.MaxRequests,
                        MakeRESTRouter()),
        })
 
        // Set up a TCP listener.
-       listener, err := net.Listen("tcp", listen)
+       listener, err := net.Listen("tcp", theConfig.Listen)
        if err != nil {
                log.Fatal(err)
        }
@@ -348,7 +175,7 @@ func main() {
 
        // Start emptyTrash goroutine
        doneEmptyingTrash := make(chan bool)
-       go emptyTrash(doneEmptyingTrash, trashCheckInterval)
+       go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
 
        // Shut down the server gracefully (by closing the listener)
        // if SIGTERM is received.
@@ -362,24 +189,27 @@ func main() {
        signal.Notify(term, syscall.SIGTERM)
        signal.Notify(term, syscall.SIGINT)
 
-       log.Println("listening at", listen)
-       srv := &http.Server{Addr: listen}
+       if _, err := daemon.SdNotify("READY=1"); err != nil {
+               log.Printf("Error notifying init daemon: %v", err)
+       }
+       log.Println("listening at", listener.Addr())
+       srv := &http.Server{}
        srv.Serve(listener)
 }
 
-// At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
-func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
-       ticker := time.NewTicker(trashCheckInterval)
+// Periodically (once per interval) invoke EmptyTrash on all volumes.
+func emptyTrash(done <-chan bool, interval time.Duration) {
+       ticker := time.NewTicker(interval)
 
        for {
                select {
                case <-ticker.C:
-                       for _, v := range volumes {
+                       for _, v := range theConfig.Volumes {
                                if v.Writable() {
                                        v.EmptyTrash()
                                }
                        }
-               case <-doneEmptyingTrash:
+               case <-done:
                        ticker.Stop()
                        return
                }
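
The rewritten emptyTrash goroutine follows the standard ticker-plus-done-channel shape. A minimal self-contained sketch of the same pattern (names here are illustrative, not from the commit):

    package main

    import (
            "fmt"
            "time"
    )

    // run ticks once per interval, performs one pass of work, and returns
    // when done is closed; work stands in for "EmptyTrash on each writable
    // volume" in the code above.
    func run(done <-chan struct{}, interval time.Duration, work func()) {
            ticker := time.NewTicker(interval)
            defer ticker.Stop()
            for {
                    select {
                    case <-ticker.C:
                            work()
                    case <-done:
                            return
                    }
            }
    }

    func main() {
            done := make(chan struct{})
            go run(done, 100*time.Millisecond, func() { fmt.Println("pass") })
            time.Sleep(350 * time.Millisecond)
            close(done)
    }
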
diff --git a/services/keepstore/keepstore.service b/services/keepstore/keepstore.service
new file mode 100644 (file)
index 0000000..b9e2793
--- /dev/null
@@ -0,0 +1,13 @@
+[Unit]
+Description=Arvados Keep Storage Daemon
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/keepstore/keepstore.yml
+
+[Service]
+Type=notify
+ExecStart=/usr/bin/keepstore
+Restart=always
+
+[Install]
+WantedBy=multi-user.target
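
The unit file asserts that /etc/arvados/keepstore/keepstore.yml exists, matching the new -config default above. The authoritative layout comes from running keepstore -dump-config; the following YAML is only an illustrative sketch built from field names visible in this commit, with made-up values:

    # Illustrative only; generate the real template with keepstore -dump-config.
    Listen: ":25107"
    MaxBuffers: 128
    RequireSignatures: true
    EnableDelete: false
    BlobSignatureTTL: 336h0m0s
    TrashLifetime: 24h0m0s
    TrashCheckInterval: 24h0m0s
    Volumes:
    - Type: S3
      Bucket: example-bucket-name
      Region: us-east-1
      AccessKeyFile: /etc/aws_s3_access_key.txt
      SecretKeyFile: /etc/aws_s3_secret_key.txt
      IndexPageSize: 1000
      S3Replication: 2
      RaceWindow: 24h0m0s
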
index c0adbc0bd74dad7d115dfe70a3374b2815704c56..dc6af0fa0d651c79cd6694a006fd3ad83ba2d677 100644 (file)
@@ -341,23 +341,23 @@ func TestDiscoverTmpfs(t *testing.T) {
        f.Close()
        ProcMounts = f.Name()
 
-       resultVols := volumeSet{}
-       added := (&unixVolumeAdder{&resultVols}).Discover()
+       cfg := &Config{}
+       added := (&unixVolumeAdder{cfg}).Discover()
 
-       if added != len(resultVols) {
+       if added != len(cfg.Volumes) {
                t.Errorf("Discover returned %d, but added %d volumes",
-                       added, len(resultVols))
+                       added, len(cfg.Volumes))
        }
        if added != len(tempVols) {
                t.Errorf("Discover returned %d but we set up %d volumes",
                        added, len(tempVols))
        }
        for i, tmpdir := range tempVols {
-               if tmpdir != resultVols[i].(*UnixVolume).root {
+               if tmpdir != cfg.Volumes[i].(*UnixVolume).Root {
                        t.Errorf("Discover returned %s, expected %s\n",
-                               resultVols[i].(*UnixVolume).root, tmpdir)
+                               cfg.Volumes[i].(*UnixVolume).Root, tmpdir)
                }
-               if expectReadonly := i%2 == 1; expectReadonly != resultVols[i].(*UnixVolume).readonly {
+               if expectReadonly := i%2 == 1; expectReadonly != cfg.Volumes[i].(*UnixVolume).ReadOnly {
                        t.Errorf("Discover added %s with readonly=%v, should be %v",
                                tmpdir, !expectReadonly, expectReadonly)
                }
@@ -381,10 +381,10 @@ func TestDiscoverNone(t *testing.T) {
        f.Close()
        ProcMounts = f.Name()
 
-       resultVols := volumeSet{}
-       added := (&unixVolumeAdder{&resultVols}).Discover()
-       if added != 0 || len(resultVols) != 0 {
-               t.Fatalf("got %d, %v; expected 0, []", added, resultVols)
+       cfg := &Config{}
+       added := (&unixVolumeAdder{cfg}).Discover()
+       if added != 0 || len(cfg.Volumes) != 0 {
+               t.Fatalf("got %d, %v; expected 0, []", added, cfg.Volumes)
        }
 }
 
@@ -442,8 +442,8 @@ func MakeTestVolumeManager(numVolumes int) VolumeManager {
 
 // teardown cleans up after each test.
 func teardown() {
-       dataManagerToken = ""
-       enforcePermissions = false
-       PermissionSecret = nil
+       theConfig.systemAuthToken = ""
+       theConfig.RequireSignatures = false
+       theConfig.blobSigningKey = nil
        KeepVM = nil
 }
index 9cd97bd3b746b1d66c0eba3b002fe5c9b8d70083..38445d982b438e3b744f09c74727d81d11304c84 100644 (file)
@@ -5,15 +5,10 @@ import (
        "time"
 )
 
-// The PermissionSecret is the secret key used to generate SHA1
-// digests for permission hints. apiserver and Keep must use the same
-// key.
-var PermissionSecret []byte
-
 // SignLocator takes a blobLocator, an apiToken and an expiry time, and
 // returns a signed locator string.
 func SignLocator(blobLocator, apiToken string, expiry time.Time) string {
-       return keepclient.SignLocator(blobLocator, apiToken, expiry, blobSignatureTTL, PermissionSecret)
+       return keepclient.SignLocator(blobLocator, apiToken, expiry, theConfig.BlobSignatureTTL.Duration(), theConfig.blobSigningKey)
 }
 
 // VerifySignature returns nil if the signature on the signedLocator
@@ -22,7 +17,7 @@ func SignLocator(blobLocator, apiToken string, expiry time.Time) string {
 // something the client could have figured out independently) or
 // PermissionError.
 func VerifySignature(signedLocator, apiToken string) error {
-       err := keepclient.VerifySignature(signedLocator, apiToken, blobSignatureTTL, PermissionSecret)
+       err := keepclient.VerifySignature(signedLocator, apiToken, theConfig.BlobSignatureTTL.Duration(), theConfig.blobSigningKey)
        if err == keepclient.ErrSignatureExpired {
                return ExpiredError
        } else if err != nil {
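
SignLocator and VerifySignature now read the key and TTL from theConfig but still delegate the actual math to sdk/go/keepclient. As a sketch of the general scheme only: the signature is an HMAC-SHA1 appended as a "+A&lt;sig&gt;@&lt;hextime&gt;" hint (that hint shape is confirmed by the test constants below); the ordering of bytes fed into the MAC here is an assumption for illustration, and the real layout lives in keepclient:

    package main

    import (
            "crypto/hmac"
            "crypto/sha1"
            "fmt"
            "time"
    )

    // signLocatorSketch illustrates the shape of a Keep permission hint.
    // The MAC input ordering is illustrative, not the wire format.
    func signLocatorSketch(locator, apiToken string, expiry time.Time, key []byte) string {
            hexTime := fmt.Sprintf("%08x", expiry.Unix())
            mac := hmac.New(sha1.New, key)
            fmt.Fprintf(mac, "%s@%s@%s", locator, apiToken, hexTime)
            return fmt.Sprintf("%s+A%x@%s", locator, mac.Sum(nil), hexTime)
    }

    func main() {
            fmt.Println(signLocatorSketch(
                    "acbd18db4cc2f85cedef654fccc4a4d8+3",
                    "sometoken", time.Now().Add(time.Hour), []byte("secret")))
    }
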
index 43717b23720d8c71b32c126810f8e39dd41a0429..8e47e4a4429c77c99df8b155f33dd6d56b801855 100644 (file)
@@ -4,6 +4,8 @@ import (
        "strconv"
        "testing"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
 const (
@@ -17,7 +19,7 @@ const (
                "gokee3eamvjy8qq1fvy238838enjmy5wzy2md7yvsitp5vztft6j4q866efym7e6" +
                "vu5wm9fpnwjyxfldw3vbo01mgjs75rgo7qioh8z8ij7jpyp8508okhgbbex3ceei" +
                "786u5rw2a9gx743dj3fgq2irk"
-       knownSignatureTTL  = 1209600 * time.Second
+       knownSignatureTTL  = arvados.Duration(24 * 14 * time.Hour)
        knownSignature     = "89118b78732c33104a4d6231e8b5a5fa1e4301e3"
        knownTimestamp     = "7fffffff"
        knownSigHint       = "+A" + knownSignature + "@" + knownTimestamp
@@ -26,8 +28,8 @@ const (
 
 func TestSignLocator(t *testing.T) {
        defer func(b []byte) {
-               PermissionSecret = b
-       }(PermissionSecret)
+               theConfig.blobSigningKey = b
+       }(theConfig.blobSigningKey)
 
        tsInt, err := strconv.ParseInt(knownTimestamp, 16, 0)
        if err != nil {
@@ -35,33 +37,33 @@ func TestSignLocator(t *testing.T) {
        }
        t0 := time.Unix(tsInt, 0)
 
-       blobSignatureTTL = knownSignatureTTL
+       theConfig.BlobSignatureTTL = knownSignatureTTL
 
-       PermissionSecret = []byte(knownKey)
+       theConfig.blobSigningKey = []byte(knownKey)
        if x := SignLocator(knownLocator, knownToken, t0); x != knownSignedLocator {
                t.Fatalf("Got %+q, expected %+q", x, knownSignedLocator)
        }
 
-       PermissionSecret = []byte("arbitrarykey")
+       theConfig.blobSigningKey = []byte("arbitrarykey")
        if x := SignLocator(knownLocator, knownToken, t0); x == knownSignedLocator {
-               t.Fatalf("Got same signature %+q, even though PermissionSecret changed", x)
+               t.Fatalf("Got same signature %+q, even though blobSigningKey changed", x)
        }
 }
 
 func TestVerifyLocator(t *testing.T) {
        defer func(b []byte) {
-               PermissionSecret = b
-       }(PermissionSecret)
+               theConfig.blobSigningKey = b
+       }(theConfig.blobSigningKey)
 
-       blobSignatureTTL = knownSignatureTTL
+       theConfig.BlobSignatureTTL = knownSignatureTTL
 
-       PermissionSecret = []byte(knownKey)
+       theConfig.blobSigningKey = []byte(knownKey)
        if err := VerifySignature(knownSignedLocator, knownToken); err != nil {
                t.Fatal(err)
        }
 
-       PermissionSecret = []byte("arbitrarykey")
+       theConfig.blobSigningKey = []byte("arbitrarykey")
        if err := VerifySignature(knownSignedLocator, knownToken); err == nil {
-               t.Fatal("Verified signature even with wrong PermissionSecret")
+               t.Fatal("Verified signature even with wrong blobSigningKey")
        }
 }
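
These tests mutate package-level state (theConfig.blobSigningKey, theConfig.BlobSignatureTTL) and undo it with a deferred closure that captured the old value as an argument. A generic sketch of that idiom (restoreBytes is a hypothetical helper, not in the commit):

    // restoreBytes captures the current value and returns a func that puts
    // it back; call it as: defer restoreBytes(&theConfig.blobSigningKey)()
    func restoreBytes(p *[]byte) func() {
            old := *p
            return func() { *p = old }
    }
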
index 4d85d5fd20cf6abe408035d294e3d85c5d011251..43a6de68443f693acb85696a0f5938836a34b2f7 100644 (file)
@@ -84,10 +84,10 @@ type PullWorkerTestData struct {
 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
        defer teardown()
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
                name:         "TestPullWorkerPullList_with_two_locators",
-               req:          RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
                responseCode: http.StatusOK,
                responseBody: "Received 2 pull requests\n",
                readContent:  "hello",
@@ -101,10 +101,10 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
        defer teardown()
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
                name:         "TestPullWorkerPullList_with_one_locator",
-               req:          RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
                responseCode: http.StatusOK,
                responseBody: "Received 1 pull requests\n",
                readContent:  "hola",
@@ -118,10 +118,10 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
        defer teardown()
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
                name:         "TestPullWorker_error_on_get_one_locator",
-               req:          RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
                responseCode: http.StatusOK,
                responseBody: "Received 1 pull requests\n",
                readContent:  "unused",
@@ -135,10 +135,10 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
        defer teardown()
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
                name:         "TestPullWorker_error_on_get_two_locators",
-               req:          RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
                responseCode: http.StatusOK,
                responseBody: "Received 2 pull requests\n",
                readContent:  "unused",
@@ -152,10 +152,10 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
        defer teardown()
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
                name:         "TestPullWorker_error_on_put_one_locator",
-               req:          RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
                responseCode: http.StatusOK,
                responseBody: "Received 1 pull requests\n",
                readContent:  "hello hello",
@@ -169,10 +169,10 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
        defer teardown()
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
                name:         "TestPullWorker_error_on_put_two_locators",
-               req:          RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
                responseCode: http.StatusOK,
                responseBody: "Received 2 pull requests\n",
                readContent:  "hello again",
@@ -195,10 +195,10 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_items_latest_repla
        pullq.ReplaceQueue(makeTestWorkList(firstInput))
        testPullLists["Added_before_actual_test_item"] = string(1)
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
                name:         "TestPullWorkerPullList_with_two_items_latest_replacing_old",
-               req:          RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
                responseCode: http.StatusOK,
                responseBody: "Received 1 pull requests\n",
                readContent:  "hola de nuevo",
@@ -210,14 +210,14 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_items_latest_repla
 }
 
 // In this case, the item will not be placed on pullq
-func (s *PullWorkerTestSuite) TestPullWorker_invalid_dataManagerToken(c *C) {
+func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
        defer teardown()
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
        testData := PullWorkerTestData{
                name:         "TestPullWorkerPullList_with_two_locators",
-               req:          RequestTester{"/pull", "invalid_dataManagerToken", "PUT", firstPullList},
+               req:          RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
                responseCode: http.StatusUnauthorized,
                responseBody: "Unauthorized\n",
                readContent:  "hello",
index 1a2a47b0df3b27d9c38ae7f4c8986a0f48f933a2..caed35b670e9484e9978e771e0e8c2aea2532e52 100644 (file)
@@ -11,8 +11,10 @@ import (
        "os"
        "regexp"
        "strings"
+       "sync"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/AdRoll/goamz/aws"
        "github.com/AdRoll/goamz/s3"
 )
@@ -39,7 +41,12 @@ const (
 )
 
 type s3VolumeAdder struct {
-       *volumeSet
+       *Config
+}
+
+// String implements flag.Value
+func (s *s3VolumeAdder) String() string {
+       return "-"
 }
 
 func (s *s3VolumeAdder) Set(bucketName string) error {
@@ -49,39 +56,21 @@ func (s *s3VolumeAdder) Set(bucketName string) error {
        if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
                return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
        }
-       region, ok := aws.Regions[s3RegionName]
-       if s3Endpoint == "" {
-               if !ok {
-                       return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", s3RegionName)
-               }
-       } else {
-               if ok {
-                       return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
-                               "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", s3RegionName, s3Endpoint)
-               }
-               region = aws.Region{
-                       Name:       s3RegionName,
-                       S3Endpoint: s3Endpoint,
-               }
-       }
-       var err error
-       var auth aws.Auth
-       auth.AccessKey, err = readKeyFromFile(s3AccessKeyFile)
-       if err != nil {
-               return err
-       }
-       auth.SecretKey, err = readKeyFromFile(s3SecretKeyFile)
-       if err != nil {
-               return err
-       }
-       if flagSerializeIO {
+       if deprecated.flagSerializeIO {
                log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
        }
-       v := NewS3Volume(auth, region, bucketName, s3RaceWindow, flagReadonly, s3Replication)
-       if err := v.Check(); err != nil {
-               return err
-       }
-       *s.volumeSet = append(*s.volumeSet, v)
+       s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
+               Bucket:        bucketName,
+               AccessKeyFile: s3AccessKeyFile,
+               SecretKeyFile: s3SecretKeyFile,
+               Endpoint:      s3Endpoint,
+               Region:        s3RegionName,
+               RaceWindow:    arvados.Duration(s3RaceWindow),
+               S3Replication: s3Replication,
+               UnsafeDelete:  s3UnsafeDelete,
+               ReadOnly:      deprecated.flagReadonly,
+               IndexPageSize: 1000,
+       })
        return nil
 }
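
s3VolumeAdder implements flag.Value (Set, plus the String method added above), so flag.Var can register it and each occurrence of -s3-bucket-volume appends another volume to the config. A minimal standalone sketch of the same pattern, with illustrative names:

    package main

    import (
            "flag"
            "fmt"
    )

    // repeatable collects one value per occurrence of its flag.
    type repeatable []string

    func (r *repeatable) String() string     { return fmt.Sprint(*r) }
    func (r *repeatable) Set(v string) error { *r = append(*r, v); return nil }

    func main() {
            var buckets repeatable
            flag.Var(&buckets, "s3-bucket-volume", "may be given multiple times")
            flag.Parse()
            fmt.Println(buckets)
    }
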
 
@@ -93,7 +82,9 @@ func s3regions() (okList []string) {
 }
 
 func init() {
-       flag.Var(&s3VolumeAdder{&volumes},
+       VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
+
+       flag.Var(&s3VolumeAdder{theConfig},
                "s3-bucket-volume",
                "Use the given bucket as a storage volume. Can be given multiple times.")
        flag.StringVar(
@@ -110,12 +101,12 @@ func init() {
                &s3AccessKeyFile,
                "s3-access-key-file",
                "",
-               "File containing the access key used for subsequent -s3-bucket-volume arguments.")
+               "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
        flag.StringVar(
                &s3SecretKeyFile,
                "s3-secret-key-file",
                "",
-               "File containing the secret key used for subsequent -s3-bucket-volume arguments.")
+               "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
        flag.DurationVar(
                &s3RaceWindow,
                "s3-race-window",
@@ -135,32 +126,87 @@ func init() {
 
 // S3Volume implements Volume using an S3 bucket.
 type S3Volume struct {
-       *s3.Bucket
-       raceWindow    time.Duration
-       readonly      bool
-       replication   int
-       indexPageSize int
-}
-
-// NewS3Volume returns a new S3Volume using the given auth, region,
-// and bucket name. The replication argument specifies the replication
-// level to report when writing data.
-func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, raceWindow time.Duration, readonly bool, replication int) *S3Volume {
-       return &S3Volume{
-               Bucket: &s3.Bucket{
-                       S3:   s3.New(auth, region),
-                       Name: bucket,
+       AccessKeyFile      string
+       SecretKeyFile      string
+       Endpoint           string
+       Region             string
+       Bucket             string
+       LocationConstraint bool
+       IndexPageSize      int
+       S3Replication      int
+       RaceWindow         arvados.Duration
+       ReadOnly           bool
+       UnsafeDelete       bool
+
+       bucket *s3.Bucket
+
+       startOnce sync.Once
+}
+
+// Examples implements VolumeWithExamples.
+func (*S3Volume) Examples() []Volume {
+       return []Volume{
+               &S3Volume{
+                       AccessKeyFile: "/etc/aws_s3_access_key.txt",
+                       SecretKeyFile: "/etc/aws_s3_secret_key.txt",
+                       Endpoint:      "",
+                       Region:        "us-east-1",
+                       Bucket:        "example-bucket-name",
+                       IndexPageSize: 1000,
+                       S3Replication: 2,
+                       RaceWindow:    arvados.Duration(24 * time.Hour),
+               },
+               &S3Volume{
+                       AccessKeyFile: "/etc/gce_s3_access_key.txt",
+                       SecretKeyFile: "/etc/gce_s3_secret_key.txt",
+                       Endpoint:      "https://storage.googleapis.com",
+                       Region:        "",
+                       Bucket:        "example-bucket-name",
+                       IndexPageSize: 1000,
+                       S3Replication: 2,
+                       RaceWindow:    arvados.Duration(24 * time.Hour),
                },
-               raceWindow:    raceWindow,
-               readonly:      readonly,
-               replication:   replication,
-               indexPageSize: 1000,
        }
 }
 
-// Check returns an error if the volume is inaccessible (e.g., config
-// error).
-func (v *S3Volume) Check() error {
+// Type implements Volume.
+func (*S3Volume) Type() string {
+       return "S3"
+}
+
+// Start populates private fields and verifies the configuration is
+// valid.
+func (v *S3Volume) Start() error {
+       region, ok := aws.Regions[v.Region]
+       if v.Endpoint == "" {
+               if !ok {
+                       return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
+               }
+       } else if ok {
+               return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
+                       "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
+       } else {
+               region = aws.Region{
+                       Name:                 v.Region,
+                       S3Endpoint:           v.Endpoint,
+                       S3LocationConstraint: v.LocationConstraint,
+               }
+       }
+
+       var err error
+       var auth aws.Auth
+       auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
+       if err != nil {
+               return err
+       }
+       auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
+       if err != nil {
+               return err
+       }
+       v.bucket = &s3.Bucket{
+               S3:   s3.New(auth, region),
+               Name: v.Bucket,
+       }
        return nil
 }
 
@@ -170,12 +216,12 @@ func (v *S3Volume) Check() error {
 // disappeared in a Trash race, getReader calls fixRace to recover the
 // data, and tries again.
 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
-       rdr, err = v.Bucket.GetReader(loc)
+       rdr, err = v.bucket.GetReader(loc)
        err = v.translateError(err)
        if err == nil || !os.IsNotExist(err) {
                return
        }
-       _, err = v.Bucket.Head("recent/"+loc, nil)
+       _, err = v.bucket.Head("recent/"+loc, nil)
        err = v.translateError(err)
        if err != nil {
                // If we can't read recent/X, there's no point in
@@ -186,7 +232,7 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
                err = os.ErrNotExist
                return
        }
-       rdr, err = v.Bucket.GetReader(loc)
+       rdr, err = v.bucket.GetReader(loc)
        if err != nil {
                log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
                err = v.translateError(err)
@@ -223,7 +269,7 @@ func (v *S3Volume) Compare(loc string, expect []byte) error {
 
 // Put writes a block.
 func (v *S3Volume) Put(loc string, block []byte) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
        var opts s3.Options
@@ -234,20 +280,20 @@ func (v *S3Volume) Put(loc string, block []byte) error {
                }
                opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
        }
-       err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
+       err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
        if err != nil {
                return v.translateError(err)
        }
-       err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+       err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
        return v.translateError(err)
 }
 
 // Touch sets the timestamp for the given locator to the current time.
 func (v *S3Volume) Touch(loc string) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
-       _, err := v.Bucket.Head(loc, nil)
+       _, err := v.bucket.Head(loc, nil)
        err = v.translateError(err)
        if os.IsNotExist(err) && v.fixRace(loc) {
                // The data object got trashed in a race, but fixRace
@@ -255,27 +301,27 @@ func (v *S3Volume) Touch(loc string) error {
        } else if err != nil {
                return err
        }
-       err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+       err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
        return v.translateError(err)
 }
 
 // Mtime returns the stored timestamp for the given locator.
 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
-       _, err := v.Bucket.Head(loc, nil)
+       _, err := v.bucket.Head(loc, nil)
        if err != nil {
                return zeroTime, v.translateError(err)
        }
-       resp, err := v.Bucket.Head("recent/"+loc, nil)
+       resp, err := v.bucket.Head("recent/"+loc, nil)
        err = v.translateError(err)
        if os.IsNotExist(err) {
                // The data object X exists, but recent/X is missing.
-               err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+               err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
                if err != nil {
                        log.Printf("error: creating %q: %s", "recent/"+loc, err)
                        return zeroTime, v.translateError(err)
                }
                log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
-               resp, err = v.Bucket.Head("recent/"+loc, nil)
+               resp, err = v.bucket.Head("recent/"+loc, nil)
                if err != nil {
                        log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
                        return zeroTime, v.translateError(err)
@@ -292,14 +338,14 @@ func (v *S3Volume) Mtime(loc string) (time.Time, error) {
 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
        // Use a merge sort to find matching sets of X and recent/X.
        dataL := s3Lister{
-               Bucket:   v.Bucket,
+               Bucket:   v.bucket,
                Prefix:   prefix,
-               PageSize: v.indexPageSize,
+               PageSize: v.IndexPageSize,
        }
        recentL := s3Lister{
-               Bucket:   v.Bucket,
+               Bucket:   v.bucket,
                Prefix:   "recent/" + prefix,
-               PageSize: v.indexPageSize,
+               PageSize: v.IndexPageSize,
        }
        for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
                if data.Key >= "g" {
@@ -346,19 +392,19 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
 
 // Trash a Keep block.
 func (v *S3Volume) Trash(loc string) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
        if t, err := v.Mtime(loc); err != nil {
                return err
-       } else if time.Since(t) < blobSignatureTTL {
+       } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
                return nil
        }
-       if trashLifetime == 0 {
+       if theConfig.TrashLifetime == 0 {
                if !s3UnsafeDelete {
                        return ErrS3TrashDisabled
                }
-               return v.Bucket.Del(loc)
+               return v.bucket.Del(loc)
        }
        err := v.checkRaceWindow(loc)
        if err != nil {
@@ -368,13 +414,13 @@ func (v *S3Volume) Trash(loc string) error {
        if err != nil {
                return err
        }
-       return v.translateError(v.Bucket.Del(loc))
+       return v.translateError(v.bucket.Del(loc))
 }
 
 // checkRaceWindow returns a non-nil error if trash/loc is, or might
 // be, in the race window (i.e., it's not safe to trash loc).
 func (v *S3Volume) checkRaceWindow(loc string) error {
-       resp, err := v.Bucket.Head("trash/"+loc, nil)
+       resp, err := v.bucket.Head("trash/"+loc, nil)
        err = v.translateError(err)
        if os.IsNotExist(err) {
                // OK, trash/X doesn't exist so we're not in the race
@@ -390,7 +436,7 @@ func (v *S3Volume) checkRaceWindow(loc string) error {
                // Can't parse timestamp
                return err
        }
-       safeWindow := t.Add(trashLifetime).Sub(time.Now().Add(v.raceWindow))
+       safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
        if safeWindow <= 0 {
                // We can't count on "touch trash/X" to prolong
                // trash/X's lifetime. The new timestamp might not
@@ -408,10 +454,10 @@ func (v *S3Volume) checkRaceWindow(loc string) error {
 // (PutCopy returns 200 OK if the request was received, even if the
 // copy failed).
 func (v *S3Volume) safeCopy(dst, src string) error {
-       resp, err := v.Bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
+       resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
                ContentType:       "application/octet-stream",
                MetadataDirective: "REPLACE",
-       }, v.Bucket.Name+"/"+src)
+       }, v.bucket.Name+"/"+src)
        err = v.translateError(err)
        if err != nil {
                return err
@@ -446,7 +492,7 @@ func (v *S3Volume) Untrash(loc string) error {
        if err != nil {
                return err
        }
-       err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+       err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
        return v.translateError(err)
 }
 
@@ -463,19 +509,19 @@ func (v *S3Volume) Status() *VolumeStatus {
 
 // String implements fmt.Stringer.
 func (v *S3Volume) String() string {
-       return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
+       return fmt.Sprintf("s3-bucket:%+q", v.bucket.Name)
 }
 
 // Writable returns false if all future Put, Mtime, and Delete calls
 // are expected to fail.
 func (v *S3Volume) Writable() bool {
-       return !v.readonly
+       return !v.ReadOnly
 }
 
 // Replication returns the storage redundancy of the underlying
 // device. Configured via the volume's S3Replication setting.
 func (v *S3Volume) Replication() int {
-       return v.replication
+       return v.S3Replication
 }
 
 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
@@ -489,7 +535,7 @@ func (v *S3Volume) isKeepBlock(s string) bool {
 // there was a race between Put and Trash, fixRace recovers from the
 // race by Untrashing the block.
 func (v *S3Volume) fixRace(loc string) bool {
-       trash, err := v.Bucket.Head("trash/"+loc, nil)
+       trash, err := v.bucket.Head("trash/"+loc, nil)
        if err != nil {
                if !os.IsNotExist(v.translateError(err)) {
                        log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
@@ -502,7 +548,7 @@ func (v *S3Volume) fixRace(loc string) bool {
                return false
        }
 
-       recent, err := v.Bucket.Head("recent/"+loc, nil)
+       recent, err := v.bucket.Head("recent/"+loc, nil)
        if err != nil {
                log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
                return false
@@ -514,13 +560,13 @@ func (v *S3Volume) fixRace(loc string) bool {
        }
 
        ageWhenTrashed := trashTime.Sub(recentTime)
-       if ageWhenTrashed >= blobSignatureTTL {
+       if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
                // No evidence of a race: block hasn't been written
                // since it became eligible for Trash. No fix needed.
                return false
        }
 
-       log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, blobSignatureTTL)
+       log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
        log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
        err = v.safeCopy(loc, "trash/"+loc)
        if err != nil {
@@ -545,16 +591,16 @@ func (v *S3Volume) translateError(err error) error {
        return err
 }
 
-// EmptyTrash looks for trashed blocks that exceeded trashLifetime
+// EmptyTrash looks for trashed blocks that exceeded TrashLifetime
 // and deletes them from the volume.
 func (v *S3Volume) EmptyTrash() {
        var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
 
        // Use a merge sort to find matching sets of trash/X and recent/X.
        trashL := s3Lister{
-               Bucket:   v.Bucket,
+               Bucket:   v.bucket,
                Prefix:   "trash/",
-               PageSize: v.indexPageSize,
+               PageSize: v.IndexPageSize,
        }
        // Define "ready to delete" as "...when EmptyTrash started".
        startT := time.Now()
@@ -571,7 +617,7 @@ func (v *S3Volume) EmptyTrash() {
                        log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
                        continue
                }
-               recent, err := v.Bucket.Head("recent/"+loc, nil)
+               recent, err := v.bucket.Head("recent/"+loc, nil)
                if err != nil && os.IsNotExist(v.translateError(err)) {
                        log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
                        err = v.Untrash(loc)
@@ -588,21 +634,21 @@ func (v *S3Volume) EmptyTrash() {
                        log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
                        continue
                }
-               if trashT.Sub(recentT) < blobSignatureTTL {
-                       if age := startT.Sub(recentT); age >= blobSignatureTTL-v.raceWindow {
+               if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
+                       if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
                                // recent/loc is too old to protect
                                // loc from being Trashed again during
                                // the raceWindow that starts if we
                                // delete trash/X now.
                                //
-                               // Note this means (trashCheckInterval
-                               // < blobSignatureTTL - raceWindow) is
+                               // Note this means (TrashCheckInterval
+                               // < BlobSignatureTTL - raceWindow) is
                                // necessary to avoid starvation.
                                log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
                                v.fixRace(loc)
                                v.Touch(loc)
                                continue
-                       } else if _, err := v.Bucket.Head(loc, nil); os.IsNotExist(err) {
+                       } else if _, err := v.bucket.Head(loc, nil); os.IsNotExist(err) {
                                log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
                                v.fixRace(loc)
                                continue
@@ -611,10 +657,10 @@ func (v *S3Volume) EmptyTrash() {
                                continue
                        }
                }
-               if startT.Sub(trashT) < trashLifetime {
+               if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
                        continue
                }
-               err = v.Bucket.Del(trash.Key)
+               err = v.bucket.Del(trash.Key)
                if err != nil {
                        log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
                        continue
@@ -622,9 +668,9 @@ func (v *S3Volume) EmptyTrash() {
                bytesDeleted += trash.Size
                blocksDeleted++
 
-               _, err = v.Bucket.Head(loc, nil)
+               _, err = v.bucket.Head(loc, nil)
                if os.IsNotExist(err) {
-                       err = v.Bucket.Del("recent/" + loc)
+                       err = v.bucket.Del("recent/" + loc)
                        if err != nil {
                                log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
                        }
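
Most of this file is the mechanical Bucket-to-bucket rename plus the switch to theConfig, but the checkRaceWindow change above merits a worked example: trash/X written at time t may be emptied any time after t+TrashLifetime, so it is only safe to count on re-trashing while that deadline is still beyond now+RaceWindow. A self-contained sketch of that arithmetic (values are made up):

    package main

    import (
            "fmt"
            "time"
    )

    // safeWindow mirrors checkRaceWindow's core computation: how much time
    // remains before trash/X could be deleted out from under us.
    func safeWindow(trashT, now time.Time, trashLifetime, raceWindow time.Duration) time.Duration {
            return trashT.Add(trashLifetime).Sub(now.Add(raceWindow))
    }

    func main() {
            now := time.Now()
            trashT := now.Add(-50 * time.Minute)
            // 1h lifetime, 5m race window: 10m to the deadline minus the
            // 5m race window leaves 5m of safety.
            fmt.Println(safeWindow(trashT, now, time.Hour, 5*time.Minute))
    }
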
index 6ba390426f51dbe4f8ef1d6ac2ce2a27b4d3f3b2..76dcbc9f9ea2f8fb680a25a31b84735f991b1b51 100644 (file)
@@ -4,23 +4,17 @@ import (
        "bytes"
        "crypto/md5"
        "fmt"
+       "io/ioutil"
        "log"
        "os"
        "time"
 
-       "github.com/AdRoll/goamz/aws"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/AdRoll/goamz/s3"
        "github.com/AdRoll/goamz/s3/s3test"
        check "gopkg.in/check.v1"
 )
 
-type TestableS3Volume struct {
-       *S3Volume
-       server      *s3test.Server
-       c           *check.C
-       serverClock *fakeClock
-}
-
 const (
        TestBucketName = "testbucket"
 )
@@ -42,30 +36,6 @@ func init() {
        s3UnsafeDelete = true
 }
 
-func NewTestableS3Volume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
-       clock := &fakeClock{}
-       srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
-       c.Assert(err, check.IsNil)
-       auth := aws.Auth{}
-       region := aws.Region{
-               Name:                 "test-region-1",
-               S3Endpoint:           srv.URL(),
-               S3LocationConstraint: true,
-       }
-       bucket := &s3.Bucket{
-               S3:   s3.New(auth, region),
-               Name: TestBucketName,
-       }
-       err = bucket.PutBucket(s3.ACL("private"))
-       c.Assert(err, check.IsNil)
-
-       return &TestableS3Volume{
-               S3Volume:    NewS3Volume(auth, region, TestBucketName, raceWindow, readonly, replication),
-               server:      srv,
-               serverClock: clock,
-       }
-}
-
 var _ = check.Suite(&StubbedS3Suite{})
 
 type StubbedS3Suite struct {
@@ -76,19 +46,19 @@ func (s *StubbedS3Suite) TestGeneric(c *check.C) {
        DoGenericVolumeTests(c, func(t TB) TestableVolume {
                // Use a negative raceWindow so s3test's 1-second
                // timestamp precision doesn't confuse fixRace.
-               return NewTestableS3Volume(c, -2*time.Second, false, 2)
+               return s.newTestableVolume(c, -2*time.Second, false, 2)
        })
 }
 
 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
        DoGenericVolumeTests(c, func(t TB) TestableVolume {
-               return NewTestableS3Volume(c, -2*time.Second, true, 2)
+               return s.newTestableVolume(c, -2*time.Second, true, 2)
        })
 }
 
 func (s *StubbedS3Suite) TestIndex(c *check.C) {
-       v := NewTestableS3Volume(c, 0, false, 2)
-       v.indexPageSize = 3
+       v := s.newTestableVolume(c, 0, false, 2)
+       v.IndexPageSize = 3
        for i := 0; i < 256; i++ {
                v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
        }
@@ -112,14 +82,14 @@ func (s *StubbedS3Suite) TestIndex(c *check.C) {
 }
 
 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
-       defer func(tl, bs time.Duration) {
-               trashLifetime = tl
-               blobSignatureTTL = bs
-       }(trashLifetime, blobSignatureTTL)
-       trashLifetime = time.Hour
-       blobSignatureTTL = time.Hour
+       defer func(tl, bs arvados.Duration) {
+               theConfig.TrashLifetime = tl
+               theConfig.BlobSignatureTTL = bs
+       }(theConfig.TrashLifetime, theConfig.BlobSignatureTTL)
+       theConfig.TrashLifetime.Set("1h")
+       theConfig.BlobSignatureTTL.Set("1h")
 
-       v := NewTestableS3Volume(c, 5*time.Minute, false, 2)
+       v := s.newTestableVolume(c, 5*time.Minute, false, 2)
        var none time.Time
 
        putS3Obj := func(t time.Time, key string, data []byte) {
@@ -127,7 +97,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
                        return
                }
                v.serverClock.now = &t
-               v.Bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
+               v.bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
        }
 
        t0 := time.Now()
@@ -214,12 +184,12 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
                        false, false, false, true, false, false,
                },
                {
-                       "Erroneously trashed during a race, detected before trashLifetime",
+                       "Erroneously trashed during a race, detected before TrashLifetime",
                        none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
                        true, false, true, true, true, false,
                },
                {
-                       "Erroneously trashed during a race, rescue during EmptyTrash despite reaching trashLifetime",
+                       "Erroneously trashed during a race, rescue during EmptyTrash despite reaching TrashLifetime",
                        none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
                        true, false, true, true, true, false,
                },
@@ -286,7 +256,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
                // freshAfterEmpty
                loc, blk = setupScenario()
                v.EmptyTrash()
-               _, err = v.Bucket.Head("trash/"+loc, nil)
+               _, err = v.bucket.Head("trash/"+loc, nil)
                c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
                if scenario.freshAfterEmpty {
                        t, err := v.Mtime(loc)
@@ -307,9 +277,51 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
        }
 }
 
+type TestableS3Volume struct {
+       *S3Volume
+       server      *s3test.Server
+       c           *check.C
+       serverClock *fakeClock
+}
+
+func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
+       clock := &fakeClock{}
+       srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
+       c.Assert(err, check.IsNil)
+
+       tmp, err := ioutil.TempFile("", "keepstore")
+       c.Assert(err, check.IsNil)
+       defer os.Remove(tmp.Name())
+       _, err = tmp.Write([]byte("xxx\n"))
+       c.Assert(err, check.IsNil)
+       c.Assert(tmp.Close(), check.IsNil)
+
+       v := &TestableS3Volume{
+               S3Volume: &S3Volume{
+                       Bucket:             TestBucketName,
+                       AccessKeyFile:      tmp.Name(),
+                       SecretKeyFile:      tmp.Name(),
+                       Endpoint:           srv.URL(),
+                       Region:             "test-region-1",
+                       LocationConstraint: true,
+                       RaceWindow:         arvados.Duration(raceWindow),
+                       S3Replication:      replication,
+                       UnsafeDelete:       s3UnsafeDelete,
+                       ReadOnly:           readonly,
+                       IndexPageSize:      1000,
+               },
+               server:      srv,
+               serverClock: clock,
+       }
+       c.Assert(v.Start(), check.IsNil)
+       err = v.bucket.PutBucket(s3.ACL("private"))
+       c.Assert(err, check.IsNil)
+       return v
+}
+
 // PutRaw skips the ContentMD5 test
 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
-       err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
+       err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
        if err != nil {
                log.Printf("PutRaw: %+v", err)
        }
@@ -320,7 +332,7 @@ func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
 // while we do this.
 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
        v.serverClock.now = &lastPut
-       err := v.Bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
+       err := v.bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
        if err != nil {
                panic(err)
        }
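
(Annotation, not part of the patch.) The tests above lean on arvados.Duration, a flag.Value-style wrapper: Set parses strings like "1h", Duration() converts back to time.Duration for arithmetic, and a plain conversion works in the other direction (as in RaceWindow: arvados.Duration(raceWindow)). A minimal sketch; the error return from Set is ignored here just as the tests ignore it.

    package main

    import (
        "fmt"
        "time"

        "git.curoverse.com/arvados.git/sdk/go/arvados"
    )

    func main() {
        var d arvados.Duration
        d.Set("1h")                   // parse, as theConfig.TrashLifetime.Set("1h") above
        fmt.Println(d.Duration() * 2) // 2h0m0s: convert back for arithmetic
        fmt.Println(arvados.Duration(5 * time.Minute)) // direct conversion from time.Duration
    }
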
index d11bc05192246a75e8ba4c95bd544b0712279ff6..27d6216d01633feca360de94f0a8febaabfb475a 100644 (file)
@@ -4,6 +4,8 @@ import (
        "errors"
        "log"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
 // RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
@@ -23,13 +25,13 @@ func RunTrashWorker(trashq *WorkQueue) {
 // TrashItem deletes the indicated block from every writable volume.
 func TrashItem(trashRequest TrashRequest) {
        reqMtime := time.Unix(0, trashRequest.BlockMtime)
-       if time.Since(reqMtime) < blobSignatureTTL {
+       if time.Since(reqMtime) < theConfig.BlobSignatureTTL.Duration() {
                log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
-                       time.Since(reqMtime),
+                       arvados.Duration(time.Since(reqMtime)),
                        trashRequest.Locator,
                        trashRequest.BlockMtime,
                        reqMtime,
-                       blobSignatureTTL)
+                       theConfig.BlobSignatureTTL)
                return
        }
 
@@ -44,8 +46,8 @@ func TrashItem(trashRequest TrashRequest) {
                        continue
                }
 
-               if neverDelete {
-                       err = errors.New("did not delete block because neverDelete is true")
+               if !theConfig.EnableDelete {
+                       err = errors.New("did not delete block because EnableDelete is false")
                } else {
                        err = volume.Trash(trashRequest.Locator)
                }
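
(Annotation, not part of the patch.) One unit worth underlining: BlockMtime on a trash request is Unix nanoseconds, which is why TrashItem reconstructs it with time.Unix(0, trashRequest.BlockMtime). A sketch of building such a request; the mirrored struct is illustrative, not the real TrashRequest definition.

    package main

    import (
        "fmt"
        "time"
    )

    // trashRequest mirrors keepstore's TrashRequest fields for illustration.
    type trashRequest struct {
        Locator    string
        BlockMtime int64 // Unix nanoseconds; TrashItem calls time.Unix(0, BlockMtime)
    }

    func main() {
        // Backdate past BlobSignatureTTL plus a margin, as performTrashWorkerTest does.
        ttl := time.Hour
        req := trashRequest{
            Locator:    "5d41402abc4b2a76b9719d911017c592",
            BlockMtime: time.Now().Add(-ttl - time.Minute).UnixNano(),
        }
        fmt.Printf("%+v\n", req)
    }
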
index 94798d95acfd85216ad60982b71282d84530ef7d..5ec413d1bde899d606a6792f40ffd3afe65f3615 100644 (file)
@@ -31,7 +31,7 @@ type TrashWorkerTestData struct {
    Expect no errors.
 */
 func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
-       neverDelete = false
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
                Locator1: "5d41402abc4b2a76b9719d911017c592",
                Block1:   []byte("hello"),
@@ -53,7 +53,7 @@ func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
    Expect the second locator in volume 2 to be unaffected.
 */
 func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
-       neverDelete = false
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
                Locator1: TestHash,
                Block1:   TestBlock,
@@ -75,7 +75,7 @@ func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
    Expect the first locator in volume 1 to be unaffected.
 */
 func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
-       neverDelete = false
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
                Locator1: TestHash,
                Block1:   TestBlock,
@@ -97,7 +97,7 @@ func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
    Expect locator to be deleted from both volumes.
 */
 func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
-       neverDelete = false
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
                Locator1: TestHash,
                Block1:   TestBlock,
@@ -119,7 +119,7 @@ func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
    Delete the second and expect the first to be still around.
 */
 func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *testing.T) {
-       neverDelete = false
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
                Locator1: TestHash,
                Block1:   TestBlock,
@@ -143,7 +143,7 @@ func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *test
    Expect the other unaffected.
 */
 func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
-       neverDelete = false
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
                Locator1: TestHash,
                Block1:   TestBlock,
@@ -166,7 +166,7 @@ func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
    will not be deleted because its Mtime is within the trash life time.
 */
 func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(t *testing.T) {
-       neverDelete = false
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
                Locator1: TestHash,
                Block1:   TestBlock,
@@ -188,11 +188,11 @@ func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(
        performTrashWorkerTest(testData, t)
 }
 
-/* Delete a block with matching mtime for locator in both volumes, but neverDelete is true,
+/* Delete a block with matching mtime for locator in both volumes, but EnableDelete is false,
    so block won't be deleted.
 */
-func TestTrashWorkerIntegration_NeverDelete(t *testing.T) {
-       neverDelete = true
+func TestTrashWorkerIntegration_DisabledDelete(t *testing.T) {
+       theConfig.EnableDelete = false
        testData := TrashWorkerTestData{
                Locator1: TestHash,
                Block1:   TestBlock,
@@ -231,7 +231,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
                }
        }
 
-       oldBlockTime := time.Now().Add(-blobSignatureTTL - time.Minute)
+       oldBlockTime := time.Now().Add(-theConfig.BlobSignatureTTL.Duration() - time.Minute)
 
        // Create TrashRequest for the test
        trashRequest := TrashRequest{
diff --git a/services/keepstore/usage.go b/services/keepstore/usage.go
new file mode 100644 (file)
index 0000000..29f89f5
--- /dev/null
@@ -0,0 +1,124 @@
+package main
+
+import (
+       "flag"
+       "fmt"
+       "os"
+       "sort"
+       "strings"
+
+       "github.com/ghodss/yaml"
+)
+
+func usage() {
+       c := DefaultConfig()
+       knownTypes := []string{}
+       for _, vt := range VolumeTypes {
+               c.Volumes = append(c.Volumes, vt().Examples()...)
+               knownTypes = append(knownTypes, vt().Type())
+       }
+       exampleConfigFile, err := yaml.Marshal(c)
+       if err != nil {
+               panic(err)
+       }
+       sort.Strings(knownTypes)
+       knownTypeList := strings.Join(knownTypes, ", ")
+       fmt.Fprintf(os.Stderr, `
+
+keepstore provides a content-addressed data store backed by a local filesystem or networked storage.
+
+Usage: keepstore -config path/to/keepstore.yml
+       keepstore [OPTIONS] -dump-config
+
+NOTE: All options (other than -config) are deprecated in favor of YAML
+      configuration. Use -dump-config to translate existing
+      configurations to YAML format.
+
+Options:
+`)
+       flag.PrintDefaults()
+       fmt.Fprintf(os.Stderr, `
+Example config file:
+
+%s
+
+Listen:
+
+    Local port to listen on. Can be "address:port" or ":port", where
+    "address" is a host IP address or name and "port" is a port number
+    or name.
+
+PIDFile:
+
+    Path to write PID file during startup. This file is kept open and
+    locked with LOCK_EX until keepstore exits, so "fuser -k pidfile" is
+    one way to shut down. Keepstore exits immediately if there is an
+    error opening, locking, or writing the PID file.
+
+MaxBuffers:
+
+    Maximum RAM to use for data buffers, given in multiples of block
+    size (64 MiB). When this limit is reached, HTTP requests requiring
+    buffers (like GET and PUT) will wait for buffer space to be
+    released.
+
+MaxRequests:
+
+    Maximum concurrent requests. When this limit is reached, new
+    requests will receive 503 responses. Note: this limit does not
+    include idle connections from clients using HTTP keepalive, so it
+    does not strictly limit the number of concurrent connections. If
+    omitted or zero, the default is 2 * MaxBuffers.
+
+BlobSigningKeyFile:
+
+    Local file containing the secret blob signing key (used to
+    generate and verify blob signatures).  This key should be
+    identical to the API server's blob_signing_key configuration
+    entry.
+
+RequireSignatures:
+
+    Honor read requests only if a valid signature is provided.  This
+    should be true, except for development use and when migrating from
+    a very old version.
+
+BlobSignatureTTL:
+
+    Duration for which new permission signatures (returned in PUT
+    responses) will be valid.  This should be equal to the API
+    server's blob_signature_ttl configuration entry.
+
+SystemAuthTokenFile:
+
+    Local file containing the Arvados API token used by keep-balance
+    or data manager.  Delete, trash, and index requests are honored
+    only for this token.
+
+EnableDelete:
+
+    Enable trash and delete features. If false, trash lists will be
+    accepted but blocks will not be trashed or deleted.
+
+TrashLifetime:
+
+    Time duration after a block is trashed during which it can be
+    recovered using an /untrash request.
+
+TrashCheckInterval:
+
+    How often to check for (and delete) trashed blocks whose
+    TrashLifetime has expired.
+
+Volumes:
+
+    List of storage volumes. If omitted or empty, the default is to
+    use all directories named "keep" that exist in the top level
+    directory of a mount point at startup time.
+
+    Volume types: %s
+
+    (See volume configuration examples above.)
+
+`, exampleConfigFile, knownTypeList)
+}
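
(Annotation, not part of the patch.) The options documented above are plain YAML keys; a sketch of parsing them with github.com/ghodss/yaml, the same library usage() uses to render the example config. The trimmed struct below is illustrative only; the real Config type is defined elsewhere in keepstore, and TrashLifetime is really an arvados.Duration rather than a string.

    package main

    import (
        "fmt"
        "log"

        "github.com/ghodss/yaml"
    )

    // exampleConfig holds a few of the documented keys, named after the help
    // text above rather than the real Config definition.
    type exampleConfig struct {
        Listen            string
        MaxBuffers        int
        RequireSignatures bool
        EnableDelete      bool
        TrashLifetime     string
    }

    func main() {
        buf := []byte("Listen: \":25107\"\n" +
            "MaxBuffers: 128\n" +
            "RequireSignatures: true\n" +
            "EnableDelete: true\n" +
            "TrashLifetime: 336h\n")
        var c exampleConfig
        if err := yaml.Unmarshal(buf, &c); err != nil {
            log.Fatal(err)
        }
        fmt.Printf("%+v\n", c)
    }
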
index 8ae6660fd477fa90365a019c837a121a08cc9595..6e01e75b879b339232603d38f93cb040ecc6d86c 100644 (file)
@@ -10,6 +10,15 @@ import (
 // for example, a single mounted disk, a RAID array, an Amazon S3 volume,
 // etc.
 type Volume interface {
+       // Volume type as specified in config file. Examples: "S3",
+       // "Directory".
+       Type() string
+
+       // Do whatever private setup tasks and configuration checks
+       // are needed. Return non-nil if the volume is unusable (e.g.,
+       // invalid config).
+       Start() error
+
        // Get a block: copy the block data into buf, and return the
        // number of bytes copied.
        //
@@ -150,7 +159,7 @@ type Volume interface {
        // loc is as described in Get.
        //
        // If the timestamp for the given locator is newer than
-       // blobSignatureTTL, Trash must not trash the data.
+       // BlobSignatureTTL, Trash must not trash the data.
        //
        // If a Trash operation overlaps with any Touch or Put
        // operations on the same locator, the implementation must
@@ -171,7 +180,7 @@ type Volume interface {
        // reliably or fail outright.
        //
        // Corollary: A successful Touch or Put guarantees a block
-       // will not be trashed for at least blobSignatureTTL
+       // will not be trashed for at least BlobSignatureTTL
        // seconds.
        Trash(loc string) error
 
@@ -204,11 +213,18 @@ type Volume interface {
        // responses to PUT requests.
        Replication() int
 
-       // EmptyTrash looks for trashed blocks that exceeded trashLifetime
+       // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
        // and deletes them from the volume.
        EmptyTrash()
 }
 
+// A VolumeWithExamples provides example configs to display in the
+// -help message.
+type VolumeWithExamples interface {
+       Volume
+       Examples() []Volume
+}
+
 // A VolumeManager tells callers which volumes can read, which volumes
 // can write, and on which volume the next write should be attempted.
 type VolumeManager interface {
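
(Annotation, not part of the patch.) A minimal hypothetical volume showing the contract of the two methods this file adds to the interface; a real implementation must provide the rest of Volume as well.

    package main

    import "fmt"

    // stubVolume illustrates only the new Type/Start methods.
    type stubVolume struct {
        Root string
    }

    // Type returns the name used for this volume type in the config file.
    func (*stubVolume) Type() string { return "Stub" }

    // Start runs setup and config checks; a non-nil error marks the volume
    // unusable.
    func (v *stubVolume) Start() error {
        if v.Root == "" {
            return fmt.Errorf("stub volume: Root must not be empty")
        }
        return nil
    }

    func main() {
        v := &stubVolume{Root: "/mnt/stub"}
        fmt.Println(v.Type(), v.Start()) // Stub <nil>
    }
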
index bc3e537a89a815037102af7fb920e8b9d2b84f61..1738fe9b513bb4d86482ceede86a04539d29d418 100644 (file)
@@ -11,6 +11,7 @@ import (
        "strings"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
 )
 
@@ -430,7 +431,7 @@ func testIndexTo(t TB, factory TestableVolumeFactory) {
 func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
-       blobSignatureTTL = 300 * time.Second
+       theConfig.BlobSignatureTTL.Set("5m")
 
        if v.Writable() == false {
                return
@@ -451,19 +452,19 @@ func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
 }
 
 // Calling Delete() for a block with a timestamp older than
-// blobSignatureTTL seconds in the past should delete the data.
+// BlobSignatureTTL seconds in the past should delete the data.
 // Test is intended for only writable volumes
 func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
-       blobSignatureTTL = 300 * time.Second
+       theConfig.BlobSignatureTTL.Set("5m")
 
        if v.Writable() == false {
                return
        }
 
        v.Put(TestHash, TestBlock)
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
        if err := v.Trash(TestHash); err != nil {
                t.Error(err)
@@ -733,7 +734,7 @@ func testPutFullBlock(t TB, factory TestableVolumeFactory) {
        }
 }
 
-// With trashLifetime != 0, perform:
+// With TrashLifetime != 0, perform:
 // Trash an old block - which either raises ErrNotImplemented or succeeds
 // Untrash -  which either raises ErrNotImplemented or succeeds
 // Get - which must succeed
@@ -741,14 +742,14 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
        defer func() {
-               trashLifetime = 0
+               theConfig.TrashLifetime = 0
        }()
 
-       trashLifetime = 3600 * time.Second
+       theConfig.TrashLifetime.Set("1h")
 
        // put block and backdate it
        v.PutRaw(TestHash, TestBlock)
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
        buf := make([]byte, BlockSize)
        n, err := v.Get(TestHash, buf)
@@ -795,9 +796,9 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
 func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
-       defer func(orig time.Duration) {
-               trashLifetime = orig
-       }(trashLifetime)
+       defer func(orig arvados.Duration) {
+               theConfig.TrashLifetime = orig
+       }(theConfig.TrashLifetime)
 
        checkGet := func() error {
                buf := make([]byte, BlockSize)
@@ -830,10 +831,10 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 
        // First set: EmptyTrash before reaching the trash deadline.
 
-       trashLifetime = time.Hour
+       theConfig.TrashLifetime.Set("1h")
 
        v.PutRaw(TestHash, TestBlock)
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
        err := checkGet()
        if err != nil {
@@ -844,7 +845,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        err = v.Trash(TestHash)
        if err == MethodDisabledError || err == ErrNotImplemented {
                // Skip the trash tests for read-only volumes, and
-               // volume types that don't support trashLifetime>0.
+               // volume types that don't support TrashLifetime>0.
                return
        }
 
@@ -878,7 +879,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        }
 
        // Because we Touch'ed, need to backdate again for next set of tests
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
        // If the only block in the trash has already been untrashed,
        // most volumes will fail a subsequent Untrash with a 404, but
@@ -896,11 +897,11 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        }
 
        // Untrash might have updated the timestamp, so backdate again
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
        // Second set: EmptyTrash after the trash deadline has passed.
 
-       trashLifetime = time.Nanosecond
+       theConfig.TrashLifetime.Set("1ns")
 
        err = v.Trash(TestHash)
        if err != nil {
@@ -925,7 +926,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        // Trash it again, and this time call EmptyTrash so it really
        // goes away.
        // (In Azure volumes, un/trash changes Mtime, so first backdate again)
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
        err = v.Trash(TestHash)
        err = checkGet()
        if err == nil || !os.IsNotExist(err) {
@@ -950,9 +951,9 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        // un-trashed copy doesn't get deleted along with it.
 
        v.PutRaw(TestHash, TestBlock)
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
-       trashLifetime = time.Nanosecond
+       theConfig.TrashLifetime.Set("1ns")
        err = v.Trash(TestHash)
        if err != nil {
                t.Fatal(err)
@@ -963,7 +964,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        }
 
        v.PutRaw(TestHash, TestBlock)
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
        // EmptyTrash should not delete the untrashed copy.
        v.EmptyTrash()
@@ -978,18 +979,18 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        // untrash the block whose deadline is "C".
 
        v.PutRaw(TestHash, TestBlock)
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
-       trashLifetime = time.Nanosecond
+       theConfig.TrashLifetime.Set("1ns")
        err = v.Trash(TestHash)
        if err != nil {
                t.Fatal(err)
        }
 
        v.PutRaw(TestHash, TestBlock)
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
-       trashLifetime = time.Hour
+       theConfig.TrashLifetime.Set("1h")
        err = v.Trash(TestHash)
        if err != nil {
                t.Fatal(err)
index 5671b8d4a9fd7405f8ca7fd35a18fdf10289a059..6ab386aec4fcc7774af90c4fa5ca879258ac9404 100644 (file)
@@ -189,7 +189,7 @@ func (v *MockVolume) Trash(loc string) error {
                return MethodDisabledError
        }
        if _, ok := v.Store[loc]; ok {
-               if time.Since(v.Timestamps[loc]) < blobSignatureTTL {
+               if time.Since(v.Timestamps[loc]) < time.Duration(theConfig.BlobSignatureTTL) {
                        return nil
                }
                delete(v.Store, loc)
@@ -198,7 +198,14 @@ func (v *MockVolume) Trash(loc string) error {
        return os.ErrNotExist
 }
 
-// TBD
+func (v *MockVolume) Type() string {
+       return "Mock"
+}
+
+func (v *MockVolume) Start() error {
+       return nil
+}
+
 func (v *MockVolume) Untrash(loc string) error {
        return nil
 }
index 5982fb0484eae0a37ef09e24b9526492c5b0459f..b5753dec04638927162a328d2a43f2fd4e567a50 100644 (file)
@@ -2,7 +2,6 @@ package main
 
 import (
        "bufio"
-       "errors"
        "flag"
        "fmt"
        "io"
@@ -19,11 +18,16 @@ import (
 )
 
 type unixVolumeAdder struct {
-       *volumeSet
+       *Config
 }
 
-func (vs *unixVolumeAdder) Set(value string) error {
-       if dirs := strings.Split(value, ","); len(dirs) > 1 {
+// String implements flag.Value.
+func (vs *unixVolumeAdder) String() string {
+       return "-"
+}
+
+func (vs *unixVolumeAdder) Set(path string) error {
+       if dirs := strings.Split(path, ","); len(dirs) > 1 {
                log.Print("DEPRECATED: using comma-separated volume list.")
                for _, dir := range dirs {
                        if err := vs.Set(dir); err != nil {
@@ -32,33 +36,19 @@ func (vs *unixVolumeAdder) Set(value string) error {
                }
                return nil
        }
-       if len(value) == 0 || value[0] != '/' {
-               return errors.New("Invalid volume: must begin with '/'.")
-       }
-       if _, err := os.Stat(value); err != nil {
-               return err
-       }
-       var locker sync.Locker
-       if flagSerializeIO {
-               locker = &sync.Mutex{}
-       }
-       *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
-               root:     value,
-               locker:   locker,
-               readonly: flagReadonly,
+       vs.Config.Volumes = append(vs.Config.Volumes, &UnixVolume{
+               Root:      path,
+               ReadOnly:  deprecated.flagReadonly,
+               Serialize: deprecated.flagSerializeIO,
        })
        return nil
 }
 
 func init() {
-       flag.Var(
-               &unixVolumeAdder{&volumes},
-               "volumes",
-               "Deprecated synonym for -volume.")
-       flag.Var(
-               &unixVolumeAdder{&volumes},
-               "volume",
-               "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
+       VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &UnixVolume{} })
+
+       flag.Var(&unixVolumeAdder{theConfig}, "volumes", "see Volumes configuration")
+       flag.Var(&unixVolumeAdder{theConfig}, "volume", "see Volumes configuration")
 }
 
 // Discover adds a UnixVolume for every directory named "keep" that is
@@ -89,10 +79,10 @@ func (vs *unixVolumeAdder) Discover() int {
                }
                // Set the -readonly flag (but only for this volume)
                // if the filesystem is mounted readonly.
-               flagReadonlyWas := flagReadonly
+               flagReadonlyWas := deprecated.flagReadonly
                for _, fsopt := range strings.Split(args[3], ",") {
                        if fsopt == "ro" {
-                               flagReadonly = true
+                               deprecated.flagReadonly = true
                                break
                        }
                        if fsopt == "rw" {
@@ -104,24 +94,62 @@ func (vs *unixVolumeAdder) Discover() int {
                } else {
                        added++
                }
-               flagReadonly = flagReadonlyWas
+               deprecated.flagReadonly = flagReadonlyWas
        }
        return added
 }
 
 // A UnixVolume stores and retrieves blocks in a local directory.
 type UnixVolume struct {
-       // path to the volume's root directory
-       root string
+       Root                 string // path to the volume's root directory
+       ReadOnly             bool
+       Serialize            bool
+       DirectoryReplication int
+
        // something to lock during IO, typically a sync.Mutex (or nil
        // to skip locking)
-       locker   sync.Locker
-       readonly bool
+       locker sync.Locker
+}
+
+// Examples implements VolumeWithExamples.
+func (*UnixVolume) Examples() []Volume {
+       return []Volume{
+               &UnixVolume{
+                       Root:                 "/mnt/local-disk",
+                       Serialize:            true,
+                       DirectoryReplication: 1,
+               },
+               &UnixVolume{
+                       Root:                 "/mnt/network-disk",
+                       Serialize:            false,
+                       DirectoryReplication: 2,
+               },
+       }
+}
+
+// Type implements Volume.
+func (v *UnixVolume) Type() string {
+       return "Directory"
+}
+
+// Start implements Volume.
+func (v *UnixVolume) Start() error {
+       if v.Serialize {
+               v.locker = &sync.Mutex{}
+       }
+       if !strings.HasPrefix(v.Root, "/") {
+               return fmt.Errorf("volume root does not start with '/': %q", v.Root)
+       }
+       if v.DirectoryReplication == 0 {
+               v.DirectoryReplication = 1
+       }
+       _, err := os.Stat(v.Root)
+       return err
 }
 
 // Touch sets the timestamp for the given locator to the current time
 func (v *UnixVolume) Touch(loc string) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
        p := v.blockPath(loc)
@@ -218,7 +246,7 @@ func (v *UnixVolume) Compare(loc string, expect []byte) error {
 // returns a FullError.  If the write fails due to some other error,
 // that error is returned.
 func (v *UnixVolume) Put(loc string, block []byte) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
        if v.IsFull() {
@@ -268,14 +296,14 @@ func (v *UnixVolume) Status() *VolumeStatus {
        var fs syscall.Statfs_t
        var devnum uint64
 
-       if fi, err := os.Stat(v.root); err == nil {
+       if fi, err := os.Stat(v.Root); err == nil {
                devnum = fi.Sys().(*syscall.Stat_t).Dev
        } else {
                log.Printf("%s: os.Stat: %s\n", v, err)
                return nil
        }
 
-       err := syscall.Statfs(v.root, &fs)
+       err := syscall.Statfs(v.Root, &fs)
        if err != nil {
                log.Printf("%s: statfs: %s\n", v, err)
                return nil
@@ -285,7 +313,7 @@ func (v *UnixVolume) Status() *VolumeStatus {
        // uses fs.Blocks - fs.Bfree.
        free := fs.Bavail * uint64(fs.Bsize)
        used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
-       return &VolumeStatus{v.root, devnum, free, used}
+       return &VolumeStatus{v.Root, devnum, free, used}
 }
 
 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
@@ -307,7 +335,7 @@ var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 //
 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
        var lastErr error
-       rootdir, err := os.Open(v.root)
+       rootdir, err := os.Open(v.Root)
        if err != nil {
                return err
        }
@@ -326,7 +354,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                if !blockDirRe.MatchString(names[0]) {
                        continue
                }
-               blockdirpath := filepath.Join(v.root, names[0])
+               blockdirpath := filepath.Join(v.Root, names[0])
                blockdir, err := os.Open(blockdirpath)
                if err != nil {
                        log.Print("Error reading ", blockdirpath, ": ", err)
@@ -360,9 +388,9 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
 }
 
 // Trash trashes the block data from the unix storage
-// If trashLifetime == 0, the block is deleted
+// If TrashLifetime == 0, the block is deleted
 // Else, the block is renamed as path/{loc}.trash.{deadline},
-// where deadline = now + trashLifetime
+// where deadline = now + TrashLifetime
 func (v *UnixVolume) Trash(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
@@ -372,7 +400,7 @@ func (v *UnixVolume) Trash(loc string) error {
        // Trash() will read the correct up-to-date timestamp and choose not to
        // trash the file.
 
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
        if v.locker != nil {
@@ -397,21 +425,21 @@ func (v *UnixVolume) Trash(loc string) error {
        // anyway (because the permission signatures have expired).
        if fi, err := os.Stat(p); err != nil {
                return err
-       } else if time.Since(fi.ModTime()) < blobSignatureTTL {
+       } else if time.Since(fi.ModTime()) < time.Duration(theConfig.BlobSignatureTTL) {
                return nil
        }
 
-       if trashLifetime == 0 {
+       if theConfig.TrashLifetime == 0 {
                return os.Remove(p)
        }
-       return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
+       return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()))
 }
 
 // Untrash moves block from trash back into store
 // Look for path/{loc}.trash.{deadline} in storage,
 // and rename the first such file as path/{loc}
 func (v *UnixVolume) Untrash(loc string) (err error) {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
 
@@ -446,7 +474,7 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
 // blockDir returns the fully qualified directory name for the directory
 // where loc is (or would be) stored on this volume.
 func (v *UnixVolume) blockDir(loc string) string {
-       return filepath.Join(v.root, loc[0:3])
+       return filepath.Join(v.Root, loc[0:3])
 }
 
 // blockPath returns the fully qualified pathname for the path to loc
@@ -459,7 +487,7 @@ func (v *UnixVolume) blockPath(loc string) string {
 // MinFreeKilobytes.
 //
 func (v *UnixVolume) IsFull() (isFull bool) {
-       fullSymlink := v.root + "/full"
+       fullSymlink := v.Root + "/full"
 
        // Check if the volume has been marked as full in the last hour.
        if link, err := os.Readlink(fullSymlink); err == nil {
@@ -491,7 +519,7 @@ func (v *UnixVolume) IsFull() (isFull bool) {
 //
 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
        var fs syscall.Statfs_t
-       err = syscall.Statfs(v.root, &fs)
+       err = syscall.Statfs(v.Root, &fs)
        if err == nil {
                // Statfs output is not guaranteed to measure free
                // space in terms of 1K blocks.
@@ -501,19 +529,19 @@ func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
 }
 
 func (v *UnixVolume) String() string {
-       return fmt.Sprintf("[UnixVolume %s]", v.root)
+       return fmt.Sprintf("[UnixVolume %s]", v.Root)
 }
 
 // Writable returns false if all future Put, Mtime, and Delete calls
 // are expected to fail.
 func (v *UnixVolume) Writable() bool {
-       return !v.readonly
+       return !v.ReadOnly
 }
 
 // Replication returns the number of replicas promised by the
-// underlying device (currently assumed to be 1).
+// underlying device (as specified in configuration).
 func (v *UnixVolume) Replication() int {
-       return 1
+       return v.DirectoryReplication
 }
 
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
@@ -546,7 +574,7 @@ func (v *UnixVolume) EmptyTrash() {
        var bytesDeleted, bytesInTrash int64
        var blocksDeleted, blocksInTrash int
 
-       err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
+       err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
                if err != nil {
                        log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
                        return nil
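
(Annotation, not part of the patch.) Trash above renames a block file to "{loc}.trash.{deadline}" with the deadline as a Unix timestamp, and EmptyTrash walks the volume looking for such names. A sketch of parsing them; the helper is hypothetical.

    package main

    import (
        "fmt"
        "strconv"
        "strings"
        "time"
    )

    // parseTrashName splits a "{loc}.trash.{deadline}" name as produced by
    // UnixVolume.Trash; ok is false for names that are not trash entries.
    func parseTrashName(name string) (loc string, deadline time.Time, ok bool) {
        i := strings.Index(name, ".trash.")
        if i < 0 {
            return "", time.Time{}, false
        }
        unix, err := strconv.ParseInt(name[i+len(".trash."):], 10, 64)
        if err != nil {
            return "", time.Time{}, false
        }
        return name[:i], time.Unix(unix, 0), true
    }

    func main() {
        loc, deadline, ok := parseTrashName("5d41402abc4b2a76b9719d911017c592.trash.1476726353")
        fmt.Println(loc, deadline.UTC(), ok)
    }
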
index c95538bc4da380f7af5561984d7a069324cea970..887247d3c3956e9475edf8437c913b3e1fc922c9 100644 (file)
@@ -30,9 +30,9 @@ func NewTestableUnixVolume(t TB, serialize bool, readonly bool) *TestableUnixVol
        }
        return &TestableUnixVolume{
                UnixVolume: UnixVolume{
-                       root:     d,
+                       Root:     d,
+                       ReadOnly: readonly,
                        locker:   locker,
-                       readonly: readonly,
                },
                t: t,
        }
@@ -42,9 +42,9 @@ func NewTestableUnixVolume(t TB, serialize bool, readonly bool) *TestableUnixVol
 // the volume is readonly.
 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
        defer func(orig bool) {
-               v.readonly = orig
-       }(v.readonly)
-       v.readonly = false
+               v.ReadOnly = orig
+       }(v.ReadOnly)
+       v.ReadOnly = false
        err := v.Put(locator, data)
        if err != nil {
                v.t.Fatal(err)
@@ -59,7 +59,7 @@ func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
 }
 
 func (v *TestableUnixVolume) Teardown() {
-       if err := os.RemoveAll(v.root); err != nil {
+       if err := os.RemoveAll(v.Root); err != nil {
                v.t.Fatal(err)
        }
 }
@@ -101,6 +101,19 @@ func TestUnixVolumeHandlersWithGenericVolumeTests(t *testing.T) {
        })
 }
 
+func TestReplicationDefault1(t *testing.T) {
+       v := &UnixVolume{
+               Root:     "/",
+               ReadOnly: true,
+       }
+       if err := v.Start(); err != nil {
+               t.Error(err)
+       }
+       if got := v.Replication(); got != 1 {
+               t.Errorf("Replication() returned %d, expected 1 if no config given", got)
+       }
+}
+
 func TestGetNotFound(t *testing.T) {
        v := NewTestableUnixVolume(t, false, false)
        defer v.Teardown()
@@ -126,7 +139,7 @@ func TestPut(t *testing.T) {
        if err != nil {
                t.Error(err)
        }
-       p := fmt.Sprintf("%s/%s/%s", v.root, TestHash[:3], TestHash)
+       p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
        if buf, err := ioutil.ReadFile(p); err != nil {
                t.Error(err)
        } else if bytes.Compare(buf, TestBlock) != 0 {
@@ -139,7 +152,7 @@ func TestPutBadVolume(t *testing.T) {
        v := NewTestableUnixVolume(t, false, false)
        defer v.Teardown()
 
-       os.Chmod(v.root, 000)
+       os.Chmod(v.Root, 000)
        err := v.Put(TestHash, TestBlock)
        if err == nil {
                t.Error("Write should have failed")
@@ -178,7 +191,7 @@ func TestIsFull(t *testing.T) {
        v := NewTestableUnixVolume(t, false, false)
        defer v.Teardown()
 
-       fullPath := v.root + "/full"
+       fullPath := v.Root + "/full"
        now := fmt.Sprintf("%d", time.Now().Unix())
        os.Symlink(now, fullPath)
        if !v.IsFull() {
@@ -200,8 +213,8 @@ func TestNodeStatus(t *testing.T) {
 
        // Get node status and make a basic sanity check.
        volinfo := v.Status()
-       if volinfo.MountPoint != v.root {
-               t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.root)
+       if volinfo.MountPoint != v.Root {
+               t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
        }
        if volinfo.DeviceNum == 0 {
                t.Errorf("uninitialized device_num in %v", volinfo)
@@ -301,7 +314,7 @@ func TestUnixVolumeCompare(t *testing.T) {
                t.Errorf("Got err %q, expected %q", err, DiskHashError)
        }
 
-       p := fmt.Sprintf("%s/%s/%s", v.root, TestHash[:3], TestHash)
+       p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
        os.Chmod(p, 000)
        err = v.Compare(TestHash, TestBlock)
        if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
index e1b8c484f0413cb8ff6bfe3ac0be77aebbcd2aa7..b25ed942fb350751d5d476889ca164897feca160 100755 (executable)
@@ -97,7 +97,7 @@ begin
       f.write(@key)
       f.close()
     end
-    FileUtils.chown_R(l[:username], l[:username], userdotssh)
+    FileUtils.chown_R(l[:username], nil, userdotssh)
     File.chmod(0700, userdotssh)
     File.chmod(0750, @homedir)
     File.chmod(0600, userauthkeys)
index 518fe33d049a753cd9cece8b416c46dc4045e483..b6caa14ccce73f0026084427a30d579a1c363211 100755 (executable)
@@ -20,12 +20,11 @@ fi
 
 export ARVADOS_API_HOST=$localip:${services[api]}
 export ARVADOS_API_HOST_INSECURE=1
-export GITOLITE_HTTP_HOME=/var/lib/arvados/git
-export GL_BYPASS_ACCESS_CHECKS=1
 export PATH="$PATH:/var/lib/arvados/git/bin"
 cd ~git
 
 exec /usr/local/bin/arv-git-httpd \
-     -address=:${services[arv-git-httpd]} \
-     -git-command=/usr/share/gitolite3/gitolite-shell \
-     -repo-root=/var/lib/arvados/git/repositories
+    -address=:${services[arv-git-httpd]} \
+    -git-command=/usr/share/gitolite3/gitolite-shell \
+    -gitolite-home=/var/lib/arvados/git \
+    -repo-root=/var/lib/arvados/git/repositories