X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6f2ce945ee9ad422c3b28fb6433e22d816dddfe1..c4107b1da872fe7df4c73de1a1d496bbfa1290dc:/sdk/python/tests/test_arv_put.py diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py index 346167846c..540e06c6c6 100644 --- a/sdk/python/tests/test_arv_put.py +++ b/sdk/python/tests/test_arv_put.py @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- + # Copyright (C) The Arvados Authors. All rights reserved. # # SPDX-License-Identifier: Apache-2.0 @@ -8,20 +10,23 @@ from future import standard_library standard_library.install_aliases() from builtins import str from builtins import range +from functools import partial import apiclient import datetime import hashlib import json +import logging import mock import os import pwd import random import re +import select import shutil +import signal import subprocess import sys import tempfile -import threading import time import unittest import uuid @@ -526,6 +531,85 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers, resume=False) del(self.writer) +class CachedManifestValidationTest(ArvadosBaseTestCase): + class MockedPut(arv_put.ArvPutUploadJob): + def __init__(self, cached_manifest=None): + self._state = arv_put.ArvPutUploadJob.EMPTY_STATE + self._state['manifest'] = cached_manifest + self._api_client = mock.MagicMock() + self.logger = mock.MagicMock() + self.num_retries = 1 + + def datetime_to_hex(self, dt): + return hex(int(time.mktime(dt.timetuple())))[2:] + + def setUp(self): + super(CachedManifestValidationTest, self).setUp() + self.block1 = "fdba98970961edb29f88241b9d99d890" # foo + self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar + self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n" + + def test_empty_cached_manifest_is_valid(self): + put_mock = self.MockedPut() + self.assertEqual(None, put_mock._state.get('manifest')) + self.assertTrue(put_mock._cached_manifest_valid()) + put_mock._state['manifest'] = '' + self.assertTrue(put_mock._cached_manifest_valid()) + + def test_signature_cases(self): + now = datetime.datetime.utcnow() + yesterday = now - datetime.timedelta(days=1) + lastweek = now - datetime.timedelta(days=7) + tomorrow = now + datetime.timedelta(days=1) + nextweek = now + datetime.timedelta(days=7) + + def mocked_head(blocks={}, loc=None): + blk = loc.split('+', 1)[0] + if blocks.get(blk): + return True + raise arvados.errors.KeepRequestError("mocked error - block invalid") + + # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation + cases = [ + # All expired, reset cache - OK + (yesterday, lastweek, False, False, True), + (lastweek, yesterday, False, False, True), + # All non-expired valid blocks - OK + (tomorrow, nextweek, True, True, True), + (nextweek, tomorrow, True, True, True), + # All non-expired invalid blocks - Not OK + (tomorrow, nextweek, False, False, False), + (nextweek, tomorrow, False, False, False), + # One non-expired valid block - OK + (tomorrow, yesterday, True, False, True), + (yesterday, tomorrow, False, True, True), + # One non-expired invalid block - Not OK + (tomorrow, yesterday, False, False, False), + (yesterday, tomorrow, False, False, False), + ] + for case in cases: + b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case + head_responses = { + self.block1: b1_valid, + self.block2: b2_valid, + } + cached_manifest = self.template % ( + self.datetime_to_hex(b1_expiration), + self.datetime_to_hex(b2_expiration), + ) + arvput = self.MockedPut(cached_manifest) + with mock.patch('arvados.collection.KeepClient.head') as head_mock: + head_mock.side_effect = partial(mocked_head, head_responses) + self.assertEqual(outcome, arvput._cached_manifest_valid(), + "Case '%s' should have produced outcome '%s'" % (case, outcome) + ) + if b1_expiration > now or b2_expiration > now: + # A HEAD request should have been done + head_mock.assert_called_once() + else: + head_mock.assert_not_called() + + class ArvadosExpectedBytesTest(ArvadosBaseTestCase): TEST_SIZE = os.path.getsize(__file__) @@ -547,7 +631,7 @@ class ArvadosExpectedBytesTest(ArvadosBaseTestCase): writer.bytes_expected) def test_expected_bytes_for_device(self): - writer = arv_put.ArvPutUploadJob(['/dev/null']) + writer = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False) self.assertIsNone(writer.bytes_expected) writer = arv_put.ArvPutUploadJob([__file__, '/dev/null']) self.assertIsNone(writer.bytes_expected) @@ -574,6 +658,47 @@ class ArvadosPutReportTest(ArvadosBaseTestCase): arv_put.human_progress(count, None))) +class ArvPutLogFormatterTest(ArvadosBaseTestCase): + matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)' + + def setUp(self): + super(ArvPutLogFormatterTest, self).setUp() + self.stderr = tutil.StringIO() + self.loggingHandler = logging.StreamHandler(self.stderr) + self.loggingHandler.setFormatter( + arv_put.ArvPutLogFormatter(arvados.util.new_request_id())) + self.logger = logging.getLogger() + self.logger.addHandler(self.loggingHandler) + self.logger.setLevel(logging.DEBUG) + + def tearDown(self): + self.logger.removeHandler(self.loggingHandler) + self.stderr.close() + self.stderr = None + super(ArvPutLogFormatterTest, self).tearDown() + + def test_request_id_logged_only_once_on_error(self): + self.logger.error('Ooops, something bad happened.') + self.logger.error('Another bad thing just happened.') + log_lines = self.stderr.getvalue().split('\n')[:-1] + self.assertEqual(2, len(log_lines)) + self.assertRegex(log_lines[0], self.matcher) + self.assertNotRegex(log_lines[1], self.matcher) + + def test_request_id_logged_only_once_on_debug(self): + self.logger.debug('This is just a debug message.') + self.logger.debug('Another message, move along.') + log_lines = self.stderr.getvalue().split('\n')[:-1] + self.assertEqual(2, len(log_lines)) + self.assertRegex(log_lines[0], self.matcher) + self.assertNotRegex(log_lines[1], self.matcher) + + def test_request_id_not_logged_on_info(self): + self.logger.info('This should be a useful message') + log_lines = self.stderr.getvalue().split('\n')[:-1] + self.assertEqual(1, len(log_lines)) + self.assertNotRegex(log_lines[0], self.matcher) + class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase, tutil.VersionChecker): @@ -581,8 +706,10 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz' def call_main_with_args(self, args): - self.main_stdout = tutil.StringIO() - self.main_stderr = tutil.StringIO() + self.main_stdout.seek(0, 0) + self.main_stdout.truncate(0) + self.main_stderr.seek(0, 0) + self.main_stderr.truncate(0) return arv_put.main(args, self.main_stdout, self.main_stderr) def call_main_on_test_file(self, args=[]): @@ -598,8 +725,15 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, super(ArvadosPutTest, self).setUp() run_test_server.authorize_with('active') arv_put.api_client = None + self.main_stdout = tutil.StringIO() + self.main_stderr = tutil.StringIO() + self.loggingHandler = logging.StreamHandler(self.main_stderr) + self.loggingHandler.setFormatter( + arv_put.ArvPutLogFormatter(arvados.util.new_request_id())) + logging.getLogger().addHandler(self.loggingHandler) def tearDown(self): + logging.getLogger().removeHandler(self.loggingHandler) for outbuf in ['main_stdout', 'main_stderr']: if hasattr(self, outbuf): getattr(self, outbuf).close() @@ -678,6 +812,11 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, self.call_main_with_args, ['--project-uuid', self.Z_UUID, '--stream']) + def test_error_when_multiple_storage_classes_specified(self): + self.assertRaises(SystemExit, + self.call_main_with_args, + ['--storage-classes', 'hot,cold']) + def test_error_when_excluding_absolute_path(self): tmpdir = self.make_tmpdir() self.assertRaises(SystemExit, @@ -697,6 +836,18 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, self.assertLess(0, coll_save_mock.call_count) self.assertEqual("", self.main_stdout.getvalue()) + def test_request_id_logging_on_error(self): + matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)\n' + coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()') + coll_save_mock.side_effect = arvados.errors.ApiError( + fake_httplib2_response(403), b'{}') + with mock.patch('arvados.collection.Collection.save_new', + new=coll_save_mock): + with self.assertRaises(SystemExit) as exc_test: + self.call_main_with_args(['/dev/null']) + self.assertRegex( + self.main_stderr.getvalue(), matcher) + class ArvPutIntegrationTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase): @@ -708,7 +859,7 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers, if not mandatory and not os.path.exists(path): continue with open(path) as f: - rails_config = yaml.load(f.read()) + rails_config = yaml.safe_load(f.read()) for config_section in ['test', 'common']: try: key = rails_config[config_section]["blob_signing_key"] @@ -794,6 +945,30 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers, self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read().decode()) + def test_sigint_logs_request_id(self): + # Start arv-put, give it a chance to start up, send SIGINT, + # and check that its output includes the X-Request-Id. + input_stream = subprocess.Popen( + ['sleep', '10'], + stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + pipe = subprocess.Popen( + [sys.executable, arv_put.__file__, '--stream'], + stdin=input_stream.stdout, stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, env=self.ENVIRON) + # Wait for arv-put child process to print something (i.e., a + # log message) so we know its signal handler is installed. + select.select([pipe.stdout], [], [], 10) + pipe.send_signal(signal.SIGINT) + deadline = time.time() + 5 + while (pipe.poll() is None) and (time.time() < deadline): + time.sleep(.1) + returncode = pipe.poll() + input_stream.terminate() + if returncode is None: + pipe.terminate() + self.fail("arv-put did not exit within 5 seconds") + self.assertRegex(pipe.stdout.read().decode(), r'\(X-Request-Id: req-[a-z0-9]{20}\)') + def test_ArvPutSignedManifest(self): # ArvPutSignedManifest runs "arv-put foo" and then attempts to get # the newly created manifest from the API server, testing to confirm @@ -845,7 +1020,7 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers, self.assertEqual(1, len(collection_list)) return collection_list[0] - def test_expired_token_invalidates_cache(self): + def test_all_expired_signatures_invalidates_cache(self): self.authorize_with('active') tmpdir = self.make_tmpdir() with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f: @@ -881,7 +1056,89 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers, (out, err) = p.communicate() self.assertRegex( err.decode(), - r'WARNING: Uploaded file .* access token expired, will re-upload it from scratch') + r'INFO: Cache expired, starting from scratch.*') + self.assertEqual(p.returncode, 0) + + def test_invalid_signature_invalidates_cache(self): + self.authorize_with('active') + tmpdir = self.make_tmpdir() + with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f: + f.write('foo') + # Upload a directory and get the cache file name + p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=self.ENVIRON) + (out, err) = p.communicate() + self.assertRegex(err.decode(), r'INFO: Creating new cache file at ') + self.assertEqual(p.returncode, 0) + cache_filepath = re.search(r'INFO: Creating new cache file at (.*)', + err.decode()).groups()[0] + self.assertTrue(os.path.isfile(cache_filepath)) + # Load the cache file contents and modify the manifest to simulate + # an invalid access token + with open(cache_filepath, 'r') as c: + cache = json.load(c) + self.assertRegex(cache['manifest'], r'\+A\S+\@') + cache['manifest'] = re.sub( + r'\+A.*\@', + "+Aabcdef0123456789abcdef0123456789abcdef01@", + cache['manifest']) + with open(cache_filepath, 'w') as c: + c.write(json.dumps(cache)) + # Re-run the upload and expect to get an invalid cache message + p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=self.ENVIRON) + (out, err) = p.communicate() + self.assertRegex( + err.decode(), + r'ERROR: arv-put: Resume cache contains invalid signature.*') + self.assertEqual(p.returncode, 1) + + def test_single_expired_signature_reuploads_file(self): + self.authorize_with('active') + tmpdir = self.make_tmpdir() + with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f: + f.write('foo') + # Write a second file on its own subdir to force a new stream + os.mkdir(os.path.join(tmpdir, 'bar')) + with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f: + f.write('bar') + # Upload a directory and get the cache file name + p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=self.ENVIRON) + (out, err) = p.communicate() + self.assertRegex(err.decode(), r'INFO: Creating new cache file at ') + self.assertEqual(p.returncode, 0) + cache_filepath = re.search(r'INFO: Creating new cache file at (.*)', + err.decode()).groups()[0] + self.assertTrue(os.path.isfile(cache_filepath)) + # Load the cache file contents and modify the manifest to simulate + # an expired access token + with open(cache_filepath, 'r') as c: + cache = json.load(c) + self.assertRegex(cache['manifest'], r'\+A\S+\@') + a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30) + # Make one of the signatures appear to have expired + cache['manifest'] = re.sub( + r'\@.*? 3:3:barfile.txt', + "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)), + cache['manifest']) + with open(cache_filepath, 'w') as c: + c.write(json.dumps(cache)) + # Re-run the upload and expect to get an invalid cache message + p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=self.ENVIRON) + (out, err) = p.communicate() + self.assertRegex( + err.decode(), + r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch') self.assertEqual(p.returncode, 0) # Confirm that the resulting cache is different from the last run. with open(cache_filepath, 'r') as c2: @@ -973,6 +1230,18 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers, '--project-uuid', self.PROJECT_UUID]) self.assertEqual(link_name, collection['name']) + def test_put_collection_with_storage_classes_specified(self): + collection = self.run_and_find_collection("", ['--storage-classes', 'hot']) + + self.assertEqual(len(collection['storage_classes_desired']), 1) + self.assertEqual(collection['storage_classes_desired'][0], 'hot') + + def test_put_collection_without_storage_classes_specified(self): + collection = self.run_and_find_collection("") + + self.assertEqual(len(collection['storage_classes_desired']), 1) + self.assertEqual(collection['storage_classes_desired'][0], 'default') + def test_exclude_filename_pattern(self): tmpdir = self.make_tmpdir() tmpsubdir = os.path.join(tmpdir, 'subdir') @@ -1017,6 +1286,16 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers, r'^\./%s.*:file2.txt' % os.path.basename(tmpdir)) self.assertRegex(c['manifest_text'], r'^.*:file3.txt') + def test_unicode_on_filename(self): + tmpdir = self.make_tmpdir() + fname = u"i❤arvados.txt" + with open(os.path.join(tmpdir, fname), 'w') as f: + f.write("This is a unicode named file") + col = self.run_and_find_collection("", ['--no-progress', tmpdir]) + self.assertNotEqual(None, col['uuid']) + c = arv_put.api_client.collections().get(uuid=col['uuid']).execute() + self.assertTrue(fname in c['manifest_text'], u"{} does not include {}".format(c['manifest_text'], fname)) + def test_silent_mode_no_errors(self): self.authorize_with('active') tmpdir = self.make_tmpdir()