# Python 2 writes version info on stderr.
self.assertEqual(out.getvalue(), '')
v = err.getvalue()
- self.assertRegexpMatches(v, "[0-9]+\.[0-9]+\.[0-9]+$\n")
+ self.assertRegex(v, r"[0-9]+\.[0-9]+\.[0-9]+$\n")
class FakeCurl(object):
testfile.write(text)
testfile.flush()
return testfile
+
+if not hasattr(unittest.TestCase, 'assertRegex'):
+    # Python 2's unittest spells these assertions assertRegexpMatches and
+    # assertNotRegexpMatches; Python 3.2+ spells them assertRegex and
+    # assertNotRegex. Backport the Python 3 names so the tests can use a
+    # single spelling. Detecting the missing method (instead of comparing
+    # sys.version_info) installs the shims exactly where they are needed
+    # and never overrides a native implementation.
+    def assertRegex(self, *args, **kwargs):
+        """Forward all arguments to assertRegexpMatches and return its result."""
+        return self.assertRegexpMatches(*args, **kwargs)
+    def assertNotRegex(self, *args, **kwargs):
+        """Forward all arguments to assertNotRegexpMatches and return its result."""
+        return self.assertNotRegexpMatches(*args, **kwargs)
+    unittest.TestCase.assertRegex = assertRegex
+    unittest.TestCase.assertNotRegex = assertNotRegex
self.assertEqual(out.getvalue(), '')
if expect_ok:
- self.assertNotRegexpMatches(
+ self.assertNotRegex(
err.getvalue(), "refusing to store",
msg=repr((supported, img_id)))
else:
- self.assertRegexpMatches(
+ self.assertRegex(
err.getvalue(), "refusing to store",
msg=repr((supported, img_id)))
if not supported:
- self.assertRegexpMatches(
+ self.assertRegex(
err.getvalue(),
"server does not specify supported image formats",
msg=repr((supported, img_id)))
api()._rootDesc = fakeDD
self.run_arv_keepdocker(
['--force', '--force-image-format', 'testimage'], err)
- self.assertRegexpMatches(err.getvalue(), "forcing incompatible image")
+ self.assertRegex(err.getvalue(), "forcing incompatible image")
# The manifest text stored in the API server under the same
# manifest UUID must use signed locators.
c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
- self.assertRegexpMatches(
+ self.assertRegex(
c['manifest_text'],
r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')
self.assertEqual(col['uuid'], updated_col['uuid'])
# Get the manifest and check that the new file is being included
c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
- self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')
+ self.assertRegex(c['manifest_text'], r'^\. .*:44:file2\n')
def test_put_collection_with_high_redundancy(self):
# Write empty data: we're not testing CollectionWriter, just
"Test unnamed collection",
['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
username = pwd.getpwuid(os.getuid()).pw_name
- self.assertRegexpMatches(
+ self.assertRegex(
link['name'],
r'^Saved at .* by {}@'.format(re.escape(username)))
# c1 changed, so c2 mod will go to a conflict file
c1.apply(d)
- self.assertRegexpMatches(c1.portable_manifest_text(), r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1\.txt 5:10:count1\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
+ self.assertRegex(
+ c1.portable_manifest_text(),
+ r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1\.txt 5:10:count1\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
def test_conflict_add(self):
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
# c1 added count1.txt, so c2 add will go to a conflict file
c1.apply(d)
- self.assertRegexpMatches(c1.portable_manifest_text(), r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1\.txt 5:10:count1\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
+ self.assertRegex(
+ c1.portable_manifest_text(),
+ r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1\.txt 5:10:count1\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
def test_conflict_del(self):
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt')
# c1 deleted, so c2 mod will go to a conflict file
c1.apply(d)
- self.assertRegexpMatches(c1.portable_manifest_text(), r"\. 5348b82a029fd9e971a811ce1f71360b\+43 0:10:count1\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
+ self.assertRegex(
+ c1.portable_manifest_text(),
+ r"\. 5348b82a029fd9e971a811ce1f71360b\+43 0:10:count1\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
def test_notify(self):
c1 = Collection()
def test_create_and_save(self):
c = self.create_count_txt()
c.save()
- self.assertRegexpMatches(c.manifest_text(), r"^\. 781e5e245d69b566979b86e28d23f2c7\+10\+A[a-f0-9]{40}@[a-f0-9]{8} 0:10:count\.txt$",)
+ self.assertRegex(
+ c.manifest_text(),
+ r"^\. 781e5e245d69b566979b86e28d23f2c7\+10\+A[a-f0-9]{40}@[a-f0-9]{8} 0:10:count\.txt$",)
def test_create_and_save_new(self):
c = self.create_count_txt()
c.save_new()
- self.assertRegexpMatches(c.manifest_text(), r"^\. 781e5e245d69b566979b86e28d23f2c7\+10\+A[a-f0-9]{40}@[a-f0-9]{8} 0:10:count\.txt$",)
+ self.assertRegex(
+ c.manifest_text(),
+ r"^\. 781e5e245d69b566979b86e28d23f2c7\+10\+A[a-f0-9]{40}@[a-f0-9]{8} 0:10:count\.txt$",)
def test_create_diff_apply(self):
c1 = self.create_count_txt()
c2.save()
c1.update()
- self.assertRegexpMatches(c1.manifest_text(), r"\. e65075d550f9b5bf9992fa1d71a131be\+3\S* 7ac66c0f148de9519b8bd264312c4d64\+7\S* 0:3:count\.txt 3:7:count\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
+ self.assertRegex(
+ c1.manifest_text(),
+ r"\. e65075d550f9b5bf9992fa1d71a131be\+3\S* 7ac66c0f148de9519b8bd264312c4d64\+7\S* 0:3:count\.txt 3:7:count\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
if __name__ == '__main__':
message = "test plain traceback string"
test_exc = arv_error.KeepRequestError(message)
exc_report = self.traceback_str(test_exc)
- self.assertTrue(exc_report.startswith("KeepRequestError: "))
+ self.assertRegex(exc_report, r"^KeepRequestError: ")
self.assertIn(message, exc_report)
def test_traceback_str_with_request_errors(self):
message = "test traceback shows Keep services"
test_exc = arv_error.KeepRequestError(message, self.REQUEST_ERRORS[:])
exc_report = self.traceback_str(test_exc)
- self.assertTrue(exc_report.startswith("KeepRequestError: "))
+ self.assertRegex(exc_report, r"^KeepRequestError: ")
for expect_substr in [message, "raised IOError", "raised MemoryError",
"test MemoryError", "second test IOError",
"responded with 500 Internal Server Error"]:
def test_KeepBasicRWTest(self):
self.assertEqual(0, self.keep_client.upload_counter.get())
foo_locator = self.keep_client.put('foo')
- self.assertRegexpMatches(
+ self.assertRegex(
foo_locator,
'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
def test_KeepBinaryRWTest(self):
blob_str = b'\xff\xfe\xf7\x00\x01\x02'
blob_locator = self.keep_client.put(blob_str)
- self.assertRegexpMatches(
+ self.assertRegex(
blob_locator,
'^7fc7c53b45e53926ba52821140fef396\+6',
('wrong locator from Keep.put(<binarydata>):' + blob_locator))
for i in range(0,23):
blob_data = blob_data + blob_data
blob_locator = self.keep_client.put(blob_data)
- self.assertRegexpMatches(
+ self.assertRegex(
blob_locator,
'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
def test_KeepSingleCopyRWTest(self):
blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
blob_locator = self.keep_client.put(blob_data, copies=1)
- self.assertRegexpMatches(
+ self.assertRegex(
blob_locator,
'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
def test_KeepEmptyCollectionTest(self):
blob_locator = self.keep_client.put('', copies=1)
- self.assertRegexpMatches(
+ self.assertRegex(
blob_locator,
'^d41d8cd98f00b204e9800998ecf8427e\+0',
('wrong locator from Keep.put(""): ' + blob_locator))
def test_unicode_must_be_ascii(self):
# If unicode type, must only consist of valid ASCII
foo_locator = self.keep_client.put(u'foo')
- self.assertRegexpMatches(
+ self.assertRegex(
foo_locator,
'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
def test_KeepHeadTest(self):
locator = self.keep_client.put('test_head')
- self.assertRegexpMatches(
+ self.assertRegex(
locator,
'^b9a772c7049325feb7130fff1f8333e9\+9',
'wrong md5 hash from Keep.put for "test_head": ' + locator)
run_test_server.authorize_with('active')
keep_client = arvados.KeepClient()
foo_locator = keep_client.put('foo')
- self.assertRegexpMatches(
+ self.assertRegex(
foo_locator,
r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
'invalid locator from Keep.put("foo"): ' + foo_locator)
# GET with an unsigned locator => NotFound
bar_locator = keep_client.put('bar')
unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
- self.assertRegexpMatches(
+ self.assertRegex(
bar_locator,
r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
'invalid locator from Keep.put("bar"): ' + bar_locator)
def _put_foo_and_check(self):
signed_locator = self.keep_client.put('foo')
- self.assertRegexpMatches(
+ self.assertRegex(
signed_locator,
r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
'invalid locator from Keep.put("foo"): ' + signed_locator)
keep_client = arvados.KeepClient(api_client=self.api_client,
local_store='')
baz_locator = keep_client.put('baz')
- self.assertRegexpMatches(
+ self.assertRegex(
baz_locator,
'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
keep_client = arvados.KeepClient(api_client=self.api_client,
proxy='', local_store='')
baz_locator = keep_client.put('baz2')
- self.assertRegexpMatches(
+ self.assertRegex(
baz_locator,
'^91f372a266fe2bf2823cb8ec7fda31ce\+4',
'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
mocks[i].getopt(pycurl.URL).decode())
# Disk services are tried next.
for i in range(gateways, gateways+disks):
- self.assertRegexpMatches(
+ self.assertRegex(
mocks[i].getopt(pycurl.URL).decode(),
r'keep0x')
mocks[i].getopt(pycurl.URL).decode())
# Disk services are tried next.
for i in range(gateways, gateways+disks):
- self.assertRegexpMatches(
+ self.assertRegex(
mocks[i].getopt(pycurl.URL).decode(),
r'keep0x')