+# -*- coding: utf-8 -*-
# Copyright (C) The Arvados Authors. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from builtins import str
from builtins import range
+from functools import partial
import apiclient
+import datetime
+import hashlib
+import json
+import logging
import mock
import os
import pwd
+import random
import re
+import select
import shutil
+import signal
import subprocess
import sys
import tempfile
import time
import unittest
-import yaml
-import threading
-import hashlib
-import random
import uuid
+import yaml
import arvados
import arvados.commands.put as arv_put
class CachedManifestValidationTest(ArvadosBaseTestCase):
    """Unit tests for ArvPutUploadJob._cached_manifest_valid().

    Each case builds a two-block cached manifest with chosen signature
    expiration times and mocks KeepClient.head() to decide whether each
    block's signature is accepted.
    """

    class MockedPut(arv_put.ArvPutUploadJob):
        """ArvPutUploadJob stub holding only a cached manifest.

        The parent constructor is deliberately not called; only the
        attributes set below are initialized.
        """
        def __init__(self, cached_manifest=None):
            import copy
            # Work on a private copy: EMPTY_STATE is a class attribute, and
            # mutating it in place would leak this manifest into every
            # ArvPutUploadJob created later in the same process.
            self._state = copy.deepcopy(arv_put.ArvPutUploadJob.EMPTY_STATE)
            self._state['manifest'] = cached_manifest
            self._api_client = mock.MagicMock()
            self.logger = mock.MagicMock()
            self.num_retries = 1

    def datetime_to_hex(self, dt):
        """Return naive UTC datetime *dt* as a hex Unix timestamp (no '0x')."""
        import calendar
        # calendar.timegm() interprets the time tuple as UTC, matching the
        # utcnow()-based datetimes used below; time.mktime() would treat it
        # as local time and skew the result by the local UTC offset.
        return hex(int(calendar.timegm(dt.timetuple())))[2:]

    def setUp(self):
        super(CachedManifestValidationTest, self).setUp()
        self.block1 = "fdba98970961edb29f88241b9d99d890"  # foo
        self.block2 = "37b51d194a7513e45b56f6524f2d51f2"  # bar
        # One stream with two signed 3-byte blocks; the two %s slots take
        # the hex expiration timestamps of block1 and block2.
        self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:3:barfile.txt\n"

    def test_empty_cached_manifest_is_valid(self):
        put_mock = self.MockedPut()
        self.assertEqual(None, put_mock._state.get('manifest'))
        self.assertTrue(put_mock._cached_manifest_valid())
        put_mock._state['manifest'] = ''
        self.assertTrue(put_mock._cached_manifest_valid())

    def test_signature_cases(self):
        now = datetime.datetime.utcnow()
        yesterday = now - datetime.timedelta(days=1)
        lastweek = now - datetime.timedelta(days=7)
        tomorrow = now + datetime.timedelta(days=1)
        nextweek = now + datetime.timedelta(days=7)

        def mocked_head(blocks, loc=None):
            # Accept only blocks whose hash maps to a truthy value in
            # *blocks*; otherwise mimic Keep rejecting the request.
            blk = loc.split('+', 1)[0]
            if blocks.get(blk):
                return True
            raise arvados.errors.KeepRequestError("mocked error - block invalid")

        # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
        cases = [
            # All expired, reset cache - OK
            (yesterday, lastweek, False, False, True),
            (lastweek, yesterday, False, False, True),
            # All non-expired valid blocks - OK
            (tomorrow, nextweek, True, True, True),
            (nextweek, tomorrow, True, True, True),
            # All non-expired invalid blocks - Not OK
            (tomorrow, nextweek, False, False, False),
            (nextweek, tomorrow, False, False, False),
            # One non-expired valid block - OK
            (tomorrow, yesterday, True, False, True),
            (yesterday, tomorrow, False, True, True),
            # One non-expired invalid block - Not OK
            (tomorrow, yesterday, False, False, False),
            (yesterday, tomorrow, False, False, False),
        ]
        for case in cases:
            b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
            head_responses = {
                self.block1: b1_valid,
                self.block2: b2_valid,
            }
            cached_manifest = self.template % (
                self.datetime_to_hex(b1_expiration),
                self.datetime_to_hex(b2_expiration),
            )
            arvput = self.MockedPut(cached_manifest)
            with mock.patch('arvados.collection.KeepClient.head') as head_mock:
                head_mock.side_effect = partial(mocked_head, head_responses)
                self.assertEqual(outcome, arvput._cached_manifest_valid(),
                    "Case '%s' should have produced outcome '%s'" % (case, outcome)
                )
                if b1_expiration > now or b2_expiration > now:
                    # A HEAD request should have been done
                    head_mock.assert_called_once()
                else:
                    head_mock.assert_not_called()
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
TEST_SIZE = os.path.getsize(__file__)
def test_expected_bytes_for_device(self):
- writer = arv_put.ArvPutUploadJob(['/dev/null'])
+ writer = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False)
writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
arv_put.human_progress(count, None)))
class ArvPutLogFormatterTest(ArvadosBaseTestCase):
    """Check which log lines ArvPutLogFormatter tags with the X-Request-Id."""
    matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)'

    def setUp(self):
        super(ArvPutLogFormatterTest, self).setUp()
        self.stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        self.logger = logging.getLogger()
        # Remember the root logger's level so tearDown() can restore it;
        # otherwise DEBUG logging would stay enabled for every later test.
        self.orig_log_level = self.logger.level
        self.logger.addHandler(self.loggingHandler)
        self.logger.setLevel(logging.DEBUG)

    def tearDown(self):
        self.logger.removeHandler(self.loggingHandler)
        self.logger.setLevel(self.orig_log_level)
        self.stderr.close()
        self.stderr = None
        super(ArvPutLogFormatterTest, self).tearDown()

    def _log_lines(self):
        """Return the captured log output split into lines (no trailing '')."""
        return self.stderr.getvalue().split('\n')[:-1]

    def test_request_id_logged_only_once_on_error(self):
        self.logger.error('Ooops, something bad happened.')
        self.logger.error('Another bad thing just happened.')
        log_lines = self._log_lines()
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_logged_only_once_on_debug(self):
        self.logger.debug('This is just a debug message.')
        self.logger.debug('Another message, move along.')
        log_lines = self._log_lines()
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_not_logged_on_info(self):
        self.logger.info('This should be a useful message')
        log_lines = self._log_lines()
        self.assertEqual(1, len(log_lines))
        self.assertNotRegex(log_lines[0], self.matcher)
class ArvadosPutTest(run_test_server.TestCaseWithServers,
Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
def call_main_with_args(self, args):
- self.main_stdout = tutil.StringIO()
- self.main_stderr = tutil.StringIO()
+ self.main_stdout.seek(0, 0)
+ self.main_stdout.truncate(0)
+ self.main_stderr.seek(0, 0)
+ self.main_stderr.truncate(0)
return arv_put.main(args, self.main_stdout, self.main_stderr)
def call_main_on_test_file(self, args=[]):
super(ArvadosPutTest, self).setUp()
arv_put.api_client = None
+ self.main_stdout = tutil.StringIO()
+ self.main_stderr = tutil.StringIO()
+ self.loggingHandler = logging.StreamHandler(self.main_stderr)
+ self.loggingHandler.setFormatter(
+ arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
+ logging.getLogger().addHandler(self.loggingHandler)
def tearDown(self):
+ logging.getLogger().removeHandler(self.loggingHandler)
for outbuf in ['main_stdout', 'main_stderr']:
if hasattr(self, outbuf):
getattr(self, outbuf).close()
['--project-uuid', self.Z_UUID, '--stream'])
+ def test_error_when_multiple_storage_classes_specified(self):
+ self.assertRaises(SystemExit,
+ self.call_main_with_args,
+ ['--storage-classes', 'hot,cold'])
def test_error_when_excluding_absolute_path(self):
tmpdir = self.make_tmpdir()
self.assertLess(0, coll_save_mock.call_count)
self.assertEqual("", self.main_stdout.getvalue())
+ def test_request_id_logging_on_error(self):
+ matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)\n'
+ coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
+ coll_save_mock.side_effect = arvados.errors.ApiError(
+ fake_httplib2_response(403), b'{}')
+ with mock.patch('arvados.collection.Collection.save_new',
+ new=coll_save_mock):
+ with self.assertRaises(SystemExit) as exc_test:
+ self.call_main_with_args(['/dev/null'])
+ self.assertRegex(
+ self.main_stderr.getvalue(), matcher)
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
cls.ENVIRON = os.environ.copy()
cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)
+ def datetime_to_hex(self, dt):
+ return hex(int(time.mktime(dt.timetuple())))[2:]
def setUp(self):
super(ArvPutIntegrationTest, self).setUp()
arv_put.api_client = None
+ def test_sigint_logs_request_id(self):
+ # Start arv-put, give it a chance to start up, send SIGINT,
+ # and check that its output includes the X-Request-Id.
+ input_stream = subprocess.Popen(
+ ['sleep', '10'],
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ pipe = subprocess.Popen(
+ [sys.executable, arv_put.__file__, '--stream'],
+ stdin=input_stream.stdout, stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT, env=self.ENVIRON)
+ # Wait for arv-put child process to print something (i.e., a
+ # log message) so we know its signal handler is installed.
+ select.select([pipe.stdout], [], [], 10)
+ pipe.send_signal(signal.SIGINT)
+ deadline = time.time() + 5
+ while (pipe.poll() is None) and (time.time() < deadline):
+ time.sleep(.1)
+ returncode = pipe.poll()
+ input_stream.terminate()
+ if returncode is None:
+ pipe.terminate()
+ self.fail("arv-put did not exit within 5 seconds")
+ self.assertRegex(pipe.stdout.read().decode(), r'\(X-Request-Id: req-[a-z0-9]{20}\)')
def test_ArvPutSignedManifest(self):
# ArvPutSignedManifest runs "arv-put foo" and then attempts to get
# the newly created manifest from the API server, testing to confirm
self.assertEqual(1, len(collection_list))
return collection_list[0]
+ def test_all_expired_signatures_invalidates_cache(self):
+ self.authorize_with('active')
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
+ f.write('foo')
+ # Upload a directory and get the cache file name
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (out, err) = p.communicate()
+ self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+ self.assertEqual(p.returncode, 0)
+ cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+ err.decode()).groups()[0]
+ self.assertTrue(os.path.isfile(cache_filepath))
+ # Load the cache file contents and modify the manifest to simulate
+ # an expired access token
+ with open(cache_filepath, 'r') as c:
+ cache = json.load(c)
+ self.assertRegex(cache['manifest'], r'\+A\S+\@')
+ a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
+ cache['manifest'] = re.sub(
+ r'\@.*? ',
+ "@{} ".format(self.datetime_to_hex(a_month_ago)),
+ cache['manifest'])
+ with open(cache_filepath, 'w') as c:
+ c.write(json.dumps(cache))
+ # Re-run the upload and expect to get an invalid cache message
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (out, err) = p.communicate()
+ self.assertRegex(
+ err.decode(),
+ r'INFO: Cache expired, starting from scratch.*')
+ self.assertEqual(p.returncode, 0)
+ def test_invalid_signature_invalidates_cache(self):
+ self.authorize_with('active')
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
+ f.write('foo')
+ # Upload a directory and get the cache file name
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (out, err) = p.communicate()
+ self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+ self.assertEqual(p.returncode, 0)
+ cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+ err.decode()).groups()[0]
+ self.assertTrue(os.path.isfile(cache_filepath))
+ # Load the cache file contents and modify the manifest to simulate
+ # an invalid access token
+ with open(cache_filepath, 'r') as c:
+ cache = json.load(c)
+ self.assertRegex(cache['manifest'], r'\+A\S+\@')
+ cache['manifest'] = re.sub(
+ r'\+A.*\@',
+ "+Aabcdef0123456789abcdef0123456789abcdef01@",
+ cache['manifest'])
+ with open(cache_filepath, 'w') as c:
+ c.write(json.dumps(cache))
+ # Re-run the upload and expect to get an invalid cache message
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (out, err) = p.communicate()
+ self.assertRegex(
+ err.decode(),
+ r'ERROR: arv-put: Resume cache contains invalid signature.*')
+ self.assertEqual(p.returncode, 1)
+ def test_single_expired_signature_reuploads_file(self):
+ self.authorize_with('active')
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
+ f.write('foo')
+ # Write a second file on its own subdir to force a new stream
+ os.mkdir(os.path.join(tmpdir, 'bar'))
+ with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
+ f.write('bar')
+ # Upload a directory and get the cache file name
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (out, err) = p.communicate()
+ self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+ self.assertEqual(p.returncode, 0)
+ cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+ err.decode()).groups()[0]
+ self.assertTrue(os.path.isfile(cache_filepath))
+ # Load the cache file contents and modify the manifest to simulate
+ # an expired access token
+ with open(cache_filepath, 'r') as c:
+ cache = json.load(c)
+ self.assertRegex(cache['manifest'], r'\+A\S+\@')
+ a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
+ # Make one of the signatures appear to have expired
+ cache['manifest'] = re.sub(
+ r'\@.*? 3:3:barfile.txt',
+ "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
+ cache['manifest'])
+ with open(cache_filepath, 'w') as c:
+ c.write(json.dumps(cache))
+ # Re-run the upload and expect to get an invalid cache message
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (out, err) = p.communicate()
+ self.assertRegex(
+ err.decode(),
+ r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
+ self.assertEqual(p.returncode, 0)
+ # Confirm that the resulting cache is different from the last run.
+ with open(cache_filepath, 'r') as c2:
+ new_cache = json.load(c2)
+ self.assertNotEqual(cache['manifest'], new_cache['manifest'])
def test_put_collection_with_later_update(self):
tmpdir = self.make_tmpdir()
with open(os.path.join(tmpdir, 'file1'), 'w') as f:
'--project-uuid', self.PROJECT_UUID])
self.assertEqual(link_name, collection['name'])
+ def test_put_collection_with_storage_classes_specified(self):
+ collection = self.run_and_find_collection("", ['--storage-classes', 'hot'])
+ self.assertEqual(len(collection['storage_classes_desired']), 1)
+ self.assertEqual(collection['storage_classes_desired'][0], 'hot')
+ def test_put_collection_without_storage_classes_specified(self):
+ collection = self.run_and_find_collection("")
+ self.assertEqual(len(collection['storage_classes_desired']), 1)
+ self.assertEqual(collection['storage_classes_desired'][0], 'default')
def test_exclude_filename_pattern(self):
tmpdir = self.make_tmpdir()
tmpsubdir = os.path.join(tmpdir, 'subdir')
r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
self.assertRegex(c['manifest_text'], r'^.*:file3.txt')
+ def test_unicode_on_filename(self):
+ tmpdir = self.make_tmpdir()
+ fname = u"i❤arvados.txt"
+ with open(os.path.join(tmpdir, fname), 'w') as f:
+ f.write("This is a unicode named file")
+ col = self.run_and_find_collection("", ['--no-progress', tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ self.assertTrue(fname in c['manifest_text'], u"{} does not include {}".format(c['manifest_text'], fname))
def test_silent_mode_no_errors(self):
tmpdir = self.make_tmpdir()