From: Tom Clegg Date: Wed, 14 May 2014 22:02:51 +0000 (-0400) Subject: 2762: Merge branch 'master' into 2762-owner-uuid-integrity X-Git-Tag: 1.1.0~2608^2~9^2 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/3dccfa028282d8b667a7b447ea061b7eecc8618f?hp=fb8dd22df54d0c8f87e97f41e1a1677741a47892 2762: Merge branch 'master' into 2762-owner-uuid-integrity --- diff --git a/apps/workbench/.gitignore b/apps/workbench/.gitignore index 8502e958ba..24a7a84a31 100644 --- a/apps/workbench/.gitignore +++ b/apps/workbench/.gitignore @@ -31,3 +31,7 @@ # SimpleCov reports /coverage + +# Dev/test SSL certificates +/self-signed.key +/self-signed.pem diff --git a/apps/workbench/app/assets/stylesheets/keep_disks.css.scss b/apps/workbench/app/assets/stylesheets/keep_disks.css.scss index 1fe5cd4f3b..e7a1b12c96 100644 --- a/apps/workbench/app/assets/stylesheets/keep_disks.css.scss +++ b/apps/workbench/app/assets/stylesheets/keep_disks.css.scss @@ -6,6 +6,6 @@ div.graph { margin-top: 20px; } -div.graph h3,h4 { +div.graph h3, div.graph h4 { text-align: center; } diff --git a/apps/workbench/app/controllers/api_client_authorizations_controller.rb b/apps/workbench/app/controllers/api_client_authorizations_controller.rb index 8385b6b2d0..85f52f20ab 100644 --- a/apps/workbench/app/controllers/api_client_authorizations_controller.rb +++ b/apps/workbench/app/controllers/api_client_authorizations_controller.rb @@ -1,17 +1,4 @@ class ApiClientAuthorizationsController < ApplicationController - def index - m = model_class.all - items_available = m.items_available - offset = m.result_offset - limit = m.result_limit - filtered = m.to_ary.reject do |x| - x.api_client_id == 0 or (x.expires_at and x.expires_at < Time.now) rescue false - end - ArvadosApiClient::patch_paging_vars(filtered, items_available, offset, limit, nil) - @objects = ArvadosResourceList.new(ApiClientAuthorization) - @objects.results= filtered - super - end def index_pane_list %w(Recent Help) diff --git 
a/apps/workbench/app/controllers/application_controller.rb b/apps/workbench/app/controllers/application_controller.rb index a3576bc83e..2fbd8c560f 100644 --- a/apps/workbench/app/controllers/application_controller.rb +++ b/apps/workbench/app/controllers/application_controller.rb @@ -65,28 +65,27 @@ class ApplicationController < ActionController::Base end def index + @limit ||= 200 if params[:limit] - limit = params[:limit].to_i - else - limit = 200 + @limit = params[:limit].to_i end + @offset ||= 0 if params[:offset] - offset = params[:offset].to_i - else - offset = 0 + @offset = params[:offset].to_i end + @filters ||= [] if params[:filters] filters = params[:filters] if filters.is_a? String filters = Oj.load filters end - else - filters = [] + @filters += filters end - @objects ||= model_class.filter(filters).limit(limit).offset(offset).all + @objects ||= model_class + @objects = @objects.filter(@filters).limit(@limit).offset(@offset).all respond_to do |f| f.json { render json: @objects } f.html { render } diff --git a/apps/workbench/app/controllers/jobs_controller.rb b/apps/workbench/app/controllers/jobs_controller.rb index 4705bb5204..4746635c72 100644 --- a/apps/workbench/app/controllers/jobs_controller.rb +++ b/apps/workbench/app/controllers/jobs_controller.rb @@ -23,10 +23,11 @@ class JobsController < ApplicationController def index @svg = "" if params[:uuid] - @jobs = Job.where(uuid: params[:uuid]) - generate_provenance(@jobs) + @objects = Job.where(uuid: params[:uuid]) + generate_provenance(@objects) else - @jobs = Job.all + @limit = 20 + super end end diff --git a/apps/workbench/app/controllers/pipeline_instances_controller.rb b/apps/workbench/app/controllers/pipeline_instances_controller.rb index 221ed87ad7..d54cd4961e 100644 --- a/apps/workbench/app/controllers/pipeline_instances_controller.rb +++ b/apps/workbench/app/controllers/pipeline_instances_controller.rb @@ -150,7 +150,7 @@ class PipelineInstancesController < ApplicationController end def index - 
@objects ||= model_class.limit(20).all + @limit = 20 super end diff --git a/apps/workbench/app/models/arvados_api_client.rb b/apps/workbench/app/models/arvados_api_client.rb index c7f7d3435e..c6d8720c92 100644 --- a/apps/workbench/app/models/arvados_api_client.rb +++ b/apps/workbench/app/models/arvados_api_client.rb @@ -57,7 +57,7 @@ class ArvadosApiClient header = {"Accept" => "application/json"} - profile_checkpoint { "Prepare request #{url} #{query[:uuid]} #{query[:where]}" } + profile_checkpoint { "Prepare request #{url} #{query[:uuid]} #{query[:where]} #{query[:filters]}" } msg = @@api_client.post(url, query, header: header) diff --git a/apps/workbench/app/models/job.rb b/apps/workbench/app/models/job.rb index f88834e0c3..eb81f40230 100644 --- a/apps/workbench/app/models/job.rb +++ b/apps/workbench/app/models/job.rb @@ -2,4 +2,8 @@ class Job < ArvadosBase def attribute_editable?(attr) false end + + def self.creatable? + false + end end diff --git a/apps/workbench/app/models/pipeline_instance.rb b/apps/workbench/app/models/pipeline_instance.rb index d435ad40ea..45e472fae9 100644 --- a/apps/workbench/app/models/pipeline_instance.rb +++ b/apps/workbench/app/models/pipeline_instance.rb @@ -18,7 +18,8 @@ class PipelineInstance < ArvadosBase end def attribute_editable?(attr) - attr && (attr.to_sym == :name || (attr.to_sym == :components and self.active == nil)) + attr && (attr.to_sym == :name || + (attr.to_sym == :components and (self.state == 'New' || self.state == 'Ready'))) end def attributes_for_display diff --git a/apps/workbench/app/views/application/_pipeline_status_label.html.erb b/apps/workbench/app/views/application/_pipeline_status_label.html.erb index 020ce81c57..f68d547aa5 100644 --- a/apps/workbench/app/views/application/_pipeline_status_label.html.erb +++ b/apps/workbench/app/views/application/_pipeline_status_label.html.erb @@ -1,8 +1,8 @@ -<% if p.success %> +<% if p.state == 'Complete' %> finished -<% elsif p.success == false %> +<% elsif p.state 
== 'Failed' %> failed -<% elsif p.active %> +<% elsif p.state == 'RunningOnServer' || p.state == 'RunningOnClient' %> running <% else %> <% if (p.components.select do |k,v| v[:job] end).length == 0 %> diff --git a/apps/workbench/app/views/jobs/_show_recent.html.erb b/apps/workbench/app/views/jobs/_show_recent.html.erb index 304a3b5c1f..b19b7d93ed 100644 --- a/apps/workbench/app/views/jobs/_show_recent.html.erb +++ b/apps/workbench/app/views/jobs/_show_recent.html.erb @@ -7,6 +7,8 @@ } <% end %> +<%= render partial: "paging", locals: {results: objects, object: @object} %> + @@ -28,7 +30,7 @@ - <% @jobs.sort_by { |j| j[:created_at] }.reverse.each do |j| %> + <% @objects.sort_by { |j| j[:created_at] }.reverse.each do |j| %> - + + @@ -31,6 +32,7 @@ Owner @@ -51,12 +53,14 @@ <%= link_to_if_arvados_object ob.owner_uuid, friendly_name: true %> -
@@ -43,7 +45,7 @@ - <%= link_to_if_arvados_object j.uuid %> + <%= link_to_if_arvados_object j %> <%= j.script %> diff --git a/apps/workbench/app/views/layouts/application.html.erb b/apps/workbench/app/views/layouts/application.html.erb index 2b5ec88fc6..e2db3d9b7e 100644 --- a/apps/workbench/app/views/layouts/application.html.erb +++ b/apps/workbench/app/views/layouts/application.html.erb @@ -77,7 +77,7 @@
Age +
<%= distance_of_time_in_words(ob.created_at, Time.now) %> + + <%= render partial: 'delete_object_button', locals: {object:ob} %>
+ <% ob.components.each do |cname, c| %> <% if c[:job] %> <%= render partial: "job_status_label", locals: {:j => c[:job], :title => cname.to_s } %> diff --git a/sdk/cli/arvados-cli.gemspec b/sdk/cli/arvados-cli.gemspec index c43e3b8c1f..1b01642805 100644 --- a/sdk/cli/arvados-cli.gemspec +++ b/sdk/cli/arvados-cli.gemspec @@ -18,6 +18,7 @@ Gem::Specification.new do |s| s.executables << "arv-run-pipeline-instance" s.executables << "arv-crunch-job" s.executables << "arv-tag" + s.required_ruby_version = '>= 2.1.0' s.add_runtime_dependency 'arvados', '~> 0.1.0' s.add_runtime_dependency 'google-api-client', '~> 0.6.3' s.add_runtime_dependency 'activesupport', '~> 3.2', '>= 3.2.13' diff --git a/sdk/cli/bin/arv b/sdk/cli/bin/arv index f453675ea8..d047204508 100755 --- a/sdk/cli/bin/arv +++ b/sdk/cli/bin/arv @@ -301,14 +301,14 @@ if global_opts[:dry_run] exit end -request_parameters = {}.merge(method_opts) +request_parameters = {_profile:true}.merge(method_opts) resource_body = request_parameters.delete(resource_schema.to_sym) if resource_body request_body = { resource_schema => resource_body } else - request_body = {} + request_body = nil end case api_method @@ -335,12 +335,13 @@ when end exit 0 else - request_body[:api_token] = ENV['ARVADOS_API_TOKEN'] - request_body[:_profile] = true result = client.execute(:api_method => eval(api_method), :parameters => request_parameters, :body => request_body, - :authenticated => false) + :authenticated => false, + :headers => { + authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN'] + }) end begin diff --git a/sdk/cli/bin/arv-run-pipeline-instance b/sdk/cli/bin/arv-run-pipeline-instance index 5f925ef2a7..e552d77f3a 100755 --- a/sdk/cli/bin/arv-run-pipeline-instance +++ b/sdk/cli/bin/arv-run-pipeline-instance @@ -226,10 +226,10 @@ class PipelineInstance :parameters => { :uuid => uuid }, - :body => { - :api_token => ENV['ARVADOS_API_TOKEN'] - }, - :authenticated => false) + :authenticated => false, + :headers => { + authorization: 'OAuth2 
'+ENV['ARVADOS_API_TOKEN'] + }) j = JSON.parse result.body, :symbolize_names => true unless j.is_a? Hash and j[:uuid] debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0 @@ -242,10 +242,12 @@ class PipelineInstance def self.create(attributes) result = $client.execute(:api_method => $arvados.pipeline_instances.create, :body => { - :api_token => ENV['ARVADOS_API_TOKEN'], :pipeline_instance => attributes }, - :authenticated => false) + :authenticated => false, + :headers => { + authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN'] + }) j = JSON.parse result.body, :symbolize_names => true unless j.is_a? Hash and j[:uuid] abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}" @@ -259,10 +261,12 @@ class PipelineInstance :uuid => @pi[:uuid] }, :body => { - :api_token => ENV['ARVADOS_API_TOKEN'], :pipeline_instance => @attributes_to_update.to_json }, - :authenticated => false) + :authenticated => false, + :headers => { + authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN'] + }) j = JSON.parse result.body, :symbolize_names => true unless j.is_a? Hash and j[:uuid] debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0 @@ -291,20 +295,24 @@ class JobCache @cache ||= {} result = $client.execute(:api_method => $arvados.jobs.get, :parameters => { - :api_token => ENV['ARVADOS_API_TOKEN'], :uuid => uuid }, - :authenticated => false) + :authenticated => false, + :headers => { + authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN'] + }) @cache[uuid] = JSON.parse result.body, :symbolize_names => true end def self.where(conditions) result = $client.execute(:api_method => $arvados.jobs.list, :parameters => { - :api_token => ENV['ARVADOS_API_TOKEN'], :limit => 10000, :where => conditions.to_json }, - :authenticated => false) + :authenticated => false, + :headers => { + authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN'] + }) list = JSON.parse result.body, :symbolize_names => true if list and list[:items].is_a? 
Array list[:items] @@ -315,11 +323,13 @@ class JobCache def self.create(job, create_params) @cache ||= {} result = $client.execute(:api_method => $arvados.jobs.create, - :parameters => { - :api_token => ENV['ARVADOS_API_TOKEN'], + :body => { :job => job.to_json }.merge(create_params), - :authenticated => false) + :authenticated => false, + :headers => { + authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN'] + }) j = JSON.parse result.body, :symbolize_names => true if j.is_a? Hash and j[:uuid] @cache[j[:uuid]] = j @@ -348,10 +358,12 @@ class WhRunPipelineInstance else result = $client.execute(:api_method => $arvados.pipeline_templates.get, :parameters => { - :api_token => ENV['ARVADOS_API_TOKEN'], :uuid => template }, - :authenticated => false) + :authenticated => false, + :headers => { + authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN'] + }) @template = JSON.parse result.body, :symbolize_names => true if !@template[:uuid] abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}" @@ -541,7 +553,6 @@ class WhRunPipelineInstance end end @instance[:components] = @components - @instance[:active] = moretodo report_status if @options[:no_wait] @@ -555,7 +566,6 @@ class WhRunPipelineInstance debuglog "interrupt", 0 interrupted = true break - #abort end end end @@ -580,19 +590,13 @@ class WhRunPipelineInstance if interrupted if success - @instance[:active] = false - @instance[:success] = success - @instance[:state] = "Complete" + @instance[:state] = 'Complete' else - @instance[:active] = nil - @instance[:success] = nil @instance[:state] = 'Paused' end else if ended == @components.length or failed > 0 - @instance[:active] = false - @instance[:success] = success - @instance[:state] = success ? "Complete" : "Failed" + @instance[:state] = success ? 
'Complete' : 'Failed' end end @@ -604,8 +608,8 @@ class WhRunPipelineInstance end def cleanup - if @instance - @instance[:active] = false + if @instance and @instance[:state] == 'RunningOnClient' + @instance[:state] = 'Paused' @instance.save end end diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index 9995ec7344..f092558cd7 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -139,7 +139,7 @@ $SIG{'USR2'} = sub my $arv = Arvados->new('apiVersion' => 'v1'); -my $metastream; +my $local_logfile; my $User = $arv->{'users'}->{'current'}->execute; @@ -185,7 +185,7 @@ else $job_id = $Job->{'uuid'}; my $keep_logfile = $job_id . '.log.txt'; -my $local_logfile = File::Temp->new(); +$local_logfile = File::Temp->new(); $Job->{'runtime_constraints'} ||= {}; $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0; @@ -1251,15 +1251,15 @@ sub Log # ($jobstep_id, $logmessage) $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge; $message .= "\n"; my $datetime; - if ($metastream || -t STDERR) { + if ($local_logfile || -t STDERR) { my @gmtime = gmtime; $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]); } print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message); - if ($metastream) { - print $metastream $datetime . " " . $message; + if ($local_logfile) { + print $local_logfile $datetime . " " . $message; } } @@ -1272,7 +1272,7 @@ sub croak freeze() if @jobstep_todo; collate_output() if @jobstep_todo; cleanup(); - save_meta() if $metastream; + save_meta() if $local_logfile; die; } diff --git a/sdk/perl/lib/Arvados.pm b/sdk/perl/lib/Arvados.pm index 31258f5172..d5eca9035e 100644 --- a/sdk/perl/lib/Arvados.pm +++ b/sdk/perl/lib/Arvados.pm @@ -41,7 +41,7 @@ environment variable, or C Protocol scheme. Default: C environment variable, or C -=item apiToken +=item authToken Authorization token. 
Default: C environment variable diff --git a/sdk/perl/lib/Arvados/Request.pm b/sdk/perl/lib/Arvados/Request.pm index 0faed28d1a..07ca763d2b 100644 --- a/sdk/perl/lib/Arvados/Request.pm +++ b/sdk/perl/lib/Arvados/Request.pm @@ -32,11 +32,16 @@ sub process_request { my $self = shift; my %req; - $req{$self->{'method'}} = $self->{'uri'}; + my %content; + my $method = $self->{'method'}; + if ($method eq 'GET' || $method eq 'HEAD') { + $content{'_method'} = $method; + $method = 'POST'; + } + $req{$method} = $self->{'uri'}; $self->{'req'} = new HTTP::Request (%req); $self->{'req'}->header('Authorization' => ('OAuth2 ' . $self->{'authToken'})) if $self->{'authToken'}; $self->{'req'}->header('Accept' => 'application/json'); - my %content; my ($p, $v); while (($p, $v) = each %{$self->{'queryParams'}}) { $content{$p} = (ref($v) eq "") ? $v : JSON::encode_json($v); diff --git a/sdk/ruby/arvados.gemspec b/sdk/ruby/arvados.gemspec index 68c4970867..37e0d800c3 100644 --- a/sdk/ruby/arvados.gemspec +++ b/sdk/ruby/arvados.gemspec @@ -13,6 +13,7 @@ Gem::Specification.new do |s| s.email = 'gem-dev@curoverse.com' s.licenses = ['Apache License, Version 2.0'] s.files = ["lib/arvados.rb"] + s.required_ruby_version = '>= 2.1.0' s.add_dependency('google-api-client', '~> 0.6.3') s.add_dependency('activesupport', '>= 3.2.13') s.add_dependency('json', '>= 1.7.7') diff --git a/sdk/ruby/lib/arvados.rb b/sdk/ruby/lib/arvados.rb index 567423ff4f..429777e73f 100644 --- a/sdk/ruby/lib/arvados.rb +++ b/sdk/ruby/lib/arvados.rb @@ -210,8 +210,6 @@ class Arvados end def self.api_exec(method, parameters={}) api_method = arvados_api.send(api_models_sym).send(method.name.to_sym) - parameters = parameters. - merge(:api_token => arvados.config['ARVADOS_API_TOKEN']) parameters.each do |k,v| parameters[k] = v.to_json if v.is_a? Array or v.is_a? 
Hash end @@ -230,7 +228,10 @@ class Arvados execute(:api_method => api_method, :authenticated => false, :parameters => parameters, - :body => body) + :body => body, + :headers => { + authorization: 'OAuth2 '+arvados.config['ARVADOS_API_TOKEN'] + }) resp = JSON.parse result.body, :symbolize_names => true if resp[:errors] raise Arvados::TransactionFailedError.new(resp[:errors]) diff --git a/services/api/.gitignore b/services/api/.gitignore index c286717a7e..a1cb5ed033 100644 --- a/services/api/.gitignore +++ b/services/api/.gitignore @@ -24,3 +24,7 @@ # SimpleCov reports /coverage + +# Dev/test SSL certificates +/self-signed.key +/self-signed.pem diff --git a/services/api/Gemfile.lock b/services/api/Gemfile.lock index d574644644..4a4419f806 100644 --- a/services/api/Gemfile.lock +++ b/services/api/Gemfile.lock @@ -35,12 +35,12 @@ GEM addressable (2.3.6) andand (1.3.3) arel (3.0.3) - arvados (0.1.20140414145041) + arvados (0.1.20140513131358) activesupport (>= 3.2.13) andand google-api-client (~> 0.6.3) json (>= 1.7.7) - arvados-cli (0.1.20140414145041) + arvados-cli (0.1.20140513131358) activesupport (~> 3.2, >= 3.2.13) andand (~> 1.3, >= 1.3.3) arvados (~> 0.1.0) @@ -99,7 +99,7 @@ GEM railties (>= 3.0, < 5.0) thor (>= 0.14, < 2.0) json (1.8.1) - jwt (0.1.11) + jwt (0.1.13) multi_json (>= 1.5) launchy (2.4.2) addressable (~> 2.3) @@ -108,7 +108,7 @@ GEM mime-types (~> 1.16) treetop (~> 1.4.8) mime-types (1.25.1) - multi_json (1.9.2) + multi_json (1.10.0) multipart-post (1.2.0) net-scp (1.2.0) net-ssh (>= 2.6.5) @@ -123,7 +123,7 @@ GEM jwt (~> 0.1.4) multi_json (~> 1.0) rack (~> 1.2) - oj (2.7.3) + oj (2.9.0) omniauth (1.1.1) hashie (~> 1.2) rack diff --git a/services/api/app/models/pipeline_instance.rb b/services/api/app/models/pipeline_instance.rb index 9ce0c4dec1..7bb814c60d 100644 --- a/services/api/app/models/pipeline_instance.rb +++ b/services/api/app/models/pipeline_instance.rb @@ -166,6 +166,7 @@ class PipelineInstance < ArvadosModel return false end elsif 
'success'.in? changed_attributes + logger.info "pipeline_instance changed_attributes has success for #{self.uuid}" if self.success self.active = false self.state = Complete @@ -174,6 +175,7 @@ class PipelineInstance < ArvadosModel self.state = Failed end elsif 'active'.in? changed_attributes + logger.info "pipeline_instance changed_attributes has active for #{self.uuid}" if self.active if self.state.in? [New, Ready, Paused] self.state = RunningOnServer @@ -194,8 +196,8 @@ class PipelineInstance < ArvadosModel end if new_record? or 'components'.in? changed_attributes - state ||= New - if state == New and self.components_look_ready? + self.state ||= New + if self.state == New and self.components_look_ready? self.state = Ready end end diff --git a/services/api/lib/record_filters.rb b/services/api/lib/record_filters.rb index d3727d30f3..01d0ae4da5 100644 --- a/services/api/lib/record_filters.rb +++ b/services/api/lib/record_filters.rb @@ -54,6 +54,10 @@ module RecordFilters if operand.is_a? Array cond_out << "#{ar_table_name}.#{attr} #{operator} (?)" param_out << operand + if operator == 'not in' and not operand.include?(nil) + # explicitly allow NULL + cond_out[-1] = "(#{cond_out[-1]} OR #{ar_table_name}.#{attr} IS NULL)" + end else raise ArgumentError.new("Invalid operand type '#{operand.class}' "\ "for '#{operator}' operator in filters") diff --git a/services/api/script/crunch-dispatch.rb b/services/api/script/crunch-dispatch.rb index f15258d420..43a527afac 100755 --- a/services/api/script/crunch-dispatch.rb +++ b/services/api/script/crunch-dispatch.rb @@ -314,11 +314,21 @@ class Dispatcher j_done[:wait_thr].value jobrecord = Job.find_by_uuid(job_done.uuid) - jobrecord.running = false - jobrecord.finished_at ||= Time.now - # Don't set 'jobrecord.success = false' because if the job failed to run due to an - # issue with crunch-job or slurm, we want the job to stay in the queue. - jobrecord.save! 
+ if jobrecord.started_at + # Clean up state fields in case crunch-job exited without + # putting the job in a suitable "finished" state. + jobrecord.running = false + jobrecord.finished_at ||= Time.now + if jobrecord.success.nil? + jobrecord.success = false + end + jobrecord.save! + else + # Don't fail the job if crunch-job didn't even get as far as + # starting it. If the job failed to run due to an infrastructure + # issue with crunch-job or slurm, we want the job to stay in the + # queue. + end # Invalidate the per-job auth token j_done[:job_auth].update_attributes expires_at: Time.now diff --git a/services/api/test/functional/arvados/v1/filters_test.rb b/services/api/test/functional/arvados/v1/filters_test.rb new file mode 100644 index 0000000000..a5582e66b5 --- /dev/null +++ b/services/api/test/functional/arvados/v1/filters_test.rb @@ -0,0 +1,16 @@ +require 'test_helper' + +class Arvados::V1::FiltersTest < ActionController::TestCase + test '"not in" filter passes null values' do + @controller = Arvados::V1::GroupsController.new + authorize_with :admin + get :index, { + filters: [ ['group_class', 'not in', ['folder']] ], + controller: 'groups', + } + assert_response :success + found = assigns(:objects) + assert_includes(found.collect(&:group_class), nil, + "'group_class not in ['folder']' filter should pass null") + end +end diff --git a/services/datamanager/experimental/datamanager.py b/services/datamanager/experimental/datamanager.py new file mode 100755 index 0000000000..8207bdcd5c --- /dev/null +++ b/services/datamanager/experimental/datamanager.py @@ -0,0 +1,887 @@ +#! 
/usr/bin/env python + +import arvados + +import argparse +import cgi +import csv +import json +import logging +import math +import pprint +import re +import threading +import urllib2 + +from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler +from collections import defaultdict, Counter +from functools import partial +from operator import itemgetter +from SocketServer import ThreadingMixIn + +arv = arvados.api('v1') + +# Adapted from http://stackoverflow.com/questions/4180980/formatting-data-quantity-capacity-as-string +byteunits = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB') +def fileSizeFormat(value): + exponent = 0 if value == 0 else int(math.log(value, 1024)) + return "%7.2f %-3s" % (float(value) / pow(1024, exponent), + byteunits[exponent]) + +def percentageFloor(x): + """ Returns a float which is the input rounded down to the neared 0.01. + +e.g. precentageFloor(0.941354) = 0.94 +""" + return math.floor(x*100) / 100.0 + + +def byteSizeFromValidUuid(valid_uuid): + return int(valid_uuid.split('+')[1]) + +class maxdict(dict): + """A dictionary that holds the largest value entered for each key.""" + def addValue(self, key, value): + dict.__setitem__(self, key, max(dict.get(self, key), value)) + def addValues(self, kv_pairs): + for key,value in kv_pairs: + self.addValue(key, value) + def addDict(self, d): + self.addValues(d.items()) + +class CollectionInfo: + DEFAULT_PERSISTER_REPLICATION_LEVEL=2 + all_by_uuid = {} + + def __init__(self, uuid): + if CollectionInfo.all_by_uuid.has_key(uuid): + raise ValueError('Collection for uuid "%s" already exists.' 
% uuid) + self.uuid = uuid + self.block_uuids = set() # uuids of keep blocks in this collection + self.reader_uuids = set() # uuids of users who can read this collection + self.persister_uuids = set() # uuids of users who want this collection saved + # map from user uuid to replication level they desire + self.persister_replication = maxdict() + + # The whole api response in case we need anything else later. + self.api_response = [] + CollectionInfo.all_by_uuid[uuid] = self + + def byteSize(self): + return sum(map(byteSizeFromValidUuid, self.block_uuids)) + + def __str__(self): + return ('CollectionInfo uuid: %s\n' + ' %d block(s) containing %s\n' + ' reader_uuids: %s\n' + ' persister_replication: %s' % + (self.uuid, + len(self.block_uuids), + fileSizeFormat(self.byteSize()), + pprint.pformat(self.reader_uuids, indent = 15), + pprint.pformat(self.persister_replication, indent = 15))) + + @staticmethod + def get(uuid): + if not CollectionInfo.all_by_uuid.has_key(uuid): + CollectionInfo(uuid) + return CollectionInfo.all_by_uuid[uuid] + + +def extractUuid(candidate): + """ Returns a canonical (hash+size) uuid from a valid uuid, or None if candidate is not a valid uuid.""" + match = re.match('([0-9a-fA-F]{32}\+[0-9]+)(\+[^+]+)*$', candidate) + return match and match.group(1) + +def checkUserIsAdmin(): + current_user = arv.users().current().execute() + + if not current_user['is_admin']: + log.warning('Current user %s (%s - %s) does not have ' + 'admin access and will not see much of the data.', + current_user['full_name'], + current_user['email'], + current_user['uuid']) + if args.require_admin_user: + log.critical('Exiting, rerun with --no-require-admin-user ' + 'if you wish to continue.') + exit(1) + +def buildCollectionsList(): + if args.uuid: + return [args.uuid,] + else: + collections_list_response = arv.collections().list(limit=args.max_api_results).execute() + + print ('Returned %d of %d collections.' 
% + (len(collections_list_response['items']), + collections_list_response['items_available'])) + + return [item['uuid'] for item in collections_list_response['items']] + + +def readCollections(collection_uuids): + for collection_uuid in collection_uuids: + collection_block_uuids = set() + collection_response = arv.collections().get(uuid=collection_uuid).execute() + collection_info = CollectionInfo.get(collection_uuid) + collection_info.api_response = collection_response + manifest_lines = collection_response['manifest_text'].split('\n') + + if args.verbose: + print 'Manifest text for %s:' % collection_uuid + pprint.pprint(manifest_lines) + + for manifest_line in manifest_lines: + if manifest_line: + manifest_tokens = manifest_line.split(' ') + if args.verbose: + print 'manifest tokens: ' + pprint.pformat(manifest_tokens) + stream_name = manifest_tokens[0] + + line_block_uuids = set(filter(None, + [extractUuid(candidate) + for candidate in manifest_tokens[1:]])) + collection_info.block_uuids.update(line_block_uuids) + + # file_tokens = [token + # for token in manifest_tokens[1:] + # if extractUuid(token) is None] + + # # Sort file tokens by start position in case they aren't already + # file_tokens.sort(key=lambda file_token: int(file_token.split(':')[0])) + + # if args.verbose: + # print 'line_block_uuids: ' + pprint.pformat(line_block_uuids) + # print 'file_tokens: ' + pprint.pformat(file_tokens) + + +def readLinks(): + link_classes = set() + + for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items(): + # TODO(misha): We may not be seing all the links, but since items + # available does not return an accurate number, I don't knos how + # to confirm that we saw all of them. 
+ collection_links_response = arv.links().list(where={'head_uuid':collection_uuid}).execute() + link_classes.update([link['link_class'] for link in collection_links_response['items']]) + for link in collection_links_response['items']: + if link['link_class'] == 'permission': + collection_info.reader_uuids.add(link['tail_uuid']) + elif link['link_class'] == 'resources': + replication_level = link['properties'].get( + 'replication', + CollectionInfo.DEFAULT_PERSISTER_REPLICATION_LEVEL) + collection_info.persister_replication.addValue( + link['tail_uuid'], + replication_level) + collection_info.persister_uuids.add(link['tail_uuid']) + + print 'Found the following link classes:' + pprint.pprint(link_classes) + +def reportMostPopularCollections(): + most_popular_collections = sorted( + CollectionInfo.all_by_uuid.values(), + key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_replication), + reverse=True)[:10] + + print 'Most popular Collections:' + for collection_info in most_popular_collections: + print collection_info + + +def buildMaps(): + for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items(): + # Add the block holding the manifest itself for all calculations + block_uuids = collection_info.block_uuids.union([collection_uuid,]) + for block_uuid in block_uuids: + block_to_collections[block_uuid].add(collection_uuid) + block_to_readers[block_uuid].update(collection_info.reader_uuids) + block_to_persisters[block_uuid].update(collection_info.persister_uuids) + block_to_persister_replication[block_uuid].addDict( + collection_info.persister_replication) + for reader_uuid in collection_info.reader_uuids: + reader_to_collections[reader_uuid].add(collection_uuid) + reader_to_blocks[reader_uuid].update(block_uuids) + for persister_uuid in collection_info.persister_uuids: + persister_to_collections[persister_uuid].add(collection_uuid) + persister_to_blocks[persister_uuid].update(block_uuids) + + +def itemsByValueLength(original): + return 
sorted(original.items(), + key=lambda item:len(item[1]), + reverse=True) + + +def reportBusiestUsers(): + busiest_readers = itemsByValueLength(reader_to_collections) + print 'The busiest readers are:' + for reader,collections in busiest_readers: + print '%s reading %d collections.' % (reader, len(collections)) + busiest_persisters = itemsByValueLength(persister_to_collections) + print 'The busiest persisters are:' + for persister,collections in busiest_persisters: + print '%s reading %d collections.' % (persister, len(collections)) + + +def blockDiskUsage(block_uuid): + """Returns the disk usage of a block given its uuid. + + Will return 0 before reading the contents of the keep servers. + """ + return byteSizeFromValidUuid(block_uuid) * block_to_replication[block_uuid] + +def blockPersistedUsage(user_uuid, block_uuid): + return (byteSizeFromValidUuid(block_uuid) * + block_to_persister_replication[block_uuid].get(user_uuid, 0)) + +memo_computeWeightedReplicationCosts = {} +def computeWeightedReplicationCosts(replication_levels): + """Computes the relative cost of varied replication levels. + + replication_levels: a tuple of integers representing the desired + replication level. If n users want a replication level of x then x + should appear n times in replication_levels. + + Returns a dictionary from replication level to cost. + + The basic thinking is that the cost of replicating at level x should + be shared by everyone who wants replication of level x or higher. + + For example, if we have two users who want 1 copy, one user who + wants 3 copies and two users who want 6 copies: + the input would be [1, 1, 3, 6, 6] (or any permutation) + + The cost of the first copy is shared by all 5 users, so they each + pay 1 copy / 5 users = 0.2. 
+ The cost of the second and third copies shared by 3 users, so they + each pay 2 copies / 3 users = 0.67 (plus the above costs) + The cost of the fourth, fifth and sixth copies is shared by two + users, so they each pay 3 copies / 2 users = 1.5 (plus the above costs) + + Here are some other examples: + computeWeightedReplicationCosts([1,]) -> {1:1.0} + computeWeightedReplicationCosts([2,]) -> {2:2.0} + computeWeightedReplicationCosts([1,1]) -> {1:0.5} + computeWeightedReplicationCosts([2,2]) -> {1:1.0} + computeWeightedReplicationCosts([1,2]) -> {1:0.5,2:1.5} + computeWeightedReplicationCosts([1,3]) -> {1:0.5,2:2.5} + computeWeightedReplicationCosts([1,3,6,6,10]) -> {1:0.2,3:0.7,6:1.7,10:5.7} + """ + replication_level_counts = sorted(Counter(replication_levels).items()) + + memo_key = str(replication_level_counts) + + if not memo_key in memo_computeWeightedReplicationCosts: + last_level = 0 + current_cost = 0 + total_interested = float(sum(map(itemgetter(1), replication_level_counts))) + cost_for_level = {} + for replication_level, count in replication_level_counts: + copies_added = replication_level - last_level + # compute marginal cost from last level and add it to the last cost + current_cost += copies_added / total_interested + cost_for_level[replication_level] = current_cost + # update invariants + last_level = replication_level + total_interested -= count + memo_computeWeightedReplicationCosts[memo_key] = cost_for_level + + return memo_computeWeightedReplicationCosts[memo_key] + +def blockPersistedWeightedUsage(user_uuid, block_uuid): + persister_replication_for_block = block_to_persister_replication[block_uuid] + user_replication = persister_replication_for_block[user_uuid] + return ( + byteSizeFromValidUuid(block_uuid) * + computeWeightedReplicationCosts( + persister_replication_for_block.values())[user_replication]) + + +def computeUserStorageUsage(): + for user, blocks in reader_to_blocks.items(): + user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = 
sum(map( + byteSizeFromValidUuid, + blocks)) + user_to_usage[user][WEIGHTED_READ_SIZE_COL] = sum(map( + lambda block_uuid:(float(byteSizeFromValidUuid(block_uuid))/ + len(block_to_readers[block_uuid])), + blocks)) + for user, blocks in persister_to_blocks.items(): + user_to_usage[user][UNWEIGHTED_PERSIST_SIZE_COL] = sum(map( + partial(blockPersistedUsage, user), + blocks)) + user_to_usage[user][WEIGHTED_PERSIST_SIZE_COL] = sum(map( + partial(blockPersistedWeightedUsage, user), + blocks)) + +def printUserStorageUsage(): + print ('user: unweighted readable block size, weighted readable block size, ' + 'unweighted persisted block size, weighted persisted block size:') + for user, usage in user_to_usage.items(): + print ('%s: %s %s %s %s' % + (user, + fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]), + fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]), + fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]), + fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL]))) + +def logUserStorageUsage(): + for user, usage in user_to_usage.items(): + body = {} + # user could actually represent a user or a group. We don't set + # the object_type field since we don't know which we have. + body['object_uuid'] = user + body['event_type'] = args.user_storage_log_event_type + properties = {} + properties['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL] + properties['read_collections_weighted_bytes'] = ( + usage[WEIGHTED_READ_SIZE_COL]) + properties['persisted_collections_total_bytes'] = ( + usage[UNWEIGHTED_PERSIST_SIZE_COL]) + properties['persisted_collections_weighted_bytes'] = ( + usage[WEIGHTED_PERSIST_SIZE_COL]) + body['properties'] = properties + # TODO(misha): Confirm that this will throw an exception if it + # fails to create the log entry. 
+ arv.logs().create(body=body).execute() + +def getKeepServers(): + response = arv.keep_disks().list().execute() + return [[keep_server['service_host'], keep_server['service_port']] + for keep_server in response['items']] + + +def getKeepBlocks(keep_servers): + blocks = [] + for host,port in keep_servers: + response = urllib2.urlopen('http://%s:%d/index' % (host, port)) + server_blocks = [line.split(' ') + for line in response.read().split('\n') + if line] + server_blocks = [(block_id, int(mtime)) + for block_id, mtime in server_blocks] + blocks.append(server_blocks) + return blocks + +def getKeepStats(keep_servers): + MOUNT_COLUMN = 5 + TOTAL_COLUMN = 1 + FREE_COLUMN = 3 + DISK_BLOCK_SIZE = 1024 + stats = [] + for host,port in keep_servers: + response = urllib2.urlopen('http://%s:%d/status.json' % (host, port)) + + parsed_json = json.load(response) + df_entries = [line.split() + for line in parsed_json['df'].split('\n') + if line] + keep_volumes = [columns + for columns in df_entries + if 'keep' in columns[MOUNT_COLUMN]] + total_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(TOTAL_COLUMN), + keep_volumes))) + free_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(FREE_COLUMN), + keep_volumes))) + stats.append([total_space, free_space]) + return stats + + +def computeReplication(keep_blocks): + for server_blocks in keep_blocks: + for block_uuid, _ in server_blocks: + block_to_replication[block_uuid] += 1 + log.debug('Seeing the following replication levels among blocks: %s', + str(set(block_to_replication.values()))) + + +def computeGarbageCollectionCandidates(): + for server_blocks in keep_blocks: + block_to_latest_mtime.addValues(server_blocks) + empty_set = set() + garbage_collection_priority = sorted( + [(block,mtime) + for block,mtime in block_to_latest_mtime.items() + if len(block_to_persisters.get(block,empty_set)) == 0], + key = itemgetter(1)) + global garbage_collection_report + garbage_collection_report = [] + cumulative_disk_size = 0 + for 
block,mtime in garbage_collection_priority: + disk_size = blockDiskUsage(block) + cumulative_disk_size += disk_size + garbage_collection_report.append( + (block, + mtime, + disk_size, + cumulative_disk_size, + float(free_keep_space + cumulative_disk_size)/total_keep_space)) + + print 'The oldest Garbage Collection Candidates: ' + pprint.pprint(garbage_collection_report[:20]) + + +def outputGarbageCollectionReport(filename): + with open(filename, 'wb') as csvfile: + gcwriter = csv.writer(csvfile) + gcwriter.writerow(['block uuid', 'latest mtime', 'disk size', + 'cumulative size', 'disk free']) + for line in garbage_collection_report: + gcwriter.writerow(line) + +def computeGarbageCollectionHistogram(): + # TODO(misha): Modify this to allow users to specify the number of + # histogram buckets through a flag. + histogram = [] + last_percentage = -1 + for _,mtime,_,_,disk_free in garbage_collection_report: + curr_percentage = percentageFloor(disk_free) + if curr_percentage > last_percentage: + histogram.append( (mtime, curr_percentage) ) + last_percentage = curr_percentage + + log.info('Garbage collection histogram is: %s', histogram) + + return histogram + + +def logGarbageCollectionHistogram(): + body = {} + # TODO(misha): Decide whether we should specify an object_uuid in + # the body and if so, which uuid to use. + body['event_type'] = args.block_age_free_space_histogram_log_event_type + properties = {} + properties['histogram'] = garbage_collection_histogram + body['properties'] = properties + # TODO(misha): Confirm that this will throw an exception if it + # fails to create the log entry. 
+ arv.logs().create(body=body).execute() + + +def detectReplicationProblems(): + blocks_not_in_any_collections.update( + set(block_to_replication.keys()).difference(block_to_collections.keys())) + underreplicated_persisted_blocks.update( + [uuid + for uuid, persister_replication in block_to_persister_replication.items() + if len(persister_replication) > 0 and + block_to_replication[uuid] < max(persister_replication.values())]) + overreplicated_persisted_blocks.update( + [uuid + for uuid, persister_replication in block_to_persister_replication.items() + if len(persister_replication) > 0 and + block_to_replication[uuid] > max(persister_replication.values())]) + + log.info('Found %d blocks not in any collections, e.g. %s...', + len(blocks_not_in_any_collections), + ','.join(list(blocks_not_in_any_collections)[:5])) + log.info('Found %d underreplicated blocks, e.g. %s...', + len(underreplicated_persisted_blocks), + ','.join(list(underreplicated_persisted_blocks)[:5])) + log.info('Found %d overreplicated blocks, e.g. %s...', + len(overreplicated_persisted_blocks), + ','.join(list(overreplicated_persisted_blocks)[:5])) + + # TODO: + # Read blocks sorted by mtime + # Cache window vs % free space + # Collections which candidates will appear in + # Youngest underreplicated read blocks that appear in collections. + # Report Collections that have blocks which are missing from (or + # underreplicated in) keep. + + +# This is the main flow here + +parser = argparse.ArgumentParser(description='Report on keep disks.') +"""The command line argument parser we use. + +We only use it in the __main__ block, but leave it outside the block +in case another package wants to use it or customize it by specifying +it as a parent to their commandline parser. +""" +parser.add_argument('-m', + '--max-api-results', + type=int, + default=5000, + help=('The max results to get at once.')) +parser.add_argument('-p', + '--port', + type=int, + default=9090, + help=('The port number to serve on. 
0 means no server.')) +parser.add_argument('-v', + '--verbose', + help='increase output verbosity', + action='store_true') +parser.add_argument('-u', + '--uuid', + help='uuid of specific collection to process') +parser.add_argument('--require-admin-user', + action='store_true', + default=True, + help='Fail if the user is not an admin [default]') +parser.add_argument('--no-require-admin-user', + dest='require_admin_user', + action='store_false', + help=('Allow users without admin permissions with ' + 'only a warning.')) +parser.add_argument('--log-to-workbench', + action='store_true', + default=False, + help='Log findings to workbench') +parser.add_argument('--no-log-to-workbench', + dest='log_to_workbench', + action='store_false', + help='Don\'t log findings to workbench [default]') +parser.add_argument('--user-storage-log-event-type', + default='user-storage-report', + help=('The event type to set when logging user ' + 'storage usage to workbench.')) +parser.add_argument('--block-age-free-space-histogram-log-event-type', + default='block-age-free-space-histogram', + help=('The event type to set when logging user ' + 'storage usage to workbench.')) +parser.add_argument('--garbage-collection-file', + default='', + help=('The file to write a garbage collection report, or ' + 'leave empty for no report.')) + +args = None + +# TODO(misha): Think about moving some of this to the __main__ block. 
+log = logging.getLogger('arvados.services.datamanager') +stderr_handler = logging.StreamHandler() +log.setLevel(logging.INFO) +stderr_handler.setFormatter( + logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s')) +log.addHandler(stderr_handler) + +# Global Data - don't try this at home +collection_uuids = [] + +# These maps all map from uuids to a set of uuids +block_to_collections = defaultdict(set) # keep blocks +reader_to_collections = defaultdict(set) # collection(s) for which the user has read access +persister_to_collections = defaultdict(set) # collection(s) which the user has persisted +block_to_readers = defaultdict(set) +block_to_persisters = defaultdict(set) +block_to_persister_replication = defaultdict(maxdict) +reader_to_blocks = defaultdict(set) +persister_to_blocks = defaultdict(set) + +UNWEIGHTED_READ_SIZE_COL = 0 +WEIGHTED_READ_SIZE_COL = 1 +UNWEIGHTED_PERSIST_SIZE_COL = 2 +WEIGHTED_PERSIST_SIZE_COL = 3 +NUM_COLS = 4 +user_to_usage = defaultdict(lambda : [0,]*NUM_COLS) + +keep_servers = [] +keep_blocks = [] +keep_stats = [] +total_keep_space = 0 +free_keep_space = 0 + +block_to_replication = defaultdict(lambda: 0) +block_to_latest_mtime = maxdict() + +garbage_collection_report = [] +"""A list of non-persisted blocks, sorted by increasing mtime + +Each entry is of the form (block uuid, latest mtime, disk size, +cumulative size) + +* block uuid: The id of the block we want to delete +* latest mtime: The latest mtime of the block across all keep servers. +* disk size: The total disk space used by this block (block size +multiplied by current replication level) +* cumulative disk size: The sum of this block's disk size and all the +blocks listed above it +* disk free: The proportion of our disk space that would be free if we +deleted this block and all the above. 
So this is (free disk space + +cumulative disk size) / total disk capacity +""" + +garbage_collection_histogram = [] +""" Shows the tradeoff of keep block age vs keep disk free space. + +Each entry is of the form (mtime, Disk Proportion). + +An entry of the form (1388747781, 0.52) means that if we deleted the +oldest non-presisted blocks until we had 52% of the disk free, then +all blocks with an mtime greater than 1388747781 would be preserved. +""" + +# Stuff to report on +blocks_not_in_any_collections = set() +underreplicated_persisted_blocks = set() +overreplicated_persisted_blocks = set() + +all_data_loaded = False + +def loadAllData(): + checkUserIsAdmin() + + log.info('Building Collection List') + global collection_uuids + collection_uuids = filter(None, [extractUuid(candidate) + for candidate in buildCollectionsList()]) + + log.info('Reading Collections') + readCollections(collection_uuids) + + if args.verbose: + pprint.pprint(CollectionInfo.all_by_uuid) + + log.info('Reading Links') + readLinks() + + reportMostPopularCollections() + + log.info('Building Maps') + buildMaps() + + reportBusiestUsers() + + log.info('Getting Keep Servers') + global keep_servers + keep_servers = getKeepServers() + + print keep_servers + + log.info('Getting Blocks from each Keep Server.') + global keep_blocks + keep_blocks = getKeepBlocks(keep_servers) + + log.info('Getting Stats from each Keep Server.') + global keep_stats, total_keep_space, free_keep_space + keep_stats = getKeepStats(keep_servers) + + total_keep_space = sum(map(itemgetter(0), keep_stats)) + free_keep_space = sum(map(itemgetter(1), keep_stats)) + + # TODO(misha): Delete this hack when the keep servers are fixed! + # This hack deals with the fact that keep servers report each other's disks. + total_keep_space /= len(keep_stats) + free_keep_space /= len(keep_stats) + + log.info('Total disk space: %s, Free disk space: %s (%d%%).' 
% + (fileSizeFormat(total_keep_space), + fileSizeFormat(free_keep_space), + 100*free_keep_space/total_keep_space)) + + computeReplication(keep_blocks) + + log.info('average replication level is %f', + (float(sum(block_to_replication.values())) / + len(block_to_replication))) + + computeGarbageCollectionCandidates() + + if args.garbage_collection_file: + log.info('Writing garbage Collection report to %s', + args.garbage_collection_file) + outputGarbageCollectionReport(args.garbage_collection_file) + + global garbage_collection_histogram + garbage_collection_histogram = computeGarbageCollectionHistogram() + + if args.log_to_workbench: + logGarbageCollectionHistogram() + + detectReplicationProblems() + + computeUserStorageUsage() + printUserStorageUsage() + if args.log_to_workbench: + logUserStorageUsage() + + global all_data_loaded + all_data_loaded = True + + +class DataManagerHandler(BaseHTTPRequestHandler): + USER_PATH = 'user' + COLLECTION_PATH = 'collection' + BLOCK_PATH = 'block' + + def userLink(self, uuid): + return ('%(uuid)s' % + {'uuid': uuid, + 'path': DataManagerHandler.USER_PATH}) + + def collectionLink(self, uuid): + return ('%(uuid)s' % + {'uuid': uuid, + 'path': DataManagerHandler.COLLECTION_PATH}) + + def blockLink(self, uuid): + return ('%(uuid)s' % + {'uuid': uuid, + 'path': DataManagerHandler.BLOCK_PATH}) + + def writeTop(self, title): + self.wfile.write('%s\n' % title) + + def writeBottom(self): + self.wfile.write('\n') + + def writeHomePage(self): + self.send_response(200) + self.end_headers() + self.writeTop('Home') + self.wfile.write('') + self.wfile.write('\n') + for user, usage in user_to_usage.items(): + self.wfile.write('\n' % + (self.userLink(user), + fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]), + fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]), + fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]), + fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL]))) + self.wfile.write('
user' + 'unweighted readable block size' + 'weighted readable block size' + 'unweighted persisted block size' + 'weighted persisted block size
%s%s%s%s%s
\n') + self.writeBottom() + + def userExists(self, uuid): + # Currently this will return false for a user who exists but + # doesn't appear on any manifests. + # TODO(misha): Figure out if we need to fix this. + return user_to_usage.has_key(uuid) + + def writeUserPage(self, uuid): + if not self.userExists(uuid): + self.send_error(404, + 'User (%s) Not Found.' % cgi.escape(uuid, quote=False)) + else: + # Here we assume that since a user exists, they don't need to be + # html escaped. + self.send_response(200) + self.end_headers() + self.writeTop('User %s' % uuid) + self.wfile.write('') + self.wfile.write('\n') + usage = user_to_usage[uuid] + self.wfile.write('\n' % + (self.userLink(uuid), + fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]), + fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]), + fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]), + fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL]))) + self.wfile.write('
user' + 'unweighted readable block size' + 'weighted readable block size' + 'unweighted persisted block size' + 'weighted persisted block size
%s%s%s%s%s
\n') + self.wfile.write('

Persisting Collections: %s\n' % + ', '.join(map(self.collectionLink, + persister_to_collections[uuid]))) + self.wfile.write('

Reading Collections: %s\n' % + ', '.join(map(self.collectionLink, + reader_to_collections[uuid]))) + self.writeBottom() + + def collectionExists(self, uuid): + return CollectionInfo.all_by_uuid.has_key(uuid) + + def writeCollectionPage(self, uuid): + if not self.collectionExists(uuid): + self.send_error(404, + 'Collection (%s) Not Found.' % cgi.escape(uuid, quote=False)) + else: + collection = CollectionInfo.get(uuid) + # Here we assume that since a collection exists, its id doesn't + # need to be html escaped. + self.send_response(200) + self.end_headers() + self.writeTop('Collection %s' % uuid) + self.wfile.write('

Collection %s

\n' % uuid) + self.wfile.write('

Total size %s (not factoring in replication).\n' % + fileSizeFormat(collection.byteSize())) + self.wfile.write('

Readers: %s\n' % + ', '.join(map(self.userLink, collection.reader_uuids))) + + if len(collection.persister_replication) == 0: + self.wfile.write('

No persisters\n') + else: + replication_to_users = defaultdict(set) + for user,replication in collection.persister_replication.items(): + replication_to_users[replication].add(user) + replication_levels = sorted(replication_to_users.keys()) + + self.wfile.write('

%d persisters in %d replication level(s) maxing ' + 'out at %dx replication:\n' % + (len(collection.persister_replication), + len(replication_levels), + replication_levels[-1])) + + # TODO(misha): This code is used twice, let's move it to a method. + self.wfile.write('\n' % + '\n') + for replication_level in replication_levels: + users = replication_to_users[replication_level] + self.wfile.write('
%s
'.join(['Replication Level ' + str(x) + for x in replication_levels])) + self.wfile.write('
%s\n' % '
\n'.join( + map(self.userLink, users))) + self.wfile.write('
\n') + + replication_to_blocks = defaultdict(set) + for block in collection.block_uuids: + replication_to_blocks[block_to_replication[block]].add(block) + replication_levels = sorted(replication_to_blocks.keys()) + self.wfile.write('

%d blocks in %d replication level(s):\n' % + (len(collection.block_uuids), len(replication_levels))) + self.wfile.write('\n' % + '\n') + for replication_level in replication_levels: + blocks = replication_to_blocks[replication_level] + self.wfile.write('
%s
'.join(['Replication Level ' + str(x) + for x in replication_levels])) + self.wfile.write('
%s\n' % '
\n'.join(blocks)) + self.wfile.write('
\n') + + + def do_GET(self): + if not all_data_loaded: + self.send_error(503, + 'Sorry, but I am still loading all the data I need.') + else: + # Removing leading '/' and process request path + split_path = self.path[1:].split('/') + request_type = split_path[0] + log.debug('path (%s) split as %s with request_type %s' % (self.path, + split_path, + request_type)) + if request_type == '': + self.writeHomePage() + elif request_type == DataManagerHandler.USER_PATH: + self.writeUserPage(split_path[1]) + elif request_type == DataManagerHandler.COLLECTION_PATH: + self.writeCollectionPage(split_path[1]) + else: + self.send_error(404, 'Unrecognized request path.') + return + +class ThreadedHTTPServer(ThreadingMixIn, HTTPServer): + """Handle requests in a separate thread.""" + + +if __name__ == '__main__': + args = parser.parse_args() + + if args.port == 0: + loadAllData() + else: + loader = threading.Thread(target = loadAllData, name = 'loader') + loader.start() + + server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler) + server.serve_forever() diff --git a/services/datamanager/experimental/datamanager_test.py b/services/datamanager/experimental/datamanager_test.py new file mode 100755 index 0000000000..0842c16511 --- /dev/null +++ b/services/datamanager/experimental/datamanager_test.py @@ -0,0 +1,41 @@ +#! 
/usr/bin/env python + +import datamanager +import unittest + +class TestComputeWeightedReplicationCosts(unittest.TestCase): + def test_obvious(self): + self.assertEqual(datamanager.computeWeightedReplicationCosts([1,]), + {1:1.0}) + + def test_simple(self): + self.assertEqual(datamanager.computeWeightedReplicationCosts([2,]), + {2:2.0}) + + def test_even_split(self): + self.assertEqual(datamanager.computeWeightedReplicationCosts([1,1]), + {1:0.5}) + + def test_even_split_bigger(self): + self.assertEqual(datamanager.computeWeightedReplicationCosts([2,2]), + {2:1.0}) + + def test_uneven_split(self): + self.assertEqual(datamanager.computeWeightedReplicationCosts([1,2]), + {1:0.5, 2:1.5}) + + def test_uneven_split_bigger(self): + self.assertEqual(datamanager.computeWeightedReplicationCosts([1,3]), + {1:0.5, 3:2.5}) + + def test_uneven_split_jumble(self): + self.assertEqual(datamanager.computeWeightedReplicationCosts([1,3,6,6,10]), + {1:0.2, 3:0.7, 6:1.7, 10:5.7}) + + def test_documentation_example(self): + self.assertEqual(datamanager.computeWeightedReplicationCosts([1,1,3,6,6]), + {1:0.2, 3: 0.2 + 2.0 / 3, 6: 0.2 + 2.0 / 3 + 1.5}) + + +if __name__ == '__main__': + unittest.main() diff --git a/services/keep/src/keep/handler_test.go b/services/keep/src/keep/handler_test.go new file mode 100644 index 0000000000..8e7bfea468 --- /dev/null +++ b/services/keep/src/keep/handler_test.go @@ -0,0 +1,438 @@ +// Tests for Keep HTTP handlers: +// +// GetBlockHandler +// PutBlockHandler +// IndexHandler +// +// The HTTP handlers are responsible for enforcing permission policy, +// so these tests must exercise all possible permission permutations. + +package main + +import ( + "bytes" + "github.com/gorilla/mux" + "net/http" + "net/http/httptest" + "regexp" + "testing" + "time" +) + +// A RequestTester represents the parameters for an HTTP request to +// be issued on behalf of a unit test. 
+type RequestTester struct { + uri string + api_token string + method string + request_body []byte +} + +// Test GetBlockHandler on the following situations: +// - permissions off, unauthenticated request, unsigned locator +// - permissions on, authenticated request, signed locator +// - permissions on, authenticated request, unsigned locator +// - permissions on, unauthenticated request, signed locator +// - permissions on, authenticated request, expired locator +// +func TestGetHandler(t *testing.T) { + defer teardown() + + // Prepare two test Keep volumes. Our block is stored on the second volume. + KeepVM = MakeTestVolumeManager(2) + defer func() { KeepVM.Quit() }() + + vols := KeepVM.Volumes() + if err := vols[0].Put(TEST_HASH, TEST_BLOCK); err != nil { + t.Error(err) + } + + // Set up a REST router for testing the handlers. + rest := MakeRESTRouter() + + // Create locators for testing. + // Turn on permission settings so we can generate signed locators. + enforce_permissions = true + PermissionSecret = []byte(known_key) + permission_ttl = time.Duration(300) * time.Second + + var ( + unsigned_locator = "http://localhost:25107/" + TEST_HASH + valid_timestamp = time.Now().Add(permission_ttl) + expired_timestamp = time.Now().Add(-time.Hour) + signed_locator = "http://localhost:25107/" + SignLocator(TEST_HASH, known_token, valid_timestamp) + expired_locator = "http://localhost:25107/" + SignLocator(TEST_HASH, known_token, expired_timestamp) + ) + + // ----------------- + // Test unauthenticated request with permissions off. + enforce_permissions = false + + // Unauthenticated request, unsigned locator + // => OK + response := IssueRequest(rest, + &RequestTester{ + method: "GET", + uri: unsigned_locator, + }) + ExpectStatusCode(t, + "Unauthenticated request, unsigned locator", http.StatusOK, response) + ExpectBody(t, + "Unauthenticated request, unsigned locator", + string(TEST_BLOCK), + response) + + // ---------------- + // Permissions: on. 
+ enforce_permissions = true + + // Authenticated request, signed locator + // => OK + response = IssueRequest(rest, &RequestTester{ + method: "GET", + uri: signed_locator, + api_token: known_token, + }) + ExpectStatusCode(t, + "Authenticated request, signed locator", http.StatusOK, response) + ExpectBody(t, + "Authenticated request, signed locator", string(TEST_BLOCK), response) + + // Authenticated request, unsigned locator + // => PermissionError + response = IssueRequest(rest, &RequestTester{ + method: "GET", + uri: unsigned_locator, + api_token: known_token, + }) + ExpectStatusCode(t, "unsigned locator", PermissionError.HTTPCode, response) + + // Unauthenticated request, signed locator + // => PermissionError + response = IssueRequest(rest, &RequestTester{ + method: "GET", + uri: signed_locator, + }) + ExpectStatusCode(t, + "Unauthenticated request, signed locator", + PermissionError.HTTPCode, response) + + // Authenticated request, expired locator + // => ExpiredError + response = IssueRequest(rest, &RequestTester{ + method: "GET", + uri: expired_locator, + api_token: known_token, + }) + ExpectStatusCode(t, + "Authenticated request, expired locator", + ExpiredError.HTTPCode, response) +} + +// Test PutBlockHandler on the following situations: +// - no server key +// - with server key, authenticated request, unsigned locator +// - with server key, unauthenticated request, unsigned locator +// +func TestPutHandler(t *testing.T) { + defer teardown() + + // Prepare two test Keep volumes. + KeepVM = MakeTestVolumeManager(2) + defer func() { KeepVM.Quit() }() + + // Set up a REST router for testing the handlers. + rest := MakeRESTRouter() + + // -------------- + // No server key. 
+ + // Unauthenticated request, no server key + // => OK (unsigned response) + unsigned_locator := "http://localhost:25107/" + TEST_HASH + response := IssueRequest(rest, + &RequestTester{ + method: "PUT", + uri: unsigned_locator, + request_body: TEST_BLOCK, + }) + + ExpectStatusCode(t, + "Unauthenticated request, no server key", http.StatusOK, response) + ExpectBody(t, "Unauthenticated request, no server key", TEST_HASH, response) + + // ------------------ + // With a server key. + + PermissionSecret = []byte(known_key) + permission_ttl = time.Duration(300) * time.Second + + // When a permission key is available, the locator returned + // from an authenticated PUT request will be signed. + + // Authenticated PUT, signed locator + // => OK (signed response) + response = IssueRequest(rest, + &RequestTester{ + method: "PUT", + uri: unsigned_locator, + request_body: TEST_BLOCK, + api_token: known_token, + }) + + ExpectStatusCode(t, + "Authenticated PUT, signed locator, with server key", + http.StatusOK, response) + if !VerifySignature(response.Body.String(), known_token) { + t.Errorf("Authenticated PUT, signed locator, with server key:\n"+ + "response '%s' does not contain a valid signature", + response.Body.String()) + } + + // Unauthenticated PUT, unsigned locator + // => OK + response = IssueRequest(rest, + &RequestTester{ + method: "PUT", + uri: unsigned_locator, + request_body: TEST_BLOCK, + }) + + ExpectStatusCode(t, + "Unauthenticated PUT, unsigned locator, with server key", + http.StatusOK, response) + ExpectBody(t, + "Unauthenticated PUT, unsigned locator, with server key", + TEST_HASH, response) +} + +// Test /index requests: +// - enforce_permissions off | unauthenticated /index request +// - enforce_permissions off | unauthenticated /index/prefix request +// - enforce_permissions off | authenticated /index request | non-superuser +// - enforce_permissions off | authenticated /index/prefix request | non-superuser +// - enforce_permissions off | authenticated 
/index request | superuser +// - enforce_permissions off | authenticated /index/prefix request | superuser +// - enforce_permissions on | unauthenticated /index request +// - enforce_permissions on | unauthenticated /index/prefix request +// - enforce_permissions on | authenticated /index request | non-superuser +// - enforce_permissions on | authenticated /index/prefix request | non-superuser +// - enforce_permissions on | authenticated /index request | superuser +// - enforce_permissions on | authenticated /index/prefix request | superuser +// +// The only /index requests that should succeed are those issued by the +// superuser when enforce_permissions = true. +// +func TestIndexHandler(t *testing.T) { + defer teardown() + + // Set up Keep volumes and populate them. + // Include multiple blocks on different volumes, and + // some metadata files (which should be omitted from index listings) + KeepVM = MakeTestVolumeManager(2) + defer func() { KeepVM.Quit() }() + + vols := KeepVM.Volumes() + vols[0].Put(TEST_HASH, TEST_BLOCK) + vols[1].Put(TEST_HASH_2, TEST_BLOCK_2) + vols[0].Put(TEST_HASH+".meta", []byte("metadata")) + vols[1].Put(TEST_HASH_2+".meta", []byte("metadata")) + + // Set up a REST router for testing the handlers. 
+ rest := MakeRESTRouter() + + data_manager_token = "DATA MANAGER TOKEN" + + unauthenticated_req := &RequestTester{ + method: "GET", + uri: "http://localhost:25107/index", + } + authenticated_req := &RequestTester{ + method: "GET", + uri: "http://localhost:25107/index", + api_token: known_token, + } + superuser_req := &RequestTester{ + method: "GET", + uri: "http://localhost:25107/index", + api_token: data_manager_token, + } + unauth_prefix_req := &RequestTester{ + method: "GET", + uri: "http://localhost:25107/index/" + TEST_HASH[0:3], + } + auth_prefix_req := &RequestTester{ + method: "GET", + uri: "http://localhost:25107/index/" + TEST_HASH[0:3], + api_token: known_token, + } + superuser_prefix_req := &RequestTester{ + method: "GET", + uri: "http://localhost:25107/index/" + TEST_HASH[0:3], + api_token: data_manager_token, + } + + // ---------------------------- + // enforce_permissions disabled + // All /index requests should fail. + enforce_permissions = false + + // unauthenticated /index request + // => PermissionError + response := IssueRequest(rest, unauthenticated_req) + ExpectStatusCode(t, + "enforce_permissions off, unauthenticated request", + PermissionError.HTTPCode, + response) + + // unauthenticated /index/prefix request + // => PermissionError + response = IssueRequest(rest, unauth_prefix_req) + ExpectStatusCode(t, + "enforce_permissions off, unauthenticated /index/prefix request", + PermissionError.HTTPCode, + response) + + // authenticated /index request, non-superuser + // => PermissionError + response = IssueRequest(rest, authenticated_req) + ExpectStatusCode(t, + "enforce_permissions off, authenticated request, non-superuser", + PermissionError.HTTPCode, + response) + + // authenticated /index/prefix request, non-superuser + // => PermissionError + response = IssueRequest(rest, auth_prefix_req) + ExpectStatusCode(t, + "enforce_permissions off, authenticated /index/prefix request, non-superuser", + PermissionError.HTTPCode, + response) + + // 
authenticated /index request, superuser + // => PermissionError + response = IssueRequest(rest, superuser_req) + ExpectStatusCode(t, + "enforce_permissions off, superuser request", + PermissionError.HTTPCode, + response) + + // superuser /index/prefix request + // => PermissionError + response = IssueRequest(rest, superuser_prefix_req) + ExpectStatusCode(t, + "enforce_permissions off, superuser /index/prefix request", + PermissionError.HTTPCode, + response) + + // --------------------------- + // enforce_permissions enabled + // Only the superuser should be allowed to issue /index requests. + enforce_permissions = true + + // unauthenticated /index request + // => PermissionError + response = IssueRequest(rest, unauthenticated_req) + ExpectStatusCode(t, + "enforce_permissions on, unauthenticated request", + PermissionError.HTTPCode, + response) + + // unauthenticated /index/prefix request + // => PermissionError + response = IssueRequest(rest, unauth_prefix_req) + ExpectStatusCode(t, + "permissions on, unauthenticated /index/prefix request", + PermissionError.HTTPCode, + response) + + // authenticated /index request, non-superuser + // => PermissionError + response = IssueRequest(rest, authenticated_req) + ExpectStatusCode(t, + "permissions on, authenticated request, non-superuser", + PermissionError.HTTPCode, + response) + + // authenticated /index/prefix request, non-superuser + // => PermissionError + response = IssueRequest(rest, auth_prefix_req) + ExpectStatusCode(t, + "permissions on, authenticated /index/prefix request, non-superuser", + PermissionError.HTTPCode, + response) + + // superuser /index request + // => OK + response = IssueRequest(rest, superuser_req) + ExpectStatusCode(t, + "permissions on, superuser request", + http.StatusOK, + response) + + expected := `^` + TEST_HASH + `\+\d+ \d+\n` + + TEST_HASH_2 + `\+\d+ \d+\n$` + match, _ := regexp.MatchString(expected, response.Body.String()) + if !match { + t.Errorf( + "permissions on, superuser 
request: expected %s, got:\n%s", + expected, response.Body.String()) + } + + // superuser /index/prefix request + // => OK + response = IssueRequest(rest, superuser_prefix_req) + ExpectStatusCode(t, + "permissions on, superuser request", + http.StatusOK, + response) + + expected = `^` + TEST_HASH + `\+\d+ \d+\n$` + match, _ = regexp.MatchString(expected, response.Body.String()) + if !match { + t.Errorf( + "permissions on, superuser /index/prefix request: expected %s, got:\n%s", + expected, response.Body.String()) + } +} + +// ==================== +// Helper functions +// ==================== + +// IssueTestRequest executes an HTTP request described by rt, to a +// specified REST router. It returns the HTTP response to the request. +func IssueRequest(router *mux.Router, rt *RequestTester) *httptest.ResponseRecorder { + response := httptest.NewRecorder() + body := bytes.NewReader(rt.request_body) + req, _ := http.NewRequest(rt.method, rt.uri, body) + if rt.api_token != "" { + req.Header.Set("Authorization", "OAuth "+rt.api_token) + } + router.ServeHTTP(response, req) + return response +} + +// ExpectStatusCode checks whether a response has the specified status code, +// and reports a test failure if not. 
+func ExpectStatusCode( + t *testing.T, + testname string, + expected_status int, + response *httptest.ResponseRecorder) { + if response.Code != expected_status { + t.Errorf("%s: expected status %s, got %+v", + testname, expected_status, response) + } +} + +func ExpectBody( + t *testing.T, + testname string, + expected_body string, + response *httptest.ResponseRecorder) { + if response.Body.String() != expected_body { + t.Errorf("%s: expected response body '%s', got %+v", + testname, expected_body, response) + } +} diff --git a/services/keep/src/keep/keep.go b/services/keep/src/keep/keep.go index e621955487..7c4173707a 100644 --- a/services/keep/src/keep/keep.go +++ b/services/keep/src/keep/keep.go @@ -15,8 +15,10 @@ import ( "net/http" "os" "regexp" + "strconv" "strings" "syscall" + "time" ) // ====================== @@ -26,6 +28,7 @@ import ( // and/or configuration file settings. // Default TCP address on which to listen for requests. +// Initialized by the --listen flag. const DEFAULT_ADDR = ":25107" // A Keep "block" is 64MB. @@ -38,8 +41,24 @@ const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024 var PROC_MOUNTS = "/proc/mounts" // The Keep VolumeManager maintains a list of available volumes. +// Initialized by the --volumes flag (or by FindKeepVolumes). var KeepVM VolumeManager +// enforce_permissions controls whether permission signatures +// should be enforced (affecting GET and DELETE requests). +// Initialized by the --enforce-permissions flag. +var enforce_permissions bool + +// permission_ttl is the time duration for which new permission +// signatures (returned by PUT requests) will be valid. +// Initialized by the --permission-ttl flag. +var permission_ttl time.Duration + +// data_manager_token represents the API token used by the +// Data Manager, and is required on certain privileged operations. +// Initialized by the --data-manager-token-file flag. +var data_manager_token string + // ========== // Error types. 
// @@ -49,13 +68,15 @@ type KeepError struct { } var ( - CollisionError = &KeepError{400, "Collision"} - MD5Error = &KeepError{401, "MD5 Failure"} - CorruptError = &KeepError{402, "Corruption"} - NotFoundError = &KeepError{404, "Not Found"} - GenericError = &KeepError{500, "Fail"} - FullError = &KeepError{503, "Full"} - TooLongError = &KeepError{504, "Too Long"} + CollisionError = &KeepError{400, "Collision"} + MD5Error = &KeepError{401, "MD5 Failure"} + PermissionError = &KeepError{401, "Permission denied"} + CorruptError = &KeepError{402, "Corruption"} + ExpiredError = &KeepError{403, "Expired permission signature"} + NotFoundError = &KeepError{404, "Not Found"} + GenericError = &KeepError{500, "Fail"} + FullError = &KeepError{503, "Full"} + TooLongError = &KeepError{504, "Too Long"} ) func (e *KeepError) Error() string { @@ -66,6 +87,11 @@ func (e *KeepError) Error() string { // data exceeds BLOCKSIZE bytes. var ReadErrorTooLong = errors.New("Too long") +// TODO(twp): continue moving as much code as possible out of main +// so it can be effectively tested. Esp. handling and postprocessing +// of command line flags (identifying Keep volumes and initializing +// permission arguments). + func main() { // Parse command-line flags: // @@ -86,14 +112,58 @@ func main() { // by looking at currently mounted filesystems for /keep top-level // directories. - var listen, volumearg string - var serialize_io bool - flag.StringVar(&listen, "listen", DEFAULT_ADDR, - "interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.") - flag.StringVar(&volumearg, "volumes", "", - "Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. 
If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.") - flag.BoolVar(&serialize_io, "serialize", false, - "If set, all read and write operations on local Keep volumes will be serialized.") + var ( + data_manager_token_file string + listen string + permission_key_file string + permission_ttl_sec int + serialize_io bool + volumearg string + ) + flag.StringVar( + &data_manager_token_file, + "data-manager-token-file", + "", + "File with the API token used by the Data Manager. All DELETE "+ + "requests or GET /index requests must carry this token.") + flag.BoolVar( + &enforce_permissions, + "enforce-permissions", + false, + "Enforce permission signatures on requests.") + flag.StringVar( + &listen, + "listen", + DEFAULT_ADDR, + "Interface on which to listen for requests, in the format "+ + "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+ + "to listen on all network interfaces.") + flag.StringVar( + &permission_key_file, + "permission-key-file", + "", + "File containing the secret key for generating and verifying "+ + "permission signatures.") + flag.IntVar( + &permission_ttl_sec, + "permission-ttl", + 300, + "Expiration time (in seconds) for newly generated permission "+ + "signatures.") + flag.BoolVar( + &serialize_io, + "serialize", + false, + "If set, all read and write operations on local Keep volumes will "+ + "be serialized.") + flag.StringVar( + &volumearg, + "volumes", + "", + "Comma-separated list of directories to use for Keep volumes, "+ + "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+ + "supplied, Keep will scan mounted filesystems for volumes "+ + "with a /keep top-level directory.") flag.Parse() // Look for local keep volumes. @@ -123,29 +193,84 @@ func main() { log.Fatal("could not find any keep volumes") } + // Initialize data manager token and permission key. + // If these tokens are specified but cannot be read, + // raise a fatal error. 
+ if data_manager_token_file != "" { + if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil { + data_manager_token = strings.TrimSpace(string(buf)) + } else { + log.Fatalf("reading data manager token: %s\n", err) + } + } + if permission_key_file != "" { + if buf, err := ioutil.ReadFile(permission_key_file); err == nil { + PermissionSecret = bytes.TrimSpace(buf) + } else { + log.Fatalf("reading permission key: %s\n", err) + } + } + + // Initialize permission TTL + permission_ttl = time.Duration(permission_ttl_sec) * time.Second + + // If --enforce-permissions is true, we must have a permission key + // to continue. + if PermissionSecret == nil { + if enforce_permissions { + log.Fatal("--enforce-permissions requires a permission key") + } else { + log.Println("Running without a PermissionSecret. Block locators " + + "returned by this server will not be signed, and will be rejected " + + "by a server that enforces permissions.") + log.Println("To fix this, run Keep with --permission-key-file= " + + "to define the location of a file containing the permission key.") + } + } + // Start a round-robin VolumeManager with the volumes we have found. KeepVM = MakeRRVolumeManager(goodvols) - // Set up REST handlers. - // - // Start with a router that will route each URL path to an - // appropriate handler. - // - rest := mux.NewRouter() - rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD") - rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT") - rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD") - rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD") - rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD") - // Tell the built-in HTTP server to direct all requests to the REST // router. - http.Handle("/", rest) + http.Handle("/", MakeRESTRouter()) // Start listening for requests. 
http.ListenAndServe(listen, nil) } +// MakeRESTRouter +// Returns a mux.Router that passes GET and PUT requests to the +// appropriate handlers. +// +func MakeRESTRouter() *mux.Router { + rest := mux.NewRouter() + rest.HandleFunc( + `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD") + rest.HandleFunc( + `/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`, + GetBlockHandler).Methods("GET", "HEAD") + rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT") + + // For IndexHandler we support: + // /index - returns all locators + // /index/{prefix} - returns all locators that begin with {prefix} + // {prefix} is a string of hexadecimal digits between 0 and 32 digits. + // If {prefix} is the empty string, return an index of all locators + // (so /index and /index/ behave identically) + // A client may supply a full 32-digit locator string, in which + // case the server will return an index with either zero or one + // entries. This usage allows a client to check whether a block is + // present, and its size and upload time, without retrieving the + // entire block. + // + rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD") + rest.HandleFunc( + `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD") + rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD") + return rest +} + // FindKeepVolumes // Returns a list of Keep volumes mounted on this system. 
// @@ -162,7 +287,8 @@ func FindKeepVolumes() []string { for scanner.Scan() { args := strings.Fields(scanner.Text()) dev, mount := args[0], args[1] - if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" { + if mount != "/" && + (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) { keep := mount + "/keep" if st, err := os.Stat(keep); err == nil && st.IsDir() { vols = append(vols, keep) @@ -176,16 +302,38 @@ func FindKeepVolumes() []string { return vols } -func GetBlockHandler(w http.ResponseWriter, req *http.Request) { +func GetBlockHandler(resp http.ResponseWriter, req *http.Request) { hash := mux.Vars(req)["hash"] + signature := mux.Vars(req)["signature"] + timestamp := mux.Vars(req)["timestamp"] + + // If permission checking is in effect, verify this + // request's permission signature. + if enforce_permissions { + if signature == "" || timestamp == "" { + http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode) + return + } else if IsExpired(timestamp) { + http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode) + return + } else { + validsig := MakePermSignature(hash, GetApiToken(req), timestamp) + if signature != validsig { + http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode) + return + } + } + } block, err := GetBlock(hash) if err != nil { - http.Error(w, err.Error(), 404) + // This type assertion is safe because the only errors + // GetBlock can return are CorruptError or NotFoundError. + http.Error(resp, err.Error(), err.(*KeepError).HTTPCode) return } - _, err = w.Write(block) + _, err = resp.Write(block) if err != nil { log.Printf("GetBlockHandler: writing response: %s", err) } @@ -193,7 +341,7 @@ func GetBlockHandler(w http.ResponseWriter, req *http.Request) { return } -func PutBlockHandler(w http.ResponseWriter, req *http.Request) { +func PutBlockHandler(resp http.ResponseWriter, req *http.Request) { hash := mux.Vars(req)["hash"] // Read the block data to be stored. 
@@ -208,10 +356,14 @@ func PutBlockHandler(w http.ResponseWriter, req *http.Request) { // if buf, err := ReadAtMost(req.Body, BLOCKSIZE); err == nil { if err := PutBlock(buf, hash); err == nil { - w.WriteHeader(http.StatusOK) + // Success; sign the locator and return it to the client. + api_token := GetApiToken(req) + expiry := time.Now().Add(permission_ttl) + signed_loc := SignLocator(hash, api_token, expiry) + resp.Write([]byte(signed_loc)) } else { ke := err.(*KeepError) - http.Error(w, ke.Error(), ke.HTTPCode) + http.Error(resp, ke.Error(), ke.HTTPCode) } } else { log.Println("error reading request: ", err) @@ -221,21 +373,31 @@ func PutBlockHandler(w http.ResponseWriter, req *http.Request) { // the maximum request size. errmsg = fmt.Sprintf("Max request size %d bytes", BLOCKSIZE) } - http.Error(w, errmsg, 500) + http.Error(resp, errmsg, 500) } } // IndexHandler // A HandleFunc to address /index and /index/{prefix} requests. // -func IndexHandler(w http.ResponseWriter, req *http.Request) { +func IndexHandler(resp http.ResponseWriter, req *http.Request) { prefix := mux.Vars(req)["prefix"] + // Only the data manager may issue /index requests, + // and only if enforce_permissions is enabled. + // All other requests return 401 Permission denied. 
+ api_token := GetApiToken(req) + if !enforce_permissions || + api_token == "" || + data_manager_token != api_token { + http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode) + return + } var index string for _, vol := range KeepVM.Volumes() { index = index + vol.Index(prefix) } - w.Write([]byte(index)) + resp.Write([]byte(index)) } // StatusHandler @@ -261,14 +423,14 @@ type NodeStatus struct { Volumes []*VolumeStatus `json:"volumes"` } -func StatusHandler(w http.ResponseWriter, req *http.Request) { +func StatusHandler(resp http.ResponseWriter, req *http.Request) { st := GetNodeStatus() if jstat, err := json.Marshal(st); err == nil { - w.Write(jstat) + resp.Write(jstat) } else { log.Printf("json.Marshal: %s\n", err) log.Printf("NodeStatus = %v\n", st) - http.Error(w, err.Error(), 500) + http.Error(resp, err.Error(), 500) } } @@ -338,7 +500,7 @@ func GetBlock(hash string) ([]byte, error) { // they should be sent directly to an event manager at high // priority or logged as urgent problems. // - log.Printf("%s: checksum mismatch for request %s (actual hash %s)\n", + log.Printf("%s: checksum mismatch for request %s (actual %s)\n", vol, hash, filehash) return buf, CorruptError } @@ -388,8 +550,8 @@ func PutBlock(block []byte, hash string) error { // If we already have a block on disk under this identifier, return // success (but check for MD5 collisions). // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound. - // In either case, we want to write our new (good) block to disk, so there is - // nothing special to do if err != nil. + // In either case, we want to write our new (good) block to disk, + // so there is nothing special to do if err != nil. 
if oldblock, err := GetBlock(hash); err == nil { if bytes.Compare(block, oldblock) == 0 { return nil @@ -459,3 +621,27 @@ func IsValidLocator(loc string) bool { log.Printf("IsValidLocator: %s\n", err) return false } + +// GetApiToken returns the OAuth token from the Authorization +// header of a HTTP request, or an empty string if no matching +// token is found. +func GetApiToken(req *http.Request) string { + if auth, ok := req.Header["Authorization"]; ok { + if strings.HasPrefix(auth[0], "OAuth ") { + return auth[0][6:] + } + } + return "" +} + +// IsExpired returns true if the given Unix timestamp (expressed as a +// hexadecimal string) is in the past, or if timestamp_hex cannot be +// parsed as a hexadecimal string. +func IsExpired(timestamp_hex string) bool { + ts, err := strconv.ParseInt(timestamp_hex, 16, 0) + if err != nil { + log.Printf("IsExpired: %s\n", err) + return true + } + return time.Unix(ts, 0).Before(time.Now()) +} diff --git a/services/keep/src/keep/keep_test.go b/services/keep/src/keep/keep_test.go index 30d103da72..6642c72211 100644 --- a/services/keep/src/keep/keep_test.go +++ b/services/keep/src/keep/keep_test.go @@ -348,7 +348,7 @@ func TestIndex(t *testing.T) { match, err := regexp.MatchString(expected, index) if err == nil { if !match { - t.Errorf("IndexLocators returned:\n-----\n%s-----\n", index) + t.Errorf("IndexLocators returned:\n%s", index) } } else { t.Errorf("regexp.MatchString: %s", err) @@ -412,5 +412,8 @@ func MakeTestVolumeManager(num_volumes int) VolumeManager { // Cleanup to perform after each test. // func teardown() { + data_manager_token = "" + enforce_permissions = false + PermissionSecret = nil KeepVM = nil } diff --git a/services/keep/src/keep/perms.go b/services/keep/src/keep/perms.go index 183bc2fbde..0d1b091365 100644 --- a/services/keep/src/keep/perms.go +++ b/services/keep/src/keep/perms.go @@ -50,9 +50,9 @@ import ( // key. 
var PermissionSecret []byte -// makePermSignature returns a string representing the signed permission +// MakePermSignature returns a string representing the signed permission // hint for the blob identified by blob_hash, api_token and expiration timestamp. -func makePermSignature(blob_hash string, api_token string, expiry string) string { +func MakePermSignature(blob_hash string, api_token string, expiry string) string { hmac := hmac.New(sha1.New, PermissionSecret) hmac.Write([]byte(blob_hash)) hmac.Write([]byte("@")) @@ -66,12 +66,17 @@ func makePermSignature(blob_hash string, api_token string, expiry string) string // SignLocator takes a blob_locator, an api_token and an expiry time, and // returns a signed locator string. func SignLocator(blob_locator string, api_token string, expiry time.Time) string { + // If no permission secret or API token is available, + // return an unsigned locator. + if PermissionSecret == nil || api_token == "" { + return blob_locator + } // Extract the hash from the blob locator, omitting any size hint that may be present. blob_hash := strings.Split(blob_locator, "+")[0] // Return the signed locator string. timestamp_hex := fmt.Sprintf("%08x", expiry.Unix()) return blob_locator + - "+A" + makePermSignature(blob_hash, api_token, timestamp_hex) + + "+A" + MakePermSignature(blob_hash, api_token, timestamp_hex) + "@" + timestamp_hex }