} else {
params.filters = [['any', '@@', newquery.trim().concat(':*')]];
}
+ $(".modal-dialog-preview-pane").html("");
$target.data('infinite-content-params-filterable', params);
$target.data('filterable-query', newquery);
}
'project_uuid': project_uuid
};
}
+ $(".modal-dialog-preview-pane").html("");
// Use current selection as dropdown button label
$(this).
closest('.dropdown-menu').
toggleClass('disabled',
($checked.length < 0) ||
!($checked.length > 0 && collection_lock_classes && collection_lock_classes.indexOf("fa-unlock") !=-1));
+ $('[data-selection-action=untrash-selected-items]', $container).
+ closest('li').
+ toggleClass('disabled',
+ ($checked.length < 1));
}
$(document).
end
@objects = search_what.contents(limit: @limit,
offset: @offset,
+ recursive: true,
count: 'none',
last_object_class: params["last_object_class"],
filters: @filters)
def next_page_href with_params={}
super with_params.merge(last_object_class: @objects.last.class.to_s,
project_uuid: params[:project_uuid],
+ recursive: true,
count: 'none',
filters: @filters.to_json)
end
--- /dev/null
+class TrashItemsController < ApplicationController
+  # Workbench "Trash" page: lists the current user's trashed
+  # Collections and supports un-trashing them, either one at a time or
+  # in bulk via the selection dropdown.
+  def model_class
+    Collection
+  end
+
+  # The trash page has a single pane.
+  def index_pane_list
+    %w(Recent_trash)
+  end
+
+  def find_objects_for_index
+    # If it's not the index rows partial display, just return
+    # The /index request will again be invoked to display the
+    # partial at which time, we will be using the objects found.
+    return if !params[:partial]
+
+    trashed_items
+
+    if @objects.any?
+      # Show most recently trashed items first, then build the cursor
+      # filters and href for the next page of results.
+      @objects = @objects.sort_by { |obj| obj.trash_at }.reverse
+      @next_page_filters = next_page_filters('<=')
+      @next_page_href = url_for(partial: :trash_rows,
+                                filters: @next_page_filters.to_json)
+    else
+      @next_page_href = nil
+    end
+  end
+
+  # Precomputed in find_objects_for_index; nil when there are no more
+  # results to page through.
+  def next_page_href with_params={}
+    @next_page_href
+  end
+
+  # Build the filter set for the next page: drop any previous cursor
+  # filters, then add a trash_at cursor plus a uuid exclusion list so
+  # items sharing the boundary trash_at timestamp are not shown twice.
+  def next_page_filters nextpage_operator
+    next_page_filters = @filters.reject do |attr, op, val|
+      (attr == 'trash_at' and op == nextpage_operator) or
+      (attr == 'uuid' and op == 'not in')
+    end
+
+    if @objects.any?
+      last_trash_at = @objects.last.trash_at
+
+      # Collect every uuid already shown at the boundary timestamp.
+      last_uuids = []
+      @objects.each do |obj|
+        last_uuids << obj.uuid if obj.trash_at.eql?(last_trash_at)
+      end
+
+      next_page_filters += [['trash_at', nextpage_operator, last_trash_at]]
+      next_page_filters += [['uuid', 'not in', last_uuids]]
+    end
+
+    next_page_filters
+  end
+
+  # Populate @objects with trashed collections readable by the current
+  # user, optionally narrowed by params[:filters] and params[:search].
+  def trashed_items
+    # API server index doesn't return manifest_text by default, but our
+    # callers want it unless otherwise specified.
+    @select ||= Collection.columns.map(&:name)
+    limit = if params[:limit] then params[:limit].to_i else 100 end
+    offset = if params[:offset] then params[:offset].to_i else 0 end
+
+    base_search = Collection.select(@select).include_trash(true).where(is_trashed: true)
+    base_search = base_search.filter(params[:filters]) if params[:filters]
+
+    if params[:search].andand.length.andand > 0
+      # Match the search term against tag links as well as collection
+      # attributes, then de-duplicate the merged result sets by uuid.
+      tags = Link.where(any: ['contains', params[:search]])
+      base_search = base_search.limit(limit).offset(offset)
+      @objects = (base_search.where(uuid: tags.collect(&:head_uuid)) |
+                  base_search.where(any: ['contains', params[:search]])).
+                 uniq { |c| c.uuid }
+    else
+      @objects = base_search.limit(limit).offset(offset)
+    end
+  end
+
+  # Un-trash the collections named in params[:selection], then respond
+  # with untrash_items.js.erb to remove their rows from the page.
+  def untrash_items
+    @untrashed_uuids = []
+
+    updates = {trash_at: nil}
+    # NOTE(review): `updates` above appears unused — un-trashing is done
+    # via the API untrash endpoint below; consider removing it.
+
+    Collection.include_trash(1).where(uuid: params[:selection]).each do |c|
+      c.untrash
+      @untrashed_uuids << c.uuid
+    end
+
+    respond_to do |format|
+      format.js
+    end
+  end
+end
ArvadosResourceList.new(self).distinct(*args)
end
+  # Class-level scope helper: build an ArvadosResourceList that asks
+  # the API server to include trashed items in list results.
+  def self.include_trash(*args)
+    ArvadosResourceList.new(self).include_trash(*args)
+  end
+
+  # Class-level scope helper: build an ArvadosResourceList that lists
+  # items owned by subprojects too (recursive contents).
+  def self.recursive(*args)
+    ArvadosResourceList.new(self).recursive(*args)
+  end
+
def self.eager(*args)
ArvadosResourceList.new(self).eager(*args)
end
self
end
+  # Chainable setter: when truthy, list requests include trashed items
+  # (sent to the API as the include_trash parameter).
+  def include_trash(option=nil)
+    @include_trash = option
+    self
+  end
+
+  # Chainable setter: when truthy, list requests should include items
+  # owned by subprojects.
+  # NOTE(review): @recursive is stored here but, in this patch, no
+  # corresponding api_params[:recursive] assignment is visible (only
+  # include_trash is added) — confirm the parameter is actually sent.
+  def recursive(option=nil)
+    @recursive = option
+    self
+  end
+
def limit(max_results)
if not max_results.nil? and not max_results.is_a? Integer
raise ArgumentError("argument to limit() must be an Integer or nil")
api_params[:order] = @orderby_spec if @orderby_spec
api_params[:filters] = @filters if @filters
api_params[:distinct] = @distinct if @distinct
+ api_params[:include_trash] = @include_trash if @include_trash
if @fetch_multiple_pages
# Default limit to (effectively) api server's MAX_LIMIT
api_params[:limit] = 2**(0.size*8 - 1) - 1
[ 'description' ]
end
+  # Issue an empty POST to the API server's /untrash endpoint for this
+  # object, restoring it from the trash.
+  def untrash
+    arvados_api_client.api(self.class, "/#{self.uuid}/untrash", {})
+  end
end
<% end %>
<% end %>
</ul>
+ <ul class="nav navbar-nav navbar-right">
+ <li>
+ <a href="/trash">
+ <%= image_tag("trash-icon.png", size: "20x20" ) %> Trash
+ </a>
+ </li>
+ </ul>
</nav>
--- /dev/null
+<%# There is no such thing %>
--- /dev/null
+<%# Trash page: a "Selection..." dropdown for bulk un-trash, a search
+    box, and an infinite-scroll table of trashed collections whose
+    rows are rendered by the :trash_rows partial. %>
+<div class="container selection-action-container" style="width: 100%">
+  <div class="col-md-2 pull-left">
+    <div class="btn-group btn-group-sm">
+      <button type="button" class="btn btn-default dropdown-toggle" data-toggle="dropdown">Selection... <span class="caret"></span></button>
+      <ul class="dropdown-menu" role="menu">
+        <li><%= link_to "Un-trash selected items", '#',
+                method: :post,
+                remote: true,
+                'id' => 'untrash_selected_items',
+                'data-href' => untrash_items_trash_items_path,
+                'data-selection-param-name' => 'selection[]',
+                'data-selection-action' => 'untrash-selected-items',
+                'data-toggle' => 'dropdown'
+          %></li>
+      </ul>
+    </div>
+  </div>
+  <div class="col-md-4 pull-right">
+    <input type="text" class="form-control filterable-control recent-trash-items"
+           placeholder="Search trash"
+           data-filterable-target="#recent-trash-items"
+           value="<%= params[:search] %>" />
+  </div>
+
+  <div>
+    <table id="trash-index" class="topalign table table-condensed table-fixedlayout">
+      <colgroup>
+        <col width="5%" />
+        <col width="20%" />
+        <col width="15%" />
+        <col width="15%" />
+        <col width="10%" />
+        <col width="30%" />
+        <col width="5%" />
+      </colgroup>
+
+      <thead>
+        <tr class="contain-align-left">
+          <th></th>
+          <th>Name</th>
+          <th>Trashed at</th>
+          <th title="After this time, no longer available to be recovered from Trash">Permanently<br/>Deleted At</th>
+          <th>Owner</th>
+          <th>Contents</th>
+          <th></th>
+        </tr>
+      </thead>
+
+      <tbody data-infinite-scroller="#recent-trash-items" id="recent-trash-items"
+             data-infinite-content-href="<%= url_for partial: :trash_rows %>" >
+      </tbody>
+    </table>
+  </div>
+</div>
--- /dev/null
+<%# One table row per trashed collection: checkbox, name, trash/delete
+    timestamps, owner, a short file listing, and an un-trash button. %>
+<% @objects.each do |obj| %>
+  <tr data-object-uuid="<%= obj.uuid %>" data-kind="<%= obj.kind %>" >
+    <td>
+      <% if obj.editable? %>
+        <%= check_box_tag 'uuids[]', obj.uuid, false, :class => 'persistent-selection', style: 'cursor: pointer;' %>
+      <% end %>
+    </td>
+    <td>
+      <%= if !obj.name.blank? then obj.name else obj.uuid end %>
+    </td>
+    <td>
+      <%= render_localized_date(obj.trash_at) if obj.trash_at %>
+    </td>
+    <td>
+      <%= render_localized_date(obj.delete_at) if obj.delete_at %>
+    </td>
+    <td>
+      <%= link_to_if_arvados_object obj.owner_uuid, friendly_name: true %>
+    </td>
+    <td>
+      <%# Show at most the first 3 files, plus a count of the rest. %>
+      <% (0..[2, obj.files.length-1].min).each do |i| %>
+        <% file = obj.files[i] %>
+        <% file_path = "#{file[0]}/#{file[1]}" %>
+        <%= file_path %><br />
+      <% end %>
+      <% if obj.files.length > 3 %>
+        <%= "(#{obj.files.length-3} more files)" %>
+      <% end %>
+    </td>
+    <td>
+      <%= render partial: 'untrash_item', locals: {object:obj} %>
+    </td>
+  </tr>
+<% end %>
--- /dev/null
+<%# Recycle-bin icon: un-trash a single object via the controller's
+    untrash_items action. Rendered only when the object is editable.
+    (Removed the `msg` local from the original draft: it was computed
+    but never used.) %>
+<% if object.editable? %>
+  <%= link_to({action: 'untrash_items', selection: [object.uuid]}, remote: true, method: :post,
+              title: "Untrash", style: 'cursor: pointer;') do %>
+    <i class="fa fa-fw fa-recycle"></i>
+  <% end %>
+<% end %>
--- /dev/null
+<%# The trash index reuses the generic application index template. %>
+<%= render file: 'application/index.html.erb', locals: local_assigns %>
--- /dev/null
+<%# JS response for untrash_items: fade out and remove the table row
+    of each successfully un-trashed object. %>
+<% @untrashed_uuids.each do |uuid| %>
+  $('[data-object-uuid=<%= uuid %>]').hide('slow', function() {
+    $(this).remove();
+  });
+<% end %>
-<div class="container">
+<div class="container" style="width: 100%">
<div class="row">
<div class="pull-right">
<input type="text" class="form-control filterable-control recent-all-processes-filterable-control"
resources :workflows
+ get "trash" => 'trash_items#index', :as => :trash
+ resources :trash_items do
+ post 'untrash_items', on: :collection
+ end
+
post 'actions' => 'actions#post'
get 'actions' => 'actions#show'
get 'websockets' => 'websocket#index'
assert_empty(json_response['content'],
'search results for empty project should be empty')
end
+
+  # Choosing within a project should list its contents recursively:
+  # items owned directly by aproject as well as items in (nested)
+  # subprojects must appear in the rendered results.
+  test 'search results for aproject and verify recursive contents' do
+    xhr :get, :choose, {
+      format: :json,
+      partial: true,
+      project_uuid: api_fixture('groups')['aproject']['uuid'],
+    }, session_for(:active)
+    assert_response :success
+    assert_not_empty(json_response['content'],
+                     'search results for aproject should not be empty')
+    # Collect the data-object-uuid of every row in the returned HTML.
+    items = []
+    json_response['content'].scan /<div[^>]+>/ do |div_tag|
+      div_tag.scan(/\ data-object-uuid=\"(.*?)\"/).each do |uuid,|
+        items << uuid
+      end
+    end
+
+    assert_includes(items, api_fixture('collections')['collection_to_move_around_in_aproject']['uuid'])
+    assert_includes(items, api_fixture('groups')['asubproject']['uuid'])
+    assert_includes(items, api_fixture('collections')['baz_collection_name_in_asubproject']['uuid'])
+    assert_includes(items,
+                    api_fixture('groups')['subproject_in_asubproject_with_same_name_as_one_in_active_user_home']['uuid'])
+  end
end
--- /dev/null
+require 'integration_helper'
+
+class TrashTest < ActionDispatch::IntegrationTest
+  # Browser-level tests for the /trash page: visibility of trashed
+  # items, un-trashing via the selection dropdown and the per-row
+  # recycle button, and filtering with the search box.
+  setup do
+    need_javascript
+  end
+
+  test "trash page" do
+    deleted = api_fixture('collections')['deleted_on_next_sweep']
+    expired1 = api_fixture('collections')['unique_expired_collection']
+    expired2 = api_fixture('collections')['unique_expired_collection2']
+
+    # visit trash page
+    visit page_with_token('active', "/trash")
+
+    assert_text deleted['name']
+    assert_text expired1['name']
+    assert_no_text expired2['name'] # not readable by this user
+    assert_no_text 'foo_file' # not trash
+
+    # Un-trash one item using selection dropdown
+    within('tr', text: deleted['name']) do
+      first('input').click
+    end
+
+    click_button 'Selection...'
+    within('.selection-action-container') do
+      click_link 'Un-trash selected items'
+    end
+
+    wait_for_ajax
+
+    assert_text expired1['name'] # this should still be there
+    assert_no_text deleted['name'] # this should no longer be here
+
+    # Un-trash another item using the recycle button
+    within('tr', text: expired1['name']) do
+      first('.fa-recycle').click
+    end
+
+    wait_for_ajax
+
+    assert_no_text expired1['name']
+
+    # verify that the two un-trashed items are now shown in /collections page
+    visit page_with_token('active', "/collections")
+    assert_text deleted['uuid']
+    assert_text expired1['uuid']
+    assert_no_text expired2['uuid']
+  end
+
+  test "trash page with search" do
+    deleted = api_fixture('collections')['deleted_on_next_sweep']
+    expired = api_fixture('collections')['unique_expired_collection']
+
+    visit page_with_token('active', "/trash")
+
+    assert_text deleted['name']
+    assert_text expired['name']
+
+    # Narrow the listing with the search box; only matching rows remain.
+    page.find_field('Search trash').set 'expired'
+
+    assert_text expired['name']
+    assert_no_text deleted['name']
+
+    # With nothing selected, the bulk un-trash action is disabled.
+    click_button 'Selection...'
+    within('.selection-action-container') do
+      assert_selector 'li.disabled', text: 'Un-trash selected items'
+    end
+
+    # Select the first (still visible) row.
+    first('input').click
+
+    click_button 'Selection...'
+    within('.selection-action-container') do
+      assert_selector 'li', text: 'Un-trash selected items'
+      assert_selector 'li.disabled', text: 'Un-trash selected items'
+    end
+  end
+end
# mode makes Go show the wrong line numbers when reporting
# compilation errors.
go get -t "git.curoverse.com/arvados.git/$1" && \
- cd "$WORKSPACE/$1" && \
+ cd "$GOPATH/src/git.curoverse.com/arvados.git/$1" && \
[[ -z "$(gofmt -e -d . | tee /dev/stderr)" ]] && \
if [[ -n "${testargs[$1]}" ]]
then
|limit|integer (default 100)|Maximum number of items to return.|query||
|order|string|Order in which to return matching items. Sort within a resource type by prefixing the attribute with the resource name and a dot.|query|@"collections.modified_at desc"@|
|filters|array|Conditions for filtering items.|query|@[["uuid", "is_a", "arvados#job"]]@|
+|recursive|boolean (default false)|Include items owned by subprojects.|query|@true@|
Note: Because adding access tokens to manifests can be computationally expensive, the @manifest_text@ field is not included in listed collections. If you need it, request a "list of collections":{{site.baseurl}}/api/methods/collections.html with the filter @["owner_uuid", "=", GROUP_UUID]@, and @"manifest_text"@ listed in the select parameter.
my $collated_output = save_output_collection();
Log (undef, "finish");
-save_meta();
+my $final_log = save_meta();
my $final_state;
-if ($collated_output && $main::success) {
+if ($collated_output && $final_log && $main::success) {
$final_state = 'Complete';
} else {
$final_state = 'Failed';
$log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
'arv-put',
'--stream',
- '--retries', '3',
+ '--retries', '6',
'--filename', $logfilename,
'-');
$log_pipe_out_buf = "";
});
Log(undef, "log collection is " . $log_coll->{portable_data_hash});
$Job->update_attributes('log' => $log_coll->{portable_data_hash});
+
+ return $log_coll->{portable_data_hash};
}
"fmt"
"io"
"io/ioutil"
+ "log"
"math"
"net/http"
"net/url"
// ARVADOS_API_* environment variables.
func NewClientFromEnv() *Client {
var svcs []string
- if s := os.Getenv("ARVADOS_KEEP_SERVICES"); s != "" {
- svcs = strings.Split(s, " ")
+ for _, s := range strings.Split(os.Getenv("ARVADOS_KEEP_SERVICES"), " ") {
+ if s == "" {
+ continue
+ } else if u, err := url.Parse(s); err != nil {
+ log.Printf("ARVADOS_KEEP_SERVICES: %q: %s", s, err)
+ } else if !u.IsAbs() {
+ log.Printf("ARVADOS_KEEP_SERVICES: %q: not an absolute URI", s)
+ } else {
+ svcs = append(svcs, s)
+ }
+ }
+ var insecure bool
+ if s := strings.ToLower(os.Getenv("ARVADOS_API_HOST_INSECURE")); s == "1" || s == "yes" || s == "true" {
+ insecure = true
}
return &Client{
APIHost: os.Getenv("ARVADOS_API_HOST"),
AuthToken: os.Getenv("ARVADOS_API_TOKEN"),
- Insecure: os.Getenv("ARVADOS_API_HOST_INSECURE") != "",
+ Insecure: insecure,
KeepServiceURIs: svcs,
}
}
"fmt"
"io"
"io/ioutil"
+ "log"
"net/http"
"net/url"
"os"
"regexp"
"strings"
+ "sync"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
var RetryDelay = 2 * time.Second
+var (
+ defaultInsecureHTTPClient *http.Client
+ defaultSecureHTTPClient *http.Client
+ defaultHTTPClientMtx sync.Mutex
+)
+
// Indicates an error that was returned by the API server.
type APIServerError struct {
// Address of server returning error, of the form "host:port".
}
}
+// StringBool tests whether s is suggestive of true. It returns true
+// if s is a mixed/upper/lower-case variant of "1", "yes", or "true".
+func StringBool(s string) bool {
+	s = strings.ToLower(s)
+	return s == "1" || s == "yes" || s == "true"
+}
+
// Helper type so we don't have to write out 'map[string]interface{}' every time.
type Dict map[string]interface{}
"/etc/pki/tls/certs/ca-bundle.crt", // Fedora/RHEL
}
-// MakeTLSConfig sets up TLS configuration for communicating with Arvados and Keep services.
+// MakeTLSConfig sets up TLS configuration for communicating with
+// Arvados and Keep services.
func MakeTLSConfig(insecure bool) *tls.Config {
tlsconfig := tls.Config{InsecureSkipVerify: insecure}
if !insecure {
- // Look for /etc/arvados/ca-certificates.crt in addition to normal system certs.
+ // Use the first entry in CertFiles that we can read
+ // certificates from. If none of those work out, use
+ // the Go defaults.
certs := x509.NewCertPool()
for _, file := range CertFiles {
data, err := ioutil.ReadFile(file)
- if err == nil {
- success := certs.AppendCertsFromPEM(data)
- if !success {
- fmt.Printf("Unable to load any certificates from %v", file)
- } else {
- tlsconfig.RootCAs = certs
- break
+ if err != nil {
+ if !os.IsNotExist(err) {
+ log.Printf("error reading %q: %s", file, err)
}
+ continue
+ }
+ if !certs.AppendCertsFromPEM(data) {
+ log.Printf("unable to load any certificates from %v", file)
+ continue
}
+ tlsconfig.RootCAs = certs
+ break
}
- // Will use system default CA roots instead.
}
return &tlsconfig
TLSClientConfig: MakeTLSConfig(c.Insecure)}},
External: false,
Retries: 2,
+ KeepServiceURIs: c.KeepServiceURIs,
lastClosedIdlesAt: time.Now(),
}
// ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT, and
// ARVADOS_KEEP_SERVICES.
func MakeArvadosClient() (ac *ArvadosClient, err error) {
- var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
- insecure := matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
- external := matchTrue.MatchString(os.Getenv("ARVADOS_EXTERNAL_CLIENT"))
-
- ac = &ArvadosClient{
- Scheme: "https",
- ApiServer: os.Getenv("ARVADOS_API_HOST"),
- ApiToken: os.Getenv("ARVADOS_API_TOKEN"),
- ApiInsecure: insecure,
- Client: &http.Client{Transport: &http.Transport{
- TLSClientConfig: MakeTLSConfig(insecure)}},
- External: external,
- Retries: 2}
-
- for _, s := range strings.Split(os.Getenv("ARVADOS_KEEP_SERVICES"), " ") {
- if s == "" {
- continue
- }
- if u, err := url.Parse(s); err != nil {
- return ac, fmt.Errorf("ARVADOS_KEEP_SERVICES: %q: %s", s, err)
- } else if !u.IsAbs() {
- return ac, fmt.Errorf("ARVADOS_KEEP_SERVICES: %q: not an absolute URI", s)
- }
- ac.KeepServiceURIs = append(ac.KeepServiceURIs, s)
- }
-
- if ac.ApiServer == "" {
- return ac, MissingArvadosApiHost
- }
- if ac.ApiToken == "" {
- return ac, MissingArvadosApiToken
+ ac, err = New(arvados.NewClientFromEnv())
+ if err != nil {
+ return
}
-
- ac.lastClosedIdlesAt = time.Now()
-
- return ac, err
+ ac.External = StringBool(os.Getenv("ARVADOS_EXTERNAL_CLIENT"))
+ return
}
// CallRaw is the same as Call() but returns a Reader that reads the
return value, ErrInvalidArgument
}
}
+
+// httpClient returns the client configured on ac, if any; otherwise a
+// process-wide default client (one for secure mode, one for insecure
+// mode), created lazily on first use.
+//
+// NOTE(review): the `*c == nil` check below runs before the mutex is
+// taken and is not repeated after locking, so two goroutines may both
+// construct a client (the second overwrites the first — functionally
+// harmless, but the unlocked read is technically a data race);
+// confirm this is acceptable or re-check under the lock.
+func (ac *ArvadosClient) httpClient() *http.Client {
+	if ac.Client != nil {
+		return ac.Client
+	}
+	c := &defaultSecureHTTPClient
+	if ac.ApiInsecure {
+		c = &defaultInsecureHTTPClient
+	}
+	if *c == nil {
+		defaultHTTPClientMtx.Lock()
+		defer defaultHTTPClientMtx.Unlock()
+		*c = &http.Client{Transport: &http.Transport{
+			TLSClientConfig: MakeTLSConfig(ac.ApiInsecure)}}
+	}
+	return *c
+}
func (s *ServerRequiredSuite) TestMakeArvadosClientSecure(c *C) {
os.Setenv("ARVADOS_API_HOST_INSECURE", "")
- kc, err := MakeArvadosClient()
+ ac, err := MakeArvadosClient()
c.Assert(err, Equals, nil)
- c.Check(kc.ApiServer, Equals, os.Getenv("ARVADOS_API_HOST"))
- c.Check(kc.ApiToken, Equals, os.Getenv("ARVADOS_API_TOKEN"))
- c.Check(kc.ApiInsecure, Equals, false)
+ c.Check(ac.ApiServer, Equals, os.Getenv("ARVADOS_API_HOST"))
+ c.Check(ac.ApiToken, Equals, os.Getenv("ARVADOS_API_TOKEN"))
+ c.Check(ac.ApiInsecure, Equals, false)
}
func (s *ServerRequiredSuite) TestMakeArvadosClientInsecure(c *C) {
os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
- kc, err := MakeArvadosClient()
+ ac, err := MakeArvadosClient()
c.Assert(err, Equals, nil)
- c.Check(kc.ApiInsecure, Equals, true)
- c.Check(kc.ApiServer, Equals, os.Getenv("ARVADOS_API_HOST"))
- c.Check(kc.ApiToken, Equals, os.Getenv("ARVADOS_API_TOKEN"))
- c.Check(kc.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true)
+ c.Check(ac.ApiInsecure, Equals, true)
+ c.Check(ac.ApiServer, Equals, os.Getenv("ARVADOS_API_HOST"))
+ c.Check(ac.ApiToken, Equals, os.Getenv("ARVADOS_API_TOKEN"))
+ c.Check(ac.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true)
}
func (s *ServerRequiredSuite) TestGetInvalidUUID(c *C) {
"encoding/json"
"fmt"
"log"
- "net/http"
"os"
"os/signal"
- "reflect"
"strings"
+ "sync"
"syscall"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
)
-// DiscoverKeepServers gets list of available keep services from the
-// API server.
-//
-// If a list of services is provided in the arvadosclient (e.g., from
-// an environment variable or local config), that list is used
-// instead.
-func (this *KeepClient) DiscoverKeepServers() error {
- if this.Arvados.KeepServiceURIs != nil {
- this.foundNonDiskSvc = true
- this.replicasPerService = 0
- if c, ok := this.Client.(*http.Client); ok {
- this.setClientSettingsNonDisk(c)
- }
- roots := make(map[string]string)
- for i, uri := range this.Arvados.KeepServiceURIs {
- roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
- }
- this.SetServiceRoots(roots, roots, roots)
- return nil
+// RefreshServiceDiscovery clears the cached Keep service lists for
+// all cached API servers, so the next lookup fetches a fresh list.
+func RefreshServiceDiscovery() {
+	svcListCacheMtx.Lock()
+	defer svcListCacheMtx.Unlock()
+	for _, ent := range svcListCache {
+		ent.clear <- struct{}{}
+	}
+}
- // ArvadosClient did not provide a services list. Ask API
- // server for a list of accessible services.
- var list svcList
- err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list)
- if err != nil {
- return err
+// RefreshServiceDiscoveryOnSIGHUP installs a signal handler that
+// calls RefreshServiceDiscovery when SIGHUP is received. It is
+// idempotent: only the first call registers the handler.
+func RefreshServiceDiscoveryOnSIGHUP() {
+	svcListCacheMtx.Lock()
+	defer svcListCacheMtx.Unlock()
+	if svcListCacheSignal != nil {
+		return
+	}
+	svcListCacheSignal = make(chan os.Signal, 1)
+	signal.Notify(svcListCacheSignal, syscall.SIGHUP)
+	go func() {
+		for range svcListCacheSignal {
+			RefreshServiceDiscovery()
+		}
+	}()
+}
-// LoadKeepServicesFromJSON gets list of available keep services from given JSON
-func (this *KeepClient) LoadKeepServicesFromJSON(services string) error {
- var list svcList
-
- // Load keep services from given json
- dec := json.NewDecoder(strings.NewReader(services))
- if err := dec.Decode(&list); err != nil {
- return err
- }
+var (
+ svcListCache = map[string]cachedSvcList{}
+ svcListCacheSignal chan os.Signal
+ svcListCacheMtx sync.Mutex
+)
- return this.loadKeepServers(list)
+type cachedSvcList struct {
+ arv *arvadosclient.ArvadosClient
+ latest chan svcList
+ clear chan struct{}
}
-// RefreshServices calls DiscoverKeepServers to refresh the keep
-// service list on SIGHUP; when the given interval has elapsed since
-// the last refresh; and (if the last refresh failed) the given
-// errInterval has elapsed.
-func (kc *KeepClient) RefreshServices(interval, errInterval time.Duration) {
- var previousRoots = []map[string]string{}
-
- timer := time.NewTimer(interval)
- gotHUP := make(chan os.Signal, 1)
- signal.Notify(gotHUP, syscall.SIGHUP)
+// Check for new services list every few minutes. Send the latest list
+// to the "latest" channel as needed.
+func (ent *cachedSvcList) poll() {
+ wakeup := make(chan struct{})
+
+ replace := make(chan svcList)
+ go func() {
+ wakeup <- struct{}{}
+ current := <-replace
+ for {
+ select {
+ case <-ent.clear:
+ wakeup <- struct{}{}
+ // Wait here for the next success, in
+ // order to avoid returning stale
+ // results on the "latest" channel.
+ current = <-replace
+ case current = <-replace:
+ case ent.latest <- current:
+ }
+ }
+ }()
+ okDelay := 5 * time.Minute
+ errDelay := 3 * time.Second
+ timer := time.NewTimer(okDelay)
for {
select {
- case <-gotHUP:
case <-timer.C:
+ case <-wakeup:
+ if !timer.Stop() {
+ // Lost race stopping timer; skip extra firing
+ <-timer.C
+ }
}
- timer.Reset(interval)
-
- if err := kc.DiscoverKeepServers(); err != nil {
- log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errInterval)
- timer.Reset(errInterval)
+ var next svcList
+ err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
+ if err != nil {
+ log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errDelay)
+ timer.Reset(errDelay)
continue
}
- newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()}
+ replace <- next
+ timer.Reset(okDelay)
+ }
+}
- if !reflect.DeepEqual(previousRoots, newRoots) {
- DebugPrintf("DEBUG: Updated services list: locals %v gateways %v", newRoots[0], newRoots[1])
- previousRoots = newRoots
+// discoverServices gets the list of available keep services from
+// the API server.
+//
+// If a list of services is provided in the arvadosclient (e.g., from
+// an environment variable or local config), that list is used
+// instead.
+//
+// If an API call is made, the result is cached for 5 minutes or until
+// RefreshServiceDiscovery() is called, and during this interval it is
+// reused by other KeepClients that use the same API server host.
+func (kc *KeepClient) discoverServices() error {
+	if kc.disableDiscovery {
+		return nil
+	}
+
+	if kc.Arvados.KeepServiceURIs != nil {
+		// An explicit service list disables discovery permanently
+		// for this client.
+		kc.disableDiscovery = true
+		kc.foundNonDiskSvc = true
+		kc.replicasPerService = 0
+		roots := make(map[string]string)
+		for i, uri := range kc.Arvados.KeepServiceURIs {
+			roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
+		}
+		kc.setServiceRoots(roots, roots, roots)
+		return nil
+	}
+
+	svcListCacheMtx.Lock()
+	cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
+	if !ok {
+		// First client for this API server: start a poller that
+		// keeps the cached service list fresh.
+		arv := *kc.Arvados
+		cacheEnt = cachedSvcList{
+			latest: make(chan svcList),
+			clear: make(chan struct{}),
+			arv: &arv,
+		}
+		go cacheEnt.poll()
+		svcListCache[kc.Arvados.ApiServer] = cacheEnt
+	}
+	svcListCacheMtx.Unlock()
+
+	return kc.loadKeepServers(<-cacheEnt.latest)
+}
-// loadKeepServers
-func (this *KeepClient) loadKeepServers(list svcList) error {
+// LoadKeepServicesFromJSON gets list of available keep services from
+// given JSON and disables automatic service discovery.
+func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
+	// Loading an explicit list turns off discovery permanently for
+	// this client, even if decoding fails below.
+	kc.disableDiscovery = true
+
+	var list svcList
+	dec := json.NewDecoder(strings.NewReader(services))
+	if err := dec.Decode(&list); err != nil {
+		return err
+	}
+
+	return kc.loadKeepServers(list)
+}
+
+func (kc *KeepClient) loadKeepServers(list svcList) error {
listed := make(map[string]bool)
localRoots := make(map[string]string)
gatewayRoots := make(map[string]string)
writableLocalRoots := make(map[string]string)
// replicasPerService is 1 for disks; unknown or unlimited otherwise
- this.replicasPerService = 1
+ kc.replicasPerService = 1
for _, service := range list.Items {
scheme := "http"
if service.ReadOnly == false {
writableLocalRoots[service.Uuid] = url
if service.SvcType != "disk" {
- this.replicasPerService = 0
+ kc.replicasPerService = 0
}
}
if service.SvcType != "disk" {
- this.foundNonDiskSvc = true
+ kc.foundNonDiskSvc = true
}
// Gateway services are only used when specified by
gatewayRoots[service.Uuid] = url
}
- if client, ok := this.Client.(*http.Client); ok {
- if this.foundNonDiskSvc {
- this.setClientSettingsNonDisk(client)
- } else {
- this.setClientSettingsDisk(client)
- }
- }
-
- this.SetServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
+ kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
return nil
}
import (
"crypto/md5"
"fmt"
- "gopkg.in/check.v1"
"net/http"
"os"
- "time"
+
+ "gopkg.in/check.v1"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
)
-func ExampleKeepClient_RefreshServices() {
- arv, err := arvadosclient.MakeArvadosClient()
- if err != nil {
- panic(err)
- }
- kc, err := MakeKeepClient(arv)
- if err != nil {
- panic(err)
- }
- go kc.RefreshServices(5*time.Minute, 3*time.Second)
- fmt.Printf("LocalRoots: %#v\n", kc.LocalRoots())
-}
-
func (s *ServerRequiredSuite) TestOverrideDiscovery(c *check.C) {
defer os.Setenv("ARVADOS_KEEP_SERVICES", "")
"fmt"
"io"
"io/ioutil"
+ "net"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
+ "time"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/streamer"
// A Keep "block" is 64MB.
const BLOCKSIZE = 64 * 1024 * 1024
+var (
+ DefaultRequestTimeout = 20 * time.Second
+ DefaultConnectTimeout = 2 * time.Second
+ DefaultTLSHandshakeTimeout = 4 * time.Second
+ DefaultKeepAlive = 180 * time.Second
+
+ DefaultProxyRequestTimeout = 300 * time.Second
+ DefaultProxyConnectTimeout = 30 * time.Second
+ DefaultProxyTLSHandshakeTimeout = 10 * time.Second
+ DefaultProxyKeepAlive = 120 * time.Second
+)
+
// Error interface with an error and boolean indicating whether the error is temporary
type Error interface {
error
type KeepClient struct {
Arvados *arvadosclient.ArvadosClient
Want_replicas int
- localRoots *map[string]string
- writableLocalRoots *map[string]string
- gatewayRoots *map[string]string
+ localRoots map[string]string
+ writableLocalRoots map[string]string
+ gatewayRoots map[string]string
lock sync.RWMutex
- Client HTTPClient
+ HTTPClient HTTPClient
Retries int
BlockCache *BlockCache
// Any non-disk typed services found in the list of keepservers?
foundNonDiskSvc bool
+
+ // Disable automatic discovery of keep services
+ disableDiscovery bool
}
-// MakeKeepClient creates a new KeepClient by contacting the API server to discover Keep servers.
+// MakeKeepClient creates a new KeepClient, calls
+// DiscoverKeepServices(), and returns when the client is ready to
+// use.
func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) {
kc := New(arv)
- return kc, kc.DiscoverKeepServers()
+ return kc, kc.discoverServices()
}
-// New func creates a new KeepClient struct.
-// This func does not discover keep servers. It is the caller's responsibility.
+// New creates a new KeepClient. Service discovery will occur on the
+// next read/write operation.
func New(arv *arvadosclient.ArvadosClient) *KeepClient {
defaultReplicationLevel := 2
value, err := arv.Discovery("defaultCollectionReplication")
defaultReplicationLevel = int(v)
}
}
-
- kc := &KeepClient{
+ return &KeepClient{
Arvados: arv,
Want_replicas: defaultReplicationLevel,
- Client: &http.Client{Transport: &http.Transport{
- TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure)}},
- Retries: 2,
+ Retries: 2,
}
- return kc
}
// Put a block given the block hash, a reader, and the number of bytes
continue
}
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
- resp, err := kc.Client.Do(req)
+ resp, err := kc.httpClient().Do(req)
if err != nil {
// Probably a network error, may be transient,
// can try again.
}
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
- resp, err := kc.Client.Do(req)
+ resp, err := kc.httpClient().Do(req)
if err != nil {
return nil, err
}
// LocalRoots() returns the map of local (i.e., disk and proxy) Keep
// services: uuid -> baseURI.
func (kc *KeepClient) LocalRoots() map[string]string {
+ kc.discoverServices()
kc.lock.RLock()
defer kc.lock.RUnlock()
- return *kc.localRoots
+ return kc.localRoots
}
// GatewayRoots() returns the map of Keep remote gateway services:
// uuid -> baseURI.
func (kc *KeepClient) GatewayRoots() map[string]string {
+ kc.discoverServices()
kc.lock.RLock()
defer kc.lock.RUnlock()
- return *kc.gatewayRoots
+ return kc.gatewayRoots
}
// WritableLocalRoots() returns the map of writable local Keep services:
// uuid -> baseURI.
func (kc *KeepClient) WritableLocalRoots() map[string]string {
+ kc.discoverServices()
kc.lock.RLock()
defer kc.lock.RUnlock()
- return *kc.writableLocalRoots
+ return kc.writableLocalRoots
}
-// SetServiceRoots updates the localRoots and gatewayRoots maps,
-// without risk of disrupting operations that are already in progress.
+// SetServiceRoots disables service discovery and updates the
+// localRoots and gatewayRoots maps, without disrupting operations
+// that are already in progress.
//
-// The KeepClient makes its own copy of the supplied maps, so the
-// caller can reuse/modify them after SetServiceRoots returns, but
-// they should not be modified by any other goroutine while
-// SetServiceRoots is running.
-func (kc *KeepClient) SetServiceRoots(newLocals, newWritableLocals, newGateways map[string]string) {
- locals := make(map[string]string)
- for uuid, root := range newLocals {
- locals[uuid] = root
- }
-
- writables := make(map[string]string)
- for uuid, root := range newWritableLocals {
- writables[uuid] = root
- }
-
- gateways := make(map[string]string)
- for uuid, root := range newGateways {
- gateways[uuid] = root
- }
+// The supplied maps must not be modified after calling
+// SetServiceRoots.
+func (kc *KeepClient) SetServiceRoots(locals, writables, gateways map[string]string) {
+ kc.disableDiscovery = true
+ kc.setServiceRoots(locals, writables, gateways)
+}
+func (kc *KeepClient) setServiceRoots(locals, writables, gateways map[string]string) {
kc.lock.Lock()
defer kc.lock.Unlock()
- kc.localRoots = &locals
- kc.writableLocalRoots = &writables
- kc.gatewayRoots = &gateways
+ kc.localRoots = locals
+ kc.writableLocalRoots = writables
+ kc.gatewayRoots = gateways
}
// getSortedRoots returns a list of base URIs of Keep services, in the
}
}
+var (
+ // There are four global http.Client objects for the four
+ // possible permutations of TLS behavior (verify/skip-verify)
+ // and timeout settings (proxy/non-proxy).
+ defaultClient = map[bool]map[bool]HTTPClient{
+ // defaultClient[false] is used for verified TLS reqs
+ false: {},
+ // defaultClient[true] is used for unverified
+ // (insecure) TLS reqs
+ true: {},
+ }
+ defaultClientMtx sync.Mutex
+)
+
+// httpClient returns the HTTPClient field if it's not nil, otherwise
+// whichever of the four global http.Client objects is suitable for
+// the current environment (i.e., TLS verification on/off, keep
+// services are/aren't proxies).
+func (kc *KeepClient) httpClient() HTTPClient {
+ if kc.HTTPClient != nil {
+ return kc.HTTPClient
+ }
+ defaultClientMtx.Lock()
+ defer defaultClientMtx.Unlock()
+ if c, ok := defaultClient[kc.Arvados.ApiInsecure][kc.foundNonDiskSvc]; ok {
+ return c
+ }
+
+ var requestTimeout, connectTimeout, keepAlive, tlsTimeout time.Duration
+ if kc.foundNonDiskSvc {
+ // Use longer timeouts when connecting to a proxy,
+ // because this usually means the intervening network
+ // is slower.
+ requestTimeout = DefaultProxyRequestTimeout
+ connectTimeout = DefaultProxyConnectTimeout
+ tlsTimeout = DefaultProxyTLSHandshakeTimeout
+ keepAlive = DefaultProxyKeepAlive
+ } else {
+ requestTimeout = DefaultRequestTimeout
+ connectTimeout = DefaultConnectTimeout
+ tlsTimeout = DefaultTLSHandshakeTimeout
+ keepAlive = DefaultKeepAlive
+ }
+
+ transport, ok := http.DefaultTransport.(*http.Transport)
+ if ok {
+ copy := *transport
+ transport = &copy
+ } else {
+ // Evidently the application has replaced
+ // http.DefaultTransport with a different type, so we
+ // need to build our own from scratch using the Go 1.8
+ // defaults.
+ transport = &http.Transport{
+ MaxIdleConns: 100,
+ IdleConnTimeout: 90 * time.Second,
+ ExpectContinueTimeout: time.Second,
+ }
+ }
+ transport.DialContext = (&net.Dialer{
+ Timeout: connectTimeout,
+ KeepAlive: keepAlive,
+ DualStack: true,
+ }).DialContext
+ transport.TLSHandshakeTimeout = tlsTimeout
+ transport.TLSClientConfig = arvadosclient.MakeTLSConfig(kc.Arvados.ApiInsecure)
+ c := &http.Client{
+ Timeout: requestTimeout,
+ Transport: transport,
+ }
+ defaultClient[kc.Arvados.ApiInsecure][kc.foundNonDiskSvc] = c
+ return c
+}
+
type Locator struct {
Hash string
Size int // -1 if data size is not known
// Standalone tests
type StandaloneSuite struct{}
+func (s *StandaloneSuite) SetUpTest(c *C) {
+ RefreshServiceDiscovery()
+}
+
func pythonDir() string {
cwd, _ := os.Getwd()
return fmt.Sprintf("%s/../../python/tests", cwd)
arvadostest.StopAPI()
}
+func (s *ServerRequiredSuite) SetUpTest(c *C) {
+ RefreshServiceDiscovery()
+}
+
func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
arv, err := arvadosclient.MakeArvadosClient()
c.Assert(err, Equals, nil)
defer ks.listener.Close()
arv, err := arvadosclient.MakeArvadosClient()
- kc, _ := MakeKeepClient(arv)
+ c.Assert(err, IsNil)
+ kc, err := MakeKeepClient(arv)
+ c.Assert(err, IsNil)
arv.ApiToken = "abc123"
kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
r, err := kc.GetIndex("x", "")
- c.Check(err, Equals, nil)
+ c.Check(err, IsNil)
content, err2 := ioutil.ReadAll(r)
c.Check(err2, Equals, nil)
kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
r, err := kc.GetIndex("x", hash[0:3])
- c.Check(err, Equals, nil)
+ c.Assert(err, Equals, nil)
content, err2 := ioutil.ReadAll(r)
c.Check(err2, Equals, nil)
&blobKeepService)
c.Assert(err, Equals, nil)
defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
+ RefreshServiceDiscovery()
// Make a keepclient and ensure that the testblobstore is included
kc, err := MakeKeepClient(arv)
c.Assert(kc.replicasPerService, Equals, 0)
c.Assert(kc.foundNonDiskSvc, Equals, true)
- c.Assert(kc.Client.(*http.Client).Timeout, Equals, 300*time.Second)
+ c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second)
}
"io/ioutil"
"log"
"math/rand"
- "net"
"net/http"
"os"
- "regexp"
"strings"
- "time"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/streamer"
)
var DebugPrintf = func(string, ...interface{}) {}
func init() {
- var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
- if matchTrue.MatchString(os.Getenv("ARVADOS_DEBUG")) {
+ if arvadosclient.StringBool(os.Getenv("ARVADOS_DEBUG")) {
DebugPrintf = log.Printf
}
}
return fmt.Sprintf("%x", md5.Sum([]byte(s)))
}
-// Set timeouts applicable when connecting to non-disk services
-// (assumed to be over the Internet).
-func (*KeepClient) setClientSettingsNonDisk(client *http.Client) {
- // Maximum time to wait for a complete response
- client.Timeout = 300 * time.Second
-
- // TCP and TLS connection settings
- client.Transport = &http.Transport{
- Dial: (&net.Dialer{
- // The maximum time to wait to set up
- // the initial TCP connection.
- Timeout: 30 * time.Second,
-
- // The TCP keep alive heartbeat
- // interval.
- KeepAlive: 120 * time.Second,
- }).Dial,
-
- TLSHandshakeTimeout: 10 * time.Second,
- }
-}
-
-// Set timeouts applicable when connecting to keepstore services directly
-// (assumed to be on the local network).
-func (*KeepClient) setClientSettingsDisk(client *http.Client) {
- // Maximum time to wait for a complete response
- client.Timeout = 20 * time.Second
-
- // TCP and TLS connection timeouts
- client.Transport = &http.Transport{
- Dial: (&net.Dialer{
- // The maximum time to wait to set up
- // the initial TCP connection.
- Timeout: 2 * time.Second,
-
- // The TCP keep alive heartbeat
- // interval.
- KeepAlive: 180 * time.Second,
- }).Dial,
-
- TLSHandshakeTimeout: 4 * time.Second,
- }
-}
-
type svcList struct {
Items []keepService `json:"items"`
}
req.ContentLength = expectedLength
if expectedLength > 0 {
- // http.Client.Do will close the body ReadCloser when it is
- // done with it.
+ // Do() will close the body ReadCloser when it is done
+ // with it.
req.Body = body
} else {
// "For client requests, a value of 0 means unknown if Body is
req.Header.Add(X_Keep_Desired_Replicas, fmt.Sprint(this.Want_replicas))
var resp *http.Response
- if resp, err = this.Client.Do(req); err != nil {
+ if resp, err = this.httpClient().Do(req); err != nil {
DebugPrintf("DEBUG: [%08x] Upload failed %v error: %v", requestID, url, err.Error())
upload_status <- uploadStatus{err, url, 0, 0, ""}
return
class Arvados::V1::CollectionsController < ApplicationController
include DbCurrentTime
+ def self._index_requires_parameters
+ (super rescue {}).
+ merge({
+ include_trash: {
+ type: 'boolean', required: false, description: "Include collections whose is_trashed attribute is true."
+ },
+ })
+ end
+
+
def create
if resource_attrs[:uuid] and (loc = Keep::Locator.parse(resource_attrs[:uuid]))
resource_attrs[:portable_data_hash] = loc.to_s
end
def find_objects_for_index
- if params[:include_trash] || ['destroy', 'trash'].include?(action_name)
+ if params[:include_trash] || ['destroy', 'trash', 'untrash'].include?(action_name)
@objects = Collection.unscoped.readable_by(*@read_users)
end
super
show
end
+ def untrash
+ if @object.is_trashed
+ @object.update_attributes!(trash_at: nil)
+ else
+ raise InvalidStateTransitionError
+ end
+ show
+ end
+
def find_collections(visited, sp, &b)
case sp
when ArvadosModel
uuid: {
type: 'string', required: false, default: nil
},
+ recursive: {
+ type: 'boolean', required: false, description: 'Include contents from child groups recursively.'
+ },
})
params.delete(:select)
params
end
end
+ filter_by_owner = {}
+ if @object
+ if params['recursive']
+ filter_by_owner[:owner_uuid] = [@object.uuid] + @object.descendant_project_uuids
+ else
+ filter_by_owner[:owner_uuid] = @object.uuid
+ end
+ end
+
seen_last_class = false
klasses.each do |klass|
@offset = 0 if seen_last_class # reset offset for the new next type being processed
klass.default_orders.join(", ")
@select = nil
- where_conds = {}
- where_conds[:owner_uuid] = @object.uuid if @object
+ where_conds = filter_by_owner
if klass == Collection
@select = klass.selectable_attributes - ["manifest_text"]
elsif klass == Group
- where_conds[:group_class] = "project"
+ where_conds = where_conds.merge(group_class: "project")
end
@filters = request_filters.map do |col, op, val|
skip_before_filter :find_object_by_uuid, only: :accessible
skip_before_filter :render_404_if_no_object, only: :accessible
+ skip_before_filter :require_auth_scope, only: :accessible
def find_objects_for_index
# all users can list all keep services
# Check if any of the users are admin. If so, we're done.
if users_list.select { |u| u.is_admin }.any?
- return self
+ # Return existing relation with no new filters.
+ return where({})
end
# Collect the UUIDs of the authorized users.
log_reuse_info(candidates) { "after filtering on repo, script, and custom filters #{filters.inspect}" }
chosen = nil
+ chosen_output = nil
incomplete_job = nil
candidates.each do |j|
if j.state != Job::Complete
# Ignore: we have already decided not to reuse any completed
# job.
log_reuse_info { "job #{j.uuid} with output #{j.output} ignored, see above" }
+ elsif j.output.nil?
+ log_reuse_info { "job #{j.uuid} has nil output" }
+ elsif j.log.nil?
+ log_reuse_info { "job #{j.uuid} has nil log" }
elsif Rails.configuration.reuse_job_if_outputs_differ
- if Collection.readable_by(current_user).find_by_portable_data_hash(j.output)
- log_reuse_info { "job #{j.uuid} with output #{j.output} is reusable; decision is final." }
- return j
- else
- # Ignore: keep locking for an incomplete job or one whose
+ if !Collection.readable_by(current_user).find_by_portable_data_hash(j.output)
+ # Ignore: keep looking for an incomplete job or one whose
# output is readable.
log_reuse_info { "job #{j.uuid} output #{j.output} unavailable to user; continuing search" }
+ elsif !Collection.readable_by(current_user).find_by_portable_data_hash(j.log)
+ # Ignore: keep looking for an incomplete job or one whose
+ # log is readable.
+ log_reuse_info { "job #{j.uuid} log #{j.log} unavailable to user; continuing search" }
+ else
+ log_reuse_info { "job #{j.uuid} with output #{j.output} is reusable; decision is final." }
+ return j
end
- elsif chosen
- if chosen.output != j.output
+ elsif chosen_output
+ if chosen_output != j.output
# If two matching jobs produced different outputs, run a new
# job (or use one that's already running/queued) instead of
# choosing one arbitrarily.
# any further investigation of reusable jobs is futile.
log_reuse_info { "job #{j.uuid} output #{j.output} is unavailable to user; this means no finished job can be reused (see reuse_job_if_outputs_differ in application.default.yml)" }
chosen = false
+ elsif !Collection.readable_by(current_user).find_by_portable_data_hash(j.log)
+ # This user cannot read the log of this job, don't try to reuse the
+ # job but consider if the output is consistent.
+ log_reuse_info { "job #{j.uuid} log #{j.log} is unavailable to user; continuing search" }
+ chosen_output = j.output
else
log_reuse_info { "job #{j.uuid} with output #{j.output} can be reused; continuing search in case other candidates have different outputs" }
chosen = j
+ chosen_output = j.output
end
end
j = chosen || incomplete_job
# and perm_hash[:write] are true if this user can read and write
# objects owned by group_uuid.
def calculate_group_permissions
- conn = ActiveRecord::Base.connection
- self.class.transaction do
- # Check whether the temporary view has already been created
- # during this connection. If not, create it.
- conn.exec_query 'SAVEPOINT check_permission_view'
- begin
- conn.exec_query('SELECT 1 FROM permission_view LIMIT 0')
- rescue
- conn.exec_query 'ROLLBACK TO SAVEPOINT check_permission_view'
- sql = File.read(Rails.root.join('lib', 'create_permission_view.sql'))
- conn.exec_query(sql)
- ensure
- conn.exec_query 'RELEASE SAVEPOINT check_permission_view'
- end
- end
+ install_view('permission')
group_perms = {}
- conn.exec_query('SELECT target_owner_uuid, max(perm_level)
- FROM permission_view
- WHERE user_uuid = $1
- AND target_owner_uuid IS NOT NULL
- GROUP BY target_owner_uuid',
- # "name" arg is a query label that appears in logs:
- "group_permissions for #{uuid}",
- # "binds" arg is an array of [col_id, value] for '$1' vars:
- [[nil, uuid]],
- ).rows.each do |group_uuid, max_p_val|
+ ActiveRecord::Base.connection.
+ exec_query('SELECT target_owner_uuid, max(perm_level)
+ FROM permission_view
+ WHERE user_uuid = $1
+ AND target_owner_uuid IS NOT NULL
+ GROUP BY target_owner_uuid',
+ # "name" arg is a query label that appears in logs:
+ "group_permissions for #{uuid}",
+ # "binds" arg is an array of [col_id, value] for '$1' vars:
+ [[nil, uuid]],
+ ).rows.each do |group_uuid, max_p_val|
group_perms[group_uuid] = PERMS_FOR_VAL[max_p_val.to_i]
end
Rails.cache.write "groups_for_user_#{self.uuid}", group_perms
get 'provenance', on: :member
get 'used_by', on: :member
post 'trash', on: :member
+ post 'untrash', on: :member
end
resources :groups do
get 'contents', on: :collection
base.validate :restrict_uuid_change_breaking_associations
end
+ def descendant_project_uuids
+ install_view('ancestor')
+ ActiveRecord::Base.connection.
+ exec_query('SELECT ancestor_view.uuid
+ FROM ancestor_view
+ LEFT JOIN groups ON groups.uuid=ancestor_view.uuid
+ WHERE ancestor_uuid = $1 AND groups.group_class = $2',
+ # "name" arg is a query label that appears in logs:
+ "descendant_project_uuids for #{self.uuid}",
+ # "binds" arg is an array of [col_id, value] for '$1' vars:
+ [[nil, self.uuid], [nil, 'project']],
+ ).rows.map do |project_uuid,|
+ project_uuid
+ end
+ end
+
protected
def restrict_uuid_change_breaking_associations
end
end
+ def install_view(type)
+ conn = ActiveRecord::Base.connection
+ self.class.transaction do
+ # Check whether the temporary view has already been created
+ # during this connection. If not, create it.
+ conn.exec_query "SAVEPOINT check_#{type}_view"
+ begin
+ conn.exec_query("SELECT 1 FROM #{type}_view LIMIT 0")
+ rescue
+ conn.exec_query "ROLLBACK TO SAVEPOINT check_#{type}_view"
+ sql = File.read(Rails.root.join("lib", "create_#{type}_view.sql"))
+ conn.exec_query(sql)
+ ensure
+ conn.exec_query "RELEASE SAVEPOINT check_#{type}_view"
+ end
+ end
+ end
end
--- /dev/null
+CREATE TEMPORARY VIEW ancestor_view AS
+WITH RECURSIVE
+ancestor (uuid, ancestor_uuid) AS (
+ SELECT groups.uuid::varchar(32) AS uuid,
+ groups.owner_uuid::varchar(32) AS ancestor_uuid
+ FROM groups
+ UNION
+ SELECT ancestor.uuid::varchar(32) AS uuid,
+ groups.owner_uuid::varchar(32) AS ancestor_uuid
+ FROM ancestor
+ INNER JOIN groups
+ ON groups.uuid = ancestor.ancestor_uuid
+)
+SELECT * FROM ancestor;
trash_at: 2001-01-01T00:00:00Z
delete_at: 2038-01-01T00:00:00Z
manifest_text: ". 29d7797f1888013986899bc9083783fa+3 0:3:expired\n"
- name: unique_expired_collection
+ name: unique_expired_collection1
unique_expired_collection2:
uuid: zzzzz-4zz18-mto52zx1s7sn3jr
state: Complete
script_parameters_digest: a5f03bbfb8ba88a2efe4a7852671605b
+previous_job_run_nil_log:
+ uuid: zzzzz-8i9sb-cjs4pklxxjykqq3
+ created_at: <%= 14.minute.ago.to_s(:db) %>
+ finished_at: <%= 13.minutes.ago.to_s(:db) %>
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ repository: active/foo
+ script: hash
+ script_version: 4fe459abe02d9b365932b8f5dc419439ab4e2577
+ script_parameters:
+ input: fa7aeb5140e2848d39b416daeef4ffc5+45
+ an_integer: "3"
+ success: true
+ log: ~
+ output: ea10d51bcf88862dbcc36eb292017dfd+45
+ state: Complete
+ script_parameters_digest: 445702df4029b8a6e7075b451ff1256a
+
previous_ancient_job_run:
uuid: zzzzz-8i9sb-ahd7cie8jah9qui
created_at: <%= 366.days.ago.to_s(:db) %>
docker_image_locator: fa3c1a9cb6783f85f2ecda037e07b8c3+167
state: Complete
script_parameters_digest: a5f03bbfb8ba88a2efe4a7852671605b
+ log: ea10d51bcf88862dbcc36eb292017dfd+45
previous_ancient_docker_image_job_run:
uuid: zzzzz-8i9sb-t3b460aolxxuldl
output: ea10d51bcf88862dbcc36eb292017dfd+45
state: Complete
script_parameters_digest: a5f03bbfb8ba88a2efe4a7852671605b
+ log: ea10d51bcf88862dbcc36eb292017dfd+45
previous_job_run_no_output:
uuid: zzzzz-8i9sb-cjs4pklxxjykppp
assert_operator c.delete_at, :>=, time_before_trashing + Rails.configuration.default_trash_lifetime
end
end
+
+ test 'untrash a trashed collection' do
+ authorize_with :active
+ post :untrash, {
+ id: collections(:expired_collection).uuid,
+ }
+ assert_response 200
+ assert_equal false, json_response['is_trashed']
+ assert_nil json_response['trash_at']
+ end
+
+ test 'untrash error on not trashed collection' do
+ authorize_with :active
+ post :untrash, {
+ id: collections(:collection_owned_by_active).uuid,
+ }
+ assert_response 422
+ end
+
+ [:active, :admin].each do |user|
+ test "get trashed collections as #{user}" do
+ authorize_with user
+ get :index, {
+ filters: [["is_trashed", "=", true]],
+ include_trash: true,
+ }
+ assert_response :success
+
+ items = []
+ json_response["items"].each do |coll|
+ items << coll['uuid']
+ end
+
+ assert_includes(items, collections('unique_expired_collection')['uuid'])
+ if user == :admin
+ assert_includes(items, collections('unique_expired_collection2')['uuid'])
+ else
+ assert_not_includes(items, collections('unique_expired_collection2')['uuid'])
+ end
+ end
+ end
end
assert_operator(json_response['items'].count,
:<, json_response['items_available'])
end
+
+ test 'get contents, recursive=true' do
+ authorize_with :active
+ params = {
+ id: groups(:aproject).uuid,
+ recursive: true,
+ format: :json,
+ }
+ get :contents, params
+ owners = json_response['items'].map do |item|
+ item['owner_uuid']
+ end
+ assert_includes(owners, groups(:aproject).uuid)
+ assert_includes(owners, groups(:asubproject).uuid)
+ end
+
+ [false, nil].each do |recursive|
+ test "get contents, recursive=#{recursive.inspect}" do
+ authorize_with :active
+ params = {
+ id: groups(:aproject).uuid,
+ format: :json,
+ }
+ params[:recursive] = false if recursive == false
+ get :contents, params
+ owners = json_response['items'].map do |item|
+ item['owner_uuid']
+ end
+ assert_includes(owners, groups(:aproject).uuid)
+ refute_includes(owners, groups(:asubproject).uuid)
+ end
+ end
+
+ test 'get home project contents, recursive=true' do
+ authorize_with :active
+ get :contents, {
+ id: users(:active).uuid,
+ recursive: true,
+ format: :json,
+ }
+ owners = json_response['items'].map do |item|
+ item['owner_uuid']
+ end
+ assert_includes(owners, users(:active).uuid)
+ assert_includes(owners, groups(:aproject).uuid)
+ assert_includes(owners, groups(:asubproject).uuid)
+ end
end
assert_equal '4fe459abe02d9b365932b8f5dc419439ab4e2577', new_job['script_version']
end
+ test "no reuse job with null log" do
+ post :create, {
+ job: {
+ script: "hash",
+ script_version: "4fe459abe02d9b365932b8f5dc419439ab4e2577",
+ repository: "active/foo",
+ script_parameters: {
+ input: 'fa7aeb5140e2848d39b416daeef4ffc5+45',
+ an_integer: '3'
+ }
+ },
+ find_or_create: true
+ }
+ assert_response :success
+ assert_not_nil assigns(:object)
+ new_job = JSON.parse(@response.body)
+ assert_not_equal 'zzzzz-8i9sb-cjs4pklxxjykqq3', new_job['uuid']
+ assert_equal '4fe459abe02d9b365932b8f5dc419439ab4e2577', new_job['script_version']
+ end
+
test "reuse job with symbolic script_version" do
post :create, {
job: {
assert_equal true, assigns(:objects).any?
end
- [:admin, :active, :inactive, :anonymous].each do |u|
- test "accessible to #{u} user" do
- authorize_with u
+ [:admin, :active, :inactive, :anonymous, nil].each do |u|
+ test "accessible to #{u.inspect} user" do
+ authorize_with(u) if u
get :accessible
assert_response :success
assert_not_empty json_response['items']
refute_includes(discovery_doc['resources'][r]['methods'].keys(), 'create')
end
end
+
+ test "groups contents parameters" do
+ get :index
+ assert_response :success
+
+ discovery_doc = JSON.parse(@response.body)
+
+ group_index_params = discovery_doc['resources']['groups']['methods']['index']['parameters']
+ group_contents_params = discovery_doc['resources']['groups']['methods']['contents']['parameters']
+
+ assert_equal group_contents_params.keys.sort, (group_index_params.keys - ['select'] + ['uuid', 'recursive']).sort
+
+ recursive_param = group_contents_params['recursive']
+ assert_equal 'boolean', recursive_param['type']
+ assert_equal false, recursive_param['required']
+ assert_equal 'query', recursive_param['location']
+ end
+
+ test "collections index parameters" do
+ get :index
+ assert_response :success
+
+ discovery_doc = JSON.parse(@response.body)
+
+ specimens_index_params = discovery_doc['resources']['specimens']['methods']['index']['parameters'] # no changes from super
+ coll_index_params = discovery_doc['resources']['collections']['methods']['index']['parameters']
+
+ assert_equal coll_index_params.keys.sort, (specimens_index_params.keys + ['include_trash']).sort
+
+ include_trash_param = coll_index_params['include_trash']
+ assert_equal 'boolean', include_trash_param['type']
+ assert_equal false, include_trash_param['required']
+ assert_equal 'query', include_trash_param['location']
+ end
end
import (
"bytes"
"log"
- "net/http"
"os"
"strings"
"testing"
arv, err := arvadosclient.MakeArvadosClient()
arv.ApiToken = arvadostest.DataManagerToken
c.Assert(err, check.IsNil)
- s.keepClient = &keepclient.KeepClient{
- Arvados: arv,
- Client: &http.Client{},
- }
- c.Assert(s.keepClient.DiscoverKeepServers(), check.IsNil)
+
+ s.keepClient, err = keepclient.MakeKeepClient(arv)
+ c.Assert(err, check.IsNil)
s.putReplicas(c, "foo", 4)
s.putReplicas(c, "bar", 1)
}
--- /dev/null
+package main
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "github.com/hashicorp/golang-lru"
+)
+
+type cache struct {
+ TTL arvados.Duration
+ MaxCollectionEntries int
+ MaxCollectionBytes int64
+ MaxPermissionEntries int
+ MaxUUIDEntries int
+
+ stats cacheStats
+ pdhs *lru.TwoQueueCache
+ collections *lru.TwoQueueCache
+ permissions *lru.TwoQueueCache
+ setupOnce sync.Once
+}
+
+type cacheStats struct {
+ Requests uint64 `json:"Cache.Requests"`
+ CollectionBytes uint64 `json:"Cache.CollectionBytes"`
+ CollectionEntries int `json:"Cache.CollectionEntries"`
+ CollectionHits uint64 `json:"Cache.CollectionHits"`
+ PDHHits uint64 `json:"Cache.UUIDHits"`
+ PermissionHits uint64 `json:"Cache.PermissionHits"`
+ APICalls uint64 `json:"Cache.APICalls"`
+}
+
+type cachedPDH struct {
+ expire time.Time
+ pdh string
+}
+
+type cachedCollection struct {
+ expire time.Time
+ collection map[string]interface{}
+}
+
+type cachedPermission struct {
+ expire time.Time
+}
+
+func (c *cache) setup() {
+ var err error
+ c.pdhs, err = lru.New2Q(c.MaxUUIDEntries)
+ if err != nil {
+ panic(err)
+ }
+ c.collections, err = lru.New2Q(c.MaxCollectionEntries)
+ if err != nil {
+ panic(err)
+ }
+ c.permissions, err = lru.New2Q(c.MaxPermissionEntries)
+ if err != nil {
+ panic(err)
+ }
+}
+
+var selectPDH = map[string]interface{}{
+ "select": []string{"portable_data_hash"},
+}
+
+func (c *cache) Stats() cacheStats {
+ c.setupOnce.Do(c.setup)
+ return cacheStats{
+ Requests: atomic.LoadUint64(&c.stats.Requests),
+ CollectionBytes: c.collectionBytes(),
+ CollectionEntries: c.collections.Len(),
+ CollectionHits: atomic.LoadUint64(&c.stats.CollectionHits),
+ PDHHits: atomic.LoadUint64(&c.stats.PDHHits),
+ PermissionHits: atomic.LoadUint64(&c.stats.PermissionHits),
+ APICalls: atomic.LoadUint64(&c.stats.APICalls),
+ }
+}
+
+func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (map[string]interface{}, error) {
+ c.setupOnce.Do(c.setup)
+
+ atomic.AddUint64(&c.stats.Requests, 1)
+
+ permOK := false
+ permKey := arv.ApiToken + "\000" + targetID
+ if forceReload {
+ } else if ent, cached := c.permissions.Get(permKey); cached {
+ ent := ent.(*cachedPermission)
+ if ent.expire.Before(time.Now()) {
+ c.permissions.Remove(permKey)
+ } else {
+ permOK = true
+ atomic.AddUint64(&c.stats.PermissionHits, 1)
+ }
+ }
+
+ var pdh string
+ if arvadosclient.PDHMatch(targetID) {
+ pdh = targetID
+ } else if forceReload {
+ } else if ent, cached := c.pdhs.Get(targetID); cached {
+ ent := ent.(*cachedPDH)
+ if ent.expire.Before(time.Now()) {
+ c.pdhs.Remove(targetID)
+ } else {
+ pdh = ent.pdh
+ atomic.AddUint64(&c.stats.PDHHits, 1)
+ }
+ }
+
+ var collection map[string]interface{}
+ if pdh != "" {
+ collection = c.lookupCollection(pdh)
+ }
+
+ if collection != nil && permOK {
+ return collection, nil
+ } else if collection != nil {
+ // Ask API for current PDH for this targetID. Most
+ // likely, the cached PDH is still correct; if so,
+ // _and_ the current token has permission, we can
+ // use our cached manifest.
+ atomic.AddUint64(&c.stats.APICalls, 1)
+ var current map[string]interface{}
+ err := arv.Get("collections", targetID, selectPDH, &current)
+ if err != nil {
+ return nil, err
+ }
+ if checkPDH, ok := current["portable_data_hash"].(string); !ok {
+ return nil, fmt.Errorf("API response for %q had no PDH", targetID)
+ } else if checkPDH == pdh {
+ exp := time.Now().Add(time.Duration(c.TTL))
+ c.permissions.Add(permKey, &cachedPermission{
+ expire: exp,
+ })
+ if pdh != targetID {
+ c.pdhs.Add(targetID, &cachedPDH{
+ expire: exp,
+ pdh: pdh,
+ })
+ }
+ return collection, err
+ } else {
+ // PDH changed, but now we know we have
+ // permission -- and maybe we already have the
+ // new PDH in the cache.
+ if coll := c.lookupCollection(checkPDH); coll != nil {
+ return coll, nil
+ }
+ }
+ }
+
+ // Collection manifest is not cached.
+ atomic.AddUint64(&c.stats.APICalls, 1)
+ err := arv.Get("collections", targetID, nil, &collection)
+ if err != nil {
+ return nil, err
+ }
+ pdh, ok := collection["portable_data_hash"].(string)
+ if !ok {
+ return nil, fmt.Errorf("API response for %q had no PDH", targetID)
+ }
+ exp := time.Now().Add(time.Duration(c.TTL))
+ c.permissions.Add(permKey, &cachedPermission{
+ expire: exp,
+ })
+ c.pdhs.Add(targetID, &cachedPDH{
+ expire: exp,
+ pdh: pdh,
+ })
+ c.collections.Add(pdh, &cachedCollection{
+ expire: exp,
+ collection: collection,
+ })
+ if int64(len(collection["manifest_text"].(string))) > c.MaxCollectionBytes/int64(c.MaxCollectionEntries) {
+ go c.pruneCollections()
+ }
+ return collection, nil
+}
+
+// pruneCollections checks the total bytes occupied by manifest_text
+// in the collection cache and removes old entries as needed to bring
+// the total size down to CollectionBytes. It also deletes all expired
+// entries.
+//
+// pruneCollections does not aim to be perfectly correct when there is
+// concurrent cache activity.
+func (c *cache) pruneCollections() {
+ var size int64
+ now := time.Now()
+ keys := c.collections.Keys()
+ entsize := make([]int, len(keys))
+ expired := make([]bool, len(keys))
+ for i, k := range keys {
+ v, ok := c.collections.Peek(k)
+ if !ok {
+ continue
+ }
+ ent := v.(*cachedCollection)
+ n := len(ent.collection["manifest_text"].(string))
+ size += int64(n)
+ entsize[i] = n
+ expired[i] = ent.expire.Before(now)
+ }
+ for i, k := range keys {
+ if expired[i] {
+ c.collections.Remove(k)
+ size -= int64(entsize[i])
+ }
+ }
+ for i, k := range keys {
+ if size <= c.MaxCollectionBytes {
+ break
+ }
+ if expired[i] {
+ // already removed this entry in the previous loop
+ continue
+ }
+ c.collections.Remove(k)
+ size -= int64(entsize[i])
+ }
+}
+
+// collectionBytes returns the approximate memory size of the
+// collection cache.
+func (c *cache) collectionBytes() uint64 {
+ var size uint64
+ for _, k := range c.collections.Keys() {
+ v, ok := c.collections.Peek(k)
+ if !ok {
+ continue
+ }
+ size += uint64(len(v.(*cachedCollection).collection["manifest_text"].(string)))
+ }
+ return size
+}
+
+func (c *cache) lookupCollection(pdh string) map[string]interface{} {
+ if pdh == "" {
+ return nil
+ } else if ent, cached := c.collections.Get(pdh); !cached {
+ return nil
+ } else {
+ ent := ent.(*cachedCollection)
+ if ent.expire.Before(time.Now()) {
+ c.collections.Remove(pdh)
+ return nil
+ } else {
+ atomic.AddUint64(&c.stats.CollectionHits, 1)
+ return ent.collection
+ }
+ }
+}
--- /dev/null
+package main
+
+import (
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "gopkg.in/check.v1"
+)
+
+func (s *UnitSuite) TestCache(c *check.C) {
+ arv, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, check.Equals, nil)
+
+ cache := DefaultConfig().Cache
+
+ // Hit the same collection 5 times using the same token. Only
+ // the first req should cause an API call; the next 4 should
+ // hit all caches.
+ arv.ApiToken = arvadostest.AdminToken
+ for i := 0; i < 5; i++ {
+ coll, err := cache.Get(arv, arvadostest.FooCollection, false)
+ c.Check(err, check.Equals, nil)
+ c.Assert(coll, check.NotNil)
+ c.Check(coll["portable_data_hash"], check.Equals, arvadostest.FooPdh)
+ c.Check(coll["manifest_text"].(string)[:2], check.Equals, ". ")
+ }
+ c.Check(cache.Stats().Requests, check.Equals, uint64(5))
+ c.Check(cache.Stats().CollectionHits, check.Equals, uint64(4))
+ c.Check(cache.Stats().PermissionHits, check.Equals, uint64(4))
+ c.Check(cache.Stats().PDHHits, check.Equals, uint64(4))
+ c.Check(cache.Stats().APICalls, check.Equals, uint64(1))
+
+ // Hit the same collection 2 more times, this time requesting
+ // it by PDH and using a different token. The first req should
+ // miss the permission cache. Both reqs should hit the
+ // Collection cache and skip the API lookup.
+ arv.ApiToken = arvadostest.ActiveToken
+ for i := 0; i < 2; i++ {
+ coll, err := cache.Get(arv, arvadostest.FooPdh, false)
+ c.Check(err, check.Equals, nil)
+ c.Assert(coll, check.NotNil)
+ c.Check(coll["portable_data_hash"], check.Equals, arvadostest.FooPdh)
+ c.Check(coll["manifest_text"].(string)[:2], check.Equals, ". ")
+ }
+ c.Check(cache.Stats().Requests, check.Equals, uint64(5+2))
+ c.Check(cache.Stats().CollectionHits, check.Equals, uint64(4+2))
+ c.Check(cache.Stats().PermissionHits, check.Equals, uint64(4+1))
+ c.Check(cache.Stats().PDHHits, check.Equals, uint64(4+0))
+ c.Check(cache.Stats().APICalls, check.Equals, uint64(1+1))
+
+ // Alternating between two collections N times should produce
+ // only 2 more API calls.
+ arv.ApiToken = arvadostest.AdminToken
+ for i := 0; i < 20; i++ {
+ var target string
+ if i%2 == 0 {
+ target = arvadostest.HelloWorldCollection
+ } else {
+ target = arvadostest.FooBarDirCollection
+ }
+ _, err := cache.Get(arv, target, false)
+ c.Check(err, check.Equals, nil)
+ }
+ c.Check(cache.Stats().Requests, check.Equals, uint64(5+2+20))
+ c.Check(cache.Stats().CollectionHits, check.Equals, uint64(4+2+18))
+ c.Check(cache.Stats().PermissionHits, check.Equals, uint64(4+1+18))
+ c.Check(cache.Stats().PDHHits, check.Equals, uint64(4+0+18))
+ c.Check(cache.Stats().APICalls, check.Equals, uint64(1+1+2))
+}
+
+func (s *UnitSuite) TestCacheForceReloadByPDH(c *check.C) {
+ arv, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, check.Equals, nil)
+
+ cache := DefaultConfig().Cache
+
+ for _, forceReload := range []bool{false, true, false, true} {
+ _, err := cache.Get(arv, arvadostest.FooPdh, forceReload)
+ c.Check(err, check.Equals, nil)
+ }
+
+ c.Check(cache.Stats().Requests, check.Equals, uint64(4))
+ c.Check(cache.Stats().CollectionHits, check.Equals, uint64(3))
+ c.Check(cache.Stats().PermissionHits, check.Equals, uint64(1))
+ c.Check(cache.Stats().PDHHits, check.Equals, uint64(0))
+ c.Check(cache.Stats().APICalls, check.Equals, uint64(3))
+}
+
+func (s *UnitSuite) TestCacheForceReloadByUUID(c *check.C) {
+ arv, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, check.Equals, nil)
+
+ cache := DefaultConfig().Cache
+
+ for _, forceReload := range []bool{false, true, false, true} {
+ _, err := cache.Get(arv, arvadostest.FooCollection, forceReload)
+ c.Check(err, check.Equals, nil)
+ }
+
+ c.Check(cache.Stats().Requests, check.Equals, uint64(4))
+ c.Check(cache.Stats().CollectionHits, check.Equals, uint64(1))
+ c.Check(cache.Stats().PermissionHits, check.Equals, uint64(1))
+ c.Check(cache.Stats().PDHHits, check.Equals, uint64(1))
+ c.Check(cache.Stats().APICalls, check.Equals, uint64(3))
+}
package main
import (
+ "encoding/json"
"fmt"
"html"
"io"
func (h *handler) setup() {
h.clientPool = arvadosclient.MakeClientPool()
+ keepclient.RefreshServiceDiscoveryOnSIGHUP()
+}
+
+// serveStatus writes the collection cache statistics as a JSON object.
+// It is wired to GET /status.json in ServeHTTP. Encoding errors are
+// ignored; the response is best-effort monitoring output.
+func (h *handler) serveStatus(w http.ResponseWriter, r *http.Request) {
+ // Wrap cacheStats in an anonymous struct so the JSON layout can grow
+ // additional sections later without changing this call site.
+ status := struct {
+ cacheStats
+ }{
+ cacheStats: h.Config.Cache.Stats(),
+ }
+ json.NewEncoder(w).Encode(status)
}
// ServeHTTP implements http.Handler.
// http://ID.collections.example/PATH...
credentialsOK = true
targetPath = pathParts
+ } else if r.URL.Path == "/status.json" {
+ h.serveStatus(w, r)
+ return
} else if len(pathParts) >= 2 && strings.HasPrefix(pathParts[0], "c=") {
// /c=ID/PATH...
targetID = parseCollectionIDFromURL(pathParts[0][2:])
targetPath = targetPath[1:]
}
+ forceReload := false
+ if cc := r.Header.Get("Cache-Control"); strings.Contains(cc, "no-cache") || strings.Contains(cc, "must-revalidate") {
+ forceReload = true
+ }
+
+ var collection map[string]interface{}
tokenResult := make(map[string]int)
- collection := make(map[string]interface{})
found := false
for _, arv.ApiToken = range tokens {
- err := arv.Get("collections", targetID, nil, &collection)
+ var err error
+ collection, err = h.Config.Cache.Get(arv, targetID, forceReload)
if err == nil {
// Success
found = true
statusCode, statusText = http.StatusInternalServerError, err.Error()
return
}
- if client, ok := kc.Client.(*http.Client); ok && client.Transport != nil {
- // Workaround for https://dev.arvados.org/issues/9005
- if t, ok := client.Transport.(*http.Transport); ok {
- t.DisableKeepAlives = true
- }
- }
rdr, err := kc.CollectionFileReader(collection, filename)
if os.IsNotExist(err) {
statusCode = http.StatusNotFound
type UnitSuite struct{}
func (s *UnitSuite) TestCORSPreflight(c *check.C) {
- h := handler{Config: &Config{}}
+ h := handler{Config: DefaultConfig()}
u, _ := url.Parse("http://keep-web.example/c=" + arvadostest.FooCollection + "/foo")
req := &http.Request{
Method: "OPTIONS",
RequestURI: u.RequestURI(),
}
resp := httptest.NewRecorder()
- h := handler{Config: &Config{
- AnonymousTokens: []string{arvadostest.AnonymousToken},
- }}
+ cfg := DefaultConfig()
+ cfg.AnonymousTokens = []string{arvadostest.AnonymousToken}
+ h := handler{Config: cfg}
h.ServeHTTP(resp, req)
c.Check(resp.Code, check.Equals, http.StatusNotFound)
}
"flag"
"log"
"os"
+ "time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/config"
AttachmentOnlyHost string
TrustAllContent bool
+ Cache cache
+
// Hack to support old command line flag, which is a bool
// meaning "get actual token from environment".
deprecatedAllowAnonymous bool
// DefaultConfig returns the default configuration, including cache
// limits suitable for a typical keep-web deployment.
func DefaultConfig() *Config {
 return &Config{
 Listen: ":80",
+ // Cache defaults: 5-minute TTL, ~100 MB of collection data, and
+ // 1000 entries per cache (collections, permissions, UUID→PDH).
+ Cache: cache{
+ TTL: arvados.Duration(5 * time.Minute),
+ MaxCollectionEntries: 1000,
+ MaxCollectionBytes: 100000000,
+ MaxPermissionEntries: 1000,
+ MaxUUIDEntries: 1000,
+ },
 }
}
func (s *IntegrationSuite) SetUpTest(c *check.C) {
 arvadostest.ResetEnv()
- s.testServer = &server{Config: &Config{
- Client: arvados.Client{
- APIHost: testAPIHost,
- Insecure: true,
- },
- Listen: "127.0.0.1:0",
- }}
+ // Start from DefaultConfig so newly added fields (notably the cache
+ // limits) get working defaults instead of zero values, then override
+ // only what the test environment needs.
+ cfg := DefaultConfig()
+ cfg.Client = arvados.Client{
+ APIHost: testAPIHost,
+ Insecure: true,
+ }
+ // Port 0: let the OS pick a free port for each test run.
+ cfg.Listen = "127.0.0.1:0"
+ s.testServer = &server{Config: cfg}
 err := s.testServer.Start()
 c.Assert(err, check.Equals, nil)
}
--- /dev/null
+package main
+
+import (
+ "encoding/json"
+ "net/http"
+ "net/http/httptest"
+ "net/url"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "gopkg.in/check.v1"
+)
+
+// TestStatus checks that GET /status.json on the bare keep-web host
+// returns 200 and a JSON document containing cache statistics; with a
+// fresh handler the request counter must be zero.
+func (s *UnitSuite) TestStatus(c *check.C) {
+ h := handler{Config: DefaultConfig()}
+ u, _ := url.Parse("http://keep-web.example/status.json")
+ req := &http.Request{
+ Method: "GET",
+ Host: u.Host,
+ URL: u,
+ RequestURI: u.RequestURI(),
+ }
+ resp := httptest.NewRecorder()
+ h.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+
+ var status map[string]interface{}
+ err := json.NewDecoder(resp.Body).Decode(&status)
+ c.Check(err, check.IsNil)
+ // encoding/json decodes all JSON numbers into float64.
+ c.Check(status["Cache.Requests"], check.Equals, float64(0))
+}
+
+// TestNoStatusFromVHost ensures /status.json is NOT served on a
+// collection virtual host (UUID--keep-web.example): there it must be
+// treated as a file path inside the collection and yield 404, so the
+// status endpoint cannot leak through content-serving hostnames.
+func (s *IntegrationSuite) TestNoStatusFromVHost(c *check.C) {
+ u, _ := url.Parse("http://" + arvadostest.FooCollection + "--keep-web.example/status.json")
+ req := &http.Request{
+ Method: "GET",
+ Host: u.Host,
+ URL: u,
+ RequestURI: u.RequestURI(),
+ Header: http.Header{
+ "Authorization": {"OAuth2 " + arvadostest.ActiveToken},
+ },
+ }
+ resp := httptest.NewRecorder()
+ s.testServer.Handler.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, http.StatusNotFound)
+}
Serve non-public content from a single origin. Dangerous: read
docs before using!
+Cache.TTL:
+
+ Maximum time to cache collection data and permission checks.
+
+Cache.MaxCollectionEntries:
+
+ Maximum number of collection cache entries.
+
+Cache.MaxCollectionBytes:
+
+ Approximate memory limit for collection cache.
+
+Cache.MaxPermissionEntries:
+
+ Maximum number of permission cache entries.
+
+Cache.MaxUUIDEntries:
+
+ Maximum number of UUID cache entries.
+
`, exampleConfigFile)
}
if err != nil {
log.Fatalf("Error setting up keep client %s", err.Error())
}
+ keepclient.RefreshServiceDiscoveryOnSIGHUP()
if cfg.PIDFile != "" {
f, err := os.Create(cfg.PIDFile)
if cfg.DefaultReplicas > 0 {
kc.Want_replicas = cfg.DefaultReplicas
}
- kc.Client.(*http.Client).Timeout = time.Duration(cfg.Timeout)
- go kc.RefreshServices(5*time.Minute, 3*time.Second)
listener, err = net.Listen("tcp", cfg.Listen)
if err != nil {
signal.Notify(term, syscall.SIGINT)
// Start serving requests.
- router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc)
+ router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout))
http.Serve(listener, router)
log.Println("shutting down")
http.Handler
*keepclient.KeepClient
*ApiTokenCache
+ timeout time.Duration
+ transport *http.Transport
}
// MakeRESTRouter returns an http.Handler that passes GET and PUT
// requests to the appropriate handlers.
-func MakeRESTRouter(enable_get bool, enable_put bool, kc *keepclient.KeepClient) http.Handler {
+func MakeRESTRouter(enable_get bool, enable_put bool, kc *keepclient.KeepClient, timeout time.Duration) http.Handler {
rest := mux.NewRouter()
+
+ transport := *(http.DefaultTransport.(*http.Transport))
+ transport.DialContext = (&net.Dialer{
+ Timeout: keepclient.DefaultConnectTimeout,
+ KeepAlive: keepclient.DefaultKeepAlive,
+ DualStack: true,
+ }).DialContext
+ transport.TLSClientConfig = arvadosclient.MakeTLSConfig(kc.Arvados.ApiInsecure)
+ transport.TLSHandshakeTimeout = keepclient.DefaultTLSHandshakeTimeout
+
h := &proxyHandler{
Handler: rest,
KeepClient: kc,
+ timeout: timeout,
+ transport: &transport,
ApiTokenCache: &ApiTokenCache{
tokens: make(map[string]int64),
expireTime: 300,
}
}()
- kc := *h.KeepClient
- kc.Client = &proxyClient{client: kc.Client, proto: req.Proto}
+ kc := h.makeKeepClient(req)
var pass bool
var tok string
- if pass, tok = CheckAuthorizationHeader(&kc, h.ApiTokenCache, req); !pass {
+ if pass, tok = CheckAuthorizationHeader(kc, h.ApiTokenCache, req); !pass {
status, err = http.StatusForbidden, BadAuthorizationHeader
return
}
SetCorsHeaders(resp)
resp.Header().Set("Via", "HTTP/1.1 "+viaAlias)
- kc := *h.KeepClient
- kc.Client = &proxyClient{client: kc.Client, proto: req.Proto}
+ kc := h.makeKeepClient(req)
var err error
var expectLength int64
var pass bool
var tok string
- if pass, tok = CheckAuthorizationHeader(&kc, h.ApiTokenCache, req); !pass {
+ if pass, tok = CheckAuthorizationHeader(kc, h.ApiTokenCache, req); !pass {
err = BadAuthorizationHeader
status = http.StatusForbidden
return
}
}()
- kc := *h.KeepClient
-
- ok, token := CheckAuthorizationHeader(&kc, h.ApiTokenCache, req)
+ kc := h.makeKeepClient(req)
+ ok, token := CheckAuthorizationHeader(kc, h.ApiTokenCache, req)
if !ok {
status, err = http.StatusForbidden, BadAuthorizationHeader
return
status = http.StatusOK
resp.Write([]byte("\n"))
}
+
+// makeKeepClient returns a per-request copy of the shared KeepClient
+// whose HTTP client carries the proxy's configured timeout, the shared
+// transport (so connection pooling is preserved across requests), and a
+// proxyClient wrapper that records the inbound protocol for Via headers.
+func (h *proxyHandler) makeKeepClient(req *http.Request) *keepclient.KeepClient {
+ // Shallow copy: safe to customize HTTPClient without affecting other
+ // requests sharing h.KeepClient.
+ kc := *h.KeepClient
+ kc.HTTPClient = &proxyClient{
+ client: &http.Client{
+ Timeout: h.timeout,
+ Transport: h.transport,
+ },
+ proto: req.Proto,
+ }
+ return &kc
+}
// fixes the invalid Content-Length header. In order to test
// our server behavior, we have to call the handler directly
// using an httptest.ResponseRecorder.
- rtr := MakeRESTRouter(true, true, kc)
+ rtr := MakeRESTRouter(true, true, kc, 10*time.Second)
type testcase struct {
sendLength string
keepClient := &keepclient.KeepClient{
Arvados: &arvadosclient.ArvadosClient{},
Want_replicas: 1,
- Client: &http.Client{},
}
// Initialize the pullq and worker
"errors"
"io"
"io/ioutil"
- "net/http"
"os"
"strings"
"testing"
// start api and keep servers
arvadostest.StartAPI()
arvadostest.StartKeep(2, false)
+ keepclient.RefreshServiceDiscovery()
// make arvadosclient
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
- t.Error("Error creating arv")
+ t.Fatalf("Error creating arv: %s", err)
}
// keep client
- keepClient = &keepclient.KeepClient{
- Arvados: arv,
- Want_replicas: 1,
- Client: &http.Client{},
+ keepClient, err = keepclient.MakeKeepClient(arv)
+ if err != nil {
+ t.Fatalf("error creating KeepClient: %s", err)
}
+ keepClient.Want_replicas = 1
// discover keep services
var servers []string
- if err := keepClient.DiscoverKeepServers(); err != nil {
- t.Error("Error discovering keep services")
- }
for _, host := range keepClient.LocalRoots() {
servers = append(servers, host)
}
"errors"
"flag"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
"io/ioutil"
"log"
"net/http"
"os"
- "regexp"
"strings"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
)
func main() {
return
}
-var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
-
// Read config from file
func readConfigFromFile(filename string) (config apiConfig, blobSigningKey string, err error) {
if !strings.Contains(filename, "/") {
case "ARVADOS_API_HOST":
config.APIHost = value
case "ARVADOS_API_HOST_INSECURE":
- config.APIHostInsecure = matchTrue.MatchString(value)
+ config.APIHostInsecure = arvadosclient.StringBool(value)
case "ARVADOS_EXTERNAL_CLIENT":
- config.ExternalClient = matchTrue.MatchString(value)
+ config.ExternalClient = arvadosclient.StringBool(value)
case "ARVADOS_BLOB_SIGNING_KEY":
blobSigningKey = value
}
External: config.ExternalClient,
}
- // if keepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
+ // If keepServicesJSON is provided, use it instead of service discovery
if keepServicesJSON == "" {
kc, err = keepclient.MakeKeepClient(&arv)
if err != nil {
"testing"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
func (s *DoMainTestSuite) SetUpTest(c *C) {
logOutput := io.MultiWriter(&logBuffer)
log.SetOutput(logOutput)
+ keepclient.RefreshServiceDiscovery()
}
func (s *DoMainTestSuite) TearDownTest(c *C) {
var config apiConfig
config.APIHost = os.Getenv("ARVADOS_API_HOST")
config.APIToken = arvadostest.DataManagerToken
- config.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
+ config.APIHostInsecure = arvadosclient.StringBool(os.Getenv("ARVADOS_API_HOST_INSECURE"))
// Start Keep servers
arvadostest.StartKeep(2, enforcePermissions)
kc, ttl, err = setupKeepClient(config, keepServicesJSON, ttl)
c.Assert(ttl, Equals, blobSignatureTTL)
c.Check(err, IsNil)
+
+ keepclient.RefreshServiceDiscovery()
}
// Setup test data
// checkErrorLog asserts that, for every block hash, the captured log
// buffer contains a line matching prefix…hash…suffix.
func checkErrorLog(c *C, blocks []string, prefix, suffix string) {
 for _, hash := range blocks {
- expected := prefix + `.*` + hash + `.*` + suffix
- match, _ := regexp.MatchString(expected, logBuffer.String())
- c.Assert(match, Equals, true)
+ // (?ms) makes . match newlines and the leading/trailing .* let the
+ // anchored gocheck Matches checker find the pattern anywhere in a
+ // multi-line buffer; Check (not Assert) reports every missing hash.
+ expected := `(?ms).*` + prefix + `.*` + hash + `.*` + suffix + `.*`
+ c.Check(logBuffer.String(), Matches, expected)
 }
}
c.Assert(config.APIHost, Equals, os.Getenv("ARVADOS_API_HOST"))
c.Assert(config.APIToken, Equals, arvadostest.DataManagerToken)
- c.Assert(config.APIHostInsecure, Equals, matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")))
+ c.Assert(config.APIHostInsecure, Equals, arvadosclient.StringBool(os.Getenv("ARVADOS_API_HOST_INSECURE")))
c.Assert(config.ExternalClient, Equals, false)
c.Assert(blobSigningKey, Equals, "abcdefg")
}
log.Fatal(err)
}
kc.Want_replicas = *Replicas
- kc.Client.(*http.Client).Timeout = 10 * time.Minute
+
+ transport := *(http.DefaultTransport.(*http.Transport))
+ transport.TLSClientConfig = arvadosclient.MakeTLSConfig(arv.ApiInsecure)
+ kc.HTTPClient = &http.Client{
+ Timeout: 10 * time.Minute,
+ Transport: &transport,
+ }
overrideServices(kc)
"errors"
"flag"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
"io/ioutil"
"log"
"net/http"
"os"
- "regexp"
"strings"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
)
func main() {
return
}
-var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
-
// Read config from file
func readConfigFromFile(filename string) (config apiConfig, blobSigningKey string, err error) {
if !strings.Contains(filename, "/") {
case "ARVADOS_API_HOST":
config.APIHost = value
case "ARVADOS_API_HOST_INSECURE":
- config.APIHostInsecure = matchTrue.MatchString(value)
+ config.APIHostInsecure = arvadosclient.StringBool(value)
case "ARVADOS_EXTERNAL_CLIENT":
- config.ExternalClient = matchTrue.MatchString(value)
+ config.ExternalClient = arvadosclient.StringBool(value)
case "ARVADOS_BLOB_SIGNING_KEY":
blobSigningKey = value
}
External: config.ExternalClient,
}
- // if keepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
+ // If keepServicesJSON is provided, use it instead of service discovery
if keepServicesJSON == "" {
kc, err = keepclient.MakeKeepClient(&arv)
if err != nil {
"crypto/md5"
"fmt"
"io/ioutil"
- "log"
"os"
"strings"
"testing"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
. "gopkg.in/check.v1"
)
+var kcSrc, kcDst *keepclient.KeepClient
+var srcKeepServicesJSON, dstKeepServicesJSON, blobSigningKey string
+var blobSignatureTTL = time.Duration(2*7*24) * time.Hour
+
+// resetGlobals clears the package-level test fixtures (keep clients,
+// service JSON, signing key) so each test starts from a clean slate.
+// Called from every suite's SetUpTest.
+func resetGlobals() {
+ blobSigningKey = ""
+ srcKeepServicesJSON = ""
+ dstKeepServicesJSON = ""
+ // nil rather than empty KeepClient structs: tests must set these up
+ // explicitly (e.g. via setupKeepClient) before use.
+ kcSrc = nil
+ kcDst = nil
+}
+
// Gocheck boilerplate
func Test(t *testing.T) {
TestingT(t)
}
-// Gocheck boilerplate
var _ = Suite(&ServerRequiredSuite{})
var _ = Suite(&ServerNotRequiredSuite{})
var _ = Suite(&DoMainTestSuite{})
-// Tests that require the Keep server running
type ServerRequiredSuite struct{}
-type ServerNotRequiredSuite struct{}
-type DoMainTestSuite struct{}
func (s *ServerRequiredSuite) SetUpSuite(c *C) {
- // Start API server
arvadostest.StartAPI()
}
arvadostest.ResetEnv()
}
-var initialArgs []string
-
-func (s *DoMainTestSuite) SetUpSuite(c *C) {
- initialArgs = os.Args
-}
-
-var kcSrc, kcDst *keepclient.KeepClient
-var srcKeepServicesJSON, dstKeepServicesJSON, blobSigningKey string
-var blobSignatureTTL = time.Duration(2*7*24) * time.Hour
-
func (s *ServerRequiredSuite) SetUpTest(c *C) {
- // reset all variables between tests
- blobSigningKey = ""
- srcKeepServicesJSON = ""
- dstKeepServicesJSON = ""
- kcSrc = &keepclient.KeepClient{}
- kcDst = &keepclient.KeepClient{}
+ resetGlobals()
}
func (s *ServerRequiredSuite) TearDownTest(c *C) {
arvadostest.StopKeep(3)
}
+func (s *ServerNotRequiredSuite) SetUpTest(c *C) {
+ resetGlobals()
+}
+
+type ServerNotRequiredSuite struct{}
+
+type DoMainTestSuite struct {
+ initialArgs []string
+}
+
func (s *DoMainTestSuite) SetUpTest(c *C) {
- args := []string{"keep-rsync"}
- os.Args = args
+ // Save os.Args on the suite (instead of a package global) so
+ // TearDownTest can restore it, then fake a bare keep-rsync
+ // invocation for flag parsing in doMain.
+ s.initialArgs = os.Args
+ os.Args = []string{"keep-rsync"}
+ resetGlobals()
 }
func (s *DoMainTestSuite) TearDownTest(c *C) {
- os.Args = initialArgs
+ // Restore the real command line saved in SetUpTest.
+ os.Args = s.initialArgs
 }
var testKeepServicesJSON = "{ \"kind\":\"arvados#keepServiceList\", \"etag\":\"\", \"self_link\":\"\", \"offset\":null, \"limit\":null, \"items\":[ { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012340\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012340\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25107, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false }, { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012341\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012341\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25108, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false } ], \"items_available\":2 }"
var srcConfig apiConfig
srcConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
srcConfig.APIToken = arvadostest.DataManagerToken
- srcConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
+ srcConfig.APIHostInsecure = arvadosclient.StringBool(os.Getenv("ARVADOS_API_HOST_INSECURE"))
// dstConfig
var dstConfig apiConfig
dstConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
dstConfig.APIToken = arvadostest.DataManagerToken
- dstConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
+ dstConfig.APIHostInsecure = arvadosclient.StringBool(os.Getenv("ARVADOS_API_HOST_INSECURE"))
if enforcePermissions {
blobSigningKey = arvadostest.BlobSigningKey
// Start Keep servers
arvadostest.StartKeep(3, enforcePermissions)
+ keepclient.RefreshServiceDiscovery()
// setup keepclients
var err error
kcSrc, _, err = setupKeepClient(srcConfig, srcKeepServicesJSON, false, 0, blobSignatureTTL)
- c.Check(err, IsNil)
+ c.Assert(err, IsNil)
kcDst, _, err = setupKeepClient(dstConfig, dstKeepServicesJSON, true, replications, 0)
- c.Check(err, IsNil)
+ c.Assert(err, IsNil)
- for uuid := range kcSrc.LocalRoots() {
+ srcRoots := map[string]string{}
+ dstRoots := map[string]string{}
+ for uuid, root := range kcSrc.LocalRoots() {
if strings.HasSuffix(uuid, "02") {
- delete(kcSrc.LocalRoots(), uuid)
+ dstRoots[uuid] = root
+ } else {
+ srcRoots[uuid] = root
}
}
- for uuid := range kcSrc.GatewayRoots() {
- if strings.HasSuffix(uuid, "02") {
- delete(kcSrc.GatewayRoots(), uuid)
- }
+ if srcKeepServicesJSON == "" {
+ kcSrc.SetServiceRoots(srcRoots, srcRoots, srcRoots)
}
- for uuid := range kcSrc.WritableLocalRoots() {
- if strings.HasSuffix(uuid, "02") {
- delete(kcSrc.WritableLocalRoots(), uuid)
- }
- }
-
- for uuid := range kcDst.LocalRoots() {
- if strings.HasSuffix(uuid, "00") || strings.HasSuffix(uuid, "01") {
- delete(kcDst.LocalRoots(), uuid)
- }
- }
- for uuid := range kcDst.GatewayRoots() {
- if strings.HasSuffix(uuid, "00") || strings.HasSuffix(uuid, "01") {
- delete(kcDst.GatewayRoots(), uuid)
- }
- }
- for uuid := range kcDst.WritableLocalRoots() {
- if strings.HasSuffix(uuid, "00") || strings.HasSuffix(uuid, "01") {
- delete(kcDst.WritableLocalRoots(), uuid)
- }
+ if dstKeepServicesJSON == "" {
+ kcDst.SetServiceRoots(dstRoots, dstRoots, dstRoots)
}
if replications == 0 {
localRoots := kcSrc.LocalRoots()
c.Check(localRoots, NotNil)
-
- foundIt := false
- for k := range localRoots {
- if k == "zzzzz-bi6l4-123456789012340" {
- foundIt = true
- }
- }
- c.Check(foundIt, Equals, true)
-
- foundIt = false
- for k := range localRoots {
- if k == "zzzzz-bi6l4-123456789012341" {
- foundIt = true
- }
- }
- c.Check(foundIt, Equals, true)
+ c.Check(localRoots["zzzzz-bi6l4-123456789012340"], Not(Equals), "")
+ c.Check(localRoots["zzzzz-bi6l4-123456789012341"], Not(Equals), "")
}
// Test keep-rsync initialization with default replications count
setupRsync(c, false, 1)
err := performKeepRsync(kcSrc, kcDst, blobSignatureTTL, "", "")
- log.Printf("Err = %v", err)
- c.Check(strings.Contains(err.Error(), "no such host"), Equals, true)
+ c.Assert(err, NotNil)
+ c.Check(err.Error(), Matches, ".*no such host.*")
}
// Setup rsync using dstKeepServicesJSON with fake keepservers.
setupRsync(c, false, 1)
err := performKeepRsync(kcSrc, kcDst, blobSignatureTTL, "", "")
- log.Printf("Err = %v", err)
- c.Check(strings.Contains(err.Error(), "no such host"), Equals, true)
+ c.Assert(err, NotNil)
+ c.Check(err.Error(), Matches, ".*no such host.*")
}
// Test rsync with signature error during Get from src.
blobSigningKey = "thisisfakeblobsigningkey"
err := performKeepRsync(kcSrc, kcDst, blobSignatureTTL, blobSigningKey, "")
- c.Check(strings.Contains(err.Error(), "HTTP 403 \"Forbidden\""), Equals, true)
+ c.Assert(err, NotNil)
+ c.Check(err.Error(), Matches, ".*HTTP 403 \"Forbidden\".*")
}
// Test rsync with error during Put to src.
kcDst.Want_replicas = 2
err := performKeepRsync(kcSrc, kcDst, blobSignatureTTL, blobSigningKey, "")
- c.Check(strings.Contains(err.Error(), "Could not write sufficient replicas"), Equals, true)
+ c.Assert(err, NotNil)
+ c.Check(err.Error(), Matches, ".*Could not write sufficient replicas.*")
}
// Test loadConfig func
c.Assert(srcConfig.APIHost, Equals, os.Getenv("ARVADOS_API_HOST"))
c.Assert(srcConfig.APIToken, Equals, arvadostest.DataManagerToken)
- c.Assert(srcConfig.APIHostInsecure, Equals, matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")))
+ c.Assert(srcConfig.APIHostInsecure, Equals, arvadosclient.StringBool(os.Getenv("ARVADOS_API_HOST_INSECURE")))
c.Assert(srcConfig.ExternalClient, Equals, false)
dstConfig, _, err := loadConfig(dstConfigFile)
c.Assert(dstConfig.APIHost, Equals, os.Getenv("ARVADOS_API_HOST"))
c.Assert(dstConfig.APIToken, Equals, arvadostest.DataManagerToken)
- c.Assert(dstConfig.APIHostInsecure, Equals, matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")))
+ c.Assert(dstConfig.APIHostInsecure, Equals, arvadosclient.StringBool(os.Getenv("ARVADOS_API_HOST_INSECURE")))
c.Assert(dstConfig.ExternalClient, Equals, false)
c.Assert(srcBlobSigningKey, Equals, "abcdefg")
// Test loadConfig func - error reading config
func (s *ServerNotRequiredSuite) TestLoadConfig_ErrorLoadingSrcConfig(c *C) {
_, _, err := loadConfig("no-such-config-file")
- c.Assert(strings.Contains(err.Error(), "no such file or directory"), Equals, true)
+ c.Assert(err, NotNil)
+ c.Check(err.Error(), Matches, ".*no such file or directory.*")
}
func (s *ServerNotRequiredSuite) TestSetupKeepClient_NoBlobSignatureTTL(c *C) {
var srcConfig apiConfig
srcConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
srcConfig.APIToken = arvadostest.DataManagerToken
- srcConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
- arvadostest.StartKeep(2, false)
+ srcConfig.APIHostInsecure = arvadosclient.StringBool(os.Getenv("ARVADOS_API_HOST_INSECURE"))
_, ttl, err := setupKeepClient(srcConfig, srcKeepServicesJSON, false, 0, 0)
c.Check(err, IsNil)
func (s *DoMainTestSuite) Test_doMain_NoSrcConfig(c *C) {
err := doMain()
- c.Check(err, NotNil)
+ c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "Error loading src configuration from file: config file not specified")
}
args := []string{"-replications", "3", "-src", srcConfig.Name()}
os.Args = append(os.Args, args...)
err := doMain()
- c.Check(err, NotNil)
+ c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "Error loading dst configuration from file: config file not specified")
}
args := []string{"-src", "abcd"}
os.Args = append(os.Args, args...)
err := doMain()
- c.Check(err, NotNil)
- c.Assert(strings.HasPrefix(err.Error(), "Error loading src configuration from file: Error reading config file"), Equals, true)
+ c.Assert(err, NotNil)
+ c.Assert(err.Error(), Matches, "Error loading src configuration from file: Error reading config file.*")
}
func (s *DoMainTestSuite) Test_doMain_WithReplicationsButNoSrcConfig(c *C) {
// actual copying to dst will happen, but that's ok.
arvadostest.StartKeep(2, false)
defer arvadostest.StopKeep(2)
+ keepclient.RefreshServiceDiscovery()
err := doMain()
c.Check(err, IsNil)