+import arvados.util
+
+class KeepLocator(object):
+ EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
+
+ def __init__(self, locator_str):
+ self.size = None
+ self.loc_hint = None
+ self._perm_sig = None
+ self._perm_expiry = None
+ pieces = iter(locator_str.split('+'))
+ self.md5sum = next(pieces)
+ for hint in pieces:
+ if hint.startswith('A'):
+ self.parse_permission_hint(hint)
+ elif hint.startswith('K'):
+ self.loc_hint = hint # FIXME
+ elif hint.isdigit():
+ self.size = int(hint)
+ else:
+ raise ValueError("unrecognized hint data {}".format(hint))
+
+ def __str__(self):
+ return '+'.join(
+ str(s) for s in [self.md5sum, self.size, self.loc_hint,
+ self.permission_hint()]
+ if s is not None)
+
+ def _make_hex_prop(name, length):
+ # Build and return a new property with the given name that
+ # must be a hex string of the given length.
+ data_name = '_{}'.format(name)
+ def getter(self):
+ return getattr(self, data_name)
+ def setter(self, hex_str):
+ if not arvados.util.is_hex(hex_str, length):
+ raise ValueError("{} must be a {}-digit hex string: {}".
+ format(name, length, hex_str))
+ setattr(self, data_name, hex_str)
+ return property(getter, setter)
+
+ md5sum = _make_hex_prop('md5sum', 32)
+ perm_sig = _make_hex_prop('perm_sig', 40)
+
+ @property
+ def perm_expiry(self):
+ return self._perm_expiry
+
+ @perm_expiry.setter
+ def perm_expiry(self, value):
+ if not arvados.util.is_hex(value, 1, 8):
+ raise ValueError(
+ "permission timestamp must be a hex Unix timestamp: {}".
+ format(value))
+ self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
+
+ def permission_hint(self):
+ data = [self.perm_sig, self.perm_expiry]
+ if None in data:
+ return None
+ data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
+ return "A{}@{:08x}".format(*data)
+
+ def parse_permission_hint(self, s):
+ try:
+ self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
+ except IndexError:
+ raise ValueError("bad permission hint {}".format(s))
+
+ def permission_expired(self, as_of_dt=None):
+ if self.perm_expiry is None:
+ return False
+ elif as_of_dt is None:
+ as_of_dt = datetime.datetime.now()
+ return self.perm_expiry <= as_of_dt
+