+ def checkpoint_state(self):
+ if self.cache is None:
+ return
+ state = self.dump_state()
+ # Transform attributes for serialization.
+ for attr, value in state.items():
+ if attr == '_data_buffer':
+ state[attr] = base64.encodestring(''.join(value))
+ elif hasattr(value, 'popleft'):
+ state[attr] = list(value)
+ self.cache.save(state)
+
+ def flush_data(self):
+ bytes_buffered = self._data_buffer_len
+ super(ArvPutCollectionWriter, self).flush_data()
+ self.bytes_written += (bytes_buffered - self._data_buffer_len)
+ self.report_progress(self.bytes_written, self.bytes_expected)
+
+ def _record_new_input(self, input_type, source_name, dest_name):
+ # The key needs to be a list because that's what we'll get back
+ # from JSON deserialization.
+ key = [input_type, source_name, dest_name]
+ if key in self._seen_inputs:
+ return False
+ self._seen_inputs.append(key)
+ return True
+
+ def write_file(self, source, filename=None):
+ if self._record_new_input('file', source, filename):
+ super(ArvPutCollectionWriter, self).write_file(source, filename)
+
+ def write_directory_tree(self,
+ path, stream_name='.', max_manifest_depth=-1):
+ if self._record_new_input('directory', path, stream_name):
+ super(ArvPutCollectionWriter, self).write_directory_tree(
+ path, stream_name, max_manifest_depth)
+
+
+def expected_bytes_for(pathlist):