data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
fileobj.write(data)
fileobj.close()
+ # Temp dir containing small files to be repacked
+ self.small_files_dir = tempfile.mkdtemp()
+ data = 'y' * 1024 * 1024 # 1 MB
+ for i in range(1, 70):
+ with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
+ f.write(data + str(i))
self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
+ # Temp dir to hold a symlink to other temp dir
+ self.tempdir_with_symlink = tempfile.mkdtemp()
+ os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir1'))
+ os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir2'))
def tearDown(self):
super(ArvPutUploadJobTest, self).tearDown()
shutil.rmtree(self.tempdir)
os.unlink(self.large_file_name)
+ shutil.rmtree(self.small_files_dir)
+ shutil.rmtree(self.tempdir_with_symlink)
+
+ def test_symlinks_are_followed_by_default(self):
+ cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
+ cwriter.start(save_collection=False)
+ self.assertIn('linkeddir1', cwriter.manifest_text())
+ cwriter.destroy_cache()
+
+ def test_symlinks_are_followed_only_once(self):
+ cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
+ cwriter.start(save_collection=False)
+ self.assertIn('linkeddir1', cwriter.manifest_text())
+ self.assertNotIn('linkeddir2', cwriter.manifest_text())
+ cwriter.destroy_cache()
+
+ def test_symlinks_are_not_followed_when_requested(self):
+ cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
+ follow_links=False)
+ cwriter.start(save_collection=False)
+ self.assertNotIn('linkeddir1', cwriter.manifest_text())
+ self.assertNotIn('linkeddir2', cwriter.manifest_text())
+ cwriter.destroy_cache()
def test_writer_works_without_cache(self):
cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
data = args[1]
# Exit only on last block
if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ # Simulate a checkpoint before quitting. Ensure block commit.
+ self.writer._update(final=True)
raise SystemExit("Simulated error")
return self.arvfile_write(*args, **kwargs)
mocked_write.side_effect = wrapped_write
writer = arv_put.ArvPutUploadJob([self.large_file_name],
replication_desired=1)
+ # We'll be accessing from inside the wrapper
+ self.writer = writer
with self.assertRaises(SystemExit):
writer.start(save_collection=False)
# Confirm that the file was partially uploaded
self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
os.path.getsize(self.large_file_name))
writer2.destroy_cache()
+ del(self.writer)
+
+ # Test for bug #11002
+ def test_graceful_exit_while_repacking_small_blocks(self):
+ def wrapped_commit(*args, **kwargs):
+ raise SystemExit("Simulated error")
+
+ with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
+ autospec=True) as mocked_commit:
+ mocked_commit.side_effect = wrapped_commit
+ # Upload a little more than 1 block, wrapped_commit will make the first block
+ # commit to fail.
+ # arv-put should not exit with an exception by trying to commit the collection
+ # as it's in an inconsistent state.
+ writer = arv_put.ArvPutUploadJob([self.small_files_dir],
+ replication_desired=1)
+ try:
+ with self.assertRaises(SystemExit):
+ writer.start(save_collection=False)
+ except arvados.arvfile.UnownedBlockError:
+ self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
+ writer.destroy_cache()
def test_no_resume_when_asked(self):
def wrapped_write(*args, **kwargs):
data = args[1]
# Exit only on last block
if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ # Simulate a checkpoint before quitting.
+ self.writer._update()
raise SystemExit("Simulated error")
return self.arvfile_write(*args, **kwargs)
mocked_write.side_effect = wrapped_write
writer = arv_put.ArvPutUploadJob([self.large_file_name],
replication_desired=1)
+ # We'll be accessing from inside the wrapper
+ self.writer = writer
with self.assertRaises(SystemExit):
writer.start(save_collection=False)
# Confirm that the file was partially uploaded
self.assertEqual(writer2.bytes_written,
os.path.getsize(self.large_file_name))
writer2.destroy_cache()
+ del(self.writer)
def test_no_resume_when_no_cache(self):
def wrapped_write(*args, **kwargs):
data = args[1]
# Exit only on last block
if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ # Simulate a checkpoint before quitting.
+ self.writer._update()
raise SystemExit("Simulated error")
return self.arvfile_write(*args, **kwargs)
mocked_write.side_effect = wrapped_write
writer = arv_put.ArvPutUploadJob([self.large_file_name],
replication_desired=1)
+ # We'll be accessing from inside the wrapper
+ self.writer = writer
with self.assertRaises(SystemExit):
writer.start(save_collection=False)
# Confirm that the file was partially uploaded
self.assertEqual(writer2.bytes_written,
os.path.getsize(self.large_file_name))
writer2.destroy_cache()
+ del(self.writer)
+ def test_dry_run_feature(self):
+ def wrapped_write(*args, **kwargs):
+ data = args[1]
+ # Exit only on last block
+ if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ # Simulate a checkpoint before quitting.
+ self.writer._update()
+ raise SystemExit("Simulated error")
+ return self.arvfile_write(*args, **kwargs)
+
+ with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+ autospec=True) as mocked_write:
+ mocked_write.side_effect = wrapped_write
+ writer = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ # We'll be accessing from inside the wrapper
+ self.writer = writer
+ with self.assertRaises(SystemExit):
+ writer.start(save_collection=False)
+ # Confirm that the file was partially uploaded
+ self.assertGreater(writer.bytes_written, 0)
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
+ # Retry the upload using dry_run to check if there is a pending upload
+ writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ dry_run=True)
+ with self.assertRaises(arv_put.ArvPutUploadIsPending):
+ writer2.start(save_collection=False)
+ # Complete the pending upload
+ writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ writer3.start(save_collection=False)
+ # Confirm there's no pending upload with dry_run=True
+ writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ dry_run=True)
+ with self.assertRaises(arv_put.ArvPutUploadNotPending):
+ writer4.start(save_collection=False)
+ writer4.destroy_cache()
+ # Test obvious cases
+ with self.assertRaises(arv_put.ArvPutUploadIsPending):
+ arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ dry_run=True,
+ resume=False,
+ use_cache=False)
+ with self.assertRaises(arv_put.ArvPutUploadIsPending):
+ arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ dry_run=True,
+ resume=False)
+ del(self.writer)
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
TEST_SIZE = os.path.getsize(__file__)