# -*- coding: utf-8 -*-
import apiclient
+import io
import mock
import os
import pwd
import shutil
import tempfile
import time
import unittest
import yaml
+import threading
+import hashlib
+import random
from cStringIO import StringIO
import arvados
import arvados.commands.put as arv_put
+import arvados_testutil as tutil
from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
import run_test_server
[],
['/dev/null'],
['/dev/null', '--filename', 'empty'],
- ['/tmp'],
- ['/tmp', '--max-manifest-depth', '0'],
- ['/tmp', '--max-manifest-depth', '1']
+ ['/tmp']
]
def tearDown(self):
arv_put.ResumeCache, path)
-class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
- ArvadosBaseTestCase):
+class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
+ ArvadosBaseTestCase):
+
def setUp(self):
- super(ArvadosPutCollectionWriterTest, self).setUp()
+ super(ArvPutUploadJobTest, self).setUp()
run_test_server.authorize_with('active')
- with tempfile.NamedTemporaryFile(delete=False) as cachefile:
- self.cache = arv_put.ResumeCache(cachefile.name)
- self.cache_filename = cachefile.name
+        # Create the temp files used by the tests below
+ self.tempdir = tempfile.mkdtemp()
+ subdir = os.path.join(self.tempdir, 'subdir')
+ os.mkdir(subdir)
+ data = "x" * 1024 # 1 KB
+ for i in range(1, 5):
+ with open(os.path.join(self.tempdir, str(i)), 'w') as f:
+ f.write(data * i)
+ with open(os.path.join(subdir, 'otherfile'), 'w') as f:
+ f.write(data * 5)
+ # Large temp file for resume test
+        # Large temp file for the resume tests
+        fd, self.large_file_name = tempfile.mkstemp()
+        os.close(fd)
+        # Write just a little more than one Keep block, so an interrupted
+        # upload leaves at least one complete block behind
+        with open(self.large_file_name, 'w') as fileobj:
+            for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
+                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
+                fileobj.write(data)
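+        # Keep a reference to the real ArvadosFileWriter.write so the
+        # mocked versions used below can call through to it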
+ self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
def tearDown(self):
- super(ArvadosPutCollectionWriterTest, self).tearDown()
- if os.path.exists(self.cache_filename):
- self.cache.destroy()
- self.cache.close()
-
- def test_writer_caches(self):
- cwriter = arv_put.ArvPutCollectionWriter(self.cache)
- cwriter.write_file('/dev/null')
- cwriter.cache_state()
- self.assertTrue(self.cache.load())
- self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
+ super(ArvPutUploadJobTest, self).tearDown()
+ shutil.rmtree(self.tempdir)
+ os.unlink(self.large_file_name)
def test_writer_works_without_cache(self):
- cwriter = arv_put.ArvPutCollectionWriter()
- cwriter.write_file('/dev/null')
+ cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
+ cwriter.start(save_collection=False)
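+        # Expected manifest: stream '.', the MD5-of-the-empty-string block
+        # locator, then a 0:0:null file token (offset:length:name)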
self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
- def test_writer_resumes_from_cache(self):
- cwriter = arv_put.ArvPutCollectionWriter(self.cache)
- with self.make_test_file() as testfile:
- cwriter.write_file(testfile.name, 'test')
- cwriter.cache_state()
- new_writer = arv_put.ArvPutCollectionWriter.from_cache(
- self.cache)
- self.assertEqual(
- ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
- new_writer.manifest_text())
-
- def test_new_writer_from_stale_cache(self):
- cwriter = arv_put.ArvPutCollectionWriter(self.cache)
- with self.make_test_file() as testfile:
- cwriter.write_file(testfile.name, 'test')
- new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
- new_writer.write_file('/dev/null')
- self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text())
-
- def test_new_writer_from_empty_cache(self):
- cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
- cwriter.write_file('/dev/null')
- self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
-
- def test_writer_resumable_after_arbitrary_bytes(self):
- cwriter = arv_put.ArvPutCollectionWriter(self.cache)
- # These bytes are intentionally not valid UTF-8.
- with self.make_test_file('\x00\x07\xe2') as testfile:
- cwriter.write_file(testfile.name, 'test')
- cwriter.cache_state()
- new_writer = arv_put.ArvPutCollectionWriter.from_cache(
- self.cache)
- self.assertEqual(cwriter.manifest_text(), new_writer.manifest_text())
+ def test_writer_works_with_cache(self):
+ with tempfile.NamedTemporaryFile() as f:
+ f.write('foo')
+ f.flush()
+ cwriter = arv_put.ArvPutUploadJob([f.name])
+ cwriter.start(save_collection=False)
+ self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
+ # Don't destroy the cache, and start another upload
+ cwriter_new = arv_put.ArvPutUploadJob([f.name])
+ cwriter_new.start(save_collection=False)
+ cwriter_new.destroy_cache()
+ self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)
    def make_progress_tester(self):
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func
def test_progress_reporting(self):
- for expect_count in (None, 8):
- progression, reporter = self.make_progress_tester()
- cwriter = arv_put.ArvPutCollectionWriter(
- reporter=reporter, bytes_expected=expect_count)
- with self.make_test_file() as testfile:
- cwriter.write_file(testfile.name, 'test')
- cwriter.finish_current_stream()
- self.assertIn((4, expect_count), progression)
-
- def test_resume_progress(self):
- cwriter = arv_put.ArvPutCollectionWriter(self.cache, bytes_expected=4)
- with self.make_test_file() as testfile:
- # Set up a writer with some flushed bytes.
- cwriter.write_file(testfile.name, 'test')
- cwriter.finish_current_stream()
- cwriter.cache_state()
- new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
- self.assertEqual(new_writer.bytes_written, 4)
+ with tempfile.NamedTemporaryFile() as f:
+ f.write('foo')
+ f.flush()
+ for expect_count in (None, 8):
+ progression, reporter = self.make_progress_tester()
+ cwriter = arv_put.ArvPutUploadJob([f.name],
+ reporter=reporter, bytes_expected=expect_count)
+ cwriter.start(save_collection=False)
+ cwriter.destroy_cache()
+ self.assertIn((3, expect_count), progression)
+
+ def test_writer_upload_directory(self):
+ cwriter = arv_put.ArvPutUploadJob([self.tempdir])
+ cwriter.start(save_collection=False)
+ cwriter.destroy_cache()
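+        # setUp() created files sized 1-4 KiB plus a 5 KiB 'otherfile'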
+ self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
+
+ def test_resume_large_file_upload(self):
+ def wrapped_write(*args, **kwargs):
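+            # With autospec=True the mock passes the ArvadosFileWriter
+            # instance as args[0], so args[1] is the data chunk being
+            # written; only the final, smaller-than-a-block write fails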
+ data = args[1]
+ # Exit only on last block
+ if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ raise SystemExit("Simulated error")
+ return self.arvfile_write(*args, **kwargs)
+
+ with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+ autospec=True) as mocked_write:
+ mocked_write.side_effect = wrapped_write
+ writer = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ with self.assertRaises(SystemExit):
+ writer.start(save_collection=False)
+ # Confirm that the file was partially uploaded
+ self.assertGreater(writer.bytes_written, 0)
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
+ # Retry the upload
+ writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ writer2.start(save_collection=False)
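+        # The resumed upload skips already-committed data, so the two
+        # attempts together should account for the whole file exactly once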
+ self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
+ os.path.getsize(self.large_file_name))
+ writer2.destroy_cache()
+
+ def test_no_resume_when_asked(self):
+ def wrapped_write(*args, **kwargs):
+ data = args[1]
+ # Exit only on last block
+ if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ raise SystemExit("Simulated error")
+ return self.arvfile_write(*args, **kwargs)
+
+ with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+ autospec=True) as mocked_write:
+ mocked_write.side_effect = wrapped_write
+ writer = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ with self.assertRaises(SystemExit):
+ writer.start(save_collection=False)
+ # Confirm that the file was partially uploaded
+ self.assertGreater(writer.bytes_written, 0)
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
+ # Retry the upload, this time without resume
+ writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ resume=False)
+ writer2.start(save_collection=False)
+ self.assertEqual(writer2.bytes_skipped, 0)
+ self.assertEqual(writer2.bytes_written,
+ os.path.getsize(self.large_file_name))
+ writer2.destroy_cache()
+
+ def test_no_resume_when_no_cache(self):
+ def wrapped_write(*args, **kwargs):
+ data = args[1]
+ # Exit only on last block
+ if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ raise SystemExit("Simulated error")
+ return self.arvfile_write(*args, **kwargs)
+
+ with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+ autospec=True) as mocked_write:
+ mocked_write.side_effect = wrapped_write
+ writer = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ with self.assertRaises(SystemExit):
+ writer.start(save_collection=False)
+ # Confirm that the file was partially uploaded
+ self.assertGreater(writer.bytes_written, 0)
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
+ # Retry the upload, this time without cache usage
+ writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ resume=False,
+ use_cache=False)
+ writer2.start(save_collection=False)
+ self.assertEqual(writer2.bytes_skipped, 0)
+ self.assertEqual(writer2.bytes_written,
+ os.path.getsize(self.large_file_name))
+ writer2.destroy_cache()
+
+ def test_dry_run_feature(self):
+ def wrapped_write(*args, **kwargs):
+ data = args[1]
+ # Exit only on last block
+ if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ raise SystemExit("Simulated error")
+ return self.arvfile_write(*args, **kwargs)
+
+ with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+ autospec=True) as mocked_write:
+ mocked_write.side_effect = wrapped_write
+ writer = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ with self.assertRaises(SystemExit):
+ writer.start(save_collection=False)
+ # Confirm that the file was partially uploaded
+ self.assertGreater(writer.bytes_written, 0)
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
+ # Retry the upload using dry_run to check if there is a pending upload
+ writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ dry_run=True)
+ with self.assertRaises(arv_put.ArvPutUploadIsPending):
+ writer2.start(save_collection=False)
+ # Complete the pending upload
+ writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ writer3.start(save_collection=False)
+ # Confirm there's no pending upload with dry_run=True
+ writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ dry_run=True)
+ with self.assertRaises(arv_put.ArvPutUploadNotPending):
+ writer4.start(save_collection=False)
+ writer4.destroy_cache()
+ # Test obvious cases
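+        # Without a cache (or with resume disabled) nothing is known to be
+        # uploaded yet, so dry_run should raise at construction time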
+ with self.assertRaises(arv_put.ArvPutUploadIsPending):
+ arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ dry_run=True,
+ resume=False,
+ use_cache=False)
+ with self.assertRaises(arv_put.ArvPutUploadIsPending):
+ arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ dry_run=True,
+ resume=False)
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
delattr(self, outbuf)
super(ArvadosPutTest, self).tearDown()
+ def test_version_argument(self):
+ err = io.BytesIO()
+ out = io.BytesIO()
+ with tutil.redirected_streams(stdout=out, stderr=err):
+ with self.assertRaises(SystemExit):
+ self.call_main_with_args(['--version'])
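+        # Python 2's argparse writes the --version message to stderr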
+ self.assertEqual(out.getvalue(), '')
+        self.assertRegexpMatches(err.getvalue(), r"[0-9]+\.[0-9]+\.[0-9]+")
+
def test_simple_file_put(self):
self.call_main_on_test_file()
os.chmod(cachedir, 0o700)
def test_put_block_replication(self):
- with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \
- mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock:
- cache_mock.side_effect = ValueError
+ self.call_main_on_test_file()
+ with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
self.call_main_on_test_file(['--replication', '1'])
self.call_main_on_test_file(['--replication', '4'])
['--project-uuid', self.Z_UUID, '--stream'])
def test_api_error_handling(self):
- collections_mock = mock.Mock(name='arv.collections()')
- coll_create_mock = collections_mock().create().execute
- coll_create_mock.side_effect = arvados.errors.ApiError(
+ coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
+ coll_save_mock.side_effect = arvados.errors.ApiError(
fake_httplib2_response(403), '{}')
- arv_put.api_client = arvados.api('v1')
- arv_put.api_client.collections = collections_mock
- with self.assertRaises(SystemExit) as exc_test:
- self.call_main_with_args(['/dev/null'])
- self.assertLess(0, exc_test.exception.args[0])
- self.assertLess(0, coll_create_mock.call_count)
- self.assertEqual("", self.main_stdout.getvalue())
+ with mock.patch('arvados.collection.Collection.save_new',
+ new=coll_save_mock):
+ with self.assertRaises(SystemExit) as exc_test:
+ self.call_main_with_args(['/dev/null'])
+ self.assertLess(0, exc_test.exception.args[0])
+ self.assertLess(0, coll_save_mock.call_count)
+ self.assertEqual("", self.main_stdout.getvalue())
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
self.assertEqual(1, len(collection_list))
return collection_list[0]
+ def test_put_collection_with_later_update(self):
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ col = self.run_and_find_collection("", ['--no-progress', tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ # Add a new file to the directory
+ with open(os.path.join(tmpdir, 'file2'), 'w') as f:
+ f.write('The quick brown fox jumped over the lazy dog')
+ updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
+ self.assertEqual(col['uuid'], updated_col['uuid'])
+        # Fetch the manifest and check that the new file is included
+ c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
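+        # 'file2' is 44 bytes long, so its token should read ...:44:file2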
+ self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')
+
def test_put_collection_with_high_redundancy(self):
# Write empty data: we're not testing CollectionWriter, just
# making sure collections.create tells the API server what our