import threading
from collections import deque
+from stat import *
from keep import *
from stream import *
    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()
-    def _do_queued_work(self):
+    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
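        # (The other two pieces, as the code below suggests: _queued_dirents,
        # a deque of directory entries still to be written under the current
        # stream, and _queued_trees, a queue of (path, stream_name,
        # max_manifest_depth) tuples, one per pending stream.)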
    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
+        if stream_name != self.current_stream_name():
+            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))
-        self.start_new_stream(stream_name)
    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))
    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
-        self._do_queued_work()
+        self.do_queued_work()
    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
-        self._do_queued_work()
+        self.do_queued_work()
    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
        self._current_file_name = None
    def finish(self):
-        return Keep.put(self.manifest_text())
-
+        # Send the stripped manifest to Keep, to ensure that we use the
+        # same UUID regardless of what hints are used on the collection.
+        return Keep.put(self.stripped_manifest())
+
+    def stripped_manifest(self):
+        """
+        Return the manifest for the current collection with all permission
+        hints removed from the locators in the manifest.
+        """
+        raw = self.manifest_text()
+        clean = ''
+        for line in raw.split("\n"):
+            fields = line.split()
+            if len(fields) > 0:
+                locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
+                             for x in fields[1:-1] ]
+                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
+        return clean
+
    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret
+
+
+class ResumableCollectionWriter(CollectionWriter):
+    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
+                   '_current_stream_locators', '_current_stream_name',
+                   '_current_file_name', '_current_file_pos', '_close_file',
+                   '_data_buffer', '_dependencies', '_finished_streams',
+                   '_queued_dirents', '_queued_trees']
+
+    def __init__(self):
+        self._dependencies = {}
+        super(ResumableCollectionWriter, self).__init__()
+
+    @classmethod
+    def from_state(cls, state, *init_args, **init_kwargs):
+        # Try to build a new writer from scratch with the given state.
+        # If the state is not suitable to resume (because files have changed,
+        # been deleted, aren't predictable, etc.), raise a
+        # StaleWriterStateError. Otherwise, return the initialized writer.
+        # The caller is responsible for calling writer.do_queued_work()
+        # appropriately after it's returned.
+        writer = cls(*init_args, **init_kwargs)
+        for attr_name in cls.STATE_PROPS:
+            attr_value = state[attr_name]
+            attr_class = getattr(writer, attr_name).__class__
+            # Coerce the value into the same type as the initial value, if
+            # needed.
+            if attr_class not in (type(None), attr_value.__class__):
+                attr_value = attr_class(attr_value)
+            setattr(writer, attr_name, attr_value)
+        # Check dependencies before we try to resume anything.
+        if any(KeepLocator(ls).permission_expired()
+               for ls in writer._current_stream_locators):
+            raise errors.StaleWriterStateError(
+                "locators include expired permission hint")
+        writer.check_dependencies()
+        if state['_current_file'] is not None:
+            path, pos = state['_current_file']
+            try:
+                writer._queued_file = open(path, 'rb')
+                writer._queued_file.seek(pos)
+            except IOError as error:
+                raise errors.StaleWriterStateError(
+                    "failed to reopen active file {}: {}".format(path, error))
+        return writer
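+
+    # Illustrative checkpoint/resume sketch (illustrative only, not part of
+    # the class): copy.deepcopy as the copy_func and the surrounding error
+    # handling are assumptions; dump_state(), from_state(), and
+    # do_queued_work() are the methods defined here.
+    #
+    #   writer = ResumableCollectionWriter()
+    #   ...                                       # queue and write some files
+    #   state = writer.dump_state(copy.deepcopy)  # checkpoint the writer
+    #   ...                                       # work is interrupted
+    #   try:
+    #       writer = ResumableCollectionWriter.from_state(state)
+    #       writer.do_queued_work()               # caller resumes the queued work
+    #   except errors.StaleWriterStateError:
+    #       writer = ResumableCollectionWriter()  # source files changed; start over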
+
+    def check_dependencies(self):
+        for path, orig_stat in self._dependencies.items():
+            if not S_ISREG(orig_stat[ST_MODE]):
+                raise errors.StaleWriterStateError("{} not file".format(path))
+            try:
+                now_stat = tuple(os.stat(path))
+            except OSError as error:
+                raise errors.StaleWriterStateError(
+                    "failed to stat {}: {}".format(path, error))
+            if ((not S_ISREG(now_stat[ST_MODE])) or
+                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
+                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
+                raise errors.StaleWriterStateError("{} changed".format(path))
+
+    def dump_state(self, copy_func=lambda x: x):
+        state = {attr: copy_func(getattr(self, attr))
+                 for attr in self.STATE_PROPS}
+        if self._queued_file is None:
+            state['_current_file'] = None
+        else:
+            state['_current_file'] = (os.path.realpath(self._queued_file.name),
+                                      self._queued_file.tell())
+        return state
+
+    def _queue_file(self, source, filename=None):
+        try:
+            src_path = os.path.realpath(source)
+        except Exception:
+            raise errors.AssertionError("{} not a file path".format(source))
+        try:
+            path_stat = os.stat(src_path)
+        except OSError as stat_error:
+            path_stat = None
+        super(ResumableCollectionWriter, self)._queue_file(source, filename)
+        fd_stat = os.fstat(self._queued_file.fileno())
+        if not S_ISREG(fd_stat.st_mode):
+            # We won't be able to resume from this cache anyway, so don't
+            # worry about further checks.
+            self._dependencies[source] = tuple(fd_stat)
+        elif path_stat is None:
+            raise errors.AssertionError(
+                "could not stat {}: {}".format(source, stat_error))
+        elif path_stat.st_ino != fd_stat.st_ino:
+            raise errors.AssertionError(
+                "{} changed between open and stat calls".format(source))
+        else:
+            self._dependencies[src_path] = tuple(fd_stat)
+
+    def write(self, data):
+        if self._queued_file is None:
+            raise errors.AssertionError(
+                "resumable writer can't accept unsourced data")
+        return super(ResumableCollectionWriter, self).write(data)