# -*- coding: utf-8 -*-
import apiclient
+import mock
import os
import pwd
import re
import shutil
import subprocess
+import multiprocessing
import sys
import tempfile
import time
import arvados
import arvados.commands.put as arv_put
-from arvados_testutil import ArvadosBaseTestCase
+from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
import run_test_server
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
else:
config['ARVADOS_API_HOST'] = orig_host
    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        """A saved cache holding _current_stream_locators can be reloaded.

        KeepClient.head is mocked to report the first locator as present,
        so reopening the cache file succeeds.
        """
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        # NOTE(review): only the ResumeCache construction appears to belong
        # inside the `with`; the diff's flattened indentation makes the exact
        # nesting ambiguous -- confirm against the original file.
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
+
    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        """A saved cache holding _finished_streams can be reloaded.

        Same shape as the _current_stream_locators case, but the state
        records a completed stream ('.') with its block locators.
        """
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        # NOTE(review): nesting below reconstructed from a flattened diff;
        # only the constructor is assumed to be inside the `with`.
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
+
+ @mock.patch('arvados.keep.KeepClient.head')
+ def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
+ keep_client_head.side_effect = Exception('Locator not found')
+ thing = {}
+ thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
+ with tempfile.NamedTemporaryFile() as cachefile:
+ self.last_cache = arv_put.ResumeCache(cachefile.name)
+ self.last_cache.save(thing)
+ self.last_cache.close()
+ resume_cache = arv_put.ResumeCache(self.last_cache.filename)
+ self.assertNotEqual(None, resume_cache)
+ self.assertRaises(None, resume_cache.check_cache())
+
def test_basic_cache_storage(self):
thing = ['test', 'list']
with tempfile.NamedTemporaryFile() as cachefile:
arv_put.ResumeCache, path)
class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers):
    """Integration tests for collection upload against the test servers.

    NOTE(review): most cases below are commented-out work-in-progress
    resume/upload experiments.  They are kept verbatim rather than deleted;
    confirm with the author whether they should be revived or removed.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    # Bug fix: removed a redundant `import shutil` from the class body; the
    # module is already imported at the top of the file, and a class-level
    # import statement would also have bound `shutil` as a class attribute.

    # def test_write_files(self):
    #     c = arv_put.ArvPutCollection()
    #     data = 'a' * 1024 * 1024 # 1 MB
    #     tmpdir = tempfile.mkdtemp()
    #     for size in [1, 10, 64, 128]:
    #         with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
    #             for _ in range(size):
    #                 f.write(data)
    #             c.write_file(f.name, os.path.basename(f.name))
    #     shutil.rmtree(tmpdir)
    #     self.assertEqual(True, c.manifest())
    #
    # def test_write_directory(self):
    #     data = 'b' * 1024 * 1024
    #     tmpdir = tempfile.mkdtemp()
    #     for size in [1, 5, 10, 70]:
    #         with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
    #             for _ in range(size):
    #                 f.write(data)
    #     os.mkdir(os.path.join(tmpdir, 'subdir1'))
    #     for size in [2, 4, 6]:
    #         with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
    #             for _ in range(size):
    #                 f.write(data)
    #     c = arv_put.ArvPutUploader([tmpdir])
    #     shutil.rmtree(tmpdir)
    #     self.assertEqual(True, c.manifest())

    def fake_reporter(self, written, expected):
        # Progress callback: the sleep acts as an intra-block pause so an
        # interruption can be simulated partway through an upload.
        print("Written %d / %d bytes" % (written, expected))
        time.sleep(10)

    def bg_uploader(self, paths):
        # Target for a background multiprocessing.Process (see the
        # commented-out test_resume_large_file_upload below).
        return arv_put.ArvPutUploader(paths, reporter=self.fake_reporter)

    # def test_resume_large_file_upload(self):
    #     import multiprocessing
    #     data = 'x' * 1024 * 1024 # 1 MB
    #     _, filename = tempfile.mkstemp()
    #     fileobj = open(filename, 'w')
    #     for _ in range(200):
    #         fileobj.write(data)
    #     fileobj.close()
    #     uploader = multiprocessing.Process(target=self.bg_uploader, args=([filename],))
    #     uploader.start()
    #     time.sleep(5)
    #     uploader.terminate()
    #     time.sleep(1)
    #     # cache = arv_put.ArvPutCollectionCache([filename])
    #     # print "Collection detected: %s" % cache.collection()
    #     # c = arv_put.ArvPutCollection(locator=cache.collection(), cache=cache)
    #     # print "UPLOADED: %d" % c.collection[os.path.basename(filename)].size()
    #     # self.assertLess(c.collection[os.path.basename(filename)].size(), os.path.getsize(filename))
    #     os.unlink(filename)

    # def test_write_directory_twice(self):
    #     data = 'b' * 1024 * 1024
    #     tmpdir = tempfile.mkdtemp()
    #     for size in [1, 5, 10, 70]:
    #         with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
    #             for _ in range(size):
    #                 f.write(data)
    #     os.mkdir(os.path.join(tmpdir, 'subdir1'))
    #     for size in [2, 4, 6]:
    #         with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
    #             for _ in range(size):
    #                 f.write(data)
    #     c = arv_put.ArvPutUploader([tmpdir])
    #     d = arv_put.ArvPutUploader([tmpdir])
    #     # (translated from Spanish "ESCRIBIERON")
    #     print "Bytes written: c: %d, d: %d" % (c.bytes_written(), d.bytes_written())
    #     shutil.rmtree(tmpdir)
    #     self.assertEqual(0, d.bytes_written())
class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
ArvadosBaseTestCase):
def setUp(self):
self.main_stderr = StringIO()
return arv_put.main(args, self.main_stdout, self.main_stderr)
- def call_main_on_test_file(self):
+ def call_main_on_test_file(self, args=[]):
with self.make_test_file() as testfile:
path = testfile.name
- self.call_main_with_args(['--stream', '--no-progress', path])
+ self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
self.assertTrue(
os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
'098f6bcd4621d373cade4e832627b4f6')),
arv_put.ResumeCache.CACHE_DIR = orig_cachedir
os.chmod(cachedir, 0o700)
    def test_put_block_replication(self):
        """--replication N is forwarded to Keep as copies=N per block put.

        ResumeCache.load is made to raise ValueError so each run starts
        fresh instead of resuming; local_store_put is mocked to return a
        fixed locator and to record its call arguments.
        """
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \
             mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock:
            cache_mock.side_effect = ValueError
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            # x[-1] is each call's kwargs dict; 'copies' must track the
            # --replication value of the corresponding run, in order.
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])
+
    def test_normalize(self):
        """--normalize reorders manifest entries into sorted order."""
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest:
        # the index of the first (reverse-sorted) path must be greater than
        # the index of the second.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)
+
def test_error_name_without_collection(self):
self.assertRaises(SystemExit, self.call_main_with_args,
['--name', 'test without Collection',
self.call_main_with_args,
['--project-uuid', self.Z_UUID, '--stream'])
    def test_api_error_handling(self):
        """An API 403 on collection create exits non-zero with no stdout.

        The collections() API surface is replaced by a Mock whose
        create().execute raises ApiError, so arv-put must convert the
        failure into a SystemExit instead of crashing or printing a
        partial manifest.
        """
        collections_mock = mock.Mock(name='arv.collections()')
        coll_create_mock = collections_mock().create().execute
        coll_create_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), '{}')
        arv_put.api_client = arvados.api('v1')
        arv_put.api_client.collections = collections_mock
        with self.assertRaises(SystemExit) as exc_test:
            self.call_main_with_args(['/dev/null'])
        # Non-zero exit status, the create call was actually attempted,
        # and nothing was written to stdout.
        self.assertLess(0, exc_test.exception.args[0])
        self.assertLess(0, coll_create_mock.call_count)
        self.assertEqual("", self.main_stdout.getvalue())
+
+
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
ArvadosBaseTestCase):
def _getKeepServerConfig():
for config_file, mandatory in [
- ['application.yml', True], ['application.default.yml', False]]:
+ ['application.yml', False], ['application.default.yml', True]]:
path = os.path.join(run_test_server.SERVICES_SRC_DIR,
"api", "config", config_file)
if not mandatory and not os.path.exists(path):