11579: Added assertion on test to check for file symlinks to be copied by default.
[arvados.git] / sdk / python / tests / test_arv_put.py
index 7a0120c02814d00b27e81dd41fbb50e51ef2855c..320189104ab555dc97be4c618e83adc8d6acdd7f 100644 (file)
@@ -2,6 +2,7 @@
 # -*- coding: utf-8 -*-
 
 import apiclient
+import io
 import mock
 import os
 import pwd
@@ -16,6 +17,7 @@ import yaml
 import threading
 import hashlib
 import random
+import uuid
 
 from cStringIO import StringIO
 
@@ -31,9 +33,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
         [],
         ['/dev/null'],
         ['/dev/null', '--filename', 'empty'],
-        ['/tmp'],
-        ['/tmp', '--max-manifest-depth', '0'],
-        ['/tmp', '--max-manifest-depth', '1']
+        ['/tmp']
         ]
 
     def tearDown(self):
@@ -240,6 +240,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
 
 class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                           ArvadosBaseTestCase):
+
     def setUp(self):
         super(ArvPutUploadJobTest, self).setUp()
         run_test_server.authorize_with('active')
@@ -261,16 +262,50 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
             fileobj.write(data)
         fileobj.close()
+        # Temp dir containing small files to be repacked
+        self.small_files_dir = tempfile.mkdtemp()
+        data = 'y' * 1024 * 1024 # 1 MB
+        for i in range(1, 70):
+            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
+                f.write(data + str(i))
         self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
+        # Temp dir to hold a symlink to other temp dir
+        self.tempdir_with_symlink = tempfile.mkdtemp()
+        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
+        os.symlink(os.path.join(self.tempdir, '1'),
+                   os.path.join(self.tempdir_with_symlink, 'linkedfile'))
 
     def tearDown(self):
         super(ArvPutUploadJobTest, self).tearDown()
         shutil.rmtree(self.tempdir)
         os.unlink(self.large_file_name)
+        shutil.rmtree(self.small_files_dir)
+        shutil.rmtree(self.tempdir_with_symlink)
+
+    def test_symlinks_are_followed_by_default(self):
+        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
+        cwriter.start(save_collection=False)
+        self.assertIn('linkeddir', cwriter.manifest_text())
+        self.assertIn('linkedfile', cwriter.manifest_text())
+        cwriter.destroy_cache()
+
+    def test_symlinks_are_not_followed_when_requested(self):
+        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
+                                          follow_links=False)
+        cwriter.start(save_collection=False)
+        self.assertNotIn('linkeddir', cwriter.manifest_text())
+        self.assertNotIn('linkedfile', cwriter.manifest_text())
+        cwriter.destroy_cache()
+
+    def test_passing_nonexistant_path_raise_exception(self):
+        uuid_str = str(uuid.uuid4())
+        cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])
+        with self.assertRaises(arv_put.PathDoesNotExistError):
+            cwriter.start(save_collection=False)
 
     def test_writer_works_without_cache(self):
         cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
-        cwriter.start()
+        cwriter.start(save_collection=False)
         self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
 
     def test_writer_works_with_cache(self):
@@ -278,13 +313,13 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             f.write('foo')
             f.flush()
             cwriter = arv_put.ArvPutUploadJob([f.name])
-            cwriter.start()
-            self.assertEqual(3, cwriter.bytes_written)
+            cwriter.start(save_collection=False)
+            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
             # Don't destroy the cache, and start another upload
             cwriter_new = arv_put.ArvPutUploadJob([f.name])
-            cwriter_new.start()
+            cwriter_new.start(save_collection=False)
             cwriter_new.destroy_cache()
-            self.assertEqual(0, cwriter_new.bytes_written)
+            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)
 
     def make_progress_tester(self):
         progression = []
@@ -300,13 +335,13 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                 progression, reporter = self.make_progress_tester()
                 cwriter = arv_put.ArvPutUploadJob([f.name],
                     reporter=reporter, bytes_expected=expect_count)
-                cwriter.start()
+                cwriter.start(save_collection=False)
                 cwriter.destroy_cache()
                 self.assertIn((3, expect_count), progression)
 
     def test_writer_upload_directory(self):
         cwriter = arv_put.ArvPutUploadJob([self.tempdir])
-        cwriter.start()
+        cwriter.start(save_collection=False)
         cwriter.destroy_cache()
         self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
 
@@ -315,6 +350,8 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             data = args[1]
             # Exit only on last block
             if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                # Simulate a checkpoint before quitting. Ensure block commit.
+                self.writer._update(final=True)
                 raise SystemExit("Simulated error")
             return self.arvfile_write(*args, **kwargs)
 
@@ -323,18 +360,166 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             mocked_write.side_effect = wrapped_write
             writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1)
+            # We'll be accessing from inside the wrapper
+            self.writer = writer
             with self.assertRaises(SystemExit):
-                writer.start()
-                self.assertLess(writer.bytes_written,
-                                os.path.getsize(self.large_file_name))
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
         # Retry the upload
         writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                           replication_desired=1)
-        writer2.start()
-        self.assertEqual(writer.bytes_written + writer2.bytes_written,
+        writer2.start(save_collection=False)
+        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
+                         os.path.getsize(self.large_file_name))
+        writer2.destroy_cache()
+        del(self.writer)
+
+    # Test for bug #11002
+    def test_graceful_exit_while_repacking_small_blocks(self):
+        def wrapped_commit(*args, **kwargs):
+            raise SystemExit("Simulated error")
+
+        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
+                        autospec=True) as mocked_commit:
+            mocked_commit.side_effect = wrapped_commit
+            # Upload a little more than 1 block, wrapped_commit will make the first block
+            # commit to fail.
+            # arv-put should not exit with an exception by trying to commit the collection
+            # as it's in an inconsistent state.
+            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
+                                             replication_desired=1)
+            try:
+                with self.assertRaises(SystemExit):
+                    writer.start(save_collection=False)
+            except arvados.arvfile.UnownedBlockError:
+                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
+        writer.destroy_cache()
+
+    def test_no_resume_when_asked(self):
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                # Simulate a checkpoint before quitting.
+                self.writer._update()
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            # We'll be accessing from inside the wrapper
+            self.writer = writer
+            with self.assertRaises(SystemExit):
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload, this time without resume
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          resume=False)
+        writer2.start(save_collection=False)
+        self.assertEqual(writer2.bytes_skipped, 0)
+        self.assertEqual(writer2.bytes_written,
                          os.path.getsize(self.large_file_name))
         writer2.destroy_cache()
+        del(self.writer)
 
+    def test_no_resume_when_no_cache(self):
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                # Simulate a checkpoint before quitting.
+                self.writer._update()
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            # We'll be accessing from inside the wrapper
+            self.writer = writer
+            with self.assertRaises(SystemExit):
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload, this time without cache usage
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          resume=False,
+                                          use_cache=False)
+        writer2.start(save_collection=False)
+        self.assertEqual(writer2.bytes_skipped, 0)
+        self.assertEqual(writer2.bytes_written,
+                         os.path.getsize(self.large_file_name))
+        writer2.destroy_cache()
+        del(self.writer)
+
+    def test_dry_run_feature(self):
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                # Simulate a checkpoint before quitting.
+                self.writer._update()
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            # We'll be accessing from inside the wrapper
+            self.writer = writer
+            with self.assertRaises(SystemExit):
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload using dry_run to check if there is a pending upload
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          dry_run=True)
+        with self.assertRaises(arv_put.ArvPutUploadIsPending):
+            writer2.start(save_collection=False)
+        # Complete the pending upload
+        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1)
+        writer3.start(save_collection=False)
+        # Confirm there's no pending upload with dry_run=True
+        writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          dry_run=True)
+        with self.assertRaises(arv_put.ArvPutUploadNotPending):
+            writer4.start(save_collection=False)
+        writer4.destroy_cache()
+        # Test obvious cases
+        with self.assertRaises(arv_put.ArvPutUploadIsPending):
+            arv_put.ArvPutUploadJob([self.large_file_name],
+                                    replication_desired=1,
+                                    dry_run=True,
+                                    resume=False,
+                                    use_cache=False)
+        with self.assertRaises(arv_put.ArvPutUploadIsPending):
+            arv_put.ArvPutUploadJob([self.large_file_name],
+                                    replication_desired=1,
+                                    dry_run=True,
+                                    resume=False)
+        del(self.writer)
 
 class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
     TEST_SIZE = os.path.getsize(__file__)
@@ -408,6 +593,15 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
                 delattr(self, outbuf)
         super(ArvadosPutTest, self).tearDown()
 
+    def test_version_argument(self):
+        err = io.BytesIO()
+        out = io.BytesIO()
+        with tutil.redirected_streams(stdout=out, stderr=err):
+            with self.assertRaises(SystemExit):
+                self.call_main_with_args(['--version'])
+        self.assertEqual(out.getvalue(), '')
+        self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
+
     def test_simple_file_put(self):
         self.call_main_on_test_file()
 
@@ -624,6 +818,21 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         self.assertEqual(1, len(collection_list))
         return collection_list[0]
 
+    def test_put_collection_with_later_update(self):
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
+        self.assertNotEqual(None, col['uuid'])
+        # Add a new file to the directory
+        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
+            f.write('The quick brown fox jumped over the lazy dog')
+        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
+        self.assertEqual(col['uuid'], updated_col['uuid'])
+        # Get the manifest and check that the new file is being included
+        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
+        self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')
+
     def test_put_collection_with_high_redundancy(self):
         # Write empty data: we're not testing CollectionWriter, just
         # making sure collections.create tells the API server what our