from builtins import str
from builtins import range
import apiclient
+import datetime
+import hashlib
+import json
import mock
import os
import pwd
+import random
import re
import shutil
import subprocess
import sys
import tempfile
+import threading
import time
import unittest
-import yaml
-import threading
-import hashlib
-import random
import uuid
+import yaml
import arvados
import arvados.commands.put as arv_put
self.last_cache.close()
resume_cache = arv_put.ResumeCache(self.last_cache.filename)
self.assertNotEqual(None, resume_cache)
- self.assertRaises(None, resume_cache.check_cache())
+ resume_cache.check_cache()
def test_basic_cache_storage(self):
thing = ['test', 'list']
cls.ENVIRON = os.environ.copy()
cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)
+ def datetime_to_hex(self, dt):
+ return hex(int(time.mktime(dt.timetuple())))[2:]
+
def setUp(self):
super(ArvPutIntegrationTest, self).setUp()
arv_put.api_client = None
self.assertEqual(1, len(collection_list))
return collection_list[0]
+ def test_expired_token_invalidates_cache(self):
+ self.authorize_with('active')
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
+ f.write('foo')
+ # Upload a directory and get the cache file name
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (out, err) = p.communicate()
+ self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+ self.assertEqual(p.returncode, 0)
+ cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+ err.decode()).groups()[0]
+ self.assertTrue(os.path.isfile(cache_filepath))
+ # Load the cache file contents and modify the manifest to simulate
+ # an expired access token
+ with open(cache_filepath, 'r') as c:
+ cache = json.load(c)
+ self.assertRegex(cache['manifest'], r'\+A\S+\@')
+ a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
+ cache['manifest'] = re.sub(
+ r'\@.*? ',
+ "@{} ".format(self.datetime_to_hex(a_month_ago)),
+ cache['manifest'])
+ with open(cache_filepath, 'w') as c:
+ c.write(json.dumps(cache))
+ # Re-run the upload and expect to get an invalid cache message
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (out, err) = p.communicate()
+ self.assertRegex(
+ err.decode(),
+ r'WARNING: Uploaded file .* access token expired, will re-upload it from scratch')
+ self.assertEqual(p.returncode, 0)
+ # Confirm that the resulting cache is different from the last run.
+ with open(cache_filepath, 'r') as c2:
+ new_cache = json.load(c2)
+ self.assertNotEqual(cache['manifest'], new_cache['manifest'])
+
def test_put_collection_with_later_update(self):
tmpdir = self.make_tmpdir()
with open(os.path.join(tmpdir, 'file1'), 'w') as f:
r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
self.assertRegex(c['manifest_text'], r'^.*:file3.txt')
+ def test_silent_mode_no_errors(self):
+ self.authorize_with('active')
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'test.txt'), 'w') as f:
+ f.write('hello world')
+ pipe = subprocess.Popen(
+ [sys.executable, arv_put.__file__] + ['--silent', tmpdir],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, env=self.ENVIRON)
+ stdout, stderr = pipe.communicate()
+ # No console output should occur on normal operations
+ self.assertNotRegex(stderr.decode(), r'.+')
+ self.assertNotRegex(stdout.decode(), r'.+')
+
+ def test_silent_mode_does_not_avoid_error_messages(self):
+ self.authorize_with('active')
+ pipe = subprocess.Popen(
+ [sys.executable, arv_put.__file__] + ['--silent',
+ '/path/not/existant'],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, env=self.ENVIRON)
+ stdout, stderr = pipe.communicate()
+ # Error message should be displayed when errors happen
+ self.assertRegex(stderr.decode(), r'.*ERROR:.*')
+ self.assertNotRegex(stdout.decode(), r'.+')
+
if __name__ == '__main__':
unittest.main()