--- /dev/null
+# Serves the event bus debugging page. There is no Websocket database
+# table, so the generic index machinery from ApplicationController is
+# skipped.
+class WebsocketController < ApplicationController
+ # No database-backed objects exist to load for this page.
+ skip_before_filter :find_objects_for_index
+
+ # Renders the debugging view; all the work happens client-side.
+ def index
+ end
+
+ # NOTE(review): returns the String "Websocket" rather than a model
+ # class; confirm that everything consuming model_class (e.g. permission
+ # or naming helpers) tolerates a String here.
+ def model_class
+ "Websocket"
+ end
+end
--- /dev/null
+<%# Debugging page for the Arvados event bus: opens a websocket to the
+    API server's advertised websocketUrl and echoes every message
+    received; the form sends raw text over the socket. %>
+<% content_for :page_title do %>
+ Event bus debugging page
+<% end %>
+<h1>Event bus debugging page</h1>
+
+<form>
+<textarea style="width:100%; height: 10em" id="websocket-message-content"></textarea>
+<button type="button" id="send-to-websocket">Send</button>
+</form>
+
+<br>
+
+<p id="PutStuffHere"></p>
+
+<script>
+$(function() {
+// Append one line of received content to the output area.
+// NOTE(review): assigned without 'var', so this becomes a global.
+putStuffThere = function (content) {
+ $("#PutStuffHere").append(content + "<br>");
+};
+
+// Connect to the websocket endpoint from the API discovery document,
+// authenticating with the current API token in the query string.
+var dispatcher = new WebSocket('<%= $arvados_api_client.discovery[:websocketUrl] %>?api_token=<%= Thread.current[:arvados_api_token] %>');
+dispatcher.onmessage = function(event) {
+ //putStuffThere(JSON.parse(event.data));
+ putStuffThere(event.data);
+};
+
+// Send the textarea contents verbatim over the websocket.
+// NOTE(review): also a global (no 'var').
+sendStuff = function () {
+ dispatcher.send($("#websocket-message-content").val());
+};
+
+$("#send-to-websocket").click(sendStuff);
+});
+
+</script>
get '/collections/:uuid/*file' => 'collections#show_file', :format => false
post 'actions' => 'actions#post'
+ get 'websockets' => 'websocket#index'
root :to => 'users#welcome'
gem 'google-api-client', '~> 0.6.3'
gem 'trollop'
+gem 'faye-websocket'
+gem 'database_cleaner'
gem 'themes_for_rails'
coffee-script-source (1.7.0)
curb (0.8.5)
daemon_controller (1.2.0)
+ database_cleaner (1.2.0)
erubis (2.7.0)
+ eventmachine (1.0.3)
execjs (2.0.2)
extlib (0.9.16)
faraday (0.8.9)
multipart-post (~> 1.2.0)
+ faye-websocket (0.7.2)
+ eventmachine (>= 0.12.0)
+ websocket-driver (>= 0.3.1)
google-api-client (0.6.4)
addressable (>= 2.3.2)
autoparse (>= 0.3.3)
execjs (>= 0.3.0)
json (>= 1.8.0)
uuidtools (2.1.4)
+ websocket-driver (0.3.2)
PLATFORMS
ruby
andand
arvados-cli (>= 0.1.20140328152103)
coffee-rails (~> 3.2.0)
+ database_cleaner
+ faye-websocket
google-api-client (~> 0.6.3)
jquery-rails
multi_json
+require 'load_param'
+require 'record_filters'
+
class ApplicationController < ActionController::Base
include CurrentApiClient
include ThemesForRails::ActionController
+ include LoadParam
+ include RecordFilters
+
+ ERROR_ACTIONS = [:render_error, :render_not_found]
respond_to :json
protect_from_forgery
- around_filter :thread_with_auth_info, :except => [:render_error, :render_not_found]
before_filter :respond_with_json_by_default
before_filter :remote_ip
- before_filter :require_auth_scope, :except => :render_not_found
- before_filter :catch_redirect_hint
+ before_filter :load_read_auths
+ before_filter :require_auth_scope, except: ERROR_ACTIONS
- before_filter :find_object_by_uuid, :except => [:index, :create,
- :render_error,
- :render_not_found]
+ before_filter :catch_redirect_hint
+ before_filter(:find_object_by_uuid,
+ except: [:index, :create] + ERROR_ACTIONS)
before_filter :load_limit_offset_order_params, only: [:index, :owned_items]
before_filter :load_where_param, only: [:index, :owned_items]
before_filter :load_filters_param, only: [:index, :owned_items]
before_filter :find_objects_for_index, :only => :index
before_filter :reload_object_before_update, :only => :update
- before_filter :render_404_if_no_object, except: [:index, :create,
- :render_error,
- :render_not_found]
+ before_filter(:render_404_if_no_object,
+ except: [:index, :create] + ERROR_ACTIONS)
theme :select_theme
attr_accessor :resource_attrs
- DEFAULT_LIMIT = 100
-
def index
@objects.uniq!(&:id)
if params[:eager] and params[:eager] != '0' and params[:eager] != 0 and params[:eager] != ''
when 'ApiClientAuthorization'
# Do not want.
else
- @objects = klass.readable_by(current_user)
+ @objects = klass.readable_by(*@read_users)
cond_sql = "#{klass.table_name}.owner_uuid = ?"
cond_params = [@object.uuid]
if params[:include_linked]
protected
- def load_where_param
- if params[:where].nil? or params[:where] == ""
- @where = {}
- elsif params[:where].is_a? Hash
- @where = params[:where]
- elsif params[:where].is_a? String
- begin
- @where = Oj.load(params[:where])
- raise unless @where.is_a? Hash
- rescue
- raise ArgumentError.new("Could not parse \"where\" param as an object")
- end
- end
- @where = @where.with_indifferent_access
- end
-
- def load_filters_param
- @filters ||= []
- if params[:filters].is_a? Array
- @filters += params[:filters]
- elsif params[:filters].is_a? String and !params[:filters].empty?
- begin
- f = Oj.load params[:filters]
- raise unless f.is_a? Array
- @filters += f
- rescue
- raise ArgumentError.new("Could not parse \"filters\" param as an array")
- end
- end
- end
-
- def default_orders
- ["#{table_name}.modified_at desc"]
- end
-
- def load_limit_offset_order_params
- if params[:limit]
- unless params[:limit].to_s.match(/^\d+$/)
- raise ArgumentError.new("Invalid value for limit parameter")
- end
- @limit = params[:limit].to_i
- else
- @limit = DEFAULT_LIMIT
- end
-
- if params[:offset]
- unless params[:offset].to_s.match(/^\d+$/)
- raise ArgumentError.new("Invalid value for offset parameter")
- end
- @offset = params[:offset].to_i
- else
- @offset = 0
- end
-
- @orders = []
- if params[:order]
- params[:order].split(',').each do |order|
- attr, direction = order.strip.split " "
- direction ||= 'asc'
- if attr.match /^[a-z][_a-z0-9]+$/ and
- model_class.columns.collect(&:name).index(attr) and
- ['asc','desc'].index direction.downcase
- @orders << "#{table_name}.#{attr} #{direction.downcase}"
- end
- end
- end
- if @orders.empty?
- @orders = default_orders
- end
- end
-
def find_objects_for_index
- @objects ||= model_class.readable_by(current_user)
+ @objects ||= model_class.readable_by(*@read_users)
apply_where_limit_order_params
end
def apply_where_limit_order_params
ar_table_name = @objects.table_name
- if @filters.is_a? Array and @filters.any?
- cond_out = []
- param_out = []
- @filters.each do |filter|
- attr, operator, operand = filter
- if !filter.is_a? Array
- raise ArgumentError.new("Invalid element in filters array: #{filter.inspect} is not an array")
- elsif !operator.is_a? String
- raise ArgumentError.new("Invalid operator '#{operator}' (#{operator.class}) in filter")
- elsif !model_class.searchable_columns(operator).index attr.to_s
- raise ArgumentError.new("Invalid attribute '#{attr}' in filter")
- end
- case operator.downcase
- when '=', '<', '<=', '>', '>=', 'like'
- if operand.is_a? String
- cond_out << "#{ar_table_name}.#{attr} #{operator} ?"
- if (# any operator that operates on value rather than
- # representation:
- operator.match(/[<=>]/) and
- model_class.attribute_column(attr).type == :datetime)
- operand = Time.parse operand
- end
- param_out << operand
- elsif operand.nil? and operator == '='
- cond_out << "#{ar_table_name}.#{attr} is null"
- else
- raise ArgumentError.new("Invalid operand type '#{operand.class}' "\
- "for '#{operator}' operator in filters")
- end
- when 'in'
- if operand.is_a? Array
- cond_out << "#{ar_table_name}.#{attr} IN (?)"
- param_out << operand
- else
- raise ArgumentError.new("Invalid operand type '#{operand.class}' "\
- "for '#{operator}' operator in filters")
- end
- when 'is_a'
- operand = [operand] unless operand.is_a? Array
- cond = []
- operand.each do |op|
- cl = ArvadosModel::kind_class op
- if cl
- cond << "#{ar_table_name}.#{attr} like ?"
- param_out << cl.uuid_like_pattern
- else
- cond << "1=0"
- end
- end
- cond_out << cond.join(' OR ')
- end
- end
- if cond_out.any?
- @objects = @objects.where(cond_out.join(' AND '), *param_out)
- end
+
+ ft = record_filters @filters, ar_table_name
+ if ft[:cond_out].any?
+ @objects = @objects.where(ft[:cond_out].join(' AND '), *ft[:param_out])
end
+
if @where.is_a? Hash and @where.any?
conditions = ['1=1']
@where.each do |attr,value|
end
# Authentication
+ # Collect the API client authorizations usable to satisfy read
+ # permissions on this request: the primary token's authorization plus
+ # any unexpired reader_tokens supplied with a GET request. Filters out
+ # auths whose scopes do not cover this request, then populates
+ # @read_auths and @read_users (the distinct users behind those auths).
+ def load_read_auths
+ @read_auths = []
+ if current_api_client_authorization
+ @read_auths << current_api_client_authorization
+ end
+ # Load reader tokens if this is a read request.
+ # If there are too many reader tokens, assume the request is malicious
+ # and ignore it.
+ if request.get? and params[:reader_tokens] and
+ params[:reader_tokens].size < 100
+ @read_auths += ApiClientAuthorization
+ .includes(:user)
+ .where('api_token IN (?) AND
+ (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP)',
+ params[:reader_tokens])
+ .all
+ end
+ # Every auth (including the primary one) must be scoped for this
+ # request method and path.
+ @read_auths.select! { |auth| auth.scopes_allow_request? request }
+ @read_users = @read_auths.map { |auth| auth.user }.uniq
+ end
+
def require_login
- if current_user
- true
- else
+ if not current_user
respond_to do |format|
format.json {
render :json => { errors: ['Not logged in'] }.to_json, status: 401
}
- format.html {
+ format.html {
redirect_to '/auth/joshid'
}
end
end
def require_auth_scope
- return false unless require_login
- unless current_api_client_auth_has_scope("#{request.method} #{request.path}")
- render :json => { errors: ['Forbidden'] }.to_json, status: 403
- end
- end
-
- def thread_with_auth_info
- Thread.current[:request_starttime] = Time.now
- Thread.current[:api_url_base] = root_url.sub(/\/$/,'') + '/arvados/v1'
- begin
- user = nil
- api_client = nil
- api_client_auth = nil
- supplied_token =
- params[:api_token] ||
- params[:oauth_token] ||
- request.headers["Authorization"].andand.match(/OAuth2 ([a-z0-9]+)/).andand[1]
- if supplied_token
- api_client_auth = ApiClientAuthorization.
- includes(:api_client, :user).
- where('api_token=? and (expires_at is null or expires_at > CURRENT_TIMESTAMP)', supplied_token).
- first
- if api_client_auth.andand.user
- session[:user_id] = api_client_auth.user.id
- session[:api_client_uuid] = api_client_auth.api_client.andand.uuid
- session[:api_client_authorization_id] = api_client_auth.id
- user = api_client_auth.user
- api_client = api_client_auth.api_client
- else
- # Token seems valid, but points to a non-existent (deleted?) user.
- api_client_auth = nil
- end
- elsif session[:user_id]
- user = User.find(session[:user_id]) rescue nil
- api_client = ApiClient.
- where('uuid=?',session[:api_client_uuid]).
- first rescue nil
- if session[:api_client_authorization_id] then
- api_client_auth = ApiClientAuthorization.
- find session[:api_client_authorization_id]
- end
- end
- Thread.current[:api_client_ip_address] = remote_ip
- Thread.current[:api_client_authorization] = api_client_auth
- Thread.current[:api_client_uuid] = api_client.andand.uuid
- Thread.current[:api_client] = api_client
- Thread.current[:user] = user
- if api_client_auth
- api_client_auth.last_used_at = Time.now
- api_client_auth.last_used_by_ip_address = remote_ip
- api_client_auth.save validate: false
+ if @read_auths.empty?
+ if require_login != false
+ render :json => { errors: ['Forbidden'] }.to_json, status: 403
end
- yield
- ensure
- Thread.current[:api_client_ip_address] = nil
- Thread.current[:api_client_authorization] = nil
- Thread.current[:api_client_uuid] = nil
- Thread.current[:api_client] = nil
- Thread.current[:user] = nil
+ false
end
end
- # /Authentication
def respond_with_json_by_default
html_index = request.accepts.index(Mime::HTML)
end
end
- def self.accept_attribute_as_json(attr, force_class=nil)
- before_filter lambda { accept_attribute_as_json attr, force_class }
+ # If hash[key] is a String, decode it as JSON in place (string keys).
+ # When must_be_class is given, raise TypeError unless the decoded value
+ # is an instance of that class. Non-String values are left untouched.
+ def load_json_value(hash, key, must_be_class=nil)
+ if hash[key].is_a? String
+ hash[key] = Oj.load(hash[key], symbol_keys: false)
+ if must_be_class and !hash[key].is_a? must_be_class
+ raise TypeError.new("parameter #{key.to_s} must be a #{must_be_class.to_s}")
+ end
+ end
+ end
+
+ def self.accept_attribute_as_json(attr, must_be_class=nil)
+ before_filter lambda { accept_attribute_as_json attr, must_be_class }
end
accept_attribute_as_json :properties, Hash
accept_attribute_as_json :info, Hash
- def accept_attribute_as_json(attr, force_class)
+ def accept_attribute_as_json(attr, must_be_class)
if params[resource_name] and resource_attrs.is_a? Hash
- if resource_attrs[attr].is_a? String
- resource_attrs[attr] = Oj.load(resource_attrs[attr],
- symbol_keys: false)
- if force_class and !resource_attrs[attr].is_a? force_class
- raise TypeError.new("#{resource_name}[#{attr.to_s}] must be a #{force_class.to_s}")
- end
- elsif resource_attrs[attr].is_a? Hash
+ if resource_attrs[attr].is_a? Hash
# Convert symbol keys to strings (in hashes provided by
# resource_attrs)
resource_attrs[attr] = resource_attrs[attr].
with_indifferent_access.to_hash
+ else
+ load_json_value(resource_attrs, attr, must_be_class)
end
end
end
+ # Declare that the named request parameter should be JSON-decoded (via
+ # load_json_value) before other filters run, optionally enforcing the
+ # decoded type. prepend_before_filter ensures it runs ahead of filters
+ # that read the parameter (e.g. load_read_auths reading reader_tokens).
+ def self.accept_param_as_json(key, must_be_class=nil)
+ prepend_before_filter lambda { load_json_value(params, key, must_be_class) }
+ end
+ accept_param_as_json :reader_tokens, Array
+
def render_list
@object_list = {
:kind => "arvados##{(@response_resource_name || resource_name).camelize(:lower)}List",
resources: {}
}
+ if Rails.application.config.websocket_address
+ discovery[:websocketUrl] = Rails.application.config.websocket_address
+ elsif ENV['ARVADOS_WEBSOCKETS']
+ discovery[:websocketUrl] = (root_url.sub /^http/, 'ws') + "/websocket"
+ end
+
ActiveRecord::Base.descendants.reject(&:abstract_class?).each do |k|
begin
ctl_class = "Arvados::V1::#{k.to_s.pluralize}Controller".constantize
--- /dev/null
+# Perform api_token checking very early in the request process. We want to do
+# this in the Rack stack instead of in ApplicationController because
+# websockets needs access to authentication but doesn't use any of the rails
+# active dispatch infrastructure.
+class ArvadosApiToken
+
+ # Create a new ArvadosApiToken handler
+ # +app+ The next layer of the Rack stack.
+ def initialize(app = nil, options = nil)
+ @app = app if app.respond_to?(:call)
+ end
+
+ # Rack entry point: resolve the supplied API token (query param or
+ # OAuth2 Authorization header) into thread-local user/client state,
+ # then invoke the downstream app.
+ def call env
+ # First, clean up just in case we have a multithreaded server and thread
+ # local variables are still set from a prior request. Also useful for
+ # tests that call this code to set up the environment.
+ Thread.current[:api_client_ip_address] = nil
+ Thread.current[:api_client_authorization] = nil
+ Thread.current[:api_client_uuid] = nil
+ Thread.current[:api_client] = nil
+ Thread.current[:user] = nil
+
+ request = Rack::Request.new(env)
+ params = request.params
+ # Relies on ActionDispatch::RemoteIp having already run (see the
+ # middleware-ordering initializer).
+ remote_ip = env["action_dispatch.remote_ip"]
+
+ Thread.current[:request_starttime] = Time.now
+ user = nil
+ api_client = nil
+ api_client_auth = nil
+ supplied_token =
+ params["api_token"] ||
+ params["oauth_token"] ||
+ env["HTTP_AUTHORIZATION"].andand.match(/OAuth2 ([a-z0-9]+)/).andand[1]
+ if supplied_token
+ api_client_auth = ApiClientAuthorization.
+ includes(:api_client, :user).
+ where('api_token=? and (expires_at is null or expires_at > CURRENT_TIMESTAMP)', supplied_token).
+ first
+ if api_client_auth.andand.user
+ user = api_client_auth.user
+ api_client = api_client_auth.api_client
+ else
+ # Token seems valid, but points to a non-existent (deleted?) user.
+ api_client_auth = nil
+ end
+ end
+ Thread.current[:api_client_ip_address] = remote_ip
+ Thread.current[:api_client_authorization] = api_client_auth
+ Thread.current[:api_client_uuid] = api_client.andand.uuid
+ Thread.current[:api_client] = api_client
+ Thread.current[:user] = user
+ # Record token usage without running validations.
+ if api_client_auth
+ api_client_auth.last_used_at = Time.now
+ api_client_auth.last_used_by_ip_address = remote_ip.to_s
+ api_client_auth.save validate: false
+ end
+
+ # NOTE(review): returns nil (not a Rack triple) when constructed
+ # without a downstream app — confirm this middleware is never
+ # terminal in the stack.
+ @app.call env if @app
+ end
+end
--- /dev/null
+require 'rack'
+require 'faye/websocket'
+require 'eventmachine'
+
+# A Rack middleware to handle inbound websocket connection requests and hand
+# them over to the faye websocket library.
+class RackSocket
+
+ # Default mount point for websocket connect requests.
+ DEFAULT_ENDPOINT = '/websocket'
+
+ # Stop EventMachine on signal, this should give it a chance to to unwind any
+ # open connections.
+ def die_gracefully_on_signal
+ Signal.trap("INT") { EM.stop }
+ Signal.trap("TERM") { EM.stop }
+ end
+
+ # Create a new RackSocket handler
+ # +app+ The next layer of the Rack stack.
+ #
+ # Accepts options:
+ # +:handler+ (Required) A class to handle new connections. #initialize will
+ # call handler.new to create the actual handler instance object. When a new
+ # websocket connection is established, #on_connect on the handler instance
+ # object will be called with the new connection.
+ #
+ # +:mount+ The HTTP request path that will be recognized for websocket
+ # connect requests, defaults to '/websocket'.
+ #
+ # +:websocket_only+ If true, the server will only handle websocket requests,
+ # and all other requests will result in an error. If false, unhandled
+ # non-websocket requests will be passed along on to 'app' in the usual Rack
+ # way.
+ def initialize(app = nil, options = nil)
+ @app = app if app.respond_to?(:call)
+ # Options may arrive as the first or second positional argument
+ # depending on how Rack instantiated the middleware.
+ @options = [app, options].grep(Hash).first || {}
+ @endpoint = @options[:mount] || DEFAULT_ENDPOINT
+ @websocket_only = @options[:websocket_only] || false
+
+ # from https://gist.github.com/eatenbyagrue/1338545#file-eventmachine-rb
+ if defined?(PhusionPassenger)
+ PhusionPassenger.on_event(:starting_worker_process) do |forked|
+ # for passenger, we need to avoid orphaned threads
+ if forked && EM.reactor_running?
+ EM.stop
+ end
+ Thread.new {
+ EM.run
+ }
+ die_gracefully_on_signal
+ end
+ else
+ # facilitates debugging
+ Thread.abort_on_exception = true
+ # just spawn a thread and start it up
+ Thread.new {
+ EM.run
+ }
+ end
+
+ # Create actual handler instance object from handler class.
+ @handler = @options[:handler].new
+ end
+
+ # Handle websocket connection request, or pass on to the next middleware
+ # supplied in +app+ initialize (unless +:websocket_only+ option is true, in
+ # which case return an error response.)
+ # +env+ the Rack environment with information about the request.
+ def call env
+ request = Rack::Request.new(env)
+ if request.path_info == @endpoint and Faye::WebSocket.websocket?(env)
+ ws = Faye::WebSocket.new(env)
+
+ # Notify handler about new connection
+ @handler.on_connect ws
+
+ # Return async Rack response
+ ws.rack_response
+ elsif not @websocket_only
+ @app.call env
+ else
+ [406, {"Content-Type" => "text/plain"}, ["Only websocket connections are permitted on this port."]]
+ end
+ end
+
+end
end
def modified_at=(x) end
+ # True if any of this authorization's scopes grants the request string
+ # +req_s+ (formatted "METHOD /path"): the literal 'all' scope, an exact
+ # match, or — for scopes ending in '/' — a prefix match.
+ def scopes_allow?(req_s)
+ scopes.each do |scope|
+ return true if (scope == 'all') or (scope == req_s) or
+ ((scope.end_with? '/') and (req_s.start_with? scope))
+ end
+ false
+ end
+
+ # Convenience wrapper: check scopes against an actual request object by
+ # building the "METHOD /path" string scopes_allow? expects.
+ def scopes_allow_request?(request)
+ scopes_allow? [request.method, request.path].join(' ')
+ end
+
def logged_attributes
attrs = attributes.dup
attrs.delete('api_token')
self.columns.select { |col| col.name == attr.to_s }.first
end
- # def eager_load_associations
- # self.class.columns.each do |col|
- # re = col.name.match /^(.*)_kind$/
- # if (re and
- # self.respond_to? re[1].to_sym and
- # (auuid = self.send((re[1] + '_uuid').to_sym)) and
- # (aclass = self.class.kind_class(self.send(col.name.to_sym))) and
- # (aobject = aclass.where('uuid=?', auuid).first))
- # self.instance_variable_set('@'+re[1], aobject)
- # end
- # end
- # end
-
- def self.readable_by user
- uuid_list = [user.uuid, *user.groups_i_can(:read)]
- sanitized_uuid_list = uuid_list.
- collect { |uuid| sanitize(uuid) }.join(', ')
- or_references_me = ''
- if self == Link and user
- or_references_me = "OR (#{table_name}.link_class in (#{sanitize 'permission'}, #{sanitize 'resources'}) AND #{sanitize user.uuid} IN (#{table_name}.head_uuid, #{table_name}.tail_uuid))"
+ # Return a query with read permissions restricted to the union of the
+ # permissions of the members of users_list, i.e. if something is readable by
+ # any user in users_list, it will be readable in the query returned by this
+ # function.
+ def self.readable_by(*users_list)
+ # Get rid of troublesome nils
+ users_list.compact!
+
+ # Check if any of the users are admin. If so, we're done.
+ if users_list.select { |u| u.is_admin }.empty?
+
+ # Collect the uuids for each user and any groups readable by each user.
+ user_uuids = users_list.map { |u| u.uuid }
+ uuid_list = user_uuids + users_list.flat_map { |u| u.groups_i_can(:read) }
+ sanitized_uuid_list = uuid_list.
+ collect { |uuid| sanitize(uuid) }.join(', ')
+ sql_conds = []
+ sql_params = []
+ or_object_uuid = ''
+
+ # The conditions below are OR'd together (see the where() at the
+ # bottom); a row is readable if any one of them holds.
+ #
+ # This row is owned by a member of users_list, or owned by a group
+ # readable by a member of users_list
+ # or
+ # This row uuid is the uuid of a member of users_list
+ # or
+ # A permission link exists ('write' and 'manage' implicitly include
+ # 'read') from a member of users_list, or a group readable by users_list,
+ # to this row, or to the owner of this row (see join() below).
+ sql_conds += ["#{table_name}.owner_uuid in (?)",
+ "#{table_name}.uuid in (?)",
+ "permissions.head_uuid IS NOT NULL"]
+ sql_params += [uuid_list, user_uuids]
+
+ if self == Link and users_list.any?
+ # This row is a 'permission' or 'resources' link class
+ # The uuid for a member of users_list is referenced in either the head
+ # or tail of the link
+ sql_conds += ["(#{table_name}.link_class in (#{sanitize 'permission'}, #{sanitize 'resources'}) AND (#{table_name}.head_uuid IN (?) OR #{table_name}.tail_uuid IN (?)))"]
+ sql_params += [user_uuids, user_uuids]
+ end
+
+ if self == Log and users_list.any?
+ # Link head points to the object described by this row
+ or_object_uuid = ", #{table_name}.object_uuid"
+
+ # This object described by this row is owned by this user, or owned by a group readable by this user
+ sql_conds += ["#{table_name}.object_owner_uuid in (?)"]
+ sql_params += [uuid_list]
+ end
+
+ # Link head points to this row, or to the owner of this row (the thing to be read)
+ #
+ # Link tail originates from this user, or a group that is readable by this
+ # user (the identity with authorization to read)
+ #
+ # Link class is 'permission' ('write' and 'manage' implicitly include 'read')
+
+ joins("LEFT JOIN links permissions ON permissions.head_uuid in (#{table_name}.owner_uuid, #{table_name}.uuid #{or_object_uuid}) AND permissions.tail_uuid in (#{sanitized_uuid_list}) AND permissions.link_class='permission'")
+ .where(sql_conds.join(' OR '), *sql_params).uniq
+
+ else
+ # At least one user is admin, so don't bother to apply any restrictions.
+ self
+ end
end
- joins("LEFT JOIN links permissions ON permissions.head_uuid in (#{table_name}.owner_uuid, #{table_name}.uuid) AND permissions.tail_uuid in (#{sanitized_uuid_list}) AND permissions.link_class='permission'").
- where("?=? OR #{table_name}.owner_uuid in (?) OR #{table_name}.uuid=? OR permissions.head_uuid IS NOT NULL #{or_references_me}",
- true, user.is_admin,
- uuid_list,
- user.uuid)
+
end
def logged_attributes
self.class.kind
end
- def self.readable_by (u)
+ # Override: this model is readable by everyone. Signature widened to a
+ # splat so callers can pass multiple users, matching the new
+ # ArvadosModel.readable_by(*users_list) interface.
+ def self.readable_by (*u)
self
end
log = Log.new(event_type: event_type).fill_object(self)
yield log
log.save!
+ connection.execute "NOTIFY logs, '#{log.id}'"
log_start_state
end
api_accessible :user, extend: :common do |t|
t.add :object_uuid
- t.add :object, :if => :object
+ t.add :object_owner_uuid
t.add :object_kind
t.add :event_at
t.add :event_type
def fill_object(thing)
self.object_uuid ||= thing.uuid
+ self.object_owner_uuid = thing.owner_uuid
self.summary ||= "#{self.event_type} of #{thing.uuid}"
self
end
def ensure_valid_uuids
# logs can have references to deleted objects
end
+
end
common:
#git_repositories_dir: /var/cache/git
#git_internal_dir: /var/cache/arvados/internal.git
+
+ # You can run the websocket server separately from the regular HTTP service
+ # by setting "ARVADOS_WEBSOCKETS=ws-only" in the environment before running
+ # the websocket server. When you do this, you need to set the following
+ # configuration variable so that the primary server can give out the correct
+ # address of the dedicated websocket server:
+ #websocket_address: wss://websocket.local/websocket
--- /dev/null
+Server::Application.configure do
+ # Run token authentication in the Rack stack, early enough (before
+ # Static) that websocket requests see it too.
+ config.middleware.insert_before ActionDispatch::Static, ArvadosApiToken
+ # Re-seat RemoteIp ahead of ArvadosApiToken: the token middleware reads
+ # env["action_dispatch.remote_ip"], so RemoteIp must have run first.
+ config.middleware.delete ActionDispatch::RemoteIp
+ config.middleware.insert_before ArvadosApiToken, ActionDispatch::RemoteIp
+end
--- /dev/null
+require 'eventbus'
+
+Server::Application.configure do
+ # Enables websockets if ARVADOS_WEBSOCKETS is defined with any value. If
+ # ARVADOS_WEBSOCKETS=ws-only, server will only accept websocket connections
+ # and return an error response for all other requests.
+ if ENV['ARVADOS_WEBSOCKETS']
+ # RackSocket goes after ArvadosApiToken so websocket connections are
+ # authenticated before EventBus#on_connect sees them.
+ config.middleware.insert_after ArvadosApiToken, RackSocket, {
+ :handler => EventBus,
+ :mount => "/websocket",
+ :websocket_only => (ENV['ARVADOS_WEBSOCKETS'] == "ws-only")
+ }
+ end
+
+ # Define websocket_address configuration option, can be overridden in config files.
+ # See application.yml.example for details.
+ config.websocket_address = nil
+end
--- /dev/null
+# Adds logs.object_owner_uuid (denormalized owner of the logged object,
+# used by Log.readable_by) and backfills it from each log's recorded
+# attribute snapshots.
+class AddObjectOwnerToLogs < ActiveRecord::Migration
+ include CurrentApiClient
+
+ def up
+ add_column :logs, :object_owner_uuid, :string
+ # Backfill as the system user so permission checks don't interfere.
+ # NOTE(review): Log.all.each loads the whole table into memory
+ # (find_each would batch), and the symbol lookups below assume
+ # properties deserializes with symbol keys — confirm both on a large
+ # production table.
+ act_as_system_user do
+ Log.all.each do |log|
+ if log.properties[:new_attributes]
+ log.object_owner_uuid = log.properties[:new_attributes][:owner_uuid]
+ elsif log.properties[:old_attributes]
+ log.object_owner_uuid = log.properties[:old_attributes][:owner_uuid]
+ end
+ log.save!
+ end
+ end
+ end
+
+ def down
+ remove_column :logs, :object_owner_uuid
+ end
+end
t.datetime "created_at", :null => false
t.datetime "updated_at", :null => false
t.datetime "modified_at"
+ t.string "object_owner_uuid"
end
add_index "logs", ["created_at"], :name => "index_logs_on_created_at"
Thread.current[:api_client_ip_address]
end
- # Is the current API client authorization scoped for the request?
- def current_api_client_auth_has_scope(req_s)
- (current_api_client_authorization.andand.scopes || []).select { |scope|
- if scope == 'all'
- true
- elsif scope.end_with? '/'
- req_s.start_with? scope
- else
- req_s == scope
- end
- }.any?
- end
-
def system_user_uuid
[Server::Application.config.uuid_prefix,
User.uuid_prefix,
--- /dev/null
+require 'eventmachine'
+require 'oj'
+require 'faye/websocket'
+require 'record_filters'
+require 'load_param'
+
+# Patch in user, last_log_id and filters fields into the Faye::Websocket
+# class, so EventBus can keep per-connection state directly on the
+# connection object instead of in a side table.
+module Faye
+  class WebSocket
+    attr_accessor :user
+    attr_accessor :last_log_id
+    attr_accessor :filters
+  end
+end
+
+# Store the filters supplied by the user that will be applied to the logs table
+# to determine which events to return to the listener.
+class Filter
+ include LoadParam
+
+ # @filters is populated by LoadParam#load_filters_param.
+ attr_accessor :filters
+
+ # +p+ the subscribe-message hash; its :filters entry is parsed by
+ # load_filters_param.
+ def initialize p
+ @params = p
+ load_filters_param
+ end
+
+ # Accessor required by LoadParam, which reads from +params+.
+ def params
+ @params
+ end
+end
+
+# Manages websocket connections, accepts subscription messages and publishes
+# log table events.
+class EventBus
+ include CurrentApiClient
+ include RecordFilters
+
+ # used in RecordFilters
+ def model_class
+ Log
+ end
+
+ # Initialize EventBus. Takes no parameters.
+ def initialize
+ @channel = EventMachine::Channel.new
+ @mtx = Mutex.new
+ @bgthread = false
+ end
+
+ # Push out any pending events to the connection +ws+
+ # +id+ the id of the most recent row in the log table, may be nil
+ def push_events ws, id = nil
+ begin
+ # Must have at least one filter set up to receive events
+ if ws.filters.length > 0
+ # Start with log rows readable by user, sorted in ascending order
+ logs = Log.readable_by(ws.user).order("id asc")
+
+ if ws.last_log_id
+ # Client is only interested in log rows that are newer than the
+ # last log row seen by the client.
+ logs = logs.where("logs.id > ?", ws.last_log_id)
+ elsif id
+ # No last log id, so only look at the most recently changed row
+ logs = logs.where("logs.id = ?", id.to_i)
+ else
+ return
+ end
+
+ # Now process filters provided by client. Each subscription's
+ # conditions are OR'd together: a row matching any subscription is
+ # delivered.
+ cond_out = []
+ param_out = []
+ ws.filters.each do |filter|
+ ft = record_filters filter.filters, Log.table_name
+ cond_out += ft[:cond_out]
+ param_out += ft[:param_out]
+ end
+
+ # Add filters to query
+ if cond_out.any?
+ logs = logs.where(cond_out.join(' OR '), *param_out)
+ end
+
+ # Finally execute query and actually send the matching log rows
+ logs.each do |l|
+ ws.send(l.as_api_response.to_json)
+ ws.last_log_id = l.id
+ end
+ elsif id
+ # No filters set up, so just record the sequence number
+ ws.last_log_id = id.to_i
+ end
+ # NOTE(review): rescuing Exception also traps SystemExit and
+ # SignalException; StandardError is usually the safer choice.
+ rescue Exception => e
+ puts "Error publishing event: #{$!}"
+ puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
+ ws.send ({status: 500, message: 'error'}.to_json)
+ ws.close
+ end
+ end
+
+ # Handle inbound subscribe or unsubscribe message.
+ def handle_message ws, event
+ begin
+ # Parse event data as JSON
+ p = (Oj.load event.data).symbolize_keys
+
+ if p[:method] == 'subscribe'
+ # Handle subscribe event
+
+ if p[:last_log_id]
+ # Set or reset the last_log_id. The event bus only reports events
+ # for rows that come after last_log_id.
+ ws.last_log_id = p[:last_log_id].to_i
+ end
+
+ if ws.filters.length < MAX_FILTERS
+ # Add a filter. This gets the :filters field which is the same
+ # format as used for regular index queries.
+ ws.filters << Filter.new(p)
+ ws.send ({status: 200, message: 'subscribe ok'}.to_json)
+
+ # Send any pending events
+ push_events ws
+ else
+ ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
+ end
+
+ elsif p[:method] == 'unsubscribe'
+ # Handle unsubscribe event
+
+ len = ws.filters.length
+ ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
+ if ws.filters.length < len
+ ws.send ({status: 200, message: 'unsubscribe ok'}.to_json)
+ else
+ ws.send ({status: 404, message: 'filter not found'}.to_json)
+ end
+
+ else
+ ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
+ end
+ rescue Oj::Error => e
+ ws.send ({status: 400, message: "malformed request"}.to_json)
+ # NOTE(review): see push_events — rescue Exception is overly broad.
+ rescue Exception => e
+ puts "Error handling message: #{$!}"
+ puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
+ ws.send ({status: 500, message: 'error'}.to_json)
+ ws.close
+ end
+ end
+
+ # Constant maximum number of filters, to avoid silly huge database queries.
+ MAX_FILTERS = 16
+
+ # Called by RackSocket when a new websocket connection has been established.
+ def on_connect ws
+
+ # Disconnect if no valid API token.
+ # current_user is included from CurrentApiClient
+ if not current_user
+ ws.send ({status: 401, message: "Valid API token required"}.to_json)
+ ws.close
+ return
+ end
+
+ # Initialize our custom fields on the websocket connection object.
+ ws.user = current_user
+ ws.filters = []
+ ws.last_log_id = nil
+
+ # Subscribe to internal postgres notifications through @channel. This will
+ # call push_events when a notification comes through.
+ sub = @channel.subscribe do |msg|
+ push_events ws, msg
+ end
+
+ # Set up callback for inbound message dispatch.
+ ws.on :message do |event|
+ handle_message ws, event
+ end
+
+ # Set up socket close callback
+ ws.on :close do |event|
+ @channel.unsubscribe sub
+ # NOTE(review): rebinding the block-local has no effect beyond
+ # dropping this block's reference to the connection.
+ ws = nil
+ end
+
+ # Start up thread to monitor the Postgres database, if none exists already.
+ @mtx.synchronize do
+ unless @bgthread
+ @bgthread = true
+ Thread.new do
+ # from http://stackoverflow.com/questions/16405520/postgres-listen-notify-rails
+ ActiveRecord::Base.connection_pool.with_connection do |connection|
+ conn = connection.instance_variable_get(:@connection)
+ begin
+ conn.async_exec "LISTEN logs"
+ while true
+ # wait_for_notify will block until there is a change
+ # notification from Postgres about the logs table, then push
+ # the notification into the EventMachine channel. Each
+ # websocket connection subscribes to the other end of the
+ # channel and calls #push_events to actually dispatch the
+ # events to the client.
+ conn.wait_for_notify do |channel, pid, payload|
+ @channel.push payload
+ end
+ end
+ ensure
+ # Don't want the connection to still be listening once we return
+ # it to the pool - could result in weird behavior for the next
+ # thread to check it out.
+ conn.async_exec "UNLISTEN *"
+ end
+ end
+ # Listener exited; allow a later connection to spawn a new one.
+ @bgthread = false
+ end
+ end
+ end
+
+ # Since EventMachine is an asynchronous event based dispatcher, #on_connect
+ # does not block but instead returns immediately after having set up the
+ # websocket and notification channel callbacks.
+ end
+end
--- /dev/null
# Mixin module for reading out query parameters from request params.
#
# Expects:
#   +params+ Hash
# Sets:
#   @where, @filters, @limit, @offset, @orders
module LoadParam

  # Default limit on number of rows to return in a single query.
  DEFAULT_LIMIT = 100

  # Load params[:where] into @where.
  #
  # Accepts a Hash, a JSON-encoded object, or nothing (yields an empty
  # @where). Raises ArgumentError if the value cannot be parsed as an
  # object.
  def load_where_param
    if params[:where].nil? or params[:where] == ""
      @where = {}
    elsif params[:where].is_a? Hash
      @where = params[:where]
    elsif params[:where].is_a? String
      begin
        @where = Oj.load(params[:where])
        raise unless @where.is_a? Hash
      rescue
        raise ArgumentError.new("Could not parse \"where\" param as an object")
      end
    end
    @where = @where.with_indifferent_access
  end

  # Load params[:filters] into @filters.
  #
  # Accepts an Array or a JSON-encoded array; appends to any filters
  # already accumulated. Raises ArgumentError on unparseable input.
  def load_filters_param
    @filters ||= []
    if params[:filters].is_a? Array
      @filters += params[:filters]
    elsif params[:filters].is_a? String and !params[:filters].empty?
      begin
        f = Oj.load params[:filters]
        raise unless f.is_a? Array
        @filters += f
      rescue
        raise ArgumentError.new("Could not parse \"filters\" param as an array")
      end
    end
  end

  # Default ordering applied when the client supplies no valid order
  # expressions.
  def default_orders
    ["#{table_name}.modified_at desc"]
  end

  # Load params[:limit], params[:offset] and params[:order]
  # into @limit, @offset, @orders
  def load_limit_offset_order_params
    if params[:limit]
      # \A..\z anchors the whole string; ^..$ match per line, so input
      # like "1\nfoo" would otherwise pass validation.
      unless params[:limit].to_s.match(/\A\d+\z/)
        raise ArgumentError.new("Invalid value for limit parameter")
      end
      @limit = params[:limit].to_i
    else
      @limit = DEFAULT_LIMIT
    end

    if params[:offset]
      unless params[:offset].to_s.match(/\A\d+\z/)
        raise ArgumentError.new("Invalid value for offset parameter")
      end
      @offset = params[:offset].to_i
    else
      @offset = 0
    end

    @orders = []
    if params[:order]
      params[:order].split(',').each do |order|
        attr, direction = order.strip.split " "
        direction ||= 'asc'
        # Only accept orderings on real columns of the model, with an
        # explicit asc/desc direction, to keep SQL fragments safe.
        if attr.match(/\A[a-z][_a-z0-9]+\z/) and
            model_class.columns.collect(&:name).index(attr) and
            ['asc','desc'].index direction.downcase
          @orders << "#{table_name}.#{attr} #{direction.downcase}"
        end
      end
    end
    if @orders.empty?
      # Concatenate, don't push: << would append default_orders as a
      # single nested array element ([["t.modified_at desc"]]).
      @orders += default_orders
    end
  end


end
--- /dev/null
# Mixin module providing a method to convert filters into a list of SQL
# fragments suitable to be fed to ActiveRecord #where.
#
# Expects:
#   model_class
# Operates on:
#   @objects
module RecordFilters

  # Input:
  # +filters+        Arvados filters as list of lists,
  #                  each [attribute, operator, operand].
  # +ar_table_name+  name of SQL table
  #
  # Output:
  # Hash with two keys:
  # :cond_out  array of SQL fragments for each filter expression
  # :param_out array of values for parameter substitution in cond_out
  #
  # Raises ArgumentError on malformed filters, unknown attributes, and
  # unsupported operators.
  def record_filters filters, ar_table_name
    cond_out = []
    param_out = []

    filters.each do |filter|
      # Validate the filter's shape before destructuring it.
      if !filter.is_a? Array
        raise ArgumentError.new("Invalid element in filters array: #{filter.inspect} is not an array")
      end
      attr, operator, operand = filter
      if !operator.is_a? String
        raise ArgumentError.new("Invalid operator '#{operator}' (#{operator.class}) in filter")
      elsif !model_class.searchable_columns(operator).index attr.to_s
        raise ArgumentError.new("Invalid attribute '#{attr}' in filter")
      end
      case operator.downcase
      when '=', '<', '<=', '>', '>=', 'like'
        if operand.is_a? String
          cond_out << "#{ar_table_name}.#{attr} #{operator} ?"
          if (# any operator that operates on value rather than
              # representation:
              operator.match(/[<=>]/) and
              model_class.attribute_column(attr).type == :datetime)
            operand = Time.parse operand
          end
          param_out << operand
        elsif operand.nil? and operator == '='
          cond_out << "#{ar_table_name}.#{attr} is null"
        else
          raise ArgumentError.new("Invalid operand type '#{operand.class}' "\
                                  "for '#{operator}' operator in filters")
        end
      when 'in'
        if operand.is_a? Array
          cond_out << "#{ar_table_name}.#{attr} IN (?)"
          param_out << operand
        else
          raise ArgumentError.new("Invalid operand type '#{operand.class}' "\
                                  "for '#{operator}' operator in filters")
        end
      when 'is_a'
        # Match each requested class by its UUID prefix pattern; an
        # unrecognized class can never match (1=0).
        operand = [operand] unless operand.is_a? Array
        cond = []
        operand.each do |op|
          cl = ArvadosModel::kind_class op
          if cl
            cond << "#{ar_table_name}.#{attr} like ?"
            param_out << cl.uuid_like_pattern
          else
            cond << "1=0"
          end
        end
        cond_out << cond.join(' OR ')
      else
        # Previously, unrecognized operators fell through the case and
        # were silently ignored, making a typo in a filter behave like
        # "no filter". Reject them explicitly instead.
        raise ArgumentError.new("Invalid operator '#{operator}' in filter")
      end
    end

    {:cond_out => cond_out, :param_out => param_out}
  end

end
api_token: 27bnddk6x2nmq00a1e3gq43n9tsl5v87a3faqar2ijj8tud5en
expires_at: 2038-01-01 00:00:00
+active_noscope:
+ api_client: untrusted
+ user: active
+ api_token: activenoscopeabcdefghijklmnopqrstuvwxyz12345678901
+ expires_at: 2038-01-01 00:00:00
+ scopes: []
+
admin_vm:
api_client: untrusted
user: admin
api_token: zw2f4gwx8hw8cjre7yp6v1zylhrhn3m5gvjq73rtpwhmknrybu
expires_at: 2038-01-01 00:00:00
+spectator_specimens:
+ api_client: untrusted
+ user: spectator
+ api_token: spectatorspecimensabcdefghijklmnopqrstuvwxyz123245
+ expires_at: 2038-01-01 00:00:00
+ scopes: ["GET /arvados/v1/specimens", "GET /arvados/v1/specimens/",
+ "POST /arvados/v1/specimens"]
+
inactive:
api_client: untrusted
user: inactive
log1:
+ id: 1
uuid: zzzzz-xxxxx-pshmckwoma9plh7
- object_uuid: zzzzz-tpzed-l1s2piq4t4mps8r
\ No newline at end of file
+ object_uuid: zzzzz-tpzed-l1s2piq4t4mps8r
+
+log2: # admin changes repository2, which is owned by active user
+ id: 2
+ uuid: zzzzz-xxxxx-pshmckwoma00002
+ owner_uuid: zzzzz-tpzed-d9tiejq69daie8f # admin user
+ object_uuid: zzzzz-2x53u-382brsig8rp3667 # repository foo
+ object_owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz # active user
+
+log3: # admin changes specimen owned_by_spectator
+ id: 3
+ uuid: zzzzz-xxxxx-pshmckwoma00003
+ owner_uuid: zzzzz-tpzed-d9tiejq69daie8f # admin user
+ object_uuid: zzzzz-2x53u-3b0xxwzlbzxq5yr # specimen owned_by_spectator
+ object_owner_uuid: zzzzz-tpzed-l1s2piq4t4mps8r # spectator user
+
+log4: # foo collection added, readable by active through link
+ id: 4
+ uuid: zzzzz-xxxxx-pshmckwoma00004
+ owner_uuid: zzzzz-tpzed-000000000000000 # system user
+ object_uuid: 1f4b0bc7583c2a7f9102c395f4ffc5e3+45 # foo file
+ object_owner_uuid: zzzzz-tpzed-000000000000000 # system user
+
+log5: # baz collection added, readable by active and spectator through group 'all users' group membership
+ id: 5
+ uuid: zzzzz-xxxxx-pshmckwoma00005
+ owner_uuid: zzzzz-tpzed-000000000000000 # system user
+ object_uuid: ea10d51bcf88862dbcc36eb292017dfd+45 # baz file
+ object_owner_uuid: zzzzz-tpzed-000000000000000 # system user
foo:
uuid: zzzzz-2x53u-382brsig8rp3666
- owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz # active user
name: foo
+
+repository2:
+ uuid: zzzzz-2x53u-382brsig8rp3667
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz # active user
+ name: foo2
class Arvados::V1::ApiTokensScopeTest < ActionController::IntegrationTest
fixtures :all
- def setup
- @token = {}
- end
-
- def auth_with(name)
- @token = {api_token: api_client_authorizations(name).api_token}
- end
-
def v1_url(*parts)
(['arvados', 'v1'] + parts).join('/')
end
- def request_with_auth(method, path, params={})
- send(method, path, @token.merge(params))
- end
-
- def get_with_auth(*args)
- request_with_auth(:get_via_redirect, *args)
- end
-
- def post_with_auth(*args)
- request_with_auth(:post_via_redirect, *args)
- end
-
test "user list token can only list users" do
- auth_with :active_userlist
- get_with_auth v1_url('users')
+ get_args = [{}, auth(:active_userlist)]
+ get(v1_url('users'), *get_args)
assert_response :success
- get_with_auth v1_url('users', '') # Add trailing slash.
+ get(v1_url('users', ''), *get_args) # Add trailing slash.
assert_response :success
- get_with_auth v1_url('users', 'current')
+ get(v1_url('users', 'current'), *get_args)
assert_response 403
- get_with_auth v1_url('virtual_machines')
+ get(v1_url('virtual_machines'), *get_args)
assert_response 403
end
test "specimens token can see exactly owned specimens" do
- auth_with :active_specimens
- get_with_auth v1_url('specimens')
+ get_args = [{}, auth(:active_specimens)]
+ get(v1_url('specimens'), *get_args)
assert_response 403
- get_with_auth v1_url('specimens', specimens(:owned_by_active_user).uuid)
+ get(v1_url('specimens', specimens(:owned_by_active_user).uuid), *get_args)
assert_response :success
- get_with_auth v1_url('specimens', specimens(:owned_by_spectator).uuid)
+ get(v1_url('specimens', specimens(:owned_by_spectator).uuid), *get_args)
assert_includes(403..404, @response.status)
end
test "token with multiple scopes can use them all" do
def get_token_count
- get_with_auth v1_url('api_client_authorizations')
+ get(v1_url('api_client_authorizations'), {}, auth(:active_apitokens))
assert_response :success
token_count = JSON.parse(@response.body)['items_available']
assert_not_nil(token_count, "could not find token count")
token_count
end
- auth_with :active_apitokens
# Test the GET scope.
token_count = get_token_count
# Test the POST scope.
- post_with_auth(v1_url('api_client_authorizations'),
- api_client_authorization: {user_id: users(:active).id})
+ post(v1_url('api_client_authorizations'),
+ {api_client_authorization: {user_id: users(:active).id}},
+ auth(:active_apitokens))
assert_response :success
assert_equal(token_count + 1, get_token_count,
"token count suggests POST was not accepted")
# Test other requests are denied.
- get_with_auth v1_url('api_client_authorizations',
- api_client_authorizations(:active_apitokens).uuid)
+ get(v1_url('api_client_authorizations',
+ api_client_authorizations(:active_apitokens).uuid),
+ {}, auth(:active_apitokens))
assert_response 403
end
test "token without scope has no access" do
# Logs are good for this test, because logs have relatively
# few access controls enforced at the model level.
- auth_with :admin_noscope
- get_with_auth v1_url('logs')
+ req_args = [{}, auth(:admin_noscope)]
+ get(v1_url('logs'), *req_args)
assert_response 403
- get_with_auth v1_url('logs', logs(:log1).uuid)
+ get(v1_url('logs', logs(:log1).uuid), *req_args)
assert_response 403
- post_with_auth(v1_url('logs'), log: {})
+ post(v1_url('logs'), *req_args)
assert_response 403
end
def vm_logins_url(name)
v1_url('virtual_machines', virtual_machines(name).uuid, 'logins')
end
- auth_with :admin_vm
- get_with_auth vm_logins_url(:testvm)
+ get_args = [{}, auth(:admin_vm)]
+ get(vm_logins_url(:testvm), *get_args)
assert_response :success
- get_with_auth vm_logins_url(:testvm2)
- assert(@response.status >= 400, "getting testvm2 logins should have failed")
+ get(vm_logins_url(:testvm2), *get_args)
+ assert_includes(400..419, @response.status,
+ "getting testvm2 logins should have failed")
end
end
--- /dev/null
+require 'test_helper'
+
# Integration tests for the reader_tokens request parameter: additional
# API tokens a client may supply to gain extra read-only permissions for
# a single request, without replacing its primary credential.
class Arvados::V1::ReaderTokensTest < ActionController::IntegrationTest
  fixtures :all

  # UUID of the fixture specimen owned by the spectator user (which the
  # active user cannot read directly — see the first test below).
  def spectator_specimen
    specimens(:owned_by_spectator).uuid
  end

  # GET the specimen index authenticated as +main_auth+ (unauthenticated
  # when nil), optionally passing +read_auth+'s token via reader_tokens.
  # +formatter+ selects how the token list is serialized: :to_a sends a
  # form-encoded array, :to_json sends a JSON string.
  def get_specimens(main_auth, read_auth, formatter=:to_a)
    params = {}
    params[:reader_tokens] = [api_token(read_auth)].send(formatter) if read_auth
    headers = {}
    headers.merge!(auth(main_auth)) if main_auth
    get('/arvados/v1/specimens', params, headers)
  end

  # Like get_specimens, but asserts the request succeeded and returns
  # the UUIDs of the specimens listed in the response.
  def get_specimen_uuids(main_auth, read_auth, formatter=:to_a)
    get_specimens(main_auth, read_auth, formatter)
    assert_response :success
    json_response['items'].map { |spec| spec['uuid'] }
  end

  # Assert that POSTing a specimen while supplying a reader token is
  # denied: 403 when a primary credential is present, 401 when not.
  def assert_post_denied(main_auth, read_auth, formatter=:to_a)
    if main_auth
      headers = auth(main_auth)
      expected = 403
    else
      headers = {}
      expected = 401
    end
    post('/arvados/v1/specimens.json',
         {specimen: {}, reader_tokens: [api_token(read_auth)].send(formatter)},
         headers)
    assert_response expected
  end

  test "active user can't see spectator specimen" do
    # Other tests in this suite assume that the active user doesn't
    # have read permission to the owned_by_spectator specimen.
    # This test checks that this assumption still holds.
    refute_includes(get_specimen_uuids(:active, nil), spectator_specimen,
                    ["active user can read the owned_by_spectator specimen",
                     "other tests will return false positives"].join(" - "))
  end

  # Reader tokens must grant read access whether the primary credential
  # is absent or merely scope-less, and whether the token list is sent as
  # a form array or a JSON string — but must never grant write access.
  [nil, :active_noscope].each do |main_auth|
    [:spectator, :spectator_specimens].each do |read_auth|
      test "#{main_auth} auth with reader token #{read_auth} can read" do
        assert_includes(get_specimen_uuids(main_auth, read_auth),
                        spectator_specimen, "did not find spectator specimen")
      end

      test "#{main_auth} auth with JSON read token #{read_auth} can read" do
        assert_includes(get_specimen_uuids(main_auth, read_auth, :to_json),
                        spectator_specimen, "did not find spectator specimen")
      end

      test "#{main_auth} auth with reader token #{read_auth} can't write" do
        assert_post_denied(main_auth, read_auth)
      end

      test "#{main_auth} auth with JSON read token #{read_auth} can't write" do
        assert_post_denied(main_auth, read_auth, :to_json)
      end
    end
  end

  test "scopes are still limited with reader tokens" do
    # The primary token's scope restrictions still apply even when a
    # reader token is supplied for another resource type.
    get('/arvados/v1/collections',
        {reader_tokens: [api_token(:spectator_specimens)]},
        auth(:active_noscope))
    assert_response 403
  end

  test "reader tokens grant no permissions when expired" do
    get_specimens(:active_noscope, :expired)
    assert_response 403
  end

  test "reader tokens grant no permissions outside their scope" do
    refute_includes(get_specimen_uuids(:active, :admin_vm), spectator_specimen,
                    "scoped reader token granted permissions out of scope")
  end
end
--- /dev/null
+require 'test_helper'
+require 'websocket_runner'
+require 'oj'
+require 'database_cleaner'
+
+DatabaseCleaner.strategy = :truncation
+
# Integration tests for the websocket event bus. Each test connects a
# real Faye websocket client to the test server on port 3002 (started by
# websocket_runner) and drives a small per-test state machine through
# subscribe/unsubscribe requests and log-event notifications.
class WebsocketTest < ActionDispatch::IntegrationTest
  # Records are created from this process but must be visible to the
  # separate server process, so transactional fixtures cannot be used;
  # the database is cleaned by truncation instead (see setup/teardown).
  self.use_transactional_fixtures = false

  setup do
    DatabaseCleaner.start
  end

  teardown do
    DatabaseCleaner.clean
  end

  # Open a websocket connection to the test server (authenticated with
  # the +token+ fixture's api_token when given), yield the client so the
  # caller can install its handlers, then run the EventMachine loop
  # until the socket closes. When +timeout+ is true the loop is aborted
  # (and the test failed) after 3 seconds. Asserts that the socket
  # opened and closed normally (close code 1000).
  def ws_helper (token = nil, timeout = true)
    opened = false
    close_status = nil
    too_long = false

    EM.run {
      if token
        ws = Faye::WebSocket::Client.new("ws://localhost:3002/websocket?api_token=#{api_client_authorizations(token).api_token}")
      else
        ws = Faye::WebSocket::Client.new("ws://localhost:3002/websocket")
      end

      ws.on :open do |event|
        opened = true
        if timeout
          EM::Timer.new 3 do
            too_long = true
            EM.stop_event_loop
          end
        end
      end

      ws.on :close do |event|
        close_status = [:close, event.code, event.reason]
        EM.stop_event_loop
      end

      yield ws
    }

    assert opened, "Should have opened web socket"
    assert (not too_long), "Test took too long"
    assert_equal 1000, close_status[1], "Connection closed unexpectedly (check log for errors)"
  end

  test "connect with no token" do
    status = nil

    # Server should answer an unauthenticated connection with a 401
    # status message.
    ws_helper do |ws|
      ws.on :message do |event|
        d = Oj.load event.data
        status = d["status"]
        ws.close
      end
    end

    assert_equal 401, status
  end


  test "connect, subscribe and get response" do
    status = nil

    ws_helper :admin do |ws|
      ws.on :open do |event|
        ws.send ({method: 'subscribe'}.to_json)
      end

      ws.on :message do |event|
        d = Oj.load event.data
        status = d["status"]
        ws.close
      end
    end

    assert_equal 200, status
  end

  test "connect, subscribe, get event" do
    # state 1: expect subscribe ack, then create a record.
    # state 2: expect the event for the record just created.
    state = 1
    spec = nil
    ev_uuid = nil

    authorize_with :admin

    ws_helper :admin do |ws|
      ws.on :open do |event|
        ws.send ({method: 'subscribe'}.to_json)
      end

      ws.on :message do |event|
        d = Oj.load event.data
        case state
        when 1
          assert_equal 200, d["status"]
          spec = Specimen.create
          state = 2
        when 2
          ev_uuid = d["object_uuid"]
          ws.close
        end
      end

    end

    assert_not_nil spec
    assert_equal spec.uuid, ev_uuid
  end

  test "connect, subscribe, get two events" do
    # Events must arrive in creation order: specimen first, human second.
    state = 1
    spec = nil
    human = nil
    spec_ev_uuid = nil
    human_ev_uuid = nil

    authorize_with :admin

    ws_helper :admin do |ws|
      ws.on :open do |event|
        ws.send ({method: 'subscribe'}.to_json)
      end

      ws.on :message do |event|
        d = Oj.load event.data
        case state
        when 1
          assert_equal 200, d["status"]
          spec = Specimen.create
          human = Human.create
          state = 2
        when 2
          spec_ev_uuid = d["object_uuid"]
          state = 3
        when 3
          human_ev_uuid = d["object_uuid"]
          state = 4
          ws.close
        when 4
          assert false, "Should not get any more events"
        end
      end

    end

    assert_not_nil spec
    assert_not_nil human
    assert_equal spec.uuid, spec_ev_uuid
    assert_equal human.uuid, human_ev_uuid
  end

  test "connect, subscribe, filter events" do
    # With an is_a filter on arvados#human, the Specimen creation must
    # be filtered out and only the Human event delivered.
    state = 1
    human = nil
    human_ev_uuid = nil

    authorize_with :admin

    ws_helper :admin do |ws|
      ws.on :open do |event|
        ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
      end

      ws.on :message do |event|
        d = Oj.load event.data
        case state
        when 1
          assert_equal 200, d["status"]
          Specimen.create
          human = Human.create
          state = 2
        when 2
          human_ev_uuid = d["object_uuid"]
          state = 3
          ws.close
        when 3
          assert false, "Should not get any more events"
        end
      end

    end

    assert_not_nil human
    assert_equal human.uuid, human_ev_uuid
  end


  test "connect, subscribe, multiple filters" do
    # Two subscriptions with different filters: events matching either
    # filter are delivered (Trait matches neither and must be dropped).
    state = 1
    spec = nil
    human = nil
    spec_ev_uuid = nil
    human_ev_uuid = nil

    authorize_with :admin

    ws_helper :admin do |ws|
      ws.on :open do |event|
        ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
        ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#specimen']]}.to_json)
      end

      ws.on :message do |event|
        d = Oj.load event.data
        case state
        when 1
          assert_equal 200, d["status"]
          state = 2
        when 2
          assert_equal 200, d["status"]
          spec = Specimen.create
          Trait.create # not part of filters, should not be received
          human = Human.create
          state = 3
        when 3
          spec_ev_uuid = d["object_uuid"]
          state = 4
        when 4
          human_ev_uuid = d["object_uuid"]
          state = 5
          ws.close
        when 5
          assert false, "Should not get any more events"
        end
      end

    end

    assert_not_nil spec
    assert_not_nil human
    assert_equal spec.uuid, spec_ev_uuid
    assert_equal human.uuid, human_ev_uuid
  end

  test "connect, subscribe, ask events starting at seq num" do
    # Subscribing with last_log_id should replay the catch-up events
    # (log4 and log5 fixtures) that come after the given log id.
    state = 1
    human = nil
    human_ev_uuid = nil

    authorize_with :admin

    lastid = logs(:log3).id
    l1 = nil
    l2 = nil

    ws_helper :admin do |ws|
      ws.on :open do |event|
        ws.send ({method: 'subscribe', last_log_id: lastid}.to_json)
      end

      ws.on :message do |event|
        d = Oj.load event.data
        case state
        when 1
          assert_equal 200, d["status"]
          state = 2
        when 2
          l1 = d["object_uuid"]
          assert_not_nil l1, "Unexpected message: #{d}"
          state = 3
        when 3
          l2 = d["object_uuid"]
          assert_not_nil l2, "Unexpected message: #{d}"
          state = 4
          ws.close
        when 4
          assert false, "Should not get any more events"
        end
      end

    end

    assert_equal logs(:log4).object_uuid, l1
    assert_equal logs(:log5).object_uuid, l2
  end

  test "connect, subscribe, get event, unsubscribe" do
    state = 1
    spec = nil
    spec_ev_uuid = nil
    filter_id = nil

    authorize_with :admin

    # timeout=false: this test manages its own timers and close.
    ws_helper :admin, false do |ws|
      ws.on :open do |event|
        ws.send ({method: 'subscribe'}.to_json)
        EM::Timer.new 3 do
          # Set a time limit on the test because after unsubscribing the server
          # still has to process the next event (and then hopefully correctly
          # decides not to send it because we unsubscribed.)
          ws.close
        end
      end

      ws.on :message do |event|
        d = Oj.load event.data
        case state
        when 1
          assert_equal 200, d["status"]
          spec = Specimen.create
          state = 2
        when 2
          spec_ev_uuid = d["object_uuid"]
          ws.send ({method: 'unsubscribe'}.to_json)

          EM::Timer.new 1 do
            Specimen.create
          end

          state = 3
        when 3
          assert_equal 200, d["status"]
          state = 4
        when 4
          assert false, "Should not get any more events"
        end
      end

    end

    assert_not_nil spec
    assert_equal spec.uuid, spec_ev_uuid
  end

  test "connect, subscribe, get event, unsubscribe with filter" do
    # Unsubscribing must specify the same filter that was subscribed.
    state = 1
    spec = nil
    spec_ev_uuid = nil

    authorize_with :admin

    ws_helper :admin, false do |ws|
      ws.on :open do |event|
        ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
        EM::Timer.new 3 do
          # Set a time limit on the test because after unsubscribing the server
          # still has to process the next event (and then hopefully correctly
          # decides not to send it because we unsubscribed.)
          ws.close
        end
      end

      ws.on :message do |event|
        d = Oj.load event.data
        case state
        when 1
          assert_equal 200, d["status"]
          spec = Human.create
          state = 2
        when 2
          spec_ev_uuid = d["object_uuid"]
          ws.send ({method: 'unsubscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)

          EM::Timer.new 1 do
            Human.create
          end

          state = 3
        when 3
          assert_equal 200, d["status"]
          state = 4
        when 4
          assert false, "Should not get any more events"
        end
      end

    end

    assert_not_nil spec
    assert_equal spec.uuid, spec_ev_uuid
  end


  test "connect, subscribe, get event, try to unsubscribe with bogus filter" do
    # Unsubscribing with a filter that was never subscribed must return
    # 404 and leave the original subscription active.
    state = 1
    spec = nil
    spec_ev_uuid = nil
    human = nil
    human_ev_uuid = nil

    authorize_with :admin

    ws_helper :admin do |ws|
      ws.on :open do |event|
        ws.send ({method: 'subscribe'}.to_json)
      end

      ws.on :message do |event|
        d = Oj.load event.data
        case state
        when 1
          assert_equal 200, d["status"]
          spec = Specimen.create
          state = 2
        when 2
          spec_ev_uuid = d["object_uuid"]
          ws.send ({method: 'unsubscribe', filters: [['foo', 'bar', 'baz']]}.to_json)

          EM::Timer.new 1 do
            human = Human.create
          end

          state = 3
        when 3
          assert_equal 404, d["status"]
          state = 4
        when 4
          human_ev_uuid = d["object_uuid"]
          state = 5
          ws.close
        when 5
          assert false, "Should not get any more events"
        end
      end

    end

    assert_not_nil spec
    assert_not_nil human
    assert_equal spec.uuid, spec_ev_uuid
    assert_equal human.uuid, human_ev_uuid
  end



  test "connected, not subscribed, no event" do
    # Without a subscribe request, record creation must produce no
    # messages at all.
    authorize_with :admin

    ws_helper :admin, false do |ws|
      ws.on :open do |event|
        EM::Timer.new 1 do
          Specimen.create
        end

        EM::Timer.new 3 do
          ws.close
        end
      end

      ws.on :message do |event|
        assert false, "Should not get any messages, message was #{event.data}"
      end
    end
  end

  test "connected, not authorized to see event" do
    # The :active user is subscribed but has no read permission on the
    # admin-owned Specimen, so only the subscribe ack may arrive.
    state = 1

    authorize_with :admin

    ws_helper :active, false do |ws|
      ws.on :open do |event|
        ws.send ({method: 'subscribe'}.to_json)

        EM::Timer.new 3 do
          ws.close
        end
      end

      ws.on :message do |event|
        d = Oj.load event.data
        case state
        when 1
          assert_equal 200, d["status"]
          Specimen.create
          state = 2
        when 2
          assert false, "Should not get any messages, message was #{event.data}"
        end
      end

    end

  end

  test "connect, try bogus method" do
    status = nil

    ws_helper :admin do |ws|
      ws.on :open do |event|
        ws.send ({method: 'frobnabble'}.to_json)
      end

      ws.on :message do |event|
        d = Oj.load event.data
        status = d["status"]
        ws.close
      end
    end

    assert_equal 400, status
  end

  test "connect, missing method" do
    status = nil

    ws_helper :admin do |ws|
      ws.on :open do |event|
        ws.send ({fizzbuzz: 'frobnabble'}.to_json)
      end

      ws.on :message do |event|
        d = Oj.load event.data
        status = d["status"]
        ws.close
      end
    end

    assert_equal 400, status
  end

  test "connect, send malformed request" do
    status = nil

    # Non-JSON payloads must be rejected with a 400 status message.
    ws_helper :admin do |ws|
      ws.on :open do |event|
        ws.send '<XML4EVER></XML4EVER>'
      end

      ws.on :message do |event|
        d = Oj.load event.data
        status = d["status"]
        ws.close
      end
    end

    assert_equal 400, status
  end


  test "connect, try subscribe too many filters" do
    # Send 17 subscribe requests; the first EventBus::MAX_FILTERS are
    # acknowledged with 200, the next one must be refused with 403.
    # NOTE(review): the final assertion assumes MAX_FILTERS == 16 — the
    # loop sends 17 requests and expects exactly one refusal.
    state = 1

    authorize_with :admin

    ws_helper :admin do |ws|
      ws.on :open do |event|
        (1..17).each do |i|
          ws.send ({method: 'subscribe', filters: [['object_uuid', '=', i]]}.to_json)
        end
      end

      ws.on :message do |event|
        d = Oj.load event.data
        case state
        when (1..EventBus::MAX_FILTERS)
          assert_equal 200, d["status"]
          state += 1
        when (EventBus::MAX_FILTERS+1)
          assert_equal 403, d["status"]
          ws.close
        end
      end

    end

    assert_equal 17, state

  end

end
require File.expand_path('../../config/environment', __FILE__)
require 'rails/test_help'
+module ArvadosTestSupport
+ def json_response
+ @json_response ||= ActiveSupport::JSON.decode @response.body
+ end
+
+ def api_token(api_client_auth_name)
+ api_client_authorizations(api_client_auth_name).api_token
+ end
+
+ def auth(api_client_auth_name)
+ {'HTTP_AUTHORIZATION' => "OAuth2 #{api_token(api_client_auth_name)}"}
+ end
+end
+
class ActiveSupport::TestCase
# Setup all fixtures in test/fixtures/*.(yml|csv) for all tests in alphabetical order.
#
# -- they do not yet inherit this setting
fixtures :all
+ include ArvadosTestSupport
+
teardown do
Thread.current[:api_client_ip_address] = nil
Thread.current[:api_client_authorization] = nil
self.request.headers["Accept"] = "text/json"
end
- def json_response
- @json_response ||= ActiveSupport::JSON.decode @response.body
- end
-
def authorize_with(api_client_auth_name)
- self.request.env['HTTP_AUTHORIZATION'] = "OAuth2 #{api_client_authorizations(api_client_auth_name).api_token}"
+ ArvadosApiToken.new.call ({"rack.input" => "", "HTTP_AUTHORIZATION" => "OAuth2 #{api_client_authorizations(api_client_auth_name).api_token}"})
end
-
- # Add more helper methods to be used by all tests here...
end
class ActionDispatch::IntegrationTest
-
teardown do
Thread.current[:api_client_ip_address] = nil
Thread.current[:api_client_authorization] = nil
Thread.current[:api_client] = nil
Thread.current[:user] = nil
end
-
- def auth auth_fixture
- {'HTTP_AUTHORIZATION' => "OAuth2 #{api_client_authorizations(auth_fixture).api_token}"}
- end
end
# Ensure permissions are computed from the test fixtures.
auth.destroy
assert_auth_logged_with_clean_properties(auth, :destroy)
end
+
  # Log.readable_by must expose exactly the log entries each user is
  # permitted to see (fixtures log1..log5): admins see all five; other
  # users see only logs whose subject object they own or can read via
  # permission links or group membership.
  test "use ownership and permission links to determine which logs a user can see" do
    c = Log.readable_by(users(:admin)).order("id asc").each.to_a
    assert_equal 5, c.size
    assert_equal 1, c[0].id # no-op
    assert_equal 2, c[1].id # admin changes repository foo, which is owned by active user
    assert_equal 3, c[2].id # admin changes specimen owned_by_spectator
    assert_equal 4, c[3].id # foo collection added, readable by active through link
    assert_equal 5, c[4].id # baz collection added, readable by active and spectator through group 'all users' group membership

    c = Log.readable_by(users(:active)).order("id asc").each.to_a
    assert_equal 3, c.size
    assert_equal 2, c[0].id # admin changes repository foo, which is owned by active user
    assert_equal 4, c[1].id # foo collection added, readable by active through link
    assert_equal 5, c[2].id # baz collection added, readable by active and spectator through group 'all users' group membership

    c = Log.readable_by(users(:spectator)).order("id asc").each.to_a
    assert_equal 2, c.size
    assert_equal 3, c[0].id # admin changes specimen owned_by_spectator
    assert_equal 5, c[1].id # baz collection added, readable by active and spectator through group 'all users' group membership
  end
end
--- /dev/null
+require 'bundler'
+
+$ARV_API_SERVER_DIR = File.expand_path('../..', __FILE__)
+SERVER_PID_PATH = 'tmp/pids/passenger.3002.pid'
+
# Custom MiniTest runner that boots the websockets server under
# passenger before the suite runs and tears it down afterwards.
class WebsocketTestRunner < MiniTest::Unit
  # Run a shell command with the environment needed to start the
  # websockets-only server in test mode. Raises RuntimeError if the
  # command exits non-zero.
  def _system(*cmd)
    Bundler.with_clean_env do
      if not system({'ARVADOS_WEBSOCKETS' => 'ws-only', 'RAILS_ENV' => 'test'}, *cmd)
        raise RuntimeError, "#{cmd[0]} returned exit code #{$?.exitstatus}"
      end
    end
  end

  # Start the server, poll (up to 10 seconds) until its pidfile contains
  # a live pid, run the test suite, then TERM the server.
  def _run(args=[])
    server_pid = Dir.chdir($ARV_API_SERVER_DIR) do |apidir|
      # Only passenger seems to be able to run the websockets server successfully.
      _system('passenger', 'start', '-d', '-p3002')
      timeout = Time.now.tv_sec + 10
      begin
        sleep 0.2
        begin
          server_pid = IO.read(SERVER_PID_PATH).to_i
          # Use && (not `and`): with the low-precedence `and`, good_pid
          # was assigned from the first comparison alone and the
          # liveness probe's result was discarded. Also probe the pid we
          # just read (`server_pid`; the previous `pid` was undefined).
          # Signal 0 checks process existence without sending a signal.
          good_pid = (server_pid > 0) && (Process.kill(0, server_pid) rescue false)
        rescue Errno::ENOENT
          # Pidfile not written yet; keep polling.
          good_pid = false
        end
      end while (not good_pid) and (Time.now.tv_sec < timeout)
      if not good_pid
        raise RuntimeError, "could not find API server Rails pid"
      end
      server_pid
    end
    begin
      super(args)
    ensure
      # Always shut the server down, even if the suite raised.
      Process.kill('TERM', server_pid)
    end
  end
end
+
+MiniTest::Unit.runner = WebsocketTestRunner.new
"log"
"net/http"
"os"
- "path/filepath"
"regexp"
- "strconv"
"strings"
"syscall"
- "time"
)
// ======================
var PROC_MOUNTS = "/proc/mounts"
-var KeepVolumes []string
+// The Keep VolumeManager maintains a list of available volumes.
+var KeepVM VolumeManager
// ==========
// Error types.
// directories.
var listen, volumearg string
+ var serialize_io bool
flag.StringVar(&listen, "listen", DEFAULT_ADDR,
"interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.")
flag.StringVar(&volumearg, "volumes", "",
"Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.")
+ flag.BoolVar(&serialize_io, "serialize", false,
+ "If set, all read and write operations on local Keep volumes will be serialized.")
flag.Parse()
// Look for local keep volumes.
}
// Check that the specified volumes actually exist.
- KeepVolumes = []string(nil)
+ var goodvols []Volume = nil
for _, v := range keepvols {
if _, err := os.Stat(v); err == nil {
log.Println("adding Keep volume:", v)
- KeepVolumes = append(KeepVolumes, v)
+ newvol := MakeUnixVolume(v, serialize_io)
+ goodvols = append(goodvols, &newvol)
} else {
log.Printf("bad Keep volume: %s\n", err)
}
}
- if len(KeepVolumes) == 0 {
+ if len(goodvols) == 0 {
log.Fatal("could not find any keep volumes")
}
+ // Start a round-robin VolumeManager with the volumes we have found.
+ KeepVM = MakeRRVolumeManager(goodvols)
+
// Set up REST handlers.
//
// Start with a router that will route each URL path to an
func IndexHandler(w http.ResponseWriter, req *http.Request) {
prefix := mux.Vars(req)["prefix"]
- index := IndexLocators(prefix)
+ var index string
+ for _, vol := range KeepVM.Volumes() {
+ index = index + vol.Index(prefix)
+ }
w.Write([]byte(index))
}
func GetNodeStatus() *NodeStatus {
st := new(NodeStatus)
- st.Volumes = make([]*VolumeStatus, len(KeepVolumes))
- for i, vol := range KeepVolumes {
- st.Volumes[i] = GetVolumeStatus(vol)
+ st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
+ for i, vol := range KeepVM.Volumes() {
+ st.Volumes[i] = vol.Status()
}
return st
}
return &VolumeStatus{volume, devnum, free, used}
}
-// IndexLocators
-// Returns a string containing a list of locator ids found on this
-// Keep server. If {prefix} is given, return only those locator
-// ids that begin with the given prefix string.
-//
-// The return string consists of a sequence of newline-separated
-// strings in the format
-//
-// locator+size modification-time
-//
-// e.g.:
-//
-// e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
-// e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
-// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
-//
-func IndexLocators(prefix string) string {
- var output string
- for _, vol := range KeepVolumes {
- filepath.Walk(vol,
- func(path string, info os.FileInfo, err error) error {
- // This WalkFunc inspects each path in the volume
- // and prints an index line for all files that begin
- // with prefix.
- if err != nil {
- log.Printf("IndexHandler: %s: walking to %s: %s",
- vol, path, err)
- return nil
- }
- locator := filepath.Base(path)
- // Skip directories that do not match prefix.
- // We know there is nothing interesting inside.
- if info.IsDir() &&
- !strings.HasPrefix(locator, prefix) &&
- !strings.HasPrefix(prefix, locator) {
- return filepath.SkipDir
- }
- // Skip any file that is not apparently a locator, e.g. .meta files
- if is_valid, err := IsValidLocator(locator); err != nil {
- return err
- } else if !is_valid {
- return nil
- }
- // Print filenames beginning with prefix
- if !info.IsDir() && strings.HasPrefix(locator, prefix) {
- output = output + fmt.Sprintf(
- "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
- }
- return nil
- })
- }
-
- return output
-}
-
func GetBlock(hash string) ([]byte, error) {
- var buf = make([]byte, BLOCKSIZE)
-
// Attempt to read the requested hash from a keep volume.
- for _, vol := range KeepVolumes {
- var f *os.File
- var err error
- var nread int
-
- blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
-
- f, err = os.Open(blockFilename)
- if err != nil {
- if !os.IsNotExist(err) {
- // A block is stored on only one Keep disk,
- // so os.IsNotExist is expected. Report any other errors.
- log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
+ for _, vol := range KeepVM.Volumes() {
+ if buf, err := vol.Get(hash); err != nil {
+ // IsNotExist is an expected error and may be ignored.
+ // (If all volumes report IsNotExist, we return a NotFoundError)
+ // A CorruptError should be returned immediately.
+ // Any other errors should be logged but we continue trying to
+ // read.
+ switch {
+ case os.IsNotExist(err):
+ continue
+ default:
+ log.Printf("GetBlock: reading %s: %s\n", hash, err)
}
- continue
- }
-
- nread, err = f.Read(buf)
- if err != nil {
- log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
- continue
- }
-
- // Double check the file checksum.
- //
- filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
- if filehash != hash {
- // TODO(twp): this condition probably represents a bad disk and
- // should raise major alarm bells for an administrator: e.g.
- // they should be sent directly to an event manager at high
- // priority or logged as urgent problems.
+ } else {
+ // Double check the file checksum.
//
- log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
- vol, blockFilename, filehash)
- return buf, CorruptError
+ filehash := fmt.Sprintf("%x", md5.Sum(buf))
+ if filehash != hash {
+ // TODO(twp): this condition probably represents a bad disk and
+ // should raise major alarm bells for an administrator: e.g.
+ // they should be sent directly to an event manager at high
+ // priority or logged as urgent problems.
+ //
+ log.Printf("%s: checksum mismatch for request %s (actual hash %s)\n",
+ vol, hash, filehash)
+ return buf, CorruptError
+ }
+ // Success!
+ return buf, nil
}
-
- // Success!
- return buf[:nread], nil
}
log.Printf("%s: not found on any volumes, giving up\n", hash)
- return buf, NotFoundError
+ return nil, NotFoundError
}
/* PutBlock(block, hash)
}
}
- // Store the block on the first available Keep volume.
- allFull := true
- for _, vol := range KeepVolumes {
- if IsFull(vol) {
- continue
- }
- allFull = false
- blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
- if err := os.MkdirAll(blockDir, 0755); err != nil {
- log.Printf("%s: could not create directory %s: %s",
- hash, blockDir, err)
- continue
- }
-
- tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
- if tmperr != nil {
- log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
- continue
- }
- blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
-
- if _, err := tmpfile.Write(block); err != nil {
- log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
- continue
- }
- if err := tmpfile.Close(); err != nil {
- log.Printf("closing %s: %s\n", tmpfile.Name(), err)
- os.Remove(tmpfile.Name())
- continue
- }
- if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
- log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
- os.Remove(tmpfile.Name())
- continue
- }
- return nil
- }
-
- if allFull {
- log.Printf("all Keep volumes full")
- return FullError
+ // Choose a Keep volume to write to.
+ // If this volume fails, try all of the volumes in order.
+ vol := KeepVM.Choose()
+ if err := vol.Put(hash, block); err == nil {
+ return nil // success!
} else {
- log.Printf("all Keep volumes failed")
- return GenericError
- }
-}
-
-func IsFull(volume string) (isFull bool) {
- fullSymlink := volume + "/full"
-
- // Check if the volume has been marked as full in the last hour.
- if link, err := os.Readlink(fullSymlink); err == nil {
- if ts, err := strconv.Atoi(link); err == nil {
- fulltime := time.Unix(int64(ts), 0)
- if time.Since(fulltime).Hours() < 1.0 {
- return true
+ allFull := true
+ for _, vol := range KeepVM.Volumes() {
+ err := vol.Put(hash, block)
+ if err == nil {
+ return nil // success!
+ }
+ if err != FullError {
+ // The volume is not full but the write did not succeed.
+ // Report the error and continue trying.
+ allFull = false
+ log.Printf("%s: Write(%s): %s\n", vol, hash, err)
}
}
- }
-
- if avail, err := FreeDiskSpace(volume); err == nil {
- isFull = avail < MIN_FREE_KILOBYTES
- } else {
- log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
- isFull = false
- }
- // If the volume is full, timestamp it.
- if isFull {
- now := fmt.Sprintf("%d", time.Now().Unix())
- os.Symlink(now, fullSymlink)
- }
- return
-}
-
-// FreeDiskSpace(volume)
-// Returns the amount of available disk space on VOLUME,
-// as a number of 1k blocks.
-//
-// TODO(twp): consider integrating this better with
-// VolumeStatus (e.g. keep a NodeStatus object up-to-date
-// periodically and use it as the source of info)
-//
-func FreeDiskSpace(volume string) (free uint64, err error) {
- var fs syscall.Statfs_t
- err = syscall.Statfs(volume, &fs)
- if err == nil {
- // Statfs output is not guaranteed to measure free
- // space in terms of 1K blocks.
- free = fs.Bavail * uint64(fs.Bsize) / 1024
+ if allFull {
+ log.Printf("all Keep volumes full")
+ return FullError
+ } else {
+ log.Printf("all Keep volumes failed")
+ return GenericError
+ }
}
- return
}
// ReadAtMost
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
//
// validLocatorRe matches hexadecimal MD5 locators. Compiling once at
// package init (MustCompile panics on a malformed pattern) removes the
// per-call compilation cost and the effectively-dead error branch of
// regexp.MatchString.
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator returns true if loc looks like a valid block locator:
// currently, a 32-digit lowercase hexadecimal MD5 digest.
//
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	return validLocatorRe.MatchString(loc)
}
defer teardown()
// Prepare two test Keep volumes. Our block is stored on the second volume.
- KeepVolumes = setup(t, 2)
- store(t, KeepVolumes[1], TEST_HASH, TEST_BLOCK)
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
+
+ vols := KeepVM.Volumes()
+ if err := vols[1].Put(TEST_HASH, TEST_BLOCK); err != nil {
+ t.Error(err)
+ }
// Check that GetBlock returns success.
result, err := GetBlock(TEST_HASH)
defer teardown()
// Create two empty test Keep volumes.
- KeepVolumes = setup(t, 2)
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
// Check that GetBlock returns failure.
result, err := GetBlock(TEST_HASH)
func TestGetBlockCorrupt(t *testing.T) {
defer teardown()
- // Create two test Keep volumes and store a block in each of them,
- // but the hash of the block does not match the filename.
- KeepVolumes = setup(t, 2)
- for _, vol := range KeepVolumes {
- store(t, vol, TEST_HASH, BAD_BLOCK)
- }
+ // Create two test Keep volumes and store a corrupt block in one.
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
+
+ vols := KeepVM.Volumes()
+ vols[0].Put(TEST_HASH, BAD_BLOCK)
// Check that GetBlock returns failure.
result, err := GetBlock(TEST_HASH)
if err != CorruptError {
- t.Errorf("Expected CorruptError, got %v", result)
+ t.Errorf("Expected CorruptError, got %v (buf: %v)", err, result)
}
}
defer teardown()
// Create two test Keep volumes.
- KeepVolumes = setup(t, 2)
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
// Check that PutBlock stores the data as expected.
if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
t.Fatalf("PutBlock: %v", err)
}
- result, err := GetBlock(TEST_HASH)
+ vols := KeepVM.Volumes()
+ result, err := vols[0].Get(TEST_HASH)
if err != nil {
- t.Fatalf("GetBlock returned error: %v", err)
+ t.Fatalf("Volume #0 Get returned error: %v", err)
}
if string(result) != string(TEST_BLOCK) {
- t.Error("PutBlock/GetBlock mismatch")
- t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
+ t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
string(TEST_BLOCK), string(result))
}
}
defer teardown()
// Create two test Keep volumes, but cripple one of them.
- KeepVolumes = setup(t, 2)
- os.Chmod(KeepVolumes[0], 000)
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
+
+ vols := KeepVM.Volumes()
+ vols[0].(*MockVolume).Bad = true
// Check that PutBlock stores the data as expected.
if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
defer teardown()
// Create two test Keep volumes.
- KeepVolumes = setup(t, 2)
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
// Check that PutBlock returns the expected error when the hash does
// not match the block.
defer teardown()
// Create two test Keep volumes.
- KeepVolumes = setup(t, 2)
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
// Store a corrupted block under TEST_HASH.
- store(t, KeepVolumes[0], TEST_HASH, BAD_BLOCK)
+ vols := KeepVM.Volumes()
+ vols[0].Put(TEST_HASH, BAD_BLOCK)
if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
t.Errorf("PutBlock: %v", err)
}
var b2 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9etO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\xdc\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef")
var locator = "cee9a457e790cf20d4bdaa6d69f01e41"
- // Prepare two test Keep volumes. Store one block,
- // then attempt to store the other.
- KeepVolumes = setup(t, 2)
- store(t, KeepVolumes[1], locator, b1)
+ // Prepare two test Keep volumes.
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
+ // Store one block, then attempt to store the other. Confirm that
+ // PutBlock reported a CollisionError.
+ if err := PutBlock(b1, locator); err != nil {
+ t.Error(err)
+ }
if err := PutBlock(b2, locator); err == nil {
t.Error("PutBlock did not report a collision")
} else if err != CollisionError {
// directories at the top level.
//
func TestFindKeepVolumes(t *testing.T) {
- defer teardown()
+ var tempVols [2]string
+ var err error
+
+ defer func() {
+ for _, path := range tempVols {
+ os.RemoveAll(path)
+ }
+ }()
- // Initialize two keep volumes.
- var tempVols []string = setup(t, 2)
+ // Create two directories suitable for using as keep volumes.
+ for i := range tempVols {
+ if tempVols[i], err = ioutil.TempDir("", "findvol"); err != nil {
+ t.Fatal(err)
+ }
+ tempVols[i] = tempVols[i] + "/keep"
+ if err = os.Mkdir(tempVols[i], 0755); err != nil {
+ t.Fatal(err)
+ }
+ }
// Set up a bogus PROC_MOUNTS file.
if f, err := ioutil.TempFile("", "keeptest"); err == nil {
// Set up Keep volumes and populate them.
// Include multiple blocks on different volumes, and
// some metadata files.
- KeepVolumes = setup(t, 2)
- store(t, KeepVolumes[0], TEST_HASH, TEST_BLOCK)
- store(t, KeepVolumes[1], TEST_HASH_2, TEST_BLOCK_2)
- store(t, KeepVolumes[0], TEST_HASH_3, TEST_BLOCK_3)
- store(t, KeepVolumes[0], TEST_HASH+".meta", []byte("metadata"))
- store(t, KeepVolumes[1], TEST_HASH_2+".meta", []byte("metadata"))
-
- index := IndexLocators("")
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
+
+ vols := KeepVM.Volumes()
+ vols[0].Put(TEST_HASH, TEST_BLOCK)
+ vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
+ vols[0].Put(TEST_HASH_3, TEST_BLOCK_3)
+ vols[0].Put(TEST_HASH+".meta", []byte("metadata"))
+ vols[1].Put(TEST_HASH_2+".meta", []byte("metadata"))
+
+ index := vols[0].Index("") + vols[1].Index("")
expected := `^` + TEST_HASH + `\+\d+ \d+\n` +
TEST_HASH_3 + `\+\d+ \d+\n` +
TEST_HASH_2 + `\+\d+ \d+\n$`
func TestNodeStatus(t *testing.T) {
defer teardown()
- // Set up test Keep volumes.
- KeepVolumes = setup(t, 2)
+ // Set up test Keep volumes with some blocks.
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
+
+ vols := KeepVM.Volumes()
+ vols[0].Put(TEST_HASH, TEST_BLOCK)
+ vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
// Get node status and make a basic sanity check.
st := GetNodeStatus()
- for i, vol := range KeepVolumes {
+ for i := range vols {
volinfo := st.Volumes[i]
mtp := volinfo.MountPoint
- if mtp != vol {
- t.Errorf("GetNodeStatus mount_point %s != KeepVolume %s", mtp, vol)
+ if mtp != "/bogo" {
+ t.Errorf("GetNodeStatus mount_point %s, expected /bogo", mtp)
}
if volinfo.DeviceNum == 0 {
t.Errorf("uninitialized device_num in %v", volinfo)
// Helper functions for unit tests.
// ========================================
-// setup
-// Create KeepVolumes for testing.
-// Returns a slice of pathnames to temporary Keep volumes.
+// MakeTestVolumeManager
+// Creates and returns a RRVolumeManager with the specified number
+// of MockVolumes.
//
-func setup(t *testing.T, num_volumes int) []string {
- vols := make([]string, num_volumes)
+func MakeTestVolumeManager(num_volumes int) VolumeManager {
+ vols := make([]Volume, num_volumes)
for i := range vols {
- if dir, err := ioutil.TempDir(os.TempDir(), "keeptest"); err == nil {
- vols[i] = dir + "/keep"
- os.Mkdir(vols[i], 0755)
- } else {
- t.Fatal(err)
- }
+ vols[i] = CreateMockVolume()
}
- return vols
+ return MakeRRVolumeManager(vols)
}
// teardown
// Cleanup to perform after each test.
//
func teardown() {
- for _, vol := range KeepVolumes {
- os.RemoveAll(path.Dir(vol))
- }
- KeepVolumes = nil
-}
-
-// store
-// Low-level code to write Keep blocks directly to disk for testing.
-//
-func store(t *testing.T, keepdir string, filename string, block []byte) {
- blockdir := fmt.Sprintf("%s/%s", keepdir, filename[:3])
- if err := os.MkdirAll(blockdir, 0755); err != nil {
- t.Fatal(err)
- }
-
- blockpath := fmt.Sprintf("%s/%s", blockdir, filename)
- if f, err := os.Create(blockpath); err == nil {
- f.Write(block)
- f.Close()
- } else {
- t.Fatal(err)
- }
+ KeepVM = nil
}
--- /dev/null
+// A Volume is an interface representing a Keep back-end storage unit:
+// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
+// etc.
+
+package main
+
import (
	"errors"
	"fmt"
	"os"
	"strings"
)
+
+type Volume interface {
+ Get(loc string) ([]byte, error)
+ Put(loc string, block []byte) error
+ Index(prefix string) string
+ Status() *VolumeStatus
+ String() string
+}
+
+// MockVolumes are Volumes used to test the Keep front end.
+//
+// If the Bad field is true, this volume should return an error
+// on all writes and puts.
+//
+type MockVolume struct {
+ Store map[string][]byte
+ Bad bool
+}
+
+func CreateMockVolume() *MockVolume {
+ return &MockVolume{make(map[string][]byte), false}
+}
+
+func (v *MockVolume) Get(loc string) ([]byte, error) {
+ if v.Bad {
+ return nil, errors.New("Bad volume")
+ } else if block, ok := v.Store[loc]; ok {
+ return block, nil
+ }
+ return nil, errors.New("not found")
+}
+
+func (v *MockVolume) Put(loc string, block []byte) error {
+ if v.Bad {
+ return errors.New("Bad volume")
+ }
+ v.Store[loc] = block
+ return nil
+}
+
+func (v *MockVolume) Index(prefix string) string {
+ var result string
+ for loc, block := range v.Store {
+ if IsValidLocator(loc) && strings.HasPrefix(loc, prefix) {
+ result = result + fmt.Sprintf("%s+%d %d\n",
+ loc, len(block), 123456789)
+ }
+ }
+ return result
+}
+
+func (v *MockVolume) Status() *VolumeStatus {
+ var used uint64
+ for _, block := range v.Store {
+ used = used + uint64(len(block))
+ }
+ return &VolumeStatus{"/bogo", 123, 1000000 - used, used}
+}
+
+func (v *MockVolume) String() string {
+ return "[MockVolume]"
+}
+
+// A VolumeManager manages a collection of volumes.
+//
+// - Volumes is a slice of available Volumes.
+// - Choose() returns a Volume suitable for writing to.
+// - Quit() instructs the VolumeManager to shut down gracefully.
+//
+type VolumeManager interface {
+ Volumes() []Volume
+ Choose() Volume
+ Quit()
+}
+
+type RRVolumeManager struct {
+ volumes []Volume
+ nextwrite chan Volume
+ quit chan int
+}
+
+func MakeRRVolumeManager(vols []Volume) *RRVolumeManager {
+ // Create a new VolumeManager struct with the specified volumes,
+ // and with new Nextwrite and Quit channels.
+ // The Quit channel is buffered with a capacity of 1 so that
+ // another routine may write to it without blocking.
+ vm := &RRVolumeManager{vols, make(chan Volume), make(chan int, 1)}
+
+ // This goroutine implements round-robin volume selection.
+ // It sends each available Volume in turn to the Nextwrite
+ // channel, until receiving a notification on the Quit channel
+ // that it should terminate.
+ go func() {
+ var i int = 0
+ for {
+ select {
+ case <-vm.quit:
+ return
+ case vm.nextwrite <- vm.volumes[i]:
+ i = (i + 1) % len(vm.volumes)
+ }
+ }
+ }()
+
+ return vm
+}
+
+func (vm *RRVolumeManager) Volumes() []Volume {
+ return vm.volumes
+}
+
+func (vm *RRVolumeManager) Choose() Volume {
+ return <-vm.nextwrite
+}
+
+func (vm *RRVolumeManager) Quit() {
+ vm.quit <- 1
+}
--- /dev/null
+// A UnixVolume is a Volume backed by a locally mounted disk.
+//
+package main
+
+import (
+ "fmt"
+ "io/ioutil"
+ "log"
+ "os"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "syscall"
+ "time"
+)
+
// Serialized I/O support.
//
// When a volume runs in serialized mode (at most one read/write at a
// time), the Keep front end wraps each Get or Put in an IORequest and
// sends it over a channel to an IORunner goroutine, which performs
// the operations one at a time and answers each on the request's
// reply channel with an IOResponse.
//
type IOMethod int

const (
	KeepGet IOMethod = iota
	KeepPut
)

// IORequest describes one queued operation: the method to perform,
// the locator to act on, the data to write (Put only), and the
// channel on which the result should be delivered.
type IORequest struct {
	method IOMethod
	loc    string
	data   []byte
	reply  chan *IOResponse
}

// IOResponse carries the outcome of an IORequest: the data read (Get
// only) and any error encountered.
type IOResponse struct {
	data []byte
	err  error
}
+
+// A UnixVolume has the following properties:
+//
+// root
+// the path to the volume's root directory
+// queue
+// A channel of IORequests. If non-nil, all I/O requests for
+// this volume should be queued on this channel; the result
+// will be delivered on the IOResponse channel supplied in the
+// request.
+//
+type UnixVolume struct {
+ root string // path to this volume
+ queue chan *IORequest
+}
+
+func (v *UnixVolume) IOHandler() {
+ for req := range v.queue {
+ var result IOResponse
+ switch req.method {
+ case KeepGet:
+ result.data, result.err = v.Read(req.loc)
+ case KeepPut:
+ result.err = v.Write(req.loc, req.data)
+ }
+ req.reply <- &result
+ }
+}
+
+func MakeUnixVolume(root string, serialize bool) (v UnixVolume) {
+ if serialize {
+ v = UnixVolume{root, make(chan *IORequest)}
+ go v.IOHandler()
+ } else {
+ v = UnixVolume{root, nil}
+ }
+ return
+}
+
+func (v *UnixVolume) Get(loc string) ([]byte, error) {
+ if v.queue == nil {
+ return v.Read(loc)
+ }
+ reply := make(chan *IOResponse)
+ v.queue <- &IORequest{KeepGet, loc, nil, reply}
+ response := <-reply
+ return response.data, response.err
+}
+
+func (v *UnixVolume) Put(loc string, block []byte) error {
+ if v.queue == nil {
+ return v.Write(loc, block)
+ }
+ reply := make(chan *IOResponse)
+ v.queue <- &IORequest{KeepPut, loc, block, reply}
+ response := <-reply
+ return response.err
+}
+
+// Read retrieves a block identified by the locator string "loc", and
+// returns its contents as a byte slice.
+//
+// If the block could not be opened or read, Read returns a nil slice
+// and the os.Error that was generated.
+//
+// If the block is present but its content hash does not match loc,
+// Read returns the block and a CorruptError. It is the caller's
+// responsibility to decide what (if anything) to do with the
+// corrupted data block.
+//
+func (v *UnixVolume) Read(loc string) ([]byte, error) {
+ var f *os.File
+ var err error
+ var buf []byte
+
+ blockFilename := filepath.Join(v.root, loc[0:3], loc)
+
+ f, err = os.Open(blockFilename)
+ if err != nil {
+ return nil, err
+ }
+
+ if buf, err = ioutil.ReadAll(f); err != nil {
+ log.Printf("%s: reading %s: %s\n", v, blockFilename, err)
+ return buf, err
+ }
+
+ // Success!
+ return buf, nil
+}
+
+// Write stores a block of data identified by the locator string
+// "loc". It returns nil on success. If the volume is full, it
+// returns a FullError. If the write fails due to some other error,
+// that error is returned.
+//
+func (v *UnixVolume) Write(loc string, block []byte) error {
+ if v.IsFull() {
+ return FullError
+ }
+ blockDir := filepath.Join(v.root, loc[0:3])
+ if err := os.MkdirAll(blockDir, 0755); err != nil {
+ log.Printf("%s: could not create directory %s: %s",
+ loc, blockDir, err)
+ return err
+ }
+
+ tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+loc)
+ if tmperr != nil {
+ log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, loc, tmperr)
+ return tmperr
+ }
+ blockFilename := filepath.Join(blockDir, loc)
+
+ if _, err := tmpfile.Write(block); err != nil {
+ log.Printf("%s: writing to %s: %s\n", v, blockFilename, err)
+ return err
+ }
+ if err := tmpfile.Close(); err != nil {
+ log.Printf("closing %s: %s\n", tmpfile.Name(), err)
+ os.Remove(tmpfile.Name())
+ return err
+ }
+ if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
+ log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
+ os.Remove(tmpfile.Name())
+ return err
+ }
+ return nil
+}
+
+// Status returns a VolumeStatus struct describing the volume's
+// current state.
+//
+func (v *UnixVolume) Status() *VolumeStatus {
+ var fs syscall.Statfs_t
+ var devnum uint64
+
+ if fi, err := os.Stat(v.root); err == nil {
+ devnum = fi.Sys().(*syscall.Stat_t).Dev
+ } else {
+ log.Printf("%s: os.Stat: %s\n", v, err)
+ return nil
+ }
+
+ err := syscall.Statfs(v.root, &fs)
+ if err != nil {
+ log.Printf("%s: statfs: %s\n", v, err)
+ return nil
+ }
+ // These calculations match the way df calculates disk usage:
+ // "free" space is measured by fs.Bavail, but "used" space
+ // uses fs.Blocks - fs.Bfree.
+ free := fs.Bavail * uint64(fs.Bsize)
+ used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
+ return &VolumeStatus{v.root, devnum, free, used}
+}
+
+// Index returns a list of blocks found on this volume which begin with
+// the specified prefix. If the prefix is an empty string, Index returns
+// a complete list of blocks.
+//
+// The return value is a multiline string (separated by
+// newlines). Each line is in the format
+//
+// locator+size modification-time
+//
+// e.g.:
+//
+// e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
+// e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
+// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
+//
+func (v *UnixVolume) Index(prefix string) (output string) {
+ filepath.Walk(v.root,
+ func(path string, info os.FileInfo, err error) error {
+ // This WalkFunc inspects each path in the volume
+ // and prints an index line for all files that begin
+ // with prefix.
+ if err != nil {
+ log.Printf("IndexHandler: %s: walking to %s: %s",
+ v, path, err)
+ return nil
+ }
+ locator := filepath.Base(path)
+ // Skip directories that do not match prefix.
+ // We know there is nothing interesting inside.
+ if info.IsDir() &&
+ !strings.HasPrefix(locator, prefix) &&
+ !strings.HasPrefix(prefix, locator) {
+ return filepath.SkipDir
+ }
+ // Skip any file that is not apparently a locator, e.g. .meta files
+ if !IsValidLocator(locator) {
+ return nil
+ }
+ // Print filenames beginning with prefix
+ if !info.IsDir() && strings.HasPrefix(locator, prefix) {
+ output = output + fmt.Sprintf(
+ "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
+ }
+ return nil
+ })
+
+ return
+}
+
+// IsFull returns true if the free space on the volume is less than
+// MIN_FREE_KILOBYTES.
+//
+func (v *UnixVolume) IsFull() (isFull bool) {
+ fullSymlink := v.root + "/full"
+
+ // Check if the volume has been marked as full in the last hour.
+ if link, err := os.Readlink(fullSymlink); err == nil {
+ if ts, err := strconv.Atoi(link); err == nil {
+ fulltime := time.Unix(int64(ts), 0)
+ if time.Since(fulltime).Hours() < 1.0 {
+ return true
+ }
+ }
+ }
+
+ if avail, err := v.FreeDiskSpace(); err == nil {
+ isFull = avail < MIN_FREE_KILOBYTES
+ } else {
+ log.Printf("%s: FreeDiskSpace: %s\n", v, err)
+ isFull = false
+ }
+
+ // If the volume is full, timestamp it.
+ if isFull {
+ now := fmt.Sprintf("%d", time.Now().Unix())
+ os.Symlink(now, fullSymlink)
+ }
+ return
+}
+
+// FreeDiskSpace returns the number of unused 1k blocks available on
+// the volume.
+//
+func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
+ var fs syscall.Statfs_t
+ err = syscall.Statfs(v.root, &fs)
+ if err == nil {
+ // Statfs output is not guaranteed to measure free
+ // space in terms of 1K blocks.
+ free = fs.Bavail * uint64(fs.Bsize) / 1024
+ }
+ return
+}
+
+func (v *UnixVolume) String() string {
+ return fmt.Sprintf("[UnixVolume %s]", v.root)
+}
--- /dev/null
+package main
+
+import (
+ "bytes"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "testing"
+ "time"
+)
+
+func TempUnixVolume(t *testing.T, serialize bool) UnixVolume {
+ d, err := ioutil.TempDir("", "volume_test")
+ if err != nil {
+ t.Fatal(err)
+ }
+ return MakeUnixVolume(d, serialize)
+}
+
+func _teardown(v UnixVolume) {
+ if v.queue != nil {
+ close(v.queue)
+ }
+ os.RemoveAll(v.root)
+}
+
+// store writes a Keep block directly into a UnixVolume, for testing
+// UnixVolume methods.
+//
+func _store(t *testing.T, vol UnixVolume, filename string, block []byte) {
+ blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3])
+ if err := os.MkdirAll(blockdir, 0755); err != nil {
+ t.Fatal(err)
+ }
+
+ blockpath := fmt.Sprintf("%s/%s", blockdir, filename)
+ if f, err := os.Create(blockpath); err == nil {
+ f.Write(block)
+ f.Close()
+ } else {
+ t.Fatal(err)
+ }
+}
+
+func TestGet(t *testing.T) {
+ v := TempUnixVolume(t, false)
+ defer _teardown(v)
+ _store(t, v, TEST_HASH, TEST_BLOCK)
+
+ buf, err := v.Get(TEST_HASH)
+ if err != nil {
+ t.Error(err)
+ }
+ if bytes.Compare(buf, TEST_BLOCK) != 0 {
+ t.Errorf("expected %s, got %s", string(TEST_BLOCK), string(buf))
+ }
+}
+
+func TestGetNotFound(t *testing.T) {
+ v := TempUnixVolume(t, false)
+ defer _teardown(v)
+ _store(t, v, TEST_HASH, TEST_BLOCK)
+
+ buf, err := v.Get(TEST_HASH_2)
+ switch {
+ case os.IsNotExist(err):
+ break
+ case err == nil:
+ t.Errorf("Read should have failed, returned %s", string(buf))
+ default:
+ t.Errorf("Read expected ErrNotExist, got: %s", err)
+ }
+}
+
+func TestPut(t *testing.T) {
+ v := TempUnixVolume(t, false)
+ defer _teardown(v)
+
+ err := v.Put(TEST_HASH, TEST_BLOCK)
+ if err != nil {
+ t.Error(err)
+ }
+ p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
+ if buf, err := ioutil.ReadFile(p); err != nil {
+ t.Error(err)
+ } else if bytes.Compare(buf, TEST_BLOCK) != 0 {
+ t.Errorf("Write should have stored %s, did store %s",
+ string(TEST_BLOCK), string(buf))
+ }
+}
+
+func TestPutBadVolume(t *testing.T) {
+ v := TempUnixVolume(t, false)
+ defer _teardown(v)
+
+ os.Chmod(v.root, 000)
+ err := v.Put(TEST_HASH, TEST_BLOCK)
+ if err == nil {
+ t.Error("Write should have failed")
+ }
+}
+
// Serialization tests: launch a bunch of concurrent Gets and Puts on
// a serialized volume and confirm they all return correct results.
//
// TODO(twp): show that the underlying Read/Write operations executed
// serially and not concurrently. The easiest way to do this is
// probably to activate verbose or debug logging, capture log output
// and examine it to confirm that Reads and Writes did not overlap.
//
// TODO(twp): a proper test of I/O serialization requires that a
// second request start while the first one is still underway.
// Guaranteeing that the test behaves this way requires some tricky
// synchronization and mocking.  For now we'll just launch a bunch of
// requests simultaneously in goroutines and demonstrate that they
// return accurate results.
//
+func TestGetSerialized(t *testing.T) {
+ // Create a volume with I/O serialization enabled.
+ v := TempUnixVolume(t, true)
+ defer _teardown(v)
+
+ _store(t, v, TEST_HASH, TEST_BLOCK)
+ _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
+ _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
+
+ sem := make(chan int)
+ go func(sem chan int) {
+ buf, err := v.Get(TEST_HASH)
+ if err != nil {
+ t.Errorf("err1: %v", err)
+ }
+ if bytes.Compare(buf, TEST_BLOCK) != 0 {
+ t.Errorf("buf should be %s, is %s", string(TEST_BLOCK), string(buf))
+ }
+ sem <- 1
+ }(sem)
+
+ go func(sem chan int) {
+ buf, err := v.Get(TEST_HASH_2)
+ if err != nil {
+ t.Errorf("err2: %v", err)
+ }
+ if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
+ t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_2), string(buf))
+ }
+ sem <- 1
+ }(sem)
+
+ go func(sem chan int) {
+ buf, err := v.Get(TEST_HASH_3)
+ if err != nil {
+ t.Errorf("err3: %v", err)
+ }
+ if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
+ t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_3), string(buf))
+ }
+ sem <- 1
+ }(sem)
+
+ // Wait for all goroutines to finish
+ for done := 0; done < 3; {
+ done += <-sem
+ }
+}
+
+func TestPutSerialized(t *testing.T) {
+ // Create a volume with I/O serialization enabled.
+ v := TempUnixVolume(t, true)
+ defer _teardown(v)
+
+ sem := make(chan int)
+ go func(sem chan int) {
+ err := v.Put(TEST_HASH, TEST_BLOCK)
+ if err != nil {
+ t.Errorf("err1: %v", err)
+ }
+ sem <- 1
+ }(sem)
+
+ go func(sem chan int) {
+ err := v.Put(TEST_HASH_2, TEST_BLOCK_2)
+ if err != nil {
+ t.Errorf("err2: %v", err)
+ }
+ sem <- 1
+ }(sem)
+
+ go func(sem chan int) {
+ err := v.Put(TEST_HASH_3, TEST_BLOCK_3)
+ if err != nil {
+ t.Errorf("err3: %v", err)
+ }
+ sem <- 1
+ }(sem)
+
+ // Wait for all goroutines to finish
+ for done := 0; done < 2; {
+ done += <-sem
+ }
+
+ // Double check that we actually wrote the blocks we expected to write.
+ buf, err := v.Get(TEST_HASH)
+ if err != nil {
+ t.Errorf("Get #1: %v", err)
+ }
+ if bytes.Compare(buf, TEST_BLOCK) != 0 {
+ t.Errorf("Get #1: expected %s, got %s", string(TEST_BLOCK), string(buf))
+ }
+
+ buf, err = v.Get(TEST_HASH_2)
+ if err != nil {
+ t.Errorf("Get #2: %v", err)
+ }
+ if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
+ t.Errorf("Get #2: expected %s, got %s", string(TEST_BLOCK_2), string(buf))
+ }
+
+ buf, err = v.Get(TEST_HASH_3)
+ if err != nil {
+ t.Errorf("Get #3: %v", err)
+ }
+ if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
+ t.Errorf("Get #3: expected %s, got %s", string(TEST_BLOCK_3), string(buf))
+ }
+}
+
+func TestIsFull(t *testing.T) {
+ v := TempUnixVolume(t, false)
+ defer _teardown(v)
+
+ full_path := v.root + "/full"
+ now := fmt.Sprintf("%d", time.Now().Unix())
+ os.Symlink(now, full_path)
+ if !v.IsFull() {
+ t.Errorf("%s: claims not to be full", v)
+ }
+ os.Remove(full_path)
+
+ // Test with an expired /full link.
+ expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
+ os.Symlink(expired, full_path)
+ if v.IsFull() {
+ t.Errorf("%s: should no longer be full", v)
+ }
+}