10467: Merge branch 'master' into 10467-client-disconnect
[arvados.git] / sdk / python / tests / test_arv_put.py
index c9c63e9963eb4038b0c0514af552683258b61118..f1dfd03def33d09c1ede560f50c5a059020f9c0c 100644 (file)
@@ -2,6 +2,8 @@
 # -*- coding: utf-8 -*-
 
 import apiclient
+import io
+import mock
 import os
 import pwd
 import re
@@ -12,13 +14,17 @@ import tempfile
 import time
 import unittest
 import yaml
+import threading
+import hashlib
+import random
 
 from cStringIO import StringIO
 
 import arvados
 import arvados.commands.put as arv_put
+import arvados_testutil as tutil
 
-from arvados_testutil import ArvadosBaseTestCase, ArvadosKeepLocalStoreTestCase
+from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
 import run_test_server
 
 class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
@@ -26,9 +32,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
         [],
         ['/dev/null'],
         ['/dev/null', '--filename', 'empty'],
-        ['/tmp'],
-        ['/tmp', '--max-manifest-depth', '0'],
-        ['/tmp', '--max-manifest-depth', '1']
+        ['/tmp']
         ]
 
     def tearDown(self):
@@ -43,7 +47,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
 
     def test_cache_names_stable(self):
         for argset in self.CACHE_ARGSET:
-            self.assertEquals(self.cache_path_from_arglist(argset),
+            self.assertEqual(self.cache_path_from_arglist(argset),
                               self.cache_path_from_arglist(argset),
                               "cache name changed for {}".format(argset))
 
@@ -65,10 +69,10 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
                              "path too exotic: {}".format(path))
 
     def test_cache_names_ignore_argument_order(self):
-        self.assertEquals(
+        self.assertEqual(
             self.cache_path_from_arglist(['a', 'b', 'c']),
             self.cache_path_from_arglist(['c', 'a', 'b']))
-        self.assertEquals(
+        self.assertEqual(
             self.cache_path_from_arglist(['-', '--filename', 'stdin']),
             self.cache_path_from_arglist(['--filename', 'stdin', '-']))
 
@@ -84,32 +88,32 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
         args = arv_put.parse_arguments(['/tmp'])
         args.filename = 'tmp'
         path2 = arv_put.ResumeCache.make_path(args)
-        self.assertEquals(path1, path2,
+        self.assertEqual(path1, path2,
                          "cache path considered --filename for directory")
-        self.assertEquals(
+        self.assertEqual(
             self.cache_path_from_arglist(['-']),
             self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
             "cache path considered --max-manifest-depth for file")
 
     def test_cache_names_treat_negative_manifest_depths_identically(self):
         base_args = ['/tmp', '--max-manifest-depth']
-        self.assertEquals(
+        self.assertEqual(
             self.cache_path_from_arglist(base_args + ['-1']),
             self.cache_path_from_arglist(base_args + ['-2']))
 
     def test_cache_names_treat_stdin_consistently(self):
-        self.assertEquals(
+        self.assertEqual(
             self.cache_path_from_arglist(['-', '--filename', 'test']),
             self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))
 
     def test_cache_names_identical_for_synonymous_names(self):
-        self.assertEquals(
+        self.assertEqual(
             self.cache_path_from_arglist(['.']),
             self.cache_path_from_arglist([os.path.realpath('.')]))
         testdir = self.make_tmpdir()
         looplink = os.path.join(testdir, 'loop')
         os.symlink(testdir, looplink)
-        self.assertEquals(
+        self.assertEqual(
             self.cache_path_from_arglist([testdir]),
             self.cache_path_from_arglist([looplink]))
 
@@ -126,12 +130,49 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
             else:
                 config['ARVADOS_API_HOST'] = orig_host
 
+    @mock.patch('arvados.keep.KeepClient.head')
+    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
+        keep_client_head.side_effect = [True]
+        thing = {}
+        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
+        with tempfile.NamedTemporaryFile() as cachefile:
+            self.last_cache = arv_put.ResumeCache(cachefile.name)
+        self.last_cache.save(thing)
+        self.last_cache.close()
+        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
+        self.assertNotEqual(None, resume_cache)
+
+    @mock.patch('arvados.keep.KeepClient.head')
+    def test_resume_cache_with_finished_streams(self, keep_client_head):
+        keep_client_head.side_effect = [True]
+        thing = {}
+        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
+        with tempfile.NamedTemporaryFile() as cachefile:
+            self.last_cache = arv_put.ResumeCache(cachefile.name)
+        self.last_cache.save(thing)
+        self.last_cache.close()
+        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
+        self.assertNotEqual(None, resume_cache)
+
+    @mock.patch('arvados.keep.KeepClient.head')
+    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
+        keep_client_head.side_effect = Exception('Locator not found')
+        thing = {}
+        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
+        with tempfile.NamedTemporaryFile() as cachefile:
+            self.last_cache = arv_put.ResumeCache(cachefile.name)
+        self.last_cache.save(thing)
+        self.last_cache.close()
+        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
+        self.assertNotEqual(None, resume_cache)
+        resume_cache.check_cache()
+
     def test_basic_cache_storage(self):
         thing = ['test', 'list']
         with tempfile.NamedTemporaryFile() as cachefile:
             self.last_cache = arv_put.ResumeCache(cachefile.name)
         self.last_cache.save(thing)
-        self.assertEquals(thing, self.last_cache.load())
+        self.assertEqual(thing, self.last_cache.load())
 
     def test_empty_cache(self):
         with tempfile.NamedTemporaryFile() as cachefile:
@@ -145,7 +186,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
         cache.save(thing)
         cache.close()
         self.last_cache = arv_put.ResumeCache(path)
-        self.assertEquals(thing, self.last_cache.load())
+        self.assertEqual(thing, self.last_cache.load())
 
     def test_multiple_cache_writes(self):
         thing = ['short', 'list']
@@ -155,7 +196,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
         # sure the cache file gets truncated.
         self.last_cache.save(['long', 'long', 'list'])
         self.last_cache.save(thing)
-        self.assertEquals(thing, self.last_cache.load())
+        self.assertEqual(thing, self.last_cache.load())
 
     def test_cache_is_locked(self):
         with tempfile.NamedTemporaryFile() as cachefile:
@@ -196,64 +237,54 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
                           arv_put.ResumeCache, path)
 
 
-class ArvadosPutCollectionWriterTest(ArvadosKeepLocalStoreTestCase):
+class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
+                          ArvadosBaseTestCase):
+
     def setUp(self):
-        super(ArvadosPutCollectionWriterTest, self).setUp()
-        with tempfile.NamedTemporaryFile(delete=False) as cachefile:
-            self.cache = arv_put.ResumeCache(cachefile.name)
-            self.cache_filename = cachefile.name
+        super(ArvPutUploadJobTest, self).setUp()
+        run_test_server.authorize_with('active')
+        # Temp files creation
+        self.tempdir = tempfile.mkdtemp()
+        subdir = os.path.join(self.tempdir, 'subdir')
+        os.mkdir(subdir)
+        data = "x" * 1024 # 1 KB
+        for i in range(1, 5):
+            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
+                f.write(data * i)
+        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
+            f.write(data * 5)
+        # Large temp file for resume test
+        _, self.large_file_name = tempfile.mkstemp()
+        fileobj = open(self.large_file_name, 'w')
+        # Make sure to write just a little more than one block
+        for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
+            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
+            fileobj.write(data)
+        fileobj.close()
+        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
 
     def tearDown(self):
-        super(ArvadosPutCollectionWriterTest, self).tearDown()
-        if os.path.exists(self.cache_filename):
-            self.cache.destroy()
-        self.cache.close()
-
-    def test_writer_caches(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
-        cwriter.write_file('/dev/null')
-        cwriter.cache_state()
-        self.assertTrue(self.cache.load())
-        self.assertEquals(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
+        super(ArvPutUploadJobTest, self).tearDown()
+        shutil.rmtree(self.tempdir)
+        os.unlink(self.large_file_name)
 
     def test_writer_works_without_cache(self):
-        cwriter = arv_put.ArvPutCollectionWriter()
-        cwriter.write_file('/dev/null')
-        self.assertEquals(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
-
-    def test_writer_resumes_from_cache(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
-        with self.make_test_file() as testfile:
-            cwriter.write_file(testfile.name, 'test')
-            cwriter.cache_state()
-            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
-                self.cache)
-            self.assertEquals(
-                ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
-                new_writer.manifest_text())
-
-    def test_new_writer_from_stale_cache(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
-        with self.make_test_file() as testfile:
-            cwriter.write_file(testfile.name, 'test')
-        new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
-        new_writer.write_file('/dev/null')
-        self.assertEquals(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text())
-
-    def test_new_writer_from_empty_cache(self):
-        cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
-        cwriter.write_file('/dev/null')
-        self.assertEquals(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
-
-    def test_writer_resumable_after_arbitrary_bytes(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
-        # These bytes are intentionally not valid UTF-8.
-        with self.make_test_file('\x00\x07\xe2') as testfile:
-            cwriter.write_file(testfile.name, 'test')
-            cwriter.cache_state()
-            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
-                self.cache)
-        self.assertEquals(cwriter.manifest_text(), new_writer.manifest_text())
+        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
+        cwriter.start(save_collection=False)
+        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
+
+    def test_writer_works_with_cache(self):
+        with tempfile.NamedTemporaryFile() as f:
+            f.write('foo')
+            f.flush()
+            cwriter = arv_put.ArvPutUploadJob([f.name])
+            cwriter.start(save_collection=False)
+            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
+            # Don't destroy the cache, and start another upload
+            cwriter_new = arv_put.ArvPutUploadJob([f.name])
+            cwriter_new.start(save_collection=False)
+            cwriter_new.destroy_cache()
+            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)
 
     def make_progress_tester(self):
         progression = []
@@ -262,40 +293,174 @@ class ArvadosPutCollectionWriterTest(ArvadosKeepLocalStoreTestCase):
         return progression, record_func
 
     def test_progress_reporting(self):
-        for expect_count in (None, 8):
-            progression, reporter = self.make_progress_tester()
-            cwriter = arv_put.ArvPutCollectionWriter(
-                reporter=reporter, bytes_expected=expect_count)
-            with self.make_test_file() as testfile:
-                cwriter.write_file(testfile.name, 'test')
-            cwriter.finish_current_stream()
-            self.assertIn((4, expect_count), progression)
-
-    def test_resume_progress(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache, bytes_expected=4)
-        with self.make_test_file() as testfile:
-            # Set up a writer with some flushed bytes.
-            cwriter.write_file(testfile.name, 'test')
-            cwriter.finish_current_stream()
-            cwriter.cache_state()
-            new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
-            self.assertEqual(new_writer.bytes_written, 4)
+        with tempfile.NamedTemporaryFile() as f:
+            f.write('foo')
+            f.flush()
+            for expect_count in (None, 8):
+                progression, reporter = self.make_progress_tester()
+                cwriter = arv_put.ArvPutUploadJob([f.name],
+                    reporter=reporter, bytes_expected=expect_count)
+                cwriter.start(save_collection=False)
+                cwriter.destroy_cache()
+                self.assertIn((3, expect_count), progression)
+
+    def test_writer_upload_directory(self):
+        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
+        cwriter.start(save_collection=False)
+        cwriter.destroy_cache()
+        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
+
+    def test_resume_large_file_upload(self):
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            with self.assertRaises(SystemExit):
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1)
+        writer2.start(save_collection=False)
+        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
+                         os.path.getsize(self.large_file_name))
+        writer2.destroy_cache()
+
+    def test_no_resume_when_asked(self):
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            with self.assertRaises(SystemExit):
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload, this time without resume
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          resume=False)
+        writer2.start(save_collection=False)
+        self.assertEqual(writer2.bytes_skipped, 0)
+        self.assertEqual(writer2.bytes_written,
+                         os.path.getsize(self.large_file_name))
+        writer2.destroy_cache()
+
+    def test_no_resume_when_no_cache(self):
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            with self.assertRaises(SystemExit):
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload, this time without cache usage
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          resume=False,
+                                          use_cache=False)
+        writer2.start(save_collection=False)
+        self.assertEqual(writer2.bytes_skipped, 0)
+        self.assertEqual(writer2.bytes_written,
+                         os.path.getsize(self.large_file_name))
+        writer2.destroy_cache()
+
+
+    def test_dry_run_feature(self):
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            with self.assertRaises(SystemExit):
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload using dry_run to check if there is a pending upload
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          dry_run=True)
+        with self.assertRaises(arv_put.ArvPutUploadIsPending):
+            writer2.start(save_collection=False)
+        # Complete the pending upload
+        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1)
+        writer3.start(save_collection=False)
+        # Confirm there's no pending upload with dry_run=True
+        writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          dry_run=True)
+        with self.assertRaises(arv_put.ArvPutUploadNotPending):
+            writer4.start(save_collection=False)
+        writer4.destroy_cache()
+        # Test obvious cases
+        with self.assertRaises(arv_put.ArvPutUploadIsPending):
+            arv_put.ArvPutUploadJob([self.large_file_name],
+                                    replication_desired=1,
+                                    dry_run=True,
+                                    resume=False,
+                                    use_cache=False)
+        with self.assertRaises(arv_put.ArvPutUploadIsPending):
+            arv_put.ArvPutUploadJob([self.large_file_name],
+                                    replication_desired=1,
+                                    dry_run=True,
+                                    resume=False)
 
 
 class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
     TEST_SIZE = os.path.getsize(__file__)
 
     def test_expected_bytes_for_file(self):
-        self.assertEquals(self.TEST_SIZE,
+        self.assertEqual(self.TEST_SIZE,
                           arv_put.expected_bytes_for([__file__]))
 
     def test_expected_bytes_for_tree(self):
         tree = self.make_tmpdir()
         shutil.copyfile(__file__, os.path.join(tree, 'one'))
         shutil.copyfile(__file__, os.path.join(tree, 'two'))
-        self.assertEquals(self.TEST_SIZE * 2,
+        self.assertEqual(self.TEST_SIZE * 2,
                           arv_put.expected_bytes_for([tree]))
-        self.assertEquals(self.TEST_SIZE * 3,
+        self.assertEqual(self.TEST_SIZE * 3,
                           arv_put.expected_bytes_for([tree, __file__]))
 
     def test_expected_bytes_for_device(self):
@@ -324,93 +489,29 @@ class ArvadosPutReportTest(ArvadosBaseTestCase):
                                       arv_put.human_progress(count, None)))
 
 
-class ArvadosPutProjectLinkTest(ArvadosBaseTestCase):
+class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
+    MAIN_SERVER = {}
     Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
 
-    def setUp(self):
-        self.stderr = StringIO()
-        super(ArvadosPutProjectLinkTest, self).setUp()
-
-    def tearDown(self):
-        self.stderr.close()
-        super(ArvadosPutProjectLinkTest, self).tearDown()
-
-    def prep_link_from_arguments(self, args, uuid_found=True):
-        try:
-            link = arv_put.prep_project_link(arv_put.parse_arguments(args),
-                                             self.stderr,
-                                             lambda uuid: uuid_found)
-        finally:
-            self.stderr.seek(0)
-        return link
-
-    def check_link(self, link, project_uuid, link_name=None):
-        self.assertEqual(project_uuid, link.get('tail_uuid'))
-        self.assertEqual('name', link.get('link_class'))
-        if link_name is None:
-            self.assertNotIn('name', link)
-        else:
-            self.assertEqual(link_name, link.get('name'))
-        self.assertNotIn('head_uuid', link)
-
-    def check_stderr_empty(self):
-        self.assertEqual('', self.stderr.getvalue())
-
-    def test_project_link_with_name(self):
-        link = self.prep_link_from_arguments(['--project-uuid', self.Z_UUID,
-                                              '--name', 'test link AAA'])
-        self.check_link(link, self.Z_UUID, 'test link AAA')
-        self.check_stderr_empty()
-
-    def test_project_link_without_name(self):
-        username = pwd.getpwuid(os.getuid()).pw_name
-        link = self.prep_link_from_arguments(['--project-uuid', self.Z_UUID])
-        self.assertIsNotNone(link.get('name', None))
-        self.assertRegexpMatches(
-            link['name'],
-            r'^Saved at .* by {}@'.format(re.escape(username)))
-        self.check_link(link, self.Z_UUID, link.get('name', None))
-        for line in self.stderr:
-            if "No --name specified" in line:
-                break
-        else:
-            self.fail("no warning emitted about the lack of collection name")
-
-    def test_collection_without_project_defaults_to_home(self):
-        link = self.prep_link_from_arguments(['--name', 'test link BBB'])
-        self.check_link(link, self.Z_UUID)
-        self.check_stderr_empty()
-
-    def test_no_link_or_warning_with_no_collection(self):
-        self.assertIsNone(self.prep_link_from_arguments(['--raw']))
-        self.check_stderr_empty()
-
-    def test_error_when_project_not_found(self):
-        self.assertRaises(ValueError,
-                          self.prep_link_from_arguments,
-                          ['--project-uuid', self.Z_UUID], False)
-
-    def test_link_without_collection_is_error(self):
-        self.assertRaises(ValueError,
-                          self.prep_link_from_arguments,
-                          ['--project-uuid', self.Z_UUID, '--stream'])
-
-
-class ArvadosPutTest(ArvadosKeepLocalStoreTestCase):
     def call_main_with_args(self, args):
         self.main_stdout = StringIO()
         self.main_stderr = StringIO()
         return arv_put.main(args, self.main_stdout, self.main_stderr)
 
-    def call_main_on_test_file(self):
+    def call_main_on_test_file(self, args=[]):
         with self.make_test_file() as testfile:
             path = testfile.name
-            self.call_main_with_args(['--stream', '--no-progress', path])
+            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
         self.assertTrue(
             os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                         '098f6bcd4621d373cade4e832627b4f6')),
             "did not find file stream in Keep store")
 
+    def setUp(self):
+        super(ArvadosPutTest, self).setUp()
+        run_test_server.authorize_with('active')
+        arv_put.api_client = None
+
     def tearDown(self):
         for outbuf in ['main_stdout', 'main_stderr']:
             if hasattr(self, outbuf):
@@ -418,6 +519,15 @@ class ArvadosPutTest(ArvadosKeepLocalStoreTestCase):
                 delattr(self, outbuf)
         super(ArvadosPutTest, self).tearDown()
 
+    def test_version_argument(self):
+        err = io.BytesIO()
+        out = io.BytesIO()
+        with tutil.redirected_streams(stdout=out, stderr=err):
+            with self.assertRaises(SystemExit):
+                self.call_main_with_args(['--version'])
+        self.assertEqual(out.getvalue(), '')
+        self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
+
     def test_simple_file_put(self):
         self.call_main_on_test_file()
 
@@ -443,73 +553,128 @@ class ArvadosPutTest(ArvadosKeepLocalStoreTestCase):
             arv_put.ResumeCache.CACHE_DIR = orig_cachedir
             os.chmod(cachedir, 0o700)
 
-    def test_link_without_project_uuid_aborts(self):
-        self.assertRaises(SystemExit, self.call_main_with_args,
-                          ['--name', 'test without project UUID', '/dev/null'])
-
-    def test_link_without_collection_aborts(self):
+    def test_put_block_replication(self):
+        self.call_main_on_test_file()
+        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
+            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
+            self.call_main_on_test_file(['--replication', '1'])
+            self.call_main_on_test_file(['--replication', '4'])
+            self.call_main_on_test_file(['--replication', '5'])
+            self.assertEqual(
+                [x[-1].get('copies') for x in put_mock.call_args_list],
+                [1, 4, 5])
+
+    def test_normalize(self):
+        testfile1 = self.make_test_file()
+        testfile2 = self.make_test_file()
+        test_paths = [testfile1.name, testfile2.name]
+        # Reverse-sort the paths, so normalization must change their order.
+        test_paths.sort(reverse=True)
+        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
+                                 test_paths)
+        manifest = self.main_stdout.getvalue()
+        # Assert the second file we specified appears first in the manifest.
+        file_indices = [manifest.find(':' + os.path.basename(path))
+                        for path in test_paths]
+        self.assertGreater(*file_indices)
+
+    def test_error_name_without_collection(self):
         self.assertRaises(SystemExit, self.call_main_with_args,
                           ['--name', 'test without Collection',
                            '--stream', '/dev/null'])
 
-class ArvPutIntegrationTest(unittest.TestCase):
-    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']
-    ENVIRON = os.environ
-    ENVIRON['PYTHONPATH'] = ':'.join(sys.path)
+    def test_error_when_project_not_found(self):
+        self.assertRaises(SystemExit,
+                          self.call_main_with_args,
+                          ['--project-uuid', self.Z_UUID])
 
-    @classmethod
-    def setUpClass(cls):
-        try:
-            del os.environ['KEEP_LOCAL_STORE']
-        except KeyError:
-            pass
+    def test_error_bad_project_uuid(self):
+        self.assertRaises(SystemExit,
+                          self.call_main_with_args,
+                          ['--project-uuid', self.Z_UUID, '--stream'])
 
-        # Use the blob_signing_key from the Rails "test" configuration
-        # to provision the Keep server.
-        config_blob_signing_key = None
-        for config_file in ['application.yml', 'application.default.yml']:
-            with open(os.path.join(os.path.dirname(__file__),
-                                   run_test_server.ARV_API_SERVER_DIR,
-                                   "config",
-                                   config_file)) as f:
+    def test_api_error_handling(self):
+        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
+        coll_save_mock.side_effect = arvados.errors.ApiError(
+            fake_httplib2_response(403), '{}')
+        with mock.patch('arvados.collection.Collection.save_new',
+                        new=coll_save_mock):
+            with self.assertRaises(SystemExit) as exc_test:
+                self.call_main_with_args(['/dev/null'])
+            self.assertLess(0, exc_test.exception.args[0])
+            self.assertLess(0, coll_save_mock.call_count)
+            self.assertEqual("", self.main_stdout.getvalue())
+
+
+class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
+                            ArvadosBaseTestCase):
+    def _getKeepServerConfig():
+        for config_file, mandatory in [
+                ['application.yml', False], ['application.default.yml', True]]:
+            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
+                                "api", "config", config_file)
+            if not mandatory and not os.path.exists(path):
+                continue
+            with open(path) as f:
                 rails_config = yaml.load(f.read())
                 for config_section in ['test', 'common']:
                     try:
-                        config_blob_signing_key = rails_config[config_section]["blob_signing_key"]
-                        break
-                    except KeyError, AttributeError:
+                        key = rails_config[config_section]["blob_signing_key"]
+                    except (KeyError, TypeError):
                         pass
-            if config_blob_signing_key != None:
-                break
-        run_test_server.run()
-        run_test_server.run_keep(blob_signing_key=config_blob_signing_key,
-                                 enforce_permissions=(config_blob_signing_key != None))
+                    else:
+                        return {'blob_signing_key': key,
+                                'enforce_permissions': True}
+        return {'blob_signing_key': None, 'enforce_permissions': False}
+
+    MAIN_SERVER = {}
+    KEEP_SERVER = _getKeepServerConfig()
+    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']
 
     @classmethod
-    def tearDownClass(cls):
-        run_test_server.stop()
-        run_test_server.stop_keep()
+    def setUpClass(cls):
+        super(ArvPutIntegrationTest, cls).setUpClass()
+        cls.ENVIRON = os.environ.copy()
+        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)
+
+    def setUp(self):
+        super(ArvPutIntegrationTest, self).setUp()
+        arv_put.api_client = None
 
     def authorize_with(self, token_name):
         run_test_server.authorize_with(token_name)
         for v in ["ARVADOS_API_HOST",
                   "ARVADOS_API_HOST_INSECURE",
                   "ARVADOS_API_TOKEN"]:
-            os.environ[v] = arvados.config.settings()[v]
+            self.ENVIRON[v] = arvados.config.settings()[v]
+        arv_put.api_client = arvados.api('v1')
+
+    def current_user(self):
+        return arv_put.api_client.users().current().execute()
 
     def test_check_real_project_found(self):
-        self.assertTrue(arv_put.check_project_exists(self.PROJECT_UUID),
+        self.authorize_with('active')
+        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                         "did not correctly find test fixture project")
 
-    def test_check_error_finding_nonexistent_project(self):
+    def test_check_error_finding_nonexistent_uuid(self):
         BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
+        self.authorize_with('active')
         try:
-            result = arv_put.check_project_exists(BAD_UUID)
+            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
+                                                  0)
         except ValueError as error:
             self.assertIn(BAD_UUID, error.message)
         else:
             self.assertFalse(result, "incorrectly found nonexistent project")
 
+    def test_check_error_finding_nonexistent_project(self):
+        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
+        self.authorize_with('active')
+        with self.assertRaises(apiclient.errors.HttpError):
+            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
+                                                  0)
+
     def test_short_put_from_stdin(self):
         # Have to run this as an integration test since arv-put can't
         # read from the tests' stdin.
@@ -541,24 +706,23 @@ class ArvPutIntegrationTest(unittest.TestCase):
 
         # Before doing anything, demonstrate that the collection
         # we're about to create is not present in our test fixture.
-        api = arvados.api('v1', cache=False)
         manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
         with self.assertRaises(apiclient.errors.HttpError):
-            notfound = api.collections().get(uuid=manifest_uuid).execute()
+            notfound = arv_put.api_client.collections().get(
+                uuid=manifest_uuid).execute()
 
-        datadir = tempfile.mkdtemp()
+        datadir = self.make_tmpdir()
         with open(os.path.join(datadir, "foo"), "w") as f:
             f.write("The quick brown fox jumped over the lazy dog")
         p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                              stdout=subprocess.PIPE, env=self.ENVIRON)
         (arvout, arverr) = p.communicate()
-        self.assertEqual(p.returncode, 0)
         self.assertEqual(arverr, None)
-        self.assertEqual(arvout.strip(), manifest_uuid)
+        self.assertEqual(p.returncode, 0)
 
         # The manifest text stored in the API server under the same
         # manifest UUID must use signed locators.
-        c = api.collections().get(uuid=manifest_uuid).execute()
+        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
         self.assertRegexpMatches(
             c['manifest_text'],
             r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')
@@ -566,33 +730,71 @@ class ArvPutIntegrationTest(unittest.TestCase):
         os.remove(os.path.join(datadir, "foo"))
         os.rmdir(datadir)
 
-    def run_and_find_link(self, text, extra_args=[]):
+    def run_and_find_collection(self, text, extra_args=[]):
         self.authorize_with('active')
         pipe = subprocess.Popen(
-            [sys.executable, arv_put.__file__,
-             '--project-uuid', self.PROJECT_UUID] + extra_args,
+            [sys.executable, arv_put.__file__] + extra_args,
             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
             stderr=subprocess.PIPE, env=self.ENVIRON)
         stdout, stderr = pipe.communicate(text)
-        link_list = arvados.api('v1', cache=False).links().list(
-            filters=[['head_uuid', '=', stdout.strip()],
-                     ['tail_uuid', '=', self.PROJECT_UUID],
-                     ['link_class', '=', 'name']]).execute().get('items', [])
-        self.assertEqual(1, len(link_list))
-        return link_list[0]
+        search_key = ('portable_data_hash'
+                      if '--portable-data-hash' in extra_args else 'uuid')
+        collection_list = arvados.api('v1').collections().list(
+            filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
+        self.assertEqual(1, len(collection_list))
+        return collection_list[0]
+
+    def test_put_collection_with_later_update(self):
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
+        self.assertNotEqual(None, col['uuid'])
+        # Add a new file to the directory
+        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
+            f.write('The quick brown fox jumped over the lazy dog')
+        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
+        self.assertEqual(col['uuid'], updated_col['uuid'])
+        # Fetch the updated manifest and confirm that it includes the new file
+        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
+        self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')
+
+    def test_put_collection_with_high_redundancy(self):
+        # Write empty data: we're not testing CollectionWriter, just
+        # making sure collections.create tells the API server what our
+        # desired replication level is.
+        collection = self.run_and_find_collection("", ['--replication', '4'])
+        self.assertEqual(4, collection['replication_desired'])
+
+    def test_put_collection_with_default_redundancy(self):
+        collection = self.run_and_find_collection("")
+        self.assertEqual(None, collection['replication_desired'])
 
     def test_put_collection_with_unnamed_project_link(self):
-        link = self.run_and_find_link("Test unnamed collection")
+        link = self.run_and_find_collection(
+            "Test unnamed collection",
+            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
         username = pwd.getpwuid(os.getuid()).pw_name
         self.assertRegexpMatches(
             link['name'],
             r'^Saved at .* by {}@'.format(re.escape(username)))
 
+    def test_put_collection_with_name_and_no_project(self):
+        link_name = 'Test Collection Link in home project'
+        collection = self.run_and_find_collection(
+            "Test named collection in home project",
+            ['--portable-data-hash', '--name', link_name])
+        self.assertEqual(link_name, collection['name'])
+        my_user_uuid = self.current_user()['uuid']
+        self.assertEqual(my_user_uuid, collection['owner_uuid'])
+
     def test_put_collection_with_named_project_link(self):
         link_name = 'Test auto Collection Link'
-        link = self.run_and_find_link("Test named collection",
-                                      ['--name', link_name])
-        self.assertEqual(link_name, link['name'])
+        collection = self.run_and_find_collection("Test named collection",
+                                      ['--portable-data-hash',
+                                       '--name', link_name,
+                                       '--project-uuid', self.PROJECT_UUID])
+        self.assertEqual(link_name, collection['name'])
 
 
 if __name__ == '__main__':