def __exit__(self, exc_type=None, exc_value=None, traceback=None):
    """Finish the collection when leaving a ``with`` block.

    The context-manager protocol always calls ``__exit__`` with three
    arguments describing the exception (if any) raised inside the block.
    The previous signature accepted only ``self``, so every ``with``
    statement failed with TypeError on exit.  The three parameters default
    to None so that existing direct calls of ``__exit__()`` keep working.

    finish() is called only when the block exited without an exception
    (``exc_type`` is None); after a failure the collection is left
    unfinished.

    Returns None, so an exception raised inside the block is never
    suppressed.
    """
    if exc_type is None:
        self.finish()
- def _do_queued_work(self):
+ def do_queued_work(self):
# The work queue consists of three pieces:
# * _queued_file: The file object we're currently writing to the
# Collection.
self._work_trees()
else:
break
- self.checkpoint_state()
-
- def checkpoint_state(self):
- # Subclasses can implement this method to, e.g., report or record state.
- pass
def _work_file(self):
while True:
def write_file(self, source, filename=None):
# Queue `source` (with the optional `filename`) via _queue_file(), then
# process the work queue straight away.  The two diff lines below only
# rename the queue-processing call from the private _do_queued_work()
# to the public do_queued_work().
self._queue_file(source, filename)
- self._do_queued_work()
+ self.do_queued_work()
def write_directory_tree(self,
path, stream_name='.', max_manifest_depth=-1):
# Queue the tree at `path` via _queue_tree(), passing `stream_name` and
# `max_manifest_depth` through unchanged, then process the work queue
# straight away.  The two diff lines below only rename the
# queue-processing call from the private _do_queued_work() to the
# public do_queued_work().
# NOTE(review): the meaning of max_manifest_depth=-1 is defined by
# _queue_tree(), which is not visible here -- confirm before relying on it.
self._queue_tree(path, stream_name, max_manifest_depth)
- self._do_queued_work()
+ self.do_queued_work()
def write(self, newdata):
if hasattr(newdata, '__iter__'):
self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
self._data_buffer_len = len(self._data_buffer[0])
- self.checkpoint_state()
def start_new_file(self, newfilename=None):
self.finish_current_file()
self._current_file_name = None
def finish(self):
# Store the collection's manifest with Keep.put() and return whatever
# Keep.put() returns.  The removed line stored manifest_text() as-is;
# the added lines store stripped_manifest() instead.
- return Keep.put(self.manifest_text())
-
+ # Send the stripped manifest to Keep, to ensure that we use the
+ # same UUID regardless of what hints are used on the collection.
+ return Keep.put(self.stripped_manifest())
+
def stripped_manifest(self):
    """
    Return the manifest for the current collection with all permission
    hints removed from the locators in the manifest.

    The text from manifest_text() is processed line by line.  Each
    non-blank line is split on whitespace: the first and last fields are
    copied unchanged, and every ``+A...`` hint is deleted from each field
    in between.  Fields are re-joined with single spaces and every
    emitted line ends with a newline; blank lines are dropped.
    """
    # Compile once per call instead of re-resolving the pattern for
    # every locator.
    hint_re = re.compile(r'\+A[a-z0-9@_-]+')
    clean_lines = []
    for line in self.manifest_text().split("\n"):
        fields = line.split()
        if not fields:
            continue
        locators = [hint_re.sub('', field) for field in fields[1:-1]]
        clean_lines.append(
            fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n")
    # Join once at the end rather than growing a string with += in the
    # loop, which is quadratic in the worst case.
    return ''.join(clean_lines)
+
def manifest_text(self):
self.finish_current_stream()
manifest = ''
super(ResumableCollectionWriter, self).__init__()
@classmethod
- def from_state(cls, state):
- writer = cls()
+ def from_state(cls, state, *init_args, **init_kwargs):
+ # Try to build a new writer from scratch with the given state.
+ # If the state is not suitable to resume (because files have changed,
+ # been deleted, aren't predictable, etc.), raise a
+ # StaleWriterStateError. Otherwise, return the initialized writer.
+ # The caller is responsible for calling writer.do_queued_work()
+ # appropriately after it's returned.
+ writer = cls(*init_args, **init_kwargs)
for attr_name in cls.STATE_PROPS:
attr_value = state[attr_name]
attr_class = getattr(writer, attr_name).__class__
attr_value = attr_class(attr_value)
setattr(writer, attr_name, attr_value)
# Check dependencies before we try to resume anything.
+ if any(KeepLocator(ls).permission_expired()
+ for ls in writer._current_stream_locators):
+ raise errors.StaleWriterStateError(
+ "locators include expired permission hint")
writer.check_dependencies()
if state['_current_file'] is not None:
path, pos = state['_current_file']
except IOError as error:
raise errors.StaleWriterStateError(
"failed to reopen active file {}: {}".format(path, error))
- writer._do_queued_work()
return writer
def check_dependencies(self):
raise errors.AssertionError("{} not a file path".format(source))
try:
path_stat = os.stat(src_path)
- except OSError as error:
- raise errors.AssertionError(
- "could not stat {}: {}".format(source, error))
+ except OSError as stat_error:
+ path_stat = None
super(ResumableCollectionWriter, self)._queue_file(source, filename)
fd_stat = os.fstat(self._queued_file.fileno())
- if path_stat.st_ino != fd_stat.st_ino:
+ if not S_ISREG(fd_stat.st_mode):
+ # We won't be able to resume from this cache anyway, so don't
+ # worry about further checks.
+ self._dependencies[source] = tuple(fd_stat)
+ elif path_stat is None:
+ raise errors.AssertionError(
+ "could not stat {}: {}".format(source, stat_error))
+ elif path_stat.st_ino != fd_stat.st_ino:
raise errors.AssertionError(
"{} changed between open and stat calls".format(source))
- self._dependencies[src_path] = tuple(fd_stat)
+ else:
+ self._dependencies[src_path] = tuple(fd_stat)
def write(self, data):
if self._queued_file is None: