From: radhika Date: Tue, 10 May 2016 17:23:28 +0000 (-0400) Subject: closes #8464 X-Git-Tag: 1.1.0~946 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/44f0e83d50f688bf73c336747402d490346f5c34?hp=735502484467241d088fd3c2ebbccc0d6a628dc1 closes #8464 Merge branch '8464-crunch2-stdout' --- diff --git a/apps/workbench/Gemfile.lock b/apps/workbench/Gemfile.lock index b4e2400bed..fdcd375ed2 100644 --- a/apps/workbench/Gemfile.lock +++ b/apps/workbench/Gemfile.lock @@ -37,16 +37,17 @@ GEM minitest (~> 5.1) thread_safe (~> 0.1) tzinfo (~> 1.1) - addressable (2.3.6) + addressable (2.4.0) andand (1.3.3) angularjs-rails (1.3.8) arel (5.0.1.20140414130214) - arvados (0.1.20150511150219) - activesupport (>= 3.2.13) + arvados (0.1.20160420143004) + activesupport (>= 3, < 4.2.6) andand (~> 1.3, >= 1.3.3) - google-api-client (~> 0.6.3, >= 0.6.3) + google-api-client (>= 0.7, < 0.9) + i18n (~> 0) json (~> 1.7, >= 1.7.7) - jwt (>= 0.1.5, < 1.0.0) + jwt (>= 0.1.5, < 2) autoparse (0.3.3) addressable (>= 2.3.1) extlib (>= 0.9.15) @@ -93,24 +94,33 @@ GEM erubis (2.7.0) execjs (2.2.2) extlib (0.9.16) - faraday (0.8.9) - multipart-post (~> 1.2.0) + faraday (0.9.2) + multipart-post (>= 1.2, < 3) fast_stack (0.1.0) rake rake-compiler ffi (1.9.10) flamegraph (0.1.0) fast_stack - google-api-client (0.6.4) - addressable (>= 2.3.2) - autoparse (>= 0.3.3) - extlib (>= 0.9.15) - faraday (~> 0.8.4) - jwt (>= 0.1.5) - launchy (>= 2.1.1) - multi_json (>= 1.0.0) - signet (~> 0.4.5) - uuidtools (>= 2.1.0) + google-api-client (0.8.6) + activesupport (>= 3.2) + addressable (~> 2.3) + autoparse (~> 0.3) + extlib (~> 0.9) + faraday (~> 0.9) + googleauth (~> 0.3) + launchy (~> 2.4) + multi_json (~> 1.10) + retriable (~> 1.4) + signet (~> 0.6) + googleauth (0.5.1) + faraday (~> 0.9) + jwt (~> 1.4) + logging (~> 2.0) + memoist (~> 0.12) + multi_json (~> 1.11) + os (~> 0.9) + signet (~> 0.7) headless (1.0.2) highline (1.6.21) httpclient (2.6.0.1) @@ -119,8 +129,7 @@ GEM railties (>= 3.0, < 5.0) 
thor (>= 0.14, < 2.0) json (1.8.3) - jwt (0.1.13) - multi_json (>= 1.5) + jwt (1.5.4) launchy (2.4.3) addressable (~> 2.3) less (2.6.0) @@ -129,18 +138,23 @@ GEM actionpack (>= 3.1) less (~> 2.6.0) libv8 (3.16.14.7) + little-plugger (1.1.4) + logging (2.1.0) + little-plugger (~> 1.1) + multi_json (~> 1.10) mail (2.6.3) mime-types (>= 1.16, < 3) + memoist (0.14.0) metaclass (0.0.4) mime-types (2.99) mini_portile (0.6.2) - minitest (5.7.0) + minitest (5.8.4) mocha (1.1.0) metaclass (~> 0.0.1) morrisjs-rails (0.5.1) railties (> 3.1, < 5) - multi_json (1.11.2) - multipart-post (1.2.0) + multi_json (1.12.0) + multipart-post (2.0.0) net-scp (1.2.1) net-ssh (>= 2.6.5) net-sftp (2.1.2) @@ -151,6 +165,7 @@ GEM nokogiri (1.6.6.4) mini_portile (~> 0.6.0) oj (2.11.2) + os (0.9.6) passenger (4.0.57) daemon_controller (>= 1.2.0) rack @@ -190,6 +205,7 @@ GEM rake raphael-rails (2.1.2) ref (1.0.5) + retriable (1.4.1) ruby-debug-passenger (0.2.0) ruby-prof (0.15.2) rubyzip (1.1.7) @@ -207,11 +223,11 @@ GEM multi_json (~> 1.0) rubyzip (~> 1.0) websocket (~> 1.0) - signet (0.4.5) - addressable (>= 2.2.3) - faraday (~> 0.8.1) - jwt (>= 0.1.5) - multi_json (>= 1.0.0) + signet (0.7.2) + addressable (~> 2.3) + faraday (~> 0.9) + jwt (~> 1.5) + multi_json (~> 1.10) simplecov (0.9.1) docile (~> 1.1.0) multi_json (~> 1.0) @@ -238,7 +254,6 @@ GEM uglifier (2.7.0) execjs (>= 0.3.0) json (>= 1.8.0) - uuidtools (2.1.5) websocket (1.2.2) websocket-driver (0.5.1) websocket-extensions (>= 0.1.0) @@ -294,3 +309,6 @@ DEPENDENCIES therubyracer uglifier (>= 1.0.3) wiselinks + +BUNDLED WITH + 1.12.1 diff --git a/build/run-tests.sh b/build/run-tests.sh index c94f831a36..d22934199f 100755 --- a/build/run-tests.sh +++ b/build/run-tests.sh @@ -494,22 +494,23 @@ do_test_once() { # before trying "go test". Otherwise, coverage-reporting # mode makes Go show the wrong line numbers when reporting # compilation errors. 
+ go get -t "git.curoverse.com/arvados.git/$1" || return 1 if [[ -n "${testargs[$1]}" ]] then # "go test -check.vv giturl" doesn't work, but this # does: - cd "$WORKSPACE/$1" && \ - go get -t "git.curoverse.com/arvados.git/$1" && \ - go test ${short:+-short} ${coverflags[@]} ${testargs[$1]} + cd "$WORKSPACE/$1" && go test ${short:+-short} ${coverflags[@]} ${testargs[$1]} else # The above form gets verbose even when testargs is # empty, so use this form in such cases: - go get -t "git.curoverse.com/arvados.git/$1" && \ - go test ${short:+-short} ${coverflags[@]} "git.curoverse.com/arvados.git/$1" + go test ${short:+-short} ${coverflags[@]} "git.curoverse.com/arvados.git/$1" fi result="$?" - go tool cover -html="$WORKSPACE/tmp/.$covername.tmp" -o "$WORKSPACE/tmp/$covername.html" - rm "$WORKSPACE/tmp/.$covername.tmp" + if [[ -f "$WORKSPACE/tmp/.$covername.tmp" ]] + then + go tool cover -html="$WORKSPACE/tmp/.$covername.tmp" -o "$WORKSPACE/tmp/$covername.html" + rm "$WORKSPACE/tmp/.$covername.tmp" + fi elif [[ "$2" == "pip" ]] then # $3 can name a path directory for us to use, including trailing @@ -753,7 +754,7 @@ stop_services test_apiserver() { rm -f "$WORKSPACE/services/api/git-commit.version" cd "$WORKSPACE/services/api" \ - && RAILS_ENV=test bundle exec rake test TESTOPTS=-v ${testargs[services/api]} + && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test TESTOPTS=-v ${testargs[services/api]} } do_test services/api apiserver @@ -799,21 +800,21 @@ done test_workbench() { start_nginx_proxy_services \ && cd "$WORKSPACE/apps/workbench" \ - && RAILS_ENV=test bundle exec rake test TESTOPTS=-v ${testargs[apps/workbench]} + && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test TESTOPTS=-v ${testargs[apps/workbench]} } do_test apps/workbench workbench test_workbench_benchmark() { start_nginx_proxy_services \ && cd "$WORKSPACE/apps/workbench" \ - && RAILS_ENV=test bundle exec rake test:benchmark ${testargs[apps/workbench_benchmark]} + 
&& env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:benchmark ${testargs[apps/workbench_benchmark]} } do_test apps/workbench_benchmark workbench_benchmark test_workbench_profile() { start_nginx_proxy_services \ && cd "$WORKSPACE/apps/workbench" \ - && RAILS_ENV=test bundle exec rake test:profile ${testargs[apps/workbench_profile]} + && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:profile ${testargs[apps/workbench_profile]} } do_test apps/workbench_profile workbench_profile diff --git a/services/api/Gemfile b/services/api/Gemfile index 48998aad36..1e25467e74 100644 --- a/services/api/Gemfile +++ b/services/api/Gemfile @@ -67,7 +67,6 @@ gem 'andand' gem 'test_after_commit', :group => :test -gem 'google-api-client', '~> 0.6.3' gem 'trollop' gem 'faye-websocket' diff --git a/services/api/Gemfile.lock b/services/api/Gemfile.lock index ac6be5a522..7be4e0f39d 100644 --- a/services/api/Gemfile.lock +++ b/services/api/Gemfile.lock @@ -32,23 +32,23 @@ GEM activemodel (>= 3.0.0) activesupport (>= 3.0.0) rack (>= 1.1.0) - addressable (2.3.8) + addressable (2.4.0) andand (1.3.3) arel (3.0.3) - arvados (0.1.20150615153458) - activesupport (>= 3.2.13) + arvados (0.1.20160420143004) + activesupport (>= 3, < 4.2.6) andand (~> 1.3, >= 1.3.3) - google-api-client (~> 0.6.3, >= 0.6.3) + google-api-client (>= 0.7, < 0.9) + i18n (~> 0) json (~> 1.7, >= 1.7.7) - jwt (>= 0.1.5, < 1.0.0) - arvados-cli (0.1.20151207150126) + jwt (>= 0.1.5, < 2) + arvados-cli (0.1.20160503204200) activesupport (~> 3.2, >= 3.2.13) andand (~> 1.3, >= 1.3.3) arvados (~> 0.1, >= 0.1.20150128223554) curb (~> 0.8) - google-api-client (~> 0.6.3, >= 0.6.3) + google-api-client (~> 0.6, >= 0.6.3, < 0.9) json (~> 1.7, >= 1.7.7) - jwt (>= 0.1.5, < 1.0.0) oj (~> 2.0, >= 2.0.3) trollop (~> 2.0) autoparse (0.3.3) @@ -69,7 +69,7 @@ GEM coffee-script-source execjs coffee-script-source (1.7.0) - curb (0.8.8) + curb (0.9.3) daemon_controller (1.2.0) database_cleaner (1.2.0) erubis 
(2.7.0) @@ -81,20 +81,21 @@ GEM factory_girl_rails (4.4.1) factory_girl (~> 4.4.0) railties (>= 3.0.0) - faraday (0.8.9) - multipart-post (~> 1.2.0) + faraday (0.9.2) + multipart-post (>= 1.2, < 3) faye-websocket (0.7.2) eventmachine (>= 0.12.0) websocket-driver (>= 0.3.1) - google-api-client (0.6.4) + google-api-client (0.7.1) addressable (>= 2.3.2) autoparse (>= 0.3.3) extlib (>= 0.9.15) - faraday (~> 0.8.4) + faraday (>= 0.9.0) jwt (>= 0.1.5) launchy (>= 2.1.1) multi_json (>= 1.0.0) - signet (~> 0.4.5) + retriable (>= 1.4) + signet (>= 0.5.0) uuidtools (>= 2.1.0) hashie (1.2.0) highline (1.6.21) @@ -118,8 +119,8 @@ GEM mime-types (1.25.1) mocha (1.1.0) metaclass (~> 0.0.1) - multi_json (1.11.1) - multipart-post (1.2.0) + multi_json (1.12.0) + multipart-post (2.0.0) net-scp (1.2.0) net-ssh (>= 2.6.5) net-sftp (2.1.2) @@ -133,7 +134,7 @@ GEM jwt (~> 0.1.4) multi_json (~> 1.0) rack (~> 1.2) - oj (2.11.4) + oj (2.15.0) omniauth (1.1.1) hashie (~> 1.2) rack @@ -177,6 +178,7 @@ GEM rdoc (3.12.2) json (~> 1.4) ref (1.0.5) + retriable (2.1.0) ruby-prof (0.15.2) rvm-capistrano (1.5.1) capistrano (~> 2.15.4) @@ -185,9 +187,9 @@ GEM railties (~> 3.2.0) sass (>= 3.1.10) tilt (~> 1.3) - signet (0.4.5) + signet (0.5.1) addressable (>= 2.2.3) - faraday (~> 0.8.1) + faraday (>= 0.9.0.rc5) jwt (>= 0.1.5) multi_json (>= 1.0.0) simplecov (0.7.1) @@ -213,7 +215,7 @@ GEM treetop (1.4.15) polyglot polyglot (>= 0.3.1) - trollop (2.1.1) + trollop (2.1.2) tzinfo (0.3.39) uglifier (2.5.0) execjs (>= 0.3.0) @@ -233,7 +235,6 @@ DEPENDENCIES database_cleaner factory_girl_rails faye-websocket - google-api-client (~> 0.6.3) jquery-rails mocha multi_json @@ -258,4 +259,4 @@ DEPENDENCIES uglifier (>= 1.0.3) BUNDLED WITH - 1.10.6 + 1.12.1 diff --git a/services/api/test/integration/collections_performance_test.rb b/services/api/test/integration/collections_performance_test.rb index 77a26e5b21..a952c202cb 100644 --- a/services/api/test/integration/collections_performance_test.rb +++ 
b/services/api/test/integration/collections_performance_test.rb @@ -6,6 +6,7 @@ class CollectionsApiPerformanceTest < ActionDispatch::IntegrationTest include ManifestExamples test "crud cycle for a collection with a big manifest" do + slow_test bigmanifest = time_block 'make example' do make_manifest(streams: 100, files_per_stream: 100, @@ -39,16 +40,17 @@ class CollectionsApiPerformanceTest < ActionDispatch::IntegrationTest end test "memory usage" do - hugemanifest = make_manifest(streams: 1, - files_per_stream: 2000, - blocks_per_file: 200, - bytes_per_block: 2**26, - api_token: api_token(:active)) + slow_test + hugemanifest = make_manifest(streams: 1, + files_per_stream: 2000, + blocks_per_file: 200, + bytes_per_block: 2**26, + api_token: api_token(:active)) json = time_block "JSON encode #{hugemanifest.length>>20}MiB manifest" do Oj.dump({manifest_text: hugemanifest}) end - vmpeak "post" do - post '/arvados/v1/collections', {collection: json}, auth(:active) - end + vmpeak "post" do + post '/arvados/v1/collections', {collection: json}, auth(:active) + end end end diff --git a/services/api/test/integration/database_reset_test.rb b/services/api/test/integration/database_reset_test.rb index 58f2abf697..ecb2f2a058 100644 --- a/services/api/test/integration/database_reset_test.rb +++ b/services/api/test/integration/database_reset_test.rb @@ -4,6 +4,7 @@ class DatabaseResetTest < ActionDispatch::IntegrationTest self.use_transactional_fixtures = false test "reset fails when Rails.env != 'test'" do + slow_test rails_env_was = Rails.env begin Rails.env = 'production' @@ -22,6 +23,7 @@ class DatabaseResetTest < ActionDispatch::IntegrationTest end test "database reset doesn't break basic CRUD operations" do + slow_test active_auth = auth(:active) admin_auth = auth(:admin) @@ -48,6 +50,7 @@ class DatabaseResetTest < ActionDispatch::IntegrationTest end test "roll back database change" do + slow_test active_auth = auth(:active) admin_auth = auth(:admin) diff --git 
a/services/api/test/integration/websocket_test.rb b/services/api/test/integration/websocket_test.rb index 313a22d8f8..d1b8c34a43 100644 --- a/services/api/test/integration/websocket_test.rb +++ b/services/api/test/integration/websocket_test.rb @@ -323,6 +323,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest end test "connect, subscribe, get event, unsubscribe" do + slow_test state = 1 spec = nil spec_ev_uuid = nil @@ -372,6 +373,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest end test "connect, subscribe, get event, unsubscribe with filter" do + slow_test state = 1 spec = nil spec_ev_uuid = nil @@ -421,6 +423,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest test "connect, subscribe, get event, try to unsubscribe with bogus filter" do + slow_test state = 1 spec = nil spec_ev_uuid = nil @@ -473,6 +476,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest test "connected, not subscribed, no event" do + slow_test authorize_with :admin ws_helper :admin, false do |ws| @@ -493,6 +497,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest end test "connected, not authorized to see event" do + slow_test state = 1 authorize_with :admin @@ -608,6 +613,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest end test "connect, subscribe, lots of events" do + slow_test state = 1 event_count = 0 log_start = Log.order(:id).last.id diff --git a/services/api/test/test_helper.rb b/services/api/test/test_helper.rb index 881a080772..25ab286a23 100644 --- a/services/api/test/test_helper.rb +++ b/services/api/test/test_helper.rb @@ -106,6 +106,10 @@ class ActiveSupport::TestCase ArvadosApiToken.new.call("rack.input" => "", "HTTP_AUTHORIZATION" => "OAuth2 #{t}") end + + def slow_test + skip "RAILS_TEST_SHORT is set" unless (ENV['RAILS_TEST_SHORT'] || '').empty? 
+ end end class ActionController::TestCase diff --git a/services/api/test/unit/collection_performance_test.rb b/services/api/test/unit/collection_performance_test.rb index 37da5fddde..1c6e4f2db2 100644 --- a/services/api/test/unit/collection_performance_test.rb +++ b/services/api/test/unit/collection_performance_test.rb @@ -18,6 +18,7 @@ class CollectionModelPerformanceTest < ActiveSupport::TestCase # "crrud" == "create read render update delete", not a typo test "crrud cycle for a collection with a big manifest)" do + slow_test bigmanifest = time_block 'make example' do make_manifest(streams: 100, files_per_stream: 100, diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go index f08cebff63..9c0374351b 100644 --- a/services/keepstore/azure_blob_volume.go +++ b/services/keepstore/azure_blob_volume.go @@ -139,11 +139,11 @@ func (v *AzureBlobVolume) Check() error { // If the block is younger than azureWriteRaceInterval and is // unexpectedly empty, assume a PutBlob operation is in progress, and // wait for it to finish writing. 
-func (v *AzureBlobVolume) Get(loc string) ([]byte, error) { +func (v *AzureBlobVolume) Get(loc string, buf []byte) (int, error) { var deadline time.Time haveDeadline := false - buf, err := v.get(loc) - for err == nil && len(buf) == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" { + size, err := v.get(loc, buf) + for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" { // Seeing a brand new empty block probably means we're // in a race with CreateBlob, which under the hood // (apparently) does "CreateEmpty" and "CommitData" @@ -163,34 +163,32 @@ func (v *AzureBlobVolume) Get(loc string) ([]byte, error) { } else if time.Now().After(deadline) { break } - bufs.Put(buf) time.Sleep(azureWriteRacePollTime) - buf, err = v.get(loc) + size, err = v.get(loc, buf) } if haveDeadline { - log.Printf("Race ended with len(buf)==%d", len(buf)) + log.Printf("Race ended with size==%d", size) } - return buf, err + return size, err } -func (v *AzureBlobVolume) get(loc string) ([]byte, error) { - expectSize := BlockSize +func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) { + expectSize := len(buf) if azureMaxGetBytes < BlockSize { // Unfortunately the handler doesn't tell us how long the blob // is expected to be, so we have to ask Azure. props, err := v.bsClient.GetBlobProperties(v.containerName, loc) if err != nil { - return nil, v.translateError(err) + return 0, v.translateError(err) } if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 { - return nil, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize) + return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize) } expectSize = int(props.ContentLength) } - buf := bufs.Get(expectSize) if expectSize == 0 { - return buf, nil + return 0, nil } // We'll update this actualSize if/when we get the last piece. 
@@ -212,7 +210,7 @@ func (v *AzureBlobVolume) get(loc string) ([]byte, error) { if startPos == 0 && endPos == expectSize { rdr, err = v.bsClient.GetBlob(v.containerName, loc) } else { - rdr, err = v.bsClient.GetBlobRange(v.containerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1)) + rdr, err = v.bsClient.GetBlobRange(v.containerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil) } if err != nil { errors[p] = err @@ -235,11 +233,10 @@ func (v *AzureBlobVolume) get(loc string) ([]byte, error) { wg.Wait() for _, err := range errors { if err != nil { - bufs.Put(buf) - return nil, v.translateError(err) + return 0, v.translateError(err) } } - return buf[:actualSize], nil + return actualSize, nil } // Compare the given data with existing stored data. diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go index 439b402214..e3c0e27083 100644 --- a/services/keepstore/azure_blob_volume_test.go +++ b/services/keepstore/azure_blob_volume_test.go @@ -425,13 +425,12 @@ func TestAzureBlobVolumeRangeFenceposts(t *testing.T) { if err != nil { t.Error(err) } - gotData, err := v.Get(hash) + gotData := make([]byte, len(data)) + gotLen, err := v.Get(hash, gotData) if err != nil { t.Error(err) } gotHash := fmt.Sprintf("%x", md5.Sum(gotData)) - gotLen := len(gotData) - bufs.Put(gotData) if gotLen != size { t.Error("length mismatch: got %d != %d", gotLen, size) } @@ -477,11 +476,10 @@ func TestAzureBlobVolumeCreateBlobRace(t *testing.T) { // Wait for the stub's Put to create the empty blob v.azHandler.race <- continuePut go func() { - buf, err := v.Get(TestHash) + buf := make([]byte, len(TestBlock)) + _, err := v.Get(TestHash, buf) if err != nil { t.Error(err) - } else { - bufs.Put(buf) } close(allDone) }() @@ -521,15 +519,15 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) { allDone := make(chan struct{}) go func() { defer close(allDone) - buf, err := v.Get(TestHash) + buf := make([]byte, BlockSize) + n, err := 
v.Get(TestHash, buf) if err != nil { t.Error(err) return } - if len(buf) != 0 { - t.Errorf("Got %+q, expected empty buf", buf) + if n != 0 { + t.Errorf("Got %+q, expected empty buf", buf[:n]) } - bufs.Put(buf) }() select { case <-allDone: diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go index 33d585ae1e..7c17424ba5 100644 --- a/services/keepstore/handler_test.go +++ b/services/keepstore/handler_test.go @@ -561,7 +561,8 @@ func TestDeleteHandler(t *testing.T) { expectedDc, responseDc) } // Confirm the block has been deleted - _, err := vols[0].Get(TestHash) + buf := make([]byte, BlockSize) + _, err := vols[0].Get(TestHash, buf) var blockDeleted = os.IsNotExist(err) if !blockDeleted { t.Error("superuserExistingBlockReq: block not deleted") @@ -585,7 +586,7 @@ func TestDeleteHandler(t *testing.T) { expectedDc, responseDc) } // Confirm the block has NOT been deleted. - _, err = vols[0].Get(TestHash) + _, err = vols[0].Get(TestHash, buf) if err != nil { t.Errorf("testing delete on new block: %s\n", err) } @@ -913,6 +914,65 @@ func TestPutHandlerNoBufferleak(t *testing.T) { } } +type notifyingResponseRecorder struct { + *httptest.ResponseRecorder + closer chan bool +} + +func (r *notifyingResponseRecorder) CloseNotify() <-chan bool { + return r.closer +} + +func TestGetHandlerClientDisconnect(t *testing.T) { + defer func(was bool) { + enforcePermissions = was + }(enforcePermissions) + enforcePermissions = false + + defer func(orig *bufferPool) { + bufs = orig + }(bufs) + bufs = newBufferPool(1, BlockSize) + defer bufs.Put(bufs.Get(BlockSize)) + + KeepVM = MakeTestVolumeManager(2) + defer KeepVM.Close() + + if err := KeepVM.AllWritable()[0].Put(TestHash, TestBlock); err != nil { + t.Error(err) + } + + resp := &notifyingResponseRecorder{ + ResponseRecorder: httptest.NewRecorder(), + closer: make(chan bool, 1), + } + if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok { + t.Fatal("notifyingResponseRecorder is broken") + } + // If
anyone asks, the client has disconnected. + resp.closer <- true + + ok := make(chan struct{}) + go func() { + req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil) + (&LoggingRESTRouter{MakeRESTRouter()}).ServeHTTP(resp, req) + ok <- struct{}{} + }() + + select { + case <-time.After(20 * time.Second): + t.Fatal("request took >20s, close notifier must be broken") + case <-ok: + } + + ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder) + for i, v := range KeepVM.AllWritable() { + if calls := v.(*MockVolume).called["GET"]; calls != 0 { + t.Errorf("volume %d got %d calls, expected 0", i, calls) + } + } +} + // Invoke the GetBlockHandler a bunch of times to test for bufferpool resource // leak. func TestGetHandlerNoBufferleak(t *testing.T) { diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go index 043ab69b17..f698982415 100644 --- a/services/keepstore/handlers.go +++ b/services/keepstore/handlers.go @@ -79,18 +79,61 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) { } } - block, err := GetBlock(mux.Vars(req)["hash"]) + // TODO: Probe volumes to check whether the block _might_ + // exist. Some volumes/types could support a quick existence + // check without causing other operations to suffer. If all + // volumes support that, and assure us the block definitely + // isn't here, we can return 404 now instead of waiting for a + // buffer. + + buf, err := getBufferForResponseWriter(resp, bufs, BlockSize) if err != nil { - // This type assertion is safe because the only errors - // GetBlock can return are DiskHashError or NotFoundError. 
- http.Error(resp, err.Error(), err.(*KeepError).HTTPCode) + http.Error(resp, err.Error(), http.StatusServiceUnavailable) return } - defer bufs.Put(block) + defer bufs.Put(buf) - resp.Header().Set("Content-Length", strconv.Itoa(len(block))) + size, err := GetBlock(mux.Vars(req)["hash"], buf, resp) + if err != nil { + code := http.StatusInternalServerError + if err, ok := err.(*KeepError); ok { + code = err.HTTPCode + } + http.Error(resp, err.Error(), code) + return + } + + resp.Header().Set("Content-Length", strconv.Itoa(size)) resp.Header().Set("Content-Type", "application/octet-stream") - resp.Write(block) + resp.Write(buf[:size]) +} + +// Get a buffer from the pool -- but give up and return a non-nil +// error if resp implements http.CloseNotifier and tells us that the +// client has disconnected before we get a buffer. +func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufSize int) ([]byte, error) { + var closeNotifier <-chan bool + if resp, ok := resp.(http.CloseNotifier); ok { + closeNotifier = resp.CloseNotify() + } + var buf []byte + bufReady := make(chan []byte) + go func() { + bufReady <- bufs.Get(bufSize) + close(bufReady) + }() + select { + case buf = <-bufReady: + return buf, nil + case <-closeNotifier: + go func() { + // Even if closeNotifier happened first, we + // need to keep waiting for our buf so we can + // return it to the pool. + bufs.Put(<-bufReady) + }() + return nil, ErrClientDisconnect + } } // PutBlockHandler is a HandleFunc to address Put block requests. 
@@ -116,8 +159,13 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) { return } - buf := bufs.Get(int(req.ContentLength)) - _, err := io.ReadFull(req.Body, buf) + buf, err := getBufferForResponseWriter(resp, bufs, int(req.ContentLength)) + if err != nil { + http.Error(resp, err.Error(), http.StatusServiceUnavailable) + return + } + + _, err = io.ReadFull(req.Body, buf) if err != nil { http.Error(resp, err.Error(), 500) bufs.Put(buf) @@ -481,7 +529,6 @@ func UntrashHandler(resp http.ResponseWriter, req *http.Request) { } } -// ============================== // GetBlock and PutBlock implement lower-level code for handling // blocks by rooting through volumes connected to the local machine. // Once the handler has determined that system policy permits the @@ -492,24 +539,21 @@ func UntrashHandler(resp http.ResponseWriter, req *http.Request) { // should be the only part of the code that cares about which volume a // block is stored on, so it should be responsible for figuring out // which volume to check for fetching blocks, storing blocks, etc. -// ============================== -// GetBlock fetches and returns the block identified by "hash". -// -// On success, GetBlock returns a byte slice with the block data, and -// a nil error. +// GetBlock fetches the block identified by "hash" into the provided +// buf, and returns the data size. // // If the block cannot be found on any volume, returns NotFoundError. // // If the block found does not have the correct MD5 hash, returns // DiskHashError. // -func GetBlock(hash string) ([]byte, error) { +func GetBlock(hash string, buf []byte, resp http.ResponseWriter) (int, error) { // Attempt to read the requested hash from a keep volume. errorToCaller := NotFoundError for _, vol := range KeepVM.AllReadable() { - buf, err := vol.Get(hash) + size, err := vol.Get(hash, buf) if err != nil { // IsNotExist is an expected error and may be // ignored. All other errors are logged. 
In @@ -523,23 +567,22 @@ func GetBlock(hash string) ([]byte, error) { } // Check the file checksum. // - filehash := fmt.Sprintf("%x", md5.Sum(buf)) + filehash := fmt.Sprintf("%x", md5.Sum(buf[:size])) if filehash != hash { // TODO: Try harder to tell a sysadmin about // this. log.Printf("%s: checksum mismatch for request %s (actual %s)", vol, hash, filehash) errorToCaller = DiskHashError - bufs.Put(buf) continue } if errorToCaller == DiskHashError { log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned", vol, hash) } - return buf, nil + return size, nil } - return nil, errorToCaller + return 0, errorToCaller } // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep. diff --git a/services/keepstore/handlers_with_generic_volume_test.go b/services/keepstore/handlers_with_generic_volume_test.go index c5349d399c..dda7edcec3 100644 --- a/services/keepstore/handlers_with_generic_volume_test.go +++ b/services/keepstore/handlers_with_generic_volume_test.go @@ -45,12 +45,13 @@ func testGetBlock(t TB, factory TestableVolumeManagerFactory, testHash string, t testableVolumes[1].PutRaw(testHash, testBlock) // Get should pass - buf, err := GetBlock(testHash) + buf := make([]byte, len(testBlock)) + n, err := GetBlock(testHash, buf, nil) if err != nil { t.Fatalf("Error while getting block %s", err) } - if bytes.Compare(buf, testBlock) != 0 { - t.Errorf("Put succeeded but Get returned %+v, expected %+v", buf, testBlock) + if bytes.Compare(buf[:n], testBlock) != 0 { + t.Errorf("Put succeeded but Get returned %+v, expected %+v", buf[:n], testBlock) } } @@ -64,9 +65,10 @@ func testPutRawBadDataGetBlock(t TB, factory TestableVolumeManagerFactory, testableVolumes[1].PutRaw(testHash, badData) // Get should fail - _, err := GetBlock(testHash) + buf := make([]byte, BlockSize) + size, err := GetBlock(testHash, buf, nil) if err == nil { - t.Fatalf("Expected error while getting corrupt block %v", testHash) + t.Fatalf("Got 
%+q, expected error while getting corrupt block %v", buf[:size], testHash) } } @@ -85,11 +87,12 @@ func testPutBlock(t TB, factory TestableVolumeManagerFactory, testHash string, t } // Check that PutBlock stored the data as expected - buf, err := GetBlock(testHash) + buf := make([]byte, BlockSize) + size, err := GetBlock(testHash, buf, nil) if err != nil { t.Fatalf("Error during GetBlock for %q: %s", testHash, err) - } else if bytes.Compare(buf, testBlock) != 0 { - t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf) + } else if bytes.Compare(buf[:size], testBlock) != 0 { + t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf[:size]) } } @@ -109,10 +112,11 @@ func testPutBlockCorrupt(t TB, factory TestableVolumeManagerFactory, // Put succeeded and overwrote the badData in one volume, // and Get should return the testBlock now, ignoring the bad data. - buf, err := GetBlock(testHash) + buf := make([]byte, BlockSize) + size, err := GetBlock(testHash, buf, nil) if err != nil { t.Fatalf("Error during GetBlock for %q: %s", testHash, err) - } else if bytes.Compare(buf, testBlock) != 0 { - t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf) + } else if bytes.Compare(buf[:size], testBlock) != 0 { + t.Errorf("Get response incorrect. 
Expected %q; found %q", testBlock, buf[:size]) } } diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index 93ee43c446..80d8670105 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -91,6 +91,7 @@ var ( TooLongError = &KeepError{413, "Block is too large"} MethodDisabledError = &KeepError{405, "Method disabled"} ErrNotImplemented = &KeepError{500, "Unsupported configuration"} + ErrClientDisconnect = &KeepError{503, "Client disconnected"} ) func (e *KeepError) Error() string { diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go index 2a1c3d243a..c0adbc0bd7 100644 --- a/services/keepstore/keepstore_test.go +++ b/services/keepstore/keepstore_test.go @@ -66,12 +66,13 @@ func TestGetBlock(t *testing.T) { } // Check that GetBlock returns success. - result, err := GetBlock(TestHash) + buf := make([]byte, BlockSize) + size, err := GetBlock(TestHash, buf, nil) if err != nil { t.Errorf("GetBlock error: %s", err) } - if fmt.Sprint(result) != fmt.Sprint(TestBlock) { - t.Errorf("expected %s, got %s", TestBlock, result) + if bytes.Compare(buf[:size], TestBlock) != 0 { + t.Errorf("got %v, expected %v", buf[:size], TestBlock) } } @@ -86,9 +87,10 @@ func TestGetBlockMissing(t *testing.T) { defer KeepVM.Close() // Check that GetBlock returns failure. - result, err := GetBlock(TestHash) + buf := make([]byte, BlockSize) + size, err := GetBlock(TestHash, buf, nil) if err != NotFoundError { - t.Errorf("Expected NotFoundError, got %v", result) + t.Errorf("Expected NotFoundError, got %v, err %v", buf[:size], err) } } @@ -107,9 +109,10 @@ func TestGetBlockCorrupt(t *testing.T) { vols[0].Put(TestHash, BadBlock) // Check that GetBlock returns failure. 
- result, err := GetBlock(TestHash) + buf := make([]byte, BlockSize) + size, err := GetBlock(TestHash, buf, nil) if err != DiskHashError { - t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, result) + t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, buf[:size]) } } @@ -133,13 +136,14 @@ func TestPutBlockOK(t *testing.T) { } vols := KeepVM.AllReadable() - result, err := vols[1].Get(TestHash) + buf := make([]byte, BlockSize) + n, err := vols[1].Get(TestHash, buf) if err != nil { t.Fatalf("Volume #0 Get returned error: %v", err) } - if string(result) != string(TestBlock) { + if string(buf[:n]) != string(TestBlock) { t.Fatalf("PutBlock stored '%s', Get retrieved '%s'", - string(TestBlock), string(result)) + string(TestBlock), string(buf[:n])) } } @@ -162,14 +166,14 @@ func TestPutBlockOneVol(t *testing.T) { t.Fatalf("PutBlock: n %d err %v", n, err) } - result, err := GetBlock(TestHash) + buf := make([]byte, BlockSize) + size, err := GetBlock(TestHash, buf, nil) if err != nil { t.Fatalf("GetBlock: %v", err) } - if string(result) != string(TestBlock) { - t.Error("PutBlock/GetBlock mismatch") - t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'", - string(TestBlock), string(result)) + if bytes.Compare(buf[:size], TestBlock) != 0 { + t.Fatalf("PutBlock stored %+q, GetBlock retrieved %+q", + TestBlock, buf[:size]) } } @@ -191,7 +195,7 @@ func TestPutBlockMD5Fail(t *testing.T) { } // Confirm that GetBlock fails to return anything. - if result, err := GetBlock(TestHash); err != NotFoundError { + if result, err := GetBlock(TestHash, make([]byte, BlockSize), nil); err != NotFoundError { t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)", string(result), err) } @@ -216,10 +220,11 @@ func TestPutBlockCorrupt(t *testing.T) { } // The block on disk should now match TestBlock. 
- if block, err := GetBlock(TestHash); err != nil { + buf := make([]byte, BlockSize) + if size, err := GetBlock(TestHash, buf, nil); err != nil { t.Errorf("GetBlock: %v", err) - } else if bytes.Compare(block, TestBlock) != 0 { - t.Errorf("GetBlock returned: '%s'", string(block)) + } else if bytes.Compare(buf[:size], TestBlock) != 0 { + t.Errorf("Got %+q, expected %+q", buf[:size], TestBlock) } } @@ -290,12 +295,13 @@ func TestPutBlockTouchFails(t *testing.T) { t.Errorf("mtime was changed on vols[0]:\noldMtime = %v\nnewMtime = %v\n", oldMtime, newMtime) } - result, err := vols[1].Get(TestHash) + buf := make([]byte, BlockSize) + n, err := vols[1].Get(TestHash, buf) if err != nil { t.Fatalf("vols[1]: %v", err) } - if bytes.Compare(result, TestBlock) != 0 { - t.Errorf("new block does not match test block\nnew block = %v\n", result) + if bytes.Compare(buf[:n], TestBlock) != 0 { + t.Errorf("new block does not match test block\nnew block = %v\n", buf[:n]) } } diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go index a93b72cf61..0f556b538a 100644 --- a/services/keepstore/logging_router.go +++ b/services/keepstore/logging_router.go @@ -19,26 +19,40 @@ type LoggingResponseWriter struct { sentHdr time.Time } +// CloseNotify implements http.CloseNotifier. +func (resp *LoggingResponseWriter) CloseNotify() <-chan bool { + wrapped, ok := resp.ResponseWriter.(http.CloseNotifier) + if !ok { + // If upstream doesn't implement CloseNotifier, we can + // satisfy the interface by returning a channel that + // never sends anything (the interface doesn't + // guarantee that anything will ever be sent on the + // channel even if the client disconnects). 
+ return nil + } + return wrapped.CloseNotify() +} + // WriteHeader writes header to ResponseWriter -func (loggingWriter *LoggingResponseWriter) WriteHeader(code int) { - if loggingWriter.sentHdr == zeroTime { - loggingWriter.sentHdr = time.Now() +func (resp *LoggingResponseWriter) WriteHeader(code int) { + if resp.sentHdr == zeroTime { + resp.sentHdr = time.Now() } - loggingWriter.Status = code - loggingWriter.ResponseWriter.WriteHeader(code) + resp.Status = code + resp.ResponseWriter.WriteHeader(code) } var zeroTime time.Time -func (loggingWriter *LoggingResponseWriter) Write(data []byte) (int, error) { - if loggingWriter.Length == 0 && len(data) > 0 && loggingWriter.sentHdr == zeroTime { - loggingWriter.sentHdr = time.Now() +func (resp *LoggingResponseWriter) Write(data []byte) (int, error) { + if resp.Length == 0 && len(data) > 0 && resp.sentHdr == zeroTime { + resp.sentHdr = time.Now() } - loggingWriter.Length += len(data) - if loggingWriter.Status >= 400 { - loggingWriter.ResponseBody += string(data) + resp.Length += len(data) + if resp.Status >= 400 { + resp.ResponseBody += string(data) } - return loggingWriter.ResponseWriter.Write(data) + return resp.ResponseWriter.Write(data) } // LoggingRESTRouter is used to add logging capabilities to mux.Router @@ -46,18 +60,18 @@ type LoggingRESTRouter struct { router http.Handler } -func (loggingRouter *LoggingRESTRouter) ServeHTTP(resp http.ResponseWriter, req *http.Request) { +func (loggingRouter *LoggingRESTRouter) ServeHTTP(wrappedResp http.ResponseWriter, req *http.Request) { t0 := time.Now() - loggingWriter := LoggingResponseWriter{http.StatusOK, 0, resp, "", zeroTime} - loggingRouter.router.ServeHTTP(&loggingWriter, req) - statusText := http.StatusText(loggingWriter.Status) - if loggingWriter.Status >= 400 { - statusText = strings.Replace(loggingWriter.ResponseBody, "\n", "", -1) + resp := LoggingResponseWriter{http.StatusOK, 0, wrappedResp, "", zeroTime} + loggingRouter.router.ServeHTTP(&resp, req) + 
statusText := http.StatusText(resp.Status) + if resp.Status >= 400 { + statusText = strings.Replace(resp.ResponseBody, "\n", "", -1) } now := time.Now() tTotal := now.Sub(t0) - tLatency := loggingWriter.sentHdr.Sub(t0) - tResponse := now.Sub(loggingWriter.sentHdr) - log.Printf("[%s] %s %s %d %.6fs %.6fs %.6fs %d %d \"%s\"", req.RemoteAddr, req.Method, req.URL.Path[1:], req.ContentLength, tTotal.Seconds(), tLatency.Seconds(), tResponse.Seconds(), loggingWriter.Status, loggingWriter.Length, statusText) + tLatency := resp.sentHdr.Sub(t0) + tResponse := now.Sub(resp.sentHdr) + log.Printf("[%s] %s %s %d %.6fs %.6fs %.6fs %d %d \"%s\"", req.RemoteAddr, req.Method, req.URL.Path[1:], req.ContentLength, tTotal.Seconds(), tLatency.Seconds(), tResponse.Seconds(), resp.Status, resp.Length, statusText) } diff --git a/services/keepstore/logging_router_test.go b/services/keepstore/logging_router_test.go new file mode 100644 index 0000000000..aa88556ae4 --- /dev/null +++ b/services/keepstore/logging_router_test.go @@ -0,0 +1,10 @@ +package main + +import ( + "net/http" + "testing" +) + +func TestLoggingResponseWriterImplementsCloseNotifier(t *testing.T) { + http.ResponseWriter(&LoggingResponseWriter{}).(http.CloseNotifier).CloseNotify() +} diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go index 79a680d58a..d068b2a6e5 100644 --- a/services/keepstore/s3_volume.go +++ b/services/keepstore/s3_volume.go @@ -153,20 +153,18 @@ func (v *S3Volume) Check() error { return nil } -func (v *S3Volume) Get(loc string) ([]byte, error) { +func (v *S3Volume) Get(loc string, buf []byte) (int, error) { rdr, err := v.Bucket.GetReader(loc) if err != nil { - return nil, v.translateError(err) + return 0, v.translateError(err) } defer rdr.Close() - buf := bufs.Get(BlockSize) n, err := io.ReadFull(rdr, buf) switch err { case nil, io.EOF, io.ErrUnexpectedEOF: - return buf[:n], nil + return n, nil default: - bufs.Put(buf) - return nil, v.translateError(err) + return 0, 
v.translateError(err) } } diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go index ac9406178c..d111caeac8 100644 --- a/services/keepstore/trash_worker_test.go +++ b/services/keepstore/trash_worker_test.go @@ -290,26 +290,27 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) { expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress }) // Verify Locator1 to be un/deleted as expected - data, _ := GetBlock(testData.Locator1) + buf := make([]byte, BlockSize) + size, err := GetBlock(testData.Locator1, buf, nil) if testData.ExpectLocator1 { - if len(data) == 0 { + if size == 0 || err != nil { t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1) } } else { - if len(data) > 0 { + if size > 0 || err == nil { t.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1) } } // Verify Locator2 to be un/deleted as expected if testData.Locator1 != testData.Locator2 { - data, _ = GetBlock(testData.Locator2) + size, err = GetBlock(testData.Locator2, buf, nil) if testData.ExpectLocator2 { - if len(data) == 0 { + if size == 0 || err != nil { t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2) } } else { - if len(data) > 0 { + if size > 0 || err == nil { t.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2) } } @@ -321,7 +322,8 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) { if testData.DifferentMtimes { locatorFoundIn := 0 for _, volume := range KeepVM.AllReadable() { - if _, err := volume.Get(testData.Locator1); err == nil { + buf := make([]byte, BlockSize) + if _, err := volume.Get(testData.Locator1, buf); err == nil { locatorFoundIn = locatorFoundIn + 1 } } diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go index 17da54fdad..8ae6660fd4 100644 --- a/services/keepstore/volume.go +++ b/services/keepstore/volume.go @@ -10,17 +10,14 @@ import ( // for example, a single 
mounted disk, a RAID array, an Amazon S3 volume, // etc. type Volume interface { - // Get a block. IFF the returned error is nil, the caller must - // put the returned slice back into the buffer pool when it's - // finished with it. (Otherwise, the buffer pool will be - // depleted and eventually -- when all available buffers are - // used and not returned -- operations will reach deadlock.) + // Get a block: copy the block data into buf, and return the + // number of bytes copied. // // loc is guaranteed to consist of 32 or more lowercase hex // digits. // - // Get should not verify the integrity of the returned data: - // it should just return whatever was found in its backing + // Get should not verify the integrity of the data: it should + // just return whatever was found in its backing // store. (Integrity checking is the caller's responsibility.) // // If an error is encountered that prevents it from @@ -36,10 +33,12 @@ type Volume interface { // access log if the block is not found on any other volumes // either). // - // If the data in the backing store is bigger than BlockSize, - // Get is permitted to return an error without reading any of - // the data. - Get(loc string) ([]byte, error) + // If the data in the backing store is bigger than len(buf), + // then Get is permitted to return an error without reading + // any of the data. + // + // len(buf) will not exceed BlockSize. + Get(loc string, buf []byte) (int, error) // Compare the given data with the stored data (i.e., what Get // would return). If equal, return nil. 
If not, return diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go index 95166c252f..105795c146 100644 --- a/services/keepstore/volume_generic_test.go +++ b/services/keepstore/volume_generic_test.go @@ -89,14 +89,13 @@ func testGet(t TB, factory TestableVolumeFactory) { v.PutRaw(TestHash, TestBlock) - buf, err := v.Get(TestHash) + buf := make([]byte, BlockSize) + n, err := v.Get(TestHash, buf) if err != nil { t.Fatal(err) } - bufs.Put(buf) - - if bytes.Compare(buf, TestBlock) != 0 { + if bytes.Compare(buf[:n], TestBlock) != 0 { t.Errorf("expected %s, got %s", string(TestBlock), string(buf)) } } @@ -107,7 +106,8 @@ func testGetNoSuchBlock(t TB, factory TestableVolumeFactory) { v := factory(t) defer v.Teardown() - if _, err := v.Get(TestHash2); err == nil { + buf := make([]byte, BlockSize) + if _, err := v.Get(TestHash2, buf); err == nil { t.Errorf("Expected error while getting non-existing block %v", TestHash2) } } @@ -208,24 +208,22 @@ func testPutBlockWithDifferentContent(t TB, factory TestableVolumeFactory, testH v.PutRaw(testHash, testDataA) putErr := v.Put(testHash, testDataB) - buf, getErr := v.Get(testHash) + buf := make([]byte, BlockSize) + n, getErr := v.Get(testHash, buf) if putErr == nil { // Put must not return a nil error unless it has // overwritten the existing data. - if bytes.Compare(buf, testDataB) != 0 { - t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf, testDataB) + if bytes.Compare(buf[:n], testDataB) != 0 { + t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf[:n], testDataB) } } else { // It is permissible for Put to fail, but it must // leave us with either the original data, the new // data, or nothing at all. 
- if getErr == nil && bytes.Compare(buf, testDataA) != 0 && bytes.Compare(buf, testDataB) != 0 { - t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf, testDataA, testDataB) + if getErr == nil && bytes.Compare(buf[:n], testDataA) != 0 && bytes.Compare(buf[:n], testDataB) != 0 { + t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf[:n], testDataA, testDataB) } } - if getErr == nil { - bufs.Put(buf) - } } // Put and get multiple blocks @@ -253,34 +251,32 @@ func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) { t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err) } - data, err := v.Get(TestHash) + data := make([]byte, BlockSize) + n, err := v.Get(TestHash, data) if err != nil { t.Error(err) } else { - if bytes.Compare(data, TestBlock) != 0 { - t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock) + if bytes.Compare(data[:n], TestBlock) != 0 { + t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock) } - bufs.Put(data) } - data, err = v.Get(TestHash2) + n, err = v.Get(TestHash2, data) if err != nil { t.Error(err) } else { - if bytes.Compare(data, TestBlock2) != 0 { - t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock2) + if bytes.Compare(data[:n], TestBlock2) != 0 { + t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock2) } - bufs.Put(data) } - data, err = v.Get(TestHash3) + n, err = v.Get(TestHash3, data) if err != nil { t.Error(err) } else { - if bytes.Compare(data, TestBlock3) != 0 { - t.Errorf("Block present, but to %+q, expected %+q", data, TestBlock3) + if bytes.Compare(data[:n], TestBlock3) != 0 { + t.Errorf("Block present, but to %+q, expected %+q", data[:n], TestBlock3) } - bufs.Put(data) } } @@ -426,14 +422,12 @@ func testDeleteNewBlock(t TB, factory TestableVolumeFactory) { if err := v.Trash(TestHash); err != nil { t.Error(err) } - data, err := v.Get(TestHash) + data := make([]byte, BlockSize) + n, err := 
v.Get(TestHash, data) if err != nil { t.Error(err) - } else { - if bytes.Compare(data, TestBlock) != 0 { - t.Errorf("Got data %+q, expected %+q", data, TestBlock) - } - bufs.Put(data) + } else if bytes.Compare(data[:n], TestBlock) != 0 { + t.Errorf("Got data %+q, expected %+q", data[:n], TestBlock) } } @@ -455,7 +449,8 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) { if err := v.Trash(TestHash); err != nil { t.Error(err) } - if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) { + data := make([]byte, BlockSize) + if _, err := v.Get(TestHash, data); err == nil || !os.IsNotExist(err) { t.Errorf("os.IsNotExist(%v) should have been true", err) } } @@ -514,9 +509,10 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) { } v.PutRaw(TestHash, TestBlock) + buf := make([]byte, BlockSize) // Get from read-only volume should succeed - _, err := v.Get(TestHash) + _, err := v.Get(TestHash, buf) if err != nil { t.Errorf("got err %v, expected nil", err) } @@ -526,7 +522,7 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) { if err == nil { t.Errorf("Expected error when putting block in a read-only volume") } - _, err = v.Get(TestHash2) + _, err = v.Get(TestHash2, buf) if err == nil { t.Errorf("Expected error when getting block whose put in read-only volume failed") } @@ -561,45 +557,45 @@ func testGetConcurrent(t TB, factory TestableVolumeFactory) { v.PutRaw(TestHash3, TestBlock3) sem := make(chan int) - go func(sem chan int) { - buf, err := v.Get(TestHash) + go func() { + buf := make([]byte, BlockSize) + n, err := v.Get(TestHash, buf) if err != nil { t.Errorf("err1: %v", err) } - bufs.Put(buf) - if bytes.Compare(buf, TestBlock) != 0 { - t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf)) + if bytes.Compare(buf[:n], TestBlock) != 0 { + t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf[:n])) } sem <- 1 - }(sem) + }() - go func(sem chan int) { - buf, err := v.Get(TestHash2) + go func() { + buf := 
make([]byte, BlockSize) + n, err := v.Get(TestHash2, buf) if err != nil { t.Errorf("err2: %v", err) } - bufs.Put(buf) - if bytes.Compare(buf, TestBlock2) != 0 { - t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf)) + if bytes.Compare(buf[:n], TestBlock2) != 0 { + t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf[:n])) } sem <- 1 - }(sem) + }() - go func(sem chan int) { - buf, err := v.Get(TestHash3) + go func() { + buf := make([]byte, BlockSize) + n, err := v.Get(TestHash3, buf) if err != nil { t.Errorf("err3: %v", err) } - bufs.Put(buf) - if bytes.Compare(buf, TestBlock3) != 0 { - t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf)) + if bytes.Compare(buf[:n], TestBlock3) != 0 { + t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf[:n])) } sem <- 1 - }(sem) + }() // Wait for all goroutines to finish - for done := 0; done < 3; { - done += <-sem + for done := 0; done < 3; done++ { + <-sem } } @@ -639,36 +635,34 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) { }(sem) // Wait for all goroutines to finish - for done := 0; done < 3; { - done += <-sem + for done := 0; done < 3; done++ { + <-sem } // Double check that we actually wrote the blocks we expected to write. 
- buf, err := v.Get(TestHash) + buf := make([]byte, BlockSize) + n, err := v.Get(TestHash, buf) if err != nil { t.Errorf("Get #1: %v", err) } - bufs.Put(buf) - if bytes.Compare(buf, TestBlock) != 0 { - t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf)) + if bytes.Compare(buf[:n], TestBlock) != 0 { + t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf[:n])) } - buf, err = v.Get(TestHash2) + n, err = v.Get(TestHash2, buf) if err != nil { t.Errorf("Get #2: %v", err) } - bufs.Put(buf) - if bytes.Compare(buf, TestBlock2) != 0 { - t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf)) + if bytes.Compare(buf[:n], TestBlock2) != 0 { + t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf[:n])) } - buf, err = v.Get(TestHash3) + n, err = v.Get(TestHash3, buf) if err != nil { t.Errorf("Get #3: %v", err) } - bufs.Put(buf) - if bytes.Compare(buf, TestBlock3) != 0 { - t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf)) + if bytes.Compare(buf[:n], TestBlock3) != 0 { + t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf[:n])) } } @@ -689,14 +683,13 @@ func testPutFullBlock(t TB, factory TestableVolumeFactory) { if err != nil { t.Fatal(err) } - rdata, err := v.Get(hash) + buf := make([]byte, BlockSize) + n, err := v.Get(hash, buf) if err != nil { t.Error(err) - } else { - defer bufs.Put(rdata) } - if bytes.Compare(rdata, wdata) != 0 { - t.Error("rdata != wdata") + if bytes.Compare(buf[:n], wdata) != 0 { + t.Error("buf %+q != wdata %+q", buf[:n], wdata) } } @@ -717,14 +710,14 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) { v.PutRaw(TestHash, TestBlock) v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL)) - buf, err := v.Get(TestHash) + buf := make([]byte, BlockSize) + n, err := v.Get(TestHash, buf) if err != nil { t.Fatal(err) } - if bytes.Compare(buf, TestBlock) != 0 { - t.Errorf("Got data %+q, expected %+q", buf, TestBlock) + if 
bytes.Compare(buf[:n], TestBlock) != 0 { + t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock) } - bufs.Put(buf) // Trash err = v.Trash(TestHash) @@ -737,7 +730,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) { t.Error(err) } } else { - _, err = v.Get(TestHash) + _, err = v.Get(TestHash, buf) if err == nil || !os.IsNotExist(err) { t.Errorf("os.IsNotExist(%v) should have been true", err) } @@ -750,14 +743,13 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) { } // Get the block - after trash and untrash sequence - buf, err = v.Get(TestHash) + n, err = v.Get(TestHash, buf) if err != nil { t.Fatal(err) } - if bytes.Compare(buf, TestBlock) != 0 { - t.Errorf("Got data %+q, expected %+q", buf, TestBlock) + if bytes.Compare(buf[:n], TestBlock) != 0 { + t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock) } - bufs.Put(buf) } func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) { @@ -768,14 +760,14 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) { }(trashLifetime) checkGet := func() error { - buf, err := v.Get(TestHash) + buf := make([]byte, BlockSize) + n, err := v.Get(TestHash, buf) if err != nil { return err } - if bytes.Compare(buf, TestBlock) != 0 { - t.Fatalf("Got data %+q, expected %+q", buf, TestBlock) + if bytes.Compare(buf[:n], TestBlock) != 0 { + t.Fatalf("Got data %+q, expected %+q", buf[:n], TestBlock) } - bufs.Put(buf) return nil } diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go index e8a5a338f5..5671b8d4a9 100644 --- a/services/keepstore/volume_test.go +++ b/services/keepstore/volume_test.go @@ -113,17 +113,16 @@ func (v *MockVolume) Compare(loc string, buf []byte) error { } } -func (v *MockVolume) Get(loc string) ([]byte, error) { +func (v *MockVolume) Get(loc string, buf []byte) (int, error) { v.gotCall("Get") <-v.Gate if v.Bad { - return nil, errors.New("Bad volume") + return 0, errors.New("Bad volume") } else if block, ok := v.Store[loc]; ok 
{ - buf := bufs.Get(len(block)) - copy(buf, block) - return buf, nil + copy(buf[:len(block)], block) + return len(block), nil } - return nil, os.ErrNotExist + return 0, os.ErrNotExist } func (v *MockVolume) Put(loc string, block []byte) error { diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go index 996068cf3d..edec048dfe 100644 --- a/services/keepstore/volume_unix.go +++ b/services/keepstore/volume_unix.go @@ -181,26 +181,24 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) { return stat, err } -// Get retrieves a block identified by the locator string "loc", and -// returns its contents as a byte slice. -// -// Get returns a nil buffer IFF it returns a non-nil error. -func (v *UnixVolume) Get(loc string) ([]byte, error) { +// Get retrieves a block, copies it to the given slice, and returns +// the number of bytes copied. +func (v *UnixVolume) Get(loc string, buf []byte) (int, error) { path := v.blockPath(loc) stat, err := v.stat(path) if err != nil { - return nil, v.translateError(err) + return 0, v.translateError(err) + } + if stat.Size() > int64(len(buf)) { + return 0, TooLongError } - buf := bufs.Get(int(stat.Size())) + var read int + size := int(stat.Size()) err = v.getFunc(path, func(rdr io.Reader) error { - _, err = io.ReadFull(rdr, buf) + read, err = io.ReadFull(rdr, buf[:size]) return err }) - if err != nil { - bufs.Put(buf) - return nil, err - } - return buf, nil + return read, err } // Compare returns nil if Get(loc) would return the same content as diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go index 0775e89ed2..c95538bc4d 100644 --- a/services/keepstore/volume_unix_test.go +++ b/services/keepstore/volume_unix_test.go @@ -106,12 +106,13 @@ func TestGetNotFound(t *testing.T) { defer v.Teardown() v.Put(TestHash, TestBlock) - buf, err := v.Get(TestHash2) + buf := make([]byte, BlockSize) + n, err := v.Get(TestHash2, buf) switch { case os.IsNotExist(err): break case 
err == nil: - t.Errorf("Read should have failed, returned %s", string(buf)) + t.Errorf("Read should have failed, returned %+q", buf[:n]) default: t.Errorf("Read expected ErrNotExist, got: %s", err) } @@ -151,7 +152,8 @@ func TestUnixVolumeReadonly(t *testing.T) { v.PutRaw(TestHash, TestBlock) - _, err := v.Get(TestHash) + buf := make([]byte, BlockSize) + _, err := v.Get(TestHash, buf) if err != nil { t.Errorf("got err %v, expected nil", err) }