X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2e5ac62b550f7dd608cf133ae66ef04f801be76b..dcb4db28681b6949a56a1de579891cb375c423fe:/sdk/python/tests/test_arv_put.py diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py old mode 100644 new mode 100755 index 196264f766..09900750a1 --- a/sdk/python/tests/test_arv_put.py +++ b/sdk/python/tests/test_arv_put.py @@ -2,11 +2,13 @@ # -*- coding: utf-8 -*- import apiclient +import mock import os import pwd import re import shutil import subprocess +import multiprocessing import sys import tempfile import time @@ -18,7 +20,7 @@ from cStringIO import StringIO import arvados import arvados.commands.put as arv_put -from arvados_testutil import ArvadosBaseTestCase +from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response import run_test_server class ArvadosPutResumeCacheTest(ArvadosBaseTestCase): @@ -43,7 +45,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase): def test_cache_names_stable(self): for argset in self.CACHE_ARGSET: - self.assertEquals(self.cache_path_from_arglist(argset), + self.assertEqual(self.cache_path_from_arglist(argset), self.cache_path_from_arglist(argset), "cache name changed for {}".format(argset)) @@ -65,10 +67,10 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase): "path too exotic: {}".format(path)) def test_cache_names_ignore_argument_order(self): - self.assertEquals( + self.assertEqual( self.cache_path_from_arglist(['a', 'b', 'c']), self.cache_path_from_arglist(['c', 'a', 'b'])) - self.assertEquals( + self.assertEqual( self.cache_path_from_arglist(['-', '--filename', 'stdin']), self.cache_path_from_arglist(['--filename', 'stdin', '-'])) @@ -84,32 +86,32 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase): args = arv_put.parse_arguments(['/tmp']) args.filename = 'tmp' path2 = arv_put.ResumeCache.make_path(args) - self.assertEquals(path1, path2, + self.assertEqual(path1, path2, "cache path considered --filename for directory") - self.assertEquals( + self.assertEqual( self.cache_path_from_arglist(['-']), self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']), "cache path considered --max-manifest-depth for file") def test_cache_names_treat_negative_manifest_depths_identically(self): base_args = ['/tmp', '--max-manifest-depth'] - self.assertEquals( + self.assertEqual( self.cache_path_from_arglist(base_args + ['-1']), self.cache_path_from_arglist(base_args + ['-2'])) def test_cache_names_treat_stdin_consistently(self): - self.assertEquals( + self.assertEqual( self.cache_path_from_arglist(['-', '--filename', 'test']), self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test'])) def test_cache_names_identical_for_synonymous_names(self): - self.assertEquals( + self.assertEqual( self.cache_path_from_arglist(['.']), self.cache_path_from_arglist([os.path.realpath('.')])) testdir = self.make_tmpdir() looplink = os.path.join(testdir, 'loop') os.symlink(testdir, looplink) - self.assertEquals( + self.assertEqual( self.cache_path_from_arglist([testdir]), self.cache_path_from_arglist([looplink])) @@ -126,12 +128,49 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase): else: config['ARVADOS_API_HOST'] = orig_host + @mock.patch('arvados.keep.KeepClient.head') + def test_resume_cache_with_current_stream_locators(self, keep_client_head): + keep_client_head.side_effect = [True] + thing = {} + thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6'] + with tempfile.NamedTemporaryFile() as cachefile: + self.last_cache = arv_put.ResumeCache(cachefile.name) + self.last_cache.save(thing) + self.last_cache.close() + resume_cache = arv_put.ResumeCache(self.last_cache.filename) + self.assertNotEqual(None, resume_cache) + + @mock.patch('arvados.keep.KeepClient.head') + def test_resume_cache_with_finished_streams(self, keep_client_head): + keep_client_head.side_effect = [True] + thing = {} + thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]] + with tempfile.NamedTemporaryFile() as cachefile: + self.last_cache = arv_put.ResumeCache(cachefile.name) + self.last_cache.save(thing) + self.last_cache.close() + resume_cache = arv_put.ResumeCache(self.last_cache.filename) + self.assertNotEqual(None, resume_cache) + + @mock.patch('arvados.keep.KeepClient.head') + def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head): + keep_client_head.side_effect = Exception('Locator not found') + thing = {} + thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]] + with tempfile.NamedTemporaryFile() as cachefile: + self.last_cache = arv_put.ResumeCache(cachefile.name) + self.last_cache.save(thing) + self.last_cache.close() + resume_cache = arv_put.ResumeCache(self.last_cache.filename) + self.assertNotEqual(None, resume_cache) + self.assertRaises(None, resume_cache.check_cache()) + def test_basic_cache_storage(self): thing = ['test', 'list'] with tempfile.NamedTemporaryFile() as cachefile: self.last_cache = arv_put.ResumeCache(cachefile.name) self.last_cache.save(thing) - self.assertEquals(thing, self.last_cache.load()) + self.assertEqual(thing, self.last_cache.load()) def test_empty_cache(self): with tempfile.NamedTemporaryFile() as cachefile: @@ -145,7 +184,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase): cache.save(thing) cache.close() self.last_cache = arv_put.ResumeCache(path) - self.assertEquals(thing, self.last_cache.load()) + self.assertEqual(thing, self.last_cache.load()) def test_multiple_cache_writes(self): thing = ['short', 'list'] @@ -155,7 +194,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase): # sure the cache file gets truncated. self.last_cache.save(['long', 'long', 'list']) self.last_cache.save(thing) - self.assertEquals(thing, self.last_cache.load()) + self.assertEqual(thing, self.last_cache.load()) def test_cache_is_locked(self): with tempfile.NamedTemporaryFile() as cachefile: @@ -196,6 +235,86 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase): arv_put.ResumeCache, path) +class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers): + MAIN_SERVER = {} + KEEP_SERVER = {} + import shutil + + # def test_write_files(self): + # c = arv_put.ArvPutCollection() + # data = 'a' * 1024 * 1024 # 1 MB + # tmpdir = tempfile.mkdtemp() + # for size in [1, 10, 64, 128]: + # with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f: + # for _ in range(size): + # f.write(data) + # c.write_file(f.name, os.path.basename(f.name)) + # shutil.rmtree(tmpdir) + # self.assertEqual(True, c.manifest()) + # + # def test_write_directory(self): + # data = 'b' * 1024 * 1024 + # tmpdir = tempfile.mkdtemp() + # for size in [1, 5, 10, 70]: + # with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f: + # for _ in range(size): + # f.write(data) + # os.mkdir(os.path.join(tmpdir, 'subdir1')) + # for size in [2, 4, 6]: + # with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f: + # for _ in range(size): + # f.write(data) + # c = arv_put.ArvPutUploader([tmpdir]) + # shutil.rmtree(tmpdir) + # self.assertEqual(True, c.manifest()) + + def fake_reporter(self, written, expected): + # Use this callback as a intra-block pause to be able to simulate an interruption + print "Written %d / %d bytes" % (written, expected) + time.sleep(10) + + def bg_uploader(self, paths): + return arv_put.ArvPutUploader(paths, reporter=self.fake_reporter) + + # def test_resume_large_file_upload(self): + # import multiprocessing + # data = 'x' * 1024 * 1024 # 1 MB + # _, filename = tempfile.mkstemp() + # fileobj = open(filename, 'w') + # for _ in range(200): + # fileobj.write(data) + # fileobj.close() + # uploader = multiprocessing.Process(target=self.bg_uploader, args=([filename],)) + # uploader.start() + # time.sleep(5) + # uploader.terminate() + # time.sleep(1) + # # cache = arv_put.ArvPutCollectionCache([filename]) + # # print "Collection detected: %s" % cache.collection() + # # c = arv_put.ArvPutCollection(locator=cache.collection(), cache=cache) + # # print "UPLOADED: %d" % c.collection[os.path.basename(filename)].size() + # # self.assertLess(c.collection[os.path.basename(filename)].size(), os.path.getsize(filename)) + # os.unlink(filename) + + # def test_write_directory_twice(self): + # data = 'b' * 1024 * 1024 + # tmpdir = tempfile.mkdtemp() + # for size in [1, 5, 10, 70]: + # with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f: + # for _ in range(size): + # f.write(data) + # os.mkdir(os.path.join(tmpdir, 'subdir1')) + # for size in [2, 4, 6]: + # with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f: + # for _ in range(size): + # f.write(data) + # c = arv_put.ArvPutUploader([tmpdir]) + # d = arv_put.ArvPutUploader([tmpdir]) + # print "ESCRIBIERON: c: %d, d: %d" % (c.bytes_written(), d.bytes_written()) + # shutil.rmtree(tmpdir) + # self.assertEqual(0, d.bytes_written()) + + class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase): def setUp(self): @@ -216,12 +335,12 @@ class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers, cwriter.write_file('/dev/null') cwriter.cache_state() self.assertTrue(self.cache.load()) - self.assertEquals(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text()) + self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text()) def test_writer_works_without_cache(self): cwriter = arv_put.ArvPutCollectionWriter() cwriter.write_file('/dev/null') - self.assertEquals(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text()) + self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text()) def test_writer_resumes_from_cache(self): cwriter = arv_put.ArvPutCollectionWriter(self.cache) @@ -230,7 +349,7 @@ class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers, cwriter.cache_state() new_writer = arv_put.ArvPutCollectionWriter.from_cache( self.cache) - self.assertEquals( + self.assertEqual( ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n", new_writer.manifest_text()) @@ -240,12 +359,12 @@ class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers, cwriter.write_file(testfile.name, 'test') new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache) new_writer.write_file('/dev/null') - self.assertEquals(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text()) + self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text()) def test_new_writer_from_empty_cache(self): cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache) cwriter.write_file('/dev/null') - self.assertEquals(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text()) + self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text()) def test_writer_resumable_after_arbitrary_bytes(self): cwriter = arv_put.ArvPutCollectionWriter(self.cache) @@ -255,7 +374,7 @@ class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers, cwriter.cache_state() new_writer = arv_put.ArvPutCollectionWriter.from_cache( self.cache) - self.assertEquals(cwriter.manifest_text(), new_writer.manifest_text()) + self.assertEqual(cwriter.manifest_text(), new_writer.manifest_text()) def make_progress_tester(self): progression = [] @@ -288,16 +407,16 @@ class ArvadosExpectedBytesTest(ArvadosBaseTestCase): TEST_SIZE = os.path.getsize(__file__) def test_expected_bytes_for_file(self): - self.assertEquals(self.TEST_SIZE, + self.assertEqual(self.TEST_SIZE, arv_put.expected_bytes_for([__file__])) def test_expected_bytes_for_tree(self): tree = self.make_tmpdir() shutil.copyfile(__file__, os.path.join(tree, 'one')) shutil.copyfile(__file__, os.path.join(tree, 'two')) - self.assertEquals(self.TEST_SIZE * 2, + self.assertEqual(self.TEST_SIZE * 2, arv_put.expected_bytes_for([tree])) - self.assertEquals(self.TEST_SIZE * 3, + self.assertEqual(self.TEST_SIZE * 3, arv_put.expected_bytes_for([tree, __file__])) def test_expected_bytes_for_device(self): @@ -335,10 +454,10 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase): self.main_stderr = StringIO() return arv_put.main(args, self.main_stdout, self.main_stderr) - def call_main_on_test_file(self): + def call_main_on_test_file(self, args=[]): with self.make_test_file() as testfile: path = testfile.name - self.call_main_with_args(['--stream', '--no-progress', path]) + self.call_main_with_args(['--stream', '--no-progress'] + args + [path]) self.assertTrue( os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'], '098f6bcd4621d373cade4e832627b4f6')), @@ -381,6 +500,32 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase): arv_put.ResumeCache.CACHE_DIR = orig_cachedir os.chmod(cachedir, 0o700) + def test_put_block_replication(self): + with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \ + mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock: + cache_mock.side_effect = ValueError + put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3' + self.call_main_on_test_file(['--replication', '1']) + self.call_main_on_test_file(['--replication', '4']) + self.call_main_on_test_file(['--replication', '5']) + self.assertEqual( + [x[-1].get('copies') for x in put_mock.call_args_list], + [1, 4, 5]) + + def test_normalize(self): + testfile1 = self.make_test_file() + testfile2 = self.make_test_file() + test_paths = [testfile1.name, testfile2.name] + # Reverse-sort the paths, so normalization must change their order. + test_paths.sort(reverse=True) + self.call_main_with_args(['--stream', '--no-progress', '--normalize'] + + test_paths) + manifest = self.main_stdout.getvalue() + # Assert the second file we specified appears first in the manifest. + file_indices = [manifest.find(':' + os.path.basename(path)) + for path in test_paths] + self.assertGreater(*file_indices) + def test_error_name_without_collection(self): self.assertRaises(SystemExit, self.call_main_with_args, ['--name', 'test without Collection', @@ -396,12 +541,30 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase): self.call_main_with_args, ['--project-uuid', self.Z_UUID, '--stream']) + def test_api_error_handling(self): + collections_mock = mock.Mock(name='arv.collections()') + coll_create_mock = collections_mock().create().execute + coll_create_mock.side_effect = arvados.errors.ApiError( + fake_httplib2_response(403), '{}') + arv_put.api_client = arvados.api('v1') + arv_put.api_client.collections = collections_mock + with self.assertRaises(SystemExit) as exc_test: + self.call_main_with_args(['/dev/null']) + self.assertLess(0, exc_test.exception.args[0]) + self.assertLess(0, coll_create_mock.call_count) + self.assertEqual("", self.main_stdout.getvalue()) + + class ArvPutIntegrationTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase): def _getKeepServerConfig(): - for config_file in ['application.yml', 'application.default.yml']: - with open(os.path.join(run_test_server.SERVICES_SRC_DIR, - "api", "config", config_file)) as f: + for config_file, mandatory in [ + ['application.yml', False], ['application.default.yml', True]]: + path = os.path.join(run_test_server.SERVICES_SRC_DIR, + "api", "config", config_file) + if not mandatory and not os.path.exists(path): + continue + with open(path) as f: rails_config = yaml.load(f.read()) for config_section in ['test', 'common']: try: @@ -539,7 +702,7 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers, def test_put_collection_with_default_redundancy(self): collection = self.run_and_find_collection("") - self.assertEqual(2, collection['replication_desired']) + self.assertEqual(None, collection['replication_desired']) def test_put_collection_with_unnamed_project_link(self): link = self.run_and_find_collection(