def test_cache_names_stable(self):
for argset in self.CACHE_ARGSET:
- self.assertEquals(self.cache_path_from_arglist(argset),
+ self.assertEqual(self.cache_path_from_arglist(argset),
self.cache_path_from_arglist(argset),
"cache name changed for {}".format(argset))
"path too exotic: {}".format(path))
def test_cache_names_ignore_argument_order(self):
- self.assertEquals(
+ self.assertEqual(
self.cache_path_from_arglist(['a', 'b', 'c']),
self.cache_path_from_arglist(['c', 'a', 'b']))
- self.assertEquals(
+ self.assertEqual(
self.cache_path_from_arglist(['-', '--filename', 'stdin']),
self.cache_path_from_arglist(['--filename', 'stdin', '-']))
args = arv_put.parse_arguments(['/tmp'])
args.filename = 'tmp'
path2 = arv_put.ResumeCache.make_path(args)
- self.assertEquals(path1, path2,
+ self.assertEqual(path1, path2,
"cache path considered --filename for directory")
- self.assertEquals(
+ self.assertEqual(
self.cache_path_from_arglist(['-']),
self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
"cache path considered --max-manifest-depth for file")
def test_cache_names_treat_negative_manifest_depths_identically(self):
base_args = ['/tmp', '--max-manifest-depth']
- self.assertEquals(
+ self.assertEqual(
self.cache_path_from_arglist(base_args + ['-1']),
self.cache_path_from_arglist(base_args + ['-2']))
def test_cache_names_treat_stdin_consistently(self):
- self.assertEquals(
+ self.assertEqual(
self.cache_path_from_arglist(['-', '--filename', 'test']),
self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))
def test_cache_names_identical_for_synonymous_names(self):
- self.assertEquals(
+ self.assertEqual(
self.cache_path_from_arglist(['.']),
self.cache_path_from_arglist([os.path.realpath('.')]))
testdir = self.make_tmpdir()
looplink = os.path.join(testdir, 'loop')
os.symlink(testdir, looplink)
- self.assertEquals(
+ self.assertEqual(
self.cache_path_from_arglist([testdir]),
self.cache_path_from_arglist([looplink]))
with tempfile.NamedTemporaryFile() as cachefile:
self.last_cache = arv_put.ResumeCache(cachefile.name)
self.last_cache.save(thing)
- self.assertEquals(thing, self.last_cache.load())
+ self.assertEqual(thing, self.last_cache.load())
def test_empty_cache(self):
with tempfile.NamedTemporaryFile() as cachefile:
cache.save(thing)
cache.close()
self.last_cache = arv_put.ResumeCache(path)
- self.assertEquals(thing, self.last_cache.load())
+ self.assertEqual(thing, self.last_cache.load())
def test_multiple_cache_writes(self):
thing = ['short', 'list']
# sure the cache file gets truncated.
self.last_cache.save(['long', 'long', 'list'])
self.last_cache.save(thing)
- self.assertEquals(thing, self.last_cache.load())
+ self.assertEqual(thing, self.last_cache.load())
def test_cache_is_locked(self):
with tempfile.NamedTemporaryFile() as cachefile:
cwriter.write_file('/dev/null')
cwriter.cache_state()
self.assertTrue(self.cache.load())
- self.assertEquals(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
+ self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
def test_writer_works_without_cache(self):
    # A writer constructed with no cache argument must still produce a
    # manifest for the file it was given.
    cwriter = arv_put.ArvPutCollectionWriter()
    cwriter.write_file('/dev/null')
    self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
def test_writer_resumes_from_cache(self):
cwriter = arv_put.ArvPutCollectionWriter(self.cache)
cwriter.cache_state()
new_writer = arv_put.ArvPutCollectionWriter.from_cache(
self.cache)
- self.assertEquals(
+ self.assertEqual(
". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
new_writer.manifest_text())
cwriter.write_file(testfile.name, 'test')
new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
new_writer.write_file('/dev/null')
- self.assertEquals(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text())
+ self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text())
def test_new_writer_from_empty_cache(self):
    # from_cache() is called here on self.cache before anything in this
    # test has written state to it; the resulting writer must still work.
    cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
    cwriter.write_file('/dev/null')
    self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
def test_writer_resumable_after_arbitrary_bytes(self):
cwriter = arv_put.ArvPutCollectionWriter(self.cache)
cwriter.cache_state()
new_writer = arv_put.ArvPutCollectionWriter.from_cache(
self.cache)
- self.assertEquals(cwriter.manifest_text(), new_writer.manifest_text())
+ self.assertEqual(cwriter.manifest_text(), new_writer.manifest_text())
def make_progress_tester(self):
progression = []
TEST_SIZE = os.path.getsize(__file__)
def test_expected_bytes_for_file(self):
    # For a single regular file (this test module), the expected byte
    # count must equal the size stored in TEST_SIZE.
    self.assertEqual(self.TEST_SIZE,
                     arv_put.expected_bytes_for([__file__]))
def test_expected_bytes_for_tree(self):
    # Populate a temporary directory with two copies of this test module.
    tree = self.make_tmpdir()
    shutil.copyfile(__file__, os.path.join(tree, 'one'))
    shutil.copyfile(__file__, os.path.join(tree, 'two'))
    # The directory alone counts both copies...
    self.assertEqual(self.TEST_SIZE * 2,
                     arv_put.expected_bytes_for([tree]))
    # ...and passing the original file as well adds a third.
    self.assertEqual(self.TEST_SIZE * 3,
                     arv_put.expected_bytes_for([tree, __file__]))
def test_expected_bytes_for_device(self):
arv_put.ResumeCache.CACHE_DIR = orig_cachedir
os.chmod(cachedir, 0o700)
+ def test_normalize(self):
+ testfile1 = self.make_test_file()
+ testfile2 = self.make_test_file()
+ test_paths = [testfile1.name, testfile2.name]
+ # Reverse-sort the paths, so normalization must change their order.
+ test_paths.sort(reverse=True)
+ self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
+ test_paths)
+ manifest = self.main_stdout.getvalue()
+ # Assert the second file we specified appears first in the manifest.
+ file_indices = [manifest.find(':' + os.path.basename(path))
+ for path in test_paths]
+ self.assertGreater(*file_indices)
+
def test_error_name_without_collection(self):
self.assertRaises(SystemExit, self.call_main_with_args,
['--name', 'test without Collection',
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
ArvadosBaseTestCase):
def _getKeepServerConfig():
- for config_file in ['application.yml', 'application.default.yml']:
- with open(os.path.join(run_test_server.SERVICES_SRC_DIR,
- "api", "config", config_file)) as f:
+ for config_file, mandatory in [
+ ['application.yml', False], ['application.default.yml', True]]:
+ path = os.path.join(run_test_server.SERVICES_SRC_DIR,
+ "api", "config", config_file)
+ if not mandatory and not os.path.exists(path):
+ continue
+ with open(path) as f:
rails_config = yaml.load(f.read())
for config_section in ['test', 'common']:
try:
"ARVADOS_API_HOST_INSECURE",
"ARVADOS_API_TOKEN"]:
self.ENVIRON[v] = arvados.config.settings()[v]
- arv_put.api_client = arvados.api('v1', cache=False)
+ arv_put.api_client = arvados.api('v1')
def current_user(self):
    # Query the users resource of the module-level arv_put API client for
    # its "current" record and return the executed response.
    users_resource = arv_put.api_client.users()
    request = users_resource.current()
    return request.execute()
def test_check_real_project_found(self):
    self.authorize_with('active')
    # NOTE(review): the trailing 0 is the third positional argument that
    # desired_project_uuid now takes (presumably a retry count) -- confirm
    # against arv_put.
    self.assertTrue(
        arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
        "did not correctly find test fixture project")
def test_check_error_finding_nonexistent_uuid(self):
BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
self.authorize_with('active')
try:
- result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID)
+ result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
+ 0)
except ValueError as error:
self.assertIn(BAD_UUID, error.message)
else:
BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
self.authorize_with('active')
with self.assertRaises(apiclient.errors.HttpError):
- result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID)
+ result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
+ 0)
def test_short_put_from_stdin(self):
# Have to run this as an integration test since arv-put can't
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, env=self.ENVIRON)
stdout, stderr = pipe.communicate(text)
- collection_list = arvados.api('v1', cache=False).collections().list(
- filters=[['portable_data_hash', '=', stdout.strip()]]).execute().get('items', [])
+ search_key = ('portable_data_hash'
+ if '--portable-data-hash' in extra_args else 'uuid')
+ collection_list = arvados.api('v1').collections().list(
+ filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
self.assertEqual(1, len(collection_list))
return collection_list[0]
+ def test_put_collection_with_high_redundancy(self):
+ # Write empty data: we're not testing CollectionWriter, just
+ # making sure collections.create tells the API server what our
+ # desired replication level is.
+ collection = self.run_and_find_collection("", ['--replication', '4'])
+ self.assertEqual(4, collection['replication_desired'])
+
+ def test_put_collection_with_default_redundancy(self):
+ collection = self.run_and_find_collection("")
+ self.assertEqual(None, collection['replication_desired'])
+
def test_put_collection_with_unnamed_project_link(self):
- link = self.run_and_find_collection("Test unnamed collection",
- ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
+ link = self.run_and_find_collection(
+ "Test unnamed collection",
+ ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
username = pwd.getpwuid(os.getuid()).pw_name
self.assertRegexpMatches(
link['name'],
def test_put_collection_with_name_and_no_project(self):
link_name = 'Test Collection Link in home project'
- collection = self.run_and_find_collection("Test named collection in home project",
- ['--portable-data-hash', '--name', link_name])
+ collection = self.run_and_find_collection(
+ "Test named collection in home project",
+ ['--portable-data-hash', '--name', link_name])
self.assertEqual(link_name, collection['name'])
my_user_uuid = self.current_user()['uuid']
self.assertEqual(my_user_uuid, collection['owner_uuid'])