2752: Add KeepLocator class to Python SDK.
author Brett Smith <brett@curoverse.com>
Mon, 26 May 2014 11:15:37 +0000 (07:15 -0400)
committer Brett Smith <brett@curoverse.com>
Mon, 26 May 2014 12:12:28 +0000 (08:12 -0400)
I hope this can be one place to parse and manipulate locator strings.

sdk/python/arvados/keep.py
sdk/python/tests/test_keep_locator.py [new file with mode: 0644]

index e414d267a1347a51c9ae48354082e14bf48da29d..bd9846622e9fc642464e773968f85176dc88c4b3 100644 (file)
@@ -18,6 +18,7 @@ import fcntl
 import time
 import threading
 import timer
+import datetime
 
 global_client_object = None
 
@@ -25,6 +26,89 @@ from api import *
 import config
 import arvados.errors
 
+class KeepLocator(object):
+    """Parse, validate, and re-serialize a Keep locator string.
+
+    A locator is a '+'-separated string: an md5 checksum followed by
+    optional hints.  The hints recognized here are a decimal size, a
+    hint starting with 'K', and a permission hint of the form
+    'A<signature>@<hex expiry timestamp>'.
+    """
+    # Naive datetime for the Unix epoch, used to turn an expiry datetime
+    # back into a number of seconds.
+    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
+    # \Z rather than $: $ also matches just before a trailing newline,
+    # which would let 'abc\n' pass as a hex string.
+    HEX_RE = re.compile(r'^[0-9a-fA-F]+\Z')
+
+    def __init__(self, locator_str):
+        """Parse locator_str into its checksum and hints.
+
+        Raises ValueError if the checksum or any hint is not valid.
+        """
+        self.size = None
+        self.loc_hint = None
+        self._perm_sig = None
+        self._perm_expiry = None
+        pieces = iter(locator_str.split('+'))
+        # Assigning through the property validates the checksum.
+        self.md5sum = next(pieces)
+        for hint in pieces:
+            if hint.startswith('A'):
+                self.parse_permission_hint(hint)
+            elif hint.startswith('K'):
+                self.loc_hint = hint  # FIXME: stored as-is, not validated
+            elif hint.isdigit():
+                self.size = int(hint)
+            else:
+                raise ValueError("unrecognized hint data {}".format(hint))
+
+    def __str__(self):
+        """Return the locator string, leaving out any part that is unset."""
+        return '+'.join(
+            str(s) for s in [self.md5sum, self.size, self.loc_hint,
+                             self.permission_hint()]
+            if s is not None)
+
+    def _is_hex_length(self, s, *size_spec):
+        """Return True if s is a hex string of an acceptable length.
+
+        size_spec is either one exact length, or a minimum and a maximum
+        length (both inclusive).
+        """
+        if len(size_spec) == 1:
+            good_len = (len(s) == size_spec[0])
+        else:
+            good_len = (size_spec[0] <= len(s) <= size_spec[1])
+        # bool() so callers get True/False, not a match object or None.
+        return bool(good_len and self.HEX_RE.match(s))
+
+    def _make_hex_prop(name, length):
+        # Build and return a new property with the given name that
+        # must be a hex string of the given length.  This runs while the
+        # class body is being built, so it takes no self argument.
+        data_name = '_{}'.format(name)
+        def getter(self):
+            return getattr(self, data_name)
+        def setter(self, hex_str):
+            if not self._is_hex_length(hex_str, length):
+                raise ValueError("{} must be a {}-digit hex string: {}".
+                                 format(name, length, hex_str))
+            setattr(self, data_name, hex_str)
+        return property(getter, setter)
+
+    md5sum = _make_hex_prop('md5sum', 32)
+    perm_sig = _make_hex_prop('perm_sig', 40)
+
+    @property
+    def perm_expiry(self):
+        """Permission expiry as a naive UTC datetime, or None if unset."""
+        return self._perm_expiry
+
+    @perm_expiry.setter
+    def perm_expiry(self, value):
+        # value is the hex Unix timestamp exactly as written in the hint.
+        if not self._is_hex_length(value, 1, 8):
+            raise ValueError(
+                "permission timestamp must be a hex Unix timestamp: {}".
+                format(value))
+        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
+
+    def permission_hint(self):
+        """Return the 'A<signature>@<expiry>' hint string.
+
+        Returns None unless both the signature and the expiry are set.
+        """
+        data = [self.perm_sig, self.perm_expiry]
+        if None in data:
+            return None
+        # Turn the expiry datetime back into whole seconds since the epoch.
+        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
+        return "A{}@{:08x}".format(*data)
+
+    def parse_permission_hint(self, s):
+        """Set perm_sig and perm_expiry from a hint string.
+
+        s is the whole hint, including its leading 'A'.  Raises
+        ValueError if there is no '@' separator, or if either half fails
+        validation.
+        """
+        try:
+            # Unpacking fails with ValueError (not IndexError) when the
+            # split yields a single piece.
+            signature, expiry = s[1:].split('@', 1)
+        except ValueError:
+            raise ValueError("bad permission hint {}".format(s))
+        # Assigned outside the try block so that the property setters'
+        # more specific error messages are not replaced.
+        self.perm_sig = signature
+        self.perm_expiry = expiry
+
+    def permission_expired(self, as_of_dt=None):
+        """Return True if the permission hint has expired as of as_of_dt.
+
+        as_of_dt is a naive datetime and defaults to the current time.
+        A locator without an expiry never counts as expired.
+        """
+        if self.perm_expiry is None:
+            return False
+        elif as_of_dt is None:
+            # perm_expiry is built with utcfromtimestamp, so compare it
+            # with the current UTC time, not local time.
+            as_of_dt = datetime.datetime.utcnow()
+        return self.perm_expiry <= as_of_dt
+
+
 class Keep:
     @staticmethod
     def global_client_object():
diff --git a/sdk/python/tests/test_keep_locator.py b/sdk/python/tests/test_keep_locator.py
new file mode 100644 (file)
index 0000000..e9d6356
--- /dev/null
@@ -0,0 +1,67 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import datetime
+import itertools
+import random
+import unittest
+
+from arvados.keep import KeepLocator
+
+class ArvadosPutResumeCacheTest(unittest.TestCase):
+    """Tests for KeepLocator parsing, validation, and expiry checks."""
+    # NOTE(review): the class name looks copied from another test module;
+    # it is left unchanged so selecting the tests by name keeps working.
+    DEFAULT_TEST_COUNT = 10
+
+    def numstrs(fmtstr, base, exponent):
+        # Build a method returning a generator of random numbers in
+        # [0, base ** exponent), each formatted with fmtstr.  This runs
+        # while the class body is being built, so it takes no self.
+        def genstrs(self, count=None):
+            # randrange excludes its upper bound.  randint includes it,
+            # and base ** exponent itself needs one digit more than the
+            # fixed widths used below, so tests would fail at random.
+            return (fmtstr.format(random.randrange(base ** exponent))
+                    for c in xrange(count or self.DEFAULT_TEST_COUNT))
+        return genstrs
+
+    checksums = numstrs('{:032x}', 16, 32)
+    sizes = numstrs('{:d}', 2, 26)
+    signatures = numstrs('{:040x}', 16, 40)
+    timestamps = numstrs('{:08x}', 16, 8)
+
+    def perm_hints(self, count=DEFAULT_TEST_COUNT):
+        # Yield count random permission hints: 'A<signature>@<timestamp>'.
+        for sig, ts in itertools.izip(self.signatures(count),
+                                      self.timestamps(count)):
+            yield 'A{}@{}'.format(sig, ts)
+
+    def test_good_locators_returned(self):
+        # A valid locator must survive a parse/str round trip unchanged,
+        # with and without size and permission hints.
+        for hint_gens in [(), (self.sizes(),), (self.perm_hints(),),
+                          (self.sizes(), self.perm_hints())]:
+            for loc_data in itertools.izip(self.checksums(), *hint_gens):
+                locator = '+'.join(loc_data)
+                self.assertEqual(locator, str(KeepLocator(locator)))
+
+    def test_nonchecksum_rejected(self):
+        # The first piece must be a 32-digit hex checksum.
+        for badstr in ['', 'badbadbad', '8f9e68d957b504a29ba76c526c3145dj',
+                       '+8f9e68d957b504a29ba76c526c3145d9',
+                       '3+8f9e68d957b504a29ba76c526c3145d9']:
+            self.assertRaises(ValueError, KeepLocator, badstr)
+
+    def test_bad_hints_rejected(self):
+        # Each bad hint is appended to a valid checksum.
+        checksum = next(self.checksums(1))
+        for badhint in ['', 'nonsense', '+32', checksum]:
+            self.assertRaises(ValueError, KeepLocator,
+                              '+'.join([checksum, badhint]))
+
+    def test_expiry_passed(self):
+        checksum = next(self.checksums(1))
+        signature = next(self.signatures(1))
+        dt1980 = datetime.datetime(1980, 1, 1)
+        dt2000 = datetime.datetime(2000, 2, 2)
+        dt2080 = datetime.datetime(2080, 3, 3)
+        # A locator without a permission hint never expires.
+        locator = KeepLocator(checksum)
+        self.assertFalse(locator.permission_expired())
+        self.assertFalse(locator.permission_expired(dt1980))
+        self.assertFalse(locator.permission_expired(dt2080))
+        # Timestamped to 1987-01-05 18:48:32.
+        locator = KeepLocator('{}+A{}@20000000'.format(checksum, signature))
+        self.assertTrue(locator.permission_expired())
+        self.assertTrue(locator.permission_expired(dt2000))
+        self.assertFalse(locator.permission_expired(dt1980))
+
+
+if __name__ == '__main__':
+    unittest.main()