Merge branch 'master' into 2525-java-sdk
author: radhika <radhika@curoverse.com>
Thu, 1 May 2014 00:01:13 +0000 (20:01 -0400)
committer: radhika <radhika@curoverse.com>
Thu, 1 May 2014 00:01:13 +0000 (20:01 -0400)
35 files changed:
apps/workbench/app/controllers/websocket_controller.rb [new file with mode: 0644]
apps/workbench/app/views/websocket/index.html.erb [new file with mode: 0644]
apps/workbench/config/routes.rb
services/api/Gemfile
services/api/Gemfile.lock
services/api/app/controllers/application_controller.rb
services/api/app/controllers/arvados/v1/schema_controller.rb
services/api/app/middlewares/arvados_api_token.rb [new file with mode: 0644]
services/api/app/middlewares/rack_socket.rb [new file with mode: 0644]
services/api/app/models/api_client_authorization.rb
services/api/app/models/arvados_model.rb
services/api/app/models/log.rb
services/api/config/application.yml.example
services/api/config/initializers/authorization.rb [new file with mode: 0644]
services/api/config/initializers/eventbus.rb [new file with mode: 0644]
services/api/db/migrate/20140423132913_add_object_owner_to_logs.rb [new file with mode: 0644]
services/api/db/schema.rb
services/api/lib/current_api_client.rb
services/api/lib/eventbus.rb [new file with mode: 0644]
services/api/lib/load_param.rb [new file with mode: 0644]
services/api/lib/record_filters.rb [new file with mode: 0644]
services/api/test/fixtures/api_client_authorizations.yml
services/api/test/fixtures/logs.yml
services/api/test/fixtures/repositories.yml
services/api/test/integration/api_client_authorizations_scopes_test.rb
services/api/test/integration/reader_tokens_test.rb [new file with mode: 0644]
services/api/test/integration/websocket_test.rb [new file with mode: 0644]
services/api/test/test_helper.rb
services/api/test/unit/log_test.rb
services/api/test/websocket_runner.rb [new file with mode: 0644]
services/keep/src/keep/keep.go
services/keep/src/keep/keep_test.go
services/keep/src/keep/volume.go [new file with mode: 0644]
services/keep/src/keep/volume_unix.go [new file with mode: 0644]
services/keep/src/keep/volume_unix_test.go [new file with mode: 0644]

diff --git a/apps/workbench/app/controllers/websocket_controller.rb b/apps/workbench/app/controllers/websocket_controller.rb
new file mode 100644 (file)
index 0000000..a49c15f
--- /dev/null
@@ -0,0 +1,10 @@
+class WebsocketController < ApplicationController
+  skip_before_filter :find_objects_for_index
+
+  def index
+  end
+
+  def model_class
+    "Websocket"
+  end
+end
diff --git a/apps/workbench/app/views/websocket/index.html.erb b/apps/workbench/app/views/websocket/index.html.erb
new file mode 100644 (file)
index 0000000..85202b8
--- /dev/null
@@ -0,0 +1,34 @@
+<% content_for :page_title do %>
+  Event bus debugging page
+<% end %>
+<h1>Event bus debugging page</h1>
+
+<form>
+<textarea style="width:100%; height: 10em" id="websocket-message-content"></textarea>
+<button type="button" id="send-to-websocket">Send</button>
+</form>
+
+<br>
+
+<p id="PutStuffHere"></p>
+
+<script>
+$(function() {
+putStuffThere = function (content) {
+  $("#PutStuffHere").append(content + "<br>");
+};
+
+var dispatcher = new WebSocket('<%= $arvados_api_client.discovery[:websocketUrl] %>?api_token=<%= Thread.current[:arvados_api_token] %>');
+dispatcher.onmessage = function(event) {
+  //putStuffThere(JSON.parse(event.data));
+  putStuffThere(event.data);
+};
+
+sendStuff = function () {
+  dispatcher.send($("#websocket-message-content").val());
+};
+
+$("#send-to-websocket").click(sendStuff);
+});
+
+</script>
index f915671b530cd23b575ee74dcabbb034d331ae56..cac3431667a80e0c9437747b8de0eb6cdc9d8c79 100644 (file)
@@ -46,6 +46,7 @@ ArvadosWorkbench::Application.routes.draw do
   get '/collections/:uuid/*file' => 'collections#show_file', :format => false
 
   post 'actions' => 'actions#post'
+  get 'websockets' => 'websocket#index'
 
   root :to => 'users#welcome'
 
index e50802ef9920585c00574691eda90b68b230df13..3b715d3a82c3edd0bcc4c1a5e0303126dfb083ff 100644 (file)
@@ -62,6 +62,8 @@ gem 'test_after_commit', :group => :test
 
 gem 'google-api-client', '~> 0.6.3'
 gem 'trollop'
+gem 'faye-websocket'
+gem 'database_cleaner'
 
 gem 'themes_for_rails'
 
index 7a516d5725c30c349797966e658e66be0925aadd..d00e681cf739c90ff2d1ef869973c14fdf258362 100644 (file)
@@ -69,11 +69,16 @@ GEM
     coffee-script-source (1.7.0)
     curb (0.8.5)
     daemon_controller (1.2.0)
+    database_cleaner (1.2.0)
     erubis (2.7.0)
+    eventmachine (1.0.3)
     execjs (2.0.2)
     extlib (0.9.16)
     faraday (0.8.9)
       multipart-post (~> 1.2.0)
+    faye-websocket (0.7.2)
+      eventmachine (>= 0.12.0)
+      websocket-driver (>= 0.3.1)
     google-api-client (0.6.4)
       addressable (>= 2.3.2)
       autoparse (>= 0.3.3)
@@ -193,6 +198,7 @@ GEM
       execjs (>= 0.3.0)
       json (>= 1.8.0)
     uuidtools (2.1.4)
+    websocket-driver (0.3.2)
 
 PLATFORMS
   ruby
@@ -202,6 +208,8 @@ DEPENDENCIES
   andand
   arvados-cli (>= 0.1.20140328152103)
   coffee-rails (~> 3.2.0)
+  database_cleaner
+  faye-websocket
   google-api-client (~> 0.6.3)
   jquery-rails
   multi_json
index d40c6de47ee2cfff4c5de92a31cc3d062cf8686c..4b5a27d99621d4d25e4c66fcb30581fadace4186 100644 (file)
@@ -1,34 +1,37 @@
+require 'load_param'
+require 'record_filters'
+
 class ApplicationController < ActionController::Base
   include CurrentApiClient
   include ThemesForRails::ActionController
+  include LoadParam
+  include RecordFilters
+
+  ERROR_ACTIONS = [:render_error, :render_not_found]
 
   respond_to :json
   protect_from_forgery
-  around_filter :thread_with_auth_info, :except => [:render_error, :render_not_found]
 
   before_filter :respond_with_json_by_default
   before_filter :remote_ip
-  before_filter :require_auth_scope, :except => :render_not_found
-  before_filter :catch_redirect_hint
+  before_filter :load_read_auths
+  before_filter :require_auth_scope, except: ERROR_ACTIONS
 
-  before_filter :find_object_by_uuid, :except => [:index, :create,
-                                                  :render_error,
-                                                  :render_not_found]
+  before_filter :catch_redirect_hint
+  before_filter(:find_object_by_uuid,
+                except: [:index, :create] + ERROR_ACTIONS)
   before_filter :load_limit_offset_order_params, only: [:index, :owned_items]
   before_filter :load_where_param, only: [:index, :owned_items]
   before_filter :load_filters_param, only: [:index, :owned_items]
   before_filter :find_objects_for_index, :only => :index
   before_filter :reload_object_before_update, :only => :update
-  before_filter :render_404_if_no_object, except: [:index, :create,
-                                                   :render_error,
-                                                   :render_not_found]
+  before_filter(:render_404_if_no_object,
+                except: [:index, :create] + ERROR_ACTIONS)
 
   theme :select_theme
 
   attr_accessor :resource_attrs
 
-  DEFAULT_LIMIT = 100
-
   def index
     @objects.uniq!(&:id)
     if params[:eager] and params[:eager] != '0' and params[:eager] != 0 and params[:eager] != ''
@@ -92,7 +95,7 @@ class ApplicationController < ActionController::Base
       when 'ApiClientAuthorization'
         # Do not want.
       else
-        @objects = klass.readable_by(current_user)
+        @objects = klass.readable_by(*@read_users)
         cond_sql = "#{klass.table_name}.owner_uuid = ?"
         cond_params = [@object.uuid]
         if params[:include_linked]
@@ -177,140 +180,19 @@ class ApplicationController < ActionController::Base
 
   protected
 
-  def load_where_param
-    if params[:where].nil? or params[:where] == ""
-      @where = {}
-    elsif params[:where].is_a? Hash
-      @where = params[:where]
-    elsif params[:where].is_a? String
-      begin
-        @where = Oj.load(params[:where])
-        raise unless @where.is_a? Hash
-      rescue
-        raise ArgumentError.new("Could not parse \"where\" param as an object")
-      end
-    end
-    @where = @where.with_indifferent_access
-  end
-
-  def load_filters_param
-    @filters ||= []
-    if params[:filters].is_a? Array
-      @filters += params[:filters]
-    elsif params[:filters].is_a? String and !params[:filters].empty?
-      begin
-        f = Oj.load params[:filters]
-        raise unless f.is_a? Array
-        @filters += f
-      rescue
-        raise ArgumentError.new("Could not parse \"filters\" param as an array")
-      end
-    end
-  end
-
-  def default_orders
-    ["#{table_name}.modified_at desc"]
-  end
-
-  def load_limit_offset_order_params
-    if params[:limit]
-      unless params[:limit].to_s.match(/^\d+$/)
-        raise ArgumentError.new("Invalid value for limit parameter")
-      end
-      @limit = params[:limit].to_i
-    else
-      @limit = DEFAULT_LIMIT
-    end
-
-    if params[:offset]
-      unless params[:offset].to_s.match(/^\d+$/)
-        raise ArgumentError.new("Invalid value for offset parameter")
-      end
-      @offset = params[:offset].to_i
-    else
-      @offset = 0
-    end
-
-    @orders = []
-    if params[:order]
-      params[:order].split(',').each do |order|
-        attr, direction = order.strip.split " "
-        direction ||= 'asc'
-        if attr.match /^[a-z][_a-z0-9]+$/ and
-            model_class.columns.collect(&:name).index(attr) and
-            ['asc','desc'].index direction.downcase
-          @orders << "#{table_name}.#{attr} #{direction.downcase}"
-        end
-      end
-    end
-    if @orders.empty?
-      @orders = default_orders
-    end
-  end
-
   def find_objects_for_index
-    @objects ||= model_class.readable_by(current_user)
+    @objects ||= model_class.readable_by(*@read_users)
     apply_where_limit_order_params
   end
 
   def apply_where_limit_order_params
     ar_table_name = @objects.table_name
-    if @filters.is_a? Array and @filters.any?
-      cond_out = []
-      param_out = []
-      @filters.each do |filter|
-        attr, operator, operand = filter
-        if !filter.is_a? Array
-          raise ArgumentError.new("Invalid element in filters array: #{filter.inspect} is not an array")
-        elsif !operator.is_a? String
-          raise ArgumentError.new("Invalid operator '#{operator}' (#{operator.class}) in filter")
-        elsif !model_class.searchable_columns(operator).index attr.to_s
-          raise ArgumentError.new("Invalid attribute '#{attr}' in filter")
-        end
-        case operator.downcase
-        when '=', '<', '<=', '>', '>=', 'like'
-          if operand.is_a? String
-            cond_out << "#{ar_table_name}.#{attr} #{operator} ?"
-            if (# any operator that operates on value rather than
-                # representation:
-                operator.match(/[<=>]/) and
-                model_class.attribute_column(attr).type == :datetime)
-              operand = Time.parse operand
-            end
-            param_out << operand
-          elsif operand.nil? and operator == '='
-            cond_out << "#{ar_table_name}.#{attr} is null"
-          else
-            raise ArgumentError.new("Invalid operand type '#{operand.class}' "\
-                                    "for '#{operator}' operator in filters")
-          end
-        when 'in'
-          if operand.is_a? Array
-            cond_out << "#{ar_table_name}.#{attr} IN (?)"
-            param_out << operand
-          else
-            raise ArgumentError.new("Invalid operand type '#{operand.class}' "\
-                                    "for '#{operator}' operator in filters")
-          end
-        when 'is_a'
-          operand = [operand] unless operand.is_a? Array
-          cond = []
-          operand.each do |op|
-              cl = ArvadosModel::kind_class op
-              if cl
-                cond << "#{ar_table_name}.#{attr} like ?"
-                param_out << cl.uuid_like_pattern
-              else
-                cond << "1=0"
-              end
-          end
-          cond_out << cond.join(' OR ')
-        end
-      end
-      if cond_out.any?
-        @objects = @objects.where(cond_out.join(' AND '), *param_out)
-      end
+
+    ft = record_filters @filters, ar_table_name
+    if ft[:cond_out].any?
+      @objects = @objects.where(ft[:cond_out].join(' AND '), *ft[:param_out])
     end
+
     if @where.is_a? Hash and @where.any?
       conditions = ['1=1']
       @where.each do |attr,value|
@@ -391,15 +273,34 @@ class ApplicationController < ActionController::Base
   end
 
   # Authentication
+  def load_read_auths
+    @read_auths = []
+    if current_api_client_authorization
+      @read_auths << current_api_client_authorization
+    end
+    # Load reader tokens if this is a read request.
+    # If there are too many reader tokens, assume the request is malicious
+    # and ignore it.
+    if request.get? and params[:reader_tokens] and
+        params[:reader_tokens].size < 100
+      @read_auths += ApiClientAuthorization
+        .includes(:user)
+        .where('api_token IN (?) AND
+                (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP)',
+               params[:reader_tokens])
+        .all
+    end
+    @read_auths.select! { |auth| auth.scopes_allow_request? request }
+    @read_users = @read_auths.map { |auth| auth.user }.uniq
+  end
+
   def require_login
-    if current_user
-      true
-    else
+    if not current_user
       respond_to do |format|
         format.json {
           render :json => { errors: ['Not logged in'] }.to_json, status: 401
         }
-        format.html  {
+        format.html {
           redirect_to '/auth/joshid'
         }
       end
@@ -414,68 +315,13 @@ class ApplicationController < ActionController::Base
   end
 
   def require_auth_scope
-    return false unless require_login
-    unless current_api_client_auth_has_scope("#{request.method} #{request.path}")
-      render :json => { errors: ['Forbidden'] }.to_json, status: 403
-    end
-  end
-
-  def thread_with_auth_info
-    Thread.current[:request_starttime] = Time.now
-    Thread.current[:api_url_base] = root_url.sub(/\/$/,'') + '/arvados/v1'
-    begin
-      user = nil
-      api_client = nil
-      api_client_auth = nil
-      supplied_token =
-        params[:api_token] ||
-        params[:oauth_token] ||
-        request.headers["Authorization"].andand.match(/OAuth2 ([a-z0-9]+)/).andand[1]
-      if supplied_token
-        api_client_auth = ApiClientAuthorization.
-          includes(:api_client, :user).
-          where('api_token=? and (expires_at is null or expires_at > CURRENT_TIMESTAMP)', supplied_token).
-          first
-        if api_client_auth.andand.user
-          session[:user_id] = api_client_auth.user.id
-          session[:api_client_uuid] = api_client_auth.api_client.andand.uuid
-          session[:api_client_authorization_id] = api_client_auth.id
-          user = api_client_auth.user
-          api_client = api_client_auth.api_client
-        else
-          # Token seems valid, but points to a non-existent (deleted?) user.
-          api_client_auth = nil
-        end
-      elsif session[:user_id]
-        user = User.find(session[:user_id]) rescue nil
-        api_client = ApiClient.
-          where('uuid=?',session[:api_client_uuid]).
-          first rescue nil
-        if session[:api_client_authorization_id] then
-          api_client_auth = ApiClientAuthorization.
-            find session[:api_client_authorization_id]
-        end
-      end
-      Thread.current[:api_client_ip_address] = remote_ip
-      Thread.current[:api_client_authorization] = api_client_auth
-      Thread.current[:api_client_uuid] = api_client.andand.uuid
-      Thread.current[:api_client] = api_client
-      Thread.current[:user] = user
-      if api_client_auth
-        api_client_auth.last_used_at = Time.now
-        api_client_auth.last_used_by_ip_address = remote_ip
-        api_client_auth.save validate: false
+    if @read_auths.empty?
+      if require_login != false
+        render :json => { errors: ['Forbidden'] }.to_json, status: 403
       end
-      yield
-    ensure
-      Thread.current[:api_client_ip_address] = nil
-      Thread.current[:api_client_authorization] = nil
-      Thread.current[:api_client_uuid] = nil
-      Thread.current[:api_client] = nil
-      Thread.current[:user] = nil
+      false
     end
   end
-  # /Authentication
 
   def respond_with_json_by_default
     html_index = request.accepts.index(Mime::HTML)
@@ -518,28 +364,38 @@ class ApplicationController < ActionController::Base
     end
   end
 
-  def self.accept_attribute_as_json(attr, force_class=nil)
-    before_filter lambda { accept_attribute_as_json attr, force_class }
+  def load_json_value(hash, key, must_be_class=nil)
+    if hash[key].is_a? String
+      hash[key] = Oj.load(hash[key], symbol_keys: false)
+      if must_be_class and !hash[key].is_a? must_be_class
+        raise TypeError.new("parameter #{key.to_s} must be a #{must_be_class.to_s}")
+      end
+    end
+  end
+
+  def self.accept_attribute_as_json(attr, must_be_class=nil)
+    before_filter lambda { accept_attribute_as_json attr, must_be_class }
   end
   accept_attribute_as_json :properties, Hash
   accept_attribute_as_json :info, Hash
-  def accept_attribute_as_json(attr, force_class)
+  def accept_attribute_as_json(attr, must_be_class)
     if params[resource_name] and resource_attrs.is_a? Hash
-      if resource_attrs[attr].is_a? String
-        resource_attrs[attr] = Oj.load(resource_attrs[attr],
-                                       symbol_keys: false)
-        if force_class and !resource_attrs[attr].is_a? force_class
-          raise TypeError.new("#{resource_name}[#{attr.to_s}] must be a #{force_class.to_s}")
-        end
-      elsif resource_attrs[attr].is_a? Hash
+      if resource_attrs[attr].is_a? Hash
         # Convert symbol keys to strings (in hashes provided by
         # resource_attrs)
         resource_attrs[attr] = resource_attrs[attr].
           with_indifferent_access.to_hash
+      else
+        load_json_value(resource_attrs, attr, must_be_class)
       end
     end
   end
 
+  def self.accept_param_as_json(key, must_be_class=nil)
+    prepend_before_filter lambda { load_json_value(params, key, must_be_class) }
+  end
+  accept_param_as_json :reader_tokens, Array
+
   def render_list
     @object_list = {
       :kind  => "arvados##{(@response_resource_name || resource_name).camelize(:lower)}List",
index 39aa024d58334f97c01f10731575837c25cd699b..48aa166acb2ad568c20b1164265fcb5017e3a1c9 100644 (file)
@@ -70,6 +70,12 @@ class Arvados::V1::SchemaController < ApplicationController
         resources: {}
       }
 
+      if Rails.application.config.websocket_address
+        discovery[:websocketUrl] = Rails.application.config.websocket_address
+      elsif ENV['ARVADOS_WEBSOCKETS']
+        discovery[:websocketUrl] = (root_url.sub /^http/, 'ws') + "/websocket"
+      end
+
       ActiveRecord::Base.descendants.reject(&:abstract_class?).each do |k|
         begin
           ctl_class = "Arvados::V1::#{k.to_s.pluralize}Controller".constantize
diff --git a/services/api/app/middlewares/arvados_api_token.rb b/services/api/app/middlewares/arvados_api_token.rb
new file mode 100644 (file)
index 0000000..57d3ad0
--- /dev/null
@@ -0,0 +1,61 @@
+# Perform api_token checking very early in the request process.  We want to do
+# this in the Rack stack instead of in ApplicationController because
+# websockets needs access to authentication but doesn't use any of the rails
+# active dispatch infrastructure.
+class ArvadosApiToken
+
+  # Create a new ArvadosApiToken handler
+  # +app+  The next layer of the Rack stack.
+  def initialize(app = nil, options = nil)
+    @app = app if app.respond_to?(:call)
+  end
+
+  def call env
+    # First, clean up just in case we have a multithreaded server and thread
+    # local variables are still set from a prior request.  Also useful for
+    # tests that call this code to set up the environment.
+    Thread.current[:api_client_ip_address] = nil
+    Thread.current[:api_client_authorization] = nil
+    Thread.current[:api_client_uuid] = nil
+    Thread.current[:api_client] = nil
+    Thread.current[:user] = nil
+
+    request = Rack::Request.new(env)
+    params = request.params
+    remote_ip = env["action_dispatch.remote_ip"]
+
+    Thread.current[:request_starttime] = Time.now
+    user = nil
+    api_client = nil
+    api_client_auth = nil
+    supplied_token =
+      params["api_token"] ||
+      params["oauth_token"] ||
+      env["HTTP_AUTHORIZATION"].andand.match(/OAuth2 ([a-z0-9]+)/).andand[1]
+    if supplied_token
+      api_client_auth = ApiClientAuthorization.
+        includes(:api_client, :user).
+        where('api_token=? and (expires_at is null or expires_at > CURRENT_TIMESTAMP)', supplied_token).
+        first
+      if api_client_auth.andand.user
+        user = api_client_auth.user
+        api_client = api_client_auth.api_client
+      else
+        # Token seems valid, but points to a non-existent (deleted?) user.
+        api_client_auth = nil
+      end
+    end
+    Thread.current[:api_client_ip_address] = remote_ip
+    Thread.current[:api_client_authorization] = api_client_auth
+    Thread.current[:api_client_uuid] = api_client.andand.uuid
+    Thread.current[:api_client] = api_client
+    Thread.current[:user] = user
+    if api_client_auth
+      api_client_auth.last_used_at = Time.now
+      api_client_auth.last_used_by_ip_address = remote_ip.to_s
+      api_client_auth.save validate: false
+    end
+
+    @app.call env if @app
+  end
+end
diff --git a/services/api/app/middlewares/rack_socket.rb b/services/api/app/middlewares/rack_socket.rb
new file mode 100644 (file)
index 0000000..795df4a
--- /dev/null
@@ -0,0 +1,86 @@
+require 'rack'
+require 'faye/websocket'
+require 'eventmachine'
+
+# A Rack middleware to handle inbound websocket connection requests and hand
+# them over to the faye websocket library.
+class RackSocket
+
+  DEFAULT_ENDPOINT  = '/websocket'
+
+  # Stop EventMachine on signal, this should give it a chance to unwind any
+  # open connections.
+  def die_gracefully_on_signal
+    Signal.trap("INT") { EM.stop }
+    Signal.trap("TERM") { EM.stop }
+  end
+
+  # Create a new RackSocket handler
+  # +app+  The next layer of the Rack stack.
+  #
+  # Accepts options:
+  # +:handler+ (Required) A class to handle new connections.  #initialize will
+  # call handler.new to create the actual handler instance object.  When a new
+  # websocket connection is established, #on_connect on the handler instance
+  # object will be called with the new connection.
+  #
+  # +:mount+ The HTTP request path that will be recognized for websocket
+  # connect requests, defaults to '/websocket'.
+  #
+  # +:websocket_only+  If true, the server will only handle websocket requests,
+  # and all other requests will result in an error.  If false, unhandled
+  # non-websocket requests will be passed along on to 'app' in the usual Rack
+  # way.
+  def initialize(app = nil, options = nil)
+    @app = app if app.respond_to?(:call)
+    @options = [app, options].grep(Hash).first || {}
+    @endpoint = @options[:mount] || DEFAULT_ENDPOINT
+    @websocket_only = @options[:websocket_only] || false
+
+    # from https://gist.github.com/eatenbyagrue/1338545#file-eventmachine-rb
+    if defined?(PhusionPassenger)
+      PhusionPassenger.on_event(:starting_worker_process) do |forked|
+        # for passenger, we need to avoid orphaned threads
+        if forked && EM.reactor_running?
+          EM.stop
+        end
+        Thread.new {
+          EM.run
+        }
+        die_gracefully_on_signal
+      end
+    else
+      # facilitates debugging
+      Thread.abort_on_exception = true
+      # just spawn a thread and start it up
+      Thread.new {
+        EM.run
+      }
+    end
+
+    # Create actual handler instance object from handler class.
+    @handler = @options[:handler].new
+  end
+
+  # Handle websocket connection request, or pass on to the next middleware
+  # supplied in +app+ initialize (unless +:websocket_only+ option is true, in
+  # which case return an error response.)
+  # +env+ the Rack environment with information about the request.
+  def call env
+    request = Rack::Request.new(env)
+    if request.path_info == @endpoint and Faye::WebSocket.websocket?(env)
+      ws = Faye::WebSocket.new(env)
+
+      # Notify handler about new connection
+      @handler.on_connect ws
+
+      # Return async Rack response
+      ws.rack_response
+    elsif not @websocket_only
+      @app.call env
+    else
+      [406, {"Content-Type" => "text/plain"}, ["Only websocket connections are permitted on this port."]]
+    end
+  end
+
+end
index 3b73f408c3f03bf35f48a0711357a0487fdeee8e..2488d8322fbc97dfb9d3ee3446bf902bf581a9ce 100644 (file)
@@ -62,6 +62,18 @@ class ApiClientAuthorization < ArvadosModel
   end
   def modified_at=(x) end
 
+  def scopes_allow?(req_s)
+    scopes.each do |scope|
+      return true if (scope == 'all') or (scope == req_s) or
+        ((scope.end_with? '/') and (req_s.start_with? scope))
+    end
+    false
+  end
+
+  def scopes_allow_request?(request)
+    scopes_allow? [request.method, request.path].join(' ')
+  end
+
   def logged_attributes
     attrs = attributes.dup
     attrs.delete('api_token')
index e42194b07151468327233c4c1958b2a2b2a10dd7..1dcd9e2e82d8c2041158322f6a4b5ce7545203f9 100644 (file)
@@ -59,32 +59,71 @@ class ArvadosModel < ActiveRecord::Base
     self.columns.select { |col| col.name == attr.to_s }.first
   end
 
-  # def eager_load_associations
-  #   self.class.columns.each do |col|
-  #     re = col.name.match /^(.*)_kind$/
-  #     if (re and
-  #         self.respond_to? re[1].to_sym and
-  #         (auuid = self.send((re[1] + '_uuid').to_sym)) and
-  #         (aclass = self.class.kind_class(self.send(col.name.to_sym))) and
-  #         (aobject = aclass.where('uuid=?', auuid).first))
-  #       self.instance_variable_set('@'+re[1], aobject)
-  #     end
-  #   end
-  # end
-
-  def self.readable_by user
-    uuid_list = [user.uuid, *user.groups_i_can(:read)]
-    sanitized_uuid_list = uuid_list.
-      collect { |uuid| sanitize(uuid) }.join(', ')
-    or_references_me = ''
-    if self == Link and user
-      or_references_me = "OR (#{table_name}.link_class in (#{sanitize 'permission'}, #{sanitize 'resources'}) AND #{sanitize user.uuid} IN (#{table_name}.head_uuid, #{table_name}.tail_uuid))"
+  # Return a query with read permissions restricted to the union of the
+  # permissions of the members of users_list, i.e. if something is readable by
+  # any user in users_list, it will be readable in the query returned by this
+  # function.
+  def self.readable_by(*users_list)
+    # Get rid of troublesome nils
+    users_list.compact!
+
+    # Check if any of the users are admin.  If so, we're done.
+    if users_list.select { |u| u.is_admin }.empty?
+
+      # Collect the uuids for each user and any groups readable by each user.
+      user_uuids = users_list.map { |u| u.uuid }
+      uuid_list = user_uuids + users_list.flat_map { |u| u.groups_i_can(:read) }
+      sanitized_uuid_list = uuid_list.
+        collect { |uuid| sanitize(uuid) }.join(', ')
+      sql_conds = []
+      sql_params = []
+      or_object_uuid = ''
+
+      # This row is owned by a member of users_list, or owned by a group
+      # readable by a member of users_list
+      # or
+      # This row uuid is the uuid of a member of users_list
+      # or
+      # A permission link exists ('write' and 'manage' implicitly include
+      # 'read') from a member of users_list, or a group readable by users_list,
+      # to this row, or to the owner of this row (see join() below).
+      sql_conds += ["#{table_name}.owner_uuid in (?)",
+                    "#{table_name}.uuid in (?)",
+                    "permissions.head_uuid IS NOT NULL"]
+      sql_params += [uuid_list, user_uuids]
+
+      if self == Link and users_list.any?
+        # This row is a 'permission' or 'resources' link class
+        # The uuid for a member of users_list is referenced in either the head
+        # or tail of the link
+        sql_conds += ["(#{table_name}.link_class in (#{sanitize 'permission'}, #{sanitize 'resources'}) AND (#{table_name}.head_uuid IN (?) OR #{table_name}.tail_uuid IN (?)))"]
+        sql_params += [user_uuids, user_uuids]
+      end
+
+      if self == Log and users_list.any?
+        # Link head points to the object described by this row
+        or_object_uuid = ", #{table_name}.object_uuid"
+
+        # This object described by this row is owned by this user, or owned by a group readable by this user
+        sql_conds += ["#{table_name}.object_owner_uuid in (?)"]
+        sql_params += [uuid_list]
+      end
+
+      # Link head points to this row, or to the owner of this row (the thing to be read)
+      #
+      # Link tail originates from this user, or a group that is readable by this
+      # user (the identity with authorization to read)
+      #
+      # Link class is 'permission' ('write' and 'manage' implicitly include 'read')
+
+      joins("LEFT JOIN links permissions ON permissions.head_uuid in (#{table_name}.owner_uuid, #{table_name}.uuid #{or_object_uuid}) AND permissions.tail_uuid in (#{sanitized_uuid_list}) AND permissions.link_class='permission'")
+        .where(sql_conds.join(' OR '), *sql_params).uniq
+
+    else
+      # At least one user is admin, so don't bother to apply any restrictions.
+      self
     end
-    joins("LEFT JOIN links permissions ON permissions.head_uuid in (#{table_name}.owner_uuid, #{table_name}.uuid) AND permissions.tail_uuid in (#{sanitized_uuid_list}) AND permissions.link_class='permission'").
-      where("?=? OR #{table_name}.owner_uuid in (?) OR #{table_name}.uuid=? OR permissions.head_uuid IS NOT NULL #{or_references_me}",
-            true, user.is_admin,
-            uuid_list,
-            user.uuid)
+
   end
 
   def logged_attributes
@@ -251,7 +290,7 @@ class ArvadosModel < ActiveRecord::Base
       self.class.kind
     end
 
-    def self.readable_by (u)
+    def self.readable_by (*u)
       self
     end
 
@@ -293,6 +332,7 @@ class ArvadosModel < ActiveRecord::Base
     log = Log.new(event_type: event_type).fill_object(self)
     yield log
     log.save!
+    connection.execute "NOTIFY logs, '#{log.id}'"
     log_start_state
   end
 
index f8e337b201018a7f2638fa551e1b9304e6334211..e7e5c1a83376feaf501f7d94ed2cea907ce5877d 100644 (file)
@@ -8,7 +8,7 @@ class Log < ArvadosModel
 
   api_accessible :user, extend: :common do |t|
     t.add :object_uuid
-    t.add :object, :if => :object
+    t.add :object_owner_uuid
     t.add :object_kind
     t.add :event_at
     t.add :event_type
@@ -24,6 +24,7 @@ class Log < ArvadosModel
 
   def fill_object(thing)
     self.object_uuid ||= thing.uuid
+    self.object_owner_uuid = thing.owner_uuid
     self.summary ||= "#{self.event_type} of #{thing.uuid}"
     self
   end
@@ -69,4 +70,5 @@ class Log < ArvadosModel
   def ensure_valid_uuids
     # logs can have references to deleted objects
   end
+
 end
index 9162fc444585d9fa1e59ade9f0df7e144f81cbda..2705fa1a74044ea12ab3898d9746297552daaf9f 100644 (file)
@@ -39,3 +39,10 @@ test:
 common:
   #git_repositories_dir: /var/cache/git
   #git_internal_dir: /var/cache/arvados/internal.git
+
+  # You can run the websocket server separately from the regular HTTP service
+  # by setting "ARVADOS_WEBSOCKETS=ws-only" in the environment before running
+  # the websocket server.  When you do this, you need to set the following
+  # configuration variable so that the primary server can give out the correct
+  # address of the dedicated websocket server:
+  #websocket_address: wss://websocket.local/websocket
diff --git a/services/api/config/initializers/authorization.rb b/services/api/config/initializers/authorization.rb
new file mode 100644 (file)
index 0000000..e491e40
--- /dev/null
@@ -0,0 +1,5 @@
+Server::Application.configure do
+  config.middleware.insert_before ActionDispatch::Static, ArvadosApiToken
+  config.middleware.delete ActionDispatch::RemoteIp
+  config.middleware.insert_before ArvadosApiToken, ActionDispatch::RemoteIp
+end
diff --git a/services/api/config/initializers/eventbus.rb b/services/api/config/initializers/eventbus.rb
new file mode 100644 (file)
index 0000000..7da8ade
--- /dev/null
@@ -0,0 +1,18 @@
+require 'eventbus'
+
+Server::Application.configure do
+  # Enables websockets if ARVADOS_WEBSOCKETS is defined with any value.  If
+  # ARVADOS_WEBSOCKETS=ws-only, server will only accept websocket connections
+  # and return an error response for all other requests.
+  if ENV['ARVADOS_WEBSOCKETS']
+    config.middleware.insert_after ArvadosApiToken, RackSocket, {
+      :handler => EventBus,
+      :mount => "/websocket",
+      :websocket_only => (ENV['ARVADOS_WEBSOCKETS'] == "ws-only")
+    }
+  end
+
+  # Define the websocket_address configuration option, which can be overridden in config files.
+  # See application.yml.example for details.
+  config.websocket_address = nil
+end
diff --git a/services/api/db/migrate/20140423132913_add_object_owner_to_logs.rb b/services/api/db/migrate/20140423132913_add_object_owner_to_logs.rb
new file mode 100644 (file)
index 0000000..7fa4702
--- /dev/null
@@ -0,0 +1,21 @@
+class AddObjectOwnerToLogs < ActiveRecord::Migration
+  include CurrentApiClient
+
+  def up
+    add_column :logs, :object_owner_uuid, :string
+    act_as_system_user do
+      Log.all.each do |log|
+        if log.properties[:new_attributes]
+          log.object_owner_uuid = log.properties[:new_attributes][:owner_uuid]
+        elsif log.properties[:old_attributes]
+          log.object_owner_uuid = log.properties[:old_attributes][:owner_uuid]
+        end
+        log.save!
+      end
+    end
+  end
+
+  def down
+    remove_column :logs, :object_owner_uuid
+  end
+end
index 034ee35e5f8d954ae40be045892b7ddb60f9e0af..c4cef7c6fb36c5159016dcfdcb6c2ee66f1691ed 100644 (file)
@@ -268,6 +268,7 @@ ActiveRecord::Schema.define(:version => 20140423133559) do
     t.datetime "created_at",              :null => false
     t.datetime "updated_at",              :null => false
     t.datetime "modified_at"
+    t.string   "object_owner_uuid"
   end
 
   add_index "logs", ["created_at"], :name => "index_logs_on_created_at"
index 0803d5464d441425c80dc89aeb8be88c96534953..8e7d7fca27e4f172ef92bf76ae16156e49827d52 100644 (file)
@@ -29,19 +29,6 @@ module CurrentApiClient
     Thread.current[:api_client_ip_address]
   end
 
-  # Is the current API client authorization scoped for the request?
-  def current_api_client_auth_has_scope(req_s)
-    (current_api_client_authorization.andand.scopes || []).select { |scope|
-      if scope == 'all'
-        true
-      elsif scope.end_with? '/'
-        req_s.start_with? scope
-      else
-        req_s == scope
-      end
-    }.any?
-  end
-
   def system_user_uuid
     [Server::Application.config.uuid_prefix,
      User.uuid_prefix,
diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb
new file mode 100644 (file)
index 0000000..0b8cae2
--- /dev/null
@@ -0,0 +1,226 @@
+require 'eventmachine'
+require 'oj'
+require 'faye/websocket'
+require 'record_filters'
+require 'load_param'
+
+# Patch in user, last_log_id and filters fields into the Faye::WebSocket class.
+module Faye
+  class WebSocket
+    attr_accessor :user
+    attr_accessor :last_log_id
+    attr_accessor :filters
+  end
+end
+
+# Store the filters supplied by the user that will be applied to the logs table
+# to determine which events to return to the listener.
+class Filter
+  include LoadParam
+
+  attr_accessor :filters
+
+  def initialize p
+    @params = p
+    load_filters_param
+  end
+
+  def params
+    @params
+  end
+end
+
+# Manages websocket connections, accepts subscription messages and publishes
+# log table events.
+class EventBus
+  include CurrentApiClient
+  include RecordFilters
+
+  # used in RecordFilters
+  def model_class
+    Log
+  end
+
+  # Initialize EventBus.  Takes no parameters.
+  def initialize
+    @channel = EventMachine::Channel.new
+    @mtx = Mutex.new
+    @bgthread = false
+  end
+
+  # Push out any pending events to the connection +ws+
+  # +id+  the id of the most recent row in the log table, may be nil
+  def push_events ws, id = nil
+      begin
+        # Must have at least one filter set up to receive events
+        if ws.filters.length > 0
+          # Start with log rows readable by user, sorted in ascending order
+          logs = Log.readable_by(ws.user).order("id asc")
+
+          if ws.last_log_id
+            # Client is only interested in log rows that are newer than the
+            # last log row seen by the client.
+            logs = logs.where("logs.id > ?", ws.last_log_id)
+          elsif id
+            # No last log id, so only look at the most recently changed row
+            logs = logs.where("logs.id = ?", id.to_i)
+          else
+            return
+          end
+
+          # Now process filters provided by client
+          cond_out = []
+          param_out = []
+          ws.filters.each do |filter|
+            ft = record_filters filter.filters, Log.table_name
+            cond_out += ft[:cond_out]
+            param_out += ft[:param_out]
+          end
+
+          # Add filters to query
+          if cond_out.any?
+            logs = logs.where(cond_out.join(' OR '), *param_out)
+          end
+
+          # Finally execute query and actually send the matching log rows
+          logs.each do |l|
+            ws.send(l.as_api_response.to_json)
+            ws.last_log_id = l.id
+          end
+        elsif id
+          # No filters set up, so just record the sequence number
+          ws.last_log_id = id.to_i
+        end
+      rescue Exception => e
+        puts "Error publishing event: #{$!}"
+        puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
+        ws.send ({status: 500, message: 'error'}.to_json)
+        ws.close
+      end
+  end
+
+  # Handle inbound subscribe or unsubscribe message.
+  def handle_message ws, event
+    begin
+      # Parse event data as JSON
+      p = (Oj.load event.data).symbolize_keys
+
+      if p[:method] == 'subscribe'
+        # Handle subscribe event
+
+        if p[:last_log_id]
+          # Set or reset the last_log_id.  The event bus only reports events
+          # for rows that come after last_log_id.
+          ws.last_log_id = p[:last_log_id].to_i
+        end
+
+        if ws.filters.length < MAX_FILTERS
+          # Add a filter.  This gets the :filters field which is the same
+          # format as used for regular index queries.
+          ws.filters << Filter.new(p)
+          ws.send ({status: 200, message: 'subscribe ok'}.to_json)
+
+          # Send any pending events
+          push_events ws
+        else
+          ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
+        end
+
+      elsif p[:method] == 'unsubscribe'
+        # Handle unsubscribe event
+
+        len = ws.filters.length
+        ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
+        if ws.filters.length < len
+          ws.send ({status: 200, message: 'unsubscribe ok'}.to_json)
+        else
+          ws.send ({status: 404, message: 'filter not found'}.to_json)
+        end
+
+      else
+        ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
+      end
+    rescue Oj::Error => e
+      ws.send ({status: 400, message: "malformed request"}.to_json)
+    rescue Exception => e
+      puts "Error handling message: #{$!}"
+      puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
+      ws.send ({status: 500, message: 'error'}.to_json)
+      ws.close
+    end
+  end
+
+  # Constant maximum number of filters, to avoid silly huge database queries.
+  MAX_FILTERS = 16
+
+  # Called by RackSocket when a new websocket connection has been established.
+  def on_connect ws
+
+    # Disconnect if no valid API token.
+    # current_user is included from CurrentApiClient
+    if not current_user
+      ws.send ({status: 401, message: "Valid API token required"}.to_json)
+      ws.close
+      return
+    end
+
+    # Initialize our custom fields on the websocket connection object.
+    ws.user = current_user
+    ws.filters = []
+    ws.last_log_id = nil
+
+    # Subscribe to internal postgres notifications through @channel.  This will
+    # call push_events when a notification comes through.
+    sub = @channel.subscribe do |msg|
+      push_events ws, msg
+    end
+
+    # Set up callback for inbound message dispatch.
+    ws.on :message do |event|
+      handle_message ws, event
+    end
+
+    # Set up socket close callback
+    ws.on :close do |event|
+      @channel.unsubscribe sub
+      ws = nil
+    end
+
+    # Start up thread to monitor the Postgres database, if none exists already.
+    @mtx.synchronize do
+      unless @bgthread
+        @bgthread = true
+        Thread.new do
+          # from http://stackoverflow.com/questions/16405520/postgres-listen-notify-rails
+          ActiveRecord::Base.connection_pool.with_connection do |connection|
+            conn = connection.instance_variable_get(:@connection)
+            begin
+              conn.async_exec "LISTEN logs"
+              while true
+                # wait_for_notify will block until there is a change
+                # notification from Postgres about the logs table, then push
+                # the notification into the EventMachine channel.  Each
+                # websocket connection subscribes to the other end of the
+                # channel and calls #push_events to actually dispatch the
+                # events to the client.
+                conn.wait_for_notify do |channel, pid, payload|
+                  @channel.push payload
+                end
+              end
+            ensure
+              # Don't want the connection to still be listening once we return
+              # it to the pool - could result in weird behavior for the next
+              # thread to check it out.
+              conn.async_exec "UNLISTEN *"
+            end
+          end
+          @bgthread = false
+        end
+      end
+    end
+
+    # Since EventMachine is an asynchronous event based dispatcher, #on_connect
+    # does not block but instead returns immediately after having set up the
+    # websocket and notification channel callbacks.
+  end
+end
diff --git a/services/api/lib/load_param.rb b/services/api/lib/load_param.rb
new file mode 100644 (file)
index 0000000..e01063d
--- /dev/null
@@ -0,0 +1,88 @@
+# Mixin module for reading out query parameters from request params.
+#
+# Expects:
+#   +params+ Hash
+# Sets:
+#   @where, @filters, @limit, @offset, @orders
+module LoadParam
+
+  # Default limit on number of rows to return in a single query.
+  DEFAULT_LIMIT = 100
+
+  # Load params[:where] into @where
+  def load_where_param
+    if params[:where].nil? or params[:where] == ""
+      @where = {}
+    elsif params[:where].is_a? Hash
+      @where = params[:where]
+    elsif params[:where].is_a? String
+      begin
+        @where = Oj.load(params[:where])
+        raise unless @where.is_a? Hash
+      rescue
+        raise ArgumentError.new("Could not parse \"where\" param as an object")
+      end
+    end
+    @where = @where.with_indifferent_access
+  end
+
+  # Load params[:filters] into @filters
+  def load_filters_param
+    @filters ||= []
+    if params[:filters].is_a? Array
+      @filters += params[:filters]
+    elsif params[:filters].is_a? String and !params[:filters].empty?
+      begin
+        f = Oj.load params[:filters]
+        raise unless f.is_a? Array
+        @filters += f
+      rescue
+        raise ArgumentError.new("Could not parse \"filters\" param as an array")
+      end
+    end
+  end
+
+  def default_orders
+    ["#{table_name}.modified_at desc"]
+  end
+
+  # Load params[:limit], params[:offset] and params[:order]
+  # into @limit, @offset, @orders
+  def load_limit_offset_order_params
+    if params[:limit]
+      unless params[:limit].to_s.match(/^\d+$/)
+        raise ArgumentError.new("Invalid value for limit parameter")
+      end
+      @limit = params[:limit].to_i
+    else
+      @limit = DEFAULT_LIMIT
+    end
+
+    if params[:offset]
+      unless params[:offset].to_s.match(/^\d+$/)
+        raise ArgumentError.new("Invalid value for offset parameter")
+      end
+      @offset = params[:offset].to_i
+    else
+      @offset = 0
+    end
+
+    @orders = []
+    if params[:order]
+      params[:order].split(',').each do |order|
+        attr, direction = order.strip.split " "
+        direction ||= 'asc'
+        if attr.match /^[a-z][_a-z0-9]+$/ and
+            model_class.columns.collect(&:name).index(attr) and
+            ['asc','desc'].index direction.downcase
+          @orders << "#{table_name}.#{attr} #{direction.downcase}"
+        end
+      end
+    end
+    if @orders.empty?
+      @orders << default_orders
+    end
+  end
+
+
+end
diff --git a/services/api/lib/record_filters.rb b/services/api/lib/record_filters.rb
new file mode 100644 (file)
index 0000000..d7e556b
--- /dev/null
@@ -0,0 +1,75 @@
+# Mixin module providing a method to convert filters into a list of SQL
+# fragments suitable to be fed to ActiveRecord #where.
+#
+# Expects:
+#   model_class
+# Operates on:
+#   @objects
+module RecordFilters
+
+  # Input:
+  # +filters+  Arvados filters as list of lists.
+  # +ar_table_name+  name of SQL table
+  #
+  # Output:
+  # Hash with two keys:
+  # :cond_out  array of SQL fragments for each filter expression
+  # :param_out  array of values for parameter substitution in cond_out
+  def record_filters filters, ar_table_name
+    cond_out = []
+    param_out = []
+
+    filters.each do |filter|
+      attr, operator, operand = filter
+      if !filter.is_a? Array
+        raise ArgumentError.new("Invalid element in filters array: #{filter.inspect} is not an array")
+      elsif !operator.is_a? String
+        raise ArgumentError.new("Invalid operator '#{operator}' (#{operator.class}) in filter")
+      elsif !model_class.searchable_columns(operator).index attr.to_s
+        raise ArgumentError.new("Invalid attribute '#{attr}' in filter")
+      end
+      case operator.downcase
+      when '=', '<', '<=', '>', '>=', 'like'
+        if operand.is_a? String
+          cond_out << "#{ar_table_name}.#{attr} #{operator} ?"
+          if (# any operator that operates on value rather than
+              # representation:
+              operator.match(/[<=>]/) and
+              model_class.attribute_column(attr).type == :datetime)
+            operand = Time.parse operand
+          end
+          param_out << operand
+        elsif operand.nil? and operator == '='
+          cond_out << "#{ar_table_name}.#{attr} is null"
+        else
+          raise ArgumentError.new("Invalid operand type '#{operand.class}' "\
+                                  "for '#{operator}' operator in filters")
+        end
+      when 'in'
+        if operand.is_a? Array
+          cond_out << "#{ar_table_name}.#{attr} IN (?)"
+          param_out << operand
+        else
+          raise ArgumentError.new("Invalid operand type '#{operand.class}' "\
+                                  "for '#{operator}' operator in filters")
+        end
+      when 'is_a'
+        operand = [operand] unless operand.is_a? Array
+        cond = []
+        operand.each do |op|
+          cl = ArvadosModel::kind_class op
+          if cl
+            cond << "#{ar_table_name}.#{attr} like ?"
+            param_out << cl.uuid_like_pattern
+          else
+            cond << "1=0"
+          end
+        end
+        cond_out << cond.join(' OR ')
+      end
+    end
+
+    {:cond_out => cond_out, :param_out => param_out}
+  end
+
+end
index f772c4fd1303ee15fac20e26c98170c6820a393e..9901ec4f038390c6364a6df467260d80afbb51e5 100644 (file)
@@ -36,6 +36,13 @@ active_trustedclient:
   api_token: 27bnddk6x2nmq00a1e3gq43n9tsl5v87a3faqar2ijj8tud5en
   expires_at: 2038-01-01 00:00:00
 
+active_noscope:
+  api_client: untrusted
+  user: active
+  api_token: activenoscopeabcdefghijklmnopqrstuvwxyz12345678901
+  expires_at: 2038-01-01 00:00:00
+  scopes: []
+
 admin_vm:
   api_client: untrusted
   user: admin
@@ -79,6 +86,14 @@ spectator:
   api_token: zw2f4gwx8hw8cjre7yp6v1zylhrhn3m5gvjq73rtpwhmknrybu
   expires_at: 2038-01-01 00:00:00
 
+spectator_specimens:
+  api_client: untrusted
+  user: spectator
+  api_token: spectatorspecimensabcdefghijklmnopqrstuvwxyz123245
+  expires_at: 2038-01-01 00:00:00
+  scopes: ["GET /arvados/v1/specimens", "GET /arvados/v1/specimens/",
+           "POST /arvados/v1/specimens"]
+
 inactive:
   api_client: untrusted
   user: inactive
index d805439a6c34ee2e931dd9508eeb28c137e592a4..f1ba81d7908721c261e23ce9c958cb3c898a3e50 100644 (file)
@@ -1,3 +1,32 @@
 log1:
+  id: 1
   uuid: zzzzz-xxxxx-pshmckwoma9plh7
-  object_uuid: zzzzz-tpzed-l1s2piq4t4mps8r
\ No newline at end of file
+  object_uuid: zzzzz-tpzed-l1s2piq4t4mps8r
+
+log2: # admin changes repository2, which is owned by active user
+  id: 2
+  uuid: zzzzz-xxxxx-pshmckwoma00002
+  owner_uuid: zzzzz-tpzed-d9tiejq69daie8f # admin user
+  object_uuid: zzzzz-2x53u-382brsig8rp3667 # repository2 (named foo2)
+  object_owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz # active user
+
+log3: # admin changes specimen owned_by_spectator
+  id: 3
+  uuid: zzzzz-xxxxx-pshmckwoma00003
+  owner_uuid: zzzzz-tpzed-d9tiejq69daie8f # admin user
+  object_uuid: zzzzz-2x53u-3b0xxwzlbzxq5yr # specimen owned_by_spectator
+  object_owner_uuid: zzzzz-tpzed-l1s2piq4t4mps8r # spectator user
+
+log4: # foo collection added, readable by active through link
+  id: 4
+  uuid: zzzzz-xxxxx-pshmckwoma00004
+  owner_uuid: zzzzz-tpzed-000000000000000 # system user
+  object_uuid: 1f4b0bc7583c2a7f9102c395f4ffc5e3+45 # foo file
+  object_owner_uuid: zzzzz-tpzed-000000000000000 # system user
+
+log5: # baz collection added, readable by active and spectator through group 'all users' group membership
+  id: 5
+  uuid: zzzzz-xxxxx-pshmckwoma00005
+  owner_uuid: zzzzz-tpzed-000000000000000 # system user
+  object_uuid: ea10d51bcf88862dbcc36eb292017dfd+45 # baz file
+  object_owner_uuid: zzzzz-tpzed-000000000000000 # system user
index ec3755de04f6853d1ee344ba9fd3e9e2b7f12b3d..62a153d898f41b5b60b07dd34739c98f22bdfd61 100644 (file)
@@ -1,4 +1,9 @@
 foo:
   uuid: zzzzz-2x53u-382brsig8rp3666
-  owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz # active user
   name: foo
+
+repository2:
+  uuid: zzzzz-2x53u-382brsig8rp3667
+  owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz # active user
+  name: foo2
index ba916708222bcef7d8495803ae6b652d07bc345f..20f83dc0ffce532856ab75df3af5b08193496edc 100644 (file)
@@ -7,84 +7,65 @@ require 'test_helper'
 class Arvados::V1::ApiTokensScopeTest < ActionController::IntegrationTest
   fixtures :all
 
-  def setup
-    @token = {}
-  end
-
-  def auth_with(name)
-    @token = {api_token: api_client_authorizations(name).api_token}
-  end
-
   def v1_url(*parts)
     (['arvados', 'v1'] + parts).join('/')
   end
 
-  def request_with_auth(method, path, params={})
-    send(method, path, @token.merge(params))
-  end
-
-  def get_with_auth(*args)
-    request_with_auth(:get_via_redirect, *args)
-  end
-
-  def post_with_auth(*args)
-    request_with_auth(:post_via_redirect, *args)
-  end
-
   test "user list token can only list users" do
-    auth_with :active_userlist
-    get_with_auth v1_url('users')
+    get_args = [{}, auth(:active_userlist)]
+    get(v1_url('users'), *get_args)
     assert_response :success
-    get_with_auth v1_url('users', '')  # Add trailing slash.
+    get(v1_url('users', ''), *get_args)  # Add trailing slash.
     assert_response :success
-    get_with_auth v1_url('users', 'current')
+    get(v1_url('users', 'current'), *get_args)
     assert_response 403
-    get_with_auth v1_url('virtual_machines')
+    get(v1_url('virtual_machines'), *get_args)
     assert_response 403
   end
 
   test "specimens token can see exactly owned specimens" do
-    auth_with :active_specimens
-    get_with_auth v1_url('specimens')
+    get_args = [{}, auth(:active_specimens)]
+    get(v1_url('specimens'), *get_args)
     assert_response 403
-    get_with_auth v1_url('specimens', specimens(:owned_by_active_user).uuid)
+    get(v1_url('specimens', specimens(:owned_by_active_user).uuid), *get_args)
     assert_response :success
-    get_with_auth v1_url('specimens', specimens(:owned_by_spectator).uuid)
+    get(v1_url('specimens', specimens(:owned_by_spectator).uuid), *get_args)
     assert_includes(403..404, @response.status)
   end
 
   test "token with multiple scopes can use them all" do
     def get_token_count
-      get_with_auth v1_url('api_client_authorizations')
+      get(v1_url('api_client_authorizations'), {}, auth(:active_apitokens))
       assert_response :success
       token_count = JSON.parse(@response.body)['items_available']
       assert_not_nil(token_count, "could not find token count")
       token_count
     end
-    auth_with :active_apitokens
     # Test the GET scope.
     token_count = get_token_count
     # Test the POST scope.
-    post_with_auth(v1_url('api_client_authorizations'),
-                   api_client_authorization: {user_id: users(:active).id})
+    post(v1_url('api_client_authorizations'),
+         {api_client_authorization: {user_id: users(:active).id}},
+         auth(:active_apitokens))
     assert_response :success
     assert_equal(token_count + 1, get_token_count,
                  "token count suggests POST was not accepted")
     # Test other requests are denied.
-    get_with_auth v1_url('api_client_authorizations',
-                         api_client_authorizations(:active_apitokens).uuid)
+    get(v1_url('api_client_authorizations',
+               api_client_authorizations(:active_apitokens).uuid),
+        {}, auth(:active_apitokens))
     assert_response 403
   end
 
   test "token without scope has no access" do
     # Logs are good for this test, because logs have relatively
     # few access controls enforced at the model level.
-    auth_with :admin_noscope
-    get_with_auth v1_url('logs')
+    req_args = [{}, auth(:admin_noscope)]
+    get(v1_url('logs'), *req_args)
     assert_response 403
-    get_with_auth v1_url('logs', logs(:log1).uuid)
+    get(v1_url('logs', logs(:log1).uuid), *req_args)
     assert_response 403
-    post_with_auth(v1_url('logs'), log: {})
+    post(v1_url('logs'), *req_args)
     assert_response 403
   end
 
@@ -94,10 +75,11 @@ class Arvados::V1::ApiTokensScopeTest < ActionController::IntegrationTest
     def vm_logins_url(name)
       v1_url('virtual_machines', virtual_machines(name).uuid, 'logins')
     end
-    auth_with :admin_vm
-    get_with_auth vm_logins_url(:testvm)
+    get_args = [{}, auth(:admin_vm)]
+    get(vm_logins_url(:testvm), *get_args)
     assert_response :success
-    get_with_auth vm_logins_url(:testvm2)
-    assert(@response.status >= 400, "getting testvm2 logins should have failed")
+    get(vm_logins_url(:testvm2), *get_args)
+    assert_includes(400..419, @response.status,
+                    "getting testvm2 logins should have failed")
   end
 end
diff --git a/services/api/test/integration/reader_tokens_test.rb b/services/api/test/integration/reader_tokens_test.rb
new file mode 100644 (file)
index 0000000..6ed8461
--- /dev/null
@@ -0,0 +1,85 @@
+require 'test_helper'
+
+class Arvados::V1::ReaderTokensTest < ActionController::IntegrationTest
+  fixtures :all
+
+  def spectator_specimen
+    specimens(:owned_by_spectator).uuid
+  end
+
+  def get_specimens(main_auth, read_auth, formatter=:to_a)
+    params = {}
+    params[:reader_tokens] = [api_token(read_auth)].send(formatter) if read_auth
+    headers = {}
+    headers.merge!(auth(main_auth)) if main_auth
+    get('/arvados/v1/specimens', params, headers)
+  end
+
+  def get_specimen_uuids(main_auth, read_auth, formatter=:to_a)
+    get_specimens(main_auth, read_auth, formatter)
+    assert_response :success
+    json_response['items'].map { |spec| spec['uuid'] }
+  end
+
+  def assert_post_denied(main_auth, read_auth, formatter=:to_a)
+    if main_auth
+      headers = auth(main_auth)
+      expected = 403
+    else
+      headers = {}
+      expected = 401
+    end
+    post('/arvados/v1/specimens.json',
+         {specimen: {}, reader_tokens: [api_token(read_auth)].send(formatter)},
+         headers)
+    assert_response expected
+  end
+
+  test "active user can't see spectator specimen" do
+    # Other tests in this suite assume that the active user doesn't
+    # have read permission to the owned_by_spectator specimen.
+    # This test checks that this assumption still holds.
+    refute_includes(get_specimen_uuids(:active, nil), spectator_specimen,
+                    ["active user can read the owned_by_spectator specimen",
+                     "other tests will return false positives"].join(" - "))
+  end
+
+  [nil, :active_noscope].each do |main_auth|
+    [:spectator, :spectator_specimens].each do |read_auth|
+      test "#{main_auth} auth with reader token #{read_auth} can read" do
+        assert_includes(get_specimen_uuids(main_auth, read_auth),
+                        spectator_specimen, "did not find spectator specimen")
+      end
+
+      test "#{main_auth} auth with JSON read token #{read_auth} can read" do
+        assert_includes(get_specimen_uuids(main_auth, read_auth, :to_json),
+                        spectator_specimen, "did not find spectator specimen")
+      end
+
+      test "#{main_auth} auth with reader token #{read_auth} can't write" do
+        assert_post_denied(main_auth, read_auth)
+      end
+
+      test "#{main_auth} auth with JSON read token #{read_auth} can't write" do
+        assert_post_denied(main_auth, read_auth, :to_json)
+      end
+    end
+  end
+
+  test "scopes are still limited with reader tokens" do
+    get('/arvados/v1/collections',
+        {reader_tokens: [api_token(:spectator_specimens)]},
+        auth(:active_noscope))
+    assert_response 403
+  end
+
+  test "reader tokens grant no permissions when expired" do
+    get_specimens(:active_noscope, :expired)
+    assert_response 403
+  end
+
+  test "reader tokens grant no permissions outside their scope" do
+    refute_includes(get_specimen_uuids(:active, :admin_vm), spectator_specimen,
+                    "scoped reader token granted permissions out of scope")
+  end
+end
diff --git a/services/api/test/integration/websocket_test.rb b/services/api/test/integration/websocket_test.rb
new file mode 100644 (file)
index 0000000..9ce53a6
--- /dev/null
@@ -0,0 +1,570 @@
+require 'test_helper'
+require 'websocket_runner'
+require 'oj'
+require 'database_cleaner'
+
+DatabaseCleaner.strategy = :truncation
+
+class WebsocketTest < ActionDispatch::IntegrationTest
+  self.use_transactional_fixtures = false
+
+  setup do
+    DatabaseCleaner.start
+  end
+
+  teardown do
+    DatabaseCleaner.clean
+  end
+
+  def ws_helper (token = nil, timeout = true)
+    opened = false
+    close_status = nil
+    too_long = false
+
+    EM.run {
+      if token
+        ws = Faye::WebSocket::Client.new("ws://localhost:3002/websocket?api_token=#{api_client_authorizations(token).api_token}")
+      else
+        ws = Faye::WebSocket::Client.new("ws://localhost:3002/websocket")
+      end
+
+      ws.on :open do |event|
+        opened = true
+        if timeout
+          EM::Timer.new 3 do
+            too_long = true
+            EM.stop_event_loop
+          end
+        end
+      end
+
+      ws.on :close do |event|
+        close_status = [:close, event.code, event.reason]
+        EM.stop_event_loop
+      end
+
+      yield ws
+    }
+
+    assert opened, "Should have opened web socket"
+    assert (not too_long), "Test took too long"
+    assert_equal 1000, close_status[1], "Connection closed unexpectedly (check log for errors)"
+  end
+
+  test "connect with no token" do
+    status = nil
+
+    ws_helper do |ws|
+      ws.on :message do |event|
+        d = Oj.load event.data
+        status = d["status"]
+        ws.close
+      end
+    end
+
+    assert_equal 401, status
+  end
+
+
+  test "connect, subscribe and get response" do
+    status = nil
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe'}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        status = d["status"]
+        ws.close
+      end
+    end
+
+    assert_equal 200, status
+  end
+
+  test "connect, subscribe, get event" do
+    state = 1
+    spec = nil
+    ev_uuid = nil
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe'}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          spec = Specimen.create
+          state = 2
+        when 2
+          ev_uuid = d["object_uuid"]
+          ws.close
+        end
+      end
+
+    end
+
+    assert_not_nil spec
+    assert_equal spec.uuid, ev_uuid
+  end
+
+  test "connect, subscribe, get two events" do
+    state = 1
+    spec = nil
+    human = nil
+    spec_ev_uuid = nil
+    human_ev_uuid = nil
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe'}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          spec = Specimen.create
+          human = Human.create
+          state = 2
+        when 2
+          spec_ev_uuid = d["object_uuid"]
+          state = 3
+        when 3
+          human_ev_uuid = d["object_uuid"]
+          state = 4
+          ws.close
+        when 4
+          assert false, "Should not get any more events"
+        end
+      end
+
+    end
+
+    assert_not_nil spec
+    assert_not_nil human
+    assert_equal spec.uuid, spec_ev_uuid
+    assert_equal human.uuid, human_ev_uuid
+  end
+
+  test "connect, subscribe, filter events" do
+    state = 1
+    human = nil
+    human_ev_uuid = nil
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          Specimen.create
+          human = Human.create
+          state = 2
+        when 2
+          human_ev_uuid = d["object_uuid"]
+          state = 3
+          ws.close
+        when 3
+          assert false, "Should not get any more events"
+        end
+      end
+
+    end
+
+    assert_not_nil human
+    assert_equal human.uuid, human_ev_uuid
+  end
+
+
+  test "connect, subscribe, multiple filters" do
+    state = 1
+    spec = nil
+    human = nil
+    spec_ev_uuid = nil
+    human_ev_uuid = nil
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
+        ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#specimen']]}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          state = 2
+        when 2
+          assert_equal 200, d["status"]
+          spec = Specimen.create
+          Trait.create # not part of filters, should not be received
+          human = Human.create
+          state = 3
+        when 3
+          spec_ev_uuid = d["object_uuid"]
+          state = 4
+        when 4
+          human_ev_uuid = d["object_uuid"]
+          state = 5
+          ws.close
+        when 5
+          assert false, "Should not get any more events"
+        end
+      end
+
+    end
+
+    assert_not_nil spec
+    assert_not_nil human
+    assert_equal spec.uuid, spec_ev_uuid
+    assert_equal human.uuid, human_ev_uuid
+  end
+
+  test "connect, subscribe, ask events starting at seq num" do
+    state = 1
+    human = nil
+    human_ev_uuid = nil
+
+    authorize_with :admin
+
+    lastid = logs(:log3).id
+    l1 = nil
+    l2 = nil
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe', last_log_id: lastid}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          state = 2
+        when 2
+          l1 = d["object_uuid"]
+          assert_not_nil l1, "Unexpected message: #{d}"
+          state = 3
+        when 3
+          l2 = d["object_uuid"]
+          assert_not_nil l2, "Unexpected message: #{d}"
+          state = 4
+          ws.close
+        when 4
+          assert false, "Should not get any more events"
+        end
+      end
+
+    end
+
+    assert_equal logs(:log4).object_uuid, l1
+    assert_equal logs(:log5).object_uuid, l2
+  end
+
+  test "connect, subscribe, get event, unsubscribe" do
+    state = 1
+    spec = nil
+    spec_ev_uuid = nil
+    filter_id = nil
+
+    authorize_with :admin
+
+    ws_helper :admin, false do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe'}.to_json)
+        EM::Timer.new 3 do
+          # Set a time limit on the test because after unsubscribing the server
+          # still has to process the next event (and then hopefully correctly
+          # decides not to send it because we unsubscribed.)
+          ws.close
+        end
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          spec = Specimen.create
+          state = 2
+        when 2
+          spec_ev_uuid = d["object_uuid"]
+          ws.send ({method: 'unsubscribe'}.to_json)
+
+          EM::Timer.new 1 do
+            Specimen.create
+          end
+
+          state = 3
+        when 3
+          assert_equal 200, d["status"]
+          state = 4
+        when 4
+          assert false, "Should not get any more events"
+        end
+      end
+
+    end
+
+    assert_not_nil spec
+    assert_equal spec.uuid, spec_ev_uuid
+  end
+
+  test "connect, subscribe, get event, unsubscribe with filter" do
+    state = 1
+    spec = nil
+    spec_ev_uuid = nil
+
+    authorize_with :admin
+
+    ws_helper :admin, false do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
+        EM::Timer.new 3 do
+          # Set a time limit on the test because after unsubscribing the server
+          # still has to process the next event (and then hopefully correctly
+          # decides not to send it because we unsubscribed.)
+          ws.close
+        end
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          spec = Human.create
+          state = 2
+        when 2
+          spec_ev_uuid = d["object_uuid"]
+          ws.send ({method: 'unsubscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
+
+          EM::Timer.new 1 do
+            Human.create
+          end
+
+          state = 3
+        when 3
+          assert_equal 200, d["status"]
+          state = 4
+        when 4
+          assert false, "Should not get any more events"
+        end
+      end
+
+    end
+
+    assert_not_nil spec
+    assert_equal spec.uuid, spec_ev_uuid
+  end
+
+
+  test "connect, subscribe, get event, try to unsubscribe with bogus filter" do
+    state = 1
+    spec = nil
+    spec_ev_uuid = nil
+    human = nil
+    human_ev_uuid = nil
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe'}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          spec = Specimen.create
+          state = 2
+        when 2
+          spec_ev_uuid = d["object_uuid"]
+          ws.send ({method: 'unsubscribe', filters: [['foo', 'bar', 'baz']]}.to_json)
+
+          EM::Timer.new 1 do
+            human = Human.create
+          end
+
+          state = 3
+        when 3
+          assert_equal 404, d["status"]
+          state = 4
+        when 4
+          human_ev_uuid = d["object_uuid"]
+          state = 5
+          ws.close
+        when 5
+          assert false, "Should not get any more events"
+        end
+      end
+
+    end
+
+    assert_not_nil spec
+    assert_not_nil human
+    assert_equal spec.uuid, spec_ev_uuid
+    assert_equal human.uuid, human_ev_uuid
+  end
+
+
+
+  test "connected, not subscribed, no event" do
+    authorize_with :admin
+
+    ws_helper :admin, false do |ws|
+      ws.on :open do |event|
+        EM::Timer.new 1 do
+          Specimen.create
+        end
+
+        EM::Timer.new 3 do
+          ws.close
+        end
+      end
+
+      ws.on :message do |event|
+        assert false, "Should not get any messages, message was #{event.data}"
+      end
+    end
+  end
+
+  test "connected, not authorized to see event" do
+    state = 1
+
+    authorize_with :admin
+
+    ws_helper :active, false do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe'}.to_json)
+
+        EM::Timer.new 3 do
+          ws.close
+        end
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          Specimen.create
+          state = 2
+        when 2
+          assert false, "Should not get any messages, message was #{event.data}"
+        end
+      end
+
+    end
+
+  end
+
+  test "connect, try bogus method" do
+    status = nil
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'frobnabble'}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        status = d["status"]
+        ws.close
+      end
+    end
+
+    assert_equal 400, status
+  end
+
+  test "connect, missing method" do
+    status = nil
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({fizzbuzz: 'frobnabble'}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        status = d["status"]
+        ws.close
+      end
+    end
+
+    assert_equal 400, status
+  end
+
+  test "connect, send malformed request" do
+    status = nil
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send '<XML4EVER></XML4EVER>'
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        status = d["status"]
+        ws.close
+      end
+    end
+
+    assert_equal 400, status
+  end
+
+
+  test "connect, try subscribe too many filters" do
+    state = 1
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        (1..17).each do |i|
+          ws.send ({method: 'subscribe', filters: [['object_uuid', '=', i]]}.to_json)
+        end
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        case state
+        when (1..EventBus::MAX_FILTERS)
+          assert_equal 200, d["status"]
+          state += 1
+        when (EventBus::MAX_FILTERS+1)
+          assert_equal 403, d["status"]
+          ws.close
+        end
+      end
+
+    end
+
+    assert_equal 17, state
+
+  end
+
+end
index 286cf66f28f240f00ea9aaf98bbb0ce5c13e7b8e..869d87f7db19f24d63743b04d410404ba4379082 100644 (file)
@@ -2,6 +2,20 @@ ENV["RAILS_ENV"] = "test"
 require File.expand_path('../../config/environment', __FILE__)
 require 'rails/test_help'
 
+module ArvadosTestSupport
+  def json_response
+    @json_response ||= ActiveSupport::JSON.decode @response.body
+  end
+
+  def api_token(api_client_auth_name)
+    api_client_authorizations(api_client_auth_name).api_token
+  end
+
+  def auth(api_client_auth_name)
+    {'HTTP_AUTHORIZATION' => "OAuth2 #{api_token(api_client_auth_name)}"}
+  end
+end
+
 class ActiveSupport::TestCase
   # Setup all fixtures in test/fixtures/*.(yml|csv) for all tests in alphabetical order.
   #
@@ -9,6 +23,8 @@ class ActiveSupport::TestCase
   # -- they do not yet inherit this setting
   fixtures :all
 
+  include ArvadosTestSupport
+
   teardown do
     Thread.current[:api_client_ip_address] = nil
     Thread.current[:api_client_authorization] = nil
@@ -21,19 +37,12 @@ class ActiveSupport::TestCase
     self.request.headers["Accept"] = "text/json"
   end
 
-  def json_response
-    @json_response ||= ActiveSupport::JSON.decode @response.body
-  end
-
   def authorize_with(api_client_auth_name)
-    self.request.env['HTTP_AUTHORIZATION'] = "OAuth2 #{api_client_authorizations(api_client_auth_name).api_token}"
+    ArvadosApiToken.new.call ({"rack.input" => "", "HTTP_AUTHORIZATION" => "OAuth2 #{api_client_authorizations(api_client_auth_name).api_token}"})
   end
-
-  # Add more helper methods to be used by all tests here...
 end
 
 class ActionDispatch::IntegrationTest
-
   teardown do
     Thread.current[:api_client_ip_address] = nil
     Thread.current[:api_client_authorization] = nil
@@ -41,10 +50,6 @@ class ActionDispatch::IntegrationTest
     Thread.current[:api_client] = nil
     Thread.current[:user] = nil
   end
-
-  def auth auth_fixture
-    {'HTTP_AUTHORIZATION' => "OAuth2 #{api_client_authorizations(auth_fixture).api_token}"}
-  end
 end
 
 # Ensure permissions are computed from the test fixtures.
index 3876775916f321ba6b7ed5269499f1123baf46f9..be8498d4bac3883621df2688dcf4ad5423f8c900 100644 (file)
@@ -224,4 +224,25 @@ class LogTest < ActiveSupport::TestCase
     auth.destroy
     assert_auth_logged_with_clean_properties(auth, :destroy)
   end
+
+  test "use ownership and permission links to determine which logs a user can see" do
+    c = Log.readable_by(users(:admin)).order("id asc").each.to_a
+    assert_equal 5, c.size
+    assert_equal 1, c[0].id # no-op
+    assert_equal 2, c[1].id # admin changes repository foo, which is owned by active user
+    assert_equal 3, c[2].id # admin changes specimen owned_by_spectator
+    assert_equal 4, c[3].id # foo collection added, readable by active through link
+    assert_equal 5, c[4].id # baz collection added, readable by active and spectator through group 'all users' group membership
+
+    c = Log.readable_by(users(:active)).order("id asc").each.to_a
+    assert_equal 3, c.size
+    assert_equal 2, c[0].id # admin changes repository foo, which is owned by active user
+    assert_equal 4, c[1].id # foo collection added, readable by active through link
+    assert_equal 5, c[2].id # baz collection added, readable by active and spectator through group 'all users' group membership
+
+    c = Log.readable_by(users(:spectator)).order("id asc").each.to_a
+    assert_equal 2, c.size
+    assert_equal 3, c[0].id # admin changes specimen owned_by_spectator
+    assert_equal 5, c[1].id # baz collection added, readable by active and spectator through group 'all users' group membership
+  end
 end
diff --git a/services/api/test/websocket_runner.rb b/services/api/test/websocket_runner.rb
new file mode 100644 (file)
index 0000000..df72e24
--- /dev/null
@@ -0,0 +1,42 @@
+require 'bundler'
+
+$ARV_API_SERVER_DIR = File.expand_path('../..', __FILE__)
+SERVER_PID_PATH = 'tmp/pids/passenger.3002.pid'
+
+class WebsocketTestRunner < MiniTest::Unit
+  def _system(*cmd)
+    Bundler.with_clean_env do
+      if not system({'ARVADOS_WEBSOCKETS' => 'ws-only', 'RAILS_ENV' => 'test'}, *cmd)
+        raise RuntimeError, "#{cmd[0]} returned exit code #{$?.exitstatus}"
+      end
+    end
+  end
+
+  def _run(args=[])
+    server_pid = Dir.chdir($ARV_API_SERVER_DIR) do |apidir|
+      # Only passenger seems to be able to run the websockets server successfully.
+      _system('passenger', 'start', '-d', '-p3002')
+      timeout = Time.now.tv_sec + 10
+      begin
+        sleep 0.2
+        begin
+          server_pid = IO.read(SERVER_PID_PATH).to_i
+          good_pid = (server_pid > 0) and (Process.kill(0, pid) rescue false)
+        rescue Errno::ENOENT
+          good_pid = false
+        end
+      end while (not good_pid) and (Time.now.tv_sec < timeout)
+      if not good_pid
+        raise RuntimeError, "could not find API server Rails pid"
+      end
+      server_pid
+    end
+    begin
+      super(args)
+    ensure
+      Process.kill('TERM', server_pid)
+    end
+  end
+end
+
+MiniTest::Unit.runner = WebsocketTestRunner.new
index 74c3b15e4e2d1132cf62009ef0a483f3183927d7..e621955487d0bd9a59656d6c587f13cc3203521e 100644 (file)
@@ -14,12 +14,9 @@ import (
        "log"
        "net/http"
        "os"
-       "path/filepath"
        "regexp"
-       "strconv"
        "strings"
        "syscall"
-       "time"
 )
 
 // ======================
@@ -40,7 +37,8 @@ const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
 
 var PROC_MOUNTS = "/proc/mounts"
 
-var KeepVolumes []string
+// The Keep VolumeManager maintains a list of available volumes.
+var KeepVM VolumeManager
 
 // ==========
 // Error types.
@@ -89,10 +87,13 @@ func main() {
        //    directories.
 
        var listen, volumearg string
+       var serialize_io bool
        flag.StringVar(&listen, "listen", DEFAULT_ADDR,
                "interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.")
        flag.StringVar(&volumearg, "volumes", "",
                "Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.")
+       flag.BoolVar(&serialize_io, "serialize", false,
+               "If set, all read and write operations on local Keep volumes will be serialized.")
        flag.Parse()
 
        // Look for local keep volumes.
@@ -107,20 +108,24 @@ func main() {
        }
 
        // Check that the specified volumes actually exist.
-       KeepVolumes = []string(nil)
+       var goodvols []Volume = nil
        for _, v := range keepvols {
                if _, err := os.Stat(v); err == nil {
                        log.Println("adding Keep volume:", v)
-                       KeepVolumes = append(KeepVolumes, v)
+                       newvol := MakeUnixVolume(v, serialize_io)
+                       goodvols = append(goodvols, &newvol)
                } else {
                        log.Printf("bad Keep volume: %s\n", err)
                }
        }
 
-       if len(KeepVolumes) == 0 {
+       if len(goodvols) == 0 {
                log.Fatal("could not find any keep volumes")
        }
 
+       // Start a round-robin VolumeManager with the volumes we have found.
+       KeepVM = MakeRRVolumeManager(goodvols)
+
        // Set up REST handlers.
        //
        // Start with a router that will route each URL path to an
@@ -226,7 +231,10 @@ func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
 func IndexHandler(w http.ResponseWriter, req *http.Request) {
        prefix := mux.Vars(req)["prefix"]
 
-       index := IndexLocators(prefix)
+       var index string
+       for _, vol := range KeepVM.Volumes() {
+               index = index + vol.Index(prefix)
+       }
        w.Write([]byte(index))
 }
 
@@ -271,9 +279,9 @@ func StatusHandler(w http.ResponseWriter, req *http.Request) {
 func GetNodeStatus() *NodeStatus {
        st := new(NodeStatus)
 
-       st.Volumes = make([]*VolumeStatus, len(KeepVolumes))
-       for i, vol := range KeepVolumes {
-               st.Volumes[i] = GetVolumeStatus(vol)
+       st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
+       for i, vol := range KeepVM.Volumes() {
+               st.Volumes[i] = vol.Status()
        }
        return st
 }
@@ -305,108 +313,42 @@ func GetVolumeStatus(volume string) *VolumeStatus {
        return &VolumeStatus{volume, devnum, free, used}
 }
 
-// IndexLocators
-//     Returns a string containing a list of locator ids found on this
-//     Keep server.  If {prefix} is given, return only those locator
-//     ids that begin with the given prefix string.
-//
-//     The return string consists of a sequence of newline-separated
-//     strings in the format
-//
-//         locator+size modification-time
-//
-//     e.g.:
-//
-//         e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
-//         e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
-//         e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
-//
-func IndexLocators(prefix string) string {
-       var output string
-       for _, vol := range KeepVolumes {
-               filepath.Walk(vol,
-                       func(path string, info os.FileInfo, err error) error {
-                               // This WalkFunc inspects each path in the volume
-                               // and prints an index line for all files that begin
-                               // with prefix.
-                               if err != nil {
-                                       log.Printf("IndexHandler: %s: walking to %s: %s",
-                                               vol, path, err)
-                                       return nil
-                               }
-                               locator := filepath.Base(path)
-                               // Skip directories that do not match prefix.
-                               // We know there is nothing interesting inside.
-                               if info.IsDir() &&
-                                       !strings.HasPrefix(locator, prefix) &&
-                                       !strings.HasPrefix(prefix, locator) {
-                                       return filepath.SkipDir
-                               }
-                               // Skip any file that is not apparently a locator, e.g. .meta files
-                               if is_valid, err := IsValidLocator(locator); err != nil {
-                                       return err
-                               } else if !is_valid {
-                                       return nil
-                               }
-                               // Print filenames beginning with prefix
-                               if !info.IsDir() && strings.HasPrefix(locator, prefix) {
-                                       output = output + fmt.Sprintf(
-                                               "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
-                               }
-                               return nil
-                       })
-       }
-
-       return output
-}
-
 func GetBlock(hash string) ([]byte, error) {
-       var buf = make([]byte, BLOCKSIZE)
-
        // Attempt to read the requested hash from a keep volume.
-       for _, vol := range KeepVolumes {
-               var f *os.File
-               var err error
-               var nread int
-
-               blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
-
-               f, err = os.Open(blockFilename)
-               if err != nil {
-                       if !os.IsNotExist(err) {
-                               // A block is stored on only one Keep disk,
-                               // so os.IsNotExist is expected.  Report any other errors.
-                               log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
+       for _, vol := range KeepVM.Volumes() {
+               if buf, err := vol.Get(hash); err != nil {
+                       // IsNotExist is an expected error and may be ignored.
+                       // (If all volumes report IsNotExist, we return a NotFoundError)
+                       // A CorruptError should be returned immediately.
+                       // Any other errors should be logged but we continue trying to
+                       // read.
+                       switch {
+                       case os.IsNotExist(err):
+                               continue
+                       default:
+                               log.Printf("GetBlock: reading %s: %s\n", hash, err)
                        }
-                       continue
-               }
-
-               nread, err = f.Read(buf)
-               if err != nil {
-                       log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
-                       continue
-               }
-
-               // Double check the file checksum.
-               //
-               filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
-               if filehash != hash {
-                       // TODO(twp): this condition probably represents a bad disk and
-                       // should raise major alarm bells for an administrator: e.g.
-                       // they should be sent directly to an event manager at high
-                       // priority or logged as urgent problems.
+               } else {
+                       // Double check the file checksum.
                        //
-                       log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
-                               vol, blockFilename, filehash)
-                       return buf, CorruptError
+                       filehash := fmt.Sprintf("%x", md5.Sum(buf))
+                       if filehash != hash {
+                               // TODO(twp): this condition probably represents a bad disk and
+                               // should raise major alarm bells for an administrator: e.g.
+                               // they should be sent directly to an event manager at high
+                               // priority or logged as urgent problems.
+                               //
+                               log.Printf("%s: checksum mismatch for request %s (actual hash %s)\n",
+                                       vol, hash, filehash)
+                               return buf, CorruptError
+                       }
+                       // Success!
+                       return buf, nil
                }
-
-               // Success!
-               return buf[:nread], nil
        }
 
        log.Printf("%s: not found on any volumes, giving up\n", hash)
-       return buf, NotFoundError
+       return nil, NotFoundError
 }
 
 /* PutBlock(block, hash)
@@ -456,98 +398,34 @@ func PutBlock(block []byte, hash string) error {
                }
        }
 
-       // Store the block on the first available Keep volume.
-       allFull := true
-       for _, vol := range KeepVolumes {
-               if IsFull(vol) {
-                       continue
-               }
-               allFull = false
-               blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
-               if err := os.MkdirAll(blockDir, 0755); err != nil {
-                       log.Printf("%s: could not create directory %s: %s",
-                               hash, blockDir, err)
-                       continue
-               }
-
-               tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
-               if tmperr != nil {
-                       log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
-                       continue
-               }
-               blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
-
-               if _, err := tmpfile.Write(block); err != nil {
-                       log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
-                       continue
-               }
-               if err := tmpfile.Close(); err != nil {
-                       log.Printf("closing %s: %s\n", tmpfile.Name(), err)
-                       os.Remove(tmpfile.Name())
-                       continue
-               }
-               if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
-                       log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
-                       os.Remove(tmpfile.Name())
-                       continue
-               }
-               return nil
-       }
-
-       if allFull {
-               log.Printf("all Keep volumes full")
-               return FullError
+       // Choose a Keep volume to write to.
+       // If this volume fails, try all of the volumes in order.
+       vol := KeepVM.Choose()
+       if err := vol.Put(hash, block); err == nil {
+               return nil // success!
        } else {
-               log.Printf("all Keep volumes failed")
-               return GenericError
-       }
-}
-
-func IsFull(volume string) (isFull bool) {
-       fullSymlink := volume + "/full"
-
-       // Check if the volume has been marked as full in the last hour.
-       if link, err := os.Readlink(fullSymlink); err == nil {
-               if ts, err := strconv.Atoi(link); err == nil {
-                       fulltime := time.Unix(int64(ts), 0)
-                       if time.Since(fulltime).Hours() < 1.0 {
-                               return true
+               allFull := true
+               for _, vol := range KeepVM.Volumes() {
+                       err := vol.Put(hash, block)
+                       if err == nil {
+                               return nil // success!
+                       }
+                       if err != FullError {
+                               // The volume is not full but the write did not succeed.
+                               // Report the error and continue trying.
+                               allFull = false
+                               log.Printf("%s: Write(%s): %s\n", vol, hash, err)
                        }
                }
-       }
-
-       if avail, err := FreeDiskSpace(volume); err == nil {
-               isFull = avail < MIN_FREE_KILOBYTES
-       } else {
-               log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
-               isFull = false
-       }
 
-       // If the volume is full, timestamp it.
-       if isFull {
-               now := fmt.Sprintf("%d", time.Now().Unix())
-               os.Symlink(now, fullSymlink)
-       }
-       return
-}
-
-// FreeDiskSpace(volume)
-//     Returns the amount of available disk space on VOLUME,
-//     as a number of 1k blocks.
-//
-//     TODO(twp): consider integrating this better with
-//     VolumeStatus (e.g. keep a NodeStatus object up-to-date
-//     periodically and use it as the source of info)
-//
-func FreeDiskSpace(volume string) (free uint64, err error) {
-       var fs syscall.Statfs_t
-       err = syscall.Statfs(volume, &fs)
-       if err == nil {
-               // Statfs output is not guaranteed to measure free
-               // space in terms of 1K blocks.
-               free = fs.Bavail * uint64(fs.Bsize) / 1024
+               if allFull {
+                       log.Printf("all Keep volumes full")
+                       return FullError
+               } else {
+                       log.Printf("all Keep volumes failed")
+                       return GenericError
+               }
        }
-       return
 }
 
 // ReadAtMost
@@ -573,6 +451,11 @@ func ReadAtMost(r io.Reader, maxbytes int) ([]byte, error) {
 //     When Keep is extended to support hash types other than MD5,
 //     this should be updated to cover those as well.
 //
-func IsValidLocator(loc string) (bool, error) {
-       return regexp.MatchString(`^[0-9a-f]{32}$`, loc)
+func IsValidLocator(loc string) bool {
+       match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
+       if err == nil {
+               return match
+       }
+       log.Printf("IsValidLocator: %s\n", err)
+       return false
 }
index 97fa1c78919e3070bf02690dbb876a741837dda7..30d103da72e89c13a17181f725eb7eb1a0ee4c5c 100644 (file)
@@ -48,8 +48,13 @@ func TestGetBlock(t *testing.T) {
        defer teardown()
 
        // Prepare two test Keep volumes. Our block is stored on the second volume.
-       KeepVolumes = setup(t, 2)
-       store(t, KeepVolumes[1], TEST_HASH, TEST_BLOCK)
+       KeepVM = MakeTestVolumeManager(2)
+       defer func() { KeepVM.Quit() }()
+
+       vols := KeepVM.Volumes()
+       if err := vols[1].Put(TEST_HASH, TEST_BLOCK); err != nil {
+               t.Error(err)
+       }
 
        // Check that GetBlock returns success.
        result, err := GetBlock(TEST_HASH)
@@ -68,7 +73,8 @@ func TestGetBlockMissing(t *testing.T) {
        defer teardown()
 
        // Create two empty test Keep volumes.
-       KeepVolumes = setup(t, 2)
+       KeepVM = MakeTestVolumeManager(2)
+       defer func() { KeepVM.Quit() }()
 
        // Check that GetBlock returns failure.
        result, err := GetBlock(TEST_HASH)
@@ -84,17 +90,17 @@ func TestGetBlockMissing(t *testing.T) {
 func TestGetBlockCorrupt(t *testing.T) {
        defer teardown()
 
-       // Create two test Keep volumes and store a block in each of them,
-       // but the hash of the block does not match the filename.
-       KeepVolumes = setup(t, 2)
-       for _, vol := range KeepVolumes {
-               store(t, vol, TEST_HASH, BAD_BLOCK)
-       }
+       // Create two test Keep volumes and store a corrupt block in one.
+       KeepVM = MakeTestVolumeManager(2)
+       defer func() { KeepVM.Quit() }()
+
+       vols := KeepVM.Volumes()
+       vols[0].Put(TEST_HASH, BAD_BLOCK)
 
        // Check that GetBlock returns failure.
        result, err := GetBlock(TEST_HASH)
        if err != CorruptError {
-               t.Errorf("Expected CorruptError, got %v", result)
+               t.Errorf("Expected CorruptError, got %v (buf: %v)", err, result)
        }
 }
 
@@ -109,20 +115,21 @@ func TestPutBlockOK(t *testing.T) {
        defer teardown()
 
        // Create two test Keep volumes.
-       KeepVolumes = setup(t, 2)
+       KeepVM = MakeTestVolumeManager(2)
+       defer func() { KeepVM.Quit() }()
 
        // Check that PutBlock stores the data as expected.
        if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
                t.Fatalf("PutBlock: %v", err)
        }
 
-       result, err := GetBlock(TEST_HASH)
+       vols := KeepVM.Volumes()
+       result, err := vols[0].Get(TEST_HASH)
        if err != nil {
-               t.Fatalf("GetBlock returned error: %v", err)
+               t.Fatalf("Volume #0 Get returned error: %v", err)
        }
        if string(result) != string(TEST_BLOCK) {
-               t.Error("PutBlock/GetBlock mismatch")
-               t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
+               t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
                        string(TEST_BLOCK), string(result))
        }
 }
@@ -135,8 +142,11 @@ func TestPutBlockOneVol(t *testing.T) {
        defer teardown()
 
        // Create two test Keep volumes, but cripple one of them.
-       KeepVolumes = setup(t, 2)
-       os.Chmod(KeepVolumes[0], 000)
+       KeepVM = MakeTestVolumeManager(2)
+       defer func() { KeepVM.Quit() }()
+
+       vols := KeepVM.Volumes()
+       vols[0].(*MockVolume).Bad = true
 
        // Check that PutBlock stores the data as expected.
        if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
@@ -162,7 +172,8 @@ func TestPutBlockMD5Fail(t *testing.T) {
        defer teardown()
 
        // Create two test Keep volumes.
-       KeepVolumes = setup(t, 2)
+       KeepVM = MakeTestVolumeManager(2)
+       defer func() { KeepVM.Quit() }()
 
        // Check that PutBlock returns the expected error when the hash does
        // not match the block.
@@ -185,10 +196,12 @@ func TestPutBlockCorrupt(t *testing.T) {
        defer teardown()
 
        // Create two test Keep volumes.
-       KeepVolumes = setup(t, 2)
+       KeepVM = MakeTestVolumeManager(2)
+       defer func() { KeepVM.Quit() }()
 
        // Store a corrupted block under TEST_HASH.
-       store(t, KeepVolumes[0], TEST_HASH, BAD_BLOCK)
+       vols := KeepVM.Volumes()
+       vols[0].Put(TEST_HASH, BAD_BLOCK)
        if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
                t.Errorf("PutBlock: %v", err)
        }
@@ -213,11 +226,15 @@ func TestPutBlockCollision(t *testing.T) {
        var b2 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9etO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\xdc\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef")
        var locator = "cee9a457e790cf20d4bdaa6d69f01e41"
 
-       // Prepare two test Keep volumes. Store one block,
-       // then attempt to store the other.
-       KeepVolumes = setup(t, 2)
-       store(t, KeepVolumes[1], locator, b1)
+       // Prepare two test Keep volumes.
+       KeepVM = MakeTestVolumeManager(2)
+       defer func() { KeepVM.Quit() }()
 
+       // Store one block, then attempt to store the other. Confirm that
+       // PutBlock reported a CollisionError.
+       if err := PutBlock(b1, locator); err != nil {
+               t.Error(err)
+       }
        if err := PutBlock(b2, locator); err == nil {
                t.Error("PutBlock did not report a collision")
        } else if err != CollisionError {
@@ -234,10 +251,25 @@ func TestPutBlockCollision(t *testing.T) {
 //     directories at the top level.
 //
 func TestFindKeepVolumes(t *testing.T) {
-       defer teardown()
+       var tempVols [2]string
+       var err error
+
+       defer func() {
+               for _, path := range tempVols {
+                       os.RemoveAll(path)
+               }
+       }()
 
-       // Initialize two keep volumes.
-       var tempVols []string = setup(t, 2)
+       // Create two directories suitable for using as keep volumes.
+       for i := range tempVols {
+               if tempVols[i], err = ioutil.TempDir("", "findvol"); err != nil {
+                       t.Fatal(err)
+               }
+               tempVols[i] = tempVols[i] + "/keep"
+               if err = os.Mkdir(tempVols[i], 0755); err != nil {
+                       t.Fatal(err)
+               }
+       }
 
        // Set up a bogus PROC_MOUNTS file.
        if f, err := ioutil.TempFile("", "keeptest"); err == nil {
@@ -298,14 +330,17 @@ func TestIndex(t *testing.T) {
        // Set up Keep volumes and populate them.
        // Include multiple blocks on different volumes, and
        // some metadata files.
-       KeepVolumes = setup(t, 2)
-       store(t, KeepVolumes[0], TEST_HASH, TEST_BLOCK)
-       store(t, KeepVolumes[1], TEST_HASH_2, TEST_BLOCK_2)
-       store(t, KeepVolumes[0], TEST_HASH_3, TEST_BLOCK_3)
-       store(t, KeepVolumes[0], TEST_HASH+".meta", []byte("metadata"))
-       store(t, KeepVolumes[1], TEST_HASH_2+".meta", []byte("metadata"))
-
-       index := IndexLocators("")
+       KeepVM = MakeTestVolumeManager(2)
+       defer func() { KeepVM.Quit() }()
+
+       vols := KeepVM.Volumes()
+       vols[0].Put(TEST_HASH, TEST_BLOCK)
+       vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
+       vols[0].Put(TEST_HASH_3, TEST_BLOCK_3)
+       vols[0].Put(TEST_HASH+".meta", []byte("metadata"))
+       vols[1].Put(TEST_HASH_2+".meta", []byte("metadata"))
+
+       index := vols[0].Index("") + vols[1].Index("")
        expected := `^` + TEST_HASH + `\+\d+ \d+\n` +
                TEST_HASH_3 + `\+\d+ \d+\n` +
                TEST_HASH_2 + `\+\d+ \d+\n$`
@@ -329,16 +364,21 @@ func TestIndex(t *testing.T) {
 func TestNodeStatus(t *testing.T) {
        defer teardown()
 
-       // Set up test Keep volumes.
-       KeepVolumes = setup(t, 2)
+       // Set up test Keep volumes with some blocks.
+       KeepVM = MakeTestVolumeManager(2)
+       defer func() { KeepVM.Quit() }()
+
+       vols := KeepVM.Volumes()
+       vols[0].Put(TEST_HASH, TEST_BLOCK)
+       vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
 
        // Get node status and make a basic sanity check.
        st := GetNodeStatus()
-       for i, vol := range KeepVolumes {
+       for i := range vols {
                volinfo := st.Volumes[i]
                mtp := volinfo.MountPoint
-               if mtp != vol {
-                       t.Errorf("GetNodeStatus mount_point %s != KeepVolume %s", mtp, vol)
+               if mtp != "/bogo" {
+                       t.Errorf("GetNodeStatus mount_point %s, expected /bogo", mtp)
                }
                if volinfo.DeviceNum == 0 {
                        t.Errorf("uninitialized device_num in %v", volinfo)
@@ -356,47 +396,21 @@ func TestNodeStatus(t *testing.T) {
 // Helper functions for unit tests.
 // ========================================
 
-// setup
-//     Create KeepVolumes for testing.
-//     Returns a slice of pathnames to temporary Keep volumes.
+// MakeTestVolumeManager
+//     Creates and returns a RRVolumeManager with the specified number
+//     of MockVolumes.
 //
-func setup(t *testing.T, num_volumes int) []string {
-       vols := make([]string, num_volumes)
+func MakeTestVolumeManager(num_volumes int) VolumeManager {
+       vols := make([]Volume, num_volumes)
        for i := range vols {
-               if dir, err := ioutil.TempDir(os.TempDir(), "keeptest"); err == nil {
-                       vols[i] = dir + "/keep"
-                       os.Mkdir(vols[i], 0755)
-               } else {
-                       t.Fatal(err)
-               }
+               vols[i] = CreateMockVolume()
        }
-       return vols
+       return MakeRRVolumeManager(vols)
 }
 
 // teardown
 //     Cleanup to perform after each test.
 //
 func teardown() {
-       for _, vol := range KeepVolumes {
-               os.RemoveAll(path.Dir(vol))
-       }
-       KeepVolumes = nil
-}
-
-// store
-//     Low-level code to write Keep blocks directly to disk for testing.
-//
-func store(t *testing.T, keepdir string, filename string, block []byte) {
-       blockdir := fmt.Sprintf("%s/%s", keepdir, filename[:3])
-       if err := os.MkdirAll(blockdir, 0755); err != nil {
-               t.Fatal(err)
-       }
-
-       blockpath := fmt.Sprintf("%s/%s", blockdir, filename)
-       if f, err := os.Create(blockpath); err == nil {
-               f.Write(block)
-               f.Close()
-       } else {
-               t.Fatal(err)
-       }
+       KeepVM = nil
 }
diff --git a/services/keep/src/keep/volume.go b/services/keep/src/keep/volume.go
new file mode 100644 (file)
index 0000000..fffc815
--- /dev/null
@@ -0,0 +1,129 @@
+// A Volume is an interface representing a Keep back-end storage unit:
+// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
+// etc.
+
+package main
+
+import (
+       "errors"
+       "fmt"
+       "strings"
+)
+
+type Volume interface {
+       Get(loc string) ([]byte, error)
+       Put(loc string, block []byte) error
+       Index(prefix string) string
+       Status() *VolumeStatus
+       String() string
+}
+
+// MockVolumes are Volumes used to test the Keep front end.
+//
+// If the Bad field is true, this volume should return an error
+// on all Gets and Puts.
+//
+type MockVolume struct {
+       Store map[string][]byte
+       Bad   bool
+}
+
+func CreateMockVolume() *MockVolume {
+       return &MockVolume{make(map[string][]byte), false}
+}
+
+func (v *MockVolume) Get(loc string) ([]byte, error) {
+       if v.Bad {
+               return nil, errors.New("Bad volume")
+       } else if block, ok := v.Store[loc]; ok {
+               return block, nil
+       }
+       return nil, errors.New("not found")
+}
+
+func (v *MockVolume) Put(loc string, block []byte) error {
+       if v.Bad {
+               return errors.New("Bad volume")
+       }
+       v.Store[loc] = block
+       return nil
+}
+
+func (v *MockVolume) Index(prefix string) string {
+       var result string
+       for loc, block := range v.Store {
+               if IsValidLocator(loc) && strings.HasPrefix(loc, prefix) {
+                       result = result + fmt.Sprintf("%s+%d %d\n",
+                               loc, len(block), 123456789)
+               }
+       }
+       return result
+}
+
+func (v *MockVolume) Status() *VolumeStatus {
+       var used uint64
+       for _, block := range v.Store {
+               used = used + uint64(len(block))
+       }
+       return &VolumeStatus{"/bogo", 123, 1000000 - used, used}
+}
+
+func (v *MockVolume) String() string {
+       return "[MockVolume]"
+}
+
+// A VolumeManager manages a collection of volumes.
+//
+// - Volumes is a slice of available Volumes.
+// - Choose() returns a Volume suitable for writing to.
+// - Quit() instructs the VolumeManager to shut down gracefully.
+//
+type VolumeManager interface {
+       Volumes() []Volume
+       Choose() Volume
+       Quit()
+}
+
+type RRVolumeManager struct {
+       volumes   []Volume
+       nextwrite chan Volume
+       quit      chan int
+}
+
+func MakeRRVolumeManager(vols []Volume) *RRVolumeManager {
+       // Create a new VolumeManager struct with the specified volumes,
+       // and with new Nextwrite and Quit channels.
+       // The Quit channel is buffered with a capacity of 1 so that
+       // another routine may write to it without blocking.
+       vm := &RRVolumeManager{vols, make(chan Volume), make(chan int, 1)}
+
+       // This goroutine implements round-robin volume selection.
+       // It sends each available Volume in turn to the Nextwrite
+       // channel, until receiving a notification on the Quit channel
+       // that it should terminate.
+       go func() {
+               var i int = 0
+               for {
+                       select {
+                       case <-vm.quit:
+                               return
+                       case vm.nextwrite <- vm.volumes[i]:
+                               i = (i + 1) % len(vm.volumes)
+                       }
+               }
+       }()
+
+       return vm
+}
+
+func (vm *RRVolumeManager) Volumes() []Volume {
+       return vm.volumes
+}
+
+func (vm *RRVolumeManager) Choose() Volume {
+       return <-vm.nextwrite
+}
+
+func (vm *RRVolumeManager) Quit() {
+       vm.quit <- 1
+}
diff --git a/services/keep/src/keep/volume_unix.go b/services/keep/src/keep/volume_unix.go
new file mode 100644 (file)
index 0000000..88410de
--- /dev/null
@@ -0,0 +1,295 @@
+// A UnixVolume is a Volume backed by a locally mounted disk.
+//
+package main
+
+import (
+       "fmt"
+       "io/ioutil"
+       "log"
+       "os"
+       "path/filepath"
+       "strconv"
+       "strings"
+       "syscall"
+       "time"
+)
+
+// IORequests are encapsulated Get or Put requests.  They are used to
+// implement serialized I/O (i.e. only one read/write operation per
+// volume). When running in serialized mode, the Keep front end sends
+// IORequests on a channel to an IORunner, which handles them one at a
+// time and returns an IOResponse.
+//
+type IOMethod int
+
+const (
+       KeepGet IOMethod = iota
+       KeepPut
+)
+
+type IORequest struct {
+       method IOMethod
+       loc    string
+       data   []byte
+       reply  chan *IOResponse
+}
+
+type IOResponse struct {
+       data []byte
+       err  error
+}
+
+// A UnixVolume has the following properties:
+//
+//   root
+//       the path to the volume's root directory
+//   queue
+//       A channel of IORequests. If non-nil, all I/O requests for
+//       this volume should be queued on this channel; the result
+//       will be delivered on the IOResponse channel supplied in the
+//       request.
+//
+type UnixVolume struct {
+       root  string // path to this volume
+       queue chan *IORequest
+}
+
+func (v *UnixVolume) IOHandler() {
+       for req := range v.queue {
+               var result IOResponse
+               switch req.method {
+               case KeepGet:
+                       result.data, result.err = v.Read(req.loc)
+               case KeepPut:
+                       result.err = v.Write(req.loc, req.data)
+               }
+               req.reply <- &result
+       }
+}
+
+func MakeUnixVolume(root string, serialize bool) (v UnixVolume) {
+       if serialize {
+               v = UnixVolume{root, make(chan *IORequest)}
+               go v.IOHandler()
+       } else {
+               v = UnixVolume{root, nil}
+       }
+       return
+}
+
+func (v *UnixVolume) Get(loc string) ([]byte, error) {
+       if v.queue == nil {
+               return v.Read(loc)
+       }
+       reply := make(chan *IOResponse)
+       v.queue <- &IORequest{KeepGet, loc, nil, reply}
+       response := <-reply
+       return response.data, response.err
+}
+
+func (v *UnixVolume) Put(loc string, block []byte) error {
+       if v.queue == nil {
+               return v.Write(loc, block)
+       }
+       reply := make(chan *IOResponse)
+       v.queue <- &IORequest{KeepPut, loc, block, reply}
+       response := <-reply
+       return response.err
+}
+
+// Read retrieves a block identified by the locator string "loc", and
+// returns its contents as a byte slice.
+//
+// If the block could not be opened or read, Read returns a nil slice
+// and the os.Error that was generated.
+//
+// If the block is present but its content hash does not match loc,
+// Read returns the block and a CorruptError.  It is the caller's
+// responsibility to decide what (if anything) to do with the
+// corrupted data block.
+//
+func (v *UnixVolume) Read(loc string) ([]byte, error) {
+       var f *os.File
+       var err error
+       var buf []byte
+
+       blockFilename := filepath.Join(v.root, loc[0:3], loc)
+
+       f, err = os.Open(blockFilename)
+       if err != nil {
+               return nil, err
+       }
+
+       if buf, err = ioutil.ReadAll(f); err != nil {
+               log.Printf("%s: reading %s: %s\n", v, blockFilename, err)
+               return buf, err
+       }
+
+       // Success!
+       return buf, nil
+}
+
+// Write stores a block of data identified by the locator string
+// "loc".  It returns nil on success.  If the volume is full, it
+// returns a FullError.  If the write fails due to some other error,
+// that error is returned.
+//
+func (v *UnixVolume) Write(loc string, block []byte) error {
+       if v.IsFull() {
+               return FullError
+       }
+       blockDir := filepath.Join(v.root, loc[0:3])
+       if err := os.MkdirAll(blockDir, 0755); err != nil {
+               log.Printf("%s: could not create directory %s: %s",
+                       loc, blockDir, err)
+               return err
+       }
+
+       tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+loc)
+       if tmperr != nil {
+               log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, loc, tmperr)
+               return tmperr
+       }
+       blockFilename := filepath.Join(blockDir, loc)
+
+       if _, err := tmpfile.Write(block); err != nil {
+               log.Printf("%s: writing to %s: %s\n", v, blockFilename, err)
+               return err
+       }
+       if err := tmpfile.Close(); err != nil {
+               log.Printf("closing %s: %s\n", tmpfile.Name(), err)
+               os.Remove(tmpfile.Name())
+               return err
+       }
+       if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
+               log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
+               os.Remove(tmpfile.Name())
+               return err
+       }
+       return nil
+}
+
+// Status returns a VolumeStatus struct describing the volume's
+// current state.
+//
+func (v *UnixVolume) Status() *VolumeStatus {
+       var fs syscall.Statfs_t
+       var devnum uint64
+
+       if fi, err := os.Stat(v.root); err == nil {
+               devnum = fi.Sys().(*syscall.Stat_t).Dev
+       } else {
+               log.Printf("%s: os.Stat: %s\n", v, err)
+               return nil
+       }
+
+       err := syscall.Statfs(v.root, &fs)
+       if err != nil {
+               log.Printf("%s: statfs: %s\n", v, err)
+               return nil
+       }
+       // These calculations match the way df calculates disk usage:
+       // "free" space is measured by fs.Bavail, but "used" space
+       // uses fs.Blocks - fs.Bfree.
+       free := fs.Bavail * uint64(fs.Bsize)
+       used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
+       return &VolumeStatus{v.root, devnum, free, used}
+}
+
+// Index returns a list of blocks found on this volume which begin with
+// the specified prefix. If the prefix is an empty string, Index returns
+// a complete list of blocks.
+//
+// The return value is a multiline string (separated by
+// newlines). Each line is in the format
+//
+//     locator+size modification-time
+//
+// e.g.:
+//
+//     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
+//     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
+//     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
+//
+func (v *UnixVolume) Index(prefix string) (output string) {
+       filepath.Walk(v.root,
+               func(path string, info os.FileInfo, err error) error {
+                       // This WalkFunc inspects each path in the volume
+                       // and prints an index line for all files that begin
+                       // with prefix.
+                       if err != nil {
+                               log.Printf("IndexHandler: %s: walking to %s: %s",
+                                       v, path, err)
+                               return nil
+                       }
+                       locator := filepath.Base(path)
+                       // Skip directories that do not match prefix.
+                       // We know there is nothing interesting inside.
+                       if info.IsDir() &&
+                               !strings.HasPrefix(locator, prefix) &&
+                               !strings.HasPrefix(prefix, locator) {
+                               return filepath.SkipDir
+                       }
+                       // Skip any file that is not apparently a locator, e.g. .meta files
+                       if !IsValidLocator(locator) {
+                               return nil
+                       }
+                       // Print filenames beginning with prefix
+                       if !info.IsDir() && strings.HasPrefix(locator, prefix) {
+                               output = output + fmt.Sprintf(
+                                       "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
+                       }
+                       return nil
+               })
+
+       return
+}
+
+// IsFull returns true if the free space on the volume is less than
+// MIN_FREE_KILOBYTES.
+//
+func (v *UnixVolume) IsFull() (isFull bool) {
+       fullSymlink := v.root + "/full"
+
+       // Check if the volume has been marked as full in the last hour.
+       if link, err := os.Readlink(fullSymlink); err == nil {
+               if ts, err := strconv.Atoi(link); err == nil {
+                       fulltime := time.Unix(int64(ts), 0)
+                       if time.Since(fulltime).Hours() < 1.0 {
+                               return true
+                       }
+               }
+       }
+
+       if avail, err := v.FreeDiskSpace(); err == nil {
+               isFull = avail < MIN_FREE_KILOBYTES
+       } else {
+               log.Printf("%s: FreeDiskSpace: %s\n", v, err)
+               isFull = false
+       }
+
+       // If the volume is full, timestamp it.
+       if isFull {
+               now := fmt.Sprintf("%d", time.Now().Unix())
+               os.Symlink(now, fullSymlink)
+       }
+       return
+}
+
+// FreeDiskSpace returns the number of unused 1k blocks available on
+// the volume.
+//
+func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
+       var fs syscall.Statfs_t
+       err = syscall.Statfs(v.root, &fs)
+       if err == nil {
+               // Statfs output is not guaranteed to measure free
+               // space in terms of 1K blocks.
+               free = fs.Bavail * uint64(fs.Bsize) / 1024
+       }
+       return
+}
+
+func (v *UnixVolume) String() string {
+       return fmt.Sprintf("[UnixVolume %s]", v.root)
+}
diff --git a/services/keep/src/keep/volume_unix_test.go b/services/keep/src/keep/volume_unix_test.go
new file mode 100644 (file)
index 0000000..278e656
--- /dev/null
@@ -0,0 +1,245 @@
+package main
+
+import (
+       "bytes"
+       "fmt"
+       "io/ioutil"
+       "os"
+       "testing"
+       "time"
+)
+
+func TempUnixVolume(t *testing.T, serialize bool) UnixVolume {
+       d, err := ioutil.TempDir("", "volume_test")
+       if err != nil {
+               t.Fatal(err)
+       }
+       return MakeUnixVolume(d, serialize)
+}
+
+func _teardown(v UnixVolume) {
+       if v.queue != nil {
+               close(v.queue)
+       }
+       os.RemoveAll(v.root)
+}
+
+// store writes a Keep block directly into a UnixVolume, for testing
+// UnixVolume methods.
+//
+func _store(t *testing.T, vol UnixVolume, filename string, block []byte) {
+       blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3])
+       if err := os.MkdirAll(blockdir, 0755); err != nil {
+               t.Fatal(err)
+       }
+
+       blockpath := fmt.Sprintf("%s/%s", blockdir, filename)
+       if f, err := os.Create(blockpath); err == nil {
+               f.Write(block)
+               f.Close()
+       } else {
+               t.Fatal(err)
+       }
+}
+
+func TestGet(t *testing.T) {
+       v := TempUnixVolume(t, false)
+       defer _teardown(v)
+       _store(t, v, TEST_HASH, TEST_BLOCK)
+
+       buf, err := v.Get(TEST_HASH)
+       if err != nil {
+               t.Error(err)
+       }
+       if bytes.Compare(buf, TEST_BLOCK) != 0 {
+               t.Errorf("expected %s, got %s", string(TEST_BLOCK), string(buf))
+       }
+}
+
+func TestGetNotFound(t *testing.T) {
+       v := TempUnixVolume(t, false)
+       defer _teardown(v)
+       _store(t, v, TEST_HASH, TEST_BLOCK)
+
+       buf, err := v.Get(TEST_HASH_2)
+       switch {
+       case os.IsNotExist(err):
+               break
+       case err == nil:
+               t.Errorf("Read should have failed, returned %s", string(buf))
+       default:
+               t.Errorf("Read expected ErrNotExist, got: %s", err)
+       }
+}
+
+func TestPut(t *testing.T) {
+       v := TempUnixVolume(t, false)
+       defer _teardown(v)
+
+       err := v.Put(TEST_HASH, TEST_BLOCK)
+       if err != nil {
+               t.Error(err)
+       }
+       p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
+       if buf, err := ioutil.ReadFile(p); err != nil {
+               t.Error(err)
+       } else if bytes.Compare(buf, TEST_BLOCK) != 0 {
+               t.Errorf("Write should have stored %s, did store %s",
+                       string(TEST_BLOCK), string(buf))
+       }
+}
+
+func TestPutBadVolume(t *testing.T) {
+       v := TempUnixVolume(t, false)
+       defer _teardown(v)
+
+       os.Chmod(v.root, 000)
+       err := v.Put(TEST_HASH, TEST_BLOCK)
+       if err == nil {
+               t.Error("Write should have failed")
+       }
+}
+
+// Serialization tests: launch a bunch of concurrent requests.
+//
+// TODO(twp): show that the underlying Read/Write operations executed
+// serially and not concurrently. The easiest way to do this is
+// probably to activate verbose or debug logging, capture log output
+// and examine it to confirm that Reads and Writes did not overlap.
+//
+// TODO(twp): a proper test of I/O serialization requires that a
+// second request start while the first one is still underway.
+// Guaranteeing that the test behaves this way requires some tricky
+// synchronization and mocking.  For now we'll just launch a bunch of
+// requests simultaneously in goroutines and demonstrate that they
+// return accurate results.
+//
+func TestGetSerialized(t *testing.T) {
+       // Create a volume with I/O serialization enabled.
+       v := TempUnixVolume(t, true)
+       defer _teardown(v)
+
+       _store(t, v, TEST_HASH, TEST_BLOCK)
+       _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
+       _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
+
+       sem := make(chan int)
+       go func(sem chan int) {
+               buf, err := v.Get(TEST_HASH)
+               if err != nil {
+                       t.Errorf("err1: %v", err)
+               }
+               if bytes.Compare(buf, TEST_BLOCK) != 0 {
+                       t.Errorf("buf should be %s, is %s", string(TEST_BLOCK), string(buf))
+               }
+               sem <- 1
+       }(sem)
+
+       go func(sem chan int) {
+               buf, err := v.Get(TEST_HASH_2)
+               if err != nil {
+                       t.Errorf("err2: %v", err)
+               }
+               if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
+                       t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_2), string(buf))
+               }
+               sem <- 1
+       }(sem)
+
+       go func(sem chan int) {
+               buf, err := v.Get(TEST_HASH_3)
+               if err != nil {
+                       t.Errorf("err3: %v", err)
+               }
+               if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
+                       t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_3), string(buf))
+               }
+               sem <- 1
+       }(sem)
+
+       // Wait for all goroutines to finish
+       for done := 0; done < 3; {
+               done += <-sem
+       }
+}
+
+func TestPutSerialized(t *testing.T) {
+       // Create a volume with I/O serialization enabled.
+       v := TempUnixVolume(t, true)
+       defer _teardown(v)
+
+       sem := make(chan int)
+       go func(sem chan int) {
+               err := v.Put(TEST_HASH, TEST_BLOCK)
+               if err != nil {
+                       t.Errorf("err1: %v", err)
+               }
+               sem <- 1
+       }(sem)
+
+       go func(sem chan int) {
+               err := v.Put(TEST_HASH_2, TEST_BLOCK_2)
+               if err != nil {
+                       t.Errorf("err2: %v", err)
+               }
+               sem <- 1
+       }(sem)
+
+       go func(sem chan int) {
+               err := v.Put(TEST_HASH_3, TEST_BLOCK_3)
+               if err != nil {
+                       t.Errorf("err3: %v", err)
+               }
+               sem <- 1
+       }(sem)
+
+       // Wait for all goroutines to finish
+	for done := 0; done < 3; {
+               done += <-sem
+       }
+
+       // Double check that we actually wrote the blocks we expected to write.
+       buf, err := v.Get(TEST_HASH)
+       if err != nil {
+               t.Errorf("Get #1: %v", err)
+       }
+       if bytes.Compare(buf, TEST_BLOCK) != 0 {
+               t.Errorf("Get #1: expected %s, got %s", string(TEST_BLOCK), string(buf))
+       }
+
+       buf, err = v.Get(TEST_HASH_2)
+       if err != nil {
+               t.Errorf("Get #2: %v", err)
+       }
+       if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
+               t.Errorf("Get #2: expected %s, got %s", string(TEST_BLOCK_2), string(buf))
+       }
+
+       buf, err = v.Get(TEST_HASH_3)
+       if err != nil {
+               t.Errorf("Get #3: %v", err)
+       }
+       if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
+               t.Errorf("Get #3: expected %s, got %s", string(TEST_BLOCK_3), string(buf))
+       }
+}
+
+func TestIsFull(t *testing.T) {
+       v := TempUnixVolume(t, false)
+       defer _teardown(v)
+
+       full_path := v.root + "/full"
+       now := fmt.Sprintf("%d", time.Now().Unix())
+       os.Symlink(now, full_path)
+       if !v.IsFull() {
+               t.Errorf("%s: claims not to be full", v)
+       }
+       os.Remove(full_path)
+
+       // Test with an expired /full link.
+       expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
+       os.Symlink(expired, full_path)
+       if v.IsFull() {
+               t.Errorf("%s: should no longer be full", v)
+       }
+}