9463: Polishing the last details to make the integration tests pass
[arvados.git] / sdk / python / tests / test_arv_put.py
old mode 100644 (file)
new mode 100755 (executable)
index 4e4e096..0990075
@@ -2,11 +2,13 @@
 # -*- coding: utf-8 -*-
 
 import apiclient
+import mock
 import os
 import pwd
 import re
 import shutil
 import subprocess
+import multiprocessing
 import sys
 import tempfile
 import time
@@ -18,7 +20,7 @@ from cStringIO import StringIO
 import arvados
 import arvados.commands.put as arv_put
 
-from arvados_testutil import ArvadosBaseTestCase
+from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
 import run_test_server
 
 class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
@@ -43,7 +45,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
 
     def test_cache_names_stable(self):
         for argset in self.CACHE_ARGSET:
-            self.assertEquals(self.cache_path_from_arglist(argset),
+            self.assertEqual(self.cache_path_from_arglist(argset),
                               self.cache_path_from_arglist(argset),
                               "cache name changed for {}".format(argset))
 
@@ -65,10 +67,10 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
                              "path too exotic: {}".format(path))
 
     def test_cache_names_ignore_argument_order(self):
-        self.assertEquals(
+        self.assertEqual(
             self.cache_path_from_arglist(['a', 'b', 'c']),
             self.cache_path_from_arglist(['c', 'a', 'b']))
-        self.assertEquals(
+        self.assertEqual(
             self.cache_path_from_arglist(['-', '--filename', 'stdin']),
             self.cache_path_from_arglist(['--filename', 'stdin', '-']))
 
@@ -84,32 +86,32 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
         args = arv_put.parse_arguments(['/tmp'])
         args.filename = 'tmp'
         path2 = arv_put.ResumeCache.make_path(args)
-        self.assertEquals(path1, path2,
+        self.assertEqual(path1, path2,
                          "cache path considered --filename for directory")
-        self.assertEquals(
+        self.assertEqual(
             self.cache_path_from_arglist(['-']),
             self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
             "cache path considered --max-manifest-depth for file")
 
     def test_cache_names_treat_negative_manifest_depths_identically(self):
         base_args = ['/tmp', '--max-manifest-depth']
-        self.assertEquals(
+        self.assertEqual(
             self.cache_path_from_arglist(base_args + ['-1']),
             self.cache_path_from_arglist(base_args + ['-2']))
 
     def test_cache_names_treat_stdin_consistently(self):
-        self.assertEquals(
+        self.assertEqual(
             self.cache_path_from_arglist(['-', '--filename', 'test']),
             self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))
 
     def test_cache_names_identical_for_synonymous_names(self):
-        self.assertEquals(
+        self.assertEqual(
             self.cache_path_from_arglist(['.']),
             self.cache_path_from_arglist([os.path.realpath('.')]))
         testdir = self.make_tmpdir()
         looplink = os.path.join(testdir, 'loop')
         os.symlink(testdir, looplink)
-        self.assertEquals(
+        self.assertEqual(
             self.cache_path_from_arglist([testdir]),
             self.cache_path_from_arglist([looplink]))
 
@@ -126,12 +128,49 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
             else:
                 config['ARVADOS_API_HOST'] = orig_host
 
+    @mock.patch('arvados.keep.KeepClient.head')
+    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
+        keep_client_head.side_effect = [True]
+        thing = {}
+        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
+        with tempfile.NamedTemporaryFile() as cachefile:
+            self.last_cache = arv_put.ResumeCache(cachefile.name)
+        self.last_cache.save(thing)
+        self.last_cache.close()
+        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
+        self.assertNotEqual(None, resume_cache)
+
+    @mock.patch('arvados.keep.KeepClient.head')
+    def test_resume_cache_with_finished_streams(self, keep_client_head):
+        keep_client_head.side_effect = [True]
+        thing = {}
+        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
+        with tempfile.NamedTemporaryFile() as cachefile:
+            self.last_cache = arv_put.ResumeCache(cachefile.name)
+        self.last_cache.save(thing)
+        self.last_cache.close()
+        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
+        self.assertNotEqual(None, resume_cache)
+
+    @mock.patch('arvados.keep.KeepClient.head')
+    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
+        keep_client_head.side_effect = Exception('Locator not found')
+        thing = {}
+        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
+        with tempfile.NamedTemporaryFile() as cachefile:
+            self.last_cache = arv_put.ResumeCache(cachefile.name)
+        self.last_cache.save(thing)
+        self.last_cache.close()
+        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
+        self.assertNotEqual(None, resume_cache)
+        self.assertRaises(None, resume_cache.check_cache())
+
     def test_basic_cache_storage(self):
         thing = ['test', 'list']
         with tempfile.NamedTemporaryFile() as cachefile:
             self.last_cache = arv_put.ResumeCache(cachefile.name)
         self.last_cache.save(thing)
-        self.assertEquals(thing, self.last_cache.load())
+        self.assertEqual(thing, self.last_cache.load())
 
     def test_empty_cache(self):
         with tempfile.NamedTemporaryFile() as cachefile:
@@ -145,7 +184,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
         cache.save(thing)
         cache.close()
         self.last_cache = arv_put.ResumeCache(path)
-        self.assertEquals(thing, self.last_cache.load())
+        self.assertEqual(thing, self.last_cache.load())
 
     def test_multiple_cache_writes(self):
         thing = ['short', 'list']
@@ -155,7 +194,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
         # sure the cache file gets truncated.
         self.last_cache.save(['long', 'long', 'list'])
         self.last_cache.save(thing)
-        self.assertEquals(thing, self.last_cache.load())
+        self.assertEqual(thing, self.last_cache.load())
 
     def test_cache_is_locked(self):
         with tempfile.NamedTemporaryFile() as cachefile:
@@ -196,6 +235,86 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
                           arv_put.ResumeCache, path)
 
 
+class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers):
+    MAIN_SERVER = {}
+    KEEP_SERVER = {}
+    import shutil
+        
+    # def test_write_files(self):
+    #     c = arv_put.ArvPutCollection()
+    #     data = 'a' * 1024 * 1024 # 1 MB
+    #     tmpdir = tempfile.mkdtemp()
+    #     for size in [1, 10, 64, 128]:
+    #         with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
+    #             for _ in range(size):
+    #                 f.write(data)
+    #         c.write_file(f.name, os.path.basename(f.name))
+    #     shutil.rmtree(tmpdir)
+    #     self.assertEqual(True, c.manifest())
+    #
+    # def test_write_directory(self):
+    #     data = 'b' * 1024 * 1024
+    #     tmpdir = tempfile.mkdtemp()
+    #     for size in [1, 5, 10, 70]:
+    #         with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
+    #             for _ in range(size):
+    #                 f.write(data)
+    #     os.mkdir(os.path.join(tmpdir, 'subdir1'))
+    #     for size in [2, 4, 6]:
+    #         with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
+    #             for _ in range(size):
+    #                 f.write(data)
+    #     c = arv_put.ArvPutUploader([tmpdir])
+    #     shutil.rmtree(tmpdir)
+    #     self.assertEqual(True, c.manifest())
+
+    def fake_reporter(self, written, expected):
+        # Use this callback as an intra-block pause to simulate an interruption
+        print "Written %d / %d bytes" % (written, expected)
+        time.sleep(10)
+    
+    def bg_uploader(self, paths):
+        return arv_put.ArvPutUploader(paths, reporter=self.fake_reporter)
+
+    # def test_resume_large_file_upload(self):
+    #     import multiprocessing
+    #     data = 'x' * 1024 * 1024 # 1 MB
+    #     _, filename = tempfile.mkstemp()
+    #     fileobj = open(filename, 'w')
+    #     for _ in range(200):
+    #         fileobj.write(data)
+    #     fileobj.close()
+    #     uploader = multiprocessing.Process(target=self.bg_uploader, args=([filename],))
+    #     uploader.start()
+    #     time.sleep(5)
+    #     uploader.terminate()
+    #     time.sleep(1)
+    #     # cache = arv_put.ArvPutCollectionCache([filename])
+    #     # print "Collection detected: %s" % cache.collection()
+    #     # c = arv_put.ArvPutCollection(locator=cache.collection(), cache=cache)
+    #     # print "UPLOADED: %d" % c.collection[os.path.basename(filename)].size()
+    #     # self.assertLess(c.collection[os.path.basename(filename)].size(), os.path.getsize(filename))
+    #     os.unlink(filename)
+
+    # def test_write_directory_twice(self):
+    #     data = 'b' * 1024 * 1024
+    #     tmpdir = tempfile.mkdtemp()
+    #     for size in [1, 5, 10, 70]:
+    #         with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
+    #             for _ in range(size):
+    #                 f.write(data)
+    #     os.mkdir(os.path.join(tmpdir, 'subdir1'))
+    #     for size in [2, 4, 6]:
+    #         with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
+    #             for _ in range(size):
+    #                 f.write(data)
+    #     c = arv_put.ArvPutUploader([tmpdir])
+    #     d = arv_put.ArvPutUploader([tmpdir])
+    #     print "ESCRIBIERON: c: %d, d: %d" % (c.bytes_written(), d.bytes_written())
+    #     shutil.rmtree(tmpdir)
+    #     self.assertEqual(0, d.bytes_written())
+        
+
 class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
                                      ArvadosBaseTestCase):
     def setUp(self):
@@ -216,12 +335,12 @@ class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
         cwriter.write_file('/dev/null')
         cwriter.cache_state()
         self.assertTrue(self.cache.load())
-        self.assertEquals(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
+        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
 
     def test_writer_works_without_cache(self):
         cwriter = arv_put.ArvPutCollectionWriter()
         cwriter.write_file('/dev/null')
-        self.assertEquals(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
+        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
 
     def test_writer_resumes_from_cache(self):
         cwriter = arv_put.ArvPutCollectionWriter(self.cache)
@@ -230,7 +349,7 @@ class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
             cwriter.cache_state()
             new_writer = arv_put.ArvPutCollectionWriter.from_cache(
                 self.cache)
-            self.assertEquals(
+            self.assertEqual(
                 ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
                 new_writer.manifest_text())
 
@@ -240,12 +359,12 @@ class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
             cwriter.write_file(testfile.name, 'test')
         new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
         new_writer.write_file('/dev/null')
-        self.assertEquals(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text())
+        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text())
 
     def test_new_writer_from_empty_cache(self):
         cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
         cwriter.write_file('/dev/null')
-        self.assertEquals(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
+        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
 
     def test_writer_resumable_after_arbitrary_bytes(self):
         cwriter = arv_put.ArvPutCollectionWriter(self.cache)
@@ -255,7 +374,7 @@ class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
             cwriter.cache_state()
             new_writer = arv_put.ArvPutCollectionWriter.from_cache(
                 self.cache)
-        self.assertEquals(cwriter.manifest_text(), new_writer.manifest_text())
+        self.assertEqual(cwriter.manifest_text(), new_writer.manifest_text())
 
     def make_progress_tester(self):
         progression = []
@@ -288,16 +407,16 @@ class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
     TEST_SIZE = os.path.getsize(__file__)
 
     def test_expected_bytes_for_file(self):
-        self.assertEquals(self.TEST_SIZE,
+        self.assertEqual(self.TEST_SIZE,
                           arv_put.expected_bytes_for([__file__]))
 
     def test_expected_bytes_for_tree(self):
         tree = self.make_tmpdir()
         shutil.copyfile(__file__, os.path.join(tree, 'one'))
         shutil.copyfile(__file__, os.path.join(tree, 'two'))
-        self.assertEquals(self.TEST_SIZE * 2,
+        self.assertEqual(self.TEST_SIZE * 2,
                           arv_put.expected_bytes_for([tree]))
-        self.assertEquals(self.TEST_SIZE * 3,
+        self.assertEqual(self.TEST_SIZE * 3,
                           arv_put.expected_bytes_for([tree, __file__]))
 
     def test_expected_bytes_for_device(self):
@@ -335,10 +454,10 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
         self.main_stderr = StringIO()
         return arv_put.main(args, self.main_stdout, self.main_stderr)
 
-    def call_main_on_test_file(self):
+    def call_main_on_test_file(self, args=[]):
         with self.make_test_file() as testfile:
             path = testfile.name
-            self.call_main_with_args(['--stream', '--no-progress', path])
+            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
         self.assertTrue(
             os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                         '098f6bcd4621d373cade4e832627b4f6')),
@@ -381,6 +500,32 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
             arv_put.ResumeCache.CACHE_DIR = orig_cachedir
             os.chmod(cachedir, 0o700)
 
+    def test_put_block_replication(self):
+        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \
+             mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock:
+            cache_mock.side_effect = ValueError
+            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
+            self.call_main_on_test_file(['--replication', '1'])
+            self.call_main_on_test_file(['--replication', '4'])
+            self.call_main_on_test_file(['--replication', '5'])
+            self.assertEqual(
+                [x[-1].get('copies') for x in put_mock.call_args_list],
+                [1, 4, 5])
+
+    def test_normalize(self):
+        testfile1 = self.make_test_file()
+        testfile2 = self.make_test_file()
+        test_paths = [testfile1.name, testfile2.name]
+        # Reverse-sort the paths, so normalization must change their order.
+        test_paths.sort(reverse=True)
+        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
+                                 test_paths)
+        manifest = self.main_stdout.getvalue()
+        # Assert the second file we specified appears first in the manifest.
+        file_indices = [manifest.find(':' + os.path.basename(path))
+                        for path in test_paths]
+        self.assertGreater(*file_indices)
+
     def test_error_name_without_collection(self):
         self.assertRaises(SystemExit, self.call_main_with_args,
                           ['--name', 'test without Collection',
@@ -396,12 +541,30 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
                           self.call_main_with_args,
                           ['--project-uuid', self.Z_UUID, '--stream'])
 
+    def test_api_error_handling(self):
+        collections_mock = mock.Mock(name='arv.collections()')
+        coll_create_mock = collections_mock().create().execute
+        coll_create_mock.side_effect = arvados.errors.ApiError(
+            fake_httplib2_response(403), '{}')
+        arv_put.api_client = arvados.api('v1')
+        arv_put.api_client.collections = collections_mock
+        with self.assertRaises(SystemExit) as exc_test:
+            self.call_main_with_args(['/dev/null'])
+        self.assertLess(0, exc_test.exception.args[0])
+        self.assertLess(0, coll_create_mock.call_count)
+        self.assertEqual("", self.main_stdout.getvalue())
+
+
 class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                             ArvadosBaseTestCase):
     def _getKeepServerConfig():
-        for config_file in ['application.yml', 'application.default.yml']:
-            with open(os.path.join(run_test_server.SERVICES_SRC_DIR,
-                                   "api", "config", config_file)) as f:
+        for config_file, mandatory in [
+                ['application.yml', False], ['application.default.yml', True]]:
+            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
+                                "api", "config", config_file)
+            if not mandatory and not os.path.exists(path):
+                continue
+            with open(path) as f:
                 rails_config = yaml.load(f.read())
                 for config_section in ['test', 'common']:
                     try: