class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
    """Collection writer for arv-put that tracks upload progress.

    Extends ResumableCollectionWriter with a running count of bytes
    written, an optional progress-report callback, and a record of
    inputs already written so a resumed upload does not repeat work.
    """

    # '_seen_inputs' is checkpointed alongside 'bytes_written' so that
    # duplicate-input detection survives a save/resume cycle.
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None):
        # reporter: callable(bytes_written, bytes_expected) or None.
        # bytes_expected: total upload size for progress display, or None.
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        if reporter is None:
            # No reporter supplied: make progress reporting a no-op so
            # callers can invoke self.report_progress unconditionally.
            self.report_progress = lambda bytes_w, bytes_e: None
        else:
            # NOTE(review): this else-branch and the two lines after it
            # were elided in the diff this file was recovered from and
            # have been reconstructed from context -- confirm against
            # the full upstream source.
            self.report_progress = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__()

    def flush_data(self):
        """Flush buffered data and account for the bytes written."""
        # NOTE(review): the first two lines were reconstructed from
        # elided diff context -- confirm against the full source.
        bytes_buffered = self._data_buffer_len
        super(ArvPutCollectionWriter, self).flush_data()
        # Whatever left the buffer during the flush counts as written.
        self.bytes_written += (bytes_buffered - self._data_buffer_len)
        self.report_progress(self.bytes_written, self.bytes_expected)
+ def _record_new_input(self, input_type, source_name, dest_name):
+ # The key needs to be a list because that's what we'll get back
+ # from JSON deserialization.
+ key = [input_type, source_name, dest_name]
+ if key in self._seen_inputs:
+ return False
+ self._seen_inputs.append(key)
+ return True
+
+ def write_file(self, source, filename=None):
+ if self._record_new_input('file', source, filename):
+ super(ArvPutCollectionWriter, self).write_file(source, filename)
+
+ def write_directory_tree(self,
+ path, stream_name='.', max_manifest_depth=-1):
+ if self._record_new_input('directory', path, stream_name):
+ super(ArvPutCollectionWriter, self).write_directory_tree(
+ path, stream_name, max_manifest_depth)
+
def expected_bytes_for(pathlist):
# Walk the given directory trees and stat files, adding up file sizes,