X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/67c0bb9abe2f9069761a271145ae48368ee3d7d6..ca56623679bcf5733a3266711f513f8c23f8b0df:/sdk/python/tests/test_arv_put.py diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py old mode 100755 new mode 100644 index a373de9df2..f1dfd03def --- a/sdk/python/tests/test_arv_put.py +++ b/sdk/python/tests/test_arv_put.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- import apiclient +import io import mock import os import pwd @@ -13,8 +14,7 @@ import tempfile import time import unittest import yaml -import multiprocessing -import shutil +import threading import hashlib import random @@ -22,6 +22,7 @@ from cStringIO import StringIO import arvados import arvados.commands.put as arv_put +import arvados_testutil as tutil from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response import run_test_server @@ -31,9 +32,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase): [], ['/dev/null'], ['/dev/null', '--filename', 'empty'], - ['/tmp'], - ['/tmp', '--max-manifest-depth', '0'], - ['/tmp', '--max-manifest-depth', '1'] + ['/tmp'] ] def tearDown(self): @@ -238,181 +237,54 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase): arv_put.ResumeCache, path) -class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers): - MAIN_SERVER = {} - KEEP_SERVER = {} - - def setUp(self): - self.lock = multiprocessing.Lock() - - def fake_reporter(self, written, expected): - self.lock.release() # Allow caller process to terminate() us... - - def bg_uploader(self, filename): - cache = arv_put.ArvPutCollectionCache([filename]) - c = arv_put.ArvPutCollection(reporter=self.fake_reporter, cache=cache) - c.collection_flush_time = 0 # flush collection on every block flush, just for this test - c.write_file(filename, os.path.basename(filename)) - - def test_write_collection_with_name(self): - name = 'This is a collection' - c = arv_put.ArvPutCollection(name=name) - self.assertEqual(name, c.name()) - - def test_write_file_on_collection_without_save(self): - c = arv_put.ArvPutCollection(should_save=False) - with tempfile.NamedTemporaryFile(delete=False) as f: - f.write("The quick brown fox jumped over the lazy dog") - c.write_file(f.name, os.path.basename(f.name)) - self.assertEqual(None, c.manifest_locator()) - os.unlink(f.name) - - def test_write_file_and_check_data_locators(self): - c = arv_put.ArvPutCollection(should_save=False) - with tempfile.NamedTemporaryFile(delete=False) as f: - # Writing ~90 MB, so that it writes 2 data blocks - for _ in range(2 * 1024 * 1024): - f.write("The quick brown fox jumped over the lazy dog\n") - c.write_file(f.name, os.path.basename(f.name)) - self.assertEqual(2, len(c.data_locators())) - os.unlink(f.name) - - def test_write_directory_and_check_data_locators(self): - data = 'b' * 1024 * 1024 # 1 MB - tmpdir = tempfile.mkdtemp() - for size in [1, 5, 10, 70]: - with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f: - for _ in range(size): - f.write(data) - os.mkdir(os.path.join(tmpdir, 'subdir1')) - for size in [2, 4, 6]: - with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f: - for _ in range(size): - f.write(data) - c = arv_put.ArvPutCollection() - c.write_directory_tree(tmpdir) - shutil.rmtree(tmpdir) - self.assertEqual(8, len(c.data_locators())) +class ArvPutUploadJobTest(run_test_server.TestCaseWithServers, + ArvadosBaseTestCase): - def test_resume_large_file_upload(self): - _, filename = tempfile.mkstemp() - md5_original = hashlib.md5() - md5_uploaded = hashlib.md5() - fileobj = open(filename, 'w') - for _ in range(70): + def setUp(self): + super(ArvPutUploadJobTest, self).setUp() + run_test_server.authorize_with('active') + # Temp files creation + self.tempdir = tempfile.mkdtemp() + subdir = os.path.join(self.tempdir, 'subdir') + os.mkdir(subdir) + data = "x" * 1024 # 1 KB + for i in range(1, 5): + with open(os.path.join(self.tempdir, str(i)), 'w') as f: + f.write(data * i) + with open(os.path.join(subdir, 'otherfile'), 'w') as f: + f.write(data * 5) + # Large temp file for resume test + _, self.large_file_name = tempfile.mkstemp() + fileobj = open(self.large_file_name, 'w') + # Make sure to write just a little more than one block + for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1): data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB fileobj.write(data) - md5_original.update(data) fileobj.close() - self.lock.acquire() - uploader = multiprocessing.Process(target=self.bg_uploader, args=(filename,)) - uploader.start() - self.lock.acquire() # We can now proceed, because one block and collection flush()ed - self.lock.release() - uploader.terminate() - uploader.join() - cache = arv_put.ArvPutCollectionCache([filename]) - c = arv_put.ArvPutCollection(cache=cache) - self.assertLess(c.collection[os.path.basename(filename)].size(), os.path.getsize(filename)) - c.write_file(filename, os.path.basename(filename)) - uploaded = c.collection.open(os.path.basename(filename), 'r') - while True: - data = uploaded.read(1024*1024) - if not data: - break - md5_uploaded.update(data) - os.unlink(filename) - cache.destroy() - self.assertEqual(md5_original.hexdigest(), md5_uploaded.hexdigest()) - - def test_write_directory_twice(self): - data = 'b' * 1024 * 1024 - tmpdir = tempfile.mkdtemp() - for size in [1, 5, 10, 70]: - with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f: - for _ in range(size): - f.write(data) - os.mkdir(os.path.join(tmpdir, 'subdir1')) - for size in [2, 4, 6]: - with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f: - for _ in range(size): - f.write(data) - c_cache = arv_put.ArvPutCollectionCache([tmpdir]) - c = arv_put.ArvPutCollection(cache=c_cache) - c.write_directory_tree(tmpdir) - c_cache.close() - d_cache = arv_put.ArvPutCollectionCache([tmpdir]) - d = arv_put.ArvPutCollection(cache=d_cache) - d.write_directory_tree(tmpdir) - d_cache.close() - c_cache.destroy() - d_cache.destroy() - shutil.rmtree(tmpdir) - self.assertNotEqual(c.bytes_written, d.bytes_written) - # self.assertGreater(c.bytes_written, 0) - # self.assertEqual(d.bytes_written, 0) - - -class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers, - ArvadosBaseTestCase): - def setUp(self): - super(ArvadosPutCollectionWriterTest, self).setUp() - run_test_server.authorize_with('active') - with tempfile.NamedTemporaryFile(delete=False) as cachefile: - self.cache = arv_put.ResumeCache(cachefile.name) - self.cache_filename = cachefile.name + self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write') def tearDown(self): - super(ArvadosPutCollectionWriterTest, self).tearDown() - if os.path.exists(self.cache_filename): - self.cache.destroy() - self.cache.close() - - def test_writer_caches(self): - cwriter = arv_put.ArvPutCollectionWriter(self.cache) - cwriter.write_file('/dev/null') - cwriter.cache_state() - self.assertTrue(self.cache.load()) - self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text()) + super(ArvPutUploadJobTest, self).tearDown() + shutil.rmtree(self.tempdir) + os.unlink(self.large_file_name) def test_writer_works_without_cache(self): - cwriter = arv_put.ArvPutCollectionWriter() - cwriter.write_file('/dev/null') + cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False) + cwriter.start(save_collection=False) self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text()) - def test_writer_resumes_from_cache(self): - cwriter = arv_put.ArvPutCollectionWriter(self.cache) - with self.make_test_file() as testfile: - cwriter.write_file(testfile.name, 'test') - cwriter.cache_state() - new_writer = arv_put.ArvPutCollectionWriter.from_cache( - self.cache) - self.assertEqual( - ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n", - new_writer.manifest_text()) - - def test_new_writer_from_stale_cache(self): - cwriter = arv_put.ArvPutCollectionWriter(self.cache) - with self.make_test_file() as testfile: - cwriter.write_file(testfile.name, 'test') - new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache) - new_writer.write_file('/dev/null') - self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text()) - - def test_new_writer_from_empty_cache(self): - cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache) - cwriter.write_file('/dev/null') - self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text()) - - def test_writer_resumable_after_arbitrary_bytes(self): - cwriter = arv_put.ArvPutCollectionWriter(self.cache) - # These bytes are intentionally not valid UTF-8. - with self.make_test_file('\x00\x07\xe2') as testfile: - cwriter.write_file(testfile.name, 'test') - cwriter.cache_state() - new_writer = arv_put.ArvPutCollectionWriter.from_cache( - self.cache) - self.assertEqual(cwriter.manifest_text(), new_writer.manifest_text()) + def test_writer_works_with_cache(self): + with tempfile.NamedTemporaryFile() as f: + f.write('foo') + f.flush() + cwriter = arv_put.ArvPutUploadJob([f.name]) + cwriter.start(save_collection=False) + self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped) + # Don't destroy the cache, and start another upload + cwriter_new = arv_put.ArvPutUploadJob([f.name]) + cwriter_new.start(save_collection=False) + cwriter_new.destroy_cache() + self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped) def make_progress_tester(self): progression = [] @@ -421,24 +293,158 @@ class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers, return progression, record_func def test_progress_reporting(self): - for expect_count in (None, 8): - progression, reporter = self.make_progress_tester() - cwriter = arv_put.ArvPutCollectionWriter( - reporter=reporter, bytes_expected=expect_count) - with self.make_test_file() as testfile: - cwriter.write_file(testfile.name, 'test') - cwriter.finish_current_stream() - self.assertIn((4, expect_count), progression) - - def test_resume_progress(self): - cwriter = arv_put.ArvPutCollectionWriter(self.cache, bytes_expected=4) - with self.make_test_file() as testfile: - # Set up a writer with some flushed bytes. - cwriter.write_file(testfile.name, 'test') - cwriter.finish_current_stream() - cwriter.cache_state() - new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache) - self.assertEqual(new_writer.bytes_written, 4) + with tempfile.NamedTemporaryFile() as f: + f.write('foo') + f.flush() + for expect_count in (None, 8): + progression, reporter = self.make_progress_tester() + cwriter = arv_put.ArvPutUploadJob([f.name], + reporter=reporter, bytes_expected=expect_count) + cwriter.start(save_collection=False) + cwriter.destroy_cache() + self.assertIn((3, expect_count), progression) + + def test_writer_upload_directory(self): + cwriter = arv_put.ArvPutUploadJob([self.tempdir]) + cwriter.start(save_collection=False) + cwriter.destroy_cache() + self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written) + + def test_resume_large_file_upload(self): + def wrapped_write(*args, **kwargs): + data = args[1] + # Exit only on last block + if len(data) < arvados.config.KEEP_BLOCK_SIZE: + raise SystemExit("Simulated error") + return self.arvfile_write(*args, **kwargs) + + with mock.patch('arvados.arvfile.ArvadosFileWriter.write', + autospec=True) as mocked_write: + mocked_write.side_effect = wrapped_write + writer = arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1) + with self.assertRaises(SystemExit): + writer.start(save_collection=False) + # Confirm that the file was partially uploaded + self.assertGreater(writer.bytes_written, 0) + self.assertLess(writer.bytes_written, + os.path.getsize(self.large_file_name)) + # Retry the upload + writer2 = arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1) + writer2.start(save_collection=False) + self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped, + os.path.getsize(self.large_file_name)) + writer2.destroy_cache() + + def test_no_resume_when_asked(self): + def wrapped_write(*args, **kwargs): + data = args[1] + # Exit only on last block + if len(data) < arvados.config.KEEP_BLOCK_SIZE: + raise SystemExit("Simulated error") + return self.arvfile_write(*args, **kwargs) + + with mock.patch('arvados.arvfile.ArvadosFileWriter.write', + autospec=True) as mocked_write: + mocked_write.side_effect = wrapped_write + writer = arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1) + with self.assertRaises(SystemExit): + writer.start(save_collection=False) + # Confirm that the file was partially uploaded + self.assertGreater(writer.bytes_written, 0) + self.assertLess(writer.bytes_written, + os.path.getsize(self.large_file_name)) + # Retry the upload, this time without resume + writer2 = arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1, + resume=False) + writer2.start(save_collection=False) + self.assertEqual(writer2.bytes_skipped, 0) + self.assertEqual(writer2.bytes_written, + os.path.getsize(self.large_file_name)) + writer2.destroy_cache() + + def test_no_resume_when_no_cache(self): + def wrapped_write(*args, **kwargs): + data = args[1] + # Exit only on last block + if len(data) < arvados.config.KEEP_BLOCK_SIZE: + raise SystemExit("Simulated error") + return self.arvfile_write(*args, **kwargs) + + with mock.patch('arvados.arvfile.ArvadosFileWriter.write', + autospec=True) as mocked_write: + mocked_write.side_effect = wrapped_write + writer = arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1) + with self.assertRaises(SystemExit): + writer.start(save_collection=False) + # Confirm that the file was partially uploaded + self.assertGreater(writer.bytes_written, 0) + self.assertLess(writer.bytes_written, + os.path.getsize(self.large_file_name)) + # Retry the upload, this time without cache usage + writer2 = arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1, + resume=False, + use_cache=False) + writer2.start(save_collection=False) + self.assertEqual(writer2.bytes_skipped, 0) + self.assertEqual(writer2.bytes_written, + os.path.getsize(self.large_file_name)) + writer2.destroy_cache() + + + def test_dry_run_feature(self): + def wrapped_write(*args, **kwargs): + data = args[1] + # Exit only on last block + if len(data) < arvados.config.KEEP_BLOCK_SIZE: + raise SystemExit("Simulated error") + return self.arvfile_write(*args, **kwargs) + + with mock.patch('arvados.arvfile.ArvadosFileWriter.write', + autospec=True) as mocked_write: + mocked_write.side_effect = wrapped_write + writer = arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1) + with self.assertRaises(SystemExit): + writer.start(save_collection=False) + # Confirm that the file was partially uploaded + self.assertGreater(writer.bytes_written, 0) + self.assertLess(writer.bytes_written, + os.path.getsize(self.large_file_name)) + # Retry the upload using dry_run to check if there is a pending upload + writer2 = arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1, + dry_run=True) + with self.assertRaises(arv_put.ArvPutUploadIsPending): + writer2.start(save_collection=False) + # Complete the pending upload + writer3 = arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1) + writer3.start(save_collection=False) + # Confirm there's no pending upload with dry_run=True + writer4 = arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1, + dry_run=True) + with self.assertRaises(arv_put.ArvPutUploadNotPending): + writer4.start(save_collection=False) + writer4.destroy_cache() + # Test obvious cases + with self.assertRaises(arv_put.ArvPutUploadIsPending): + arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1, + dry_run=True, + resume=False, + use_cache=False) + with self.assertRaises(arv_put.ArvPutUploadIsPending): + arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1, + dry_run=True, + resume=False) class ArvadosExpectedBytesTest(ArvadosBaseTestCase): @@ -513,6 +519,15 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase): delattr(self, outbuf) super(ArvadosPutTest, self).tearDown() + def test_version_argument(self): + err = io.BytesIO() + out = io.BytesIO() + with tutil.redirected_streams(stdout=out, stderr=err): + with self.assertRaises(SystemExit): + self.call_main_with_args(['--version']) + self.assertEqual(out.getvalue(), '') + self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+") + def test_simple_file_put(self): self.call_main_on_test_file() @@ -539,9 +554,8 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase): os.chmod(cachedir, 0o700) def test_put_block_replication(self): - with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \ - mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock: - cache_mock.side_effect = ValueError + self.call_main_on_test_file() + with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock: put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3' self.call_main_on_test_file(['--replication', '1']) self.call_main_on_test_file(['--replication', '4']) @@ -580,17 +594,16 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase): ['--project-uuid', self.Z_UUID, '--stream']) def test_api_error_handling(self): - collections_mock = mock.Mock(name='arv.collections()') - coll_create_mock = collections_mock().create().execute - coll_create_mock.side_effect = arvados.errors.ApiError( + coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()') + coll_save_mock.side_effect = arvados.errors.ApiError( fake_httplib2_response(403), '{}') - arv_put.api_client = arvados.api('v1') - arv_put.api_client.collections = collections_mock - with self.assertRaises(SystemExit) as exc_test: - self.call_main_with_args(['/dev/null']) - self.assertLess(0, exc_test.exception.args[0]) - self.assertLess(0, coll_create_mock.call_count) - self.assertEqual("", self.main_stdout.getvalue()) + with mock.patch('arvados.collection.Collection.save_new', + new=coll_save_mock): + with self.assertRaises(SystemExit) as exc_test: + self.call_main_with_args(['/dev/null']) + self.assertLess(0, exc_test.exception.args[0]) + self.assertLess(0, coll_save_mock.call_count) + self.assertEqual("", self.main_stdout.getvalue()) class ArvPutIntegrationTest(run_test_server.TestCaseWithServers, @@ -731,6 +744,21 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers, self.assertEqual(1, len(collection_list)) return collection_list[0] + def test_put_collection_with_later_update(self): + tmpdir = self.make_tmpdir() + with open(os.path.join(tmpdir, 'file1'), 'w') as f: + f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box') + col = self.run_and_find_collection("", ['--no-progress', tmpdir]) + self.assertNotEqual(None, col['uuid']) + # Add a new file to the directory + with open(os.path.join(tmpdir, 'file2'), 'w') as f: + f.write('The quick brown fox jumped over the lazy dog') + updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir]) + self.assertEqual(col['uuid'], updated_col['uuid']) + # Get the manifest and check that the new file is being included + c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute() + self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n') + def test_put_collection_with_high_redundancy(self): # Write empty data: we're not testing CollectionWriter, just # making sure collections.create tells the API server what our