        os.chmod(cls.CACHE_DIR, 0o700)

    def __init__(self, file_spec):
-        try:
-            self.cache_file = open(file_spec, 'a+')
-        except TypeError:
-            file_spec = self.make_path(file_spec)
-            self.cache_file = open(file_spec, 'a+')
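+        # file_spec is now expected to be a real path; callers build it
+        # with ResumeCache.make_path() (see the call-site change below).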
+        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
-    def __init__(self, cache=None, reporter=None, bytes_expected=None):
-        self.__init_locals__(cache, reporter, bytes_expected)
-        super(ArvPutCollectionWriter, self).__init__()
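+    # Persist the upload byte count and the inputs already seen, in
+    # addition to the base writer state, so a resumed upload can skip
+    # completed work.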
+    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
+                   ['bytes_written', '_seen_inputs'])

-    def __init_locals__(self, cache, reporter, bytes_expected):
-        self.cache = cache
-        self.report_func = reporter
+    def __init__(self, cache=None, reporter=None, bytes_expected=None):
        self.bytes_written = 0
+        self._seen_inputs = []
+        self.cache = cache
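+        # Default to a no-op callback so progress can be reported
+        # unconditionally.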
+        if reporter is None:
+            self.report_progress = lambda bytes_w, bytes_e: None
+        else:
+            self.report_progress = reporter
        self.bytes_expected = bytes_expected
+        super(ArvPutCollectionWriter, self).__init__()

    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None):
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
-            writer = cls.from_state(state)
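+            # from_state() now receives the constructor arguments directly,
+            # replacing the removed __init_locals__ post-initialization step.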
+            writer = cls.from_state(state, cache, reporter, bytes_expected)
        except (TypeError, ValueError,
                arvados.errors.StaleWriterStateError) as error:
            return cls(cache, reporter, bytes_expected)
        else:
-            writer.__init_locals__(cache, reporter, bytes_expected)
            return writer
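
+    # Hook run when an earlier upload is resumed: warn the user and
+    # report the progress recorded so far.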
+    def preresume_hook(self):
+        print >>sys.stderr, "arv-put: Resuming previous upload. Bypass with the --no-resume option."
+        self.report_progress(self.bytes_written, self.bytes_expected)
+
    def checkpoint_state(self):
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        self.cache.save(state)

    def flush_data(self):
        bytes_buffered = self._data_buffer_len
        super(ArvPutCollectionWriter, self).flush_data()
        self.bytes_written += (bytes_buffered - self._data_buffer_len)
-        if self.report_func is not None:
-            self.report_func(self.bytes_written, self.bytes_expected)
+        self.report_progress(self.bytes_written, self.bytes_expected)
+
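+    # Record each (type, source, destination) input so that write calls
+    # replayed after a resume can be recognized and skipped.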
+    def _record_new_input(self, input_type, source_name, dest_name):
+        # The key needs to be a list because that's what we'll get back
+        # from JSON deserialization.
+        key = [input_type, source_name, dest_name]
+        if key in self._seen_inputs:
+            return False
+        self._seen_inputs.append(key)
+        return True
+
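+    # Idempotent overrides: only pass an input through to the base
+    # writer the first time it is seen.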
+    def write_file(self, source, filename=None):
+        if self._record_new_input('file', source, filename):
+            super(ArvPutCollectionWriter, self).write_file(source, filename)
+
+    def write_directory_tree(self,
+                             path, stream_name='.', max_manifest_depth=-1):
+        if self._record_new_input('directory', path, stream_name):
+            super(ArvPutCollectionWriter, self).write_directory_tree(
+                path, stream_name, max_manifest_depth)

def expected_bytes_for(pathlist):

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
-        return "\r{}M / {}M {:.1f}% ".format(
+ return "\r{}M / {}M {:.1%} ".format(
bytes_written >> 20, bytes_expected >> 20,
- bytes_written / bytes_expected)
+ float(bytes_written) / bytes_expected)
else:
return "\r{} ".format(bytes_written)
    reporter = None
    try:
-        resume_cache = ResumeCache(args)
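+        # ResumeCache.__init__ no longer falls back to make_path(), so
+        # build the cache path explicitly here.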
+        resume_cache = ResumeCache(ResumeCache.make_path(args))
        if not args.resume:
            resume_cache.restart()
    except ResumeCacheConflict: