from builtins import str
from builtins import range
import apiclient
+import datetime
import hashlib
import json
+import logging
import mock
import os
import pwd
Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
def call_main_with_args(self, args):
- self.main_stdout = tutil.StringIO()
- self.main_stderr = tutil.StringIO()
+ self.main_stdout.seek(0, 0)
+ self.main_stdout.truncate(0)
+ self.main_stderr.seek(0, 0)
+ self.main_stderr.truncate(0)
return arv_put.main(args, self.main_stdout, self.main_stderr)
def call_main_on_test_file(self, args=[]):
super(ArvadosPutTest, self).setUp()
run_test_server.authorize_with('active')
arv_put.api_client = None
+ self.main_stdout = tutil.StringIO()
+ self.main_stderr = tutil.StringIO()
+ self.loggingHandler = logging.StreamHandler(self.main_stderr)
+ self.loggingHandler.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
+ logging.getLogger().addHandler(self.loggingHandler)
def tearDown(self):
+ logging.getLogger().removeHandler(self.loggingHandler)
for outbuf in ['main_stdout', 'main_stderr']:
if hasattr(self, outbuf):
getattr(self, outbuf).close()
self.assertLess(0, coll_save_mock.call_count)
self.assertEqual("", self.main_stdout.getvalue())
+ def test_request_id_logging(self):
+ matcher = r'INFO: X-Request-Id: req-[a-z0-9]{20}\n'
+
+ self.call_main_on_test_file()
+ self.assertRegex(self.main_stderr.getvalue(), matcher)
+
+ self.call_main_on_test_file(['--silent'])
+ self.assertNotRegex(self.main_stderr.getvalue(), matcher)
+
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
ArvadosBaseTestCase):
cls.ENVIRON = os.environ.copy()
cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)
+ def datetime_to_hex(self, dt):
+ return hex(int(time.mktime(dt.timetuple())))[2:]
+
def setUp(self):
super(ArvPutIntegrationTest, self).setUp()
arv_put.api_client = None
self.assertEqual(1, len(collection_list))
return collection_list[0]
- def test_invalid_token_invalidates_cache(self):
+ def test_expired_token_invalidates_cache(self):
self.authorize_with('active')
tmpdir = self.make_tmpdir()
with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
err.decode()).groups()[0]
self.assertTrue(os.path.isfile(cache_filepath))
# Load the cache file contents and modify the manifest to simulate
- # an invalid access token
+ # an expired access token
with open(cache_filepath, 'r') as c:
cache = json.load(c)
self.assertRegex(cache['manifest'], r'\+A\S+\@')
- cache['manifest'] = re.sub(r'\+A\S+\@',
- '+Adeadbeefdeadbeefdeadbeefdeadbeefdeadbeef@',
- cache['manifest'])
+ a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
+ cache['manifest'] = re.sub(
+ r'\@.*? ',
+ "@{} ".format(self.datetime_to_hex(a_month_ago)),
+ cache['manifest'])
with open(cache_filepath, 'w') as c:
c.write(json.dumps(cache))
# Re-run the upload and expect to get an invalid cache message
(out, err) = p.communicate()
self.assertRegex(
err.decode(),
- r'INFO: Cache file (.*) is not valid, starting from scratch')
+ r'WARNING: Uploaded file .* access token expired, will re-upload it from scratch')
self.assertEqual(p.returncode, 0)
+ # Confirm that the resulting cache is different from the last run.
+ with open(cache_filepath, 'r') as c2:
+ new_cache = json.load(c2)
+ self.assertNotEqual(cache['manifest'], new_cache['manifest'])
def test_put_collection_with_later_update(self):
tmpdir = self.make_tmpdir()