15003: Remove NodeProfiles section from integration test config.
[arvados.git] / sdk / python / tests / test_arv_put.py
index 4b1f69477e5823502b2a5396a586db82a56e6ff7..540e06c6c6a0d571e7a269e5eae7c9e8a1989419 100644 (file)
@@ -1,3 +1,5 @@
+# -*- coding: utf-8 -*-
+
 # Copyright (C) The Arvados Authors. All rights reserved.
 #
 # SPDX-License-Identifier: Apache-2.0
@@ -8,6 +10,7 @@ from future import standard_library
 standard_library.install_aliases()
 from builtins import str
 from builtins import range
+from functools import partial
 import apiclient
 import datetime
 import hashlib
@@ -528,6 +531,85 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                                     resume=False)
         del(self.writer)
 
+class CachedManifestValidationTest(ArvadosBaseTestCase):
+    class MockedPut(arv_put.ArvPutUploadJob):
+        def __init__(self, cached_manifest=None):
+            self._state = arv_put.ArvPutUploadJob.EMPTY_STATE
+            self._state['manifest'] = cached_manifest
+            self._api_client = mock.MagicMock()
+            self.logger = mock.MagicMock()
+            self.num_retries = 1
+
+    def datetime_to_hex(self, dt):
+        return hex(int(time.mktime(dt.timetuple())))[2:]
+
+    def setUp(self):
+        super(CachedManifestValidationTest, self).setUp()
+        self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
+        self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
+        self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"
+
+    def test_empty_cached_manifest_is_valid(self):
+        put_mock = self.MockedPut()
+        self.assertEqual(None, put_mock._state.get('manifest'))
+        self.assertTrue(put_mock._cached_manifest_valid())
+        put_mock._state['manifest'] = ''
+        self.assertTrue(put_mock._cached_manifest_valid())
+
+    def test_signature_cases(self):
+        now = datetime.datetime.utcnow()
+        yesterday = now - datetime.timedelta(days=1)
+        lastweek = now - datetime.timedelta(days=7)
+        tomorrow = now + datetime.timedelta(days=1)
+        nextweek = now + datetime.timedelta(days=7)
+
+        def mocked_head(blocks={}, loc=None):
+            blk = loc.split('+', 1)[0]
+            if blocks.get(blk):
+                return True
+            raise arvados.errors.KeepRequestError("mocked error - block invalid")
+
+        # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
+        cases = [
+            # All expired, reset cache - OK
+            (yesterday, lastweek, False, False, True),
+            (lastweek, yesterday, False, False, True),
+            # All non-expired valid blocks - OK
+            (tomorrow, nextweek, True, True, True),
+            (nextweek, tomorrow, True, True, True),
+            # All non-expired invalid blocks - Not OK
+            (tomorrow, nextweek, False, False, False),
+            (nextweek, tomorrow, False, False, False),
+            # One non-expired valid block - OK
+            (tomorrow, yesterday, True, False, True),
+            (yesterday, tomorrow, False, True, True),
+            # One non-expired invalid block - Not OK
+            (tomorrow, yesterday, False, False, False),
+            (yesterday, tomorrow, False, False, False),
+        ]
+        for case in cases:
+            b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
+            head_responses = {
+                self.block1: b1_valid,
+                self.block2: b2_valid,
+            }
+            cached_manifest = self.template % (
+                self.datetime_to_hex(b1_expiration),
+                self.datetime_to_hex(b2_expiration),
+            )
+            arvput = self.MockedPut(cached_manifest)
+            with mock.patch('arvados.collection.KeepClient.head') as head_mock:
+                head_mock.side_effect = partial(mocked_head, head_responses)
+                self.assertEqual(outcome, arvput._cached_manifest_valid(),
+                    "Case '%s' should have produced outcome '%s'" % (case, outcome)
+                )
+                if b1_expiration > now or b2_expiration > now:
+                    # A HEAD request should have been done
+                    head_mock.assert_called_once()
+                else:
+                    head_mock.assert_not_called()
+
+
 class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
     TEST_SIZE = os.path.getsize(__file__)
 
@@ -549,7 +631,7 @@ class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
                          writer.bytes_expected)
 
     def test_expected_bytes_for_device(self):
-        writer = arv_put.ArvPutUploadJob(['/dev/null'])
+        writer = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False)
         self.assertIsNone(writer.bytes_expected)
         writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
         self.assertIsNone(writer.bytes_expected)
@@ -730,6 +812,11 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers,
                           self.call_main_with_args,
                           ['--project-uuid', self.Z_UUID, '--stream'])
 
+    def test_error_when_multiple_storage_classes_specified(self):
+        self.assertRaises(SystemExit,
+                          self.call_main_with_args,
+                          ['--storage-classes', 'hot,cold'])
+
     def test_error_when_excluding_absolute_path(self):
         tmpdir = self.make_tmpdir()
         self.assertRaises(SystemExit,
@@ -772,7 +859,7 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
             if not mandatory and not os.path.exists(path):
                 continue
             with open(path) as f:
-                rails_config = yaml.load(f.read())
+                rails_config = yaml.safe_load(f.read())
                 for config_section in ['test', 'common']:
                     try:
                         key = rails_config[config_section]["blob_signing_key"]
@@ -933,7 +1020,7 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         self.assertEqual(1, len(collection_list))
         return collection_list[0]
 
-    def test_expired_token_invalidates_cache(self):
+    def test_all_expired_signatures_invalidates_cache(self):
         self.authorize_with('active')
         tmpdir = self.make_tmpdir()
         with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
@@ -969,7 +1056,89 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         (out, err) = p.communicate()
         self.assertRegex(
             err.decode(),
-            r'WARNING: Uploaded file .* access token expired, will re-upload it from scratch')
+            r'INFO: Cache expired, starting from scratch.*')
+        self.assertEqual(p.returncode, 0)
+
+    def test_invalid_signature_invalidates_cache(self):
+        self.authorize_with('active')
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
+            f.write('foo')
+        # Upload a directory and get the cache file name
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+        self.assertEqual(p.returncode, 0)
+        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+                                   err.decode()).groups()[0]
+        self.assertTrue(os.path.isfile(cache_filepath))
+        # Load the cache file contents and modify the manifest to simulate
+        # an invalid access token
+        with open(cache_filepath, 'r') as c:
+            cache = json.load(c)
+        self.assertRegex(cache['manifest'], r'\+A\S+\@')
+        cache['manifest'] = re.sub(
+            r'\+A.*\@',
+            "+Aabcdef0123456789abcdef0123456789abcdef01@",
+            cache['manifest'])
+        with open(cache_filepath, 'w') as c:
+            c.write(json.dumps(cache))
+        # Re-run the upload and expect to get an invalid cache message
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(
+            err.decode(),
+            r'ERROR: arv-put: Resume cache contains invalid signature.*')
+        self.assertEqual(p.returncode, 1)
+
+    def test_single_expired_signature_reuploads_file(self):
+        self.authorize_with('active')
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
+            f.write('foo')
+        # Write a second file on its own subdir to force a new stream
+        os.mkdir(os.path.join(tmpdir, 'bar'))
+        with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
+            f.write('bar')
+        # Upload a directory and get the cache file name
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+        self.assertEqual(p.returncode, 0)
+        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+                                   err.decode()).groups()[0]
+        self.assertTrue(os.path.isfile(cache_filepath))
+        # Load the cache file contents and modify the manifest to simulate
+        # an expired access token
+        with open(cache_filepath, 'r') as c:
+            cache = json.load(c)
+        self.assertRegex(cache['manifest'], r'\+A\S+\@')
+        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
+        # Make one of the signatures appear to have expired
+        cache['manifest'] = re.sub(
+            r'\@.*? 3:3:barfile.txt',
+            "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
+            cache['manifest'])
+        with open(cache_filepath, 'w') as c:
+            c.write(json.dumps(cache))
+        # Re-run the upload and expect a warning that the expired file gets re-uploaded
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(
+            err.decode(),
+            r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
         self.assertEqual(p.returncode, 0)
         # Confirm that the resulting cache is different from the last run.
         with open(cache_filepath, 'r') as c2:
@@ -1061,6 +1230,18 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                                        '--project-uuid', self.PROJECT_UUID])
         self.assertEqual(link_name, collection['name'])
 
+    def test_put_collection_with_storage_classes_specified(self):
+        collection = self.run_and_find_collection("", ['--storage-classes', 'hot'])
+
+        self.assertEqual(len(collection['storage_classes_desired']), 1)
+        self.assertEqual(collection['storage_classes_desired'][0], 'hot')
+
+    def test_put_collection_without_storage_classes_specified(self):
+        collection = self.run_and_find_collection("")
+
+        self.assertEqual(len(collection['storage_classes_desired']), 1)
+        self.assertEqual(collection['storage_classes_desired'][0], 'default')
+
     def test_exclude_filename_pattern(self):
         tmpdir = self.make_tmpdir()
         tmpsubdir = os.path.join(tmpdir, 'subdir')
@@ -1105,6 +1286,16 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                          r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
         self.assertRegex(c['manifest_text'], r'^.*:file3.txt')
 
+    def test_unicode_on_filename(self):
+        tmpdir = self.make_tmpdir()
+        fname = u"i❤arvados.txt"
+        with open(os.path.join(tmpdir, fname), 'w') as f:
+            f.write("This is a unicode named file")
+        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
+        self.assertNotEqual(None, col['uuid'])
+        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+        self.assertTrue(fname in c['manifest_text'], u"{} does not include {}".format(c['manifest_text'], fname))
+
     def test_silent_mode_no_errors(self):
         self.authorize_with('active')
         tmpdir = self.make_tmpdir()