Merge branch '10346-rearrange-api-docs' closes #10346
author Peter Amstutz <peter.amstutz@curoverse.com>
Tue, 15 Nov 2016 17:29:14 +0000 (12:29 -0500)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Tue, 15 Nov 2016 17:29:14 +0000 (12:29 -0500)
89 files changed:
COPYING
apps/workbench/app/controllers/application_controller.rb
apps/workbench/app/controllers/collections_controller.rb
apps/workbench/app/controllers/projects_controller.rb
apps/workbench/app/controllers/work_unit_templates_controller.rb
apps/workbench/app/controllers/work_units_controller.rb
apps/workbench/app/models/arvados_base.rb
apps/workbench/app/views/projects/_show_workflows.html.erb [new file with mode: 0644]
apps/workbench/test/controllers/disabled_api_test.rb [new file with mode: 0644]
apps/workbench/test/integration/pipeline_instances_test.rb
apps/workbench/test/unit/disabled_api_test.rb [new file with mode: 0644]
build/go-python-package-scripts/postinst
build/go-python-package-scripts/prerm
build/run-build-docker-jobs-image.sh
build/run-build-packages.sh
build/run-tests.sh
doc/_includes/_container_runtime_constraints.liquid
doc/_includes/_install_docker_cleaner.liquid
sdk/cli/bin/arv-run-pipeline-instance
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvdocker.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/crunch_script.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py
sdk/cwl/test_with_arvbox.sh
sdk/cwl/tests/mock_discovery.py [new file with mode: 0644]
sdk/cwl/tests/test_container.py
sdk/cwl/tests/test_job.py
sdk/cwl/tests/test_make_output.py
sdk/cwl/tests/test_pathmapper.py
sdk/cwl/tests/test_submit.py
sdk/go/arvados/container.go
sdk/python/arvados/arvfile.py
sdk/python/arvados/commands/put.py
sdk/python/arvados/events.py
sdk/python/tests/test_arv_put.py
services/api/app/controllers/arvados/v1/collections_controller.rb
services/api/app/controllers/arvados/v1/groups_controller.rb
services/api/app/models/arvados_model.rb
services/api/app/models/container.rb
services/api/app/models/container_request.rb
services/api/app/models/log.rb
services/api/app/models/node.rb
services/api/config/application.default.yml
services/api/db/migrate/20161111143147_add_scheduling_parameters_to_container.rb [new file with mode: 0644]
services/api/db/structure.sql
services/api/test/fixtures/nodes.yml
services/api/test/functional/arvados/v1/collections_controller_test.rb
services/api/test/unit/container_request_test.rb
services/api/test/unit/log_test.rb
services/api/test/unit/node_test.rb
services/arv-git-httpd/main.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go
services/dockercleaner/arvados-docker-cleaner.service
services/dockercleaner/arvados_docker/cleaner.py
services/dockercleaner/setup.py
services/dockercleaner/tests/test_cleaner.py
services/fuse/arvados_fuse/__init__.py
services/keep-web/main.go
services/keepproxy/keepproxy.go
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/collision.go
services/keepstore/config.go
services/keepstore/config_test.go [new file with mode: 0644]
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/handlers_with_generic_volume_test.go
services/keepstore/keepstore.go
services/keepstore/keepstore_test.go
services/keepstore/pull_worker.go
services/keepstore/s3_volume.go
services/keepstore/s3_volume_test.go
services/keepstore/trash_worker_test.go
services/keepstore/volume.go
services/keepstore/volume_generic_test.go
services/keepstore/volume_test.go
services/keepstore/volume_unix.go
services/keepstore/volume_unix_test.go
tools/arvbox/bin/arvbox
tools/arvbox/lib/arvbox/docker/crunch-setup.sh
tools/keep-exercise/keep-exercise.go

diff --git a/COPYING b/COPYING
index acbd7523ed49f01217874965aa3180cccec89d61..2cba2ad20a1fbf4aeae5e00ad55f64ed0eeb5b47 100644
--- a/COPYING
+++ b/COPYING
@@ -1,11 +1,15 @@
 Server-side components of Arvados contained in the apps/ and services/
 directories, including the API Server, Workbench, and Crunch, are licensed
-under the GNU Affero General Public License version 3 (see agpl-3.0.txt)
+under the GNU Affero General Public License version 3 (see agpl-3.0.txt).
+
+The files and directories under the build/, lib/ and tools/ directories are
+licensed under the GNU Affero General Public License version 3 (see
+agpl-3.0.txt).
 
 The Arvados client Software Development Kits contained in the sdk/ directory,
-example scripts in the crunch_scripts/ directory, and code samples in the
-Aravados documentation are licensed under the Apache License, Version 2.0 (see
-LICENSE-2.0.txt)
+example scripts in the crunch_scripts/ directory, the files and directories
+under backports/ and docker/, and code samples in the Arvados documentation
+are licensed under the Apache License, Version 2.0 (see LICENSE-2.0.txt).
 
 The Arvados Documentation located in the doc/ directory is licensed under the
-Creative Commons Attribution-Share Alike 3.0 United States (see by-sa-3.0.txt)
\ No newline at end of file
+Creative Commons Attribution-Share Alike 3.0 United States (see by-sa-3.0.txt).
diff --git a/apps/workbench/app/controllers/application_controller.rb b/apps/workbench/app/controllers/application_controller.rb
index f68250ba15bbf77d9e25821632f3d37a0154e25c..c9ce8ce0b748a9473d2cd5f80739d070f1f8aef5 100644
--- a/apps/workbench/app/controllers/application_controller.rb
+++ b/apps/workbench/app/controllers/application_controller.rb
@@ -13,6 +13,7 @@ class ApplicationController < ActionController::Base
   # Methods that don't require login should
   #   skip_around_filter :require_thread_api_token
   around_filter :require_thread_api_token, except: ERROR_ACTIONS
+  before_filter :ensure_arvados_api_exists, only: [:index, :show]
   before_filter :set_cache_buster
   before_filter :accept_uuid_as_id_param, except: ERROR_ACTIONS
   before_filter :check_user_agreements, except: ERROR_ACTIONS
@@ -213,6 +214,13 @@ class ApplicationController < ActionController::Base
     end
   end
 
+  def ensure_arvados_api_exists
+    if model_class.is_a?(Class) && model_class < ArvadosBase && !model_class.api_exists?(params['action'].to_sym)
+      @errors = ["#{params['action']} method is not supported for #{params['controller']}"]
+      return render_error(status: 404)
+    end
+  end
+
   def index
     find_objects_for_index if !@objects
     render_index
@@ -760,7 +768,11 @@ class ApplicationController < ActionController::Base
   }
 
   @@notification_tests.push lambda { |controller, current_user|
-    PipelineInstance.limit(1).where(created_by: current_user.uuid).each do
+    if PipelineInstance.api_exists?(:index)
+      PipelineInstance.limit(1).where(created_by: current_user.uuid).each do
+        return nil
+      end
+    else
       return nil
     end
     return lambda { |view|
@@ -856,12 +868,14 @@ class ApplicationController < ActionController::Base
   def recent_processes lim
     lim = 12 if lim.nil?
 
-    cols = %w(uuid owner_uuid created_at modified_at pipeline_template_uuid name state started_at finished_at)
-    pipelines = PipelineInstance.select(cols).limit(lim).order(["created_at desc"])
+    procs = {}
+    if PipelineInstance.api_exists?(:index)
+      cols = %w(uuid owner_uuid created_at modified_at pipeline_template_uuid name state started_at finished_at)
+      pipelines = PipelineInstance.select(cols).limit(lim).order(["created_at desc"])
+      pipelines.results.each { |pi| procs[pi] = pi.created_at }
+    end
 
     crs = ContainerRequest.limit(lim).order(["created_at desc"]).filter([["requesting_container_uuid", "=", nil]])
-    procs = {}
-    pipelines.results.each { |pi| procs[pi] = pi.created_at }
     crs.results.each { |c| procs[c] = c.created_at }
 
     Hash[procs.sort_by {|key, value| value}].keys.reverse.first(lim)
diff --git a/apps/workbench/app/controllers/collections_controller.rb b/apps/workbench/app/controllers/collections_controller.rb
index 20b227c3c7277d491c74b96c0f5de7bc415c0f4c..46dcab6dce38487a7bb938472e91f209f4e926e2 100644
--- a/apps/workbench/app/controllers/collections_controller.rb
+++ b/apps/workbench/app/controllers/collections_controller.rb
@@ -239,12 +239,15 @@ class CollectionsController < ApplicationController
         render 'hash_matches'
         return
       else
-        jobs_with = lambda do |conds|
-          Job.limit(RELATION_LIMIT).where(conds)
-            .results.sort_by { |j| j.finished_at || j.created_at }
+        if Job.api_exists?(:index)
+          jobs_with = lambda do |conds|
+            Job.limit(RELATION_LIMIT).where(conds)
+              .results.sort_by { |j| j.finished_at || j.created_at }
+          end
+          @output_of = jobs_with.call(output: @object.portable_data_hash)
+          @log_of = jobs_with.call(log: @object.portable_data_hash)
         end
-        @output_of = jobs_with.call(output: @object.portable_data_hash)
-        @log_of = jobs_with.call(log: @object.portable_data_hash)
+
         @project_links = Link.limit(RELATION_LIMIT).order("modified_at DESC")
           .where(head_uuid: @object.uuid, link_class: 'name').results
         project_hash = Group.where(uuid: @project_links.map(&:tail_uuid)).to_hash
diff --git a/apps/workbench/app/controllers/projects_controller.rb b/apps/workbench/app/controllers/projects_controller.rb
index 16212a8d0ad489b381aa3619d69d72443905cfcb..0a2044a0e23e96b741d77658dfa91057fe57bdfa 100644
--- a/apps/workbench/app/controllers/projects_controller.rb
+++ b/apps/workbench/app/controllers/projects_controller.rb
@@ -53,6 +53,19 @@ class ProjectsController < ApplicationController
   # It also seems to me that something like these could be used to configure the contents of the panes.
   def show_pane_list
     pane_list = []
+
+    procs = ["arvados#containerRequest"]
+    if PipelineInstance.api_exists?(:index)
+      procs << "arvados#pipelineInstance"
+    end
+
+    workflows = ["arvados#workflow"]
+    workflows_pane_name = 'Workflows'
+    if PipelineTemplate.api_exists?(:index)
+      workflows << "arvados#pipelineTemplate"
+      workflows_pane_name = 'Pipeline_templates'
+    end
+
     if @object.uuid != current_user.andand.uuid
       pane_list << 'Description'
     end
@@ -64,12 +77,12 @@ class ProjectsController < ApplicationController
     pane_list <<
       {
         :name => 'Pipelines_and_processes',
-        :filters => [%w(uuid is_a) + [%w(arvados#containerRequest arvados#pipelineInstance)]]
+        :filters => [%w(uuid is_a) + [procs]]
       }
     pane_list <<
       {
-        :name => 'Pipeline_templates',
-        :filters => [%w(uuid is_a) + [%w(arvados#pipelineTemplate arvados#workflow)]]
+        :name => workflows_pane_name,
+        :filters => [%w(uuid is_a) + [workflows]]
       }
     pane_list <<
       {
@@ -213,6 +226,10 @@ class ProjectsController < ApplicationController
       @name_link_for = {}
       kind_filters.each do |attr,op,val|
         (val.is_a?(Array) ? val : [val]).each do |type|
+          klass = type.split('#')[-1]
+          klass[0] = klass[0].capitalize
+          next if(!Object.const_get(klass).api_exists?(:index))
+
           filters = @filters - kind_filters + [['uuid', 'is_a', type]]
           if type == 'arvados#containerRequest'
             filters = filters + [['container_requests.requesting_container_uuid', '=', nil]]
diff --git a/apps/workbench/app/controllers/work_unit_templates_controller.rb b/apps/workbench/app/controllers/work_unit_templates_controller.rb
index 6b5f114a66fda426aa2353fda68c48daf87c7b3e..fe53ac403c3faccf50cc17e2159be32a120e585e 100644
--- a/apps/workbench/app/controllers/work_unit_templates_controller.rb
+++ b/apps/workbench/app/controllers/work_unit_templates_controller.rb
@@ -6,8 +6,10 @@ class WorkUnitTemplatesController < ApplicationController
     @filters = @filters || []
 
     # get next page of pipeline_templates
-    filters = @filters + [["uuid", "is_a", ["arvados#pipelineTemplate"]]]
-    pipelines = PipelineTemplate.limit(@limit).order(["created_at desc"]).filter(filters)
+    if PipelineTemplate.api_exists?(:index)
+      filters = @filters + [["uuid", "is_a", ["arvados#pipelineTemplate"]]]
+      pipelines = PipelineTemplate.limit(@limit).order(["created_at desc"]).filter(filters)
+    end
 
     # get next page of workflows
     filters = @filters + [["uuid", "is_a", ["arvados#workflow"]]]
diff --git a/apps/workbench/app/controllers/work_units_controller.rb b/apps/workbench/app/controllers/work_units_controller.rb
index fe6bff1cee4dfd7fa42a8b487376713808cbbd29..3b611aa25b74e28663d9b7ecc2b0647670f066c8 100644
--- a/apps/workbench/app/controllers/work_units_controller.rb
+++ b/apps/workbench/app/controllers/work_units_controller.rb
@@ -14,12 +14,16 @@ class WorkUnitsController < ApplicationController
     @filters = @filters || []
 
     # get next page of pipeline_instances
-    filters = @filters + [["uuid", "is_a", ["arvados#pipelineInstance"]]]
-    pipelines = PipelineInstance.limit(@limit).order(["created_at desc"]).filter(filters)
+    if PipelineInstance.api_exists?(:index)
+      filters = @filters + [["uuid", "is_a", ["arvados#pipelineInstance"]]]
+      pipelines = PipelineInstance.limit(@limit).order(["created_at desc"]).filter(filters)
+    end
 
     # get next page of jobs
-    filters = @filters + [["uuid", "is_a", ["arvados#job"]]]
-    jobs = Job.limit(@limit).order(["created_at desc"]).filter(filters)
+    if Job.api_exists?(:index)
+      filters = @filters + [["uuid", "is_a", ["arvados#job"]]]
+      jobs = Job.limit(@limit).order(["created_at desc"]).filter(filters)
+    end
 
     # get next page of container_requests
     filters = @filters + [["uuid", "is_a", ["arvados#containerRequest"]]]
diff --git a/apps/workbench/app/models/arvados_base.rb b/apps/workbench/app/models/arvados_base.rb
index b02db7a6b63b5fad8c75ea5f107baa74a58d7151..6250daa06a3d0c65d2233f51c33588a9de3855a5 100644
--- a/apps/workbench/app/models/arvados_base.rb
+++ b/apps/workbench/app/models/arvados_base.rb
@@ -334,7 +334,7 @@ class ArvadosBase < ActiveRecord::Base
   end
 
   def self.creatable?
-    current_user.andand.is_active
+    current_user.andand.is_active && api_exists?(:create)
   end
 
   def self.goes_in_projects?
@@ -361,6 +361,10 @@ class ArvadosBase < ActiveRecord::Base
     editable?
   end
 
+  def self.api_exists?(method)
+    arvados_api_client.discovery[:resources][self.to_s.underscore.pluralize.to_sym].andand[:methods].andand[method]
+  end
+
   # Array of strings that are the names of attributes that can be edited
   # with X-Editable.
   def editable_attributes
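
The api_exists? helper above is the heart of this branch: Workbench consults the API server's discovery document and hides any controller action whose backing API method has been disabled. A minimal Python sketch of the same discovery-document check, assuming a configured SDK client (the _rootDesc attribute is also used by sdk/cwl/tests/mock_discovery.py below); the resource and method names are illustrative:

    import arvados

    def api_exists(resource, method):
        # A disabled API method is simply absent from the resource's
        # method list in the discovery document.
        root = arvados.api('v1')._rootDesc
        methods = root.get('resources', {}).get(resource, {}).get('methods', {})
        return method in methods

    # e.g., guard dashboard queries the same way Workbench now does:
    if api_exists('pipeline_instances', 'index'):
        pass  # safe to list pipeline instances
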
diff --git a/apps/workbench/app/views/projects/_show_workflows.html.erb b/apps/workbench/app/views/projects/_show_workflows.html.erb
new file mode 100644
index 0000000..133fddc
--- /dev/null
+++ b/apps/workbench/app/views/projects/_show_workflows.html.erb
@@ -0,0 +1,5 @@
+<%= render_pane 'tab_contents', to_string: true, locals: {
+    limit: 50,
+    filters: [['uuid', 'is_a', ["arvados#workflow"]]],
+    sortable_columns: { 'name' => 'workflows.name', 'description' => 'workflows.description' }
+    }.merge(local_assigns) %>
diff --git a/apps/workbench/test/controllers/disabled_api_test.rb b/apps/workbench/test/controllers/disabled_api_test.rb
new file mode 100644
index 0000000..a41d87f
--- /dev/null
+++ b/apps/workbench/test/controllers/disabled_api_test.rb
@@ -0,0 +1,63 @@
+require 'test_helper'
+require 'helpers/share_object_helper'
+
+class DisabledApiTest < ActionController::TestCase
+  test "dashboard recent processes when pipeline_instance index API is disabled" do
+    @controller = ProjectsController.new
+
+    dd = ArvadosApiClient.new_or_current.discovery.deep_dup
+    dd[:resources][:pipeline_instances][:methods].delete(:index)
+    ArvadosApiClient.any_instance.stubs(:discovery).returns(dd)
+
+    get :index, {}, session_for(:active)
+    assert_includes @response.body, "zzzzz-xvhdp-cr4runningcntnr" # expect crs
+    assert_not_includes @response.body, "zzzzz-d1hrv-"   # expect no pipelines
+  end
+
+  [
+    [:jobs, JobsController.new],
+    [:job_tasks, JobTasksController.new],
+    [:pipeline_instances, PipelineInstancesController.new],
+    [:pipeline_templates, PipelineTemplatesController.new],
+  ].each do |ctrl_name, ctrl|
+    test "#{ctrl_name} index page when API is disabled" do
+      @controller = ctrl
+
+      dd = ArvadosApiClient.new_or_current.discovery.deep_dup
+      dd[:resources][ctrl_name][:methods].delete(:index)
+      ArvadosApiClient.any_instance.stubs(:discovery).returns(dd)
+
+      get :index, {}, session_for(:active)
+      assert_response 404
+    end
+  end
+
+  [
+    :active,
+    nil,
+  ].each do |user|
+    test "project tabs as user #{user} when pipeline related index APIs are disabled" do
+      @controller = ProjectsController.new
+
+      Rails.configuration.anonymous_user_token = api_fixture('api_client_authorizations')['anonymous']['api_token']
+
+      dd = ArvadosApiClient.new_or_current.discovery.deep_dup
+      dd[:resources][:pipeline_templates][:methods].delete(:index)
+      ArvadosApiClient.any_instance.stubs(:discovery).returns(dd)
+
+      proj_uuid = api_fixture('groups')['anonymously_accessible_project']['uuid']
+
+      if user
+        get(:show, {id: proj_uuid}, session_for(user))
+      else
+        get(:show, {id: proj_uuid})
+      end
+
+      resp = @response.body
+      assert_includes resp, "href=\"#Data_collections\""
+      assert_includes resp, "href=\"#Pipelines_and_processes\""
+      assert_includes resp, "href=\"#Workflows\""
+      assert_not_includes resp, "href=\"#Pipeline_templates\""
+    end
+  end
+end
diff --git a/apps/workbench/test/integration/pipeline_instances_test.rb b/apps/workbench/test/integration/pipeline_instances_test.rb
index d97850c991f292eca07a8040a3a65c2b2b7d99f2..171580bbaa2bc9816a9ba1061e40142d0487c8e9 100644
--- a/apps/workbench/test/integration/pipeline_instances_test.rb
+++ b/apps/workbench/test/integration/pipeline_instances_test.rb
@@ -16,14 +16,21 @@ class PipelineInstancesTest < ActionDispatch::IntegrationTest
     # Note: Even with all this help, phantomjs seems to behave badly
     # when parsing timestamps on the other side of a DST transition.
     # See skipped tests below.
+
+    # In some locales (e.g., en_CA.UTF-8) Firefox can't parse what its
+    # own toLocaleString() puts out.
+    t.sub!(/(\d\d\d\d)-(\d\d)-(\d\d)/, '\2/\3/\1')
+
     if /(\d+:\d+ [AP]M) (\d+\/\d+\/\d+)/ =~ t
       # Currently dates.js renders timestamps as
       # '{t.toLocaleTimeString()} {t.toLocaleDateString()}' which even
-      # browsers can't make sense of. First we need to flip it around
-      # so it looks like what toLocaleString() would have made.
+      # en_US browsers can't make sense of. First we need to flip it
+      # around so it looks like what toLocaleString() would have made.
       t = $~[2] + ', ' + $~[1]
     end
-    DateTime.parse(page.evaluate_script "new Date('#{t}').toUTCString()").to_time
+
+    utc = page.evaluate_script("new Date('#{t}').toUTCString()")
+    DateTime.parse(utc).to_time
   end
 
   if false
diff --git a/apps/workbench/test/unit/disabled_api_test.rb b/apps/workbench/test/unit/disabled_api_test.rb
new file mode 100644
index 0000000..52e3bd1
--- /dev/null
+++ b/apps/workbench/test/unit/disabled_api_test.rb
@@ -0,0 +1,15 @@
+require 'test_helper'
+
+class DisabledApiTest < ActiveSupport::TestCase
+  test 'Job.creatable? reflects whether jobs.create API is enabled' do
+    use_token(:active) do
+      assert(Job.creatable?)
+    end
+    dd = ArvadosApiClient.new_or_current.discovery.deep_dup
+    dd[:resources][:jobs][:methods].delete(:create)
+    ArvadosApiClient.any_instance.stubs(:discovery).returns(dd)
+    use_token(:active) do
+      refute(Job.creatable?)
+    end
+  end
+end
diff --git a/build/go-python-package-scripts/postinst b/build/go-python-package-scripts/postinst
index 051c8bd98bd379cf29752f08ea1d88284706d60e..729855a8806faa938bda6e9d3a95a3159eca08b5 100755
--- a/build/go-python-package-scripts/postinst
+++ b/build/go-python-package-scripts/postinst
@@ -2,39 +2,61 @@
 
 set -e
 
-# NOTE: This package name detection will only work on Debian.
-# If this postinst script ever starts doing work on Red Hat,
-# we'll need to adapt this code accordingly.
-script="$(basename "${0}")"
-pkg="${script%.postinst}"
-systemd_unit="${pkg}.service"
+if [ "%{name}" != "%\{name\}" ]; then
+    # Red Hat ("%{...}" is interpolated at package build time)
+    pkg="%{name}"
+    pkgtype=rpm
+    prefix="${RPM_INSTALL_PREFIX}"
+else
+    # Debian
+    script="$(basename "${0}")"
+    pkg="${script%.postinst}"
+    pkgtype=deb
+    prefix=/usr
+fi
 
-case "${1}" in
-    configure)
-        if [ -d /lib/systemd/system ]
-        then
-            # Python packages put all data files in /usr, so we copy
-            # them to /lib at install time.
-            py_unit="/usr/share/doc/${pkg}/${pkg}.service"
-            if [ -e "${py_unit}" ]
-            then
-                cp "${py_unit}" /lib/systemd/system/
+case "${pkgtype}-${1}" in
+    deb-configure | rpm-1)
+        dest_dir="/lib/systemd/system"
+        if ! [ -d "${dest_dir}" ]; then
+            exit 0
+        fi
+
+        # Find the unit file we need to install.
+        unit_file="${pkg}.service"
+        for dir in \
+            "${prefix}/share/doc/${pkg}" \
+            "${dest_dir}"; do
+            if [ -e "${dir}/${unit_file}" ]; then
+                src_dir="${dir}"
+                break
             fi
+        done
+        if [ -z "${src_dir}" ]; then
+            echo >&2 "WARNING: postinst script did not find ${unit_file} anywhere."
+            exit 0
+        fi
+
+        # Install/update the unit file if necessary.
+        if [ "${src_dir}" != "${dest_dir}" ]; then
+            cp "${src_dir}/${unit_file}" "${dest_dir}/" || exit 0
         fi
 
+        # Enable service, and make sure systemd re-reads the unit
+        # file, in case we changed it.
         if [ -e /run/systemd/system ]; then
-            eval "$(systemctl -p UnitFileState show "${systemd_unit}")"
+            systemctl daemon-reload || true
+            eval "$(systemctl -p UnitFileState show "${pkg}")"
             case "${UnitFileState}" in
                 disabled)
                     # Failing to enable or start the service is not a
                     # package error, so don't let errors here
                     # propagate up.
-                    systemctl enable "${systemd_unit}" || true
-                    systemctl start "${systemd_unit}" || true
+                    systemctl enable "${pkg}" || true
+                    systemctl start "${pkg}" || true
                     ;;
                 enabled)
-                    systemctl daemon-reload || true
-                    systemctl reload-or-try-restart "${systemd_unit}" || true
+                    systemctl reload-or-try-restart "${pkg}" || true
                     ;;
             esac
         fi
diff --git a/build/go-python-package-scripts/prerm b/build/go-python-package-scripts/prerm
index c6ec18ca106cb1bba11e761363715fb1ce40c0d2..26baa62aa1b74266fd853062e5cb25bb4ec6a93d 100755
--- a/build/go-python-package-scripts/prerm
+++ b/build/go-python-package-scripts/prerm
@@ -2,26 +2,29 @@
 
 set -e
 
-# NOTE: This package name detection will only work on Debian.
-# If this prerm script ever starts doing work on Red Hat,
-# we'll need to adapt this code accordingly.
-script="$(basename "${0}")"
-pkg="${script%.prerm}"
-systemd_unit="${pkg}.service"
+if [ "%{name}" != "%\{name\}" ]; then
+    # Red Hat ("%{...}" is interpolated at package build time)
+    pkg="%{name}"
+    pkgtype=rpm
+    prefix="${RPM_INSTALL_PREFIX}"
+else
+    # Debian
+    script="$(basename "${0}")"
+    pkg="${script%.prerm}"
+    pkgtype=deb
+    prefix=/usr
+fi
 
-case "${1}" in
-    remove)
+case "${pkgtype}-${1}" in
+    deb-remove | rpm-0)
         if [ -e /run/systemd/system ]; then
-            systemctl stop "${systemd_unit}" || true
-            systemctl disable "${systemd_unit}" || true
+            systemctl stop "${pkg}" || true
+            systemctl disable "${pkg}" || true
         fi
-
-        # Unit files from Python packages get installed by postinst so
-        # we have to remove them explicitly here.
-        py_unit="/usr/share/doc/${pkg}/${pkg}.service"
-        if [ -e "${py_unit}" ]
-        then
-            rm "/lib/systemd/system/${pkg}.service" || true
+        if [ -e "${prefix}/share/doc/${pkg}/${pkg}.service" ]; then
+            # Unit files from Python packages get installed by
+            # postinst so we have to remove them explicitly here.
+            rm "/lib/systemd/system/${pkg}.service" || true
         fi
         ;;
 esac
diff --git a/build/run-build-docker-jobs-image.sh b/build/run-build-docker-jobs-image.sh
index 7b5ea4ecec6bcda873ed8a98ea09d862e8dd86f4..556ac4ea194755deee1de8f8a98001cf2ae571b5 100755
--- a/build/run-build-docker-jobs-image.sh
+++ b/build/run-build-docker-jobs-image.sh
@@ -122,10 +122,10 @@ python_sdk_version=$(cd sdk/python && nohash_version_from_git 0.1)-2
 cwl_runner_version=$(cd sdk/cwl && nohash_version_from_git 1.0)-3
 
 if [[ $python_sdk_ts -gt $cwl_runner_ts ]]; then
-    cwl_runner_version=$python_sdk_version
-    gittag=$(cd sdk/python && git log --first-parent --max-count=1 --format=format:%H)
+    cwl_runner_version=$(cd sdk/python && nohash_version_from_git 1.0)-3
+    gittag=$(git log --first-parent --max-count=1 --format=format:%H sdk/python)
 else
-    gittag=$(cd sdk/cwl && git log --first-parent --max-count=1 --format=format:%H)
+    gittag=$(git log --first-parent --max-count=1 --format=format:%H sdk/cwl)
 fi
 
 echo cwl_runner_version $cwl_runner_version python_sdk_version $python_sdk_version
diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index 12c92607de51e3fd4aa7b6ec129e438b9427b1ec..320f9d445c3a052a62bf5b8560b2080c98b06904 100755
--- a/build/run-build-packages.sh
+++ b/build/run-build-packages.sh
@@ -431,6 +431,8 @@ package_go_binary tools/keep-block-check keep-block-check \
     "Verify that all data from one set of Keep servers to another was copied"
 package_go_binary tools/keep-rsync keep-rsync \
     "Copy all data from one set of Keep servers to another"
+package_go_binary tools/keep-exercise keep-exercise \
+    "Performance testing tool for Arvados Keep"
 
 # The Python SDK
 # Please resist the temptation to add --no-python-fix-name to the fpm call here
@@ -476,7 +478,7 @@ fpm_build ruamel.yaml "" "" python 0.12.4 --python-setup-py-arguments "--single-
 fpm_build cwltest "" "" python 1.0.20160907111242
 
 # And for cwltool we have the same problem as for schema_salad. Ward, 2016-03-17
-fpm_build cwltool "" "" python 1.0.20161007181528
+fpm_build cwltool "" "" python 1.0.20161107145355
 
 # FPM eats the trailing .0 in the python-rdflib-jsonld package when built with 'rdflib-jsonld>=0.3.0'. Force the version. Ward, 2016-03-25
 fpm_build rdflib-jsonld "" "" python 0.3.0
diff --git a/build/run-tests.sh b/build/run-tests.sh
index 2797ec31093fc5183289123aa916efcaf051533f..8959cfbe09c3ea7ac6ded2142b626259787d2121 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -93,6 +93,7 @@ sdk/go/streamer
 sdk/go/crunchrunner
 sdk/cwl
 tools/crunchstat-summary
+tools/keep-exercise
 tools/keep-rsync
 tools/keep-block-check
 
@@ -158,8 +159,8 @@ sanity_checks() {
     echo -n 'go: '
     go version \
         || fatal "No go binary. See http://golang.org/doc/install"
-    [[ $(go version) =~ go1.([0-9]+) ]] && [[ ${BASH_REMATCH[1]} -ge 6 ]] \
-        || fatal "Go >= 1.6 required. See http://golang.org/doc/install"
+    [[ $(go version) =~ go1.([0-9]+) ]] && [[ ${BASH_REMATCH[1]} -ge 7 ]] \
+        || fatal "Go >= 1.7 required. See http://golang.org/doc/install"
     echo -n 'gcc: '
     gcc --version | egrep ^gcc \
         || fatal "No gcc. Try: apt-get install build-essential"
@@ -764,8 +765,9 @@ gostuff=(
     services/crunch-dispatch-local
     services/crunch-dispatch-slurm
     services/crunch-run
-    tools/keep-rsync
     tools/keep-block-check
+    tools/keep-exercise
+    tools/keep-rsync
     )
 for g in "${gostuff[@]}"
 do
diff --git a/doc/_includes/_container_runtime_constraints.liquid b/doc/_includes/_container_runtime_constraints.liquid
index 06bfb4f5da71dbfe10c2ccb39484cf8f5ebd34ea..849db42e47827c7a3cc2ddea8a28f36d3434979e 100644
--- a/doc/_includes/_container_runtime_constraints.liquid
+++ b/doc/_includes/_container_runtime_constraints.liquid
@@ -6,5 +6,6 @@ table(table table-bordered table-condensed).
 |_. Key|_. Type|_. Description|_. Notes|
 |ram|integer|Number of ram bytes to be used to run this process.|Optional. However, a ContainerRequest that is in "Committed" state must provide this.|
 |vcpus|integer|Number of cores to be used to run this process.|Optional. However, a ContainerRequest that is in "Committed" state must provide this.|
+|keep_cache_ram|integer|Number of keep cache bytes to be used to run this process.|Optional.|
 |API|boolean|When set, ARVADOS_API_HOST and ARVADOS_API_TOKEN will be set, and container will have networking enabled to access the Arvados API server.|Optional.|
 |partition|array of strings|Specify the names of one or more compute partitions that may run this container.  If not provided, the system chooses where to run the container.|Optional.|
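
With keep_cache_ram added to the table, a committed container request can size its Keep cache alongside RAM and CPU. A hedged sketch of supplying these constraints through the Python SDK's container_requests().create() call (the same call arvcontainer.py makes below); all values here are placeholders and the client is assumed to be configured:

    import arvados

    api = arvados.api('v1')
    cr = api.container_requests().create(body={
        'name': 'runtime-constraints-example',    # placeholder name
        'state': 'Committed',
        'container_image': 'arvados/jobs',        # placeholder image
        'command': ['ls', '/var/spool/cwl'],
        'output_path': '/var/spool/cwl',
        'cwd': '/var/spool/cwl',
        'mounts': {'/var/spool/cwl': {'kind': 'tmp'}},
        'runtime_constraints': {
            'vcpus': 1,
            'ram': 1 << 30,               # 1 GiB of RAM
            'keep_cache_ram': 256 << 20,  # 256 MiB Keep cache (new key)
        },
    }).execute()
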
diff --git a/doc/_includes/_install_docker_cleaner.liquid b/doc/_includes/_install_docker_cleaner.liquid
index 5671a54ad58b2ae21e8443a683d2677224df6deb..6b7ec90e6394e4a2105ae4453d2648f56bfb7cc3 100644
--- a/doc/_includes/_install_docker_cleaner.liquid
+++ b/doc/_includes/_install_docker_cleaner.liquid
@@ -3,39 +3,26 @@ h2. Configure the Docker cleaner
 The arvados-docker-cleaner program removes least recently used Docker images as needed to keep disk usage below a configured limit.
 
 {% include 'notebox_begin' %}
-This also removes all containers as soon as they exit, as if they were run with @docker run --rm@. If you need to debug or inspect containers after they stop, temporarily stop arvados-docker-cleaner or run it with @--remove-stopped-containers never@.
+This also removes all containers as soon as they exit, as if they were run with @docker run --rm@. If you need to debug or inspect containers after they stop, temporarily stop arvados-docker-cleaner or configure it with @"RemoveStoppedContainers":"never"@.
 {% include 'notebox_end' %}
 
-Create a file @/etc/systemd/system/arvados-docker-cleaner.service@ in an editor.  Include the text below as its contents.  Make sure to edit the @ExecStart@ line appropriately for your compute node.
+Create a file @/etc/arvados/docker-cleaner/docker-cleaner.json@ in an editor, with the following contents.
 
 <notextile>
-<pre><code>[Service]
-# Most deployments will want a quota that's at least 10G.  From there,
-# a larger quota can help reduce compute overhead by preventing reloading
-# the same Docker image repeatedly, but will leave less space for other
-# files on the same storage (usually Docker volumes).  Make sure the quota
-# is less than the total space available for Docker images.
-# If your deployment uses a Python 3 Software Collection, uncomment the
-# ExecStart line below, and delete the following one:
-# ExecStart=scl enable python33 "python3 -m arvados_docker.cleaner --quota <span class="userinput">20G</span>"
-ExecStart=python3 -m arvados_docker.cleaner --quota <span class="userinput">20G</span>
-Restart=always
-RestartPreventExitStatus=2
-
-[Install]
-WantedBy=default.target
-
-[Unit]
-After=docker.service
+<pre><code>{
+    "Quota": "<span class="userinput">10G</span>",
+    "RemoveStoppedContainers": "always"
+}
 </code></pre>
 </notextile>
 
-Then enable and start the service:
+*Choosing a quota:* Most deployments will want a quota that's at least 10G.  From there, a larger quota can help reduce compute overhead by preventing reloading the same Docker image repeatedly, but will leave less space for other files on the same storage (usually Docker volumes).  Make sure the quota is less than the total space available for Docker images.
+
+Restart the service after updating the configuration file.
 
 <notextile>
-<pre><code>~$ <span class="userinput">sudo systemctl enable arvados-docker-cleaner.service</span>
-~$ <span class="userinput">sudo systemctl start arvados-docker-cleaner.service</span>
+<pre><code>~$ <span class="userinput">sudo systemctl restart arvados-docker-cleaner</span>
 </code></pre>
 </notextile>
 
-If you are using a different daemon supervisor, or if you want to test the daemon in a terminal window, use the command on the @ExecStart@ line above.
+*If you are using a different daemon supervisor,* or if you want to test the daemon in a terminal window, run @arvados-docker-cleaner@. Run @arvados-docker-cleaner --help@ for more configuration options.
diff --git a/sdk/cli/bin/arv-run-pipeline-instance b/sdk/cli/bin/arv-run-pipeline-instance
index bcb11d1d706d1fc6be68b340d0038daf6cc43266..960d7848de23b998ee4ce3d47edb38d35af54ea5 100755
--- a/sdk/cli/bin/arv-run-pipeline-instance
+++ b/sdk/cli/bin/arv-run-pipeline-instance
@@ -380,6 +380,8 @@ class WhRunPipelineInstance
           value = params[parametername.to_s]
         elsif parameter.has_key?(:default)
           value = parameter[:default]
+        elsif [false, 'false', 0, '0'].index(parameter[:required])
+          value = nil
         else
           errors << [componentname, parametername, "required parameter is missing"]
           next
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 3144592fc98f47083581e06fabcf900517d7ab01..92be92d6e0469fba63a1504a5f0c834fb4a9b2b7 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -201,14 +201,28 @@ class ArvCwlRunner(object):
 
         srccollections = {}
         for k,v in generatemapper.items():
+            if k.startswith("_:"):
+                if v.type == "Directory":
+                    continue
+                if v.type == "CreateFile":
+                    with final.open(v.target, "wb") as f:
+                        f.write(v.resolved.encode("utf-8"))
+                    continue
+
+            if not k.startswith("keep:"):
+                raise Exception("Output source is not in keep or a literal")
             sp = k.split("/")
             srccollection = sp[0][5:]
             if srccollection not in srccollections:
-                srccollections[srccollection] = arvados.collection.CollectionReader(
-                    srccollection,
-                    api_client=self.api,
-                    keep_client=self.keep_client,
-                    num_retries=self.num_retries)
+                try:
+                    srccollections[srccollection] = arvados.collection.CollectionReader(
+                        srccollection,
+                        api_client=self.api,
+                        keep_client=self.keep_client,
+                        num_retries=self.num_retries)
+                except arvados.errors.ArgumentError as e:
+                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
+                    raise
             reader = srccollections[srccollection]
             try:
                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
@@ -218,7 +232,7 @@ class ArvCwlRunner(object):
 
         def rewrite(fileobj):
             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
-            for k in ("basename", "size", "listing"):
+            for k in ("basename", "listing", "contents"):
                 if k in fileobj:
                     del fileobj[k]
 
@@ -234,7 +248,13 @@ class ArvCwlRunner(object):
                     final.api_response()["name"],
                     final.manifest_locator())
 
-        self.final_output_collection = final
+        def finalcollection(fileobj):
+            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
+
+        adjustDirObjs(outputObj, finalcollection)
+        adjustFileObjs(outputObj, finalcollection)
+
+        return (outputObj, final)
 
     def set_crunch_output(self):
         if self.work_api == "containers":
@@ -390,7 +410,7 @@ class ArvCwlRunner(object):
         else:
             if self.output_name is None:
                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
-            self.make_output_collection(self.output_name, self.final_output)
+            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.final_output)
             self.set_crunch_output()
 
         if self.final_status != "success":
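
make_output_collection now returns the rewritten CWL output object together with the collection, and the caller stores both. The final rewrite maps every output file location onto a keep: URI under the new collection's portable data hash using cwltool's adjust helpers; the same traversal in isolation (the output object and hash below are made up):

    from cwltool.pathmapper import adjustFileObjs, adjustDirObjs

    pdh = "99999999999999999999999999999993+99"  # placeholder hash
    outputObj = {"out": {"class": "File", "location": "foo.txt"}}

    def finalcollection(fileobj):
        fileobj["location"] = "keep:%s/%s" % (pdh, fileobj["location"])

    adjustDirObjs(outputObj, finalcollection)
    adjustFileObjs(outputObj, finalcollection)
    # outputObj["out"]["location"] is now "keep:9999...93+99/foo.txt"
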
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 1581d20d2f62920a5259fe936248900966947ccb..aa088c5e8a06fa00ec086483b9f628c79687965f 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -42,6 +42,7 @@ class ArvadosContainer(object):
                 "kind": "tmp"
             }
         }
+        scheduling_parameters = {}
 
         dirs = set()
         for f in self.pathmapper.files():
@@ -97,14 +98,17 @@ class ArvadosContainer(object):
 
         runtime_req, _ = get_feature(self, "http://arvados.org/cwl#RuntimeConstraints")
         if runtime_req:
-            logger.warn("RuntimeConstraints not yet supported by container API")
+            if "keep_cache" in runtime_req:
+                runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"]
 
         partition_req, _ = get_feature(self, "http://arvados.org/cwl#PartitionRequirement")
         if partition_req:
-            runtime_constraints["partition"] = aslist(partition_req["partition"])
+            scheduling_parameters["partitions"] = aslist(partition_req["partition"])
 
         container_request["mounts"] = mounts
         container_request["runtime_constraints"] = runtime_constraints
+        container_request["use_existing"] = kwargs.get("enable_reuse", True)
+        container_request["scheduling_parameters"] = scheduling_parameters
 
         try:
             response = self.arvrunner.api.container_requests().create(
@@ -186,6 +190,12 @@ class RunnerContainer(Runner):
         command = ["arvados-cwl-runner", "--local", "--api=containers"]
         if self.output_name:
             command.append("--output-name=" + self.output_name)
+
+        if self.enable_reuse:
+            command.append("--enable-reuse")
+        else:
+            command.append("--disable-reuse")
+
         command.extend([workflowpath, jobpath])
 
         return {
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
index 244dd3d3573a8738d7324207b439cb910aa3eba6..b9691d215c4d46e071d13740c2d3c90b9f7d1a81 100644
--- a/sdk/cwl/arvados_cwl/arvdocker.py
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -1,19 +1,28 @@
 import logging
 import sys
+import threading
 
 import cwltool.docker
 from cwltool.errors import WorkflowException
 import arvados.commands.keepdocker
 
-
 logger = logging.getLogger('arvados.cwl-runner')
 
+cached_lookups = {}
+cached_lookups_lock = threading.Lock()
+
 def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
     """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
 
     if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
         dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
 
+    global cached_lookups
+    global cached_lookups_lock
+    with cached_lookups_lock:
+        if dockerRequirement["dockerImageId"] in cached_lookups:
+            return cached_lookups[dockerRequirement["dockerImageId"]]
+
     sp = dockerRequirement["dockerImageId"].split(":")
     image_name = sp[0]
     image_tag = sp[1] if len(sp) > 1 else None
@@ -33,8 +42,9 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
         logger.info("Uploading Docker image %s", ":".join(args[1:]))
         try:
             arvados.commands.keepdocker.main(args, stdout=sys.stderr)
-        except SystemExit:
-            raise WorkflowException()
+        except SystemExit as e:
+            if e.code:
+                raise WorkflowException("keepdocker exited with code %s" % e.code)
 
         images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                                 image_name=image_name,
@@ -44,4 +54,14 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
         raise WorkflowException("Could not find Docker image %s:%s" % (image_name, image_tag))
 
     pdh = api_client.collections().get(uuid=images[0][0]).execute()["portable_data_hash"]
+
+    with cached_lookups_lock:
+        cached_lookups[dockerRequirement["dockerImageId"]] = pdh
+
     return pdh
+
+def arv_docker_clear_cache():
+    global cached_lookups
+    global cached_lookups_lock
+    with cached_lookups_lock:
+        cached_lookups = {}
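
arv_docker_get_image now memoizes image lookups in a module-level dict guarded by a lock, with arv_docker_clear_cache() so tests that stub the API start from a cold cache. The same pattern in isolation (names here are illustrative, not SDK API):

    import threading

    _cache = {}
    _cache_lock = threading.Lock()

    def lookup(key, compute):
        with _cache_lock:            # fast path: already cached
            if key in _cache:
                return _cache[key]
        value = compute(key)         # slow path runs outside the lock, so a
        with _cache_lock:            # long Docker image upload doesn't block
            _cache[key] = value      # unrelated lookups
        return value

    def clear_cache():
        global _cache
        with _cache_lock:
            _cache = {}

As in the diff, two threads can race to compute the same key; the cost is a duplicate lookup, never an inconsistent cache.
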
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 8228387f95cadccb774a534b9552d0350a95c95f..8a62204f8fb9ec22298abec15529411ace70ed9e 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -22,9 +22,7 @@ from ._version import __version__
 logger = logging.getLogger('arvados.cwl-runner')
 metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
-tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
-outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
-keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
+crunchrunner_re = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.(tmpdir|outdir|keep)\)=(.*)")
 
 class ArvadosJob(object):
     """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
@@ -87,6 +85,8 @@ class ArvadosJob(object):
         with Perf(metrics, "arv_docker_get_image %s" % self.name):
             (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
             if docker_req and kwargs.get("use_container") is not False:
+                if docker_req.get("dockerOutputDirectory"):
+                    raise UnsupportedRequirement("Option 'dockerOutputDirectory' of DockerRequirement not supported.")
                 runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
             else:
                 runtime_constraints["docker_image"] = arvados_jobs_image(self.arvrunner)
@@ -184,6 +184,7 @@ class ArvadosJob(object):
                                                                    keep_client=self.arvrunner.keep_client,
                                                                    num_retries=self.arvrunner.num_retries)
                         log = logc.open(logc.keys()[0])
+                        dirs = {}
                         tmpdir = None
                         outdir = None
                         keepdir = None
@@ -195,19 +196,13 @@ class ArvadosJob(object):
                             # the job restarts on a different node these values
                             # will differ between runs, and we need to know about the
                             # final run that actually produced output.
-
-                            g = tmpdirre.match(l)
-                            if g:
-                                tmpdir = g.group(1)
-                            g = outdirre.match(l)
+                            g = crunchrunner_re.match(l)
                             if g:
-                                outdir = g.group(1)
-                            g = keepre.match(l)
-                            if g:
-                                keepdir = g.group(1)
+                                dirs[g.group(1)] = g.group(2)
 
                     with Perf(metrics, "output collection %s" % self.name):
-                        outputs = done.done(self, record, tmpdir, outdir, keepdir)
+                        outputs = done.done(self, record, dirs["tmpdir"],
+                                            dirs["outdir"], dirs["keep"])
             except WorkflowException as e:
                 logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                 processStatus = "permanentFail"
@@ -242,8 +237,12 @@ class RunnerJob(Runner):
             del self.job_order["job_order"]
 
         self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"]).target[5:]
+
         if self.output_name:
             self.job_order["arv:output_name"] = self.output_name
+
+        self.job_order["arv:enable_reuse"] = self.enable_reuse
+
         return {
             "script": "cwl-runner",
             "script_version": __version__,
@@ -317,6 +316,7 @@ class RunnerTemplate(object):
         Specifically, translate CWL input specs to Arvados pipeline
         format, like {"dataclass":"File","value":"xyz"}.
         """
+
         spec = self.job.arvados_job_spec()
 
         # Most of the component spec is exactly the same as the job
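
Earlier in this file, the three per-variable log regexes collapse into one pattern whose alternation group keys a dict of captured paths. A quick check of the combined pattern against a made-up crunchrunner log line (the line is illustrative of the format, not real log output):

    import re

    crunchrunner_re = re.compile(
        r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: "
        r"\$\(task\.(tmpdir|outdir|keep)\)=(.*)")

    # Hypothetical log line in the crunch log format the pattern expects.
    line = ("2016-11-15_17:29:14 zzzzz-8i9sb-xxxxxxxxxxxxxxx 1234 0 "
            "stderr run task1 crunchrunner: $(task.outdir)=/tmp/outdir")

    dirs = {}
    m = crunchrunner_re.match(line)
    if m:
        dirs[m.group(1)] = m.group(2)
    assert dirs == {"outdir": "/tmp/outdir"}
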
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 8eb8fe6fee50e0722ee7066171ff7b7bbc4a10c5..ce633d43285a537268f3bc96dc446696d17d06a6 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -87,6 +87,8 @@ class ArvadosWorkflow(Workflow):
                 joborder_keepmount = copy.deepcopy(joborder)
 
                 def keepmount(obj):
+                    if "location" not in obj:
+                        raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
                     if obj["location"].startswith("keep:"):
                         obj["location"] = "/keep/" + obj["location"][5:]
                         if "listing" in obj:
diff --git a/sdk/cwl/arvados_cwl/crunch_script.py b/sdk/cwl/arvados_cwl/crunch_script.py
index 9b0680bc83783e94054f68d1a6376d4d1f5883ae..173eb93daf2c4070ba92f28fca2ac053952f1662 100644
--- a/sdk/cwl/arvados_cwl/crunch_script.py
+++ b/sdk/cwl/arvados_cwl/crunch_script.py
@@ -63,10 +63,15 @@ def run():
         adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api)))
 
         output_name = None
+        enable_reuse = True
         if "arv:output_name" in job_order_object:
             output_name = job_order_object["arv:output_name"]
             del job_order_object["arv:output_name"]
 
+        if "arv:enable_reuse" in job_order_object:
+            enable_reuse = job_order_object["arv:enable_reuse"]
+            del job_order_object["arv:enable_reuse"]
+
         runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
                                           output_name=output_name)
 
@@ -74,7 +79,7 @@ def run():
 
         args = argparse.Namespace()
         args.project_uuid = arvados.current_job()["owner_uuid"]
-        args.enable_reuse = True
+        args.enable_reuse = enable_reuse
         args.submit = False
         args.debug = True
         args.quiet = False
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 73c81ceb0fcdb033203c1b7e5425b3875ea121d6..58500d3a993ddb74327c419925c2aed2b769a1b6 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -150,27 +150,31 @@ class ArvPathMapper(PathMapper):
         else:
             return super(ArvPathMapper, self).reversemap(target)
 
-class InitialWorkDirPathMapper(PathMapper):
+class StagingPathMapper(PathMapper):
+    _follow_dirs = True
 
     def visit(self, obj, stagedir, basedir, copy=False):
         # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
         loc = obj["location"]
+        tgt = os.path.join(stagedir, obj["basename"])
         if obj["class"] == "Directory":
-            self._pathmap[loc] = MapperEnt(obj["location"], stagedir, "Directory")
-            self.visitlisting(obj.get("listing", []), stagedir, basedir)
+            self._pathmap[loc] = MapperEnt(loc, tgt, "Directory")
+            if loc.startswith("_:") or self._follow_dirs:
+                self.visitlisting(obj.get("listing", []), tgt, basedir)
         elif obj["class"] == "File":
             if loc in self._pathmap:
                 return
-            tgt = os.path.join(stagedir, obj["basename"])
-            if "contents" in obj and obj["location"].startswith("_:"):
+            if "contents" in obj and loc.startswith("_:"):
                 self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile")
             else:
                 if copy:
-                    self._pathmap[loc] = MapperEnt(obj["path"], tgt, "WritableFile")
+                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile")
                 else:
-                    self._pathmap[loc] = MapperEnt(obj["path"], tgt, "File")
+                    self._pathmap[loc] = MapperEnt(loc, tgt, "File")
                 self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
 
+
+class InitialWorkDirPathMapper(StagingPathMapper):
     def setup(self, referenced_files, basedir):
         # type: (List[Any], unicode) -> None
 
@@ -183,19 +187,8 @@ class InitialWorkDirPathMapper(PathMapper):
                 self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type)
 
 
-class FinalOutputPathMapper(PathMapper):
-    def visit(self, obj, stagedir, basedir, copy=False):
-        # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
-        loc = obj["location"]
-        if obj["class"] == "Directory":
-            self._pathmap[loc] = MapperEnt(loc, stagedir, "Directory")
-        elif obj["class"] == "File":
-            if loc in self._pathmap:
-                return
-            tgt = os.path.join(stagedir, obj["basename"])
-            self._pathmap[loc] = MapperEnt(loc, tgt, "File")
-            self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
-
+class FinalOutputPathMapper(StagingPathMapper):
+    _follow_dirs = False
     def setup(self, referenced_files, basedir):
         # type: (List[Any], unicode) -> None
         self.visitlisting(referenced_files, self.stagedir, basedir)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 49d37ebd5aec177f0958088983f483123ce113b3..a1142544f5bf2e16150d56dd4d0b707cfd4db984 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -9,9 +9,11 @@ from cStringIO import StringIO
 import cwltool.draft2tool
 from cwltool.draft2tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
+from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
 from cwltool.load_tool import fetch_document
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
+from cwltool.utils import aslist
+from cwltool.builder import substitute
 
 import arvados.collection
 import ruamel.yaml as yaml
@@ -108,6 +110,9 @@ def upload_docker(arvrunner, tool):
     if isinstance(tool, CommandLineTool):
         (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
         if docker_req:
+            if docker_req.get("dockerOutputDirectory"):
+                # TODO: can be supported by containers API, but not jobs API.
+                raise UnsupportedRequirement("Option 'dockerOutputDirectory' of DockerRequirement not supported.")
             arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
@@ -116,13 +121,25 @@ def upload_docker(arvrunner, tool):
 def upload_instance(arvrunner, name, tool, job_order):
         upload_docker(arvrunner, tool)
 
+        for t in tool.tool["inputs"]:
+            def setSecondary(fileobj):
+                if isinstance(fileobj, dict) and fileobj.get("class") == "File":
+                    if "secondaryFiles" not in fileobj:
+                        fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
+
+                if isinstance(fileobj, list):
+                    for e in fileobj:
+                        setSecondary(e)
+
+            if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
+                setSecondary(job_order[shortname(t["id"])])
+
         workflowmapper = upload_dependencies(arvrunner,
                                              name,
                                              tool.doc_loader,
                                              tool.tool,
                                              tool.tool["id"],
                                              True)
-
         jobmapper = upload_dependencies(arvrunner,
                                         os.path.basename(job_order.get("id", "#")),
                                         tool.doc_loader,
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index d1c8f9b567839bb6aaf1e78db2d6855b9a6038c2..9d9a1e1a7acf99f46d61d96de384681da114925a 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -48,7 +48,7 @@ setup(name='arvados-cwl-runner',
       # Make sure to update arvados/build/run-build-packages.sh as well
       # when updating the cwltool version pin.
       install_requires=[
-          'cwltool==1.0.20161007181528',
+          'cwltool==1.0.20161107145355',
           'arvados-python-client>=0.1.20160826210445'
       ],
       data_files=[
diff --git a/sdk/cwl/test_with_arvbox.sh b/sdk/cwl/test_with_arvbox.sh
index 3b16bbcc200819f04386db92b698db5b46f276c8..ee8daa1e15cbfc1147a00839b8d7da9f69c7d1fe 100755
--- a/sdk/cwl/test_with_arvbox.sh
+++ b/sdk/cwl/test_with_arvbox.sh
@@ -9,7 +9,6 @@ fi
 reset_container=1
 leave_running=0
 config=dev
-docker_pull=1
 tag=""
 
 while test -n "$1" ; do
@@ -27,16 +26,12 @@ while test -n "$1" ; do
             config=$2
             shift ; shift
             ;;
-        --no-docker-pull)
-            docker_pull=0
-            shift
-            ;;
         --tag)
             tag=$2
             shift ; shift
             ;;
         -h|--help)
-            echo "$0 [--no-reset-container] [--leave-running] [--no-docker-pull] [--config dev|localdemo] [--tag docker_tag]"
+            echo "$0 [--no-reset-container] [--leave-running] [--config dev|localdemo] [--tag docker_tag]"
             exit
             ;;
         *)
@@ -60,9 +55,11 @@ set -eu -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
 
-cd /usr/src/arvados/sdk/cwl
-python setup.py sdist
-pip_install \$(ls -r dist/arvados-cwl-runner-*.tar.gz | head -n1)
+if test $config = dev ; then
+  cd /usr/src/arvados/sdk/cwl
+  python setup.py sdist
+  pip_install \$(ls -r dist/arvados-cwl-runner-*.tar.gz | head -n1)
+fi
 
 mkdir -p /tmp/cwltest
 cd /tmp/cwltest
@@ -75,10 +72,6 @@ export ARVADOS_API_HOST=localhost:8000
 export ARVADOS_API_HOST_INSECURE=1
 export ARVADOS_API_TOKEN=\$(cat /var/lib/arvados/superuser_token)
 
-if test $docker_pull = 1 ; then
-  arv-keepdocker --pull arvados/jobs $tag
-fi
-
 cat >/tmp/cwltest/arv-cwl-jobs <<EOF2
 #!/bin/sh
 exec arvados-cwl-runner --api=jobs --compute-checksum \\\$@
diff --git a/sdk/cwl/tests/mock_discovery.py b/sdk/cwl/tests/mock_discovery.py
new file mode 100644
index 0000000..f31ff22
--- /dev/null
+++ b/sdk/cwl/tests/mock_discovery.py
@@ -0,0 +1,13 @@
+import json
+import arvados
+
+_rootDesc = None
+
+def get_rootDesc():
+    global _rootDesc
+    if not _rootDesc:
+        try:
+            _rootDesc = arvados.api('v1')._rootDesc
+        except ValueError:
+            raise Exception("Test requires a running API server to fetch the discovery document")
+    return _rootDesc
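
A hedged sketch of how a test can use this helper: fetch the real discovery document once per test run, then attach it to an otherwise-mocked API client (the MagicMock pairing is illustrative; test_job.py below imports get_rootDesc the same way):

    import mock
    from .mock_discovery import get_rootDesc

    api = mock.MagicMock()
    api._rootDesc = get_rootDesc()  # real schema, mocked transport
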
diff --git a/sdk/cwl/tests/test_container.py b/sdk/cwl/tests/test_container.py
index b5499970531008a7474f2ae99fb5de48f1234aa2..bb4bac31dd1767081cdc12a313496a4bb13b4546 100644
--- a/sdk/cwl/tests/test_container.py
+++ b/sdk/cwl/tests/test_container.py
@@ -1,4 +1,5 @@
 import arvados_cwl
+from arvados_cwl.arvdocker import arv_docker_clear_cache
 import logging
 import mock
 import unittest
@@ -20,55 +21,62 @@ class TestContainer(unittest.TestCase):
     # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
     @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
     def test_run(self, keepdocker):
-        runner = mock.MagicMock()
-        runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
-        runner.ignore_docker_for_reuse = False
-
-        keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
-        runner.api.collections().get().execute.return_value = {
-            "portable_data_hash": "99999999999999999999999999999993+99"}
-
-        document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
-
-        tool = {
-            "inputs": [],
-            "outputs": [],
-            "baseCommand": "ls",
-            "arguments": [{"valueFrom": "$(runtime.outdir)"}]
-        }
-        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
-        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
-                                                 basedir="", make_fs_access=make_fs_access, loader=Loader({}))
-        arvtool.formatgraph = None
-        for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run",
-                             make_fs_access=make_fs_access, tmpdir="/tmp"):
-            j.run()
-            runner.api.container_requests().create.assert_called_with(
-                body={
-                    'environment': {
-                        'HOME': '/var/spool/cwl',
-                        'TMPDIR': '/tmp'
-                    },
-                    'name': 'test_run',
-                    'runtime_constraints': {
-                        'vcpus': 1,
-                        'ram': 1073741824
-                    }, 'priority': 1,
-                    'mounts': {
-                        '/var/spool/cwl': {'kind': 'tmp'}
-                    },
-                    'state': 'Committed',
-                    'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
-                    'output_path': '/var/spool/cwl',
-                    'container_image': '99999999999999999999999999999993+99',
-                    'command': ['ls', '/var/spool/cwl'],
-                    'cwd': '/var/spool/cwl'
-                })
+        for enable_reuse in (True, False):
+            arv_docker_clear_cache()
+
+            runner = mock.MagicMock()
+            runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+            runner.ignore_docker_for_reuse = False
+
+            keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+            runner.api.collections().get().execute.return_value = {
+                "portable_data_hash": "99999999999999999999999999999993+99"}
+
+            document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+
+            tool = {
+                "inputs": [],
+                "outputs": [],
+                "baseCommand": "ls",
+                "arguments": [{"valueFrom": "$(runtime.outdir)"}]
+            }
+            make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+            arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
+                                                     basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+            arvtool.formatgraph = None
+            for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_"+str(enable_reuse),
+                                 make_fs_access=make_fs_access, tmpdir="/tmp"):
+                j.run(enable_reuse=enable_reuse)
+                runner.api.container_requests().create.assert_called_with(
+                    body={
+                        'environment': {
+                            'HOME': '/var/spool/cwl',
+                            'TMPDIR': '/tmp'
+                        },
+                        'name': 'test_run_'+str(enable_reuse),
+                        'runtime_constraints': {
+                            'vcpus': 1,
+                            'ram': 1073741824
+                        },
+                        'use_existing': enable_reuse,
+                        'priority': 1,
+                        'mounts': {
+                            '/var/spool/cwl': {'kind': 'tmp'}
+                        },
+                        'state': 'Committed',
+                        'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+                        'output_path': '/var/spool/cwl',
+                        'container_image': '99999999999999999999999999999993+99',
+                        'command': ['ls', '/var/spool/cwl'],
+                        'cwd': '/var/spool/cwl',
+                        'scheduling_parameters': {}
+                    })
 
     # The test passes some fields in builder.resources
     # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
     @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
     def test_resource_requirements(self, keepdocker):
+        arv_docker_clear_cache()
         runner = mock.MagicMock()
         runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
         runner.ignore_docker_for_reuse = False
@@ -106,8 +114,9 @@ class TestContainer(unittest.TestCase):
                              make_fs_access=make_fs_access, tmpdir="/tmp"):
             j.run()
 
-        runner.api.container_requests().create.assert_called_with(
-            body={
+        call_args, call_kwargs = runner.api.container_requests().create.call_args
+
+        call_body_expected = {
                 'environment': {
                     'HOME': '/var/spool/cwl',
                     'TMPDIR': '/tmp'
@@ -116,9 +125,11 @@ class TestContainer(unittest.TestCase):
                 'runtime_constraints': {
                     'vcpus': 3,
                     'ram': 3145728000,
-                    'API': True,
-                    'partition': ['blurb']
-                }, 'priority': 1,
+                    'keep_cache_ram': 512,
+                    'API': True
+                },
+                'use_existing': True,
+                'priority': 1,
                 'mounts': {
                     '/var/spool/cwl': {'kind': 'tmp'}
                 },
@@ -127,8 +138,16 @@ class TestContainer(unittest.TestCase):
                 'output_path': '/var/spool/cwl',
                 'container_image': '99999999999999999999999999999993+99',
                 'command': ['ls'],
-                'cwd': '/var/spool/cwl'
-            })
+                'cwd': '/var/spool/cwl',
+                'scheduling_parameters': {
+                    'partitions': ['blurb']
+                }
+        }
+
+        call_body = call_kwargs.get('body', None)
+        self.assertNotEqual(None, call_body)
+        for key in call_body:
+            self.assertEqual(call_body_expected.get(key), call_body.get(key))
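
Checking the request body key by key, rather than with a single
assert_called_with over the whole dict, makes a failing test name the
specific mismatched field instead of dumping two large dictionaries. The
same pattern in isolation (a minimal sketch, assuming a mock with one
recorded call):

    import mock

    create = mock.MagicMock()
    create(body={'state': 'Committed', 'priority': 1})

    expected = {'state': 'Committed', 'priority': 1}
    call_args, call_kwargs = create.call_args
    body = call_kwargs.get('body', None)
    assert body is not None
    # One comparison per key: a mismatch points at the offending field.
    for key in body:
        assert expected.get(key) == body.get(key), key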
 
     @mock.patch("arvados.collection.Collection")
     def test_done(self, col):
index 93b5d39ffa6eb7e45efba47c64b69fb7a95a33d8..c8813adf7ea48d71f5b5b8cb9d7c091bfafa09c0 100644 (file)
@@ -4,71 +4,74 @@ import logging
 import mock
 import os
 import unittest
+import copy
+import StringIO
 
 import arvados
 import arvados_cwl
 import cwltool.process
 from schema_salad.ref_resolver import Loader
+from .mock_discovery import get_rootDesc
 
 if not os.getenv('ARVADOS_DEBUG'):
     logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
     logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
-
 class TestJob(unittest.TestCase):
 
     # The test passes no builder.resources
     # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
     @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
     def test_run(self, list_images_in_arv):
-        runner = mock.MagicMock()
-        runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
-        runner.ignore_docker_for_reuse = False
-        runner.num_retries = 0
-        document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
-
-        list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
-        runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
-
-        tool = {
-            "inputs": [],
-            "outputs": [],
-            "baseCommand": "ls",
-            "arguments": [{"valueFrom": "$(runtime.outdir)"}]
-        }
-        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
-        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
-                                                 basedir="", make_fs_access=make_fs_access, loader=Loader({}))
-        arvtool.formatgraph = None
-        for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
-            j.run()
-            runner.api.jobs().create.assert_called_with(
-                body={
-                    'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
-                    'runtime_constraints': {},
-                    'script_parameters': {
-                        'tasks': [{
-                            'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
-                            'command': ['ls', '$(task.outdir)']
-                        }],
+        for enable_reuse in (True, False):
+            runner = mock.MagicMock()
+            runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+            runner.ignore_docker_for_reuse = False
+            runner.num_retries = 0
+            document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+
+            list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
+            runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
+
+            tool = {
+                "inputs": [],
+                "outputs": [],
+                "baseCommand": "ls",
+                "arguments": [{"valueFrom": "$(runtime.outdir)"}]
+            }
+            make_fs_access = functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+            arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
+                                                     basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+            arvtool.formatgraph = None
+            for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
+                j.run(enable_reuse=enable_reuse)
+                runner.api.jobs().create.assert_called_with(
+                    body={
+                        'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+                        'runtime_constraints': {},
+                        'script_parameters': {
+                            'tasks': [{
+                                'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
+                                'command': ['ls', '$(task.outdir)']
+                            }],
+                        },
+                        'script_version': 'master',
+                        'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
+                        'repository': 'arvados',
+                        'script': 'crunchrunner',
+                        'runtime_constraints': {
+                            'docker_image': 'arvados/jobs:'+arvados_cwl.__version__,
+                            'min_cores_per_node': 1,
+                            'min_ram_mb_per_node': 1024,
+                            'min_scratch_mb_per_node': 2048 # tmpdirSize + outdirSize
+                        }
                     },
-                    'script_version': 'master',
-                    'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
-                    'repository': 'arvados',
-                    'script': 'crunchrunner',
-                    'runtime_constraints': {
-                        'docker_image': 'arvados/jobs:'+arvados_cwl.__version__,
-                        'min_cores_per_node': 1,
-                        'min_ram_mb_per_node': 1024,
-                        'min_scratch_mb_per_node': 2048 # tmpdirSize + outdirSize
-                    }
-                },
-                find_or_create=True,
-                filters=[['repository', '=', 'arvados'],
-                         ['script', '=', 'crunchrunner'],
-                         ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
-                         ['docker_image_locator', 'in docker', 'arvados/jobs:'+arvados_cwl.__version__]]
-            )
+                    find_or_create=enable_reuse,
+                    filters=[['repository', '=', 'arvados'],
+                             ['script', '=', 'crunchrunner'],
+                             ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
+                             ['docker_image_locator', 'in docker', 'arvados/jobs:'+arvados_cwl.__version__]]
+                )
 
     # The test passes some fields in builder.resources
     # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@@ -148,7 +151,11 @@ class TestJob(unittest.TestCase):
         runner.num_retries = 0
         runner.ignore_docker_for_reuse = False
 
-        reader().open.return_value = []
+        reader().open.return_value = StringIO.StringIO(
+            """2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
+2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
+2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
+        """)
         api.collections().list().execute.side_effect = ({"items": []},
                                                         {"items": [{"manifest_text": "XYZ"}]})
 
@@ -191,7 +198,12 @@ class TestJob(unittest.TestCase):
         runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
         runner.num_retries = 0
 
-        reader().open.return_value = []
+        reader().open.return_value = StringIO.StringIO(
+            """2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
+2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
+2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
+        """)
+
         api.collections().list().execute.side_effect = ({"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},)
 
         arvjob = arvados_cwl.ArvadosJob(runner)
@@ -226,7 +238,8 @@ class TestWorkflow(unittest.TestCase):
         arvados_cwl.add_arv_hints()
 
         api = mock.MagicMock()
-        api._rootDesc = arvados.api('v1')._rootDesc
+        api._rootDesc = get_rootDesc()
+
         runner = arvados_cwl.ArvCwlRunner(api)
         self.assertEqual(runner.work_api, 'jobs')
 
@@ -291,7 +304,7 @@ class TestWorkflow(unittest.TestCase):
         arvados_cwl.add_arv_hints()
 
         api = mock.MagicMock()
-        api._rootDesc = arvados.api('v1')._rootDesc
+        api._rootDesc = copy.deepcopy(get_rootDesc())
         del api._rootDesc.get('resources')['jobs']['methods']['create']
         runner = arvados_cwl.ArvCwlRunner(api)
         self.assertEqual(runner.work_api, 'containers')
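
Swapping arvados.api('v1')._rootDesc for get_rootDesc() means work_api
detection can be tested without a live API server: the discovery document
comes from a local fixture instead of the network. A plausible shape for
such a helper (a sketch only; the fixture filename is an assumption, not
the actual contents of tests/mock_discovery.py):

    import json
    import os

    def get_rootDesc():
        # Hypothetical fixture: a discovery document captured once from a
        # running API server and checked in next to the tests.
        fixture = os.path.join(os.path.dirname(__file__), "discovery.json")
        with open(fixture) as f:
            return json.load(f)
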
index cd66eb15065059579e718150372d1f0c03247688..3228ad77b3ca9343c0d6ff736b0714c23acd060b 100644 (file)
@@ -8,11 +8,12 @@ import unittest
 
 import arvados
 import arvados_cwl
+from .mock_discovery import get_rootDesc
 
 class TestMakeOutput(unittest.TestCase):
     def setUp(self):
         self.api = mock.MagicMock()
-        self.api._rootDesc = arvados.api('v1')._rootDesc
+        self.api._rootDesc = get_rootDesc()
 
     @mock.patch("arvados.collection.Collection")
     @mock.patch("arvados.collection.CollectionReader")
@@ -31,7 +32,7 @@ class TestMakeOutput(unittest.TestCase):
         final.open.return_value = openmock
         openmock.__enter__.return_value = cwlout
 
-        runner.make_output_collection("Test output", {
+        _, runner.final_output_collection = runner.make_output_collection("Test output", {
             "foo": {
                 "class": "File",
                 "location": "keep:99999999999999999999999999999991+99/foo.txt",
@@ -41,7 +42,8 @@ class TestMakeOutput(unittest.TestCase):
             "bar": {
                 "class": "File",
                 "location": "keep:99999999999999999999999999999992+99/bar.txt",
-                "basename": "baz.txt"
+                "basename": "baz.txt",
+                "size": 4
             }
         })
 
@@ -51,11 +53,13 @@ class TestMakeOutput(unittest.TestCase):
         self.assertEqual("""{
     "bar": {
         "class": "File",
-        "location": "baz.txt"
+        "location": "baz.txt",
+        "size": 4
     },
     "foo": {
         "class": "File",
-        "location": "foo.txt"
+        "location": "foo.txt",
+        "size": 3
     }
 }""", cwlout.getvalue())
 
index 57958f78d0a41a97b9d5b0aa5f10a2cf10563b22..3b6af04b293e8f48e320fa6508716d3c6d27faf6 100644 (file)
@@ -12,6 +12,7 @@ import arvados.collection
 import arvados_cwl
 
 from cwltool.pathmapper import MapperEnt
+from .mock_discovery import get_rootDesc
 
 from arvados_cwl.pathmapper import ArvPathMapper
 
@@ -23,7 +24,7 @@ def upload_mock(files, api, dry_run=False, num_retries=0, project=None, fnPatter
 class TestPathmap(unittest.TestCase):
     def setUp(self):
         self.api = mock.MagicMock()
-        self.api._rootDesc = arvados.api('v1')._rootDesc
+        self.api._rootDesc = get_rootDesc()
 
     def test_keepref(self):
         """Test direct keep references."""
index 7faef6992c37b3d4212db8a58a5606a2e0b0eed8..c195b03916992561f5e52b1d970c3cabd778df30 100644 (file)
@@ -11,35 +11,42 @@ import unittest
 import arvados
 import arvados.collection
 import arvados_cwl
+import arvados_cwl.runner
 import arvados.keep
 
 from .matcher import JsonDiffMatcher
+from .mock_discovery import get_rootDesc
 
+_rootDesc = None
 
 def stubs(func):
     @functools.wraps(func)
     @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
     @mock.patch("arvados.collection.KeepClient")
+    @mock.patch("arvados.keep.KeepClient")
     @mock.patch("arvados.events.subscribe")
-    def wrapped(self, events, keep_client, keepdocker, *args, **kwargs):
+    def wrapped(self, events, keep_client1, keep_client2, keepdocker, *args, **kwargs):
         class Stubs:
             pass
         stubs = Stubs()
         stubs.events = events
         stubs.keepdocker = keepdocker
-        stubs.keep_client = keep_client
+
 
         def putstub(p, **kwargs):
             return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p))
-        stubs.keep_client().put.side_effect = putstub
-        stubs.keep_client.put.side_effect = putstub
+        keep_client1().put.side_effect = putstub
+        keep_client1.put.side_effect = putstub
+        keep_client2().put.side_effect = putstub
+        keep_client2.put.side_effect = putstub
 
+        stubs.keep_client = keep_client2
         stubs.keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
         stubs.fake_user_uuid = "zzzzz-tpzed-zzzzzzzzzzzzzzz"
 
-
         stubs.api = mock.MagicMock()
-        stubs.api._rootDesc = arvados.api('v1')._rootDesc
+        stubs.api._rootDesc = get_rootDesc()
+
         stubs.api.users().current().execute.return_value = {
             "uuid": stubs.fake_user_uuid,
         }
@@ -134,7 +141,8 @@ def stubs(func):
                               'listing': [
                                   {'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
                               ]}},
-                        'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+                        'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl',
+                        'arv:enable_reuse': True
                     },
                     'repository': 'arvados',
                     'script_version': arvados_cwl.__version__,
@@ -176,7 +184,7 @@ def stubs(func):
             },
             'state': 'Committed',
             'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
-            'command': ['arvados-cwl-runner', '--local', '--api=containers', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json'],
+            'command': ['arvados-cwl-runner', '--local', '--api=containers', '--enable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json'],
             'name': 'submit_wf.cwl',
             'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
             'output_path': '/var/spool/cwl',
@@ -241,6 +249,26 @@ class TestSubmit(unittest.TestCase):
         self.assertEqual(capture_stdout.getvalue(),
                          stubs.expect_pipeline_uuid + '\n')
 
+
+    @mock.patch("time.sleep")
+    @stubs
+    def test_submit_no_reuse(self, stubs, tm):
+        capture_stdout = cStringIO.StringIO()
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--debug", "--disable-reuse",
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+        stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:enable_reuse"] = {"value": False}
+
+        expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+        expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
+        stubs.api.pipeline_instances().create.assert_called_with(
+            body=expect_pipeline)
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_pipeline_uuid + '\n')
+
     @mock.patch("time.sleep")
     @stubs
     def test_submit_with_project_uuid(self, stubs, tm):
@@ -303,6 +331,69 @@ class TestSubmit(unittest.TestCase):
         self.assertEqual(capture_stdout.getvalue(),
                          stubs.expect_container_request_uuid + '\n')
 
+    @stubs
+    def test_submit_container_no_reuse(self, stubs):
+        capture_stdout = cStringIO.StringIO()
+        try:
+            exited = arvados_cwl.main(
+                ["--submit", "--no-wait", "--api=containers", "--debug", "--disable-reuse",
+                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+            self.assertEqual(exited, 0)
+        except:
+            logging.exception("")
+
+        stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--disable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json']
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        expect_container["owner_uuid"] = stubs.fake_user_uuid
+        stubs.api.container_requests().create.assert_called_with(
+            body=expect_container)
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
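Both submit paths derive their behavior from one boolean: the jobs API
records it as the arv:enable_reuse script parameter, while the containers
API re-emits --enable-reuse or --disable-reuse on the arvados-cwl-runner
command line inside the container. One way such a flag pair could be wired
(a sketch of an argparse setup, an assumption rather than a quote from
arvados_cwl):

    import argparse

    parser = argparse.ArgumentParser()
    # Two mutually exclusive flags sharing one destination; reuse is the
    # default when neither flag is given.
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--enable-reuse", action="store_true",
                       dest="enable_reuse", default=True)
    group.add_argument("--disable-reuse", action="store_false",
                       dest="enable_reuse")

    args = parser.parse_args(["--disable-reuse"])
    assert args.enable_reuse is False
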
+    @mock.patch("arvados.commands.keepdocker.find_one_image_hash")
+    @mock.patch("cwltool.docker.get_image")
+    @mock.patch("arvados.api")
+    def test_arvados_jobs_image(self, api, get_image, find_one_image_hash):
+        arvrunner = mock.MagicMock()
+        arvrunner.project_uuid = ""
+        api.return_value = mock.MagicMock()
+        arvrunner.api = api.return_value
+        arvrunner.api.links().list().execute.side_effect = ({"items": [], "items_available": 0, "offset": 0},
+                                                            {"items": [], "items_available": 0, "offset": 0},
+                                                            {"items": [], "items_available": 0, "offset": 0},
+                                                            {"items": [{"created_at": "",
+                                                                        "head_uuid": "",
+                                                                        "link_class": "docker_image_hash",
+                                                                        "name": "123456",
+                                                                        "owner_uuid": "",
+                                                                        "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
+                                                            {"items": [], "items_available": 0, "offset": 0},
+                                                            {"items": [{"created_at": "",
+                                                                        "head_uuid": "",
+                                                                        "link_class": "docker_image_repo+tag",
+                                                                        "name": "arvados/jobs:"+arvados_cwl.__version__,
+                                                                        "owner_uuid": "",
+                                                                        "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
+                                                            {"items": [{"created_at": "",
+                                                                        "head_uuid": "",
+                                                                        "link_class": "docker_image_hash",
+                                                                        "name": "123456",
+                                                                        "owner_uuid": "",
+                                                                        "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
+        )
+        find_one_image_hash.return_value = "123456"
+
+        arvrunner.api.collections().list().execute.side_effect = ({"items": [], "items_available": 0, "offset": 0},
+                                                                  {"items": [{"uuid": "",
+                                                                              "owner_uuid": "",
+                                                                              "manifest_text": "",
+                                                                              "properties": ""
+                                                                          }], "items_available": 1, "offset": 0},
+                                                                  {"items": [{"uuid": ""}], "items_available": 1, "offset": 0})
+        arvrunner.api.collections().create().execute.return_value = {"uuid": ""}
+        self.assertEqual("arvados/jobs:"+arvados_cwl.__version__, arvados_cwl.runner.arvados_jobs_image(arvrunner))
 
 class TestCreateTemplate(unittest.TestCase):
     @stubs
index e5050cb79c944673343878d559b75fd3e97807ae..61c14ea0b6c1d445bb2a26fb83a57614e0b240f9 100644 (file)
@@ -2,18 +2,19 @@ package arvados
 
 // Container is an arvados#container resource.
 type Container struct {
-       UUID               string             `json:"uuid"`
-       Command            []string           `json:"command"`
-       ContainerImage     string             `json:"container_image"`
-       Cwd                string             `json:"cwd"`
-       Environment        map[string]string  `json:"environment"`
-       LockedByUUID       string             `json:"locked_by_uuid"`
-       Mounts             map[string]Mount   `json:"mounts"`
-       Output             string             `json:"output"`
-       OutputPath         string             `json:"output_path"`
-       Priority           int                `json:"priority"`
-       RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
-       State              ContainerState     `json:"state"`
+       UUID                 string               `json:"uuid"`
+       Command              []string             `json:"command"`
+       ContainerImage       string               `json:"container_image"`
+       Cwd                  string               `json:"cwd"`
+       Environment          map[string]string    `json:"environment"`
+       LockedByUUID         string               `json:"locked_by_uuid"`
+       Mounts               map[string]Mount     `json:"mounts"`
+       Output               string               `json:"output"`
+       OutputPath           string               `json:"output_path"`
+       Priority             int                  `json:"priority"`
+       RuntimeConstraints   RuntimeConstraints   `json:"runtime_constraints"`
+       State                ContainerState       `json:"state"`
+       SchedulingParameters SchedulingParameters `json:"scheduling_parameters"`
 }
 
 // Mount is special behavior to attach to a filesystem path or device.
@@ -30,10 +31,16 @@ type Mount struct {
 // RuntimeConstraints specify a container's compute resources (RAM,
 // CPU) and network connectivity.
 type RuntimeConstraints struct {
-       API       *bool
-       RAM       int      `json:"ram"`
-       VCPUs     int      `json:"vcpus"`
-       Partition []string `json:"partition"`
+       API          *bool
+       RAM          int `json:"ram"`
+       VCPUs        int `json:"vcpus"`
+       KeepCacheRAM int `json:"keep_cache_ram"`
+}
+
+// SchedulingParameters specify a container's scheduling parameters
+// such as Partitions.
+type SchedulingParameters struct {
+       Partitions []string `json:"partitions"`
 }
 
 // ContainerList is an arvados#containerList resource.
index c394dab810715c2659b6f72f8f5f1e173d711ead..610fd7dc1317b6f0a6af7672d148766dbf9ce961 100644 (file)
@@ -417,6 +417,8 @@ class _BlockManager(object):
         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
+        self._pending_write_size = 0
+        self.threads_lock = threading.Lock()
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -482,28 +484,28 @@ class _BlockManager(object):
                 if self._put_queue is not None:
                     self._put_queue.task_done()
 
-    @synchronized
     def start_put_threads(self):
-        if self._put_threads is None:
-            # Start uploader threads.
-
-            # If we don't limit the Queue size, the upload queue can quickly
-            # grow to take up gigabytes of RAM if the writing process is
-            # generating data more quickly than it can be send to the Keep
-            # servers.
-            #
-            # With two upload threads and a queue size of 2, this means up to 4
-            # blocks pending.  If they are full 64 MiB blocks, that means up to
-            # 256 MiB of internal buffering, which is the same size as the
-            # default download block cache in KeepClient.
-            self._put_queue = Queue.Queue(maxsize=2)
-
-            self._put_threads = []
-            for i in xrange(0, self.num_put_threads):
-                thread = threading.Thread(target=self._commit_bufferblock_worker)
-                self._put_threads.append(thread)
-                thread.daemon = True
-                thread.start()
+        with self.threads_lock:
+            if self._put_threads is None:
+                # Start uploader threads.
+
+                # If we don't limit the Queue size, the upload queue can quickly
+                # grow to take up gigabytes of RAM if the writing process is
+                # generating data more quickly than it can be sent to the Keep
+                # servers.
+                #
+                # With two upload threads and a queue size of 2, this means up to 4
+                # blocks pending.  If they are full 64 MiB blocks, that means up to
+                # 256 MiB of internal buffering, which is the same size as the
+                # default download block cache in KeepClient.
+                self._put_queue = Queue.Queue(maxsize=2)
+
+                self._put_threads = []
+                for i in xrange(0, self.num_put_threads):
+                    thread = threading.Thread(target=self._commit_bufferblock_worker)
+                    self._put_threads.append(thread)
+                    thread.daemon = True
+                    thread.start()
 
     def _block_prefetch_worker(self):
         """The background downloader thread."""
@@ -555,24 +557,34 @@ class _BlockManager(object):
         self.stop_threads()
 
     @synchronized
-    def repack_small_blocks(self, force=False, sync=False):
+    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
         """Packs small blocks together before uploading"""
-        # Search blocks ready for getting packed together before being committed to Keep.
-        # A WRITABLE block always has an owner.
-        # A WRITABLE block with its owner.closed() implies that it's
-        # size is <= KEEP_BLOCK_SIZE/2.
-        small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
-        if len(small_blocks) <= 1:
-            # Not enough small blocks for repacking
-            return
+        self._pending_write_size += closed_file_size
 
         # Check if there are enough small blocks for filling up one in full
-        pending_write_size = sum([b.size() for b in small_blocks])
-        if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
+        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+
+            # Search blocks ready for getting packed together before being committed to Keep.
+            # A WRITABLE block always has an owner.
+            # A WRITABLE block with its owner.closed() implies that its
+            # size is <= KEEP_BLOCK_SIZE/2.
+            small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+
+            if len(small_blocks) <= 1:
+                # Not enough small blocks for repacking
+                return
+
+            # Update the pending write size count with its true value, just in case
+            # some small file was opened, written and closed several times.
+            self._pending_write_size = sum([b.size() for b in small_blocks])
+            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+                return
+
             new_bb = self._alloc_bufferblock()
             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                 bb = small_blocks.pop(0)
                 arvfile = bb.owner
+                self._pending_write_size -= bb.size()
                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                 arvfile.set_segments([Range(new_bb.blockid,
                                             0,
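
The running _pending_write_size counter amortizes the repack check:
close() no longer triggers a scan of every buffer block, only an integer
add, and the O(n) scan (which then recomputes the true total) runs once
enough closed bytes have accumulated to plausibly fill a block. A toy
model of that accounting:

    KEEP_BLOCK_SIZE = 64 * 1024 * 1024

    class RepackModel(object):
        def __init__(self):
            self.pending = 0   # bytes closed but not yet repacked
            self.scans = 0     # times the expensive scan actually ran

        def on_file_close(self, size):
            self.pending += size
            if self.pending >= KEEP_BLOCK_SIZE:
                self.scans += 1    # only now walk all buffer blocks
                self.pending = 0   # small blocks packed into a full one

    m = RepackModel()
    for _ in range(4096):          # 4096 closes of 32 KiB files
        m.on_file_close(32 * 1024)
    assert m.scans == 2            # two scans instead of 4096
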
@@ -846,7 +858,7 @@ class ArvadosFile(object):
             self.flush()
         elif self.closed():
             # All writers closed and size is adequate for repacking
-            self.parent._my_block_manager().repack_small_blocks()
+            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
 
     def closed(self):
         """
index 5cb699f49f7a21eb8d7789a52bd0aea7bdd056f0..34cef6725500e6cb5cd12ec317aa1be8eeb4bd80 100644 (file)
@@ -14,10 +14,14 @@ import hashlib
 import json
 import os
 import pwd
+import time
 import signal
 import socket
 import sys
 import tempfile
+import threading
+import copy
+import logging
 from apiclient import errors as apiclient_errors
 
 import arvados.commands._util as arv_cmd
@@ -276,79 +280,343 @@ class ResumeCache(object):
         self.__init__(self.filename)
 
 
-class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
-    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
-                   ['bytes_written', '_seen_inputs'])
-
-    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
-        self.bytes_written = 0
-        self._seen_inputs = []
-        self.cache = cache
+class ArvPutUploadJob(object):
+    CACHE_DIR = '.cache/arvados/arv-put'
+    EMPTY_STATE = {
+        'manifest' : None, # Last saved manifest checkpoint
+        'files' : {} # Previous run file list: {path : {size, mtime}}
+    }
+
+    def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
+                 name=None, owner_uuid=None, ensure_unique_name=False,
+                 num_retries=None, replication_desired=None,
+                 filename=None, update_time=1.0):
+        self.paths = paths
+        self.resume = resume
         self.reporter = reporter
         self.bytes_expected = bytes_expected
-        super(ArvPutCollectionWriter, self).__init__(**kwargs)
-
-    @classmethod
-    def from_cache(cls, cache, reporter=None, bytes_expected=None,
-                   num_retries=0, replication=0):
+        self.bytes_written = 0
+        self.bytes_skipped = 0
+        self.name = name
+        self.owner_uuid = owner_uuid
+        self.ensure_unique_name = ensure_unique_name
+        self.num_retries = num_retries
+        self.replication_desired = replication_desired
+        self.filename = filename
+        self._state_lock = threading.Lock()
+        self._state = None # Previous run state (file list & manifest)
+        self._current_files = [] # Current run file list
+        self._cache_file = None
+        self._collection = None
+        self._collection_lock = threading.Lock()
+        self._stop_checkpointer = threading.Event()
+        self._checkpointer = threading.Thread(target=self._update_task)
+        self._update_task_time = update_time  # How many seconds to wait between update runs
+        self.logger = logging.getLogger('arvados.arv_put')
+        # Load cached data if any and if needed
+        self._setup_state()
+
+    def start(self):
+        """
+        Start supporting thread & file uploading
+        """
+        self._checkpointer.daemon = True
+        self._checkpointer.start()
         try:
-            state = cache.load()
-            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
-            writer = cls.from_state(state, cache, reporter, bytes_expected,
-                                    num_retries=num_retries,
-                                    replication=replication)
-        except (TypeError, ValueError,
-                arvados.errors.StaleWriterStateError) as error:
-            return cls(cache, reporter, bytes_expected,
-                       num_retries=num_retries,
-                       replication=replication)
-        else:
-            return writer
-
-    def cache_state(self):
-        if self.cache is None:
-            return
-        state = self.dump_state()
-        # Transform attributes for serialization.
-        for attr, value in state.items():
-            if attr == '_data_buffer':
-                state[attr] = base64.encodestring(''.join(value))
-            elif hasattr(value, 'popleft'):
-                state[attr] = list(value)
-        self.cache.save(state)
+            for path in self.paths:
+                # Test for stdin first, in case some file named '-' exists
+                if path == '-':
+                    self._write_stdin(self.filename or 'stdin')
+                elif os.path.isdir(path):
+                    self._write_directory_tree(path)
+                else:
+                    self._write_file(path, self.filename or os.path.basename(path))
+        finally:
+            # Stop the thread before doing anything else
+            self._stop_checkpointer.set()
+            self._checkpointer.join()
+            # Commit all & one last _update()
+            self.manifest_text()
+            self._update()
+            if self.resume:
+                self._cache_file.close()
+                # Correct the final written bytes count
+                self.bytes_written -= self.bytes_skipped
+
+    def save_collection(self):
+        with self._collection_lock:
+            self._my_collection().save_new(
+                name=self.name, owner_uuid=self.owner_uuid,
+                ensure_unique_name=self.ensure_unique_name,
+                num_retries=self.num_retries)
+
+    def destroy_cache(self):
+        if self.resume:
+            try:
+                os.unlink(self._cache_filename)
+            except OSError as error:
+                # That's what we wanted anyway.
+                if error.errno != errno.ENOENT:
+                    raise
+            self._cache_file.close()
+
+    def _collection_size(self, collection):
+        """
+        Recursively get the total size of the collection
+        """
+        size = 0
+        for item in collection.values():
+            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
+                size += self._collection_size(item)
+            else:
+                size += item.size()
+        return size
+
+    def _update_task(self):
+        """
+        Periodically called support task. File uploading is
+        asynchronous, so we poll status from the collection.
+        """
+        while not self._stop_checkpointer.wait(self._update_task_time):
+            self._update()
+
+    def _update(self):
+        """
+        Update cached manifest text and report progress.
+        """
+        with self._collection_lock:
+            self.bytes_written = self._collection_size(self._my_collection())
+            # Update cache, if resume enabled
+            if self.resume:
+                with self._state_lock:
+                    # Get the manifest text without committing pending blocks
+                    self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
+                self._save_state()
+        # Call the reporter, if any
+        self.report_progress()
 
     def report_progress(self):
         if self.reporter is not None:
             self.reporter(self.bytes_written, self.bytes_expected)
 
-    def flush_data(self):
-        start_buffer_len = self._data_buffer_len
-        start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
-        super(ArvPutCollectionWriter, self).flush_data()
-        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
-            self.bytes_written += (start_buffer_len - self._data_buffer_len)
-            self.report_progress()
-            if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
-                self.cache_state()
-
-    def _record_new_input(self, input_type, source_name, dest_name):
-        # The key needs to be a list because that's what we'll get back
-        # from JSON deserialization.
-        key = [input_type, source_name, dest_name]
-        if key in self._seen_inputs:
-            return False
-        self._seen_inputs.append(key)
-        return True
-
-    def write_file(self, source, filename=None):
-        if self._record_new_input('file', source, filename):
-            super(ArvPutCollectionWriter, self).write_file(source, filename)
-
-    def write_directory_tree(self,
-                             path, stream_name='.', max_manifest_depth=-1):
-        if self._record_new_input('directory', path, stream_name):
-            super(ArvPutCollectionWriter, self).write_directory_tree(
-                path, stream_name, max_manifest_depth)
+    def _write_directory_tree(self, path, stream_name="."):
+        # TODO: Check what happens when multiple directories are passed as
+        # arguments.
+        # If the code below is uncommented, integration test
+        # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
+        # fails, presumably because the manifest_uuid changes when the
+        # directory is added to stream_name.
+
+        # if stream_name == '.':
+        #     stream_name = os.path.join('.', os.path.basename(path))
+        for item in os.listdir(path):
+            if os.path.isdir(os.path.join(path, item)):
+                self._write_directory_tree(os.path.join(path, item),
+                                os.path.join(stream_name, item))
+            else:
+                self._write_file(os.path.join(path, item),
+                                os.path.join(stream_name, item))
+
+    def _write_stdin(self, filename):
+        with self._collection_lock:
+            output = self._my_collection().open(filename, 'w')
+        self._write(sys.stdin, output)
+        output.close()
+
+    def _write_file(self, source, filename):
+        resume_offset = 0
+        if self.resume:
+            # Check if file was already uploaded (at least partially)
+            with self._collection_lock:
+                try:
+                    file_in_collection = self._my_collection().find(filename)
+                except IOError:
+                    # Not found
+                    file_in_collection = None
+            # If there's no cached data for this file yet, record it so a
+            # future resumed run can detect changes.
+            if source not in self._state['files']:
+                with self._state_lock:
+                    self._state['files'][source] = {
+                        'mtime': os.path.getmtime(source),
+                        'size' : os.path.getsize(source)
+                    }
+            with self._state_lock:
+                cached_file_data = self._state['files'][source]
+            # See if this file was already uploaded at least partially
+            if file_in_collection:
+                if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
+                    if cached_file_data['size'] == file_in_collection.size():
+                        # File already there, skip it.
+                        self.bytes_skipped += cached_file_data['size']
+                        return
+                    elif cached_file_data['size'] > file_in_collection.size():
+                        # File partially uploaded, resume!
+                        resume_offset = file_in_collection.size()
+                    else:
+                        # Inconsistent cache, re-upload the file
+                        self.logger.warning("Uploaded version of file '{}' is larger than the local version; re-uploading it from scratch.".format(source))
+                else:
+                    # Local file differs from cached data, re-upload it
+                    pass
+        with open(source, 'r') as source_fd:
+            if resume_offset > 0:
+                # Start upload where we left off
+                with self._collection_lock:
+                    output = self._my_collection().open(filename, 'a')
+                source_fd.seek(resume_offset)
+                self.bytes_skipped += resume_offset
+            else:
+                # Start from scratch
+                with self._collection_lock:
+                    output = self._my_collection().open(filename, 'w')
+            self._write(source_fd, output)
+            output.close(flush=False)
+
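The resume check above reduces to a three-way decision on the cached size
versus what is already in the collection: skip when they match, append
from an offset when the collection copy is shorter, and start over
otherwise. Compactly (a sketch mirroring the branches above, for a file
whose mtime and size still match the cache):

    def resume_action(cached_size, uploaded_size):
        if cached_size == uploaded_size:
            return 'skip'                     # already fully uploaded
        elif cached_size > uploaded_size:
            return ('resume', uploaded_size)  # seek past what Keep has
        else:
            return 'reupload'                 # inconsistent cache

    assert resume_action(100, 100) == 'skip'
    assert resume_action(100, 40) == ('resume', 40)
    assert resume_action(100, 120) == 'reupload'
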
+    def _write(self, source_fd, output):
+        first_read = True
+        while True:
+            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
+            # Allow an empty file to be written
+            if not data and not first_read:
+                break
+            if first_read:
+                first_read = False
+            output.write(data)
+
+    def _my_collection(self):
+        """
+        Create a new collection if none cached. Load it from cache otherwise.
+        """
+        if self._collection is None:
+            with self._state_lock:
+                manifest = self._state['manifest']
+            if self.resume and manifest is not None:
+                # Create collection from saved state
+                self._collection = arvados.collection.Collection(
+                    manifest,
+                    replication_desired=self.replication_desired)
+            else:
+                # Create new collection
+                self._collection = arvados.collection.Collection(
+                    replication_desired=self.replication_desired)
+        return self._collection
+
+    def _setup_state(self):
+        """
+        Create a new cache file or load a previously existing one.
+        """
+        if self.resume:
+            md5 = hashlib.md5()
+            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+            realpaths = sorted(os.path.realpath(path) for path in self.paths)
+            md5.update('\0'.join(realpaths))
+            if self.filename:
+                md5.update(self.filename)
+            cache_filename = md5.hexdigest()
+            self._cache_file = open(os.path.join(
+                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+                cache_filename), 'a+')
+            self._cache_filename = self._cache_file.name
+            self._lock_file(self._cache_file)
+            self._cache_file.seek(0)
+            with self._state_lock:
+                try:
+                    self._state = json.load(self._cache_file)
+                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+                        # Cache at least partially incomplete, set up new cache
+                        self._state = copy.deepcopy(self.EMPTY_STATE)
+                except ValueError:
+                    # Cache file empty, set up new cache
+                    self._state = copy.deepcopy(self.EMPTY_STATE)
+            # Load how many bytes were uploaded on previous run
+            with self._collection_lock:
+                self.bytes_written = self._collection_size(self._my_collection())
+        # No resume required
+        else:
+            with self._state_lock:
+                self._state = copy.deepcopy(self.EMPTY_STATE)
+
+    def _lock_file(self, fileobj):
+        try:
+            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
+        except IOError:
+            raise ResumeCacheConflict("{} locked".format(fileobj.name))
+
+    def _save_state(self):
+        """
+        Atomically save current state into cache.
+        """
+        try:
+            with self._state_lock:
+                state = self._state
+            new_cache_fd, new_cache_name = tempfile.mkstemp(
+                dir=os.path.dirname(self._cache_filename))
+            self._lock_file(new_cache_fd)
+            new_cache = os.fdopen(new_cache_fd, 'r+')
+            json.dump(state, new_cache)
+            new_cache.flush()
+            os.fsync(new_cache)
+            os.rename(new_cache_name, self._cache_filename)
+        except (IOError, OSError, ResumeCacheConflict) as error:
+            self.logger.error("There was a problem while saving the cache file: {}".format(error))
+            try:
+                os.unlink(new_cache_name)
+            except NameError:  # mkstemp failed.
+                pass
+        else:
+            self._cache_file.close()
+            self._cache_file = new_cache
+
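_save_state uses the classic write-temp-then-rename recipe, so a crash
mid-write can never leave a truncated cache file behind: rename() over an
existing path is atomic on POSIX filesystems. The essence as a standalone
sketch:

    import json
    import os
    import tempfile

    def atomic_json_dump(state, path):
        # Write the new state beside the target, flush it to disk, then
        # atomically rename it over the old file.
        fd, tmp_name = tempfile.mkstemp(dir=os.path.dirname(path))
        with os.fdopen(fd, 'w') as tmp:
            json.dump(state, tmp)
            tmp.flush()
            os.fsync(tmp.fileno())
        os.rename(tmp_name, path)
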
+    def collection_name(self):
+        with self._collection_lock:
+            name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
+        return name
+
+    def manifest_locator(self):
+        with self._collection_lock:
+            locator = self._my_collection().manifest_locator()
+        return locator
+
+    def portable_data_hash(self):
+        with self._collection_lock:
+            datahash = self._my_collection().portable_data_hash()
+        return datahash
+
+    def manifest_text(self, stream_name=".", strip=False, normalize=False):
+        with self._collection_lock:
+            manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
+        return manifest
+
+    def _datablocks_on_item(self, item):
+        """
+        Return a list of datablock locators, recursively navigating
+        through subcollections.
+        """
+        if isinstance(item, arvados.arvfile.ArvadosFile):
+            if item.size() == 0:
+                # Empty file locator
+                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
+            else:
+                locators = []
+                for segment in item.segments():
+                    loc = segment.locator
+                    locators.append(loc)
+                return locators
+        elif isinstance(item, arvados.collection.Collection):
+            l = [self._datablocks_on_item(x) for x in item.values()]
+            # Fast list flattener method taken from:
+            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
+            return [loc for sublist in l for loc in sublist]
+        else:
+            return None
+
+    def data_locators(self):
+        with self._collection_lock:
+            # Make sure all datablocks are flushed before getting the locators
+            self._my_collection().manifest_text()
+            datablocks = self._datablocks_on_item(self._my_collection())
+        return datablocks
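
Condensed, the calling sequence main() uses for this class (a sketch based
on the flow below, error handling omitted):

    import sys

    writer = ArvPutUploadJob(paths=['data/'], resume=True,
                             replication_desired=2, name='my upload',
                             ensure_unique_name=True)
    writer.start()             # upload files, checkpointing as it goes
    writer.save_collection()   # register the collection in Arvados
    print >>sys.stderr, "Collection saved as '%s'" % writer.collection_name()
    writer.destroy_cache()     # on success, drop the resume cache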
 
 
 def expected_bytes_for(pathlist):
@@ -430,118 +698,62 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         print >>stderr, error
         sys.exit(1)
 
-    # write_copies diverges from args.replication here.
-    # args.replication is how many copies we will instruct Arvados to
-    # maintain (by passing it in collections().create()) after all
-    # data is written -- and if None was given, we'll use None there.
-    # Meanwhile, write_copies is how many copies of each data block we
-    # write to Keep, which has to be a number.
-    #
-    # If we simply changed args.replication from None to a default
-    # here, we'd end up erroneously passing the default replication
-    # level (instead of None) to collections().create().
-    write_copies = (args.replication or
-                    api_client._rootDesc.get('defaultCollectionReplication', 2))
-
     if args.progress:
         reporter = progress_writer(human_progress)
     elif args.batch_progress:
         reporter = progress_writer(machine_progress)
     else:
         reporter = None
-    bytes_expected = expected_bytes_for(args.paths)
-
-    resume_cache = None
-    if args.resume:
-        try:
-            resume_cache = ResumeCache(ResumeCache.make_path(args))
-            resume_cache.check_cache(api_client=api_client, num_retries=args.retries)
-        except (IOError, OSError, ValueError):
-            pass  # Couldn't open cache directory/file.  Continue without it.
-        except ResumeCacheConflict:
-            print >>stderr, "\n".join([
-                "arv-put: Another process is already uploading this data.",
-                "         Use --no-resume if this is really what you want."])
-            sys.exit(1)
 
-    if resume_cache is None:
-        writer = ArvPutCollectionWriter(
-            resume_cache, reporter, bytes_expected,
-            num_retries=args.retries,
-            replication=write_copies)
-    else:
-        writer = ArvPutCollectionWriter.from_cache(
-            resume_cache, reporter, bytes_expected,
-            num_retries=args.retries,
-            replication=write_copies)
+    bytes_expected = expected_bytes_for(args.paths)
+    try:
+        writer = ArvPutUploadJob(paths = args.paths,
+                                 resume = args.resume,
+                                 filename = args.filename,
+                                 reporter = reporter,
+                                 bytes_expected = bytes_expected,
+                                 num_retries = args.retries,
+                                 replication_desired = args.replication,
+                                 name = collection_name,
+                                 owner_uuid = project_uuid,
+                                 ensure_unique_name = True)
+    except ResumeCacheConflict:
+        print >>stderr, "\n".join([
+            "arv-put: Another process is already uploading this data.",
+            "         Use --no-resume if this is really what you want."])
+        sys.exit(1)
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                             for sigcode in CAUGHT_SIGNALS}
 
-    if writer.bytes_written > 0:  # We're resuming a previous upload.
+    if args.resume and writer.bytes_written > 0:
         print >>stderr, "\n".join([
                 "arv-put: Resuming previous upload from last checkpoint.",
                 "         Use the --no-resume option to start over."])
 
     writer.report_progress()
-    writer.do_queued_work()  # Do work resumed from cache.
-    for path in args.paths:  # Copy file data to Keep.
-        if path == '-':
-            writer.start_new_stream()
-            writer.start_new_file(args.filename)
-            r = sys.stdin.read(64*1024)
-            while r:
-                # Need to bypass _queued_file check in ResumableCollectionWriter.write() to get
-                # CollectionWriter.write().
-                super(arvados.collection.ResumableCollectionWriter, writer).write(r)
-                r = sys.stdin.read(64*1024)
-        elif os.path.isdir(path):
-            writer.write_directory_tree(
-                path, max_manifest_depth=args.max_manifest_depth)
-        else:
-            writer.start_new_stream()
-            writer.write_file(path, args.filename or os.path.basename(path))
-    writer.finish_current_stream()
-
+    output = None
+    writer.start()
     if args.progress:  # Print newline to split stderr from stdout for humans.
         print >>stderr
 
-    output = None
     if args.stream:
-        output = writer.manifest_text()
         if args.normalize:
-            output = arvados.collection.CollectionReader(output).manifest_text(normalize=True)
+            output = writer.manifest_text(normalize=True)
+        else:
+            output = writer.manifest_text()
     elif args.raw:
         output = ','.join(writer.data_locators())
     else:
         try:
-            manifest_text = writer.manifest_text()
-            if args.normalize:
-                manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
-            replication_attr = 'replication_desired'
-            if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
-                # API called it 'redundancy' before #3410.
-                replication_attr = 'redundancy'
-            # Register the resulting collection in Arvados.
-            collection = api_client.collections().create(
-                body={
-                    'owner_uuid': project_uuid,
-                    'name': collection_name,
-                    'manifest_text': manifest_text,
-                    replication_attr: args.replication,
-                    },
-                ensure_unique_name=True
-                ).execute(num_retries=args.retries)
-
-            print >>stderr, "Collection saved as '%s'" % collection['name']
-
-            if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
-                output = collection['portable_data_hash']
+            writer.save_collection()
+            print >>stderr, "Collection saved as '%s'" % writer.collection_name()
+            if args.portable_data_hash:
+                output = writer.portable_data_hash()
             else:
-                output = collection['uuid']
-
+                output = writer.manifest_locator()
         except apiclient_errors.Error as error:
             print >>stderr, (
                 "arv-put: Error creating Collection on project: {}.".format(
@@ -562,10 +774,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     if status != 0:
         sys.exit(status)
 
-    if resume_cache is not None:
-        resume_cache.destroy()
-
+    # Success!
+    writer.destroy_cache()
     return output
 
+
 if __name__ == '__main__':
     main()
index a1b46384394d84ad6b68e5c6971cc890823a9d12..cf26f9e8addb7b6e45e2af87a2504947878c7c6c 100644 (file)
@@ -155,39 +155,66 @@ class PollClient(threading.Thread):
         self._closing_lock = threading.RLock()
 
     def run(self):
-        self.id = 0
         if self.last_log_id != None:
-            self.id = self.last_log_id
+            # Caller supplied the last-seen event ID from a previous
+            # connection
+            skip_old_events = [["id", ">", str(self.last_log_id)]]
         else:
-            for f in self.filters:
-                for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
-                    try:
-                        items = self.api.logs().list(limit=1, order="id desc", filters=f).execute()['items']
-                        break
-                    except errors.ApiError as error:
-                        pass
-                    else:
-                        tries_left = 0
-                        break
-                if tries_left == 0:
-                    _logger.exception("PollClient thread could not contact API server.")
-                    with self._closing_lock:
-                        self._closing.set()
-                    thread.interrupt_main()
-                    return
-                if items:
-                    if items[0]['id'] > self.id:
-                        self.id = items[0]['id']
+            # We need to do a reverse-order query to find the most
+            # recent event ID (see "if not skip_old_events" below).
+            skip_old_events = False
 
         self.on_event({'status': 200})
 
         while not self._closing.is_set():
-            max_id = self.id
             moreitems = False
             for f in self.filters:
                 for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
                     try:
-                        items = self.api.logs().list(order="id asc", filters=f+[["id", ">", str(self.id)]]).execute()
+                        if not skip_old_events:
+                            # If the caller didn't provide a known
+                            # recent ID, our first request will ask
+                            # for the single most recent event from
+                            # the last 2 hours (the time restriction
+                            # avoids doing an expensive database
+                            # query, and leaves a big enough margin to
+                            # account for clock skew). If we do find a
+                            # recent event, we remember its ID but
+                            # then discard it (we are supposed to be
+                            # returning new/current events, not old
+                            # ones).
+                            #
+                            # Subsequent requests will get multiple
+                            # events in chronological order, and
+                            # filter on that same cutoff time, or
+                            # (once we see our first matching event)
+                            # the ID of the last-seen event.
+                            skip_old_events = [[
+                                "created_at", ">=",
+                                time.strftime(
+                                    "%Y-%m-%dT%H:%M:%SZ",
+                                    time.gmtime(time.time()-7200))]]
+                            items = self.api.logs().list(
+                                order="id desc",
+                                limit=1,
+                                filters=f+skip_old_events).execute()
+                            if items["items"]:
+                                skip_old_events = [
+                                    ["id", ">", str(items["items"][0]["id"])]]
+                                items = {
+                                    "items": [],
+                                    "items_available": 0,
+                                }
+                        else:
+                            # In this case, either we know the most
+                            # recent matching ID, or we know there
+                            # were no matching events in the 2-hour
+                            # window before subscribing. Either way we
+                            # can safely ask for events in ascending
+                            # order.
+                            items = self.api.logs().list(
+                                order="id asc",
+                                filters=f+skip_old_events).execute()
                         break
                     except errors.ApiError as error:
                         pass
@@ -201,8 +228,7 @@ class PollClient(threading.Thread):
                     thread.interrupt_main()
                     return
                 for i in items["items"]:
-                    if i['id'] > max_id:
-                        max_id = i['id']
+                    skip_old_events = [["id", ">", str(i["id"])]]
                     with self._closing_lock:
                         if self._closing.is_set():
                             return
@@ -213,7 +239,6 @@ class PollClient(threading.Thread):
                             thread.interrupt_main()
                 if items["items_available"] > len(items["items"]):
                     moreitems = True
-            self.id = max_id
             if not moreitems:
                 self._closing.wait(self.poll_time)
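
The comment blocks above describe a two-phase filter. A short sketch of how skip_old_events evolves over the life of a PollClient (illustrative only, not the class itself):

    import time

    def initial_filter(last_log_id=None):
        if last_log_id is not None:
            # Phase 2 immediately: the caller already knows a recent event ID.
            return [["id", ">", str(last_log_id)]]
        # Phase 1: one bounded, reverse-order probe over the last 2 hours.
        cutoff = time.strftime("%Y-%m-%dT%H:%M:%SZ",
                               time.gmtime(time.time() - 7200))
        return [["created_at", ">=", cutoff]]

    def advance_filter(seen_event_id):
        # After any event is seen, every poll keys off its ID, which is
        # cheap to query and immune to clock skew.
        return [["id", ">", str(seen_event_id)]]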
 
index e64d91474170ce688780c3ab94ea3ae6bb69bbfb..7a0120c02814d00b27e81dd41fbb50e51ef2855c 100644 (file)
@@ -13,11 +13,15 @@ import tempfile
 import time
 import unittest
 import yaml
+import threading
+import hashlib
+import random
 
 from cStringIO import StringIO
 
 import arvados
 import arvados.commands.put as arv_put
+import arvados_testutil as tutil
 
 from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
 import run_test_server
@@ -234,66 +238,53 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
                           arv_put.ResumeCache, path)
 
 
-class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
-                                     ArvadosBaseTestCase):
+class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
+                          ArvadosBaseTestCase):
     def setUp(self):
-        super(ArvadosPutCollectionWriterTest, self).setUp()
+        super(ArvPutUploadJobTest, self).setUp()
         run_test_server.authorize_with('active')
-        with tempfile.NamedTemporaryFile(delete=False) as cachefile:
-            self.cache = arv_put.ResumeCache(cachefile.name)
-            self.cache_filename = cachefile.name
+        # Create temporary files and directories for these tests
+        self.tempdir = tempfile.mkdtemp()
+        subdir = os.path.join(self.tempdir, 'subdir')
+        os.mkdir(subdir)
+        data = "x" * 1024 # 1 KB
+        for i in range(1, 5):
+            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
+                f.write(data * i)
+        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
+            f.write(data * 5)
+        # Large temp file for resume test
+        _, self.large_file_name = tempfile.mkstemp()
+        fileobj = open(self.large_file_name, 'w')
+        # Make sure to write just a little more than one block
+        for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
+            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
+            fileobj.write(data)
+        fileobj.close()
+        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
 
     def tearDown(self):
-        super(ArvadosPutCollectionWriterTest, self).tearDown()
-        if os.path.exists(self.cache_filename):
-            self.cache.destroy()
-        self.cache.close()
-
-    def test_writer_caches(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
-        cwriter.write_file('/dev/null')
-        cwriter.cache_state()
-        self.assertTrue(self.cache.load())
-        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
+        super(ArvPutUploadJobTest, self).tearDown()
+        shutil.rmtree(self.tempdir)
+        os.unlink(self.large_file_name)
 
     def test_writer_works_without_cache(self):
-        cwriter = arv_put.ArvPutCollectionWriter()
-        cwriter.write_file('/dev/null')
-        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
-
-    def test_writer_resumes_from_cache(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
-        with self.make_test_file() as testfile:
-            cwriter.write_file(testfile.name, 'test')
-            cwriter.cache_state()
-            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
-                self.cache)
-            self.assertEqual(
-                ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
-                new_writer.manifest_text())
-
-    def test_new_writer_from_stale_cache(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
-        with self.make_test_file() as testfile:
-            cwriter.write_file(testfile.name, 'test')
-        new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
-        new_writer.write_file('/dev/null')
-        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text())
-
-    def test_new_writer_from_empty_cache(self):
-        cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
-        cwriter.write_file('/dev/null')
+        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
+        cwriter.start()
         self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
 
-    def test_writer_resumable_after_arbitrary_bytes(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
-        # These bytes are intentionally not valid UTF-8.
-        with self.make_test_file('\x00\x07\xe2') as testfile:
-            cwriter.write_file(testfile.name, 'test')
-            cwriter.cache_state()
-            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
-                self.cache)
-        self.assertEqual(cwriter.manifest_text(), new_writer.manifest_text())
+    def test_writer_works_with_cache(self):
+        with tempfile.NamedTemporaryFile() as f:
+            f.write('foo')
+            f.flush()
+            cwriter = arv_put.ArvPutUploadJob([f.name])
+            cwriter.start()
+            self.assertEqual(3, cwriter.bytes_written)
+            # Don't destroy the cache, and start another upload
+            cwriter_new = arv_put.ArvPutUploadJob([f.name])
+            cwriter_new.start()
+            cwriter_new.destroy_cache()
+            self.assertEqual(0, cwriter_new.bytes_written)
 
     def make_progress_tester(self):
         progression = []
@@ -302,24 +293,47 @@ class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
         return progression, record_func
 
     def test_progress_reporting(self):
-        for expect_count in (None, 8):
-            progression, reporter = self.make_progress_tester()
-            cwriter = arv_put.ArvPutCollectionWriter(
-                reporter=reporter, bytes_expected=expect_count)
-            with self.make_test_file() as testfile:
-                cwriter.write_file(testfile.name, 'test')
-            cwriter.finish_current_stream()
-            self.assertIn((4, expect_count), progression)
-
-    def test_resume_progress(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache, bytes_expected=4)
-        with self.make_test_file() as testfile:
-            # Set up a writer with some flushed bytes.
-            cwriter.write_file(testfile.name, 'test')
-            cwriter.finish_current_stream()
-            cwriter.cache_state()
-            new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
-            self.assertEqual(new_writer.bytes_written, 4)
+        with tempfile.NamedTemporaryFile() as f:
+            f.write('foo')
+            f.flush()
+            for expect_count in (None, 8):
+                progression, reporter = self.make_progress_tester()
+                cwriter = arv_put.ArvPutUploadJob([f.name],
+                    reporter=reporter, bytes_expected=expect_count)
+                cwriter.start()
+                cwriter.destroy_cache()
+                self.assertIn((3, expect_count), progression)
+
+    def test_writer_upload_directory(self):
+        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
+        cwriter.start()
+        cwriter.destroy_cache()
+        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
+
+    def test_resume_large_file_upload(self):
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Fail only on the last (smaller than one block) write
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            with self.assertRaises(SystemExit):
+                writer.start()
+            # Assert outside the assertRaises block: start() raises, so
+            # nothing after it inside the block would ever run.
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1)
+        writer2.start()
+        self.assertEqual(writer.bytes_written + writer2.bytes_written,
+                         os.path.getsize(self.large_file_name))
+        writer2.destroy_cache()
 
 
 class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
@@ -420,9 +434,8 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
             os.chmod(cachedir, 0o700)
 
     def test_put_block_replication(self):
-        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \
-             mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock:
-            cache_mock.side_effect = ValueError
+        self.call_main_on_test_file()
+        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
             put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
             self.call_main_on_test_file(['--replication', '1'])
             self.call_main_on_test_file(['--replication', '4'])
@@ -461,17 +474,16 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
                           ['--project-uuid', self.Z_UUID, '--stream'])
 
     def test_api_error_handling(self):
-        collections_mock = mock.Mock(name='arv.collections()')
-        coll_create_mock = collections_mock().create().execute
-        coll_create_mock.side_effect = arvados.errors.ApiError(
+        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
+        coll_save_mock.side_effect = arvados.errors.ApiError(
             fake_httplib2_response(403), '{}')
-        arv_put.api_client = arvados.api('v1')
-        arv_put.api_client.collections = collections_mock
-        with self.assertRaises(SystemExit) as exc_test:
-            self.call_main_with_args(['/dev/null'])
-        self.assertLess(0, exc_test.exception.args[0])
-        self.assertLess(0, coll_create_mock.call_count)
-        self.assertEqual("", self.main_stdout.getvalue())
+        with mock.patch('arvados.collection.Collection.save_new',
+                        new=coll_save_mock):
+            with self.assertRaises(SystemExit) as exc_test:
+                self.call_main_with_args(['/dev/null'])
+            self.assertLess(0, exc_test.exception.args[0])
+            self.assertLess(0, coll_save_mock.call_count)
+            self.assertEqual("", self.main_stdout.getvalue())
 
 
 class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
index 44733cdfb82ff1c21c4ca379a723110ebcaf5721..922cf7dac16b87741013c23e4073d4070a6fbe43 100644 (file)
@@ -182,10 +182,10 @@ class Arvados::V1::CollectionsController < ApplicationController
   protected
 
   def load_limit_offset_order_params *args
+    super
     if action_name == 'index'
       # Omit manifest_text from index results unless expressly selected.
       @select ||= model_class.selectable_attributes - ["manifest_text"]
     end
-    super
   end
 end
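
The body now runs after super so that the manifest_text default is applied to whatever super parsed out of the request. A hypothetical sketch of the intended outcome (plain functions, not the Rails code):

    def load_select(params, selectable):
        select = params.get('select')      # what `super` contributes
        if select is None:                 # default only if the client sent none
            select = [a for a in selectable if a != 'manifest_text']
        return select

    # An explicit request still gets manifest_text:
    # load_select({'select': ['uuid', 'manifest_text']}, selectable)
    # returns the client's list unchanged.
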
index 7a5713a03c59651a3bc0050674ea38171d8f9c34..d6adbf08516a7c2c199cb240017b5e64ede195be 100644 (file)
@@ -75,7 +75,20 @@ class Arvados::V1::GroupsController < ApplicationController
       end
     end
 
+    wanted_klasses = []
+    request_filters.each do |col,op,val|
+      if op == 'is_a'
+        (val.is_a?(Array) ? val : [val]).each do |type|
+          type = type.split('#')[-1]
+          type[0] = type[0].capitalize
+          wanted_klasses << type
+        end
+      end
+    end
+
     klasses.each do |klass|
+      next if wanted_klasses.any? and !wanted_klasses.include?(klass.to_s)
+
       # If the currently requested orders specifically match the
       # table_name for the current klass, apply that order.
       # Otherwise, order by recency.
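
A sketch of the new filter scan in Python, showing how "is_a" values such as "arvados#container" become class names so the contents query can skip every other table (illustrative; note the first-letter-only capitalization, matching the Ruby above):

    def wanted_classes(request_filters):
        wanted = []
        for col, op, val in request_filters:
            if op == 'is_a':
                for name in (val if isinstance(val, list) else [val]):
                    name = name.split('#')[-1]   # "arvados#container" -> "container"
                    wanted.append(name[:1].upper() + name[1:])  # -> "Container"
        return wanted

    # wanted_classes([["uuid", "is_a", ["arvados#containerRequest"]]])
    # == ["ContainerRequest"]
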
index 672374bc6c768f4f7bd8be0ba81daf2fbefa1629..18d5647cc929e760a72ed48ed709a9d18b8da8a3 100644 (file)
@@ -636,7 +636,7 @@ class ArvadosModel < ActiveRecord::Base
   end
 
   def log_destroy
-    log_change('destroy') do |log|
+    log_change('delete') do |log|
       log.fill_properties('old', etag(@old_attributes), @old_logged_attributes)
       log.update_to nil
     end
index b1ea9bd230a47e2382dbb12ac0c0d6bee6929588..52f1cba723ed5744af3bf226265f9bb600d4f61f 100644 (file)
@@ -11,6 +11,7 @@ class Container < ArvadosModel
   serialize :mounts, Hash
   serialize :runtime_constraints, Hash
   serialize :command, Array
+  serialize :scheduling_parameters, Hash
 
   before_validation :fill_field_defaults, :if => :new_record?
   before_validation :set_timestamps
@@ -44,6 +45,7 @@ class Container < ArvadosModel
     t.add :started_at
     t.add :state
     t.add :auth_uuid
+    t.add :scheduling_parameters
   end
 
   # Supported states for a container
@@ -180,6 +182,7 @@ class Container < ArvadosModel
     self.mounts ||= {}
     self.cwd ||= "."
     self.priority ||= 1
+    self.scheduling_parameters ||= {}
   end
 
   def permission_to_create
@@ -222,7 +225,7 @@ class Container < ArvadosModel
     if self.new_record?
       permitted.push(:owner_uuid, :command, :container_image, :cwd,
                      :environment, :mounts, :output_path, :priority,
-                     :runtime_constraints)
+                     :runtime_constraints, :scheduling_parameters)
     end
 
     case self.state
@@ -326,6 +329,9 @@ class Container < ArvadosModel
     if self.runtime_constraints_changed?
       self.runtime_constraints = self.class.deep_sort_hash(self.runtime_constraints)
     end
+    if self.scheduling_parameters_changed?
+      self.scheduling_parameters = self.class.deep_sort_hash(self.scheduling_parameters)
+    end
   end
 
   def handle_completed
@@ -348,7 +354,8 @@ class Container < ArvadosModel
             output_path: self.output_path,
             container_image: self.container_image,
             mounts: self.mounts,
-            runtime_constraints: self.runtime_constraints
+            runtime_constraints: self.runtime_constraints,
+            scheduling_parameters: self.scheduling_parameters
           }
           c = Container.create! c_attrs
           retryable_requests.each do |cr|
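
scheduling_parameters gets the same deep_sort_hash treatment as runtime_constraints so that equivalent hashes always serialize identically, which is what makes attribute comparison (e.g. for container reuse) reliable. A Python analog of that canonicalization, as a sketch (Python 3.7+ dicts preserve insertion order, standing in for Ruby's ordered hashes):

    def deep_sort_hash(x):
        # Rebuild nested dicts with sorted keys so two logically equal
        # structures produce byte-identical serializations.
        if isinstance(x, dict):
            return {k: deep_sort_hash(x[k]) for k in sorted(x)}
        if isinstance(x, list):
            return [deep_sort_hash(v) for v in x]
        return x

    # deep_sort_hash({'b': 1, 'a': {'d': 2, 'c': 3}})
    # serializes with keys in the order a, c, d, b every time.
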
index 1798150af3e83145a49762a65b778f5adf96995c..7dcfbe378b6700a88b41ae11b6a15e8a28a6fe20 100644 (file)
@@ -11,13 +11,15 @@ class ContainerRequest < ArvadosModel
   serialize :mounts, Hash
   serialize :runtime_constraints, Hash
   serialize :command, Array
+  serialize :scheduling_parameters, Hash
 
   before_validation :fill_field_defaults, :if => :new_record?
+  before_validation :validate_runtime_constraints
+  before_validation :validate_scheduling_parameters
   before_validation :set_container
   validates :command, :container_image, :output_path, :cwd, :presence => true
   validate :validate_state_change
   validate :validate_change
-  validate :validate_runtime_constraints
   after_save :update_priority
   after_save :finalize_if_needed
   before_create :set_requesting_container_uuid
@@ -42,6 +44,7 @@ class ContainerRequest < ArvadosModel
     t.add :runtime_constraints
     t.add :state
     t.add :use_existing
+    t.add :scheduling_parameters
   end
 
   # Supported states for a container request
@@ -105,6 +108,7 @@ class ContainerRequest < ArvadosModel
     self.mounts ||= {}
     self.cwd ||= "."
     self.container_count_max ||= Rails.configuration.container_count_max
+    self.scheduling_parameters ||= {}
   end
 
   # Create a new container (or find an existing one) to satisfy this
@@ -126,6 +130,7 @@ class ContainerRequest < ArvadosModel
       if not reusable.nil?
         reusable
       else
+        c_attrs[:scheduling_parameters] = self.scheduling_parameters
         Container.create!(c_attrs)
       end
     end
@@ -223,6 +228,25 @@ class ContainerRequest < ArvadosModel
           errors.add :runtime_constraints, "#{k} must be a positive integer"
         end
       end
+
+      if runtime_constraints.include? 'keep_cache_ram' and
+         (!runtime_constraints['keep_cache_ram'].is_a?(Integer) or
+          runtime_constraints['keep_cache_ram'] <= 0)
+            errors.add :runtime_constraints, "keep_cache_ram must be a positive integer"
+      elsif !runtime_constraints.include? 'keep_cache_ram'
+        runtime_constraints['keep_cache_ram'] = Rails.configuration.container_default_keep_cache_ram
+      end
+    end
+  end
+
+  def validate_scheduling_parameters
+    if self.state == Committed
+      if scheduling_parameters.include? 'partitions' and
+         (!scheduling_parameters['partitions'].is_a?(Array) ||
+          scheduling_parameters['partitions'].reject{|x| !x.is_a?(String)}.size !=
+            scheduling_parameters['partitions'].size)
+            errors.add :scheduling_parameters, "partitions must be an array of strings"
+      end
     end
   end
 
@@ -236,7 +260,7 @@ class ContainerRequest < ArvadosModel
                      :container_image, :cwd, :description, :environment,
                      :filters, :mounts, :name, :output_path, :priority,
                      :properties, :requesting_container_uuid, :runtime_constraints,
-                     :state, :container_uuid, :use_existing
+                     :state, :container_uuid, :use_existing, :scheduling_parameters
 
     when Committed
       if container_uuid.nil?
@@ -255,7 +279,7 @@ class ContainerRequest < ArvadosModel
         permitted.push :command, :container_image, :cwd, :description, :environment,
                        :filters, :mounts, :name, :output_path, :properties,
                        :requesting_container_uuid, :runtime_constraints,
-                       :state, :container_uuid
+                       :state, :container_uuid, :scheduling_parameters
       end
 
     when Final
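
A compact sketch of the two Committed-state checks added above: keep_cache_ram must be a positive integer (defaulting from the new config knob when absent), and scheduling_parameters['partitions'], if present, must be a list of strings:

    DEFAULT_KEEP_CACHE_RAM = 268435456  # container_default_keep_cache_ram (256 MiB)

    def validate_committed(runtime_constraints, scheduling_parameters):
        rc = dict(runtime_constraints)
        if 'keep_cache_ram' in rc:
            if not isinstance(rc['keep_cache_ram'], int) or rc['keep_cache_ram'] <= 0:
                raise ValueError("keep_cache_ram must be a positive integer")
        else:
            rc['keep_cache_ram'] = DEFAULT_KEEP_CACHE_RAM
        parts = scheduling_parameters.get('partitions')
        if parts is not None and not (
                isinstance(parts, list)
                and all(isinstance(p, str) for p in parts)):
            raise ValueError("partitions must be an array of strings")
        return rc
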
index f8d624acb77c19261dcd16f5b2780653d774ac97..7eab402609b482a238f8a40313bf622ece86c3c0 100644 (file)
@@ -47,7 +47,7 @@ class Log < ArvadosModel
       self.event_at = thing.created_at
     when "update"
       self.event_at = thing.modified_at
-    when "destroy"
+    when "delete"
       self.event_at = db_current_time
     end
     self
index abb46fdc661128f5321a55b186d54afd142ed5f3..e470e4c2bd9c47a45b395a4c90f4814edf89a417 100644 (file)
@@ -13,6 +13,8 @@ class Node < ArvadosModel
   belongs_to(:job, foreign_key: :job_uuid, primary_key: :uuid)
   attr_accessor :job_readable
 
+  UNUSED_NODE_IP = '127.40.4.0'
+
   api_accessible :user, :extend => :common do |t|
     t.add :hostname
     t.add :domain
@@ -133,20 +135,22 @@ class Node < ArvadosModel
   end
 
   def dns_server_update
-    if self.hostname_changed? or self.ip_address_changed?
-      if not self.ip_address.nil?
-        stale_conflicting_nodes = Node.where('id != ? and ip_address = ? and last_ping_at < ?',self.id,self.ip_address,10.minutes.ago)
-        if not stale_conflicting_nodes.empty?
-          # One or more stale compute node records have the same IP address as the new node.
-          # Clear the ip_address field on the stale nodes.
-          stale_conflicting_nodes.each do |stale_node|
-            stale_node.ip_address = nil
-            stale_node.save!
-          end
+    if hostname_changed? && hostname_was
+      self.class.dns_server_update(hostname_was, UNUSED_NODE_IP)
+    end
+    if hostname_changed? or ip_address_changed?
+      if ip_address
+        Node.where('id != ? and ip_address = ? and last_ping_at < ?',
+                   id, ip_address, 10.minutes.ago).each do |stale_node|
+          # One or more stale compute node records have the same IP
+          # address as the new node.  Clear the ip_address field on
+          # the stale nodes.
+          stale_node.ip_address = nil
+          stale_node.save!
         end
       end
-      if self.hostname and self.ip_address
-        self.class.dns_server_update(self.hostname, self.ip_address)
+      if hostname
+        self.class.dns_server_update(hostname, ip_address || UNUSED_NODE_IP)
       end
     end
   end
@@ -225,7 +229,7 @@ class Node < ArvadosModel
       if !File.exists? hostfile
         n = Node.where(:slot_number => slot_number).first
         if n.nil? or n.ip_address.nil?
-          dns_server_update(hostname, '127.40.4.0')
+          dns_server_update(hostname, UNUSED_NODE_IP)
         else
           dns_server_update(hostname, n.ip_address)
         end
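
The rewritten hook keeps DNS consistent in three situations: a renamed node's old hostname is parked on UNUSED_NODE_IP, stale records sharing the new node's IP are cleared, and a hostname with no address resolves to the sentinel rather than a stale IP. A sketch of the name/address decisions (the stale-record cleanup is omitted):

    UNUSED_NODE_IP = '127.40.4.0'

    def dns_updates(old_hostname, new_hostname, new_ip):
        # Returns the (hostname, ip) pairs to push to the DNS server.
        updates = []
        if old_hostname and old_hostname != new_hostname:
            # The old name must stop pointing at a live compute node.
            updates.append((old_hostname, UNUSED_NODE_IP))
        if new_hostname:
            # A name with no address gets the sentinel, never a stale IP.
            updates.append((new_hostname, new_ip or UNUSED_NODE_IP))
        return updates

    # Nodemanager clears both fields:
    # dns_updates('compute1', None, None) == [('compute1', '127.40.4.0')]
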
index 5fe03024bf7d3e9ccaee6d108c889c1078421d36..a9aa953f9f36e948dc57d34336c7d3f1cc1df43c 100644 (file)
@@ -393,6 +393,9 @@ common:
   # with the cancelled container.
   container_count_max: 3
 
+  # Default value for keep_cache_ram of a container's runtime_constraints.
+  container_default_keep_cache_ram: 268435456
+
 development:
   force_ssl: false
   cache_classes: false
diff --git a/services/api/db/migrate/20161111143147_add_scheduling_parameters_to_container.rb b/services/api/db/migrate/20161111143147_add_scheduling_parameters_to_container.rb
new file mode 100644 (file)
index 0000000..1b317cf
--- /dev/null
@@ -0,0 +1,6 @@
+class AddSchedulingParametersToContainer < ActiveRecord::Migration
+  def change
+    add_column :containers, :scheduling_parameters, :text
+    add_column :container_requests, :scheduling_parameters, :text
+  end
+end
index 0db782af69484e6a8e0c476620891702055f36c7..1d3d238c837611e2858dbfd0959cfc7373a41917 100644 (file)
@@ -291,7 +291,8 @@ CREATE TABLE container_requests (
     filters text,
     updated_at timestamp without time zone NOT NULL,
     container_count integer DEFAULT 0,
-    use_existing boolean DEFAULT true
+    use_existing boolean DEFAULT true,
+    scheduling_parameters text
 );
 
 
@@ -343,7 +344,8 @@ CREATE TABLE containers (
     updated_at timestamp without time zone NOT NULL,
     exit_code integer,
     auth_uuid character varying(255),
-    locked_by_uuid character varying(255)
+    locked_by_uuid character varying(255),
+    scheduling_parameters text
 );
 
 
@@ -2694,4 +2696,6 @@ INSERT INTO schema_migrations (version) VALUES ('20160909181442');
 
 INSERT INTO schema_migrations (version) VALUES ('20160926194129');
 
-INSERT INTO schema_migrations (version) VALUES ('20161019171346');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20161019171346');
+
+INSERT INTO schema_migrations (version) VALUES ('20161111143147');
\ No newline at end of file
index 489bb1d6605f86d622c824260d96ef89f63bd026..c5516cc38b34dc9329fc759b1c61b2877681f1af 100644 (file)
@@ -47,7 +47,7 @@ was_idle_now_down:
   hostname: compute3
   slot_number: ~
   domain: ""
-  ip_address: 172.17.2.173
+  ip_address: 172.17.2.174
   last_ping_at: <%= 1.hour.ago.to_s(:db) %>
   first_ping_at: <%= 23.hour.ago.to_s(:db) %>
   job_uuid: ~
@@ -62,7 +62,7 @@ new_with_no_hostname:
   owner_uuid: zzzzz-tpzed-000000000000000
   hostname: ~
   slot_number: ~
-  ip_address: 172.17.2.173
+  ip_address: 172.17.2.175
   last_ping_at: ~
   first_ping_at: ~
   job_uuid: ~
@@ -74,7 +74,7 @@ new_with_custom_hostname:
   owner_uuid: zzzzz-tpzed-000000000000000
   hostname: custom1
   slot_number: 23
-  ip_address: 172.17.2.173
+  ip_address: 172.17.2.176
   last_ping_at: ~
   first_ping_at: ~
   job_uuid: ~
index a8583be12bb70d915585c8c48aba0bc06aa32d3e..c85cc1979f99482ff36ba6dc38ba5790ec7bf591 100644 (file)
@@ -46,6 +46,49 @@ class Arvados::V1::CollectionsControllerTest < ActionController::TestCase
     end
   end
 
+  test 'index without select returns everything except manifest' do
+    authorize_with :active
+    get :index
+    assert_response :success
+    assert json_response['items'].any?
+    json_response['items'].each do |coll|
+      assert_includes(coll.keys, 'uuid')
+      assert_includes(coll.keys, 'name')
+      assert_includes(coll.keys, 'created_at')
+      refute_includes(coll.keys, 'manifest_text')
+    end
+  end
+
+  ['', nil, false, 'null'].each do |select|
+    test "index with select=#{select.inspect} returns everything except manifest" do
+      authorize_with :active
+      get :index, select: select
+      assert_response :success
+      assert json_response['items'].any?
+      json_response['items'].each do |coll|
+        assert_includes(coll.keys, 'uuid')
+        assert_includes(coll.keys, 'name')
+        assert_includes(coll.keys, 'created_at')
+        refute_includes(coll.keys, 'manifest_text')
+      end
+    end
+  end
+
+  [["uuid"],
+   ["uuid", "manifest_text"],
+   '["uuid"]',
+   '["uuid", "manifest_text"]'].each do |select|
+    test "index with select=#{select.inspect} returns no name" do
+      authorize_with :active
+      get :index, select: select
+      assert_response :success
+      assert json_response['items'].any?
+      json_response['items'].each do |coll|
+        refute_includes(coll.keys, 'name')
+      end
+    end
+  end
+
   [0,1,2].each do |limit|
     test "get index with limit=#{limit}" do
       authorize_with :active
index 406bb42d1290c6f00b55421a695fac1813154245..1465c7180ad7b59cb947b4cfb825d2957ea17844 100644 (file)
@@ -129,7 +129,7 @@ class ContainerRequestTest < ActiveSupport::TestCase
     assert_equal({}, c.environment)
     assert_equal({"/out" => {"kind"=>"tmp", "capacity"=>1000000}}, c.mounts)
     assert_equal "/out", c.output_path
-    assert_equal({"vcpus" => 2, "ram" => 30}, c.runtime_constraints)
+    assert_equal({"keep_cache_ram"=>268435456, "vcpus" => 2, "ram" => 30}, c.runtime_constraints)
     assert_equal 1, c.priority
 
     assert_raises(ActiveRecord::RecordInvalid) do
@@ -423,7 +423,8 @@ class ContainerRequestTest < ActiveSupport::TestCase
                       command: ["echo", "hello"],
                       output_path: "test",
                       runtime_constraints: {"vcpus" => 4,
-                                            "ram" => 12000000000},
+                                            "ram" => 12000000000,
+                                            "keep_cache_ram" => 268435456},
                       mounts: {"test" => {"kind" => "json"}}}
       set_user_from_auth :active
       cr1 = create_minimal_req!(common_attrs.merge({state: ContainerRequest::Committed,
@@ -523,4 +524,64 @@ class ContainerRequestTest < ActiveSupport::TestCase
     assert_equal cr.container_uuid, cr3.container_uuid
     assert_equal ContainerRequest::Final, cr3.state
   end
+
+  [
+    [{"vcpus" => 1, "ram" => 123, "keep_cache_ram" => 100}, ContainerRequest::Committed, 100],
+    [{"vcpus" => 1, "ram" => 123}, ContainerRequest::Uncommitted],
+    [{"vcpus" => 1, "ram" => 123}, ContainerRequest::Committed],
+    [{"vcpus" => 1, "ram" => 123, "keep_cache_ram" => -1}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
+    [{"vcpus" => 1, "ram" => 123, "keep_cache_ram" => '123'}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
+  ].each do |rc, state, expected|
+    test "create container request with #{rc} in state #{state} and verify keep_cache_ram #{expected}" do
+      common_attrs = {cwd: "test",
+                      priority: 1,
+                      command: ["echo", "hello"],
+                      output_path: "test",
+                      runtime_constraints: rc,
+                      mounts: {"test" => {"kind" => "json"}}}
+      set_user_from_auth :active
+
+      if expected == ActiveRecord::RecordInvalid
+        assert_raises(ActiveRecord::RecordInvalid) do
+          create_minimal_req!(common_attrs.merge({state: state}))
+        end
+      else
+        cr = create_minimal_req!(common_attrs.merge({state: state}))
+        expected = Rails.configuration.container_default_keep_cache_ram if state == ContainerRequest::Committed and expected.nil?
+        assert_equal expected, cr.runtime_constraints['keep_cache_ram']
+      end
+    end
+  end
+
+  [
+    [{"partitions" => ["fastcpu","vfastcpu", 100]}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
+    [{"partitions" => ["fastcpu","vfastcpu", 100]}, ContainerRequest::Uncommitted],
+    [{"partitions" => "fastcpu"}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
+    [{"partitions" => "fastcpu"}, ContainerRequest::Uncommitted],
+    [{"partitions" => ["fastcpu","vfastcpu"]}, ContainerRequest::Committed],
+  ].each do |sp, state, expected|
+    test "create container request with scheduling_parameters #{sp} in state #{state} and verify #{expected}" do
+      common_attrs = {cwd: "test",
+                      priority: 1,
+                      command: ["echo", "hello"],
+                      output_path: "test",
+                      scheduling_parameters: sp,
+                      mounts: {"test" => {"kind" => "json"}}}
+      set_user_from_auth :active
+
+      if expected == ActiveRecord::RecordInvalid
+        assert_raises(ActiveRecord::RecordInvalid) do
+          create_minimal_req!(common_attrs.merge({state: state}))
+        end
+      else
+        cr = create_minimal_req!(common_attrs.merge({state: state}))
+        assert_equal sp, cr.scheduling_parameters
+
+        if state == ContainerRequest::Committed
+          c = Container.find_by_uuid(cr.container_uuid)
+          assert_equal sp, c.scheduling_parameters
+        end
+      end
+    end
+  end
 end
index 632271e98c263efad7a5869e1831da69ceaf3b97..efbb189c9f8f0e50f1db262a8cd3bac7342a0b03 100644 (file)
@@ -6,7 +6,7 @@ class LogTest < ActiveSupport::TestCase
   EVENT_TEST_METHODS = {
     :create => [:created_at, :assert_nil, :assert_not_nil],
     :update => [:modified_at, :assert_not_nil, :assert_not_nil],
-    :destroy => [nil, :assert_not_nil, :assert_nil],
+    :delete => [nil, :assert_not_nil, :assert_nil],
   }
 
   setup do
@@ -116,7 +116,7 @@ class LogTest < ActiveSupport::TestCase
     orig_attrs = auth.attributes
     orig_attrs.delete 'api_token'
     auth.destroy
-    assert_logged(auth, :destroy) do |props|
+    assert_logged(auth, :delete) do |props|
       assert_equal(orig_etag, props['old_etag'], "destroyed auth etag mismatch")
       assert_equal(orig_attrs, props['old_attributes'],
                    "destroyed auth attributes mismatch")
@@ -230,7 +230,7 @@ class LogTest < ActiveSupport::TestCase
     auth.save!
     assert_logged_with_clean_properties(auth, :update, 'api_token')
     auth.destroy
-    assert_logged_with_clean_properties(auth, :destroy, 'api_token')
+    assert_logged_with_clean_properties(auth, :delete, 'api_token')
   end
 
   test "use ownership and permission links to determine which logs a user can see" do
@@ -283,7 +283,7 @@ class LogTest < ActiveSupport::TestCase
       coll.save!
       assert_logged_with_clean_properties(coll, :update, 'manifest_text')
       coll.destroy
-      assert_logged_with_clean_properties(coll, :destroy, 'manifest_text')
+      assert_logged_with_clean_properties(coll, :delete, 'manifest_text')
     end
   end
 
@@ -302,7 +302,7 @@ class LogTest < ActiveSupport::TestCase
         assert_equal(txt, props['new_attributes']['manifest_text'])
       end
       coll.destroy
-      assert_logged(coll, :destroy) do |props|
+      assert_logged(coll, :delete) do |props|
         assert_equal(txt, props['old_attributes']['manifest_text'])
       end
     end
index e5b88354fb128e1308c1a00a7c9e297928f191dd..6eb1df56d129f0279c2e86323b865d13fd09817c 100644 (file)
@@ -125,4 +125,31 @@ class NodeTest < ActiveSupport::TestCase
     refute_nil node2.slot_number
     assert_equal "custom1", node2.hostname
   end
+
+  test "update dns when nodemanager clears hostname and ip_address" do
+    act_as_system_user do
+      node = ping_node(:new_with_custom_hostname, {})
+      Node.expects(:dns_server_update).with(node.hostname, Node::UNUSED_NODE_IP)
+      node.update_attributes(hostname: nil, ip_address: nil)
+    end
+  end
+
+  test "update dns when hostname changes" do
+    act_as_system_user do
+      node = ping_node(:new_with_custom_hostname, {})
+
+      Node.expects(:dns_server_update).with(node.hostname, Node::UNUSED_NODE_IP)
+      Node.expects(:dns_server_update).with('foo0', node.ip_address)
+      node.update_attributes!(hostname: 'foo0')
+
+      Node.expects(:dns_server_update).with('foo0', Node::UNUSED_NODE_IP)
+      node.update_attributes!(hostname: nil, ip_address: nil)
+
+      Node.expects(:dns_server_update).with('foo0', '10.11.12.13')
+      node.update_attributes!(hostname: 'foo0', ip_address: '10.11.12.13')
+
+      Node.expects(:dns_server_update).with('foo0', '10.11.12.14')
+      node.update_attributes!(hostname: 'foo0', ip_address: '10.11.12.14')
+    end
+  end
 end
index 3bd7b3a8aa93c5871ff18773188c1336033d2599..75645ff47215c2b5f7f666057006ea2510f99811 100644 (file)
@@ -67,7 +67,7 @@ func main() {
        if err := srv.Start(); err != nil {
                log.Fatal(err)
        }
-       if _, err := daemon.SdNotify("READY=1"); err != nil {
+       if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
                log.Printf("Error notifying init daemon: %v", err)
        }
        log.Println("Listening at", srv.Addr)
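
This hunk (and the identical ones in keep-web and keepproxy below) tracks a go-systemd API change: SdNotify now takes a leading unsetEnvironment bool, and passing false leaves NOTIFY_SOCKET set for child processes. Underneath it is just the sd_notify(3) datagram protocol; a hedged Python sketch of what the call does:

    import os
    import socket

    def sd_notify(state, unset_environment=False):
        # Send `state` (e.g. "READY=1") as a datagram to $NOTIFY_SOCKET.
        addr = os.environ.get('NOTIFY_SOCKET')
        if not addr:
            return False               # not supervised by systemd
        if unset_environment:
            del os.environ['NOTIFY_SOCKET']   # what the new bool controls
        if addr.startswith('@'):
            addr = '\0' + addr[1:]     # abstract-namespace socket
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        try:
            sock.sendto(state.encode(), addr)
        finally:
            sock.close()
        return True
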
index 0c1ce49592a6b08223271d440dca41f3a5d8fd46..3c4f281912842a0ceedb6df409aa61e80fa38fa2 100644 (file)
@@ -105,7 +105,7 @@ func doMain() error {
                PollInterval:   time.Duration(theConfig.PollPeriod),
                DoneProcessing: make(chan struct{})}
 
-       if _, err := daemon.SdNotify("READY=1"); err != nil {
+       if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
                log.Printf("Error notifying init daemon: %v", err)
        }
 
@@ -127,8 +127,8 @@ func sbatchFunc(container arvados.Container) *exec.Cmd {
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
-       if container.RuntimeConstraints.Partition != nil {
-               sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.RuntimeConstraints.Partition, ",")))
+       if container.SchedulingParameters.Partitions != nil {
+               sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
        }
 
        return exec.Command("sbatch", sbatchArgs...)
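
Partition selection has moved from RuntimeConstraints to the new SchedulingParameters field. A sketch of the resulting command line; the per-CPU memory arithmetic is an assumption (this hunk shows only the flags), everything else mirrors the Go above:

    def sbatch_args(uuid, ram_bytes, vcpus, partitions=None):
        # Ceiling division; the exact rounding is assumed, not shown here.
        mem_per_cpu = -(-ram_bytes // (vcpus * 1024 * 1024))
        args = ['sbatch',
                '--job-name=%s' % uuid,
                '--mem-per-cpu=%d' % mem_per_cpu,
                '--cpus-per-task=%d' % vcpus]
        if partitions:
            # One comma-separated flag, as slurm expects.
            args.append('--partition=%s' % ','.join(partitions))
        return args

    # sbatch_args('123', 250000000, 1, ['blurb', 'b2'])[-1]
    # == '--partition=blurb,b2'   (cf. the test change below)
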
index c9208a6943924a1604c7b15735536229cde68104..fbea48e548a59f78718cb0afa419b5a84a1cd89b 100644 (file)
@@ -318,7 +318,7 @@ func testSbatchFuncWithArgs(c *C, args []string) {
 
 func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
        theConfig.SbatchArguments = nil
-       container := arvados.Container{UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1, Partition: []string{"blurb", "b2"}}}
+       container := arvados.Container{UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1}, SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}}}
        sbatchCmd := sbatchFunc(container)
 
        var expected []string
index 0b59a3bb78c1b0c8919f198f5c34cc94ed00d5a9..ade40c6b03a4d4a98812172aab31da5173453c4e 100644 (file)
@@ -250,6 +250,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        pdhOnly := true
        tmpcount := 0
        arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
+
+       if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
+               arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
+       }
+
        collectionPaths := []string{}
        runner.Binds = nil
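
The container's keep_cache_ram runtime constraint now becomes arv-mount's --file-cache size. A sketch of the flag assembly, matching the expected command in the test change below:

    def arv_mount_cmd(keep_cache_ram=0):
        cmd = ['--foreground', '--allow-other', '--read-write']
        if keep_cache_ram > 0:
            # The cache size is passed through verbatim.
            cmd += ['--file-cache', '%d' % keep_cache_ram]
        return cmd

    # arv_mount_cmd(512)
    # == ['--foreground', '--allow-other', '--read-write', '--file-cache', '512']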
 
index 7f8e80cb107296b4184dfad5c1c76f9a29585c92..2c7145998ab402786e1f9bf47bfa5575afb1c086 100644 (file)
@@ -842,6 +842,28 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                checkEmpty()
        }
 
+       {
+               i = 0
+               cr.Container.RuntimeConstraints.KeepCacheRAM = 512
+               cr.Container.Mounts = map[string]arvados.Mount{
+                       "/keepinp": {Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"},
+                       "/keepout": {Kind: "collection", Writable: true},
+               }
+               cr.OutputPath = "/keepout"
+
+               os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
+               os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
+
+               err := cr.SetupMounts()
+               c.Check(err, IsNil)
+               c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
+               sort.StringSlice(cr.Binds).Sort()
+               c.Check(cr.Binds, DeepEquals, []string{realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro",
+                       realTemp + "/keep1/tmp0:/keepout"})
+               cr.CleanupDirs()
+               checkEmpty()
+       }
+
        for _, test := range []struct {
                in  interface{}
                out string
index 28653ae24ef153658484d5628cfeecdaea63f0dd..031058242a21179f15a0ef36ce9456f657811338 100644 (file)
@@ -6,9 +6,14 @@ AssertPathExists=/etc/arvados/docker-cleaner/docker-cleaner.json
 
 [Service]
 Type=simple
-ExecStart=/usr/bin/env arvados-docker-cleaner
 Restart=always
 RestartSec=10s
+RestartPreventExitStatus=2
+#
+# This unwieldy ExecStart command detects at runtime whether
+# arvados-docker-cleaner is installed with the Python 3.3 Software
+# Collection, and if so, invokes it with the "scl" wrapper.
+ExecStart=/bin/sh -c 'if [ -e /opt/rh/python33/root/bin/arvados-docker-cleaner ]; then exec scl enable python33 arvados-docker-cleaner; else exec arvados-docker-cleaner; fi'
 
 [Install]
 WantedBy=multi-user.target
index 5ac81004ace90c6a2b374d36b2af30d7eb5a2139..b03b9e50ffaa47921341305673975ea24b1bcac0 100755 (executable)
@@ -17,6 +17,8 @@ import time
 import docker
 import json
 
+DEFAULT_CONFIG_FILE = '/etc/arvados/docker-cleaner/docker-cleaner.json'
+
 SUFFIX_SIZES = {suffix: 1024 ** exp for exp, suffix in enumerate('kmgt', 1)}
 
 logger = logging.getLogger('arvados_docker.cleaner')
@@ -262,7 +264,14 @@ def load_config(arguments):
             c = json.load(f)
             config.update(c)
     except (FileNotFoundError, IOError, ValueError) as error:
-        sys.exit('error reading config file {}: {}'.format(args.config, error))
+        if (isinstance(error, FileNotFoundError) and
+            args.config == DEFAULT_CONFIG_FILE):
+            logger.warning("DEPRECATED: default config file %s not found; "
+                           "relying on command line configuration",
+                           repr(DEFAULT_CONFIG_FILE))
+        else:
+            sys.exit('error reading config file {}: {}'.format(
+                args.config, error))
 
     configargs = vars(args).copy()
     configargs.pop('config')
@@ -294,7 +303,7 @@ def parse_arguments(arguments):
         formatter_class=Formatter,
     )
     parser.add_argument(
-        '--config', action='store', type=str, default='/etc/arvados/docker-cleaner/docker-cleaner.json',
+        '--config', action='store', type=str, default=DEFAULT_CONFIG_FILE,
         help="configuration file")
 
     deprecated = " (DEPRECATED -- use config file instead)"
@@ -314,12 +323,15 @@ def parse_arguments(arguments):
     return parser.parse_args(arguments)
 
 
-def setup_logging(config):
+def setup_logging():
     log_handler = logging.StreamHandler()
     log_handler.setFormatter(logging.Formatter(
         '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
         '%Y-%m-%d %H:%M:%S'))
     logger.addHandler(log_handler)
+
+
+def configure_logging(config):
     logger.setLevel(logging.ERROR - (10 * config['Verbose']))
 
 
@@ -342,8 +354,9 @@ def run(config, docker_client):
 
 
 def main(arguments=sys.argv[1:]):
+    setup_logging()
     config = load_config(arguments)
-    setup_logging(config)
+    configure_logging(config)
     try:
         run(config, docker.Client(version='1.14'))
     except KeyboardInterrupt:
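
setup_logging() was split from configure_logging() so that a handler exists before load_config() runs; otherwise the new DEPRECATED warning would be emitted with no handler attached. Only the level has to wait for the config, since it depends on 'Verbose'. A self-contained sketch of that ordering:

    import logging
    import sys

    logger = logging.getLogger('arvados_docker.cleaner')

    def demo_startup(config_file_missing=True):
        handler = logging.StreamHandler(sys.stderr)
        logger.addHandler(handler)            # setup_logging(): handler first
        if config_file_missing:
            # load_config() can now warn and actually be heard.
            logger.warning("DEPRECATED: default config file not found")
        logger.setLevel(logging.ERROR - 10)   # configure_logging({'Verbose': 1})
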
index 535650e3fa3f35bcbef48240727c7746e68eb7f8..7b7a471580788ca9ab857d1e5a9ba648af7c0077 100644 (file)
@@ -29,6 +29,7 @@ setup(name="arvados-docker-cleaner",
       ],
       install_requires=[
           'docker-py==1.7.2',
+          'setuptools',
       ],
       tests_require=[
           'pbr<1.7.0',
index 9fbd3e3014ecd0038d789d64ce3660d872268f6c..37c1d7600e34f9699b7788c0a76663173978d46f 100644 (file)
@@ -3,6 +3,7 @@
 import collections
 import itertools
 import json
+import os
 import random
 import tempfile
 import time
@@ -437,6 +438,11 @@ class ConfigTestCase(unittest.TestCase):
         self.assertEqual('never', config['RemoveStoppedContainers'])
         self.assertEqual(1, config['Verbose'])
 
+    def test_args_no_config(self):
+        self.assertEqual(False, os.path.exists(cleaner.DEFAULT_CONFIG_FILE))
+        config = cleaner.load_config(['--quota', '1G'])
+        self.assertEqual(1 << 30, config['Quota'])
+
 
 class ContainerRemovalTestCase(unittest.TestCase):
     LIFECYCLE = ['create', 'attach', 'start', 'resize', 'die', 'destroy']
index f32a5db3cc29e63d7b556adfa19d011f89ccd713..1828e150bb76bdf6185f7fcbb3fe3172fb68e616 100644 (file)
@@ -367,9 +367,10 @@ class Operations(llfuse.Operations):
         return True
 
     def listen_for_events(self):
-        self.events = arvados.events.subscribe(self._api_client,
-                                 [["event_type", "in", ["create", "update", "delete"]]],
-                                 self.on_event)
+        self.events = arvados.events.subscribe(
+            self._api_client,
+            [["event_type", "in", ["create", "update", "delete"]]],
+            self.on_event)
 
     @catch_exceptions
     def on_event(self, ev):
index 13d8c1b329ab6583fec2a3d4f281d61a55036ca2..df8a0b5c078b31d3a42b51786e3d4f6d0f15477f 100644 (file)
@@ -83,7 +83,7 @@ func main() {
        if err := srv.Start(); err != nil {
                log.Fatal(err)
        }
-       if _, err := daemon.SdNotify("READY=1"); err != nil {
+       if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
                log.Printf("Error notifying init daemon: %v", err)
        }
        log.Println("Listening at", srv.Addr)
index 816de29da8abc4bad7ddcf22afcb572a7b16da69..24df531fa4ab434fea7652503db1d44f1ba04a9b 100644 (file)
@@ -131,7 +131,7 @@ func main() {
        if err != nil {
                log.Fatalf("listen(%s): %s", cfg.Listen, err)
        }
-       if _, err := daemon.SdNotify("READY=1"); err != nil {
+       if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
                log.Printf("Error notifying init daemon: %v", err)
        }
        log.Println("Listening at", listener.Addr())
index d2163f6b490376768383b260444d6be90a9ca1ed..6ca31c38329ec7347631a03c802b4216bd05f167 100644 (file)
@@ -2,12 +2,14 @@ package main
 
 import (
        "bytes"
+       "context"
        "errors"
        "flag"
        "fmt"
        "io"
        "io/ioutil"
        "log"
+       "net/http"
        "os"
        "regexp"
        "strconv"
@@ -15,9 +17,12 @@ import (
        "sync"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/curoverse/azure-sdk-for-go/storage"
 )
 
+const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
+
 var (
        azureMaxGetBytes           int
        azureStorageAccountName    string
@@ -95,6 +100,7 @@ type AzureBlobVolume struct {
        ContainerName         string
        AzureReplication      int
        ReadOnly              bool
+       RequestTimeout        arvados.Duration
 
        azClient storage.Client
        bsClient storage.BlobStorageClient
@@ -108,6 +114,7 @@ func (*AzureBlobVolume) Examples() []Volume {
                        StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
                        ContainerName:         "example-container-name",
                        AzureReplication:      3,
+                       RequestTimeout:        azureDefaultRequestTimeout,
                },
        }
 }
@@ -133,6 +140,13 @@ func (v *AzureBlobVolume) Start() error {
        if err != nil {
                return fmt.Errorf("creating Azure storage client: %s", err)
        }
+
+       if v.RequestTimeout == 0 {
+               v.RequestTimeout = azureDefaultRequestTimeout
+       }
+       v.azClient.HTTPClient = &http.Client{
+               Timeout: time.Duration(v.RequestTimeout),
+       }
        v.bsClient = v.azClient.GetBlobService()
 
        ok, err := v.bsClient.ContainerExists(v.ContainerName)
@@ -163,7 +177,7 @@ func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, err
 // If the block is younger than azureWriteRaceInterval and is
 // unexpectedly empty, assume a PutBlob operation is in progress, and
 // wait for it to finish writing.
-func (v *AzureBlobVolume) Get(loc string, buf []byte) (int, error) {
+func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
        trashed, _, err := v.checkTrashed(loc)
        if err != nil {
                return 0, err
@@ -271,7 +285,7 @@ func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
 }
 
 // Compare the given data with existing stored data.
-func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
+func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
        trashed, _, err := v.checkTrashed(loc)
        if err != nil {
                return err
@@ -284,11 +298,11 @@ func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
                return v.translateError(err)
        }
        defer rdr.Close()
-       return compareReaderWithBuf(rdr, expect, loc[:32])
+       return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
 }
 
 // Put stores a Keep block as a block blob in the container.
-func (v *AzureBlobVolume) Put(loc string, block []byte) error {
+func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error {
        if v.ReadOnly {
                return MethodDisabledError
        }
index c8c898fe2da3957e3efdf069c5370e146ac5d693..d636a5ee86887806372a14e2f291e5c4f2c11b33 100644 (file)
@@ -2,6 +2,7 @@ package main
 
 import (
        "bytes"
+       "context"
        "crypto/md5"
        "encoding/base64"
        "encoding/xml"
@@ -454,12 +455,12 @@ func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
                        data[i] = byte((i + 7) & 0xff)
                }
                hash := fmt.Sprintf("%x", md5.Sum(data))
-               err := v.Put(hash, data)
+               err := v.Put(context.Background(), hash, data)
                if err != nil {
                        t.Error(err)
                }
                gotData := make([]byte, len(data))
-               gotLen, err := v.Get(hash, gotData)
+               gotLen, err := v.Get(context.Background(), hash, gotData)
                if err != nil {
                        t.Error(err)
                }
@@ -500,7 +501,7 @@ func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
        allDone := make(chan struct{})
        v.azHandler.race = make(chan chan struct{})
        go func() {
-               err := v.Put(TestHash, TestBlock)
+               err := v.Put(context.Background(), TestHash, TestBlock)
                if err != nil {
                        t.Error(err)
                }
@@ -510,7 +511,7 @@ func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
        v.azHandler.race <- continuePut
        go func() {
                buf := make([]byte, len(TestBlock))
-               _, err := v.Get(TestHash, buf)
+               _, err := v.Get(context.Background(), TestHash, buf)
                if err != nil {
                        t.Error(err)
                }
@@ -553,7 +554,7 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
        go func() {
                defer close(allDone)
                buf := make([]byte, BlockSize)
-               n, err := v.Get(TestHash, buf)
+               n, err := v.Get(context.Background(), TestHash, buf)
                if err != nil {
                        t.Error(err)
                        return
index a4af563729b3cf0e72686a2913fd3664e497e24d..82cb789eb954c289172ac478dc04392a14c6a989 100644 (file)
@@ -2,6 +2,7 @@ package main
 
 import (
        "bytes"
+       "context"
        "crypto/md5"
        "fmt"
        "io"
@@ -49,7 +50,7 @@ func collisionOrCorrupt(expectMD5 string, buf1, buf2 []byte, rdr io.Reader) erro
        return <-outcome
 }
 
-func compareReaderWithBuf(rdr io.Reader, expect []byte, hash string) error {
+func compareReaderWithBuf(ctx context.Context, rdr io.Reader, expect []byte, hash string) error {
        bufLen := 1 << 20
        if bufLen > len(expect) && len(expect) > 0 {
                // No need for bufLen to be longer than
@@ -67,7 +68,18 @@ func compareReaderWithBuf(rdr io.Reader, expect []byte, hash string) error {
        // expected to equal the next N bytes read from
        // rdr.
        for {
-               n, err := rdr.Read(buf)
+               ready := make(chan bool)
+               var n int
+               var err error
+               go func() {
+                       n, err = rdr.Read(buf)
+                       close(ready)
+               }()
+               select {
+               case <-ready:
+               case <-ctx.Done():
+                       return ctx.Err()
+               }
                if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
                        return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], buf[:n], rdr)
                }
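
The loop above makes a blocking Read cancellable by running it in a goroutine and selecting on ctx.Done(); note that, as written, a cancelled call abandons the goroutine until the underlying Read returns. A Python analog of the same pattern, as a sketch:

    import threading

    def read_with_cancel(rdr, bufsize, cancelled):
        # `cancelled` is a threading.Event; the worker plays the goroutine.
        result = {}
        done = threading.Event()

        def worker():
            try:
                result['data'] = rdr.read(bufsize)
            except Exception as e:
                result['error'] = e
            done.set()

        t = threading.Thread(target=worker)
        t.daemon = True   # like the goroutine, it may outlive a cancelled call
        t.start()
        while not done.wait(0.1):
            if cancelled.is_set():
                raise RuntimeError('read cancelled')
        if 'error' in result:
            raise result['error']
        return result['data']
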
index 9c318d1245abcb285634ed46eaf17ddcabe300b2..dc06ef549877ba0316294e4a0e7767393ef4436d 100644 (file)
@@ -13,6 +13,7 @@ import (
 )
 
 type Config struct {
+       Debug  bool
        Listen string
 
        PIDFile string
@@ -32,6 +33,7 @@ type Config struct {
 
        blobSigningKey  []byte
        systemAuthToken string
+       debugLogf       func(string, ...interface{})
 }
 
 var theConfig = DefaultConfig()
@@ -52,6 +54,13 @@ func DefaultConfig() *Config {
 // Start should be called exactly once: after setting all public
 // fields, and before using the config.
 func (cfg *Config) Start() error {
+       if cfg.Debug {
+               cfg.debugLogf = log.Printf
+               cfg.debugLogf("debugging enabled")
+       } else {
+               cfg.debugLogf = func(string, ...interface{}) {}
+       }
+
        if cfg.MaxBuffers < 0 {
                return fmt.Errorf("MaxBuffers must be greater than zero")
        }
diff --git a/services/keepstore/config_test.go b/services/keepstore/config_test.go
new file mode 100644 (file)
index 0000000..eaa0904
--- /dev/null
@@ -0,0 +1,9 @@
+package main
+
+import (
+       "log"
+)
+
+func init() {
+       theConfig.debugLogf = log.Printf
+}
index dc9bcb117f0508e748a97ff3cb2a736aa5c00178..9708b4e6be32f96645d500dfcd4319972f213d47 100644 (file)
@@ -11,6 +11,7 @@ package main
 
 import (
        "bytes"
+       "context"
        "encoding/json"
        "fmt"
        "net/http"
@@ -48,7 +49,7 @@ func TestGetHandler(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllWritable()
-       if err := vols[0].Put(TestHash, TestBlock); err != nil {
+       if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
                t.Error(err)
        }
 
@@ -288,10 +289,10 @@ func TestIndexHandler(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllWritable()
-       vols[0].Put(TestHash, TestBlock)
-       vols[1].Put(TestHash2, TestBlock2)
-       vols[0].Put(TestHash+".meta", []byte("metadata"))
-       vols[1].Put(TestHash2+".meta", []byte("metadata"))
+       vols[0].Put(context.Background(), TestHash, TestBlock)
+       vols[1].Put(context.Background(), TestHash2, TestBlock2)
+       vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata"))
+       vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata"))
 
        theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
@@ -477,7 +478,7 @@ func TestDeleteHandler(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllWritable()
-       vols[0].Put(TestHash, TestBlock)
+       vols[0].Put(context.Background(), TestHash, TestBlock)
 
        // Explicitly set the BlobSignatureTTL to 0 for these
        // tests, to ensure the MockVolume deletes the blocks
@@ -564,7 +565,7 @@ func TestDeleteHandler(t *testing.T) {
        }
        // Confirm the block has been deleted
        buf := make([]byte, BlockSize)
-       _, err := vols[0].Get(TestHash, buf)
+       _, err := vols[0].Get(context.Background(), TestHash, buf)
        var blockDeleted = os.IsNotExist(err)
        if !blockDeleted {
                t.Error("superuserExistingBlockReq: block not deleted")
@@ -572,7 +573,7 @@ func TestDeleteHandler(t *testing.T) {
 
        // A DELETE request on a block newer than BlobSignatureTTL
        // should return success but leave the block on the volume.
-       vols[0].Put(TestHash, TestBlock)
+       vols[0].Put(context.Background(), TestHash, TestBlock)
        theConfig.BlobSignatureTTL = arvados.Duration(time.Hour)
 
        response = IssueRequest(superuserExistingBlockReq)
@@ -588,7 +589,7 @@ func TestDeleteHandler(t *testing.T) {
                        expectedDc, responseDc)
        }
        // Confirm the block has NOT been deleted.
-       _, err = vols[0].Get(TestHash, buf)
+       _, err = vols[0].Get(context.Background(), TestHash, buf)
        if err != nil {
                t.Errorf("testing delete on new block: %s\n", err)
        }
@@ -940,7 +941,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
        KeepVM = MakeTestVolumeManager(2)
        defer KeepVM.Close()
 
-       if err := KeepVM.AllWritable()[0].Put(TestHash, TestBlock); err != nil {
+       if err := KeepVM.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil {
                t.Error(err)
        }
 
@@ -985,7 +986,7 @@ func TestGetHandlerNoBufferLeak(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllWritable()
-       if err := vols[0].Put(TestHash, TestBlock); err != nil {
+       if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
                t.Error(err)
        }
 
@@ -1040,7 +1041,7 @@ func TestUntrashHandler(t *testing.T) {
        KeepVM = MakeTestVolumeManager(2)
        defer KeepVM.Close()
        vols := KeepVM.AllWritable()
-       vols[0].Put(TestHash, TestBlock)
+       vols[0].Put(context.Background(), TestHash, TestBlock)
 
        theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
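
The tests above pass context.Background() because client disconnect is not what they exercise; TestGetHandlerClientDisconnect covers that path. One way to drive it is a ResponseWriter test double whose CloseNotify channel the test controls, so the handler's context watcher cancels on demand. A hedged fragment (closeNotifyingRecorder and its methods are hypothetical, the real test may be structured differently; assumes net/http/httptest is imported):

    // closeNotifyingRecorder layers http.CloseNotifier onto
    // httptest.ResponseRecorder so a test can simulate a client that
    // hangs up mid-request.
    type closeNotifyingRecorder struct {
            *httptest.ResponseRecorder
            closed chan bool
    }

    func newCloseNotifyingRecorder() *closeNotifyingRecorder {
            return &closeNotifyingRecorder{httptest.NewRecorder(), make(chan bool, 1)}
    }

    // disconnect simulates the client dropping the connection; the
    // watcher installed by contextForResponse then cancels the context.
    func (r *closeNotifyingRecorder) disconnect() { r.closed <- true }

    func (r *closeNotifyingRecorder) CloseNotify() <-chan bool { return r.closed }
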
index 54b8b485e1dc99d491bd94d4b5b888b60b990b13..289dce15a06168572f5269d7fed82bdb31a75075 100644 (file)
@@ -9,6 +9,7 @@ package main
 
 import (
        "container/list"
+       "context"
        "crypto/md5"
        "encoding/json"
        "fmt"
@@ -71,6 +72,9 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
 
 // GetBlockHandler is a HandleFunc to address Get block requests.
 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
+       ctx, cancel := contextForResponse(context.TODO(), resp)
+       defer cancel()
+
        if theConfig.RequireSignatures {
                locator := req.URL.Path[1:] // strip leading slash
                if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
@@ -86,14 +90,14 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
        // isn't here, we can return 404 now instead of waiting for a
        // buffer.
 
-       buf, err := getBufferForResponseWriter(resp, bufs, BlockSize)
+       buf, err := getBufferWithContext(ctx, bufs, BlockSize)
        if err != nil {
                http.Error(resp, err.Error(), http.StatusServiceUnavailable)
                return
        }
        defer bufs.Put(buf)
 
-       size, err := GetBlock(mux.Vars(req)["hash"], buf, resp)
+       size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
        if err != nil {
                code := http.StatusInternalServerError
                if err, ok := err.(*KeepError); ok {
@@ -108,24 +112,33 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
        resp.Write(buf[:size])
 }
 
+// Return a new context that gets cancelled by resp's CloseNotifier.
+func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
+       ctx, cancel := context.WithCancel(parent)
+       if cn, ok := resp.(http.CloseNotifier); ok {
+               go func(c <-chan bool) {
+                       select {
+                       case <-c:
+                               theConfig.debugLogf("cancel context")
+                               cancel()
+                       case <-ctx.Done():
+                       }
+               }(cn.CloseNotify())
+       }
+       return ctx, cancel
+}
+
 // Get a buffer from the pool -- but give up and return a non-nil
-// error if resp implements http.CloseNotifier and tells us that the
-// client has disconnected before we get a buffer.
-func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufSize int) ([]byte, error) {
-       var closeNotifier <-chan bool
-       if resp, ok := resp.(http.CloseNotifier); ok {
-               closeNotifier = resp.CloseNotify()
-       }
-       var buf []byte
+// error if ctx ends before we get a buffer.
+func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
        bufReady := make(chan []byte)
        go func() {
                bufReady <- bufs.Get(bufSize)
-               close(bufReady)
        }()
        select {
-       case buf = <-bufReady:
+       case buf := <-bufReady:
                return buf, nil
-       case <-closeNotifier:
+       case <-ctx.Done():
                go func() {
                        // Even if closeNotifier happened first, we
                        // need to keep waiting for our buf so we can
@@ -138,6 +151,9 @@ func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufS
 
 // PutBlockHandler is a HandleFunc to address Put block requests.
 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
+       ctx, cancel := contextForResponse(context.TODO(), resp)
+       defer cancel()
+
        hash := mux.Vars(req)["hash"]
 
        // Detect as many error conditions as possible before reading
@@ -159,7 +175,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       buf, err := getBufferForResponseWriter(resp, bufs, int(req.ContentLength))
+       buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
        if err != nil {
                http.Error(resp, err.Error(), http.StatusServiceUnavailable)
                return
@@ -172,12 +188,15 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       replication, err := PutBlock(buf, hash)
+       replication, err := PutBlock(ctx, buf, hash)
        bufs.Put(buf)
 
        if err != nil {
-               ke := err.(*KeepError)
-               http.Error(resp, ke.Error(), ke.HTTPCode)
+               code := http.StatusInternalServerError
+               if err, ok := err.(*KeepError); ok {
+                       code = err.HTTPCode
+               }
+               http.Error(resp, err.Error(), code)
                return
        }
 
@@ -548,12 +567,17 @@ func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
 // If the block found does not have the correct MD5 hash, returns
 // DiskHashError.
 //
-func GetBlock(hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
        // Attempt to read the requested hash from a keep volume.
        errorToCaller := NotFoundError
 
        for _, vol := range KeepVM.AllReadable() {
-               size, err := vol.Get(hash, buf)
+               size, err := vol.Get(ctx, hash, buf)
+               select {
+               case <-ctx.Done():
+                       return 0, ErrClientDisconnect
+               default:
+               }
                if err != nil {
                        // IsNotExist is an expected error and may be
                        // ignored. All other errors are logged. In
@@ -587,7 +611,7 @@ func GetBlock(hash string, buf []byte, resp http.ResponseWriter) (int, error) {
 
 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
 //
-// PutBlock(block, hash)
+// PutBlock(ctx, block, hash)
 //   Stores the BLOCK (identified by the content id HASH) in Keep.
 //
 //   The MD5 checksum of the block must be identical to the content id HASH.
@@ -612,7 +636,7 @@ func GetBlock(hash string, buf []byte, resp http.ResponseWriter) (int, error) {
 //          all writes failed). The text of the error message should
 //          provide as much detail as possible.
 //
-func PutBlock(block []byte, hash string) (int, error) {
+func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
        // Check that BLOCK's checksum matches HASH.
        blockhash := fmt.Sprintf("%x", md5.Sum(block))
        if blockhash != hash {
@@ -623,16 +647,21 @@ func PutBlock(block []byte, hash string) (int, error) {
        // If we already have this data, it's intact on disk, and we
        // can update its timestamp, return success. If we have
        // different data with the same hash, return failure.
-       if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+       if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
                return n, err
+       } else if ctx.Err() != nil {
+               return 0, ErrClientDisconnect
        }
 
        // Choose a Keep volume to write to.
        // If this volume fails, try all of the volumes in order.
        if vol := KeepVM.NextWritable(); vol != nil {
-               if err := vol.Put(hash, block); err == nil {
+               if err := vol.Put(ctx, hash, block); err == nil {
                        return vol.Replication(), nil // success!
                }
+               if ctx.Err() != nil {
+                       return 0, ErrClientDisconnect
+               }
        }
 
        writables := KeepVM.AllWritable()
@@ -643,7 +672,10 @@ func PutBlock(block []byte, hash string) (int, error) {
 
        allFull := true
        for _, vol := range writables {
-               err := vol.Put(hash, block)
+               err := vol.Put(ctx, hash, block)
+               if ctx.Err() != nil {
+                       return 0, ErrClientDisconnect
+               }
                if err == nil {
                        return vol.Replication(), nil // success!
                }
@@ -669,10 +701,13 @@ func PutBlock(block []byte, hash string) (int, error) {
 // the relevant block's modification time in order to protect it from
 // premature garbage collection. Otherwise, it returns a non-nil
 // error.
-func CompareAndTouch(hash string, buf []byte) (int, error) {
+func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
        var bestErr error = NotFoundError
        for _, vol := range KeepVM.AllWritable() {
-               if err := vol.Compare(hash, buf); err == CollisionError {
+               err := vol.Compare(ctx, hash, buf)
+               if ctx.Err() != nil {
+                       return 0, ctx.Err()
+               } else if err == CollisionError {
                        // Stop if we have a block with same hash but
                        // different content. (It will be impossible
                        // to tell which one is wanted if we have
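
The handler changes above all follow one pattern: derive a cancellable context from the client connection via contextForResponse, then give every blocking step (buffer acquisition, each volume Get/Put/Compare) a chance to abort when it fires. A hedged sketch of the same wiring in a generic handler; slowHandler and doSlowWork are hypothetical, the fragment assumes context, net/http, and time are imported, and on Go 1.7+ req.Context(), which the server cancels on client disconnect, could replace the CloseNotifier plumbing:

    // slowHandler derives a context that dies with the client connection,
    // then runs cancellable work under it.
    func slowHandler(resp http.ResponseWriter, req *http.Request) {
            ctx, cancel := context.WithCancel(context.TODO())
            defer cancel()
            if cn, ok := resp.(http.CloseNotifier); ok {
                    go func(closed <-chan bool) {
                            select {
                            case <-closed:
                                    cancel() // client went away: abort outstanding work
                            case <-ctx.Done():
                            }
                    }(cn.CloseNotify())
            }
            result, err := doSlowWork(ctx)
            if err != nil {
                    http.Error(resp, err.Error(), http.StatusServiceUnavailable)
                    return
            }
            resp.Write(result)
    }

    // doSlowWork stands in for keepstore's buffer/volume operations: it
    // returns early with ctx.Err() when the context is cancelled.
    func doSlowWork(ctx context.Context) ([]byte, error) {
            select {
            case <-time.After(time.Second): // stand-in for real I/O
                    return []byte("done"), nil
            case <-ctx.Done():
                    return nil, ctx.Err()
            }
    }
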
index dda7edcec3509e683465a93d5eb775bf18f16d19..181e651d3b4bbef40fa146b8fe06e6b93c206b05 100644 (file)
@@ -2,6 +2,7 @@ package main
 
 import (
        "bytes"
+       "context"
 )
 
 // A TestableVolumeManagerFactory creates a volume manager with at least two TestableVolume instances.
@@ -46,7 +47,7 @@ func testGetBlock(t TB, factory TestableVolumeManagerFactory, testHash string, t
 
        // Get should pass
        buf := make([]byte, len(testBlock))
-       n, err := GetBlock(testHash, buf, nil)
+       n, err := GetBlock(context.Background(), testHash, buf, nil)
        if err != nil {
                t.Fatalf("Error while getting block %s", err)
        }
@@ -66,7 +67,7 @@ func testPutRawBadDataGetBlock(t TB, factory TestableVolumeManagerFactory,
 
        // Get should fail
        buf := make([]byte, BlockSize)
-       size, err := GetBlock(testHash, buf, nil)
+       size, err := GetBlock(context.Background(), testHash, buf, nil)
        if err == nil {
                t.Fatalf("Got %+q, expected error while getting corrupt block %v", buf[:size], testHash)
        }
@@ -77,18 +78,18 @@ func testPutBlock(t TB, factory TestableVolumeManagerFactory, testHash string, t
        setupHandlersWithGenericVolumeTest(t, factory)
 
        // PutBlock
-       if _, err := PutBlock(testBlock, testHash); err != nil {
+       if _, err := PutBlock(context.Background(), testBlock, testHash); err != nil {
                t.Fatalf("Error during PutBlock: %s", err)
        }
 
        // Check that PutBlock succeeds again even after CompareAndTouch
-       if _, err := PutBlock(testBlock, testHash); err != nil {
+       if _, err := PutBlock(context.Background(), testBlock, testHash); err != nil {
                t.Fatalf("Error during PutBlock: %s", err)
        }
 
        // Check that PutBlock stored the data as expected
        buf := make([]byte, BlockSize)
-       size, err := GetBlock(testHash, buf, nil)
+       size, err := GetBlock(context.Background(), testHash, buf, nil)
        if err != nil {
                t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
        } else if bytes.Compare(buf[:size], testBlock) != 0 {
@@ -106,14 +107,14 @@ func testPutBlockCorrupt(t TB, factory TestableVolumeManagerFactory,
        testableVolumes[1].PutRaw(testHash, badData)
 
        // Check that PutBlock with good data succeeds
-       if _, err := PutBlock(testBlock, testHash); err != nil {
+       if _, err := PutBlock(context.Background(), testBlock, testHash); err != nil {
                t.Fatalf("Error during PutBlock for %q: %s", testHash, err)
        }
 
        // Put succeeded and overwrote the badData in one volume,
        // and Get should return the testBlock now, ignoring the bad data.
        buf := make([]byte, BlockSize)
-       size, err := GetBlock(testHash, buf, nil)
+       size, err := GetBlock(context.Background(), testHash, buf, nil)
        if err != nil {
                t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
        } else if bytes.Compare(buf[:size], testBlock) != 0 {
index 3fb86bc0f147182087cb845ebb4b250891507a27..2f5f8d43ea70c5953a71b0936b124c3b148f1ad5 100644 (file)
@@ -189,7 +189,7 @@ func main() {
        signal.Notify(term, syscall.SIGTERM)
        signal.Notify(term, syscall.SIGINT)
 
-       if _, err := daemon.SdNotify("READY=1"); err != nil {
+       if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
                log.Printf("Error notifying init daemon: %v", err)
        }
        log.Println("listening at", listener.Addr())
index dc6af0fa0d651c79cd6694a006fd3ad83ba2d677..e1d1dc5cb3cf2eb6ed3bf0b1da0a18b154f03328 100644 (file)
@@ -2,6 +2,7 @@ package main
 
 import (
        "bytes"
+       "context"
        "fmt"
        "io/ioutil"
        "os"
@@ -61,13 +62,13 @@ func TestGetBlock(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllReadable()
-       if err := vols[1].Put(TestHash, TestBlock); err != nil {
+       if err := vols[1].Put(context.Background(), TestHash, TestBlock); err != nil {
                t.Error(err)
        }
 
        // Check that GetBlock returns success.
        buf := make([]byte, BlockSize)
-       size, err := GetBlock(TestHash, buf, nil)
+       size, err := GetBlock(context.Background(), TestHash, buf, nil)
        if err != nil {
                t.Errorf("GetBlock error: %s", err)
        }
@@ -88,7 +89,7 @@ func TestGetBlockMissing(t *testing.T) {
 
        // Check that GetBlock returns failure.
        buf := make([]byte, BlockSize)
-       size, err := GetBlock(TestHash, buf, nil)
+       size, err := GetBlock(context.Background(), TestHash, buf, nil)
        if err != NotFoundError {
                t.Errorf("Expected NotFoundError, got %v, err %v", buf[:size], err)
        }
@@ -106,11 +107,11 @@ func TestGetBlockCorrupt(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllReadable()
-       vols[0].Put(TestHash, BadBlock)
+       vols[0].Put(context.Background(), TestHash, BadBlock)
 
        // Check that GetBlock returns failure.
        buf := make([]byte, BlockSize)
-       size, err := GetBlock(TestHash, buf, nil)
+       size, err := GetBlock(context.Background(), TestHash, buf, nil)
        if err != DiskHashError {
                t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, buf[:size])
        }
@@ -131,13 +132,13 @@ func TestPutBlockOK(t *testing.T) {
        defer KeepVM.Close()
 
        // Check that PutBlock stores the data as expected.
-       if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+       if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
                t.Fatalf("PutBlock: n %d err %v", n, err)
        }
 
        vols := KeepVM.AllReadable()
        buf := make([]byte, BlockSize)
-       n, err := vols[1].Get(TestHash, buf)
+       n, err := vols[1].Get(context.Background(), TestHash, buf)
        if err != nil {
                t.Fatalf("Volume #0 Get returned error: %v", err)
        }
@@ -162,12 +163,12 @@ func TestPutBlockOneVol(t *testing.T) {
        vols[0].(*MockVolume).Bad = true
 
        // Check that PutBlock stores the data as expected.
-       if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+       if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
                t.Fatalf("PutBlock: n %d err %v", n, err)
        }
 
        buf := make([]byte, BlockSize)
-       size, err := GetBlock(TestHash, buf, nil)
+       size, err := GetBlock(context.Background(), TestHash, buf, nil)
        if err != nil {
                t.Fatalf("GetBlock: %v", err)
        }
@@ -190,12 +191,12 @@ func TestPutBlockMD5Fail(t *testing.T) {
 
        // Check that PutBlock returns the expected error when the hash does
        // not match the block.
-       if _, err := PutBlock(BadBlock, TestHash); err != RequestHashError {
+       if _, err := PutBlock(context.Background(), BadBlock, TestHash); err != RequestHashError {
                t.Errorf("Expected RequestHashError, got %v", err)
        }
 
        // Confirm that GetBlock fails to return anything.
-       if result, err := GetBlock(TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
+       if result, err := GetBlock(context.Background(), TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
                t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
                        string(result), err)
        }
@@ -214,14 +215,14 @@ func TestPutBlockCorrupt(t *testing.T) {
 
        // Store a corrupted block under TestHash.
        vols := KeepVM.AllWritable()
-       vols[0].Put(TestHash, BadBlock)
-       if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+       vols[0].Put(context.Background(), TestHash, BadBlock)
+       if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
                t.Errorf("PutBlock: n %d err %v", n, err)
        }
 
        // The block on disk should now match TestBlock.
        buf := make([]byte, BlockSize)
-       if size, err := GetBlock(TestHash, buf, nil); err != nil {
+       if size, err := GetBlock(context.Background(), TestHash, buf, nil); err != nil {
                t.Errorf("GetBlock: %v", err)
        } else if bytes.Compare(buf[:size], TestBlock) != 0 {
                t.Errorf("Got %+q, expected %+q", buf[:size], TestBlock)
@@ -246,10 +247,10 @@ func TestPutBlockCollision(t *testing.T) {
 
        // Store one block, then attempt to store the other. Confirm that
        // PutBlock reported a CollisionError.
-       if _, err := PutBlock(b1, locator); err != nil {
+       if _, err := PutBlock(context.Background(), b1, locator); err != nil {
                t.Error(err)
        }
-       if _, err := PutBlock(b2, locator); err == nil {
+       if _, err := PutBlock(context.Background(), b2, locator); err == nil {
                t.Error("PutBlock did not report a collision")
        } else if err != CollisionError {
                t.Errorf("PutBlock returned %v", err)
@@ -271,7 +272,7 @@ func TestPutBlockTouchFails(t *testing.T) {
        // Store a block and then make the underlying volume bad,
        // so a subsequent attempt to update the file timestamp
        // will fail.
-       vols[0].Put(TestHash, BadBlock)
+       vols[0].Put(context.Background(), TestHash, BadBlock)
        oldMtime, err := vols[0].Mtime(TestHash)
        if err != nil {
                t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
@@ -280,7 +281,7 @@ func TestPutBlockTouchFails(t *testing.T) {
        // vols[0].Touch will fail on the next call, so the volume
        // manager will store a copy on vols[1] instead.
        vols[0].(*MockVolume).Touchable = false
-       if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+       if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
                t.Fatalf("PutBlock: n %d err %v", n, err)
        }
        vols[0].(*MockVolume).Touchable = true
@@ -296,7 +297,7 @@ func TestPutBlockTouchFails(t *testing.T) {
                        oldMtime, newMtime)
        }
        buf := make([]byte, BlockSize)
-       n, err := vols[1].Get(TestHash, buf)
+       n, err := vols[1].Get(context.Background(), TestHash, buf)
        if err != nil {
                t.Fatalf("vols[1]: %v", err)
        }
@@ -400,11 +401,11 @@ func TestIndex(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllReadable()
-       vols[0].Put(TestHash, TestBlock)
-       vols[1].Put(TestHash2, TestBlock2)
-       vols[0].Put(TestHash3, TestBlock3)
-       vols[0].Put(TestHash+".meta", []byte("metadata"))
-       vols[1].Put(TestHash2+".meta", []byte("metadata"))
+       vols[0].Put(context.Background(), TestHash, TestBlock)
+       vols[1].Put(context.Background(), TestHash2, TestBlock2)
+       vols[0].Put(context.Background(), TestHash3, TestBlock3)
+       vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata"))
+       vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata"))
 
        buf := new(bytes.Buffer)
        vols[0].IndexTo("", buf)
index d53d1060e743e07d9d2bfba6b90c67376a1006ab..12860bb662d91a1e31191fed2bccace97bc2ac30 100644 (file)
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "context"
        "crypto/rand"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
@@ -94,6 +95,6 @@ func GenerateRandomAPIToken() string {
 
 // Put block
 var PutContent = func(content []byte, locator string) (err error) {
-       _, err = PutBlock(content, locator)
+       _, err = PutBlock(context.Background(), content, locator)
        return
 }
index caed35b670e9484e9978e771e0e8c2aea2532e52..17923f807dc8a8f11bc77ce8dc0732001a4a8ba8 100644 (file)
@@ -1,11 +1,14 @@
 package main
 
 import (
+       "bytes"
+       "context"
        "encoding/base64"
        "encoding/hex"
        "flag"
        "fmt"
        "io"
+       "io/ioutil"
        "log"
        "net/http"
        "os"
@@ -19,6 +22,11 @@ import (
        "github.com/AdRoll/goamz/s3"
 )
 
+const (
+       s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
+       s3DefaultConnectTimeout = arvados.Duration(time.Minute)
+)
+
 var (
        // ErrS3TrashDisabled is returned by Trash if that operation
        // is impossible with the current config.
@@ -134,6 +142,8 @@ type S3Volume struct {
        LocationConstraint bool
        IndexPageSize      int
        S3Replication      int
+       ConnectTimeout     arvados.Duration
+       ReadTimeout        arvados.Duration
        RaceWindow         arvados.Duration
        ReadOnly           bool
        UnsafeDelete       bool
@@ -147,24 +157,28 @@ type S3Volume struct {
 func (*S3Volume) Examples() []Volume {
        return []Volume{
                &S3Volume{
-                       AccessKeyFile: "/etc/aws_s3_access_key.txt",
-                       SecretKeyFile: "/etc/aws_s3_secret_key.txt",
-                       Endpoint:      "",
-                       Region:        "us-east-1",
-                       Bucket:        "example-bucket-name",
-                       IndexPageSize: 1000,
-                       S3Replication: 2,
-                       RaceWindow:    arvados.Duration(24 * time.Hour),
+                       AccessKeyFile:  "/etc/aws_s3_access_key.txt",
+                       SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
+                       Endpoint:       "",
+                       Region:         "us-east-1",
+                       Bucket:         "example-bucket-name",
+                       IndexPageSize:  1000,
+                       S3Replication:  2,
+                       RaceWindow:     arvados.Duration(24 * time.Hour),
+                       ConnectTimeout: arvados.Duration(time.Minute),
+                       ReadTimeout:    arvados.Duration(5 * time.Minute),
                },
                &S3Volume{
-                       AccessKeyFile: "/etc/gce_s3_access_key.txt",
-                       SecretKeyFile: "/etc/gce_s3_secret_key.txt",
-                       Endpoint:      "https://storage.googleapis.com",
-                       Region:        "",
-                       Bucket:        "example-bucket-name",
-                       IndexPageSize: 1000,
-                       S3Replication: 2,
-                       RaceWindow:    arvados.Duration(24 * time.Hour),
+                       AccessKeyFile:  "/etc/gce_s3_access_key.txt",
+                       SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
+                       Endpoint:       "https://storage.googleapis.com",
+                       Region:         "",
+                       Bucket:         "example-bucket-name",
+                       IndexPageSize:  1000,
+                       S3Replication:  2,
+                       RaceWindow:     arvados.Duration(24 * time.Hour),
+                       ConnectTimeout: arvados.Duration(time.Minute),
+                       ReadTimeout:    arvados.Duration(5 * time.Minute),
                },
        }
 }
@@ -203,13 +217,47 @@ func (v *S3Volume) Start() error {
        if err != nil {
                return err
        }
+
+       // Zero timeouts mean "wait forever", which is a bad
+       // default. Default to long timeouts instead.
+       if v.ConnectTimeout == 0 {
+               v.ConnectTimeout = s3DefaultConnectTimeout
+       }
+       if v.ReadTimeout == 0 {
+               v.ReadTimeout = s3DefaultReadTimeout
+       }
+
+       client := s3.New(auth, region)
+       client.ConnectTimeout = time.Duration(v.ConnectTimeout)
+       client.ReadTimeout = time.Duration(v.ReadTimeout)
        v.bucket = &s3.Bucket{
-               S3:   s3.New(auth, region),
+               S3:   client,
                Name: v.Bucket,
        }
        return nil
 }
 
+func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
+       ready := make(chan bool)
+       go func() {
+               rdr, err = v.getReader(loc)
+               close(ready)
+       }()
+       select {
+       case <-ready:
+               return
+       case <-ctx.Done():
+               theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
+               go func() {
+                       <-ready
+                       if err == nil {
+                               rdr.Close()
+                       }
+               }()
+               return nil, ctx.Err()
+       }
+}
+
 // getReader wraps (Bucket)GetReader.
 //
 // In situations where (Bucket)GetReader would fail because the block
@@ -242,50 +290,106 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
 
 // Get a block: copy the block data into buf, and return the number of
 // bytes copied.
-func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
-       rdr, err := v.getReader(loc)
+func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
+       rdr, err := v.getReaderWithContext(ctx, loc)
        if err != nil {
                return 0, err
        }
-       defer rdr.Close()
-       n, err := io.ReadFull(rdr, buf)
-       switch err {
-       case nil, io.EOF, io.ErrUnexpectedEOF:
-               return n, nil
-       default:
-               return 0, v.translateError(err)
+
+       var n int
+       ready := make(chan bool)
+       go func() {
+               defer close(ready)
+
+               defer rdr.Close()
+               n, err = io.ReadFull(rdr, buf)
+
+               switch err {
+               case nil, io.EOF, io.ErrUnexpectedEOF:
+                       err = nil
+               default:
+                       err = v.translateError(err)
+               }
+       }()
+       select {
+       case <-ctx.Done():
+               theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
+               rdr.Close()
+               // Must wait for ReadFull to return, to ensure it
+               // doesn't write to buf after we return.
+               theConfig.debugLogf("s3: waiting for ReadFull() to fail")
+               <-ready
+               return 0, ctx.Err()
+       case <-ready:
+               return n, err
        }
 }
 
 // Compare the given data with the stored data.
-func (v *S3Volume) Compare(loc string, expect []byte) error {
-       rdr, err := v.getReader(loc)
+func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
+       rdr, err := v.getReaderWithContext(ctx, loc)
        if err != nil {
                return err
        }
        defer rdr.Close()
-       return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
+       return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
 }
 
 // Put writes a block.
-func (v *S3Volume) Put(loc string, block []byte) error {
+func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
        if v.ReadOnly {
                return MethodDisabledError
        }
        var opts s3.Options
-       if len(block) > 0 {
+       size := len(block)
+       if size > 0 {
                md5, err := hex.DecodeString(loc)
                if err != nil {
                        return err
                }
                opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
        }
-       err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
-       if err != nil {
+
+       // Send the block data through a pipe, so that (if we need to)
+       // we can close the pipe early and abandon our PutReader()
+       // goroutine, without worrying about PutReader() accessing our
+       // block buffer after we release it.
+       bufr, bufw := io.Pipe()
+       go func() {
+               io.Copy(bufw, bytes.NewReader(block))
+               bufw.Close()
+       }()
+
+       var err error
+       ready := make(chan bool)
+       go func() {
+               defer func() {
+                       if ctx.Err() != nil {
+                               theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
+                       }
+               }()
+               defer close(ready)
+               err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
+               if err != nil {
+                       return
+               }
+               err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+       }()
+       select {
+       case <-ctx.Done():
+               theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
+               // Our pipe might be stuck in Write(), waiting for
+               // io.Copy() to read. If so, un-stick it. This means
+               // PutReader will get corrupt data, but that's OK: the
+               // size and MD5 won't match, so the write will fail.
+               go io.Copy(ioutil.Discard, bufr)
+               // CloseWithError() will return once pending I/O is done.
+               bufw.CloseWithError(ctx.Err())
+               theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
+               return ctx.Err()
+       case <-ready:
                return v.translateError(err)
        }
-       err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
-       return v.translateError(err)
 }
 
 // Touch sets the timestamp for the given locator to the current time.
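
The S3Volume.Put rewrite above solves a buffer-ownership problem: the caller recycles block as soon as Put returns, but an abandoned PutReader goroutine could otherwise still be reading it. Feeding PutReader through an io.Pipe means only the local io.Copy touches block, and on cancellation the pipe can be drained and closed so both goroutines wind down promptly instead of holding block indefinitely. A hedged sketch of the pattern; putWithContext and the put callback are hypothetical, and the fragment assumes bytes, context, io, and io/ioutil are imported:

    // putWithContext uploads block via put() but returns early on
    // cancellation. Only io.Copy reads block directly, and severing the
    // pipe makes it finish quickly; the abandoned upload then fails on
    // its own (short read, bad size/MD5) without further help.
    func putWithContext(ctx context.Context, put func(io.Reader, int64) error, block []byte) error {
            bufr, bufw := io.Pipe()
            go func() {
                    io.Copy(bufw, bytes.NewReader(block))
                    bufw.Close()
            }()
            done := make(chan error, 1) // buffered: abandoned sender never leaks
            go func() { done <- put(bufr, int64(len(block))) }()
            select {
            case err := <-done:
                    return err
            case <-ctx.Done():
                    go io.Copy(ioutil.Discard, bufr) // un-stick a blocked pipe Write
                    bufw.CloseWithError(ctx.Err())   // then fail pending pipe I/O
                    return ctx.Err()
            }
    }
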
index 76dcbc9f9ea2f8fb680a25a31b84735f991b1b51..63b186220c30a562e900722d63d38be50bde05d6 100644 (file)
@@ -2,6 +2,7 @@ package main
 
 import (
        "bytes"
+       "context"
        "crypto/md5"
        "fmt"
        "io/ioutil"
@@ -223,7 +224,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
                // Check canGet
                loc, blk := setupScenario()
                buf := make([]byte, len(blk))
-               _, err := v.Get(loc, buf)
+               _, err := v.Get(context.Background(), loc, buf)
                c.Check(err == nil, check.Equals, scenario.canGet)
                if err != nil {
                        c.Check(os.IsNotExist(err), check.Equals, true)
@@ -233,7 +234,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
                loc, blk = setupScenario()
                err = v.Trash(loc)
                c.Check(err == nil, check.Equals, scenario.canTrash)
-               _, err = v.Get(loc, buf)
+               _, err = v.Get(context.Background(), loc, buf)
                c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
                if err != nil {
                        c.Check(os.IsNotExist(err), check.Equals, true)
@@ -248,7 +249,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
                        // should be able to Get after Untrash --
                        // regardless of timestamps, errors, race
                        // conditions, etc.
-                       _, err = v.Get(loc, buf)
+                       _, err = v.Get(context.Background(), loc, buf)
                        c.Check(err, check.IsNil)
                }
 
@@ -269,7 +270,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
                // Check for current Mtime after Put (applies to all
                // scenarios)
                loc, blk = setupScenario()
-               err = v.Put(loc, blk)
+               err = v.Put(context.Background(), loc, blk)
                c.Check(err, check.IsNil)
                t, err := v.Mtime(loc)
                c.Check(err, check.IsNil)
index 5ec413d1bde899d606a6792f40ffd3afe65f3615..04b034a97976980c5ce66d59c361b44936dabbca 100644 (file)
@@ -2,6 +2,7 @@ package main
 
 import (
        "container/list"
+       "context"
        "testing"
        "time"
 )
@@ -219,15 +220,15 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
        // Put test content
        vols := KeepVM.AllWritable()
        if testData.CreateData {
-               vols[0].Put(testData.Locator1, testData.Block1)
-               vols[0].Put(testData.Locator1+".meta", []byte("metadata"))
+               vols[0].Put(context.Background(), testData.Locator1, testData.Block1)
+               vols[0].Put(context.Background(), testData.Locator1+".meta", []byte("metadata"))
 
                if testData.CreateInVolume1 {
-                       vols[0].Put(testData.Locator2, testData.Block2)
-                       vols[0].Put(testData.Locator2+".meta", []byte("metadata"))
+                       vols[0].Put(context.Background(), testData.Locator2, testData.Block2)
+                       vols[0].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
                } else {
-                       vols[1].Put(testData.Locator2, testData.Block2)
-                       vols[1].Put(testData.Locator2+".meta", []byte("metadata"))
+                       vols[1].Put(context.Background(), testData.Locator2, testData.Block2)
+                       vols[1].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
                }
        }
 
@@ -291,7 +292,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
 
        // Verify Locator1 to be un/deleted as expected
        buf := make([]byte, BlockSize)
-       size, err := GetBlock(testData.Locator1, buf, nil)
+       size, err := GetBlock(context.Background(), testData.Locator1, buf, nil)
        if testData.ExpectLocator1 {
                if size == 0 || err != nil {
                        t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
@@ -304,7 +305,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
 
        // Verify Locator2 to be un/deleted as expected
        if testData.Locator1 != testData.Locator2 {
-               size, err = GetBlock(testData.Locator2, buf, nil)
+               size, err = GetBlock(context.Background(), testData.Locator2, buf, nil)
                if testData.ExpectLocator2 {
                        if size == 0 || err != nil {
                                t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
@@ -323,7 +324,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
                locatorFoundIn := 0
                for _, volume := range KeepVM.AllReadable() {
                        buf := make([]byte, BlockSize)
-                       if _, err := volume.Get(testData.Locator1, buf); err == nil {
+                       if _, err := volume.Get(context.Background(), testData.Locator1, buf); err == nil {
                                locatorFoundIn = locatorFoundIn + 1
                        }
                }
index 6e01e75b879b339232603d38f93cb040ecc6d86c..57e18aba9f691ceb43f32a928a0e3a95e9d505ec 100644 (file)
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "context"
        "io"
        "sync/atomic"
        "time"
@@ -47,14 +48,14 @@ type Volume interface {
        // any of the data.
        //
        // len(buf) will not exceed BlockSize.
-       Get(loc string, buf []byte) (int, error)
+       Get(ctx context.Context, loc string, buf []byte) (int, error)
 
        // Compare the given data with the stored data (i.e., what Get
        // would return). If equal, return nil. If not, return
        // CollisionError or DiskHashError (depending on whether the
        // data on disk matches the expected hash), or whatever error
        // was encountered opening/reading the stored data.
-       Compare(loc string, data []byte) error
+       Compare(ctx context.Context, loc string, data []byte) error
 
        // Put writes a block to an underlying storage device.
        //
@@ -84,7 +85,7 @@ type Volume interface {
        //
        // Put should not verify that loc==hash(block): this is the
        // caller's responsibility.
-       Put(loc string, block []byte) error
+       Put(ctx context.Context, loc string, block []byte) error
 
        // Touch sets the timestamp for the given locator to the
        // current time.
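
With ctx added to Get, Compare, and Put, every Volume implementation now shares a contract: notice cancellation promptly and return ctx.Err() (or a wrapper like ErrClientDisconnect at the handler layer) rather than finishing slow I/O for a client that is gone. A hedged fragment of a toy in-memory implementation illustrating the contract; memVolume is illustrative only, implements just Get, and assumes context and os are imported:

    // memVolume is a hypothetical map-backed volume (Get only).
    type memVolume struct {
            store map[string][]byte
    }

    func (v *memVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
            // Check for cancellation before doing work; a real volume also
            // re-checks ctx around each blocking step (locks, disk, network).
            if err := ctx.Err(); err != nil {
                    return 0, err
            }
            data, ok := v.store[loc]
            if !ok {
                    return 0, os.ErrNotExist
            }
            return copy(buf, data), nil
    }
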
index 1738fe9b513bb4d86482ceede86a04539d29d418..7e72a8f246ee60410e7110417de3284ae4263ca3 100644 (file)
@@ -2,6 +2,7 @@ package main
 
 import (
        "bytes"
+       "context"
        "crypto/md5"
        "fmt"
        "os"
@@ -92,7 +93,7 @@ func testGet(t TB, factory TestableVolumeFactory) {
        v.PutRaw(TestHash, TestBlock)
 
        buf := make([]byte, BlockSize)
-       n, err := v.Get(TestHash, buf)
+       n, err := v.Get(context.Background(), TestHash, buf)
        if err != nil {
                t.Fatal(err)
        }
@@ -109,7 +110,7 @@ func testGetNoSuchBlock(t TB, factory TestableVolumeFactory) {
        defer v.Teardown()
 
        buf := make([]byte, BlockSize)
-       if _, err := v.Get(TestHash2, buf); err == nil {
+       if _, err := v.Get(context.Background(), TestHash2, buf); err == nil {
                t.Errorf("Expected error while getting non-existing block %v", TestHash2)
        }
 }
@@ -121,7 +122,7 @@ func testCompareNonexistent(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
-       err := v.Compare(TestHash, TestBlock)
+       err := v.Compare(context.Background(), TestHash, TestBlock)
        if err != os.ErrNotExist {
                t.Errorf("Got err %T %q, expected os.ErrNotExist", err, err)
        }
@@ -136,7 +137,7 @@ func testCompareSameContent(t TB, factory TestableVolumeFactory, testHash string
        v.PutRaw(testHash, testData)
 
        // Compare the block locator with same content
-       err := v.Compare(testHash, testData)
+       err := v.Compare(context.Background(), testHash, testData)
        if err != nil {
                t.Errorf("Got err %q, expected nil", err)
        }
@@ -154,7 +155,7 @@ func testCompareWithCollision(t TB, factory TestableVolumeFactory, testHash stri
        v.PutRaw(testHash, testDataA)
 
        // Compare the block locator with different content; collision
-       err := v.Compare(TestHash, testDataB)
+       err := v.Compare(context.Background(), TestHash, testDataB)
        if err == nil {
                t.Errorf("Got err nil, expected error due to collision")
        }
@@ -170,7 +171,7 @@ func testCompareWithCorruptStoredData(t TB, factory TestableVolumeFactory, testH
 
        v.PutRaw(TestHash, testDataB)
 
-       err := v.Compare(testHash, testDataA)
+       err := v.Compare(context.Background(), testHash, testDataA)
        if err == nil || err == CollisionError {
                t.Errorf("Got err %+v, expected non-collision error", err)
        }
@@ -186,12 +187,12 @@ func testPutBlockWithSameContent(t TB, factory TestableVolumeFactory, testHash s
                return
        }
 
-       err := v.Put(testHash, testData)
+       err := v.Put(context.Background(), testHash, testData)
        if err != nil {
                t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
        }
 
-       err = v.Put(testHash, testData)
+       err = v.Put(context.Background(), testHash, testData)
        if err != nil {
                t.Errorf("Got err putting block second time %q: %q, expected nil", TestBlock, err)
        }
@@ -209,9 +210,9 @@ func testPutBlockWithDifferentContent(t TB, factory TestableVolumeFactory, testH
 
        v.PutRaw(testHash, testDataA)
 
-       putErr := v.Put(testHash, testDataB)
+       putErr := v.Put(context.Background(), testHash, testDataB)
        buf := make([]byte, BlockSize)
-       n, getErr := v.Get(testHash, buf)
+       n, getErr := v.Get(context.Background(), testHash, buf)
        if putErr == nil {
                // Put must not return a nil error unless it has
                // overwritten the existing data.
@@ -238,23 +239,23 @@ func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
                return
        }
 
-       err := v.Put(TestHash, TestBlock)
+       err := v.Put(context.Background(), TestHash, TestBlock)
        if err != nil {
                t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
        }
 
-       err = v.Put(TestHash2, TestBlock2)
+       err = v.Put(context.Background(), TestHash2, TestBlock2)
        if err != nil {
                t.Errorf("Got err putting block %q: %q, expected nil", TestBlock2, err)
        }
 
-       err = v.Put(TestHash3, TestBlock3)
+       err = v.Put(context.Background(), TestHash3, TestBlock3)
        if err != nil {
                t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
        }
 
        data := make([]byte, BlockSize)
-       n, err := v.Get(TestHash, data)
+       n, err := v.Get(context.Background(), TestHash, data)
        if err != nil {
                t.Error(err)
        } else {
@@ -263,7 +264,7 @@ func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
                }
        }
 
-       n, err = v.Get(TestHash2, data)
+       n, err = v.Get(context.Background(), TestHash2, data)
        if err != nil {
                t.Error(err)
        } else {
@@ -272,7 +273,7 @@ func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
                }
        }
 
-       n, err = v.Get(TestHash3, data)
+       n, err = v.Get(context.Background(), TestHash3, data)
        if err != nil {
                t.Error(err)
        } else {
@@ -294,7 +295,7 @@ func testPutAndTouch(t TB, factory TestableVolumeFactory) {
                return
        }
 
-       if err := v.Put(TestHash, TestBlock); err != nil {
+       if err := v.Put(context.Background(), TestHash, TestBlock); err != nil {
                t.Error(err)
        }
 
@@ -314,7 +315,7 @@ func testPutAndTouch(t TB, factory TestableVolumeFactory) {
        }
 
        // Write the same block again.
-       if err := v.Put(TestHash, TestBlock); err != nil {
+       if err := v.Put(context.Background(), TestHash, TestBlock); err != nil {
                t.Error(err)
        }
 
@@ -437,13 +438,13 @@ func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
                return
        }
 
-       v.Put(TestHash, TestBlock)
+       v.Put(context.Background(), TestHash, TestBlock)
 
        if err := v.Trash(TestHash); err != nil {
                t.Error(err)
        }
        data := make([]byte, BlockSize)
-       n, err := v.Get(TestHash, data)
+       n, err := v.Get(context.Background(), TestHash, data)
        if err != nil {
                t.Error(err)
        } else if bytes.Compare(data[:n], TestBlock) != 0 {
@@ -463,14 +464,14 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
                return
        }
 
-       v.Put(TestHash, TestBlock)
+       v.Put(context.Background(), TestHash, TestBlock)
        v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
        if err := v.Trash(TestHash); err != nil {
                t.Error(err)
        }
        data := make([]byte, BlockSize)
-       if _, err := v.Get(TestHash, data); err == nil || !os.IsNotExist(err) {
+       if _, err := v.Get(context.Background(), TestHash, data); err == nil || !os.IsNotExist(err) {
                t.Errorf("os.IsNotExist(%v) should have been true", err)
        }
 
@@ -479,7 +480,7 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
                t.Fatalf("os.IsNotExist(%v) should have been true", err)
        }
 
-       err = v.Compare(TestHash, TestBlock)
+       err = v.Compare(context.Background(), TestHash, TestBlock)
        if err == nil || !os.IsNotExist(err) {
                t.Fatalf("os.IsNotExist(%v) should have been true", err)
        }
@@ -553,17 +554,17 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
        buf := make([]byte, BlockSize)
 
        // Get from read-only volume should succeed
-       _, err := v.Get(TestHash, buf)
+       _, err := v.Get(context.Background(), TestHash, buf)
        if err != nil {
                t.Errorf("got err %v, expected nil", err)
        }
 
        // Put a new block to read-only volume should result in error
-       err = v.Put(TestHash2, TestBlock2)
+       err = v.Put(context.Background(), TestHash2, TestBlock2)
        if err == nil {
                t.Errorf("Expected error when putting block in a read-only volume")
        }
-       _, err = v.Get(TestHash2, buf)
+       _, err = v.Get(context.Background(), TestHash2, buf)
        if err == nil {
                t.Errorf("Expected error when getting block whose put in read-only volume failed")
        }
@@ -581,7 +582,7 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
        }
 
        // Overwriting an existing block in read-only volume should result in error
-       err = v.Put(TestHash, TestBlock)
+       err = v.Put(context.Background(), TestHash, TestBlock)
        if err == nil {
                t.Errorf("Expected error when putting block in a read-only volume")
        }
@@ -600,7 +601,7 @@ func testGetConcurrent(t TB, factory TestableVolumeFactory) {
        sem := make(chan int)
        go func() {
                buf := make([]byte, BlockSize)
-               n, err := v.Get(TestHash, buf)
+               n, err := v.Get(context.Background(), TestHash, buf)
                if err != nil {
                        t.Errorf("err1: %v", err)
                }
@@ -612,7 +613,7 @@ func testGetConcurrent(t TB, factory TestableVolumeFactory) {
 
        go func() {
                buf := make([]byte, BlockSize)
-               n, err := v.Get(TestHash2, buf)
+               n, err := v.Get(context.Background(), TestHash2, buf)
                if err != nil {
                        t.Errorf("err2: %v", err)
                }
@@ -624,7 +625,7 @@ func testGetConcurrent(t TB, factory TestableVolumeFactory) {
 
        go func() {
                buf := make([]byte, BlockSize)
-               n, err := v.Get(TestHash3, buf)
+               n, err := v.Get(context.Background(), TestHash3, buf)
                if err != nil {
                        t.Errorf("err3: %v", err)
                }
@@ -652,7 +653,7 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) {
 
        sem := make(chan int)
        go func(sem chan int) {
-               err := v.Put(TestHash, TestBlock)
+               err := v.Put(context.Background(), TestHash, TestBlock)
                if err != nil {
                        t.Errorf("err1: %v", err)
                }
@@ -660,7 +661,7 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) {
        }(sem)
 
        go func(sem chan int) {
-               err := v.Put(TestHash2, TestBlock2)
+               err := v.Put(context.Background(), TestHash2, TestBlock2)
                if err != nil {
                        t.Errorf("err2: %v", err)
                }
@@ -668,7 +669,7 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) {
        }(sem)
 
        go func(sem chan int) {
-               err := v.Put(TestHash3, TestBlock3)
+               err := v.Put(context.Background(), TestHash3, TestBlock3)
                if err != nil {
                        t.Errorf("err3: %v", err)
                }
@@ -682,7 +683,7 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) {
 
        // Double check that we actually wrote the blocks we expected to write.
        buf := make([]byte, BlockSize)
-       n, err := v.Get(TestHash, buf)
+       n, err := v.Get(context.Background(), TestHash, buf)
        if err != nil {
                t.Errorf("Get #1: %v", err)
        }
@@ -690,7 +691,7 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) {
                t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf[:n]))
        }
 
-       n, err = v.Get(TestHash2, buf)
+       n, err = v.Get(context.Background(), TestHash2, buf)
        if err != nil {
                t.Errorf("Get #2: %v", err)
        }
@@ -698,7 +699,7 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) {
                t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf[:n]))
        }
 
-       n, err = v.Get(TestHash3, buf)
+       n, err = v.Get(context.Background(), TestHash3, buf)
        if err != nil {
                t.Errorf("Get #3: %v", err)
        }
@@ -720,12 +721,12 @@ func testPutFullBlock(t TB, factory TestableVolumeFactory) {
        wdata[0] = 'a'
        wdata[BlockSize-1] = 'z'
        hash := fmt.Sprintf("%x", md5.Sum(wdata))
-       err := v.Put(hash, wdata)
+       err := v.Put(context.Background(), hash, wdata)
        if err != nil {
                t.Fatal(err)
        }
        buf := make([]byte, BlockSize)
-       n, err := v.Get(hash, buf)
+       n, err := v.Get(context.Background(), hash, buf)
        if err != nil {
                t.Error(err)
        }
@@ -752,7 +753,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
        v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
        buf := make([]byte, BlockSize)
-       n, err := v.Get(TestHash, buf)
+       n, err := v.Get(context.Background(), TestHash, buf)
        if err != nil {
                t.Fatal(err)
        }
@@ -771,7 +772,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
                        t.Fatal(err)
                }
        } else {
-               _, err = v.Get(TestHash, buf)
+               _, err = v.Get(context.Background(), TestHash, buf)
                if err == nil || !os.IsNotExist(err) {
                        t.Errorf("os.IsNotExist(%v) should have been true", err)
                }
@@ -784,7 +785,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
        }
 
        // Get the block - after trash and untrash sequence
-       n, err = v.Get(TestHash, buf)
+       n, err = v.Get(context.Background(), TestHash, buf)
        if err != nil {
                t.Fatal(err)
        }
@@ -802,7 +803,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 
        checkGet := func() error {
                buf := make([]byte, BlockSize)
-               n, err := v.Get(TestHash, buf)
+               n, err := v.Get(context.Background(), TestHash, buf)
                if err != nil {
                        return err
                }
@@ -815,7 +816,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
                        return err
                }
 
-               err = v.Compare(TestHash, TestBlock)
+               err = v.Compare(context.Background(), TestHash, TestBlock)
                if err != nil {
                        return err
                }
index 6ab386aec4fcc7774af90c4fa5ca879258ac9404..931c10e69044c4715eb35ccab4d33872a848db5d 100644 (file)
@@ -2,6 +2,7 @@ package main
 
 import (
        "bytes"
+       "context"
        "crypto/md5"
        "errors"
        "fmt"
@@ -95,7 +96,7 @@ func (v *MockVolume) gotCall(method string) {
        }
 }
 
-func (v *MockVolume) Compare(loc string, buf []byte) error {
+func (v *MockVolume) Compare(ctx context.Context, loc string, buf []byte) error {
        v.gotCall("Compare")
        <-v.Gate
        if v.Bad {
@@ -113,7 +114,7 @@ func (v *MockVolume) Compare(loc string, buf []byte) error {
        }
 }
 
-func (v *MockVolume) Get(loc string, buf []byte) (int, error) {
+func (v *MockVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
        v.gotCall("Get")
        <-v.Gate
        if v.Bad {
@@ -125,7 +126,7 @@ func (v *MockVolume) Get(loc string, buf []byte) (int, error) {
        return 0, os.ErrNotExist
 }
 
-func (v *MockVolume) Put(loc string, block []byte) error {
+func (v *MockVolume) Put(ctx context.Context, loc string, block []byte) error {
        v.gotCall("Put")
        <-v.Gate
        if v.Bad {
index b5753dec04638927162a328d2a43f2fd4e567a50..5239ed37402c93f25af0d6c65c03f6a953597cda 100644 (file)
@@ -2,6 +2,7 @@ package main
 
 import (
        "bufio"
+       "context"
        "flag"
        "fmt"
        "io"
@@ -182,11 +183,14 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
 
 // Lock the locker (if one is in use), open the file for reading, and
 // call the given function if and when the file is ready to read.
-func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
+func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
        if v.locker != nil {
                v.locker.Lock()
                defer v.locker.Unlock()
        }
+       if ctx.Err() != nil {
+               return ctx.Err()
+       }
        f, err := os.Open(path)
        if err != nil {
                return err
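
[Editor's note] The new ctx.Err() check sits deliberately after the locker is acquired: if the caller gave up while getFunc was blocked waiting for the serialize lock, there is no point opening and reading the file. The same guard in a generic form (my formulation, assuming "context" and "sync" imports; not the project's helper):

    func withLock(ctx context.Context, mtx sync.Locker, fn func() error) error {
            mtx.Lock()
            defer mtx.Unlock()
            // The caller may have been canceled while we waited for the lock.
            if err := ctx.Err(); err != nil {
                    return err
            }
            return fn()
    }
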
@@ -210,7 +214,7 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
 
 // Get retrieves a block, copies it to the given slice, and returns
 // the number of bytes copied.
-func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
+func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
        path := v.blockPath(loc)
        stat, err := v.stat(path)
        if err != nil {
@@ -221,7 +225,7 @@ func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
        }
        var read int
        size := int(stat.Size())
-       err = v.getFunc(path, func(rdr io.Reader) error {
+       err = v.getFunc(ctx, path, func(rdr io.Reader) error {
                read, err = io.ReadFull(rdr, buf[:size])
                return err
        })
@@ -231,13 +235,13 @@ func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
 // Compare returns nil if Get(loc) would return the same content as
 // expect. It is functionally equivalent to Get() followed by
 // bytes.Compare(), but uses less memory.
-func (v *UnixVolume) Compare(loc string, expect []byte) error {
+func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error {
        path := v.blockPath(loc)
        if _, err := v.stat(path); err != nil {
                return v.translateError(err)
        }
-       return v.getFunc(path, func(rdr io.Reader) error {
-               return compareReaderWithBuf(rdr, expect, loc[:32])
+       return v.getFunc(ctx, path, func(rdr io.Reader) error {
+               return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
        })
 }
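
[Editor's note] compareReaderWithBuf, whose new ctx parameter appears above, streams the stored file past the expected bytes instead of buffering a second full copy of the block; that is what the doc comment means by "uses less memory". A simplified, hypothetical stand-in for such a context-aware streaming compare (the real helper also re-hashes the data against the MD5 locator in loc[:32], and a size check as in Get would catch oversize files; both are omitted here):

    func compareStream(ctx context.Context, rdr io.Reader, expect []byte) error {
            chunk := make([]byte, 1<<20) // compare 1 MiB at a time
            for len(expect) > 0 {
                    if err := ctx.Err(); err != nil {
                            return err // caller canceled or timed out mid-read
                    }
                    n, err := rdr.Read(chunk)
                    if n > len(expect) || !bytes.Equal(chunk[:n], expect[:n]) {
                            return errors.New("content differs from expect")
                    }
                    expect = expect[n:]
                    if err == io.EOF {
                            break
                    } else if err != nil {
                            return err
                    }
            }
            if len(expect) > 0 {
                    return io.ErrUnexpectedEOF // stored block shorter than expect
            }
            return nil
    }
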
 
@@ -245,7 +249,7 @@ func (v *UnixVolume) Compare(loc string, expect []byte) error {
 // "loc".  It returns nil on success.  If the volume is full, it
 // returns a FullError.  If the write fails due to some other error,
 // that error is returned.
-func (v *UnixVolume) Put(loc string, block []byte) error {
+func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
        if v.ReadOnly {
                return MethodDisabledError
        }
@@ -270,6 +274,11 @@ func (v *UnixVolume) Put(loc string, block []byte) error {
                v.locker.Lock()
                defer v.locker.Unlock()
        }
+       select {
+       case <-ctx.Done():
+               return ctx.Err()
+       default:
+       }
        if _, err := tmpfile.Write(block); err != nil {
                log.Printf("%s: writing to %s: %s\n", v, bpath, err)
                tmpfile.Close()
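
[Editor's note] The select with an empty default added to Put above is a non-blocking poll of the context: it never waits, it only returns early if cancellation has already happened. It is interchangeable with the direct form used in getFunc earlier in this file:

    if err := ctx.Err(); err != nil {
            return err
    }

Either spelling works; select/default is the older idiom, ctx.Err() the more compact one.
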
index 887247d3c3956e9475edf8437c913b3e1fc922c9..3021d6bd362724e7136d1054095e49bb53778199 100644 (file)
@@ -2,6 +2,7 @@ package main
 
 import (
        "bytes"
+       "context"
        "errors"
        "fmt"
        "io"
@@ -45,7 +46,7 @@ func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
                v.ReadOnly = orig
        }(v.ReadOnly)
        v.ReadOnly = false
-       err := v.Put(locator, data)
+       err := v.Put(context.Background(), locator, data)
        if err != nil {
                v.t.Fatal(err)
        }
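
[Editor's note] PutRaw's deferred closure above evaluates v.ReadOnly at defer time and restores it on the way out, even when v.t.Fatal aborts the test (t.Fatal runs deferred calls via runtime.Goexit). The same save/restore idiom in isolation (a sketch with a hypothetical wrapper name):

    func withWritable(v *UnixVolume, fn func()) {
            defer func(orig bool) { v.ReadOnly = orig }(v.ReadOnly) // capture now, restore later
            v.ReadOnly = false
            fn()
    }
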
@@ -117,10 +118,10 @@ func TestReplicationDefault1(t *testing.T) {
 func TestGetNotFound(t *testing.T) {
        v := NewTestableUnixVolume(t, false, false)
        defer v.Teardown()
-       v.Put(TestHash, TestBlock)
+       v.Put(context.Background(), TestHash, TestBlock)
 
        buf := make([]byte, BlockSize)
-       n, err := v.Get(TestHash2, buf)
+       n, err := v.Get(context.Background(), TestHash2, buf)
        switch {
        case os.IsNotExist(err):
                break
@@ -135,7 +136,7 @@ func TestPut(t *testing.T) {
        v := NewTestableUnixVolume(t, false, false)
        defer v.Teardown()
 
-       err := v.Put(TestHash, TestBlock)
+       err := v.Put(context.Background(), TestHash, TestBlock)
        if err != nil {
                t.Error(err)
        }
@@ -153,7 +154,7 @@ func TestPutBadVolume(t *testing.T) {
        defer v.Teardown()
 
        os.Chmod(v.Root, 000)
-       err := v.Put(TestHash, TestBlock)
+       err := v.Put(context.Background(), TestHash, TestBlock)
        if err == nil {
                t.Error("Write should have failed")
        }
@@ -166,12 +167,12 @@ func TestUnixVolumeReadonly(t *testing.T) {
        v.PutRaw(TestHash, TestBlock)
 
        buf := make([]byte, BlockSize)
-       _, err := v.Get(TestHash, buf)
+       _, err := v.Get(context.Background(), TestHash, buf)
        if err != nil {
                t.Errorf("got err %v, expected nil", err)
        }
 
-       err = v.Put(TestHash, TestBlock)
+       err = v.Put(context.Background(), TestHash, TestBlock)
        if err != MethodDisabledError {
                t.Errorf("got err %v, expected MethodDisabledError", err)
        }
@@ -231,9 +232,9 @@ func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
        v := NewTestableUnixVolume(t, false, false)
        defer v.Teardown()
 
-       v.Put(TestHash, TestBlock)
+       v.Put(context.Background(), TestHash, TestBlock)
        mockErr := errors.New("Mock error")
-       err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+       err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
                return mockErr
        })
        if err != mockErr {
@@ -246,7 +247,7 @@ func TestUnixVolumeGetFuncFileError(t *testing.T) {
        defer v.Teardown()
 
        funcCalled := false
-       err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+       err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
                funcCalled = true
                return nil
        })
@@ -262,13 +263,13 @@ func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
        v := NewTestableUnixVolume(t, false, false)
        defer v.Teardown()
 
-       v.Put(TestHash, TestBlock)
+       v.Put(context.Background(), TestHash, TestBlock)
 
        mtx := NewMockMutex()
        v.locker = mtx
 
        funcCalled := make(chan struct{})
-       go v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+       go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
                funcCalled <- struct{}{}
                return nil
        })
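
[Editor's note] TestUnixVolumeGetFuncWorkerWaitsOnMutex launches getFunc in a goroutine and uses the funcCalled channel to observe when the callback actually runs. A hedged sketch of how such a test can assert the callback stays blocked until the mock mutex is released (the AllowLock gate is an assumption about MockMutex's shape; the real test may drive it differently):

    select {
    case <-funcCalled:
            t.Fatal("worker function ran before the mutex was acquired")
    case <-time.After(10 * time.Millisecond):
            // still blocked on the locker, as expected
    }
    mtx.AllowLock <- struct{}{} // hypothetical gate letting Lock() proceed
    <-funcCalled                // now the callback runs
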
@@ -297,26 +298,26 @@ func TestUnixVolumeCompare(t *testing.T) {
        v := NewTestableUnixVolume(t, false, false)
        defer v.Teardown()
 
-       v.Put(TestHash, TestBlock)
-       err := v.Compare(TestHash, TestBlock)
+       v.Put(context.Background(), TestHash, TestBlock)
+       err := v.Compare(context.Background(), TestHash, TestBlock)
        if err != nil {
                t.Errorf("Got err %q, expected nil", err)
        }
 
-       err = v.Compare(TestHash, []byte("baddata"))
+       err = v.Compare(context.Background(), TestHash, []byte("baddata"))
        if err != CollisionError {
                t.Errorf("Got err %q, expected %q", err, CollisionError)
        }
 
-       v.Put(TestHash, []byte("baddata"))
-       err = v.Compare(TestHash, TestBlock)
+       v.Put(context.Background(), TestHash, []byte("baddata"))
+       err = v.Compare(context.Background(), TestHash, TestBlock)
        if err != DiskHashError {
                t.Errorf("Got err %q, expected %q", err, DiskHashError)
        }
 
        p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
        os.Chmod(p, 000)
-       err = v.Compare(TestHash, TestBlock)
+       err = v.Compare(context.Background(), TestHash, TestBlock)
        if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
                t.Errorf("Got err %q, expected %q", err, "permission denied")
        }
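
[Editor's note] A small aside on the final assertion above: strings.Index(s, sub) < 0 is exactly !strings.Contains(s, sub), so the check could be written more idiomatically as:

    if err == nil || !strings.Contains(err.Error(), "permission denied") {
            t.Errorf("Got err %q, expected %q", err, "permission denied")
    }
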
index d74b6d4c079b42f97b2242966b00aa0c086c31cf..474913170a624a618ab82bad57dd74c4ca790cf3 100755 (executable)
@@ -267,13 +267,14 @@ build() {
         echo "Could not find Dockerfile (expected it at $ARVBOX_DOCKER/Dockerfile.base)"
         exit 1
     fi
-    GITHEAD=$(cd $ARVBOX_DOCKER && git log --format=%H -n1 HEAD)
-    docker build --build-arg=arvados_version=$GITHEAD $NO_CACHE -t arvados/arvbox-base:$GITHEAD -f "$ARVBOX_DOCKER/Dockerfile.base" "$ARVBOX_DOCKER"
     if docker --version |grep " 1\.[0-9]\." ; then
        # Docker versions prior to 1.10 require the -f flag;
        # the -f flag was removed in Docker 1.12
         FORCE=-f
     fi
+    GITHEAD=$(cd $ARVBOX_DOCKER && git log --format=%H -n1 HEAD)
+    docker build --build-arg=arvados_version=$GITHEAD $NO_CACHE -t arvados/arvbox-base:$GITHEAD -f "$ARVBOX_DOCKER/Dockerfile.base" "$ARVBOX_DOCKER"
+    docker tag $FORCE arvados/arvbox-base:$GITHEAD arvados/arvbox-base:latest
     if test "$1" = localdemo -o "$1" = publicdemo ; then
         docker build $NO_CACHE -t arvados/arvbox-demo:$GITHEAD -f "$ARVBOX_DOCKER/Dockerfile.demo" "$ARVBOX_DOCKER"
         docker tag $FORCE arvados/arvbox-demo:$GITHEAD arvados/arvbox-demo:latest
index 9efe1164c3c44a7b24d1c9b14316b34eeccc883d..0967c7950cfd43ee5dae538b7863c317299c112b 100755 (executable)
@@ -24,6 +24,7 @@ export CRUNCH_TMP=/tmp/$1
 export CRUNCH_DISPATCH_LOCKFILE=/var/lock/$1-dispatch
 export CRUNCH_JOB_DOCKER_BIN=docker
 export HOME=/tmp/$1
+export CRUNCH_JOB_DOCKER_RUN_ARGS=--net=host
 
 cd /usr/src/arvados/services/api
 if test "$1" = "crunch0" ; then
index 9dc8f9425a8e4707bc4538842911511928428095..6d791bf9876a5b84a2b1b642025b730771f76da2 100644 (file)
@@ -47,7 +47,7 @@ func main() {
        if err != nil {
                log.Fatal(err)
        }
-       kc, err := keepclient.MakeKeepClient(&arv)
+       kc, err := keepclient.MakeKeepClient(arv)
        if err != nil {
                log.Fatal(err)
        }
@@ -56,11 +56,11 @@ func main() {
 
        overrideServices(kc)
 
-       nextBuf := make(chan []byte, *WriteThreads)
        nextLocator := make(chan string, *ReadThreads+*WriteThreads)
 
        go countBeans(nextLocator)
        for i := 0; i < *WriteThreads; i++ {
+               nextBuf := make(chan []byte, 1)
                go makeBufs(nextBuf, i)
                go doWrites(kc, nextBuf, nextLocator)
        }
@@ -106,23 +106,28 @@ func countBeans(nextLocator chan string) {
        }
 }
 
-func makeBufs(nextBuf chan []byte, threadID int) {
+func makeBufs(nextBuf chan<- []byte, threadID int) {
        buf := make([]byte, *BlockSize)
        if *VaryThread {
                binary.PutVarint(buf, int64(threadID))
        }
+       randSize := 524288
+       if randSize > *BlockSize {
+               randSize = *BlockSize
+       }
        for {
                if *VaryRequest {
-                       buf = make([]byte, *BlockSize)
-                       if _, err := io.ReadFull(rand.Reader, buf); err != nil {
+                       rnd := make([]byte, randSize)
+                       if _, err := io.ReadFull(rand.Reader, rnd); err != nil {
                                log.Fatal(err)
                        }
+                       buf = append(rnd, buf[randSize:]...)
                }
                nextBuf <- buf
        }
 }
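
[Editor's note] Two things change in the rewritten makeBufs: each write thread now gets its own single-slot nextBuf channel (see the main() hunk above), and only the first 512 KiB of each block is re-randomized, with the tail reused from the previous buffer, which varies the block (and hence its hash) at a fraction of the cost of filling the whole block from crypto/rand. Because rnd is allocated with no spare capacity, append must copy into a fresh backing array, so each buffer handed to doWrites is independent of later iterations. A tiny demonstration of that allocation behavior (my sketch, not project code):

    package main

    import "fmt"

    func main() {
            rnd := make([]byte, 4) // len == cap == 4: no room to grow in place
            old := []byte{1, 2, 3, 4, 5, 6, 7, 8}
            buf := append(rnd, old[4:]...)  // len 8 > cap 4, so append reallocates
            fmt.Println(&buf[0] != &rnd[0]) // true: buf has its own backing array
    }
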
 
-func doWrites(kc *keepclient.KeepClient, nextBuf chan []byte, nextLocator chan string) {
+func doWrites(kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan<- string) {
        for buf := range nextBuf {
                locator, _, err := kc.PutB(buf)
                if err != nil {
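
[Editor's note] The signature changes to makeBufs and doWrites above (and doReads just below) add channel directions: chan<- is send-only, <-chan is receive-only. They change nothing at runtime but document each goroutine's role and let the compiler reject misuse:

    func consume(c <-chan string) {
            // c <- "x" // compile error: cannot send to receive-only channel
            for s := range c {
                    _ = s // receive-only use is fine
            }
    }
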
@@ -139,7 +144,7 @@ func doWrites(kc *keepclient.KeepClient, nextBuf chan []byte, nextLocator chan s
        }
 }
 
-func doReads(kc *keepclient.KeepClient, nextLocator chan string) {
+func doReads(kc *keepclient.KeepClient, nextLocator <-chan string) {
        for locator := range nextLocator {
                rdr, size, url, err := kc.Get(locator)
                if err != nil {