X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1487c0577921c88801c377d753d1322fd1485968..e289b9ad5959a76795681ca95d310bad2288656a:/sdk/python/tests/test_arv_put.py

diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index 87facac658..547cfc296b 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -2,6 +2,7 @@
 # -*- coding: utf-8 -*-
 
 import apiclient
+import io
 import mock
 import os
 import pwd
@@ -21,6 +22,7 @@ from cStringIO import StringIO
 
 import arvados
 import arvados.commands.put as arv_put
+import arvados_testutil as tutil
 from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
 
 import run_test_server
@@ -30,9 +32,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
         [],
         ['/dev/null'],
         ['/dev/null', '--filename', 'empty'],
-        ['/tmp'],
-        ['/tmp', '--max-manifest-depth', '0'],
-        ['/tmp', '--max-manifest-depth', '1']
+        ['/tmp']
     ]
 
     def tearDown(self):
@@ -239,18 +239,72 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
 
 class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                           ArvadosBaseTestCase):
+
     def setUp(self):
         super(ArvPutUploadJobTest, self).setUp()
         run_test_server.authorize_with('active')
-        self.exit_lock = threading.Lock()
-        self.save_manifest_lock = threading.Lock()
+        # Temp files creation
+        self.tempdir = tempfile.mkdtemp()
+        subdir = os.path.join(self.tempdir, 'subdir')
+        os.mkdir(subdir)
+        data = "x" * 1024 # 1 KB
+        for i in range(1, 5):
+            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
+                f.write(data * i)
+        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
+            f.write(data * 5)
+        # Large temp file for resume test
+        _, self.large_file_name = tempfile.mkstemp()
+        fileobj = open(self.large_file_name, 'w')
+        # Make sure to write just a little more than one block
+        for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
+            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
+            fileobj.write(data)
+        fileobj.close()
+        # Temp dir containing small files to be repacked
+        self.small_files_dir = tempfile.mkdtemp()
+        data = 'y' * 1024 * 1024 # 1 MB
+        for i in range(1, 70):
+            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
+                f.write(data + str(i))
+        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
+        # Temp dir to hold a symlink to other temp dir
+        self.tempdir_with_symlink = tempfile.mkdtemp()
+        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir1'))
+        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir2'))
 
     def tearDown(self):
         super(ArvPutUploadJobTest, self).tearDown()
+        shutil.rmtree(self.tempdir)
+        os.unlink(self.large_file_name)
+        shutil.rmtree(self.small_files_dir)
+        shutil.rmtree(self.tempdir_with_symlink)
+
+    def test_symlinks_are_followed_by_default(self):
+        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
+        cwriter.start(save_collection=False)
+        self.assertIn('linkeddir1', cwriter.manifest_text())
+        cwriter.destroy_cache()
+
+    def test_symlinks_are_followed_only_once(self):
+        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
+                                          follow_links=True)
+        cwriter.start(save_collection=False)
+        self.assertIn('linkeddir1', cwriter.manifest_text())
+        self.assertNotIn('linkeddir2', cwriter.manifest_text())
+        cwriter.destroy_cache()
+
+    def test_symlinks_are_not_followed_when_requested(self):
+        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
+                                          follow_links=False)
+        cwriter.start(save_collection=False)
+        self.assertNotIn('linkeddir1', cwriter.manifest_text())
+        self.assertNotIn('linkeddir2', cwriter.manifest_text())
+        cwriter.destroy_cache()
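
Side note, not part of the patch: the new setUp sizes the large fixture at just over one Keep block, so an interrupted upload always leaves one committed block plus a pending tail behind. A standalone sketch of that arithmetic, stdlib only; BLOCK_SIZE merely stands in for arvados.config.KEEP_BLOCK_SIZE (64 MiB in the SDKs of this era):

    import os
    import random
    import tempfile

    BLOCK_SIZE = 2 ** 26  # stand-in for arvados.config.KEEP_BLOCK_SIZE
    MiB = 1024 * 1024

    fd, name = tempfile.mkstemp()
    os.close(fd)
    with open(name, 'w') as f:
        # (BLOCK_SIZE // MiB) + 1 writes of 1 MiB each: one full block,
        # plus 1 MiB left over that is still in flight when the simulated
        # failure in the tests below hits.
        for _ in range(BLOCK_SIZE // MiB + 1):
            f.write(random.choice(['x', 'y', 'z']) * MiB)
    print(os.path.getsize(name) - BLOCK_SIZE)  # 1048576, i.e. 1 MiB over
    os.unlink(name)
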
 
     def test_writer_works_without_cache(self):
         cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
-        cwriter.start()
+        cwriter.start(save_collection=False)
         self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
 
     def test_writer_works_with_cache(self):
@@ -258,13 +312,13 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             f.write('foo')
             f.flush()
             cwriter = arv_put.ArvPutUploadJob([f.name])
-            cwriter.start()
-            self.assertEqual(3, cwriter.bytes_written)
+            cwriter.start(save_collection=False)
+            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
             # Don't destroy the cache, and start another upload
             cwriter_new = arv_put.ArvPutUploadJob([f.name])
-            cwriter_new.start()
-            self.assertEqual(0, cwriter_new.bytes_written)
+            cwriter_new.start(save_collection=False)
             cwriter_new.destroy_cache()
+            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)
 
     def make_progress_tester(self):
         progression = []
@@ -280,88 +334,191 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             progression, reporter = self.make_progress_tester()
             cwriter = arv_put.ArvPutUploadJob([f.name],
                 reporter=reporter, bytes_expected=expect_count)
-            cwriter.start()
+            cwriter.start(save_collection=False)
             cwriter.destroy_cache()
             self.assertIn((3, expect_count), progression)
 
     def test_writer_upload_directory(self):
-        tempdir = tempfile.mkdtemp()
-        subdir = os.path.join(tempdir, 'subdir')
-        os.mkdir(subdir)
-        data = "x" * 1024 # 1 KB
-        for i in range(1, 5):
-            with open(os.path.join(tempdir, str(i)), 'w') as f:
-                f.write(data * i)
-        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
-            f.write(data * 5)
-        cwriter = arv_put.ArvPutUploadJob([tempdir])
-        cwriter.start()
+        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
+        cwriter.start(save_collection=False)
         cwriter.destroy_cache()
-        shutil.rmtree(tempdir)
         self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
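
Side note, not part of the patch: the rewritten assertions above lean on two counters kept by the upload job, bytes_written and bytes_skipped; their difference is "bytes actually sent in this run". A toy model of that bookkeeping (UploadCounters and feed() are invented names, not the real ArvPutUploadJob internals):

    class UploadCounters(object):
        def __init__(self):
            self.bytes_written = 0
            self.bytes_skipped = 0

        def feed(self, chunk, cached):
            # Cached chunks count as both written and skipped, so
            # bytes_written - bytes_skipped is the new data this run.
            self.bytes_written += len(chunk)
            if cached:
                self.bytes_skipped += len(chunk)

    first = UploadCounters()
    first.feed('foo', cached=False)   # fresh upload: 3 new bytes
    assert first.bytes_written - first.bytes_skipped == 3

    rerun = UploadCounters()
    rerun.feed('foo', cached=True)    # fully cached rerun: nothing new
    assert rerun.bytes_written - rerun.bytes_skipped == 0
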
 
     def test_resume_large_file_upload(self):
-        # Proxying ArvadosFile.writeto() method to be able to synchronize it
-        # with partial manifest saves
-        orig_func = getattr(arvados.arvfile.ArvadosFile, 'writeto')
-        def wrapped_func(*args, **kwargs):
-            data = args[2]
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
             if len(data) < arvados.config.KEEP_BLOCK_SIZE:
-                # Lock on the last block write call, waiting for the
-                # manifest to be saved
-                self.exit_lock.acquire()
-                raise SystemExit('Test exception')
-            ret = orig_func(*args, **kwargs)
-            self.save_manifest_lock.release()
-            return ret
-        setattr(arvados.arvfile.ArvadosFile, 'writeto', wrapped_func)
-        # Take advantage of the reporter feature to sync the partial
-        # manifest writing with the simulated upload error.
-        def fake_reporter(written, expected):
-            # Wait until there's something to save
-            self.save_manifest_lock.acquire()
-            # Once the partial manifest is saved, allow exiting
-            self.exit_lock.release()
-        # Create random data to be uploaded
-        md5_original = hashlib.md5()
-        _, filename = tempfile.mkstemp()
-        fileobj = open(filename, 'w')
-        # Make sure to write just a little more than one block
-        for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
-            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
-            md5_original.update(data)
-            fileobj.write(data)
-        fileobj.close()
-        self.exit_lock.acquire()
-        self.save_manifest_lock.acquire()
-        writer = arv_put.ArvPutUploadJob([filename],
-                                         reporter=fake_reporter,
-                                         update_time=0.1)
-        # First upload: partially completed with simulated error
-        try:
-            self.assertRaises(SystemExit, writer.start())
-        except SystemExit:
-            # Avoid getting a ResumeCacheConflict on the 2nd run
-            writer._cache_file.close()
-        self.assertLess(writer.bytes_written, os.path.getsize(filename))
-
-        # Restore the ArvadosFile.writeto() method to before retrying
-        setattr(arvados.arvfile.ArvadosFile, 'writeto', orig_func)
-        writer_new = arv_put.ArvPutUploadJob([filename])
-        writer_new.start()
-        writer_new.destroy_cache()
-        self.assertEqual(os.path.getsize(filename),
-                         writer.bytes_written + writer_new.bytes_written)
-        # Read the uploaded file to compare its md5 hash
-        md5_uploaded = hashlib.md5()
-        c = arvados.collection.Collection(writer_new.manifest_text())
-        with c.open(os.path.basename(filename), 'r') as f:
-            new_data = f.read()
-        md5_uploaded.update(new_data)
-        self.assertEqual(md5_original.hexdigest(), md5_uploaded.hexdigest())
-        # Cleaning up
-        os.unlink(filename)
-
+                # Simulate a checkpoint before quitting. Ensure block commit.
+                self.writer._update(final=True)
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            # We'll be accessing from inside the wrapper
+            self.writer = writer
+            with self.assertRaises(SystemExit):
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1)
+        writer2.start(save_collection=False)
+        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
+                         os.path.getsize(self.large_file_name))
+        writer2.destroy_cache()
+        del(self.writer)
+
+    # Test for bug #11002
+    def test_graceful_exit_while_repacking_small_blocks(self):
+        def wrapped_commit(*args, **kwargs):
+            raise SystemExit("Simulated error")
+
+        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
+                        autospec=True) as mocked_commit:
+            mocked_commit.side_effect = wrapped_commit
+            # Upload a little more than 1 block, wrapped_commit will make the first block
+            # commit to fail.
+            # arv-put should not exit with an exception by trying to commit the collection
+            # as it's in an inconsistent state.
+            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
+                                             replication_desired=1)
+            try:
+                with self.assertRaises(SystemExit):
+                    writer.start(save_collection=False)
+            except arvados.arvfile.UnownedBlockError:
+                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
+        writer.destroy_cache()
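
Side note, not part of the patch: the two tests above share one technique: mock.patch(..., autospec=True) with a side_effect that normally delegates to a saved reference to the original method (self.arvfile_write from setUp). Because autospec preserves the real signature, instance calls reach the wrapper as (self, data), which is why the payload is read from args[1]. A self-contained sketch with an invented Writer class:

    import mock  # the standalone mock library, as used by this test suite

    class Writer(object):  # hypothetical stand-in for ArvadosFileWriter
        def write(self, data):
            return len(data)

    original_write = Writer.write  # saved handle, like self.arvfile_write

    def wrapped_write(*args, **kwargs):
        # autospec=True forwards instance calls as (self, data), so the
        # data lands in args[1], exactly as in the tests above.
        if args[1] == 'last-block':
            raise SystemExit("Simulated error")
        return original_write(*args, **kwargs)

    with mock.patch('%s.Writer.write' % __name__, autospec=True) as m:
        m.side_effect = wrapped_write
        assert Writer().write('abc') == 3  # delegated to the original
        try:
            Writer().write('last-block')   # triggers the simulated failure
        except SystemExit as exc:
            print(exc)                     # Simulated error
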
+
+    def test_no_resume_when_asked(self):
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                # Simulate a checkpoint before quitting.
+                self.writer._update()
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            # We'll be accessing from inside the wrapper
+            self.writer = writer
+            with self.assertRaises(SystemExit):
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload, this time without resume
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          resume=False)
+        writer2.start(save_collection=False)
+        self.assertEqual(writer2.bytes_skipped, 0)
+        self.assertEqual(writer2.bytes_written,
+                         os.path.getsize(self.large_file_name))
+        writer2.destroy_cache()
+        del(self.writer)
+
+    def test_no_resume_when_no_cache(self):
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                # Simulate a checkpoint before quitting.
+                self.writer._update()
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            # We'll be accessing from inside the wrapper
+            self.writer = writer
+            with self.assertRaises(SystemExit):
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload, this time without cache usage
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          resume=False,
+                                          use_cache=False)
+        writer2.start(save_collection=False)
+        self.assertEqual(writer2.bytes_skipped, 0)
+        self.assertEqual(writer2.bytes_written,
+                         os.path.getsize(self.large_file_name))
+        writer2.destroy_cache()
+        del(self.writer)
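
Side note, not part of the patch: together these two tests pin down the cache switches used here: resume=False ignores any saved checkpoint, and use_cache=False additionally keeps the job from touching a cache file at all; either way the retry re-sends the whole file, hence bytes_skipped == 0. A sketch of a driver built from the keyword names in this patch (upload_from_scratch and its call site are hypothetical, and running it needs an Arvados test cluster):

    import arvados.commands.put as arv_put

    def upload_from_scratch(path, touch_cache=True):
        # resume=False: discard any checkpoint from an interrupted run.
        # use_cache=False: do not read or write a cache file either.
        job = arv_put.ArvPutUploadJob([path],
                                      replication_desired=1,
                                      resume=False,
                                      use_cache=touch_cache)
        job.start(save_collection=False)
        assert job.bytes_skipped == 0  # nothing is resumed in either mode
        return job
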
+
+    def test_dry_run_feature(self):
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                # Simulate a checkpoint before quitting.
+                self.writer._update()
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            # We'll be accessing from inside the wrapper
+            self.writer = writer
+            with self.assertRaises(SystemExit):
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload using dry_run to check if there is a pending upload
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          dry_run=True)
+        with self.assertRaises(arv_put.ArvPutUploadIsPending):
+            writer2.start(save_collection=False)
+        # Complete the pending upload
+        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1)
+        writer3.start(save_collection=False)
+        # Confirm there's no pending upload with dry_run=True
+        writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          dry_run=True)
+        with self.assertRaises(arv_put.ArvPutUploadNotPending):
+            writer4.start(save_collection=False)
+        writer4.destroy_cache()
+        # Test obvious cases
+        with self.assertRaises(arv_put.ArvPutUploadIsPending):
+            arv_put.ArvPutUploadJob([self.large_file_name],
+                                    replication_desired=1,
+                                    dry_run=True,
+                                    resume=False,
+                                    use_cache=False)
+        with self.assertRaises(arv_put.ArvPutUploadIsPending):
+            arv_put.ArvPutUploadJob([self.large_file_name],
+                                    replication_desired=1,
+                                    dry_run=True,
+                                    resume=False)
+        del(self.writer)
 
 
 class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
     TEST_SIZE = os.path.getsize(__file__)
@@ -435,6 +592,15 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
                 delattr(self, outbuf)
         super(ArvadosPutTest, self).tearDown()
 
+    def test_version_argument(self):
+        err = io.BytesIO()
+        out = io.BytesIO()
+        with tutil.redirected_streams(stdout=out, stderr=err):
+            with self.assertRaises(SystemExit):
+                self.call_main_with_args(['--version'])
+        self.assertEqual(out.getvalue(), '')
+        self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
+
     def test_simple_file_put(self):
         self.call_main_on_test_file()
 
@@ -461,9 +627,8 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
         os.chmod(cachedir, 0o700)
 
     def test_put_block_replication(self):
-        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \
-             mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock:
-            cache_mock.side_effect = ValueError
+        self.call_main_on_test_file()
+        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
             put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
             self.call_main_on_test_file(['--replication', '1'])
             self.call_main_on_test_file(['--replication', '4'])
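
Side note, not part of the patch: with dry_run=True the job uploads nothing and reports its verdict by raising ArvPutUploadIsPending or ArvPutUploadNotPending, so callers branch on exceptions instead of a return value; the "obvious cases" above even raise at construction time, before start() is called. A sketch of how a caller might consume that protocol (upload_is_pending is an invented helper and needs a live test cluster):

    import arvados.commands.put as arv_put

    def upload_is_pending(paths):
        try:
            # The try block covers the constructor too, since dry_run=True
            # combined with resume=False raises without calling start().
            job = arv_put.ArvPutUploadJob(paths, dry_run=True)
            job.start(save_collection=False)
        except arv_put.ArvPutUploadIsPending:
            return True   # an interrupted upload would be resumed
        except arv_put.ArvPutUploadNotPending:
            return False  # the cache says everything was already sent
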
@@ -505,12 +670,13 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
         coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
         coll_save_mock.side_effect = arvados.errors.ApiError(
             fake_httplib2_response(403), '{}')
-        arvados.collection.Collection.save_new = coll_save_mock
-        with self.assertRaises(SystemExit) as exc_test:
-            self.call_main_with_args(['/dev/null'])
-        self.assertLess(0, exc_test.exception.args[0])
-        self.assertLess(0, coll_save_mock.call_count)
-        self.assertEqual("", self.main_stdout.getvalue())
+        with mock.patch('arvados.collection.Collection.save_new',
+                        new=coll_save_mock):
+            with self.assertRaises(SystemExit) as exc_test:
+                self.call_main_with_args(['/dev/null'])
+            self.assertLess(0, exc_test.exception.args[0])
+            self.assertLess(0, coll_save_mock.call_count)
+            self.assertEqual("", self.main_stdout.getvalue())
 
 
 class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
@@ -651,6 +817,21 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         self.assertEqual(1, len(collection_list))
         return collection_list[0]
 
+    def test_put_collection_with_later_update(self):
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
+        self.assertNotEqual(None, col['uuid'])
+        # Add a new file to the directory
+        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
+            f.write('The quick brown fox jumped over the lazy dog')
+        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
+        self.assertEqual(col['uuid'], updated_col['uuid'])
+        # Get the manifest and check that the new file is being included
+        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
+        self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')
+
     def test_put_collection_with_high_redundancy(self):
         # Write empty data: we're not testing CollectionWriter, just
         # making sure collections.create tells the API server what our
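
Side note, not part of the patch: the closing assertion's ":44:file2" token is Keep manifest notation, offset:length:filename within a stream, and 44 is simply len('The quick brown fox jumped over the lazy dog'). The same check run against a fabricated manifest line, stdlib only (the block locator below is a placeholder, not a real Keep hash):

    import re

    file1 = ('Relaxing in basins at the end of inlets terminates '
             'the endless tests from the box')
    file2 = 'The quick brown fox jumped over the lazy dog'

    manifest = '. 0123456789abcdef0123456789abcdef+%d 0:%d:file1 %d:%d:file2\n' % (
        len(file1) + len(file2), len(file1), len(file1), len(file2))

    assert re.search(r'^\. .*:44:file2\n', manifest)
    print(len(file2))  # 44, the length the regexp above pins down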