.awesomplete > ul {
max-height: 410px;
overflow-y: auto;
-}
\ No newline at end of file
+}
+
+.dropdown-menu > li > form > button {
+ display: block;
+ padding: 3px 20px;
+ clear: both;
+ font-weight: normal;
+ line-height: 1.428571429;
+ color: #333333;
+ white-space: nowrap;
+ cursor: pointer;
+ text-decoration: none;
+ background: transparent;
+ border-style: none;
+}
+
+.dropdown-menu > li > form > button:hover {
+ text-decoration: none;
+ color: #262626;
+ background-color: #f5f5f5;
+}
<li role="menuitem"><a href="/projects/<%=current_user.uuid%>" role="menuitem"><i class="fa fa-lg fa-home fa-fw"></i> Home project </a></li>
<% if Rails.configuration.composer_url %>
<li role="menuitem">
- <%= link_to Rails.configuration.composer_url, role: 'menu-item' do %>
- <i class="fa fa-lg fa-share-alt fa-fw"></i> Workflow Composer
- <% end %>
+ <form action="<%= Rails.configuration.composer_url %>" method="GET">
+ <input type="hidden" name="api_token" value="<%= Thread.current[:arvados_api_token] %>" />
+ <button role="menuitem" type="submit">
+ <i class="fa fa-lg fa-share-alt fa-fw"></i> Workflow Composer
+ </button>
+ </form>
</li>
<% end %>
<li role="menuitem">
min_entries=2):
self.api_client = api_client
self.keep_client = keep_client
+ self.num_retries = num_retries
self.collections = OrderedDict()
self.lock = threading.Lock()
self.total = 0
if pdh not in self.collections:
logger.debug("Creating collection reader for %s", pdh)
cr = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
- keep_client=self.keep_client)
+ keep_client=self.keep_client,
+ num_retries=self.num_retries)
sz = len(cr.manifest_text()) * 128
self.collections[pdh] = (cr, sz)
self.total += sz
err = s.fs.Remove("foo/bar")
c.Check(err, check.IsNil)
- // mkdir succeds after the file is deleted
+ // mkdir succeeds after the file is deleted
err = s.fs.Mkdir("foo/bar", 0755)
c.Check(err, check.IsNil)
"""
+ __slots__ = ('parent', 'name', '_writers', '_committed',
+ '_segments', 'lock', '_current_bblock', 'fuse_entry')
+
def __init__(self, parent, name, stream=[], segments=[]):
"""
ArvadosFile constructor.
return text
+ _token_re = re.compile(r'(\S+)(\s+|$)')
+ _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
+ _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
+
@synchronized
def _import_manifest(self, manifest_text):
"""Import a manifest into a `Collection`.
stream_name = None
state = STREAM_NAME
- for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
+ for token_and_separator in self._token_re.finditer(manifest_text):
tok = token_and_separator.group(1)
sep = token_and_separator.group(2)
continue
if state == BLOCKS:
- block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
+ block_locator = self._block_re.match(tok)
if block_locator:
blocksize = int(block_locator.group(1))
blocks.append(Range(tok, streamoffset, blocksize, 0))
state = SEGMENTS
if state == SEGMENTS:
- file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
+ file_segment = self._segment_re.match(tok)
if file_segment:
pos = int(file_segment.group(1))
size = int(file_segment.group(2))
self._lastheadername = name
self._headers[name] = value
# Returning None implies all bytes were written
-
+
class KeepWriterQueue(queue.Queue):
def __init__(self, copies):
self.successful_copies_lock = threading.Lock()
self.pending_tries = copies
self.pending_tries_notification = threading.Condition()
-
+
def write_success(self, response, replicas_nr):
with self.successful_copies_lock:
self.successful_copies += replicas_nr
self.response = response
with self.pending_tries_notification:
self.pending_tries_notification.notify_all()
-
+
def write_fail(self, ks):
with self.pending_tries_notification:
self.pending_tries += 1
self.pending_tries_notification.notify()
-
+
def pending_copies(self):
with self.successful_copies_lock:
return self.wanted_copies - self.successful_copies
for _ in range(num_threads):
w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
self.workers.append(w)
-
+
def add_task(self, ks, service_root):
self.queue.put((ks, service_root))
self.total_task_nr += 1
-
+
def done(self):
return self.queue.successful_copies
-
+
def join(self):
# Start workers
for worker in self.workers:
worker.start()
# Wait for finished work
self.queue.join()
-
+
def response(self):
return self.queue.response
-
-
+
+
class KeepWriterThread(threading.Thread):
TaskFailed = RuntimeError()
self.get_counter.add(1)
- locator = KeepLocator(loc_s)
- if method == "GET":
- slot, first = self.block_cache.reserve_cache(locator.md5sum)
- if not first:
- self.hits_counter.add(1)
- v = slot.get()
- return v
-
- self.misses_counter.add(1)
-
- headers = {
- 'X-Request-Id': (request_id or
- (hasattr(self, 'api_client') and self.api_client.request_id) or
- arvados.util.new_request_id()),
- }
-
- # If the locator has hints specifying a prefix (indicating a
- # remote keepproxy) or the UUID of a local gateway service,
- # read data from the indicated service(s) instead of the usual
- # list of local disk services.
- hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
- for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
- hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
- for hint in locator.hints if (
- hint.startswith('K@') and
- len(hint) == 29 and
- self._gateway_services.get(hint[2:])
- )])
- # Map root URLs to their KeepService objects.
- roots_map = {
- root: self.KeepService(root, self._user_agent_pool,
- upload_counter=self.upload_counter,
- download_counter=self.download_counter,
- headers=headers)
- for root in hint_roots
- }
-
- # See #3147 for a discussion of the loop implementation. Highlights:
- # * Refresh the list of Keep services after each failure, in case
- # it's being updated.
- # * Retry until we succeed, we're out of retries, or every available
- # service has returned permanent failure.
- sorted_roots = []
- roots_map = {}
+ slot = None
blob = None
- loop = retry.RetryLoop(num_retries, self._check_loop_result,
- backoff_start=2)
- for tries_left in loop:
- try:
- sorted_roots = self.map_new_services(
- roots_map, locator,
- force_rebuild=(tries_left < num_retries),
- need_writable=False,
- headers=headers)
- except Exception as error:
- loop.save_result(error)
- continue
+ try:
+ locator = KeepLocator(loc_s)
+ if method == "GET":
+ slot, first = self.block_cache.reserve_cache(locator.md5sum)
+ if not first:
+ self.hits_counter.add(1)
+ blob = slot.get()
+ if blob is None:
+ raise arvados.errors.KeepReadError(
+ "failed to read {}".format(loc_s))
+ return blob
+
+ self.misses_counter.add(1)
+
+ headers = {
+ 'X-Request-Id': (request_id or
+ (hasattr(self, 'api_client') and self.api_client.request_id) or
+ arvados.util.new_request_id()),
+ }
+
+ # If the locator has hints specifying a prefix (indicating a
+ # remote keepproxy) or the UUID of a local gateway service,
+ # read data from the indicated service(s) instead of the usual
+ # list of local disk services.
+ hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+ for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+ hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+ for hint in locator.hints if (
+ hint.startswith('K@') and
+ len(hint) == 29 and
+ self._gateway_services.get(hint[2:])
+ )])
+ # Map root URLs to their KeepService objects.
+ roots_map = {
+ root: self.KeepService(root, self._user_agent_pool,
+ upload_counter=self.upload_counter,
+ download_counter=self.download_counter,
+ headers=headers)
+ for root in hint_roots
+ }
+
+ # See #3147 for a discussion of the loop implementation. Highlights:
+ # * Refresh the list of Keep services after each failure, in case
+ # it's being updated.
+ # * Retry until we succeed, we're out of retries, or every available
+ # service has returned permanent failure.
+ sorted_roots = []
+ roots_map = {}
+ loop = retry.RetryLoop(num_retries, self._check_loop_result,
+ backoff_start=2)
+ for tries_left in loop:
+ try:
+ sorted_roots = self.map_new_services(
+ roots_map, locator,
+ force_rebuild=(tries_left < num_retries),
+ need_writable=False,
+ headers=headers)
+ except Exception as error:
+ loop.save_result(error)
+ continue
- # Query KeepService objects that haven't returned
- # permanent failure, in our specified shuffle order.
- services_to_try = [roots_map[root]
- for root in sorted_roots
- if roots_map[root].usable()]
- for keep_service in services_to_try:
- blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
- if blob is not None:
- break
- loop.save_result((blob, len(services_to_try)))
-
- # Always cache the result, then return it if we succeeded.
- if method == "GET":
- slot.set(blob)
- self.block_cache.cap_cache()
- if loop.success():
- if method == "HEAD":
- return True
- else:
- return blob
+ # Query KeepService objects that haven't returned
+ # permanent failure, in our specified shuffle order.
+ services_to_try = [roots_map[root]
+ for root in sorted_roots
+ if roots_map[root].usable()]
+ for keep_service in services_to_try:
+ blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
+ if blob is not None:
+ break
+ loop.save_result((blob, len(services_to_try)))
+
+ # Always cache the result, then return it if we succeeded.
+ if loop.success():
+ if method == "HEAD":
+ return True
+ else:
+ return blob
+ finally:
+ if slot is not None:
+ slot.set(blob)
+ self.block_cache.cap_cache()
# Q: Including 403 is necessary for the Keep tests to continue
# passing, but maybe they should expect KeepReadError instead?
loop.save_result(error)
continue
- writer_pool = KeepClient.KeepWriterThreadPool(data=data,
+ writer_pool = KeepClient.KeepWriterThreadPool(data=data,
data_hash=data_hash,
copies=copies - done,
max_service_replicas=self.max_replicas_per_service,
def finished(self):
return False
-
+
def setUp(self):
self.copies = 3
self.pool = arvados.KeepClient.KeepWriterThreadPool(
self.pool.add_task(ks, None)
self.pool.join()
self.assertEqual(self.pool.done(), self.copies-1)
-
+
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
with self.assertRaises(arvados.errors.KeepWriteError):
self.keep_client.put('foo', num_retries=1, copies=2)
self.assertEqual(2, req_mock.call_count)
+
+class KeepClientAPIErrorTest(unittest.TestCase):
+ def test_api_fail(self):
+ class ApiMock(object):
+ def __getattr__(self, r):
+ if r == "api_token":
+ return "abc"
+ else:
+ raise arvados.errors.KeepReadError()
+ keep_client = arvados.KeepClient(api_client=ApiMock(),
+ proxy='', local_store='')
+
+ # The bug this is testing for is that if an API (not
+ # keepstore) exception is thrown as part of a get(), the next
+ # attempt to get that same block will result in a deadlock.
+ # This is why there are two get()s in a row. Unfortunately,
+ # the failure mode for this test is that the test suite
+        # deadlocks; there isn't a good way to avoid that without
+ # adding a special case that has no use except for this test.
+
+ with self.assertRaises(arvados.errors.KeepReadError):
+ keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
+ with self.assertRaises(arvados.errors.KeepReadError):
+ keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
include Trashable
serialize :properties, Hash
+ serialize :storage_classes_desired, Array
+ serialize :storage_classes_confirmed, Array
before_validation :default_empty_manifest
+ before_validation :default_storage_classes, on: :create
before_validation :check_encoding
before_validation :check_manifest_validity
before_validation :check_signatures
before_validation :strip_signatures_and_update_replication_confirmed
validate :ensure_pdh_matches_manifest_text
+ validate :ensure_storage_classes_desired_is_not_empty
+ validate :ensure_storage_classes_contain_non_empty_strings
before_save :set_file_names
api_accessible :user, extend: :common do |t|
t.add :replication_desired
t.add :replication_confirmed
t.add :replication_confirmed_at
+ t.add :storage_classes_desired
+ t.add :storage_classes_confirmed
+ t.add :storage_classes_confirmed_at
t.add :delete_at
t.add :trash_at
t.add :is_trashed
end
def self.full_text_searchable_columns
- super - ["manifest_text"]
+ super - ["manifest_text", "storage_classes_desired", "storage_classes_confirmed"]
end
def self.where *args
end
protected
+
+  # Although the defaults for these columns are already set up on the schema,
+ # collection creation from an API client seems to ignore them, making the
+ # validation on empty desired storage classes return an error.
+ def default_storage_classes
+ if self.storage_classes_desired.nil? || self.storage_classes_desired.empty?
+ self.storage_classes_desired = ["default"]
+ end
+ self.storage_classes_confirmed ||= []
+ end
+
def portable_manifest_text
self.class.munge_manifest_locators(manifest_text) do |match|
if match[2] # size
end
def ensure_permission_to_save
- if (not current_user.andand.is_admin and
- (replication_confirmed_at_changed? or replication_confirmed_changed?) and
- not (replication_confirmed_at.nil? and replication_confirmed.nil?))
- raise ArvadosModel::PermissionDeniedError.new("replication_confirmed and replication_confirmed_at attributes cannot be changed, except by setting both to nil")
+ if (not current_user.andand.is_admin)
+ if (replication_confirmed_at_changed? or replication_confirmed_changed?) and
+ not (replication_confirmed_at.nil? and replication_confirmed.nil?)
+ raise ArvadosModel::PermissionDeniedError.new("replication_confirmed and replication_confirmed_at attributes cannot be changed, except by setting both to nil")
+ end
+ if (storage_classes_confirmed_changed? or storage_classes_confirmed_at_changed?) and
+ not (storage_classes_confirmed == [] and storage_classes_confirmed_at.nil?)
+ raise ArvadosModel::PermissionDeniedError.new("storage_classes_confirmed and storage_classes_confirmed_at attributes cannot be changed, except by setting them to [] and nil respectively")
+ end
end
super
end
+ def ensure_storage_classes_desired_is_not_empty
+ if self.storage_classes_desired.empty?
+ raise ArvadosModel::InvalidStateTransitionError.new("storage_classes_desired shouldn't be empty")
+ end
+ end
+
+ def ensure_storage_classes_contain_non_empty_strings
+ (self.storage_classes_desired + self.storage_classes_confirmed).each do |c|
+ if !c.is_a?(String) || c == ''
+ raise ArvadosModel::InvalidStateTransitionError.new("storage classes should only be non-empty strings")
+ end
+ end
+ end
end
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+class AddStorageClassesToCollections < ActiveRecord::Migration
+ def up
+ add_column :collections, :storage_classes_desired, :jsonb, :default => ["default"]
+ add_column :collections, :storage_classes_confirmed, :jsonb, :default => []
+ add_column :collections, :storage_classes_confirmed_at, :datetime, :default => nil, :null => true
+ end
+
+ def down
+ remove_column :collections, :storage_classes_desired
+ remove_column :collections, :storage_classes_confirmed
+ remove_column :collections, :storage_classes_confirmed_at
+ end
+end
delete_at timestamp without time zone,
file_names character varying(8192),
trash_at timestamp without time zone,
- is_trashed boolean DEFAULT false NOT NULL
+ is_trashed boolean DEFAULT false NOT NULL,
+ storage_classes_desired jsonb DEFAULT '["default"]'::jsonb,
+ storage_classes_confirmed jsonb DEFAULT '[]'::jsonb,
+ storage_classes_confirmed_at timestamp without time zone
);
INSERT INTO schema_migrations (version) VALUES ('20171212153352');
+INSERT INTO schema_migrations (version) VALUES ('20180216203422');
+
manifest_text: ". acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 0:3:foo 3:6:bar\n"
name: replication want=2 have=2
+storage_classes_desired_default_unconfirmed:
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: 2015-02-07 00:21:35.050333515 Z
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ modified_at: 2015-02-07 00:21:35.050189104 Z
+ portable_data_hash: fa7aeb5140e2848d39b416daeef4ffc5+45
+ storage_classes_desired: ["default"]
+ storage_classes_confirmed_at: ~
+ storage_classes_confirmed: ~
+ updated_at: 2015-02-07 00:21:35.050126576 Z
+ uuid: zzzzz-4zz18-3t236wrz4769tga
+ manifest_text: ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n"
+ name: storage classes want=[default] have=[]
+
+storage_classes_desired_default_confirmed_default:
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: 2015-02-07 00:21:35.050333515 Z
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ modified_at: 2015-02-07 00:21:35.050189104 Z
+ portable_data_hash: fa7aeb5140e2848d39b416daeef4ffc5+45
+ storage_classes_desired: ["default"]
+ storage_classes_confirmed_at: 2015-02-07 00:21:35.050126576 Z
+ storage_classes_confirmed: ["default"]
+ updated_at: 2015-02-07 00:21:35.050126576 Z
+ uuid: zzzzz-4zz18-3t236wr12769tga
+ manifest_text: ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n"
+ name: storage classes want=[default] have=[default]
+
+storage_classes_desired_archive_confirmed_default:
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: 2015-02-07 00:21:35.050333515 Z
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ modified_at: 2015-02-07 00:21:35.050189104 Z
+ portable_data_hash: fa7aeb5140e2848d39b416daeef4ffc5+45
+ storage_classes_desired: ["archive"]
+ storage_classes_confirmed_at: ~
+ storage_classes_confirmed: ["default"]
+ updated_at: 2015-02-07 00:21:35.050126576 Z
+ uuid: zzzzz-4zz18-3t236wr12769qqa
+ manifest_text: ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n"
+ name: storage classes want=[archive] have=[default]
+
collection_with_empty_properties:
uuid: zzzzz-4zz18-emptyproperties
portable_data_hash: fa7aeb5140e2848d39b416daeef4ffc5+45
end
end
- test "jsonb 'exists' and '!=' filter" do
+ test "jsonb hash 'exists' and '!=' filter" do
@controller = Arvados::V1::CollectionsController.new
authorize_with :admin
get :index, {
assert_includes(found, collections(:collection_with_prop1_other1).uuid)
end
- test "jsonb alternate form 'exists' and '!=' filter" do
+ test "jsonb array 'exists'" do
+ @controller = Arvados::V1::CollectionsController.new
+ authorize_with :admin
+ get :index, {
+ filters: [ ['storage_classes_confirmed.default', 'exists', true] ]
+ }
+ assert_response :success
+ found = assigns(:objects).collect(&:uuid)
+ assert_equal 2, found.length
+ assert_not_includes(found,
+ collections(:storage_classes_desired_default_unconfirmed).uuid)
+ assert_includes(found,
+ collections(:storage_classes_desired_default_confirmed_default).uuid)
+ assert_includes(found,
+ collections(:storage_classes_desired_archive_confirmed_default).uuid)
+ end
+
+ test "jsonb hash alternate form 'exists' and '!=' filter" do
@controller = Arvados::V1::CollectionsController.new
authorize_with :admin
get :index, {
assert_includes(found, collections(:collection_with_prop1_other1).uuid)
end
+ test "jsonb array alternate form 'exists' filter" do
+ @controller = Arvados::V1::CollectionsController.new
+ authorize_with :admin
+ get :index, {
+ filters: [ ['storage_classes_confirmed', 'exists', 'default'] ]
+ }
+ assert_response :success
+ found = assigns(:objects).collect(&:uuid)
+ assert_equal 2, found.length
+ assert_not_includes(found,
+ collections(:storage_classes_desired_default_unconfirmed).uuid)
+ assert_includes(found,
+ collections(:storage_classes_desired_default_confirmed_default).uuid)
+ assert_includes(found,
+ collections(:storage_classes_desired_archive_confirmed_default).uuid)
+ end
+
test "jsonb 'exists' must be boolean" do
@controller = Arvados::V1::CollectionsController.new
authorize_with :admin
end
end
+ test "storage_classes_desired cannot be empty" do
+ act_as_user users(:active) do
+ c = collections(:collection_owned_by_active)
+ c.update_attributes storage_classes_desired: ["hot"]
+ assert_equal ["hot"], c.storage_classes_desired
+ assert_raise ArvadosModel::InvalidStateTransitionError do
+ c.update_attributes storage_classes_desired: []
+ end
+ end
+ end
+
+ test "storage classes lists should only contain non-empty strings" do
+ c = collections(:storage_classes_desired_default_unconfirmed)
+ act_as_user users(:admin) do
+ assert c.update_attributes(storage_classes_desired: ["default", "a_string"],
+ storage_classes_confirmed: ["another_string"])
+ [
+ ["storage_classes_desired", ["default", 42]],
+ ["storage_classes_confirmed", [{the_answer: 42}]],
+ ["storage_classes_desired", ["default", ""]],
+ ["storage_classes_confirmed", [""]],
+ ].each do |attr, val|
+ assert_raise ArvadosModel::InvalidStateTransitionError do
+ assert c.update_attributes({attr => val})
+ end
+ end
+ end
+ end
+
+ test "storage_classes_confirmed* can be set by admin user" do
+ c = collections(:storage_classes_desired_default_unconfirmed)
+ act_as_user users(:admin) do
+ assert c.update_attributes(storage_classes_confirmed: ["default"],
+ storage_classes_confirmed_at: Time.now)
+ end
+ end
+
+ test "storage_classes_confirmed* cannot be set by non-admin user" do
+ act_as_user users(:active) do
+ c = collections(:storage_classes_desired_default_unconfirmed)
+ # Cannot set just one at a time.
+ assert_raise ArvadosModel::PermissionDeniedError do
+ c.update_attributes storage_classes_confirmed: ["default"]
+ end
+ c.reload
+ assert_raise ArvadosModel::PermissionDeniedError do
+ c.update_attributes storage_classes_confirmed_at: Time.now
+ end
+      # Cannot set both at once, either.
+ c.reload
+ assert_raise ArvadosModel::PermissionDeniedError do
+ assert c.update_attributes(storage_classes_confirmed: ["default"],
+ storage_classes_confirmed_at: Time.now)
+ end
+ end
+ end
+
+ test "storage_classes_confirmed* can be cleared (but only together) by non-admin user" do
+ act_as_user users(:active) do
+ c = collections(:storage_classes_desired_default_confirmed_default)
+ # Cannot clear just one at a time.
+ assert_raise ArvadosModel::PermissionDeniedError do
+ c.update_attributes storage_classes_confirmed: []
+ end
+ c.reload
+ assert_raise ArvadosModel::PermissionDeniedError do
+ c.update_attributes storage_classes_confirmed_at: nil
+ end
+ # Can clear both at once.
+ c.reload
+ assert c.update_attributes(storage_classes_confirmed: [],
+ storage_classes_confirmed_at: nil)
+ end
+ end
+
[0, 2, 4, nil].each do |ask|
test "set replication_desired to #{ask.inspect}" do
Rails.configuration.default_collection_replication = 2
// Example: []string{"crunch-run", "--cgroup-parent-subsystem=memory"}
CrunchRunCommand []string
+	// Extra RAM to reserve (in bytes) for SLURM job, in addition
+ // to the amount specified in the container's RuntimeConstraints
+ ReserveExtraRAM int64
+
// Minimum time between two attempts to run the same container
MinRetryPeriod arvados.Duration
}
}
func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) {
- mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
+ mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM+disp.ReserveExtraRAM) / float64(1048576)))
var disk int64
for _, m := range container.Mounts {
},
"CrunchRunCommand": ["crunch-run"],
"PollPeriod": "10s",
- "SbatchArguments": ["--partition=foo", "--exclude=node13"]
+ "SbatchArguments": ["--partition=foo", "--exclude=node13"],
+ "ReserveExtraRAM": 268435456,
}`)
func usage(fs *flag.FlagSet) {
def _remove(self, obj, clear):
if clear:
- if obj.in_use():
- _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
- return
+ # Kernel behavior seems to be that if a file is
+ # referenced, its parents remain referenced too. This
+ # means has_ref() exits early when a collection is not
+            # a candidate for eviction.
+ #
+ # By contrast, in_use() doesn't increment references on
+ # parents, so it requires a full tree walk to determine if
+ # a collection is a candidate for eviction. This takes
+ # .07s for 240000 files, which becomes a major drag when
+ # cap_cache is being called several times a second and
+ # there are multiple non-evictable collections in the
+ # cache.
+ #
+ # So it is important for performance that we do the
+ # has_ref() check first.
+
if obj.has_ref(True):
_logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
return
+
+ if obj.in_use():
+ _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
+ return
+
obj.kernel_invalidate()
_logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
obj.clear()
if obj not in self._by_uuid[obj.cache_uuid]:
self._by_uuid[obj.cache_uuid].append(obj)
self._total += obj.objsize()
- _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
+ _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
+ obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
self.cap_cache()
def touch(self, obj):
* Clear the object contents (invalidates the object)
"""
+
+ __slots__ = ("_stale", "_poll", "_last_update", "_atime", "_poll_time", "use_count",
+ "ref_count", "dead", "cache_size", "cache_uuid", "allow_attr_cache")
+
def __init__(self):
self._stale = True
self._poll = False
class File(FreshBase):
"""Base for file objects."""
+ __slots__ = ("inode", "parent_inode", "_mtime")
+
def __init__(self, parent_inode, _mtime=0):
super(File, self).__init__()
self.inode = None
class FuseArvadosFile(File):
"""Wraps a ArvadosFile."""
+ __slots__ = ('arvfile',)
+
def __init__(self, parent_inode, arvfile, _mtime):
super(FuseArvadosFile, self).__init__(parent_inode, _mtime)
self.arvfile = arvfile
AzureReplication int
ReadOnly bool
RequestTimeout arvados.Duration
+ StorageClasses []string
azClient storage.Client
container *azureContainer
return v.AzureReplication
}
+// GetStorageClasses implements Volume
+func (v *AzureBlobVolume) GetStorageClasses() []string {
+ return v.StorageClasses
+}
+
// If possible, translate an Azure SDK error to a recognizable error
// like os.ErrNotExist.
func (v *AzureBlobVolume) translateError(err error) error {
"time"
"github.com/Azure/azure-sdk-for-go/storage"
+ "github.com/ghodss/yaml"
check "gopkg.in/check.v1"
)
c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
}
+func (s *StubbedAzureBlobSuite) TestConfig(c *check.C) {
+ var cfg Config
+ err := yaml.Unmarshal([]byte(`
+Volumes:
+ - Type: Azure
+ StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+ c.Check(err, check.IsNil)
+ c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
+
func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
v.azHandler.PutRaw(v.ContainerName, locator, data)
}
resp := s.call("GET", "/mounts", "", nil)
c.Check(resp.Code, check.Equals, http.StatusOK)
var mntList []struct {
- UUID string
- DeviceID string
- ReadOnly bool
- Replication int
- Tier int
+ UUID string
+ DeviceID string
+ ReadOnly bool
+ Replication int
+ StorageClasses []string
}
err := json.Unmarshal(resp.Body.Bytes(), &mntList)
c.Assert(err, check.IsNil)
c.Check(m.DeviceID, check.Equals, "mock-device-id")
c.Check(m.ReadOnly, check.Equals, false)
c.Check(m.Replication, check.Equals, 1)
- c.Check(m.Tier, check.Equals, 1)
+ c.Check(m.StorageClasses, check.DeepEquals, []string{"default"})
}
c.Check(mntList[0].UUID, check.Not(check.Equals), mntList[1].UUID)
RaceWindow arvados.Duration
ReadOnly bool
UnsafeDelete bool
+ StorageClasses []string
bucket *s3bucket
return v.S3Replication
}
+// GetStorageClasses implements Volume
+func (v *S3Volume) GetStorageClasses() []string {
+ return v.StorageClasses
+}
+
var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
func (v *S3Volume) isKeepBlock(s string) bool {
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/AdRoll/goamz/s3"
"github.com/AdRoll/goamz/s3/s3test"
+ "github.com/ghodss/yaml"
check "gopkg.in/check.v1"
)
return v
}
+func (s *StubbedS3Suite) TestConfig(c *check.C) {
+ var cfg Config
+ err := yaml.Unmarshal([]byte(`
+Volumes:
+ - Type: S3
+ StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+ c.Check(err, check.IsNil)
+ c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
+
func (v *TestableS3Volume) Start() error {
tmp, err := ioutil.TempFile("", "keepstore")
v.c.Assert(err, check.IsNil)
// Return a globally unique ID of the underlying storage
// device if possible, otherwise "".
DeviceID() string
+
+ // Get the storage classes associated with this volume
+ GetStorageClasses() []string
}
// A VolumeWithExamples provides example configs to display in the
// A VolumeMount is an attachment of a Volume to a VolumeManager.
type VolumeMount struct {
- UUID string
- DeviceID string
- ReadOnly bool
- Replication int
- Tier int
- volume Volume
+ UUID string
+ DeviceID string
+ ReadOnly bool
+ Replication int
+ StorageClasses []string
+ volume Volume
}
// Generate a UUID the way API server would for a "KeepVolumeMount"
}
vm.mountMap = make(map[string]*VolumeMount)
for _, v := range volumes {
+ sc := v.GetStorageClasses()
+ if len(sc) == 0 {
+ sc = []string{"default"}
+ }
mnt := &VolumeMount{
- UUID: (*VolumeMount)(nil).generateUUID(),
- DeviceID: v.DeviceID(),
- ReadOnly: !v.Writable(),
- Replication: v.Replication(),
- Tier: 1,
- volume: v,
+ UUID: (*VolumeMount)(nil).generateUUID(),
+ DeviceID: v.DeviceID(),
+ ReadOnly: !v.Writable(),
+ Replication: v.Replication(),
+ StorageClasses: sc,
+ volume: v,
}
vm.iostats[v] = &ioStats{}
vm.mounts = append(vm.mounts, mnt)
func (v *MockVolume) EmptyTrash() {
}
+
+func (v *MockVolume) GetStorageClasses() []string {
+ return nil
+}
ReadOnly bool
Serialize bool
DirectoryReplication int
+ StorageClasses []string
// something to lock during IO, typically a sync.Mutex (or nil
// to skip locking)
return v.DirectoryReplication
}
+// GetStorageClasses implements Volume
+func (v *UnixVolume) GetStorageClasses() []string {
+ return v.StorageClasses
+}
+
// InternalStats returns I/O and filesystem ops counters.
func (v *UnixVolume) InternalStats() interface{} {
return &v.os.stats
"testing"
"time"
+ "github.com/ghodss/yaml"
check "gopkg.in/check.v1"
)
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
}
+
+func (s *UnixVolumeSuite) TestConfig(c *check.C) {
+ var cfg Config
+ err := yaml.Unmarshal([]byte(`
+Volumes:
+ - Type: Directory
+ StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+ c.Check(err, check.IsNil)
+ c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
return fmt.Errorf("error searching for parent group: %s", err)
}
if len(gl.Items) == 0 {
- // Default parent group not existant, create one.
+ // Default parent group does not exist, create it.
if cfg.Verbose {
log.Println("Default parent group not found, creating...")
}