X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/78722718f1369094e4cc9216f14c846c3b614e7d..3e555d75efb63c66ec6e85d376dc85e7447c64cc:/sdk/python/arvados/commands/put.py diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py index 4398c5bca6..44f911e60b 100644 --- a/sdk/python/arvados/commands/put.py +++ b/sdk/python/arvados/commands/put.py @@ -160,11 +160,7 @@ class ResumeCache(object): os.chmod(cls.CACHE_DIR, 0o700) def __init__(self, file_spec): - try: - self.cache_file = open(file_spec, 'a+') - except TypeError: - file_spec = self.make_path(file_spec) - self.cache_file = open(file_spec, 'a+') + self.cache_file = open(file_spec, 'a+') self._lock_file(self.cache_file) self.filename = self.cache_file.name @@ -225,31 +221,35 @@ class ResumeCache(object): class ArvPutCollectionWriter(arvados.ResumableCollectionWriter): STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS + - ['bytes_written']) + ['bytes_written', '_seen_inputs']) def __init__(self, cache=None, reporter=None, bytes_expected=None): self.bytes_written = 0 - self.__init_locals__(cache, reporter, bytes_expected) - super(ArvPutCollectionWriter, self).__init__() - - def __init_locals__(self, cache, reporter, bytes_expected): + self._seen_inputs = [] self.cache = cache - self.report_func = reporter + if reporter is None: + self.report_progress = lambda bytes_w, bytes_e: None + else: + self.report_progress = reporter self.bytes_expected = bytes_expected + super(ArvPutCollectionWriter, self).__init__() @classmethod def from_cache(cls, cache, reporter=None, bytes_expected=None): try: state = cache.load() state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])] - writer = cls.from_state(state) + writer = cls.from_state(state, cache, reporter, bytes_expected) except (TypeError, ValueError, arvados.errors.StaleWriterStateError) as error: return cls(cache, reporter, bytes_expected) else: - writer.__init_locals__(cache, reporter, bytes_expected) return writer + def preresume_hook(self): + print >>sys.stderr, "arv-put: Resuming previous upload. Bypass with the --no-resume option." + self.report_progress(self.bytes_written, self.bytes_expected) + def checkpoint_state(self): if self.cache is None: return @@ -266,8 +266,26 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter): bytes_buffered = self._data_buffer_len super(ArvPutCollectionWriter, self).flush_data() self.bytes_written += (bytes_buffered - self._data_buffer_len) - if self.report_func is not None: - self.report_func(self.bytes_written, self.bytes_expected) + self.report_progress(self.bytes_written, self.bytes_expected) + + def _record_new_input(self, input_type, source_name, dest_name): + # The key needs to be a list because that's what we'll get back + # from JSON deserialization. + key = [input_type, source_name, dest_name] + if key in self._seen_inputs: + return False + self._seen_inputs.append(key) + return True + + def write_file(self, source, filename=None): + if self._record_new_input('file', source, filename): + super(ArvPutCollectionWriter, self).write_file(source, filename) + + def write_directory_tree(self, + path, stream_name='.', max_manifest_depth=-1): + if self._record_new_input('directory', path, stream_name): + super(ArvPutCollectionWriter, self).write_directory_tree( + path, stream_name, max_manifest_depth) def expected_bytes_for(pathlist): @@ -292,9 +310,9 @@ def machine_progress(bytes_written, bytes_expected): def human_progress(bytes_written, bytes_expected): if bytes_expected: - return "\r{}M / {}M {:.1f}% ".format( + return "\r{}M / {}M {:.1%} ".format( bytes_written >> 20, bytes_expected >> 20, - bytes_written / bytes_expected) + float(bytes_written) / bytes_expected) else: return "\r{} ".format(bytes_written) @@ -315,7 +333,7 @@ def main(arguments=None): reporter = None try: - resume_cache = ResumeCache(args) + resume_cache = ResumeCache(ResumeCache.make_path(args)) if not args.resume: resume_cache.restart() except ResumeCacheConflict: