closes #8464
authorradhika <radhika@curoverse.com>
Tue, 10 May 2016 17:23:28 +0000 (13:23 -0400)
committerradhika <radhika@curoverse.com>
Tue, 10 May 2016 17:23:28 +0000 (13:23 -0400)
Merge branch '8464-crunch2-stdout'

25 files changed:
apps/workbench/Gemfile.lock
build/run-tests.sh
services/api/Gemfile
services/api/Gemfile.lock
services/api/test/integration/collections_performance_test.rb
services/api/test/integration/database_reset_test.rb
services/api/test/integration/websocket_test.rb
services/api/test/test_helper.rb
services/api/test/unit/collection_performance_test.rb
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/handlers_with_generic_volume_test.go
services/keepstore/keepstore.go
services/keepstore/keepstore_test.go
services/keepstore/logging_router.go
services/keepstore/logging_router_test.go [new file with mode: 0644]
services/keepstore/s3_volume.go
services/keepstore/trash_worker_test.go
services/keepstore/volume.go
services/keepstore/volume_generic_test.go
services/keepstore/volume_test.go
services/keepstore/volume_unix.go
services/keepstore/volume_unix_test.go

index b4e2400beda11a9186dd5f4c04468638f237a517..fdcd375ed2081b0d1a770900c3d0dde5ccae67d2 100644 (file)
@@ -37,16 +37,17 @@ GEM
       minitest (~> 5.1)
       thread_safe (~> 0.1)
       tzinfo (~> 1.1)
-    addressable (2.3.6)
+    addressable (2.4.0)
     andand (1.3.3)
     angularjs-rails (1.3.8)
     arel (5.0.1.20140414130214)
-    arvados (0.1.20150511150219)
-      activesupport (>= 3.2.13)
+    arvados (0.1.20160420143004)
+      activesupport (>= 3, < 4.2.6)
       andand (~> 1.3, >= 1.3.3)
-      google-api-client (~> 0.6.3, >= 0.6.3)
+      google-api-client (>= 0.7, < 0.9)
+      i18n (~> 0)
       json (~> 1.7, >= 1.7.7)
-      jwt (>= 0.1.5, < 1.0.0)
+      jwt (>= 0.1.5, < 2)
     autoparse (0.3.3)
       addressable (>= 2.3.1)
       extlib (>= 0.9.15)
@@ -93,24 +94,33 @@ GEM
     erubis (2.7.0)
     execjs (2.2.2)
     extlib (0.9.16)
-    faraday (0.8.9)
-      multipart-post (~> 1.2.0)
+    faraday (0.9.2)
+      multipart-post (>= 1.2, < 3)
     fast_stack (0.1.0)
       rake
       rake-compiler
     ffi (1.9.10)
     flamegraph (0.1.0)
       fast_stack
-    google-api-client (0.6.4)
-      addressable (>= 2.3.2)
-      autoparse (>= 0.3.3)
-      extlib (>= 0.9.15)
-      faraday (~> 0.8.4)
-      jwt (>= 0.1.5)
-      launchy (>= 2.1.1)
-      multi_json (>= 1.0.0)
-      signet (~> 0.4.5)
-      uuidtools (>= 2.1.0)
+    google-api-client (0.8.6)
+      activesupport (>= 3.2)
+      addressable (~> 2.3)
+      autoparse (~> 0.3)
+      extlib (~> 0.9)
+      faraday (~> 0.9)
+      googleauth (~> 0.3)
+      launchy (~> 2.4)
+      multi_json (~> 1.10)
+      retriable (~> 1.4)
+      signet (~> 0.6)
+    googleauth (0.5.1)
+      faraday (~> 0.9)
+      jwt (~> 1.4)
+      logging (~> 2.0)
+      memoist (~> 0.12)
+      multi_json (~> 1.11)
+      os (~> 0.9)
+      signet (~> 0.7)
     headless (1.0.2)
     highline (1.6.21)
     httpclient (2.6.0.1)
@@ -119,8 +129,7 @@ GEM
       railties (>= 3.0, < 5.0)
       thor (>= 0.14, < 2.0)
     json (1.8.3)
-    jwt (0.1.13)
-      multi_json (>= 1.5)
+    jwt (1.5.4)
     launchy (2.4.3)
       addressable (~> 2.3)
     less (2.6.0)
@@ -129,18 +138,23 @@ GEM
       actionpack (>= 3.1)
       less (~> 2.6.0)
     libv8 (3.16.14.7)
+    little-plugger (1.1.4)
+    logging (2.1.0)
+      little-plugger (~> 1.1)
+      multi_json (~> 1.10)
     mail (2.6.3)
       mime-types (>= 1.16, < 3)
+    memoist (0.14.0)
     metaclass (0.0.4)
     mime-types (2.99)
     mini_portile (0.6.2)
-    minitest (5.7.0)
+    minitest (5.8.4)
     mocha (1.1.0)
       metaclass (~> 0.0.1)
     morrisjs-rails (0.5.1)
       railties (> 3.1, < 5)
-    multi_json (1.11.2)
-    multipart-post (1.2.0)
+    multi_json (1.12.0)
+    multipart-post (2.0.0)
     net-scp (1.2.1)
       net-ssh (>= 2.6.5)
     net-sftp (2.1.2)
@@ -151,6 +165,7 @@ GEM
     nokogiri (1.6.6.4)
       mini_portile (~> 0.6.0)
     oj (2.11.2)
+    os (0.9.6)
     passenger (4.0.57)
       daemon_controller (>= 1.2.0)
       rack
@@ -190,6 +205,7 @@ GEM
       rake
     raphael-rails (2.1.2)
     ref (1.0.5)
+    retriable (1.4.1)
     ruby-debug-passenger (0.2.0)
     ruby-prof (0.15.2)
     rubyzip (1.1.7)
@@ -207,11 +223,11 @@ GEM
       multi_json (~> 1.0)
       rubyzip (~> 1.0)
       websocket (~> 1.0)
-    signet (0.4.5)
-      addressable (>= 2.2.3)
-      faraday (~> 0.8.1)
-      jwt (>= 0.1.5)
-      multi_json (>= 1.0.0)
+    signet (0.7.2)
+      addressable (~> 2.3)
+      faraday (~> 0.9)
+      jwt (~> 1.5)
+      multi_json (~> 1.10)
     simplecov (0.9.1)
       docile (~> 1.1.0)
       multi_json (~> 1.0)
@@ -238,7 +254,6 @@ GEM
     uglifier (2.7.0)
       execjs (>= 0.3.0)
       json (>= 1.8.0)
-    uuidtools (2.1.5)
     websocket (1.2.2)
     websocket-driver (0.5.1)
       websocket-extensions (>= 0.1.0)
@@ -294,3 +309,6 @@ DEPENDENCIES
   therubyracer
   uglifier (>= 1.0.3)
   wiselinks
+
+BUNDLED WITH
+   1.12.1
index c94f831a36a5e41d69d524c6eb723ade34ca6549..d22934199f651840a65ed03255b6e4d03332d181 100755 (executable)
@@ -494,22 +494,23 @@ do_test_once() {
             # before trying "go test". Otherwise, coverage-reporting
             # mode makes Go show the wrong line numbers when reporting
             # compilation errors.
+            go get -t "git.curoverse.com/arvados.git/$1" || return 1
             if [[ -n "${testargs[$1]}" ]]
             then
                 # "go test -check.vv giturl" doesn't work, but this
                 # does:
-                cd "$WORKSPACE/$1" && \
-                    go get -t "git.curoverse.com/arvados.git/$1" && \
-                    go test ${short:+-short} ${coverflags[@]} ${testargs[$1]}
+                cd "$WORKSPACE/$1" && go test ${short:+-short} ${coverflags[@]} ${testargs[$1]}
             else
                 # The above form gets verbose even when testargs is
                 # empty, so use this form in such cases:
-                go get -t "git.curoverse.com/arvados.git/$1" && \
-                    go test ${short:+-short} ${coverflags[@]} "git.curoverse.com/arvados.git/$1"
+                go test ${short:+-short} ${coverflags[@]} "git.curoverse.com/arvados.git/$1"
             fi
             result="$?"
-            go tool cover -html="$WORKSPACE/tmp/.$covername.tmp" -o "$WORKSPACE/tmp/$covername.html"
-            rm "$WORKSPACE/tmp/.$covername.tmp"
+            if [[ -f "$WORKSPACE/tmp/.$covername.tmp" ]]
+            then
+                go tool cover -html="$WORKSPACE/tmp/.$covername.tmp" -o "$WORKSPACE/tmp/$covername.html"
+                rm "$WORKSPACE/tmp/.$covername.tmp"
+            fi
         elif [[ "$2" == "pip" ]]
         then
             # $3 can name a path directory for us to use, including trailing
@@ -753,7 +754,7 @@ stop_services
 test_apiserver() {
     rm -f "$WORKSPACE/services/api/git-commit.version"
     cd "$WORKSPACE/services/api" \
-        && RAILS_ENV=test bundle exec rake test TESTOPTS=-v ${testargs[services/api]}
+        && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test TESTOPTS=-v ${testargs[services/api]}
 }
 do_test services/api apiserver
 
@@ -799,21 +800,21 @@ done
 test_workbench() {
     start_nginx_proxy_services \
         && cd "$WORKSPACE/apps/workbench" \
-        && RAILS_ENV=test bundle exec rake test TESTOPTS=-v ${testargs[apps/workbench]}
+        && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test TESTOPTS=-v ${testargs[apps/workbench]}
 }
 do_test apps/workbench workbench
 
 test_workbench_benchmark() {
     start_nginx_proxy_services \
         && cd "$WORKSPACE/apps/workbench" \
-        && RAILS_ENV=test bundle exec rake test:benchmark ${testargs[apps/workbench_benchmark]}
+        && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:benchmark ${testargs[apps/workbench_benchmark]}
 }
 do_test apps/workbench_benchmark workbench_benchmark
 
 test_workbench_profile() {
     start_nginx_proxy_services \
         && cd "$WORKSPACE/apps/workbench" \
-        && RAILS_ENV=test bundle exec rake test:profile ${testargs[apps/workbench_profile]}
+        && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:profile ${testargs[apps/workbench_profile]}
 }
 do_test apps/workbench_profile workbench_profile
 
index 48998aad36d10f7630f24e574a4d278968dc7032..1e25467e7485ed05337e2b212f4cf4fc4bc44d1e 100644 (file)
@@ -67,7 +67,6 @@ gem 'andand'
 
 gem 'test_after_commit', :group => :test
 
-gem 'google-api-client', '~> 0.6.3'
 gem 'trollop'
 gem 'faye-websocket'
 
index ac6be5a522303fd7418ba69974165e3b2821e17f..7be4e0f39d5df3253944f3884008980b9076d58a 100644 (file)
@@ -32,23 +32,23 @@ GEM
       activemodel (>= 3.0.0)
       activesupport (>= 3.0.0)
       rack (>= 1.1.0)
-    addressable (2.3.8)
+    addressable (2.4.0)
     andand (1.3.3)
     arel (3.0.3)
-    arvados (0.1.20150615153458)
-      activesupport (>= 3.2.13)
+    arvados (0.1.20160420143004)
+      activesupport (>= 3, < 4.2.6)
       andand (~> 1.3, >= 1.3.3)
-      google-api-client (~> 0.6.3, >= 0.6.3)
+      google-api-client (>= 0.7, < 0.9)
+      i18n (~> 0)
       json (~> 1.7, >= 1.7.7)
-      jwt (>= 0.1.5, < 1.0.0)
-    arvados-cli (0.1.20151207150126)
+      jwt (>= 0.1.5, < 2)
+    arvados-cli (0.1.20160503204200)
       activesupport (~> 3.2, >= 3.2.13)
       andand (~> 1.3, >= 1.3.3)
       arvados (~> 0.1, >= 0.1.20150128223554)
       curb (~> 0.8)
-      google-api-client (~> 0.6.3, >= 0.6.3)
+      google-api-client (~> 0.6, >= 0.6.3, < 0.9)
       json (~> 1.7, >= 1.7.7)
-      jwt (>= 0.1.5, < 1.0.0)
       oj (~> 2.0, >= 2.0.3)
       trollop (~> 2.0)
     autoparse (0.3.3)
@@ -69,7 +69,7 @@ GEM
       coffee-script-source
       execjs
     coffee-script-source (1.7.0)
-    curb (0.8.8)
+    curb (0.9.3)
     daemon_controller (1.2.0)
     database_cleaner (1.2.0)
     erubis (2.7.0)
@@ -81,20 +81,21 @@ GEM
     factory_girl_rails (4.4.1)
       factory_girl (~> 4.4.0)
       railties (>= 3.0.0)
-    faraday (0.8.9)
-      multipart-post (~> 1.2.0)
+    faraday (0.9.2)
+      multipart-post (>= 1.2, < 3)
     faye-websocket (0.7.2)
       eventmachine (>= 0.12.0)
       websocket-driver (>= 0.3.1)
-    google-api-client (0.6.4)
+    google-api-client (0.7.1)
       addressable (>= 2.3.2)
       autoparse (>= 0.3.3)
       extlib (>= 0.9.15)
-      faraday (~> 0.8.4)
+      faraday (>= 0.9.0)
       jwt (>= 0.1.5)
       launchy (>= 2.1.1)
       multi_json (>= 1.0.0)
-      signet (~> 0.4.5)
+      retriable (>= 1.4)
+      signet (>= 0.5.0)
       uuidtools (>= 2.1.0)
     hashie (1.2.0)
     highline (1.6.21)
@@ -118,8 +119,8 @@ GEM
     mime-types (1.25.1)
     mocha (1.1.0)
       metaclass (~> 0.0.1)
-    multi_json (1.11.1)
-    multipart-post (1.2.0)
+    multi_json (1.12.0)
+    multipart-post (2.0.0)
     net-scp (1.2.0)
       net-ssh (>= 2.6.5)
     net-sftp (2.1.2)
@@ -133,7 +134,7 @@ GEM
       jwt (~> 0.1.4)
       multi_json (~> 1.0)
       rack (~> 1.2)
-    oj (2.11.4)
+    oj (2.15.0)
     omniauth (1.1.1)
       hashie (~> 1.2)
       rack
@@ -177,6 +178,7 @@ GEM
     rdoc (3.12.2)
       json (~> 1.4)
     ref (1.0.5)
+    retriable (2.1.0)
     ruby-prof (0.15.2)
     rvm-capistrano (1.5.1)
       capistrano (~> 2.15.4)
@@ -185,9 +187,9 @@ GEM
       railties (~> 3.2.0)
       sass (>= 3.1.10)
       tilt (~> 1.3)
-    signet (0.4.5)
+    signet (0.5.1)
       addressable (>= 2.2.3)
-      faraday (~> 0.8.1)
+      faraday (>= 0.9.0.rc5)
       jwt (>= 0.1.5)
       multi_json (>= 1.0.0)
     simplecov (0.7.1)
@@ -213,7 +215,7 @@ GEM
     treetop (1.4.15)
       polyglot
       polyglot (>= 0.3.1)
-    trollop (2.1.1)
+    trollop (2.1.2)
     tzinfo (0.3.39)
     uglifier (2.5.0)
       execjs (>= 0.3.0)
@@ -233,7 +235,6 @@ DEPENDENCIES
   database_cleaner
   factory_girl_rails
   faye-websocket
-  google-api-client (~> 0.6.3)
   jquery-rails
   mocha
   multi_json
@@ -258,4 +259,4 @@ DEPENDENCIES
   uglifier (>= 1.0.3)
 
 BUNDLED WITH
-   1.10.6
+   1.12.1
index 77a26e5b21af35c401fa8ac301a70db9fdd2a22e..a952c202cb7dbadf73fae734ca0141d000ac5cde 100644 (file)
@@ -6,6 +6,7 @@ class CollectionsApiPerformanceTest < ActionDispatch::IntegrationTest
   include ManifestExamples
 
   test "crud cycle for a collection with a big manifest" do
+    slow_test
     bigmanifest = time_block 'make example' do
       make_manifest(streams: 100,
                     files_per_stream: 100,
@@ -39,16 +40,17 @@ class CollectionsApiPerformanceTest < ActionDispatch::IntegrationTest
   end
 
   test "memory usage" do
-     hugemanifest = make_manifest(streams: 1,
-                                  files_per_stream: 2000,
-                                  blocks_per_file: 200,
-                                  bytes_per_block: 2**26,
-                                  api_token: api_token(:active))
+    slow_test
+    hugemanifest = make_manifest(streams: 1,
+                                 files_per_stream: 2000,
+                                 blocks_per_file: 200,
+                                 bytes_per_block: 2**26,
+                                 api_token: api_token(:active))
     json = time_block "JSON encode #{hugemanifest.length>>20}MiB manifest" do
       Oj.dump({manifest_text: hugemanifest})
     end
-     vmpeak "post" do
-       post '/arvados/v1/collections', {collection: json}, auth(:active)
-     end
+    vmpeak "post" do
+      post '/arvados/v1/collections', {collection: json}, auth(:active)
+    end
   end
 end
index 58f2abf69709d62f63989aca05ee5cb4a26e869c..ecb2f2a05831a44a7798fd98d048a821878fd11a 100644 (file)
@@ -4,6 +4,7 @@ class DatabaseResetTest < ActionDispatch::IntegrationTest
   self.use_transactional_fixtures = false
 
   test "reset fails when Rails.env != 'test'" do
+    slow_test
     rails_env_was = Rails.env
     begin
       Rails.env = 'production'
@@ -22,6 +23,7 @@ class DatabaseResetTest < ActionDispatch::IntegrationTest
   end
 
   test "database reset doesn't break basic CRUD operations" do
+    slow_test
     active_auth = auth(:active)
     admin_auth = auth(:admin)
 
@@ -48,6 +50,7 @@ class DatabaseResetTest < ActionDispatch::IntegrationTest
   end
 
   test "roll back database change" do
+    slow_test
     active_auth = auth(:active)
     admin_auth = auth(:admin)
 
index 313a22d8f883202361aa12a3502dcf3f496a0764..d1b8c34a43be5df0736475996b378002e87b6baa 100644 (file)
@@ -323,6 +323,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
   end
 
   test "connect, subscribe, get event, unsubscribe" do
+    slow_test
     state = 1
     spec = nil
     spec_ev_uuid = nil
@@ -372,6 +373,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
   end
 
   test "connect, subscribe, get event, unsubscribe with filter" do
+    slow_test
     state = 1
     spec = nil
     spec_ev_uuid = nil
@@ -421,6 +423,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
 
 
   test "connect, subscribe, get event, try to unsubscribe with bogus filter" do
+    slow_test
     state = 1
     spec = nil
     spec_ev_uuid = nil
@@ -473,6 +476,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
 
 
   test "connected, not subscribed, no event" do
+    slow_test
     authorize_with :admin
 
     ws_helper :admin, false do |ws|
@@ -493,6 +497,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
   end
 
   test "connected, not authorized to see event" do
+    slow_test
     state = 1
 
     authorize_with :admin
@@ -608,6 +613,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
   end
 
   test "connect, subscribe, lots of events" do
+    slow_test
     state = 1
     event_count = 0
     log_start = Log.order(:id).last.id
index 881a0807720f8fdacb1c095a9b12e7e5d8dbefb5..25ab286a23099f4125aaf1cce65a24450804e051 100644 (file)
@@ -106,6 +106,10 @@ class ActiveSupport::TestCase
     ArvadosApiToken.new.call("rack.input" => "",
                              "HTTP_AUTHORIZATION" => "OAuth2 #{t}")
   end
+
+  def slow_test
+    skip "RAILS_TEST_SHORT is set" unless (ENV['RAILS_TEST_SHORT'] || '').empty?
+  end
 end
 
 class ActionController::TestCase
index 37da5fddde9da8a0d0e34f7d292394c3ffe4cf87..1c6e4f2db2c0dfafcde3d7fba519fe8e4431cf6b 100644 (file)
@@ -18,6 +18,7 @@ class CollectionModelPerformanceTest < ActiveSupport::TestCase
 
   # "crrud" == "create read render update delete", not a typo
   test "crrud cycle for a collection with a big manifest)" do
+    slow_test
     bigmanifest = time_block 'make example' do
       make_manifest(streams: 100,
                     files_per_stream: 100,
index f08cebff63c65dc5cbbd941407602c262779708a..9c0374351b075d03e2bd0469c4b813c373308e87 100644 (file)
@@ -139,11 +139,11 @@ func (v *AzureBlobVolume) Check() error {
 // If the block is younger than azureWriteRaceInterval and is
 // unexpectedly empty, assume a PutBlob operation is in progress, and
 // wait for it to finish writing.
-func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
+func (v *AzureBlobVolume) Get(loc string, buf []byte) (int, error) {
        var deadline time.Time
        haveDeadline := false
-       buf, err := v.get(loc)
-       for err == nil && len(buf) == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
+       size, err := v.get(loc, buf)
+       for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
                // Seeing a brand new empty block probably means we're
                // in a race with CreateBlob, which under the hood
                // (apparently) does "CreateEmpty" and "CommitData"
@@ -163,34 +163,32 @@ func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
                } else if time.Now().After(deadline) {
                        break
                }
-               bufs.Put(buf)
                time.Sleep(azureWriteRacePollTime)
-               buf, err = v.get(loc)
+               size, err = v.get(loc, buf)
        }
        if haveDeadline {
-               log.Printf("Race ended with len(buf)==%d", len(buf))
+               log.Printf("Race ended with size==%d", size)
        }
-       return buf, err
+       return size, err
 }
 
-func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
-       expectSize := BlockSize
+func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
+       expectSize := len(buf)
        if azureMaxGetBytes < BlockSize {
                // Unfortunately the handler doesn't tell us how long the blob
                // is expected to be, so we have to ask Azure.
                props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
                if err != nil {
-                       return nil, v.translateError(err)
+                       return 0, v.translateError(err)
                }
                if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
-                       return nil, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
+                       return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
                }
                expectSize = int(props.ContentLength)
        }
 
-       buf := bufs.Get(expectSize)
        if expectSize == 0 {
-               return buf, nil
+               return 0, nil
        }
 
        // We'll update this actualSize if/when we get the last piece.
@@ -212,7 +210,7 @@ func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
                        if startPos == 0 && endPos == expectSize {
                                rdr, err = v.bsClient.GetBlob(v.containerName, loc)
                        } else {
-                               rdr, err = v.bsClient.GetBlobRange(v.containerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1))
+                               rdr, err = v.bsClient.GetBlobRange(v.containerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
                        }
                        if err != nil {
                                errors[p] = err
@@ -235,11 +233,10 @@ func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
        wg.Wait()
        for _, err := range errors {
                if err != nil {
-                       bufs.Put(buf)
-                       return nil, v.translateError(err)
+                       return 0, v.translateError(err)
                }
        }
-       return buf[:actualSize], nil
+       return actualSize, nil
 }
 
 // Compare the given data with existing stored data.
index 439b40221465ada53c805c7b7afb47ba974652a9..e3c0e27083245f2d7cbead33f915f9365774cc02 100644 (file)
@@ -425,13 +425,12 @@ func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
                if err != nil {
                        t.Error(err)
                }
-               gotData, err := v.Get(hash)
+               gotData := make([]byte, len(data))
+               gotLen, err := v.Get(hash, gotData)
                if err != nil {
                        t.Error(err)
                }
                gotHash := fmt.Sprintf("%x", md5.Sum(gotData))
-               gotLen := len(gotData)
-               bufs.Put(gotData)
                if gotLen != size {
                        t.Error("length mismatch: got %d != %d", gotLen, size)
                }
@@ -477,11 +476,10 @@ func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
        // Wait for the stub's Put to create the empty blob
        v.azHandler.race <- continuePut
        go func() {
-               buf, err := v.Get(TestHash)
+               buf := make([]byte, len(TestBlock))
+               _, err := v.Get(TestHash, buf)
                if err != nil {
                        t.Error(err)
-               } else {
-                       bufs.Put(buf)
                }
                close(allDone)
        }()
@@ -521,15 +519,15 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
        allDone := make(chan struct{})
        go func() {
                defer close(allDone)
-               buf, err := v.Get(TestHash)
+               buf := make([]byte, BlockSize)
+               n, err := v.Get(TestHash, buf)
                if err != nil {
                        t.Error(err)
                        return
                }
-               if len(buf) != 0 {
-                       t.Errorf("Got %+q, expected empty buf", buf)
+               if n != 0 {
+                       t.Errorf("Got %+q, expected empty buf", buf[:n])
                }
-               bufs.Put(buf)
        }()
        select {
        case <-allDone:
index 33d585ae1e69f18868f7eb57278637c7f855761b..7c17424ba568227790469e4e32867f33fea8ff4e 100644 (file)
@@ -561,7 +561,8 @@ func TestDeleteHandler(t *testing.T) {
                        expectedDc, responseDc)
        }
        // Confirm the block has been deleted
-       _, err := vols[0].Get(TestHash)
+       buf := make([]byte, BlockSize)
+       _, err := vols[0].Get(TestHash, buf)
        var blockDeleted = os.IsNotExist(err)
        if !blockDeleted {
                t.Error("superuserExistingBlockReq: block not deleted")
@@ -585,7 +586,7 @@ func TestDeleteHandler(t *testing.T) {
                        expectedDc, responseDc)
        }
        // Confirm the block has NOT been deleted.
-       _, err = vols[0].Get(TestHash)
+       _, err = vols[0].Get(TestHash, buf)
        if err != nil {
                t.Errorf("testing delete on new block: %s\n", err)
        }
@@ -913,6 +914,65 @@ func TestPutHandlerNoBufferleak(t *testing.T) {
        }
 }
 
+type notifyingResponseRecorder struct {
+       *httptest.ResponseRecorder
+       closer chan bool
+}
+
+func (r *notifyingResponseRecorder) CloseNotify() <-chan bool {
+       return r.closer
+}
+
+func TestGetHandlerClientDisconnect(t *testing.T) {
+       defer func(was bool) {
+               enforcePermissions = was
+       }(enforcePermissions)
+       enforcePermissions = false
+
+       defer func(orig *bufferPool) {
+               bufs = orig
+       }(bufs)
+       bufs = newBufferPool(1, BlockSize)
+       defer bufs.Put(bufs.Get(BlockSize))
+
+       KeepVM = MakeTestVolumeManager(2)
+       defer KeepVM.Close()
+
+       if err := KeepVM.AllWritable()[0].Put(TestHash, TestBlock); err != nil {
+               t.Error(err)
+       }
+
+       resp := &notifyingResponseRecorder{
+               ResponseRecorder: httptest.NewRecorder(),
+               closer:           make(chan bool, 1),
+       }
+       if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok {
+               t.Fatal("notifyingResponseRecorder is broken")
+       }
+       // If anyone asks, the client has disconnected.
+       resp.closer <- true
+
+       ok := make(chan struct{})
+       go func() {
+               req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
+               (&LoggingRESTRouter{MakeRESTRouter()}).ServeHTTP(resp, req)
+               ok <- struct{}{}
+       }()
+
+       select {
+       case <-time.After(20 * time.Second):
+               t.Fatal("request took >20s, close notifier must be broken")
+       case <-ok:
+       }
+
+       ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder)
+       for i, v := range KeepVM.AllWritable() {
+               if calls := v.(*MockVolume).called["GET"]; calls != 0 {
+                       t.Errorf("volume %d got %d calls, expected 0", i, calls)
+               }
+       }
+}
+
 // Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
 // leak.
 func TestGetHandlerNoBufferleak(t *testing.T) {
index 043ab69b17c255aa463fe8259a777cec682453f5..f698982415aae5bd7d8a341428acb2d8bdb57317 100644 (file)
@@ -79,18 +79,61 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
                }
        }
 
-       block, err := GetBlock(mux.Vars(req)["hash"])
+       // TODO: Probe volumes to check whether the block _might_
+       // exist. Some volumes/types could support a quick existence
+       // check without causing other operations to suffer. If all
+       // volumes support that, and assure us the block definitely
+       // isn't here, we can return 404 now instead of waiting for a
+       // buffer.
+
+       buf, err := getBufferForResponseWriter(resp, bufs, BlockSize)
        if err != nil {
-               // This type assertion is safe because the only errors
-               // GetBlock can return are DiskHashError or NotFoundError.
-               http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
+               http.Error(resp, err.Error(), http.StatusServiceUnavailable)
                return
        }
-       defer bufs.Put(block)
+       defer bufs.Put(buf)
 
-       resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
+       size, err := GetBlock(mux.Vars(req)["hash"], buf, resp)
+       if err != nil {
+               code := http.StatusInternalServerError
+               if err, ok := err.(*KeepError); ok {
+                       code = err.HTTPCode
+               }
+               http.Error(resp, err.Error(), code)
+               return
+       }
+
+       resp.Header().Set("Content-Length", strconv.Itoa(size))
        resp.Header().Set("Content-Type", "application/octet-stream")
-       resp.Write(block)
+       resp.Write(buf[:size])
+}
+
+// Get a buffer from the pool -- but give up and return a non-nil
+// error if resp implements http.CloseNotifier and tells us that the
+// client has disconnected before we get a buffer.
+func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufSize int) ([]byte, error) {
+       var closeNotifier <-chan bool
+       if resp, ok := resp.(http.CloseNotifier); ok {
+               closeNotifier = resp.CloseNotify()
+       }
+       var buf []byte
+       bufReady := make(chan []byte)
+       go func() {
+               bufReady <- bufs.Get(bufSize)
+               close(bufReady)
+       }()
+       select {
+       case buf = <-bufReady:
+               return buf, nil
+       case <-closeNotifier:
+               go func() {
+                       // Even if closeNotifier happened first, we
+                       // need to keep waiting for our buf so we can
+                       // return it to the pool.
+                       bufs.Put(<-bufReady)
+               }()
+               return nil, ErrClientDisconnect
+       }
 }
 
 // PutBlockHandler is a HandleFunc to address Put block requests.
@@ -116,8 +159,13 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       buf := bufs.Get(int(req.ContentLength))
-       _, err := io.ReadFull(req.Body, buf)
+       buf, err := getBufferForResponseWriter(resp, bufs, int(req.ContentLength))
+       if err != nil {
+               http.Error(resp, err.Error(), http.StatusServiceUnavailable)
+               return
+       }
+
+       _, err = io.ReadFull(req.Body, buf)
        if err != nil {
                http.Error(resp, err.Error(), 500)
                bufs.Put(buf)
@@ -481,7 +529,6 @@ func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
        }
 }
 
-// ==============================
 // GetBlock and PutBlock implement lower-level code for handling
 // blocks by rooting through volumes connected to the local machine.
 // Once the handler has determined that system policy permits the
@@ -492,24 +539,21 @@ func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
 // should be the only part of the code that cares about which volume a
 // block is stored on, so it should be responsible for figuring out
 // which volume to check for fetching blocks, storing blocks, etc.
-// ==============================
 
-// GetBlock fetches and returns the block identified by "hash".
-//
-// On success, GetBlock returns a byte slice with the block data, and
-// a nil error.
+// GetBlock fetches the block identified by "hash" into the provided
+// buf, and returns the data size.
 //
 // If the block cannot be found on any volume, returns NotFoundError.
 //
 // If the block found does not have the correct MD5 hash, returns
 // DiskHashError.
 //
-func GetBlock(hash string) ([]byte, error) {
+func GetBlock(hash string, buf []byte, resp http.ResponseWriter) (int, error) {
        // Attempt to read the requested hash from a keep volume.
        errorToCaller := NotFoundError
 
        for _, vol := range KeepVM.AllReadable() {
-               buf, err := vol.Get(hash)
+               size, err := vol.Get(hash, buf)
                if err != nil {
                        // IsNotExist is an expected error and may be
                        // ignored. All other errors are logged. In
@@ -523,23 +567,22 @@ func GetBlock(hash string) ([]byte, error) {
                }
                // Check the file checksum.
                //
-               filehash := fmt.Sprintf("%x", md5.Sum(buf))
+               filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
                if filehash != hash {
                        // TODO: Try harder to tell a sysadmin about
                        // this.
                        log.Printf("%s: checksum mismatch for request %s (actual %s)",
                                vol, hash, filehash)
                        errorToCaller = DiskHashError
-                       bufs.Put(buf)
                        continue
                }
                if errorToCaller == DiskHashError {
                        log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
                                vol, hash)
                }
-               return buf, nil
+               return size, nil
        }
-       return nil, errorToCaller
+       return 0, errorToCaller
 }
 
 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
index c5349d399c32ebc5692d0a39d4cc8c9c3ad83e1a..dda7edcec3509e683465a93d5eb775bf18f16d19 100644 (file)
@@ -45,12 +45,13 @@ func testGetBlock(t TB, factory TestableVolumeManagerFactory, testHash string, t
        testableVolumes[1].PutRaw(testHash, testBlock)
 
        // Get should pass
-       buf, err := GetBlock(testHash)
+       buf := make([]byte, len(testBlock))
+       n, err := GetBlock(testHash, buf, nil)
        if err != nil {
                t.Fatalf("Error while getting block %s", err)
        }
-       if bytes.Compare(buf, testBlock) != 0 {
-               t.Errorf("Put succeeded but Get returned %+v, expected %+v", buf, testBlock)
+       if bytes.Compare(buf[:n], testBlock) != 0 {
+               t.Errorf("Put succeeded but Get returned %+v, expected %+v", buf[:n], testBlock)
        }
 }
 
@@ -64,9 +65,10 @@ func testPutRawBadDataGetBlock(t TB, factory TestableVolumeManagerFactory,
        testableVolumes[1].PutRaw(testHash, badData)
 
        // Get should fail
-       _, err := GetBlock(testHash)
+       buf := make([]byte, BlockSize)
+       size, err := GetBlock(testHash, buf, nil)
        if err == nil {
-               t.Fatalf("Expected error while getting corrupt block %v", testHash)
+               t.Fatalf("Got %+q, expected error while getting corrupt block %v", buf[:size], testHash)
        }
 }
 
@@ -85,11 +87,12 @@ func testPutBlock(t TB, factory TestableVolumeManagerFactory, testHash string, t
        }
 
        // Check that PutBlock stored the data as expected
-       buf, err := GetBlock(testHash)
+       buf := make([]byte, BlockSize)
+       size, err := GetBlock(testHash, buf, nil)
        if err != nil {
                t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
-       } else if bytes.Compare(buf, testBlock) != 0 {
-               t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf)
+       } else if bytes.Compare(buf[:size], testBlock) != 0 {
+               t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf[:size])
        }
 }
 
@@ -109,10 +112,11 @@ func testPutBlockCorrupt(t TB, factory TestableVolumeManagerFactory,
 
        // Put succeeded and overwrote the badData in one volume,
        // and Get should return the testBlock now, ignoring the bad data.
-       buf, err := GetBlock(testHash)
+       buf := make([]byte, BlockSize)
+       size, err := GetBlock(testHash, buf, nil)
        if err != nil {
                t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
-       } else if bytes.Compare(buf, testBlock) != 0 {
-               t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf)
+       } else if bytes.Compare(buf[:size], testBlock) != 0 {
+               t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf[:size])
        }
 }
index 93ee43c446cf96624a09a0ff7660d198cacdd3cd..80d867010568090592283e14a8bff28a35b4dda4 100644 (file)
@@ -91,6 +91,7 @@ var (
        TooLongError        = &KeepError{413, "Block is too large"}
        MethodDisabledError = &KeepError{405, "Method disabled"}
        ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
+       ErrClientDisconnect = &KeepError{503, "Client disconnected"}
 )
 
 func (e *KeepError) Error() string {
index 2a1c3d243ab922855b2bf6344f69631a78272662..c0adbc0bd74dad7d115dfe70a3374b2815704c56 100644 (file)
@@ -66,12 +66,13 @@ func TestGetBlock(t *testing.T) {
        }
 
        // Check that GetBlock returns success.
-       result, err := GetBlock(TestHash)
+       buf := make([]byte, BlockSize)
+       size, err := GetBlock(TestHash, buf, nil)
        if err != nil {
                t.Errorf("GetBlock error: %s", err)
        }
-       if fmt.Sprint(result) != fmt.Sprint(TestBlock) {
-               t.Errorf("expected %s, got %s", TestBlock, result)
+       if bytes.Compare(buf[:size], TestBlock) != 0 {
+               t.Errorf("got %v, expected %v", buf[:size], TestBlock)
        }
 }
 
@@ -86,9 +87,10 @@ func TestGetBlockMissing(t *testing.T) {
        defer KeepVM.Close()
 
        // Check that GetBlock returns failure.
-       result, err := GetBlock(TestHash)
+       buf := make([]byte, BlockSize)
+       size, err := GetBlock(TestHash, buf, nil)
        if err != NotFoundError {
-               t.Errorf("Expected NotFoundError, got %v", result)
+               t.Errorf("Expected NotFoundError, got %v, err %v", buf[:size], err)
        }
 }
 
@@ -107,9 +109,10 @@ func TestGetBlockCorrupt(t *testing.T) {
        vols[0].Put(TestHash, BadBlock)
 
        // Check that GetBlock returns failure.
-       result, err := GetBlock(TestHash)
+       buf := make([]byte, BlockSize)
+       size, err := GetBlock(TestHash, buf, nil)
        if err != DiskHashError {
-               t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, result)
+               t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, buf[:size])
        }
 }
 
@@ -133,13 +136,14 @@ func TestPutBlockOK(t *testing.T) {
        }
 
        vols := KeepVM.AllReadable()
-       result, err := vols[1].Get(TestHash)
+       buf := make([]byte, BlockSize)
+       n, err := vols[1].Get(TestHash, buf)
        if err != nil {
                t.Fatalf("Volume #0 Get returned error: %v", err)
        }
-       if string(result) != string(TestBlock) {
+       if string(buf[:n]) != string(TestBlock) {
                t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
-                       string(TestBlock), string(result))
+                       string(TestBlock), string(buf[:n]))
        }
 }
 
@@ -162,14 +166,14 @@ func TestPutBlockOneVol(t *testing.T) {
                t.Fatalf("PutBlock: n %d err %v", n, err)
        }
 
-       result, err := GetBlock(TestHash)
+       buf := make([]byte, BlockSize)
+       size, err := GetBlock(TestHash, buf, nil)
        if err != nil {
                t.Fatalf("GetBlock: %v", err)
        }
-       if string(result) != string(TestBlock) {
-               t.Error("PutBlock/GetBlock mismatch")
-               t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
-                       string(TestBlock), string(result))
+       if bytes.Compare(buf[:size], TestBlock) != 0 {
+               t.Fatalf("PutBlock stored %+q, GetBlock retrieved %+q",
+                       TestBlock, buf[:size])
        }
 }
 
@@ -191,7 +195,7 @@ func TestPutBlockMD5Fail(t *testing.T) {
        }
 
        // Confirm that GetBlock fails to return anything.
-       if result, err := GetBlock(TestHash); err != NotFoundError {
+       if result, err := GetBlock(TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
                t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
                        string(result), err)
        }
@@ -216,10 +220,11 @@ func TestPutBlockCorrupt(t *testing.T) {
        }
 
        // The block on disk should now match TestBlock.
-       if block, err := GetBlock(TestHash); err != nil {
+       buf := make([]byte, BlockSize)
+       if size, err := GetBlock(TestHash, buf, nil); err != nil {
                t.Errorf("GetBlock: %v", err)
-       } else if bytes.Compare(block, TestBlock) != 0 {
-               t.Errorf("GetBlock returned: '%s'", string(block))
+       } else if bytes.Compare(buf[:size], TestBlock) != 0 {
+               t.Errorf("Got %+q, expected %+q", buf[:size], TestBlock)
        }
 }
 
@@ -290,12 +295,13 @@ func TestPutBlockTouchFails(t *testing.T) {
                t.Errorf("mtime was changed on vols[0]:\noldMtime = %v\nnewMtime = %v\n",
                        oldMtime, newMtime)
        }
-       result, err := vols[1].Get(TestHash)
+       buf := make([]byte, BlockSize)
+       n, err := vols[1].Get(TestHash, buf)
        if err != nil {
                t.Fatalf("vols[1]: %v", err)
        }
-       if bytes.Compare(result, TestBlock) != 0 {
-               t.Errorf("new block does not match test block\nnew block = %v\n", result)
+       if bytes.Compare(buf[:n], TestBlock) != 0 {
+               t.Errorf("new block does not match test block\nnew block = %v\n", buf[:n])
        }
 }
 
index a93b72cf611cfc4dfa6283b8bd249c02db52ca19..0f556b538ac7ae15b1939f61bad13be3ed0404e5 100644 (file)
@@ -19,26 +19,40 @@ type LoggingResponseWriter struct {
        sentHdr      time.Time
 }
 
+// CloseNotify implements http.CloseNotifier.
+func (resp *LoggingResponseWriter) CloseNotify() <-chan bool {
+       wrapped, ok := resp.ResponseWriter.(http.CloseNotifier)
+       if !ok {
+               // If upstream doesn't implement CloseNotifier, we can
+               // satisfy the interface by returning a channel that
+               // never sends anything (the interface doesn't
+               // guarantee that anything will ever be sent on the
+               // channel even if the client disconnects).
+               return nil
+       }
+       return wrapped.CloseNotify()
+}
+
 // WriteHeader writes header to ResponseWriter
-func (loggingWriter *LoggingResponseWriter) WriteHeader(code int) {
-       if loggingWriter.sentHdr == zeroTime {
-               loggingWriter.sentHdr = time.Now()
+func (resp *LoggingResponseWriter) WriteHeader(code int) {
+       if resp.sentHdr == zeroTime {
+               resp.sentHdr = time.Now()
        }
-       loggingWriter.Status = code
-       loggingWriter.ResponseWriter.WriteHeader(code)
+       resp.Status = code
+       resp.ResponseWriter.WriteHeader(code)
 }
 
 var zeroTime time.Time
 
-func (loggingWriter *LoggingResponseWriter) Write(data []byte) (int, error) {
-       if loggingWriter.Length == 0 && len(data) > 0 && loggingWriter.sentHdr == zeroTime {
-               loggingWriter.sentHdr = time.Now()
+func (resp *LoggingResponseWriter) Write(data []byte) (int, error) {
+       if resp.Length == 0 && len(data) > 0 && resp.sentHdr == zeroTime {
+               resp.sentHdr = time.Now()
        }
-       loggingWriter.Length += len(data)
-       if loggingWriter.Status >= 400 {
-               loggingWriter.ResponseBody += string(data)
+       resp.Length += len(data)
+       if resp.Status >= 400 {
+               resp.ResponseBody += string(data)
        }
-       return loggingWriter.ResponseWriter.Write(data)
+       return resp.ResponseWriter.Write(data)
 }
 
 // LoggingRESTRouter is used to add logging capabilities to mux.Router
@@ -46,18 +60,18 @@ type LoggingRESTRouter struct {
        router http.Handler
 }
 
-func (loggingRouter *LoggingRESTRouter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+func (loggingRouter *LoggingRESTRouter) ServeHTTP(wrappedResp http.ResponseWriter, req *http.Request) {
        t0 := time.Now()
-       loggingWriter := LoggingResponseWriter{http.StatusOK, 0, resp, "", zeroTime}
-       loggingRouter.router.ServeHTTP(&loggingWriter, req)
-       statusText := http.StatusText(loggingWriter.Status)
-       if loggingWriter.Status >= 400 {
-               statusText = strings.Replace(loggingWriter.ResponseBody, "\n", "", -1)
+       resp := LoggingResponseWriter{http.StatusOK, 0, wrappedResp, "", zeroTime}
+       loggingRouter.router.ServeHTTP(&resp, req)
+       statusText := http.StatusText(resp.Status)
+       if resp.Status >= 400 {
+               statusText = strings.Replace(resp.ResponseBody, "\n", "", -1)
        }
        now := time.Now()
        tTotal := now.Sub(t0)
-       tLatency := loggingWriter.sentHdr.Sub(t0)
-       tResponse := now.Sub(loggingWriter.sentHdr)
-       log.Printf("[%s] %s %s %d %.6fs %.6fs %.6fs %d %d \"%s\"", req.RemoteAddr, req.Method, req.URL.Path[1:], req.ContentLength, tTotal.Seconds(), tLatency.Seconds(), tResponse.Seconds(), loggingWriter.Status, loggingWriter.Length, statusText)
+       tLatency := resp.sentHdr.Sub(t0)
+       tResponse := now.Sub(resp.sentHdr)
+       log.Printf("[%s] %s %s %d %.6fs %.6fs %.6fs %d %d \"%s\"", req.RemoteAddr, req.Method, req.URL.Path[1:], req.ContentLength, tTotal.Seconds(), tLatency.Seconds(), tResponse.Seconds(), resp.Status, resp.Length, statusText)
 
 }
diff --git a/services/keepstore/logging_router_test.go b/services/keepstore/logging_router_test.go
new file mode 100644 (file)
index 0000000..aa88556
--- /dev/null
@@ -0,0 +1,10 @@
+package main
+
+import (
+       "net/http"
+       "testing"
+)
+
+func TestLoggingResponseWriterImplementsCloseNotifier(t *testing.T) {
+       http.ResponseWriter(&LoggingResponseWriter{}).(http.CloseNotifier).CloseNotify()
+}
index 79a680d58a3efebab11467ca2f3a474d2e0d0feb..d068b2a6e5da0601eb7f9bb4c6a0ec8e4b891774 100644 (file)
@@ -153,20 +153,18 @@ func (v *S3Volume) Check() error {
        return nil
 }
 
-func (v *S3Volume) Get(loc string) ([]byte, error) {
+func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
        rdr, err := v.Bucket.GetReader(loc)
        if err != nil {
-               return nil, v.translateError(err)
+               return 0, v.translateError(err)
        }
        defer rdr.Close()
-       buf := bufs.Get(BlockSize)
        n, err := io.ReadFull(rdr, buf)
        switch err {
        case nil, io.EOF, io.ErrUnexpectedEOF:
-               return buf[:n], nil
+               return n, nil
        default:
-               bufs.Put(buf)
-               return nil, v.translateError(err)
+               return 0, v.translateError(err)
        }
 }
 
index ac9406178c00ffaffc024f61ffc981479fc9d4e9..d111caeac8e5b571202502e0aea63f07816365ba 100644 (file)
@@ -290,26 +290,27 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
        expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
 
        // Verify Locator1 to be un/deleted as expected
-       data, _ := GetBlock(testData.Locator1)
+       buf := make([]byte, BlockSize)
+       size, err := GetBlock(testData.Locator1, buf, nil)
        if testData.ExpectLocator1 {
-               if len(data) == 0 {
+               if size == 0 || err != nil {
                        t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
                }
        } else {
-               if len(data) > 0 {
+               if size > 0 || err == nil {
                        t.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1)
                }
        }
 
        // Verify Locator2 to be un/deleted as expected
        if testData.Locator1 != testData.Locator2 {
-               data, _ = GetBlock(testData.Locator2)
+               size, err = GetBlock(testData.Locator2, buf, nil)
                if testData.ExpectLocator2 {
-                       if len(data) == 0 {
+                       if size == 0 || err != nil {
                                t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
                        }
                } else {
-                       if len(data) > 0 {
+                       if size > 0 || err == nil {
                                t.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2)
                        }
                }
@@ -321,7 +322,8 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
        if testData.DifferentMtimes {
                locatorFoundIn := 0
                for _, volume := range KeepVM.AllReadable() {
-                       if _, err := volume.Get(testData.Locator1); err == nil {
+                       buf := make([]byte, BlockSize)
+                       if _, err := volume.Get(testData.Locator1, buf); err == nil {
                                locatorFoundIn = locatorFoundIn + 1
                        }
                }
index 17da54fdadbca571cae93fde18342d2776b4e3a7..8ae6660fd477fa90365a019c837a121a08cc9595 100644 (file)
@@ -10,17 +10,14 @@ import (
 // for example, a single mounted disk, a RAID array, an Amazon S3 volume,
 // etc.
 type Volume interface {
-       // Get a block. IFF the returned error is nil, the caller must
-       // put the returned slice back into the buffer pool when it's
-       // finished with it. (Otherwise, the buffer pool will be
-       // depleted and eventually -- when all available buffers are
-       // used and not returned -- operations will reach deadlock.)
+       // Get a block: copy the block data into buf, and return the
+       // number of bytes copied.
        //
        // loc is guaranteed to consist of 32 or more lowercase hex
        // digits.
        //
-       // Get should not verify the integrity of the returned data:
-       // it should just return whatever was found in its backing
+       // Get should not verify the integrity of the data: it should
+       // just return whatever was found in its backing
        // store. (Integrity checking is the caller's responsibility.)
        //
        // If an error is encountered that prevents it from
@@ -36,10 +33,12 @@ type Volume interface {
        // access log if the block is not found on any other volumes
        // either).
        //
-       // If the data in the backing store is bigger than BlockSize,
-       // Get is permitted to return an error without reading any of
-       // the data.
-       Get(loc string) ([]byte, error)
+       // If the data in the backing store is bigger than len(buf),
+       // then Get is permitted to return an error without reading
+       // any of the data.
+       //
+       // len(buf) will not exceed BlockSize.
+       Get(loc string, buf []byte) (int, error)
 
        // Compare the given data with the stored data (i.e., what Get
        // would return). If equal, return nil. If not, return
index 95166c252f004bef5a1ba1f583f4e13f54117f47..105795c146e2d932f3066ee2f26948dd639d9436 100644 (file)
@@ -89,14 +89,13 @@ func testGet(t TB, factory TestableVolumeFactory) {
 
        v.PutRaw(TestHash, TestBlock)
 
-       buf, err := v.Get(TestHash)
+       buf := make([]byte, BlockSize)
+       n, err := v.Get(TestHash, buf)
        if err != nil {
                t.Fatal(err)
        }
 
-       bufs.Put(buf)
-
-       if bytes.Compare(buf, TestBlock) != 0 {
+       if bytes.Compare(buf[:n], TestBlock) != 0 {
                t.Errorf("expected %s, got %s", string(TestBlock), string(buf))
        }
 }
@@ -107,7 +106,8 @@ func testGetNoSuchBlock(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
-       if _, err := v.Get(TestHash2); err == nil {
+       buf := make([]byte, BlockSize)
+       if _, err := v.Get(TestHash2, buf); err == nil {
                t.Errorf("Expected error while getting non-existing block %v", TestHash2)
        }
 }
@@ -208,24 +208,22 @@ func testPutBlockWithDifferentContent(t TB, factory TestableVolumeFactory, testH
        v.PutRaw(testHash, testDataA)
 
        putErr := v.Put(testHash, testDataB)
-       buf, getErr := v.Get(testHash)
+       buf := make([]byte, BlockSize)
+       n, getErr := v.Get(testHash, buf)
        if putErr == nil {
                // Put must not return a nil error unless it has
                // overwritten the existing data.
-               if bytes.Compare(buf, testDataB) != 0 {
-                       t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf, testDataB)
+               if bytes.Compare(buf[:n], testDataB) != 0 {
+                       t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf[:n], testDataB)
                }
        } else {
                // It is permissible for Put to fail, but it must
                // leave us with either the original data, the new
                // data, or nothing at all.
-               if getErr == nil && bytes.Compare(buf, testDataA) != 0 && bytes.Compare(buf, testDataB) != 0 {
-                       t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf, testDataA, testDataB)
+               if getErr == nil && bytes.Compare(buf[:n], testDataA) != 0 && bytes.Compare(buf[:n], testDataB) != 0 {
+                       t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf[:n], testDataA, testDataB)
                }
        }
-       if getErr == nil {
-               bufs.Put(buf)
-       }
 }
 
 // Put and get multiple blocks
@@ -253,34 +251,32 @@ func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
                t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
        }
 
-       data, err := v.Get(TestHash)
+       data := make([]byte, BlockSize)
+       n, err := v.Get(TestHash, data)
        if err != nil {
                t.Error(err)
        } else {
-               if bytes.Compare(data, TestBlock) != 0 {
-                       t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock)
+               if bytes.Compare(data[:n], TestBlock) != 0 {
+                       t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock)
                }
-               bufs.Put(data)
        }
 
-       data, err = v.Get(TestHash2)
+       n, err = v.Get(TestHash2, data)
        if err != nil {
                t.Error(err)
        } else {
-               if bytes.Compare(data, TestBlock2) != 0 {
-                       t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock2)
+               if bytes.Compare(data[:n], TestBlock2) != 0 {
+                       t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock2)
                }
-               bufs.Put(data)
        }
 
-       data, err = v.Get(TestHash3)
+       n, err = v.Get(TestHash3, data)
        if err != nil {
                t.Error(err)
        } else {
-               if bytes.Compare(data, TestBlock3) != 0 {
-                       t.Errorf("Block present, but to %+q, expected %+q", data, TestBlock3)
+               if bytes.Compare(data[:n], TestBlock3) != 0 {
+                       t.Errorf("Block present, but to %+q, expected %+q", data[:n], TestBlock3)
                }
-               bufs.Put(data)
        }
 }
 
@@ -426,14 +422,12 @@ func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
        if err := v.Trash(TestHash); err != nil {
                t.Error(err)
        }
-       data, err := v.Get(TestHash)
+       data := make([]byte, BlockSize)
+       n, err := v.Get(TestHash, data)
        if err != nil {
                t.Error(err)
-       } else {
-               if bytes.Compare(data, TestBlock) != 0 {
-                       t.Errorf("Got data %+q, expected %+q", data, TestBlock)
-               }
-               bufs.Put(data)
+       } else if bytes.Compare(data[:n], TestBlock) != 0 {
+               t.Errorf("Got data %+q, expected %+q", data[:n], TestBlock)
        }
 }
 
@@ -455,7 +449,8 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
        if err := v.Trash(TestHash); err != nil {
                t.Error(err)
        }
-       if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
+       data := make([]byte, BlockSize)
+       if _, err := v.Get(TestHash, data); err == nil || !os.IsNotExist(err) {
                t.Errorf("os.IsNotExist(%v) should have been true", err)
        }
 }
@@ -514,9 +509,10 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
        }
 
        v.PutRaw(TestHash, TestBlock)
+       buf := make([]byte, BlockSize)
 
        // Get from read-only volume should succeed
-       _, err := v.Get(TestHash)
+       _, err := v.Get(TestHash, buf)
        if err != nil {
                t.Errorf("got err %v, expected nil", err)
        }
@@ -526,7 +522,7 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
        if err == nil {
                t.Errorf("Expected error when putting block in a read-only volume")
        }
-       _, err = v.Get(TestHash2)
+       _, err = v.Get(TestHash2, buf)
        if err == nil {
                t.Errorf("Expected error when getting block whose put in read-only volume failed")
        }
@@ -561,45 +557,45 @@ func testGetConcurrent(t TB, factory TestableVolumeFactory) {
        v.PutRaw(TestHash3, TestBlock3)
 
        sem := make(chan int)
-       go func(sem chan int) {
-               buf, err := v.Get(TestHash)
+       go func() {
+               buf := make([]byte, BlockSize)
+               n, err := v.Get(TestHash, buf)
                if err != nil {
                        t.Errorf("err1: %v", err)
                }
-               bufs.Put(buf)
-               if bytes.Compare(buf, TestBlock) != 0 {
-                       t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf))
+               if bytes.Compare(buf[:n], TestBlock) != 0 {
+                       t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf[:n]))
                }
                sem <- 1
-       }(sem)
+       }()
 
-       go func(sem chan int) {
-               buf, err := v.Get(TestHash2)
+       go func() {
+               buf := make([]byte, BlockSize)
+               n, err := v.Get(TestHash2, buf)
                if err != nil {
                        t.Errorf("err2: %v", err)
                }
-               bufs.Put(buf)
-               if bytes.Compare(buf, TestBlock2) != 0 {
-                       t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf))
+               if bytes.Compare(buf[:n], TestBlock2) != 0 {
+                       t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf[:n]))
                }
                sem <- 1
-       }(sem)
+       }()
 
-       go func(sem chan int) {
-               buf, err := v.Get(TestHash3)
+       go func() {
+               buf := make([]byte, BlockSize)
+               n, err := v.Get(TestHash3, buf)
                if err != nil {
                        t.Errorf("err3: %v", err)
                }
-               bufs.Put(buf)
-               if bytes.Compare(buf, TestBlock3) != 0 {
-                       t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf))
+               if bytes.Compare(buf[:n], TestBlock3) != 0 {
+                       t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf[:n]))
                }
                sem <- 1
-       }(sem)
+       }()
 
        // Wait for all goroutines to finish
-       for done := 0; done < 3; {
-               done += <-sem
+       for done := 0; done < 3; done++ {
+               <-sem
        }
 }
 
@@ -639,36 +635,34 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) {
        }(sem)
 
        // Wait for all goroutines to finish
-       for done := 0; done < 3; {
-               done += <-sem
+       for done := 0; done < 3; done++ {
+               <-sem
        }
 
        // Double check that we actually wrote the blocks we expected to write.
-       buf, err := v.Get(TestHash)
+       buf := make([]byte, BlockSize)
+       n, err := v.Get(TestHash, buf)
        if err != nil {
                t.Errorf("Get #1: %v", err)
        }
-       bufs.Put(buf)
-       if bytes.Compare(buf, TestBlock) != 0 {
-               t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf))
+       if bytes.Compare(buf[:n], TestBlock) != 0 {
+               t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf[:n]))
        }
 
-       buf, err = v.Get(TestHash2)
+       n, err = v.Get(TestHash2, buf)
        if err != nil {
                t.Errorf("Get #2: %v", err)
        }
-       bufs.Put(buf)
-       if bytes.Compare(buf, TestBlock2) != 0 {
-               t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf))
+       if bytes.Compare(buf[:n], TestBlock2) != 0 {
+               t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf[:n]))
        }
 
-       buf, err = v.Get(TestHash3)
+       n, err = v.Get(TestHash3, buf)
        if err != nil {
                t.Errorf("Get #3: %v", err)
        }
-       bufs.Put(buf)
-       if bytes.Compare(buf, TestBlock3) != 0 {
-               t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf))
+       if bytes.Compare(buf[:n], TestBlock3) != 0 {
+               t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf[:n]))
        }
 }
 
@@ -689,14 +683,13 @@ func testPutFullBlock(t TB, factory TestableVolumeFactory) {
        if err != nil {
                t.Fatal(err)
        }
-       rdata, err := v.Get(hash)
+       buf := make([]byte, BlockSize)
+       n, err := v.Get(hash, buf)
        if err != nil {
                t.Error(err)
-       } else {
-               defer bufs.Put(rdata)
        }
-       if bytes.Compare(rdata, wdata) != 0 {
-               t.Error("rdata != wdata")
+       if bytes.Compare(buf[:n], wdata) != 0 {
+               t.Error("buf %+q != wdata %+q", buf[:n], wdata)
        }
 }
 
@@ -717,14 +710,14 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
        v.PutRaw(TestHash, TestBlock)
        v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
 
-       buf, err := v.Get(TestHash)
+       buf := make([]byte, BlockSize)
+       n, err := v.Get(TestHash, buf)
        if err != nil {
                t.Fatal(err)
        }
-       if bytes.Compare(buf, TestBlock) != 0 {
-               t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+       if bytes.Compare(buf[:n], TestBlock) != 0 {
+               t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
        }
-       bufs.Put(buf)
 
        // Trash
        err = v.Trash(TestHash)
@@ -737,7 +730,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
                        t.Error(err)
                }
        } else {
-               _, err = v.Get(TestHash)
+               _, err = v.Get(TestHash, buf)
                if err == nil || !os.IsNotExist(err) {
                        t.Errorf("os.IsNotExist(%v) should have been true", err)
                }
@@ -750,14 +743,13 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
        }
 
        // Get the block - after trash and untrash sequence
-       buf, err = v.Get(TestHash)
+       n, err = v.Get(TestHash, buf)
        if err != nil {
                t.Fatal(err)
        }
-       if bytes.Compare(buf, TestBlock) != 0 {
-               t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+       if bytes.Compare(buf[:n], TestBlock) != 0 {
+               t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
        }
-       bufs.Put(buf)
 }
 
 func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
@@ -768,14 +760,14 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        }(trashLifetime)
 
        checkGet := func() error {
-               buf, err := v.Get(TestHash)
+               buf := make([]byte, BlockSize)
+               n, err := v.Get(TestHash, buf)
                if err != nil {
                        return err
                }
-               if bytes.Compare(buf, TestBlock) != 0 {
-                       t.Fatalf("Got data %+q, expected %+q", buf, TestBlock)
+               if bytes.Compare(buf[:n], TestBlock) != 0 {
+                       t.Fatalf("Got data %+q, expected %+q", buf[:n], TestBlock)
                }
-               bufs.Put(buf)
                return nil
        }
 
index e8a5a338f51cb25d47f419d02f6ca76a211d535c..5671b8d4a9fd7405f8ca7fd35a18fdf10289a059 100644 (file)
@@ -113,17 +113,16 @@ func (v *MockVolume) Compare(loc string, buf []byte) error {
        }
 }
 
-func (v *MockVolume) Get(loc string) ([]byte, error) {
+func (v *MockVolume) Get(loc string, buf []byte) (int, error) {
        v.gotCall("Get")
        <-v.Gate
        if v.Bad {
-               return nil, errors.New("Bad volume")
+               return 0, errors.New("Bad volume")
        } else if block, ok := v.Store[loc]; ok {
-               buf := bufs.Get(len(block))
-               copy(buf, block)
-               return buf, nil
+               copy(buf[:len(block)], block)
+               return len(block), nil
        }
-       return nil, os.ErrNotExist
+       return 0, os.ErrNotExist
 }
 
 func (v *MockVolume) Put(loc string, block []byte) error {
index 996068cf3d2438f71364b0b2c9ddafcdbd712c54..edec048dfe5836701f821beefdc3d7b131777982 100644 (file)
@@ -181,26 +181,24 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
        return stat, err
 }
 
-// Get retrieves a block identified by the locator string "loc", and
-// returns its contents as a byte slice.
-//
-// Get returns a nil buffer IFF it returns a non-nil error.
-func (v *UnixVolume) Get(loc string) ([]byte, error) {
+// Get retrieves a block, copies it to the given slice, and returns
+// the number of bytes copied.
+func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
        path := v.blockPath(loc)
        stat, err := v.stat(path)
        if err != nil {
-               return nil, v.translateError(err)
+               return 0, v.translateError(err)
+       }
+       if stat.Size() > int64(len(buf)) {
+               return 0, TooLongError
        }
-       buf := bufs.Get(int(stat.Size()))
+       var read int
+       size := int(stat.Size())
        err = v.getFunc(path, func(rdr io.Reader) error {
-               _, err = io.ReadFull(rdr, buf)
+               read, err = io.ReadFull(rdr, buf[:size])
                return err
        })
-       if err != nil {
-               bufs.Put(buf)
-               return nil, err
-       }
-       return buf, nil
+       return read, err
 }
 
 // Compare returns nil if Get(loc) would return the same content as
index 0775e89ed275d14f7e2be510084a52e39af84472..c95538bc4da380f7af5561984d7a069324cea970 100644 (file)
@@ -106,12 +106,13 @@ func TestGetNotFound(t *testing.T) {
        defer v.Teardown()
        v.Put(TestHash, TestBlock)
 
-       buf, err := v.Get(TestHash2)
+       buf := make([]byte, BlockSize)
+       n, err := v.Get(TestHash2, buf)
        switch {
        case os.IsNotExist(err):
                break
        case err == nil:
-               t.Errorf("Read should have failed, returned %s", string(buf))
+               t.Errorf("Read should have failed, returned %+q", buf[:n])
        default:
                t.Errorf("Read expected ErrNotExist, got: %s", err)
        }
@@ -151,7 +152,8 @@ func TestUnixVolumeReadonly(t *testing.T) {
 
        v.PutRaw(TestHash, TestBlock)
 
-       _, err := v.Get(TestHash)
+       buf := make([]byte, BlockSize)
+       _, err := v.Get(TestHash, buf)
        if err != nil {
                t.Errorf("got err %v, expected nil", err)
        }