-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import absolute_import
+from __future__ import division
+from future import standard_library
+standard_library.install_aliases()
+from builtins import str
+from builtins import range
import apiclient
import mock
import os
import threading
import hashlib
import random
-import multiprocessing
-
-from cStringIO import StringIO
+import uuid
import arvados
import arvados.commands.put as arv_put
-import arvados_testutil as tutil
+from . import arvados_testutil as tutil
-from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
-import run_test_server
+from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
+from . import run_test_server
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
CACHE_ARGSET = [
[],
['/dev/null'],
['/dev/null', '--filename', 'empty'],
- ['/tmp'],
- ['/tmp', '--max-manifest-depth', '0'],
- ['/tmp', '--max-manifest-depth', '1']
+ ['/tmp']
]
def tearDown(self):
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
ArvadosBaseTestCase):
+
def setUp(self):
super(ArvPutUploadJobTest, self).setUp()
run_test_server.authorize_with('active')
_, self.large_file_name = tempfile.mkstemp()
fileobj = open(self.large_file_name, 'w')
# Make sure to write just a little more than one block
- for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
- data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
+ for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
+ data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
fileobj.write(data)
fileobj.close()
+ # Temp dir containing small files to be repacked
+ self.small_files_dir = tempfile.mkdtemp()
+ data = 'y' * 1024 * 1024 # 1 MiB
+ for i in range(1, 70):
+ with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
+ f.write(data + str(i))
self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
+ # Temp dir holding symlinks to the other temp dir and to one of its entries
+ self.tempdir_with_symlink = tempfile.mkdtemp()
+ os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
+ os.symlink(os.path.join(self.tempdir, '1'),
+ os.path.join(self.tempdir_with_symlink, 'linkedfile'))
def tearDown(self):
super(ArvPutUploadJobTest, self).tearDown()
shutil.rmtree(self.tempdir)
os.unlink(self.large_file_name)
+ shutil.rmtree(self.small_files_dir)
+ shutil.rmtree(self.tempdir_with_symlink)
+
+ def test_symlinks_are_followed_by_default(self):
+ cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
+ cwriter.start(save_collection=False)
+ self.assertIn('linkeddir', cwriter.manifest_text())
+ self.assertIn('linkedfile', cwriter.manifest_text())
+ cwriter.destroy_cache()
+
+ def test_symlinks_are_not_followed_when_requested(self):
+ cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
+ follow_links=False)
+ cwriter.start(save_collection=False)
+ self.assertNotIn('linkeddir', cwriter.manifest_text())
+ self.assertNotIn('linkedfile', cwriter.manifest_text())
+ cwriter.destroy_cache()
+
+ def test_passing_nonexistant_path_raise_exception(self):
+ uuid_str = str(uuid.uuid4())
+ cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])
+ with self.assertRaises(arv_put.PathDoesNotExistError):
+ cwriter.start(save_collection=False)
def test_writer_works_without_cache(self):
cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
- cwriter.start()
+ cwriter.start(save_collection=False)
self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
def test_writer_works_with_cache(self):
with tempfile.NamedTemporaryFile() as f:
- f.write('foo')
+ f.write(b'foo')
f.flush()
cwriter = arv_put.ArvPutUploadJob([f.name])
- cwriter.start()
+ cwriter.start(save_collection=False)
+ self.assertEqual(0, cwriter.bytes_skipped)
self.assertEqual(3, cwriter.bytes_written)
# Don't destroy the cache, and start another upload
cwriter_new = arv_put.ArvPutUploadJob([f.name])
- cwriter_new.start()
+ cwriter_new.start(save_collection=False)
cwriter_new.destroy_cache()
- self.assertEqual(0, cwriter_new.bytes_written)
+ self.assertEqual(3, cwriter_new.bytes_skipped)
+ self.assertEqual(3, cwriter_new.bytes_written)
def make_progress_tester(self):
progression = []
def test_progress_reporting(self):
with tempfile.NamedTemporaryFile() as f:
- f.write('foo')
+ f.write(b'foo')
f.flush()
for expect_count in (None, 8):
progression, reporter = self.make_progress_tester()
cwriter = arv_put.ArvPutUploadJob([f.name],
reporter=reporter, bytes_expected=expect_count)
- cwriter.start()
+ cwriter.start(save_collection=False)
cwriter.destroy_cache()
self.assertIn((3, expect_count), progression)
def test_writer_upload_directory(self):
cwriter = arv_put.ArvPutUploadJob([self.tempdir])
- cwriter.start()
+ cwriter.start(save_collection=False)
cwriter.destroy_cache()
self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
data = args[1]
# Exit only on last block
if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ # Simulate a checkpoint before quitting. Ensure block commit.
+ self.writer._update(final=True)
raise SystemExit("Simulated error")
return self.arvfile_write(*args, **kwargs)
mocked_write.side_effect = wrapped_write
writer = arv_put.ArvPutUploadJob([self.large_file_name],
replication_desired=1)
+ # We'll be accessing from inside the wrapper
+ self.writer = writer
with self.assertRaises(SystemExit):
- writer.start()
- self.assertLess(writer.bytes_written,
- os.path.getsize(self.large_file_name))
+ writer.start(save_collection=False)
+ # Confirm that the file was partially uploaded
+ self.assertGreater(writer.bytes_written, 0)
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
# Retry the upload
writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
replication_desired=1)
- writer2.start()
- self.assertEqual(writer.bytes_written + writer2.bytes_written,
+ writer2.start(save_collection=False)
+ self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
os.path.getsize(self.large_file_name))
writer2.destroy_cache()
+ del(self.writer)
+
+ # Test for bug #11002
+ def test_graceful_exit_while_repacking_small_blocks(self):
+ def wrapped_commit(*args, **kwargs):
+ raise SystemExit("Simulated error")
+
+ with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
+ autospec=True) as mocked_commit:
+ mocked_commit.side_effect = wrapped_commit
+ # Upload a little more than 1 block; wrapped_commit will make the first
+ # block commit fail.
+ # arv-put should not fail with an exception from trying to commit the
+ # collection while it's in an inconsistent state.
+ writer = arv_put.ArvPutUploadJob([self.small_files_dir],
+ replication_desired=1)
+ try:
+ with self.assertRaises(SystemExit):
+ writer.start(save_collection=False)
+ except arvados.arvfile.UnownedBlockError:
+ self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
+ writer.destroy_cache()
+ def test_no_resume_when_asked(self):
+ def wrapped_write(*args, **kwargs):
+ data = args[1]
+ # Exit only on last block
+ if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ # Simulate a checkpoint before quitting.
+ self.writer._update()
+ raise SystemExit("Simulated error")
+ return self.arvfile_write(*args, **kwargs)
+
+ with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+ autospec=True) as mocked_write:
+ mocked_write.side_effect = wrapped_write
+ writer = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ # We'll be accessing from inside the wrapper
+ self.writer = writer
+ with self.assertRaises(SystemExit):
+ writer.start(save_collection=False)
+ # Confirm that the file was partially uploaded
+ self.assertGreater(writer.bytes_written, 0)
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
+ # Retry the upload, this time without resume
+ writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ resume=False)
+ writer2.start(save_collection=False)
+ self.assertEqual(writer2.bytes_skipped, 0)
+ self.assertEqual(writer2.bytes_written,
+ os.path.getsize(self.large_file_name))
+ writer2.destroy_cache()
+ del(self.writer)
+
+ def test_no_resume_when_no_cache(self):
+ def wrapped_write(*args, **kwargs):
+ data = args[1]
+ # Exit only on last block
+ if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ # Simulate a checkpoint before quitting.
+ self.writer._update()
+ raise SystemExit("Simulated error")
+ return self.arvfile_write(*args, **kwargs)
+
+ with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+ autospec=True) as mocked_write:
+ mocked_write.side_effect = wrapped_write
+ writer = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ # We'll be accessing from inside the wrapper
+ self.writer = writer
+ with self.assertRaises(SystemExit):
+ writer.start(save_collection=False)
+ # Confirm that the file was partially uploaded
+ self.assertGreater(writer.bytes_written, 0)
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
+ # Retry the upload, this time without cache usage
+ writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ resume=False,
+ use_cache=False)
+ writer2.start(save_collection=False)
+ self.assertEqual(writer2.bytes_skipped, 0)
+ self.assertEqual(writer2.bytes_written,
+ os.path.getsize(self.large_file_name))
+ writer2.destroy_cache()
+ del(self.writer)
+
+ def test_dry_run_feature(self):
+ def wrapped_write(*args, **kwargs):
+ data = args[1]
+ # Exit only on last block
+ if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ # Simulate a checkpoint before quitting.
+ self.writer._update()
+ raise SystemExit("Simulated error")
+ return self.arvfile_write(*args, **kwargs)
+
+ with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+ autospec=True) as mocked_write:
+ mocked_write.side_effect = wrapped_write
+ writer = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ # We'll be accessing from inside the wrapper
+ self.writer = writer
+ with self.assertRaises(SystemExit):
+ writer.start(save_collection=False)
+ # Confirm that the file was partially uploaded
+ self.assertGreater(writer.bytes_written, 0)
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
+ # Retry the upload using dry_run to check if there is a pending upload
+ writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ dry_run=True)
+ with self.assertRaises(arv_put.ArvPutUploadIsPending):
+ writer2.start(save_collection=False)
+ # Complete the pending upload
+ writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ writer3.start(save_collection=False)
+ # Confirm there's no pending upload with dry_run=True
+ writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ dry_run=True)
+ with self.assertRaises(arv_put.ArvPutUploadNotPending):
+ writer4.start(save_collection=False)
+ writer4.destroy_cache()
+ # Test obvious cases
+ with self.assertRaises(arv_put.ArvPutUploadIsPending):
+ arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ dry_run=True,
+ resume=False,
+ use_cache=False)
+ with self.assertRaises(arv_put.ArvPutUploadIsPending):
+ arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ dry_run=True,
+ resume=False)
+ del(self.writer)
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
TEST_SIZE = os.path.getsize(__file__)
def test_known_human_progress(self):
for count, total in [(0, 1), (2, 4), (45, 60)]:
- expect = '{:.1%}'.format(float(count) / total)
+ expect = '{:.1%}'.format(1.0*count/total)
actual = arv_put.human_progress(count, total)
self.assertTrue(actual.startswith('\r'))
self.assertIn(expect, actual)
arv_put.human_progress(count, None)))
-class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
+class ArvadosPutTest(run_test_server.TestCaseWithServers,
+ ArvadosBaseTestCase,
+ tutil.VersionChecker):
MAIN_SERVER = {}
Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
def call_main_with_args(self, args):
- self.main_stdout = StringIO()
- self.main_stderr = StringIO()
+ self.main_stdout = tutil.StringIO()
+ self.main_stderr = tutil.StringIO()
return arv_put.main(args, self.main_stdout, self.main_stderr)
def call_main_on_test_file(self, args=[]):
'098f6bcd4621d373cade4e832627b4f6')),
"did not find file stream in Keep store")
- def run_main_process(self, args):
- _, stdout_path = tempfile.mkstemp()
- _, stderr_path = tempfile.mkstemp()
- def wrap():
- def wrapper(*args, **kwargs):
- sys.stdout = open(stdout_path, 'w')
- sys.stderr = open(stderr_path, 'w')
- arv_put.main(*args, **kwargs)
- return wrapper
- p = multiprocessing.Process(target=wrap(), args=(args, sys.stdout, sys.stderr))
- p.start()
- p.join()
- out = open(stdout_path, 'r').read()
- err = open(stderr_path, 'r').read()
- os.unlink(stdout_path)
- os.unlink(stderr_path)
- return p.exitcode, out, err
-
def setUp(self):
super(ArvadosPutTest, self).setUp()
run_test_server.authorize_with('active')
super(ArvadosPutTest, self).tearDown()
def test_version_argument(self):
- exitcode, out, err = self.run_main_process(['--version'])
- self.assertEqual(0, exitcode)
- self.assertEqual('', out)
- self.assertNotEqual('', err)
- self.assertRegexpMatches(err, "[0-9]+\.[0-9]+\.[0-9]+")
+ with tutil.redirected_streams(
+ stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
+ with self.assertRaises(SystemExit):
+ self.call_main_with_args(['--version'])
+ self.assertVersionOutput(out, err)
def test_simple_file_put(self):
self.call_main_on_test_file()
def test_api_error_handling(self):
coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
coll_save_mock.side_effect = arvados.errors.ApiError(
- fake_httplib2_response(403), '{}')
+ fake_httplib2_response(403), b'{}')
with mock.patch('arvados.collection.Collection.save_new',
new=coll_save_mock):
with self.assertRaises(SystemExit) as exc_test:
result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
0)
except ValueError as error:
- self.assertIn(BAD_UUID, error.message)
+ self.assertIn(BAD_UUID, str(error))
else:
self.assertFalse(result, "incorrectly found nonexistent project")
[sys.executable, arv_put.__file__, '--stream'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, env=self.ENVIRON)
- pipe.stdin.write('stdin test\n')
+ pipe.stdin.write(b'stdin test\n')
pipe.stdin.close()
deadline = time.time() + 5
while (pipe.poll() is None) and (time.time() < deadline):
elif returncode != 0:
sys.stdout.write(pipe.stdout.read())
self.fail("arv-put returned exit code {}".format(returncode))
- self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())
+ self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11',
+ pipe.stdout.read().decode())
def test_ArvPutSignedManifest(self):
# ArvPutSignedManifest runs "arv-put foo" and then attempts to get
datadir = self.make_tmpdir()
with open(os.path.join(datadir, "foo"), "w") as f:
f.write("The quick brown fox jumped over the lazy dog")
- p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
- stdout=subprocess.PIPE, env=self.ENVIRON)
- (arvout, arverr) = p.communicate()
- self.assertEqual(arverr, None)
+ p = subprocess.Popen([sys.executable, arv_put.__file__,
+ os.path.join(datadir, 'foo')],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (out, err) = p.communicate()
+ self.assertRegex(err.decode(), r'INFO: Collection saved as ')
self.assertEqual(p.returncode, 0)
# The manifest text stored in the API server under the same
# manifest UUID must use signed locators.
c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
- self.assertRegexpMatches(
+ self.assertRegex(
c['manifest_text'],
r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')
[sys.executable, arv_put.__file__] + extra_args,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, env=self.ENVIRON)
- stdout, stderr = pipe.communicate(text)
+ stdout, stderr = pipe.communicate(text.encode())
+ self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
search_key = ('portable_data_hash'
if '--portable-data-hash' in extra_args else 'uuid')
collection_list = arvados.api('v1').collections().list(
- filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
+ filters=[[search_key, '=', stdout.decode().strip()]]
+ ).execute().get('items', [])
self.assertEqual(1, len(collection_list))
return collection_list[0]
+ def test_put_collection_with_later_update(self):
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ col = self.run_and_find_collection("", ['--no-progress', tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ # Add a new file to the directory
+ with open(os.path.join(tmpdir, 'file2'), 'w') as f:
+ f.write('The quick brown fox jumped over the lazy dog')
+ updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
+ self.assertEqual(col['uuid'], updated_col['uuid'])
+ # Get the manifest and check that the new file is included
+ c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
+ self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')
+
+ def test_upload_directory_reference_without_trailing_slash(self):
+ tmpdir1 = self.make_tmpdir()
+ tmpdir2 = self.make_tmpdir()
+ with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
+ f.write('This is foo')
+ with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
+ f.write('This is not foo')
+ # Upload one directory and one file
+ col = self.run_and_find_collection("", ['--no-progress',
+ tmpdir1,
+ os.path.join(tmpdir2, 'bar')])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ # Check that 'foo' was written inside a subcollection.
+ # On the other hand, 'bar' should have been uploaded directly to the root collection
+ self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')
+
+ def test_upload_directory_reference_with_trailing_slash(self):
+ tmpdir1 = self.make_tmpdir()
+ tmpdir2 = self.make_tmpdir()
+ with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
+ f.write('This is foo')
+ with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
+ f.write('This is not foo')
+ # Upload one directory (with trailing slash) and one file
+ col = self.run_and_find_collection("", ['--no-progress',
+ tmpdir1 + os.sep,
+ os.path.join(tmpdir2, 'bar')])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ # Check that 'foo' and 'bar' were written at the same level
+ self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')
+
def test_put_collection_with_high_redundancy(self):
# Write empty data: we're not testing CollectionWriter, just
# making sure collections.create tells the API server what our
"Test unnamed collection",
['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
username = pwd.getpwuid(os.getuid()).pw_name
- self.assertRegexpMatches(
+ self.assertRegex(
link['name'],
r'^Saved at .* by {}@'.format(re.escape(username)))