10315: Brought back the new arv-put tests from 9701 branch.
author Lucas Di Pentima <lucas@curoverse.com>
Tue, 25 Oct 2016 23:04:23 +0000 (20:04 -0300)
committer Lucas Di Pentima <lucas@curoverse.com>
Tue, 25 Oct 2016 23:04:23 +0000 (20:04 -0300)
sdk/python/tests/test_arv_put.py

index e64d91474170ce688780c3ab94ea3ae6bb69bbfb..7a0120c02814d00b27e81dd41fbb50e51ef2855c 100644 (file)
@@ -13,11 +13,15 @@ import tempfile
 import time
 import unittest
 import yaml
+import threading
+import hashlib
+import random
 
 from cStringIO import StringIO
 
 import arvados
 import arvados.commands.put as arv_put
+import arvados_testutil as tutil
 
 from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
 import run_test_server
@@ -234,66 +238,53 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
                           arv_put.ResumeCache, path)
 
 
-class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
-                                     ArvadosBaseTestCase):
+class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
+                          ArvadosBaseTestCase):
     def setUp(self):
-        super(ArvadosPutCollectionWriterTest, self).setUp()
+        super(ArvPutUploadJobTest, self).setUp()
         run_test_server.authorize_with('active')
-        with tempfile.NamedTemporaryFile(delete=False) as cachefile:
-            self.cache = arv_put.ResumeCache(cachefile.name)
-            self.cache_filename = cachefile.name
+        # Temp files creation
+        self.tempdir = tempfile.mkdtemp()
+        subdir = os.path.join(self.tempdir, 'subdir')
+        os.mkdir(subdir)
+        data = "x" * 1024 # 1 KB
+        for i in range(1, 5):
+            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
+                f.write(data * i)
+        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
+            f.write(data * 5)
+        # Large temp file for resume test
+        _, self.large_file_name = tempfile.mkstemp()
+        fileobj = open(self.large_file_name, 'w')
+        # Make sure to write just a little more than one block
+        for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
+            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
+            fileobj.write(data)
+        fileobj.close()
+        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
 
     def tearDown(self):
-        super(ArvadosPutCollectionWriterTest, self).tearDown()
-        if os.path.exists(self.cache_filename):
-            self.cache.destroy()
-        self.cache.close()
-
-    def test_writer_caches(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
-        cwriter.write_file('/dev/null')
-        cwriter.cache_state()
-        self.assertTrue(self.cache.load())
-        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
+        super(ArvPutUploadJobTest, self).tearDown()
+        shutil.rmtree(self.tempdir)
+        os.unlink(self.large_file_name)
 
     def test_writer_works_without_cache(self):
-        cwriter = arv_put.ArvPutCollectionWriter()
-        cwriter.write_file('/dev/null')
-        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
-
-    def test_writer_resumes_from_cache(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
-        with self.make_test_file() as testfile:
-            cwriter.write_file(testfile.name, 'test')
-            cwriter.cache_state()
-            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
-                self.cache)
-            self.assertEqual(
-                ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
-                new_writer.manifest_text())
-
-    def test_new_writer_from_stale_cache(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
-        with self.make_test_file() as testfile:
-            cwriter.write_file(testfile.name, 'test')
-        new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
-        new_writer.write_file('/dev/null')
-        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text())
-
-    def test_new_writer_from_empty_cache(self):
-        cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
-        cwriter.write_file('/dev/null')
+        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
+        cwriter.start()
         self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
 
-    def test_writer_resumable_after_arbitrary_bytes(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
-        # These bytes are intentionally not valid UTF-8.
-        with self.make_test_file('\x00\x07\xe2') as testfile:
-            cwriter.write_file(testfile.name, 'test')
-            cwriter.cache_state()
-            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
-                self.cache)
-        self.assertEqual(cwriter.manifest_text(), new_writer.manifest_text())
+    def test_writer_works_with_cache(self):
+        with tempfile.NamedTemporaryFile() as f:
+            f.write('foo')
+            f.flush()
+            cwriter = arv_put.ArvPutUploadJob([f.name])
+            cwriter.start()
+            self.assertEqual(3, cwriter.bytes_written)
+            # Don't destroy the cache, and start another upload
+            cwriter_new = arv_put.ArvPutUploadJob([f.name])
+            cwriter_new.start()
+            cwriter_new.destroy_cache()
+            self.assertEqual(0, cwriter_new.bytes_written)
 
     def make_progress_tester(self):
         progression = []
@@ -302,24 +293,47 @@ class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
         return progression, record_func
 
     def test_progress_reporting(self):
-        for expect_count in (None, 8):
-            progression, reporter = self.make_progress_tester()
-            cwriter = arv_put.ArvPutCollectionWriter(
-                reporter=reporter, bytes_expected=expect_count)
-            with self.make_test_file() as testfile:
-                cwriter.write_file(testfile.name, 'test')
-            cwriter.finish_current_stream()
-            self.assertIn((4, expect_count), progression)
-
-    def test_resume_progress(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache, bytes_expected=4)
-        with self.make_test_file() as testfile:
-            # Set up a writer with some flushed bytes.
-            cwriter.write_file(testfile.name, 'test')
-            cwriter.finish_current_stream()
-            cwriter.cache_state()
-            new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
-            self.assertEqual(new_writer.bytes_written, 4)
+        with tempfile.NamedTemporaryFile() as f:
+            f.write('foo')
+            f.flush()
+            for expect_count in (None, 8):
+                progression, reporter = self.make_progress_tester()
+                cwriter = arv_put.ArvPutUploadJob([f.name],
+                    reporter=reporter, bytes_expected=expect_count)
+                cwriter.start()
+                cwriter.destroy_cache()
+                self.assertIn((3, expect_count), progression)
+
+    def test_writer_upload_directory(self):
+        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
+        cwriter.start()
+        cwriter.destroy_cache()
+        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
+
+    def test_resume_large_file_upload(self):
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            with self.assertRaises(SystemExit):
+                writer.start()
+                self.assertLess(writer.bytes_written,
+                                os.path.getsize(self.large_file_name))
+        # Retry the upload
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1)
+        writer2.start()
+        self.assertEqual(writer.bytes_written + writer2.bytes_written,
+                         os.path.getsize(self.large_file_name))
+        writer2.destroy_cache()
 
 
 class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
@@ -420,9 +434,8 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
             os.chmod(cachedir, 0o700)
 
     def test_put_block_replication(self):
-        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \
-             mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock:
-            cache_mock.side_effect = ValueError
+        self.call_main_on_test_file()
+        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
             put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
             self.call_main_on_test_file(['--replication', '1'])
             self.call_main_on_test_file(['--replication', '4'])
@@ -461,17 +474,16 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
                           ['--project-uuid', self.Z_UUID, '--stream'])
 
     def test_api_error_handling(self):
-        collections_mock = mock.Mock(name='arv.collections()')
-        coll_create_mock = collections_mock().create().execute
-        coll_create_mock.side_effect = arvados.errors.ApiError(
+        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
+        coll_save_mock.side_effect = arvados.errors.ApiError(
             fake_httplib2_response(403), '{}')
-        arv_put.api_client = arvados.api('v1')
-        arv_put.api_client.collections = collections_mock
-        with self.assertRaises(SystemExit) as exc_test:
-            self.call_main_with_args(['/dev/null'])
-        self.assertLess(0, exc_test.exception.args[0])
-        self.assertLess(0, coll_create_mock.call_count)
-        self.assertEqual("", self.main_stdout.getvalue())
+        with mock.patch('arvados.collection.Collection.save_new',
+                        new=coll_save_mock):
+            with self.assertRaises(SystemExit) as exc_test:
+                self.call_main_with_args(['/dev/null'])
+            self.assertLess(0, exc_test.exception.args[0])
+            self.assertLess(0, coll_save_mock.call_count)
+            self.assertEqual("", self.main_stdout.getvalue())
 
 
 class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,