Merge branch '21666-provision-test-improvement'
[arvados.git] / sdk / python / tests / test_arv_put.py
index fac970c95a6fdb13ecbef709ff850798c67840a5..e8a3e65bdcd99a7c0a8a9ac6a19af5ebc2655aae 100644 (file)
@@ -4,20 +4,13 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
-from __future__ import absolute_import
-from __future__ import division
-from future import standard_library
-standard_library.install_aliases()
-from builtins import str
-from builtins import range
-from functools import partial
 import apiclient
 import ciso8601
+import copy
 import datetime
-import hashlib
 import json
 import logging
-import mock
+import multiprocessing
 import os
 import pwd
 import random
@@ -31,7 +24,9 @@ import tempfile
 import time
 import unittest
 import uuid
-import yaml
+
+from functools import partial
+from unittest import mock
 
 import arvados
 import arvados.commands.put as arv_put
@@ -294,6 +289,26 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
         shutil.rmtree(self.small_files_dir)
         shutil.rmtree(self.tempdir_with_symlink)
 
+    def test_non_regular_files_are_ignored_except_symlinks_to_dirs(self):
+        def pfunc(x):
+            with open(x, 'w') as f:
+                f.write('test')
+        fifo_filename = 'fifo-file'
+        fifo_path = os.path.join(self.tempdir_with_symlink, fifo_filename)
+        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
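+        # Create a FIFO and feed it from a separate process; if arv-put
+        # correctly skips non-regular files, the writer never finds a reader
+        # and is terminated below.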
+        os.mkfifo(fifo_path)
+        producer = multiprocessing.Process(target=pfunc, args=(fifo_path,))
+        producer.start()
+        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
+        cwriter.start(save_collection=False)
+        if producer.exitcode is None:
+            # If the producer is still running, kill it. This must be done
+            # before any assertion that may fail, otherwise a failed assertion
+            # would leave the child process behind.
+            producer.terminate()
+            producer.join(1)
+        self.assertIn('linkeddir', cwriter.manifest_text())
+        self.assertNotIn(fifo_filename, cwriter.manifest_text())
+
     def test_symlinks_are_followed_by_default(self):
         self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
         self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
@@ -554,7 +569,7 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
 class CachedManifestValidationTest(ArvadosBaseTestCase):
     class MockedPut(arv_put.ArvPutUploadJob):
         def __init__(self, cached_manifest=None):
-            self._state = arv_put.ArvPutUploadJob.EMPTY_STATE
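+            # Use a deep copy so mutating self._state does not alter the
+            # EMPTY_STATE dict shared through the ArvPutUploadJob class.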
+            self._state = copy.deepcopy(arv_put.ArvPutUploadJob.EMPTY_STATE)
             self._state['manifest'] = cached_manifest
             self._api_client = mock.MagicMock()
             self.logger = mock.MagicMock()
@@ -794,6 +809,7 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers,
 
     def test_put_block_replication(self):
         self.call_main_on_test_file()
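+        # Drop the module-level API client cached by the previous run so the
+        # next invocation builds a fresh one.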
+        arv_put.api_client = None
         with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
             put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
             self.call_main_on_test_file(['--replication', '1'])
@@ -1055,43 +1071,53 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
             r'INFO: Cache expired, starting from scratch.*')
         self.assertEqual(p.returncode, 0)
 
-    def test_invalid_signature_invalidates_cache(self):
-        self.authorize_with('active')
-        tmpdir = self.make_tmpdir()
-        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
-            f.write('foo')
-        # Upload a directory and get the cache file name
-        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
-                             stdout=subprocess.PIPE,
-                             stderr=subprocess.PIPE,
-                             env=self.ENVIRON)
-        (_, err) = p.communicate()
-        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
-        self.assertEqual(p.returncode, 0)
-        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
-                                   err.decode()).groups()[0]
-        self.assertTrue(os.path.isfile(cache_filepath))
-        # Load the cache file contents and modify the manifest to simulate
-        # an invalid access token
-        with open(cache_filepath, 'r') as c:
-            cache = json.load(c)
-        self.assertRegex(cache['manifest'], r'\+A\S+\@')
-        cache['manifest'] = re.sub(
-            r'\+A.*\@',
-            "+Aabcdef0123456789abcdef0123456789abcdef01@",
-            cache['manifest'])
-        with open(cache_filepath, 'w') as c:
-            c.write(json.dumps(cache))
-        # Re-run the upload and expect to get an invalid cache message
-        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
-                             stdout=subprocess.PIPE,
-                             stderr=subprocess.PIPE,
-                             env=self.ENVIRON)
-        (_, err) = p.communicate()
-        self.assertRegex(
-            err.decode(),
-            r'ERROR: arv-put: Resume cache contains invalid signature.*')
-        self.assertEqual(p.returncode, 1)
+    def test_invalid_signature_in_cache(self):
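+        # Exercise both modes: without --batch, an invalid signature in the
+        # resume cache must make arv-put fail; with --batch, it should only
+        # warn and exit successfully.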
+        for batch_mode in [False, True]:
+            self.authorize_with('active')
+            tmpdir = self.make_tmpdir()
+            with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
+                f.write('foo')
+            # Upload a directory and get the cache file name
+            arv_put_args = [tmpdir]
+            if batch_mode:
+                arv_put_args = ['--batch'] + arv_put_args
+            p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
+                                 stdout=subprocess.PIPE,
+                                 stderr=subprocess.PIPE,
+                                 env=self.ENVIRON)
+            (_, err) = p.communicate()
+            self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+            self.assertEqual(p.returncode, 0)
+            cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+                                       err.decode()).groups()[0]
+            self.assertTrue(os.path.isfile(cache_filepath))
+            # Load the cache file contents and modify the manifest to simulate
+            # an invalid access token
+            with open(cache_filepath, 'r') as c:
+                cache = json.load(c)
+            self.assertRegex(cache['manifest'], r'\+A\S+\@')
+            cache['manifest'] = re.sub(
+                r'\+A.*\@',
+                "+Aabcdef0123456789abcdef0123456789abcdef01@",
+                cache['manifest'])
+            with open(cache_filepath, 'w') as c:
+                c.write(json.dumps(cache))
+            # Re-run the upload and expect to get an invalid cache message
+            p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
+                                 stdout=subprocess.PIPE,
+                                 stderr=subprocess.PIPE,
+                                 env=self.ENVIRON)
+            (_, err) = p.communicate()
+            if not batch_mode:
+                self.assertRegex(
+                    err.decode(),
+                    r'ERROR: arv-put: Resume cache contains invalid signature.*')
+                self.assertEqual(p.returncode, 1)
+            else:
+                self.assertRegex(
+                    err.decode(),
+                    r'Invalid signatures on cache file \'.*\' while being run in \'batch mode\' -- continuing anyways.*')
+                self.assertEqual(p.returncode, 0)
 
     def test_single_expired_signature_reuploads_file(self):
         self.authorize_with('active')