import time
import threading
import timer
+import datetime
global_client_object = None
import config
import arvados.errors
class KeepLocator(object):
    """Parsed form of a Keep locator string.

    A locator is a 32-digit hex checksum optionally followed by
    '+'-separated hints.  Recognized hints are:

    * an all-digit size,
    * a location hint starting with 'K' (stored verbatim), and
    * a permission hint of the form ``A<40 hex digits>@<hex timestamp>``.

    ``str(locator)`` rebuilds the string in the fixed order checksum,
    size, location hint, permission hint.
    """

    # Naive UTC datetime of the Unix epoch; used to convert an expiry
    # datetime back to an integer timestamp.
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    # Anchored with \Z rather than $ so that a trailing newline cannot
    # slip through validation ($ also matches just before a final '\n').
    HEX_RE = re.compile(r'^[0-9a-fA-F]+\Z')

    def __init__(self, locator_str):
        """Parse locator_str, raising ValueError if any part is invalid."""
        self.size = None
        self.loc_hint = None
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        # The first piece is always the checksum; the md5sum property
        # setter validates it.
        self.md5sum = next(pieces)
        for hint in pieces:
            if hint.startswith('A'):
                self.parse_permission_hint(hint)
            elif hint.startswith('K'):
                self.loc_hint = hint  # FIXME
            elif hint.isdigit():
                self.size = int(hint)
            else:
                raise ValueError("unrecognized hint data {}".format(hint))

    def __str__(self):
        """Return the locator string, omitting any unset parts."""
        return '+'.join(
            str(s) for s in [self.md5sum, self.size, self.loc_hint,
                             self.permission_hint()]
            if s is not None)

    def _is_hex_length(self, s, *size_spec):
        """Return True if s is a hex string of an acceptable length.

        size_spec is either one exact length, or a (minimum, maximum)
        pair of inclusive bounds.
        """
        if len(size_spec) == 1:
            good_len = (len(s) == size_spec[0])
        else:
            good_len = (size_spec[0] <= len(s) <= size_spec[1])
        return good_len and self.HEX_RE.match(s) is not None

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.  This runs in the
        # class body as a plain function, so it takes no self.
        data_name = '_{}'.format(name)

        def getter(self):
            return getattr(self, data_name)

        def setter(self, hex_str):
            if not self._is_hex_length(hex_str, length):
                raise ValueError("{} must be a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        """Permission expiry as a naive UTC datetime, or None if unset."""
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        # value is the hex Unix timestamp as it appears in the locator
        # (one to eight hex digits).
        if not self._is_hex_length(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the 'A<sig>@<timestamp>' hint, or None if incomplete."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Set perm_sig and perm_expiry from a hint like 'A<sig>@<ts>'.

        Raises ValueError if the hint has no '@' separator, or if either
        half fails its property's validation.
        """
        # partition() makes the missing-separator case explicit; the
        # previous split-and-unpack raised an unhelpful unpacking error
        # that the IndexError handler never caught.
        sig, separator, expiry = s[1:].partition('@')
        if not separator:
            raise ValueError("bad permission hint {}".format(s))
        self.perm_sig = sig
        self.perm_expiry = expiry

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission hint has expired.

        as_of_dt is the naive UTC datetime to compare against; it
        defaults to the current UTC time.  A locator with no permission
        expiry never counts as expired.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # perm_expiry is built with utcfromtimestamp(), so compare
            # against UTC "now" rather than local time.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
+
+
class Keep:
@staticmethod
def global_client_object():
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import datetime
+import itertools
+import random
+import unittest
+
+from arvados.keep import KeepLocator
+
class ArvadosPutResumeCacheTest(unittest.TestCase):
    """Tests for KeepLocator parsing, validation and permission expiry.

    NOTE(review): the class name mentions the put resume cache, but every
    test here exercises KeepLocator.  The name is kept so existing test
    selectors keep working.
    """
    DEFAULT_TEST_COUNT = 10

    def numstrs(fmtstr, base, exponent):
        # Class-body helper (not a method): build a generator method that
        # yields `count` random numbers below base ** exponent, each
        # rendered with fmtstr.
        def genstrs(self, count=None):
            # randint() includes its upper bound, so subtract one to stay
            # within `exponent` digits of the given base.
            return (fmtstr.format(random.randint(0, base ** exponent - 1))
                    for _ in range(count or self.DEFAULT_TEST_COUNT))
        return genstrs

    checksums = numstrs('{:032x}', 16, 32)
    sizes = numstrs('{:d}', 2, 26)
    signatures = numstrs('{:040x}', 16, 40)
    timestamps = numstrs('{:08x}', 16, 8)

    def perm_hints(self, count=DEFAULT_TEST_COUNT):
        """Yield `count` random permission hints ('A<sig>@<timestamp>')."""
        for sig, ts in zip(self.signatures(count), self.timestamps(count)):
            yield 'A{}@{}'.format(sig, ts)

    def test_good_locators_returned(self):
        """Valid locators survive a parse/str round trip unchanged."""
        for hint_gens in [(), (self.sizes(),), (self.perm_hints(),),
                          (self.sizes(), self.perm_hints())]:
            for loc_data in zip(self.checksums(), *hint_gens):
                locator = '+'.join(loc_data)
                self.assertEqual(locator, str(KeepLocator(locator)))

    def test_nonchecksum_rejected(self):
        """Locators that do not start with a 32-digit hex checksum fail."""
        for badstr in ['', 'badbadbad', '8f9e68d957b504a29ba76c526c3145dj',
                       '+8f9e68d957b504a29ba76c526c3145d9',
                       '3+8f9e68d957b504a29ba76c526c3145d9']:
            self.assertRaises(ValueError, KeepLocator, badstr)

    def test_bad_hints_rejected(self):
        """Unrecognized hints after a valid checksum raise ValueError."""
        checksum = next(self.checksums(1))
        for badhint in ['', 'nonsense', '+32', checksum]:
            self.assertRaises(ValueError, KeepLocator,
                              '+'.join([checksum, badhint]))

    def test_expiry_passed(self):
        """permission_expired() compares the expiry with the given time."""
        checksum = next(self.checksums(1))
        signature = next(self.signatures(1))
        dt1980 = datetime.datetime(1980, 1, 1)
        dt2000 = datetime.datetime(2000, 2, 2)
        dt2080 = datetime.datetime(2080, 3, 3)
        # Without a permission hint, a locator never expires.
        locator = KeepLocator(checksum)
        self.assertFalse(locator.permission_expired())
        self.assertFalse(locator.permission_expired(dt1980))
        self.assertFalse(locator.permission_expired(dt2080))
        # Timestamped to 1987-01-05 18:48:32.
        locator = KeepLocator('{}+A{}@20000000'.format(checksum, signature))
        self.assertTrue(locator.permission_expired())
        self.assertTrue(locator.permission_expired(dt2000))
        self.assertFalse(locator.permission_expired(dt1980))
+
+
if __name__ == '__main__':
    # Run every test case in this module when executed as a script.
    unittest.main()