+# -*- coding: utf-8 -*-
+
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
from __future__ import absolute_import
from __future__ import division
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import range
+from functools import partial
import apiclient
+import ciso8601
+import datetime
+import hashlib
+import json
+import logging
import mock
import os
import pwd
+import random
import re
+import select
import shutil
+import signal
import subprocess
import sys
import tempfile
import time
import unittest
+import uuid
import yaml
-import threading
-import hashlib
-import random
import arvados
import arvados.commands.put as arv_put
self.last_cache.close()
resume_cache = arv_put.ResumeCache(self.last_cache.filename)
self.assertNotEqual(None, resume_cache)
- self.assertRaises(None, resume_cache.check_cache())
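+ # Should not raise: the reopened cache is valid.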
+ resume_cache.check_cache()
def test_basic_cache_storage(self):
thing = ['test', 'list']
def test_cache_is_locked(self):
with tempfile.NamedTemporaryFile() as cachefile:
- cache = arv_put.ResumeCache(cachefile.name)
+ _ = arv_put.ResumeCache(cachefile.name)
self.assertRaises(arv_put.ResumeCacheConflict,
arv_put.ResumeCache, cachefile.name)
with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
f.write(data + str(i))
self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
+ # Temp dir holding symlinks to the other temp dir and to one of its files
+ self.tempdir_with_symlink = tempfile.mkdtemp()
+ os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
+ os.symlink(os.path.join(self.tempdir, '1'),
+ os.path.join(self.tempdir_with_symlink, 'linkedfile'))
def tearDown(self):
super(ArvPutUploadJobTest, self).tearDown()
shutil.rmtree(self.tempdir)
os.unlink(self.large_file_name)
shutil.rmtree(self.small_files_dir)
+ shutil.rmtree(self.tempdir_with_symlink)
+
+ def test_symlinks_are_followed_by_default(self):
+ cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
+ cwriter.start(save_collection=False)
+ self.assertIn('linkeddir', cwriter.manifest_text())
+ self.assertIn('linkedfile', cwriter.manifest_text())
+ cwriter.destroy_cache()
+
+ def test_symlinks_are_not_followed_when_requested(self):
+ cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
+ follow_links=False)
+ cwriter.start(save_collection=False)
+ self.assertNotIn('linkeddir', cwriter.manifest_text())
+ self.assertNotIn('linkedfile', cwriter.manifest_text())
+ cwriter.destroy_cache()
+
+ def test_passing_nonexistent_path_raises_exception(self):
+ uuid_str = str(uuid.uuid4())
+ with self.assertRaises(arv_put.PathDoesNotExistError):
+ arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])
def test_writer_works_without_cache(self):
cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
f.flush()
cwriter = arv_put.ArvPutUploadJob([f.name])
cwriter.start(save_collection=False)
- self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
+ self.assertEqual(0, cwriter.bytes_skipped)
+ self.assertEqual(3, cwriter.bytes_written)
# Don't destroy the cache, and start another upload
cwriter_new = arv_put.ArvPutUploadJob([f.name])
cwriter_new.start(save_collection=False)
cwriter_new.destroy_cache()
- self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)
+ self.assertEqual(3, cwriter_new.bytes_skipped)
+ self.assertEqual(3, cwriter_new.bytes_written)
def make_progress_tester(self):
progression = []
for expect_count in (None, 8):
progression, reporter = self.make_progress_tester()
cwriter = arv_put.ArvPutUploadJob([f.name],
- reporter=reporter, bytes_expected=expect_count)
+ reporter=reporter)
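+ # Override the computed upload size so progress is reported against expect_count.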
+ cwriter.bytes_expected = expect_count
cwriter.start(save_collection=False)
cwriter.destroy_cache()
self.assertIn((3, expect_count), progression)
self.assertGreater(writer.bytes_written, 0)
self.assertLess(writer.bytes_written,
os.path.getsize(self.large_file_name))
- # Retry the upload using dry_run to check if there is a pending upload
- writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
- replication_desired=1,
- dry_run=True)
with self.assertRaises(arv_put.ArvPutUploadIsPending):
- writer2.start(save_collection=False)
+ # Retry the upload using dry_run to check if there is a pending upload
+ writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ dry_run=True)
# Complete the pending upload
writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
replication_desired=1)
writer3.start(save_collection=False)
- # Confirm there's no pending upload with dry_run=True
- writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
- replication_desired=1,
- dry_run=True)
with self.assertRaises(arv_put.ArvPutUploadNotPending):
- writer4.start(save_collection=False)
- writer4.destroy_cache()
+ # Confirm there's no pending upload with dry_run=True
+ writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ dry_run=True)
# Test obvious cases
with self.assertRaises(arv_put.ArvPutUploadIsPending):
arv_put.ArvPutUploadJob([self.large_file_name],
resume=False)
del(self.writer)
+class CachedManifestValidationTest(ArvadosBaseTestCase):
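+ # Minimal ArvPutUploadJob stand-in: just enough state to drive _cached_manifest_valid().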
+ class MockedPut(arv_put.ArvPutUploadJob):
+ def __init__(self, cached_manifest=None):
+ self._state = arv_put.ArvPutUploadJob.EMPTY_STATE
+ self._state['manifest'] = cached_manifest
+ self._api_client = mock.MagicMock()
+ self.logger = mock.MagicMock()
+ self.num_retries = 1
+
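+ # Render a datetime as the hex Unix-timestamp token that follows '@' in signed manifests.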
+ def datetime_to_hex(self, dt):
+ return hex(int(time.mktime(dt.timetuple())))[2:]
+
+ def setUp(self):
+ super(CachedManifestValidationTest, self).setUp()
+ self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
+ self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
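+ # Signed manifest with two 3-byte blocks; the '%s' placeholders take each signature's hex expiry.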
+ self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"
+
+ def test_empty_cached_manifest_is_valid(self):
+ put_mock = self.MockedPut()
+ self.assertEqual(None, put_mock._state.get('manifest'))
+ self.assertTrue(put_mock._cached_manifest_valid())
+ put_mock._state['manifest'] = ''
+ self.assertTrue(put_mock._cached_manifest_valid())
+
+ def test_signature_cases(self):
+ now = datetime.datetime.utcnow()
+ yesterday = now - datetime.timedelta(days=1)
+ lastweek = now - datetime.timedelta(days=7)
+ tomorrow = now + datetime.timedelta(days=1)
+ nextweek = now + datetime.timedelta(days=7)
+
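+ # Stand-in for KeepClient.head(): return True for blocks marked valid,
+ # raise KeepRequestError (a failed HEAD) for the rest.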
+ def mocked_head(blocks={}, loc=None):
+ blk = loc.split('+', 1)[0]
+ if blocks.get(blk):
+ return True
+ raise arvados.errors.KeepRequestError("mocked error - block invalid")
+
+ # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
+ cases = [
+ # All expired, reset cache - OK
+ (yesterday, lastweek, False, False, True),
+ (lastweek, yesterday, False, False, True),
+ # All non-expired valid blocks - OK
+ (tomorrow, nextweek, True, True, True),
+ (nextweek, tomorrow, True, True, True),
+ # All non-expired invalid blocks - Not OK
+ (tomorrow, nextweek, False, False, False),
+ (nextweek, tomorrow, False, False, False),
+ # One non-expired valid block - OK
+ (tomorrow, yesterday, True, False, True),
+ (yesterday, tomorrow, False, True, True),
+ # One non-expired invalid block - Not OK
+ (tomorrow, yesterday, False, False, False),
+ (yesterday, tomorrow, False, False, False),
+ ]
+ for case in cases:
+ b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
+ head_responses = {
+ self.block1: b1_valid,
+ self.block2: b2_valid,
+ }
+ cached_manifest = self.template % (
+ self.datetime_to_hex(b1_expiration),
+ self.datetime_to_hex(b2_expiration),
+ )
+ arvput = self.MockedPut(cached_manifest)
+ with mock.patch('arvados.collection.KeepClient.head') as head_mock:
+ head_mock.side_effect = partial(mocked_head, head_responses)
+ self.assertEqual(outcome, arvput._cached_manifest_valid(),
+ "Case '%s' should have produced outcome '%s'" % (case, outcome)
+ )
+ if b1_expiration > now or b2_expiration > now:
+ # A HEAD request should have been done
+ head_mock.assert_called_once()
+ else:
+ head_mock.assert_not_called()
+
+
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
TEST_SIZE = os.path.getsize(__file__)
def test_expected_bytes_for_file(self):
+ writer = arv_put.ArvPutUploadJob([__file__])
self.assertEqual(self.TEST_SIZE,
- arv_put.expected_bytes_for([__file__]))
+ writer.bytes_expected)
def test_expected_bytes_for_tree(self):
tree = self.make_tmpdir()
shutil.copyfile(__file__, os.path.join(tree, 'one'))
shutil.copyfile(__file__, os.path.join(tree, 'two'))
+
+ writer = arv_put.ArvPutUploadJob([tree])
self.assertEqual(self.TEST_SIZE * 2,
- arv_put.expected_bytes_for([tree]))
+ writer.bytes_expected)
+ writer = arv_put.ArvPutUploadJob([tree, __file__])
self.assertEqual(self.TEST_SIZE * 3,
- arv_put.expected_bytes_for([tree, __file__]))
+ writer.bytes_expected)
def test_expected_bytes_for_device(self):
- self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
- self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))
+ writer = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False)
+ self.assertIsNone(writer.bytes_expected)
+ writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
+ self.assertIsNone(writer.bytes_expected)
class ArvadosPutReportTest(ArvadosBaseTestCase):
arv_put.human_progress(count, None)))
+class ArvPutLogFormatterTest(ArvadosBaseTestCase):
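+ # Pattern of the X-Request-Id suffix appended by ArvPutLogFormatter.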
+ matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)'
+
+ def setUp(self):
+ super(ArvPutLogFormatterTest, self).setUp()
+ self.stderr = tutil.StringIO()
+ self.loggingHandler = logging.StreamHandler(self.stderr)
+ self.loggingHandler.setFormatter(
+ arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
+ self.logger = logging.getLogger()
+ self.logger.addHandler(self.loggingHandler)
+ self.logger.setLevel(logging.DEBUG)
+
+ def tearDown(self):
+ self.logger.removeHandler(self.loggingHandler)
+ self.stderr.close()
+ self.stderr = None
+ super(ArvPutLogFormatterTest, self).tearDown()
+
+ def test_request_id_logged_only_once_on_error(self):
+ self.logger.error('Ooops, something bad happened.')
+ self.logger.error('Another bad thing just happened.')
+ log_lines = self.stderr.getvalue().split('\n')[:-1]
+ self.assertEqual(2, len(log_lines))
+ self.assertRegex(log_lines[0], self.matcher)
+ self.assertNotRegex(log_lines[1], self.matcher)
+
+ def test_request_id_logged_only_once_on_debug(self):
+ self.logger.debug('This is just a debug message.')
+ self.logger.debug('Another message, move along.')
+ log_lines = self.stderr.getvalue().split('\n')[:-1]
+ self.assertEqual(2, len(log_lines))
+ self.assertRegex(log_lines[0], self.matcher)
+ self.assertNotRegex(log_lines[1], self.matcher)
+
+ def test_request_id_not_logged_on_info(self):
+ self.logger.info('This should be a useful message')
+ log_lines = self.stderr.getvalue().split('\n')[:-1]
+ self.assertEqual(1, len(log_lines))
+ self.assertNotRegex(log_lines[0], self.matcher)
+
class ArvadosPutTest(run_test_server.TestCaseWithServers,
ArvadosBaseTestCase,
tutil.VersionChecker):
Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
def call_main_with_args(self, args):
- self.main_stdout = tutil.StringIO()
- self.main_stderr = tutil.StringIO()
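+ # Reuse the buffers created in setUp(), resetting them so each call
+ # captures only its own output.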
+ self.main_stdout.seek(0, 0)
+ self.main_stdout.truncate(0)
+ self.main_stderr.seek(0, 0)
+ self.main_stderr.truncate(0)
return arv_put.main(args, self.main_stdout, self.main_stderr)
def call_main_on_test_file(self, args=[]):
super(ArvadosPutTest, self).setUp()
run_test_server.authorize_with('active')
arv_put.api_client = None
+ self.main_stdout = tutil.StringIO()
+ self.main_stderr = tutil.StringIO()
+ self.loggingHandler = logging.StreamHandler(self.main_stderr)
+ self.loggingHandler.setFormatter(
+ arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
+ logging.getLogger().addHandler(self.loggingHandler)
def tearDown(self):
+ logging.getLogger().removeHandler(self.loggingHandler)
for outbuf in ['main_stdout', 'main_stderr']:
if hasattr(self, outbuf):
getattr(self, outbuf).close()
self.call_main_with_args,
['--project-uuid', self.Z_UUID, '--stream'])
+ def test_error_when_multiple_storage_classes_specified(self):
+ self.assertRaises(SystemExit,
+ self.call_main_with_args,
+ ['--storage-classes', 'hot,cold'])
+
+ def test_error_when_excluding_absolute_path(self):
+ tmpdir = self.make_tmpdir()
+ self.assertRaises(SystemExit,
+ self.call_main_with_args,
+ ['--exclude', '/some/absolute/path/*',
+ tmpdir])
+
def test_api_error_handling(self):
coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
coll_save_mock.side_effect = arvados.errors.ApiError(
self.assertLess(0, coll_save_mock.call_count)
self.assertEqual("", self.main_stdout.getvalue())
+ def test_request_id_logging_on_error(self):
+ matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)\n'
+ coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
+ coll_save_mock.side_effect = arvados.errors.ApiError(
+ fake_httplib2_response(403), b'{}')
+ with mock.patch('arvados.collection.Collection.save_new',
+ new=coll_save_mock):
+ with self.assertRaises(SystemExit):
+ self.call_main_with_args(['/dev/null'])
+ self.assertRegex(
+ self.main_stderr.getvalue(), matcher)
+
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
ArvadosBaseTestCase):
- def _getKeepServerConfig():
- for config_file, mandatory in [
- ['application.yml', False], ['application.default.yml', True]]:
- path = os.path.join(run_test_server.SERVICES_SRC_DIR,
- "api", "config", config_file)
- if not mandatory and not os.path.exists(path):
- continue
- with open(path) as f:
- rails_config = yaml.load(f.read())
- for config_section in ['test', 'common']:
- try:
- key = rails_config[config_section]["blob_signing_key"]
- except (KeyError, TypeError):
- pass
- else:
- return {'blob_signing_key': key,
- 'enforce_permissions': True}
- return {'blog_signing_key': None, 'enforce_permissions': False}
-
MAIN_SERVER = {}
- KEEP_SERVER = _getKeepServerConfig()
+ KEEP_SERVER = {'blob_signing': True}
PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']
@classmethod
cls.ENVIRON = os.environ.copy()
cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)
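+ # Render a datetime as the hex Unix-timestamp token used in signed manifests.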
+ def datetime_to_hex(self, dt):
+ return hex(int(time.mktime(dt.timetuple())))[2:]
+
def setUp(self):
super(ArvPutIntegrationTest, self).setUp()
arv_put.api_client = None
BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
self.authorize_with('active')
with self.assertRaises(apiclient.errors.HttpError):
- result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
- 0)
+ arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID, 0)
def test_short_put_from_stdin(self):
self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11',
pipe.stdout.read().decode())
+ def test_sigint_logs_request_id(self):
+ # Start arv-put, give it a chance to start up, send SIGINT,
+ # and check that its output includes the X-Request-Id.
+ input_stream = subprocess.Popen(
+ ['sleep', '10'],
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
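+ # 'sleep 10' keeps arv-put's stdin open so it is still running when signaled.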
+ pipe = subprocess.Popen(
+ [sys.executable, arv_put.__file__, '--stream'],
+ stdin=input_stream.stdout, stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT, env=self.ENVIRON)
+ # Wait for arv-put child process to print something (i.e., a
+ # log message) so we know its signal handler is installed.
+ select.select([pipe.stdout], [], [], 10)
+ pipe.send_signal(signal.SIGINT)
+ deadline = time.time() + 5
+ while (pipe.poll() is None) and (time.time() < deadline):
+ time.sleep(.1)
+ returncode = pipe.poll()
+ input_stream.terminate()
+ if returncode is None:
+ pipe.terminate()
+ self.fail("arv-put did not exit within 5 seconds")
+ self.assertRegex(pipe.stdout.read().decode(), r'\(X-Request-Id: req-[a-z0-9]{20}\)')
+
def test_ArvPutSignedManifest(self):
# ArvPutSignedManifest runs "arv-put foo" and then attempts to get
# the newly created manifest from the API server, testing to confirm
# we're about to create is not present in our test fixture.
manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
with self.assertRaises(apiclient.errors.HttpError):
- notfound = arv_put.api_client.collections().get(
+ arv_put.api_client.collections().get(
uuid=manifest_uuid).execute()
datadir = self.make_tmpdir()
with open(os.path.join(datadir, "foo"), "w") as f:
f.write("The quick brown fox jumped over the lazy dog")
- p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
+ p = subprocess.Popen([sys.executable, arv_put.__file__,
+ os.path.join(datadir, 'foo')],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=self.ENVIRON)
- (out, err) = p.communicate()
+ (_, err) = p.communicate()
self.assertRegex(err.decode(), r'INFO: Collection saved as ')
self.assertEqual(p.returncode, 0)
self.assertEqual(1, len(collection_list))
return collection_list[0]
+ def test_all_expired_signatures_invalidates_cache(self):
+ self.authorize_with('active')
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
+ f.write('foo')
+ # Upload a directory and get the cache file name
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (_, err) = p.communicate()
+ self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+ self.assertEqual(p.returncode, 0)
+ cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+ err.decode()).groups()[0]
+ self.assertTrue(os.path.isfile(cache_filepath))
+ # Load the cache file contents and modify the manifest to simulate
+ # an expired access token
+ with open(cache_filepath, 'r') as c:
+ cache = json.load(c)
+ self.assertRegex(cache['manifest'], r'\+A\S+\@')
+ a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
+ cache['manifest'] = re.sub(
+ r'\@.*? ',
+ "@{} ".format(self.datetime_to_hex(a_month_ago)),
+ cache['manifest'])
+ with open(cache_filepath, 'w') as c:
+ c.write(json.dumps(cache))
+ # Re-run the upload and expect the cache-expired message
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (_, err) = p.communicate()
+ self.assertRegex(
+ err.decode(),
+ r'INFO: Cache expired, starting from scratch.*')
+ self.assertEqual(p.returncode, 0)
+
+ def test_invalid_signature_invalidates_cache(self):
+ self.authorize_with('active')
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
+ f.write('foo')
+ # Upload a directory and get the cache file name
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (_, err) = p.communicate()
+ self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+ self.assertEqual(p.returncode, 0)
+ cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+ err.decode()).groups()[0]
+ self.assertTrue(os.path.isfile(cache_filepath))
+ # Load the cache file contents and modify the manifest to simulate
+ # an invalid access token
+ with open(cache_filepath, 'r') as c:
+ cache = json.load(c)
+ self.assertRegex(cache['manifest'], r'\+A\S+\@')
+ cache['manifest'] = re.sub(
+ r'\+A.*\@',
+ "+Aabcdef0123456789abcdef0123456789abcdef01@",
+ cache['manifest'])
+ with open(cache_filepath, 'w') as c:
+ c.write(json.dumps(cache))
+ # Re-run the upload and expect to get an invalid cache message
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (_, err) = p.communicate()
+ self.assertRegex(
+ err.decode(),
+ r'ERROR: arv-put: Resume cache contains invalid signature.*')
+ self.assertEqual(p.returncode, 1)
+
+ def test_single_expired_signature_reuploads_file(self):
+ self.authorize_with('active')
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
+ f.write('foo')
+ # Write a second file in its own subdir to force a new stream
+ os.mkdir(os.path.join(tmpdir, 'bar'))
+ with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
+ f.write('bar')
+ # Upload a directory and get the cache file name
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (_, err) = p.communicate()
+ self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+ self.assertEqual(p.returncode, 0)
+ cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+ err.decode()).groups()[0]
+ self.assertTrue(os.path.isfile(cache_filepath))
+ # Load the cache file contents and modify the manifest to simulate
+ # an expired access token
+ with open(cache_filepath, 'r') as c:
+ cache = json.load(c)
+ self.assertRegex(cache['manifest'], r'\+A\S+\@')
+ a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
+ # Make one of the signatures appear to have expired
+ cache['manifest'] = re.sub(
+ r'\@.*? 3:3:barfile.txt',
+ "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
+ cache['manifest'])
+ with open(cache_filepath, 'w') as c:
+ c.write(json.dumps(cache))
+ # Re-run the upload and expect a warning about re-uploading the expired file
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (_, err) = p.communicate()
+ self.assertRegex(
+ err.decode(),
+ r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
+ self.assertEqual(p.returncode, 0)
+ # Confirm that the resulting cache is different from the last run.
+ with open(cache_filepath, 'r') as c2:
+ new_cache = json.load(c2)
+ self.assertNotEqual(cache['manifest'], new_cache['manifest'])
+
def test_put_collection_with_later_update(self):
tmpdir = self.make_tmpdir()
with open(os.path.join(tmpdir, 'file1'), 'w') as f:
self.assertEqual(col['uuid'], updated_col['uuid'])
# Get the manifest and check that the new file is being included
c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
- self.assertRegex(c['manifest_text'], r'^\. .*:44:file2\n')
+ self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')
+
+ def test_put_collection_with_utc_expiring_datetime(self):
+ tmpdir = self.make_tmpdir()
+ trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%MZ')
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ col = self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-at', trash_at, tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ self.assertEqual(ciso8601.parse_datetime(trash_at),
+ ciso8601.parse_datetime(c['trash_at']))
+
+ def test_put_collection_with_timezone_aware_expiring_datetime(self):
+ tmpdir = self.make_tmpdir()
+ trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M-0300')
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ col = self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-at', trash_at, tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
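+ # The timestamp was given at UTC-3, so the stored (UTC) trash_at is 3 hours later.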
+ self.assertEqual(
+ ciso8601.parse_datetime(trash_at).replace(tzinfo=None) + datetime.timedelta(hours=3),
+ ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
+
+ def test_put_collection_with_timezone_naive_expiring_datetime(self):
+ tmpdir = self.make_tmpdir()
+ trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M')
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ col = self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-at', trash_at, tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
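+ # A timezone-naive --trash-at is interpreted as local time; compute this
+ # host's UTC offset to compare against the stored UTC value.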
+ if time.daylight:
+ offset = datetime.timedelta(seconds=time.altzone)
+ else:
+ offset = datetime.timedelta(seconds=time.timezone)
+ self.assertEqual(
+ ciso8601.parse_datetime(trash_at) + offset,
+ ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
+
+ def test_put_collection_with_expiring_date_only(self):
+ tmpdir = self.make_tmpdir()
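+ # A date-only --trash-at is interpreted as the end of that day, local time.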
+ trash_at = '2140-01-01'
+ end_of_day = datetime.timedelta(hours=23, minutes=59, seconds=59)
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ col = self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-at', trash_at, tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
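+ # Same local-offset computation as above: date-only values are local time.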
+ if time.daylight:
+ offset = datetime.timedelta(seconds=time.altzone)
+ else:
+ offset = datetime.timedelta(seconds=time.timezone)
+ self.assertEqual(
+ ciso8601.parse_datetime(trash_at) + end_of_day + offset,
+ ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
+
+ def test_put_collection_with_invalid_absolute_expiring_datetimes(self):
+ cases = ['2100', '210010', '2100-10', '2100-Oct']
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ for test_datetime in cases:
+ with self.assertRaises(AssertionError):
+ self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-at', test_datetime, tmpdir])
+
+ def test_put_collection_with_relative_expiring_datetime(self):
+ expire_after = 7
+ dt_before = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ col = self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-after', str(expire_after), tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ dt_after = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ trash_at = ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None)
+ self.assertTrue(dt_before < trash_at)
+ self.assertTrue(dt_after > trash_at)
+
+ def test_put_collection_with_invalid_relative_expiring_datetime(self):
+ expire_after = 0 # Must be >= 1
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ with self.assertRaises(AssertionError):
+ self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-after', str(expire_after), tmpdir])
+
+ def test_upload_directory_reference_without_trailing_slash(self):
+ tmpdir1 = self.make_tmpdir()
+ tmpdir2 = self.make_tmpdir()
+ with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
+ f.write('This is foo')
+ with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
+ f.write('This is not foo')
+ # Upload one directory and one file
+ col = self.run_and_find_collection("", ['--no-progress',
+ tmpdir1,
+ os.path.join(tmpdir2, 'bar')])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ # Check that 'foo' was written inside a subcollection,
+ # while 'bar' was uploaded directly into the root collection
+ self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')
+
+ def test_upload_directory_reference_with_trailing_slash(self):
+ tmpdir1 = self.make_tmpdir()
+ tmpdir2 = self.make_tmpdir()
+ with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
+ f.write('This is foo')
+ with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
+ f.write('This is not foo')
+ # Upload one directory (with trailing slash) and one file
+ col = self.run_and_find_collection("", ['--no-progress',
+ tmpdir1 + os.sep,
+ os.path.join(tmpdir2, 'bar')])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ # Check that 'foo' and 'bar' were written at the same level
+ self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')
def test_put_collection_with_high_redundancy(self):
# Write empty data: we're not testing CollectionWriter, just
'--project-uuid', self.PROJECT_UUID])
self.assertEqual(link_name, collection['name'])
+ def test_put_collection_with_storage_classes_specified(self):
+ collection = self.run_and_find_collection("", ['--storage-classes', 'hot'])
+
+ self.assertEqual(len(collection['storage_classes_desired']), 1)
+ self.assertEqual(collection['storage_classes_desired'][0], 'hot')
+
+ def test_put_collection_without_storage_classes_specified(self):
+ collection = self.run_and_find_collection("")
+
+ self.assertEqual(len(collection['storage_classes_desired']), 1)
+ self.assertEqual(collection['storage_classes_desired'][0], 'default')
+
+ def test_exclude_filename_pattern(self):
+ tmpdir = self.make_tmpdir()
+ tmpsubdir = os.path.join(tmpdir, 'subdir')
+ os.mkdir(tmpsubdir)
+ for fname in ['file1', 'file2', 'file3']:
+ with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
+ f.write("This is %s" % fname)
+ with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
+ f.write("This is %s" % fname)
+ col = self.run_and_find_collection("", ['--no-progress',
+ '--exclude', '*2.txt',
+ '--exclude', 'file3.*',
+ tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ # Neither file2.txt nor file3.txt (at any level) should have been uploaded
+ self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
+ self.assertNotRegex(c['manifest_text'], r'^.*:file2.txt')
+ self.assertNotRegex(c['manifest_text'], r'^.*:file3.txt')
+
+ def test_exclude_filepath_pattern(self):
+ tmpdir = self.make_tmpdir()
+ tmpsubdir = os.path.join(tmpdir, 'subdir')
+ os.mkdir(tmpsubdir)
+ for fname in ['file1', 'file2', 'file3']:
+ with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
+ f.write("This is %s" % fname)
+ with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
+ f.write("This is %s" % fname)
+ col = self.run_and_find_collection("", ['--no-progress',
+ '--exclude', 'subdir/*2.txt',
+ '--exclude', './file1.*',
+ tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ # Only tmpdir/file1.txt & tmpdir/subdir/file2.txt should have been excluded
+ self.assertNotRegex(c['manifest_text'],
+ r'^\./%s.*:file1.txt' % os.path.basename(tmpdir))
+ self.assertNotRegex(c['manifest_text'],
+ r'^\./%s/subdir.*:file2.txt' % os.path.basename(tmpdir))
+ self.assertRegex(c['manifest_text'],
+ r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
+ self.assertRegex(c['manifest_text'], r'^.*:file3.txt')
+
+ def test_unicode_on_filename(self):
+ tmpdir = self.make_tmpdir()
+ fname = u"i❤arvados.txt"
+ with open(os.path.join(tmpdir, fname), 'w') as f:
+ f.write("This is a unicode named file")
+ col = self.run_and_find_collection("", ['--no-progress', tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ self.assertTrue(fname in c['manifest_text'], u"{} does not include {}".format(c['manifest_text'], fname))
+
+ def test_silent_mode_no_errors(self):
+ self.authorize_with('active')
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'test.txt'), 'w') as f:
+ f.write('hello world')
+ pipe = subprocess.Popen(
+ [sys.executable, arv_put.__file__] + ['--silent', tmpdir],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, env=self.ENVIRON)
+ stdout, stderr = pipe.communicate()
+ # No console output should occur on normal operations
+ self.assertNotRegex(stderr.decode(), r'.+')
+ self.assertNotRegex(stdout.decode(), r'.+')
+
+ def test_silent_mode_does_not_avoid_error_messages(self):
+ self.authorize_with('active')
+ pipe = subprocess.Popen(
+ [sys.executable, arv_put.__file__] + ['--silent',
+ '/path/not/existent'],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, env=self.ENVIRON)
+ stdout, stderr = pipe.communicate()
+ # Error messages should still be displayed when errors happen
+ self.assertRegex(stderr.decode(), r'.*ERROR:.*')
+ self.assertNotRegex(stdout.decode(), r'.+')
+
if __name__ == '__main__':
unittest.main()