#
# SPDX-License-Identifier: Apache-2.0
-from __future__ import absolute_import
-from __future__ import division
-from future import standard_library
-standard_library.install_aliases()
-from builtins import str
-from builtins import range
-from functools import partial
import apiclient
+import ciso8601
import datetime
-import hashlib
import json
import logging
-import mock
+import multiprocessing
import os
import pwd
import random
import time
import unittest
import uuid
-import yaml
+
+from functools import partial
+from unittest import mock
import arvados
import arvados.commands.put as arv_put
def test_cache_is_locked(self):
with tempfile.NamedTemporaryFile() as cachefile:
- cache = arv_put.ResumeCache(cachefile.name)
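+            # Keep a reference so the lock stays held while a second open is attempted.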
+ _ = arv_put.ResumeCache(cachefile.name)
self.assertRaises(arv_put.ResumeCacheConflict,
arv_put.ResumeCache, cachefile.name)
shutil.rmtree(self.small_files_dir)
shutil.rmtree(self.tempdir_with_symlink)
+ def test_non_regular_files_are_ignored_except_symlinks_to_dirs(self):
+ def pfunc(x):
+ with open(x, 'w') as f:
+ f.write('test')
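+        # A FIFO is a non-regular file; feed it from a background process so
+        # the write only completes if arv-put mistakenly opens it for reading.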
+ fifo_filename = 'fifo-file'
+ fifo_path = os.path.join(self.tempdir_with_symlink, fifo_filename)
+ self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
+ os.mkfifo(fifo_path)
+ producer = multiprocessing.Process(target=pfunc, args=(fifo_path,))
+ producer.start()
+ cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
+ cwriter.start(save_collection=False)
+ if producer.exitcode is None:
+            # If the producer is still running, kill it. This must happen
+            # before any assertion that may fail, so cleanup is not skipped.
+ producer.terminate()
+ producer.join(1)
+ self.assertIn('linkeddir', cwriter.manifest_text())
+ self.assertNotIn(fifo_filename, cwriter.manifest_text())
+
def test_symlinks_are_followed_by_default(self):
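+        # Sanity-check the fixture: 'linkeddir' and 'linkedfile' are symlinks.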
+ self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
+ self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
cwriter.start(save_collection=False)
self.assertIn('linkeddir', cwriter.manifest_text())
cwriter.destroy_cache()
def test_symlinks_are_not_followed_when_requested(self):
+ self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
+ self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
follow_links=False)
cwriter.start(save_collection=False)
self.assertNotIn('linkeddir', cwriter.manifest_text())
self.assertNotIn('linkedfile', cwriter.manifest_text())
cwriter.destroy_cache()
+        # Check for bug #17800: symlinks passed directly as arguments should also be ignored.
+ linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
+ cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
+ cwriter.start(save_collection=False)
+ self.assertNotIn('linkeddir', cwriter.manifest_text())
+ cwriter.destroy_cache()
+
+ def test_no_empty_collection_saved(self):
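+        # With follow_links=False and only a symlinked directory as input,
+        # nothing gets uploaded, so no collection record should be created.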
+ self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
+ linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
+ cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
+ cwriter.start(save_collection=True)
+ self.assertIsNone(cwriter.manifest_locator())
+ self.assertEqual('', cwriter.manifest_text())
+ cwriter.destroy_cache()
def test_passing_nonexistant_path_raise_exception(self):
uuid_str = str(uuid.uuid4())
with self.assertRaises(arv_put.PathDoesNotExistError):
- cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])
+ arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])
def test_writer_works_without_cache(self):
cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
def test_put_block_replication(self):
self.call_main_on_test_file()
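+        # Discard any API client left over from the previous invocation.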
+ arv_put.api_client = None
with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
self.call_main_on_test_file(['--replication', '1'])
self.call_main_with_args,
['--project-uuid', self.Z_UUID, '--stream'])
- def test_error_when_multiple_storage_classes_specified(self):
- self.assertRaises(SystemExit,
- self.call_main_with_args,
- ['--storage-classes', 'hot,cold'])
-
def test_error_when_excluding_absolute_path(self):
tmpdir = self.make_tmpdir()
self.assertRaises(SystemExit,
fake_httplib2_response(403), b'{}')
with mock.patch('arvados.collection.Collection.save_new',
new=coll_save_mock):
- with self.assertRaises(SystemExit) as exc_test:
+ with self.assertRaises(SystemExit):
self.call_main_with_args(['/dev/null'])
self.assertRegex(
self.main_stderr.getvalue(), matcher)
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
ArvadosBaseTestCase):
- def _getKeepServerConfig():
- for config_file, mandatory in [
- ['application.yml', False], ['application.default.yml', True]]:
- path = os.path.join(run_test_server.SERVICES_SRC_DIR,
- "api", "config", config_file)
- if not mandatory and not os.path.exists(path):
- continue
- with open(path) as f:
- rails_config = yaml.safe_load(f.read())
- for config_section in ['test', 'common']:
- try:
- key = rails_config[config_section]["blob_signing_key"]
- except (KeyError, TypeError):
- pass
- else:
- return {'blob_signing_key': key,
- 'enforce_permissions': True}
- return {'blog_signing_key': None, 'enforce_permissions': False}
-
MAIN_SERVER = {}
- KEEP_SERVER = _getKeepServerConfig()
+ KEEP_SERVER = {'blob_signing': True}
PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']
@classmethod
BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
self.authorize_with('active')
with self.assertRaises(apiclient.errors.HttpError):
- result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
+ arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
0)
def test_short_put_from_stdin(self):
[sys.executable, arv_put.__file__, '--stream'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, env=self.ENVIRON)
- pipe.stdin.write(b'stdin test\n')
+ pipe.stdin.write(b'stdin test\xa6\n')
pipe.stdin.close()
deadline = time.time() + 5
while (pipe.poll() is None) and (time.time() < deadline):
elif returncode != 0:
sys.stdout.write(pipe.stdout.read())
self.fail("arv-put returned exit code {}".format(returncode))
- self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11',
+ self.assertIn('1cb671b355a0c23d5d1c61d59cdb1b2b+12',
pipe.stdout.read().decode())
def test_sigint_logs_request_id(self):
# we're about to create is not present in our test fixture.
manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
with self.assertRaises(apiclient.errors.HttpError):
- notfound = arv_put.api_client.collections().get(
+ arv_put.api_client.collections().get(
uuid=manifest_uuid).execute()
datadir = self.make_tmpdir()
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=self.ENVIRON)
- (out, err) = p.communicate()
+ (_, err) = p.communicate()
self.assertRegex(err.decode(), r'INFO: Collection saved as ')
self.assertEqual(p.returncode, 0)
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=self.ENVIRON)
- (out, err) = p.communicate()
+ (_, err) = p.communicate()
self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
self.assertEqual(p.returncode, 0)
cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=self.ENVIRON)
- (out, err) = p.communicate()
+ (_, err) = p.communicate()
self.assertRegex(
err.decode(),
r'INFO: Cache expired, starting from scratch.*')
self.assertEqual(p.returncode, 0)
- def test_invalid_signature_invalidates_cache(self):
- self.authorize_with('active')
- tmpdir = self.make_tmpdir()
- with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
- f.write('foo')
- # Upload a directory and get the cache file name
- p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- env=self.ENVIRON)
- (out, err) = p.communicate()
- self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
- self.assertEqual(p.returncode, 0)
- cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
- err.decode()).groups()[0]
- self.assertTrue(os.path.isfile(cache_filepath))
- # Load the cache file contents and modify the manifest to simulate
- # an invalid access token
- with open(cache_filepath, 'r') as c:
- cache = json.load(c)
- self.assertRegex(cache['manifest'], r'\+A\S+\@')
- cache['manifest'] = re.sub(
- r'\+A.*\@',
- "+Aabcdef0123456789abcdef0123456789abcdef01@",
- cache['manifest'])
- with open(cache_filepath, 'w') as c:
- c.write(json.dumps(cache))
- # Re-run the upload and expect to get an invalid cache message
- p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- env=self.ENVIRON)
- (out, err) = p.communicate()
- self.assertRegex(
- err.decode(),
- r'ERROR: arv-put: Resume cache contains invalid signature.*')
- self.assertEqual(p.returncode, 1)
+ def test_invalid_signature_in_cache(self):
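+        # Run both with and without --batch: batch mode should warn about the
+        # invalid signature and continue, while normal mode should fail.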
+ for batch_mode in [False, True]:
+ self.authorize_with('active')
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
+ f.write('foo')
+ # Upload a directory and get the cache file name
+ arv_put_args = [tmpdir]
+ if batch_mode:
+ arv_put_args = ['--batch'] + arv_put_args
+ p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (_, err) = p.communicate()
+ self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+ self.assertEqual(p.returncode, 0)
+ cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+ err.decode()).groups()[0]
+ self.assertTrue(os.path.isfile(cache_filepath))
+ # Load the cache file contents and modify the manifest to simulate
+ # an invalid access token
+ with open(cache_filepath, 'r') as c:
+ cache = json.load(c)
+ self.assertRegex(cache['manifest'], r'\+A\S+\@')
+ cache['manifest'] = re.sub(
+ r'\+A.*\@',
+ "+Aabcdef0123456789abcdef0123456789abcdef01@",
+ cache['manifest'])
+ with open(cache_filepath, 'w') as c:
+ c.write(json.dumps(cache))
+ # Re-run the upload and expect to get an invalid cache message
+ p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (_, err) = p.communicate()
+ if not batch_mode:
+ self.assertRegex(
+ err.decode(),
+ r'ERROR: arv-put: Resume cache contains invalid signature.*')
+ self.assertEqual(p.returncode, 1)
+ else:
+ self.assertRegex(
+ err.decode(),
+ r'Invalid signatures on cache file \'.*\' while being run in \'batch mode\' -- continuing anyways.*')
+ self.assertEqual(p.returncode, 0)
def test_single_expired_signature_reuploads_file(self):
self.authorize_with('active')
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=self.ENVIRON)
- (out, err) = p.communicate()
+ (_, err) = p.communicate()
self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
self.assertEqual(p.returncode, 0)
cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=self.ENVIRON)
- (out, err) = p.communicate()
+ (_, err) = p.communicate()
self.assertRegex(
err.decode(),
r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')
+ def test_put_collection_with_utc_expiring_datetime(self):
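+        # A --trash-at value with an explicit 'Z' suffix should be stored as
+        # the same UTC instant.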
+ tmpdir = self.make_tmpdir()
+ trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%MZ')
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ col = self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-at', trash_at, tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ self.assertEqual(ciso8601.parse_datetime(trash_at),
+ ciso8601.parse_datetime(c['trash_at']))
+
+ def test_put_collection_with_timezone_aware_expiring_datetime(self):
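+        # A --trash-at value with a -0300 offset should be normalized to UTC,
+        # i.e. stored three hours later.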
+ tmpdir = self.make_tmpdir()
+ trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M-0300')
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ col = self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-at', trash_at, tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ self.assertEqual(
+ ciso8601.parse_datetime(trash_at).replace(tzinfo=None) + datetime.timedelta(hours=3),
+ ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
+
+ def test_put_collection_with_timezone_naive_expiring_datetime(self):
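+        # A timezone-naive --trash-at value should be interpreted as local
+        # time, so the stored UTC value differs by the local UTC offset.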
+ tmpdir = self.make_tmpdir()
+ trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M')
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ col = self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-at', trash_at, tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ if time.daylight:
+ offset = datetime.timedelta(seconds=time.altzone)
+ else:
+ offset = datetime.timedelta(seconds=time.timezone)
+ self.assertEqual(
+ ciso8601.parse_datetime(trash_at) + offset,
+ ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
+
+ def test_put_collection_with_expiring_date_only(self):
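+        # A date-only --trash-at value should expand to the end of that day
+        # in local time.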
+ tmpdir = self.make_tmpdir()
+ trash_at = '2140-01-01'
+ end_of_day = datetime.timedelta(hours=23, minutes=59, seconds=59)
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ col = self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-at', trash_at, tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ if time.daylight:
+ offset = datetime.timedelta(seconds=time.altzone)
+ else:
+ offset = datetime.timedelta(seconds=time.timezone)
+ self.assertEqual(
+ ciso8601.parse_datetime(trash_at) + end_of_day + offset,
+ ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
+
+ def test_put_collection_with_invalid_absolute_expiring_datetimes(self):
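+        # Incomplete or ambiguous dates should be rejected, which
+        # run_and_find_collection reports as an AssertionError.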
+        cases = ['2100', '210010', '2100-10', '2100-Oct']
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ for test_datetime in cases:
+ with self.assertRaises(AssertionError):
+ self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-at', test_datetime, tmpdir])
+
+ def test_put_collection_with_relative_expiring_datetime(self):
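+        # --trash-after N should produce a trash_at bounded by timestamps taken
+        # N days out, just before and just after the upload.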
+ expire_after = 7
+ dt_before = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ col = self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-after', str(expire_after), tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ dt_after = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ trash_at = ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None)
+ self.assertTrue(dt_before < trash_at)
+ self.assertTrue(dt_after > trash_at)
+
+ def test_put_collection_with_invalid_relative_expiring_datetime(self):
+        expire_after = 0  # Must be >= 1
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ with self.assertRaises(AssertionError):
+ self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-after', str(expire_after), tmpdir])
+
def test_upload_directory_reference_without_trailing_slash(self):
tmpdir1 = self.make_tmpdir()
tmpdir2 = self.make_tmpdir()
def test_put_collection_with_storage_classes_specified(self):
collection = self.run_and_find_collection("", ['--storage-classes', 'hot'])
-
self.assertEqual(len(collection['storage_classes_desired']), 1)
self.assertEqual(collection['storage_classes_desired'][0], 'hot')
+ def test_put_collection_with_multiple_storage_classes_specified(self):
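+        # Storage classes are comma-separated; surrounding whitespace should
+        # be stripped.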
+ collection = self.run_and_find_collection("", ['--storage-classes', ' foo, bar ,baz'])
+ self.assertEqual(len(collection['storage_classes_desired']), 3)
+ self.assertEqual(collection['storage_classes_desired'], ['foo', 'bar', 'baz'])
+
def test_put_collection_without_storage_classes_specified(self):
collection = self.run_and_find_collection("")
-
self.assertEqual(len(collection['storage_classes_desired']), 1)
self.assertEqual(collection['storage_classes_desired'][0], 'default')