minitest (~> 5.1)
thread_safe (~> 0.1)
tzinfo (~> 1.1)
- addressable (2.3.6)
+ addressable (2.4.0)
andand (1.3.3)
angularjs-rails (1.3.8)
arel (5.0.1.20140414130214)
- arvados (0.1.20150511150219)
- activesupport (>= 3.2.13)
+ arvados (0.1.20160420143004)
+ activesupport (>= 3, < 4.2.6)
andand (~> 1.3, >= 1.3.3)
- google-api-client (~> 0.6.3, >= 0.6.3)
+ google-api-client (>= 0.7, < 0.9)
+ i18n (~> 0)
json (~> 1.7, >= 1.7.7)
- jwt (>= 0.1.5, < 1.0.0)
+ jwt (>= 0.1.5, < 2)
autoparse (0.3.3)
addressable (>= 2.3.1)
extlib (>= 0.9.15)
erubis (2.7.0)
execjs (2.2.2)
extlib (0.9.16)
- faraday (0.8.9)
- multipart-post (~> 1.2.0)
+ faraday (0.9.2)
+ multipart-post (>= 1.2, < 3)
fast_stack (0.1.0)
rake
rake-compiler
ffi (1.9.10)
flamegraph (0.1.0)
fast_stack
- google-api-client (0.6.4)
- addressable (>= 2.3.2)
- autoparse (>= 0.3.3)
- extlib (>= 0.9.15)
- faraday (~> 0.8.4)
- jwt (>= 0.1.5)
- launchy (>= 2.1.1)
- multi_json (>= 1.0.0)
- signet (~> 0.4.5)
- uuidtools (>= 2.1.0)
+ google-api-client (0.8.6)
+ activesupport (>= 3.2)
+ addressable (~> 2.3)
+ autoparse (~> 0.3)
+ extlib (~> 0.9)
+ faraday (~> 0.9)
+ googleauth (~> 0.3)
+ launchy (~> 2.4)
+ multi_json (~> 1.10)
+ retriable (~> 1.4)
+ signet (~> 0.6)
+ googleauth (0.5.1)
+ faraday (~> 0.9)
+ jwt (~> 1.4)
+ logging (~> 2.0)
+ memoist (~> 0.12)
+ multi_json (~> 1.11)
+ os (~> 0.9)
+ signet (~> 0.7)
headless (1.0.2)
highline (1.6.21)
httpclient (2.6.0.1)
railties (>= 3.0, < 5.0)
thor (>= 0.14, < 2.0)
json (1.8.3)
- jwt (0.1.13)
- multi_json (>= 1.5)
+ jwt (1.5.4)
launchy (2.4.3)
addressable (~> 2.3)
less (2.6.0)
actionpack (>= 3.1)
less (~> 2.6.0)
libv8 (3.16.14.7)
+ little-plugger (1.1.4)
+ logging (2.1.0)
+ little-plugger (~> 1.1)
+ multi_json (~> 1.10)
mail (2.6.3)
mime-types (>= 1.16, < 3)
+ memoist (0.14.0)
metaclass (0.0.4)
mime-types (2.99)
mini_portile (0.6.2)
- minitest (5.7.0)
+ minitest (5.8.4)
mocha (1.1.0)
metaclass (~> 0.0.1)
morrisjs-rails (0.5.1)
railties (> 3.1, < 5)
- multi_json (1.11.2)
- multipart-post (1.2.0)
+ multi_json (1.12.0)
+ multipart-post (2.0.0)
net-scp (1.2.1)
net-ssh (>= 2.6.5)
net-sftp (2.1.2)
nokogiri (1.6.6.4)
mini_portile (~> 0.6.0)
oj (2.11.2)
+ os (0.9.6)
passenger (4.0.57)
daemon_controller (>= 1.2.0)
rack
rake
raphael-rails (2.1.2)
ref (1.0.5)
+ retriable (1.4.1)
ruby-debug-passenger (0.2.0)
ruby-prof (0.15.2)
rubyzip (1.1.7)
multi_json (~> 1.0)
rubyzip (~> 1.0)
websocket (~> 1.0)
- signet (0.4.5)
- addressable (>= 2.2.3)
- faraday (~> 0.8.1)
- jwt (>= 0.1.5)
- multi_json (>= 1.0.0)
+ signet (0.7.2)
+ addressable (~> 2.3)
+ faraday (~> 0.9)
+ jwt (~> 1.5)
+ multi_json (~> 1.10)
simplecov (0.9.1)
docile (~> 1.1.0)
multi_json (~> 1.0)
uglifier (2.7.0)
execjs (>= 0.3.0)
json (>= 1.8.0)
- uuidtools (2.1.5)
websocket (1.2.2)
websocket-driver (0.5.1)
websocket-extensions (>= 0.1.0)
therubyracer
uglifier (>= 1.0.3)
wiselinks
+
+BUNDLED WITH
+ 1.12.1
# before trying "go test". Otherwise, coverage-reporting
# mode makes Go show the wrong line numbers when reporting
# compilation errors.
+ go get -t "git.curoverse.com/arvados.git/$1" || return 1
if [[ -n "${testargs[$1]}" ]]
then
# "go test -check.vv giturl" doesn't work, but this
# does:
- cd "$WORKSPACE/$1" && \
- go get -t "git.curoverse.com/arvados.git/$1" && \
- go test ${short:+-short} ${coverflags[@]} ${testargs[$1]}
+ cd "$WORKSPACE/$1" && go test ${short:+-short} ${coverflags[@]} ${testargs[$1]}
else
# The above form gets verbose even when testargs is
# empty, so use this form in such cases:
- go get -t "git.curoverse.com/arvados.git/$1" && \
- go test ${short:+-short} ${coverflags[@]} "git.curoverse.com/arvados.git/$1"
+ go test ${short:+-short} ${coverflags[@]} "git.curoverse.com/arvados.git/$1"
fi
result="$?"
- go tool cover -html="$WORKSPACE/tmp/.$covername.tmp" -o "$WORKSPACE/tmp/$covername.html"
- rm "$WORKSPACE/tmp/.$covername.tmp"
+ if [[ -f "$WORKSPACE/tmp/.$covername.tmp" ]]
+ then
+ go tool cover -html="$WORKSPACE/tmp/.$covername.tmp" -o "$WORKSPACE/tmp/$covername.html"
+ rm "$WORKSPACE/tmp/.$covername.tmp"
+ fi
elif [[ "$2" == "pip" ]]
then
# $3 can name a path directory for us to use, including trailing
test_apiserver() {
rm -f "$WORKSPACE/services/api/git-commit.version"
cd "$WORKSPACE/services/api" \
- && RAILS_ENV=test bundle exec rake test TESTOPTS=-v ${testargs[services/api]}
+ && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test TESTOPTS=-v ${testargs[services/api]}
}
do_test services/api apiserver
test_workbench() {
start_nginx_proxy_services \
&& cd "$WORKSPACE/apps/workbench" \
- && RAILS_ENV=test bundle exec rake test TESTOPTS=-v ${testargs[apps/workbench]}
+ && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test TESTOPTS=-v ${testargs[apps/workbench]}
}
do_test apps/workbench workbench
test_workbench_benchmark() {
start_nginx_proxy_services \
&& cd "$WORKSPACE/apps/workbench" \
- && RAILS_ENV=test bundle exec rake test:benchmark ${testargs[apps/workbench_benchmark]}
+ && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:benchmark ${testargs[apps/workbench_benchmark]}
}
do_test apps/workbench_benchmark workbench_benchmark
test_workbench_profile() {
start_nginx_proxy_services \
&& cd "$WORKSPACE/apps/workbench" \
- && RAILS_ENV=test bundle exec rake test:profile ${testargs[apps/workbench_profile]}
+ && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:profile ${testargs[apps/workbench_profile]}
}
do_test apps/workbench_profile workbench_profile
s.executables << "arv-tag"
s.required_ruby_version = '>= 2.1.0'
s.add_runtime_dependency 'arvados', '~> 0.1', '>= 0.1.20150128223554'
- s.add_runtime_dependency 'google-api-client', '~> 0.6.3', '>= 0.6.3'
+ s.add_runtime_dependency 'google-api-client', '~> 0.6', '>= 0.6.3', '<0.9'
s.add_runtime_dependency 'activesupport', '~> 3.2', '>= 3.2.13'
s.add_runtime_dependency 'json', '~> 1.7', '>= 1.7.7'
s.add_runtime_dependency 'trollop', '~> 2.0'
s.add_runtime_dependency 'andand', '~> 1.3', '>= 1.3.3'
s.add_runtime_dependency 'oj', '~> 2.0', '>= 2.0.3'
s.add_runtime_dependency 'curb', '~> 0.8'
- s.add_runtime_dependency('jwt', '>= 0.1.5', '< 1.0.0')
s.homepage =
'https://arvados.org'
end
if args.progress: # Print newline to split stderr from stdout for humans.
print >>stderr
+ output = None
if args.stream:
output = writer.manifest_text()
if args.normalize:
status = 1
# Print the locator (uuid) of the new collection.
- stdout.write(output)
- if not output.endswith('\n'):
- stdout.write('\n')
+ if output is None:
+ status = status or 1
+ else:
+ stdout.write(output)
+ if not output.endswith('\n'):
+ stdout.write('\n')
for sigcode, orig_handler in orig_signal_handlers.items():
signal.signal(sigcode, orig_handler)
def test_empty_list(self):
answer = arvados.api('v1').humans().list(
- filters=[['uuid', 'is', None]]).execute()
+ filters=[['uuid', '=', None]]).execute()
self.assertEqual(answer['items_available'], len(answer['items']))
def test_nonempty_list(self):
import arvados
import arvados.commands.put as arv_put
-from arvados_testutil import ArvadosBaseTestCase
+from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
import run_test_server
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
self.call_main_with_args,
['--project-uuid', self.Z_UUID, '--stream'])
+ def test_api_error_handling(self):
+ collections_mock = mock.Mock(name='arv.collections()')
+ coll_create_mock = collections_mock().create().execute
+ coll_create_mock.side_effect = arvados.errors.ApiError(
+ fake_httplib2_response(403), '{}')
+ arv_put.api_client = arvados.api('v1')
+ arv_put.api_client.collections = collections_mock
+ with self.assertRaises(SystemExit) as exc_test:
+ self.call_main_with_args(['/dev/null'])
+ self.assertLess(0, exc_test.exception.args[0])
+ self.assertLess(0, coll_create_mock.call_count)
+ self.assertEqual("", self.main_stdout.getvalue())
+
+
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
ArvadosBaseTestCase):
def _getKeepServerConfig():
gem 'test_after_commit', :group => :test
-gem 'google-api-client', '~> 0.6.3'
gem 'trollop'
gem 'faye-websocket'
activemodel (>= 3.0.0)
activesupport (>= 3.0.0)
rack (>= 1.1.0)
- addressable (2.3.8)
+ addressable (2.4.0)
andand (1.3.3)
arel (3.0.3)
- arvados (0.1.20150615153458)
- activesupport (>= 3.2.13)
+ arvados (0.1.20160420143004)
+ activesupport (>= 3, < 4.2.6)
andand (~> 1.3, >= 1.3.3)
- google-api-client (~> 0.6.3, >= 0.6.3)
+ google-api-client (>= 0.7, < 0.9)
+ i18n (~> 0)
json (~> 1.7, >= 1.7.7)
- jwt (>= 0.1.5, < 1.0.0)
- arvados-cli (0.1.20151207150126)
+ jwt (>= 0.1.5, < 2)
+ arvados-cli (0.1.20160503204200)
activesupport (~> 3.2, >= 3.2.13)
andand (~> 1.3, >= 1.3.3)
arvados (~> 0.1, >= 0.1.20150128223554)
curb (~> 0.8)
- google-api-client (~> 0.6.3, >= 0.6.3)
+ google-api-client (~> 0.6, >= 0.6.3, < 0.9)
json (~> 1.7, >= 1.7.7)
- jwt (>= 0.1.5, < 1.0.0)
oj (~> 2.0, >= 2.0.3)
trollop (~> 2.0)
autoparse (0.3.3)
coffee-script-source
execjs
coffee-script-source (1.7.0)
- curb (0.8.8)
+ curb (0.9.3)
daemon_controller (1.2.0)
database_cleaner (1.2.0)
erubis (2.7.0)
factory_girl_rails (4.4.1)
factory_girl (~> 4.4.0)
railties (>= 3.0.0)
- faraday (0.8.9)
- multipart-post (~> 1.2.0)
+ faraday (0.9.2)
+ multipart-post (>= 1.2, < 3)
faye-websocket (0.7.2)
eventmachine (>= 0.12.0)
websocket-driver (>= 0.3.1)
- google-api-client (0.6.4)
+ google-api-client (0.7.1)
addressable (>= 2.3.2)
autoparse (>= 0.3.3)
extlib (>= 0.9.15)
- faraday (~> 0.8.4)
+ faraday (>= 0.9.0)
jwt (>= 0.1.5)
launchy (>= 2.1.1)
multi_json (>= 1.0.0)
- signet (~> 0.4.5)
+ retriable (>= 1.4)
+ signet (>= 0.5.0)
uuidtools (>= 2.1.0)
hashie (1.2.0)
highline (1.6.21)
mime-types (1.25.1)
mocha (1.1.0)
metaclass (~> 0.0.1)
- multi_json (1.11.1)
- multipart-post (1.2.0)
+ multi_json (1.12.0)
+ multipart-post (2.0.0)
net-scp (1.2.0)
net-ssh (>= 2.6.5)
net-sftp (2.1.2)
jwt (~> 0.1.4)
multi_json (~> 1.0)
rack (~> 1.2)
- oj (2.11.4)
+ oj (2.15.0)
omniauth (1.1.1)
hashie (~> 1.2)
rack
rdoc (3.12.2)
json (~> 1.4)
ref (1.0.5)
+ retriable (2.1.0)
ruby-prof (0.15.2)
rvm-capistrano (1.5.1)
capistrano (~> 2.15.4)
railties (~> 3.2.0)
sass (>= 3.1.10)
tilt (~> 1.3)
- signet (0.4.5)
+ signet (0.5.1)
addressable (>= 2.2.3)
- faraday (~> 0.8.1)
+ faraday (>= 0.9.0.rc5)
jwt (>= 0.1.5)
multi_json (>= 1.0.0)
simplecov (0.7.1)
treetop (1.4.15)
polyglot
polyglot (>= 0.3.1)
- trollop (2.1.1)
+ trollop (2.1.2)
tzinfo (0.3.39)
uglifier (2.5.0)
execjs (>= 0.3.0)
database_cleaner
factory_girl_rails
faye-websocket
- google-api-client (~> 0.6.3)
jquery-rails
mocha
multi_json
uglifier (>= 1.0.3)
BUNDLED WITH
- 1.10.6
+ 1.12.1
return @attrs if @attrs
@attrs = params[resource_name]
if @attrs.is_a? String
- @attrs = Oj.load @attrs, symbol_keys: true
+ @attrs = Oj.strict_load @attrs, symbol_keys: true
end
unless @attrs.is_a? Hash
message = "No #{resource_name}"
def load_json_value(hash, key, must_be_class=nil)
if hash[key].is_a? String
- hash[key] = Oj.load(hash[key], symbol_keys: false)
+ hash[key] = Oj.strict_load(hash[key], symbol_keys: false)
if must_be_class and !hash[key].is_a? must_be_class
raise TypeError.new("parameter #{key.to_s} must be a #{must_be_class.to_s}")
end
new(user_id: system_user.id,
api_client_id: params[:api_client_id] || current_api_client.andand.id,
created_by_ip_address: remote_ip,
- scopes: Oj.load(params[:scopes] || '["all"]'))
+ scopes: Oj.strict_load(params[:scopes] || '["all"]'))
@object.save!
show
end
begin
begin
# Parse event data as JSON
- p = (Oj.load event.data).symbolize_keys
+ p = (Oj.strict_load event.data).symbolize_keys
filter = Filter.new(p)
rescue Oj::Error => e
ws.send ({status: 400, message: "malformed request"}.to_json)
@where = params[:where]
elsif params[:where].is_a? String
begin
- @where = Oj.load(params[:where])
+ @where = Oj.strict_load(params[:where])
raise unless @where.is_a? Hash
rescue
raise ArgumentError.new("Could not parse \"where\" param as an object")
@filters += params[:filters]
elsif params[:filters].is_a? String and !params[:filters].empty?
begin
- f = Oj.load params[:filters]
+ f = Oj.strict_load params[:filters]
if not f.nil?
raise unless f.is_a? Array
@filters += f
(case params[:order]
when String
if params[:order].starts_with? '['
- od = Oj.load(params[:order])
+ od = Oj.strict_load(params[:order])
raise unless od.is_a? Array
od
else
@select = params[:select]
when String
begin
- @select = Oj.load params[:select]
+ @select = Oj.strict_load params[:select]
raise unless @select.is_a? Array or @select.nil?
rescue
raise ArgumentError.new("Could not parse \"select\" param as an array")
end
end
cond_out << cond.join(' OR ')
+ else
+ raise ArgumentError.new("Invalid operator '#{operator}'")
end
end
conds_out << cond_out.join(' OR ') if cond_out.any?
include ManifestExamples
test "crud cycle for a collection with a big manifest" do
+ slow_test
bigmanifest = time_block 'make example' do
make_manifest(streams: 100,
files_per_stream: 100,
api_token: api_token(:active))
end
json = time_block "JSON encode #{bigmanifest.length>>20}MiB manifest" do
- Oj.dump({manifest_text: bigmanifest})
+ Oj.dump({"manifest_text" => bigmanifest})
end
time_block 'create' do
post '/arvados/v1/collections', {collection: json}, auth(:active)
end
test "memory usage" do
- hugemanifest = make_manifest(streams: 1,
- files_per_stream: 2000,
- blocks_per_file: 200,
- bytes_per_block: 2**26,
- api_token: api_token(:active))
+ slow_test
+ hugemanifest = make_manifest(streams: 1,
+ files_per_stream: 2000,
+ blocks_per_file: 200,
+ bytes_per_block: 2**26,
+ api_token: api_token(:active))
json = time_block "JSON encode #{hugemanifest.length>>20}MiB manifest" do
Oj.dump({manifest_text: hugemanifest})
end
- vmpeak "post" do
- post '/arvados/v1/collections', {collection: json}, auth(:active)
- end
+ vmpeak "post" do
+ post '/arvados/v1/collections', {collection: json}, auth(:active)
+ end
end
end
self.use_transactional_fixtures = false
test "reset fails when Rails.env != 'test'" do
+ slow_test
rails_env_was = Rails.env
begin
Rails.env = 'production'
end
test "database reset doesn't break basic CRUD operations" do
+ slow_test
active_auth = auth(:active)
admin_auth = auth(:admin)
end
test "roll back database change" do
+ slow_test
active_auth = auth(:active)
admin_auth = auth(:admin)
ws_helper do |ws|
ws.on :message do |event|
- d = Oj.load event.data
+ d = Oj.strict_load event.data
status = d["status"]
ws.close
end
end
ws.on :message do |event|
- d = Oj.load event.data
+ d = Oj.strict_load event.data
status = d["status"]
ws.close
end
end
ws.on :message do |event|
- d = Oj.load event.data
+ d = Oj.strict_load event.data
case state
when 1
assert_equal 200, d["status"]
end
ws.on :message do |event|
- d = Oj.load event.data
+ d = Oj.strict_load event.data
case state
when 1
assert_equal 200, d["status"]
end
ws.on :message do |event|
- d = Oj.load event.data
+ d = Oj.strict_load event.data
case state
when 1
assert_equal 200, d["status"]
end
ws.on :message do |event|
- d = Oj.load event.data
+ d = Oj.strict_load event.data
case state
when 1
assert_equal 200, d["status"]
end
ws.on :message do |event|
- d = Oj.load event.data
+ d = Oj.strict_load event.data
case state
when 1
assert_equal 200, d["status"]
end
ws.on :message do |event|
- d = Oj.load event.data
+ d = Oj.strict_load event.data
case state
when 1
assert_equal 200, d["status"]
end
test "connect, subscribe, get event, unsubscribe" do
+ slow_test
state = 1
spec = nil
spec_ev_uuid = nil
end
ws.on :message do |event|
- d = Oj.load event.data
+ d = Oj.strict_load event.data
case state
when 1
assert_equal 200, d["status"]
end
test "connect, subscribe, get event, unsubscribe with filter" do
+ slow_test
state = 1
spec = nil
spec_ev_uuid = nil
end
ws.on :message do |event|
- d = Oj.load event.data
+ d = Oj.strict_load event.data
case state
when 1
assert_equal 200, d["status"]
test "connect, subscribe, get event, try to unsubscribe with bogus filter" do
+ slow_test
state = 1
spec = nil
spec_ev_uuid = nil
end
ws.on :message do |event|
- d = Oj.load event.data
+ d = Oj.strict_load event.data
case state
when 1
assert_equal 200, d["status"]
test "connected, not subscribed, no event" do
+ slow_test
authorize_with :admin
ws_helper :admin, false do |ws|
end
test "connected, not authorized to see event" do
+ slow_test
state = 1
authorize_with :admin
end
ws.on :message do |event|
- d = Oj.load event.data
+ d = Oj.strict_load event.data
case state
when 1
assert_equal 200, d["status"]
end
ws.on :message do |event|
- d = Oj.load event.data
+ d = Oj.strict_load event.data
status = d["status"]
ws.close
end
end
ws.on :message do |event|
- d = Oj.load event.data
+ d = Oj.strict_load event.data
status = d["status"]
ws.close
end
end
ws.on :message do |event|
- d = Oj.load event.data
+ d = Oj.strict_load event.data
status = d["status"]
ws.close
end
end
ws.on :message do |event|
- d = Oj.load event.data
+ d = Oj.strict_load event.data
case state
when (1..EventBus::MAX_FILTERS)
assert_equal 200, d["status"]
end
test "connect, subscribe, lots of events" do
+ slow_test
state = 1
event_count = 0
log_start = Log.order(:id).last.id
end
ws.on :message do |event|
- d = Oj.load event.data
+ d = Oj.strict_load event.data
case state
when 1
assert_equal 200, d["status"]
end
ws.on :message do |event|
- d = Oj.load event.data
+ d = Oj.strict_load event.data
case state
when 1
assert_equal 200, d["status"]
module ArvadosTestSupport
def json_response
- Oj.load response.body
+ Oj.strict_load response.body
end
def api_token(api_client_auth_name)
ArvadosApiToken.new.call("rack.input" => "",
"HTTP_AUTHORIZATION" => "OAuth2 #{t}")
end
+
+ def slow_test
+ skip "RAILS_TEST_SHORT is set" unless (ENV['RAILS_TEST_SHORT'] || '').empty?
+ end
end
class ActionController::TestCase
# "crrud" == "create read render update delete", not a typo
test "crrud cycle for a collection with a big manifest)" do
+ slow_test
bigmanifest = time_block 'make example' do
make_manifest(streams: 100,
files_per_stream: 100,
// If the block is younger than azureWriteRaceInterval and is
// unexpectedly empty, assume a PutBlob operation is in progress, and
// wait for it to finish writing.
-func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
+func (v *AzureBlobVolume) Get(loc string, buf []byte) (int, error) {
var deadline time.Time
haveDeadline := false
- buf, err := v.get(loc)
- for err == nil && len(buf) == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
+ size, err := v.get(loc, buf)
+ for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
// Seeing a brand new empty block probably means we're
// in a race with CreateBlob, which under the hood
// (apparently) does "CreateEmpty" and "CommitData"
} else if time.Now().After(deadline) {
break
}
- bufs.Put(buf)
time.Sleep(azureWriteRacePollTime)
- buf, err = v.get(loc)
+ size, err = v.get(loc, buf)
}
if haveDeadline {
- log.Printf("Race ended with len(buf)==%d", len(buf))
+ log.Printf("Race ended with size==%d", size)
}
- return buf, err
+ return size, err
}
-func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
- expectSize := BlockSize
+func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
+ expectSize := len(buf)
if azureMaxGetBytes < BlockSize {
// Unfortunately the handler doesn't tell us how long the blob
// is expected to be, so we have to ask Azure.
props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
if err != nil {
- return nil, v.translateError(err)
+ return 0, v.translateError(err)
}
if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
- return nil, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
+ return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
}
expectSize = int(props.ContentLength)
}
- buf := bufs.Get(expectSize)
if expectSize == 0 {
- return buf, nil
+ return 0, nil
}
// We'll update this actualSize if/when we get the last piece.
if startPos == 0 && endPos == expectSize {
rdr, err = v.bsClient.GetBlob(v.containerName, loc)
} else {
- rdr, err = v.bsClient.GetBlobRange(v.containerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1))
+ rdr, err = v.bsClient.GetBlobRange(v.containerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
}
if err != nil {
errors[p] = err
wg.Wait()
for _, err := range errors {
if err != nil {
- bufs.Put(buf)
- return nil, v.translateError(err)
+ return 0, v.translateError(err)
}
}
- return buf[:actualSize], nil
+ return actualSize, nil
}
// Compare the given data with existing stored data.
if err != nil {
t.Error(err)
}
- gotData, err := v.Get(hash)
+ gotData := make([]byte, len(data))
+ gotLen, err := v.Get(hash, gotData)
if err != nil {
t.Error(err)
}
gotHash := fmt.Sprintf("%x", md5.Sum(gotData))
- gotLen := len(gotData)
- bufs.Put(gotData)
if gotLen != size {
t.Error("length mismatch: got %d != %d", gotLen, size)
}
// Wait for the stub's Put to create the empty blob
v.azHandler.race <- continuePut
go func() {
- buf, err := v.Get(TestHash)
+ buf := make([]byte, len(TestBlock))
+ _, err := v.Get(TestHash, buf)
if err != nil {
t.Error(err)
- } else {
- bufs.Put(buf)
}
close(allDone)
}()
allDone := make(chan struct{})
go func() {
defer close(allDone)
- buf, err := v.Get(TestHash)
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(TestHash, buf)
if err != nil {
t.Error(err)
return
}
- if len(buf) != 0 {
- t.Errorf("Got %+q, expected empty buf", buf)
+ if n != 0 {
+ t.Errorf("Got %+q, expected empty buf", buf[:n])
}
- bufs.Put(buf)
}()
select {
case <-allDone:
expectedDc, responseDc)
}
// Confirm the block has been deleted
- _, err := vols[0].Get(TestHash)
+ buf := make([]byte, BlockSize)
+ _, err := vols[0].Get(TestHash, buf)
var blockDeleted = os.IsNotExist(err)
if !blockDeleted {
t.Error("superuserExistingBlockReq: block not deleted")
expectedDc, responseDc)
}
// Confirm the block has NOT been deleted.
- _, err = vols[0].Get(TestHash)
+ _, err = vols[0].Get(TestHash, buf)
if err != nil {
t.Errorf("testing delete on new block: %s\n", err)
}
}
}
+type notifyingResponseRecorder struct {
+ *httptest.ResponseRecorder
+ closer chan bool
+}
+
+func (r *notifyingResponseRecorder) CloseNotify() <-chan bool {
+ return r.closer
+}
+
+func TestGetHandlerClientDisconnect(t *testing.T) {
+ defer func(was bool) {
+ enforcePermissions = was
+ }(enforcePermissions)
+ enforcePermissions = false
+
+ defer func(orig *bufferPool) {
+ bufs = orig
+ }(bufs)
+ bufs = newBufferPool(1, BlockSize)
+ defer bufs.Put(bufs.Get(BlockSize))
+
+ KeepVM = MakeTestVolumeManager(2)
+ defer KeepVM.Close()
+
+ if err := KeepVM.AllWritable()[0].Put(TestHash, TestBlock); err != nil {
+ t.Error(err)
+ }
+
+	resp := &notifyingResponseRecorder{
+ ResponseRecorder: httptest.NewRecorder(),
+ closer: make(chan bool, 1),
+ }
+ if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok {
+ t.Fatal("notifyingResponseRecorder is broken")
+ }
+ // If anyone asks, the client has disconnected.
+ resp.closer <- true
+
+ ok := make(chan struct{})
+ go func() {
+ req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
+ (&LoggingRESTRouter{MakeRESTRouter()}).ServeHTTP(resp, req)
+ ok <- struct{}{}
+ }()
+
+ select {
+ case <-time.After(20 * time.Second):
+ t.Fatal("request took >20s, close notifier must be broken")
+ case <-ok:
+ }
+
+ ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder)
+ for i, v := range KeepVM.AllWritable() {
+ if calls := v.(*MockVolume).called["GET"]; calls != 0 {
+ t.Errorf("volume %d got %d calls, expected 0", i, calls)
+ }
+ }
+}
+
// Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
// leak.
func TestGetHandlerNoBufferleak(t *testing.T) {
}
}
- block, err := GetBlock(mux.Vars(req)["hash"])
+ // TODO: Probe volumes to check whether the block _might_
+ // exist. Some volumes/types could support a quick existence
+ // check without causing other operations to suffer. If all
+ // volumes support that, and assure us the block definitely
+ // isn't here, we can return 404 now instead of waiting for a
+ // buffer.
+
+ buf, err := getBufferForResponseWriter(resp, bufs, BlockSize)
if err != nil {
- // This type assertion is safe because the only errors
- // GetBlock can return are DiskHashError or NotFoundError.
- http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
+ http.Error(resp, err.Error(), http.StatusServiceUnavailable)
return
}
- defer bufs.Put(block)
+ defer bufs.Put(buf)
- resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
+ size, err := GetBlock(mux.Vars(req)["hash"], buf, resp)
+ if err != nil {
+ code := http.StatusInternalServerError
+ if err, ok := err.(*KeepError); ok {
+ code = err.HTTPCode
+ }
+ http.Error(resp, err.Error(), code)
+ return
+ }
+
+ resp.Header().Set("Content-Length", strconv.Itoa(size))
resp.Header().Set("Content-Type", "application/octet-stream")
- resp.Write(block)
+ resp.Write(buf[:size])
+}
+
+// Get a buffer from the pool -- but give up and return a non-nil
+// error if resp implements http.CloseNotifier and tells us that the
+// client has disconnected before we get a buffer.
+func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufSize int) ([]byte, error) {
+ var closeNotifier <-chan bool
+ if resp, ok := resp.(http.CloseNotifier); ok {
+ closeNotifier = resp.CloseNotify()
+ }
+ var buf []byte
+ bufReady := make(chan []byte)
+ go func() {
+ bufReady <- bufs.Get(bufSize)
+ close(bufReady)
+ }()
+ select {
+ case buf = <-bufReady:
+ return buf, nil
+ case <-closeNotifier:
+ go func() {
+ // Even if closeNotifier happened first, we
+ // need to keep waiting for our buf so we can
+ // return it to the pool.
+ bufs.Put(<-bufReady)
+ }()
+ return nil, ErrClientDisconnect
+ }
}
// PutBlockHandler is a HandleFunc to address Put block requests.
return
}
- buf := bufs.Get(int(req.ContentLength))
- _, err := io.ReadFull(req.Body, buf)
+ buf, err := getBufferForResponseWriter(resp, bufs, int(req.ContentLength))
+ if err != nil {
+ http.Error(resp, err.Error(), http.StatusServiceUnavailable)
+ return
+ }
+
+ _, err = io.ReadFull(req.Body, buf)
if err != nil {
http.Error(resp, err.Error(), 500)
bufs.Put(buf)
}
}
-// ==============================
// GetBlock and PutBlock implement lower-level code for handling
// blocks by rooting through volumes connected to the local machine.
// Once the handler has determined that system policy permits the
// should be the only part of the code that cares about which volume a
// block is stored on, so it should be responsible for figuring out
// which volume to check for fetching blocks, storing blocks, etc.
-// ==============================
-// GetBlock fetches and returns the block identified by "hash".
-//
-// On success, GetBlock returns a byte slice with the block data, and
-// a nil error.
+// GetBlock fetches the block identified by "hash" into the provided
+// buf, and returns the data size.
//
// If the block cannot be found on any volume, returns NotFoundError.
//
// If the block found does not have the correct MD5 hash, returns
// DiskHashError.
//
-func GetBlock(hash string) ([]byte, error) {
+func GetBlock(hash string, buf []byte, resp http.ResponseWriter) (int, error) {
// Attempt to read the requested hash from a keep volume.
errorToCaller := NotFoundError
for _, vol := range KeepVM.AllReadable() {
- buf, err := vol.Get(hash)
+ size, err := vol.Get(hash, buf)
if err != nil {
// IsNotExist is an expected error and may be
// ignored. All other errors are logged. In
}
// Check the file checksum.
//
- filehash := fmt.Sprintf("%x", md5.Sum(buf))
+ filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
if filehash != hash {
// TODO: Try harder to tell a sysadmin about
// this.
log.Printf("%s: checksum mismatch for request %s (actual %s)",
vol, hash, filehash)
errorToCaller = DiskHashError
- bufs.Put(buf)
continue
}
if errorToCaller == DiskHashError {
log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
vol, hash)
}
- return buf, nil
+ return size, nil
}
- return nil, errorToCaller
+ return 0, errorToCaller
}
// PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
testableVolumes[1].PutRaw(testHash, testBlock)
// Get should pass
- buf, err := GetBlock(testHash)
+ buf := make([]byte, len(testBlock))
+ n, err := GetBlock(testHash, buf, nil)
if err != nil {
t.Fatalf("Error while getting block %s", err)
}
- if bytes.Compare(buf, testBlock) != 0 {
- t.Errorf("Put succeeded but Get returned %+v, expected %+v", buf, testBlock)
+ if bytes.Compare(buf[:n], testBlock) != 0 {
+ t.Errorf("Put succeeded but Get returned %+v, expected %+v", buf[:n], testBlock)
}
}
testableVolumes[1].PutRaw(testHash, badData)
// Get should fail
- _, err := GetBlock(testHash)
+ buf := make([]byte, BlockSize)
+ size, err := GetBlock(testHash, buf, nil)
if err == nil {
- t.Fatalf("Expected error while getting corrupt block %v", testHash)
+ t.Fatalf("Got %+q, expected error while getting corrupt block %v", buf[:size], testHash)
}
}
}
// Check that PutBlock stored the data as expected
- buf, err := GetBlock(testHash)
+ buf := make([]byte, BlockSize)
+ size, err := GetBlock(testHash, buf, nil)
if err != nil {
t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
- } else if bytes.Compare(buf, testBlock) != 0 {
- t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf)
+ } else if bytes.Compare(buf[:size], testBlock) != 0 {
+ t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf[:size])
}
}
// Put succeeded and overwrote the badData in one volume,
// and Get should return the testBlock now, ignoring the bad data.
- buf, err := GetBlock(testHash)
+ buf := make([]byte, BlockSize)
+ size, err := GetBlock(testHash, buf, nil)
if err != nil {
t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
- } else if bytes.Compare(buf, testBlock) != 0 {
- t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf)
+ } else if bytes.Compare(buf[:size], testBlock) != 0 {
+ t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf[:size])
}
}
TooLongError = &KeepError{413, "Block is too large"}
MethodDisabledError = &KeepError{405, "Method disabled"}
ErrNotImplemented = &KeepError{500, "Unsupported configuration"}
+ ErrClientDisconnect = &KeepError{503, "Client disconnected"}
)
func (e *KeepError) Error() string {
}
// Check that GetBlock returns success.
- result, err := GetBlock(TestHash)
+ buf := make([]byte, BlockSize)
+ size, err := GetBlock(TestHash, buf, nil)
if err != nil {
t.Errorf("GetBlock error: %s", err)
}
- if fmt.Sprint(result) != fmt.Sprint(TestBlock) {
- t.Errorf("expected %s, got %s", TestBlock, result)
+ if bytes.Compare(buf[:size], TestBlock) != 0 {
+ t.Errorf("got %v, expected %v", buf[:size], TestBlock)
}
}
defer KeepVM.Close()
// Check that GetBlock returns failure.
- result, err := GetBlock(TestHash)
+ buf := make([]byte, BlockSize)
+ size, err := GetBlock(TestHash, buf, nil)
if err != NotFoundError {
- t.Errorf("Expected NotFoundError, got %v", result)
+ t.Errorf("Expected NotFoundError, got %v, err %v", buf[:size], err)
}
}
vols[0].Put(TestHash, BadBlock)
// Check that GetBlock returns failure.
- result, err := GetBlock(TestHash)
+ buf := make([]byte, BlockSize)
+ size, err := GetBlock(TestHash, buf, nil)
if err != DiskHashError {
- t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, result)
+ t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, buf[:size])
}
}
}
vols := KeepVM.AllReadable()
- result, err := vols[1].Get(TestHash)
+ buf := make([]byte, BlockSize)
+ n, err := vols[1].Get(TestHash, buf)
if err != nil {
t.Fatalf("Volume #0 Get returned error: %v", err)
}
- if string(result) != string(TestBlock) {
+ if string(buf[:n]) != string(TestBlock) {
t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
- string(TestBlock), string(result))
+ string(TestBlock), string(buf[:n]))
}
}
t.Fatalf("PutBlock: n %d err %v", n, err)
}
- result, err := GetBlock(TestHash)
+ buf := make([]byte, BlockSize)
+ size, err := GetBlock(TestHash, buf, nil)
if err != nil {
t.Fatalf("GetBlock: %v", err)
}
- if string(result) != string(TestBlock) {
- t.Error("PutBlock/GetBlock mismatch")
- t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
- string(TestBlock), string(result))
+ if bytes.Compare(buf[:size], TestBlock) != 0 {
+ t.Fatalf("PutBlock stored %+q, GetBlock retrieved %+q",
+ TestBlock, buf[:size])
}
}
}
// Confirm that GetBlock fails to return anything.
-	if result, err := GetBlock(TestHash); err != NotFoundError {
-		t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
-			string(result), err)
+	buf := make([]byte, BlockSize)
+	if size, err := GetBlock(TestHash, buf, nil); err != NotFoundError {
+		t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
+			string(buf[:size]), err)
	}
}
// The block on disk should now match TestBlock.
- if block, err := GetBlock(TestHash); err != nil {
+ buf := make([]byte, BlockSize)
+ if size, err := GetBlock(TestHash, buf, nil); err != nil {
t.Errorf("GetBlock: %v", err)
- } else if bytes.Compare(block, TestBlock) != 0 {
- t.Errorf("GetBlock returned: '%s'", string(block))
+ } else if bytes.Compare(buf[:size], TestBlock) != 0 {
+ t.Errorf("Got %+q, expected %+q", buf[:size], TestBlock)
}
}
t.Errorf("mtime was changed on vols[0]:\noldMtime = %v\nnewMtime = %v\n",
oldMtime, newMtime)
}
- result, err := vols[1].Get(TestHash)
+ buf := make([]byte, BlockSize)
+ n, err := vols[1].Get(TestHash, buf)
if err != nil {
t.Fatalf("vols[1]: %v", err)
}
- if bytes.Compare(result, TestBlock) != 0 {
- t.Errorf("new block does not match test block\nnew block = %v\n", result)
+ if bytes.Compare(buf[:n], TestBlock) != 0 {
+ t.Errorf("new block does not match test block\nnew block = %v\n", buf[:n])
}
}
sentHdr time.Time
}
+// CloseNotify implements http.CloseNotifier.
+func (resp *LoggingResponseWriter) CloseNotify() <-chan bool {
+ wrapped, ok := resp.ResponseWriter.(http.CloseNotifier)
+ if !ok {
+ // If upstream doesn't implement CloseNotifier, we can
+ // satisfy the interface by returning a channel that
+ // never sends anything (the interface doesn't
+ // guarantee that anything will ever be sent on the
+ // channel even if the client disconnects).
+ return nil
+ }
+ return wrapped.CloseNotify()
+}
+
// WriteHeader writes header to ResponseWriter
-func (loggingWriter *LoggingResponseWriter) WriteHeader(code int) {
- if loggingWriter.sentHdr == zeroTime {
- loggingWriter.sentHdr = time.Now()
+func (resp *LoggingResponseWriter) WriteHeader(code int) {
+ if resp.sentHdr == zeroTime {
+ resp.sentHdr = time.Now()
}
- loggingWriter.Status = code
- loggingWriter.ResponseWriter.WriteHeader(code)
+ resp.Status = code
+ resp.ResponseWriter.WriteHeader(code)
}
var zeroTime time.Time
-func (loggingWriter *LoggingResponseWriter) Write(data []byte) (int, error) {
- if loggingWriter.Length == 0 && len(data) > 0 && loggingWriter.sentHdr == zeroTime {
- loggingWriter.sentHdr = time.Now()
+func (resp *LoggingResponseWriter) Write(data []byte) (int, error) {
+ if resp.Length == 0 && len(data) > 0 && resp.sentHdr == zeroTime {
+ resp.sentHdr = time.Now()
}
- loggingWriter.Length += len(data)
- if loggingWriter.Status >= 400 {
- loggingWriter.ResponseBody += string(data)
+ resp.Length += len(data)
+ if resp.Status >= 400 {
+ resp.ResponseBody += string(data)
}
- return loggingWriter.ResponseWriter.Write(data)
+ return resp.ResponseWriter.Write(data)
}
// LoggingRESTRouter is used to add logging capabilities to mux.Router
router http.Handler
}
-func (loggingRouter *LoggingRESTRouter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+func (loggingRouter *LoggingRESTRouter) ServeHTTP(wrappedResp http.ResponseWriter, req *http.Request) {
t0 := time.Now()
- loggingWriter := LoggingResponseWriter{http.StatusOK, 0, resp, "", zeroTime}
- loggingRouter.router.ServeHTTP(&loggingWriter, req)
- statusText := http.StatusText(loggingWriter.Status)
- if loggingWriter.Status >= 400 {
- statusText = strings.Replace(loggingWriter.ResponseBody, "\n", "", -1)
+ resp := LoggingResponseWriter{http.StatusOK, 0, wrappedResp, "", zeroTime}
+ loggingRouter.router.ServeHTTP(&resp, req)
+ statusText := http.StatusText(resp.Status)
+ if resp.Status >= 400 {
+ statusText = strings.Replace(resp.ResponseBody, "\n", "", -1)
}
now := time.Now()
tTotal := now.Sub(t0)
- tLatency := loggingWriter.sentHdr.Sub(t0)
- tResponse := now.Sub(loggingWriter.sentHdr)
- log.Printf("[%s] %s %s %d %.6fs %.6fs %.6fs %d %d \"%s\"", req.RemoteAddr, req.Method, req.URL.Path[1:], req.ContentLength, tTotal.Seconds(), tLatency.Seconds(), tResponse.Seconds(), loggingWriter.Status, loggingWriter.Length, statusText)
+ tLatency := resp.sentHdr.Sub(t0)
+ tResponse := now.Sub(resp.sentHdr)
+ log.Printf("[%s] %s %s %d %.6fs %.6fs %.6fs %d %d \"%s\"", req.RemoteAddr, req.Method, req.URL.Path[1:], req.ContentLength, tTotal.Seconds(), tLatency.Seconds(), tResponse.Seconds(), resp.Status, resp.Length, statusText)
}
--- /dev/null
+package main
+
+import (
+ "net/http"
+ "testing"
+)
+
+func TestLoggingResponseWriterImplementsCloseNotifier(t *testing.T) {
+ http.ResponseWriter(&LoggingResponseWriter{}).(http.CloseNotifier).CloseNotify()
+}
return nil
}
-func (v *S3Volume) Get(loc string) ([]byte, error) {
+func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
rdr, err := v.Bucket.GetReader(loc)
if err != nil {
- return nil, v.translateError(err)
+ return 0, v.translateError(err)
}
defer rdr.Close()
- buf := bufs.Get(BlockSize)
n, err := io.ReadFull(rdr, buf)
switch err {
case nil, io.EOF, io.ErrUnexpectedEOF:
- return buf[:n], nil
+ return n, nil
default:
- bufs.Put(buf)
- return nil, v.translateError(err)
+ return 0, v.translateError(err)
}
}
expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
// Verify Locator1 to be un/deleted as expected
- data, _ := GetBlock(testData.Locator1)
+ buf := make([]byte, BlockSize)
+ size, err := GetBlock(testData.Locator1, buf, nil)
if testData.ExpectLocator1 {
- if len(data) == 0 {
+ if size == 0 || err != nil {
t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
}
} else {
- if len(data) > 0 {
+ if size > 0 || err == nil {
t.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1)
}
}
// Verify Locator2 to be un/deleted as expected
if testData.Locator1 != testData.Locator2 {
- data, _ = GetBlock(testData.Locator2)
+ size, err = GetBlock(testData.Locator2, buf, nil)
if testData.ExpectLocator2 {
- if len(data) == 0 {
+ if size == 0 || err != nil {
t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
}
} else {
- if len(data) > 0 {
+ if size > 0 || err == nil {
t.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2)
}
}
if testData.DifferentMtimes {
locatorFoundIn := 0
for _, volume := range KeepVM.AllReadable() {
- if _, err := volume.Get(testData.Locator1); err == nil {
+ buf := make([]byte, BlockSize)
+ if _, err := volume.Get(testData.Locator1, buf); err == nil {
locatorFoundIn = locatorFoundIn + 1
}
}
// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
// etc.
type Volume interface {
- // Get a block. IFF the returned error is nil, the caller must
- // put the returned slice back into the buffer pool when it's
- // finished with it. (Otherwise, the buffer pool will be
- // depleted and eventually -- when all available buffers are
- // used and not returned -- operations will reach deadlock.)
+ // Get a block: copy the block data into buf, and return the
+ // number of bytes copied.
//
// loc is guaranteed to consist of 32 or more lowercase hex
// digits.
//
- // Get should not verify the integrity of the returned data:
- // it should just return whatever was found in its backing
+ // Get should not verify the integrity of the data: it should
+ // just return whatever was found in its backing
// store. (Integrity checking is the caller's responsibility.)
//
// If an error is encountered that prevents it from
// access log if the block is not found on any other volumes
// either).
//
- // If the data in the backing store is bigger than BlockSize,
- // Get is permitted to return an error without reading any of
- // the data.
- Get(loc string) ([]byte, error)
+ // If the data in the backing store is bigger than len(buf),
+ // then Get is permitted to return an error without reading
+ // any of the data.
+ //
+ // len(buf) will not exceed BlockSize.
+ Get(loc string, buf []byte) (int, error)
// Compare the given data with the stored data (i.e., what Get
// would return). If equal, return nil. If not, return
v.PutRaw(TestHash, TestBlock)
- buf, err := v.Get(TestHash)
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(TestHash, buf)
if err != nil {
t.Fatal(err)
}
- bufs.Put(buf)
-
-	if bytes.Compare(buf, TestBlock) != 0 {
-		t.Errorf("expected %s, got %s", string(TestBlock), string(buf))
+	if bytes.Compare(buf[:n], TestBlock) != 0 {
+		t.Errorf("expected %s, got %s", string(TestBlock), string(buf[:n]))
}
}
v := factory(t)
defer v.Teardown()
- if _, err := v.Get(TestHash2); err == nil {
+ buf := make([]byte, BlockSize)
+ if _, err := v.Get(TestHash2, buf); err == nil {
t.Errorf("Expected error while getting non-existing block %v", TestHash2)
}
}
v.PutRaw(testHash, testDataA)
putErr := v.Put(testHash, testDataB)
- buf, getErr := v.Get(testHash)
+ buf := make([]byte, BlockSize)
+ n, getErr := v.Get(testHash, buf)
if putErr == nil {
// Put must not return a nil error unless it has
// overwritten the existing data.
- if bytes.Compare(buf, testDataB) != 0 {
- t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf, testDataB)
+ if bytes.Compare(buf[:n], testDataB) != 0 {
+ t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf[:n], testDataB)
}
} else {
// It is permissible for Put to fail, but it must
// leave us with either the original data, the new
// data, or nothing at all.
- if getErr == nil && bytes.Compare(buf, testDataA) != 0 && bytes.Compare(buf, testDataB) != 0 {
- t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf, testDataA, testDataB)
+ if getErr == nil && bytes.Compare(buf[:n], testDataA) != 0 && bytes.Compare(buf[:n], testDataB) != 0 {
+ t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf[:n], testDataA, testDataB)
}
}
- if getErr == nil {
- bufs.Put(buf)
- }
}
// Put and get multiple blocks
t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
}
- data, err := v.Get(TestHash)
+ data := make([]byte, BlockSize)
+ n, err := v.Get(TestHash, data)
if err != nil {
t.Error(err)
} else {
- if bytes.Compare(data, TestBlock) != 0 {
- t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock)
+ if bytes.Compare(data[:n], TestBlock) != 0 {
+ t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock)
}
- bufs.Put(data)
}
- data, err = v.Get(TestHash2)
+ n, err = v.Get(TestHash2, data)
if err != nil {
t.Error(err)
} else {
- if bytes.Compare(data, TestBlock2) != 0 {
- t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock2)
+ if bytes.Compare(data[:n], TestBlock2) != 0 {
+ t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock2)
}
- bufs.Put(data)
}
- data, err = v.Get(TestHash3)
+ n, err = v.Get(TestHash3, data)
if err != nil {
t.Error(err)
} else {
- if bytes.Compare(data, TestBlock3) != 0 {
- t.Errorf("Block present, but to %+q, expected %+q", data, TestBlock3)
+ if bytes.Compare(data[:n], TestBlock3) != 0 {
+ t.Errorf("Block present, but to %+q, expected %+q", data[:n], TestBlock3)
}
- bufs.Put(data)
}
}
if err := v.Trash(TestHash); err != nil {
t.Error(err)
}
- data, err := v.Get(TestHash)
+ data := make([]byte, BlockSize)
+ n, err := v.Get(TestHash, data)
if err != nil {
t.Error(err)
- } else {
- if bytes.Compare(data, TestBlock) != 0 {
- t.Errorf("Got data %+q, expected %+q", data, TestBlock)
- }
- bufs.Put(data)
+ } else if bytes.Compare(data[:n], TestBlock) != 0 {
+ t.Errorf("Got data %+q, expected %+q", data[:n], TestBlock)
}
}
if err := v.Trash(TestHash); err != nil {
t.Error(err)
}
- if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
+ data := make([]byte, BlockSize)
+ if _, err := v.Get(TestHash, data); err == nil || !os.IsNotExist(err) {
t.Errorf("os.IsNotExist(%v) should have been true", err)
}
}
}
v.PutRaw(TestHash, TestBlock)
+ buf := make([]byte, BlockSize)
// Get from read-only volume should succeed
- _, err := v.Get(TestHash)
+ _, err := v.Get(TestHash, buf)
if err != nil {
t.Errorf("got err %v, expected nil", err)
}
if err == nil {
t.Errorf("Expected error when putting block in a read-only volume")
}
- _, err = v.Get(TestHash2)
+ _, err = v.Get(TestHash2, buf)
if err == nil {
t.Errorf("Expected error when getting block whose put in read-only volume failed")
}
v.PutRaw(TestHash3, TestBlock3)
sem := make(chan int)
- go func(sem chan int) {
- buf, err := v.Get(TestHash)
+ go func() {
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(TestHash, buf)
if err != nil {
t.Errorf("err1: %v", err)
}
- bufs.Put(buf)
- if bytes.Compare(buf, TestBlock) != 0 {
- t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf))
+ if bytes.Compare(buf[:n], TestBlock) != 0 {
+ t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf[:n]))
}
sem <- 1
- }(sem)
+ }()
- go func(sem chan int) {
- buf, err := v.Get(TestHash2)
+ go func() {
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(TestHash2, buf)
if err != nil {
t.Errorf("err2: %v", err)
}
- bufs.Put(buf)
- if bytes.Compare(buf, TestBlock2) != 0 {
- t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf))
+ if bytes.Compare(buf[:n], TestBlock2) != 0 {
+ t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf[:n]))
}
sem <- 1
- }(sem)
+ }()
- go func(sem chan int) {
- buf, err := v.Get(TestHash3)
+ go func() {
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(TestHash3, buf)
if err != nil {
t.Errorf("err3: %v", err)
}
- bufs.Put(buf)
- if bytes.Compare(buf, TestBlock3) != 0 {
- t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf))
+ if bytes.Compare(buf[:n], TestBlock3) != 0 {
+ t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf[:n]))
}
sem <- 1
- }(sem)
+ }()
// Wait for all goroutines to finish
- for done := 0; done < 3; {
- done += <-sem
+ for done := 0; done < 3; done++ {
+ <-sem
}
}
}(sem)
// Wait for all goroutines to finish
- for done := 0; done < 3; {
- done += <-sem
+ for done := 0; done < 3; done++ {
+ <-sem
}
// Double check that we actually wrote the blocks we expected to write.
- buf, err := v.Get(TestHash)
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(TestHash, buf)
if err != nil {
t.Errorf("Get #1: %v", err)
}
- bufs.Put(buf)
- if bytes.Compare(buf, TestBlock) != 0 {
- t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf))
+ if bytes.Compare(buf[:n], TestBlock) != 0 {
+ t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf[:n]))
}
- buf, err = v.Get(TestHash2)
+ n, err = v.Get(TestHash2, buf)
if err != nil {
t.Errorf("Get #2: %v", err)
}
- bufs.Put(buf)
- if bytes.Compare(buf, TestBlock2) != 0 {
- t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf))
+ if bytes.Compare(buf[:n], TestBlock2) != 0 {
+ t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf[:n]))
}
- buf, err = v.Get(TestHash3)
+ n, err = v.Get(TestHash3, buf)
if err != nil {
t.Errorf("Get #3: %v", err)
}
- bufs.Put(buf)
- if bytes.Compare(buf, TestBlock3) != 0 {
- t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf))
+ if bytes.Compare(buf[:n], TestBlock3) != 0 {
+ t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf[:n]))
}
}
if err != nil {
t.Fatal(err)
}
- rdata, err := v.Get(hash)
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(hash, buf)
if err != nil {
t.Error(err)
- } else {
- defer bufs.Put(rdata)
}
- if bytes.Compare(rdata, wdata) != 0 {
- t.Error("rdata != wdata")
+ if bytes.Compare(buf[:n], wdata) != 0 {
+		t.Errorf("buf %+q != wdata %+q", buf[:n], wdata)
}
}
v.PutRaw(TestHash, TestBlock)
v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
- buf, err := v.Get(TestHash)
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(TestHash, buf)
if err != nil {
t.Fatal(err)
}
- if bytes.Compare(buf, TestBlock) != 0 {
- t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+ if bytes.Compare(buf[:n], TestBlock) != 0 {
+ t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
}
- bufs.Put(buf)
// Trash
err = v.Trash(TestHash)
t.Error(err)
}
} else {
- _, err = v.Get(TestHash)
+ _, err = v.Get(TestHash, buf)
if err == nil || !os.IsNotExist(err) {
t.Errorf("os.IsNotExist(%v) should have been true", err)
}
}
// Get the block - after trash and untrash sequence
- buf, err = v.Get(TestHash)
+ n, err = v.Get(TestHash, buf)
if err != nil {
t.Fatal(err)
}
- if bytes.Compare(buf, TestBlock) != 0 {
- t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+ if bytes.Compare(buf[:n], TestBlock) != 0 {
+ t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
}
- bufs.Put(buf)
}
func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
}(trashLifetime)
checkGet := func() error {
- buf, err := v.Get(TestHash)
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(TestHash, buf)
if err != nil {
return err
}
- if bytes.Compare(buf, TestBlock) != 0 {
- t.Fatalf("Got data %+q, expected %+q", buf, TestBlock)
+ if bytes.Compare(buf[:n], TestBlock) != 0 {
+ t.Fatalf("Got data %+q, expected %+q", buf[:n], TestBlock)
}
- bufs.Put(buf)
return nil
}
}
}
-func (v *MockVolume) Get(loc string) ([]byte, error) {
+func (v *MockVolume) Get(loc string, buf []byte) (int, error) {
v.gotCall("Get")
<-v.Gate
if v.Bad {
- return nil, errors.New("Bad volume")
+ return 0, errors.New("Bad volume")
} else if block, ok := v.Store[loc]; ok {
- buf := bufs.Get(len(block))
- copy(buf, block)
- return buf, nil
+ copy(buf[:len(block)], block)
+ return len(block), nil
}
- return nil, os.ErrNotExist
+ return 0, os.ErrNotExist
}
func (v *MockVolume) Put(loc string, block []byte) error {
return stat, err
}
-// Get retrieves a block identified by the locator string "loc", and
-// returns its contents as a byte slice.
-//
-// Get returns a nil buffer IFF it returns a non-nil error.
-func (v *UnixVolume) Get(loc string) ([]byte, error) {
+// Get retrieves a block, copies it to the given slice, and returns
+// the number of bytes copied.
+func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
path := v.blockPath(loc)
stat, err := v.stat(path)
if err != nil {
- return nil, v.translateError(err)
+ return 0, v.translateError(err)
+ }
+ if stat.Size() > int64(len(buf)) {
+ return 0, TooLongError
}
- buf := bufs.Get(int(stat.Size()))
+ var read int
+ size := int(stat.Size())
err = v.getFunc(path, func(rdr io.Reader) error {
- _, err = io.ReadFull(rdr, buf)
+ read, err = io.ReadFull(rdr, buf[:size])
return err
})
- if err != nil {
- bufs.Put(buf)
- return nil, err
- }
- return buf, nil
+ return read, err
}
// Compare returns nil if Get(loc) would return the same content as
defer v.Teardown()
v.Put(TestHash, TestBlock)
- buf, err := v.Get(TestHash2)
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(TestHash2, buf)
switch {
case os.IsNotExist(err):
break
case err == nil:
- t.Errorf("Read should have failed, returned %s", string(buf))
+ t.Errorf("Read should have failed, returned %+q", buf[:n])
default:
t.Errorf("Read expected ErrNotExist, got: %s", err)
}
v.PutRaw(TestHash, TestBlock)
- _, err := v.Get(TestHash)
+ buf := make([]byte, BlockSize)
+ _, err := v.Get(TestHash, buf)
if err != nil {
t.Errorf("got err %v, expected nil", err)
}
value search for a `term` match on each item. Returns the
object's 'id' attribute by default.
"""
- items = getattr(self, list_method)(**kwargs)
+ try:
+ list_func = getattr(self, list_method)
+ except AttributeError:
+ list_func = getattr(self.real, list_method)
+ items = list_func(**kwargs)
results = [item for item in items if key(item) == term]
count = len(results)
if count != 1:
--- /dev/null
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import unittest
+
+import libcloud.common.types as cloud_types
+import mock
+
+import arvnodeman.computenode.driver as driver_base
+from . import testutil
+
+class ComputeNodeDriverTestCase(unittest.TestCase):
+ def setUp(self):
+ self.driver_mock = mock.MagicMock(name='driver_mock')
+ driver_base.BaseComputeNodeDriver.SEARCH_CACHE = {}
+
+ def test_search_for_now_uses_public_method(self):
+ image = testutil.cloud_object_mock(1)
+ self.driver_mock().list_images.return_value = [image]
+ driver = driver_base.BaseComputeNodeDriver({}, {}, {}, self.driver_mock)
+ self.assertIs(image, driver.search_for_now('id_1', 'list_images'))
+ self.assertEqual(1, self.driver_mock().list_images.call_count)
+
+ def test_search_for_now_uses_private_method(self):
+ net = testutil.cloud_object_mock(1)
+ self.driver_mock().ex_list_networks.return_value = [net]
+ driver = driver_base.BaseComputeNodeDriver({}, {}, {}, self.driver_mock)
+ self.assertIs(net, driver.search_for_now('id_1', 'ex_list_networks'))
+ self.assertEqual(1, self.driver_mock().ex_list_networks.call_count)
+
+ def test_search_for_now_raises_ValueError_on_zero_results(self):
+ self.driver_mock().list_images.return_value = []
+ driver = driver_base.BaseComputeNodeDriver({}, {}, {}, self.driver_mock)
+        with self.assertRaises(ValueError):
+            driver.search_for_now('id_1', 'list_images')
+
+ def test_search_for_now_raises_ValueError_on_extra_results(self):
+ image = testutil.cloud_object_mock(1)
+ self.driver_mock().list_images.return_value = [image, image]
+ driver = driver_base.BaseComputeNodeDriver({}, {}, {}, self.driver_mock)
+        with self.assertRaises(ValueError):
+            driver.search_for_now('id_1', 'list_images')
+
+ def test_search_for_now_does_not_cache_results(self):
+ image1 = testutil.cloud_object_mock(1)
+ image2 = testutil.cloud_object_mock(1)
+ self.driver_mock().list_images.side_effect = [[image1], [image2]]
+ driver = driver_base.BaseComputeNodeDriver({}, {}, {}, self.driver_mock)
+ self.assertIsNot(driver.search_for_now('id_1', 'list_images'),
+ driver.search_for_now('id_1', 'list_images'))
+ self.assertEqual(2, self.driver_mock().list_images.call_count)
+
+ def test_search_for_returns_cached_results(self):
+ image1 = testutil.cloud_object_mock(1)
+ image2 = testutil.cloud_object_mock(1)
+ self.driver_mock().list_images.side_effect = [[image1], [image2]]
+ driver = driver_base.BaseComputeNodeDriver({}, {}, {}, self.driver_mock)
+ self.assertIs(driver.search_for('id_1', 'list_images'),
+ driver.search_for('id_1', 'list_images'))
+ self.assertEqual(1, self.driver_mock().list_images.call_count)