# SimpleCov reports
/coverage
+
+# Dev/test SSL certificates
+/self-signed.key
+/self-signed.pem
div.graph {
margin-top: 20px;
}
-div.graph h3,h4 {
+div.graph h3, div.graph h4 {
text-align: center;
}
class ApiClientAuthorizationsController < ApplicationController
- def index
- m = model_class.all
- items_available = m.items_available
- offset = m.result_offset
- limit = m.result_limit
- filtered = m.to_ary.reject do |x|
- x.api_client_id == 0 or (x.expires_at and x.expires_at < Time.now) rescue false
- end
- ArvadosApiClient::patch_paging_vars(filtered, items_available, offset, limit, nil)
- @objects = ArvadosResourceList.new(ApiClientAuthorization)
- @objects.results= filtered
- super
- end
def index_pane_list
%w(Recent Help)
end
def index
+ @limit ||= 200
if params[:limit]
- limit = params[:limit].to_i
- else
- limit = 200
+ @limit = params[:limit].to_i
end
+ @offset ||= 0
if params[:offset]
- offset = params[:offset].to_i
- else
- offset = 0
+ @offset = params[:offset].to_i
end
+ @filters ||= []
if params[:filters]
filters = params[:filters]
if filters.is_a? String
filters = Oj.load filters
end
- else
- filters = []
+ @filters += filters
end
- @objects ||= model_class.filter(filters).limit(limit).offset(offset).all
+ @objects ||= model_class
+ @objects = @objects.filter(@filters).limit(@limit).offset(@offset).all
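+    # A subclass can preset @limit, @offset, @filters, or @objects before
+    # calling super to customize this listing (the jobs controller below
+    # does exactly that with @limit = 20).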
respond_to do |f|
f.json { render json: @objects }
f.html { render }
def index
@svg = ""
if params[:uuid]
- @jobs = Job.where(uuid: params[:uuid])
- generate_provenance(@jobs)
+ @objects = Job.where(uuid: params[:uuid])
+ generate_provenance(@objects)
else
- @jobs = Job.all
+ @limit = 20
+ super
end
end
end
def index
- @objects ||= model_class.limit(20).all
+ @limit = 20
super
end
header = {"Accept" => "application/json"}
- profile_checkpoint { "Prepare request #{url} #{query[:uuid]} #{query[:where]}" }
+ profile_checkpoint { "Prepare request #{url} #{query[:uuid]} #{query[:where]} #{query[:filters]}" }
msg = @@api_client.post(url,
query,
header: header)
def attribute_editable?(attr)
false
end
+
+ def self.creatable?
+ false
+ end
end
end
def attribute_editable?(attr)
- attr && (attr.to_sym == :name || (attr.to_sym == :components and self.active == nil))
+ attr && (attr.to_sym == :name ||
+ (attr.to_sym == :components and (self.state == 'New' || self.state == 'Ready')))
end
def attributes_for_display
-<% if p.success %>
+<% if p.state == 'Complete' %>
<span class="label label-success">finished</span>
-<% elsif p.success == false %>
+<% elsif p.state == 'Failed' %>
<span class="label label-danger">failed</span>
-<% elsif p.active %>
+<% elsif p.state == 'RunningOnServer' || p.state == 'RunningOnClient' %>
<span class="label label-info">running</span>
<% else %>
<% if (p.components.select do |k,v| v[:job] end).length == 0 %>
}
<% end %>
+<%= render partial: "paging", locals: {results: @objects, object: @object} %>
+
<table class="topalign table">
<thead>
<tr class="contain-align-left">
</thead>
<tbody>
- <% @jobs.sort_by { |j| j[:created_at] }.reverse.each do |j| %>
+ <% @objects.sort_by { |j| j[:created_at] }.reverse.each do |j| %>
<tr class="cell-noborder">
<td>
</div>
</td>
<td>
- <%= link_to_if_arvados_object j.uuid %>
+ <%= link_to_if_arvados_object j %>
</td>
<td>
<%= j.script %>
</li>
<li class="dropdown">
- <a href="#" class="dropdown-toggle" data-toggle="dropdown"><i class="fa fa-lg fa-folder-o fa-fw"></i> Folders <b class="caret"></b></a>
+ <a href="/folders" class="dropdown-toggle" data-toggle="dropdown"><i class="fa fa-lg fa-folder-o fa-fw"></i> Folders <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><%= link_to raw('<i class="fa fa-plus fa-fw"></i> Create new folder'), folders_path, method: :post %></li>
<% @my_top_level_folders.call[0..7].each do |folder| %>
<li><a href="/collections">
<i class="fa fa-lg fa-briefcase fa-fw"></i> Collections (data files)
</a></li>
+ <li><a href="/jobs">
+ <i class="fa fa-lg fa-tasks fa-fw"></i> Jobs
+ </a></li>
<li><a href="/pipeline_instances">
<i class="fa fa-lg fa-tasks fa-fw"></i> Pipeline instances
</a></li>
<% end %>
<% end %>
-<% if @object.active != nil %>
+<% if !@object.state.in? ['New', 'Ready', 'Paused'] %>
<table class="table pipeline-components-table">
<colgroup>
<col style="width: 15%" />
</tfoot>
</table>
-<% if @object.active %>
+<% if @object.state == 'RunningOnServer' || @object.state == 'RunningOnClient' %>
<% content_for :js do %>
setInterval(function(){$('a.refresh').click()}, 15000);
<% end %>
<% content_for :tab_line_buttons do %>
<%= form_tag @object, :method => :put do |f| %>
- <%= hidden_field @object.class.to_s.underscore.singularize.to_sym, :active, :value => false %>
+ <%= hidden_field @object.class.to_s.underscore.singularize.to_sym, :state, :value => 'Paused' %>
<%= button_tag "Stop pipeline", {class: 'btn btn-primary pull-right', id: "run-pipeline-button"} %>
<% end %>
<% end %>
<% else %>
-
- <p>Please set the desired input parameters for the components of this pipeline. Parameters highlighted in red are required.</p>
+ <% if @object.state == 'New' %>
+ <p>Please set the desired input parameters for the components of this pipeline. Parameters highlighted in red are required.</p>
+ <% end %>
<% content_for :tab_line_buttons do %>
<%= form_tag @object, :method => :put do |f| %>
- <%= hidden_field @object.class.to_s.underscore.singularize.to_sym, :active, :value => true %>
+ <%= hidden_field @object.class.to_s.underscore.singularize.to_sym, :state, :value => 'RunningOnServer' %>
<%= button_tag "Run pipeline", {class: 'btn btn-primary pull-right', id: "run-pipeline-button"} %>
<% end %>
<% end %>
- <%= render partial: 'show_components_editable', locals: {editable: true} %>
-
+ <% if @object.state.in? ['New', 'Ready'] %>
+ <%= render partial: 'show_components_editable', locals: {editable: true} %>
+ <% else %>
+ <%= render partial: 'show_components_editable', locals: {editable: false} %>
+ <% end %>
<% end %>
<col width="25%" />
<col width="20%" />
<col width="15%" />
- <col width="20%" />
+ <col width="15%" />
+ <col width="5%" />
</colgroup>
<thead>
<tr class="contain-align-left">
Owner
</th><th>
Age
+ </th><th>
</th>
</tr>
</thead>
<%= link_to_if_arvados_object ob.owner_uuid, friendly_name: true %>
</td><td>
<%= distance_of_time_in_words(ob.created_at, Time.now) %>
+ </td><td>
+ <%= render partial: 'delete_object_button', locals: {object:ob} %>
</td>
</tr>
<tr>
<td style="border-top: 0;" colspan="2">
</td>
- <td style="border-top: 0; opacity: 0.5;" colspan="5">
+ <td style="border-top: 0; opacity: 0.5;" colspan="6">
<% ob.components.each do |cname, c| %>
<% if c[:job] %>
<%= render partial: "job_status_label", locals: {:j => c[:job], :title => cname.to_s } %>
s.executables << "arv-run-pipeline-instance"
s.executables << "arv-crunch-job"
s.executables << "arv-tag"
+ s.required_ruby_version = '>= 2.1.0'
s.add_runtime_dependency 'arvados', '~> 0.1.0'
s.add_runtime_dependency 'google-api-client', '~> 0.6.3'
s.add_runtime_dependency 'activesupport', '~> 3.2', '>= 3.2.13'
exit
end
-request_parameters = {}.merge(method_opts)
+request_parameters = {_profile:true}.merge(method_opts)
resource_body = request_parameters.delete(resource_schema.to_sym)
if resource_body
request_body = {
resource_schema => resource_body
}
else
- request_body = {}
+ request_body = nil
end
case api_method
end
exit 0
else
- request_body[:api_token] = ENV['ARVADOS_API_TOKEN']
- request_body[:_profile] = true
result = client.execute(:api_method => eval(api_method),
:parameters => request_parameters,
:body => request_body,
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
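+  # The API token now travels in the Authorization header ("OAuth2 <token>")
+  # rather than as an api_token field in the request body.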
end
begin
:parameters => {
:uuid => uuid
},
- :body => {
- :api_token => ENV['ARVADOS_API_TOKEN']
- },
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
j = JSON.parse result.body, :symbolize_names => true
unless j.is_a? Hash and j[:uuid]
debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
def self.create(attributes)
result = $client.execute(:api_method => $arvados.pipeline_instances.create,
:body => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
:pipeline_instance => attributes
},
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
j = JSON.parse result.body, :symbolize_names => true
unless j.is_a? Hash and j[:uuid]
abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
:uuid => @pi[:uuid]
},
:body => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
:pipeline_instance => @attributes_to_update.to_json
},
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
j = JSON.parse result.body, :symbolize_names => true
unless j.is_a? Hash and j[:uuid]
debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
@cache ||= {}
result = $client.execute(:api_method => $arvados.jobs.get,
:parameters => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
:uuid => uuid
},
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
@cache[uuid] = JSON.parse result.body, :symbolize_names => true
end
def self.where(conditions)
result = $client.execute(:api_method => $arvados.jobs.list,
:parameters => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
:limit => 10000,
:where => conditions.to_json
},
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
list = JSON.parse result.body, :symbolize_names => true
if list and list[:items].is_a? Array
list[:items]
def self.create(job, create_params)
@cache ||= {}
result = $client.execute(:api_method => $arvados.jobs.create,
- :parameters => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
+ :body => {
:job => job.to_json
}.merge(create_params),
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
j = JSON.parse result.body, :symbolize_names => true
if j.is_a? Hash and j[:uuid]
@cache[j[:uuid]] = j
else
result = $client.execute(:api_method => $arvados.pipeline_templates.get,
:parameters => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
:uuid => template
},
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
@template = JSON.parse result.body, :symbolize_names => true
if !@template[:uuid]
abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
end
end
@instance[:components] = @components
- @instance[:active] = moretodo
report_status
if @options[:no_wait]
debuglog "interrupt", 0
interrupted = true
break
- #abort
end
end
end
if interrupted
if success
- @instance[:active] = false
- @instance[:success] = success
- @instance[:state] = "Complete"
+ @instance[:state] = 'Complete'
else
- @instance[:active] = nil
- @instance[:success] = nil
@instance[:state] = 'Paused'
end
else
if ended == @components.length or failed > 0
- @instance[:active] = false
- @instance[:success] = success
- @instance[:state] = success ? "Complete" : "Failed"
+ @instance[:state] = success ? 'Complete' : 'Failed'
end
end
end
def cleanup
- if @instance
- @instance[:active] = false
+ if @instance and @instance[:state] == 'RunningOnClient'
+ @instance[:state] = 'Paused'
@instance.save
end
end
my $arv = Arvados->new('apiVersion' => 'v1');
-my $metastream;
+my $local_logfile;
my $User = $arv->{'users'}->{'current'}->execute;
$job_id = $Job->{'uuid'};
my $keep_logfile = $job_id . '.log.txt';
-my $local_logfile = File::Temp->new();
+$local_logfile = File::Temp->new();
$Job->{'runtime_constraints'} ||= {};
$Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
$message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
$message .= "\n";
my $datetime;
- if ($metastream || -t STDERR) {
+ if ($local_logfile || -t STDERR) {
my @gmtime = gmtime;
$datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
$gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
}
print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
- if ($metastream) {
- print $metastream $datetime . " " . $message;
+ if ($local_logfile) {
+ print $local_logfile $datetime . " " . $message;
}
}
freeze() if @jobstep_todo;
collate_output() if @jobstep_todo;
cleanup();
- save_meta() if $metastream;
+ save_meta() if $local_logfile;
die;
}
Protocol scheme. Default: C<ARVADOS_API_PROTOCOL_SCHEME> environment
variable, or C<https>
-=item apiToken
+=item authToken
Authorization token. Default: C<ARVADOS_API_TOKEN> environment variable
{
my $self = shift;
my %req;
- $req{$self->{'method'}} = $self->{'uri'};
+ my %content;
+ my $method = $self->{'method'};
+ if ($method eq 'GET' || $method eq 'HEAD') {
+ $content{'_method'} = $method;
+ $method = 'POST';
+ }
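+  # GET and HEAD are tunneled as POST with a '_method' parameter,
+  # presumably so that long query strings (e.g. filters) can travel
+  # in the request body instead of the URI.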
+ $req{$method} = $self->{'uri'};
$self->{'req'} = new HTTP::Request (%req);
$self->{'req'}->header('Authorization' => ('OAuth2 ' . $self->{'authToken'})) if $self->{'authToken'};
$self->{'req'}->header('Accept' => 'application/json');
- my %content;
my ($p, $v);
while (($p, $v) = each %{$self->{'queryParams'}}) {
$content{$p} = (ref($v) eq "") ? $v : JSON::encode_json($v);
s.email = 'gem-dev@curoverse.com'
s.licenses = ['Apache License, Version 2.0']
s.files = ["lib/arvados.rb"]
+ s.required_ruby_version = '>= 2.1.0'
s.add_dependency('google-api-client', '~> 0.6.3')
s.add_dependency('activesupport', '>= 3.2.13')
s.add_dependency('json', '>= 1.7.7')
end
def self.api_exec(method, parameters={})
api_method = arvados_api.send(api_models_sym).send(method.name.to_sym)
- parameters = parameters.
- merge(:api_token => arvados.config['ARVADOS_API_TOKEN'])
parameters.each do |k,v|
parameters[k] = v.to_json if v.is_a? Array or v.is_a? Hash
end
execute(:api_method => api_method,
:authenticated => false,
:parameters => parameters,
- :body => body)
+ :body => body,
+ :headers => {
+ authorization: 'OAuth2 '+arvados.config['ARVADOS_API_TOKEN']
+ })
resp = JSON.parse result.body, :symbolize_names => true
if resp[:errors]
raise Arvados::TransactionFailedError.new(resp[:errors])
# SimpleCov reports
/coverage
+
+# Dev/test SSL certificates
+/self-signed.key
+/self-signed.pem
addressable (2.3.6)
andand (1.3.3)
arel (3.0.3)
- arvados (0.1.20140414145041)
+ arvados (0.1.20140513131358)
activesupport (>= 3.2.13)
andand
google-api-client (~> 0.6.3)
json (>= 1.7.7)
- arvados-cli (0.1.20140414145041)
+ arvados-cli (0.1.20140513131358)
activesupport (~> 3.2, >= 3.2.13)
andand (~> 1.3, >= 1.3.3)
arvados (~> 0.1.0)
railties (>= 3.0, < 5.0)
thor (>= 0.14, < 2.0)
json (1.8.1)
- jwt (0.1.11)
+ jwt (0.1.13)
multi_json (>= 1.5)
launchy (2.4.2)
addressable (~> 2.3)
mime-types (~> 1.16)
treetop (~> 1.4.8)
mime-types (1.25.1)
- multi_json (1.9.2)
+ multi_json (1.10.0)
multipart-post (1.2.0)
net-scp (1.2.0)
net-ssh (>= 2.6.5)
jwt (~> 0.1.4)
multi_json (~> 1.0)
rack (~> 1.2)
- oj (2.7.3)
+ oj (2.9.0)
omniauth (1.1.1)
hashie (~> 1.2)
rack
return false
end
elsif 'success'.in? changed_attributes
+ logger.info "pipeline_instance changed_attributes has success for #{self.uuid}"
if self.success
self.active = false
self.state = Complete
self.state = Failed
end
elsif 'active'.in? changed_attributes
+ logger.info "pipeline_instance changed_attributes has active for #{self.uuid}"
if self.active
if self.state.in? [New, Ready, Paused]
self.state = RunningOnServer
end
if new_record? or 'components'.in? changed_attributes
- state ||= New
- if state == New and self.components_look_ready?
+ self.state ||= New
+ if self.state == New and self.components_look_ready?
self.state = Ready
end
end
if operand.is_a? Array
cond_out << "#{ar_table_name}.#{attr} #{operator} (?)"
param_out << operand
+ if operator == 'not in' and not operand.include?(nil)
+ # explicitly allow NULL
+ cond_out[-1] = "(#{cond_out[-1]} OR #{ar_table_name}.#{attr} IS NULL)"
+ end
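+        # e.g. ['group_class', 'not in', ['folder']] becomes
+        # "(groups.group_class not in (?) OR groups.group_class IS NULL)",
+        # so rows whose group_class is NULL still match (see the filters
+        # test below).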
else
raise ArgumentError.new("Invalid operand type '#{operand.class}' "\
"for '#{operator}' operator in filters")
j_done[:wait_thr].value
jobrecord = Job.find_by_uuid(job_done.uuid)
- jobrecord.running = false
- jobrecord.finished_at ||= Time.now
- # Don't set 'jobrecord.success = false' because if the job failed to run due to an
- # issue with crunch-job or slurm, we want the job to stay in the queue.
- jobrecord.save!
+ if jobrecord.started_at
+ # Clean up state fields in case crunch-job exited without
+ # putting the job in a suitable "finished" state.
+ jobrecord.running = false
+ jobrecord.finished_at ||= Time.now
+ if jobrecord.success.nil?
+ jobrecord.success = false
+ end
+ jobrecord.save!
+ else
+ # Don't fail the job if crunch-job didn't even get as far as
+ # starting it. If the job failed to run due to an infrastructure
+ # issue with crunch-job or slurm, we want the job to stay in the
+ # queue.
+ end
# Invalidate the per-job auth token
j_done[:job_auth].update_attributes expires_at: Time.now
--- /dev/null
+require 'test_helper'
+
+class Arvados::V1::FiltersTest < ActionController::TestCase
+ test '"not in" filter passes null values' do
+ @controller = Arvados::V1::GroupsController.new
+ authorize_with :admin
+ get :index, {
+ filters: [ ['group_class', 'not in', ['folder']] ],
+ controller: 'groups',
+ }
+ assert_response :success
+ found = assigns(:objects)
+ assert_includes(found.collect(&:group_class), nil,
+ "'group_class not in ['folder']' filter should pass null")
+ end
+end
--- /dev/null
+#! /usr/bin/env python
+
+import arvados
+
+import argparse
+import cgi
+import csv
+import json
+import logging
+import math
+import pprint
+import re
+import threading
+import urllib2
+
+from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
+from collections import defaultdict, Counter
+from functools import partial
+from operator import itemgetter
+from SocketServer import ThreadingMixIn
+
+arv = arvados.api('v1')
+
+# Adapted from http://stackoverflow.com/questions/4180980/formatting-data-quantity-capacity-as-string
+byteunits = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')
+def fileSizeFormat(value):
+ exponent = 0 if value == 0 else int(math.log(value, 1024))
+ return "%7.2f %-3s" % (float(value) / pow(1024, exponent),
+ byteunits[exponent])
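+# e.g. fileSizeFormat(1536) returns '   1.50 KiB'.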
+
+def percentageFloor(x):
+  """ Returns a float which is the input rounded down to the nearest 0.01.
+
+e.g. percentageFloor(0.941354) = 0.94
+"""
+ return math.floor(x*100) / 100.0
+
+
+def byteSizeFromValidUuid(valid_uuid):
+ return int(valid_uuid.split('+')[1])
+
+class maxdict(dict):
+ """A dictionary that holds the largest value entered for each key."""
+ def addValue(self, key, value):
+ dict.__setitem__(self, key, max(dict.get(self, key), value))
+ def addValues(self, kv_pairs):
+ for key,value in kv_pairs:
+ self.addValue(key, value)
+ def addDict(self, d):
+ self.addValues(d.items())
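+# e.g. m = maxdict(); m.addValue('a', 1); m.addValue('a', 3) leaves m['a'] == 3.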
+
+class CollectionInfo:
+ DEFAULT_PERSISTER_REPLICATION_LEVEL=2
+ all_by_uuid = {}
+
+ def __init__(self, uuid):
+ if CollectionInfo.all_by_uuid.has_key(uuid):
+ raise ValueError('Collection for uuid "%s" already exists.' % uuid)
+ self.uuid = uuid
+ self.block_uuids = set() # uuids of keep blocks in this collection
+ self.reader_uuids = set() # uuids of users who can read this collection
+ self.persister_uuids = set() # uuids of users who want this collection saved
+ # map from user uuid to replication level they desire
+ self.persister_replication = maxdict()
+
+ # The whole api response in case we need anything else later.
+ self.api_response = []
+ CollectionInfo.all_by_uuid[uuid] = self
+
+ def byteSize(self):
+ return sum(map(byteSizeFromValidUuid, self.block_uuids))
+
+ def __str__(self):
+ return ('CollectionInfo uuid: %s\n'
+ ' %d block(s) containing %s\n'
+ ' reader_uuids: %s\n'
+ ' persister_replication: %s' %
+ (self.uuid,
+ len(self.block_uuids),
+ fileSizeFormat(self.byteSize()),
+ pprint.pformat(self.reader_uuids, indent = 15),
+ pprint.pformat(self.persister_replication, indent = 15)))
+
+ @staticmethod
+ def get(uuid):
+ if not CollectionInfo.all_by_uuid.has_key(uuid):
+ CollectionInfo(uuid)
+ return CollectionInfo.all_by_uuid[uuid]
+
+
+def extractUuid(candidate):
+ """ Returns a canonical (hash+size) uuid from a valid uuid, or None if candidate is not a valid uuid."""
+ match = re.match('([0-9a-fA-F]{32}\+[0-9]+)(\+[^+]+)*$', candidate)
+ return match and match.group(1)
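+# e.g. extractUuid('d41d8cd98f00b204e9800998ecf8427e+0+K@zzzzz') returns
+# 'd41d8cd98f00b204e9800998ecf8427e+0'; anything else returns None.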
+
+def checkUserIsAdmin():
+ current_user = arv.users().current().execute()
+
+ if not current_user['is_admin']:
+ log.warning('Current user %s (%s - %s) does not have '
+ 'admin access and will not see much of the data.',
+ current_user['full_name'],
+ current_user['email'],
+ current_user['uuid'])
+ if args.require_admin_user:
+ log.critical('Exiting, rerun with --no-require-admin-user '
+ 'if you wish to continue.')
+ exit(1)
+
+def buildCollectionsList():
+ if args.uuid:
+ return [args.uuid,]
+ else:
+ collections_list_response = arv.collections().list(limit=args.max_api_results).execute()
+
+ print ('Returned %d of %d collections.' %
+ (len(collections_list_response['items']),
+ collections_list_response['items_available']))
+
+ return [item['uuid'] for item in collections_list_response['items']]
+
+
+def readCollections(collection_uuids):
+ for collection_uuid in collection_uuids:
+ collection_block_uuids = set()
+ collection_response = arv.collections().get(uuid=collection_uuid).execute()
+ collection_info = CollectionInfo.get(collection_uuid)
+ collection_info.api_response = collection_response
+ manifest_lines = collection_response['manifest_text'].split('\n')
+
+ if args.verbose:
+ print 'Manifest text for %s:' % collection_uuid
+ pprint.pprint(manifest_lines)
+
+ for manifest_line in manifest_lines:
+ if manifest_line:
+ manifest_tokens = manifest_line.split(' ')
+ if args.verbose:
+ print 'manifest tokens: ' + pprint.pformat(manifest_tokens)
+ stream_name = manifest_tokens[0]
+
+ line_block_uuids = set(filter(None,
+ [extractUuid(candidate)
+ for candidate in manifest_tokens[1:]]))
+ collection_info.block_uuids.update(line_block_uuids)
+
+ # file_tokens = [token
+ # for token in manifest_tokens[1:]
+ # if extractUuid(token) is None]
+
+ # # Sort file tokens by start position in case they aren't already
+ # file_tokens.sort(key=lambda file_token: int(file_token.split(':')[0]))
+
+ # if args.verbose:
+ # print 'line_block_uuids: ' + pprint.pformat(line_block_uuids)
+ # print 'file_tokens: ' + pprint.pformat(file_tokens)
+
+
+def readLinks():
+ link_classes = set()
+
+ for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
+    # TODO(misha): We may not be seeing all the links, but since items
+    # available does not return an accurate number, I don't know how
+ # to confirm that we saw all of them.
+ collection_links_response = arv.links().list(where={'head_uuid':collection_uuid}).execute()
+ link_classes.update([link['link_class'] for link in collection_links_response['items']])
+ for link in collection_links_response['items']:
+ if link['link_class'] == 'permission':
+ collection_info.reader_uuids.add(link['tail_uuid'])
+ elif link['link_class'] == 'resources':
+ replication_level = link['properties'].get(
+ 'replication',
+ CollectionInfo.DEFAULT_PERSISTER_REPLICATION_LEVEL)
+ collection_info.persister_replication.addValue(
+ link['tail_uuid'],
+ replication_level)
+ collection_info.persister_uuids.add(link['tail_uuid'])
+
+ print 'Found the following link classes:'
+ pprint.pprint(link_classes)
+
+def reportMostPopularCollections():
+ most_popular_collections = sorted(
+ CollectionInfo.all_by_uuid.values(),
+ key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_replication),
+ reverse=True)[:10]
+
+ print 'Most popular Collections:'
+ for collection_info in most_popular_collections:
+ print collection_info
+
+
+def buildMaps():
+ for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
+ # Add the block holding the manifest itself for all calculations
+ block_uuids = collection_info.block_uuids.union([collection_uuid,])
+ for block_uuid in block_uuids:
+ block_to_collections[block_uuid].add(collection_uuid)
+ block_to_readers[block_uuid].update(collection_info.reader_uuids)
+ block_to_persisters[block_uuid].update(collection_info.persister_uuids)
+ block_to_persister_replication[block_uuid].addDict(
+ collection_info.persister_replication)
+ for reader_uuid in collection_info.reader_uuids:
+ reader_to_collections[reader_uuid].add(collection_uuid)
+ reader_to_blocks[reader_uuid].update(block_uuids)
+ for persister_uuid in collection_info.persister_uuids:
+ persister_to_collections[persister_uuid].add(collection_uuid)
+ persister_to_blocks[persister_uuid].update(block_uuids)
+
+
+def itemsByValueLength(original):
+ return sorted(original.items(),
+ key=lambda item:len(item[1]),
+ reverse=True)
+
+
+def reportBusiestUsers():
+ busiest_readers = itemsByValueLength(reader_to_collections)
+ print 'The busiest readers are:'
+ for reader,collections in busiest_readers:
+ print '%s reading %d collections.' % (reader, len(collections))
+ busiest_persisters = itemsByValueLength(persister_to_collections)
+ print 'The busiest persisters are:'
+ for persister,collections in busiest_persisters:
+    print '%s persisting %d collections.' % (persister, len(collections))
+
+
+def blockDiskUsage(block_uuid):
+ """Returns the disk usage of a block given its uuid.
+
+ Will return 0 before reading the contents of the keep servers.
+ """
+ return byteSizeFromValidUuid(block_uuid) * block_to_replication[block_uuid]
+
+def blockPersistedUsage(user_uuid, block_uuid):
+ return (byteSizeFromValidUuid(block_uuid) *
+ block_to_persister_replication[block_uuid].get(user_uuid, 0))
+
+memo_computeWeightedReplicationCosts = {}
+def computeWeightedReplicationCosts(replication_levels):
+ """Computes the relative cost of varied replication levels.
+
+ replication_levels: a tuple of integers representing the desired
+ replication level. If n users want a replication level of x then x
+ should appear n times in replication_levels.
+
+ Returns a dictionary from replication level to cost.
+
+ The basic thinking is that the cost of replicating at level x should
+ be shared by everyone who wants replication of level x or higher.
+
+ For example, if we have two users who want 1 copy, one user who
+ wants 3 copies and two users who want 6 copies:
+ the input would be [1, 1, 3, 6, 6] (or any permutation)
+
+ The cost of the first copy is shared by all 5 users, so they each
+ pay 1 copy / 5 users = 0.2.
+  The cost of the second and third copies is shared by 3 users, so they
+ each pay 2 copies / 3 users = 0.67 (plus the above costs)
+ The cost of the fourth, fifth and sixth copies is shared by two
+ users, so they each pay 3 copies / 2 users = 1.5 (plus the above costs)
+
+ Here are some other examples:
+ computeWeightedReplicationCosts([1,]) -> {1:1.0}
+ computeWeightedReplicationCosts([2,]) -> {2:2.0}
+ computeWeightedReplicationCosts([1,1]) -> {1:0.5}
+  computeWeightedReplicationCosts([2,2]) -> {2:1.0}
+ computeWeightedReplicationCosts([1,2]) -> {1:0.5,2:1.5}
+  computeWeightedReplicationCosts([1,3]) -> {1:0.5,3:2.5}
+ computeWeightedReplicationCosts([1,3,6,6,10]) -> {1:0.2,3:0.7,6:1.7,10:5.7}
+ """
+ replication_level_counts = sorted(Counter(replication_levels).items())
+
+ memo_key = str(replication_level_counts)
+
+ if not memo_key in memo_computeWeightedReplicationCosts:
+ last_level = 0
+ current_cost = 0
+ total_interested = float(sum(map(itemgetter(1), replication_level_counts)))
+ cost_for_level = {}
+ for replication_level, count in replication_level_counts:
+ copies_added = replication_level - last_level
+ # compute marginal cost from last level and add it to the last cost
+ current_cost += copies_added / total_interested
+ cost_for_level[replication_level] = current_cost
+ # update invariants
+ last_level = replication_level
+ total_interested -= count
+ memo_computeWeightedReplicationCosts[memo_key] = cost_for_level
+
+ return memo_computeWeightedReplicationCosts[memo_key]
+
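+# Tracing the docstring example: [1, 1, 3, 6, 6] gives
+# replication_level_counts [(1, 2), (3, 1), (6, 2)], so the loop computes
+# cost_for_level = {1: 1/5, 3: 1/5 + 2/3, 6: 1/5 + 2/3 + 3/2}.
+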
+def blockPersistedWeightedUsage(user_uuid, block_uuid):
+ persister_replication_for_block = block_to_persister_replication[block_uuid]
+ user_replication = persister_replication_for_block[user_uuid]
+ return (
+ byteSizeFromValidUuid(block_uuid) *
+ computeWeightedReplicationCosts(
+ persister_replication_for_block.values())[user_replication])
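+# For instance, a 1000-byte block persisted as {userA: 1, userB: 3} charges
+# userA 1000 * 0.5 = 500 weighted bytes and userB 1000 * 2.5 = 2500, since
+# computeWeightedReplicationCosts([1, 3]) is {1: 0.5, 3: 2.5}.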
+
+
+def computeUserStorageUsage():
+ for user, blocks in reader_to_blocks.items():
+ user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = sum(map(
+ byteSizeFromValidUuid,
+ blocks))
+ user_to_usage[user][WEIGHTED_READ_SIZE_COL] = sum(map(
+ lambda block_uuid:(float(byteSizeFromValidUuid(block_uuid))/
+ len(block_to_readers[block_uuid])),
+ blocks))
+ for user, blocks in persister_to_blocks.items():
+ user_to_usage[user][UNWEIGHTED_PERSIST_SIZE_COL] = sum(map(
+ partial(blockPersistedUsage, user),
+ blocks))
+ user_to_usage[user][WEIGHTED_PERSIST_SIZE_COL] = sum(map(
+ partial(blockPersistedWeightedUsage, user),
+ blocks))
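+# Note: WEIGHTED_READ_SIZE_COL splits each block's size evenly among its
+# readers, while the persisted columns use the replication-cost weighting
+# defined above.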
+
+def printUserStorageUsage():
+ print ('user: unweighted readable block size, weighted readable block size, '
+ 'unweighted persisted block size, weighted persisted block size:')
+ for user, usage in user_to_usage.items():
+ print ('%s: %s %s %s %s' %
+ (user,
+ fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
+ fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
+ fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
+ fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
+
+def logUserStorageUsage():
+ for user, usage in user_to_usage.items():
+ body = {}
+ # user could actually represent a user or a group. We don't set
+ # the object_type field since we don't know which we have.
+ body['object_uuid'] = user
+ body['event_type'] = args.user_storage_log_event_type
+ properties = {}
+ properties['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
+ properties['read_collections_weighted_bytes'] = (
+ usage[WEIGHTED_READ_SIZE_COL])
+ properties['persisted_collections_total_bytes'] = (
+ usage[UNWEIGHTED_PERSIST_SIZE_COL])
+ properties['persisted_collections_weighted_bytes'] = (
+ usage[WEIGHTED_PERSIST_SIZE_COL])
+ body['properties'] = properties
+ # TODO(misha): Confirm that this will throw an exception if it
+ # fails to create the log entry.
+ arv.logs().create(body=body).execute()
+
+def getKeepServers():
+ response = arv.keep_disks().list().execute()
+ return [[keep_server['service_host'], keep_server['service_port']]
+ for keep_server in response['items']]
+
+
+def getKeepBlocks(keep_servers):
+ blocks = []
+ for host,port in keep_servers:
+ response = urllib2.urlopen('http://%s:%d/index' % (host, port))
+ server_blocks = [line.split(' ')
+ for line in response.read().split('\n')
+ if line]
+ server_blocks = [(block_id, int(mtime))
+ for block_id, mtime in server_blocks]
+ blocks.append(server_blocks)
+ return blocks
+
+def getKeepStats(keep_servers):
+ MOUNT_COLUMN = 5
+ TOTAL_COLUMN = 1
+ FREE_COLUMN = 3
+ DISK_BLOCK_SIZE = 1024
+ stats = []
+ for host,port in keep_servers:
+ response = urllib2.urlopen('http://%s:%d/status.json' % (host, port))
+
+ parsed_json = json.load(response)
+ df_entries = [line.split()
+ for line in parsed_json['df'].split('\n')
+ if line]
+ keep_volumes = [columns
+ for columns in df_entries
+ if 'keep' in columns[MOUNT_COLUMN]]
+ total_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(TOTAL_COLUMN),
+ keep_volumes)))
+ free_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(FREE_COLUMN),
+ keep_volumes)))
+ stats.append([total_space, free_space])
+ return stats
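+# A df line such as '/dev/sdb1 970480956 49348576 921132380 6% /mnt/keep0'
+# splits into columns where index 5 is the mount point, index 1 the total
+# 1K-blocks, and index 3 the available blocks; only mounts containing
+# 'keep' are counted.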
+
+
+def computeReplication(keep_blocks):
+ for server_blocks in keep_blocks:
+ for block_uuid, _ in server_blocks:
+ block_to_replication[block_uuid] += 1
+ log.debug('Seeing the following replication levels among blocks: %s',
+ str(set(block_to_replication.values())))
+
+
+def computeGarbageCollectionCandidates():
+ for server_blocks in keep_blocks:
+ block_to_latest_mtime.addValues(server_blocks)
+ empty_set = set()
+ garbage_collection_priority = sorted(
+ [(block,mtime)
+ for block,mtime in block_to_latest_mtime.items()
+ if len(block_to_persisters.get(block,empty_set)) == 0],
+ key = itemgetter(1))
+ global garbage_collection_report
+ garbage_collection_report = []
+ cumulative_disk_size = 0
+ for block,mtime in garbage_collection_priority:
+ disk_size = blockDiskUsage(block)
+ cumulative_disk_size += disk_size
+ garbage_collection_report.append(
+ (block,
+ mtime,
+ disk_size,
+ cumulative_disk_size,
+ float(free_keep_space + cumulative_disk_size)/total_keep_space))
+
+ print 'The oldest Garbage Collection Candidates: '
+ pprint.pprint(garbage_collection_report[:20])
+
+
+def outputGarbageCollectionReport(filename):
+ with open(filename, 'wb') as csvfile:
+ gcwriter = csv.writer(csvfile)
+ gcwriter.writerow(['block uuid', 'latest mtime', 'disk size',
+ 'cumulative size', 'disk free'])
+ for line in garbage_collection_report:
+ gcwriter.writerow(line)
+
+def computeGarbageCollectionHistogram():
+ # TODO(misha): Modify this to allow users to specify the number of
+ # histogram buckets through a flag.
+ histogram = []
+ last_percentage = -1
+ for _,mtime,_,_,disk_free in garbage_collection_report:
+ curr_percentage = percentageFloor(disk_free)
+ if curr_percentage > last_percentage:
+ histogram.append( (mtime, curr_percentage) )
+ last_percentage = curr_percentage
+
+ log.info('Garbage collection histogram is: %s', histogram)
+
+ return histogram
+
+
+def logGarbageCollectionHistogram():
+ body = {}
+ # TODO(misha): Decide whether we should specify an object_uuid in
+ # the body and if so, which uuid to use.
+ body['event_type'] = args.block_age_free_space_histogram_log_event_type
+ properties = {}
+ properties['histogram'] = garbage_collection_histogram
+ body['properties'] = properties
+ # TODO(misha): Confirm that this will throw an exception if it
+ # fails to create the log entry.
+ arv.logs().create(body=body).execute()
+
+
+def detectReplicationProblems():
+ blocks_not_in_any_collections.update(
+ set(block_to_replication.keys()).difference(block_to_collections.keys()))
+ underreplicated_persisted_blocks.update(
+ [uuid
+ for uuid, persister_replication in block_to_persister_replication.items()
+ if len(persister_replication) > 0 and
+ block_to_replication[uuid] < max(persister_replication.values())])
+ overreplicated_persisted_blocks.update(
+ [uuid
+ for uuid, persister_replication in block_to_persister_replication.items()
+ if len(persister_replication) > 0 and
+ block_to_replication[uuid] > max(persister_replication.values())])
+
+ log.info('Found %d blocks not in any collections, e.g. %s...',
+ len(blocks_not_in_any_collections),
+ ','.join(list(blocks_not_in_any_collections)[:5]))
+ log.info('Found %d underreplicated blocks, e.g. %s...',
+ len(underreplicated_persisted_blocks),
+ ','.join(list(underreplicated_persisted_blocks)[:5]))
+ log.info('Found %d overreplicated blocks, e.g. %s...',
+ len(overreplicated_persisted_blocks),
+ ','.join(list(overreplicated_persisted_blocks)[:5]))
+
+ # TODO:
+ # Read blocks sorted by mtime
+ # Cache window vs % free space
+ # Collections which candidates will appear in
+ # Youngest underreplicated read blocks that appear in collections.
+ # Report Collections that have blocks which are missing from (or
+ # underreplicated in) keep.
+
+
+# This is the main flow here
+
+parser = argparse.ArgumentParser(description='Report on keep disks.')
+"""The command line argument parser we use.
+
+We only use it in the __main__ block, but leave it outside the block
+in case another package wants to use it or customize it by specifying
+it as a parent to their commandline parser.
+"""
+parser.add_argument('-m',
+ '--max-api-results',
+ type=int,
+ default=5000,
+ help=('The max results to get at once.'))
+parser.add_argument('-p',
+ '--port',
+ type=int,
+ default=9090,
+ help=('The port number to serve on. 0 means no server.'))
+parser.add_argument('-v',
+ '--verbose',
+ help='increase output verbosity',
+ action='store_true')
+parser.add_argument('-u',
+ '--uuid',
+ help='uuid of specific collection to process')
+parser.add_argument('--require-admin-user',
+ action='store_true',
+ default=True,
+ help='Fail if the user is not an admin [default]')
+parser.add_argument('--no-require-admin-user',
+ dest='require_admin_user',
+ action='store_false',
+ help=('Allow users without admin permissions with '
+ 'only a warning.'))
+parser.add_argument('--log-to-workbench',
+ action='store_true',
+ default=False,
+ help='Log findings to workbench')
+parser.add_argument('--no-log-to-workbench',
+ dest='log_to_workbench',
+ action='store_false',
+ help='Don\'t log findings to workbench [default]')
+parser.add_argument('--user-storage-log-event-type',
+ default='user-storage-report',
+ help=('The event type to set when logging user '
+ 'storage usage to workbench.'))
+parser.add_argument('--block-age-free-space-histogram-log-event-type',
+ default='block-age-free-space-histogram',
+                    help=('The event type to set when logging the block '
+                          'age vs free space histogram to workbench.'))
+parser.add_argument('--garbage-collection-file',
+ default='',
+ help=('The file to write a garbage collection report, or '
+ 'leave empty for no report.'))
+
+args = None
+
+# TODO(misha): Think about moving some of this to the __main__ block.
+log = logging.getLogger('arvados.services.datamanager')
+stderr_handler = logging.StreamHandler()
+log.setLevel(logging.INFO)
+stderr_handler.setFormatter(
+ logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
+log.addHandler(stderr_handler)
+
+# Global Data - don't try this at home
+collection_uuids = []
+
+# These maps all map from uuids to a set of uuids
+block_to_collections = defaultdict(set) # keep blocks
+reader_to_collections = defaultdict(set) # collection(s) for which the user has read access
+persister_to_collections = defaultdict(set) # collection(s) which the user has persisted
+block_to_readers = defaultdict(set)
+block_to_persisters = defaultdict(set)
+block_to_persister_replication = defaultdict(maxdict)
+reader_to_blocks = defaultdict(set)
+persister_to_blocks = defaultdict(set)
+
+UNWEIGHTED_READ_SIZE_COL = 0
+WEIGHTED_READ_SIZE_COL = 1
+UNWEIGHTED_PERSIST_SIZE_COL = 2
+WEIGHTED_PERSIST_SIZE_COL = 3
+NUM_COLS = 4
+user_to_usage = defaultdict(lambda : [0,]*NUM_COLS)
+
+keep_servers = []
+keep_blocks = []
+keep_stats = []
+total_keep_space = 0
+free_keep_space = 0
+
+block_to_replication = defaultdict(lambda: 0)
+block_to_latest_mtime = maxdict()
+
+garbage_collection_report = []
+"""A list of non-persisted blocks, sorted by increasing mtime
+
+Each entry is of the form (block uuid, latest mtime, disk size,
+cumulative disk size, disk free)
+
+* block uuid: The id of the block we want to delete
+* latest mtime: The latest mtime of the block across all keep servers.
+* disk size: The total disk space used by this block (block size
+multiplied by current replication level)
+* cumulative disk size: The sum of this block's disk size and all the
+blocks listed above it
+* disk free: The proportion of our disk space that would be free if we
+deleted this block and all the above. So this is (free disk space +
+cumulative disk size) / total disk capacity
+"""
+
+garbage_collection_histogram = []
+""" Shows the tradeoff of keep block age vs keep disk free space.
+
+Each entry is of the form (mtime, Disk Proportion).
+
+An entry of the form (1388747781, 0.52) means that if we deleted the
+oldest non-persisted blocks until we had 52% of the disk free, then
+all blocks with an mtime greater than 1388747781 would be preserved.
+"""
+
+# Stuff to report on
+blocks_not_in_any_collections = set()
+underreplicated_persisted_blocks = set()
+overreplicated_persisted_blocks = set()
+
+all_data_loaded = False
+
+def loadAllData():
+ checkUserIsAdmin()
+
+ log.info('Building Collection List')
+ global collection_uuids
+ collection_uuids = filter(None, [extractUuid(candidate)
+ for candidate in buildCollectionsList()])
+
+ log.info('Reading Collections')
+ readCollections(collection_uuids)
+
+ if args.verbose:
+ pprint.pprint(CollectionInfo.all_by_uuid)
+
+ log.info('Reading Links')
+ readLinks()
+
+ reportMostPopularCollections()
+
+ log.info('Building Maps')
+ buildMaps()
+
+ reportBusiestUsers()
+
+ log.info('Getting Keep Servers')
+ global keep_servers
+ keep_servers = getKeepServers()
+
+ print keep_servers
+
+ log.info('Getting Blocks from each Keep Server.')
+ global keep_blocks
+ keep_blocks = getKeepBlocks(keep_servers)
+
+ log.info('Getting Stats from each Keep Server.')
+ global keep_stats, total_keep_space, free_keep_space
+ keep_stats = getKeepStats(keep_servers)
+
+ total_keep_space = sum(map(itemgetter(0), keep_stats))
+ free_keep_space = sum(map(itemgetter(1), keep_stats))
+
+ # TODO(misha): Delete this hack when the keep servers are fixed!
+ # This hack deals with the fact that keep servers report each other's disks.
+ total_keep_space /= len(keep_stats)
+ free_keep_space /= len(keep_stats)
+
+ log.info('Total disk space: %s, Free disk space: %s (%d%%).' %
+ (fileSizeFormat(total_keep_space),
+ fileSizeFormat(free_keep_space),
+ 100*free_keep_space/total_keep_space))
+
+ computeReplication(keep_blocks)
+
+ log.info('average replication level is %f',
+ (float(sum(block_to_replication.values())) /
+ len(block_to_replication)))
+
+ computeGarbageCollectionCandidates()
+
+ if args.garbage_collection_file:
+ log.info('Writing garbage Collection report to %s',
+ args.garbage_collection_file)
+ outputGarbageCollectionReport(args.garbage_collection_file)
+
+ global garbage_collection_histogram
+ garbage_collection_histogram = computeGarbageCollectionHistogram()
+
+ if args.log_to_workbench:
+ logGarbageCollectionHistogram()
+
+ detectReplicationProblems()
+
+ computeUserStorageUsage()
+ printUserStorageUsage()
+ if args.log_to_workbench:
+ logUserStorageUsage()
+
+ global all_data_loaded
+ all_data_loaded = True
+
+
+class DataManagerHandler(BaseHTTPRequestHandler):
+ USER_PATH = 'user'
+ COLLECTION_PATH = 'collection'
+ BLOCK_PATH = 'block'
+
+ def userLink(self, uuid):
+ return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
+ {'uuid': uuid,
+ 'path': DataManagerHandler.USER_PATH})
+
+ def collectionLink(self, uuid):
+ return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
+ {'uuid': uuid,
+ 'path': DataManagerHandler.COLLECTION_PATH})
+
+ def blockLink(self, uuid):
+ return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
+ {'uuid': uuid,
+ 'path': DataManagerHandler.BLOCK_PATH})
+
+ def writeTop(self, title):
+ self.wfile.write('<HTML><HEAD><TITLE>%s</TITLE></HEAD>\n<BODY>' % title)
+
+ def writeBottom(self):
+ self.wfile.write('</BODY></HTML>\n')
+
+ def writeHomePage(self):
+ self.send_response(200)
+ self.end_headers()
+ self.writeTop('Home')
+ self.wfile.write('<TABLE>')
+ self.wfile.write('<TR><TH>user'
+ '<TH>unweighted readable block size'
+ '<TH>weighted readable block size'
+ '<TH>unweighted persisted block size'
+ '<TH>weighted persisted block size</TR>\n')
+ for user, usage in user_to_usage.items():
+ self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
+ (self.userLink(user),
+ fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
+ fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
+ fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
+ fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
+ self.wfile.write('</TABLE>\n')
+ self.writeBottom()
+
+ def userExists(self, uuid):
+ # Currently this will return false for a user who exists but
+ # doesn't appear on any manifests.
+ # TODO(misha): Figure out if we need to fix this.
+ return user_to_usage.has_key(uuid)
+
+ def writeUserPage(self, uuid):
+ if not self.userExists(uuid):
+ self.send_error(404,
+ 'User (%s) Not Found.' % cgi.escape(uuid, quote=False))
+ else:
+ # Here we assume that since a user exists, they don't need to be
+ # html escaped.
+ self.send_response(200)
+ self.end_headers()
+ self.writeTop('User %s' % uuid)
+ self.wfile.write('<TABLE>')
+ self.wfile.write('<TR><TH>user'
+ '<TH>unweighted readable block size'
+ '<TH>weighted readable block size'
+ '<TH>unweighted persisted block size'
+ '<TH>weighted persisted block size</TR>\n')
+ usage = user_to_usage[uuid]
+ self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
+ (self.userLink(uuid),
+ fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
+ fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
+ fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
+ fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
+ self.wfile.write('</TABLE>\n')
+ self.wfile.write('<P>Persisting Collections: %s\n' %
+ ', '.join(map(self.collectionLink,
+ persister_to_collections[uuid])))
+ self.wfile.write('<P>Reading Collections: %s\n' %
+ ', '.join(map(self.collectionLink,
+ reader_to_collections[uuid])))
+ self.writeBottom()
+
+ def collectionExists(self, uuid):
+ return CollectionInfo.all_by_uuid.has_key(uuid)
+
+ def writeCollectionPage(self, uuid):
+ if not self.collectionExists(uuid):
+ self.send_error(404,
+ 'Collection (%s) Not Found.' % cgi.escape(uuid, quote=False))
+ else:
+ collection = CollectionInfo.get(uuid)
+ # Here we assume that since a collection exists, its id doesn't
+ # need to be html escaped.
+ self.send_response(200)
+ self.end_headers()
+ self.writeTop('Collection %s' % uuid)
+ self.wfile.write('<H1>Collection %s</H1>\n' % uuid)
+ self.wfile.write('<P>Total size %s (not factoring in replication).\n' %
+ fileSizeFormat(collection.byteSize()))
+ self.wfile.write('<P>Readers: %s\n' %
+ ', '.join(map(self.userLink, collection.reader_uuids)))
+
+ if len(collection.persister_replication) == 0:
+ self.wfile.write('<P>No persisters\n')
+ else:
+ replication_to_users = defaultdict(set)
+ for user,replication in collection.persister_replication.items():
+ replication_to_users[replication].add(user)
+ replication_levels = sorted(replication_to_users.keys())
+
+ self.wfile.write('<P>%d persisters in %d replication level(s) maxing '
+ 'out at %dx replication:\n' %
+ (len(collection.persister_replication),
+ len(replication_levels),
+ replication_levels[-1]))
+
+ # TODO(misha): This code is used twice, let's move it to a method.
+ self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
+ '<TH>'.join(['Replication Level ' + str(x)
+ for x in replication_levels]))
+ self.wfile.write('<TR>\n')
+ for replication_level in replication_levels:
+ users = replication_to_users[replication_level]
+ self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(
+ map(self.userLink, users)))
+ self.wfile.write('</TR></TABLE>\n')
+
+ replication_to_blocks = defaultdict(set)
+ for block in collection.block_uuids:
+ replication_to_blocks[block_to_replication[block]].add(block)
+ replication_levels = sorted(replication_to_blocks.keys())
+ self.wfile.write('<P>%d blocks in %d replication level(s):\n' %
+ (len(collection.block_uuids), len(replication_levels)))
+ self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
+ '<TH>'.join(['Replication Level ' + str(x)
+ for x in replication_levels]))
+ self.wfile.write('<TR>\n')
+ for replication_level in replication_levels:
+ blocks = replication_to_blocks[replication_level]
+ self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(blocks))
+ self.wfile.write('</TR></TABLE>\n')
+
+
+ def do_GET(self):
+ if not all_data_loaded:
+ self.send_error(503,
+ 'Sorry, but I am still loading all the data I need.')
+ else:
+      # Remove the leading '/' and process the request path.
+ split_path = self.path[1:].split('/')
+ request_type = split_path[0]
+ log.debug('path (%s) split as %s with request_type %s' % (self.path,
+ split_path,
+ request_type))
+ if request_type == '':
+ self.writeHomePage()
+ elif request_type == DataManagerHandler.USER_PATH:
+ self.writeUserPage(split_path[1])
+ elif request_type == DataManagerHandler.COLLECTION_PATH:
+ self.writeCollectionPage(split_path[1])
+ else:
+ self.send_error(404, 'Unrecognized request path.')
+ return
+
+class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
+ """Handle requests in a separate thread."""
+
+
+if __name__ == '__main__':
+ args = parser.parse_args()
+
+ if args.port == 0:
+ loadAllData()
+ else:
+ loader = threading.Thread(target = loadAllData, name = 'loader')
+ loader.start()
+
+ server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
+ server.serve_forever()
--- /dev/null
+#! /usr/bin/env python
+
+import datamanager
+import unittest
+
+class TestComputeWeightedReplicationCosts(unittest.TestCase):
+ def test_obvious(self):
+ self.assertEqual(datamanager.computeWeightedReplicationCosts([1,]),
+ {1:1.0})
+
+ def test_simple(self):
+ self.assertEqual(datamanager.computeWeightedReplicationCosts([2,]),
+ {2:2.0})
+
+ def test_even_split(self):
+ self.assertEqual(datamanager.computeWeightedReplicationCosts([1,1]),
+ {1:0.5})
+
+ def test_even_split_bigger(self):
+ self.assertEqual(datamanager.computeWeightedReplicationCosts([2,2]),
+ {2:1.0})
+
+ def test_uneven_split(self):
+ self.assertEqual(datamanager.computeWeightedReplicationCosts([1,2]),
+ {1:0.5, 2:1.5})
+
+ def test_uneven_split_bigger(self):
+ self.assertEqual(datamanager.computeWeightedReplicationCosts([1,3]),
+ {1:0.5, 3:2.5})
+
+ def test_uneven_split_jumble(self):
+ self.assertEqual(datamanager.computeWeightedReplicationCosts([1,3,6,6,10]),
+ {1:0.2, 3:0.7, 6:1.7, 10:5.7})
+
+ def test_documentation_example(self):
+ self.assertEqual(datamanager.computeWeightedReplicationCosts([1,1,3,6,6]),
+ {1:0.2, 3: 0.2 + 2.0 / 3, 6: 0.2 + 2.0 / 3 + 1.5})
+
+
+if __name__ == '__main__':
+ unittest.main()
--- /dev/null
+// Tests for Keep HTTP handlers:
+//
+// GetBlockHandler
+// PutBlockHandler
+// IndexHandler
+//
+// The HTTP handlers are responsible for enforcing permission policy,
+// so these tests must exercise all possible permission permutations.
+
+package main
+
+import (
+ "bytes"
+ "github.com/gorilla/mux"
+ "net/http"
+ "net/http/httptest"
+ "regexp"
+ "testing"
+ "time"
+)
+
+// A RequestTester represents the parameters for an HTTP request to
+// be issued on behalf of a unit test.
+type RequestTester struct {
+ uri string
+ api_token string
+ method string
+ request_body []byte
+}
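+
+// Each test below wraps a RequestTester in IssueRequest, which replays the
+// request against the test REST router and returns the recorded response.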
+
+// Test GetBlockHandler on the following situations:
+// - permissions off, unauthenticated request, unsigned locator
+// - permissions on, authenticated request, signed locator
+// - permissions on, authenticated request, unsigned locator
+// - permissions on, unauthenticated request, signed locator
+// - permissions on, authenticated request, expired locator
+//
+func TestGetHandler(t *testing.T) {
+ defer teardown()
+
+ // Prepare two test Keep volumes. Our block is stored on the second volume.
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
+
+ vols := KeepVM.Volumes()
+ if err := vols[0].Put(TEST_HASH, TEST_BLOCK); err != nil {
+ t.Error(err)
+ }
+
+ // Set up a REST router for testing the handlers.
+ rest := MakeRESTRouter()
+
+ // Create locators for testing.
+ // Turn on permission settings so we can generate signed locators.
+ enforce_permissions = true
+ PermissionSecret = []byte(known_key)
+ permission_ttl = time.Duration(300) * time.Second
+
+ var (
+ unsigned_locator = "http://localhost:25107/" + TEST_HASH
+ valid_timestamp = time.Now().Add(permission_ttl)
+ expired_timestamp = time.Now().Add(-time.Hour)
+ signed_locator = "http://localhost:25107/" + SignLocator(TEST_HASH, known_token, valid_timestamp)
+ expired_locator = "http://localhost:25107/" + SignLocator(TEST_HASH, known_token, expired_timestamp)
+ )
+
+ // -----------------
+ // Test unauthenticated request with permissions off.
+ enforce_permissions = false
+
+ // Unauthenticated request, unsigned locator
+ // => OK
+ response := IssueRequest(rest,
+ &RequestTester{
+ method: "GET",
+ uri: unsigned_locator,
+ })
+ ExpectStatusCode(t,
+ "Unauthenticated request, unsigned locator", http.StatusOK, response)
+ ExpectBody(t,
+ "Unauthenticated request, unsigned locator",
+ string(TEST_BLOCK),
+ response)
+
+ // ----------------
+ // Permissions: on.
+ enforce_permissions = true
+
+ // Authenticated request, signed locator
+ // => OK
+ response = IssueRequest(rest, &RequestTester{
+ method: "GET",
+ uri: signed_locator,
+ api_token: known_token,
+ })
+ ExpectStatusCode(t,
+ "Authenticated request, signed locator", http.StatusOK, response)
+ ExpectBody(t,
+ "Authenticated request, signed locator", string(TEST_BLOCK), response)
+
+ // Authenticated request, unsigned locator
+ // => PermissionError
+ response = IssueRequest(rest, &RequestTester{
+ method: "GET",
+ uri: unsigned_locator,
+ api_token: known_token,
+ })
+ ExpectStatusCode(t, "unsigned locator", PermissionError.HTTPCode, response)
+
+ // Unauthenticated request, signed locator
+ // => PermissionError
+ response = IssueRequest(rest, &RequestTester{
+ method: "GET",
+ uri: signed_locator,
+ })
+ ExpectStatusCode(t,
+ "Unauthenticated request, signed locator",
+ PermissionError.HTTPCode, response)
+
+ // Authenticated request, expired locator
+ // => ExpiredError
+ response = IssueRequest(rest, &RequestTester{
+ method: "GET",
+ uri: expired_locator,
+ api_token: known_token,
+ })
+ ExpectStatusCode(t,
+ "Authenticated request, expired locator",
+ ExpiredError.HTTPCode, response)
+}
+
+// Test PutBlockHandler on the following situations:
+// - no server key
+// - with server key, authenticated request, unsigned locator
+// - with server key, unauthenticated request, unsigned locator
+//
+func TestPutHandler(t *testing.T) {
+ defer teardown()
+
+ // Prepare two test Keep volumes.
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
+
+ // Set up a REST router for testing the handlers.
+ rest := MakeRESTRouter()
+
+ // --------------
+ // No server key.
+
+ // Unauthenticated request, no server key
+ // => OK (unsigned response)
+ unsigned_locator := "http://localhost:25107/" + TEST_HASH
+ response := IssueRequest(rest,
+ &RequestTester{
+ method: "PUT",
+ uri: unsigned_locator,
+ request_body: TEST_BLOCK,
+ })
+
+ ExpectStatusCode(t,
+ "Unauthenticated request, no server key", http.StatusOK, response)
+ ExpectBody(t, "Unauthenticated request, no server key", TEST_HASH, response)
+
+ // ------------------
+ // With a server key.
+
+ PermissionSecret = []byte(known_key)
+ permission_ttl = time.Duration(300) * time.Second
+
+ // When a permission key is available, the locator returned
+ // from an authenticated PUT request will be signed.
+
+ // Authenticated PUT, signed locator
+ // => OK (signed response)
+ response = IssueRequest(rest,
+ &RequestTester{
+ method: "PUT",
+ uri: unsigned_locator,
+ request_body: TEST_BLOCK,
+ api_token: known_token,
+ })
+
+ ExpectStatusCode(t,
+ "Authenticated PUT, signed locator, with server key",
+ http.StatusOK, response)
+ if !VerifySignature(response.Body.String(), known_token) {
+ t.Errorf("Authenticated PUT, signed locator, with server key:\n"+
+ "response '%s' does not contain a valid signature",
+ response.Body.String())
+ }
+
+ // Unauthenticated PUT, unsigned locator
+ // => OK
+ response = IssueRequest(rest,
+ &RequestTester{
+ method: "PUT",
+ uri: unsigned_locator,
+ request_body: TEST_BLOCK,
+ })
+
+ ExpectStatusCode(t,
+ "Unauthenticated PUT, unsigned locator, with server key",
+ http.StatusOK, response)
+ ExpectBody(t,
+ "Unauthenticated PUT, unsigned locator, with server key",
+ TEST_HASH, response)
+}
+
+// Test /index requests:
+// - enforce_permissions off | unauthenticated /index request
+// - enforce_permissions off | unauthenticated /index/prefix request
+// - enforce_permissions off | authenticated /index request | non-superuser
+// - enforce_permissions off | authenticated /index/prefix request | non-superuser
+// - enforce_permissions off | authenticated /index request | superuser
+// - enforce_permissions off | authenticated /index/prefix request | superuser
+// - enforce_permissions on | unauthenticated /index request
+// - enforce_permissions on | unauthenticated /index/prefix request
+// - enforce_permissions on | authenticated /index request | non-superuser
+// - enforce_permissions on | authenticated /index/prefix request | non-superuser
+// - enforce_permissions on | authenticated /index request | superuser
+// - enforce_permissions on | authenticated /index/prefix request | superuser
+//
+// The only /index requests that should succeed are those issued by the
+// superuser when enforce_permissions = true.
+//
+func TestIndexHandler(t *testing.T) {
+ defer teardown()
+
+ // Set up Keep volumes and populate them.
+ // Include multiple blocks on different volumes, and
+ // some metadata files (which should be omitted from index listings)
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
+
+ vols := KeepVM.Volumes()
+ vols[0].Put(TEST_HASH, TEST_BLOCK)
+ vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
+ vols[0].Put(TEST_HASH+".meta", []byte("metadata"))
+ vols[1].Put(TEST_HASH_2+".meta", []byte("metadata"))
+
+ // Set up a REST router for testing the handlers.
+ rest := MakeRESTRouter()
+
+ data_manager_token = "DATA MANAGER TOKEN"
+
+ unauthenticated_req := &RequestTester{
+ method: "GET",
+ uri: "http://localhost:25107/index",
+ }
+ authenticated_req := &RequestTester{
+ method: "GET",
+ uri: "http://localhost:25107/index",
+ api_token: known_token,
+ }
+ superuser_req := &RequestTester{
+ method: "GET",
+ uri: "http://localhost:25107/index",
+ api_token: data_manager_token,
+ }
+ unauth_prefix_req := &RequestTester{
+ method: "GET",
+ uri: "http://localhost:25107/index/" + TEST_HASH[0:3],
+ }
+ auth_prefix_req := &RequestTester{
+ method: "GET",
+ uri: "http://localhost:25107/index/" + TEST_HASH[0:3],
+ api_token: known_token,
+ }
+ superuser_prefix_req := &RequestTester{
+ method: "GET",
+ uri: "http://localhost:25107/index/" + TEST_HASH[0:3],
+ api_token: data_manager_token,
+ }
+
+ // ----------------------------
+ // enforce_permissions disabled
+ // All /index requests should fail.
+ enforce_permissions = false
+
+ // unauthenticated /index request
+ // => PermissionError
+ response := IssueRequest(rest, unauthenticated_req)
+ ExpectStatusCode(t,
+ "enforce_permissions off, unauthenticated request",
+ PermissionError.HTTPCode,
+ response)
+
+ // unauthenticated /index/prefix request
+ // => PermissionError
+ response = IssueRequest(rest, unauth_prefix_req)
+ ExpectStatusCode(t,
+ "enforce_permissions off, unauthenticated /index/prefix request",
+ PermissionError.HTTPCode,
+ response)
+
+ // authenticated /index request, non-superuser
+ // => PermissionError
+ response = IssueRequest(rest, authenticated_req)
+ ExpectStatusCode(t,
+ "enforce_permissions off, authenticated request, non-superuser",
+ PermissionError.HTTPCode,
+ response)
+
+ // authenticated /index/prefix request, non-superuser
+ // => PermissionError
+ response = IssueRequest(rest, auth_prefix_req)
+ ExpectStatusCode(t,
+ "enforce_permissions off, authenticated /index/prefix request, non-superuser",
+ PermissionError.HTTPCode,
+ response)
+
+ // authenticated /index request, superuser
+ // => PermissionError
+ response = IssueRequest(rest, superuser_req)
+ ExpectStatusCode(t,
+ "enforce_permissions off, superuser request",
+ PermissionError.HTTPCode,
+ response)
+
+ // superuser /index/prefix request
+ // => PermissionError
+ response = IssueRequest(rest, superuser_prefix_req)
+ ExpectStatusCode(t,
+ "enforce_permissions off, superuser /index/prefix request",
+ PermissionError.HTTPCode,
+ response)
+
+ // ---------------------------
+ // enforce_permissions enabled
+ // Only the superuser should be allowed to issue /index requests.
+ enforce_permissions = true
+
+ // unauthenticated /index request
+ // => PermissionError
+ response = IssueRequest(rest, unauthenticated_req)
+ ExpectStatusCode(t,
+ "enforce_permissions on, unauthenticated request",
+ PermissionError.HTTPCode,
+ response)
+
+ // unauthenticated /index/prefix request
+ // => PermissionError
+ response = IssueRequest(rest, unauth_prefix_req)
+ ExpectStatusCode(t,
+ "permissions on, unauthenticated /index/prefix request",
+ PermissionError.HTTPCode,
+ response)
+
+ // authenticated /index request, non-superuser
+ // => PermissionError
+ response = IssueRequest(rest, authenticated_req)
+ ExpectStatusCode(t,
+ "permissions on, authenticated request, non-superuser",
+ PermissionError.HTTPCode,
+ response)
+
+ // authenticated /index/prefix request, non-superuser
+ // => PermissionError
+ response = IssueRequest(rest, auth_prefix_req)
+ ExpectStatusCode(t,
+ "permissions on, authenticated /index/prefix request, non-superuser",
+ PermissionError.HTTPCode,
+ response)
+
+ // superuser /index request
+ // => OK
+ response = IssueRequest(rest, superuser_req)
+ ExpectStatusCode(t,
+ "permissions on, superuser request",
+ http.StatusOK,
+ response)
+
+ expected := `^` + TEST_HASH + `\+\d+ \d+\n` +
+ TEST_HASH_2 + `\+\d+ \d+\n$`
+ match, _ := regexp.MatchString(expected, response.Body.String())
+ if !match {
+ t.Errorf(
+ "permissions on, superuser request: expected %s, got:\n%s",
+ expected, response.Body.String())
+ }
+
+ // superuser /index/prefix request
+ // => OK
+ response = IssueRequest(rest, superuser_prefix_req)
+ ExpectStatusCode(t,
+ "permissions on, superuser request",
+ http.StatusOK,
+ response)
+
+ expected = `^` + TEST_HASH + `\+\d+ \d+\n$`
+ match, _ = regexp.MatchString(expected, response.Body.String())
+ if !match {
+ t.Errorf(
+ "permissions on, superuser /index/prefix request: expected %s, got:\n%s",
+ expected, response.Body.String())
+ }
+}
+
+// ====================
+// Helper functions
+// ====================
+
+// IssueRequest executes an HTTP request described by rt against the
+// specified REST router, and returns the HTTP response to the request.
+func IssueRequest(router *mux.Router, rt *RequestTester) *httptest.ResponseRecorder {
+ response := httptest.NewRecorder()
+ body := bytes.NewReader(rt.request_body)
+ req, _ := http.NewRequest(rt.method, rt.uri, body)
+ if rt.api_token != "" {
+ req.Header.Set("Authorization", "OAuth "+rt.api_token)
+ }
+ router.ServeHTTP(response, req)
+ return response
+}
+
+// ExpectStatusCode checks whether a response has the specified status code,
+// and reports a test failure if not.
+func ExpectStatusCode(
+ t *testing.T,
+ testname string,
+ expected_status int,
+ response *httptest.ResponseRecorder) {
+ if response.Code != expected_status {
+ t.Errorf("%s: expected status %s, got %+v",
+ testname, expected_status, response)
+ }
+}
+
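+// ExpectBody checks whether a response body matches the expected
+// string, and reports a test failure if not.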
+func ExpectBody(
+ t *testing.T,
+ testname string,
+ expected_body string,
+ response *httptest.ResponseRecorder) {
+ if response.Body.String() != expected_body {
+ t.Errorf("%s: expected response body '%s', got %+v",
+ testname, expected_body, response)
+ }
+}
"net/http"
"os"
"regexp"
+ "strconv"
"strings"
"syscall"
+ "time"
)
// ======================
// and/or configuration file settings.
// Default TCP address on which to listen for requests.
+// Can be overridden with the --listen flag.
const DEFAULT_ADDR = ":25107"
// A Keep "block" is 64MB.
var PROC_MOUNTS = "/proc/mounts"
// The Keep VolumeManager maintains a list of available volumes.
+// Initialized by the --volumes flag (or by FindKeepVolumes).
var KeepVM VolumeManager
+// enforce_permissions controls whether permission signatures
+// should be enforced (affecting GET and DELETE requests).
+// Initialized by the --enforce-permissions flag.
+var enforce_permissions bool
+
+// permission_ttl is the time duration for which new permission
+// signatures (returned by PUT requests) will be valid.
+// Initialized by the --permission-ttl flag.
+var permission_ttl time.Duration
+
+// data_manager_token represents the API token used by the
+// Data Manager, and is required for certain privileged operations.
+// Initialized by the --data-manager-token-file flag.
+var data_manager_token string
+
// ==========
// Error types.
//
}
var (
- CollisionError = &KeepError{400, "Collision"}
- MD5Error = &KeepError{401, "MD5 Failure"}
- CorruptError = &KeepError{402, "Corruption"}
- NotFoundError = &KeepError{404, "Not Found"}
- GenericError = &KeepError{500, "Fail"}
- FullError = &KeepError{503, "Full"}
- TooLongError = &KeepError{504, "Too Long"}
+ CollisionError = &KeepError{400, "Collision"}
+ MD5Error = &KeepError{401, "MD5 Failure"}
+ PermissionError = &KeepError{401, "Permission denied"}
+ CorruptError = &KeepError{402, "Corruption"}
+ ExpiredError = &KeepError{403, "Expired permission signature"}
+ NotFoundError = &KeepError{404, "Not Found"}
+ GenericError = &KeepError{500, "Fail"}
+ FullError = &KeepError{503, "Full"}
+ TooLongError = &KeepError{504, "Too Long"}
)
func (e *KeepError) Error() string {
// data exceeds BLOCKSIZE bytes.
var ReadErrorTooLong = errors.New("Too long")
+// TODO(twp): continue moving as much code as possible out of main
+// so it can be effectively tested. Esp. handling and postprocessing
+// of command line flags (identifying Keep volumes and initializing
+// permission arguments).
+
func main() {
// Parse command-line flags:
//
// by looking at currently mounted filesystems for /keep top-level
// directories.
- var listen, volumearg string
- var serialize_io bool
- flag.StringVar(&listen, "listen", DEFAULT_ADDR,
- "interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.")
- flag.StringVar(&volumearg, "volumes", "",
- "Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.")
- flag.BoolVar(&serialize_io, "serialize", false,
- "If set, all read and write operations on local Keep volumes will be serialized.")
+ var (
+ data_manager_token_file string
+ listen string
+ permission_key_file string
+ permission_ttl_sec int
+ serialize_io bool
+ volumearg string
+ )
+ flag.StringVar(
+ &data_manager_token_file,
+ "data-manager-token-file",
+ "",
+ "File with the API token used by the Data Manager. All DELETE "+
+ "requests or GET /index requests must carry this token.")
+ flag.BoolVar(
+ &enforce_permissions,
+ "enforce-permissions",
+ false,
+ "Enforce permission signatures on requests.")
+ flag.StringVar(
+ &listen,
+ "listen",
+ DEFAULT_ADDR,
+ "Interface on which to listen for requests, in the format "+
+ "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
+ "to listen on all network interfaces.")
+ flag.StringVar(
+ &permission_key_file,
+ "permission-key-file",
+ "",
+ "File containing the secret key for generating and verifying "+
+ "permission signatures.")
+ flag.IntVar(
+ &permission_ttl_sec,
+ "permission-ttl",
+ 300,
+ "Expiration time (in seconds) for newly generated permission "+
+ "signatures.")
+ flag.BoolVar(
+ &serialize_io,
+ "serialize",
+ false,
+ "If set, all read and write operations on local Keep volumes will "+
+ "be serialized.")
+ flag.StringVar(
+ &volumearg,
+ "volumes",
+ "",
+ "Comma-separated list of directories to use for Keep volumes, "+
+ "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
+ "supplied, Keep will scan mounted filesystems for volumes "+
+ "with a /keep top-level directory.")
flag.Parse()
// Look for local keep volumes.
log.Fatal("could not find any keep volumes")
}
+ // Initialize data manager token and permission key.
+ // If these tokens are specified but cannot be read,
+ // raise a fatal error.
+ if data_manager_token_file != "" {
+ if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
+ data_manager_token = strings.TrimSpace(string(buf))
+ } else {
+ log.Fatalf("reading data manager token: %s\n", err)
+ }
+ }
+ if permission_key_file != "" {
+ if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
+ PermissionSecret = bytes.TrimSpace(buf)
+ } else {
+ log.Fatalf("reading permission key: %s\n", err)
+ }
+ }
+
+ // Initialize permission TTL
+ permission_ttl = time.Duration(permission_ttl_sec) * time.Second
+
+ // If --enforce-permissions is true, we must have a permission key
+ // to continue.
+ if PermissionSecret == nil {
+ if enforce_permissions {
+ log.Fatal("--enforce-permissions requires a permission key")
+ } else {
+ log.Println("Running without a PermissionSecret. Block locators " +
+ "returned by this server will not be signed, and will be rejected " +
+ "by a server that enforces permissions.")
+ log.Println("To fix this, run Keep with --permission-key-file=<path> " +
+ "to define the location of a file containing the permission key.")
+ }
+ }
+
// Start a round-robin VolumeManager with the volumes we have found.
KeepVM = MakeRRVolumeManager(goodvols)
- // Set up REST handlers.
- //
- // Start with a router that will route each URL path to an
- // appropriate handler.
- //
- rest := mux.NewRouter()
- rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
- rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
- rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
- rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
- rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
-
// Tell the built-in HTTP server to direct all requests to the REST
// router.
- http.Handle("/", rest)
+ http.Handle("/", MakeRESTRouter())
// Start listening for requests.
http.ListenAndServe(listen, nil)
}
+// MakeRESTRouter
+//     Returns a mux.Router that passes GET, HEAD and PUT requests to
+//     the appropriate handlers.
+//
+func MakeRESTRouter() *mux.Router {
+ rest := mux.NewRouter()
+ rest.HandleFunc(
+ `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
+ rest.HandleFunc(
+ `/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`,
+ GetBlockHandler).Methods("GET", "HEAD")
+ rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
+
+ // For IndexHandler we support:
+ // /index - returns all locators
+ // /index/{prefix} - returns all locators that begin with {prefix}
+	//   {prefix} is a string of between 0 and 32 hexadecimal digits.
+ // If {prefix} is the empty string, return an index of all locators
+ // (so /index and /index/ behave identically)
+ // A client may supply a full 32-digit locator string, in which
+ // case the server will return an index with either zero or one
+ // entries. This usage allows a client to check whether a block is
+ // present, and its size and upload time, without retrieving the
+ // entire block.
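+	// For example (the prefixes shown are hypothetical):
+	//   GET /index      - index of all locators on this server
+	//   GET /index/0f3  - only locators that begin with "0f3"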
+ //
+ rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
+ rest.HandleFunc(
+ `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+ rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
+ return rest
+}
+
// FindKeepVolumes
// Returns a list of Keep volumes mounted on this system.
//
for scanner.Scan() {
args := strings.Fields(scanner.Text())
dev, mount := args[0], args[1]
- if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
+ if mount != "/" &&
+ (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
keep := mount + "/keep"
if st, err := os.Stat(keep); err == nil && st.IsDir() {
vols = append(vols, keep)
return vols
}
-func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
+func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
hash := mux.Vars(req)["hash"]
+ signature := mux.Vars(req)["signature"]
+ timestamp := mux.Vars(req)["timestamp"]
+
+ // If permission checking is in effect, verify this
+ // request's permission signature.
+ if enforce_permissions {
+ if signature == "" || timestamp == "" {
+ http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
+ return
+ } else if IsExpired(timestamp) {
+ http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
+ return
+ } else {
+ validsig := MakePermSignature(hash, GetApiToken(req), timestamp)
+ if signature != validsig {
+ http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
+ return
+ }
+ }
+ }
block, err := GetBlock(hash)
if err != nil {
- http.Error(w, err.Error(), 404)
+ // This type assertion is safe because the only errors
+ // GetBlock can return are CorruptError or NotFoundError.
+ http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
return
}
- _, err = w.Write(block)
+ _, err = resp.Write(block)
if err != nil {
log.Printf("GetBlockHandler: writing response: %s", err)
}
return
}
-func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
+func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
hash := mux.Vars(req)["hash"]
// Read the block data to be stored.
//
if buf, err := ReadAtMost(req.Body, BLOCKSIZE); err == nil {
if err := PutBlock(buf, hash); err == nil {
- w.WriteHeader(http.StatusOK)
+ // Success; sign the locator and return it to the client.
+ api_token := GetApiToken(req)
+ expiry := time.Now().Add(permission_ttl)
+ signed_loc := SignLocator(hash, api_token, expiry)
+ resp.Write([]byte(signed_loc))
} else {
ke := err.(*KeepError)
- http.Error(w, ke.Error(), ke.HTTPCode)
+ http.Error(resp, ke.Error(), ke.HTTPCode)
}
} else {
log.Println("error reading request: ", err)
// the maximum request size.
errmsg = fmt.Sprintf("Max request size %d bytes", BLOCKSIZE)
}
- http.Error(w, errmsg, 500)
+ http.Error(resp, errmsg, 500)
}
}
// IndexHandler
// A HandleFunc to address /index and /index/{prefix} requests.
//
-func IndexHandler(w http.ResponseWriter, req *http.Request) {
+func IndexHandler(resp http.ResponseWriter, req *http.Request) {
prefix := mux.Vars(req)["prefix"]
+ // Only the data manager may issue /index requests,
+ // and only if enforce_permissions is enabled.
+	// All other requests return 401 Permission denied.
+ api_token := GetApiToken(req)
+ if !enforce_permissions ||
+ api_token == "" ||
+ data_manager_token != api_token {
+ http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
+ return
+ }
var index string
for _, vol := range KeepVM.Volumes() {
index = index + vol.Index(prefix)
}
- w.Write([]byte(index))
+ resp.Write([]byte(index))
}
// StatusHandler
Volumes []*VolumeStatus `json:"volumes"`
}
-func StatusHandler(w http.ResponseWriter, req *http.Request) {
+func StatusHandler(resp http.ResponseWriter, req *http.Request) {
st := GetNodeStatus()
if jstat, err := json.Marshal(st); err == nil {
- w.Write(jstat)
+ resp.Write(jstat)
} else {
log.Printf("json.Marshal: %s\n", err)
log.Printf("NodeStatus = %v\n", st)
- http.Error(w, err.Error(), 500)
+ http.Error(resp, err.Error(), 500)
}
}
// they should be sent directly to an event manager at high
// priority or logged as urgent problems.
//
- log.Printf("%s: checksum mismatch for request %s (actual hash %s)\n",
+ log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
vol, hash, filehash)
return buf, CorruptError
}
// If we already have a block on disk under this identifier, return
// success (but check for MD5 collisions).
// The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
- // In either case, we want to write our new (good) block to disk, so there is
- // nothing special to do if err != nil.
+ // In either case, we want to write our new (good) block to disk,
+ // so there is nothing special to do if err != nil.
if oldblock, err := GetBlock(hash); err == nil {
if bytes.Compare(block, oldblock) == 0 {
return nil
log.Printf("IsValidLocator: %s\n", err)
return false
}
+
+// GetApiToken returns the OAuth token from the Authorization
+// header of an HTTP request, or an empty string if no matching
+// token is found.
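+// For example, the header "Authorization: OAuth xyzzy" yields the
+// token "xyzzy".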
+func GetApiToken(req *http.Request) string {
+ if auth, ok := req.Header["Authorization"]; ok {
+ if strings.HasPrefix(auth[0], "OAuth ") {
+ return auth[0][6:]
+ }
+ }
+ return ""
+}
+
+// IsExpired returns true if the given Unix timestamp (expressed as a
+// hexadecimal string) is in the past, or if timestamp_hex cannot be
+// parsed as a hexadecimal string.
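+// For example, IsExpired("0") is always true, since the Unix epoch
+// is in the past.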
+func IsExpired(timestamp_hex string) bool {
+ ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
+ if err != nil {
+ log.Printf("IsExpired: %s\n", err)
+ return true
+ }
+ return time.Unix(ts, 0).Before(time.Now())
+}
match, err := regexp.MatchString(expected, index)
if err == nil {
if !match {
- t.Errorf("IndexLocators returned:\n-----\n%s-----\n", index)
+ t.Errorf("IndexLocators returned:\n%s", index)
}
} else {
t.Errorf("regexp.MatchString: %s", err)
// Cleanup to perform after each test.
//
func teardown() {
+ data_manager_token = ""
+ enforce_permissions = false
+ PermissionSecret = nil
KeepVM = nil
}
// key.
var PermissionSecret []byte
-// makePermSignature returns a string representing the signed permission
-// hint for the blob identified by blob_hash, api_token and expiration timestamp.
+// MakePermSignature returns a string representing the signed permission
+// hint for the blob identified by blob_hash, signed for the given api_token
+// and expiration timestamp.
-func makePermSignature(blob_hash string, api_token string, expiry string) string {
+func MakePermSignature(blob_hash string, api_token string, expiry string) string {
hmac := hmac.New(sha1.New, PermissionSecret)
hmac.Write([]byte(blob_hash))
hmac.Write([]byte("@"))
// SignLocator takes a blob_locator, an api_token and an expiry time, and
// returns a signed locator string.
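+// The signed locator has the form
+// "{blob_locator}+A{signature}@{timestamp}", where {timestamp} is the
+// expiry time expressed as hexadecimal Unix seconds.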
func SignLocator(blob_locator string, api_token string, expiry time.Time) string {
+ // If no permission secret or API token is available,
+ // return an unsigned locator.
+ if PermissionSecret == nil || api_token == "" {
+ return blob_locator
+ }
// Extract the hash from the blob locator, omitting any size hint that may be present.
blob_hash := strings.Split(blob_locator, "+")[0]
// Return the signed locator string.
timestamp_hex := fmt.Sprintf("%08x", expiry.Unix())
return blob_locator +
- "+A" + makePermSignature(blob_hash, api_token, timestamp_hex) +
+ "+A" + MakePermSignature(blob_hash, api_token, timestamp_hex) +
"@" + timestamp_hex
}