# -*- coding: utf-8 -*-
import apiclient
+import io
import mock
import os
import pwd
import arvados
import arvados.commands.put as arv_put
+import arvados_testutil as tutil
from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
import run_test_server
# NOTE(review): this file is a mangled unified-diff fragment — '+'/'-' are
# patch markers and original indentation was stripped. Lines below are kept
# byte-for-byte; only review comments are added.
# setUp: authorize with the 'active' test token, then (per this patch) build
# fixtures shared by the upload tests: a tempdir holding files of 1..4 KB
# plus a 5 KB file in a subdir, and one file slightly larger than a single
# Keep block for the resume test.
def setUp(self):
super(ArvPutUploadJobTest, self).setUp()
run_test_server.authorize_with('active')
- self.exit_lock = threading.Lock()
- self.save_manifest_lock = threading.Lock()
+ # Temp files creation
+ self.tempdir = tempfile.mkdtemp()
+ subdir = os.path.join(self.tempdir, 'subdir')
+ os.mkdir(subdir)
+ data = "x" * 1024 # 1 KB
+ for i in range(1, 5):
+ with open(os.path.join(self.tempdir, str(i)), 'w') as f:
+ f.write(data * i)
+ with open(os.path.join(subdir, 'otherfile'), 'w') as f:
+ f.write(data * 5)
+ # Large temp file for resume test
+ _, self.large_file_name = tempfile.mkstemp()
+ fileobj = open(self.large_file_name, 'w')
+ # Make sure to write just a little more than one block
+ # NOTE(review): relies on integer division (Python 2 semantics) — TODO confirm
+ for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
+ data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
+ fileobj.write(data)
+ fileobj.close()
+ # Keep a reference to the real ArvadosFileWriter.write so the resume
+ # test's mock side_effect can delegate to it.
+ self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
# tearDown: remove the fixtures created in setUp (the tempdir tree and the
# large temp file) after deferring to the superclass.
def tearDown(self):
super(ArvPutUploadJobTest, self).tearDown()
+ shutil.rmtree(self.tempdir)
+ os.unlink(self.large_file_name)
# NOTE(review): incomplete hunk — the patch context between the 'def' line
# and the 'f.name' reference was omitted, so 'f' is defined off-screen.
def test_writer_works_without_cache(self):
cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
# Don't destroy the cache, and start another upload
cwriter_new = arv_put.ArvPutUploadJob([f.name])
cwriter_new.start()
- self.assertEqual(0, cwriter_new.bytes_written)
cwriter_new.destroy_cache()
+ self.assertEqual(0, cwriter_new.bytes_written)
+ # NOTE(review): patch moves the assertion after destroy_cache() —
+ # presumably so the cache is cleaned up even if the assertion fails; confirm.
# NOTE(review): incomplete hunk — the body of make_progress_tester and the
# code producing 'expect_count' are outside this fragment; do not edit blind.
def make_progress_tester(self):
progression = []
self.assertIn((3, expect_count), progression)
# Upload the whole fixture directory and check the byte count: files 1..4
# are 1..4 KB and subdir/otherfile is 5 KB, hence 1024*(1+2+3+4+5).
def test_writer_upload_directory(self):
- tempdir = tempfile.mkdtemp()
- subdir = os.path.join(tempdir, 'subdir')
- os.mkdir(subdir)
- data = "x" * 1024 # 1 KB
- for i in range(1, 5):
- with open(os.path.join(tempdir, str(i)), 'w') as f:
- f.write(data * i)
- with open(os.path.join(subdir, 'otherfile'), 'w') as f:
- f.write(data * 5)
- cwriter = arv_put.ArvPutUploadJob([tempdir])
+ # Fixture creation/removal moved to setUp()/tearDown() by this patch;
+ # reuse the shared self.tempdir instead of a per-test tempdir.
+ cwriter = arv_put.ArvPutUploadJob([self.tempdir])
cwriter.start()
cwriter.destroy_cache()
- shutil.rmtree(tempdir)
self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
# Simulate an interrupted upload (SystemExit raised on the final, short
# block), then retry with a fresh uploader and check the two runs together
# wrote exactly the file size — i.e. the resume cache worked.
# NOTE(review): the '-' lines drop the old lock/reporter synchronization
# scheme in favor of mock.patch-ing ArvadosFileWriter.write; the removed
# code also contained the `assertRaises(SystemExit, writer.start())` bug
# (called start() eagerly), which this patch deletes wholesale.
def test_resume_large_file_upload(self):
- # Proxying ArvadosFile.writeto() method to be able to synchronize it
- # with partial manifest saves
- orig_func = getattr(arvados.arvfile.ArvadosFile, 'writeto')
- def wrapped_func(*args, **kwargs):
- data = args[2]
+ def wrapped_write(*args, **kwargs):
+ data = args[1]
+ # Exit only on last block
if len(data) < arvados.config.KEEP_BLOCK_SIZE:
- # Lock on the last block write call, waiting for the
- # manifest to be saved
- self.exit_lock.acquire()
- raise SystemExit('Test exception')
- ret = orig_func(*args, **kwargs)
- self.save_manifest_lock.release()
- return ret
- setattr(arvados.arvfile.ArvadosFile, 'writeto', wrapped_func)
- # Take advantage of the reporter feature to sync the partial
- # manifest writing with the simulated upload error.
- def fake_reporter(written, expected):
- # Wait until there's something to save
- self.save_manifest_lock.acquire()
- # Once the partial manifest is saved, allow exiting
- self.exit_lock.release()
- # Create random data to be uploaded
- md5_original = hashlib.md5()
- _, filename = tempfile.mkstemp()
- fileobj = open(filename, 'w')
- # Make sure to write just a little more than one block
- for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
- data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
- md5_original.update(data)
- fileobj.write(data)
- fileobj.close()
- self.exit_lock.acquire()
- self.save_manifest_lock.acquire()
- writer = arv_put.ArvPutUploadJob([filename],
- reporter=fake_reporter,
- update_time=0.1)
- # First upload: partially completed with simulated error
- try:
- self.assertRaises(SystemExit, writer.start())
- except SystemExit:
- # Avoid getting a ResumeCacheConflict on the 2nd run
- writer._cache_file.close()
- self.assertLess(writer.bytes_written, os.path.getsize(filename))
-
- # Restore the ArvadosFile.writeto() method to before retrying
- setattr(arvados.arvfile.ArvadosFile, 'writeto', orig_func)
- writer_new = arv_put.ArvPutUploadJob([filename])
- writer_new.start()
- writer_new.destroy_cache()
- self.assertEqual(os.path.getsize(filename),
- writer.bytes_written + writer_new.bytes_written)
- # Read the uploaded file to compare its md5 hash
- md5_uploaded = hashlib.md5()
- c = arvados.collection.Collection(writer_new.manifest_text())
- with c.open(os.path.basename(filename), 'r') as f:
- new_data = f.read()
- md5_uploaded.update(new_data)
- self.assertEqual(md5_original.hexdigest(), md5_uploaded.hexdigest())
- # Cleaning up
- os.unlink(filename)
+ raise SystemExit("Simulated error")
+ # Delegate to the real write kept by setUp()
+ return self.arvfile_write(*args, **kwargs)
+
+ # NOTE(review): with autospec=True the side_effect receives the bound
+ # instance as args[0], so args[1] above is the data buffer — confirm.
+ with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+ autospec=True) as mocked_write:
+ mocked_write.side_effect = wrapped_write
+ writer = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ with self.assertRaises(SystemExit):
+ writer.start()
+ # Partial upload: strictly fewer bytes than the whole file
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
+ # Retry the upload
+ writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ writer2.start()
+ self.assertEqual(writer.bytes_written + writer2.bytes_written,
+ os.path.getsize(self.large_file_name))
+ writer2.destroy_cache()
# NOTE(review): garbled fragment — the two lines after the class header
# belong to a *different* class's tearDown (they reference ArvPutTest and an
# off-screen 'outbuf'); the intervening context was lost from this hunk.
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
delattr(self, outbuf)
super(ArvPutTest, self).tearDown()
+ # --version must exit (argparse raises SystemExit) and print the version
+ # string to stderr, leaving stdout empty.
+ # NOTE(review): presumably Python 2 argparse, where 'version' writes to
+ # stderr — on Python 3 it goes to stdout; confirm target interpreter.
+ # NOTE(review): the regex below should be a raw string (r"...") — the
+ # unescaped '\.' works by accident; left unchanged here.
+ def test_version_argument(self):
+ err = io.BytesIO()
+ out = io.BytesIO()
+ with tutil.redirected_streams(stdout=out, stderr=err):
+ with self.assertRaises(SystemExit):
+ self.call_main_with_args(['--version'])
+ self.assertEqual(out.getvalue(), '')
+ self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
+
# NOTE(review): incomplete hunk — 'cachedir' is defined off-screen; the
# chmod line is trailing context from a different (cache-permission) test.
def test_simple_file_put(self):
self.call_main_on_test_file()
os.chmod(cachedir, 0o700)
# Run arv-put with --replication 1 and 4 against a mocked local Keep store.
def test_put_block_replication(self):
- with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \
- mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock:
- cache_mock.side_effect = ValueError
+ # Patch drops the ResumeCache.load mock; an initial unmocked run
+ # presumably primes the cache instead — confirm against caller setup.
+ self.call_main_on_test_file()
+ with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
self.call_main_on_test_file(['--replication', '1'])
self.call_main_on_test_file(['--replication', '4'])
# NOTE(review): incomplete hunk — the enclosing 'def test_...' line is
# off-screen. The patch replaces a bare monkey-patch of Collection.save_new
# (which leaked the mock into later tests, since it was never restored)
# with a mock.patch context manager: save_new raises ApiError(403), the
# command must exit non-zero without writing to stdout.
coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
coll_save_mock.side_effect = arvados.errors.ApiError(
fake_httplib2_response(403), '{}')
- arvados.collection.Collection.save_new = coll_save_mock
- with self.assertRaises(SystemExit) as exc_test:
- self.call_main_with_args(['/dev/null'])
- self.assertLess(0, exc_test.exception.args[0])
- self.assertLess(0, coll_save_mock.call_count)
- self.assertEqual("", self.main_stdout.getvalue())
+ with mock.patch('arvados.collection.Collection.save_new',
+ new=coll_save_mock):
+ with self.assertRaises(SystemExit) as exc_test:
+ self.call_main_with_args(['/dev/null'])
+ self.assertLess(0, exc_test.exception.args[0])
+ self.assertLess(0, coll_save_mock.call_count)
+ self.assertEqual("", self.main_stdout.getvalue())
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,