X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/69902ee6583e1de32786e80b77c8f61870ed6f90..965565ddc62635928a6b043158fd683738961c8c:/sdk/python/tests/test_arvfile.py diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py index 2e43216da8..8f02d517fc 100644 --- a/sdk/python/tests/test_arvfile.py +++ b/sdk/python/tests/test_arvfile.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import bz2 +import datetime import gzip import io import mock @@ -28,7 +29,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase): def get_from_cache(self, locator): self.requests.append(locator) return self.blocks.get(locator) - def put(self, data, num_retries=None): + def put(self, data, num_retries=None, copies=None): pdh = tutil.str_keep_locator(data) self.blocks[pdh] = str(data) return pdh @@ -38,6 +39,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase): self.body = b self.response = r self._schema = ArvadosFileWriterTestCase.MockApi.MockSchema() + self._rootDesc = {} class MockSchema(object): def __init__(self): self.schemas = {'Collection': {'properties': {'replication_desired': {'type':'integer'}}}} @@ -569,6 +571,26 @@ class ArvadosFileReadlinesTestCase(ArvadosFileReadTestCase): def read_for_test(self, reader, byte_count, **kwargs): return ''.join(reader.readlines(**kwargs)) + +class ArvadosFileTestCase(unittest.TestCase): + def datetime_to_hex(self, dt): + return hex(int(time.mktime(dt.timetuple())))[2:] + + def test_permission_expired(self): + base_manifest = ". 781e5e245d69b566979b86e28d23f2c7+10+A715fd31f8111894f717eb1003c1b0216799dd9ec@{} 0:10:count.txt\n" + now = datetime.datetime.now() + a_week_ago = now - datetime.timedelta(days=7) + a_month_ago = now - datetime.timedelta(days=30) + a_week_from_now = now + datetime.timedelta(days=7) + with Collection(base_manifest.format(self.datetime_to_hex(a_week_from_now))) as c: + self.assertFalse(c.find('count.txt').permission_expired()) + with Collection(base_manifest.format(self.datetime_to_hex(a_week_ago))) as c: + f = c.find('count.txt') + self.assertTrue(f.permission_expired()) + self.assertTrue(f.permission_expired(a_week_from_now)) + self.assertFalse(f.permission_expired(a_month_ago)) + + class BlockManagerTest(unittest.TestCase): def test_bufferblock_append(self): keep = ArvadosFileWriterTestCase.MockKeep({})