wget -cqO common-generated/$(NODETARBALL) https://nodejs.org/dist/v6.11.2/$(NODETARBALL)
common-generated/$(RVMKEY): common-generated
- wget -cqO common-generated/$(RVMKEY) https://rvm.io/mpapis.asc
+ wget -cqO common-generated/$(RVMKEY) https://rvm.io/pkuczynski.asc
common-generated:
mkdir common-generated
common-generated-all: common-generated/$(RVMKEY)
common-generated/$(RVMKEY): common-generated
- wget -cqO common-generated/$(RVMKEY) https://rvm.io/mpapis.asc
+ wget -cqO common-generated/$(RVMKEY) https://rvm.io/pkuczynski.asc
common-generated:
mkdir common-generated
from apiclient import errors as apiclient_errors
from arvados._version import __version__
+from arvados.util import keep_locator_pattern
import arvados.commands._util as arv_cmd
pass
+class ResumeCacheInvalidError(Exception):
+ pass
+
class ArvPutArgumentConflict(Exception):
pass
new_cache = os.fdopen(new_cache_fd, 'r+')
json.dump(data, new_cache)
os.rename(new_cache_name, self.filename)
- except (IOError, OSError, ResumeCacheConflict) as error:
+ except (IOError, OSError, ResumeCacheConflict):
try:
os.unlink(new_cache_name)
except NameError: # mkstemp failed.
def _build_upload_list(self):
"""
- Scan the requested paths to count file sizes, excluding files & dirs if requested
- and building the upload file list.
+ Scan the requested paths to count file sizes, excluding files and dirs
+ if requested, and building the upload file list.
"""
# If there aren't special files to be read, reset total bytes count to zero
# to start counting.
def _my_collection(self):
return self._remote_collection if self.update else self._local_collection
+ def _get_cache_filepath(self):
+ # Set up cache file name from input paths.
+ md5 = hashlib.md5()
+ md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
+ realpaths = sorted(os.path.realpath(path) for path in self.paths)
+ md5.update(b'\0'.join([p.encode() for p in realpaths]))
+ if self.filename:
+ md5.update(self.filename.encode())
+ cache_filename = md5.hexdigest()
+ cache_filepath = os.path.join(
+ arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+ cache_filename)
+ return cache_filepath
+
def _setup_state(self, update_collection):
"""
Create a new cache file or load a previously existing one.
raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
if self.use_cache:
- # Set up cache file name from input paths.
- md5 = hashlib.md5()
- md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
- realpaths = sorted(os.path.realpath(path) for path in self.paths)
- md5.update(b'\0'.join([p.encode() for p in realpaths]))
- if self.filename:
- md5.update(self.filename.encode())
- cache_filename = md5.hexdigest()
- cache_filepath = os.path.join(
- arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
- cache_filename)
+ cache_filepath = self._get_cache_filepath()
if self.resume and os.path.exists(cache_filepath):
self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
self._cache_file = open(cache_filepath, 'a+')
self.logger.info("No cache usage requested for this run.")
# No cache file, set empty state
self._state = copy.deepcopy(self.EMPTY_STATE)
+ if not self._cached_manifest_valid():
+ raise ResumeCacheInvalidError()
# Load the previous manifest so we can check if files were modified remotely.
self._local_collection = arvados.collection.Collection(
self._state['manifest'],
put_threads=self.put_threads,
api_client=self._api_client)
+ def _cached_manifest_valid(self):
+ """
+ Validate the oldest non-expired block signature to check whether the
+ cached manifest is usable: that is, confirm that it was not created
+ with a different Arvados account's credentials.
+ """
+ if self._state.get('manifest', None) is None:
+ # No cached manifest yet, all good.
+ return True
+ now = datetime.datetime.utcnow()
+ oldest_exp = None
+ oldest_loc = None
+ block_found = False
+ for m in keep_locator_pattern.finditer(self._state['manifest']):
+ loc = m.group(0)
+ try:
+ exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
+ except IndexError:
+ # Locator without signature
+ continue
+ block_found = True
+ if exp > now and (oldest_exp is None or exp < oldest_exp):
+ oldest_exp = exp
+ oldest_loc = loc
+ if not block_found:
+ # No block signatures found => no invalid block signatures.
+ return True
+ if oldest_loc is None:
+ # Locator signatures found, but all have expired.
+ # Reset the cache and move on.
+ self.logger.info('Cache expired, starting from scratch.')
+ self._state['manifest'] = ''
+ return True
+ kc = arvados.KeepClient(api_client=self._api_client,
+ num_retries=self.num_retries)
+ try:
+ kc.head(oldest_loc)
+ except arvados.errors.KeepRequestError:
+ # Something is wrong, cached manifest is not valid.
+ return False
+ return True
+
def collection_file_paths(self, col, path_prefix='.'):
"""Return a list of file paths by recursively go through the entire collection `col`"""
file_paths = []
"arv-put: Another process is already uploading this data.",
" Use --no-cache if this is really what you want."]))
sys.exit(1)
+ except ResumeCacheInvalidError:
+ logger.error("\n".join([
+ "arv-put: Resume cache contains invalid signature: it may have expired",
+ " or been created with another Arvados user's credentials.",
+ " Switch user or use one of the following options to restart upload:",
+ " --no-resume to start a new resume cache.",
+ " --no-cache to disable resume cache."]))
+ sys.exit(1)
except CollectionUpdateError as error:
logger.error("\n".join([
"arv-put: %s" % str(error)]))
if local_store:
self.local_store = local_store
+ self.head = self.local_store_head
self.get = self.local_store_get
self.put = self.local_store_put
else:
with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
return f.read()
+ def local_store_head(self, loc_s, num_retries=None):
+ """Companion to local_store_put()."""
+ try:
+ locator = KeepLocator(loc_s)
+ except ValueError:
+ raise arvados.errors.NotFoundError(
+ "Invalid data locator: '%s'" % loc_s)
+ if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
+ return True
+ if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
+ return True
+
def is_cached(self, locator):
return self.block_cache.reserve_cache(expect_hash)
standard_library.install_aliases()
from builtins import str
from builtins import range
+from functools import partial
import apiclient
import datetime
import hashlib
resume=False)
del(self.writer)
+class CachedManifestValidationTest(ArvadosBaseTestCase):
+ class MockedPut(arv_put.ArvPutUploadJob):
+ def __init__(self, cached_manifest=None):
+ self._state = arv_put.ArvPutUploadJob.EMPTY_STATE
+ self._state['manifest'] = cached_manifest
+ self._api_client = mock.MagicMock()
+ self.logger = mock.MagicMock()
+ self.num_retries = 1
+
+ def datetime_to_hex(self, dt):
+ return hex(int(time.mktime(dt.timetuple())))[2:]
+
+ def setUp(self):
+ super(CachedManifestValidationTest, self).setUp()
+ self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
+ self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
+ self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"
+
+ def test_empty_cached_manifest_is_valid(self):
+ put_mock = self.MockedPut()
+ self.assertEqual(None, put_mock._state.get('manifest'))
+ self.assertTrue(put_mock._cached_manifest_valid())
+ put_mock._state['manifest'] = ''
+ self.assertTrue(put_mock._cached_manifest_valid())
+
+ def test_signature_cases(self):
+ now = datetime.datetime.utcnow()
+ yesterday = now - datetime.timedelta(days=1)
+ lastweek = now - datetime.timedelta(days=7)
+ tomorrow = now + datetime.timedelta(days=1)
+ nextweek = now + datetime.timedelta(days=7)
+
+ def mocked_head(blocks={}, loc=None):
+ blk = loc.split('+', 1)[0]
+ if blocks.get(blk):
+ return True
+ raise arvados.errors.KeepRequestError("mocked error - block invalid")
+
+ # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
+ cases = [
+ # All expired, reset cache - OK
+ (yesterday, lastweek, False, False, True),
+ (lastweek, yesterday, False, False, True),
+ # All non-expired valid blocks - OK
+ (tomorrow, nextweek, True, True, True),
+ (nextweek, tomorrow, True, True, True),
+ # All non-expired invalid blocks - Not OK
+ (tomorrow, nextweek, False, False, False),
+ (nextweek, tomorrow, False, False, False),
+ # One non-expired valid block - OK
+ (tomorrow, yesterday, True, False, True),
+ (yesterday, tomorrow, False, True, True),
+ # One non-expired invalid block - Not OK
+ (tomorrow, yesterday, False, False, False),
+ (yesterday, tomorrow, False, False, False),
+ ]
+ for case in cases:
+ b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
+ head_responses = {
+ self.block1: b1_valid,
+ self.block2: b2_valid,
+ }
+ cached_manifest = self.template % (
+ self.datetime_to_hex(b1_expiration),
+ self.datetime_to_hex(b2_expiration),
+ )
+ arvput = self.MockedPut(cached_manifest)
+ with mock.patch('arvados.collection.KeepClient.head') as head_mock:
+ head_mock.side_effect = partial(mocked_head, head_responses)
+ self.assertEqual(outcome, arvput._cached_manifest_valid(),
+ "Case '%s' should have produced outcome '%s'" % (case, outcome)
+ )
+ if b1_expiration > now or b2_expiration > now:
+ # A HEAD request should have been done
+ head_mock.assert_called_once()
+ else:
+ head_mock.assert_not_called()
+
+
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
TEST_SIZE = os.path.getsize(__file__)
writer.bytes_expected)
def test_expected_bytes_for_device(self):
- writer = arv_put.ArvPutUploadJob(['/dev/null'])
+ writer = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False)
self.assertIsNone(writer.bytes_expected)
writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
self.assertIsNone(writer.bytes_expected)
self.assertEqual(1, len(collection_list))
return collection_list[0]
- def test_expired_token_invalidates_cache(self):
+ def test_all_expired_signatures_invalidates_cache(self):
self.authorize_with('active')
tmpdir = self.make_tmpdir()
with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
(out, err) = p.communicate()
self.assertRegex(
err.decode(),
- r'WARNING: Uploaded file .* access token expired, will re-upload it from scratch')
+ r'INFO: Cache expired, starting from scratch.*')
+ self.assertEqual(p.returncode, 0)
+
+ def test_invalid_signature_invalidates_cache(self):
+ self.authorize_with('active')
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
+ f.write('foo')
+ # Upload a directory and get the cache file name
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (out, err) = p.communicate()
+ self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+ self.assertEqual(p.returncode, 0)
+ cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+ err.decode()).groups()[0]
+ self.assertTrue(os.path.isfile(cache_filepath))
+ # Load the cache file contents and modify the manifest to simulate
+ # an invalid access token
+ with open(cache_filepath, 'r') as c:
+ cache = json.load(c)
+ self.assertRegex(cache['manifest'], r'\+A\S+\@')
+ cache['manifest'] = re.sub(
+ r'\+A.*\@',
+ "+Aabcdef0123456789abcdef0123456789abcdef01@",
+ cache['manifest'])
+ with open(cache_filepath, 'w') as c:
+ c.write(json.dumps(cache))
+ # Re-run the upload and expect to get an invalid cache message
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (out, err) = p.communicate()
+ self.assertRegex(
+ err.decode(),
+ r'ERROR: arv-put: Resume cache contains invalid signature.*')
+ self.assertEqual(p.returncode, 1)
+
+ def test_single_expired_signature_reuploads_file(self):
+ self.authorize_with('active')
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
+ f.write('foo')
+ # Write a second file in its own subdir to force a new stream
+ os.mkdir(os.path.join(tmpdir, 'bar'))
+ with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
+ f.write('bar')
+ # Upload a directory and get the cache file name
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (out, err) = p.communicate()
+ self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+ self.assertEqual(p.returncode, 0)
+ cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+ err.decode()).groups()[0]
+ self.assertTrue(os.path.isfile(cache_filepath))
+ # Load the cache file contents and modify the manifest to simulate
+ # an expired access token
+ with open(cache_filepath, 'r') as c:
+ cache = json.load(c)
+ self.assertRegex(cache['manifest'], r'\+A\S+\@')
+ a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
+ # Make one of the signatures appear to have expired
+ cache['manifest'] = re.sub(
+ r'\@.*? 3:3:barfile.txt',
+ "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
+ cache['manifest'])
+ with open(cache_filepath, 'w') as c:
+ c.write(json.dumps(cache))
+ # Re-run the upload and expect the file with the expired signature to be
+ # re-uploaded from scratch
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (out, err) = p.communicate()
+ self.assertRegex(
+ err.decode(),
+ r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
self.assertEqual(p.returncode, 0)
# Confirm that the resulting cache is different from the last run.
with open(cache_filepath, 'r') as c2:
super
# head_kind and tail_kind columns are now virtual,
- # equivilent functionality is now provided by
+ # equivalent functionality is now provided by
# 'is_a', so fix up any old-style 'where' clauses.
if @where
@filters ||= []
super
# head_kind and tail_kind columns are now virtual,
- # equivilent functionality is now provided by
+ # equivalent functionality is now provided by
# 'is_a', so fix up any old-style 'filter' clauses.
@filters = @filters.map do |k|
if k[0] == 'head_kind' and k[1] == '='
self.owner_uuid ||= current_default_owner if self.respond_to? :owner_uuid=
if !anonymous_updater
self.modified_by_user_uuid = current_user ? current_user.uuid : nil
+ end
+ if !timeless_updater
self.modified_at = current_time
end
self.modified_by_client_uuid = current_api_client ? current_api_client.uuid : nil
# Use a different validation context to skip the 'old_versions_cannot_be_updated'
# validator, as on this case it is legal to update some fields.
leave_modified_by_user_alone do
- c.save(context: :update_old_versions)
+ leave_modified_at_alone do
+ c.save(context: :update_old_versions)
+ end
end
end
end
--- /dev/null
+class AddExpressionIndexToLinks < ActiveRecord::Migration
+ def up
+ ActiveRecord::Base.connection.execute 'CREATE INDEX index_links_on_substring_head_uuid on links (substring(head_uuid, 7, 5))'
+ ActiveRecord::Base.connection.execute 'CREATE INDEX index_links_on_substring_tail_uuid on links (substring(tail_uuid, 7, 5))'
+ end
+
+ def down
+ ActiveRecord::Base.connection.execute 'DROP INDEX index_links_on_substring_head_uuid'
+ ActiveRecord::Base.connection.execute 'DROP INDEX index_links_on_substring_tail_uuid'
+ end
+end
CREATE INDEX index_links_on_owner_uuid ON public.links USING btree (owner_uuid);
+--
+-- Name: index_links_on_substring_head_uuid; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX index_links_on_substring_head_uuid ON public.links USING btree ("substring"((head_uuid)::text, 7, 5));
+
+
+--
+-- Name: index_links_on_substring_tail_uuid; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX index_links_on_substring_tail_uuid ON public.links USING btree ("substring"((tail_uuid)::text, 7, 5));
+
+
--
-- Name: index_links_on_tail_uuid; Type: INDEX; Schema: public; Owner: -
--
INSERT INTO schema_migrations (version) VALUES ('20181011184200');
+INSERT INTO schema_migrations (version) VALUES ('20181213183234');
+
Thread.current[:anonymous_updater] = anonymous_updater_was
end
end
+
+ # ArvadosModel checks this to decide whether it should update the
+ # 'modified_at' field.
+ def timeless_updater
+ Thread.current[:timeless_updater] || false
+ end
+
+ def leave_modified_at_alone
+ timeless_updater_was = timeless_updater
+ begin
+ Thread.current[:timeless_updater] = true
+ yield
+ ensure
+ Thread.current[:timeless_updater] = timeless_updater_was
+ end
+ end
+
end
subproperty[1] = subproperty[1][1..-2]
end
- # jsonb search
+ # jsonb search
case operator.downcase
when '=', '!='
not_in = if operator.downcase == "!=" then "NOT " else "" end
"for '#{operator}' operator in filters")
end
when 'exists'
- if operand == true
- cond_out << "jsonb_exists(#{ar_table_name}.#{subproperty[0]}, ?)"
- elsif operand == false
- cond_out << "(NOT jsonb_exists(#{ar_table_name}.#{subproperty[0]}, ?)) OR #{ar_table_name}.#{subproperty[0]} is NULL"
- else
- raise ArgumentError.new("Invalid operand '#{operand}' for '#{operator}' must be true or false")
- end
- param_out << subproperty[1]
+ if operand == true
+ cond_out << "jsonb_exists(#{ar_table_name}.#{subproperty[0]}, ?)"
+ elsif operand == false
+ cond_out << "(NOT jsonb_exists(#{ar_table_name}.#{subproperty[0]}, ?)) OR #{ar_table_name}.#{subproperty[0]} is NULL"
+ else
+ raise ArgumentError.new("Invalid operand '#{operand}' for '#{operator}' must be true or false")
+ end
+ param_out << subproperty[1]
else
raise ArgumentError.new("Invalid operator for subproperty search '#{operator}'")
end
operand.each do |op|
cl = ArvadosModel::kind_class op
if cl
- cond << "#{ar_table_name}.#{attr} like ?"
- param_out << cl.uuid_like_pattern
+ if attr == 'uuid'
+ if model_class.uuid_prefix == cl.uuid_prefix
+ cond << "1=1"
+ else
+ cond << "1=0"
+ end
+ else
+ # Use a substring query to support remote uuids
+ cond << "substring(#{ar_table_name}.#{attr}, 7, 5) = ?"
+ param_out << cl.uuid_prefix
+ end
else
cond << "1=0"
end
head_uuid: zzzzz-4zz18-znfnqtbbv4spc3w
properties: {}
+foo_file_readable_by_federated_active:
+ uuid: zzzzz-o0j2j-dp1d8395ldqw23r
+ owner_uuid: zzzzz-tpzed-000000000000000
+ created_at: 2014-01-24 20:42:26 -0800
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-000000000000000
+ modified_at: 2014-01-24 20:42:26 -0800
+ updated_at: 2014-01-24 20:42:26 -0800
+ tail_uuid: zbbbb-tpzed-xurymjxw79nv3jz
+ link_class: permission
+ name: can_read
+ head_uuid: zzzzz-4zz18-znfnqtbbv4spc3w
+ properties: {}
+
foo_file_readable_by_active_duplicate_permission:
uuid: zzzzz-o0j2j-2qlmhgothiur55r
owner_uuid: zzzzz-tpzed-000000000000000
assert_equal found.count, (found.select { |f| f.tail_uuid.match User.uuid_regex }).count
end
+ test "filter links with 'is_a' operator includes remote objects" do
+ authorize_with :admin
+ get :index, {
+ filters: [
+ ['tail_uuid', 'is_a', 'arvados#user'],
+ ['link_class', '=', 'permission'],
+ ['name', '=', 'can_read'],
+ ['head_uuid', '=', collections(:foo_file).uuid],
+ ]
+ }
+ assert_response :success
+ found = assigns(:objects)
+ assert_not_equal 0, found.count
+ assert_includes(found.map(&:tail_uuid),
+ users(:federated_active).uuid)
+ end
+
test "filter links with 'is_a' operator with more than one" do
authorize_with :admin
get :index, {
Operand: group.UUID,
}, {
Attr: "head_uuid",
- Operator: "like",
- Operand: "%-tpzed-%",
+ Operator: "is_a",
+ Operand: "arvados#user",
}},
}
// User -> Group filter
Operand: group.UUID,
}, {
Attr: "tail_uuid",
- Operator: "like",
- Operand: "%-tpzed-%",
+ Operator: "is_a",
+ Operand: "arvados#user",
}},
}
g2uLinks, err := GetAll(cfg.Client, "links", g2uFilter, &LinkList{})